Skip to content

Commit dfd480d

Browse files
authored
Merge pull request #10033 from SomberNight/202507_interface_fast_forward
interface: parallel header-chunks download
2 parents a9b47f7 + 3ceb59d commit dfd480d

File tree

4 files changed

+60
-33
lines changed

4 files changed

+60
-33
lines changed

electrum/interface.py

Lines changed: 58 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -832,32 +832,72 @@ async def get_block_headers(
832832
headers = list(util.chunks(bfh(res['hex']), size=HEADER_SIZE))
833833
return headers
834834

835-
async def request_chunk(
835+
async def request_chunk_below_max_checkpoint(
836836
self,
837-
height: int,
838837
*,
839-
tip: Optional[int] = None,
840-
can_return_early: bool = False,
841-
) -> Optional[Tuple[bool, int]]:
838+
height: int,
839+
) -> None:
842840
if not is_non_negative_integer(height):
843841
raise Exception(f"{repr(height)} is not a block height")
842+
assert height <= constants.net.max_checkpoint(), f"{height=} must be <= cp={constants.net.max_checkpoint()}"
844843
index = height // CHUNK_SIZE
845-
if can_return_early and index in self._requested_chunks:
844+
if index in self._requested_chunks:
846845
return None
847-
#self.logger.debug(f"requesting chunk from height {height}")
848-
size = CHUNK_SIZE
849-
if tip is not None:
850-
size = min(size, tip - index * CHUNK_SIZE + 1)
851-
size = max(size, 0)
846+
self.logger.debug(f"requesting chunk from height {height}")
852847
try:
853848
self._requested_chunks.add(index)
854-
headers = await self.get_block_headers(start_height=index * CHUNK_SIZE, count=size)
849+
headers = await self.get_block_headers(start_height=index * CHUNK_SIZE, count=CHUNK_SIZE)
855850
finally:
856851
self._requested_chunks.discard(index)
857852
conn = self.blockchain.connect_chunk(index, data=b"".join(headers))
858853
if not conn:
859-
return conn, 0
860-
return conn, len(headers)
854+
raise RequestCorrupted(f"chunk ({index=}, for {height=}) does not connect to blockchain")
855+
return None
856+
857+
async def _fast_forward_chain(
858+
self,
859+
*,
860+
height: int, # usually local chain tip + 1
861+
tip: int, # server tip. we should not request past this.
862+
) -> int:
863+
"""Request some headers starting at `height` to grow the blockchain of this interface.
864+
Returns number of headers we managed to connect, starting at `height`.
865+
"""
866+
if not is_non_negative_integer(height):
867+
raise Exception(f"{repr(height)} is not a block height")
868+
if not is_non_negative_integer(tip):
869+
raise Exception(f"{repr(tip)} is not a block height")
870+
if not (height > constants.net.max_checkpoint()
871+
or height == 0 == constants.net.max_checkpoint()):
872+
raise Exception(f"{height=} must be > cp={constants.net.max_checkpoint()}")
873+
assert height <= tip, f"{height=} must be <= {tip=}"
874+
# Request a few chunks of headers concurrently.
875+
# tradeoffs:
876+
# - more chunks: higher memory requirements
877+
# - more chunks: higher concurrency => syncing needs fewer network round-trips
878+
# - if a chunk does not connect, bandwidth for all later chunks is wasted
879+
async with OldTaskGroup() as group:
880+
tasks = [] # type: List[Tuple[int, asyncio.Task[Sequence[bytes]]]]
881+
index0 = height // CHUNK_SIZE
882+
for chunk_cnt in range(10):
883+
index = index0 + chunk_cnt
884+
start_height = index * CHUNK_SIZE
885+
if start_height > tip:
886+
break
887+
end_height = min(start_height + CHUNK_SIZE - 1, tip)
888+
size = end_height - start_height + 1
889+
tasks.append((index, await group.spawn(self.get_block_headers(start_height=start_height, count=size))))
890+
# try to connect chunks
891+
num_headers = 0
892+
for index, task in tasks:
893+
headers = task.result()
894+
conn = self.blockchain.connect_chunk(index, data=b"".join(headers))
895+
if not conn:
896+
break
897+
num_headers += len(headers)
898+
# We started at a chunk boundary, instead of requested `height`. Need to correct for that.
899+
offset = height - index0 * CHUNK_SIZE
900+
return max(0, num_headers - offset)
861901

862902
def is_main_server(self) -> bool:
863903
return (self.network.interface == self or
@@ -1021,17 +1061,17 @@ async def sync_until(
10211061
# We are far from the tip.
10221062
# It is more efficient to process headers in large batches (CPU/disk_usage/logging).
10231063
# (but this wastes a little bandwidth, if we are not on a chunk boundary)
1024-
# TODO we should request (some) chunks concurrently. would help when we are many chunks behind
1025-
could_connect, num_headers = await self.request_chunk(height, tip=next_height)
1026-
if not could_connect:
1064+
num_headers = await self._fast_forward_chain(
1065+
height=height, tip=next_height)
1066+
if num_headers == 0:
10271067
if height <= constants.net.max_checkpoint():
10281068
raise GracefulDisconnect('server chain conflicts with checkpoints or genesis')
10291069
last, height = await self.step(height)
10301070
continue
10311071
# report progress to gui/etc
10321072
util.trigger_callback('blockchain_updated')
10331073
util.trigger_callback('network_updated')
1034-
height = (height // CHUNK_SIZE * CHUNK_SIZE) + num_headers
1074+
height += num_headers
10351075
assert height <= next_height+1, (height, self.tip)
10361076
last = ChainResolutionMode.CATCHUP
10371077
else:

electrum/lnverifier.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ async def _verify_some_channels(self):
102102
header = blockchain.read_header(block_height)
103103
if header is None:
104104
if block_height <= constants.net.max_checkpoint():
105-
await self.taskgroup.spawn(self.interface.request_chunk(block_height, can_return_early=True))
105+
await self.taskgroup.spawn(self.interface.request_chunk_below_max_checkpoint(height=block_height))
106106
continue
107107
self.started_verifying_channel.add(short_channel_id)
108108
await self.taskgroup.spawn(self.verify_channel(block_height, short_channel_id))

electrum/network.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1311,19 +1311,6 @@ def sanitize_tx_broadcast_response(server_msg) -> str:
13111311
# otherwise:
13121312
return _("Unknown error")
13131313

1314-
@best_effort_reliable
1315-
@catch_server_exceptions
1316-
async def request_chunk(
1317-
self,
1318-
height: int,
1319-
*,
1320-
tip: Optional[int] = None,
1321-
can_return_early: bool = False,
1322-
) -> Optional[Tuple[bool, int]]:
1323-
if self.interface is None: # handled by best_effort_reliable
1324-
raise RequestTimedOut()
1325-
return await self.interface.request_chunk(height, tip=tip, can_return_early=can_return_early)
1326-
13271314
@best_effort_reliable
13281315
@catch_server_exceptions
13291316
async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:

electrum/verifier.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ async def _request_proofs(self):
8888
if header is None:
8989
if tx_height <= constants.net.max_checkpoint():
9090
# FIXME these requests are not counted (self._requests_sent += 1)
91-
await self.taskgroup.spawn(self.interface.request_chunk(tx_height, can_return_early=True))
91+
await self.taskgroup.spawn(self.interface.request_chunk_below_max_checkpoint(height=tx_height))
9292
continue
9393
# request now
9494
self.logger.info(f'requested merkle {tx_hash}')

0 commit comments

Comments
 (0)