id: u64,
}
impl Connection {
- async fn poll_event_process<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, mut event_receiver: mpsc::Receiver<()>) where
+ async fn poll_event_process<PM, CMH, RMH, OMH, L, UMH>(
+ peer_manager: PM,
+ mut event_receiver: mpsc::Receiver<()>,
+ ) where
+ PM: Deref<Target = peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>> + 'static + Send + Sync,
CMH: Deref + 'static + Send + Sync,
RMH: Deref + 'static + Send + Sync,
OMH: Deref + 'static + Send + Sync,
OMH::Target: OnionMessageHandler + Send + Sync,
L::Target: Logger + Send + Sync,
UMH::Target: CustomMessageHandler + Send + Sync,
- {
+ {
loop {
if event_receiver.recv().await.is_none() {
return;
}
}
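// Editor's note: a minimal sketch, not part of this patch, of why the generic
// `PM: Deref<Target = ...>` bound is more flexible than the old concrete
// `Arc<PeerManager<...>>` parameter: both `Arc<T>` and `&'static T` implement
// `Deref<Target = T>`, so callers are no longer forced to wrap their
// PeerManager in an Arc. `FakeManager` and `takes_manager` are hypothetical:
//
//     use std::ops::Deref;
//     use std::sync::Arc;
//
//     struct FakeManager;
//     fn takes_manager<PM: Deref<Target = FakeManager> + Send + Sync + 'static>(_pm: PM) {}
//
//     fn demo(arc: Arc<FakeManager>, static_ref: &'static FakeManager) {
//         takes_manager(arc);        // Arc<FakeManager> derefs to FakeManager
//         takes_manager(static_ref); // so does &'static FakeManager
//     }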
- async fn schedule_read<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
+ async fn schedule_read<PM, CMH, RMH, OMH, L, UMH>(
+ peer_manager: PM,
+ us: Arc<Mutex<Self>>,
+ mut reader: io::ReadHalf<TcpStream>,
+ mut read_wake_receiver: mpsc::Receiver<()>,
+ mut write_avail_receiver: mpsc::Receiver<()>,
+ ) where
+ PM: Deref<Target = peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>> + 'static + Send + Sync + Clone,
CMH: Deref + 'static + Send + Sync,
RMH: Deref + 'static + Send + Sync,
OMH: Deref + 'static + Send + Sync,
OMH::Target: OnionMessageHandler + 'static + Send + Sync,
L::Target: Logger + 'static + Send + Sync,
UMH::Target: CustomMessageHandler + 'static + Send + Sync,
- {
+ {
// Create a waker to wake up poll_event_process, above
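// A capacity-one channel suffices as a waker here: if a wake-up is already
// queued, any further notification before it is consumed would be redundant.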
let (event_waker, event_receiver) = mpsc::channel(1);
- tokio::spawn(Self::poll_event_process(Arc::clone(&peer_manager), event_receiver));
+ tokio::spawn(Self::poll_event_process(peer_manager.clone(), event_receiver));
// 8KB is nice and big but also should never cause any issues with stack overflowing.
let mut buf = [0; 8192];
/// The returned future will complete when the peer is disconnected and the associated handling
/// futures are freed. However, because all processing futures are spawned with tokio::spawn, you
/// do not need to poll the provided future in order to make progress.
-pub fn setup_inbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
+pub fn setup_inbound<PM, CMH, RMH, OMH, L, UMH>(
+ peer_manager: PM,
+ stream: StdTcpStream,
+) -> impl std::future::Future<Output=()> where
+ PM: Deref<Target = peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>> + 'static + Send + Sync + Clone,
CMH: Deref + 'static + Send + Sync,
RMH: Deref + 'static + Send + Sync,
OMH: Deref + 'static + Send + Sync,
{
let remote_addr = get_addr_from_stream(&stream);
let (reader, write_receiver, read_receiver, us) = Connection::new(stream);
- #[cfg(debug_assertions)]
+ #[cfg(test)]
let last_us = Arc::clone(&us);
let handle_opt = if let Ok(_) = peer_manager.new_inbound_connection(SocketDescriptor::new(us.clone()), remote_addr) {
// socket shutdown(). Still, as a check during testing, to make sure tokio doesn't
// keep too many wakers around, this makes sense. The race should be rare (we do
// some work after shutdown()) and an error would be a major memory leak.
- #[cfg(debug_assertions)]
- assert!(Arc::try_unwrap(last_us).is_ok());
+ #[cfg(test)]
+ debug_assert!(Arc::try_unwrap(last_us).is_ok());
}
}
}
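// Editor's note: a hedged usage sketch, not part of this patch. Because every
// processing future is spawned internally with tokio::spawn, the future returned
// by setup_inbound only tracks disconnection; it may be awaited or dropped. The
// surrounding accept loop, `listener`, and `peer_manager` are assumed context:
//
//     while let Ok((stream, _addr)) = listener.accept().await {
//         let pm = Arc::clone(&peer_manager);
//         tokio::spawn(setup_inbound(pm, stream.into_std().unwrap()));
//     }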
/// The returned future will complete when the peer is disconnected and the associated handling
/// futures are freed. However, because all processing futures are spawned with tokio::spawn, you
/// do not need to poll the provided future in order to make progress.
-pub fn setup_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
+pub fn setup_outbound<PM, CMH, RMH, OMH, L, UMH>(
+ peer_manager: PM,
+ their_node_id: PublicKey,
+ stream: StdTcpStream,
+) -> impl std::future::Future<Output=()> where
+ PM: Deref<Target = peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>> + 'static + Send + Sync + Clone,
CMH: Deref + 'static + Send + Sync,
RMH: Deref + 'static + Send + Sync,
OMH: Deref + 'static + Send + Sync,
{
let remote_addr = get_addr_from_stream(&stream);
let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream);
- #[cfg(debug_assertions)]
+ #[cfg(test)]
let last_us = Arc::clone(&us);
let handle_opt = if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone()), remote_addr) {
Some(tokio::spawn(async move {
// socket shutdown(). Still, as a check during testing, to make sure tokio doesn't
// keep too many wakers around, this makes sense. The race should be rare (we do
// some work after shutdown()) and an error would be a major memory leak.
- #[cfg(debug_assertions)]
- assert!(Arc::try_unwrap(last_us).is_ok());
+ #[cfg(test)]
+ debug_assert!(Arc::try_unwrap(last_us).is_ok());
}
}
}
/// disconnected and the associated handling futures are freed. However, because all processing in
/// said futures is spawned with tokio::spawn, you do not need to poll the second future in order
/// to make progress.
-pub async fn connect_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
+pub async fn connect_outbound<PM, CMH, RMH, OMH, L, UMH>(
+ peer_manager: PM,
+ their_node_id: PublicKey,
+ addr: SocketAddr,
+) -> Option<impl std::future::Future<Output=()>> where
+ PM: Deref<Target = peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>> + 'static + Send + Sync + Clone,
CMH: Deref + 'static + Send + Sync,
RMH: Deref + 'static + Send + Sync,
OMH: Deref + 'static + Send + Sync,
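// Editor's note: a hedged usage sketch, not part of this patch. connect_outbound
// resolves once connection setup finishes: `None` if the connection failed, or
// `Some` of the disconnection-tracking future described above, which may simply
// be spawned:
//
//     if let Some(disconnect_fut) = connect_outbound(pm, their_node_id, addr).await {
//         tokio::spawn(disconnect_fut);
//     }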
match $internal {
Ok(msg) => Ok(msg),
Err(MsgHandleErrInternal { err, chan_id, shutdown_finish }) => {
- #[cfg(debug_assertions)]
+ #[cfg(any(feature = "_test_utils", test))]
{
// In testing, ensure there are no deadlocks where the lock is already held upon
// entering the macro.
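// A failed `try_lock`/`try_write` here would mean this thread already holds
// the lock, so taking it again further down would deadlock.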
- assert!($self.pending_events.try_lock().is_ok());
- assert!($self.per_peer_state.try_write().is_ok());
+ debug_assert!($self.pending_events.try_lock().is_ok());
+ debug_assert!($self.per_peer_state.try_write().is_ok());
}
let mut msg_events = Vec::with_capacity(2);
let mut peer_state = peer_state_mutex.lock().unwrap();
peer_state.pending_msg_events.append(&mut msg_events);
}
- #[cfg(debug_assertions)]
+ #[cfg(any(feature = "_test_utils", test))]
{
if let None = per_peer_state.get(&$counterparty_node_id) {
// This shouldn't occur in tests unless an unknown counterparty_node_id
=> {
assert_eq!(*data, expected_error_str);
if let Some((err_channel_id, _user_channel_id)) = chan_id {
- assert_eq!(*channel_id, err_channel_id);
+ debug_assert_eq!(*channel_id, err_channel_id);
}
}
- _ => panic!("Unexpected event"),
+ _ => debug_assert!(false, "Unexpected event"),
}
}
}
/// Fails an HTLC backwards to the sender of it to us.
/// Note that we do not assume that channels corresponding to failed HTLCs are still available.
fn fail_htlc_backwards_internal(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) {
- #[cfg(debug_assertions)]
+ #[cfg(any(feature = "_test_utils", test))]
{
// Ensure that no peer state channel storage lock is held when calling this
// function. This guards against introducing a lock-order requirement, which calling
// this function with any `per_peer_state` peer lock acquired would.
let per_peer_state = self.per_peer_state.read().unwrap();
for (_, peer) in per_peer_state.iter() {
- assert!(peer.try_lock().is_ok());
+ debug_assert!(peer.try_lock().is_ok());
}
}
let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
let peer_state = &mut *peer_state_lock;
let mut channel = match Channel::new_from_req(&self.fee_estimator, &self.entropy_source, &self.signer_provider,
- counterparty_node_id.clone(), &peer_state.latest_features, msg, user_channel_id, &self.default_configuration,
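+ // The set of channel type features we support is now passed in as well, so the
+ // peer's requested channel type can be checked against it.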
+ counterparty_node_id.clone(), &self.channel_type_features(), &peer_state.latest_features, msg, user_channel_id, &self.default_configuration,
self.best_block.read().unwrap().height(), &self.logger, outbound_scid_alias)
{
Err(e) => {
/// Fetches the set of [`ChannelTypeFeatures`] flags which are provided by or required by
/// [`ChannelManager`].
pub(crate) fn provided_channel_type_features(config: &UserConfig) -> ChannelTypeFeatures {
- ChannelTypeFeatures::from_counterparty_init(&provided_init_features(config))
+ ChannelTypeFeatures::from_init(&provided_init_features(config))
}
/// Fetches the set of [`InitFeatures`] flags which are provided by or required by
features.set_channel_type_optional();
features.set_scid_privacy_optional();
features.set_zero_conf_optional();
+ #[cfg(anchors)]
+ { // Attributes are not allowed on if expressions on our current MSRV of 1.41.
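+ // Only advertise the feature when the user has opted in via config; the peer
+ // may still decline it (see test_anchors_zero_fee_htlc_tx_fallback below).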
+ if _config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx {
+ features.set_anchors_zero_fee_htlc_tx_optional();
+ }
+ }
features
}
let mut short_to_chan_info = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
let mut channel_closures = Vec::new();
for _ in 0..channel_count {
- let mut channel: Channel<<SP::Target as SignerProvider>::Signer> = Channel::read(reader, (&args.entropy_source, &args.signer_provider, best_block_height))?;
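+ // `Channel::read` now also takes the set of channel type features we support
+ // (see `provided_channel_type_features` above).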
+ let mut channel: Channel<<SP::Target as SignerProvider>::Signer> = Channel::read(reader, (
+ &args.entropy_source, &args.signer_provider, best_block_height, &provided_channel_type_features(&args.default_config)
+ ))?;
let funding_txo = channel.get_funding_txo().ok_or(DecodeError::InvalidValue)?;
funding_txo_set.insert(funding_txo.clone());
if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) {
nodes[1].node.handle_update_fee(&unkown_public_key, &update_fee_msg);
}
+
+ #[cfg(anchors)]
+ #[test]
+ fn test_anchors_zero_fee_htlc_tx_fallback() {
+ // Tests that if both nodes support anchors, but the remote node does not want to accept
+ // anchor channels at the moment, an error is sent to the local node so that it can retry
+ // the channel without the anchors feature.
+ let chanmon_cfgs = create_chanmon_cfgs(2);
+ let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+ let mut anchors_config = test_default_channel_config();
+ anchors_config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true;
+ anchors_config.manually_accept_inbound_channels = true;
+ let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[Some(anchors_config.clone()), Some(anchors_config.clone())]);
+ let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+ nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 0, None).unwrap();
+ let open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
+ assert!(open_channel_msg.channel_type.as_ref().unwrap().supports_anchors_zero_fee_htlc_tx());
+
+ nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
+ let events = nodes[1].node.get_and_clear_pending_events();
+ match events[0] {
+ Event::OpenChannelRequest { temporary_channel_id, .. } => {
+ nodes[1].node.force_close_broadcasting_latest_txn(&temporary_channel_id, &nodes[0].node.get_our_node_id()).unwrap();
+ }
+ _ => panic!("Unexpected event"),
+ }
+
+ let error_msg = get_err_msg!(nodes[1], nodes[0].node.get_our_node_id());
+ nodes[0].node.handle_error(&nodes[1].node.get_our_node_id(), &error_msg);
+
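+ // After handling the error, nodes[0] retries the channel without the anchors feature: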
+ let open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
+ assert!(!open_channel_msg.channel_type.unwrap().supports_anchors_zero_fee_htlc_tx());
+
+ check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed);
+ }
}
#[cfg(all(any(test, feature = "_test_utils"), feature = "_bench_unstable"))]