Serialize ChannelManager events
authorSourabh Marathe <surja795@gmail.com>
Thu, 7 May 2020 01:02:44 +0000 (21:02 -0400)
committerSourabh Marathe <surja795@gmail.com>
Thu, 14 May 2020 21:02:54 +0000 (17:02 -0400)
Also adds a test for de/serializing events

lightning/src/ln/channelmanager.rs
lightning/src/ln/functional_tests.rs

index 5922ec2842277e6cb5179802e77717a3402acacc..09798eb9bbab22b47c752605e97c71ce7f4e75bd 100644 (file)
@@ -38,7 +38,7 @@ use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError};
 use chain::keysinterface::{ChannelKeys, KeysInterface, KeysManager, InMemoryChannelKeys};
 use util::config::UserConfig;
 use util::{byte_utils, events};
-use util::ser::{Readable, ReadableArgs, Writeable, Writer};
+use util::ser::{Readable, ReadableArgs, MaybeReadable, Writeable, Writer};
 use util::chacha20::{ChaCha20, ChaChaReader};
 use util::logger::Logger;
 use util::errors::APIError;
@@ -3608,6 +3608,12 @@ impl<ChanSigner: ChannelKeys + Writeable, M: Deref, T: Deref, K: Deref, F: Deref
                        peer_state.latest_features.write(writer)?;
                }
 
+               let events = self.pending_events.lock().unwrap();
+               (events.len() as u64).write(writer)?;
+               for event in events.iter() {
+                       event.write(writer)?;
+               }
+
                (self.last_node_announcement_serial.load(Ordering::Acquire) as u32).write(writer)?;
 
                Ok(())
@@ -3754,12 +3760,13 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
                        }
                }
 
