X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Fln%2Fchannelmanager.rs;h=5ee2f6725566fcb818eaf306f252ca62c182188a;hb=224fb05cc53526e7b861e746183451d3d74ad549;hp=08ed2515f0d2aee35774ac7474d142be6adec237;hpb=bb43b98e8fa6bce8d2e1a93a9ce566a939d87baa;p=rust-lightning diff --git a/src/ln/channelmanager.rs b/src/ln/channelmanager.rs index 08ed2515..5ee2f672 100644 --- a/src/ln/channelmanager.rs +++ b/src/ln/channelmanager.rs @@ -23,14 +23,15 @@ use secp256k1; use chain::chaininterface::{BroadcasterInterface,ChainListener,ChainWatchInterface,FeeEstimator}; use chain::transaction::OutPoint; use ln::channel::{Channel, ChannelError}; -use ln::channelmonitor::{ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS}; +use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS}; use ln::router::{Route,RouteHop}; use ln::msgs; -use ln::msgs::{ChannelMessageHandler, HandleError}; +use ln::msgs::{ChannelMessageHandler, DecodeError, HandleError}; use chain::keysinterface::KeysInterface; +use util::config::UserConfig; use util::{byte_utils, events, internal_traits, rng}; use util::sha2::Sha256; -use util::ser::{Readable, Writeable}; +use util::ser::{Readable, ReadableArgs, Writeable, Writer}; use util::chacha20poly1305rfc::ChaCha20; use util::logger::Logger; use util::errors::APIError; @@ -41,11 +42,10 @@ use crypto::hmac::Hmac; use crypto::digest::Digest; use crypto::symmetriccipher::SynchronousStreamCipher; -use std::{ptr, mem}; -use std::collections::HashMap; -use std::collections::hash_map; +use std::{cmp, ptr, mem}; +use std::collections::{HashMap, hash_map, HashSet}; use std::io::Cursor; -use std::sync::{Mutex,MutexGuard,Arc}; +use std::sync::{Arc, Mutex, MutexGuard, RwLock}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::{Instant,Duration}; @@ -301,22 +301,45 @@ const ERR: () = "You need at least 32 bit pointers (well, usize, but we'll assum /// /// Implements ChannelMessageHandler, handling the multi-channel parts and passing things through /// to individual Channels. +/// +/// Implements Writeable to write out all channel state to disk. Implies peer_disconnected() for +/// all peers during write/read (though does not modify this instance, only the instance being +/// serialized). This will result in any channels which have not yet exchanged funding_created (ie +/// called funding_transaction_generated for outbound channels). +/// +/// Note that you can be a bit lazier about writing out ChannelManager than you can be with +/// ChannelMonitors. With ChannelMonitors you MUST write each monitor update out to disk before +/// returning from ManyChannelMonitor::add_update_monitor, with ChannelManagers, writing updates +/// happens out-of-band (and will prevent any other ChannelManager operations from occurring during +/// the serialization process). If the deserialized version is out-of-date compared to the +/// ChannelMonitors passed by reference to read(), those channels will be force-closed based on the +/// ChannelMonitor state and no funds will be lost (mod on-chain transaction fees). +/// +/// Note that the deserializer is only implemented for (Sha256dHash, ChannelManager), which +/// tells you the last block hash which was block_connect()ed. You MUST rescan any blocks along +/// the "reorg path" (ie call block_disconnected() until you get to a common block and then call +/// block_connected() to step towards your best block) upon deserialization before using the +/// object! pub struct ChannelManager { + default_configuration: UserConfig, genesis_hash: Sha256dHash, fee_estimator: Arc, monitor: Arc, chain_monitor: Arc, tx_broadcaster: Arc, - announce_channels_publicly: bool, - fee_proportional_millionths: u32, latest_block_height: AtomicUsize, + last_block_hash: Mutex, secp_ctx: Secp256k1, channel_state: Mutex, our_network_key: SecretKey, pending_events: Mutex>, + /// Used when we have to take a BIG lock to make sure everything is self-consistent. + /// Essentially just when we're serializing ourselves out. + /// Taken first everywhere where we are making changes before any other locks. + total_consistency_lock: RwLock<()>, keys_manager: Arc, @@ -388,23 +411,22 @@ impl ChannelManager { /// This is the main "logic hub" for all channel-related actions, and implements /// ChannelMessageHandler. /// - /// fee_proportional_millionths is an optional fee to charge any payments routed through us. /// Non-proportional fees are fixed according to our risk using the provided fee estimator. /// /// panics if channel_value_satoshis is >= `MAX_FUNDING_SATOSHIS`! - pub fn new(fee_proportional_millionths: u32, announce_channels_publicly: bool, network: Network, feeest: Arc, monitor: Arc, chain_monitor: Arc, tx_broadcaster: Arc, logger: Arc, keys_manager: Arc) -> Result, secp256k1::Error> { + pub fn new(network: Network, feeest: Arc, monitor: Arc, chain_monitor: Arc, tx_broadcaster: Arc, logger: Arc,keys_manager: Arc, config: UserConfig) -> Result, secp256k1::Error> { let secp_ctx = Secp256k1::new(); let res = Arc::new(ChannelManager { + default_configuration: config.clone(), genesis_hash: genesis_block(network).header.bitcoin_hash(), fee_estimator: feeest.clone(), monitor: monitor.clone(), chain_monitor, tx_broadcaster, - announce_channels_publicly, - fee_proportional_millionths, - latest_block_height: AtomicUsize::new(0), //TODO: Get an init value (generally need to replay recent chain on chain_monitor registration) + latest_block_height: AtomicUsize::new(0), //TODO: Get an init value + last_block_hash: Mutex::new(Default::default()), secp_ctx, channel_state: Mutex::new(ChannelHolder{ @@ -418,6 +440,7 @@ impl ChannelManager { our_network_key: keys_manager.get_node_secret(), pending_events: Mutex::new(Vec::new()), + total_consistency_lock: RwLock::new(()), keys_manager, @@ -438,10 +461,17 @@ impl ChannelManager { /// If successful, will generate a SendOpenChannel message event, so you should probably poll /// PeerManager::process_events afterwards. /// - /// Raises APIError::APIMisuseError when channel_value_satoshis > 2**24 or push_msat being greater than channel_value_satoshis * 1k + /// Raises APIError::APIMisuseError when channel_value_satoshis > 2**24 or push_msat is + /// greater than channel_value_satoshis * 1k or channel_value_satoshis is < 1000. pub fn create_channel(&self, their_network_key: PublicKey, channel_value_satoshis: u64, push_msat: u64, user_id: u64) -> Result<(), APIError> { - let channel = Channel::new_outbound(&*self.fee_estimator, &self.keys_manager, their_network_key, channel_value_satoshis, push_msat, self.announce_channels_publicly, user_id, Arc::clone(&self.logger))?; + if channel_value_satoshis < 1000 { + return Err(APIError::APIMisuseError { err: "channel_value must be at least 1000 satoshis" }); + } + + let channel = Channel::new_outbound(&*self.fee_estimator, &self.keys_manager, their_network_key, channel_value_satoshis, push_msat, user_id, Arc::clone(&self.logger), &self.default_configuration)?; let res = channel.get_open_channel(self.genesis_hash.clone(), &*self.fee_estimator); + + let _ = self.total_consistency_lock.read().unwrap(); let mut channel_state = self.channel_state.lock().unwrap(); match channel_state.by_id.entry(channel.channel_id()) { hash_map::Entry::Occupied(_) => { @@ -505,6 +535,8 @@ impl ChannelManager { /// /// May generate a SendShutdown message event on success, which should be relayed. pub fn close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> { + let _ = self.total_consistency_lock.read().unwrap(); + let (mut failed_htlcs, chan_option) = { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = channel_state_lock.borrow_parts(); @@ -567,6 +599,8 @@ impl ChannelManager { /// Force closes a channel, immediately broadcasting the latest local commitment transaction to /// the chain and rejecting new HTLCs on the given channel. pub fn force_close_channel(&self, channel_id: &[u8; 32]) { + let _ = self.total_consistency_lock.read().unwrap(); + let mut chan = { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = channel_state_lock.borrow_parts(); @@ -1040,7 +1074,7 @@ impl ChannelManager { if *amt_to_forward < chan.get_their_htlc_minimum_msat() { // amount_below_minimum break Some(("HTLC amount was below the htlc_minimum_msat", 0x1000 | 11, Some(self.get_channel_update(chan).unwrap()))); } - let fee = amt_to_forward.checked_mul(self.fee_proportional_millionths as u64).and_then(|prop_fee| { (prop_fee / 1000000).checked_add(chan.get_our_fee_base_msat(&*self.fee_estimator) as u64) }); + let fee = amt_to_forward.checked_mul(chan.get_fee_proportional_millionths() as u64).and_then(|prop_fee| { (prop_fee / 1000000).checked_add(chan.get_our_fee_base_msat(&*self.fee_estimator) as u64) }); if fee.is_none() || msg.amount_msat < fee.unwrap() || (msg.amount_msat - fee.unwrap()) < *amt_to_forward { // fee_insufficient break Some(("Prior hop has deviated from specified fees parameters or origin node has obsolete ones", 0x1000 | 12, Some(self.get_channel_update(chan).unwrap()))); } @@ -1094,7 +1128,7 @@ impl ChannelManager { cltv_expiry_delta: CLTV_EXPIRY_DELTA, htlc_minimum_msat: chan.get_our_htlc_minimum_msat(), fee_base_msat: chan.get_our_fee_base_msat(&*self.fee_estimator), - fee_proportional_millionths: self.fee_proportional_millionths, + fee_proportional_millionths: chan.get_fee_proportional_millionths(), excess_data: Vec::new(), }; @@ -1147,6 +1181,7 @@ impl ChannelManager { let (onion_payloads, htlc_msat, htlc_cltv) = ChannelManager::build_onion_payloads(&route, cur_height)?; let onion_packet = ChannelManager::construct_onion_packet(onion_payloads, onion_keys, &payment_hash); + let _ = self.total_consistency_lock.read().unwrap(); let mut channel_state = self.channel_state.lock().unwrap(); let id = match channel_state.short_to_id.get(&route.hops.first().unwrap().short_channel_id) { @@ -1198,11 +1233,16 @@ impl ChannelManager { /// Call this upon creation of a funding transaction for the given channel. /// + /// Note that ALL inputs in the transaction pointed to by funding_txo MUST spend SegWit outputs + /// or your counterparty can steal your funds! + /// /// Panics if a funding transaction has already been provided for this channel. /// /// May panic if the funding_txo is duplicative with some other channel (note that this should /// be trivially prevented by using unique funding transaction keys per-channel). pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_txo: OutPoint) { + let _ = self.total_consistency_lock.read().unwrap(); + let (chan, msg, chan_monitor) = { let mut channel_state = self.channel_state.lock().unwrap(); match channel_state.by_id.remove(temporary_channel_id) { @@ -1268,6 +1308,8 @@ impl ChannelManager { /// Should only really ever be called in response to an PendingHTLCsForwardable event. /// Will likely generate further events. pub fn process_pending_htlc_forwards(&self) { + let _ = self.total_consistency_lock.read().unwrap(); + let mut new_events = Vec::new(); let mut failed_forwards = Vec::new(); { @@ -1389,6 +1431,8 @@ impl ChannelManager { /// Indicates that the preimage for payment_hash is unknown or the received amount is incorrect after a PaymentReceived event. pub fn fail_htlc_backwards(&self, payment_hash: &[u8; 32], reason: PaymentFailReason) -> bool { + let _ = self.total_consistency_lock.read().unwrap(); + let mut channel_state = Some(self.channel_state.lock().unwrap()); let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(payment_hash); if let Some(mut sources) = removed_source { @@ -1484,6 +1528,8 @@ impl ChannelManager { let mut payment_hash = [0; 32]; sha.result(&mut payment_hash); + let _ = self.total_consistency_lock.read().unwrap(); + let mut channel_state = Some(self.channel_state.lock().unwrap()); let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(&payment_hash); if let Some(mut sources) = removed_source { @@ -1562,6 +1608,7 @@ impl ChannelManager { let mut close_results = Vec::new(); let mut htlc_forwards = Vec::new(); let mut htlc_failures = Vec::new(); + let _ = self.total_consistency_lock.read().unwrap(); { let mut channel_lock = self.channel_state.lock().unwrap(); @@ -1641,7 +1688,7 @@ impl ChannelManager { return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash", msg.temporary_channel_id.clone())); } - let channel = Channel::new_from_req(&*self.fee_estimator, &self.keys_manager, their_node_id.clone(), msg, 0, false, self.announce_channels_publicly, Arc::clone(&self.logger)) + let channel = Channel::new_from_req(&*self.fee_estimator, &self.keys_manager, their_node_id.clone(), msg, 0, Arc::clone(&self.logger), &self.default_configuration) .map_err(|e| MsgHandleErrInternal::from_chan_no_close(e, msg.temporary_channel_id))?; let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = channel_state_lock.borrow_parts(); @@ -1667,7 +1714,7 @@ impl ChannelManager { //TODO: see issue #153, need a consistent behavior on obnoxious behavior from random node return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.temporary_channel_id)); } - chan.accept_channel(&msg) + chan.accept_channel(&msg, &self.default_configuration) .map_err(|e| MsgHandleErrInternal::from_chan_maybe_close(e, msg.temporary_channel_id))?; (chan.get_value_satoshis(), chan.get_funding_redeemscript().to_v0_p2wsh(), chan.get_user_id()) }, @@ -1788,7 +1835,7 @@ impl ChannelManager { //TODO: here and below MsgHandleErrInternal, #153 case return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id)); } - let (shutdown, closing_signed, dropped_htlcs) = chan_entry.get_mut().shutdown(&*self.fee_estimator, &msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?; + let (shutdown, closing_signed, dropped_htlcs) = chan_entry.get_mut().shutdown(&*self.fee_estimator, &msg).map_err(|e| MsgHandleErrInternal::from_chan_maybe_close(e, msg.channel_id))?; if let Some(msg) = shutdown { channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { node_id: their_node_id.clone(), @@ -1882,7 +1929,7 @@ impl ChannelManager { //encrypted with the same key. Its not immediately obvious how to usefully exploit that, //but we should prevent it anyway. - let (pending_forward_info, mut channel_state_lock) = self.decode_update_add_htlc_onion(msg); + let (mut pending_forward_info, mut channel_state_lock) = self.decode_update_add_htlc_onion(msg); let channel_state = channel_state_lock.borrow_parts(); match channel_state.by_id.get_mut(&msg.channel_id) { @@ -1892,7 +1939,16 @@ impl ChannelManager { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id)); } if !chan.is_usable() { - return Err(MsgHandleErrInternal::from_no_close(HandleError{err: "Channel not yet available for receiving HTLCs", action: Some(msgs::ErrorAction::IgnoreError)})); + // If the update_add is completely bogus, the call will Err and we will close, + // but if we've sent a shutdown and they haven't acknowledged it yet, we just + // want to reject the new HTLC and fail it backwards instead of forwarding. + if let PendingHTLCStatus::Forward(PendingForwardHTLCInfo { incoming_shared_secret, .. }) = pending_forward_info { + pending_forward_info = PendingHTLCStatus::Fail(HTLCFailureMsg::Relay(msgs::UpdateFailHTLC { + channel_id: msg.channel_id, + htlc_id: msg.htlc_id, + reason: ChannelManager::build_first_hop_failure_packet(&incoming_shared_secret, 0x1000|20, &self.get_channel_update(chan).unwrap().encode_with_len()[..]), + })); + } } chan.update_add_htlc(&msg, pending_forward_info).map_err(|e| MsgHandleErrInternal::from_maybe_close(e)) }, @@ -2158,7 +2214,7 @@ impl ChannelManager { //TODO: here and below MsgHandleErrInternal, #153 case return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id)); } - let (revoke_and_ack, commitment_signed, chan_monitor) = chan.commitment_signed(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?; + let (revoke_and_ack, commitment_signed, closing_signed, chan_monitor) = chan.commitment_signed(&msg, &*self.fee_estimator).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?; if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { unimplemented!(); } @@ -2179,6 +2235,12 @@ impl ChannelManager { }, }); } + if let Some(msg) = closing_signed { + channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { + node_id: their_node_id.clone(), + msg, + }); + } Ok(()) }, None => Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id)) @@ -2228,7 +2290,7 @@ impl ChannelManager { //TODO: here and below MsgHandleErrInternal, #153 case return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id)); } - let (commitment_update, pending_forwards, pending_failures, chan_monitor) = chan.revoke_and_ack(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?; + let (commitment_update, pending_forwards, pending_failures, closing_signed, chan_monitor) = chan.revoke_and_ack(&msg, &*self.fee_estimator).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?; if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { unimplemented!(); } @@ -2238,6 +2300,12 @@ impl ChannelManager { updates, }); } + if let Some(msg) = closing_signed { + channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { + node_id: their_node_id.clone(), + msg, + }); + } (pending_forwards, pending_failures, chan.get_short_channel_id().expect("RAA should only work on a short-id-available channel")) }, None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id)) @@ -2315,7 +2383,7 @@ impl ChannelManager { if chan.get_their_node_id() != *their_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id)); } - let (funding_locked, revoke_and_ack, commitment_update, channel_monitor, order) = chan.channel_reestablish(msg) + let (funding_locked, revoke_and_ack, commitment_update, channel_monitor, order, shutdown) = chan.channel_reestablish(msg) .map_err(|e| MsgHandleErrInternal::from_chan_maybe_close(e, msg.channel_id))?; if let Some(monitor) = channel_monitor { if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) { @@ -2354,6 +2422,12 @@ impl ChannelManager { send_raa!(); }, } + if let Some(msg) = shutdown { + channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { + node_id: their_node_id.clone(), + msg, + }); + } Ok(()) }, None => Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id)) @@ -2366,6 +2440,7 @@ impl ChannelManager { /// Note: This API is likely to change! #[doc(hidden)] pub fn update_fee(&self, channel_id: [u8;32], feerate_per_kw: u64) -> Result<(), APIError> { + let _ = self.total_consistency_lock.read().unwrap(); let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = channel_state_lock.borrow_parts(); @@ -2423,6 +2498,7 @@ impl events::EventsProvider for ChannelManager { impl ChainListener for ChannelManager { fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) { + let _ = self.total_consistency_lock.read().unwrap(); let mut failed_channels = Vec::new(); { let mut channel_lock = self.channel_state.lock().unwrap(); @@ -2496,10 +2572,12 @@ impl ChainListener for ChannelManager { self.finish_force_close_channel(failure); } self.latest_block_height.store(height as usize, Ordering::Release); + *self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header.bitcoin_hash(); } /// We force-close the channel without letting our counterparty participate in the shutdown fn block_disconnected(&self, header: &BlockHeader) { + let _ = self.total_consistency_lock.read().unwrap(); let mut failed_channels = Vec::new(); { let mut channel_lock = self.channel_state.lock().unwrap(); @@ -2527,6 +2605,7 @@ impl ChainListener for ChannelManager { self.finish_force_close_channel(failure); } self.latest_block_height.fetch_sub(1, Ordering::AcqRel); + *self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header.bitcoin_hash(); } } @@ -2565,70 +2644,87 @@ macro_rules! handle_error { impl ChannelMessageHandler for ChannelManager { //TODO: Handle errors and close channel (or so) fn handle_open_channel(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_open_channel(their_node_id, msg), their_node_id) } fn handle_accept_channel(&self, their_node_id: &PublicKey, msg: &msgs::AcceptChannel) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_accept_channel(their_node_id, msg), their_node_id) } fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_funding_created(their_node_id, msg), their_node_id) } fn handle_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_funding_signed(their_node_id, msg), their_node_id) } fn handle_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_funding_locked(their_node_id, msg), their_node_id) } fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_shutdown(their_node_id, msg), their_node_id) } fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_closing_signed(their_node_id, msg), their_node_id) } fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) -> Result<(), msgs::HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_update_add_htlc(their_node_id, msg), their_node_id) } fn handle_update_fulfill_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_update_fulfill_htlc(their_node_id, msg), their_node_id) } fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_update_fail_htlc(their_node_id, msg), their_node_id) } fn handle_update_fail_malformed_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_update_fail_malformed_htlc(their_node_id, msg), their_node_id) } fn handle_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_commitment_signed(their_node_id, msg), their_node_id) } fn handle_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_revoke_and_ack(their_node_id, msg), their_node_id) } fn handle_update_fee(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFee) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_update_fee(their_node_id, msg), their_node_id) } fn handle_announcement_signatures(&self, their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_announcement_signatures(their_node_id, msg), their_node_id) } fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_channel_reestablish(their_node_id, msg), their_node_id) } fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) { + let _ = self.total_consistency_lock.read().unwrap(); let mut failed_channels = Vec::new(); let mut failed_payments = Vec::new(); { @@ -2684,6 +2780,7 @@ impl ChannelMessageHandler for ChannelManager { } fn peer_connected(&self, their_node_id: &PublicKey) { + let _ = self.total_consistency_lock.read().unwrap(); let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = channel_state_lock.borrow_parts(); let pending_msg_events = channel_state.pending_msg_events; @@ -2708,6 +2805,8 @@ impl ChannelMessageHandler for ChannelManager { } fn handle_error(&self, their_node_id: &PublicKey, msg: &msgs::ErrorMessage) { + let _ = self.total_consistency_lock.read().unwrap(); + if msg.channel_id == [0; 32] { for chan in self.list_channels() { if chan.remote_network_id == *their_node_id { @@ -2720,15 +2819,397 @@ impl ChannelMessageHandler for ChannelManager { } } +const SERIALIZATION_VERSION: u8 = 1; +const MIN_SERIALIZATION_VERSION: u8 = 1; + +impl Writeable for PendingForwardHTLCInfo { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + if let &Some(ref onion) = &self.onion_packet { + 1u8.write(writer)?; + onion.write(writer)?; + } else { + 0u8.write(writer)?; + } + self.incoming_shared_secret.write(writer)?; + self.payment_hash.write(writer)?; + self.short_channel_id.write(writer)?; + self.amt_to_forward.write(writer)?; + self.outgoing_cltv_value.write(writer)?; + Ok(()) + } +} + +impl Readable for PendingForwardHTLCInfo { + fn read(reader: &mut R) -> Result { + let onion_packet = match >::read(reader)? { + 0 => None, + 1 => Some(msgs::OnionPacket::read(reader)?), + _ => return Err(DecodeError::InvalidValue), + }; + Ok(PendingForwardHTLCInfo { + onion_packet, + incoming_shared_secret: Readable::read(reader)?, + payment_hash: Readable::read(reader)?, + short_channel_id: Readable::read(reader)?, + amt_to_forward: Readable::read(reader)?, + outgoing_cltv_value: Readable::read(reader)?, + }) + } +} + +impl Writeable for HTLCFailureMsg { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + match self { + &HTLCFailureMsg::Relay(ref fail_msg) => { + 0u8.write(writer)?; + fail_msg.write(writer)?; + }, + &HTLCFailureMsg::Malformed(ref fail_msg) => { + 1u8.write(writer)?; + fail_msg.write(writer)?; + } + } + Ok(()) + } +} + +impl Readable for HTLCFailureMsg { + fn read(reader: &mut R) -> Result { + match >::read(reader)? { + 0 => Ok(HTLCFailureMsg::Relay(Readable::read(reader)?)), + 1 => Ok(HTLCFailureMsg::Malformed(Readable::read(reader)?)), + _ => Err(DecodeError::InvalidValue), + } + } +} + +impl Writeable for PendingHTLCStatus { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + match self { + &PendingHTLCStatus::Forward(ref forward_info) => { + 0u8.write(writer)?; + forward_info.write(writer)?; + }, + &PendingHTLCStatus::Fail(ref fail_msg) => { + 1u8.write(writer)?; + fail_msg.write(writer)?; + } + } + Ok(()) + } +} + +impl Readable for PendingHTLCStatus { + fn read(reader: &mut R) -> Result { + match >::read(reader)? { + 0 => Ok(PendingHTLCStatus::Forward(Readable::read(reader)?)), + 1 => Ok(PendingHTLCStatus::Fail(Readable::read(reader)?)), + _ => Err(DecodeError::InvalidValue), + } + } +} + +impl_writeable!(HTLCPreviousHopData, 0, { + short_channel_id, + htlc_id, + incoming_packet_shared_secret +}); + +impl Writeable for HTLCSource { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + match self { + &HTLCSource::PreviousHopData(ref hop_data) => { + 0u8.write(writer)?; + hop_data.write(writer)?; + }, + &HTLCSource::OutboundRoute { ref route, ref session_priv, ref first_hop_htlc_msat } => { + 1u8.write(writer)?; + route.write(writer)?; + session_priv.write(writer)?; + first_hop_htlc_msat.write(writer)?; + } + } + Ok(()) + } +} + +impl Readable for HTLCSource { + fn read(reader: &mut R) -> Result { + match >::read(reader)? { + 0 => Ok(HTLCSource::PreviousHopData(Readable::read(reader)?)), + 1 => Ok(HTLCSource::OutboundRoute { + route: Readable::read(reader)?, + session_priv: Readable::read(reader)?, + first_hop_htlc_msat: Readable::read(reader)?, + }), + _ => Err(DecodeError::InvalidValue), + } + } +} + +impl Writeable for HTLCFailReason { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + match self { + &HTLCFailReason::ErrorPacket { ref err } => { + 0u8.write(writer)?; + err.write(writer)?; + }, + &HTLCFailReason::Reason { ref failure_code, ref data } => { + 1u8.write(writer)?; + failure_code.write(writer)?; + data.write(writer)?; + } + } + Ok(()) + } +} + +impl Readable for HTLCFailReason { + fn read(reader: &mut R) -> Result { + match >::read(reader)? { + 0 => Ok(HTLCFailReason::ErrorPacket { err: Readable::read(reader)? }), + 1 => Ok(HTLCFailReason::Reason { + failure_code: Readable::read(reader)?, + data: Readable::read(reader)?, + }), + _ => Err(DecodeError::InvalidValue), + } + } +} + +impl_writeable!(HTLCForwardInfo, 0, { + prev_short_channel_id, + prev_htlc_id, + forward_info +}); + +impl Writeable for ChannelManager { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + let _ = self.total_consistency_lock.write().unwrap(); + + writer.write_all(&[SERIALIZATION_VERSION; 1])?; + writer.write_all(&[MIN_SERIALIZATION_VERSION; 1])?; + + self.genesis_hash.write(writer)?; + (self.latest_block_height.load(Ordering::Acquire) as u32).write(writer)?; + self.last_block_hash.lock().unwrap().write(writer)?; + + let channel_state = self.channel_state.lock().unwrap(); + let mut unfunded_channels = 0; + for (_, channel) in channel_state.by_id.iter() { + if !channel.is_funding_initiated() { + unfunded_channels += 1; + } + } + ((channel_state.by_id.len() - unfunded_channels) as u64).write(writer)?; + for (_, channel) in channel_state.by_id.iter() { + if channel.is_funding_initiated() { + channel.write(writer)?; + } + } + + (channel_state.forward_htlcs.len() as u64).write(writer)?; + for (short_channel_id, pending_forwards) in channel_state.forward_htlcs.iter() { + short_channel_id.write(writer)?; + (pending_forwards.len() as u64).write(writer)?; + for forward in pending_forwards { + forward.write(writer)?; + } + } + + (channel_state.claimable_htlcs.len() as u64).write(writer)?; + for (payment_hash, previous_hops) in channel_state.claimable_htlcs.iter() { + payment_hash.write(writer)?; + (previous_hops.len() as u64).write(writer)?; + for previous_hop in previous_hops { + previous_hop.write(writer)?; + } + } + + Ok(()) + } +} + +/// Arguments for the creation of a ChannelManager that are not deserialized. +/// +/// At a high-level, the process for deserializing a ChannelManager and resuming normal operation +/// is: +/// 1) Deserialize all stored ChannelMonitors. +/// 2) Deserialize the ChannelManager by filling in this struct and calling <(Sha256dHash, +/// ChannelManager)>::read(reader, args). +/// This may result in closing some Channels if the ChannelMonitor is newer than the stored +/// ChannelManager state to ensure no loss of funds. Thus, transactions may be broadcasted. +/// 3) Register all relevant ChannelMonitor outpoints with your chain watch mechanism using +/// ChannelMonitor::get_monitored_outpoints and ChannelMonitor::get_funding_txo(). +/// 4) Reconnect blocks on your ChannelMonitors. +/// 5) Move the ChannelMonitors into your local ManyChannelMonitor. +/// 6) Disconnect/connect blocks on the ChannelManager. +/// 7) Register the new ChannelManager with your ChainWatchInterface (this does not happen +/// automatically as it does in ChannelManager::new()). +pub struct ChannelManagerReadArgs<'a> { + /// The keys provider which will give us relevant keys. Some keys will be loaded during + /// deserialization. + pub keys_manager: Arc, + + /// The fee_estimator for use in the ChannelManager in the future. + /// + /// No calls to the FeeEstimator will be made during deserialization. + pub fee_estimator: Arc, + /// The ManyChannelMonitor for use in the ChannelManager in the future. + /// + /// No calls to the ManyChannelMonitor will be made during deserialization. It is assumed that + /// you have deserialized ChannelMonitors separately and will add them to your + /// ManyChannelMonitor after deserializing this ChannelManager. + pub monitor: Arc, + /// The ChainWatchInterface for use in the ChannelManager in the future. + /// + /// No calls to the ChainWatchInterface will be made during deserialization. + pub chain_monitor: Arc, + /// The BroadcasterInterface which will be used in the ChannelManager in the future and may be + /// used to broadcast the latest local commitment transactions of channels which must be + /// force-closed during deserialization. + pub tx_broadcaster: Arc, + /// The Logger for use in the ChannelManager and which may be used to log information during + /// deserialization. + pub logger: Arc, + /// Default settings used for new channels. Any existing channels will continue to use the + /// runtime settings which were stored when the ChannelManager was serialized. + pub default_config: UserConfig, + + /// A map from channel funding outpoints to ChannelMonitors for those channels (ie + /// value.get_funding_txo() should be the key). + /// + /// If a monitor is inconsistent with the channel state during deserialization the channel will + /// be force-closed using the data in the channelmonitor and the Channel will be dropped. This + /// is true for missing channels as well. If there is a monitor missing for which we find + /// channel data Err(DecodeError::InvalidValue) will be returned. + /// + /// In such cases the latest local transactions will be sent to the tx_broadcaster included in + /// this struct. + pub channel_monitors: &'a HashMap, +} + +impl<'a, R : ::std::io::Read> ReadableArgs> for (Sha256dHash, ChannelManager) { + fn read(reader: &mut R, args: ChannelManagerReadArgs<'a>) -> Result { + let _ver: u8 = Readable::read(reader)?; + let min_ver: u8 = Readable::read(reader)?; + if min_ver > SERIALIZATION_VERSION { + return Err(DecodeError::UnknownVersion); + } + + let genesis_hash: Sha256dHash = Readable::read(reader)?; + let latest_block_height: u32 = Readable::read(reader)?; + let last_block_hash: Sha256dHash = Readable::read(reader)?; + + let mut closed_channels = Vec::new(); + + let channel_count: u64 = Readable::read(reader)?; + let mut funding_txo_set = HashSet::with_capacity(cmp::min(channel_count as usize, 128)); + let mut by_id = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); + let mut short_to_id = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); + for _ in 0..channel_count { + let mut channel: Channel = ReadableArgs::read(reader, args.logger.clone())?; + if channel.last_block_connected != last_block_hash { + return Err(DecodeError::InvalidValue); + } + + let funding_txo = channel.channel_monitor().get_funding_txo().ok_or(DecodeError::InvalidValue)?; + funding_txo_set.insert(funding_txo.clone()); + if let Some(monitor) = args.channel_monitors.get(&funding_txo) { + if channel.get_cur_local_commitment_transaction_number() != monitor.get_cur_local_commitment_number() || + channel.get_revoked_remote_commitment_transaction_number() != monitor.get_min_seen_secret() || + channel.get_cur_remote_commitment_transaction_number() != monitor.get_cur_remote_commitment_number() { + let mut force_close_res = channel.force_shutdown(); + force_close_res.0 = monitor.get_latest_local_commitment_txn(); + closed_channels.push(force_close_res); + } else { + if let Some(short_channel_id) = channel.get_short_channel_id() { + short_to_id.insert(short_channel_id, channel.channel_id()); + } + by_id.insert(channel.channel_id(), channel); + } + } else { + return Err(DecodeError::InvalidValue); + } + } + + for (ref funding_txo, ref monitor) in args.channel_monitors.iter() { + if !funding_txo_set.contains(funding_txo) { + closed_channels.push((monitor.get_latest_local_commitment_txn(), Vec::new())); + } + } + + let forward_htlcs_count: u64 = Readable::read(reader)?; + let mut forward_htlcs = HashMap::with_capacity(cmp::min(forward_htlcs_count as usize, 128)); + for _ in 0..forward_htlcs_count { + let short_channel_id = Readable::read(reader)?; + let pending_forwards_count: u64 = Readable::read(reader)?; + let mut pending_forwards = Vec::with_capacity(cmp::min(pending_forwards_count as usize, 128)); + for _ in 0..pending_forwards_count { + pending_forwards.push(Readable::read(reader)?); + } + forward_htlcs.insert(short_channel_id, pending_forwards); + } + + let claimable_htlcs_count: u64 = Readable::read(reader)?; + let mut claimable_htlcs = HashMap::with_capacity(cmp::min(claimable_htlcs_count as usize, 128)); + for _ in 0..claimable_htlcs_count { + let payment_hash = Readable::read(reader)?; + let previous_hops_len: u64 = Readable::read(reader)?; + let mut previous_hops = Vec::with_capacity(cmp::min(previous_hops_len as usize, 2)); + for _ in 0..previous_hops_len { + previous_hops.push(Readable::read(reader)?); + } + claimable_htlcs.insert(payment_hash, previous_hops); + } + + let channel_manager = ChannelManager { + genesis_hash, + fee_estimator: args.fee_estimator, + monitor: args.monitor, + chain_monitor: args.chain_monitor, + tx_broadcaster: args.tx_broadcaster, + + latest_block_height: AtomicUsize::new(latest_block_height as usize), + last_block_hash: Mutex::new(last_block_hash), + secp_ctx: Secp256k1::new(), + + channel_state: Mutex::new(ChannelHolder { + by_id, + short_to_id, + next_forward: Instant::now(), + forward_htlcs, + claimable_htlcs, + pending_msg_events: Vec::new(), + }), + our_network_key: args.keys_manager.get_node_secret(), + + pending_events: Mutex::new(Vec::new()), + total_consistency_lock: RwLock::new(()), + keys_manager: args.keys_manager, + logger: args.logger, + default_configuration: args.default_config, + }; + + for close_res in closed_channels.drain(..) { + channel_manager.finish_force_close_channel(close_res); + //TODO: Broadcast channel update for closed channels, but only after we've made a + //connection or two. + } + + Ok((last_block_hash.clone(), channel_manager)) + } +} + #[cfg(test)] mod tests { use chain::chaininterface; use chain::transaction::OutPoint; - use chain::chaininterface::ChainListener; + use chain::chaininterface::{ChainListener, ChainWatchInterface}; use chain::keysinterface::KeysInterface; use chain::keysinterface; - use ln::channelmanager::{ChannelManager,OnionKeys,PaymentFailReason,RAACommitmentOrder}; - use ln::channelmonitor::{ChannelMonitorUpdateErr, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS}; + use ln::channelmanager::{ChannelManager,ChannelManagerReadArgs,OnionKeys,PaymentFailReason,RAACommitmentOrder}; + use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS, ManyChannelMonitor}; use ln::router::{Route, RouteHop, Router}; use ln::msgs; use ln::msgs::{ChannelMessageHandler,RoutingMessageHandler}; @@ -2736,7 +3217,8 @@ mod tests { use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; use util::errors::APIError; use util::logger::Logger; - use util::ser::Writeable; + use util::ser::{Writeable, Writer, ReadableArgs}; + use util::config::UserConfig; use bitcoin::util::hash::Sha256dHash; use bitcoin::blockdata::block::{Block, BlockHeader}; @@ -2931,6 +3413,7 @@ mod tests { chan_monitor: Arc, node: Arc, router: Router, + node_seed: [u8; 32], network_payment_count: Rc>, network_chan_count: Rc>, } @@ -3599,10 +4082,13 @@ mod tests { let mut seed = [0; 32]; rng.fill_bytes(&mut seed); let keys_manager = Arc::new(keysinterface::KeysManager::new(&seed, Network::Testnet, Arc::clone(&logger))); - let chan_monitor = Arc::new(test_utils::TestChannelMonitor::new(chain_monitor.clone(), tx_broadcaster.clone())); - let node = ChannelManager::new(0, true, Network::Testnet, feeest.clone(), chan_monitor.clone(), chain_monitor.clone(), tx_broadcaster.clone(), Arc::clone(&logger), keys_manager.clone()).unwrap(); + let chan_monitor = Arc::new(test_utils::TestChannelMonitor::new(chain_monitor.clone(), tx_broadcaster.clone(), logger.clone())); + let mut config = UserConfig::new(); + config.channel_options.announced_channel = true; + config.channel_limits.force_announced_channel_preference = false; + let node = ChannelManager::new(Network::Testnet, feeest.clone(), chan_monitor.clone(), chain_monitor.clone(), tx_broadcaster.clone(), Arc::clone(&logger), keys_manager.clone(), config).unwrap(); let router = Router::new(PublicKey::from_secret_key(&secp_ctx, &keys_manager.get_node_secret()), chain_monitor.clone(), Arc::clone(&logger)); - nodes.push(Node { chain_monitor, tx_broadcaster, chan_monitor, node, router, + nodes.push(Node { chain_monitor, tx_broadcaster, chan_monitor, node, router, node_seed: seed, network_payment_count: payment_count.clone(), network_chan_count: chan_count.clone(), }); @@ -6405,4 +6891,203 @@ mod tests { sign_msg!(unsigned_msg); assert!(nodes[0].router.handle_channel_announcement(&chan_announcement).is_err()); } + + struct VecWriter(Vec); + impl Writer for VecWriter { + fn write_all(&mut self, buf: &[u8]) -> Result<(), ::std::io::Error> { + self.0.extend_from_slice(buf); + Ok(()) + } + fn size_hint(&mut self, size: usize) { + self.0.reserve_exact(size); + } + } + + #[test] + fn test_no_txn_manager_serialize_deserialize() { + let mut nodes = create_network(2); + + let tx = create_chan_between_nodes_with_value_init(&nodes[0], &nodes[1], 100000, 10001); + + nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); + + let nodes_0_serialized = nodes[0].node.encode(); + let mut chan_0_monitor_serialized = VecWriter(Vec::new()); + nodes[0].chan_monitor.simple_monitor.monitors.lock().unwrap().iter().next().unwrap().1.write_for_disk(&mut chan_0_monitor_serialized).unwrap(); + + nodes[0].chan_monitor = Arc::new(test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone(), Arc::new(test_utils::TestLogger::new()))); + let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..]; + let (_, chan_0_monitor) = <(Sha256dHash, ChannelMonitor)>::read(&mut chan_0_monitor_read, Arc::new(test_utils::TestLogger::new())).unwrap(); + assert!(chan_0_monitor_read.is_empty()); + + let mut nodes_0_read = &nodes_0_serialized[..]; + let config = UserConfig::new(); + let keys_manager = Arc::new(keysinterface::KeysManager::new(&nodes[0].node_seed, Network::Testnet, Arc::new(test_utils::TestLogger::new()))); + let (_, nodes_0_deserialized) = { + let mut channel_monitors = HashMap::new(); + channel_monitors.insert(chan_0_monitor.get_funding_txo().unwrap(), &chan_0_monitor); + <(Sha256dHash, ChannelManager)>::read(&mut nodes_0_read, ChannelManagerReadArgs { + default_config: config, + keys_manager, + fee_estimator: Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 }), + monitor: nodes[0].chan_monitor.clone(), + chain_monitor: nodes[0].chain_monitor.clone(), + tx_broadcaster: nodes[0].tx_broadcaster.clone(), + logger: Arc::new(test_utils::TestLogger::new()), + channel_monitors: &channel_monitors, + }).unwrap() + }; + assert!(nodes_0_read.is_empty()); + + assert!(nodes[0].chan_monitor.add_update_monitor(chan_0_monitor.get_funding_txo().unwrap(), chan_0_monitor).is_ok()); + nodes[0].node = Arc::new(nodes_0_deserialized); + let nodes_0_as_listener: Arc = nodes[0].node.clone(); + nodes[0].chain_monitor.register_listener(Arc::downgrade(&nodes_0_as_listener)); + assert_eq!(nodes[0].node.list_channels().len(), 1); + check_added_monitors!(nodes[0], 1); + + nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id()); + let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]); + nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id()); + let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]); + + nodes[1].node.handle_channel_reestablish(&nodes[0].node.get_our_node_id(), &reestablish_1[0]).unwrap(); + assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); + nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap(); + assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); + + let (funding_locked, _) = create_chan_between_nodes_with_value_confirm(&nodes[0], &nodes[1], &tx); + let (announcement, as_update, bs_update) = create_chan_between_nodes_with_value_b(&nodes[0], &nodes[1], &funding_locked); + for node in nodes.iter() { + assert!(node.router.handle_channel_announcement(&announcement).unwrap()); + node.router.handle_channel_update(&as_update).unwrap(); + node.router.handle_channel_update(&bs_update).unwrap(); + } + + send_payment(&nodes[0], &[&nodes[1]], 1000000); + } + + #[test] + fn test_simple_manager_serialize_deserialize() { + let mut nodes = create_network(2); + create_announced_chan_between_nodes(&nodes, 0, 1); + + let (our_payment_preimage, _) = route_payment(&nodes[0], &[&nodes[1]], 1000000); + let (_, our_payment_hash) = route_payment(&nodes[0], &[&nodes[1]], 1000000); + + nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); + + let nodes_0_serialized = nodes[0].node.encode(); + let mut chan_0_monitor_serialized = VecWriter(Vec::new()); + nodes[0].chan_monitor.simple_monitor.monitors.lock().unwrap().iter().next().unwrap().1.write_for_disk(&mut chan_0_monitor_serialized).unwrap(); + + nodes[0].chan_monitor = Arc::new(test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone(), Arc::new(test_utils::TestLogger::new()))); + let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..]; + let (_, chan_0_monitor) = <(Sha256dHash, ChannelMonitor)>::read(&mut chan_0_monitor_read, Arc::new(test_utils::TestLogger::new())).unwrap(); + assert!(chan_0_monitor_read.is_empty()); + + let mut nodes_0_read = &nodes_0_serialized[..]; + let keys_manager = Arc::new(keysinterface::KeysManager::new(&nodes[0].node_seed, Network::Testnet, Arc::new(test_utils::TestLogger::new()))); + let (_, nodes_0_deserialized) = { + let mut channel_monitors = HashMap::new(); + channel_monitors.insert(chan_0_monitor.get_funding_txo().unwrap(), &chan_0_monitor); + <(Sha256dHash, ChannelManager)>::read(&mut nodes_0_read, ChannelManagerReadArgs { + default_config: UserConfig::new(), + keys_manager, + fee_estimator: Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 }), + monitor: nodes[0].chan_monitor.clone(), + chain_monitor: nodes[0].chain_monitor.clone(), + tx_broadcaster: nodes[0].tx_broadcaster.clone(), + logger: Arc::new(test_utils::TestLogger::new()), + channel_monitors: &channel_monitors, + }).unwrap() + }; + assert!(nodes_0_read.is_empty()); + + assert!(nodes[0].chan_monitor.add_update_monitor(chan_0_monitor.get_funding_txo().unwrap(), chan_0_monitor).is_ok()); + nodes[0].node = Arc::new(nodes_0_deserialized); + check_added_monitors!(nodes[0], 1); + + reconnect_nodes(&nodes[0], &nodes[1], false, (0, 0), (0, 0), (0, 0), (0, 0), (false, false)); + + fail_payment(&nodes[0], &[&nodes[1]], our_payment_hash); + claim_payment(&nodes[0], &[&nodes[1]], our_payment_preimage); + } + + #[test] + fn test_manager_serialize_deserialize_inconsistent_monitor() { + // Test deserializing a ChannelManager with a out-of-date ChannelMonitor + let mut nodes = create_network(4); + create_announced_chan_between_nodes(&nodes, 0, 1); + create_announced_chan_between_nodes(&nodes, 2, 0); + let (_, _, channel_id, funding_tx) = create_announced_chan_between_nodes(&nodes, 0, 3); + + let (our_payment_preimage, _) = route_payment(&nodes[2], &[&nodes[0], &nodes[1]], 1000000); + + // Serialize the ChannelManager here, but the monitor we keep up-to-date + let nodes_0_serialized = nodes[0].node.encode(); + + route_payment(&nodes[0], &[&nodes[3]], 1000000); + nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); + nodes[2].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); + nodes[3].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); + + // Now the ChannelMonitor (which is now out-of-sync with ChannelManager for channel w/ + // nodes[3]) + let mut node_0_monitors_serialized = Vec::new(); + for monitor in nodes[0].chan_monitor.simple_monitor.monitors.lock().unwrap().iter() { + let mut writer = VecWriter(Vec::new()); + monitor.1.write_for_disk(&mut writer).unwrap(); + node_0_monitors_serialized.push(writer.0); + } + + nodes[0].chan_monitor = Arc::new(test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone(), Arc::new(test_utils::TestLogger::new()))); + let mut node_0_monitors = Vec::new(); + for serialized in node_0_monitors_serialized.iter() { + let mut read = &serialized[..]; + let (_, monitor) = <(Sha256dHash, ChannelMonitor)>::read(&mut read, Arc::new(test_utils::TestLogger::new())).unwrap(); + assert!(read.is_empty()); + node_0_monitors.push(monitor); + } + + let mut nodes_0_read = &nodes_0_serialized[..]; + let keys_manager = Arc::new(keysinterface::KeysManager::new(&nodes[0].node_seed, Network::Testnet, Arc::new(test_utils::TestLogger::new()))); + let (_, nodes_0_deserialized) = <(Sha256dHash, ChannelManager)>::read(&mut nodes_0_read, ChannelManagerReadArgs { + default_config: UserConfig::new(), + keys_manager, + fee_estimator: Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 }), + monitor: nodes[0].chan_monitor.clone(), + chain_monitor: nodes[0].chain_monitor.clone(), + tx_broadcaster: nodes[0].tx_broadcaster.clone(), + logger: Arc::new(test_utils::TestLogger::new()), + channel_monitors: &node_0_monitors.iter().map(|monitor| { (monitor.get_funding_txo().unwrap(), monitor) }).collect(), + }).unwrap(); + assert!(nodes_0_read.is_empty()); + + { // Channel close should result in a commitment tx and an HTLC tx + let txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap(); + assert_eq!(txn.len(), 2); + assert_eq!(txn[0].input[0].previous_output.txid, funding_tx.txid()); + assert_eq!(txn[1].input[0].previous_output.txid, txn[0].txid()); + } + + for monitor in node_0_monitors.drain(..) { + assert!(nodes[0].chan_monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor).is_ok()); + check_added_monitors!(nodes[0], 1); + } + nodes[0].node = Arc::new(nodes_0_deserialized); + + // nodes[1] and nodes[2] have no lost state with nodes[0]... + reconnect_nodes(&nodes[0], &nodes[1], false, (0, 0), (0, 0), (0, 0), (0, 0), (false, false)); + reconnect_nodes(&nodes[0], &nodes[2], false, (0, 0), (0, 0), (0, 0), (0, 0), (false, false)); + //... and we can even still claim the payment! + claim_payment(&nodes[2], &[&nodes[0], &nodes[1]], our_payment_preimage); + + nodes[3].node.peer_connected(&nodes[0].node.get_our_node_id()); + let reestablish = get_event_msg!(nodes[3], MessageSendEvent::SendChannelReestablish, nodes[0].node.get_our_node_id()); + nodes[0].node.peer_connected(&nodes[3].node.get_our_node_id()); + if let Err(msgs::HandleError { action: Some(msgs::ErrorAction::SendErrorMessage { msg }), .. }) = nodes[0].node.handle_channel_reestablish(&nodes[3].node.get_our_node_id(), &reestablish) { + assert_eq!(msg.channel_id, channel_id); + } else { panic!("Unexpected result"); } + } }