keeping track of peers from whom deltas are received

This commit is contained in:
Ladd Hoffman 2024-12-22 14:00:51 -06:00
parent 518bc4eb44
commit e24be625d1
13 changed files with 3482 additions and 87 deletions

13
__tests__/peer-address.ts Normal file
View File

@ -0,0 +1,13 @@
import {PeerAddress} from '../src/types';
describe('PeerAddress', () => {
it('toString()', () => {
const addr = new PeerAddress('localhost', 1000);
expect(addr.toString()).toBe("localhost:1000");
});
it('fromString()', () => {
const addr = PeerAddress.fromString("localhost:1000");
expect(addr.addr).toBe("localhost");
expect(addr.port).toBe(1000);
});
});

View File

@ -4,4 +4,9 @@ import tseslint from 'typescript-eslint';
export default tseslint.config( export default tseslint.config(
eslint.configs.recommended, eslint.configs.recommended,
tseslint.configs.recommended, tseslint.configs.recommended,
{
ignores: [
"dist/",
],
}
); );

3339
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -5,7 +5,11 @@
"scripts": { "scripts": {
"start": "node --experimental-strip-types --experimental-transform-types src/main.ts", "start": "node --experimental-strip-types --experimental-transform-types src/main.ts",
"lint": "eslint", "lint": "eslint",
"test": "echo \"Error: no test specified\" && exit 1" "test": "jest"
},
"jest": {
"testEnvironment": "node",
"preset": "ts-jest"
}, },
"author": "", "author": "",
"license": "Unlicense", "license": "Unlicense",
@ -20,9 +24,12 @@
"devDependencies": { "devDependencies": {
"@eslint/js": "^9.17.0", "@eslint/js": "^9.17.0",
"@types/express": "^5.0.0", "@types/express": "^5.0.0",
"@types/jest": "^29.5.14",
"@types/node": "^22.10.2", "@types/node": "^22.10.2",
"eslint": "^9.17.0", "eslint": "^9.17.0",
"eslint-config-airbnb-base-typescript": "^1.1.0", "eslint-config-airbnb-base-typescript": "^1.1.0",
"jest": "^29.7.0",
"ts-jest": "^29.2.5",
"typescript": "^5.7.2", "typescript": "^5.7.2",
"typescript-eslint": "^8.18.0" "typescript-eslint": "^8.18.0"
} }

View File

