From cbec1b5afb5d2cdfc18e4ee47c0f9573220a81cd Mon Sep 17 00:00:00 2001 From: Ladd Date: Sun, 22 Dec 2024 14:38:01 -0600 Subject: [PATCH] deduplicate deltas --- package-lock.json | 17 +++++++++++++++++ package.json | 2 ++ src/deltas.ts | 10 +++++++++- src/example-app.ts | 20 +++++++++++++------- src/peers.ts | 29 +++++++++++++++++++---------- 5 files changed, 60 insertions(+), 18 deletions(-) diff --git a/package-lock.json b/package-lock.json index c6fc4a7..4a8f052 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,9 +11,11 @@ "dependencies": { "@types/bluebird": "^3.5.42", "@types/json-logic-js": "^2.0.8", + "@types/object-hash": "^3.0.6", "express": "^4.21.2", "json-logic-js": "^2.0.5", "level": "^9.0.0", + "object-hash": "^3.0.0", "zeromq": "^6.1.2" }, "devDependencies": { @@ -1707,6 +1709,12 @@ "undici-types": "~6.20.0" } }, + "node_modules/@types/object-hash": { + "version": "3.0.6", + "resolved": "https://registry.npmjs.org/@types/object-hash/-/object-hash-3.0.6.tgz", + "integrity": "sha512-fOBV8C1FIu2ELinoILQ+ApxcUKz4ngq+IWUYrxSGjXzzjUALijilampwkMgEtJ+h2njAW3pi853QpzNVCHB73w==", + "license": "MIT" + }, "node_modules/@types/qs": { "version": "6.9.17", "resolved": "https://registry.npmjs.org/@types/qs/-/qs-6.9.17.tgz", @@ -7338,6 +7346,15 @@ "node": "^12.13.0 || ^14.15.0 || >=16.0.0" } }, + "node_modules/object-hash": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/object-hash/-/object-hash-3.0.0.tgz", + "integrity": "sha512-RSn9F68PjH9HqtltsSnqYC1XXoWe9Bju5+213R98cNGttag9q9yAOTzdbsqvIa7aNm5WffBZFpWYr2aWrklWAw==", + "license": "MIT", + "engines": { + "node": ">= 6" + } + }, "node_modules/object-inspect": { "version": "1.13.3", "resolved": "https://registry.npmjs.org/object-inspect/-/object-inspect-1.13.3.tgz", diff --git a/package.json b/package.json index cd1874c..bb056bb 100644 --- a/package.json +++ b/package.json @@ -16,9 +16,11 @@ "dependencies": { "@types/bluebird": "^3.5.42", "@types/json-logic-js": "^2.0.8", + "@types/object-hash": "^3.0.6", "express": "^4.21.2", "json-logic-js": "^2.0.5", "level": "^9.0.0", + "object-hash": "^3.0.0", "zeromq": "^6.1.2" }, "devDependencies": { diff --git a/src/deltas.ts b/src/deltas.ts index f20d4ac..39bf8fd 100644 --- a/src/deltas.ts +++ b/src/deltas.ts @@ -3,6 +3,7 @@ import {REQUEST_BIND_HOST, REQUEST_BIND_PORT} from './config'; import {publishSock, subscribeSock} from './pub-sub'; import {Decision, Delta, PeerAddress} from './types'; import {myRequestAddr} from './peers'; +import objectHash from 'object-hash'; export const deltaStream = new EventEmitter(); @@ -11,12 +12,19 @@ export const deltasAccepted: Delta[] = []; export const deltasRejected: Delta[] = []; export const deltasDeferred: Delta[] = []; +export const hashesReceived = new Set(); + export function applyPolicy(delta: Delta): Decision { return !!delta && Decision.Accept; } export function receiveDelta(delta: Delta) { - deltasProposed.push(delta); + // Deduplication: if we already received this delta, disregard it + const hash = objectHash(delta); + if (!hashesReceived.has(hash)) { + hashesReceived.add(hash); + deltasProposed.push(delta); + } } export function ingestDelta(delta: Delta) { diff --git a/src/example-app.ts b/src/example-app.ts index 26860dd..5caee79 100644 --- a/src/example-app.ts +++ b/src/example-app.ts @@ -60,14 +60,11 @@ class Users { }); app.get("/deltas/count", (req: express.Request, res: express.Response) => { - // TODO: streaming res.json(deltasAccepted.length); }); app.get("/peers", (req: express.Request, res: express.Response) => { - res.json(peers.map(({reqAddr, publishAddr}) => { - const isSeedPeer = !!SEED_PEERS.find(({addr, port}) => - addr === reqAddr.addr && port === reqAddr.port); + res.json(peers.map(({reqAddr, publishAddr, isSelf, isSeedPeer}) => { const deltasAcceptedCount = deltasAccepted .filter((delta: Delta) => { return delta.receivedFrom?.addr == reqAddr.addr && @@ -77,6 +74,7 @@ class Users { const peerInfo = { reqAddr: reqAddr.toAddrString(), publishAddr: publishAddr?.toAddrString(), + isSelf, isSeedPeer, deltaCount: { accepted: deltasAcceptedCount @@ -86,6 +84,10 @@ class Users { })); }); + app.get("/peers/count", (req: express.Request, res: express.Response) => { + res.json(peers.length); + }); + if (HTTP_API_ENABLE) { app.listen(HTTP_API_PORT, HTTP_API_ADDR, () => { console.log(`HTTP API bound to http://${HTTP_API_ADDR}:${HTTP_API_PORT}`); @@ -123,9 +125,13 @@ class Users { const result = users.getOne(taliesin.id); const matches: boolean = JSON.stringify(result) === JSON.stringify(taliesin); - console.log(`Result ${matches ? 'matches' : 'does not match'} expected.` + - `\n\nExpected \n${JSON.stringify(taliesin)}` + - `\nReceived\n${JSON.stringify(result)}`); + if (matches) { + console.log('Result matches expected: ' + JSON.stringify(taliesin)); + } else { + console.log(`Result does not match expected.` + + `\n\nExpected \n${JSON.stringify(taliesin)}` + + `\nReceived\n${JSON.stringify(result)}`); + } })(); diff --git a/src/peers.ts b/src/peers.ts index d26d1b5..3986667 100644 --- a/src/peers.ts +++ b/src/peers.ts @@ -34,9 +34,14 @@ class Peer { reqAddr: PeerAddress; reqSock: RequestSocket; publishAddr: PeerAddress | undefined; + isSelf: boolean; + isSeedPeer: boolean; constructor(addr: string, port: number) { this.reqAddr = new PeerAddress(addr, port); this.reqSock = new RequestSocket(addr, port); + this.isSelf = addr === myRequestAddr.addr && port === myRequestAddr.port; + this.isSeedPeer = !!SEED_PEERS.find((seedPeer) => + addr === seedPeer.addr && port === seedPeer.port); } async subscribe() { if (!this.publishAddr) { @@ -61,6 +66,8 @@ class Peer { export const peers: Peer[] = []; +peers.push(new Peer(myRequestAddr.addr, myRequestAddr.port)); + function newPeer(addr: string, port: number) { const peer = new Peer(addr, port); peers.push(peer); @@ -77,14 +84,16 @@ export async function subscribeToSeeds() { //! TODO Expect abysmal scaling properties with this function export async function askAllPeersForDeltas() { - peers.forEach(async (peer, idx) => { - console.log(`Asking peer ${idx} for deltas`); - const deltas = await peer.askForDeltas(); - console.log(`received ${deltas.length}`); - for (const delta of deltas) { - delta.receivedFrom = peer.reqAddr; - receiveDelta(delta); - } - ingestAll(); - }); + peers + .filter(({isSelf}) => !isSelf) + .forEach(async (peer, idx) => { + console.log(`Asking peer ${idx} for deltas`); + const deltas = await peer.askForDeltas(); + console.log(`received ${deltas.length} deltas from ${peer.reqAddr.toAddrString()}`); + for (const delta of deltas) { + delta.receivedFrom = peer.reqAddr; + receiveDelta(delta); + } + ingestAll(); + }); }