Skip to main content

Overview

The Inngest integration provides Effect-first wrappers for Inngest background functions. Create durable, type-safe background jobs with Effect’s structured concurrency and service management.

Installation

bun add ff-effect effect inngest

Exports

import { createInngest, InngestError } from 'ff-effect/for/inngest';

createInngest

Create an Effect-based Inngest client wrapper.

Signature

/**
 * Create an Effect-based Inngest client wrapper.
 *
 * @param createClient - An Effect that yields an Inngest client instance.
 * @param opts - Optional settings; `opts.tagId` is a custom tag identifier for
 *   the Inngest client and defaults to '@ff-effect/Inngest'.
 * @returns The tag, layer and helper functions described per member below.
 */
function createInngest<
  TClient extends Inngest.Any,
  E,
  R,
  T extends string = '@ff-effect/Inngest'
>(
  createClient: Effect.Effect<TClient, E, R>,
  opts?: { tagId?: T }
): {
  /** Context tag for accessing the Inngest client. */
  Tag: Context.Tag<T, TClient>;
  /** Layer that provides the Inngest client to Effects. */
  layer: Layer.Layer<T, E, R>;
  /** Create an Inngest function with an Effect-based handler. */
  createFunction: <TTrigger, A, EH, RH>(
    config: FunctionConfig,
    trigger: TTrigger,
    handler: (ctx: HandlerContext) => Effect.Effect<A, EH, RH>
  ) => Effect.Effect<InngestFunction.Any, never, T | RH>;
  /** Send events to Inngest. */
  send: (payload: EventPayload) => Effect.Effect<{ ids: string[] }, InngestError, T>;
  /** Create a fetch-compatible handler for serving Inngest functions. */
  fetchHandler: (opts: ServeOpts) => Effect.Effect<FetchHandler, never, T>;
  /** Create an Effect HttpApp handler for serving Inngest functions. */
  httpHandler: (opts: ServeOpts) => Effect.Effect<HttpApp, never, T>;
}

Parameters

createClient
Effect.Effect<TClient, E, R>
required
An Effect that yields an Inngest client instance (TClient extends Inngest.Any).
opts.tagId
string
optional
Custom tag identifier for the Inngest client. Defaults to '@ff-effect/Inngest'.

Returns

Tag
Context.Tag
Context tag for accessing the Inngest client.
layer
Layer
Layer that provides the Inngest client to Effects.
createFunction
function
Create an Inngest function with an Effect-based handler.
send
function
Send events to Inngest.
fetchHandler
function
Create a fetch-compatible handler for serving Inngest functions.
httpHandler
function
Create an Effect HttpApp handler for serving Inngest functions.

Basic Usage

Setup

import { createInngest } from 'ff-effect/for/inngest';
import { Inngest } from 'inngest';
import { Effect } from 'effect';

// Plain Inngest SDK client, constructed eagerly outside of any Effect.
const client = new Inngest({ id: 'my-app' });

// createInngest takes an Effect that yields the client, so the ready-made
// instance is lifted with Effect.succeed. No tagId is passed here, so the
// default tag '@ff-effect/Inngest' applies.
const InngestClient = createInngest(
  Effect.succeed(client)
);

Simple Function

import { Effect } from 'effect';

// Builds the 'hello-world' function, triggered by 'app/hello' events.
const program = Effect.gen(function* () {
  // Arguments are (config, trigger, handler); the handler returns an Effect.
  const helloFunction = yield* InngestClient.createFunction(
    { id: 'hello-world' },
    { event: 'app/hello' },
    ({ event }) =>
      Effect.gen(function* () {
        console.log('Hello', event.data.name);
        return 'success';
      })
  );
  
  return helloFunction;
}).pipe(
  Effect.scoped,
  // createFunction requires the Inngest client tag; this layer provides it.
  Effect.provide(InngestClient.layer)
);

Event Handling

Sending Events

import { Effect } from 'effect';

const program = Effect.gen(function* () {
  // send yields { ids: string[] } and can fail with InngestError.
  const result = yield* InngestClient.send({
    name: 'app/hello',
    data: { name: 'Alice' }
  });
  
  console.log('Event sent with IDs:', result.ids);
}).pipe(Effect.provide(InngestClient.layer));

Type-Safe Events

import { createInngest } from 'ff-effect/for/inngest';
import { Inngest, EventSchemas } from 'inngest';
import { Effect } from 'effect';

// Declare the event payload shapes on the client so handlers receive typed
// `event.data`.
const client = new Inngest({
  id: 'my-app',
  schemas: new EventSchemas().fromRecord<{
    'user/created': { data: { userId: string; email: string } };
    'user/deleted': { data: { userId: string } };
  }>()
});

