added delta ids; refactored delta to its own file; refactored http server

This commit is contained in:
Ladd Hoffman 2024-12-27 13:43:43 -06:00
parent 9ec95ca8d8
commit 8fdec5da11
23 changed files with 386 additions and 290 deletions

View File

@ -52,7 +52,7 @@ npm run build:watch
## Run tests ## Run tests
```bash ```bash
npm run test npm test
``` ```
## Run test coverage report ## Run test coverage report
@ -100,22 +100,22 @@ In a separate terminal, you can use `curl` to interact with an instance.
Query the number of peers seen by a given node (including itself) Query the number of peers seen by a given node (including itself)
```bash ```bash
curl -s http://localhost:3000/peers/count | jq curl -s http://localhost:3000/api/peers/count | jq
``` ```
Query the list of peers seen by a given node (including itself) Query the list of peers seen by a given node (including itself)
```bash ```bash
curl -s http://localhost:3000/peers | jq curl -s http://localhost:3000/api/peers | jq
``` ```
Query the number of deltas ingested by this node Query the number of deltas ingested by this node
```bash ```bash
curl -s http://localhost:3000/deltas/count | jq curl -s http://localhost:3000/api/deltas/count | jq
``` ```
Query the list of deltas ingested by this node Query the list of deltas ingested by this node
```bash ```bash
curl -s http://localhost:3000/deltas | jq curl -s http://localhost:3000/api/deltas | jq
``` ```
The example creates a `new TypedCollection<User>("user")` and calls `connectRhizome` to join it with the network. The example creates a `new TypedCollection<User>("user")` and calls `connectRhizome` to join it with the network.
@ -123,17 +123,17 @@ The collection is synchronized across the cluster and optionally CRUD type opera
Query the list of User IDs Query the list of User IDs
```bash ```bash
curl -s http://localhost:3000/user/ids curl -s http://localhost:3000/api/user/ids
``` ```
Query the list of User IDs Query the list of User IDs
```bash ```bash
curl -s http://localhost:3000/user/ids curl -s http://localhost:3000/api/user/ids
``` ```
Read a User by ID Read a User by ID
```bash ```bash
curl -s http://localhost:3000/user/taliesin-1 curl -s http://localhost:3000/api/user/taliesin-1
``` ```
Create a User Create a User
@ -145,7 +145,7 @@ cat <<EOF >/tmp/user.json
"nameLong": "optional", "nameLong": "optional",
"email": "optional"}} "email": "optional"}}
EOF EOF
curl -s -X PUT -H 'content-type:application/json' -d @/tmp/user.json http://localhost:3000/user | jq curl -s -X PUT -H 'content-type:application/json' -d @/tmp/user.json http://localhost:3000/api/user | jq
``` ```
# More About Concepts # More About Concepts

View File

