fetcher.events_indices
Module with repos and models for EventsIndex.
EventsIndex stores the information about fetched events.
Each EventsIndex is persisted in the events_indices table.
EventsIndex has a field data that stores the blocks
for which events (with given chain_id, contract address,
event name, and arguments for lookup) were already fetched.
Note
EventsIndex has some granularity in storing the blocks defined by the
constants.BLOCKS_PER_BIT constant. This means:
You cannot fetch events for less than
constants.BLOCKS_PER_BITblocksFetch start and end blocks should be snapped to the
constants.BLOCKS_PER_BITgrid.
Example
As an illustratory example, consider how an instance of EventsIndex
evolves when fetching the same type of events for different blocks.
Assume BLOCKS_PER_BIT == 1000.
Step1: Fetch events for blocks 11337 - 13798
First, the fetch blocks are rounded to 11000 - 14000, and events are fetched
for these blocks.
Start timestamp, in this case, should be 11000. However, there’s the rule
that start timestamp must be a multiple of 8 * constants.BLOCKS_PER_BIT
(for performance reasons). So it’s rounded down to 8000.
EventsIndex’s data is updated to 00001F401C
Timestamp (hex) |
Mask (hex) |
Timestamp (dec) |
Mask (bin) |
|---|---|---|---|
00001F40 |
1C |
8000 |
00011100 |
So the first bit of the mask is blocks 8000 - 9000, the second is 9000 - 10000, etc.
Step2: Fetch blocks 15546 - 19403
Round blocks to 15000 - 20000, update data to 00001F401DF0
Timestamp (hex) |
Mask (hex) |
Timestamp (dec) |
Mask (bin) |
|---|---|---|---|
00001F40 |
1DF0 |
8000 |
0001110111110000 |
Step3: Fetch blocks 3000 - 5000
Now the start timestamp will be updated to 0 (the multiple of 8 * constants.BLOCKS_PER_BIT,
rounded down). The new index is 000000001C1DF0
Timestamp (hex) |
Mask (hex) |
Timestamp (dec) |
Mask (bin) |
|---|---|---|---|
00000000 |
1C1DF0 |
0 |
000111000001110111110000 |
- web3cat.fetcher.events_indices.constants.BLOCKS_PER_BIT = 1000
Defines the granularity of event indices. Blocks are divided into chunks. Each chunk has the size of
BLOCKS_PER_BIT. If the bit is set, the events for the whole chunk are fetched. Partial fetches are not permitted.
- class web3cat.fetcher.events_indices.BitArray(data: Optional[bytes] = None)
Bases:
objectThis is a simple bitset tailored for
events_indicesneeds.Example
>>> bits = BitArray() >>> bits[3] False >>> bits[3] = True # 00010000 >>> bits[0] False >>> bits[3] True >>> bits.prepend_empty_bytes(2) # 000000000000000000010000 >>> bits[0] False >>> bits[3] False >>> bits[2 * 8 + 3] True >>> bits.set_range(0, 4, True) # 111100000000000000010000 >>> bits[0] True >>> bits[1] True >>> bits[2] True >>> bits[3] True >>> bits[4] False
- __getitem__(pos: int) bool
Get the bit value at
pos- Parameters:
pos – The position of the bit
- Returns:
Bit value (
FalseorTrue)
- __setitem__(pos: int, value: bool)
Set the bit value at
pos- Parameters:
pos – The position of the bit
value – Bit value (
FalseorTrue)
- prepend_empty_bytes(num_bytes: int)
Adds zero bytes (8 * num_bytes
Falsevalues) before the bitarray.- Parameters:
num_bytes – Number of bytes to add
- set_range(start: int, end: int, value: bool)
Set value for bits starting at
start(inclusive) and ending atend(non-inclusive).- Parameters:
start – start of the range (inclusive)
end – end of the range (non-inclusive)
- Exceptions:
Both values should be >= 0 and
end>=start. OtherwiseIndexErroris raised.
- class web3cat.fetcher.events_indices.EventsIndicesRepo(rpc: Optional[str] = None, cache_path: Optional[str] = None, block_grid_step: int = 1000, w3: Optional[Web3] = None, conn: Optional[Connection] = None)
Bases:
CoreReading and writing
EventsIndexto database.- find_indices(address: str, event: str, args: Optional[Dict[str, Any]] = None) Iterator[EventsIndex]
Find all indices that match
chain_id,address,event, andargs.The match for
argsis non-trivial. It tries to find all indices that could have the events for the specificargs.Example
Imagine two indices for the ERC20 Transfer event:
Stating that all
Transferevents were fetched from block 2000 to 4000Stating that
Transferfrom address “0x6b17…” events were fetched from block 3000 to 4000
Now we want to query if the
Transferevents were fetched for{"from": "0x6b17..."}for blocks 2500 to 4000. A naive implementation would return just the{"from": "0x6b17..."}index stating that blocks 2500 to 3000 are missing. However, these events were already fetched as part of allTransferevents from block 2000 to 4000. That’s why a list of indices is returned, so the check is made against the data that is really missing.- Parameters:
address – Contract address
event – Event name
args – Argument filters
- Returns:
Iterator of matched indices
- get_index(address: str, event: str, args: Optional[Dict[str, Any]] = None) web3cat.fetcher.events_indices.index.EventsIndex | None
Find an index with the exact match of
chain_id,address,event, andargs.- Parameters:
chain_id – Ethereum chain_id
address – Contract address
event – Event name
args – Argument filters
- Returns:
Found index, or
Noneif nothing is found
- save(indices: List[EventsIndex])
Save indices to database.
- Parameters:
indices – a list of indices to save
- purge()
Clear all database entries
- class web3cat.fetcher.events_indices.EventsIndex(chain_id: int, address: str, event: str, args: Dict[str, Any], data: EventsIndexData)
Bases:
objectStores data about fetched events for a specific
chain_id,address,eventandargument_filters.- Parameters:
chain_id – Ethereum chain_id
address – Contract address
event – Event name
_args – Argument filters (filter for indexed fields of event)
data – Index data storing blocks for which events were already fetched. See
EventsIndexDatafor details.
- data: EventsIndexData
Index data storing blocks for which events were already fetched.
- static from_row(row: Tuple[int, str, str, str, bytes]) EventsIndex
Deserialize from web3cat.database row
- Parameters:
row – database row
- property args: Optional[Dict[str, Any]]
Argument filters for the event.
Arguments filters are sorted by key, and each value (if
list) is sorted as well. This sorting is required to compare argument filters fast.
- static normalize_args(args: Optional[Dict[str, Any]]) Dict[str, Any]
Sort argument filters keys and each value (if
list).- Parameters:
args – Event argument filters
- to_dict() Dict[str, Any]
Convert
EventsIndexto dict
- class web3cat.fetcher.events_indices.EventsIndexData(start_block: Optional[int] = None, end_block: Optional[int] = None, mask: Optional[bytes] = None)
Bases:
objectStores blocks numbers with already fetched events.
All blocks are divided by chunks of
constants.BLOCKS_PER_BITsize. A bit set toTruemeans that events for the chunk were already fetched.As a storage optimistaion
start_blockparameter is used. It is the first block that hasTrueentries after it. Obviously, it must be a multiple ofconstants.BLOCKS_PER_BIT. However, there’s an additional restriction ofstart_blockbeing a multiple of 8 *constants.BLOCKS_PER_BIT(for performance reasons).See the example section in
fetcher.events_indicesfor more details.- Parameters:
start_block – The starting block for the index
mask – A bitset for storing index values
- set_range(start_block: int, end_block: int, value: bool)
Set blocks range from
start_block(inclusive) toend_block(non-inclusive) tovalue.- Parameters:
start_block – start of the range (inclusive)
end_block – end of the range (non-inclusive)
- Exceptions:
The
start_blockandend_blockarguments must be a multiples ofconstants.BLOCKS_PER_BIT(snap_block_to_grid()can be used for that). OtherwiseIndexErroris raised.
- snap_block_to_grid(block: int) int
Round down block to the nearest chunk start (chunks of size
constants.BLOCKS_PER_BIT)
- step() int
Returns
constants.BLOCKS_PER_BIT.
- dump() bytes
Serialize class data into binary.
The binary format is
start_block
end_block
mask
4 bytes
4 bytes
n bytes (as needed)
- static load(data: bytes) EventsIndexData
Restore class from binary. See
dump()for binary format.- Returns:
An instance of
EventsIndexData
- to_dict() Dict[str, Any]
Convert
EventsIndexDatato dict