From cb096065904d76920bcb8944a19a24a4fd6d5f0e Mon Sep 17 00:00:00 2001 From: Ladd Date: Tue, 31 Dec 2024 14:04:34 -0600 Subject: [PATCH] fixed a small bug with global seedPeers --- __tests__/peer-address.ts | 11 ++++++- __tests__/run/002-two-nodes.ts | 5 ++++ src/config.ts | 7 ++--- src/delta.ts | 3 +- src/node.ts | 6 ++-- src/peers.ts | 55 +++++++++++++++++++++++++++++----- src/pub-sub.ts | 8 +++-- src/request-reply.ts | 3 +- src/types.ts | 28 ----------------- 9 files changed, 77 insertions(+), 49 deletions(-) diff --git a/__tests__/peer-address.ts b/__tests__/peer-address.ts index 38b7f99..3dbaafc 100644 --- a/__tests__/peer-address.ts +++ b/__tests__/peer-address.ts @@ -1,13 +1,22 @@ -import {PeerAddress} from '../src/types.js'; +import {parseAddressList, PeerAddress} from '../src/peers.js'; describe('PeerAddress', () => { it('toString()', () => { const addr = new PeerAddress('localhost', 1000); expect(addr.toAddrString()).toBe("localhost:1000"); }); + it('fromString()', () => { const addr = PeerAddress.fromString("localhost:1000"); expect(addr.addr).toBe("localhost"); expect(addr.port).toBe(1000); }); + + it('parseAddressList()', () => { + const input = "255.255.255.255:99999, 0.0.0.0:0"; + const result = parseAddressList(input); + expect(result).toHaveLength(2); + expect(result[0].isEqual(new PeerAddress("255.255.255.255", 99999))).toBeTruthy(); + expect(result[1].isEqual(new PeerAddress("0.0.0.0", 0))).toBeTruthy(); + }); }); diff --git a/__tests__/run/002-two-nodes.ts b/__tests__/run/002-two-nodes.ts index 5bebf55..98e6f25 100644 --- a/__tests__/run/002-two-nodes.ts +++ b/__tests__/run/002-two-nodes.ts @@ -16,8 +16,13 @@ describe('Run', () => { // Make the apps use the same pubsub topic so they can talk to each other pubSubTopic: apps[0].config.pubSubTopic, }); + debug('app[0].config.seedPeers before adding:', JSON.stringify(apps[0].config.seedPeers)); apps[0].config.seedPeers.push(apps[1].myRequestAddr); + debug('app[0].config.seedPeers after adding:', JSON.stringify(apps[0].config.seedPeers)); + debug('app[1].config.seedPeers before adding:', JSON.stringify(apps[1].config.seedPeers)); apps[1].config.seedPeers.push(apps[0].myRequestAddr); + debug('app[1].config.seedPeers after adding:', JSON.stringify(apps[1].config.seedPeers)); + debug('app[0].config.seedPeers after adding:', JSON.stringify(apps[0].config.seedPeers)); await Promise.all(apps.map((app) => app.start())); }); diff --git a/src/config.ts b/src/config.ts index 9e179d2..41448f7 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1,5 +1,4 @@ import {randomUUID} from "crypto"; -import {PeerAddress} from "./types.js"; // _HOST refers to the address from an external perspective // _ADDR refers to the interface address from the service's perspective @@ -8,6 +7,9 @@ export const LEVEL_DB_DIR = process.env.RHIZOME_LEVEL_DB_DIR ?? './data'; export const CREATOR = process.env.USER!; export const PEER_ID = process.env.RHIZOME_PEER_ID || randomUUID(); export const ADDRESS = process.env.RHIZOME_ADDRESS ?? 'localhost'; + +export const SEED_PEERS = process.env.RHIZOME_SEED_PEERS || ''; + export const REQUEST_BIND_ADDR = process.env.RHIZOME_REQUEST_BIND_ADDR || ADDRESS; export const REQUEST_BIND_PORT = parseInt(process.env.RHIZOME_REQUEST_BIND_PORT || '4000'); export const REQUEST_BIND_HOST = process.env.RHIZOME_REQUEST_BIND_HOST || REQUEST_BIND_ADDR; @@ -17,8 +19,5 @@ export const PUBLISH_BIND_HOST = process.env.RHIZOME_PUBLISH_BIND_HOST || PUBLIS export const HTTP_API_ADDR = process.env.RHIZOME_HTTP_API_ADDR || ADDRESS || 'localhost'; export const HTTP_API_PORT = parseInt(process.env.RHIZOME_HTTP_API_PORT || '3000'); export const HTTP_API_ENABLE = process.env.RHIZOME_HTTP_API_ENABLE === 'true'; -export const SEED_PEERS: PeerAddress[] = (process.env.RHIZOME_SEED_PEERS || '').split(',') - .filter(x => !!x) - .map((peer: string) => PeerAddress.fromString(peer)); export const PUB_SUB_TOPIC = process.env.RHIZOME_PUB_SUB_TOPIC || `deltas-${randomUUID()}`; diff --git a/src/delta.ts b/src/delta.ts index 10a33e4..1cc2d28 100644 --- a/src/delta.ts +++ b/src/delta.ts @@ -1,6 +1,7 @@ import {randomUUID} from "crypto"; import microtime from 'microtime'; -import {CreatorID, HostID, PeerAddress, Timestamp, TransactionID} from "./types.js"; +import {CreatorID, HostID, Timestamp, TransactionID} from "./types.js"; +import {PeerAddress} from "./peers.js"; export type DeltaID = string; diff --git a/src/node.ts b/src/node.ts index 9f64b35..4560a99 100644 --- a/src/node.ts +++ b/src/node.ts @@ -3,10 +3,9 @@ import {CREATOR, HTTP_API_ADDR, HTTP_API_ENABLE, HTTP_API_PORT, PEER_ID, PUB_SUB import {DeltaStream} from './deltas.js'; import {HttpServer} from './http/index.js'; import {Lossless} from './lossless.js'; -import {Peers} from './peers.js'; +import {parseAddressList, PeerAddress, Peers} from './peers.js'; import {PubSub} from './pub-sub.js'; import {RequestReply} from './request-reply.js'; -import {PeerAddress} from './types.js'; const debug = Debug('rz:rhizome-node'); export type RhizomeNodeConfig = { @@ -48,7 +47,7 @@ export class RhizomeNode { httpAddr: HTTP_API_ADDR, httpPort: HTTP_API_PORT, httpEnable: HTTP_API_ENABLE, - seedPeers: SEED_PEERS, + seedPeers: parseAddressList(SEED_PEERS), peerId: PEER_ID, creator: CREATOR, pubSubTopic: PUB_SUB_TOPIC, @@ -107,5 +106,6 @@ export class RhizomeNode { await this.pubSub.stop(); await this.requestReply.stop(); await this.httpServer.stop(); + debug(`[${this.config.peerId}]`, 'stopped'); } } diff --git a/src/peers.ts b/src/peers.ts index 059bf66..faba3da 100644 --- a/src/peers.ts +++ b/src/peers.ts @@ -1,13 +1,44 @@ import Debug from 'debug'; import {Message} from 'zeromq'; -import {SEED_PEERS} from "./config.js"; import {Delta} from "./delta.js"; import {RhizomeNode} from "./node.js"; import {Subscription} from './pub-sub.js'; import {PeerRequest, RequestSocket, ResponseSocket} from "./request-reply.js"; -import {PeerAddress} from "./types.js"; const debug = Debug('rz:peers'); +export class PeerAddress { + addr: string; + port: number; + + constructor(addr: string, port: number) { + this.addr = addr; + this.port = port; + } + + static fromString(addrString: string): PeerAddress { + const [addr, port] = addrString.trim().split(':'); + return new PeerAddress(addr, parseInt(port)); + } + + toAddrString() { + return `${this.addr}:${this.port}`; + } + + toJSON() { + return this.toAddrString(); + } + + isEqual(other: PeerAddress) { + return this.addr === other.addr && this.port === other.port; + } +}; + +export function parseAddressList(input: string): PeerAddress[] { + return input.split(',') + .filter(x => !!x) + .map((peer: string) => PeerAddress.fromString(peer)); +} + export enum RequestMethods { GetPublishAddress, AskForDeltas @@ -26,7 +57,7 @@ class Peer { this.rhizomeNode = rhizomeNode; this.reqAddr = reqAddr; this.isSelf = reqAddr.isEqual(this.rhizomeNode.myRequestAddr); - this.isSeedPeer = !!SEED_PEERS.find((seedPeer) => reqAddr.isEqual(seedPeer)); + this.isSeedPeer = this.rhizomeNode.config.seedPeers.some((seedPeer) => reqAddr.isEqual(seedPeer)); } async request(method: RequestMethods): Promise { @@ -44,6 +75,9 @@ class Peer { debug(`[${this.rhizomeNode.config.peerId}]`, `received publish addr ${this.publishAddr.toAddrString()} from peer ${this.reqAddr.toAddrString()}`); } + debug(`[${this.rhizomeNode.config.peerId}]`, `subscribing to peer ${this.reqAddr.toAddrString()}`); + + // ZeroMQ subscription this.subscription = this.rhizomeNode.pubSub.subscribe( this.publishAddr, this.rhizomeNode.config.pubSubTopic, @@ -121,17 +155,24 @@ export class Peers { } async subscribeToSeeds() { - SEED_PEERS.forEach(async (addr, idx) => { - debug(`[${this.rhizomeNode.config.peerId}]`, `SEED PEERS[${idx}]=${addr.toAddrString()}`); + const {seedPeers} = this.rhizomeNode.config; + debug(`[${this.rhizomeNode.config.peerId}]`, `subscribeToSeeds, seedPeers: ${JSON.stringify(seedPeers)}`); + seedPeers.forEach(async (addr, idx) => { const peer = this.addPeer(addr); - await peer.subscribeDeltas(); + + debug(`[${this.rhizomeNode.config.peerId}]`, `SEED PEERS[${idx}]=${addr.toAddrString()}, isSelf:`, peer.isSelf); + if (!peer.isSelf) { + await peer.subscribeDeltas(); + } }); } //! TODO Expect abysmal scaling properties with this function async askAllPeersForDeltas() { - this.peers.filter(({isSelf}) => !isSelf) + this.peers .forEach(async (peer, idx) => { + debug(`[${this.rhizomeNode.config.peerId}]`, `peer ${peer.reqAddr.toAddrString()} isSelf`, peer.isSelf); + if (peer.isSelf) return; debug(`[${this.rhizomeNode.config.peerId}]`, `Asking peer ${idx} for deltas`); const deltas = await peer.askForDeltas(); debug(`[${this.rhizomeNode.config.peerId}]`, `received ${deltas.length} deltas from ${peer.reqAddr.toAddrString()}`); diff --git a/src/pub-sub.ts b/src/pub-sub.ts index 5e98e95..d87e45a 100644 --- a/src/pub-sub.ts +++ b/src/pub-sub.ts @@ -9,7 +9,7 @@ import Debug from 'debug'; import {Libp2p, createLibp2p} from 'libp2p'; import {Publisher, Subscriber} from 'zeromq'; import {RhizomeNode} from './node.js'; -import {PeerAddress} from './types.js'; +import {PeerAddress} from './peers.js'; const debug = Debug('rz:pub-sub'); export type SubscribedMessageHandler = (sender: PeerAddress, msg: string) => void; @@ -50,6 +50,8 @@ export class Subscription { debug(`[${this.pubSub.rhizomeNode.config.peerId}]`, `ZeroMQ subscribtion received msg: ${msgStr}`); this.cb(senderStr, msgStr); } + + debug(`[${this.pubSub.rhizomeNode.config.peerId}]`, `done waiting for subscription socket for topic ${this.topic}`); } } @@ -89,12 +91,12 @@ export class PubSub { }); this.libp2p.addEventListener("peer:discovery", (event) => { - debug(`[${this.rhizomeNode.config.peerId}]`, `found peer: ${JSON.stringify(event.detail, null, 2)}`); + debug(`[${this.rhizomeNode.config.peerId}]`, `found peer: ${JSON.stringify(event.detail)}`); this.libp2p?.dial(event.detail.multiaddrs); }); this.libp2p.addEventListener("peer:connect", (event) => { - debug(`[${this.rhizomeNode.config.peerId}]`, `connected to peer: ${JSON.stringify(event.detail, null, 2)}`); + debug(`[${this.rhizomeNode.config.peerId}]`, `connected to peer: ${JSON.stringify(event.detail)}`); }); } diff --git a/src/request-reply.ts b/src/request-reply.ts index 5463af5..73a8814 100644 --- a/src/request-reply.ts +++ b/src/request-reply.ts @@ -2,8 +2,7 @@ import Debug from 'debug'; import {EventEmitter} from 'node:events'; import {Message, Reply, Request} from 'zeromq'; import {RhizomeNode} from './node.js'; -import {RequestMethods} from './peers.js'; -import {PeerAddress} from './types.js'; +import {PeerAddress, RequestMethods} from './peers.js'; const debug = Debug('rz:request-reply'); export type PeerRequest = { diff --git a/src/types.ts b/src/types.ts index 0e3916e..3f1faca 100644 --- a/src/types.ts +++ b/src/types.ts @@ -18,31 +18,3 @@ export type ViewMany = { [key: DomainEntityID]: T; }; -// TODO: Move to ./peers.ts -export class PeerAddress { - addr: string; - port: number; - - constructor(addr: string, port: number) { - this.addr = addr; - this.port = port; - } - - static fromString(addrString: string): PeerAddress { - const [addr, port] = addrString.trim().split(':'); - return new PeerAddress(addr, parseInt(port)); - } - - toAddrString() { - return `${this.addr}:${this.port}`; - } - - toJSON() { - return this.toAddrString(); - } - - isEqual(other: PeerAddress) { - return this.addr === other.addr && this.port === other.port; - } -}; -