import type { NonEmptyArray } from "../Array.ts"
import * as Context from "../Context.ts"
import type { Effect } from "../Effect.ts"
import type { Exit } from "../Exit.ts"
import type { Fiber } from "../Fiber.ts"
import { dual } from "../Function.ts"
import type * as Request from "../Request.ts"
import { makeEntry } from "../Request.ts"
import type { RequestResolver } from "../RequestResolver.ts"
import { Scheduler } from "../Scheduler.ts"
import { exitDie, isEffect } from "./core.ts"
import * as effect from "./effect.ts"

// NOTE(review): the type-level text in this file looks damaged — fragments such
// as `Set>`, `new Map, Map>()` and `RequestResolver | Effect, EX, RX>` suggest
// that generic parameter lists / type arguments were stripped somewhere upstream.
// The tokens are left exactly as found; restore them from the canonical source
// before relying on this file to type-check.

/**
 * Builds an effect that submits `self` (a request) to `resolver`.
 *
 * Created with `dual(2, ...)`; the declared type lists two call shapes:
 * `request(resolver)(self)` and `request(self, resolver)`.
 *
 * `resolver` may be a resolver value or an effect producing one. When
 * `isEffect(resolver)` holds, the resolver effect is sequenced first via
 * `effect.flatMap`; otherwise the resolver is used directly.
 *
 * @internal
 */
export const request: {
  (
    resolver: RequestResolver | Effect, EX, RX>
  ): (self: A) => Effect<
    Request.Success,
    Request.Error | EX,
    Request.Services | RX
  >
  (
    self: A,
    resolver: RequestResolver | Effect, EX, RX>
  ): Effect<
    Request.Success,
    Request.Error | EX,
    Request.Services | RX
  >
} = dual(
  2,
  (
    self: A,
    resolver: RequestResolver | Effect, EX, RX>
  ): Effect<
    Request.Success,
    Request.Error | EX,
    Request.Services | RX
  > => {
    // Registers the request with the batching machinery; `resume` is handed to
    // `addEntry` and is invoked once the entry is completed.
    const withResolver = (resolver: RequestResolver) =>
      effect.callback<
        Request.Success,
        Request.Error,
        Request.Services
      >((resume) => {
        // The running fiber supplies the context, scheduler and id used by addEntry.
        const entry = addEntry(resolver, self, resume, effect.getCurrentFiber()!)
        // Returns an effect that removes the entry from its pending batch.
        // NOTE(review): presumably `effect.callback` runs the returned effect as
        // cleanup on interruption — confirm against its definition.
        return maybeRemoveEntry(resolver, entry)
      })
    return isEffect(resolver) ? effect.flatMap(resolver, withResolver) : withResolver(resolver)
  }
)

/**
 * Callback-style variant of `request` that does not go through an effect.
 *
 * Registers `self` with `options.resolver` via `addEntry`, using
 * `options.onExit` as the completion callback. In place of a fiber it passes an
 * object holding `options.context` and the `Scheduler` looked up from that
 * context (no `id` is supplied).
 *
 * @returns a function that, when called, removes the entry again through
 * `removeEntryUnsafe`.
 *
 * @internal
 */
export const requestUnsafe = (
  self: A,
  options: {
    readonly resolver: RequestResolver
    readonly onExit: (exit: Exit, Request.Error>) => void
    readonly context: Context.Context
  }
): () => void => {
  const entry = addEntry(options.resolver, self, options.onExit, {
    context: options.context,
    currentScheduler: Context.get(options.context, Scheduler)
  })
  return () => removeEntryUnsafe(options.resolver, entry)
}

/**
 * Mutable bookkeeping record for one group of requests that share a resolver
 * and a batch key. Instances are recycled through `batchPool`, which is why
 * `key`, `resolver`, `map` and `fiber` are reassignable.
 */
interface Batch {
  /** Key under which this batch is registered in `map`; reset to `undefined` when pooled. */
  key: unknown
  /** Resolver whose `delay`, `collectWhile` and `runAll` drive this batch. */
  resolver: RequestResolver
  /** The per-resolver map (key -> batch) that this batch is registered in. */
  map: Map
  /**
   * Entries that have not been completed yet. An entry removes itself in
   * `completeUnsafe`; whatever is left after `run` finishes is completed there.
   */
  readonly entrySet: Set>
  /** Entries collected for the batch; snapshotted into the array given to `runAll`. */
  readonly entries: Set>
  /** Waits for `resolver.delay`, then continues with `runBatch`. */
  readonly delayEffect: Effect
  /** Invokes `resolver.runAll` and, on exit, completes leftover entries and recycles the batch. */
  readonly run: Effect
  /** Fiber most recently forked for this batch (delay or run); cleared when pooled. */
  fiber?: Fiber | undefined
}

// Pool of finished Batch objects available for reuse (see the 128 cap in `addEntry`).
const batchPool: Array = []
// Pending batches: resolver -> (batch key -> batch).
const pendingBatches = new Map, Map>()

/**
 * Creates an entry for `request` and places it into the pending batch for
 * `resolver` and the key returned by `resolver.batchKey`.
 *
 * Steps, in order:
 * 1. Get or create the per-resolver batch map in `pendingBatches`.
 * 2. Build the entry. Its `completeUnsafe` calls `resume` at most once (guarded
 *    by `completed`) and then drops the entry from the batch's `entrySet`.
 * 3. If the resolver defines `preCheck` and it returns a falsy value, return
 *    the entry without batching it.
 * 4. Find the batch for the key, or take one from `batchPool` / construct a new
 *    one, register it, and fork its `delayEffect`.
 * 5. Add the entry; if `collectWhile` returns a falsy value, interrupt the
 *    delay fiber and fork the batch run right away.
 *
 * @param resolver - resolver that will eventually receive the entry
 * @param request - the request to wrap in an entry
 * @param resume - invoked with the exit passed to the entry's `completeUnsafe`
 * @param fiber - source of the context and scheduler used for forking; `id`,
 * when present, is passed to `interruptUnsafe`
 * @returns the created entry
 */
