rework availability contract to keep one active stake per worker

This commit is contained in:
Ladd Hoffman 2024-05-02 13:44:37 -05:00
parent 2790c9262b
commit c16db92e27
7 changed files with 152 additions and 111 deletions

View File

@ -18,6 +18,7 @@
"express-async-errors": "^3.1.1",
"fastq": "^1.17.1",
"level": "^8.0.1",
"lodash": "^4.17.21",
"matrix-bot-sdk": "^0.7.1",
"object-hash": "^3.0.0",
"uuid": "^9.0.1"

View File

@ -19,6 +19,7 @@
"express-async-errors": "^3.1.1",
"fastq": "^1.17.1",
"level": "^8.0.1",
"lodash": "^4.17.21",
"matrix-bot-sdk": "^0.7.1",
"object-hash": "^3.0.0",
"uuid": "^9.0.1"

View File

@ -1,7 +1,7 @@
const Promise = require('bluebird');
const { v4: uuidv4 } = require('uuid');
const { isEqual } = require('lodash');
const { getContractAddressByNetworkName } = require('../util/contract-config');
const { registerDecider } = require('./validation-pools');
const { registerMatrixEventHandler, sendMatrixEvent, sendMatrixText } = require('../matrix-bot');
const { matrixPools, matrixUserToAuthorAddress, applicationData } = require('../util/db');
@ -13,16 +13,29 @@ const write = require('../util/forum/write');
const addPostWithRetry = require('../util/add-post-with-retry');
const {
ETH_NETWORK,
ROLLUP_BATCH_SIZE,
ROLLUP_AVAILABILITY_STAKE_DURATION,
} = process.env;
const rollupBatchSize = ROLLUP_BATCH_SIZE || 1;
const availabilityStakeDuration = ROLLUP_AVAILABILITY_STAKE_DURATION || 600;
let batchWorker;
let batchItems;
const rollupAddress = getContractAddressByNetworkName(ETH_NETWORK, 'Rollup');
const stakeRollupAvailability = async () => {
const currentRep = await dao.balanceOf(await wallet.getAddress());
await dao.stakeAvailability(rollup.target, currentRep, availabilityStakeDuration);
};
const getBatchPostAuthorWeights = async (batchItems) => {
const extendRollupAvailability = async () => {
const currentRep = await dao.balanceOf(await wallet.getAddress());
await dao.stakeAvailability(rollup.target, currentRep, availabilityStakeDuration);
};
const getBatchPostAuthorWeights = async (batchItems_) => {
const weights = {};
await Promise.each(batchItems, async (postId) => {
await Promise.each(batchItems_, async (postId) => {
// TODO: try/catch
const post = await read(postId);
// TODO: try/catch
@ -49,7 +62,7 @@ const getBatchPostAuthorWeights = async (batchItems) => {
});
};
const submitBatchPost = async (batchItems) => {
const submitBatchPost = async () => {
const authors = await getBatchPostAuthorWeights(batchItems);
// TODO: Compute citations as aggregate of the citations of posts in the batch
const citations = [];
@ -67,12 +80,19 @@ const submitBatchPost = async (batchItems) => {
});
// Add rollup post on-chain
await addPostWithRetry(authors, batchPostId, citations);
// Stake our availability to be the next rollup worker
await stakeRollupAvailability();
// Call Rollup.submitBatch
const poolDuration = 60;
await rollup.submitBatch(batchPostId, batchItems.length, poolDuration);
// Send matrix event
await sendMatrixEvent('io.dgov.rollup.submit', { batchPostId, batchItems, authors });
// Clear the batch in preparation for next batch
batchItems = [];
return batchPostId;
};
const evaluateMatrixPoolOutcome = async (postId) => {
const evaluateMatrixPoolOutcome = async (postId, { dryRun } = {}) => {
const matrixPool = await matrixPools.get(postId);
// This should already contain all the info we need to evaluate the outcome
const { stakes, quorum, winRatio } = matrixPool;
@ -88,29 +108,43 @@ const evaluateMatrixPoolOutcome = async (postId) => {
const result = {
stakedFor, stakedAgainst, totalSupply, votePasses, quorumMet,
};
console.log('Matrix pool outcome evaluated', result);
matrixPool.result = result;
await matrixPools.put(postId, matrixPool);
sendMatrixEvent('io.dgov.pool.result', result);
if (!dryRun) {
console.log(`Matrix pool for post ${postId} outcome evaluated`, result);
matrixPool.result = result;
await matrixPools.put(postId, matrixPool);
sendMatrixEvent('io.dgov.pool.result', { postId, result });
// Even if we're not the current batch worker, keep track of batch items
let batchItems;
try {
batchItems = await applicationData.get('batchItems');
} catch (e) {
batchItems = [];
}
batchItems.push(postId);
await applicationData.put('batchItems', batchItems);
batchItems.push(postId);
await applicationData.put('batchItems', batchItems);
if (batchWorker === '0x0000000000000000000000000000000000000000') {
// TODO: If there's no batch worker, we should stake our availability
// and then submit the batch immediately.
await submitBatchPost(batchItems);
} else if (batchWorker === await wallet.getAddress()) {
// TODO: If we are the batch worker, we should wait an appropriate amout of time /
// number of matrix pools before submitting a batch.
let submitBatch = false;
if (batchWorker === '0x0000000000000000000000000000000000000000') {
// If there's no batch worker, we should stake our availability
// and then submit the batch immediately.
console.log('There is no batch worker assigned. Staking availability and submitting first batch.');
submitBatch = true;
} else if (batchWorker === await wallet.getAddress()) {
// If we are the batch worker, we should wait an appropriate amout of time /
// number of matrix pools before submitting a batch.
if (batchItems.length === rollupBatchSize) {
console.log(`Batch size = ${batchItems.length}. Submitting batch.`);
submitBatch = true;
}
}
if (submitBatch) {
await stakeRollupAvailability();
await submitBatchPost();
}
}
return result;
};
const authorsMatch = async (authors, expectedAuthors) => {
if (expectedAuthors.length !== authors.length) return false;
return authors.every(({ authorAddress, weightPPM }) => {
const expectedAuthor = expectedAuthors.find((x) => x.authorAddress === authorAddress);
return weightPPM === expectedAuthor.weightPPM;
});
};
const validateWorkEvidence = async (sender, post) => {
@ -127,7 +161,7 @@ const start = async () => {
console.log('registering validation pool decider for rollup');
registerDecider(async (pool, post) => {
// If this is not sent by the work1 contract, it's not of interest here.
if (pool.sender !== rollupAddress) return false;
if (pool.sender !== rollup.target) return false;
// A rollup post should contain
// - a list of off-chain validation pools
@ -136,20 +170,33 @@ const start = async () => {
// Our task here is to check whether the posted result agrees with our own computations
const expectedAuthors = await getBatchPostAuthorWeights(post.embeddedData.matrixPools);
if (expectedAuthors.length !== post.authors.length) return false;
return post.authors.every(({ authorAddress, weightPPM }) => {
const expectedAuthor = expectedAuthors.find((x) => x.authorAddress === authorAddress);
return weightPPM === expectedAuthor.weightPPM;
});
return authorsMatch(post.authors, expectedAuthors);
});
// Even if we're not the current batch worker, keep track of batch items
try {
batchItems = await applicationData.get('batchItems');
} catch (e) {
batchItems = [];
}
// Check for an assigned batch worker
batchWorker = await rollup.batchWorker();
console.log('At startup', { batchWorker });
// Stake availability and
await stakeRollupAvailability();
// Set an interval to maintain it
setInterval(() => {
});
rollup.on('BatchWorkerAssigned', async (batchWorker_) => {
console.log('Batch worker assigned:', batchWorker);
batchWorker = batchWorker_;
if (batchWorker === await wallet.getAddress()) {
console.log('This instance is the new batch worker');
}
});
/// `sender` is the address that called Rollup.addItem on chain, i.e. the Work2 contract.
@ -241,15 +288,15 @@ const start = async () => {
} catch (e) {
// Error, sender has not registered their matrix identity
sendMatrixText(`Matrix user ${event.sender} has not registered their wallet address`);
return;
break;
}
let matrixPool;
try {
matrixPool = await matrixPools.get(postId);
} catch (e) {
// Error. matrix pool not found
sendMatrixText(`Received stake for unknown matrix pool, for post ${postId}`);
return;
sendMatrixText(`Received stake for unknown matrix pool, for post ${postId}. Stake sent by ${event.sender}`);
break;
}
const stake = { account, amount, inFavor };
matrixPool.stakes = matrixPool.stakes ?? [];
@ -260,32 +307,48 @@ const start = async () => {
}
case 'io.dgov.pool.result': {
// This should be sent by the current batch worker
// TODO: Compare batch worker's result with ours to verify and provide early warning
// const { stakedFor, stakedAgainst, totalSupply, votePasses, quorumMet, } = result;
const { postId, result } = event.content;
let matrixPool;
try {
matrixPool = await matrixPools.get(postId);
} catch (e) {
// Error. matrix pool not found
sendMatrixText(`Received result for unknown matrix pool, for post ${postId}. Result sent by ${event.sender}`);
break;
}
// Compare batch worker's result with ours to verify and provide early warning
const expectedResult = await evaluateMatrixPoolOutcome(postId, { dryRun: true });
if (!isEqual(result, expectedResult)) {
sendMatrixText(`Unexpected result for matrix pool, for post ${postId}. Result sent by ${event.sender}\n\n`
+ `received ${JSON.stringify(result)}\n`
+ `expected ${JSON.stringify(expectedResult)}`);
}
matrixPool.result = result;
await matrixPools.put(postId, matrixPool);
break;
}
case 'io.dgov.rollup.submit': {
// This should include the identifier of the on-chain validation pool
// TODO: Compare batch worker's result with ours to verify
const { batchPostId, batchItems: batchItems_, authors } = event.content;
let batchPostIds;
try {
batchPostIds = await applicationData.get('batchPostIds');
} catch (e) {
batchPostIds = [];
}
batchPostIds.push(batchPostId);
await applicationData.put('batchPostIds', batchPostIds);
// Compare batch worker's result with ours to verify
const expectedAuthors = await getBatchPostAuthorWeights(batchItems_);
if (!authorsMatch(authors, expectedAuthors)) {
sendMatrixText(`Unexpected result for batch post ${batchPostId}`);
}
break;
}
default:
}
});
// Put in our availability stake to do the rollup work
// Then check if there is a rollup worker.
// It's possible we're the first or only worker;
// if there is no worker assigned yet, we can submit the first one.
// The procedure will be the same every time we are the rollup worker:
// - Stake availability to be the next rollup worker;
// - Create a rollup post -- compute authorship based on matrix pool results
// - Send matrix event 'io.dgov.rollup.submit'? May not be necessary,
// as there will be notification from the contract itself.
// However, it would be nice to have in the tiemline of the room,
// and it could be nice to give the participants a heads up to expect the batch.
// We could even require it as part of batch validation.
// - Call DAO.addPost(authors, postId)
// - Call Rollup.submitBatch(postId, batchSize, poolDuration)
};
module.exports = {

View File

@ -13,6 +13,7 @@ contract Availability is IAcceptAvailability, DAOContract {
}
mapping(uint => AvailabilityStake) public stakes;
mapping(address worker => uint stakeIndex) activeWorkerStakes;
uint public stakeCount;
event AvailabilityStaked(uint stakeIndex);
@ -21,7 +22,7 @@ contract Availability is IAcceptAvailability, DAOContract {
/// Accept availability stakes as reputation token transfer
function acceptAvailability(
address sender,
address worker,
uint256 amount,
uint duration
) external {
@ -30,29 +31,24 @@ contract Availability is IAcceptAvailability, DAOContract {
"acceptAvailability must only be called by DAO contract"
);
require(amount > 0, "No stake provided");
uint stakeIndex = stakeCount++;
// If we already have a stake for this worker, replace it
uint stakeIndex = activeWorkerStakes[worker];
if (stakeIndex == 0 && stakes[stakeIndex].worker != worker) {
// We don't have an existing stake for this worker
stakeIndex = stakeCount++;
activeWorkerStakes[worker] = stakeIndex;
} else if (stakes[stakeIndex].assigned) {
// Stake has already been assigned; We need to create a new one
stakeIndex = stakeCount++;
activeWorkerStakes[worker] = stakeIndex;
}
AvailabilityStake storage stake = stakes[stakeIndex];
stake.worker = sender;
stake.worker = worker;
stake.amount = amount;
stake.endTime = block.timestamp + duration;
emit AvailabilityStaked(stakeIndex);
}
function extendAvailability(uint stakeIndex, uint duration) external {
AvailabilityStake storage stake = stakes[stakeIndex];
require(
msg.sender == stake.worker,
"Worker can only extend their own availability stake"
);
require(!stake.assigned, "Stake has already been assigned work");
if (block.timestamp > stake.endTime) {
stake.endTime = block.timestamp + duration;
} else {
stake.endTime = stake.endTime + duration;
}
emit AvailabilityStaked(stakeIndex);
}
/// Select a worker randomly from among the available workers, weighted by amount staked
function randomWeightedSelection() internal view returns (uint stakeIndex) {
uint totalStakes;

View File

@ -124,7 +124,7 @@ contract DAO {
uint256 value,
uint duration
) external returns (bool) {
rep.approve(msg.sender, to, rep.allowance(msg.sender, to) + value);
rep.approve(msg.sender, to, value);
IAcceptAvailability(to).acceptAvailability(msg.sender, value, duration);
return true;
}

View File

@ -54,12 +54,9 @@ describe('Work1', () => {
let dao;
let work1;
let account1;
let account2;
beforeEach(async () => {
({
dao, work1, account1, account2,
} = await loadFixture(deploy));
({ dao, work1, account1 } = await loadFixture(deploy));
await expect(dao.stakeAvailability(work1.target, 50, STAKE_DURATION)).to.emit(work1, 'AvailabilityStaked').withArgs(0);
});
@ -84,30 +81,33 @@ describe('Work1', () => {
it('should be able to extend the duration of an availability stake before it expires', async () => {
await time.increase(STAKE_DURATION / 2);
await expect(work1.extendAvailability(0, STAKE_DURATION)).to.emit(work1, 'AvailabilityStaked').withArgs(0);
expect(await work1.stakeCount()).to.equal(1);
await expect(dao.stakeAvailability(work1.target, 50, STAKE_DURATION)).to.emit(work1, 'AvailabilityStaked').withArgs(0);
expect(await work1.stakeCount()).to.equal(1);
});
it('should be able to extend the duration of an availability stake after it expires', async () => {
await time.increase(STAKE_DURATION * 2);
await work1.extendAvailability(0, STAKE_DURATION);
expect(await work1.stakeCount()).to.equal(1);
await dao.stakeAvailability(work1.target, 50, STAKE_DURATION);
expect(await work1.stakeCount()).to.equal(1);
});
it('should not be able to extend the duration of another worker\'s availability stake', async () => {
await time.increase(STAKE_DURATION * 2);
await expect(work1.connect(account2).extendAvailability(0, STAKE_DURATION)).to.be.revertedWith('Worker can only extend their own availability stake');
});
it('extending a stake before expiration should increase the end time by the given duration', async () => {
it('extending a stake before expiration should reset the end time to the new duration from the present', async () => {
await time.increase(STAKE_DURATION / 2);
await work1.extendAvailability(0, STAKE_DURATION * 2);
const expectedEndTime = await time.latest() + 2.5 * STAKE_DURATION;
expect(await work1.stakeCount()).to.equal(1);
await dao.stakeAvailability(work1.target, 50, STAKE_DURATION * 2);
expect(await work1.stakeCount()).to.equal(1);
const expectedEndTime = await time.latest() + 2 * STAKE_DURATION;
const stake = await work1.stakes(0);
expect(stake.endTime).to.be.within(expectedEndTime - 1, expectedEndTime);
});
it('extending a stake after expiration should restart the stake for the given duration', async () => {
await time.increase(STAKE_DURATION * 2);
await work1.extendAvailability(0, STAKE_DURATION * 2);
expect(await work1.stakeCount()).to.equal(1);
await dao.stakeAvailability(work1.target, 50, STAKE_DURATION * 2);
expect(await work1.stakeCount()).to.equal(1);
const expectedEndTime = await time.latest() + STAKE_DURATION * 2;
const stake = await work1.stakes(0);
expect(stake.endTime).to.be.within(expectedEndTime - 1, expectedEndTime);
@ -164,14 +164,16 @@ describe('Work1', () => {
await expect(requestWork()).to.be.revertedWith('No available worker stakes');
});
it('should not be able to extend a stake that has been assigned work', async () => {
it('after a stake has been assigned work, staking again should create a new stake', async () => {
const {
dao, work1, account2,
} = await loadFixture(deploy);
await dao.stakeAvailability(work1.target, 50, STAKE_DURATION);
await work1.connect(account2).requestWork('req-content-id', { value: WORK1_PRICE });
await time.increase(STAKE_DURATION * 2);
await expect(work1.extendAvailability(0, STAKE_DURATION)).to.be.revertedWith('Stake has already been assigned work');
expect(await work1.stakeCount()).to.equal(1);
await dao.stakeAvailability(work1.target, 50, STAKE_DURATION);
expect(await work1.stakeCount()).to.equal(2);
});
});

View File

@ -74,13 +74,6 @@ function AvailabilityStakes({
setReputation(reputation / BigInt(2));
}, [DAO, workContract, account, reputation, setReputation]);
const extendAvailabilityStake = useCallback(async (stakeIndex, duration) => {
await workContract.methods.extendAvailability(stakeIndex, duration).send({
from: account,
gas: 999999,
});
}, [workContract, account]);
const displayData = availabilityStakes.filter((stake) => {
if (!stake) return false;
if (!onlyShowAvailable) return true;
@ -118,7 +111,6 @@ function AvailabilityStakes({
{showAmount && <th>Amount</th>}
<th>End Time</th>
<th>Status</th>
{showActions && <th>Actions</th>}
</tr>
</thead>
<tbody>
@ -129,20 +121,6 @@ function AvailabilityStakes({
{showAmount && <td>{s.amount.toString()}</td>}
<td>{new Date(Number(s.endTime) * 1000).toLocaleString()}</td>
<td>{getAvailabilityStatus(s)}</td>
{showActions && (
<td>
{s.currentUserIsWorker() && !s.assigned && (
<>
<Button onClick={() => extendAvailabilityStake(s.id, 3600)}>
Extend 1 Hr.
</Button>
<Button onClick={() => extendAvailabilityStake(s.id, 86400)}>
Extend 24 Hr.
</Button>
</>
)}
</td>
)}
</tr>
))}
</tbody>