110 lines
3.7 KiB
JavaScript
110 lines
3.7 KiB
JavaScript
import { Actor } from './actor.js';
|
|
import { Action } from './action.js';
|
|
import {
|
|
Message, PostMessage, PeerMessage, messageFromJSON,
|
|
} from './message.js';
|
|
import { CryptoUtil } from './crypto.js';
|
|
import { ForumView } from './forum-view.js';
|
|
import { PrioritizedQueue } from './prioritized-queue.js';
|
|
|
|
export class ForumNode extends Actor {
|
|
constructor(name, scene) {
|
|
super(name, scene);
|
|
this.forumView = new ForumView();
|
|
this.queue = new PrioritizedQueue();
|
|
this.actions = {
|
|
storePost: new Action('store post', scene),
|
|
peerMessage: new Action('peer message', scene),
|
|
};
|
|
}
|
|
|
|
// Generate a signing key pair and connect to the network
|
|
async initialize(forumNetwork) {
|
|
this.keyPair = await CryptoUtil.generateAsymmetricKey();
|
|
this.forumNetwork = forumNetwork.addNode(this);
|
|
this.status.set('Initialized');
|
|
return this;
|
|
}
|
|
|
|
// Send a message to all other nodes in the network
|
|
async broadcast(message) {
|
|
await message.sign(this.keyPair);
|
|
const otherForumNodes = this.forumNetwork
|
|
.listNodes()
|
|
.filter((forumNode) => forumNode.keyPair.publicKey !== this.keyPair.publicKey);
|
|
for (const forumNode of otherForumNodes) {
|
|
// For now just call receiveMessage on the target node
|
|
this.actions.peerMessage.log(this, forumNode, null, message.content);
|
|
await forumNode.receiveMessage(JSON.stringify(message.toJSON()));
|
|
}
|
|
}
|
|
|
|
// Perform minimal processing to ingest a message.
|
|
// Enqueue it for further processing.
|
|
async receiveMessage(messageStr) {
|
|
const messageJson = JSON.parse(messageStr);
|
|
const senderReputation = this.forumView.getReputation(messageJson.publicKey) || 0;
|
|
this.queue.add(messageJson, senderReputation);
|
|
}
|
|
|
|
// Process next highest priority message in the queue
|
|
async processNextMessage() {
|
|
const messageJson = this.queue.pop();
|
|
if (!messageJson) {
|
|
return null;
|
|
}
|
|
return this.processMessage(messageJson);
|
|
}
|
|
|
|
// Process a message from the queue
|
|
async processMessage(messageJson) {
|
|
try {
|
|
await Message.verify(messageJson);
|
|
} catch (e) {
|
|
this.actions.processMessage.log(this, this, 'invalid signature', messageJson, '-x');
|
|
console.log(`${this.name}: received message with invalid signature`);
|
|
return;
|
|
}
|
|
|
|
const { publicKey } = messageJson;
|
|
const message = messageFromJSON(messageJson);
|
|
console.log(`${this.name}: processMessage`, message);
|
|
|
|
if (message instanceof PostMessage) {
|
|
await this.processPostMessage(publicKey, message.content);
|
|
} else if (message instanceof PeerMessage) {
|
|
await this.processPeerMessage(publicKey, message.content);
|
|
} else {
|
|
// Unknown message type
|
|
// Penalize sender for wasting our time
|
|
console.log(`${this.name}: penalizing sender for unknown message type ${message.type}`);
|
|
this.forumView.incrementReputation(message.publicKey, -1);
|
|
}
|
|
}
|
|
|
|
// Process an incoming post, received by whatever means
|
|
processPost(authorId, post, stake) {
|
|
this.actions.storePost.log(this, this, null, { authorId, post, stake });
|
|
this.forumView.addPost(authorId, post.id, post, stake);
|
|
}
|
|
|
|
// Process a post we received in a message
|
|
async processPostMessage(authorId, { post, stake }) {
|
|
this.processPost(authorId, post, stake);
|
|
await this.broadcast(
|
|
new PeerMessage({
|
|
posts: [{ authorId, post, stake }],
|
|
}),
|
|
);
|
|
}
|
|
|
|
// Process a message we receive from a peer
|
|
async processPeerMessage(peerId, { posts }) {
|
|
// We are trusting that the peer verified the signatures of the posts they're forwarding.
|
|
// We could instead have the peer forward the signed messages and re-verify them.
|
|
for (const { authorId, post, stake } of posts) {
|
|
this.processPost(authorId, post, stake);
|
|
}
|
|
}
|
|
}
|