X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Fbitcoind_client.rs;h=73ee635dec917c2d861994832ccb582e60215f01;hb=2bd0d81d407506d05901c3ac204fcbe3f8da34e9;hp=71e6272fbce7969ad6f3583738333b4356659c39;hpb=25a4f605443ed2bc2bb9548f5679a86fc76d0333;p=ldk-sample diff --git a/src/bitcoind_client.rs b/src/bitcoind_client.rs index 71e6272..73ee635 100644 --- a/src/bitcoind_client.rs +++ b/src/bitcoind_client.rs @@ -1,149 +1,273 @@ +use crate::convert::{BlockchainInfo, FeeResponse, FundedTx, NewAddress, RawTx, SignedTx}; use base64; +use bitcoin::blockdata::block::Block; use bitcoin::blockdata::transaction::Transaction; use bitcoin::consensus::encode; +use bitcoin::hash_types::{BlockHash, Txid}; use bitcoin::util::address::Address; -use crate::convert::{BlockchainInfo, FeeResponse, FundedTx, NewAddress, RawTx, SignedTx}; use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; use lightning_block_sync::http::HttpEndpoint; use lightning_block_sync::rpc::RpcClient; +use lightning_block_sync::{AsyncBlockSourceResult, BlockHeaderData, BlockSource}; use serde_json; use std::collections::HashMap; use std::str::FromStr; -use std::sync::Mutex; -use tokio::runtime::{Handle, Runtime}; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::Mutex; pub struct BitcoindClient { - bitcoind_rpc_client: Mutex, - host: String, - port: u16, - rpc_user: String, - rpc_password: String, - runtime: Mutex, + bitcoind_rpc_client: Arc>, + host: String, + port: u16, + rpc_user: String, + rpc_password: String, + fees: Arc>, +} + +#[derive(Clone, Eq, Hash, PartialEq)] +pub enum Target { + Background, + Normal, + HighPriority, } +impl BlockSource for &BitcoindClient { + fn get_header<'a>( + &'a mut self, header_hash: &'a BlockHash, height_hint: Option, + ) -> AsyncBlockSourceResult<'a, BlockHeaderData> { + Box::pin(async move { + let mut rpc = self.bitcoind_rpc_client.lock().await; + rpc.get_header(header_hash, height_hint).await + }) + } + + fn get_block<'a>( + &'a mut self, header_hash: &'a BlockHash, + ) -> AsyncBlockSourceResult<'a, Block> { + Box::pin(async move { + let mut rpc = self.bitcoind_rpc_client.lock().await; + rpc.get_block(header_hash).await + }) + } + + fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<(BlockHash, Option)> { + Box::pin(async move { + let mut rpc = self.bitcoind_rpc_client.lock().await; + rpc.get_best_block().await + }) + } +} + +/// The minimum feerate we are allowed to send, as specify by LDK. +const MIN_FEERATE: u32 = 253; + impl BitcoindClient { - pub fn new(host: String, port: u16, rpc_user: String, rpc_password: String) -> - std::io::Result - { - let http_endpoint = HttpEndpoint::for_host(host.clone()).with_port(port); - let rpc_credentials = base64::encode(format!("{}:{}", rpc_user.clone(), - rpc_password.clone())); - let bitcoind_rpc_client = RpcClient::new(&rpc_credentials, http_endpoint)?; - let client = Self { - bitcoind_rpc_client: Mutex::new(bitcoind_rpc_client), - host, - port, - rpc_user, - rpc_password, - runtime: Mutex::new(Runtime::new().unwrap()), - }; - Ok(client) - } - - pub fn get_new_rpc_client(&self) -> std::io::Result { - let http_endpoint = HttpEndpoint::for_host(self.host.clone()).with_port(self.port); - let rpc_credentials = base64::encode(format!("{}:{}", - self.rpc_user.clone(), - self.rpc_password.clone())); - RpcClient::new(&rpc_credentials, http_endpoint) - } - - pub fn create_raw_transaction(&self, outputs: Vec>) -> RawTx { - let runtime = self.runtime.lock().unwrap(); - let mut rpc = self.bitcoind_rpc_client.lock().unwrap(); - - let outputs_json = serde_json::json!(outputs); - runtime.block_on(rpc.call_method::("createrawtransaction", &vec![serde_json::json!([]), outputs_json])).unwrap() - } - - pub fn fund_raw_transaction(&self, raw_tx: RawTx) -> FundedTx { - let runtime = self.runtime.lock().unwrap(); - let mut rpc = self.bitcoind_rpc_client.lock().unwrap(); - - let raw_tx_json = serde_json::json!(raw_tx.0); - runtime.block_on(rpc.call_method("fundrawtransaction", &[raw_tx_json])).unwrap() - } - - pub fn sign_raw_transaction_with_wallet(&self, tx_hex: String) -> SignedTx { - let runtime = self.runtime.lock().unwrap(); - let mut rpc = self.bitcoind_rpc_client.lock().unwrap(); - - let tx_hex_json = serde_json::json!(tx_hex); - runtime.block_on(rpc.call_method("signrawtransactionwithwallet", - &vec![tx_hex_json])).unwrap() - } - - pub fn get_new_address(&self) -> Address { - let runtime = self.runtime.lock().unwrap(); - let mut rpc = self.bitcoind_rpc_client.lock().unwrap(); - - let addr_args = vec![serde_json::json!("LDK output address")]; - let addr = runtime.block_on(rpc.call_method::("getnewaddress", &addr_args)).unwrap(); - Address::from_str(addr.0.as_str()).unwrap() - } - - pub fn get_blockchain_info(&self) -> BlockchainInfo { - let runtime = self.runtime.lock().unwrap(); - let mut rpc = self.bitcoind_rpc_client.lock().unwrap(); - - runtime.block_on(rpc.call_method::("getblockchaininfo", - &vec![])).unwrap() - } + pub async fn new( + host: String, port: u16, rpc_user: String, rpc_password: String, + ) -> std::io::Result { + let http_endpoint = HttpEndpoint::for_host(host.clone()).with_port(port); + let rpc_credentials = + base64::encode(format!("{}:{}", rpc_user.clone(), rpc_password.clone())); + let mut bitcoind_rpc_client = RpcClient::new(&rpc_credentials, http_endpoint)?; + let _dummy = bitcoind_rpc_client + .call_method::("getblockchaininfo", &vec![]) + .await + .map_err(|_| { + std::io::Error::new(std::io::ErrorKind::PermissionDenied, + "Failed to make initial call to bitcoind - please check your RPC user/password and access settings") + })?; + let mut fees: HashMap = HashMap::new(); + fees.insert(Target::Background, AtomicU32::new(MIN_FEERATE)); + fees.insert(Target::Normal, AtomicU32::new(2000)); + fees.insert(Target::HighPriority, AtomicU32::new(5000)); + let client = Self { + bitcoind_rpc_client: Arc::new(Mutex::new(bitcoind_rpc_client)), + host, + port, + rpc_user, + rpc_password, + fees: Arc::new(fees), + }; + BitcoindClient::poll_for_fee_estimates( + client.fees.clone(), + client.bitcoind_rpc_client.clone(), + ) + .await; + Ok(client) + } + + async fn poll_for_fee_estimates( + fees: Arc>, rpc_client: Arc>, + ) { + tokio::spawn(async move { + loop { + let background_estimate = { + let mut rpc = rpc_client.lock().await; + let background_conf_target = serde_json::json!(144); + let background_estimate_mode = serde_json::json!("ECONOMICAL"); + let resp = rpc + .call_method::( + "estimatesmartfee", + &vec![background_conf_target, background_estimate_mode], + ) + .await + .unwrap(); + match resp.feerate_sat_per_kw { + Some(feerate) => std::cmp::max(feerate, MIN_FEERATE), + None => MIN_FEERATE, + } + }; + + let normal_estimate = { + let mut rpc = rpc_client.lock().await; + let normal_conf_target = serde_json::json!(18); + let normal_estimate_mode = serde_json::json!("ECONOMICAL"); + let resp = rpc + .call_method::( + "estimatesmartfee", + &vec![normal_conf_target, normal_estimate_mode], + ) + .await + .unwrap(); + match resp.feerate_sat_per_kw { + Some(feerate) => std::cmp::max(feerate, MIN_FEERATE), + None => 2000, + } + }; + + let high_prio_estimate = { + let mut rpc = rpc_client.lock().await; + let high_prio_conf_target = serde_json::json!(6); + let high_prio_estimate_mode = serde_json::json!("CONSERVATIVE"); + let resp = rpc + .call_method::( + "estimatesmartfee", + &vec![high_prio_conf_target, high_prio_estimate_mode], + ) + .await + .unwrap(); + + match resp.feerate_sat_per_kw { + Some(feerate) => std::cmp::max(feerate, MIN_FEERATE), + None => 5000, + } + }; + + fees.get(&Target::Background) + .unwrap() + .store(background_estimate, Ordering::Release); + fees.get(&Target::Normal).unwrap().store(normal_estimate, Ordering::Release); + fees.get(&Target::HighPriority) + .unwrap() + .store(high_prio_estimate, Ordering::Release); + tokio::time::sleep(Duration::from_secs(60)).await; + } + }); + } + + pub fn get_new_rpc_client(&self) -> std::io::Result { + let http_endpoint = HttpEndpoint::for_host(self.host.clone()).with_port(self.port); + let rpc_credentials = + base64::encode(format!("{}:{}", self.rpc_user.clone(), self.rpc_password.clone())); + RpcClient::new(&rpc_credentials, http_endpoint) + } + + pub async fn create_raw_transaction(&self, outputs: Vec>) -> RawTx { + let mut rpc = self.bitcoind_rpc_client.lock().await; + + let outputs_json = serde_json::json!(outputs); + rpc.call_method::("createrawtransaction", &vec![serde_json::json!([]), outputs_json]) + .await + .unwrap() + } + + pub async fn fund_raw_transaction(&self, raw_tx: RawTx) -> FundedTx { + let mut rpc = self.bitcoind_rpc_client.lock().await; + + let raw_tx_json = serde_json::json!(raw_tx.0); + let options = serde_json::json!({ + // LDK gives us feerates in satoshis per KW but Bitcoin Core here expects fees + // denominated in satoshis per vB. First we need to multiply by 4 to convert weight + // units to virtual bytes, then divide by 1000 to convert KvB to vB. + "fee_rate": self.get_est_sat_per_1000_weight(ConfirmationTarget::Normal) as f64 / 250.0, + // While users could "cancel" a channel open by RBF-bumping and paying back to + // themselves, we don't allow it here as its easy to have users accidentally RBF bump + // and pay to the channel funding address, which results in loss of funds. Real + // LDK-based applications should enable RBF bumping and RBF bump either to a local + // change address or to a new channel output negotiated with the same node. + "replaceable": false, + }); + rpc.call_method("fundrawtransaction", &[raw_tx_json, options]).await.unwrap() + } + + pub async fn send_raw_transaction(&self, raw_tx: RawTx) { + let mut rpc = self.bitcoind_rpc_client.lock().await; + + let raw_tx_json = serde_json::json!(raw_tx.0); + rpc.call_method::("sendrawtransaction", &[raw_tx_json]).await.unwrap(); + } + + pub async fn sign_raw_transaction_with_wallet(&self, tx_hex: String) -> SignedTx { + let mut rpc = self.bitcoind_rpc_client.lock().await; + + let tx_hex_json = serde_json::json!(tx_hex); + rpc.call_method("signrawtransactionwithwallet", &vec![tx_hex_json]).await.unwrap() + } + + pub async fn get_new_address(&self) -> Address { + let mut rpc = self.bitcoind_rpc_client.lock().await; + + let addr_args = vec![serde_json::json!("LDK output address")]; + let addr = rpc.call_method::("getnewaddress", &addr_args).await.unwrap(); + Address::from_str(addr.0.as_str()).unwrap() + } + + pub async fn get_blockchain_info(&self) -> BlockchainInfo { + let mut rpc = self.bitcoind_rpc_client.lock().await; + rpc.call_method::("getblockchaininfo", &vec![]).await.unwrap() + } } impl FeeEstimator for BitcoindClient { - fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 { - let runtime = self.runtime.lock().unwrap(); - let mut rpc = self.bitcoind_rpc_client.lock().unwrap(); - - let (conf_target, estimate_mode, default) = match confirmation_target { - ConfirmationTarget::Background => (144, "ECONOMICAL", 253), - ConfirmationTarget::Normal => (18, "ECONOMICAL", 20000), - ConfirmationTarget::HighPriority => (6, "ECONOMICAL", 50000), - }; - - // This function may be called from a tokio runtime, or not. So we need to check before - // making the call to avoid the error "cannot run a tokio runtime from within a tokio runtime". - let conf_target_json = serde_json::json!(conf_target); - let estimate_mode_json = serde_json::json!(estimate_mode); - let resp = match Handle::try_current() { - Ok(_) => { - tokio::task::block_in_place(|| { - runtime.block_on(rpc.call_method::("estimatesmartfee", - &vec![conf_target_json, - estimate_mode_json])).unwrap() - }) - }, - _ => runtime.block_on(rpc.call_method::("estimatesmartfee", - &vec![conf_target_json, - estimate_mode_json])).unwrap() - }; - if resp.errored { - return default - } - resp.feerate.unwrap() - } + fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 { + match confirmation_target { + ConfirmationTarget::Background => { + self.fees.get(&Target::Background).unwrap().load(Ordering::Acquire) + } + ConfirmationTarget::Normal => { + self.fees.get(&Target::Normal).unwrap().load(Ordering::Acquire) + } + ConfirmationTarget::HighPriority => { + self.fees.get(&Target::HighPriority).unwrap().load(Ordering::Acquire) + } + } + } } impl BroadcasterInterface for BitcoindClient { - fn broadcast_transaction(&self, tx: &Transaction) { - let mut rpc = self.bitcoind_rpc_client.lock().unwrap(); - let runtime = self.runtime.lock().unwrap(); - - let tx_serialized = serde_json::json!(encode::serialize_hex(tx)); - // This function may be called from a tokio runtime, or not. So we need to check before - // making the call to avoid the error "cannot run a tokio runtime from within a tokio runtime". - match Handle::try_current() { - Ok(_) => { - tokio::task::block_in_place(|| { - runtime.block_on(rpc.call_method::("sendrawtransaction", - &vec![tx_serialized])).unwrap(); - }); - }, - _ => { - runtime.block_on(rpc.call_method::("sendrawtransaction", - &vec![tx_serialized])).unwrap(); - } - } - } + fn broadcast_transaction(&self, tx: &Transaction) { + let bitcoind_rpc_client = self.bitcoind_rpc_client.clone(); + let tx_serialized = serde_json::json!(encode::serialize_hex(tx)); + tokio::spawn(async move { + let mut rpc = bitcoind_rpc_client.lock().await; + // This may error due to RL calling `broadcast_transaction` with the same transaction + // multiple times, but the error is safe to ignore. + match rpc.call_method::("sendrawtransaction", &vec![tx_serialized]).await { + Ok(_) => {} + Err(e) => { + let err_str = e.get_ref().unwrap().to_string(); + if !err_str.contains("Transaction already in block chain") + && !err_str.contains("Inputs missing or spent") + && !err_str.contains("bad-txns-inputs-missingorspent") + && !err_str.contains("non-BIP68-final") + && !err_str.contains("insufficient fee, rejecting replacement ") + { + panic!("{}", e); + } + } + } + }); + } }