const InngestClient = createInngest(Effect.succeed(client));

// Fragment: `yield*` is only valid inside a generator, so this statement is
// meant to live inside an Effect.gen block.
const onUserCreated = yield* InngestClient.createFunction(
  { id: 'on-user-created' },
  { event: 'user/created' },
  ({ event }) =>
    Effect.gen(function* () {
      // event.data is fully typed
      const { userId, email } = event.data;
      console.log(`New user: ${email}`);
    })
);

Steps

Use the step API for durable execution with automatic retries.

step.run

Execute an Effect as a step:
// Fragment: `yield*` is only valid inside a generator, so this statement is
// meant to live inside an Effect.gen block.
const processOrderFunction = yield* InngestClient.createFunction(
  { id: 'process-order' },
  { event: 'order/created' },
  ({ event, step }) =>
    Effect.gen(function* () {
      // Each step is durably executed
      // step.run takes a step id and a thunk returning the Effect to execute.
      const payment = yield* step.run('process-payment', () =>
        Effect.gen(function* () {
          // Process payment logic
          return { transactionId: 'txn_123' };
        })
      );
      
      const shipment = yield* step.run('create-shipment', () =>
        Effect.gen(function* () {
          // Create shipment
          return { trackingNumber: 'TRACK_456' };
        })
      );
      
      // The handler's result combines the values yielded by both steps.
      return { payment, shipment };
    })
);

step.sleep

Sleep for a duration (uses Effect’s Duration):
import { Duration, Effect } from 'effect';

// Fragment: `yield*` is only valid inside a generator, so this statement is
// meant to live inside an Effect.gen block.
const reminderFunction = yield* InngestClient.createFunction(
  { id: 'send-reminder' },
  { event: 'reminder/scheduled' },
  ({ step }) =>
    Effect.gen(function* () {
      // The sleep length is given as an Effect Duration (24 hours here).
      yield* step.sleep('wait-24h', Duration.hours(24));
      
      // Runs only after the sleep step has completed.
      yield* step.run('send-email', () =>
        Effect.gen(function* () {
          // Send reminder email
        })
      );
    })
);

step.sleepUntil

Sleep until a specific time:
// Fragment: `yield*` is only valid inside a generator, so this statement is
// meant to live inside an Effect.gen block.
const scheduledFunction = yield* InngestClient.createFunction(
  { id: 'scheduled-task' },
  { event: 'task/scheduled' },
  ({ event, step }) =>
    Effect.gen(function* () {
      // NOTE(review): assumes event.data.scheduledAt is a value the Date
      // constructor accepts (e.g. ISO string or epoch ms) — confirm against
      // the event schema.
      const scheduledTime = new Date(event.data.scheduledAt);
      yield* step.sleepUntil('wait-until-scheduled', scheduledTime);
      
      yield* step.run('execute-task', () =>
        Effect.sync(() => console.log('Task executed!'))
      );
    })
);

step.sendEvent

Send events from within a function:
// Fragment: `yield*` is only valid inside a generator, so this statement is
// meant to live inside an Effect.gen block.
const cascadeFunction = yield* InngestClient.createFunction(
  { id: 'cascade-events' },
  { event: 'process/start' },
  ({ step }) =>
    Effect.gen(function* () {
      // Emits 'process/step1' as a step with id 'trigger-downstream'.
      const result = yield* step.sendEvent('trigger-downstream', {
        name: 'process/step1',
        data: { value: 42 }
      });
      
      // The step result exposes the IDs of the events that were sent.
      console.log('Triggered events:', result.ids);
    })
);

step.invoke

Invoke another function and wait for its result:
// Fragment: `yield*` is only valid inside a generator, so this statement is
// meant to live inside an Effect.gen block.
// NOTE(review): workerFunction is not defined in this snippet; presumably it
// is another function created with InngestClient.createFunction — confirm.
const orchestratorFunction = yield* InngestClient.createFunction(
  { id: 'orchestrator' },
  { event: 'orchestrate' },
  ({ step }) =>
    Effect.gen(function* () {
      // The type argument declares the shape expected of the invoked
      // function's result.
      const result = yield* step.invoke<{ status: string }>('call-worker', {
        function: workerFunction,
        data: { task: 'process-data' }
      });
      
      console.log('Worker result:', result.status);
    })
);

step.waitForEvent

