/**
 * @since 2.0.0
 */
import type { NonEmptyReadonlyArray } from "./Array.ts"
import * as Arr from "./Array.ts"
import * as Cause from "./Cause.ts"
import * as Channel from "./Channel.ts"
import * as Clock from "./Clock.ts"
import type * as Context from "./Context.ts"
import * as Duration from "./Duration.ts"
import * as Effect from "./Effect.ts"
import * as Exit from "./Exit.ts"
import type * as Filter from "./Filter.ts"
import type { LazyArg } from "./Function.ts"
import { constant, constFalse, constTrue, constVoid, dual, identity, pipe } from "./Function.ts"
import * as internalStream from "./internal/stream.ts"
import * as Option from "./Option.ts"
import { type Pipeable, pipeArguments } from "./Pipeable.ts"
import type { Predicate, Refinement } from "./Predicate.ts"
import { hasProperty } from "./Predicate.ts"
import * as PubSub from "./PubSub.ts"
import * as Pull from "./Pull.ts"
import * as Queue from "./Queue.ts"
import * as Result from "./Result.ts"
import * as Scope from "./Scope.ts"
import type { Stream } from "./Stream.ts"
import type * as Types from "./Types.ts"
import type * as Unify from "./Unify.ts"

// Property key that brands Sink values: the sink prototype defines it and `isSink` tests for it.
const TypeId = "~effect/Sink"

/** * A `Sink` is used to consume elements produced by a `Stream`. * You can think of a sink as a function that will consume a variable amount of * `In` elements (could be 0, 1, or many), might fail with an error of type `E`, * and will eventually yield a value of type `A` together with a remainder of * type `L` (i.e. any leftovers). 
 * * @example * ```ts * import { Effect } from "effect" * import * as Sink from "effect/Sink" * import * as Stream from "effect/Stream" * * // Create a simple sink that always succeeds with a value * const sink: Sink.Sink = Sink.succeed(42) * * // Use the sink to consume a stream * const stream = Stream.make(1, 2, 3) * const program = Stream.run(stream, sink) * * Effect.runPromise(program).then(console.log) * // Output: 42 * ``` * * @since 2.0.0 * @category models */
export interface Sink extends Sink.Variance, Pipeable {
  // The single operation of a sink: given the `upstream` pull and a `scope`,
  // produce the effect that runs the sink.
  // NOTE(review): the generic argument lists on this declaration look truncated
  // (e.g. `Pull.Pull, never, void>`) — confirm against the upstream file.
  readonly transform: (
    upstream: Pull.Pull, never, void>,
    scope: Scope.Scope
  ) => Effect.Effect, E, R>
  [Unify.typeSymbol]?: unknown
  [Unify.unifySymbol]?: SinkUnify
  [Unify.ignoreSymbol]?: SinkUnifyIgnore
}

/**
 * The tuple a sink completes with: the result `value`, optionally followed by
 * a non-empty array of `leftover` elements.
 *
 * @since 2.0.0
 * @category models
 */
export type End = readonly [value: A, leftover?: NonEmptyReadonlyArray | undefined]

// Pre-built effect succeeding with `[undefined]`, i.e. a void result and no
// leftover slot; reused by sinks that produce no value.
const endVoid = Effect.succeed([void 0] as End)

/**
 * Interface for Sink unification, used internally by the Effect type system
 * to provide proper type inference when using Sink with other Effect types.
 *
 * @example
 * ```ts
 * import type { Effect } from "effect"
 * import type * as Sink from "effect/Sink"
 * import type * as Unify from "effect/Unify"
 *
 * // SinkUnify helps unify Sink and Effect types
 * declare const sink: Sink.Sink
 * declare const effect: Effect.Effect
 *
 * // The unification system handles mixed operations
 * type Combined = Sink.SinkUnify<{ [Unify.typeSymbol]?: any }>
 * ```
 *
 * @since 2.0.0
 * @category models
 */
export interface SinkUnify extends Effect.EffectUnify {
  // Extracts the Sink type arguments from the unified type via `infer`.
  Sink?: () => A[Unify.typeSymbol] extends
    | Sink<
      infer A,
      infer In,
      infer L,
      infer E,
      infer R
    >
    | infer _ ? Sink
    : never
}

/** * Interface used to ignore certain types during Sink unification. * Part of the internal type system machinery. 
 * * @example * ```ts * import type * as Sink from "effect/Sink" * * // Used internally by the type system * type IgnoreConfig = Sink.SinkUnifyIgnore * ``` * * @category models * @since 2.0.0 */
export interface SinkUnifyIgnore {
  Effect?: true
}

/**
 * Namespace containing types and interfaces for Sink variance and type relationships.
 *
 * @example
 * ```ts
 * import type * as Sink from "effect/Sink"
 *
 * // The Sink namespace contains internal type definitions
 * // These are used internally for type safety and variance
 * type SinkType = Sink.Sink
 * ```
 *
 * @since 2.0.0
 * @category models
 */
export declare namespace Sink {
  /**
   * Represents the variance annotations for a Sink type.
   * Used internally to track how type parameters flow through the Sink.
   *
   * @example
   * ```ts
   * import type * as Sink from "effect/Sink"
   *
   * // The variance interface is used internally
   * // It defines how type parameters behave in Sink
   * type SinkWithVariance = Sink.Sink & { variance: "internal" }
   * ```
   *
   * @since 2.0.0
   * @category models
   */
  export interface Variance {
    readonly [TypeId]: VarianceStruct
  }
  /**
   * The internal structure representing Sink variance annotations.
   * Contains the actual variance markers for each type parameter.
   *
   * @example
   * ```ts
   * import type * as Sink from "effect/Sink"
   *
   * // The variance structure is used internally by the type system
   * // It ensures proper type safety for Sink operations
   * type SinkInstance = Sink.Sink
   * ```
   *
   * @since 2.0.0
   * @category models
   */
  export interface VarianceStruct {
    _A: Types.Covariant
    _In: Types.Contravariant
    _L: Types.Covariant
    _E: Types.Covariant
    _R: Types.Covariant
  }
}

// Runtime value stored under `TypeId`; it has the same keys as
// `VarianceStruct`, each set to `identity`.
const sinkVariance = {
  _A: identity,
  _In: identity,
  _L: identity,
  _E: identity,
  _R: identity
}

// Prototype object for sinks: carries the `TypeId` brand and a `pipe` method
// that forwards the receiver and its arguments to `pipeArguments`.
const SinkProto = {
  [TypeId]: sinkVariance,
  pipe() {
    return pipeArguments(this, arguments)
  }
}

/** * Checks if a value is a Sink. 
 * * @example * ```ts * import { Sink } from "effect" * * const sink = Sink.never * const notStream = { data: [1, 2, 3] } * * console.log(Sink.isSink(sink)) // true * console.log(Sink.isSink(notStream)) // false * ``` * * @since 2.0.0 * @category guards */
export const isSink = (u: unknown): u is Sink => hasProperty(u, TypeId)

/**
 * Creates a sink from a `Channel`.
 *
 * The channel is converted with `Channel.toTransform` and applied to the
 * upstream pull. The pull it yields is then repeated with `Effect.forever`,
 * and the value carried by its done signal is returned as the sink's end
 * (via `Pull.catchDone(Effect.succeed)`).
 *
 * @since 2.0.0
 * @category constructors
 */
export const fromChannel = (
  channel: Channel.Channel<
    never,
    E,
    End,
    NonEmptyReadonlyArray,
    never,
    void,
    R
  >
): Sink =>
  fromTransform((upstream, scope) =>
    Channel.toTransform(channel)(upstream, scope).pipe(
      Effect.flatMap(Effect.forever({ disableYield: true })),
      Pull.catchDone(Effect.succeed)
    ) as Effect.Effect, E, R>
  )