+               const MAX_ALLOC_SIZE: usize = 1024 * 64;
                let forward_htlcs_count: u64 = Readable::read(reader)?;
                let mut forward_htlcs = HashMap::with_capacity(cmp::min(forward_htlcs_count as usize, 128));
                for _ in 0..forward_htlcs_count {
                        let short_channel_id = Readable::read(reader)?;
                        let pending_forwards_count: u64 = Readable::read(reader)?;
-                       let mut pending_forwards = Vec::with_capacity(cmp::min(pending_forwards_count as usize, 128));
+                       let mut pending_forwards = Vec::with_capacity(cmp::min(pending_forwards_count as usize, MAX_ALLOC_SIZE/mem::size_of::<HTLCForwardInfo>()));
                        for _ in 0..pending_forwards_count {
                                pending_forwards.push(Readable::read(reader)?);
                        }
@@ -3771,7 +3778,7 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
                for _ in 0..claimable_htlcs_count {
                        let payment_hash = Readable::read(reader)?;
                        let previous_hops_len: u64 = Readable::read(reader)?;
-                       let mut previous_hops = Vec::with_capacity(cmp::min(previous_hops_len as usize, 2));
+                       let mut previous_hops = Vec::with_capacity(cmp::min(previous_hops_len as usize, MAX_ALLOC_SIZE/mem::size_of::<ClaimableHTLC>()));
                        for _ in 0..previous_hops_len {
                                previous_hops.push(Readable::read(reader)?);
                        }
@@ -3779,7 +3786,7 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
                }
 
                let peer_count: u64 = Readable::read(reader)?;
-               let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, 128));
+               let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex<PeerState>)>()));
                for _ in 0..peer_count {
                        let peer_pubkey = Readable::read(reader)?;
                        let peer_state = PeerState {
@@ -3788,6 +3795,15 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
                        per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
                }
 
+               let event_count: u64 = Readable::read(reader)?;
+               let mut pending_events_read: Vec<events::Event> = Vec::with_capacity(cmp::min(event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<events::Event>()));
+               for _ in 0..event_count {
+                       match MaybeReadable::read(reader)? {
+                               Some(event) => pending_events_read.push(event),
+                               None => continue,
+                       }
+               }
+
                let last_node_announcement_serial: u32 = Readable::read(reader)?;
 
                let channel_manager = ChannelManager {
@@ -3813,7 +3829,7 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
 
                        per_peer_state: RwLock::new(per_peer_state),
 
-                       pending_events: Mutex::new(Vec::new()),
+                       pending_events: Mutex::new(pending_events_read),
                        total_consistency_lock: RwLock::new(()),
                        keys_manager: args.keys_manager,
                        logger: args.logger,
index 5e49050dc60d74f51c693c48f78ea9103080b408..b769b0c61b4160223bec34de4fb02d13192673ed 100644 (file)
@@ -3954,6 +3954,127 @@ fn test_no_txn_manager_serialize_deserialize() {
        send_payment(&nodes[0], &[&nodes[1]], 1000000, 1_000_000);
 }
 
+#[test]
+fn test_manager_serialize_deserialize_events() {
+       // This test makes sure the events field in ChannelManager survives de/serialization
+       let chanmon_cfgs = create_chanmon_cfgs(2);
+       let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+       let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+       let fee_estimator: test_utils::TestFeeEstimator;
+       let new_chan_monitor: test_utils::TestChannelMonitor;
+       let keys_manager: test_utils::TestKeysInterface;
+       let nodes_0_deserialized: ChannelManager<EnforcingChannelKeys, &test_utils::TestChannelMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator>;
+       let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+       // Start creating a channel, but stop right before broadcasting the event message FundingBroadcastSafe
+       let channel_value = 100000;
+       let push_msat = 10001;
+       let a_flags = InitFeatures::known();
+       let b_flags = InitFeatures::known();
+       let node_a = nodes.pop().unwrap();
+       let node_b = nodes.pop().unwrap();
+       node_a.node.create_channel(node_b.node.get_our_node_id(), channel_value, push_msat, 42, None).unwrap();
+       node_b.node.handle_open_channel(&node_a.node.get_our_node_id(), a_flags, &get_event_msg!(node_a, MessageSendEvent::SendOpenChannel, node_b.node.get_our_node_id()));
+       node_a.node.handle_accept_channel(&node_b.node.get_our_node_id(), b_flags, &get_event_msg!(node_b, MessageSendEvent::SendAcceptChannel, node_a.node.get_our_node_id()));
+
+       let (temporary_channel_id, tx, funding_output) = create_funding_transaction(&node_a, channel_value, 42);
+
+       node_a.node.funding_transaction_generated(&temporary_channel_id, funding_output);
+       check_added_monitors!(node_a, 0);
+
+       node_b.node.handle_funding_created(&node_a.node.get_our_node_id(), &get_event_msg!(node_a, MessageSendEvent::SendFundingCreated, node_b.node.get_our_node_id()));
+       {
+               let mut added_monitors = node_b.chan_monitor.added_monitors.lock().unwrap();
+               assert_eq!(added_monitors.len(), 1);
+               assert_eq!(added_monitors[0].0, funding_output);
+               added_monitors.clear();
+       }
+
+       node_a.node.handle_funding_signed(&node_b.node.get_our_node_id(), &get_event_msg!(node_b, MessageSendEvent::SendFundingSigned, node_a.node.get_our_node_id()));
+       {
+               let mut added_monitors = node_a.chan_monitor.added_monitors.lock().unwrap();
+               assert_eq!(added_monitors.len(), 1);
+               assert_eq!(added_monitors[0].0, funding_output);
+               added_monitors.clear();
+       }
+       // Normally, this is where node_a would check for a FundingBroadcastSafe event, but the test de/serializes first instead
+
+       nodes.push(node_a);
+       nodes.push(node_b);
+
+       // Start the de/seriailization process mid-channel creation to check that the channel manager will hold onto events that are serialized
+       let nodes_0_serialized = nodes[0].node.encode();
+       let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new());
+       nodes[0].chan_monitor.simple_monitor.monitors.lock().unwrap().iter().next().unwrap().1.write_for_disk(&mut chan_0_monitor_serialized).unwrap();
+
+       fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 };
+       new_chan_monitor = test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone(), Arc::new(test_utils::TestLogger::new()), &fee_estimator);
+       nodes[0].chan_monitor = &new_chan_monitor;
+       let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..];
+       let (_, mut chan_0_monitor) = <(BlockHash, ChannelMonitor<EnforcingChannelKeys>)>::read(&mut chan_0_monitor_read, Arc::new(test_utils::TestLogger::new())).unwrap();
+       assert!(chan_0_monitor_read.is_empty());
+
+       let mut nodes_0_read = &nodes_0_serialized[..];
+       let config = UserConfig::default();
+       keys_manager = test_utils::TestKeysInterface::new(&nodes[0].node_seed, Network::Testnet, Arc::new(test_utils::TestLogger::new()));
+       let (_, nodes_0_deserialized_tmp) = {
+               let mut channel_monitors = HashMap::new();
+               channel_monitors.insert(chan_0_monitor.get_funding_txo(), &mut chan_0_monitor);
+               <(BlockHash, ChannelManager<EnforcingChannelKeys, &test_utils::TestChannelMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator>)>::read(&mut nodes_0_read, ChannelManagerReadArgs {
+                       default_config: config,
+                       keys_manager: &keys_manager,
+                       fee_estimator: &fee_estimator,
+                       monitor: nodes[0].chan_monitor,
+                       tx_broadcaster: nodes[0].tx_broadcaster.clone(),
+                       logger: Arc::new(test_utils::TestLogger::new()),
+                       channel_monitors: &mut channel_monitors,
+               }).unwrap()
+       };
+       nodes_0_deserialized = nodes_0_deserialized_tmp;
+       assert!(nodes_0_read.is_empty());
+
+       nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
+
+       assert!(nodes[0].chan_monitor.add_monitor(chan_0_monitor.get_funding_txo(), chan_0_monitor).is_ok());
+       nodes[0].node = &nodes_0_deserialized;
+
+       // After deserializing, make sure the FundingBroadcastSafe event is still held by the channel manager
+       let events_4 = nodes[0].node.get_and_clear_pending_events();
+       assert_eq!(events_4.len(), 1);
+       match events_4[0] {
+               Event::FundingBroadcastSafe { ref funding_txo, user_channel_id } => {
+                       assert_eq!(user_channel_id, 42);
+                       assert_eq!(*funding_txo, funding_output);
+               },
+               _ => panic!("Unexpected event"),
+       };
+
+       // Make sure the channel is functioning as though the de/serialization never happened
+       nodes[0].block_notifier.register_listener(nodes[0].node);
+       assert_eq!(nodes[0].node.list_channels().len(), 1);
+       check_added_monitors!(nodes[0], 1);
+
+       nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id(), &msgs::Init { features: InitFeatures::empty() });
+       let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]);
+       nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init { features: InitFeatures::empty() });
+       let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]);
+
+       nodes[1].node.handle_channel_reestablish(&nodes[0].node.get_our_node_id(), &reestablish_1[0]);
+       assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
+       nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]);
+       assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
+
+       let (funding_locked, _) = create_chan_between_nodes_with_value_confirm(&nodes[0], &nodes[1], &tx);
+       let (announcement, as_update, bs_update) = create_chan_between_nodes_with_value_b(&nodes[0], &nodes[1], &funding_locked);
+       for node in nodes.iter() {
+               assert!(node.net_graph_msg_handler.handle_channel_announcement(&announcement).unwrap());
+               node.net_graph_msg_handler.handle_channel_update(&as_update).unwrap();
+               node.net_graph_msg_handler.handle_channel_update(&bs_update).unwrap();
+       }
+
+       send_payment(&nodes[0], &[&nodes[1]], 1000000, 1_000_000);
+}
+
 #[test]
 fn test_simple_manager_serialize_deserialize() {
        let chanmon_cfgs = create_chanmon_cfgs(2);