let mut score = scorer.lock();
match event {
Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
- let path = path.iter().collect::<Vec<_>>();
- score.payment_path_failed(&path, *scid);
+ score.payment_path_failed(path, *scid);
},
Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
// Reached if the destination explicitly failed it back. We treat this as a successful probe
// because the payment made it all the way to the destination with sufficient liquidity.
- let path = path.iter().collect::<Vec<_>>();
- score.probe_successful(&path);
+ score.probe_successful(path);
},
Event::PaymentPathSuccessful { path, .. } => {
- let path = path.iter().collect::<Vec<_>>();
- score.payment_path_successful(&path);
+ score.payment_path_successful(path);
},
Event::ProbeSuccessful { path, .. } => {
- let path = path.iter().collect::<Vec<_>>();
- score.probe_successful(&path);
+ score.probe_successful(path);
},
Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
- let path = path.iter().collect::<Vec<_>>();
- score.probe_failed(&path, *scid);
+ score.probe_failed(path, *scid);
},
_ => {},
}
// persistence.
$peer_manager.process_events();
+ // Exit the loop if the background processor was requested to stop.
+ if $loop_exit_check {
+ log_trace!($logger, "Terminating background processor.");
+ break;
+ }
+
// We wait up to 100ms, but track how long it takes to detect being put to sleep,
// see `await_start`'s use below.
let mut await_start = None;
let updates_available = $await;
let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false };
- if updates_available {
- log_trace!($logger, "Persisting ChannelManager...");
- $persister.persist_manager(&*$channel_manager)?;
- log_trace!($logger, "Done persisting ChannelManager.");
- }
// Exit the loop if the background processor was requested to stop.
if $loop_exit_check {
log_trace!($logger, "Terminating background processor.");
break;
}
+
+ if updates_available {
+ log_trace!($logger, "Persisting ChannelManager...");
+ $persister.persist_manager(&*$channel_manager)?;
+ log_trace!($logger, "Done persisting ChannelManager.");
+ }
if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
$channel_manager.timer_tick_occurred();
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
- /// future which outputs true, the loop will exit and this function's future will complete.
+ /// future which outputs `true`, the loop will exit and this function's future will complete.
+ /// The `sleeper` future is free to return early after it has triggered the exit condition.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
///
/// mobile device, where we may need to check for interruption of the application regularly. If you
/// are unsure, you should set the flag, as the performance impact of it is minimal unless there
/// are hundreds or thousands of simultaneous process calls running.
+ ///
+ /// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
+ /// could setup `process_events_async` like this:
+ /// ```
+ /// # struct MyPersister {}
+ /// # impl lightning::util::persist::KVStorePersister for MyPersister {
+ /// # fn persist<W: lightning::util::ser::Writeable>(&self, key: &str, object: &W) -> lightning::io::Result<()> { Ok(()) }
+ /// # }
+ /// # struct MyEventHandler {}
+ /// # impl MyEventHandler {
+ /// # async fn handle_event(&self, _: lightning::events::Event) {}
+ /// # }
+ /// # #[derive(Eq, PartialEq, Clone, Hash)]
+ /// # struct MySocketDescriptor {}
+ /// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor {
+ /// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
+ /// # fn disconnect_socket(&mut self) {}
+ /// # }
+ /// # use std::sync::{Arc, Mutex};
+ /// # use std::sync::atomic::{AtomicBool, Ordering};
+ /// # use lightning_background_processor::{process_events_async, GossipSync};
+ /// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
+ /// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
+ /// # type MyNodeSigner = dyn lightning::chain::keysinterface::NodeSigner + Send + Sync;
+ /// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
+ /// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
+ /// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
+ /// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::chain::keysinterface::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyPersister>>;
+ /// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyUtxoLookup, MyLogger>;
+ /// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
+ /// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
+ /// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
+ /// # type MyScorer = Mutex<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
+ ///
+ /// # async fn setup_background_processing(my_persister: Arc<MyPersister>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
+ /// let background_persister = Arc::clone(&my_persister);
+ /// let background_event_handler = Arc::clone(&my_event_handler);
+ /// let background_chain_mon = Arc::clone(&my_chain_monitor);
+ /// let background_chan_man = Arc::clone(&my_channel_manager);
+ /// let background_gossip_sync = GossipSync::p2p(Arc::clone(&my_gossip_sync));
+ /// let background_peer_man = Arc::clone(&my_peer_manager);
+ /// let background_logger = Arc::clone(&my_logger);
+ /// let background_scorer = Arc::clone(&my_scorer);
+ ///
+ /// // Setup the sleeper.
+ /// let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
+ ///
+ /// let sleeper = move |d| {
+ /// let mut receiver = stop_receiver.clone();
+ /// Box::pin(async move {
+ /// tokio::select!{
+ /// _ = tokio::time::sleep(d) => false,
+ /// _ = receiver.changed() => true,
+ /// }
+ /// })
+ /// };
+ ///
+ /// let mobile_interruptable_platform = false;
+ ///
+ /// let handle = tokio::spawn(async move {
+ /// process_events_async(
+ /// background_persister,
+ /// |e| background_event_handler.handle_event(e),
+ /// background_chain_mon,
+ /// background_chan_man,
+ /// background_gossip_sync,
+ /// background_peer_man,
+ /// background_logger,
+ /// Some(background_scorer),
+ /// sleeper,
+ /// mobile_interruptable_platform,
+ /// )
+ /// .await
+ /// .expect("Failed to process events");
+ /// });
+ ///
+ /// // Stop the background processing.
+ /// stop_sender.send(()).unwrap();
+ /// handle.await.unwrap();
+ /// # }
+ ///```
#[cfg(feature = "futures")]
pub async fn process_events_async<
'a,
use lightning::ln::msgs::{ChannelMessageHandler, Init};
use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
- use lightning::routing::router::{DefaultRouter, RouteHop};
+ use lightning::routing::router::{DefaultRouter, Path, RouteHop};
use lightning::routing::scoring::{ChannelUsage, Score};
use lightning::util::config::UserConfig;
use lightning::util::ser::Writeable;
#[derive(Debug)]
enum TestResult {
- PaymentFailure { path: Vec<RouteHop>, short_channel_id: u64 },
- PaymentSuccess { path: Vec<RouteHop> },
- ProbeFailure { path: Vec<RouteHop> },
- ProbeSuccess { path: Vec<RouteHop> },
+ PaymentFailure { path: Path, short_channel_id: u64 },
+ PaymentSuccess { path: Path },
+ ProbeFailure { path: Path },
+ ProbeSuccess { path: Path },
}
impl TestScorer {
&self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage
) -> u64 { unimplemented!(); }
- fn payment_path_failed(&mut self, actual_path: &[&RouteHop], actual_short_channel_id: u64) {
+ fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64) {
if let Some(expectations) = &mut self.event_expectations {
match expectations.pop_front().unwrap() {
TestResult::PaymentFailure { path, short_channel_id } => {
- assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
+ assert_eq!(actual_path, &path);
assert_eq!(actual_short_channel_id, short_channel_id);
},
TestResult::PaymentSuccess { path } => {
}
}
- fn payment_path_successful(&mut self, actual_path: &[&RouteHop]) {
+ fn payment_path_successful(&mut self, actual_path: &Path) {
if let Some(expectations) = &mut self.event_expectations {
match expectations.pop_front().unwrap() {
TestResult::PaymentFailure { path, .. } => {
panic!("Unexpected payment path failure: {:?}", path)
},
TestResult::PaymentSuccess { path } => {
- assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
+ assert_eq!(actual_path, &path);
},
TestResult::ProbeFailure { path } => {
panic!("Unexpected probe failure: {:?}", path)
}
}
- fn probe_failed(&mut self, actual_path: &[&RouteHop], _: u64) {
+ fn probe_failed(&mut self, actual_path: &Path, _: u64) {
if let Some(expectations) = &mut self.event_expectations {
match expectations.pop_front().unwrap() {
TestResult::PaymentFailure { path, .. } => {
panic!("Unexpected payment path success: {:?}", path)
},
TestResult::ProbeFailure { path } => {
- assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
+ assert_eq!(actual_path, &path);
},
TestResult::ProbeSuccess { path } => {
panic!("Unexpected probe success: {:?}", path)
}
}
}
- fn probe_successful(&mut self, actual_path: &[&RouteHop]) {
+ fn probe_successful(&mut self, actual_path: &Path) {
if let Some(expectations) = &mut self.event_expectations {
match expectations.pop_front().unwrap() {
TestResult::PaymentFailure { path, .. } => {
panic!("Unexpected probe failure: {:?}", path)
},
TestResult::ProbeSuccess { path } => {
- assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
+ assert_eq!(actual_path, &path);
}
}
}
let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);
- let path = vec![RouteHop {
+ let path = Path { hops: vec![RouteHop {
pubkey: node_1_id,
node_features: NodeFeatures::empty(),
short_channel_id: scored_scid,
channel_features: ChannelFeatures::empty(),
fee_msat: 0,
cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
- }];
+ }], blinded_tail: None };
$nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
$nodes[0].node.push_pending_event(Event::PaymentPathFailed {