Compare commits

...

2 Commits

Author SHA1 Message Date
Ladd Hoffman cb09606590 fixed a small bug with global seedPeers 2024-12-31 14:04:34 -06:00
Ladd Hoffman 20b71c5d2f log when libp2p is stopped 2024-12-31 12:49:08 -06:00
9 changed files with 80 additions and 50 deletions

View File

@ -1,13 +1,22 @@
import {PeerAddress} from '../src/types.js';
import {parseAddressList, PeerAddress} from '../src/peers.js';
describe('PeerAddress', () => {
it('toString()', () => {
const addr = new PeerAddress('localhost', 1000);
expect(addr.toAddrString()).toBe("localhost:1000");
});
it('fromString()', () => {
const addr = PeerAddress.fromString("localhost:1000");
expect(addr.addr).toBe("localhost");
expect(addr.port).toBe(1000);
});
it('parseAddressList()', () => {
const input = "255.255.255.255:99999, 0.0.0.0:0";
const result = parseAddressList(input);
expect(result).toHaveLength(2);
expect(result[0].isEqual(new PeerAddress("255.255.255.255", 99999))).toBeTruthy();
expect(result[1].isEqual(new PeerAddress("0.0.0.0", 0))).toBeTruthy();
});
});

View File

@ -16,8 +16,13 @@ describe('Run', () => {
// Make the apps use the same pubsub topic so they can talk to each other
pubSubTopic: apps[0].config.pubSubTopic,
});
debug('app[0].config.seedPeers before adding:', JSON.stringify(apps[0].config.seedPeers));
apps[0].config.seedPeers.push(apps[1].myRequestAddr);
debug('app[0].config.seedPeers after adding:', JSON.stringify(apps[0].config.seedPeers));
debug('app[1].config.seedPeers before adding:', JSON.stringify(apps[1].config.seedPeers));
apps[1].config.seedPeers.push(apps[0].myRequestAddr);
debug('app[1].config.seedPeers after adding:', JSON.stringify(apps[1].config.seedPeers));
debug('app[0].config.seedPeers after adding:', JSON.stringify(apps[0].config.seedPeers));
await Promise.all(apps.map((app) => app.start()));
});

View File

@ -1,5 +1,4 @@
import {randomUUID} from "crypto";
import {PeerAddress} from "./types.js";
// _HOST refers to the address from an external perspective
// _ADDR refers to the interface address from the service's perspective
@ -8,6 +7,9 @@ export const LEVEL_DB_DIR = process.env.RHIZOME_LEVEL_DB_DIR ?? './data';
export const CREATOR = process.env.USER!;
export const PEER_ID = process.env.RHIZOME_PEER_ID || randomUUID();
export const ADDRESS = process.env.RHIZOME_ADDRESS ?? 'localhost';
export const SEED_PEERS = process.env.RHIZOME_SEED_PEERS || '';
export const REQUEST_BIND_ADDR = process.env.RHIZOME_REQUEST_BIND_ADDR || ADDRESS;
export const REQUEST_BIND_PORT = parseInt(process.env.RHIZOME_REQUEST_BIND_PORT || '4000');
export const REQUEST_BIND_HOST = process.env.RHIZOME_REQUEST_BIND_HOST || REQUEST_BIND_ADDR;
@ -17,8 +19,5 @@ export const PUBLISH_BIND_HOST = process.env.RHIZOME_PUBLISH_BIND_HOST || PUBLIS
export const HTTP_API_ADDR = process.env.RHIZOME_HTTP_API_ADDR || ADDRESS || 'localhost';
export const HTTP_API_PORT = parseInt(process.env.RHIZOME_HTTP_API_PORT || '3000');
export const HTTP_API_ENABLE = process.env.RHIZOME_HTTP_API_ENABLE === 'true';
export const SEED_PEERS: PeerAddress[] = (process.env.RHIZOME_SEED_PEERS || '').split(',')
.filter(x => !!x)
.map((peer: string) => PeerAddress.fromString(peer));
export const PUB_SUB_TOPIC = process.env.RHIZOME_PUB_SUB_TOPIC || `deltas-${randomUUID()}`;

View File

