Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,6 @@ check-cfg = [
"cfg(cln_test)",
"cfg(lnd_test)",
]

[patch.crates-io]
esplora-client = { git = 'https://github.com/acidbunny21/rust-esplora-client.git', branch = 'submit-tx-pkg-clients' }
114 changes: 109 additions & 5 deletions src/chain/bitcoind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,11 +532,8 @@ impl BitcoindChainSource {
}

pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
// While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28
// features, we should eventually switch to use `submitpackage` via the
// `rust-bitcoind-json-rpc` crate rather than just broadcasting individual
// transactions.
for tx in &package {
if package.len() == 1 {
let tx = &package[0];
let txid = tx.compute_txid();
let timeout_fut = tokio::time::timeout(
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
Expand Down Expand Up @@ -571,6 +568,48 @@ impl BitcoindChainSource {
);
},
}
} else if package.len() > 1 {
let txids: Vec<_> = package.iter().map(|tx| tx.compute_txid()).collect();
let timeout_fut = tokio::time::timeout(
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
self.api_client.submit_package(&package),
);
match timeout_fut.await {
Ok(res) => match res {
Ok((package_msg, mut ids)) => {
// TODO: seems we don't get the txids back in the same order...
ids.sort_unstable();
let mut sorted_txids = txids.clone();
sorted_txids.sort_unstable();
debug_assert_eq!(ids, sorted_txids);
log_trace!(
self.logger,
"Package broadcast message {}, txids: {:?}",
package_msg,
txids,
);
},
Err(e) => {
log_error!(self.logger, "Failed to broadcast package {:?}: {}", txids, e);
log_trace!(self.logger, "Failed broadcast package bytes:");
for tx in package {
log_trace!(self.logger, "{}", log_bytes!(tx.encode()));
}
},
},
Err(e) => {
log_error!(
self.logger,
"Failed to broadcast package due to timeout {:?}: {}",
txids,
e
);
log_trace!(self.logger, "Failed broadcast package bytes:");
for tx in package {
log_trace!(self.logger, "{}", log_bytes!(tx.encode()));
}
},
}
}
}
}
Expand Down Expand Up @@ -667,6 +706,34 @@ impl BitcoindClient {
rpc_client.call_method::<Txid>("sendrawtransaction", &[tx_json]).await
}

/// Submits the provided package
pub(crate) async fn submit_package(
&self, package: &[Transaction],
) -> std::io::Result<(String, Vec<Txid>)> {
match self {
BitcoindClient::Rpc { rpc_client, .. } => {
Self::submit_package_inner(Arc::clone(rpc_client), package).await
},
BitcoindClient::Rest { rpc_client, .. } => {
// Bitcoin Core's REST interface does not support submitting packages
// so we use the RPC client.
Self::submit_package_inner(Arc::clone(rpc_client), package).await
},
}
}

async fn submit_package_inner(
rpc_client: Arc<RpcClient>, package: &[Transaction],
) -> std::io::Result<(String, Vec<Txid>)> {
let package_serialized: Vec<_> =
package.iter().map(|tx| bitcoin::consensus::encode::serialize_hex(tx)).collect();
let package_json = serde_json::json!(package_serialized);
rpc_client
.call_method::<SubmitPackageResponse>("submitpackage", &[package_json])
.await
.map(|resp| (resp.package_msg, resp.txids))
}

/// Retrieve the fee estimate needed for a transaction to begin
/// confirmation within the provided `num_blocks`.
pub(crate) async fn get_fee_estimate_for_target(
Expand Down Expand Up @@ -1302,6 +1369,43 @@ impl TryInto<GetMempoolEntryResponse> for JsonResponse {
}
}

pub struct SubmitPackageResponse {
package_msg: String,
txids: Vec<Txid>,
}

impl TryInto<SubmitPackageResponse> for JsonResponse {
type Error = std::io::Error;
fn try_into(self) -> std::io::Result<SubmitPackageResponse> {
let package_msg = self.0["package_msg"]
.as_str()
.ok_or(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to parse submitpackage response",
))?
.to_string();
let tx_results = self.0["tx-results"].as_object().ok_or(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to parse submitpackage response",
))?;
let mut txids = Vec::with_capacity(tx_results.len());
for tx_result in tx_results.values() {
let txid_string = tx_result["txid"].as_str().ok_or(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to parse submitpackage response",
))?;
let txid: Txid = txid_string.parse().map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to parse submitpackage response",
)
})?;
txids.push(txid);
}
Ok(SubmitPackageResponse { package_msg, txids })
}
}

