use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
use bitcoin::transaction::Version;
use bitcoin::{Amount, ScriptBuf, Txid};
+ use core::sync::atomic::{AtomicBool, Ordering};
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
use lightning::chain::transaction::OutPoint;
use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
- use lightning::events::{Event, MessageSendEvent, MessageSendEventsProvider, PathFailure};
+ use lightning::events::{
+ Event, MessageSendEvent, MessageSendEventsProvider, PathFailure, ReplayEvent,
+ };
use lightning::ln::channelmanager;
use lightning::ln::channelmanager::{
ChainParameters, PaymentId, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA,
}
}
+ #[test]
+ fn test_event_handling_failures_are_replayed() {
+ let (_, nodes) = create_nodes(2, "test_event_handling_failures_are_replayed");
+ let channel_value = 100000;
+ let data_dir = nodes[0].kv_store.get_data_dir();
+ let persister = Arc::new(Persister::new(data_dir.clone()));
+
+ let (first_event_send, first_event_recv) = std::sync::mpsc::sync_channel(1);
+ let (second_event_send, second_event_recv) = std::sync::mpsc::sync_channel(1);
+ let should_fail_event_handling = Arc::new(AtomicBool::new(true));
+ let event_handler = move |event: Event| {
+ if let Ok(true) = should_fail_event_handling.compare_exchange(
+ true,
+ false,
+ Ordering::Acquire,
+ Ordering::Relaxed,
+ ) {
+ first_event_send.send(event).unwrap();
+ return Err(ReplayEvent());
+ }
+
+ second_event_send.send(event).unwrap();
+ Ok(())
+ };
+
+ let bg_processor = BackgroundProcessor::start(
+ persister,
+ event_handler,
+ nodes[0].chain_monitor.clone(),
+ nodes[0].node.clone(),
+ Some(nodes[0].messenger.clone()),
+ nodes[0].no_gossip_sync(),
+ nodes[0].peer_manager.clone(),
+ nodes[0].logger.clone(),
+ Some(nodes[0].scorer.clone()),
+ );
+
+ begin_open_channel!(nodes[0], nodes[1], channel_value);
+ assert_eq!(
+ first_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE)),
+ second_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
+ );
+
+ if !std::thread::panicking() {
+ bg_processor.stop().unwrap();
+ }
+ }
+
#[test]
fn test_scorer_persistence() {
let (_, nodes) = create_nodes(2, "test_scorer_persistence");