diff --git a/examples/app.ts b/examples/app.ts index 0fd7ce8..e6fcbe0 100644 --- a/examples/app.ts +++ b/examples/app.ts @@ -1,7 +1,7 @@ import Debug from 'debug'; -import {RhizomeNode} from "../src/node"; -import {Entity} from "../src/entity"; import {Collection} from "../src/collection"; +import {Entity} from "../src/entity"; +import {RhizomeNode} from "../src/node"; const debug = Debug('example-app'); // As an app we want to be able to write and read data. @@ -36,11 +36,17 @@ type User = { await rhizomeNode.start(); - // Let's use the rhizomic database for some more things. + // TODO: Use the rhizomic database for some more things. // Like what? // - Logging // - Chat - // + + // TODO: Allow configuration regarding read/write concern i.e. + // if we perform a read immediately do we see the value we wrote? + // Intuition says yes, we want that-- but how do we expose the propagation status? + + // Insert a "user" record + const taliesinData: User = { id: 'taliesin-1', name: 'Taliesin', @@ -48,10 +54,13 @@ type User = { age: Math.floor(Math.random() * 1000) }; - const taliesinPutResult = await users.put(undefined, taliesinData); - { - const result = JSON.stringify(taliesinPutResult); + const taliesinPutResult = await users.put(undefined, taliesinData); + const resolvedUser = { + id: taliesinPutResult.id, + ...taliesinPutResult.properties + } as User; + const result = JSON.stringify(resolvedUser); const expected = JSON.stringify(taliesinData); if (result === expected) { @@ -63,41 +72,27 @@ type User = { } } - // TODO: Allow configuration regarding read/write concern i.e. - // if we perform a read immediately do we see the value we wrote? - // Intuition says yes, we want that-- but how do we expose the propagation status? + // Read back what we wrote - const resolved = users.resolve('taliesin-1'); - if (!resolved) throw new Error('unable to resolve entity we just created'); + { + const resolved = users.resolve('taliesin-1'); + if (!resolved) throw new Error('unable to resolve entity we just created'); - debug('resolved', resolved); + const resolvedUser = { + id: resolved.id, + ...resolved.properties + } as User; - const resolvedUser = { - id: resolved.id, - ...resolved.properties - } as User; + const result = JSON.stringify(resolvedUser); + const expected = JSON.stringify(taliesinData); - /* - function sortKeys (o: {[key: string]: unknown}): {[key: string]: unknown} { - const r: {[key: string]: unknown} = {}; - r.id = o.id; - Object.keys(o).sort().forEach((key) => { - if (key === "id") return; - r[key] = o[key]; - }) - return r; - } - */ - - const result = JSON.stringify(resolvedUser); - const expected = JSON.stringify(taliesinData); - - if (result === expected) { - debug('Get result matches expected: ' + expected); - } else { - debug(`Get result does not match expected.` + - `\n\nExpected \n${expected}` + - `\nReceived\n${result}`); + if (result === expected) { + debug('Get result matches expected: ' + expected); + } else { + debug(`Get result does not match expected.` + + `\n\nExpected \n${expected}` + + `\nReceived\n${result}`); + } } diff --git a/src/collection.ts b/src/collection.ts index bcb657a..6beb83d 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -6,7 +6,7 @@ import Debug from 'debug'; import {randomUUID} from "node:crypto"; import EventEmitter from "node:events"; -import {Delta, DeltaID} from "./delta"; +import {Delta, DeltaFilter} from "./delta"; import {Entity, EntityProperties} from "./entity"; import {Lossy, ResolvedViewOne, Resolver} from "./lossy"; import {RhizomeNode} from "./node"; @@ -17,6 +17,7 @@ export class Collection { rhizomeNode?: RhizomeNode; name: string; eventStream = new EventEmitter(); + lossy?: Lossy; constructor(name: string) { this.name = name; @@ -31,10 +32,15 @@ export class Collection { rhizomeConnect(rhizomeNode: RhizomeNode) { this.rhizomeNode = rhizomeNode; - rhizomeNode.deltaStream.subscribeDeltas((delta: Delta) => { - // TODO: Make sure this is the kind of delta we're looking for - debug(`collection ${this.name} received delta ${JSON.stringify(delta)}`); - this.ingestDelta(delta); + this.lossy = new Lossy(this.rhizomeNode.lossless); + + // Listen for completed transactions, and emit updates to event stream + this.rhizomeNode.lossless.eventStream.on("updated", (id) => { + // TODO: Filter so we only get members of our collection + + // TODO: Reslover / Delta Filter? + const res = this.resolve(id); + this.eventStream.emit("update", res); }); rhizomeNode.httpServer.httpApi.serveCollection(this); @@ -42,15 +48,6 @@ export class Collection { debug(`connected ${this.name} to rhizome`); } - ingestDelta(delta: Delta) { - if (!this.rhizomeNode) return; - - const updated = this.rhizomeNode.lossless.ingestDelta(delta); - - this.eventStream.emit('ingested', delta); - this.eventStream.emit('updated', updated); - } - // Applies the javascript rules for updating object values, // e.g. set to `undefined` to delete a property. // This function is here instead of Entity so that it can: @@ -62,7 +59,10 @@ export class Collection { creator: string, host: string, resolver?: Resolver - ): Delta[] { + ): { + transactionDelta: Delta | undefined, + deltas: Delta[] + } { const deltas: Delta[] = []; let oldProperties: EntityProperties = {}; @@ -86,10 +86,6 @@ export class Collection { creator, host, pointers: [{ - localContext: "_transaction", - target: transactionId, - targetContext: "deltas" - }, { localContext: this.name, target: entityId, targetContext: key @@ -101,21 +97,34 @@ export class Collection { } }); - // We can generate a separate delta describing this transaction - const transactionDelta = new Delta({ - creator, - host, - pointers: [{ - localContext: "_transaction", - target: transactionId, - targetContext: "size" - }, { - localContext: "size", - target: deltas.length - }] - }); + let transactionDelta: Delta | undefined; - return [transactionDelta, ...deltas]; + if (deltas.length > 1) { + // We can generate a separate delta describing this transaction + transactionDelta = new Delta({ + creator, + host, + pointers: [{ + localContext: "_transaction", + target: transactionId, + targetContext: "size" + }, { + localContext: "size", + target: deltas.length + }] + }); + + // Also need to annotate the deltas with the transactionId + for (const delta of deltas) { + delta.pointers.unshift({ + localContext: "_transaction", + target: transactionId, + targetContext: "deltas" + }); + } + } + + return {transactionDelta, deltas}; } onCreate(cb: (entity: Entity) => void) { @@ -159,7 +168,7 @@ export class Collection { entityId = randomUUID(); } - const deltas = this.generateDeltas( + const {transactionDelta, deltas} = this.generateDeltas( entityId, properties, this.rhizomeNode?.config.creator, @@ -167,67 +176,47 @@ export class Collection { resolver, ); - debug(`put ${entityId} generated deltas:`, JSON.stringify(deltas)); - - // Here we set up a listener so we can wait for all our deltas to be - // ingested into our lossless view before proceeding. - // TODO: Hoist this into a more generic transaction mechanism. - - // - - const allIngested = new Promise((resolve) => { - const ingestedIds = new Set(); - this.eventStream.on('ingested', (delta: Delta) => { - // TODO: timeout - if (deltas.map(({id}) => id).includes(delta.id)) { - ingestedIds.add(delta.id); - if (ingestedIds.size === deltas.length) { - resolve(true); - } - } + const ingested = new Promise((resolve) => { + this.rhizomeNode!.lossless.eventStream.on("updated", (id: DomainEntityID) => { + if (id === entityId) resolve(true); }) }); - deltas.forEach(async (delta: Delta) => { + if (transactionDelta) { + deltas.unshift(transactionDelta); + } + deltas.forEach(async (delta: Delta) => { // record this delta just as if we had received it from a peer delta.receivedFrom = this.rhizomeNode!.myRequestAddr; this.rhizomeNode!.deltaStream.deltasAccepted.push(delta); // publish the delta to our subscribed peers await this.rhizomeNode!.deltaStream.publishDelta(delta); - debug(`published delta ${JSON.stringify(delta)}`); // ingest the delta as though we had received it from a peer - this.ingestDelta(delta); + this.rhizomeNode!.lossless.ingestDelta(delta); }); // Return updated view of this entity // Let's wait for an event notifying us that the entity has been updated. // This means all of our deltas have been applied. - await allIngested; + await ingested; const res = this.resolve(entityId, resolver); if (!res) throw new Error("could not get what we just put!"); - - this.eventStream.emit("update", res); - return res; } - resolve(id: string, resolver?: Resolver): T | undefined { - // Now with lossy view approach, instead of just returning what we - // already have, let's compute our view now. - // return this.entities.resolve(id); - // TODO: Caching - + resolve( + id: string, + resolver?: Resolver, + deltaFilter?: DeltaFilter + ): T | undefined { if (!this.rhizomeNode) return undefined; - const lossy = new Lossy(this.rhizomeNode.lossless); - // TODO: deltaFilter - const res = lossy.resolve(resolver, [id]); - debug('lossy view', res); + const res = this.lossy?.resolve(resolver, [id], deltaFilter) || {}; return res[id] as T; } diff --git a/src/delta.ts b/src/delta.ts index 8f4a176..6c8612f 100644 --- a/src/delta.ts +++ b/src/delta.ts @@ -1,6 +1,6 @@ -import microtime from 'microtime'; import {randomUUID} from "crypto"; -import {PeerAddress, Timestamp} from "./types"; +import microtime from 'microtime'; +import {CreatorID, HostID, PeerAddress, Timestamp, TransactionID} from "./types"; export type DeltaID = string; @@ -12,22 +12,39 @@ export type Pointer = { targetContext?: string; }; -export class Delta { +export class DeltaNetworkImage { id: DeltaID; + timeCreated: Timestamp; + host: HostID; + creator: CreatorID; + pointers: Pointer[]; + constructor({id, timeCreated, host, creator, pointers}: DeltaNetworkImage) { + this.id = id; + this.host = host; + this.creator = creator; + this.timeCreated = timeCreated; + this.pointers = pointers; + } +}; + +export class Delta extends DeltaNetworkImage { receivedFrom?: PeerAddress; timeReceived: Timestamp; - timeCreated: Timestamp; - creator: string; - host: string; - pointers: Pointer[] = []; - constructor(delta: Omit) { - this.id = randomUUID(); - this.timeCreated = microtime.now(); + transactionId?: TransactionID; + + // TODO: Verify the following assumption: + // We're assuming that you only call this constructor when + // actually creating a new delta. + // When receiving one from the network, you can + constructor({host, creator, pointers}: Partial) { + // TODO: Verify that when receiving a delta from the network we can + // retain the delta's id. + const id = randomUUID(); + const timeCreated = microtime.now(); + if (!host || !creator || !pointers) throw new Error('uninitializied values'); + super({id, timeCreated, host, creator, pointers}); + this.timeCreated = timeCreated; this.timeReceived = this.timeCreated; - this.creator = delta.creator; - this.host = delta.host; - this.receivedFrom = delta.receivedFrom; - this.pointers = delta.pointers; } } diff --git a/src/deltas.ts b/src/deltas.ts index eda77fe..d15341d 100644 --- a/src/deltas.ts +++ b/src/deltas.ts @@ -1,7 +1,7 @@ import Debug from 'debug'; import EventEmitter from 'node:events'; import objectHash from 'object-hash'; -import {Delta} from './delta'; +import {Delta, DeltaNetworkImage} from './delta'; import {RhizomeNode} from './node'; const debug = Debug('deltas'); @@ -91,7 +91,8 @@ export class DeltaStream { } serializeDelta(delta: Delta): string { - return JSON.stringify(delta); + const deltaNetworkImage = new DeltaNetworkImage(delta); + return JSON.stringify(deltaNetworkImage); } deserializeDelta(input: string): Delta { diff --git a/src/lossless.ts b/src/lossless.ts index edab20f..38d1d49 100644 --- a/src/lossless.ts +++ b/src/lossless.ts @@ -2,13 +2,15 @@ // We can maintain a record of all the targeted entities, and the deltas that targeted them import Debug from 'debug'; -import {Delta, DeltaFilter, DeltaID} from './delta'; +import EventEmitter from 'events'; +import {Delta, DeltaFilter, DeltaNetworkImage} from './delta'; +import {Transactions} from './transactions'; import {DomainEntityID, PropertyID, PropertyTypes, TransactionID, ViewMany} from "./types"; const debug = Debug('lossless'); export type CollapsedPointer = {[key: PropertyID]: PropertyTypes}; -export type CollapsedDelta = Omit & { +export type CollapsedDelta = Omit & { pointers: CollapsedPointer[]; }; @@ -21,9 +23,9 @@ export type LosslessViewOne = { export type LosslessViewMany = ViewMany; -class DomainEntityMap extends Map {}; +class LosslessEntityMap extends Map {}; -class DomainEntity { +class LosslessEntity { id: DomainEntityID; properties = new Map>(); @@ -44,7 +46,6 @@ class DomainEntity { this.properties.set(targetContext, propertyDeltas); } - debug(`adding delta for entity ${this.id}`); propertyDeltas.add(delta); } } @@ -61,50 +62,24 @@ class DomainEntity { } } -class Transaction { - size?: number; - receivedDeltaIds = new Set(); -} - -class Transactions { - transactions = new Map(); - - getOrInit(id: TransactionID): Transaction { - let t = this.transactions.get(id); - if (!t) { - t = new Transaction(); - this.transactions.set(id, t); - } - return t; - } - - receivedDelta(id: TransactionID, deltaId: DeltaID) { - const t = this.getOrInit(id); - t.receivedDeltaIds.add(deltaId); - } - - isComplete(id: TransactionID) { - const t = this.getOrInit(id); - return t.size !== undefined && t.receivedDeltaIds.size === t.size; - } - - setSize(id: TransactionID, size: number) { - const t = this.getOrInit(id); - t.size = size; - } - - get ids() { - return Array.from(this.transactions.keys()); - } -} - export class Lossless { - domainEntities = new DomainEntityMap(); + domainEntities = new LosslessEntityMap(); transactions = new Transactions(); referencedAs = new Map>(); - // referencingAs = new Map>(); + eventStream = new EventEmitter(); - ingestDelta(delta: Delta) { + constructor() { + this.transactions.eventStream.on("completed", (transactionId) => { + debug(`completed transaction ${transactionId}`); + const transaction = this.transactions.get(transactionId); + if (!transaction) return; + for (const id of transaction.entityIds) { + this.eventStream.emit("updated", id); + } + }); + } + + ingestDelta(delta: Delta): TransactionID | undefined { const targets = delta.pointers .filter(({targetContext}) => !!targetContext) .map(({target}) => target) @@ -114,7 +89,7 @@ export class Lossless { let ent = this.domainEntities.get(target); if (!ent) { - ent = new DomainEntity(target); + ent = new LosslessEntity(target); this.domainEntities.set(target, ent); } @@ -134,43 +109,15 @@ export class Lossless { } } - const {target: transactionId} = delta.pointers.find(({ - localContext, - target, - targetContext - }) => - localContext === "_transaction" && - typeof target === "string" && - targetContext === "deltas" - ) || {}; + const transactionId = this.transactions.ingestDelta(delta, targets); - if (transactionId) { - // This delta is part of a transaction - this.transactions.receivedDelta(transactionId as string, delta.id); - } else { - const {target: transactionId} = delta.pointers.find(({ - localContext, - target, - targetContext - }) => - localContext === "_transaction" && - typeof target === "string" && - targetContext === "size" - ) || {}; - - if (transactionId) { - // This delta describes a transaction - const {target: size} = delta.pointers.find(({ - localContext, - target - }) => - localContext === "size" && - typeof target === "number" - ) || {}; - - this.transactions.setSize(transactionId as string, size as number); + if (!transactionId) { + // No transaction -- we can issue an update event immediately + for (const id of targets) { + this.eventStream.emit("updated", id); } } + return transactionId; } view(entityIds?: DomainEntityID[], deltaFilter?: DeltaFilter): LosslessViewMany { @@ -180,8 +127,6 @@ export class Lossless { const ent = this.domainEntities.get(id); if (!ent) continue; - debug(`domain entity ${id}`, JSON.stringify(ent)); - const referencedAs = new Set(); const properties: { [key: PropertyID]: CollapsedDelta[] @@ -191,6 +136,14 @@ export class Lossless { properties[key] = properties[key] || []; for (const delta of deltas) { + // If this delta is part of a transaction, + // we need to be able to wait for the whole transaction. + if (delta.transactionId) { + if (!this.transactions.isComplete(delta.transactionId)) { + // TODO: Test this condition + continue; + } + } if (deltaFilter) { const include = deltaFilter(delta); diff --git a/src/lossy.ts b/src/lossy.ts index 96eb7bc..05db4e1 100644 --- a/src/lossy.ts +++ b/src/lossy.ts @@ -60,7 +60,6 @@ export function lastValueFromLosslessViewOne( value?: string | number, timeUpdated?: number } = {}; - debug(`trying to get last value for ${key} from ${JSON.stringify(ent.properties[key])}`); res.timeUpdated = 0; for (const delta of ent.properties[key] || []) { @@ -76,23 +75,25 @@ export function lastValueFromLosslessViewOne( } function defaultResolver(losslessView: LosslessViewMany): ResolvedViewMany { - const resolved: ResolvedViewMany = {}; + const resolved: ResolvedViewMany = {}; - // debug('default resolver, lossless view', JSON.stringify(losslessView)); - for (const [id, ent] of Object.entries(losslessView)) { - resolved[id] = {id, properties: {}}; + // debug('default resolver, lossless view', JSON.stringify(losslessView)); + for (const [id, ent] of Object.entries(losslessView)) { + resolved[id] = {id, properties: {}}; - for (const key of Object.keys(ent.properties)) { - const {value} = lastValueFromLosslessViewOne(ent, key) || {}; + for (const key of Object.keys(ent.properties)) { + const {value} = lastValueFromLosslessViewOne(ent, key) || {}; - // debug(`[ ${key} ] = ${value}`); - resolved[id].properties[key] = value; - } + // debug(`[ ${key} ] = ${value}`); + resolved[id].properties[key] = value; } - return resolved; - }; - + } + return resolved; +}; +// TODO: Incremental updates of lossy models. For example, with last-write-wins, +// we keep the timeUpdated for each field. A second stage resolver can rearrange +// the data structure to a preferred shape and may discard the timeUpdated info. export class Lossy { lossless: Lossless; @@ -104,6 +105,7 @@ export class Lossy { // apply a filter to the deltas composing that lossless view, // and then apply a supplied resolver function which receives // the filtered lossless view as input. + // TODO: Cache things! resolve(fn?: Resolver | Resolver, entityIds?: DomainEntityID[], deltaFilter?: DeltaFilter): T { if (!fn) { fn = defaultResolver; diff --git a/src/node.ts b/src/node.ts index d0633ee..9ea3a06 100644 --- a/src/node.ts +++ b/src/node.ts @@ -31,7 +31,7 @@ export class RhizomeNode { requestReply: RequestReply; httpServer: HttpServer; deltaStream: DeltaStream; - lossless = new Lossless(); + lossless: Lossless; peers: Peers; myRequestAddr: PeerAddress; myPublishAddr: PeerAddress; @@ -66,9 +66,13 @@ export class RhizomeNode { this.httpServer = new HttpServer(this); this.deltaStream = new DeltaStream(this); this.peers = new Peers(this); + this.lossless = new Lossless(); } async start() { + // Connect our lossless view to the delta stream + this.deltaStream.subscribeDeltas((delta) => this.lossless.ingestDelta(delta)); + // Start ZeroMQ publish and reply sockets this.pubSub.start(); this.requestReply.start(); diff --git a/src/peers.ts b/src/peers.ts index 4a20b33..3307076 100644 --- a/src/peers.ts +++ b/src/peers.ts @@ -1,11 +1,11 @@ import Debug from 'debug'; import {Message} from 'zeromq'; import {SEED_PEERS} from "./config"; +import {Delta} from "./delta"; import {RhizomeNode} from "./node"; import {Subscription} from './pub-sub'; import {PeerRequest, RequestSocket, ResponseSocket} from "./request-reply"; import {PeerAddress} from "./types"; -import {Delta} from "./delta"; const debug = Debug('peers'); export enum RequestMethods { @@ -92,7 +92,9 @@ export class Peers { debug('it\'s a request for deltas'); // TODO: stream these rather than // trying to write them all in one message - await res.send(JSON.stringify(this.rhizomeNode.deltaStream.deltasAccepted)); + const deltas = this.rhizomeNode.deltaStream.deltasAccepted; + debug(`sending ${deltas.length} deltas`); + await res.send(JSON.stringify(deltas)); break; } } diff --git a/src/request-reply.ts b/src/request-reply.ts index 6eb1ff6..9ea0d5c 100644 --- a/src/request-reply.ts +++ b/src/request-reply.ts @@ -45,7 +45,6 @@ export class ResponseSocket { if (typeof msg === 'object') { msg = JSON.stringify(msg); } - debug('sending reply', {msg}); await this.sock.send(msg); } } diff --git a/src/transactions.ts b/src/transactions.ts new file mode 100644 index 0000000..5c8ece5 --- /dev/null +++ b/src/transactions.ts @@ -0,0 +1,136 @@ +import Debug from "debug"; +import EventEmitter from "events"; +import {Delta, DeltaID} from "./delta"; +import {DomainEntityID, TransactionID} from "./types"; +const debug = Debug("transactions"); + +function getDeltaTransactionId(delta: Delta): TransactionID | undefined { + const {target: transactionId} = delta.pointers.find(({ + localContext, + target, + targetContext + }) => + localContext === "_transaction" && + typeof target === "string" && + targetContext === "deltas" + ) || {}; + + if (transactionId && typeof transactionId === "string") { + return transactionId; + } +} + +function getTransactionSize(delta: Delta): { + transactionId: TransactionID, + size: number +} | undefined { + const {target: transactionId} = delta.pointers.find(({ + localContext, + target, + targetContext + }) => + localContext === "_transaction" && + typeof target === "string" && + targetContext === "size" + ) || {}; + + if (transactionId && typeof transactionId === "string") { + // This delta describes a transaction + const {target: size} = delta.pointers.find(({ + localContext, + target + }) => + localContext === "size" && + typeof target === "number" + ) || {}; + + return {transactionId, size: size as number}; + } +} + +export class Transaction { + size?: number; + receivedDeltaIds = new Set(); + entityIds = new Set(); +} + +export class Transactions { + transactions = new Map(); + eventStream = new EventEmitter(); + + get(id: TransactionID): Transaction | undefined { + return this.transactions.get(id); + } + + getOrInit(id: TransactionID): Transaction { + let t = this.transactions.get(id); + if (!t) { + t = new Transaction(); + this.transactions.set(id, t); + } + return t; + } + + ingestDelta(delta: Delta, targets: DomainEntityID[]): TransactionID | undefined { + { + const transactionId = getDeltaTransactionId(delta); + if (transactionId) { + const t = this.getOrInit(transactionId); + for (const id of targets) { + t.entityIds.add(id); + } + + // This delta is part of a transaction + // Add this to the delta's data structure for quick reference + delta.transactionId = transactionId; + + // Update our transaction tracking + this.receivedDelta(transactionId, delta.id); + + // Notify that the transaction is complete + if (this.isComplete(transactionId)) { + this.eventStream.emit("completed", transactionId); + } + + return transactionId; + } + } + + { + const {transactionId, size} = getTransactionSize(delta) || {}; + if (transactionId && size) { + // This delta describes a transaction + + debug(`transaction ${transactionId} has size ${size}`); + + this.setSize(transactionId, size as number); + + // Check if the transaction is complete + if (this.isComplete(transactionId)) { + this.eventStream.emit("completed", transactionId); + } + + return transactionId; + } + } + } + + receivedDelta(id: TransactionID, deltaId: DeltaID) { + const t = this.getOrInit(id); + t.receivedDeltaIds.add(deltaId); + } + + isComplete(id: TransactionID) { + const t = this.getOrInit(id); + return t.size !== undefined && t.receivedDeltaIds.size === t.size; + } + + setSize(id: TransactionID, size: number) { + const t = this.getOrInit(id); + t.size = size; + } + + get ids() { + return Array.from(this.transactions.keys()); + } +} diff --git a/src/types.ts b/src/types.ts index 2616441..0e3916e 100644 --- a/src/types.ts +++ b/src/types.ts @@ -9,6 +9,8 @@ export type PropertyTypes = string | number | undefined; export type DomainEntityID = string; export type PropertyID = string; export type TransactionID = string; +export type HostID = string; +export type CreatorID = string; export type Timestamp = number;