Overview
The Inngest integration provides Effect-first wrappers for Inngest background functions. Create durable, type-safe background jobs with Effect’s structured concurrency and service management.
Installation
Copy
bun add ff-effect effect inngest
Exports
Copy
import { createInngest, InngestError } from 'ff-effect/for/inngest';
createInngest
Create an Effect-based Inngest client wrapper.
Signature
Copy
function createInngest<
TClient extends Inngest.Any,
E,
R,
T extends string = '@ff-effect/Inngest'
>(
createClient: Effect.Effect<TClient, E, R>,
opts?: { tagId?: T }
): {
Tag: Context.Tag<T, TClient>;
layer: Layer<T, E, R>;
createFunction: <TTrigger, A, EH, RH>(
config: FunctionConfig,
trigger: TTrigger,
handler: (ctx: HandlerContext) => Effect.Effect<A, EH, RH>
) => Effect.Effect<InngestFunction.Any, never, T | RH>;
send: (payload: EventPayload) => Effect.Effect<{ ids: string[] }, InngestError, T>;
fetchHandler: (opts: ServeOpts) => Effect.Effect<FetchHandler, never, T>;
httpHandler: (opts: ServeOpts) => Effect.Effect<HttpApp, never, T>;
}
Parameters
- createClient: An Effect that yields an Inngest client instance.
- opts.tagId: Optional custom tag identifier for the Inngest client. Defaults to '@ff-effect/Inngest'.
Returns
- Tag: Context tag for accessing the Inngest client.
- layer: Layer that provides the Inngest client to Effects.
- createFunction: Create an Inngest function with an Effect-based handler.
- send: Send events to Inngest.
- fetchHandler: Create a fetch-compatible handler for serving Inngest functions.
- httpHandler: Create an Effect HttpApp handler for serving Inngest functions.
Basic Usage
Setup
Copy
import { createInngest } from 'ff-effect/for/inngest';
import { Inngest } from 'inngest';
import { Effect } from 'effect';
const client = new Inngest({ id: 'my-app' });
const InngestClient = createInngest(
Effect.succeed(client)
);
Simple Function
Copy
import { Effect } from 'effect';
const program = Effect.gen(function* () {
const helloFunction = yield* InngestClient.createFunction(
{ id: 'hello-world' },
{ event: 'app/hello' },
({ event }) =>
Effect.gen(function* () {
console.log('Hello', event.data.name);
return 'success';
})
);
return helloFunction;
}).pipe(
Effect.scoped,
Effect.provide(InngestClient.layer)
);
Event Handling
Sending Events
Copy
import { Effect } from 'effect';
const program = Effect.gen(function* () {
const result = yield* InngestClient.send({
name: 'app/hello',
data: { name: 'Alice' }
});
console.log('Event sent with IDs:', result.ids);
}).pipe(Effect.provide(InngestClient.layer));
Type-Safe Events
Copy
import { Inngest, EventSchemas } from 'inngest';
import { Effect } from 'effect';
const client = new Inngest({
id: 'my-app',
schemas: new EventSchemas().fromRecord<{
'user/created': { data: { userId: string; email: string } };
'user/deleted': { data: { userId: string } };
}>()
});
const InngestClient = createInngest(Effect.succeed(client));
const onUserCreated = yield* InngestClient.createFunction(
{ id: 'on-user-created' },
{ event: 'user/created' },
({ event }) =>
Effect.gen(function* () {
// event.data is fully typed
const { userId, email } = event.data;
console.log(`New user: ${email}`);
})
);
Steps
Use the step API for durable execution with automatic retries.
step.run
Execute an Effect as a step:
Copy
const processOrderFunction = yield* InngestClient.createFunction(
{ id: 'process-order' },
{ event: 'order/created' },
({ event, step }) =>
Effect.gen(function* () {
// Each step is durably executed
const payment = yield* step.run('process-payment', () =>
Effect.gen(function* () {
// Process payment logic
return { transactionId: 'txn_123' };
})
);
const shipment = yield* step.run('create-shipment', () =>
Effect.gen(function* () {
// Create shipment
return { trackingNumber: 'TRACK_456' };
})
);
return { payment, shipment };
})
);
step.sleep
Sleep for a duration (uses Effect’s Duration):
Copy
import { Duration, Effect } from 'effect';
const reminderFunction = yield* InngestClient.createFunction(
{ id: 'send-reminder' },
{ event: 'reminder/scheduled' },
({ step }) =>
Effect.gen(function* () {
yield* step.sleep('wait-24h', Duration.hours(24));
yield* step.run('send-email', () =>
Effect.gen(function* () {
// Send reminder email
})
);
})
);
step.sleepUntil
Sleep until a specific time:
Copy
const scheduledFunction = yield* InngestClient.createFunction(
{ id: 'scheduled-task' },
{ event: 'task/scheduled' },
({ event, step }) =>
Effect.gen(function* () {
const scheduledTime = new Date(event.data.scheduledAt);
yield* step.sleepUntil('wait-until-scheduled', scheduledTime);
yield* step.run('execute-task', () =>
Effect.sync(() => console.log('Task executed!'))
);
})
);
step.sendEvent
Send events from within a function:
Copy
const cascadeFunction = yield* InngestClient.createFunction(
{ id: 'cascade-events' },
{ event: 'process/start' },
({ step }) =>
Effect.gen(function* () {
const result = yield* step.sendEvent('trigger-downstream', {
name: 'process/step1',
data: { value: 42 }
});
console.log('Triggered events:', result.ids);
})
);
step.invoke
Invoke another function and wait for its result:
Copy
const orchestratorFunction = yield* InngestClient.createFunction(
{ id: 'orchestrator' },
{ event: 'orchestrate' },
({ step }) =>
Effect.gen(function* () {
const result = yield* step.invoke<{ status: string }>('call-worker', {
function: workerFunction,
data: { task: 'process-data' }
});
console.log('Worker result:', result.status);
})
);
step.waitForEvent
Wait for an event before continuing:
Copy
const approvalFunction = yield* InngestClient.createFunction(
{ id: 'wait-for-approval' },
{ event: 'request/submitted' },
({ event, step }) =>
Effect.gen(function* () {
const approval = yield* step.waitForEvent<{ approved: boolean }>('wait-for-approval', {
event: 'request/approved',
timeout: '7d',
match: `data.requestId == "${event.data.requestId}"`
});
if (approval?.approved) {
yield* step.run('process-approved', () =>
Effect.sync(() => console.log('Request approved!'))
);
}
})
);
Cron Schedules
Use Effect’s Cron for type-safe cron schedules:
Copy
import { Cron, Effect } from 'effect';
const dailyReportFunction = yield* InngestClient.createFunction(
{ id: 'daily-report' },
{ cron: Cron.unsafeParse('0 9 * * *') }, // Every day at 9 AM
({ step }) =>
Effect.gen(function* () {
yield* step.run('generate-report', () =>
Effect.gen(function* () {
// Generate and send daily report
})
);
})
);
Using Services
Inngest functions can access Effect services:
Copy
import { Effect } from 'effect';
class EmailService extends Effect.Service<EmailService>()('EmailService', {
effect: Effect.succeed({
send: (to: string, subject: string, body: string) =>
Effect.tryPromise(() =>
fetch('/api/email', {
method: 'POST',
body: JSON.stringify({ to, subject, body })
})
)
})
}) {}
class Database extends Effect.Service<Database>()('Database', {
effect: Effect.succeed({
getUser: (id: string) => Effect.succeed({ id, email: 'user@example.com' })
})
}) {}
const program = Effect.gen(function* () {
const welcomeEmailFunction = yield* InngestClient.createFunction(
{ id: 'send-welcome-email' },
{ event: 'user/created' },
({ event, step }) =>
Effect.gen(function* () {
const db = yield* Database;
const email = yield* EmailService;
const user = yield* step.run('fetch-user', () =>
db.getUser(event.data.userId)
);
yield* step.run('send-email', () =>
email.send(
user.email,
'Welcome!',
'Thanks for signing up!'
)
);
})
);
return welcomeEmailFunction;
}).pipe(
Effect.scoped,
Effect.provide(InngestClient.layer),
Effect.provide(Database.Default),
Effect.provide(EmailService.Default)
);
Serving Functions
Fetch Handler (Bun, Deno, etc.)
Copy
import { Effect } from 'effect';
const program = Effect.gen(function* () {
const fn1 = yield* InngestClient.createFunction(
{ id: 'function-1' },
{ event: 'app/event1' },
() => Effect.succeed('done')
);
const fn2 = yield* InngestClient.createFunction(
{ id: 'function-2' },
{ event: 'app/event2' },
() => Effect.succeed('done')
);
const handler = yield* InngestClient.fetchHandler({
functions: [fn1, fn2],
servePath: '/api/inngest'
});
// Use with Bun.serve, Deno.serve, etc.
return handler;
}).pipe(
Effect.scoped,
Effect.provide(InngestClient.layer)
);
const handler = await Effect.runPromise(program);
Bun.serve({
port: 3000,
fetch: handler
});
Effect HttpApp Handler
Copy
import { HttpServer } from '@effect/platform';
import { Effect } from 'effect';
const program = Effect.gen(function* () {
const fn = yield* InngestClient.createFunction(
{ id: 'my-function' },
{ event: 'app/event' },
() => Effect.succeed('done')
);
const httpApp = yield* InngestClient.httpHandler({
functions: [fn]
});
yield* HttpServer.serve(httpApp);
}).pipe(
Effect.scoped,
Effect.provide(InngestClient.layer),
Effect.provide(HttpServer.layer)
);
Error Handling
InngestError
Copy
import { InngestError } from 'ff-effect/for/inngest';
import { Effect } from 'effect';
const program = Effect.gen(function* () {
const result = yield* InngestClient.send({
name: 'app/event',
data: {}
}).pipe(
Effect.catchTag('ff-effect/InngestError', (error) =>
Effect.gen(function* () {
console.error('Failed to send event:', error.message);
return { ids: [] };
})
)
);
return result;
}).pipe(Effect.provide(InngestClient.layer));
Step Errors
Step errors are automatically retried by Inngest:
Copy
const retryableFunction = yield* InngestClient.createFunction(
{ id: 'with-retry' },
{ event: 'retry/test' },
({ step }) =>
Effect.gen(function* () {
const result = yield* step.run('might-fail', () =>
Effect.gen(function* () {
// This will be retried automatically by Inngest
if (Math.random() < 0.5) {
return yield* Effect.fail(new Error('Random failure'));
}
return 'success';
})
);
return result;
})
);
Complete Example
Copy
import { createInngest } from 'ff-effect/for/inngest';
import { Inngest, EventSchemas } from 'inngest';
import { Effect, Duration, Cron } from 'effect';
// Setup client with typed events
const client = new Inngest({
id: 'order-processor',
schemas: new EventSchemas().fromRecord<{
'order/created': { data: { orderId: string; userId: string; amount: number } };
'order/shipped': { data: { orderId: string; trackingNumber: string } };
}>()
});
const InngestClient = createInngest(Effect.succeed(client));
// Services
class PaymentService extends Effect.Service<PaymentService>()('PaymentService', {
effect: Effect.succeed({
charge: (amount: number) => Effect.succeed({ transactionId: 'txn_123' })
})
}) {}
class ShippingService extends Effect.Service<ShippingService>()('ShippingService', {
effect: Effect.succeed({
createLabel: (orderId: string) => Effect.succeed({ trackingNumber: 'TRACK_456' })
})
}) {}
// Function
const program = Effect.gen(function* () {
const processOrder = yield* InngestClient.createFunction(
{ id: 'process-order' },
{ event: 'order/created' },
({ event, step }) =>
Effect.gen(function* () {
const payment = yield* PaymentService;
const shipping = yield* ShippingService;
// Charge payment
const transaction = yield* step.run('charge-payment', () =>
payment.charge(event.data.amount)
);
// Wait before shipping
yield* step.sleep('processing-time', Duration.minutes(5));
// Create shipping label
const label = yield* step.run('create-shipping-label', () =>
shipping.createLabel(event.data.orderId)
);
// Send shipped event
yield* step.sendEvent('notify-shipped', {
name: 'order/shipped',
data: {
orderId: event.data.orderId,
trackingNumber: label.trackingNumber
}
});
return { transaction, label };
})
);
const handler = yield* InngestClient.fetchHandler({
functions: [processOrder]
});
return handler;
}).pipe(
Effect.scoped,
Effect.provide(InngestClient.layer),
Effect.provide(PaymentService.Default),
Effect.provide(ShippingService.Default)
);
const handler = await Effect.runPromise(program);
Bun.serve({ port: 3000, fetch: handler });
See Also
- Inngest Documentation
- extract - Used internally for function handlers
- Examples - More usage patterns