/**
 * @since 3.8.0
 */
import * as Arr from "./Array.ts";
import type { Cause, Done } from "./Cause.ts";
import type { Effect } from "./Effect.ts";
import type { Exit, Failure } from "./Exit.ts";
import type { Inspectable } from "./Inspectable.ts";
import * as MutableList from "./MutableList.ts";
import * as Option from "./Option.ts";
import * as Pull from "./Pull.ts";
import type { SchedulerDispatcher } from "./Scheduler.ts";
import type * as Types from "./Types.ts";
// NOTE(review): several declarations below contain unbalanced angle brackets
// (e.g. `Set>`, `Effect>`, `Effect, E>`), which suggests that type arguments
// were lost somewhere upstream of this file. Confirm against the generating
// source and regenerate before relying on these signatures.
declare const TypeId = "~effect/Queue";
declare const EnqueueTypeId = "~effect/Queue/Enqueue";
declare const DequeueTypeId = "~effect/Queue/Dequeue";
/**
 * Type guard to check if a value is a Queue.
 *
 * @since 3.8.0
 * @category Guards
 */
export declare const isQueue: (u: unknown) => u is Queue;
/**
 * Type guard to check if a value is an Enqueue.
 *
 * @since 4.0.0
 * @category Guards
 */
export declare const isEnqueue: (u: unknown) => u is Enqueue;
/**
 * Type guard to check if a value is a Dequeue.
 *
 * @since 4.0.0
 * @category Guards
 */
export declare const isDequeue: (u: unknown) => u is Dequeue;
/**
 * Converts a Queue to an Enqueue (write-only interface).
 *
 * @since 4.0.0
 * @category Conversions
 */
export declare const asEnqueue: (self: Queue) => Enqueue;
/**
 * Converts a Queue to a Dequeue (read-only interface).
 *
 * @since 4.0.0
 * @category Conversions
 */
export declare const asDequeue: (self: Queue) => Dequeue;
/**
 * An `Enqueue` is a queue that can be offered to.
 *
 * This interface represents the write-only part of a Queue, allowing you to offer
 * elements to the queue but not take elements from it.
 *
 * @example
 * ```ts
 * import { Effect, Queue } from "effect"
 *
 * // Function that only needs write access to a queue
 * const producer = (enqueue: Queue.Enqueue) =>
 *   Effect.gen(function*() {
 *     yield* Queue.offer(enqueue, "hello")
 *     yield* Queue.offerAll(enqueue, ["world", "!"])
 *   })
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(10)
 *   yield* producer(queue)
 * })
 * ```
 *
 * @since 4.0.0
 * @category Models
 */
export interface Enqueue extends Inspectable {
  readonly [EnqueueTypeId]: Enqueue.Variance;
  readonly strategy: "suspend" | "dropping" | "sliding";
  readonly dispatcher: SchedulerDispatcher;
  capacity: number;
  messages: MutableList.MutableList;
  state: Queue.State;
  scheduleRunning: boolean;
}
/**
 * @since 4.0.0
 * @category Models
 */
export declare namespace Enqueue {
  /**
   * Variance interface for Enqueue types, defining the type parameter constraints.
   *
   * @since 4.0.0
   * @category Models
   */
  interface Variance {
    _A: Types.Contravariant;
    _E: Types.Contravariant;
  }
}
/**
 * A `Dequeue` is a queue that can be taken from.
 *
 * This interface represents the read-only part of a Queue, allowing you to take
 * elements from the queue but not offer elements to it.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(10)
 *
 *   // A Dequeue can only take elements
 *   const dequeue: Queue.Dequeue = queue
 *
 *   // Pre-populate the queue
 *   yield* Queue.offerAll(queue, ["a", "b", "c"])
 *
 *   // Take elements using dequeue interface
 *   const item = yield* Queue.take(dequeue)
 *   console.log(item) // "a"
 * })
 * ```
 *
 * @since 3.8.0
 * @category Models
 */
export interface Dequeue extends Inspectable {
  readonly [DequeueTypeId]: Dequeue.Variance;
  readonly strategy: "suspend" | "dropping" | "sliding";
  readonly dispatcher: SchedulerDispatcher;
  capacity: number;
  messages: MutableList.MutableList;
  state: Queue.State;
  scheduleRunning: boolean;
}
/**
 * @since 4.0.0
 * @category Models
 */
