diff --git a/.gitignore b/.gitignore
index 1eae0cf..5a541bd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,4 @@
 dist/
 node_modules/
+*.swp
+*.swo
diff --git a/src/.collection-layer.ts.swp b/src/.collection-layer.ts.swp
deleted file mode 100644
index 9565932..0000000
Binary files a/src/.collection-layer.ts.swp and /dev/null differ
diff --git a/src/.example-app.ts.swp b/src/.example-app.ts.swp
deleted file mode 100644
index cfefe4b..0000000
Binary files a/src/.example-app.ts.swp and /dev/null differ
diff --git a/src/.object-layer.ts.swp b/src/.object-layer.ts.swp
deleted file mode 100644
index ad99ea7..0000000
Binary files a/src/.object-layer.ts.swp and /dev/null differ
diff --git a/src/collection-layer.ts b/src/collection-layer.ts
index c8e93ee..f0ebf3a 100644
--- a/src/collection-layer.ts
+++ b/src/collection-layer.ts
@@ -2,33 +2,176 @@
 // It should enable operations like removing a property removes the value from the entities in the collection
 // It could then be further extended with e.g. table semantics like filter, sort, join
 
-type Property = {
-  name: string,
-  type: number | string;
-}
-class EntityType {
-  name: string;
-  properties?: Property[];
-  constructor(name: string) {
-    this.name = name;
-  }
-}
+import EventEmitter from "node:events";
+import { publishDelta, subscribeDeltas } from "./deltas";
+import { Entity, EntityProperties, EntityPropertiesDeltaBuilder } from "./object-layer";
+import { Delta } from "./types";
+import { randomUUID } from "node:crypto";
 
-class Entity {
-  type: EntityType;
-  properties?: object;
-  constructor(type: EntityType) {
-    this.type = type;
-  }
-}
+// type Property = {
+//   name: string,
+//   type: number | string;
+// }
 
-class Collection {
-  entities = new Map();
+// class EntityType {
+//   name: string;
+//   properties?: Property[];
+//   constructor(name: string) {
+//     this.name = name;
+//   }
+// }
+
+// class Entity {
+//   type: EntityType;
+//   properties?: object;
+//   constructor(type: EntityType) {
+//     this.type = type;
+//   }
+// }
+
+// class Collection {
+  // update(entityId, properties)
   // ...
-}
+// }
 
