added gossipsub in parallel with zeromq. converted project to ESM for compatibility with gossipsub

This commit is contained in:
Ladd Hoffman 2024-12-31 11:35:09 -06:00
parent 940c3212be
commit 44e6cfa13c
29 changed files with 1596 additions and 112 deletions

View File

@ -1,5 +1,5 @@
import {Delta, DeltaFilter} from '../src/delta'; import {Delta, DeltaFilter} from '../src/delta.js';
import {Lossless} from '../src/lossless'; import {Lossless} from '../src/lossless.js';
describe('Lossless', () => { describe('Lossless', () => {
it('creates a lossless view of keanu as neo in the matrix', () => { it('creates a lossless view of keanu as neo in the matrix', () => {

View File

@ -1,6 +1,6 @@
import {Delta, PointerTarget} from "../src/delta"; import {Delta, PointerTarget} from "../src/delta.js";
import {Lossless, LosslessViewMany} from "../src/lossless"; import {Lossless, LosslessViewMany} from "../src/lossless.js";
import {Lossy, lastValueFromLosslessViewOne, valueFromCollapsedDelta, ResolvedViewMany} from "../src/lossy"; import {Lossy, lastValueFromLosslessViewOne, valueFromCollapsedDelta } from "../src/lossy.js";
describe('Lossy', () => { describe('Lossy', () => {
describe('se a provided function to resolve entity views', () => { describe('se a provided function to resolve entity views', () => {

View File

@ -1,4 +1,4 @@
import {PeerAddress} from '../src/types'; import {PeerAddress} from '../src/types.js';
describe('PeerAddress', () => { describe('PeerAddress', () => {
it('toString()', () => { it('toString()', () => {

View File

@ -1,4 +1,4 @@
import {App} from "../../util/app"; import {App} from "../../util/app.js";
describe('Run', () => { describe('Run', () => {
let app: App; let app: App;

View File

@ -1,5 +1,5 @@
import Debug from 'debug'; import Debug from 'debug';
import {App} from '../../util/app'; import {App} from '../../util/app.js';
const debug = Debug('test:two'); const debug = Debug('test:two');
describe('Run', () => { describe('Run', () => {
@ -13,6 +13,8 @@ describe('Run', () => {
apps[1] = new App({ apps[1] = new App({
httpEnable: true, httpEnable: true,
peerId: 'app1', peerId: 'app1',
// Make the apps use the same pubsub topic so they can talk to each other
pubSubTopic: apps[0].config.pubSubTopic,
}); });
apps[0].config.seedPeers.push(apps[1].myRequestAddr); apps[0].config.seedPeers.push(apps[1].myRequestAddr);
apps[1].config.seedPeers.push(apps[0].myRequestAddr); apps[1].config.seedPeers.push(apps[0].myRequestAddr);

View File

@ -1,7 +1,7 @@
import Debug from 'debug'; import Debug from 'debug';
import {Collection} from "../src/collection"; import {Collection} from "../src/collection.js";
import {Entity} from "../src/entity"; import {Entity} from "../src/entity.js";
import {RhizomeNode} from "../src/node"; import {RhizomeNode} from "../src/node.js";
const debug = Debug('example-app'); const debug = Debug('example-app');
// As an app we want to be able to write and read data. // As an app we want to be able to write and read data.

1329
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -2,11 +2,12 @@
"name": "rhizome-node", "name": "rhizome-node",
"version": "0.1.0", "version": "0.1.0",
"description": "Rhizomatic database engine node", "description": "Rhizomatic database engine node",
"type": "module",
"scripts": { "scripts": {
"build": "tsc", "build": "tsc",
"build:watch": "tsc --watch", "build:watch": "tsc --watch",
"lint": "eslint", "lint": "eslint",
"test": "jest", "test": "node --experimental-vm-modules node_modules/.bin/jest",
"coverage": "./scripts/coverage.sh", "coverage": "./scripts/coverage.sh",
"example-app": "node dist/examples/app.js" "example-app": "node dist/examples/app.js"
}, },
@ -18,15 +19,38 @@
], ],
"testMatch": [ "testMatch": [
"**/__tests__/**/*" "**/__tests__/**/*"
] ],
"verbose": true,
"transform": {
"^.+\\.ts?$": [
"ts-jest",
{
"useESM": true
}
]
},
"extensionsToTreatAsEsm": [
".ts"
],
"moduleNameMapper": {
"^(\\.{1,2}/.*)\\.js$": "$1"
}
}, },
"author": "Taliesin (Ladd) <ladd@dgov.io>", "author": "Taliesin (Ladd) <ladd@dgov.io>",
"license": "Unlicense", "license": "Unlicense",
"dependencies": { "dependencies": {
"@chainsafe/libp2p-gossipsub": "^14.1.0",
"@chainsafe/libp2p-noise": "^16.0.0",
"@chainsafe/libp2p-yamux": "^7.0.1",
"@libp2p/identify": "^3.0.14",
"@libp2p/mdns": "^11.0.16",
"@libp2p/ping": "^2.0.14",
"@libp2p/tcp": "^10.0.14",
"debug": "^4.4.0", "debug": "^4.4.0",
"express": "^4.21.2", "express": "^4.21.2",
"json-logic-js": "^2.0.5", "json-logic-js": "^2.0.5",
"level": "^9.0.0", "level": "^9.0.0",
"libp2p": "^2.4.2",
"microtime": "^3.1.1", "microtime": "^3.1.1",
"object-hash": "^3.0.0", "object-hash": "^3.0.0",
"showdown": "^2.1.0", "showdown": "^2.1.0",
@ -34,20 +58,21 @@
"zeromq": "^6.1.2" "zeromq": "^6.1.2"
}, },
"devDependencies": { "devDependencies": {
"@eslint/js": "^9.17.0",
"@types/bluebird": "^3.5.42", "@types/bluebird": "^3.5.42",
"@types/debug": "^4.1.12", "@types/debug": "^4.1.12",
"@types/json-logic-js": "^2.0.8",
"@types/microtime": "^2.1.2",
"@types/object-hash": "^3.0.6",
"@eslint/js": "^9.17.0",
"@types/express": "^5.0.0", "@types/express": "^5.0.0",
"@types/jest": "^29.5.14", "@types/jest": "^29.5.14",
"@types/json-logic-js": "^2.0.8",
"@types/microtime": "^2.1.2",
"@types/node": "^22.10.2", "@types/node": "^22.10.2",
"@types/object-hash": "^3.0.6",
"@types/showdown": "^2.0.6", "@types/showdown": "^2.0.6",
"eslint": "^9.17.0", "eslint": "^9.17.0",
"eslint-config-airbnb-base-typescript": "^1.1.0", "eslint-config-airbnb-base-typescript": "^1.1.0",
"jest": "^29.7.0", "jest": "^29.7.0",
"ts-jest": "^29.2.5", "ts-jest": "^29.2.5",
"tsc-alias": "^1.8.10",
"typescript": "^5.7.2", "typescript": "^5.7.2",
"typescript-eslint": "^8.18.0" "typescript-eslint": "^8.18.0"
} }

View File

@ -6,11 +6,11 @@
import Debug from 'debug'; import Debug from 'debug';
import {randomUUID} from "node:crypto"; import {randomUUID} from "node:crypto";
import EventEmitter from "node:events"; import EventEmitter from "node:events";
import {Delta, DeltaFilter} from "./delta"; import {Delta, DeltaFilter} from "./delta.js";
import {Entity, EntityProperties} from "./entity"; import {Entity, EntityProperties} from "./entity.js";
import {Lossy, ResolvedViewOne, Resolver} from "./lossy"; import {Lossy, ResolvedViewOne, Resolver} from "./lossy.js";
import {RhizomeNode} from "./node"; import {RhizomeNode} from "./node.js";
import {DomainEntityID} from "./types"; import {DomainEntityID} from "./types.js";
const debug = Debug('collection'); const debug = Debug('collection');
export class Collection { export class Collection {
@ -214,9 +214,10 @@ export class Collection {
resolver?: Resolver, resolver?: Resolver,
deltaFilter?: DeltaFilter deltaFilter?: DeltaFilter
): T | undefined { ): T | undefined {
if (!this.rhizomeNode) return undefined; if (!this.rhizomeNode) throw new Error('collection not connected to rhizome');
if (!this.lossy) throw new Error('lossy view not initialized');
const res = this.lossy?.resolve(resolver, [id], deltaFilter) || {}; const res = this.lossy.resolve(resolver, [id], deltaFilter) || {};
return res[id] as T; return res[id] as T;
} }

View File

@ -1,5 +1,5 @@
import {randomUUID} from "crypto"; import {randomUUID} from "crypto";
import {PeerAddress} from "./types"; import {PeerAddress} from "./types.js";
// _HOST refers to the address from an external perspective // _HOST refers to the address from an external perspective
// _ADDR refers to the interface address from the service's perspective // _ADDR refers to the interface address from the service's perspective
@ -20,3 +20,5 @@ export const HTTP_API_ENABLE = process.env.RHIZOME_HTTP_API_ENABLE === 'true';
export const SEED_PEERS: PeerAddress[] = (process.env.RHIZOME_SEED_PEERS || '').split(',') export const SEED_PEERS: PeerAddress[] = (process.env.RHIZOME_SEED_PEERS || '').split(',')
.filter(x => !!x) .filter(x => !!x)
.map((peer: string) => PeerAddress.fromString(peer)); .map((peer: string) => PeerAddress.fromString(peer));
export const PUB_SUB_TOPIC = process.env.RHIZOME_PUB_SUB_TOPIC || `deltas-${randomUUID()}`;

View File

@ -1,6 +1,6 @@
import {randomUUID} from "crypto"; import {randomUUID} from "crypto";
import microtime from 'microtime'; import microtime from 'microtime';
import {CreatorID, HostID, PeerAddress, Timestamp, TransactionID} from "./types"; import {CreatorID, HostID, PeerAddress, Timestamp, TransactionID} from "./types.js";
export type DeltaID = string; export type DeltaID = string;

View File

@ -1,8 +1,8 @@
import Debug from 'debug'; import Debug from 'debug';
import EventEmitter from 'node:events'; import EventEmitter from 'node:events';
import objectHash from 'object-hash'; import objectHash from 'object-hash';
import {Delta, DeltaNetworkImage} from './delta'; import {Delta, DeltaNetworkImage} from './delta.js';
import {RhizomeNode} from './node'; import {RhizomeNode} from './node.js';
const debug = Debug('deltas'); const debug = Debug('deltas');
enum Decision { enum Decision {
@ -87,7 +87,10 @@ export class DeltaStream {
async publishDelta(delta: Delta) { async publishDelta(delta: Delta) {
debug(`Publishing delta: ${JSON.stringify(delta)}`); debug(`Publishing delta: ${JSON.stringify(delta)}`);
await this.rhizomeNode.pubSub.publish("deltas", this.serializeDelta(delta)); await this.rhizomeNode.pubSub.publish(
this.rhizomeNode.config.pubSubTopic,
this.serializeDelta(delta)
);
} }
serializeDelta(delta: Delta): string { serializeDelta(delta: Delta): string {

View File

@ -7,8 +7,8 @@
// - As typescript interfaces? // - As typescript interfaces?
// - As typescript classes? // - As typescript classes?
import {Collection} from "./collection"; import {Collection} from "./collection.js";
import {PropertyTypes} from "./types"; import {PropertyTypes} from "./types.js";
export type EntityProperties = { export type EntityProperties = {
[key: string]: PropertyTypes; [key: string]: PropertyTypes;

View File

@ -1,5 +1,5 @@
import { add_operation, apply } from 'json-logic-js'; import { add_operation, apply } from 'json-logic-js';
import { Delta } from '../delta'; import { Delta } from '../delta.js';
type DeltaContext = Delta & { type DeltaContext = Delta & {
creatorAddress: string; creatorAddress: string;

View File

@ -1,4 +1,4 @@
import { FilterExpr } from "../types"; import { FilterExpr } from "../types.js";
// import { map } from 'radash'; // import { map } from 'radash';
// A creator as seen by a host // A creator as seen by a host

View File

@ -30,24 +30,25 @@ export class HttpApi {
// Get the list of peers seen by this node (including itself) // Get the list of peers seen by this node (including itself)
this.router.get("/peers", (_req: express.Request, res: express.Response) => { this.router.get("/peers", (_req: express.Request, res: express.Response) => {
res.json(this.rhizomeNode.peers.peers.map(({reqAddr, publishAddr, isSelf, isSeedPeer}) => { res.json(this.rhizomeNode.peers.peers.map(
const deltasAcceptedCount = this.rhizomeNode.deltaStream.deltasAccepted ({reqAddr, publishAddr, isSelf, isSeedPeer}) => {
.filter((delta: Delta) => { const deltasAcceptedCount = this.rhizomeNode.deltaStream.deltasAccepted
return delta.receivedFrom?.addr == reqAddr.addr && .filter((delta: Delta) => {
delta.receivedFrom?.port == reqAddr.port; return delta.receivedFrom?.addr == reqAddr.addr &&
}) delta.receivedFrom?.port == reqAddr.port;
.length; })
const peerInfo = { .length;
reqAddr: reqAddr.toAddrString(), const peerInfo = {
publishAddr: publishAddr?.toAddrString(), reqAddr: reqAddr.toAddrString(),
isSelf, publishAddr: publishAddr?.toAddrString(),
isSeedPeer, isSelf,
deltaCount: { isSeedPeer,
accepted: deltasAcceptedCount deltaCount: {
} accepted: deltasAcceptedCount
}; }
return peerInfo; };
})); return peerInfo;
}));
}); });
// Get the number of peers seen by this node (including itself) // Get the number of peers seen by this node (including itself)

View File

@ -1,5 +1,5 @@
import express, {Router} from "express"; import express, {Router} from "express";
import {htmlDocFromMarkdown, MDFiles} from "../util/md-files"; import {htmlDocFromMarkdown, MDFiles} from "../util/md-files.js";
export class HttpHtml { export class HttpHtml {
router = Router(); router = Router();

View File

@ -1,9 +1,9 @@
import Debug from "debug"; import Debug from "debug";
import express from "express"; import express from "express";
import {Server} from "http"; import {Server} from "http";
import {RhizomeNode} from "../node"; import {RhizomeNode} from "../node.js";
import {HttpApi} from "./api"; import {HttpApi} from "./api.js";
import {HttpHtml} from "./html"; import {HttpHtml} from "./html.js";
const debug = Debug('http-api'); const debug = Debug('http-api');
export class HttpServer { export class HttpServer {

View File

@ -3,9 +3,9 @@
import Debug from 'debug'; import Debug from 'debug';
import EventEmitter from 'events'; import EventEmitter from 'events';
import {Delta, DeltaFilter, DeltaNetworkImage} from './delta'; import {Delta, DeltaFilter, DeltaNetworkImage} from './delta.js';
import {Transactions} from './transactions'; import {Transactions} from './transactions.js';
import {DomainEntityID, PropertyID, PropertyTypes, TransactionID, ViewMany} from "./types"; import {DomainEntityID, PropertyID, PropertyTypes, TransactionID, ViewMany} from "./types.js";
const debug = Debug('lossless'); const debug = Debug('lossless');
export type CollapsedPointer = {[key: PropertyID]: PropertyTypes}; export type CollapsedPointer = {[key: PropertyID]: PropertyTypes};
@ -141,6 +141,7 @@ export class Lossless {
if (delta.transactionId) { if (delta.transactionId) {
if (!this.transactions.isComplete(delta.transactionId)) { if (!this.transactions.isComplete(delta.transactionId)) {
// TODO: Test this condition // TODO: Test this condition
debug(`excluding delta ${delta.id} because transaction ${delta.transactionId} is not completed`);
continue; continue;
} }
} }

View File

@ -6,9 +6,9 @@
// Fields in the output can be described as transformations // Fields in the output can be described as transformations
import Debug from 'debug'; import Debug from 'debug';
import {DeltaFilter} from "./delta"; import {DeltaFilter} from "./delta.js";
import {CollapsedDelta, Lossless, LosslessViewMany, LosslessViewOne} from "./lossless"; import {CollapsedDelta, Lossless, LosslessViewMany, LosslessViewOne} from "./lossless.js";
import {DomainEntityID, PropertyID, PropertyTypes, Timestamp, ViewMany} from "./types"; import {DomainEntityID, PropertyID, PropertyTypes, Timestamp, ViewMany} from "./types.js";
const debug = Debug('lossy'); const debug = Debug('lossy');
type TimestampedProperty = { type TimestampedProperty = {
@ -77,14 +77,14 @@ export function lastValueFromLosslessViewOne(
function defaultResolver(losslessView: LosslessViewMany): ResolvedViewMany { function defaultResolver(losslessView: LosslessViewMany): ResolvedViewMany {
const resolved: ResolvedViewMany = {}; const resolved: ResolvedViewMany = {};
// debug('default resolver, lossless view', JSON.stringify(losslessView)); debug('default resolver, lossless view', JSON.stringify(losslessView));
for (const [id, ent] of Object.entries(losslessView)) { for (const [id, ent] of Object.entries(losslessView)) {
resolved[id] = {id, properties: {}}; resolved[id] = {id, properties: {}};
for (const key of Object.keys(ent.properties)) { for (const key of Object.keys(ent.properties)) {
const {value} = lastValueFromLosslessViewOne(ent, key) || {}; const {value} = lastValueFromLosslessViewOne(ent, key) || {};
// debug(`[ ${key} ] = ${value}`); debug(`[ ${key} ] = ${value}`);
resolved[id].properties[key] = value; resolved[id].properties[key] = value;
} }
} }

View File

@ -1,12 +1,12 @@
import Debug from 'debug'; import Debug from 'debug';
import {CREATOR, HTTP_API_ADDR, HTTP_API_ENABLE, HTTP_API_PORT, PEER_ID, PUBLISH_BIND_ADDR, PUBLISH_BIND_HOST, PUBLISH_BIND_PORT, REQUEST_BIND_ADDR, REQUEST_BIND_HOST, REQUEST_BIND_PORT, SEED_PEERS} from './config'; import {CREATOR, HTTP_API_ADDR, HTTP_API_ENABLE, HTTP_API_PORT, PEER_ID, PUB_SUB_TOPIC, PUBLISH_BIND_ADDR, PUBLISH_BIND_HOST, PUBLISH_BIND_PORT, REQUEST_BIND_ADDR, REQUEST_BIND_HOST, REQUEST_BIND_PORT, SEED_PEERS} from './config.js';
import {DeltaStream} from './deltas'; import {DeltaStream} from './deltas.js';
import {HttpServer} from './http'; import {HttpServer} from './http/index.js';
import {Lossless} from './lossless'; import {Lossless} from './lossless.js';
import {Peers} from './peers'; import {Peers} from './peers.js';
import {PubSub} from './pub-sub'; import {PubSub} from './pub-sub.js';
import {RequestReply} from './request-reply'; import {RequestReply} from './request-reply.js';
import {PeerAddress} from './types'; import {PeerAddress} from './types.js';
const debug = Debug('rhizome-node'); const debug = Debug('rhizome-node');
export type RhizomeNodeConfig = { export type RhizomeNodeConfig = {
@ -22,6 +22,7 @@ export type RhizomeNodeConfig = {
seedPeers: PeerAddress[]; seedPeers: PeerAddress[];
peerId: string; peerId: string;
creator: string; // TODO each host should be able to support multiple users creator: string; // TODO each host should be able to support multiple users
pubSubTopic: string;
}; };
// So that we can run more than one instance in the same process (for testing) // So that we can run more than one instance in the same process (for testing)
@ -50,6 +51,7 @@ export class RhizomeNode {
seedPeers: SEED_PEERS, seedPeers: SEED_PEERS,
peerId: PEER_ID, peerId: PEER_ID,
creator: CREATOR, creator: CREATOR,
pubSubTopic: PUB_SUB_TOPIC,
...config ...config
}; };
debug('config', this.config); debug('config', this.config);
@ -74,7 +76,7 @@ export class RhizomeNode {
this.deltaStream.subscribeDeltas((delta) => this.lossless.ingestDelta(delta)); this.deltaStream.subscribeDeltas((delta) => this.lossless.ingestDelta(delta));
// Start ZeroMQ publish and reply sockets // Start ZeroMQ publish and reply sockets
this.pubSub.start(); await this.pubSub.start();
this.requestReply.start(); this.requestReply.start();
// Start HTTP server // Start HTTP server
@ -82,6 +84,9 @@ export class RhizomeNode {
this.httpServer.start(); this.httpServer.start();
} }
// Start libp2p subscription
this.peers.start();
// Wait a short time for sockets to initialize // Wait a short time for sockets to initialize
await new Promise((resolve) => setTimeout(resolve, 500)); await new Promise((resolve) => setTimeout(resolve, 500));

View File

@ -1,11 +1,11 @@
import Debug from 'debug'; import Debug from 'debug';
import {Message} from 'zeromq'; import {Message} from 'zeromq';
import {SEED_PEERS} from "./config"; import {SEED_PEERS} from "./config.js";
import {Delta} from "./delta"; import {Delta} from "./delta.js";
import {RhizomeNode} from "./node"; import {RhizomeNode} from "./node.js";
import {Subscription} from './pub-sub'; import {Subscription} from './pub-sub.js';
import {PeerRequest, RequestSocket, ResponseSocket} from "./request-reply"; import {PeerRequest, RequestSocket, ResponseSocket} from "./request-reply.js";
import {PeerAddress} from "./types"; import {PeerAddress} from "./types.js";
const debug = Debug('peers'); const debug = Debug('peers');
export enum RequestMethods { export enum RequestMethods {
@ -46,9 +46,9 @@ class Peer {
this.subscription = this.rhizomeNode.pubSub.subscribe( this.subscription = this.rhizomeNode.pubSub.subscribe(
this.publishAddr, this.publishAddr,
"deltas", this.rhizomeNode.config.pubSubTopic,
(sender, msg) => { (sender, msg) => {
const delta = this.rhizomeNode.deltaStream.deserializeDelta(msg.toString()); const delta = this.rhizomeNode.deltaStream.deserializeDelta(msg);
delta.receivedFrom = sender; delta.receivedFrom = sender;
debug(`Received delta: ${JSON.stringify(delta)}`); debug(`Received delta: ${JSON.stringify(delta)}`);
this.rhizomeNode.deltaStream.ingestDelta(delta); this.rhizomeNode.deltaStream.ingestDelta(delta);
@ -99,7 +99,18 @@ export class Peers {
} }
} }
}); });
}
start() {
this.rhizomeNode.pubSub.subscribeTopic(
this.rhizomeNode.config.pubSubTopic,
(sender, msg) => {
const delta = this.rhizomeNode.deltaStream.deserializeDelta(msg);
delta.receivedFrom = sender;
debug(`Received delta: ${JSON.stringify(delta)}`);
this.rhizomeNode.deltaStream.ingestDelta(delta);
}
);
} }
addPeer(addr: PeerAddress): Peer { addPeer(addr: PeerAddress): Peer {

View File

@ -1,10 +1,18 @@
import {GossipSub, gossipsub} from '@chainsafe/libp2p-gossipsub';
import {noise} from '@chainsafe/libp2p-noise';
import {yamux} from '@chainsafe/libp2p-yamux';
import {identify} from '@libp2p/identify';
import {mdns} from '@libp2p/mdns';
import {ping} from '@libp2p/ping';
import {tcp} from '@libp2p/tcp';
import Debug from 'debug'; import Debug from 'debug';
import {Message, Publisher, Subscriber} from 'zeromq'; import {Libp2p, createLibp2p} from 'libp2p';
import {RhizomeNode} from './node'; import {Publisher, Subscriber} from 'zeromq';
import {PeerAddress} from './types'; import {RhizomeNode} from './node.js';
import {PeerAddress} from './types.js';
const debug = Debug('pub-sub'); const debug = Debug('pub-sub');
export type SubscribedMessageHandler = (sender: PeerAddress, msg: Message) => void; export type SubscribedMessageHandler = (sender: PeerAddress, msg: string) => void;
// TODO: Allow subscribing to multiple topics on one socket // TODO: Allow subscribing to multiple topics on one socket
export class Subscription { export class Subscription {
@ -13,22 +21,33 @@ export class Subscription {
publishAddr: PeerAddress; publishAddr: PeerAddress;
publishAddrStr: string; publishAddrStr: string;
cb: SubscribedMessageHandler; cb: SubscribedMessageHandler;
libp2p?: Libp2p;
constructor(publishAddr: PeerAddress, topic: string, cb: SubscribedMessageHandler) { constructor(
publishAddr: PeerAddress,
topic: string,
cb: SubscribedMessageHandler,
libp2p?: Libp2p
) {
this.cb = cb; this.cb = cb;
this.topic = topic; this.topic = topic;
this.publishAddr = publishAddr; this.publishAddr = publishAddr;
this.publishAddrStr = `tcp://${this.publishAddr.toAddrString()}`; this.publishAddrStr = `tcp://${this.publishAddr.toAddrString()}`;
this.libp2p = libp2p;
} }
async start() { async start() {
this.sock.connect(this.publishAddrStr); this.sock.connect(this.publishAddrStr);
this.sock.subscribe(this.topic); this.sock.subscribe(this.topic);
debug(`Subscribing to ${this.topic} topic on ${this.publishAddrStr}`); debug(`Subscribing to ${this.topic} topic on ZeroMQ ${this.publishAddrStr}`);
// Wait for ZeroMQ messages.
// This will block indefinitely.
for await (const [, sender, msg] of this.sock) { for await (const [, sender, msg] of this.sock) {
const senderAddr = PeerAddress.fromString(sender.toString()); const senderStr = PeerAddress.fromString(sender.toString());
this.cb(senderAddr, msg); const msgStr = msg.toString();
debug(`ZeroMQ subscribtion received msg: ${msgStr}`);
this.cb(senderStr, msgStr);
} }
} }
} }
@ -38,6 +57,7 @@ export class PubSub {
publishSock: Publisher; publishSock: Publisher;
publishAddrStr: string; publishAddrStr: string;
subscriptions: Subscription[] = []; subscriptions: Subscription[] = [];
libp2p?: Libp2p;
constructor(rhizomeNode: RhizomeNode) { constructor(rhizomeNode: RhizomeNode) {
this.rhizomeNode = rhizomeNode; this.rhizomeNode = rhizomeNode;
@ -49,19 +69,82 @@ export class PubSub {
async start() { async start() {
await this.publishSock.bind(this.publishAddrStr); await this.publishSock.bind(this.publishAddrStr);
debug(`Publishing socket bound to ${this.publishAddrStr}`); debug(`ZeroMQ publishing socket bound to ${this.publishAddrStr}`);
this.libp2p = await createLibp2p({
addresses: {
// TODO: Config
listen: ['/ip4/127.0.0.1/tcp/0']
},
transports: [tcp()],
connectionEncrypters: [noise()],
streamMuxers: [yamux()],
peerDiscovery: [mdns()],
services: {
pubsub: gossipsub(),
identify: identify(),
ping: ping(),
}
});
this.libp2p.addEventListener("peer:discovery", (event) => {
debug(`found peer: ${JSON.stringify(event.detail, null, 2)}`);
this.libp2p?.dial(event.detail.multiaddrs);
});
this.libp2p.addEventListener("peer:connect", (event) => {
debug(`connected to peer: ${JSON.stringify(event.detail, null, 2)}`);
// TODO: Subscribe
});
} }
async publish(topic: string, msg: string) { async publish(topic: string, msg: string) {
debug(`publishing to ZeroMQ, msg: ${msg}`);
await this.publishSock.send([ await this.publishSock.send([
topic, topic,
this.rhizomeNode.myRequestAddr.toAddrString(), this.rhizomeNode.myRequestAddr.toAddrString(),
msg msg
]); ]);
if (this.libp2p) {
const pubsub = this.libp2p.services.pubsub as GossipSub;
debug(`publishing to Libp2p, msg: ${msg}`);
try {
await pubsub.publish(topic, Buffer.from(msg));
} catch (e: unknown) {
debug('Libp2p publish:', (e as Error).message);
}
}
} }
subscribe(publishAddr: PeerAddress, topic: string, cb: SubscribedMessageHandler): Subscription { subscribedTopics = new Set<string>();
const subscription = new Subscription(publishAddr, topic, cb);
subscribeTopic(topic: string, cb: SubscribedMessageHandler) {
if (!this.libp2p) throw new Error('libp2p not initialized');
const pubsub = this.libp2p.services.pubsub as GossipSub;
// TODO: If we subscribe to multiple topics this callback will be duplicated
pubsub.addEventListener("message", (event) => {
const msg = Buffer.from(event.detail.data).toString();
debug(`Libp2p subscribtion received msg: ${msg}`);
cb(new PeerAddress('libp2p', 0), msg);
});
// Add to our list of subscribed topics so we can unsubscribe later.
// Also has the effect of not calling subscribe more than once per topic.
if (!this.subscribedTopics.has(topic)) {
pubsub.subscribe(topic);
this.subscribedTopics.add(topic);
debug('subscribed topics:', Array.from(this.subscribedTopics.keys()));
}
}
subscribe(
publishAddr: PeerAddress,
topic: string,
cb: SubscribedMessageHandler
): Subscription {
const subscription = new Subscription(publishAddr, topic, cb, this.libp2p);
this.subscriptions.push(subscription); this.subscriptions.push(subscription);
return subscription; return subscription;
} }
@ -73,7 +156,23 @@ export class PubSub {
for (const subscription of this.subscriptions) { for (const subscription of this.subscriptions) {
subscription.sock.close(); subscription.sock.close();
debug('subscription socket is closed?', subscription.sock.closed); }
if (this.libp2p) {
const pubsub = this.libp2p.services.pubsub as GossipSub;
pubsub.removeEventListener("message");
for (const topic of this.subscribedTopics) {
debug(`unsubscribing Libp2p topic ${topic}`);
pubsub.unsubscribe(topic)
}
debug('stopping gossipsub');
await pubsub.stop();
await this.libp2p.stop();
} }
} }
} }

View File

@ -1,9 +1,9 @@
import {Request, Reply, Message} from 'zeromq';
import {EventEmitter} from 'node:events';
import {RequestMethods} from './peers';
import Debug from 'debug'; import Debug from 'debug';
import {RhizomeNode} from './node'; import {EventEmitter} from 'node:events';
import {PeerAddress} from './types'; import {Message, Reply, Request} from 'zeromq';
import {RhizomeNode} from './node.js';
import {RequestMethods} from './peers.js';
import {PeerAddress} from './types.js';
const debug = Debug('request-reply'); const debug = Debug('request-reply');
export type PeerRequest = { export type PeerRequest = {

View File

@ -1,5 +1,5 @@
import { Level } from 'level'; import { Level } from 'level';
import { LEVEL_DB_DIR } from './config'; import { LEVEL_DB_DIR } from './config.js';
import path from 'path'; import path from 'path';
function newStore(name: string): Level { function newStore(name: string): Level {

View File

@ -1,7 +1,7 @@
import Debug from "debug"; import Debug from "debug";
import EventEmitter from "events"; import EventEmitter from "events";
import {Delta, DeltaID} from "./delta"; import {Delta, DeltaID} from "./delta.js";
import {DomainEntityID, TransactionID} from "./types"; import {DomainEntityID, TransactionID} from "./types.js";
const debug = Debug("transactions"); const debug = Debug("transactions");
function getDeltaTransactionId(delta: Delta): TransactionID | undefined { function getDeltaTransactionId(delta: Delta): TransactionID | undefined {

View File

@ -1,7 +1,8 @@
import Debug from "debug"; import Debug from "debug";
import {FSWatcher, readdirSync, readFileSync, watch} from "fs"; import {FSWatcher, readdirSync, readFileSync, watch} from "fs";
import path, {join} from "path"; import path, {join} from "path";
import {Converter} from "showdown"; import showdown from "showdown";
const {Converter} = showdown;
const debug = Debug('md-files'); const debug = Debug('md-files');
const docConverter = new Converter({ const docConverter = new Converter({

View File

@ -1,12 +1,17 @@
{ {
"compilerOptions": { "compilerOptions": {
"target": "ES6", "target": "ESNext",
"module": "CommonJS", "module": "ESNext",
"esModuleInterop": true, "esModuleInterop": true,
"moduleResolution": "Node", "moduleResolution": "bundler",
"sourceMap": false, "sourceMap": false,
"isolatedModules": true,
/* "allowImportingTsExtensions": true, */
/* "noEmit": true, */
"baseUrl": ".", "baseUrl": ".",
"outDir": "dist", "outDir": "dist",
"lib": ["ESNext"],
"types": ["node", "jest"],
"importsNotUsedAsValues": "remove", "importsNotUsedAsValues": "remove",
"strict": true, "strict": true,
"skipLibCheck": true, "skipLibCheck": true,
@ -21,5 +26,8 @@
], ],
"exclude": [ "exclude": [
"node_modules" "node_modules"
] ],
"tsc-alias": {
"resolveFullPaths": true
}
} }

View File

@ -1,5 +1,6 @@
import {RhizomeNode, RhizomeNodeConfig} from "../src/node"; import {RhizomeNode, RhizomeNodeConfig} from "../src/node.js";
import {Collection} from "../src/collection"; import {Collection} from "../src/collection.js";
import {randomUUID} from "crypto";
const start = 5000; const start = 5000;
const range = 5000; const range = 5000;
@ -14,6 +15,7 @@ export class App extends RhizomeNode {
publishBindPort: getRandomPort(), publishBindPort: getRandomPort(),
requestBindPort: getRandomPort(), requestBindPort: getRandomPort(),
httpPort: getRandomPort(), httpPort: getRandomPort(),
pubSubTopic: config?.pubSubTopic || `deltas-${randomUUID()}`,
...config, ...config,
}); });