RippleDB
RippleDB
Reference

@rippledb/server

Db interface and server persistence contracts

@rippledb/server

Defines the Db interface for server-side persistence. Implementations live in @rippledb/db-* packages.

Installation

pnpm add @rippledb/server
npm install @rippledb/server
yarn add @rippledb/server

Db Interface

The core interface that all database adapters implement:

interface Db<S extends RippleSchema = RippleSchema> {
  append(req: AppendRequest<S>): Promise<AppendResult>;
  pull(req: PullRequest): Promise<PullResponse<S>>;
}

append(req: AppendRequest<S>): Promise<AppendResult>

Append changes to the log for a stream.

AppendRequest:

type AppendRequest<S extends RippleSchema> = {
  stream: string;
  idempotencyKey?: string;
  changes: Change<S>[];
};
FieldTypeDescription
streamstringPartition key (e.g., user ID, project ID)
idempotencyKeystring?Optional key to prevent duplicate appends
changesChange<S>[]Array of changes to append

AppendResult:

type AppendResult = {
  accepted: number;
  hlc?: Hlc;
};
FieldTypeDescription
acceptednumberNumber of changes accepted (0 if idempotency key was seen)
hlcHlc?Optional server-assigned high watermark

Example:

const result = await db.append({
  stream: "user-123",
  idempotencyKey: "req-abc",
  changes: [
    makeUpsert({
      stream: "user-123",
      entity: "todos",
      entityId: "todo-1",
      patch: { title: "Buy milk" },
      hlc: hlc(),
    }),
  ],
});

console.log(result.accepted); // 1

pull(req: PullRequest): Promise<PullResponse<S>>

Pull changes from the log since a cursor.

PullRequest:

type PullRequest = {
  stream: string;
  cursor: Cursor | null;
  limit?: number;
};
FieldTypeDescription
streamstringPartition key
cursorstring | nullCursor from last pull, or null for beginning
limitnumber?Max changes to return (default: 500)

PullResponse:

type PullResponse<S extends RippleSchema> = {
  changes: Change<S>[];
  nextCursor: Cursor | null;
};
FieldTypeDescription
changesChange<S>[]Changes since cursor
nextCursorstring | nullCursor for next pull

Example:

// First pull (from beginning)
const { changes, nextCursor } = await db.pull({
  stream: "user-123",
  cursor: null,
  limit: 100,
});

console.log(changes.length); // Number of changes
console.log(nextCursor); // e.g., "42"

// Subsequent pull (incremental)
const next = await db.pull({
  stream: "user-123",
  cursor: nextCursor,
  limit: 100,
});

Cursor

Cursors are opaque strings representing a position in the change log:

type Cursor = string;
  • Start with null to pull from the beginning
  • Store the returned nextCursor for incremental pulls
  • Cursors are stream-specific

Implementing a Custom Adapter

import type {
  Db,
  AppendRequest,
  AppendResult,
  PullRequest,
  PullResponse,
} from "@rippledb/server";
import type { RippleSchema } from "@rippledb/core";

class MyDb<S extends RippleSchema> implements Db<S> {
  async append(req: AppendRequest<S>): Promise<AppendResult> {
    // 1. Check idempotency key if provided
    // 2. Insert changes into your storage
    // 3. Optionally materialize to domain tables
    // 4. Return accepted count
    return { accepted: req.changes.length };
  }

  async pull(req: PullRequest): Promise<PullResponse<S>> {
    // 1. Decode cursor to position
    // 2. Query changes after position
    // 3. Encode next position as cursor
    return { changes: [], nextCursor: req.cursor };
  }
}

Built-in Adapters

PackageDatabaseNotes
@rippledb/db-sqliteSQLiteUses better-sqlite3
@rippledb/db-tursoTurso/libSQLEdge-ready
@rippledb/db-drizzleAny Drizzle DBDatabase-agnostic
@rippledb/db-memoryIn-memoryTesting only

On this page