/**
 * Low-level constructor: builds a sink directly from its `transform`
 * function. The returned object has `SinkProto` as its prototype and the
 * given function assigned to its `transform` property.
 *
 * @since 4.0.0
 * @category constructors
 */
export const fromTransform = (
  transform: (
    upstream: Pull.Pull, never, void>,
    scope: Scope.Scope
  ) => Effect.Effect, E, R>
): Sink => {
  const self = Object.create(SinkProto)
  self.transform = transform
  return self
}

/** * Creates a `Channel` from a Sink. 
 * * @example * ```ts * import { Sink } from "effect" * * // Create a sink and extract its channel * const sink = Sink.succeed(42) * const channel = Sink.toChannel(sink) * ``` * * @since 2.0.0 * @category constructors */
export const toChannel = (
  self: Sink
): Channel.Channel, NonEmptyReadonlyArray, never, void, R> =>
  Channel.fromTransform((upstream, scope) =>
    // The channel's pull runs the sink and then hands its end tuple to `Cause.done`.
    Effect.succeed(Effect.flatMap(
      self.transform(upstream, scope),
      Cause.done
    ))
  )

/**
 * Returns a constructor that builds a sink from a pipeline of functions.
 *
 * The upstream pull is wrapped with `Channel.fromPull` and
 * `internalStream.fromChannel`, then threaded through `fns` with `pipe`. The
 * value of the final effect becomes the sink's result (no leftovers), and
 * `scope` is provided to the whole pipeline.
 *
 * @since 4.0.0
 * @category constructors
 */
export const make = (): make.Constructor =>
(...fns: []) =>
  fromTransform((upstream, scope) =>
    pipe(
      internalStream.fromChannel(Channel.fromPull(Effect.succeed(upstream))),
      ...fns as any as [() => Effect.Effect],
      Effect.flatMap((a) => Cause.done>([a])),
      Scope.provide(scope)
    )
  )

/**
 * @since 4.0.0
 */
export declare namespace make {
  /**
   * Call signatures accepted by the function returned from `make`: one to ten
   * functions, the first receiving the input `Stream` and the last returning
   * an `Effect`.
   *
   * @since 4.0.0
   */
  export interface Constructor {
    (ab: (_: Stream) => Effect.Effect): Sink>
    (
      ab: (_: Stream) => B,
      bc: (_: B) => Effect.Effect
    ): Sink>
    (
      ab: (_: Stream) => B,
      bc: (_: B) => C,
      cd: (_: C) => Effect.Effect
    ): Sink>
    (
      ab: (_: Stream) => B,
      bc: (_: B) => C,
      cd: (_: C) => D,
      df: (_: D) => Effect.Effect
    ): Sink>
    (
      ab: (_: Stream) => B,
      bc: (_: B) => C,
      cd: (_: C) => D,
      df: (_: D) => F,
      fg: (_: F) => Effect.Effect
    ): Sink>
    (
      ab: (_: Stream) => B,
      bc: (_: B) => C,
      cd: (_: C) => D,
      df: (_: D) => F,
      fg: (_: F) => G,
      gh: (_: G) => Effect.Effect
    ): Sink>
    (
      ab: (_: Stream) => B,
      bc: (_: B) => C,
      cd: (_: C) => D,
      df: (_: D) => F,
      fg: (_: F) => G,
      gh: (_: G) => H,
      hi: (_: H) => Effect.Effect
    ): Sink>
    (
      ab: (_: Stream) => B,
      bc: (_: B) => C,
      cd: (_: C) => D,
      df: (_: D) => F,
      fg: (_: F) => G,
      gh: (_: G) => H,
      hi: (_: H) => I,
      ij: (_: I) => Effect.Effect
    ): Sink>
    (
      ab: (_: Stream) => B,
      bc: (_: B) => C,
      cd: (_: C) => D,
      df: (_: D) => F,
      fg: (_: F) => G,
      gh: (_: G) => H,
      hi: (_: H) => I,
      ij: (_: I) => J,
      jk: (_: J) => Effect.Effect
    ): Sink>
    <
      E,
      R,
      B = never,
      C = never,
      D = never,
      F = never,
      G = never,
      H = never,
      I = never,
      J = never,
      K = never,
      L = never
    >(
      ab: (_: Stream) => B,
      bc: (_: B) => C,
      cd: (_: C) => D,
      df: (_: D) => F,
      fg: (_: F) => G,
      gh: (_: G) => H,
      hi: (_: H) => I,
      ij: (_: I) => J,
      jk: (_: J) => K,
      kl: (_: K) => Effect.Effect
    ): Sink>
  }
}

/**
 * Creates a sink whose transform ignores upstream and simply returns
 * `effect`, which must itself produce the sink's end value.
 *
 * @since 4.0.0
 * @category constructors
 */
export const fromEffectEnd = (
  effect: Effect.Effect, E, R>
): Sink => fromTransform(() => effect)

/**
 * Creates a sink from an effect: the effect's value is wrapped as `[a]`, i.e.
 * the result with no leftovers.
 *
 * @since 4.0.0
 * @category constructors
 */
export const fromEffect = (
  effect: Effect.Effect
): Sink => fromEffectEnd(Effect.map(effect, (a) => [a]))

/**
 * Creates a sink that offers every pulled array to `queue` with
 * `Queue.offerAll`. When upstream signals done, `Queue.endUnsafe` is called
 * on the queue and the sink completes with a void result.
 *
 * @since 2.0.0
 * @category constructors
 */
export const fromQueue = (
  queue: Queue.Queue
): Sink =>
  fromTransform((upstream) =>
    upstream.pipe(
      Effect.flatMap((arr) => Queue.offerAll(queue, arr)),
      Effect.forever({ disableYield: true }),
      Pull.catchDone((_) => {
        Queue.endUnsafe(queue)
        return endVoid
      })
    )
  )

/**
 * Creates a sink that publishes every pulled array to `pubsub` with
 * `PubSub.publishAll`.
 *
 * @since 2.0.0
 * @category constructors
 */
export const fromPubSub = (
  pubsub: PubSub.PubSub
): Sink => forEachArray((arr) => PubSub.publishAll(pubsub, arr))

/**
 * A sink that immediately ends with the specified value.
 *
 * An optional non-empty `leftovers` array is passed through as the second
 * slot of the end tuple.
 *
 * @example
 * ```ts
 * import { Effect, Sink, Stream } from "effect"
 *
 * // Create a sink that always yields the same value
 * const sink = Sink.succeed(42)
 *
 * // Use it with a stream
 * const stream = Stream.make(1, 2, 3)
 * const program = Stream.run(stream, sink)
 *
 * Effect.runPromise(program).then(console.log)
 * // Output: 42
 * ```
 *
 * @since 2.0.0
 * @category constructors
 */
export const succeed = (a: A, leftovers?: NonEmptyReadonlyArray | undefined): Sink =>
  fromEffectEnd(Effect.succeed([a, leftovers]))

/**
 * A sink that immediately ends with the specified lazily evaluated value.
 *
 * @since 2.0.0
 * @category constructors
 */
export const sync = (a: LazyArg): Sink => fromEffect(Effect.sync(a))

/** * A sink that is created from a lazily evaluated sink. 
 * * @since 2.0.0 * @category constructors */
export const suspend = (evaluate: LazyArg>): Sink =>
  // `evaluate` is called each time the sink's transform is invoked, not when `suspend` is called.
  fromTransform((upstream, scope) => evaluate().transform(upstream, scope))

