Skip to content
This repository was archived by the owner on Aug 12, 2023. It is now read-only.

Commit 6b38c28

Browse files
authored
Introduce Uniswap V2 event extractor (#379)
* Introduce Uniswap V2 event extractor * Recursively fetch all pages of Uniswap events * Tweak config * Refactor small things
1 parent b1a7522 commit 6b38c28

File tree

11 files changed

+276
-17
lines changed

11 files changed

+276
-17
lines changed

config/default.js

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@ module.exports = {
55
database: {
66
connectionString: process.env.CONNECTION_STRING,
77
},
8-
maxChunkSize: parseInt(process.env.MAX_CHUNK_SIZE, 10),
8+
maxChunkSize: {
9+
default: parseInt(process.env.MAX_CHUNK_SIZE, 10),
10+
uniswapV2Swap: {
11+
v3: 1800, // 30 minutes
12+
},
13+
},
914
maxPollingInterval: parseInt(process.env.MAX_POLLING_INTERVAL, 10),
1015
minConfirmations: 12,
1116
minPollingInterval: parseInt(process.env.MIN_POLLING_INTERVAL, 10),
@@ -27,6 +32,9 @@ module.exports = {
2732
transformedErc20: {
2833
v3: 10247094,
2934
},
35+
uniswapV2Swap: {
36+
v3: 1600834642, // Represents a point in time, not a block number
37+
},
3038
},
3139
web3: {
3240
endpoint: process.env.WEB3_ENDPOINT,

packages/core/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
"@0x-event-extractor/fill-extractor-v3": "^1.0.0",
99
"@0x-event-extractor/shared": "^1.0.0",
1010
"@0x-event-extractor/transformed-erc20-extractor": "^1.0.0",
11+
"@0x-event-extractor/uniswap-v2-swap-extractor": "^1.0.0",
1112
"@bugsnag/js": "6.5.2",
1213
"bugsnag": "^2.4.3",
1314
"delay": "^4.3.0",
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
const { web3 } = require('@0x-event-extractor/shared');
2+
3+
const getBlock = async blockNumber => {
4+
const block = await web3.getWrapper().getBlockIfExistsAsync(blockNumber);
5+
6+
return block;
7+
};
8+
9+
module.exports = getBlock;

packages/core/src/events/get-config-for-event-type.js

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,21 @@ const { config } = require('@0x-event-extractor/shared');
33

44
const getConfigForEventType = (eventType, protocolVersion) => {
55
const camelEventType = _.camelCase(eventType);
6+
const defaultChunkSize = config.get(`maxChunkSize.default`);
7+
68
const startBlockKey = `startBlock.${camelEventType}.v${protocolVersion}`;
79
const startBlock = config.get(startBlockKey);
8-
const maxChunkSize = config.get('maxChunkSize');
9-
const minConfirmations = config.get('minConfirmations');
10+
11+
const chunkSizeKey = `maxChunkSize.${camelEventType}.v${protocolVersion}`;
12+
const maxChunkSize = config.get(chunkSizeKey) || defaultChunkSize;
1013

1114
if (startBlock === undefined) {
1215
throw new Error(
1316
`Start block config not found for v${protocolVersion} ${eventType} events`,
1417
);
1518
}
1619

17-
return { startBlock, maxChunkSize, minConfirmations };
20+
return { startBlock, maxChunkSize };
1821
};
1922

2023
module.exports = getConfigForEventType;

packages/core/src/events/get-next-block-range.js

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,15 @@ const getConfigForEventType = require('./get-config-for-event-type');
44
const getLastProcessedBlock = require('./get-last-processed-block');
55

66
const getNextBlockRange = async ({
7-
currentBlock,
87
eventType,
8+
maxBlockNumber,
99
protocolVersion,
1010
}) => {
1111
const config = getConfigForEventType(eventType, protocolVersion);
12-
const { maxChunkSize, minConfirmations, startBlock } = config;
12+
const { maxChunkSize, startBlock } = config;
1313
const lastBlock = await getLastProcessedBlock(eventType, protocolVersion);
14-
const maxBlock = currentBlock - minConfirmations;
1514
const fromBlock = lastBlock === null ? startBlock : lastBlock + 1;
16-
const toBlock = _.clamp(fromBlock + maxChunkSize, 1, maxBlock);
15+
const toBlock = _.clamp(fromBlock + maxChunkSize, 1, maxBlockNumber);
1716

1817
// Notify the consumer that there are no blocks to process
1918
if (toBlock < fromBlock) {

packages/core/src/jobs/extract-events.js

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
1+
const { config } = require('@0x-event-extractor/shared');
12
const fillExtractorV1 = require('@0x-event-extractor/fill-extractor-v1');
23
const fillExtractorV2 = require('@0x-event-extractor/fill-extractor-v2');
34
const fillExtractorV3 = require('@0x-event-extractor/fill-extractor-v3');
45
const transformedERC20Extractor = require('@0x-event-extractor/transformed-erc20-extractor');
6+
const uniswapV2Extractor = require('@0x-event-extractor/uniswap-v2-swap-extractor');
57

68
const { getLogger } = require('../util/logging');
79
const BlockRange = require('../model/block-range');
810
const Event = require('../model/event');
11+
const getBlock = require('../ethereum/get-block');
912
const getCurrentBlock = require('../ethereum/get-current-block');
1013
const getNextBlockRange = require('../events/get-next-block-range');
1114
const withTransaction = require('../util/with-transaction');
1215

13-
const performExtraction = async (currentBlock, extractorConfig) => {
16+
const performExtraction = async (maxBlockNumber, extractorConfig) => {
1417
const {
1518
eventType,
1619
fetchLogEntries,
@@ -21,7 +24,7 @@ const performExtraction = async (currentBlock, extractorConfig) => {
2124
// Scope all logging for the job to the specified protocol version and event type
2225
const logger = getLogger(`extract v${protocolVersion} ${eventType} events`);
2326

24-
const rangeConfig = { currentBlock, eventType, protocolVersion };
27+
const rangeConfig = { eventType, maxBlockNumber, protocolVersion };
2528
const nextBlockRange = await getNextBlockRange(rangeConfig);
2629

2730
if (nextBlockRange === null) {
@@ -93,24 +96,37 @@ const performExtraction = async (currentBlock, extractorConfig) => {
9396
}
9497
};
9598

99+
const determineMaxQueryableBlock = currentBlock => {
100+
const minConfirmations = config.get('minConfirmations');
101+
const maxBlock = currentBlock - minConfirmations;
102+
103+
return maxBlock;
104+
};
105+
96106
const extractEvents = async () => {
97107
const logger = getLogger('event extractor');
98108

99109
logger.info('beginning event extraction');
100110
logger.info('fetching current block');
101111

102-
const currentBlock = await getCurrentBlock();
112+
const currentBlockNumber = await getCurrentBlock();
113+
const maxBlockNumber = determineMaxQueryableBlock(currentBlockNumber);
103114

104-
logger.info(`current block is ${currentBlock}`);
115+
logger.info(`current block is ${currentBlockNumber}`);
116+
logger.info(`max block is ${maxBlockNumber}`);
105117

106118
/**
107119
* Extractors are run sequentially to help avoid issues with rate
108120
* limiting in the Ethereum RPC provider.
109121
*/
110-
await performExtraction(currentBlock, fillExtractorV1);
111-
await performExtraction(currentBlock, fillExtractorV2);
112-
await performExtraction(currentBlock, fillExtractorV3);
113-
await performExtraction(currentBlock, transformedERC20Extractor);
122+
await performExtraction(maxBlockNumber, fillExtractorV1);
123+
await performExtraction(maxBlockNumber, fillExtractorV2);
124+
await performExtraction(maxBlockNumber, fillExtractorV3);
125+
await performExtraction(maxBlockNumber, transformedERC20Extractor);
126+
127+
const maxBlock = await getBlock(maxBlockNumber);
128+
129+
await performExtraction(maxBlock.timestamp, uniswapV2Extractor);
114130

115131
logger.info('finished event extraction');
116132
};
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"name": "@0x-event-extractor/uniswap-v2-swap-extractor",
3+
"description": "Extractor for UniswapV2 Swap events",
4+
"version": "1.0.0",
5+
"main": "src/index.js",
6+
"dependencies": {
7+
"@0x-event-extractor/shared": "^1.0.0",
8+
"@0x/utils": "5.5.1",
9+
"graphql": "15.3.0",
10+
"graphql-request": "3.1.0"
11+
}
12+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
const { request, gql } = require('graphql-request');
2+
const { BigNumber } = require('@0x/utils');
3+
4+
const EXCHANGE_PROXY_ADDRESS = '0xdef1c0ded9bec7f1a1670819833240f027b25eff';
5+
6+
const convertAmount = (amount, decimals) => {
7+
return new BigNumber(amount).times(
8+
new BigNumber(10).pow(new BigNumber(decimals)),
9+
);
10+
};
11+
12+
const fetchEvents = async (fromBlock, toBlock, skip = 0) => {
13+
const pageSize = 100;
14+
const query = gql`{
15+
swaps (
16+
first: ${pageSize},
17+
skip: ${skip},
18+
where: {
19+
sender: "${EXCHANGE_PROXY_ADDRESS}",
20+
timestamp_gte: ${fromBlock},
21+
timestamp_lte: ${toBlock},
22+
}) {
23+
id
24+
transaction {
25+
id
26+
blockNumber
27+
timestamp
28+
}
29+
pair {
30+
id
31+
token0 {
32+
id
33+
decimals
34+
}
35+
token1 {
36+
id
37+
decimals
38+
}
39+
}
40+
amount0In
41+
amount1In
42+
amount0Out
43+
amount1Out
44+
to
45+
logIndex
46+
}
47+
}`;
48+
49+
const response = await request(
50+
'https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v2',
51+
query,
52+
);
53+
54+
const events = response.swaps.map(swap => {
55+
const maker = swap.pair.id; // The Uniswap pool for traded pair
56+
const taker = swap.to;
57+
const transactionHash = swap.transaction.id;
58+
const { logIndex } = swap;
59+
const { blockNumber } = swap.transaction;
60+
61+
const amount0In = convertAmount(swap.amount0In, swap.pair.token0.decimals);
62+
const amount1In = convertAmount(swap.amount1In, swap.pair.token1.decimals);
63+
64+
const amount0Out = convertAmount(
65+
swap.amount0Out,
66+
swap.pair.token0.decimals,
67+
);
68+
69+
const amount1Out = convertAmount(
70+
swap.amount1Out,
71+
swap.pair.token1.decimals,
72+
);
73+
74+
const fromToken = amount0In.gt(amount1In)
75+
? swap.pair.token0
76+
: swap.pair.token1;
77+
78+
const toToken = amount0Out.gt(amount1Out)
79+
? swap.pair.token0
80+
: swap.pair.token1;
81+
82+
const fromTokenAmount = amount0In.gt(amount1In) ? amount0In : amount1In;
83+
const toTokenAmount = amount0Out.gt(amount1Out) ? amount0Out : amount1Out;
84+
85+
return {
86+
blockNumber,
87+
data: {
88+
maker,
89+
makerAmount: fromTokenAmount,
90+
makerToken: fromToken.id,
91+
taker,
92+
takerAmount: toTokenAmount,
93+
takerToken: toToken.id,
94+
},
95+
logIndex,
96+
transactionHash,
97+
};
98+
});
99+
100+
if (events.length === pageSize) {
101+
const nextEvents = await fetchEvents(fromBlock, toBlock, skip + pageSize);
102+
103+
return events.concat(nextEvents);
104+
}
105+
106+
return events;
107+
};
108+
109+
module.exports = fetchEvents;
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
const getEventData = logEntry => {
2+
return logEntry.data;
3+
};
4+
5+
module.exports = getEventData;
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
const fetchLogEntries = require('./fetch-log-entries');
2+
const getEventData = require('./get-event-data');
3+
4+
module.exports = {
5+
configure: () => {},
6+
eventType: 'UniswapV2Swap',
7+
fetchLogEntries,
8+
getEventData,
9+
protocolVersion: 3,
10+
};

0 commit comments

Comments
 (0)