From 6420f06ef56ba1fe91d9685f1e84e494f2465f86 Mon Sep 17 00:00:00 2001 From: moisesPomilio <93723302+moisesPompilio@users.noreply.github.com> Date: Fri, 31 Oct 2025 20:59:54 -0300 Subject: [PATCH 1/2] Add ProbingService with pluggable ProbingStrategy - Add optional ProbingService to run periodic probe payments for liquidity checks. - Configure probing behavior by providing a public `ProbingStrategy` trait implementation. - Targets and selection logic are fully customizable by the user via the trait (no fixed target count is passed by the node). --- src/builder.rs | 104 +++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 30 ++++++++++++++ src/probing.rs | 89 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 223 insertions(+) create mode 100644 src/probing.rs diff --git a/src/builder.rs b/src/builder.rs index c0e39af7a..fc0b810e1 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -66,6 +66,7 @@ use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger}; use crate::message_handler::NodeCustomMessageHandler; use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox; use crate::peer_store::PeerStore; +use crate::probing::{ProbingService, ProbingStrategy}; use crate::runtime::Runtime; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ @@ -129,6 +130,40 @@ struct LiquiditySourceConfig { lsps2_service: Option, } +#[derive(Clone, Debug)] +struct ProbingServiceConfig { + /// Time in seconds between consecutive probing attempts. + probing_interval_secs: u64, + + /// Amount in milli-satoshis used for each probe. + probing_amount_msat: u64, + + /// Configuration for the probing strategy as a shareable trait-object. + strategy: ProbingStrategyConfig, +} + +pub enum ProbingStrategyConfig { + Custom { strategy: Arc }, +} + +impl fmt::Debug for ProbingStrategyConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Custom { .. } => { + f.debug_struct("Custom").field("strategy", &"").finish() + }, + } + } +} + +impl Clone for ProbingStrategyConfig { + fn clone(&self) -> Self { + match self { + Self::Custom { strategy } => Self::Custom { strategy: Arc::clone(strategy) }, + } + } +} + #[derive(Clone)] enum LogWriterConfig { File { log_file_path: Option, max_log_level: Option }, @@ -253,6 +288,7 @@ pub struct NodeBuilder { async_payments_role: Option, runtime_handle: Option, pathfinding_scores_sync_config: Option, + probing_service_config: Option, } impl NodeBuilder { @@ -271,6 +307,7 @@ impl NodeBuilder { let log_writer_config = None; let runtime_handle = None; let pathfinding_scores_sync_config = None; + let probing_service_config = None; Self { config, entropy_source_config, @@ -281,6 +318,7 @@ impl NodeBuilder { runtime_handle, async_payments_role: None, pathfinding_scores_sync_config, + probing_service_config, } } @@ -488,6 +526,28 @@ impl NodeBuilder { self } + /// Configures the probing service with a custom target selection strategy. + /// + /// This allows full control over how probing targets are selected by providing + /// a custom implementation of the [`ProbingStrategy`] trait. + /// + /// # Parameters + /// * `probing_interval_secs` - Seconds between probing cycles + /// * `probing_amount_msat` - Amount in milli-satoshis per probe + /// * `strategy` - Custom [`ProbingStrategy`] implementation + /// + /// [`ProbingStrategy`]: crate::probing::ProbingStrategy + pub fn set_probing_service_with_custom_strategy( + &mut self, probing_interval_secs: u64, probing_amount_msat: u64, strategy: T, + ) -> &mut Self { + self.probing_service_config = Some(ProbingServiceConfig { + probing_interval_secs, + probing_amount_msat, + strategy: ProbingStrategyConfig::Custom { strategy: Arc::new(strategy) }, + }); + self + } + /// Sets the used storage directory path. pub fn set_storage_dir_path(&mut self, storage_dir_path: String) -> &mut Self { self.config.storage_dir_path = storage_dir_path; @@ -744,6 +804,7 @@ impl NodeBuilder { runtime, logger, Arc::new(vss_store), + self.probing_service_config.as_ref(), ) } @@ -778,6 +839,7 @@ impl NodeBuilder { runtime, logger, kv_store, + self.probing_service_config.as_ref(), ) } } @@ -977,6 +1039,27 @@ impl ArcedNodeBuilder { self.inner.write().unwrap().set_liquidity_provider_lsps2(service_config); } + /// Configures the probing service with a custom target selection strategy. + /// + /// This allows full control over how probing targets are selected by providing + /// a custom implementation of the [`ProbingStrategy`] trait. + /// + /// # Parameters + /// * `probing_interval_secs` - Seconds between probing cycles + /// * `probing_amount_msat` - Amount in milli-satoshis per probe + /// * `strategy` - Custom [`ProbingStrategy`] implementation + /// + /// [`ProbingStrategy`]: crate::probing::ProbingStrategy + pub fn set_probing_service_with_custom_strategy( + &self, probing_interval_secs: u64, probing_amount_msat: u64, strategy: T, + ) { + self.inner.write().unwrap().set_probing_service_with_custom_strategy( + probing_interval_secs, + probing_amount_msat, + strategy, + ); + } + /// Sets the used storage directory path. pub fn set_storage_dir_path(&self, storage_dir_path: String) { self.inner.write().unwrap().set_storage_dir_path(storage_dir_path); @@ -1142,6 +1225,7 @@ fn build_with_store_internal( pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>, async_payments_role: Option, seed_bytes: [u8; 64], runtime: Arc, logger: Arc, kv_store: Arc, + probing_service_config: Option<&ProbingServiceConfig>, ) -> Result { optionally_install_rustls_cryptoprovider(); @@ -1767,6 +1851,25 @@ fn build_with_store_internal( let pathfinding_scores_sync_url = pathfinding_scores_sync_config.map(|c| c.url.clone()); + let probing_service = if let Some(pro_ser) = probing_service_config { + let strategy: Arc = match &pro_ser.strategy { + ProbingStrategyConfig::Custom { strategy } => Arc::clone(strategy), + }; + Some(Arc::new(ProbingService::new( + pro_ser.probing_interval_secs, + pro_ser.probing_amount_msat, + Arc::clone(&strategy), + Arc::clone(&config), + Arc::clone(&logger), + Arc::clone(&channel_manager), + Arc::clone(&keys_manager), + Arc::clone(&is_running), + Arc::clone(&payment_store), + ))) + } else { + None + }; + Ok(Node { runtime, stop_sender, @@ -1797,6 +1900,7 @@ fn build_with_store_internal( node_metrics, om_mailbox, async_payments_role, + probing_service, }) } diff --git a/src/lib.rs b/src/lib.rs index 6a26c6c5b..eccbd9477 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -96,6 +96,7 @@ pub mod logger; mod message_handler; pub mod payment; mod peer_store; +mod probing; mod runtime; mod scoring; mod tx_broadcaster; @@ -149,6 +150,8 @@ use payment::{ UnifiedQrPayment, }; use peer_store::{PeerInfo, PeerStore}; +use probing::ProbingService; +pub use probing::ProbingStrategy; use rand::Rng; use runtime::Runtime; use types::{ @@ -196,6 +199,7 @@ pub struct Node { scorer: Arc>, peer_store: Arc>>, payment_store: Arc, + probing_service: Option>, is_running: Arc>, node_metrics: Arc>, om_mailbox: Option>, @@ -625,6 +629,32 @@ impl Node { }); } + if let Some(probing_service) = self.probing_service.as_ref() { + let mut stop_probing_service = self.stop_sender.subscribe(); + let probing_service = Arc::clone(probing_service); + let probing_logger = Arc::clone(&self.logger); + + self.runtime.spawn_cancellable_background_task(async move { + let mut interval = tokio::time::interval(Duration::from_secs( + probing_service.probing_interval_secs, + )); + loop { + tokio::select! { + _ = stop_probing_service.changed() => { + log_debug!( + probing_logger, + "Stopping probing service.", + ); + return; + } + _ = interval.tick() => { + probing_service.handle_probing(); + } + } + } + }); + } + log_info!(self.logger, "Startup complete."); *is_running_lock = true; Ok(()) diff --git a/src/probing.rs b/src/probing.rs new file mode 100644 index 000000000..3d02038c6 --- /dev/null +++ b/src/probing.rs @@ -0,0 +1,89 @@ +use bitcoin::secp256k1::PublicKey; +use std::sync::{Arc, RwLock}; + +use crate::{ + config::Config, + logger::{log_debug, log_error, LdkLogger, Logger}, + payment::SpontaneousPayment, + types::{ChannelManager, KeysManager, PaymentStore}, +}; + +/// Trait for probing strategies to select targets for liquidity assessment. +pub trait ProbingStrategy { + /// Loads the targets to be used in the current probing cycle. + /// + /// Called at the start of each probing cycle before sending probes. + fn load_targets(&self); + + /// Returns the next target public key for probing, or None if no more targets are available. + fn next_target(&self) -> Option; +} + +/// Configuration for the probing service used to evaluate channel liquidity by sending pre-flight +/// probes to peers and routes. +pub struct ProbingService { + pub probing_interval_secs: u64, + probing_amount_msat: u64, + strategy: Arc, + config: Arc, + logger: Arc, + channel_manager: Arc, + keys_manager: Arc, + is_running: Arc>, + payment_store: Arc, +} + +impl ProbingService { + /// Creates a new probing service with the given configuration and dependencies. + pub fn new( + probing_interval_secs: u64, probing_amount_msat: u64, + strategy: Arc, config: Arc, logger: Arc, + channel_manager: Arc, keys_manager: Arc, + is_running: Arc>, payment_store: Arc, + ) -> Self { + Self { + probing_interval_secs, + probing_amount_msat, + strategy, + config, + logger, + channel_manager, + keys_manager, + is_running, + payment_store, + } + } + + pub fn handle_probing(&self) { + self.strategy.load_targets(); + loop { + if let Some(target) = self.strategy.next_target() { + let spontaneous_payment = self.spontaneous_payment(); + match spontaneous_payment.send_probes(self.probing_amount_msat, target) { + Ok(_) => { + log_debug!(self.logger, "Probing service sent probe to target: {}", target) + }, + Err(e) => log_error!( + self.logger, + "Probing service failed to send probe to target {}: {}", + target, + e + ), + } + } else { + break; + } + } + } + + fn spontaneous_payment(&self) -> SpontaneousPayment { + SpontaneousPayment::new( + Arc::clone(&self.channel_manager), + Arc::clone(&self.keys_manager), + Arc::clone(&self.payment_store), + Arc::clone(&self.config), + Arc::clone(&self.is_running), + Arc::clone(&self.logger), + ) + } +} From fd8ab952294c3db0d7af8f91c220ca9024005aa3 Mon Sep 17 00:00:00 2001 From: moisesPomilio <93723302+moisesPompilio@users.noreply.github.com> Date: Fri, 31 Oct 2025 22:08:15 -0300 Subject: [PATCH 2/2] Add `HighCapacityStrategy` implementing `ProbingStrategy` - Add `HighCapacityStrategy` that selects top-N nodes by aggregate capacity from `NetworkGraph`. - Add lightweight caching with configurable reuse limit to avoid frequent graph scans. - Wireable from builder via `set_probing_service_with_high_capacity_strategy` --- src/builder.rs | 77 ++++++++++++++- src/graph.rs | 35 +++++++ src/probing.rs | 69 ++++++++++++- tests/integration_tests_rust.rs | 170 ++++++++++++++++++++++++++++++++ 4 files changed, 348 insertions(+), 3 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index fc0b810e1..2eaa244ce 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -66,7 +66,7 @@ use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger}; use crate::message_handler::NodeCustomMessageHandler; use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox; use crate::peer_store::PeerStore; -use crate::probing::{ProbingService, ProbingStrategy}; +use crate::probing::{HighCapacityStrategy, ProbingService, ProbingStrategy}; use crate::runtime::Runtime; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ @@ -144,6 +144,7 @@ struct ProbingServiceConfig { pub enum ProbingStrategyConfig { Custom { strategy: Arc }, + HighCapacity { max_targets_per_cycle: usize, target_cache_reuse_limit: usize }, } impl fmt::Debug for ProbingStrategyConfig { @@ -152,6 +153,14 @@ impl fmt::Debug for ProbingStrategyConfig { Self::Custom { .. } => { f.debug_struct("Custom").field("strategy", &"").finish() }, + Self::HighCapacity { + max_targets_per_cycle: max_targets, + target_cache_reuse_limit: max_reloads, + } => f + .debug_struct("HighCapacity") + .field("max_targets", max_targets) + .field("max_reloads", max_reloads) + .finish(), } } } @@ -160,6 +169,13 @@ impl Clone for ProbingStrategyConfig { fn clone(&self) -> Self { match self { Self::Custom { strategy } => Self::Custom { strategy: Arc::clone(strategy) }, + Self::HighCapacity { + max_targets_per_cycle: max_targets, + target_cache_reuse_limit: max_reloads, + } => Self::HighCapacity { + max_targets_per_cycle: *max_targets, + target_cache_reuse_limit: *max_reloads, + }, } } } @@ -548,6 +564,33 @@ impl NodeBuilder { self } + /// Configures the probing service with the built-in high-capacity strategy. + /// + /// Targets peers with the highest total channel capacity to assess liquidity + /// on the most significant network routes. + /// + /// # Parameters + /// * `probing_interval_secs` - Seconds between probing cycles + /// * `probing_amount_msat` - Amount in milli-satoshis per probe + /// * `max_targets_per_cycle` - Maximum peers to probe each cycle + /// * `target_cache_reuse_limit` - Number of cycles to reuse targets before refreshing. + /// Acts as a cache: targets are reloaded from the network graph only after this many cycles, + /// reducing overhead while adapting to network changes. + pub fn set_probing_service_with_high_capacity_strategy( + &mut self, probing_interval_secs: u64, probing_amount_msat: u64, + max_targets_per_cycle: usize, target_cache_reuse_limit: usize, + ) -> &mut Self { + self.probing_service_config = Some(ProbingServiceConfig { + probing_interval_secs, + probing_amount_msat, + strategy: ProbingStrategyConfig::HighCapacity { + max_targets_per_cycle, + target_cache_reuse_limit, + }, + }); + self + } + /// Sets the used storage directory path. pub fn set_storage_dir_path(&mut self, storage_dir_path: String) -> &mut Self { self.config.storage_dir_path = storage_dir_path; @@ -1060,6 +1103,30 @@ impl ArcedNodeBuilder { ); } + /// Configures the probing service with the built-in high-capacity strategy. + /// + /// Targets peers with the highest total channel capacity to assess liquidity + /// on the most significant network routes. + /// + /// # Parameters + /// * `probing_interval_secs` - Seconds between probing cycles + /// * `probing_amount_msat` - Amount in milli-satoshis per probe + /// * `max_targets_per_cycle` - Maximum peers to probe each cycle + /// * `target_cache_reuse_limit` - Number of cycles to reuse targets before refreshing. + /// Acts as a cache: targets are reloaded from the network graph only after this many cycles, + /// reducing overhead while adapting to network changes. + pub fn set_probing_service_with_high_capacity_strategy( + &self, probing_interval_secs: u64, probing_amount_msat: u64, max_targets_per_cycle: usize, + target_cache_reuse_limit: usize, + ) { + self.inner.write().unwrap().set_probing_service_with_high_capacity_strategy( + probing_interval_secs, + probing_amount_msat, + max_targets_per_cycle, + target_cache_reuse_limit, + ); + } + /// Sets the used storage directory path. pub fn set_storage_dir_path(&self, storage_dir_path: String) { self.inner.write().unwrap().set_storage_dir_path(storage_dir_path); @@ -1854,6 +1921,14 @@ fn build_with_store_internal( let probing_service = if let Some(pro_ser) = probing_service_config { let strategy: Arc = match &pro_ser.strategy { ProbingStrategyConfig::Custom { strategy } => Arc::clone(strategy), + ProbingStrategyConfig::HighCapacity { + max_targets_per_cycle: max_targets, + target_cache_reuse_limit: max_reloads, + } => Arc::new(HighCapacityStrategy::new( + Arc::clone(&network_graph), + *max_targets, + *max_reloads, + )), }; Some(Arc::new(ProbingService::new( pro_ser.probing_interval_secs, diff --git a/src/graph.rs b/src/graph.rs index f2daebb9f..6d3201339 100644 --- a/src/graph.rs +++ b/src/graph.rs @@ -17,6 +17,9 @@ use lightning::routing::gossip::RoutingFees; #[cfg(not(feature = "uniffi"))] use lightning::routing::gossip::{ChannelInfo, NodeInfo}; +use std::cmp::Reverse; +use std::collections::{BinaryHeap, HashMap}; + use crate::types::Graph; /// Represents the network as nodes and channels between them. @@ -48,6 +51,38 @@ impl NetworkGraph { pub fn node(&self, node_id: &NodeId) -> Option { self.inner.read_only().nodes().get(node_id).cloned().map(|n| n.into()) } + + /// Selects nodes with the highest total channel capacity in the network. + pub fn select_highest_capacity_nodes(&self, quantity_nodes: usize) -> Vec { + // Calculate total capacity for each node by summing all their channel capacities + let node_capacities = self.inner.read_only().channels().unordered_iter().fold( + HashMap::new(), + |mut acc, (_, chan_info)| { + let cap = chan_info.capacity_sats.unwrap_or(0); + *acc.entry(chan_info.node_one).or_insert(0) += cap; + *acc.entry(chan_info.node_two).or_insert(0) += cap; + acc + }, + ); + + // Use a min-heap to efficiently track the top N nodes by capacity + node_capacities + .into_iter() + .fold(BinaryHeap::with_capacity(quantity_nodes), |mut top_heap, (node_id, cap)| { + if top_heap.len() < quantity_nodes { + top_heap.push(Reverse((cap, node_id))); + } else if let Some(Reverse((min_cap, _))) = top_heap.peek() { + if cap > *min_cap { + top_heap.pop(); + top_heap.push(Reverse((cap, node_id))); + } + } + top_heap + }) + .into_iter() + .map(|Reverse((_, node_id))| node_id) + .collect() + } } /// Details about a channel (both directions). diff --git a/src/probing.rs b/src/probing.rs index 3d02038c6..89601b036 100644 --- a/src/probing.rs +++ b/src/probing.rs @@ -1,11 +1,12 @@ use bitcoin::secp256k1::PublicKey; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; use crate::{ config::Config, + graph::NetworkGraph, logger::{log_debug, log_error, LdkLogger, Logger}, payment::SpontaneousPayment, - types::{ChannelManager, KeysManager, PaymentStore}, + types::{ChannelManager, Graph, KeysManager, PaymentStore}, }; /// Trait for probing strategies to select targets for liquidity assessment. @@ -19,6 +20,70 @@ pub trait ProbingStrategy { fn next_target(&self) -> Option; } +/// Simple strategy that selects targets based on highest channel capacity. +pub struct HighCapacityStrategy { + network_graph: Arc, + max_targets_per_cycle: usize, + target_cache_reuse_limit: usize, + targets: Mutex>, + target_index: Mutex, + uses_since_load: Mutex, +} + +impl HighCapacityStrategy { + pub fn new(network_graph: Arc, max_targets: usize, max_reloads: usize) -> Self { + Self { + network_graph, + max_targets_per_cycle: max_targets, + target_cache_reuse_limit: max_reloads, + targets: Mutex::new(Vec::new()), + target_index: Mutex::new(0), + uses_since_load: Mutex::new(0), + } + } + + fn network_graph(&self) -> NetworkGraph { + NetworkGraph::new(Arc::clone(&self.network_graph)) + } +} + +impl ProbingStrategy for HighCapacityStrategy { + fn load_targets(&self) { + let mut targets = self.targets.lock().unwrap(); + let mut uses_since_load = self.uses_since_load.lock().unwrap(); + + if !targets.is_empty() && *uses_since_load < self.target_cache_reuse_limit { + *uses_since_load += 1; + return; + } + + let network = self.network_graph(); + let highest_capacity_nodes = + network.select_highest_capacity_nodes(self.max_targets_per_cycle); + *targets = + highest_capacity_nodes.iter().filter_map(|node_id| node_id.as_pubkey().ok()).collect(); + + let mut target_index = self.target_index.lock().unwrap(); + *target_index = 0; + *uses_since_load = 0; + } + + fn next_target(&self) -> Option { + let mut target_index = self.target_index.lock().unwrap(); + let targets = self.targets.lock().unwrap(); + + if *target_index < targets.len() { + let pk = targets[*target_index]; + *target_index += 1; + Some(pk) + } else { + // reset index for next cycle + *target_index = 0; + None + } + } +} + /// Configuration for the probing service used to evaluate channel liquidity by sending pre-flight /// probes to peers and routes. pub struct ProbingService { diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index e2d4207cd..fbd49dfb7 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -245,6 +245,176 @@ async fn multi_hop_sending() { expect_payment_successful_event!(nodes[0], payment_id, Some(fee_paid_msat)); } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_probing_high_capacity_strategy() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); + let probing_interval_secs = 2; + let probing_amount_msat = 1000; + let probing_max_targets_per_cycle = 7; + let probing_target_cache_reuse_limit = 1; + + // Setup and fund 5 nodes + let mut nodes = Vec::new(); + for index in 0..9 { + let config = random_config(true); + let sync_config = EsploraSyncConfig { background_sync_config: None }; + setup_builder!(builder, config.node_config); + builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); + + if index == 0 { + builder.set_probing_service_with_high_capacity_strategy( + probing_interval_secs, + probing_amount_msat, + probing_max_targets_per_cycle, + probing_target_cache_reuse_limit, + ); + } + + let node = builder.build().unwrap(); + node.start().unwrap(); + nodes.push(node); + } + + let addresses = nodes.iter().map(|n| n.onchain_payment().new_address().unwrap()).collect(); + let premine_amount_sat = 5_000_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + addresses, + Amount::from_sat(premine_amount_sat), + ) + .await; + + for n in &nodes { + n.sync_wallets().unwrap(); + assert_eq!(n.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + assert_eq!(n.next_event(), None); + } + + // Setup channel topology: + // (1M:0)- N2 -(1M:0) - N3 -(1M:0) - N4 -(1M:0) + // / \ + // N0 -(100k:0)-> N1 N5 + // \ / + // (1M:0)- N8 -(1M:0) - N7 -(1M:0) - N6 -(1M:0) + + open_channel(&nodes[0], &nodes[1], 100_000, true, &electrsd).await; + open_channel(&nodes[1], &nodes[2], 1_000_009, true, &electrsd).await; + // We need to sync wallets in-between back-to-back channel opens from the same node so BDK + // wallet picks up on the broadcast funding tx and doesn't double-spend itself. + // + // TODO: Remove once fixed in BDK. + nodes[1].sync_wallets().unwrap(); + open_channel(&nodes[2], &nodes[3], 1_000_000, true, &electrsd).await; + open_channel(&nodes[3], &nodes[4], 1_000_000, true, &electrsd).await; + open_channel(&nodes[4], &nodes[5], 1_000_000, true, &electrsd).await; + + open_channel(&nodes[6], &nodes[5], 1_000_000, true, &electrsd).await; + open_channel(&nodes[7], &nodes[6], 1_000_000, true, &electrsd).await; + open_channel(&nodes[8], &nodes[7], 1_000_000, true, &electrsd).await; + open_channel(&nodes[1], &nodes[8], 1_000_000, true, &electrsd).await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + for n in &nodes { + n.sync_wallets().unwrap(); + } + + expect_event!(nodes[0], ChannelReady); + expect_event!(nodes[1], ChannelReady); + expect_event!(nodes[1], ChannelReady); + expect_event!(nodes[1], ChannelReady); + expect_event!(nodes[2], ChannelReady); + expect_event!(nodes[2], ChannelReady); + expect_event!(nodes[3], ChannelReady); + expect_event!(nodes[3], ChannelReady); + expect_event!(nodes[4], ChannelReady); + expect_event!(nodes[4], ChannelReady); + expect_event!(nodes[5], ChannelReady); + expect_event!(nodes[5], ChannelReady); + expect_event!(nodes[6], ChannelReady); + expect_event!(nodes[6], ChannelReady); + expect_event!(nodes[7], ChannelReady); + expect_event!(nodes[7], ChannelReady); + expect_event!(nodes[8], ChannelReady); + expect_event!(nodes[8], ChannelReady); + + // Sleep a bit for gossip to propagate. + std::thread::sleep(std::time::Duration::from_secs(1)); + + let invoice = nodes[5] + .bolt11_payment() + .receive( + 1_000, + &Bolt11InvoiceDescription::Direct( + Description::new(String::from("probe test")).unwrap(), + ) + .into(), + 9217, + ) + .unwrap(); + + let route_params = RouteParametersConfig { + max_total_routing_fee_msat: Some(75_000), + max_total_cltv_expiry_delta: 1000, + max_path_count: 1, + max_channel_saturation_power_of_half: 0, + }; + nodes[0].bolt11_payment().send(&invoice, Some(route_params)).unwrap(); + expect_event!(nodes[1], PaymentForwarded); + + // We expect that the payment goes through N2 or N3, so we check both for the PaymentForwarded event. + let node_3_fwd_event = matches!(nodes[3].next_event(), Some(Event::PaymentForwarded { .. })); + let node_7_fwd_event = matches!(nodes[7].next_event(), Some(Event::PaymentForwarded { .. })); + assert_ne!(node_3_fwd_event, node_7_fwd_event); + let id_node_route = if node_3_fwd_event { 3 } else { 7 }; + println!("Probing used node {}", id_node_route); + + let invoice = nodes[5] + .bolt11_payment() + .receive( + 1_000, + &Bolt11InvoiceDescription::Direct( + Description::new(String::from("probe test")).unwrap(), + ) + .into(), + 9217, + ) + .unwrap(); + nodes[0].bolt11_payment().send(&invoice, Some(route_params)).unwrap(); + expect_event!(nodes[1], PaymentForwarded); + + let node_3_fwd_event = matches!(nodes[3].next_event(), Some(Event::PaymentForwarded { .. })); + let node_7_fwd_event = matches!(nodes[7].next_event(), Some(Event::PaymentForwarded { .. })); + assert_ne!(node_3_fwd_event, node_7_fwd_event); + let check_id_node_route = if node_3_fwd_event { 3 } else { 7 }; + assert_eq!(id_node_route, check_id_node_route); + + nodes[check_id_node_route].stop().unwrap(); + // Sleep to allow probing to occur with one less node. + std::thread::sleep(std::time::Duration::from_secs(5)); + nodes[check_id_node_route].start().unwrap(); + + let invoice = nodes[5] + .bolt11_payment() + .receive( + 1_000, + &Bolt11InvoiceDescription::Direct( + Description::new(String::from("probe test")).unwrap(), + ) + .into(), + 9217, + ) + .unwrap(); + nodes[0].bolt11_payment().send(&invoice, Some(route_params)).unwrap(); + expect_event!(nodes[1], PaymentForwarded); + + let node_3_fwd_event = matches!(nodes[3].next_event(), Some(Event::PaymentForwarded { .. })); + let node_7_fwd_event = matches!(nodes[7].next_event(), Some(Event::PaymentForwarded { .. })); + assert!(node_3_fwd_event || node_7_fwd_event); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn start_stop_reinit() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();