From 082de3e23e357d70b2f1e6a9269194416aeda31e Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 22 Aug 2025 11:06:53 +0200 Subject: [PATCH 1/8] Supply `data_decryption_key` and `aad` for `StorableBuilder` We bump our `vss-client` dependency to include the changes to the `StorableBuilder` interface. Previously, the `vss-client` didn't allow setting `ChaCha20Poly1305RFC`'s `aad` field, which meant the `tag` didn't commit to any particular key. This would allow a malicious VSS provider to substitute blobs stored under a different key without the client noticing. Here, we now set the `aad` field to the key under which the `Storable` will be stored, ensuring that the retrieved data was originally stored under the key we expected. We also account for `StorableBuilder` now taking `data_decryption_key` by reference on `build`/`deconstruct`. --- Cargo.toml | 3 ++- src/io/vss_store.rs | 51 ++++++++++++++++++++++++++------------------- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 544dfca08..a2634e330 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,7 +65,8 @@ serde = { version = "1.0.210", default-features = false, features = ["std", "der serde_json = { version = "1.0.128", default-features = false, features = ["std"] } log = { version = "0.4.22", default-features = false, features = ["std"]} -vss-client = "0.3" +#vss-client = "0.3" +vss-client = { git = "https://github.com/tnull/vss-rust-client", rev = "03ca9d99f70387aabec225020e46434cda8d18ff" } prost = { version = "0.11.6", default-features = false} [target.'cfg(windows)'.dependencies] diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index 0e7d0872a..b66cc99e7 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -316,7 +316,7 @@ impl Drop for VssStore { struct VssStoreInner { client: VssClient, store_id: String, - storable_builder: StorableBuilder, + data_encryption_key: [u8; 32], key_obfuscator: KeyObfuscator, // Per-key locks that ensures that we don't have concurrent writes to the same namespace/key. // The lock also encapsulates the latest written version per key.
@@ -331,7 +331,6 @@ impl VssStoreInner { let (data_encryption_key, obfuscation_master_key) = derive_data_encryption_and_obfuscation_keys(&vss_seed); let key_obfuscator = KeyObfuscator::new(obfuscation_master_key); - let storable_builder = StorableBuilder::new(data_encryption_key, RandEntropySource); let retry_policy = ExponentialBackoffRetryPolicy::new(Duration::from_millis(10)) .with_max_attempts(10) .with_max_total_delay(Duration::from_secs(15)) @@ -347,7 +346,7 @@ impl VssStoreInner { let client = VssClient::new_with_headers(base_url, retry_policy, header_provider); let locks = Mutex::new(HashMap::new()); - Self { client, store_id, storable_builder, key_obfuscator, locks } + Self { client, store_id, data_encryption_key, key_obfuscator, locks } } fn get_inner_lock_ref(&self, locking_key: String) -> Arc> { @@ -413,9 +412,8 @@ impl VssStoreInner { ) -> io::Result> { check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?; - let obfuscated_key = - self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); - let request = GetObjectRequest { store_id: self.store_id.clone(), key: obfuscated_key }; + let store_key = self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); + let request = GetObjectRequest { store_id: self.store_id.clone(), key: store_key.clone() }; let resp = self.client.get_object(&request).await.map_err(|e| { let msg = format!( "Failed to read from key {}/{}/{}: {}", @@ -437,7 +435,11 @@ impl VssStoreInner { Error::new(ErrorKind::Other, msg) })?; - Ok(self.storable_builder.deconstruct(storable)?.0) + let storable_builder = StorableBuilder::new(RandEntropySource); + let decrypted = storable_builder + .deconstruct(storable, &self.data_encryption_key, store_key.as_bytes())? 
+ .0; + Ok(decrypted) } async fn write_internal( @@ -451,22 +453,27 @@ impl VssStoreInner { "write", )?; - self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { - let obfuscated_key = - self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); - let vss_version = -1; - let storable = self.storable_builder.build(buf, vss_version); - let request = PutObjectRequest { - store_id: self.store_id.clone(), - global_version: None, - transaction_items: vec![KeyValue { - key: obfuscated_key, - version: vss_version, - value: storable.encode_to_vec(), - }], - delete_items: vec![], - }; + let store_key = self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); + let vss_version = -1; + let storable_builder = StorableBuilder::new(RandEntropySource); + let storable = storable_builder.build( + buf.to_vec(), + vss_version, + &self.data_encryption_key, + store_key.as_bytes(), + ); + let request = PutObjectRequest { + store_id: self.store_id.clone(), + global_version: None, + transaction_items: vec![KeyValue { + key: store_key, + version: vss_version, + value: storable.encode_to_vec(), + }], + delete_items: vec![], + }; + self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { self.client.put_object(&request).await.map_err(|e| { let msg = format!( "Failed to write to key {}/{}/{}: {}", From 046a1048863a864ee184bbe870649daa51a09da5 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Thu, 6 Nov 2025 12:22:17 +0100 Subject: [PATCH 2/8] Account for `vss-client` being renamed to `vss-client-ng` --- Cargo.toml | 4 ++-- src/builder.rs | 2 +- src/ffi/types.rs | 2 +- src/io/vss_store.rs | 16 ++++++++-------- src/lib.rs | 2 +- 5 files changed, 13 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a2634e330..5d605c983 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,8 +65,8 @@ serde = { version = "1.0.210", default-features = false, features = ["std", "der serde_json = { version = "1.0.128", default-features = false, features = ["std"] } log = { version = "0.4.22", default-features = false, features = ["std"]} -#vss-client = "0.3" -vss-client = { git = "https://github.com/tnull/vss-rust-client", rev = "03ca9d99f70387aabec225020e46434cda8d18ff" } +#vss-client-ng = "0.3" +vss-client-ng = { git = "https://github.com/tnull/vss-client", rev = "98ac5e171ef1ab970bbc58cd995ffc1e615421d1" } prost = { version = "0.11.6", default-features = false} [target.'cfg(windows)'.dependencies] diff --git a/src/builder.rs b/src/builder.rs index c0e39af7a..bed325db6 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -39,7 +39,7 @@ use lightning::util::persist::{ use lightning::util::ser::ReadableArgs; use lightning::util::sweep::OutputSweeper; use lightning_persister::fs_store::FilesystemStore; -use vss_client::headers::{FixedHeaders, LnurlAuthToJwtProvider, VssHeaderProvider}; +use vss_client_ng::headers::{FixedHeaders, LnurlAuthToJwtProvider, VssHeaderProvider}; use crate::chain::ChainSource; use crate::config::{ diff --git a/src/ffi/types.rs b/src/ffi/types.rs index 3c88a665f..e99cdc230 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -40,7 +40,7 @@ pub use lightning_liquidity::lsps1::msgs::{ }; pub use lightning_types::payment::{PaymentHash, PaymentPreimage, PaymentSecret}; pub use lightning_types::string::UntrustedString; -pub use vss_client::headers::{VssHeaderProvider, VssHeaderProviderError}; +pub use vss_client_ng::headers::{VssHeaderProvider, VssHeaderProviderError}; use crate::builder::sanitize_alias; pub use 
crate::config::{ diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index b66cc99e7..60f6e5ad6 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -20,19 +20,19 @@ use lightning::io::{self, Error, ErrorKind}; use lightning::util::persist::{KVStore, KVStoreSync}; use prost::Message; use rand::RngCore; -use vss_client::client::VssClient; -use vss_client::error::VssError; -use vss_client::headers::VssHeaderProvider; -use vss_client::types::{ +use vss_client_ng::client::VssClient; +use vss_client_ng::error::VssError; +use vss_client_ng::headers::VssHeaderProvider; +use vss_client_ng::types::{ DeleteObjectRequest, GetObjectRequest, KeyValue, ListKeyVersionsRequest, PutObjectRequest, Storable, }; -use vss_client::util::key_obfuscator::KeyObfuscator; -use vss_client::util::retry::{ +use vss_client_ng::util::key_obfuscator::KeyObfuscator; +use vss_client_ng::util::retry::{ ExponentialBackoffRetryPolicy, FilteredRetryPolicy, JitteredRetryPolicy, MaxAttemptsRetryPolicy, MaxTotalDelayRetryPolicy, RetryPolicy, }; -use vss_client::util::storable_builder::{EntropySource, StorableBuilder}; +use vss_client_ng::util::storable_builder::{EntropySource, StorableBuilder}; use crate::io::utils::check_namespace_key_validity; use crate::runtime::Runtime; @@ -613,7 +613,7 @@ mod tests { use rand::distr::Alphanumeric; use rand::{rng, Rng, RngCore}; - use vss_client::headers::FixedHeaders; + use vss_client_ng::headers::FixedHeaders; use super::*; use crate::io::test_utils::do_read_write_remove_list_persist; diff --git a/src/lib.rs b/src/lib.rs index 701a14dde..3a4c1df9e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -159,7 +159,7 @@ pub use types::{ }; pub use { bip39, bitcoin, lightning, lightning_invoice, lightning_liquidity, lightning_types, tokio, - vss_client, + vss_client_ng, }; use crate::scoring::setup_background_pathfinding_scores_sync; From 690f12a12c4caa582efa9b673594511a6f2dc8de Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Thu, 6 Nov 2025 12:30:57 +0100 Subject: [PATCH 3/8] Account for `Retry` being handled internally We recently dropped the `Retry` logic in `vss-client-ng`, and rather now lean on `reqwest`'s retry logic. Here, we account for the corresponding interface changes. 
--- Cargo.toml | 2 +- src/builder.rs | 7 ++++++- src/io/vss_store.rs | 45 ++++++++++++--------------------------------- 3 files changed, 19 insertions(+), 35 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5d605c983..b18b59830 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,7 +66,7 @@ serde_json = { version = "1.0.128", default-features = false, features = ["std"] log = { version = "0.4.22", default-features = false, features = ["std"]} #vss-client-ng = "0.3" -vss-client-ng = { git = "https://github.com/tnull/vss-client", rev = "98ac5e171ef1ab970bbc58cd995ffc1e615421d1" } +vss-client-ng = { git = "https://github.com/tnull/vss-client", rev = "e6d8e57a949bff3cf511aef258b7aa2c681e35c9" } prost = { version = "0.11.6", default-features = false} [target.'cfg(windows)'.dependencies] diff --git a/src/builder.rs b/src/builder.rs index bed325db6..e3109cb9a 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -732,7 +732,12 @@ impl NodeBuilder { let vss_seed_bytes: [u8; 32] = vss_xprv.private_key.secret_bytes(); let vss_store = - VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider, Arc::clone(&runtime)); + VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider, Arc::clone(&runtime)) + .map_err(|e| { + log_error!(logger, "Failed to setup store: {}", e); + BuildError::KVStoreSetupFailed + })?; + build_with_store_internal( config, self.chain_data_source_config.as_ref(), diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index 60f6e5ad6..e36527572 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -28,22 +28,11 @@ use vss_client_ng::types::{ Storable, }; use vss_client_ng::util::key_obfuscator::KeyObfuscator; -use vss_client_ng::util::retry::{ - ExponentialBackoffRetryPolicy, FilteredRetryPolicy, JitteredRetryPolicy, - MaxAttemptsRetryPolicy, MaxTotalDelayRetryPolicy, RetryPolicy, -}; use vss_client_ng::util::storable_builder::{EntropySource, StorableBuilder}; use crate::io::utils::check_namespace_key_validity; use crate::runtime::Runtime; -type CustomRetryPolicy = FilteredRetryPolicy< - JitteredRetryPolicy< - MaxTotalDelayRetryPolicy>>, - >, - Box bool + 'static + Send + Sync>, ->; - // We set this to a small number of threads that would still allow to make some progress if one // would hit a blocking case const INTERNAL_RUNTIME_WORKERS: usize = 2; @@ -69,8 +58,8 @@ impl VssStore { pub(crate) fn new( base_url: String, store_id: String, vss_seed: [u8; 32], header_provider: Arc, runtime: Arc, - ) -> Self { - let inner = Arc::new(VssStoreInner::new(base_url, store_id, vss_seed, header_provider)); + ) -> io::Result { + let inner = Arc::new(VssStoreInner::new(base_url, store_id, vss_seed, header_provider)?); let next_version = AtomicU64::new(1); let internal_runtime = Some( tokio::runtime::Builder::new_multi_thread() @@ -86,7 +75,7 @@ impl VssStore { .unwrap(), ); - Self { inner, next_version, runtime, internal_runtime } + Ok(Self { inner, next_version, runtime, internal_runtime }) } // Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys @@ -314,7 +303,7 @@ impl Drop for VssStore { } struct VssStoreInner { - client: VssClient, + client: VssClient, store_id: String, data_encryption_key: [u8; 32], key_obfuscator: KeyObfuscator, @@ -327,26 +316,16 @@ impl VssStoreInner { pub(crate) fn new( base_url: String, store_id: String, vss_seed: [u8; 32], header_provider: Arc, - ) -> Self { + ) -> io::Result { let (data_encryption_key, obfuscation_master_key) = derive_data_encryption_and_obfuscation_keys(&vss_seed); let key_obfuscator = 
KeyObfuscator::new(obfuscation_master_key); - let retry_policy = ExponentialBackoffRetryPolicy::new(Duration::from_millis(10)) .with_max_attempts(10) .with_max_total_delay(Duration::from_secs(15)) .with_max_jitter(Duration::from_millis(10)) .skip_retry_on_error(Box::new(|e: &VssError| { matches!( e, VssError::NoSuchKeyError(..) | VssError::InvalidRequestError(..) | VssError::ConflictError(..) ) }) as _); - - let client = VssClient::new_with_headers(base_url, retry_policy, header_provider); + let client = VssClient::new_with_headers(base_url, header_provider).map_err(|e| { + let msg = format!("Failed to setup VssClient: {}", e); + Error::new(ErrorKind::Other, msg) + })?; let locks = Mutex::new(HashMap::new()); - Self { client, store_id, data_encryption_key, key_obfuscator, locks } + Ok(Self { client, store_id, data_encryption_key, key_obfuscator, locks }) } fn get_inner_lock_ref(&self, locking_key: String) -> Arc> { @@ -630,7 +609,7 @@ mod tests { let logger = Arc::new(Logger::new_log_facade()); let runtime = Arc::new(Runtime::new(logger).unwrap()); let vss_store = - VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime); + VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime).unwrap(); do_read_write_remove_list_persist(&vss_store); } @@ -646,7 +625,7 @@ mod tests { let logger = Arc::new(Logger::new_log_facade()); let runtime = Arc::new(Runtime::new(logger).unwrap()); let vss_store = - VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime); + VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime).unwrap(); do_read_write_remove_list_persist(&vss_store); drop(vss_store) From ac502697754f73df60dfd1525a37f137372a14ad Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 3 Nov 2025 13:15:36 +0100 Subject: [PATCH 4/8] Only use internal runtime in `VssStore` We previously attempted to drop the internal runtime from `VssStore`, resulting in blocking behavior. While we recently made changes that improved our situation (having VSS CI pass again pretty reliably), we just ran into yet another case where the VSS CI hung (cf. https://github.com/lightningdevkit/vss-server/actions/runs/19023212819/job/54322173817?pr=59). Here we attempt to restore even more of the original pre-ab3d78d1ecd05a755c836915284e5ca60c65692a / #623 behavior to get rid of the reappearing blocking behavior, i.e., only use the internal runtime in `VssStore`.
--- src/builder.rs | 9 +++--- src/io/vss_store.rs | 75 ++++++++++++++++++++++----------------------- 2 files changed, 40 insertions(+), 44 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index e3109cb9a..c3682c3bb 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -732,11 +732,10 @@ impl NodeBuilder { let vss_seed_bytes: [u8; 32] = vss_xprv.private_key.secret_bytes(); let vss_store = - VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider, Arc::clone(&runtime)) - .map_err(|e| { - log_error!(logger, "Failed to setup store: {}", e); - BuildError::KVStoreSetupFailed - })?; + VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider).map_err(|e| { + log_error!(logger, "Failed to setup store: {}", e); + BuildError::KVStoreSetupFailed + })?; build_with_store_internal( config, diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index e36527572..17a240283 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -31,7 +31,6 @@ use vss_client_ng::util::key_obfuscator::KeyObfuscator; use vss_client_ng::util::storable_builder::{EntropySource, StorableBuilder}; use crate::io::utils::check_namespace_key_validity; -use crate::runtime::Runtime; // We set this to a small number of threads that would still allow to make some progress if one // would hit a blocking case @@ -44,7 +43,6 @@ pub struct VssStore { // Version counter to ensure that writes are applied in the correct order. It is assumed that read and list // operations aren't sensitive to the order of execution. next_version: AtomicU64, - runtime: Arc, // A VSS-internal runtime we use to avoid any deadlocks we could hit when waiting on a spawned // blocking task to finish while the blocked thread had acquired the reactor. In particular, // this works around a previously-hit case where a concurrent call to @@ -57,7 +55,7 @@ pub struct VssStore { impl VssStore { pub(crate) fn new( base_url: String, store_id: String, vss_seed: [u8; 32], - header_provider: Arc, runtime: Arc, + header_provider: Arc, ) -> io::Result { let inner = Arc::new(VssStoreInner::new(base_url, store_id, vss_seed, header_provider)?); let next_version = AtomicU64::new(1); @@ -75,7 +73,7 @@ impl VssStore { .unwrap(), ); - Ok(Self { inner, next_version, runtime, internal_runtime }) + Ok(Self { inner, next_version, internal_runtime }) } // Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys @@ -122,13 +120,14 @@ impl KVStoreSync for VssStore { async move { inner.read_internal(primary_namespace, secondary_namespace, key).await }; // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always // times out. - let spawned_fut = internal_runtime.spawn(async move { - tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { - let msg = "VssStore::read timed out"; - Error::new(ErrorKind::Other, msg) - }) - }); - self.runtime.block_on(spawned_fut).expect("We should always finish")? + tokio::task::block_in_place(move || { + internal_runtime.block_on(async move { + tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { + let msg = "VssStore::read timed out"; + Error::new(ErrorKind::Other, msg) + }) + })? + }) } fn write( @@ -160,13 +159,14 @@ impl KVStoreSync for VssStore { }; // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always // times out. 
- let spawned_fut = internal_runtime.spawn(async move { - tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { - let msg = "VssStore::write timed out"; - Error::new(ErrorKind::Other, msg) - }) - }); - self.runtime.block_on(spawned_fut).expect("We should always finish")? + tokio::task::block_in_place(move || { + internal_runtime.block_on(async move { + tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { + let msg = "VssStore::write timed out"; + Error::new(ErrorKind::Other, msg) + }) + })? + }) } fn remove( @@ -197,13 +197,14 @@ impl KVStoreSync for VssStore { }; // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always // times out. - let spawned_fut = internal_runtime.spawn(async move { - tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { - let msg = "VssStore::remove timed out"; - Error::new(ErrorKind::Other, msg) - }) - }); - self.runtime.block_on(spawned_fut).expect("We should always finish")? + tokio::task::block_in_place(move || { + internal_runtime.block_on(async move { + tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { + let msg = "VssStore::remove timed out"; + Error::new(ErrorKind::Other, msg) + }) + })? + }) } fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { @@ -218,13 +219,14 @@ impl KVStoreSync for VssStore { let fut = async move { inner.list_internal(primary_namespace, secondary_namespace).await }; // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always // times out. - let spawned_fut = internal_runtime.spawn(async move { - tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { - let msg = "VssStore::list timed out"; - Error::new(ErrorKind::Other, msg) - }) - }); - self.runtime.block_on(spawned_fut).expect("We should always finish")? + tokio::task::block_in_place(move || { + internal_runtime.block_on(async move { + tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { + let msg = "VssStore::list timed out"; + Error::new(ErrorKind::Other, msg) + }) + })? 
+ }) } } @@ -596,7 +598,6 @@ mod tests { use super::*; use crate::io::test_utils::do_read_write_remove_list_persist; - use crate::logger::Logger; #[test] fn vss_read_write_remove_list_persist() { @@ -606,10 +607,8 @@ mod tests { let mut vss_seed = [0u8; 32]; rng.fill_bytes(&mut vss_seed); let header_provider = Arc::new(FixedHeaders::new(HashMap::new())); - let logger = Arc::new(Logger::new_log_facade()); - let runtime = Arc::new(Runtime::new(logger).unwrap()); let vss_store = - VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime).unwrap(); + VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider).unwrap(); do_read_write_remove_list_persist(&vss_store); } @@ -622,10 +621,8 @@ mod tests { let mut vss_seed = [0u8; 32]; rng.fill_bytes(&mut vss_seed); let header_provider = Arc::new(FixedHeaders::new(HashMap::new())); - let logger = Arc::new(Logger::new_log_facade()); - let runtime = Arc::new(Runtime::new(logger).unwrap()); let vss_store = - VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime).unwrap(); + VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider).unwrap(); do_read_write_remove_list_persist(&vss_store); drop(vss_store) From 4790def992ae0a65e4c0ed0c97c691a5f3b613b3 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Thu, 6 Nov 2025 12:39:51 +0100 Subject: [PATCH 5/8] Drop redundant `tokio::timeout`s for VSS IO Now that we rely on `reqwest` v0.12.* retry logic as well as client-side timeouts, we can address the remaining TODOs here and simply drop the redundant `tokio::timeout`s we previously added as a safeguard to blocking tasks (even though in the worst cases we saw they never actually fired). --- src/io/vss_store.rs | 46 ++++----------------------------------------- 1 file changed, 4 insertions(+), 42 deletions(-) diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index 17a240283..851750cc6 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -13,7 +13,6 @@ use std::panic::RefUnwindSafe; use std::pin::Pin; use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; -use std::time::Duration; use bitcoin::hashes::{sha256, Hash, HashEngine, Hmac, HmacEngine}; use lightning::io::{self, Error, ErrorKind}; @@ -35,7 +34,6 @@ use crate::io::utils::check_namespace_key_validity; // We set this to a small number of threads that would still allow to make some progress if one // would hit a blocking case const INTERNAL_RUNTIME_WORKERS: usize = 2; -const VSS_IO_TIMEOUT: Duration = Duration::from_secs(5); /// A [`KVStoreSync`] implementation that writes to and reads from a [VSS](https://github.com/lightningdevkit/vss-server/blob/main/README.md) backend. pub struct VssStore { @@ -118,16 +116,7 @@ impl KVStoreSync for VssStore { let inner = Arc::clone(&self.inner); let fut = async move { inner.read_internal(primary_namespace, secondary_namespace, key).await }; - // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always - // times out. - tokio::task::block_in_place(move || { - internal_runtime.block_on(async move { - tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { - let msg = "VssStore::read timed out"; - Error::new(ErrorKind::Other, msg) - }) - })? - }) + tokio::task::block_in_place(move || internal_runtime.block_on(fut)) } fn write( @@ -157,16 +146,7 @@ impl KVStoreSync for VssStore { ) .await }; - // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always - // times out. 
- tokio::task::block_in_place(move || { - internal_runtime.block_on(async move { - tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { - let msg = "VssStore::write timed out"; - Error::new(ErrorKind::Other, msg) - }) - })? - }) + tokio::task::block_in_place(move || internal_runtime.block_on(fut)) } fn remove( @@ -195,16 +175,7 @@ impl KVStoreSync for VssStore { ) .await }; - // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always - // times out. - tokio::task::block_in_place(move || { - internal_runtime.block_on(async move { - tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { - let msg = "VssStore::remove timed out"; - Error::new(ErrorKind::Other, msg) - }) - })? - }) + tokio::task::block_in_place(move || internal_runtime.block_on(fut)) } fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { @@ -217,16 +188,7 @@ impl KVStoreSync for VssStore { let secondary_namespace = secondary_namespace.to_string(); let inner = Arc::clone(&self.inner); let fut = async move { inner.list_internal(primary_namespace, secondary_namespace).await }; - // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always - // times out. - tokio::task::block_in_place(move || { - internal_runtime.block_on(async move { - tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { - let msg = "VssStore::list timed out"; - Error::new(ErrorKind::Other, msg) - }) - })? - }) + tokio::task::block_in_place(move || internal_runtime.block_on(fut)) } } From ba9ba5beda9e278ba26e6aea289991369bd0a8eb Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 3 Nov 2025 14:44:23 +0100 Subject: [PATCH 6/8] DROPME: Run test 100 times --- .github/workflows/vss-integration.yml | 2 +- tests/integration_tests_vss.rs | 74 ++++++++++++++------------- 2 files changed, 39 insertions(+), 37 deletions(-) diff --git a/.github/workflows/vss-integration.yml b/.github/workflows/vss-integration.yml index 8473ed413..043f91d8f 100644 --- a/.github/workflows/vss-integration.yml +++ b/.github/workflows/vss-integration.yml @@ -45,4 +45,4 @@ jobs: cd ldk-node export TEST_VSS_BASE_URL="http://localhost:8080/vss" RUSTFLAGS="--cfg vss_test" cargo test io::vss_store - RUSTFLAGS="--cfg vss_test" cargo test --test integration_tests_vss + RUSTFLAGS="--cfg vss_test" cargo test --test integration_tests_vss -- --nocapture diff --git a/tests/integration_tests_vss.rs b/tests/integration_tests_vss.rs index 93f167dae..a9c397520 100644 --- a/tests/integration_tests_vss.rs +++ b/tests/integration_tests_vss.rs @@ -16,42 +16,44 @@ use ldk_node::Builder; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn channel_full_cycle_with_vss_store() { let (bitcoind, electrsd) = common::setup_bitcoind_and_electrsd(); - println!("== Node A =="); - let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); - let config_a = common::random_config(true); - let mut builder_a = Builder::from_config(config_a.node_config); - builder_a.set_chain_source_esplora(esplora_url.clone(), None); - let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); - let node_a = builder_a - .build_with_vss_store_and_fixed_headers( - vss_base_url.clone(), - "node_1_store".to_string(), - HashMap::new(), - ) - .unwrap(); - node_a.start().unwrap(); + for i in 1..100 { + println!("Run {}: == Node A ==", i); + let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); + let config_a = common::random_config(true); + let mut builder_a = 
Builder::from_config(config_a.node_config); + builder_a.set_chain_source_esplora(esplora_url.clone(), None); + let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); + let node_a = builder_a + .build_with_vss_store_and_fixed_headers( + vss_base_url.clone(), + format!("node_{}_1_store", i), + HashMap::new(), + ) + .unwrap(); + node_a.start().unwrap(); - println!("\n== Node B =="); - let config_b = common::random_config(true); - let mut builder_b = Builder::from_config(config_b.node_config); - builder_b.set_chain_source_esplora(esplora_url.clone(), None); - let node_b = builder_b - .build_with_vss_store_and_fixed_headers( - vss_base_url, - "node_2_store".to_string(), - HashMap::new(), - ) - .unwrap(); - node_b.start().unwrap(); + println!("\nRun {}: == Node B ==", i); + let config_b = common::random_config(true); + let mut builder_b = Builder::from_config(config_b.node_config); + builder_b.set_chain_source_esplora(esplora_url.clone(), None); + let node_b = builder_b + .build_with_vss_store_and_fixed_headers( + vss_base_url, + format!("node_{}_2_store", i), + HashMap::new(), + ) + .unwrap(); + node_b.start().unwrap(); - common::do_channel_full_cycle( - node_a, - node_b, - &bitcoind.client, - &electrsd.client, - false, - true, - false, - ) - .await; + common::do_channel_full_cycle( + node_a, + node_b, + &bitcoind.client, + &electrsd.client, + false, + true, + false, + ) + .await; + } } From 189fd6d8f94c162abaa178f08999a3ca38417a31 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Thu, 6 Nov 2025 14:10:31 +0100 Subject: [PATCH 7/8] Bump retries and timeouts considerably As LDK currently will still panic and crash if we'd ever fail a `write` operation, we here considerably bump the defaults set in `VssClient` to at least make this less likely. Longer term, we still hope to mitigate the issue by moving to the async-persist flow. --- Cargo.toml | 2 +- src/io/vss_store.rs | 34 ++++++++++++++++++++++++++++++++-- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b18b59830..9ed1d076c 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,7 +66,7 @@ serde_json = { version = "1.0.128", default-features = false, features = ["std"] log = { version = "0.4.22", default-features = false, features = ["std"]} #vss-client-ng = "0.3" -vss-client-ng = { git = "https://github.com/tnull/vss-client", rev = "e6d8e57a949bff3cf511aef258b7aa2c681e35c9" } +vss-client-ng = { git = "https://github.com/tnull/vss-client", rev = "7cf661b4ba45983ecad0f59e6d74050e2c84212f" } prost = { version = "0.11.6", default-features = false} [target.'cfg(windows)'.dependencies] diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index 851750cc6..ac45c476d 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -35,6 +35,9 @@ use crate::io::utils::check_namespace_key_validity; // would hit a blocking case const INTERNAL_RUNTIME_WORKERS: usize = 2; +const HTTP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); +const HTTP_RETRIES: u32 = 10; + /// A [`KVStoreSync`] implementation that writes to and reads from a [VSS](https://github.com/lightningdevkit/vss-server/blob/main/README.md) backend. 
pub struct VssStore { inner: Arc, @@ -284,10 +287,11 @@ impl VssStoreInner { let (data_encryption_key, obfuscation_master_key) = derive_data_encryption_and_obfuscation_keys(&vss_seed); let key_obfuscator = KeyObfuscator::new(obfuscation_master_key); - let client = VssClient::new_with_headers(base_url, header_provider).map_err(|e| { - let msg = format!("Failed to setup VssClient: {}", e); + let reqwest_client = build_client(&base_url).map_err(|_| { + let msg = format!("Failed to setup HTTP client: invalid URL"); Error::new(ErrorKind::Other, msg) })?; + let client = VssClient::from_client_and_headers(base_url, reqwest_client, header_provider); let locks = Mutex::new(HashMap::new()); Ok(Self { client, store_id, data_encryption_key, key_obfuscator, locks }) } @@ -524,6 +528,32 @@ impl VssStoreInner { } } +fn build_client(base_url: &str) -> Result { + let url = reqwest::Url::parse(base_url).map_err(|_| ())?; + let host_str = url.host_str().ok_or(())?.to_string(); + let retry = reqwest::retry::for_host(host_str) + .max_retries_per_request(HTTP_RETRIES) + .classify_fn(|req_rep| match req_rep.status() { + // VSS uses INTERNAL_SERVER_ERROR when sending back error repsonses. These are + // currently still covered by our `RetryPolicy`, so we tell `reqwest` not to retry them. + Some(reqwest::StatusCode::INTERNAL_SERVER_ERROR) => req_rep.success(), + Some(reqwest::StatusCode::BAD_REQUEST) => req_rep.success(), + Some(reqwest::StatusCode::UNAUTHORIZED) => req_rep.success(), + Some(reqwest::StatusCode::NOT_FOUND) => req_rep.success(), + Some(reqwest::StatusCode::CONFLICT) => req_rep.success(), + Some(reqwest::StatusCode::OK) => req_rep.success(), + _ => req_rep.retryable(), + }); + let client = reqwest::Client::builder() + .timeout(HTTP_TIMEOUT) + .connect_timeout(HTTP_TIMEOUT) + .read_timeout(HTTP_TIMEOUT) + .retry(retry) + .build() + .unwrap(); + Ok(client) +} + fn derive_data_encryption_and_obfuscation_keys(vss_seed: &[u8; 32]) -> ([u8; 32], [u8; 32]) { let hkdf = |initial_key_material: &[u8], salt: &[u8]| -> [u8; 32] { let mut engine = HmacEngine::::new(salt); From 96db24dd916079d67864c714189253faba548cc0 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 7 Nov 2025 18:21:19 +0100 Subject: [PATCH 8/8] TRY --- src/lib.rs | 166 ++++++++++++++++++++++++++--------------------------- 1 file changed, 83 insertions(+), 83 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3a4c1df9e..742470443 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -423,89 +423,89 @@ impl Node { } }); - // Regularly broadcast node announcements. - let bcast_cm = Arc::clone(&self.channel_manager); - let bcast_pm = Arc::clone(&self.peer_manager); - let bcast_config = Arc::clone(&self.config); - let bcast_store = Arc::clone(&self.kv_store); - let bcast_logger = Arc::clone(&self.logger); - let bcast_node_metrics = Arc::clone(&self.node_metrics); - let mut stop_bcast = self.stop_sender.subscribe(); - let node_alias = self.config.node_alias.clone(); - if may_announce_channel(&self.config).is_ok() { - self.runtime.spawn_cancellable_background_task(async move { - // We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away. - #[cfg(not(test))] - let mut interval = tokio::time::interval(Duration::from_secs(30)); - #[cfg(test)] - let mut interval = tokio::time::interval(Duration::from_secs(5)); - loop { - tokio::select! 
{ - _ = stop_bcast.changed() => { - log_debug!( - bcast_logger, - "Stopping broadcasting node announcements.", - ); - return; - } - _ = interval.tick() => { - let skip_broadcast = match bcast_node_metrics.read().unwrap().latest_node_announcement_broadcast_timestamp { - Some(latest_bcast_time_secs) => { - // Skip if the time hasn't elapsed yet. - let next_bcast_unix_time = SystemTime::UNIX_EPOCH + Duration::from_secs(latest_bcast_time_secs) + NODE_ANN_BCAST_INTERVAL; - next_bcast_unix_time.elapsed().is_err() - } - None => { - // Don't skip if we haven't broadcasted before. - false - } - }; - - if skip_broadcast { - continue; - } - - if !bcast_cm.list_channels().iter().any(|chan| chan.is_announced && chan.is_channel_ready) { - // Skip if we don't have any public channels that are ready. - continue; - } - - if bcast_pm.list_peers().is_empty() { - // Skip if we don't have any connected peers to gossip to. - continue; - } - - let addresses = if let Some(announcement_addresses) = bcast_config.announcement_addresses.clone() { - announcement_addresses - } else if let Some(listening_addresses) = bcast_config.listening_addresses.clone() { - listening_addresses - } else { - debug_assert!(false, "We checked whether the node may announce, so listening addresses should always be set"); - continue; - }; - - if let Some(node_alias) = node_alias.as_ref() { - bcast_pm.broadcast_node_announcement([0; 3], node_alias.0, addresses); - - let unix_time_secs_opt = - SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - { - let mut locked_node_metrics = bcast_node_metrics.write().unwrap(); - locked_node_metrics.latest_node_announcement_broadcast_timestamp = unix_time_secs_opt; - write_node_metrics(&*locked_node_metrics, Arc::clone(&bcast_store), Arc::clone(&bcast_logger)) - .unwrap_or_else(|e| { - log_error!(bcast_logger, "Persistence failed: {}", e); - }); - } - } else { - debug_assert!(false, "We checked whether the node may announce, so node alias should always be set"); - continue - } - } - } - } - }); - } + //// Regularly broadcast node announcements. + //let bcast_cm = Arc::clone(&self.channel_manager); + //let bcast_pm = Arc::clone(&self.peer_manager); + //let bcast_config = Arc::clone(&self.config); + //let bcast_store = Arc::clone(&self.kv_store); + //let bcast_logger = Arc::clone(&self.logger); + //let bcast_node_metrics = Arc::clone(&self.node_metrics); + //let mut stop_bcast = self.stop_sender.subscribe(); + //let node_alias = self.config.node_alias.clone(); + //if may_announce_channel(&self.config).is_ok() { + // self.runtime.spawn_cancellable_background_task(async move { + // // We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away. + // #[cfg(not(test))] + // let mut interval = tokio::time::interval(Duration::from_secs(30)); + // #[cfg(test)] + // let mut interval = tokio::time::interval(Duration::from_secs(5)); + // loop { + // tokio::select! { + // _ = stop_bcast.changed() => { + // log_debug!( + // bcast_logger, + // "Stopping broadcasting node announcements.", + // ); + // return; + // } + // _ = interval.tick() => { + // let skip_broadcast = match bcast_node_metrics.read().unwrap().latest_node_announcement_broadcast_timestamp { + // Some(latest_bcast_time_secs) => { + // // Skip if the time hasn't elapsed yet. 
+ // let next_bcast_unix_time = SystemTime::UNIX_EPOCH + Duration::from_secs(latest_bcast_time_secs) + NODE_ANN_BCAST_INTERVAL; + // next_bcast_unix_time.elapsed().is_err() + // } + // None => { + // // Don't skip if we haven't broadcasted before. + // false + // } + // }; + + // if skip_broadcast { + // continue; + // } + + // if !bcast_cm.list_channels().iter().any(|chan| chan.is_announced && chan.is_channel_ready) { + // // Skip if we don't have any public channels that are ready. + // continue; + // } + + // if bcast_pm.list_peers().is_empty() { + // // Skip if we don't have any connected peers to gossip to. + // continue; + // } + + // let addresses = if let Some(announcement_addresses) = bcast_config.announcement_addresses.clone() { + // announcement_addresses + // } else if let Some(listening_addresses) = bcast_config.listening_addresses.clone() { + // listening_addresses + // } else { + // debug_assert!(false, "We checked whether the node may announce, so listening addresses should always be set"); + // continue; + // }; + + // //if let Some(node_alias) = node_alias.as_ref() { + // // bcast_pm.broadcast_node_announcement([0; 3], node_alias.0, addresses); + + // // let unix_time_secs_opt = + // // SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + // // { + // // let mut locked_node_metrics = bcast_node_metrics.write().unwrap(); + // // locked_node_metrics.latest_node_announcement_broadcast_timestamp = unix_time_secs_opt; + // // write_node_metrics(&*locked_node_metrics, Arc::clone(&bcast_store), Arc::clone(&bcast_logger)) + // // .unwrap_or_else(|e| { + // // log_error!(bcast_logger, "Persistence failed: {}", e); + // // }); + // // } + // //} else { + // // debug_assert!(false, "We checked whether the node may announce, so node alias should always be set"); + // // continue + // //} + // } + // } + // } + // }); + //} let stop_tx_bcast = self.stop_sender.subscribe(); let chain_source = Arc::clone(&self.chain_source);