Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ ethereum_ssz_derive = "0.9"
eyre = "0.6.12"
futures = "0.3.30"
headers = "0.4.0"
headers-accept = "0.2.1"
indexmap = "2.2.6"
jsonwebtoken = { version = "9.3.1", default-features = false }
lazy_static = "1.5.0"
lh_eth2 = { package = "eth2", git = "https://github.com/sigp/lighthouse", tag = "v8.0.0-rc.0" }
lh_eth2_keystore = { package = "eth2_keystore", git = "https://github.com/sigp/lighthouse", tag = "v8.0.0-rc.0" }
lh_types = { package = "types", git = "https://github.com/sigp/lighthouse", tag = "v8.0.0-rc.0" }
mediatype = "0.20.0"
parking_lot = "0.12.3"
pbkdf2 = "0.12.2"
prometheus = "0.14.0"
Expand Down
4 changes: 2 additions & 2 deletions config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ extra_validation_enabled = false
# OPTIONAL
# rpc_url = "https://ethereum-holesky-rpc.publicnode.com"
# URL of the SSV API server to use, if you have a mux that targets an SSV node operator
# OPTIONAL, DEFAULT: "https://api.ssv.network/api/v4"
# ssv_api_url = "https://api.ssv.network/api/v4"
# OPTIONAL, DEFAULT: "https://api.ssv.network/api/v4/"
# ssv_api_url = "https://api.ssv.network/api/v4/"
# Timeout for any HTTP requests sent from the PBS module to other services, in seconds
# OPTIONAL, DEFAULT: 10
http_timeout_seconds = 10
Expand Down
2 changes: 2 additions & 0 deletions crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ ethereum_ssz.workspace = true
ethereum_ssz_derive.workspace = true
eyre.workspace = true
futures.workspace = true
headers-accept.workspace = true
jsonwebtoken.workspace = true
lh_eth2.workspace = true
lh_eth2_keystore.workspace = true
lh_types.workspace = true
mediatype.workspace = true
pbkdf2.workspace = true
rand.workspace = true
rayon.workspace = true
Expand Down
184 changes: 180 additions & 4 deletions crates/common/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,31 @@
#[cfg(feature = "testing-flags")]
use std::cell::Cell;
use std::{
fmt::Display,
net::Ipv4Addr,
str::FromStr,
time::{SystemTime, UNIX_EPOCH},
};

use alloy::{hex, primitives::U256};
use axum::http::HeaderValue;
use axum::{
extract::{FromRequest, Request},
http::HeaderValue,
response::{IntoResponse, Response as AxumResponse},
};
use bytes::Bytes;
use futures::StreamExt;
use lh_types::test_utils::{SeedableRng, TestRandom, XorShiftRng};
use headers_accept::Accept;
pub use lh_types::ForkName;
use lh_types::{
BeaconBlock,
test_utils::{SeedableRng, TestRandom, XorShiftRng},
};
use rand::{Rng, distr::Alphanumeric};
use reqwest::{Response, header::HeaderMap};
use reqwest::{
Response,
header::{ACCEPT, CONTENT_TYPE, HeaderMap},
};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::Value;
use ssz::{Decode, Encode};
Expand All @@ -26,11 +41,16 @@ use tracing_subscriber::{
use crate::{
config::LogsSettings,
constants::SIGNER_JWT_EXPIRATION,
pbs::HEADER_VERSION_VALUE,
pbs::{HEADER_VERSION_VALUE, SignedBlindedBeaconBlock},
types::{BlsPublicKey, Chain, Jwt, JwtClaims, ModuleId},
};

const APPLICATION_JSON: &str = "application/json";
const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
const WILDCARD: &str = "*/*";

const MILLIS_PER_SECOND: u64 = 1_000;
pub const CONSENSUS_VERSION_HEADER: &str = "Eth-Consensus-Version";

#[derive(Debug, Error)]
pub enum ResponseReadError {
Expand Down Expand Up @@ -409,6 +429,162 @@ pub fn get_user_agent_with_version(req_headers: &HeaderMap) -> eyre::Result<Head
Ok(HeaderValue::from_str(&format!("commit-boost/{HEADER_VERSION_VALUE} {ua}"))?)
}

/// Parse the ACCEPT header to get the type of response to encode the body with,
/// defaulting to JSON if missing. Returns an error if malformed or unsupported
/// types are requested. Supports requests with multiple ACCEPT headers or
/// headers with multiple media types.
pub fn get_accept_type(req_headers: &HeaderMap) -> eyre::Result<EncodingType> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add a few unit tests for this function?

let accept = Accept::from_str(
req_headers.get(ACCEPT).and_then(|value| value.to_str().ok()).unwrap_or(APPLICATION_JSON),
)
.map_err(|e| eyre::eyre!("invalid accept header: {e}"))?;

if accept.media_types().count() == 0 {
// No valid media types found, default to JSON
return Ok(EncodingType::Json);
}

// Get the SSZ and JSON media types if present
let mut ssz_type = false;
let mut json_type = false;
let mut unsupported_type = false;
accept.media_types().for_each(|mt| match mt.essence().to_string().as_str() {
APPLICATION_OCTET_STREAM => ssz_type = true,
APPLICATION_JSON | WILDCARD => json_type = true,
_ => unsupported_type = true,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would rather default to json here instead just in case?

});

// If SSZ is present, prioritize it
if ssz_type {
return Ok(EncodingType::Ssz);
}
// If there aren't any unsupported types, use JSON
if !unsupported_type {
return Ok(EncodingType::Json);
}
Err(eyre::eyre!("unsupported accept type"))
}