export declare namespace Dequeue {
  /**
   * Variance interface for Dequeue types, defining the type parameter constraints.
   *
   * @since 3.8.0
   * @category Models
   */
  interface Variance {
    _A: Types.Covariant;
    _E: Types.Covariant;
  }
}
/**
 * A `Queue` is an asynchronous queue that can be offered to and taken from.
 *
 * It also supports signaling that it is done or failed.
 *
 * **Previously Known As**
 *
 * This API replaces the following from Effect 3.x:
 *
 * - `Mailbox`
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   // Create a bounded queue
 *   const queue = yield* Queue.bounded(10)
 *
 *   // Producer: offer items to the queue
 *   yield* Queue.offer(queue, "hello")
 *   yield* Queue.offerAll(queue, ["world", "!"])
 *
 *   // Consumer: take items from the queue
 *   const item1 = yield* Queue.take(queue)
 *   const item2 = yield* Queue.take(queue)
 *   const item3 = yield* Queue.take(queue)
 *
 *   console.log([item1, item2, item3]) // ["hello", "world", "!"]
 * })
 * ```
 *
 * @since 3.8.0
 * @category Models
 */
export interface Queue extends Enqueue, Dequeue {
  readonly [TypeId]: Queue.Variance;
}
/**
 * @since 3.8.0
 * @category Models
 */
export declare namespace Queue {
  /**
   * Variance interface for Queue types, defining the type parameter constraints.
   *
   * @since 3.8.0
   * @category Models
   */
  interface Variance {
    _A: Types.Invariant;
    _E: Types.Invariant;
  }
  /**
   * Represents the internal state of a Queue.
   *
   * The state is a union discriminated by `_tag`: `"Open"`, `"Closing"` or
   * `"Done"`. The `"Closing"` and `"Done"` variants carry the `exit` the queue
   * was completed with.
   *
   * @since 4.0.0
   * @category Models
   */
  type State = {
    readonly _tag: "Open";
    readonly takers: Set<(_: Effect) => void>;
    readonly offers: Set>;
    readonly awaiters: Set<(_: Effect) => void>;
  } | {
    readonly _tag: "Closing";
    readonly takers: Set<(_: Effect) => void>;
    readonly offers: Set>;
    readonly awaiters: Set<(_: Effect) => void>;
    readonly exit: Failure;
  } | {
    readonly _tag: "Done";
    readonly exit: Failure;
  };
  /**
   * Represents an entry in the queue's offer buffer.
   *
   * An entry is either an `"Array"` of remaining messages together with the
   * current `offset` into it, or a `"Single"` message. Both carry the `resume`
   * callback of the pending offer.
   *
   * @since 4.0.0
   * @category Models
   */
  type OfferEntry = {
    readonly _tag: "Array";
    readonly remaining: Array;
    offset: number;
    readonly resume: (_: Effect>) => void;
  } | {
    readonly _tag: "Single";
    readonly message: A;
    readonly resume: (_: Effect) => void;
  };
}
/**
 * Creates a `Queue`, optionally configured with a `capacity` and a `strategy`
 * (`"suspend"`, `"dropping"` or `"sliding"`).
 *
 * A `Queue` is an asynchronous queue that can be offered to and taken from.
 * It also supports signaling that it is done or failed.
 *
 * **Previously Known As**
 *
 * This API replaces the following from Effect 3.x:
 *
 * - `Mailbox.make`
 *
 * @since 3.8.0
 * @category Constructors
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 * import * as assert from "node:assert"
 *
 * Effect.gen(function*() {
 *   const queue = yield* Queue.make()
 *
 *   // add messages to the queue
 *   yield* Queue.offer(queue, 1)
 *   yield* Queue.offer(queue, 2)
 *   yield* Queue.offerAll(queue, [3, 4, 5])
 *
 *   // take messages from the queue
 *   const messages = yield* Queue.takeAll(queue)
 *   assert.deepStrictEqual(messages, [1, 2, 3, 4, 5])
 *
 *   // signal that the queue is done
 *   yield* Queue.end(queue)
 *   const done = yield* Effect.flip(Queue.takeAll(queue))
 *   assert.deepStrictEqual(done, Cause.Done)
 *
 *   // signal that the queue has failed
 *   yield* Queue.fail(queue, "boom")
 * })
 * ```
 */
export declare const make: (options?: {
  readonly capacity?: number | undefined;
  readonly strategy?: "suspend" | "dropping" | "sliding" | undefined;
} | undefined) => Effect>;
/**
 * Creates a bounded queue with the specified capacity that uses backpressure strategy.
 *
 * When the queue reaches capacity, producers will be suspended until space becomes available.
 * This ensures all messages are processed but may slow down producers.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(5)
 *
 *   // This will succeed as queue has capacity
 *   yield* Queue.offer(queue, "first")
 *   yield* Queue.offer(queue, "second")
 *
 *   const size = yield* Queue.size(queue)
 *   console.log(size) // 2
 * })
 * ```
 *
 * @since 2.0.0
 * @category Constructors
 */
