diff --git a/consensoor/engine/client.py b/consensoor/engine/client.py index ea2b5f9..f048701 100644 --- a/consensoor/engine/client.py +++ b/consensoor/engine/client.py @@ -14,10 +14,13 @@ GetPayloadResponse, EngineAPIError, ) +from . import ssz_types as ssz from .. import metrics logger = logging.getLogger(__name__) +SSZ_CONTENT_TYPE = "application/octet-stream" + def get_fork_for_timestamp(timestamp: int) -> str: """Determine which fork is active for a given timestamp.""" @@ -50,11 +53,15 @@ class EngineAPIClient: """Client for Ethereum Engine API.""" def __init__(self, url: str, jwt_secret: bytes): - self.url = url + self.url = url.rstrip("/") self.jwt_secret = jwt_secret self._session: Optional[aiohttp.ClientSession] = None self._request_id = 0 self._genesis_time: Optional[int] = None + # SSZ REST endpoints (e.g. "POST /engine/v4/payloads") the EL has + # advertised via engine_exchangeCapabilities. Populated lazily on + # the first capabilities exchange; empty means JSON-RPC for all calls. + self._ssz_endpoints: set[str] = set() def set_genesis_time(self, genesis_time: int) -> None: """Set the genesis time for fork calculations.""" @@ -72,6 +79,52 @@ def _create_jwt_token(self) -> str: payload = {"iat": now} return jwt.encode(payload, self.jwt_secret, algorithm="HS256") + def _ssz_supported(self, endpoint: str) -> bool: + """Whether the EL has advertised the given SSZ REST endpoint.""" + return endpoint in self._ssz_endpoints + + async def _ssz_request( + self, + http_method: str, + path: str, + body: Optional[bytes], + metric_label: str, + ) -> Optional[bytes]: + """Issue an SSZ-over-REST Engine API request. + + Returns the raw response body bytes on 200, or ``None`` on 204. + Raises ``EngineAPIError`` for non-success status codes. + """ + session = await self._ensure_session() + headers = { + "Authorization": f"Bearer {self._create_jwt_token()}", + "Accept": SSZ_CONTENT_TYPE, + } + if body is not None: + headers["Content-Type"] = SSZ_CONTENT_TYPE + + url = f"{self.url}{path}" + start = time.time() + error_type: Optional[str] = None + try: + async with session.request( + http_method, url, data=body, headers=headers + ) as response: + raw = await response.read() + if response.status == 200: + return raw + if response.status == 204: + return None + error_type = str(response.status) + text = raw.decode("utf-8", errors="replace") + raise EngineAPIError(response.status, text) + except aiohttp.ClientError as e: + error_type = "connection_error" + logger.error(f"Engine SSZ {http_method} {path} connection error: {e}") + raise + finally: + metrics.record_engine_api_call(metric_label, time.time() - start, error_type) + async def _call(self, method: str, params: list) -> Any: """Make a JSON-RPC call to the Engine API.""" session = await self._ensure_session() @@ -158,25 +211,12 @@ async def new_payload_v5( parent_beacon_block_root: bytes, execution_requests: list, ) -> PayloadStatus: - """Send a new payload to the execution layer (Engine API v5 - Osaka/Fulu).""" + """Send a new payload to the execution layer (Engine API v5 - Amsterdam).""" payload_dict = self._payload_to_dict(execution_payload) - - params = [ - payload_dict, - ["0x" + h.hex() for h in versioned_hashes], - "0x" + parent_beacon_block_root.hex(), - execution_requests, - ] - - logger.debug( - f"newPayloadV5: blockHash={payload_dict.get('blockHash')}, " - f"execution_requests={execution_requests}, " - f"parent_beacon_root={parent_beacon_block_root.hex()[:16]}" + return await self._new_payload_v5_via_dict( + payload_dict, versioned_hashes, parent_beacon_block_root, execution_requests ) - result = await self._call("engine_newPayloadV5", params) - return PayloadStatus.from_dict(result) - async def new_payload_v5_raw( self, payload_dict: dict, @@ -186,34 +226,39 @@ async def new_payload_v5_raw( ) -> PayloadStatus: """Send a new payload to the execution layer using raw dict (no SSZ round-trip). - This is used for GLOAS/ePBS where the payload comes directly from getPayloadV5 + This is used for GLOAS/ePBS where the payload comes directly from getPayloadV6 and should be passed through without modification to avoid blockhash mismatch. """ - params = [ - payload_dict, - ["0x" + h.hex() for h in versioned_hashes], - "0x" + parent_beacon_block_root.hex(), - execution_requests, - ] - logger.info( f"newPayloadV5_raw: blockHash={payload_dict.get('blockHash')}, " f"execution_requests={execution_requests}, " f"parent_beacon_root={parent_beacon_block_root.hex()[:16]}" ) + return await self._new_payload_v5_via_dict( + payload_dict, versioned_hashes, parent_beacon_block_root, execution_requests + ) - result = await self._call("engine_newPayloadV5", params) - return PayloadStatus.from_dict(result) - - async def new_payload_v4( + async def _new_payload_v5_via_dict( self, - execution_payload, + payload_dict: dict, versioned_hashes: list[bytes], parent_beacon_block_root: bytes, execution_requests: list, ) -> PayloadStatus: - """Send a new payload to the execution layer (Engine API v4).""" - payload_dict = self._payload_to_dict(execution_payload) + if self._ssz_supported("POST /engine/v5/payloads"): + req = ssz.NewPayloadV5Request( + execution_payload=self._payload_dict_to_v4_ssz(payload_dict), + expected_blob_versioned_hashes=[ssz.Bytes32(h) for h in versioned_hashes], + parent_beacon_block_root=ssz.Bytes32(parent_beacon_block_root), + execution_requests=[ssz.TransactionBytes(self._hex_bytes(r)) for r in execution_requests], + ) + raw = await self._ssz_request( + "POST", "/engine/v5/payloads", req.encode_bytes(), "engine_newPayloadV5_ssz" + ) + if raw is None: + return PayloadStatus.from_dict({"status": "SYNCING", "latestValidHash": None}) + decoded = ssz.PayloadStatusV1.decode_bytes(raw) + return PayloadStatus.from_dict(self._payload_status_ssz_to_dict(decoded)) params = [ payload_dict, @@ -221,16 +266,47 @@ async def new_payload_v4( "0x" + parent_beacon_block_root.hex(), execution_requests, ] + result = await self._call("engine_newPayloadV5", params) + return PayloadStatus.from_dict(result) + async def new_payload_v4( + self, + execution_payload, + versioned_hashes: list[bytes], + parent_beacon_block_root: bytes, + execution_requests: list, + ) -> PayloadStatus: + """Send a new payload to the execution layer (Engine API v4 - Prague/Electra/Fulu).""" + payload_dict = self._payload_to_dict(execution_payload) logger.info( f"newPayloadV4: blockHash={payload_dict.get('blockHash')}, " f"stateRoot={payload_dict.get('stateRoot')}, " f"timestamp={payload_dict.get('timestamp')}, " f"execution_requests={execution_requests}, " - f"parent_beacon_root={parent_beacon_block_root.hex()[:16]}, " - f"full_params_len={len(params)}" + f"parent_beacon_root={parent_beacon_block_root.hex()[:16]}" ) + if self._ssz_supported("POST /engine/v4/payloads"): + req = ssz.NewPayloadV4Request( + execution_payload=self._payload_dict_to_v3_ssz(payload_dict), + expected_blob_versioned_hashes=[ssz.Bytes32(h) for h in versioned_hashes], + parent_beacon_block_root=ssz.Bytes32(parent_beacon_block_root), + execution_requests=[ssz.TransactionBytes(self._hex_bytes(r)) for r in execution_requests], + ) + raw = await self._ssz_request( + "POST", "/engine/v4/payloads", req.encode_bytes(), "engine_newPayloadV4_ssz" + ) + if raw is None: + return PayloadStatus.from_dict({"status": "SYNCING", "latestValidHash": None}) + decoded = ssz.PayloadStatusV1.decode_bytes(raw) + return PayloadStatus.from_dict(self._payload_status_ssz_to_dict(decoded)) + + params = [ + payload_dict, + ["0x" + h.hex() for h in versioned_hashes], + "0x" + parent_beacon_block_root.hex(), + execution_requests, + ] result = await self._call("engine_newPayloadV4", params) return PayloadStatus.from_dict(result) @@ -243,11 +319,27 @@ async def forkchoice_updated_v3( attrs = None if payload_attributes: attrs = {k: v for k, v in payload_attributes.items() if k != "slotNumber"} + + if self._ssz_supported("POST /engine/v3/forkchoice"): + req = ssz.ForkchoiceUpdatedV3Request( + forkchoice_state=self._fc_state_to_ssz(forkchoice_state), + payload_attributes=[self._attrs_v3_dict_to_ssz(attrs)] if attrs else [], + ) + raw = await self._ssz_request( + "POST", "/engine/v3/forkchoice", req.encode_bytes(), "engine_forkchoiceUpdatedV3_ssz" + ) + if raw is None: + return ForkchoiceUpdateResponse.from_dict({ + "payloadStatus": {"status": "SYNCING", "latestValidHash": None}, + "payloadId": None, + }) + decoded = ssz.ForkchoiceUpdatedResponseV1.decode_bytes(raw) + return ForkchoiceUpdateResponse.from_dict(self._fcu_response_ssz_to_dict(decoded)) + params = [ forkchoice_state.to_dict(), attrs, ] - result = await self._call("engine_forkchoiceUpdatedV3", params) return ForkchoiceUpdateResponse.from_dict(result) @@ -264,11 +356,26 @@ async def forkchoice_updated_v4( `slotNumber` based on a misread; that produced a `PayloadAttributesV3` shape and geth replied -38003 "Invalid payload attributes".) """ + if self._ssz_supported("POST /engine/v4/forkchoice"): + req = ssz.ForkchoiceUpdatedV4Request( + forkchoice_state=self._fc_state_to_ssz(forkchoice_state), + payload_attributes=[self._attrs_v4_dict_to_ssz(payload_attributes)] if payload_attributes else [], + ) + raw = await self._ssz_request( + "POST", "/engine/v4/forkchoice", req.encode_bytes(), "engine_forkchoiceUpdatedV4_ssz" + ) + if raw is None: + return ForkchoiceUpdateResponse.from_dict({ + "payloadStatus": {"status": "SYNCING", "latestValidHash": None}, + "payloadId": None, + }) + decoded = ssz.ForkchoiceUpdatedResponseV1.decode_bytes(raw) + return ForkchoiceUpdateResponse.from_dict(self._fcu_response_ssz_to_dict(decoded)) + params = [ forkchoice_state.to_dict(), payload_attributes, ] - result = await self._call("engine_forkchoiceUpdatedV4", params) return ForkchoiceUpdateResponse.from_dict(result) @@ -337,6 +444,21 @@ async def forkchoice_updated( async def get_payload_v6(self, payload_id: bytes) -> GetPayloadResponse: """Get an execution payload by ID (Amsterdam/Gloas).""" + if self._ssz_supported("GET /engine/v6/payloads/{payload_id}"): + raw = await self._ssz_request( + "GET", f"/engine/v6/payloads/0x{payload_id.hex()}", None, "engine_getPayloadV6_ssz" + ) + if raw is None: + raise EngineAPIError(204, "EL returned 204 No Content for getPayloadV6") + decoded = ssz.GetPayloadResponseV6.decode_bytes(raw) + return GetPayloadResponse.from_dict({ + "executionPayload": self._v4_payload_ssz_to_dict(decoded.execution_payload), + "blockValue": hex(int(decoded.block_value)), + "blobsBundle": self._blobs_bundle_v2_to_dict(decoded.blobs_bundle), + "shouldOverrideBuilder": bool(decoded.should_override_builder), + "executionRequests": ["0x" + bytes(r).hex() for r in decoded.execution_requests], + }) + result = await self._call("engine_getPayloadV6", ["0x" + payload_id.hex()]) exec_payload = result.get('executionPayload', {}) exec_requests = result.get('executionRequests') @@ -350,6 +472,21 @@ async def get_payload_v6(self, payload_id: bytes) -> GetPayloadResponse: async def get_payload_v5(self, payload_id: bytes) -> GetPayloadResponse: """Get an execution payload by ID (Osaka/Fulu).""" + if self._ssz_supported("GET /engine/v5/payloads/{payload_id}"): + raw = await self._ssz_request( + "GET", f"/engine/v5/payloads/0x{payload_id.hex()}", None, "engine_getPayloadV5_ssz" + ) + if raw is None: + raise EngineAPIError(204, "EL returned 204 No Content for getPayloadV5") + decoded = ssz.GetPayloadResponseV5.decode_bytes(raw) + return GetPayloadResponse.from_dict({ + "executionPayload": self._v3_payload_ssz_to_dict(decoded.execution_payload), + "blockValue": hex(int(decoded.block_value)), + "blobsBundle": self._blobs_bundle_v2_to_dict(decoded.blobs_bundle), + "shouldOverrideBuilder": bool(decoded.should_override_builder), + "executionRequests": ["0x" + bytes(r).hex() for r in decoded.execution_requests], + }) + result = await self._call("engine_getPayloadV5", ["0x" + payload_id.hex()]) exec_payload = result.get('executionPayload', {}) exec_requests = result.get('executionRequests') @@ -363,6 +500,21 @@ async def get_payload_v5(self, payload_id: bytes) -> GetPayloadResponse: async def get_payload_v4(self, payload_id: bytes) -> GetPayloadResponse: """Get an execution payload by ID (Electra/Prague).""" + if self._ssz_supported("GET /engine/v4/payloads/{payload_id}"): + raw = await self._ssz_request( + "GET", f"/engine/v4/payloads/0x{payload_id.hex()}", None, "engine_getPayloadV4_ssz" + ) + if raw is None: + raise EngineAPIError(204, "EL returned 204 No Content for getPayloadV4") + decoded = ssz.GetPayloadResponseV4.decode_bytes(raw) + return GetPayloadResponse.from_dict({ + "executionPayload": self._v3_payload_ssz_to_dict(decoded.execution_payload), + "blockValue": hex(int(decoded.block_value)), + "blobsBundle": self._blobs_bundle_v1_to_dict(decoded.blobs_bundle), + "shouldOverrideBuilder": bool(decoded.should_override_builder), + "executionRequests": ["0x" + bytes(r).hex() for r in decoded.execution_requests], + }) + result = await self._call("engine_getPayloadV4", ["0x" + payload_id.hex()]) logger.debug( f"getPayloadV4 raw response: executionRequests={result.get('executionRequests')}, " @@ -372,6 +524,20 @@ async def get_payload_v4(self, payload_id: bytes) -> GetPayloadResponse: async def get_payload_v3(self, payload_id: bytes) -> GetPayloadResponse: """Get an execution payload by ID (Deneb/Cancun).""" + if self._ssz_supported("GET /engine/v3/payloads/{payload_id}"): + raw = await self._ssz_request( + "GET", f"/engine/v3/payloads/0x{payload_id.hex()}", None, "engine_getPayloadV3_ssz" + ) + if raw is None: + raise EngineAPIError(204, "EL returned 204 No Content for getPayloadV3") + decoded = ssz.GetPayloadResponseV3.decode_bytes(raw) + return GetPayloadResponse.from_dict({ + "executionPayload": self._v3_payload_ssz_to_dict(decoded.execution_payload), + "blockValue": hex(int(decoded.block_value)), + "blobsBundle": self._blobs_bundle_v1_to_dict(decoded.blobs_bundle), + "shouldOverrideBuilder": bool(decoded.should_override_builder), + }) + result = await self._call("engine_getPayloadV3", ["0x" + payload_id.hex()]) return GetPayloadResponse.from_dict(result) @@ -415,12 +581,25 @@ async def new_payload_v3( """Send a new payload to the execution layer (Engine API v3 - Deneb).""" payload_dict = self._payload_to_dict(execution_payload) + if self._ssz_supported("POST /engine/v3/payloads"): + req = ssz.NewPayloadV3Request( + execution_payload=self._payload_dict_to_v3_ssz(payload_dict), + expected_blob_versioned_hashes=[ssz.Bytes32(h) for h in versioned_hashes], + parent_beacon_block_root=ssz.Bytes32(parent_beacon_block_root), + ) + raw = await self._ssz_request( + "POST", "/engine/v3/payloads", req.encode_bytes(), "engine_newPayloadV3_ssz" + ) + if raw is None: + return PayloadStatus.from_dict({"status": "SYNCING", "latestValidHash": None}) + decoded = ssz.PayloadStatusV1.decode_bytes(raw) + return PayloadStatus.from_dict(self._payload_status_ssz_to_dict(decoded)) + params = [ payload_dict, ["0x" + h.hex() for h in versioned_hashes], "0x" + parent_beacon_block_root.hex(), ] - result = await self._call("engine_newPayloadV3", params) return PayloadStatus.from_dict(result) @@ -475,7 +654,13 @@ async def new_payload( return await self.new_payload_v1(execution_payload) async def exchange_capabilities(self) -> list[str]: - """Exchange capabilities with the execution layer.""" + """Exchange capabilities with the execution layer. + + Advertises both JSON-RPC method names and the SSZ REST endpoints from + execution-apis PR #764. The intersection of advertised SSZ endpoints + with what the EL returns determines which calls use binary transport; + all others fall back to JSON-RPC. + """ capabilities = [ "engine_newPayloadV5", "engine_newPayloadV4", @@ -492,8 +677,19 @@ async def exchange_capabilities(self) -> list[str]: "engine_getPayloadV3", "engine_getPayloadV2", "engine_getPayloadV1", - ] + ] + ssz.SSZ_CAPABILITIES result = await self._call("engine_exchangeCapabilities", [capabilities]) + + el_caps = result if isinstance(result, list) else [] + offered = set(ssz.SSZ_CAPABILITIES) + self._ssz_endpoints = {c for c in el_caps if c in offered} + if self._ssz_endpoints: + logger.info( + f"Engine SSZ transport negotiated: {len(self._ssz_endpoints)} endpoints — " + f"{sorted(self._ssz_endpoints)}" + ) + else: + logger.info("Engine SSZ transport not advertised by EL; using JSON-RPC for all calls") return result async def get_client_version(self) -> Optional[dict]: @@ -573,3 +769,225 @@ async def __aenter__(self): async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close() + + # --- SSZ helpers --- + + @staticmethod + def _hex_bytes(s: Optional[str]) -> bytes: + if not s: + return b"" + return bytes.fromhex(s[2:] if s.startswith("0x") else s) + + @staticmethod + def _hex_int(s: Any) -> int: + if s is None: + return 0 + if isinstance(s, int): + return s + return int(s, 16) if s.startswith("0x") else int(s) + + @classmethod + def _withdrawal_dict_to_ssz(cls, w: dict) -> Any: + from ..spec.types.capella import Withdrawal + return Withdrawal( + index=cls._hex_int(w["index"]), + validator_index=cls._hex_int(w["validatorIndex"]), + address=ssz.Bytes20(cls._hex_bytes(w["address"])), + amount=cls._hex_int(w["amount"]), + ) + + @classmethod + def _payload_dict_to_v3_ssz(cls, p: dict) -> "ssz.ExecutionPayloadV3": + return ssz.ExecutionPayloadV3( + parent_hash=ssz.Hash32(cls._hex_bytes(p["parentHash"])), + fee_recipient=ssz.ExecutionAddress(cls._hex_bytes(p["feeRecipient"])), + state_root=ssz.Bytes32(cls._hex_bytes(p["stateRoot"])), + receipts_root=ssz.Bytes32(cls._hex_bytes(p["receiptsRoot"])), + logs_bloom=ssz.LogsBloom(cls._hex_bytes(p["logsBloom"])), + prev_randao=ssz.Bytes32(cls._hex_bytes(p["prevRandao"])), + block_number=cls._hex_int(p["blockNumber"]), + gas_limit=cls._hex_int(p["gasLimit"]), + gas_used=cls._hex_int(p["gasUsed"]), + timestamp=cls._hex_int(p["timestamp"]), + extra_data=ssz.ExtraData(cls._hex_bytes(p["extraData"])), + base_fee_per_gas=cls._hex_int(p["baseFeePerGas"]), + block_hash=ssz.Hash32(cls._hex_bytes(p["blockHash"])), + transactions=[ssz.TransactionBytes(cls._hex_bytes(tx)) for tx in p["transactions"]], + withdrawals=[cls._withdrawal_dict_to_ssz(w) for w in p.get("withdrawals", [])], + blob_gas_used=cls._hex_int(p.get("blobGasUsed", "0x0")), + excess_blob_gas=cls._hex_int(p.get("excessBlobGas", "0x0")), + ) + + @classmethod + def _payload_dict_to_v4_ssz(cls, p: dict) -> "ssz.ExecutionPayloadV4": + return ssz.ExecutionPayloadV4( + parent_hash=ssz.Hash32(cls._hex_bytes(p["parentHash"])), + fee_recipient=ssz.ExecutionAddress(cls._hex_bytes(p["feeRecipient"])), + state_root=ssz.Bytes32(cls._hex_bytes(p["stateRoot"])), + receipts_root=ssz.Bytes32(cls._hex_bytes(p["receiptsRoot"])), + logs_bloom=ssz.LogsBloom(cls._hex_bytes(p["logsBloom"])), + prev_randao=ssz.Bytes32(cls._hex_bytes(p["prevRandao"])), + block_number=cls._hex_int(p["blockNumber"]), + gas_limit=cls._hex_int(p["gasLimit"]), + gas_used=cls._hex_int(p["gasUsed"]), + timestamp=cls._hex_int(p["timestamp"]), + extra_data=ssz.ExtraData(cls._hex_bytes(p["extraData"])), + base_fee_per_gas=cls._hex_int(p["baseFeePerGas"]), + block_hash=ssz.Hash32(cls._hex_bytes(p["blockHash"])), + transactions=[ssz.TransactionBytes(cls._hex_bytes(tx)) for tx in p["transactions"]], + withdrawals=[cls._withdrawal_dict_to_ssz(w) for w in p.get("withdrawals", [])], + blob_gas_used=cls._hex_int(p.get("blobGasUsed", "0x0")), + excess_blob_gas=cls._hex_int(p.get("excessBlobGas", "0x0")), + block_access_list=ssz.TransactionBytes(cls._hex_bytes(p.get("blockAccessList", "0x"))), + slot_number=cls._hex_int(p.get("slotNumber", "0x0")), + ) + + @classmethod + def _v3_payload_ssz_to_dict(cls, ep: "ssz.ExecutionPayloadV3") -> dict: + return { + "parentHash": "0x" + bytes(ep.parent_hash).hex(), + "feeRecipient": "0x" + bytes(ep.fee_recipient).hex(), + "stateRoot": "0x" + bytes(ep.state_root).hex(), + "receiptsRoot": "0x" + bytes(ep.receipts_root).hex(), + "logsBloom": "0x" + bytes(ep.logs_bloom).hex(), + "prevRandao": "0x" + bytes(ep.prev_randao).hex(), + "blockNumber": hex(int(ep.block_number)), + "gasLimit": hex(int(ep.gas_limit)), + "gasUsed": hex(int(ep.gas_used)), + "timestamp": hex(int(ep.timestamp)), + "extraData": "0x" + bytes(ep.extra_data).hex(), + "baseFeePerGas": hex(int(ep.base_fee_per_gas)), + "blockHash": "0x" + bytes(ep.block_hash).hex(), + "transactions": ["0x" + bytes(tx).hex() for tx in ep.transactions], + "withdrawals": [ + { + "index": hex(int(w.index)), + "validatorIndex": hex(int(w.validator_index)), + "address": "0x" + bytes(w.address).hex(), + "amount": hex(int(w.amount)), + } + for w in ep.withdrawals + ], + "blobGasUsed": hex(int(ep.blob_gas_used)), + "excessBlobGas": hex(int(ep.excess_blob_gas)), + } + + @classmethod + def _v4_payload_ssz_to_dict(cls, ep: "ssz.ExecutionPayloadV4") -> dict: + d = { + "parentHash": "0x" + bytes(ep.parent_hash).hex(), + "feeRecipient": "0x" + bytes(ep.fee_recipient).hex(), + "stateRoot": "0x" + bytes(ep.state_root).hex(), + "receiptsRoot": "0x" + bytes(ep.receipts_root).hex(), + "logsBloom": "0x" + bytes(ep.logs_bloom).hex(), + "prevRandao": "0x" + bytes(ep.prev_randao).hex(), + "blockNumber": hex(int(ep.block_number)), + "gasLimit": hex(int(ep.gas_limit)), + "gasUsed": hex(int(ep.gas_used)), + "timestamp": hex(int(ep.timestamp)), + "extraData": "0x" + bytes(ep.extra_data).hex(), + "baseFeePerGas": hex(int(ep.base_fee_per_gas)), + "blockHash": "0x" + bytes(ep.block_hash).hex(), + "transactions": ["0x" + bytes(tx).hex() for tx in ep.transactions], + "withdrawals": [ + { + "index": hex(int(w.index)), + "validatorIndex": hex(int(w.validator_index)), + "address": "0x" + bytes(w.address).hex(), + "amount": hex(int(w.amount)), + } + for w in ep.withdrawals + ], + "blobGasUsed": hex(int(ep.blob_gas_used)), + "excessBlobGas": hex(int(ep.excess_blob_gas)), + "blockAccessList": "0x" + bytes(ep.block_access_list).hex(), + "slotNumber": hex(int(ep.slot_number)), + } + return d + + @staticmethod + def _blobs_bundle_v1_to_dict(b: "ssz.BlobsBundleV1") -> dict: + return { + "commitments": ["0x" + bytes(c).hex() for c in b.commitments], + "proofs": ["0x" + bytes(p).hex() for p in b.proofs], + "blobs": ["0x" + bytes(blob).hex() for blob in b.blobs], + } + + @staticmethod + def _blobs_bundle_v2_to_dict(b: "ssz.BlobsBundleV2") -> dict: + return { + "commitments": ["0x" + bytes(c).hex() for c in b.commitments], + "proofs": ["0x" + bytes(p).hex() for p in b.proofs], + "blobs": ["0x" + bytes(blob).hex() for blob in b.blobs], + } + + @staticmethod + def _payload_status_ssz_to_dict(ps: "ssz.PayloadStatusV1") -> dict: + status_str = ssz.INT_TO_PAYLOAD_STATUS.get(int(ps.status)) + if status_str is None: + raise EngineAPIError(-1, f"Unknown PayloadStatus enum: {int(ps.status)}") + result: dict = {"status": status_str} + if len(ps.latest_valid_hash) == 1: + result["latestValidHash"] = "0x" + bytes(ps.latest_valid_hash[0]).hex() + else: + result["latestValidHash"] = None + if len(ps.validation_error) > 0: + result["validationError"] = bytes(ps.validation_error).decode("utf-8", errors="replace") + else: + result["validationError"] = None + return result + + @classmethod + def _fcu_response_ssz_to_dict(cls, r: "ssz.ForkchoiceUpdatedResponseV1") -> dict: + result = {"payloadStatus": cls._payload_status_ssz_to_dict(r.payload_status)} + if len(r.payload_id) == 1: + result["payloadId"] = "0x" + bytes(r.payload_id[0]).hex() + else: + result["payloadId"] = None + return result + + @classmethod + def _fc_state_to_ssz(cls, state: ForkchoiceState) -> "ssz.ForkchoiceStateV1": + return ssz.ForkchoiceStateV1( + head_block_hash=ssz.Bytes32(state.head_block_hash), + safe_block_hash=ssz.Bytes32(state.safe_block_hash), + finalized_block_hash=ssz.Bytes32(state.finalized_block_hash), + ) + + @classmethod + def _attrs_v1_dict_to_ssz(cls, a: dict) -> "ssz.PayloadAttributesV1": + return ssz.PayloadAttributesV1( + timestamp=cls._hex_int(a["timestamp"]), + prev_randao=ssz.Bytes32(cls._hex_bytes(a["prevRandao"])), + suggested_fee_recipient=ssz.Bytes20(cls._hex_bytes(a["suggestedFeeRecipient"])), + ) + + @classmethod + def _attrs_v2_dict_to_ssz(cls, a: dict) -> "ssz.PayloadAttributesV2": + return ssz.PayloadAttributesV2( + timestamp=cls._hex_int(a["timestamp"]), + prev_randao=ssz.Bytes32(cls._hex_bytes(a["prevRandao"])), + suggested_fee_recipient=ssz.Bytes20(cls._hex_bytes(a["suggestedFeeRecipient"])), + withdrawals=[cls._withdrawal_dict_to_ssz(w) for w in a.get("withdrawals", [])], + ) + + @classmethod + def _attrs_v3_dict_to_ssz(cls, a: dict) -> "ssz.PayloadAttributesV3": + return ssz.PayloadAttributesV3( + timestamp=cls._hex_int(a["timestamp"]), + prev_randao=ssz.Bytes32(cls._hex_bytes(a["prevRandao"])), + suggested_fee_recipient=ssz.Bytes20(cls._hex_bytes(a["suggestedFeeRecipient"])), + withdrawals=[cls._withdrawal_dict_to_ssz(w) for w in a.get("withdrawals", [])], + parent_beacon_block_root=ssz.Bytes32(cls._hex_bytes(a["parentBeaconBlockRoot"])), + ) + + @classmethod + def _attrs_v4_dict_to_ssz(cls, a: dict) -> "ssz.PayloadAttributesV4": + return ssz.PayloadAttributesV4( + timestamp=cls._hex_int(a["timestamp"]), + prev_randao=ssz.Bytes32(cls._hex_bytes(a["prevRandao"])), + suggested_fee_recipient=ssz.Bytes20(cls._hex_bytes(a["suggestedFeeRecipient"])), + withdrawals=[cls._withdrawal_dict_to_ssz(w) for w in a.get("withdrawals", [])], + parent_beacon_block_root=ssz.Bytes32(cls._hex_bytes(a["parentBeaconBlockRoot"])), + slot_number=cls._hex_int(a["slotNumber"]), + ) diff --git a/consensoor/engine/ssz_types.py b/consensoor/engine/ssz_types.py new file mode 100644 index 0000000..357ecdd --- /dev/null +++ b/consensoor/engine/ssz_types.py @@ -0,0 +1,305 @@ +"""SSZ container definitions for the Engine API binary transport. + +Per execution-apis PR #764 (`src/engine/ssz-encoding.md`). +Nullable JSON fields are encoded as ``List[T, 1]`` in SSZ — empty list +denotes absence, single element denotes presence. +""" + +from remerkleable.basic import boolean, uint8, uint64, uint256 +from remerkleable.byte_arrays import ByteList, ByteVector, Bytes32 +from remerkleable.complex import Container, List + +from ..spec.types.base import ( + Bytes20, + ExecutionAddress, + Hash32, + MAX_BYTES_PER_TRANSACTION, +) +from ..spec.types.capella import Withdrawal as WithdrawalV1 + +BYTES_PER_LOGS_BLOOM = 256 +MAX_EXTRA_DATA_BYTES = 2**5 +MAX_TRANSACTIONS_PER_PAYLOAD = 2**20 +MAX_WITHDRAWALS_PER_PAYLOAD = 2**4 +MAX_BLOB_COMMITMENTS_PER_BLOCK = 2**12 +FIELD_ELEMENTS_PER_BLOB = 4096 +BYTES_PER_FIELD_ELEMENT = 32 +BLOB_SIZE = FIELD_ELEMENTS_PER_BLOB * BYTES_PER_FIELD_ELEMENT +CELLS_PER_EXT_BLOB = 128 +MAX_BLOB_HASHES_REQUEST = 128 +MAX_PAYLOAD_BODIES_REQUEST = 2**5 +MAX_EXECUTION_REQUESTS = 2**8 +MAX_ERROR_MESSAGE_LENGTH = 1024 +MAX_CLIENT_CODE_LENGTH = 2 +MAX_CLIENT_NAME_LENGTH = 64 +MAX_CLIENT_VERSION_LENGTH = 64 +MAX_CLIENT_VERSIONS = 4 +MAX_CAPABILITY_NAME_LENGTH = 64 +MAX_CAPABILITIES = 64 + + +Bytes8 = ByteVector[8] +Bytes48 = ByteVector[48] +LogsBloom = ByteVector[BYTES_PER_LOGS_BLOOM] +ExtraData = ByteList[MAX_EXTRA_DATA_BYTES] +TransactionBytes = ByteList[MAX_BYTES_PER_TRANSACTION] +Blob = ByteVector[BLOB_SIZE] + + +class ExecutionPayloadV1(Container): + parent_hash: Hash32 + fee_recipient: ExecutionAddress + state_root: Bytes32 + receipts_root: Bytes32 + logs_bloom: LogsBloom + prev_randao: Bytes32 + block_number: uint64 + gas_limit: uint64 + gas_used: uint64 + timestamp: uint64 + extra_data: ExtraData + base_fee_per_gas: uint256 + block_hash: Hash32 + transactions: List[TransactionBytes, MAX_TRANSACTIONS_PER_PAYLOAD] + + +class ExecutionPayloadV2(Container): + parent_hash: Hash32 + fee_recipient: ExecutionAddress + state_root: Bytes32 + receipts_root: Bytes32 + logs_bloom: LogsBloom + prev_randao: Bytes32 + block_number: uint64 + gas_limit: uint64 + gas_used: uint64 + timestamp: uint64 + extra_data: ExtraData + base_fee_per_gas: uint256 + block_hash: Hash32 + transactions: List[TransactionBytes, MAX_TRANSACTIONS_PER_PAYLOAD] + withdrawals: List[WithdrawalV1, MAX_WITHDRAWALS_PER_PAYLOAD] + + +class ExecutionPayloadV3(Container): + parent_hash: Hash32 + fee_recipient: ExecutionAddress + state_root: Bytes32 + receipts_root: Bytes32 + logs_bloom: LogsBloom + prev_randao: Bytes32 + block_number: uint64 + gas_limit: uint64 + gas_used: uint64 + timestamp: uint64 + extra_data: ExtraData + base_fee_per_gas: uint256 + block_hash: Hash32 + transactions: List[TransactionBytes, MAX_TRANSACTIONS_PER_PAYLOAD] + withdrawals: List[WithdrawalV1, MAX_WITHDRAWALS_PER_PAYLOAD] + blob_gas_used: uint64 + excess_blob_gas: uint64 + + +class ExecutionPayloadV4(Container): + parent_hash: Hash32 + fee_recipient: ExecutionAddress + state_root: Bytes32 + receipts_root: Bytes32 + logs_bloom: LogsBloom + prev_randao: Bytes32 + block_number: uint64 + gas_limit: uint64 + gas_used: uint64 + timestamp: uint64 + extra_data: ExtraData + base_fee_per_gas: uint256 + block_hash: Hash32 + transactions: List[TransactionBytes, MAX_TRANSACTIONS_PER_PAYLOAD] + withdrawals: List[WithdrawalV1, MAX_WITHDRAWALS_PER_PAYLOAD] + blob_gas_used: uint64 + excess_blob_gas: uint64 + block_access_list: ByteList[MAX_BYTES_PER_TRANSACTION] + slot_number: uint64 + + +class PayloadStatusV1(Container): + status: uint8 + latest_valid_hash: List[Bytes32, 1] + validation_error: ByteList[MAX_ERROR_MESSAGE_LENGTH] + + +PAYLOAD_STATUS_TO_INT = {"VALID": 0, "INVALID": 1, "SYNCING": 2, "ACCEPTED": 3} +INT_TO_PAYLOAD_STATUS = {v: k for k, v in PAYLOAD_STATUS_TO_INT.items()} + + +class ForkchoiceStateV1(Container): + head_block_hash: Bytes32 + safe_block_hash: Bytes32 + finalized_block_hash: Bytes32 + + +class PayloadAttributesV1(Container): + timestamp: uint64 + prev_randao: Bytes32 + suggested_fee_recipient: Bytes20 + + +class PayloadAttributesV2(Container): + timestamp: uint64 + prev_randao: Bytes32 + suggested_fee_recipient: Bytes20 + withdrawals: List[WithdrawalV1, MAX_WITHDRAWALS_PER_PAYLOAD] + + +class PayloadAttributesV3(Container): + timestamp: uint64 + prev_randao: Bytes32 + suggested_fee_recipient: Bytes20 + withdrawals: List[WithdrawalV1, MAX_WITHDRAWALS_PER_PAYLOAD] + parent_beacon_block_root: Bytes32 + + +class PayloadAttributesV4(Container): + timestamp: uint64 + prev_randao: Bytes32 + suggested_fee_recipient: Bytes20 + withdrawals: List[WithdrawalV1, MAX_WITHDRAWALS_PER_PAYLOAD] + parent_beacon_block_root: Bytes32 + slot_number: uint64 + + +class ForkchoiceUpdatedResponseV1(Container): + payload_status: PayloadStatusV1 + payload_id: List[Bytes8, 1] + + +class BlobsBundleV1(Container): + commitments: List[Bytes48, MAX_BLOB_COMMITMENTS_PER_BLOCK] + proofs: List[Bytes48, MAX_BLOB_COMMITMENTS_PER_BLOCK] + blobs: List[Blob, MAX_BLOB_COMMITMENTS_PER_BLOCK] + + +class BlobsBundleV2(Container): + commitments: List[Bytes48, MAX_BLOB_COMMITMENTS_PER_BLOCK] + proofs: List[Bytes48, MAX_BLOB_COMMITMENTS_PER_BLOCK * CELLS_PER_EXT_BLOB] + blobs: List[Blob, MAX_BLOB_COMMITMENTS_PER_BLOCK] + + +class GetPayloadResponseV2(Container): + execution_payload: ExecutionPayloadV2 + block_value: uint256 + + +class GetPayloadResponseV3(Container): + execution_payload: ExecutionPayloadV3 + block_value: uint256 + blobs_bundle: BlobsBundleV1 + should_override_builder: boolean + + +class GetPayloadResponseV4(Container): + execution_payload: ExecutionPayloadV3 + block_value: uint256 + blobs_bundle: BlobsBundleV1 + should_override_builder: boolean + execution_requests: List[TransactionBytes, MAX_EXECUTION_REQUESTS] + + +class GetPayloadResponseV5(Container): + execution_payload: ExecutionPayloadV3 + block_value: uint256 + blobs_bundle: BlobsBundleV2 + should_override_builder: boolean + execution_requests: List[TransactionBytes, MAX_EXECUTION_REQUESTS] + + +class GetPayloadResponseV6(Container): + execution_payload: ExecutionPayloadV4 + block_value: uint256 + blobs_bundle: BlobsBundleV2 + should_override_builder: boolean + execution_requests: List[TransactionBytes, MAX_EXECUTION_REQUESTS] + + +class NewPayloadV1Request(Container): + execution_payload: ExecutionPayloadV1 + + +class NewPayloadV2Request(Container): + execution_payload: ExecutionPayloadV2 + + +class NewPayloadV3Request(Container): + execution_payload: ExecutionPayloadV3 + expected_blob_versioned_hashes: List[Bytes32, MAX_BLOB_COMMITMENTS_PER_BLOCK] + parent_beacon_block_root: Bytes32 + + +class NewPayloadV4Request(Container): + execution_payload: ExecutionPayloadV3 + expected_blob_versioned_hashes: List[Bytes32, MAX_BLOB_COMMITMENTS_PER_BLOCK] + parent_beacon_block_root: Bytes32 + execution_requests: List[TransactionBytes, MAX_EXECUTION_REQUESTS] + + +class NewPayloadV5Request(Container): + execution_payload: ExecutionPayloadV4 + expected_blob_versioned_hashes: List[Bytes32, MAX_BLOB_COMMITMENTS_PER_BLOCK] + parent_beacon_block_root: Bytes32 + execution_requests: List[TransactionBytes, MAX_EXECUTION_REQUESTS] + + +class ForkchoiceUpdatedV1Request(Container): + forkchoice_state: ForkchoiceStateV1 + payload_attributes: List[PayloadAttributesV1, 1] + + +class ForkchoiceUpdatedV2Request(Container): + forkchoice_state: ForkchoiceStateV1 + payload_attributes: List[PayloadAttributesV2, 1] + + +class ForkchoiceUpdatedV3Request(Container): + forkchoice_state: ForkchoiceStateV1 + payload_attributes: List[PayloadAttributesV3, 1] + + +class ForkchoiceUpdatedV4Request(Container): + forkchoice_state: ForkchoiceStateV1 + payload_attributes: List[PayloadAttributesV4, 1] + + +class ExchangeCapabilitiesRequest(Container): + capabilities: List[ByteList[MAX_CAPABILITY_NAME_LENGTH], MAX_CAPABILITIES] + + +class ExchangeCapabilitiesResponse(Container): + capabilities: List[ByteList[MAX_CAPABILITY_NAME_LENGTH], MAX_CAPABILITIES] + + +# Capability strings advertised in `engine_exchangeCapabilities` per PR #764. +# A CL advertises the SSZ REST endpoints it supports; if the EL responds with +# the same string, the corresponding call MAY use the binary transport. +SSZ_CAPABILITIES: list[str] = [ + "POST /engine/v1/payloads", + "POST /engine/v2/payloads", + "POST /engine/v3/payloads", + "POST /engine/v4/payloads", + "POST /engine/v5/payloads", + "GET /engine/v1/payloads/{payload_id}", + "GET /engine/v2/payloads/{payload_id}", + "GET /engine/v3/payloads/{payload_id}", + "GET /engine/v4/payloads/{payload_id}", + "GET /engine/v5/payloads/{payload_id}", + "GET /engine/v6/payloads/{payload_id}", + "POST /engine/v1/forkchoice", + "POST /engine/v2/forkchoice", + "POST /engine/v3/forkchoice", + "POST /engine/v4/forkchoice", + "POST /engine/v1/blobs", + "POST /engine/v2/blobs", + "POST /engine/v3/blobs", + "POST /engine/v1/capabilities", + "POST /engine/v1/client/version", +]