Compare commits
2 Commits
940c3212be
...
1f8cbda73f
Author | SHA1 | Date |
---|---|---|
Ladd Hoffman | 1f8cbda73f | |
Ladd Hoffman | 44e6cfa13c |
|
@ -1,7 +1,10 @@
|
|||
import {Delta, DeltaFilter} from '../src/delta';
|
||||
import {Lossless} from '../src/lossless';
|
||||
import {Delta, DeltaFilter} from '../src/delta.js';
|
||||
import {Lossless} from '../src/lossless.js';
|
||||
import {RhizomeNode} from '../src/node.js';
|
||||
|
||||
describe('Lossless', () => {
|
||||
const node = new RhizomeNode();
|
||||
|
||||
it('creates a lossless view of keanu as neo in the matrix', () => {
|
||||
const delta = new Delta({
|
||||
creator: 'a',
|
||||
|
@ -27,7 +30,7 @@ describe('Lossless', () => {
|
|||
}]
|
||||
});
|
||||
|
||||
const lossless = new Lossless();
|
||||
const lossless = new Lossless(node);
|
||||
|
||||
lossless.ingestDelta(delta);
|
||||
|
||||
|
@ -84,7 +87,7 @@ describe('Lossless', () => {
|
|||
});
|
||||
|
||||
describe('can filter deltas', () => {
|
||||
const lossless = new Lossless();
|
||||
const lossless = new Lossless(node);
|
||||
|
||||
beforeAll(() => {
|
||||
lossless.ingestDelta(new Delta({
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
import {Delta, PointerTarget} from "../src/delta";
|
||||
import {Lossless, LosslessViewMany} from "../src/lossless";
|
||||
import {Lossy, lastValueFromLosslessViewOne, valueFromCollapsedDelta, ResolvedViewMany} from "../src/lossy";
|
||||
import {RhizomeNode} from "../src/node.js";
|
||||
import {Delta, PointerTarget} from "../src/delta.js";
|
||||
import {Lossless, LosslessViewMany} from "../src/lossless.js";
|
||||
import {Lossy, lastValueFromLosslessViewOne, valueFromCollapsedDelta } from "../src/lossy.js";
|
||||
|
||||
describe('Lossy', () => {
|
||||
describe('se a provided function to resolve entity views', () => {
|
||||
const lossless = new Lossless();
|
||||
const node = new RhizomeNode();
|
||||
const lossless = new Lossless(node);
|
||||
const lossy = new Lossy(lossless);
|
||||
|
||||
beforeAll(() => {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import {PeerAddress} from '../src/types';
|
||||
import {PeerAddress} from '../src/types.js';
|
||||
|
||||
describe('PeerAddress', () => {
|
||||
it('toString()', () => {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import {App} from "../../util/app";
|
||||
import {App} from "../../util/app.js";
|
||||
|
||||
describe('Run', () => {
|
||||
let app: App;
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import Debug from 'debug';
|
||||
import {App} from '../../util/app';
|
||||
import {App} from '../../util/app.js';
|
||||
const debug = Debug('test:two');
|
||||
|
||||
describe('Run', () => {
|
||||
|
@ -13,6 +13,8 @@ describe('Run', () => {
|
|||
apps[1] = new App({
|
||||
httpEnable: true,
|
||||
peerId: 'app1',
|
||||
// Make the apps use the same pubsub topic so they can talk to each other
|
||||
pubSubTopic: apps[0].config.pubSubTopic,
|
||||
});
|
||||
apps[0].config.seedPeers.push(apps[1].myRequestAddr);
|
||||
apps[1].config.seedPeers.push(apps[0].myRequestAddr);
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import Debug from 'debug';
|
||||
import {Collection} from "../src/collection";
|
||||
import {Entity} from "../src/entity";
|
||||
import {RhizomeNode} from "../src/node";
|
||||
import {Collection} from "../src/collection.js";
|
||||
import {Entity} from "../src/entity.js";
|
||||
import {RhizomeNode} from "../src/node.js";
|
||||
const debug = Debug('example-app');
|
||||
|
||||
// As an app we want to be able to write and read data.
|
||||
|
@ -27,11 +27,11 @@ type User = {
|
|||
users.rhizomeConnect(rhizomeNode);
|
||||
|
||||
users.onUpdate((u: Entity) => {
|
||||
debug('User updated:', u);
|
||||
debug(`[${rhizomeNode.config.peerId}]`, 'User updated:', u);
|
||||
});
|
||||
|
||||
users.onCreate((u: Entity) => {
|
||||
debug('New user!:', u);
|
||||
debug(`[${rhizomeNode.config.peerId}]`, 'New user!:', u);
|
||||
});
|
||||
|
||||
await rhizomeNode.start();
|
||||
|
@ -64,9 +64,9 @@ type User = {
|
|||
const expected = JSON.stringify(taliesinData);
|
||||
|
||||
if (result === expected) {
|
||||
debug('Put result matches expected: ' + expected);
|
||||
debug(`[${rhizomeNode.config.peerId}]`, 'Put result matches expected: ' + expected);
|
||||
} else {
|
||||
debug(`Put result does not match expected.` +
|
||||
debug(`[${rhizomeNode.config.peerId}]`, `Put result does not match expected.` +
|
||||
`\n\nExpected \n${expected}` +
|
||||
`\nReceived\n${result}`);
|
||||
}
|
||||
|
@ -87,9 +87,9 @@ type User = {
|
|||
const expected = JSON.stringify(taliesinData);
|
||||
|
||||
if (result === expected) {
|
||||
debug('Get result matches expected: ' + expected);
|
||||
debug(`[${rhizomeNode.config.peerId}]`, 'Get result matches expected: ' + expected);
|
||||
} else {
|
||||
debug(`Get result does not match expected.` +
|
||||
debug(`[${rhizomeNode.config.peerId}]`, `Get result does not match expected.` +
|
||||
`\n\nExpected \n${expected}` +
|
||||
`\nReceived\n${result}`);
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
36
package.json
36
package.json
|
@ -2,11 +2,12 @@
|
|||
"name": "rhizome-node",
|
||||
"version": "0.1.0",
|
||||
"description": "Rhizomatic database engine node",
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
"build": "tsc",
|
||||
"build:watch": "tsc --watch",
|
||||
"lint": "eslint",
|
||||
"test": "jest",
|
||||
"test": "node --experimental-vm-modules node_modules/.bin/jest",
|
||||
"coverage": "./scripts/coverage.sh",
|
||||
"example-app": "node dist/examples/app.js"
|
||||
},
|
||||
|
@ -18,15 +19,37 @@
|
|||
],
|
||||
"testMatch": [
|
||||
"**/__tests__/**/*"
|
||||
]
|
||||
],
|
||||
"transform": {
|
||||
"^.+\\.ts$": [
|
||||
"ts-jest",
|
||||
{
|
||||
"useESM": true
|
||||
}
|
||||
]
|
||||
},
|
||||
"extensionsToTreatAsEsm": [
|
||||
".ts"
|
||||
],
|
||||
"moduleNameMapper": {
|
||||
"^(\\.{1,2}/.*)\\.js$": "$1"
|
||||
}
|
||||
},
|
||||
"author": "Taliesin (Ladd) <ladd@dgov.io>",
|
||||
"license": "Unlicense",
|
||||
"dependencies": {
|
||||
"@chainsafe/libp2p-gossipsub": "^14.1.0",
|
||||
"@chainsafe/libp2p-noise": "^16.0.0",
|
||||
"@chainsafe/libp2p-yamux": "^7.0.1",
|
||||
"@libp2p/identify": "^3.0.14",
|
||||
"@libp2p/mdns": "^11.0.16",
|
||||
"@libp2p/ping": "^2.0.14",
|
||||
"@libp2p/tcp": "^10.0.14",
|
||||
"debug": "^4.4.0",
|
||||
"express": "^4.21.2",
|
||||
"json-logic-js": "^2.0.5",
|
||||
"level": "^9.0.0",
|
||||
"libp2p": "^2.4.2",
|
||||
"microtime": "^3.1.1",
|
||||
"object-hash": "^3.0.0",
|
||||
"showdown": "^2.1.0",
|
||||
|
@ -34,20 +57,21 @@
|
|||
"zeromq": "^6.1.2"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@eslint/js": "^9.17.0",
|
||||
"@types/bluebird": "^3.5.42",
|
||||
"@types/debug": "^4.1.12",
|
||||
"@types/json-logic-js": "^2.0.8",
|
||||
"@types/microtime": "^2.1.2",
|
||||
"@types/object-hash": "^3.0.6",
|
||||
"@eslint/js": "^9.17.0",
|
||||
"@types/express": "^5.0.0",
|
||||
"@types/jest": "^29.5.14",
|
||||
"@types/json-logic-js": "^2.0.8",
|
||||
"@types/microtime": "^2.1.2",
|
||||
"@types/node": "^22.10.2",
|
||||
"@types/object-hash": "^3.0.6",
|
||||
"@types/showdown": "^2.0.6",
|
||||
"eslint": "^9.17.0",
|
||||
"eslint-config-airbnb-base-typescript": "^1.1.0",
|
||||
"jest": "^29.7.0",
|
||||
"ts-jest": "^29.2.5",
|
||||
"tsc-alias": "^1.8.10",
|
||||
"typescript": "^5.7.2",
|
||||
"typescript-eslint": "^8.18.0"
|
||||
}
|
||||
|
|
|
@ -6,11 +6,11 @@
|
|||
import Debug from 'debug';
|
||||
import {randomUUID} from "node:crypto";
|
||||
import EventEmitter from "node:events";
|
||||
import {Delta, DeltaFilter} from "./delta";
|
||||
import {Entity, EntityProperties} from "./entity";
|
||||
import {Lossy, ResolvedViewOne, Resolver} from "./lossy";
|
||||
import {RhizomeNode} from "./node";
|
||||
import {DomainEntityID} from "./types";
|
||||
import {Delta, DeltaFilter} from "./delta.js";
|
||||
import {Entity, EntityProperties} from "./entity.js";
|
||||
import {Lossy, ResolvedViewOne, Resolver} from "./lossy.js";
|
||||
import {RhizomeNode} from "./node.js";
|
||||
import {DomainEntityID} from "./types.js";
|
||||
const debug = Debug('collection');
|
||||
|
||||
export class Collection {
|
||||
|
@ -45,7 +45,7 @@ export class Collection {
|
|||
|
||||
rhizomeNode.httpServer.httpApi.serveCollection(this);
|
||||
|
||||
debug(`connected ${this.name} to rhizome`);
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `connected ${this.name} to rhizome`);
|
||||
}
|
||||
|
||||
// Applies the javascript rules for updating object values,
|
||||
|
@ -214,9 +214,10 @@ export class Collection {
|
|||
resolver?: Resolver,
|
||||
deltaFilter?: DeltaFilter
|
||||
): T | undefined {
|
||||
if (!this.rhizomeNode) return undefined;
|
||||
if (!this.rhizomeNode) throw new Error('collection not connected to rhizome');
|
||||
if (!this.lossy) throw new Error('lossy view not initialized');
|
||||
|
||||
const res = this.lossy?.resolve(resolver, [id], deltaFilter) || {};
|
||||
const res = this.lossy.resolve(resolver, [id], deltaFilter) || {};
|
||||
|
||||
return res[id] as T;
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import {randomUUID} from "crypto";
|
||||
import {PeerAddress} from "./types";
|
||||
import {PeerAddress} from "./types.js";
|
||||
|
||||
// _HOST refers to the address from an external perspective
|
||||
// _ADDR refers to the interface address from the service's perspective
|
||||
|
@ -20,3 +20,5 @@ export const HTTP_API_ENABLE = process.env.RHIZOME_HTTP_API_ENABLE === 'true';
|
|||
export const SEED_PEERS: PeerAddress[] = (process.env.RHIZOME_SEED_PEERS || '').split(',')
|
||||
.filter(x => !!x)
|
||||
.map((peer: string) => PeerAddress.fromString(peer));
|
||||
|
||||
export const PUB_SUB_TOPIC = process.env.RHIZOME_PUB_SUB_TOPIC || `deltas-${randomUUID()}`;
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import {randomUUID} from "crypto";
|
||||
import microtime from 'microtime';
|
||||
import {CreatorID, HostID, PeerAddress, Timestamp, TransactionID} from "./types";
|
||||
import {CreatorID, HostID, PeerAddress, Timestamp, TransactionID} from "./types.js";
|
||||
|
||||
export type DeltaID = string;
|
||||
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
import Debug from 'debug';
|
||||
import EventEmitter from 'node:events';
|
||||
import objectHash from 'object-hash';
|
||||
import {Delta, DeltaNetworkImage} from './delta';
|
||||
import {RhizomeNode} from './node';
|
||||
import {Delta, DeltaNetworkImage} from './delta.js';
|
||||
import {RhizomeNode} from './node.js';
|
||||
const debug = Debug('deltas');
|
||||
|
||||
enum Decision {
|
||||
|
@ -86,8 +86,11 @@ export class DeltaStream {
|
|||
}
|
||||
|
||||
async publishDelta(delta: Delta) {
|
||||
debug(`Publishing delta: ${JSON.stringify(delta)}`);
|
||||
await this.rhizomeNode.pubSub.publish("deltas", this.serializeDelta(delta));
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `Publishing delta: ${JSON.stringify(delta)}`);
|
||||
await this.rhizomeNode.pubSub.publish(
|
||||
this.rhizomeNode.config.pubSubTopic,
|
||||
this.serializeDelta(delta)
|
||||
);
|
||||
}
|
||||
|
||||
serializeDelta(delta: Delta): string {
|
||||
|
|
|
@ -7,8 +7,8 @@
|
|||
// - As typescript interfaces?
|
||||
// - As typescript classes?
|
||||
|
||||
import {Collection} from "./collection";
|
||||
import {PropertyTypes} from "./types";
|
||||
import {Collection} from "./collection.js";
|
||||
import {PropertyTypes} from "./types.js";
|
||||
|
||||
export type EntityProperties = {
|
||||
[key: string]: PropertyTypes;
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import { add_operation, apply } from 'json-logic-js';
|
||||
import { Delta } from '../delta';
|
||||
import { Delta } from '../delta.js';
|
||||
|
||||
type DeltaContext = Delta & {
|
||||
creatorAddress: string;
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import { FilterExpr } from "../types";
|
||||
import { FilterExpr } from "../types.js";
|
||||
// import { map } from 'radash';
|
||||
|
||||
// A creator as seen by a host
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import express, {Router} from "express";
|
||||
import {Collection} from "src/collection";
|
||||
import {Delta} from "src/delta";
|
||||
import {RhizomeNode} from "src/node";
|
||||
import {Collection} from "../collection.js";
|
||||
import {Delta} from "../delta.js";
|
||||
import {RhizomeNode} from "../node.js";
|
||||
|
||||
export class HttpApi {
|
||||
router = Router();
|
||||
|
@ -30,24 +30,25 @@ export class HttpApi {
|
|||
|
||||
// Get the list of peers seen by this node (including itself)
|
||||
this.router.get("/peers", (_req: express.Request, res: express.Response) => {
|
||||
res.json(this.rhizomeNode.peers.peers.map(({reqAddr, publishAddr, isSelf, isSeedPeer}) => {
|
||||
const deltasAcceptedCount = this.rhizomeNode.deltaStream.deltasAccepted
|
||||
.filter((delta: Delta) => {
|
||||
return delta.receivedFrom?.addr == reqAddr.addr &&
|
||||
delta.receivedFrom?.port == reqAddr.port;
|
||||
})
|
||||
.length;
|
||||
const peerInfo = {
|
||||
reqAddr: reqAddr.toAddrString(),
|
||||
publishAddr: publishAddr?.toAddrString(),
|
||||
isSelf,
|
||||
isSeedPeer,
|
||||
deltaCount: {
|
||||
accepted: deltasAcceptedCount
|
||||
}
|
||||
};
|
||||
return peerInfo;
|
||||
}));
|
||||
res.json(this.rhizomeNode.peers.peers.map(
|
||||
({reqAddr, publishAddr, isSelf, isSeedPeer}) => {
|
||||
const deltasAcceptedCount = this.rhizomeNode.deltaStream.deltasAccepted
|
||||
.filter((delta: Delta) => {
|
||||
return delta.receivedFrom?.addr == reqAddr.addr &&
|
||||
delta.receivedFrom?.port == reqAddr.port;
|
||||
})
|
||||
.length;
|
||||
const peerInfo = {
|
||||
reqAddr: reqAddr.toAddrString(),
|
||||
publishAddr: publishAddr?.toAddrString(),
|
||||
isSelf,
|
||||
isSeedPeer,
|
||||
deltaCount: {
|
||||
accepted: deltasAcceptedCount
|
||||
}
|
||||
};
|
||||
return peerInfo;
|
||||
}));
|
||||
});
|
||||
|
||||
// Get the number of peers seen by this node (including itself)
|
||||
|
|
|
@ -1,11 +1,14 @@
|
|||
import express, {Router} from "express";
|
||||
import {htmlDocFromMarkdown, MDFiles} from "../util/md-files";
|
||||
import {RhizomeNode} from "../node.js";
|
||||
import {htmlDocFromMarkdown, MDFiles} from "../util/md-files.js";
|
||||
|
||||
export class HttpHtml {
|
||||
router = Router();
|
||||
mdFiles = new MDFiles();
|
||||
mdFiles: MDFiles;
|
||||
|
||||
constructor(readonly rhizomeNode: RhizomeNode) {
|
||||
this.mdFiles = new MDFiles(this.rhizomeNode);
|
||||
|
||||
constructor() {
|
||||
// Scan and watch for markdown files
|
||||
this.mdFiles.readDir();
|
||||
this.mdFiles.readReadme();
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
import Debug from "debug";
|
||||
import express from "express";
|
||||
import {Server} from "http";
|
||||
import {RhizomeNode} from "../node";
|
||||
import {HttpApi} from "./api";
|
||||
import {HttpHtml} from "./html";
|
||||
import {RhizomeNode} from "../node.js";
|
||||
import {HttpApi} from "./api.js";
|
||||
import {HttpHtml} from "./html.js";
|
||||
const debug = Debug('http-api');
|
||||
|
||||
export class HttpServer {
|
||||
|
@ -13,7 +13,7 @@ export class HttpServer {
|
|||
server?: Server;
|
||||
|
||||
constructor(readonly rhizomeNode: RhizomeNode) {
|
||||
this.httpHtml = new HttpHtml();
|
||||
this.httpHtml = new HttpHtml(this.rhizomeNode);
|
||||
this.httpApi = new HttpApi(this.rhizomeNode);
|
||||
|
||||
this.app.use(express.json());
|
||||
|
@ -28,7 +28,7 @@ export class HttpServer {
|
|||
host: httpAddr,
|
||||
exclusive: true
|
||||
}, () => {
|
||||
debug(`HTTP API bound to ${httpAddr}:${httpPort}`);
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `HTTP API bound to ${httpAddr}:${httpPort}`);
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -3,9 +3,10 @@
|
|||
|
||||
import Debug from 'debug';
|
||||
import EventEmitter from 'events';
|
||||
import {Delta, DeltaFilter, DeltaNetworkImage} from './delta';
|
||||
import {Transactions} from './transactions';
|
||||
import {DomainEntityID, PropertyID, PropertyTypes, TransactionID, ViewMany} from "./types";
|
||||
import {Delta, DeltaFilter, DeltaNetworkImage} from './delta.js';
|
||||
import {Transactions} from './transactions.js';
|
||||
import {DomainEntityID, PropertyID, PropertyTypes, TransactionID, ViewMany} from "./types.js";
|
||||
import {RhizomeNode} from './node.js';
|
||||
const debug = Debug('lossless');
|
||||
|
||||
export type CollapsedPointer = {[key: PropertyID]: PropertyTypes};
|
||||
|
@ -64,13 +65,14 @@ class LosslessEntity {
|
|||
|
||||
export class Lossless {
|
||||
domainEntities = new LosslessEntityMap();
|
||||
transactions = new Transactions();
|
||||
transactions: Transactions;
|
||||
referencedAs = new Map<string, Set<DomainEntityID>>();
|
||||
eventStream = new EventEmitter();
|
||||
|
||||
constructor() {
|
||||
constructor(readonly rhizomeNode: RhizomeNode) {
|
||||
this.transactions = new Transactions(this);
|
||||
this.transactions.eventStream.on("completed", (transactionId) => {
|
||||
debug(`completed transaction ${transactionId}`);
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `completed transaction ${transactionId}`);
|
||||
const transaction = this.transactions.get(transactionId);
|
||||
if (!transaction) return;
|
||||
for (const id of transaction.entityIds) {
|
||||
|
@ -141,6 +143,7 @@ export class Lossless {
|
|||
if (delta.transactionId) {
|
||||
if (!this.transactions.isComplete(delta.transactionId)) {
|
||||
// TODO: Test this condition
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `excluding delta ${delta.id} because transaction ${delta.transactionId} is not completed`);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
|
47
src/lossy.ts
47
src/lossy.ts
|
@ -5,11 +5,11 @@
|
|||
// We can achieve this via functional expression, encoded as JSON-Logic.
|
||||
// Fields in the output can be described as transformations
|
||||
|
||||
import Debug from 'debug';
|
||||
import {DeltaFilter} from "./delta";
|
||||
import {CollapsedDelta, Lossless, LosslessViewMany, LosslessViewOne} from "./lossless";
|
||||
import {DomainEntityID, PropertyID, PropertyTypes, Timestamp, ViewMany} from "./types";
|
||||
const debug = Debug('lossy');
|
||||
// import Debug from 'debug';
|
||||
import {DeltaFilter} from "./delta.js";
|
||||
import {CollapsedDelta, Lossless, LosslessViewMany, LosslessViewOne} from "./lossless.js";
|
||||
import {DomainEntityID, PropertyID, PropertyTypes, Timestamp, ViewMany} from "./types.js";
|
||||
// const debug = Debug('lossy');
|
||||
|
||||
type TimestampedProperty = {
|
||||
value: PropertyTypes,
|
||||
|
@ -74,23 +74,6 @@ export function lastValueFromLosslessViewOne(
|
|||
return res;
|
||||
}
|
||||
|
||||
function defaultResolver(losslessView: LosslessViewMany): ResolvedViewMany {
|
||||
const resolved: ResolvedViewMany = {};
|
||||
|
||||
// debug('default resolver, lossless view', JSON.stringify(losslessView));
|
||||
for (const [id, ent] of Object.entries(losslessView)) {
|
||||
resolved[id] = {id, properties: {}};
|
||||
|
||||
for (const key of Object.keys(ent.properties)) {
|
||||
const {value} = lastValueFromLosslessViewOne(ent, key) || {};
|
||||
|
||||
// debug(`[ ${key} ] = ${value}`);
|
||||
resolved[id].properties[key] = value;
|
||||
}
|
||||
}
|
||||
return resolved;
|
||||
};
|
||||
|
||||
// TODO: Incremental updates of lossy models. For example, with last-write-wins,
|
||||
// we keep the timeUpdated for each field. A second stage resolver can rearrange
|
||||
// the data structure to a preferred shape and may discard the timeUpdated info.
|
||||
|
@ -108,11 +91,29 @@ export class Lossy {
|
|||
// TODO: Cache things!
|
||||
resolve<T = ResolvedViewOne>(fn?: Resolver<T> | Resolver, entityIds?: DomainEntityID[], deltaFilter?: DeltaFilter): T {
|
||||
if (!fn) {
|
||||
fn = defaultResolver;
|
||||
fn = (v) => this.defaultResolver(v);
|
||||
}
|
||||
const losslessView = this.lossless.view(entityIds, deltaFilter);
|
||||
return fn(losslessView) as T;
|
||||
}
|
||||
|
||||
defaultResolver(losslessView: LosslessViewMany): ResolvedViewMany {
|
||||
const resolved: ResolvedViewMany = {};
|
||||
|
||||
// debug(`[${this.lossless.rhizomeNode.config.peerId}]`, 'default resolver, lossless view', JSON.stringify(losslessView));
|
||||
for (const [id, ent] of Object.entries(losslessView)) {
|
||||
resolved[id] = {id, properties: {}};
|
||||
|
||||
for (const key of Object.keys(ent.properties)) {
|
||||
const {value} = lastValueFromLosslessViewOne(ent, key) || {};
|
||||
|
||||
// debug(`[${this.lossless.rhizomeNode.config.peerId}]`, `[ ${key} ] = ${value}`);
|
||||
resolved[id].properties[key] = value;
|
||||
}
|
||||
}
|
||||
return resolved;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
// Generate a rule
|
||||
|
|
27
src/node.ts
27
src/node.ts
|
@ -1,12 +1,12 @@
|
|||
import Debug from 'debug';
|
||||
import {CREATOR, HTTP_API_ADDR, HTTP_API_ENABLE, HTTP_API_PORT, PEER_ID, PUBLISH_BIND_ADDR, PUBLISH_BIND_HOST, PUBLISH_BIND_PORT, REQUEST_BIND_ADDR, REQUEST_BIND_HOST, REQUEST_BIND_PORT, SEED_PEERS} from './config';
|
||||
import {DeltaStream} from './deltas';
|
||||
import {HttpServer} from './http';
|
||||
import {Lossless} from './lossless';
|
||||
import {Peers} from './peers';
|
||||
import {PubSub} from './pub-sub';
|
||||
import {RequestReply} from './request-reply';
|
||||
import {PeerAddress} from './types';
|
||||
import {CREATOR, HTTP_API_ADDR, HTTP_API_ENABLE, HTTP_API_PORT, PEER_ID, PUB_SUB_TOPIC, PUBLISH_BIND_ADDR, PUBLISH_BIND_HOST, PUBLISH_BIND_PORT, REQUEST_BIND_ADDR, REQUEST_BIND_HOST, REQUEST_BIND_PORT, SEED_PEERS} from './config.js';
|
||||
import {DeltaStream} from './deltas.js';
|
||||
import {HttpServer} from './http/index.js';
|
||||
import {Lossless} from './lossless.js';
|
||||
import {Peers} from './peers.js';
|
||||
import {PubSub} from './pub-sub.js';
|
||||
import {RequestReply} from './request-reply.js';
|
||||
import {PeerAddress} from './types.js';
|
||||
const debug = Debug('rhizome-node');
|
||||
|
||||
export type RhizomeNodeConfig = {
|
||||
|
@ -22,6 +22,7 @@ export type RhizomeNodeConfig = {
|
|||
seedPeers: PeerAddress[];
|
||||
peerId: string;
|
||||
creator: string; // TODO each host should be able to support multiple users
|
||||
pubSubTopic: string;
|
||||
};
|
||||
|
||||
// So that we can run more than one instance in the same process (for testing)
|
||||
|
@ -50,9 +51,10 @@ export class RhizomeNode {
|
|||
seedPeers: SEED_PEERS,
|
||||
peerId: PEER_ID,
|
||||
creator: CREATOR,
|
||||
pubSubTopic: PUB_SUB_TOPIC,
|
||||
...config
|
||||
};
|
||||
debug('config', this.config);
|
||||
debug(`[${this.config.peerId}]`, 'config', this.config);
|
||||
this.myRequestAddr = new PeerAddress(
|
||||
this.config.requestBindHost,
|
||||
this.config.requestBindPort
|
||||
|
@ -66,7 +68,7 @@ export class RhizomeNode {
|
|||
this.httpServer = new HttpServer(this);
|
||||
this.deltaStream = new DeltaStream(this);
|
||||
this.peers = new Peers(this);
|
||||
this.lossless = new Lossless();
|
||||
this.lossless = new Lossless(this);
|
||||
}
|
||||
|
||||
async start() {
|
||||
|
@ -74,7 +76,7 @@ export class RhizomeNode {
|
|||
this.deltaStream.subscribeDeltas((delta) => this.lossless.ingestDelta(delta));
|
||||
|
||||
// Start ZeroMQ publish and reply sockets
|
||||
this.pubSub.start();
|
||||
await this.pubSub.start();
|
||||
this.requestReply.start();
|
||||
|
||||
// Start HTTP server
|
||||
|
@ -82,6 +84,9 @@ export class RhizomeNode {
|
|||
this.httpServer.start();
|
||||
}
|
||||
|
||||
// Start libp2p subscription
|
||||
this.peers.start();
|
||||
|
||||
// Wait a short time for sockets to initialize
|
||||
await new Promise((resolve) => setTimeout(resolve, 500));
|
||||
|
||||
|
|
51
src/peers.ts
51
src/peers.ts
|
@ -1,11 +1,11 @@
|
|||
import Debug from 'debug';
|
||||
import {Message} from 'zeromq';
|
||||
import {SEED_PEERS} from "./config";
|
||||
import {Delta} from "./delta";
|
||||
import {RhizomeNode} from "./node";
|
||||
import {Subscription} from './pub-sub';
|
||||
import {PeerRequest, RequestSocket, ResponseSocket} from "./request-reply";
|
||||
import {PeerAddress} from "./types";
|
||||
import {SEED_PEERS} from "./config.js";
|
||||
import {Delta} from "./delta.js";
|
||||
import {RhizomeNode} from "./node.js";
|
||||
import {Subscription} from './pub-sub.js';
|
||||
import {PeerRequest, RequestSocket, ResponseSocket} from "./request-reply.js";
|
||||
import {PeerAddress} from "./types.js";
|
||||
const debug = Debug('peers');
|
||||
|
||||
export enum RequestMethods {
|
||||
|
@ -31,26 +31,26 @@ class Peer {
|
|||
|
||||
async request(method: RequestMethods): Promise<Message> {
|
||||
if (!this.reqSock) {
|
||||
this.reqSock = new RequestSocket(this.reqAddr);
|
||||
this.reqSock = this.rhizomeNode.requestReply.createRequestSocket(this.reqAddr);
|
||||
}
|
||||
return this.reqSock.request(method);
|
||||
}
|
||||
|
||||
async subscribeDeltas() {
|
||||
if (!this.publishAddr) {
|
||||
debug(`requesting publish addr from peer ${this.reqAddr.toAddrString()}`);
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `requesting publish addr from peer ${this.reqAddr.toAddrString()}`);
|
||||
const res = await this.request(RequestMethods.GetPublishAddress);
|
||||
this.publishAddr = PeerAddress.fromString(res.toString());
|
||||
debug(`received publish addr ${this.publishAddr.toAddrString()} from peer ${this.reqAddr.toAddrString()}`);
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `received publish addr ${this.publishAddr.toAddrString()} from peer ${this.reqAddr.toAddrString()}`);
|
||||
}
|
||||
|
||||
this.subscription = this.rhizomeNode.pubSub.subscribe(
|
||||
this.publishAddr,
|
||||
"deltas",
|
||||
this.rhizomeNode.config.pubSubTopic,
|
||||
(sender, msg) => {
|
||||
const delta = this.rhizomeNode.deltaStream.deserializeDelta(msg.toString());
|
||||
const delta = this.rhizomeNode.deltaStream.deserializeDelta(msg);
|
||||
delta.receivedFrom = sender;
|
||||
debug(`Received delta: ${JSON.stringify(delta)}`);
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `Received delta: ${JSON.stringify(delta)}`);
|
||||
this.rhizomeNode.deltaStream.ingestDelta(delta);
|
||||
});
|
||||
|
||||
|
@ -81,37 +81,48 @@ export class Peers {
|
|||
this.addPeer(this.rhizomeNode.myRequestAddr);
|
||||
|
||||
this.rhizomeNode.requestReply.registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => {
|
||||
debug('inspecting peer request');
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, 'inspecting peer request');
|
||||
switch (req.method) {
|
||||
case RequestMethods.GetPublishAddress: {
|
||||
debug('it\'s a request for our publish address');
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, 'it\'s a request for our publish address');
|
||||
await res.send(this.rhizomeNode.myPublishAddr.toAddrString());
|
||||
break;
|
||||
}
|
||||
case RequestMethods.AskForDeltas: {
|
||||
debug('it\'s a request for deltas');
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, 'it\'s a request for deltas');
|
||||
// TODO: stream these rather than
|
||||
// trying to write them all in one message
|
||||
const deltas = this.rhizomeNode.deltaStream.deltasAccepted;
|
||||
debug(`sending ${deltas.length} deltas`);
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `sending ${deltas.length} deltas`);
|
||||
await res.send(JSON.stringify(deltas));
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
start() {
|
||||
this.rhizomeNode.pubSub.subscribeTopic(
|
||||
this.rhizomeNode.config.pubSubTopic,
|
||||
(sender, msg) => {
|
||||
const delta = this.rhizomeNode.deltaStream.deserializeDelta(msg);
|
||||
delta.receivedFrom = sender;
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `Received delta: ${JSON.stringify(delta)}`);
|
||||
this.rhizomeNode.deltaStream.ingestDelta(delta);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
addPeer(addr: PeerAddress): Peer {
|
||||
const peer = new Peer(this.rhizomeNode, addr);
|
||||
this.peers.push(peer);
|
||||
debug('added peer', addr);
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, 'added peer', addr);
|
||||
return peer;
|
||||
}
|
||||
|
||||
async subscribeToSeeds() {
|
||||
SEED_PEERS.forEach(async (addr, idx) => {
|
||||
debug(`SEED PEERS[${idx}]=${addr.toAddrString()}`);
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `SEED PEERS[${idx}]=${addr.toAddrString()}`);
|
||||
const peer = this.addPeer(addr);
|
||||
await peer.subscribeDeltas();
|
||||
});
|
||||
|
@ -121,9 +132,9 @@ export class Peers {
|
|||
async askAllPeersForDeltas() {
|
||||
this.peers.filter(({isSelf}) => !isSelf)
|
||||
.forEach(async (peer, idx) => {
|
||||
debug(`Asking peer ${idx} for deltas`);
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `Asking peer ${idx} for deltas`);
|
||||
const deltas = await peer.askForDeltas();
|
||||
debug(`received ${deltas.length} deltas from ${peer.reqAddr.toAddrString()}`);
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `received ${deltas.length} deltas from ${peer.reqAddr.toAddrString()}`);
|
||||
for (const delta of deltas) {
|
||||
delta.receivedFrom = peer.reqAddr;
|
||||
this.rhizomeNode.deltaStream.receiveDelta(delta);
|
||||
|
|
124
src/pub-sub.ts
124
src/pub-sub.ts
|
@ -1,10 +1,18 @@
|
|||
import {GossipSub, gossipsub} from '@chainsafe/libp2p-gossipsub';
|
||||
import {noise} from '@chainsafe/libp2p-noise';
|
||||
import {yamux} from '@chainsafe/libp2p-yamux';
|
||||
import {identify} from '@libp2p/identify';
|
||||
import {mdns} from '@libp2p/mdns';
|
||||
import {ping} from '@libp2p/ping';
|
||||
import {tcp} from '@libp2p/tcp';
|
||||
import Debug from 'debug';
|
||||
import {Message, Publisher, Subscriber} from 'zeromq';
|
||||
import {RhizomeNode} from './node';
|
||||
import {PeerAddress} from './types';
|
||||
import {Libp2p, createLibp2p} from 'libp2p';
|
||||
import {Publisher, Subscriber} from 'zeromq';
|
||||
import {RhizomeNode} from './node.js';
|
||||
import {PeerAddress} from './types.js';
|
||||
const debug = Debug('pub-sub');
|
||||
|
||||
export type SubscribedMessageHandler = (sender: PeerAddress, msg: Message) => void;
|
||||
export type SubscribedMessageHandler = (sender: PeerAddress, msg: string) => void;
|
||||
|
||||
// TODO: Allow subscribing to multiple topics on one socket
|
||||
export class Subscription {
|
||||
|
@ -13,22 +21,34 @@ export class Subscription {
|
|||
publishAddr: PeerAddress;
|
||||
publishAddrStr: string;
|
||||
cb: SubscribedMessageHandler;
|
||||
libp2p?: Libp2p;
|
||||
|
||||
constructor(publishAddr: PeerAddress, topic: string, cb: SubscribedMessageHandler) {
|
||||
constructor(
|
||||
readonly pubSub: PubSub,
|
||||
publishAddr: PeerAddress,
|
||||
topic: string,
|
||||
cb: SubscribedMessageHandler,
|
||||
libp2p?: Libp2p
|
||||
) {
|
||||
this.cb = cb;
|
||||
this.topic = topic;
|
||||
this.publishAddr = publishAddr;
|
||||
this.publishAddrStr = `tcp://${this.publishAddr.toAddrString()}`;
|
||||
this.libp2p = libp2p;
|
||||
}
|
||||
|
||||
async start() {
|
||||
this.sock.connect(this.publishAddrStr);
|
||||
this.sock.subscribe(this.topic);
|
||||
debug(`Subscribing to ${this.topic} topic on ${this.publishAddrStr}`);
|
||||
debug(`[${this.pubSub.rhizomeNode.config.peerId}]`, `Subscribing to ${this.topic} topic on ZeroMQ ${this.publishAddrStr}`);
|
||||
|
||||
// Wait for ZeroMQ messages.
|
||||
// This will block indefinitely.
|
||||
for await (const [, sender, msg] of this.sock) {
|
||||
const senderAddr = PeerAddress.fromString(sender.toString());
|
||||
this.cb(senderAddr, msg);
|
||||
const senderStr = PeerAddress.fromString(sender.toString());
|
||||
const msgStr = msg.toString();
|
||||
debug(`[${this.pubSub.rhizomeNode.config.peerId}]`, `ZeroMQ subscribtion received msg: ${msgStr}`);
|
||||
this.cb(senderStr, msgStr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -38,6 +58,7 @@ export class PubSub {
|
|||
publishSock: Publisher;
|
||||
publishAddrStr: string;
|
||||
subscriptions: Subscription[] = [];
|
||||
libp2p?: Libp2p;
|
||||
|
||||
constructor(rhizomeNode: RhizomeNode) {
|
||||
this.rhizomeNode = rhizomeNode;
|
||||
|
@ -49,19 +70,82 @@ export class PubSub {
|
|||
|
||||
async start() {
|
||||
await this.publishSock.bind(this.publishAddrStr);
|
||||
debug(`Publishing socket bound to ${this.publishAddrStr}`);
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `ZeroMQ publishing socket bound to ${this.publishAddrStr}`);
|
||||
|
||||
this.libp2p = await createLibp2p({
|
||||
addresses: {
|
||||
// TODO: Config
|
||||
listen: ['/ip4/127.0.0.1/tcp/0']
|
||||
},
|
||||
transports: [tcp()],
|
||||
connectionEncrypters: [noise()],
|
||||
streamMuxers: [yamux()],
|
||||
peerDiscovery: [mdns()],
|
||||
services: {
|
||||
pubsub: gossipsub(),
|
||||
identify: identify(),
|
||||
ping: ping(),
|
||||
}
|
||||
});
|
||||
|
||||
this.libp2p.addEventListener("peer:discovery", (event) => {
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `found peer: ${JSON.stringify(event.detail, null, 2)}`);
|
||||
this.libp2p?.dial(event.detail.multiaddrs);
|
||||
});
|
||||
|
||||
this.libp2p.addEventListener("peer:connect", (event) => {
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `connected to peer: ${JSON.stringify(event.detail, null, 2)}`);
|
||||
// TODO: Subscribe
|
||||
});
|
||||
}
|
||||
|
||||
async publish(topic: string, msg: string) {
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `publishing to ZeroMQ, msg: ${msg}`);
|
||||
await this.publishSock.send([
|
||||
topic,
|
||||
this.rhizomeNode.myRequestAddr.toAddrString(),
|
||||
msg
|
||||
]);
|
||||
|
||||
if (this.libp2p) {
|
||||
const pubsub = this.libp2p.services.pubsub as GossipSub;
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `publishing to Libp2p, msg: ${msg}`);
|
||||
try {
|
||||
await pubsub.publish(topic, Buffer.from(msg));
|
||||
} catch (e: unknown) {
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, 'Libp2p publish:', (e as Error).message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
subscribe(publishAddr: PeerAddress, topic: string, cb: SubscribedMessageHandler): Subscription {
|
||||
const subscription = new Subscription(publishAddr, topic, cb);
|
||||
subscribedTopics = new Set<string>();
|
||||
|
||||
subscribeTopic(topic: string, cb: SubscribedMessageHandler) {
|
||||
if (!this.libp2p) throw new Error('libp2p not initialized');
|
||||
const pubsub = this.libp2p.services.pubsub as GossipSub;
|
||||
|
||||
// TODO: If we subscribe to multiple topics this callback will be duplicated
|
||||
pubsub.addEventListener("message", (event) => {
|
||||
const msg = Buffer.from(event.detail.data).toString();
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `Libp2p subscribtion received msg: ${msg}`);
|
||||
cb(new PeerAddress('libp2p', 0), msg);
|
||||
});
|
||||
|
||||
// Add to our list of subscribed topics so we can unsubscribe later.
|
||||
// Also has the effect of not calling subscribe more than once per topic.
|
||||
if (!this.subscribedTopics.has(topic)) {
|
||||
pubsub.subscribe(topic);
|
||||
this.subscribedTopics.add(topic);
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, 'subscribed topics:', Array.from(this.subscribedTopics.keys()));
|
||||
}
|
||||
}
|
||||
|
||||
subscribe(
|
||||
publishAddr: PeerAddress,
|
||||
topic: string,
|
||||
cb: SubscribedMessageHandler
|
||||
): Subscription {
|
||||
const subscription = new Subscription(this, publishAddr, topic, cb, this.libp2p);
|
||||
this.subscriptions.push(subscription);
|
||||
return subscription;
|
||||
}
|
||||
|
@ -73,7 +157,23 @@ export class PubSub {
|
|||
|
||||
for (const subscription of this.subscriptions) {
|
||||
subscription.sock.close();
|
||||
debug('subscription socket is closed?', subscription.sock.closed);
|
||||
}
|
||||
|
||||
if (this.libp2p) {
|
||||
const pubsub = this.libp2p.services.pubsub as GossipSub;
|
||||
|
||||
pubsub.removeEventListener("message");
|
||||
|
||||
for (const topic of this.subscribedTopics) {
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `unsubscribing Libp2p topic ${topic}`);
|
||||
pubsub.unsubscribe(topic)
|
||||
}
|
||||
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, 'stopping gossipsub');
|
||||
|
||||
await pubsub.stop();
|
||||
|
||||
await this.libp2p.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
import {Request, Reply, Message} from 'zeromq';
|
||||
import {EventEmitter} from 'node:events';
|
||||
import {RequestMethods} from './peers';
|
||||
import Debug from 'debug';
|
||||
import {RhizomeNode} from './node';
|
||||
import {PeerAddress} from './types';
|
||||
import {EventEmitter} from 'node:events';
|
||||
import {Message, Reply, Request} from 'zeromq';
|
||||
import {RhizomeNode} from './node.js';
|
||||
import {RequestMethods} from './peers.js';
|
||||
import {PeerAddress} from './types.js';
|
||||
const debug = Debug('request-reply');
|
||||
|
||||
export type PeerRequest = {
|
||||
|
@ -16,10 +16,10 @@ export type RequestHandler = (req: PeerRequest, res: ResponseSocket) => void;
|
|||
export class RequestSocket {
|
||||
sock = new Request();
|
||||
|
||||
constructor(addr: PeerAddress) {
|
||||
constructor(readonly requestReply: RequestReply, addr: PeerAddress) {
|
||||
const addrStr = `tcp://${addr.addr}:${addr.port}`;
|
||||
this.sock.connect(addrStr);
|
||||
debug(`Request socket connecting to ${addrStr}`);
|
||||
debug(`[${this.requestReply.rhizomeNode.config.peerId}]`, `Request socket connecting to ${addrStr}`);
|
||||
}
|
||||
|
||||
async request(method: RequestMethods): Promise<Message> {
|
||||
|
@ -55,7 +55,7 @@ function peerRequestFromMsg(msg: Message): PeerRequest | null {
|
|||
const obj = JSON.parse(msg.toString());
|
||||
req = {...obj};
|
||||
} catch (e) {
|
||||
debug('error receiving command', e);
|
||||
console.error('error receiving command', e);
|
||||
}
|
||||
return req;
|
||||
}
|
||||
|
@ -76,10 +76,10 @@ export class RequestReply {
|
|||
async start() {
|
||||
|
||||
await this.replySock.bind(this.requestBindAddrStr);
|
||||
debug(`Reply socket bound to ${this.requestBindAddrStr}`);
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `Reply socket bound to ${this.requestBindAddrStr}`);
|
||||
|
||||
for await (const [msg] of this.replySock) {
|
||||
debug(`Received message`, {msg: msg.toString()});
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `Received message`, {msg: msg.toString()});
|
||||
const req = peerRequestFromMsg(msg);
|
||||
this.requestStream.emit('request', req);
|
||||
}
|
||||
|
@ -94,6 +94,10 @@ export class RequestReply {
|
|||
});
|
||||
}
|
||||
|
||||
createRequestSocket(addr: PeerAddress) {
|
||||
return new RequestSocket(this, addr);
|
||||
}
|
||||
|
||||
async stop() {
|
||||
await this.replySock.unbind(this.requestBindAddrStr);
|
||||
this.replySock.close();
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import { Level } from 'level';
|
||||
import { LEVEL_DB_DIR } from './config';
|
||||
import { LEVEL_DB_DIR } from './config.js';
|
||||
import path from 'path';
|
||||
|
||||
function newStore(name: string): Level {
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
import Debug from "debug";
|
||||
import EventEmitter from "events";
|
||||
import {Delta, DeltaID} from "./delta";
|
||||
import {DomainEntityID, TransactionID} from "./types";
|
||||
import {Delta, DeltaID} from "./delta.js";
|
||||
import {DomainEntityID, TransactionID} from "./types.js";
|
||||
import {Lossless} from "./lossless.js";
|
||||
const debug = Debug("transactions");
|
||||
|
||||
function getDeltaTransactionId(delta: Delta): TransactionID | undefined {
|
||||
|
@ -58,6 +59,8 @@ export class Transactions {
|
|||
transactions = new Map<TransactionID, Transaction>();
|
||||
eventStream = new EventEmitter();
|
||||
|
||||
constructor(readonly lossless: Lossless) {}
|
||||
|
||||
get(id: TransactionID): Transaction | undefined {
|
||||
return this.transactions.get(id);
|
||||
}
|
||||
|
@ -101,7 +104,7 @@ export class Transactions {
|
|||
if (transactionId && size) {
|
||||
// This delta describes a transaction
|
||||
|
||||
debug(`transaction ${transactionId} has size ${size}`);
|
||||
debug(`[${this.lossless.rhizomeNode.config.peerId}]`, `transaction ${transactionId} has size ${size}`);
|
||||
|
||||
this.setSize(transactionId, size as number);
|
||||
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
import Debug from "debug";
|
||||
import {FSWatcher, readdirSync, readFileSync, watch} from "fs";
|
||||
import path, {join} from "path";
|
||||
import {Converter} from "showdown";
|
||||
import showdown from "showdown";
|
||||
import {RhizomeNode} from "../node.js";
|
||||
const {Converter} = showdown;
|
||||
const debug = Debug('md-files');
|
||||
|
||||
const docConverter = new Converter({
|
||||
|
@ -29,6 +31,8 @@ export class MDFiles {
|
|||
readmeWatcher?: FSWatcher;
|
||||
latestIndexHtml?: Html;
|
||||
|
||||
constructor(readonly rhizomeNode: RhizomeNode) {}
|
||||
|
||||
readFile(name: string) {
|
||||
const md = readFileSync(join('./markdown', `${name}.md`)).toString();
|
||||
let m = "";
|
||||
|
@ -95,14 +99,14 @@ export class MDFiles {
|
|||
|
||||
switch (eventType) {
|
||||
case 'rename': {
|
||||
debug(`file ${name} renamed`);
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `file ${name} renamed`);
|
||||
// Remove it from memory and re-scan everything
|
||||
this.files.delete(name);
|
||||
this.readDir();
|
||||
break;
|
||||
}
|
||||
case 'change': {
|
||||
debug(`file ${name} changed`);
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `file ${name} changed`);
|
||||
// Re-read this file
|
||||
this.readFile(name)
|
||||
break;
|
||||
|
@ -117,7 +121,7 @@ export class MDFiles {
|
|||
|
||||
switch (eventType) {
|
||||
case 'change': {
|
||||
debug(`README file changed`);
|
||||
debug(`[${this.rhizomeNode.config.peerId}]`, `README file changed`);
|
||||
// Re-read this file
|
||||
this.readReadme()
|
||||
break;
|
||||
|
|
|
@ -1,12 +1,17 @@
|
|||
{
|
||||
"compilerOptions": {
|
||||
"target": "ES6",
|
||||
"module": "CommonJS",
|
||||
"target": "ESNext",
|
||||
"module": "ESNext",
|
||||
"esModuleInterop": true,
|
||||
"moduleResolution": "Node",
|
||||
"moduleResolution": "bundler",
|
||||
"sourceMap": false,
|
||||
"isolatedModules": true,
|
||||
/* "allowImportingTsExtensions": true, */
|
||||
/* "noEmit": true, */
|
||||
"baseUrl": ".",
|
||||
"outDir": "dist",
|
||||
"lib": ["ESNext"],
|
||||
"types": ["node", "jest"],
|
||||
"importsNotUsedAsValues": "remove",
|
||||
"strict": true,
|
||||
"skipLibCheck": true,
|
||||
|
@ -21,5 +26,8 @@
|
|||
],
|
||||
"exclude": [
|
||||
"node_modules"
|
||||
]
|
||||
],
|
||||
"tsc-alias": {
|
||||
"resolveFullPaths": true
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import {RhizomeNode, RhizomeNodeConfig} from "../src/node";
|
||||
import {Collection} from "../src/collection";
|
||||
import {RhizomeNode, RhizomeNodeConfig} from "../src/node.js";
|
||||
import {Collection} from "../src/collection.js";
|
||||
import {randomUUID} from "crypto";
|
||||
|
||||
const start = 5000;
|
||||
const range = 5000;
|
||||
|
@ -14,6 +15,7 @@ export class App extends RhizomeNode {
|
|||
publishBindPort: getRandomPort(),
|
||||
requestBindPort: getRandomPort(),
|
||||
httpPort: getRandomPort(),
|
||||
pubSubTopic: config?.pubSubTopic || `deltas-${randomUUID()}`,
|
||||
...config,
|
||||
});
|
||||
|
||||
|
|
Loading…
Reference in New Issue