RippleDB
RippleDB
Reference

@rippledb/client

Client-side sync orchestration and Store interface

@rippledb/client

Client-side sync orchestration including the Store interface, outbox, and replicator.

Installation

pnpm add @rippledb/client
npm install @rippledb/client
yarn add @rippledb/client

Store Interface

The contract for client-side local storage:

interface Store<S extends RippleSchema, ListQuery = unknown> {
  applyChanges(changes: Change<S>[]): Promise<void>;
  getRow<E extends EntityName<S>>(entity: E, id: string): Promise<S[E] | null>;
  getRows<E extends EntityName<S>>(
    entity: E,
    ids: string[],
  ): Promise<Map<string, S[E]>>;
  listRows(query: ListQuery): Promise<Array<S[EntityName<S>]>>;
  onEvent?(cb: (event: DbEvent<S>) => void): () => void;
}

applyChanges(changes: Change<S>[]): Promise<void>

Apply changes to local storage transactionally.

await store.applyChanges([
  makeUpsert({ ... }),
  makeDelete({ ... }),
]);

getRow<E>(entity: E, id: string): Promise<S[E] | null>

Fetch a single entity by primary key.

const todo = await store.getRow("todos", "todo-1");

getRows<E>(entity: E, ids: string[]): Promise<Map<string, S[E]>>

Bulk read by primary key. Required method for efficient batch loading. Implementations should optimize this when possible (e.g., single SQL query), but can fall back to parallel getRow calls if needed.

const todos = await store.getRows("todos", ["todo-1", "todo-2"]);
// Returns Map with found entities (missing/deleted ones are omitted)

listRows(query: ListQuery): Promise<Array<...>>

Run an arbitrary query. Shape is store-specific.

// Example with SQL store
const todos = await store.listRows("SELECT * FROM todos WHERE done = 0");

onEvent?(cb: (event: DbEvent<S>) => void): () => void

Subscribe to write events. Used for UI invalidation.

const unsubscribe = store.onEvent?.((event) => {
  console.log(event.entity, event.kind, event.id);
  // Invalidate UI caches
});

DbEvent

type DbEventKind = "insert" | "update" | "delete";

type DbEvent<S extends RippleSchema, E extends EntityName<S>> = {
  entity: E;
  kind: DbEventKind;
  id?: string;
};

Remote Interface

The contract for server communication:

type Remote<S extends RippleSchema> = {
  pull(req: {
    stream: string;
    cursor: string | null;
    limit?: number;
  }): Promise<{
    changes: Change<S>[];
    nextCursor: string | null;
  }>;

  append(req: {
    stream: string;
    idempotencyKey?: string;
    changes: Change<S>[];
  }): Promise<{ accepted: number }>;
};

Use @rippledb/remote-http for the HTTP implementation.

Outbox

Buffer for local changes waiting to be pushed:

interface Outbox<S extends RippleSchema> {
  push(entry: OutboxEntry<S>): void;
  drain(stream: string): OutboxEntry<S>[];
  size(stream?: string): number;
}

type OutboxEntry<S extends RippleSchema> = {
  stream: string;
  change: Change<S>;
};

InMemoryOutbox

Built-in in-memory implementation:

import { InMemoryOutbox } from '@rippledb/client';

const outbox = new InMemoryOutbox<MySchema>();

// Add a local change
outbox.push({
  stream: 'user-123',
  change: makeUpsert({ ... }),
});

// Get pending changes for a stream
const pending = outbox.drain('user-123');

syncOnce

Single sync cycle: pull → apply → push.

import { syncOnce } from "@rippledb/client";

const result = await syncOnce({
  stream: "user-123",
  store,
  remote,
  cursor: lastCursor,
  outbox,
  limit: 100,
  idempotencyKey: "sync-abc",
});

console.log(result.pulled); // Changes pulled from server
console.log(result.pushed); // Changes pushed to server
console.log(result.nextCursor); // Save for next sync

Options:

type SyncOnceOptions<S extends RippleSchema> = {
  stream: string;
  store: Store<S>;
  remote: Remote<S>;
  cursor: string | null;
  outbox: Outbox<S>;
  limit?: number;
  idempotencyKey?: string;
};

Result:

type SyncOnceResult = {
  nextCursor: string | null;
  pulled: number;
  pushed: number;
};

Replicator

Convenience wrapper managing cursor and outbox:

import { createReplicator } from '@rippledb/client';

const replicator = createReplicator({
  stream: 'user-123',
  store,
  remote,
  cursor: null,
});

// Local write
await replicator.pushLocal(makeUpsert({ ... }));

// Sync with server
const result = await replicator.sync();

// Get current cursor
const cursor = replicator.getCursor();

Interface:

type Replicator<S extends RippleSchema> = {
  pushLocal(change: Change<S>): Promise<void>;
  sync(): Promise<SyncOnceResult>;
  getCursor(): string | null;
};

Options:

type ReplicatorOptions<S extends RippleSchema> = {
  stream: string;
  store: Store<S>;
  remote: Remote<S>;
  outbox?: Outbox<S>; // Default: InMemoryOutbox
  cursor?: string | null;
  limit?: number;
  idempotencyKey?: string;
};

Sync Flow

The canonical sync order is Pull → Apply → Push:

ServerClientpullapply changesOutboxServerpush

This ensures:

  1. Remote truth is incorporated first
  2. Local changes are reapplied
  3. Conflicts resolve deterministically

On this page