Compare commits

...

2 Commits

Author SHA1 Message Date
Ladd Hoffman d3f4740422 refactored rollup for code clarity
Gitea Actions Demo / Explore-Gitea-Actions (push) Failing after 35s Details
2024-05-03 14:18:20 -05:00
Ladd Hoffman 69c869a693 pause matrix outbound queue until target room id is set 2024-05-03 13:19:20 -05:00
17 changed files with 570 additions and 443 deletions

View File

@ -5,13 +5,15 @@ const {
appState,
proposalEventIds,
} = require('../util/db');
const { submitRollup, resetBatch } = require('./rollup');
const { submitRollup } = require('./rollup');
const { resetBatchItems } = require('./rollup/batch-items');
const {
BOT_INSTANCE_ID,
ETH_NETWORK,
} = process.env;
// TODO: Refactor into separate files
const handleCommand = async (client, roomId, event) => {
// Don't handle unhelpful events (ones that aren't text messages, are redacted, or sent by us)
if (event.content?.msgtype !== 'm.text') return;
@ -43,6 +45,7 @@ const handleCommand = async (client, roomId, event) => {
try {
const proposalEventId = await proposalEventIds.get(proposalIndex);
const proposalEventUri = `https://matrix.to/#/${roomId}/${proposalEventId}`;
// TODO: Send HTML message
const content = {
body: `Proposal ${proposalIndex}: ${proposalEventUri}`,
msgtype: 'm.text',
@ -67,7 +70,7 @@ const handleCommand = async (client, roomId, event) => {
console.log(`!resetBatch roomId ${roomId} instanceId ${instanceId}`);
if (instanceId === BOT_INSTANCE_ID) {
console.log('!resetBatch');
const batchItems = await resetBatch();
const batchItems = await resetBatchItems();
await client.replyText(roomId, event, `Reset batch, now contains ${batchItems.length} items`);
}
}

View File

@ -25,6 +25,7 @@ const start = () => {
console.log('post.content:', post.content);
// Send matrix room event
// TODO: Send HTML message
let message = `Proposal ${proposalIndex}\n\n${post.content}`;
if (post.embeddedData && Object.entries(post.embeddedData).length) {
message += `\n\n${JSON.stringify(post.embeddedData, null, 2)}`;

View File

@ -1,424 +0,0 @@
const Promise = require('bluebird');
const { v4: uuidv4 } = require('uuid');
const { isEqual } = require('lodash');
const { registerDecider } = require('./validation-pools');
const { registerMatrixEventHandler, sendMatrixEvent, sendMatrixText } = require('../matrix-bot');
const { matrixPools, matrixUserToAuthorAddress, applicationData } = require('../util/db');
const {
rollup, wallet, work2, dao,
} = require('../util/contracts');
const read = require('../util/forum/read');
const write = require('../util/forum/write');
const addPostWithRetry = require('../util/add-post-with-retry');
const callWithRetry = require('../util/call-contract-method-with-retry');
const {
ROLLUP_BATCH_SIZE,
ROLLUP_AVAILABILITY_STAKE_DURATION,
} = process.env;
const rollupBatchSize = ROLLUP_BATCH_SIZE || 10;
const availabilityStakeDuration = ROLLUP_AVAILABILITY_STAKE_DURATION || 600;
let batchWorker;
let batchItems;
const stakeRollupAvailability = async () => {
const currentRep = await dao.balanceOf(await wallet.getAddress());
if (currentRep) {
await callWithRetry(() => dao.stakeAvailability(
rollup.target,
currentRep,
availabilityStakeDuration,
));
}
};
const getBatchPostAuthorWeights = async (batchItems_) => {
const weights = {};
await Promise.each(batchItems_, async (postId) => {
const post = await read(postId);
const matrixPool = await matrixPools.get(postId);
const { fee, result: { votePasses, quorumMet } } = matrixPool;
post.authors.forEach(({ authorAddress, weightPPM }) => {
weights[authorAddress] = weights[authorAddress] ?? 0;
if (votePasses && quorumMet) {
// scale by matrix pool outcome and strength
weights[authorAddress] += weightPPM * fee;
}
// TODO: Rewards for policing
// TODO: Propagation via references
});
});
// Rescale author weights so they sum to 1000000
const sumOfWeights = Object.values(weights).reduce((t, v) => t + v, 0);
const scaledWeights = Object.values(weights)
.map((weight) => Math.floor((weight * 1000000) / sumOfWeights));
const sumOfScaledWeights = Object.values(scaledWeights).reduce((t, v) => t + v, 0);
scaledWeights[0] += 1000000 - sumOfScaledWeights;
const authors = Object.keys(weights)
.map((authorAddress, i) => ({ authorAddress, weightPPM: scaledWeights[i] }));
return authors;
};
const submitRollup = async () => {
if (!batchItems.length) {
return { batchItems: [] };
}
const authors = await getBatchPostAuthorWeights(batchItems);
// TODO: Compute citations as aggregate of the citations of posts in the batch
const citations = [];
const content = `Batch of ${batchItems.length} items`;
const embeddedData = {
batchItems,
nonce: uuidv4().replaceAll('-', ''),
};
const sender = await wallet.getAddress();
const contentToVerify = `${content}\n\n${JSON.stringify(embeddedData, null, 2)}`;
const signature = await wallet.signMessage(contentToVerify);
// Write to the forum database
const { hash: batchPostId } = await write({
sender, authors, citations, content, embeddedData, signature,
});
// Add rollup post on-chain
await addPostWithRetry(authors, batchPostId, citations);
// Stake our availability to be the next rollup worker
await stakeRollupAvailability();
// Call Rollup.submitBatch
console.log('Submitting batch', { batchPostId, batchItems, authors });
const poolDuration = 60;
await callWithRetry(() => rollup.submitBatch(batchPostId, batchItems, poolDuration));
// Send matrix event
await sendMatrixEvent('io.dgov.rollup.submit', { batchPostId, batchItems, authors });
// Clear the batch in preparation for next batch
batchItems = [];
await applicationData.put('batchItems', batchItems);
return {
batchPostId,
batchItems,
authors,
};
};
const evaluateMatrixPoolOutcome = async (postId, { dryRun } = {}) => {
const matrixPool = await matrixPools.get(postId);
// This should already contain all the info we need to evaluate the outcome
const { stakes, quorum, winRatio } = matrixPool;
const stakedFor = stakes
.filter((x) => x.inFavor)
.reduce((total, { amount }) => total + amount, 0);
const stakedAgainst = stakes
.filter((x) => !x.inFavor)
.reduce((total, { amount }) => total + amount, 0);
const votePasses = stakedFor * winRatio[1] >= (stakedFor + stakedAgainst) * winRatio[0];
const totalSupply = Number(await dao.totalSupply());
const quorumMet = (stakedFor + stakedAgainst) * quorum[1] >= totalSupply * quorum[0];
const result = {
stakedFor, stakedAgainst, totalSupply, votePasses, quorumMet,
};
if (!dryRun) {
console.log(`Matrix pool for post ${postId} outcome evaluated`, result);
matrixPool.result = result;
await matrixPools.put(postId, matrixPool);
sendMatrixEvent('io.dgov.pool.result', { postId, result });
batchItems.push(postId);
await applicationData.put('batchItems', batchItems);
let submitBatch = false;
if (batchWorker === '0x0000000000000000000000000000000000000000') {
// If there's no batch worker, we should stake our availability
// and then submit the batch immediately.
console.log('There is no batch worker assigned. Staking availability and submitting first batch.');
submitBatch = true;
} else if (batchWorker === await wallet.getAddress()) {
// If we are the batch worker, we should wait an appropriate amout of time /
// number of matrix pools before submitting a batch.
if (batchItems.length === rollupBatchSize) {
console.log(`Batch size = ${batchItems.length}. Submitting batch.`);
submitBatch = true;
}
}
if (submitBatch) {
await stakeRollupAvailability();
await submitRollup();
}
}
return result;
};
const authorsMatch = async (authors, expectedAuthors) => {
if (expectedAuthors.length !== authors.length) return false;
return authors.every(({ authorAddress, weightPPM }) => {
const expectedAuthor = expectedAuthors.find((x) => x.authorAddress === authorAddress);
return weightPPM === expectedAuthor.weightPPM;
});
};
const validateWorkEvidence = async (sender, post) => {
let valid = false;
if (sender === work2.target) {
const expectedContent = 'This is a work evidence post';
valid = post.content.startsWith(expectedContent);
}
console.log(`Work evidence ${valid ? 'matched' : 'did not match'} the expected content`);
return valid;
};
const validatePost = async ({
sender, post, postId, roomId, eventId, ...params
}) => {
const currentRep = Number(await dao.balanceOf(await wallet.getAddress()));
const valid = await validateWorkEvidence(sender, post);
const stake = { amount: currentRep, account: await wallet.getAddress(), inFavor: valid };
sendMatrixEvent('io.dgov.pool.stake', { postId, amount: currentRep, inFavor: valid });
const matrixPool = {
postId,
roomId,
eventId,
...params,
stakes: [stake],
};
console.log('matrixPool', matrixPool);
await matrixPools.put(postId, matrixPool);
};
const initiateMatrixPool = async (postId, post, sender, fee) => {
const duration = 20;
const quorum = [1, 3];
const winRatio = [1, 2];
const params = {
sender,
fee: Number(fee),
duration,
quorum,
winRatio,
};
console.log('sending matrix pool start event');
const { roomId, eventId } = await sendMatrixEvent('io.dgov.pool.start', {
postId,
...params,
});
console.log('sent matrix pool start event');
// Register our own stake and send a message
await validatePost({
sender, post, postId, roomId, eventId, ...params,
});
// Since we're assuming responsibility as the batch worker,
// set a timeout to evaulate the outcome
setTimeout(() => evaluateMatrixPoolOutcome(postId), duration * 1000);
};
const resetBatch = async () => {
batchItems = [];
// Read from Rollup.items
const itemCount = await rollup.itemCount();
const promises = [];
for (let i = 0; i < itemCount; i += 1) {
promises.push(rollup.items(i));
}
const batchItemsInfo = await Promise.all(promises);
batchItems = batchItemsInfo.map((x) => x.postId);
await applicationData.put('batchItems', batchItems);
// Make sure there's a matrix pool for each batch item.
// If there's not, then let's start one.
await Promise.each(batchItemsInfo, async ({ postId, sender, fee }) => {
let post;
try {
post = await read(postId);
} catch (e) {
console.error(`Post ID ${postId} not found`);
return;
}
try {
await matrixPools.get(postId);
} catch (e) {
await initiateMatrixPool(postId, post, sender, fee);
}
});
return batchItems;
};
const start = async () => {
console.log('registering validation pool decider for rollup');
registerDecider(async (pool, post) => {
// If this is not sent by the work1 contract, it's not of interest here.
if (pool.sender !== rollup.target) return false;
// A rollup post should contain
// - a list of off-chain validation pools
// - authorship corresponding to the result of those off-chain pools
if (!post.embeddedData?.batchItems) return false;
// Our task here is to check whether the posted result agrees with our own computations
let expectedAuthors;
try {
expectedAuthors = await getBatchPostAuthorWeights(post.embeddedData.batchItems);
} catch (e) {
console.error('Error calculating batch post author weights', e);
return false;
}
const valid = authorsMatch(post.authors, expectedAuthors);
console.log(`batch post ${pool.props.postId} is ${valid ? 'valid' : 'invalid'}`);
return valid;
});
// Even if we're not the current batch worker, keep track of batch items
try {
batchItems = await applicationData.get('batchItems');
} catch (e) {
batchItems = [];
}
// Check for an assigned batch worker
batchWorker = await rollup.batchWorker();
console.log('At startup, batch worker:', batchWorker);
// Stake availability and set an interval to maintain it
await stakeRollupAvailability();
setInterval(stakeRollupAvailability, availabilityStakeDuration * 1000);
rollup.on('BatchWorkerAssigned', async (batchWorker_) => {
batchWorker = batchWorker_;
console.log('Batch worker assigned:', batchWorker);
if (batchWorker === await wallet.getAddress()) {
console.log('This instance is the new batch worker');
}
});
/// `sender` is the address that called Rollup.addItem on chain, i.e. the Work2 contract.
rollup.on('BatchItemAdded', async (postId, sender, fee) => {
// If we are the batch worker or there is no batch worker, initiate a matrix pool
if (batchWorker === await wallet.getAddress()
|| batchWorker === '0x0000000000000000000000000000000000000000') {
let post;
try {
post = await read(postId);
} catch (e) {
console.error(`Post ID ${postId} not found`);
return;
}
// Initialize a matrix pool
try {
await matrixPools.get(postId);
// If this doesn't throw, it means we or someone else already sent this event
console.log(`Matrix pool start event has already been sent for postId ${postId}`);
} catch (e) {
if (e.status === 404) {
await initiateMatrixPool(postId, post, sender, fee);
} else {
throw e;
}
}
}
});
registerMatrixEventHandler(async (client, roomId, event) => {
switch (event.type) {
case 'io.dgov.pool.start': {
// Note that matrix pools are identified by the postId to which they pertain.
// This means that for a given post there can only be one matrix pool at a time.
const { postId, sender, ...params } = event.content;
// We can use LevelDB to store information about validation pools
const eventId = event.event_id;
console.log('Matrix pool started', { postId, ...params });
// Validate the target post, and stake for/against
let post;
try {
post = await read(postId);
} catch (e) {
console.error(`Post ID ${postId} not found`);
break;
}
await validatePost({
sender, post, postId, roomId, eventId, ...params,
});
break;
}
case 'io.dgov.pool.stake': {
const { postId, amount, inFavor } = event.content;
let account;
try {
account = await matrixUserToAuthorAddress(event.sender);
} catch (e) {
// Error, sender has not registered their matrix identity
sendMatrixText(`Matrix user ${event.sender} has not registered their wallet address`);
break;
}
let matrixPool;
try {
matrixPool = await matrixPools.get(postId);
} catch (e) {
// Error. matrix pool not found
sendMatrixText(`Received stake for unknown matrix pool, for post ${postId}. Stake sent by ${event.sender}`);
break;
}
const stake = { account, amount, inFavor };
matrixPool.stakes = matrixPool.stakes ?? [];
matrixPool.stakes.push(stake);
await matrixPools.put(postId, matrixPool);
console.log(`registered stake in matrix pool for post ${postId} by ${account}`);
break;
}
case 'io.dgov.pool.result': {
// This should be sent by the current batch worker
// const { stakedFor, stakedAgainst, totalSupply, votePasses, quorumMet, } = result;
const { postId, result } = event.content;
let matrixPool;
try {
matrixPool = await matrixPools.get(postId);
} catch (e) {
// Error. matrix pool not found
sendMatrixText(`Received result for unknown matrix pool, for post ${postId}. Result sent by ${event.sender}`);
break;
}
// Compare batch worker's result with ours to verify and provide early warning
const expectedResult = await evaluateMatrixPoolOutcome(postId, { dryRun: true });
if (!isEqual(result, expectedResult)) {
sendMatrixText(`Unexpected result for matrix pool, for post ${postId}. Result sent by ${event.sender}\n\n`
+ `received ${JSON.stringify(result)}\n`
+ `expected ${JSON.stringify(expectedResult)}`);
}
matrixPool.result = result;
await matrixPools.put(postId, matrixPool);
batchItems.push(postId);
await applicationData.put('batchItems', batchItems);
break;
}
case 'io.dgov.rollup.submit': {
// This should include the identifier of the on-chain validation pool
const { batchPostId, batchItems: batchItems_, authors } = event.content;
let batchPostIds;
try {
batchPostIds = await applicationData.get('batchPostIds');
} catch (e) {
batchPostIds = [];
}
batchPostIds.push(batchPostId);
await applicationData.put('batchPostIds', batchPostIds);
// Compare batch worker's result with ours to verify
const expectedAuthors = await getBatchPostAuthorWeights(batchItems_);
if (!authorsMatch(authors, expectedAuthors)) {
sendMatrixText(`Unexpected result for batch post ${batchPostId}`);
}
// Reset batchItems in preparation for next batch
const nextBatchItems = batchItems.filter((postId) => !batchPostIds.includes(postId));
batchItems = nextBatchItems;
await applicationData.put('batchItems', batchItems);
break;
}
default:
}
});
};
module.exports = {
start,
submitRollup,
resetBatch,
};

View File

@ -0,0 +1,65 @@
const { rollup } = require('../../util/contracts');
const { applicationData, matrixPools } = require('../../util/db');
const read = require('../../util/forum/read');
const { initiateMatrixPool } = require('./matrix-pools/initiate');
let batchItems;
const initializeBatchItems = async () => {
try {
batchItems = await applicationData.get('batchItems');
} catch (e) {
batchItems = [];
}
};
const getBatchItems = () => batchItems;
const addBatchItem = async (postId) => {
batchItems.push(postId);
await applicationData.put('batchItems', batchItems);
};
const clearBatchItems = async (itemsToClear) => {
batchItems = batchItems.filter((item) => !itemsToClear.includes(item));
await applicationData.put('batchItems', batchItems);
};
const resetBatchItems = async () => {
batchItems = [];
// Read from Rollup.items
const itemCount = await rollup.itemCount();
const promises = [];
for (let i = 0; i < itemCount; i += 1) {
promises.push(rollup.items(i));
}
const batchItemsInfo = await Promise.all(promises);
batchItems = batchItemsInfo.map((x) => x.postId);
await applicationData.put('batchItems', batchItems);
// Make sure there's a matrix pool for each batch item.
// If there's not, then let's start one.
await Promise.each(batchItemsInfo, async ({ postId, sender, fee }) => {
let post;
try {
post = await read(postId);
} catch (e) {
console.error(`Post ID ${postId} not found`);
return;
}
try {
await matrixPools.get(postId);
} catch (e) {
await initiateMatrixPool(postId, post, sender, fee);
}
});
return batchItems;
};
module.exports = {
initializeBatchItems,
getBatchItems,
addBatchItem,
clearBatchItems,
resetBatchItems,
};

View File

@ -0,0 +1,29 @@
const { rollup, wallet } = require('../../util/contracts');
let batchWorker;
const getCurrentBatchWorker = () => batchWorker;
const initializeBatchWorker = async () => {
batchWorker = await rollup.batchWorker();
console.log('At startup, batch worker:', batchWorker);
rollup.on('BatchWorkerAssigned', async (batchWorker_) => {
batchWorker = batchWorker_;
console.log('Batch worker assigned:', batchWorker);
if (batchWorker === await wallet.getAddress()) {
console.log('This instance is the new batch worker');
}
});
};
const setBatchWorker = (batchWorker_) => {
batchWorker = batchWorker_;
};
module.exports = {
getCurrentBatchWorker,
initializeBatchWorker,
setBatchWorker,
};

View File

@ -0,0 +1,31 @@
const read = require('../../util/forum/read');
const { matrixPools } = require('../../util/db');
const computeAuthorWeights = async (batchItems_) => {
const weights = {};
await Promise.each(batchItems_, async (postId) => {
const post = await read(postId);
const matrixPool = await matrixPools.get(postId);
const { fee, result: { votePasses, quorumMet } } = matrixPool;
post.authors.forEach(({ authorAddress, weightPPM }) => {
weights[authorAddress] = weights[authorAddress] ?? 0;
if (votePasses && quorumMet) {
// scale by matrix pool outcome and strength
weights[authorAddress] += weightPPM * fee;
}
// TODO: Rewards for policing
// TODO: Propagation via references
});
});
// Rescale author weights so they sum to 1000000
const sumOfWeights = Object.values(weights).reduce((t, v) => t + v, 0);
const scaledWeights = Object.values(weights)
.map((weight) => Math.floor((weight * 1000000) / sumOfWeights));
const sumOfScaledWeights = Object.values(scaledWeights).reduce((t, v) => t + v, 0);
scaledWeights[0] += 1000000 - sumOfScaledWeights;
const authors = Object.keys(weights)
.map((authorAddress, i) => ({ authorAddress, weightPPM: scaledWeights[i] }));
return authors;
};
module.exports = computeAuthorWeights;

View File

@ -0,0 +1,23 @@
const {
dao,
} = require('../../util/contracts');
const computeMatrixPoolResult = async (matrixPool) => {
// This should already contain all the info we need to evaluate the outcome
const { stakes, quorum, winRatio } = matrixPool;
const stakedFor = stakes
.filter((x) => x.inFavor)
.reduce((total, { amount }) => total + amount, 0);
const stakedAgainst = stakes
.filter((x) => !x.inFavor)
.reduce((total, { amount }) => total + amount, 0);
const votePasses = stakedFor * winRatio[1] >= (stakedFor + stakedAgainst) * winRatio[0];
const totalSupply = Number(await dao.totalSupply());
const quorumMet = (stakedFor + stakedAgainst) * quorum[1] >= totalSupply * quorum[0];
const result = {
stakedFor, stakedAgainst, totalSupply, votePasses, quorumMet,
};
return result;
};
module.exports = computeMatrixPoolResult;

View File

@ -0,0 +1,9 @@
const {
ROLLUP_BATCH_SIZE,
ROLLUP_AVAILABILITY_STAKE_DURATION,
} = process.env;
module.exports = {
rollupBatchSize: ROLLUP_BATCH_SIZE || 10,
availabilityStakeDuration: ROLLUP_AVAILABILITY_STAKE_DURATION || 600,
};

View File

@ -0,0 +1,192 @@
const { isEqual } = require('lodash');
const { registerDecider } = require('../validation-pools');
const { registerMatrixEventHandler, sendMatrixText } = require('../../matrix-bot');
const { matrixPools, matrixUserToAuthorAddress, applicationData } = require('../../util/db');
const {
rollup, wallet,
} = require('../../util/contracts');
const read = require('../../util/forum/read');
const { availabilityStakeDuration } = require('./config');
const {
stakeRollupAvailability, getBatchPostAuthorWeights, authorsMatch, validatePost,
} = require('./utils');
const computeMatrixPoolResult = require('./compute-matrix-pool-result');
const { initializeBatchItems } = require('./batch-items');
const submitRollup = require('./submit-rollup');
const { getCurrentBatchWorker, initializeBatchWorker } = require('./batch-worker');
const { initiateMatrixPool } = require('./matrix-pools/initiate');
let batchItems;
const start = async () => {
console.log('registering validation pool decider for rollup');
registerDecider(async (pool, post) => {
// If this is not sent by the work1 contract, it's not of interest here.
if (pool.sender !== rollup.target) return false;
// A rollup post should contain
// - a list of off-chain validation pools
// - authorship corresponding to the result of those off-chain pools
if (!post.embeddedData?.batchItems) return false;
// Our task here is to check whether the posted result agrees with our own computations
let expectedAuthors;
try {
expectedAuthors = await getBatchPostAuthorWeights(post.embeddedData.batchItems);
} catch (e) {
console.error('Error calculating batch post author weights', e);
return false;
}
const valid = authorsMatch(post.authors, expectedAuthors);
console.log(`batch post ${pool.props.postId} is ${valid ? 'valid' : 'invalid'}`);
return valid;
});
// Even if we're not the current batch worker, keep track of batch items
initializeBatchItems();
try {
batchItems = await applicationData.get('batchItems');
} catch (e) {
batchItems = [];
}
// Check for an assigned batch worker
await initializeBatchWorker();
// Stake availability and set an interval to maintain it
await stakeRollupAvailability();
setInterval(stakeRollupAvailability, availabilityStakeDuration * 1000);
/// `sender` is the address that called Rollup.addItem on chain, i.e. the Work2 contract.
rollup.on('BatchItemAdded', async (postId, sender, fee) => {
// If we are the batch worker or there is no batch worker, initiate a matrix pool
const batchWorker = getCurrentBatchWorker();
if (batchWorker === await wallet.getAddress()
|| batchWorker === '0x0000000000000000000000000000000000000000') {
let post;
try {
post = await read(postId);
} catch (e) {
console.error(`Post ID ${postId} not found`);
return;
}
// Initialize a matrix pool
try {
await matrixPools.get(postId);
// If this doesn't throw, it means we or someone else already sent this event
console.log(`Matrix pool start event has already been sent for postId ${postId}`);
} catch (e) {
if (e.status === 404) {
await initiateMatrixPool(postId, post, sender, fee);
} else {
throw e;
}
}
}
});
registerMatrixEventHandler(async (client, roomId, event) => {
switch (event.type) {
case 'io.dgov.pool.start': {
// Note that matrix pools are identified by the postId to which they pertain.
// This means that for a given post there can only be one matrix pool at a time.
const { postId, sender, ...params } = event.content;
// We can use LevelDB to store information about validation pools
const eventId = event.event_id;
console.log('Matrix pool started', { postId, ...params });
// Validate the target post, and stake for/against
let post;
try {
post = await read(postId);
} catch (e) {
console.error(`Post ID ${postId} not found`);
break;
}
await validatePost({
sender, post, postId, roomId, eventId, ...params,
});
break;
}
case 'io.dgov.pool.stake': {
const { postId, amount, inFavor } = event.content;
let account;
try {
account = await matrixUserToAuthorAddress(event.sender);
} catch (e) {
// Error, sender has not registered their matrix identity
sendMatrixText(`Matrix user ${event.sender} has not registered their wallet address`);
break;
}
let matrixPool;
try {
matrixPool = await matrixPools.get(postId);
} catch (e) {
// Error. matrix pool not found
sendMatrixText(`Received stake for unknown matrix pool, for post ${postId}. Stake sent by ${event.sender}`);
break;
}
const stake = { account, amount, inFavor };
matrixPool.stakes = matrixPool.stakes ?? [];
matrixPool.stakes.push(stake);
await matrixPools.put(postId, matrixPool);
console.log(`registered stake in matrix pool for post ${postId} by ${account}`);
break;
}
case 'io.dgov.pool.result': {
// This should be sent by the current batch worker
// const { stakedFor, stakedAgainst, totalSupply, votePasses, quorumMet, } = result;
const { postId, result } = event.content;
let matrixPool;
try {
matrixPool = await matrixPools.get(postId);
} catch (e) {
// Error. matrix pool not found
sendMatrixText(`Received result for unknown matrix pool, for post ${postId}. Result sent by ${event.sender}`);
break;
}
// Compare batch worker's result with ours to verify and provide early warning
const expectedResult = await computeMatrixPoolResult(matrixPool);
if (!isEqual(result, expectedResult)) {
sendMatrixText(`Unexpected result for matrix pool, for post ${postId}. Result sent by ${event.sender}\n\n`
+ `received ${JSON.stringify(result)}\n`
+ `expected ${JSON.stringify(expectedResult)}`);
}
matrixPool.result = result;
await matrixPools.put(postId, matrixPool);
batchItems.push(postId);
await applicationData.put('batchItems', batchItems);
break;
}
case 'io.dgov.rollup.submit': {
// This should include the identifier of the on-chain validation pool
const { batchPostId, batchItems: batchItems_, authors } = event.content;
let batchPostIds;
try {
batchPostIds = await applicationData.get('batchPostIds');
} catch (e) {
batchPostIds = [];
}
batchPostIds.push(batchPostId);
await applicationData.put('batchPostIds', batchPostIds);
// Compare batch worker's result with ours to verify
const expectedAuthors = await getBatchPostAuthorWeights(batchItems_);
if (!authorsMatch(authors, expectedAuthors)) {
sendMatrixText(`Unexpected result for batch post ${batchPostId}`);
}
// Reset batchItems in preparation for next batch
const nextBatchItems = batchItems.filter((postId) => !batchPostIds.includes(postId));
batchItems = nextBatchItems;
await applicationData.put('batchItems', batchItems);
break;
}
default:
}
});
};
module.exports = {
start,
submitRollup,
};

View File

@ -0,0 +1,44 @@
const { sendMatrixEvent } = require('../../../matrix-bot');
const { wallet } = require('../../../util/contracts');
const { matrixPools } = require('../../../util/db');
const { addBatchItem, getBatchItems } = require('../batch-items');
const { getCurrentBatchWorker } = require('../batch-worker');
const computeMatrixPoolResult = require('../compute-matrix-pool-result');
const { rollupBatchSize } = require('../config');
const submitRollup = require('../submit-rollup');
const { stakeRollupAvailability } = require('../utils');
const evaluateMatrixPoolOutcome = async (postId) => {
const matrixPool = await matrixPools.get(postId);
const result = await computeMatrixPoolResult(matrixPool);
console.log(`Matrix pool for post ${postId} outcome evaluated`, result);
matrixPool.result = result;
await matrixPools.put(postId, matrixPool);
sendMatrixEvent('io.dgov.pool.result', { postId, result });
await addBatchItem(postId);
let submitBatch = false;
const batchWorker = getCurrentBatchWorker();
if (batchWorker === '0x0000000000000000000000000000000000000000') {
// If there's no batch worker, we should stake our availability
// and then submit the batch immediately.
console.log('There is no batch worker assigned. Staking availability and submitting first batch.');
submitBatch = true;
} else if (batchWorker === await wallet.getAddress()) {
// If we are the batch worker, we should wait an appropriate amout of time /
// number of matrix pools before submitting a batch.
const batchItems = getBatchItems();
if (batchItems.length === rollupBatchSize) {
console.log(`Batch size = ${batchItems.length}. Submitting batch.`);
submitBatch = true;
}
}
if (submitBatch) {
await stakeRollupAvailability();
await submitRollup();
}
return result;
};
module.exports = evaluateMatrixPoolOutcome;

View File

@ -0,0 +1,40 @@
const { sendMatrixEvent } = require('../../../matrix-bot');
const { validatePost } = require('../utils');
const evaluateMatrixPoolOutcome = require('./evaluate');
const initiateMatrixPool = async (postId, post, sender, fee) => {
const duration = 20;
const quorum = [1, 3];
const winRatio = [1, 2];
const params = {
sender,
fee: Number(fee),
duration,
quorum,
winRatio,
};
console.log('sending matrix pool start event');
const { roomId, eventId } = await sendMatrixEvent('io.dgov.pool.start', {
postId,
...params,
});
console.log('sent matrix pool start event');
// Register our own stake and send a message
await validatePost({
sender, post, postId, roomId, eventId, ...params,
});
// Since we're assuming responsibility as the batch worker,
// set a timeout to evaulate the outcome
setTimeout(
() => {
evaluateMatrixPoolOutcome(postId);
},
duration * 1000,
);
};
module.exports = {
initiateMatrixPool,
};

View File

@ -0,0 +1,51 @@
const { v4: uuidv4 } = require('uuid');
const write = require('../../util/forum/write');
const addPostWithRetry = require('../../util/add-post-with-retry');
const callWithRetry = require('../../util/call-with-retry');
const { getBatchItems, clearBatchItems } = require('./batch-items');
const computeAuthorWeights = require('./compute-author-weights');
const { wallet, rollup } = require('../../util/contracts');
const { sendMatrixEvent } = require('../../matrix-bot');
const { stakeRollupAvailability } = require('./utils');
const submitRollup = async () => {
const batchItems = getBatchItems();
if (!batchItems.length) {
return { batchItems: [] };
}
const authors = await computeAuthorWeights(batchItems);
// TODO: Compute citations as aggregate of the citations of posts in the batch
const citations = [];
const content = `Batch of ${batchItems.length} items`;
const embeddedData = {
batchItems,
nonce: uuidv4().replaceAll('-', ''),
};
const sender = await wallet.getAddress();
const contentToVerify = `${content}\n\n${JSON.stringify(embeddedData, null, 2)}`;
const signature = await wallet.signMessage(contentToVerify);
// Write to the forum database
const { hash: batchPostId } = await write({
sender, authors, citations, content, embeddedData, signature,
});
// Add rollup post on-chain
await addPostWithRetry(authors, batchPostId, citations);
// Stake our availability to be the next rollup worker
await stakeRollupAvailability();
// Call Rollup.submitBatch
console.log('Submitting batch', { batchPostId, batchItems, authors });
const poolDuration = 60;
await callWithRetry(() => rollup.submitBatch(batchPostId, batchItems, poolDuration));
// Send matrix event
await sendMatrixEvent('io.dgov.rollup.submit', { batchPostId, batchItems, authors });
// Clear the batch in preparation for next batch
await clearBatchItems(batchItems);
return {
batchPostId,
batchItems,
authors,
};
};
module.exports = submitRollup;

View File

@ -0,0 +1,62 @@
const { sendMatrixEvent } = require('../../matrix-bot');
const callWithRetry = require('../../util/call-with-retry');
const {
rollup, wallet, dao,
work2,
} = require('../../util/contracts');
const { matrixPools } = require('../../util/db');
const { availabilityStakeDuration } = require('./config');
const stakeRollupAvailability = async () => {
const currentRep = await dao.balanceOf(await wallet.getAddress());
if (currentRep) {
await callWithRetry(() => dao.stakeAvailability(
rollup.target,
currentRep,
availabilityStakeDuration,
));
}
};
const authorsMatch = async (authors, expectedAuthors) => {
if (expectedAuthors.length !== authors.length) return false;
return authors.every(({ authorAddress, weightPPM }) => {
const expectedAuthor = expectedAuthors.find((x) => x.authorAddress === authorAddress);
return weightPPM === expectedAuthor.weightPPM;
});
};
const validateWorkEvidence = async (sender, post) => {
let valid = false;
if (sender === work2.target) {
const expectedContent = 'This is a work evidence post';
valid = post.content.startsWith(expectedContent);
}
console.log(`Work evidence ${valid ? 'matched' : 'did not match'} the expected content`);
return valid;
};
const validatePost = async ({
sender, post, postId, roomId, eventId, ...params
}) => {
const currentRep = Number(await dao.balanceOf(await wallet.getAddress()));
const valid = await validateWorkEvidence(sender, post);
const stake = { amount: currentRep, account: await wallet.getAddress(), inFavor: valid };
sendMatrixEvent('io.dgov.pool.stake', { postId, amount: currentRep, inFavor: valid });
const matrixPool = {
postId,
roomId,
eventId,
...params,
stakes: [stake],
};
console.log('matrixPool', matrixPool);
await matrixPools.put(postId, matrixPool);
};
module.exports = {
stakeRollupAvailability,
authorsMatch,
validateWorkEvidence,
validatePost,
};

View File

@ -23,7 +23,7 @@ const matrixClient = new MatrixClient(
);
let joinedRooms;
const { startOutboundQueue, sendMatrixEvent, sendMatrixText } = require('./outbound-queue');
const { initializeOutboundQueue, sendMatrixEvent, sendMatrixText } = require('./outbound-queue');
const start = async () => {
// Automatically join a room to which we are invited
@ -35,7 +35,7 @@ const start = async () => {
matrixClient.start().then(() => {
console.log('Matrix bot started!');
// Start the outbound queue
startOutboundQueue(matrixClient);
initializeOutboundQueue(matrixClient);
});
};

View File

@ -5,17 +5,7 @@ const { applicationData } = require('../util/db');
let matrixClient;
let targetRoomId;
const setTargetRoomId = async (roomId) => {
targetRoomId = roomId;
console.log('target room ID:', targetRoomId);
await applicationData.put('targetRoomId', targetRoomId);
};
const processOutboundQueue = async ({ type, ...args }) => {
if (!targetRoomId) {
console.log('targetRoomId not set, cannot deliver message');
return;
}
switch (type) {
case 'MatrixEvent': {
const { eventType, content, onSend } = args;
@ -34,10 +24,19 @@ const processOutboundQueue = async ({ type, ...args }) => {
};
const outboundQueue = fastq(processOutboundQueue, 1);
// Pause until matrixClient is set
// Pause outbound queue until matrixClient and targetRoomId are set
outboundQueue.pause();
const startOutboundQueue = async (matrixClient_) => {
const setTargetRoomId = async (roomId) => {
targetRoomId = roomId;
console.log('target room ID:', targetRoomId);
await applicationData.put('targetRoomId', targetRoomId);
if (matrixClient) {
outboundQueue.resume();
}
};
const initializeOutboundQueue = async (matrixClient_) => {
matrixClient = matrixClient_;
try {
targetRoomId = await applicationData.get('targetRoomId');
@ -46,7 +45,9 @@ const startOutboundQueue = async (matrixClient_) => {
// No target room set
console.warn('target room ID is not set -- will not be able to send messages until it is set. Use !target <bot-id>');
}
outboundQueue.resume();
if (targetRoomId) {
outboundQueue.resume();
}
};
const sendMatrixEvent = async (eventType, content) => new Promise((resolve) => {
@ -73,7 +74,7 @@ const sendMatrixText = async (text) => new Promise((resolve) => {
module.exports = {
setTargetRoomId,
outboundQueue,
startOutboundQueue,
initializeOutboundQueue,
sendMatrixEvent,
sendMatrixText,
};

View File

@ -1,4 +1,4 @@
const callWithRetry = require('./call-contract-method-with-retry');
const callWithRetry = require('./call-with-retry');
const { dao } = require('./contracts');
const addPostWithRetry = async (authors, hash, citations) => {