#![deny(rustdoc::broken_intra_doc_links)]
#![deny(rustdoc::private_intra_doc_links)]
-
#![deny(missing_docs)]
#![cfg_attr(not(feature = "futures"), deny(unsafe_code))]
-
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
-
#![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
#[cfg(any(test, feature = "std"))]
#[cfg(not(feature = "std"))]
extern crate alloc;
-#[macro_use] extern crate lightning;
+#[macro_use]
+extern crate lightning;
extern crate lightning_rapid_gossip_sync;
use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
-use lightning::events::{Event, PathFailure};
#[cfg(feature = "std")]
use lightning::events::EventHandler;
#[cfg(feature = "std")]
use lightning::events::EventsProvider;
+use lightning::events::{Event, PathFailure};
use lightning::ln::channelmanager::AChannelManager;
use lightning::ln::msgs::OnionMessageHandler;
-use lightning::onion_message::messenger::AOnionMessenger;
use lightning::ln::peer_handler::APeerManager;
+use lightning::onion_message::messenger::AOnionMessenger;
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
-use lightning::routing::utxo::UtxoLookup;
use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
+use lightning::routing::utxo::UtxoLookup;
use lightning::util::logger::Logger;
use lightning::util::persist::Persister;
#[cfg(feature = "std")]
use core::ops::Deref;
use core::time::Duration;
-#[cfg(feature = "std")]
-use std::sync::Arc;
#[cfg(feature = "std")]
use core::sync::atomic::{AtomicBool, Ordering};
#[cfg(feature = "std")]
+use std::sync::Arc;
+#[cfg(feature = "std")]
use std::thread::{self, JoinHandle};
#[cfg(feature = "std")]
use std::time::Instant;
#[cfg(feature = "futures")]
/// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
-const fn min_u64(a: u64, b: u64) -> u64 { if a < b { a } else { b } }
+const fn min_u64(a: u64, b: u64) -> u64 {
+ if a < b {
+ a
+ } else {
+ b
+ }
+}
#[cfg(feature = "futures")]
-const FASTEST_TIMER: u64 = min_u64(min_u64(FRESHNESS_TIMER, PING_TIMER),
- min_u64(SCORER_PERSIST_TIMER, min_u64(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)));
+const FASTEST_TIMER: u64 = min_u64(
+ min_u64(FRESHNESS_TIMER, PING_TIMER),
+ min_u64(SCORER_PERSIST_TIMER, min_u64(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)),
+);
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
pub enum GossipSync<
G: Deref<Target = NetworkGraph<L>>,
U: Deref,
L: Deref,
->
-where U::Target: UtxoLookup, L::Target: Logger {
+> where
+ U::Target: UtxoLookup,
+ L::Target: Logger,
+{
/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
P2P(P),
/// Rapid gossip sync from a trusted server.
}
impl<
- P: Deref<Target = P2PGossipSync<G, U, L>>,
- R: Deref<Target = RapidGossipSync<G, L>>,
- G: Deref<Target = NetworkGraph<L>>,
- U: Deref,
- L: Deref,
-> GossipSync<P, R, G, U, L>
-where U::Target: UtxoLookup, L::Target: Logger {
+ P: Deref<Target = P2PGossipSync<G, U, L>>,
+ R: Deref<Target = RapidGossipSync<G, L>>,
+ G: Deref<Target = NetworkGraph<L>>,
+ U: Deref,
+ L: Deref,
+ > GossipSync<P, R, G, U, L>
+where
+ U::Target: UtxoLookup,
+ L::Target: Logger,
+{
fn network_graph(&self) -> Option<&G> {
match self {
GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
}
/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
-impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
- GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
+impl<
+ P: Deref<Target = P2PGossipSync<G, U, L>>,
+ G: Deref<Target = NetworkGraph<L>>,
+ U: Deref,
+ L: Deref,
+ > GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
where
U::Target: UtxoLookup,
L::Target: Logger,
}
/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
-impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
+impl<
+ 'a,
+ R: Deref<Target = RapidGossipSync<G, L>>,
+ G: Deref<Target = NetworkGraph<L>>,
+ L: Deref,
+ >
GossipSync<
&P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
R,
G,
&'a (dyn UtxoLookup + Send + Sync),
L,
- >
-where
+ > where
L::Target: Logger,
{
/// Initializes a new [`GossipSync::Rapid`] variant.
&'a NetworkGraph<L>,
&'a (dyn UtxoLookup + Send + Sync),
L,
- >
-where
+ > where
L::Target: Logger,
{
/// Initializes a new [`GossipSync::None`] variant.
}
}
-fn handle_network_graph_update<L: Deref>(
- network_graph: &NetworkGraph<L>, event: &Event
-) where L::Target: Logger {
+fn handle_network_graph_update<L: Deref>(network_graph: &NetworkGraph<L>, event: &Event)
+where
+ L::Target: Logger,
+{
if let Event::PaymentPathFailed {
- failure: PathFailure::OnPath { network_update: Some(ref upd) }, .. } = event
+ failure: PathFailure::OnPath { network_update: Some(ref upd) },
+ ..
+ } = event
{
network_graph.handle_network_update(upd);
}
#[cfg(feature = "futures")]
pub(crate) mod futures_util {
use core::future::Future;
- use core::task::{Poll, Waker, RawWaker, RawWakerVTable};
- use core::pin::Pin;
use core::marker::Unpin;
+ use core::pin::Pin;
+ use core::task::{Poll, RawWaker, RawWakerVTable, Waker};
pub(crate) struct Selector<
- A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
+ A: Future<Output = ()> + Unpin,
+ B: Future<Output = ()> + Unpin,
+ C: Future<Output = bool> + Unpin,
> {
pub a: A,
pub b: B,
pub c: C,
}
pub(crate) enum SelectorOutput {
- A, B, C(bool),
+ A,
+ B,
+ C(bool),
}
impl<
- A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
- > Future for Selector<A, B, C> {
+ A: Future<Output = ()> + Unpin,
+ B: Future<Output = ()> + Unpin,
+ C: Future<Output = bool> + Unpin,
+ > Future for Selector<A, B, C>
+ {
type Output = SelectorOutput;
- fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<SelectorOutput> {
+ fn poll(
+ mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>,
+ ) -> Poll<SelectorOutput> {
match Pin::new(&mut self.a).poll(ctx) {
- Poll::Ready(()) => { return Poll::Ready(SelectorOutput::A); },
+ Poll::Ready(()) => {
+ return Poll::Ready(SelectorOutput::A);
+ },
Poll::Pending => {},
}
match Pin::new(&mut self.b).poll(ctx) {
- Poll::Ready(()) => { return Poll::Ready(SelectorOutput::B); },
+ Poll::Ready(()) => {
+ return Poll::Ready(SelectorOutput::B);
+ },
Poll::Pending => {},
}
match Pin::new(&mut self.c).poll(ctx) {
- Poll::Ready(res) => { return Poll::Ready(SelectorOutput::C(res)); },
+ Poll::Ready(res) => {
+ return Poll::Ready(SelectorOutput::C(res));
+ },
Poll::Pending => {},
}
Poll::Pending
// If we want to poll a future without an async context to figure out if it has completed or
// not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
// but sadly there's a good bit of boilerplate here.
- fn dummy_waker_clone(_: *const ()) -> RawWaker { RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE) }
- fn dummy_waker_action(_: *const ()) { }
+ fn dummy_waker_clone(_: *const ()) -> RawWaker {
+ RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)
+ }
+ fn dummy_waker_action(_: *const ()) {}
const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
- dummy_waker_clone, dummy_waker_action, dummy_waker_action, dummy_waker_action);
- pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } }
+ dummy_waker_clone,
+ dummy_waker_action,
+ dummy_waker_action,
+ dummy_waker_action,
+ );
+ pub(crate) fn dummy_waker() -> Waker {
+ unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) }
+ }
}
#[cfg(feature = "futures")]
-use futures_util::{Selector, SelectorOutput, dummy_waker};
-#[cfg(feature = "futures")]
use core::task;
+#[cfg(feature = "futures")]
+use futures_util::{dummy_waker, Selector, SelectorOutput};
/// Processes background events in a future.
///
EventHandlerFuture: core::future::Future<Output = ()>,
EventHandler: Fn(Event) -> EventHandlerFuture,
PS: 'static + Deref + Send,
- M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
+ M: 'static
+ + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>>
+ + Send
+ + Sync,
CM: 'static + Deref + Send + Sync,
OM: 'static + Deref + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
FetchTime: Fn() -> Option<Duration>,
>(
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
- onion_messenger: Option<OM>,
- gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
- sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
+ onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
+ logger: L, scorer: Option<S>, sleeper: Sleeper, mobile_interruptable_platform: bool,
+ fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
UL::Target: 'static + UtxoLookup,
})
};
define_run_body!(
- persister, chain_monitor,
+ persister,
+ chain_monitor,
chain_monitor.process_pending_events_async(async_event_handler).await,
- channel_manager, channel_manager.get_cm().process_pending_events_async(async_event_handler).await,
- onion_messenger, if let Some(om) = &onion_messenger { om.get_om().process_pending_events_async(async_event_handler).await },
- peer_manager, gossip_sync, logger, scorer, should_break, {
+ channel_manager,
+ channel_manager.get_cm().process_pending_events_async(async_event_handler).await,
+ onion_messenger,
+ if let Some(om) = &onion_messenger {
+ om.get_om().process_pending_events_async(async_event_handler).await
+ },
+ peer_manager,
+ gossip_sync,
+ logger,
+ scorer,
+ should_break,
+ {
let fut = Selector {
a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
b: chain_monitor.get_update_future(),
- c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
+ c: sleeper(if mobile_interruptable_platform {
+ Duration::from_millis(100)
+ } else {
+ Duration::from_secs(FASTEST_TIMER)
+ }),
};
match fut.await {
- SelectorOutput::A|SelectorOutput::B => {},
+ SelectorOutput::A | SelectorOutput::B => {},
SelectorOutput::C(exit) => {
should_break = exit;
- }
+ },
}
- }, |t| sleeper(Duration::from_secs(t)),
+ },
+ |t| sleeper(Duration::from_secs(t)),
|fut: &mut SleepFuture, _| {
let mut waker = dummy_waker();
let mut ctx = task::Context::from_waker(&mut waker);
match core::pin::Pin::new(fut).poll(&mut ctx) {
- task::Poll::Ready(exit) => { should_break = exit; true },
+ task::Poll::Ready(exit) => {
+ should_break = exit;
+ true
+ },
task::Poll::Pending => false,
}
- }, mobile_interruptable_platform, fetch_time,
+ },
+ mobile_interruptable_platform,
+ fetch_time,
)
}
P: 'static + Deref + Send + Sync,
EH: 'static + EventHandler + Send,
PS: 'static + Deref + Send,
- M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
+ M: 'static
+ + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>>
+ + Send
+ + Sync,
CM: 'static + Deref + Send + Sync,
OM: 'static + Deref + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
PM: 'static + Deref + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync,
- SC: for <'b> WriteableScore<'b>,
+ SC: for<'b> WriteableScore<'b>,
>(
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
- onion_messenger: Option<OM>,
- gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
+ onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
+ logger: L, scorer: Option<S>,
) -> Self
where
UL::Target: 'static + UtxoLookup,
}
if let Some(ref scorer) = scorer {
use std::time::SystemTime;
- let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
+ let duration_since_epoch = SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)
.expect("Time should be sometime after 1970");
if update_scorer(scorer, &event, duration_since_epoch) {
log_trace!(logger, "Persisting scorer after update");
event_handler.handle_event(event);
};
define_run_body!(
- persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
- channel_manager, channel_manager.get_cm().process_pending_events(&event_handler),
- onion_messenger, if let Some(om) = &onion_messenger { om.get_om().process_pending_events(&event_handler) },
- peer_manager, gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
- { Sleeper::from_two_futures(
- &channel_manager.get_cm().get_event_or_persistence_needed_future(),
- &chain_monitor.get_update_future()
- ).wait_timeout(Duration::from_millis(100)); },
- |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false,
+ persister,
+ chain_monitor,
+ chain_monitor.process_pending_events(&event_handler),
+ channel_manager,
+ channel_manager.get_cm().process_pending_events(&event_handler),
+ onion_messenger,
+ if let Some(om) = &onion_messenger {
+ om.get_om().process_pending_events(&event_handler)
+ },
+ peer_manager,
+ gossip_sync,
+ logger,
+ scorer,
+ stop_thread.load(Ordering::Acquire),
+ {
+ Sleeper::from_two_futures(
+ &channel_manager.get_cm().get_event_or_persistence_needed_future(),
+ &chain_monitor.get_update_future(),
+ )
+ .wait_timeout(Duration::from_millis(100));
+ },
+ |_| Instant::now(),
+ |time: &Instant, dur| time.elapsed().as_secs() > dur,
+ false,
|| {
use std::time::SystemTime;
- Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
- .expect("Time should be sometime after 1970"))
+ Some(
+ SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .expect("Time should be sometime after 1970"),
+ )
},
)
});
#[cfg(all(feature = "std", test))]
mod tests {
- use bitcoin::{Amount, ScriptBuf, Txid};
+ use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
use bitcoin::blockdata::constants::{genesis_block, ChainHash};
use bitcoin::blockdata::locktime::absolute::LockTime;
use bitcoin::blockdata::transaction::{Transaction, TxOut};
use bitcoin::hashes::Hash;
use bitcoin::network::Network;
- use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
+ use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
use bitcoin::transaction::Version;
- use lightning::chain::{BestBlock, Confirm, chainmonitor, Filter};
+ use bitcoin::{Amount, ScriptBuf, Txid};
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
- use lightning::sign::{InMemorySigner, KeysManager, ChangeDestinationSource};
use lightning::chain::transaction::OutPoint;
- use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent};
- use lightning::{get_event_msg, get_event};
- use lightning::ln::types::{PaymentHash, ChannelId};
+ use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
+ use lightning::events::{Event, MessageSendEvent, MessageSendEventsProvider, PathFailure};
use lightning::ln::channelmanager;
- use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
+ use lightning::ln::channelmanager::{
+ ChainParameters, PaymentId, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA,
+ };
use lightning::ln::features::{ChannelFeatures, NodeFeatures};
use lightning::ln::functional_test_utils::*;
use lightning::ln::msgs::{ChannelMessageHandler, Init};
- use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
+ use lightning::ln::peer_handler::{
+ IgnoringMessageHandler, MessageHandler, PeerManager, SocketDescriptor,
+ };
+ use lightning::ln::types::{ChannelId, PaymentHash};
use lightning::onion_message::messenger::{DefaultMessageRouter, OnionMessenger};
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
- use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp, LockableScore};
- use lightning::routing::router::{DefaultRouter, Path, RouteHop, CandidateRouteHop};
+ use lightning::routing::router::{CandidateRouteHop, DefaultRouter, Path, RouteHop};
+ use lightning::routing::scoring::{ChannelUsage, LockableScore, ScoreLookUp, ScoreUpdate};
+ use lightning::sign::{ChangeDestinationSource, InMemorySigner, KeysManager};
use lightning::util::config::UserConfig;
+ use lightning::util::persist::{
+ KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+ CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
+ NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
+ SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
+ SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
+ };
use lightning::util::ser::Writeable;
+ use lightning::util::sweep::{OutputSpendStatus, OutputSweeper};
use lightning::util::test_utils;
- use lightning::util::persist::{KVStore,
- CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY,
- NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
- SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY};
- use lightning::util::sweep::{OutputSweeper, OutputSpendStatus};
+ use lightning::{get_event, get_event_msg};
use lightning_persister::fs_store::FilesystemStore;
+ use lightning_rapid_gossip_sync::RapidGossipSync;
use std::collections::VecDeque;
- use std::{fs, env};
use std::path::PathBuf;
- use std::sync::{Arc, Mutex};
use std::sync::mpsc::SyncSender;
+ use std::sync::{Arc, Mutex};
use std::time::Duration;
- use lightning_rapid_gossip_sync::RapidGossipSync;
- use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
+ use std::{env, fs};
const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
#[derive(Clone, Hash, PartialEq, Eq)]
- struct TestDescriptor{}
+ struct TestDescriptor {}
impl SocketDescriptor for TestDescriptor {
fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
0
#[cfg(not(c_bindings))]
type LockingWrapper<T> = Mutex<T>;
- type ChannelManager =
- channelmanager::ChannelManager<
- Arc<ChainMonitor>,
- Arc<test_utils::TestBroadcaster>,
- Arc<KeysManager>,
- Arc<KeysManager>,
- Arc<KeysManager>,
- Arc<test_utils::TestFeeEstimator>,
- Arc<DefaultRouter<
+ type ChannelManager = channelmanager::ChannelManager<
+ Arc<ChainMonitor>,
+ Arc<test_utils::TestBroadcaster>,
+ Arc<KeysManager>,
+ Arc<KeysManager>,
+ Arc<KeysManager>,
+ Arc<test_utils::TestFeeEstimator>,
+ Arc<
+ DefaultRouter<
Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
Arc<test_utils::TestLogger>,
Arc<KeysManager>,
Arc<LockingWrapper<TestScorer>>,
(),
- TestScorer>
+ TestScorer,
>,
- Arc<test_utils::TestLogger>>;
-
- type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemStore>>;
-
- type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
- type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
-
- type OM = OnionMessenger<Arc<KeysManager>, Arc<KeysManager>, Arc<test_utils::TestLogger>, Arc<ChannelManager>, Arc<DefaultMessageRouter<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>, Arc<KeysManager>>>, IgnoringMessageHandler, Arc<ChannelManager>, IgnoringMessageHandler>;
+ >,
+ Arc<test_utils::TestLogger>,
+ >;
+
+ type ChainMonitor = chainmonitor::ChainMonitor<
+ InMemorySigner,
+ Arc<test_utils::TestChainSource>,
+ Arc<test_utils::TestBroadcaster>,
+ Arc<test_utils::TestFeeEstimator>,
+ Arc<test_utils::TestLogger>,
+ Arc<FilesystemStore>,
+ >;
+
+ type PGS = Arc<
+ P2PGossipSync<
+ Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
+ Arc<test_utils::TestChainSource>,
+ Arc<test_utils::TestLogger>,
+ >,
+ >;
+ type RGS = Arc<
+ RapidGossipSync<
+ Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
+ Arc<test_utils::TestLogger>,
+ >,
+ >;
+
+ type OM = OnionMessenger<
+ Arc<KeysManager>,
+ Arc<KeysManager>,
+ Arc<test_utils::TestLogger>,
+ Arc<ChannelManager>,
+ Arc<
+ DefaultMessageRouter<
+ Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
+ Arc<test_utils::TestLogger>,
+ Arc<KeysManager>,
+ >,
+ >,
+ IgnoringMessageHandler,
+ Arc<ChannelManager>,
+ IgnoringMessageHandler,
+ >;
struct Node {
node: Arc<ChannelManager>,
messenger: Arc<OM>,
p2p_gossip_sync: PGS,
rapid_gossip_sync: RGS,
- peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<OM>, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
+ peer_manager: Arc<
+ PeerManager<
+ TestDescriptor,
+ Arc<test_utils::TestChannelMessageHandler>,
+ Arc<test_utils::TestRoutingMessageHandler>,
+ Arc<OM>,
+ Arc<test_utils::TestLogger>,
+ IgnoringMessageHandler,
+ Arc<KeysManager>,
+ >,
+ >,
chain_monitor: Arc<ChainMonitor>,
kv_store: Arc<FilesystemStore>,
tx_broadcaster: Arc<test_utils::TestBroadcaster>,
logger: Arc<test_utils::TestLogger>,
best_block: BestBlock,
scorer: Arc<LockingWrapper<TestScorer>>,
- sweeper: Arc<OutputSweeper<Arc<test_utils::TestBroadcaster>, Arc<TestWallet>,
- Arc<test_utils::TestFeeEstimator>, Arc<dyn Filter + Sync + Send>, Arc<FilesystemStore>,
- Arc<test_utils::TestLogger>, Arc<KeysManager>>>,
+ sweeper: Arc<
+ OutputSweeper<
+ Arc<test_utils::TestBroadcaster>,
+ Arc<TestWallet>,
+ Arc<test_utils::TestFeeEstimator>,
+ Arc<dyn Filter + Sync + Send>,
+ Arc<FilesystemStore>,
+ Arc<test_utils::TestLogger>,
+ Arc<KeysManager>,
+ >,
+ >,
}
impl Node {
- fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
+ fn p2p_gossip_sync(
+ &self,
+ ) -> GossipSync<
+ PGS,
+ RGS,
+ Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
+ Arc<test_utils::TestChainSource>,
+ Arc<test_utils::TestLogger>,
+ > {
GossipSync::P2P(self.p2p_gossip_sync.clone())
}
- fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
+ fn rapid_gossip_sync(
+ &self,
+ ) -> GossipSync<
+ PGS,
+ RGS,
+ Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
+ Arc<test_utils::TestChainSource>,
+ Arc<test_utils::TestLogger>,
+ > {
GossipSync::Rapid(self.rapid_gossip_sync.clone())
}
- fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
+ fn no_gossip_sync(
+ &self,
+ ) -> GossipSync<
+ PGS,
+ RGS,
+ Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
+ Arc<test_utils::TestChainSource>,
+ Arc<test_utils::TestLogger>,
+ > {
GossipSync::None
}
}
fn drop(&mut self) {
let data_dir = self.kv_store.get_data_dir();
match fs::remove_dir_all(data_dir.clone()) {
- Err(e) => println!("Failed to remove test store directory {}: {}", data_dir.display(), e),
- _ => {}
+ Err(e) => {
+ println!("Failed to remove test store directory {}: {}", data_dir.display(), e)
+ },
+ _ => {},
}
}
}
impl Persister {
fn new(data_dir: PathBuf) -> Self {
let kv_store = FilesystemStore::new(data_dir);
- Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, kv_store }
+ Self {
+ graph_error: None,
+ graph_persistence_notifier: None,
+ manager_error: None,
+ scorer_error: None,
+ kv_store,
+ }
}
fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
}
impl KVStore for Persister {
- fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> lightning::io::Result<Vec<u8>> {
+ fn read(
+ &self, primary_namespace: &str, secondary_namespace: &str, key: &str,
+ ) -> lightning::io::Result<Vec<u8>> {
self.kv_store.read(primary_namespace, secondary_namespace, key)
}
- fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> lightning::io::Result<()> {
- if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE &&
- secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE &&
- key == CHANNEL_MANAGER_PERSISTENCE_KEY
+ fn write(
+ &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
+ ) -> lightning::io::Result<()> {
+ if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE
+ && secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE
+ && key == CHANNEL_MANAGER_PERSISTENCE_KEY
{
if let Some((error, message)) = self.manager_error {
- return Err(std::io::Error::new(error, message))
+ return Err(std::io::Error::new(error, message));
}
}
- if primary_namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE &&
- secondary_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE &&
- key == NETWORK_GRAPH_PERSISTENCE_KEY
+ if primary_namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE
+ && secondary_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE
+ && key == NETWORK_GRAPH_PERSISTENCE_KEY
{
if let Some(sender) = &self.graph_persistence_notifier {
match sender.send(()) {
Ok(()) => {},
- Err(std::sync::mpsc::SendError(())) => println!("Persister failed to notify as receiver went away."),
+ Err(std::sync::mpsc::SendError(())) => {
+ println!("Persister failed to notify as receiver went away.")
+ },
}
};
if let Some((error, message)) = self.graph_error {
- return Err(std::io::Error::new(error, message))
+ return Err(std::io::Error::new(error, message));
}
}
- if primary_namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE &&
- secondary_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE &&
- key == SCORER_PERSISTENCE_KEY
+ if primary_namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE
+ && secondary_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE
+ && key == SCORER_PERSISTENCE_KEY
{
if let Some((error, message)) = self.scorer_error {
- return Err(std::io::Error::new(error, message))
+ return Err(std::io::Error::new(error, message));
}
}
self.kv_store.write(primary_namespace, secondary_namespace, key, buf)
}
- fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> lightning::io::Result<()> {
+ fn remove(
+ &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
+ ) -> lightning::io::Result<()> {
self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy)
}
- fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> lightning::io::Result<Vec<String>> {
+ fn list(
+ &self, primary_namespace: &str, secondary_namespace: &str,
+ ) -> lightning::io::Result<Vec<String>> {
self.kv_store.list(primary_namespace, secondary_namespace)
}
}
}
impl lightning::util::ser::Writeable for TestScorer {
- fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
+ fn write<W: lightning::util::ser::Writer>(
+ &self, _: &mut W,
+ ) -> Result<(), lightning::io::Error> {
+ Ok(())
+ }
}
impl ScoreLookUp for TestScorer {
type ScoreParams = ();
fn channel_penalty_msat(
- &self, _candidate: &CandidateRouteHop, _usage: ChannelUsage, _score_params: &Self::ScoreParams
- ) -> u64 { unimplemented!(); }
+ &self, _candidate: &CandidateRouteHop, _usage: ChannelUsage,
+ _score_params: &Self::ScoreParams,
+ ) -> u64 {
+ unimplemented!();
+ }
}
impl ScoreUpdate for TestScorer {
- fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration) {
+ fn payment_path_failed(
+ &mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration,
+ ) {
if let Some(expectations) = &mut self.event_expectations {
match expectations.pop_front().unwrap() {
TestResult::PaymentFailure { path, short_channel_id } => {
},
TestResult::ProbeSuccess { path } => {
panic!("Unexpected probe success: {:?}", path)
- }
+ },
}
}
}
},
TestResult::ProbeSuccess { path } => {
panic!("Unexpected probe success: {:?}", path)
- }
+ },
}
}
}
},
TestResult::ProbeSuccess { path } => {
panic!("Unexpected probe success: {:?}", path)
- }
+ },
}
}
}
},
TestResult::ProbeSuccess { path } => {
assert_eq!(actual_path, &path);
- }
+ },
}
}
}
let mut nodes = Vec::new();
for i in 0..num_nodes {
let tx_broadcaster = Arc::new(test_utils::TestBroadcaster::new(network));
- let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
+ let fee_estimator =
+ Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
let genesis_block = genesis_block(network);
let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
let now = Duration::from_secs(genesis_block.header.time as u64);
let seed = [i as u8; 32];
let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
- let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), Arc::clone(&keys_manager), scorer.clone(), Default::default()));
- let msg_router = Arc::new(DefaultMessageRouter::new(network_graph.clone(), Arc::clone(&keys_manager)));
+ let router = Arc::new(DefaultRouter::new(
+ network_graph.clone(),
+ logger.clone(),
+ Arc::clone(&keys_manager),
+ scorer.clone(),
+ Default::default(),
+ ));
+ let msg_router = Arc::new(DefaultMessageRouter::new(
+ network_graph.clone(),
+ Arc::clone(&keys_manager),
+ ));
let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
- let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
+ let kv_store =
+ Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
let now = Duration::from_secs(genesis_block.header.time as u64);
let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
- let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), kv_store.clone()));
+ let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(
+ Some(chain_source.clone()),
+ tx_broadcaster.clone(),
+ logger.clone(),
+ fee_estimator.clone(),
+ kv_store.clone(),
+ ));
let best_block = BestBlock::from_network(network);
let params = ChainParameters { network, best_block };
- let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params, genesis_block.header.time));
- let messenger = Arc::new(OnionMessenger::new(keys_manager.clone(), keys_manager.clone(), logger.clone(), manager.clone(), msg_router.clone(), IgnoringMessageHandler {}, manager.clone(), IgnoringMessageHandler {}));
+ let manager = Arc::new(ChannelManager::new(
+ fee_estimator.clone(),
+ chain_monitor.clone(),
+ tx_broadcaster.clone(),
+ router.clone(),
+ logger.clone(),
+ keys_manager.clone(),
+ keys_manager.clone(),
+ keys_manager.clone(),
+ UserConfig::default(),
+ params,
+ genesis_block.header.time,
+ ));
+ let messenger = Arc::new(OnionMessenger::new(
+ keys_manager.clone(),
+ keys_manager.clone(),
+ logger.clone(),
+ manager.clone(),
+ msg_router.clone(),
+ IgnoringMessageHandler {},
+ manager.clone(),
+ IgnoringMessageHandler {},
+ ));
let wallet = Arc::new(TestWallet {});
- let sweeper = Arc::new(OutputSweeper::new(best_block, Arc::clone(&tx_broadcaster), Arc::clone(&fee_estimator),
- None::<Arc<dyn Filter + Sync + Send>>, Arc::clone(&keys_manager), wallet, Arc::clone(&kv_store), Arc::clone(&logger)));
- let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
- let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
+ let sweeper = Arc::new(OutputSweeper::new(
+ best_block,
+ Arc::clone(&tx_broadcaster),
+ Arc::clone(&fee_estimator),
+ None::<Arc<dyn Filter + Sync + Send>>,
+ Arc::clone(&keys_manager),
+ wallet,
+ Arc::clone(&kv_store),
+ Arc::clone(&logger),
+ ));
+ let p2p_gossip_sync = Arc::new(P2PGossipSync::new(
+ network_graph.clone(),
+ Some(chain_source.clone()),
+ logger.clone(),
+ ));
+ let rapid_gossip_sync =
+ Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
let msg_handler = MessageHandler {
- chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet))),
+ chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(
+ ChainHash::using_genesis_block(Network::Testnet),
+ )),
route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()),
- onion_message_handler: messenger.clone(), custom_message_handler: IgnoringMessageHandler{}
+ onion_message_handler: messenger.clone(),
+ custom_message_handler: IgnoringMessageHandler {},
+ };
+ let peer_manager = Arc::new(PeerManager::new(
+ msg_handler,
+ 0,
+ &seed,
+ logger.clone(),
+ keys_manager.clone(),
+ ));
+ let node = Node {
+ node: manager,
+ p2p_gossip_sync,
+ rapid_gossip_sync,
+ peer_manager,
+ chain_monitor,
+ kv_store,
+ tx_broadcaster,
+ network_graph,
+ logger,
+ best_block,
+ scorer,
+ sweeper,
+ messenger,
};
- let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), keys_manager.clone()));
- let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, kv_store, tx_broadcaster, network_graph, logger, best_block, scorer, sweeper, messenger };
nodes.push(node);
}
for i in 0..num_nodes {
- for j in (i+1)..num_nodes {
+ for j in (i + 1)..num_nodes {
let init_i = Init {
- features: nodes[j].node.init_features(), networks: None, remote_network_address: None
+ features: nodes[j].node.init_features(),
+ networks: None,
+ remote_network_address: None,
};
- nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &init_i, true).unwrap();
+ nodes[i]
+ .node
+ .peer_connected(&nodes[j].node.get_our_node_id(), &init_i, true)
+ .unwrap();
let init_j = Init {
- features: nodes[i].node.init_features(), networks: None, remote_network_address: None
+ features: nodes[i].node.init_features(),
+ networks: None,
+ remote_network_address: None,
};
- nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &init_j, false).unwrap();
+ nodes[j]
+ .node
+ .peer_connected(&nodes[i].node.get_our_node_id(), &init_j, false)
+ .unwrap();
}
}
begin_open_channel!($node_a, $node_b, $channel_value);
let events = $node_a.node.get_and_clear_pending_events();
assert_eq!(events.len(), 1);
- let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
- $node_a.node.funding_transaction_generated(&temporary_channel_id, &$node_b.node.get_our_node_id(), tx.clone()).unwrap();
- let msg_a = get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id());
+ let (temporary_channel_id, tx) =
+ handle_funding_generation_ready!(events[0], $channel_value);
+ $node_a
+ .node
+ .funding_transaction_generated(
+ &temporary_channel_id,
+ &$node_b.node.get_our_node_id(),
+ tx.clone(),
+ )
+ .unwrap();
+ let msg_a = get_event_msg!(
+ $node_a,
+ MessageSendEvent::SendFundingCreated,
+ $node_b.node.get_our_node_id()
+ );
$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &msg_a);
get_event!($node_b, Event::ChannelPending);
- let msg_b = get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id());
+ let msg_b = get_event_msg!(
+ $node_b,
+ MessageSendEvent::SendFundingSigned,
+ $node_a.node.get_our_node_id()
+ );
$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &msg_b);
get_event!($node_a, Event::ChannelPending);
tx
- }}
+ }};
}
macro_rules! begin_open_channel {
($node_a: expr, $node_b: expr, $channel_value: expr) => {{
- $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None, None).unwrap();
- let msg_a = get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id());
+ $node_a
+ .node
+ .create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None, None)
+ .unwrap();
+ let msg_a = get_event_msg!(
+ $node_a,
+ MessageSendEvent::SendOpenChannel,
+ $node_b.node.get_our_node_id()
+ );
$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &msg_a);
- let msg_b = get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id());
+ let msg_b = get_event_msg!(
+ $node_b,
+ MessageSendEvent::SendAcceptChannel,
+ $node_a.node.get_our_node_id()
+ );
$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &msg_b);
- }}
+ }};
}
macro_rules! handle_funding_generation_ready {
($event: expr, $channel_value: expr) => {{
match $event {
- Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
+ Event::FundingGenerationReady {
+ temporary_channel_id,
+ channel_value_satoshis,
+ ref output_script,
+ user_channel_id,
+ ..
+ } => {
assert_eq!(channel_value_satoshis, $channel_value);
assert_eq!(user_channel_id, 42);
- let tx = Transaction { version: Version::ONE, lock_time: LockTime::ZERO, input: Vec::new(), output: vec![TxOut {
- value: Amount::from_sat(channel_value_satoshis), script_pubkey: output_script.clone(),
- }]};
+ let tx = Transaction {
+ version: Version::ONE,
+ lock_time: LockTime::ZERO,
+ input: Vec::new(),
+ output: vec![TxOut {
+ value: Amount::from_sat(channel_value_satoshis),
+ script_pubkey: output_script.clone(),
+ }],
+ };
(temporary_channel_id, tx)
},
_ => panic!("Unexpected event"),
}
- }}
+ }};
}
fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
let data_dir = nodes[0].kv_store.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
let event_handler = |_: _| {};
- let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+ let bg_processor = BackgroundProcessor::start(
+ persister,
+ event_handler,
+ nodes[0].chain_monitor.clone(),
+ nodes[0].node.clone(),
+ Some(nodes[0].messenger.clone()),
+ nodes[0].p2p_gossip_sync(),
+ nodes[0].peer_manager.clone(),
+ nodes[0].logger.clone(),
+ Some(nodes[0].scorer.clone()),
+ );
macro_rules! check_persisted_data {
($node: expr, $filepath: expr) => {
loop {
expected_bytes.clear();
match $node.write(&mut expected_bytes) {
- Ok(()) => {
- match std::fs::read($filepath) {
- Ok(bytes) => {
- if bytes == expected_bytes {
- break
- } else {
- continue
- }
- },
- Err(_) => continue
- }
+ Ok(()) => match std::fs::read($filepath) {
+ Ok(bytes) => {
+ if bytes == expected_bytes {
+ break;
+ } else {
+ continue;
+ }
+ },
+ Err(_) => continue,
},
- Err(e) => panic!("Unexpected error: {}", e)
+ Err(e) => panic!("Unexpected error: {}", e),
}
}
- }
+ };
}
// Check that the initial channel manager data is persisted as expected.
- let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "manager".to_string());
+ let filepath =
+ get_full_filepath(format!("{}_persister_0", &persist_dir), "manager".to_string());
check_persisted_data!(nodes[0].node, filepath.clone());
loop {
- if !nodes[0].node.get_event_or_persist_condvar_value() { break }
+ if !nodes[0].node.get_event_or_persist_condvar_value() {
+ break;
+ }
}
// Force-close the channel.
let error_message = "Channel force-closed";
- nodes[0].node.force_close_broadcasting_latest_txn(&ChannelId::v1_from_funding_outpoint(OutPoint { txid: tx.txid(), index: 0 }), &nodes[1].node.get_our_node_id(), error_message.to_string()).unwrap();
+ nodes[0]
+ .node
+ .force_close_broadcasting_latest_txn(
+ &ChannelId::v1_from_funding_outpoint(OutPoint { txid: tx.txid(), index: 0 }),
+ &nodes[1].node.get_our_node_id(),
+ error_message.to_string(),
+ )
+ .unwrap();
// Check that the force-close updates are persisted.
check_persisted_data!(nodes[0].node, filepath.clone());
loop {
- if !nodes[0].node.get_event_or_persist_condvar_value() { break }
+ if !nodes[0].node.get_event_or_persist_condvar_value() {
+ break;
+ }
}
// Check network graph is persisted
- let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "network_graph".to_string());
+ let filepath =
+ get_full_filepath(format!("{}_persister_0", &persist_dir), "network_graph".to_string());
check_persisted_data!(nodes[0].network_graph, filepath.clone());
// Check scorer is persisted
- let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "scorer".to_string());
+ let filepath =
+ get_full_filepath(format!("{}_persister_0", &persist_dir), "scorer".to_string());
check_persisted_data!(nodes[0].scorer, filepath.clone());
if !std::thread::panicking() {
let data_dir = nodes[0].kv_store.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
let event_handler = |_: _| {};
- let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+ let bg_processor = BackgroundProcessor::start(
+ persister,
+ event_handler,
+ nodes[0].chain_monitor.clone(),
+ nodes[0].node.clone(),
+ Some(nodes[0].messenger.clone()),
+ nodes[0].no_gossip_sync(),
+ nodes[0].peer_manager.clone(),
+ nodes[0].logger.clone(),
+ Some(nodes[0].scorer.clone()),
+ );
loop {
let log_entries = nodes[0].logger.lines.lock().unwrap();
let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
let desired_log_4 = "Calling OnionMessageHandler's timer_tick_occurred".to_string();
- if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some() &&
- log_entries.get(&("lightning_background_processor", desired_log_2)).is_some() &&
- log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() &&
- log_entries.get(&("lightning_background_processor", desired_log_4)).is_some() {
- break
+ if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some()
+ && log_entries.get(&("lightning_background_processor", desired_log_2)).is_some()
+ && log_entries.get(&("lightning_background_processor", desired_log_3)).is_some()
+ && log_entries.get(&("lightning_background_processor", desired_log_4)).is_some()
+ {
+ break;
}
}
open_channel!(nodes[0], nodes[1], 100000);
let data_dir = nodes[0].kv_store.get_data_dir();
- let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
+ let persister = Arc::new(
+ Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
+ );
let event_handler = |_: _| {};
- let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+ let bg_processor = BackgroundProcessor::start(
+ persister,
+ event_handler,
+ nodes[0].chain_monitor.clone(),
+ nodes[0].node.clone(),
+ Some(nodes[0].messenger.clone()),
+ nodes[0].no_gossip_sync(),
+ nodes[0].peer_manager.clone(),
+ nodes[0].logger.clone(),
+ Some(nodes[0].scorer.clone()),
+ );
match bg_processor.join() {
Ok(_) => panic!("Expected error persisting manager"),
Err(e) => {
open_channel!(nodes[0], nodes[1], 100000);
let data_dir = nodes[0].kv_store.get_data_dir();
- let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
+ let persister = Arc::new(
+ Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
+ );
let bp_future = super::process_events_async(
- persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()),
- nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
- Some(nodes[0].scorer.clone()), move |dur: Duration| {
+ persister,
+ |_: _| async {},
+ nodes[0].chain_monitor.clone(),
+ nodes[0].node.clone(),
+ Some(nodes[0].messenger.clone()),
+ nodes[0].rapid_gossip_sync(),
+ nodes[0].peer_manager.clone(),
+ nodes[0].logger.clone(),
+ Some(nodes[0].scorer.clone()),
+ move |dur: Duration| {
Box::pin(async move {
tokio::time::sleep(dur).await;
false // Never exit
})
- }, false, || Some(Duration::ZERO),
+ },
+ false,
+ || Some(Duration::ZERO),
);
match bp_future.await {
Ok(_) => panic!("Expected error persisting manager"),
// Test that if we encounter an error during network graph persistence, an error gets returned.
let (_, nodes) = create_nodes(2, "test_persist_network_graph_error");
let data_dir = nodes[0].kv_store.get_data_dir();
- let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
+ let persister =
+ Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
let event_handler = |_: _| {};
- let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+ let bg_processor = BackgroundProcessor::start(
+ persister,
+ event_handler,
+ nodes[0].chain_monitor.clone(),
+ nodes[0].node.clone(),
+ Some(nodes[0].messenger.clone()),
+ nodes[0].p2p_gossip_sync(),
+ nodes[0].peer_manager.clone(),
+ nodes[0].logger.clone(),
+ Some(nodes[0].scorer.clone()),
+ );
match bg_processor.stop() {
Ok(_) => panic!("Expected error persisting network graph"),
// Test that if we encounter an error during scorer persistence, an error gets returned.
let (_, nodes) = create_nodes(2, "test_persist_scorer_error");
let data_dir = nodes[0].kv_store.get_data_dir();
- let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
+ let persister =
+ Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
let event_handler = |_: _| {};
- let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+ let bg_processor = BackgroundProcessor::start(
+ persister,
+ event_handler,
+ nodes[0].chain_monitor.clone(),
+ nodes[0].node.clone(),
+ Some(nodes[0].messenger.clone()),
+ nodes[0].no_gossip_sync(),
+ nodes[0].peer_manager.clone(),
+ nodes[0].logger.clone(),
+ Some(nodes[0].scorer.clone()),
+ );
match bg_processor.stop() {
Ok(_) => panic!("Expected error persisting scorer"),
let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
let event_handler = move |event: Event| match event {
- Event::FundingGenerationReady { .. } => funding_generation_send.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
+ Event::FundingGenerationReady { .. } => funding_generation_send
+ .send(handle_funding_generation_ready!(event, channel_value))
+ .unwrap(),
Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
Event::ChannelReady { .. } => {},
_ => panic!("Unexpected event: {:?}", event),
};
- let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+ let bg_processor = BackgroundProcessor::start(
+ persister,
+ event_handler,
+ nodes[0].chain_monitor.clone(),
+ nodes[0].node.clone(),
+ Some(nodes[0].messenger.clone()),
+ nodes[0].no_gossip_sync(),
+ nodes[0].peer_manager.clone(),
+ nodes[0].logger.clone(),
+ Some(nodes[0].scorer.clone()),
+ );
// Open a channel and check that the FundingGenerationReady event was handled.
begin_open_channel!(nodes[0], nodes[1], channel_value);
let (temporary_channel_id, funding_tx) = funding_generation_recv
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
.expect("FundingGenerationReady not handled within deadline");
- nodes[0].node.funding_transaction_generated(&temporary_channel_id, &node_1_id, funding_tx.clone()).unwrap();
+ nodes[0]
+ .node
+ .funding_transaction_generated(&temporary_channel_id, &node_1_id, funding_tx.clone())
+ .unwrap();
let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
nodes[1].node.handle_funding_created(&node_0_id, &msg_0);
get_event!(nodes[1], Event::ChannelPending);
let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
nodes[0].node.handle_funding_signed(&node_1_id, &msg_1);
- let _ = channel_pending_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
+ let _ = channel_pending_recv
+ .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
.expect("ChannelPending not handled within deadline");
// Confirm the funding transaction.
confirm_transaction(&mut nodes[1], &funding_tx);
let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, node_0_id);
nodes[0].node.handle_channel_ready(&node_1_id, &bs_funding);
- let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, node_1_id);
+ let _as_channel_update =
+ get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, node_1_id);
nodes[1].node.handle_channel_ready(&node_0_id, &as_funding);
- let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, node_0_id);
- let broadcast_funding = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
+ let _bs_channel_update =
+ get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, node_0_id);
+ let broadcast_funding =
+ nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
assert_eq!(broadcast_funding.txid(), funding_tx.txid());
assert!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty());
_ => panic!("Unexpected event: {:?}", event),
};
let persister = Arc::new(Persister::new(data_dir));
- let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+ let bg_processor = BackgroundProcessor::start(
+ persister,
+ event_handler,
+ nodes[0].chain_monitor.clone(),
+ nodes[0].node.clone(),
+ Some(nodes[0].messenger.clone()),
+ nodes[0].no_gossip_sync(),
+ nodes[0].peer_manager.clone(),
+ nodes[0].logger.clone(),
+ Some(nodes[0].scorer.clone()),
+ );
// Force close the channel and check that the SpendableOutputs event was handled.
let error_message = "Channel force-closed";
- nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &node_1_id, error_message.to_string()).unwrap();
+ nodes[0]
+ .node
+ .force_close_broadcasting_latest_txn(
+ &nodes[0].node.list_channels()[0].channel_id,
+ &node_1_id,
+ error_message.to_string(),
+ )
+ .unwrap();
let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
.expect("Events not handled within deadline");
match event {
Event::SpendableOutputs { outputs, channel_id } => {
- nodes[0].sweeper.track_spendable_outputs(outputs, channel_id, false, Some(153)).unwrap();
+ nodes[0]
+ .sweeper
+ .track_spendable_outputs(outputs, channel_id, false, Some(153))
+ .unwrap();
},
_ => panic!("Unexpected event: {:?}", event),
}
match tracked_output.status {
OutputSpendStatus::PendingInitialBroadcast { delayed_until_height } => {
assert_eq!(delayed_until_height, Some(153));
- }
+ },
_ => panic!("Unexpected status"),
}
}
match tracked_output.status {
OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
assert_eq!(sweep_tx_0.txid(), latest_spending_tx.txid());
- }
+ },
_ => panic!("Unexpected status"),
}
match tracked_output.status {
OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
assert_eq!(sweep_tx_1.txid(), latest_spending_tx.txid());
- }
+ },
_ => panic!("Unexpected status"),
}
assert_ne!(sweep_tx_0, sweep_tx_1);
match tracked_output.status {
OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
assert_eq!(sweep_tx_2.txid(), latest_spending_tx.txid());
- }
+ },
_ => panic!("Unexpected status"),
}
assert_ne!(sweep_tx_0, sweep_tx_2);
match tracked_output.status {
OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
assert_eq!(sweep_tx_2.txid(), latest_spending_tx.txid());
- }
+ },
_ => panic!("Unexpected status"),
}
match tracked_output.status {
OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
assert_eq!(sweep_tx_2.txid(), latest_spending_tx.txid());
- }
+ },
_ => panic!("Unexpected status"),
}
let data_dir = nodes[0].kv_store.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
let event_handler = |_: _| {};
- let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+ let bg_processor = BackgroundProcessor::start(
+ persister,
+ event_handler,
+ nodes[0].chain_monitor.clone(),
+ nodes[0].node.clone(),
+ Some(nodes[0].messenger.clone()),
+ nodes[0].no_gossip_sync(),
+ nodes[0].peer_manager.clone(),
+ nodes[0].logger.clone(),
+ Some(nodes[0].scorer.clone()),
+ );
loop {
let log_entries = nodes[0].logger.lines.lock().unwrap();
let expected_log = "Calling time_passed and persisting scorer".to_string();
if log_entries.get(&("lightning_background_processor", expected_log)).is_some() {
- break
+ break;
}
}
macro_rules! do_test_not_pruning_network_graph_until_graph_sync_completion {
($nodes: expr, $receive: expr, $sleep: expr) => {
let features = ChannelFeatures::empty();
- $nodes[0].network_graph.add_channel_from_partial_announcement(
- 42, 53, features, $nodes[0].node.get_our_node_id(), $nodes[1].node.get_our_node_id()
- ).expect("Failed to update channel from partial announcement");
+ $nodes[0]
+ .network_graph
+ .add_channel_from_partial_announcement(
+ 42,
+ 53,
+ features,
+ $nodes[0].node.get_our_node_id(),
+ $nodes[1].node.get_our_node_id(),
+ )
+ .expect("Failed to update channel from partial announcement");
let original_graph_description = $nodes[0].network_graph.to_string();
assert!(original_graph_description.contains("42: features: 0000, node_one:"));
assert_eq!($nodes[0].network_graph.read_only().channels().len(), 1);
$sleep;
let log_entries = $nodes[0].logger.lines.lock().unwrap();
let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
- if *log_entries.get(&("lightning_background_processor", loop_counter))
- .unwrap_or(&0) > 1
+ if *log_entries.get(&("lightning_background_processor", loop_counter)).unwrap_or(&0)
+ > 1
{
// Wait until the loop has gone around at least twice.
- break
+ break;
}
}
let initialization_input = vec![
- 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
- 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
- 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
- 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
- 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
- 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
- 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
- 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
- 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
- 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
- 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
- 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
- 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
+ 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99,
+ 247, 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227,
+ 98, 218, 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61,
+ 250, 251, 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6,
+ 67, 2, 36, 125, 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47,
+ 115, 172, 63, 136, 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158,
+ 1, 242, 121, 152, 106, 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95,
+ 65, 3, 83, 185, 58, 138, 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136,
+ 149, 185, 226, 156, 137, 175, 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219,
+ 175, 168, 77, 4, 143, 38, 128, 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2,
+ 27, 0, 0, 0, 1, 0, 0, 255, 2, 68, 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0,
+ 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232, 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255,
+ 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0, 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6,
+ 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
];
- $nodes[0].rapid_gossip_sync.update_network_graph_no_std(&initialization_input[..], Some(1642291930)).unwrap();
+ $nodes[0]
+ .rapid_gossip_sync
+ .update_network_graph_no_std(&initialization_input[..], Some(1642291930))
+ .unwrap();
// this should have added two channels and pruned the previous one.
assert_eq!($nodes[0].network_graph.read_only().channels().len(), 2);
// all channels should now be pruned
assert_eq!($nodes[0].network_graph.read_only().channels().len(), 0);
- }
+ };
}
#[test]
fn test_not_pruning_network_graph_until_graph_sync_completion() {
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
- let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion");
+ let (_, nodes) =
+ create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion");
let data_dir = nodes[0].kv_store.get_data_dir();
let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
let event_handler = |_: _| {};
- let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+ let background_processor = BackgroundProcessor::start(
+ persister,
+ event_handler,
+ nodes[0].chain_monitor.clone(),
+ nodes[0].node.clone(),
+ Some(nodes[0].messenger.clone()),
+ nodes[0].rapid_gossip_sync(),
+ nodes[0].peer_manager.clone(),
+ nodes[0].logger.clone(),
+ Some(nodes[0].scorer.clone()),
+ );
- do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes,
+ do_test_not_pruning_network_graph_until_graph_sync_completion!(
+ nodes,
receiver.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5)),
- std::thread::sleep(Duration::from_millis(1)));
+ std::thread::sleep(Duration::from_millis(1))
+ );
background_processor.stop().unwrap();
}
async fn test_not_pruning_network_graph_until_graph_sync_completion_async() {
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
- let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
+ let (_, nodes) =
+ create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
let data_dir = nodes[0].kv_store.get_data_dir();
let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
let bp_future = super::process_events_async(
- persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()),
- nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
- Some(nodes[0].scorer.clone()), move |dur: Duration| {
+ persister,
+ |_: _| async {},
+ nodes[0].chain_monitor.clone(),
+ nodes[0].node.clone(),
+ Some(nodes[0].messenger.clone()),
+ nodes[0].rapid_gossip_sync(),
+ nodes[0].peer_manager.clone(),
+ nodes[0].logger.clone(),
+ Some(nodes[0].scorer.clone()),
+ move |dur: Duration| {
let mut exit_receiver = exit_receiver.clone();
Box::pin(async move {
tokio::select! {
_ = exit_receiver.changed() => true,
}
})
- }, false, || Some(Duration::from_secs(1696300000)),
+ },
+ false,
+ || Some(Duration::from_secs(1696300000)),
);
let t1 = tokio::spawn(bp_future);
let t2 = tokio::spawn(async move {
- do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, {
- let mut i = 0;
- loop {
- tokio::time::sleep(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER)).await;
- if let Ok(()) = receiver.try_recv() { break Ok::<(), ()>(()); }
- assert!(i < 5);
- i += 1;
- }
- }, tokio::time::sleep(Duration::from_millis(1)).await);
+ do_test_not_pruning_network_graph_until_graph_sync_completion!(
+ nodes,
+ {
+ let mut i = 0;
+ loop {
+ tokio::time::sleep(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER))
+ .await;
+ if let Ok(()) = receiver.try_recv() {
+ break Ok::<(), ()>(());
+ }
+ assert!(i < 5);
+ i += 1;
+ }
+ },
+ tokio::time::sleep(Duration::from_millis(1)).await
+ );
exit_sender.send(()).unwrap();
});
let (r1, r2) = tokio::join!(t1, t2);
let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
let data_dir = nodes[0].kv_store.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
- let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+ let bg_processor = BackgroundProcessor::start(
+ persister,
+ event_handler,
+ nodes[0].chain_monitor.clone(),
+ nodes[0].node.clone(),
+ Some(nodes[0].messenger.clone()),
+ nodes[0].no_gossip_sync(),
+ nodes[0].peer_manager.clone(),
+ nodes[0].logger.clone(),
+ Some(nodes[0].scorer.clone()),
+ );
- do_test_payment_path_scoring!(nodes, receiver.recv_timeout(Duration::from_secs(EVENT_DEADLINE)));
+ do_test_payment_path_scoring!(
+ nodes,
+ receiver.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
+ );
if !std::thread::panicking() {
bg_processor.stop().unwrap();
let sender_ref = sender.clone();
async move {
match event {
- Event::PaymentPathFailed { .. } => { sender_ref.send(event).await.unwrap() },
- Event::PaymentPathSuccessful { .. } => { sender_ref.send(event).await.unwrap() },
- Event::ProbeSuccessful { .. } => { sender_ref.send(event).await.unwrap() },
- Event::ProbeFailed { .. } => { sender_ref.send(event).await.unwrap() },
+ Event::PaymentPathFailed { .. } => sender_ref.send(event).await.unwrap(),
+ Event::PaymentPathSuccessful { .. } => sender_ref.send(event).await.unwrap(),
+ Event::ProbeSuccessful { .. } => sender_ref.send(event).await.unwrap(),
+ Event::ProbeFailed { .. } => sender_ref.send(event).await.unwrap(),
_ => panic!("Unexpected event: {:?}", event),
}
}
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
let bp_future = super::process_events_async(
- persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()),
- nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
- Some(nodes[0].scorer.clone()), move |dur: Duration| {
+ persister,
+ event_handler,
+ nodes[0].chain_monitor.clone(),
+ nodes[0].node.clone(),
+ Some(nodes[0].messenger.clone()),
+ nodes[0].no_gossip_sync(),
+ nodes[0].peer_manager.clone(),
+ nodes[0].logger.clone(),
+ Some(nodes[0].scorer.clone()),
+ move |dur: Duration| {
let mut exit_receiver = exit_receiver.clone();
Box::pin(async move {
tokio::select! {
_ = exit_receiver.changed() => true,
}
})
- }, false, || Some(Duration::ZERO),
+ },
+ false,
+ || Some(Duration::ZERO),
);
let t1 = tokio::spawn(bp_future);
let t2 = tokio::spawn(async move {
let log_entries = nodes[0].logger.lines.lock().unwrap();
let expected_log = "Persisting scorer after update".to_string();
- assert_eq!(*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(), 5);
+ assert_eq!(
+ *log_entries.get(&("lightning_background_processor", expected_log)).unwrap(),
+ 5
+ );
});
let (r1, r2) = tokio::join!(t1, t2);