Queue
A Queue is a lightweight in-memory queue built on Effect with composable and transparent back-pressure.
It is fully asynchronous (no locks or blocking), purely functional, and type-safe.
Basic Operations
A Queue<A> stores values of type A and provides two fundamental operations:
Queue.offer: adds a value of type A to the Queue.
Queue.take: removes and returns the oldest value from the Queue.
Here's an example demonstrating these basic operations:
import { Effect, Queue } from "effect"

// $ExpectType Effect<never, never, number>
const program = Effect.gen(function* (_) {
  const queue = yield* _(Queue.bounded<number>(100))
  yield* _(Queue.offer(queue, 1)) // Add 1 to the queue
  const value = yield* _(Queue.take(queue)) // Retrieve and remove the oldest value
  return value
})

Effect.runPromise(program).then(console.log) // Output: 1
Creating a Queue
A Queue can have bounded (limited capacity) or unbounded storage. For bounded storage, you can choose among several strategies for handling new values once the queue reaches its capacity.
Bounded Queue
A bounded queue provides back-pressure when it's full. This means that if the queue is full, any attempt to add more items will be suspended until there's space available.
import { Queue } from "effect"
// Creating a bounded queue with a capacity of 100
const boundedQueue = Queue.bounded<number>(100)
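To make the back-pressure behavior concrete, here is a minimal synchronous model in plain TypeScript. It does not use Effect and is not how Effect implements Queue.bounded; the names BackPressureModel, offer, take, and snapshot are hypothetical. A real bounded queue suspends the offering fiber, whereas this sketch only parks the value until a slot is free, and it assumes a capacity of at least 1.

```typescript
// Hypothetical model of a back-pressured bounded queue (not Effect's implementation).
// An offer on a full queue is parked in `pending` and admitted once a take frees a slot.
class BackPressureModel<A> {
  private readonly items: A[] = []
  private readonly pending: A[] = []
  private readonly capacity: number

  constructor(capacity: number) {
    this.capacity = capacity
  }

  // Returns "accepted" when the value fits, "suspended" when it has to wait.
  offer(value: A): "accepted" | "suspended" {
    if (this.items.length < this.capacity) {
      this.items.push(value)
      return "accepted"
    }
    this.pending.push(value)
    return "suspended"
  }

  // Removes the oldest value, then admits the oldest parked offer, if any.
  take(): A | undefined {
    const oldest = this.items.shift()
    if (this.pending.length > 0) {
      this.items.push(this.pending.shift() as A)
    }
    return oldest
  }

  // Copies of the current state, for inspection only.
  snapshot(): { items: A[]; pending: A[] } {
    return { items: [...this.items], pending: [...this.pending] }
  }
}

const model = new BackPressureModel<number>(1)
const first = model.offer(1) // fits
const second = model.offer(2) // queue is full, so this offer waits
const taken = model.take() // frees a slot; the parked 2 moves in
console.log(first, second, taken, model.snapshot())
```

In this model the second offer is never lost: it simply waits for the take, which is the property that distinguishes back-pressure from the dropping and sliding strategies.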
Dropping Queue
A dropping queue simply drops new items when it's full. It doesn't wait for space to become available.
import { Queue } from "effect"
// Creating a dropping queue with a capacity of 100
const droppingQueue = Queue.dropping<number>(100)
Sliding Queue
A sliding queue removes old items when it's full to accommodate new ones.
import { Queue } from "effect"
// Creating a sliding queue with a capacity of 100
const slidingQueue = Queue.sliding<number>(100)
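The difference between the dropping and sliding strategies is easiest to see by tracking which values survive an overflow. The following is a plain TypeScript sketch of the two policies, not Effect's implementation; Strategy, offerWithStrategy, and offerAllWith are hypothetical helper names, and a capacity of at least 1 is assumed.

```typescript
// Hypothetical helpers modelling the two overflow strategies (not Effect's implementation).
type Strategy = "dropping" | "sliding"

// Returns the queue contents after offering `value` to a queue holding `items`.
// Assumes capacity >= 1.
function offerWithStrategy<A>(
  items: ReadonlyArray<A>,
  capacity: number,
  strategy: Strategy,
  value: A
): A[] {
  if (items.length < capacity) {
    return [...items, value] // room left: both strategies just append
  }
  if (strategy === "dropping") {
    return [...items] // full: the new value is discarded
  }
  return [...items.slice(1), value] // full: the oldest value is evicted
}

// Offers each value in order, starting from an empty queue.
const offerAllWith = (strategy: Strategy, capacity: number, values: number[]): number[] =>
  values.reduce<number[]>(
    (items, value) => offerWithStrategy(items, capacity, strategy, value),
    []
  )

const dropped = offerAllWith("dropping", 2, [1, 2, 3])
const slid = offerAllWith("sliding", 2, [1, 2, 3])
console.log(dropped) // [ 1, 2 ]
console.log(slid) // [ 2, 3 ]
```

With a capacity of 2 and three offers, the dropping policy keeps the two oldest values while the sliding policy keeps the two newest.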
Unbounded Queue
An unbounded queue has no capacity limit.
import { Queue } from "effect"
// Creating an unbounded queue
const unboundedQueue = Queue.unbounded<number>()
Adding Items to a Queue
To add a value to the queue, use the Queue.offer operation:
import { Effect, Queue } from "effect"

const program = Effect.gen(function* (_) {
  const queue = yield* _(Queue.bounded<number>(100))
  yield* _(Queue.offer(queue, 1)) // put 1 in the queue
})
If you're using a back-pressured queue and it's full, the offer operation suspends until space becomes available. To keep the current fiber running in the meantime, you can use Effect.fork to wait in a different execution context (fiber).
import { Effect, Queue, Fiber } from "effect"

const program = Effect.gen(function* (_) {
  const queue = yield* _(Queue.bounded<number>(1))
  yield* _(Queue.offer(queue, 1))
  const fiber = yield* _(Effect.fork(Queue.offer(queue, 2))) // will be suspended because the queue is full
  yield* _(Queue.take(queue))
  yield* _(Fiber.join(fiber))
})
You can also add multiple values at once using Queue.offerAll:
import { Effect, Queue, ReadonlyArray } from "effect"

const program = Effect.gen(function* (_) {
  const queue = yield* _(Queue.bounded<number>(100))
  const items = ReadonlyArray.range(1, 10)
  yield* _(Queue.offerAll(queue, items))
  return yield* _(Queue.size(queue))
})

Effect.runPromise(program).then(console.log) // Output: 10
Consuming Items from a Queue
The Queue.take operation removes the oldest item from the queue and returns it. If the queue is empty, it suspends and resumes only when an item is added to the queue. You can also use Effect.fork to wait for the value in a different execution context (fiber).
import { Effect, Queue, Fiber } from "effect"

const oldestItem = Effect.gen(function* (_) {
  const queue = yield* _(Queue.bounded<string>(100))
  const fiber = yield* _(Effect.fork(Queue.take(queue))) // will be suspended because the queue is empty
  yield* _(Queue.offer(queue, "something"))
  const value = yield* _(Fiber.join(fiber))
  return value
})

Effect.runPromise(oldestItem).then(console.log) // Output: something
You can retrieve the first item without waiting by using Queue.poll. If the queue is empty, you'll get None; otherwise, the oldest item is removed and returned wrapped in Some.
import { Effect, Queue } from "effect"

// $ExpectType Effect<never, never, Option<number>>
const polled = Effect.gen(function* (_) {
  const queue = yield* _(Queue.bounded<number>(100))
  yield* _(Queue.offer(queue, 10))
  yield* _(Queue.offer(queue, 20))
  const head = yield* _(Queue.poll(queue))
  return head
})

Effect.runPromise(polled).then(console.log)
/*
Output:
{
  _id: "Option",
  _tag: "Some",
  value: 10
}
*/
You can retrieve multiple items at once using Queue.takeUpTo. If the queue holds fewer items than requested, it returns all the available items without waiting for more offers.
import { Effect, Queue } from "effect"

// $ExpectType Effect<never, never, Chunk<number>>
const polled = Effect.gen(function* (_) {
  const queue = yield* _(Queue.bounded<number>(100))
  yield* _(Queue.offer(queue, 10))
  yield* _(Queue.offer(queue, 20))
  yield* _(Queue.offer(queue, 30))
  const chunk = yield* _(Queue.takeUpTo(queue, 2))
  return chunk
})

Effect.runPromise(polled).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 10, 20 ]
}
*/
Similarly, you can retrieve all items at once using Queue.takeAll. It returns immediately, providing an empty collection if the queue is empty.
import { Effect, Queue } from "effect"

// $ExpectType Effect<never, never, Chunk<number>>
const polled = Effect.gen(function* (_) {
  const queue = yield* _(Queue.bounded<number>(100))
  yield* _(Queue.offer(queue, 10))
  yield* _(Queue.offer(queue, 20))
  yield* _(Queue.offer(queue, 30))
  const chunk = yield* _(Queue.takeAll(queue))
  return chunk
})

Effect.runPromise(polled).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 10, 20, 30 ]
}
*/
Shutting Down a Queue
With Queue.shutdown, you can interrupt all fibers that are suspended on offer* or take*. It also empties the queue and causes all future offer* and take* calls to terminate immediately.
import { Effect, Queue, Fiber } from "effect"

const program = Effect.gen(function* (_) {
  const queue = yield* _(Queue.bounded<number>(3))
  const fiber = yield* _(Effect.fork(Queue.take(queue)))
  yield* _(Queue.shutdown(queue)) // will interrupt fiber
  yield* _(Fiber.join(fiber)) // will terminate
})
You can use Queue.awaitShutdown to execute an effect when the queue is shut down. This function waits until the queue is shut down, and if it's already shut down, it resumes immediately.
import { Effect, Queue, Fiber, Console } from "effect"

const program = Effect.gen(function* (_) {
  const queue = yield* _(Queue.bounded<number>(3))
  const fiber = yield* _(
    Effect.fork(
      Queue.awaitShutdown(queue).pipe(
        Effect.flatMap(() => Console.log("shutting down"))
      )
    )
  )
  yield* _(Queue.shutdown(queue))
  yield* _(Fiber.join(fiber))
})

Effect.runPromise(program) // Output: shutting down
Offer-only / Take-only Queues
In some situations, you may want specific parts of your code to have restricted capabilities, such as only offering values (Enqueue) or only taking values (Dequeue) from a queue. Effect provides a straightforward way to achieve this.
All operations related to offering values are defined by the Enqueue interface. Here's an example of how to use it:
import { Effect, Queue } from "effect"

const send = (offerOnlyQueue: Queue.Enqueue<number>, value: number) => {
  // This enqueue can only be used to offer values

  // @ts-expect-error
  Queue.take(offerOnlyQueue)

  // Ok
  return Queue.offer(offerOnlyQueue, value)
}
Similarly, all operations related to taking values are defined by the Dequeue interface. Here's an example:
const receive = (takeOnlyQueue: Queue.Dequeue<number>) => {
  // This dequeue can only be used to take values

  // @ts-expect-error
  Queue.offer(takeOnlyQueue, 1)

  // Ok
  return Queue.take(takeOnlyQueue)
}
The Queue type extends both Enqueue and Dequeue, allowing you to conveniently pass it to different parts of your code where you want to enforce either Enqueue or Dequeue behavior:
const program = Effect.gen(function* (_) {
  const queue = yield* _(Queue.unbounded<number>())

  // Offer values to the queue
  yield* _(send(queue, 1))
  yield* _(send(queue, 2))

  // Take values from the queue
  console.log(yield* _(receive(queue)))
  console.log(yield* _(receive(queue)))
})

Effect.runPromise(program)
/*
Output:
1
2
*/
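The typing idea behind this split can be sketched without Effect. The interfaces below are simplified, synchronous stand-ins with hypothetical names (OfferOnly, TakeOnly, FullQueue, makeQueue); Effect's real Enqueue and Dequeue interfaces return effects and expose more operations. The sketch only illustrates how one value can be handed out under two narrower types.

```typescript
// Simplified stand-ins for the offer-only and take-only views (hypothetical names,
// synchronous for brevity; Effect's real interfaces return effects).
interface OfferOnly<A> {
  offer(value: A): void
}

interface TakeOnly<A> {
  take(): A | undefined
}

// The full queue satisfies both views, so it can be passed where either is expected.
interface FullQueue<A> extends OfferOnly<A>, TakeOnly<A> {}

function makeQueue<A>(): FullQueue<A> {
  const items: A[] = []
  return {
    offer: (value: A): void => {
      items.push(value)
    },
    take: (): A | undefined => items.shift()
  }
}

// The producer can only offer; calling `take` on `out` would not compile.
const produce = (out: OfferOnly<string>): void => {
  out.offer("a")
  out.offer("b")
}

// The consumer can only take; calling `offer` on `input` would not compile.
const consume = (input: TakeOnly<string>): Array<string | undefined> => [
  input.take(),
  input.take()
]

const shared = makeQueue<string>()
produce(shared)
const received = consume(shared)
console.log(received)
```

Because the restriction lives entirely in the parameter types, no wrapper object or runtime check is needed: the same queue instance is shared, and the compiler rejects any use outside the granted capability.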