@ -1,6 +1,7 @@
import {randomUUID} from "crypto";
import microtime from 'microtime';
import {CreatorID, HostID, PeerAddress, Timestamp, TransactionID} from "./types.js";
import {CreatorID, HostID, Timestamp, TransactionID} from "./types.js";
import {PeerAddress} from "./peers.js";
export type DeltaID = string;

View File

@ -3,10 +3,9 @@ import {CREATOR, HTTP_API_ADDR, HTTP_API_ENABLE, HTTP_API_PORT, PEER_ID, PUB_SUB
import {DeltaStream} from './deltas.js';
import {HttpServer} from './http/index.js';
import {Lossless} from './lossless.js';
import {Peers} from './peers.js';
import {parseAddressList, PeerAddress, Peers} from './peers.js';
import {PubSub} from './pub-sub.js';
import {RequestReply} from './request-reply.js';
import {PeerAddress} from './types.js';
const debug = Debug('rz:rhizome-node');
export type RhizomeNodeConfig = {
@ -48,7 +47,7 @@ export class RhizomeNode {
httpAddr: HTTP_API_ADDR,
httpPort: HTTP_API_PORT,
httpEnable: HTTP_API_ENABLE,
seedPeers: SEED_PEERS,
seedPeers: parseAddressList(SEED_PEERS),
peerId: PEER_ID,
creator: CREATOR,
pubSubTopic: PUB_SUB_TOPIC,
@ -107,5 +106,6 @@ export class RhizomeNode {
await this.pubSub.stop();
await this.requestReply.stop();
await this.httpServer.stop();
debug(`[${this.config.peerId}]`, 'stopped');
}
}

View File

@ -1,13 +1,44 @@
import Debug from 'debug';
import {Message} from 'zeromq';
import {SEED_PEERS} from "./config.js";
import {Delta} from "./delta.js";
import {RhizomeNode} from "./node.js";
import {Subscription} from './pub-sub.js';
import {PeerRequest, RequestSocket, ResponseSocket} from "./request-reply.js";
import {PeerAddress} from "./types.js";
const debug = Debug('rz:peers');
export class PeerAddress {
addr: string;
port: number;
constructor(addr: string, port: number) {
this.addr = addr;
this.port = port;
}
static fromString(addrString: string): PeerAddress {
const [addr, port] = addrString.trim().split(':');
return new PeerAddress(addr, parseInt(port));
}
toAddrString() {
return `${this.addr}:${this.port}`;
}
toJSON() {
return this.toAddrString();
}
isEqual(other: PeerAddress) {
return this.addr === other.addr && this.port === other.port;
}
};
export function parseAddressList(input: string): PeerAddress[] {
return input.split(',')
.filter(x => !!x)
.map((peer: string) => PeerAddress.fromString(peer));
}
export enum RequestMethods {
GetPublishAddress,
AskForDeltas
@ -26,7 +57,7 @@ class Peer {
this.rhizomeNode = rhizomeNode;
this.reqAddr = reqAddr;
this.isSelf = reqAddr.isEqual(this.rhizomeNode.myRequestAddr);
this.isSeedPeer = !!SEED_PEERS.find((seedPeer) => reqAddr.isEqual(seedPeer));
this.isSeedPeer = this.rhizomeNode.config.seedPeers.some((seedPeer) => reqAddr.isEqual(seedPeer));
}
async request(method: RequestMethods): Promise<Message> {
@ -44,6 +75,9 @@ class Peer {
debug(`[${this.rhizomeNode.config.peerId}]`, `received publish addr ${this.publishAddr.toAddrString()} from peer ${this.reqAddr.toAddrString()}`);
}
debug(`[${this.rhizomeNode.config.peerId}]`, `subscribing to peer ${this.reqAddr.toAddrString()}`);
// ZeroMQ subscription
this.subscription = this.rhizomeNode.pubSub.subscribe(
this.publishAddr,
this.rhizomeNode.config.pubSubTopic,
@ -121,17 +155,24 @@ export class Peers {
}
async subscribeToSeeds() {
SEED_PEERS.forEach(async (addr, idx) => {
debug(`[${this.rhizomeNode.config.peerId}]`, `SEED PEERS[${idx}]=${addr.toAddrString()}`);
const {seedPeers} = this.rhizomeNode.config;
debug(`[${this.rhizomeNode.config.peerId}]`, `subscribeToSeeds, seedPeers: ${JSON.stringify(seedPeers)}`);
seedPeers.forEach(async (addr, idx) => {
const peer = this.addPeer(addr);
await peer.subscribeDeltas();
debug(`[${this.rhizomeNode.config.peerId}]`, `SEED PEERS[${idx}]=${addr.toAddrString()}, isSelf:`, peer.isSelf);
if (!peer.isSelf) {
await peer.subscribeDeltas();
}
});
}
//! TODO Expect abysmal scaling properties with this function
async askAllPeersForDeltas() {
this.peers.filter(({isSelf}) => !isSelf)
this.peers
.forEach(async (peer, idx) => {
debug(`[${this.rhizomeNode.config.peerId}]`, `peer ${peer.reqAddr.toAddrString()} isSelf`, peer.isSelf);
if (peer.isSelf) return;
debug(`[${this.rhizomeNode.config.peerId}]`, `Asking peer ${idx} for deltas`);
const deltas = await peer.askForDeltas();
debug(`[${this.rhizomeNode.config.peerId}]`, `received ${deltas.length} deltas from ${peer.reqAddr.toAddrString()}`);

View File

@ -9,7 +9,7 @@ import Debug from 'debug';
import {Libp2p, createLibp2p} from 'libp2p';
import {Publisher, Subscriber} from 'zeromq';
import {RhizomeNode} from './node.js';
import {PeerAddress} from './types.js';
import {PeerAddress} from './peers.js';
const debug = Debug('rz:pub-sub');
export type SubscribedMessageHandler = (sender: PeerAddress, msg: string) => void;
@ -50,6 +50,8 @@ export class Subscription {
debug(`[${this.pubSub.rhizomeNode.config.peerId}]`, `ZeroMQ subscribtion received msg: ${msgStr}`);
this.cb(senderStr, msgStr);
}
debug(`[${this.pubSub.rhizomeNode.config.peerId}]`, `done waiting for subscription socket for topic ${this.topic}`);
}
}
@ -89,13 +91,12 @@ export class PubSub {
});
this.libp2p.addEventListener("peer:discovery", (event) => {
debug(`[${this.rhizomeNode.config.peerId}]`, `found peer: ${JSON.stringify(event.detail, null, 2)}`);
debug(`[${this.rhizomeNode.config.peerId}]`, `found peer: ${JSON.stringify(event.detail)}`);
this.libp2p?.dial(event.detail.multiaddrs);
});
this.libp2p.addEventListener("peer:connect", (event) => {
debug(`[${this.rhizomeNode.config.peerId}]`, `connected to peer: ${JSON.stringify(event.detail, null, 2)}`);
// TODO: Subscribe
debug(`[${this.rhizomeNode.config.peerId}]`, `connected to peer: ${JSON.stringify(event.detail)}`);
});
}
@ -174,6 +175,9 @@ export class PubSub {
await pubsub.stop();
await this.libp2p.stop();
debug(`[${this.rhizomeNode.config.peerId}]`, 'stopped libp2p');
}
}
}

View File

@ -2,8 +2,7 @@ import Debug from 'debug';
import {EventEmitter} from 'node:events';
import {Message, Reply, Request} from 'zeromq';
import {RhizomeNode} from './node.js';
import {RequestMethods} from './peers.js';
import {PeerAddress} from './types.js';
import {PeerAddress, RequestMethods} from './peers.js';
const debug = Debug('rz:request-reply');
export type PeerRequest = {

View File

@ -18,31 +18,3 @@ export type ViewMany<T> = {
[key: DomainEntityID]: T;
};
// TODO: Move to ./peers.ts
export class PeerAddress {
addr: string;
port: number;
constructor(addr: string, port: number) {
this.addr = addr;
this.port = port;
}
static fromString(addrString: string): PeerAddress {
const [addr, port] = addrString.trim().split(':');
return new PeerAddress(addr, parseInt(port));
}
toAddrString() {
return `${this.addr}:${this.port}`;
}
toJSON() {
return this.toAddrString();
}
isEqual(other: PeerAddress) {
return this.addr === other.addr && this.port === other.port;
}
};