/**
 * A sink that always fails with the specified error.
 *
 * @example
 * ```ts
 * import { Effect, Sink, Stream } from "effect"
 *
 * // Create a sink that always fails
 * const sink = Sink.fail(new Error("Sink failed"))
 *
 * // Use it with a stream
 * const stream = Stream.make(1, 2, 3)
 * const program = Stream.run(stream, sink)
 *
 * Effect.runPromise(program).catch(console.log)
 * // Output: Error: Sink failed
 * ```
 *
 * @since 2.0.0
 * @category constructors
 */
export const fail = (e: E): Sink => fromEffectEnd(Effect.fail(e))

/**
 * A sink that always fails with the specified lazily evaluated error.
 *
 * @example
 * ```ts
 * import { Effect, Sink, Stream } from "effect"
 *
 * // Create a sink that fails with a lazy error
 * const sink = Sink.failSync(() => new Error("Lazy error"))
 *
 * // Use it with a stream
 * const stream = Stream.make(1, 2, 3)
 * const program = Stream.run(stream, sink)
 *
 * Effect.runPromise(program).catch(console.log)
 * // Output: Error: Lazy error
 * ```
 *
 * @since 2.0.0
 * @category constructors
 */
export const failSync = (evaluate: LazyArg): Sink => fromEffectEnd(Effect.failSync(evaluate))

/**
 * Creates a sink halting with a specified `Cause`.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Sink, Stream } from "effect"
 *
 * // Create a sink that fails with a specific cause
 * const sink = Sink.failCause(Cause.fail(new Error("Custom cause")))
 *
 * // Use it with a stream
 * const stream = Stream.make(1, 2, 3)
 * const program = Stream.run(stream, sink)
 *
 * Effect.runPromise(program).catch(console.log)
 * // Output: Error: Custom cause
 * ```
 *
 * @since 2.0.0
 * @category constructors
 */
export const failCause = (cause: Cause.Cause): Sink => fromEffectEnd(Effect.failCause(cause))

/** * Creates a sink halting with a specified lazily evaluated `Cause`. 
 * * @example * ```ts * import { Cause, Effect, Sink, Stream } from "effect" * * // Create a sink that fails with a lazy cause * const sink = Sink.failCauseSync(() => Cause.fail(new Error("Lazy cause"))) * * // Use it with a stream * const stream = Stream.make(1, 2, 3) * const program = Stream.run(stream, sink) * * Effect.runPromise(program).catch(console.log) * // Output: Error: Lazy cause * ``` * * @since 2.0.0 * @category constructors */
export const failCauseSync = (evaluate: LazyArg>): Sink =>
  fromEffectEnd(Effect.failCauseSync(evaluate))

/**
 * Creates a sink halting with a specified defect.
 *
 * @example
 * ```ts
 * import { Effect, Sink, Stream } from "effect"
 *
 * // Create a sink that dies with a defect
 * const sink = Sink.die(new Error("Defect error"))
 *
 * // Use it with a stream
 * const stream = Stream.make(1, 2, 3)
 * const program = Stream.run(stream, sink)
 *
 * Effect.runPromise(program).catch(console.log)
 * // Output: Error: Defect error
 * ```
 *
 * @since 2.0.0
 * @category constructors
 */
export const die = (defect: unknown): Sink => fromEffectEnd(Effect.die(defect))

/**
 * A sink that never completes.
 *
 * @since 2.0.0
 * @category constructors
 */
export const never: Sink = fromEffectEnd(Effect.never)

/**
 * Discards any leftovers produced by the sink: the end tuple is rebuilt with
 * only the result value.
 *
 * @since 2.0.0
 * @category utils
 */
export const ignoreLeftover = (self: Sink): Sink => mapEnd(self, ([a]) => [a])

/**
 * Drains elements from the stream by ignoring all inputs.
 *
 * @since 2.0.0
 * @category constructors
 */
export const drain: Sink = fromTransform((upstream) =>
  // Pull repeatedly and discard the values; the done signal becomes a void result.
  Pull.catchDone(
    Effect.forever(upstream, { disableYield: true }),
    () => endVoid
  )
)

/** * A sink that folds its inputs with the provided function, termination * predicate and initial state. 
 * * @since 2.0.0 * @category folding */
export const fold = (
  s: LazyArg,
  contFn: Predicate,
  f: (s: S, input: In) => Effect.Effect
): Sink =>
  fromTransform((upstream) => {
    // Kept outside the generator so the `catchDone` handler below can return
    // whatever was accumulated when upstream ends.
    let state = s()
    return Effect.gen(function*() {
      while (true) {
        const arr = yield* upstream
        for (let i = 0; i < arr.length; i++) {
          state = yield* f(state, arr[i])
          if (contFn(state)) continue
          // Element `i` was consumed; the rest of this array (if any) is the leftover.
          return [
            state,
            (i + 1) < arr.length ? (arr.slice(i + 1) as any) : undefined
          ] as const
        }
      }
    }).pipe(
      Pull.catchDone(() => Effect.succeed>([state]))
    )
  })

/**
 * Like `fold`, but `f` receives each pulled array as a whole. The sink stops
 * after the first array for which `contFn(state)` is `false`, or when
 * upstream ends; it never returns leftovers.
 *
 * @since 2.0.0
 * @category folding
 */
export const foldArray = (
  s: LazyArg,
  contFn: Predicate,
  f: (s: S, input: Arr.NonEmptyReadonlyArray) => Effect.Effect
): Sink =>
  fromTransform((upstream) => {
    let state = s()
    return Effect.gen(function*() {
      while (true) {
        const arr = yield* upstream
        state = yield* f(state, arr)
        if (contFn(state)) continue
        return [state] as const
      }
    }).pipe(
      Pull.catchDone(() => Effect.succeed>([state]))
    )
  })

/**
 * Folds elements with `f`, stopping once `max` elements have been folded.
 * Implemented on top of `fold` by pairing the state with a counter that is
 * incremented per element; the counter is dropped from the final result.
 *
 * @since 2.0.0
 * @category folding
 */
export const foldUntil = (
  s: LazyArg,
  max: number,
  f: (s: S, input: In) => Effect.Effect
): Sink =>
  fold(
    () => [s(), 0],
    (tuple) => tuple[1] < max,
    ([output, count], input) => Effect.map(f(output, input), (s) => [s, count + 1] as const)
  ).pipe(
    map((tuple) => tuple[0])
  )

/**
 * A sink that returns whether all elements satisfy the specified predicate.
 *
 * @since 2.0.0
 * @category constructors
 */
export const every = (predicate: Predicate): Sink =>
  // State is the result for the latest element; folding continues while it is `true`.
  fold(
    constTrue,
    identity,
    (_, a) => Effect.succeed(predicate(a))
  )

/**
 * A sink that returns whether an element satisfies the specified predicate.
 *
 * @since 2.0.0
 * @category constructors
 */
export const some = (predicate: Predicate): Sink =>
  // State is the result for the latest element; folding continues while it is `false`.
  fold(
    constFalse,
    (b) => !b,
    (_, a) => Effect.succeed(predicate(a))
  )

/**
 * Transforms this sink's result.
 *
 * @since 2.0.0
 * @category mapping
 */
export const map: {
  /**
   * Transforms this sink's result.
   *
   * @since 2.0.0
   * @category mapping
   */
  (f: (a: A) => A2): (self: Sink) => Sink
  /**
   * Transforms this sink's result.
   *
   * @since 2.0.0
   * @category mapping
   */
  (self: Sink, f: (a: A) => A2): Sink
} = dual(
  2,
  // Only the result slot is mapped; leftovers pass through untouched.
  (self: Sink, f: (a: A) => A2): Sink => mapEnd(self, ([a, l]) => [f(a), l])
)

/**
 * Set the sink's result to a constant value.
 *
 * @since 2.0.0
 * @category mapping
 */