export declare const bounded: (capacity: number) => Effect>;
/**
 * Creates a bounded queue with sliding strategy. When the queue reaches capacity,
 * new elements are added and the oldest elements are dropped.
 *
 * This strategy prevents producers from being blocked but may result in message loss.
 * Useful when you want to maintain a rolling window of the most recent messages.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.sliding(3)
 *
 *   // Fill the queue to capacity
 *   yield* Queue.offer(queue, 1)
 *   yield* Queue.offer(queue, 2)
 *   yield* Queue.offer(queue, 3)
 *
 *   // This will succeed, dropping the oldest element (1)
 *   yield* Queue.offer(queue, 4)
 *
 *   const all = yield* Queue.takeAll(queue)
 *   console.log(all) // [2, 3, 4] - oldest element (1) was dropped
 * })
 * ```
 *
 * @since 2.0.0
 * @category Constructors
 */
export declare const sliding: (capacity: number) => Effect>;
/**
 * Creates a bounded queue with dropping strategy. When the queue reaches capacity,
 * new elements are dropped and the offer operation returns false.
 *
 * This strategy prevents producers from being blocked and preserves existing messages,
 * but new messages may be lost when the queue is full.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.dropping(2)
 *
 *   // Fill the queue to capacity
 *   const success1 = yield* Queue.offer(queue, 1)
 *   const success2 = yield* Queue.offer(queue, 2)
 *   console.log(success1, success2) // true, true
 *
 *   // This will be dropped
 *   const success3 = yield* Queue.offer(queue, 3)
 *   console.log(success3) // false
 *
 *   const all = yield* Queue.takeAll(queue)
 *   console.log(all) // [1, 2] - element 3 was dropped
 * })
 * ```
 *
 * @since 2.0.0
 * @category Constructors
 */
export declare const dropping: (capacity: number) => Effect>;
/**
 * Creates an unbounded queue that can grow to any size without blocking producers.
 *
 * Unlike bounded queues, unbounded queues never apply backpressure - producers
 * can always add messages successfully. This is useful when you want to prioritize
 * producer throughput over memory usage control.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.unbounded()
 *
 *   // Producers can always add messages without blocking
 *   yield* Queue.offer(queue, "message1")
 *   yield* Queue.offer(queue, "message2")
 *   yield* Queue.offerAll(queue, ["message3", "message4", "message5"])
 *
 *   // Check current size
 *   const size = yield* Queue.size(queue)
 *   console.log(size) // 5
 *
 *   // Take all messages
 *   const messages = yield* Queue.takeAll(queue)
 *   console.log(messages) // ["message1", "message2", "message3", "message4", "message5"]
 * })
 * ```
 *
 * @since 2.0.0
 * @category Constructors
 */
export declare const unbounded: () => Effect>;
/**
 * Add a message to the queue. Returns `false` if the queue is done.
 *
 * For bounded queues, this operation may suspend if the queue is at capacity,
 * depending on the backpressure strategy. For dropping/sliding queues, it may
 * return false or succeed immediately by dropping/sliding existing messages.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(3)
 *
 *   // Successfully add messages to queue
 *   const success1 = yield* Queue.offer(queue, 1)
 *   const success2 = yield* Queue.offer(queue, 2)
 *   console.log(success1, success2) // true, true
 *
 *   // Queue state
 *   const size = yield* Queue.size(queue)
 *   console.log(size) // 2
 * })
 * ```
 *
 * @category Offering
 * @since 4.0.0
 */
export declare const offer: (self: Enqueue, message: Types.NoInfer) => Effect;
/**
 * Add a message to the queue synchronously. Returns `false` if the queue is done.
 *
 * This is an unsafe operation that directly modifies the queue without Effect wrapping.
 * Use this only when you're certain about the synchronous nature of the operation.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * // Create a queue effect and extract the queue for unsafe operations
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(3)
 *
 *   // Add messages synchronously using unsafe API
 *   const success1 = Queue.offerUnsafe(queue, 1)
 *   const success2 = Queue.offerUnsafe(queue, 2)
 *   console.log(success1, success2) // true, true
 *
 *   // Check current size
 *   const size = Queue.sizeUnsafe(queue)
 *   console.log(size) // 2
 * })
 * ```
 *
 * @category Offering
 * @since 4.0.0
 */
