X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-background-processor%2Fsrc%2Flib.rs;h=aae64e981bb54271d0279c26db666bdee77f4d10;hb=e8f154dd3c299c7988762909df48b0c9d919d6f8;hp=fb6baabfac1d86c78fca7892afaf9a4bbdb24763;hpb=7c9463668a4f71663746e57bdb09ee6b91797d5a;p=rust-lightning diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index fb6baabf..aae64e98 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -24,19 +24,18 @@ extern crate lightning_rapid_gossip_sync; use lightning::chain; use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use lightning::chain::chainmonitor::{ChainMonitor, Persist}; -use lightning::sign::{EntropySource, NodeSigner, SignerProvider}; use lightning::events::{Event, PathFailure}; #[cfg(feature = "std")] use lightning::events::EventHandler; -#[cfg(any(feature = "std", feature = "futures"))] +#[cfg(feature = "std")] use lightning::events::EventsProvider; -use lightning::ln::channelmanager::ChannelManager; +use lightning::ln::channelmanager::AChannelManager; use lightning::ln::msgs::OnionMessageHandler; +use lightning::onion_message::messenger::AOnionMessenger; use lightning::ln::peer_handler::APeerManager; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::utxo::UtxoLookup; -use lightning::routing::router::Router; use lightning::routing::scoring::{ScoreUpdate, WriteableScore}; use lightning::util::logger::Logger; use lightning::util::persist::Persister; @@ -57,7 +56,7 @@ use std::thread::{self, JoinHandle}; use std::time::Instant; #[cfg(not(feature = "std"))] -use alloc::vec::Vec; +use alloc::boxed::Box; /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its @@ -81,6 +80,8 @@ use alloc::vec::Vec; /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for /// unilateral chain closure fees are at risk. /// +/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager +/// [`ChannelManager::timer_tick_occurred`]: lightning::ln::channelmanager::ChannelManager::timer_tick_occurred /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor /// [`Event`]: lightning::events::Event /// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred @@ -281,12 +282,13 @@ macro_rules! define_run_body { ( $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, $channel_manager: ident, $process_channel_manager_events: expr, - $peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident, + $onion_messenger: ident, $process_onion_message_handler_events: expr, + $peer_manager: ident, $gossip_sync: ident, $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, ) => { { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); - $channel_manager.timer_tick_occurred(); + $channel_manager.get_cm().timer_tick_occurred(); log_trace!($logger, "Rebroadcasting monitor's pending claims on startup"); $chain_monitor.rebroadcast_pending_claims(); @@ -336,19 +338,21 @@ macro_rules! define_run_body { break; } - if $channel_manager.get_and_clear_needs_persistence() { + if $channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!($logger, "Persisting ChannelManager..."); - $persister.persist_manager(&*$channel_manager)?; + $persister.persist_manager(&$channel_manager)?; log_trace!($logger, "Done persisting ChannelManager."); } if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred"); - $channel_manager.timer_tick_occurred(); + $channel_manager.get_cm().timer_tick_occurred(); last_freshness_call = $get_timer(FRESHNESS_TIMER); } if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) { - log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred"); - $peer_manager.onion_message_handler().timer_tick_occurred(); + if let Some(om) = &$onion_messenger { + log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred"); + om.get_om().timer_tick_occurred(); + } last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER); } if await_slow { @@ -440,7 +444,7 @@ macro_rules! define_run_body { // After we exit, ensure we persist the ChannelManager one final time - this avoids // some races where users quit while channel updates were in-flight, with // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. - $persister.persist_manager(&*$channel_manager)?; + $persister.persist_manager(&$channel_manager)?; // Persist Scorer on exit if let Some(ref scorer) = $scorer { @@ -539,45 +543,67 @@ use core::task; /// # use std::sync::atomic::{AtomicBool, Ordering}; /// # use std::time::SystemTime; /// # use lightning_background_processor::{process_events_async, GossipSync}; -/// # struct MyStore {} -/// # impl lightning::util::persist::KVStore for MyStore { +/// # struct Logger {} +/// # impl lightning::util::logger::Logger for Logger { +/// # fn log(&self, _record: lightning::util::logger::Record) {} +/// # } +/// # struct Store {} +/// # impl lightning::util::persist::KVStore for Store { /// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result> { Ok(Vec::new()) } /// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) } /// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) } /// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { Ok(Vec::new()) } /// # } -/// # struct MyEventHandler {} -/// # impl MyEventHandler { +/// # struct EventHandler {} +/// # impl EventHandler { /// # async fn handle_event(&self, _: lightning::events::Event) {} /// # } /// # #[derive(Eq, PartialEq, Clone, Hash)] -/// # struct MySocketDescriptor {} -/// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor { +/// # struct SocketDescriptor {} +/// # impl lightning::ln::peer_handler::SocketDescriptor for SocketDescriptor { /// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 } /// # fn disconnect_socket(&mut self) {} /// # } -/// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync; -/// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync; -/// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync; -/// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync; -/// # type MyFilter = dyn lightning::chain::Filter + Send + Sync; -/// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync; -/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; -/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager, MyLogger>; -/// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph>; -/// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync, Arc, Arc>; -/// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager; -/// # type MyScorer = RwLock, Arc>>; -/// -/// # async fn setup_background_processing(my_persister: Arc, my_event_handler: Arc, my_chain_monitor: Arc, my_channel_manager: Arc, my_gossip_sync: Arc, my_logger: Arc, my_scorer: Arc, my_peer_manager: Arc) { -/// let background_persister = Arc::clone(&my_persister); -/// let background_event_handler = Arc::clone(&my_event_handler); -/// let background_chain_mon = Arc::clone(&my_chain_monitor); -/// let background_chan_man = Arc::clone(&my_channel_manager); -/// let background_gossip_sync = GossipSync::p2p(Arc::clone(&my_gossip_sync)); -/// let background_peer_man = Arc::clone(&my_peer_manager); -/// let background_logger = Arc::clone(&my_logger); -/// let background_scorer = Arc::clone(&my_scorer); +/// # type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; +/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph>; +/// # type P2PGossipSync
    = lightning::routing::gossip::P2PGossipSync, Arc
      , Arc>; +/// # type ChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager, B, FE, Logger>; +/// # type OnionMessenger = lightning::onion_message::messenger::OnionMessenger, Arc, Arc, Arc>, Arc, Arc, Arc>>, Arc>, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler>; +/// # type Scorer = RwLock, Arc>>; +/// # type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager, B, FE, Arc
        , Logger>; +/// # +/// # struct Node< +/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static, +/// # F: lightning::chain::Filter + Send + Sync + 'static, +/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static, +/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static, +/// # > { +/// # peer_manager: Arc>, +/// # event_handler: Arc, +/// # channel_manager: Arc>, +/// # onion_messenger: Arc>, +/// # chain_monitor: Arc>, +/// # gossip_sync: Arc>, +/// # persister: Arc, +/// # logger: Arc, +/// # scorer: Arc, +/// # } +/// # +/// # async fn setup_background_processing< +/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static, +/// # F: lightning::chain::Filter + Send + Sync + 'static, +/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static, +/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static, +/// # >(node: Node) { +/// let background_persister = Arc::clone(&node.persister); +/// let background_event_handler = Arc::clone(&node.event_handler); +/// let background_chain_mon = Arc::clone(&node.chain_monitor); +/// let background_chan_man = Arc::clone(&node.channel_manager); +/// let background_gossip_sync = GossipSync::p2p(Arc::clone(&node.gossip_sync)); +/// let background_peer_man = Arc::clone(&node.peer_manager); +/// let background_onion_messenger = Arc::clone(&node.onion_messenger); +/// let background_logger = Arc::clone(&node.logger); +/// let background_scorer = Arc::clone(&node.scorer); /// /// // Setup the sleeper. /// let (stop_sender, stop_receiver) = tokio::sync::watch::channel(()); @@ -600,6 +626,7 @@ use core::task; /// |e| background_event_handler.handle_event(e), /// background_chain_mon, /// background_chan_man, +/// Some(background_onion_messenger), /// background_gossip_sync, /// background_peer_man, /// background_logger, @@ -607,9 +634,9 @@ use core::task; /// sleeper, /// mobile_interruptable_platform, /// || Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap()) -/// ) -/// .await -/// .expect("Failed to process events"); +/// ) +/// .await +/// .expect("Failed to process events"); /// }); /// /// // Stop the background processing. @@ -622,21 +649,17 @@ pub async fn process_events_async< 'a, UL: 'static + Deref + Send + Sync, CF: 'static + Deref + Send + Sync, - CW: 'static + Deref + Send + Sync, T: 'static + Deref + Send + Sync, - ES: 'static + Deref + Send + Sync, - NS: 'static + Deref + Send + Sync, - SP: 'static + Deref + Send + Sync, F: 'static + Deref + Send + Sync, - R: 'static + Deref + Send + Sync, G: 'static + Deref> + Send + Sync, L: 'static + Deref + Send + Sync, P: 'static + Deref + Send + Sync, EventHandlerFuture: core::future::Future, EventHandler: Fn(Event) -> EventHandlerFuture, PS: 'static + Deref + Send, - M: 'static + Deref::EcdsaSigner, CF, T, F, L, P>> + Send + Sync, - CM: 'static + Deref> + Send + Sync, + M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, + CM: 'static + Deref + Send + Sync, + OM: 'static + Deref + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, PM: 'static + Deref + Send + Sync, @@ -647,22 +670,20 @@ pub async fn process_events_async< FetchTime: Fn() -> Option, >( persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM, + onion_messenger: Option, gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime, ) -> Result<(), lightning::io::Error> where UL::Target: 'static + UtxoLookup, CF::Target: 'static + chain::Filter, - CW::Target: 'static + chain::Watch<::EcdsaSigner>, T::Target: 'static + BroadcasterInterface, - ES::Target: 'static + EntropySource, - NS::Target: 'static + NodeSigner, - SP::Target: 'static + SignerProvider, F::Target: 'static + FeeEstimator, - R::Target: 'static + Router, L::Target: 'static + Logger, - P::Target: 'static + Persist<::EcdsaSigner>, - PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, + P::Target: 'static + Persist<::Signer>, + PS::Target: 'static + Persister<'a, CM, L, SC>, + CM::Target: AChannelManager + Send + Sync, + OM::Target: AOnionMessenger + Send + Sync, PM::Target: APeerManager + Send + Sync, { let mut should_break = false; @@ -673,7 +694,7 @@ where let logger = &logger; let persister = &persister; let fetch_time = &fetch_time; - async move { + Box::pin(async move { // We should be able to drop the Box once our MSRV is 1.68 if let Some(network_graph) = network_graph { handle_network_graph_update(network_graph, &event) } @@ -688,16 +709,16 @@ where } } event_handler(event).await; - } + }) }; define_run_body!( persister, chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await, - channel_manager, channel_manager.process_pending_events_async(async_event_handler).await, - peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await, - gossip_sync, logger, scorer, should_break, { + channel_manager, channel_manager.get_cm().process_pending_events_async(async_event_handler).await, + onion_messenger, if let Some(om) = &onion_messenger { om.get_om().process_pending_events_async(async_event_handler).await }, + peer_manager, gossip_sync, logger, scorer, should_break, { let fut = Selector { - a: channel_manager.get_event_or_persistence_needed_future(), + a: channel_manager.get_cm().get_event_or_persistence_needed_future(), b: chain_monitor.get_update_future(), c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }), }; @@ -719,25 +740,6 @@ where ) } -#[cfg(feature = "futures")] -async fn process_onion_message_handler_events_async< - EventHandlerFuture: core::future::Future, - EventHandler: Fn(Event) -> EventHandlerFuture, - PM: 'static + Deref + Send + Sync, ->( - peer_manager: &PM, handler: EventHandler -) -where - PM::Target: APeerManager + Send + Sync, -{ - let events = core::cell::RefCell::new(Vec::new()); - peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e)); - - for event in events.into_inner() { - handler(event).await - } -} - #[cfg(feature = "std")] impl BackgroundProcessor { /// Start a background thread that takes care of responsibilities enumerated in the [top-level @@ -788,20 +790,16 @@ impl BackgroundProcessor { 'a, UL: 'static + Deref + Send + Sync, CF: 'static + Deref + Send + Sync, - CW: 'static + Deref + Send + Sync, T: 'static + Deref + Send + Sync, - ES: 'static + Deref + Send + Sync, - NS: 'static + Deref + Send + Sync, - SP: 'static + Deref + Send + Sync, F: 'static + Deref + Send + Sync, - R: 'static + Deref + Send + Sync, G: 'static + Deref> + Send + Sync, L: 'static + Deref + Send + Sync, P: 'static + Deref + Send + Sync, EH: 'static + EventHandler + Send, PS: 'static + Deref + Send, - M: 'static + Deref::EcdsaSigner, CF, T, F, L, P>> + Send + Sync, - CM: 'static + Deref> + Send + Sync, + M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, + CM: 'static + Deref + Send + Sync, + OM: 'static + Deref + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, PM: 'static + Deref + Send + Sync, @@ -809,21 +807,19 @@ impl BackgroundProcessor { SC: for <'b> WriteableScore<'b>, >( persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, + onion_messenger: Option, gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, ) -> Self where UL::Target: 'static + UtxoLookup, CF::Target: 'static + chain::Filter, - CW::Target: 'static + chain::Watch<::EcdsaSigner>, T::Target: 'static + BroadcasterInterface, - ES::Target: 'static + EntropySource, - NS::Target: 'static + NodeSigner, - SP::Target: 'static + SignerProvider, F::Target: 'static + FeeEstimator, - R::Target: 'static + Router, L::Target: 'static + Logger, - P::Target: 'static + Persist<::EcdsaSigner>, - PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, + P::Target: 'static + Persist<::Signer>, + PS::Target: 'static + Persister<'a, CM, L, SC>, + CM::Target: AChannelManager + Send + Sync, + OM::Target: AOnionMessenger + Send + Sync, PM::Target: APeerManager + Send + Sync, { let stop_thread = Arc::new(AtomicBool::new(false)); @@ -849,13 +845,12 @@ impl BackgroundProcessor { }; define_run_body!( persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), - channel_manager, channel_manager.process_pending_events(&event_handler), - peer_manager, - peer_manager.onion_message_handler().process_pending_events(&event_handler), - gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire), + channel_manager, channel_manager.get_cm().process_pending_events(&event_handler), + onion_messenger, if let Some(om) = &onion_messenger { om.get_om().process_pending_events(&event_handler) }, + peer_manager, gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire), { Sleeper::from_two_futures( - channel_manager.get_event_or_persistence_needed_future(), - chain_monitor.get_update_future() + &channel_manager.get_cm().get_event_or_persistence_needed_future(), + &chain_monitor.get_update_future() ).wait_timeout(Duration::from_millis(100)); }, |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false, || { @@ -918,24 +913,28 @@ impl Drop for BackgroundProcessor { #[cfg(all(feature = "std", test))] mod tests { + use bitcoin::{Amount, ScriptBuf, Txid}; use bitcoin::blockdata::constants::{genesis_block, ChainHash}; use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::blockdata::transaction::{Transaction, TxOut}; - use bitcoin::network::constants::Network; + use bitcoin::hashes::Hash; + use bitcoin::network::Network; use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1}; - use lightning::chain::{BestBlock, Confirm, chainmonitor}; + use bitcoin::transaction::Version; + use lightning::chain::{BestBlock, Confirm, chainmonitor, Filter}; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; - use lightning::sign::{InMemorySigner, KeysManager}; + use lightning::sign::{InMemorySigner, KeysManager, ChangeDestinationSource}; use lightning::chain::transaction::OutPoint; use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent}; use lightning::{get_event_msg, get_event}; - use lightning::ln::{PaymentHash, ChannelId}; + use lightning::ln::types::{PaymentHash, ChannelId}; use lightning::ln::channelmanager; use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId}; use lightning::ln::features::{ChannelFeatures, NodeFeatures}; use lightning::ln::functional_test_utils::*; use lightning::ln::msgs::{ChannelMessageHandler, Init}; use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler}; + use lightning::onion_message::messenger::{DefaultMessageRouter, OnionMessenger}; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp, LockableScore}; use lightning::routing::router::{DefaultRouter, Path, RouteHop, CandidateRouteHop}; @@ -946,6 +945,7 @@ mod tests { CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY}; + use lightning::util::sweep::{OutputSweeper, OutputSpendStatus}; use lightning_persister::fs_store::FilesystemStore; use std::collections::VecDeque; use std::{fs, env}; @@ -984,6 +984,7 @@ mod tests { Arc>>, Arc, + Arc, Arc>, (), TestScorer> @@ -995,11 +996,14 @@ mod tests { type PGS = Arc>>, Arc, Arc>>; type RGS = Arc>>, Arc>>; + type OM = OnionMessenger, Arc, Arc, Arc, Arc>>, Arc, Arc>>, IgnoringMessageHandler, IgnoringMessageHandler, IgnoringMessageHandler>; + struct Node { node: Arc, + messenger: Arc, p2p_gossip_sync: PGS, rapid_gossip_sync: RGS, - peer_manager: Arc, Arc, IgnoringMessageHandler, Arc, IgnoringMessageHandler, Arc>>, + peer_manager: Arc, Arc, Arc, Arc, IgnoringMessageHandler, Arc>>, chain_monitor: Arc, kv_store: Arc, tx_broadcaster: Arc, @@ -1007,6 +1011,9 @@ mod tests { logger: Arc, best_block: BestBlock, scorer: Arc>, + sweeper: Arc, Arc, + Arc, Arc, Arc, + Arc, Arc>>, } impl Node { @@ -1245,6 +1252,14 @@ mod tests { } } + struct TestWallet {} + + impl ChangeDestinationSource for TestWallet { + fn get_change_destination_script(&self) -> Result { + Ok(ScriptBuf::new()) + } + } + fn get_full_filepath(filepath: String, filename: String) -> String { let mut path = PathBuf::from(filepath); path.push(filename); @@ -1263,8 +1278,11 @@ mod tests { let genesis_block = genesis_block(network); let network_graph = Arc::new(NetworkGraph::new(network, logger.clone())); let scorer = Arc::new(LockingWrapper::new(TestScorer::new())); + let now = Duration::from_secs(genesis_block.header.time as u64); let seed = [i as u8; 32]; - let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), Default::default())); + let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos())); + let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), Arc::clone(&keys_manager), scorer.clone(), Default::default())); + let msg_router = Arc::new(DefaultMessageRouter::new(network_graph.clone(), Arc::clone(&keys_manager))); let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin)); let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into())); let now = Duration::from_secs(genesis_block.header.time as u64); @@ -1273,15 +1291,19 @@ mod tests { let best_block = BestBlock::from_network(network); let params = ChainParameters { network, best_block }; let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params, genesis_block.header.time)); + let messenger = Arc::new(OnionMessenger::new(keys_manager.clone(), keys_manager.clone(), logger.clone(), manager.clone(), msg_router.clone(), IgnoringMessageHandler {}, IgnoringMessageHandler {}, IgnoringMessageHandler {})); + let wallet = Arc::new(TestWallet {}); + let sweeper = Arc::new(OutputSweeper::new(best_block, Arc::clone(&tx_broadcaster), Arc::clone(&fee_estimator), + None::>, Arc::clone(&keys_manager), wallet, Arc::clone(&kv_store), Arc::clone(&logger))); let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())); let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone())); let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet))), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), - onion_message_handler: IgnoringMessageHandler{}, custom_message_handler: IgnoringMessageHandler{} + onion_message_handler: messenger.clone(), custom_message_handler: IgnoringMessageHandler{} }; let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), keys_manager.clone())); - let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, kv_store, tx_broadcaster, network_graph, logger, best_block, scorer }; + let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, kv_store, tx_broadcaster, network_graph, logger, best_block, scorer, sweeper, messenger }; nodes.push(node); } @@ -1329,8 +1351,8 @@ mod tests { assert_eq!(channel_value_satoshis, $channel_value); assert_eq!(user_channel_id, 42); - let tx = Transaction { version: 1 as i32, lock_time: LockTime::ZERO, input: Vec::new(), output: vec![TxOut { - value: channel_value_satoshis, script_pubkey: output_script.clone(), + let tx = Transaction { version: Version::ONE, lock_time: LockTime::ZERO, input: Vec::new(), output: vec![TxOut { + value: Amount::from_sat(channel_value_satoshis), script_pubkey: output_script.clone(), }]}; (temporary_channel_id, tx) }, @@ -1341,8 +1363,8 @@ mod tests { fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) { for i in 1..=depth { - let prev_blockhash = node.best_block.block_hash(); - let height = node.best_block.height() + 1; + let prev_blockhash = node.best_block.block_hash; + let height = node.best_block.height + 1; let header = create_dummy_header(prev_blockhash, height); let txdata = vec![(0, tx)]; node.best_block = BestBlock::new(header.block_hash(), height); @@ -1350,15 +1372,40 @@ mod tests { 1 => { node.node.transactions_confirmed(&header, &txdata, height); node.chain_monitor.transactions_confirmed(&header, &txdata, height); + node.sweeper.transactions_confirmed(&header, &txdata, height); }, x if x == depth => { + // We need the TestBroadcaster to know about the new height so that it doesn't think + // we're violating the time lock requirements of transactions broadcasted at that + // point. + node.tx_broadcaster.blocks.lock().unwrap().push((genesis_block(Network::Bitcoin), height)); node.node.best_block_updated(&header, height); node.chain_monitor.best_block_updated(&header, height); + node.sweeper.best_block_updated(&header, height); }, _ => {}, } } } + + fn advance_chain(node: &mut Node, num_blocks: u32) { + for i in 1..=num_blocks { + let prev_blockhash = node.best_block.block_hash; + let height = node.best_block.height + 1; + let header = create_dummy_header(prev_blockhash, height); + node.best_block = BestBlock::new(header.block_hash(), height); + if i == num_blocks { + // We need the TestBroadcaster to know about the new height so that it doesn't think + // we're violating the time lock requirements of transactions broadcasted at that + // point. + node.tx_broadcaster.blocks.lock().unwrap().push((genesis_block(Network::Bitcoin), height)); + node.node.best_block_updated(&header, height); + node.chain_monitor.best_block_updated(&header, height); + node.sweeper.best_block_updated(&header, height); + } + } + } + fn confirm_transaction(node: &mut Node, tx: &Transaction) { confirm_transaction_depth(node, tx, ANTI_REORG_DELAY); } @@ -1379,7 +1426,7 @@ mod tests { let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); let event_handler = |_: _| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); macro_rules! check_persisted_data { ($node: expr, $filepath: expr) => { @@ -1414,7 +1461,8 @@ mod tests { } // Force-close the channel. - nodes[0].node.force_close_broadcasting_latest_txn(&ChannelId::v1_from_funding_outpoint(OutPoint { txid: tx.txid(), index: 0 }), &nodes[1].node.get_our_node_id()).unwrap(); + let error_message = "Channel force-closed"; + nodes[0].node.force_close_broadcasting_latest_txn(&ChannelId::v1_from_funding_outpoint(OutPoint { txid: tx.txid(), index: 0 }), &nodes[1].node.get_our_node_id(), error_message.to_string()).unwrap(); // Check that the force-close updates are persisted. check_persisted_data!(nodes[0].node, filepath.clone()); @@ -1446,7 +1494,7 @@ mod tests { let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); let event_handler = |_: _| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string(); @@ -1475,7 +1523,7 @@ mod tests { let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test")); let event_handler = |_: _| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); match bg_processor.join() { Ok(_) => panic!("Expected error persisting manager"), Err(e) => { @@ -1496,7 +1544,7 @@ mod tests { let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test")); let bp_future = super::process_events_async( - persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), + persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), move |dur: Duration| { Box::pin(async move { @@ -1521,7 +1569,7 @@ mod tests { let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test")); let event_handler = |_: _| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); match bg_processor.stop() { Ok(_) => panic!("Expected error persisting network graph"), @@ -1539,7 +1587,7 @@ mod tests { let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test")); let event_handler = |_: _| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); match bg_processor.stop() { Ok(_) => panic!("Expected error persisting scorer"), @@ -1567,7 +1615,7 @@ mod tests { _ => panic!("Unexpected event: {:?}", event), }; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); // Open a channel and check that the FundingGenerationReady event was handled. begin_open_channel!(nodes[0], nodes[1], channel_value); @@ -1590,6 +1638,9 @@ mod tests { let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id()); nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding); let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id()); + let broadcast_funding = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap(); + assert_eq!(broadcast_funding.txid(), funding_tx.txid()); + assert!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty()); if !std::thread::panicking() { bg_processor.stop().unwrap(); @@ -1604,10 +1655,11 @@ mod tests { _ => panic!("Unexpected event: {:?}", event), }; let persister = Arc::new(Persister::new(data_dir)); - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); // Force close the channel and check that the SpendableOutputs event was handled. - nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap(); + let error_message = "Channel force-closed"; + nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id(), error_message.to_string()).unwrap(); let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap(); confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32); @@ -1615,10 +1667,95 @@ mod tests { .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) .expect("Events not handled within deadline"); match event { - Event::SpendableOutputs { .. } => {}, + Event::SpendableOutputs { outputs, channel_id } => { + nodes[0].sweeper.track_spendable_outputs(outputs, channel_id, false, Some(153)).unwrap(); + }, _ => panic!("Unexpected event: {:?}", event), } + // Check we don't generate an initial sweeping tx until we reach the required height. + assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1); + let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone(); + if let Some(sweep_tx_0) = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop() { + assert!(!tracked_output.is_spent_in(&sweep_tx_0)); + match tracked_output.status { + OutputSpendStatus::PendingInitialBroadcast { delayed_until_height } => { + assert_eq!(delayed_until_height, Some(153)); + } + _ => panic!("Unexpected status"), + } + } + + advance_chain(&mut nodes[0], 3); + + // Check we generate an initial sweeping tx. + assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1); + let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone(); + let sweep_tx_0 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap(); + match tracked_output.status { + OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => { + assert_eq!(sweep_tx_0.txid(), latest_spending_tx.txid()); + } + _ => panic!("Unexpected status"), + } + + // Check we regenerate and rebroadcast the sweeping tx each block. + advance_chain(&mut nodes[0], 1); + assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1); + let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone(); + let sweep_tx_1 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap(); + match tracked_output.status { + OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => { + assert_eq!(sweep_tx_1.txid(), latest_spending_tx.txid()); + } + _ => panic!("Unexpected status"), + } + assert_ne!(sweep_tx_0, sweep_tx_1); + + advance_chain(&mut nodes[0], 1); + assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1); + let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone(); + let sweep_tx_2 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap(); + match tracked_output.status { + OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => { + assert_eq!(sweep_tx_2.txid(), latest_spending_tx.txid()); + } + _ => panic!("Unexpected status"), + } + assert_ne!(sweep_tx_0, sweep_tx_2); + assert_ne!(sweep_tx_1, sweep_tx_2); + + // Check we still track the spendable outputs up to ANTI_REORG_DELAY confirmations. + confirm_transaction_depth(&mut nodes[0], &sweep_tx_2, 5); + assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1); + let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone(); + match tracked_output.status { + OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => { + assert_eq!(sweep_tx_2.txid(), latest_spending_tx.txid()); + } + _ => panic!("Unexpected status"), + } + + // Check we still see the transaction as confirmed if we unconfirm any untracked + // transaction. (We previously had a bug that would mark tracked transactions as + // unconfirmed if any transaction at an unknown block height would be unconfirmed.) + let unconf_txid = Txid::from_slice(&[0; 32]).unwrap(); + nodes[0].sweeper.transaction_unconfirmed(&unconf_txid); + + assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1); + let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone(); + match tracked_output.status { + OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => { + assert_eq!(sweep_tx_2.txid(), latest_spending_tx.txid()); + } + _ => panic!("Unexpected status"), + } + + // Check we stop tracking the spendable outputs when one of the txs reaches + // ANTI_REORG_DELAY confirmations. + confirm_transaction_depth(&mut nodes[0], &sweep_tx_0, ANTI_REORG_DELAY); + assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 0); + if !std::thread::panicking() { bg_processor.stop().unwrap(); } @@ -1630,7 +1767,7 @@ mod tests { let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); let event_handler = |_: _| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); @@ -1703,7 +1840,7 @@ mod tests { let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender)); let event_handler = |_: _| {}; - let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); + let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, receiver.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5)), @@ -1723,7 +1860,7 @@ mod tests { let (exit_sender, exit_receiver) = tokio::sync::watch::channel(()); let bp_future = super::process_events_async( - persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), + persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), move |dur: Duration| { let mut exit_receiver = exit_receiver.clone(); @@ -1861,7 +1998,7 @@ mod tests { let (_, nodes) = create_nodes(1, "test_payment_path_scoring"); let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); do_test_payment_path_scoring!(nodes, receiver.recv_timeout(Duration::from_secs(EVENT_DEADLINE))); @@ -1898,7 +2035,7 @@ mod tests { let (exit_sender, exit_receiver) = tokio::sync::watch::channel(()); let bp_future = super::process_events_async( - persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), + persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), move |dur: Duration| { let mut exit_receiver = exit_receiver.clone();