Creating Sinks
Sinks consume and process the elements of a stream. This section introduces common sink constructors that create sinks for specific tasks.
Common Constructors
head
The head sink captures the first element of a stream and returns it wrapped in an Option. If the stream is empty, it returns None.
import { Stream, Sink, Effect } from "effect"
// $ExpectType Effect<never, never, Option<number>>
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.head()))
Effect.runPromise(effect).then(console.log)
/*
Output:
{
  _id: "Option",
  _tag: "Some",
  value: 1
}
*/
last
The last sink consumes all elements of a stream and returns the last one wrapped in an Option.
import { Stream, Sink, Effect } from "effect"
// $ExpectType Effect<never, never, Option<number>>
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.last()))
Effect.runPromise(effect).then(console.log)
/*
Output:
{
  _id: "Option",
  _tag: "Some",
  value: 4
}
*/
count
The count sink consumes all elements of the stream and returns the number of elements fed to it.
import { Stream, Sink, Effect } from "effect"
// $ExpectType Effect<never, never, number>
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.count))
Effect.runPromise(effect).then(console.log)
/*
Output:
4
*/
sum
The sum sink consumes all elements of the stream and sums the incoming numeric values.
import { Stream, Sink, Effect } from "effect"
// $ExpectType Effect<never, never, number>
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.sum))
Effect.runPromise(effect).then(console.log)
/*
Output:
10
*/
take
The take sink takes the specified number of values from the stream and returns them in a Chunk.
import { Stream, Sink, Effect } from "effect"
// $ExpectType Effect<never, never, Chunk<number>>
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.take(3)))
Effect.runPromise(effect).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 2, 3 ]
}
*/
drain
The drain sink ignores its inputs, effectively discarding them.
import { Stream, Sink, Effect } from "effect"
// $ExpectType Effect<never, never, void>
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.drain))
Effect.runPromise(effect).then(console.log)
/*
Output:
undefined
*/
timed
The timed sink runs the stream to completion and returns how long the run took as a Duration.
import { Stream, Schedule, Sink, Effect } from "effect"
// $ExpectType Effect<never, never, Duration>
const effect = Stream.make(1, 2, 3, 4).pipe(
  Stream.schedule(Schedule.spaced("100 millis")),
  Stream.run(Sink.timed)
)
Effect.runPromise(effect).then(console.log)
/*
Output:
{
  _id: "Duration",
  _tag: "Millis",
  millis: 523
}
*/
forEach
The forEach sink executes the provided effectful function for every element fed to it.
import { Stream, Sink, Console, Effect } from "effect"
// $ExpectType Effect<never, never, void>
const effect = Stream.make(1, 2, 3, 4).pipe(
  Stream.run(Sink.forEach(Console.log))
)
Effect.runPromise(effect).then(console.log)
/*
Output:
1
2
3
4
undefined
*/
From Success and Failure
Just as streams can be created from a success or a failure value, sinks can be created with the Sink.succeed and Sink.fail functions.
Succeeding Sink
Let's start with a sink that doesn't consume any elements from its upstream but instead succeeds with a numeric value:
import { Stream, Sink, Effect } from "effect"
// $ExpectType Effect<never, never, number>
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.succeed(0)))
Effect.runPromise(effect).then(console.log)
/*
Output:
0
*/
Failing Sink
Now, consider a sink that also doesn't consume any elements from its upstream but deliberately fails with an error of type string:
import { Stream, Sink, Effect } from "effect"
// $ExpectType Effect<never, string, never>
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.fail("fail!")))
Effect.runPromise(effect).then(console.log, console.error)
/*
Output:
{
  _id: "FiberFailure",
  cause: {
    _id: "Cause",
    _tag: "Fail",
    failure: "fail!"
  }
}
*/
Collecting
Collecting All Elements
To gather all the elements from a data stream into a Chunk, you can use the Sink.collectAll() function:
import { Stream, Sink, Effect } from "effect"
// $ExpectType Effect<never, never, Chunk<number>>
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.collectAll()))
Effect.runPromise(effect).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 2, 3, 4 ]
}
*/
Collecting into a HashSet
If you want to accumulate the elements into a HashSet, you can use Sink.collectAllToSet(). This function ensures that each element appears only once in the resulting set:
import { Stream, Sink, Effect } from "effect"
// $ExpectType Effect<never, never, HashSet<number>>
const effect = Stream.make(1, 2, 2, 3, 4, 4).pipe(
  Stream.run(Sink.collectAllToSet())
)
Effect.runPromise(effect).then(console.log)
/*
Output:
{
  _id: "HashSet",
  values: [ 1, 2, 3, 4 ]
}
*/
Collecting into a HashMap
For more advanced collection needs, you can use Sink.collectAllToMap(). This function accumulates elements into a HashMap<K, A>, using a key function to assign each element a key and a merge function to combine elements that share a key. In the following example, we determine map keys with (n) => n % 3 and merge elements with the same key using (a, b) => a + b:
import { Stream, Sink, Effect } from "effect"
// $ExpectType Effect<never, never, HashMap<number, number>>
const effect = Stream.make(1, 3, 2, 3, 1, 5, 1).pipe(
  Stream.run(
    Sink.collectAllToMap(
      (n) => n % 3,
      (a, b) => a + b
    )
  )
)
Effect.runPromise(effect).then(console.log)
/*
Output:
{
  _id: "HashMap",
  values: [
    [ 0, 6 ], [ 1, 3 ], [ 2, 7 ]
  ]
}
*/
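To see how the key and merge functions produce the output above, the accumulation can be modelled in plain TypeScript with a built-in Map. This is only an illustrative sketch, not Effect's implementation; collectToMap is a hypothetical helper introduced here, and the entries are sorted by key purely for a stable display order.

```typescript
// Plain TypeScript model of key/merge accumulation (not Effect's implementation).
// `collectToMap` is a hypothetical helper introduced for illustration only.
function collectToMap<A, K>(
  items: ReadonlyArray<A>,
  key: (a: A) => K,
  merge: (x: A, y: A) => A
): Map<K, A> {
  const out = new Map<K, A>()
  for (const item of items) {
    const k = key(item)
    // The first value seen for a key is stored as-is;
    // every later value with the same key is merged into it.
    out.set(k, out.has(k) ? merge(out.get(k) as A, item) : item)
  }
  return out
}

const grouped = collectToMap(
  [1, 3, 2, 3, 1, 5, 1],
  (n) => n % 3,
  (a, b) => a + b
)

// Sort by key so the printed order is deterministic.
const entries = Array.from(grouped.entries()).sort((x, y) => x[0] - y[0])
console.log(entries) // [ [ 0, 6 ], [ 1, 3 ], [ 2, 7 ] ]
```

Key 0 receives 3 + 3, key 1 receives 1 + 1 + 1, and key 2 receives 2 + 5.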
Collecting a Specified Number
If you only want to collect a specific number of elements from a stream into a Chunk, you can use Sink.collectAllN(n):
import { Stream, Sink, Effect } from "effect"
// $ExpectType Effect<never, never, Chunk<number>>
const effect = Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.collectAllN(3)))
Effect.runPromise(effect).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 2, 3 ]
}
*/
Collecting While Meeting a Condition
To accumulate elements as long as they satisfy a specific condition, you can use Sink.collectAllWhile(predicate). This function keeps gathering elements until the predicate returns false for the first time; elements after that point are not collected, even if they satisfy the predicate:
import { Stream, Sink, Effect } from "effect"
// $ExpectType Effect<never, never, Chunk<number>>
const effect = Stream.make(1, 2, 0, 4, 0, 6, 7).pipe(
  Stream.run(Sink.collectAllWhile((n) => n !== 0))
)
Effect.runPromise(effect).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 2 ]
}
*/
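The output contains only the prefix 1, 2 even though 4, 6, and 7 also satisfy the predicate. A plain TypeScript loop can model this prefix-only behaviour; it is a sketch for illustration rather than the library's code, and collectWhile is a hypothetical helper name.

```typescript
// Plain TypeScript model of "collect while the predicate holds"
// (not Effect's implementation). `collectWhile` is a hypothetical helper.
function collectWhile<A>(
  items: ReadonlyArray<A>,
  predicate: (a: A) => boolean
): A[] {
  const out: A[] = []
  for (const item of items) {
    // Stop at the first element that fails the predicate;
    // nothing after it is inspected.
    if (!predicate(item)) break
    out.push(item)
  }
  return out
}

const prefix = collectWhile([1, 2, 0, 4, 0, 6, 7], (n) => n !== 0)
console.log(prefix) // [ 1, 2 ]
```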
Collecting into HashSets of a Specific Size
For more controlled collection into sets with a maximum size of n, you can use Sink.collectAllToSetN(n):
import { Stream, Sink, Effect } from "effect"
// $ExpectType Effect<never, never, HashSet<number>>
const effect = Stream.make(1, 2, 2, 3, 4, 4).pipe(
  Stream.run(Sink.collectAllToSetN(3))
)
Effect.runPromise(effect).then(console.log)
/*
Output:
{
  _id: "HashSet",
  values: [ 1, 2, 3 ]
}
*/
Collecting into HashMaps with Limited Keys
If you need to accumulate elements into maps with a maximum of n keys, you can use Sink.collectAllToMapN(n, keyFunction, mergeFunction):
import { Stream, Sink, Effect } from "effect"
// $ExpectType Effect<never, never, HashMap<number, number>>
const effect = Stream.make(1, 3, 2, 3, 1, 5, 1).pipe(
  Stream.run(
    Sink.collectAllToMapN(
      3,
      (n) => n,
      (a, b) => a + b
    )
  )
)
Effect.runPromise(effect).then(console.log)
/*
Output:
{
  _id: "HashMap",
  values: [
    [ 1, 2 ], [ 2, 2 ], [ 3, 6 ]
  ]
}
*/
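In the output above, key 1 holds 2 rather than 3. That is consistent with collection stopping at the element 5, which would introduce a fourth key, so the final 1 is never consumed. The sketch below models that reading in plain TypeScript. It is inferred from the documented output, not taken from Effect's source, and collectToMapN is a hypothetical helper name.

```typescript
// Plain TypeScript model of key-limited accumulation, inferred from the
// documented output (not Effect's implementation).
// `collectToMapN` is a hypothetical helper introduced for illustration.
function collectToMapN<A, K>(
  items: ReadonlyArray<A>,
  maxKeys: number,
  key: (a: A) => K,
  merge: (x: A, y: A) => A
): Map<K, A> {
  const out = new Map<K, A>()
  for (const item of items) {
    const k = key(item)
    if (out.has(k)) {
      // Existing key: merge without growing the map.
      out.set(k, merge(out.get(k) as A, item))
    } else if (out.size < maxKeys) {
      // New key and there is still room for it.
      out.set(k, item)
    } else {
      // A new key would exceed the limit, so collection stops here.
      break
    }
  }
  return out
}

const limited = collectToMapN(
  [1, 3, 2, 3, 1, 5, 1],
  3,
  (n) => n,
  (a, b) => a + b
)

// Sort by key so the printed order is deterministic.
const limitedEntries = Array.from(limited.entries()).sort((x, y) => x[0] - y[0])
console.log(limitedEntries) // [ [ 1, 2 ], [ 2, 2 ], [ 3, 6 ] ]
```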
Folding
Folding Left
Imagine you have a stream of numbers, and you want to reduce them into a single value by applying an operation to each element sequentially. You can achieve this using the Sink.foldLeft function:
import { Stream, Sink, Effect } from "effect"
// $ExpectType Effect<never, never, number>
const effect = Stream.make(1, 2, 3, 4).pipe(
  Stream.run(Sink.foldLeft(0, (a, b) => a + b))
)
Effect.runPromise(effect).then(console.log)
/*
Output:
10
*/
Folding with Termination
In some cases, you may want to fold elements in a stream but stop the folding process when a certain condition is met. This is called "short-circuiting." You can achieve this using the Sink.fold function, which takes a predicate on the accumulated state and keeps folding only while that predicate holds:
import { Stream, Sink, Effect } from "effect"
// $ExpectType Effect<never, never, number>
const effect = Stream.iterate(0, (n) => n + 1).pipe(
  Stream.run(
    Sink.fold(
      0,
      (sum) => sum <= 10,
      (a, b) => a + b
    )
  )
)
Effect.runPromise(effect).then(console.log)
/*
Output:
15
*/
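The result is 15 rather than 10 because the predicate is evaluated on the state accumulated so far: at 10 the check sum <= 10 still passes, so one more element (5) is added before folding stops. A plain TypeScript loop can model this; it is an illustrative sketch, not the library's implementation. foldWhile is a hypothetical helper, and a finite array of the first 20 naturals stands in for the infinite Stream.iterate.

```typescript
// Plain TypeScript model of a fold with a continuation predicate
// (not Effect's implementation). `foldWhile` is a hypothetical helper.
function foldWhile<S, A>(
  items: ReadonlyArray<A>,
  initial: S,
  shouldContinue: (state: S) => boolean,
  step: (state: S, item: A) => S
): S {
  let state = initial
  for (const item of items) {
    // The predicate inspects the accumulated state before each step.
    if (!shouldContinue(state)) break
    state = step(state, item)
  }
  return state
}

// Finite stand-in for Stream.iterate(0, (n) => n + 1): 0, 1, 2, ..., 19.
const naturals = Array.from({ length: 20 }, (_, i) => i)

const total = foldWhile(
  naturals,
  0,
  (sum) => sum <= 10,
  (a, b) => a + b
)
console.log(total) // 15  (states: 0, 1, 3, 6, 10, 15)
```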
Folding with Weighted Elements
Sometimes, you may want to fold elements based on their weight or cost, accumulating them until a certain maximum cost is reached. You can do this using Sink.foldWeighted. In the following example, every element has a cost of 1 and the maximum cost is 3, so the fold restarts after every three elements and the stream is split into groups of at most three:
import { Stream, Sink, Chunk, Effect } from "effect"
// $ExpectType Stream<never, never, Chunk<number>>
const stream = Stream.make(3, 2, 4, 1, 5, 6, 2, 1, 3, 5, 6).pipe(
  Stream.transduce(
    Sink.foldWeighted({
      initial: Chunk.empty<number>(),
      maxCost: 3,
      cost: () => 1,
      body: (acc, el) => Chunk.append(acc, el)
    })
  )
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    {
      _id: "Chunk",
      values: [ 3, 2, 4 ]
    }, {
      _id: "Chunk",
      values: [ 1, 5, 6 ]
    }, {
      _id: "Chunk",
      values: [ 2, 1, 3 ]
    }, {
      _id: "Chunk",
      values: [ 5, 6 ]
    }
  ]
}
*/
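The grouping above can be modelled in plain TypeScript as a running cost budget: a group is emitted whenever the next element would push the accumulated cost past the maximum. This is a simplified sketch, not Effect's implementation. groupByCost is a hypothetical helper, and the sketch assumes no single element costs more than the maximum.

```typescript
// Plain TypeScript model of cost-bounded grouping (not Effect's implementation).
// `groupByCost` is a hypothetical helper; it assumes every element's cost
// is at most `maxCost`.
function groupByCost<A>(
  items: ReadonlyArray<A>,
  maxCost: number,
  cost: (a: A) => number
): A[][] {
  const groups: A[][] = []
  let current: A[] = []
  let currentCost = 0
  for (const item of items) {
    const c = cost(item)
    // Emit the current group if this element would exceed the budget.
    if (current.length > 0 && currentCost + c > maxCost) {
      groups.push(current)
      current = []
      currentCost = 0
    }
    current.push(item)
    currentCost += c
  }
  // Emit whatever is left when the input ends.
  if (current.length > 0) groups.push(current)
  return groups
}

const groups = groupByCost([3, 2, 4, 1, 5, 6, 2, 1, 3, 5, 6], 3, () => 1)
console.log(groups) // [ [ 3, 2, 4 ], [ 1, 5, 6 ], [ 2, 1, 3 ], [ 5, 6 ] ]
```

Because the cost function is constant, the budget here simply counts elements; a non-constant cost function would produce groups of varying length.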
Folding Until a Limit
If you want to fold elements up to a specific limit, you can use Sink.foldUntil. In the following example, we fold elements until we have accumulated 3 of them:
import { Stream, Sink, Effect } from "effect"
// $ExpectType Effect<never, never, number>
const effect = Stream.make(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).pipe(
  Stream.run(Sink.foldUntil(0, 3, (a, b) => a + b))
)
Effect.runPromise(effect).then(console.log)
/*
Output:
6
*/