export const as: {
  /**
   * Set the sink's result to a constant value.
   *
   * @since 2.0.0
   * @category mapping
   */
  (a2: A2): (self: Sink) => Sink
  /**
   * Set the sink's result to a constant value.
   *
   * @since 2.0.0
   * @category mapping
   */
  (self: Sink, a2: A2): Sink
} = dual(
  2,
  (self: Sink, a2: A2): Sink => map(self, () => a2)
)

/**
 * Transforms this sink's input elements.
 *
 * @since 2.0.0
 * @category mapping
 */
export const mapInput: {
  /**
   * Transforms this sink's input elements.
   *
   * @since 2.0.0
   * @category mapping
   */
  (f: (input: In0) => In): (self: Sink) => Sink
  /**
   * Transforms this sink's input elements.
   *
   * @since 2.0.0
   * @category mapping
   */
  (self: Sink, f: (input: In0) => In): Sink
} = dual(
  2,
  (self: Sink, f: (input: In0) => In): Sink => mapInputArray(self, Arr.map(f))
)

/**
 * Effectfully transforms this sink's input elements.
 *
 * @since 2.0.0
 * @category mapping
 */
export const mapInputEffect: {
  /**
   * Effectfully transforms this sink's input elements.
   *
   * @since 2.0.0
   * @category mapping
   */
  (f: (input: In0) => Effect.Effect): (self: Sink) => Sink
  /**
   * Effectfully transforms this sink's input elements.
   *
   * @since 2.0.0
   * @category mapping
   */
  (self: Sink, f: (input: In0) => Effect.Effect): Sink
} = dual(
  2,
  (
    self: Sink,
    f: (input: In0) => Effect.Effect
  ): Sink => mapInputArrayEffect(self, Effect.forEach(f))
)

/**
 * Transforms this sink's input elements, one pulled array at a time.
 *
 * @since 4.0.0
 * @category mapping
 */
export const mapInputArray: {
  /**
   * Transforms this sink's input elements.
   *
   * @since 4.0.0
   * @category mapping
   */
  (
    f: (input: Arr.NonEmptyReadonlyArray) => Arr.NonEmptyReadonlyArray
  ): (self: Sink) => Sink
  /**
   * Transforms this sink's input elements.
   *
   * @since 4.0.0
   * @category mapping
   */
  (
    self: Sink,
    f: (input: Arr.NonEmptyReadonlyArray) => Arr.NonEmptyReadonlyArray
  ): Sink
} = dual(
  2,
  (
    self: Sink,
    f: (input: Arr.NonEmptyReadonlyArray) => Arr.NonEmptyReadonlyArray
  ): Sink => fromTransform((upstream, scope) => self.transform(Effect.map(upstream, f), scope))
)

/**
 * Effectfully transforms this sink's input elements, one pulled array at a time.
 *
 * @since 4.0.0
 * @category mapping
 */
export const mapInputArrayEffect: {
  /**
   * Effectfully transforms this sink's input elements.
   *
   * @since 4.0.0
   * @category mapping
   */
  (
    f: (input: Arr.NonEmptyReadonlyArray) => Effect.Effect, E2, R2>
  ): (self: Sink) => Sink
  /**
   * Effectfully transforms this sink's input elements.
   *
   * @since 4.0.0
   * @category mapping
   */
  (
    self: Sink,
    f: (input: Arr.NonEmptyReadonlyArray) => Effect.Effect, E2, R2>
  ): Sink
} = dual(
  2,
  (
    self: Sink,
    f: (input: Arr.NonEmptyReadonlyArray) => Effect.Effect, E2, R2>
  ): Sink =>
    fromTransform((upstream, scope) =>
      self.transform(
        Effect.flatMap(upstream, f) as any,
        scope
      )
    )
)

/**
 * Transforms this sink's end value, i.e. the whole result/leftover tuple.
 *
 * @since 4.0.0
 * @category mapping
 */
export const mapEnd: {
  /**
   * Transforms this sink's result.
   *
   * @since 4.0.0
   * @category mapping
   */
  (f: (a: End) => End): (self: Sink) => Sink
  /**
   * Transforms this sink's result.
   *
   * @since 4.0.0
   * @category mapping
   */
  (self: Sink, f: (a: End) => End): Sink
} = dual(
  2,
  (
    self: Sink,
    f: (a: End) => End
  ): Sink =>
    fromTransform((upstream, scope) =>
      Effect.map(
        self.transform(upstream, scope),
        f
      )
    )
)

// Internal helper: wraps `self` in a new sink whose transform applies `f` to
// the effect returned by `self.transform`.
const transformEffect = (
  self: Sink,
  f: (effect: Effect.Effect, E, R>) => Effect.Effect, E2, R2>
): Sink => fromTransform((upstream, scope) => f(self.transform(upstream, scope)))

/** * Effectfully transforms this sink's result. 
 * * @since 4.0.0 * @category mapping */
export const mapEffectEnd: {
  /**
   * Effectfully transforms this sink's result.
   *
   * @since 4.0.0
   * @category mapping
   */
  (f: (end: End) => Effect.Effect, E2, R2>): (self: Sink) => Sink
  /**
   * Effectfully transforms this sink's result.
   *
   * @since 4.0.0
   * @category mapping
   */
  (
    self: Sink,
    f: (end: End) => Effect.Effect, E2, R2>
  ): Sink
} = dual(2, (
  self: Sink,
  f: (end: End) => Effect.Effect, E2, R2>
): Sink => transformEffect(self, Effect.flatMap(f)))

/**
 * Effectfully transforms this sink's result.
 *
 * @since 2.0.0
 * @category mapping
 */
export const mapEffect: {
  /**
   * Effectfully transforms this sink's result.
   *
   * @since 2.0.0
   * @category mapping
   */
  (f: (a: A) => Effect.Effect): (self: Sink) => Sink
  /**
   * Effectfully transforms this sink's result.
   *
   * @since 2.0.0
   * @category mapping
   */
  (self: Sink, f: (a: A) => Effect.Effect): Sink
} = dual(2, (
  self: Sink,
  f: (a: A) => Effect.Effect
  // Only the result slot goes through `f`; leftovers are carried over unchanged.
): Sink => mapEffectEnd(self, ([a, l]) => Effect.map(f(a), (a2) => [a2, l] as End)))

/**
 * Transforms the errors emitted by this sink using `f`.
 *
 * @since 2.0.0
 * @category mapping
 */
export const mapError: {
  /**
   * Transforms the errors emitted by this sink using `f`.
   *
   * @since 2.0.0
   * @category mapping
   */
  (f: (error: E) => E2): (self: Sink) => Sink
  /**
   * Transforms the errors emitted by this sink using `f`.
   *
   * @since 2.0.0
   * @category mapping
   */
  (self: Sink, f: (error: E) => E2): Sink
} = dual(2, (
  self: Sink,
  f: (error: E) => E2
): Sink => transformEffect(self, Effect.mapError(f)))

/**
 * Transforms the leftovers emitted by this sink using `f`.
 *
 * @since 2.0.0
 * @category mapping
 */
export const mapLeftover: {
  /**
   * Transforms the leftovers emitted by this sink using `f`.
   *
   * @since 2.0.0
   * @category mapping
   */
  (f: (leftover: L) => L2): (self: Sink) => Sink
  /**
   * Transforms the leftovers emitted by this sink using `f`.
   *
   * @since 2.0.0
   * @category mapping
   */
  (self: Sink, f: (leftover: L) => L2): Sink
} = dual(2, (
  self: Sink,
  f: (leftover: L) => L2
  // `l &&` keeps an absent leftover slot absent instead of mapping it.
): Sink => mapEnd(self, ([a, l]) => [a, l && Arr.map(l, f)]))

/**
 * Collects up to `n` elements into an array. Elements of the current pulled
 * array beyond the `n`-th are returned as leftovers. For `n <= 0` the sink
 * completes immediately with an empty array and does not pull.
 *
 * @since 2.0.0
 * @category collecting
 */