export declare const offerUnsafe: (self: Enqueue, message: Types.NoInfer) => boolean;
/**
 * Add multiple messages to the queue. Returns the remaining messages that
 * were not added.
 *
 * For bounded queues, this operation may suspend if the queue doesn't have
 * enough capacity. The operation returns an array of messages that couldn't
 * be added (empty array means all messages were successfully added).
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(3)
 *
 *   // Try to add more messages than capacity
 *   const remaining1 = yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])
 *   console.log(remaining1) // [4, 5] - couldn't fit the last 2
 * })
 * ```
 *
 * @category Offering
 * @since 4.0.0
 */
export declare const offerAll: (self: Enqueue, messages: Iterable) => Effect>;
/**
 * Add multiple messages to the queue synchronously. Returns the remaining messages that
 * were not added.
 *
 * This is an unsafe operation that directly modifies the queue without Effect wrapping.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * // Create a bounded queue and use unsafe API
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(3)
 *
 *   // Try to add 5 messages to capacity-3 queue using unsafe API
 *   const remaining = Queue.offerAllUnsafe(queue, [1, 2, 3, 4, 5])
 *   console.log(remaining) // [4, 5] - couldn't fit the last 2
 *
 *   // Check what's in the queue
 *   const size = Queue.sizeUnsafe(queue)
 *   console.log(size) // 3
 * })
 * ```
 *
 * @category Offering
 * @since 4.0.0
 */
export declare const offerAllUnsafe: (self: Enqueue, messages: Iterable) => Array;
/**
 * Fail the queue with an error. If the queue is already done, `false` is
 * returned.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(10)
 *
 *   // Add some messages
 *   yield* Queue.offer(queue, 1)
 *   yield* Queue.offer(queue, 2)
 *
 *   // Fail the queue with an error
 *   const failed = yield* Queue.fail(queue, "Something went wrong")
 *   console.log(failed) // true
 *
 *   // Subsequent operations will reflect the failure
 *   // Taking from failed queue will fail with the error
 * })
 * ```
 *
 * @category Completion
 * @since 4.0.0
 */
export declare const fail: (self: Enqueue, error: E) => Effect;
/**
 * Fail the queue with a cause. If the queue is already done, `false` is
 * returned.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(10)
 *
 *   // Add some messages
 *   yield* Queue.offer(queue, 1)
 *
 *   // Create a cause and fail the queue
 *   const cause = Cause.fail("Queue processing failed")
 *   const failed = yield* Queue.failCause(queue, cause)
 *   console.log(failed) // true
 *
 *   // The queue is now in failed state with the specified cause
 * })
 * ```
 *
 * @category Completion
 * @since 4.0.0
 */
export declare const failCause: {
  /**
   * Fail the queue with a cause. If the queue is already done, `false` is
   * returned.
   *
   * @example
   * ```ts
   * import { Cause, Effect, Queue } from "effect"
   *
   * const program = Effect.gen(function*() {
   *   const queue = yield* Queue.bounded(10)
   *
   *   // Add some messages
   *   yield* Queue.offer(queue, 1)
   *
   *   // Create a cause and fail the queue
   *   const cause = Cause.fail("Queue processing failed")
   *   const failed = yield* Queue.failCause(queue, cause)
   *   console.log(failed) // true
   *
   *   // The queue is now in failed state with the specified cause
   * })
   * ```
   *
   * @category Completion
   * @since 4.0.0
   */
  (cause: Cause): (self: Enqueue) => Effect;
  /**
   * Fail the queue with a cause. If the queue is already done, `false` is
   * returned.
   *
   * @example
   * ```ts
   * import { Cause, Effect, Queue } from "effect"
   *
   * const program = Effect.gen(function*() {
   *   const queue = yield* Queue.bounded(10)
   *
   *   // Add some messages
   *   yield* Queue.offer(queue, 1)
   *
   *   // Create a cause and fail the queue
   *   const cause = Cause.fail("Queue processing failed")
   *   const failed = yield* Queue.failCause(queue, cause)
   *   console.log(failed) // true
   *
   *   // The queue is now in failed state with the specified cause
   * })
   * ```
   *
   * @category Completion
   * @since 4.0.0
   */
  (self: Enqueue, cause: Cause): Effect;
};
/**
 * Fail the queue with a cause synchronously. If the queue is already done, `false` is
 * returned.
 *
 * This is an unsafe operation that directly modifies the queue without Effect wrapping.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(10)
 *
 *   // Add some messages
 *   Queue.offerUnsafe(queue, 1)
 *
 *   // Create a cause and fail the queue synchronously
 *   const cause = Cause.fail("Processing error")
 *   const failed = Queue.failCauseUnsafe(queue, cause)
 *   console.log(failed) // true
 *
 *   // The queue is now in failed state
 *   console.log(queue.state._tag) // "Done"
 * })
 * ```
 *
 * @category Completion
 * @since 4.0.0
 */
