diff --git a/__tests__/run/002-two-nodes.ts b/__tests__/run/002-two-nodes.ts index 98e6f25..f7a5a80 100644 --- a/__tests__/run/002-two-nodes.ts +++ b/__tests__/run/002-two-nodes.ts @@ -24,7 +24,7 @@ describe('Run', () => { debug('app[1].config.seedPeers after adding:', JSON.stringify(apps[1].config.seedPeers)); debug('app[0].config.seedPeers after adding:', JSON.stringify(apps[0].config.seedPeers)); - await Promise.all(apps.map((app) => app.start())); + await Promise.all(apps.map((app) => app.start(false))); }); afterAll(async () => { diff --git a/src/collection.ts b/src/collection.ts index fbee74c..6ed7d7b 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -45,7 +45,7 @@ export class Collection { rhizomeNode.httpServer.httpApi.serveCollection(this); - debug(`[${this.rhizomeNode.config.peerId}]`, `connected ${this.name} to rhizome`); + debug(`[${this.rhizomeNode.config.peerId}]`, `Connected ${this.name} to rhizome`); } // Applies the javascript rules for updating object values, diff --git a/src/lossless.ts b/src/lossless.ts index fe8b27e..86aeb4f 100644 --- a/src/lossless.ts +++ b/src/lossless.ts @@ -72,7 +72,7 @@ export class Lossless { constructor(readonly rhizomeNode: RhizomeNode) { this.transactions = new Transactions(this); this.transactions.eventStream.on("completed", (transactionId) => { - debug(`[${this.rhizomeNode.config.peerId}]`, `completed transaction ${transactionId}`); + debug(`[${this.rhizomeNode.config.peerId}]`, `Completed transaction ${transactionId}`); const transaction = this.transactions.get(transactionId); if (!transaction) return; for (const id of transaction.entityIds) { @@ -143,7 +143,7 @@ export class Lossless { if (delta.transactionId) { if (!this.transactions.isComplete(delta.transactionId)) { // TODO: Test this condition - debug(`[${this.rhizomeNode.config.peerId}]`, `excluding delta ${delta.id} because transaction ${delta.transactionId} is not completed`); + debug(`[${this.rhizomeNode.config.peerId}]`, `Excluding delta ${delta.id} because transaction ${delta.transactionId} is not completed`); continue; } } diff --git a/src/lossy.ts b/src/lossy.ts index e231a51..cf212ce 100644 --- a/src/lossy.ts +++ b/src/lossy.ts @@ -100,7 +100,7 @@ export class Lossy { defaultResolver(losslessView: LosslessViewMany): ResolvedViewMany { const resolved: ResolvedViewMany = {}; - // debug(`[${this.lossless.rhizomeNode.config.peerId}]`, 'default resolver, lossless view', JSON.stringify(losslessView)); + // debug(`[${this.lossless.rhizomeNode.config.peerId}]`, 'Default resolver, lossless view', JSON.stringify(losslessView)); for (const [id, ent] of Object.entries(losslessView)) { resolved[id] = {id, properties: {}}; diff --git a/src/node.ts b/src/node.ts index 4560a99..775fd8f 100644 --- a/src/node.ts +++ b/src/node.ts @@ -53,7 +53,7 @@ export class RhizomeNode { pubSubTopic: PUB_SUB_TOPIC, ...config }; - debug(`[${this.config.peerId}]`, 'config', this.config); + debug(`[${this.config.peerId}]`, 'Config', this.config); this.myRequestAddr = new PeerAddress( this.config.requestBindHost, this.config.requestBindPort @@ -70,12 +70,20 @@ export class RhizomeNode { this.lossless = new Lossless(this); } - async start() { + async start(syncOnStart = false) { // Connect our lossless view to the delta stream this.deltaStream.subscribeDeltas((delta) => this.lossless.ingestDelta(delta)); - // Start ZeroMQ publish and reply sockets - await this.pubSub.start(); + // Bind ZeroMQ publish socket + // TODO: Config option to enable zmq pubsub + // await this.pubSub.startZmq(); + + // Initialize Libp2p + await this.pubSub.startLibp2p(); + + // Bind ZeroMQ request socket + // TODO: request/reply via libp2p? + // TODO: config options to enable request/reply, or configure available commands this.requestReply.start(); // Start HTTP server @@ -83,29 +91,40 @@ export class RhizomeNode { this.httpServer.start(); } - // Start libp2p subscription - this.peers.start(); + { + // Start libp2p subscription + // TODO: Config option to enable gossipsub + // TODO: Config options for gossipsub and other libp2p configs + this.peers.start(); - // Wait a short time for sockets to initialize - await new Promise((resolve) => setTimeout(resolve, 500)); + // Wait a short time for peers to connect + await new Promise((resolve) => setTimeout(resolve, 200)); + } - // Subscribe to seed peers - this.peers.subscribeToSeeds(); + { + // Wait a short time for sockets to initialize + // await new Promise((resolve) => setTimeout(resolve, 500)); - // Wait a short time for sockets to initialize - await new Promise((resolve) => setTimeout(resolve, 500)); + // Subscribe to seed peers + // this.peers.subscribeToSeeds(); - // Ask all peers for all deltas - this.peers.askAllPeersForDeltas(); + // Wait a short time for sockets to initialize + // await new Promise((resolve) => setTimeout(resolve, 500)); + } - // Wait to receive all deltas - await new Promise((resolve) => setTimeout(resolve, 1000)); + if (syncOnStart) { + // Ask all peers for all deltas + this.peers.askAllPeersForDeltas(); + + // Wait to receive all deltas + await new Promise((resolve) => setTimeout(resolve, 1000)); + } } async stop() { await this.pubSub.stop(); await this.requestReply.stop(); await this.httpServer.stop(); - debug(`[${this.config.peerId}]`, 'stopped'); + debug(`[${this.config.peerId}]`, 'Stopped'); } } diff --git a/src/peers.ts b/src/peers.ts index faba3da..99b7fe1 100644 --- a/src/peers.ts +++ b/src/peers.ts @@ -69,13 +69,13 @@ class Peer { async subscribeDeltas() { if (!this.publishAddr) { - debug(`[${this.rhizomeNode.config.peerId}]`, `requesting publish addr from peer ${this.reqAddr.toAddrString()}`); + debug(`[${this.rhizomeNode.config.peerId}]`, `Requesting publish addr from peer ${this.reqAddr.toAddrString()}`); const res = await this.request(RequestMethods.GetPublishAddress); this.publishAddr = PeerAddress.fromString(res.toString()); - debug(`[${this.rhizomeNode.config.peerId}]`, `received publish addr ${this.publishAddr.toAddrString()} from peer ${this.reqAddr.toAddrString()}`); + debug(`[${this.rhizomeNode.config.peerId}]`, `Received publish addr ${this.publishAddr.toAddrString()} from peer ${this.reqAddr.toAddrString()}`); } - debug(`[${this.rhizomeNode.config.peerId}]`, `subscribing to peer ${this.reqAddr.toAddrString()}`); + debug(`[${this.rhizomeNode.config.peerId}]`, `Subscribing to peer ${this.reqAddr.toAddrString()}`); // ZeroMQ subscription this.subscription = this.rhizomeNode.pubSub.subscribe( @@ -115,19 +115,19 @@ export class Peers { this.addPeer(this.rhizomeNode.myRequestAddr); this.rhizomeNode.requestReply.registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => { - debug(`[${this.rhizomeNode.config.peerId}]`, 'inspecting peer request'); + debug(`[${this.rhizomeNode.config.peerId}]`, 'Inspecting peer request'); switch (req.method) { case RequestMethods.GetPublishAddress: { - debug(`[${this.rhizomeNode.config.peerId}]`, 'it\'s a request for our publish address'); + debug(`[${this.rhizomeNode.config.peerId}]`, 'It\'s a request for our publish address'); await res.send(this.rhizomeNode.myPublishAddr.toAddrString()); break; } case RequestMethods.AskForDeltas: { - debug(`[${this.rhizomeNode.config.peerId}]`, 'it\'s a request for deltas'); + debug(`[${this.rhizomeNode.config.peerId}]`, 'It\'s a request for deltas'); // TODO: stream these rather than // trying to write them all in one message const deltas = this.rhizomeNode.deltaStream.deltasAccepted; - debug(`[${this.rhizomeNode.config.peerId}]`, `sending ${deltas.length} deltas`); + debug(`[${this.rhizomeNode.config.peerId}]`, `Sending ${deltas.length} deltas`); await res.send(JSON.stringify(deltas)); break; } @@ -150,13 +150,13 @@ export class Peers { addPeer(addr: PeerAddress): Peer { const peer = new Peer(this.rhizomeNode, addr); this.peers.push(peer); - debug(`[${this.rhizomeNode.config.peerId}]`, 'added peer', addr); + debug(`[${this.rhizomeNode.config.peerId}]`, 'Added peer', addr); return peer; } async subscribeToSeeds() { const {seedPeers} = this.rhizomeNode.config; - debug(`[${this.rhizomeNode.config.peerId}]`, `subscribeToSeeds, seedPeers: ${JSON.stringify(seedPeers)}`); + debug(`[${this.rhizomeNode.config.peerId}]`, `SubscribeToSeeds, seedPeers: ${JSON.stringify(seedPeers)}`); seedPeers.forEach(async (addr, idx) => { const peer = this.addPeer(addr); @@ -171,11 +171,11 @@ export class Peers { async askAllPeersForDeltas() { this.peers .forEach(async (peer, idx) => { - debug(`[${this.rhizomeNode.config.peerId}]`, `peer ${peer.reqAddr.toAddrString()} isSelf`, peer.isSelf); + debug(`[${this.rhizomeNode.config.peerId}]`, `Peer ${peer.reqAddr.toAddrString()} isSelf`, peer.isSelf); if (peer.isSelf) return; debug(`[${this.rhizomeNode.config.peerId}]`, `Asking peer ${idx} for deltas`); const deltas = await peer.askForDeltas(); - debug(`[${this.rhizomeNode.config.peerId}]`, `received ${deltas.length} deltas from ${peer.reqAddr.toAddrString()}`); + debug(`[${this.rhizomeNode.config.peerId}]`, `Received ${deltas.length} deltas from ${peer.reqAddr.toAddrString()}`); for (const delta of deltas) { delta.receivedFrom = peer.reqAddr; this.rhizomeNode.deltaStream.receiveDelta(delta); diff --git a/src/pub-sub.ts b/src/pub-sub.ts index d87e45a..3fe2cac 100644 --- a/src/pub-sub.ts +++ b/src/pub-sub.ts @@ -51,29 +51,32 @@ export class Subscription { this.cb(senderStr, msgStr); } - debug(`[${this.pubSub.rhizomeNode.config.peerId}]`, `done waiting for subscription socket for topic ${this.topic}`); + debug(`[${this.pubSub.rhizomeNode.config.peerId}]`, `Done waiting for subscription socket for topic ${this.topic}`); } } export class PubSub { rhizomeNode: RhizomeNode; - publishSock: Publisher; + publishSock?: Publisher; publishAddrStr: string; subscriptions: Subscription[] = []; libp2p?: Libp2p; constructor(rhizomeNode: RhizomeNode) { this.rhizomeNode = rhizomeNode; - this.publishSock = new Publisher(); const {publishBindAddr, publishBindPort} = this.rhizomeNode.config; this.publishAddrStr = `tcp://${publishBindAddr}:${publishBindPort}`; } - async start() { + async startZmq() { + this.publishSock = new Publisher(); + await this.publishSock.bind(this.publishAddrStr); debug(`[${this.rhizomeNode.config.peerId}]`, `ZeroMQ publishing socket bound to ${this.publishAddrStr}`); + } + async startLibp2p() { this.libp2p = await createLibp2p({ addresses: { // TODO: Config @@ -91,30 +94,38 @@ export class PubSub { }); this.libp2p.addEventListener("peer:discovery", (event) => { - debug(`[${this.rhizomeNode.config.peerId}]`, `found peer: ${JSON.stringify(event.detail)}`); + debug(`[${this.rhizomeNode.config.peerId}]`, `Found peer: ${JSON.stringify(event.detail)}`); this.libp2p?.dial(event.detail.multiaddrs); }); this.libp2p.addEventListener("peer:connect", (event) => { - debug(`[${this.rhizomeNode.config.peerId}]`, `connected to peer: ${JSON.stringify(event.detail)}`); + debug(`[${this.rhizomeNode.config.peerId}]`, `Connected to peer: ${JSON.stringify(event.detail)}`); }); } async publish(topic: string, msg: string) { - debug(`[${this.rhizomeNode.config.peerId}]`, `publishing to ZeroMQ, msg: ${msg}`); - await this.publishSock.send([ - topic, - this.rhizomeNode.myRequestAddr.toAddrString(), - msg - ]); + if (this.publishSock) { + await this.publishSock.send([ + topic, + this.rhizomeNode.myRequestAddr.toAddrString(), + msg + ]); + debug(`[${this.rhizomeNode.config.peerId}]`, `Published to ZeroMQ, msg: ${msg}`); + } if (this.libp2p) { const pubsub = this.libp2p.services.pubsub as GossipSub; - debug(`[${this.rhizomeNode.config.peerId}]`, `publishing to Libp2p, msg: ${msg}`); + let published = false; try { await pubsub.publish(topic, Buffer.from(msg)); + published = true; } catch (e: unknown) { - debug(`[${this.rhizomeNode.config.peerId}]`, 'Libp2p publish:', (e as Error).message); + if (!((e as Error).message as string).match("PublishError.NoPeersSubscribedToTopic")) { + debug(`[${this.rhizomeNode.config.peerId}]`, 'Libp2p publish:', (e as Error).message); + } + } + if (published) { + debug(`[${this.rhizomeNode.config.peerId}]`, `Published to Libp2p, msg: ${msg}`); } } } @@ -137,7 +148,7 @@ export class PubSub { if (!this.subscribedTopics.has(topic)) { pubsub.subscribe(topic); this.subscribedTopics.add(topic); - debug(`[${this.rhizomeNode.config.peerId}]`, 'subscribed topics:', Array.from(this.subscribedTopics.keys())); + debug(`[${this.rhizomeNode.config.peerId}]`, 'Subscribed topics:', Array.from(this.subscribedTopics.keys())); } } @@ -152,9 +163,12 @@ export class PubSub { } async stop() { - await this.publishSock.unbind(this.publishAddrStr); - this.publishSock.close(); - this.publishSock = new Publisher(); + if (this.publishSock) { + await this.publishSock.unbind(this.publishAddrStr); + this.publishSock.close(); + // Free the memory by taking the old object out of scope. + this.publishSock = undefined; + } for (const subscription of this.subscriptions) { subscription.sock.close(); @@ -166,17 +180,17 @@ export class PubSub { pubsub.removeEventListener("message"); for (const topic of this.subscribedTopics) { - debug(`[${this.rhizomeNode.config.peerId}]`, `unsubscribing Libp2p topic ${topic}`); + debug(`[${this.rhizomeNode.config.peerId}]`, `Unsubscribing Libp2p topic ${topic}`); pubsub.unsubscribe(topic) } - debug(`[${this.rhizomeNode.config.peerId}]`, 'stopping gossipsub'); + debug(`[${this.rhizomeNode.config.peerId}]`, 'Stopping gossipsub'); await pubsub.stop(); await this.libp2p.stop(); - debug(`[${this.rhizomeNode.config.peerId}]`, 'stopped libp2p'); + debug(`[${this.rhizomeNode.config.peerId}]`, 'Stopped libp2p'); } } diff --git a/src/transactions.ts b/src/transactions.ts index 04c4d53..d2ca167 100644 --- a/src/transactions.ts +++ b/src/transactions.ts @@ -104,7 +104,7 @@ export class Transactions { if (transactionId && size) { // This delta describes a transaction - debug(`[${this.lossless.rhizomeNode.config.peerId}]`, `transaction ${transactionId} has size ${size}`); + debug(`[${this.lossless.rhizomeNode.config.peerId}]`, `Transaction ${transactionId} has size ${size}`); this.setSize(transactionId, size as number); diff --git a/src/util/md-files.ts b/src/util/md-files.ts index aea1a01..1f588e9 100644 --- a/src/util/md-files.ts +++ b/src/util/md-files.ts @@ -99,14 +99,14 @@ export class MDFiles { switch (eventType) { case 'rename': { - debug(`[${this.rhizomeNode.config.peerId}]`, `file ${name} renamed`); + debug(`[${this.rhizomeNode.config.peerId}]`, `File ${name} renamed`); // Remove it from memory and re-scan everything this.files.delete(name); this.readDir(); break; } case 'change': { - debug(`[${this.rhizomeNode.config.peerId}]`, `file ${name} changed`); + debug(`[${this.rhizomeNode.config.peerId}]`, `File ${name} changed`); // Re-read this file this.readFile(name) break;