Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/vss-integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ jobs:
cd ldk-node
export TEST_VSS_BASE_URL="http://localhost:8080/vss"
RUSTFLAGS="--cfg vss_test" cargo test io::vss_store
RUSTFLAGS="--cfg vss_test" cargo test --test integration_tests_vss
RUST_BACKTRACE=full RUSTFLAGS="--cfg vss_test" cargo test --test integration_tests_vss -- --nocapture
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ serde = { version = "1.0.210", default-features = false, features = ["std", "der
serde_json = { version = "1.0.128", default-features = false, features = ["std"] }
log = { version = "0.4.22", default-features = false, features = ["std"]}

vss-client = "0.3"
#vss-client = "0.3"
#vss-client = { path = "../vss-rust-client" }
vss-client = { git = "https://github.com/tnull/vss-rust-client", branch = "2025-08-enable-client-side-delays-0.3.1" }
prost = { version = "0.11.6", default-features = false}

[target.'cfg(windows)'.dependencies]
Expand Down
3 changes: 1 addition & 2 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,8 +731,7 @@ impl NodeBuilder {

let vss_seed_bytes: [u8; 32] = vss_xprv.private_key.secret_bytes();

let vss_store =
VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider, Arc::clone(&runtime));
let vss_store = VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider);
build_with_store_internal(
config,
self.chain_data_source_config.as_ref(),
Expand Down
96 changes: 46 additions & 50 deletions src/io/vss_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,29 @@ use vss_client::types::{
};
use vss_client::util::key_obfuscator::KeyObfuscator;
use vss_client::util::retry::{
ExponentialBackoffRetryPolicy, FilteredRetryPolicy, JitteredRetryPolicy,
MaxAttemptsRetryPolicy, MaxTotalDelayRetryPolicy, RetryPolicy,
ExponentialBackoffRetryPolicy, FilteredRetryPolicy, MaxAttemptsRetryPolicy,
MaxTotalDelayRetryPolicy, RetryPolicy,
};
use vss_client::util::storable_builder::{EntropySource, StorableBuilder};

use crate::io::utils::check_namespace_key_validity;
use crate::runtime::Runtime;

type CustomRetryPolicy = FilteredRetryPolicy<
JitteredRetryPolicy<
MaxTotalDelayRetryPolicy<MaxAttemptsRetryPolicy<ExponentialBackoffRetryPolicy<VssError>>>,
>,
MaxTotalDelayRetryPolicy<MaxAttemptsRetryPolicy<ExponentialBackoffRetryPolicy<VssError>>>,
Box<dyn Fn(&VssError) -> bool + 'static + Send + Sync>,
>;

// We set this to a small number of threads that would still allow to make some progress if one
// would hit a blocking case
const INTERNAL_RUNTIME_WORKERS: usize = 2;
const VSS_IO_TIMEOUT: Duration = Duration::from_secs(5);
const VSS_IO_TIMEOUT: Duration = Duration::from_secs(100);

/// A [`KVStoreSync`] implementation that writes to and reads from a [VSS](https://github.com/lightningdevkit/vss-server/blob/main/README.md) backend.
pub struct VssStore {
inner: Arc<VssStoreInner>,
// Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
// operations aren't sensitive to the order of execution.
next_version: AtomicU64,
runtime: Arc<Runtime>,
// A VSS-internal runtime we use to avoid any deadlocks we could hit when waiting on a spawned
// blocking task to finish while the blocked thread had acquired the reactor. In particular,
// this works around a previously-hit case where a concurrent call to
Expand All @@ -68,7 +64,7 @@ pub struct VssStore {
impl VssStore {
pub(crate) fn new(
base_url: String, store_id: String, vss_seed: [u8; 32],
header_provider: Arc<dyn VssHeaderProvider>, runtime: Arc<Runtime>,
header_provider: Arc<dyn VssHeaderProvider>,
) -> Self {
let inner = Arc::new(VssStoreInner::new(base_url, store_id, vss_seed, header_provider));
let next_version = AtomicU64::new(1);
Expand All @@ -86,7 +82,7 @@ impl VssStore {
.unwrap(),
);

Self { inner, next_version, runtime, internal_runtime }
Self { inner, next_version, internal_runtime }
}

// Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys
Expand Down Expand Up @@ -133,13 +129,15 @@ impl KVStoreSync for VssStore {
async move { inner.read_internal(primary_namespace, secondary_namespace, key).await };
// TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
// times out.
let spawned_fut = internal_runtime.spawn(async move {
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
let msg = "VssStore::read timed out";
Error::new(ErrorKind::Other, msg)
})
});
self.runtime.block_on(spawned_fut).expect("We should always finish")?
tokio::task::block_in_place(move || {
internal_runtime.block_on(async move {
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
let msg = "VssStore::read timed out";
eprintln!("{}", msg);
Error::new(ErrorKind::Other, msg)
})
})?
})
}

fn write(
Expand Down Expand Up @@ -171,13 +169,15 @@ impl KVStoreSync for VssStore {
};
// TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
// times out.
let spawned_fut = internal_runtime.spawn(async move {
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
let msg = "VssStore::write timed out";
Error::new(ErrorKind::Other, msg)
})
});
self.runtime.block_on(spawned_fut).expect("We should always finish")?
tokio::task::block_in_place(move || {
internal_runtime.block_on(async move {
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|e| {
let msg = "VssStore::write timed out";
eprintln!("VssStore::write timed out: {:?}", e);
Error::new(ErrorKind::Other, msg)
})
})?
})
}

