@rippledb/client
Client-side sync orchestration and Store interface
Client-side sync orchestration including the Store interface, outbox, and replicator.
Installation
pnpm add @rippledb/client
npm install @rippledb/client
yarn add @rippledb/client
Store Interface
The contract for client-side local storage:
interface Store<S extends RippleSchema, ListQuery = unknown> {
  applyChanges(changes: Change<S>[]): Promise<void>;
  getRow<E extends EntityName<S>>(entity: E, id: string): Promise<S[E] | null>;
  getRows<E extends EntityName<S>>(
    entity: E,
    ids: string[],
  ): Promise<Map<string, S[E]>>;
  listRows(query: ListQuery): Promise<Array<S[EntityName<S>]>>;
  onEvent?(cb: (event: DbEvent<S>) => void): () => void;
}
applyChanges(changes: Change<S>[]): Promise<void>
Apply changes to local storage transactionally.
await store.applyChanges([
  makeUpsert({ ... }),
  makeDelete({ ... }),
]);
getRow<E>(entity: E, id: string): Promise<S[E] | null>
Fetch a single entity by primary key.
const todo = await store.getRow("todos", "todo-1");
getRows<E>(entity: E, ids: string[]): Promise<Map<string, S[E]>>
Bulk read by primary key. This method is required so that callers can batch-load rows efficiently. Implementations should optimize it where possible (e.g., a single SQL query), but can fall back to parallel getRow calls if needed.
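The fallback mentioned above can be sketched roughly as follows. This is a minimal illustration, not the library's implementation: `RowReader` and `getRowsViaGetRow` are hypothetical names, and the entity is typed as a plain string instead of `EntityName<S>` to keep the sketch self-contained.

```typescript
// Hypothetical minimal shape: only the single-row read from the Store contract.
type RowReader<Row> = {
  getRow(entity: string, id: string): Promise<Row | null>;
};

// Fallback bulk read: issue one getRow call per id in parallel,
// then keep only the ids that resolved to a row.
async function getRowsViaGetRow<Row>(
  reader: RowReader<Row>,
  entity: string,
  ids: string[],
): Promise<Map<string, Row>> {
  const entries = await Promise.all(
    ids.map(async (id) => [id, await reader.getRow(entity, id)] as const),
  );
  const found = new Map<string, Row>();
  for (const [id, row] of entries) {
    if (row !== null) found.set(id, row); // missing or deleted rows are omitted
  }
  return found;
}
```

A store backed by SQL would more likely replace this with a single `WHERE id IN (...)` query; the parallel version trades efficiency for simplicity.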
const todos = await store.getRows("todos", ["todo-1", "todo-2"]);
// Returns a Map of found entities (missing/deleted ones are omitted)
listRows(query: ListQuery): Promise<Array<...>>
Run an arbitrary query. Shape is store-specific.
// Example with SQL store
const todos = await store.listRows("SELECT * FROM todos WHERE done = 0");
onEvent?(cb: (event: DbEvent<S>) => void): () => void
Subscribe to write events. Used for UI invalidation.
const unsubscribe = store.onEvent?.((event) => {
  console.log(event.entity, event.kind, event.id);
  // Invalidate UI caches
});
DbEvent
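type DbEventKind = "insert" | "update" | "delete";
// E defaults to every entity name so that DbEvent<S>, as used by Store.onEvent, type-checks.
type DbEvent<S extends RippleSchema, E extends EntityName<S> = EntityName<S>> = {
  entity: E;
  kind: DbEventKind;
  id?: string;
};
Remote Interface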
The contract for server communication:
type Remote<S extends RippleSchema> = {
  pull(req: {
    stream: string;
    cursor: string | null;
    limit?: number;
  }): Promise<{
    changes: Change<S>[];
    nextCursor: string | null;
  }>;
  append(req: {
    stream: string;
    idempotencyKey?: string;
    changes: Change<S>[];
  }): Promise<{ accepted: number }>;
};
Use @rippledb/remote-http for the HTTP implementation.
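For tests, the Remote contract can also be satisfied without a network. The sketch below is one possible in-memory stand-in, not part of the package: `InMemoryRemote` is a hypothetical name, `Change` is reduced to a placeholder shape, the cursor is assumed to be a stringified log offset, and a repeated `idempotencyKey` is assumed to mean "already applied". A real server may define both differently.

```typescript
// Hypothetical stand-ins for the real types, so the sketch is self-contained.
type Change = { entity: string; id: string };
type PullRequest = { stream: string; cursor: string | null; limit?: number };
type PullResponse = { changes: Change[]; nextCursor: string | null };
type AppendRequest = { stream: string; idempotencyKey?: string; changes: Change[] };

class InMemoryRemote {
  private logs = new Map<string, Change[]>(); // one append-only log per stream
  private seenKeys = new Set<string>();       // idempotency keys already handled

  async pull(req: PullRequest): Promise<PullResponse> {
    const log = this.logs.get(req.stream) ?? [];
    // Assumption: the cursor is the stringified offset of the next unread change.
    const start = req.cursor === null ? 0 : Number(req.cursor);
    const end =
      req.limit === undefined ? log.length : Math.min(log.length, start + req.limit);
    return {
      changes: log.slice(start, end),
      nextCursor: end === 0 ? null : String(end),
    };
  }

  async append(req: AppendRequest): Promise<{ accepted: number }> {
    // Assumption: a repeated idempotencyKey is treated as an already-applied retry.
    if (req.idempotencyKey !== undefined) {
      const key = `${req.stream}:${req.idempotencyKey}`;
      if (this.seenKeys.has(key)) return { accepted: 0 };
      this.seenKeys.add(key);
    }
    const log = this.logs.get(req.stream) ?? [];
    log.push(...req.changes);
    this.logs.set(req.stream, log);
    return { accepted: req.changes.length };
  }
}
```

Passing the returned `nextCursor` back into the next `pull` call pages through the stream; an empty stream yields `nextCursor: null` in this sketch.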
Outbox
Buffer for local changes waiting to be pushed:
interface Outbox<S extends RippleSchema> {
  push(entry: OutboxEntry<S>): void;
  drain(stream: string): OutboxEntry<S>[];
  size(stream?: string): number;
}
type OutboxEntry<S extends RippleSchema> = {
  stream: string;
  change: Change<S>;
};
InMemoryOutbox
Built-in in-memory implementation:
import { InMemoryOutbox } from '@rippledb/client';
const outbox = new InMemoryOutbox<MySchema>();
// Add a local change
outbox.push({
  stream: 'user-123',
  change: makeUpsert({ ... }),
});
// Get pending changes for a stream
const pending = outbox.drain('user-123');
syncOnce
Single sync cycle: pull → apply → push.
import { syncOnce } from "@rippledb/client";
const result = await syncOnce({
  stream: "user-123",
  store,
  remote,
  cursor: lastCursor,
  outbox,
  limit: 100,
  idempotencyKey: "sync-abc",
});
console.log(result.pulled); // Changes pulled from server
console.log(result.pushed); // Changes pushed to server
console.log(result.nextCursor); // Save for next sync
Options:
type SyncOnceOptions<S extends RippleSchema> = {
  stream: string;
  store: Store<S>;
  remote: Remote<S>;
  cursor: string | null;
  outbox: Outbox<S>;
  limit?: number;
  idempotencyKey?: string;
};
Result:
type SyncOnceResult = {
  nextCursor: string | null;
  pulled: number;
  pushed: number;
};
Replicator
Convenience wrapper managing cursor and outbox:
import { createReplicator } from '@rippledb/client';
const replicator = createReplicator({
  stream: 'user-123',
  store,
  remote,
  cursor: null,
});
// Local write
await replicator.pushLocal(makeUpsert({ ... }));
// Sync with server
const result = await replicator.sync();
// Get current cursor
const cursor = replicator.getCursor();
Interface:
type Replicator<S extends RippleSchema> = {
  pushLocal(change: Change<S>): Promise<void>;
  sync(): Promise<SyncOnceResult>;
  getCursor(): string | null;
};
Options:
type ReplicatorOptions<S extends RippleSchema> = {
  stream: string;
  store: Store<S>;
  remote: Remote<S>;
  outbox?: Outbox<S>; // Default: InMemoryOutbox
  cursor?: string | null;
  limit?: number;
  idempotencyKey?: string;
};
Sync Flow
The canonical sync order is Pull → Apply → Push.
This ensures:
- Remote truth is incorporated first
- Local changes are reapplied
- Conflicts resolve deterministically
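The ordering can be illustrated with a small self-contained sketch. It is not the source of syncOnce: `syncCycle` and the `Mini*` types are hypothetical, `Change` is a placeholder shape, and details such as idempotency keys and error handling are left out.

```typescript
// Hypothetical stand-ins; the real Change, Store, Remote and Outbox types are richer.
type Change = { entity: string; id: string };
type MiniStore = { applyChanges(changes: Change[]): Promise<void> };
type MiniRemote = {
  pull(req: { stream: string; cursor: string | null; limit?: number }): Promise<{
    changes: Change[];
    nextCursor: string | null;
  }>;
  append(req: { stream: string; changes: Change[] }): Promise<{ accepted: number }>;
};
type MiniOutbox = { drain(stream: string): { stream: string; change: Change }[] };

async function syncCycle(opts: {
  stream: string;
  store: MiniStore;
  remote: MiniRemote;
  outbox: MiniOutbox;
  cursor: string | null;
  limit?: number;
}): Promise<{ nextCursor: string | null; pulled: number; pushed: number }> {
  // 1. Pull: fetch remote changes recorded after the last known cursor.
  const { changes, nextCursor } = await opts.remote.pull({
    stream: opts.stream,
    cursor: opts.cursor,
    limit: opts.limit,
  });

  // 2. Apply: write the remote changes into local storage.
  if (changes.length > 0) await opts.store.applyChanges(changes);

  // 3. Push: send buffered local changes to the server.
  const pending = opts.outbox.drain(opts.stream).map((entry) => entry.change);
  let pushed = 0;
  if (pending.length > 0) {
    const res = await opts.remote.append({ stream: opts.stream, changes: pending });
    pushed = res.accepted;
  }

  return { nextCursor, pulled: changes.length, pushed };
}
```

Note that this sketch drains the outbox before the append succeeds, so a failed append would lose the drained entries; production code would need to re-queue or retry them.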
Related Packages
- @rippledb/client-query — Recommended high-level client API
- @rippledb/remote-http — HTTP transport for client-server sync
- @rippledb/core — Change types and HLC utilities
- @rippledb/server — Server-side persistence counterparts