@ -2,9 +2,8 @@
// It should enable operations like removing a property removes the value from the entities in the collection // It should enable operations like removing a property removes the value from the entities in the collection
// It could then be further extended with e.g. table semantics like filter, sort, join // It could then be further extended with e.g. table semantics like filter, sort, join
import EventEmitter from "node:events"; import EventEmitter from "node:events";
import { publishDelta, subscribeDeltas } from "./deltas"; import { deltasAccepted, publishDelta, subscribeDeltas } from "./deltas";
import { Entity, EntityProperties, EntityPropertiesDeltaBuilder } from "./object-layer"; import { Entity, EntityProperties, EntityPropertiesDeltaBuilder } from "./object-layer";
import { Delta } from "./types"; import { Delta } from "./types";
import { randomUUID } from "node:crypto"; import { randomUUID } from "node:crypto";
@ -45,10 +44,8 @@ export class Collection {
entities = new Map<string, Entity>(); entities = new Map<string, Entity>();
eventStream = new EventEmitter(); eventStream = new EventEmitter();
constructor() { constructor() {
console.log('COLLECTION SUBSCRIBING TO DELTA STREAM');
subscribeDeltas((delta: Delta) => { subscribeDeltas((delta: Delta) => {
// TODO: Make sure this is the kind of delta we're looking for // TODO: Make sure this is the kind of delta we're looking for
console.log('COLLECTION RECEIVED DELTA');
this.applyDelta(delta); this.applyDelta(delta);
}); });
this.eventStream.on('create', (entity: Entity) => { this.eventStream.on('create', (entity: Entity) => {
@ -69,6 +66,7 @@ export class Collection {
eventType = 'create'; eventType = 'create';
} }
const deltaBulider = new EntityPropertiesDeltaBuilder(entityId); const deltaBulider = new EntityPropertiesDeltaBuilder(entityId);
console.log('deltaBulider -->', deltaBulider.delta);
if (!properties) { if (!properties) {
// Let's interpret this as entity deletion // Let's interpret this as entity deletion
@ -157,6 +155,7 @@ export class Collection {
const deltas: Delta[] = []; const deltas: Delta[] = [];
const entity = this.updateEntity(entityId, properties, true, deltas); const entity = this.updateEntity(entityId, properties, true, deltas);
deltas.forEach(async (delta: Delta) => { deltas.forEach(async (delta: Delta) => {
deltasAccepted.push(delta);
await publishDelta(delta); await publishDelta(delta);
}); });
return entity; return entity;
@ -165,6 +164,7 @@ export class Collection {
const deltas: Delta[] = []; const deltas: Delta[] = [];
this.updateEntity(entityId, undefined, true, deltas); this.updateEntity(entityId, undefined, true, deltas);
deltas.forEach(async (delta: Delta) => { deltas.forEach(async (delta: Delta) => {
deltasAccepted.push(delta);
await publishDelta(delta); await publishDelta(delta);
}); });
} }

View File

@ -1,17 +1,19 @@
import {randomUUID} from "crypto";
import {PeerAddress} from "./types";
export const LEVEL_DB_DIR = process.env.RHIZOME_LEVEL_DB_DIR ?? './data'; export const LEVEL_DB_DIR = process.env.RHIZOME_LEVEL_DB_DIR ?? './data';
export const CREATOR = process.env.USER!; export const CREATOR = process.env.USER!;
export const HOST = process.env.HOST!; export const HOST_ID = process.env.RHIZOME_PEER_ID || randomUUID();
export const ADDRESS = process.env.ADDRESS ?? '127.0.0.1'; export const ADDRESS = process.env.RHIZOME_ADDRESS ?? '127.0.0.1';
export const REQUEST_BIND_PORT = parseInt(process.env.REQUEST_BIND_PORT || '4000'); export const REQUEST_BIND_PORT = parseInt(process.env.RHIZOME_REQUEST_BIND_PORT || '4000');
export const PUBLISH_BIND_PORT = parseInt(process.env.PUBLISH_BIND_PORT || '4001'); export const REQUEST_BIND_ADDR = process.env.RHIZOME_REQUEST_BIND_ADDR || ADDRESS || '127.0.0.1';
export const REQUEST_BIND_ADDR = process.env.ADDRESS || '127.0.0.1'; export const REQUEST_BIND_HOST = process.env.RHIZOME_REQUEST_BIND_HOST || REQUEST_BIND_ADDR || '127.0.0.1';
export const PUBLISH_BIND_ADDR = process.env.ADDRESS || '127.0.0.1'; export const PUBLISH_BIND_PORT = parseInt(process.env.RHIZOME_PUBLISH_BIND_PORT || '4001');
export const HTTP_API_PORT = parseInt(process.env.HTTP_API_PORT || '3000'); export const PUBLISH_BIND_ADDR = process.env.RHIZOME_PUBLISH_BIND_ADDR || ADDRESS || '127.0.0.1';
export const HTTP_API_ADDR = process.env.ADDRESS || '127.0.0.1'; export const PUBLISH_BIND_HOST = process.env.RHIZOME_PUBLISH_BIND_HOST || PUBLISH_BIND_ADDR || '127.0.0.1';
export const ENABLE_HTTP_API = process.env.ENABLE_HTTP_API === 'true'; export const HTTP_API_PORT = parseInt(process.env.RHIZOME_HTTP_API_PORT || '3000');
export const SEED_PEERS = (process.env.SEED_PEERS || '').split(',') export const HTTP_API_ADDR = process.env.RHIZOME_HTTP_API_ADDR || ADDRESS || '127.0.0.1';
export const HTTP_API_ENABLE = process.env.RHIZOME_HTTP_API_ENABLE === 'true';
export const SEED_PEERS: PeerAddress[] = (process.env.RHIZOME_SEED_PEERS || '').split(',')
.filter(x => !!x) .filter(x => !!x)
.map((peer: string) => { .map((peer: string) => PeerAddress.fromString(peer));
const [addr, port] = peer.trim().split(':');
return {addr, port: parseInt(port)};
});

View File

@ -1,6 +1,8 @@
import EventEmitter from 'node:events'; import EventEmitter from 'node:events';
import { Delta, Decision } from './types'; import {REQUEST_BIND_HOST, REQUEST_BIND_PORT} from './config';
import {publishSock, subscribeSock} from './pub-sub'; import {publishSock, subscribeSock} from './pub-sub';
import {Decision, Delta, PeerAddress} from './types';
import {myRequestAddr} from './peers';
export const deltaStream = new EventEmitter(); export const deltaStream = new EventEmitter();
@ -67,7 +69,7 @@ export function subscribeDeltas(fn: (delta: Delta) => void) {
export async function publishDelta(delta: Delta) { export async function publishDelta(delta: Delta) {
console.log(`Publishing delta: ${JSON.stringify(delta)}`); console.log(`Publishing delta: ${JSON.stringify(delta)}`);
await publishSock.send(["deltas", serializeDelta(delta)]) await publishSock.send(["deltas", myRequestAddr.toAddrString(), serializeDelta(delta)]);
} }
function serializeDelta(delta: Delta) { function serializeDelta(delta: Delta) {
@ -79,11 +81,12 @@ function deserializeDelta(input: string) {
} }
export async function runDeltas() { export async function runDeltas() {
for await (const [topic, msg] of subscribeSock) { for await (const [topic, sender, msg] of subscribeSock) {
if (topic.toString() !== "deltas") { if (topic.toString() !== "deltas") {
continue; continue;
} }
const delta = deserializeDelta(msg.toString()); const delta = deserializeDelta(msg.toString());
delta.receivedFrom = PeerAddress.fromString(sender.toString());
console.log(`Received delta: ${JSON.stringify(delta)}`); console.log(`Received delta: ${JSON.stringify(delta)}`);
ingestDelta(delta); ingestDelta(delta);
} }

View File

@ -1,14 +1,14 @@
// We can start to use deltas to express relational data in a given context // We can start to use deltas to express relational data in a given context
import express from "express"; import express from "express";
import { bindPublish, } from "./pub-sub"; import {Collection} from "./collection-layer";
import {HTTP_API_ENABLE, HTTP_API_ADDR, HTTP_API_PORT, SEED_PEERS} from "./config";
import {deltasAccepted, deltasProposed, runDeltas} from "./deltas"; import {deltasAccepted, deltasProposed, runDeltas} from "./deltas";
import {Entity} from "./object-layer"; import {Entity} from "./object-layer";
import { Collection } from "./collection-layer"; import {askAllPeersForDeltas, peers, subscribeToSeeds} from "./peers";
import {bindPublish, } from "./pub-sub";
import {bindReply, runRequestHandlers} from "./request-reply"; import {bindReply, runRequestHandlers} from "./request-reply";
import { askAllPeersForDeltas, subscribeToSeeds } from "./peers"; import {Delta, PeerAddress} from "./types";
import { ENABLE_HTTP_API, HTTP_API_ADDR, HTTP_API_PORT } from "./config";
// As an app we want to be able to write and read data. // As an app we want to be able to write and read data.
// The data is whatever shape we define it to be in a given context. // The data is whatever shape we define it to be in a given context.
@ -16,7 +16,6 @@ import { ENABLE_HTTP_API, HTTP_API_ADDR, HTTP_API_PORT } from "./config";
// e.g. entities and their properties. // e.g. entities and their properties.
// This implies at least one layer on top of the underlying primitive deltas. // This implies at least one layer on top of the underlying primitive deltas.
type UserProperties = { type UserProperties = {
id?: string; id?: string;
name: string; name: string;
@ -48,7 +47,9 @@ class Users {
} }
(async () => { (async () => {
console.log('1');
const users = new Users(); const users = new Users();
console.log('2');
const app = express() const app = express()
app.get("/ids", (req: express.Request, res: express.Response) => { app.get("/ids", (req: express.Request, res: express.Response) => {
@ -65,14 +66,39 @@ class Users {
res.json(deltasAccepted.length); res.json(deltasAccepted.length);
}); });
if (ENABLE_HTTP_API) { app.get("/peers", (req: express.Request, res: express.Response) => {
res.json(peers.map(({reqAddr, publishAddr}) => {
const isSeedPeer = !!SEED_PEERS.find(({addr, port}) =>
addr === reqAddr.addr && port === reqAddr.port);
const deltasAcceptedCount = deltasAccepted
.filter((delta: Delta) => {
return delta.receivedFrom?.addr == reqAddr.addr &&
delta.receivedFrom?.port == reqAddr.port;
})
.length;
const peerInfo = {
reqAddr: reqAddr.toAddrString(),
publishAddr: publishAddr?.toAddrString(),
isSeedPeer,
deltaCount: {
accepted: deltasAcceptedCount
}
};
return peerInfo;
}));
});
if (HTTP_API_ENABLE) {
app.listen(HTTP_API_PORT, HTTP_API_ADDR, () => { app.listen(HTTP_API_PORT, HTTP_API_ADDR, () => {
console.log(`HTTP API bound to http://${HTTP_API_ADDR}:${HTTP_API_PORT}`); console.log(`HTTP API bound to http://${HTTP_API_ADDR}:${HTTP_API_PORT}`);
}); });
} }
console.log('3');
await bindPublish(); await bindPublish();
console.log('3a');
await bindReply(); await bindReply();
console.log('3b');
runDeltas(); runDeltas();
runRequestHandlers(); runRequestHandlers();
await new Promise((resolve) => setTimeout(resolve, 500)); await new Promise((resolve) => setTimeout(resolve, 500));
@ -81,11 +107,7 @@ class Users {
askAllPeersForDeltas(); askAllPeersForDeltas();
await new Promise((resolve) => setTimeout(resolve, 1000)); await new Promise((resolve) => setTimeout(resolve, 1000));
setInterval(() => { console.log('4');
console.log('deltasProposed count', deltasProposed.length,
'deltasAccepted count', deltasAccepted.length);
}, 5000)
const taliesin = users.upsert({ const taliesin = users.upsert({
// id: 'taliesin-1', // id: 'taliesin-1',
name: 'Taliesin', name: 'Taliesin',

View File

@ -1,6 +1,6 @@
import express from "express"; import express from "express";
import { runDeltas } from "./deltas"; import { runDeltas } from "./deltas";
import {ENABLE_HTTP_API, HTTP_API_ADDR, HTTP_API_PORT} from "./config"; import {HTTP_API_ENABLE, HTTP_API_ADDR, HTTP_API_PORT} from "./config";
const app = express() const app = express()
@ -8,7 +8,7 @@ app.get("/", (req: express.Request, res: express.Response) => {
res.json({ message: "Welcome to the Express + TypeScript Server!" }); res.json({ message: "Welcome to the Express + TypeScript Server!" });
}); });
if (ENABLE_HTTP_API) { if (HTTP_API_ENABLE) {
app.listen(HTTP_API_PORT, HTTP_API_ADDR, () => { app.listen(HTTP_API_PORT, HTTP_API_ADDR, () => {
console.log(`HTTP API bound to http://${HTTP_API_ADDR}:${HTTP_API_PORT}`); console.log(`HTTP API bound to http://${HTTP_API_ADDR}:${HTTP_API_PORT}`);
}); });

View File

@ -7,7 +7,7 @@
// - As typescript interfaces? // - As typescript interfaces?
// - As typescript classes? // - As typescript classes?
import { CREATOR, HOST } from "./config"; import { CREATOR, HOST_ID } from "./config";
import { Delta, PropertyTypes } from "./types"; import { Delta, PropertyTypes } from "./types";
export type EntityProperties = { export type EntityProperties = {
@ -31,7 +31,7 @@ export class EntityPropertiesDeltaBuilder {
constructor(entityId: string) { constructor(entityId: string) {
this.delta = { this.delta = {
creator: CREATOR, creator: CREATOR,
host: HOST, host: HOST_ID,
pointers: [{ pointers: [{
localContext: 'id', localContext: 'id',
target: entityId, target: entityId,

View File

@ -1,22 +1,23 @@
import { PUBLISH_BIND_ADDR, PUBLISH_BIND_PORT } from "./config"; import {PUBLISH_BIND_HOST, PUBLISH_BIND_PORT, REQUEST_BIND_HOST, REQUEST_BIND_PORT, SEED_PEERS} from "./config";
import { registerRequestHandler, PeerRequest, ResponseSocket } from "./request-reply";
import { RequestSocket, } from "./request-reply";
import { SEED_PEERS } from "./config";
import {connectSubscribe} from "./pub-sub";
import {deltasAccepted, deltasProposed, ingestAll, receiveDelta} from "./deltas"; import {deltasAccepted, deltasProposed, ingestAll, receiveDelta} from "./deltas";
import {Delta} from "./types"; import {connectSubscribe} from "./pub-sub";
import {PeerRequest, registerRequestHandler, RequestSocket, ResponseSocket} from "./request-reply";
import {Delta, PeerAddress} from "./types";
export enum PeerMethods { export enum PeerMethods {
GetPublishAddress, GetPublishAddress,
AskForDeltas AskForDeltas
} }
export const myRequestAddr = new PeerAddress(REQUEST_BIND_HOST, REQUEST_BIND_PORT);
export const myPublishAddr = new PeerAddress(PUBLISH_BIND_HOST, PUBLISH_BIND_PORT);
registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => { registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => {
console.log('inspecting peer request'); console.log('inspecting peer request');
switch (req.method) { switch (req.method) {
case PeerMethods.GetPublishAddress: { case PeerMethods.GetPublishAddress: {
console.log('it\'s a request for our publish address'); console.log('it\'s a request for our publish address');
await res.send(publishAddr); await res.send(myPublishAddr.toAddrString());
break; break;
} }
case PeerMethods.AskForDeltas: { case PeerMethods.AskForDeltas: {
@ -29,29 +30,20 @@ registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => {
} }
}); });
export type PeerAddress = {
addr: string,
port: number
};
const publishAddr: PeerAddress = {
addr: PUBLISH_BIND_ADDR,
port: PUBLISH_BIND_PORT
};
class Peer { class Peer {
reqAddr: PeerAddress;
reqSock: RequestSocket; reqSock: RequestSocket;
publishAddr: PeerAddress | undefined; publishAddr: PeerAddress | undefined;
constructor(addr: string, port: number) { constructor(addr: string, port: number) {
this.reqAddr = new PeerAddress(addr, port);
this.reqSock = new RequestSocket(addr, port); this.reqSock = new RequestSocket(addr, port);
} }
async subscribe() { async subscribe() {
if (!this.publishAddr) { if (!this.publishAddr) {
const res = await this.reqSock.request(PeerMethods.GetPublishAddress); const res = await this.reqSock.request(PeerMethods.GetPublishAddress);
// TODO: input validation // TODO: input validation
const {addr, port} = JSON.parse(res.toString()); this.publishAddr = PeerAddress.fromString(res.toString());
this.publishAddr = {addr, port}; connectSubscribe(this.publishAddr!);
connectSubscribe(addr, port);
} }
} }
async askForDeltas(): Promise<Delta[]> { async askForDeltas(): Promise<Delta[]> {
@ -67,7 +59,7 @@ class Peer {
} }
} }
const peers: Peer[] = []; export const peers: Peer[] = [];
function newPeer(addr: string, port: number) { function newPeer(addr: string, port: number) {
const peer = new Peer(addr, port); const peer = new Peer(addr, port);
@ -90,6 +82,7 @@ export async function askAllPeersForDeltas() {
const deltas = await peer.askForDeltas(); const deltas = await peer.askForDeltas();
console.log('received deltas:', deltas); console.log('received deltas:', deltas);
for (const delta of deltas) { for (const delta of deltas) {
delta.receivedFrom = peer.reqAddr;
receiveDelta(delta); receiveDelta(delta);
} }
console.log('deltasProposed count', deltasProposed.length); console.log('deltasProposed count', deltasProposed.length);

View File

@ -1,18 +1,21 @@
import {Publisher, Subscriber} from 'zeromq'; import {Publisher, Subscriber} from 'zeromq';
import { PUBLISH_BIND_PORT, PUBLISH_BIND_ADDR} from './config'; import {PUBLISH_BIND_ADDR, PUBLISH_BIND_PORT} from './config';
import {PeerAddress} from './types';
export const publishSock = new Publisher(); export const publishSock = new Publisher();
export const subscribeSock = new Subscriber(); export const subscribeSock = new Subscriber();
export async function bindPublish() { export async function bindPublish() {
const addrStr = `tcp://${PUBLISH_BIND_ADDR}:${PUBLISH_BIND_PORT}`; const addrStr = `tcp://${PUBLISH_BIND_ADDR}:${PUBLISH_BIND_PORT}`;
console.log('addrStr:', addrStr);
await publishSock.bind(addrStr); await publishSock.bind(addrStr);
console.log(`Publishing socket bound to ${addrStr}`); console.log(`Publishing socket bound to ${addrStr}`);
} }
export function connectSubscribe(host: string, port: number) { export function connectSubscribe(publishAddr: PeerAddress) {
// TODO: peer discovery // TODO: peer discovery
const addrStr = `tcp://${host}:${port}`; const addrStr = `tcp://${publishAddr.toAddrString()}`;
console.log('connectSubscribe', {addrStr});
subscribeSock.connect(addrStr); subscribeSock.connect(addrStr);
subscribeSock.subscribe("deltas"); subscribeSock.subscribe("deltas");
console.log(`Subscribing to ${addrStr}`); console.log(`Subscribing to ${addrStr}`);

View File

@ -8,6 +8,7 @@ export type Delta = {
creator: string, creator: string,
host: string, host: string,
pointers: Pointer[], pointers: Pointer[],
receivedFrom?: PeerAddress,
} }
export type DeltaContext = Delta & { export type DeltaContext = Delta & {
@ -37,3 +38,24 @@ export type FilterGenerator = () => FilterExpr;
export type PropertyTypes = string | number | undefined; export type PropertyTypes = string | number | undefined;
export class PeerAddress {
addr: string;
port: number;
constructor(addr: string, port: number) {
this.addr = addr;
this.port = port;
}
static fromString(addrString: string): PeerAddress {
const [addr, port] = addrString.trim().split(':');
return new PeerAddress(addr, parseInt(port));
}
toAddrString() {
console.log('toAddrStr...', {addr: this.addr, port: this.port});
return `${this.addr}:${this.port}`;
}
toJSON() {
console.log('toAddrStr...', {addr: this.addr, port: this.port});
return `${this.addr}:${this.port}`;
}
};