/** * TxPubSub is a transactional publish/subscribe hub that provides Software Transactional Memory * (STM) semantics for message broadcasting. Publishers broadcast messages to all current * subscribers, with each subscriber receiving its own copy of every published message. * * Supports multiple queue strategies: bounded, unbounded, dropping, and sliding. * * @since 4.0.0 */ import * as Effect from "./Effect.ts"; import type { Inspectable } from "./Inspectable.ts"; import type { Pipeable } from "./Pipeable.ts"; import type * as Scope from "./Scope.ts"; import * as TxQueue from "./TxQueue.ts"; declare const TypeId = "~effect/transactions/TxPubSub"; /** * A TxPubSub represents a transactional publish/subscribe hub that broadcasts messages * to all current subscribers using Software Transactional Memory (STM) semantics. * * @example * ```ts * import { Effect, TxPubSub, TxQueue } from "effect" * * const program = Effect.gen(function*() { * const hub = yield* TxPubSub.unbounded() * * yield* Effect.scoped( * Effect.gen(function*() { * const sub = yield* TxPubSub.subscribe(hub) * yield* TxPubSub.publish(hub, "hello") * const msg = yield* TxQueue.take(sub) * console.log(msg) // "hello" * }) * ) * }) * ``` * * @since 4.0.0 * @category models */ export interface TxPubSub extends Inspectable, Pipeable { readonly [TypeId]: typeof TypeId; readonly strategy: "bounded" | "unbounded" | "dropping" | "sliding"; readonly capacity: number; } /** * Creates a bounded TxPubSub with the specified capacity. When a subscriber's * queue is full, the publisher will retry the transaction until space is available. * * @example * ```ts * import { Effect, TxPubSub, TxQueue } from "effect" * * const program = Effect.gen(function*() { * const hub = yield* TxPubSub.bounded(16) * * yield* Effect.scoped( * Effect.gen(function*() { * const sub = yield* TxPubSub.subscribe(hub) * yield* TxPubSub.publish(hub, 42) * const value = yield* TxQueue.take(sub) * console.log(value) // 42 * }) * ) * }) * ``` * * @since 4.0.0 * @category constructors */ export declare const bounded: (capacity: number) => Effect.Effect>; /** * Creates a dropping TxPubSub with the specified capacity. When a subscriber's * queue is full, the message is dropped for that subscriber. * * @example * ```ts * import { Effect, TxPubSub, TxQueue } from "effect" * * const program = Effect.gen(function*() { * const hub = yield* TxPubSub.dropping(2) * * yield* Effect.scoped( * Effect.gen(function*() { * const sub = yield* TxPubSub.subscribe(hub) * yield* TxPubSub.publish(hub, 1) * yield* TxPubSub.publish(hub, 2) * yield* TxPubSub.publish(hub, 3) // dropped * const v1 = yield* TxQueue.take(sub) * const v2 = yield* TxQueue.take(sub) * console.log(v1, v2) // 1 2 * }) * ) * }) * ``` * * @since 4.0.0 * @category constructors */ export declare const dropping: (capacity: number) => Effect.Effect>; /** * Creates a sliding TxPubSub with the specified capacity. When a subscriber's * queue is full, the oldest message in that subscriber's queue is dropped. * * @example * ```ts * import { Effect, TxPubSub, TxQueue } from "effect" * * const program = Effect.gen(function*() { * const hub = yield* TxPubSub.sliding(2) * * yield* Effect.scoped( * Effect.gen(function*() { * const sub = yield* TxPubSub.subscribe(hub) * yield* TxPubSub.publish(hub, 1) * yield* TxPubSub.publish(hub, 2) * yield* TxPubSub.publish(hub, 3) // evicts 1 * const v1 = yield* TxQueue.take(sub) * console.log(v1) // 2 * }) * ) * }) * ``` * * @since 4.0.0 * @category constructors */ export declare const sliding: (capacity: number) => Effect.Effect>; /** * Creates an unbounded TxPubSub with unlimited capacity. Messages are always accepted. * * @example * ```ts * import { Effect, TxPubSub, TxQueue } from "effect" * * const program = Effect.gen(function*() { * const hub = yield* TxPubSub.unbounded() * * yield* Effect.scoped( * Effect.gen(function*() { * const sub = yield* TxPubSub.subscribe(hub) * yield* TxPubSub.publish(hub, "msg") * const msg = yield* TxQueue.take(sub) * console.log(msg) // "msg" * }) * ) * }) * ``` * * @since 4.0.0 * @category constructors */ export declare const unbounded: () => Effect.Effect>; /** * Returns the capacity of the TxPubSub. * * @example * ```ts * import { Effect, TxPubSub } from "effect" * * const program = Effect.gen(function*() { * const hub = yield* TxPubSub.bounded(16) * console.log(TxPubSub.capacity(hub)) // 16 * }) * ``` * * @since 4.0.0 * @category getters */ export declare const capacity: (self: TxPubSub) => number; /** * Returns the current number of messages across all subscriber queues (the max). * * @example * ```ts * import { Effect, TxPubSub, TxQueue } from "effect" * * const program = Effect.gen(function*() { * const hub = yield* TxPubSub.unbounded() * * yield* Effect.scoped( * Effect.gen(function*() { * const sub = yield* TxPubSub.subscribe(hub) * yield* TxPubSub.publish(hub, 1) * yield* TxPubSub.publish(hub, 2) * const s = yield* TxPubSub.size(hub) * console.log(s) // 2 * }) * ) * }) * ``` * * @since 4.0.0 * @category getters */ export declare const size: (self: TxPubSub) => Effect.Effect; /** * Checks if the TxPubSub has no pending messages (all subscriber queues are empty). * * @example * ```ts * import { Effect, TxPubSub } from "effect" * * const program = Effect.gen(function*() { * const hub = yield* TxPubSub.unbounded() * const empty = yield* TxPubSub.isEmpty(hub) * console.log(empty) // true * }) * ``` * * @since 4.0.0 * @category getters */ export declare const isEmpty: (self: TxPubSub) => Effect.Effect; /** * Checks if any subscriber queue is at capacity. * * @example * ```ts * import { Effect, TxPubSub } from "effect" * * const program = Effect.gen(function*() { * const hub = yield* TxPubSub.bounded(2) * const full = yield* TxPubSub.isFull(hub) * console.log(full) // false * }) * ``` * * @since 4.0.0 * @category getters */ export declare const isFull: (self: TxPubSub) => Effect.Effect; /** * Checks if the TxPubSub has been shut down. * * @example * ```ts * import { Effect, TxPubSub } from "effect" * * const program = Effect.gen(function*() { * const hub = yield* TxPubSub.unbounded() * console.log(yield* TxPubSub.isShutdown(hub)) // false * yield* TxPubSub.shutdown(hub) * console.log(yield* TxPubSub.isShutdown(hub)) // true * }) * ``` * * @since 4.0.0 * @category getters */ export declare const isShutdown: (self: TxPubSub) => Effect.Effect; /** * Publishes a message to all current subscribers. * * Returns `true` if the message was delivered to all subscribers, or `false` if * the hub is shut down or the message was dropped for any subscriber (dropping strategy). * * For bounded strategy, retries the transaction if any subscriber queue is full. * For sliding strategy, drops oldest messages in full subscriber queues. * For dropping strategy, drops the message for full subscriber queues and returns `false`. * * @example * ```ts * import { Effect, TxPubSub, TxQueue } from "effect" * * const program = Effect.gen(function*() { * const hub = yield* TxPubSub.unbounded() * * // No subscribers - publish is a no-op * const r1 = yield* TxPubSub.publish(hub, "no one listening") * console.log(r1) // true * * yield* Effect.scoped( * Effect.gen(function*() { * const sub = yield* TxPubSub.subscribe(hub) * yield* TxPubSub.publish(hub, "hello") * const msg = yield* TxQueue.take(sub) * console.log(msg) // "hello" * }) * ) * }) * ``` * * @since 4.0.0 * @category mutations */ export declare const publish: { /** * Publishes a message to all current subscribers. * * Returns `true` if the message was delivered to all subscribers, or `false` if * the hub is shut down or the message was dropped for any subscriber (dropping strategy). * * For bounded strategy, retries the transaction if any subscriber queue is full. * For sliding strategy, drops oldest messages in full subscriber queues. * For dropping strategy, drops the message for full subscriber queues and returns `false`. * * @example * ```ts * import { Effect, TxPubSub, TxQueue } from "effect" * * const program = Effect.gen(function*() { * const hub = yield* TxPubSub.unbounded() * * // No subscribers - publish is a no-op * const r1 = yield* TxPubSub.publish(hub, "no one listening") * console.log(r1) // true * * yield* Effect.scoped( * Effect.gen(function*() { * const sub = yield* TxPubSub.subscribe(hub) * yield* TxPubSub.publish(hub, "hello") * const msg = yield* TxQueue.take(sub) * console.log(msg) // "hello" * }) * ) * }) * ``` * * @since 4.0.0 * @category mutations */ (value: A): (self: TxPubSub) => Effect.Effect; /** * Publishes a message to all current subscribers. * * Returns `true` if the message was delivered to all subscribers, or `false` if * the hub is shut down or the message was dropped for any subscriber (dropping strategy). * * For bounded strategy, retries the transaction if any subscriber queue is full. * For sliding strategy, drops oldest messages in full subscriber queues. * For dropping strategy, drops the message for full subscriber queues and returns `false`. * * @example * ```ts * import { Effect, TxPubSub, TxQueue } from "effect" * * const program = Effect.gen(function*() { * const hub = yield* TxPubSub.unbounded() * * // No subscribers - publish is a no-op * const r1 = yield* TxPubSub.publish(hub, "no one listening") * console.log(r1) // true * * yield* Effect.scoped( * Effect.gen(function*() { * const sub = yield* TxPubSub.subscribe(hub) * yield* TxPubSub.publish(hub, "hello") * const msg = yield* TxQueue.take(sub) * console.log(msg) // "hello" * }) * ) * }) * ``` * * @since 4.0.0 * @category mutations */ (self: TxPubSub, value: A): Effect.Effect; }; /** * Publishes all messages from an iterable to all current subscribers. * * Returns `true` if all messages were delivered to all subscribers. * * @example * ```ts * import { Effect, TxPubSub, TxQueue } from "effect" * * const program = Effect.gen(function*() { * const hub = yield* TxPubSub.unbounded() * * yield* Effect.scoped( * Effect.gen(function*() { * const sub = yield* TxPubSub.subscribe(hub) * yield* TxPubSub.publishAll(hub, [1, 2, 3]) * const v1 = yield* TxQueue.take(sub) * const v2 = yield* TxQueue.take(sub) * const v3 = yield* TxQueue.take(sub) * console.log(v1, v2, v3) // 1 2 3 * }) * ) * }) * ``` * * @since 4.0.0 * @category mutations */ export declare const publishAll: { /** * Publishes all messages from an iterable to all current subscribers. * * Returns `true` if all messages were delivered to all subscribers. * * @example * ```ts * import { Effect, TxPubSub, TxQueue } from "effect" * * const program = Effect.gen(function*() { * const hub = yield* TxPubSub.unbounded() * * yield* Effect.scoped( * Effect.gen(function*() { * const sub = yield* TxPubSub.subscribe(hub) * yield* TxPubSub.publishAll(hub, [1, 2, 3]) * const v1 = yield* TxQueue.take(sub) * const v2 = yield* TxQueue.take(sub) * const v3 = yield* TxQueue.take(sub) * console.log(v1, v2, v3) // 1 2 3 * }) * ) * }) * ``` * * @since 4.0.0 * @category mutations */ (values: Iterable): (self: TxPubSub) => Effect.Effect; /** * Publishes all messages from an iterable to all current subscribers. * * Returns `true` if all messages were delivered to all subscribers. * * @example * ```ts * import { Effect, TxPubSub, TxQueue } from "effect" * * const program = Effect.gen(function*() { * const hub = yield* TxPubSub.unbounded() * * yield* Effect.scoped( * Effect.gen(function*() { * const sub = yield* TxPubSub.subscribe(hub) * yield* TxPubSub.publishAll(hub, [1, 2, 3]) * const v1 = yield* TxQueue.take(sub) * const v2 = yield* TxQueue.take(sub) * const v3 = yield* TxQueue.take(sub) * console.log(v1, v2, v3) // 1 2 3 * }) * ) * }) * ``` * * @since 4.0.0 * @category mutations */ (self: TxPubSub, values: Iterable): Effect.Effect; }; /** * Subscribes to the TxPubSub, returning a TxQueue that receives all messages * published after subscription. The subscription is automatically removed when * the scope is closed. * * @example * ```ts * import { Effect, TxPubSub, TxQueue } from "effect" * * const program = Effect.gen(function*() { * const hub = yield* TxPubSub.unbounded() * * yield* Effect.scoped( * Effect.gen(function*() { * const sub1 = yield* TxPubSub.subscribe(hub) * const sub2 = yield* TxPubSub.subscribe(hub) * * yield* TxPubSub.publish(hub, "broadcast") * * const msg1 = yield* TxQueue.take(sub1) * const msg2 = yield* TxQueue.take(sub2) * console.log(msg1, msg2) // "broadcast" "broadcast" * }) * ) * }) * ``` * * @since 4.0.0 * @category mutations */ export declare const subscribe: (self: TxPubSub) => Effect.Effect, never, Scope.Scope>; /** * Creates a subscriber queue and registers it with the pub/sub. * * This is the transactional acquire step of `subscribe`, exposed so that * callers can compose it with other Tx operations in a single transaction * (e.g. `TxSubscriptionRef.changes`). * * @since 4.0.0 * @category mutations */ export declare const acquireSubscriber: (self: TxPubSub) => Effect.Effect, never, Effect.Transaction>; /** * Removes a subscriber queue from the pub/sub and shuts it down. * * This is the transactional release step of `subscribe`, exposed so that * callers can compose it with other Tx operations in a single transaction. * * @since 4.0.0 * @category mutations */ export declare const releaseSubscriber: { /** * Removes a subscriber queue from the pub/sub and shuts it down. * * This is the transactional release step of `subscribe`, exposed so that * callers can compose it with other Tx operations in a single transaction. * * @since 4.0.0 * @category mutations */ (queue: TxQueue.TxQueue): (self: TxPubSub) => Effect.Effect; /** * Removes a subscriber queue from the pub/sub and shuts it down. * * This is the transactional release step of `subscribe`, exposed so that * callers can compose it with other Tx operations in a single transaction. * * @since 4.0.0 * @category mutations */ (self: TxPubSub, queue: TxQueue.TxQueue): Effect.Effect; }; /** * Shuts down the TxPubSub and all subscriber queues. Subsequent publish operations * will return `false`. Subsequent subscribe operations will receive an already-shutdown queue. * * @example * ```ts * import { Effect, TxPubSub } from "effect" * * const program = Effect.gen(function*() { * const hub = yield* TxPubSub.unbounded() * yield* TxPubSub.shutdown(hub) * * const shut = yield* TxPubSub.isShutdown(hub) * console.log(shut) // true * * const accepted = yield* TxPubSub.publish(hub, 1) * console.log(accepted) // false * }) * ``` * * @since 4.0.0 * @category mutations */ export declare const shutdown: (self: TxPubSub) => Effect.Effect; /** * Waits for the TxPubSub to be shut down. * * @example * ```ts * import { Effect, TxPubSub } from "effect" * * const program = Effect.gen(function*() { * const hub = yield* TxPubSub.unbounded() * * const fiber = yield* Effect.forkChild(TxPubSub.awaitShutdown(hub)) * yield* TxPubSub.shutdown(hub) * yield* fiber.await * }) * ``` * * @since 4.0.0 * @category mutations */ export declare const awaitShutdown: (self: TxPubSub) => Effect.Effect; /** * Checks if the given value is a TxPubSub. * * @example * ```ts * import { TxPubSub } from "effect" * * declare const someValue: unknown * * if (TxPubSub.isTxPubSub(someValue)) { * console.log("This is a TxPubSub") * } * ``` * * @since 4.0.0 * @category guards */ export declare const isTxPubSub: (u: unknown) => u is TxPubSub; export {}; //# sourceMappingURL=TxPubSub.d.ts.map