Merge pull request #2212 from wpaulino/off-by-one-locktime
author Matt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Sat, 22 Apr 2023 21:54:06 +0000 (21:54 +0000)
committer GitHub <noreply@github.com>
Sat, 22 Apr 2023 21:54:06 +0000 (21:54 +0000)
Fix off-by-one finalized transaction locktime

ci/ci-tests.sh
lightning-background-processor/src/lib.rs
lightning-net-tokio/src/lib.rs
lightning/src/ln/channelmanager.rs

diff --git a/ci/ci-tests.sh b/ci/ci-tests.sh
index e0e66861fa98c05c7d38a5db081a6288e0323304..f948cc0d63e925f94b803f2738ed0475f01cbc2b 100755
--- a/ci/ci-tests.sh
+++ b/ci/ci-tests.sh
@@ -87,7 +87,7 @@ fi
 
 echo -e "\n\nTest futures builds"
 pushd lightning-background-processor
-cargo test --verbose --color always --no-default-features --features futures
+cargo test --verbose --color always --features futures
 popd
 
 if [ "$RUSTC_MINOR_VERSION" -gt 55 ]; then
diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs
index f9b2990a2f28e2210622c8eba16225dac5e2aa27..af9fbcd15f5114a985025fa588a58d2e8e6fc3f4 100644
--- a/lightning-background-processor/src/lib.rs
+++ b/lightning-background-processor/src/lib.rs
@@ -350,7 +350,8 @@ macro_rules! define_run_body {
                        // falling back to our usual hourly prunes. This avoids short-lived clients never
                        // pruning their network graph. We run once 60 seconds after startup before
                        // continuing our normal cadence.
-                       if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
+                       let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
+                       if $timer_elapsed(&mut last_prune_call, prune_timer) {
                                // The network graph must not be pruned while rapid sync completion is pending
                                if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
                                        #[cfg(feature = "std")] {
@@ -368,7 +369,8 @@ macro_rules! define_run_body {
 
                                        have_pruned = true;
                                }
-                               last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
+                               let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
+                               last_prune_call = $get_timer(prune_timer);
                        }
 
                        if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
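The change above re-evaluates the prune interval after each attempt: `have_pruned` is only set once the graph was actually prunable (e.g. once rapid gossip sync completed), so until then the short startup timer keeps being re-armed instead of falling back to the hourly one. A minimal sketch of the selection logic, with illustrative constant values (the real constants live elsewhere in this file):

    const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;  // seconds; first prune shortly after startup
    const NETWORK_PRUNE_TIMER: u64 = 60 * 60;   // seconds; usual hourly cadence

    fn next_prune_interval(have_pruned: bool) -> u64 {
        // Keep retrying on the short timer until a prune has actually run;
        // previously the long timer was armed even when pruning was skipped
        // because rapid gossip sync had not yet completed.
        if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }
    }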
@@ -881,7 +883,10 @@ mod tests {
 
                        if key == "network_graph" {
                                if let Some(sender) = &self.graph_persistence_notifier {
-                                       sender.send(()).unwrap();
+                                       match sender.send(()) {
+                                               Ok(()) => {},
+                                               Err(std::sync::mpsc::SendError(())) => println!("Persister failed to notify as receiver went away."),
+                                       }
                                };
 
                                if let Some((error, message)) = self.graph_error {
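An mpsc send can only fail when the receiving end has been dropped, which in these tests just means the test body already finished; logging instead of unwrapping keeps the persister from panicking. A self-contained sketch of the same pattern:

    use std::sync::mpsc;

    fn notify(sender: &mpsc::Sender<()>) {
        match sender.send(()) {
            Ok(()) => {},
            Err(mpsc::SendError(())) => println!("Persister failed to notify as receiver went away."),
        }
    }

    fn main() {
        let (tx, rx) = mpsc::channel::<()>();
        drop(rx);    // receiver gone, as when a test exits early
        notify(&tx); // prints instead of panicking
    }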
@@ -1497,10 +1502,9 @@ mod tests {
                                })
                        }, false,
                );
-               // TODO: Drop _local and simply spawn after #2003
-               let local_set = tokio::task::LocalSet::new();
-               local_set.spawn_local(bp_future);
-               local_set.spawn_local(async move {
+
+               let t1 = tokio::spawn(bp_future);
+               let t2 = tokio::spawn(async move {
                        do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, {
                                let mut i = 0;
                                loop {
@@ -1512,7 +1516,9 @@ mod tests {
                        }, tokio::time::sleep(Duration::from_millis(1)).await);
                        exit_sender.send(()).unwrap();
                });
-               local_set.await;
+               let (r1, r2) = tokio::join!(t1, t2);
+               r1.unwrap().unwrap();
+               r2.unwrap()
        }
 
        macro_rules! do_test_payment_path_scoring {
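With the background-processor future now spawnable (the removed TODO referenced #2003), the tests can use plain tokio::spawn on a multi-threaded runtime instead of a LocalSet, and tokio::join! surfaces panics from either task. A minimal sketch of the shape, with placeholder futures standing in for bp_future and the test driver:

    async fn run_tasks() {
        // t1 stands in for bp_future, which itself returns a Result.
        let t1 = tokio::spawn(async { Ok::<(), ()>(()) });
        let t2 = tokio::spawn(async {});
        // Each JoinHandle resolves to Result<T, JoinError>.
        let (r1, r2) = tokio::join!(t1, t2);
        r1.unwrap().unwrap(); // unwrap the JoinError, then the future's own Result
        r2.unwrap();
    }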
@@ -1666,13 +1672,14 @@ mod tests {
                                })
                        }, false,
                );
-               // TODO: Drop _local and simply spawn after #2003
-               let local_set = tokio::task::LocalSet::new();
-               local_set.spawn_local(bp_future);
-               local_set.spawn_local(async move {
+               let t1 = tokio::spawn(bp_future);
+               let t2 = tokio::spawn(async move {
                        do_test_payment_path_scoring!(nodes, receiver.recv().await);
                        exit_sender.send(()).unwrap();
                });
-               local_set.await;
+
+               let (r1, r2) = tokio::join!(t1, t2);
+               r1.unwrap().unwrap();
+               r2.unwrap()
        }
 }
diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs
index 37c9ddad76204ee68dda7656a81772bb97ef6fca..48f1736d0c2296988c5d0f7c2c110421d5ca319f 100644
--- a/lightning-net-tokio/src/lib.rs
+++ b/lightning-net-tokio/src/lib.rs
@@ -189,7 +189,7 @@ impl Connection {
                        // our timeslice to another task we may just spin on this peer, starving other peers
                        // and eventually disconnecting them for ping timeouts. Instead, we explicitly yield
                        // here.
-                       tokio::task::yield_now().await;
+                       let _ = tokio::task::yield_now().await;
                };
                let writer_option = us.lock().unwrap().writer.take();
                if let Some(mut writer) = writer_option {
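As the comment explains, a peer whose socket always has data ready could otherwise hold the executor thread and starve other peers into ping timeouts. A stripped-down sketch of a read loop of this shape (the has_pending_data closure is hypothetical):

    async fn serve_peer(mut has_pending_data: impl FnMut() -> bool) {
        while has_pending_data() {
            // ... read and process one chunk from this peer ...

            // Hand the thread back to the scheduler so other peers make
            // progress even when this peer always has data ready.
            tokio::task::yield_now().await;
        }
    }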
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index e8261f686264b884b7679185ee1543292c9193b5..d019cd6fd2a561741bba7da9efe01259160da98e 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -72,7 +72,7 @@ use core::{cmp, mem};
 use core::cell::RefCell;
 use crate::io::Read;
 use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock, LockTestExt, LockHeldState};
-use core::sync::atomic::{AtomicUsize, Ordering};
+use core::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
 use core::time::Duration;
 use core::ops::Deref;
 
@@ -934,6 +934,8 @@ where
 
        /// See `ChannelManager` struct-level documentation for lock order requirements.
        pending_events: Mutex<Vec<events::Event>>,
+       /// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
+       pending_events_processor: AtomicBool,
        /// See `ChannelManager` struct-level documentation for lock order requirements.
        pending_background_events: Mutex<Vec<BackgroundEvent>>,
        /// Used when we have to take a BIG lock to make sure everything is self-consistent.
@@ -1696,30 +1698,47 @@ macro_rules! handle_new_monitor_update {
 
 macro_rules! process_events_body {
        ($self: expr, $event_to_handle: expr, $handle_event: expr) => {
-               // We'll acquire our total consistency lock until the returned future completes so that
-               // we can be sure no other persists happen while processing events.
-               let _read_guard = $self.total_consistency_lock.read().unwrap();
+               let mut processed_all_events = false;
+               while !processed_all_events {
+                       if $self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
+                               return;
+                       }
 
-               let mut result = NotifyOption::SkipPersist;
+                       let mut result = NotifyOption::SkipPersist;
 
-               // TODO: This behavior should be documented. It's unintuitive that we query
-               // ChannelMonitors when clearing other events.
-               if $self.process_pending_monitor_events() {
-                       result = NotifyOption::DoPersist;
-               }
+                       {
+                               // We'll acquire our total consistency lock so that we can be sure no other
+                               // persists happen while processing monitor events.
+                               let _read_guard = $self.total_consistency_lock.read().unwrap();
+
+                               // TODO: This behavior should be documented. It's unintuitive that we query
+                               // ChannelMonitors when clearing other events.
+                               if $self.process_pending_monitor_events() {
+                                       result = NotifyOption::DoPersist;
+                               }
+                       }
 
-               let pending_events = mem::replace(&mut *$self.pending_events.lock().unwrap(), vec![]);
-               if !pending_events.is_empty() {
-                       result = NotifyOption::DoPersist;
-               }
+                       let pending_events = $self.pending_events.lock().unwrap().clone();
+                       let num_events = pending_events.len();
+                       if !pending_events.is_empty() {
+                               result = NotifyOption::DoPersist;
+                       }
 
-               for event in pending_events {
-                       $event_to_handle = event;
-                       $handle_event;
-               }
+                       for event in pending_events {
+                               $event_to_handle = event;
+                               $handle_event;
+                       }
 
-               if result == NotifyOption::DoPersist {
-                       $self.persistence_notifier.notify();
+                       {
+                               let mut pending_events = $self.pending_events.lock().unwrap();
+                               pending_events.drain(..num_events);
+                               processed_all_events = pending_events.is_empty();
+                               $self.pending_events_processor.store(false, Ordering::Release);
+                       }
+
+                       if result == NotifyOption::DoPersist {
+                               $self.persistence_notifier.notify();
+                       }
                }
        }
 }
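The AtomicBool acts as a non-blocking mutex around event processing: compare_exchange flips it false to true, and a concurrent caller that loses the race simply returns. The outer loop then re-checks the queue, because events enqueued while we held the flag would otherwise sit unprocessed until the next call. A minimal sketch of the guard, assuming a simplified u32 event queue in place of LDK's event types:

    use std::sync::Mutex;
    use std::sync::atomic::{AtomicBool, Ordering};

    fn process_events(flag: &AtomicBool, queue: &Mutex<Vec<u32>>) {
        let mut processed_all_events = false;
        while !processed_all_events {
            // Only one processor at a time; losers of this race return and
            // rely on the winner's loop to pick up their events.
            if flag.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
                return;
            }
            // Snapshot the queue rather than holding its lock across handlers.
            let pending = queue.lock().unwrap().clone();
            let num_events = pending.len();
            for _event in pending { /* ... handle the event ... */ }
            // Drop only what we handled; new events may have arrived meanwhile.
            let mut q = queue.lock().unwrap();
            q.drain(..num_events);
            processed_all_events = q.is_empty();
            flag.store(false, Ordering::Release);
        }
    }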
@@ -1787,6 +1806,7 @@ where
                        per_peer_state: FairRwLock::new(HashMap::new()),
 
                        pending_events: Mutex::new(Vec::new()),
+                       pending_events_processor: AtomicBool::new(false),
                        pending_background_events: Mutex::new(Vec::new()),
                        total_consistency_lock: RwLock::new(()),
                        persistence_notifier: Notifier::new(),
@@ -8027,6 +8047,7 @@ where
                        per_peer_state: FairRwLock::new(per_peer_state),
 
                        pending_events: Mutex::new(pending_events_read),
+                       pending_events_processor: AtomicBool::new(false),
                        pending_background_events: Mutex::new(pending_background_events),
                        total_consistency_lock: RwLock::new(()),
                        persistence_notifier: Notifier::new(),
@@ -8058,8 +8079,6 @@ mod tests {
        use bitcoin::hashes::Hash;
        use bitcoin::hashes::sha256::Hash as Sha256;
        use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
-       #[cfg(feature = "std")]
-       use core::time::Duration;
        use core::sync::atomic::Ordering;
        use crate::events::{Event, HTLCDestination, MessageSendEvent, MessageSendEventsProvider, ClosureReason};
        use crate::ln::{PaymentPreimage, PaymentHash, PaymentSecret};