Overview

The Drizzle provider is a production-ready implementation of the ConversationStore interface using PostgreSQL and Drizzle ORM.

Installation

Install the required dependencies:
bun add ff-ai drizzle-orm postgres

Database Schema

The provider uses two tables in the ff_ai schema:

threads Table

Stores conversation threads:
export const threads = schema.table(
  'threads',
  {
    id: bigserial({ mode: 'number' }).primaryKey(),        // Internal PK
    publicId: text().notNull(),                            // Your threadId
    resourceId: text().notNull(),                          // Your resourceId
    createdAt: timestamp().defaultNow().notNull(),
    updatedAt: timestamp().defaultNow().notNull(),
  },
  (table) => ({
    uniqueResourcePublicId: unique().on(table.resourceId, table.publicId),
  }),
);
  • id (bigserial): Internal primary key (auto-incrementing)
  • publicId (text): Your application’s thread identifier (from ThreadIdentifier.threadId)
  • resourceId (text): Your application’s resource identifier (from ThreadIdentifier.resourceId)
  • createdAt (timestamp): When the thread was first created
  • updatedAt (timestamp): When the thread was last updated (any message activity)
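The mapping from your application’s identifiers to these columns can be sketched as follows. The ThreadIdentifier shape here is inferred from the fields above; check ff-ai’s own types for the authoritative definition.

```typescript
// Assumed shape of ThreadIdentifier, inferred from the columns above;
// ff-ai's actual type definition may differ.
interface ThreadIdentifier {
  resourceId: string; // stored in threads.resource_id
  threadId: string;   // stored in threads.public_id
}

// Each (resourceId, threadId) pair maps to exactly one thread row,
// enforced by the UNIQUE(resource_id, public_id) constraint.
const identifier: ThreadIdentifier = {
  resourceId: 'user-123',
  threadId: 'conversation-456',
};
```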

messages Table

Stores individual messages:
export const messages = schema.table('messages', {
  id: bigserial({ mode: 'number' }).primaryKey(),         // Internal PK
  uuid: uuid().notNull().unique(),                        // Message ID
  threadId: bigint({ mode: 'number' })
    .references(() => threads.id, { onDelete: 'cascade' })
    .notNull(),
  aiSdkV5: jsonb().$type<Ai.ModelMessage>(),             // Message content
  createdAt: timestamp().defaultNow().notNull(),
});
  • id (bigserial): Internal primary key for efficient querying
  • uuid (uuid): The message’s public UUID (from ConversationMessage.id)
  • threadId (bigint): Foreign key to the threads table (cascading delete)
  • aiSdkV5 (jsonb): The complete AI SDK ModelMessage object (role, content, etc.)
  • createdAt (timestamp): When the message was created
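For illustration, a single stored row might look like this (all values are invented; the point is that aiSdkV5 carries the full ModelMessage, while the other columns hold bookkeeping data):

```typescript
// Hypothetical ff_ai.messages row; values are made up for illustration.
const row = {
  id: 1,                                          // internal bigserial PK
  uuid: '3f0c9c1e-0000-4000-8000-000000000000',   // ConversationMessage.id
  threadId: 42,                                   // FK to ff_ai.threads.id
  aiSdkV5: { role: 'user' as const, content: 'Hello!' }, // full ModelMessage
  createdAt: new Date('2024-01-01T00:00:00Z'),
};

console.log(row.aiSdkV5.role); // → user
```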

Setup

1. Create the Database Schema

Generate and run migrations:
# Generate migration
bun drizzle-kit generate

# Run migration
bun drizzle-kit migrate
Or create the schema manually:
CREATE SCHEMA IF NOT EXISTS ff_ai;

CREATE TABLE ff_ai.threads (
  id BIGSERIAL PRIMARY KEY,
  public_id TEXT NOT NULL,
  resource_id TEXT NOT NULL,
  created_at TIMESTAMP DEFAULT NOW() NOT NULL,
  updated_at TIMESTAMP DEFAULT NOW() NOT NULL,
  UNIQUE(resource_id, public_id)
);

CREATE TABLE ff_ai.messages (
  id BIGSERIAL PRIMARY KEY,
  uuid UUID NOT NULL UNIQUE,
  thread_id BIGINT NOT NULL REFERENCES ff_ai.threads(id) ON DELETE CASCADE,
  ai_sdk_v5 JSONB,
  created_at TIMESTAMP DEFAULT NOW() NOT NULL
);

CREATE INDEX idx_messages_thread_id ON ff_ai.messages(thread_id);
CREATE INDEX idx_messages_created_at ON ff_ai.messages(created_at);

2. Configure the Provider

import { createDrizzleStoreLayer } from 'ff-ai/providers/drizzle';
import postgres from 'postgres';

const sql = postgres(process.env.DATABASE_URL!);

const storeLayer = createDrizzleStoreLayer(sql, {
  store: {
    casing: 'snake_case'  // Match your database naming convention
  }
});

3. Use in Your Application

import { createTurnHandler } from 'ff-ai';
import { Effect } from 'effect';

const program = Effect.gen(function* () {
  const handler = yield* createTurnHandler({
    identifier: {
      resourceId: 'user-123',
      threadId: 'conversation-456'
    }
  });

  // Use handler...
}).pipe(
  Effect.provide(storeLayer),
  Effect.runPromise
);

API Reference

createDrizzleStoreLayer

Create a Layer that provides the ConversationStore service.
function createDrizzleStoreLayer(
  conn: postgres.Sql,
  opts?: {
    store?: {
      casing?: 'snake_case' | 'camelCase'
    }
  }
): Layer<ConversationStore>
  • conn (postgres.Sql, required): A postgres.js connection instance
  • opts.store.casing ('snake_case' | 'camelCase', default: undefined): Database column naming convention. Set to 'snake_case' if your database uses snake_case naming.
  • Returns (Layer<ConversationStore>): An Effect Layer that provides the ConversationStore service
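To make the casing option concrete, here is a minimal sketch of the property-name-to-column-name mapping that 'snake_case' implies. Drizzle performs this translation internally; this standalone function only illustrates the convention for the column names used on this page.

```typescript
// Illustrative only: the camelCase → snake_case mapping implied by
// casing: 'snake_case'. Drizzle's internal algorithm handles this for you.
function toSnakeCase(name: string): string {
  return name.replace(/[A-Z]/g, (ch) => `_${ch.toLowerCase()}`);
}

console.log(toSnakeCase('publicId'));  // → public_id
console.log(toSnakeCase('createdAt')); // → created_at
console.log(toSnakeCase('aiSdkV5'));   // → ai_sdk_v5
```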

Implementation Details

Window Size Query

The provider implements the window size feature efficiently:
  1. Finds the thread by resourceId and publicId
  2. Queries for the N most recent user messages
  3. Identifies the oldest user message in the window
  4. Returns all messages (user, assistant, tool) from that point forward
This ensures you get complete conversation context including all assistant responses and tool interactions.
-- Step 1: Find recent user messages (window)
SELECT id, created_at
FROM ff_ai.messages
WHERE thread_id = $1
  AND ai_sdk_v5->>'role' = 'user'
ORDER BY created_at DESC, id DESC
LIMIT $windowSize;

-- Step 2: Get all messages from oldest in window
SELECT uuid, ai_sdk_v5, created_at
FROM ff_ai.messages
WHERE thread_id = $1
  AND id >= $oldestId
ORDER BY created_at ASC, id ASC;
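The same two-step windowing logic can be sketched in TypeScript over an in-memory array. This is a simplified model of the queries above, not the provider’s actual code; Msg is a stand-in type, not part of ff-ai.

```typescript
type Msg = { id: number; role: 'user' | 'assistant' | 'tool'; content: string };

// Return every message from the oldest of the N most recent user messages
// onward, mirroring the two-step SQL window query.
// `all` is assumed to be ordered oldest → newest by (createdAt, id).
function windowMessages(all: Msg[], windowSize: number): Msg[] {
  // Step 1: find the N most recent user messages.
  const userMsgs = all.filter((m) => m.role === 'user');
  const window = userMsgs.slice(-windowSize);
  if (window.length === 0) return [];

  // Step 2: return all messages from the oldest user message in the window,
  // so assistant replies and tool interactions are included.
  const oldestId = window[0].id;
  return all.filter((m) => m.id >= oldestId);
}
```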

Transactions

The saveMessages method uses a database transaction to ensure atomicity:
// Pseudocode of the implementation
transaction(async (tx) => {
  // 1. Insert or update the thread (upsert on the unique key)
  const [thread] = await tx
    .insert(threads)
    .values({ publicId, resourceId, updatedAt: new Date() })
    .onConflictDoUpdate({
      target: [threads.resourceId, threads.publicId],
      set: { updatedAt: new Date() }
    })
    .returning();

  // 2. Insert the new messages (newMessages: the incoming ConversationMessages)
  if (newMessages.length > 0) {
    await tx.insert(messages).values(
      newMessages.map(msg => ({
        uuid: msg.id,
        threadId: thread.id,
        aiSdkV5: msg,  // the full ModelMessage payload
        createdAt: msg.createdAt
      }))
    );
  }
});
This guarantees:
  • Thread is created/updated atomically with messages
  • Either all messages save or none do (no partial writes)
  • Thread updatedAt always reflects latest activity

Cascade Deletes

Messages are automatically deleted when their thread is deleted:
threadId: bigint({ mode: 'number' })
  .references(() => threads.id, { onDelete: 'cascade' })
This simplifies conversation cleanup:
-- Delete a conversation and all its messages
DELETE FROM ff_ai.threads
WHERE resource_id = 'user-123'
  AND public_id = 'thread-456';
-- Messages are automatically deleted

Performance Considerations

The schema includes essential indexes:
-- On messages.thread_id for fast thread lookups
CREATE INDEX idx_messages_thread_id ON ff_ai.messages(thread_id);

-- On messages.created_at for window queries
CREATE INDEX idx_messages_created_at ON ff_ai.messages(created_at);
Consider adding:
-- For user-specific queries
CREATE INDEX idx_threads_resource_id ON ff_ai.threads(resource_id);

-- For queries filtering on message role (expression index on the text value,
-- matching the ai_sdk_v5->>'role' = 'user' filter in the window query)
CREATE INDEX idx_messages_role ON ff_ai.messages ((ai_sdk_v5->>'role'));
Use connection pooling for better performance:
const sql = postgres(process.env.DATABASE_URL!, {
  max: 10,  // Maximum pool size
  idle_timeout: 20,
  connect_timeout: 10
});
The aiSdkV5 column stores the complete message as JSONB:
  • Pros: Flexible schema, easy queries, no migration needed
  • Cons: Slightly slower than relational columns
For high-volume applications, consider:
  • Extracting frequently-queried fields to columns
  • Using JSONB operators for efficient queries
  • Adding GIN indexes for JSONB queries
Larger window sizes return more rows per query:
  • Window size 10: ~50-100 messages typically returned
  • Window size 50: ~250-500 messages typically returned
Monitor query performance and adjust window sizes accordingly.

Troubleshooting

Ensure the ff_ai schema exists:
CREATE SCHEMA IF NOT EXISTS ff_ai;
Check your connection string and network:
const sql = postgres(process.env.DATABASE_URL!, {
  onnotice: () => {},  // Suppress notices
  debug: true,         // Enable debug logging
});
If you see column name errors, set the casing option:
createDrizzleStoreLayer(sql, {
  store: { casing: 'snake_case' }
});
Ensure threads are created before messages:
// Don't insert messages manually
// Always use ConversationStore.saveMessages
yield* store.saveMessages({
  resourceId: 'user-123',
  threadId: 'thread-456',
  messages: [...]
});

Migration from Other Stores

If you’re migrating from another storage system:
import { ConversationMessage, ConversationStore } from 'ff-ai';
import { Effect } from 'effect';

const migrate = Effect.gen(function* () {
  const store = yield* ConversationStore;

  // Fetch from old system (assumed: all messages belong to one conversation)
  const oldMessages = yield* fetchOldMessages();

  // Convert to ConversationMessage format
  const messages = oldMessages.map((msg) =>
    ConversationMessage.fromModelMessage({
      role: msg.role,
      content: msg.content
    })
  );

  // Save to new system under the conversation's identifiers
  const [first] = oldMessages;
  yield* store.saveMessages({
    resourceId: first.userId,
    threadId: first.conversationId,
    messages
  });
});