From a5fb49475b200effa767e565e545222002dc2fb5 Mon Sep 17 00:00:00 2001 From: Ladd Date: Wed, 25 Dec 2024 16:13:48 -0600 Subject: [PATCH] refactored globals into classes --- README.md | 9 +- __tests__/run.ts | 60 ++++++++++ src/collection.ts | 38 +++--- src/config.ts | 4 +- src/context.ts | 0 src/deltas.ts | 173 +++++++++++++--------------- src/example-app.ts | 32 ++---- src/http-api.ts | 249 +++++++++++++++++++++------------------- src/node.ts | 82 +++++++++++++ src/object-layer.ts | 12 +- src/peers.ts | 164 +++++++++++++++----------- src/pub-sub.ts | 88 +++++++++++--- src/query.ts | 15 --- src/request-reply.ts | 117 +++++++++++-------- src/typed-collection.ts | 1 + src/types.ts | 8 ++ 16 files changed, 649 insertions(+), 403 deletions(-) create mode 100644 __tests__/run.ts create mode 100644 src/context.ts create mode 100644 src/node.ts delete mode 100644 src/query.ts diff --git a/README.md b/README.md index 28e50c1..b6ce005 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,8 @@ Then if their clocks drift relative to ours, we can seek consensus among a broad But at that point just run ntpd. Can still do consensus to verify but probably no need to implement custom time synchronization protocol. -Wait NTP is centralized isn't it, not peer to peer... +Apparently PTP, Precision Time Protocol, is a thing. +PTP affords for a layer of user defined priority for best clock selection. ## Peering @@ -118,7 +119,7 @@ To demonstrate the example application, you can open multiple terminals, and in export DEBUG="*,-express" export RHIZOME_REQUEST_BIND_PORT=4000 export RHIZOME_PUBLISH_BIND_PORT=4001 -export RHIZOME_SEED_PEERS='127.0.0.1:4002, 127.0.0.1:4004' +export RHIZOME_SEED_PEERS='localhost:4002, localhost:4004' export RHIZOME_HTTP_API_PORT=3000 export RHIZOME_PEER_ID=peer1 npm run example-app @@ -128,7 +129,7 @@ npm run example-app export DEBUG="*,-express" export RHIZOME_REQUEST_BIND_PORT=4002 export RHIZOME_PUBLISH_BIND_PORT=4003 -export RHIZOME_SEED_PEERS='127.0.0.1:4000, 127.0.0.1:4004' +export RHIZOME_SEED_PEERS='localhost:4000, localhost:4004' export RHIZOME_HTTP_API_PORT=3001 export RHIZOME_PEER_ID=peer2 npm run example-app @@ -138,7 +139,7 @@ npm run example-app export DEBUG="*,-express" export RHIZOME_REQUEST_BIND_PORT=4004 export RHIZOME_PUBLISH_BIND_PORT=4005 -export RHIZOME_SEED_PEERS='127.0.0.1:4000, 127.0.0.1:4002' +export RHIZOME_SEED_PEERS='localhost:4000, localhost:4002' export RHIZOME_HTTP_API_PORT=3002 export RHIZOME_PEER_ID=peer3 npm run example-app diff --git a/__tests__/run.ts b/__tests__/run.ts new file mode 100644 index 0000000..73133c8 --- /dev/null +++ b/__tests__/run.ts @@ -0,0 +1,60 @@ +import Debug from 'debug'; +import {RhizomeNode, RhizomeNodeConfig} from "../src/node"; +import {TypedCollection} from '../src/typed-collection'; +const debug = Debug('test:run'); + +type User = { + id?: string; + name: string; + nameLong?: string; + email?: string; + age: number; +}; + +class App extends RhizomeNode { + constructor(config?: Partial) { + super(config); + const users = new TypedCollection("users"); + users.rhizomeConnect(this); + } +} + +describe('Run', () => { + let app: App; + + beforeAll(async () => { + app = new App({ + // TODO expose more conveniently as test config options + httpPort: 5000, + httpEnable: true, + requestBindPort: 5001, + publishBindPort: 5002, + }); + await app.start(); + }); + + afterAll(async () => { + debug('attempting to stop app'); + await app.stop(); + }); + + it('can put a new user', async () => { + const res = await fetch('http://localhost:5000/users', { + method: 'PUT', + headers: {'Content-Type': 'application/json'}, + body: JSON.stringify({ + name: "Peon", + id: "peon-1", + age: 263 + }) + }); + const data = await res.json(); + expect(data).toMatchObject({ + properties: { + name: "Peon", + id: "peon-1", + age: 263 + } + }); + }); +}); diff --git a/src/collection.ts b/src/collection.ts index bb2c0c6..8cb3ca5 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -3,12 +3,11 @@ // It should enable operations like removing a property removes the value from the entities in the collection // It could then be further extended with e.g. table semantics like filter, sort, join +import {randomUUID} from "node:crypto"; import EventEmitter from "node:events"; -import { deltasAccepted, publishDelta, subscribeDeltas } from "./deltas"; -import { Entity, EntityProperties, EntityPropertiesDeltaBuilder } from "./object-layer"; -import { Delta } from "./types"; -import { randomUUID } from "node:crypto"; -import {myRequestAddr} from "./peers"; +import {RhizomeNode} from "./node"; +import {Entity, EntityProperties, EntityPropertiesDeltaBuilder} from "./object-layer"; +import {Delta} from "./types"; // type Property = { // name: string, @@ -24,13 +23,24 @@ import {myRequestAddr} from "./peers"; // } export class Collection { + rhizomeNode?: RhizomeNode; + name: string; entities = new Map(); eventStream = new EventEmitter(); - constructor() { - subscribeDeltas((delta: Delta) => { + + constructor(name: string) { + this.name = name; + } + + rhizomeConnect(rhizomeNode: RhizomeNode) { + this.rhizomeNode = rhizomeNode; + + rhizomeNode.deltaStream.subscribeDeltas((delta: Delta) => { // TODO: Make sure this is the kind of delta we're looking for this.applyDelta(delta); }); + + rhizomeNode.httpApi.serveCollection(this); } // Applies the javascript rules for updating object values, @@ -45,7 +55,7 @@ export class Collection { entity.id = entityId; eventType = 'create'; } - const deltaBulider = new EntityPropertiesDeltaBuilder(entityId); + const deltaBulider = new EntityPropertiesDeltaBuilder(this.rhizomeNode!, entityId); if (!properties) { // Let's interpret this as entity deletion @@ -80,7 +90,7 @@ export class Collection { //* specific deltas removed. We could use it to extract a measurement //* of the effects of some deltas' inclusion or exclusion, the //* evaluation of which may lend evidence to some possible arguments. - + this.entities.set(entityId, entity); if (anyChanged) { deltas?.push(deltaBulider.delta); @@ -135,9 +145,9 @@ export class Collection { const deltas: Delta[] = []; const entity = this.updateEntity(entityId, properties, true, deltas); deltas.forEach(async (delta: Delta) => { - delta.receivedFrom = myRequestAddr; - deltasAccepted.push(delta); - await publishDelta(delta); + delta.receivedFrom = this.rhizomeNode!.myRequestAddr; + this.rhizomeNode!.deltaStream.deltasAccepted.push(delta); + await this.rhizomeNode!.deltaStream.publishDelta(delta); }); return entity; } @@ -146,8 +156,8 @@ export class Collection { const deltas: Delta[] = []; this.updateEntity(entityId, undefined, true, deltas); deltas.forEach(async (delta: Delta) => { - deltasAccepted.push(delta); - await publishDelta(delta); + this.rhizomeNode!.deltaStream.deltasAccepted.push(delta); + await this.rhizomeNode!.deltaStream.publishDelta(delta); }); } diff --git a/src/config.ts b/src/config.ts index 1821528..24cf79b 100644 --- a/src/config.ts +++ b/src/config.ts @@ -6,7 +6,7 @@ import {PeerAddress} from "./types"; export const LEVEL_DB_DIR = process.env.RHIZOME_LEVEL_DB_DIR ?? './data'; export const CREATOR = process.env.USER!; -export const HOST_ID = process.env.RHIZOME_PEER_ID || randomUUID(); +export const PEER_ID = process.env.RHIZOME_PEER_ID || randomUUID(); export const ADDRESS = process.env.RHIZOME_ADDRESS ?? 'localhost'; export const REQUEST_BIND_ADDR = process.env.RHIZOME_REQUEST_BIND_ADDR || ADDRESS; export const REQUEST_BIND_PORT = parseInt(process.env.RHIZOME_REQUEST_BIND_PORT || '4000'); @@ -14,7 +14,7 @@ export const REQUEST_BIND_HOST = process.env.RHIZOME_REQUEST_BIND_HOST || REQUES export const PUBLISH_BIND_ADDR = process.env.RHIZOME_PUBLISH_BIND_ADDR || ADDRESS; export const PUBLISH_BIND_PORT = parseInt(process.env.RHIZOME_PUBLISH_BIND_PORT || '4001'); export const PUBLISH_BIND_HOST = process.env.RHIZOME_PUBLISH_BIND_HOST || PUBLISH_BIND_ADDR; -export const HTTP_API_ADDR = process.env.RHIZOME_HTTP_API_ADDR || ADDRESS || '127.0.0.1'; +export const HTTP_API_ADDR = process.env.RHIZOME_HTTP_API_ADDR || ADDRESS || 'localhost'; export const HTTP_API_PORT = parseInt(process.env.RHIZOME_HTTP_API_PORT || '3000'); export const HTTP_API_ENABLE = process.env.RHIZOME_HTTP_API_ENABLE === 'true'; export const SEED_PEERS: PeerAddress[] = (process.env.RHIZOME_SEED_PEERS || '').split(',') diff --git a/src/context.ts b/src/context.ts new file mode 100644 index 0000000..e69de29 diff --git a/src/deltas.ts b/src/deltas.ts index 021ac62..a4d9890 100644 --- a/src/deltas.ts +++ b/src/deltas.ts @@ -1,103 +1,94 @@ +import Debug from 'debug'; import EventEmitter from 'node:events'; import objectHash from 'object-hash'; -import {myRequestAddr} from './peers'; -import {publishSock, subscribeSock} from './pub-sub'; -import {Decision, Delta, PeerAddress} from './types'; -import Debug from 'debug'; +import {RhizomeNode} from './node'; +import {Decision, Delta} from './types'; const debug = Debug('deltas'); -export const deltaStream = new EventEmitter(); +export class DeltaStream { + rhizomeNode: RhizomeNode; + deltaStream = new EventEmitter(); + deltasProposed: Delta[] = []; + deltasAccepted: Delta[] = []; + deltasRejected: Delta[] = []; + deltasDeferred: Delta[] = []; + hashesReceived = new Set(); -export const deltasProposed: Delta[] = []; -export const deltasAccepted: Delta[] = []; -export const deltasRejected: Delta[] = []; -export const deltasDeferred: Delta[] = []; - -export const hashesReceived = new Set(); - -export function applyPolicy(delta: Delta): Decision { - return !!delta && Decision.Accept; -} - -export function receiveDelta(delta: Delta) { - // Deduplication: if we already received this delta, disregard it - const hash = objectHash(delta); - if (!hashesReceived.has(hash)) { - hashesReceived.add(hash); - deltasProposed.push(delta); + constructor(rhizomeNode: RhizomeNode) { + this.rhizomeNode = rhizomeNode; } -} -export function ingestDelta(delta: Delta) { - const decision = applyPolicy(delta); - switch (decision) { - case Decision.Accept: - deltasAccepted.push(delta); - deltaStream.emit('delta', {delta}); - break; - case Decision.Reject: - deltasRejected.push(delta); - break; - case Decision.Defer: - deltasDeferred.push(delta); - break; + applyPolicy(delta: Delta): Decision { + return !!delta && Decision.Accept; } -} -export function ingestNext(): boolean { - const delta = deltasProposed.shift(); - if (!delta) { - return false; - } - ingestDelta(delta); - return true; -} - -export function ingestAll() { - while (ingestNext()); -} - -export function ingestNextDeferred(): boolean { - const delta = deltasDeferred.shift(); - if (!delta) { - return false; - } - ingestDelta(delta); - return true; -} - -export function ingestAllDeferred() { - while (ingestNextDeferred()); -} - -export function subscribeDeltas(fn: (delta: Delta) => void) { - deltaStream.on('delta', ({delta}) => { - fn(delta); - }); -} - -export async function publishDelta(delta: Delta) { - debug(`Publishing delta: ${JSON.stringify(delta)}`); - await publishSock.send(["deltas", myRequestAddr.toAddrString(), serializeDelta(delta)]); -} - -function serializeDelta(delta: Delta) { - return JSON.stringify(delta); -} - -function deserializeDelta(input: string) { - return JSON.parse(input); -} - -export async function runDeltas() { - for await (const [topic, sender, msg] of subscribeSock) { - if (topic.toString() !== "deltas") { - continue; + receiveDelta(delta: Delta) { + // Deduplication: if we already received this delta, disregard it + const hash = objectHash(delta); + if (!this.hashesReceived.has(hash)) { + this.hashesReceived.add(hash); + this.deltasProposed.push(delta); } - const delta = deserializeDelta(msg.toString()); - delta.receivedFrom = PeerAddress.fromString(sender.toString()); - debug(`Received delta: ${JSON.stringify(delta)}`); - ingestDelta(delta); + } + + ingestDelta(delta: Delta) { + const decision = this.applyPolicy(delta); + switch (decision) { + case Decision.Accept: + this.deltasAccepted.push(delta); + this.deltaStream.emit('delta', {delta}); + break; + case Decision.Reject: + this.deltasRejected.push(delta); + break; + case Decision.Defer: + this.deltasDeferred.push(delta); + break; + } + } + + ingestNext(): boolean { + const delta = this.deltasProposed.shift(); + if (!delta) { + return false; + } + this.ingestDelta(delta); + return true; + } + + ingestAll() { + while (this.ingestNext()); + } + + ingestNextDeferred(): boolean { + const delta = this.deltasDeferred.shift(); + if (!delta) { + return false; + } + this.ingestDelta(delta); + return true; + } + + ingestAllDeferred() { + while (this.ingestNextDeferred()); + } + + subscribeDeltas(fn: (delta: Delta) => void) { + this.deltaStream.on('delta', ({delta}) => { + fn(delta); + }); + } + + async publishDelta(delta: Delta) { + debug(`Publishing delta: ${JSON.stringify(delta)}`); + await this.rhizomeNode.pubSub.publish("deltas", this.serializeDelta(delta)); + } + + serializeDelta(delta: Delta) { + return JSON.stringify(delta); + } + + deserializeDelta(input: string) { + return JSON.parse(input); } } - diff --git a/src/example-app.ts b/src/example-app.ts index d767986..057a473 100644 --- a/src/example-app.ts +++ b/src/example-app.ts @@ -1,12 +1,7 @@ -import {HTTP_API_ENABLE} from "./config"; -import {runDeltas} from "./deltas"; -import {runHttpApi} from "./http-api"; -import {Entity} from "./object-layer"; -import {askAllPeersForDeltas, subscribeToSeeds} from "./peers"; -import {bindPublish, } from "./pub-sub"; -import {bindReply, runRequestHandlers} from "./request-reply"; -import {TypedCollection} from "./typed-collection"; import Debug from 'debug'; +import {RhizomeNode} from "./node"; +import {Entity} from "./object-layer"; +import {TypedCollection} from "./typed-collection"; const debug = Debug('example-app'); // As an app we want to be able to write and read data. @@ -23,21 +18,9 @@ type User = { }; (async () => { - const users = new TypedCollection(); - - await bindPublish(); - await bindReply(); - if (HTTP_API_ENABLE) { - runHttpApi({users}); - } - - runDeltas(); - runRequestHandlers(); - await new Promise((resolve) => setTimeout(resolve, 500)); - subscribeToSeeds(); - await new Promise((resolve) => setTimeout(resolve, 500)); - askAllPeersForDeltas(); - await new Promise((resolve) => setTimeout(resolve, 1000)); + const rhizomeNode = new RhizomeNode(); + const users = new TypedCollection("users"); + users.rhizomeConnect(rhizomeNode); users.onUpdate((u: Entity) => { debug('User updated:', u); @@ -47,6 +30,9 @@ type User = { debug('New user!:', u); }); + + await rhizomeNode.start() + const taliesin = users.put(undefined, { // id: 'taliesin-1', name: 'Taliesin', diff --git a/src/http-api.ts b/src/http-api.ts index b01aeb6..76417df 100644 --- a/src/http-api.ts +++ b/src/http-api.ts @@ -1,25 +1,22 @@ import Debug from "debug"; import express from "express"; +import {FSWatcher} from "fs"; import {readdirSync, readFileSync, watch} from "fs"; +import {Server} from "http"; import path, {join} from "path"; import {Converter} from "showdown"; import {Collection} from "./collection"; -import {HTTP_API_ADDR, HTTP_API_PORT} from "./config"; -import {deltasAccepted} from "./deltas"; -import {peers} from "./peers"; +import {RhizomeNode} from "./node"; import {Delta} from "./types"; const debug = Debug('http-api'); -type CollectionsToServe = { - [key: string]: Collection; -}; - const docConverter = new Converter({ completeHTMLDocument: true, // simpleLineBreaks: true, tables: true, tasklists: true }); + const htmlDocFromMarkdown = (md: string): string => docConverter.makeHtml(md); type mdFileInfo = { @@ -31,6 +28,8 @@ type mdFileInfo = { class MDFiles { files = new Map(); readme?: mdFileInfo; + dirWatcher?: FSWatcher; + readmeWatcher?: FSWatcher; readFile(name: string) { const md = readFileSync(join('./markdown', `${name}.md`)).toString(); @@ -66,7 +65,7 @@ class MDFiles { } watchDir() { - watch('./markdown', null, (eventType, filename) => { + this.dirWatcher = watch('./markdown', null, (eventType, filename) => { if (!filename) return; if (!filename.endsWith(".md")) return; @@ -91,7 +90,7 @@ class MDFiles { } watchReadme() { - watch('./README.md', null, (eventType, filename) => { + this.readmeWatcher = watch('./README.md', null, (eventType, filename) => { if (!filename) return; switch (eventType) { @@ -104,127 +103,143 @@ class MDFiles { } }); } + + close() { + this.dirWatcher?.close(); + this.readmeWatcher?.close(); + } } -export function runHttpApi(collections?: CollectionsToServe) { - const app = express(); - app.use(express.json()); +export class HttpApi { + rhizomeNode: RhizomeNode; + app = express(); + mdFiles = new MDFiles(); + server?: Server; - // Get list of markdown files - const mdFiles = new MDFiles(); - mdFiles.readDir(); - mdFiles.readReadme(); - mdFiles.watchDir(); - mdFiles.watchReadme(); + constructor(rhizomeNode: RhizomeNode) { + this.rhizomeNode = rhizomeNode; + this.app.use(express.json()); + } - // Serve README - app.get('/html/README', (_req: express.Request, res: express.Response) => { - const html = mdFiles.getReadmeHTML(); - res.setHeader('content-type', 'text/html').send(html); - }); + start() { + // Scan and watch for markdown files + this.mdFiles.readDir(); + this.mdFiles.readReadme(); + this.mdFiles.watchDir(); + this.mdFiles.watchReadme(); - // Serve markdown files as html - app.get('/html/:name', (req: express.Request, res: express.Response) => { - let html = mdFiles.getHtml(req.params.name); - if (!html) { - res.status(404); - html = htmlDocFromMarkdown('# 404\n\n## [Index](/html)'); - } - res.setHeader('content-type', 'text/html'); - res.send(html); - }); - - // Serve index - { - let md = `# Files\n\n`; - md += `[README](/html/README)\n\n`; - for (const name of mdFiles.list()) { - md += `- [${name}](./${name})\n`; - } - const html = htmlDocFromMarkdown(md); - - app.get('/html', (_req: express.Request, res: express.Response) => { + // Serve README + this.app.get('/html/README', (_req: express.Request, res: express.Response) => { + const html = this.mdFiles.getReadmeHTML(); res.setHeader('content-type', 'text/html').send(html); }); + + // Serve markdown files as html + this.app.get('/html/:name', (req: express.Request, res: express.Response) => { + let html = this.mdFiles.getHtml(req.params.name); + if (!html) { + res.status(404); + html = htmlDocFromMarkdown('# 404\n\n## [Index](/html)'); + } + res.setHeader('content-type', 'text/html'); + res.send(html); + }); + + // Serve index + { + let md = `# Files\n\n`; + md += `[README](/html/README)\n\n`; + for (const name of this.mdFiles.list()) { + md += `- [${name}](./${name})\n`; + } + const html = htmlDocFromMarkdown(md); + + this.app.get('/html', (_req: express.Request, res: express.Response) => { + res.setHeader('content-type', 'text/html').send(html); + }); + } + + // Serve list of all deltas accepted + // TODO: This won't scale well + this.app.get("/deltas", (_req: express.Request, res: express.Response) => { + res.json(this.rhizomeNode.deltaStream.deltasAccepted); + }); + + // Get the number of deltas ingested by this node + this.app.get("/deltas/count", (_req: express.Request, res: express.Response) => { + res.json(this.rhizomeNode.deltaStream.deltasAccepted.length); + }); + + // Get the list of peers seen by this node (including itself) + this.app.get("/peers", (_req: express.Request, res: express.Response) => { + res.json(this.rhizomeNode.peers.peers.map(({reqAddr, publishAddr, isSelf, isSeedPeer}) => { + const deltasAcceptedCount = this.rhizomeNode.deltaStream.deltasAccepted + .filter((delta: Delta) => { + return delta.receivedFrom?.addr == reqAddr.addr && + delta.receivedFrom?.port == reqAddr.port; + }) + .length; + const peerInfo = { + reqAddr: reqAddr.toAddrString(), + publishAddr: publishAddr?.toAddrString(), + isSelf, + isSeedPeer, + deltaCount: { + accepted: deltasAcceptedCount + } + }; + return peerInfo; + })); + }); + + // Get the number of peers seen by this node (including itself) + this.app.get("/peers/count", (_req: express.Request, res: express.Response) => { + res.json(this.rhizomeNode.peers.peers.length); + }); + + const {httpAddr, httpPort} = this.rhizomeNode.config; + this.server = this.app.listen(httpPort, httpAddr, () => { + debug(`HTTP API bound to ${httpAddr}:${httpPort}`); + }); } - // Set up API routes + serveCollection(collection: Collection) { + const {name} = collection; - if (collections) { - for (const [name, collection] of Object.entries(collections)) { - debug(`collection: ${name}`); + // Get the ID of all domain entities + this.app.get(`/${name}/ids`, (_req: express.Request, res: express.Response) => { + res.json({ids: collection.getIds()}); + }); - // Get the ID of all domain entities - app.get(`/${name}/ids`, (_req: express.Request, res: express.Response) => { - res.json({ids: collection.getIds()}); - }); + // Get a single domain entity by ID + this.app.get(`/${name}/:id`, (req: express.Request, res: express.Response) => { + const {params: {id}} = req; + const ent = collection.get(id); + res.json(ent); + }); - // Get a single domain entity by ID - app.get(`/${name}/:id`, (req: express.Request, res: express.Response) => { - const {params: {id}} = req; - const ent = collection.get(id); - res.json(ent); - }); + // Add a new domain entity + // TODO: schema validation + this.app.put(`/${name}`, (req: express.Request, res: express.Response) => { + const {body: properties} = req; + const ent = collection.put(properties.id, properties); + res.json(ent); + }); - // Add a new domain entity - // TODO: schema validation - app.put(`/${name}`, (req: express.Request, res: express.Response) => { - const {body: properties} = req; - const ent = collection.put(properties.id, properties); - res.json(ent); - }); - - // Update a domain entity - app.put(`/${name}/:id`, (req: express.Request, res: express.Response) => { - const {body: properties, params: {id}} = req; - if (properties.id && properties.id !== id) { - res.status(400).json({error: "ID Mismatch", param: id, property: properties.id}); - return; - } - const ent = collection.put(id, properties); - res.json(ent); - }); - } + // Update a domain entity + this.app.put(`/${name}/:id`, (req: express.Request, res: express.Response) => { + const {body: properties, params: {id}} = req; + if (properties.id && properties.id !== id) { + res.status(400).json({error: "ID Mismatch", param: id, property: properties.id}); + return; + } + const ent = collection.put(id, properties); + res.json(ent); + }); } - app.get("/deltas", (_req: express.Request, res: express.Response) => { - // TODO: streaming - res.json(deltasAccepted); - }); - - // Get the number of deltas ingested by this node - app.get("/deltas/count", (_req: express.Request, res: express.Response) => { - res.json(deltasAccepted.length); - }); - - // Get the list of peers seen by this node (including itself) - app.get("/peers", (_req: express.Request, res: express.Response) => { - res.json(peers.map(({reqAddr, publishAddr, isSelf, isSeedPeer}) => { - const deltasAcceptedCount = deltasAccepted - .filter((delta: Delta) => { - return delta.receivedFrom?.addr == reqAddr.addr && - delta.receivedFrom?.port == reqAddr.port; - }) - .length; - const peerInfo = { - reqAddr: reqAddr.toAddrString(), - publishAddr: publishAddr?.toAddrString(), - isSelf, - isSeedPeer, - deltaCount: { - accepted: deltasAcceptedCount - } - }; - return peerInfo; - })); - }); - - // Get the number of peers seen by this node (including itself) - app.get("/peers/count", (_req: express.Request, res: express.Response) => { - res.json(peers.length); - }); - - app.listen(HTTP_API_PORT, HTTP_API_ADDR, () => { - debug(`HTTP API bound to http://${HTTP_API_ADDR}:${HTTP_API_PORT}`); - }); + async stop() { + this.server?.close(); + this.mdFiles.close(); + } } diff --git a/src/node.ts b/src/node.ts new file mode 100644 index 0000000..c57d33a --- /dev/null +++ b/src/node.ts @@ -0,0 +1,82 @@ +import Debug from 'debug'; +import {CREATOR, HTTP_API_ADDR, HTTP_API_ENABLE, HTTP_API_PORT, PEER_ID, PUBLISH_BIND_ADDR, PUBLISH_BIND_HOST, PUBLISH_BIND_PORT, REQUEST_BIND_ADDR, REQUEST_BIND_HOST, REQUEST_BIND_PORT, SEED_PEERS} from './config'; +import {DeltaStream} from './deltas'; +import {HttpApi} from './http-api'; +import {Peers} from './peers'; +import {PubSub} from './pub-sub'; +import {RequestReply} from './request-reply'; +import {PeerAddress} from './types'; +import {Collection} from './collection'; +const debug = Debug('rhizome-node'); + +export type RhizomeNodeConfig = { + requestBindAddr: string; + requestBindHost: string; + requestBindPort: number; + publishBindAddr: string; + publishBindHost: string; + publishBindPort: number; + httpAddr: string; + httpPort: number; + httpEnable: boolean; + seedPeers: PeerAddress[]; + peerId: string; + creator: string; // TODO each host should be able to support multiple users +}; + +// So that we can run more than one instance in the same process (for testing) +export class RhizomeNode { + config: RhizomeNodeConfig; + pubSub: PubSub; + requestReply: RequestReply; + httpApi: HttpApi; + deltaStream: DeltaStream; + peers: Peers; + myRequestAddr: PeerAddress; + myPublishAddr: PeerAddress; + + constructor(config?: Partial) { + this.config = { + requestBindAddr: REQUEST_BIND_ADDR, + requestBindHost: REQUEST_BIND_HOST, + requestBindPort: REQUEST_BIND_PORT, + publishBindAddr: PUBLISH_BIND_ADDR, + publishBindHost: PUBLISH_BIND_HOST, + publishBindPort: PUBLISH_BIND_PORT, + httpAddr: HTTP_API_ADDR, + httpPort: HTTP_API_PORT, + httpEnable: HTTP_API_ENABLE, + seedPeers: SEED_PEERS, + peerId: PEER_ID, + creator: CREATOR, + ...config + }; + debug('config', this.config); + this.myRequestAddr = new PeerAddress(this.config.requestBindHost, this.config.requestBindPort); + this.myPublishAddr = new PeerAddress(this.config.publishBindHost, this.config.publishBindPort); + this.pubSub = new PubSub(this); + this.requestReply = new RequestReply(this); + this.httpApi = new HttpApi(this); + this.deltaStream = new DeltaStream(this); + this.peers = new Peers(this); + } + + async start() { + this.pubSub.start(); + this.requestReply.start(); + if (this.config.httpEnable) { + this.httpApi.start(); + } + await new Promise((resolve) => setTimeout(resolve, 500)); + this.peers.subscribeToSeeds(); + await new Promise((resolve) => setTimeout(resolve, 500)); + this.peers.askAllPeersForDeltas(); + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + + async stop() { + await this.pubSub.stop(); + await this.requestReply.stop(); + await this.httpApi.stop(); + } +} diff --git a/src/object-layer.ts b/src/object-layer.ts index 4ab91d5..bab164d 100644 --- a/src/object-layer.ts +++ b/src/object-layer.ts @@ -1,14 +1,14 @@ // The goal here is to provide a translation for // entities and their properties // to and from (sequences of) deltas. - + // How can our caller define the entities and their properties? // - As typescript types? // - As typescript interfaces? // - As typescript classes? -import { CREATOR, HOST_ID } from "./config"; -import { Delta, PropertyTypes } from "./types"; +import {RhizomeNode} from "./node"; +import {Delta, PropertyTypes} from "./types"; export type EntityProperties = { [key: string]: PropertyTypes; @@ -29,10 +29,10 @@ export class Entity { export class EntityPropertiesDeltaBuilder { delta: Delta; - constructor(entityId: string) { + constructor(rhizomeNode: RhizomeNode, entityId: string) { this.delta = { - creator: CREATOR, - host: HOST_ID, + creator: rhizomeNode.config.creator, + host: rhizomeNode.config.peerId, pointers: [{ localContext: 'id', target: entityId, diff --git a/src/peers.ts b/src/peers.ts index 3dfc0a3..82f0331 100644 --- a/src/peers.ts +++ b/src/peers.ts @@ -1,9 +1,10 @@ -import {PUBLISH_BIND_HOST, PUBLISH_BIND_PORT, REQUEST_BIND_HOST, REQUEST_BIND_PORT, SEED_PEERS} from "./config"; -import {deltasAccepted, ingestAll, receiveDelta} from "./deltas"; -import {connectSubscribe} from "./pub-sub"; -import {PeerRequest, registerRequestHandler, RequestSocket, ResponseSocket} from "./request-reply"; -import {Delta, PeerAddress} from "./types"; import Debug from 'debug'; +import {Message} from 'zeromq'; +import {SEED_PEERS} from "./config"; +import {RhizomeNode} from "./node"; +import {Subscription} from './pub-sub'; +import {PeerRequest, RequestSocket, ResponseSocket} from "./request-reply"; +import {Delta, PeerAddress} from "./types"; const debug = Debug('peers'); export enum PeerMethods { @@ -11,48 +12,50 @@ export enum PeerMethods { AskForDeltas } -export const myRequestAddr = new PeerAddress(REQUEST_BIND_HOST, REQUEST_BIND_PORT); -export const myPublishAddr = new PeerAddress(PUBLISH_BIND_HOST, PUBLISH_BIND_PORT); - -registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => { - debug('inspecting peer request'); - switch (req.method) { - case PeerMethods.GetPublishAddress: { - debug('it\'s a request for our publish address'); - await res.send(myPublishAddr.toAddrString()); - break; - } - case PeerMethods.AskForDeltas: { - debug('it\'s a request for deltas'); - // TODO: stream these rather than - // trying to write them all in one message - await res.send(JSON.stringify(deltasAccepted)); - break; - } - } -}); - class Peer { + rhizomeNode: RhizomeNode; reqAddr: PeerAddress; - reqSock: RequestSocket; + reqSock?: RequestSocket; publishAddr: PeerAddress | undefined; isSelf: boolean; isSeedPeer: boolean; - constructor(addr: string, port: number) { - this.reqAddr = new PeerAddress(addr, port); - this.reqSock = new RequestSocket(addr, port); - this.isSelf = addr === myRequestAddr.addr && port === myRequestAddr.port; - this.isSeedPeer = !!SEED_PEERS.find((seedPeer) => - addr === seedPeer.addr && port === seedPeer.port); + subscription?: Subscription; + + constructor(rhizomeNode: RhizomeNode, reqAddr: PeerAddress) { + this.rhizomeNode = rhizomeNode; + this.reqAddr = reqAddr; + this.isSelf = reqAddr.isEqual(this.rhizomeNode.myRequestAddr); + this.isSeedPeer = !!SEED_PEERS.find((seedPeer) => reqAddr.isEqual(seedPeer)); } - async subscribe() { - if (!this.publishAddr) { - const res = await this.reqSock.request(PeerMethods.GetPublishAddress); - // TODO: input validation - this.publishAddr = PeerAddress.fromString(res.toString()); - connectSubscribe(this.publishAddr!); + + async request(method: PeerMethods): Promise { + if (!this.reqSock) { + this.reqSock = new RequestSocket(this.reqAddr); } + return this.reqSock.request(method); } + + async subscribeDeltas() { + if (!this.publishAddr) { + debug(`requesting publish addr from peer ${this.reqAddr.toAddrString()}`); + const res = await this.request(PeerMethods.GetPublishAddress); + this.publishAddr = PeerAddress.fromString(res.toString()); + debug(`received publish addr ${this.publishAddr.toAddrString()} from peer ${this.reqAddr.toAddrString()}`); + } + + this.subscription = this.rhizomeNode.pubSub.subscribe( + this.publishAddr, + "deltas", + (sender, msg) => { + const delta = this.rhizomeNode.deltaStream.deserializeDelta(msg.toString()); + delta.receivedFrom = sender; + debug(`Received delta: ${JSON.stringify(delta)}`); + this.rhizomeNode.deltaStream.ingestDelta(delta); + }); + + this.subscription.start(); + } + async askForDeltas(): Promise { // TODO as a first approximation we are trying to cram the entire history // of accepted deltas, into one (potentially gargantuan) json message. @@ -60,42 +63,69 @@ class Peer { // Third pass should find a way to reduce the number of deltas transmitted. // TODO: requestTimeout - const res = await this.reqSock.request(PeerMethods.AskForDeltas); + const res = await this.request(PeerMethods.AskForDeltas); const deltas = JSON.parse(res.toString()); return deltas; } } -export const peers: Peer[] = []; +export class Peers { + rhizomeNode: RhizomeNode; + peers: Peer[] = []; -peers.push(new Peer(myRequestAddr.addr, myRequestAddr.port)); + constructor(rhizomeNode: RhizomeNode) { + this.rhizomeNode = rhizomeNode; -function newPeer(addr: string, port: number) { - const peer = new Peer(addr, port); - peers.push(peer); - return peer; -} + // Add self to the list of peers, but don't connect + this.addPeer(this.rhizomeNode.myRequestAddr); -export async function subscribeToSeeds() { - SEED_PEERS.forEach(async ({addr, port}, idx) => { - debug(`SEED PEERS[${idx}]=${addr}:${port}`); - const peer = newPeer(addr, port); - await peer.subscribe(); - }); -} - -//! TODO Expect abysmal scaling properties with this function -export async function askAllPeersForDeltas() { - peers - .filter(({isSelf}) => !isSelf) - .forEach(async (peer, idx) => { - debug(`Asking peer ${idx} for deltas`); - const deltas = await peer.askForDeltas(); - debug(`received ${deltas.length} deltas from ${peer.reqAddr.toAddrString()}`); - for (const delta of deltas) { - delta.receivedFrom = peer.reqAddr; - receiveDelta(delta); + this.rhizomeNode.requestReply.registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => { + debug('inspecting peer request'); + switch (req.method) { + case PeerMethods.GetPublishAddress: { + debug('it\'s a request for our publish address'); + await res.send(this.rhizomeNode.myPublishAddr.toAddrString()); + break; + } + case PeerMethods.AskForDeltas: { + debug('it\'s a request for deltas'); + // TODO: stream these rather than + // trying to write them all in one message + await res.send(JSON.stringify(this.rhizomeNode.deltaStream.deltasAccepted)); + break; + } } - ingestAll(); }); + + } + + addPeer(addr: PeerAddress): Peer { + const peer = new Peer(this.rhizomeNode, addr); + this.peers.push(peer); + debug('added peer', addr); + return peer; + } + + async subscribeToSeeds() { + SEED_PEERS.forEach(async (addr, idx) => { + debug(`SEED PEERS[${idx}]=${addr.toAddrString()}`); + const peer = this.addPeer(addr); + await peer.subscribeDeltas(); + }); + } + + //! TODO Expect abysmal scaling properties with this function + async askAllPeersForDeltas() { + this.peers.filter(({isSelf}) => !isSelf) + .forEach(async (peer, idx) => { + debug(`Asking peer ${idx} for deltas`); + const deltas = await peer.askForDeltas(); + debug(`received ${deltas.length} deltas from ${peer.reqAddr.toAddrString()}`); + for (const delta of deltas) { + delta.receivedFrom = peer.reqAddr; + this.rhizomeNode.deltaStream.receiveDelta(delta); + } + this.rhizomeNode.deltaStream.ingestAll(); + }); + } } diff --git a/src/pub-sub.ts b/src/pub-sub.ts index 4356241..5286255 100644 --- a/src/pub-sub.ts +++ b/src/pub-sub.ts @@ -1,23 +1,79 @@ -import {Publisher, Subscriber} from 'zeromq'; -import {PUBLISH_BIND_ADDR, PUBLISH_BIND_PORT} from './config'; -import {PeerAddress} from './types'; import Debug from 'debug'; +import {Message, Publisher, Subscriber} from 'zeromq'; +import {RhizomeNode} from './node'; +import {PeerAddress} from './types'; const debug = Debug('pub-sub'); -export const publishSock = new Publisher(); -export const subscribeSock = new Subscriber(); +export type SubscribedMessageHandler = (sender: PeerAddress, msg: Message) => void; -export async function bindPublish() { - const addrStr = `tcp://${PUBLISH_BIND_ADDR}:${PUBLISH_BIND_PORT}`; - await publishSock.bind(addrStr); - debug(`Publishing socket bound to ${addrStr}`); +// TODO: Allow subscribing to multiple topics on one socket +export class Subscription { + sock = new Subscriber(); + topic: string; + publishAddr: PeerAddress; + publishAddrStr: string; + cb: SubscribedMessageHandler; + + constructor(publishAddr: PeerAddress, topic: string, cb: SubscribedMessageHandler) { + this.cb = cb; + this.topic = topic; + this.publishAddr = publishAddr; + this.publishAddrStr = `tcp://${this.publishAddr.toAddrString()}`; + } + + async start() { + this.sock.connect(this.publishAddrStr); + this.sock.subscribe(this.topic); + debug(`Subscribing to ${this.topic} topic on ${this.publishAddrStr}`); + + for await (const [, sender, msg] of this.sock) { + const senderAddr = PeerAddress.fromString(sender.toString()); + this.cb(senderAddr, msg); + } + } } -export function connectSubscribe(publishAddr: PeerAddress) { - // TODO: peer discovery - const addrStr = `tcp://${publishAddr.toAddrString()}`; - debug('connectSubscribe', {addrStr}); - subscribeSock.connect(addrStr); - subscribeSock.subscribe("deltas"); - debug(`Subscribing to ${addrStr}`); +export class PubSub { + rhizomeNode: RhizomeNode; + publishSock: Publisher; + publishAddrStr: string; + subscriptions: Subscription[] = []; + + constructor(rhizomeNode: RhizomeNode) { + this.rhizomeNode = rhizomeNode; + this.publishSock = new Publisher(); + + const {publishBindAddr, publishBindPort} = this.rhizomeNode.config; + this.publishAddrStr = `tcp://${publishBindAddr}:${publishBindPort}`; + } + + async start() { + await this.publishSock.bind(this.publishAddrStr); + debug(`Publishing socket bound to ${this.publishAddrStr}`); + } + + async publish(topic: string, msg: string) { + await this.publishSock.send([ + topic, + this.rhizomeNode.myRequestAddr.toAddrString(), + msg + ]); + } + + subscribe(publishAddr: PeerAddress, topic: string, cb: SubscribedMessageHandler): Subscription { + const subscription = new Subscription(publishAddr, topic, cb); + this.subscriptions.push(subscription); + return subscription; + } + + async stop() { + await this.publishSock.unbind(this.publishAddrStr); + this.publishSock.close(); + this.publishSock = new Publisher(); + + for (const subscription of this.subscriptions) { + subscription.sock.close(); + debug('subscription socket is closed?', subscription.sock.closed); + } + } } diff --git a/src/query.ts b/src/query.ts deleted file mode 100644 index aa3199e..0000000 --- a/src/query.ts +++ /dev/null @@ -1,15 +0,0 @@ -import { Query, QueryResult, } from './types'; -import { deltasAccepted } from './deltas'; -import { applyFilter } from './filter'; - -// export const queryResultMemo = new Map(); - -export function issueQuery(query: Query): QueryResult { - const deltas = applyFilter(deltasAccepted, query.filterExpr); - return { - deltas - // TODO: Materialized view / state collapse snapshot - }; -} - - diff --git a/src/request-reply.ts b/src/request-reply.ts index 6a5bb33..029e912 100644 --- a/src/request-reply.ts +++ b/src/request-reply.ts @@ -1,8 +1,9 @@ -import { Request, Reply, Message } from 'zeromq'; -import { REQUEST_BIND_PORT, REQUEST_BIND_ADDR} from './config'; -import { EventEmitter } from 'node:events'; -import { PeerMethods } from './peers'; +import {Request, Reply, Message} from 'zeromq'; +import {EventEmitter} from 'node:events'; +import {PeerMethods} from './peers'; import Debug from 'debug'; +import {RhizomeNode} from './node'; +import {PeerAddress} from './types'; const debug = Debug('request-reply'); export type PeerRequest = { @@ -11,32 +12,28 @@ export type PeerRequest = { export type RequestHandler = (req: PeerRequest, res: ResponseSocket) => void; -export const replySock = new Reply(); -const requestStream = new EventEmitter(); +// TODO: Retain handle to request socket for each peer, so we only need to open once +export class RequestSocket { + sock = new Request(); -export async function bindReply() { - const addrStr = `tcp://${REQUEST_BIND_ADDR}:${REQUEST_BIND_PORT}`; - await replySock.bind(addrStr); - debug(`Reply socket bound to ${addrStr}`); -} - -export async function runRequestHandlers() { - for await (const [msg] of replySock) { - debug(`Received message`, {msg: msg.toString()}); - const req = peerRequestFromMsg(msg); - requestStream.emit('request', req); + constructor(addr: PeerAddress) { + const addrStr = `tcp://${addr.addr}:${addr.port}`; + this.sock.connect(addrStr); + debug(`Request socket connecting to ${addrStr}`); } -} -function peerRequestFromMsg(msg: Message): PeerRequest | null { - let req: PeerRequest | null = null; - try { - const obj = JSON.parse(msg.toString()); - req = {...obj}; - } catch(e) { - debug('error receiving command', e); + async request(method: PeerMethods): Promise { + const req: PeerRequest = { + method + }; + await this.sock.send(JSON.stringify(req)); + // Wait for a response. + // TODO: Timeout + // TODO: Retry + // this.sock.receiveTimeout = ... + const [res] = await this.sock.receive(); + return res; } - return req; } export class ResponseSocket { @@ -53,30 +50,54 @@ export class ResponseSocket { } } -export function registerRequestHandler(handler: RequestHandler) { - requestStream.on('request', (req) => { - const res = new ResponseSocket(replySock); - handler(req, res); - }); +function peerRequestFromMsg(msg: Message): PeerRequest | null { + let req: PeerRequest | null = null; + try { + const obj = JSON.parse(msg.toString()); + req = {...obj}; + } catch (e) { + debug('error receiving command', e); + } + return req; } -export class RequestSocket { - sock = new Request(); - constructor(host: string, port: number) { - const addrStr = `tcp://${host}:${port}`; - this.sock.connect(addrStr); - debug(`Request socket connecting to ${addrStr}`); +export class RequestReply { + rhizomeNode: RhizomeNode; + replySock = new Reply(); + requestStream = new EventEmitter(); + requestBindAddrStr: string; + + constructor(rhizomeNode: RhizomeNode) { + this.rhizomeNode = rhizomeNode; + const {requestBindAddr, requestBindPort} = this.rhizomeNode.config; + this.requestBindAddrStr = `tcp://${requestBindAddr}:${requestBindPort}`; } - async request(method: PeerMethods): Promise { - const req: PeerRequest = { - method - }; - await this.sock.send(JSON.stringify(req)); - // Wait for a response. - // TODO: Timeout - // TODO: Retry - // this.sock.receiveTimeout = ... - const [res] = await this.sock.receive(); - return res; + + // Listen for incoming requests + async start() { + + await this.replySock.bind(this.requestBindAddrStr); + debug(`Reply socket bound to ${this.requestBindAddrStr}`); + + for await (const [msg] of this.replySock) { + debug(`Received message`, {msg: msg.toString()}); + const req = peerRequestFromMsg(msg); + this.requestStream.emit('request', req); + } + } + + // Add a top level handler for incoming requests. + // Each handler will get a copy of every message. + registerRequestHandler(handler: RequestHandler) { + this.requestStream.on('request', (req) => { + const res = new ResponseSocket(this.replySock); + handler(req, res); + }); + } + + async stop() { + await this.replySock.unbind(this.requestBindAddrStr); + this.replySock.close(); + this.replySock = new Reply(); } } diff --git a/src/typed-collection.ts b/src/typed-collection.ts index fec9742..28c188c 100644 --- a/src/typed-collection.ts +++ b/src/typed-collection.ts @@ -5,6 +5,7 @@ export class TypedCollection extends Collection { put(id: string | undefined, properties: T): Entity { return super.put(id, properties); } + get(id: string): Entity | undefined { return super.get(id); } diff --git a/src/types.ts b/src/types.ts index e72b124..816347e 100644 --- a/src/types.ts +++ b/src/types.ts @@ -46,19 +46,27 @@ export type Properties = {[key: string]: PropertyTypes}; export class PeerAddress { addr: string; port: number; + constructor(addr: string, port: number) { this.addr = addr; this.port = port; } + static fromString(addrString: string): PeerAddress { const [addr, port] = addrString.trim().split(':'); return new PeerAddress(addr, parseInt(port)); } + toAddrString() { return `${this.addr}:${this.port}`; } + toJSON() { return this.toAddrString(); } + + isEqual(other: PeerAddress) { + return this.addr === other.addr && this.port === other.port; + } };