Batching & Caching
Before we dig into Effect's solution to batching and caching, let's start with a description of the problem.
It is very common in apps to depend on a number of external data sources like:
- HTTP APIs
- Databases
- Filesystems
Model Definition
Let's start with a fairly minimal model description:
export interface User {
readonly _tag: "User"
readonly id: number
readonly name: string
readonly email: string
}
export class GetUserError {
readonly _tag = "GetUserError"
}
export interface Todo {
readonly _tag: "Todo"
readonly id: number
readonly message: string
readonly ownerId: number
}
export class GetTodosError {
readonly _tag = "GetTodosError"
}
export class SendEmailError {
readonly _tag = "SendEmailError"
}
In a real world scenario we may want to use more precise types instead of directly using primitives for identifiers (see Branded Types). We may also want to include more information in the errors.
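To illustrate what a branded identifier looks like, here is a minimal sketch in plain TypeScript (the names are illustrative, not from the model above): the brand makes two identifiers that are both numbers at runtime incompatible at compile time.

```typescript
// A branded id is structurally a number, but the phantom `_brand`
// field prevents mixing up different kinds of ids at compile time.
type UserId = number & { readonly _brand: "UserId" }
const UserId = (n: number): UserId => n as UserId

type TodoId = number & { readonly _brand: "TodoId" }
const TodoId = (n: number): TodoId => n as TodoId

const getUserName = (id: UserId): string => `user-${id}`

const name = getUserName(UserId(42))
console.log(name) // "user-42"
// getUserName(TodoId(42)) // type error: TodoId is not assignable to UserId
```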
Classic Approach
Given such a model we usually write up functions to call some API (or database, etc.) like the following:
import { Effect } from "effect"
import * as Model from "./Model"
export const simulatedValidation = async <A>(
promise: Promise<Response>
): Promise<A> => {
// In a real world scenario we may not want to trust our APIs to actually return the expected data
return promise.then((res) => res.json() as Promise<A>)
}
// $ExpectType Effect<never, GetTodosError, Todo[]>
export const getTodos = Effect.tryPromise({
try: () =>
simulatedValidation<Array<Model.Todo>>(
fetch("https://api.example.demo/todos")
),
catch: () => new Model.GetTodosError()
})
// $ExpectType (id: number) => Effect<never, GetUserError, User>
export const getUserById = (id: number) =>
Effect.tryPromise({
try: () =>
simulatedValidation<Model.User>(
fetch(`https://api.example.demo/getUserById?id=${id}`)
),
catch: () => new Model.GetUserError()
})
// $ExpectType (address: string, text: string) => Effect<never, SendEmailError, void>
export const sendEmail = (address: string, text: string) =>
Effect.tryPromise({
try: () =>
simulatedValidation<void>(
fetch("https://api.example.demo/sendEmail", {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify({ address, text })
})
),
catch: () => new Model.SendEmailError()
})
// $ExpectType (id: number, message: string) => Effect<never, GetUserError | SendEmailError, void>
export const sendEmailToUser = (id: number, message: string) =>
Effect.flatMap(getUserById(id), (user) => sendEmail(user.email, message))
// $ExpectType (todo: Todo) => Effect<never, GetUserError | SendEmailError, void>
export const notifyOwner = (todo: Model.Todo) =>
Effect.flatMap(getUserById(todo.ownerId), (user) =>
sendEmailToUser(user.id, `hey ${user.name} you got a todo!`)
)
In a real world scenario we may not want to trust our APIs to actually return the expected data - to do this properly you can use @effect/schema or similar alternatives such as zod.
When using the utilities we defined it is normal to end up with code that looks like the following:
import { Effect } from "effect"
import * as API from "./API"
// $ExpectType Effect<never, GetTodosError | GetUserError | SendEmailError, void>
const program = Effect.gen(function* (_) {
const todos = yield* _(API.getTodos)
yield* _(
Effect.forEach(todos, (todo) => API.notifyOwner(todo), {
concurrency: "unbounded"
})
)
})
Here we used forEach to repeat an effect for every Todo, and the repeated effect first fetches the User who owns the todo and then sends an email.
We like writing code this way because it is very expressive and very easy to read, but is it efficient?
This code will execute tons of individual API calls. Many Todos will likely have the same owner, and our APIs may also provide batched alternatives where you can request as many users as you would like in one call.
So what can we do? Rewrite all our code to use a different form of API? Should we really do that?
Well, not anymore.
Declaring Requests
Let's rewrite our example to be as efficient as possible - we'll start by writing a model for the requests that our data sources support:
import { Request } from "effect"
import * as Model from "./Model"
export interface GetTodos
extends Request.Request<Model.GetTodosError, Array<Model.Todo>> {
readonly _tag: "GetTodos"
}
export const GetTodos = Request.tagged<GetTodos>("GetTodos")
export interface GetUserById
extends Request.Request<Model.GetUserError, Model.User> {
readonly _tag: "GetUserById"
readonly id: number
}
export const GetUserById = Request.tagged<GetUserById>("GetUserById")
export interface SendEmail extends Request.Request<Model.SendEmailError, void> {
readonly _tag: "SendEmail"
readonly address: string
readonly text: string
}
export const SendEmail = Request.tagged<SendEmail>("SendEmail")
export type ApiRequest = GetTodos | GetUserById | SendEmail
Declaring Resolvers
Now that we have our requests defined it is time to tell Effect how to resolve those requests. That's where we would use a RequestResolver.
Here we will define a single resolver per query. There is no right or wrong answer in how granular your resolvers should be but usually you will split up your resolvers based on which API calls can be batched.
import { Effect, RequestResolver, Request } from "effect"
import * as API from "./API"
import * as Model from "./Model"
import * as RequestModel from "./RequestModel"
// we assume we cannot batch GetTodos, we create a normal resolver
// $ExpectType RequestResolver<GetTodos, never>
export const GetTodosResolver = RequestResolver.fromEffect(
(request: RequestModel.GetTodos) =>
Effect.tryPromise({
try: () =>
API.simulatedValidation<Array<Model.Todo>>(
fetch("https://api.example.demo/todos")
),
catch: () => new Model.GetTodosError()
})
)
// we assume we can batch GetUserById, we create a batched resolver
// $ExpectType RequestResolver<GetUserById, never>
export const GetUserByIdResolver = RequestResolver.makeBatched(
(requests: Array<RequestModel.GetUserById>) =>
Effect.tryPromise({
try: () =>
API.simulatedValidation<Array<Model.User>>(
fetch("https://api.example.demo/getUserByIdBatch", {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify({ users: requests.map(({ id }) => ({ id })) })
})
),
catch: () => new Model.GetUserError()
}).pipe(
Effect.flatMap((users) =>
Effect.forEach(requests, (request, index) =>
Request.completeEffect(request, Effect.succeed(users[index]))
)
),
Effect.catchAll((error) =>
Effect.forEach(requests, (request) =>
Request.completeEffect(request, Effect.fail(error))
)
)
)
)
// we assume we can batch SendEmail, we create a batched resolver
// $ExpectType RequestResolver<SendEmail, never>
export const SendEmailResolver = RequestResolver.makeBatched(
(requests: Array<RequestModel.SendEmail>) =>
Effect.tryPromise({
try: () =>
fetch("https://api.example.demo/sendEmailBatch", {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify({
emails: requests.map(({ address, text }) => ({ address, text }))
})
}).then((_) => _.json()),
catch: () => new Model.SendEmailError()
}).pipe(
Effect.flatMap(() =>
Effect.forEach(requests, (request) =>
Request.completeEffect(request, Effect.unit)
)
),
Effect.catchAll((error) =>
Effect.forEach(requests, (request) =>
Request.completeEffect(request, Effect.fail(error))
)
)
)
)
Resolvers can also access context like any other Effect, and there are many different ways of creating resolvers. You may want to check the reference documentation of the RequestResolver module next.
Defining Queries
At this point we are ready to plug the pieces together! Let's do just that:
import { Effect } from "effect"
import * as Model from "./Model"
import * as RequestModel from "./RequestModel"
import * as Resolvers from "./Resolvers"
export const getTodos: Effect.Effect<
never,
Model.GetTodosError,
Array<Model.Todo>
> = Effect.request(RequestModel.GetTodos({}), Resolvers.GetTodosResolver)
export const getUserById = (id: number) =>
Effect.request(
RequestModel.GetUserById({ id }),
Resolvers.GetUserByIdResolver
)
export const sendEmail = (address: string, text: string) =>
Effect.request(
RequestModel.SendEmail({ address, text }),
Resolvers.SendEmailResolver
)
export const sendEmailToUser = (id: number, message: string) =>
Effect.flatMap(getUserById(id), (user) => sendEmail(user.email, message))
export const notifyOwner = (todo: Model.Todo) =>
Effect.flatMap(getUserById(todo.ownerId), (user) =>
sendEmailToUser(user.id, `hey ${user.name} you got a todo!`)
)
It looks like we are back where we started: the exact same types and the exact same composition.
But now the following program:
import { Effect } from "effect"
import * as Queries from "./Queries"
// $ExpectType Effect<never, GetTodosError | GetUserError | SendEmailError, void>
const program = Effect.gen(function* (_) {
const todos = yield* _(Queries.getTodos)
yield* _(
Effect.forEach(todos, (todo) => Queries.notifyOwner(todo), {
batching: true
})
)
})
will require only 3 queries to our APIs instead of 1 + 2n, where n is the number of todos.
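The mechanism behind this can be pictured in plain TypeScript as a dataloader-style collector. This is only a sketch of the idea, not Effect's implementation, and `fetchUsersBatch` and the `User` shape are hypothetical: lookups started concurrently in the same tick are gathered and served by a single batched call.

```typescript
// Sketch of batching: concurrent lookups in the same tick are
// collected and resolved with one batched call.
type User = { id: number; name: string }

const makeBatchedGetUser = (
  fetchUsersBatch: (ids: number[]) => Promise<User[]>
) => {
  let pending: Array<{
    id: number
    resolve: (user: User) => void
    reject: (error: unknown) => void
  }> = []
  return (id: number): Promise<User> =>
    new Promise((resolve, reject) => {
      if (pending.length === 0) {
        // first request of this tick: schedule a flush of the batch
        queueMicrotask(async () => {
          const batch = pending
          pending = []
          try {
            const users = await fetchUsersBatch(batch.map((p) => p.id))
            batch.forEach((p, i) => p.resolve(users[i]))
          } catch (error) {
            batch.forEach((p) => p.reject(error))
          }
        })
      }
      pending.push({ id, resolve, reject })
    })
}

// hypothetical batched API: one call returns all requested users
let calls = 0
const getUser = makeBatchedGetUser(async (ids) => {
  calls += 1
  return ids.map((id) => ({ id, name: `user-${id}` }))
})

const users = await Promise.all([getUser(1), getUser(2), getUser(3)])
console.log(calls) // 1
```

Three concurrent `getUser` calls result in a single `fetchUsersBatch` call, which is the same shape of saving that `batching: true` gives the program above.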
Resolvers with Context
There may be cases where you want to access some context as part of the request resolver. In order for requests to be batchable, the resolver they reference has to be the same, so it is important to avoid over-providing context to a resolver: having even two slightly different services makes the resolvers incompatible, leading to no batching.
To avoid easy mistakes we decided to force the context of the resolver passed to Effect.request to never, so that you always have to specify how context is accessed.
Let's see how we would do it:
import { Effect, Context, Layer, RequestResolver } from "effect"
import * as API from "./API"
import * as Model from "./Model"
import * as RequestModel from "./RequestModel"
export interface HttpService {
fetch: typeof fetch
}
export const HttpService = Context.Tag<HttpService>(
Symbol.for("@app/services/HttpService")
)
export const HttpServiceLive = Layer.sync(HttpService, () =>
HttpService.of({ fetch })
)
export const GetTodosResolver: Effect.Effect<
HttpService,
never,
RequestResolver.RequestResolver<RequestModel.GetTodos, never>
> =
// we create a normal resolver like we did before
RequestResolver.fromEffect((request: RequestModel.GetTodos) =>
Effect.flatMap(HttpService, (http) =>
Effect.tryPromise({
try: () =>
API.simulatedValidation<Array<Model.Todo>>(
http.fetch("https://api.example.demo/todos")
),
catch: () => new Model.GetTodosError()
})
)
).pipe(
// we list the tags that the resolver can access
RequestResolver.contextFromServices(HttpService)
)
We can now see that the type of GetTodosResolver is no longer a RequestResolver but instead Effect<HttpService, never, RequestResolver<GetTodos, never>>, which is an Effect that accesses the HttpService and returns a composed resolver that has the minimal context ready to use.
Once we have such an Effect we can directly use it in our request definition:
import { Effect } from "effect"
import * as Model from "./Model"
import * as RequestModel from "./RequestModel"
import * as ResolversWithContext from "./ResolversWithContext"
export const getTodos: Effect.Effect<
ResolversWithContext.HttpService,
Model.GetTodosError,
Array<Model.Todo>
> = Effect.request(
RequestModel.GetTodos({}),
ResolversWithContext.GetTodosResolver
)
We can see that the Effect correctly requires HttpService to be provided.
Alternatively you can create RequestResolvers as part of layers, directly accessing or closing over context at construction. For example:
import { Effect, Context, Layer, RequestResolver } from "effect"
import * as API from "./API"
import * as Model from "./Model"
import * as RequestModel from "./RequestModel"
import * as ResolversWithContext from "./ResolversWithContext"
export interface TodosService {
getTodos: Effect.Effect<never, Model.GetTodosError, Array<Model.Todo>>
}
export const TodosService = Context.Tag<TodosService>(
Symbol.for("@app/services/TodosService")
)
export const TodosServiceLive = Layer.effect(
TodosService,
Effect.gen(function* ($) {
const http = yield* $(ResolversWithContext.HttpService)
const resolver = RequestResolver.fromEffect(
(request: RequestModel.GetTodos) =>
Effect.tryPromise({
try: () =>
API.simulatedValidation<Array<Model.Todo>>(
http.fetch("https://api.example.demo/todos")
),
catch: () => new Model.GetTodosError()
})
)
return {
getTodos: Effect.request(RequestModel.GetTodos({}), resolver)
}
})
)
export const getTodos: Effect.Effect<
TodosService,
Model.GetTodosError,
Array<Model.Todo>
> = Effect.flatMap(TodosService, (service) => service.getTodos)
This way is probably the best for most cases, given that layers are the natural place to wire services together.
Controlling Batching
Batching can be locally disabled using the Effect.withRequestBatching
utility in the following way:
import { Effect } from "effect"
import * as Queries from "./Queries"
// $ExpectType Effect<never, GetTodosError | GetUserError | SendEmailError, void>
const program = Effect.gen(function* (_) {
const todos = yield* _(Queries.getTodos)
yield* _(
Effect.forEach(todos, (todo) => Queries.notifyOwner(todo), {
concurrency: "unbounded"
})
)
}).pipe(Effect.withRequestBatching(false))
Request Caching
Up to this point we have optimized how requests are executed, but there is still a catch: we are not doing any caching. This leads to request duplication.
Fortunately we also have a primitive for caching in Effect, and we use that to automatically cache requests.
import { Effect } from "effect"
import * as RequestModel from "./RequestModel"
import * as Resolvers from "./Resolvers"
export const getUserById = (id: number) =>
Effect.request(
RequestModel.GetUserById({ id }),
Resolvers.GetUserByIdResolver
).pipe(Effect.withRequestCaching(true))
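Conceptually, request caching amounts to memoizing lookups by request key, including in-flight ones, so that duplicate requests share a single execution. A rough plain-TypeScript sketch of that idea (the names are illustrative, not Effect's API):

```typescript
// Sketch of request caching: memoize lookups by key so duplicate
// requests (even concurrent ones) share a single execution.
const memoizeByKey = <A>(lookup: (key: string) => Promise<A>) => {
  const cache = new Map<string, Promise<A>>()
  return (key: string): Promise<A> => {
    const hit = cache.get(key)
    if (hit !== undefined) return hit
    const result = lookup(key)
    cache.set(key, result)
    return result
  }
}

let lookups = 0
const getUserCached = memoizeByKey(async (id: string) => {
  lookups += 1
  return { id, name: `user-${id}` }
})

await getUserCached("1")
await getUserCached("1") // served from the cache
console.log(lookups) // 1
```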
Final Program
Assuming you've wired everything up correctly:
import { Effect, Schedule } from "effect"
import * as Queries from "./Queries"
// $ExpectType Effect<never, GetTodosError | GetUserError | SendEmailError, number>
const program = Effect.gen(function* (_) {
const todos = yield* _(Queries.getTodos)
yield* _(
Effect.forEach(todos, (todo) => Queries.notifyOwner(todo), {
concurrency: "unbounded"
})
)
}).pipe(Effect.repeat(Schedule.fixed("10 seconds")))
With this program, the getTodos operation retrieves all the todos. Then, the Effect.forEach function is used to notify the owner of each todo concurrently, without waiting for the notifications to complete.
The repeat function is applied to the entire chain of operations, and it ensures that the program repeats every 10 seconds using a fixed schedule. This means that the entire process, including fetching todos and sending notifications, will be executed repeatedly with a 10-second interval.
The program incorporates a caching mechanism, which prevents the same GetUserById request from being executed more than once within a span of 1 minute (unless there are more than 65,000 users). This default caching behavior helps optimize the program's execution and reduces unnecessary requests to fetch user data.
Furthermore, the program is designed to send emails in batches, allowing for efficient processing and better utilization of resources.
The Request Cache
There may be cases where you want to localize a cache (use a cache only for a part of your program) or maybe you want a global cache with a different setup, or a mix of both.
To cover those scenarios you'd create a custom cache like the following:
import { Effect, Schedule, Layer, Request } from "effect"
import * as Queries from "./Queries"
// $ExpectType Effect<never, GetTodosError | GetUserError | SendEmailError, number>
const program = Effect.gen(function* (_) {
const todos = yield* _(Queries.getTodos)
yield* _(
Effect.forEach(todos, (todo) => Queries.notifyOwner(todo), {
concurrency: "unbounded"
})
)
}).pipe(
Effect.repeat(Schedule.fixed("10 seconds")),
Effect.provide(
Layer.setRequestCache(
Request.makeCache({ capacity: 256, timeToLive: "60 minutes" })
)
)
)
Alternatively you can also directly construct a cache with Request.makeCache({ capacity: 256, timeToLive: "60 minutes" }) and then use Effect.withRequestCache(program, myCache) on a program to make sure the requests generated from that program use the custom cache (when enabled with Effect.withRequestCaching(true)).
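To make the two knobs concrete, here is a toy cache in plain TypeScript with a capacity bound and a time-to-live. It uses FIFO eviction to keep the sketch short (real request caches are typically LRU), and it takes an explicit `now` so expiry is easy to demonstrate; it is an illustration of the parameters, not Effect's cache.

```typescript
// Toy cache: entries expire after `timeToLiveMs`, and the oldest
// entry is evicted (FIFO) once `capacity` is reached.
class TtlCache<V> {
  private entries = new Map<string, { value: V; expiresAt: number }>()
  constructor(
    private capacity: number,
    private timeToLiveMs: number
  ) {}
  get(key: string, now = Date.now()): V | undefined {
    const entry = this.entries.get(key)
    if (entry === undefined || entry.expiresAt <= now) {
      this.entries.delete(key)
      return undefined
    }
    return entry.value
  }
  set(key: string, value: V, now = Date.now()): void {
    if (this.entries.size >= this.capacity && !this.entries.has(key)) {
      // evict the oldest inserted entry to stay within capacity
      const oldest = this.entries.keys().next().value
      if (oldest !== undefined) this.entries.delete(oldest)
    }
    this.entries.set(key, { value, expiresAt: now + this.timeToLiveMs })
  }
}

const cache = new TtlCache<number>(2, 60 * 60 * 1000) // capacity 2, TTL 60 minutes
cache.set("a", 1, 0)
cache.set("b", 2, 0)
cache.set("c", 3, 0) // capacity reached: "a" is evicted
console.log(cache.get("a", 0)) // undefined (evicted)
console.log(cache.get("c", 0)) // 3
console.log(cache.get("c", 61 * 60 * 1000)) // undefined (expired)
```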
How is this possible?
We recently introduced a new key primitive in the fiber that enables an execution to pause when it sees the program requires a request. In the process of pausing, the fiber will reify its stack into a continuation that can be externally performed.
import { Effect } from "effect"
import * as Queries from "./Queries"
const program = Effect.gen(function* (_) {
const todos = yield* _(Queries.getTodos)
yield* _(
Effect.forEach(todos, (todo) => Queries.notifyOwner(todo), {
concurrency: "unbounded"
})
)
})
const nextStep = Effect.step(program).pipe(
Effect.flatMap((step) => {
switch (step._op) {
// the program is blocked on a number of requests
case "Blocked": {
const requests = step.i0
const continuation = step.i1
return shipRequestsToBeExecutedAndWait(requests).pipe(
Effect.flatMap(() => continuation)
)
}
// the program completed
case "Success":
case "Failure": {
return step
}
}
})
)
const shipRequestsToBeExecutedAndWait = <E, A>(
requests: Effect.Blocked<E, A>["i0"]
): Effect.Effect<never, E, void> => {
// go on mars and come back
return Effect.unit
}
By using the functions provided by the RequestBlock module, you can combine requests from multiple blocked effects. By using the function Effect.blocked(requests, continuation), you can express an effect that is blocked on requests that should continue with continuation.
Using Cache Directly
There are many cases where you have functions (key: Key) => Effect<R, E, A> that you would like to cache, and not every case is a good fit for the request model shown above - for example, non-batchable API calls or intensive work.
Let's see how we would go about using cache:
import { Effect, Cache } from "effect"
declare const intensiveWork: (
key: string
) => Effect.Effect<never, never, string>
const program = Effect.gen(function* ($) {
const cache = yield* $(
Cache.make({
capacity: Number.MAX_SAFE_INTEGER,
timeToLive: Infinity,
lookup: intensiveWork
})
)
const a0 = yield* $(cache.get("key0"))
const b0 = yield* $(cache.get("key1"))
const a1 = yield* $(cache.get("key0"))
const b1 = yield* $(cache.get("key1"))
if (a0 === a1 && b0 === b1) {
console.log("I'll always end up here....")
}
})
In order for the cache to correctly compare two Key values when you are not using primitives (e.g. string, boolean, number), you should use types that implement the Equal interface.
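To see why key equality matters, consider what happens with plain objects as keys in a regular JavaScript Map, which compares keys by reference. This sketch shows the cache miss and one way to restore hits by normalizing keys to a primitive (Effect instead uses the Equal interface for this):

```typescript
// With object keys, a Map compares by reference, so a structurally
// equal key misses the cache.
const byRef = new Map<{ id: number }, string>()
byRef.set({ id: 1 }, "cached")
console.log(byRef.get({ id: 1 })) // undefined - a different object reference

// Normalizing keys to a primitive restores the expected hits.
const keyOf = (key: { id: number }): string => `id:${key.id}`
const byKey = new Map<string, string>()
byKey.set(keyOf({ id: 1 }), "cached")
console.log(byKey.get(keyOf({ id: 1 }))) // "cached"
```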
There are many more methods available in the Cache module. As a next step, check out the reference docs!