/// Parse CONTENT TYPE header to get the encoding type of the body, defaulting
/// to JSON if missing or malformed.
pub fn get_content_type(req_headers: &HeaderMap) -> EncodingType {
EncodingType::from_str(
req_headers
.get(CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.unwrap_or(APPLICATION_JSON),
)
.unwrap_or(EncodingType::Json)
}

/// Parse CONSENSUS_VERSION header
pub fn get_consensus_version_header(req_headers: &HeaderMap) -> Option<ForkName> {
ForkName::from_str(
req_headers
.get(CONSENSUS_VERSION_HEADER)
.and_then(|value| value.to_str().ok())
.unwrap_or(""),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if missing should we default to the current fork instead?

)
.ok()
}

/// Enum for types that can be used to encode incoming request bodies or
/// outgoing response bodies
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum EncodingType {
/// Body is UTF-8 encoded as JSON
Json,

/// Body is raw bytes representing an SSZ object
Ssz,
}

impl std::fmt::Display for EncodingType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
EncodingType::Json => write!(f, "application/json"),
EncodingType::Ssz => write!(f, "application/octet-stream"),
}
}
}

impl FromStr for EncodingType {
type Err = String;
fn from_str(value: &str) -> Result<Self, Self::Err> {
match value {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we match on lowercase string?

"application/json" | "" => Ok(EncodingType::Json),
"application/octet-stream" => Ok(EncodingType::Ssz),
_ => Err(format!("unsupported encoding type: {value}")),
}
}
}

pub enum BodyDeserializeError {
SerdeJsonError(serde_json::Error),
SszDecodeError(ssz::DecodeError),
UnsupportedMediaType,
MissingVersionHeader,
}

impl Display for BodyDeserializeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BodyDeserializeError::SerdeJsonError(e) => write!(f, "JSON deserialization error: {e}"),
BodyDeserializeError::SszDecodeError(e) => {
write!(f, "SSZ deserialization error: {e:?}")
}
BodyDeserializeError::UnsupportedMediaType => write!(f, "unsupported media type"),
BodyDeserializeError::MissingVersionHeader => {
write!(f, "missing consensus version header")
}
}
}
}

pub async fn deserialize_body(
headers: &HeaderMap,
body: Bytes,
) -> Result<SignedBlindedBeaconBlock, BodyDeserializeError> {
if headers.contains_key(CONTENT_TYPE) {
return match get_content_type(headers) {
EncodingType::Json => serde_json::from_slice::<SignedBlindedBeaconBlock>(&body)
.map_err(BodyDeserializeError::SerdeJsonError),
EncodingType::Ssz => {
// Get the version header
match get_consensus_version_header(headers) {
Some(version) => {
SignedBlindedBeaconBlock::from_ssz_bytes_with(&body, |bytes| {
BeaconBlock::from_ssz_bytes_for_fork(bytes, version)
})
.map_err(BodyDeserializeError::SszDecodeError)
}
None => Err(BodyDeserializeError::MissingVersionHeader),
}
}
};
}

Err(BodyDeserializeError::UnsupportedMediaType)
}

#[must_use]
#[derive(Debug, Clone, Default)]
pub struct RawRequest {
pub body_bytes: Bytes,
}

impl<S> FromRequest<S> for RawRequest
where
S: Send + Sync,
{
type Rejection = AxumResponse;

async fn from_request(req: Request, _state: &S) -> Result<Self, Self::Rejection> {
let bytes = Bytes::from_request(req, _state).await.map_err(IntoResponse::into_response)?;
Ok(Self { body_bytes: bytes })
}
}

#[cfg(unix)]
pub async fn wait_for_signal() -> eyre::Result<()> {
use tokio::signal::unix::{SignalKind, signal};
Expand Down
1 change: 1 addition & 0 deletions crates/pbs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ axum.workspace = true
axum-extra.workspace = true
cb-common.workspace = true
cb-metrics.workspace = true
ethereum_ssz.workspace = true
eyre.workspace = true
futures.workspace = true
lazy_static.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions crates/pbs/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub enum PbsClientError {
NoResponse,
NoPayload,
Internal,
DecodeError(String),
}

impl PbsClientError {
Expand All @@ -14,6 +15,7 @@ impl PbsClientError {
PbsClientError::NoResponse => StatusCode::BAD_GATEWAY,
PbsClientError::NoPayload => StatusCode::BAD_GATEWAY,
PbsClientError::Internal => StatusCode::INTERNAL_SERVER_ERROR,
PbsClientError::DecodeError(_) => StatusCode::BAD_REQUEST,
}
}
}
Expand All @@ -24,6 +26,7 @@ impl IntoResponse for PbsClientError {
PbsClientError::NoResponse => "no response from relays".to_string(),
PbsClientError::NoPayload => "no payload from relays".to_string(),
PbsClientError::Internal => "internal server error".to_string(),
PbsClientError::DecodeError(e) => format!("error decoding request: {e}"),
};

(self.status_code(), msg).into_response()
Expand Down
Loading
Loading