export declare const failCauseUnsafe: (self: Enqueue, cause: Cause) => boolean;
/**
 * Signal that the queue is complete. If the queue is already done, `false` is
 * returned.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(10)
 *
 *   // Add some messages
 *   yield* Queue.offer(queue, 1)
 *   yield* Queue.offer(queue, 2)
 *
 *   // Signal completion - no more messages will be accepted
 *   const ended = yield* Queue.end(queue)
 *   console.log(ended) // true
 *
 *   // Trying to offer more messages will return false
 *   const offerResult = yield* Queue.offer(queue, 3)
 *   console.log(offerResult) // false
 *
 *   // But we can still take existing messages
 *   const message = yield* Queue.take(queue)
 *   console.log(message) // 1
 * })
 * ```
 *
 * @category Completion
 * @since 4.0.0
 */
export declare const end: (self: Enqueue) => Effect;
/**
 * Signal that the queue is complete synchronously. If the queue is already done, `false` is
 * returned.
 *
 * This is an unsafe operation that directly modifies the queue without Effect wrapping.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * // Create a queue and use unsafe operations
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(10)
 *
 *   // Add some messages
 *   Queue.offerUnsafe(queue, 1)
 *   Queue.offerUnsafe(queue, 2)
 *
 *   // End the queue synchronously
 *   const ended = Queue.endUnsafe(queue)
 *   console.log(ended) // true
 *
 *   // The queue is now done
 *   console.log(queue.state._tag) // "Done"
 * })
 * ```
 *
 * @category Completion
 * @since 4.0.0
 */
export declare const endUnsafe: (self: Enqueue) => boolean;
/**
 * Interrupts the queue gracefully, transitioning it to a closing state.
 *
 * This operation stops accepting new offers but allows existing messages to be consumed.
 * Once all messages are drained, the queue transitions to the Done state with an interrupt cause.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(10)
 *
 *   // Add some messages
 *   yield* Queue.offer(queue, 1)
 *   yield* Queue.offer(queue, 2)
 *
 *   // Interrupt gracefully - no more offers accepted, but messages can be consumed
 *   const interrupted = yield* Queue.interrupt(queue)
 *   console.log(interrupted) // true
 *
 *   // Trying to offer more messages will return false
 *   const offerResult = yield* Queue.offer(queue, 3)
 *   console.log(offerResult) // false
 *
 *   // But we can still take existing messages
 *   const message1 = yield* Queue.take(queue)
 *   console.log(message1) // 1
 *
 *   const message2 = yield* Queue.take(queue)
 *   console.log(message2) // 2
 *
 *   // After all messages are consumed, queue is done
 *   const isDone = queue.state._tag === "Done"
 *   console.log(isDone) // true
 * })
 * ```
 *
 * @category Completion
 * @since 4.0.0
 */
export declare const interrupt: (self: Enqueue) => Effect;
/**
 * Shutdown the queue, canceling any pending operations.
 * If the queue is already done, `false` is returned.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(2)
 *
 *   // Add messages
 *   yield* Queue.offer(queue, 1)
 *   yield* Queue.offer(queue, 2)
 *
 *   // Try to add more than capacity (will be pending)
 *   const pendingOffer = Queue.offer(queue, 3)
 *
 *   // Shutdown cancels pending operations and clears the queue
 *   const wasShutdown = yield* Queue.shutdown(queue)
 *   console.log(wasShutdown) // true
 *
 *   // Queue is now done and cleared
 *   const size = yield* Queue.size(queue)
 *   console.log(size) // 0
 * })
 * ```
 *
 * @category Completion
 * @since 4.0.0
 */
export declare const shutdown: (self: Enqueue) => Effect;
/**
 * Take all messages from the queue, returning an empty array if the queue
 * is empty or done.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(10)
 *
 *   // Add several messages
 *   yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])
 *
 *   // Clear all messages from the queue
 *   const messages = yield* Queue.clear(queue)
 *   console.log(messages) // [1, 2, 3, 4, 5]
 *
 *   // Queue is now empty
 *   const size = yield* Queue.size(queue)
 *   console.log(size) // 0
 *
 *   // Clearing empty queue returns empty array
 *   const empty = yield* Queue.clear(queue)
 *   console.log(empty) // []
 * })
 * ```
 *
 * @category Taking
 * @since 4.0.0
 */