const addEntry = (
  resolver: RequestResolver,
  request: A,
  resume: (exit: Exit) => void,
  fiber: {
    readonly context: Context.Context
    readonly currentScheduler: Scheduler
    readonly id?: number
  }
) => {
  let batchMap = pendingBatches.get(resolver)
  if (!batchMap) {
    batchMap = new Map()
    pendingBatches.set(resolver, batchMap)
  }
  // Declared before the entry so that `completeUnsafe` can close over it; it is
  // only assigned further down, hence the optional chaining inside the method.
  let batch: Batch | undefined
  let completed = false
  const entry = makeEntry({
    request,
    context: fiber.context as any,
    uninterruptible: false,
    // Note: the parameter name shadows the imported `effect` module inside this
    // method; the value is forwarded unchanged to `resume`.
    completeUnsafe(effect) {
      if (completed) return
      completed = true
      resume(effect)
      batch?.entrySet.delete(entry)
    }
  })

  // A failing preCheck short-circuits: the entry is returned but never batched.
  if (resolver.preCheck !== undefined && !resolver.preCheck(entry)) {
    return entry
  }

  const key = resolver.batchKey(entry)
  batch = batchMap.get(key)
  if (!batch) {
    if (batchPool.length > 0) {
      // Reuse a pooled batch; its `delayEffect` / `run` read these fields lazily
      // through `effect.suspend`, so reassigning them is enough to retarget it.
      batch = batchPool.pop()!
      batch.key = key
      batch.resolver = resolver
      batch.map = batchMap
    } else {
      const newBatch: Batch = {
        key,
        resolver,
        map: batchMap,
        entrySet: new Set(),
        entries: new Set(),
        // `runBatch` is invoked only after the delay has elapsed.
        delayEffect: effect.flatMap(
          effect.suspend(() => newBatch.resolver.delay),
          (_) => runBatch(newBatch)
        ) as Effect,
        run: effect.onExit(
          effect.suspend(() =>
            newBatch.resolver.runAll(Array.from(newBatch.entries) as NonEmptyArray>, newBatch.key)
          ),
          (exit) => {
            // Anything still in `entrySet` was not completed by the resolver:
            // a successful run turns into a defect for that entry, a failed run
            // propagates its exit to the entry.
            for (const entry of newBatch.entrySet) {
              entry.completeUnsafe(
                exit._tag === "Success"
                  ? exitDie(
                    new Error("Effect.request: RequestResolver did not complete request", { cause: entry.request })
                  )
                  : exit
              )
            }
            newBatch.entries.clear()
            // Recycle the batch only while the pool holds fewer than 128 items.
            if (batchPool.length < 128) {
              newBatch.entrySet.clear()
              newBatch.key = undefined
              newBatch.fiber = undefined
              batchPool.push(newBatch)
            }
            return effect.void
          }
        )
      }
      batch = newBatch
    }
    batchMap.set(key, batch)
    batch.fiber = effect.runForkWith(fiber.context)(batch.delayEffect, { scheduler: fiber.currentScheduler })
  }

  batch.entrySet.add(entry)
  batch.entries.add(entry)
  // Keep collecting while the resolver says so.
  if (batch.resolver.collectWhile(batch.entries)) return entry

  // Collection is over: cancel the pending delay and run now. `runBatch(batch)`
  // is evaluated here, synchronously, so the batch is unregistered from its map
  // before the new fiber is forked.
  batch.fiber!.interruptUnsafe(fiber.id)
  batch.fiber = effect.runForkWith(fiber.context)(runBatch(batch), { scheduler: fiber.currentScheduler })
  return entry
}

/**
 * Removes `entry` from the pending batch it belongs to, if any.
 *
 * Does nothing when the entry is marked `uninterruptible`, when the resolver
 * has no pending batches, or when no batch is registered under the computed
 * key. If the batch has no entries left afterwards, it is unregistered and its
 * fiber is interrupted.
 */
const removeEntryUnsafe = (
  resolver: RequestResolver,
  entry: Request.Entry
) => {
  if (entry.uninterruptible) return
  const batchMap = pendingBatches.get(resolver)
  if (!batchMap) return
  // NOTE(review): `addEntry` computes the key with `resolver.batchKey(entry)`,
  // whereas this passes `entry.request` behind an `as any` cast. If `batchKey`
  // expects the entry, a resolver with a non-constant key could yield a
  // different key here and the entry would not be found — confirm against the
  // RequestResolver contract.
  const key = resolver.batchKey(entry.request as any)
  const batch = batchMap.get(key)
  if (!batch) return

  batch.entries.delete(entry)
  batch.entrySet.delete(entry)

  if (batch.entries.size === 0) {
    batchMap.delete(key)
    batch.fiber?.interruptUnsafe()
  }
}

/** Wraps `removeEntryUnsafe(resolver, entry)` in `effect.sync` so it runs when the effect is executed. */
const maybeRemoveEntry = (
  resolver: RequestResolver,
  entry: Request.Entry
) => effect.sync(() => removeEntryUnsafe(resolver, entry))

/**
 * Unregisters `batch` from its map and returns the effect that runs it.
 *
 * Returns `effect.void` when `batch.map` no longer contains `batch.key`. The
 * map mutation happens at call time, not when the returned effect is executed.
 */
function runBatch(batch: Batch) {
  if (!batch.map.has(batch.key)) return effect.void
  batch.map.delete(batch.key)
  return batch.run
}