use chain::keysinterface::{ChannelKeys, KeysInterface, KeysManager, InMemoryChannelKeys};
use util::config::UserConfig;
use util::{byte_utils, events};
-use util::ser::{Readable, ReadableArgs, Writeable, Writer};
+use util::ser::{Readable, ReadableArgs, MaybeReadable, Writeable, Writer};
use util::chacha20::{ChaCha20, ChaChaReader};
use util::logger::Logger;
use util::errors::APIError;
peer_state.latest_features.write(writer)?;
}
+ let events = self.pending_events.lock().unwrap();
+ (events.len() as u64).write(writer)?;
+ for event in events.iter() {
+ event.write(writer)?;
+ }
+
(self.last_node_announcement_serial.load(Ordering::Acquire) as u32).write(writer)?;
Ok(())
}
}
+ const MAX_ALLOC_SIZE: usize = 1024 * 64;
let forward_htlcs_count: u64 = Readable::read(reader)?;
let mut forward_htlcs = HashMap::with_capacity(cmp::min(forward_htlcs_count as usize, 128));
for _ in 0..forward_htlcs_count {
let short_channel_id = Readable::read(reader)?;
let pending_forwards_count: u64 = Readable::read(reader)?;
- let mut pending_forwards = Vec::with_capacity(cmp::min(pending_forwards_count as usize, 128));
+ let mut pending_forwards = Vec::with_capacity(cmp::min(pending_forwards_count as usize, MAX_ALLOC_SIZE/mem::size_of::<HTLCForwardInfo>()));
for _ in 0..pending_forwards_count {
pending_forwards.push(Readable::read(reader)?);
}
for _ in 0..claimable_htlcs_count {
let payment_hash = Readable::read(reader)?;
let previous_hops_len: u64 = Readable::read(reader)?;
- let mut previous_hops = Vec::with_capacity(cmp::min(previous_hops_len as usize, 2));
+ let mut previous_hops = Vec::with_capacity(cmp::min(previous_hops_len as usize, MAX_ALLOC_SIZE/mem::size_of::<ClaimableHTLC>()));
for _ in 0..previous_hops_len {
previous_hops.push(Readable::read(reader)?);
}
}
let peer_count: u64 = Readable::read(reader)?;
- let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, 128));
+ let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex<PeerState>)>()));
for _ in 0..peer_count {
let peer_pubkey = Readable::read(reader)?;
let peer_state = PeerState {
per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
}
+ let event_count: u64 = Readable::read(reader)?;
+ let mut pending_events_read: Vec<events::Event> = Vec::with_capacity(cmp::min(event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<events::Event>()));
+ for _ in 0..event_count {
+ match MaybeReadable::read(reader)? {
+ Some(event) => pending_events_read.push(event),
+ None => continue,
+ }
+ }
+
let last_node_announcement_serial: u32 = Readable::read(reader)?;
let channel_manager = ChannelManager {
per_peer_state: RwLock::new(per_peer_state),
- pending_events: Mutex::new(Vec::new()),
+ pending_events: Mutex::new(pending_events_read),
total_consistency_lock: RwLock::new(()),
keys_manager: args.keys_manager,
logger: args.logger,
send_payment(&nodes[0], &[&nodes[1]], 1000000, 1_000_000);
}
+#[test]
+fn test_manager_serialize_deserialize_events() {
+ // This test makes sure the events field in ChannelManager survives de/serialization
+ let chanmon_cfgs = create_chanmon_cfgs(2);
+ let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+ let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+ let fee_estimator: test_utils::TestFeeEstimator;
+ let new_chan_monitor: test_utils::TestChannelMonitor;
+ let keys_manager: test_utils::TestKeysInterface;
+ let nodes_0_deserialized: ChannelManager<EnforcingChannelKeys, &test_utils::TestChannelMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator>;
+ let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+ // Start creating a channel, but stop right before broadcasting the event message FundingBroadcastSafe
+ let channel_value = 100000;
+ let push_msat = 10001;
+ let a_flags = InitFeatures::known();
+ let b_flags = InitFeatures::known();
+ let node_a = nodes.pop().unwrap();
+ let node_b = nodes.pop().unwrap();
+ node_a.node.create_channel(node_b.node.get_our_node_id(), channel_value, push_msat, 42, None).unwrap();
+ node_b.node.handle_open_channel(&node_a.node.get_our_node_id(), a_flags, &get_event_msg!(node_a, MessageSendEvent::SendOpenChannel, node_b.node.get_our_node_id()));
+ node_a.node.handle_accept_channel(&node_b.node.get_our_node_id(), b_flags, &get_event_msg!(node_b, MessageSendEvent::SendAcceptChannel, node_a.node.get_our_node_id()));
+
+ let (temporary_channel_id, tx, funding_output) = create_funding_transaction(&node_a, channel_value, 42);
+
+ node_a.node.funding_transaction_generated(&temporary_channel_id, funding_output);
+ check_added_monitors!(node_a, 0);
+
+ node_b.node.handle_funding_created(&node_a.node.get_our_node_id(), &get_event_msg!(node_a, MessageSendEvent::SendFundingCreated, node_b.node.get_our_node_id()));
+ {
+ let mut added_monitors = node_b.chan_monitor.added_monitors.lock().unwrap();
+ assert_eq!(added_monitors.len(), 1);
+ assert_eq!(added_monitors[0].0, funding_output);
+ added_monitors.clear();
+ }
+
+ node_a.node.handle_funding_signed(&node_b.node.get_our_node_id(), &get_event_msg!(node_b, MessageSendEvent::SendFundingSigned, node_a.node.get_our_node_id()));
+ {
+ let mut added_monitors = node_a.chan_monitor.added_monitors.lock().unwrap();
+ assert_eq!(added_monitors.len(), 1);
+ assert_eq!(added_monitors[0].0, funding_output);
+ added_monitors.clear();
+ }
+ // Normally, this is where node_a would check for a FundingBroadcastSafe event, but the test de/serializes first instead
+
+ nodes.push(node_a);
+ nodes.push(node_b);
+
+ // Start the de/seriailization process mid-channel creation to check that the channel manager will hold onto events that are serialized
+ let nodes_0_serialized = nodes[0].node.encode();
+ let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new());
+ nodes[0].chan_monitor.simple_monitor.monitors.lock().unwrap().iter().next().unwrap().1.write_for_disk(&mut chan_0_monitor_serialized).unwrap();
+
+ fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 };
+ new_chan_monitor = test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone(), Arc::new(test_utils::TestLogger::new()), &fee_estimator);
+ nodes[0].chan_monitor = &new_chan_monitor;
+ let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..];
+ let (_, mut chan_0_monitor) = <(BlockHash, ChannelMonitor<EnforcingChannelKeys>)>::read(&mut chan_0_monitor_read, Arc::new(test_utils::TestLogger::new())).unwrap();
+ assert!(chan_0_monitor_read.is_empty());
+
+ let mut nodes_0_read = &nodes_0_serialized[..];
+ let config = UserConfig::default();
+ keys_manager = test_utils::TestKeysInterface::new(&nodes[0].node_seed, Network::Testnet, Arc::new(test_utils::TestLogger::new()));
+ let (_, nodes_0_deserialized_tmp) = {
+ let mut channel_monitors = HashMap::new();
+ channel_monitors.insert(chan_0_monitor.get_funding_txo(), &mut chan_0_monitor);
+ <(BlockHash, ChannelManager<EnforcingChannelKeys, &test_utils::TestChannelMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator>)>::read(&mut nodes_0_read, ChannelManagerReadArgs {
+ default_config: config,
+ keys_manager: &keys_manager,
+ fee_estimator: &fee_estimator,
+ monitor: nodes[0].chan_monitor,
+ tx_broadcaster: nodes[0].tx_broadcaster.clone(),
+ logger: Arc::new(test_utils::TestLogger::new()),
+ channel_monitors: &mut channel_monitors,
+ }).unwrap()
+ };
+ nodes_0_deserialized = nodes_0_deserialized_tmp;
+ assert!(nodes_0_read.is_empty());
+
+ nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
+
+ assert!(nodes[0].chan_monitor.add_monitor(chan_0_monitor.get_funding_txo(), chan_0_monitor).is_ok());
+ nodes[0].node = &nodes_0_deserialized;
+
+ // After deserializing, make sure the FundingBroadcastSafe event is still held by the channel manager
+ let events_4 = nodes[0].node.get_and_clear_pending_events();
+ assert_eq!(events_4.len(), 1);
+ match events_4[0] {
+ Event::FundingBroadcastSafe { ref funding_txo, user_channel_id } => {
+ assert_eq!(user_channel_id, 42);
+ assert_eq!(*funding_txo, funding_output);
+ },
+ _ => panic!("Unexpected event"),
+ };
+
+ // Make sure the channel is functioning as though the de/serialization never happened
+ nodes[0].block_notifier.register_listener(nodes[0].node);
+ assert_eq!(nodes[0].node.list_channels().len(), 1);
+ check_added_monitors!(nodes[0], 1);
+
+ nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id(), &msgs::Init { features: InitFeatures::empty() });
+ let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]);
+ nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init { features: InitFeatures::empty() });
+ let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]);
+
+ nodes[1].node.handle_channel_reestablish(&nodes[0].node.get_our_node_id(), &reestablish_1[0]);
+ assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
+ nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]);
+ assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
+
+ let (funding_locked, _) = create_chan_between_nodes_with_value_confirm(&nodes[0], &nodes[1], &tx);
+ let (announcement, as_update, bs_update) = create_chan_between_nodes_with_value_b(&nodes[0], &nodes[1], &funding_locked);
+ for node in nodes.iter() {
+ assert!(node.net_graph_msg_handler.handle_channel_announcement(&announcement).unwrap());
+ node.net_graph_msg_handler.handle_channel_update(&as_update).unwrap();
+ node.net_graph_msg_handler.handle_channel_update(&bs_update).unwrap();
+ }
+
+ send_payment(&nodes[0], &[&nodes[1]], 1000000, 1_000_000);
+}
+
#[test]
fn test_simple_manager_serialize_deserialize() {
let chanmon_cfgs = create_chanmon_cfgs(2);