Compare commits

...

8 Commits

35 changed files with 477 additions and 806 deletions

View File

@ -69,6 +69,7 @@ export DEBUG="*,-express:*"
export RHIZOME_REQUEST_BIND_PORT=4000
export RHIZOME_PUBLISH_BIND_PORT=4001
export RHIZOME_SEED_PEERS='localhost:4002, localhost:4004'
export RHIZOME_HTTP_API_ENABLE=true
export RHIZOME_HTTP_API_PORT=3000
export RHIZOME_PEER_ID=peer1
export RHIZOME_PUB_SUB_TOPIC=rhizome-demo-1
@ -80,6 +81,7 @@ export DEBUG="*,-express:*"
export RHIZOME_REQUEST_BIND_PORT=4002
export RHIZOME_PUBLISH_BIND_PORT=4003
export RHIZOME_SEED_PEERS='localhost:4000, localhost:4004'
export RHIZOME_HTTP_API_ENABLE=true
export RHIZOME_HTTP_API_PORT=3001
export RHIZOME_PEER_ID=peer2
export RHIZOME_PUB_SUB_TOPIC=rhizome-demo-1
@ -91,6 +93,7 @@ export DEBUG="*,-express:*"
export RHIZOME_REQUEST_BIND_PORT=4004
export RHIZOME_PUBLISH_BIND_PORT=4005
export RHIZOME_SEED_PEERS='localhost:4000, localhost:4002'
export RHIZOME_HTTP_API_ENABLE=true
export RHIZOME_HTTP_API_PORT=3002
export RHIZOME_PEER_ID=peer3
export RHIZOME_PUB_SUB_TOPIC=rhizome-demo-1

53
__tests__/delta.ts Normal file
View File

@ -0,0 +1,53 @@
import {DeltaV1, DeltaV2} from "../src/delta";
describe("Delta", () => {
it("can convert DeltaV1 to DeltaV2", () => {
const deltaV1 = new DeltaV1({
creator: 'a',
host: 'h',
pointers: [{
localContext: 'color',
target: 'red'
}, {
localContext: 'furniture',
target: 'chair-1',
targetContext: 'color'
}]
});
const deltaV2 = DeltaV2.fromV1(deltaV1);
expect(deltaV2).toMatchObject({
...deltaV1,
pointers: {
color: 'red',
furniture: {'chair-1': 'color'}
}
});
});
it("can convert DeltaV2 to DeltaV1", () => {
const deltaV2 = new DeltaV2({
creator: 'a',
host: 'h',
pointers: {
color: 'red',
furniture: {'chair-1': 'color'}
}
});
const deltaV1 = deltaV2.toV1();
expect(deltaV1).toMatchObject({
...deltaV2,
pointers: [{
localContext: 'color',
target: 'red'
}, {
localContext: 'furniture',
target: 'chair-1',
targetContext: 'color'
}]
});
});
});

View File

