Merge pull request #1964 from TheBlueMatt/2023-01-no-debug-panics
authorArik <arik-so@users.noreply.github.com>
Thu, 19 Jan 2023 01:41:54 +0000 (17:41 -0800)
committerGitHub <noreply@github.com>
Thu, 19 Jan 2023 01:41:54 +0000 (17:41 -0800)
Use test/_test_utils to enable single-threaded debug assertions

1  2 
lightning-net-tokio/src/lib.rs
lightning/src/ln/channelmanager.rs

index 45fe9b12f61b17a7b078805de1e6344efa91fbf3,39452cff034ffc27ba6118814efaf3f480d7c579..38b09a2886c2b825acec3cf4f4f741626f184139
@@@ -123,11 -123,7 +123,11 @@@ struct Connection 
        id: u64,
  }
  impl Connection {
 -      async fn poll_event_process<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, mut event_receiver: mpsc::Receiver<()>) where
 +      async fn poll_event_process<PM, CMH, RMH, OMH, L, UMH>(
 +              peer_manager: PM,
 +              mut event_receiver: mpsc::Receiver<()>,
 +      ) where
 +                      PM: Deref<Target = peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>> + 'static + Send + Sync,
                        CMH: Deref + 'static + Send + Sync,
                        RMH: Deref + 'static + Send + Sync,
                        OMH: Deref + 'static + Send + Sync,
                        OMH::Target: OnionMessageHandler + Send + Sync,
                        L::Target: Logger + Send + Sync,
                        UMH::Target: CustomMessageHandler + Send + Sync,
 -    {
 +      {
                loop {
                        if event_receiver.recv().await.is_none() {
                                return;
                }
        }
  
 -      async fn schedule_read<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
 +      async fn schedule_read<PM, CMH, RMH, OMH, L, UMH>(
 +              peer_manager: PM,
 +              us: Arc<Mutex<Self>>,
 +              mut reader: io::ReadHalf<TcpStream>,
 +              mut read_wake_receiver: mpsc::Receiver<()>,
 +              mut write_avail_receiver: mpsc::Receiver<()>,
 +      ) where
 +                      PM: Deref<Target = peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>> + 'static + Send + Sync + Clone,
                        CMH: Deref + 'static + Send + Sync,
                        RMH: Deref + 'static + Send + Sync,
                        OMH: Deref + 'static + Send + Sync,
                        OMH::Target: OnionMessageHandler + 'static + Send + Sync,
                        L::Target: Logger + 'static + Send + Sync,
                        UMH::Target: CustomMessageHandler + 'static + Send + Sync,
 -        {
 +              {
                // Create a waker to wake up poll_event_process, above
                let (event_waker, event_receiver) = mpsc::channel(1);
 -              tokio::spawn(Self::poll_event_process(Arc::clone(&peer_manager), event_receiver));
 +              tokio::spawn(Self::poll_event_process(peer_manager.clone(), event_receiver));
  
                // 8KB is nice and big but also should never cause any issues with stack overflowing.
                let mut buf = [0; 8192];
@@@ -283,11 -272,7 +283,11 @@@ fn get_addr_from_stream(stream: &StdTcp
  /// The returned future will complete when the peer is disconnected and associated handling
  /// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
  /// not need to poll the provided future in order to make progress.
 -pub fn setup_inbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
 +pub fn setup_inbound<PM, CMH, RMH, OMH, L, UMH>(
 +      peer_manager: PM,
 +      stream: StdTcpStream,
 +) -> impl std::future::Future<Output=()> where
 +              PM: Deref<Target = peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>> + 'static + Send + Sync + Clone,
                CMH: Deref + 'static + Send + Sync,
                RMH: Deref + 'static + Send + Sync,
                OMH: Deref + 'static + Send + Sync,
  {
        let remote_addr = get_addr_from_stream(&stream);
        let (reader, write_receiver, read_receiver, us) = Connection::new(stream);
-       #[cfg(debug_assertions)]
+       #[cfg(test)]
        let last_us = Arc::clone(&us);
  
        let handle_opt = if let Ok(_) = peer_manager.new_inbound_connection(SocketDescriptor::new(us.clone()), remote_addr) {
                                // socket shutdown(). Still, as a check during testing, to make sure tokio doesn't
                                // keep too many wakers around, this makes sense. The race should be rare (we do
                                // some work after shutdown()) and an error would be a major memory leak.
-                               #[cfg(debug_assertions)]
-                               assert!(Arc::try_unwrap(last_us).is_ok());
+                               #[cfg(test)]
+                               debug_assert!(Arc::try_unwrap(last_us).is_ok());
                        }
                }
        }
  /// The returned future will complete when the peer is disconnected and associated handling
  /// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
  /// not need to poll the provided future in order to make progress.
 -pub fn setup_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
 +pub fn setup_outbound<PM, CMH, RMH, OMH, L, UMH>(
 +      peer_manager: PM,
 +      their_node_id: PublicKey,
 +      stream: StdTcpStream,
 +) -> impl std::future::Future<Output=()> where
 +              PM: Deref<Target = peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>> + 'static + Send + Sync + Clone,
                CMH: Deref + 'static + Send + Sync,
                RMH: Deref + 'static + Send + Sync,
                OMH: Deref + 'static + Send + Sync,
  {
        let remote_addr = get_addr_from_stream(&stream);
        let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream);
-       #[cfg(debug_assertions)]
+       #[cfg(test)]
        let last_us = Arc::clone(&us);
        let handle_opt = if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone()), remote_addr) {
                Some(tokio::spawn(async move {
                                // socket shutdown(). Still, as a check during testing, to make sure tokio doesn't
                                // keep too many wakers around, this makes sense. The race should be rare (we do
                                // some work after shutdown()) and an error would be a major memory leak.
-                               #[cfg(debug_assertions)]
-                               assert!(Arc::try_unwrap(last_us).is_ok());
+                               #[cfg(test)]
+                               debug_assert!(Arc::try_unwrap(last_us).is_ok());
                        }
                }
        }
  /// disconnected and associated handling futures are freed, though, because all processing in said
  /// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
  /// make progress.
 -pub async fn connect_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
 +pub async fn connect_outbound<PM, CMH, RMH, OMH, L, UMH>(
 +      peer_manager: PM,
 +      their_node_id: PublicKey,
 +      addr: SocketAddr,
 +) -> Option<impl std::future::Future<Output=()>> where
 +              PM: Deref<Target = peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>> + 'static + Send + Sync + Clone,
                CMH: Deref + 'static + Send + Sync,
                RMH: Deref + 'static + Send + Sync,
                OMH: Deref + 'static + Send + Sync,
index af83edc5634d37d76d18c95bf61ede56999cdf23,c5b9f924d82927d24f69c1bd3b820528d080cb32..f502a3336cadd935a474b4abe89ef189f8432b0f
@@@ -1153,12 -1153,12 +1153,12 @@@ macro_rules! handle_error 
                match $internal {
                        Ok(msg) => Ok(msg),
                        Err(MsgHandleErrInternal { err, chan_id, shutdown_finish }) => {
-                               #[cfg(debug_assertions)]
+                               #[cfg(any(feature = "_test_utils", test))]
                                {
                                        // In testing, ensure there are no deadlocks where the lock is already held upon
                                        // entering the macro.
-                                       assert!($self.pending_events.try_lock().is_ok());
-                                       assert!($self.per_peer_state.try_write().is_ok());
+                                       debug_assert!($self.pending_events.try_lock().is_ok());
+                                       debug_assert!($self.per_peer_state.try_write().is_ok());
                                }
  
                                let mut msg_events = Vec::with_capacity(2);
                                                let mut peer_state = peer_state_mutex.lock().unwrap();
                                                peer_state.pending_msg_events.append(&mut msg_events);
                                        }
-                                       #[cfg(debug_assertions)]
+                                       #[cfg(any(feature = "_test_utils", test))]
                                        {
                                                if let None = per_peer_state.get(&$counterparty_node_id) {
                                                        // This shouldn't occour in tests unless an unkown counterparty_node_id
                                                                => {
                                                                        assert_eq!(*data, expected_error_str);
                                                                        if let Some((err_channel_id, _user_channel_id)) = chan_id {
-                                                                               assert_eq!(*channel_id, err_channel_id);
+                                                                               debug_assert_eq!(*channel_id, err_channel_id);
                                                                        }
                                                                }
-                                                               _ => panic!("Unexpected event"),
+                                                               _ => debug_assert!(false, "Unexpected event"),
                                                        }
                                                }
                                        }
@@@ -3565,7 -3565,7 +3565,7 @@@ wher
        /// Fails an HTLC backwards to the sender of it to us.
        /// Note that we do not assume that channels corresponding to failed HTLCs are still available.
        fn fail_htlc_backwards_internal(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) {
-               #[cfg(debug_assertions)]
+               #[cfg(any(feature = "_test_utils", test))]
                {
                        // Ensure that no peer state channel storage lock is not held when calling this
                        // function.
                        // this function with any `per_peer_state` peer lock aquired would.
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        for (_, peer) in per_peer_state.iter() {
-                               assert!(peer.try_lock().is_ok());
+                               debug_assert!(peer.try_lock().is_ok());
                        }
                }
  
                let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
                let peer_state = &mut *peer_state_lock;
                let mut channel = match Channel::new_from_req(&self.fee_estimator, &self.entropy_source, &self.signer_provider,
 -                      counterparty_node_id.clone(), &peer_state.latest_features, msg, user_channel_id, &self.default_configuration,
 +                      counterparty_node_id.clone(), &self.channel_type_features(), &peer_state.latest_features, msg, user_channel_id, &self.default_configuration,
                        self.best_block.read().unwrap().height(), &self.logger, outbound_scid_alias)
                {
                        Err(e) => {
@@@ -6267,7 -6267,7 +6267,7 @@@ pub(crate) fn provided_channel_features
  /// Fetches the set of [`ChannelTypeFeatures`] flags which are provided by or required by
  /// [`ChannelManager`].
  pub(crate) fn provided_channel_type_features(config: &UserConfig) -> ChannelTypeFeatures {
 -      ChannelTypeFeatures::from_counterparty_init(&provided_init_features(config))
 +      ChannelTypeFeatures::from_init(&provided_init_features(config))
  }
  
  /// Fetches the set of [`InitFeatures`] flags which are provided by or required by
@@@ -6288,12 -6288,6 +6288,12 @@@ pub fn provided_init_features(_config: 
        features.set_channel_type_optional();
        features.set_scid_privacy_optional();
        features.set_zero_conf_optional();
 +      #[cfg(anchors)]
 +      { // Attributes are not allowed on if expressions on our current MSRV of 1.41.
 +              if _config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx {
 +                      features.set_anchors_zero_fee_htlc_tx_optional();
 +              }
 +      }
        features
  }
  
@@@ -7033,9 -7027,7 +7033,9 @@@ wher
                let mut short_to_chan_info = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
                let mut channel_closures = Vec::new();
                for _ in 0..channel_count {
 -                      let mut channel: Channel<<SP::Target as SignerProvider>::Signer> = Channel::read(reader, (&args.entropy_source, &args.signer_provider, best_block_height))?;
 +                      let mut channel: Channel<<SP::Target as SignerProvider>::Signer> = Channel::read(reader, (
 +                              &args.entropy_source, &args.signer_provider, best_block_height, &provided_channel_type_features(&args.default_config)
 +                      ))?;
                        let funding_txo = channel.get_funding_txo().ok_or(DecodeError::InvalidValue)?;
                        funding_txo_set.insert(funding_txo.clone());
                        if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) {
@@@ -8319,42 -8311,6 +8319,42 @@@ mod tests 
  
                nodes[1].node.handle_update_fee(&unkown_public_key, &update_fee_msg);
        }
 +
 +      #[cfg(anchors)]
 +      #[test]
 +      fn test_anchors_zero_fee_htlc_tx_fallback() {
 +              // Tests that if both nodes support anchors, but the remote node does not want to accept
 +              // anchor channels at the moment, an error it sent to the local node such that it can retry
 +              // the channel without the anchors feature.
 +              let chanmon_cfgs = create_chanmon_cfgs(2);
 +              let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
 +              let mut anchors_config = test_default_channel_config();
 +              anchors_config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true;
 +              anchors_config.manually_accept_inbound_channels = true;
 +              let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[Some(anchors_config.clone()), Some(anchors_config.clone())]);
 +              let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
 +
 +              nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 0, None).unwrap();
 +              let open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
 +              assert!(open_channel_msg.channel_type.as_ref().unwrap().supports_anchors_zero_fee_htlc_tx());
 +
 +              nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
 +              let events = nodes[1].node.get_and_clear_pending_events();
 +              match events[0] {
 +                      Event::OpenChannelRequest { temporary_channel_id, .. } => {
 +                              nodes[1].node.force_close_broadcasting_latest_txn(&temporary_channel_id, &nodes[0].node.get_our_node_id()).unwrap();
 +                      }
 +                      _ => panic!("Unexpected event"),
 +              }
 +
 +              let error_msg = get_err_msg!(nodes[1], nodes[0].node.get_our_node_id());
 +              nodes[0].node.handle_error(&nodes[1].node.get_our_node_id(), &error_msg);
 +
 +              let open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
 +              assert!(!open_channel_msg.channel_type.unwrap().supports_anchors_zero_fee_htlc_tx());
 +
 +              check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed);
 +      }
  }
  
  #[cfg(all(any(test, feature = "_test_utils"), feature = "_bench_unstable"))]