export declare const clear: (self: Dequeue) => Effect, Pull.ExcludeDone>;
/**
 * Take all messages from the queue, or wait for messages to be available.
 *
 * If the queue is done, the Effect will fail with `Done`. If the queue
 * fails, the Effect will fail with the error.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(5)
 *
 *   // Add several messages
 *   yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])
 *
 *   // Take all available messages
 *   const messages1 = yield* Queue.takeAll(queue)
 *   console.log(messages1) // [1, 2, 3, 4, 5]
 * })
 * ```
 *
 * @category Taking
 * @since 4.0.0
 */
export declare const takeAll: (self: Dequeue) => Effect, E>;
/**
 * Take all messages from the queue, until the queue has errored or is done.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(5)
 *
 *   // Add several messages
 *   yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])
 *   // Some time later, end the queue
 *   yield* Effect.forkChild(Queue.end(queue))
 *
 *   // Collect all available messages
 *   const messages = yield* Queue.collect(queue)
 *   console.log(messages) // [1, 2, 3, 4, 5]
 * })
 * ```
 *
 * @category Taking
 * @since 4.0.0
 */
export declare const collect: (self: Dequeue) => Effect, Pull.ExcludeDone>;
/**
 * Take a specified number of messages from the queue. It will only take
 * up to the capacity of the queue.
 *
 * Once the queue has ended, fewer messages than requested may be returned
 * (see the example). If the queue fails, the Effect will fail with the error.
 *
 * NOTE(review): earlier wording referred to a `done` flag that does not appear
 * in the declared return type; presumably an ended, drained queue fails with
 * `Done` as `take` does — confirm.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(10)
 *
 *   // Add several messages
 *   yield* Queue.offerAll(queue, [1, 2, 3, 4, 5, 6, 7])
 *
 *   // Take exactly 3 messages
 *   const first3 = yield* Queue.takeN(queue, 3)
 *   console.log(first3) // [1, 2, 3]
 *
 *   // Take exactly 2 more messages
 *   const next2 = yield* Queue.takeN(queue, 2)
 *   console.log(next2) // [4, 5]
 *
 *   // End the queue before taking; now it can return fewer than requested
 *   yield* Queue.end(queue)
 *
 *   // Take remaining messages (takes 2, even though we asked for 5)
 *   const remaining = yield* Queue.takeN(queue, 5)
 *   console.log(remaining) // [6, 7]
 * })
 * ```
 *
 * @category Taking
 * @since 4.0.0
 */
export declare const takeN: (self: Dequeue, n: number) => Effect, E>;
/**
 * Take a variable number of messages from the queue, between specified min and max.
 * It will only take up to the capacity of the queue.
 *
 * If the queue fails, the Effect will fail with the error.
 *
 * NOTE(review): earlier wording referred to a `done` flag that does not appear
 * in the declared return type; presumably an ended, drained queue fails with
 * `Done` as `take` does — confirm.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(10)
 *
 *   // Add several messages
 *   yield* Queue.offerAll(queue, [1, 2, 3, 4, 5, 6, 7, 8])
 *
 *   // Take between 2 and 5 messages
 *   const batch1 = yield* Queue.takeBetween(queue, 2, 5)
 *   console.log(batch1) // [1, 2, 3, 4, 5] - took 5 (up to max)
 *
 *   // Take between 1 and 10 messages (but only 3 remain)
 *   const batch2 = yield* Queue.takeBetween(queue, 1, 10)
 *   console.log(batch2) // [6, 7, 8] - took 3 (all remaining)
 *
 *   // No more messages available, will wait or return done
 *   // const batch3 = yield* Queue.takeBetween(queue, 1, 3)
 * })
 * ```
 *
 * @category Taking
 * @since 4.0.0
 */
export declare const takeBetween: (self: Dequeue, min: number, max: number) => Effect, E>;
/**
 * Take a single message from the queue, or wait for a message to be
 * available.
 *
 * If the queue is done, it will fail with `Done`. If the
 * queue fails, the Effect will fail with the error.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(3)
 *
 *   // Add some messages
 *   yield* Queue.offer(queue, "first")
 *   yield* Queue.offer(queue, "second")
 *
 *   // Take messages one by one
 *   const msg1 = yield* Queue.take(queue)
 *   const msg2 = yield* Queue.take(queue)
 *   console.log(msg1, msg2) // "first", "second"
 *
 *   // End the queue
 *   yield* Queue.end(queue)
 *
 *   // Taking from ended queue fails with `Done`
 *   const result = yield* Effect.match(Queue.take(queue), {
 *     onFailure: (error: Cause.Done) => true,
 *     onSuccess: (value: string) => false
 *   })
 *   console.log("Queue ended:", result) // true
 * })
 * ```
 *
 * @category Taking
 * @since 4.0.0
 */