#[derive(Debug, Clone)]
pub(crate) struct MempoolEntry {
/// The transaction id
Expand Down
78 changes: 76 additions & 2 deletions src/chain/esplora.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,15 +365,17 @@ impl EsploraChainSource {
}

pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
for tx in &package {
if package.len() == 1 {
let tx = &package[0];
let txid = tx.compute_txid();
let timeout_fut = tokio::time::timeout(
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
self.esplora_client.broadcast(tx),
);
match timeout_fut.await {
Ok(res) => match res {
Ok(()) => {
Ok(id) => {
debug_assert_eq!(id, txid);
log_trace!(self.logger, "Successfully broadcast transaction {}", txid);
},
Err(e) => match e {
Expand Down Expand Up @@ -432,6 +434,78 @@ impl EsploraChainSource {
);
},
}
} else if package.len() > 1 {
let txids: Vec<_> = package.iter().map(|tx| tx.compute_txid()).collect();
let timeout_fut = tokio::time::timeout(
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
self.esplora_client.submit_package(&package, None, None),
);
match timeout_fut.await {
Ok(res) => match res {
Ok(result) => {
// TODO: sometimes, we get 0 ids back...
let _ids: Vec<_> =
result.tx_results.values().map(|value| value.txid).collect();
log_trace!(
self.logger,
"Package broadcast message {}, txids: {:?}",
result.package_msg,
txids,
);
},
Err(e) => match e {
esplora_client::Error::HttpResponse { status, message } => {
if status == 400 {
// Log 400 at lesser level, as this often just means bitcoind already knows the
// transaction.
// FIXME: We can further differentiate here based on the error
// message which will be available with rust-esplora-client 0.7 and
// later.
log_trace!(
self.logger,
"Failed to broadcast due to HTTP connection error: {}",
message
);
} else {
log_error!(
self.logger,
"Failed to broadcast due to HTTP connection error: {} - {}",
status,
message
);
}
log_trace!(self.logger, "Failed broadcast package bytes:");
for tx in package {
log_trace!(self.logger, "{}", log_bytes!(tx.encode()));
}
},
_ => {
log_error!(
self.logger,
"Failed to broadcast package {:?}: {}",
txids,
e
);
log_trace!(self.logger, "Failed broadcast package bytes:");
for tx in package {
log_trace!(self.logger, "{}", log_bytes!(tx.encode()));
}
},
},
},
Err(e) => {
log_error!(
self.logger,
"Failed to broadcast package due to timeout {:?}: {}",
txids,
e
);
log_trace!(self.logger, "Failed broadcast transaction bytes:");
for tx in package {
log_trace!(self.logger, "{}", log_bytes!(tx.encode()));
}
},
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ pub(crate) fn default_user_config(config: &Config) -> UserConfig {
user_config.manually_accept_inbound_channels = true;
user_config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx =
config.anchor_channels_config.is_some();
user_config.channel_handshake_config.negotiate_anchor_zero_fee_commitments =
config.anchor_channels_config.is_some();

if may_announce_channel(config).is_err() {
user_config.accept_forwards_to_priv_channels = false;
Expand Down
3 changes: 2 additions & 1 deletion src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1088,7 +1088,8 @@ where
}
}

let anchor_channel = channel_type.requires_anchors_zero_fee_htlc_tx();
let anchor_channel = channel_type.requires_anchors_zero_fee_htlc_tx()
|| channel_type.requires_anchor_zero_fee_commitments();
if anchor_channel {
if let Some(anchor_channels_config) =
self.config.anchor_channels_config.as_ref()
Expand Down
13 changes: 7 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1082,11 +1082,11 @@ impl Node {
.peer_by_node_id(&node_id)
.ok_or(Error::ConnectionFailed)?
.init_features;
let anchor_channel = init_features.requires_anchors_zero_fee_htlc_tx()
|| init_features.requires_anchor_zero_fee_commitments();
let required_funds_sats = channel_amount_sats
+ self.config.anchor_channels_config.as_ref().map_or(0, |c| {
if init_features.requires_anchors_zero_fee_htlc_tx()
&& !c.trusted_peers_no_reserve.contains(&node_id)
{
if anchor_channel && !c.trusted_peers_no_reserve.contains(&node_id) {
c.per_channel_reserve_sats
} else {
0
Expand Down Expand Up @@ -1611,9 +1611,10 @@ pub(crate) fn total_anchor_channels_reserve_sats(
!anchor_channels_config.trusted_peers_no_reserve.contains(&c.counterparty.node_id)
&& c.channel_shutdown_state
.map_or(true, |s| s != ChannelShutdownState::ShutdownComplete)
&& c.channel_type
.as_ref()
.map_or(false, |t| t.requires_anchors_zero_fee_htlc_tx())
&& c.channel_type.as_ref().map_or(false, |t| {
t.requires_anchors_zero_fee_htlc_tx()
|| t.requires_anchor_zero_fee_commitments()
})
})
.count() as u64
* anchor_channels_config.per_channel_reserve_sats
Expand Down
4 changes: 3 additions & 1 deletion src/liquidity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,9 +667,11 @@ where
total_anchor_channels_reserve_sats(&self.channel_manager, &self.config);
let spendable_amount_sats =
self.wallet.get_spendable_amount_sats(cur_anchor_reserve_sats).unwrap_or(0);
let anchor_channel = init_features.requires_anchors_zero_fee_htlc_tx()
|| init_features.requires_anchor_zero_fee_commitments();
let required_funds_sats = channel_amount_sats
+ self.config.anchor_channels_config.as_ref().map_or(0, |c| {
if init_features.requires_anchors_zero_fee_htlc_tx()
if anchor_channel
&& !c.trusted_peers_no_reserve.contains(&their_network_key)
{
c.per_channel_reserve_sats
Expand Down
Loading