From 3e26bd7a1dfb780d4534304f7e48f66a7ae1a9ea Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 11 Feb 2020 18:34:29 -0500 Subject: [PATCH] Rm ChannelMonitor merge capabilities in favor of explicit add/update This removes the ability to merge ChannelMonitors in favor of explicit ChannelMonitorUpdates. It further removes ChannelManager::test_restore_channel_monitor in favor of the new ChannelManager::channel_monitor_updated method, which explicitly confirms a set of updates instead of providing the latest copy of each ChannelMonitor to the user. This removes almost all need for Channels to have the latest channel_monitor, except for broadcasting the latest local state. --- fuzz/src/chanmon_consistency.rs | 140 ++++++++--------- lightning/src/ln/chanmon_update_fail_tests.rs | 141 ++++++++++-------- lightning/src/ln/channelmanager.rs | 121 +-------------- lightning/src/ln/channelmonitor.rs | 93 ++---------- lightning/src/ln/functional_test_utils.rs | 2 +- lightning/src/ln/functional_tests.rs | 8 +- lightning/src/util/errors.rs | 2 +- lightning/src/util/test_utils.rs | 8 +- 8 files changed, 182 insertions(+), 333 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 02de54595..dc811c3ba 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -73,56 +73,55 @@ impl Writer for VecWriter { } } -static mut IN_RESTORE: bool = false; pub struct TestChannelMonitor { + pub logger: Arc, pub simple_monitor: Arc>>, pub update_ret: Mutex>, - pub latest_good_update: Mutex>>, - pub latest_update_good: Mutex>, - pub latest_updates_good_at_last_ser: Mutex>, + // If we reload a node with an old copy of ChannelMonitors, the ChannelManager deserialization + // logic will automatically force-close our channels for us (as we don't have an up-to-date + // monitor implying we are not able to punish misbehaving counterparties). Because this test + // "fails" if we ever force-close a channel, we avoid doing so, always saving the latest + // fully-serialized monitor state here, as well as the corresponding update_id. + pub latest_monitors: Mutex)>>, pub should_update_manager: atomic::AtomicBool, } impl TestChannelMonitor { pub fn new(chain_monitor: Arc, broadcaster: Arc, logger: Arc, feeest: Arc) -> Self { Self { - simple_monitor: Arc::new(channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, feeest)), + simple_monitor: Arc::new(channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger.clone(), feeest)), + logger, update_ret: Mutex::new(Ok(())), - latest_good_update: Mutex::new(HashMap::new()), - latest_update_good: Mutex::new(HashMap::new()), - latest_updates_good_at_last_ser: Mutex::new(HashMap::new()), + latest_monitors: Mutex::new(HashMap::new()), should_update_manager: atomic::AtomicBool::new(false), } } } impl channelmonitor::ManyChannelMonitor for TestChannelMonitor { - fn add_update_monitor(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> { - let ret = self.update_ret.lock().unwrap().clone(); - if let Ok(()) = ret { - let mut ser = VecWriter(Vec::new()); - monitor.write_for_disk(&mut ser).unwrap(); - self.latest_good_update.lock().unwrap().insert(funding_txo, ser.0); - match self.latest_update_good.lock().unwrap().entry(funding_txo) { - hash_map::Entry::Vacant(e) => { e.insert(true); }, - hash_map::Entry::Occupied(mut e) => { - if !e.get() && unsafe { IN_RESTORE } { - // Technically we can't consider an update to be "good" unless we're doing - // it in response to a test_restore_channel_monitor as the channel may - // still be waiting on such a call, so only set us to good if we're in the - // middle of a restore call. - e.insert(true); - } - }, - } - self.should_update_manager.store(true, atomic::Ordering::Relaxed); - } else { - self.latest_update_good.lock().unwrap().insert(funding_txo, false); + fn add_monitor(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> { + let mut ser = VecWriter(Vec::new()); + monitor.write_for_disk(&mut ser).unwrap(); + if let Some(_) = self.latest_monitors.lock().unwrap().insert(funding_txo, (monitor.get_latest_update_id(), ser.0)) { + panic!("Already had monitor pre-add_monitor"); } - assert!(self.simple_monitor.add_update_monitor(funding_txo, monitor).is_ok()); - ret + self.should_update_manager.store(true, atomic::Ordering::Relaxed); + assert!(self.simple_monitor.add_monitor(funding_txo, monitor).is_ok()); + self.update_ret.lock().unwrap().clone() } fn update_monitor(&self, funding_txo: OutPoint, update: channelmonitor::ChannelMonitorUpdate) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> { - unimplemented!(); //TODO + let mut map_lock = self.latest_monitors.lock().unwrap(); + let mut map_entry = match map_lock.entry(funding_txo) { + hash_map::Entry::Occupied(entry) => entry, + hash_map::Entry::Vacant(_) => panic!("Didn't have monitor on update call"), + }; + let mut deserialized_monitor = <(Sha256d, channelmonitor::ChannelMonitor)>:: + read(&mut Cursor::new(&map_entry.get().1), Arc::clone(&self.logger)).unwrap().1; + deserialized_monitor.update_monitor(update.clone()).unwrap(); + let mut ser = VecWriter(Vec::new()); + deserialized_monitor.write_for_disk(&mut ser).unwrap(); + map_entry.insert((update.update_id, ser.0)); + self.should_update_manager.store(true, atomic::Ordering::Relaxed); + self.update_ret.lock().unwrap().clone() } fn get_and_clear_pending_htlcs_updated(&self) -> Vec { @@ -214,10 +213,10 @@ pub fn do_test(data: &[u8]) { config.peer_channel_config_limits.min_dust_limit_satoshis = 0; let mut monitors = HashMap::new(); - let mut old_monitors = $old_monitors.latest_good_update.lock().unwrap(); - for (outpoint, monitor_ser) in old_monitors.drain() { + let mut old_monitors = $old_monitors.latest_monitors.lock().unwrap(); + for (outpoint, (update_id, monitor_ser)) in old_monitors.drain() { monitors.insert(outpoint, <(Sha256d, ChannelMonitor)>::read(&mut Cursor::new(&monitor_ser), Arc::clone(&logger)).expect("Failed to read monitor").1); - monitor.latest_good_update.lock().unwrap().insert(outpoint, monitor_ser); + monitor.latest_monitors.lock().unwrap().insert(outpoint, (update_id, monitor_ser)); } let mut monitor_refs = HashMap::new(); for (outpoint, monitor) in monitors.iter_mut() { @@ -234,17 +233,7 @@ pub fn do_test(data: &[u8]) { channel_monitors: &mut monitor_refs, }; - let res = (<(Sha256d, ChannelManager, Arc>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor); - for (_, was_good) in $old_monitors.latest_updates_good_at_last_ser.lock().unwrap().iter() { - if !was_good { - // If the last time we updated a monitor we didn't successfully update (and we - // have sense updated our serialized copy of the ChannelManager) we may - // force-close the channel on our counterparty cause we know we're missing - // something. Thus, we just return here since we can't continue to test. - return; - } - } - res + (<(Sha256d, ChannelManager, Arc>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor) } } } @@ -270,6 +259,7 @@ pub fn do_test(data: &[u8]) { }; $source.handle_accept_channel(&$dest.get_our_node_id(), InitFeatures::supported(), &accept_channel); + let funding_output; { let events = $source.get_and_clear_pending_events(); assert_eq!(events.len(), 1); @@ -277,7 +267,7 @@ pub fn do_test(data: &[u8]) { let tx = Transaction { version: $chan_id, lock_time: 0, input: Vec::new(), output: vec![TxOut { value: *channel_value_satoshis, script_pubkey: output_script.clone(), }]}; - let funding_output = OutPoint::new(tx.txid(), 0); + funding_output = OutPoint::new(tx.txid(), 0); $source.funding_transaction_generated(&temporary_channel_id, funding_output); channel_txn.push(tx); } else { panic!("Wrong event type"); } @@ -307,6 +297,7 @@ pub fn do_test(data: &[u8]) { if let events::Event::FundingBroadcastSafe { .. } = events[0] { } else { panic!("Wrong event type"); } } + funding_output } } } @@ -363,8 +354,8 @@ pub fn do_test(data: &[u8]) { let mut nodes = [node_a, node_b, node_c]; - make_channel!(nodes[0], nodes[1], 0); - make_channel!(nodes[1], nodes[2], 1); + let chan_1_funding = make_channel!(nodes[0], nodes[1], 0); + let chan_2_funding = make_channel!(nodes[1], nodes[2], 1); for node in nodes.iter() { confirm_txn!(node); @@ -635,9 +626,26 @@ pub fn do_test(data: &[u8]) { 0x03 => *monitor_a.update_ret.lock().unwrap() = Ok(()), 0x04 => *monitor_b.update_ret.lock().unwrap() = Ok(()), 0x05 => *monitor_c.update_ret.lock().unwrap() = Ok(()), - 0x06 => { unsafe { IN_RESTORE = true }; nodes[0].test_restore_channel_monitor(); unsafe { IN_RESTORE = false }; }, - 0x07 => { unsafe { IN_RESTORE = true }; nodes[1].test_restore_channel_monitor(); unsafe { IN_RESTORE = false }; }, - 0x08 => { unsafe { IN_RESTORE = true }; nodes[2].test_restore_channel_monitor(); unsafe { IN_RESTORE = false }; }, + 0x06 => { + if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) { + nodes[0].channel_monitor_updated(&chan_1_funding, *id); + } + }, + 0x07 => { + if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) { + nodes[1].channel_monitor_updated(&chan_1_funding, *id); + } + }, + 0x24 => { + if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) { + nodes[1].channel_monitor_updated(&chan_2_funding, *id); + } + }, + 0x08 => { + if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) { + nodes[2].channel_monitor_updated(&chan_2_funding, *id); + } + }, 0x09 => send_payment!(nodes[0], (&nodes[1], chan_a)), 0x0a => send_payment!(nodes[1], (&nodes[0], chan_a)), 0x0b => send_payment!(nodes[1], (&nodes[2], chan_b)), @@ -726,27 +734,19 @@ pub fn do_test(data: &[u8]) { nodes[2] = node_c.clone(); monitor_c = new_monitor_c; }, + // 0x24 defined above _ => test_return!(), } - if monitor_a.should_update_manager.load(atomic::Ordering::Relaxed) { - node_a_ser.0.clear(); - nodes[0].write(&mut node_a_ser).unwrap(); - monitor_a.should_update_manager.store(false, atomic::Ordering::Relaxed); - *monitor_a.latest_updates_good_at_last_ser.lock().unwrap() = monitor_a.latest_update_good.lock().unwrap().clone(); - } - if monitor_b.should_update_manager.load(atomic::Ordering::Relaxed) { - node_b_ser.0.clear(); - nodes[1].write(&mut node_b_ser).unwrap(); - monitor_b.should_update_manager.store(false, atomic::Ordering::Relaxed); - *monitor_b.latest_updates_good_at_last_ser.lock().unwrap() = monitor_b.latest_update_good.lock().unwrap().clone(); - } - if monitor_c.should_update_manager.load(atomic::Ordering::Relaxed) { - node_c_ser.0.clear(); - nodes[2].write(&mut node_c_ser).unwrap(); - monitor_c.should_update_manager.store(false, atomic::Ordering::Relaxed); - *monitor_c.latest_updates_good_at_last_ser.lock().unwrap() = monitor_c.latest_update_good.lock().unwrap().clone(); - } + node_a_ser.0.clear(); + nodes[0].write(&mut node_a_ser).unwrap(); + monitor_a.should_update_manager.store(false, atomic::Ordering::Relaxed); + node_b_ser.0.clear(); + nodes[1].write(&mut node_b_ser).unwrap(); + monitor_b.should_update_manager.store(false, atomic::Ordering::Relaxed); + node_c_ser.0.clear(); + nodes[2].write(&mut node_c_ser).unwrap(); + monitor_c.should_update_manager.store(false, atomic::Ordering::Relaxed); } } diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 1437d91bf..562b21524 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -3,6 +3,7 @@ //! There are a bunch of these as their handling is relatively error-prone so they are split out //! here. See also the chanmon_fail_consistency fuzz test. +use chain::transaction::OutPoint; use ln::channelmanager::{RAACommitmentOrder, PaymentPreimage, PaymentHash}; use ln::channelmonitor::ChannelMonitorUpdateErr; use ln::features::InitFeatures; @@ -56,7 +57,7 @@ fn do_test_simple_monitor_temporary_update_fail(disconnect: bool) { let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()); + let channel_id = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()).2; let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 1000000, TEST_FINAL_CLTV).unwrap(); let (payment_preimage_1, payment_hash_1) = get_payment_preimage_hash!(&nodes[0]); @@ -76,8 +77,9 @@ fn do_test_simple_monitor_temporary_update_fail(disconnect: bool) { } *nodes[0].chan_monitor.update_ret.lock().unwrap() = Ok(()); - nodes[0].node.test_restore_channel_monitor(); - check_added_monitors!(nodes[0], 1); + let (outpoint, latest_update) = nodes[0].chan_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[0].node.channel_monitor_updated(&outpoint, latest_update); + check_added_monitors!(nodes[0], 0); let mut events_2 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_2.len(), 1); @@ -116,10 +118,9 @@ fn do_test_simple_monitor_temporary_update_fail(disconnect: bool) { reconnect_nodes(&nodes[0], &nodes[1], (false, false), (0, 0), (0, 0), (0, 0), (0, 0), (false, false)); } - // ...and make sure we can force-close a TemporaryFailure channel with a PermanentFailure - *nodes[0].chan_monitor.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::PermanentFailure); - nodes[0].node.test_restore_channel_monitor(); - check_added_monitors!(nodes[0], 1); + // ...and make sure we can force-close a frozen channel + nodes[0].node.force_close_channel(&channel_id); + check_added_monitors!(nodes[0], 0); check_closed_broadcast!(nodes[0], false); // TODO: Once we hit the chain with the failure transaction we should check that we get a @@ -158,7 +159,7 @@ fn do_test_monitor_temporary_update_fail(disconnect_count: usize) { let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()); + let channel_id = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()).2; let (payment_preimage_1, _) = route_payment(&nodes[0], &[&nodes[1]], 1000000); @@ -218,8 +219,9 @@ fn do_test_monitor_temporary_update_fail(disconnect_count: usize) { // Now fix monitor updating... *nodes[0].chan_monitor.update_ret.lock().unwrap() = Ok(()); - nodes[0].node.test_restore_channel_monitor(); - check_added_monitors!(nodes[0], 1); + let (outpoint, latest_update) = nodes[0].chan_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[0].node.channel_monitor_updated(&outpoint, latest_update); + check_added_monitors!(nodes[0], 0); macro_rules! disconnect_reconnect_peers { () => { { nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false); @@ -488,7 +490,7 @@ fn test_monitor_update_fail_cs() { let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()); + let channel_id = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()).2; let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 1000000, TEST_FINAL_CLTV).unwrap(); let (payment_preimage, our_payment_hash) = get_payment_preimage_hash!(nodes[0]); @@ -506,8 +508,9 @@ fn test_monitor_update_fail_cs() { assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); *nodes[1].chan_monitor.update_ret.lock().unwrap() = Ok(()); - nodes[1].node.test_restore_channel_monitor(); - check_added_monitors!(nodes[1], 1); + let (outpoint, latest_update) = nodes[1].chan_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + check_added_monitors!(nodes[1], 0); let responses = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(responses.len(), 2); @@ -539,8 +542,9 @@ fn test_monitor_update_fail_cs() { } *nodes[0].chan_monitor.update_ret.lock().unwrap() = Ok(()); - nodes[0].node.test_restore_channel_monitor(); - check_added_monitors!(nodes[0], 1); + let (outpoint, latest_update) = nodes[0].chan_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[0].node.channel_monitor_updated(&outpoint, latest_update); + check_added_monitors!(nodes[0], 0); let final_raa = get_event_msg!(nodes[0], MessageSendEvent::SendRevokeAndACK, nodes[1].node.get_our_node_id()); nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(), &final_raa); @@ -564,13 +568,13 @@ fn test_monitor_update_fail_cs() { #[test] fn test_monitor_update_fail_no_rebroadcast() { // Tests handling of a monitor update failure when no message rebroadcasting on - // test_restore_channel_monitor() is required. Backported from - // chanmon_fail_consistency fuzz tests. + // channel_monitor_updated() is required. Backported from chanmon_fail_consistency + // fuzz tests. let chanmon_cfgs = create_chanmon_cfgs(2); let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()); + let channel_id = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()).2; let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 1000000, TEST_FINAL_CLTV).unwrap(); let (payment_preimage_1, our_payment_hash) = get_payment_preimage_hash!(nodes[0]); @@ -590,9 +594,10 @@ fn test_monitor_update_fail_no_rebroadcast() { check_added_monitors!(nodes[1], 1); *nodes[1].chan_monitor.update_ret.lock().unwrap() = Ok(()); - nodes[1].node.test_restore_channel_monitor(); + let (outpoint, latest_update) = nodes[1].chan_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].node.channel_monitor_updated(&outpoint, latest_update); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); - check_added_monitors!(nodes[1], 1); + check_added_monitors!(nodes[1], 0); expect_pending_htlcs_forwardable!(nodes[1]); let events = nodes[1].node.get_and_clear_pending_events(); @@ -615,7 +620,7 @@ fn test_monitor_update_raa_while_paused() { let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()); + let channel_id = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()).2; send_payment(&nodes[0], &[&nodes[1]], 5000000, 5_000_000); @@ -649,8 +654,9 @@ fn test_monitor_update_raa_while_paused() { check_added_monitors!(nodes[0], 1); *nodes[0].chan_monitor.update_ret.lock().unwrap() = Ok(()); - nodes[0].node.test_restore_channel_monitor(); - check_added_monitors!(nodes[0], 1); + let (outpoint, latest_update) = nodes[0].chan_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[0].node.channel_monitor_updated(&outpoint, latest_update); + check_added_monitors!(nodes[0], 0); let as_update_raa = get_revoke_commit_msgs!(nodes[0], nodes[1].node.get_our_node_id()); nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(), &as_update_raa.0); @@ -803,8 +809,9 @@ fn do_test_monitor_update_fail_raa(test_ignore_second_cs: bool) { // Restore monitor updating, ensuring we immediately get a fail-back update and a // update_add update. *nodes[1].chan_monitor.update_ret.lock().unwrap() = Ok(()); - nodes[1].node.test_restore_channel_monitor(); - check_added_monitors!(nodes[1], 1); + let (outpoint, latest_update) = nodes[1].chan_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_2.2).unwrap().clone(); + nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + check_added_monitors!(nodes[1], 0); expect_pending_htlcs_forwardable!(nodes[1]); check_added_monitors!(nodes[1], 1); @@ -942,7 +949,7 @@ fn test_monitor_update_fail_reestablish() { let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]); let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()); + let chan_1 = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()); create_announced_chan_between_nodes(&nodes, 1, 2, InitFeatures::supported(), InitFeatures::supported()); let (our_payment_preimage, _) = route_payment(&nodes[0], &[&nodes[1], &nodes[2]], 1000000); @@ -993,8 +1000,9 @@ fn test_monitor_update_fail_reestablish() { assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); *nodes[1].chan_monitor.update_ret.lock().unwrap() = Ok(()); - nodes[1].node.test_restore_channel_monitor(); - check_added_monitors!(nodes[1], 1); + let (outpoint, latest_update) = nodes[1].chan_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); + nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + check_added_monitors!(nodes[1], 0); updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); assert!(updates.update_add_htlcs.is_empty()); @@ -1023,7 +1031,7 @@ fn raa_no_response_awaiting_raa_state() { let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()); + let channel_id = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()).2; let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 1000000, TEST_FINAL_CLTV).unwrap(); let (payment_preimage_1, payment_hash_1) = get_payment_preimage_hash!(nodes[0]); @@ -1074,9 +1082,10 @@ fn raa_no_response_awaiting_raa_state() { check_added_monitors!(nodes[1], 1); *nodes[1].chan_monitor.update_ret.lock().unwrap() = Ok(()); - nodes[1].node.test_restore_channel_monitor(); + let (outpoint, latest_update) = nodes[1].chan_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].node.channel_monitor_updated(&outpoint, latest_update); // nodes[1] should be AwaitingRAA here! - check_added_monitors!(nodes[1], 1); + check_added_monitors!(nodes[1], 0); let bs_responses = get_revoke_commit_msgs!(nodes[1], nodes[0].node.get_our_node_id()); expect_pending_htlcs_forwardable!(nodes[1]); expect_payment_received!(nodes[1], payment_hash_1, 1000000); @@ -1139,7 +1148,7 @@ fn claim_while_disconnected_monitor_update_fail() { let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()); + let channel_id = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()).2; // Forward a payment for B to claim let (payment_preimage_1, _) = route_payment(&nodes[0], &[&nodes[1]], 1000000); @@ -1183,13 +1192,14 @@ fn claim_while_disconnected_monitor_update_fail() { assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); nodes[1].logger.assert_log("lightning::ln::channelmanager".to_string(), "Previous monitor update failure prevented generation of RAA".to_string(), 1); // Note that nodes[1] not updating monitor here is OK - it wont take action on the new HTLC - // until we've test_restore_channel_monitor'd and updated for the new commitment transaction. + // until we've channel_monitor_update'd and updated for the new commitment transaction. // Now un-fail the monitor, which will result in B sending its original commitment update, // receiving the commitment update from A, and the resulting commitment dances. *nodes[1].chan_monitor.update_ret.lock().unwrap() = Ok(()); - nodes[1].node.test_restore_channel_monitor(); - check_added_monitors!(nodes[1], 1); + let (outpoint, latest_update) = nodes[1].chan_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + check_added_monitors!(nodes[1], 0); let bs_msgs = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(bs_msgs.len(), 2); @@ -1258,7 +1268,7 @@ fn monitor_failed_no_reestablish_response() { let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()); + let channel_id = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()).2; // Route the payment and deliver the initial commitment_signed (with a monitor update failure // on receipt). @@ -1292,8 +1302,9 @@ fn monitor_failed_no_reestablish_response() { nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &bs_reconnect); *nodes[1].chan_monitor.update_ret.lock().unwrap() = Ok(()); - nodes[1].node.test_restore_channel_monitor(); - check_added_monitors!(nodes[1], 1); + let (outpoint, latest_update) = nodes[1].chan_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + check_added_monitors!(nodes[1], 0); let bs_responses = get_revoke_commit_msgs!(nodes[1], nodes[0].node.get_our_node_id()); nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_responses.0); @@ -1327,7 +1338,7 @@ fn first_message_on_recv_ordering() { let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()); + let channel_id = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()).2; // Route the first payment outbound, holding the last RAA for B until we are set up so that we // can deliver it and fail the monitor update. @@ -1373,8 +1384,8 @@ fn first_message_on_recv_ordering() { check_added_monitors!(nodes[1], 1); // Now deliver the update_add_htlc/commitment_signed for the second payment, which does need an - // RAA/CS response, which should be generated when we call test_restore_channel_monitor (with - // the appropriate HTLC acceptance). + // RAA/CS response, which should be generated when we call channel_monitor_update (with the + // appropriate HTLC acceptance). nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &payment_event.msgs[0]); nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &payment_event.commitment_msg); check_added_monitors!(nodes[1], 1); @@ -1382,8 +1393,9 @@ fn first_message_on_recv_ordering() { nodes[1].logger.assert_log("lightning::ln::channelmanager".to_string(), "Previous monitor update failure prevented generation of RAA".to_string(), 1); *nodes[1].chan_monitor.update_ret.lock().unwrap() = Ok(()); - nodes[1].node.test_restore_channel_monitor(); - check_added_monitors!(nodes[1], 1); + let (outpoint, latest_update) = nodes[1].chan_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + check_added_monitors!(nodes[1], 0); expect_pending_htlcs_forwardable!(nodes[1]); expect_payment_received!(nodes[1], payment_hash_1, 1000000); @@ -1434,7 +1446,7 @@ fn test_monitor_update_fail_claim() { check_added_monitors!(nodes[2], 1); // Successfully update the monitor on the 1<->2 channel, but the 0<->1 channel should still be - // paused, so forward shouldn't succeed until we call test_restore_channel_monitor(). + // paused, so forward shouldn't succeed until we call channel_monitor_updated(). *nodes[1].chan_monitor.update_ret.lock().unwrap() = Ok(()); let mut events = nodes[2].node.get_and_clear_pending_msg_events(); @@ -1468,8 +1480,9 @@ fn test_monitor_update_fail_claim() { } else { panic!("Unexpected event!"); } // Now restore monitor updating on the 0<->1 channel and claim the funds on B. - nodes[1].node.test_restore_channel_monitor(); - check_added_monitors!(nodes[1], 1); + let (outpoint, latest_update) = nodes[1].chan_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); + nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + check_added_monitors!(nodes[1], 0); let bs_fulfill_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &bs_fulfill_update.update_fulfill_htlcs[0]); @@ -1492,7 +1505,7 @@ fn test_monitor_update_on_pending_forwards() { let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]); let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()); + let chan_1 = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()); create_announced_chan_between_nodes(&nodes, 1, 2, InitFeatures::supported(), InitFeatures::supported()); // Rebalance a bit so that we can send backwards from 3 to 1. @@ -1526,8 +1539,9 @@ fn test_monitor_update_on_pending_forwards() { nodes[1].logger.assert_log("lightning::ln::channelmanager".to_string(), "Failed to update ChannelMonitor".to_string(), 1); *nodes[1].chan_monitor.update_ret.lock().unwrap() = Ok(()); - nodes[1].node.test_restore_channel_monitor(); - check_added_monitors!(nodes[1], 1); + let (outpoint, latest_update) = nodes[1].chan_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); + nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + check_added_monitors!(nodes[1], 0); let bs_updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); nodes[0].node.handle_update_fail_htlc(&nodes[1].node.get_our_node_id(), &bs_updates.update_fail_htlcs[0]); @@ -1560,7 +1574,7 @@ fn monitor_update_claim_fail_no_response() { let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()); + let channel_id = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::supported(), InitFeatures::supported()).2; // Forward a payment for B to claim let (payment_preimage_1, _) = route_payment(&nodes[0], &[&nodes[1]], 1000000); @@ -1585,8 +1599,9 @@ fn monitor_update_claim_fail_no_response() { nodes[1].logger.assert_log("lightning::ln::channelmanager".to_string(), "Failed to update ChannelMonitor".to_string(), 1); *nodes[1].chan_monitor.update_ret.lock().unwrap() = Ok(()); - nodes[1].node.test_restore_channel_monitor(); - check_added_monitors!(nodes[1], 1); + let (outpoint, latest_update) = nodes[1].chan_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + check_added_monitors!(nodes[1], 0); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(), &as_raa); @@ -1636,14 +1651,17 @@ fn do_during_funding_monitor_fail(fail_on_generate: bool, restore_between_fails: check_added_monitors!(nodes[0], 1); *nodes[1].chan_monitor.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure); - nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, nodes[1].node.get_our_node_id())); + let funding_created_msg = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, nodes[1].node.get_our_node_id()); + let channel_id = OutPoint { txid: funding_created_msg.funding_txid, index: funding_created_msg.funding_output_index }.to_channel_id(); + nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created_msg); check_added_monitors!(nodes[1], 1); if restore_between_fails { assert!(fail_on_generate); *nodes[0].chan_monitor.update_ret.lock().unwrap() = Ok(()); - nodes[0].node.test_restore_channel_monitor(); - check_added_monitors!(nodes[0], 1); + let (outpoint, latest_update) = nodes[0].chan_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[0].node.channel_monitor_updated(&outpoint, latest_update); + check_added_monitors!(nodes[0], 0); assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); } @@ -1666,11 +1684,13 @@ fn do_during_funding_monitor_fail(fail_on_generate: bool, restore_between_fails: } assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); *nodes[0].chan_monitor.update_ret.lock().unwrap() = Ok(()); - nodes[0].node.test_restore_channel_monitor(); + let (outpoint, latest_update) = nodes[0].chan_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[0].node.channel_monitor_updated(&outpoint, latest_update); + check_added_monitors!(nodes[0], 0); + } else { + check_added_monitors!(nodes[0], 1); } - check_added_monitors!(nodes[0], 1); - let events = nodes[0].node.get_and_clear_pending_events(); assert_eq!(events.len(), 1); match events[0] { @@ -1704,8 +1724,9 @@ fn do_during_funding_monitor_fail(fail_on_generate: bool, restore_between_fails: } *nodes[1].chan_monitor.update_ret.lock().unwrap() = Ok(()); - nodes[1].node.test_restore_channel_monitor(); - check_added_monitors!(nodes[1], 1); + let (outpoint, latest_update) = nodes[1].chan_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + check_added_monitors!(nodes[1], 0); let (channel_id, (announcement, as_update, bs_update)) = if !confirm_a_first { nodes[0].node.handle_funding_locked(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendFundingLocked, nodes[0].node.get_our_node_id())); diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index f7aa487ce..3e4d3df9a 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -316,7 +316,7 @@ pub type SimpleRefChannelManager<'a, 'b, M, T> = ChannelManager ChannelManager { { @@ -1885,117 +1885,6 @@ impl ChannelManager { - // TODO: There may be some pending HTLCs that we intended to fail - // backwards when a monitor update failed. We should make sure - // knowledge of those gets moved into the appropriate in-memory - // ChannelMonitor and they get failed backwards once we get - // on-chain confirmations. - // Note I think #198 addresses this, so once it's merged a test - // should be written. - if let Some(short_id) = channel.get_short_channel_id() { - short_to_id.remove(&short_id); - } - close_results.push(channel.force_shutdown()); - if let Ok(update) = self.get_channel_update(&channel) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); - } - false - }, - ChannelMonitorUpdateErr::TemporaryFailure => true, - } - } else { - let (raa, commitment_update, order, pending_forwards, mut pending_failures, needs_broadcast_safe, funding_locked) = channel.monitor_updating_restored(); - if !pending_forwards.is_empty() { - htlc_forwards.push((channel.get_short_channel_id().expect("We can't have pending forwards before funding confirmation"), pending_forwards)); - } - htlc_failures.append(&mut pending_failures); - - macro_rules! handle_cs { () => { - if let Some(update) = commitment_update { - pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: channel.get_their_node_id(), - updates: update, - }); - } - } } - macro_rules! handle_raa { () => { - if let Some(revoke_and_ack) = raa { - pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK { - node_id: channel.get_their_node_id(), - msg: revoke_and_ack, - }); - } - } } - match order { - RAACommitmentOrder::CommitmentFirst => { - handle_cs!(); - handle_raa!(); - }, - RAACommitmentOrder::RevokeAndACKFirst => { - handle_raa!(); - handle_cs!(); - }, - } - if needs_broadcast_safe { - pending_events.push(events::Event::FundingBroadcastSafe { - funding_txo: channel.get_funding_txo().unwrap(), - user_channel_id: channel.get_user_id(), - }); - } - if let Some(msg) = funding_locked { - pending_msg_events.push(events::MessageSendEvent::SendFundingLocked { - node_id: channel.get_their_node_id(), - msg, - }); - if let Some(announcement_sigs) = self.get_announcement_sigs(channel) { - pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { - node_id: channel.get_their_node_id(), - msg: announcement_sigs, - }); - } - short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id()); - } - true - } - } else { true } - }); - } - - self.pending_events.lock().unwrap().append(&mut pending_events); - - for failure in htlc_failures.drain(..) { - self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2); - } - self.forward_htlcs(&mut htlc_forwards[..]); - - for res in close_results.drain(..) { - self.finish_force_close_channel(res); - } - } - fn internal_open_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> { if msg.chain_hash != self.genesis_hash { return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash", msg.temporary_channel_id.clone())); @@ -2058,8 +1947,8 @@ impl ChannelManager { // Note that we reply with the new channel_id in error messages if we gave up on the diff --git a/lightning/src/ln/channelmonitor.rs b/lightning/src/ln/channelmonitor.rs index 42566d1ea..113e007ab 100644 --- a/lightning/src/ln/channelmonitor.rs +++ b/lightning/src/ln/channelmonitor.rs @@ -131,9 +131,9 @@ pub enum ChannelMonitorUpdateErr { } /// General Err type for ChannelMonitor actions. Generally, this implies that the data provided is -/// inconsistent with the ChannelMonitor being called. eg for ChannelMonitor::insert_combine this -/// means you tried to merge two monitors for different channels or for a channel which was -/// restored from a backup and then generated new commitment updates. +/// inconsistent with the ChannelMonitor being called. eg for ChannelMonitor::update_monitor this +/// means you tried to update a monitor for a different channel or the ChannelMonitorUpdate was +/// corrupted. /// Contains a human-readable error message. #[derive(Debug)] pub struct MonitorUpdateError(pub &'static str); @@ -150,7 +150,7 @@ impl_writeable!(HTLCUpdate, 0, { payment_hash, payment_preimage, source }); /// Simple trait indicating ability to track a set of ChannelMonitors and multiplex events between /// them. Generally should be implemented by keeping a local SimpleManyChannelMonitor and passing -/// events to it, while also taking any add_update_monitor events and passing them to some remote +/// events to it, while also taking any add/update_monitor events and passing them to some remote /// server(s). /// /// Note that any updates to a channel's monitor *must* be applied to each instance of the @@ -164,7 +164,7 @@ impl_writeable!(HTLCUpdate, 0, { payment_hash, payment_preimage, source }); /// BlockNotifier and call the BlockNotifier's `block_(dis)connected` methods, which will notify /// all registered listeners in one go. pub trait ManyChannelMonitor: Send + Sync { - /// Adds or updates a monitor for the given `funding_txo`. + /// Adds a monitor for the given `funding_txo`. /// /// Implementer must also ensure that the funding_txo txid *and* outpoint are registered with /// any relevant ChainWatchInterfaces such that the provided monitor receives block_connected @@ -176,7 +176,7 @@ pub trait ManyChannelMonitor: Send + Sync { /// /// Any spends of outputs which should have been registered which aren't passed to /// ChannelMonitors via block_connected may result in FUNDS LOSS. - fn add_update_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr>; + fn add_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr>; /// Updates a monitor for the given `funding_txo`. /// @@ -279,14 +279,11 @@ impl) -> Result<(), MonitorUpdateError> { + pub fn add_monitor_by_key(&self, key: Key, monitor: ChannelMonitor) -> Result<(), MonitorUpdateError> { let mut monitors = self.monitors.lock().unwrap(); - match monitors.get_mut(&key) { - Some(orig_monitor) => { - log_trace!(self, "Updating Channel Monitor for channel {}", log_funding_info!(monitor.key_storage)); - return orig_monitor.insert_combine(monitor); - }, - None => {} + let entry = match monitors.entry(key) { + hash_map::Entry::Occupied(_) => return Err(MonitorUpdateError("Channel monitor for given key is already present")), + hash_map::Entry::Vacant(e) => e, }; match monitor.key_storage { Storage::Local { ref funding_info, .. } => { @@ -310,7 +307,7 @@ impl ManyChannelMonitor for SimpleManyChannelMonitor where T::Target: BroadcasterInterface { - fn add_update_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr> { - match self.add_update_monitor_by_key(funding_txo, monitor) { + fn add_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr> { + match self.add_monitor_by_key(funding_txo, monitor) { Ok(_) => Ok(()), Err(_) => Err(ChannelMonitorUpdateErr::PermanentFailure), } @@ -879,7 +876,7 @@ pub struct ChannelMonitor { // We simply modify last_block_hash in Channel's block_connected so that serialization is // consistent but hopefully the users' copy handles block_connected in a consistent way. - // (we do *not*, however, update them in insert_combine to ensure any local user copies keep + // (we do *not*, however, update them in update_monitor to ensure any local user copies keep // their last_block_hash from its state and not based on updated copies that didn't run through // the full block_connected). pub(crate) last_block_hash: Sha256dHash, @@ -1509,68 +1506,6 @@ impl ChannelMonitor { Ok(()) } - /// Combines this ChannelMonitor with the information contained in the other ChannelMonitor. - /// After a successful call this ChannelMonitor is up-to-date and is safe to use to monitor the - /// chain for new blocks/transactions. - pub fn insert_combine(&mut self, mut other: ChannelMonitor) -> Result<(), MonitorUpdateError> { - match self.key_storage { - Storage::Local { ref funding_info, .. } => { - if funding_info.is_none() { return Err(MonitorUpdateError("Try to combine a Local monitor without funding_info")); } - let our_funding_info = funding_info; - if let Storage::Local { ref funding_info, .. } = other.key_storage { - if funding_info.is_none() { return Err(MonitorUpdateError("Try to combine a Local monitor without funding_info")); } - // We should be able to compare the entire funding_txo, but in fuzztarget it's trivially - // easy to collide the funding_txo hash and have a different scriptPubKey. - if funding_info.as_ref().unwrap().0 != our_funding_info.as_ref().unwrap().0 { - return Err(MonitorUpdateError("Funding transaction outputs are not identical!")); - } - } else { - return Err(MonitorUpdateError("Try to combine a Local monitor with a Watchtower one !")); - } - }, - Storage::Watchtower { .. } => { - if let Storage::Watchtower { .. } = other.key_storage { - unimplemented!(); - } else { - return Err(MonitorUpdateError("Try to combine a Watchtower monitor with a Local one !")); - } - }, - } - let other_min_secret = other.get_min_seen_secret(); - let our_min_secret = self.get_min_seen_secret(); - if our_min_secret > other_min_secret { - self.provide_secret(other_min_secret, other.get_secret(other_min_secret).unwrap())?; - } - if let Some(ref local_tx) = self.current_local_signed_commitment_tx { - if let Some(ref other_local_tx) = other.current_local_signed_commitment_tx { - let our_commitment_number = 0xffffffffffff - ((((local_tx.tx.without_valid_witness().input[0].sequence as u64 & 0xffffff) << 3*8) | (local_tx.tx.without_valid_witness().lock_time as u64 & 0xffffff)) ^ self.commitment_transaction_number_obscure_factor); - let other_commitment_number = 0xffffffffffff - ((((other_local_tx.tx.without_valid_witness().input[0].sequence as u64 & 0xffffff) << 3*8) | (other_local_tx.tx.without_valid_witness().lock_time as u64 & 0xffffff)) ^ other.commitment_transaction_number_obscure_factor); - if our_commitment_number >= other_commitment_number { - self.key_storage = other.key_storage; - } - } - } - // TODO: We should use current_remote_commitment_number and the commitment number out of - // local transactions to decide how to merge - if our_min_secret >= other_min_secret { - self.their_cur_revocation_points = other.their_cur_revocation_points; - for (txid, htlcs) in other.remote_claimable_outpoints.drain() { - self.remote_claimable_outpoints.insert(txid, htlcs); - } - if let Some(local_tx) = other.prev_local_signed_commitment_tx { - self.prev_local_signed_commitment_tx = Some(local_tx); - } - if let Some(local_tx) = other.current_local_signed_commitment_tx { - self.current_local_signed_commitment_tx = Some(local_tx); - } - self.payment_preimages = other.payment_preimages; - self.to_remote_rescue = other.to_remote_rescue; - } - - self.current_remote_commitment_number = cmp::min(self.current_remote_commitment_number, other.current_remote_commitment_number); - Ok(()) - } - /// Gets the update_id from the latest ChannelMonitorUpdate which was applied to this /// ChannelMonitor. pub fn get_latest_update_id(&self) -> u64 { diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index eda7e0f3a..2c1426181 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -134,7 +134,7 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> { let chain_watch = Arc::new(chaininterface::ChainWatchInterfaceUtil::new(Network::Testnet, Arc::clone(&self.logger) as Arc)); let channel_monitor = test_utils::TestChannelMonitor::new(chain_watch.clone(), self.tx_broadcaster.clone(), self.logger.clone(), feeest); for deserialized_monitor in deserialized_monitors.drain(..) { - if let Err(_) = channel_monitor.add_update_monitor(deserialized_monitor.get_funding_txo().unwrap(), deserialized_monitor) { + if let Err(_) = channel_monitor.add_monitor(deserialized_monitor.get_funding_txo().unwrap(), deserialized_monitor) { panic!(); } } diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 6b817f6d4..b9e6b5690 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -3710,7 +3710,7 @@ fn test_no_txn_manager_serialize_deserialize() { nodes_0_deserialized = nodes_0_deserialized_tmp; assert!(nodes_0_read.is_empty()); - assert!(nodes[0].chan_monitor.add_update_monitor(chan_0_monitor.get_funding_txo().unwrap(), chan_0_monitor).is_ok()); + assert!(nodes[0].chan_monitor.add_monitor(chan_0_monitor.get_funding_txo().unwrap(), chan_0_monitor).is_ok()); nodes[0].node = &nodes_0_deserialized; nodes[0].block_notifier.register_listener(nodes[0].node); assert_eq!(nodes[0].node.list_channels().len(), 1); @@ -3780,7 +3780,7 @@ fn test_simple_manager_serialize_deserialize() { nodes_0_deserialized = nodes_0_deserialized_tmp; assert!(nodes_0_read.is_empty()); - assert!(nodes[0].chan_monitor.add_update_monitor(chan_0_monitor.get_funding_txo().unwrap(), chan_0_monitor).is_ok()); + assert!(nodes[0].chan_monitor.add_monitor(chan_0_monitor.get_funding_txo().unwrap(), chan_0_monitor).is_ok()); nodes[0].node = &nodes_0_deserialized; check_added_monitors!(nodes[0], 1); @@ -3854,7 +3854,7 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { } for monitor in node_0_monitors.drain(..) { - assert!(nodes[0].chan_monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor).is_ok()); + assert!(nodes[0].chan_monitor.add_monitor(monitor.get_funding_txo().unwrap(), monitor).is_ok()); check_added_monitors!(nodes[0], 1); } nodes[0].node = &nodes_0_deserialized; @@ -6577,7 +6577,7 @@ fn test_data_loss_protect() { }).unwrap().1 }; nodes[0].node = &node_state_0; - assert!(monitor.add_update_monitor(OutPoint { txid: chan.3.txid(), index: 0 }, chan_monitor.clone()).is_ok()); + assert!(monitor.add_monitor(OutPoint { txid: chan.3.txid(), index: 0 }, chan_monitor.clone()).is_ok()); nodes[0].chan_monitor = &monitor; nodes[0].chain_monitor = chain_monitor; diff --git a/lightning/src/util/errors.rs b/lightning/src/util/errors.rs index 27f837775..35bf00bd8 100644 --- a/lightning/src/util/errors.rs +++ b/lightning/src/util/errors.rs @@ -33,7 +33,7 @@ pub enum APIError { /// A human-readable error message err: &'static str }, - /// An attempt to call add_update_monitor returned an Err (ie you did this!), causing the + /// An attempt to call add/update_monitor returned an Err (ie you did this!), causing the /// attempted action to fail. MonitorUpdateFailed, } diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 4112208d1..a38aea2d4 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -46,6 +46,7 @@ impl chaininterface::FeeEstimator for TestFeeEstimator { pub struct TestChannelMonitor<'a> { pub added_monitors: Mutex)>>, + pub latest_monitor_update_id: Mutex>, pub simple_monitor: channelmonitor::SimpleManyChannelMonitor, pub update_ret: Mutex>, } @@ -53,13 +54,14 @@ impl<'a> TestChannelMonitor<'a> { pub fn new(chain_monitor: Arc, broadcaster: &'a chaininterface::BroadcasterInterface, logger: Arc, fee_estimator: Arc) -> Self { Self { added_monitors: Mutex::new(Vec::new()), + latest_monitor_update_id: Mutex::new(HashMap::new()), simple_monitor: channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, fee_estimator), update_ret: Mutex::new(Ok(())), } } } impl<'a> channelmonitor::ManyChannelMonitor for TestChannelMonitor<'a> { - fn add_update_monitor(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> { + fn add_monitor(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> { // At every point where we get a monitor update, we should be able to send a useful monitor // to a watchtower and disk... let mut w = TestVecWriter(Vec::new()); @@ -69,7 +71,8 @@ impl<'a> channelmonitor::ManyChannelMonitor for TestChanne w.0.clear(); monitor.write_for_watchtower(&mut w).unwrap(); // This at least shouldn't crash... self.added_monitors.lock().unwrap().push((funding_txo, monitor.clone())); - assert!(self.simple_monitor.add_update_monitor(funding_txo, monitor).is_ok()); + self.latest_monitor_update_id.lock().unwrap().insert(funding_txo.to_channel_id(), (funding_txo, monitor.get_latest_update_id())); + assert!(self.simple_monitor.add_monitor(funding_txo, monitor).is_ok()); self.update_ret.lock().unwrap().clone() } @@ -80,6 +83,7 @@ impl<'a> channelmonitor::ManyChannelMonitor for TestChanne assert!(channelmonitor::ChannelMonitorUpdate::read( &mut ::std::io::Cursor::new(&w.0)).unwrap() == update); + self.latest_monitor_update_id.lock().unwrap().insert(funding_txo.to_channel_id(), (funding_txo, update.update_id)); assert!(self.simple_monitor.update_monitor(funding_txo, update).is_ok()); // At every point where we get a monitor update, we should be able to send a useful monitor // to a watchtower and disk... -- 2.39.5