From ec8bfb3edbb73001e18c37263fe44c7c5fac423c Mon Sep 17 00:00:00 2001 From: Enigbe Date: Mon, 6 Oct 2025 08:04:45 +0100 Subject: [PATCH 1/3] feat: introduce and configure node with tiered KVStore Introduces TierStore, a KVStore implementation that manages data across three storage layers: - Primary: Main/remote data store - Ephemeral: Secondary store for non-critical, easily-rebuildable data (e.g., network graph) with fast local access - Backup: Tertiary store for disaster recovery with async/lazy operations to avoid blocking primary store Adds four configuration methods to NodeBuilder: - set_tier_store_backup: Configure backup data store - set_tier_store_ephemeral: Configure ephemeral data store - set_tier_store_retry_config: Configure retry parameters with exponential backoff - build_with_tier_store: Build node with primary data store These methods are exposed to the foreign interface via additions in ffi/types.rs: - ffi::SyncAndAsyncKVStore: Composed of KVStore and KVStoreSync methods to handle the types::SyncAndAsyncKVStore supertrait across FFI - ffi::ForeignKVStoreAdapter and ffi::DynStore: Adapt/translate between foreign language store and native Rust store - Conditional compilation for DynStore: ffi::DynStore with uniffi, types::DynStore without, with selection aided by the wrap_store!() macro --- Cargo.toml | 1 + bindings/ldk_node.udl | 61 + src/builder.rs | 242 +++- src/chain/bitcoind.rs | 4 +- src/chain/electrum.rs | 3 +- src/chain/esplora.rs | 4 +- src/chain/mod.rs | 4 +- src/data_store.rs | 7 +- src/event.rs | 9 +- src/ffi/mod.rs | 16 + src/ffi/types.rs | 326 +++++ src/io/mod.rs | 1 + src/io/tier_store.rs | 1270 +++++++++++++++++ src/io/utils.rs | 17 +- src/lib.rs | 13 +- src/liquidity.rs | 4 +- .../asynchronous/static_invoice_store.rs | 6 +- src/peer_store.rs | 6 +- src/types.rs | 3 + src/wallet/persist.rs | 3 +- tests/common/mod.rs | 7 +- tests/integration_tests_rust.rs | 12 +- 22 files changed, 1970 insertions(+), 49 deletions(-) create mode 100644 src/io/tier_store.rs diff --git a/Cargo.toml b/Cargo.toml index 701d9ddb3..3cf3e824d 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -89,6 +89,7 @@ bitcoin = "0.32.7" bip39 = "2.0.0" bip21 = { version = "0.5", features = ["std"], default-features = false } +async-trait = {version = "0.1.89"} base64 = { version = "0.22.1", default-features = false, features = ["std"] } rand = "0.8.5" chrono = { version = "0.4", default-features = false, features = ["clock"] } diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index ab2f483a1..b0f98a241 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -46,6 +46,12 @@ dictionary LSPS2ServiceConfig { u64 max_payment_size_msat; }; +dictionary RetryConfig { + u16 initial_retry_delay_ms; + u16 maximum_delay_secs; + f32 backoff_multiplier; +}; + enum LogLevel { "Gossip", "Trace", @@ -67,6 +73,56 @@ interface LogWriter { void log(LogRecord record); }; +interface DynStore { + [Name=from_store] + constructor(SyncAndAsyncKVStore store); +}; + +[Trait, WithForeign] +interface SyncAndAsyncKVStore { + // KVStoreSync versions + [Throws=IOError] + sequence read_sync(string primary_namespace, string secondary_namespace, string key); + [Throws=IOError] + void write_sync(string primary_namespace, string secondary_namespace, string key, sequence buf); + [Throws=IOError] + void remove_sync(string primary_namespace, string secondary_namespace, string key, boolean lazy); + [Throws=IOError] + sequence list_sync(string primary_namespace, string secondary_namespace); + + // KVStore versions + 
[Throws=IOError, Async] + sequence read_async(string primary_namespace, string secondary_namespace, string key); + [Throws=IOError, Async] + void write_async(string primary_namespace, string secondary_namespace, string key, sequence buf); + [Throws=IOError, Async] + void remove_async(string primary_namespace, string secondary_namespace, string key, boolean lazy); + [Throws=IOError, Async] + sequence list_async(string primary_namespace, string secondary_namespace); +}; + +[Error] +enum IOError { + "NotFound", + "PermissionDenied", + "ConnectionRefused", + "ConnectionReset", + "ConnectionAborted", + "NotConnected", + "AddrInUse", + "AddrNotAvailable", + "BrokenPipe", + "AlreadyExists", + "WouldBlock", + "InvalidInput", + "InvalidData", + "TimedOut", + "WriteZero", + "Interrupted", + "UnexpectedEof", + "Other", +}; + interface Builder { constructor(); [Name=from_config] @@ -95,6 +151,9 @@ interface Builder { void set_announcement_addresses(sequence announcement_addresses); [Throws=BuildError] void set_node_alias(string node_alias); + void set_tier_store_retry_config(RetryConfig retry_config); + void set_tier_store_backup(DynStore backup_store); + void set_tier_store_ephemeral(DynStore ephemeral_store); [Throws=BuildError] void set_async_payments_role(AsyncPaymentsRole? role); [Throws=BuildError] @@ -107,6 +166,8 @@ interface Builder { Node build_with_vss_store_and_fixed_headers(string vss_url, string store_id, record fixed_headers); [Throws=BuildError] Node build_with_vss_store_and_header_provider(string vss_url, string store_id, VssHeaderProvider header_provider); + [Throws=BuildError] + Node build_with_tier_store(DynStore primary_store); }; interface Node { diff --git a/src/builder.rs b/src/builder.rs index c0e39af7a..27acf2140 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -52,6 +52,7 @@ use crate::event::EventQueue; use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; +use crate::io::tier_store::{RetryConfig, TierStore}; use crate::io::utils::{ read_external_pathfinding_scores_from_cache, read_node_metrics, write_node_metrics, }; @@ -68,13 +69,14 @@ use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox; use crate::peer_store::PeerStore; use crate::runtime::Runtime; use crate::tx_broadcaster::TransactionBroadcaster; + use crate::types::{ - ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter, - OnionMessenger, PaymentStore, PeerManager, Persister, + ChainMonitor, ChannelManager, GossipSync, Graph, KeysManager, MessageRouter, OnionMessenger, + PaymentStore, PeerManager, Persister, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; -use crate::{Node, NodeMetrics}; +use crate::{wrap_store, DynStore, Node, NodeMetrics}; const VSS_HARDENED_CHILD_INDEX: u32 = 877; const VSS_LNURL_AUTH_HARDENED_CHILD_INDEX: u32 = 138; @@ -152,6 +154,23 @@ impl std::fmt::Debug for LogWriterConfig { } } +#[derive(Default)] +struct TierStoreConfig { + ephemeral: Option>, + backup: Option>, + retry: Option, +} + +impl std::fmt::Debug for TierStoreConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TierStoreConfig") + .field("ephemeral", &self.ephemeral.as_ref().map(|_| "Arc")) + .field("backup", &self.backup.as_ref().map(|_| "Arc")) + .field("retry", &self.retry) + .finish() + } +} + /// An error encountered during building a [`Node`]. 
/// /// [`Node`]: crate::Node @@ -251,6 +270,7 @@ pub struct NodeBuilder { liquidity_source_config: Option, log_writer_config: Option, async_payments_role: Option, + tier_store_config: Option, runtime_handle: Option, pathfinding_scores_sync_config: Option, } @@ -269,6 +289,7 @@ impl NodeBuilder { let gossip_source_config = None; let liquidity_source_config = None; let log_writer_config = None; + let tier_store_config = None; let runtime_handle = None; let pathfinding_scores_sync_config = None; Self { @@ -278,6 +299,7 @@ impl NodeBuilder { gossip_source_config, liquidity_source_config, log_writer_config, + tier_store_config, runtime_handle, async_payments_role: None, pathfinding_scores_sync_config, @@ -582,21 +604,67 @@ impl NodeBuilder { Ok(self) } + /// Configures retry behavior for transient errors when accessing the primary store. + /// + /// When building with [`build_with_tier_store`], controls the exponential backoff parameters + /// used when retrying failed operations on the primary store due to transient errors + /// (network issues, timeouts, etc.). + /// + /// If not set, default retry parameters are used. See [`RetryConfig`] for details. + /// + /// [`build_with_tier_store`]: Self::build_with_tier_store + pub fn set_tier_store_retry_config(&mut self, config: RetryConfig) -> &mut Self { + let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); + tier_store_config.retry = Some(config); + self + } + + /// Configures the backup store for local disaster recovery. + /// + /// When building with [`build_with_tier_store`], this store receives asynchronous copies + /// of all critical data written to the primary store. If the primary store becomes + /// unavailable, reads will fall back to this backup store. + /// + /// Backup writes are non-blocking and do not affect primary store operation performance. + /// + /// [`build_with_tier_store`]: Self::build_with_tier_store + pub fn set_tier_store_backup(&mut self, backup_store: Arc) -> &mut Self { + let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); + tier_store_config.backup = Some(backup_store); + self + } + + /// Configures the ephemeral store for non-critical, frequently-accessed data. + /// + /// When building with [`build_with_tier_store`], this store is used for data like + /// the network graph and scorer data to reduce latency for reads. Data stored here + /// can be rebuilt if lost. + /// + /// If not set, non-critical data will be stored in the primary store. + /// + /// [`build_with_tier_store`]: Self::build_with_tier_store + pub fn set_tier_store_ephemeral(&mut self, ephemeral_store: Arc) -> &mut Self { + let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); + tier_store_config.ephemeral = Some(ephemeral_store); + self + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. 
pub fn build(&self) -> Result { let storage_dir_path = self.config.storage_dir_path.clone(); fs::create_dir_all(storage_dir_path.clone()) .map_err(|_| BuildError::StoragePathAccessFailed)?; - let kv_store = Arc::new( - SqliteStore::new( - storage_dir_path.into(), - Some(io::sqlite_store::SQLITE_DB_FILE_NAME.to_string()), - Some(io::sqlite_store::KV_TABLE_NAME.to_string()), - ) - .map_err(|_| BuildError::KVStoreSetupFailed)?, - ); - self.build_with_store(kv_store) + let kv_store = SqliteStore::new( + storage_dir_path.into(), + Some(io::sqlite_store::SQLITE_DB_FILE_NAME.to_string()), + Some(io::sqlite_store::KV_TABLE_NAME.to_string()), + ) + .map_err(|_| BuildError::KVStoreSetupFailed)?; + + let store = wrap_store!(Arc::new(kv_store)); + + self.build_with_store(store) } /// Builds a [`Node`] instance with a [`FilesystemStore`] backend and according to the options @@ -607,8 +675,11 @@ impl NodeBuilder { fs::create_dir_all(storage_dir_path.clone()) .map_err(|_| BuildError::StoragePathAccessFailed)?; - let kv_store = Arc::new(FilesystemStore::new(storage_dir_path)); - self.build_with_store(kv_store) + let kv_store = FilesystemStore::new(storage_dir_path); + + let store = wrap_store!(Arc::new(kv_store)); + + self.build_with_store(store) } /// Builds a [`Node`] instance with a [VSS] backend and according to the options @@ -733,6 +804,8 @@ impl NodeBuilder { let vss_store = VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider, Arc::clone(&runtime)); + + let store = wrap_store!(Arc::new(vss_store)); build_with_store_internal( config, self.chain_data_source_config.as_ref(), @@ -743,7 +816,101 @@ impl NodeBuilder { seed_bytes, runtime, logger, - Arc::new(vss_store), + store, + ) + } + + /// Builds a [`Node`] instance with tiered storage for managing data across multiple storage layers. + /// + /// This build method enables a three-tier storage architecture optimized for different data types + /// and access patterns: + /// + /// ### Storage Tiers + /// + /// - **Primary Store** (required): The authoritative store for critical channel state and payment data. + /// Typically a remote/cloud storage service for durability and accessibility across devices. + /// + /// - **Ephemeral Store** (optional): Local storage for non-critical, frequently-accessed data like + /// the network graph and scorer. Improves performance by reducing latency for data that can be + /// rebuilt if lost. Configure with [`set_tier_store_ephemeral`]. + /// + /// - **Backup Store** (optional): Local backup of critical data for disaster recovery scenarios. + /// Provides a safety net if the primary store becomes temporarily unavailable. Writes are + /// asynchronous to avoid blocking primary operations. Configure with [`set_tier_store_backup`]. 
+ /// + /// ## Configuration + /// + /// Use the setter methods to configure optional stores and retry behavior: + /// - [`set_tier_store_ephemeral`] - Set local store for network graph and scorer + /// - [`set_tier_store_backup`] - Set local backup store for disaster recovery + /// - [`set_tier_store_retry_config`] - Configure retry delays and backoff for transient errors + /// + /// ## Example + /// + /// ```ignore + /// # use ldk_node::{Builder, Config}; + /// # use ldk_node::io::tier_store::RetryConfig; + /// # use std::sync::Arc; + /// let config = Config::default(); + /// let mut builder = NodeBuilder::from_config(config); + /// + /// let primary = Arc::new(VssStore::new(...)); + /// let ephemeral = Arc::new(FilesystemStore::new(...)); + /// let backup = Arc::new(SqliteStore::new(...)); + /// let retry_config = RetryConfig::default(); + /// + /// builder + /// .set_tier_store_ephemeral(ephemeral) + /// .set_tier_store_backup(backup) + /// .set_tier_store_retry_config(retry_config); + /// + /// let node = builder.build_with_tier_store(primary)?; + /// # Ok::<(), ldk_node::BuildError>(()) + /// ``` + /// + /// [`set_tier_store_ephemeral`]: Self::set_tier_store_ephemeral + /// [`set_tier_store_backup`]: Self::set_tier_store_backup + /// [`set_tier_store_retry_config`]: Self::set_tier_store_retry_config + pub fn build_with_tier_store(&self, primary_store: Arc) -> Result { + let logger = setup_logger(&self.log_writer_config, &self.config)?; + let runtime = if let Some(handle) = self.runtime_handle.as_ref() { + Arc::new(Runtime::with_handle(handle.clone(), Arc::clone(&logger))) + } else { + Arc::new(Runtime::new(Arc::clone(&logger)).map_err(|e| { + log_error!(logger, "Failed to setup tokio runtime: {}", e); + BuildError::RuntimeSetupFailed + })?) + }; + let seed_bytes = seed_bytes_from_config( + &self.config, + self.entropy_source_config.as_ref(), + Arc::clone(&logger), + )?; + let config = Arc::new(self.config.clone()); + + let ts_config = self.tier_store_config.as_ref(); + let retry_config = ts_config.and_then(|c| c.retry).unwrap_or_default(); + + let mut tier_store = + TierStore::new(primary_store, Arc::clone(&runtime), Arc::clone(&logger), retry_config); + + if let Some(config) = ts_config { + config.ephemeral.as_ref().map(|s| tier_store.set_ephemeral_store(Arc::clone(s))); + config.backup.as_ref().map(|s| tier_store.set_backup_store(Arc::clone(s))); + } + + let store = wrap_store!(Arc::new(tier_store)); + build_with_store_internal( + config, + self.chain_data_source_config.as_ref(), + self.gossip_source_config.as_ref(), + self.liquidity_source_config.as_ref(), + self.pathfinding_scores_sync_config.as_ref(), + self.async_payments_role, + seed_bytes, + runtime, + logger, + store, ) } @@ -1045,6 +1212,45 @@ impl ArcedNodeBuilder { self.inner.write().unwrap().set_async_payments_role(role).map(|_| ()) } + /// Configures retry behavior for transient errors when accessing the primary store. + /// + /// When building with [`build_with_tier_store`], controls the exponential backoff parameters + /// used when retrying failed operations on the primary store due to transient errors + /// (network issues, timeouts, etc.). + /// + /// If not set, default retry parameters are used. See [`RetryConfig`] for details. + /// + /// [`build_with_tier_store`]: Self::build_with_tier_store + pub fn set_tier_store_retry_config(&self, config: RetryConfig) { + self.inner.write().unwrap().set_tier_store_retry_config(config); + } + + /// Configures the backup store for local disaster recovery. 
+ /// + /// When building with [`build_with_tier_store`], this store receives asynchronous copies + /// of all critical data written to the primary store. If the primary store becomes + /// unavailable, reads will fall back to this backup store. + /// + /// Backup writes are non-blocking and do not affect primary store operation performance. + /// + /// [`build_with_tier_store`]: Self::build_with_tier_store + pub fn set_tier_store_backup(&self, backup_store: Arc) { + self.inner.write().unwrap().set_tier_store_backup(backup_store); + } + + /// Configures the ephemeral store for non-critical, frequently-accessed data. + /// + /// When building with [`build_with_tier_store`], this store is used for data like + /// the network graph and scorer data to reduce latency for reads. Data stored here + /// can be rebuilt if lost. + /// + /// If not set, non-critical data will be stored in the primary store. + /// + /// [`build_with_tier_store`]: Self::build_with_tier_store + pub fn set_tier_store_ephemeral(&self, ephemeral_store: Arc) { + self.inner.write().unwrap().set_tier_store_ephemeral(ephemeral_store); + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self) -> Result, BuildError> { @@ -1128,6 +1334,12 @@ impl ArcedNodeBuilder { .map(Arc::new) } + pub fn build_with_tier_store( + &self, primary_store: Arc, + ) -> Result, BuildError> { + self.inner.read().unwrap().build_with_tier_store(primary_store).map(Arc::new) + } + /// Builds a [`Node`] instance according to the options previously configured. pub fn build_with_store(&self, kv_store: Arc) -> Result, BuildError> { self.inner.read().unwrap().build_with_store(kv_store).map(Arc::new) diff --git a/src/chain/bitcoind.rs b/src/chain/bitcoind.rs index 4b7cd588f..5a46c25a4 100644 --- a/src/chain/bitcoind.rs +++ b/src/chain/bitcoind.rs @@ -38,8 +38,8 @@ use crate::fee_estimator::{ }; use crate::io::utils::write_node_metrics; use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger}; -use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; -use crate::{Error, NodeMetrics}; +use crate::types::{ChainMonitor, ChannelManager, Sweeper, Wallet}; +use crate::{DynStore, Error, NodeMetrics}; const CHAIN_POLLING_INTERVAL_SECS: u64 = 2; diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs index dbd0d9f7f..aee37f370 100644 --- a/src/chain/electrum.rs +++ b/src/chain/electrum.rs @@ -36,7 +36,8 @@ use crate::fee_estimator::{ use crate::io::utils::write_node_metrics; use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger}; use crate::runtime::Runtime; -use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; +use crate::types::{ChainMonitor, ChannelManager, Sweeper, Wallet}; +use crate::DynStore; use crate::NodeMetrics; const BDK_ELECTRUM_CLIENT_BATCH_SIZE: usize = 5; diff --git a/src/chain/esplora.rs b/src/chain/esplora.rs index be6f2fb86..89c44df83 100644 --- a/src/chain/esplora.rs +++ b/src/chain/esplora.rs @@ -28,8 +28,8 @@ use crate::fee_estimator::{ }; use crate::io::utils::write_node_metrics; use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger}; -use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; -use crate::{Error, NodeMetrics}; +use crate::types::{ChainMonitor, ChannelManager, Sweeper, Wallet}; +use crate::{DynStore, Error, NodeMetrics}; pub(super) struct EsploraChainSource { pub(super) sync_config: EsploraSyncConfig, diff 
--git a/src/chain/mod.rs b/src/chain/mod.rs index 309d60eab..40b144bcf 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -28,8 +28,8 @@ use crate::fee_estimator::OnchainFeeEstimator; use crate::io::utils::write_node_metrics; use crate::logger::{log_debug, log_info, log_trace, LdkLogger, Logger}; use crate::runtime::Runtime; -use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; -use crate::{Error, NodeMetrics}; +use crate::types::{Broadcaster, ChainMonitor, ChannelManager, Sweeper, Wallet}; +use crate::{DynStore, Error, NodeMetrics}; pub(crate) enum WalletSyncStatus { Completed, diff --git a/src/data_store.rs b/src/data_store.rs index 83cbf4476..c712210a7 100644 --- a/src/data_store.rs +++ b/src/data_store.rs @@ -13,8 +13,7 @@ use lightning::util::persist::KVStoreSync; use lightning::util::ser::{Readable, Writeable}; use crate::logger::{log_error, LdkLogger}; -use crate::types::DynStore; -use crate::Error; +use crate::{DynStore, Error}; pub(crate) trait StorableObject: Clone + Readable + Writeable { type Id: StorableObjectId; @@ -175,7 +174,7 @@ mod tests { use lightning::util::test_utils::{TestLogger, TestStore}; use super::*; - use crate::hex_utils; + use crate::{hex_utils, wrap_store}; #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] struct TestObjectId { @@ -234,7 +233,7 @@ mod tests { #[test] fn data_is_persisted() { - let store: Arc = Arc::new(TestStore::new(false)); + let store: Arc = wrap_store!(Arc::new(TestStore::new(false))); let logger = Arc::new(TestLogger::new()); let primary_namespace = "datastore_test_primary".to_string(); let secondary_namespace = "datastore_test_secondary".to_string(); diff --git a/src/event.rs b/src/event.rs index eedfb1c14..8ef7b7f36 100644 --- a/src/event.rs +++ b/src/event.rs @@ -48,7 +48,8 @@ use crate::payment::store::{ PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus, }; use crate::runtime::Runtime; -use crate::types::{CustomTlvRecord, DynStore, OnionMessenger, PaymentStore, Sweeper, Wallet}; +use crate::types::{CustomTlvRecord, OnionMessenger, PaymentStore, Sweeper, Wallet}; +use crate::DynStore; use crate::{ hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore, UserChannelId, @@ -1607,11 +1608,13 @@ mod tests { use lightning::util::test_utils::{TestLogger, TestStore}; + use crate::wrap_store; + use super::*; #[tokio::test] async fn event_queue_persistence() { - let store: Arc = Arc::new(TestStore::new(false)); + let store: Arc = wrap_store!(Arc::new(TestStore::new(false))); let logger = Arc::new(TestLogger::new()); let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger))); assert_eq!(event_queue.next_event(), None); @@ -1647,7 +1650,7 @@ mod tests { #[tokio::test] async fn event_queue_concurrency() { - let store: Arc = Arc::new(TestStore::new(false)); + let store: Arc = wrap_store!(Arc::new(TestStore::new(false))); let logger = Arc::new(TestLogger::new()); let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger))); assert_eq!(event_queue.next_event(), None); diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index 32464d044..f66b3c56e 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -45,3 +45,19 @@ pub fn maybe_try_convert_enum(value: &T) -> Result<&T, crate::error::Error> { pub fn maybe_wrap(value: T) -> T { value } + +/// KVStore* wrapper +#[macro_export] +macro_rules! 
wrap_store { + ($store:expr) => {{ + #[cfg(feature = "uniffi")] + { + $crate::DynStore::from_ldk_store($store) + } + + #[cfg(not(feature = "uniffi"))] + { + $store + } + }}; +} diff --git a/src/ffi/types.rs b/src/ffi/types.rs index 3c88a665f..b48d1a970 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -11,11 +11,14 @@ // Make sure to add any re-exported items that need to be used in uniffi below. use std::convert::TryInto; +use std::future::Future; use std::ops::Deref; +use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use async_trait::async_trait; pub use bip39::Mnemonic; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hashes::Hash; @@ -31,6 +34,7 @@ use lightning::offers::offer::{Amount as LdkAmount, Offer as LdkOffer}; use lightning::offers::refund::Refund as LdkRefund; pub use lightning::routing::gossip::{NodeAlias, NodeId, RoutingFees}; pub use lightning::routing::router::RouteParametersConfig; +use lightning::util::persist::{KVStore, KVStoreSync}; use lightning::util::ser::Writeable; use lightning_invoice::{Bolt11Invoice as LdkBolt11Invoice, Bolt11InvoiceDescriptionRef}; pub use lightning_invoice::{Description, SignedRawBolt11Invoice}; @@ -55,8 +59,330 @@ pub use crate::payment::store::{ ConfirmationStatus, LSPFeeLimits, PaymentDirection, PaymentKind, PaymentStatus, }; pub use crate::payment::QrPaymentResult; +use crate::types::SyncAndAsyncKVStore as LdkSyncAndAsyncKVStore; use crate::{hex_utils, SocketAddress, UniffiCustomTypeConverter, UserChannelId}; +#[derive(Debug)] +pub enum IOError { + NotFound, + PermissionDenied, + ConnectionRefused, + ConnectionReset, + ConnectionAborted, + NotConnected, + AddrInUse, + AddrNotAvailable, + BrokenPipe, + AlreadyExists, + WouldBlock, + InvalidInput, + InvalidData, + TimedOut, + WriteZero, + Interrupted, + UnexpectedEof, + Other, +} + +impl From for IOError { + fn from(error: lightning::io::Error) -> Self { + match error.kind() { + lightning::io::ErrorKind::NotFound => IOError::NotFound, + lightning::io::ErrorKind::PermissionDenied => IOError::PermissionDenied, + lightning::io::ErrorKind::ConnectionRefused => IOError::ConnectionRefused, + lightning::io::ErrorKind::ConnectionReset => IOError::ConnectionReset, + lightning::io::ErrorKind::ConnectionAborted => IOError::ConnectionAborted, + lightning::io::ErrorKind::NotConnected => IOError::NotConnected, + lightning::io::ErrorKind::AddrInUse => IOError::AddrInUse, + lightning::io::ErrorKind::AddrNotAvailable => IOError::AddrNotAvailable, + lightning::io::ErrorKind::BrokenPipe => IOError::BrokenPipe, + lightning::io::ErrorKind::AlreadyExists => IOError::AlreadyExists, + lightning::io::ErrorKind::WouldBlock => IOError::WouldBlock, + lightning::io::ErrorKind::InvalidInput => IOError::InvalidInput, + lightning::io::ErrorKind::InvalidData => IOError::InvalidData, + lightning::io::ErrorKind::TimedOut => IOError::TimedOut, + lightning::io::ErrorKind::WriteZero => IOError::WriteZero, + lightning::io::ErrorKind::Interrupted => IOError::Interrupted, + lightning::io::ErrorKind::UnexpectedEof => IOError::UnexpectedEof, + lightning::io::ErrorKind::Other => IOError::Other, + } + } +} + +impl From for lightning::io::Error { + fn from(error: IOError) -> Self { + match error { + IOError::NotFound => lightning::io::ErrorKind::NotFound.into(), + IOError::PermissionDenied => lightning::io::ErrorKind::PermissionDenied.into(), + IOError::ConnectionRefused => lightning::io::ErrorKind::ConnectionRefused.into(), + IOError::ConnectionReset => 
lightning::io::ErrorKind::ConnectionReset.into(), + IOError::ConnectionAborted => lightning::io::ErrorKind::ConnectionAborted.into(), + IOError::NotConnected => lightning::io::ErrorKind::NotConnected.into(), + IOError::AddrInUse => lightning::io::ErrorKind::AddrInUse.into(), + IOError::AddrNotAvailable => lightning::io::ErrorKind::AddrNotAvailable.into(), + IOError::BrokenPipe => lightning::io::ErrorKind::BrokenPipe.into(), + IOError::AlreadyExists => lightning::io::ErrorKind::AlreadyExists.into(), + IOError::WouldBlock => lightning::io::ErrorKind::WouldBlock.into(), + IOError::InvalidInput => lightning::io::ErrorKind::InvalidInput.into(), + IOError::InvalidData => lightning::io::ErrorKind::InvalidData.into(), + IOError::TimedOut => lightning::io::ErrorKind::TimedOut.into(), + IOError::WriteZero => lightning::io::ErrorKind::WriteZero.into(), + IOError::Interrupted => lightning::io::ErrorKind::Interrupted.into(), + IOError::UnexpectedEof => lightning::io::ErrorKind::UnexpectedEof.into(), + IOError::Other => lightning::io::ErrorKind::Other.into(), + } + } +} + +impl std::fmt::Display for IOError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + IOError::NotFound => write!(f, "NotFound"), + IOError::PermissionDenied => write!(f, "PermissionDenied"), + IOError::ConnectionRefused => write!(f, "ConnectionRefused"), + IOError::ConnectionReset => write!(f, "ConnectionReset"), + IOError::ConnectionAborted => write!(f, "ConnectionAborted"), + IOError::NotConnected => write!(f, "NotConnected"), + IOError::AddrInUse => write!(f, "AddrInUse"), + IOError::AddrNotAvailable => write!(f, "AddrNotAvailable"), + IOError::BrokenPipe => write!(f, "BrokenPipe"), + IOError::AlreadyExists => write!(f, "AlreadyExists"), + IOError::WouldBlock => write!(f, "WouldBlock"), + IOError::InvalidInput => write!(f, "InvalidInput"), + IOError::InvalidData => write!(f, "InvalidData"), + IOError::TimedOut => write!(f, "TimedOut"), + IOError::WriteZero => write!(f, "WriteZero"), + IOError::Interrupted => write!(f, "Interrupted"), + IOError::UnexpectedEof => write!(f, "UnexpectedEof"), + IOError::Other => write!(f, "Other"), + } + } +} + +#[async_trait] +pub trait SyncAndAsyncKVStore: Send + Sync { + // KVStoreSync methods + fn read_sync( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result, IOError>; + fn write_sync( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> Result<(), IOError>; + fn remove_sync( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> Result<(), IOError>; + fn list_sync( + &self, primary_namespace: String, secondary_namespace: String, + ) -> Result, IOError>; + + // KVStore methods + async fn read_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result, IOError>; + async fn write_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> Result<(), IOError>; + async fn remove_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> Result<(), IOError>; + async fn list_async( + &self, primary_namespace: String, secondary_namespace: String, + ) -> Result, IOError>; +} + +pub struct ForeignKVStoreAdapter { + pub(crate) inner: Arc, +} + +impl KVStore for ForeignKVStoreAdapter { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Pin, lightning::io::Error>> + Send>> { + let 
inner = self.inner.clone(); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + Box::pin(async move { + inner + .read_async(primary_namespace, secondary_namespace, key) + .await + .map_err(|e| e.into()) + }) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Pin> + Send>> { + let inner = self.inner.clone(); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + Box::pin(async move { + inner + .write_async(primary_namespace, secondary_namespace, key, buf) + .await + .map_err(|e| e.into()) + }) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Pin> + Send>> { + let inner = self.inner.clone(); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + Box::pin(async move { + inner + .remove_async(primary_namespace, secondary_namespace, key, lazy) + .await + .map_err(|e| e.into()) + }) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Pin, lightning::io::Error>> + Send>> { + let inner = self.inner.clone(); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + + Box::pin(async move { + inner.list_async(primary_namespace, secondary_namespace).await.map_err(|e| e.into()) + }) + } +} + +impl KVStoreSync for ForeignKVStoreAdapter { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, lightning::io::Error> { + self.inner + .read_sync( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + ) + .map_err(|e| e.into()) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Result<(), lightning::io::Error> { + self.inner + .write_sync( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + buf, + ) + .map_err(|e| e.into()) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Result<(), lightning::io::Error> { + self.inner + .remove_sync( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + lazy, + ) + .map_err(|e| e.into()) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, lightning::io::Error> { + self.inner + .list_sync(primary_namespace.to_string(), secondary_namespace.to_string()) + .map_err(|e| e.into()) + } +} + +pub struct DynStore { + pub(crate) inner: Arc, +} + +impl DynStore { + pub fn from_store(store: Arc) -> Self { + let adapter = ForeignKVStoreAdapter { inner: store }; + Self { inner: Arc::new(adapter) } + } + + pub fn from_ldk_store(store: Arc) -> Arc { + Arc::new(Self { inner: store }) + } +} + +impl Deref for DynStore { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl KVStore for DynStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Pin, lightning::io::Error>> + Send>> { + KVStore::read(self.inner.as_ref(), primary_namespace, secondary_namespace, key) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Pin> + Send>> { + 
KVStore::write(self.inner.as_ref(), primary_namespace, secondary_namespace, key, buf) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Pin> + Send>> { + KVStore::remove(self.inner.as_ref(), primary_namespace, secondary_namespace, key, lazy) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Pin, lightning::io::Error>> + Send>> { + KVStore::list(self.inner.as_ref(), primary_namespace, secondary_namespace) + } +} + +impl KVStoreSync for DynStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, lightning::io::Error> { + KVStoreSync::read(self.inner.as_ref(), primary_namespace, secondary_namespace, key) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Result<(), lightning::io::Error> { + KVStoreSync::write(self.inner.as_ref(), primary_namespace, secondary_namespace, key, buf) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Result<(), lightning::io::Error> { + KVStoreSync::remove(self.inner.as_ref(), primary_namespace, secondary_namespace, key, lazy) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, lightning::io::Error> { + KVStoreSync::list(self.inner.as_ref(), primary_namespace, secondary_namespace) + } +} + impl UniffiCustomTypeConverter for PublicKey { type Builtin = String; diff --git a/src/io/mod.rs b/src/io/mod.rs index 38fba5114..23a73183e 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -10,6 +10,7 @@ pub mod sqlite_store; #[cfg(test)] pub(crate) mod test_utils; +pub(crate) mod tier_store; pub(crate) mod utils; pub(crate) mod vss_store; diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs new file mode 100644 index 000000000..0337ff19a --- /dev/null +++ b/src/io/tier_store.rs @@ -0,0 +1,1270 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use crate::io::utils::{check_namespace_key_validity, is_possibly_transient}; +use crate::logger::{LdkLogger, Logger}; +use crate::runtime::Runtime; +use crate::DynStore; + +use lightning::util::persist::{ + KVStore, KVStoreSync, NETWORK_GRAPH_PERSISTENCE_KEY, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, +}; +use lightning::{io, log_trace}; +use lightning::{log_debug, log_error, log_info, log_warn}; + +use tokio::sync::mpsc::{self, error::TrySendError}; + +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +// todo(enigbe): Uncertain about appropriate queue size and if this would need +// configuring. +const BACKUP_QUEUE_CAPACITY: usize = 100; + +const DEFAULT_INITIAL_RETRY_DELAY_MS: u16 = 50; +const DEFAULT_MAXIMUM_RETRY_DELAY_SECS: u16 = 5; +const DEFAULT_BACKOFF_MULTIPLIER: f32 = 1.5; + +/// Configuration for exponential backoff retry behavior. +#[derive(Debug, Copy, Clone)] +pub struct RetryConfig { + /// The initial delay before the first retry attempt, in milliseconds. + pub initial_retry_delay_ms: u16, + /// The maximum delay between retry attempts, in seconds. 
+	pub maximum_delay_secs: u16,
+	/// The multiplier applied to the delay after each retry attempt.
+	///
+	/// For example, a value of `2.0` doubles the delay after each failed retry.
+	pub backoff_multiplier: f32,
+}
+
+impl Default for RetryConfig {
+	fn default() -> Self {
+		Self {
+			initial_retry_delay_ms: DEFAULT_INITIAL_RETRY_DELAY_MS,
+			maximum_delay_secs: DEFAULT_MAXIMUM_RETRY_DELAY_SECS,
+			backoff_multiplier: DEFAULT_BACKOFF_MULTIPLIER,
+		}
+	}
+}
+
+/// A 3-tiered [`KVStoreSync`] implementation that manages data across
+/// three distinct storage locations: a primary (preferably remote) store
+/// for all critical data, an optional ephemeral (local) store for
+/// non-critical and easily rebuildable data, and an optional backup
+/// (preferably local) store that lazily backs up the primary store for
+/// disaster recovery scenarios.
+pub(crate) struct TierStore {
+	inner: Arc<TierStoreInner>,
+	next_version: AtomicU64,
+	runtime: Arc<Runtime>,
+	logger: Arc<Logger>,
+}
+
+impl TierStore {
+	pub fn new(
+		primary_store: Arc<DynStore>, runtime: Arc<Runtime>, logger: Arc<Logger>,
+		retry_config: RetryConfig,
+	) -> Self {
+		let inner = Arc::new(TierStoreInner::new(primary_store, Arc::clone(&logger), retry_config));
+
+		Self { inner, next_version: AtomicU64::new(1), runtime, logger }
+	}
+
+	/// Configures the local backup store for disaster recovery.
+	///
+	/// This store serves as a local copy of the critical data for disaster
+	/// recovery scenarios. When configured, this method also spawns a background
+	/// task that asynchronously processes backup writes and removals to avoid
+	/// blocking primary store operations.
+	///
+	/// The backup operates on a best-effort basis:
+	/// - Writes are queued asynchronously (non-blocking).
+	/// - No retry logic (we assume the local store is unlikely to have transient failures).
+	/// - Failures are logged but don't propagate all the way to the caller.
+	pub fn set_backup_store(&mut self, backup: Arc<DynStore>) {
+		let (tx, rx) = mpsc::channel::<BackupOp>(BACKUP_QUEUE_CAPACITY);
+
+		let backup_clone = Arc::clone(&backup);
+		let logger = Arc::clone(&self.logger);
+
+		self.runtime.spawn_background_task(Self::process_backup_operation(
+			rx,
+			backup_clone,
+			logger,
+		));
+
+		debug_assert_eq!(Arc::strong_count(&self.inner), 1);
+
+		let inner = Arc::get_mut(&mut self.inner).expect(
+			"TierStore should not be shared during configuration. No other references should exist",
+		);
+
+		inner.backup_store = Some(backup);
+		inner.backup_sender = Some(tx);
+	}
+
+	async fn process_backup_operation(
+		mut receiver: mpsc::Receiver<BackupOp>, backup_store: Arc<DynStore>, logger: Arc<Logger>,
+	) {
+		while let Some(op) = receiver.recv().await {
+			match Self::apply_backup_operation(&op, &backup_store) {
+				Ok(_) => {
+					log_trace!(
+						logger,
+						"Backup succeeded for key {}/{}/{}",
+						op.primary_namespace(),
+						op.secondary_namespace(),
+						op.key()
+					);
+				},
+				Err(e) => {
+					log_error!(
+						logger,
+						"Backup failed permanently for key {}/{}/{}: {}",
+						op.primary_namespace(),
+						op.secondary_namespace(),
+						op.key(),
+						e
+					);
+				},
+			}
+		}
+	}
+
+	fn apply_backup_operation(op: &BackupOp, store: &Arc<DynStore>) -> io::Result<()> {
+		match op {
+			BackupOp::Write { primary_namespace, secondary_namespace, key, data } => {
+				KVStoreSync::write(
+					store.as_ref(),
+					primary_namespace,
+					secondary_namespace,
+					key,
+					data.clone(),
+				)
+			},
+			BackupOp::Remove { primary_namespace, secondary_namespace, key, lazy } => {
+				KVStoreSync::remove(
+					store.as_ref(),
+					primary_namespace,
+					secondary_namespace,
+					key,
+					*lazy,
+				)
+			},
+		}
+	}
+
+	/// Configures the local store for non-critical data storage.
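// "Non-critical" here is limited to data LDK can rebuild on its own, i.e. the network graph and
// the scorer. A minimal sketch of the routing decision that read_internal/write_internal below
// make (the persistence constants are the ones imported above; the helper name is illustrative
// only, not part of this patch):
fn is_ephemeral_key(primary_namespace: &str, key: &str) -> bool {
	matches!(
		(primary_namespace, key),
		(NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY)
			| (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_KEY)
	)
}
// Matching keys go to the ephemeral store when one is configured; everything else goes to the
// primary store, with an asynchronous backup copy when a backup store is configured.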
+ pub fn set_ephemeral_store(&mut self, ephemeral: Arc) { + debug_assert_eq!(Arc::strong_count(&self.inner), 1); + + let inner = Arc::get_mut(&mut self.inner).expect( + "TierStore should not be shared during configuration. No other references should exist", + ); + + inner.ephemeral_store = Some(ephemeral); + } + + fn build_locking_key( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> String { + if primary_namespace.is_empty() { + key.to_owned() + } else { + format!("{}#{}#{}", primary_namespace, secondary_namespace, key) + } + } + + fn get_new_version_and_lock_ref( + &self, locking_key: String, + ) -> (Arc>, u64) { + let version = self.next_version.fetch_add(1, Ordering::Relaxed); + if version == u64::MAX { + panic!("TierStore version counter overflowed"); + } + + // Get a reference to the inner lock. We do this early so that the arc can double as an in-flight counter for + // cleaning up unused locks. + let inner_lock_ref = self.inner.get_inner_lock_ref(locking_key); + + (inner_lock_ref, version) + } +} + +impl KVStore for TierStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Pin, io::Error>> + Send>> { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + Box::pin( + async move { inner.read_internal(primary_namespace, secondary_namespace, key).await }, + ) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Pin> + Send>> { + let inner = Arc::clone(&self.inner); + + let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + Box::pin(async move { + inner + .write_internal( + inner_lock_ref, + locking_key, + version, + primary_namespace, + secondary_namespace, + key, + buf, + ) + .await + }) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Pin> + Send>> { + let inner = Arc::clone(&self.inner); + + let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + Box::pin(async move { + inner + .remove_internal( + inner_lock_ref, + locking_key, + version, + primary_namespace, + secondary_namespace, + key, + lazy, + ) + .await + }) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Pin, io::Error>> + Send>> { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + + Box::pin(async move { inner.list_internal(primary_namespace, secondary_namespace).await }) + } +} + +impl KVStoreSync for TierStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + self.runtime.block_on(self.inner.read_internal( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + )) + } + + fn write( + &self, primary_namespace: &str, 
secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + self.runtime.block_on(self.inner.write_internal( + inner_lock_ref, + locking_key, + version, + primary_namespace, + secondary_namespace, + key, + buf, + )) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + self.runtime.block_on(self.inner.remove_internal( + inner_lock_ref, + locking_key, + version, + primary_namespace, + secondary_namespace, + key, + lazy, + )) + } + + fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { + self.runtime.block_on( + self.inner + .list_internal(primary_namespace.to_string(), secondary_namespace.to_string()), + ) + } +} + +pub struct TierStoreInner { + /// For remote data. + primary_store: Arc, + /// For local non-critical/ephemeral data. + ephemeral_store: Option>, + /// For redundancy (disaster recovery). + backup_store: Option>, + backup_sender: Option>, + logger: Arc, + retry_config: RetryConfig, + /// Per-key locks for the available data tiers, i.e. (primary, backup, ephemeral), + /// that ensures we don't have concurrent writes to the same namespace/key. + locks: Mutex>>>, +} + +impl TierStoreInner { + /// Creates a tier store with the primary (remote) data store. + pub fn new( + primary_store: Arc, logger: Arc, retry_config: RetryConfig, + ) -> Self { + Self { + primary_store, + ephemeral_store: None, + backup_store: None, + backup_sender: None, + logger, + retry_config, + locks: Mutex::new(HashMap::new()), + } + } + + /// Queues data for asynchronous backup/write to the configured backup store. + /// + /// We perform a non-blocking send to avoid impacting primary storage operations. + /// This is a no-op if backup store is not configured. + /// + /// ## Returns + /// - `Ok(())`: Backup was successfully queued or no backup is configured + /// - `Err(WouldBlock)`: Backup queue is full - data was not queued + /// - `Err(BrokenPipe)`: Backup queue is no longer available + fn enqueue_backup_write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + if let Some(backup_sender) = &self.backup_sender { + let backup_res = backup_sender.try_send(BackupOp::Write { + primary_namespace: primary_namespace.to_string(), + secondary_namespace: secondary_namespace.to_string(), + key: key.to_string(), + data: buf, + }); + if let Err(e) = backup_res { + match e { + // Assuming the channel is only full for a short time, should we explore + // retrying here to add some resiliency? + TrySendError::Full(op) => { + log_warn!( + self.logger, + "Backup queue is full. 
Cannot write data for key: {}/{}/{}", + op.primary_namespace(), + op.secondary_namespace(), + op.key() + ); + let e = io::Error::new( + io::ErrorKind::WouldBlock, + "Backup queue is currently full.", + ); + return Err(e); + }, + TrySendError::Closed(op) => { + log_error!( + self.logger, + "Backup queue is closed. Cannot write data for key: {}/{}/{}", + op.primary_namespace(), + op.secondary_namespace(), + op.key() + ); + let e = + io::Error::new(io::ErrorKind::BrokenPipe, "Backup queue is closed."); + return Err(e); + }, + } + } + } + Ok(()) + } + + /// Queues the removal of data from the configured backup store. + /// + /// We perform a non-blocking send to avoid impacting primary storage operations. + /// This is a no-op if backup store is not configured. + /// + /// # Returns + /// - `Ok(())`: Backup was successfully queued or no backup is configured + /// - `Err(WouldBlock)`: Backup queue is full - data was not queued + /// - `Err(BrokenPipe)`: Backup system is no longer available + fn enqueue_backup_remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + if let Some(backup_sender) = &self.backup_sender { + let removal_res = backup_sender.try_send(BackupOp::Remove { + primary_namespace: primary_namespace.to_string(), + secondary_namespace: secondary_namespace.to_string(), + key: key.to_string(), + lazy, + }); + if let Err(e) = removal_res { + match e { + TrySendError::Full(op) => { + log_warn!( + self.logger, + "Backup queue is full. Cannot remove data for key: {}/{}/{}", + op.primary_namespace(), + op.secondary_namespace(), + op.key() + ); + let e = io::Error::new( + io::ErrorKind::WouldBlock, + "Backup queue is currently full.", + ); + return Err(e); + }, + TrySendError::Closed(op) => { + log_error!( + self.logger, + "Backup queue is closed. Cannot remove data for key: {}/{}/{}", + op.primary_namespace(), + op.secondary_namespace(), + op.key() + ); + let e = + io::Error::new(io::ErrorKind::BrokenPipe, "Backup queue is closed."); + return Err(e); + }, + } + } + } + Ok(()) + } + + /// Reads data from the backup store (if configured). + fn read_from_backup( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + if let Some(backup) = self.backup_store.as_ref() { + KVStoreSync::read(backup.as_ref(), primary_namespace, secondary_namespace, key) + } else { + Err(io::Error::new(io::ErrorKind::NotFound, "Backup store not previously configured.")) + } + } + + /// Lists keys from the given primary and secondary namespace pair from the backup + /// store (if configured). + fn list_from_backup( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> io::Result> { + if let Some(backup) = &self.backup_store { + KVStoreSync::list(backup.as_ref(), primary_namespace, secondary_namespace) + } else { + Err(io::Error::new(io::ErrorKind::NotFound, "Backup store not previously configured.")) + } + } + + /// Reads from the primary data store with basic retry logic, or falls back to backup. + /// + /// For transient errors, retries up to a maximum delay time with exponential + /// backoff. For any error (transient after exhaustion or non-transient), falls + /// to the backup store (if configured). 
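// To put the retry parameters in perspective, a minimal sketch of what the default RetryConfig
// (50ms initial delay, 1.5x multiplier, 5s cap) amounts to when every attempt hits a transient
// error; this mirrors the loop below rather than calling into it:
let (mut delay_ms, max_ms) = (50.0_f32, 5_000.0_f32);
let (mut retries, mut total_ms) = (0u32, 0.0_f32);
while delay_ms < max_ms {
	retries += 1;
	total_ms += delay_ms;
	delay_ms = (delay_ms * 1.5).min(max_ms);
}
// retries == 12 and total_ms is roughly 12_875.0, i.e. about a dozen attempts over ~13 seconds
// before the read gives up on the primary store and falls back to the backup store (if any).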
+ async fn read_primary_or_backup( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + let mut delay = Duration::from_millis(self.retry_config.initial_retry_delay_ms as u64); + let maximum_delay = Duration::from_secs(self.retry_config.maximum_delay_secs as u64); + let mut tries = 0_u16; + + loop { + match KVStore::read( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + ) + .await + { + Ok(data) => { + log_info!( + self.logger, + "Read succeeded after {} retries for key: {}/{}/{}", + tries, + primary_namespace, + secondary_namespace, + key + ); + return Ok(data); + }, + + Err(e) if is_possibly_transient(&e) && (delay < maximum_delay) => { + log_warn!( + self.logger, + "Possible transient error reading key {}/{}/{} (attempt {}): {}. Retrying...", + primary_namespace, + secondary_namespace, + key, + tries + 1, + e + ); + tries += 1; + tokio::time::sleep(delay).await; + delay = std::cmp::min( + delay.mul_f32(self.retry_config.backoff_multiplier), + maximum_delay, + ); + }, + + Err(e) => { + log_error!(self.logger, "Failed to read from primary store for key {}/{}/{}: {}. Falling back to backup.", + primary_namespace, secondary_namespace, key, e); + return self.read_from_backup(primary_namespace, secondary_namespace, key); + }, + } + } + } + + /// Lists keys from the primary data store with retry logic, or falls back to backup. + /// + /// For transient errors, retries up to a maximum delay time with exponential + /// backoff. For any error (transient after exhaustion or non-transient), falls + /// back to the backup store (if configured) for disaster recovery. + async fn list_primary_or_backup( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> io::Result> { + let mut delay = Duration::from_millis(self.retry_config.initial_retry_delay_ms as u64); + let maximum_delay = Duration::from_secs(self.retry_config.maximum_delay_secs as u64); + let mut tries = 0_u16; + + loop { + match KVStore::list(self.primary_store.as_ref(), primary_namespace, secondary_namespace) + .await + { + Ok(keys) => { + log_info!( + self.logger, + "List succeeded after {} retries for namespace: {}/{}", + tries, + primary_namespace, + secondary_namespace + ); + return Ok(keys); + }, + Err(e) if is_possibly_transient(&e) && (delay < maximum_delay) => { + log_warn!( + self.logger, + "Possible transient error listing namespace {}/{} (attempt {}): {}. Retrying...", + primary_namespace, + secondary_namespace, + tries + 1, + e + ); + tries += 1; + tokio::time::sleep(delay).await; + delay = std::cmp::min( + delay.mul_f32(self.retry_config.backoff_multiplier), + maximum_delay, + ); + }, + Err(e) => { + log_error!(self.logger, "Failed to list from primary store for namespace {}/{}: {}. Falling back to backup.", + primary_namespace, secondary_namespace, e); + return self.list_from_backup(primary_namespace, secondary_namespace); + }, + } + } + } + + /// Writes data to the primary store with retry logic. + /// + /// For transient errors, retries up to a maximum delay time with exponential + /// backoff. 
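// `is_possibly_transient` is defined in io/utils.rs and is not part of this hunk; the retry
// paths above and below only rely on it to classify error kinds. Purely as an assumption about
// its shape (the real predicate may differ), it likely boils down to something like:
fn is_possibly_transient_sketch(e: &lightning::io::Error) -> bool {
	use lightning::io::ErrorKind::*;
	matches!(
		e.kind(),
		TimedOut
			| Interrupted
			| WouldBlock
			| ConnectionRefused
			| ConnectionReset
			| ConnectionAborted
			| NotConnected
			| BrokenPipe
	)
}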
+ async fn retry_write_with_backoff( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + let mut delay = Duration::from_millis(self.retry_config.initial_retry_delay_ms as u64); + let maximum_delay = Duration::from_secs(self.retry_config.maximum_delay_secs as u64); + let mut tries = 0_u16; + + loop { + match KVStore::write( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf.clone(), + ) + .await + { + Ok(res) => { + log_info!( + self.logger, + "Write succeeded after {} retries for key: {}/{}/{}", + tries, + primary_namespace, + secondary_namespace, + key + ); + return Ok(res); + }, + Err(e) if is_possibly_transient(&e) && (delay < maximum_delay) => { + log_warn!( + self.logger, + "Possible transient error writing key {}/{}/{} (attempt {}): {}. Retrying...", + primary_namespace, + secondary_namespace, + key, + tries + 1, + e + ); + tries += 1; + tokio::time::sleep(delay).await; + delay = std::cmp::min( + delay.mul_f32(self.retry_config.backoff_multiplier), + maximum_delay, + ); + }, + Err(e) => { + log_error!( + self.logger, + "Failed to write to primary store for key {}/{}/{}: {}", + primary_namespace, + secondary_namespace, + key, + e + ); + return Err(e); + }, + } + } + } + + /// Removes data from the primary store with retry logic. + /// + /// For transient errors, retries up to a maximum delay time with exponential + /// backoff. + async fn retry_remove_with_backoff( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + let mut delay = Duration::from_millis(self.retry_config.initial_retry_delay_ms as u64); + let maximum_delay = Duration::from_secs(self.retry_config.maximum_delay_secs as u64); + let mut tries = 0_u16; + + loop { + match KVStore::remove( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ) + .await + { + Ok(res) => { + log_info!( + self.logger, + "Successfully removed data from primary store after {} retries for key: {}/{}/{}", + tries, + primary_namespace, + secondary_namespace, + key + ); + return Ok(res); + }, + Err(e) if is_possibly_transient(&e) && (delay < maximum_delay) => { + log_warn!( + self.logger, + "Possible transient error removing key {}/{}/{} from primary store (attempt {}): {}. Retrying...", + primary_namespace, + secondary_namespace, + key, + tries + 1, + e + ); + tries += 1; + tokio::time::sleep(delay).await; + delay = std::cmp::min( + delay.mul_f32(self.retry_config.backoff_multiplier), + maximum_delay, + ); + }, + Err(e) => { + log_error!( + self.logger, + "Failed to remove data from primary store for key {}/{}/{}: {}", + primary_namespace, + secondary_namespace, + key, + e + ); + return Err(e); + }, + } + } + } + + async fn primary_write_then_schedule_backup( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + let primary_write_res = match KVStore::write( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf.clone(), + ) + .await + { + Ok(res) => Ok(res), + Err(e) if is_possibly_transient(&e) => { + self.retry_write_with_backoff( + primary_namespace, + secondary_namespace, + key, + buf.clone(), + ) + .await + }, + Err(e) => Err(e), + }; + + match primary_write_res { + Ok(res) => { + // We enqueue for backup only what we successfully write to primary. In doing + // this we avoid data inconsistencies across stores. 
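// Put differently: the backup is an eventually-consistent recovery copy. If queueing fails or
// the background task cannot apply an operation, only the local copy goes stale; the
// authoritative data in the primary store is unaffected, and the copy is refreshed the next
// time the same key is written.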
+ if let Err(e) = + self.enqueue_backup_write(primary_namespace, secondary_namespace, key, buf) + { + // We don't propagate backup errors here, opting to log only. + log_warn!( + self.logger, + "Failed to queue backup write for key: {}/{}/{}. Error: {}", + primary_namespace, + secondary_namespace, + key, + e + ) + } + + Ok(res) + }, + Err(e) => { + log_debug!( + self.logger, + "Skipping backup write due to primary write failure for key: {}/{}/{}.", + primary_namespace, + secondary_namespace, + key + ); + Err(e) + }, + } + } + + async fn primary_remove_then_schedule_backup( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + let primary_remove_res = match KVStore::remove( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ) + .await + { + Ok(res) => Ok(res), + Err(e) if is_possibly_transient(&e) => { + self.retry_remove_with_backoff(primary_namespace, secondary_namespace, key, lazy) + .await + }, + Err(e) => Err(e), + }; + + match primary_remove_res { + Ok(res) => { + if let Err(e) = + self.enqueue_backup_remove(primary_namespace, secondary_namespace, key, lazy) + { + // We don't propagate backup errors here, opting to silently log. + log_warn!( + self.logger, + "Failed to queue backup removal for key: {}/{}/{}. Error: {}", + primary_namespace, + secondary_namespace, + key, + e + ) + } + + Ok(res) + }, + Err(e) => { + log_debug!( + self.logger, + "Skipping backup removal due to primary removal failure for key: {}/{}/{}.", + primary_namespace, + secondary_namespace, + key + ); + Err(e) + }, + } + } + + async fn read_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "read", + )?; + + match (primary_namespace.as_str(), secondary_namespace.as_str(), key.as_str()) { + (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) => { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + // We only try once here (without retry logic) because local failure might be indicative + // of a more serious issue (e.g. full memory, memory corruption, permissions change) that + // do not self-resolve such that retrying would negate the latency benefits. + + // The following questions remain: + // 1. Are there situations where local transient errors may warrant a retry? + // 2. Can we reliably identify/detect these transient errors? + // 3. Should we fall back to the primary or backup stores in the event of any error? + KVStoreSync::read( + eph_store.as_ref(), + &primary_namespace, + &secondary_namespace, + &key, + ) + } else { + log_debug!(self.logger, "Ephemeral store not configured. 
Reading non-critical data from primary or backup stores."); + self.read_primary_or_backup(&primary_namespace, &secondary_namespace, &key) + .await + } + }, + _ => self.read_primary_or_backup(&primary_namespace, &secondary_namespace, &key).await, + } + } + + async fn write_internal( + &self, inner_lock_ref: Arc>, locking_key: String, version: u64, + primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "write", + )?; + + match (primary_namespace.as_str(), secondary_namespace.as_str(), key.as_str()) { + (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) => { + if let Some(eph_store) = &self.ephemeral_store { + self.execute_locked_write( + inner_lock_ref, + locking_key, + version, + async move || { + KVStoreSync::write( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + }, + ) + .await + } else { + log_debug!(self.logger, "Ephemeral store not configured. Writing non-critical data to primary and backup stores."); + + self.execute_locked_write( + inner_lock_ref, + locking_key, + version, + async move || { + self.primary_write_then_schedule_backup( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + }, + ) + .await + } + }, + _ => { + self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { + self.primary_write_then_schedule_backup( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + }) + .await + }, + } + } + + async fn remove_internal( + &self, inner_lock_ref: Arc>, locking_key: String, version: u64, + primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "remove", + )?; + + match (primary_namespace.as_str(), secondary_namespace.as_str(), key.as_str()) { + (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) => { + if let Some(eph_store) = &self.ephemeral_store { + self.execute_locked_write( + inner_lock_ref, + locking_key, + version, + async move || { + KVStoreSync::remove( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + }, + ) + .await + } else { + log_debug!(self.logger, "Ephemeral store not configured. 
Removing non-critical data from primary and backup stores."); + + self.execute_locked_write( + inner_lock_ref, + locking_key, + version, + async move || { + self.primary_remove_then_schedule_backup( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + }, + ) + .await + } + }, + _ => { + self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { + self.primary_remove_then_schedule_backup( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + }) + .await + }, + } + } + + async fn list_internal( + &self, primary_namespace: String, secondary_namespace: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + None, + "list", + )?; + + match (primary_namespace.as_str(), secondary_namespace.as_str()) { + ( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + ) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _) => { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + KVStoreSync::list(eph_store.as_ref(), &primary_namespace, &secondary_namespace) + } else { + log_debug!( + self.logger, + "Ephemeral store not configured. Listing from primary and backup stores." + ); + self.list_primary_or_backup(&primary_namespace, &secondary_namespace).await + } + }, + _ => self.list_primary_or_backup(&primary_namespace, &secondary_namespace).await, + } + } + + fn get_inner_lock_ref(&self, locking_key: String) -> Arc> { + let mut outer_lock = self.locks.lock().unwrap(); + Arc::clone(&outer_lock.entry(locking_key).or_default()) + } + + async fn execute_locked_write< + F: Future>, + FN: FnOnce() -> F, + >( + &self, inner_lock_ref: Arc>, locking_key: String, version: u64, + callback: FN, + ) -> Result<(), lightning::io::Error> { + let res = { + let mut last_written_version = inner_lock_ref.lock().await; + + // Check if we already have a newer version written. This ensures eventual consistency. + let is_stale_version = version <= *last_written_version; + + if is_stale_version { + Ok(()) + } else { + callback().await.map(|_| { + *last_written_version = version; + }) + } + }; + + self.clean_locks(&inner_lock_ref, locking_key); + res + } + + fn clean_locks(&self, inner_lock_ref: &Arc>, locking_key: String) { + // If there no arcs in use elsewhere, this means that there are no in-flight writes. We can remove the map entry + // to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in + // inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already + // counted. + let mut outer_lock = self.locks.lock().unwrap(); + + let strong_count = Arc::strong_count(&inner_lock_ref); + debug_assert!(strong_count >= 2, "Unexpected TierStore strong count"); + + if strong_count == 2 { + outer_lock.remove(&locking_key); + } + } +} + +enum BackupOp { + Write { primary_namespace: String, secondary_namespace: String, key: String, data: Vec }, + Remove { primary_namespace: String, secondary_namespace: String, key: String, lazy: bool }, +} + +impl BackupOp { + fn primary_namespace(&self) -> &str { + match self { + BackupOp::Write { primary_namespace, .. } + | BackupOp::Remove { primary_namespace, .. } => primary_namespace, + } + } + + fn secondary_namespace(&self) -> &str { + match self { + BackupOp::Write { secondary_namespace, .. } + | BackupOp::Remove { secondary_namespace, .. 
} => secondary_namespace, + } + } + + fn key(&self) -> &str { + match self { + BackupOp::Write { key, .. } | BackupOp::Remove { key, .. } => key, + } + } +} + +#[cfg(test)] +mod tests { + use crate::io::test_utils::random_storage_path; + use crate::io::tier_store::{RetryConfig, TierStore}; + use crate::logger::Logger; + use crate::runtime::Runtime; + #[cfg(not(feature = "uniffi"))] + use crate::types::DynStore; + use crate::wrap_store; + #[cfg(feature = "uniffi")] + use crate::DynStore; + + use lightning::util::logger::Level; + use lightning::util::persist::{ + KVStoreSync, NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, + }; + use lightning_persister::fs_store::FilesystemStore; + + use std::path::PathBuf; + use std::sync::Arc; + // use std::time::Duration; + + struct StorageFixture { + tier: TierStore, + primary: Arc, + ephemeral: Option>, + backup: Option>, + base_dir: PathBuf, + } + + impl Drop for StorageFixture { + fn drop(&mut self) { + drop(self.backup.take()); + drop(self.ephemeral.take()); + + if let Err(e) = std::fs::remove_dir_all(&self.base_dir) { + eprintln!("Failed to clean up test directory {:?}: {}", self.base_dir, e); + } + } + } + + fn setup_tier_store(ephemeral: bool, backup: bool) -> StorageFixture { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + + let primary: Arc = + wrap_store!(Arc::new(FilesystemStore::new(base_dir.join("primary_store")))); + let logger = Arc::new( + Logger::new_fs_writer(log_path, Level::Debug) + .expect("Failed to create filesystem logger"), + ); + let runtime = + Arc::new(Runtime::new(Arc::clone(&logger)).expect("Failed to create new runtime.")); + let retry_config = RetryConfig::default(); + let mut tier = + TierStore::new(Arc::clone(&primary), Arc::clone(&runtime), logger, retry_config); + + let ephemeral = if ephemeral { + let eph_store: Arc = + wrap_store!(Arc::new(FilesystemStore::new(base_dir.join("eph_store")))); + tier.set_ephemeral_store(Arc::clone(&eph_store)); + Some(eph_store) + } else { + None + }; + + let backup = if backup { + let backup: Arc = + wrap_store!(Arc::new(FilesystemStore::new(base_dir.join("backup_store")))); + tier.set_backup_store(Arc::clone(&backup)); + Some(backup) + } else { + None + }; + + StorageFixture { tier, primary, ephemeral, backup, base_dir } + } + + #[test] + fn writes_to_ephemeral_if_configured() { + let tier = setup_tier_store(true, false); + assert!(tier.ephemeral.is_some()); + + let primary_namespace = NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE; + let secondary_namespace = NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE; + let data = [42u8; 32].to_vec(); + + KVStoreSync::write( + &tier.tier, + primary_namespace, + secondary_namespace, + NETWORK_GRAPH_PERSISTENCE_KEY, + data.clone(), + ) + .unwrap(); + + KVStoreSync::write( + &tier.tier, + primary_namespace, + secondary_namespace, + SCORER_PERSISTENCE_KEY, + data.clone(), + ) + .unwrap(); + + let eph_store = tier.ephemeral.clone().unwrap(); + let ng_read = KVStoreSync::read( + &*eph_store, + primary_namespace, + secondary_namespace, + NETWORK_GRAPH_PERSISTENCE_KEY, + ) + .unwrap(); + + let sc_read = KVStoreSync::read( + &*eph_store, + primary_namespace, + secondary_namespace, + SCORER_PERSISTENCE_KEY, + ) + .unwrap(); + + assert_eq!(ng_read, data); + assert!(KVStoreSync::read( + &*tier.primary, + primary_namespace, + secondary_namespace, + NETWORK_GRAPH_PERSISTENCE_KEY + ) + 
.is_err()); + + assert_eq!(sc_read, data); + assert!(KVStoreSync::read( + &*tier.primary, + primary_namespace, + secondary_namespace, + SCORER_PERSISTENCE_KEY + ) + .is_err()); + } +} diff --git a/src/io/utils.rs b/src/io/utils.rs index 98993ff11..0adc80b92 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -20,6 +20,7 @@ use bdk_wallet::ChangeSet as BdkWalletChangeSet; use bip39::Mnemonic; use bitcoin::Network; use lightning::io::Cursor; +use lightning::io::ErrorKind; use lightning::ln::msgs::DecodeError; use lightning::routing::gossip::NetworkGraph; use lightning::routing::scoring::{ @@ -46,9 +47,9 @@ use crate::io::{ }; use crate::logger::{log_error, LdkLogger, Logger}; use crate::peer_store::PeerStore; -use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper}; +use crate::types::{Broadcaster, KeysManager, Sweeper}; use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper}; -use crate::{Error, EventQueue, NodeMetrics, PaymentDetails}; +use crate::{DynStore, Error, EventQueue, NodeMetrics, PaymentDetails}; pub const EXTERNAL_PATHFINDING_SCORES_CACHE_KEY: &str = "external_pathfinding_scores_cache"; @@ -617,6 +618,18 @@ pub(crate) fn read_bdk_wallet_change_set( Ok(Some(change_set)) } +/// Checks if an error kind is possibly transient. +pub(crate) fn is_possibly_transient(error: &lightning::io::Error) -> bool { + match error.kind() { + ErrorKind::ConnectionRefused + | ErrorKind::ConnectionAborted + | ErrorKind::ConnectionReset + | ErrorKind::TimedOut + | ErrorKind::Interrupted + | ErrorKind::NotConnected => true, + _ => false, + } +} #[cfg(test)] mod tests { use super::*; diff --git a/src/lib.rs b/src/lib.rs index 6a26c6c5b..89397cdd5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -129,6 +129,7 @@ use event::{EventHandler, EventQueue}; use ffi::*; use gossip::GossipSource; use graph::NetworkGraph; +pub use io::tier_store::RetryConfig; pub use io::utils::generate_entropy_mnemonic; use io::utils::write_node_metrics; use lightning::chain::BestBlock; @@ -155,9 +156,15 @@ use types::{ Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, Graph, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, Wallet, }; -pub use types::{ - ChannelDetails, CustomTlvRecord, DynStore, PeerDetails, SyncAndAsyncKVStore, UserChannelId, -}; +pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, UserChannelId}; + +#[cfg(feature = "uniffi")] +pub use crate::ffi::DynStore; +#[cfg(not(feature = "uniffi"))] +pub use crate::types::DynStore; + +#[cfg(not(feature = "uniffi"))] +pub use types::SyncAndAsyncKVStore; pub use { bip39, bitcoin, lightning, lightning_invoice, lightning_liquidity, lightning_types, tokio, diff --git a/src/liquidity.rs b/src/liquidity.rs index 81d48e530..5316b395c 100644 --- a/src/liquidity.rs +++ b/src/liquidity.rs @@ -44,9 +44,9 @@ use crate::connection::ConnectionManager; use crate::logger::{log_debug, log_error, log_info, LdkLogger, Logger}; use crate::runtime::Runtime; use crate::types::{ - Broadcaster, ChannelManager, DynStore, KeysManager, LiquidityManager, PeerManager, Wallet, + Broadcaster, ChannelManager, KeysManager, LiquidityManager, PeerManager, Wallet, }; -use crate::{total_anchor_channels_reserve_sats, Config, Error}; +use crate::{total_anchor_channels_reserve_sats, Config, DynStore, Error}; const LIQUIDITY_REQUEST_TIMEOUT_SECS: u64 = 5; diff --git a/src/payment/asynchronous/static_invoice_store.rs b/src/payment/asynchronous/static_invoice_store.rs index a7e2d2f9e..a9253af64 100644 --- 
a/src/payment/asynchronous/static_invoice_store.rs +++ b/src/payment/asynchronous/static_invoice_store.rs @@ -21,7 +21,7 @@ use lightning::util::ser::{Readable, Writeable}; use crate::hex_utils; use crate::io::STATIC_INVOICE_STORE_PRIMARY_NAMESPACE; use crate::payment::asynchronous::rate_limiter::RateLimiter; -use crate::types::DynStore; +use crate::DynStore; struct PersistedStaticInvoice { invoice: StaticInvoice, @@ -161,11 +161,11 @@ mod tests { use lightning_types::features::BlindedHopFeatures; use crate::payment::asynchronous::static_invoice_store::StaticInvoiceStore; - use crate::types::DynStore; + use crate::{wrap_store, DynStore}; #[tokio::test] async fn static_invoice_store_test() { - let store: Arc = Arc::new(TestStore::new(false)); + let store: Arc = wrap_store!(Arc::new(TestStore::new(false))); let static_invoice_store = StaticInvoiceStore::new(Arc::clone(&store)); let static_invoice = invoice(); diff --git a/src/peer_store.rs b/src/peer_store.rs index 82c80c396..3ca91012d 100644 --- a/src/peer_store.rs +++ b/src/peer_store.rs @@ -19,7 +19,7 @@ use crate::io::{ PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; use crate::logger::{log_error, LdkLogger}; -use crate::types::DynStore; +use crate::DynStore; use crate::{Error, SocketAddress}; pub struct PeerStore @@ -154,11 +154,13 @@ mod tests { use lightning::util::test_utils::{TestLogger, TestStore}; + use crate::wrap_store; + use super::*; #[test] fn peer_info_persistence() { - let store: Arc = Arc::new(TestStore::new(false)); + let store: Arc = wrap_store!(Arc::new(TestStore::new(false))); let logger = Arc::new(TestLogger::new()); let peer_store = PeerStore::new(Arc::clone(&store), Arc::clone(&logger)); diff --git a/src/types.rs b/src/types.rs index 71512b2cd..a04e9d162 100644 --- a/src/types.rs +++ b/src/types.rs @@ -48,8 +48,11 @@ where { } +#[cfg(not(feature = "uniffi"))] /// A type alias for [`SyncAndAsyncKVStore`] with `Sync`/`Send` markers; pub type DynStore = dyn SyncAndAsyncKVStore + Sync + Send; +#[cfg(feature = "uniffi")] +pub(crate) use crate::DynStore; pub type Persister = MonitorUpdatingPersister< Arc, diff --git a/src/wallet/persist.rs b/src/wallet/persist.rs index 5c8668937..952ce8115 100644 --- a/src/wallet/persist.rs +++ b/src/wallet/persist.rs @@ -16,7 +16,8 @@ use crate::io::utils::{ write_bdk_wallet_tx_graph, }; use crate::logger::{log_error, LdkLogger, Logger}; -use crate::types::DynStore; +use crate::DynStore; + pub(crate) struct KVStoreWalletPersister { latest_change_set: Option, kv_store: Arc, diff --git a/tests/common/mod.rs b/tests/common/mod.rs index dd680488c..aaccbcffa 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -32,7 +32,8 @@ use ldk_node::config::{AsyncPaymentsRole, Config, ElectrumSyncConfig, EsploraSyn use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus}; use ldk_node::{ - Builder, CustomTlvRecord, Event, LightningBalance, Node, NodeError, PendingSweepBalance, + wrap_store, Builder, CustomTlvRecord, Event, LightningBalance, Node, NodeError, + PendingSweepBalance, }; use lightning::io; use lightning::ln::msgs::SocketAddress; @@ -411,7 +412,9 @@ pub(crate) fn setup_node_for_async_payments( let node = match config.store_type { TestStoreType::TestSyncStore => { - let kv_store = Arc::new(TestSyncStore::new(config.node_config.storage_dir_path.into())); + let kv_store = wrap_store!(Arc::new(TestSyncStore::new( + config.node_config.storage_dir_path.into() + ))); builder.build_with_store(kv_store).unwrap() }, TestStoreType::Sqlite 
=> builder.build().unwrap(), diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index e2d4207cd..b02de5388 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -31,7 +31,7 @@ use ldk_node::payment::{ ConfirmationStatus, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, QrPaymentResult, }; -use ldk_node::{Builder, DynStore, Event, NodeError}; +use ldk_node::{wrap_store, Builder, DynStore, Event, NodeError}; use lightning::ln::channelmanager::PaymentId; use lightning::routing::gossip::{NodeAlias, NodeId}; use lightning::routing::router::RouteParametersConfig; @@ -252,14 +252,15 @@ async fn start_stop_reinit() { let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); - let test_sync_store: Arc = - Arc::new(TestSyncStore::new(config.node_config.storage_dir_path.clone().into())); + let test_sync_store: Arc = wrap_store!(Arc::new(TestSyncStore::new( + config.node_config.storage_dir_path.clone().into() + ))); let sync_config = EsploraSyncConfig { background_sync_config: None }; setup_builder!(builder, config.node_config); builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - let node = builder.build_with_store(Arc::clone(&test_sync_store)).unwrap(); + let node = builder.build_with_store(wrap_store!(Arc::clone(&test_sync_store))).unwrap(); node.start().unwrap(); let expected_node_id = node.node_id(); @@ -297,7 +298,8 @@ async fn start_stop_reinit() { setup_builder!(builder, config.node_config); builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - let reinitialized_node = builder.build_with_store(Arc::clone(&test_sync_store)).unwrap(); + let reinitialized_node = + builder.build_with_store(wrap_store!(Arc::clone(&test_sync_store))).unwrap(); reinitialized_node.start().unwrap(); assert_eq!(reinitialized_node.node_id(), expected_node_id); From 315a54455632a775b88484c7eb7a94e03aebf9ee Mon Sep 17 00:00:00 2001 From: Enigbe Date: Mon, 20 Oct 2025 21:50:37 +0100 Subject: [PATCH 2/3] refactor: set retry config maximum delay in milliseconds --- bindings/ldk_node.udl | 2 +- src/io/tier_store.rs | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index b0f98a241..f8ca5a8a6 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -48,7 +48,7 @@ dictionary LSPS2ServiceConfig { dictionary RetryConfig { u16 initial_retry_delay_ms; - u16 maximum_delay_secs; + u16 maximum_delay_ms; f32 backoff_multiplier; }; diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs index 0337ff19a..7cb0963c8 100644 --- a/src/io/tier_store.rs +++ b/src/io/tier_store.rs @@ -31,8 +31,8 @@ use std::time::Duration; // configuring. const BACKUP_QUEUE_CAPACITY: usize = 100; -const DEFAULT_INITIAL_RETRY_DELAY_MS: u16 = 50; -const DEFAULT_MAXIMUM_RETRY_DELAY_SECS: u16 = 5; +const DEFAULT_INITIAL_RETRY_DELAY_MS: u16 = 10; +const DEFAULT_MAXIMUM_RETRY_DELAY_MS: u16 = 500; const DEFAULT_BACKOFF_MULTIPLIER: f32 = 1.5; /// Configuration for exponential backoff retry behavior. @@ -40,8 +40,8 @@ const DEFAULT_BACKOFF_MULTIPLIER: f32 = 1.5; pub struct RetryConfig { /// The initial delay before the first retry attempt, in milliseconds. pub initial_retry_delay_ms: u16, - /// The maximum delay between retry attempts, in seconds. - pub maximum_delay_secs: u16, + /// The maximum delay between retry attempts, in milliseconds. + pub maximum_delay_ms: u16, /// The multiplier applied to the delay after each retry attempt. 
/// /// For example, a value of `2.0` doubles the delay after each failed retry. @@ -52,7 +52,7 @@ impl Default for RetryConfig { fn default() -> Self { Self { initial_retry_delay_ms: DEFAULT_INITIAL_RETRY_DELAY_MS, - maximum_delay_secs: DEFAULT_MAXIMUM_RETRY_DELAY_SECS, + maximum_delay_ms: DEFAULT_MAXIMUM_RETRY_DELAY_MS, backoff_multiplier: DEFAULT_BACKOFF_MULTIPLIER, } } @@ -515,7 +515,7 @@ impl TierStoreInner { &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> io::Result> { let mut delay = Duration::from_millis(self.retry_config.initial_retry_delay_ms as u64); - let maximum_delay = Duration::from_secs(self.retry_config.maximum_delay_secs as u64); + let maximum_delay = Duration::from_millis(self.retry_config.maximum_delay_ms as u64); let mut tries = 0_u16; loop { @@ -575,7 +575,7 @@ impl TierStoreInner { &self, primary_namespace: &str, secondary_namespace: &str, ) -> io::Result> { let mut delay = Duration::from_millis(self.retry_config.initial_retry_delay_ms as u64); - let maximum_delay = Duration::from_secs(self.retry_config.maximum_delay_secs as u64); + let maximum_delay = Duration::from_millis(self.retry_config.maximum_delay_ms as u64); let mut tries = 0_u16; loop { @@ -625,7 +625,7 @@ impl TierStoreInner { &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> io::Result<()> { let mut delay = Duration::from_millis(self.retry_config.initial_retry_delay_ms as u64); - let maximum_delay = Duration::from_secs(self.retry_config.maximum_delay_secs as u64); + let maximum_delay = Duration::from_millis(self.retry_config.maximum_delay_ms as u64); let mut tries = 0_u16; loop { @@ -689,7 +689,7 @@ impl TierStoreInner { &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> io::Result<()> { let mut delay = Duration::from_millis(self.retry_config.initial_retry_delay_ms as u64); - let maximum_delay = Duration::from_secs(self.retry_config.maximum_delay_secs as u64); + let maximum_delay = Duration::from_millis(self.retry_config.maximum_delay_ms as u64); let mut tries = 0_u16; loop { From 264aa7f3199426c6e72c7118e70338f1aec6e0a2 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Mon, 20 Oct 2025 22:00:58 +0100 Subject: [PATCH 3/3] test: add comprehensive testing for TierStore This commit adds unit, integration, and FFI tests for the TierStore implementation: - Unit tests for TierStore core functionality - Integration tests for nodes built with tiered storage - Python FFI tests for foreign key-value store --- benches/payments.rs | 1 + bindings/python/src/ldk_node/kv_store.py | 118 ++++++ bindings/python/src/ldk_node/test_ldk_node.py | 327 ++++++++++------- src/io/test_utils.rs | 176 ++++++++- src/io/tier_store.rs | 344 +++++++++++++----- tests/common/mod.rs | 44 ++- tests/integration_tests_rust.rs | 88 +++++ 7 files changed, 865 insertions(+), 233 deletions(-) create mode 100644 bindings/python/src/ldk_node/kv_store.py diff --git a/benches/payments.rs b/benches/payments.rs index 75b7f0513..3bae6ecb7 100644 --- a/benches/payments.rs +++ b/benches/payments.rs @@ -127,6 +127,7 @@ fn payment_benchmark(c: &mut Criterion) { true, false, common::TestStoreType::Sqlite, + common::TestStoreType::Sqlite, ); let runtime = diff --git a/bindings/python/src/ldk_node/kv_store.py b/bindings/python/src/ldk_node/kv_store.py new file mode 100644 index 000000000..6d4eb9bde --- /dev/null +++ b/bindings/python/src/ldk_node/kv_store.py @@ -0,0 +1,118 @@ +import threading + +from abc import ABC, abstractmethod +from typing import List + +from 
ldk_node import IoError + +class AbstractKvStore(ABC): + @abstractmethod + async def list_async(self, primary_namespace: "str",secondary_namespace: "str") -> "typing.List[str]": + pass + + @abstractmethod + def list_sync(self, primary_namespace: "str",secondary_namespace: "str") -> "typing.List[str]": + pass + + @abstractmethod + async def read_async(self, primary_namespace: "str",secondary_namespace: "str",key: "str") -> "typing.List[int]": + pass + + @abstractmethod + def read_sync(self, primary_namespace: "str",secondary_namespace: "str",key: "str") -> "typing.List[int]": + pass + + @abstractmethod + async def remove_async(self, primary_namespace: "str",secondary_namespace: "str",key: "str",lazy: "bool") -> None: + pass + + @abstractmethod + def remove_sync(self, primary_namespace: "str",secondary_namespace: "str",key: "str",lazy: "bool") -> None: + pass + + @abstractmethod + async def write_async(self, primary_namespace: "str",secondary_namespace: "str",key: "str",buf: "typing.List[int]") -> None: + pass + + @abstractmethod + def write_sync(self, primary_namespace: "str",secondary_namespace: "str",key: "str",buf: "typing.List[int]") -> None: + pass + +class TestKvStore(AbstractKvStore): + def __init__(self, name: str): + self.name = name + # Storage structure: {(primary_ns, secondary_ns): {key: [bytes]}} + self.storage = {} + self._lock = threading.Lock() + + def dump(self): + print(f"\n[{self.name}] Store contents:") + for (primary_ns, secondary_ns), keys_dict in self.storage.items(): + print(f" Namespace: ({primary_ns!r}, {secondary_ns!r})") + for key, data in keys_dict.items(): + print(f" Key: {key!r} -> {len(data)} bytes") + # Optionally show first few bytes + preview = data[:20] if len(data) > 20 else data + print(f" Data preview: {preview}...") + + # KVStoreSync methods + def list_sync(self, primary_namespace: str, secondary_namespace: str) -> List[str]: + with self._lock: + namespace_key = (primary_namespace, secondary_namespace) + if namespace_key in self.storage: + return list(self.storage[namespace_key].keys()) + return [] + + def read_sync(self, primary_namespace: str, secondary_namespace: str, key: str) -> List[int]: + with self._lock: + print(f"[{self.name}] READ: {primary_namespace}/{secondary_namespace}/{key}") + namespace_key = (primary_namespace, secondary_namespace) + + if namespace_key not in self.storage: + print(f" -> namespace not found, keys: {list(self.storage.keys())}") + raise IoError.NotFound(f"Namespace not found: {primary_namespace}/{secondary_namespace}") + + if key not in self.storage[namespace_key]: + print(f" -> key not found, keys: {list(self.storage[namespace_key].keys())}") + raise IoError.NotFound(f"Key not found: {key}") + + data = self.storage[namespace_key][key] + print(f" -> returning {len(data)} bytes") + return data + + def write_sync(self, primary_namespace: str, secondary_namespace: str, key: str, buf: List[int]) -> None: + with self._lock: + namespace_key = (primary_namespace, secondary_namespace) + if namespace_key not in self.storage: + self.storage[namespace_key] = {} + + self.storage[namespace_key][key] = buf.copy() + + def remove_sync(self, primary_namespace: str, secondary_namespace: str, key: str, lazy: bool) -> None: + with self._lock: + namespace_key = (primary_namespace, secondary_namespace) + if namespace_key not in self.storage: + raise IoError.NotFound(f"Namespace not found: {primary_namespace}/{secondary_namespace}") + + if key not in self.storage[namespace_key]: + raise IoError.NotFound(f"Key not found: {key}") + + del 
self.storage[namespace_key][key] + + if not self.storage[namespace_key]: + del self.storage[namespace_key] + + # KVStore methods + async def list_async(self, primary_namespace: str, secondary_namespace: str) -> List[str]: + return self.list_sync(primary_namespace, secondary_namespace) + + async def read_async(self, primary_namespace: str, secondary_namespace: str, key: str) -> List[int]: + return self.read_sync(primary_namespace, secondary_namespace, key) + + async def write_async(self, primary_namespace: str, secondary_namespace: str, key: str, buf: List[int]) -> None: + self.write_sync(primary_namespace, secondary_namespace, key, buf) + + async def remove_async(self, primary_namespace: str, secondary_namespace: str, key: str, lazy: bool) -> None: + self.remove_sync(primary_namespace, secondary_namespace, key, lazy) + + \ No newline at end of file diff --git a/bindings/python/src/ldk_node/test_ldk_node.py b/bindings/python/src/ldk_node/test_ldk_node.py index f71e89df8..5cbf7fcec 100644 --- a/bindings/python/src/ldk_node/test_ldk_node.py +++ b/bindings/python/src/ldk_node/test_ldk_node.py @@ -5,13 +5,67 @@ import os import re import requests +import asyncio +import threading +import ldk_node from ldk_node import * +from kv_store import TestKvStore DEFAULT_ESPLORA_SERVER_URL = "http://127.0.0.1:3002" DEFAULT_TEST_NETWORK = Network.REGTEST DEFAULT_BITCOIN_CLI_BIN = "bitcoin-cli" +class NodeSetup: + def __init__(self, node, node_id, tmp_dir, listening_addresses, stores=None): + self.node = node + self.node_id = node_id + self.tmp_dir = tmp_dir + self.listening_addresses = listening_addresses + self.stores = stores # (primary, backup, ephemeral) or None + + def cleanup(self): + self.node.stop() + time.sleep(1) + self.tmp_dir.cleanup() + +def setup_two_nodes(esplora_endpoint, port_1=2323, port_2=2324, use_tier_store=False): + # Setup Node 1 + tmp_dir_1 = tempfile.TemporaryDirectory("_ldk_node_1") + print("TMP DIR 1:", tmp_dir_1.name) + + listening_addresses_1 = [f"127.0.0.1:{port_1}"] + if use_tier_store: + node_1, stores_1 = setup_node_with_tier_store(tmp_dir_1.name, esplora_endpoint, listening_addresses_1) + else: + node_1 = setup_node(tmp_dir_1.name, esplora_endpoint, listening_addresses_1) + stores_1 = None + + node_1.start() + node_id_1 = node_1.node_id() + print("Node ID 1:", node_id_1) + + setup_1 = NodeSetup(node_1, node_id_1, tmp_dir_1, listening_addresses_1, stores_1) + + # Setup Node 2 + tmp_dir_2 = tempfile.TemporaryDirectory("_ldk_node_2") + print("TMP DIR 2:", tmp_dir_2.name) + + listening_addresses_2 = [f"127.0.0.1:{port_2}"] + if use_tier_store: + node_2, stores_2 = setup_node_with_tier_store(tmp_dir_2.name, esplora_endpoint, listening_addresses_2) + else: + node_2 = setup_node(tmp_dir_2.name, esplora_endpoint, listening_addresses_2) + stores_2 = None + + node_2.start() + node_id_2 = node_2.node_id() + print("Node ID 2:", node_id_2) + + setup_2 = NodeSetup(node_2, node_id_2, tmp_dir_2, listening_addresses_2, stores_2) + + return setup_1, setup_2 + def bitcoin_cli(cmd): args = [] @@ -95,7 +149,6 @@ def send_to_address(address, amount_sats): print("SEND TX:", res) return res - def setup_node(tmp_dir, esplora_endpoint, listening_addresses): config = default_config() builder = Builder.from_config(config) @@ -105,6 +158,122 @@ def setup_node(tmp_dir, esplora_endpoint, listening_addresses): builder.set_listening_addresses(listening_addresses) return builder.build() +def setup_node_with_tier_store(tmp_dir, esplora_endpoint, listening_addresses): + config = default_config() + + primary 
= TestKvStore("primary") + backup = TestKvStore("backup") + ephemeral = TestKvStore("ephemeral") + retry_config = RetryConfig( + initial_retry_delay_ms=10, + maximum_delay_ms=100, + backoff_multiplier=2.0 + ) + + # Set event loop for async Python callbacks from Rust + # (https://mozilla.github.io/uniffi-rs/0.27/futures.html#python-uniffi_set_event_loop) + loop = asyncio.new_event_loop() + + def run_loop(): + asyncio.set_event_loop(loop) + loop.run_forever() + + loop_thread = threading.Thread(target=run_loop, daemon=True) + loop_thread.start() + ldk_node.uniffi_set_event_loop(loop) + + builder = Builder.from_config(config) + builder.set_storage_dir_path(tmp_dir) + builder.set_chain_source_esplora(esplora_endpoint, None) + builder.set_network(DEFAULT_TEST_NETWORK) + builder.set_listening_addresses(listening_addresses) + builder.set_tier_store_retry_config(retry_config) + builder.set_tier_store_backup(DynStore.from_store(backup)) + builder.set_tier_store_ephemeral(DynStore.from_store(ephemeral)) + + return builder.build_with_tier_store(DynStore.from_store(primary)), (primary, backup, ephemeral) + +def do_channel_full_cycle(setup_1, setup_2, esplora_endpoint): + # Fund both nodes + address_1 = setup_1.node.onchain_payment().new_address() + txid_1 = send_to_address(address_1, 100000) + address_2 = setup_2.node.onchain_payment().new_address() + txid_2 = send_to_address(address_2, 100000) + + wait_for_tx(esplora_endpoint, txid_1) + wait_for_tx(esplora_endpoint, txid_2) + mine_and_wait(esplora_endpoint, 6) + + setup_1.node.sync_wallets() + setup_2.node.sync_wallets() + + # Verify balances + spendable_balance_1 = setup_1.node.list_balances().spendable_onchain_balance_sats + spendable_balance_2 = setup_2.node.list_balances().spendable_onchain_balance_sats + assert spendable_balance_1 == 100000 + assert spendable_balance_2 == 100000 + + # Open channel + setup_1.node.open_channel(setup_2.node_id, setup_2.listening_addresses[0], 50000, None, None) + + channel_pending_event_1 = setup_1.node.wait_next_event() + assert isinstance(channel_pending_event_1, Event.CHANNEL_PENDING) + setup_1.node.event_handled() + + channel_pending_event_2 = setup_2.node.wait_next_event() + assert isinstance(channel_pending_event_2, Event.CHANNEL_PENDING) + setup_2.node.event_handled() + + funding_txid = channel_pending_event_1.funding_txo.txid + wait_for_tx(esplora_endpoint, funding_txid) + mine_and_wait(esplora_endpoint, 6) + + setup_1.node.sync_wallets() + setup_2.node.sync_wallets() + + channel_ready_event_1 = setup_1.node.wait_next_event() + assert isinstance(channel_ready_event_1, Event.CHANNEL_READY) + setup_1.node.event_handled() + + channel_ready_event_2 = setup_2.node.wait_next_event() + assert isinstance(channel_ready_event_2, Event.CHANNEL_READY) + setup_2.node.event_handled() + + # Make payment + description = Bolt11InvoiceDescription.DIRECT("asdf") + invoice = setup_2.node.bolt11_payment().receive(2500000, description, 9217) + setup_1.node.bolt11_payment().send(invoice, None) + + payment_successful_event_1 = setup_1.node.wait_next_event() + assert isinstance(payment_successful_event_1, Event.PAYMENT_SUCCESSFUL) + setup_1.node.event_handled() + + payment_received_event_2 = setup_2.node.wait_next_event() + assert isinstance(payment_received_event_2, Event.PAYMENT_RECEIVED) + setup_2.node.event_handled() + + # Close channel + setup_2.node.close_channel(channel_ready_event_2.user_channel_id, setup_1.node_id) + + channel_closed_event_1 = setup_1.node.wait_next_event() + assert isinstance(channel_closed_event_1, 
Event.CHANNEL_CLOSED) + setup_1.node.event_handled() + + channel_closed_event_2 = setup_2.node.wait_next_event() + assert isinstance(channel_closed_event_2, Event.CHANNEL_CLOSED) + setup_2.node.event_handled() + + mine_and_wait(esplora_endpoint, 1) + setup_1.node.sync_wallets() + setup_2.node.sync_wallets() + + # Verify final balances + spendable_balance_after_close_1 = setup_1.node.list_balances().spendable_onchain_balance_sats + assert spendable_balance_after_close_1 > 95000 + assert spendable_balance_after_close_1 < 100000 + spendable_balance_after_close_2 = setup_2.node.list_balances().spendable_onchain_balance_sats + assert spendable_balance_after_close_2 == 102500 + def get_esplora_endpoint(): if os.environ.get('ESPLORA_ENDPOINT'): return str(os.environ['ESPLORA_ENDPOINT']) @@ -120,132 +289,36 @@ def setUp(self): def test_channel_full_cycle(self): esplora_endpoint = get_esplora_endpoint() - - ## Setup Node 1 - tmp_dir_1 = tempfile.TemporaryDirectory("_ldk_node_1") - print("TMP DIR 1:", tmp_dir_1.name) - - listening_addresses_1 = ["127.0.0.1:2323"] - node_1 = setup_node(tmp_dir_1.name, esplora_endpoint, listening_addresses_1) - node_1.start() - node_id_1 = node_1.node_id() - print("Node ID 1:", node_id_1) - - # Setup Node 2 - tmp_dir_2 = tempfile.TemporaryDirectory("_ldk_node_2") - print("TMP DIR 2:", tmp_dir_2.name) - - listening_addresses_2 = ["127.0.0.1:2324"] - node_2 = setup_node(tmp_dir_2.name, esplora_endpoint, listening_addresses_2) - node_2.start() - node_id_2 = node_2.node_id() - print("Node ID 2:", node_id_2) - - address_1 = node_1.onchain_payment().new_address() - txid_1 = send_to_address(address_1, 100000) - address_2 = node_2.onchain_payment().new_address() - txid_2 = send_to_address(address_2, 100000) - - wait_for_tx(esplora_endpoint, txid_1) - wait_for_tx(esplora_endpoint, txid_2) - - mine_and_wait(esplora_endpoint, 6) - - node_1.sync_wallets() - node_2.sync_wallets() - - spendable_balance_1 = node_1.list_balances().spendable_onchain_balance_sats - spendable_balance_2 = node_2.list_balances().spendable_onchain_balance_sats - total_balance_1 = node_1.list_balances().total_onchain_balance_sats - total_balance_2 = node_2.list_balances().total_onchain_balance_sats - - print("SPENDABLE 1:", spendable_balance_1) - self.assertEqual(spendable_balance_1, 100000) - - print("SPENDABLE 2:", spendable_balance_2) - self.assertEqual(spendable_balance_2, 100000) - - print("TOTAL 1:", total_balance_1) - self.assertEqual(total_balance_1, 100000) - - print("TOTAL 2:", total_balance_2) - self.assertEqual(total_balance_2, 100000) - - node_1.open_channel(node_id_2, listening_addresses_2[0], 50000, None, None) - - channel_pending_event_1 = node_1.wait_next_event() - assert isinstance(channel_pending_event_1, Event.CHANNEL_PENDING) - print("EVENT:", channel_pending_event_1) - node_1.event_handled() - - channel_pending_event_2 = node_2.wait_next_event() - assert isinstance(channel_pending_event_2, Event.CHANNEL_PENDING) - print("EVENT:", channel_pending_event_2) - node_2.event_handled() - - funding_txid = channel_pending_event_1.funding_txo.txid - wait_for_tx(esplora_endpoint, funding_txid) - mine_and_wait(esplora_endpoint, 6) - - node_1.sync_wallets() - node_2.sync_wallets() - - channel_ready_event_1 = node_1.wait_next_event() - assert isinstance(channel_ready_event_1, Event.CHANNEL_READY) - print("EVENT:", channel_ready_event_1) - print("funding_txo:", funding_txid) - node_1.event_handled() - - channel_ready_event_2 = node_2.wait_next_event() - assert isinstance(channel_ready_event_2, 
Event.CHANNEL_READY) - print("EVENT:", channel_ready_event_2) - node_2.event_handled() - - description = Bolt11InvoiceDescription.DIRECT("asdf") - invoice = node_2.bolt11_payment().receive(2500000, description, 9217) - node_1.bolt11_payment().send(invoice, None) - - payment_successful_event_1 = node_1.wait_next_event() - assert isinstance(payment_successful_event_1, Event.PAYMENT_SUCCESSFUL) - print("EVENT:", payment_successful_event_1) - node_1.event_handled() - - payment_received_event_2 = node_2.wait_next_event() - assert isinstance(payment_received_event_2, Event.PAYMENT_RECEIVED) - print("EVENT:", payment_received_event_2) - node_2.event_handled() - - node_2.close_channel(channel_ready_event_2.user_channel_id, node_id_1) - - channel_closed_event_1 = node_1.wait_next_event() - assert isinstance(channel_closed_event_1, Event.CHANNEL_CLOSED) - print("EVENT:", channel_closed_event_1) - node_1.event_handled() - - channel_closed_event_2 = node_2.wait_next_event() - assert isinstance(channel_closed_event_2, Event.CHANNEL_CLOSED) - print("EVENT:", channel_closed_event_2) - node_2.event_handled() - - mine_and_wait(esplora_endpoint, 1) - - node_1.sync_wallets() - node_2.sync_wallets() - - spendable_balance_after_close_1 = node_1.list_balances().spendable_onchain_balance_sats - assert spendable_balance_after_close_1 > 95000 - assert spendable_balance_after_close_1 < 100000 - spendable_balance_after_close_2 = node_2.list_balances().spendable_onchain_balance_sats - self.assertEqual(spendable_balance_after_close_2, 102500) - - # Stop nodes - node_1.stop() - node_2.stop() - - # Cleanup - time.sleep(1) # Wait a sec so our logs can finish writing - tmp_dir_1.cleanup() - tmp_dir_2.cleanup() + setup_1, setup_2 = setup_two_nodes(esplora_endpoint) + + do_channel_full_cycle(setup_1, setup_2, esplora_endpoint) + + setup_1.cleanup() + setup_2.cleanup() + + def test_tier_store(self): + esplora_endpoint = get_esplora_endpoint() + setup_1, setup_2 = setup_two_nodes(esplora_endpoint, port_1=2325, port_2=2326, use_tier_store=True) + + do_channel_full_cycle(setup_1, setup_2, esplora_endpoint) + + primary, backup, ephemeral = setup_1.stores + + # Wait for async backup + time.sleep(2) + + self.assertGreater(len(primary.storage), 0, "Primary should have data") + self.assertGreater(len(backup.storage), 0, "Backup should have data") + self.assertEqual(list(primary.storage.keys()), list(backup.storage.keys()), + "Backup should mirror primary") + + self.assertGreater(len(ephemeral.storage), 0, "Ephemeral should have data") + ephemeral_keys = [key for namespace in ephemeral.storage.values() for key in namespace.keys()] + has_scorer_or_graph = any(key in ['scorer', 'network_graph'] for key in ephemeral_keys) + self.assertTrue(has_scorer_or_graph, "Ephemeral should contain scorer or network_graph data") + + setup_1.cleanup() + setup_2.cleanup() if __name__ == '__main__': unittest.main() diff --git a/src/io/test_utils.rs b/src/io/test_utils.rs index 059e66aee..fcfdc97f7 100644 --- a/src/io/test_utils.rs +++ b/src/io/test_utils.rs @@ -5,8 +5,13 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. 
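// A rough Rust-side counterpart of the Python `setup_node_with_tier_store` above,
// sketching how the tier-store builder methods fit together. Paths, the esplora URL and
// the concrete store types are illustrative, and the setter/build signatures are assumed
// from the UDL; only the method names and `RetryConfig` fields come from this series.
fn build_tiered_node_sketch() -> Result<ldk_node::Node, ldk_node::BuildError> {
	use ldk_node::{wrap_store, Builder, RetryConfig};
	use lightning_persister::fs_store::FilesystemStore;
	use std::sync::Arc;

	let mut builder = Builder::new();
	builder.set_storage_dir_path("/tmp/ldk_node_tiered".to_string());
	builder.set_chain_source_esplora("https://blockstream.info/testnet/api".to_string(), None);
	builder.set_tier_store_retry_config(RetryConfig {
		initial_retry_delay_ms: 10,
		maximum_delay_ms: 100,
		backoff_multiplier: 2.0,
	});

	// Local stores standing in for the fast-local and disaster-recovery tiers.
	let ephemeral = wrap_store!(Arc::new(FilesystemStore::new("/tmp/eph".into())));
	let backup = wrap_store!(Arc::new(FilesystemStore::new("/tmp/backup".into())));
	builder.set_tier_store_ephemeral(ephemeral);
	builder.set_tier_store_backup(backup);

	// The primary (typically remote) store is handed to the dedicated build method.
	let primary = wrap_store!(Arc::new(FilesystemStore::new("/tmp/primary".into())));
	builder.build_with_tier_store(primary)
}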
+use std::collections::HashMap; +use std::future::Future; use std::panic::RefUnwindSafe; use std::path::PathBuf; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::time::Duration; use lightning::events::ClosureReason; use lightning::ln::functional_test_utils::{ @@ -14,13 +19,15 @@ use lightning::ln::functional_test_utils::{ create_network, create_node_cfgs, create_node_chanmgrs, send_payment, TestChanMonCfg, }; use lightning::util::persist::{ - KVStoreSync, MonitorUpdatingPersister, KVSTORE_NAMESPACE_KEY_MAX_LEN, + KVStore, KVStoreSync, MonitorUpdatingPersister, KVSTORE_NAMESPACE_KEY_MAX_LEN, }; use lightning::util::test_utils; -use lightning::{check_added_monitors, check_closed_broadcast, check_closed_event}; +use lightning::{check_added_monitors, check_closed_broadcast, check_closed_event, io}; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; +use crate::runtime::Runtime; + type TestMonitorUpdatePersister<'a, K> = MonitorUpdatingPersister< &'a K, &'a test_utils::TestLogger, @@ -227,3 +234,168 @@ pub(crate) fn do_test_store(store_0: &K, store_1: &K) { // Make sure everything is persisted as expected after close. check_persisted_data!(persister_0_max_pending_updates * 2 * EXPECTED_UPDATES_PER_PAYMENT + 1); } + +struct DelayedStoreInner { + storage: Mutex>>, + delay: Duration, +} + +impl DelayedStoreInner { + fn new(delay: Duration) -> Self { + Self { storage: Mutex::new(HashMap::new()), delay } + } + + fn make_key(pn: &str, sn: &str, key: &str) -> String { + format!("{}/{}/{}", pn, sn, key) + } + + async fn read_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result, io::Error> { + tokio::time::sleep(self.delay).await; + + let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key); + let storage = self.storage.lock().unwrap(); + storage + .get(&full_key) + .cloned() + .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "key not found")) + } + + async fn write_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> Result<(), io::Error> { + tokio::time::sleep(self.delay).await; + + let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key); + let mut storage = self.storage.lock().unwrap(); + storage.insert(full_key, buf); + Ok(()) + } + + async fn remove_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result<(), io::Error> { + tokio::time::sleep(self.delay).await; + + let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key); + let mut storage = self.storage.lock().unwrap(); + storage.remove(&full_key); + Ok(()) + } + + async fn list_internal( + &self, primary_namespace: String, secondary_namespace: String, + ) -> Result, io::Error> { + tokio::time::sleep(self.delay).await; + + let prefix = format!("{}/{}/", primary_namespace, secondary_namespace); + let storage = self.storage.lock().unwrap(); + Ok(storage + .keys() + .filter(|k| k.starts_with(&prefix)) + .map(|k| k.strip_prefix(&prefix).unwrap().to_string()) + .collect()) + } +} + +pub struct DelayedStore { + inner: Arc, + runtime: Arc, +} + +impl DelayedStore { + pub fn new(delay_ms: u64, runtime: Arc) -> Self { + Self { inner: Arc::new(DelayedStoreInner::new(Duration::from_millis(delay_ms))), runtime } + } +} + +impl KVStore for DelayedStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Pin, io::Error>> + Send>> { + let inner = Arc::clone(&self.inner); + let pn = 
primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + let key = key.to_string(); + + Box::pin(async move { inner.read_internal(pn, sn, key).await }) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Pin> + Send>> { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + let key = key.to_string(); + + Box::pin(async move { inner.write_internal(pn, sn, key, buf).await }) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, + ) -> Pin> + Send>> { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + let key = key.to_string(); + + Box::pin(async move { inner.remove_internal(pn, sn, key).await }) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Pin, io::Error>> + Send>> { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + + Box::pin(async move { inner.list_internal(pn, sn).await }) + } +} + +impl KVStoreSync for DelayedStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, io::Error> { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + let key = key.to_string(); + + self.runtime.block_on(async move { inner.read_internal(pn, sn, key).await }) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Result<(), io::Error> { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + let key = key.to_string(); + + self.runtime.block_on(async move { inner.write_internal(pn, sn, key, buf).await }) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, + ) -> Result<(), io::Error> { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + let key = key.to_string(); + + self.runtime.block_on(async move { inner.remove_internal(pn, sn, key).await }) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, io::Error> { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + + self.runtime.block_on(async move { inner.list_internal(pn, sn).await }) + } +} diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs index 7cb0963c8..b471fdb62 100644 --- a/src/io/tier_store.rs +++ b/src/io/tier_store.rs @@ -29,7 +29,10 @@ use std::time::Duration; // todo(enigbe): Uncertain about appropriate queue size and if this would need // configuring. 
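// The capacity below bounds the in-memory queue of pending backup operations: primary
// writes enqueue a `BackupOp` and a background task drains the queue against the backup
// store, so the hot path never waits on it (see the `lazy_backup` test further down).
// That task is defined elsewhere in this file; the following is only an assumed,
// simplified shape of such a consumer, using a tokio mpsc receiver:
async fn drain_backup_ops_sketch(
	mut rx: tokio::sync::mpsc::Receiver<BackupOp>, backup: std::sync::Arc<DynStore>,
) {
	while let Some(op) = rx.recv().await {
		let res = match &op {
			BackupOp::Write { primary_namespace, secondary_namespace, key, data } => {
				KVStore::write(
					backup.as_ref(),
					primary_namespace,
					secondary_namespace,
					key,
					data.clone(),
				)
				.await
			},
			BackupOp::Remove { primary_namespace, secondary_namespace, key, lazy } => {
				KVStore::remove(backup.as_ref(), primary_namespace, secondary_namespace, key, *lazy)
					.await
			},
		};
		// Backup failures are non-fatal for the node and would only be logged.
		let _ = res;
	}
}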
+#[cfg(not(test))] const BACKUP_QUEUE_CAPACITY: usize = 100; +#[cfg(test)] +const BACKUP_QUEUE_CAPACITY: usize = 5; const DEFAULT_INITIAL_RETRY_DELAY_MS: u16 = 10; const DEFAULT_MAXIMUM_RETRY_DELAY_MS: u16 = 500; @@ -1128,143 +1131,288 @@ impl BackupOp { #[cfg(test)] mod tests { - use crate::io::test_utils::random_storage_path; - use crate::io::tier_store::{RetryConfig, TierStore}; - use crate::logger::Logger; - use crate::runtime::Runtime; - #[cfg(not(feature = "uniffi"))] - use crate::types::DynStore; - use crate::wrap_store; - #[cfg(feature = "uniffi")] - use crate::DynStore; + use std::panic::RefUnwindSafe; + use std::path::PathBuf; + use std::sync::Arc; + use std::thread; use lightning::util::logger::Level; use lightning::util::persist::{ - KVStoreSync, NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, + CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, }; use lightning_persister::fs_store::FilesystemStore; - use std::path::PathBuf; - use std::sync::Arc; - // use std::time::Duration; - - struct StorageFixture { - tier: TierStore, - primary: Arc, - ephemeral: Option>, - backup: Option>, - base_dir: PathBuf, - } + use crate::io::test_utils::{ + do_read_write_remove_list_persist, random_storage_path, DelayedStore, + }; + use crate::logger::Logger; + use crate::runtime::Runtime; + use crate::{wrap_store, RetryConfig}; - impl Drop for StorageFixture { - fn drop(&mut self) { - drop(self.backup.take()); - drop(self.ephemeral.take()); + use super::*; - if let Err(e) = std::fs::remove_dir_all(&self.base_dir) { - eprintln!("Failed to clean up test directory {:?}: {}", self.base_dir, e); - } + impl RefUnwindSafe for TierStore {} + + struct CleanupDir(PathBuf); + impl Drop for CleanupDir { + fn drop(&mut self) { + let _ = std::fs::remove_dir_all(&self.0); } } - fn setup_tier_store(ephemeral: bool, backup: bool) -> StorageFixture { + fn setup_tier_store( + primary_store: Arc, logger: Arc, runtime: Arc, + ) -> TierStore { + let retry_config = RetryConfig::default(); + TierStore::new(primary_store, runtime, logger, retry_config) + } + + #[test] + fn write_read_list_remove() { let base_dir = random_storage_path(); let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); - let primary: Arc = - wrap_store!(Arc::new(FilesystemStore::new(base_dir.join("primary_store")))); - let logger = Arc::new( - Logger::new_fs_writer(log_path, Level::Debug) - .expect("Failed to create filesystem logger"), - ); - let runtime = - Arc::new(Runtime::new(Arc::clone(&logger)).expect("Failed to create new runtime.")); - let retry_config = RetryConfig::default(); - let mut tier = - TierStore::new(Arc::clone(&primary), Arc::clone(&runtime), logger, retry_config); - - let ephemeral = if ephemeral { - let eph_store: Arc = - wrap_store!(Arc::new(FilesystemStore::new(base_dir.join("eph_store")))); - tier.set_ephemeral_store(Arc::clone(&eph_store)); - Some(eph_store) - } else { - None - }; + let runtime = Arc::new(Runtime::new(Arc::clone(&logger)).unwrap()); + let _cleanup = CleanupDir(base_dir.clone()); - let backup = if backup { - let backup: Arc = - wrap_store!(Arc::new(FilesystemStore::new(base_dir.join("backup_store")))); - tier.set_backup_store(Arc::clone(&backup)); - Some(backup) - } else { - None - }; + let primary_store = 
wrap_store!(Arc::new(FilesystemStore::new(base_dir.join("primary")))); + let tier = setup_tier_store(primary_store, logger, runtime); - StorageFixture { tier, primary, ephemeral, backup, base_dir } + do_read_write_remove_list_persist(&tier); } #[test] - fn writes_to_ephemeral_if_configured() { - let tier = setup_tier_store(true, false); - assert!(tier.ephemeral.is_some()); + fn ephemeral_routing() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let runtime = Arc::new(Runtime::new(Arc::clone(&logger)).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + wrap_store!(Arc::new(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger, runtime); - let primary_namespace = NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE; - let secondary_namespace = NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE; - let data = [42u8; 32].to_vec(); + let ephemeral_store: Arc = + wrap_store!(Arc::new(FilesystemStore::new(base_dir.join("ephemeral")))); + tier.set_ephemeral_store(Arc::clone(&ephemeral_store)); + let data = vec![42u8; 32]; + + // Non-critical KVStoreSync::write( - &tier.tier, - primary_namespace, - secondary_namespace, + &tier, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, data.clone(), ) .unwrap(); + // Critical KVStoreSync::write( - &tier.tier, - primary_namespace, - secondary_namespace, - SCORER_PERSISTENCE_KEY, + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, data.clone(), ) .unwrap(); - let eph_store = tier.ephemeral.clone().unwrap(); - let ng_read = KVStoreSync::read( - &*eph_store, - primary_namespace, - secondary_namespace, + let primary_read_ng = KVStoreSync::read( + &*primary_store, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, + ); + let ephemeral_read_ng = KVStoreSync::read( + &*ephemeral_store, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + + let primary_read_cm = KVStoreSync::read( + &*primary_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + let ephemeral_read_cm = KVStoreSync::read( + &*ephemeral_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + + assert!(primary_read_ng.is_err()); + assert_eq!(ephemeral_read_ng.unwrap(), data); + + assert!(ephemeral_read_cm.is_err()); + assert_eq!(primary_read_cm.unwrap(), data); + } + + #[test] + fn lazy_backup() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + let runtime = Arc::new(Runtime::new(Arc::clone(&logger)).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + wrap_store!(Arc::new(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger, runtime); + + let backup_store: Arc = + 
wrap_store!(Arc::new(FilesystemStore::new(base_dir.join("backup")))); + tier.set_backup_store(Arc::clone(&backup_store)); + + let data = vec![42u8; 32]; + + KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + data.clone(), ) .unwrap(); - let sc_read = KVStoreSync::read( - &*eph_store, - primary_namespace, - secondary_namespace, - SCORER_PERSISTENCE_KEY, + // Immediate read from backup should fail + let backup_read_cm = KVStoreSync::read( + &*backup_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + assert!(backup_read_cm.is_err()); + + // Primary not blocked by backup hence immediate read should succeed + let primary_read_cm = KVStoreSync::read( + &*primary_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + assert_eq!(primary_read_cm.unwrap(), data); + + // Delayed read from backup should succeed + thread::sleep(Duration::from_millis(50)); + let backup_read_cm = KVStoreSync::read( + &*backup_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + assert_eq!(backup_read_cm.unwrap(), data); + } + + #[test] + fn backup_overflow_doesnt_fail_writes() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path.clone(), Level::Trace).unwrap()); + let runtime = Arc::new(Runtime::new(Arc::clone(&logger)).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + wrap_store!(Arc::new(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = + setup_tier_store(Arc::clone(&primary_store), Arc::clone(&logger), Arc::clone(&runtime)); + + let backup_store: Arc = wrap_store!(Arc::new(DelayedStore::new(100, runtime))); + tier.set_backup_store(Arc::clone(&backup_store)); + + let data = vec![42u8; 32]; + + let key = CHANNEL_MANAGER_PERSISTENCE_KEY; + for i in 0..=10 { + let result = KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + &format!("{}_{}", key, i), + data.clone(), + ); + + assert!(result.is_ok(), "Write {} should succeed", i); + } + + // Check logs for backup queue overflow message + let log_contents = std::fs::read_to_string(&log_path).unwrap(); + assert!( + log_contents.contains("Backup queue is full"), + "Logs should contain backup queue overflow message" + ); + } + + #[test] + fn lazy_removal() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path.clone(), Level::Trace).unwrap()); + let runtime = Arc::new(Runtime::new(Arc::clone(&logger)).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + wrap_store!(Arc::new(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = + setup_tier_store(Arc::clone(&primary_store), Arc::clone(&logger), Arc::clone(&runtime)); + + let backup_store: Arc = wrap_store!(Arc::new(DelayedStore::new(100, runtime))); + tier.set_backup_store(Arc::clone(&backup_store)); + + let data = vec![42u8; 32]; + + let key = CHANNEL_MANAGER_PERSISTENCE_KEY; + let write_result 
= KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + data.clone(), + ); + assert!(write_result.is_ok(), "Write should succeed"); + + thread::sleep(Duration::from_millis(10)); + + assert_eq!( + KVStoreSync::read( + &*backup_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ) + .unwrap(), + data + ); + + KVStoreSync::remove( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + true, ) .unwrap(); - assert_eq!(ng_read, data); - assert!(KVStoreSync::read( - &*tier.primary, - primary_namespace, - secondary_namespace, - NETWORK_GRAPH_PERSISTENCE_KEY - ) - .is_err()); + thread::sleep(Duration::from_millis(10)); - assert_eq!(sc_read, data); - assert!(KVStoreSync::read( - &*tier.primary, - primary_namespace, - secondary_namespace, - SCORER_PERSISTENCE_KEY - ) - .is_err()); + let res = KVStoreSync::read( + &*backup_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ); + + assert!(res.is_err()); } } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index aaccbcffa..597af8d5e 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -32,8 +32,8 @@ use ldk_node::config::{AsyncPaymentsRole, Config, ElectrumSyncConfig, EsploraSyn use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus}; use ldk_node::{ - wrap_store, Builder, CustomTlvRecord, Event, LightningBalance, Node, NodeError, - PendingSweepBalance, + wrap_store, Builder, CustomTlvRecord, DynStore, Event, LightningBalance, Node, NodeError, + PendingSweepBalance, RetryConfig, }; use lightning::io; use lightning::ln::msgs::SocketAddress; @@ -263,10 +263,15 @@ pub(crate) enum TestChainSource<'a> { BitcoindRestSync(&'a BitcoinD), } -#[derive(Clone, Copy)] +#[derive(Clone)] pub(crate) enum TestStoreType { TestSyncStore, Sqlite, + TierStore { + primary: Arc, + backup: Option>, + ephemeral: Option>, + }, } impl Default for TestStoreType { @@ -293,6 +298,22 @@ macro_rules! 
setup_builder { pub(crate) use setup_builder; +pub(crate) fn create_tier_stores( + base_path: PathBuf, +) -> (Arc, Arc, Arc) { + let primary = wrap_store!(Arc::new( + SqliteStore::new( + base_path.join("primary"), + Some("primary_db".to_string()), + Some("primary_kv".to_string()), + ) + .unwrap(), + )); + let backup = wrap_store!(Arc::new(FilesystemStore::new(base_path.join("backup")))); + let ephemeral = wrap_store!(Arc::new(TestStore::new(false))); + (primary, backup, ephemeral) +} + pub(crate) fn setup_two_nodes( chain_source: &TestChainSource, allow_0conf: bool, anchor_channels: bool, anchors_trusted_no_reserve: bool, @@ -303,21 +324,22 @@ pub(crate) fn setup_two_nodes( anchor_channels, anchors_trusted_no_reserve, TestStoreType::TestSyncStore, + TestStoreType::TestSyncStore, ) } pub(crate) fn setup_two_nodes_with_store( chain_source: &TestChainSource, allow_0conf: bool, anchor_channels: bool, - anchors_trusted_no_reserve: bool, store_type: TestStoreType, + anchors_trusted_no_reserve: bool, store_type_a: TestStoreType, store_type_b: TestStoreType, ) -> (TestNode, TestNode) { println!("== Node A =="); let mut config_a = random_config(anchor_channels); - config_a.store_type = store_type; + config_a.store_type = store_type_a.clone(); let node_a = setup_node(chain_source, config_a, None); println!("\n== Node B =="); let mut config_b = random_config(anchor_channels); - config_b.store_type = store_type; + config_b.store_type = store_type_b; if allow_0conf { config_b.node_config.trusted_peers_0conf.push(node_a.node_id()); } @@ -418,6 +440,16 @@ pub(crate) fn setup_node_for_async_payments( builder.build_with_store(kv_store).unwrap() }, TestStoreType::Sqlite => builder.build().unwrap(), + TestStoreType::TierStore { primary, backup, ephemeral } => { + if let Some(backup) = backup { + builder.set_tier_store_backup(backup); + } + if let Some(ephemeral) = ephemeral { + builder.set_tier_store_ephemeral(ephemeral); + } + builder.set_tier_store_retry_config(RetryConfig::default()); + builder.build_with_tier_store(primary).unwrap() + }, }; node.start().unwrap(); diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index b02de5388..56bb073b5 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -35,10 +35,19 @@ use ldk_node::{wrap_store, Builder, DynStore, Event, NodeError}; use lightning::ln::channelmanager::PaymentId; use lightning::routing::gossip::{NodeAlias, NodeId}; use lightning::routing::router::RouteParametersConfig; +use lightning::util::persist::{ + KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, +}; use lightning_invoice::{Bolt11InvoiceDescription, Description}; use lightning_types::payment::{PaymentHash, PaymentPreimage}; use log::LevelFilter; +use crate::common::{ + create_tier_stores, random_storage_path, setup_two_nodes_with_store, TestStoreType, +}; + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn channel_full_cycle() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); @@ -48,6 +57,85 @@ async fn channel_full_cycle() { .await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_full_cycle_tier_store() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Esplora(&electrsd); + let (primary_a, backup_a, 
ephemeral_a) = create_tier_stores(random_storage_path()); + let (primary_b, backup_b, ephemeral_b) = create_tier_stores(random_storage_path()); + + let (node_a, node_b) = setup_two_nodes_with_store( + &chain_source, + false, + true, + false, + TestStoreType::TierStore { + primary: Arc::clone(&primary_a), + backup: Some(Arc::clone(&backup_a)), + ephemeral: Some(Arc::clone(&ephemeral_a)), + }, + TestStoreType::TierStore { + primary: Arc::clone(&primary_b), + backup: Some(Arc::clone(&backup_b)), + ephemeral: Some(Arc::clone(&ephemeral_b)), + }, + ); + do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false) + .await; + + // Verify Primary store contains channel manager data + let primary_channel_manager = KVStoreSync::read( + primary_a.as_ref(), + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + assert!(primary_channel_manager.is_ok(), "Primary should have channel manager data"); + + // Verify Primary store contains payment info + let primary_payments = KVStoreSync::list(primary_a.as_ref(), "payments", ""); + assert!(primary_payments.is_ok(), "Primary should have payment data"); + assert!(!primary_payments.unwrap().is_empty(), "Primary should have payment entries"); + + // Verify Backup store synced critical data + let backup_channel_manager = KVStoreSync::read( + backup_a.as_ref(), + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + assert!(backup_channel_manager.is_ok(), "Backup should have synced channel manager"); + + // Verify backup is not empty + let backup_all_keys = KVStoreSync::list(backup_a.as_ref(), "", "").unwrap(); + assert!(!backup_all_keys.is_empty(), "Backup store should not be empty"); + + // Verify Ephemeral does NOT have channel manager + let ephemeral_channel_manager = KVStoreSync::read( + ephemeral_a.as_ref(), + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + assert!(ephemeral_channel_manager.is_err(), "Ephemeral should NOT have channel manager"); + + // Verify Ephemeral does NOT have payment info + let ephemeral_payments = KVStoreSync::list(ephemeral_a.as_ref(), "payments", ""); + assert!( + ephemeral_payments.is_err() || ephemeral_payments.unwrap().is_empty(), + "Ephemeral should NOT have payment data" + ); + + //Verify Ephemeral does have network graph + let ephemeral_network_graph = KVStoreSync::read( + ephemeral_a.as_ref(), + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + assert!(ephemeral_network_graph.is_ok(), "Ephemeral should have network graph"); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn channel_full_cycle_electrum() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();