fn remove(
Expand Down Expand Up @@ -208,13 +208,15 @@ impl KVStoreSync for VssStore {
};
// TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
// times out.
let spawned_fut = internal_runtime.spawn(async move {
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
let msg = "VssStore::remove timed out";
Error::new(ErrorKind::Other, msg)
})
});
self.runtime.block_on(spawned_fut).expect("We should always finish")?
tokio::task::block_in_place(move || {
internal_runtime.block_on(async move {
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
let msg = "VssStore::remove timed out";
eprintln!("{}", msg);
Error::new(ErrorKind::Other, msg)
})
})?
})
}

fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> {
Expand All @@ -229,13 +231,15 @@ impl KVStoreSync for VssStore {
let fut = async move { inner.list_internal(primary_namespace, secondary_namespace).await };
// TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
// times out.
let spawned_fut = internal_runtime.spawn(async move {
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
let msg = "VssStore::list timed out";
Error::new(ErrorKind::Other, msg)
})
});
self.runtime.block_on(spawned_fut).expect("We should always finish")?
tokio::task::block_in_place(move || {
internal_runtime.block_on(async move {
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
let msg = "VssStore::list timed out";
eprintln!("{}", msg);
Error::new(ErrorKind::Other, msg)
})
})?
})
}
}

Expand Down Expand Up @@ -333,9 +337,8 @@ impl VssStoreInner {
let key_obfuscator = KeyObfuscator::new(obfuscation_master_key);
let storable_builder = StorableBuilder::new(data_encryption_key, RandEntropySource);
let retry_policy = ExponentialBackoffRetryPolicy::new(Duration::from_millis(10))
.with_max_attempts(10)
.with_max_total_delay(Duration::from_secs(15))
.with_max_jitter(Duration::from_millis(10))
.with_max_attempts(100)
.with_max_total_delay(VSS_IO_TIMEOUT)
.skip_retry_on_error(Box::new(|e: &VssError| {
matches!(
e,
Expand Down Expand Up @@ -610,7 +613,6 @@ mod tests {

use super::*;
use crate::io::test_utils::do_read_write_remove_list_persist;
use crate::logger::Logger;

#[test]
fn vss_read_write_remove_list_persist() {
Expand All @@ -620,10 +622,7 @@ mod tests {
let mut vss_seed = [0u8; 32];
rng.fill_bytes(&mut vss_seed);
let header_provider = Arc::new(FixedHeaders::new(HashMap::new()));
let logger = Arc::new(Logger::new_log_facade());
let runtime = Arc::new(Runtime::new(logger).unwrap());
let vss_store =
VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime);
let vss_store = VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider);

do_read_write_remove_list_persist(&vss_store);
}
Expand All @@ -636,10 +635,7 @@ mod tests {
let mut vss_seed = [0u8; 32];
rng.fill_bytes(&mut vss_seed);
let header_provider = Arc::new(FixedHeaders::new(HashMap::new()));
let logger = Arc::new(Logger::new_log_facade());
let runtime = Arc::new(Runtime::new(logger).unwrap());
let vss_store =
VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime);
let vss_store = VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider);

do_read_write_remove_list_persist(&vss_store);
drop(vss_store)
Expand Down
74 changes: 38 additions & 36 deletions tests/integration_tests_vss.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,44 @@ use ldk_node::Builder;
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn channel_full_cycle_with_vss_store() {
let (bitcoind, electrsd) = common::setup_bitcoind_and_electrsd();
println!("== Node A ==");
let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap());
let config_a = common::random_config(true);
let mut builder_a = Builder::from_config(config_a.node_config);
builder_a.set_chain_source_esplora(esplora_url.clone(), None);
let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap();
let node_a = builder_a
.build_with_vss_store_and_fixed_headers(
vss_base_url.clone(),
"node_1_store".to_string(),
HashMap::new(),
)
.unwrap();
node_a.start().unwrap();
for i in 1..100 {
println!("Run {}: == Node A ==", i);
let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap());
let config_a = common::random_config(true);
let mut builder_a = Builder::from_config(config_a.node_config);
builder_a.set_chain_source_esplora(esplora_url.clone(), None);
let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap();
let node_a = builder_a
.build_with_vss_store_and_fixed_headers(
vss_base_url.clone(),
format!("node_{}_1_store", i),
HashMap::new(),
)
.unwrap();
node_a.start().unwrap();

println!("\n== Node B ==");
let config_b = common::random_config(true);
let mut builder_b = Builder::from_config(config_b.node_config);
builder_b.set_chain_source_esplora(esplora_url.clone(), None);
let node_b = builder_b
.build_with_vss_store_and_fixed_headers(
vss_base_url,
"node_2_store".to_string(),
HashMap::new(),
)
.unwrap();
node_b.start().unwrap();
println!("\nRun {}: == Node B ==", i);
let config_b = common::random_config(true);
let mut builder_b = Builder::from_config(config_b.node_config);
builder_b.set_chain_source_esplora(esplora_url.clone(), None);
let node_b = builder_b
.build_with_vss_store_and_fixed_headers(
vss_base_url,
format!("node_{}_2_store", i),
HashMap::new(),
)
.unwrap();
node_b.start().unwrap();

common::do_channel_full_cycle(
node_a,
node_b,
&bitcoind.client,
&electrsd.client,
false,
true,
false,
)
.await;
common::do_channel_full_cycle(
node_a,
node_b,
&bitcoind.client,
&electrsd.client,
false,
true,
false,
)
.await;
}
}