diff --git a/backend/package-lock.json b/backend/package-lock.json index b9e7180..bd302e2 100644 --- a/backend/package-lock.json +++ b/backend/package-lock.json @@ -18,6 +18,7 @@ "express-async-errors": "^3.1.1", "fastq": "^1.17.1", "level": "^8.0.1", + "lodash": "^4.17.21", "matrix-bot-sdk": "^0.7.1", "object-hash": "^3.0.0", "uuid": "^9.0.1" diff --git a/backend/package.json b/backend/package.json index 0d6f70d..fbc47f9 100644 --- a/backend/package.json +++ b/backend/package.json @@ -19,6 +19,7 @@ "express-async-errors": "^3.1.1", "fastq": "^1.17.1", "level": "^8.0.1", + "lodash": "^4.17.21", "matrix-bot-sdk": "^0.7.1", "object-hash": "^3.0.0", "uuid": "^9.0.1" diff --git a/backend/src/event-handlers/rollup.js b/backend/src/event-handlers/rollup.js index a67b1c7..9197803 100644 --- a/backend/src/event-handlers/rollup.js +++ b/backend/src/event-handlers/rollup.js @@ -1,7 +1,7 @@ const Promise = require('bluebird'); const { v4: uuidv4 } = require('uuid'); +const { isEqual } = require('lodash'); -const { getContractAddressByNetworkName } = require('../util/contract-config'); const { registerDecider } = require('./validation-pools'); const { registerMatrixEventHandler, sendMatrixEvent, sendMatrixText } = require('../matrix-bot'); const { matrixPools, matrixUserToAuthorAddress, applicationData } = require('../util/db'); @@ -13,16 +13,29 @@ const write = require('../util/forum/write'); const addPostWithRetry = require('../util/add-post-with-retry'); const { - ETH_NETWORK, + ROLLUP_BATCH_SIZE, + ROLLUP_AVAILABILITY_STAKE_DURATION, } = process.env; +const rollupBatchSize = ROLLUP_BATCH_SIZE || 1; +const availabilityStakeDuration = ROLLUP_AVAILABILITY_STAKE_DURATION || 600; + let batchWorker; +let batchItems; -const rollupAddress = getContractAddressByNetworkName(ETH_NETWORK, 'Rollup'); +const stakeRollupAvailability = async () => { + const currentRep = await dao.balanceOf(await wallet.getAddress()); + await dao.stakeAvailability(rollup.target, currentRep, availabilityStakeDuration); +}; -const getBatchPostAuthorWeights = async (batchItems) => { +const extendRollupAvailability = async () => { + const currentRep = await dao.balanceOf(await wallet.getAddress()); + await dao.stakeAvailability(rollup.target, currentRep, availabilityStakeDuration); +}; + +const getBatchPostAuthorWeights = async (batchItems_) => { const weights = {}; - await Promise.each(batchItems, async (postId) => { + await Promise.each(batchItems_, async (postId) => { // TODO: try/catch const post = await read(postId); // TODO: try/catch @@ -49,7 +62,7 @@ const getBatchPostAuthorWeights = async (batchItems) => { }); }; -const submitBatchPost = async (batchItems) => { +const submitBatchPost = async () => { const authors = await getBatchPostAuthorWeights(batchItems); // TODO: Compute citations as aggregate of the citations of posts in the batch const citations = []; @@ -67,12 +80,19 @@ const submitBatchPost = async (batchItems) => { }); // Add rollup post on-chain await addPostWithRetry(authors, batchPostId, citations); + // Stake our availability to be the next rollup worker + await stakeRollupAvailability(); // Call Rollup.submitBatch const poolDuration = 60; await rollup.submitBatch(batchPostId, batchItems.length, poolDuration); + // Send matrix event + await sendMatrixEvent('io.dgov.rollup.submit', { batchPostId, batchItems, authors }); + // Clear the batch in preparation for next batch + batchItems = []; + return batchPostId; }; -const evaluateMatrixPoolOutcome = async (postId) => { +const evaluateMatrixPoolOutcome = async (postId, { dryRun } = {}) => { const matrixPool = await matrixPools.get(postId); // This should already contain all the info we need to evaluate the outcome const { stakes, quorum, winRatio } = matrixPool; @@ -88,29 +108,43 @@ const evaluateMatrixPoolOutcome = async (postId) => { const result = { stakedFor, stakedAgainst, totalSupply, votePasses, quorumMet, }; - console.log('Matrix pool outcome evaluated', result); - matrixPool.result = result; - await matrixPools.put(postId, matrixPool); - sendMatrixEvent('io.dgov.pool.result', result); + if (!dryRun) { + console.log(`Matrix pool for post ${postId} outcome evaluated`, result); + matrixPool.result = result; + await matrixPools.put(postId, matrixPool); + sendMatrixEvent('io.dgov.pool.result', { postId, result }); - // Even if we're not the current batch worker, keep track of batch items - let batchItems; - try { - batchItems = await applicationData.get('batchItems'); - } catch (e) { - batchItems = []; - } - batchItems.push(postId); - await applicationData.put('batchItems', batchItems); + batchItems.push(postId); + await applicationData.put('batchItems', batchItems); - if (batchWorker === '0x0000000000000000000000000000000000000000') { - // TODO: If there's no batch worker, we should stake our availability - // and then submit the batch immediately. - await submitBatchPost(batchItems); - } else if (batchWorker === await wallet.getAddress()) { - // TODO: If we are the batch worker, we should wait an appropriate amout of time / - // number of matrix pools before submitting a batch. + let submitBatch = false; + if (batchWorker === '0x0000000000000000000000000000000000000000') { + // If there's no batch worker, we should stake our availability + // and then submit the batch immediately. + console.log('There is no batch worker assigned. Staking availability and submitting first batch.'); + submitBatch = true; + } else if (batchWorker === await wallet.getAddress()) { + // If we are the batch worker, we should wait an appropriate amout of time / + // number of matrix pools before submitting a batch. + if (batchItems.length === rollupBatchSize) { + console.log(`Batch size = ${batchItems.length}. Submitting batch.`); + submitBatch = true; + } + } + if (submitBatch) { + await stakeRollupAvailability(); + await submitBatchPost(); + } } + return result; +}; + +const authorsMatch = async (authors, expectedAuthors) => { + if (expectedAuthors.length !== authors.length) return false; + return authors.every(({ authorAddress, weightPPM }) => { + const expectedAuthor = expectedAuthors.find((x) => x.authorAddress === authorAddress); + return weightPPM === expectedAuthor.weightPPM; + }); }; const validateWorkEvidence = async (sender, post) => { @@ -127,7 +161,7 @@ const start = async () => { console.log('registering validation pool decider for rollup'); registerDecider(async (pool, post) => { // If this is not sent by the work1 contract, it's not of interest here. - if (pool.sender !== rollupAddress) return false; + if (pool.sender !== rollup.target) return false; // A rollup post should contain // - a list of off-chain validation pools @@ -136,20 +170,33 @@ const start = async () => { // Our task here is to check whether the posted result agrees with our own computations const expectedAuthors = await getBatchPostAuthorWeights(post.embeddedData.matrixPools); - if (expectedAuthors.length !== post.authors.length) return false; - return post.authors.every(({ authorAddress, weightPPM }) => { - const expectedAuthor = expectedAuthors.find((x) => x.authorAddress === authorAddress); - return weightPPM === expectedAuthor.weightPPM; - }); + return authorsMatch(post.authors, expectedAuthors); }); + // Even if we're not the current batch worker, keep track of batch items + try { + batchItems = await applicationData.get('batchItems'); + } catch (e) { + batchItems = []; + } + // Check for an assigned batch worker batchWorker = await rollup.batchWorker(); console.log('At startup', { batchWorker }); + // Stake availability and + await stakeRollupAvailability(); + // Set an interval to maintain it + setInterval(() => { + + }); + rollup.on('BatchWorkerAssigned', async (batchWorker_) => { console.log('Batch worker assigned:', batchWorker); batchWorker = batchWorker_; + if (batchWorker === await wallet.getAddress()) { + console.log('This instance is the new batch worker'); + } }); /// `sender` is the address that called Rollup.addItem on chain, i.e. the Work2 contract. @@ -241,15 +288,15 @@ const start = async () => { } catch (e) { // Error, sender has not registered their matrix identity sendMatrixText(`Matrix user ${event.sender} has not registered their wallet address`); - return; + break; } let matrixPool; try { matrixPool = await matrixPools.get(postId); } catch (e) { // Error. matrix pool not found - sendMatrixText(`Received stake for unknown matrix pool, for post ${postId}`); - return; + sendMatrixText(`Received stake for unknown matrix pool, for post ${postId}. Stake sent by ${event.sender}`); + break; } const stake = { account, amount, inFavor }; matrixPool.stakes = matrixPool.stakes ?? []; @@ -260,32 +307,48 @@ const start = async () => { } case 'io.dgov.pool.result': { // This should be sent by the current batch worker - // TODO: Compare batch worker's result with ours to verify and provide early warning + // const { stakedFor, stakedAgainst, totalSupply, votePasses, quorumMet, } = result; + const { postId, result } = event.content; + let matrixPool; + try { + matrixPool = await matrixPools.get(postId); + } catch (e) { + // Error. matrix pool not found + sendMatrixText(`Received result for unknown matrix pool, for post ${postId}. Result sent by ${event.sender}`); + break; + } + // Compare batch worker's result with ours to verify and provide early warning + const expectedResult = await evaluateMatrixPoolOutcome(postId, { dryRun: true }); + if (!isEqual(result, expectedResult)) { + sendMatrixText(`Unexpected result for matrix pool, for post ${postId}. Result sent by ${event.sender}\n\n` + + `received ${JSON.stringify(result)}\n` + + `expected ${JSON.stringify(expectedResult)}`); + } + matrixPool.result = result; + await matrixPools.put(postId, matrixPool); break; } case 'io.dgov.rollup.submit': { // This should include the identifier of the on-chain validation pool - // TODO: Compare batch worker's result with ours to verify + const { batchPostId, batchItems: batchItems_, authors } = event.content; + let batchPostIds; + try { + batchPostIds = await applicationData.get('batchPostIds'); + } catch (e) { + batchPostIds = []; + } + batchPostIds.push(batchPostId); + await applicationData.put('batchPostIds', batchPostIds); + // Compare batch worker's result with ours to verify + const expectedAuthors = await getBatchPostAuthorWeights(batchItems_); + if (!authorsMatch(authors, expectedAuthors)) { + sendMatrixText(`Unexpected result for batch post ${batchPostId}`); + } break; } default: } }); - - // Put in our availability stake to do the rollup work - // Then check if there is a rollup worker. - // It's possible we're the first or only worker; - // if there is no worker assigned yet, we can submit the first one. - // The procedure will be the same every time we are the rollup worker: - // - Stake availability to be the next rollup worker; - // - Create a rollup post -- compute authorship based on matrix pool results - // - Send matrix event 'io.dgov.rollup.submit'? May not be necessary, - // as there will be notification from the contract itself. - // However, it would be nice to have in the tiemline of the room, - // and it could be nice to give the participants a heads up to expect the batch. - // We could even require it as part of batch validation. - // - Call DAO.addPost(authors, postId) - // - Call Rollup.submitBatch(postId, batchSize, poolDuration) }; module.exports = { diff --git a/ethereum/contracts/Availability.sol b/ethereum/contracts/Availability.sol index 51a98ef..fb04ded 100644 --- a/ethereum/contracts/Availability.sol +++ b/ethereum/contracts/Availability.sol @@ -13,6 +13,7 @@ contract Availability is IAcceptAvailability, DAOContract { } mapping(uint => AvailabilityStake) public stakes; + mapping(address worker => uint stakeIndex) activeWorkerStakes; uint public stakeCount; event AvailabilityStaked(uint stakeIndex); @@ -21,7 +22,7 @@ contract Availability is IAcceptAvailability, DAOContract { /// Accept availability stakes as reputation token transfer function acceptAvailability( - address sender, + address worker, uint256 amount, uint duration ) external { @@ -30,29 +31,24 @@ contract Availability is IAcceptAvailability, DAOContract { "acceptAvailability must only be called by DAO contract" ); require(amount > 0, "No stake provided"); - uint stakeIndex = stakeCount++; + // If we already have a stake for this worker, replace it + uint stakeIndex = activeWorkerStakes[worker]; + if (stakeIndex == 0 && stakes[stakeIndex].worker != worker) { + // We don't have an existing stake for this worker + stakeIndex = stakeCount++; + activeWorkerStakes[worker] = stakeIndex; + } else if (stakes[stakeIndex].assigned) { + // Stake has already been assigned; We need to create a new one + stakeIndex = stakeCount++; + activeWorkerStakes[worker] = stakeIndex; + } AvailabilityStake storage stake = stakes[stakeIndex]; - stake.worker = sender; + stake.worker = worker; stake.amount = amount; stake.endTime = block.timestamp + duration; emit AvailabilityStaked(stakeIndex); } - function extendAvailability(uint stakeIndex, uint duration) external { - AvailabilityStake storage stake = stakes[stakeIndex]; - require( - msg.sender == stake.worker, - "Worker can only extend their own availability stake" - ); - require(!stake.assigned, "Stake has already been assigned work"); - if (block.timestamp > stake.endTime) { - stake.endTime = block.timestamp + duration; - } else { - stake.endTime = stake.endTime + duration; - } - emit AvailabilityStaked(stakeIndex); - } - /// Select a worker randomly from among the available workers, weighted by amount staked function randomWeightedSelection() internal view returns (uint stakeIndex) { uint totalStakes; diff --git a/ethereum/contracts/core/DAO.sol b/ethereum/contracts/core/DAO.sol index 4906651..bca74ec 100644 --- a/ethereum/contracts/core/DAO.sol +++ b/ethereum/contracts/core/DAO.sol @@ -124,7 +124,7 @@ contract DAO { uint256 value, uint duration ) external returns (bool) { - rep.approve(msg.sender, to, rep.allowance(msg.sender, to) + value); + rep.approve(msg.sender, to, value); IAcceptAvailability(to).acceptAvailability(msg.sender, value, duration); return true; } diff --git a/ethereum/test/Work1.js b/ethereum/test/Work1.js index bb00bd2..38cc2dc 100644 --- a/ethereum/test/Work1.js +++ b/ethereum/test/Work1.js @@ -54,12 +54,9 @@ describe('Work1', () => { let dao; let work1; let account1; - let account2; beforeEach(async () => { - ({ - dao, work1, account1, account2, - } = await loadFixture(deploy)); + ({ dao, work1, account1 } = await loadFixture(deploy)); await expect(dao.stakeAvailability(work1.target, 50, STAKE_DURATION)).to.emit(work1, 'AvailabilityStaked').withArgs(0); }); @@ -84,30 +81,33 @@ describe('Work1', () => { it('should be able to extend the duration of an availability stake before it expires', async () => { await time.increase(STAKE_DURATION / 2); - await expect(work1.extendAvailability(0, STAKE_DURATION)).to.emit(work1, 'AvailabilityStaked').withArgs(0); + expect(await work1.stakeCount()).to.equal(1); + await expect(dao.stakeAvailability(work1.target, 50, STAKE_DURATION)).to.emit(work1, 'AvailabilityStaked').withArgs(0); + expect(await work1.stakeCount()).to.equal(1); }); it('should be able to extend the duration of an availability stake after it expires', async () => { await time.increase(STAKE_DURATION * 2); - await work1.extendAvailability(0, STAKE_DURATION); + expect(await work1.stakeCount()).to.equal(1); + await dao.stakeAvailability(work1.target, 50, STAKE_DURATION); + expect(await work1.stakeCount()).to.equal(1); }); - it('should not be able to extend the duration of another worker\'s availability stake', async () => { - await time.increase(STAKE_DURATION * 2); - await expect(work1.connect(account2).extendAvailability(0, STAKE_DURATION)).to.be.revertedWith('Worker can only extend their own availability stake'); - }); - - it('extending a stake before expiration should increase the end time by the given duration', async () => { + it('extending a stake before expiration should reset the end time to the new duration from the present', async () => { await time.increase(STAKE_DURATION / 2); - await work1.extendAvailability(0, STAKE_DURATION * 2); - const expectedEndTime = await time.latest() + 2.5 * STAKE_DURATION; + expect(await work1.stakeCount()).to.equal(1); + await dao.stakeAvailability(work1.target, 50, STAKE_DURATION * 2); + expect(await work1.stakeCount()).to.equal(1); + const expectedEndTime = await time.latest() + 2 * STAKE_DURATION; const stake = await work1.stakes(0); expect(stake.endTime).to.be.within(expectedEndTime - 1, expectedEndTime); }); it('extending a stake after expiration should restart the stake for the given duration', async () => { await time.increase(STAKE_DURATION * 2); - await work1.extendAvailability(0, STAKE_DURATION * 2); + expect(await work1.stakeCount()).to.equal(1); + await dao.stakeAvailability(work1.target, 50, STAKE_DURATION * 2); + expect(await work1.stakeCount()).to.equal(1); const expectedEndTime = await time.latest() + STAKE_DURATION * 2; const stake = await work1.stakes(0); expect(stake.endTime).to.be.within(expectedEndTime - 1, expectedEndTime); @@ -164,14 +164,16 @@ describe('Work1', () => { await expect(requestWork()).to.be.revertedWith('No available worker stakes'); }); - it('should not be able to extend a stake that has been assigned work', async () => { + it('after a stake has been assigned work, staking again should create a new stake', async () => { const { dao, work1, account2, } = await loadFixture(deploy); await dao.stakeAvailability(work1.target, 50, STAKE_DURATION); await work1.connect(account2).requestWork('req-content-id', { value: WORK1_PRICE }); await time.increase(STAKE_DURATION * 2); - await expect(work1.extendAvailability(0, STAKE_DURATION)).to.be.revertedWith('Stake has already been assigned work'); + expect(await work1.stakeCount()).to.equal(1); + await dao.stakeAvailability(work1.target, 50, STAKE_DURATION); + expect(await work1.stakeCount()).to.equal(2); }); }); diff --git a/frontend/src/components/work-contracts/AvailabilityStakes.jsx b/frontend/src/components/work-contracts/AvailabilityStakes.jsx index ffc7c19..823e05c 100644 --- a/frontend/src/components/work-contracts/AvailabilityStakes.jsx +++ b/frontend/src/components/work-contracts/AvailabilityStakes.jsx @@ -74,13 +74,6 @@ function AvailabilityStakes({ setReputation(reputation / BigInt(2)); }, [DAO, workContract, account, reputation, setReputation]); - const extendAvailabilityStake = useCallback(async (stakeIndex, duration) => { - await workContract.methods.extendAvailability(stakeIndex, duration).send({ - from: account, - gas: 999999, - }); - }, [workContract, account]); - const displayData = availabilityStakes.filter((stake) => { if (!stake) return false; if (!onlyShowAvailable) return true; @@ -118,7 +111,6 @@ function AvailabilityStakes({ {showAmount &&