export const take = (n: number): Sink, In, In> =>
  fromTransform((upstream) => {
    const taken: Array = []
    if (n <= 0) {
      return Effect.succeed([taken] as const)
    }
    let leftover: NonEmptyReadonlyArray | undefined = undefined
    return upstream.pipe(
      Effect.flatMap((arr) => {
        // Whole array fits: append it in one go.
        if (taken.length + arr.length <= n) {
          taken.push(...arr)
          if (taken.length === n) {
            return Cause.done()
          }
          return Effect.void
        }
        // Array overshoots `n`: copy element by element and keep the tail as leftover.
        for (let i = 0; i < arr.length; i++) {
          taken.push(arr[i])
          if (taken.length === n) {
            if ((i + 1) < arr.length) {
              leftover = arr.slice(i + 1) as any
            }
            return Cause.done()
          }
        }
        return Effect.void
      }),
      Effect.forever({ disableYield: true }),
      Pull.catchDone(() => Effect.succeed([taken, leftover] as const))
    )
  })

/**
 * Runs this sink until it yields a result, then uses that result to create
 * another sink from the provided function which will continue to run until it
 * yields a result.
 *
 * This function essentially runs sinks in sequence.
 *
 * @since 2.0.0
 * @category sequencing
 */
export const flatMap: {
  /**
   * Runs this sink until it yields a result, then uses that result to create
   * another sink from the provided function which will continue to run until it
   * yields a result.
   *
   * This function essentially runs sinks in sequence.
   *
   * @since 2.0.0
   * @category sequencing
   */
  (f: (a: A) => Sink): (self: Sink) => Sink
  /**
   * Runs this sink until it yields a result, then uses that result to create
   * another sink from the provided function which will continue to run until it
   * yields a result.
   *
   * This function essentially runs sinks in sequence.
   *
   * @since 2.0.0
   * @category sequencing
   */
  (self: Sink, f: (a: A) => Sink): Sink
} = dual(2, (
  self: Sink,
  f: (a: A) => Sink
): Sink =>
  fromTransform((upstream, scope) => {
    // Set when the pull given to the first sink fails with any cause, so the
    // second sink does not pull from upstream again.
    let upstreamDone = false
    const pull = Effect.catchCause(upstream, (cause) => {
      upstreamDone = true
      return Effect.failCause(cause)
    })
    return Effect.flatMap(
      self.transform(pull, scope),
      ([a, leftover]) =>
        f(a).transform(
          // The second sink first receives the first sink's leftovers (once),
          // then either a done signal or further pulls from upstream.
          Effect.suspend(() => {
            if (leftover) {
              const arr = leftover as Arr.NonEmptyReadonlyArray
              leftover = undefined
              return Effect.succeed(arr)
            } else if (upstreamDone) {
              return Cause.done()
            }
            return upstream
          }),
          scope
        )
    )
  }))

/**
 * A sink that reduces its inputs using the provided function `f` starting from
 * the provided `initial` state while the specified `predicate` returns `true`.
 *
 * If the initial state already fails the predicate the sink completes without
 * pulling. Elements after the one that made the predicate fail, within the
 * same pulled array, are returned as leftovers.
 *
 * @since 2.0.0
 * @category reducing
 */
export const reduceWhile = (
  initial: LazyArg,
  predicate: Predicate,
  f: (s: S, input: In) => S
): Sink =>
  fromTransform((upstream) => {
    let state = initial()
    let leftover: NonEmptyReadonlyArray | undefined = undefined
    if (!predicate(state)) {
      return Effect.succeed([state] as const)
    }
    return upstream.pipe(
      Effect.flatMap((arr) => {
        for (let i = 0; i < arr.length; i++) {
          state = f(state, arr[i])
          if (!predicate(state)) {
            if ((i + 1) < arr.length) {
              leftover = arr.slice(i + 1) as any
            }
            return Cause.done()
          }
        }
        return Effect.void
      }),
      Effect.forever({ disableYield: true }),
      Pull.catchDone(() => Effect.succeed([state, leftover] as const))
    )
  })

/** * A sink that reduces its inputs using the provided effectful function `f` * starting from the provided `initial` state while the specified `predicate` * returns `true`. 
* * @since 2.0.0 * @category reducing */ export const reduceWhileEffect = ( initial: LazyArg, predicate: Predicate, f: (s: S, input: In) => Effect.Effect ): Sink => fromTransform((upstream) => { let state = initial() let leftover: NonEmptyReadonlyArray | undefined = undefined if (!predicate(state)) { return Effect.succeed([state] as const) } return upstream.pipe( Effect.flatMap((arr) => { let i = 0 return Effect.whileLoop({ while: () => i < arr.length, body: constant(Effect.flatMap(Effect.suspend(() => f(state, arr[i++])), (s) => { state = s if (!predicate(state)) { if (i < arr.length) { leftover = arr.slice(i) as any } return Cause.done() } return Effect.void })), step: constVoid }) }), Effect.forever({ disableYield: true }), Pull.catchDone(() => Effect.succeed([state, leftover] as const)) ) }) /** * A sink that reduces its inputs using the provided function `f` starting from * the provided `initial` state while the specified `predicate` returns `true`. * * @since 4.0.0 * @category reducing */ export const reduceWhileArray = ( initial: LazyArg, contFn: Predicate, f: (s: S, input: NonEmptyReadonlyArray) => S ): Sink => fromTransform((upstream) => { let state = initial() if (!contFn(state)) { return Effect.succeed([state] as const) } return upstream.pipe( Effect.flatMap((arr) => { for (let i = 0; i < arr.length; i++) { state = f(state, arr) if (!contFn(state)) { return Cause.done() } } return Effect.void }), Effect.forever({ disableYield: true }), Pull.catchDone(() => Effect.succeed([state] as const)) ) }) /** * A sink that reduces its inputs using the provided effectful function `f` * starting from the provided `initial` state while the specified `predicate` * returns `true`. 
* * @since 4.0.0 * @category reducing */ export const reduceWhileArrayEffect = ( initial: LazyArg, predicate: Predicate, f: (s: S, input: NonEmptyReadonlyArray) => Effect.Effect ): Sink => fromTransform((upstream) => { let state = initial() if (!predicate(state)) { return Effect.succeed([state] as const) } return upstream.pipe( Effect.flatMap((arr) => f(state, arr)), Effect.flatMap((s) => { state = s if (!predicate(state)) { return Cause.done() } return Effect.void }), Effect.forever({ disableYield: true }), Pull.catchDone(() => Effect.succeed([state] as const)) ) }) /** * A sink that reduces its inputs using the provided function `f` starting from * the provided `initial` state. * * @since 2.0.0 * @category reducing */ export const reduce = (initial: LazyArg, f: (s: S, input: In) => S): Sink => reduceArray(initial, (s, arr) => { for (let i = 0; i < arr.length; i++) { s = f(s, arr[i]) } return s }) /** * A sink that reduces its inputs using the provided function `f` starting from * the specified `initial` state. * * @since 2.0.0 * @category reducing */ export const reduceArray = ( initial: LazyArg, f: (s: S, input: NonEmptyReadonlyArray) => S ): Sink => fromTransform((upstream) => { let state = initial() return upstream.pipe( Effect.flatMap((arr) => { state = f(state, arr) return Effect.void }), Effect.forever({ disableYield: true }), Pull.catchDone(() => Effect.succeed([state] as const)) ) }) /** * A sink that reduces its inputs using the provided effectful function `f` * starting from the specified `initial` state. * * @since 2.0.0 * @category reducing */ export const reduceEffect = ( initial: LazyArg, f: (s: S, input: In) => Effect.Effect ): Sink => reduceWhileEffect(initial, constTrue, f) as any const head_ = reduceWhile(Option.none, Option.isNone, (_, in_) => Option.some(in_)) /** * Creates a sink containing the first value. 
* * @since 2.0.0 * @category constructors */ export const head = (): Sink, In, In> => head_ as any const last_ = reduceArray(Option.none, (_, arr) => Arr.last(arr)) /** * Creates a sink containing the last value. * * @since 2.0.0 * @category constructors */ export const last = (): Sink, In> => last_ as any /** * Creates a sink containing the first matching value. * * @since 4.0.0 * @category constructors */ export const find: { /** * Creates a sink containing the first matching value. * * @since 4.0.0 * @category constructors */ (refinement: Refinement): Sink, In, In> /** * Creates a sink containing the first matching value. * * @since 4.0.0 * @category constructors */ (predicate: Predicate): Sink, In, In> } = (predicate: Predicate): Sink, In, In> => reduceWhile( Option.none, Option.isNone, (acc, in_) => predicate(in_) ? Option.some(in_) : acc ) /** * Creates a sink containing the first matching value. * * @since 4.0.0 * @category constructors */ export const findEffect = ( predicate: (input: In) => Effect.Effect ): Sink, In, In, E, R> => reduceWhileEffect( Option.none, Option.isNone, (acc, in_) => Effect.map(predicate(in_), (b) => b ? Option.some(in_) : acc) ) /** * Creates a sink which sums up its inputs. * * @since 2.0.0 * @category constructors */ export const sum: Sink = reduceArray(() => 0, (s, arr) => { for (let i = 0; i < arr.length; i++) { s += arr[i] } return s }) /** * A sink that counts the number of elements fed to it. * * @since 2.0.0 * @category constructors */ export const count: Sink = reduceArray(() => 0, (s, arr) => s + arr.length) /** * Accumulates incoming elements into an array. 
 * * @since 2.0.0 * @category constructors */
