deduplicate deltas

This commit is contained in:
Ladd Hoffman 2024-12-22 14:38:01 -06:00
parent 7efb9024ec
commit cbec1b5afb
5 changed files with 60 additions and 18 deletions

17
package-lock.json generated
View File

@ -11,9 +11,11 @@
"dependencies": { "dependencies": {
"@types/bluebird": "^3.5.42", "@types/bluebird": "^3.5.42",
"@types/json-logic-js": "^2.0.8", "@types/json-logic-js": "^2.0.8",
"@types/object-hash": "^3.0.6",
"express": "^4.21.2", "express": "^4.21.2",
"json-logic-js": "^2.0.5", "json-logic-js": "^2.0.5",
"level": "^9.0.0", "level": "^9.0.0",
"object-hash": "^3.0.0",
"zeromq": "^6.1.2" "zeromq": "^6.1.2"
}, },
"devDependencies": { "devDependencies": {
@ -1707,6 +1709,12 @@
"undici-types": "~6.20.0" "undici-types": "~6.20.0"
} }
}, },
"node_modules/@types/object-hash": {
"version": "3.0.6",
"resolved": "https://registry.npmjs.org/@types/object-hash/-/object-hash-3.0.6.tgz",
"integrity": "sha512-fOBV8C1FIu2ELinoILQ+ApxcUKz4ngq+IWUYrxSGjXzzjUALijilampwkMgEtJ+h2njAW3pi853QpzNVCHB73w==",
"license": "MIT"
},
"node_modules/@types/qs": { "node_modules/@types/qs": {
"version": "6.9.17", "version": "6.9.17",
"resolved": "https://registry.npmjs.org/@types/qs/-/qs-6.9.17.tgz", "resolved": "https://registry.npmjs.org/@types/qs/-/qs-6.9.17.tgz",
@ -7338,6 +7346,15 @@
"node": "^12.13.0 || ^14.15.0 || >=16.0.0" "node": "^12.13.0 || ^14.15.0 || >=16.0.0"
} }
}, },
"node_modules/object-hash": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/object-hash/-/object-hash-3.0.0.tgz",
"integrity": "sha512-RSn9F68PjH9HqtltsSnqYC1XXoWe9Bju5+213R98cNGttag9q9yAOTzdbsqvIa7aNm5WffBZFpWYr2aWrklWAw==",
"license": "MIT",
"engines": {
"node": ">= 6"
}
},
"node_modules/object-inspect": { "node_modules/object-inspect": {
"version": "1.13.3", "version": "1.13.3",
"resolved": "https://registry.npmjs.org/object-inspect/-/object-inspect-1.13.3.tgz", "resolved": "https://registry.npmjs.org/object-inspect/-/object-inspect-1.13.3.tgz",

View File

@ -16,9 +16,11 @@
"dependencies": { "dependencies": {
"@types/bluebird": "^3.5.42", "@types/bluebird": "^3.5.42",
"@types/json-logic-js": "^2.0.8", "@types/json-logic-js": "^2.0.8",
"@types/object-hash": "^3.0.6",
"express": "^4.21.2", "express": "^4.21.2",
"json-logic-js": "^2.0.5", "json-logic-js": "^2.0.5",
"level": "^9.0.0", "level": "^9.0.0",
"object-hash": "^3.0.0",
"zeromq": "^6.1.2" "zeromq": "^6.1.2"
}, },
"devDependencies": { "devDependencies": {

View File

@ -3,6 +3,7 @@ import {REQUEST_BIND_HOST, REQUEST_BIND_PORT} from './config';
import {publishSock, subscribeSock} from './pub-sub'; import {publishSock, subscribeSock} from './pub-sub';
import {Decision, Delta, PeerAddress} from './types'; import {Decision, Delta, PeerAddress} from './types';
import {myRequestAddr} from './peers'; import {myRequestAddr} from './peers';
import objectHash from 'object-hash';
export const deltaStream = new EventEmitter(); export const deltaStream = new EventEmitter();
@ -11,12 +12,19 @@ export const deltasAccepted: Delta[] = [];
export const deltasRejected: Delta[] = []; export const deltasRejected: Delta[] = [];
export const deltasDeferred: Delta[] = []; export const deltasDeferred: Delta[] = [];
export const hashesReceived = new Set<string>();
export function applyPolicy(delta: Delta): Decision { export function applyPolicy(delta: Delta): Decision {
return !!delta && Decision.Accept; return !!delta && Decision.Accept;
} }
export function receiveDelta(delta: Delta) { export function receiveDelta(delta: Delta) {
deltasProposed.push(delta); // Deduplication: if we already received this delta, disregard it
const hash = objectHash(delta);
if (!hashesReceived.has(hash)) {
hashesReceived.add(hash);
deltasProposed.push(delta);
}
} }
export function ingestDelta(delta: Delta) { export function ingestDelta(delta: Delta) {

View File

@ -60,14 +60,11 @@ class Users {
}); });
app.get("/deltas/count", (req: express.Request, res: express.Response) => { app.get("/deltas/count", (req: express.Request, res: express.Response) => {
// TODO: streaming
res.json(deltasAccepted.length); res.json(deltasAccepted.length);
}); });
app.get("/peers", (req: express.Request, res: express.Response) => { app.get("/peers", (req: express.Request, res: express.Response) => {
res.json(peers.map(({reqAddr, publishAddr}) => { res.json(peers.map(({reqAddr, publishAddr, isSelf, isSeedPeer}) => {
const isSeedPeer = !!SEED_PEERS.find(({addr, port}) =>
addr === reqAddr.addr && port === reqAddr.port);
const deltasAcceptedCount = deltasAccepted const deltasAcceptedCount = deltasAccepted
.filter((delta: Delta) => { .filter((delta: Delta) => {
return delta.receivedFrom?.addr == reqAddr.addr && return delta.receivedFrom?.addr == reqAddr.addr &&
@ -77,6 +74,7 @@ class Users {
const peerInfo = { const peerInfo = {
reqAddr: reqAddr.toAddrString(), reqAddr: reqAddr.toAddrString(),
publishAddr: publishAddr?.toAddrString(), publishAddr: publishAddr?.toAddrString(),
isSelf,
isSeedPeer, isSeedPeer,
deltaCount: { deltaCount: {
accepted: deltasAcceptedCount accepted: deltasAcceptedCount
@ -86,6 +84,10 @@ class Users {
})); }));
}); });
app.get("/peers/count", (req: express.Request, res: express.Response) => {
res.json(peers.length);
});
if (HTTP_API_ENABLE) { if (HTTP_API_ENABLE) {
app.listen(HTTP_API_PORT, HTTP_API_ADDR, () => { app.listen(HTTP_API_PORT, HTTP_API_ADDR, () => {
console.log(`HTTP API bound to http://${HTTP_API_ADDR}:${HTTP_API_PORT}`); console.log(`HTTP API bound to http://${HTTP_API_ADDR}:${HTTP_API_PORT}`);
@ -123,9 +125,13 @@ class Users {
const result = users.getOne(taliesin.id); const result = users.getOne(taliesin.id);
const matches: boolean = JSON.stringify(result) === JSON.stringify(taliesin); const matches: boolean = JSON.stringify(result) === JSON.stringify(taliesin);
console.log(`Result ${matches ? 'matches' : 'does not match'} expected.` + if (matches) {
`\n\nExpected \n${JSON.stringify(taliesin)}` + console.log('Result matches expected: ' + JSON.stringify(taliesin));
`\nReceived\n${JSON.stringify(result)}`); } else {
console.log(`Result does not match expected.` +
`\n\nExpected \n${JSON.stringify(taliesin)}` +
`\nReceived\n${JSON.stringify(result)}`);
}
})(); })();

View File

@ -34,9 +34,14 @@ class Peer {
reqAddr: PeerAddress; reqAddr: PeerAddress;
reqSock: RequestSocket; reqSock: RequestSocket;
publishAddr: PeerAddress | undefined; publishAddr: PeerAddress | undefined;
isSelf: boolean;
isSeedPeer: boolean;
constructor(addr: string, port: number) { constructor(addr: string, port: number) {
this.reqAddr = new PeerAddress(addr, port); this.reqAddr = new PeerAddress(addr, port);
this.reqSock = new RequestSocket(addr, port); this.reqSock = new RequestSocket(addr, port);
this.isSelf = addr === myRequestAddr.addr && port === myRequestAddr.port;
this.isSeedPeer = !!SEED_PEERS.find((seedPeer) =>
addr === seedPeer.addr && port === seedPeer.port);
} }
async subscribe() { async subscribe() {
if (!this.publishAddr) { if (!this.publishAddr) {
@ -61,6 +66,8 @@ class Peer {
export const peers: Peer[] = []; export const peers: Peer[] = [];
peers.push(new Peer(myRequestAddr.addr, myRequestAddr.port));
function newPeer(addr: string, port: number) { function newPeer(addr: string, port: number) {
const peer = new Peer(addr, port); const peer = new Peer(addr, port);
peers.push(peer); peers.push(peer);
@ -77,14 +84,16 @@ export async function subscribeToSeeds() {
//! TODO Expect abysmal scaling properties with this function //! TODO Expect abysmal scaling properties with this function
export async function askAllPeersForDeltas() { export async function askAllPeersForDeltas() {
peers.forEach(async (peer, idx) => { peers
console.log(`Asking peer ${idx} for deltas`); .filter(({isSelf}) => !isSelf)
const deltas = await peer.askForDeltas(); .forEach(async (peer, idx) => {
console.log(`received ${deltas.length}`); console.log(`Asking peer ${idx} for deltas`);
for (const delta of deltas) { const deltas = await peer.askForDeltas();
delta.receivedFrom = peer.reqAddr; console.log(`received ${deltas.length} deltas from ${peer.reqAddr.toAddrString()}`);
receiveDelta(delta); for (const delta of deltas) {
} delta.receivedFrom = peer.reqAddr;
ingestAll(); receiveDelta(delta);
}); }
ingestAll();
});
} }