export declare const take: (self: Dequeue) => Effect;
/**
 * Tries to take an item from the queue without blocking.
 *
 * Returns `Option.some` with the item if available, or `Option.none` if the queue is empty or done.
 *
 * @example
 * ```ts
 * import { Effect, Option, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(10)
 *
 *   // Poll returns Option.none if empty
 *   const maybe1 = yield* Queue.poll(queue)
 *   console.log(Option.isNone(maybe1)) // true
 *
 *   // Add an item
 *   yield* Queue.offer(queue, 42)
 *
 *   // Poll returns Option.some with the item
 *   const maybe2 = yield* Queue.poll(queue)
 *   console.log(Option.getOrNull(maybe2)) // 42
 * })
 * ```
 *
 * @category Taking
 * @since 4.0.0
 */
export declare const poll: (self: Dequeue) => Effect>;
/**
 * Views the next item without removing it.
 *
 * Blocks until an item is available. If the queue is done or fails, the error is propagated.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(10)
 *   yield* Queue.offer(queue, 42)
 *
 *   // Peek at the next item without removing it
 *   const item = yield* Queue.peek(queue)
 *   console.log(item) // 42
 * })
 * ```
 *
 * @category Taking
 * @since 4.0.0
 */
export declare const peek: (self: Dequeue) => Effect;
/**
 * Try to take a single message from the queue synchronously, without waiting.
 *
 * Returns an `Exit` when a result is immediately available, or `undefined`
 * if no message is immediately available. If the queue is done, the returned
 * `Exit` fails with `Done`; if the queue has failed, it fails with the error.
 *
 * This is an unsafe operation that directly accesses the queue without Effect wrapping.
 *
 * @example
 * ```ts
 * import { Effect, Queue } from "effect"
 *
 * // Create a queue and use unsafe operations
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(10)
 *
 *   // Add some messages
 *   Queue.offerUnsafe(queue, 1)
 *   Queue.offerUnsafe(queue, 2)
 *
 *   // Take a message synchronously
 *   const result1 = Queue.takeUnsafe(queue)
 *   console.log(result1) // Success(1) or Exit containing value 1
 *
 *   const result2 = Queue.takeUnsafe(queue)
 *   console.log(result2) // Success(2)
 *
 *   // No more messages - returns undefined
 *   const result3 = Queue.takeUnsafe(queue)
 *   console.log(result3) // undefined
 * })
 * ```
 *
 * @category Taking
 * @since 4.0.0
 */
export declare const takeUnsafe: (self: Dequeue) => Exit | undefined;
// Declared as `await_` and re-exported below under the name `await`.
declare const await_: (self: Dequeue) => Effect>;
export {
  /**
   * Wait for the queue to be done.
   *
   * @category Completion
   * @since 4.0.0
   */
  await_ as await
};
/**
 * Check the size of the queue.
 *
 * The size is reported as a plain number; as the example shows, a queue that
 * has ended reports `0`.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Option, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(10)
 *
 *   // Check size of empty queue
 *   const size1 = yield* Queue.size(queue)
 *   console.log(size1) // 0
 *
 *   // Add some messages
 *   yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])
 *
 *   // Check size after adding messages
 *   const size2 = yield* Queue.size(queue)
 *   console.log(size2) // 5
 *
 *   // End the queue
 *   yield* Queue.end(queue)
 *
 *   // Size of ended queue is 0
 *   const size3 = yield* Queue.size(queue)
 *   console.log(size3) // 0
 * })
 * ```
 *
 * @category Size
 * @since 4.0.0
 */
export declare const size: (self: Dequeue) => Effect;
/**
 * Check if the queue is full.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Option, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(3)
 *
 *   console.log(yield* Queue.isFull(queue)) // false
 *
 *   // Add some messages
 *   yield* Queue.offerAll(queue, [1, 2, 3])
 *
 *   console.log(yield* Queue.isFull(queue)) // true
 * })
 * ```
 *
 * @category Size
 * @since 4.0.0
 */