export const collect = (): Sink, In> =>
  // The accumulator array is mutated in place and returned as the new state.
  reduceArray(Arr.empty, (s, arr) => {
    s.push(...arr)
    return s
  })

/**
 * Collects elements into an array for as long as the predicate holds.
 *
 * NOTE(review): the first element that fails the predicate is neither added
 * to the output nor included in the leftovers (`arr.slice(i + 1)`) — confirm
 * this is intended.
 *
 * @since 4.0.0
 * @category constructors
 */
export const takeWhile: {
  /**
   * @since 4.0.0
   * @category constructors
   */
  (refinement: Refinement): Sink, In, In>
  /**
   * @since 4.0.0
   * @category constructors
   */
  (predicate: Predicate): Sink, In, In>
} = (predicate: Predicate): Sink, In, In> =>
  fromTransform((upstream) => {
    const out = Arr.empty()
    return upstream.pipe(
      Effect.flatMap((arr) => {
        for (let i = 0; i < arr.length; i++) {
          if (!predicate(arr[i])) {
            const leftover: Arr.NonEmptyReadonlyArray | undefined = (i + 1) < arr.length
              ? arr.slice(i + 1) as any
              : undefined
            // The end tuple travels with the done signal and is picked up by `catchDone` below.
            return Cause.done([out, leftover] as const)
          }
          out.push(arr[i])
        }
        return Effect.void
      }),
      Effect.forever({ disableYield: true }),
      // `end` is absent when upstream itself finished; fall back to what was collected.
      Pull.catchDone((end) => Effect.succeed, In>>(end ?? [out]))
    )
  })

/**
 * Like `takeWhile`, but driven by a `Filter`: elements are collected as
 * `result.success` until the filter returns a failure result.
 *
 * NOTE(review): as in `takeWhile`, the element that produced the failure is
 * not part of the output or the leftovers — confirm this is intended.
 *
 * @since 4.0.0
 * @category constructors
 */
export const takeWhileFilter = (
  filter: Filter.Filter
): Sink, In, In> =>
  fromTransform((upstream) => {
    const out = Arr.empty()
    return upstream.pipe(
      Effect.flatMap((arr) => {
        for (let i = 0; i < arr.length; i++) {
          const result = filter(arr[i])
          if (Result.isFailure(result)) {
            const leftover: Arr.NonEmptyReadonlyArray | undefined = (i + 1) < arr.length
              ? arr.slice(i + 1) as any
              : undefined
            return Cause.done([out, leftover] as const)
          }
          out.push(result.success)
        }
        return Effect.void
      }),
      Effect.forever({ disableYield: true }),
      Pull.catchDone((end) => Effect.succeed, In>>(end ?? [out]))
    )
  })

/**
 * Effectful variant of `takeWhile`: the predicate is evaluated as an effect
 * for each element, in order.
 *
 * @since 4.0.0
 * @category constructors
 */
export const takeWhileEffect: {
  /**
   * @since 4.0.0
   * @category constructors
   */
  (predicate: (input: In) => Effect.Effect): Sink, In, In, E, R>
} = (
  predicate: (input: In) => Effect.Effect
): Sink, In, In, E, R> =>
  fromTransform((upstream) => {
    const out = Arr.empty()
    let leftover: Arr.NonEmptyReadonlyArray | undefined = undefined
    return upstream.pipe(
      Effect.flatMap((arr) => {
        let i = 0
        return Effect.whileLoop({
          while: () => i < arr.length,
          body: constant(Effect.flatMap(
            Effect.suspend(() => {
              // `i` is advanced here, so in the callback below it already
              // points at the element after `input`.
              const input = arr[i++]
              return Effect.map(predicate(input), (passes) => [input, passes] as const)
            }),
            ([input, passes]) => {
              if (!passes) {
                if (i < arr.length) {
                  leftover = arr.slice(i) as any
                }
                return Cause.done()
              }
              out.push(input)
              return Effect.void
            }
          )),
          step: constVoid
        })
      }),
      Effect.forever({ disableYield: true }),
      Pull.catchDone(() => Effect.succeed([out, leftover] as const))
    )
  })

/**
 * Effectful variant of `takeWhileFilter`: the filter is evaluated as an
 * effect for each element, in order, and collection stops at the first
 * failure result.
 *
 * @since 4.0.0
 * @category constructors
 */
export const takeWhileFilterEffect = (
  filter: Filter.FilterEffect
): Sink, In, In, E, R> =>
  fromTransform((upstream) => {
    const out = Arr.empty()
    let leftover: Arr.NonEmptyReadonlyArray | undefined = undefined
    return upstream.pipe(
      Effect.flatMap((arr) => {
        let i = 0
        return Effect.whileLoop({
          while: () => i < arr.length,
          body: constant(Effect.flatMap(Effect.suspend(() => filter(arr[i++])), (result) => {
            if (Result.isFailure(result)) {
              if (i < arr.length) {
                leftover = arr.slice(i) as any
              }
              return Cause.done()
            }
            out.push(result.success)
            return Effect.void
          })),
          step: constVoid
        })
      }),
      Effect.forever({ disableYield: true }),
      Pull.catchDone(() => Effect.succeed([out, leftover] as const))
    )
  })

/**
 * Collects elements up to and including the first one that satisfies the
 * predicate. Built on `takeWhile` with a `done` flag, which is created per
 * run because the sink is wrapped in `suspend`.
 *
 * @since 4.0.0
 * @category constructors
 */
export const takeUntil = (predicate: Predicate): Sink, In, In> =>
  suspend(() => {
    let done = false
    return takeWhile((i) => {
      // The matching element itself is accepted; the element after it is the one rejected.
      if (done) return false
      done = predicate(i)
      return true
    })
  })

/**
 * Effectful variant of `takeUntil`, built on `takeWhileEffect`.
 *
 * @since 4.0.0
 * @category constructors
 */
