From 066c03f690e5d63936bbcdd5e2d0ef3fbce0173e Mon Sep 17 00:00:00 2001 From: Ladd Date: Sun, 29 Dec 2024 17:50:20 -0600 Subject: [PATCH] scaffolding to support transactions --- __tests__/lossy.ts | 4 +- examples/app.ts | 6 +++ src/collection.ts | 72 +++++++++++++++++--------------- src/http/api.ts | 45 ++++++++++++++++++++ src/lossless.ts | 100 +++++++++++++++++++++++++++++++++++++++++---- src/lossy.ts | 39 ++++++++++-------- src/types.ts | 1 + 7 files changed, 208 insertions(+), 59 deletions(-) diff --git a/__tests__/lossy.ts b/__tests__/lossy.ts index 903120f..c37ccc6 100644 --- a/__tests__/lossy.ts +++ b/__tests__/lossy.ts @@ -1,6 +1,6 @@ import {Delta, PointerTarget} from "../src/delta"; import {Lossless, LosslessViewMany} from "../src/lossless"; -import {Lossy, firstValueFromLosslessViewOne, valueFromCollapsedDelta} from "../src/lossy"; +import {Lossy, lastValueFromLosslessViewOne, valueFromCollapsedDelta, ResolvedViewMany} from "../src/lossy"; describe('Lossy', () => { describe('se a provided function to resolve entity views', () => { @@ -48,7 +48,7 @@ describe('Lossy', () => { const roles: Role[] = []; for (const [id, ent] of Object.entries(losslessView)) { if (ent.referencedAs.includes("role")) { - const {delta, value: actor} = firstValueFromLosslessViewOne(ent, "actor") ?? {}; + const {delta, value: actor} = lastValueFromLosslessViewOne(ent, "actor") ?? {}; if (!delta) continue; // TODO: panic if (!actor) continue; // TODO: panic const film = valueFromCollapsedDelta(delta, "film"); diff --git a/examples/app.ts b/examples/app.ts index dcef86c..0fd7ce8 100644 --- a/examples/app.ts +++ b/examples/app.ts @@ -19,6 +19,10 @@ type User = { (async () => { const rhizomeNode = new RhizomeNode(); + + // Enable API to read lossless view + rhizomeNode.httpServer.httpApi.serveLossless(); + const users = new Collection("user"); users.rhizomeConnect(rhizomeNode); @@ -66,6 +70,8 @@ type User = { const resolved = users.resolve('taliesin-1'); if (!resolved) throw new Error('unable to resolve entity we just created'); + debug('resolved', resolved); + const resolvedUser = { id: resolved.id, ...resolved.properties diff --git a/src/collection.ts b/src/collection.ts index 10d4153..bcb657a 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -8,8 +8,7 @@ import {randomUUID} from "node:crypto"; import EventEmitter from "node:events"; import {Delta, DeltaID} from "./delta"; import {Entity, EntityProperties} from "./entity"; -import {LosslessViewMany} from "./lossless"; -import {lastValueFromLosslessViewOne, Lossy, ResolvedViewMany, ResolvedViewOne, Resolver} from "./lossy"; +import {Lossy, ResolvedViewOne, Resolver} from "./lossy"; import {RhizomeNode} from "./node"; import {DomainEntityID} from "./types"; const debug = Debug('collection'); @@ -60,9 +59,9 @@ export class Collection { generateDeltas( entityId: DomainEntityID, newProperties: EntityProperties, - resolver?: Resolver, - creator?: string, - host?: string + creator: string, + host: string, + resolver?: Resolver ): Delta[] { const deltas: Delta[] = []; let oldProperties: EntityProperties = {}; @@ -74,9 +73,12 @@ export class Collection { } } + // Generate a transaction ID + const transactionId = `transaction-${randomUUID()}`; + // Generate a delta for each changed property Object.entries(newProperties).forEach(([key, value]) => { - // Disallow property named "id" TODO: Clarify id semantics + // Disallow property named "id" if (key === 'id') return; if (oldProperties[key] !== value && host && creator) { @@ -84,6 +86,10 @@ export class Collection { creator, host, pointers: [{ + localContext: "_transaction", + target: transactionId, + targetContext: "deltas" + }, { localContext: this.name, target: entityId, targetContext: key @@ -95,7 +101,21 @@ export class Collection { } }); - return deltas; + // We can generate a separate delta describing this transaction + const transactionDelta = new Delta({ + creator, + host, + pointers: [{ + localContext: "_transaction", + target: transactionId, + targetContext: "size" + }, { + localContext: "size", + target: deltas.length + }] + }); + + return [transactionDelta, ...deltas]; } onCreate(cb: (entity: Entity) => void) { @@ -114,7 +134,9 @@ export class Collection { getIds(): string[] { if (!this.rhizomeNode) return []; - return Array.from(this.rhizomeNode.lossless.domainEntities.keys()); + const set = this.rhizomeNode.lossless.referencedAs.get(this.name); + if (!set) return []; + return Array.from(set.values()); } // THIS PUT SHOULD CORRESOND TO A PARTICULAR MATERIALIZED VIEW... @@ -126,6 +148,8 @@ export class Collection { properties: EntityProperties, resolver?: Resolver ): Promise { + if (!this.rhizomeNode) throw new Error('collection not connecte to rhizome'); + // For convenience, we allow setting id via properties.id if (!entityId && !!properties.id && typeof properties.id === 'string') { entityId = properties.id; @@ -138,9 +162,9 @@ export class Collection { const deltas = this.generateDeltas( entityId, properties, - resolver, this.rhizomeNode?.config.creator, this.rhizomeNode?.config.peerId, + resolver, ); debug(`put ${entityId} generated deltas:`, JSON.stringify(deltas)); @@ -149,6 +173,8 @@ export class Collection { // ingested into our lossless view before proceeding. // TODO: Hoist this into a more generic transaction mechanism. + // + const allIngested = new Promise((resolve) => { const ingestedIds = new Set(); this.eventStream.on('ingested', (delta: Delta) => { @@ -190,25 +216,7 @@ export class Collection { return res; } - // TODO: default should probably be last write wins - defaultResolver(losslessView: LosslessViewMany): ResolvedViewMany { - const resolved: ResolvedViewMany = {}; - - // debug('default resolver, lossless view', JSON.stringify(losslessView)); - for (const [id, ent] of Object.entries(losslessView)) { - resolved[id] = {id, properties: {}}; - - for (const key of Object.keys(ent.properties)) { - const {value} = lastValueFromLosslessViewOne(ent, key) || {}; - - // debug(`[ ${key} ] = ${value}`); - resolved[id].properties[key] = value; - } - } - return resolved; - } - - resolve(id: string, resolver?: Resolver): ResolvedViewOne | undefined { + resolve(id: string, resolver?: Resolver): T | undefined { // Now with lossy view approach, instead of just returning what we // already have, let's compute our view now. // return this.entities.resolve(id); @@ -216,15 +224,11 @@ export class Collection { if (!this.rhizomeNode) return undefined; - if (!resolver) { - debug('using default resolver'); - resolver = (view) => this.defaultResolver(view); - } - const lossy = new Lossy(this.rhizomeNode.lossless); + // TODO: deltaFilter const res = lossy.resolve(resolver, [id]); debug('lossy view', res); - return res[id]; + return res[id] as T; } } diff --git a/src/http/api.ts b/src/http/api.ts index 4aef2cc..b325a64 100644 --- a/src/http/api.ts +++ b/src/http/api.ts @@ -88,4 +88,49 @@ export class HttpApi { res.json(ent); }); } + + serveLossless() { + // Get all domain entity IDs. TODO: This won't scale + this.router.get('/lossless/ids', (_req: express.Request, res: express.Response) => { + res.json({ + ids: Array.from(this.rhizomeNode.lossless.domainEntities.keys()) + }); + }); + + // Get all transaction IDs. TODO: This won't scale + this.router.get('/transaction/ids', (_req: express.Request, res: express.Response) => { + const set = this.rhizomeNode.lossless.referencedAs.get("_transaction"); + res.json({ + ids: set ? Array.from(set.values()) : [] + }); + }); + + // View a single transaction + this.router.get('/transaction/:id', (req: express.Request, res: express.Response) => { + const {params: {id}} = req; + const v = this.rhizomeNode.lossless.view([id]); + const ent = v[id]; + if (!ent.referencedAs.includes("_transaction")) { + res.status(400).json({error: "Entity is not a transaction", id}); + return; + } + + res.json({ + ...ent, + isComplete: this.rhizomeNode.lossless.transactions.isComplete(id) + }); + }); + + // Get a lossless view of a single domain entity + this.router.get('/lossless/:id', (req: express.Request, res: express.Response) => { + const {params: {id}} = req; + const v = this.rhizomeNode.lossless.view([id]); + const ent = v[id]; + + res.json({ + ...ent, + isComplete: this.rhizomeNode.lossless.transactions.isComplete(id) + }); + }); + } } diff --git a/src/lossless.ts b/src/lossless.ts index ff3b56a..edab20f 100644 --- a/src/lossless.ts +++ b/src/lossless.ts @@ -2,11 +2,11 @@ // We can maintain a record of all the targeted entities, and the deltas that targeted them import Debug from 'debug'; -import {Delta, DeltaFilter} from './delta'; -import {DomainEntityID, PropertyID, PropertyTypes, ViewMany} from "./types"; +import {Delta, DeltaFilter, DeltaID} from './delta'; +import {DomainEntityID, PropertyID, PropertyTypes, TransactionID, ViewMany} from "./types"; const debug = Debug('lossless'); -export type CollapsedPointer = {[key: string]: PropertyTypes}; +export type CollapsedPointer = {[key: PropertyID]: PropertyTypes}; export type CollapsedDelta = Omit & { pointers: CollapsedPointer[]; @@ -61,8 +61,48 @@ class DomainEntity { } } +class Transaction { + size?: number; + receivedDeltaIds = new Set(); +} + +class Transactions { + transactions = new Map(); + + getOrInit(id: TransactionID): Transaction { + let t = this.transactions.get(id); + if (!t) { + t = new Transaction(); + this.transactions.set(id, t); + } + return t; + } + + receivedDelta(id: TransactionID, deltaId: DeltaID) { + const t = this.getOrInit(id); + t.receivedDeltaIds.add(deltaId); + } + + isComplete(id: TransactionID) { + const t = this.getOrInit(id); + return t.size !== undefined && t.receivedDeltaIds.size === t.size; + } + + setSize(id: TransactionID, size: number) { + const t = this.getOrInit(id); + t.size = size; + } + + get ids() { + return Array.from(this.transactions.keys()); + } +} + export class Lossless { domainEntities = new DomainEntityMap(); + transactions = new Transactions(); + referencedAs = new Map>(); + // referencingAs = new Map>(); ingestDelta(delta: Delta) { const targets = delta.pointers @@ -78,15 +118,61 @@ export class Lossless { this.domainEntities.set(target, ent); } - debug('before add, domain entity:', JSON.stringify(ent)); - ent.addDelta(delta); + } - debug('after add, domain entity:', JSON.stringify(ent)); + for (const {target, localContext} of delta.pointers) { + if (typeof target === "string" && this.domainEntities.has(target)) { + if (this.domainEntities.has(target)) { + let referencedAs = this.referencedAs.get(localContext); + if (!referencedAs) { + referencedAs = new Set(); + this.referencedAs.set(localContext, referencedAs); + } + referencedAs.add(target); + } + } + } + + const {target: transactionId} = delta.pointers.find(({ + localContext, + target, + targetContext + }) => + localContext === "_transaction" && + typeof target === "string" && + targetContext === "deltas" + ) || {}; + + if (transactionId) { + // This delta is part of a transaction + this.transactions.receivedDelta(transactionId as string, delta.id); + } else { + const {target: transactionId} = delta.pointers.find(({ + localContext, + target, + targetContext + }) => + localContext === "_transaction" && + typeof target === "string" && + targetContext === "size" + ) || {}; + + if (transactionId) { + // This delta describes a transaction + const {target: size} = delta.pointers.find(({ + localContext, + target + }) => + localContext === "size" && + typeof target === "number" + ) || {}; + + this.transactions.setSize(transactionId as string, size as number); + } } } - //TODO: json logic -- view(deltaFilter?: FilterExpr) { view(entityIds?: DomainEntityID[], deltaFilter?: DeltaFilter): LosslessViewMany { const view: LosslessViewMany = {}; entityIds = entityIds ?? Array.from(this.domainEntities.keys()); diff --git a/src/lossy.ts b/src/lossy.ts index 3bf7d70..96eb7bc 100644 --- a/src/lossy.ts +++ b/src/lossy.ts @@ -45,20 +45,6 @@ export function valueFromCollapsedDelta( } } -// Example function for resolving a value for an entity by taking the first value we find -export function firstValueFromLosslessViewOne( - ent: LosslessViewOne, - key: string -): { - delta: CollapsedDelta, - value: string | number -} | undefined { - debug(`trying to get first value for ${key} from ${JSON.stringify(ent.properties[key])}`); - for (const delta of ent.properties[key] || []) { - const value = valueFromCollapsedDelta(delta, key); - if (value) return {delta, value}; - } -} // Function for resolving a value for an entity by last write wins export function lastValueFromLosslessViewOne( @@ -89,6 +75,24 @@ export function lastValueFromLosslessViewOne( return res; } +function defaultResolver(losslessView: LosslessViewMany): ResolvedViewMany { + const resolved: ResolvedViewMany = {}; + + // debug('default resolver, lossless view', JSON.stringify(losslessView)); + for (const [id, ent] of Object.entries(losslessView)) { + resolved[id] = {id, properties: {}}; + + for (const key of Object.keys(ent.properties)) { + const {value} = lastValueFromLosslessViewOne(ent, key) || {}; + + // debug(`[ ${key} ] = ${value}`); + resolved[id].properties[key] = value; + } + } + return resolved; + }; + + export class Lossy { lossless: Lossless; @@ -100,9 +104,12 @@ export class Lossy { // apply a filter to the deltas composing that lossless view, // and then apply a supplied resolver function which receives // the filtered lossless view as input. - resolve(fn: Resolver, entityIds?: DomainEntityID[], deltaFilter?: DeltaFilter) { + resolve(fn?: Resolver | Resolver, entityIds?: DomainEntityID[], deltaFilter?: DeltaFilter): T { + if (!fn) { + fn = defaultResolver; + } const losslessView = this.lossless.view(entityIds, deltaFilter); - return fn(losslessView); + return fn(losslessView) as T; } } diff --git a/src/types.ts b/src/types.ts index bdee6a8..2616441 100644 --- a/src/types.ts +++ b/src/types.ts @@ -8,6 +8,7 @@ export type PropertyTypes = string | number | undefined; export type DomainEntityID = string; export type PropertyID = string; +export type TransactionID = string; export type Timestamp = number;