Compare commits
2 Commits
6678176326
...
cb09606590
Author | SHA1 | Date |
---|---|---|
Ladd Hoffman | cb09606590 | |
Ladd Hoffman | 20b71c5d2f |
|
@ -1,13 +1,22 @@
|
|||
import {PeerAddress} from '../src/types.js';
|
||||
import {parseAddressList, PeerAddress} from '../src/peers.js';
|
||||
|
||||
describe('PeerAddress', () => {
|
||||
it('toString()', () => {
|
||||
const addr = new PeerAddress('localhost', 1000);
|
||||
expect(addr.toAddrString()).toBe("localhost:1000");
|
||||
});
|
||||
|
||||
it('fromString()', () => {
|
||||
const addr = PeerAddress.fromString("localhost:1000");
|
||||
expect(addr.addr).toBe("localhost");
|
||||
expect(addr.port).toBe(1000);
|
||||
});
|
||||
|
||||
it('parseAddressList()', () => {
|
||||
const input = "255.255.255.255:99999, 0.0.0.0:0";
|
||||
const result = parseAddressList(input);
|
||||
expect(result).toHaveLength(2);
|
||||
expect(result[0].isEqual(new PeerAddress("255.255.255.255", 99999))).toBeTruthy();
|
||||
expect(result[1].isEqual(new PeerAddress("0.0.0.0", 0))).toBeTruthy();
|
||||
});
|
||||
});
|
||||
|
|
|
@ -16,8 +16,13 @@ describe('Run', () => {
|
|||
// Make the apps use the same pubsub topic so they can talk to each other
|
||||
pubSubTopic: apps[0].config.pubSubTopic,
|
||||
});
|
||||
debug('app[0].config.seedPeers before adding:', JSON.stringify(apps[0].config.seedPeers));
|
||||
apps[0].config.seedPeers.push(apps[1].myRequestAddr);
|
||||
debug('app[0].config.seedPeers after adding:', JSON.stringify(apps[0].config.seedPeers));
|
||||
debug('app[1].config.seedPeers before adding:', JSON.stringify(apps[1].config.seedPeers));
|
||||
apps[1].config.seedPeers.push(apps[0].myRequestAddr);
|
||||
debug('app[1].config.seedPeers after adding:', JSON.stringify(apps[1].config.seedPeers));
|
||||
debug('app[0].config.seedPeers after adding:', JSON.stringify(apps[0].config.seedPeers));
|
||||
|
||||
await Promise.all(apps.map((app) => app.start()));
|
||||
});
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
import {randomUUID} from "crypto";
|
||||
import {PeerAddress} from "./types.js";
|
||||
|
||||
// _HOST refers to the address from an external perspective
|
||||
// _ADDR refers to the interface address from the service's perspective
|
||||
|
@ -8,6 +7,9 @@ export const LEVEL_DB_DIR = process.env.RHIZOME_LEVEL_DB_DIR ?? './data';
|
|||
export const CREATOR = process.env.USER!;
|
||||
export const PEER_ID = process.env.RHIZOME_PEER_ID || randomUUID();
|
||||
export const ADDRESS = process.env.RHIZOME_ADDRESS ?? 'localhost';
|
||||
|
||||
export const SEED_PEERS = process.env.RHIZOME_SEED_PEERS || '';
|
||||
|
||||
export const REQUEST_BIND_ADDR = process.env.RHIZOME_REQUEST_BIND_ADDR || ADDRESS;
|
||||
export const REQUEST_BIND_PORT = parseInt(process.env.RHIZOME_REQUEST_BIND_PORT || '4000');
|
||||
export const REQUEST_BIND_HOST = process.env.RHIZOME_REQUEST_BIND_HOST || REQUEST_BIND_ADDR;
|
||||
|
@ -17,8 +19,5 @@ export const PUBLISH_BIND_HOST = process.env.RHIZOME_PUBLISH_BIND_HOST || PUBLIS
|
|||
export const HTTP_API_ADDR = process.env.RHIZOME_HTTP_API_ADDR || ADDRESS || 'localhost';
|
||||
export const HTTP_API_PORT = parseInt(process.env.RHIZOME_HTTP_API_PORT || '3000');
|
||||
export const HTTP_API_ENABLE = process.env.RHIZOME_HTTP_API_ENABLE === 'true';
|
||||
export const SEED_PEERS: PeerAddress[] = (process.env.RHIZOME_SEED_PEERS || '').split(',')
|
||||
.filter(x => !!x)
|
||||
.map((peer: string) => PeerAddress.fromString(peer));
|
||||
|
||||
export const PUB_SUB_TOPIC = process.env.RHIZOME_PUB_SUB_TOPIC || `deltas-${randomUUID()}`;
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import {randomUUID} from "crypto";
|
||||
import microtime from 'microtime';
|
||||
import {CreatorID, HostID, PeerAddress, Timestamp, TransactionID} from "./types.js";
|
||||
import {CreatorID, HostID, Timestamp, TransactionID} from "./types.js";
|
||||
import {PeerAddress} from "./peers.js";
|
||||
|
||||
export type DeltaID = string;
|
||||
|
||||
|
|
|
@ -3,10 +3,9 @@ import {CREATOR, HTTP_API_ADDR, HTTP_API_ENABLE, HTTP_API_PORT, PEER_ID, PUB_SUB
|
|||
import {DeltaStream} from './deltas.js';
|
||||
import {HttpServer} from './http/index.js';
|
||||
import {Lossless} from './lossless.js';
|
||||
import {Peers} from './peers.js';
|
||||
import {parseAddressList, PeerAddress, Peers} from './peers.js';
|
||||
import {PubSub} from './pub-sub.js';
|
||||
import {RequestReply} from './request-reply.js';
|
||||
import {PeerAddress} from './types.js';
|
||||
const debug = Debug('rz:rhizome-node');
|
||||
|
||||
export type RhizomeNodeConfig = {
|
||||
|
@ -48,7 +47,7 @@ export class RhizomeNode {
|
|||
httpAddr: HTTP_API_ADDR,
|
||||
httpPort: HTTP_API_PORT,
|
||||
httpEnable: HTTP_API_ENABLE,
|
||||
seedPeers: SEED_PEERS,
|
||||
seedPeers: parseAddressList(SEED_PEERS),
|
||||
peerId: PEER_ID,
|
||||
creator: CREATOR,
|
||||
pubSubTopic: PUB_SUB_TOPIC,
|
||||
|
@ -107,5 +106,6 @@ export class RhizomeNode {
|
|||
await this.pubSub.stop();
|
||||
await this.requestReply.stop();
|
||||
await this.httpServer.stop();
|
||||
debug(`[${this.config.peerId}]`, 'stopped');
|
||||
}
|
||||
}
|
||||
|
|
55
src/peers.ts
55
src/peers.ts
|
@ -1,13 +1,44 @@
|
|||
import Debug from 'debug';
|
||||
import {Message} from 'zeromq';
|
||||
import {SEED_PEERS} from "./config.js";
|
||||
import {Delta} from "./delta.js";
|
||||
import {RhizomeNode} from "./node.js";
|
||||
import {Subscription} from './pub-sub.js';
|
||||
import {PeerRequest, RequestSocket, ResponseSocket} from "./request-reply.js";
|
||||
import {PeerAddress} from "./types.js";
|
||||
const debug = Debug('rz:peers');
|
||||
|
||||
export class PeerAddress {
|
||||
addr: string;
|
||||
port: number;
|
||||
|
||||
constructor(addr: string, port: number) {
|
||||
this.addr = addr;
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
static fromString(addrString: string): PeerAddress {
|
||||
const [addr, port] = addrString.trim().split(':');
|
||||
return new PeerAddress(addr, parseInt(port));
|
||||
}
|
||||
|
||||
toAddrString() {
|
||||
return `${this.addr}:${this.port}`;
|
||||
}
|
||||
|
||||
toJSON() {
|
||||
return this.toAddrString();
|
||||
}
|
||||
|
||||
isEqual(other: PeerAddress) {
|
||||
return this.addr === other.addr && this.port === other.port;
|
||||
}
|
||||
};
|
||||
|
||||
export function parseAddressList(input: string): PeerAddress[] {
|
||||
return input.split(',')
|
||||
.filter(x => !!x)
|
||||
.map((peer: string) => PeerAddress.fromString(peer));
|
||||
}
|
||||
|
||||
export enum RequestMethods {
|
||||
GetPublishAddress,
|
||||
AskForDeltas
|
||||
|
@ -26,7 +57,7 @@ class Peer {
|
|||
this.rhizomeNode = rhizomeNode;
|
||||
this.reqAddr = reqAddr;
|
||||
this.isSelf = reqAddr.isEqual(this.rhizomeNode.myRequestAddr);
|
||||
this.isSeedPeer = !!SEED_PEERS.find((seedPeer) => reqAddr.isEqual(seedPeer));
|
||||
this.isSeedPeer = this.rhizomeNode.config.seedPeers.some((seedPeer) => reqAddr.isEqual(seedPeer));
|
||||
}
|
||||
|
||||
async request(method: RequestMethods): Promise<Message> {
|
||||
|
@ -44,6 +75,9 @@ class Peer {
|
|||
debug(`[${this.rhizomeNode.config.peerId}]`, `received publish addr ${this.publishAddr.toAddrString()} from peer ${this.reqAddr.toAddrString()}`);
|
||||
}
|
||||
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `subscribing to peer ${this.reqAddr.toAddrString()}`);
|
||||
|
||||
// ZeroMQ subscription
|
||||
this.subscription = this.rhizomeNode.pubSub.subscribe(
|
||||
this.publishAddr,
|
||||
this.rhizomeNode.config.pubSubTopic,
|
||||
|
@ -121,17 +155,24 @@ export class Peers {
|
|||
}
|
||||
|
||||
async subscribeToSeeds() {
|
||||
SEED_PEERS.forEach(async (addr, idx) => {
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `SEED PEERS[${idx}]=${addr.toAddrString()}`);
|
||||
const {seedPeers} = this.rhizomeNode.config;
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `subscribeToSeeds, seedPeers: ${JSON.stringify(seedPeers)}`);
|
||||
seedPeers.forEach(async (addr, idx) => {
|
||||
const peer = this.addPeer(addr);
|
||||
await peer.subscribeDeltas();
|
||||
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `SEED PEERS[${idx}]=${addr.toAddrString()}, isSelf:`, peer.isSelf);
|
||||
if (!peer.isSelf) {
|
||||
await peer.subscribeDeltas();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
//! TODO Expect abysmal scaling properties with this function
|
||||
async askAllPeersForDeltas() {
|
||||
this.peers.filter(({isSelf}) => !isSelf)
|
||||
this.peers
|
||||
.forEach(async (peer, idx) => {
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `peer ${peer.reqAddr.toAddrString()} isSelf`, peer.isSelf);
|
||||
if (peer.isSelf) return;
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `Asking peer ${idx} for deltas`);
|
||||
const deltas = await peer.askForDeltas();
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `received ${deltas.length} deltas from ${peer.reqAddr.toAddrString()}`);
|
||||
|
|
|
@ -9,7 +9,7 @@ import Debug from 'debug';
|
|||
import {Libp2p, createLibp2p} from 'libp2p';
|
||||
import {Publisher, Subscriber} from 'zeromq';
|
||||
import {RhizomeNode} from './node.js';
|
||||
import {PeerAddress} from './types.js';
|
||||
import {PeerAddress} from './peers.js';
|
||||
const debug = Debug('rz:pub-sub');
|
||||
|
||||
export type SubscribedMessageHandler = (sender: PeerAddress, msg: string) => void;
|
||||
|
@ -50,6 +50,8 @@ export class Subscription {
|
|||
debug(`[${this.pubSub.rhizomeNode.config.peerId}]`, `ZeroMQ subscribtion received msg: ${msgStr}`);
|
||||
this.cb(senderStr, msgStr);
|
||||
}
|
||||
|
||||
debug(`[${this.pubSub.rhizomeNode.config.peerId}]`, `done waiting for subscription socket for topic ${this.topic}`);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -89,13 +91,12 @@ export class PubSub {
|
|||
});
|
||||
|
||||
this.libp2p.addEventListener("peer:discovery", (event) => {
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `found peer: ${JSON.stringify(event.detail, null, 2)}`);
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `found peer: ${JSON.stringify(event.detail)}`);
|
||||
this.libp2p?.dial(event.detail.multiaddrs);
|
||||
});
|
||||
|
||||
this.libp2p.addEventListener("peer:connect", (event) => {
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `connected to peer: ${JSON.stringify(event.detail, null, 2)}`);
|
||||
// TODO: Subscribe
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `connected to peer: ${JSON.stringify(event.detail)}`);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -174,6 +175,9 @@ export class PubSub {
|
|||
await pubsub.stop();
|
||||
|
||||
await this.libp2p.stop();
|
||||
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, 'stopped libp2p');
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,8 +2,7 @@ import Debug from 'debug';
|
|||
import {EventEmitter} from 'node:events';
|
||||
import {Message, Reply, Request} from 'zeromq';
|
||||
import {RhizomeNode} from './node.js';
|
||||
import {RequestMethods} from './peers.js';
|
||||
import {PeerAddress} from './types.js';
|
||||
import {PeerAddress, RequestMethods} from './peers.js';
|
||||
const debug = Debug('rz:request-reply');
|
||||
|
||||
export type PeerRequest = {
|
||||
|
|
28
src/types.ts
28
src/types.ts
|
@ -18,31 +18,3 @@ export type ViewMany<T> = {
|
|||
[key: DomainEntityID]: T;
|
||||
};
|
||||
|
||||
// TODO: Move to ./peers.ts
|
||||
export class PeerAddress {
|
||||
addr: string;
|
||||
port: number;
|
||||
|
||||
constructor(addr: string, port: number) {
|
||||
this.addr = addr;
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
static fromString(addrString: string): PeerAddress {
|
||||
const [addr, port] = addrString.trim().split(':');
|
||||
return new PeerAddress(addr, parseInt(port));
|
||||
}
|
||||
|
||||
toAddrString() {
|
||||
return `${this.addr}:${this.port}`;
|
||||
}
|
||||
|
||||
toJSON() {
|
||||
return this.toAddrString();
|
||||
}
|
||||
|
||||
isEqual(other: PeerAddress) {
|
||||
return this.addr === other.addr && this.port === other.port;
|
||||
}
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in New Issue