export const takeUntilEffect = (
  predicate: (input: In) => Effect.Effect
): Sink, In, In, E, R> =>
  suspend(() => {
    let done = false
    return takeWhileEffect((input) => {
      if (done) {
        return Effect.succeed(false)
      }
      return Effect.map(predicate(input), (b) => {
        done = b
        return true
      })
    })
  })

/**
 * A sink that executes the provided effectful function for every item fed
 * to it.
 *
 * @example
 * ```ts
 * import { Console, Effect, Sink, Stream } from "effect"
 *
 * // Create a sink that logs each item
 * const sink = Sink.forEach((item: number) => Console.log(`Processing: ${item}`))
 *
 * // Use it with a stream
 * const stream = Stream.make(1, 2, 3)
 * const program = Stream.run(stream, sink)
 *
 * Effect.runPromise(program)
 * // Output:
 * // Processing: 1
 * // Processing: 2
 * // Processing: 3
 * ```
 *
 * @since 2.0.0
 * @category constructors
 */
export const forEach = (
  f: (input: In) => Effect.Effect
): Sink => forEachArray(Effect.forEach((_) => f(_), { discard: true }))

/** * A sink that executes the provided effectful function for every Chunk fed * to it. 
*
 * Each array pulled from upstream is passed to `f` as a whole. The sink
 * completes with `void` and no leftovers.
 *
 * @example
 * ```ts
 * import { Console, Effect, Sink, Stream } from "effect"
 *
 * // Create a sink that processes chunks
 * const sink = Sink.forEachArray((chunk: ReadonlyArray<number>) =>
 *   Console.log(
 *     `Processing chunk of ${chunk.length} items: [${chunk.join(", ")}]`
 *   )
 * )
 *
 * // Use it with a stream
 * const stream = Stream.make(1, 2, 3, 4, 5)
 * const program = Stream.run(stream, sink)
 *
 * Effect.runPromise(program)
 * // Output: Processing chunk of 5 items: [1, 2, 3, 4, 5]
 * ```
 *
 * @since 4.0.0
 * @category constructors
 */
export const forEachArray = (
  f: (input: NonEmptyReadonlyArray) => Effect.Effect
): Sink =>
  fromTransform((upstream) =>
    upstream.pipe(
      Effect.flatMap(f),
      // Keep pulling and running `f` until the pull signals that it is done.
      Effect.forever({ disableYield: true }),
      // `endVoid` is a `void` result with no leftover entry.
      Pull.catchDone(() => endVoid)
    )
  )

/**
 * A sink that runs the effectful function `f` on each input, in order, and
 * stops as soon as `f` succeeds with `false`.
 *
 * Implemented on top of `forEachWhileArray`: the elements of each array are
 * visited one by one, and the rest of the array is skipped once `f` asks to
 * stop.
 *
 * @since 2.0.0
 * @category constructors
 */
export const forEachWhile = (
  f: (input: In) => Effect.Effect
): Sink =>
  forEachWhileArray(Effect.fnUntraced(function*(input) {
    for (let i = 0; i < input.length; i++) {
      const cont = yield* f(input[i])
      // Stop at the first element for which `f` yields `false`.
      if (!cont) return false
    }
    return true
  }))

/**
 * A sink that runs the effectful function `f` on every array pulled from
 * upstream and keeps going for as long as `f` succeeds with `true`.
 *
 * When `f` succeeds with `false` the sink stops pulling. It completes with
 * `void` and no leftovers.
 *
 * @since 2.0.0
 * @category constructors
 */
export const forEachWhileArray = (
  f: (input: NonEmptyReadonlyArray) => Effect.Effect
): Sink =>
  fromTransform((upstream) =>
    upstream.pipe(
      Effect.flatMap(f),
      // `false` from `f` is turned into the done signal handled below.
      Effect.flatMap((cont) => cont ? Effect.void : Cause.done()),
      Effect.forever({ disableYield: true }),
      // `endVoid` is a `void` result with no leftover entry.
      Pull.catchDone(() => endVoid)
    )
  )

/**
 * Creates a sink produced from a scoped effect.
*
 * The sink produced by `effect` is converted with `toChannel`, the resulting
 * effect is flattened with `Channel.unwrap`, and the channel is turned back
 * into a sink with `fromChannel`.
 *
 * @example
 * ```ts
 * import { Console, Effect, Sink, Stream } from "effect"
 *
 * // Create a sink from an effect that produces a sink
 * const sinkEffect = Effect.succeed(
 *   Sink.forEach((item: number) => Console.log(`Item: ${item}`))
 * )
 * const sink = Sink.unwrap(sinkEffect)
 *
 * // Use it with a stream
 * const stream = Stream.make(1, 2, 3)
 * const program = Stream.run(stream, sink)
 *
 * Effect.runPromise(program)
 * // Output:
 * // Item: 1
 * // Item: 2
 * // Item: 3
 * ```
 *
 * @since 2.0.0
 * @category constructors
 */
export const unwrap = (
  effect: Effect.Effect, E, R>
): Sink | R2> =>
  fromChannel(Channel.unwrap(Effect.map(effect, toChannel)))

/**
 * Summarize a sink by running an effect when the sink starts and again when
 * it completes.
 *
 * The two values produced by `summary` are combined with `f`, and the
 * combined value is returned in a tuple next to the sink's own result. The
 * leftovers of `self` are passed through unchanged.
 *
 * @since 2.0.0
 * @category utils
 */
export const summarized: {
  /**
   * Summarize a sink by running an effect when the sink starts and again when
   * it completes.
   *
   * @since 2.0.0
   * @category utils
   */
  (summary: Effect.Effect, f: (start: A2, end: A2) => A3): (self: Sink) => Sink<[A, A3], In, L, E2 | E, R2 | R>
  /**
   * Summarize a sink by running an effect when the sink starts and again when
   * it completes.
   *
   * @since 2.0.0
   * @category utils
   */
  (
    self: Sink,
    summary: Effect.Effect,
    f: (start: A2, end: A2) => A3
  ): Sink<[A, A3], In, L, E | E2, R | R2>
} = dual(3, (
  self: Sink,
  summary: Effect.Effect,
  f: (start: A2, end: A2) => A3
): Sink<[A, A3], In, L, E | E2, R | R2> =>
  fromTransform(Effect.fnUntraced(function*(upstream, scope) {
    // First reading, taken before the wrapped sink runs.
    const start = yield* summary
    const [done, leftover] = yield* self.transform(upstream, scope)
    // Second reading, taken only after the wrapped sink has succeeded.
    const end = yield* summary
    return [[done, f(start, end)], leftover] as const
  })))

/**
 * Returns the sink that executes this one and times its execution.
*
 * The duration is computed from two readings of `Clock.currentTimeNanos`,
 * taken through `summarized`, and is returned in a tuple next to the result
 * of `self`.
 *
 * @since 2.0.0
 * @category utils
 */
export const withDuration = (
  self: Sink
): Sink<[A, Duration.Duration], In, L, E, R> =>
  summarized(self, Clock.currentTimeNanos, (start, end) => Duration.nanos(end - start))

/**
 * A sink that runs `drain` wrapped in `withDuration` and yields only the
 * measured duration.
 *
 * NOTE(review): `drain` is defined outside this section; presumably it
 * consumes and discards every input — confirm.
 *
 * @since 2.0.0
 * @category constructors
 */
export const timed: Sink = map(withDuration(drain), ([, duration]) => duration)

/**
 * Provides `context` to the effect returned by the sink's `transform`, using
 * `Effect.provideContext`.
 *
 * @since 4.0.0
 * @category Services
 */