export declare const isFull: (self: Dequeue) => Effect;
/**
 * Check the size of the queue synchronously.
 *
 * The size is returned as a plain `number`; as the example shows, a queue that
 * has ended reports `0`.
 * This is an unsafe operation that directly accesses the queue without Effect wrapping.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Option, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(10)
 *
 *   // Check size of empty queue
 *   const size1 = Queue.sizeUnsafe(queue)
 *   console.log(size1) // 0
 *
 *   // Add some messages
 *   Queue.offerUnsafe(queue, 1)
 *   Queue.offerUnsafe(queue, 2)
 *   Queue.offerUnsafe(queue, 3)
 *
 *   // Check size after adding messages
 *   const size2 = Queue.sizeUnsafe(queue)
 *   console.log(size2) // 3
 *
 *   // End the queue
 *   Queue.endUnsafe(queue)
 *
 *   // Size of ended queue is 0
 *   const size3 = Queue.sizeUnsafe(queue)
 *   console.log(size3) // 0
 * })
 * ```
 *
 * @category Size
 * @since 4.0.0
 */
export declare const sizeUnsafe: (self: Dequeue) => number;
/**
 * Check if the queue is full synchronously.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Option, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(3)
 *
 *   console.log(Queue.isFullUnsafe(queue)) // false
 *
 *   // Add some messages
 *   yield* Queue.offerAll(queue, [1, 2, 3])
 *
 *   console.log(Queue.isFullUnsafe(queue)) // true
 * })
 * ```
 *
 * @category Size
 * @since 4.0.0
 */
export declare const isFullUnsafe: (self: Dequeue) => boolean;
/**
 * Run an `Effect` into a `Queue`, where success ends the queue and failure
 * fails the queue.
 *
 * @example
 * ```ts
 * import { Cause, Effect, Queue } from "effect"
 *
 * const program = Effect.gen(function*() {
 *   const queue = yield* Queue.bounded(10)
 *
 *   // Create an effect that succeeds
 *   const dataProcessing = Effect.gen(function*() {
 *     yield* Effect.sleep("100 millis")
 *     return "Processing completed successfully"
 *   })
 *
 *   // Pipe the effect into the queue
 *   // If dataProcessing succeeds, queue ends successfully
 *   // If dataProcessing fails, queue fails with the error
 *   const effectIntoQueue = Queue.into(queue)(dataProcessing)
 *
 *   const wasCompleted = yield* effectIntoQueue
 *   console.log("Queue operation completed:", wasCompleted) // true
 *
 *   // Queue state now reflects the effect's outcome
 *   console.log("Queue state:", queue.state._tag) // "Done"
 * })
 * ```
 *
 * @since 3.8.0
 * @category Completion
 */
export declare const into: {
  /**
   * Run an `Effect` into a `Queue`, where success ends the queue and failure
   * fails the queue.
   *
   * @example
   * ```ts
   * import { Cause, Effect, Queue } from "effect"
   *
   * const program = Effect.gen(function*() {
   *   const queue = yield* Queue.bounded(10)
   *
   *   // Create an effect that succeeds
   *   const dataProcessing = Effect.gen(function*() {
   *     yield* Effect.sleep("100 millis")
   *     return "Processing completed successfully"
   *   })
   *
   *   // Pipe the effect into the queue
   *   // If dataProcessing succeeds, queue ends successfully
   *   // If dataProcessing fails, queue fails with the error
   *   const effectIntoQueue = Queue.into(queue)(dataProcessing)
   *
   *   const wasCompleted = yield* effectIntoQueue
   *   console.log("Queue operation completed:", wasCompleted) // true
   *
   *   // Queue state now reflects the effect's outcome
   *   console.log("Queue state:", queue.state._tag) // "Done"
   * })
   * ```
   *
   * @since 3.8.0
   * @category Completion
   */
  (self: Enqueue): (effect: Effect) => Effect;
  /**
   * Run an `Effect` into a `Queue`, where success ends the queue and failure
   * fails the queue.
   *
   * @example
   * ```ts
   * import { Cause, Effect, Queue } from "effect"
   *
   * const program = Effect.gen(function*() {
   *   const queue = yield* Queue.bounded(10)
   *
   *   // Create an effect that succeeds
   *   const dataProcessing = Effect.gen(function*() {
   *     yield* Effect.sleep("100 millis")
   *     return "Processing completed successfully"
   *   })
   *
   *   // Pipe the effect into the queue
   *   // If dataProcessing succeeds, queue ends successfully
   *   // If dataProcessing fails, queue fails with the error
   *   const effectIntoQueue = Queue.into(queue)(dataProcessing)
   *
   *   const wasCompleted = yield* effectIntoQueue
   *   console.log("Queue operation completed:", wasCompleted) // true
   *
   *   // Queue state now reflects the effect's outcome
   *   console.log("Queue state:", queue.state._tag) // "Done"
   * })
   * ```
   *
   * @since 3.8.0
   * @category Completion
   */
  (effect: Effect, self: Enqueue): Effect;
};
//# sourceMappingURL=Queue.d.ts.map