Merge pull request #2891 from TheBlueMatt/2024-02-no-ahash
authorMatt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Mon, 19 Feb 2024 22:17:35 +0000 (22:17 +0000)
committerGitHub <noreply@github.com>
Mon, 19 Feb 2024 22:17:35 +0000 (22:17 +0000)
Drop the `ahash` dependency

lightning-background-processor/src/lib.rs
lightning/src/chain/onchaintx.rs
lightning/src/ln/channelmanager.rs
lightning/src/ln/outbound_payment.rs
lightning/src/ln/peer_handler.rs
lightning/src/routing/gossip.rs
lightning/src/routing/router.rs
lightning/src/util/persist.rs
lightning/src/util/test_utils.rs
lightning/src/util/wakers.rs

index 1300a67e2a16af0de614c7dce4ab37c6913686d7..46849e136f61c5a5bcd41e38b8f39251bcaac3d6 100644 (file)
@@ -854,8 +854,8 @@ impl BackgroundProcessor {
                                peer_manager.onion_message_handler().process_pending_events(&event_handler),
                                gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
                                { Sleeper::from_two_futures(
-                                       channel_manager.get_event_or_persistence_needed_future(),
-                                       chain_monitor.get_update_future()
+                                       &channel_manager.get_event_or_persistence_needed_future(),
+                                       &chain_monitor.get_update_future()
                                ).wait_timeout(Duration::from_millis(100)); },
                                |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false,
                                || {
index 6c29d147f02fb7239a7aaf86112b3c9cf2e7509f..94d6aa35746caa3a6bbc10e991a7161d54aaef2c 100644 (file)
@@ -726,7 +726,10 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
                B::Target: BroadcasterInterface,
                F::Target: FeeEstimator,
        {
-               log_debug!(logger, "Updating claims view at height {} with {} claim requests", cur_height, requests.len());
+               if !requests.is_empty() {
+                       log_debug!(logger, "Updating claims view at height {} with {} claim requests", cur_height, requests.len());
+               }
+
                let mut preprocessed_requests = Vec::with_capacity(requests.len());
                let mut aggregated_request = None;
 
@@ -772,6 +775,12 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
 
                // Claim everything up to and including `cur_height`
                let remaining_locked_packages = self.locktimed_packages.split_off(&(cur_height + 1));
+               if !self.locktimed_packages.is_empty() {
+                       log_debug!(logger,
+                               "Updating claims view at height {} with {} locked packages available for claim",
+                               cur_height,
+                               self.locktimed_packages.len());
+               }
                for (pop_height, mut entry) in self.locktimed_packages.iter_mut() {
                        log_trace!(logger, "Restoring delayed claim of package(s) at their timelock at {}.", pop_height);
                        preprocessed_requests.append(&mut entry);
@@ -852,8 +861,15 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
                B::Target: BroadcasterInterface,
                F::Target: FeeEstimator,
        {
-               log_debug!(logger, "Updating claims view at height {} with {} matched transactions in block {}", cur_height, txn_matched.len(), conf_height);
+               let mut have_logged_intro = false;
+               let mut maybe_log_intro = || {
+                       if !have_logged_intro {
+                               log_debug!(logger, "Updating claims view at height {} with {} matched transactions in block {}", cur_height, txn_matched.len(), conf_height);
+                               have_logged_intro = true;
+                       }
+               };
                let mut bump_candidates = new_hash_map();
+               if !txn_matched.is_empty() { maybe_log_intro(); }
                for tx in txn_matched {
                        // Scan all input to verify is one of the outpoint spent is of interest for us
                        let mut claimed_outputs_material = Vec::new();
@@ -946,6 +962,7 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
                        self.onchain_events_awaiting_threshold_conf.drain(..).collect::<Vec<_>>();
                for entry in onchain_events_awaiting_threshold_conf {
                        if entry.has_reached_confirmation_threshold(cur_height) {
+                               maybe_log_intro();
                                match entry.event {
                                        OnchainEvent::Claim { claim_id } => {
                                                // We may remove a whole set of claim outpoints here, as these one may have
@@ -983,7 +1000,11 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
                }
 
                // Build, bump and rebroadcast tx accordingly
-               log_trace!(logger, "Bumping {} candidates", bump_candidates.len());
+               if !bump_candidates.is_empty() {
+                       maybe_log_intro();
+                       log_trace!(logger, "Bumping {} candidates", bump_candidates.len());
+               }
+
                for (claim_id, request) in bump_candidates.iter() {
                        if let Some((new_timer, new_feerate, bump_claim)) = self.generate_claim(
                                cur_height, &request, &FeerateStrategy::ForceBump, &*fee_estimator, &*logger,
index 243cf74182205a61f62fae90b66e58f4da0c62bd..55b2cbae441f5870498ab703787b2772ed9a7310 100644 (file)
@@ -4833,10 +4833,6 @@ where
 
                // If the feerate has decreased by less than half, don't bother
                if new_feerate <= chan.context.get_feerate_sat_per_1000_weight() && new_feerate * 2 > chan.context.get_feerate_sat_per_1000_weight() {
-                       if new_feerate != chan.context.get_feerate_sat_per_1000_weight() {
-                               log_trace!(logger, "Channel {} does not qualify for a feerate change from {} to {}.",
-                               chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
-                       }
                        return NotifyOption::SkipPersistNoEvents;
                }
                if !chan.context.is_live() {
index c260bac856a72635e5661f3a5dd33d870460db50..b05d6f3f7290110b5aed103faef370203350a8fc 100644 (file)
@@ -2283,65 +2283,6 @@ mod tests {
                assert!(pending_events.lock().unwrap().is_empty());
        }
 
-       #[test]
-       fn fails_paying_for_bolt12_invoice() {
-               let logger = test_utils::TestLogger::new();
-               let network_graph = Arc::new(NetworkGraph::new(Network::Testnet, &logger));
-               let scorer = RwLock::new(test_utils::TestScorer::new());
-               let router = test_utils::TestRouter::new(network_graph, &logger, &scorer);
-               let keys_manager = test_utils::TestKeysInterface::new(&[0; 32], Network::Testnet);
-
-               let pending_events = Mutex::new(VecDeque::new());
-               let outbound_payments = OutboundPayments::new();
-               let payment_id = PaymentId([0; 32]);
-               let expiration = StaleExpiration::AbsoluteTimeout(Duration::from_secs(100));
-
-               let invoice = OfferBuilder::new("foo".into(), recipient_pubkey())
-                       .amount_msats(1000)
-                       .build().unwrap()
-                       .request_invoice(vec![1; 32], payer_pubkey()).unwrap()
-                       .build().unwrap()
-                       .sign(payer_sign).unwrap()
-                       .respond_with_no_std(payment_paths(), payment_hash(), now()).unwrap()
-                       .build().unwrap()
-                       .sign(recipient_sign).unwrap();
-
-               assert!(
-                       outbound_payments.add_new_awaiting_invoice(
-                               payment_id, expiration, Retry::Attempts(0),
-                               Some(invoice.amount_msats() / 100 + 50_000)
-                       ).is_ok()
-               );
-               assert!(outbound_payments.has_pending_payments());
-
-               let route_params = RouteParameters::from_payment_params_and_value(
-                       PaymentParameters::from_bolt12_invoice(&invoice),
-                       invoice.amount_msats(),
-               );
-               router.expect_find_route(
-                       route_params.clone(), Ok(Route { paths: vec![], route_params: Some(route_params) })
-               );
-
-               assert_eq!(
-                       outbound_payments.send_payment_for_bolt12_invoice(
-                               &invoice, payment_id, &&router, vec![], || InFlightHtlcs::new(), &&keys_manager,
-                               &&keys_manager, 0, &&logger, &pending_events, |_| panic!()
-                       ),
-                       Ok(()),
-               );
-               assert!(!outbound_payments.has_pending_payments());
-
-               let payment_hash = invoice.payment_hash();
-               let reason = Some(PaymentFailureReason::UnexpectedError);
-
-               assert!(!pending_events.lock().unwrap().is_empty());
-               assert_eq!(
-                       pending_events.lock().unwrap().pop_front(),
-                       Some((Event::PaymentFailed { payment_id, payment_hash, reason }, None)),
-               );
-               assert!(pending_events.lock().unwrap().is_empty());
-       }
-
        #[test]
        fn sends_payment_for_bolt12_invoice() {
                let logger = test_utils::TestLogger::new();
index 0f206117ec676abbc0b4123fac756c68802eccdd..8d83763715c83326b61cdf83f2faeec6f721d795 100644 (file)
@@ -33,7 +33,7 @@ use crate::onion_message::offers::{OffersMessage, OffersMessageHandler};
 use crate::onion_message::packet::OnionMessageContents;
 use crate::routing::gossip::{NodeId, NodeAlias};
 use crate::util::atomic_counter::AtomicCounter;
-use crate::util::logger::{Logger, WithContext};
+use crate::util::logger::{Level, Logger, WithContext};
 use crate::util::string::PrintableString;
 
 use crate::prelude::*;
@@ -1329,7 +1329,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                                                        return Err(PeerHandleError { });
                                                                                },
                                                                                msgs::ErrorAction::IgnoreAndLog(level) => {
-                                                                                       log_given_level!(logger, level, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err);
+                                                                                       log_given_level!(logger, level, "Error handling {}message{}; ignoring: {}",
+                                                                                               if level == Level::Gossip { "gossip " } else { "" },
+                                                                                               OptionalFromDebugger(&peer_node_id), e.err);
                                                                                        continue
                                                                                },
                                                                                msgs::ErrorAction::IgnoreDuplicateGossip => continue, // Don't even bother logging these
index 950ec79a1e98e19e8d31da31624227ddca07832a..a4938f72e8b6fd9b6eee819ebe307c30c176e1e3 100644 (file)
@@ -1925,7 +1925,10 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
                        None => {
                                core::mem::drop(channels);
                                self.pending_checks.check_hold_pending_channel_update(msg, full_msg)?;
-                               return Err(LightningError{err: "Couldn't find channel for update".to_owned(), action: ErrorAction::IgnoreError});
+                               return Err(LightningError {
+                                       err: "Couldn't find channel for update".to_owned(),
+                                       action: ErrorAction::IgnoreAndLog(Level::Gossip),
+                               });
                        },
                        Some(channel) => {
                                if msg.htlc_maximum_msat > MAX_VALUE_MSAT {
index f298f278439f284ccb15bcd66973eb94991d8174..8e59c9bd46f5ff5edeed994adb96d2667a0acfc3 100644 (file)
@@ -505,20 +505,20 @@ impl Writeable for Route {
                write_ver_prefix!(writer, SERIALIZATION_VERSION, MIN_SERIALIZATION_VERSION);
                (self.paths.len() as u64).write(writer)?;
                let mut blinded_tails = Vec::new();
-               for path in self.paths.iter() {
+               for (idx, path) in self.paths.iter().enumerate() {
                        (path.hops.len() as u8).write(writer)?;
-                       for (idx, hop) in path.hops.iter().enumerate() {
+                       for hop in path.hops.iter() {
                                hop.write(writer)?;
-                               if let Some(blinded_tail) = &path.blinded_tail {
-                                       if blinded_tails.is_empty() {
-                                               blinded_tails = Vec::with_capacity(path.hops.len());
-                                               for _ in 0..idx {
-                                                       blinded_tails.push(None);
-                                               }
-                                       }
-                                       blinded_tails.push(Some(blinded_tail));
-                               } else if !blinded_tails.is_empty() { blinded_tails.push(None); }
                        }
+                       if let Some(blinded_tail) = &path.blinded_tail {
+                               if blinded_tails.is_empty() {
+                                       blinded_tails = Vec::with_capacity(path.hops.len());
+                                       for _ in 0..idx {
+                                               blinded_tails.push(None);
+                                       }
+                               }
+                               blinded_tails.push(Some(blinded_tail));
+                       } else if !blinded_tails.is_empty() { blinded_tails.push(None); }
                }
                write_tlv_fields!(writer, {
                        // For compatibility with LDK versions prior to 0.0.117, we take the individual
@@ -526,7 +526,7 @@ impl Writeable for Route {
                        (1, self.route_params.as_ref().map(|p| &p.payment_params), option),
                        (2, blinded_tails, optional_vec),
                        (3, self.route_params.as_ref().map(|p| p.final_value_msat), option),
-                       (5, self.route_params.as_ref().map(|p| p.max_total_routing_fee_msat), option),
+                       (5, self.route_params.as_ref().and_then(|p| p.max_total_routing_fee_msat), option),
                });
                Ok(())
        }
index 7d501345c3ce0d9a2c9c717ae41843b5fb79d79e..2f418a8efc284a4de2d60be4b2b958b369518529 100644 (file)
@@ -187,6 +187,41 @@ impl<'a, A: KVStore, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Der
        }
 }
 
+impl<'a, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref, S: WriteableScore<'a>> Persister<'a, M, T, ES, NS, SP, F, R, L, S> for dyn KVStore + Send + Sync
+       where M::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
+               T::Target: 'static + BroadcasterInterface,
+               ES::Target: 'static + EntropySource,
+               NS::Target: 'static + NodeSigner,
+               SP::Target: 'static + SignerProvider,
+               F::Target: 'static + FeeEstimator,
+               R::Target: 'static + Router,
+               L::Target: 'static + Logger,
+{
+       /// Persist the given [`ChannelManager`] to disk, returning an error if persistence failed.
+       fn persist_manager(&self, channel_manager: &ChannelManager<M, T, ES, NS, SP, F, R, L>) -> Result<(), io::Error> {
+               self.write(CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+                       CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
+                       CHANNEL_MANAGER_PERSISTENCE_KEY,
+                       &channel_manager.encode())
+       }
+
+       /// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
+       fn persist_graph(&self, network_graph: &NetworkGraph<L>) -> Result<(), io::Error> {
+               self.write(NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
+                       NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
+                       NETWORK_GRAPH_PERSISTENCE_KEY,
+                       &network_graph.encode())
+       }
+
+       /// Persist the given [`WriteableScore`] to disk, returning an error if persistence failed.
+       fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> {
+               self.write(SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
+                       SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
+                       SCORER_PERSISTENCE_KEY,
+                       &scorer.encode())
+       }
+}
+
 impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore> Persist<ChannelSigner> for K {
        // TODO: We really need a way for the persister to inform the user that its time to crash/shut
        // down once these start returning failure.
@@ -218,6 +253,37 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore> Persist<ChannelSign
        }
 }
 
+impl<ChannelSigner: WriteableEcdsaChannelSigner> Persist<ChannelSigner> for dyn KVStore + Send + Sync {
+       // TODO: We really need a way for the persister to inform the user that its time to crash/shut
+       // down once these start returning failure.
+       // Then we should return InProgress rather than UnrecoverableError, implying we should probably
+       // just shut down the node since we're not retrying persistence!
+
+       fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
+               let key = format!("{}_{}", funding_txo.txid.to_string(), funding_txo.index);
+               match self.write(
+                       CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
+                       CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
+                       &key, &monitor.encode())
+               {
+                       Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
+                       Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
+               }
+       }
+
+       fn update_persisted_channel(&self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
+               let key = format!("{}_{}", funding_txo.txid.to_string(), funding_txo.index);
+               match self.write(
+                       CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
+                       CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
+                       &key, &monitor.encode())
+               {
+                       Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
+                       Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
+               }
+       }
+}
+
 /// Read previously persisted [`ChannelMonitor`]s from the store.
 pub fn read_channel_monitors<K: Deref, ES: Deref, SP: Deref>(
        kv_store: K, entropy_source: ES, signer_provider: SP,
index 09a8c3a26612725c07fedadc7cbb269814b54f88..15cc07466d603b4f049d00fc52df7bb546765107 100644 (file)
@@ -143,7 +143,9 @@ impl<'a> Router for TestRouter<'a> {
                &self, payer: &PublicKey, params: &RouteParameters, first_hops: Option<&[&ChannelDetails]>,
                inflight_htlcs: InFlightHtlcs
        ) -> Result<Route, msgs::LightningError> {
-               if let Some((find_route_query, find_route_res)) = self.next_routes.lock().unwrap().pop_front() {
+               let route_res;
+               let next_route_opt = self.next_routes.lock().unwrap().pop_front();
+               if let Some((find_route_query, find_route_res)) = next_route_opt {
                        assert_eq!(find_route_query, *params);
                        if let Ok(ref route) = find_route_res {
                                assert_eq!(route.route_params, Some(find_route_query));
@@ -201,10 +203,18 @@ impl<'a> Router for TestRouter<'a> {
                                        }
                                }
                        }
-                       return find_route_res;
-               }
+                       route_res = find_route_res;
+               } else {
+                       route_res = self.router.find_route(payer, params, first_hops, inflight_htlcs);
+               };
 
-               self.router.find_route(payer, params, first_hops, inflight_htlcs)
+               if let Ok(route) = &route_res {
+                       // Previously, `Route`s failed to round-trip through serialization due to a write/read
+                       // mismatch. Thus, here we test all test-generated routes round-trip:
+                       let ser = route.encode();
+                       assert_eq!(Route::read(&mut &ser[..]).unwrap(), *route);
+               }
+               route_res
        }
 
        fn create_blinded_payment_paths<
@@ -632,6 +642,9 @@ impl KVStore for TestStore {
        }
 }
 
+unsafe impl Sync for TestStore {}
+unsafe impl Send for TestStore {}
+
 pub struct TestBroadcaster {
        pub txn_broadcasted: Mutex<Vec<Transaction>>,
        pub blocks: Arc<Mutex<Vec<(Block, u32)>>>,
index 14e6bbe64a24524367661da70772f603538c3b55..b2c9d21b998e790dba23570d49be13a2be6d2053 100644 (file)
@@ -56,25 +56,33 @@ impl Notifier {
        /// Gets a [`Future`] that will get woken up with any waiters
        pub(crate) fn get_future(&self) -> Future {
                let mut lock = self.notify_pending.lock().unwrap();
+               let mut self_idx = 0;
                if let Some(existing_state) = &lock.1 {
-                       if existing_state.lock().unwrap().callbacks_made {
+                       let mut locked = existing_state.lock().unwrap();
+                       if locked.callbacks_made {
                                // If the existing `FutureState` has completed and actually made callbacks,
                                // consider the notification flag to have been cleared and reset the future state.
+                               mem::drop(locked);
                                lock.1.take();
                                lock.0 = false;
+                       } else {
+                               self_idx = locked.next_idx;
+                               locked.next_idx += 1;
                        }
                }
                if let Some(existing_state) = &lock.1 {
-                       Future { state: Arc::clone(&existing_state) }
+                       Future { state: Arc::clone(&existing_state), self_idx }
                } else {
                        let state = Arc::new(Mutex::new(FutureState {
                                callbacks: Vec::new(),
+                               std_future_callbacks: Vec::new(),
                                callbacks_with_state: Vec::new(),
                                complete: lock.0,
                                callbacks_made: false,
+                               next_idx: 1,
                        }));
                        lock.1 = Some(Arc::clone(&state));
-                       Future { state }
+                       Future { state, self_idx: 0 }
                }
        }
 
@@ -109,36 +117,39 @@ define_callback!(Send);
 define_callback!();
 
 pub(crate) struct FutureState {
-       // When we're tracking whether a callback counts as having woken the user's code, we check the
-       // first bool - set to false if we're just calling a Waker, and true if we're calling an actual
-       // user-provided function.
-       callbacks: Vec<(bool, Box<dyn FutureCallback>)>,
-       callbacks_with_state: Vec<(bool, Box<dyn Fn(&Arc<Mutex<FutureState>>) -> () + Send>)>,
+       // `callbacks` count as having woken the users' code (as they go direct to the user), but
+       // `std_future_callbacks` and `callbacks_with_state` do not (as the first just wakes a future,
+       // we only count it after another `poll()` and the second wakes a `Sleeper` which handles
+       // setting `callbacks_made` itself).
+       callbacks: Vec<Box<dyn FutureCallback>>,
+       std_future_callbacks: Vec<(usize, StdWaker)>,
+       callbacks_with_state: Vec<Box<dyn Fn(&Arc<Mutex<FutureState>>) -> () + Send>>,
        complete: bool,
        callbacks_made: bool,
+       next_idx: usize,
 }
 
 fn complete_future(this: &Arc<Mutex<FutureState>>) -> bool {
        let mut state_lock = this.lock().unwrap();
        let state = &mut *state_lock;
-       for (counts_as_call, callback) in state.callbacks.drain(..) {
+       for callback in state.callbacks.drain(..) {
                callback.call();
-               state.callbacks_made |= counts_as_call;
+               state.callbacks_made = true;
        }
-       for (counts_as_call, callback) in state.callbacks_with_state.drain(..) {
+       for (_, waker) in state.std_future_callbacks.drain(..) {
+               waker.0.wake_by_ref();
+       }
+       for callback in state.callbacks_with_state.drain(..) {
                (callback)(this);
-               state.callbacks_made |= counts_as_call;
        }
        state.complete = true;
        state.callbacks_made
 }
 
 /// A simple future which can complete once, and calls some callback(s) when it does so.
-///
-/// Clones can be made and all futures cloned from the same source will complete at the same time.
-#[derive(Clone)]
 pub struct Future {
        state: Arc<Mutex<FutureState>>,
+       self_idx: usize,
 }
 
 impl Future {
@@ -153,7 +164,7 @@ impl Future {
                        mem::drop(state);
                        callback.call();
                } else {
-                       state.callbacks.push((true, callback));
+                       state.callbacks.push(callback);
                }
        }
 
@@ -169,16 +180,16 @@ impl Future {
 
        /// Waits until this [`Future`] completes.
        #[cfg(feature = "std")]
-       pub fn wait(self) {
-               Sleeper::from_single_future(self).wait();
+       pub fn wait(&self) {
+               Sleeper::from_single_future(&self).wait();
        }
 
        /// Waits until this [`Future`] completes or the given amount of time has elapsed.
        ///
        /// Returns true if the [`Future`] completed, false if the time elapsed.
        #[cfg(feature = "std")]
-       pub fn wait_timeout(self, max_wait: Duration) -> bool {
-               Sleeper::from_single_future(self).wait_timeout(max_wait)
+       pub fn wait_timeout(&self, max_wait: Duration) -> bool {
+               Sleeper::from_single_future(&self).wait_timeout(max_wait)
        }
 
        #[cfg(test)]
@@ -191,11 +202,14 @@ impl Future {
        }
 }
 
+impl Drop for Future {
+       fn drop(&mut self) {
+               self.state.lock().unwrap().std_future_callbacks.retain(|(idx, _)| *idx != self.self_idx);
+       }
+}
+
 use core::task::Waker;
 struct StdWaker(pub Waker);
-impl FutureCallback for StdWaker {
-       fn call(&self) { self.0.wake_by_ref() }
-}
 
 /// This is not exported to bindings users as Rust Futures aren't usable in language bindings.
 impl<'a> StdFuture for Future {
@@ -208,7 +222,8 @@ impl<'a> StdFuture for Future {
                        Poll::Ready(())
                } else {
                        let waker = cx.waker().clone();
-                       state.callbacks.push((false, Box::new(StdWaker(waker))));
+                       state.std_future_callbacks.retain(|(idx, _)| *idx != self.self_idx);
+                       state.std_future_callbacks.push((self.self_idx, StdWaker(waker)));
                        Poll::Pending
                }
        }
@@ -224,17 +239,17 @@ pub struct Sleeper {
 #[cfg(feature = "std")]
 impl Sleeper {
        /// Constructs a new sleeper from one future, allowing blocking on it.
-       pub fn from_single_future(future: Future) -> Self {
-               Self { notifiers: vec![future.state] }
+       pub fn from_single_future(future: &Future) -> Self {
+               Self { notifiers: vec![Arc::clone(&future.state)] }
        }
        /// Constructs a new sleeper from two futures, allowing blocking on both at once.
        // Note that this is the common case - a ChannelManager and ChainMonitor.
-       pub fn from_two_futures(fut_a: Future, fut_b: Future) -> Self {
-               Self { notifiers: vec![fut_a.state, fut_b.state] }
+       pub fn from_two_futures(fut_a: &Future, fut_b: &Future) -> Self {
+               Self { notifiers: vec![Arc::clone(&fut_a.state), Arc::clone(&fut_b.state)] }
        }
        /// Constructs a new sleeper on many futures, allowing blocking on all at once.
        pub fn new(futures: Vec<Future>) -> Self {
-               Self { notifiers: futures.into_iter().map(|f| f.state).collect() }
+               Self { notifiers: futures.into_iter().map(|f| Arc::clone(&f.state)).collect() }
        }
        /// Prepares to go into a wait loop body, creating a condition variable which we can block on
        /// and an `Arc<Mutex<Option<_>>>` which gets set to the waking `Future`'s state prior to the
@@ -251,10 +266,10 @@ impl Sleeper {
                                        *notified_fut_mtx.lock().unwrap() = Some(Arc::clone(&notifier_mtx));
                                        break;
                                }
-                               notifier.callbacks_with_state.push((false, Box::new(move |notifier_ref| {
+                               notifier.callbacks_with_state.push(Box::new(move |notifier_ref| {
                                        *notified_fut_ref.lock().unwrap() = Some(Arc::clone(notifier_ref));
                                        cv_ref.notify_all();
-                               })));
+                               }));
                        }
                }
                (cv, notified_fut_mtx)
@@ -439,13 +454,15 @@ mod tests {
 
                // Wait on the other thread to finish its sleep, note that the leak only happened if we
                // actually have to sleep here, not if we immediately return.
-               Sleeper::from_two_futures(future_a, future_b).wait();
+               Sleeper::from_two_futures(&future_a, &future_b).wait();
 
                join_handle.join().unwrap();
 
                // then drop the notifiers and make sure the future states are gone.
                mem::drop(notifier_a);
                mem::drop(notifier_b);
+               mem::drop(future_a);
+               mem::drop(future_b);
 
                assert!(future_state_a.upgrade().is_none() && future_state_b.upgrade().is_none());
        }
@@ -455,10 +472,13 @@ mod tests {
                let future = Future {
                        state: Arc::new(Mutex::new(FutureState {
                                callbacks: Vec::new(),
+                               std_future_callbacks: Vec::new(),
                                callbacks_with_state: Vec::new(),
                                complete: false,
                                callbacks_made: false,
-                       }))
+                               next_idx: 1,
+                       })),
+                       self_idx: 0,
                };
                let callback = Arc::new(AtomicBool::new(false));
                let callback_ref = Arc::clone(&callback);
@@ -475,10 +495,13 @@ mod tests {
                let future = Future {
                        state: Arc::new(Mutex::new(FutureState {
                                callbacks: Vec::new(),
+                               std_future_callbacks: Vec::new(),
                                callbacks_with_state: Vec::new(),
                                complete: false,
                                callbacks_made: false,
-                       }))
+                               next_idx: 1,
+                       })),
+                       self_idx: 0,
                };
                complete_future(&future.state);
 
@@ -514,12 +537,15 @@ mod tests {
                let mut future = Future {
                        state: Arc::new(Mutex::new(FutureState {
                                callbacks: Vec::new(),
+                               std_future_callbacks: Vec::new(),
                                callbacks_with_state: Vec::new(),
                                complete: false,
                                callbacks_made: false,
-                       }))
+                               next_idx: 2,
+                       })),
+                       self_idx: 0,
                };
-               let mut second_future = Future { state: Arc::clone(&future.state) };
+               let mut second_future = Future { state: Arc::clone(&future.state), self_idx: 1 };
 
                let (woken, waker) = create_waker();
                assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
@@ -638,18 +664,18 @@ mod tests {
                // Set both notifiers as woken without sleeping yet.
                notifier_a.notify();
                notifier_b.notify();
-               Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
+               Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();
 
                // One future has woken us up, but the other should still have a pending notification.
-               Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
+               Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();
 
                // However once we've slept twice, we should no longer have any pending notifications
-               assert!(!Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future())
+               assert!(!Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future())
                        .wait_timeout(Duration::from_millis(10)));
 
                // Test ordering somewhat more.
                notifier_a.notify();
-               Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
+               Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();
        }
 
        #[test]
@@ -667,7 +693,7 @@ mod tests {
 
                // After sleeping one future (not guaranteed which one, however) will have its notification
                // bit cleared.
-               Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
+               Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();
 
                // By registering a callback on the futures for both notifiers, one will complete
                // immediately, but one will remain tied to the notifier, and will complete once the
@@ -686,8 +712,48 @@ mod tests {
                notifier_b.notify();
 
                assert!(callback_a.load(Ordering::SeqCst) && callback_b.load(Ordering::SeqCst));
-               Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
-               assert!(!Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future())
+               Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();
+               assert!(!Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future())
                        .wait_timeout(Duration::from_millis(10)));
        }
+
+       #[test]
+       #[cfg(feature = "std")]
+       fn multi_poll_stores_single_waker() {
+               // When a `Future` is `poll()`ed multiple times, only the last `Waker` should be called,
+               // but previously we'd store all `Waker`s until they're all woken at once. This tests a few
+               // cases to ensure `Future`s avoid storing an endless set of `Waker`s.
+               let notifier = Notifier::new();
+               let future_state = Arc::clone(&notifier.get_future().state);
+               assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);
+
+               // Test that simply polling a future twice doesn't result in two pending `Waker`s.
+               let mut future_a = notifier.get_future();
+               assert_eq!(Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)), Poll::Pending);
+               assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
+               assert_eq!(Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)), Poll::Pending);
+               assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
+
+               // If we poll a second future, however, that will store a second `Waker`.
+               let mut future_b = notifier.get_future();
+               assert_eq!(Pin::new(&mut future_b).poll(&mut Context::from_waker(&create_waker().1)), Poll::Pending);
+               assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 2);
+
+               // but when we drop the `Future`s, the pending Wakers will also be dropped.
+               mem::drop(future_a);
+               assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
+               mem::drop(future_b);
+               assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);
+
+               // Further, after polling a future twice, if the notifier is woken all Wakers are dropped.
+               let mut future_a = notifier.get_future();
+               assert_eq!(Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)), Poll::Pending);
+               assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
+               assert_eq!(Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)), Poll::Pending);
+               assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
+               notifier.notify();
+               assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);
+               assert_eq!(Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)), Poll::Ready(()));
+               assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);
+       }
 }