Add a few more simple tests of the PeerHandler
diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs
index 4177b50275931722565fc1468aa7e5b08f29d76d..8c4f5adcb64ae64abbbc03b0956c556f3abbf44c 100644
--- a/fuzz/src/chanmon_consistency.rs
+++ b/fuzz/src/chanmon_consistency.rs
@@ -30,11 +30,11 @@ use bitcoin::hashes::sha256::Hash as Sha256;
 use bitcoin::hash_types::{BlockHash, WPubkeyHash};
 
 use lightning::chain;
-use lightning::chain::{BestBlock, chainmonitor, channelmonitor, Confirm, Watch};
-use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, MonitorEvent};
+use lightning::chain::{BestBlock, ChannelMonitorUpdateErr, chainmonitor, channelmonitor, Confirm, Watch};
+use lightning::chain::channelmonitor::{ChannelMonitor, MonitorEvent};
 use lightning::chain::transaction::OutPoint;
 use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
-use lightning::chain::keysinterface::{KeysInterface, InMemorySigner};
+use lightning::chain::keysinterface::{KeyMaterial, KeysInterface, InMemorySigner, Recipient};
 use lightning::ln::{PaymentHash, PaymentPreimage, PaymentSecret};
 use lightning::ln::channelmanager::{ChainParameters, ChannelManager, PaymentSendFailure, ChannelManagerReadArgs};
 use lightning::ln::channel::FEE_SPIKE_BUFFER_FEE_INCREASE_MULTIPLE;
@@ -50,8 +50,7 @@ use lightning::util::events::MessageSendEventsProvider;
 use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
 use lightning::routing::router::{Route, RouteHop};
 
-
-use utils::test_logger;
+use utils::test_logger::{self, Output};
 use utils::test_persister::TestPersister;
 
 use bitcoin::secp256k1::key::{PublicKey,SecretKey};
@@ -64,6 +63,7 @@ use std::collections::{HashSet, hash_map, HashMap};
 use std::sync::{Arc,Mutex};
 use std::sync::atomic;
 use std::io::Cursor;
+use bitcoin::bech32::u5;
 
 const MAX_FEE: u32 = 10_000;
 struct FuzzEstimator {
@@ -99,8 +99,8 @@ impl Writer for VecWriter {
 struct TestChainMonitor {
        pub logger: Arc<dyn Logger>,
        pub keys: Arc<KeyProvider>,
+       pub persister: Arc<TestPersister>,
        pub chain_monitor: Arc<chainmonitor::ChainMonitor<EnforcingSigner, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<TestPersister>>>,
-       pub update_ret: Mutex<Result<(), channelmonitor::ChannelMonitorUpdateErr>>,
        // If we reload a node with an old copy of ChannelMonitors, the ChannelManager deserialization
        // logic will automatically force-close our channels for us (as we don't have an up-to-date
        // monitor implying we are not able to punish misbehaving counterparties). Because this test
@@ -112,28 +112,27 @@ struct TestChainMonitor {
 impl TestChainMonitor {
        pub fn new(broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>, persister: Arc<TestPersister>, keys: Arc<KeyProvider>) -> Self {
                Self {
-                       chain_monitor: Arc::new(chainmonitor::ChainMonitor::new(None, broadcaster, logger.clone(), feeest, persister)),
+                       chain_monitor: Arc::new(chainmonitor::ChainMonitor::new(None, broadcaster, logger.clone(), feeest, Arc::clone(&persister))),
                        logger,
                        keys,
-                       update_ret: Mutex::new(Ok(())),
+                       persister,
                        latest_monitors: Mutex::new(HashMap::new()),
                        should_update_manager: atomic::AtomicBool::new(false),
                }
        }
 }
 impl chain::Watch<EnforcingSigner> for TestChainMonitor {
-       fn watch_channel(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<EnforcingSigner>) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
+       fn watch_channel(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<EnforcingSigner>) -> Result<(), chain::ChannelMonitorUpdateErr> {
                let mut ser = VecWriter(Vec::new());
                monitor.write(&mut ser).unwrap();
                if let Some(_) = self.latest_monitors.lock().unwrap().insert(funding_txo, (monitor.get_latest_update_id(), ser.0)) {
                        panic!("Already had monitor pre-watch_channel");
                }
                self.should_update_manager.store(true, atomic::Ordering::Relaxed);
-               assert!(self.chain_monitor.watch_channel(funding_txo, monitor).is_ok());
-               self.update_ret.lock().unwrap().clone()
+               self.chain_monitor.watch_channel(funding_txo, monitor)
        }
 
-       fn update_channel(&self, funding_txo: OutPoint, update: channelmonitor::ChannelMonitorUpdate) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
+       fn update_channel(&self, funding_txo: OutPoint, update: channelmonitor::ChannelMonitorUpdate) -> Result<(), chain::ChannelMonitorUpdateErr> {
                let mut map_lock = self.latest_monitors.lock().unwrap();
                let mut map_entry = match map_lock.entry(funding_txo) {
                        hash_map::Entry::Occupied(entry) => entry,
@@ -146,8 +145,7 @@ impl chain::Watch<EnforcingSigner> for TestChainMonitor {
                deserialized_monitor.write(&mut ser).unwrap();
                map_entry.insert((update.update_id, ser.0));
                self.should_update_manager.store(true, atomic::Ordering::Relaxed);
-               assert!(self.chain_monitor.update_channel(funding_txo, update).is_ok());
-               self.update_ret.lock().unwrap().clone()
+               self.chain_monitor.update_channel(funding_txo, update)
        }
 
        fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
@@ -163,8 +161,12 @@ struct KeyProvider {
 impl KeysInterface for KeyProvider {
        type Signer = EnforcingSigner;
 
-       fn get_node_secret(&self) -> SecretKey {
-               SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, self.node_id]).unwrap()
+       fn get_node_secret(&self, _recipient: Recipient) -> Result<SecretKey, ()> {
+               Ok(SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, self.node_id]).unwrap())
+       }
+
+       fn get_inbound_payment_key_material(&self) -> KeyMaterial {
+               KeyMaterial([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, self.node_id])
        }
 
        fn get_destination_script(&self) -> Script {
@@ -186,6 +188,7 @@ impl KeysInterface for KeyProvider {
                let id = self.rand_bytes_id.fetch_add(1, atomic::Ordering::Relaxed);
                let keys = InMemorySigner::new(
                        &secp_ctx,
+                       self.get_node_secret(Recipient::Node).unwrap(),
                        SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, self.node_id]).unwrap(),
                        SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, self.node_id]).unwrap(),
                        SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6, self.node_id]).unwrap(),
@@ -209,7 +212,7 @@ impl KeysInterface for KeyProvider {
        fn read_chan_signer(&self, buffer: &[u8]) -> Result<Self::Signer, DecodeError> {
                let mut reader = std::io::Cursor::new(buffer);
 
-               let inner: InMemorySigner = Readable::read(&mut reader)?;
+               let inner: InMemorySigner = ReadableArgs::read(&mut reader, self.get_node_secret(Recipient::Node).unwrap())?;
                let state = self.make_enforcement_state_cell(inner.commitment_seed);
 
                Ok(EnforcingSigner {
@@ -219,7 +222,7 @@ impl KeysInterface for KeyProvider {
                })
        }
 
-       fn sign_invoice(&self, _invoice_preimage: Vec<u8>) -> Result<RecoverableSignature, ()> {
+       fn sign_invoice(&self, _hrp_bytes: &[u8], _invoice_data: &[u5], _recipient: Recipient) -> Result<RecoverableSignature, ()> {
                unreachable!()
        }
 }
@@ -273,8 +276,8 @@ fn check_payment_err(send_err: PaymentSendFailure) {
                PaymentSendFailure::AllFailedRetrySafe(per_path_results) => {
                        for api_err in per_path_results { check_api_err(api_err); }
                },
-               PaymentSendFailure::PartialFailure(per_path_results) => {
-                       for res in per_path_results { if let Err(api_err) = res { check_api_err(api_err); } }
+               PaymentSendFailure::PartialFailure { results, .. } => {
+                       for res in results { if let Err(api_err) = res { check_api_err(api_err); } }
                },
        }
 }
@@ -286,7 +289,7 @@ fn get_payment_secret_hash(dest: &ChanMan, payment_id: &mut u8) -> Option<(Payme
        let mut payment_hash;
        for _ in 0..256 {
                payment_hash = PaymentHash(Sha256::hash(&[*payment_id; 1]).into_inner());
-               if let Ok(payment_secret) = dest.create_inbound_payment_for_hash(payment_hash, None, 3600, 0) {
+               if let Ok(payment_secret) = dest.create_inbound_payment_for_hash(payment_hash, None, 3600) {
                        return Some((payment_secret, payment_hash));
                }
                *payment_id = payment_id.wrapping_add(1);
@@ -307,6 +310,7 @@ fn send_payment(source: &ChanMan, dest: &ChanMan, dest_chan_id: u64, amt: u64, p
                        fee_msat: amt,
                        cltv_expiry_delta: 200,
                }]],
+               payment_params: None,
        }, payment_hash, &Some(payment_secret)) {
                check_payment_err(err);
                false
@@ -332,6 +336,7 @@ fn send_hop_payment(source: &ChanMan, middle: &ChanMan, middle_chan_id: u64, des
                        fee_msat: amt,
                        cltv_expiry_delta: 200,
                }]],
+               payment_params: None,
        }, payment_hash, &Some(payment_secret)) {
                check_payment_err(err);
                false
@@ -339,14 +344,16 @@ fn send_hop_payment(source: &ChanMan, middle: &ChanMan, middle_chan_id: u64, des
 }
 
 #[inline]
-pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
+pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out) {
+       let out = SearchingOutput::new(underlying_out);
        let broadcast = Arc::new(TestBroadcaster{});
 
        macro_rules! make_node {
                ($node_id: expr, $fee_estimator: expr) => { {
                        let logger: Arc<dyn Logger> = Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone()));
                        let keys_manager = Arc::new(KeyProvider { node_id: $node_id, rand_bytes_id: atomic::AtomicU32::new(0), enforcement_states: Mutex::new(HashMap::new()) });
-                       let monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), $fee_estimator.clone(), Arc::new(TestPersister{}), Arc::clone(&keys_manager)));
+                       let monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), $fee_estimator.clone(),
+                               Arc::new(TestPersister { update_ret: Mutex::new(Ok(())) }), Arc::clone(&keys_manager)));
 
                        let mut config = UserConfig::default();
                        config.channel_options.forwarding_fee_proportional_millionths = 0;
@@ -365,7 +372,8 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
                ($ser: expr, $node_id: expr, $old_monitors: expr, $keys_manager: expr, $fee_estimator: expr) => { {
                    let keys_manager = Arc::clone(& $keys_manager);
                        let logger: Arc<dyn Logger> = Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone()));
-                       let chain_monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), $fee_estimator.clone(), Arc::new(TestPersister{}), Arc::clone(& $keys_manager)));
+                       let chain_monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), $fee_estimator.clone(),
+                               Arc::new(TestPersister { update_ret: Mutex::new(Ok(())) }), Arc::clone(& $keys_manager)));
 
                        let mut config = UserConfig::default();
                        config.channel_options.forwarding_fee_proportional_millionths = 0;
@@ -403,8 +411,8 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
        let mut channel_txn = Vec::new();
        macro_rules! make_channel {
                ($source: expr, $dest: expr, $chan_id: expr) => { {
-                       $source.peer_connected(&$dest.get_our_node_id(), &Init { features: InitFeatures::known() });
-                       $dest.peer_connected(&$source.get_our_node_id(), &Init { features: InitFeatures::known() });
+                       $source.peer_connected(&$dest.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
+                       $dest.peer_connected(&$source.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
 
                        $source.create_channel($dest.get_our_node_id(), 100_000, 42, 0, None).unwrap();
                        let open_channel = {
@@ -732,7 +740,11 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
                                                        // force-close which we should detect as an error).
                                                        assert_eq!(msg.contents.flags & 2, 0);
                                                },
-                                               _ => panic!("Unhandled message event {:?}", event),
+                                               _ => if out.may_fail.load(atomic::Ordering::Acquire) {
+                                                       return;
+                                               } else {
+                                                       panic!("Unhandled message event {:?}", event)
+                                               },
                                        }
                                        if $limit_events != ProcessMessages::AllMessages {
                                                break;
@@ -764,7 +776,11 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
                                                        events::MessageSendEvent::SendChannelUpdate { ref msg, .. } => {
                                                                assert_eq!(msg.contents.flags & 2, 0); // The disable bit must never be set!
                                                        },
-                                                       _ => panic!("Unhandled message event"),
+                                                       _ => if out.may_fail.load(atomic::Ordering::Acquire) {
+                                                               return;
+                                                       } else {
+                                                               panic!("Unhandled message event")
+                                                       },
                                                }
                                        }
                                        push_excess_b_events!(nodes[1].get_and_clear_pending_msg_events().drain(..), Some(0));
@@ -781,7 +797,11 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
                                                        events::MessageSendEvent::SendChannelUpdate { ref msg, .. } => {
                                                                assert_eq!(msg.contents.flags & 2, 0); // The disable bit must never be set!
                                                        },
-                                                       _ => panic!("Unhandled message event"),
+                                                       _ => if out.may_fail.load(atomic::Ordering::Acquire) {
+                                                               return;
+                                                       } else {
+                                                               panic!("Unhandled message event")
+                                                       },
                                                }
                                        }
                                        push_excess_b_events!(nodes[1].get_and_clear_pending_msg_events().drain(..), Some(2));
@@ -827,12 +847,17 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
                                                        }
                                                },
                                                events::Event::PaymentSent { .. } => {},
+                                               events::Event::PaymentPathSuccessful { .. } => {},
                                                events::Event::PaymentPathFailed { .. } => {},
                                                events::Event::PaymentForwarded { .. } if $node == 1 => {},
                                                events::Event::PendingHTLCsForwardable { .. } => {
                                                        nodes[$node].process_pending_htlc_forwards();
                                                },
-                                               _ => panic!("Unhandled event"),
+                                               _ => if out.may_fail.load(atomic::Ordering::Acquire) {
+                                                       return;
+                                               } else {
+                                                       panic!("Unhandled event")
+                                               },
                                        }
                                }
                                had_events
@@ -846,31 +871,35 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
                        // bit-twiddling mutations to have similar effects. This is probably overkill, but no
                        // harm in doing so.
 
-                       0x00 => *monitor_a.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure),
-                       0x01 => *monitor_b.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure),
-                       0x02 => *monitor_c.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure),
-                       0x04 => *monitor_a.update_ret.lock().unwrap() = Ok(()),
-                       0x05 => *monitor_b.update_ret.lock().unwrap() = Ok(()),
-                       0x06 => *monitor_c.update_ret.lock().unwrap() = Ok(()),
+                       0x00 => *monitor_a.persister.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure),
+                       0x01 => *monitor_b.persister.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure),
+                       0x02 => *monitor_c.persister.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure),
+                       0x04 => *monitor_a.persister.update_ret.lock().unwrap() = Ok(()),
+                       0x05 => *monitor_b.persister.update_ret.lock().unwrap() = Ok(()),
+                       0x06 => *monitor_c.persister.update_ret.lock().unwrap() = Ok(()),
 
                        0x08 => {
                                if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-                                       nodes[0].channel_monitor_updated(&chan_1_funding, *id);
+                                       monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
+                                       nodes[0].process_monitor_events();
                                }
                        },
                        0x09 => {
                                if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-                                       nodes[1].channel_monitor_updated(&chan_1_funding, *id);
+                                       monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
+                                       nodes[1].process_monitor_events();
                                }
                        },
                        0x0a => {
                                if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-                                       nodes[1].channel_monitor_updated(&chan_2_funding, *id);
+                                       monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
+                                       nodes[1].process_monitor_events();
                                }
                        },
                        0x0b => {
                                if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-                                       nodes[2].channel_monitor_updated(&chan_2_funding, *id);
+                                       monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
+                                       nodes[2].process_monitor_events();
                                }
                        },
 
@@ -892,15 +921,15 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
                        },
                        0x0e => {
                                if chan_a_disconnected {
-                                       nodes[0].peer_connected(&nodes[1].get_our_node_id(), &Init { features: InitFeatures::known() });
-                                       nodes[1].peer_connected(&nodes[0].get_our_node_id(), &Init { features: InitFeatures::known() });
+                                       nodes[0].peer_connected(&nodes[1].get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
+                                       nodes[1].peer_connected(&nodes[0].get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
                                        chan_a_disconnected = false;
                                }
                        },
                        0x0f => {
                                if chan_b_disconnected {
-                                       nodes[1].peer_connected(&nodes[2].get_our_node_id(), &Init { features: InitFeatures::known() });
-                                       nodes[2].peer_connected(&nodes[1].get_our_node_id(), &Init { features: InitFeatures::known() });
+                                       nodes[1].peer_connected(&nodes[2].get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
+                                       nodes[2].peer_connected(&nodes[1].get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
                                        chan_b_disconnected = false;
                                }
                        },
@@ -1071,33 +1100,37 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
                                // Test that no channel is in a stuck state where neither party can send funds even
                                // after we resolve all pending events.
                                // First make sure there are no pending monitor updates, resetting the error state
-                               // and calling channel_monitor_updated for each monitor.
-                               *monitor_a.update_ret.lock().unwrap() = Ok(());
-                               *monitor_b.update_ret.lock().unwrap() = Ok(());
-                               *monitor_c.update_ret.lock().unwrap() = Ok(());
+                               // and calling force_channel_monitor_updated for each monitor.
+                               *monitor_a.persister.update_ret.lock().unwrap() = Ok(());
+                               *monitor_b.persister.update_ret.lock().unwrap() = Ok(());
+                               *monitor_c.persister.update_ret.lock().unwrap() = Ok(());
 
                                if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-                                       nodes[0].channel_monitor_updated(&chan_1_funding, *id);
+                                       monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
+                                       nodes[0].process_monitor_events();
                                }
                                if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-                                       nodes[1].channel_monitor_updated(&chan_1_funding, *id);
+                                       monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
+                                       nodes[1].process_monitor_events();
                                }
                                if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-                                       nodes[1].channel_monitor_updated(&chan_2_funding, *id);
+                                       monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
+                                       nodes[1].process_monitor_events();
                                }
                                if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-                                       nodes[2].channel_monitor_updated(&chan_2_funding, *id);
+                                       monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
+                                       nodes[2].process_monitor_events();
                                }
 
                                // Next, make sure peers are all connected to each other
                                if chan_a_disconnected {
-                                       nodes[0].peer_connected(&nodes[1].get_our_node_id(), &Init { features: InitFeatures::known() });
-                                       nodes[1].peer_connected(&nodes[0].get_our_node_id(), &Init { features: InitFeatures::known() });
+                                       nodes[0].peer_connected(&nodes[1].get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
+                                       nodes[1].peer_connected(&nodes[0].get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
                                        chan_a_disconnected = false;
                                }
                                if chan_b_disconnected {
-                                       nodes[1].peer_connected(&nodes[2].get_our_node_id(), &Init { features: InitFeatures::known() });
-                                       nodes[2].peer_connected(&nodes[1].get_our_node_id(), &Init { features: InitFeatures::known() });
+                                       nodes[1].peer_connected(&nodes[2].get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
+                                       nodes[2].peer_connected(&nodes[1].get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
                                        chan_b_disconnected = false;
                                }
 
@@ -1115,7 +1148,7 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
                                        break;
                                }
 
-                               // Finally, make sure that at least one end of each channel can make a substantial payment.
+                               // Finally, make sure that at least one end of each channel can make a substantial payment
                                assert!(
                                        send_payment(&nodes[0], &nodes[1], chan_a, 10_000_000, &mut payment_id) ||
                                        send_payment(&nodes[1], &nodes[0], chan_a, 10_000_000, &mut payment_id));
@@ -1142,7 +1175,29 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
        }
 }
 
-pub fn chanmon_consistency_test<Out: test_logger::Output>(data: &[u8], out: Out) {
+/// We actually have different behavior based on if a certain log string has been seen, so we have
+/// to do a bit more tracking.
+#[derive(Clone)]
+struct SearchingOutput<O: Output> {
+       output: O,
+       may_fail: Arc<atomic::AtomicBool>,
+}
+impl<O: Output> Output for SearchingOutput<O> {
+       fn locked_write(&self, data: &[u8]) {
+               // We hit a design limitation of LN state machine (see CONCURRENT_INBOUND_HTLC_FEE_BUFFER)
+               if std::str::from_utf8(data).unwrap().contains("Outbound update_fee HTLC buffer overflow - counterparty should force-close this channel") {
+                       self.may_fail.store(true, atomic::Ordering::Release);
+               }
+               self.output.locked_write(data)
+       }
+}
+impl<O: Output> SearchingOutput<O> {
+       pub fn new(output: O) -> Self {
+               Self { output, may_fail: Arc::new(atomic::AtomicBool::new(false)) }
+       }
+}
+
+pub fn chanmon_consistency_test<Out: Output>(data: &[u8], out: Out) {
        do_test(data, out);
 }
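
Note on the pattern above: the diff moves the injected monitor-update result from TestChainMonitor::update_ret into TestPersister::update_ret, so the real ChainMonitor observes the injected failure on its own persistence path, and pending updates are later completed via force_channel_monitor_updated followed by process_monitor_events. The new SearchingOutput wrapper additionally latches a flag once a known "expected failure" log line is seen, so otherwise-unhandled events can return instead of panicking. Below is a minimal, self-contained sketch of those two mechanisms; the types and names are illustrative stand-ins, not the rust-lightning API.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

#[derive(Clone, Debug, PartialEq)]
enum UpdateErr { TemporaryFailure }

// Stand-in for TestPersister: the fuzz input toggles `update_ret`.
struct Persister { update_ret: Mutex<Result<(), UpdateErr>> }

// Stand-in for the chain monitor: it now reports whatever the persister
// returned instead of overriding the result itself.
struct Monitor { persister: Arc<Persister> }

impl Monitor {
    fn update_channel(&self) -> Result<(), UpdateErr> {
        self.persister.update_ret.lock().unwrap().clone()
    }
}

// Stand-in for SearchingOutput: latches a flag when a known "expected
// failure" log line is written, so later unhandled events can bail out
// instead of panicking.
struct SearchingOutput { may_fail: Arc<AtomicBool> }

impl SearchingOutput {
    fn locked_write(&self, data: &[u8]) {
        if std::str::from_utf8(data).unwrap_or("").contains("HTLC buffer overflow") {
            self.may_fail.store(true, Ordering::Release);
        }
    }
}

fn main() {
    let persister = Arc::new(Persister { update_ret: Mutex::new(Ok(())) });
    let monitor = Monitor { persister: Arc::clone(&persister) };

    // Opcodes 0x00-0x02 above: start failing monitor updates temporarily.
    *persister.update_ret.lock().unwrap() = Err(UpdateErr::TemporaryFailure);
    assert_eq!(monitor.update_channel(), Err(UpdateErr::TemporaryFailure));

    // Opcodes 0x04-0x06: clear the failure; the real harness then calls
    // force_channel_monitor_updated and process_monitor_events.
    *persister.update_ret.lock().unwrap() = Ok(());
    assert_eq!(monitor.update_channel(), Ok(()));

    // Once the known log line is seen, may_fail is set for later checks.
    let out = SearchingOutput { may_fail: Arc::new(AtomicBool::new(false)) };
    out.locked_write(b"Outbound update_fee HTLC buffer overflow - ...");
    assert!(out.may_fail.load(Ordering::Acquire));
}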