@ -1,34 +1,42 @@
import {Delta, DeltaFilter} from '../src/delta.js';
import {Lossless} from '../src/lossless.js';
import {RhizomeNode} from '../src/node.js';
import {Delta, DeltaFilter, DeltaV2} from '../src/delta';
import {Lossless} from '../src/lossless';
import {RhizomeNode} from '../src/node';
describe('Lossless', () => {
const node = new RhizomeNode();
it('creates a lossless view of keanu as neo in the matrix', () => {
const delta = new Delta({
const delta = new DeltaV2({
creator: 'a',
host: 'h',
pointers: [{
localContext: "actor",
target: "keanu",
targetContext: "roles"
}, {
localContext: "role",
target: "neo",
targetContext: "actor"
}, {
localContext: "film",
target: "the_matrix",
targetContext: "cast"
}, {
localContext: "base_salary",
target: 1000000
}, {
localContext: "salary_currency",
target: "usd"
}]
});
pointers: {
actor: {"keanu": "roles"},
role: {"neo": "actor"},
film: {"the_matrix": "cast"},
base_salary: 1000000,
salary_currency: "usd"
}
}).toV1();
expect(delta.pointers).toMatchObject([{
localContext: "actor",
target: "keanu",
targetContext: "roles"
}, {
localContext: "role",
target: "neo",
targetContext: "actor"
}, {
localContext: "film",
target: "the_matrix",
targetContext: "cast"
}, {
localContext: "base_salary",
target: 1000000
}, {
localContext: "salary_currency",
target: "usd"
}]);
const lossless = new Lossless(node);

View File

@ -1,9 +1,9 @@
import Debug from 'debug';
import {Delta, PointerTarget} from "../src/delta.js";
import {lastValueFromDeltas} from "../src/last-write-wins.js";
import {Lossless, LosslessViewOne} from "../src/lossless.js";
import {Lossy, valueFromCollapsedDelta} from "../src/lossy.js";
import {RhizomeNode} from "../src/node.js";
import {Delta, PointerTarget} from "../src/delta";
import {lastValueFromDeltas, valueFromCollapsedDelta} from "../src/last-write-wins";
import {Lossless, LosslessViewOne} from "../src/lossless";
import {Lossy} from "../src/lossy";
import {RhizomeNode} from "../src/node";
const debug = Debug('test:lossy');
type Role = {
@ -16,45 +16,45 @@ type Summary = {
roles: Role[];
};
function initializer(): Summary {
return {
roles: []
};
}
// TODO: Add more rigor to this example approach to generating a summary.
// it's really not CRDT, it likely depends on the order of the pointers.
// TODO: Prove with failing test
const reducer = (acc: Summary, cur: LosslessViewOne): Summary => {
if (cur.referencedAs.includes("role")) {
const {delta, value: actor} = lastValueFromDeltas("actor", cur.propertyDeltas["actor"]) ?? {};
if (!delta) throw new Error('expected to find delta');
if (!actor) throw new Error('expected to find actor');
const film = valueFromCollapsedDelta("film", delta);
if (!film) throw new Error('expected to find film');
acc.roles.push({
role: cur.id,
actor,
film
});
class Summarizer extends Lossy<Summary, Summary> {
initializer(): Summary {
return {
roles: []
};
}
return acc;
}
// TODO: Add more rigor to this example approach to generating a summary.
// it's really not CRDT, it likely depends on the order of the pointers.
// TODO: Prove with failing test
const resolver = (acc: Summary): Summary => {
return acc;
}
reducer(acc: Summary, cur: LosslessViewOne): Summary {
if (cur.referencedAs.includes("role")) {
const {delta, value: actor} = lastValueFromDeltas("actor", cur.propertyDeltas["actor"]) ?? {};
if (!delta) throw new Error('expected to find delta');
if (!actor) throw new Error('expected to find actor');
const film = valueFromCollapsedDelta("film", delta);
if (!film) throw new Error('expected to find film');
acc.roles.push({
role: cur.id,
actor,
film
});
}
return acc;
}
resolver(acc: Summary): Summary {
return acc;
}
}
describe('Lossy', () => {
describe('use a provided initializer, reducer, and resolver to resolve entity views', () => {
const node = new RhizomeNode();
const lossless = new Lossless(node);
const lossy = new Lossy(lossless, initializer, reducer, resolver);
const lossy = new Summarizer(lossless);
beforeAll(() => {
lossless.ingestDelta(new Delta({

View File

@ -1,4 +1,4 @@
import {parseAddressList, PeerAddress} from '../src/peers.js';
import {parseAddressList, PeerAddress} from '../src/peers';
describe('PeerAddress', () => {
it('toString()', () => {

13
__tests__/relational.ts Normal file
View File

@ -0,0 +1,13 @@
describe('Relational', () => {
it.skip('Allows expressing a domain ontology as a relational schema', async () => {});
// Deltas can be filtered at time of view resolution, and
// excluded if they violate schema constraints;
// Ideally the sender minimizes this by locally validating against the constraints.
// For cases where deltas conflict, there can be a resolution process,
// with configurable parameters such as duration, quorum, and so on;
// or a deterministic algorithm can be applied.
it.skip('Can validate a delta against a relational constraint', async () => {});
it.skip('Can validate a delta against a set of relational constraints', async () => {});
});

View File

@ -1,4 +1,4 @@
import {App} from "../../util/app.js";
import {App} from "../../util/app";
describe('Run', () => {
let app: App;

View File

@ -1,5 +1,5 @@
import Debug from 'debug';
import {App} from '../../util/app.js';
import {App} from '../../util/app';
const debug = Debug('test:two');
describe('Run', () => {
@ -13,8 +13,6 @@ describe('Run', () => {
apps[1] = new App({
httpEnable: true,
peerId: 'app-002-B',
// Make the apps use the same pubsub topic so they can talk to each other
pubSubTopic: apps[0].config.pubSubTopic,
});
apps[0].config.seedPeers.push(apps[1].myRequestAddr);
apps[1].config.seedPeers.push(apps[0].myRequestAddr);

View File

@ -1,7 +1,7 @@
import Debug from 'debug';
import {Collection} from "../src/collection.js";
import {Entity} from "../src/entity.js";
import {RhizomeNode} from "../src/node.js";
import {BasicCollection} from '../src/collection-basic';
import {Entity} from "../src/entity";
import {RhizomeNode} from "../src/node";
const debug = Debug('example-app');
// As an app we want to be able to write and read data.
@ -23,7 +23,7 @@ type User = {
// Enable API to read lossless view
rhizomeNode.httpServer.httpApi.serveLossless();
const users = new Collection("user");
const users = new BasicCollection("user");
users.rhizomeConnect(rhizomeNode);
users.onUpdate((u: Entity) => {

315
package-lock.json generated
View File

@ -9,13 +9,6 @@
"version": "0.1.0",
"license": "Unlicense",
"dependencies": {
"@chainsafe/libp2p-gossipsub": "^14.1.0",
"@chainsafe/libp2p-noise": "^16.0.0",
"@chainsafe/libp2p-yamux": "^7.0.1",
"@libp2p/identify": "^3.0.14",
"@libp2p/mdns": "^11.0.16",
"@libp2p/ping": "^2.0.14",
"@libp2p/tcp": "^10.0.14",
"debug": "^4.4.0",
"express": "^4.21.2",
"json-logic-js": "^2.0.5",
@ -607,89 +600,12 @@
"dev": true,
"license": "MIT"
},
"node_modules/@chainsafe/as-chacha20poly1305": {
"version": "0.1.0",
"resolved": "https://registry.npmjs.org/@chainsafe/as-chacha20poly1305/-/as-chacha20poly1305-0.1.0.tgz",
"integrity": "sha512-BpNcL8/lji/GM3+vZ/bgRWqJ1q5kwvTFmGPk7pxm/QQZDbaMI98waOHjEymTjq2JmdD/INdNBFOVSyJofXg7ew==",
"license": "Apache-2.0"
},
"node_modules/@chainsafe/as-sha256": {
"version": "0.4.2",
"resolved": "https://registry.npmjs.org/@chainsafe/as-sha256/-/as-sha256-0.4.2.tgz",
"integrity": "sha512-HJ8GZBRjLeWtRsAXf3EbNsNzmTGpzTFjfpSf4yHkLYC+E52DhT6hwz+7qpj6I/EmFzSUm5tYYvT9K8GZokLQCQ==",
"license": "Apache-2.0"
},
"node_modules/@chainsafe/is-ip": {
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/@chainsafe/is-ip/-/is-ip-2.0.2.tgz",
"integrity": "sha512-ndGqEMG1W5WkGagaqOZHpPU172AGdxr+LD15sv3WIUvT5oCFUrG1Y0CW/v2Egwj4JXEvSibaIIIqImsm98y1nA==",
"license": "MIT"
},
"node_modules/@chainsafe/libp2p-gossipsub": {
"version": "14.1.0",
"resolved": "https://registry.npmjs.org/@chainsafe/libp2p-gossipsub/-/libp2p-gossipsub-14.1.0.tgz",
"integrity": "sha512-nzFBbHOoRFa/bXUSzmJaXOgHI+EttTldhLJ33yWcM0DxnWhLKychHkCDLoJO3THa1+dnzrDJoxj3N3/V0WoPVw==",
"license": "Apache-2.0",
"dependencies": {
"@libp2p/crypto": "^5.0.0",
"@libp2p/interface": "^2.0.0",
"@libp2p/interface-internal": "^2.0.0",
"@libp2p/peer-id": "^5.0.0",
"@libp2p/pubsub": "^10.0.0",
"@multiformats/multiaddr": "^12.1.14",
"denque": "^2.1.0",
"it-length-prefixed": "^9.0.4",
"it-pipe": "^3.0.1",
"it-pushable": "^3.2.3",
"multiformats": "^13.0.1",
"protons-runtime": "^5.5.0",
"uint8arraylist": "^2.4.8",
"uint8arrays": "^5.0.1"
},
"engines": {
"npm": ">=8.7.0"
}
},
"node_modules/@chainsafe/libp2p-noise": {
"version": "16.0.0",
"resolved": "https://registry.npmjs.org/@chainsafe/libp2p-noise/-/libp2p-noise-16.0.0.tgz",
"integrity": "sha512-8rqr8V1RD2/lVbfL0Bb//N8iPOFof11cUe8v8z8xJT7fUhCAbtCCSM4jbwI4HCnw0MvHLmcpmAfDCFRwcWzoeA==",
"license": "Apache-2.0 OR MIT",
"dependencies": {
"@chainsafe/as-chacha20poly1305": "^0.1.0",
"@chainsafe/as-sha256": "^0.4.1",
"@libp2p/crypto": "^5.0.0",
"@libp2p/interface": "^2.0.0",
"@libp2p/peer-id": "^5.0.0",
"@noble/ciphers": "^0.6.0",
"@noble/curves": "^1.1.0",
"@noble/hashes": "^1.3.1",
"it-length-prefixed": "^9.0.1",
"it-length-prefixed-stream": "^1.0.0",
"it-pair": "^2.0.6",
"it-pipe": "^3.0.1",
"it-stream-types": "^2.0.1",
"protons-runtime": "^5.5.0",
"uint8arraylist": "^2.4.3",
"uint8arrays": "^5.0.0",
"wherearewe": "^2.0.1"
}
},
"node_modules/@chainsafe/libp2p-yamux": {
"version": "7.0.1",
"resolved": "https://registry.npmjs.org/@chainsafe/libp2p-yamux/-/libp2p-yamux-7.0.1.tgz",
"integrity": "sha512-949MI0Ll0AsYq1gUETZmL/MijwX0jilOQ1i4s8wDEXGiMhuPWWiMsPgEnX6n+VzFmTrfNYyGaaJj5/MqxV9y/g==",
"license": "Apache-2.0 OR MIT",
"dependencies": {
"@libp2p/interface": "^2.0.0",
"@libp2p/utils": "^6.0.0",
"get-iterator": "^2.0.1",
"it-foreach": "^2.0.6",
"it-pushable": "^3.2.3",
"it-stream-types": "^2.0.1",
"uint8arraylist": "^2.4.8"
}
},
"node_modules/@chainsafe/netmask": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/@chainsafe/netmask/-/netmask-2.0.0.tgz",
@ -1445,29 +1361,6 @@
"uint8arrays": "^5.1.0"
}
},
"node_modules/@libp2p/identify": {
"version": "3.0.14",
"resolved": "https://registry.npmjs.org/@libp2p/identify/-/identify-3.0.14.tgz",
"integrity": "sha512-H80tdH8csD3W+wHoaltJEnjTAmZBJ22bYqFOPk5YKCF0k19Ox2MwRTkyCXuVDIdQfrYs94JE3HvLvUoN9X/JBQ==",
"license": "Apache-2.0 OR MIT",
"dependencies": {
"@libp2p/crypto": "^5.0.8",
"@libp2p/interface": "^2.3.0",
"@libp2p/interface-internal": "^2.2.1",
"@libp2p/peer-id": "^5.0.9",
"@libp2p/peer-record": "^8.0.13",
"@libp2p/utils": "^6.3.0",
"@multiformats/multiaddr": "^12.3.3",
"@multiformats/multiaddr-matcher": "^1.6.0",
"it-drain": "^3.0.7",
"it-parallel": "^3.0.8",
"it-protobuf-stream": "^1.1.5",
"protons-runtime": "^5.5.0",
"uint8arraylist": "^2.4.8",
"uint8arrays": "^5.1.0",
"wherearewe": "^2.0.1"
}
},
"node_modules/@libp2p/interface": {
"version": "2.3.0",
"resolved": "https://registry.npmjs.org/@libp2p/interface/-/interface-2.3.0.tgz",
@ -1508,22 +1401,6 @@
"weald": "^1.0.4"
}
},
"node_modules/@libp2p/mdns": {
"version": "11.0.16",
"resolved": "https://registry.npmjs.org/@libp2p/mdns/-/mdns-11.0.16.tgz",
"integrity": "sha512-FJLJywEFCm5r61b7IZ+KGvxUPEGuGx5VGXyTSE10y7lSxizn50ZUAmnN76OsBdLz/Uj3/iyzTOOmY17mzBjN3g==",
"license": "Apache-2.0 OR MIT",
"dependencies": {
"@libp2p/interface": "^2.3.0",
"@libp2p/interface-internal": "^2.2.1",
"@libp2p/peer-id": "^5.0.9",
"@libp2p/utils": "^6.3.0",
"@multiformats/multiaddr": "^12.3.3",
"@types/multicast-dns": "^7.2.4",
"dns-packet": "^5.6.1",
"multicast-dns": "^7.2.5"
}
},
"node_modules/@libp2p/multistream-select": {
"version": "6.0.10",
"resolved": "https://registry.npmjs.org/@libp2p/multistream-select/-/multistream-select-6.0.10.tgz",
@ -1603,59 +1480,6 @@
"uint8arrays": "^5.1.0"
}
},
"node_modules/@libp2p/ping": {
"version": "2.0.14",
"resolved": "https://registry.npmjs.org/@libp2p/ping/-/ping-2.0.14.tgz",
"integrity": "sha512-+idRl+4T2bhP+FNDgwBFWHyHkFoOcjwzQmezLR00mG8hg2iH3BvDmzMd7cagZM21SaeDd4eiN8XyhQqyi1RcZA==",
"license": "Apache-2.0 OR MIT",
"dependencies": {
"@libp2p/crypto": "^5.0.8",
"@libp2p/interface": "^2.3.0",
"@libp2p/interface-internal": "^2.2.1",
"@multiformats/multiaddr": "^12.3.3",
"it-byte-stream": "^1.1.0",
"uint8arrays": "^5.1.0"
}
},
"node_modules/@libp2p/pubsub": {
"version": "10.0.14",
"resolved": "https://registry.npmjs.org/@libp2p/pubsub/-/pubsub-10.0.14.tgz",
"integrity": "sha512-fzHHpI6Smrvvlje1ySRfohjlxeifpoowNRcnJy6/ZFoziHvtufuPQdJ65jL/oobd6sTnbbShAlkkx/KVXBr5lw==",
"license": "Apache-2.0 OR MIT",
"dependencies": {
"@libp2p/crypto": "^5.0.8",
"@libp2p/interface": "^2.3.0",
"@libp2p/interface-internal": "^2.2.1",
"@libp2p/peer-collections": "^6.0.13",
"@libp2p/peer-id": "^5.0.9",
"@libp2p/utils": "^6.3.0",
"it-length-prefixed": "^9.1.0",
"it-pipe": "^3.0.1",
"it-pushable": "^3.2.3",
"multiformats": "^13.3.1",
"p-queue": "^8.0.1",
"uint8arraylist": "^2.4.8",
"uint8arrays": "^5.1.0"
}
},
"node_modules/@libp2p/tcp": {
"version": "10.0.14",
"resolved": "https://registry.npmjs.org/@libp2p/tcp/-/tcp-10.0.14.tgz",
"integrity": "sha512-HwYCvNnSqjVzoy3DQh6chy4EwWgmnII+ccT/LEpitKbV8QzHTv2HUeSaGtuGc42Z95rFMXqmJeqBkhbXcpeCRA==",
"license": "Apache-2.0 OR MIT",
"dependencies": {
"@libp2p/interface": "^2.3.0",
"@libp2p/utils": "^6.3.0",
"@multiformats/mafmt": "^12.1.6",
"@multiformats/multiaddr": "^12.3.3",
"@types/sinon": "^17.0.3",
"p-defer": "^4.0.1",
"p-event": "^6.0.1",
"progress-events": "^1.0.1",
"race-event": "^1.3.0",
"stream-to-it": "^1.0.1"
}
},
"node_modules/@libp2p/utils": {
"version": "6.3.0",
"resolved": "https://registry.npmjs.org/@libp2p/utils/-/utils-6.3.0.tgz",
@ -1702,15 +1526,6 @@
"uint8arrays": "^5.0.2"
}
},
"node_modules/@multiformats/mafmt": {
"version": "12.1.6",
"resolved": "https://registry.npmjs.org/@multiformats/mafmt/-/mafmt-12.1.6.tgz",
"integrity": "sha512-tlJRfL21X+AKn9b5i5VnaTD6bNttpSpcqwKVmDmSHLwxoz97fAHaepqFOk/l1fIu94nImIXneNbhsJx/RQNIww==",
"license": "Apache-2.0 OR MIT",
"dependencies": {
"@multiformats/multiaddr": "^12.0.0"
}
},
"node_modules/@multiformats/multiaddr": {
"version": "12.3.4",
"resolved": "https://registry.npmjs.org/@multiformats/multiaddr/-/multiaddr-12.3.4.tgz",
@ -1736,15 +1551,6 @@
"multiformats": "^13.0.0"
}
},
"node_modules/@noble/ciphers": {
"version": "0.6.0",
"resolved": "https://registry.npmjs.org/@noble/ciphers/-/ciphers-0.6.0.tgz",
"integrity": "sha512-mIbq/R9QXk5/cTfESb1OKtyFnk7oc1Om/8onA1158K9/OZUQFDEVy55jVTato+xmp3XX6F6Qh0zz0Nc1AxAlRQ==",
"license": "MIT",
"funding": {
"url": "https://paulmillr.com/funding/"
}
},
"node_modules/@noble/curves": {
"version": "1.7.0",
"resolved": "https://registry.npmjs.org/@noble/curves/-/curves-1.7.0.tgz",
@ -2089,16 +1895,6 @@
"dev": true,
"license": "MIT"
},
"node_modules/@types/multicast-dns": {
"version": "7.2.4",
"resolved": "https://registry.npmjs.org/@types/multicast-dns/-/multicast-dns-7.2.4.tgz",
"integrity": "sha512-ib5K4cIDR4Ro5SR3Sx/LROkMDa0BHz0OPaCBL/OSPDsAXEGZ3/KQeS6poBKYVN7BfjXDL9lWNwzyHVgt/wkyCw==",
"license": "MIT",
"dependencies": {
"@types/dns-packet": "*",
"@types/node": "*"
}
},
"node_modules/@types/murmurhash3js-revisited": {
"version": "3.0.3",
"resolved": "https://registry.npmjs.org/@types/murmurhash3js-revisited/-/murmurhash3js-revisited-3.0.3.tgz",
@ -2170,21 +1966,6 @@
"dev": true,
"license": "MIT"
},
"node_modules/@types/sinon": {
"version": "17.0.3",
"resolved": "https://registry.npmjs.org/@types/sinon/-/sinon-17.0.3.tgz",
"integrity": "sha512-j3uovdn8ewky9kRBG19bOwaZbexJu/XjtkHyjvUgt4xfPFz18dcORIMqnYh66Fx3Powhcr85NT5+er3+oViapw==",
"license": "MIT",
"dependencies": {
"@types/sinonjs__fake-timers": "*"
}
},
"node_modules/@types/sinonjs__fake-timers": {
"version": "8.1.5",
"resolved": "https://registry.npmjs.org/@types/sinonjs__fake-timers/-/sinonjs__fake-timers-8.1.5.tgz",
"integrity": "sha512-mQkU2jY8jJEF7YHjHvsQO8+3ughTL1mcnn96igfhONmR+fUPSKIkefQYpSe8bsly2Ep7oQbn/6VG5/9/0qcArQ==",
"license": "MIT"
},
"node_modules/@types/stack-utils": {
"version": "2.0.3",
"resolved": "https://registry.npmjs.org/@types/stack-utils/-/stack-utils-2.0.3.tgz",
@ -4098,15 +3879,6 @@
"integrity": "sha512-bd2L678uiWATM6m5Z1VzNCErI3jiGzt6HGY8OVICs40JQq/HALfbyNJmp0UDakEY4pMMaN0Ly5om/B1VI/+xfQ==",
"license": "MIT"
},
"node_modules/denque": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz",
"integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==",
"license": "Apache-2.0",
"engines": {
"node": ">=0.10"
}
},
"node_modules/depd": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/depd/-/depd-2.0.0.tgz",
@ -6184,12 +5956,6 @@
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/is-electron": {
"version": "2.2.2",
"resolved": "https://registry.npmjs.org/is-electron/-/is-electron-2.2.2.tgz",
"integrity": "sha512-FO/Rhvz5tuw4MCWkpMzHFKWD2LsfHzIb7i6MdPYZ/KW7AlxawyLkqdy+jPZP1WubqEADE3O4FUENlJHDfQASRg==",
"license": "MIT"
},
"node_modules/is-extglob": {
"version": "2.1.1",
"resolved": "https://registry.npmjs.org/is-extglob/-/is-extglob-2.1.1.tgz",
@ -6722,20 +6488,6 @@
"it-pushable": "^3.2.3"
}
},
"node_modules/it-pair": {
"version": "2.0.6",
"resolved": "https://registry.npmjs.org/it-pair/-/it-pair-2.0.6.tgz",
"integrity": "sha512-5M0t5RAcYEQYNG5BV7d7cqbdwbCAp5yLdzvkxsZmkuZsLbTdZzah6MQySYfaAQjNDCq6PUnDt0hqBZ4NwMfW6g==",
"license": "Apache-2.0 OR MIT",
"dependencies": {
"it-stream-types": "^2.0.1",
"p-defer": "^4.0.0"
},
"engines": {
"node": ">=16.0.0",
"npm": ">=7.0.0"
}
},
"node_modules/it-parallel": {
"version": "3.0.8",
"resolved": "https://registry.npmjs.org/it-parallel/-/it-parallel-3.0.8.tgz",
@ -6766,17 +6518,6 @@
"npm": ">=7.0.0"
}
},
"node_modules/it-protobuf-stream": {
"version": "1.1.5",
"resolved": "https://registry.npmjs.org/it-protobuf-stream/-/it-protobuf-stream-1.1.5.tgz",
"integrity": "sha512-H70idW45As3cEbU4uSoZ9IYHUIV3YM69/2mmXYR7gOlPabWjuyNi3/abK11geiiq3la27Sos/mXr68JljjKtEQ==",
"license": "Apache-2.0 OR MIT",
"dependencies": {
"it-length-prefixed-stream": "^1.0.0",
"it-stream-types": "^2.0.1",
"uint8arraylist": "^2.4.8"
}
},
"node_modules/it-pushable": {
"version": "3.2.3",
"resolved": "https://registry.npmjs.org/it-pushable/-/it-pushable-3.2.3.tgz",
@ -8046,19 +7787,6 @@
"integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==",
"license": "MIT"
},
"node_modules/multicast-dns": {
"version": "7.2.5",
"resolved": "https://registry.npmjs.org/multicast-dns/-/multicast-dns-7.2.5.tgz",
"integrity": "sha512-2eznPJP8z2BFLX50tf0LuODrpINqP1RVIm/CObbTcBRITQgmC/TjcREF1NeTBzIcR5XO/ukWo+YHOjBbFwIupg==",
"license": "MIT",
"dependencies": {
"dns-packet": "^5.2.2",
"thunky": "^1.0.2"
},
"bin": {
"multicast-dns": "cli.js"
}
},
"node_modules/multiformats": {
"version": "13.3.1",
"resolved": "https://registry.npmjs.org/multiformats/-/multiformats-13.3.1.tgz",
@ -8386,21 +8114,6 @@
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/p-event": {
"version": "6.0.1",
"resolved": "https://registry.npmjs.org/p-event/-/p-event-6.0.1.tgz",
"integrity": "sha512-Q6Bekk5wpzW5qIyUP4gdMEujObYstZl6DMMOSenwBvV0BlE5LkDwkjs5yHbZmdCEq2o4RJx4tE1vwxFVf2FG1w==",
"license": "MIT",
"dependencies": {
"p-timeout": "^6.1.2"
},
"engines": {
"node": ">=16.17"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/p-limit": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/p-limit/-/p-limit-3.1.0.tgz",
@ -9529,15 +9242,6 @@
"node": ">= 0.8"
}
},
"node_modules/stream-to-it": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/stream-to-it/-/stream-to-it-1.0.1.tgz",
"integrity": "sha512-AqHYAYPHcmvMrcLNgncE/q0Aj/ajP6A4qGhxP6EVn7K3YTNs0bJpJyk57wc2Heb7MUL64jurvmnmui8D9kjZgA==",
"license": "Apache-2.0 OR MIT",
"dependencies": {
"it-stream-types": "^2.0.1"
}
},
"node_modules/string_decoder": {
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz",
@ -9757,12 +9461,6 @@
"license": "MIT",
"peer": true
},
"node_modules/thunky": {
"version": "1.1.0",
"resolved": "https://registry.npmjs.org/thunky/-/thunky-1.1.0.tgz",
"integrity": "sha512-eHY7nBftgThBqOyHGVN+l8gF0BucP09fMo0oO/Lb0w1OF80dJv+lDVpXG60WMQvkcxAkNybKsrEIE3ZtKGmPrA==",
"license": "MIT"
},
"node_modules/tldts": {
"version": "6.1.69",
"resolved": "https://registry.npmjs.org/tldts/-/tldts-6.1.69.tgz",
@ -10393,19 +10091,6 @@
"url": "https://github.com/chalk/supports-color?sponsor=1"
}
},
"node_modules/wherearewe": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/wherearewe/-/wherearewe-2.0.1.tgz",
"integrity": "sha512-XUguZbDxCA2wBn2LoFtcEhXL6AXo+hVjGonwhSTTTU9SzbWG8Xu3onNIpzf9j/mYUcJQ0f+m37SzG77G851uFw==",
"license": "Apache-2.0 OR MIT",
"dependencies": {
"is-electron": "^2.2.0"
},
"engines": {
"node": ">=16.0.0",
"npm": ">=7.0.0"
}
},
"node_modules/which": {
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/which/-/which-2.0.2.tgz",

View File

@ -2,7 +2,6 @@
"name": "rhizome-node",
"version": "0.1.0",
"description": "Rhizomatic database engine node",
"type": "module",
"scripts": {
"build": "tsc",
"build:watch": "tsc --watch",
@ -19,32 +18,11 @@
],
"testMatch": [
"**/__tests__/**/*"
],
"transform": {
"^.+\\.ts$": [
"ts-jest",
{
"useESM": true
}
]
},
"extensionsToTreatAsEsm": [
".ts"
],
"moduleNameMapper": {
"^(\\.{1,2}/.*)\\.js$": "$1"
}
]
},
"author": "Taliesin (Ladd) <ladd@dgov.io>",
"license": "Unlicense",
"dependencies": {
"@chainsafe/libp2p-gossipsub": "^14.1.0",
"@chainsafe/libp2p-noise": "^16.0.0",
"@chainsafe/libp2p-yamux": "^7.0.1",
"@libp2p/identify": "^3.0.14",
"@libp2p/mdns": "^11.0.16",
"@libp2p/ping": "^2.0.14",
"@libp2p/tcp": "^10.0.14",
"debug": "^4.4.0",
"express": "^4.21.2",
"json-logic-js": "^2.0.5",

View File

@ -1,38 +1,31 @@
// A basic collection of entities
// This may be extended to house a collection of objects that all follow a common schema.
// It should enable operations like removing a property removes the value from the entities in the collection
// It could then be further extended with e.g. table semantics like filter, sort, join
import Debug from 'debug';
import {randomUUID} from "node:crypto";
import EventEmitter from "node:events";
import {Delta} from "./delta.js";
import {Entity, EntityProperties} from "./entity.js";
import {LastWriteWins, ResolvedViewOne} from './last-write-wins.js';
import {RhizomeNode} from "./node.js";
import {DomainEntityID} from "./types.js";
const debug = Debug('rz:collection');
import {Delta} from "./delta";
import {Entity, EntityProperties} from "./entity";
import {ResolvedViewOne} from './last-write-wins';
import {RhizomeNode} from "./node";
import {DomainEntityID} from "./types";
const debug = Debug('rz:abstract-collection');
export class Collection {
export abstract class Collection<View> {
rhizomeNode?: RhizomeNode;
name: string;
eventStream = new EventEmitter();
lossy?: LastWriteWins;
lossy?: View;
constructor(name: string) {
this.name = name;
}
// Instead of trying to update our final view of the entity with every incoming delta,
// let's try this:
// - keep a lossless view (of everything)
// - build a lossy view when needed
// This approach is simplistic, but can then be optimized and enhanced.
abstract initializeView(): void;
abstract resolve(id: string): ResolvedViewOne | undefined;
rhizomeConnect(rhizomeNode: RhizomeNode) {
this.rhizomeNode = rhizomeNode;
this.lossy = new LastWriteWins(this.rhizomeNode.lossless);
this.initializeView();
// Listen for completed transactions, and emit updates to event stream
this.rhizomeNode.lossless.eventStream.on("updated", (id) => {
@ -43,13 +36,12 @@ export class Collection {
this.eventStream.emit("update", res);
});
rhizomeNode.httpServer.httpApi.serveCollection(this);
// TODO: Fix this
rhizomeNode.httpServer.httpApi.serveCollection<View>(this);
debug(`[${this.rhizomeNode.config.peerId}]`, `Connected ${this.name} to rhizome`);
}
// Applies the javascript rules for updating object values,
// e.g. set to `undefined` to delete a property.
// This function is here instead of Entity so that it can:
// - read the current state in order to build its delta
// - include the collection name in the delta it produces
@ -206,14 +198,4 @@ export class Collection {
return res;
}
resolve(
id: string
): ResolvedViewOne | undefined {
if (!this.rhizomeNode) throw new Error('collection not connected to rhizome');
if (!this.lossy) throw new Error('lossy view not initialized');
const res = this.lossy.resolve([id]) || {};
return res[id];
}
}

27
src/collection-basic.ts Normal file
View File

@ -0,0 +1,27 @@
// A basic collection of entities
// This may be extended to house a collection of objects that all follow a common schema.
// It should enable operations like removing a property removes the value from the entities in the collection
// It could then be further extended with e.g. table semantics like filter, sort, join
import {Collection} from './collection-abstract';
import {LastWriteWins, ResolvedViewOne} from './last-write-wins';
export class BasicCollection extends Collection<LastWriteWins> {
lossy?: LastWriteWins;
initializeView() {
if (!this.rhizomeNode) throw new Error('not connected to rhizome');
this.lossy = new LastWriteWins(this.rhizomeNode.lossless);
}
resolve(
id: string
): ResolvedViewOne | undefined {
if (!this.rhizomeNode) throw new Error('collection not connected to rhizome');
if (!this.lossy) throw new Error('lossy view not initialized');
const res = this.lossy.resolve([id]) || {};
return res[id];
}
}

View File

@ -0,0 +1,26 @@
import {Collection} from "./collection-abstract";
import {LastWriteWins, ResolvedViewOne} from "./last-write-wins";
class RelationalView extends LastWriteWins {
}
export class RelationalCollection extends Collection<RelationalView> {
lossy?: RelationalView;
initializeView() {
if (!this.rhizomeNode) throw new Error('not connected to rhizome');
this.lossy = new RelationalView(this.rhizomeNode.lossless);
}
resolve(
id: string
): ResolvedViewOne | undefined {
if (!this.rhizomeNode) throw new Error('collection not connected to rhizome');
if (!this.lossy) throw new Error('lossy view not initialized');
const res = this.lossy.resolve([id]) || {};
return res[id];
}
}

View File

@ -1,8 +1,8 @@
import Debug from 'debug';
import EventEmitter from 'node:events';
import objectHash from 'object-hash';
import {Delta, DeltaNetworkImage} from './delta.js';
import {RhizomeNode} from './node.js';
import {Delta} from './delta';
import {RhizomeNode} from './node';
const debug = Debug('rz:deltas');
enum Decision {
@ -12,7 +12,6 @@ enum Decision {
};
export class DeltaStream {
rhizomeNode: RhizomeNode;
deltaStream = new EventEmitter();
deltasProposed: Delta[] = [];
deltasAccepted: Delta[] = [];
@ -20,9 +19,7 @@ export class DeltaStream {
deltasDeferred: Delta[] = [];
hashesReceived = new Set<string>();
constructor(rhizomeNode: RhizomeNode) {
this.rhizomeNode = rhizomeNode;
}
constructor(readonly rhizomeNode: RhizomeNode) {}
applyPolicy(delta: Delta): Decision {
return !!delta && Decision.Accept;
@ -88,18 +85,19 @@ export class DeltaStream {
async publishDelta(delta: Delta) {
debug(`[${this.rhizomeNode.config.peerId}]`, `Publishing delta: ${JSON.stringify(delta)}`);
await this.rhizomeNode.pubSub.publish(
this.rhizomeNode.config.pubSubTopic,
"deltas",
this.serializeDelta(delta)
);
}
serializeDelta(delta: Delta): string {
const deltaNetworkImage = new DeltaNetworkImage(delta);
const deltaNetworkImage = delta.toNetworkImage();
return JSON.stringify(deltaNetworkImage);
}
deserializeDelta(input: string): Delta {
// TODO: Input validation
return JSON.parse(input);
const parsed = JSON.parse(input);
return Delta.fromNetworkImage(parsed);
}
}

View File

@ -1,25 +1,37 @@
import {randomUUID} from "crypto";
import Debug from 'debug';
import microtime from 'microtime';
import {CreatorID, HostID, Timestamp, TransactionID} from "./types.js";
import {PeerAddress} from "./peers.js";
import {PeerAddress} from "./peers";
import {CreatorID, DomainEntityID, HostID, PropertyID, Timestamp, TransactionID} from "./types";
const debug = Debug('rz:delta');
export type DeltaID = string;
export type PointerTarget = string | number | undefined;
export type PointerTarget = string | number | null;
export type Pointer = {
type PointerV1 = {
localContext: string;
target: PointerTarget;
targetContext?: string;
};
export class DeltaNetworkImage {
export type Scalar = string | number | null;
export type Reference = {
[key: PropertyID]: DomainEntityID
};
export type PointersV2 = {
[key: PropertyID]: Scalar | Reference
};
export class DeltaNetworkImageV1 {
id: DeltaID;
timeCreated: Timestamp;
host: HostID;
creator: CreatorID;
pointers: Pointer[];
constructor({id, timeCreated, host, creator, pointers}: DeltaNetworkImage) {
pointers: PointerV1[];
constructor({id, timeCreated, host, creator, pointers}: DeltaNetworkImageV1) {
this.id = id;
this.host = host;
this.creator = creator;
@ -28,26 +40,106 @@ export class DeltaNetworkImage {
}
};
export class Delta extends DeltaNetworkImage {
export class DeltaNetworkImageV2 {
id: DeltaID;
timeCreated: Timestamp;
host: HostID;
creator: CreatorID;
pointers: PointersV2;
constructor({id, timeCreated, host, creator, pointers}: DeltaNetworkImageV2) {
this.id = id;
this.host = host;
this.creator = creator;
this.timeCreated = timeCreated;
this.pointers = pointers;
}
};
export class DeltaV1 extends DeltaNetworkImageV1 {
receivedFrom?: PeerAddress;
timeReceived: Timestamp;
transactionId?: TransactionID;
// TODO: Verify the following assumption:
// We're assuming that you only call this constructor when
// actually creating a new delta.
// When receiving one from the network, you can
constructor({host, creator, pointers}: Partial<DeltaNetworkImage>) {
// TODO: Verify that when receiving a delta from the network we can
// retain the delta's id.
const id = randomUUID();
const timeCreated = microtime.now();
constructor({id, timeCreated, host, creator, pointers}: Partial<DeltaNetworkImageV1>) {
id = id ?? randomUUID();
timeCreated = timeCreated ?? microtime.now();
if (!host || !creator || !pointers) throw new Error('uninitializied values');
super({id, timeCreated, host, creator, pointers});
this.timeCreated = timeCreated;
this.timeReceived = this.timeCreated;
}
toNetworkImage() {
return new DeltaNetworkImageV1(this);
}
static fromNetworkImage(delta: DeltaNetworkImageV1) {
return new DeltaV1(delta);
}
}
export class DeltaV2 extends DeltaNetworkImageV2 {
receivedFrom?: PeerAddress;
timeReceived: Timestamp;
transactionId?: TransactionID;
constructor({id, timeCreated, host, creator, pointers}: Partial<DeltaNetworkImageV2>) {
id = id ?? randomUUID();
timeCreated = timeCreated ?? microtime.now();
if (!host || !creator || !pointers) throw new Error('uninitializied values');
super({id, timeCreated, host, creator, pointers});
this.timeCreated = timeCreated;
this.timeReceived = this.timeCreated;
}
toNetworkImage() {
return new DeltaNetworkImageV2(this);
}
static fromNetworkImage(delta: DeltaNetworkImageV2) {
return new DeltaV2(delta);
}
static fromV1(delta: DeltaV1) {
const pointersV2: PointersV2 = {};
for (const {localContext, target, targetContext} of delta.pointers) {
if (targetContext && typeof target === "string") {
pointersV2[localContext] = {[target]: targetContext};
} else {
pointersV2[localContext] = target;
}
}
debug(`fromV1, pointers in: ${JSON.stringify(delta.pointers)}`);
debug(`fromV1, pointers out: ${JSON.stringify(pointersV2)}`);
return DeltaV2.fromNetworkImage({
...delta,
pointers: pointersV2
});
}
toV1() {
const pointersV1: PointerV1[] = [];
for (const [localContext, pointerTarget] of Object.entries(this.pointers)) {
if (pointerTarget && typeof pointerTarget === "object") {
const [obj] = Object.entries(pointerTarget)
if (!obj) throw new Error("invalid pointer target");
const [target, targetContext] = Object.entries(pointerTarget)[0];
pointersV1.push({localContext, target, targetContext});
} else {
pointersV1.push({localContext, target: pointerTarget});
}
}
return new DeltaV1({
...this,
pointers: pointersV1
});
}
}
// Alias
export class Delta extends DeltaV1 {}
export type DeltaFilter = (delta: Delta) => boolean;

View File

@ -7,8 +7,7 @@
// - As typescript interfaces?
// - As typescript classes?
import {Collection} from "./collection.js";
import {PropertyTypes} from "./types.js";
import {PropertyTypes} from "./types";
export type EntityProperties = {
[key: string]: PropertyTypes;
@ -20,11 +19,5 @@ export class Entity {
constructor(
readonly id: string,
readonly collection?: Collection
) {}
async save() {
if (!this.collection) throw new Error('to save this entity you must specify the collection');
return this.collection.put(this.id, this.properties);
}
}

View File

@ -1,20 +0,0 @@
import { add_operation, apply } from 'json-logic-js';
import { Delta } from '../delta.js';
type DeltaContext = Delta & {
creatorAddress: string;
};
add_operation('in', (needle, haystack) => {
return [...haystack].includes(needle);
});
export function applyFilter(deltas: Delta[], filterExpr: JSON): Delta[] {
return deltas.filter(delta => {
const context: DeltaContext = {
...delta,
creatorAddress: [delta.creator, delta.host].join('@'),
};
return apply(filterExpr, context);
});
}

View File

@ -1,33 +0,0 @@
import { FilterExpr } from "../types.js";
// import { map } from 'radash';
// A creator as seen by a host
type OriginPoint = {
creator: string;
host: string;
};
class Party {
originPoints: OriginPoint[];
constructor(og: OriginPoint) {
this.originPoints = [og];
}
getAddress() {
const { creator, host } = this.originPoints[0];
return `${creator}@${host}`;
}
}
const knownParties = new Set<Party>();
export const countKnownParties = () => knownParties.size;
export function generateFilter(): FilterExpr {
// map(knownParties, (p: Party) => p.address]
//
const addresses = [...knownParties.values()].map(p => p.getAddress());
return {
'in': ['$creatorAddress', addresses]
};
};

View File

@ -1,7 +1,7 @@
import express, {Router} from "express";
import {Collection} from "../collection.js";
import {Delta} from "../delta.js";
import {RhizomeNode} from "../node.js";
import {Collection} from "../collection-abstract";
import {Delta} from "../delta";
import {RhizomeNode} from "../node";
export class HttpApi {
router = Router();
@ -57,7 +57,8 @@ export class HttpApi {
});
}
serveCollection(collection: Collection) {
// serveCollection<T extends Collection>(collection: T) {
serveCollection<View>(collection: Collection<View>) {
const {name} = collection;
// Get the ID of all domain entities

View File

@ -1,6 +1,6 @@
import express, {Router} from "express";
import {RhizomeNode} from "../node.js";
import {htmlDocFromMarkdown, MDFiles} from "../util/md-files.js";
import {RhizomeNode} from "../node";
import {htmlDocFromMarkdown, MDFiles} from "../util/md-files";
export class HttpHtml {
router = Router();
@ -9,12 +9,6 @@ export class HttpHtml {
constructor(readonly rhizomeNode: RhizomeNode) {
this.mdFiles = new MDFiles(this.rhizomeNode);
// Scan and watch for markdown files
this.mdFiles.readDir();
this.mdFiles.readReadme();
this.mdFiles.watchDir();
this.mdFiles.watchReadme();
// Serve README
this.router.get('/README', (_req: express.Request, res: express.Response) => {
const html = this.mdFiles.getReadmeHTML();
@ -39,7 +33,15 @@ export class HttpHtml {
});
}
close() {
this.mdFiles.close();
start() {
// Scan and watch for markdown files
this.mdFiles.readDir();
this.mdFiles.readReadme();
this.mdFiles.watchDir();
this.mdFiles.watchReadme();
}
stop() {
this.mdFiles.stop();
}
}

View File

@ -1,9 +1,9 @@
import Debug from "debug";
import express from "express";
import {Server} from "http";
import {RhizomeNode} from "../node.js";
import {HttpApi} from "./api.js";
import {HttpHtml} from "./html.js";
import {RhizomeNode} from "../node";
import {HttpApi} from "./api";
import {HttpHtml} from "./html";
const debug = Debug('rz:http-api');
export class HttpServer {
@ -23,6 +23,7 @@ export class HttpServer {
start() {
const {httpAddr, httpPort} = this.rhizomeNode.config;
this.httpHtml.start();
this.server = this.app.listen({
port: httpPort,
host: httpAddr,
@ -34,6 +35,6 @@ export class HttpServer {
async stop() {
this.server?.close();
this.httpHtml.close();
this.httpHtml.stop();
}
}

View File

@ -1,8 +1,8 @@
// import Debug from 'debug';
import {EntityProperties} from "./entity.js";
import {CollapsedDelta, Lossless, LosslessViewOne} from "./lossless.js";
import {Lossy, valueFromCollapsedDelta} from './lossy.js';
import {DomainEntityID, PropertyID, PropertyTypes, Timestamp, ViewMany} from "./types.js";
import {EntityProperties} from "./entity";
import {CollapsedDelta, LosslessViewOne} from "./lossless";
import {Lossy} from './lossy';
import {DomainEntityID, PropertyID, PropertyTypes, Timestamp, ViewMany} from "./types";
// const debug = Debug('rz:lossy:last-write-wins');
type TimestampedProperty = {
@ -27,7 +27,21 @@ export type ResolvedViewMany = ViewMany<ResolvedViewOne>;
type Accumulator = LossyViewMany<TimestampedProperties>;
type Result = LossyViewMany<EntityProperties>;
// Function for resolving a value for an entity by last write wins
// Extract a particular value from a delta's pointers
export function valueFromCollapsedDelta(
key: string,
delta: CollapsedDelta
): string | number | undefined {
for (const pointer of delta.pointers) {
for (const [k, value] of Object.entries(pointer)) {
if (k === key && (typeof value === "string" || typeof value === "number")) {
return value;
}
}
}
}
// Resolve a value for an entity by last write wins
export function lastValueFromDeltas(
key: string,
deltas?: CollapsedDelta[]
@ -55,46 +69,41 @@ export function lastValueFromDeltas(
return res;
}
function initializer(): Accumulator {
return {};
};
function reducer(acc: Accumulator, cur: LosslessViewOne): Accumulator {
if (!acc[cur.id]) {
acc[cur.id] = {id: cur.id, properties: {}};
}
for (const [key, deltas] of Object.entries(cur.propertyDeltas)) {
const {value, timeUpdated} = lastValueFromDeltas(key, deltas) || {};
if (!value || !timeUpdated) continue;
if (timeUpdated > (acc[cur.id].properties[key]?.timeUpdated || 0)) {
acc[cur.id].properties[key] = {
value,
timeUpdated
};
}
}
return acc;
};
function resolver(cur: Accumulator): Result {
const res: Result = {};
for (const [id, ent] of Object.entries(cur)) {
res[id] = {id, properties: {}};
for (const [key, {value}] of Object.entries(ent.properties)) {
res[id].properties[key] = value;
}
}
return res;
};
export class LastWriteWins extends Lossy<Accumulator, Result> {
constructor(
readonly lossless: Lossless,
) {
super(lossless, initializer, reducer, resolver);
initializer(): Accumulator {
return {};
}
reducer(acc: Accumulator, cur: LosslessViewOne): Accumulator {
if (!acc[cur.id]) {
acc[cur.id] = {id: cur.id, properties: {}};
}
for (const [key, deltas] of Object.entries(cur.propertyDeltas)) {
const {value, timeUpdated} = lastValueFromDeltas(key, deltas) || {};
if (!value || !timeUpdated) continue;
if (timeUpdated > (acc[cur.id].properties[key]?.timeUpdated || 0)) {
acc[cur.id].properties[key] = {
value,
timeUpdated
};
}
}
return acc;
};
resolver(cur: Accumulator): Result {
const res: Result = {};
for (const [id, ent] of Object.entries(cur)) {
res[id] = {id, properties: {}};
for (const [key, {value}] of Object.entries(ent.properties)) {
res[id].properties[key] = value;
}
}
return res;
};
}

View File

@ -3,15 +3,15 @@
import Debug from 'debug';
import EventEmitter from 'events';
import {Delta, DeltaFilter, DeltaID, DeltaNetworkImage} from './delta.js';
import {RhizomeNode} from './node.js';
import {Transactions} from './transactions.js';
import {DomainEntityID, PropertyID, PropertyTypes, TransactionID, ViewMany} from "./types.js";
import {Delta, DeltaFilter, DeltaID, DeltaNetworkImageV1} from './delta';
import {RhizomeNode} from './node';
import {Transactions} from './transactions';
import {DomainEntityID, PropertyID, PropertyTypes, TransactionID, ViewMany} from "./types";
const debug = Debug('rz:lossless');
export type CollapsedPointer = {[key: PropertyID]: PropertyTypes};
export type CollapsedDelta = Omit<DeltaNetworkImage, 'pointers'> & {
export type CollapsedDelta = Omit<DeltaNetworkImageV1, 'pointers'> & {
pointers: CollapsedPointer[];
};
@ -122,9 +122,7 @@ export class Lossless {
}
viewSpecific(entityId: DomainEntityID, deltaIds: DeltaID[], deltaFilter?: DeltaFilter): LosslessViewOne | undefined {
debug(`[${this.rhizomeNode.config.peerId}]`, `viewSpecific, deltaIds:`, JSON.stringify(deltaIds));
const combinedFilter = (delta: Delta) => {
debug(`[${this.rhizomeNode.config.peerId}]`, `combinedFilter, deltaIds:`, JSON.stringify(deltaIds));
if (!deltaIds.includes(delta.id)) {
debug(`[${this.rhizomeNode.config.peerId}]`, `Excluding delta ${delta.id} because it's not in the requested list of deltas`);
return false;
@ -191,7 +189,6 @@ export class Lossless {
};
}
debug(`[${this.rhizomeNode.config.peerId}]`, `Returning view:`, JSON.stringify(view, null, 2));
return view;
}

View File

@ -3,39 +3,22 @@
// into various possible "lossy" views that combine or exclude some information.
import Debug from 'debug';
import {DeltaFilter, DeltaID} from "./delta.js";
import {CollapsedDelta, Lossless, LosslessViewOne} from "./lossless.js";
import {DomainEntityID} from "./types.js";
import {DeltaFilter, DeltaID} from "./delta";
import {Lossless, LosslessViewOne} from "./lossless";
import {DomainEntityID} from "./types";
const debug = Debug('rz:lossy');
export type Initializer<Accumulator> = (v: LosslessViewOne) => Accumulator;
export type Reducer<Accumulator> = (acc: Accumulator, cur: LosslessViewOne) => Accumulator;
export type Resolver<Accumulator, Result> = (cur: Accumulator) => Result;
// Extract a particular value from a delta's pointers
export function valueFromCollapsedDelta(
key: string,
delta: CollapsedDelta
): string | number | undefined {
for (const pointer of delta.pointers) {
for (const [k, value] of Object.entries(pointer)) {
if (k === key && (typeof value === "string" || typeof value === "number")) {
return value;
}
}
}
}
// We support incremental updates of lossy models.
export class Lossy<Accumulator, Result> {
export abstract class Lossy<Accumulator, Result> {
deltaFilter?: DeltaFilter;
accumulator?: Accumulator;
abstract initializer(v: LosslessViewOne): Accumulator;
abstract reducer(acc: Accumulator, cur: LosslessViewOne): Accumulator;
abstract resolver(cur: Accumulator): Result;
constructor(
readonly lossless: Lossless,
readonly initializer: Initializer<Accumulator>,
readonly reducer: Reducer<Accumulator>,
readonly resolver: Resolver<Accumulator, Result>,
) {
this.lossless.eventStream.on("updated", (id, deltaIds) => {
debug(`[${this.lossless.rhizomeNode.config.peerId}] entity ${id} updated, deltaIds:`,

View File

@ -1,11 +1,11 @@
import Debug from 'debug';
import {CREATOR, HTTP_API_ADDR, HTTP_API_ENABLE, HTTP_API_PORT, PEER_ID, PUB_SUB_TOPIC, PUBLISH_BIND_ADDR, PUBLISH_BIND_HOST, PUBLISH_BIND_PORT, REQUEST_BIND_ADDR, REQUEST_BIND_HOST, REQUEST_BIND_PORT, SEED_PEERS} from './config.js';
import {DeltaStream} from './deltas.js';
import {HttpServer} from './http/index.js';
import {Lossless} from './lossless.js';
import {parseAddressList, PeerAddress, Peers} from './peers.js';
import {PubSub} from './pub-sub.js';
import {RequestReply} from './request-reply.js';
import {CREATOR, HTTP_API_ADDR, HTTP_API_ENABLE, HTTP_API_PORT, PEER_ID, PUBLISH_BIND_ADDR, PUBLISH_BIND_HOST, PUBLISH_BIND_PORT, REQUEST_BIND_ADDR, REQUEST_BIND_HOST, REQUEST_BIND_PORT, SEED_PEERS} from './config';
import {DeltaStream} from './delta-stream';
import {HttpServer} from './http/index';
import {Lossless} from './lossless';
import {parseAddressList, PeerAddress, Peers} from './peers';
import {PubSub} from './pub-sub';
import {RequestReply} from './request-reply';
const debug = Debug('rz:rhizome-node');
export type RhizomeNodeConfig = {
@ -21,7 +21,6 @@ export type RhizomeNodeConfig = {
seedPeers: PeerAddress[];
peerId: string;
creator: string; // TODO each host should be able to support multiple users
pubSubTopic: string;
};
// So that we can run more than one instance in the same process (for testing)
@ -50,7 +49,6 @@ export class RhizomeNode {
seedPeers: parseAddressList(SEED_PEERS),
peerId: PEER_ID,
creator: CREATOR,
pubSubTopic: PUB_SUB_TOPIC,
...config
};
debug(`[${this.config.peerId}]`, 'Config', this.config);
@ -76,10 +74,7 @@ export class RhizomeNode {
// Bind ZeroMQ publish socket
// TODO: Config option to enable zmq pubsub
// await this.pubSub.startZmq();
// Initialize Libp2p
await this.pubSub.startLibp2p();
await this.pubSub.startZmq();
// Bind ZeroMQ request socket
// TODO: request/reply via libp2p?
@ -91,22 +86,12 @@ export class RhizomeNode {
this.httpServer.start();
}
{
// Start libp2p subscription
// TODO: Config option to enable gossipsub
// TODO: Config options for gossipsub and other libp2p configs
this.peers.start();
// Wait a short time for peers to connect
await new Promise((resolve) => setTimeout(resolve, 200));
}
{
// Wait a short time for sockets to initialize
// await new Promise((resolve) => setTimeout(resolve, 500));
await new Promise((resolve) => setTimeout(resolve, 500));
// Subscribe to seed peers
// this.peers.subscribeToSeeds();
this.peers.subscribeToSeeds();
// Wait a short time for sockets to initialize
// await new Promise((resolve) => setTimeout(resolve, 500));

View File

@ -1,9 +1,9 @@
import Debug from 'debug';
import {Message} from 'zeromq';
import {Delta} from "./delta.js";
import {RhizomeNode} from "./node.js";
import {Subscription} from './pub-sub.js';
import {PeerRequest, RequestSocket, ResponseSocket} from "./request-reply.js";
import {Delta} from "./delta";
import {RhizomeNode} from "./node";
import {Subscription} from './pub-sub';
import {PeerRequest, RequestSocket, ResponseSocket} from "./request-reply";
const debug = Debug('rz:peers');
export class PeerAddress {
@ -69,10 +69,12 @@ class Peer {
async subscribeDeltas() {
if (!this.publishAddr) {
debug(`[${this.rhizomeNode.config.peerId}]`, `Requesting publish addr from peer ${this.reqAddr.toAddrString()}`);
debug(`[${this.rhizomeNode.config.peerId}]`,
`Requesting publish addr from peer ${this.reqAddr.toAddrString()}`);
const res = await this.request(RequestMethods.GetPublishAddress);
this.publishAddr = PeerAddress.fromString(res.toString());
debug(`[${this.rhizomeNode.config.peerId}]`, `Received publish addr ${this.publishAddr.toAddrString()} from peer ${this.reqAddr.toAddrString()}`);
debug(`[${this.rhizomeNode.config.peerId}]`,
`Received publish addr ${this.publishAddr.toAddrString()} from peer ${this.reqAddr.toAddrString()}`);
}
debug(`[${this.rhizomeNode.config.peerId}]`, `Subscribing to peer ${this.reqAddr.toAddrString()}`);
@ -80,7 +82,7 @@ class Peer {
// ZeroMQ subscription
this.subscription = this.rhizomeNode.pubSub.subscribe(
this.publishAddr,
this.rhizomeNode.config.pubSubTopic,
"deltas",
(sender, msg) => {
const delta = this.rhizomeNode.deltaStream.deserializeDelta(msg);
delta.receivedFrom = sender;
@ -135,19 +137,6 @@ export class Peers {
});
}
start() {
// TODO: Move this somewhere that makes more sense
this.rhizomeNode.pubSub.subscribeTopic(
this.rhizomeNode.config.pubSubTopic,
(sender, msg) => {
const delta = this.rhizomeNode.deltaStream.deserializeDelta(msg);
delta.receivedFrom = sender;
debug(`[${this.rhizomeNode.config.peerId}]`, `Received delta: ${JSON.stringify(delta)}`);
this.rhizomeNode.deltaStream.ingestDelta(delta);
}
);
}
stop() {
debug(`[${this.rhizomeNode.config.peerId}]`, 'Closing all peer request sockets');
for (const peer of this.peers) {

View File

@ -1,15 +1,7 @@
import {GossipSub, gossipsub} from '@chainsafe/libp2p-gossipsub';
import {noise} from '@chainsafe/libp2p-noise';
import {yamux} from '@chainsafe/libp2p-yamux';
import {identify} from '@libp2p/identify';
import {mdns} from '@libp2p/mdns';
import {ping} from '@libp2p/ping';
import {tcp} from '@libp2p/tcp';
import Debug from 'debug';
import {Libp2p, createLibp2p} from 'libp2p';
import {Publisher, Subscriber} from 'zeromq';
import {RhizomeNode} from './node.js';
import {PeerAddress} from './peers.js';
import {RhizomeNode} from './node';
import {PeerAddress} from './peers';
const debug = Debug('rz:pub-sub');
export type SubscribedMessageHandler = (sender: PeerAddress, msg: string) => void;
@ -21,20 +13,17 @@ export class Subscription {
publishAddr: PeerAddress;
publishAddrStr: string;
cb: SubscribedMessageHandler;
libp2p?: Libp2p;
constructor(
readonly pubSub: PubSub,
publishAddr: PeerAddress,
topic: string,
cb: SubscribedMessageHandler,
libp2p?: Libp2p
) {
this.cb = cb;
this.topic = topic;
this.publishAddr = publishAddr;
this.publishAddrStr = `tcp://${this.publishAddr.toAddrString()}`;
this.libp2p = libp2p;
}
async start() {
@ -60,7 +49,6 @@ export class PubSub {
publishSock?: Publisher;
publishAddrStr: string;
subscriptions: Subscription[] = [];
libp2p?: Libp2p;
constructor(rhizomeNode: RhizomeNode) {
this.rhizomeNode = rhizomeNode;
@ -76,33 +64,6 @@ export class PubSub {
debug(`[${this.rhizomeNode.config.peerId}]`, `ZeroMQ publishing socket bound to ${this.publishAddrStr}`);
}
async startLibp2p() {
this.libp2p = await createLibp2p({
addresses: {
// TODO: Config
listen: ['/ip4/127.0.0.1/tcp/0']
},
transports: [tcp()],
connectionEncrypters: [noise()],
streamMuxers: [yamux()],
peerDiscovery: [mdns()],
services: {
pubsub: gossipsub(),
identify: identify(),
ping: ping(),
}
});
this.libp2p.addEventListener("peer:discovery", (event) => {
debug(`[${this.rhizomeNode.config.peerId}]`, `Found peer: ${JSON.stringify(event.detail)}`);
this.libp2p?.dial(event.detail.multiaddrs);
});
this.libp2p.addEventListener("peer:connect", (event) => {
debug(`[${this.rhizomeNode.config.peerId}]`, `Connected to peer: ${JSON.stringify(event.detail)}`);
});
}
async publish(topic: string, msg: string) {
if (this.publishSock) {
await this.publishSock.send([
@ -112,44 +73,6 @@ export class PubSub {
]);
debug(`[${this.rhizomeNode.config.peerId}]`, `Published to ZeroMQ, msg: ${msg}`);
}
if (this.libp2p) {
const pubsub = this.libp2p.services.pubsub as GossipSub;
let published = false;
try {
await pubsub.publish(topic, Buffer.from(msg));
published = true;
} catch (e: unknown) {
if (!((e as Error).message as string).match("PublishError.NoPeersSubscribedToTopic")) {
debug(`[${this.rhizomeNode.config.peerId}]`, 'Libp2p publish:', (e as Error).message);
}
}
if (published) {
debug(`[${this.rhizomeNode.config.peerId}]`, `Published to Libp2p, msg: ${msg}`);
}
}
}
subscribedTopics = new Set<string>();
subscribeTopic(topic: string, cb: SubscribedMessageHandler) {
if (!this.libp2p) throw new Error('libp2p not initialized');
const pubsub = this.libp2p.services.pubsub as GossipSub;
// TODO: If we subscribe to multiple topics this callback will be duplicated
pubsub.addEventListener("message", (event) => {
const msg = Buffer.from(event.detail.data).toString();
debug(`[${this.rhizomeNode.config.peerId}]`, `Libp2p subscribtion received msg: ${msg}`);
cb(new PeerAddress('libp2p', 0), msg);
});
// Add to our list of subscribed topics so we can unsubscribe later.
// Also has the effect of not calling subscribe more than once per topic.
if (!this.subscribedTopics.has(topic)) {
pubsub.subscribe(topic);
this.subscribedTopics.add(topic);
debug(`[${this.rhizomeNode.config.peerId}]`, 'Subscribed topics:', Array.from(this.subscribedTopics.keys()));
}
}
subscribe(
@ -157,7 +80,7 @@ export class PubSub {
topic: string,
cb: SubscribedMessageHandler
): Subscription {
const subscription = new Subscription(this, publishAddr, topic, cb, this.libp2p);
const subscription = new Subscription(this, publishAddr, topic, cb);
this.subscriptions.push(subscription);
return subscription;
}
@ -173,25 +96,5 @@ export class PubSub {
for (const subscription of this.subscriptions) {
subscription.sock.close();
}
if (this.libp2p) {
const pubsub = this.libp2p.services.pubsub as GossipSub;
pubsub.removeEventListener("message");
for (const topic of this.subscribedTopics) {
debug(`[${this.rhizomeNode.config.peerId}]`, `Unsubscribing Libp2p topic ${topic}`);
pubsub.unsubscribe(topic)
}
debug(`[${this.rhizomeNode.config.peerId}]`, 'Stopping gossipsub');
await pubsub.stop();
await this.libp2p.stop();
debug(`[${this.rhizomeNode.config.peerId}]`, 'Stopped libp2p');
}
}
}

View File

@ -1,8 +1,8 @@
import Debug from 'debug';
import {EventEmitter} from 'node:events';
import {Message, Reply, Request} from 'zeromq';
import {RhizomeNode} from './node.js';
import {PeerAddress, RequestMethods} from './peers.js';
import {RhizomeNode} from './node';
import {PeerAddress, RequestMethods} from './peers';
const debug = Debug('rz:request-reply');
export type PeerRequest = {
@ -12,15 +12,18 @@ export type PeerRequest = {
export type RequestHandler = (req: PeerRequest, res: ResponseSocket) => void;
export class RequestSocket {
sock = new Request();
sock?: Request;
addrStr: string;
constructor(readonly requestReply: RequestReply, addr: PeerAddress) {
const addrStr = `tcp://${addr.addr}:${addr.port}`;
this.sock.connect(addrStr);
debug(`[${this.requestReply.rhizomeNode.config.peerId}]`, `Request socket connecting to ${addrStr}`);
this.addrStr = `tcp://${addr.addr}:${addr.port}`;
this.sock = new Request();
this.sock.connect(this.addrStr);
debug(`[${this.requestReply.rhizomeNode.config.peerId}]`, `Request socket connecting to ${this.addrStr}`);
}
async request(method: RequestMethods): Promise<Message> {
if (!this.sock) throw new Error('Request socket is undefined');
const req: PeerRequest = {
method
};
@ -34,7 +37,9 @@ export class RequestSocket {
}
close() {
this.sock.close();
this.sock?.close();
// Make sure it goes out of scope
this.sock = undefined;
debug(`[${this.requestReply.rhizomeNode.config.peerId}]`, 'Request socket closed');
}
}
@ -63,7 +68,7 @@ function peerRequestFromMsg(msg: Message): PeerRequest | null {
export class RequestReply {
rhizomeNode: RhizomeNode;
replySock = new Reply();
replySock?: Reply;
requestStream = new EventEmitter();
requestBindAddrStr: string;
@ -75,6 +80,7 @@ export class RequestReply {
// Listen for incoming requests
async start() {
this.replySock = new Reply();
await this.replySock.bind(this.requestBindAddrStr);
debug(`[${this.rhizomeNode.config.peerId}]`, `Reply socket bound to ${this.requestBindAddrStr}`);
@ -90,8 +96,10 @@ export class RequestReply {
// Each handler will get a copy of every message.
registerRequestHandler(handler: RequestHandler) {
this.requestStream.on('request', (req) => {
const res = new ResponseSocket(this.replySock);
handler(req, res);
if (this.replySock) {
const res = new ResponseSocket(this.replySock);
handler(req, res);
}
});
}
@ -100,9 +108,11 @@ export class RequestReply {
}
async stop() {
await this.replySock.unbind(this.requestBindAddrStr);
this.replySock.close();
this.replySock = new Reply();
debug(`[${this.rhizomeNode.config.peerId}]`, 'Reply socket closed');
if (this.replySock) {
await this.replySock.unbind(this.requestBindAddrStr);
this.replySock.close();
this.replySock = undefined;
debug(`[${this.rhizomeNode.config.peerId}]`, 'Reply socket closed');
}
}
}

View File

@ -1,5 +1,5 @@
import { Level } from 'level';
import { LEVEL_DB_DIR } from './config.js';
import { LEVEL_DB_DIR } from './config';
import path from 'path';
function newStore(name: string): Level {

View File

@ -1,8 +1,8 @@
import Debug from "debug";
import EventEmitter from "events";
import {Delta, DeltaID} from "./delta.js";
import {Lossless} from "./lossless.js";
import {DomainEntityID, TransactionID} from "./types.js";
import {Delta, DeltaID} from "./delta";
import {Lossless} from "./lossless";
import {DomainEntityID, TransactionID} from "./types";
const debug = Debug('rz:transactions');
function getDeltaTransactionId(delta: Delta): TransactionID | undefined {

View File

@ -4,7 +4,7 @@ export type FilterExpr = JSONLogic;
export type FilterGenerator = () => FilterExpr;
export type PropertyTypes = string | number | undefined;
export type PropertyTypes = string | number | null;
export type DomainEntityID = string;
export type PropertyID = string;

View File

@ -2,7 +2,7 @@ import Debug from "debug";
import {FSWatcher, readdirSync, readFileSync, watch} from "fs";
import path, {join} from "path";
import showdown from "showdown";
import {RhizomeNode} from "../node.js";
import {RhizomeNode} from "../node";
const {Converter} = showdown;
const debug = Debug('rz:md-files');
@ -130,7 +130,7 @@ export class MDFiles {
});
}
close() {
stop() {
this.dirWatcher?.close();
this.readmeWatcher?.close();
}

View File

@ -1,17 +1,12 @@
{
"compilerOptions": {
"target": "ESNext",
"module": "ESNext",
"target": "ES6",
"module": "CommonJS",
"esModuleInterop": true,
"moduleResolution": "bundler",
"sourceMap": false,
"isolatedModules": true,
/* "allowImportingTsExtensions": true, */
/* "noEmit": true, */
"moduleResolution": "Node",
"sourceMap": true,
"baseUrl": ".",
"outDir": "dist",
"lib": ["ESNext"],
"types": ["node", "jest"],
"importsNotUsedAsValues": "remove",
"strict": true,
"skipLibCheck": true,
@ -24,10 +19,5 @@
"scratch/**/*",
"__tests__/**/*"
],
"exclude": [
"node_modules"
],
"tsc-alias": {
"resolveFullPaths": true
}
"exclude": ["node_modules"]
}

View File

@ -1,6 +1,5 @@
import {RhizomeNode, RhizomeNodeConfig} from "../src/node.js";
import {Collection} from "../src/collection.js";
import {randomUUID} from "crypto";
import {BasicCollection} from "../src/collection-basic";
import {RhizomeNode, RhizomeNodeConfig} from "../src/node";
const start = 5000;
const range = 5000;
@ -15,11 +14,10 @@ export class App extends RhizomeNode {
publishBindPort: getRandomPort(),
requestBindPort: getRandomPort(),
httpPort: getRandomPort(),
pubSubTopic: config?.pubSubTopic || `deltas-${randomUUID()}`,
...config,
});
const users = new Collection("user");
const users = new BasicCollection("user");
users.rhizomeConnect(this);
const {httpAddr, httpPort} = this.config;