diff --git a/Cargo.lock b/Cargo.lock index f034c39c..6dd9c8ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1624,7 +1624,9 @@ dependencies = [ "ethereum_ssz_derive", "eyre", "futures", + "headers-accept", "jsonwebtoken", + "mediatype 0.20.0", "pbkdf2 0.12.2", "rand 0.9.2", "rayon", @@ -1671,6 +1673,7 @@ dependencies = [ "axum-extra", "cb-common", "cb-metrics", + "ethereum_ssz", "eyre", "futures", "lazy_static", @@ -1725,6 +1728,7 @@ dependencies = [ "cb-common", "cb-pbs", "cb-signer", + "ethereum_ssz", "eyre", "reqwest 0.12.23", "serde_json", @@ -2766,7 +2770,7 @@ dependencies = [ "futures", "futures-util", "libp2p-identity", - "mediatype", + "mediatype 0.19.20", "multiaddr", "pretty_reqwest_error", "proto_array", @@ -3335,6 +3339,17 @@ dependencies = [ "sha1", ] +[[package]] +name = "headers-accept" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479bcb872e714e11f72fcc6a71afadbc86d0dbe887bc44252b04cfbc63272897" +dependencies = [ + "headers-core", + "http 1.3.1", + "mediatype 0.20.0", +] + [[package]] name = "headers-core" version = "0.3.0" @@ -4135,6 +4150,12 @@ version = "0.19.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33746aadcb41349ec291e7f2f0a3aa6834d1d7c58066fb4b01f68efc4c4b7631" +[[package]] +name = "mediatype" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f490ea2ae935dd8ac89c472d4df28c7f6b87cc20767e1b21fd5ed6a16e7f61e4" + [[package]] name = "memchr" version = "2.7.5" diff --git a/Cargo.toml b/Cargo.toml index 68cb9e27..a4920cca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,12 +45,14 @@ ethereum_ssz_derive = "0.9" eyre = "0.6.12" futures = "0.3.30" headers = "0.4.0" +headers-accept = "0.2.1" indexmap = "2.2.6" jsonwebtoken = { version = "9.3.1", default-features = false } lazy_static = "1.5.0" lh_eth2 = { package = "eth2", git = "https://github.com/sigp/lighthouse", tag = "v8.0.0-rc.0" } lh_eth2_keystore = { package = "eth2_keystore", git = "https://github.com/sigp/lighthouse", tag = "v8.0.0-rc.0" } lh_types = { package = "types", git = "https://github.com/sigp/lighthouse", tag = "v8.0.0-rc.0" } +mediatype = "0.20.0" parking_lot = "0.12.3" pbkdf2 = "0.12.2" prometheus = "0.14.0" diff --git a/config.example.toml b/config.example.toml index 2bcd0efe..3714a257 100644 --- a/config.example.toml +++ b/config.example.toml @@ -56,8 +56,8 @@ extra_validation_enabled = false # OPTIONAL # rpc_url = "https://ethereum-holesky-rpc.publicnode.com" # URL of the SSV API server to use, if you have a mux that targets an SSV node operator -# OPTIONAL, DEFAULT: "https://api.ssv.network/api/v4" -# ssv_api_url = "https://api.ssv.network/api/v4" +# OPTIONAL, DEFAULT: "https://api.ssv.network/api/v4/" +# ssv_api_url = "https://api.ssv.network/api/v4/" # Timeout for any HTTP requests sent from the PBS module to other services, in seconds # OPTIONAL, DEFAULT: 10 http_timeout_seconds = 10 diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 57a5fbb3..feeb283c 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -25,10 +25,12 @@ ethereum_ssz.workspace = true ethereum_ssz_derive.workspace = true eyre.workspace = true futures.workspace = true +headers-accept.workspace = true jsonwebtoken.workspace = true lh_eth2.workspace = true lh_eth2_keystore.workspace = true lh_types.workspace = true +mediatype.workspace = true pbkdf2.workspace = true rand.workspace = true rayon.workspace = true diff --git a/crates/common/src/utils.rs b/crates/common/src/utils.rs index 764ab188..291932d8 100644 --- a/crates/common/src/utils.rs +++ b/crates/common/src/utils.rs @@ -1,16 +1,31 @@ #[cfg(feature = "testing-flags")] use std::cell::Cell; use std::{ + fmt::Display, net::Ipv4Addr, + str::FromStr, time::{SystemTime, UNIX_EPOCH}, }; use alloy::{hex, primitives::U256}; -use axum::http::HeaderValue; +use axum::{ + extract::{FromRequest, Request}, + http::HeaderValue, + response::{IntoResponse, Response as AxumResponse}, +}; +use bytes::Bytes; use futures::StreamExt; -use lh_types::test_utils::{SeedableRng, TestRandom, XorShiftRng}; +use headers_accept::Accept; +pub use lh_types::ForkName; +use lh_types::{ + BeaconBlock, + test_utils::{SeedableRng, TestRandom, XorShiftRng}, +}; use rand::{Rng, distr::Alphanumeric}; -use reqwest::{Response, header::HeaderMap}; +use reqwest::{ + Response, + header::{ACCEPT, CONTENT_TYPE, HeaderMap}, +}; use serde::{Serialize, de::DeserializeOwned}; use serde_json::Value; use ssz::{Decode, Encode}; @@ -26,11 +41,16 @@ use tracing_subscriber::{ use crate::{ config::LogsSettings, constants::SIGNER_JWT_EXPIRATION, - pbs::HEADER_VERSION_VALUE, + pbs::{HEADER_VERSION_VALUE, SignedBlindedBeaconBlock}, types::{BlsPublicKey, Chain, Jwt, JwtClaims, ModuleId}, }; +const APPLICATION_JSON: &str = "application/json"; +const APPLICATION_OCTET_STREAM: &str = "application/octet-stream"; +const WILDCARD: &str = "*/*"; + const MILLIS_PER_SECOND: u64 = 1_000; +pub const CONSENSUS_VERSION_HEADER: &str = "Eth-Consensus-Version"; #[derive(Debug, Error)] pub enum ResponseReadError { @@ -409,6 +429,162 @@ pub fn get_user_agent_with_version(req_headers: &HeaderMap) -> eyre::Result eyre::Result { + let accept = Accept::from_str( + req_headers.get(ACCEPT).and_then(|value| value.to_str().ok()).unwrap_or(APPLICATION_JSON), + ) + .map_err(|e| eyre::eyre!("invalid accept header: {e}"))?; + + if accept.media_types().count() == 0 { + // No valid media types found, default to JSON + return Ok(EncodingType::Json); + } + + // Get the SSZ and JSON media types if present + let mut ssz_type = false; + let mut json_type = false; + let mut unsupported_type = false; + accept.media_types().for_each(|mt| match mt.essence().to_string().as_str() { + APPLICATION_OCTET_STREAM => ssz_type = true, + APPLICATION_JSON | WILDCARD => json_type = true, + _ => unsupported_type = true, + }); + + // If SSZ is present, prioritize it + if ssz_type { + return Ok(EncodingType::Ssz); + } + // If there aren't any unsupported types, use JSON + if !unsupported_type { + return Ok(EncodingType::Json); + } + Err(eyre::eyre!("unsupported accept type")) +} + +/// Parse CONTENT TYPE header to get the encoding type of the body, defaulting +/// to JSON if missing or malformed. +pub fn get_content_type(req_headers: &HeaderMap) -> EncodingType { + EncodingType::from_str( + req_headers + .get(CONTENT_TYPE) + .and_then(|value| value.to_str().ok()) + .unwrap_or(APPLICATION_JSON), + ) + .unwrap_or(EncodingType::Json) +} + +/// Parse CONSENSUS_VERSION header +pub fn get_consensus_version_header(req_headers: &HeaderMap) -> Option { + ForkName::from_str( + req_headers + .get(CONSENSUS_VERSION_HEADER) + .and_then(|value| value.to_str().ok()) + .unwrap_or(""), + ) + .ok() +} + +/// Enum for types that can be used to encode incoming request bodies or +/// outgoing response bodies +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum EncodingType { + /// Body is UTF-8 encoded as JSON + Json, + + /// Body is raw bytes representing an SSZ object + Ssz, +} + +impl std::fmt::Display for EncodingType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + EncodingType::Json => write!(f, "application/json"), + EncodingType::Ssz => write!(f, "application/octet-stream"), + } + } +} + +impl FromStr for EncodingType { + type Err = String; + fn from_str(value: &str) -> Result { + match value { + "application/json" | "" => Ok(EncodingType::Json), + "application/octet-stream" => Ok(EncodingType::Ssz), + _ => Err(format!("unsupported encoding type: {value}")), + } + } +} + +pub enum BodyDeserializeError { + SerdeJsonError(serde_json::Error), + SszDecodeError(ssz::DecodeError), + UnsupportedMediaType, + MissingVersionHeader, +} + +impl Display for BodyDeserializeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + BodyDeserializeError::SerdeJsonError(e) => write!(f, "JSON deserialization error: {e}"), + BodyDeserializeError::SszDecodeError(e) => { + write!(f, "SSZ deserialization error: {e:?}") + } + BodyDeserializeError::UnsupportedMediaType => write!(f, "unsupported media type"), + BodyDeserializeError::MissingVersionHeader => { + write!(f, "missing consensus version header") + } + } + } +} + +pub async fn deserialize_body( + headers: &HeaderMap, + body: Bytes, +) -> Result { + if headers.contains_key(CONTENT_TYPE) { + return match get_content_type(headers) { + EncodingType::Json => serde_json::from_slice::(&body) + .map_err(BodyDeserializeError::SerdeJsonError), + EncodingType::Ssz => { + // Get the version header + match get_consensus_version_header(headers) { + Some(version) => { + SignedBlindedBeaconBlock::from_ssz_bytes_with(&body, |bytes| { + BeaconBlock::from_ssz_bytes_for_fork(bytes, version) + }) + .map_err(BodyDeserializeError::SszDecodeError) + } + None => Err(BodyDeserializeError::MissingVersionHeader), + } + } + }; + } + + Err(BodyDeserializeError::UnsupportedMediaType) +} + +#[must_use] +#[derive(Debug, Clone, Default)] +pub struct RawRequest { + pub body_bytes: Bytes, +} + +impl FromRequest for RawRequest +where + S: Send + Sync, +{ + type Rejection = AxumResponse; + + async fn from_request(req: Request, _state: &S) -> Result { + let bytes = Bytes::from_request(req, _state).await.map_err(IntoResponse::into_response)?; + Ok(Self { body_bytes: bytes }) + } +} + #[cfg(unix)] pub async fn wait_for_signal() -> eyre::Result<()> { use tokio::signal::unix::{SignalKind, signal}; diff --git a/crates/pbs/Cargo.toml b/crates/pbs/Cargo.toml index e8cb0b31..1c5c2f1f 100644 --- a/crates/pbs/Cargo.toml +++ b/crates/pbs/Cargo.toml @@ -12,6 +12,7 @@ axum.workspace = true axum-extra.workspace = true cb-common.workspace = true cb-metrics.workspace = true +ethereum_ssz.workspace = true eyre.workspace = true futures.workspace = true lazy_static.workspace = true diff --git a/crates/pbs/src/error.rs b/crates/pbs/src/error.rs index 590c03d4..6c1c5c68 100644 --- a/crates/pbs/src/error.rs +++ b/crates/pbs/src/error.rs @@ -6,6 +6,7 @@ pub enum PbsClientError { NoResponse, NoPayload, Internal, + DecodeError(String), } impl PbsClientError { @@ -14,6 +15,7 @@ impl PbsClientError { PbsClientError::NoResponse => StatusCode::BAD_GATEWAY, PbsClientError::NoPayload => StatusCode::BAD_GATEWAY, PbsClientError::Internal => StatusCode::INTERNAL_SERVER_ERROR, + PbsClientError::DecodeError(_) => StatusCode::BAD_REQUEST, } } } @@ -24,6 +26,7 @@ impl IntoResponse for PbsClientError { PbsClientError::NoResponse => "no response from relays".to_string(), PbsClientError::NoPayload => "no payload from relays".to_string(), PbsClientError::Internal => "internal server error".to_string(), + PbsClientError::DecodeError(e) => format!("error decoding request: {e}"), }; (self.status_code(), msg).into_response() diff --git a/crates/pbs/src/routes/get_header.rs b/crates/pbs/src/routes/get_header.rs index 9ed312af..ca8d2d7c 100644 --- a/crates/pbs/src/routes/get_header.rs +++ b/crates/pbs/src/routes/get_header.rs @@ -1,14 +1,17 @@ use alloy::primitives::utils::format_ether; use axum::{ extract::{Path, State}, - http::HeaderMap, + http::{HeaderMap, HeaderValue}, response::IntoResponse, }; use cb_common::{ pbs::{GetHeaderInfo, GetHeaderParams}, - utils::{get_user_agent, ms_into_slot}, + utils::{ + CONSENSUS_VERSION_HEADER, EncodingType, get_accept_type, get_user_agent, ms_into_slot, + }, }; -use reqwest::StatusCode; +use reqwest::{StatusCode, header::CONTENT_TYPE}; +use ssz::Encode; use tracing::{error, info}; use crate::{ @@ -32,6 +35,14 @@ pub async fn handle_get_header>( let ua = get_user_agent(&req_headers); let ms_into_slot = ms_into_slot(params.slot, state.config.chain); + let accept_type = get_accept_type(&req_headers).map_err(|e| { + error!(%e, "error parsing accept header"); + PbsClientError::DecodeError(format!("error parsing accept header: {e}")) + }); + if let Err(e) = accept_type { + return Ok((StatusCode::BAD_REQUEST, e).into_response()); + } + let accept_type = accept_type.unwrap(); info!(ua, ms_into_slot, "new request"); @@ -41,7 +52,30 @@ pub async fn handle_get_header>( info!(value_eth = format_ether(*max_bid.data.message.value()), block_hash =% max_bid.block_hash(), "received header"); BEACON_NODE_STATUS.with_label_values(&["200", GET_HEADER_ENDPOINT_TAG]).inc(); - Ok((StatusCode::OK, axum::Json(max_bid)).into_response()) + let response = match accept_type { + EncodingType::Ssz => { + let mut res = max_bid.data.as_ssz_bytes().into_response(); + let Ok(consensus_version_header) = + HeaderValue::from_str(&max_bid.version.to_string()) + else { + info!("sending response as JSON"); + return Ok((StatusCode::OK, axum::Json(max_bid)).into_response()); + }; + let Ok(content_type_header) = + HeaderValue::from_str(&format!("{}", EncodingType::Ssz)) + else { + info!("sending response as JSON"); + return Ok((StatusCode::OK, axum::Json(max_bid)).into_response()); + }; + res.headers_mut() + .insert(CONSENSUS_VERSION_HEADER, consensus_version_header); + res.headers_mut().insert(CONTENT_TYPE, content_type_header); + info!("sending response as SSZ"); + res + } + EncodingType::Json => (StatusCode::OK, axum::Json(max_bid)).into_response(), + }; + Ok(response) } else { // spec: return 204 if request is valid but no bid available info!("no header available for slot"); diff --git a/crates/pbs/src/routes/submit_block.rs b/crates/pbs/src/routes/submit_block.rs index 004b601e..1134b462 100644 --- a/crates/pbs/src/routes/submit_block.rs +++ b/crates/pbs/src/routes/submit_block.rs @@ -1,11 +1,20 @@ use std::sync::Arc; -use axum::{Json, extract::State, http::HeaderMap, response::IntoResponse}; +use axum::{ + Json, + extract::State, + http::{HeaderMap, HeaderValue}, + response::IntoResponse, +}; use cb_common::{ - pbs::{BuilderApiVersion, GetPayloadInfo, SignedBlindedBeaconBlock}, - utils::{get_user_agent, timestamp_of_slot_start_millis, utcnow_ms}, + pbs::{BuilderApiVersion, GetPayloadInfo}, + utils::{ + CONSENSUS_VERSION_HEADER, EncodingType, RawRequest, deserialize_body, get_accept_type, + get_user_agent, timestamp_of_slot_start_millis, utcnow_ms + }, }; -use reqwest::StatusCode; +use reqwest::{StatusCode, header::CONTENT_TYPE}; +use ssz::Encode; use tracing::{error, info, trace}; use crate::{ @@ -19,37 +28,30 @@ use crate::{ pub async fn handle_submit_block_v1>( state: State>, req_headers: HeaderMap, - Json(signed_blinded_block): Json>, + raw_request: RawRequest, ) -> Result { - handle_submit_block_impl::( - state, - req_headers, - signed_blinded_block, - BuilderApiVersion::V1, - ) - .await + handle_submit_block_impl::(state, req_headers, raw_request, BuilderApiVersion::V1).await } pub async fn handle_submit_block_v2>( state: State>, req_headers: HeaderMap, - Json(signed_blinded_block): Json>, + raw_request: RawRequest, ) -> Result { - handle_submit_block_impl::( - state, - req_headers, - signed_blinded_block, - BuilderApiVersion::V2, - ) - .await + handle_submit_block_impl::(state, req_headers, raw_request, BuilderApiVersion::V2).await } async fn handle_submit_block_impl>( State(state): State>, req_headers: HeaderMap, - signed_blinded_block: Arc, + raw_request: RawRequest, api_version: BuilderApiVersion, ) -> Result { + let signed_blinded_block = Arc::new( + deserialize_body(&req_headers, raw_request.body_bytes).await.map_err(|e| { + error!(%e, "failed to deserialize signed blinded block"); + PbsClientError::DecodeError(format!("failed to deserialize body: {e}")) + })?); tracing::Span::current().record("slot", signed_blinded_block.slot().as_u64() as i64); tracing::Span::current() .record("block_hash", tracing::field::debug(signed_blinded_block.block_hash())); @@ -64,23 +66,67 @@ async fn handle_submit_block_impl>( let block_hash = signed_blinded_block.block_hash(); let slot_start_ms = timestamp_of_slot_start_millis(slot.into(), state.config.chain); let ua = get_user_agent(&req_headers); + let response_type = get_accept_type(&req_headers).map_err(|e| { + error!(%e, "error parsing accept header"); + PbsClientError::DecodeError(format!("error parsing accept header: {e}")) + }); + if let Err(e) = response_type { + return Ok((StatusCode::BAD_REQUEST, e.into_response())); + } + let response_type = response_type.unwrap(); info!(ua, ms_into_slot = now.saturating_sub(slot_start_ms), "new request"); match A::submit_block(signed_blinded_block, req_headers, state, api_version).await { Ok(res) => match res { - Some(block_response) => { - trace!(?block_response); + Some(payload_and_blobs) => { + trace!(?payload_and_blobs); info!("received unblinded block (v1)"); BEACON_NODE_STATUS .with_label_values(&["200", SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG]) .inc(); - Ok((StatusCode::OK, Json(block_response).into_response())) + let response = match response_type { + EncodingType::Json => { + info!("sending response as JSON"); + Json(payload_and_blobs).into_response() + } + EncodingType::Ssz => { + let mut response = payload_and_blobs.data.as_ssz_bytes().into_response(); + let Ok(consensus_version_header) = + HeaderValue::from_str(&payload_and_blobs.version.to_string()) + else { + info!("sending response as JSON"); + return Ok(( + StatusCode::OK, + axum::Json(payload_and_blobs).into_response(), + )); + }; + let Ok(content_type_header) = + HeaderValue::from_str(&EncodingType::Ssz.to_string()) + else { + info!("sending response as JSON"); + return Ok(( + StatusCode::OK, + axum::Json(payload_and_blobs).into_response(), + )); + }; + response + .headers_mut() + .insert(CONSENSUS_VERSION_HEADER, consensus_version_header); + response.headers_mut().insert(CONTENT_TYPE, content_type_header); + info!("sending response as SSZ"); + response + } + }; + + Ok((StatusCode::OK, response)) } None => { info!("received unblinded block (v2)"); + // Note: this doesn't provide consensus_version_header because it doesn't pass + // the body through, and there's no content-type header since the body is empty. BEACON_NODE_STATUS .with_label_values(&["202", SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG]) .inc(); diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 5e8e1596..61a54420 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -10,6 +10,7 @@ axum.workspace = true cb-common.workspace = true cb-pbs.workspace = true cb-signer.workspace = true +ethereum_ssz.workspace = true eyre.workspace = true lh_types.workspace = true reqwest.workspace = true diff --git a/tests/src/mock_relay.rs b/tests/src/mock_relay.rs index 6f004c97..5611a1eb 100644 --- a/tests/src/mock_relay.rs +++ b/tests/src/mock_relay.rs @@ -10,7 +10,7 @@ use alloy::{primitives::U256, rpc::types::beacon::relay::ValidatorRegistration}; use axum::{ Json, Router, extract::{Path, State}, - http::StatusCode, + http::{HeaderMap, HeaderValue, StatusCode}, response::{IntoResponse, Response}, routing::{get, post}, }; @@ -19,17 +19,22 @@ use cb_common::{ BUILDER_V1_API_PATH, BUILDER_V2_API_PATH, BlobsBundle, BuilderBid, BuilderBidElectra, ExecutionPayloadElectra, ExecutionPayloadHeaderElectra, ExecutionRequests, ForkName, GET_HEADER_PATH, GET_STATUS_PATH, GetHeaderParams, GetHeaderResponse, GetPayloadInfo, - PayloadAndBlobs, REGISTER_VALIDATOR_PATH, SUBMIT_BLOCK_PATH, SignedBlindedBeaconBlock, - SignedBuilderBid, SubmitBlindedBlockResponse, + PayloadAndBlobs, REGISTER_VALIDATOR_PATH, SUBMIT_BLOCK_PATH, SignedBuilderBid, + SubmitBlindedBlockResponse, }, signature::sign_builder_root, types::{BlsSecretKey, Chain}, - utils::{TestRandomSeed, timestamp_of_slot_start_sec}, + utils::{ + CONSENSUS_VERSION_HEADER, EncodingType, RawRequest, TestRandomSeed, deserialize_body, + get_accept_type, get_consensus_version_header, timestamp_of_slot_start_sec, + }, }; use cb_pbs::MAX_SIZE_SUBMIT_BLOCK_RESPONSE; use lh_types::KzgProof; +use reqwest::header::CONTENT_TYPE; +use ssz::Encode; use tokio::net::TcpListener; -use tracing::debug; +use tracing::{debug, error}; use tree_hash::TreeHash; pub async fn start_mock_relay_service(state: Arc, port: u16) -> eyre::Result<()> { @@ -110,36 +115,69 @@ pub fn mock_relay_app_router(state: Arc) -> Router { async fn handle_get_header( State(state): State>, Path(GetHeaderParams { parent_hash, .. }): Path, + headers: HeaderMap, ) -> Response { state.received_get_header.fetch_add(1, Ordering::Relaxed); + let accept_type = get_accept_type(&headers) + .map_err(|e| (StatusCode::BAD_REQUEST, format!("error parsing accept header: {e}"))); + if let Err(e) = accept_type { + return e.into_response(); + } + let accept_header = accept_type.unwrap(); + let consensus_version_header = + get_consensus_version_header(&headers).unwrap_or(ForkName::Electra); - let mut header = ExecutionPayloadHeaderElectra { - parent_hash: parent_hash.into(), - block_hash: Default::default(), - timestamp: timestamp_of_slot_start_sec(0, state.chain), - ..ExecutionPayloadHeaderElectra::test_random() - }; - - header.block_hash.0[0] = 1; + let data = match consensus_version_header { + // Add Fusaka and other forks here when necessary + ForkName::Electra => { + let mut header = ExecutionPayloadHeaderElectra { + parent_hash: parent_hash.into(), + block_hash: Default::default(), + timestamp: timestamp_of_slot_start_sec(0, state.chain), + ..ExecutionPayloadHeaderElectra::test_random() + }; - let message = BuilderBid::Electra(BuilderBidElectra { - header, - blob_kzg_commitments: Default::default(), - execution_requests: ExecutionRequests::default(), - value: U256::from(10), - pubkey: state.signer.public_key().into(), - }); + header.block_hash.0[0] = 1; - let object_root = message.tree_hash_root(); - let signature = sign_builder_root(state.chain, &state.signer, object_root); - let response = SignedBuilderBid { message, signature }; + let message = BuilderBid::Electra(BuilderBidElectra { + header, + blob_kzg_commitments: Default::default(), + execution_requests: ExecutionRequests::default(), + value: U256::from(10), + pubkey: state.signer.public_key().into(), + }); - let response = GetHeaderResponse { - version: ForkName::Electra, - data: response, - metadata: Default::default(), + let object_root = message.tree_hash_root(); + let signature = sign_builder_root(state.chain, &state.signer, object_root); + let response = SignedBuilderBid { message, signature }; + match accept_header { + EncodingType::Json => { + let versioned_response = GetHeaderResponse { + version: ForkName::Electra, + data: response, + metadata: Default::default(), + }; + serde_json::to_vec(&versioned_response).unwrap() + } + EncodingType::Ssz => response.as_ssz_bytes(), + } + } + _ => { + return ( + StatusCode::BAD_REQUEST, + format!("Unsupported fork {consensus_version_header}"), + ) + .into_response(); + } }; - (StatusCode::OK, Json(response)).into_response() + + let mut response = (StatusCode::OK, data).into_response(); + let consensus_version_header = + HeaderValue::from_str(&consensus_version_header.to_string()).unwrap(); + let content_type_header = HeaderValue::from_str(&accept_header.to_string()).unwrap(); + response.headers_mut().insert(CONSENSUS_VERSION_HEADER, consensus_version_header); + response.headers_mut().insert(CONTENT_TYPE, content_type_header); + response } async fn handle_get_status(State(state): State>) -> impl IntoResponse { @@ -162,14 +200,33 @@ async fn handle_register_validator( } async fn handle_submit_block_v1( + headers: HeaderMap, State(state): State>, - Json(submit_block): Json, + raw_request: RawRequest, ) -> Response { state.received_submit_block.fetch_add(1, Ordering::Relaxed); - if state.large_body() { - (StatusCode::OK, Json(vec![1u8; 1 + MAX_SIZE_SUBMIT_BLOCK_RESPONSE])).into_response() + let accept_header = get_accept_type(&headers); + if let Err(e) = accept_header { + error!(%e, "error parsing accept header"); + return (StatusCode::BAD_REQUEST, format!("error parsing accept header: {e}")) + .into_response(); + } + let accept_header = accept_header.unwrap(); + let consensus_version_header = + get_consensus_version_header(&headers).unwrap_or(ForkName::Electra); + + let data = if state.large_body() { + vec![1u8; 1 + MAX_SIZE_SUBMIT_BLOCK_RESPONSE] } else { let mut execution_payload = ExecutionPayloadElectra::test_random(); + let submit_block = deserialize_body(&headers, raw_request.body_bytes).await.map_err(|e| { + error!(%e, "failed to deserialize signed blinded block"); + (StatusCode::BAD_REQUEST, format!("failed to deserialize body: {e}")) + }); + if let Err(e) = submit_block { + return e.into_response(); + } + let submit_block = submit_block.unwrap(); execution_payload.block_hash = submit_block.block_hash().into(); let mut blobs_bundle = BlobsBundle::default(); @@ -182,15 +239,39 @@ async fn handle_submit_block_v1( let response = PayloadAndBlobs { execution_payload: execution_payload.into(), blobs_bundle }; - let response = SubmitBlindedBlockResponse { - version: ForkName::Electra, - metadata: Default::default(), - data: response, - }; + match accept_header { + EncodingType::Json => { + // Response is versioned for JSON + let response = SubmitBlindedBlockResponse { + version: ForkName::Electra, + metadata: Default::default(), + data: response, + }; + serde_json::to_vec(&response).unwrap() + } + EncodingType::Ssz => match consensus_version_header { + // Response isn't versioned for SSZ + ForkName::Electra => response.as_ssz_bytes(), + _ => { + return ( + StatusCode::BAD_REQUEST, + format!("Unsupported fork {consensus_version_header}"), + ) + .into_response(); + } + }, + } + }; - (StatusCode::OK, Json(response)).into_response() - } + let mut response = (StatusCode::OK, data).into_response(); + let consensus_version_header = + HeaderValue::from_str(&consensus_version_header.to_string()).unwrap(); + let content_type_header = HeaderValue::from_str(&accept_header.to_string()).unwrap(); + response.headers_mut().insert(CONSENSUS_VERSION_HEADER, consensus_version_header); + response.headers_mut().insert(CONTENT_TYPE, content_type_header); + response } + async fn handle_submit_block_v2(State(state): State>) -> Response { state.received_submit_block.fetch_add(1, Ordering::Relaxed); (StatusCode::ACCEPTED, "").into_response() diff --git a/tests/src/mock_validator.rs b/tests/src/mock_validator.rs index ab593277..80aed0c2 100644 --- a/tests/src/mock_validator.rs +++ b/tests/src/mock_validator.rs @@ -2,9 +2,13 @@ use alloy::{primitives::B256, rpc::types::beacon::relay::ValidatorRegistration}; use cb_common::{ pbs::{BuilderApiVersion, RelayClient, SignedBlindedBeaconBlock}, types::BlsPublicKey, - utils::bls_pubkey_from_hex, + utils::{CONSENSUS_VERSION_HEADER, EncodingType, ForkName, bls_pubkey_from_hex}, }; -use reqwest::Response; +use reqwest::{ + Response, + header::{ACCEPT, CONTENT_TYPE}, +}; +use ssz::Encode; use crate::utils::generate_mock_relay; @@ -20,13 +24,26 @@ impl MockValidator { Ok(Self { comm_boost: generate_mock_relay(port, pubkey)? }) } - pub async fn do_get_header(&self, pubkey: Option) -> eyre::Result { + pub async fn do_get_header( + &self, + pubkey: Option, + accept: Option, + fork_name: ForkName, + ) -> eyre::Result { let default_pubkey = bls_pubkey_from_hex( "0xac6e77dfe25ecd6110b8e780608cce0dab71fdd5ebea22a16c0205200f2f8e2e3ad3b71d3499c54ad14d6c21b41a37ae", )?; let url = self.comm_boost.get_header_url(0, &B256::ZERO, &pubkey.unwrap_or(default_pubkey))?; - Ok(self.comm_boost.client.get(url).send().await?) + let res = self + .comm_boost + .client + .get(url) + .header(ACCEPT, &accept.unwrap_or(EncodingType::Json).to_string()) + .header(CONSENSUS_VERSION_HEADER, &fork_name.to_string()) + .send() + .await?; + Ok(res) } pub async fn do_get_status(&self) -> eyre::Result { @@ -49,29 +66,65 @@ impl MockValidator { pub async fn do_submit_block_v1( &self, - signed_blinded_block: Option, + signed_blinded_block_opt: Option, + accept: EncodingType, + content_type: EncodingType, + fork_name: ForkName, ) -> eyre::Result { - self.do_submit_block_impl(signed_blinded_block, BuilderApiVersion::V1).await + self.do_submit_block_impl( + signed_blinded_block_opt, + accept, + content_type, + fork_name, + BuilderApiVersion::V1, + ) + .await } pub async fn do_submit_block_v2( &self, - signed_blinded_block: Option, + signed_blinded_block_opt: Option, + accept: EncodingType, + content_type: EncodingType, + fork_name: ForkName, ) -> eyre::Result { - self.do_submit_block_impl(signed_blinded_block, BuilderApiVersion::V2).await + self.do_submit_block_impl( + signed_blinded_block_opt, + accept, + content_type, + fork_name, + BuilderApiVersion::V2, + ) + .await } async fn do_submit_block_impl( &self, - signed_blinded_block: Option, + signed_blinded_block_opt: Option, + accept: EncodingType, + content_type: EncodingType, + fork_name: ForkName, api_version: BuilderApiVersion, ) -> eyre::Result { let url = self.comm_boost.submit_block_url(api_version).unwrap(); let signed_blinded_block = - signed_blinded_block.unwrap_or_else(load_test_signed_blinded_block); + signed_blinded_block_opt.unwrap_or_else(load_test_signed_blinded_block); + let body = match content_type { + EncodingType::Json => serde_json::to_vec(&signed_blinded_block).unwrap(), + EncodingType::Ssz => signed_blinded_block.as_ssz_bytes(), + }; - Ok(self.comm_boost.client.post(url).json(&signed_blinded_block).send().await?) + Ok(self + .comm_boost + .client + .post(url) + .body(body) + .header(CONSENSUS_VERSION_HEADER, &fork_name.to_string()) + .header(CONTENT_TYPE, &content_type.to_string()) + .header(ACCEPT, &accept.to_string()) + .send() + .await?) } } diff --git a/tests/tests/pbs_get_header.rs b/tests/tests/pbs_get_header.rs index d44d70ce..eebb0113 100644 --- a/tests/tests/pbs_get_header.rs +++ b/tests/tests/pbs_get_header.rs @@ -2,11 +2,11 @@ use std::{sync::Arc, time::Duration}; use alloy::primitives::{B256, U256}; use cb_common::{ - pbs::GetHeaderResponse, + pbs::{GetHeaderResponse, SignedBuilderBid}, signature::sign_builder_root, signer::random_secret, types::{BlsPublicKeyBytes, Chain}, - utils::timestamp_of_slot_start_sec, + utils::{EncodingType, ForkName, get_consensus_version_header, timestamp_of_slot_start_sec}, }; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ @@ -15,7 +15,7 @@ use cb_tests::{ utils::{generate_mock_relay, get_pbs_static_config, setup_test_env, to_pbs_config}, }; use eyre::Result; -use lh_types::ForkName; +use lh_types::ForkVersionDecode; use reqwest::StatusCode; use tracing::info; use tree_hash::TreeHash; @@ -45,7 +45,7 @@ async fn test_get_header() -> Result<()> { let mock_validator = MockValidator::new(pbs_port)?; info!("Sending get header"); - let res = mock_validator.do_get_header(None).await?; + let res = mock_validator.do_get_header(None, None, ForkName::Electra).await?; assert_eq!(res.status(), StatusCode::OK); let res = serde_json::from_slice::(&res.bytes().await?)?; @@ -64,6 +64,52 @@ async fn test_get_header() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_get_header_ssz() -> Result<()> { + setup_test_env(); + let signer = random_secret(); + let pubkey = signer.public_key(); + + let chain = Chain::Holesky; + let pbs_port = 3210; + let relay_port = pbs_port + 1; + + // Run a mock relay + let mock_state = Arc::new(MockRelayState::new(chain, signer)); + let mock_relay = generate_mock_relay(relay_port, pubkey)?; + tokio::spawn(start_mock_relay_service(mock_state.clone(), relay_port)); + + // Run the PBS service + let config = to_pbs_config(chain, get_pbs_static_config(pbs_port), vec![mock_relay.clone()]); + let state = PbsState::new(config); + tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); + + // leave some time to start servers + tokio::time::sleep(Duration::from_millis(100)).await; + + let mock_validator = MockValidator::new(pbs_port)?; + info!("Sending get header"); + let res = + mock_validator.do_get_header(None, Some(EncodingType::Ssz), ForkName::Electra).await?; + assert_eq!(res.status(), StatusCode::OK); + + let fork = get_consensus_version_header(res.headers()).expect("missing fork version header"); + assert_eq!(fork, ForkName::Electra); + let data = SignedBuilderBid::from_ssz_bytes_by_fork(&res.bytes().await?, fork).unwrap(); + + assert_eq!(mock_state.received_get_header(), 1); + assert_eq!(data.message.header().block_hash().0[0], 1); + assert_eq!(data.message.header().parent_hash().0, B256::ZERO); + assert_eq!(*data.message.value(), U256::from(10)); + assert_eq!(*data.message.pubkey(), BlsPublicKeyBytes::from(mock_state.signer.public_key())); + assert_eq!(data.message.header().timestamp(), timestamp_of_slot_start_sec(0, chain)); + assert_eq!( + data.signature, + sign_builder_root(chain, &mock_state.signer, data.message.tree_hash_root()) + ); + Ok(()) +} + #[tokio::test] async fn test_get_header_returns_204_if_relay_down() -> Result<()> { setup_test_env(); @@ -91,7 +137,7 @@ async fn test_get_header_returns_204_if_relay_down() -> Result<()> { let mock_validator = MockValidator::new(pbs_port)?; info!("Sending get header"); - let res = mock_validator.do_get_header(None).await?; + let res = mock_validator.do_get_header(None, None, ForkName::Electra).await?; assert_eq!(res.status(), StatusCode::NO_CONTENT); // 204 error assert_eq!(mock_state.received_get_header(), 0); // no header received diff --git a/tests/tests/pbs_mux.rs b/tests/tests/pbs_mux.rs index 3a15b49b..34bc76de 100644 --- a/tests/tests/pbs_mux.rs +++ b/tests/tests/pbs_mux.rs @@ -5,7 +5,7 @@ use cb_common::{ interop::ssv::utils::fetch_ssv_pubkeys_from_url, signer::random_secret, types::Chain, - utils::{ResponseReadError, set_ignore_content_length}, + utils::{EncodingType, ForkName, ResponseReadError, set_ignore_content_length}, }; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ @@ -195,13 +195,19 @@ async fn test_mux() -> Result<()> { // Send default request without specifying a validator key let mock_validator = MockValidator::new(pbs_port)?; info!("Sending get header with default"); - assert_eq!(mock_validator.do_get_header(None).await?.status(), StatusCode::OK); + assert_eq!( + mock_validator.do_get_header(None, None, ForkName::Electra).await?.status(), + StatusCode::OK + ); assert_eq!(mock_state.received_get_header(), 1); // only default relay was used // Send request specifying a validator key to use mux info!("Sending get header with mux"); assert_eq!( - mock_validator.do_get_header(Some(validator_pubkey)).await?.status(), + mock_validator + .do_get_header(Some(validator_pubkey), None, ForkName::Electra) + .await? + .status(), StatusCode::OK ); assert_eq!(mock_state.received_get_header(), 3); // two mux relays were used @@ -218,12 +224,24 @@ async fn test_mux() -> Result<()> { // v1 Submit block requests should go to all relays info!("Sending submit block v1"); - assert_eq!(mock_validator.do_submit_block_v1(None).await?.status(), StatusCode::OK); + assert_eq!( + mock_validator + .do_submit_block_v1(None, EncodingType::Json, EncodingType::Json, ForkName::Electra) + .await? + .status(), + StatusCode::OK + ); assert_eq!(mock_state.received_submit_block(), 3); // default + 2 mux relays were used // v2 Submit block requests should go to all relays info!("Sending submit block v2"); - assert_eq!(mock_validator.do_submit_block_v2(None).await?.status(), StatusCode::ACCEPTED); + assert_eq!( + mock_validator + .do_submit_block_v2(None, EncodingType::Json, EncodingType::Json, ForkName::Electra) + .await? + .status(), + StatusCode::ACCEPTED + ); assert_eq!(mock_state.received_submit_block(), 6); // default + 2 mux relays were used Ok(()) diff --git a/tests/tests/pbs_mux_refresh.rs b/tests/tests/pbs_mux_refresh.rs index 44979fbe..1c8d3cdc 100644 --- a/tests/tests/pbs_mux_refresh.rs +++ b/tests/tests/pbs_mux_refresh.rs @@ -14,6 +14,7 @@ use cb_tests::{ utils::{generate_mock_relay, get_pbs_static_config, to_pbs_config}, }; use eyre::Result; +use lh_types::ForkName; use reqwest::StatusCode; use tokio::sync::RwLock; use tracing::info; @@ -108,7 +109,8 @@ async fn test_auto_refresh() -> Result<()> { // relay only since it hasn't been seen in the mux yet let mock_validator = MockValidator::new(pbs_port)?; info!("Sending get header"); - let res = mock_validator.do_get_header(Some(new_mux_pubkey.clone())).await?; + let res = + mock_validator.do_get_header(Some(new_mux_pubkey.clone()), None, ForkName::Electra).await?; assert_eq!(res.status(), StatusCode::OK); assert_eq!(default_relay_state.received_get_header(), 1); // default relay was used assert_eq!(mux_relay_state.received_get_header(), 0); // mux relay was not used @@ -136,14 +138,16 @@ async fn test_auto_refresh() -> Result<()> { assert!(logs_contain(&format!("fetched 2 pubkeys for registry mux {mux_relay_id}"))); // Try to run a get_header on the new pubkey - now it should use the mux relay - let res = mock_validator.do_get_header(Some(new_mux_pubkey.clone())).await?; + let res = + mock_validator.do_get_header(Some(new_mux_pubkey.clone()), None, ForkName::Electra).await?; assert_eq!(res.status(), StatusCode::OK); assert_eq!(default_relay_state.received_get_header(), 1); // default relay was not used here assert_eq!(mux_relay_state.received_get_header(), 1); // mux relay was used // Now try to do a get_header with the old pubkey - it should only use the // default relay - let res = mock_validator.do_get_header(Some(default_pubkey.clone())).await?; + let res = + mock_validator.do_get_header(Some(default_pubkey.clone()), None, ForkName::Electra).await?; assert_eq!(res.status(), StatusCode::OK); assert_eq!(default_relay_state.received_get_header(), 2); // default relay was used assert_eq!(mux_relay_state.received_get_header(), 1); // mux relay was not used @@ -160,7 +164,9 @@ async fn test_auto_refresh() -> Result<()> { // Try to do a get_header with the removed pubkey - it should only use the // default relay - let res = mock_validator.do_get_header(Some(existing_mux_pubkey.clone())).await?; + let res = mock_validator + .do_get_header(Some(existing_mux_pubkey.clone()), None, ForkName::Electra) + .await?; assert_eq!(res.status(), StatusCode::OK); assert_eq!(default_relay_state.received_get_header(), 3); // default relay was used assert_eq!(mux_relay_state.received_get_header(), 1); // mux relay was not used diff --git a/tests/tests/pbs_post_blinded_blocks.rs b/tests/tests/pbs_post_blinded_blocks.rs index 37e9612c..79725b3e 100644 --- a/tests/tests/pbs_post_blinded_blocks.rs +++ b/tests/tests/pbs_post_blinded_blocks.rs @@ -1,9 +1,10 @@ use std::{sync::Arc, time::Duration}; use cb_common::{ - pbs::{BuilderApiVersion, GetPayloadInfo, SubmitBlindedBlockResponse}, + pbs::{BuilderApiVersion, GetPayloadInfo, PayloadAndBlobs, SubmitBlindedBlockResponse}, signer::random_secret, types::Chain, + utils::{EncodingType, ForkName}, }; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ @@ -12,12 +13,13 @@ use cb_tests::{ utils::{generate_mock_relay, get_pbs_static_config, setup_test_env, to_pbs_config}, }; use eyre::Result; +use lh_types::beacon_response::ForkVersionDecode; use reqwest::{Response, StatusCode}; use tracing::info; #[tokio::test] async fn test_submit_block_v1() -> Result<()> { - let res = submit_block_impl(3800, &BuilderApiVersion::V1).await?; + let res = submit_block_impl(3800, BuilderApiVersion::V1, EncodingType::Json).await?; assert_eq!(res.status(), StatusCode::OK); let signed_blinded_block = load_test_signed_blinded_block(); @@ -32,7 +34,31 @@ async fn test_submit_block_v1() -> Result<()> { #[tokio::test] async fn test_submit_block_v2() -> Result<()> { - let res = submit_block_impl(3850, &BuilderApiVersion::V2).await?; + let res = submit_block_impl(3850, BuilderApiVersion::V2, EncodingType::Json).await?; + assert_eq!(res.status(), StatusCode::ACCEPTED); + assert_eq!(res.bytes().await?.len(), 0); + Ok(()) +} + +#[tokio::test] +async fn test_submit_block_v1_ssz() -> Result<()> { + let res = submit_block_impl(3810, BuilderApiVersion::V1, EncodingType::Ssz).await?; + assert_eq!(res.status(), StatusCode::OK); + + let signed_blinded_block = load_test_signed_blinded_block(); + + let response_body = + PayloadAndBlobs::from_ssz_bytes_by_fork(&res.bytes().await?, ForkName::Electra).unwrap(); + assert_eq!( + response_body.execution_payload.block_hash(), + signed_blinded_block.block_hash().into() + ); + Ok(()) +} + +#[tokio::test] +async fn test_submit_block_v2_ssz() -> Result<()> { + let res = submit_block_impl(3860, BuilderApiVersion::V2, EncodingType::Ssz).await?; assert_eq!(res.status(), StatusCode::ACCEPTED); assert_eq!(res.bytes().await?.len(), 0); Ok(()) @@ -60,7 +86,9 @@ async fn test_submit_block_too_large() -> Result<()> { let mock_validator = MockValidator::new(pbs_port)?; info!("Sending submit block"); - let res = mock_validator.do_submit_block_v1(None).await; + let res = mock_validator + .do_submit_block_v1(None, EncodingType::Json, EncodingType::Json, ForkName::Electra) + .await; // response size exceeds max size: max: 20971520 assert_eq!(res.unwrap().status(), StatusCode::BAD_GATEWAY); @@ -68,7 +96,13 @@ async fn test_submit_block_too_large() -> Result<()> { Ok(()) } -async fn submit_block_impl(pbs_port: u16, api_version: &BuilderApiVersion) -> Result { +async fn submit_block_impl( + pbs_port: u16, + api_version: BuilderApiVersion, + serialization_mode: EncodingType, +) -> Result { + let accept = serialization_mode; + setup_test_env(); let signer = random_secret(); let pubkey = signer.public_key(); @@ -93,10 +127,24 @@ async fn submit_block_impl(pbs_port: u16, api_version: &BuilderApiVersion) -> Re info!("Sending submit block"); let res = match api_version { BuilderApiVersion::V1 => { - mock_validator.do_submit_block_v1(Some(signed_blinded_block)).await? + mock_validator + .do_submit_block_v1( + Some(signed_blinded_block), + accept, + serialization_mode, + ForkName::Electra, + ) + .await? } BuilderApiVersion::V2 => { - mock_validator.do_submit_block_v2(Some(signed_blinded_block)).await? + mock_validator + .do_submit_block_v2( + Some(signed_blinded_block), + accept, + serialization_mode, + ForkName::Electra, + ) + .await? } }; assert_eq!(mock_state.received_submit_block(), 1);