X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-background-processor%2Fsrc%2Flib.rs;h=3736bd603e5bd65977bfc874919b5f9c61ee48bc;hb=12c3a24bebe67ba406609fdfe6747da9c1449756;hp=a12ec9c0f3b8b85a868e9f2e3fa05f0abc460b3d;hpb=f5e87d8441f2e873114fd3b310a04be0a2a9181f;p=rust-lightning diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index a12ec9c0..3736bd60 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -24,16 +24,17 @@ extern crate lightning_rapid_gossip_sync; use lightning::chain; use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use lightning::chain::chainmonitor::{ChainMonitor, Persist}; -use lightning::sign::{EntropySource, NodeSigner, SignerProvider}; use lightning::events::{Event, PathFailure}; #[cfg(feature = "std")] -use lightning::events::{EventHandler, EventsProvider}; -use lightning::ln::channelmanager::ChannelManager; +use lightning::events::EventHandler; +#[cfg(any(feature = "std", feature = "futures"))] +use lightning::events::EventsProvider; + +use lightning::ln::channelmanager::AChannelManager; use lightning::ln::msgs::OnionMessageHandler; use lightning::ln::peer_handler::APeerManager; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::utxo::UtxoLookup; -use lightning::routing::router::Router; use lightning::routing::scoring::{ScoreUpdate, WriteableScore}; use lightning::util::logger::Logger; use lightning::util::persist::Persister; @@ -78,6 +79,8 @@ use alloc::vec::Vec; /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for /// unilateral chain closure fees are at risk. /// +/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager +/// [`ChannelManager::timer_tick_occurred`]: lightning::ln::channelmanager::ChannelManager::timer_tick_occurred /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor /// [`Event`]: lightning::events::Event /// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred @@ -283,7 +286,7 @@ macro_rules! define_run_body { $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, ) => { { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); - $channel_manager.timer_tick_occurred(); + $channel_manager.get_cm().timer_tick_occurred(); log_trace!($logger, "Rebroadcasting monitor's pending claims on startup"); $chain_monitor.rebroadcast_pending_claims(); @@ -333,14 +336,14 @@ macro_rules! define_run_body { break; } - if $channel_manager.get_and_clear_needs_persistence() { + if $channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!($logger, "Persisting ChannelManager..."); - $persister.persist_manager(&*$channel_manager)?; + $persister.persist_manager(&$channel_manager)?; log_trace!($logger, "Done persisting ChannelManager."); } if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred"); - $channel_manager.timer_tick_occurred(); + $channel_manager.get_cm().timer_tick_occurred(); last_freshness_call = $get_timer(FRESHNESS_TIMER); } if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) { @@ -437,7 +440,7 @@ macro_rules! define_run_body { // After we exit, ensure we persist the ChannelManager one final time - this avoids // some races where users quit while channel updates were in-flight, with // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. - $persister.persist_manager(&*$channel_manager)?; + $persister.persist_manager(&$channel_manager)?; // Persist Scorer on exit if let Some(ref scorer) = $scorer { @@ -536,45 +539,64 @@ use core::task; /// # use std::sync::atomic::{AtomicBool, Ordering}; /// # use std::time::SystemTime; /// # use lightning_background_processor::{process_events_async, GossipSync}; -/// # struct MyStore {} -/// # impl lightning::util::persist::KVStore for MyStore { +/// # struct Logger {} +/// # impl lightning::util::logger::Logger for Logger { +/// # fn log(&self, _record: lightning::util::logger::Record) {} +/// # } +/// # struct Store {} +/// # impl lightning::util::persist::KVStore for Store { /// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result> { Ok(Vec::new()) } /// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) } /// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) } /// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { Ok(Vec::new()) } /// # } -/// # struct MyEventHandler {} -/// # impl MyEventHandler { +/// # struct EventHandler {} +/// # impl EventHandler { /// # async fn handle_event(&self, _: lightning::events::Event) {} /// # } /// # #[derive(Eq, PartialEq, Clone, Hash)] -/// # struct MySocketDescriptor {} -/// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor { +/// # struct SocketDescriptor {} +/// # impl lightning::ln::peer_handler::SocketDescriptor for SocketDescriptor { /// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 } /// # fn disconnect_socket(&mut self) {} /// # } -/// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync; -/// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync; -/// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync; -/// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync; -/// # type MyFilter = dyn lightning::chain::Filter + Send + Sync; -/// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync; -/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; -/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager, MyLogger>; -/// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph>; -/// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync, Arc, Arc>; -/// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager; -/// # type MyScorer = RwLock, Arc>>; -/// -/// # async fn setup_background_processing(my_persister: Arc, my_event_handler: Arc, my_chain_monitor: Arc, my_channel_manager: Arc, my_gossip_sync: Arc, my_logger: Arc, my_scorer: Arc, my_peer_manager: Arc) { -/// let background_persister = Arc::clone(&my_persister); -/// let background_event_handler = Arc::clone(&my_event_handler); -/// let background_chain_mon = Arc::clone(&my_chain_monitor); -/// let background_chan_man = Arc::clone(&my_channel_manager); -/// let background_gossip_sync = GossipSync::p2p(Arc::clone(&my_gossip_sync)); -/// let background_peer_man = Arc::clone(&my_peer_manager); -/// let background_logger = Arc::clone(&my_logger); -/// let background_scorer = Arc::clone(&my_scorer); +/// # type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; +/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph>; +/// # type P2PGossipSync
    = lightning::routing::gossip::P2PGossipSync, Arc
      , Arc>; +/// # type ChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager, B, FE, Logger>; +/// # type Scorer = RwLock, Arc>>; +/// # type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager, B, FE, Arc
        , Logger>; +/// # +/// # struct Node< +/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static, +/// # F: lightning::chain::Filter + Send + Sync + 'static, +/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static, +/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static, +/// # > { +/// # peer_manager: Arc>, +/// # event_handler: Arc, +/// # channel_manager: Arc>, +/// # chain_monitor: Arc>, +/// # gossip_sync: Arc>, +/// # persister: Arc, +/// # logger: Arc, +/// # scorer: Arc, +/// # } +/// # +/// # async fn setup_background_processing< +/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static, +/// # F: lightning::chain::Filter + Send + Sync + 'static, +/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static, +/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static, +/// # >(node: Node) { +/// let background_persister = Arc::clone(&node.persister); +/// let background_event_handler = Arc::clone(&node.event_handler); +/// let background_chain_mon = Arc::clone(&node.chain_monitor); +/// let background_chan_man = Arc::clone(&node.channel_manager); +/// let background_gossip_sync = GossipSync::p2p(Arc::clone(&node.gossip_sync)); +/// let background_peer_man = Arc::clone(&node.peer_manager); +/// let background_logger = Arc::clone(&node.logger); +/// let background_scorer = Arc::clone(&node.scorer); /// /// // Setup the sleeper. /// let (stop_sender, stop_receiver) = tokio::sync::watch::channel(()); @@ -604,9 +626,9 @@ use core::task; /// sleeper, /// mobile_interruptable_platform, /// || Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap()) -/// ) -/// .await -/// .expect("Failed to process events"); +/// ) +/// .await +/// .expect("Failed to process events"); /// }); /// /// // Stop the background processing. @@ -619,21 +641,16 @@ pub async fn process_events_async< 'a, UL: 'static + Deref + Send + Sync, CF: 'static + Deref + Send + Sync, - CW: 'static + Deref + Send + Sync, T: 'static + Deref + Send + Sync, - ES: 'static + Deref + Send + Sync, - NS: 'static + Deref + Send + Sync, - SP: 'static + Deref + Send + Sync, F: 'static + Deref + Send + Sync, - R: 'static + Deref + Send + Sync, G: 'static + Deref> + Send + Sync, L: 'static + Deref + Send + Sync, P: 'static + Deref + Send + Sync, EventHandlerFuture: core::future::Future, EventHandler: Fn(Event) -> EventHandlerFuture, PS: 'static + Deref + Send, - M: 'static + Deref::EcdsaSigner, CF, T, F, L, P>> + Send + Sync, - CM: 'static + Deref> + Send + Sync, + M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, + CM: 'static + Deref + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, PM: 'static + Deref + Send + Sync, @@ -650,16 +667,12 @@ pub async fn process_events_async< where UL::Target: 'static + UtxoLookup, CF::Target: 'static + chain::Filter, - CW::Target: 'static + chain::Watch<::EcdsaSigner>, T::Target: 'static + BroadcasterInterface, - ES::Target: 'static + EntropySource, - NS::Target: 'static + NodeSigner, - SP::Target: 'static + SignerProvider, F::Target: 'static + FeeEstimator, - R::Target: 'static + Router, L::Target: 'static + Logger, - P::Target: 'static + Persist<::EcdsaSigner>, - PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, + P::Target: 'static + Persist<::Signer>, + PS::Target: 'static + Persister<'a, CM, L, SC>, + CM::Target: AChannelManager + Send + Sync, PM::Target: APeerManager + Send + Sync, { let mut should_break = false; @@ -690,11 +703,11 @@ where define_run_body!( persister, chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await, - channel_manager, channel_manager.process_pending_events_async(async_event_handler).await, + channel_manager, channel_manager.get_cm().process_pending_events_async(async_event_handler).await, peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await, gossip_sync, logger, scorer, should_break, { let fut = Selector { - a: channel_manager.get_event_or_persistence_needed_future(), + a: channel_manager.get_cm().get_event_or_persistence_needed_future(), b: chain_monitor.get_update_future(), c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }), }; @@ -727,8 +740,6 @@ async fn process_onion_message_handler_events_async< where PM::Target: APeerManager + Send + Sync, { - use lightning::events::EventsProvider; - let events = core::cell::RefCell::new(Vec::new()); peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e)); @@ -787,20 +798,15 @@ impl BackgroundProcessor { 'a, UL: 'static + Deref + Send + Sync, CF: 'static + Deref + Send + Sync, - CW: 'static + Deref + Send + Sync, T: 'static + Deref + Send + Sync, - ES: 'static + Deref + Send + Sync, - NS: 'static + Deref + Send + Sync, - SP: 'static + Deref + Send + Sync, F: 'static + Deref + Send + Sync, - R: 'static + Deref + Send + Sync, G: 'static + Deref> + Send + Sync, L: 'static + Deref + Send + Sync, P: 'static + Deref + Send + Sync, EH: 'static + EventHandler + Send, PS: 'static + Deref + Send, - M: 'static + Deref::EcdsaSigner, CF, T, F, L, P>> + Send + Sync, - CM: 'static + Deref> + Send + Sync, + M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, + CM: 'static + Deref + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, PM: 'static + Deref + Send + Sync, @@ -813,16 +819,12 @@ impl BackgroundProcessor { where UL::Target: 'static + UtxoLookup, CF::Target: 'static + chain::Filter, - CW::Target: 'static + chain::Watch<::EcdsaSigner>, T::Target: 'static + BroadcasterInterface, - ES::Target: 'static + EntropySource, - NS::Target: 'static + NodeSigner, - SP::Target: 'static + SignerProvider, F::Target: 'static + FeeEstimator, - R::Target: 'static + Router, L::Target: 'static + Logger, - P::Target: 'static + Persist<::EcdsaSigner>, - PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, + P::Target: 'static + Persist<::Signer>, + PS::Target: 'static + Persister<'a, CM, L, SC>, + CM::Target: AChannelManager + Send + Sync, PM::Target: APeerManager + Send + Sync, { let stop_thread = Arc::new(AtomicBool::new(false)); @@ -848,13 +850,13 @@ impl BackgroundProcessor { }; define_run_body!( persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), - channel_manager, channel_manager.process_pending_events(&event_handler), + channel_manager, channel_manager.get_cm().process_pending_events(&event_handler), peer_manager, peer_manager.onion_message_handler().process_pending_events(&event_handler), gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire), { Sleeper::from_two_futures( - channel_manager.get_event_or_persistence_needed_future(), - chain_monitor.get_update_future() + &channel_manager.get_cm().get_event_or_persistence_needed_future(), + &chain_monitor.get_update_future() ).wait_timeout(Duration::from_millis(100)); }, |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false, || { @@ -928,7 +930,7 @@ mod tests { use lightning::chain::transaction::OutPoint; use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent}; use lightning::{get_event_msg, get_event}; - use lightning::ln::PaymentHash; + use lightning::ln::{PaymentHash, ChannelId}; use lightning::ln::channelmanager; use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId}; use lightning::ln::features::{ChannelFeatures, NodeFeatures}; @@ -983,6 +985,7 @@ mod tests { Arc>>, Arc, + Arc, Arc>, (), TestScorer> @@ -1262,12 +1265,12 @@ mod tests { let genesis_block = genesis_block(network); let network_graph = Arc::new(NetworkGraph::new(network, logger.clone())); let scorer = Arc::new(LockingWrapper::new(TestScorer::new())); + let now = Duration::from_secs(genesis_block.header.time as u64); let seed = [i as u8; 32]; - let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), Default::default())); + let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos())); + let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), Arc::clone(&keys_manager), scorer.clone(), Default::default())); let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin)); let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into())); - let now = Duration::from_secs(genesis_block.header.time as u64); - let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos())); let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), kv_store.clone())); let best_block = BestBlock::from_network(network); let params = ChainParameters { network, best_block }; @@ -1340,8 +1343,8 @@ mod tests { fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) { for i in 1..=depth { - let prev_blockhash = node.best_block.block_hash(); - let height = node.best_block.height() + 1; + let prev_blockhash = node.best_block.block_hash; + let height = node.best_block.height + 1; let header = create_dummy_header(prev_blockhash, height); let txdata = vec![(0, tx)]; node.best_block = BestBlock::new(header.block_hash(), height); @@ -1413,7 +1416,7 @@ mod tests { } // Force-close the channel. - nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap(); + nodes[0].node.force_close_broadcasting_latest_txn(&ChannelId::v1_from_funding_outpoint(OutPoint { txid: tx.txid(), index: 0 }), &nodes[1].node.get_our_node_id()).unwrap(); // Check that the force-close updates are persisted. check_persisted_data!(nodes[0].node, filepath.clone());