Wait for an event before continuing:
// Fragment: `yield*` is only valid inside a generator, so this statement is
// meant to live inside an Effect.gen block.
const approvalFunction = yield* InngestClient.createFunction(
  { id: 'wait-for-approval' },
  { event: 'request/submitted' },
  ({ step }) =>
    Effect.gen(function* () {
      const approval = yield* step.waitForEvent<{ approved: boolean }>('wait-for-approval', {
        event: 'request/approved',
        timeout: '7d',
        // Inngest's `match` is a dot-path that must be equal on the triggering
        // event and the awaited event; it is not an expression (expressions go
        // in `if`). NOTE(review): assumes these options are forwarded
        // unchanged to Inngest's step.waitForEvent — confirm.
        match: 'data.requestId'
      });
      
      // `approval` is nullish when no matching event arrives before the
      // timeout, hence the optional chaining.
      if (approval?.approved) {
        yield* step.run('process-approved', () =>
          Effect.sync(() => console.log('Request approved!'))
        );
      }
    })
);

Cron Schedules

Use Effect’s Cron for type-safe cron schedules:
import { Cron, Effect } from 'effect';

// Fragment: `yield*` is only valid inside a generator, so this statement is
// meant to live inside an Effect.gen block.
const dailyReportFunction = yield* InngestClient.createFunction(
  { id: 'daily-report' },
  // Cron.unsafeParse throws if the expression is invalid; Cron.parse reports
  // the failure as a value instead.
  { cron: Cron.unsafeParse('0 9 * * *') }, // Every day at 9 AM
  ({ step }) =>
    Effect.gen(function* () {
      yield* step.run('generate-report', () =>
        Effect.gen(function* () {
          // Generate and send daily report
        })
      );
    })
);

Using Services

Inngest functions can access Effect services:
import { Effect } from 'effect';

// Service that POSTs a JSON email payload to '/api/email'.
// NOTE(review): fetch is given a relative URL; server-side runtimes generally
// need an absolute URL — confirm for the target runtime.
class EmailService extends Effect.Service<EmailService>()('EmailService', {
  effect: Effect.succeed({
    send: (to: string, subject: string, body: string) =>
      Effect.tryPromise(() =>
        fetch('/api/email', {
          method: 'POST',
          body: JSON.stringify({ to, subject, body })
        })
      )
  })
}) {}

// Stub user lookup: always succeeds with a fixed email for the given id.
class Database extends Effect.Service<Database>()('Database', {
  effect: Effect.succeed({
    getUser: (id: string) => Effect.succeed({ id, email: 'user@example.com' })
  })
}) {}

const program = Effect.gen(function* () {
  const welcomeEmailFunction = yield* InngestClient.createFunction(
    { id: 'send-welcome-email' },
    { event: 'user/created' },
    ({ event, step }) =>
      Effect.gen(function* () {
        // Resolve both services from the Effect context before running steps.
        const db = yield* Database;
        const email = yield* EmailService;
        
        const user = yield* step.run('fetch-user', () =>
          db.getUser(event.data.userId)
        );
        
        yield* step.run('send-email', () =>
          email.send(
            user.email,
            'Welcome!',
            'Thanks for signing up!'
          )
        );
      })
  );
  
  return welcomeEmailFunction;
}).pipe(
  Effect.scoped,
  Effect.provide(InngestClient.layer),
  // Default layers for the two services the handler yields.
  Effect.provide(Database.Default),
  Effect.provide(EmailService.Default)
);

Serving Functions

Fetch Handler (Bun, Deno, etc.)

import { Effect } from 'effect';

// Creates two functions and yields a fetch-compatible handler that serves them.
const program = Effect.gen(function* () {
  const fn1 = yield* InngestClient.createFunction(
    { id: 'function-1' },
    { event: 'app/event1' },
    () => Effect.succeed('done')
  );
  
  const fn2 = yield* InngestClient.createFunction(
    { id: 'function-2' },
    { event: 'app/event2' },
    () => Effect.succeed('done')
  );
  
  // Every function to be served must be listed in `functions`.
  const handler = yield* InngestClient.fetchHandler({
    functions: [fn1, fn2],
    servePath: '/api/inngest'
  });
  
  // Use with Bun.serve, Deno.serve, etc.
  return handler;
}).pipe(
  Effect.scoped,
  Effect.provide(InngestClient.layer)
);

// Top-level await: this line must run in an ES module (or another context
// that permits await at the top level).
const handler = await Effect.runPromise(program);

Bun.serve({
  port: 3000,
  fetch: handler
});

Effect HttpApp Handler

import { HttpServer } from '@effect/platform';
import { Effect } from 'effect';

