prefix log messages with peer id

This commit is contained in:
Ladd Hoffman 2024-12-31 12:28:24 -06:00
parent 44e6cfa13c
commit 1f8cbda73f
17 changed files with 102 additions and 81 deletions

View File

@ -1,7 +1,10 @@
import {Delta, DeltaFilter} from '../src/delta.js'; import {Delta, DeltaFilter} from '../src/delta.js';
import {Lossless} from '../src/lossless.js'; import {Lossless} from '../src/lossless.js';
import {RhizomeNode} from '../src/node.js';
describe('Lossless', () => { describe('Lossless', () => {
const node = new RhizomeNode();
it('creates a lossless view of keanu as neo in the matrix', () => { it('creates a lossless view of keanu as neo in the matrix', () => {
const delta = new Delta({ const delta = new Delta({
creator: 'a', creator: 'a',
@ -27,7 +30,7 @@ describe('Lossless', () => {
}] }]
}); });
const lossless = new Lossless(); const lossless = new Lossless(node);
lossless.ingestDelta(delta); lossless.ingestDelta(delta);
@ -84,7 +87,7 @@ describe('Lossless', () => {
}); });
describe('can filter deltas', () => { describe('can filter deltas', () => {
const lossless = new Lossless(); const lossless = new Lossless(node);
beforeAll(() => { beforeAll(() => {
lossless.ingestDelta(new Delta({ lossless.ingestDelta(new Delta({

View File

@ -1,10 +1,12 @@
import {RhizomeNode} from "../src/node.js";
import {Delta, PointerTarget} from "../src/delta.js"; import {Delta, PointerTarget} from "../src/delta.js";
import {Lossless, LosslessViewMany} from "../src/lossless.js"; import {Lossless, LosslessViewMany} from "../src/lossless.js";
import {Lossy, lastValueFromLosslessViewOne, valueFromCollapsedDelta } from "../src/lossy.js"; import {Lossy, lastValueFromLosslessViewOne, valueFromCollapsedDelta } from "../src/lossy.js";
describe('Lossy', () => { describe('Lossy', () => {
describe('se a provided function to resolve entity views', () => { describe('se a provided function to resolve entity views', () => {
const lossless = new Lossless(); const node = new RhizomeNode();
const lossless = new Lossless(node);
const lossy = new Lossy(lossless); const lossy = new Lossy(lossless);
beforeAll(() => { beforeAll(() => {

View File

@ -27,11 +27,11 @@ type User = {
users.rhizomeConnect(rhizomeNode); users.rhizomeConnect(rhizomeNode);
users.onUpdate((u: Entity) => { users.onUpdate((u: Entity) => {
debug('User updated:', u); debug(`[${rhizomeNode.config.peerId}]`, 'User updated:', u);
}); });
users.onCreate((u: Entity) => { users.onCreate((u: Entity) => {
debug('New user!:', u); debug(`[${rhizomeNode.config.peerId}]`, 'New user!:', u);
}); });
await rhizomeNode.start(); await rhizomeNode.start();
@ -64,9 +64,9 @@ type User = {
const expected = JSON.stringify(taliesinData); const expected = JSON.stringify(taliesinData);
if (result === expected) { if (result === expected) {
debug('Put result matches expected: ' + expected); debug(`[${rhizomeNode.config.peerId}]`, 'Put result matches expected: ' + expected);
} else { } else {
debug(`Put result does not match expected.` + debug(`[${rhizomeNode.config.peerId}]`, `Put result does not match expected.` +
`\n\nExpected \n${expected}` + `\n\nExpected \n${expected}` +
`\nReceived\n${result}`); `\nReceived\n${result}`);
} }
@ -87,9 +87,9 @@ type User = {
const expected = JSON.stringify(taliesinData); const expected = JSON.stringify(taliesinData);
if (result === expected) { if (result === expected) {
debug('Get result matches expected: ' + expected); debug(`[${rhizomeNode.config.peerId}]`, 'Get result matches expected: ' + expected);
} else { } else {
debug(`Get result does not match expected.` + debug(`[${rhizomeNode.config.peerId}]`, `Get result does not match expected.` +
`\n\nExpected \n${expected}` + `\n\nExpected \n${expected}` +
`\nReceived\n${result}`); `\nReceived\n${result}`);
} }

View File

@ -20,9 +20,8 @@
"testMatch": [ "testMatch": [
"**/__tests__/**/*" "**/__tests__/**/*"
], ],
"verbose": true,
"transform": { "transform": {
"^.+\\.ts?$": [ "^.+\\.ts$": [
"ts-jest", "ts-jest",
{ {
"useESM": true "useESM": true

View File

@ -45,7 +45,7 @@ export class Collection {
rhizomeNode.httpServer.httpApi.serveCollection(this); rhizomeNode.httpServer.httpApi.serveCollection(this);
debug(`connected ${this.name} to rhizome`); debug(`[${this.rhizomeNode.config.peerId}]`, `connected ${this.name} to rhizome`);
} }
// Applies the javascript rules for updating object values, // Applies the javascript rules for updating object values,

View File

@ -86,7 +86,7 @@ export class DeltaStream {
} }
async publishDelta(delta: Delta) { async publishDelta(delta: Delta) {
debug(`Publishing delta: ${JSON.stringify(delta)}`); debug(`[${this.rhizomeNode.config.peerId}]`, `Publishing delta: ${JSON.stringify(delta)}`);
await this.rhizomeNode.pubSub.publish( await this.rhizomeNode.pubSub.publish(
this.rhizomeNode.config.pubSubTopic, this.rhizomeNode.config.pubSubTopic,
this.serializeDelta(delta) this.serializeDelta(delta)

View File

@ -1,7 +1,7 @@
import express, {Router} from "express"; import express, {Router} from "express";
import {Collection} from "src/collection"; import {Collection} from "../collection.js";
import {Delta} from "src/delta"; import {Delta} from "../delta.js";
import {RhizomeNode} from "src/node"; import {RhizomeNode} from "../node.js";
export class HttpApi { export class HttpApi {
router = Router(); router = Router();

View File

@ -1,11 +1,14 @@
import express, {Router} from "express"; import express, {Router} from "express";
import {RhizomeNode} from "../node.js";
import {htmlDocFromMarkdown, MDFiles} from "../util/md-files.js"; import {htmlDocFromMarkdown, MDFiles} from "../util/md-files.js";
export class HttpHtml { export class HttpHtml {
router = Router(); router = Router();
mdFiles = new MDFiles(); mdFiles: MDFiles;
constructor(readonly rhizomeNode: RhizomeNode) {
this.mdFiles = new MDFiles(this.rhizomeNode);
constructor() {
// Scan and watch for markdown files // Scan and watch for markdown files
this.mdFiles.readDir(); this.mdFiles.readDir();
this.mdFiles.readReadme(); this.mdFiles.readReadme();

View File

@ -13,7 +13,7 @@ export class HttpServer {
server?: Server; server?: Server;
constructor(readonly rhizomeNode: RhizomeNode) { constructor(readonly rhizomeNode: RhizomeNode) {
this.httpHtml = new HttpHtml(); this.httpHtml = new HttpHtml(this.rhizomeNode);
this.httpApi = new HttpApi(this.rhizomeNode); this.httpApi = new HttpApi(this.rhizomeNode);
this.app.use(express.json()); this.app.use(express.json());
@ -28,7 +28,7 @@ export class HttpServer {
host: httpAddr, host: httpAddr,
exclusive: true exclusive: true
}, () => { }, () => {
debug(`HTTP API bound to ${httpAddr}:${httpPort}`); debug(`[${this.rhizomeNode.config.peerId}]`, `HTTP API bound to ${httpAddr}:${httpPort}`);
}); });
} }

View File

@ -6,6 +6,7 @@ import EventEmitter from 'events';
import {Delta, DeltaFilter, DeltaNetworkImage} from './delta.js'; import {Delta, DeltaFilter, DeltaNetworkImage} from './delta.js';
import {Transactions} from './transactions.js'; import {Transactions} from './transactions.js';
import {DomainEntityID, PropertyID, PropertyTypes, TransactionID, ViewMany} from "./types.js"; import {DomainEntityID, PropertyID, PropertyTypes, TransactionID, ViewMany} from "./types.js";
import {RhizomeNode} from './node.js';
const debug = Debug('lossless'); const debug = Debug('lossless');
export type CollapsedPointer = {[key: PropertyID]: PropertyTypes}; export type CollapsedPointer = {[key: PropertyID]: PropertyTypes};
@ -64,13 +65,14 @@ class LosslessEntity {
export class Lossless { export class Lossless {
domainEntities = new LosslessEntityMap(); domainEntities = new LosslessEntityMap();
transactions = new Transactions(); transactions: Transactions;
referencedAs = new Map<string, Set<DomainEntityID>>(); referencedAs = new Map<string, Set<DomainEntityID>>();
eventStream = new EventEmitter(); eventStream = new EventEmitter();
constructor() { constructor(readonly rhizomeNode: RhizomeNode) {
this.transactions = new Transactions(this);
this.transactions.eventStream.on("completed", (transactionId) => { this.transactions.eventStream.on("completed", (transactionId) => {
debug(`completed transaction ${transactionId}`); debug(`[${this.rhizomeNode.config.peerId}]`, `completed transaction ${transactionId}`);
const transaction = this.transactions.get(transactionId); const transaction = this.transactions.get(transactionId);
if (!transaction) return; if (!transaction) return;
for (const id of transaction.entityIds) { for (const id of transaction.entityIds) {
@ -141,7 +143,7 @@ export class Lossless {
if (delta.transactionId) { if (delta.transactionId) {
if (!this.transactions.isComplete(delta.transactionId)) { if (!this.transactions.isComplete(delta.transactionId)) {
// TODO: Test this condition // TODO: Test this condition
debug(`excluding delta ${delta.id} because transaction ${delta.transactionId} is not completed`); debug(`[${this.rhizomeNode.config.peerId}]`, `excluding delta ${delta.id} because transaction ${delta.transactionId} is not completed`);
continue; continue;
} }
} }

View File

@ -5,11 +5,11 @@
// We can achieve this via functional expression, encoded as JSON-Logic. // We can achieve this via functional expression, encoded as JSON-Logic.
// Fields in the output can be described as transformations // Fields in the output can be described as transformations
import Debug from 'debug'; // import Debug from 'debug';
import {DeltaFilter} from "./delta.js"; import {DeltaFilter} from "./delta.js";
import {CollapsedDelta, Lossless, LosslessViewMany, LosslessViewOne} from "./lossless.js"; import {CollapsedDelta, Lossless, LosslessViewMany, LosslessViewOne} from "./lossless.js";
import {DomainEntityID, PropertyID, PropertyTypes, Timestamp, ViewMany} from "./types.js"; import {DomainEntityID, PropertyID, PropertyTypes, Timestamp, ViewMany} from "./types.js";
const debug = Debug('lossy'); // const debug = Debug('lossy');
type TimestampedProperty = { type TimestampedProperty = {
value: PropertyTypes, value: PropertyTypes,
@ -74,23 +74,6 @@ export function lastValueFromLosslessViewOne(
return res; return res;
} }
function defaultResolver(losslessView: LosslessViewMany): ResolvedViewMany {
const resolved: ResolvedViewMany = {};
debug('default resolver, lossless view', JSON.stringify(losslessView));
for (const [id, ent] of Object.entries(losslessView)) {
resolved[id] = {id, properties: {}};
for (const key of Object.keys(ent.properties)) {
const {value} = lastValueFromLosslessViewOne(ent, key) || {};
debug(`[ ${key} ] = ${value}`);
resolved[id].properties[key] = value;
}
}
return resolved;
};
// TODO: Incremental updates of lossy models. For example, with last-write-wins, // TODO: Incremental updates of lossy models. For example, with last-write-wins,
// we keep the timeUpdated for each field. A second stage resolver can rearrange // we keep the timeUpdated for each field. A second stage resolver can rearrange
// the data structure to a preferred shape and may discard the timeUpdated info. // the data structure to a preferred shape and may discard the timeUpdated info.
@ -108,11 +91,29 @@ export class Lossy {
// TODO: Cache things! // TODO: Cache things!
resolve<T = ResolvedViewOne>(fn?: Resolver<T> | Resolver, entityIds?: DomainEntityID[], deltaFilter?: DeltaFilter): T { resolve<T = ResolvedViewOne>(fn?: Resolver<T> | Resolver, entityIds?: DomainEntityID[], deltaFilter?: DeltaFilter): T {
if (!fn) { if (!fn) {
fn = defaultResolver; fn = (v) => this.defaultResolver(v);
} }
const losslessView = this.lossless.view(entityIds, deltaFilter); const losslessView = this.lossless.view(entityIds, deltaFilter);
return fn(losslessView) as T; return fn(losslessView) as T;
} }
defaultResolver(losslessView: LosslessViewMany): ResolvedViewMany {
const resolved: ResolvedViewMany = {};
// debug(`[${this.lossless.rhizomeNode.config.peerId}]`, 'default resolver, lossless view', JSON.stringify(losslessView));
for (const [id, ent] of Object.entries(losslessView)) {
resolved[id] = {id, properties: {}};
for (const key of Object.keys(ent.properties)) {
const {value} = lastValueFromLosslessViewOne(ent, key) || {};
// debug(`[${this.lossless.rhizomeNode.config.peerId}]`, `[ ${key} ] = ${value}`);
resolved[id].properties[key] = value;
}
}
return resolved;
};
} }
// Generate a rule // Generate a rule

View File

@ -54,7 +54,7 @@ export class RhizomeNode {
pubSubTopic: PUB_SUB_TOPIC, pubSubTopic: PUB_SUB_TOPIC,
...config ...config
}; };
debug('config', this.config); debug(`[${this.config.peerId}]`, 'config', this.config);
this.myRequestAddr = new PeerAddress( this.myRequestAddr = new PeerAddress(
this.config.requestBindHost, this.config.requestBindHost,
this.config.requestBindPort this.config.requestBindPort
@ -68,7 +68,7 @@ export class RhizomeNode {
this.httpServer = new HttpServer(this); this.httpServer = new HttpServer(this);
this.deltaStream = new DeltaStream(this); this.deltaStream = new DeltaStream(this);
this.peers = new Peers(this); this.peers = new Peers(this);
this.lossless = new Lossless(); this.lossless = new Lossless(this);
} }
async start() { async start() {

View File

@ -31,17 +31,17 @@ class Peer {
async request(method: RequestMethods): Promise<Message> { async request(method: RequestMethods): Promise<Message> {
if (!this.reqSock) { if (!this.reqSock) {
this.reqSock = new RequestSocket(this.reqAddr); this.reqSock = this.rhizomeNode.requestReply.createRequestSocket(this.reqAddr);
} }
return this.reqSock.request(method); return this.reqSock.request(method);
} }
async subscribeDeltas() { async subscribeDeltas() {
if (!this.publishAddr) { if (!this.publishAddr) {
debug(`requesting publish addr from peer ${this.reqAddr.toAddrString()}`); debug(`[${this.rhizomeNode.config.peerId}]`, `requesting publish addr from peer ${this.reqAddr.toAddrString()}`);
const res = await this.request(RequestMethods.GetPublishAddress); const res = await this.request(RequestMethods.GetPublishAddress);
this.publishAddr = PeerAddress.fromString(res.toString()); this.publishAddr = PeerAddress.fromString(res.toString());
debug(`received publish addr ${this.publishAddr.toAddrString()} from peer ${this.reqAddr.toAddrString()}`); debug(`[${this.rhizomeNode.config.peerId}]`, `received publish addr ${this.publishAddr.toAddrString()} from peer ${this.reqAddr.toAddrString()}`);
} }
this.subscription = this.rhizomeNode.pubSub.subscribe( this.subscription = this.rhizomeNode.pubSub.subscribe(
@ -50,7 +50,7 @@ class Peer {
(sender, msg) => { (sender, msg) => {
const delta = this.rhizomeNode.deltaStream.deserializeDelta(msg); const delta = this.rhizomeNode.deltaStream.deserializeDelta(msg);
delta.receivedFrom = sender; delta.receivedFrom = sender;
debug(`Received delta: ${JSON.stringify(delta)}`); debug(`[${this.rhizomeNode.config.peerId}]`, `Received delta: ${JSON.stringify(delta)}`);
this.rhizomeNode.deltaStream.ingestDelta(delta); this.rhizomeNode.deltaStream.ingestDelta(delta);
}); });
@ -81,19 +81,19 @@ export class Peers {
this.addPeer(this.rhizomeNode.myRequestAddr); this.addPeer(this.rhizomeNode.myRequestAddr);
this.rhizomeNode.requestReply.registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => { this.rhizomeNode.requestReply.registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => {
debug('inspecting peer request'); debug(`[${this.rhizomeNode.config.peerId}]`, 'inspecting peer request');
switch (req.method) { switch (req.method) {
case RequestMethods.GetPublishAddress: { case RequestMethods.GetPublishAddress: {
debug('it\'s a request for our publish address'); debug(`[${this.rhizomeNode.config.peerId}]`, 'it\'s a request for our publish address');
await res.send(this.rhizomeNode.myPublishAddr.toAddrString()); await res.send(this.rhizomeNode.myPublishAddr.toAddrString());
break; break;
} }
case RequestMethods.AskForDeltas: { case RequestMethods.AskForDeltas: {
debug('it\'s a request for deltas'); debug(`[${this.rhizomeNode.config.peerId}]`, 'it\'s a request for deltas');
// TODO: stream these rather than // TODO: stream these rather than
// trying to write them all in one message // trying to write them all in one message
const deltas = this.rhizomeNode.deltaStream.deltasAccepted; const deltas = this.rhizomeNode.deltaStream.deltasAccepted;
debug(`sending ${deltas.length} deltas`); debug(`[${this.rhizomeNode.config.peerId}]`, `sending ${deltas.length} deltas`);
await res.send(JSON.stringify(deltas)); await res.send(JSON.stringify(deltas));
break; break;
} }
@ -107,7 +107,7 @@ export class Peers {
(sender, msg) => { (sender, msg) => {
const delta = this.rhizomeNode.deltaStream.deserializeDelta(msg); const delta = this.rhizomeNode.deltaStream.deserializeDelta(msg);
delta.receivedFrom = sender; delta.receivedFrom = sender;
debug(`Received delta: ${JSON.stringify(delta)}`); debug(`[${this.rhizomeNode.config.peerId}]`, `Received delta: ${JSON.stringify(delta)}`);
this.rhizomeNode.deltaStream.ingestDelta(delta); this.rhizomeNode.deltaStream.ingestDelta(delta);
} }
); );
@ -116,13 +116,13 @@ export class Peers {
addPeer(addr: PeerAddress): Peer { addPeer(addr: PeerAddress): Peer {
const peer = new Peer(this.rhizomeNode, addr); const peer = new Peer(this.rhizomeNode, addr);
this.peers.push(peer); this.peers.push(peer);
debug('added peer', addr); debug(`[${this.rhizomeNode.config.peerId}]`, 'added peer', addr);
return peer; return peer;
} }
async subscribeToSeeds() { async subscribeToSeeds() {
SEED_PEERS.forEach(async (addr, idx) => { SEED_PEERS.forEach(async (addr, idx) => {
debug(`SEED PEERS[${idx}]=${addr.toAddrString()}`); debug(`[${this.rhizomeNode.config.peerId}]`, `SEED PEERS[${idx}]=${addr.toAddrString()}`);
const peer = this.addPeer(addr); const peer = this.addPeer(addr);
await peer.subscribeDeltas(); await peer.subscribeDeltas();
}); });
@ -132,9 +132,9 @@ export class Peers {
async askAllPeersForDeltas() { async askAllPeersForDeltas() {
this.peers.filter(({isSelf}) => !isSelf) this.peers.filter(({isSelf}) => !isSelf)
.forEach(async (peer, idx) => { .forEach(async (peer, idx) => {
debug(`Asking peer ${idx} for deltas`); debug(`[${this.rhizomeNode.config.peerId}]`, `Asking peer ${idx} for deltas`);
const deltas = await peer.askForDeltas(); const deltas = await peer.askForDeltas();
debug(`received ${deltas.length} deltas from ${peer.reqAddr.toAddrString()}`); debug(`[${this.rhizomeNode.config.peerId}]`, `received ${deltas.length} deltas from ${peer.reqAddr.toAddrString()}`);
for (const delta of deltas) { for (const delta of deltas) {
delta.receivedFrom = peer.reqAddr; delta.receivedFrom = peer.reqAddr;
this.rhizomeNode.deltaStream.receiveDelta(delta); this.rhizomeNode.deltaStream.receiveDelta(delta);

View File

@ -24,6 +24,7 @@ export class Subscription {
libp2p?: Libp2p; libp2p?: Libp2p;
constructor( constructor(
readonly pubSub: PubSub,
publishAddr: PeerAddress, publishAddr: PeerAddress,
topic: string, topic: string,
cb: SubscribedMessageHandler, cb: SubscribedMessageHandler,
@ -39,14 +40,14 @@ export class Subscription {
async start() { async start() {
this.sock.connect(this.publishAddrStr); this.sock.connect(this.publishAddrStr);
this.sock.subscribe(this.topic); this.sock.subscribe(this.topic);
debug(`Subscribing to ${this.topic} topic on ZeroMQ ${this.publishAddrStr}`); debug(`[${this.pubSub.rhizomeNode.config.peerId}]`, `Subscribing to ${this.topic} topic on ZeroMQ ${this.publishAddrStr}`);
// Wait for ZeroMQ messages. // Wait for ZeroMQ messages.
// This will block indefinitely. // This will block indefinitely.
for await (const [, sender, msg] of this.sock) { for await (const [, sender, msg] of this.sock) {
const senderStr = PeerAddress.fromString(sender.toString()); const senderStr = PeerAddress.fromString(sender.toString());
const msgStr = msg.toString(); const msgStr = msg.toString();
debug(`ZeroMQ subscribtion received msg: ${msgStr}`); debug(`[${this.pubSub.rhizomeNode.config.peerId}]`, `ZeroMQ subscribtion received msg: ${msgStr}`);
this.cb(senderStr, msgStr); this.cb(senderStr, msgStr);
} }
} }
@ -69,7 +70,7 @@ export class PubSub {
async start() { async start() {
await this.publishSock.bind(this.publishAddrStr); await this.publishSock.bind(this.publishAddrStr);
debug(`ZeroMQ publishing socket bound to ${this.publishAddrStr}`); debug(`[${this.rhizomeNode.config.peerId}]`, `ZeroMQ publishing socket bound to ${this.publishAddrStr}`);
this.libp2p = await createLibp2p({ this.libp2p = await createLibp2p({
addresses: { addresses: {
@ -88,18 +89,18 @@ export class PubSub {
}); });
this.libp2p.addEventListener("peer:discovery", (event) => { this.libp2p.addEventListener("peer:discovery", (event) => {
debug(`found peer: ${JSON.stringify(event.detail, null, 2)}`); debug(`[${this.rhizomeNode.config.peerId}]`, `found peer: ${JSON.stringify(event.detail, null, 2)}`);
this.libp2p?.dial(event.detail.multiaddrs); this.libp2p?.dial(event.detail.multiaddrs);
}); });
this.libp2p.addEventListener("peer:connect", (event) => { this.libp2p.addEventListener("peer:connect", (event) => {
debug(`connected to peer: ${JSON.stringify(event.detail, null, 2)}`); debug(`[${this.rhizomeNode.config.peerId}]`, `connected to peer: ${JSON.stringify(event.detail, null, 2)}`);
// TODO: Subscribe // TODO: Subscribe
}); });
} }
async publish(topic: string, msg: string) { async publish(topic: string, msg: string) {
debug(`publishing to ZeroMQ, msg: ${msg}`); debug(`[${this.rhizomeNode.config.peerId}]`, `publishing to ZeroMQ, msg: ${msg}`);
await this.publishSock.send([ await this.publishSock.send([
topic, topic,
this.rhizomeNode.myRequestAddr.toAddrString(), this.rhizomeNode.myRequestAddr.toAddrString(),
@ -108,11 +109,11 @@ export class PubSub {
if (this.libp2p) { if (this.libp2p) {
const pubsub = this.libp2p.services.pubsub as GossipSub; const pubsub = this.libp2p.services.pubsub as GossipSub;
debug(`publishing to Libp2p, msg: ${msg}`); debug(`[${this.rhizomeNode.config.peerId}]`, `publishing to Libp2p, msg: ${msg}`);
try { try {
await pubsub.publish(topic, Buffer.from(msg)); await pubsub.publish(topic, Buffer.from(msg));
} catch (e: unknown) { } catch (e: unknown) {
debug('Libp2p publish:', (e as Error).message); debug(`[${this.rhizomeNode.config.peerId}]`, 'Libp2p publish:', (e as Error).message);
} }
} }
} }
@ -126,7 +127,7 @@ export class PubSub {
// TODO: If we subscribe to multiple topics this callback will be duplicated // TODO: If we subscribe to multiple topics this callback will be duplicated
pubsub.addEventListener("message", (event) => { pubsub.addEventListener("message", (event) => {
const msg = Buffer.from(event.detail.data).toString(); const msg = Buffer.from(event.detail.data).toString();
debug(`Libp2p subscribtion received msg: ${msg}`); debug(`[${this.rhizomeNode.config.peerId}]`, `Libp2p subscribtion received msg: ${msg}`);
cb(new PeerAddress('libp2p', 0), msg); cb(new PeerAddress('libp2p', 0), msg);
}); });
@ -135,7 +136,7 @@ export class PubSub {
if (!this.subscribedTopics.has(topic)) { if (!this.subscribedTopics.has(topic)) {
pubsub.subscribe(topic); pubsub.subscribe(topic);
this.subscribedTopics.add(topic); this.subscribedTopics.add(topic);
debug('subscribed topics:', Array.from(this.subscribedTopics.keys())); debug(`[${this.rhizomeNode.config.peerId}]`, 'subscribed topics:', Array.from(this.subscribedTopics.keys()));
} }
} }
@ -144,7 +145,7 @@ export class PubSub {
topic: string, topic: string,
cb: SubscribedMessageHandler cb: SubscribedMessageHandler
): Subscription { ): Subscription {
const subscription = new Subscription(publishAddr, topic, cb, this.libp2p); const subscription = new Subscription(this, publishAddr, topic, cb, this.libp2p);
this.subscriptions.push(subscription); this.subscriptions.push(subscription);
return subscription; return subscription;
} }
@ -164,11 +165,11 @@ export class PubSub {
pubsub.removeEventListener("message"); pubsub.removeEventListener("message");
for (const topic of this.subscribedTopics) { for (const topic of this.subscribedTopics) {
debug(`unsubscribing Libp2p topic ${topic}`); debug(`[${this.rhizomeNode.config.peerId}]`, `unsubscribing Libp2p topic ${topic}`);
pubsub.unsubscribe(topic) pubsub.unsubscribe(topic)
} }
debug('stopping gossipsub'); debug(`[${this.rhizomeNode.config.peerId}]`, 'stopping gossipsub');
await pubsub.stop(); await pubsub.stop();

View File

@ -16,10 +16,10 @@ export type RequestHandler = (req: PeerRequest, res: ResponseSocket) => void;
export class RequestSocket { export class RequestSocket {
sock = new Request(); sock = new Request();
constructor(addr: PeerAddress) { constructor(readonly requestReply: RequestReply, addr: PeerAddress) {
const addrStr = `tcp://${addr.addr}:${addr.port}`; const addrStr = `tcp://${addr.addr}:${addr.port}`;
this.sock.connect(addrStr); this.sock.connect(addrStr);
debug(`Request socket connecting to ${addrStr}`); debug(`[${this.requestReply.rhizomeNode.config.peerId}]`, `Request socket connecting to ${addrStr}`);
} }
async request(method: RequestMethods): Promise<Message> { async request(method: RequestMethods): Promise<Message> {
@ -55,7 +55,7 @@ function peerRequestFromMsg(msg: Message): PeerRequest | null {
const obj = JSON.parse(msg.toString()); const obj = JSON.parse(msg.toString());
req = {...obj}; req = {...obj};
} catch (e) { } catch (e) {
debug('error receiving command', e); console.error('error receiving command', e);
} }
return req; return req;
} }
@ -76,10 +76,10 @@ export class RequestReply {
async start() { async start() {
await this.replySock.bind(this.requestBindAddrStr); await this.replySock.bind(this.requestBindAddrStr);
debug(`Reply socket bound to ${this.requestBindAddrStr}`); debug(`[${this.rhizomeNode.config.peerId}]`, `Reply socket bound to ${this.requestBindAddrStr}`);
for await (const [msg] of this.replySock) { for await (const [msg] of this.replySock) {
debug(`Received message`, {msg: msg.toString()}); debug(`[${this.rhizomeNode.config.peerId}]`, `Received message`, {msg: msg.toString()});
const req = peerRequestFromMsg(msg); const req = peerRequestFromMsg(msg);
this.requestStream.emit('request', req); this.requestStream.emit('request', req);
} }
@ -94,6 +94,10 @@ export class RequestReply {
}); });
} }
createRequestSocket(addr: PeerAddress) {
return new RequestSocket(this, addr);
}
async stop() { async stop() {
await this.replySock.unbind(this.requestBindAddrStr); await this.replySock.unbind(this.requestBindAddrStr);
this.replySock.close(); this.replySock.close();

View File

@ -2,6 +2,7 @@ import Debug from "debug";
import EventEmitter from "events"; import EventEmitter from "events";
import {Delta, DeltaID} from "./delta.js"; import {Delta, DeltaID} from "./delta.js";
import {DomainEntityID, TransactionID} from "./types.js"; import {DomainEntityID, TransactionID} from "./types.js";
import {Lossless} from "./lossless.js";
const debug = Debug("transactions"); const debug = Debug("transactions");
function getDeltaTransactionId(delta: Delta): TransactionID | undefined { function getDeltaTransactionId(delta: Delta): TransactionID | undefined {
@ -58,6 +59,8 @@ export class Transactions {
transactions = new Map<TransactionID, Transaction>(); transactions = new Map<TransactionID, Transaction>();
eventStream = new EventEmitter(); eventStream = new EventEmitter();
constructor(readonly lossless: Lossless) {}
get(id: TransactionID): Transaction | undefined { get(id: TransactionID): Transaction | undefined {
return this.transactions.get(id); return this.transactions.get(id);
} }
@ -101,7 +104,7 @@ export class Transactions {
if (transactionId && size) { if (transactionId && size) {
// This delta describes a transaction // This delta describes a transaction
debug(`transaction ${transactionId} has size ${size}`); debug(`[${this.lossless.rhizomeNode.config.peerId}]`, `transaction ${transactionId} has size ${size}`);
this.setSize(transactionId, size as number); this.setSize(transactionId, size as number);

View File

@ -2,6 +2,7 @@ import Debug from "debug";
import {FSWatcher, readdirSync, readFileSync, watch} from "fs"; import {FSWatcher, readdirSync, readFileSync, watch} from "fs";
import path, {join} from "path"; import path, {join} from "path";
import showdown from "showdown"; import showdown from "showdown";
import {RhizomeNode} from "../node.js";
const {Converter} = showdown; const {Converter} = showdown;
const debug = Debug('md-files'); const debug = Debug('md-files');
@ -30,6 +31,8 @@ export class MDFiles {
readmeWatcher?: FSWatcher; readmeWatcher?: FSWatcher;
latestIndexHtml?: Html; latestIndexHtml?: Html;
constructor(readonly rhizomeNode: RhizomeNode) {}
readFile(name: string) { readFile(name: string) {
const md = readFileSync(join('./markdown', `${name}.md`)).toString(); const md = readFileSync(join('./markdown', `${name}.md`)).toString();
let m = ""; let m = "";
@ -96,14 +99,14 @@ export class MDFiles {
switch (eventType) { switch (eventType) {
case 'rename': { case 'rename': {
debug(`file ${name} renamed`); debug(`[${this.rhizomeNode.config.peerId}]`, `file ${name} renamed`);
// Remove it from memory and re-scan everything // Remove it from memory and re-scan everything
this.files.delete(name); this.files.delete(name);
this.readDir(); this.readDir();
break; break;
} }
case 'change': { case 'change': {
debug(`file ${name} changed`); debug(`[${this.rhizomeNode.config.peerId}]`, `file ${name} changed`);
// Re-read this file // Re-read this file
this.readFile(name) this.readFile(name)
break; break;
@ -118,7 +121,7 @@ export class MDFiles {
switch (eventType) { switch (eventType) {
case 'change': { case 'change': {
debug(`README file changed`); debug(`[${this.rhizomeNode.config.peerId}]`, `README file changed`);
// Re-read this file // Re-read this file
this.readReadme() this.readReadme()
break; break;