@ -1,9 +1,9 @@
import {Delta, DeltaFilter} from '../src/delta';
import {Lossless} from '../src/lossless'; import {Lossless} from '../src/lossless';
import {Delta, DeltaFilter} from '../src/types';
describe('Lossless', () => { describe('Lossless', () => {
it('creates a lossless view of keanu as neo in the matrix', () => { it('creates a lossless view of keanu as neo in the matrix', () => {
const delta: Delta = { const delta = new Delta({
creator: 'a', creator: 'a',
host: 'h', host: 'h',
pointers: [{ pointers: [{
@ -25,13 +25,13 @@ describe('Lossless', () => {
localContext: "salary_currency", localContext: "salary_currency",
target: "usd" target: "usd"
}] }]
}; });
const lossless = new Lossless(); const lossless = new Lossless();
lossless.ingestDelta(delta); lossless.ingestDelta(delta);
expect(lossless.view()).toEqual({ expect(lossless.view()).toMatchObject({
keanu: { keanu: {
referencedAs: ["actor"], referencedAs: ["actor"],
properties: { properties: {
@ -87,7 +87,7 @@ describe('Lossless', () => {
const lossless = new Lossless(); const lossless = new Lossless();
beforeAll(() => { beforeAll(() => {
lossless.ingestDelta({ lossless.ingestDelta(new Delta({
creator: 'A', creator: 'A',
host: 'H', host: 'H',
pointers: [{ pointers: [{
@ -95,9 +95,9 @@ describe('Lossless', () => {
target: "ace", target: "ace",
targetContext: "value" targetContext: "value"
}] }]
}); }));
lossless.ingestDelta({ lossless.ingestDelta(new Delta({
creator: 'B', creator: 'B',
host: 'H', host: 'H',
pointers: [{ pointers: [{
@ -106,9 +106,9 @@ describe('Lossless', () => {
target: "ace", target: "ace",
targetContext: "value" targetContext: "value"
}] }]
}); }));
expect(lossless.view()).toEqual({ expect(lossless.view()).toMatchObject({
ace: { ace: {
referencedAs: ["1", "14"], referencedAs: ["1", "14"],
properties: { properties: {
@ -135,7 +135,7 @@ describe('Lossless', () => {
return creator === 'A' && host === 'H'; return creator === 'A' && host === 'H';
}; };
expect(lossless.view(undefined, filter)).toEqual({ expect(lossless.view(undefined, filter)).toMatchObject({
ace: { ace: {
referencedAs: ["1"], referencedAs: ["1"],
properties: { properties: {
@ -150,7 +150,7 @@ describe('Lossless', () => {
} }
}); });
expect(lossless.view(["ace"], filter)).toEqual({ expect(lossless.view(["ace"], filter)).toMatchObject({
ace: { ace: {
referencedAs: ["1"], referencedAs: ["1"],
properties: { properties: {

View File

@ -1,6 +1,6 @@
import {Delta, PointerTarget} from "../src/delta";
import {Lossless, LosslessViewMany} from "../src/lossless"; import {Lossless, LosslessViewMany} from "../src/lossless";
import {Lossy, firstValueFromLosslessViewOne, valueFromCollapsedDelta} from "../src/lossy"; import {Lossy, firstValueFromLosslessViewOne, valueFromCollapsedDelta} from "../src/lossy";
import {PointerTarget} from "../src/types";
describe('Lossy', () => { describe('Lossy', () => {
describe('se a provided function to resolve entity views', () => { describe('se a provided function to resolve entity views', () => {
@ -8,7 +8,7 @@ describe('Lossy', () => {
const lossy = new Lossy(lossless); const lossy = new Lossy(lossless);
beforeAll(() => { beforeAll(() => {
lossless.ingestDelta({ lossless.ingestDelta(new Delta({
creator: 'a', creator: 'a',
host: 'h', host: 'h',
pointers: [{ pointers: [{
@ -30,7 +30,7 @@ describe('Lossy', () => {
localContext: "salary_currency", localContext: "salary_currency",
target: "usd" target: "usd"
}] }]
}); }));
}); });
it('example summary', () => { it('example summary', () => {
@ -63,7 +63,7 @@ describe('Lossy', () => {
return {roles}; return {roles};
} }
const result = lossy.resolve(resolver); const result = lossy.resolve<Summary>(resolver);
expect(result).toEqual({ expect(result).toEqual({
roles: [{ roles: [{
film: "the_matrix", film: "the_matrix",

View File

@ -1,7 +1,7 @@
import Debug from 'debug'; import Debug from 'debug';
import {RhizomeNode} from "./node"; import {RhizomeNode} from "../src/node";
import {Entity} from "./entity"; import {Entity} from "../src/entity";
import {TypedCollection} from "./typed-collection"; import {TypedCollection} from "../src/typed-collection";
const debug = Debug('example-app'); const debug = Debug('example-app');
// As an app we want to be able to write and read data. // As an app we want to be able to write and read data.
@ -30,11 +30,16 @@ type User = {
debug('New user!:', u); debug('New user!:', u);
}); });
await rhizomeNode.start();
await rhizomeNode.start() // Let's use the rhizomic database for some more things.
// Like what?
// - Logging
// - Chat
//
const taliesin = users.put(undefined, { const taliesin = await users.put(undefined, {
// id: 'taliesin-1', id: 'taliesin-1',
name: 'Taliesin', name: 'Taliesin',
nameLong: 'Taliesin (Ladd)', nameLong: 'Taliesin (Ladd)',
age: Math.floor(Math.random() * 1000) age: Math.floor(Math.random() * 1000)

View File

@ -1,38 +1,61 @@
> rhizome-node@1.0.0 test > rhizome-node@0.1.0 test
> jest --coverage > jest --coverage
PASS __tests__/lossy.ts
PASS __tests__/peer-address.ts
PASS __tests__/lossless.ts PASS __tests__/lossless.ts
PASS __tests__/run/001-single-node.ts PASS __tests__/peer-address.ts
PASS __tests__/run/002-two-nodes.ts FAIL __tests__/lossy.ts
----------------------|---------|----------|---------|---------|------------------------------------------------ ● Test suite failed to run
__tests__/lossy.ts:66:36 - error TS2345: Argument of type '(losslessView: LosslessViewMany) => Summary' is not assignable to parameter of type 'Resolver'.
Type 'Summary' is not assignable to type 'LossyViewMany'.
Property 'roles' is incompatible with index signature.
Type 'Role[]' is missing the following properties from type 'LossyViewOne<Properties>': id, properties
66 const result = lossy.resolve(resolver);
   ~~~~~~~~
FAIL __tests__/run/002-two-nodes.ts
● Run can create a record on app0 and read it on app1
SyntaxError: Unexpected token '<', "<!DOCTYPE "... is not valid JSON
at JSON.parse (<anonymous>)
FAIL __tests__/run/001-single-node.ts
● Run can put a new user and fetch it
SyntaxError: Unexpected token '<', "<!DOCTYPE "... is not valid JSON
at JSON.parse (<anonymous>)
----------------------|---------|----------|---------|---------|-------------------------------------------------------
File | % Stmts | % Branch | % Funcs | % Lines | Uncovered Line #s File | % Stmts | % Branch | % Funcs | % Lines | Uncovered Line #s
----------------------|---------|----------|---------|---------|------------------------------------------------ ----------------------|---------|----------|---------|---------|-------------------------------------------------------
All files | 85.48 | 61.26 | 82.83 | 85.93 | All files | 79.08 | 44.65 | 78.26 | 79.29 |
src | 88.35 | 68.06 | 84.21 | 88.52 | src | 79.55 | 47.05 | 77.96 | 79.45 |
collection.ts | 89.61 | 71.42 | 73.33 | 91.89 | 62-65,114-122 collection.ts | 53.76 | 26.66 | 57.89 | 54.02 | 54-123,131-135,155,195,226,233,246
config.ts | 94.44 | 89.65 | 50 | 94.44 | 22 config.ts | 100 | 72.41 | 100 | 100 | 7-10,12,15,17-20
deltas.ts | 64.44 | 50 | 76.92 | 64.44 | 27-30,42-46,55-56,64-73 deltas.ts | 77.77 | 62.5 | 84.61 | 77.77 | 42-46,64-73
entity.ts | 100 | 100 | 100 | 100 | entity.ts | 25 | 100 | 0 | 25 | 17-21
http-api.ts | 61.19 | 13.04 | 44.44 | 61.19 | 32,37,44-60,66,79-80,85-92,100,129-130,145-151 http-api.ts | 51.51 | 13.04 | 33.33 | 51.51 | 32,37,44-60,66,79-80,85-92,97,117,122-128,136,141-147
lossless.ts | 98.27 | 91.66 | 100 | 100 | 96 lossless.ts | 100 | 100 | 100 | 100 |
lossy.ts | 100 | 85.71 | 100 | 100 | 38 lossy.ts | 43.75 | 0 | 50 | 46.66 | 26-29,37-40
node.ts | 100 | 100 | 100 | 100 | node.ts | 100 | 100 | 100 | 100 |
peers.ts | 96.82 | 100 | 100 | 96.61 | 125-126 peers.ts | 100 | 100 | 100 | 100 |
pub-sub.ts | 100 | 100 | 100 | 100 | pub-sub.ts | 100 | 100 | 100 | 100 |
request-reply.ts | 95.65 | 0 | 100 | 95.34 | 46,59 request-reply.ts | 95.65 | 0 | 100 | 95.34 | 46,59
typed-collection.ts | 100 | 100 | 100 | 100 | typed-collection.ts | 100 | 100 | 100 | 100 |
types.ts | 100 | 100 | 100 | 100 | types.ts | 81.25 | 100 | 85.71 | 81.25 | 11-14
src/util | 58.62 | 26.08 | 72.22 | 60.37 | src/util | 70.68 | 30.43 | 77.77 | 73.58 |
md-files.ts | 58.62 | 26.08 | 72.22 | 60.37 | 53-57,74-77,91-108,116-123 md-files.ts | 70.68 | 30.43 | 77.77 | 73.58 | 53-57,74-77,98-102,116-123
util | 100 | 100 | 100 | 100 | util | 100 | 100 | 100 | 100 |
app.ts | 100 | 100 | 100 | 100 | app.ts | 100 | 100 | 100 | 100 |
----------------------|---------|----------|---------|---------|------------------------------------------------ ----------------------|---------|----------|---------|---------|-------------------------------------------------------
Test Suites: 3 failed, 2 passed, 5 total
Test Suites: 5 passed, 5 total Tests: 2 failed, 4 passed, 6 total
Tests: 7 passed, 7 total
Snapshots: 0 total Snapshots: 0 total
Time: 3.831 s, estimated 4 s Time: 3.777 s, estimated 5 s
Ran all test suites. Ran all test suites.

View File

@ -1,20 +1,20 @@
{ {
"name": "rhizome-node", "name": "rhizome-node",
"version": "1.0.0", "version": "0.1.0",
"description": "Rhizomatic database engine node", "description": "Rhizomatic database engine node",
"scripts": { "scripts": {
"example-app": "node dist/example-app.js",
"build": "tsc", "build": "tsc",
"build:watch": "tsc --watch", "build:watch": "tsc --watch",
"lint": "eslint", "lint": "eslint",
"test": "jest", "test": "jest",
"coverage": "./scripts/coverage.sh" "coverage": "./scripts/coverage.sh",
"example-app": "node dist/examples/app.js"
}, },
"jest": { "jest": {
"testEnvironment": "node", "testEnvironment": "node",
"preset": "ts-jest" "preset": "ts-jest"
}, },
"author": "", "author": "Taliesin (Ladd) <ladd@dgov.io>",
"license": "Unlicense", "license": "Unlicense",
"dependencies": { "dependencies": {
"@types/bluebird": "^3.5.42", "@types/bluebird": "^3.5.42",

View File

@ -13,4 +13,5 @@ done
dest="./markdown/coverage_report.md" dest="./markdown/coverage_report.md"
npm run test -- --coverage 2>&1 | tee | sed 's/\s*$//' > "$dest" npm run test -- --coverage 2>&1 | tee "$dest"
sed -i 's/\s*$//' "$dest"

View File

@ -6,11 +6,12 @@
import Debug from 'debug'; import Debug from 'debug';
import {randomUUID} from "node:crypto"; import {randomUUID} from "node:crypto";
import EventEmitter from "node:events"; import EventEmitter from "node:events";
import {Entity} from "./entity"; import {Delta, DeltaID} from "./delta";
import {Entity, EntityProperties} from "./entity";
import {Lossless, LosslessViewMany} from "./lossless"; import {Lossless, LosslessViewMany} from "./lossless";
import {firstValueFromLosslessViewOne, Lossy, LossyViewMany, LossyViewOne} from "./lossy"; import {firstValueFromLosslessViewOne, Lossy, LossyViewMany, LossyViewOne} from "./lossy";
import {RhizomeNode} from "./node"; import {RhizomeNode} from "./node";
import {Delta} from "./types"; import {DomainEntityID} from "./types";
const debug = Debug('collection'); const debug = Debug('collection');
export class Collection { export class Collection {
@ -19,9 +20,18 @@ export class Collection {
entities = new Map<string, Entity>(); entities = new Map<string, Entity>();
eventStream = new EventEmitter(); eventStream = new EventEmitter();
lossless = new Lossless(); // TODO: Really just need one global Lossless instance lossless = new Lossless(); // TODO: Really just need one global Lossless instance
lossy: Lossy;
constructor(name: string) { constructor(name: string) {
this.name = name; this.name = name;
this.lossy = new Lossy(this.lossless);
}
ingestDelta(delta: Delta) {
const updated = this.lossless.ingestDelta(delta);
this.eventStream.emit('ingested', delta);
this.eventStream.emit('updated', updated);
} }
// Instead of trying to update our final view of the entity with every incoming delta, // Instead of trying to update our final view of the entity with every incoming delta,
@ -36,48 +46,69 @@ export class Collection {
rhizomeNode.deltaStream.subscribeDeltas((delta: Delta) => { rhizomeNode.deltaStream.subscribeDeltas((delta: Delta) => {
// TODO: Make sure this is the kind of delta we're looking for // TODO: Make sure this is the kind of delta we're looking for
debug(`collection ${this.name} received delta ${JSON.stringify(delta)}`); debug(`collection ${this.name} received delta ${JSON.stringify(delta)}`);
this.lossless.ingestDelta(delta); this.ingestDelta(delta);
}); });
rhizomeNode.httpApi.serveCollection(this); rhizomeNode.httpServer.httpApi.serveCollection(this);
debug(`connected ${this.name} to rhizome`); debug(`connected ${this.name} to rhizome`);
} }
// Applies the javascript rules for updating object values, onCreate(cb: (entity: Entity) => void) {
// e.g. set to `undefined` to delete a property // TODO: Trigger for changes received from peers
updateEntity(entityId?: string, properties?: object, local = false, deltas?: Delta[]): Entity { this.eventStream.on('create', (entity: Entity) => {
let entity: Entity | undefined; cb(entity);
let eventType: 'create' | 'update' | 'delete' | undefined; });
entityId = entityId ?? randomUUID();
entity = this.entities.get(entityId);
if (!entity) {
entity = new Entity(entityId);
entity.id = entityId;
eventType = 'create';
} }
if (!properties) { onUpdate(cb: (entity: Entity) => void) {
// Let's interpret this as entity deletion // TODO: Trigger for changes received from peers
this.entities.delete(entityId); this.eventStream.on('update', (entity: Entity) => {
// TODO: prepare and publish a delta cb(entity);
// TODO: execute hooks });
eventType = 'delete';
} else {
let anyChanged = false;
Object.entries(properties).forEach(([key, value]) => {
if (key === 'id') return;
let changed = false;
if (entity.properties && entity.properties[key] !== value) {
entity.properties[key] = value;
changed = true;
} }
if (local && changed) {
// If this is a change, let's generate a delta defaultResolver(losslessView: LosslessViewMany): LossyViewMany {
if (!this.rhizomeNode) throw new Error(`${this.name} collection not connected to rhizome`); const resolved: LossyViewMany = {};
const delta: Delta = { debug('default resolver, lossless view', JSON.stringify(losslessView));
creator: this.rhizomeNode.config.creator, for (const [id, ent] of Object.entries(losslessView)) {
host: this.rhizomeNode.config.peerId, resolved[id] = {id, properties: {}};
for (const key of Object.keys(ent.properties)) {
const {value} = firstValueFromLosslessViewOne(ent, key) || {};
debug(`[ ${key} ] = ${value}`);
resolved[id].properties[key] = value;
}
}
return resolved;
}
// Applies the javascript rules for updating object values,
// e.g. set to `undefined` to delete a property
generateDeltas(
entityId: DomainEntityID,
newProperties: EntityProperties,
creator?: string,
host?: string
): Delta[] {
const deltas: Delta[] = [];
let oldProperties: EntityProperties = {};
if (entityId) {
const entity = this.get(entityId);
if (entity) {
oldProperties = entity.properties;
}
}
// Generate a delta for each changed property
Object.entries(newProperties).forEach(([key, value]) => {
// Disallow property named "id" TODO: Clarify id semantics
if (key === 'id') return;
if (oldProperties[key] !== value && host && creator) {
deltas.push(new Delta({
creator,
host,
pointers: [{ pointers: [{
localContext: this.name, localContext: this.name,
target: entityId, target: entityId,
@ -86,49 +117,50 @@ export class Collection {
localContext: key, localContext: key,
target: value target: value
}] }]
}; }));
deltas?.push(delta);
// We append to the array the caller may provide
// We can update this count as we receive network confirmation for deltas
entity.ahead += 1;
} }
anyChanged = anyChanged || changed;
}); });
this.entities.set(entityId, entity); return deltas;
if (anyChanged) {
eventType = eventType || 'update';
}
}
if (eventType) {
// TODO: Reconcile this with lossy view approach
this.eventStream.emit(eventType, entity);
}
return entity;
} }
onCreate(cb: (entity: Entity) => void) { async put(
// TODO: Reconcile this with lossy view approach entityId: DomainEntityID | undefined,
this.eventStream.on('create', (entity: Entity) => { properties: EntityProperties
cb(entity); ): Promise<LossyViewOne> {
}); // const deltas: Delta[] = [];
// const entity = this.updateEntity(entityId, properties, true, deltas);
// THIS PUT SHOULD CORRESOND TO A PARTICULAR MATERIALIZED VIEW...
// How can we encode that?
// Well, we have a way to do that, we just need the same particular inputs
if (!entityId) {
entityId = randomUUID();
} }
onUpdate(cb: (entity: Entity) => void) { const deltas = this.generateDeltas(
// TODO: Reconcile this with lossy view approach entityId,
this.eventStream.on('update', (entity: Entity) => { properties,
cb(entity); this.rhizomeNode?.config.creator,
}); this.rhizomeNode?.config.peerId,
} );
put(entityId: string | undefined, properties: object): Entity {
const deltas: Delta[] = [];
const entity = this.updateEntity(entityId, properties, true, deltas);
debug(`put ${entityId} generated deltas:`, JSON.stringify(deltas)); debug(`put ${entityId} generated deltas:`, JSON.stringify(deltas));
const allIngested = new Promise<boolean>((resolve) => {
const ingestedIds = new Set<DeltaID>();
this.eventStream.on('ingested', (delta: Delta) => {
// TODO: timeout
if (deltas.map(({id}) => id).includes(delta.id)) {
ingestedIds.add(delta.id);
if (ingestedIds.size === deltas.length) {
resolve(true);
}
}
})
});
// updateEntity may have generated some deltas for us to store and publish // updateEntity may have generated some deltas for us to store and publish
deltas.forEach(async (delta: Delta) => { deltas.forEach(async (delta: Delta) => {
@ -141,30 +173,28 @@ export class Collection {
debug(`published delta ${JSON.stringify(delta)}`); debug(`published delta ${JSON.stringify(delta)}`);
// ingest the delta as though we had received it from a peer // ingest the delta as though we had received it from a peer
this.lossless.ingestDelta(delta); this.ingestDelta(delta);
}); });
return entity;
// Return updated view of this entity
// Let's wait for an event notifying us that the entity has been updated.
// This means all of our deltas have been applied.
await allIngested;
const res = this.get(entityId);
if (!res) throw new Error("could not get what we just put!");
this.eventStream.emit("update", res);
return res;
} }
get(id: string): LossyViewOne | undefined { get(id: string): LossyViewOne | undefined {
// Now with lossy view approach, instead of just returning what we already have, // Now with lossy view approach, instead of just returning what we already have,
// let's compute our view now. // let's compute our view now.
// return this.entities.get(id); // return this.entities.get(id);
const lossy = new Lossy(this.lossless); const res = this.lossy.resolve((view) => this.defaultResolver(view), [id]);
const resolver = (losslessView: LosslessViewMany) => {
const lossyView: LossyViewMany = {};
debug('lossless view', JSON.stringify(losslessView));
for (const [id, ent] of Object.entries(losslessView)) {
lossyView[id] = {id, properties: {}};
for (const key of Object.keys(ent.properties)) {
const {value} = firstValueFromLosslessViewOne(ent, key) || {};
debug(`[ ${key} ] = ${value}`);
lossyView[id].properties[key] = value;
}
}
return lossyView;
};
const res = lossy.resolve(resolver, [id]) as LossyViewMany;;
return res[id]; return res[id];
} }

30
src/delta.ts Normal file
View File

@ -0,0 +1,30 @@
import {randomUUID} from "crypto";
import {PeerAddress} from "./types";
export type DeltaID = string;
export type PointerTarget = string | number | undefined;
export type Pointer = {
localContext: string;
target: PointerTarget;
targetContext?: string;
};
export class Delta {
id: DeltaID;
receivedFrom?: PeerAddress;
creator: string;
host: string;
pointers: Pointer[] = [];
constructor(delta: Omit<Delta, "id">) {
this.id = randomUUID();
this.creator = delta.creator;
this.host = delta.host;
this.receivedFrom = delta.receivedFrom;
this.pointers = delta.pointers;
}
}
export type DeltaFilter = (delta: Delta) => boolean;

View File

@ -1,10 +1,16 @@
import Debug from 'debug'; import Debug from 'debug';
import EventEmitter from 'node:events'; import EventEmitter from 'node:events';
import objectHash from 'object-hash'; import objectHash from 'object-hash';
import {Delta} from './delta';
import {RhizomeNode} from './node'; import {RhizomeNode} from './node';
import {Decision, Delta} from './types';
const debug = Debug('deltas'); const debug = Debug('deltas');
enum Decision {
Accept,
Reject,
Defer
};
export class DeltaStream { export class DeltaStream {
rhizomeNode: RhizomeNode; rhizomeNode: RhizomeNode;
deltaStream = new EventEmitter(); deltaStream = new EventEmitter();

View File

@ -14,12 +14,11 @@ export type EntityProperties = {
}; };
export class Entity { export class Entity {
id: string;
properties: EntityProperties = {}; properties: EntityProperties = {};
ahead = 0; ahead = 0;
constructor(id: string) { constructor(
this.id = id; readonly id: string,
} ) {}
} }

View File

@ -1,5 +1,9 @@
import { add_operation, apply } from 'json-logic-js'; import { add_operation, apply } from 'json-logic-js';
import { Delta, DeltaContext } from '../types'; import { Delta } from '../delta';
type DeltaContext = Delta & {
creatorAddress: string;
};
add_operation('in', (needle, haystack) => { add_operation('in', (needle, haystack) => {
return [...haystack].includes(needle); return [...haystack].includes(needle);

View File

@ -1,29 +1,12 @@
import Debug from "debug"; import express, {Router} from "express";
import express, {Express, Router} from "express"; import {Collection} from "src/collection";
import {Server} from "http"; import {Delta} from "src/delta";
import {Collection} from "./collection"; import {RhizomeNode} from "src/node";
import {RhizomeNode} from "./node";
import {Delta} from "./types";
import {htmlDocFromMarkdown, MDFiles} from "./util/md-files";
const debug = Debug('http-api');
export class HttpApi { export class HttpApi {
rhizomeNode: RhizomeNode; router = Router();
app: Express;
router: Router;
mdFiles = new MDFiles();
server?: Server;
constructor(rhizomeNode: RhizomeNode) { constructor(readonly rhizomeNode: RhizomeNode) {
this.rhizomeNode = rhizomeNode;
this.app = express();
this.router = Router();
this.app.use(express.json());
this.app.use(this.router);
}
start() {
// --------------- deltas ---------------- // --------------- deltas ----------------
// Serve list of all deltas accepted // Serve list of all deltas accepted
@ -65,48 +48,6 @@ export class HttpApi {
this.router.get("/peers/count", (_req: express.Request, res: express.Response) => { this.router.get("/peers/count", (_req: express.Request, res: express.Response) => {
res.json(this.rhizomeNode.peers.peers.length); res.json(this.rhizomeNode.peers.peers.length);
}); });
// ----------------- html ---------------------
// Scan and watch for markdown files
this.mdFiles.readDir();
this.mdFiles.readReadme();
this.mdFiles.watchDir();
this.mdFiles.watchReadme();
// Serve README
this.router.get('/html/README', (_req: express.Request, res: express.Response) => {
const html = this.mdFiles.getReadmeHTML();
res.setHeader('content-type', 'text/html').send(html);
});
// Serve markdown files as html
this.router.get('/html/:name', (req: express.Request, res: express.Response) => {
const {name} = req.params;
let html = this.mdFiles.getHtml(name);
if (!html) {
res.status(404);
html = htmlDocFromMarkdown(`# 404 Not Found: ${name}\n\n ## [Index](/html)`);
}
res.setHeader('content-type', 'text/html');
res.send(html);
});
// Serve index
this.router.get('/html', (_req: express.Request, res: express.Response) => {
res.setHeader('content-type', 'text/html').send(this.mdFiles.indexHtml);
});
// ------------------- server ---------------------
const {httpAddr, httpPort} = this.rhizomeNode.config;
this.server = this.app.listen({
port: httpPort,
host: httpAddr,
exclusive: true
}, () => {
debug(`HTTP API bound to ${httpAddr}:${httpPort}`);
});
} }
serveCollection(collection: Collection) { serveCollection(collection: Collection) {
@ -130,26 +71,21 @@ export class HttpApi {
// Add a new domain entity // Add a new domain entity
// TODO: schema validation // TODO: schema validation
this.router.put(`/${name}`, (req: express.Request, res: express.Response) => { this.router.put(`/${name}`, async (req: express.Request, res: express.Response) => {
const {body: {id, properties}} = req; const {body: {id, properties}} = req;
const ent = collection.put(id, properties); const ent = await collection.put(id, properties);
res.json(ent); res.json(ent);
}); });
// Update a domain entity // Update a domain entity
this.router.put(`/${name}/:id`, (req: express.Request, res: express.Response) => { this.router.put(`/${name}/:id`, async (req: express.Request, res: express.Response) => {
const {body: properties, params: {id}} = req; const {body: properties, params: {id}} = req;
if (properties.id && properties.id !== id) { if (properties.id && properties.id !== id) {
res.status(400).json({error: "ID Mismatch", param: id, property: properties.id}); res.status(400).json({error: "ID Mismatch", param: id, property: properties.id});
return; return;
} }
const ent = collection.put(id, properties); const ent = await collection.put(id, properties);
res.json(ent); res.json(ent);
}); });
} }
async stop() {
this.server?.close();
this.mdFiles.close();
}
} }

42
src/http/html.ts Normal file
View File

@ -0,0 +1,42 @@
import express, {Router} from "express";
import {htmlDocFromMarkdown, MDFiles} from "../util/md-files";
export class HttpHtml {
router = Router();
mdFiles = new MDFiles();
constructor() {
// Scan and watch for markdown files
this.mdFiles.readDir();
this.mdFiles.readReadme();
this.mdFiles.watchDir();
this.mdFiles.watchReadme();
// Serve README
this.router.get('/README', (_req: express.Request, res: express.Response) => {
const html = this.mdFiles.getReadmeHTML();
res.setHeader('content-type', 'text/html').send(html);
});
// Serve markdown files as html
this.router.get('/:name', (req: express.Request, res: express.Response) => {
const {name} = req.params;
let html = this.mdFiles.getHtml(name);
if (!html) {
res.status(404);
html = htmlDocFromMarkdown(`# 404 Not Found: ${name}\n\n ## [Index](/html)`);
}
res.setHeader('content-type', 'text/html');
res.send(html);
});
// Serve index
this.router.get('/', (_req: express.Request, res: express.Response) => {
res.setHeader('content-type', 'text/html').send(this.mdFiles.indexHtml);
});
}
close() {
this.mdFiles.close();
}
}

39
src/http/index.ts Normal file
View File

@ -0,0 +1,39 @@
import Debug from "debug";
import express from "express";
import {Server} from "http";
import {RhizomeNode} from "../node";
import {HttpApi} from "./api";
import {HttpHtml} from "./html";
const debug = Debug('http-api');
export class HttpServer {
app = express();
httpHtml: HttpHtml;
httpApi: HttpApi;
server?: Server;
constructor(readonly rhizomeNode: RhizomeNode) {
this.httpHtml = new HttpHtml();
this.httpApi = new HttpApi(this.rhizomeNode);
this.app.use(express.json());
this.app.use('/html', this.httpHtml.router);
this.app.use('/api', this.httpApi.router);
}
start() {
const {httpAddr, httpPort} = this.rhizomeNode.config;
this.server = this.app.listen({
port: httpPort,
host: httpAddr,
exclusive: true
}, () => {
debug(`HTTP API bound to ${httpAddr}:${httpPort}`);
});
}
async stop() {
this.server?.close();
this.httpHtml.close();
}
}

View File

@ -2,7 +2,8 @@
// We can maintain a record of all the targeted entities, and the deltas that targeted them // We can maintain a record of all the targeted entities, and the deltas that targeted them
import Debug from 'debug'; import Debug from 'debug';
import {Delta, DeltaFilter, DomainEntityID, Properties, PropertyID, PropertyTypes} from "./types"; import {Delta, DeltaFilter} from './delta';
import {DomainEntityID, PropertyID, PropertyTypes} from "./types";
const debug = Debug('lossless'); const debug = Debug('lossless');
export type CollapsedPointer = {[key: string]: PropertyTypes}; export type CollapsedPointer = {[key: string]: PropertyTypes};
@ -65,7 +66,7 @@ class DomainEntity {
export class Lossless { export class Lossless {
domainEntities = new DomainEntityMap(); domainEntities = new DomainEntityMap();
ingestDelta(delta: Delta) { ingestDelta(delta: Delta): LosslessViewMany {
const targets = delta.pointers const targets = delta.pointers
.filter(({targetContext}) => !!targetContext) .filter(({targetContext}) => !!targetContext)
.map(({target}) => target) .map(({target}) => target)
@ -85,6 +86,8 @@ export class Lossless {
debug('after add, domain entity:', JSON.stringify(ent)); debug('after add, domain entity:', JSON.stringify(ent));
} }
return this.view(targets);
} }
//TODO: json logic -- view(deltaFilter?: FilterExpr) { //TODO: json logic -- view(deltaFilter?: FilterExpr) {

View File

@ -7,22 +7,26 @@
import Debug from 'debug'; import Debug from 'debug';
import {CollapsedDelta, Lossless, LosslessViewMany, LosslessViewOne} from "./lossless"; import {CollapsedDelta, Lossless, LosslessViewMany, LosslessViewOne} from "./lossless";
import {DeltaFilter, DomainEntityID, Properties} from "./types"; import {DomainEntityID, Properties} from "./types";
import {DeltaFilter} from "./delta";
const debug = Debug('lossy'); const debug = Debug('lossy');
export type LossyViewOne = { export type LossyViewOne<T = Properties> = {
id: DomainEntityID; id: DomainEntityID;
properties: Properties; properties: T;
}; };
export type LossyViewMany = { export type LossyViewMany = {
[key: DomainEntityID]: LossyViewOne; [key: DomainEntityID]: LossyViewOne;
}; };
type Resolver = (losslessView: LosslessViewMany) => LossyViewMany | unknown; type Resolver<T = LosslessViewMany> = (losslessView: LosslessViewMany) => T;
// Extract a particular value from a delta's pointers // Extract a particular value from a delta's pointers
export function valueFromCollapsedDelta(delta: CollapsedDelta, key: string): string | number | undefined { export function valueFromCollapsedDelta(
delta: CollapsedDelta,
key: string
): string | number | undefined {
for (const pointer of delta.pointers) { for (const pointer of delta.pointers) {
for (const [k, value] of Object.entries(pointer)) { for (const [k, value] of Object.entries(pointer)) {
if (k === key && (typeof value === "string" || typeof value === "number")) { if (k === key && (typeof value === "string" || typeof value === "number")) {
@ -33,7 +37,13 @@ export function valueFromCollapsedDelta(delta: CollapsedDelta, key: string): str
} }
// Example function for resolving a value for an entity by taking the first value we find // Example function for resolving a value for an entity by taking the first value we find
export function firstValueFromLosslessViewOne(ent: LosslessViewOne, key: string): {delta: CollapsedDelta, value: string | number} | undefined { export function firstValueFromLosslessViewOne(
ent: LosslessViewOne,
key: string
): {
delta: CollapsedDelta,
value: string | number
} | undefined {
debug(`trying to get value for ${key} from ${JSON.stringify(ent.properties[key])}`); debug(`trying to get value for ${key} from ${JSON.stringify(ent.properties[key])}`);
for (const delta of ent.properties[key] || []) { for (const delta of ent.properties[key] || []) {
const value = valueFromCollapsedDelta(delta, key); const value = valueFromCollapsedDelta(delta, key);
@ -48,8 +58,13 @@ export class Lossy {
this.lossless = lossless; this.lossless = lossless;
} }
resolve(fn: Resolver, entityIds?: DomainEntityID[], deltaFilter?: DeltaFilter) { // Using the lossless view of some given domain entities,
return fn(this.lossless.view(entityIds, deltaFilter)); // apply a filter to the deltas composing that lossless view,
// and then apply a supplied resolver function which receives
// the filtered lossless view as input.
resolve<T>(fn: Resolver<T>, entityIds?: DomainEntityID[], deltaFilter?: DeltaFilter) {
const losslessView = this.lossless.view(entityIds, deltaFilter);
return fn(losslessView);
} }
} }

View File

@ -1,7 +1,7 @@
import Debug from 'debug'; import Debug from 'debug';
import {CREATOR, HTTP_API_ADDR, HTTP_API_ENABLE, HTTP_API_PORT, PEER_ID, PUBLISH_BIND_ADDR, PUBLISH_BIND_HOST, PUBLISH_BIND_PORT, REQUEST_BIND_ADDR, REQUEST_BIND_HOST, REQUEST_BIND_PORT, SEED_PEERS} from './config'; import {CREATOR, HTTP_API_ADDR, HTTP_API_ENABLE, HTTP_API_PORT, PEER_ID, PUBLISH_BIND_ADDR, PUBLISH_BIND_HOST, PUBLISH_BIND_PORT, REQUEST_BIND_ADDR, REQUEST_BIND_HOST, REQUEST_BIND_PORT, SEED_PEERS} from './config';
import {DeltaStream} from './deltas'; import {DeltaStream} from './deltas';
import {HttpApi} from './http-api'; import {HttpServer} from './http';
import {Peers} from './peers'; import {Peers} from './peers';
import {PubSub} from './pub-sub'; import {PubSub} from './pub-sub';
import {RequestReply} from './request-reply'; import {RequestReply} from './request-reply';
@ -28,7 +28,7 @@ export class RhizomeNode {
config: RhizomeNodeConfig; config: RhizomeNodeConfig;
pubSub: PubSub; pubSub: PubSub;
requestReply: RequestReply; requestReply: RequestReply;
httpApi: HttpApi; httpServer: HttpServer;
deltaStream: DeltaStream; deltaStream: DeltaStream;
peers: Peers; peers: Peers;
myRequestAddr: PeerAddress; myRequestAddr: PeerAddress;
@ -61,7 +61,7 @@ export class RhizomeNode {
); );
this.pubSub = new PubSub(this); this.pubSub = new PubSub(this);
this.requestReply = new RequestReply(this); this.requestReply = new RequestReply(this);
this.httpApi = new HttpApi(this); this.httpServer = new HttpServer(this);
this.deltaStream = new DeltaStream(this); this.deltaStream = new DeltaStream(this);
this.peers = new Peers(this); this.peers = new Peers(this);
} }
@ -73,7 +73,7 @@ export class RhizomeNode {
// Start HTTP server // Start HTTP server
if (this.config.httpEnable) { if (this.config.httpEnable) {
this.httpApi.start(); this.httpServer.start();
} }
// Wait a short time for sockets to initialize // Wait a short time for sockets to initialize
@ -95,6 +95,6 @@ export class RhizomeNode {
async stop() { async stop() {
await this.pubSub.stop(); await this.pubSub.stop();
await this.requestReply.stop(); await this.requestReply.stop();
await this.httpApi.stop(); await this.httpServer.stop();
} }
} }

View File

@ -4,7 +4,8 @@ import {SEED_PEERS} from "./config";
import {RhizomeNode} from "./node"; import {RhizomeNode} from "./node";
import {Subscription} from './pub-sub'; import {Subscription} from './pub-sub';
import {PeerRequest, RequestSocket, ResponseSocket} from "./request-reply"; import {PeerRequest, RequestSocket, ResponseSocket} from "./request-reply";
import {Delta, PeerAddress} from "./types"; import {PeerAddress} from "./types";
import {Delta} from "./delta";
const debug = Debug('peers'); const debug = Debug('peers');
export enum RequestMethods { export enum RequestMethods {

View File

@ -1,13 +1,10 @@
import { Collection } from './collection'; import {Collection} from './collection';
import {Entity, EntityProperties} from './entity'; import {EntityProperties} from './entity';
import {LossyViewOne} from './lossy'; import {LossyViewOne} from './lossy';
import {DomainEntityID} from './types';
export class TypedCollection<T extends EntityProperties> extends Collection { export class TypedCollection<T extends EntityProperties> extends Collection {
put(id: string | undefined, properties: T): Entity { async put(id: DomainEntityID | undefined, properties: T): Promise<LossyViewOne> {
return super.put(id, properties); return super.put(id, properties);
} }
get(id: string): LossyViewOne | undefined {
return super.get(id);
}
} }

View File

@ -1,44 +1,9 @@
export type PointerTarget = string | number | undefined;
export type Pointer = {
localContext: string;
target: PointerTarget;
targetContext?: string;
};
export type Delta = {
creator: string;
host: string;
pointers: Pointer[];
receivedFrom?: PeerAddress;
}
export type DeltaContext = Delta & {
creatorAddress: string;
};
export type Query = {
filterExpr: JSON
};
export type QueryResult = {
deltas: Delta[]
};
export enum Decision {
Accept,
Reject,
Defer
};
export type JSONLogic = object; export type JSONLogic = object;
export type FilterExpr = JSONLogic; export type FilterExpr = JSONLogic;
export type FilterGenerator = () => FilterExpr; export type FilterGenerator = () => FilterExpr;
export type DeltaFilter = (delta: Delta) => boolean;
export type PropertyTypes = string | number | undefined; export type PropertyTypes = string | number | undefined;
export type DomainEntityID = string; export type DomainEntityID = string;

View File

@ -13,6 +13,6 @@
"skipLibCheck": true, "skipLibCheck": true,
"forceConsistentCasingInFileNames": true "forceConsistentCasingInFileNames": true
}, },
"include": ["src/**/*"], "include": ["src/**/*", "examples/**/*"],
"exclude": ["node_modules"] "exclude": ["node_modules"]
} }

View File

@ -29,7 +29,7 @@ export class App extends RhizomeNode {
users.rhizomeConnect(this); users.rhizomeConnect(this);
const {httpAddr, httpPort} = this.config; const {httpAddr, httpPort} = this.config;
this.apiUrl = `http://${httpAddr}:${httpPort}`; this.apiUrl = `http://${httpAddr}:${httpPort}/api`;
} }
} }