-export class Collections {
-  collections = new Map();
+// export class Collections {
+//   collections = new Map();
+// }
+
+
+export class Collection {
+  entities = new Map();
+  eventStream = new EventEmitter();
+  constructor() {
+    console.log('COLLECTION SUBSCRIBING TO DELTA STREAM');
+    subscribeDeltas((delta: Delta) => {
+      // TODO: Make sure this is the kind of delta we're looking for
+      console.log('COLLECTION RECEIVED DELTA');
+      this.applyDelta(delta);
+    });
+    this.eventStream.on('create', (entity: Entity) => {
+      console.log(`new entity!`, entity);
+    });
+  }
+
+  // Applies the javascript rules for updating object values,
+  // e.g. set to `undefined` to delete a property
+  updateEntity(entityId?: string, properties?: object, local = false, deltas?: Delta[]): Entity {
+    let entity: Entity | undefined;
+    let eventType: 'create' | 'update' | 'delete' | undefined;
+    entityId = entityId ?? randomUUID();
+    entity = this.entities.get(entityId);
+    if (!entity) {
+      entity = new Entity(entityId);
+      entity.id = entityId;
+      eventType = 'create';
+    }
+    const deltaBuilder = new EntityPropertiesDeltaBuilder(entityId);
+
+    if (!properties) {
+      // Let's interpret this as entity deletion
+      this.entities.delete(entityId);
+      // TODO: prepare and publish a delta
+      // TODO: execute hooks
+      eventType = 'delete';
+    } else {
+      let anyChanged = false;
+      Object.entries(properties).forEach(([key, value]) => {
+        let changed = false;
+        if (entity.properties && entity.properties[key] !== value) {
+          entity.properties[key] = value;
+          changed = true;
+        }
+        if (local && changed) {
+          // If this is a change, let's generate a delta
+          deltaBuilder.add(key, value);
+          // We append to the array the caller may provide
+          // We can update this count as we receive network confirmation for deltas
+          entity.ahead += 1;
+        }
+        anyChanged = anyChanged || changed;
+      });
+      // We've noted that we may be ahead of the server, let's update our
+      // local image of this entity.
+      //* In principle, this system can recreate past or alternative states.
+      //* At worst, by replaying all the deltas up to a particular point.
+      //* Some sort of checkpointing strategy would probably be helpful.
+      //* Furthermore, if we can implement reversible transformations,
+      //* it would then be efficient to calculate the state of the system with
+      //* specific deltas removed. We could use it to extract a measurement
+      //* of the effects of some deltas' inclusion or exclusion, the
+      //* evaluation of which may lend evidence to some possible arguments.
+
+      this.entities.set(entityId, entity);
+      if (anyChanged) {
+        deltas?.push(deltaBuilder.delta);
+        eventType = eventType || 'update';
+      }
+    }
+    if (eventType) {
+      this.eventStream.emit(eventType, entity);
+    }
+    return entity;
+  }
+  // We can update our local image of the entity, but we should annotate it
+  // to indicate that we have not yet received any confirmation of this delta
+  // having been propagated.
+  // Later when we receive deltas regarding this entity we can detect when
+  // we have received back an image that matches our target.
+
+  // So we need a function to generate one or more deltas for each call to put/
+  // maybe we stage them and wait for a call to commit() that initiates the
+  // assembly and transmission of one or more deltas
+
+  applyDelta(delta: Delta) {
+    // TODO: handle delta representing entity deletion
+    console.log('applying delta:', delta);
+    const idPtr = delta.pointers.find(({localContext}) => localContext === 'id');
+    if (!idPtr) {
+      console.error('encountered delta with no entity id', delta);
+      return;
+    }
+    const properties: EntityProperties = {};
+    delta.pointers.filter(({localContext}) => localContext !== 'id')
+      .forEach(({localContext: key, target: value}) => {
+        properties[key] = value;
+      }, {});
+    const entityId = idPtr.target as string;
+    // TODO: Handle the scenario where this update has been superseded by a newer one locally
+    this.updateEntity(entityId, properties);
+  }
+
+  onCreate(cb: (entity: Entity) => void) {
+    this.eventStream.on('create', (entity: Entity) => {
+      cb(entity);
+    });
+  }
+  onUpdate(cb: (entity: Entity) => void) {
+    this.eventStream.on('update', (entity: Entity) => {
+      cb(entity);
+    });
+  }
+  put(entityId: string | undefined, properties: object): Entity {
+    const deltas: Delta[] = [];
+    const entity = this.updateEntity(entityId, properties, true, deltas);
+    deltas.forEach(async (delta: Delta) => {
+      await publishDelta(delta);
+    });
+    return entity;
+  }
+  del(entityId: string) {
+    const deltas: Delta[] = [];
+    this.updateEntity(entityId, undefined, true, deltas);
+    deltas.forEach(async (delta: Delta) => {
+      await publishDelta(delta);
+    });
+  }
+  get(id: string): Entity | undefined {
+    return this.entities.get(id);
+  }
+  getIds(): string[] {
+    return Array.from(this.entities.keys());
+  }
 }
diff --git a/src/deltas.ts b/src/deltas.ts
index 0a6d844..848eff7 100644
--- a/src/deltas.ts
+++ b/src/deltas.ts
@@ -10,7 +10,7 @@ export const deltasRejected: Delta[] = [];
 export const deltasDeferred: Delta[] = [];
 
 export function applyPolicy(delta: Delta): Decision {
-    return !!delta && Decision.Accept;
+  return !!delta && Decision.Accept;
 }
 
 export function receiveDelta(delta: Delta) {
@@ -18,19 +18,19 @@ export function receiveDelta(delta: Delta) {
 }
 
 export function ingestDelta(delta: Delta) {
-    const decision = applyPolicy(delta);
-    switch (decision) {
-        case Decision.Accept:
-            deltasAccepted.push(delta);
-            deltaStream.emit('delta', { delta });
-            break;
-        case Decision.Reject:
-            deltasRejected.push(delta);
-            break;
-        case Decision.Defer:
-            deltasDeferred.push(delta);
-            break;
-    }
+  const decision = applyPolicy(delta);
+  switch (decision) {
+    case Decision.Accept:
+      deltasAccepted.push(delta);
+      deltaStream.emit('delta', { delta });
+      break;
+    case Decision.Reject:
+      deltasRejected.push(delta);
+      break;
+    case Decision.Defer:
+      deltasDeferred.push(delta);
+      break;
+  }
 }
 
 export function ingestNext(): boolean {
diff --git a/src/example-app.ts b/src/example-app.ts
index ff07db4..55a796f 100644
--- a/src/example-app.ts
+++ b/src/example-app.ts
@@ -2,10 +2,11 @@ import express from "express";
 import {
   bindPublish,
 } from "./pub-sub";
-import { runDeltas } from "./deltas";
-import { Entities, Entity } from "./object-layer";
+import { deltasAccepted, deltasProposed, runDeltas } from "./deltas";
+import { Entity } from "./object-layer";
+import { Collection } from "./collection-layer";
 import { bindReply, runRequestHandlers } from "./request-reply";
-import { subscribeToSeeds } from "./peers";
+import { askAllPeersForDeltas, subscribeToSeeds } from "./peers";
 import {
   ENABLE_HTTP_API, HTTP_API_ADDR, HTTP_API_PORT } from "./config";
 
@@ -25,7 +26,7 @@ type UserProperties = {
 };
 
 class Users {
-  db = new Entities();
+  db = new Collection();
   create(properties: UserProperties): Entity {
     // We provide undefined for the id, to let the database generate it
     // This call returns the id
@@ -47,12 +48,23 @@ class Users {
 }
 
 (async () => {
-  const app = express()
+  const users = new Users();
+  const app = express()
 
   app.get("/ids", (req: express.Request, res: express.Response) => {
     res.json({ ids: users.getIds()});
   });
 
+  app.get("/deltas", (req: express.Request, res: express.Response) => {
+    // TODO: streaming
+    res.json(deltasAccepted);
+  });
+
+  app.get("/deltas/count", (req: express.Request, res: express.Response) => {
+    // TODO: streaming
+    res.json(deltasAccepted.length);
+  });
+
   if (ENABLE_HTTP_API) {
     app.listen(HTTP_API_PORT, HTTP_API_ADDR, () => {
       console.log(`HTTP API bound to http://${HTTP_API_ADDR}:${HTTP_API_PORT}`);
@@ -63,21 +75,30 @@ class Users {
   await bindReply();
   runDeltas();
   runRequestHandlers();
-  await new Promise((resolve) => setTimeout(resolve, 200));
+  await new Promise((resolve) => setTimeout(resolve, 500));
   subscribeToSeeds();
-  await new Promise((resolve) => setTimeout(resolve, 200));
+  await new Promise((resolve) => setTimeout(resolve, 500));
+  askAllPeersForDeltas();
+  await new Promise((resolve) => setTimeout(resolve, 1000));
 
-  const users = new Users();
+  setInterval(() => {
+    console.log('deltasProposed count', deltasProposed.length,
+      'deltasAccepted count', deltasAccepted.length);
+  }, 5000)
 
   const taliesin = users.upsert({
-    id: 'taliesin-1',
+    // id: 'taliesin-1',
     name: 'Taliesin',
     nameLong: 'Taliesin (Ladd)',
     age: Math.floor(Math.random() * 1000)
   });
 
-  taliesin.onUpdate((u: Entity) => {
-    console.log('User updated', u);
+  users.db.onUpdate((u: Entity) => {
+    console.log('User updated:', u);
+  });
+
+  users.db.onCreate((u: Entity) => {
+    console.log('New user!:', u);
   });
 
   // TODO: Allow configuration regarding read/write concern i.e.
diff --git a/src/object-layer.ts b/src/object-layer.ts
index f7eaadc..0df5a3b 100644
--- a/src/object-layer.ts
+++ b/src/object-layer.ts
@@ -7,15 +7,10 @@
 // - As typescript interfaces?
 // - As typescript classes?
 
-import EventEmitter from "node:events";
 import { CREATOR, HOST } from "./config";
-import { publishDelta, subscribeDeltas } from "./deltas";
 import { Delta, PropertyTypes } from "./types";
-import { randomUUID } from "node:crypto";
 
-const entityEventStream = new EventEmitter();
-
-type EntityProperties = {
+export type EntityProperties = {
   [key: string]: PropertyTypes
 };
 
@@ -27,20 +22,11 @@ export class Entity {
     this.id = id;
     this.properties = {};
   }
-  onUpdate(cb: (entity: Entity) => void) {
-    // TODO: This doesn't seem like it will scale well.
-    entityEventStream.on('update', (entity: Entity) => {
-      if (entity.id === this.id) {
-        cb(entity);
-      }
-    });
-  }
 }
 
-const entities = new Map();
 // TODO: Use leveldb for storing view snapshots
 
-class EntityPropertiesDeltaBuilder {
+export class EntityPropertiesDeltaBuilder {
   delta: Delta;
   constructor(entityId: string) {
     this.delta = {
@@ -58,123 +44,3 @@ class EntityPropertiesDeltaBuilder {
   }
 }
 
-// Applies the javascript rules for updating object values,
-// e.g. set to `undefined` to delete a property
-function updateEntity(entityId?: string, properties?: object, local = false, deltas?: Delta[]): Entity {
-  let entity: Entity | undefined;
-  let eventType: 'create' | 'update' | 'delete' | undefined;
-  entityId = entityId ?? randomUUID();
-  entity = entities.get(entityId);
-  if (!entity) {
-    entity = new Entity(entityId);
-    entity.id = entityId;
-    eventType = 'create';
-  }
-  const deltaBulider = new EntityPropertiesDeltaBuilder(entityId);
-
-  if (!properties) {
-    // Let's interpret this as entity deletion
-    entities.delete(entityId);
-    // TODO: prepare and publish a delta
-    // TODO: execute hooks
-    eventType = 'delete';
-  } else {
-    let anyChanged = false;
-    Object.entries(properties).forEach(([key, value]) => {
-      let changed = false;
-      if (entity.properties && entity.properties[key] !== value) {
-        entity.properties[key] = value;
-        changed = true;
-      }
-      if (local && changed) {
-        // If this is a change, let's generate a delta
-        deltaBulider.add(key, value);
-        // We append to the array the caller may provide
-        // We can update this count as we receive network confirmation for deltas
-        entity.ahead += 1;
-      }
-      anyChanged = anyChanged || changed;
-    });
-    // We've noted that we may be ahead of the server, let's update our
-    // local image of this entity.
-    //* In principle, this system can recreate past or alternative states.
-    //* At worst, by replaying all the deltas up to a particular point.
-    //* Some sort of checkpointing strategy would probably be helpful.
-    //* Furthermore, if we can implement reversible transformations,
-    //* it would then be efficient to calculate the state of the system with
-    //* specific deltas removed. We could use it to extract a measurement
-    //* of the effects of some deltas' inclusion or exclusion, the
-    //* evaluation of which may lend evidence to some possible arguments.
-
-    entities.set(entityId, entity);
-    if (anyChanged) {
-      deltas?.push(deltaBulider.delta);
-      eventType = eventType || 'update';
-    }
-  }
-  if (eventType) {
-    entityEventStream.emit(eventType, entity);
-  }
-  return entity;
-}
-
-// We can update our local image of the entity, but we should annotate it
-// to indicate that we have not yet received any confirmation of this delta
-// having been propagated.
-// Later when we receive deltas regarding this entity we can detect when
-// we have received back an image that matches our target.
-
-// So we need a function to generate one or more deltas for each call to put/
-// maybe we stage them and wait for a call to commit() that initiates the
-// assembly and transmission of one or more deltas
-
-function applyDelta(delta: Delta) {
-  // TODO: handle delta representing entity deletion
-  const idPtr = delta.pointers.find(({localContext}) => localContext === 'id');
-  if (!idPtr) {
-    console.error('encountered delta with no entity id', delta);
-    return;
-  }
-  const properties: EntityProperties = {};
-  delta.pointers.filter(({localContext}) => localContext !== 'id')
-    .forEach(({localContext: key, target: value}) => {
-      properties[key] = value;
-    }, {});
-  const entityId = idPtr.target as string;
-  // TODO: Handle the scenario where this update has been superceded by a newer one locally
-  updateEntity(entityId, properties);
-}
-
-subscribeDeltas((delta: Delta) => {
-  // TODO: Make sure this is the kind of delta we're looking for
-  applyDelta(delta);
-});
-
-export class Entities {
-  constructor() {
-    entityEventStream.on('create', (entity: Entity) => {
-      console.log(`new entity!`, entity);
-    });
-  }
-  put(entityId: string | undefined, properties: object): Entity {
-    const deltas: Delta[] = [];
-    const entity = updateEntity(entityId, properties, true, deltas);
-    deltas.forEach(async (delta: Delta) => {
-      await publishDelta(delta);
-    });
-    return entity;
-  }
-  del(entityId: string) {
-    const deltas: Delta[] = [];
-    updateEntity(entityId, undefined, true, deltas);
-    deltas.forEach(async (delta: Delta) => {
-      await publishDelta(delta);
-    });
-  }
-  get(id: string): Entity | undefined {
-    return entities.get(id);
-  }
-  getIds(): string[] {
-    return Array.from(entities.keys());
-  }
-}
diff --git a/src/peers.ts b/src/peers.ts
index a496946..588e70b 100644
--- a/src/peers.ts
+++ b/src/peers.ts
@@ -3,11 +3,32 @@ import { registerRequestHandler, PeerRequest, ResponseSocket } from "./request-r
 import { RequestSocket, } from "./request-reply";
 import { SEED_PEERS } from "./config";
 import {connectSubscribe} from "./pub-sub";
+import {deltasAccepted, deltasProposed, ingestAll, receiveDelta} from "./deltas";
+import {Delta} from "./types";
 
 export enum PeerMethods {
   GetPublishAddress,
+  AskForDeltas
 }
 
+registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => {
+  console.log('inspecting peer request');
+  switch (req.method) {
+    case PeerMethods.GetPublishAddress: {
+      console.log('it\'s a request for our publish address');
+      await res.send(publishAddr);
+      break;
+    }
+    case PeerMethods.AskForDeltas: {
+      console.log('it\'s a request for deltas');
+      // TODO: stream these rather than
+      // trying to write them all in one message
+      await res.send(JSON.stringify(deltasAccepted));
+      break;
+    }
+  }
+});
+
 export type PeerAddress = {
   addr: string,
   port: number
@@ -33,23 +54,46 @@ class Peer {
       connectSubscribe(addr, port);
     }
   }
+  async askForDeltas(): Promise<Delta[]> {
+    // TODO as a first approximation we are trying to cram the entire history
+    // of accepted deltas, into one (potentially gargantuan) json message.
+    // A second approximation would be to stream the deltas.
+    // Third pass should find a way to reduce the number of deltas transmitted.
+
+    // TODO: requestTimeout
+    const res = await this.reqSock.request(PeerMethods.AskForDeltas);
+    const deltas = JSON.parse(res.toString());
+    return deltas;
+  }
+}
+
+const peers: Peer[] = [];
+
+function newPeer(addr: string, port: number) {
+  const peer = new Peer(addr, port);
+  peers.push(peer);
+  return peer;
 }
 
 export async function subscribeToSeeds() {
   SEED_PEERS.forEach(async ({addr, port}, idx) => {
     console.log(`SEED PEERS[${idx}]=${addr}:${port}`);
-    const peer = new Peer(addr, port);
+    const peer = newPeer(addr, port);
     await peer.subscribe();
   });
 }
 
-registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => {
-  console.log('inspecting peer request');
-  switch (req.method) {
-    case PeerMethods.GetPublishAddress:
-      console.log('it\'s a request for our publish address');
-      await res.send(publishAddr);
-      break;
-  }
-});
-
+//! TODO Expect abysmal scaling properties with this function
+export async function askAllPeersForDeltas() {
+  peers.forEach(async (peer, idx) => {
+    console.log(`Asking peer ${idx} for deltas`);
+    const deltas = await peer.askForDeltas();
+    console.log('received deltas:', deltas);
+    for (const delta of deltas) {
+      receiveDelta(delta);
+    }
+    console.log('deltasProposed count', deltasProposed.length);
+    console.log('deltasAccepted count', deltasAccepted.length);
+    ingestAll();
+  });
+}