diff --git a/crates/factor-outbound-http/src/lib.rs b/crates/factor-outbound-http/src/lib.rs index 094b9a21ef..d30215c4f1 100644 --- a/crates/factor-outbound-http/src/lib.rs +++ b/crates/factor-outbound-http/src/lib.rs @@ -142,6 +142,66 @@ impl InstanceState { impl SelfInstanceBuilder for InstanceState {} +/// Helper module for acquiring permits from the outbound connections semaphore. +/// +/// This is used by the outbound HTTP implementations to limit concurrent outbound connections. +mod concurrent_outbound_connections { + use super::*; + + /// Acquires a semaphore permit for the given interface, if a semaphore is configured. + pub async fn acquire_semaphore<'a>( + interface: &str, + semaphore: &'a Option>, + ) -> Option> { + let s = semaphore.as_ref()?; + acquire(interface, || s.try_acquire(), async || s.acquire().await).await + } + + /// Acquires an owned semaphore permit for the given interface, if a semaphore is configured. + pub async fn acquire_owned_semaphore( + interface: &str, + semaphore: &Option>, + ) -> Option { + let s = semaphore.as_ref()?; + acquire( + interface, + || s.clone().try_acquire_owned(), + async || s.clone().acquire_owned().await, + ) + .await + } + + /// Helper function to acquire a semaphore permit, either immediately or by waiting. + /// + /// Allows getting either a borrowed or owned permit. + async fn acquire( + interface: &str, + try_acquire: impl Fn() -> Result, + acquire: impl AsyncFnOnce() -> Result, + ) -> Option { + // Try to acquire a permit without waiting first + // Keep track of whether we had to wait for metrics purposes. + let mut waited = false; + let permit = match try_acquire() { + Ok(p) => Ok(p), + // No available permits right now; wait for one + Err(tokio::sync::TryAcquireError::NoPermits) => { + waited = true; + acquire().await.map_err(|_| ()) + } + Err(_) => Err(()), + }; + if permit.is_ok() { + spin_telemetry::monotonic_counter!( + outbound_http.concurrent_connection_permits_acquired = 1, + interface = interface, + waited = waited + ); + } + permit.ok() + } +} + pub type Request = http::Request; pub type Response = http::Response; diff --git a/crates/factor-outbound-http/src/spin.rs b/crates/factor-outbound-http/src/spin.rs index 3eb9408320..08d725632c 100644 --- a/crates/factor-outbound-http/src/spin.rs +++ b/crates/factor-outbound-http/src/spin.rs @@ -104,10 +104,11 @@ impl spin_http::Host for crate::InstanceState { // If we're limiting concurrent outbound requests, acquire a permit // Note: since we don't have access to the underlying connection, we can only // limit the number of concurrent requests, not connections. - let permit = match &self.concurrent_outbound_connections_semaphore { - Some(s) => s.acquire().await.ok(), - None => None, - }; + let permit = crate::concurrent_outbound_connections::acquire_semaphore( + "spin", + &self.concurrent_outbound_connections_semaphore, + ) + .await; let resp = client.execute(req).await.map_err(log_reqwest_error)?; drop(permit); diff --git a/crates/factor-outbound-http/src/wasi.rs b/crates/factor-outbound-http/src/wasi.rs index ef8ec62a7d..15ae413d80 100644 --- a/crates/factor-outbound-http/src/wasi.rs +++ b/crates/factor-outbound-http/src/wasi.rs @@ -590,10 +590,12 @@ impl ConnectOptions { crate::remove_blocked_addrs(&self.blocked_networks, &mut socket_addrs)?; // If we're limiting concurrent outbound requests, acquire a permit - let permit = match &self.concurrent_outbound_connections_semaphore { - Some(s) => s.clone().acquire_owned().await.ok(), - None => None, - }; + + let permit = crate::concurrent_outbound_connections::acquire_owned_semaphore( + "wasi", + &self.concurrent_outbound_connections_semaphore, + ) + .await; let stream = timeout(self.connect_timeout, TcpStream::connect(&*socket_addrs)) .await diff --git a/crates/telemetry/src/metrics.rs b/crates/telemetry/src/metrics.rs index 2dc2ed811c..e01e90c70c 100644 --- a/crates/telemetry/src/metrics.rs +++ b/crates/telemetry/src/metrics.rs @@ -61,6 +61,8 @@ pub(crate) fn otel_metrics_layer LookupSpan<'span>>( /// /// The increment may only be an i64 or f64. You must not mix types for the same metric. /// +/// Takes advantage of counter support in [tracing-opentelemetry](https://docs.rs/tracing-opentelemetry/0.32.0/tracing_opentelemetry/struct.MetricsLayer.html). +/// /// ```no_run /// # use spin_telemetry::metrics::counter; /// counter!(spin.metric_name = 1, metric_attribute = "value"); @@ -76,6 +78,8 @@ macro_rules! counter { /// /// The increment may only be an i64 or f64. You must not mix types for the same metric. /// +/// Takes advantage of histogram support in [tracing-opentelemetry](https://docs.rs/tracing-opentelemetry/0.32.0/tracing_opentelemetry/struct.MetricsLayer.html). +/// /// ```no_run /// # use spin_telemetry::metrics::histogram; /// histogram!(spin.metric_name = 1.5, metric_attribute = "value"); @@ -91,6 +95,8 @@ macro_rules! histogram { /// /// The increment may only be a positive i64 or f64. You must not mix types for the same metric. /// +/// Takes advantage of monotonic counter support in [tracing-opentelemetry](https://docs.rs/tracing-opentelemetry/0.32.0/tracing_opentelemetry/struct.MetricsLayer.html). +/// /// ```no_run /// # use spin_telemetry::metrics::monotonic_counter; /// monotonic_counter!(spin.metric_name = 1, metric_attribute = "value"); @@ -101,6 +107,23 @@ macro_rules! monotonic_counter { } } +#[macro_export] +/// Records an increment to the named monotonic counter with the given attributes. +/// +/// The increment may only be a positive i64 or f64. You must not mix types for the same metric. +/// +/// Takes advantage of gauge support in [tracing-opentelemetry](https://docs.rs/tracing-opentelemetry/0.32.0/tracing_opentelemetry/struct.MetricsLayer.html). +/// +/// ```no_run +/// # use spin_telemetry::metrics::gauge; +/// gauge!(spin.metric_name = 1, metric_attribute = "value"); +/// ``` +macro_rules! gauge { + ($metric:ident $(. $suffixes:ident)* = $metric_value:expr $(, $attrs:ident=$values:expr)*) => { + tracing::trace!(gauge.$metric $(. $suffixes)* = $metric_value $(, $attrs=$values)*); + } +} + pub use counter; pub use histogram; pub use monotonic_counter;