export const provideContext: {
  /**
   * Provides `context` to the effect returned by the sink's `transform`,
   * using `Effect.provideContext`.
   *
   * @since 4.0.0
   * @category Services
   */
  (context: Context.Context): (self: Sink) => Sink>
  /**
   * Provides `context` to the effect returned by the sink's `transform`,
   * using `Effect.provideContext`.
   *
   * @since 4.0.0
   * @category Services
   */
  (self: Sink, context: Context.Context): Sink>
} = dual(2, (
  self: Sink,
  context: Context.Context
): Sink> =>
  fromTransform((upstream, scope) =>
    self.transform(upstream, scope).pipe(
      Effect.provideContext(context)
    )
  ))

/**
 * Provides a single service (`key` / `value`) to the effect returned by the
 * sink's `transform`, using `Effect.provideService`.
 *
 * @since 4.0.0
 * @category Services
 */
export const provideService: {
  /**
   * Provides a single service (`key` / `value`) to the effect returned by
   * the sink's `transform`, using `Effect.provideService`.
   *
   * @since 4.0.0
   * @category Services
   */
  (key: Context.Key, value: Types.NoInfer): (self: Sink) => Sink>
  /**
   * Provides a single service (`key` / `value`) to the effect returned by
   * the sink's `transform`, using `Effect.provideService`.
   *
   * @since 4.0.0
   * @category Services
   */
  (
    self: Sink,
    key: Context.Key,
    value: Types.NoInfer
  ): Sink>
} = dual(3, (
  self: Sink,
  key: Context.Key,
  value: Types.NoInfer
): Sink> =>
  fromTransform((upstream, scope) =>
    self.transform(upstream, scope).pipe(
      Effect.provideService(key, value)
    )
  ))

/**
 * Runs `self` and, if it fails with an error handled by `Effect.catch`,
 * switches to the sink returned by `f(error)`.
 *
 * The fallback sink pulls from the same upstream, unless upstream had already
 * ended with a cause while `self` was running; in that case the fallback
 * immediately receives the done signal.
 *
 * @since 2.0.0
 * @category Error handling
 */
export const orElse: {
  /**
   * Runs `self` and, if it fails with an error handled by `Effect.catch`,
   * switches to the sink returned by `f(error)`.
   *
   * @since 2.0.0
   * @category Error handling
   */
  (f: (error: Types.NoInfer) => Sink): (self: Sink) => Sink
  /**
   * Runs `self` and, if it fails with an error handled by `Effect.catch`,
   * switches to the sink returned by `f(error)`.
   *
   * @since 2.0.0
   * @category Error handling
   */
  (self: Sink, f: (error: E) => Sink): Sink
} = dual(2, (
  self: Sink,
  f: (error: E) => Sink
): Sink =>
  fromTransform((upstream, scope) => {
    // Becomes true as soon as a pull from upstream ends with any cause
    // (presumably including the done signal — confirm).
    let upstreamDone = false
    // Same pull as `upstream`, but records that it ended before re-raising
    // the original cause unchanged.
    const pull = Effect.catchCause(upstream, (cause) => {
      upstreamDone = true
      return Effect.failCause(cause)
    })
    return Effect.catch(
      self.transform(pull, scope) as Effect.Effect, E, R>,
      (error) =>
        f(error).transform(
          // Evaluated on every pull so the flag is read at pull time.
          Effect.suspend(() => {
            if (upstreamDone) {
              // Do not pull from an upstream that has already ended.
              return Cause.done()
            }
            return upstream
          }),
          scope
        )
    )
  }))

/**
 * Recovers from any failure of the sink by running `f` with the full
 * `Cause`. The value produced by `f` becomes the sink's result, with no
 * leftovers.
 *
 * @since 4.0.0
 * @category Error handling
 */
export const catchCause: {
  /**
   * Recovers from a failure of the sink by running `f` with the `Cause`; the
   * value produced by `f` becomes the sink's result, with no leftovers.
   *
   * @since 4.0.0
   * @category Error handling
   */
  (f: (error: Cause.Cause>) => Effect.Effect): (self: Sink) => Sink
  /**
   * Recovers from a failure of the sink by running `f` with the `Cause`; the
   * value produced by `f` becomes the sink's result, with no leftovers.
   *
   * @since 4.0.0
   * @category Error handling
   */
  (
    self: Sink,
    f: (error: Cause.Cause) => Effect.Effect
  ): Sink
} = dual(2, (
  self: Sink,
  f: (error: Cause.Cause) => Effect.Effect
): Sink =>
  // NOTE(review): `transformEffect` is defined outside this section;
  // presumably it applies the given function to the effect returned by
  // `self.transform` — confirm.
  transformEffect(
    self,
    // The recovery value is wrapped in a one-element tuple, i.e. a result
    // without a leftover entry.
    Effect.catchCause((cause) => Effect.map(f(cause), (a2) => [a2 as A | A2] as const))
  ))

// Recovers from an error of the sink (as handled by `Effect.catch`) by
// running `f`; the value produced by `f` becomes the sink's result, with no
// leftovers. Declared as `catch_` because `catch` is a reserved word, and
// re-exported under the name `catch` below.
const catch_: {
  (
    f: (error: Types.NoInfer) => Effect.Effect
  ): (self: Sink) => Sink
  (
    self: Sink,
    f: (error: E) => Effect.Effect
  ): Sink
} = dual(2, (
  self: Sink,
  f: (error: E) => Effect.Effect
): Sink =>
  transformEffect(
    self,
    // The recovery value is wrapped in a one-element tuple, i.e. a result
    // without a leftover entry.
    Effect.catch((error) => Effect.map(f(error), (a2) => [a2 as A | A2] as const))
  ))

export {
  /**
   * Recovers from an error of the sink by running `f`; the value produced by
   * `f` becomes the sink's result, with no leftovers.
   *
   * @since 4.0.0
   * @category Error handling
   */
  catch_ as catch
}

/**
 * Runs `f` when the sink's effect exits, via `Effect.onExit`.
 *
 * `f` receives the `Exit` with its success value mapped to the sink's result
 * only; the leftover entry of the result tuple is dropped before `f` sees it.
 *
 * @since 4.0.0
 * @category Finalization
 */
export const onExit: {
  /**
   * Runs `f` when the sink's effect exits, passing the `Exit` mapped to the
   * sink's result value.
   *
   * @since 4.0.0
   * @category Finalization
   */
  (f: (exit: Exit.Exit) => Effect.Effect): (self: Sink) => Sink
  /**
   * Runs `f` when the sink's effect exits, passing the `Exit` mapped to the
   * sink's result value.
   *
   * @since 4.0.0
   * @category Finalization
   */
  (
    self: Sink,
    f: (exit: Exit.Exit) => Effect.Effect
  ): Sink
} = dual(2, (
  self: Sink,
  f: (exit: Exit.Exit) => Effect.Effect
): Sink =>
  transformEffect(
    self,
    // `([a]) => a` keeps the first tuple entry (the result) and discards the
    // leftovers.
    Effect.onExit((exit) => f(Exit.map(exit, ([a]) => a)))
  ))

/**
 * Runs `effect` when the sink's effect exits, ignoring the `Exit` value.
 * Implemented as `onExit(self, () => effect)`.
 *
 * @since 4.0.0
 * @category Finalization
 */
export const ensuring: {
  /**
   * Runs `effect` when the sink's effect exits, ignoring the `Exit` value.
   *
   * @since 4.0.0
   * @category Finalization
   */
  (effect: Effect.Effect): (self: Sink) => Sink
  /**
   * Runs `effect` when the sink's effect exits, ignoring the `Exit` value.
   *
   * @since 4.0.0
   * @category Finalization
   */
  (self: Sink, effect: Effect.Effect): Sink
} = dual(2, (
  self: Sink,
  effect: Effect.Effect
): Sink => onExit(self, () => effect))