/** * The `Channel` module provides a powerful abstraction for bi-directional communication * and streaming operations. A `Channel` is a nexus of I/O operations that supports both * reading and writing, forming the foundation for Effect's Stream and Sink abstractions. * * ## What is a Channel? * * A `Channel` represents: * - **OutElem**: The type of elements the channel outputs * - **OutErr**: The type of errors the channel can produce * - **OutDone**: The type of the final value when the channel completes * - **InElem**: The type of elements the channel reads * - **InErr**: The type of errors the channel can receive * - **InDone**: The type of the final value from upstream * - **Env**: The environment/context required by the channel * * ## Key Features * * - **Bi-directional**: Channels can both read and write * - **Composable**: Channels can be piped, sequenced, and concatenated * - **Resource-safe**: Automatic cleanup and resource management * - **Error-handling**: Built-in error propagation and handling * - **Concurrent**: Support for concurrent operations * * ## Composition Patterns * * 1. **Piping**: Connect channels where output of one becomes input of another * 2. **Sequencing**: Use the result of one channel to create another * 3. **Concatenating**: Combine multiple channels into a single channel * * @example * ```ts * import { Channel } from "effect" * * // Simple channel that outputs numbers * const numberChannel = Channel.succeed(42) * * // Transform channel that doubles values * const doubleChannel = Channel.map(numberChannel, (n) => n * 2) * * // Running the channel would output: 84 * ``` * * @example * ```ts * import { Channel } from "effect" * * // Channel from an array of values * const arrayChannel = Channel.fromArray([1, 2, 3, 4, 5]) * * // Transform the channel by mapping over values * const transformedChannel = Channel.map(arrayChannel, (n) => n * 2) * * // This channel will output: 2, 4, 6, 8, 10 * ``` * * @since 2.0.0 */ // @effect-diagnostics returnEffectInGen:off import * as Arr from "./Array.ts" import * as Cause from "./Cause.ts" import * as Chunk from "./Chunk.ts" import * as Context from "./Context.ts" import * as Effect from "./Effect.ts" import * as Exit from "./Exit.ts" import * as Fiber from "./Fiber.ts" import type * as Filter from "./Filter.ts" import type { LazyArg } from "./Function.ts" import { constant, constTrue, constVoid, dual, identity as identity_ } from "./Function.ts" import { ClockRef, endSpan } from "./internal/effect.ts" import { addSpanStackTrace } from "./internal/tracer.ts" import * as Iterable from "./Iterable.ts" import * as Latch from "./Latch.ts" import * as Layer from "./Layer.ts" import type { Severity } from "./LogLevel.ts" import * as Option from "./Option.ts" import type { Pipeable } from "./Pipeable.ts" import { pipeArguments } from "./Pipeable.ts" import type * as Predicate from "./Predicate.ts" import { hasProperty, isTagged } from "./Predicate.ts" import * as PubSub from "./PubSub.ts" import * as Pull from "./Pull.ts" import * as Queue from "./Queue.ts" import { TracerTimingEnabled } from "./References.ts" import * as Result from "./Result.ts" import * as Schedule from "./Schedule.ts" import * as Scope from "./Scope.ts" import * as Semaphore from "./Semaphore.ts" import * as String from "./String.ts" import * as Take from "./Take.ts" import { ParentSpan, type SpanOptions } from "./Tracer.ts" import type * as Types from "./Types.ts" import type * as Unify from "./Unify.ts" /** * @since 4.0.0 * @category Type Identifiers */ export type TypeId = "~effect/Channel" /** * @since 4.0.0 * @category Type Identifiers */ export const TypeId: TypeId = "~effect/Channel" /** * Tests if a value is a `Channel`. * * @example * ```ts * import { Channel } from "effect" * * const channel = Channel.succeed(42) * console.log(Channel.isChannel(channel)) // true * console.log(Channel.isChannel("not a channel")) // false * ``` * * @category guards * @since 3.5.4 */ export const isChannel = ( u: unknown ): u is Channel => hasProperty(u, TypeId) /** * A `Channel` is a nexus of I/O operations, which supports both reading and * writing. A channel may read values of type `InElem` and write values of type * `OutElem`. When the channel finishes, it yields a value of type `OutDone`. A * channel may fail with a value of type `OutErr`. * * Channels are the foundation of Streams: both streams and sinks are built on * channels. Most users shouldn't have to use channels directly, as streams and * sinks are much more convenient and cover all common use cases. However, when * adding new stream and sink operators, or doing something highly specialized, * it may be useful to use channels directly. * * Channels compose in a variety of ways: * * - **Piping**: One channel can be piped to another channel, assuming the * input type of the second is the same as the output type of the first. * - **Sequencing**: The terminal value of one channel can be used to create * another channel, and both the first channel and the function that makes * the second channel can be composed into a channel. * - **Concatenating**: The output of one channel can be used to create other * channels, which are all concatenated together. The first channel and the * function that makes the other channels can be composed into a channel. * * @example * ```ts * import type { Channel } from "effect" * * // A channel that outputs numbers and requires no environment * type NumberChannel = Channel.Channel * * // A channel that outputs strings, can fail with Error, completes with boolean * type StringChannel = Channel.Channel * * // A channel with all type parameters specified * type FullChannel = Channel.Channel< * string, // OutElem - output elements * Error, // OutErr - output errors * number, // OutDone - completion value * number, // InElem - input elements * string, // InErr - input errors * boolean, // InDone - input completion * { db: string } // Env - required environment * > * ``` * * @since 2.0.0 * @category models */ export interface Channel< out OutElem, out OutErr = never, out OutDone = void, in InElem = unknown, in InErr = unknown, in InDone = unknown, out Env = never > extends Variance, Pipeable { [Unify.typeSymbol]?: unknown [Unify.unifySymbol]?: ChannelUnify [Unify.ignoreSymbol]?: ChannelUnifyIgnore } /** * @since 2.0.0 * @category models */ export interface ChannelUnify extends Effect.EffectUnify { Channel?: () => A[Unify.typeSymbol] extends | Channel | infer _ ? Channel : never } /** * @category models * @since 2.0.0 */ export interface ChannelUnifyIgnore { Effect?: true } type TagsWithReason = { [T in Types.Tags]: Types.ReasonTags> extends never ? never : T }[Types.Tags] /** * @since 2.0.0 * @category models */ export interface Variance< out OutElem, out OutErr, out OutDone, in InElem, in InErr, in InDone, out Env > { readonly [TypeId]: VarianceStruct } /** * @since 2.0.0 * @category models */ export interface VarianceStruct< out OutElem, out OutErr, out OutDone, in InElem, in InErr, in InDone, out Env > { _Env: Types.Covariant _InErr: Types.Contravariant _InElem: Types.Contravariant _InDone: Types.Contravariant _OutErr: Types.Covariant _OutElem: Types.Covariant _OutDone: Types.Covariant } const ChannelProto = { [TypeId]: { _Env: identity_, _InErr: identity_, _InElem: identity_, _OutErr: identity_, _OutElem: identity_ }, pipe() { return pipeArguments(this, arguments) } } // ----------------------------------------------------------------------------- // Constructors // ----------------------------------------------------------------------------- /** * Creates a `Channel` from a transformation function that operates on upstream pulls. * * @example * ```ts * import { Channel, Effect } from "effect" * * const channel = Channel.fromTransform((upstream, scope) => * Effect.succeed(upstream) * ) * ``` * * @category constructors * @since 4.0.0 */ export const fromTransform = ( transform: ( upstream: Pull.Pull, scope: Scope.Scope ) => Effect.Effect, EX, Env> ): Channel< OutElem, Pull.ExcludeDone | EX, OutDone, InElem, InErr, InDone, Env | EnvX > => { const self = Object.create(ChannelProto) self.transform = (upstream: any, scope: Scope.Scope) => Effect.catchCause(transform(upstream, scope), (cause) => Effect.succeed(Effect.failCause(cause))) return self } /** * Transforms a Channel by applying a function to its Pull implementation. * * @example * ```ts * import { Channel, Effect } from "effect" * * // Transform a channel by modifying its pull behavior * const originalChannel = Channel.fromIterable([1, 2, 3]) * * const transformedChannel = Channel.transformPull( * originalChannel, * (pull, scope) => * Effect.succeed( * Effect.map(pull, (value) => value * 2) * ) * ) * // Outputs: 2, 4, 6 * ``` * * @category constructors * @since 4.0.0 */ export const transformPull = < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem2, OutErr2, OutDone2, Env2, OutErrX, EnvX >( self: Channel, f: ( pull: Pull.Pull, scope: Scope.Scope ) => Effect.Effect, OutErrX, EnvX> ): Channel< OutElem2, Pull.ExcludeDone | OutErrX, OutDone2, InElem, InErr, InDone, Env | Env2 | EnvX > => fromTransform((upstream, scope) => Effect.flatMap(toTransform(self)(upstream, scope), (pull) => f(pull, scope))) /** * Creates a `Channel` from an `Effect` that produces a `Pull`. * * @example * ```ts * import { Channel, Effect } from "effect" * * const channel = Channel.fromPull( * Effect.succeed(Effect.succeed(42)) * ) * ``` * * @category constructors * @since 4.0.0 */ export const fromPull = ( effect: Effect.Effect, EX, Env> ): Channel | EX, OutDone, unknown, unknown, unknown, Env | EnvX> => fromTransform((_, __) => effect) as any /** * Creates a `Channel` from a transformation function that operates on upstream * pulls, but also provides a forked scope that closes when the resulting * Channel completes. * * @since 4.0.0 * @category constructors */ export const fromTransformBracket = ( f: ( upstream: Pull.Pull, scope: Scope.Scope, forkedScope: Scope.Scope ) => Effect.Effect, EX, Env> ): Channel | EX, OutDone, InElem, InErr, InDone, Env | EnvX> => fromTransform( Effect.fnUntraced(function*(upstream, scope) { const closableScope = Scope.forkUnsafe(scope) const onCause = (cause: Cause.Cause>) => Scope.close(closableScope, Pull.doneExitFromCause(cause)) const pull = yield* Effect.onError( f(upstream, scope, closableScope), onCause ) return Effect.onError(pull, onCause) }) ) /** * Converts a `Channel` back to its underlying transformation function. * * @example * ```ts * import { Channel } from "effect" * * const channel = Channel.succeed(42) * const transform = Channel.toTransform(channel) * // transform can now be used directly * ``` * * @category destructors * @since 4.0.0 */ export const toTransform = ( channel: Channel ): ( upstream: Pull.Pull, scope: Scope.Scope ) => Effect.Effect, never, Env> => (channel as any).transform /** * The default chunk size used by channels for batching operations. * * @example * ```ts * import { Channel } from "effect" * * console.log(Channel.DefaultChunkSize) // 4096 * ``` * * @category constants * @since 2.0.0 */ export const DefaultChunkSize: number = 4096 const asyncQueue = ( scope: Scope.Scope, f: (queue: Queue.Queue) => Effect.Effect, options?: { readonly bufferSize?: number | undefined readonly strategy?: "sliding" | "dropping" | "suspend" | undefined } ) => Queue.make({ capacity: options?.bufferSize, strategy: options?.strategy }).pipe( Effect.tap((queue) => Scope.addFinalizer(scope, Queue.shutdown(queue))), Effect.tap((queue) => Effect.forkIn(Scope.provide(f(queue), scope), scope)) ) /** * Creates a `Channel` that interacts with a callback function using a queue. * * @example * ```ts * import { Channel, Effect, Queue } from "effect" * * const channel = Channel.callback((queue) => * Effect.gen(function*() { * yield* Queue.offer(queue, 1) * yield* Queue.offer(queue, 2) * yield* Queue.offer(queue, 3) * }) * ) * ``` * * @category constructors * @since 2.0.0 */ export const callback = ( f: (queue: Queue.Queue) => Effect.Effect, options?: { readonly bufferSize?: number | undefined readonly strategy?: "sliding" | "dropping" | "suspend" | undefined } ): Channel> => fromTransform((_, scope) => Effect.map(asyncQueue(scope, f, options), Queue.take)) /** * Creates a `Channel` that interacts with a callback function using a queue, emitting arrays. * * @example * ```ts * import { Channel, Effect, Queue } from "effect" * * const channel = Channel.callbackArray(Effect.fn(function*(queue) { * yield* Queue.offer(queue, 1) * yield* Queue.offer(queue, 2) * })) * // Emits arrays of numbers instead of individual numbers * ``` * * @category constructors * @since 4.0.0 */ export const callbackArray = ( f: (queue: Queue.Queue) => Effect.Effect, options?: { readonly bufferSize?: number | undefined readonly strategy?: "sliding" | "dropping" | "suspend" | undefined } ): Channel, E, void, unknown, unknown, unknown, Exclude> => fromTransform((_, scope) => Effect.map(asyncQueue(scope, f, options), Queue.takeAll)) /** * Creates a `Channel` that lazily evaluates to another channel. * * @example * ```ts * import { Channel } from "effect" * * const channel = Channel.suspend(() => Channel.succeed(42)) * // The inner channel is not created until the suspended channel is run * ``` * * @category constructors * @since 2.0.0 */ export const suspend = ( evaluate: LazyArg> ): Channel => fromTransform((upstream, scope) => Effect.suspend(() => toTransform(evaluate())(upstream, scope))) /** * Creates a `Channel` with resource management using acquire-use-release pattern. * * @example * ```ts * import { Channel, Effect } from "effect" * * const channel = Channel.acquireUseRelease( * Effect.succeed("resource"), * (resource) => Channel.succeed(resource.toUpperCase()), * (resource, exit) => Effect.log(`Released: ${resource}`) * ) * ``` * * @category constructors * @since 2.0.0 */ export const acquireUseRelease = ( acquire: Effect.Effect, use: (a: A) => Channel, release: (a: A, exit: Exit.Exit) => Effect.Effect ): Channel => fromTransformBracket( Effect.fnUntraced(function*(upstream, scope, forkedScope) { let option = Option.none() yield* Scope.addFinalizerExit(forkedScope, (exit) => Option.isSome(option) ? release(option.value, exit as any) : Effect.void) const value = yield* Effect.uninterruptible(acquire) option = Option.some(value) return yield* toTransform(use(value))(upstream, scope) }) ) /** * Creates a `Channel` with resource management using acquire-release pattern. * * @example * ```ts * import { Channel, Effect } from "effect" * * const channel = Channel.acquireRelease( * Effect.succeed("resource"), * (resource, exit) => Effect.log(`Released: ${resource}`) * ) * ``` * * @category constructors * @since 2.0.0 */ export const acquireRelease: { /** * Creates a `Channel` with resource management using acquire-release pattern. * * @example * ```ts * import { Channel, Effect } from "effect" * * const channel = Channel.acquireRelease( * Effect.succeed("resource"), * (resource, exit) => Effect.log(`Released: ${resource}`) * ) * ``` * * @category constructors * @since 2.0.0 */ (release: (z: Z, e: Exit.Exit) => Effect.Effect): (self: Effect.Effect) => Channel /** * Creates a `Channel` with resource management using acquire-release pattern. * * @example * ```ts * import { Channel, Effect } from "effect" * * const channel = Channel.acquireRelease( * Effect.succeed("resource"), * (resource, exit) => Effect.log(`Released: ${resource}`) * ) * ``` * * @category constructors * @since 2.0.0 */ ( self: Effect.Effect, release: (z: Z, e: Exit.Exit) => Effect.Effect ): Channel } = dual(2, ( self: Effect.Effect, release: (z: Z, e: Exit.Exit) => Effect.Effect ): Channel => unwrap(Effect.map( Effect.acquireRelease(self, release), succeed ))) /** * Creates a `Channel` from an iterator. * * @example * ```ts * import { Channel } from "effect" * * const numbers = [1, 2, 3, 4, 5] * const channel = Channel.fromIterator(() => numbers[Symbol.iterator]()) * // Emits: 1, 2, 3, 4, 5 * ``` * * @category constructors * @since 2.0.0 */ export const fromIterator = (iterator: LazyArg>): Channel => fromPull( Effect.sync(() => { const iter = iterator() return Effect.suspend(() => { const state = iter.next() return state.done ? Cause.done(state.value) : Effect.succeed(state.value) }) }) ) /** * Creates a `Channel` that emits all elements from an array. * * @example * ```ts * import { Channel } from "effect" * * const channel = Channel.fromArray([1, 2, 3, 4, 5]) * // Emits: 1, 2, 3, 4, 5 * ``` * * @category constructors * @since 2.0.0 */ export const fromArray = (array: ReadonlyArray): Channel => fromPull(Effect.sync(() => { let index = 0 return Effect.suspend(() => index >= array.length ? Cause.done() : Effect.succeed(array[index++])) })) /** * Creates a `Channel` that emits all elements from a chunk. * * @example * ```ts * import { Channel, Chunk } from "effect" * * const chunk = Chunk.make(1, 2, 3) * const channel = Channel.fromChunk(chunk) * // Emits: 1, 2, 3 * ``` * * @category constructors * @since 2.0.0 */ export const fromChunk = (chunk: Chunk.Chunk): Channel => fromArray(Chunk.toReadonlyArray(chunk)) /** * Creates a `Channel` from an iterator that emits arrays of elements. * * @example * ```ts * import { Channel } from "effect" * * // Create a channel from a simple iterator * const numberIterator = (): Iterator => { * let count = 0 * return { * next: () => { * if (count < 3) { * return { value: count++, done: false } * } * return { value: "finished", done: true } * } * } * } * * const channel = Channel.fromIteratorArray(() => numberIterator(), 2) * // This will emit arrays: [0, 1], [2], then complete with "finished" * ``` * * @example * ```ts * import { Channel } from "effect" * * // Create channel from a generator function * function* fibonacci(): Generator { * let a = 0, b = 1 * for (let i = 0; i < 5; i++) { * yield a * ;[a, b] = [b, a + b] * } * } * * const fibChannel = Channel.fromIteratorArray(() => fibonacci(), 3) * // Emits: [0, 1, 1], [2, 3], then completes * ``` * * @since 2.0.0 * @category constructors */ export const fromIteratorArray = ( iterator: LazyArg>, chunkSize = DefaultChunkSize ): Channel, never, L> => fromPull( Effect.sync(() => { const iter = iterator() let done = Option.none() return Effect.suspend(() => { if (done._tag === "Some") return Cause.done(done.value) const buffer: Array = [] while (buffer.length < chunkSize) { const state = iter.next() if (state.done) { if (buffer.length === 0) { return Cause.done(state.value) } done = Option.some(state.value) break } buffer.push(state.value) } return Effect.succeed(buffer as any as Arr.NonEmptyReadonlyArray) }) }) ) /** * Creates a `Channel` that emits all elements from an iterable. * * @example * ```ts * import { Channel } from "effect" * * const set = new Set([1, 2, 3]) * const channel = Channel.fromIterable(set) * // Emits: 1, 2, 3 * ``` * * @category constructors * @since 2.0.0 */ export const fromIterable = (iterable: Iterable): Channel => fromIterator(() => iterable[Symbol.iterator]()) /** * Creates a `Channel` that emits arrays of elements from an iterable. * * @example * ```ts * import { Channel } from "effect" * * const numbers = [1, 2, 3, 4, 5] * const channel = Channel.fromIterableArray(numbers) * // Emits arrays like: [1, 2, 3, 4], [5] (based on chunk size) * ``` * * @category constructors * @since 2.0.0 */ export const fromIterableArray = ( iterable: Iterable, chunkSize = DefaultChunkSize ): Channel, never, L> => fromIteratorArray(() => iterable[Symbol.iterator](), chunkSize) /** * Creates a `Channel` that emits a single value and then ends. * * @example * ```ts * import { Channel } from "effect" * * const channel = Channel.succeed(42) * // Emits: 42 * ``` * * @category constructors * @since 2.0.0 */ export const succeed = (value: A): Channel => fromEffect(Effect.succeed(value)) /** * Creates a `Channel` that immediately ends with the specified value. * * @example * ```ts * import { Channel } from "effect" * * const channel = Channel.end("done") * // Ends immediately with "done", emits nothing * ``` * * @category constructors * @since 4.0.0 */ export const end = (value: A): Channel => fromPull(Effect.succeed(Cause.done(value))) /** * Creates a `Channel` that immediately ends with the lazily evaluated value. * * @category constructors * @since 4.0.0 */ export const endSync = (evaluate: LazyArg): Channel => fromPull(Effect.sync(() => Cause.done(evaluate()))) /** * Creates a `Channel` that emits a single value computed by a lazy evaluation. * * @example * ```ts * import { Channel } from "effect" * * const channel = Channel.sync(() => Math.random()) * // Emits a random number computed when the channel runs * ``` * * @category constructors * @since 2.0.0 */ export const sync = (evaluate: LazyArg): Channel => fromEffect(Effect.sync(evaluate)) /** * Represents an Channel that emits no elements * * @example * ```ts * import { Channel } from "effect" * * // Create an empty channel * const emptyChannel = Channel.empty * * // Use empty channel in composition * const combined = Channel.concatWith(emptyChannel, () => Channel.succeed(42)) * // Will immediately provide the second channel's output * * // Empty channel can be used as a no-op in conditional logic * const conditionalChannel = (shouldEmit: boolean) => * shouldEmit ? Channel.succeed("data") : Channel.empty * ``` * * @since 2.0.0 * @category constructors */ export const empty: Channel = fromPull(Effect.succeed(Cause.done())) /** * Represents an Channel that never completes * * @example * ```ts * import { Channel } from "effect" * * // Create a channel that never completes * const neverChannel = Channel.never * * // Use in conditional logic * const withFallback = Channel.concatWith( * neverChannel, * () => Channel.succeed("fallback") * ) * * // Never channel is useful for testing or as a placeholder * const conditionalChannel = (shouldComplete: boolean) => * shouldComplete ? Channel.succeed("done") : Channel.never * ``` * * @since 2.0.0 * @category constructors */ export const never: Channel = fromPull(Effect.succeed(Effect.never)) /** * Constructs a channel that fails immediately with the specified error. * * @example * ```ts * import { Channel } from "effect" * * // Create a channel that fails with a string error * const failedChannel = Channel.fail("Something went wrong") * * // Create a channel that fails with a custom error * class CustomError extends Error { * constructor(message: string) { * super(message) * this.name = "CustomError" * } * } * const customErrorChannel = Channel.fail(new CustomError("Custom error")) * * // Use in error handling by piping to another channel * const channelWithFallback = Channel.concatWith( * failedChannel, * () => Channel.succeed("fallback value") * ) * ``` * * @since 2.0.0 * @category constructors */ export const fail = (error: E): Channel => fromPull(Effect.succeed(Effect.fail(error))) /** * Constructs a channel that fails immediately with the specified lazily * evaluated error. * * @example * ```ts * import { Channel } from "effect" * * // Create a channel that fails with a lazily computed error * const failedChannel = Channel.failSync(() => { * console.log("Computing error...") * return new Error("Computed at runtime") * }) * * // The error computation is deferred until the channel runs * const conditionalError = Channel.failSync(() => * Math.random() > 0.5 ? "Error A" : "Error B" * ) * * // Use with expensive error construction * const expensiveError = Channel.failSync(() => { * const timestamp = Date.now() * return new Error(`Failed at: ${timestamp}`) * }) * ``` * * @since 2.0.0 * @category constructors */ export const failSync = (evaluate: LazyArg): Channel => fromPull(Effect.failSync(evaluate)) /** * Constructs a channel that fails immediately with the specified `Cause`. * * @example * ```ts * import { Cause, Channel } from "effect" * * // Create a channel that fails with a simple cause * const simpleCause = Cause.fail("Simple error") * const failedChannel = Channel.failCause(simpleCause) * * // Create a channel with a die cause * const dieCause = Cause.die(new Error("System error")) * const dieFailure = Channel.failCause(dieCause) * * // Create a channel with a simple fail cause * const failCause = Cause.fail("Simple error") * const simpleFail = Channel.failCause(failCause) * ``` * * @since 2.0.0 * @category constructors */ export const failCause = (cause: Cause.Cause): Channel => fromPull(Effect.failCause(cause)) /** * Constructs a channel that fails immediately with the specified lazily * evaluated `Cause`. * * @example * ```ts * import { Cause, Channel } from "effect" * * // Create a channel that fails with a lazily computed cause * const failedChannel = Channel.failCauseSync(() => { * const errorType = Math.random() > 0.5 ? "A" : "B" * return Cause.fail(`Runtime error ${errorType}`) * }) * * // Create a channel with die cause computation * const dieCauseChannel = Channel.failCauseSync(() => { * const timestamp = Date.now() * return Cause.die(`Error at ${timestamp}`) * }) * ``` * * @since 2.0.0 * @category constructors */ export const failCauseSync = ( evaluate: LazyArg> ): Channel => fromPull(Effect.failCauseSync(evaluate)) /** * Constructs a channel that fails immediately with the specified defect. * * @example * ```ts * import { Channel } from "effect" * * // Create a channel that dies with a string defect * const diedChannel = Channel.die("Unrecoverable error") * * // Create a channel that dies with an Error object * const errorDefect = Channel.die(new Error("System failure")) * * // Die with any value as a defect * const objectDefect = Channel.die({ * code: "SYSTEM_FAILURE", * details: "Critical system component failed" * }) * ``` * * @since 2.0.0 * @category constructors */ export const die = (defect: unknown): Channel => failCause(Cause.die(defect)) /** * Use an effect to write a single value to the channel. * * @example * ```ts * import { Channel, Data, Effect } from "effect" * * class DatabaseError extends Data.TaggedError("DatabaseError")<{ * readonly message: string * }> {} * * // Create a channel from a successful effect * const successChannel = Channel.fromEffect( * Effect.succeed("Hello from effect!") * ) * * // Create a channel from an effect that might fail * const fetchUserChannel = Channel.fromEffect( * Effect.tryPromise({ * try: () => fetch("/api/user").then((res) => res.json()), * catch: (error) => new DatabaseError({ message: String(error) }) * }) * ) * * // Channel from effect with async computation * const asyncChannel = Channel.fromEffect( * Effect.gen(function*() { * yield* Effect.sleep("100 millis") * return "Async result" * }) * ) * ``` * * @since 2.0.0 * @category constructors */ export const fromEffect = ( effect: Effect.Effect ): Channel, void, unknown, unknown, unknown, R> => fromPull( Effect.sync(() => { let done = false return Effect.suspend((): Pull.Pull => { if (done) return Cause.done() done = true return effect }) }) ) /** * @since 4.0.0 * @category constructors */ export const fromEffectDone = ( effect: Effect.Effect ): Channel, A, unknown, unknown, unknown, R> => fromPull(Effect.succeed(Effect.flatMap(effect, Cause.done))) /** * Use an effect and discard its result. * * @since 4.0.0 * @category constructors */ export const fromEffectDrain = ( effect: Effect.Effect ): Channel => fromPull(Effect.flatMap(effect, () => Cause.done())) as any /** * @since 4.0.0 * @category constructors */ export const fromEffectTake = ( effect: Effect.Effect, E2, R> ): Channel, E | E2, Done, unknown, unknown, unknown, R> => fromPull(Effect.succeed(Effect.flatMap(effect, Take.toPull))) /** * Create a channel from a queue * * @example * ```ts * import { Channel, Data, Effect, Queue } from "effect" * * class QueueError extends Data.TaggedError("QueueError")<{ * readonly reason: string * }> {} * * const program = Effect.gen(function*() { * // Create a bounded queue * const queue = yield* Queue.bounded(10) * * // Add some items to the queue * yield* Queue.offer(queue, "item1") * yield* Queue.offer(queue, "item2") * yield* Queue.offer(queue, "item3") * * // Create a channel from the queue * const channel = Channel.fromQueue(queue) * * // The channel will read items from the queue one by one * return channel * }) * * // Sliding queue example * const slidingProgram = Effect.gen(function*() { * const slidingQueue = yield* Queue.sliding(5) * yield* Queue.offerAll(slidingQueue, [1, 2, 3, 4, 5, 6]) * return Channel.fromQueue(slidingQueue) * }) * ``` * * @since 4.0.0 * @category constructors */ export const fromQueue = ( queue: Queue.Dequeue ): Channel> => fromPull(Effect.succeed(Queue.take(queue))) /** * Create a channel from a queue that emits arrays of elements * * @example * ```ts * import { Channel, Data, Effect, Queue } from "effect" * * class ProcessingError extends Data.TaggedError("ProcessingError")<{ * readonly stage: string * }> {} * * const program = Effect.gen(function*() { * // Create a queue for batch processing * const queue = yield* Queue.bounded(100) * * // Fill queue with data * yield* Queue.offerAll(queue, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) * * // Create a channel that reads arrays from the queue * const arrayChannel = Channel.fromQueueArray(queue) * * // This will emit non-empty arrays of elements instead of individual items * // Useful for batch processing scenarios * return arrayChannel * }) * * // High-throughput processing example * const batchProcessor = Effect.gen(function*() { * const dataQueue = yield* Queue.dropping(1000) * const batchChannel = Channel.fromQueueArray(dataQueue) * * // Process data in batches for better performance * return Channel.map( * batchChannel, * (batch) => batch.map((item) => item.toUpperCase()) * ) * }) * ``` * * @since 4.0.0 * @category constructors */ export const fromQueueArray = ( queue: Queue.Dequeue ): Channel, Exclude> => fromPull(Effect.succeed(Queue.takeAll(queue))) /** * @since 2.0.0 * @category Constructors */ export const identity = (): Channel => fromTransform((upstream, _scope) => Effect.succeed(upstream)) /** * Create a channel from a PubSub subscription * * @example * ```ts * import { Channel, Data, Effect, PubSub } from "effect" * * class SubscriptionError extends Data.TaggedError("SubscriptionError")<{ * readonly reason: string * }> {} * * const program = Effect.gen(function*() { * // Create a PubSub * const pubsub = yield* PubSub.bounded(32) * * // Create a subscription * const subscription = yield* PubSub.subscribe(pubsub) * * // Publish some messages * yield* PubSub.publish(pubsub, "Hello") * yield* PubSub.publish(pubsub, "World") * yield* PubSub.publish(pubsub, "from") * yield* PubSub.publish(pubsub, "PubSub") * * // Create a channel from the subscription * const channel = Channel.fromSubscription(subscription) * * // The channel will receive all published messages * return channel * }) * * // Real-time notifications example * const notificationChannel = Effect.gen(function*() { * const eventBus = yield* PubSub.unbounded<{ type: string; payload: any }>() * const userSubscription = yield* PubSub.subscribe(eventBus) * * return Channel.fromSubscription(userSubscription) * }) * ``` * * @since 4.0.0 * @category constructors */ export const fromSubscription = ( subscription: PubSub.Subscription ): Channel => fromPull(Effect.succeed(Effect.onInterrupt(PubSub.take(subscription), () => Cause.done()))) /** * Create a channel from a PubSub subscription that outputs arrays of values. * * This constructor creates a channel that reads from a PubSub subscription and outputs * arrays of values in chunks. It's useful when you want to process multiple values at once * for better performance. * * @param subscription - The PubSub subscription to read from * @param chunkSize - The maximum number of elements to read in each chunk (default: 4096) * * @example * ```ts * import { Channel, Data, Effect, PubSub } from "effect" * * class StreamError extends Data.TaggedError("StreamError")<{ * readonly message: string * }> {} * * const program = Effect.gen(function*() { * const pubsub = yield* PubSub.bounded(16) * const subscription = yield* PubSub.subscribe(pubsub) * * // Create a channel that reads arrays of values * const channel = Channel.fromSubscriptionArray(subscription) * * // Publish some values * yield* PubSub.publish(pubsub, 1) * yield* PubSub.publish(pubsub, 2) * yield* PubSub.publish(pubsub, 3) * yield* PubSub.publish(pubsub, 4) * * // The channel will output arrays like [1, 2, 3] and [4] * return channel * }) * ``` * * @example * ```ts * import { Channel, Data, Effect, PubSub } from "effect" * * class BatchProcessingError extends Data.TaggedError("BatchProcessingError")<{ * readonly reason: string * }> {} * * const batchProcessor = Effect.gen(function*() { * const pubsub = yield* PubSub.bounded(32) * const subscription = yield* PubSub.subscribe(pubsub) * * // Create a channel that processes items in batches * const batchChannel = Channel.fromSubscriptionArray(subscription) * * // Transform to process each batch * const processedChannel = Channel.map(batchChannel, (batch) => { * console.log(`Processing batch of ${batch.length} items:`, batch) * return batch.map((item) => item.toUpperCase()) * }) * * return processedChannel * }) * ``` * * @example * ```ts * import { Channel, Data, Effect, PubSub } from "effect" * * class MetricsError extends Data.TaggedError("MetricsError")<{ * readonly cause: string * }> {} * * const metricsAggregator = Effect.gen(function*() { * const metricsPubSub = yield* PubSub.bounded< * { timestamp: number; value: number } * >(100) * const subscription = yield* PubSub.subscribe(metricsPubSub) * * // Create a channel that collects metrics in chunks * const metricsChannel = Channel.fromSubscriptionArray(subscription) * * // Transform to calculate aggregate statistics * const aggregatedChannel = Channel.map(metricsChannel, (metrics) => { * const values = metrics.map((m) => m.value) * const sum = values.reduce((a, b) => a + b, 0) * const avg = sum / values.length * const min = Math.min(...values) * const max = Math.max(...values) * * return { * count: values.length, * sum, * average: avg, * min, * max, * timestamp: Date.now() * } * }) * * return aggregatedChannel * }) * ``` * * @since 4.0.0 * @category constructors */ export const fromSubscriptionArray = ( subscription: PubSub.Subscription ): Channel> => fromPull(Effect.succeed(Effect.onInterrupt(PubSub.takeAll(subscription), () => Cause.done()))) /** * Create a channel from a PubSub that outputs individual values. * * This constructor creates a channel that reads from a PubSub by automatically * subscribing to it. The channel outputs individual values as they are published * to the PubSub, making it ideal for real-time streaming scenarios. * * @param pubsub - The PubSub to read from * * @example * ```ts * import { Channel, Data, Effect, PubSub } from "effect" * * class StreamError extends Data.TaggedError("StreamError")<{ * readonly message: string * }> {} * * const program = Effect.gen(function*() { * const pubsub = yield* PubSub.bounded(16) * * // Create a channel that reads individual values * const channel = Channel.fromPubSub(pubsub) * * // Publish some values * yield* PubSub.publish(pubsub, 1) * yield* PubSub.publish(pubsub, 2) * yield* PubSub.publish(pubsub, 3) * * // The channel will output: 1, 2, 3 (individual values) * return channel * }) * ``` * * @example * ```ts * import { Channel, Data, Effect, PubSub } from "effect" * * class NotificationError extends Data.TaggedError("NotificationError")<{ * readonly reason: string * }> {} * * const notificationService = Effect.gen(function*() { * const notificationPubSub = yield* PubSub.bounded(50) * * // Create a channel for real-time notifications * const notificationChannel = Channel.fromPubSub(notificationPubSub) * * // Transform notifications to add timestamps * const timestampedChannel = Channel.map(notificationChannel, (message) => ({ * message, * timestamp: new Date().toISOString(), * id: Math.random().toString(36).substr(2, 9) * })) * * return timestampedChannel * }) * ``` * * @example * ```ts * import { Channel, Data, Effect, PubSub } from "effect" * * class EventProcessingError extends Data.TaggedError("EventProcessingError")<{ * readonly eventType: string * readonly cause: string * }> {} * * interface DomainEvent { * readonly type: string * readonly payload: unknown * readonly timestamp: number * } * * const eventProcessor = Effect.gen(function*() { * const eventPubSub = yield* PubSub.bounded(100) * * // Create a channel for processing domain events * const eventChannel = Channel.fromPubSub(eventPubSub) * * // Filter and transform events * const processedChannel = Channel.map(eventChannel, (event) => { * if (event.type === "user.created") { * return { * ...event, * processed: true, * processedAt: Date.now() * } * } * return event * }) * * return processedChannel * }) * ``` * * @since 4.0.0 * @category constructors */ export const fromPubSub = ( pubsub: PubSub.PubSub ): Channel => unwrap(Effect.map(PubSub.subscribe(pubsub), fromSubscription)) /** * Create a channel from a PubSub that outputs arrays of values. * * This constructor creates a channel that reads from a PubSub by automatically * subscribing to it and collecting values into arrays. The channel outputs * arrays of values in chunks, making it ideal for batch processing scenarios. * * @param pubsub - The PubSub to read from * @param chunkSize - The maximum number of elements to collect in each array (default: 4096) * * @example * ```ts * import { Channel, Data, Effect, PubSub } from "effect" * * class BatchError extends Data.TaggedError("BatchError")<{ * readonly message: string * }> {} * * const program = Effect.gen(function*() { * const pubsub = yield* PubSub.bounded(16) * * // Create a channel that reads arrays of values * const channel = Channel.fromPubSubArray(pubsub) * * // Publish some values * yield* PubSub.publish(pubsub, 1) * yield* PubSub.publish(pubsub, 2) * yield* PubSub.publish(pubsub, 3) * yield* PubSub.publish(pubsub, 4) * * // The channel will output arrays like [1, 2, 3] and [4] * return channel * }) * ``` * * @example * ```ts * import { Channel, Data, Effect, PubSub } from "effect" * * class OrderProcessingError extends Data.TaggedError("OrderProcessingError")<{ * readonly orderId: string * readonly reason: string * }> {} * * interface Order { * readonly id: string * readonly customerId: string * readonly items: ReadonlyArray * readonly total: number * } * * const orderBatchProcessor = Effect.gen(function*() { * const orderPubSub = yield* PubSub.bounded(100) * * // Create a channel that processes orders in batches * const orderChannel = Channel.fromPubSubArray(orderPubSub) * * // Transform to process each batch of orders * const processedChannel = Channel.map(orderChannel, (orderBatch) => { * const totalRevenue = orderBatch.reduce((sum, order) => sum + order.total, 0) * const customerCount = new Set(orderBatch.map((order) => * order.customerId * )).size * * return { * batchSize: orderBatch.length, * totalRevenue, * uniqueCustomers: customerCount, * processedAt: Date.now(), * orders: orderBatch * } * }) * * return processedChannel * }) * ``` * * @example * ```ts * import { Channel, Data, Effect, PubSub } from "effect" * * class LogProcessingError extends Data.TaggedError("LogProcessingError")<{ * readonly batchId: string * readonly cause: string * }> {} * * interface LogEntry { * readonly timestamp: number * readonly level: "info" | "warn" | "error" * readonly message: string * readonly source: string * } * * const logAggregator = Effect.gen(function*() { * const logPubSub = yield* PubSub.bounded(500) * * // Create a channel that collects logs in batches * const logChannel = Channel.fromPubSubArray(logPubSub) * * // Transform to analyze log batches * const analysisChannel = Channel.map(logChannel, (logBatch) => { * const errorCount = logBatch.filter((log) => log.level === "error").length * const warnCount = logBatch.filter((log) => log.level === "warn").length * const infoCount = logBatch.filter((log) => log.level === "info").length * * const timeRange = { * start: Math.min(...logBatch.map((log) => log.timestamp)), * end: Math.max(...logBatch.map((log) => log.timestamp)) * } * * return { * batchId: Math.random().toString(36).substr(2, 9), * totalEntries: logBatch.length, * errorCount, * warnCount, * infoCount, * timeRange, * sources: [...new Set(logBatch.map((log) => log.source))] * } * }) * * return analysisChannel * }) * ``` * * @since 4.0.0 * @category constructors */ export const fromPubSubArray = (pubsub: PubSub.PubSub): Channel> => unwrap(Effect.map(PubSub.subscribe(pubsub), fromSubscriptionArray)) /** * @since 4.0.0 * @category constructors */ export const fromPubSubTake = ( pubsub: PubSub.PubSub> ): Channel, E, Done> => unwrap(Effect.map(PubSub.subscribe(pubsub), (sub) => fromEffectTake(PubSub.take(sub)))) /** * Creates a Channel from a Schedule. * * @since 4.0.0 * @category constructors */ export const fromSchedule = ( schedule: Schedule.Schedule ): Channel => fromPull(Effect.map(Schedule.toStepWithSleep(schedule), (step) => step(void 0))) /** * Creates a Channel from a AsyncIterable. * * @since 4.0.0 * @category constructors */ export const fromAsyncIterable = ( iterable: AsyncIterable, onError: (error: unknown) => E ): Channel => fromTransform(Effect.fnUntraced(function*(_, scope) { const iter = iterable[Symbol.asyncIterator]() if (iter.return) { yield* Scope.addFinalizer(scope, Effect.promise(() => iter.return!())) } return Effect.flatMap( Effect.tryPromise({ try: () => iter.next(), catch: onError }), (result) => result.done ? Cause.done(result.value) : Effect.succeed(result.value) ) })) /** * Creates a Channel from a AsyncIterable that emits arrays of elements. * * @since 4.0.0 * @category constructors */ export const fromAsyncIterableArray = ( iterable: AsyncIterable, onError: (error: unknown) => E ): Channel, E, D> => map(fromAsyncIterable(iterable, onError), Arr.of) /** * Maps the output of this channel using the specified function. * * @example * ```ts * import { Channel, Data } from "effect" * * class TransformError extends Data.TaggedError("TransformError")<{ * readonly reason: string * }> {} * * // Basic mapping of channel values * const numbersChannel = Channel.fromIterable([1, 2, 3, 4, 5]) * const doubledChannel = Channel.map(numbersChannel, (n) => n * 2) * // Outputs: 2, 4, 6, 8, 10 * * // Transform string data * const wordsChannel = Channel.fromIterable(["hello", "world", "effect"]) * const upperCaseChannel = Channel.map(wordsChannel, (word) => word.toUpperCase()) * // Outputs: "HELLO", "WORLD", "EFFECT" * * // Complex object transformation * type User = { id: number; name: string } * type UserDisplay = { displayName: string; isActive: boolean } * * const usersChannel = Channel.fromIterable([ * { id: 1, name: "Alice" }, * { id: 2, name: "Bob" } * ]) * const displayChannel = Channel.map(usersChannel, (user): UserDisplay => ({ * displayName: `User: ${user.name}`, * isActive: true * })) * ``` * * @since 2.0.0 * @category Sequencing */ export const map: { /** * Maps the output of this channel using the specified function. * * @example * ```ts * import { Channel, Data } from "effect" * * class TransformError extends Data.TaggedError("TransformError")<{ * readonly reason: string * }> {} * * // Basic mapping of channel values * const numbersChannel = Channel.fromIterable([1, 2, 3, 4, 5]) * const doubledChannel = Channel.map(numbersChannel, (n) => n * 2) * // Outputs: 2, 4, 6, 8, 10 * * // Transform string data * const wordsChannel = Channel.fromIterable(["hello", "world", "effect"]) * const upperCaseChannel = Channel.map(wordsChannel, (word) => word.toUpperCase()) * // Outputs: "HELLO", "WORLD", "EFFECT" * * // Complex object transformation * type User = { id: number; name: string } * type UserDisplay = { displayName: string; isActive: boolean } * * const usersChannel = Channel.fromIterable([ * { id: 1, name: "Alice" }, * { id: 2, name: "Bob" } * ]) * const displayChannel = Channel.map(usersChannel, (user): UserDisplay => ({ * displayName: `User: ${user.name}`, * isActive: true * })) * ``` * * @since 2.0.0 * @category Sequencing */ (f: (o: OutElem, i: number) => OutElem2): ( self: Channel ) => Channel /** * Maps the output of this channel using the specified function. * * @example * ```ts * import { Channel, Data } from "effect" * * class TransformError extends Data.TaggedError("TransformError")<{ * readonly reason: string * }> {} * * // Basic mapping of channel values * const numbersChannel = Channel.fromIterable([1, 2, 3, 4, 5]) * const doubledChannel = Channel.map(numbersChannel, (n) => n * 2) * // Outputs: 2, 4, 6, 8, 10 * * // Transform string data * const wordsChannel = Channel.fromIterable(["hello", "world", "effect"]) * const upperCaseChannel = Channel.map(wordsChannel, (word) => word.toUpperCase()) * // Outputs: "HELLO", "WORLD", "EFFECT" * * // Complex object transformation * type User = { id: number; name: string } * type UserDisplay = { displayName: string; isActive: boolean } * * const usersChannel = Channel.fromIterable([ * { id: 1, name: "Alice" }, * { id: 2, name: "Bob" } * ]) * const displayChannel = Channel.map(usersChannel, (user): UserDisplay => ({ * displayName: `User: ${user.name}`, * isActive: true * })) * ``` * * @since 2.0.0 * @category Sequencing */ ( self: Channel, f: (o: OutElem, i: number) => OutElem2 ): Channel } = dual( 2, ( self: Channel, f: (o: OutElem, i: number) => OutElem2 ): Channel => transformPull(self, (pull) => Effect.sync(() => { let i = 0 return Effect.map(pull, (o) => f(o, i++)) })) ) /** * Maps the done value of this channel using the specified function. * * @since 2.0.0 * @category Sequencing */ export const mapDone: { /** * Maps the done value of this channel using the specified function. * * @since 2.0.0 * @category Sequencing */ (f: (o: OutDone) => OutDone2): ( self: Channel ) => Channel /** * Maps the done value of this channel using the specified function. * * @since 2.0.0 * @category Sequencing */ ( self: Channel, f: (o: OutDone) => OutDone2 ): Channel } = dual( 2, ( self: Channel, f: (o: OutDone) => OutDone2 ): Channel => mapDoneEffect(self, (o) => Effect.succeed(f(o))) ) /** * Maps the done value of this channel using the specified effectful function. * * @since 2.0.0 * @category Sequencing */ export const mapDoneEffect: { /** * Maps the done value of this channel using the specified effectful function. * * @since 2.0.0 * @category Sequencing */ (f: (o: OutDone) => Effect.Effect): ( self: Channel ) => Channel /** * Maps the done value of this channel using the specified effectful function. * * @since 2.0.0 * @category Sequencing */ ( self: Channel, f: (o: OutDone) => Effect.Effect ): Channel } = dual( 2, ( self: Channel, f: (o: OutDone) => Effect.Effect ): Channel => transformPull(self, (pull) => Effect.succeed(Pull.catchDone( pull, (done) => Effect.flatMap(f(done as OutDone), Cause.done) ))) ) const concurrencyIsSequential = ( concurrency: number | "unbounded" | undefined ) => concurrency === undefined || (concurrency !== "unbounded" && concurrency <= 1) /** * Returns a new channel, which sequentially combines this channel, together * with the provided factory function, which creates a second channel based on * the output values of this channel. The result is a channel that will first * perform the functions of this channel, before performing the functions of * the created channel (including yielding its terminal value). * * @example * ```ts * import { Channel, Data, Effect } from "effect" * * class NetworkError extends Data.TaggedError("NetworkError")<{ * readonly url: string * }> {} * * // Transform values using effectful operations * const urlsChannel = Channel.fromIterable([ * "/api/users/1", * "/api/users/2", * "/api/users/3" * ]) * * const fetchDataChannel = Channel.mapEffect( * urlsChannel, * (url) => * Effect.tryPromise({ * try: () => fetch(url).then((res) => res.json()), * catch: () => new NetworkError({ url }) * }) * ) * * // Concurrent processing with options * const numbersChannel = Channel.fromIterable([1, 2, 3, 4, 5]) * const processedChannel = Channel.mapEffect( * numbersChannel, * (n) => * Effect.gen(function*() { * yield* Effect.sleep("100 millis") // Simulate async work * return n * n * }), * { concurrency: 3, unordered: true } * ) * ``` * * @since 2.0.0 * @category sequencing */ export const mapEffect: { /** * Returns a new channel, which sequentially combines this channel, together * with the provided factory function, which creates a second channel based on * the output values of this channel. The result is a channel that will first * perform the functions of this channel, before performing the functions of * the created channel (including yielding its terminal value). * * @example * ```ts * import { Channel, Data, Effect } from "effect" * * class NetworkError extends Data.TaggedError("NetworkError")<{ * readonly url: string * }> {} * * // Transform values using effectful operations * const urlsChannel = Channel.fromIterable([ * "/api/users/1", * "/api/users/2", * "/api/users/3" * ]) * * const fetchDataChannel = Channel.mapEffect( * urlsChannel, * (url) => * Effect.tryPromise({ * try: () => fetch(url).then((res) => res.json()), * catch: () => new NetworkError({ url }) * }) * ) * * // Concurrent processing with options * const numbersChannel = Channel.fromIterable([1, 2, 3, 4, 5]) * const processedChannel = Channel.mapEffect( * numbersChannel, * (n) => * Effect.gen(function*() { * yield* Effect.sleep("100 millis") // Simulate async work * return n * n * }), * { concurrency: 3, unordered: true } * ) * ``` * * @since 2.0.0 * @category sequencing */ ( f: (d: OutElem, i: number) => Effect.Effect, options?: { readonly concurrency?: number | "unbounded" | undefined readonly unordered?: boolean | undefined } ): ( self: Channel ) => Channel /** * Returns a new channel, which sequentially combines this channel, together * with the provided factory function, which creates a second channel based on * the output values of this channel. The result is a channel that will first * perform the functions of this channel, before performing the functions of * the created channel (including yielding its terminal value). * * @example * ```ts * import { Channel, Data, Effect } from "effect" * * class NetworkError extends Data.TaggedError("NetworkError")<{ * readonly url: string * }> {} * * // Transform values using effectful operations * const urlsChannel = Channel.fromIterable([ * "/api/users/1", * "/api/users/2", * "/api/users/3" * ]) * * const fetchDataChannel = Channel.mapEffect( * urlsChannel, * (url) => * Effect.tryPromise({ * try: () => fetch(url).then((res) => res.json()), * catch: () => new NetworkError({ url }) * }) * ) * * // Concurrent processing with options * const numbersChannel = Channel.fromIterable([1, 2, 3, 4, 5]) * const processedChannel = Channel.mapEffect( * numbersChannel, * (n) => * Effect.gen(function*() { * yield* Effect.sleep("100 millis") // Simulate async work * return n * n * }), * { concurrency: 3, unordered: true } * ) * ``` * * @since 2.0.0 * @category sequencing */ ( self: Channel, f: (d: OutElem, i: number) => Effect.Effect, options?: { readonly concurrency?: number | "unbounded" | undefined readonly unordered?: boolean | undefined } ): Channel } = dual( (args) => isChannel(args[0]), ( self: Channel, f: (d: OutElem, i: number) => Effect.Effect, options?: { readonly concurrency?: number | "unbounded" | undefined readonly unordered?: boolean | undefined } ): Channel => concurrencyIsSequential(options?.concurrency) ? mapEffectSequential(self, f) : mapEffectConcurrent(self, f, options as any) ) const mapEffectSequential = < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem2, EX, RX >( self: Channel, f: (o: OutElem, i: number) => Effect.Effect ): Channel => fromTransform((upstream, scope) => { let i = 0 return Effect.map(toTransform(self)(upstream, scope), Effect.flatMap((o) => f(o, i++))) }) const mapEffectConcurrent = < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem2, EX, RX >( self: Channel, f: (o: OutElem, i: number) => Effect.Effect, options: { readonly concurrency: number | "unbounded" readonly unordered?: boolean | undefined } ): Channel => fromTransformBracket( Effect.fnUntraced(function*(upstream, scope, forkedScope) { let i = 0 const pull = yield* toTransform(self)(upstream, scope) const concurrencyN = options.concurrency === "unbounded" ? Number.MAX_SAFE_INTEGER : options.concurrency const queue = yield* Queue.bounded>(0) yield* Scope.addFinalizer(forkedScope, Queue.shutdown(queue)) const runFork = Effect.runForkWith(yield* Effect.context()) const trackFiber = Fiber.runIn(forkedScope) if (options.unordered) { const semaphore = Semaphore.makeUnsafe(concurrencyN) const release = constant(semaphore.release(1)) const handle = Effect.matchCauseEffect({ onFailure: (cause: Cause.Cause) => Effect.flatMap(Queue.failCause(queue, cause), release), onSuccess: (value: OutElem2) => Effect.flatMap(Queue.offer(queue, value), release) }) yield* semaphore.take(1).pipe( Effect.flatMap(() => pull), Effect.flatMap((value) => { trackFiber(runFork(handle(f(value, i++)))) return Effect.void }), Effect.forever({ disableYield: true }), Effect.catchCause((cause) => semaphore.withPermits(concurrencyN - 1)( Queue.failCause(queue, cause) ) ), Effect.forkIn(forkedScope) ) } else { // capacity is n - 2 because // - 1 for the offer *after* starting a fiber // - 1 for the current processing fiber const effects = yield* Queue.bounded< Effect.Effect>, OutErr | EX | Cause.Done >(concurrencyN - 2) yield* Scope.addFinalizer(forkedScope, Queue.shutdown(queue)) yield* Queue.take(effects).pipe( Effect.flatten, Effect.flatMap((value) => Queue.offer(queue, value)), Effect.forever({ disableYield: true }), Effect.catchCause((cause) => Queue.failCause(queue, cause)), Effect.forkIn(forkedScope) ) let errorCause: Cause.Cause | undefined const onExit = (exit: Exit.Exit) => { if (exit._tag === "Success") return errorCause = exit.cause Queue.failCauseUnsafe(queue, exit.cause) } yield* pull.pipe( Effect.flatMap((value) => { if (errorCause) return Effect.failCause(errorCause) const fiber = runFork(f(value, i++)) trackFiber(fiber) fiber.addObserver(onExit) return Queue.offer(effects, Fiber.join(fiber)) }), Effect.forever({ disableYield: true }), Effect.catchCause((cause) => Queue.offer(effects, Exit.failCause(cause)).pipe( Effect.andThen(Queue.failCause(effects, cause)) ) ), Effect.forkIn(forkedScope) ) } return Queue.take(queue) }) ) /** * Returns a new channel which is the same as this one but applies the given * function to the input channel’s input elements. * * @since 2.0.0 * @category sequencing */ export const mapInput: { /** * Returns a new channel which is the same as this one but applies the given * function to the input channel’s input elements. * * @since 2.0.0 * @category sequencing */ (f: (i: InElem2) => Effect.Effect): ( self: Channel ) => Channel /** * Returns a new channel which is the same as this one but applies the given * function to the input channel’s input elements. * * @since 2.0.0 * @category sequencing */ ( self: Channel, f: (i: InElem2) => Effect.Effect ): Channel } = dual(2, ( self: Channel, f: (i: InElem2) => Effect.Effect ): Channel => fromTransform((upstream, scope) => toTransform(self)( Effect.flatMap(upstream, (el) => f(el)) as Pull.Pull, scope ) )) /** * Returns a new channel which is the same as this one but applies the given * function to the input errors. * * @since 2.0.0 * @category sequencing */ export const mapInputError: { /** * Returns a new channel which is the same as this one but applies the given * function to the input errors. * * @since 2.0.0 * @category sequencing */ (f: (i: InErr2) => Effect.Effect): ( self: Channel ) => Channel /** * Returns a new channel which is the same as this one but applies the given * function to the input errors. * * @since 2.0.0 * @category sequencing */ ( self: Channel, f: (i: InErr2) => Effect.Effect ): Channel } = dual(2, ( self: Channel, f: (i: InErr2) => Effect.Effect ): Channel => fromTransform((upstream, scope) => toTransform(self)( Effect.catch(upstream, (err): Pull.Pull => { if (Cause.isDone(err)) return Effect.fail(err) return Effect.flatMap(f(err), Effect.fail) as Pull.Pull }), scope ) )) /** * Applies a side effect function to each output element of the channel, * returning a new channel that emits the same elements. * * The `tap` function allows you to perform side effects (like logging or * debugging) on each element emitted by a channel without modifying the * elements themselves. * * @example * ```ts * import { Channel, Console, Data } from "effect" * * class LogError extends Data.TaggedError("LogError")<{ * readonly message: string * }> {} * * // Create a channel that outputs numbers * const numberChannel = Channel.fromIterable([1, 2, 3]) * * // Tap into each output element to perform side effects * const tappedChannel = Channel.tap( * numberChannel, * (n) => Console.log(`Processing number: ${n}`) * ) * * // The channel still outputs the same elements but logs each one * // Outputs: 1, 2, 3 (while logging each) * ``` * * @since 4.0.0 * @category sequencing */ export const tap: { /** * Applies a side effect function to each output element of the channel, * returning a new channel that emits the same elements. * * The `tap` function allows you to perform side effects (like logging or * debugging) on each element emitted by a channel without modifying the * elements themselves. * * @example * ```ts * import { Channel, Console, Data } from "effect" * * class LogError extends Data.TaggedError("LogError")<{ * readonly message: string * }> {} * * // Create a channel that outputs numbers * const numberChannel = Channel.fromIterable([1, 2, 3]) * * // Tap into each output element to perform side effects * const tappedChannel = Channel.tap( * numberChannel, * (n) => Console.log(`Processing number: ${n}`) * ) * * // The channel still outputs the same elements but logs each one * // Outputs: 1, 2, 3 (while logging each) * ``` * * @since 4.0.0 * @category sequencing */ ( f: (d: Types.NoInfer) => Effect.Effect, options?: { readonly concurrency?: number | "unbounded" | undefined } ): ( self: Channel ) => Channel /** * Applies a side effect function to each output element of the channel, * returning a new channel that emits the same elements. * * The `tap` function allows you to perform side effects (like logging or * debugging) on each element emitted by a channel without modifying the * elements themselves. * * @example * ```ts * import { Channel, Console, Data } from "effect" * * class LogError extends Data.TaggedError("LogError")<{ * readonly message: string * }> {} * * // Create a channel that outputs numbers * const numberChannel = Channel.fromIterable([1, 2, 3]) * * // Tap into each output element to perform side effects * const tappedChannel = Channel.tap( * numberChannel, * (n) => Console.log(`Processing number: ${n}`) * ) * * // The channel still outputs the same elements but logs each one * // Outputs: 1, 2, 3 (while logging each) * ``` * * @since 4.0.0 * @category sequencing */ ( self: Channel, f: (d: Types.NoInfer) => Effect.Effect, options?: { readonly concurrency?: number | "unbounded" | undefined } ): Channel } = dual( (args) => isChannel(args[0]), ( self: Channel, f: (d: Types.NoInfer) => Effect.Effect, options?: { readonly concurrency?: number | "unbounded" | undefined } ): Channel => mapEffect(self, (a) => Effect.as(f(a), a), options) ) /** * Returns a new channel, which sequentially combines this channel, together * with the provided factory function, which creates a second channel based on * the output values of this channel. The result is a channel that will first * perform the functions of this channel, before performing the functions of * the created channel (including yielding its terminal value). * * @example * ```ts * import { Channel, Data } from "effect" * * class ProcessError extends Data.TaggedError("ProcessError")<{ * readonly cause: string * }> {} * * // Create a channel that outputs numbers * const numberChannel = Channel.fromIterable([1, 2, 3]) * * // FlatMap each number to create new channels * const flatMappedChannel = Channel.flatMap( * numberChannel, * (n) => * Channel.fromIterable(Array.from({ length: n }, (_, i) => `item-${n}-${i}`)) * ) * * // Flattens nested channels into a single stream * // Outputs: "item-1-0", "item-2-0", "item-2-1", "item-3-0", "item-3-1", "item-3-2" * ``` * * @since 2.0.0 * @category sequencing */ export const flatMap: { /** * Returns a new channel, which sequentially combines this channel, together * with the provided factory function, which creates a second channel based on * the output values of this channel. The result is a channel that will first * perform the functions of this channel, before performing the functions of * the created channel (including yielding its terminal value). * * @example * ```ts * import { Channel, Data } from "effect" * * class ProcessError extends Data.TaggedError("ProcessError")<{ * readonly cause: string * }> {} * * // Create a channel that outputs numbers * const numberChannel = Channel.fromIterable([1, 2, 3]) * * // FlatMap each number to create new channels * const flatMappedChannel = Channel.flatMap( * numberChannel, * (n) => * Channel.fromIterable(Array.from({ length: n }, (_, i) => `item-${n}-${i}`)) * ) * * // Flattens nested channels into a single stream * // Outputs: "item-1-0", "item-2-0", "item-2-1", "item-3-0", "item-3-1", "item-3-2" * ``` * * @since 2.0.0 * @category sequencing */ ( f: (d: OutElem) => Channel, options?: { readonly concurrency?: number | "unbounded" | undefined readonly bufferSize?: number | undefined } ): ( self: Channel ) => Channel< OutElem1, OutErr1 | OutErr, OutDone, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env1 | Env > /** * Returns a new channel, which sequentially combines this channel, together * with the provided factory function, which creates a second channel based on * the output values of this channel. The result is a channel that will first * perform the functions of this channel, before performing the functions of * the created channel (including yielding its terminal value). * * @example * ```ts * import { Channel, Data } from "effect" * * class ProcessError extends Data.TaggedError("ProcessError")<{ * readonly cause: string * }> {} * * // Create a channel that outputs numbers * const numberChannel = Channel.fromIterable([1, 2, 3]) * * // FlatMap each number to create new channels * const flatMappedChannel = Channel.flatMap( * numberChannel, * (n) => * Channel.fromIterable(Array.from({ length: n }, (_, i) => `item-${n}-${i}`)) * ) * * // Flattens nested channels into a single stream * // Outputs: "item-1-0", "item-2-0", "item-2-1", "item-3-0", "item-3-1", "item-3-2" * ``` * * @since 2.0.0 * @category sequencing */ < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( self: Channel, f: (d: OutElem) => Channel, options?: { readonly concurrency?: number | "unbounded" | undefined readonly bufferSize?: number | undefined } ): Channel< OutElem1, OutErr | OutErr1, OutDone, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env | Env1 > } = dual( (args) => isChannel(args[0]), < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( self: Channel, f: (d: OutElem) => Channel, options?: { readonly concurrency?: number | "unbounded" | undefined readonly bufferSize?: number | undefined } ): Channel< OutElem1, OutErr | OutErr1, OutDone, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env | Env1 > => concurrencyIsSequential(options?.concurrency) ? flatMapSequential(self, f) : flatMapConcurrent(self, f, options as any) ) const flatMapSequential = < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( self: Channel, f: (d: OutElem) => Channel ): Channel< OutElem1, OutErr | OutErr1, OutDone, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env | Env1 > => fromTransform((upstream, scope) => Effect.map(toTransform(self)(upstream, scope), (pull) => { let childPull: Effect.Effect | undefined let childScope: Scope.Closeable | undefined const makePull: Pull.Pull< OutElem1, OutErr | OutErr1, OutDone, Env1 > = Effect.flatMap(pull, (value) => { childScope ??= Scope.forkUnsafe(scope) return Effect.flatMapEager(toTransform(f(value))(upstream, childScope), (pull) => { childPull = catchHalt(pull) as any return childPull! }) }) const catchHalt = Pull.catchDone((_) => { childPull = undefined // we can reuse the scope if the only finalizer is the "fork" one if (childScope!.state._tag === "Open" && childScope!.state.finalizers.size === 1) { return makePull } const close = Scope.close(childScope!, Exit.void) childScope = undefined return Effect.flatMap(close, () => makePull) }) return Effect.suspend(() => childPull ?? makePull) }) ) const flatMapConcurrent = < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( self: Channel, f: (d: OutElem) => Channel, options: { readonly concurrency: number | "unbounded" readonly bufferSize?: number | undefined } ): Channel< OutElem1, OutErr | OutErr1, OutDone, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env | Env1 > => self.pipe(map(f), mergeAll(options)) /** * Concatenates this channel with another channel created from the terminal value * of this channel. The new channel is created using the provided function. * * @example * ```ts * import { Channel, Data } from "effect" * * class ConcatError extends Data.TaggedError("ConcatError")<{ * readonly reason: string * }> {} * * // Create a channel that outputs numbers and terminates with sum * const numberChannel = Channel.fromIterable([1, 2, 3]).pipe( * Channel.concatWith((sum: void) => Channel.succeed(`Completed processing`)) * ) * * // Concatenates additional channel based on completion value * // Outputs: 1, 2, 3, then "Completed processing" * ``` * * @since 2.0.0 * @category sequencing */ export const concatWith: { /** * Concatenates this channel with another channel created from the terminal value * of this channel. The new channel is created using the provided function. * * @example * ```ts * import { Channel, Data } from "effect" * * class ConcatError extends Data.TaggedError("ConcatError")<{ * readonly reason: string * }> {} * * // Create a channel that outputs numbers and terminates with sum * const numberChannel = Channel.fromIterable([1, 2, 3]).pipe( * Channel.concatWith((sum: void) => Channel.succeed(`Completed processing`)) * ) * * // Concatenates additional channel based on completion value * // Outputs: 1, 2, 3, then "Completed processing" * ``` * * @since 2.0.0 * @category sequencing */ ( f: (leftover: Types.NoInfer) => Channel ): ( self: Channel ) => Channel< OutElem | OutElem1, OutErr1 | OutErr, OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env1 | Env > /** * Concatenates this channel with another channel created from the terminal value * of this channel. The new channel is created using the provided function. * * @example * ```ts * import { Channel, Data } from "effect" * * class ConcatError extends Data.TaggedError("ConcatError")<{ * readonly reason: string * }> {} * * // Create a channel that outputs numbers and terminates with sum * const numberChannel = Channel.fromIterable([1, 2, 3]).pipe( * Channel.concatWith((sum: void) => Channel.succeed(`Completed processing`)) * ) * * // Concatenates additional channel based on completion value * // Outputs: 1, 2, 3, then "Completed processing" * ``` * * @since 2.0.0 * @category sequencing */ < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( self: Channel, f: (leftover: Types.NoInfer) => Channel ): Channel< OutElem | OutElem1, OutErr1 | OutErr, OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env1 | Env > } = dual(2, < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( self: Channel, f: (leftover: Types.NoInfer) => Channel ): Channel< OutElem | OutElem1, OutErr1 | OutErr, OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env1 | Env > => fromTransform((upstream, scope) => Effect.sync(() => { let currentPull: Pull.Pull | undefined const forkedScope = Scope.forkUnsafe(scope) const makePull = Effect.flatMap(toTransform(self)(upstream, forkedScope), (pull) => { currentPull = Pull.catchDone(pull, (leftover) => { return Scope.close(forkedScope, Exit.void).pipe( Effect.flatMap(() => toTransform(f(leftover as OutDone))(upstream, scope)), Effect.flatMap((pull) => { currentPull = pull return pull }) ) }) return currentPull }) return Effect.suspend(() => currentPull ?? makePull) }) )) /** * Concatenates this channel with another channel, so that the second channel * starts emitting values after the first channel has completed. * * @example * ```ts * import { Channel, Data } from "effect" * * class ConcatError extends Data.TaggedError("ConcatError")<{ * readonly reason: string * }> {} * * // Create two channels * const firstChannel = Channel.fromIterable([1, 2, 3]) * const secondChannel = Channel.fromIterable(["a", "b", "c"]) * * // Concatenate them * const concatenatedChannel = Channel.concat(firstChannel, secondChannel) * * // Outputs: 1, 2, 3, "a", "b", "c" * ``` * * @since 2.0.0 * @category sequencing */ export const concat: { /** * Concatenates this channel with another channel, so that the second channel * starts emitting values after the first channel has completed. * * @example * ```ts * import { Channel, Data } from "effect" * * class ConcatError extends Data.TaggedError("ConcatError")<{ * readonly reason: string * }> {} * * // Create two channels * const firstChannel = Channel.fromIterable([1, 2, 3]) * const secondChannel = Channel.fromIterable(["a", "b", "c"]) * * // Concatenate them * const concatenatedChannel = Channel.concat(firstChannel, secondChannel) * * // Outputs: 1, 2, 3, "a", "b", "c" * ``` * * @since 2.0.0 * @category sequencing */ (that: Channel): ( self: Channel ) => Channel< OutElem | OutElem1, OutErr1 | OutErr, OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env1 | Env > /** * Concatenates this channel with another channel, so that the second channel * starts emitting values after the first channel has completed. * * @example * ```ts * import { Channel, Data } from "effect" * * class ConcatError extends Data.TaggedError("ConcatError")<{ * readonly reason: string * }> {} * * // Create two channels * const firstChannel = Channel.fromIterable([1, 2, 3]) * const secondChannel = Channel.fromIterable(["a", "b", "c"]) * * // Concatenate them * const concatenatedChannel = Channel.concat(firstChannel, secondChannel) * * // Outputs: 1, 2, 3, "a", "b", "c" * ``` * * @since 2.0.0 * @category sequencing */ < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( self: Channel, that: Channel ): Channel< OutElem | OutElem1, OutErr1 | OutErr, OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env1 | Env > } = dual(2, < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( self: Channel, that: Channel ): Channel< OutElem | OutElem1, OutErr1 | OutErr, OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env1 | Env > => concatWith(self, (_) => that)) /** * Combines the elements from this channel and the specified channel by * repeatedly applying the function `f` to extract an element using both sides * and conceptually "offer" it to the destination channel. `f` can maintain * some internal state to control the combining process, with the initial * state being specified by `s`. * * @since 4.0.0 * @category sequencing */ export const combine: { /** * Combines the elements from this channel and the specified channel by * repeatedly applying the function `f` to extract an element using both sides * and conceptually "offer" it to the destination channel. `f` can maintain * some internal state to control the combining process, with the initial * state being specified by `s`. * * @since 4.0.0 * @category sequencing */ ( that: Channel, s: LazyArg, f: ( s: S, pullLeft: Pull.Pull, pullRight: Pull.Pull ) => Effect.Effect ): (self: Channel) => Channel< A, Pull.ExcludeDone, Cause.Done.Extract, InElem & InElem2, InErr & InErr2, InDone & InDone2, Env | Env2 | R > /** * Combines the elements from this channel and the specified channel by * repeatedly applying the function `f` to extract an element using both sides * and conceptually "offer" it to the destination channel. `f` can maintain * some internal state to control the combining process, with the initial * state being specified by `s`. * * @since 4.0.0 * @category sequencing */ < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem2, OutErr2, OutDone2, InElem2, InErr2, InDone2, Env2, S, A, E, R >( self: Channel, that: Channel, s: LazyArg, f: ( s: S, pullLeft: Pull.Pull, pullRight: Pull.Pull ) => Effect.Effect ): Channel< A, Pull.ExcludeDone, Cause.Done.Extract, InElem & InElem2, InErr & InErr2, InDone & InDone2, Env | Env2 | R > } = dual(4, < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem2, OutErr2, OutDone2, InElem2, InErr2, InDone2, Env2, S, A, E, R >( self: Channel, that: Channel, s: LazyArg, f: ( s: S, pullLeft: Pull.Pull, pullRight: Pull.Pull ) => Effect.Effect ): Channel< A, Pull.ExcludeDone, Cause.Done.Extract, InElem & InElem2, InErr & InErr2, InDone & InDone2, Env | Env2 | R > => fromTransform(Effect.fnUntraced(function*(upstream, scope) { const leftPull = yield* toTransform(self)(upstream, scope) const rightPull = yield* toTransform(that)(upstream, scope) let state = s() return Effect.suspend(() => { const combinedPull = f(state, leftPull, rightPull) return Effect.map(combinedPull, ([a, s1]) => { state = s1 return a }) }) }))) /** * @since 2.0.0 * @category sequencing */ export const orElseIfEmpty: { /** * @since 2.0.0 * @category sequencing */ ( f: (leftover: Types.NoInfer) => Channel ): ( self: Channel ) => Channel< OutElem | OutElem1, OutErr1 | OutErr, OutDone | OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env1 | Env > /** * @since 2.0.0 * @category sequencing */ < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( self: Channel, f: (leftover: Types.NoInfer) => Channel ): Channel< OutElem | OutElem1, OutErr1 | OutErr, OutDone | OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env1 | Env > } = dual(2, < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( self: Channel, f: (leftover: Types.NoInfer) => Channel ): Channel< OutElem | OutElem1, OutErr1 | OutErr, OutDone | OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env1 | Env > => fromTransform((upstream, scope) => Effect.sync(() => { let currentPull: Pull.Pull | undefined const forkedScope = Scope.forkUnsafe(scope) const makePull = Effect.flatMap(toTransform(self)(upstream, forkedScope), (pull) => { const next = pull.pipe( Effect.tap(() => { currentPull = pull return Effect.void }), Pull.catchDone((leftover) => Scope.close(forkedScope, Exit.succeed(leftover)).pipe( Effect.andThen(toTransform(f(leftover as OutDone))(upstream, scope)), Effect.flatMap((pull) => { currentPull = pull return pull }) ) ) ) currentPull = next return next }) return Effect.suspend(() => currentPull ?? makePull) }) )) /** * Flatten a channel of channels. * * @example * ```ts * import { Channel, Data } from "effect" * * class FlattenError extends Data.TaggedError("FlattenError")<{ * readonly cause: string * }> {} * * // Create a channel that outputs channels * const nestedChannels = Channel.fromIterable([ * Channel.fromIterable([1, 2]), * Channel.fromIterable([3, 4]), * Channel.fromIterable([5, 6]) * ]) * * // Flatten the nested channels * const flattenedChannel = Channel.flatten(nestedChannels) * * // Outputs: 1, 2, 3, 4, 5, 6 * ``` * * @since 2.0.0 * @category constructors */ export const flatten = < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( channels: Channel< Channel, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 > ): Channel => flatMap(channels, identity_) /** * Flattens a channel that outputs arrays into a channel that outputs individual elements. * * @example * ```ts * import { Channel, Data } from "effect" * * class FlattenError extends Data.TaggedError("FlattenError")<{ * readonly message: string * }> {} * * // Create a channel that outputs arrays * const arrayChannel = Channel.fromIterable([ * [1, 2, 3], * [4, 5], * [6, 7, 8, 9] * ]) * * // Flatten the arrays into individual elements * const flattenedChannel = Channel.flattenArray(arrayChannel) * * // Outputs: 1, 2, 3, 4, 5, 6, 7, 8, 9 * ``` * * @since 4.0.0 * @category utils */ export const flattenArray = < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env >( self: Channel, OutErr, OutDone, InElem, InErr, InDone, Env> ): Channel => transformPull(self, (pull) => { let array: ReadonlyArray | undefined let index = 0 const pump = Effect.suspend(function loop(): Pull.Pull { if (array === undefined) { return Effect.flatMap(pull, (array_) => { switch (array_.length) { case 0: return loop() case 1: return Effect.succeed(array_[0]) default: { array = array_ return Effect.succeed(array_[index++]) } } }) } const next = array[index++] if (index >= array.length) { array = undefined index = 0 } return Effect.succeed(next) }) return Effect.succeed(pump) }) /** * @since 4.0.0 * @category utils */ export const flattenTake = < OutElem, OutErr, OutDone, OutErr2, OutDone2, InElem, InErr, InDone, Env >( self: Channel, OutErr2, OutDone2, InElem, InErr, InDone, Env> ): Channel, OutErr | OutErr2, OutDone, InElem, InErr, InDone, Env> => mapEffectSequential(self, Take.toPull) as any /** * Creates a new channel that consumes all output from the source channel * but emits nothing, preserving only the completion value. * * @example * ```ts * import { Channel } from "effect" * * // Create a channel that outputs values * const sourceChannel = Channel.fromIterable([1, 2, 3, 4, 5]) * * // Drain all output, keeping only the completion * const drainedChannel = Channel.drain(sourceChannel) * * // The channel completes but emits no values * // Useful for consuming side effects without collecting output * ``` * * @since 2.0.0 * @category constructors */ export const drain = < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env >( self: Channel< OutElem, OutErr, OutDone, InElem, InErr, InDone, Env > ): Channel => transformPull(self, (pull) => Effect.succeed(Effect.forever(pull, { disableYield: true }))) /** * Repeats this channel according to the provided schedule. * * @since 4.0.0 * @category utils */ export const repeat: { /** * Repeats this channel according to the provided schedule. * * @since 4.0.0 * @category utils */ ( schedule: | Schedule.Schedule, SE, SR> | (( $: (_: Schedule.Schedule, SE, SR>) => Schedule.Schedule ) => Schedule.Schedule, SE, SR>) ): ( self: Channel ) => Channel /** * Repeats this channel according to the provided schedule. * * @since 4.0.0 * @category utils */ ( self: Channel, schedule: | Schedule.Schedule | (( $: (_: Schedule.Schedule, SE, SR>) => Schedule.Schedule ) => Schedule.Schedule, SE, SR>) ): Channel } = dual(2, ( self: Channel, schedule: | Schedule.Schedule | (( $: (_: Schedule.Schedule, SE, SR>) => Schedule.Schedule ) => Schedule.Schedule, SE, SR>) ): Channel => Schedule.toStepWithMetadata(typeof schedule === "function" ? schedule(identity_) : schedule).pipe( Effect.map((step) => { let meta = Schedule.CurrentMetadata.defaultValue() const loop: Channel< OutElem, OutErr | SE, OutDone, InElem, InErr, InDone, Env | SR > = concatWith( provideServiceEffect(self, Schedule.CurrentMetadata, Effect.sync(() => meta)), (done) => step(done).pipe( Effect.map((meta_) => { meta = meta_ return loop }), Pull.catchDone(() => Effect.succeed(end(done))), unwrap ) ) return loop }), unwrap )) /** * Repeats this channel forever. * * @since 4.0.0 * @category utils */ export const forever = ( self: Channel ): Channel => concatWith(self, () => forever(self)) /** * @since 4.0.0 * @category utils */ export const schedule: { /** * @since 4.0.0 * @category utils */ (schedule: Schedule.Schedule, SE, SR>): ( self: Channel ) => Channel /** * @since 4.0.0 * @category utils */ ( self: Channel, schedule: Schedule.Schedule ): Channel } = dual(2, ( self: Channel, schedule: Schedule.Schedule ): Channel => transformPull( self, (pull, _scope) => Effect.map( Schedule.toStepWithSleep(schedule), (step) => { const pullWithStep: Pull.Pull< OutElem, OutErr | SE, OutDone | SO, SR > = Effect.tap(pull, step) return pullWithStep } ) )) /** * Filters the output elements of a channel using a predicate function. * Elements that don't match the predicate are discarded. * * @example * ```ts * import { Channel } from "effect" * * // Create a channel with mixed numbers * const numbersChannel = Channel.fromIterable([1, 2, 3, 4, 5, 6, 7, 8]) * * // Filter to keep only even numbers * const evenChannel = Channel.filter(numbersChannel, (n) => n % 2 === 0) * // Outputs: 2, 4, 6, 8 * * // Filter with type refinement * const mixedChannel = Channel.fromIterable([1, "hello", 2, "world", 3]) * const numbersOnlyChannel = Channel.filter( * mixedChannel, * (value): value is number => typeof value === "number" * ) * // Outputs: 1, 2, 3 (all typed as numbers) * ``` * * @since 2.0.0 * @category Filtering */ export const filter: { /** * Filters the output elements of a channel using a predicate function. * Elements that don't match the predicate are discarded. * * @example * ```ts * import { Channel } from "effect" * * // Create a channel with mixed numbers * const numbersChannel = Channel.fromIterable([1, 2, 3, 4, 5, 6, 7, 8]) * * // Filter to keep only even numbers * const evenChannel = Channel.filter(numbersChannel, (n) => n % 2 === 0) * // Outputs: 2, 4, 6, 8 * * // Filter with type refinement * const mixedChannel = Channel.fromIterable([1, "hello", 2, "world", 3]) * const numbersOnlyChannel = Channel.filter( * mixedChannel, * (value): value is number => typeof value === "number" * ) * // Outputs: 1, 2, 3 (all typed as numbers) * ``` * * @since 2.0.0 * @category Filtering */ (refinement: Predicate.Refinement): ( self: Channel ) => Channel /** * Filters the output elements of a channel using a predicate function. * Elements that don't match the predicate are discarded. * * @example * ```ts * import { Channel } from "effect" * * // Create a channel with mixed numbers * const numbersChannel = Channel.fromIterable([1, 2, 3, 4, 5, 6, 7, 8]) * * // Filter to keep only even numbers * const evenChannel = Channel.filter(numbersChannel, (n) => n % 2 === 0) * // Outputs: 2, 4, 6, 8 * * // Filter with type refinement * const mixedChannel = Channel.fromIterable([1, "hello", 2, "world", 3]) * const numbersOnlyChannel = Channel.filter( * mixedChannel, * (value): value is number => typeof value === "number" * ) * // Outputs: 1, 2, 3 (all typed as numbers) * ``` * * @since 2.0.0 * @category Filtering */ (predicate: Predicate.Predicate): ( self: Channel ) => Channel /** * Filters the output elements of a channel using a predicate function. * Elements that don't match the predicate are discarded. * * @example * ```ts * import { Channel } from "effect" * * // Create a channel with mixed numbers * const numbersChannel = Channel.fromIterable([1, 2, 3, 4, 5, 6, 7, 8]) * * // Filter to keep only even numbers * const evenChannel = Channel.filter(numbersChannel, (n) => n % 2 === 0) * // Outputs: 2, 4, 6, 8 * * // Filter with type refinement * const mixedChannel = Channel.fromIterable([1, "hello", 2, "world", 3]) * const numbersOnlyChannel = Channel.filter( * mixedChannel, * (value): value is number => typeof value === "number" * ) * // Outputs: 1, 2, 3 (all typed as numbers) * ``` * * @since 2.0.0 * @category Filtering */ ( self: Channel, refinement: Predicate.Refinement ): Channel /** * Filters the output elements of a channel using a predicate function. * Elements that don't match the predicate are discarded. * * @example * ```ts * import { Channel } from "effect" * * // Create a channel with mixed numbers * const numbersChannel = Channel.fromIterable([1, 2, 3, 4, 5, 6, 7, 8]) * * // Filter to keep only even numbers * const evenChannel = Channel.filter(numbersChannel, (n) => n % 2 === 0) * // Outputs: 2, 4, 6, 8 * * // Filter with type refinement * const mixedChannel = Channel.fromIterable([1, "hello", 2, "world", 3]) * const numbersOnlyChannel = Channel.filter( * mixedChannel, * (value): value is number => typeof value === "number" * ) * // Outputs: 1, 2, 3 (all typed as numbers) * ``` * * @since 2.0.0 * @category Filtering */ ( self: Channel, predicate: Predicate.Predicate ): Channel } = dual(2, ( self: Channel, predicate: Predicate.Predicate ): Channel => fromTransform((upstream, scope) => Effect.map( toTransform(self)(upstream, scope), (pull) => Effect.flatMap(pull, function loop(elem): Pull.Pull { return predicate(elem) ? Effect.succeed(elem) : Effect.flatMap(pull, loop) }) ) )) /** * @since 4.0.0 * @category Filtering */ export const filterMap: { /** * @since 4.0.0 * @category Filtering */ (filter: Filter.Filter): ( self: Channel ) => Channel /** * @since 4.0.0 * @category Filtering */ ( self: Channel, filter: Filter.Filter ): Channel } = dual(2, ( self: Channel, filter: Filter.Filter ): Channel => fromTransform((upstream, scope) => Effect.map( toTransform(self)(upstream, scope), (pull) => Effect.flatMap(pull, function loop(elem): Pull.Pull { const result = filter(elem) return Result.isFailure(result) ? Effect.flatMap(pull, loop) : Effect.succeed(result.success) }) ) )) /** * @since 4.0.0 * @category Filtering */ export const filterEffect: { /** * @since 4.0.0 * @category Filtering */ (predicate: (a: OutElem) => Effect.Effect): ( self: Channel ) => Channel /** * @since 4.0.0 * @category Filtering */ ( self: Channel, predicate: (a: OutElem) => Effect.Effect ): Channel } = dual(2, ( self: Channel, predicate: (a: OutElem) => Effect.Effect ): Channel => fromTransform((upstream, scope) => Effect.map( toTransform(self)(upstream, scope), (pull) => Effect.flatMap(pull, function loop(elem): Pull.Pull { return Effect.flatMap( predicate(elem), (passes) => passes ? Effect.succeed(elem) : Effect.flatMap(pull, loop) ) }) ) )) /** * @since 4.0.0 * @category Filtering */ export const filterMapEffect: { /** * @since 4.0.0 * @category Filtering */ (filter: Filter.FilterEffect): ( self: Channel ) => Channel /** * @since 4.0.0 * @category Filtering */ ( self: Channel, filter: Filter.FilterEffect ): Channel } = dual(2, ( self: Channel, filter: Filter.FilterEffect ): Channel => fromTransform((upstream, scope) => Effect.map( toTransform(self)(upstream, scope), (pull) => Effect.flatMap(pull, function loop(elem): Pull.Pull { return Effect.flatMap( filter(elem), (result) => Result.isFailure(result) ? Effect.flatMap(pull, loop) : Effect.succeed(result.success) ) }) ) )) /** * Filters arrays of elements emitted by a channel, applying the filter * to each element within the arrays and only emitting non-empty filtered arrays. * * @example * ```ts * import { Array, Channel } from "effect" * * const nonEmptyArrayPredicate = Array.isReadonlyArrayNonEmpty * * // Create a channel that outputs arrays of mixed data * const arrayChannel = Channel.fromIterable([ * Array.make(1, 2, 3, 4, 5), * Array.make(6, 7, 8, 9, 10), * Array.make(11, 12, 13, 14, 15) * ]).pipe(Channel.filter(nonEmptyArrayPredicate)) * * // Filter arrays to keep only even numbers * const evenArraysChannel = Channel.filterArray(arrayChannel, (n) => n % 2 === 0) * // Outputs: [2, 4], [6, 8, 10], [12, 14] * // Note: Only non-empty filtered arrays are emitted * * // Arrays that would become empty after filtering are discarded entirely * const oddChannel = Channel.fromIterable([ * Array.make(1, 3, 5), * Array.make(2, 4), * Array.make(7, 9) * ]).pipe(Channel.filter(nonEmptyArrayPredicate)) * const filteredOddChannel = Channel.filterArray(oddChannel, (n) => n % 2 === 0) * // Outputs: [2, 4] (the arrays [1,3,5] and [7,9] are discarded) * ``` * * @since 4.0.0 * @category Filtering */ export const filterArray: { /** * Filters arrays of elements emitted by a channel, applying the filter * to each element within the arrays and only emitting non-empty filtered arrays. * * @example * ```ts * import { Array, Channel } from "effect" * * const nonEmptyArrayPredicate = Array.isReadonlyArrayNonEmpty * * // Create a channel that outputs arrays of mixed data * const arrayChannel = Channel.fromIterable([ * Array.make(1, 2, 3, 4, 5), * Array.make(6, 7, 8, 9, 10), * Array.make(11, 12, 13, 14, 15) * ]).pipe(Channel.filter(nonEmptyArrayPredicate)) * * // Filter arrays to keep only even numbers * const evenArraysChannel = Channel.filterArray(arrayChannel, (n) => n % 2 === 0) * // Outputs: [2, 4], [6, 8, 10], [12, 14] * // Note: Only non-empty filtered arrays are emitted * * // Arrays that would become empty after filtering are discarded entirely * const oddChannel = Channel.fromIterable([ * Array.make(1, 3, 5), * Array.make(2, 4), * Array.make(7, 9) * ]).pipe(Channel.filter(nonEmptyArrayPredicate)) * const filteredOddChannel = Channel.filterArray(oddChannel, (n) => n % 2 === 0) * // Outputs: [2, 4] (the arrays [1,3,5] and [7,9] are discarded) * ``` * * @since 4.0.0 * @category Filtering */ (refinement: Predicate.Refinement): ( self: Channel, OutErr, OutDone, InElem, InErr, InDone, Env> ) => Channel, OutErr, OutDone, InElem, InErr, InDone, Env> /** * Filters arrays of elements emitted by a channel, applying the filter * to each element within the arrays and only emitting non-empty filtered arrays. * * @example * ```ts * import { Array, Channel } from "effect" * * const nonEmptyArrayPredicate = Array.isReadonlyArrayNonEmpty * * // Create a channel that outputs arrays of mixed data * const arrayChannel = Channel.fromIterable([ * Array.make(1, 2, 3, 4, 5), * Array.make(6, 7, 8, 9, 10), * Array.make(11, 12, 13, 14, 15) * ]).pipe(Channel.filter(nonEmptyArrayPredicate)) * * // Filter arrays to keep only even numbers * const evenArraysChannel = Channel.filterArray(arrayChannel, (n) => n % 2 === 0) * // Outputs: [2, 4], [6, 8, 10], [12, 14] * // Note: Only non-empty filtered arrays are emitted * * // Arrays that would become empty after filtering are discarded entirely * const oddChannel = Channel.fromIterable([ * Array.make(1, 3, 5), * Array.make(2, 4), * Array.make(7, 9) * ]).pipe(Channel.filter(nonEmptyArrayPredicate)) * const filteredOddChannel = Channel.filterArray(oddChannel, (n) => n % 2 === 0) * // Outputs: [2, 4] (the arrays [1,3,5] and [7,9] are discarded) * ``` * * @since 4.0.0 * @category Filtering */ (predicate: Predicate.Predicate>): ( self: Channel, OutErr, OutDone, InElem, InErr, InDone, Env> ) => Channel, OutErr, OutDone, InElem, InErr, InDone, Env> /** * Filters arrays of elements emitted by a channel, applying the filter * to each element within the arrays and only emitting non-empty filtered arrays. * * @example * ```ts * import { Array, Channel } from "effect" * * const nonEmptyArrayPredicate = Array.isReadonlyArrayNonEmpty * * // Create a channel that outputs arrays of mixed data * const arrayChannel = Channel.fromIterable([ * Array.make(1, 2, 3, 4, 5), * Array.make(6, 7, 8, 9, 10), * Array.make(11, 12, 13, 14, 15) * ]).pipe(Channel.filter(nonEmptyArrayPredicate)) * * // Filter arrays to keep only even numbers * const evenArraysChannel = Channel.filterArray(arrayChannel, (n) => n % 2 === 0) * // Outputs: [2, 4], [6, 8, 10], [12, 14] * // Note: Only non-empty filtered arrays are emitted * * // Arrays that would become empty after filtering are discarded entirely * const oddChannel = Channel.fromIterable([ * Array.make(1, 3, 5), * Array.make(2, 4), * Array.make(7, 9) * ]).pipe(Channel.filter(nonEmptyArrayPredicate)) * const filteredOddChannel = Channel.filterArray(oddChannel, (n) => n % 2 === 0) * // Outputs: [2, 4] (the arrays [1,3,5] and [7,9] are discarded) * ``` * * @since 4.0.0 * @category Filtering */ ( self: Channel, OutErr, OutDone, InElem, InErr, InDone, Env>, refinement: Predicate.Refinement ): Channel, OutErr, OutDone, InElem, InErr, InDone, Env> /** * Filters arrays of elements emitted by a channel, applying the filter * to each element within the arrays and only emitting non-empty filtered arrays. * * @example * ```ts * import { Array, Channel } from "effect" * * const nonEmptyArrayPredicate = Array.isReadonlyArrayNonEmpty * * // Create a channel that outputs arrays of mixed data * const arrayChannel = Channel.fromIterable([ * Array.make(1, 2, 3, 4, 5), * Array.make(6, 7, 8, 9, 10), * Array.make(11, 12, 13, 14, 15) * ]).pipe(Channel.filter(nonEmptyArrayPredicate)) * * // Filter arrays to keep only even numbers * const evenArraysChannel = Channel.filterArray(arrayChannel, (n) => n % 2 === 0) * // Outputs: [2, 4], [6, 8, 10], [12, 14] * // Note: Only non-empty filtered arrays are emitted * * // Arrays that would become empty after filtering are discarded entirely * const oddChannel = Channel.fromIterable([ * Array.make(1, 3, 5), * Array.make(2, 4), * Array.make(7, 9) * ]).pipe(Channel.filter(nonEmptyArrayPredicate)) * const filteredOddChannel = Channel.filterArray(oddChannel, (n) => n % 2 === 0) * // Outputs: [2, 4] (the arrays [1,3,5] and [7,9] are discarded) * ``` * * @since 4.0.0 * @category Filtering */ ( self: Channel, OutErr, OutDone, InElem, InErr, InDone, Env>, predicate: Predicate.Predicate> ): Channel, OutErr, OutDone, InElem, InErr, InDone, Env> } = dual(2, ( self: Channel, OutErr, OutDone, InElem, InErr, InDone, Env>, predicate: Predicate.Predicate> ): Channel, OutErr, OutDone, InElem, InErr, InDone, Env> => transformPull(self, (pull) => Effect.succeed(Effect.flatMap( pull, function loop(arr): Pull.Pull, OutErr, OutDone> { const passes: Array = [] for (let i = 0; i < arr.length; i++) { if (predicate(arr[i] as Types.NoInfer)) { passes.push(arr[i]) } } return Arr.isReadonlyArrayNonEmpty(passes) ? Effect.succeed(passes) : Effect.flatMap(pull, loop) } )))) /** * @since 4.0.0 * @category Filtering */ export const filterMapArray: { /** * @since 4.0.0 * @category Filtering */ (filter: Filter.Filter, B, X>): ( self: Channel, OutErr, OutDone, InElem, InErr, InDone, Env> ) => Channel, OutErr, OutDone, InElem, InErr, InDone, Env> /** * @since 4.0.0 * @category Filtering */ ( self: Channel, OutErr, OutDone, InElem, InErr, InDone, Env>, filter: Filter.Filter ): Channel, OutErr, OutDone, InElem, InErr, InDone, Env> } = dual(2, ( self: Channel, OutErr, OutDone, InElem, InErr, InDone, Env>, filter: Filter.Filter ): Channel, OutErr, OutDone, InElem, InErr, InDone, Env> => transformPull(self, (pull) => Effect.succeed(Effect.flatMap( pull, function loop(arr): Pull.Pull, OutErr, OutDone> { const passes: Array = [] for (let i = 0; i < arr.length; i++) { const result = filter(arr[i]) if (Result.isSuccess(result)) { passes.push(result.success) } } return Arr.isReadonlyArrayNonEmpty(passes) ? Effect.succeed(passes) : Effect.flatMap(pull, loop) } )))) /** * @since 4.0.0 * @category Filtering */ export const filterArrayEffect: { /** * @since 4.0.0 * @category Filtering */ ( predicate: (a: Types.NoInfer, index: number) => Effect.Effect ): ( self: Channel, OutErr, OutDone, InElem, InErr, InDone, Env> ) => Channel, OutErr | E, OutDone, InElem, InErr, InDone, Env | R> /** * @since 4.0.0 * @category Filtering */ ( self: Channel, OutErr, OutDone, InElem, InErr, InDone, Env>, predicate: (a: Types.NoInfer, index: number) => Effect.Effect ): Channel, OutErr | E, OutDone, InElem, InErr, InDone, Env | R> } = dual(2, ( self: Channel, OutErr, OutDone, InElem, InErr, InDone, Env>, predicate: (a: Types.NoInfer, index: number) => Effect.Effect ): Channel, OutErr | E, OutDone, InElem, InErr, InDone, Env | R> => transformPull(self, (pull) => { const f = Effect.flatMap(pull, (arr) => Effect.filter(arr, predicate)) return Effect.succeed(Effect.flatMap( f, function loop(arr): Pull.Pull, OutErr | E, OutDone, R> { return Arr.isReadonlyArrayNonEmpty(arr) ? Effect.succeed(arr) : Effect.flatMap(f, loop) } )) })) /** * @since 4.0.0 * @category Filtering */ export const filterMapArrayEffect: { /** * @since 4.0.0 * @category Filtering */ (filter: Filter.FilterEffect, B, X, EX, RX>): ( self: Channel, OutErr, OutDone, InElem, InErr, InDone, Env> ) => Channel, OutErr | EX, OutDone, InElem, InErr, InDone, Env | RX> /** * @since 4.0.0 * @category Filtering */ ( self: Channel, OutErr, OutDone, InElem, InErr, InDone, Env>, filter: Filter.FilterEffect ): Channel, OutErr | EX, OutDone, InElem, InErr, InDone, Env | RX> } = dual(2, ( self: Channel, OutErr, OutDone, InElem, InErr, InDone, Env>, filter: Filter.FilterEffect ): Channel, OutErr | EX, OutDone, InElem, InErr, InDone, Env | RX> => transformPull(self, (pull) => Effect.succeed(Effect.flatMap( pull, function loop(arr): Pull.Pull, OutErr | EX, OutDone, RX> { return Effect.flatMap( Effect.filterMapEffect(arr, filter as any), (passes) => Arr.isReadonlyArrayNonEmpty(passes) ? Effect.succeed(passes as Arr.NonEmptyReadonlyArray) : Effect.flatMap(pull, loop) ) } )))) /** * Statefully maps over a channel with an accumulator, where each element can produce multiple output values. * * @example * ```ts * import { Channel, Effect } from "effect" * * // Create a channel with numbers * const numbersChannel = Channel.fromIterable([1, 2, 3, 4]) * * // Use mapAccum to create running sums and emit both current and sum * const runningSum = Channel.mapAccum( * numbersChannel, * () => 0, // initial accumulator state * (sum, current) => { * const newSum = sum + current * // Return [newState, outputValues] * return [newSum, [current, newSum]] as const * } * ) * // Outputs: 1, 1, 2, 3, 3, 6, 4, 10 * * // Using with Effect for async processing * const asyncMapAccum = Channel.mapAccum( * numbersChannel, * () => "", * (acc, value) => * Effect.gen(function*() { * const newAcc = acc + value.toString() * return [newAcc, [`${value}-processed`, newAcc]] as const * }) * ) * ``` * * @since 2.0.0 * @category Sequencing */ export const mapAccum: { /** * Statefully maps over a channel with an accumulator, where each element can produce multiple output values. * * @example * ```ts * import { Channel, Effect } from "effect" * * // Create a channel with numbers * const numbersChannel = Channel.fromIterable([1, 2, 3, 4]) * * // Use mapAccum to create running sums and emit both current and sum * const runningSum = Channel.mapAccum( * numbersChannel, * () => 0, // initial accumulator state * (sum, current) => { * const newSum = sum + current * // Return [newState, outputValues] * return [newSum, [current, newSum]] as const * } * ) * // Outputs: 1, 1, 2, 3, 3, 6, 4, 10 * * // Using with Effect for async processing * const asyncMapAccum = Channel.mapAccum( * numbersChannel, * () => "", * (acc, value) => * Effect.gen(function*() { * const newAcc = acc + value.toString() * return [newAcc, [`${value}-processed`, newAcc]] as const * }) * ) * ``` * * @since 2.0.0 * @category Sequencing */ ( initial: LazyArg, f: ( s: S, a: Types.NoInfer ) => | Effect.Effect], E, R> | readonly [state: S, values: ReadonlyArray], options?: { readonly onHalt?: ((state: S) => Array) | undefined } ): < OutErr, OutDone, InElem, InErr, InDone, Env >(self: Channel) => Channel< B, OutErr | E, OutDone, InElem, InErr, InDone, Env | R > /** * Statefully maps over a channel with an accumulator, where each element can produce multiple output values. * * @example * ```ts * import { Channel, Effect } from "effect" * * // Create a channel with numbers * const numbersChannel = Channel.fromIterable([1, 2, 3, 4]) * * // Use mapAccum to create running sums and emit both current and sum * const runningSum = Channel.mapAccum( * numbersChannel, * () => 0, // initial accumulator state * (sum, current) => { * const newSum = sum + current * // Return [newState, outputValues] * return [newSum, [current, newSum]] as const * } * ) * // Outputs: 1, 1, 2, 3, 3, 6, 4, 10 * * // Using with Effect for async processing * const asyncMapAccum = Channel.mapAccum( * numbersChannel, * () => "", * (acc, value) => * Effect.gen(function*() { * const newAcc = acc + value.toString() * return [newAcc, [`${value}-processed`, newAcc]] as const * }) * ) * ``` * * @since 2.0.0 * @category Sequencing */ ( self: Channel, initial: LazyArg, f: ( s: S, a: Types.NoInfer ) => | Effect.Effect], E, R> | readonly [state: S, values: ReadonlyArray], options?: { readonly onHalt?: ((state: S) => Array) | undefined } ): Channel } = dual( (args) => isChannel(args[0]), ( self: Channel, initial: LazyArg, f: ( s: S, a: Types.NoInfer ) => | Effect.Effect], E, R> | readonly [state: S, values: ReadonlyArray], options?: { readonly onHalt?: ((state: S) => ReadonlyArray) | undefined } ): Channel => fromTransform((upstream, scope) => Effect.map(toTransform(self)(upstream, scope), (pull) => { let state = initial() let current: ReadonlyArray | undefined let index = 0 let cause: Cause.Cause> | undefined const pullNext = Effect.matchCauseEffect(pull, { onFailure(cause_) { cause = cause_ const b = options?.onHalt && options.onHalt(state) return b && b.length > 0 ? Effect.succeed([state, b] as const) : Effect.failCause(cause_) }, onSuccess(a): Effect.Effect], E, R> { const b = f(state, a) return Arr.isArray(b) ? Effect.succeed(b as any) : b as any } }) const pump = Effect.suspend(function loop(): Pull.Pull { if (current === undefined) { if (cause) return Effect.failCause(cause) return Effect.flatMap(pullNext, ([newState, values]) => { state = newState if (values.length === 0) { return loop() } else if (values.length === 1) { return Effect.succeed(values[0]) } current = values return loop() }) } const next = current[index++] if (index >= current.length) { current = undefined index = 0 } return Effect.succeed(next) }) return pump }) ) ) /** * Statefully transforms a channel by scanning over its output with an accumulator function. * Emits the intermediate results of the scan operation. * * @example * ```ts * import { Channel } from "effect" * * // Create a channel with numbers * const numbersChannel = Channel.fromIterable([1, 2, 3, 4, 5]) * * // Scan to create running sum * const runningSumChannel = Channel.scan(numbersChannel, 0, (sum, n) => sum + n) * // Outputs: 0, 1, 3, 6, 10, 15 * // Note: emits the initial value and each intermediate result * * // Scan with string concatenation * const wordsChannel = Channel.fromIterable(["hello", "world", "from", "effect"]) * const sentenceChannel = Channel.scan( * wordsChannel, * "", * (sentence, word) => sentence === "" ? word : `${sentence} ${word}` * ) * // Outputs: "", "hello", "hello world", "hello world from", "hello world from effect" * ``` * * @since 2.0.0 * @category Sequencing */ export const scan: { /** * Statefully transforms a channel by scanning over its output with an accumulator function. * Emits the intermediate results of the scan operation. * * @example * ```ts * import { Channel } from "effect" * * // Create a channel with numbers * const numbersChannel = Channel.fromIterable([1, 2, 3, 4, 5]) * * // Scan to create running sum * const runningSumChannel = Channel.scan(numbersChannel, 0, (sum, n) => sum + n) * // Outputs: 0, 1, 3, 6, 10, 15 * // Note: emits the initial value and each intermediate result * * // Scan with string concatenation * const wordsChannel = Channel.fromIterable(["hello", "world", "from", "effect"]) * const sentenceChannel = Channel.scan( * wordsChannel, * "", * (sentence, word) => sentence === "" ? word : `${sentence} ${word}` * ) * // Outputs: "", "hello", "hello world", "hello world from", "hello world from effect" * ``` * * @since 2.0.0 * @category Sequencing */ (initial: S, f: (s: S, a: Types.NoInfer) => S): < OutErr, OutDone, InElem, InErr, InDone, Env >(self: Channel) => Channel< S, OutErr, OutDone, InElem, InErr, InDone, Env > /** * Statefully transforms a channel by scanning over its output with an accumulator function. * Emits the intermediate results of the scan operation. * * @example * ```ts * import { Channel } from "effect" * * // Create a channel with numbers * const numbersChannel = Channel.fromIterable([1, 2, 3, 4, 5]) * * // Scan to create running sum * const runningSumChannel = Channel.scan(numbersChannel, 0, (sum, n) => sum + n) * // Outputs: 0, 1, 3, 6, 10, 15 * // Note: emits the initial value and each intermediate result * * // Scan with string concatenation * const wordsChannel = Channel.fromIterable(["hello", "world", "from", "effect"]) * const sentenceChannel = Channel.scan( * wordsChannel, * "", * (sentence, word) => sentence === "" ? word : `${sentence} ${word}` * ) * // Outputs: "", "hello", "hello world", "hello world from", "hello world from effect" * ``` * * @since 2.0.0 * @category Sequencing */ ( self: Channel, initial: S, f: (s: S, a: Types.NoInfer) => S ): Channel } = dual(3, ( self: Channel, initial: S, f: (s: S, a: Types.NoInfer) => S ): Channel => scanEffect(self, initial, (s, a) => Effect.succeed(f(s, a)))) /** * Statefully transforms a channel by scanning over its output with an effectful accumulator function. * Emits the intermediate results of the scan operation. * * @example * ```ts * import { Channel, Data, Effect } from "effect" * * class ScanError extends Data.TaggedError("ScanError")<{ * readonly reason: string * }> {} * * // Create a channel with numbers * const numbersChannel = Channel.fromIterable([1, 2, 3, 4]) * * // Effectful scan with async operations * const asyncScanChannel = Channel.scanEffect( * numbersChannel, * "", * (acc, value) => * Effect.gen(function*() { * // Simulate async work * yield* Effect.sleep("10 millis") * return acc + value.toString() * }) * ) * // Outputs: "", "1", "12", "123", "1234" * * // Scan with error handling * const errorHandlingScan = Channel.scanEffect( * numbersChannel, * 0, * (sum, n) => { * if (n < 0) { * return Effect.fail(new ScanError({ reason: "negative number" })) * } * return Effect.succeed(sum + n) * } * ) * ``` * * @since 2.0.0 * @category Sequencing */ export const scanEffect: { /** * Statefully transforms a channel by scanning over its output with an effectful accumulator function. * Emits the intermediate results of the scan operation. * * @example * ```ts * import { Channel, Data, Effect } from "effect" * * class ScanError extends Data.TaggedError("ScanError")<{ * readonly reason: string * }> {} * * // Create a channel with numbers * const numbersChannel = Channel.fromIterable([1, 2, 3, 4]) * * // Effectful scan with async operations * const asyncScanChannel = Channel.scanEffect( * numbersChannel, * "", * (acc, value) => * Effect.gen(function*() { * // Simulate async work * yield* Effect.sleep("10 millis") * return acc + value.toString() * }) * ) * // Outputs: "", "1", "12", "123", "1234" * * // Scan with error handling * const errorHandlingScan = Channel.scanEffect( * numbersChannel, * 0, * (sum, n) => { * if (n < 0) { * return Effect.fail(new ScanError({ reason: "negative number" })) * } * return Effect.succeed(sum + n) * } * ) * ``` * * @since 2.0.0 * @category Sequencing */ (initial: S, f: (s: S, a: Types.NoInfer) => Effect.Effect): < OutErr, OutDone, InElem, InErr, InDone, Env >(self: Channel) => Channel< S, OutErr | E, OutDone, InElem, InErr, InDone, Env | R > /** * Statefully transforms a channel by scanning over its output with an effectful accumulator function. * Emits the intermediate results of the scan operation. * * @example * ```ts * import { Channel, Data, Effect } from "effect" * * class ScanError extends Data.TaggedError("ScanError")<{ * readonly reason: string * }> {} * * // Create a channel with numbers * const numbersChannel = Channel.fromIterable([1, 2, 3, 4]) * * // Effectful scan with async operations * const asyncScanChannel = Channel.scanEffect( * numbersChannel, * "", * (acc, value) => * Effect.gen(function*() { * // Simulate async work * yield* Effect.sleep("10 millis") * return acc + value.toString() * }) * ) * // Outputs: "", "1", "12", "123", "1234" * * // Scan with error handling * const errorHandlingScan = Channel.scanEffect( * numbersChannel, * 0, * (sum, n) => { * if (n < 0) { * return Effect.fail(new ScanError({ reason: "negative number" })) * } * return Effect.succeed(sum + n) * } * ) * ``` * * @since 2.0.0 * @category Sequencing */ ( self: Channel, initial: S, f: (s: S, a: Types.NoInfer) => Effect.Effect ): Channel } = dual(3, ( self: Channel, initial: S, f: (s: S, a: Types.NoInfer) => Effect.Effect ): Channel => fromTransform((upstream, scope) => Effect.map(toTransform(self)(upstream, scope), (pull) => { let state = initial let isFirst = true return Effect.suspend(() => { if (isFirst) { isFirst = false return Effect.succeed(state) } return Effect.map( Effect.flatMap(pull, (a) => f(state, a)), (newState) => { state = newState return state } ) }) }) )) /** * Catches any cause of failure from the channel and allows recovery by * creating a new channel based on the caught cause. * * @example * ```ts * import { Cause, Channel, Data } from "effect" * * class ProcessError extends Data.TaggedError("ProcessError")<{ * readonly reason: string * }> {} * * class RecoveryError extends Data.TaggedError("RecoveryError")<{ * readonly message: string * }> {} * * // Create a failing channel * const failingChannel = Channel.fail( * new ProcessError({ reason: "network error" }) * ) * * // Catch the cause and provide recovery * const recoveredChannel = Channel.catchCause(failingChannel, (cause) => { * if (Cause.hasFails(cause)) { * return Channel.succeed("Recovered from failure") * } * return Channel.succeed("Recovered from interruption") * }) * * // The channel recovers gracefully from errors * ``` * * @since 4.0.0 * @category Error handling */ export const catchCause: { /** * Catches any cause of failure from the channel and allows recovery by * creating a new channel based on the caught cause. * * @example * ```ts * import { Cause, Channel, Data } from "effect" * * class ProcessError extends Data.TaggedError("ProcessError")<{ * readonly reason: string * }> {} * * class RecoveryError extends Data.TaggedError("RecoveryError")<{ * readonly message: string * }> {} * * // Create a failing channel * const failingChannel = Channel.fail( * new ProcessError({ reason: "network error" }) * ) * * // Catch the cause and provide recovery * const recoveredChannel = Channel.catchCause(failingChannel, (cause) => { * if (Cause.hasFails(cause)) { * return Channel.succeed("Recovered from failure") * } * return Channel.succeed("Recovered from interruption") * }) * * // The channel recovers gracefully from errors * ``` * * @since 4.0.0 * @category Error handling */ ( f: (d: Cause.Cause) => Channel ): < OutElem, OutDone, InElem, InErr, InDone, Env >(self: Channel) => Channel< OutElem | OutElem1, OutErr1, OutDone | OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env | Env1 > /** * Catches any cause of failure from the channel and allows recovery by * creating a new channel based on the caught cause. * * @example * ```ts * import { Cause, Channel, Data } from "effect" * * class ProcessError extends Data.TaggedError("ProcessError")<{ * readonly reason: string * }> {} * * class RecoveryError extends Data.TaggedError("RecoveryError")<{ * readonly message: string * }> {} * * // Create a failing channel * const failingChannel = Channel.fail( * new ProcessError({ reason: "network error" }) * ) * * // Catch the cause and provide recovery * const recoveredChannel = Channel.catchCause(failingChannel, (cause) => { * if (Cause.hasFails(cause)) { * return Channel.succeed("Recovered from failure") * } * return Channel.succeed("Recovered from interruption") * }) * * // The channel recovers gracefully from errors * ``` * * @since 4.0.0 * @category Error handling */ < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( self: Channel, f: (d: Cause.Cause) => Channel ): Channel< OutElem | OutElem1, OutErr1, OutDone | OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env | Env1 > } = dual(2, < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( self: Channel, f: (d: Cause.Cause) => Channel ): Channel< OutElem | OutElem1, OutErr1, OutDone | OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env | Env1 > => fromTransform((upstream, scope) => { let forkedScope = Scope.forkUnsafe(scope) return Effect.map(toTransform(self)(upstream, forkedScope), (pull) => { let currentPull: Pull.Pull = pull.pipe( Effect.catchCause((cause): Pull.Pull => { if (Pull.isDoneCause(cause)) { return Effect.failCause(cause as Cause.Cause>) } const toClose = forkedScope forkedScope = Scope.forkUnsafe(scope) return Scope.close(toClose, Exit.failCause(cause)).pipe( Effect.andThen(toTransform(f(cause as Cause.Cause))(upstream, forkedScope)), Effect.flatMap((childPull) => { currentPull = childPull return childPull }) ) }) ) return Effect.suspend(() => currentPull) }) })) /** * @since 4.0.0 * @category Error handling */ export const tapCause: { /** * @since 4.0.0 * @category Error handling */ (f: (d: Cause.Cause) => Effect.Effect): < OutElem, OutDone, InElem, InErr, InDone, Env >(self: Channel) => Channel< OutElem, OutErr | E, OutDone | void, InElem, InErr, InDone, Env | R > /** * @since 4.0.0 * @category Error handling */ < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, A, E, R >( self: Channel, f: (d: Cause.Cause) => Effect.Effect ): Channel< OutElem, OutErr | E, OutDone | void, InElem, InErr, InDone, Env | R > } = dual(2, < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, A, E, R >( self: Channel, f: (d: Cause.Cause) => Effect.Effect ): Channel< OutElem, OutErr | E, OutDone | void, InElem, InErr, InDone, Env | R > => catchCause(self, (cause) => fromEffectDrain(Effect.flatMap(f(cause), (_) => Effect.failCause(cause))))) /** * Catches causes of failure that match a specific filter, allowing * conditional error recovery based on the type of failure. * * @since 4.0.0 * @category Error handling */ export const catchCauseIf: { /** * Catches causes of failure that match a specific filter, allowing * conditional error recovery based on the type of failure. * * @since 4.0.0 * @category Error handling */ < OutErr, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( predicate: Predicate.Predicate>, f: (cause: Cause.Cause) => Channel ): < OutElem, OutDone, InElem, InErr, InDone, Env >(self: Channel) => Channel< OutElem | OutElem1, OutErr | OutErr1, OutDone | OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env | Env1 > /** * Catches causes of failure that match a specific filter, allowing * conditional error recovery based on the type of failure. * * @since 4.0.0 * @category Error handling */ < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( self: Channel, predicate: Predicate.Predicate>, f: (cause: Cause.Cause) => Channel ): Channel< OutElem | OutElem1, OutErr | OutErr1, OutDone | OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env | Env1 > } = dual(3, < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( self: Channel, predicate: Predicate.Predicate>, f: (cause: Cause.Cause) => Channel ): Channel< OutElem | OutElem1, OutErr | OutErr1, OutDone | OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env | Env1 > => catchCause( self, ( cause ): Channel< OutElem1, OutErr | OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 > => { return predicate(cause) ? f(cause) : failCause(cause as any) } )) /** * @since 4.0.0 * @category Error handling */ export const catchCauseFilter: { /** * @since 4.0.0 * @category Error handling */ < OutErr, EB, X extends Cause.Cause, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( filter: Filter.Filter, EB, X>, f: ( failure: EB, cause: Cause.Cause ) => Channel ): < OutElem, OutDone, InElem, InErr, InDone, Env >(self: Channel) => Channel< OutElem | OutElem1, Cause.Cause.Error | OutErr1, OutDone | OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env | Env1 > /** * @since 4.0.0 * @category Error handling */ < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, EB, X extends Cause.Cause, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( self: Channel, filter: Filter.Filter, EB, X>, f: ( failure: EB, cause: Cause.Cause ) => Channel ): Channel< OutElem | OutElem1, Cause.Cause.Error | OutErr1, OutDone | OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env | Env1 > } = dual(3, < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, EB, X extends Cause.Cause, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( self: Channel, filter: Filter.Filter, EB, X>, f: ( failure: EB, cause: Cause.Cause ) => Channel ): Channel< OutElem | OutElem1, Cause.Cause.Error | OutErr1, OutDone | OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env | Env1 > => catchCause( self, ( cause ): Channel< OutElem1, Cause.Cause.Error | OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 > => { const result = filter(cause) return Result.isFailure(result) ? failCause(result.failure) : f(result.success, cause) } )) const catch_: { ( f: (d: OutErr) => Channel ): < OutElem, OutDone, InElem, InErr, InDone, Env >(self: Channel) => Channel< OutElem | OutElem1, OutErr1, OutDone | OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env | Env1 > < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( self: Channel, f: (d: OutErr) => Channel ): Channel< OutElem | OutElem1, OutErr1, OutDone | OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env | Env1 > } = dual(2, < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( self: Channel, f: (d: OutErr) => Channel ): Channel< OutElem | OutElem1, OutErr1, OutDone | OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env | Env1 > => catchCauseFilter(self, Cause.findError, (e) => f(e))) export { /** * @since 4.0.0 * @category Error handling */ catch_ as catch } /** * @since 4.0.0 * @category Error handling */ export const tapError: { /** * @since 4.0.0 * @category Error handling */ (f: (d: OutErr) => Effect.Effect): < OutElem, OutDone, InElem, InErr, InDone, Env >(self: Channel) => Channel< OutElem, OutErr | E, OutDone | void, InElem, InErr, InDone, Env | R > /** * @since 4.0.0 * @category Error handling */ < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, A, E, R >( self: Channel, f: (d: OutErr) => Effect.Effect ): Channel< OutElem, OutErr | E, OutDone | void, InElem, InErr, InDone, Env | R > } = dual(2, < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, A, E, R >( self: Channel, f: (d: OutErr) => Effect.Effect ): Channel< OutElem, OutErr | E, OutDone | void, InElem, InErr, InDone, Env | R > => transformPull( self, (pull) => Effect.succeed(Effect.tapError( pull, (err) => Cause.isDone(err) ? Effect.void : Effect.asVoid(f(err)) )) )) /** * @since 4.0.0 * @category Error handling */ export const catchIf: { /** * @since 4.0.0 * @category Error handling */ < OutErr, EB extends OutErr, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1, OutElem2 = never, OutErr2 = Exclude, OutDone2 = never, InElem2 = unknown, InErr2 = unknown, InDone2 = unknown, Env2 = never >( refinement: Predicate.Refinement, f: (failure: EB) => Channel, orElse?: | (( failure: Exclude ) => Channel) | undefined ): < OutElem, OutDone, InElem, InErr, InDone, Env >(self: Channel) => Channel< OutElem | OutElem1 | OutElem2, OutErr1 | OutErr2, OutDone | OutDone1 | OutDone2, InElem & InElem1 & InElem2, InErr & InErr1 & InErr2, InDone & InDone1 & InDone2, Env | Env1 | Env2 > /** * @since 4.0.0 * @category Error handling */ < OutErr, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1, OutElem2 = never, OutErr2 = OutErr, OutDone2 = never, InElem2 = unknown, InErr2 = unknown, InDone2 = unknown, Env2 = never >( predicate: Predicate.Predicate, f: (failure: OutErr) => Channel, orElse?: | (( failure: OutErr ) => Channel) | undefined ): < OutElem, OutDone, InElem, InErr, InDone, Env >(self: Channel) => Channel< OutElem | OutElem1 | OutElem2, OutErr1 | OutErr2, OutDone | OutDone1 | OutDone2, InElem & InElem1 & InElem2, InErr & InErr1 & InErr2, InDone & InDone1 & InDone2, Env | Env1 | Env2 > /** * @since 4.0.0 * @category Error handling */ < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, EB extends OutErr, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1, OutElem2 = never, OutErr2 = Exclude, OutDone2 = never, InElem2 = unknown, InErr2 = unknown, InDone2 = unknown, Env2 = never >( self: Channel, refinement: Predicate.Refinement, f: (failure: EB) => Channel, orElse?: | (( failure: Exclude ) => Channel) | undefined ): Channel< OutElem | OutElem1 | OutElem2, OutErr1 | OutErr2, OutDone | OutDone1 | OutDone2, InElem & InElem1 & InElem2, InErr & InErr1 & InErr2, InDone & InDone1 & InDone2, Env | Env1 | Env2 > /** * @since 4.0.0 * @category Error handling */ < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1, OutElem2 = never, OutErr2 = OutErr, OutDone2 = never, InElem2 = unknown, InErr2 = unknown, InDone2 = unknown, Env2 = never >( self: Channel, predicate: Predicate.Predicate, f: (failure: OutErr) => Channel, orElse?: | (( failure: OutErr ) => Channel) | undefined ): Channel< OutElem | OutElem1 | OutElem2, OutErr1 | OutErr2, OutDone | OutDone1 | OutDone2, InElem & InElem1 & InElem2, InErr & InErr1 & InErr2, InDone & InDone1 & InDone2, Env | Env1 | Env2 > } = dual((args) => isChannel(args[0]), < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1, OutElem2 = never, OutErr2 = OutErr, OutDone2 = never, InElem2 = unknown, InErr2 = unknown, InDone2 = unknown, Env2 = never >( self: Channel, predicate: Predicate.Predicate, f: (failure: OutErr) => Channel, orElse?: | (( failure: OutErr ) => Channel) | undefined ): Channel< OutElem | OutElem1 | OutElem2, OutErr1 | OutErr2, OutDone | OutDone1 | OutDone2, InElem & InElem1 & InElem2, InErr & InErr1 & InErr2, InDone & InDone1 & InDone2, Env | Env1 | Env2 > => catch_( self, (err): Channel< OutElem1 | OutElem2, OutErr1 | OutErr2, OutDone1 | OutDone2, InElem1 & InElem2, InErr1 & InErr2, InDone1 & InDone2, Env1 | Env2 > => { return predicate(err) ? f(err) : orElse ? orElse(err) : fail(err as any) as any } )) /** * @since 4.0.0 * @category Error handling */ export const catchFilter: { /** * @since 4.0.0 * @category Error handling */ < OutErr, EB, X, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1, OutElem2 = never, OutErr2 = X, OutDone2 = never, InElem2 = unknown, InErr2 = unknown, InDone2 = unknown, Env2 = never >( filter: Filter.Filter, f: (failure: EB) => Channel, orElse?: | (( failure: X ) => Channel) | undefined ): < OutElem, OutDone, InElem, InErr, InDone, Env >(self: Channel) => Channel< OutElem | OutElem1 | OutElem2, OutErr1 | OutErr2, OutDone | OutDone1 | OutDone2, InElem & InElem1 & InElem2, InErr & InErr1 & InErr2, InDone & InDone1 & InDone2, Env | Env1 | Env2 > /** * @since 4.0.0 * @category Error handling */ < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, EB, X, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1, OutElem2 = never, OutErr2 = X, OutDone2 = never, InElem2 = unknown, InErr2 = unknown, InDone2 = unknown, Env2 = never >( self: Channel, filter: Filter.Filter, f: (failure: EB) => Channel, orElse?: | (( failure: X ) => Channel) | undefined ): Channel< OutElem | OutElem1 | OutElem2, OutErr1 | OutErr2, OutDone | OutDone1 | OutDone2, InElem & InElem1 & InElem2, InErr & InErr1 & InErr2, InDone & InDone1 & InDone2, Env | Env1 | Env2 > } = dual((args) => isChannel(args[0]), < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, EB, X, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1, OutElem2 = never, OutErr2 = X, OutDone2 = never, InElem2 = unknown, InErr2 = unknown, InDone2 = unknown, Env2 = never >( self: Channel, filter: Filter.Filter, f: (failure: EB) => Channel, orElse?: | (( failure: X ) => Channel) | undefined ): Channel< OutElem | OutElem1 | OutElem2, OutErr1 | OutErr2, OutDone | OutDone1 | OutDone2, InElem & InElem1 & InElem2, InErr & InErr1 & InErr2, InDone & InDone1 & InDone2, Env | Env1 | Env2 > => catch_( self, (err): Channel< OutElem1 | OutElem2, OutErr1 | OutErr2, OutDone1 | OutDone2, InElem1 & InElem2, InErr1 & InErr2, InDone1 & InDone2, Env1 | Env2 > => { const result = filter(err) return Result.isFailure(result) ? orElse ? orElse(result.failure) : fail(result.failure as any) as any : f(result.success) } )) /** * @since 4.0.0 * @category Error handling */ export const catchTag: { /** * @since 4.0.0 * @category Error handling */ < OutErr, const K extends Types.Tags | Arr.NonEmptyReadonlyArray>, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1, OutElem2 = never, OutErr2 = Types.ExcludeTag ? K[number] : K>, OutDone2 = never, InElem2 = unknown, InErr2 = unknown, InDone2 = unknown, Env2 = never >( k: K, f: ( e: Types.ExtractTag, K extends Arr.NonEmptyReadonlyArray ? K[number] : K> ) => Channel, orElse?: | (( e: Types.ExcludeTag, K extends Arr.NonEmptyReadonlyArray ? K[number] : K> ) => Channel) | undefined ): < OutElem, OutDone, InElem, InErr, InDone, Env >(self: Channel) => Channel< OutElem | OutElem1 | OutElem2, OutErr1 | OutErr2, OutDone | OutDone1 | OutDone2, InElem & InElem1 & InElem2, InErr & InErr1 & InErr2, InDone & InDone1 & InDone2, Env | Env1 | Env2 > /** * @since 4.0.0 * @category Error handling */ < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, const K extends Types.Tags | Arr.NonEmptyReadonlyArray>, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1, OutElem2 = never, OutErr2 = Types.ExcludeTag ? K[number] : K>, OutDone2 = never, InElem2 = unknown, InErr2 = unknown, InDone2 = unknown, Env2 = never >( self: Channel, k: K, f: ( e: Types.ExtractTag, K extends Arr.NonEmptyReadonlyArray ? K[number] : K> ) => Channel, orElse?: | (( e: Types.ExcludeTag, K extends Arr.NonEmptyReadonlyArray ? K[number] : K> ) => Channel) | undefined ): Channel< OutElem | OutElem1 | OutElem2, OutErr1 | OutErr2, OutDone | OutDone1 | OutDone2, InElem & InElem1 & InElem2, InErr & InErr1 & InErr2, InDone & InDone1 & InDone2, Env | Env1 | Env2 > } = dual((args) => isChannel(args[0]), < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, const K extends Types.Tags | Arr.NonEmptyReadonlyArray>, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1, OutElem2 = never, OutErr2 = Types.ExcludeTag ? K[number] : K>, OutDone2 = never, InElem2 = unknown, InErr2 = unknown, InDone2 = unknown, Env2 = never >( self: Channel, k: K, f: ( e: Types.ExtractTag, K extends Arr.NonEmptyReadonlyArray ? K[number] : K> ) => Channel, orElse?: | (( e: Types.ExcludeTag, K extends Arr.NonEmptyReadonlyArray ? K[number] : K> ) => Channel) | undefined ): Channel< OutElem | OutElem1 | OutElem2, OutErr1 | OutErr2, OutDone | OutDone1 | OutDone2, InElem & InElem1 & InElem2, InErr & InErr1 & InErr2, InDone & InDone1 & InDone2, Env | Env1 | Env2 > => { const pred = Array.isArray(k) ? ((e: OutErr): e is any => hasProperty(e, "_tag") && k.includes(e._tag)) : isTagged(k as string) return catchIf(self, pred, f, orElse as any) as any }) /** * Catches a specific reason within a tagged error. * * @example * ```ts * import { Channel, Data } from "effect" * * class RateLimitError extends Data.TaggedError("RateLimitError")<{ * retryAfter: number * }> {} * * class QuotaExceededError extends Data.TaggedError("QuotaExceededError")<{ * limit: number * }> {} * * class AiError extends Data.TaggedError("AiError")<{ * reason: RateLimitError | QuotaExceededError * }> {} * * const channel = Channel.fail( * new AiError({ reason: new RateLimitError({ retryAfter: 60 }) }) * ) * * const recovered = channel.pipe( * Channel.catchReason("AiError", "RateLimitError", (reason) => * Channel.succeed(`retry: ${reason.retryAfter}`) * ) * ) * ``` * * @since 4.0.0 * @category Error handling */ export const catchReason: { /** * Catches a specific reason within a tagged error. * * @example * ```ts * import { Channel, Data } from "effect" * * class RateLimitError extends Data.TaggedError("RateLimitError")<{ * retryAfter: number * }> {} * * class QuotaExceededError extends Data.TaggedError("QuotaExceededError")<{ * limit: number * }> {} * * class AiError extends Data.TaggedError("AiError")<{ * reason: RateLimitError | QuotaExceededError * }> {} * * const channel = Channel.fail( * new AiError({ reason: new RateLimitError({ retryAfter: 60 }) }) * ) * * const recovered = channel.pipe( * Channel.catchReason("AiError", "RateLimitError", (reason) => * Channel.succeed(`retry: ${reason.retryAfter}`) * ) * ) * ``` * * @since 4.0.0 * @category Error handling */ < OutErr, K extends Types.Tags, RK extends Types.ReasonTags, K>>, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1, OutElem2 = Types.unassigned, OutErr2 = never, OutDone2 = never, InElem2 = unknown, InErr2 = unknown, InDone2 = unknown, Env2 = never >( errorTag: K, reasonTag: RK, f: ( reason: Types.ExtractReason, K>, RK>, error: Types.NarrowReason, K>, RK> ) => Channel, orElse?: | (( reason: Types.ExcludeReason, K>, RK>, error: Types.OmitReason, K>, RK> ) => Channel) | undefined ): < OutElem, OutDone, InElem, InErr, InDone, Env >( self: Channel ) => Channel< OutElem | OutElem1 | Exclude, (OutElem2 extends Types.unassigned ? OutErr : Types.ExcludeTag) | OutErr1 | OutErr2, OutDone | OutDone1 | OutDone2, InElem & InElem1 & InElem2, InErr & InErr1 & InErr2, InDone & InDone1 & InDone2, Env | Env1 | Env2 > /** * Catches a specific reason within a tagged error. * * @example * ```ts * import { Channel, Data } from "effect" * * class RateLimitError extends Data.TaggedError("RateLimitError")<{ * retryAfter: number * }> {} * * class QuotaExceededError extends Data.TaggedError("QuotaExceededError")<{ * limit: number * }> {} * * class AiError extends Data.TaggedError("AiError")<{ * reason: RateLimitError | QuotaExceededError * }> {} * * const channel = Channel.fail( * new AiError({ reason: new RateLimitError({ retryAfter: 60 }) }) * ) * * const recovered = channel.pipe( * Channel.catchReason("AiError", "RateLimitError", (reason) => * Channel.succeed(`retry: ${reason.retryAfter}`) * ) * ) * ``` * * @since 4.0.0 * @category Error handling */ < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, K extends Types.Tags, RK extends Types.ReasonTags, K>>, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1, OutElem2 = Types.unassigned, OutErr2 = never, OutDone2 = never, InElem2 = unknown, InErr2 = unknown, InDone2 = unknown, Env2 = never >( self: Channel, errorTag: K, reasonTag: RK, f: ( reason: Types.ExtractReason, K>, RK>, error: Types.NarrowReason, K>, RK> ) => Channel, orElse?: | (( reason: Types.ExcludeReason, K>, RK>, error: Types.OmitReason, K>, RK> ) => Channel) | undefined ): Channel< OutElem | OutElem1 | Exclude, (OutElem2 extends Types.unassigned ? OutErr : Types.ExcludeTag) | OutErr1 | OutErr2, OutDone | OutDone1 | OutDone2, InElem & InElem1 & InElem2, InErr & InErr1 & InErr2, InDone & InDone1 & InDone2, Env | Env1 | Env2 > } = dual((args) => isChannel(args[0]), < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, K extends Types.Tags, RK extends Types.ReasonTags, K>>, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1, OutElem2 = Types.unassigned, OutErr2 = never, OutDone2 = never, InElem2 = unknown, InErr2 = unknown, InDone2 = unknown, Env2 = never >( self: Channel, errorTag: K, reasonTag: RK, f: ( reason: Types.ExtractReason, K>, RK>, error: Types.NarrowReason, K>, RK> ) => Channel, orElse?: | (( reason: Types.ExcludeReason, K>, RK>, error: Types.OmitReason, K>, RK> ) => Channel) | undefined ): Channel< OutElem | OutElem1 | Exclude, (OutElem2 extends Types.unassigned ? OutErr : Types.ExcludeTag) | OutErr1 | OutErr2, OutDone | OutDone1 | OutDone2, InElem & InElem1 & InElem2, InErr & InErr1 & InErr2, InDone & InDone1 & InDone2, Env | Env1 | Env2 > => catch_( self, (error): Channel< OutElem1 | Exclude, OutErr1 | OutErr2, OutDone1 | OutDone2, InElem1 & InElem2, InErr1 & InErr2, InDone1 & InDone2, Env1 | Env2 > => { if (isTagged(error, errorTag) && hasProperty(error, "reason")) { const reason = error.reason as Types.ExcludeReason, K>, RK> if (isTagged(reason, reasonTag)) { return f(reason as any, error as any) } return orElse ? orElse(reason, error as any) as any : fail(error) as any } return fail(error) as any } )) /** * Catches multiple reasons within a tagged error using an object of handlers. * * @since 4.0.0 * @category Error handling */ export const catchReasons: { /** * Catches multiple reasons within a tagged error using an object of handlers. * * @since 4.0.0 * @category Error handling */ < K extends Types.Tags, OutErr, Cases extends { [RK in Types.ReasonTags, K>>]+?: ( reason: Types.ExtractReason, K>, RK>, error: Types.NarrowReason, K>, RK> ) => Channel }, OutElem2 = Types.unassigned, OutErr2 = never, OutDone2 = never, InElem2 = unknown, InErr2 = unknown, InDone2 = unknown, Env2 = never >( errorTag: K, cases: Cases, orElse?: | (( reason: Types.ExcludeReason, K>, Extract>, error: Types.OmitReason, K>, Extract> ) => Channel) | undefined ): ( self: Channel ) => Channel< | OutElem | Exclude | { [RK in keyof Cases]: Cases[RK] extends (...args: Array) => Channel ? OutElem1 : never }[keyof Cases], | (OutElem2 extends Types.unassigned ? OutErr : Types.ExcludeTag) | OutErr2 | { [RK in keyof Cases]: Cases[RK] extends (...args: Array) => Channel ? OutErr1 : never }[keyof Cases], | OutDone | OutDone2 | { [RK in keyof Cases]: Cases[RK] extends (...args: Array) => Channel ? OutDone1 : never }[keyof Cases], & InElem & InElem2 & { [RK in keyof Cases]: Cases[RK] extends (...args: Array) => Channel ? InElem1 : never }[keyof Cases], & InErr & InErr2 & { [RK in keyof Cases]: Cases[RK] extends (...args: Array) => Channel ? InErr1 : never }[keyof Cases], & InDone & InDone2 & { [RK in keyof Cases]: Cases[RK] extends (...args: Array) => Channel ? InDone1 : never }[keyof Cases], | Env | Env2 | { [RK in keyof Cases]: Cases[RK] extends (...args: Array) => Channel ? Env1 : never }[keyof Cases] > /** * Catches multiple reasons within a tagged error using an object of handlers. * * @since 4.0.0 * @category Error handling */ < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, K extends Types.Tags, Cases extends { [RK in Types.ReasonTags>]+?: ( reason: Types.ExtractReason, RK>, error: Types.NarrowReason, RK> ) => Channel }, OutElem2 = Types.unassigned, OutErr2 = never, OutDone2 = never, InElem2 = unknown, InErr2 = unknown, InDone2 = unknown, Env2 = never >( self: Channel, errorTag: K, cases: Cases, orElse?: | (( reason: Types.ExcludeReason, K>, Extract>, error: Types.OmitReason, K>, Extract> ) => Channel) | undefined ): Channel< | OutElem | Exclude | { [RK in keyof Cases]: Cases[RK] extends (...args: Array) => Channel ? OutElem1 : never }[keyof Cases], | (OutElem2 extends Types.unassigned ? OutErr : Types.ExcludeTag) | OutErr2 | { [RK in keyof Cases]: Cases[RK] extends (...args: Array) => Channel ? OutErr1 : never }[keyof Cases], | OutDone | OutDone2 | { [RK in keyof Cases]: Cases[RK] extends (...args: Array) => Channel ? OutDone1 : never }[keyof Cases], & InElem & InElem2 & { [RK in keyof Cases]: Cases[RK] extends (...args: Array) => Channel ? InElem1 : never }[keyof Cases], & InErr & InErr2 & { [RK in keyof Cases]: Cases[RK] extends (...args: Array) => Channel ? InErr1 : never }[keyof Cases], & InDone & InDone2 & { [RK in keyof Cases]: Cases[RK] extends (...args: Array) => Channel ? InDone1 : never }[keyof Cases], | Env | Env2 | { [RK in keyof Cases]: Cases[RK] extends (...args: Array) => Channel ? Env1 : never }[keyof Cases] > } = dual((args) => isChannel(args[0]), (self, errorTag, cases, orElse) => { let keys: Set return catch_(self, (error) => { if ( isTagged(error, errorTag) && hasProperty(error, "reason") && hasProperty(error.reason, "_tag") && String.isString(error.reason._tag) ) { const reason = error.reason as { readonly _tag: string } keys ??= new Set(Object.keys(cases)) if (keys.has(reason._tag)) { return (cases as any)[reason._tag](reason as any, error) } return orElse ? orElse(reason, error) as any : fail(error) as any } return fail(error) as any }) }) /** * Promotes nested reason errors into the channel error, replacing the parent error. * * @example * ```ts * import { Channel, Data } from "effect" * * class RateLimitError extends Data.TaggedError("RateLimitError")<{ * retryAfter: number * }> {} * * class QuotaExceededError extends Data.TaggedError("QuotaExceededError")<{ * limit: number * }> {} * * class AiError extends Data.TaggedError("AiError")<{ * reason: RateLimitError | QuotaExceededError * }> {} * * const channel = Channel.fail( * new AiError({ reason: new RateLimitError({ retryAfter: 60 }) }) * ) * * const unwrapped = channel.pipe(Channel.unwrapReason("AiError")) * ``` * * @since 4.0.0 * @category Error handling */ export const unwrapReason: { /** * Promotes nested reason errors into the channel error, replacing the parent error. * * @example * ```ts * import { Channel, Data } from "effect" * * class RateLimitError extends Data.TaggedError("RateLimitError")<{ * retryAfter: number * }> {} * * class QuotaExceededError extends Data.TaggedError("QuotaExceededError")<{ * limit: number * }> {} * * class AiError extends Data.TaggedError("AiError")<{ * reason: RateLimitError | QuotaExceededError * }> {} * * const channel = Channel.fail( * new AiError({ reason: new RateLimitError({ retryAfter: 60 }) }) * ) * * const unwrapped = channel.pipe(Channel.unwrapReason("AiError")) * ``` * * @since 4.0.0 * @category Error handling */ < K extends TagsWithReason, OutErr >(errorTag: K): ( self: Channel ) => Channel< OutElem, Types.ExcludeTag | Types.ReasonOf>, OutDone, InElem, InErr, InDone, Env > /** * Promotes nested reason errors into the channel error, replacing the parent error. * * @example * ```ts * import { Channel, Data } from "effect" * * class RateLimitError extends Data.TaggedError("RateLimitError")<{ * retryAfter: number * }> {} * * class QuotaExceededError extends Data.TaggedError("QuotaExceededError")<{ * limit: number * }> {} * * class AiError extends Data.TaggedError("AiError")<{ * reason: RateLimitError | QuotaExceededError * }> {} * * const channel = Channel.fail( * new AiError({ reason: new RateLimitError({ retryAfter: 60 }) }) * ) * * const unwrapped = channel.pipe(Channel.unwrapReason("AiError")) * ``` * * @since 4.0.0 * @category Error handling */ < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, K extends TagsWithReason >( self: Channel, errorTag: K ): Channel< OutElem, Types.ExcludeTag | Types.ReasonOf>, OutDone, InElem, InErr, InDone, Env > } = dual(2, < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, K extends TagsWithReason >( self: Channel, errorTag: K ): Channel< OutElem, Types.ExcludeTag | Types.ReasonOf>, OutDone, InElem, InErr, InDone, Env > => catchFilter( self, (error) => isTagged(error, errorTag) && hasProperty(error, "reason") ? Result.succeed(error.reason) : Result.fail(error), fail ) as any) /** * Returns a new channel, which is the same as this one, except the failure * value of the returned channel is created by applying the specified function * to the failure value of this channel. * * @since 2.0.0 * @category Error handling */ export const mapError: { /** * Returns a new channel, which is the same as this one, except the failure * value of the returned channel is created by applying the specified function * to the failure value of this channel. * * @since 2.0.0 * @category Error handling */ (f: (err: OutErr) => OutErr2): ( self: Channel ) => Channel /** * Returns a new channel, which is the same as this one, except the failure * value of the returned channel is created by applying the specified function * to the failure value of this channel. * * @since 2.0.0 * @category Error handling */ ( self: Channel, f: (err: OutErr) => OutErr2 ): Channel } = dual(2, ( self: Channel, f: (err: OutErr) => OutErr2 ): Channel => catch_(self, (err) => fail(f(err)))) /** * Converts all errors in the channel to defects (unrecoverable failures). * This is useful when you want to treat errors as programming errors. * * @example * ```ts * import { Channel, Data } from "effect" * * class ValidationError extends Data.TaggedError("ValidationError")<{ * readonly field: string * }> {} * * // Create a channel that might fail * const failingChannel = Channel.fail(new ValidationError({ field: "email" })) * * // Convert failures to defects * const fatalChannel = Channel.orDie(failingChannel) * * // Any failure will now become a defect (uncaught exception) * ``` * * @since 4.0.0 * @category Error handling */ export const orDie = < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env >( self: Channel ): Channel => catch_(self, die) /** * Ignores all errors in the channel, converting them to an empty channel. * * Use the `log` option to emit the full {@link Cause} when the channel fails. * * @since 4.0.0 * @category Error handling */ export const ignore: < Arg extends Channel | { readonly log?: boolean | Severity | undefined } | undefined = { readonly log?: boolean | Severity | undefined } >( selfOrOptions: Arg, options?: { readonly log?: boolean | Severity | undefined } | undefined ) => [Arg] extends [Channel] ? Channel : ( self: Channel ) => Channel = dual( (args) => isChannel(args[0]), ( self: Channel, options?: { readonly log?: boolean | Severity | undefined } | undefined ): Channel => { if (!options?.log) { return catch_(self, () => empty) } const logEffect = Effect.logWithLevel(options.log === true ? undefined : options.log) return catch_( tapCause(self, (cause) => Cause.hasFails(cause) ? logEffect(cause) : Effect.void), () => empty ) } ) const ignoreCause_ = < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env >( self: Channel ): Channel => catchCause(self, () => empty) /** * Ignores all errors in the channel including defects, converting them to an empty channel. * * Use the `log` option to emit the full {@link Cause} when the channel fails. * * @since 4.0.0 * @category Error handling */ export const ignoreCause: < Arg extends Channel | { readonly log?: boolean | Severity | undefined } | undefined = { readonly log?: boolean | Severity | undefined } >( selfOrOptions: Arg, options?: { readonly log?: boolean | Severity | undefined } | undefined ) => [Arg] extends [Channel] ? Channel : ( self: Channel ) => Channel = dual( (args) => isChannel(args[0]), ( self: Channel, options?: { readonly log?: boolean | Severity | undefined } | undefined ): Channel => { if (!options?.log) return ignoreCause_(self) const logEffect = Effect.logWithLevel(options.log === true ? undefined : options.log) return ignoreCause_(tapCause(self, (cause) => logEffect(cause))) } ) /** * Returns a new channel that retries this channel according to the specified * schedule whenever it fails. * * @since 4.0.0 * @category utils */ export const retry: { /** * Returns a new channel that retries this channel according to the specified * schedule whenever it fails. * * @since 4.0.0 * @category utils */ ( schedule: | Schedule.Schedule, SE, SR> | (( $: ( _: Schedule.Schedule, SE, SR> ) => Schedule.Schedule ) => Schedule.Schedule, SE, SR>) ): ( self: Channel ) => Channel /** * Returns a new channel that retries this channel according to the specified * schedule whenever it fails. * * @since 4.0.0 * @category utils */ ( self: Channel, schedule: | Schedule.Schedule | (( $: ( _: Schedule.Schedule, SE, SR> ) => Schedule.Schedule ) => Schedule.Schedule, SE, SR>) ): Channel } = dual(2, ( self: Channel, schedule: | Schedule.Schedule | (( $: (_: Schedule.Schedule, SE, R>) => Schedule.Schedule ) => Schedule.Schedule, SE, SR>) ): Channel => suspend(() => { let step: ((input: OutErr) => Pull.Pull, SE, SO, SR>) | undefined = undefined let meta = Schedule.CurrentMetadata.defaultValue() const selfWithMeta = provideServiceEffect(self, Schedule.CurrentMetadata, Effect.sync(() => meta)) const withReset = onFirst(selfWithMeta, () => { step = undefined return Effect.void }) const resolvedSchedule = typeof schedule === "function" ? schedule(identity_) : schedule const loop: Channel< OutElem, OutErr | SE, OutDone, InElem, InErr, InDone, Env | SR > = catch_( withReset, Effect.fnUntraced( function*(error) { if (!step) { step = yield* Schedule.toStepWithMetadata(resolvedSchedule) } meta = yield* step(error) return loop }, (effect, error) => Pull.catchDone(effect, () => Effect.succeed(fail(error))), unwrap ) ) return loop })) /** * Returns a new channel, which sequentially combines this channel, together * with the provided factory function, which creates a second channel based on * the output values of this channel. The result is a channel that will first * perform the functions of this channel, before performing the functions of * the created channel (including yielding its terminal value). * * @example * ```ts * import { Channel, Data } from "effect" * * class SwitchError extends Data.TaggedError("SwitchError")<{ * readonly reason: string * }> {} * * // Create a channel that outputs numbers * const numberChannel = Channel.fromIterable([1, 2, 3]) * * // Switch to new channels based on each value * const switchedChannel = Channel.switchMap( * numberChannel, * (n) => Channel.fromIterable([`value-${n}`]) * ) * * // Outputs: "value-1", "value-2", "value-3" * ``` * * @since 2.0.0 * @category sequencing */ export const switchMap: { /** * Returns a new channel, which sequentially combines this channel, together * with the provided factory function, which creates a second channel based on * the output values of this channel. The result is a channel that will first * perform the functions of this channel, before performing the functions of * the created channel (including yielding its terminal value). * * @example * ```ts * import { Channel, Data } from "effect" * * class SwitchError extends Data.TaggedError("SwitchError")<{ * readonly reason: string * }> {} * * // Create a channel that outputs numbers * const numberChannel = Channel.fromIterable([1, 2, 3]) * * // Switch to new channels based on each value * const switchedChannel = Channel.switchMap( * numberChannel, * (n) => Channel.fromIterable([`value-${n}`]) * ) * * // Outputs: "value-1", "value-2", "value-3" * ``` * * @since 2.0.0 * @category sequencing */ ( f: (d: OutElem) => Channel, options?: { readonly concurrency?: number | "unbounded" | undefined readonly bufferSize?: number | undefined } ): ( self: Channel ) => Channel< OutElem1, OutErr1 | OutErr, OutDone, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env1 | Env > /** * Returns a new channel, which sequentially combines this channel, together * with the provided factory function, which creates a second channel based on * the output values of this channel. The result is a channel that will first * perform the functions of this channel, before performing the functions of * the created channel (including yielding its terminal value). * * @example * ```ts * import { Channel, Data } from "effect" * * class SwitchError extends Data.TaggedError("SwitchError")<{ * readonly reason: string * }> {} * * // Create a channel that outputs numbers * const numberChannel = Channel.fromIterable([1, 2, 3]) * * // Switch to new channels based on each value * const switchedChannel = Channel.switchMap( * numberChannel, * (n) => Channel.fromIterable([`value-${n}`]) * ) * * // Outputs: "value-1", "value-2", "value-3" * ``` * * @since 2.0.0 * @category sequencing */ < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( self: Channel, f: (d: OutElem) => Channel, options?: { readonly concurrency?: number | "unbounded" | undefined readonly bufferSize?: number | undefined } ): Channel< OutElem1, OutErr | OutErr1, OutDone, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env | Env1 > } = dual( (args) => isChannel(args[0]), < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( self: Channel, f: (d: OutElem) => Channel, options?: { readonly concurrency?: number | "unbounded" | undefined readonly bufferSize?: number | undefined } ): Channel< OutElem1, OutErr | OutErr1, OutDone, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env | Env1 > => self.pipe( map(f), mergeAll({ ...options, concurrency: options?.concurrency ?? 1, switch: true }) ) ) /** * Merges multiple channels with specified concurrency and buffering options. * * @example * ```ts * import { Channel, Data } from "effect" * * class MergeAllError extends Data.TaggedError("MergeAllError")<{ * readonly reason: string * }> {} * * // Create channels that output other channels * const nestedChannels = Channel.fromIterable([ * Channel.fromIterable([1, 2]), * Channel.fromIterable([3, 4]), * Channel.fromIterable([5, 6]) * ]) * * // Merge all channels with bounded concurrency * const mergedChannel = Channel.mergeAll({ * concurrency: 2, * bufferSize: 16 * })(nestedChannels) * * // Outputs: 1, 2, 3, 4, 5, 6 (order may vary due to concurrency) * ``` * * @since 2.0.0 * @category utils */ export const mergeAll: { /** * Merges multiple channels with specified concurrency and buffering options. * * @example * ```ts * import { Channel, Data } from "effect" * * class MergeAllError extends Data.TaggedError("MergeAllError")<{ * readonly reason: string * }> {} * * // Create channels that output other channels * const nestedChannels = Channel.fromIterable([ * Channel.fromIterable([1, 2]), * Channel.fromIterable([3, 4]), * Channel.fromIterable([5, 6]) * ]) * * // Merge all channels with bounded concurrency * const mergedChannel = Channel.mergeAll({ * concurrency: 2, * bufferSize: 16 * })(nestedChannels) * * // Outputs: 1, 2, 3, 4, 5, 6 (order may vary due to concurrency) * ``` * * @since 2.0.0 * @category utils */ ( options: { readonly concurrency: number | "unbounded" readonly bufferSize?: number | undefined readonly switch?: boolean | undefined } ): ( channels: Channel< Channel, OutErr, OutDone, InElem, InErr, InDone, Env > ) => Channel< OutElem, OutErr1 | OutErr, OutDone, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env1 | Env > /** * Merges multiple channels with specified concurrency and buffering options. * * @example * ```ts * import { Channel, Data } from "effect" * * class MergeAllError extends Data.TaggedError("MergeAllError")<{ * readonly reason: string * }> {} * * // Create channels that output other channels * const nestedChannels = Channel.fromIterable([ * Channel.fromIterable([1, 2]), * Channel.fromIterable([3, 4]), * Channel.fromIterable([5, 6]) * ]) * * // Merge all channels with bounded concurrency * const mergedChannel = Channel.mergeAll({ * concurrency: 2, * bufferSize: 16 * })(nestedChannels) * * // Outputs: 1, 2, 3, 4, 5, 6 (order may vary due to concurrency) * ``` * * @since 2.0.0 * @category utils */ ( channels: Channel< Channel, OutErr, OutDone, InElem, InErr, InDone, Env >, options: { readonly concurrency: number | "unbounded" readonly bufferSize?: number | undefined readonly switch?: boolean | undefined } ): Channel< OutElem, OutErr1 | OutErr, OutDone, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env1 | Env > } = dual( 2, ( channels: Channel< Channel, OutErr, OutDone, InElem, InErr, InDone, Env >, { bufferSize = 16, concurrency, switch: switch_ = false }: { readonly concurrency: number | "unbounded" readonly bufferSize?: number | undefined readonly switch?: boolean | undefined } ): Channel< OutElem, OutErr1 | OutErr, OutDone, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env1 | Env > => fromTransformBracket( Effect.fnUntraced(function*(upstream, scope, forkedScope) { const concurrencyN = concurrency === "unbounded" ? Number.MAX_SAFE_INTEGER : Math.max(1, concurrency) const semaphore = switch_ ? undefined : Semaphore.makeUnsafe(concurrencyN) const doneLatch = yield* Latch.make(true) const fibers = new Set>() const queue = yield* Queue.bounded {} * * // Create channels that output other channels * const nestedChannels = Channel.fromIterable([ * Channel.fromIterable([1, 2]), * Channel.fromIterable([3, 4]), * Channel.fromIterable([5, 6]) * ]) * * // Merge all channels with bounded concurrency * const mergedChannel = Channel.mergeAll({ * concurrency: 2, * bufferSize: 16 * })(nestedChannels) * * // Outputs: 1, 2, 3, 4, 5, 6 (order may vary due to concurrency) * ``` * * @since 2.0.0 * @category utils */ OutElem, /** * Merges multiple channels with specified concurrency and buffering options. * * @example * ```ts * import { Channel, Data } from "effect" * * class MergeAllError extends Data.TaggedError("MergeAllError")<{ * readonly reason: string * }> {} * * // Create channels that output other channels * const nestedChannels = Channel.fromIterable([ * Channel.fromIterable([1, 2]), * Channel.fromIterable([3, 4]), * Channel.fromIterable([5, 6]) * ]) * * // Merge all channels with bounded concurrency * const mergedChannel = Channel.mergeAll({ * concurrency: 2, * bufferSize: 16 * })(nestedChannels) * * // Outputs: 1, 2, 3, 4, 5, 6 (order may vary due to concurrency) * ``` * * @since 2.0.0 * @category utils */ OutErr | OutErr1 | Cause.Done>( bufferSize ) yield* Scope.addFinalizer(forkedScope, Queue.shutdown(queue)) const pull = yield* toTransform(channels)(upstream, scope) yield* Effect.gen(function*() { while (true) { if (semaphore) yield* semaphore.take(1) const channel = yield* pull const childScope = Scope.forkUnsafe(forkedScope) const childPull = yield* toTransform(channel)(upstream, childScope) while (fibers.size >= concurrencyN) { const fiber = Iterable.headUnsafe(fibers) fibers.delete(fiber) if (fibers.size === 0) yield* doneLatch.open yield* Fiber.interrupt(fiber) } const fiber = yield* childPull.pipe( Effect.tap(() => Effect.yieldNow), Effect.flatMap((value) => Queue.offer(queue, value)), Effect.forever({ disableYield: true }), Effect.onError(Effect.fnUntraced(function*(cause) { const halt = Pull.filterDone(cause) yield* Effect.exit(Scope.close( childScope, !Result.isFailure(halt) ? Exit.succeed(halt.success.value) : Exit.failCause(halt.failure) )) if (!fibers.has(fiber)) return fibers.delete(fiber) if (semaphore) yield* semaphore.release(1) if (fibers.size === 0) yield* doneLatch.open if (Result.isSuccess(halt)) return return yield* Queue.failCause(queue, cause as any) })), Effect.forkChild ) doneLatch.closeUnsafe() fibers.add(fiber) } }).pipe( Effect.catchCause((cause) => doneLatch.whenOpen(Queue.failCause(queue, cause))), Effect.forkIn(forkedScope) ) return Queue.take(queue) }) ) ) /** * Represents strategies for halting merged channels when one completes or fails. * * @example * ```ts * import type { Channel } from "effect" * * // Different halt strategies for channel merging * const leftFirst: Channel.HaltStrategy = "left" // Stop when left channel halts * const rightFirst: Channel.HaltStrategy = "right" // Stop when right channel halts * const both: Channel.HaltStrategy = "both" // Stop when both channels halt * const either: Channel.HaltStrategy = "either" // Stop when either channel halts * ``` * * @since 2.0.0 * @category models */ export type HaltStrategy = "left" | "right" | "both" | "either" /** * Returns a new channel, which is the merge of this channel and the specified * channel. * * @example * ```ts * import { Channel, Data } from "effect" * * class MergeError extends Data.TaggedError("MergeError")<{ * readonly source: string * }> {} * * // Create two channels * const leftChannel = Channel.fromIterable([1, 2, 3]) * const rightChannel = Channel.fromIterable(["a", "b", "c"]) * * // Merge them with "either" halt strategy * const mergedChannel = Channel.merge(leftChannel, rightChannel, { * haltStrategy: "either" * }) * * // Outputs elements from both channels concurrently * // Order may vary: 1, "a", 2, "b", 3, "c" * ``` * * @since 2.0.0 * @category utils */ export const merge: { /** * Returns a new channel, which is the merge of this channel and the specified * channel. * * @example * ```ts * import { Channel, Data } from "effect" * * class MergeError extends Data.TaggedError("MergeError")<{ * readonly source: string * }> {} * * // Create two channels * const leftChannel = Channel.fromIterable([1, 2, 3]) * const rightChannel = Channel.fromIterable(["a", "b", "c"]) * * // Merge them with "either" halt strategy * const mergedChannel = Channel.merge(leftChannel, rightChannel, { * haltStrategy: "either" * }) * * // Outputs elements from both channels concurrently * // Order may vary: 1, "a", 2, "b", 3, "c" * ``` * * @since 2.0.0 * @category utils */ ( right: Channel, options?: { readonly haltStrategy?: HaltStrategy | undefined } | undefined ): ( left: Channel ) => Channel< OutElem1 | OutElem, OutErr | OutErr1, OutDone | OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env1 | Env > /** * Returns a new channel, which is the merge of this channel and the specified * channel. * * @example * ```ts * import { Channel, Data } from "effect" * * class MergeError extends Data.TaggedError("MergeError")<{ * readonly source: string * }> {} * * // Create two channels * const leftChannel = Channel.fromIterable([1, 2, 3]) * const rightChannel = Channel.fromIterable(["a", "b", "c"]) * * // Merge them with "either" halt strategy * const mergedChannel = Channel.merge(leftChannel, rightChannel, { * haltStrategy: "either" * }) * * // Outputs elements from both channels concurrently * // Order may vary: 1, "a", 2, "b", 3, "c" * ``` * * @since 2.0.0 * @category utils */ < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( left: Channel, right: Channel, options?: { readonly haltStrategy?: HaltStrategy | undefined } | undefined ): Channel< OutElem | OutElem1, OutErr | OutErr1, OutDone | OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env | Env1 > } = dual((args) => isChannel(args[0]) && isChannel(args[1]), < OutElem, OutErr, OutDone, InElem, InErr, InDone, Env, OutElem1, OutErr1, OutDone1, InElem1, InErr1, InDone1, Env1 >( left: Channel, right: Channel, options?: { readonly haltStrategy?: HaltStrategy | undefined } | undefined ): Channel< OutElem | OutElem1, OutErr | OutErr1, OutDone | OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env | Env1 > => fromTransformBracket(Effect.fnUntraced(function*(upstream, _scope, forkedScope) { const strategy = options?.haltStrategy ?? "both" const queue = yield* Queue.bounded {} * * // Create two channels * const leftChannel = Channel.fromIterable([1, 2, 3]) * const rightChannel = Channel.fromIterable(["a", "b", "c"]) * * // Merge them with "either" halt strategy * const mergedChannel = Channel.merge(leftChannel, rightChannel, { * haltStrategy: "either" * }) * * // Outputs elements from both channels concurrently * // Order may vary: 1, "a", 2, "b", 3, "c" * ``` * * @since 2.0.0 * @category utils */ OutElem | OutElem1, /** * Returns a new channel, which is the merge of this channel and the specified * channel. * * @example * ```ts * import { Channel, Data } from "effect" * * class MergeError extends Data.TaggedError("MergeError")<{ * readonly source: string * }> {} * * // Create two channels * const leftChannel = Channel.fromIterable([1, 2, 3]) * const rightChannel = Channel.fromIterable(["a", "b", "c"]) * * // Merge them with "either" halt strategy * const mergedChannel = Channel.merge(leftChannel, rightChannel, { * haltStrategy: "either" * }) * * // Outputs elements from both channels concurrently * // Order may vary: 1, "a", 2, "b", 3, "c" * ``` * * @since 2.0.0 * @category utils */ OutErr | OutErr1 | Cause.Done>(0) yield* Scope.addFinalizer(forkedScope, Queue.shutdown(queue)) let done = 0 function onExit( side: "left" | "right", cause: Cause.Cause> ): Effect.Effect { done++ if (!Pull.isDoneCause(cause)) { return Queue.failCause(queue, cause) } switch (strategy) { case "both": { return done === 2 ? Queue.failCause(queue, cause) : Effect.void } case "left": case "right": { return side === strategy ? Queue.failCause(queue, cause) : Effect.void } case "either": { return Queue.failCause(queue, cause) } } } const runSide = ( side: "left" | "right", channel: Channel< OutElem | OutElem1, OutErr | OutErr1, OutDone | OutDone1, InElem & InElem1, InErr & InErr1, InDone & InDone1, Env | Env1 >, scope: Scope.Closeable ) => toTransform(channel)(upstream, scope).pipe( Effect.flatMap((pull) => pull.pipe( Effect.flatMap((value) => Queue.offer(queue, value)), Effect.forever ) ), Effect.onError((cause) => Effect.andThen( Scope.close(scope, Pull.doneExitFromCause(cause)), onExit(side, cause) ) ), Effect.forkIn(forkedScope) ) yield* runSide("left", left, Scope.forkUnsafe(forkedScope)) yield* runSide("right", right, Scope.forkUnsafe(forkedScope)) return Queue.take(queue) }))) /** * @since 4.0.0 * @category utils */ export const mergeEffect: { /** * @since 4.0.0 * @category utils */ (effect: Effect.Effect): ( self: Channel ) => Channel /** * @since 4.0.0 * @category utils */ ( self: Channel, effect: Effect.Effect ): Channel } = dual(2, ( self: Channel, effect: Effect.Effect ): Channel => merge( self, fromEffectDrain(effect), { haltStrategy: "left" } ) as any) /** * Splits upstream string chunks into lines, recognizing `\n`, `\r\n`, and * standalone `\r` as line terminators. The behavior matches * `String.linesIterator` regardless of how the input is chunked. * * A line terminator at the very end of the stream does **not** produce a * trailing empty line (consistent with `String.linesIterator`). Conversely, * if the stream ends without a terminator the final partial line is still * emitted. * * **Example** * * ```ts * import { Effect, Stream } from "effect" * * Effect.runPromise(Effect.gen(function*() { * const result = yield* Stream.runCollect( * Stream.splitLines(Stream.make("hel", "lo\r\nwor", "ld\n")) * ) * console.log(result) * // [ 'hello', 'world' ] * })) * ``` * * @since 2.0.0 * @category String manipulation */ export const splitLines = (): Channel< Arr.NonEmptyReadonlyArray, Err, Done, Arr.NonEmptyReadonlyArray, Err, Done > => fromTransform((upstream, _scope) => Effect.sync(() => { // Accumulates text that has not yet been terminated by a line break. // Content is carried across chunks until a terminator is found. let stringBuilder = "" // Set when a chunk ends with \r so the next chunk can check whether // the following character is \n (completing a \r\n pair) or not // (standalone \r, which is itself a line terminator). let midCRLF = false // Remembers the upstream Done value after the first time the upstream // signals completion, so subsequent pulls return Done immediately // without pulling upstream again. let done = Option.none() function splitLinesArray(chunk: Arr.NonEmptyReadonlyArray): Arr.NonEmptyReadonlyArray | null { const chunkBuilder: Array = [] function pushLine(segment: string): void { if (stringBuilder.length === 0) { chunkBuilder.push(segment) } else { chunkBuilder.push(stringBuilder + segment) stringBuilder = "" } } for (let i = 0; i < chunk.length; i++) { const str = chunk[i] if (str.length !== 0) { let from = 0 let indexOfCR = str.indexOf("\r") let indexOfLF = str.indexOf("\n") if (midCRLF) { if (indexOfLF === 0) { pushLine("") from = 1 indexOfLF = str.indexOf("\n", from) } else { pushLine("") } midCRLF = false } while (indexOfCR !== -1 || indexOfLF !== -1) { if (indexOfCR === -1 || (indexOfLF !== -1 && indexOfLF < indexOfCR)) { pushLine(str.substring(from, indexOfLF)) from = indexOfLF + 1 indexOfLF = str.indexOf("\n", from) } else { if (str.length === indexOfCR + 1) { midCRLF = true indexOfCR = -1 } else { pushLine(str.substring(from, indexOfCR)) from = indexOfCR + (indexOfLF === indexOfCR + 1 ? 2 : 1) indexOfCR = str.indexOf("\r", from) indexOfLF = str.indexOf("\n", from) } } } stringBuilder = stringBuilder + str.substring(from, str.length - (midCRLF ? 1 : 0)) } } return Arr.isReadonlyArrayNonEmpty(chunkBuilder) ? chunkBuilder : null } const pullOrFlush: Pull.Pull, Err, Done> = Effect.suspend(() => { if (done._tag === "Some") { return Cause.done(done.value) } return Pull.matchEffect(upstream, { onSuccess: loop, onFailure: Effect.failCause, onDone: (leftover) => { done = Option.some(leftover) if (stringBuilder.length > 0 || midCRLF) { const last = stringBuilder stringBuilder = "" midCRLF = false return Effect.succeed([last] as Arr.NonEmptyReadonlyArray) } return Cause.done(leftover) } }) }) function loop(chunk: Arr.NonEmptyReadonlyArray): Pull.Pull, Err, Done> { const lines = splitLinesArray(chunk) return lines !== null ? Effect.succeed(lines) : pullOrFlush } return pullOrFlush }) ) /** * @since 4.0.0 * @category String manipulation */ export const decodeText = (encoding?: string, options?: TextDecoderOptions): Channel< Arr.NonEmptyReadonlyArray, Err, Done, Arr.NonEmptyReadonlyArray, Err, Done > => fromTransform((upstream, _scope) => Effect.sync(() => { const decoder = new TextDecoder(encoding, options) return Effect.map(upstream, Arr.map((line) => decoder.decode(line))) }) ) /** * @since 4.0.0 * @category String manipulation */ export const encodeText = (): Channel< Arr.NonEmptyReadonlyArray, Err, Done, Arr.NonEmptyReadonlyArray, Err, Done > => fromTransform((upstream, _scope) => Effect.sync(() => { const encoder = new TextEncoder() return Effect.map(upstream, Arr.map((line) => encoder.encode(line))) }) ) /** * Returns a new channel that pipes the output of this channel into the * specified channel. The returned channel has the input type of this channel, * and the output type of the specified channel, terminating with the value of * the specified channel. * * @example * ```ts * import { Channel, Data } from "effect" * * class PipeError extends Data.TaggedError("PipeError")<{ * readonly stage: string * }> {} * * // Create source and transform channels * const sourceChannel = Channel.fromIterable([1, 2, 3]) * const transformChannel = Channel.map(sourceChannel, (n: number) => n * 2) * * // Pipe the source into the transform * const pipedChannel = Channel.pipeTo(sourceChannel, transformChannel) * * // Outputs: 2, 4, 6 * ``` * * @since 2.0.0 * @category utils */ export const pipeTo: { /** * Returns a new channel that pipes the output of this channel into the * specified channel. The returned channel has the input type of this channel, * and the output type of the specified channel, terminating with the value of * the specified channel. * * @example * ```ts * import { Channel, Data } from "effect" * * class PipeError extends Data.TaggedError("PipeError")<{ * readonly stage: string * }> {} * * // Create source and transform channels * const sourceChannel = Channel.fromIterable([1, 2, 3]) * const transformChannel = Channel.map(sourceChannel, (n: number) => n * 2) * * // Pipe the source into the transform * const pipedChannel = Channel.pipeTo(sourceChannel, transformChannel) * * // Outputs: 2, 4, 6 * ``` * * @since 2.0.0 * @category utils */ (that: Channel): ( self: Channel ) => Channel /** * Returns a new channel that pipes the output of this channel into the * specified channel. The returned channel has the input type of this channel, * and the output type of the specified channel, terminating with the value of * the specified channel. * * @example * ```ts * import { Channel, Data } from "effect" * * class PipeError extends Data.TaggedError("PipeError")<{ * readonly stage: string * }> {} * * // Create source and transform channels * const sourceChannel = Channel.fromIterable([1, 2, 3]) * const transformChannel = Channel.map(sourceChannel, (n: number) => n * 2) * * // Pipe the source into the transform * const pipedChannel = Channel.pipeTo(sourceChannel, transformChannel) * * // Outputs: 2, 4, 6 * ``` * * @since 2.0.0 * @category utils */ ( self: Channel, that: Channel ): Channel } = dual( 2, ( self: Channel, that: Channel ): Channel => fromTransform((upstream, scope) => Effect.flatMap(toTransform(self)(upstream, scope), (upstream) => toTransform(that)(upstream, scope)) ) ) /** * Returns a new channel that pipes the output of this channel into the * specified channel and preserves this channel's failures without providing * them to the other channel for observation. * * @example * ```ts * import { Channel, Data } from "effect" * * class SourceError extends Data.TaggedError("SourceError")<{ * readonly code: number * }> {} * * // Create a failing source channel * const failingSource = Channel.fail(new SourceError({ code: 404 })) * const safeTransform = Channel.succeed("transformed") * * // Pipe while preserving source failures * const safePipedChannel = Channel.pipeToOrFail(failingSource, safeTransform) * * // Source errors are preserved and not sent to transform channel * ``` * * @since 2.0.0 * @category utils */ export const pipeToOrFail: { /** * Returns a new channel that pipes the output of this channel into the * specified channel and preserves this channel's failures without providing * them to the other channel for observation. * * @example * ```ts * import { Channel, Data } from "effect" * * class SourceError extends Data.TaggedError("SourceError")<{ * readonly code: number * }> {} * * // Create a failing source channel * const failingSource = Channel.fail(new SourceError({ code: 404 })) * const safeTransform = Channel.succeed("transformed") * * // Pipe while preserving source failures * const safePipedChannel = Channel.pipeToOrFail(failingSource, safeTransform) * * // Source errors are preserved and not sent to transform channel * ``` * * @since 2.0.0 * @category utils */ (that: Channel): ( self: Channel ) => Channel /** * Returns a new channel that pipes the output of this channel into the * specified channel and preserves this channel's failures without providing * them to the other channel for observation. * * @example * ```ts * import { Channel, Data } from "effect" * * class SourceError extends Data.TaggedError("SourceError")<{ * readonly code: number * }> {} * * // Create a failing source channel * const failingSource = Channel.fail(new SourceError({ code: 404 })) * const safeTransform = Channel.succeed("transformed") * * // Pipe while preserving source failures * const safePipedChannel = Channel.pipeToOrFail(failingSource, safeTransform) * * // Source errors are preserved and not sent to transform channel * ``` * * @since 2.0.0 * @category utils */ ( self: Channel, that: Channel ): Channel } = dual( 2, ( self: Channel, that: Channel ): Channel => fromTransform((upstream, scope) => Effect.flatMap(toTransform(self)(upstream, scope), (upstream) => { const upstreamPull = Effect.catchCause( upstream, (cause) => Pull.isDoneCause(cause) ? Effect.failCause(cause) : Effect.die(Cause.Done(cause)) ) as Pull.Pull return Effect.map( toTransform(that)(upstreamPull, scope), (pull) => Effect.catchDefect( pull, (defect) => Cause.isDone(defect) ? Effect.failCause(defect.value as Cause.Cause) : Effect.die(defect) ) ) }) ) ) /** * Constructs a `Channel` from a scoped effect that will result in a * `Channel` if successful. * * @example * ```ts * import { Channel, Data, Effect } from "effect" * * class UnwrapError extends Data.TaggedError("UnwrapError")<{ * readonly reason: string * }> {} * * // Create an effect that produces a channel * const channelEffect = Effect.succeed( * Channel.fromIterable([1, 2, 3]) * ) * * // Unwrap the effect to get the channel * const unwrappedChannel = Channel.unwrap(channelEffect) * * // The resulting channel outputs: 1, 2, 3 * ``` * * @since 2.0.0 * @category constructors */ export const unwrap = ( channel: Effect.Effect, E, R> ): Channel | R2> => fromTransform((upstream, scope) => { let pull: Pull.Pull | undefined return Effect.succeed(Effect.suspend(() => { if (pull) return pull return channel.pipe( Scope.provide(scope), Effect.flatMap((channel) => toTransform(channel)(upstream, scope)), Effect.flatMap((pull_) => pull = pull_) ) })) }) /** * @since 2.0.0 * @category utils */ export const scoped = ( self: Channel ): Channel> => fromTransformBracket((upstream, scope, forkedScope) => Scope.provide(toTransform(self)(upstream, scope), forkedScope)) /** * Returns a new channel which embeds the given input handler into a Channel. * * @example * ```ts * import { Channel, Data, Effect } from "effect" * * class EmbedError extends Data.TaggedError("EmbedError")<{ * readonly stage: string * }> {} * * // Create a base channel * const baseChannel = Channel.fromIterable([1, 2, 3]) * * // Embed input handling - simplified example * const embeddedChannel = Channel.embedInput( * baseChannel, * (_upstream) => Effect.void * ) * ``` * * @since 2.0.0 * @category utils */ export const embedInput: { /** * Returns a new channel which embeds the given input handler into a Channel. * * @example * ```ts * import { Channel, Data, Effect } from "effect" * * class EmbedError extends Data.TaggedError("EmbedError")<{ * readonly stage: string * }> {} * * // Create a base channel * const baseChannel = Channel.fromIterable([1, 2, 3]) * * // Embed input handling - simplified example * const embeddedChannel = Channel.embedInput( * baseChannel, * (_upstream) => Effect.void * ) * ``` * * @since 2.0.0 * @category utils */ ( input: ( upstream: Pull.Pull ) => Effect.Effect ): ( self: Channel ) => Channel /** * Returns a new channel which embeds the given input handler into a Channel. * * @example * ```ts * import { Channel, Data, Effect } from "effect" * * class EmbedError extends Data.TaggedError("EmbedError")<{ * readonly stage: string * }> {} * * // Create a base channel * const baseChannel = Channel.fromIterable([1, 2, 3]) * * // Embed input handling - simplified example * const embeddedChannel = Channel.embedInput( * baseChannel, * (_upstream) => Effect.void * ) * ``` * * @since 2.0.0 * @category utils */ ( self: Channel, input: ( upstream: Pull.Pull ) => Effect.Effect ): Channel } = dual( 2, ( self: Channel, input: ( upstream: Pull.Pull ) => Effect.Effect ): Channel => fromTransformBracket((upstream, scope, forkedScope) => Effect.andThen( Effect.forkIn(input(upstream), forkedScope), toTransform(self)(Cause.done(), scope) ) ) ) /** * Allows a faster producer to progress independently of a slower consumer by * buffering up to `capacity` elements in a queue. * * @since 2.0.0 * @category Buffering */ export const buffer: { /** * Allows a faster producer to progress independently of a slower consumer by * buffering up to `capacity` elements in a queue. * * @since 2.0.0 * @category Buffering */ ( options: { readonly capacity: "unbounded" } | { readonly capacity: number readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } ): ( self: Channel ) => Channel /** * Allows a faster producer to progress independently of a slower consumer by * buffering up to `capacity` elements in a queue. * * @since 2.0.0 * @category Buffering */ ( self: Channel, options: { readonly capacity: "unbounded" } | { readonly capacity: number readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } ): Channel } = dual(2, ( self: Channel, options: { readonly capacity: "unbounded" } | { readonly capacity: number readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } ): Channel => fromTransform(Effect.fnUntraced(function*(upstream, scope) { const pull = yield* toTransform(self)(upstream, scope) const queue = yield* Queue.make>({ capacity: options.capacity === "unbounded" ? undefined : options.capacity, strategy: options.capacity === "unbounded" ? undefined : options.strategy }) yield* Scope.addFinalizer(scope, Queue.shutdown(queue)) yield* pull.pipe( Effect.flatMap((value) => Queue.offer(queue, value)), Effect.forever({ disableYield: true }), Effect.onError((cause) => Queue.failCause(queue, cause)), Effect.forkIn(scope) ) return Queue.take(queue) }))) /** * Allows a faster producer to progress independently of a slower consumer by * buffering up to `capacity` elements in a queue. * * @since 2.0.0 * @category Buffering */ export const bufferArray: { /** * Allows a faster producer to progress independently of a slower consumer by * buffering up to `capacity` elements in a queue. * * @since 2.0.0 * @category Buffering */ ( options: { readonly capacity: "unbounded" } | { readonly capacity: number readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } ): ( self: Channel, OutErr, OutDone, InElem, InErr, InDone, Env> ) => Channel, OutErr, OutDone, InElem, InErr, InDone, Env> /** * Allows a faster producer to progress independently of a slower consumer by * buffering up to `capacity` elements in a queue. * * @since 2.0.0 * @category Buffering */ ( self: Channel, OutErr, OutDone, InElem, InErr, InDone, Env>, options: { readonly capacity: "unbounded" } | { readonly capacity: number readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } ): Channel, OutErr, OutDone, InElem, InErr, InDone, Env> } = dual(2, ( self: Channel, OutErr, OutDone, InElem, InErr, InDone, Env>, options: { readonly capacity: "unbounded" } | { readonly capacity: number readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } ): Channel, OutErr, OutDone, InElem, InErr, InDone, Env> => fromTransform(Effect.fnUntraced(function*(upstream, scope) { const pull = yield* toTransform(self)(upstream, scope) const queue = yield* Queue.make>({ capacity: options.capacity === "unbounded" ? undefined : options.capacity, strategy: options.capacity === "unbounded" ? undefined : options.strategy }) yield* Scope.addFinalizer(scope, Queue.shutdown(queue)) yield* pull.pipe( Effect.flatMap((value) => Queue.offerAll(queue, value)), Effect.forever({ disableYield: true }), Effect.onError((cause) => Queue.failCause(queue, cause)), Effect.forkIn(scope) ) return Queue.takeAll(queue) }))) /** * Returns a new channel, which is the same as this one, except it will be * interrupted when the specified effect completes. If the effect completes * successfully before the underlying channel is done, then the returned * channel will yield the success value of the effect as its terminal value. * On the other hand, if the underlying channel finishes first, then the * returned channel will yield the success value of the underlying channel as * its terminal value. * * @since 2.0.0 * @category utils */ export const interruptWhen: { /** * Returns a new channel, which is the same as this one, except it will be * interrupted when the specified effect completes. If the effect completes * successfully before the underlying channel is done, then the returned * channel will yield the success value of the effect as its terminal value. * On the other hand, if the underlying channel finishes first, then the * returned channel will yield the success value of the underlying channel as * its terminal value. * * @since 2.0.0 * @category utils */ (effect: Effect.Effect): ( self: Channel ) => Channel /** * Returns a new channel, which is the same as this one, except it will be * interrupted when the specified effect completes. If the effect completes * successfully before the underlying channel is done, then the returned * channel will yield the success value of the effect as its terminal value. * On the other hand, if the underlying channel finishes first, then the * returned channel will yield the success value of the underlying channel as * its terminal value. * * @since 2.0.0 * @category utils */ ( self: Channel, effect: Effect.Effect ): Channel } = dual(2, ( self: Channel, effect: Effect.Effect ): Channel => merge( self, fromPull(Effect.succeed(Effect.flatMap(effect, Cause.done))), { haltStrategy: "either" } )) /** * @since 4.0.0 * @category utils */ export const haltWhen: { /** * @since 4.0.0 * @category utils */ (effect: Effect.Effect): ( self: Channel ) => Channel /** * @since 4.0.0 * @category utils */ ( self: Channel, effect: Effect.Effect ): Channel } = dual(2, ( self: Channel, effect: Effect.Effect ): Channel => fromTransformBracket(Effect.fnUntraced(function*(upstream, scope, forkedScope) { const pull = yield* toTransform(self)(upstream, scope) let haltCause: Cause.Cause> | undefined = undefined yield* effect.pipe( Effect.catchCause((cause) => { haltCause = cause return Effect.void }), Effect.forkIn(forkedScope) ) return Effect.suspend((): Pull.Pull => haltCause ? Effect.failCause(haltCause) : pull ) }))) /** * @since 4.0.0 * @category utils */ export const onError: { /** * @since 4.0.0 * @category utils */ ( finalizer: (cause: Cause.Cause) => Effect.Effect ): ( self: Channel ) => Channel /** * @since 4.0.0 * @category utils */ ( self: Channel, finalizer: (cause: Cause.Cause) => Effect.Effect ): Channel } = dual(2, ( self: Channel, finalizer: (cause: Cause.Cause) => Effect.Effect ): Channel => onExit(self, (exit) => Exit.isFailure(exit) ? finalizer(exit.cause) : Effect.void)) /** * Returns a new channel with an attached finalizer. The finalizer is * guaranteed to be executed so long as the channel begins execution (and * regardless of whether or not it completes). * * @example * ```ts * import { Channel, Console, Data, Exit } from "effect" * * class ExitError extends Data.TaggedError("ExitError")<{ * readonly stage: string * }> {} * * // Create a channel * const dataChannel = Channel.fromIterable([1, 2, 3]) * * // Attach exit handler * const channelWithExit = Channel.onExit(dataChannel, (exit) => { * if (Exit.isSuccess(exit)) { * return Console.log(`Channel completed successfully with: ${exit.value}`) * } else { * return Console.log(`Channel failed with: ${exit.cause}`) * } * }) * ``` * * @since 4.0.0 * @category utils */ export const onExit: { /** * Returns a new channel with an attached finalizer. The finalizer is * guaranteed to be executed so long as the channel begins execution (and * regardless of whether or not it completes). * * @example * ```ts * import { Channel, Console, Data, Exit } from "effect" * * class ExitError extends Data.TaggedError("ExitError")<{ * readonly stage: string * }> {} * * // Create a channel * const dataChannel = Channel.fromIterable([1, 2, 3]) * * // Attach exit handler * const channelWithExit = Channel.onExit(dataChannel, (exit) => { * if (Exit.isSuccess(exit)) { * return Console.log(`Channel completed successfully with: ${exit.value}`) * } else { * return Console.log(`Channel failed with: ${exit.cause}`) * } * }) * ``` * * @since 4.0.0 * @category utils */ ( finalizer: (e: Exit.Exit) => Effect.Effect ): ( self: Channel ) => Channel /** * Returns a new channel with an attached finalizer. The finalizer is * guaranteed to be executed so long as the channel begins execution (and * regardless of whether or not it completes). * * @example * ```ts * import { Channel, Console, Data, Exit } from "effect" * * class ExitError extends Data.TaggedError("ExitError")<{ * readonly stage: string * }> {} * * // Create a channel * const dataChannel = Channel.fromIterable([1, 2, 3]) * * // Attach exit handler * const channelWithExit = Channel.onExit(dataChannel, (exit) => { * if (Exit.isSuccess(exit)) { * return Console.log(`Channel completed successfully with: ${exit.value}`) * } else { * return Console.log(`Channel failed with: ${exit.cause}`) * } * }) * ``` * * @since 4.0.0 * @category utils */ ( self: Channel, finalizer: (e: Exit.Exit) => Effect.Effect ): Channel } = dual(2, ( self: Channel, finalizer: (e: Exit.Exit) => Effect.Effect ): Channel => fromTransformBracket((upstream, scope, forkedScope) => Scope.addFinalizerExit(forkedScope, finalizer as any).pipe( Effect.andThen(toTransform(self)(upstream, scope)) ) )) /** * @since 4.0.0 * @category utils */ export const onStart: { /** * @since 4.0.0 * @category utils */ (onStart: Effect.Effect): ( self: Channel ) => Channel /** * @since 4.0.0 * @category utils */ ( self: Channel, onStart: Effect.Effect ): Channel } = dual(2, ( self: Channel, onStart: Effect.Effect ): Channel => unwrap(Effect.as(onStart, self))) /** * @since 4.0.0 * @category utils */ export const onFirst: { /** * @since 4.0.0 * @category utils */ (onFirst: (element: Types.NoInfer) => Effect.Effect): ( self: Channel ) => Channel /** * @since 4.0.0 * @category utils */ ( self: Channel, onFirst: (element: Types.NoInfer) => Effect.Effect ): Channel } = dual(2, ( self: Channel, onFirst: (element: Types.NoInfer) => Effect.Effect ): Channel => transformPull(self, (pull) => Effect.sync(() => { let isFirst = true const pullFirst = Effect.tap(pull, (element) => { isFirst = false return onFirst(element) }) return Effect.suspend(() => isFirst ? pullFirst : pull) }))) /** * @since 4.0.0 * @category utils */ export const onEnd: { /** * @since 4.0.0 * @category utils */ (onEnd: Effect.Effect): ( self: Channel ) => Channel /** * @since 4.0.0 * @category utils */ ( self: Channel, onEnd: Effect.Effect ): Channel } = dual(2, ( self: Channel, onEnd: Effect.Effect ): Channel => transformPull(self, (pull) => Effect.succeed(Pull.catchDone( pull, (leftover) => Effect.flatMap(onEnd, () => Cause.done(leftover as OutDone)) )))) /** * Returns a new channel with an attached finalizer. The finalizer is * guaranteed to be executed so long as the channel begins execution (and * regardless of whether or not it completes). * * @example * ```ts * import { Channel, Console, Data } from "effect" * * class EnsureError extends Data.TaggedError("EnsureError")<{ * readonly operation: string * }> {} * * // Create a channel * const dataChannel = Channel.fromIterable([1, 2, 3]) * * // Ensure cleanup always runs * const channelWithCleanup = Channel.ensuring( * dataChannel, * Console.log("Cleanup executed regardless of success or failure") * ) * ``` * * @since 2.0.0 * @category utils */ export const ensuring: { /** * Returns a new channel with an attached finalizer. The finalizer is * guaranteed to be executed so long as the channel begins execution (and * regardless of whether or not it completes). * * @example * ```ts * import { Channel, Console, Data } from "effect" * * class EnsureError extends Data.TaggedError("EnsureError")<{ * readonly operation: string * }> {} * * // Create a channel * const dataChannel = Channel.fromIterable([1, 2, 3]) * * // Ensure cleanup always runs * const channelWithCleanup = Channel.ensuring( * dataChannel, * Console.log("Cleanup executed regardless of success or failure") * ) * ``` * * @since 2.0.0 * @category utils */ (finalizer: Effect.Effect): ( self: Channel ) => Channel /** * Returns a new channel with an attached finalizer. The finalizer is * guaranteed to be executed so long as the channel begins execution (and * regardless of whether or not it completes). * * @example * ```ts * import { Channel, Console, Data } from "effect" * * class EnsureError extends Data.TaggedError("EnsureError")<{ * readonly operation: string * }> {} * * // Create a channel * const dataChannel = Channel.fromIterable([1, 2, 3]) * * // Ensure cleanup always runs * const channelWithCleanup = Channel.ensuring( * dataChannel, * Console.log("Cleanup executed regardless of success or failure") * ) * ``` * * @since 2.0.0 * @category utils */ ( self: Channel, finalizer: Effect.Effect ): Channel } = dual(2, ( self: Channel, finalizer: Effect.Effect ): Channel => onExit(self, (_) => finalizer)) const runWith = < OutElem, OutErr, OutDone, Env, EX, RX, AH = OutDone, EH = never, RH = never >( self: Channel, f: (pull: Pull.Pull) => Effect.Effect, onHalt?: (leftover: OutDone) => Effect.Effect ): Effect.Effect | EH, Env | RX | RH> => Effect.suspend(() => { const scope = Scope.makeUnsafe() const makePull = toTransform(self)(Cause.done(), scope) return Pull.catchDone(Effect.flatMap(makePull, f), onHalt ? onHalt : Effect.succeed as any).pipe( Effect.onExit((exit) => Scope.close(scope, exit)) ) as any }) /** * Create a channel from the specified services. * * @since 2.0.0 * @category Services */ export const contextWith = ( f: (context: Context.Context) => Channel ): Channel => fromTransform((upstream, scope) => Effect.contextWith((context: Context.Context) => toTransform(f(context))(upstream, scope)) ) /** * Provides a layer or context to the channel, removing the corresponding * service requirements. Use `options.local` to build the layer every time; by * default, layers are shared between provide calls. * * @since 4.0.0 * @category Services */ export const provideContext: { /** * Provides a layer or context to the channel, removing the corresponding * service requirements. Use `options.local` to build the layer every time; by * default, layers are shared between provide calls. * * @since 4.0.0 * @category Services */ (context: Context.Context): ( self: Channel ) => Channel> /** * Provides a layer or context to the channel, removing the corresponding * service requirements. Use `options.local` to build the layer every time; by * default, layers are shared between provide calls. * * @since 4.0.0 * @category Services */ ( self: Channel, context: Context.Context ): Channel> } = dual(2, ( self: Channel, context: Context.Context ): Channel> => fromTransform((upstream, scope) => Effect.map( Effect.provideContext(toTransform(self)(upstream, scope), context), Effect.provideContext(context) ) )) /** * @since 4.0.0 * @category Services */ export const provideService: { /** * @since 4.0.0 * @category Services */ (key: Context.Key, service: NoInfer): ( self: Channel ) => Channel> /** * @since 4.0.0 * @category Services */ ( self: Channel, key: Context.Key, service: NoInfer ): Channel> } = dual(3, ( self: Channel, key: Context.Key, service: NoInfer ): Channel> => fromTransform((upstream, scope) => Effect.map( Effect.provideService(toTransform(self)(upstream, scope), key, service), Effect.provideService(key, service) ) )) /** * @since 4.0.0 * @category Services */ export const provideServiceEffect: { /** * @since 4.0.0 * @category Services */ (key: Context.Key, service: Effect.Effect, ES, RS>): ( self: Channel ) => Channel | RS> /** * @since 4.0.0 * @category Services */ ( self: Channel, key: Context.Key, service: Effect.Effect, ES, RS> ): Channel | RS> } = dual(3, ( self: Channel, key: Context.Key, service: Effect.Effect, ES, RS> ): Channel | RS> => fromTransform((upstream, scope) => Effect.flatMap( service, (s) => toTransform(provideService(self, key, s))(upstream, scope) ) )) /** * @since 4.0.0 * @category Services */ export const provide: { /** * @since 4.0.0 * @category Services */ ( layer: Layer.Layer | Context.Context, options?: { readonly local?: boolean | undefined } | undefined ): ( self: Channel ) => Channel | R> /** * @since 4.0.0 * @category Services */ ( self: Channel, layer: Layer.Layer | Context.Context, options?: { readonly local?: boolean | undefined } | undefined ): Channel | R> } = dual((args) => isChannel(args[0]), ( self: Channel, layer: Layer.Layer | Context.Context, options?: { readonly local?: boolean | undefined } | undefined ): Channel | R> => Context.isContext(layer) ? provideContext(self, layer) : fromTransform((upstream, scope) => Effect.flatMap( options?.local ? Layer.buildWithMemoMap(layer, Layer.makeMemoMapUnsafe(), scope) : Layer.buildWithScope(layer, scope), (context) => Effect.map( Effect.provideContext(toTransform(self)(upstream, scope), context), Effect.provideContext(context) ) ) )) /** * @since 2.0.0 * @category Services */ export const updateContext: { /** * @since 2.0.0 * @category Services */ (f: (context: Context.Context) => Context.Context): ( self: Channel ) => Channel /** * @since 2.0.0 * @category Services */ ( self: Channel, f: (context: Context.Context) => Context.Context ): Channel } = dual(2, ( self: Channel, f: (context: Context.Context) => Context.Context ): Channel => fromTransform((upstream, scope) => Effect.contextWith((context) => { const toProvide = f(context) return toTransform(provideContext(self, toProvide))(upstream, scope) }) )) /** * @since 2.0.0 * @category Services */ export const updateService: { /** * @since 2.0.0 * @category Services */ (key: Context.Key, f: (service: NoInfer) => S): ( self: Channel ) => Channel /** * @since 2.0.0 * @category Services */ ( self: Channel, service: Context.Key, f: (service: NoInfer) => S ): Channel } = dual(3, ( self: Channel, service: Context.Key, f: (service: NoInfer) => S ): Channel => updateContext(self, (context) => Context.add( context, service, f(Context.get(context, service)) ))) /** * @since 4.0.0 * @category Tracing */ export const withSpan: { /** * @since 4.0.0 * @category Tracing */ (name: string, options?: SpanOptions): ( self: Channel ) => Channel> /** * @since 4.0.0 * @category Tracing */ ( self: Channel, name: string, options?: SpanOptions ): Channel> } = function() { const dataFirst = isChannel(arguments[0]) const name = dataFirst ? arguments[1] : arguments[0] const options = addSpanStackTrace(dataFirst ? arguments[2] : arguments[1]) if (dataFirst) { const self = arguments[0] return withSpanImpl(self, name, options) } return (self: any) => withSpanImpl(self, name, options) } as any const withSpanImpl = ( self: Channel, name: string, options?: SpanOptions ): Channel> => acquireUseRelease( Effect.makeSpan(name, options), (span) => provideService(self, ParentSpan, span), (span, exit) => Effect.withFiber((fiber) => { const clock = fiber.getRef(ClockRef) const timingEnabled = fiber.getRef(TracerTimingEnabled) return endSpan(span, exit, clock, timingEnabled) }) ) /** * @since 4.0.0 * @category Do notation */ export const Do: Channel<{}> = succeed({}) const let_: { ( name: Exclude, f: (a: NoInfer) => B ): ( self: Channel ) => Channel< { [K in N | keyof OutElem]: K extends keyof OutElem ? OutElem[K] : B }, OutErr, OutDone, InElem, InErr, InDone, R > ( self: Channel, name: Exclude, f: (a: NoInfer) => B ): Channel< { [K in N | keyof OutElem]: K extends keyof OutElem ? OutElem[K] : B }, OutErr, OutDone, InElem, InErr, InDone, R > } = dual(3, ( self: Channel, name: Exclude, f: (a: NoInfer) => B ): Channel< { [K in N | keyof OutElem]: K extends keyof OutElem ? OutElem[K] : B }, OutErr, OutDone, InElem, InErr, InDone, R > => map(self, (elem) => (({ ...elem, [name]: f(elem) }) as any))) export { /** * @since 4.0.0 * @category Do notation */ let_ as let } /** * @since 4.0.0 * @category Do notation */ export const bind: { /** * @since 4.0.0 * @category Do notation */ ( name: Exclude, f: (a: NoInfer) => Channel, options?: { readonly concurrency?: number | "unbounded" | undefined readonly bufferSize?: number | undefined } ): ( self: Channel ) => Channel< { [K in N | keyof OutElem]: K extends keyof OutElem ? OutElem[K] : B }, OutErr2 | OutErr, OutDone, InElem & InElem2, InErr & InErr2, InDone & InDone2, Env2 | Env > /** * @since 4.0.0 * @category Do notation */ < OutElem extends object, OutErr, OutDone, InElem, InErr, InDone, Env, N extends string, B, OutErr2, OutDone2, InElem2, InErr2, InDone2, Env2 >( self: Channel, name: Exclude, f: (a: NoInfer) => Channel, options?: { readonly concurrency?: number | "unbounded" | undefined readonly bufferSize?: number | undefined } ): Channel< { [K in N | keyof OutElem]: K extends keyof OutElem ? OutElem[K] : B }, OutErr2 | OutErr, OutDone, InElem & InElem2, InErr & InErr2, InDone & InDone2, Env2 | Env > } = dual((args) => isChannel(args[0]), < OutElem extends object, OutErr, OutDone, InElem, InErr, InDone, Env, N extends string, B, OutErr2, OutDone2, InElem2, InErr2, InDone2, Env2 >( self: Channel, name: Exclude, f: (a: NoInfer) => Channel, options?: { readonly concurrency?: number | "unbounded" | undefined readonly bufferSize?: number | undefined } ): Channel< { [K in N | keyof OutElem]: K extends keyof OutElem ? OutElem[K] : B }, OutErr2 | OutErr, OutDone, InElem & InElem2, InErr & InErr2, InDone & InDone2, Env2 | Env > => flatMap( self, (elem) => map(f(elem), (b) => ({ ...elem, [name]: b } as any)), options )) /** * @since 4.0.0 * @category Do notation */ export const bindTo: { /** * @since 4.0.0 * @category Do notation */ (name: N): ( self: Channel ) => Channel< { [K in N]: OutElem }, OutErr, OutDone, InElem, InErr, InDone, Env > /** * @since 4.0.0 * @category Do notation */ ( self: Channel, name: N ): Channel< { [K in N]: OutElem }, OutErr, OutDone, InElem, InErr, InDone, Env > } = dual(2, ( self: Channel, name: N ): Channel< { [K in N]: OutElem }, OutErr, OutDone, InElem, InErr, InDone, Env > => map(self, (elem) => ({ [name]: elem } as any))) /** * Runs a channel and counts the number of elements it outputs. * * @example * ```ts * import { Channel, Data } from "effect" * * class CountError extends Data.TaggedError("CountError")<{ * readonly reason: string * }> {} * * // Create a channel with multiple elements * const numbersChannel = Channel.fromIterable([1, 2, 3, 4, 5]) * * // Count the elements * const countEffect = Channel.runCount(numbersChannel) * * // Effect.runSync(countEffect) // Returns: 5 * ``` * * @since 2.0.0 * @category execution */ export const runCount = ( self: Channel ): Effect.Effect => runFold(self, () => 0, (acc) => acc + 1) /** * Runs a channel and discards all output elements, returning only the final result. * * @example * ```ts * import { Channel, Data } from "effect" * * class DrainError extends Data.TaggedError("DrainError")<{ * readonly stage: string * }> {} * * // Create a channel that outputs elements and completes with a result * const resultChannel = Channel.fromIterable([1, 2, 3]) * const completedChannel = Channel.concatWith( * resultChannel, * () => Channel.succeed("completed") * ) * * // Drain all elements and get only the final result * const drainEffect = Channel.runDrain(completedChannel) * * // Effect.runSync(drainEffect) // Returns: "completed" * ``` * * @since 2.0.0 * @category execution */ export const runDrain = ( self: Channel ): Effect.Effect => runWith(self, (pull) => Effect.forever(pull, { disableYield: true })) /** * Runs a channel and applies an effect to each output element. * * @example * ```ts * import { Channel, Console, Data } from "effect" * * class ForEachError extends Data.TaggedError("ForEachError")<{ * readonly element: unknown * }> {} * * // Create a channel with numbers * const numbersChannel = Channel.fromIterable([1, 2, 3]) * * // Run forEach to log each element * const forEachEffect = Channel.runForEach( * numbersChannel, * (n) => Console.log(`Processing: ${n}`) * ) * * // Logs: "Processing: 1", "Processing: 2", "Processing: 3" * ``` * * @since 2.0.0 * @category execution */ export const runForEach: { /** * Runs a channel and applies an effect to each output element. * * @example * ```ts * import { Channel, Console, Data } from "effect" * * class ForEachError extends Data.TaggedError("ForEachError")<{ * readonly element: unknown * }> {} * * // Create a channel with numbers * const numbersChannel = Channel.fromIterable([1, 2, 3]) * * // Run forEach to log each element * const forEachEffect = Channel.runForEach( * numbersChannel, * (n) => Console.log(`Processing: ${n}`) * ) * * // Logs: "Processing: 1", "Processing: 2", "Processing: 3" * ``` * * @since 2.0.0 * @category execution */ (f: (o: OutElem) => Effect.Effect): ( self: Channel ) => Effect.Effect /** * Runs a channel and applies an effect to each output element. * * @example * ```ts * import { Channel, Console, Data } from "effect" * * class ForEachError extends Data.TaggedError("ForEachError")<{ * readonly element: unknown * }> {} * * // Create a channel with numbers * const numbersChannel = Channel.fromIterable([1, 2, 3]) * * // Run forEach to log each element * const forEachEffect = Channel.runForEach( * numbersChannel, * (n) => Console.log(`Processing: ${n}`) * ) * * // Logs: "Processing: 1", "Processing: 2", "Processing: 3" * ``` * * @since 2.0.0 * @category execution */ ( self: Channel, f: (o: OutElem) => Effect.Effect ): Effect.Effect } = dual( 2, ( self: Channel, f: (o: OutElem) => Effect.Effect ): Effect.Effect => runWith(self, (pull) => Effect.forever(Effect.flatMap(pull, f), { disableYield: true })) ) /** * @since 2.0.0 * @category execution */ export const runForEachWhile: { /** * @since 2.0.0 * @category execution */ (f: (o: OutElem) => Effect.Effect): ( self: Channel ) => Effect.Effect /** * @since 2.0.0 * @category execution */ ( self: Channel, f: (o: OutElem) => Effect.Effect ): Effect.Effect } = dual( 2, ( self: Channel, f: (o: OutElem) => Effect.Effect ): Effect.Effect => runWith(self, (pull) => pull.pipe( Effect.flatMap(f), Effect.flatMap((cont) => (cont ? Effect.void : Cause.done())), Effect.forever({ disableYield: true }) )) ) /** * Runs a channel and collects all output elements into an array. * * @example * ```ts * import { Channel, Data } from "effect" * * class CollectError extends Data.TaggedError("CollectError")<{ * readonly reason: string * }> {} * * // Create a channel with elements * const numbersChannel = Channel.fromIterable([1, 2, 3, 4, 5]) * * // Collect all elements into an array * const collectEffect = Channel.runCollect(numbersChannel) * * // Effect.runSync(collectEffect) // Returns: [1, 2, 3, 4, 5] * ``` * * @since 2.0.0 * @category execution */ export const runCollect = ( self: Channel ): Effect.Effect, OutErr, Env> => runFold(self, () => [] as Array, (acc, o) => { acc.push(o) return acc }) /** * Runs a channel and outputs the done value. * * @since 4.0.0 * @category execution */ export const runDone = ( self: Channel ): Effect.Effect => runWith(self, identity_, Effect.succeed) /** * @since 2.0.0 * @category execution */ export const runHead = ( self: Channel ): Effect.Effect, OutErr, Env> => Effect.suspend(() => { let head = Option.none() return runWith(self, (pull) => pull.pipe( Effect.asSome, Effect.flatMap((head_) => { head = head_ return Cause.done() }) ), () => Effect.succeed(head)) }) /** * @since 2.0.0 * @category execution */ export const runLast = ( self: Channel ): Effect.Effect, OutErr, Env> => Effect.suspend(() => { const absent = Symbol() // Prevent boxing let last: typeof absent | OutElem = absent return runWith( self, (pull) => Effect.forever( Effect.flatMap(pull, (item) => { last = item return Effect.void }), { disableYield: true } ), () => last === absent ? Effect.succeedNone : Effect.succeedSome(last) ) }) /** * Runs a channel and folds over all output elements with an accumulator. * * @example * ```ts * import { Channel, Data } from "effect" * * class FoldError extends Data.TaggedError("FoldError")<{ * readonly operation: string * }> {} * * // Create a channel with numbers * const numbersChannel = Channel.fromIterable([1, 2, 3, 4, 5]) * * // Fold to calculate sum * const sumEffect = Channel.runFold(numbersChannel, () => 0, (acc, n) => acc + n) * * // Effect.runSync(sumEffect) // Returns: 15 * ``` * * @since 2.0.0 * @category execution */ export const runFold: { /** * Runs a channel and folds over all output elements with an accumulator. * * @example * ```ts * import { Channel, Data } from "effect" * * class FoldError extends Data.TaggedError("FoldError")<{ * readonly operation: string * }> {} * * // Create a channel with numbers * const numbersChannel = Channel.fromIterable([1, 2, 3, 4, 5]) * * // Fold to calculate sum * const sumEffect = Channel.runFold(numbersChannel, () => 0, (acc, n) => acc + n) * * // Effect.runSync(sumEffect) // Returns: 15 * ``` * * @since 2.0.0 * @category execution */ (initial: LazyArg, f: (acc: Z, o: OutElem) => Z): ( self: Channel ) => Effect.Effect /** * Runs a channel and folds over all output elements with an accumulator. * * @example * ```ts * import { Channel, Data } from "effect" * * class FoldError extends Data.TaggedError("FoldError")<{ * readonly operation: string * }> {} * * // Create a channel with numbers * const numbersChannel = Channel.fromIterable([1, 2, 3, 4, 5]) * * // Fold to calculate sum * const sumEffect = Channel.runFold(numbersChannel, () => 0, (acc, n) => acc + n) * * // Effect.runSync(sumEffect) // Returns: 15 * ``` * * @since 2.0.0 * @category execution */ ( self: Channel, initial: LazyArg, f: (acc: Z, o: OutElem) => Z ): Effect.Effect } = dual(3, ( self: Channel, initial: LazyArg, f: (acc: Z, o: OutElem) => Z ): Effect.Effect => Effect.suspend(() => { let state = initial() return runWith( self, (pull) => Effect.whileLoop({ while: constTrue, body: () => pull, step: (value) => { state = f(state, value) } }), () => Effect.succeed(state) ) })) /** * @since 2.0.0 * @category execution */ export const runFoldEffect: { /** * @since 2.0.0 * @category execution */ (initial: LazyArg, f: (acc: Z, o: OutElem) => Effect.Effect): ( self: Channel ) => Effect.Effect /** * @since 2.0.0 * @category execution */ ( self: Channel, initial: LazyArg, f: (acc: Z, o: OutElem) => Effect.Effect ): Effect.Effect } = dual(3, ( self: Channel, initial: LazyArg, f: (acc: Z, o: OutElem) => Effect.Effect ): Effect.Effect => Effect.suspend(() => { let state = initial() return runWith( self, (pull) => Effect.whileLoop({ while: constTrue, body: constant(pull.pipe( Effect.flatMap((o) => f(state, o)), Effect.map((s) => { state = s }) )), step: constVoid }), () => Effect.succeed(state) ) })) /** * Converts a channel to a Pull data structure for low-level consumption. * * @example * ```ts * import { Channel, Data, Effect } from "effect" * * class PullError extends Data.TaggedError("PullError")<{ * readonly step: string * }> {} * * // Create a channel * const numbersChannel = Channel.fromIterable([1, 2, 3]) * * // Convert to Pull within a scope * const pullEffect = Effect.scoped( * Channel.toPull(numbersChannel) * ) * * // Use the Pull to manually consume elements * ``` * * @since 2.0.0 * @category Destructors */ export const toPull: ( self: Channel ) => Effect.Effect< Pull.Pull, never, Env | Scope.Scope > = Effect.fnUntraced( function*( self: Channel ) { const semaphore = Semaphore.makeUnsafe(1) const context = yield* Effect.context() const scope = Context.get(context, Scope.Scope) const pull = yield* toTransform(self)(Cause.done(), scope) return pull.pipe( Effect.provideContext(context), semaphore.withPermits(1) ) }, // ensure errors are redirected to the pull effect Effect.catchCause((cause) => Effect.succeed(Effect.failCause(cause))) ) as any /** * Converts a channel to a Pull within an existing scope. * * @example * ```ts * import { Channel, Data, Effect, Scope } from "effect" * * class ScopedPullError extends Data.TaggedError("ScopedPullError")<{ * readonly reason: string * }> {} * * // Create a channel * const numbersChannel = Channel.fromIterable([1, 2, 3]) * * // Convert to Pull with explicit scope * const scopedPullEffect = Effect.gen(function*() { * const scope = yield* Scope.make() * const pull = yield* Channel.toPullScoped(numbersChannel, scope) * return pull * }) * ``` * * @since 4.0.0 * @category Destructors */ export const toPullScoped = ( self: Channel, scope: Scope.Scope ): Effect.Effect, never, Env> => toTransform(self)(Cause.done(), scope) /** * @since 4.0.0 * @category Destructors */ export const runIntoQueue: { /** * @since 4.0.0 * @category Destructors */ (queue: Queue.Queue): ( self: Channel ) => Effect.Effect /** * @since 4.0.0 * @category Destructors */ ( self: Channel, queue: Queue.Queue ): Effect.Effect } = dual( (args) => isChannel(args[0]), ( self: Channel, queue: Queue.Queue ): Effect.Effect => Effect.uninterruptibleMask((restore) => runForEach(self, (value) => Queue.offer(queue, value)).pipe( restore, Effect.exit, Effect.flatMap((exit) => { if (Exit.isSuccess(exit)) { Queue.endUnsafe(queue) } else { Queue.failCauseUnsafe(queue, exit.cause) } return Effect.void }) ) ) ) /** * @since 4.0.0 * @category Destructors */ export const runIntoQueueArray: { /** * @since 4.0.0 * @category Destructors */ (queue: Queue.Queue): ( self: Channel, OutErr, OutDone, unknown, unknown, unknown, Env> ) => Effect.Effect /** * @since 4.0.0 * @category Destructors */ ( self: Channel, OutErr, OutDone, unknown, unknown, unknown, Env>, queue: Queue.Queue ): Effect.Effect } = dual( (args) => isChannel(args[0]), ( self: Channel, OutErr, OutDone, unknown, unknown, unknown, Env>, queue: Queue.Queue ): Effect.Effect => Effect.uninterruptibleMask((restore) => runForEach(self, (value) => Queue.offerAll(queue, value)).pipe( restore, Effect.exit, Effect.flatMap((exit) => { if (Exit.isSuccess(exit)) { Queue.endUnsafe(queue) } else { Queue.failCauseUnsafe(queue, exit.cause) } return Effect.void }) ) ) ) /** * Converts a channel to a queue for concurrent consumption. * * @example * ```ts * import { Channel, Data } from "effect" * * class QueueError extends Data.TaggedError("QueueError")<{ * readonly operation: string * }> {} * * // Create a channel with data * const dataChannel = Channel.fromIterable([1, 2, 3, 4, 5]) * * // Convert to queue for concurrent processing * const queueEffect = Channel.toQueue(dataChannel, { capacity: 32 }) * * // The queue can be used for concurrent consumption * // Multiple consumers can read from the queue * ``` * * @since 4.0.0 * @category Destructors */ export const toQueue: { /** * Converts a channel to a queue for concurrent consumption. * * @example * ```ts * import { Channel, Data } from "effect" * * class QueueError extends Data.TaggedError("QueueError")<{ * readonly operation: string * }> {} * * // Create a channel with data * const dataChannel = Channel.fromIterable([1, 2, 3, 4, 5]) * * // Convert to queue for concurrent processing * const queueEffect = Channel.toQueue(dataChannel, { capacity: 32 }) * * // The queue can be used for concurrent consumption * // Multiple consumers can read from the queue * ``` * * @since 4.0.0 * @category Destructors */ ( options: { readonly capacity: "unbounded" } | { readonly capacity: number readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } ): ( self: Channel ) => Effect.Effect, never, Env | Scope.Scope> /** * Converts a channel to a queue for concurrent consumption. * * @example * ```ts * import { Channel, Data } from "effect" * * class QueueError extends Data.TaggedError("QueueError")<{ * readonly operation: string * }> {} * * // Create a channel with data * const dataChannel = Channel.fromIterable([1, 2, 3, 4, 5]) * * // Convert to queue for concurrent processing * const queueEffect = Channel.toQueue(dataChannel, { capacity: 32 }) * * // The queue can be used for concurrent consumption * // Multiple consumers can read from the queue * ``` * * @since 4.0.0 * @category Destructors */ ( self: Channel, options: { readonly capacity: "unbounded" } | { readonly capacity: number readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } ): Effect.Effect, never, Env | Scope.Scope> } = dual( (args) => isChannel(args[0]), Effect.fnUntraced(function*( self: Channel, options: { readonly capacity: "unbounded" } | { readonly capacity: number readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } ) { const scope = yield* Effect.scope const queue = yield* Queue.make {} * * // Create a channel with data * const dataChannel = Channel.fromIterable([1, 2, 3, 4, 5]) * * // Convert to queue for concurrent processing * const queueEffect = Channel.toQueue(dataChannel, { capacity: 32 }) * * // The queue can be used for concurrent consumption * // Multiple consumers can read from the queue * ``` * * @since 4.0.0 * @category Destructors */ OutElem, /** * Converts a channel to a queue for concurrent consumption. * * @example * ```ts * import { Channel, Data } from "effect" * * class QueueError extends Data.TaggedError("QueueError")<{ * readonly operation: string * }> {} * * // Create a channel with data * const dataChannel = Channel.fromIterable([1, 2, 3, 4, 5]) * * // Convert to queue for concurrent processing * const queueEffect = Channel.toQueue(dataChannel, { capacity: 32 }) * * // The queue can be used for concurrent consumption * // Multiple consumers can read from the queue * ``` * * @since 4.0.0 * @category Destructors */ OutErr | Cause.Done>({ capacity: typeof options.capacity === "number" ? options.capacity : undefined, strategy: typeof options.capacity === "number" ? options.strategy : undefined }) yield* Scope.addFinalizer(scope, Queue.shutdown(queue)) yield* Effect.forkIn(runIntoQueue(self, queue), scope) return queue }) ) /** * @since 4.0.0 * @category Destructors */ export const toQueueArray: { /** * @since 4.0.0 * @category Destructors */ ( options: { readonly capacity: "unbounded" } | { readonly capacity: number readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } ): ( self: Channel, OutErr, OutDone, unknown, unknown, unknown, Env> ) => Effect.Effect, never, Env | Scope.Scope> /** * @since 4.0.0 * @category Destructors */ ( self: Channel, OutErr, OutDone, unknown, unknown, unknown, Env>, options: { readonly capacity: "unbounded" } | { readonly capacity: number readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } ): Effect.Effect, never, Env | Scope.Scope> } = dual( (args) => isChannel(args[0]), Effect.fnUntraced(function*( self: Channel, OutErr, OutDone, unknown, unknown, unknown, Env>, options: { readonly capacity: "unbounded" } | { readonly capacity: number readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } ) { const scope = yield* Effect.scope const queue = yield* Queue.make({ capacity: typeof options.capacity === "number" ? options.capacity : undefined, strategy: typeof options.capacity === "number" ? options.strategy : undefined }) yield* Scope.addFinalizer(scope, Queue.shutdown(queue)) yield* Effect.forkIn(runIntoQueueArray(self, queue), scope) return queue }) ) /** * Converts a channel to a PubSub for concurrent consumption. * * `shutdownOnEnd` indicates whether the PubSub should be shut down when the * channel ends. By default this is `true`. * * @since 4.0.0 * @category Destructors */ export const toPubSub: { /** * Converts a channel to a PubSub for concurrent consumption. * * `shutdownOnEnd` indicates whether the PubSub should be shut down when the * channel ends. By default this is `true`. * * @since 4.0.0 * @category Destructors */ ( options: { readonly capacity: "unbounded" readonly replay?: number | undefined readonly shutdownOnEnd?: boolean | undefined } | { readonly capacity: number readonly strategy?: "dropping" | "sliding" | "suspend" | undefined readonly replay?: number | undefined readonly shutdownOnEnd?: boolean | undefined } ): ( self: Channel ) => Effect.Effect, never, Env | Scope.Scope> /** * Converts a channel to a PubSub for concurrent consumption. * * `shutdownOnEnd` indicates whether the PubSub should be shut down when the * channel ends. By default this is `true`. * * @since 4.0.0 * @category Destructors */ ( self: Channel, options: { readonly capacity: "unbounded" readonly replay?: number | undefined readonly shutdownOnEnd?: boolean | undefined } | { readonly capacity: number readonly strategy?: "dropping" | "sliding" | "suspend" | undefined readonly replay?: number | undefined readonly shutdownOnEnd?: boolean | undefined } ): Effect.Effect, never, Env | Scope.Scope> } = dual( 2, Effect.fnUntraced(function*( self: Channel, options: { readonly capacity: "unbounded" readonly replay?: number | undefined readonly shutdownOnEnd?: boolean | undefined } | { readonly capacity: number readonly strategy?: "dropping" | "sliding" | "suspend" | undefined readonly replay?: number | undefined readonly shutdownOnEnd?: boolean | undefined } ) { const pubsub = yield* makePubSub(options) yield* Effect.forkScoped(runIntoPubSub(self, pubsub, { shutdownOnEnd: options.shutdownOnEnd !== false })) return pubsub }) ) /** * @since 4.0.0 * @category Destructors */ export const runIntoPubSub: { /** * @since 4.0.0 * @category Destructors */ ( pubsub: PubSub.PubSub, options?: { readonly shutdownOnEnd?: boolean | undefined } | undefined ): ( self: Channel ) => Effect.Effect /** * @since 4.0.0 * @category Destructors */ ( self: Channel, pubsub: PubSub.PubSub, options?: { readonly shutdownOnEnd?: boolean | undefined } | undefined ): Effect.Effect } = dual( (args) => isChannel(args[0]), ( self: Channel, pubsub: PubSub.PubSub, options?: { readonly shutdownOnEnd?: boolean | undefined } | undefined ) => runForEach(self, (value) => PubSub.publish(pubsub, value)).pipe( options?.shutdownOnEnd === true ? Effect.ensuring(PubSub.shutdown(pubsub)) : identity_ ) ) const makePubSub = ( options: { readonly capacity: "unbounded" readonly replay?: number | undefined } | { readonly capacity: number readonly strategy?: "dropping" | "sliding" | "suspend" | undefined readonly replay?: number | undefined } ) => Effect.acquireRelease( options.capacity === "unbounded" ? PubSub.unbounded(options) : options.strategy === "dropping" ? PubSub.dropping(options) : options.strategy === "sliding" ? PubSub.sliding(options) : PubSub.bounded(options), PubSub.shutdown ) /** * Converts a channel to a PubSub for concurrent consumption. * * `shutdownOnEnd` indicates whether the PubSub should be shut down when the * channel ends. By default this is `true`. * * @since 4.0.0 * @category Destructors */ export const toPubSubArray: { /** * Converts a channel to a PubSub for concurrent consumption. * * `shutdownOnEnd` indicates whether the PubSub should be shut down when the * channel ends. By default this is `true`. * * @since 4.0.0 * @category Destructors */ ( options: { readonly capacity: "unbounded" readonly replay?: number | undefined readonly shutdownOnEnd?: boolean | undefined } | { readonly capacity: number readonly strategy?: "dropping" | "sliding" | "suspend" | undefined readonly replay?: number | undefined readonly shutdownOnEnd?: boolean | undefined } ): ( self: Channel, OutErr, OutDone, unknown, unknown, unknown, Env> ) => Effect.Effect, never, Env | Scope.Scope> /** * Converts a channel to a PubSub for concurrent consumption. * * `shutdownOnEnd` indicates whether the PubSub should be shut down when the * channel ends. By default this is `true`. * * @since 4.0.0 * @category Destructors */ ( self: Channel, OutErr, OutDone, unknown, unknown, unknown, Env>, options: { readonly capacity: "unbounded" readonly replay?: number | undefined readonly shutdownOnEnd?: boolean | undefined } | { readonly capacity: number readonly strategy?: "dropping" | "sliding" | "suspend" | undefined readonly replay?: number | undefined readonly shutdownOnEnd?: boolean | undefined } ): Effect.Effect, never, Env | Scope.Scope> } = dual( 2, Effect.fnUntraced(function*( self: Channel, OutErr, OutDone, unknown, unknown, unknown, Env>, options: { readonly capacity: "unbounded" readonly replay?: number | undefined readonly shutdownOnEnd?: boolean | undefined } | { readonly capacity: number readonly strategy?: "dropping" | "sliding" | "suspend" | undefined readonly replay?: number | undefined readonly shutdownOnEnd?: boolean | undefined } ) { const pubsub = yield* makePubSub(options) yield* Effect.forkScoped(runIntoPubSubArray(self, pubsub, { shutdownOnEnd: options.shutdownOnEnd !== false })) return pubsub }) ) /** * @since 4.0.0 * @category Destructors */ export const runIntoPubSubArray: { /** * @since 4.0.0 * @category Destructors */ ( pubsub: PubSub.PubSub, options?: { readonly shutdownOnEnd?: boolean | undefined } | undefined ): ( self: Channel, OutErr, OutDone, unknown, unknown, unknown, Env> ) => Effect.Effect /** * @since 4.0.0 * @category Destructors */ ( self: Channel, OutErr, OutDone, unknown, unknown, unknown, Env>, pubsub: PubSub.PubSub, options?: { readonly shutdownOnEnd?: boolean | undefined } | undefined ): Effect.Effect } = dual( (args) => isChannel(args[0]), ( self: Channel, OutErr, OutDone, unknown, unknown, unknown, Env>, pubsub: PubSub.PubSub, options?: { readonly shutdownOnEnd?: boolean | undefined } | undefined ) => runForEach(self, (value) => PubSub.publishAll(pubsub, value)).pipe( options?.shutdownOnEnd === true ? Effect.ensuring(PubSub.shutdown(pubsub)) : identity_ ) ) /** * Converts a channel to a PubSub for concurrent consumption. * * @since 4.0.0 * @category Destructors */ export const toPubSubTake: { /** * Converts a channel to a PubSub for concurrent consumption. * * @since 4.0.0 * @category Destructors */ ( options: { readonly capacity: "unbounded" readonly replay?: number | undefined } | { readonly capacity: number readonly strategy?: "dropping" | "sliding" | "suspend" | undefined readonly replay?: number | undefined } ): ( self: Channel, OutErr, OutDone, unknown, unknown, unknown, Env> ) => Effect.Effect< PubSub.PubSub>, never, Env | Scope.Scope > /** * Converts a channel to a PubSub for concurrent consumption. * * @since 4.0.0 * @category Destructors */ ( self: Channel, OutErr, OutDone, unknown, unknown, unknown, Env>, options: { readonly capacity: "unbounded" readonly replay?: number | undefined } | { readonly capacity: number readonly strategy?: "dropping" | "sliding" | "suspend" | undefined readonly replay?: number | undefined } ): Effect.Effect< PubSub.PubSub>, never, Env | Scope.Scope > } = dual( 2, Effect.fnUntraced(function*( self: Channel, OutErr, OutDone, unknown, unknown, unknown, Env>, options: { readonly capacity: "unbounded" readonly replay?: number | undefined } | { readonly capacity: number readonly strategy?: "dropping" | "sliding" | "suspend" | undefined readonly replay?: number | undefined } ) { const pubsub = yield* makePubSub>(options) yield* runForEach(self, (value) => PubSub.publish(pubsub, value)).pipe( Effect.onExit((exit) => PubSub.publish(pubsub, exit)), Effect.forkScoped ) return pubsub }) )