diff --git a/.github/workflows/vss-integration.yml b/.github/workflows/vss-integration.yml index 8473ed413..7a167f45c 100644 --- a/.github/workflows/vss-integration.yml +++ b/.github/workflows/vss-integration.yml @@ -45,4 +45,4 @@ jobs: cd ldk-node export TEST_VSS_BASE_URL="http://localhost:8080/vss" RUSTFLAGS="--cfg vss_test" cargo test io::vss_store - RUSTFLAGS="--cfg vss_test" cargo test --test integration_tests_vss + RUST_BACKTRACE=full RUSTFLAGS="--cfg vss_test" cargo test --test integration_tests_vss -- --nocapture diff --git a/Cargo.toml b/Cargo.toml index 701d9ddb3..d6faf6137 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,7 +101,9 @@ serde = { version = "1.0.210", default-features = false, features = ["std", "der serde_json = { version = "1.0.128", default-features = false, features = ["std"] } log = { version = "0.4.22", default-features = false, features = ["std"]} -vss-client = "0.3" +#vss-client = "0.3" +#vss-client = { path = "../vss-rust-client" } +vss-client = { git = "https://github.com/tnull/vss-rust-client", branch = "2025-08-enable-client-side-delays-0.3.1" } prost = { version = "0.11.6", default-features = false} [target.'cfg(windows)'.dependencies] diff --git a/src/builder.rs b/src/builder.rs index c0e39af7a..59f5b9b46 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -731,8 +731,7 @@ impl NodeBuilder { let vss_seed_bytes: [u8; 32] = vss_xprv.private_key.secret_bytes(); - let vss_store = - VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider, Arc::clone(&runtime)); + let vss_store = VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider); build_with_store_internal( config, self.chain_data_source_config.as_ref(), diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index 028eb87e4..01494cb5b 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -29,25 +29,22 @@ use vss_client::types::{ }; use vss_client::util::key_obfuscator::KeyObfuscator; use vss_client::util::retry::{ - ExponentialBackoffRetryPolicy, FilteredRetryPolicy, JitteredRetryPolicy, - MaxAttemptsRetryPolicy, MaxTotalDelayRetryPolicy, RetryPolicy, + ExponentialBackoffRetryPolicy, FilteredRetryPolicy, MaxAttemptsRetryPolicy, + MaxTotalDelayRetryPolicy, RetryPolicy, }; use vss_client::util::storable_builder::{EntropySource, StorableBuilder}; use crate::io::utils::check_namespace_key_validity; -use crate::runtime::Runtime; type CustomRetryPolicy = FilteredRetryPolicy< - JitteredRetryPolicy< - MaxTotalDelayRetryPolicy>>, - >, + MaxTotalDelayRetryPolicy>>, Box bool + 'static + Send + Sync>, >; // We set this to a small number of threads that would still allow to make some progress if one // would hit a blocking case const INTERNAL_RUNTIME_WORKERS: usize = 2; -const VSS_IO_TIMEOUT: Duration = Duration::from_secs(5); +const VSS_IO_TIMEOUT: Duration = Duration::from_secs(100); /// A [`KVStoreSync`] implementation that writes to and reads from a [VSS](https://github.com/lightningdevkit/vss-server/blob/main/README.md) backend. pub struct VssStore { @@ -55,7 +52,6 @@ pub struct VssStore { // Version counter to ensure that writes are applied in the correct order. It is assumed that read and list // operations aren't sensitive to the order of execution. next_version: AtomicU64, - runtime: Arc, // A VSS-internal runtime we use to avoid any deadlocks we could hit when waiting on a spawned // blocking task to finish while the blocked thread had acquired the reactor. In particular, // this works around a previously-hit case where a concurrent call to @@ -68,7 +64,7 @@ pub struct VssStore { impl VssStore { pub(crate) fn new( base_url: String, store_id: String, vss_seed: [u8; 32], - header_provider: Arc, runtime: Arc, + header_provider: Arc, ) -> Self { let inner = Arc::new(VssStoreInner::new(base_url, store_id, vss_seed, header_provider)); let next_version = AtomicU64::new(1); @@ -86,7 +82,7 @@ impl VssStore { .unwrap(), ); - Self { inner, next_version, runtime, internal_runtime } + Self { inner, next_version, internal_runtime } } // Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys @@ -133,13 +129,15 @@ impl KVStoreSync for VssStore { async move { inner.read_internal(primary_namespace, secondary_namespace, key).await }; // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always // times out. - let spawned_fut = internal_runtime.spawn(async move { - tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { - let msg = "VssStore::read timed out"; - Error::new(ErrorKind::Other, msg) - }) - }); - self.runtime.block_on(spawned_fut).expect("We should always finish")? + tokio::task::block_in_place(move || { + internal_runtime.block_on(async move { + tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { + let msg = "VssStore::read timed out"; + eprintln!("{}", msg); + Error::new(ErrorKind::Other, msg) + }) + })? + }) } fn write( @@ -171,13 +169,15 @@ impl KVStoreSync for VssStore { }; // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always // times out. - let spawned_fut = internal_runtime.spawn(async move { - tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { - let msg = "VssStore::write timed out"; - Error::new(ErrorKind::Other, msg) - }) - }); - self.runtime.block_on(spawned_fut).expect("We should always finish")? + tokio::task::block_in_place(move || { + internal_runtime.block_on(async move { + tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|e| { + let msg = "VssStore::write timed out"; + eprintln!("VssStore::write timed out: {:?}", e); + Error::new(ErrorKind::Other, msg) + }) + })? + }) } fn remove( @@ -208,13 +208,15 @@ impl KVStoreSync for VssStore { }; // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always // times out. - let spawned_fut = internal_runtime.spawn(async move { - tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { - let msg = "VssStore::remove timed out"; - Error::new(ErrorKind::Other, msg) - }) - }); - self.runtime.block_on(spawned_fut).expect("We should always finish")? + tokio::task::block_in_place(move || { + internal_runtime.block_on(async move { + tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { + let msg = "VssStore::remove timed out"; + eprintln!("{}", msg); + Error::new(ErrorKind::Other, msg) + }) + })? + }) } fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { @@ -229,13 +231,15 @@ impl KVStoreSync for VssStore { let fut = async move { inner.list_internal(primary_namespace, secondary_namespace).await }; // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always // times out. - let spawned_fut = internal_runtime.spawn(async move { - tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { - let msg = "VssStore::list timed out"; - Error::new(ErrorKind::Other, msg) - }) - }); - self.runtime.block_on(spawned_fut).expect("We should always finish")? + tokio::task::block_in_place(move || { + internal_runtime.block_on(async move { + tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { + let msg = "VssStore::list timed out"; + eprintln!("{}", msg); + Error::new(ErrorKind::Other, msg) + }) + })? + }) } } @@ -333,9 +337,8 @@ impl VssStoreInner { let key_obfuscator = KeyObfuscator::new(obfuscation_master_key); let storable_builder = StorableBuilder::new(data_encryption_key, RandEntropySource); let retry_policy = ExponentialBackoffRetryPolicy::new(Duration::from_millis(10)) - .with_max_attempts(10) - .with_max_total_delay(Duration::from_secs(15)) - .with_max_jitter(Duration::from_millis(10)) + .with_max_attempts(100) + .with_max_total_delay(VSS_IO_TIMEOUT) .skip_retry_on_error(Box::new(|e: &VssError| { matches!( e, @@ -610,7 +613,6 @@ mod tests { use super::*; use crate::io::test_utils::do_read_write_remove_list_persist; - use crate::logger::Logger; #[test] fn vss_read_write_remove_list_persist() { @@ -620,10 +622,7 @@ mod tests { let mut vss_seed = [0u8; 32]; rng.fill_bytes(&mut vss_seed); let header_provider = Arc::new(FixedHeaders::new(HashMap::new())); - let logger = Arc::new(Logger::new_log_facade()); - let runtime = Arc::new(Runtime::new(logger).unwrap()); - let vss_store = - VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime); + let vss_store = VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider); do_read_write_remove_list_persist(&vss_store); } @@ -636,10 +635,7 @@ mod tests { let mut vss_seed = [0u8; 32]; rng.fill_bytes(&mut vss_seed); let header_provider = Arc::new(FixedHeaders::new(HashMap::new())); - let logger = Arc::new(Logger::new_log_facade()); - let runtime = Arc::new(Runtime::new(logger).unwrap()); - let vss_store = - VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime); + let vss_store = VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider); do_read_write_remove_list_persist(&vss_store); drop(vss_store) diff --git a/tests/integration_tests_vss.rs b/tests/integration_tests_vss.rs index 93f167dae..a9c397520 100644 --- a/tests/integration_tests_vss.rs +++ b/tests/integration_tests_vss.rs @@ -16,42 +16,44 @@ use ldk_node::Builder; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn channel_full_cycle_with_vss_store() { let (bitcoind, electrsd) = common::setup_bitcoind_and_electrsd(); - println!("== Node A =="); - let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); - let config_a = common::random_config(true); - let mut builder_a = Builder::from_config(config_a.node_config); - builder_a.set_chain_source_esplora(esplora_url.clone(), None); - let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); - let node_a = builder_a - .build_with_vss_store_and_fixed_headers( - vss_base_url.clone(), - "node_1_store".to_string(), - HashMap::new(), - ) - .unwrap(); - node_a.start().unwrap(); + for i in 1..100 { + println!("Run {}: == Node A ==", i); + let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); + let config_a = common::random_config(true); + let mut builder_a = Builder::from_config(config_a.node_config); + builder_a.set_chain_source_esplora(esplora_url.clone(), None); + let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); + let node_a = builder_a + .build_with_vss_store_and_fixed_headers( + vss_base_url.clone(), + format!("node_{}_1_store", i), + HashMap::new(), + ) + .unwrap(); + node_a.start().unwrap(); - println!("\n== Node B =="); - let config_b = common::random_config(true); - let mut builder_b = Builder::from_config(config_b.node_config); - builder_b.set_chain_source_esplora(esplora_url.clone(), None); - let node_b = builder_b - .build_with_vss_store_and_fixed_headers( - vss_base_url, - "node_2_store".to_string(), - HashMap::new(), - ) - .unwrap(); - node_b.start().unwrap(); + println!("\nRun {}: == Node B ==", i); + let config_b = common::random_config(true); + let mut builder_b = Builder::from_config(config_b.node_config); + builder_b.set_chain_source_esplora(esplora_url.clone(), None); + let node_b = builder_b + .build_with_vss_store_and_fixed_headers( + vss_base_url, + format!("node_{}_2_store", i), + HashMap::new(), + ) + .unwrap(); + node_b.start().unwrap(); - common::do_channel_full_cycle( - node_a, - node_b, - &bitcoind.client, - &electrsd.client, - false, - true, - false, - ) - .await; + common::do_channel_full_cycle( + node_a, + node_b, + &bitcoind.client, + &electrsd.client, + false, + true, + false, + ) + .await; + } }