const program = Effect.gen(function* () {
  const fn = yield* InngestClient.createFunction(
    { id: 'my-function' },
    { event: 'app/event' },
    () => Effect.succeed('done')
  );
  
  // httpHandler yields an Effect HttpApp rather than a fetch handler.
  const httpApp = yield* InngestClient.httpHandler({
    functions: [fn]
  });
  
  // NOTE(review): in current @effect/platform, HttpServer.serve(app) appears
  // to return a Layer (HttpServer.serveEffect is the Effect-returning
  // variant) — confirm this line against the installed version.
  yield* HttpServer.serve(httpApp);
}).pipe(
  Effect.scoped,
  Effect.provide(InngestClient.layer),
  // NOTE(review): the HTTP server implementation layer normally comes from a
  // platform package (e.g. NodeHttpServer.layer / BunHttpServer.layer);
  // verify that HttpServer.layer exists in the version in use.
  Effect.provide(HttpServer.layer)
);

Error Handling

InngestError

import { InngestError } from 'ff-effect/for/inngest';
import { Effect } from 'effect';

const program = Effect.gen(function* () {
  const result = yield* InngestClient.send({
    name: 'app/event',
    data: {}
  }).pipe(
    // Recover from a failed send by logging and falling back to an empty id
    // list, so `result` keeps the { ids } shape.
    // NOTE(review): the literal must equal InngestError's _tag — confirm.
    Effect.catchTag('ff-effect/InngestError', (error) =>
      Effect.gen(function* () {
        console.error('Failed to send event:', error.message);
        return { ids: [] };
      })
    )
  );
  
  return result;
}).pipe(Effect.provide(InngestClient.layer));

Step Errors

Step errors are automatically retried by Inngest:
// Fragment: `yield*` is only valid inside a generator, so this statement is
// meant to live inside an Effect.gen block.
const retryableFunction = yield* InngestClient.createFunction(
  { id: 'with-retry' },
  { event: 'retry/test' },
  ({ step }) =>
    Effect.gen(function* () {
      const result = yield* step.run('might-fail', () =>
        Effect.gen(function* () {
          // This will be retried automatically by Inngest
          // Fails on roughly half of the attempts to simulate a flaky step.
          if (Math.random() < 0.5) {
            return yield* Effect.fail(new Error('Random failure'));
          }
          return 'success';
        })
      );
      
      return result;
    })
);

Complete Example

import { createInngest } from 'ff-effect/for/inngest';
import { Inngest, EventSchemas } from 'inngest';
import { Effect, Duration } from 'effect';

// Setup client with typed events
const client = new Inngest({
  id: 'order-processor',
  schemas: new EventSchemas().fromRecord<{
    'order/created': { data: { orderId: string; userId: string; amount: number } };
    'order/shipped': { data: { orderId: string; trackingNumber: string } };
  }>()
});

// Lift the ready-made client into an Effect, as createInngest expects.
const InngestClient = createInngest(Effect.succeed(client));

// Services
// Both services are stubs that succeed immediately with fixed values.
class PaymentService extends Effect.Service<PaymentService>()('PaymentService', {
  effect: Effect.succeed({
    // The amount is accepted but not used by this stub.
    charge: (amount: number) => Effect.succeed({ transactionId: 'txn_123' })
  })
}) {}

class ShippingService extends Effect.Service<ShippingService>()('ShippingService', {
  effect: Effect.succeed({
    // The orderId is accepted but not used by this stub.
    createLabel: (orderId: string) => Effect.succeed({ trackingNumber: 'TRACK_456' })
  })
}) {}

// Function
// Builds the 'process-order' function and yields a fetch handler serving it.
const program = Effect.gen(function* () {
  const processOrder = yield* InngestClient.createFunction(
    { id: 'process-order' },
    { event: 'order/created' },
    ({ event, step }) =>
      Effect.gen(function* () {
        // Resolve the services from the Effect context.
        const payment = yield* PaymentService;
        const shipping = yield* ShippingService;
        
        // Charge payment
        const transaction = yield* step.run('charge-payment', () =>
          payment.charge(event.data.amount)
        );
        
        // Wait before shipping
        yield* step.sleep('processing-time', Duration.minutes(5));
        
        // Create shipping label
        const label = yield* step.run('create-shipping-label', () =>
          shipping.createLabel(event.data.orderId)
        );
        
        // Send shipped event
        yield* step.sendEvent('notify-shipped', {
          name: 'order/shipped',
          data: {
            orderId: event.data.orderId,
            trackingNumber: label.trackingNumber
          }
        });
        
        return { transaction, label };
      })
  );
  
  // No servePath is passed here, unlike the Fetch Handler example.
  const handler = yield* InngestClient.fetchHandler({
    functions: [processOrder]
  });
  
  return handler;
}).pipe(
  Effect.scoped,
  Effect.provide(InngestClient.layer),
  Effect.provide(PaymentService.Default),
  Effect.provide(ShippingService.Default)
);

// Top-level await: this line must run in an ES module (or another context
// that permits await at the top level).
const handler = await Effect.runPromise(program);
Bun.serve({ port: 3000, fetch: handler });

See Also