Merge pull request #223 from TheBlueMatt/2018-10-chanmanager-serialize
authorMatt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Sat, 27 Oct 2018 14:46:12 +0000 (10:46 -0400)
committerGitHub <noreply@github.com>
Sat, 27 Oct 2018 14:46:12 +0000 (10:46 -0400)
Implement and document Channel/ChannelManager (de)serialization

fuzz/fuzz_targets/chanmon_deser_target.rs
src/chain/keysinterface.rs
src/ln/channel.rs
src/ln/channelmanager.rs
src/ln/channelmonitor.rs
src/ln/router.rs
src/util/ser.rs
src/util/ser_macros.rs
src/util/test_utils.rs

index ba525081a712ef3e5b27fcdbad48efe8a53d4809..9ddf52c662ec7712acccfb0613100176eb9108ce 100644 (file)
@@ -1,13 +1,20 @@
 // This file is auto-generated by gen_target.sh based on msg_target_template.txt
 // To modify it, modify msg_target_template.txt and run gen_target.sh instead.
 
+extern crate bitcoin;
 extern crate lightning;
 
+use bitcoin::util::hash::Sha256dHash;
+
 use lightning::ln::channelmonitor;
 use lightning::util::reset_rng_state;
-use lightning::util::ser::{Readable, Writer};
+use lightning::util::ser::{ReadableArgs, Writer};
+
+mod utils;
+use utils::test_logger;
 
 use std::io::Cursor;
+use std::sync::Arc;
 
 struct VecWriter(Vec<u8>);
 impl Writer for VecWriter {
@@ -23,10 +30,13 @@ impl Writer for VecWriter {
 #[inline]
 pub fn do_test(data: &[u8]) {
        reset_rng_state();
-       if let Ok(monitor) = channelmonitor::ChannelMonitor::read(&mut Cursor::new(data)) {
+       let logger = Arc::new(test_logger::TestLogger{});
+       if let Ok((latest_block_hash, monitor)) = <(Sha256dHash, channelmonitor::ChannelMonitor)>::read(&mut Cursor::new(data), logger.clone()) {
                let mut w = VecWriter(Vec::new());
                monitor.write_for_disk(&mut w).unwrap();
-               assert!(channelmonitor::ChannelMonitor::read(&mut Cursor::new(&w.0)).unwrap() == monitor);
+               let deserialized_copy = <(Sha256dHash, channelmonitor::ChannelMonitor)>::read(&mut Cursor::new(&w.0), logger.clone()).unwrap();
+               assert!(latest_block_hash == deserialized_copy.0);
+               assert!(monitor == deserialized_copy.1);
                w.0.clear();
                monitor.write_for_watchtower(&mut w).unwrap();
        }
index b3823e2156106ac79573e9b56a529ab33002a197..18b069369b70e72455cee7cc80893a76bacb5195 100644 (file)
@@ -78,6 +78,15 @@ pub struct ChannelKeys {
        pub commitment_seed: [u8; 32],
 }
 
+impl_writeable!(ChannelKeys, 0, {
+       funding_key,
+       revocation_base_key,
+       payment_base_key,
+       delayed_payment_base_key,
+       htlc_base_key,
+       commitment_seed
+});
+
 impl ChannelKeys {
        /// Generate a set of lightning keys needed to operate a channel by HKDF-expanding a given
        /// random 32-byte seed
index e8ec23d9451e837a3e2acde52d0de194ed283f91..2561fc279ebaef05a46445254a4137c4d9bdb644 100644 (file)
@@ -4,7 +4,9 @@ use bitcoin::blockdata::transaction::{TxIn, TxOut, Transaction, SigHashType};
 use bitcoin::blockdata::opcodes;
 use bitcoin::util::hash::{Sha256dHash, Hash160};
 use bitcoin::util::bip143;
-use bitcoin::network::serialize::BitcoinHash;
+use bitcoin::network;
+use bitcoin::network::serialize::{BitcoinHash, RawDecoder, RawEncoder};
+use bitcoin::network::encodable::{ConsensusEncodable, ConsensusDecodable};
 
 use secp256k1::key::{PublicKey,SecretKey};
 use secp256k1::{Secp256k1,Message,Signature};
@@ -13,7 +15,7 @@ use secp256k1;
 use crypto::digest::Digest;
 
 use ln::msgs;
-use ln::msgs::{ErrorAction, HandleError};
+use ln::msgs::{DecodeError, ErrorAction, HandleError};
 use ln::channelmonitor::ChannelMonitor;
 use ln::channelmanager::{PendingHTLCStatus, HTLCSource, HTLCFailReason, HTLCFailureMsg, PendingForwardHTLCInfo, RAACommitmentOrder};
 use ln::chan_utils::{TxCreationKeys,HTLCOutputInCommitment,HTLC_SUCCESS_TX_WEIGHT,HTLC_TIMEOUT_TX_WEIGHT};
@@ -22,7 +24,7 @@ use chain::chaininterface::{FeeEstimator,ConfirmationTarget};
 use chain::transaction::OutPoint;
 use chain::keysinterface::{ChannelKeys, KeysInterface};
 use util::{transaction_utils,rng};
-use util::ser::Writeable;
+use util::ser::{Readable, ReadableArgs, Writeable, Writer, WriterWriteAdaptor};
 use util::sha2::Sha256;
 use util::logger::Logger;
 use util::errors::APIError;
@@ -306,8 +308,9 @@ pub(super) struct Channel {
        /// could miss the funding_tx_confirmed_in block as well, but it serves as a useful fallback.
        funding_tx_confirmed_in: Option<Sha256dHash>,
        short_channel_id: Option<u64>,
-       /// Used to deduplicate block_connected callbacks
-       last_block_connected: Sha256dHash,
+       /// Used to deduplicate block_connected callbacks, also used to verify consistency during
+       /// ChannelManager deserialization (hence pub(super))
+       pub(super) last_block_connected: Sha256dHash,
        funding_tx_confirmations: u64,
 
        their_dust_limit_satoshis: u64,
@@ -438,7 +441,7 @@ impl Channel {
                let secp_ctx = Secp256k1::new();
                let channel_monitor = ChannelMonitor::new(&chan_keys.revocation_base_key, &chan_keys.delayed_payment_base_key,
                                                          &chan_keys.htlc_base_key, BREAKDOWN_TIMEOUT,
-                                                         keys_provider.get_destination_script());
+                                                         keys_provider.get_destination_script(), logger.clone());
 
                Ok(Channel {
                        user_id: user_id,
@@ -600,7 +603,7 @@ impl Channel {
                let secp_ctx = Secp256k1::new();
                let mut channel_monitor = ChannelMonitor::new(&chan_keys.revocation_base_key, &chan_keys.delayed_payment_base_key,
                                                              &chan_keys.htlc_base_key, BREAKDOWN_TIMEOUT,
-                                                             keys_provider.get_destination_script());
+                                                             keys_provider.get_destination_script(), logger.clone());
                channel_monitor.set_their_base_keys(&msg.htlc_basepoint, &msg.delayed_payment_basepoint);
                channel_monitor.set_their_to_self_delay(msg.to_self_delay);
 
@@ -2560,6 +2563,18 @@ impl Channel {
                self.feerate_per_kw
        }
 
+       pub fn get_cur_local_commitment_transaction_number(&self) -> u64 {
+               self.cur_local_commitment_transaction_number + 1
+       }
+
+       pub fn get_cur_remote_commitment_transaction_number(&self) -> u64 {
+               self.cur_remote_commitment_transaction_number + 1 - if self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32) != 0 { 1 } else { 0 }
+       }
+
+       pub fn get_revoked_remote_commitment_transaction_number(&self) -> u64 {
+               self.cur_remote_commitment_transaction_number + 2
+       }
+
        //TODO: Testing purpose only, should be changed in another way after #81
        #[cfg(test)]
        pub fn get_local_keys(&self) -> &ChannelKeys {
@@ -2671,9 +2686,10 @@ impl Channel {
        /// Only returns an ErrorAction of DisconnectPeer, if Err.
        pub fn block_connected(&mut self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> Result<Option<msgs::FundingLocked>, HandleError> {
                let non_shutdown_state = self.channel_state & (!MULTI_STATE_FLAGS);
-               if self.funding_tx_confirmations > 0 {
-                       if header.bitcoin_hash() != self.last_block_connected {
-                               self.last_block_connected = header.bitcoin_hash();
+               if header.bitcoin_hash() != self.last_block_connected {
+                       self.last_block_connected = header.bitcoin_hash();
+                       self.channel_monitor.last_block_hash = self.last_block_connected;
+                       if self.funding_tx_confirmations > 0 {
                                self.funding_tx_confirmations += 1;
                                if self.funding_tx_confirmations == Channel::derive_minimum_depth(self.channel_value_satoshis*1000, self.value_to_self_msat) as u64 {
                                        let need_commitment_update = if non_shutdown_state == ChannelState::FundingSent as u32 {
@@ -2754,6 +2770,8 @@ impl Channel {
                if Some(header.bitcoin_hash()) == self.funding_tx_confirmed_in {
                        self.funding_tx_confirmations = Channel::derive_minimum_depth(self.channel_value_satoshis*1000, self.value_to_self_msat) as u64 - 1;
                }
+               self.last_block_connected = header.bitcoin_hash();
+               self.channel_monitor.last_block_hash = self.last_block_connected;
                false
        }
 
@@ -3227,6 +3245,494 @@ impl Channel {
        }
 }
 
+const SERIALIZATION_VERSION: u8 = 1;
+const MIN_SERIALIZATION_VERSION: u8 = 1;
+
+impl Writeable for InboundHTLCRemovalReason {
+       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
+               match self {
+                       &InboundHTLCRemovalReason::FailRelay(ref error_packet) => {
+                               0u8.write(writer)?;
+                               error_packet.write(writer)?;
+                       },
+                       &InboundHTLCRemovalReason::FailMalformed((ref onion_hash, ref err_code)) => {
+                               1u8.write(writer)?;
+                               onion_hash.write(writer)?;
+                               err_code.write(writer)?;
+                       },
+                       &InboundHTLCRemovalReason::Fulfill(ref payment_preimage) => {
+                               2u8.write(writer)?;
+                               payment_preimage.write(writer)?;
+                       },
+               }
+               Ok(())
+       }
+}
+
+impl<R: ::std::io::Read> Readable<R> for InboundHTLCRemovalReason {
+       fn read(reader: &mut R) -> Result<Self, DecodeError> {
+               Ok(match <u8 as Readable<R>>::read(reader)? {
+                       0 => InboundHTLCRemovalReason::FailRelay(Readable::read(reader)?),
+                       1 => InboundHTLCRemovalReason::FailMalformed((Readable::read(reader)?, Readable::read(reader)?)),
+                       2 => InboundHTLCRemovalReason::Fulfill(Readable::read(reader)?),
+                       _ => return Err(DecodeError::InvalidValue),
+               })
+       }
+}
+
+impl Writeable for Channel {
+       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
+               // Note that we write out as if remove_uncommitted_htlcs_and_mark_paused had just been
+               // called but include holding cell updates (and obviously we don't modify self).
+
+               writer.write_all(&[SERIALIZATION_VERSION; 1])?;
+               writer.write_all(&[MIN_SERIALIZATION_VERSION; 1])?;
+
+               self.user_id.write(writer)?;
+
+               self.channel_id.write(writer)?;
+               (self.channel_state | ChannelState::PeerDisconnected as u32).write(writer)?;
+               self.channel_outbound.write(writer)?;
+               self.announce_publicly.write(writer)?;
+               self.channel_value_satoshis.write(writer)?;
+
+               self.local_keys.write(writer)?;
+               self.shutdown_pubkey.write(writer)?;
+
+               self.cur_local_commitment_transaction_number.write(writer)?;
+               self.cur_remote_commitment_transaction_number.write(writer)?;
+               self.value_to_self_msat.write(writer)?;
+
+               self.received_commitment_while_awaiting_raa.write(writer)?;
+
+               let mut dropped_inbound_htlcs = 0;
+               for htlc in self.pending_inbound_htlcs.iter() {
+                       if let InboundHTLCState::RemoteAnnounced(_) = htlc.state {
+                               dropped_inbound_htlcs += 1;
+                       }
+               }
+               (self.pending_inbound_htlcs.len() as u64 - dropped_inbound_htlcs).write(writer)?;
+               for htlc in self.pending_inbound_htlcs.iter() {
+                       htlc.htlc_id.write(writer)?;
+                       htlc.amount_msat.write(writer)?;
+                       htlc.cltv_expiry.write(writer)?;
+                       htlc.payment_hash.write(writer)?;
+                       match &htlc.state {
+                               &InboundHTLCState::RemoteAnnounced(_) => {}, // Drop
+                               &InboundHTLCState::AwaitingRemoteRevokeToAnnounce(ref htlc_state) => {
+                                       1u8.write(writer)?;
+                                       htlc_state.write(writer)?;
+                               },
+                               &InboundHTLCState::AwaitingAnnouncedRemoteRevoke(ref htlc_state) => {
+                                       2u8.write(writer)?;
+                                       htlc_state.write(writer)?;
+                               },
+                               &InboundHTLCState::Committed => {
+                                       3u8.write(writer)?;
+                               },
+                               &InboundHTLCState::LocalRemoved(ref removal_reason) => {
+                                       4u8.write(writer)?;
+                                       removal_reason.write(writer)?;
+                               },
+                       }
+               }
+
+               macro_rules! write_option {
+                       ($thing: expr) => {
+                               match &$thing {
+                                       &None => 0u8.write(writer)?,
+                                       &Some(ref v) => {
+                                               1u8.write(writer)?;
+                                               v.write(writer)?;
+                                       },
+                               }
+                       }
+               }
+
+               (self.pending_outbound_htlcs.len() as u64).write(writer)?;
+               for htlc in self.pending_outbound_htlcs.iter() {
+                       htlc.htlc_id.write(writer)?;
+                       htlc.amount_msat.write(writer)?;
+                       htlc.cltv_expiry.write(writer)?;
+                       htlc.payment_hash.write(writer)?;
+                       htlc.source.write(writer)?;
+                       write_option!(htlc.fail_reason);
+                       match &htlc.state {
+                               &OutboundHTLCState::LocalAnnounced(ref onion_packet) => {
+                                       0u8.write(writer)?;
+                                       onion_packet.write(writer)?;
+                               },
+                               &OutboundHTLCState::Committed => {
+                                       1u8.write(writer)?;
+                               },
+                               &OutboundHTLCState::RemoteRemoved => {
+                                       2u8.write(writer)?;
+                               },
+                               &OutboundHTLCState::AwaitingRemoteRevokeToRemove => {
+                                       3u8.write(writer)?;
+                               },
+                               &OutboundHTLCState::AwaitingRemovedRemoteRevoke => {
+                                       4u8.write(writer)?;
+                               },
+                       }
+               }
+
+               (self.holding_cell_htlc_updates.len() as u64).write(writer)?;
+               for update in self.holding_cell_htlc_updates.iter() {
+                       match update {
+                               &HTLCUpdateAwaitingACK::AddHTLC { ref amount_msat, ref cltv_expiry, ref payment_hash, ref source, ref onion_routing_packet, time_created: _ } => {
+                                       0u8.write(writer)?;
+                                       amount_msat.write(writer)?;
+                                       cltv_expiry.write(writer)?;
+                                       payment_hash.write(writer)?;
+                                       source.write(writer)?;
+                                       onion_routing_packet.write(writer)?;
+                                       // time_created is not serialized - we re-init the timeout upon deserialization
+                               },
+                               &HTLCUpdateAwaitingACK::ClaimHTLC { ref payment_preimage, ref htlc_id } => {
+                                       1u8.write(writer)?;
+                                       payment_preimage.write(writer)?;
+                                       htlc_id.write(writer)?;
+                               },
+                               &HTLCUpdateAwaitingACK::FailHTLC { ref htlc_id, ref err_packet } => {
+                                       2u8.write(writer)?;
+                                       htlc_id.write(writer)?;
+                                       err_packet.write(writer)?;
+                               }
+                       }
+               }
+
+               self.monitor_pending_revoke_and_ack.write(writer)?;
+               self.monitor_pending_commitment_signed.write(writer)?;
+               match self.monitor_pending_order {
+                       None => 0u8.write(writer)?,
+                       Some(RAACommitmentOrder::CommitmentFirst) => 1u8.write(writer)?,
+                       Some(RAACommitmentOrder::RevokeAndACKFirst) => 2u8.write(writer)?,
+               }
+
+               (self.monitor_pending_forwards.len() as u64).write(writer)?;
+               for &(ref pending_forward, ref htlc_id) in self.monitor_pending_forwards.iter() {
+                       pending_forward.write(writer)?;
+                       htlc_id.write(writer)?;
+               }
+
+               (self.monitor_pending_failures.len() as u64).write(writer)?;
+               for &(ref htlc_source, ref payment_hash, ref fail_reason) in self.monitor_pending_failures.iter() {
+                       htlc_source.write(writer)?;
+                       payment_hash.write(writer)?;
+                       fail_reason.write(writer)?;
+               }
+
+               write_option!(self.pending_update_fee);
+               write_option!(self.holding_cell_update_fee);
+
+               self.next_local_htlc_id.write(writer)?;
+               (self.next_remote_htlc_id - dropped_inbound_htlcs).write(writer)?;
+               self.channel_update_count.write(writer)?;
+               self.feerate_per_kw.write(writer)?;
+
+               (self.last_local_commitment_txn.len() as u64).write(writer)?;
+               for tx in self.last_local_commitment_txn.iter() {
+                       if let Err(e) = tx.consensus_encode(&mut RawEncoder::new(WriterWriteAdaptor(writer))) {
+                               match e {
+                                       network::serialize::Error::Io(e) => return Err(e),
+                                       _ => panic!("last_local_commitment_txn must have been well-formed!"),
+                               }
+                       }
+               }
+
+               match self.last_sent_closing_fee {
+                       Some((feerate, fee)) => {
+                               1u8.write(writer)?;
+                               feerate.write(writer)?;
+                               fee.write(writer)?;
+                       },
+                       None => 0u8.write(writer)?,
+               }
+
+               write_option!(self.funding_tx_confirmed_in);
+               write_option!(self.short_channel_id);
+
+               self.last_block_connected.write(writer)?;
+               self.funding_tx_confirmations.write(writer)?;
+
+               self.their_dust_limit_satoshis.write(writer)?;
+               self.our_dust_limit_satoshis.write(writer)?;
+               self.their_max_htlc_value_in_flight_msat.write(writer)?;
+               self.their_channel_reserve_satoshis.write(writer)?;
+               self.their_htlc_minimum_msat.write(writer)?;
+               self.our_htlc_minimum_msat.write(writer)?;
+               self.their_to_self_delay.write(writer)?;
+               self.their_max_accepted_htlcs.write(writer)?;
+
+               write_option!(self.their_funding_pubkey);
+               write_option!(self.their_revocation_basepoint);
+               write_option!(self.their_payment_basepoint);
+               write_option!(self.their_delayed_payment_basepoint);
+               write_option!(self.their_htlc_basepoint);
+               write_option!(self.their_cur_commitment_point);
+
+               write_option!(self.their_prev_commitment_point);
+               self.their_node_id.write(writer)?;
+
+               write_option!(self.their_shutdown_scriptpubkey);
+
+               self.channel_monitor.write_for_disk(writer)?;
+               Ok(())
+       }
+}
+
+impl<R : ::std::io::Read> ReadableArgs<R, Arc<Logger>> for Channel {
+       fn read(reader: &mut R, logger: Arc<Logger>) -> Result<Self, DecodeError> {
+               let _ver: u8 = Readable::read(reader)?;
+               let min_ver: u8 = Readable::read(reader)?;
+               if min_ver > SERIALIZATION_VERSION {
+                       return Err(DecodeError::UnknownVersion);
+               }
+
+               let user_id = Readable::read(reader)?;
+
+               let channel_id = Readable::read(reader)?;
+               let channel_state = Readable::read(reader)?;
+               let channel_outbound = Readable::read(reader)?;
+               let announce_publicly = Readable::read(reader)?;
+               let channel_value_satoshis = Readable::read(reader)?;
+
+               let local_keys = Readable::read(reader)?;
+               let shutdown_pubkey = Readable::read(reader)?;
+
+               let cur_local_commitment_transaction_number = Readable::read(reader)?;
+               let cur_remote_commitment_transaction_number = Readable::read(reader)?;
+               let value_to_self_msat = Readable::read(reader)?;
+
+               let received_commitment_while_awaiting_raa = Readable::read(reader)?;
+
+               let pending_inbound_htlc_count: u64 = Readable::read(reader)?;
+               let mut pending_inbound_htlcs = Vec::with_capacity(cmp::min(pending_inbound_htlc_count as usize, OUR_MAX_HTLCS as usize));
+               for _ in 0..pending_inbound_htlc_count {
+                       pending_inbound_htlcs.push(InboundHTLCOutput {
+                               htlc_id: Readable::read(reader)?,
+                               amount_msat: Readable::read(reader)?,
+                               cltv_expiry: Readable::read(reader)?,
+                               payment_hash: Readable::read(reader)?,
+                               state: match <u8 as Readable<R>>::read(reader)? {
+                                       1 => InboundHTLCState::AwaitingRemoteRevokeToAnnounce(Readable::read(reader)?),
+                                       2 => InboundHTLCState::AwaitingAnnouncedRemoteRevoke(Readable::read(reader)?),
+                                       3 => InboundHTLCState::Committed,
+                                       4 => InboundHTLCState::LocalRemoved(Readable::read(reader)?),
+                                       _ => return Err(DecodeError::InvalidValue),
+                               },
+                       });
+               }
+
+               macro_rules! read_option { () => {
+                       match <u8 as Readable<R>>::read(reader)? {
+                               0 => None,
+                               1 => Some(Readable::read(reader)?),
+                               _ => return Err(DecodeError::InvalidValue),
+                       }
+               } }
+
+               let pending_outbound_htlc_count: u64 = Readable::read(reader)?;
+               let mut pending_outbound_htlcs = Vec::with_capacity(cmp::min(pending_outbound_htlc_count as usize, OUR_MAX_HTLCS as usize));
+               for _ in 0..pending_outbound_htlc_count {
+                       pending_outbound_htlcs.push(OutboundHTLCOutput {
+                               htlc_id: Readable::read(reader)?,
+                               amount_msat: Readable::read(reader)?,
+                               cltv_expiry: Readable::read(reader)?,
+                               payment_hash: Readable::read(reader)?,
+                               source: Readable::read(reader)?,
+                               fail_reason: read_option!(),
+                               state: match <u8 as Readable<R>>::read(reader)? {
+                                       0 => OutboundHTLCState::LocalAnnounced(Box::new(Readable::read(reader)?)),
+                                       1 => OutboundHTLCState::Committed,
+                                       2 => OutboundHTLCState::RemoteRemoved,
+                                       3 => OutboundHTLCState::AwaitingRemoteRevokeToRemove,
+                                       4 => OutboundHTLCState::AwaitingRemovedRemoteRevoke,
+                                       _ => return Err(DecodeError::InvalidValue),
+                               },
+                       });
+               }
+
+               let holding_cell_htlc_update_count: u64 = Readable::read(reader)?;
+               let mut holding_cell_htlc_updates = Vec::with_capacity(cmp::min(holding_cell_htlc_update_count as usize, OUR_MAX_HTLCS as usize*2));
+               for _ in 0..holding_cell_htlc_update_count {
+                       holding_cell_htlc_updates.push(match <u8 as Readable<R>>::read(reader)? {
+                               0 => HTLCUpdateAwaitingACK::AddHTLC {
+                                       amount_msat: Readable::read(reader)?,
+                                       cltv_expiry: Readable::read(reader)?,
+                                       payment_hash: Readable::read(reader)?,
+                                       source: Readable::read(reader)?,
+                                       onion_routing_packet: Readable::read(reader)?,
+                                       time_created: Instant::now(),
+                               },
+                               1 => HTLCUpdateAwaitingACK::ClaimHTLC {
+                                       payment_preimage: Readable::read(reader)?,
+                                       htlc_id: Readable::read(reader)?,
+                               },
+                               2 => HTLCUpdateAwaitingACK::FailHTLC {
+                                       htlc_id: Readable::read(reader)?,
+                                       err_packet: Readable::read(reader)?,
+                               },
+                               _ => return Err(DecodeError::InvalidValue),
+                       });
+               }
+
+               let monitor_pending_revoke_and_ack = Readable::read(reader)?;
+               let monitor_pending_commitment_signed = Readable::read(reader)?;
+
+               let monitor_pending_order = match <u8 as Readable<R>>::read(reader)? {
+                       0 => None,
+                       1 => Some(RAACommitmentOrder::CommitmentFirst),
+                       2 => Some(RAACommitmentOrder::RevokeAndACKFirst),
+                       _ => return Err(DecodeError::InvalidValue),
+               };
+
+               let monitor_pending_forwards_count: u64 = Readable::read(reader)?;
+               let mut monitor_pending_forwards = Vec::with_capacity(cmp::min(monitor_pending_forwards_count as usize, OUR_MAX_HTLCS as usize));
+               for _ in 0..monitor_pending_forwards_count {
+                       monitor_pending_forwards.push((Readable::read(reader)?, Readable::read(reader)?));
+               }
+
+               let monitor_pending_failures_count: u64 = Readable::read(reader)?;
+               let mut monitor_pending_failures = Vec::with_capacity(cmp::min(monitor_pending_failures_count as usize, OUR_MAX_HTLCS as usize));
+               for _ in 0..monitor_pending_failures_count {
+                       monitor_pending_failures.push((Readable::read(reader)?, Readable::read(reader)?, Readable::read(reader)?));
+               }
+
+               let pending_update_fee = read_option!();
+               let holding_cell_update_fee = read_option!();
+
+               let next_local_htlc_id = Readable::read(reader)?;
+               let next_remote_htlc_id = Readable::read(reader)?;
+               let channel_update_count = Readable::read(reader)?;
+               let feerate_per_kw = Readable::read(reader)?;
+
+               let last_local_commitment_txn_count: u64 = Readable::read(reader)?;
+               let mut last_local_commitment_txn = Vec::with_capacity(cmp::min(last_local_commitment_txn_count as usize, OUR_MAX_HTLCS as usize*2 + 1));
+               for _ in 0..last_local_commitment_txn_count {
+                       last_local_commitment_txn.push(match Transaction::consensus_decode(&mut RawDecoder::new(reader.by_ref())) {
+                               Ok(tx) => tx,
+                               Err(_) => return Err(DecodeError::InvalidValue),
+                       });
+               }
+
+               let last_sent_closing_fee = match <u8 as Readable<R>>::read(reader)? {
+                       0 => None,
+                       1 => Some((Readable::read(reader)?, Readable::read(reader)?)),
+                       _ => return Err(DecodeError::InvalidValue),
+               };
+
+               let funding_tx_confirmed_in = read_option!();
+               let short_channel_id = read_option!();
+
+               let last_block_connected = Readable::read(reader)?;
+               let funding_tx_confirmations = Readable::read(reader)?;
+
+               let their_dust_limit_satoshis = Readable::read(reader)?;
+               let our_dust_limit_satoshis = Readable::read(reader)?;
+               let their_max_htlc_value_in_flight_msat = Readable::read(reader)?;
+               let their_channel_reserve_satoshis = Readable::read(reader)?;
+               let their_htlc_minimum_msat = Readable::read(reader)?;
+               let our_htlc_minimum_msat = Readable::read(reader)?;
+               let their_to_self_delay = Readable::read(reader)?;
+               let their_max_accepted_htlcs = Readable::read(reader)?;
+
+               let their_funding_pubkey = read_option!();
+               let their_revocation_basepoint = read_option!();
+               let their_payment_basepoint = read_option!();
+               let their_delayed_payment_basepoint = read_option!();
+               let their_htlc_basepoint = read_option!();
+               let their_cur_commitment_point = read_option!();
+
+               let their_prev_commitment_point = read_option!();
+               let their_node_id = Readable::read(reader)?;
+
+               let their_shutdown_scriptpubkey = read_option!();
+               let (monitor_last_block, channel_monitor) = ReadableArgs::read(reader, logger.clone())?;
+               // We drop the ChannelMonitor's last block connected hash cause we don't actually bother
+               // doing full block connection operations on the internal CHannelMonitor copies
+               if monitor_last_block != last_block_connected {
+                       return Err(DecodeError::InvalidValue);
+               }
+
+               Ok(Channel {
+                       user_id,
+
+                       channel_id,
+                       channel_state,
+                       channel_outbound,
+                       secp_ctx: Secp256k1::new(),
+                       announce_publicly,
+                       channel_value_satoshis,
+
+                       local_keys,
+                       shutdown_pubkey,
+
+                       cur_local_commitment_transaction_number,
+                       cur_remote_commitment_transaction_number,
+                       value_to_self_msat,
+
+                       received_commitment_while_awaiting_raa,
+                       pending_inbound_htlcs,
+                       pending_outbound_htlcs,
+                       holding_cell_htlc_updates,
+
+                       monitor_pending_revoke_and_ack,
+                       monitor_pending_commitment_signed,
+                       monitor_pending_order,
+                       monitor_pending_forwards,
+                       monitor_pending_failures,
+
+                       pending_update_fee,
+                       holding_cell_update_fee,
+                       next_local_htlc_id,
+                       next_remote_htlc_id,
+                       channel_update_count,
+                       feerate_per_kw,
+
+                       #[cfg(debug_assertions)]
+                       max_commitment_tx_output_local: ::std::sync::Mutex::new((0, 0)),
+                       #[cfg(debug_assertions)]
+                       max_commitment_tx_output_remote: ::std::sync::Mutex::new((0, 0)),
+
+                       last_local_commitment_txn,
+
+                       last_sent_closing_fee,
+
+                       funding_tx_confirmed_in,
+                       short_channel_id,
+                       last_block_connected,
+                       funding_tx_confirmations,
+
+                       their_dust_limit_satoshis,
+                       our_dust_limit_satoshis,
+                       their_max_htlc_value_in_flight_msat,
+                       their_channel_reserve_satoshis,
+                       their_htlc_minimum_msat,
+                       our_htlc_minimum_msat,
+                       their_to_self_delay,
+                       their_max_accepted_htlcs,
+
+                       their_funding_pubkey,
+                       their_revocation_basepoint,
+                       their_payment_basepoint,
+                       their_delayed_payment_basepoint,
+                       their_htlc_basepoint,
+                       their_cur_commitment_point,
+
+                       their_prev_commitment_point,
+                       their_node_id,
+
+                       their_shutdown_scriptpubkey,
+
+                       channel_monitor,
+
+                       logger,
+               })
+       }
+}
+
 #[cfg(test)]
 mod tests {
        use bitcoin::util::hash::{Sha256dHash, Hash160};
index ae9669c2d81c60f636bb4781d070b8951ab6b8ec..6230225ba8cccc705599a0c301da67ddb5b140cc 100644 (file)
@@ -23,14 +23,14 @@ use secp256k1;
 use chain::chaininterface::{BroadcasterInterface,ChainListener,ChainWatchInterface,FeeEstimator};
 use chain::transaction::OutPoint;
 use ln::channel::{Channel, ChannelError};
-use ln::channelmonitor::{ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS};
+use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS};
 use ln::router::{Route,RouteHop};
 use ln::msgs;
-use ln::msgs::{ChannelMessageHandler, HandleError};
+use ln::msgs::{ChannelMessageHandler, DecodeError, HandleError};
 use chain::keysinterface::KeysInterface;
 use util::{byte_utils, events, internal_traits, rng};
 use util::sha2::Sha256;
-use util::ser::{Readable, Writeable};
+use util::ser::{Readable, ReadableArgs, Writeable, Writer};
 use util::chacha20poly1305rfc::ChaCha20;
 use util::logger::Logger;
 use util::errors::APIError;
@@ -41,11 +41,10 @@ use crypto::hmac::Hmac;
 use crypto::digest::Digest;
 use crypto::symmetriccipher::SynchronousStreamCipher;
 
-use std::{ptr, mem};
-use std::collections::HashMap;
-use std::collections::hash_map;
+use std::{cmp, ptr, mem};
+use std::collections::{HashMap, hash_map, HashSet};
 use std::io::Cursor;
-use std::sync::{Mutex,MutexGuard,Arc};
+use std::sync::{Arc, Mutex, MutexGuard, RwLock};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::time::{Instant,Duration};
 
@@ -65,13 +64,12 @@ mod channel_held_info {
        use ln::msgs;
        use ln::router::Route;
        use secp256k1::key::SecretKey;
-       use secp256k1::ecdh::SharedSecret;
 
        /// Stores the info we will need to send when we want to forward an HTLC onwards
        #[derive(Clone)] // See Channel::revoke_and_ack for why, tl;dr: Rust bug
        pub struct PendingForwardHTLCInfo {
                pub(super) onion_packet: Option<msgs::OnionPacket>,
-               pub(super) incoming_shared_secret: SharedSecret,
+               pub(super) incoming_shared_secret: [u8; 32],
                pub(super) payment_hash: [u8; 32],
                pub(super) short_channel_id: u64,
                pub(super) amt_to_forward: u64,
@@ -96,7 +94,7 @@ mod channel_held_info {
        pub struct HTLCPreviousHopData {
                pub(super) short_channel_id: u64,
                pub(super) htlc_id: u64,
-               pub(super) incoming_packet_shared_secret: SharedSecret,
+               pub(super) incoming_packet_shared_secret: [u8; 32],
        }
 
        /// Tracks the inbound corresponding to an outbound HTLC
@@ -302,6 +300,25 @@ const ERR: () = "You need at least 32 bit pointers (well, usize, but we'll assum
 ///
 /// Implements ChannelMessageHandler, handling the multi-channel parts and passing things through
 /// to individual Channels.
+///
+/// Implements Writeable to write out all channel state to disk. Implies peer_disconnected() for
+/// all peers during write/read (though does not modify this instance, only the instance being
+/// serialized). This will result in any channels which have not yet exchanged funding_created (ie
+/// called funding_transaction_generated for outbound channels).
+///
+/// Note that you can be a bit lazier about writing out ChannelManager than you can be with
+/// ChannelMonitors. With ChannelMonitors you MUST write each monitor update out to disk before
+/// returning from ManyChannelMonitor::add_update_monitor, with ChannelManagers, writing updates
+/// happens out-of-band (and will prevent any other ChannelManager operations from occurring during
+/// the serialization process). If the deserialized version is out-of-date compared to the
+/// ChannelMonitors passed by reference to read(), those channels will be force-closed based on the
+/// ChannelMonitor state and no funds will be lost (mod on-chain transaction fees).
+///
+/// Note that the deserializer is only implemented for (Sha256dHash, ChannelManager), which
+/// tells you the last block hash which was block_connect()ed. You MUST rescan any blocks along
+/// the "reorg path" (ie call block_disconnected() until you get to a common block and then call
+/// block_connected() to step towards your best block) upon deserialization before using the
+/// object!
 pub struct ChannelManager {
        genesis_hash: Sha256dHash,
        fee_estimator: Arc<FeeEstimator>,
@@ -312,12 +329,17 @@ pub struct ChannelManager {
        announce_channels_publicly: bool,
        fee_proportional_millionths: u32,
        latest_block_height: AtomicUsize,
+       last_block_hash: Mutex<Sha256dHash>,
        secp_ctx: Secp256k1<secp256k1::All>,
 
        channel_state: Mutex<ChannelHolder>,
        our_network_key: SecretKey,
 
        pending_events: Mutex<Vec<events::Event>>,
+       /// Used when we have to take a BIG lock to make sure everything is self-consistent.
+       /// Essentially just when we're serializing ourselves out.
+       /// Taken first everywhere where we are making changes before any other locks.
+       total_consistency_lock: RwLock<()>,
 
        keys_manager: Arc<KeysInterface>,
 
@@ -405,7 +427,8 @@ impl ChannelManager {
 
                        announce_channels_publicly,
                        fee_proportional_millionths,
-                       latest_block_height: AtomicUsize::new(0), //TODO: Get an init value (generally need to replay recent chain on chain_monitor registration)
+                       latest_block_height: AtomicUsize::new(0), //TODO: Get an init value
+                       last_block_hash: Mutex::new(Default::default()),
                        secp_ctx,
 
                        channel_state: Mutex::new(ChannelHolder{
@@ -419,6 +442,7 @@ impl ChannelManager {
                        our_network_key: keys_manager.get_node_secret(),
 
                        pending_events: Mutex::new(Vec::new()),
+                       total_consistency_lock: RwLock::new(()),
 
                        keys_manager,
 
@@ -443,6 +467,8 @@ impl ChannelManager {
        pub fn create_channel(&self, their_network_key: PublicKey, channel_value_satoshis: u64, push_msat: u64, user_id: u64) -> Result<(), APIError> {
                let channel = Channel::new_outbound(&*self.fee_estimator, &self.keys_manager, their_network_key, channel_value_satoshis, push_msat, self.announce_channels_publicly, user_id, Arc::clone(&self.logger))?;
                let res = channel.get_open_channel(self.genesis_hash.clone(), &*self.fee_estimator);
+
+               let _ = self.total_consistency_lock.read().unwrap();
                let mut channel_state = self.channel_state.lock().unwrap();
                match channel_state.by_id.entry(channel.channel_id()) {
                        hash_map::Entry::Occupied(_) => {
@@ -506,6 +532,8 @@ impl ChannelManager {
        ///
        /// May generate a SendShutdown message event on success, which should be relayed.
        pub fn close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> {
+               let _ = self.total_consistency_lock.read().unwrap();
+
                let (mut failed_htlcs, chan_option) = {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = channel_state_lock.borrow_parts();
@@ -568,6 +596,8 @@ impl ChannelManager {
        /// Force closes a channel, immediately broadcasting the latest local commitment transaction to
        /// the chain and rejecting new HTLCs on the given channel.
        pub fn force_close_channel(&self, channel_id: &[u8; 32]) {
+               let _ = self.total_consistency_lock.read().unwrap();
+
                let mut chan = {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = channel_state_lock.borrow_parts();
@@ -625,7 +655,8 @@ impl ChannelManager {
        }
 
        #[inline]
-       fn gen_rho_mu_from_shared_secret(shared_secret: &SharedSecret) -> ([u8; 32], [u8; 32]) {
+       fn gen_rho_mu_from_shared_secret(shared_secret: &[u8]) -> ([u8; 32], [u8; 32]) {
+               assert_eq!(shared_secret.len(), 32);
                ({
                        let mut hmac = Hmac::new(Sha256::new(), &[0x72, 0x68, 0x6f]); // rho
                        hmac.input(&shared_secret[..]);
@@ -643,7 +674,8 @@ impl ChannelManager {
        }
 
        #[inline]
-       fn gen_um_from_shared_secret(shared_secret: &SharedSecret) -> [u8; 32] {
+       fn gen_um_from_shared_secret(shared_secret: &[u8]) -> [u8; 32] {
+               assert_eq!(shared_secret.len(), 32);
                let mut hmac = Hmac::new(Sha256::new(), &[0x75, 0x6d]); // um
                hmac.input(&shared_secret[..]);
                let mut res = [0; 32];
@@ -652,7 +684,8 @@ impl ChannelManager {
        }
 
        #[inline]
-       fn gen_ammag_from_shared_secret(shared_secret: &SharedSecret) -> [u8; 32] {
+       fn gen_ammag_from_shared_secret(shared_secret: &[u8]) -> [u8; 32] {
+               assert_eq!(shared_secret.len(), 32);
                let mut hmac = Hmac::new(Sha256::new(), &[0x61, 0x6d, 0x6d, 0x61, 0x67]); // ammag
                hmac.input(&shared_secret[..]);
                let mut res = [0; 32];
@@ -691,7 +724,7 @@ impl ChannelManager {
                let mut res = Vec::with_capacity(route.hops.len());
 
                Self::construct_onion_keys_callback(secp_ctx, route, session_priv, |shared_secret, _blinding_factor, ephemeral_pubkey, _| {
-                       let (rho, mu) = ChannelManager::gen_rho_mu_from_shared_secret(&shared_secret);
+                       let (rho, mu) = ChannelManager::gen_rho_mu_from_shared_secret(&shared_secret[..]);
 
                        res.push(OnionKeys {
                                #[cfg(test)]
@@ -815,7 +848,7 @@ impl ChannelManager {
 
        /// Encrypts a failure packet. raw_packet can either be a
        /// msgs::DecodedOnionErrorPacket.encode() result or a msgs::OnionErrorPacket.data element.
-       fn encrypt_failure_packet(shared_secret: &SharedSecret, raw_packet: &[u8]) -> msgs::OnionErrorPacket {
+       fn encrypt_failure_packet(shared_secret: &[u8], raw_packet: &[u8]) -> msgs::OnionErrorPacket {
                let ammag = ChannelManager::gen_ammag_from_shared_secret(&shared_secret);
 
                let mut packet_crypted = Vec::with_capacity(raw_packet.len());
@@ -827,7 +860,8 @@ impl ChannelManager {
                }
        }
 
-       fn build_failure_packet(shared_secret: &SharedSecret, failure_type: u16, failure_data: &[u8]) -> msgs::DecodedOnionErrorPacket {
+       fn build_failure_packet(shared_secret: &[u8], failure_type: u16, failure_data: &[u8]) -> msgs::DecodedOnionErrorPacket {
+               assert_eq!(shared_secret.len(), 32);
                assert!(failure_data.len() <= 256 - 2);
 
                let um = ChannelManager::gen_um_from_shared_secret(&shared_secret);
@@ -858,7 +892,7 @@ impl ChannelManager {
        }
 
        #[inline]
-       fn build_first_hop_failure_packet(shared_secret: &SharedSecret, failure_type: u16, failure_data: &[u8]) -> msgs::OnionErrorPacket {
+       fn build_first_hop_failure_packet(shared_secret: &[u8], failure_type: u16, failure_data: &[u8]) -> msgs::OnionErrorPacket {
                let failure_packet = ChannelManager::build_failure_packet(shared_secret, failure_type, failure_data);
                ChannelManager::encrypt_failure_packet(shared_secret, &failure_packet.encode()[..])
        }
@@ -886,7 +920,11 @@ impl ChannelManager {
                        })), self.channel_state.lock().unwrap());
                }
 
-               let shared_secret = SharedSecret::new(&self.secp_ctx, &msg.onion_routing_packet.public_key.unwrap(), &self.our_network_key);
+               let shared_secret = {
+                       let mut arr = [0; 32];
+                       arr.copy_from_slice(&SharedSecret::new(&self.secp_ctx, &msg.onion_routing_packet.public_key.unwrap(), &self.our_network_key)[..]);
+                       arr
+               };
                let (rho, mu) = ChannelManager::gen_rho_mu_from_shared_secret(&shared_secret);
 
                let mut channel_state = None;
@@ -963,7 +1001,7 @@ impl ChannelManager {
                                        onion_packet: None,
                                        payment_hash: msg.payment_hash.clone(),
                                        short_channel_id: 0,
-                                       incoming_shared_secret: shared_secret.clone(),
+                                       incoming_shared_secret: shared_secret,
                                        amt_to_forward: next_hop_data.data.amt_to_forward,
                                        outgoing_cltv_value: next_hop_data.data.outgoing_cltv_value,
                                })
@@ -977,7 +1015,7 @@ impl ChannelManager {
                                let blinding_factor = {
                                        let mut sha = Sha256::new();
                                        sha.input(&new_pubkey.serialize()[..]);
-                                       sha.input(&shared_secret[..]);
+                                       sha.input(&shared_secret);
                                        let mut res = [0u8; 32];
                                        sha.result(&mut res);
                                        match SecretKey::from_slice(&self.secp_ctx, &res) {
@@ -1003,7 +1041,7 @@ impl ChannelManager {
                                        onion_packet: Some(outgoing_packet),
                                        payment_hash: msg.payment_hash.clone(),
                                        short_channel_id: next_hop_data.data.short_channel_id,
-                                       incoming_shared_secret: shared_secret.clone(),
+                                       incoming_shared_secret: shared_secret,
                                        amt_to_forward: next_hop_data.data.amt_to_forward,
                                        outgoing_cltv_value: next_hop_data.data.outgoing_cltv_value,
                                })
@@ -1140,6 +1178,7 @@ impl ChannelManager {
                let (onion_payloads, htlc_msat, htlc_cltv) = ChannelManager::build_onion_payloads(&route, cur_height)?;
                let onion_packet = ChannelManager::construct_onion_packet(onion_payloads, onion_keys, &payment_hash);
 
+               let _ = self.total_consistency_lock.read().unwrap();
                let mut channel_state = self.channel_state.lock().unwrap();
 
                let id = match channel_state.short_to_id.get(&route.hops.first().unwrap().short_channel_id) {
@@ -1196,6 +1235,8 @@ impl ChannelManager {
        /// May panic if the funding_txo is duplicative with some other channel (note that this should
        /// be trivially prevented by using unique funding transaction keys per-channel).
        pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_txo: OutPoint) {
+               let _ = self.total_consistency_lock.read().unwrap();
+
                let (chan, msg, chan_monitor) = {
                        let mut channel_state = self.channel_state.lock().unwrap();
                        match channel_state.by_id.remove(temporary_channel_id) {
@@ -1261,6 +1302,8 @@ impl ChannelManager {
        /// Should only really ever be called in response to an PendingHTLCsForwardable event.
        /// Will likely generate further events.
        pub fn process_pending_htlc_forwards(&self) {
+               let _ = self.total_consistency_lock.read().unwrap();
+
                let mut new_events = Vec::new();
                let mut failed_forwards = Vec::new();
                {
@@ -1382,6 +1425,8 @@ impl ChannelManager {
 
        /// Indicates that the preimage for payment_hash is unknown or the received amount is incorrect after a PaymentReceived event.
        pub fn fail_htlc_backwards(&self, payment_hash: &[u8; 32], reason: PaymentFailReason) -> bool {
+               let _ = self.total_consistency_lock.read().unwrap();
+
                let mut channel_state = Some(self.channel_state.lock().unwrap());
                let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(payment_hash);
                if let Some(mut sources) = removed_source {
@@ -1477,6 +1522,8 @@ impl ChannelManager {
                let mut payment_hash = [0; 32];
                sha.result(&mut payment_hash);
 
+               let _ = self.total_consistency_lock.read().unwrap();
+
                let mut channel_state = Some(self.channel_state.lock().unwrap());
                let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(&payment_hash);
                if let Some(mut sources) = removed_source {
@@ -1555,6 +1602,7 @@ impl ChannelManager {
                let mut close_results = Vec::new();
                let mut htlc_forwards = Vec::new();
                let mut htlc_failures = Vec::new();
+               let _ = self.total_consistency_lock.read().unwrap();
 
                {
                        let mut channel_lock = self.channel_state.lock().unwrap();
@@ -1938,7 +1986,7 @@ impl ChannelManager {
                                let amt_to_forward = htlc_msat - route_hop.fee_msat;
                                htlc_msat = amt_to_forward;
 
-                               let ammag = ChannelManager::gen_ammag_from_shared_secret(&shared_secret);
+                               let ammag = ChannelManager::gen_ammag_from_shared_secret(&shared_secret[..]);
 
                                let mut decryption_tmp = Vec::with_capacity(packet_decrypted.len());
                                decryption_tmp.resize(packet_decrypted.len(), 0);
@@ -1949,7 +1997,7 @@ impl ChannelManager {
                                let is_from_final_node = route.hops.last().unwrap().pubkey == route_hop.pubkey;
 
                                if let Ok(err_packet) = msgs::DecodedOnionErrorPacket::read(&mut Cursor::new(&packet_decrypted)) {
-                                       let um = ChannelManager::gen_um_from_shared_secret(&shared_secret);
+                                       let um = ChannelManager::gen_um_from_shared_secret(&shared_secret[..]);
                                        let mut hmac = Hmac::new(Sha256::new(), &um);
                                        hmac.input(&err_packet.encode()[32..]);
                                        let mut calc_tag = [0u8; 32];
@@ -2359,6 +2407,7 @@ impl ChannelManager {
        /// Note: This API is likely to change!
        #[doc(hidden)]
        pub fn update_fee(&self, channel_id: [u8;32], feerate_per_kw: u64) -> Result<(), APIError> {
+               let _ = self.total_consistency_lock.read().unwrap();
                let mut channel_state_lock = self.channel_state.lock().unwrap();
                let channel_state = channel_state_lock.borrow_parts();
 
@@ -2416,6 +2465,7 @@ impl events::EventsProvider for ChannelManager {
 
 impl ChainListener for ChannelManager {
        fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) {
+               let _ = self.total_consistency_lock.read().unwrap();
                let mut failed_channels = Vec::new();
                {
                        let mut channel_lock = self.channel_state.lock().unwrap();
@@ -2489,10 +2539,12 @@ impl ChainListener for ChannelManager {
                        self.finish_force_close_channel(failure);
                }
                self.latest_block_height.store(height as usize, Ordering::Release);
+               *self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header.bitcoin_hash();
        }
 
        /// We force-close the channel without letting our counterparty participate in the shutdown
        fn block_disconnected(&self, header: &BlockHeader) {
+               let _ = self.total_consistency_lock.read().unwrap();
                let mut failed_channels = Vec::new();
                {
                        let mut channel_lock = self.channel_state.lock().unwrap();
@@ -2520,6 +2572,7 @@ impl ChainListener for ChannelManager {
                        self.finish_force_close_channel(failure);
                }
                self.latest_block_height.fetch_sub(1, Ordering::AcqRel);
+               *self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header.bitcoin_hash();
        }
 }
 
@@ -2558,70 +2611,87 @@ macro_rules! handle_error {
 impl ChannelMessageHandler for ChannelManager {
        //TODO: Handle errors and close channel (or so)
        fn handle_open_channel(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<(), HandleError> {
+               let _ = self.total_consistency_lock.read().unwrap();
                handle_error!(self, self.internal_open_channel(their_node_id, msg), their_node_id)
        }
 
        fn handle_accept_channel(&self, their_node_id: &PublicKey, msg: &msgs::AcceptChannel) -> Result<(), HandleError> {
+               let _ = self.total_consistency_lock.read().unwrap();
                handle_error!(self, self.internal_accept_channel(their_node_id, msg), their_node_id)
        }
 
        fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), HandleError> {
+               let _ = self.total_consistency_lock.read().unwrap();
                handle_error!(self, self.internal_funding_created(their_node_id, msg), their_node_id)
        }
 
        fn handle_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), HandleError> {
+               let _ = self.total_consistency_lock.read().unwrap();
                handle_error!(self, self.internal_funding_signed(their_node_id, msg), their_node_id)
        }
 
        fn handle_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) -> Result<(), HandleError> {
+               let _ = self.total_consistency_lock.read().unwrap();
                handle_error!(self, self.internal_funding_locked(their_node_id, msg), their_node_id)
        }
 
        fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(), HandleError> {
+               let _ = self.total_consistency_lock.read().unwrap();
                handle_error!(self, self.internal_shutdown(their_node_id, msg), their_node_id)
        }
 
        fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), HandleError> {
+               let _ = self.total_consistency_lock.read().unwrap();
                handle_error!(self, self.internal_closing_signed(their_node_id, msg), their_node_id)
        }
 
        fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) -> Result<(), msgs::HandleError> {
+               let _ = self.total_consistency_lock.read().unwrap();
                handle_error!(self, self.internal_update_add_htlc(their_node_id, msg), their_node_id)
        }
 
        fn handle_update_fulfill_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) -> Result<(), HandleError> {
+               let _ = self.total_consistency_lock.read().unwrap();
                handle_error!(self, self.internal_update_fulfill_htlc(their_node_id, msg), their_node_id)
        }
 
        fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<(), HandleError> {
+               let _ = self.total_consistency_lock.read().unwrap();
                handle_error!(self, self.internal_update_fail_htlc(their_node_id, msg), their_node_id)
        }
 
        fn handle_update_fail_malformed_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) -> Result<(), HandleError> {
+               let _ = self.total_consistency_lock.read().unwrap();
                handle_error!(self, self.internal_update_fail_malformed_htlc(their_node_id, msg), their_node_id)
        }
 
        fn handle_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) -> Result<(), HandleError> {
+               let _ = self.total_consistency_lock.read().unwrap();
                handle_error!(self, self.internal_commitment_signed(their_node_id, msg), their_node_id)
        }
 
        fn handle_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), HandleError> {
+               let _ = self.total_consistency_lock.read().unwrap();
                handle_error!(self, self.internal_revoke_and_ack(their_node_id, msg), their_node_id)
        }
 
        fn handle_update_fee(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFee) -> Result<(), HandleError> {
+               let _ = self.total_consistency_lock.read().unwrap();
                handle_error!(self, self.internal_update_fee(their_node_id, msg), their_node_id)
        }
 
        fn handle_announcement_signatures(&self, their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) -> Result<(), HandleError> {
+               let _ = self.total_consistency_lock.read().unwrap();
                handle_error!(self, self.internal_announcement_signatures(their_node_id, msg), their_node_id)
        }
 
        fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), HandleError> {
+               let _ = self.total_consistency_lock.read().unwrap();
                handle_error!(self, self.internal_channel_reestablish(their_node_id, msg), their_node_id)
        }
 
        fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) {
+               let _ = self.total_consistency_lock.read().unwrap();
                let mut failed_channels = Vec::new();
                let mut failed_payments = Vec::new();
                {
@@ -2677,6 +2747,7 @@ impl ChannelMessageHandler for ChannelManager {
        }
 
        fn peer_connected(&self, their_node_id: &PublicKey) {
+               let _ = self.total_consistency_lock.read().unwrap();
                let mut channel_state_lock = self.channel_state.lock().unwrap();
                let channel_state = channel_state_lock.borrow_parts();
                let pending_msg_events = channel_state.pending_msg_events;
@@ -2701,6 +2772,8 @@ impl ChannelMessageHandler for ChannelManager {
        }
 
        fn handle_error(&self, their_node_id: &PublicKey, msg: &msgs::ErrorMessage) {
+               let _ = self.total_consistency_lock.read().unwrap();
+
                if msg.channel_id == [0; 32] {
                        for chan in self.list_channels() {
                                if chan.remote_network_id == *their_node_id {
@@ -2713,6 +2786,391 @@ impl ChannelMessageHandler for ChannelManager {
        }
 }
 
+const SERIALIZATION_VERSION: u8 = 1;
+const MIN_SERIALIZATION_VERSION: u8 = 1;
+
+impl Writeable for PendingForwardHTLCInfo {
+       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
+               if let &Some(ref onion) = &self.onion_packet {
+                       1u8.write(writer)?;
+                       onion.write(writer)?;
+               } else {
+                       0u8.write(writer)?;
+               }
+               self.incoming_shared_secret.write(writer)?;
+               self.payment_hash.write(writer)?;
+               self.short_channel_id.write(writer)?;
+               self.amt_to_forward.write(writer)?;
+               self.outgoing_cltv_value.write(writer)?;
+               Ok(())
+       }
+}
+
+impl<R: ::std::io::Read> Readable<R> for PendingForwardHTLCInfo {
+       fn read(reader: &mut R) -> Result<PendingForwardHTLCInfo, DecodeError> {
+               let onion_packet = match <u8 as Readable<R>>::read(reader)? {
+                       0 => None,
+                       1 => Some(msgs::OnionPacket::read(reader)?),
+                       _ => return Err(DecodeError::InvalidValue),
+               };
+               Ok(PendingForwardHTLCInfo {
+                       onion_packet,
+                       incoming_shared_secret: Readable::read(reader)?,
+                       payment_hash: Readable::read(reader)?,
+                       short_channel_id: Readable::read(reader)?,
+                       amt_to_forward: Readable::read(reader)?,
+                       outgoing_cltv_value: Readable::read(reader)?,
+               })
+       }
+}
+
+impl Writeable for HTLCFailureMsg {
+       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
+               match self {
+                       &HTLCFailureMsg::Relay(ref fail_msg) => {
+                               0u8.write(writer)?;
+                               fail_msg.write(writer)?;
+                       },
+                       &HTLCFailureMsg::Malformed(ref fail_msg) => {
+                               1u8.write(writer)?;
+                               fail_msg.write(writer)?;
+                       }
+               }
+               Ok(())
+       }
+}
+
+impl<R: ::std::io::Read> Readable<R> for HTLCFailureMsg {
+       fn read(reader: &mut R) -> Result<HTLCFailureMsg, DecodeError> {
+               match <u8 as Readable<R>>::read(reader)? {
+                       0 => Ok(HTLCFailureMsg::Relay(Readable::read(reader)?)),
+                       1 => Ok(HTLCFailureMsg::Malformed(Readable::read(reader)?)),
+                       _ => Err(DecodeError::InvalidValue),
+               }
+       }
+}
+
+impl Writeable for PendingHTLCStatus {
+       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
+               match self {
+                       &PendingHTLCStatus::Forward(ref forward_info) => {
+                               0u8.write(writer)?;
+                               forward_info.write(writer)?;
+                       },
+                       &PendingHTLCStatus::Fail(ref fail_msg) => {
+                               1u8.write(writer)?;
+                               fail_msg.write(writer)?;
+                       }
+               }
+               Ok(())
+       }
+}
+
+impl<R: ::std::io::Read> Readable<R> for PendingHTLCStatus {
+       fn read(reader: &mut R) -> Result<PendingHTLCStatus, DecodeError> {
+               match <u8 as Readable<R>>::read(reader)? {
+                       0 => Ok(PendingHTLCStatus::Forward(Readable::read(reader)?)),
+                       1 => Ok(PendingHTLCStatus::Fail(Readable::read(reader)?)),
+                       _ => Err(DecodeError::InvalidValue),
+               }
+       }
+}
+
+impl_writeable!(HTLCPreviousHopData, 0, {
+       short_channel_id,
+       htlc_id,
+       incoming_packet_shared_secret
+});
+
+impl Writeable for HTLCSource {
+       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
+               match self {
+                       &HTLCSource::PreviousHopData(ref hop_data) => {
+                               0u8.write(writer)?;
+                               hop_data.write(writer)?;
+                       },
+                       &HTLCSource::OutboundRoute { ref route, ref session_priv, ref first_hop_htlc_msat } => {
+                               1u8.write(writer)?;
+                               route.write(writer)?;
+                               session_priv.write(writer)?;
+                               first_hop_htlc_msat.write(writer)?;
+                       }
+               }
+               Ok(())
+       }
+}
+
+impl<R: ::std::io::Read> Readable<R> for HTLCSource {
+       fn read(reader: &mut R) -> Result<HTLCSource, DecodeError> {
+               match <u8 as Readable<R>>::read(reader)? {
+                       0 => Ok(HTLCSource::PreviousHopData(Readable::read(reader)?)),
+                       1 => Ok(HTLCSource::OutboundRoute {
+                               route: Readable::read(reader)?,
+                               session_priv: Readable::read(reader)?,
+                               first_hop_htlc_msat: Readable::read(reader)?,
+                       }),
+                       _ => Err(DecodeError::InvalidValue),
+               }
+       }
+}
+
+impl Writeable for HTLCFailReason {
+       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
+               match self {
+                       &HTLCFailReason::ErrorPacket { ref err } => {
+                               0u8.write(writer)?;
+                               err.write(writer)?;
+                       },
+                       &HTLCFailReason::Reason { ref failure_code, ref data } => {
+                               1u8.write(writer)?;
+                               failure_code.write(writer)?;
+                               data.write(writer)?;
+                       }
+               }
+               Ok(())
+       }
+}
+
+impl<R: ::std::io::Read> Readable<R> for HTLCFailReason {
+       fn read(reader: &mut R) -> Result<HTLCFailReason, DecodeError> {
+               match <u8 as Readable<R>>::read(reader)? {
+                       0 => Ok(HTLCFailReason::ErrorPacket { err: Readable::read(reader)? }),
+                       1 => Ok(HTLCFailReason::Reason {
+                               failure_code: Readable::read(reader)?,
+                               data: Readable::read(reader)?,
+                       }),
+                       _ => Err(DecodeError::InvalidValue),
+               }
+       }
+}
+
+impl_writeable!(HTLCForwardInfo, 0, {
+       prev_short_channel_id,
+       prev_htlc_id,
+       forward_info
+});
+
+impl Writeable for ChannelManager {
+       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
+               let _ = self.total_consistency_lock.write().unwrap();
+
+               writer.write_all(&[SERIALIZATION_VERSION; 1])?;
+               writer.write_all(&[MIN_SERIALIZATION_VERSION; 1])?;
+
+               self.genesis_hash.write(writer)?;
+               self.announce_channels_publicly.write(writer)?;
+               self.fee_proportional_millionths.write(writer)?;
+               (self.latest_block_height.load(Ordering::Acquire) as u32).write(writer)?;
+               self.last_block_hash.lock().unwrap().write(writer)?;
+
+               let channel_state = self.channel_state.lock().unwrap();
+               let mut unfunded_channels = 0;
+               for (_, channel) in channel_state.by_id.iter() {
+                       if !channel.is_funding_initiated() {
+                               unfunded_channels += 1;
+                       }
+               }
+               ((channel_state.by_id.len() - unfunded_channels) as u64).write(writer)?;
+               for (_, channel) in channel_state.by_id.iter() {
+                       if channel.is_funding_initiated() {
+                               channel.write(writer)?;
+                       }
+               }
+
+               (channel_state.forward_htlcs.len() as u64).write(writer)?;
+               for (short_channel_id, pending_forwards) in channel_state.forward_htlcs.iter() {
+                       short_channel_id.write(writer)?;
+                       (pending_forwards.len() as u64).write(writer)?;
+                       for forward in pending_forwards {
+                               forward.write(writer)?;
+                       }
+               }
+
+               (channel_state.claimable_htlcs.len() as u64).write(writer)?;
+               for (payment_hash, previous_hops) in channel_state.claimable_htlcs.iter() {
+                       payment_hash.write(writer)?;
+                       (previous_hops.len() as u64).write(writer)?;
+                       for previous_hop in previous_hops {
+                               previous_hop.write(writer)?;
+                       }
+               }
+
+               Ok(())
+       }
+}
+
+/// Arguments for the creation of a ChannelManager that are not deserialized.
+///
+/// At a high-level, the process for deserializing a ChannelManager and resuming normal operation
+/// is:
+/// 1) Deserialize all stored ChannelMonitors.
+/// 2) Deserialize the ChannelManager by filling in this struct and calling <(Sha256dHash,
+///    ChannelManager)>::read(reader, args).
+///    This may result in closing some Channels if the ChannelMonitor is newer than the stored
+///    ChannelManager state to ensure no loss of funds. Thus, transactions may be broadcasted.
+/// 3) Register all relevant ChannelMonitor outpoints with your chain watch mechanism using
+///    ChannelMonitor::get_monitored_outpoints and ChannelMonitor::get_funding_txo().
+/// 4) Reconnect blocks on your ChannelMonitors.
+/// 5) Move the ChannelMonitors into your local ManyChannelMonitor.
+/// 6) Disconnect/connect blocks on the ChannelManager.
+/// 7) Register the new ChannelManager with your ChainWatchInterface (this does not happen
+///    automatically as it does in ChannelManager::new()).
+pub struct ChannelManagerReadArgs<'a> {
+       /// The keys provider which will give us relevant keys. Some keys will be loaded during
+       /// deserialization.
+       pub keys_manager: Arc<KeysInterface>,
+
+       /// The fee_estimator for use in the ChannelManager in the future.
+       ///
+       /// No calls to the FeeEstimator will be made during deserialization.
+       pub fee_estimator: Arc<FeeEstimator>,
+       /// The ManyChannelMonitor for use in the ChannelManager in the future.
+       ///
+       /// No calls to the ManyChannelMonitor will be made during deserialization. It is assumed that
+       /// you have deserialized ChannelMonitors separately and will add them to your
+       /// ManyChannelMonitor after deserializing this ChannelManager.
+       pub monitor: Arc<ManyChannelMonitor>,
+       /// The ChainWatchInterface for use in the ChannelManager in the future.
+       ///
+       /// No calls to the ChainWatchInterface will be made during deserialization.
+       pub chain_monitor: Arc<ChainWatchInterface>,
+       /// The BroadcasterInterface which will be used in the ChannelManager in the future and may be
+       /// used to broadcast the latest local commitment transactions of channels which must be
+       /// force-closed during deserialization.
+       pub tx_broadcaster: Arc<BroadcasterInterface>,
+       /// The Logger for use in the ChannelManager and which may be used to log information during
+       /// deserialization.
+       pub logger: Arc<Logger>,
+
+
+       /// A map from channel funding outpoints to ChannelMonitors for those channels (ie
+       /// value.get_funding_txo() should be the key).
+       ///
+       /// If a monitor is inconsistent with the channel state during deserialization the channel will
+       /// be force-closed using the data in the channelmonitor and the Channel will be dropped. This
+       /// is true for missing channels as well. If there is a monitor missing for which we find
+       /// channel data Err(DecodeError::InvalidValue) will be returned.
+       ///
+       /// In such cases the latest local transactions will be sent to the tx_broadcaster included in
+       /// this struct.
+       pub channel_monitors: &'a HashMap<OutPoint, &'a ChannelMonitor>,
+}
+
+impl<'a, R : ::std::io::Read> ReadableArgs<R, ChannelManagerReadArgs<'a>> for (Sha256dHash, ChannelManager) {
+       fn read(reader: &mut R, args: ChannelManagerReadArgs<'a>) -> Result<Self, DecodeError> {
+               let _ver: u8 = Readable::read(reader)?;
+               let min_ver: u8 = Readable::read(reader)?;
+               if min_ver > SERIALIZATION_VERSION {
+                       return Err(DecodeError::UnknownVersion);
+               }
+
+               let genesis_hash: Sha256dHash = Readable::read(reader)?;
+               let announce_channels_publicly: bool = Readable::read(reader)?;
+               let fee_proportional_millionths: u32 = Readable::read(reader)?;
+               let latest_block_height: u32 = Readable::read(reader)?;
+               let last_block_hash: Sha256dHash = Readable::read(reader)?;
+
+               let mut closed_channels = Vec::new();
+
+               let channel_count: u64 = Readable::read(reader)?;
+               let mut funding_txo_set = HashSet::with_capacity(cmp::min(channel_count as usize, 128));
+               let mut by_id = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
+               let mut short_to_id = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
+               for _ in 0..channel_count {
+                       let mut channel: Channel = ReadableArgs::read(reader, args.logger.clone())?;
+                       if channel.last_block_connected != last_block_hash {
+                               return Err(DecodeError::InvalidValue);
+                       }
+
+                       let funding_txo = channel.channel_monitor().get_funding_txo().ok_or(DecodeError::InvalidValue)?;
+                       funding_txo_set.insert(funding_txo.clone());
+                       if let Some(monitor) = args.channel_monitors.get(&funding_txo) {
+                               if channel.get_cur_local_commitment_transaction_number() != monitor.get_cur_local_commitment_number() ||
+                                               channel.get_revoked_remote_commitment_transaction_number() != monitor.get_min_seen_secret() ||
+                                               channel.get_cur_remote_commitment_transaction_number() != monitor.get_cur_remote_commitment_number() {
+                                       let mut force_close_res = channel.force_shutdown();
+                                       force_close_res.0 = monitor.get_latest_local_commitment_txn();
+                                       closed_channels.push(force_close_res);
+                               } else {
+                                       if let Some(short_channel_id) = channel.get_short_channel_id() {
+                                               short_to_id.insert(short_channel_id, channel.channel_id());
+                                       }
+                                       by_id.insert(channel.channel_id(), channel);
+                               }
+                       } else {
+                               return Err(DecodeError::InvalidValue);
+                       }
+               }
+
+               for (ref funding_txo, ref monitor) in args.channel_monitors.iter() {
+                       if !funding_txo_set.contains(funding_txo) {
+                               closed_channels.push((monitor.get_latest_local_commitment_txn(), Vec::new()));
+                       }
+               }
+
+               let forward_htlcs_count: u64 = Readable::read(reader)?;
+               let mut forward_htlcs = HashMap::with_capacity(cmp::min(forward_htlcs_count as usize, 128));
+               for _ in 0..forward_htlcs_count {
+                       let short_channel_id = Readable::read(reader)?;
+                       let pending_forwards_count: u64 = Readable::read(reader)?;
+                       let mut pending_forwards = Vec::with_capacity(cmp::min(pending_forwards_count as usize, 128));
+                       for _ in 0..pending_forwards_count {
+                               pending_forwards.push(Readable::read(reader)?);
+                       }
+                       forward_htlcs.insert(short_channel_id, pending_forwards);
+               }
+
+               let claimable_htlcs_count: u64 = Readable::read(reader)?;
+               let mut claimable_htlcs = HashMap::with_capacity(cmp::min(claimable_htlcs_count as usize, 128));
+               for _ in 0..claimable_htlcs_count {
+                       let payment_hash = Readable::read(reader)?;
+                       let previous_hops_len: u64 = Readable::read(reader)?;
+                       let mut previous_hops = Vec::with_capacity(cmp::min(previous_hops_len as usize, 2));
+                       for _ in 0..previous_hops_len {
+                               previous_hops.push(Readable::read(reader)?);
+                       }
+                       claimable_htlcs.insert(payment_hash, previous_hops);
+               }
+
+               let channel_manager = ChannelManager {
+                       genesis_hash,
+                       fee_estimator: args.fee_estimator,
+                       monitor: args.monitor,
+                       chain_monitor: args.chain_monitor,
+                       tx_broadcaster: args.tx_broadcaster,
+
+                       announce_channels_publicly,
+                       fee_proportional_millionths,
+                       latest_block_height: AtomicUsize::new(latest_block_height as usize),
+                       last_block_hash: Mutex::new(last_block_hash),
+                       secp_ctx: Secp256k1::new(),
+
+                       channel_state: Mutex::new(ChannelHolder {
+                               by_id,
+                               short_to_id,
+                               next_forward: Instant::now(),
+                               forward_htlcs,
+                               claimable_htlcs,
+                               pending_msg_events: Vec::new(),
+                       }),
+                       our_network_key: args.keys_manager.get_node_secret(),
+
+                       pending_events: Mutex::new(Vec::new()),
+                       total_consistency_lock: RwLock::new(()),
+                       keys_manager: args.keys_manager,
+                       logger: args.logger,
+               };
+
+               for close_res in closed_channels.drain(..) {
+                       channel_manager.finish_force_close_channel(close_res);
+                       //TODO: Broadcast channel update for closed channels, but only after we've made a
+                       //connection or two.
+               }
+
+               Ok((last_block_hash.clone(), channel_manager))
+       }
+}
+
 #[cfg(test)]
 mod tests {
        use chain::chaininterface;
@@ -2720,8 +3178,8 @@ mod tests {
        use chain::chaininterface::ChainListener;
        use chain::keysinterface::KeysInterface;
        use chain::keysinterface;
-       use ln::channelmanager::{ChannelManager,OnionKeys,PaymentFailReason,RAACommitmentOrder};
-       use ln::channelmonitor::{ChannelMonitorUpdateErr, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS};
+       use ln::channelmanager::{ChannelManager,ChannelManagerReadArgs,OnionKeys,PaymentFailReason,RAACommitmentOrder};
+       use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS, ManyChannelMonitor};
        use ln::router::{Route, RouteHop, Router};
        use ln::msgs;
        use ln::msgs::{ChannelMessageHandler,RoutingMessageHandler};
@@ -2729,7 +3187,7 @@ mod tests {
        use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
        use util::errors::APIError;
        use util::logger::Logger;
-       use util::ser::Writeable;
+       use util::ser::{Writeable, Writer, ReadableArgs};
 
        use bitcoin::util::hash::Sha256dHash;
        use bitcoin::blockdata::block::{Block, BlockHeader};
@@ -2889,22 +3347,22 @@ mod tests {
                // Returning Errors test vectors from BOLT 4
 
                let onion_keys = build_test_onion_keys();
-               let onion_error = ChannelManager::build_failure_packet(&onion_keys[4].shared_secret, 0x2002, &[0; 0]);
+               let onion_error = ChannelManager::build_failure_packet(&onion_keys[4].shared_secret[..], 0x2002, &[0; 0]);
                assert_eq!(onion_error.encode(), hex::decode("4c2fc8bc08510334b6833ad9c3e79cd1b52ae59dfe5c2a4b23ead50f09f7ee0b0002200200fe0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000").unwrap());
 
-               let onion_packet_1 = ChannelManager::encrypt_failure_packet(&onion_keys[4].shared_secret, &onion_error.encode()[..]);
+               let onion_packet_1 = ChannelManager::encrypt_failure_packet(&onion_keys[4].shared_secret[..], &onion_error.encode()[..]);
                assert_eq!(onion_packet_1.data, hex::decode("a5e6bd0c74cb347f10cce367f949098f2457d14c046fd8a22cb96efb30b0fdcda8cb9168b50f2fd45edd73c1b0c8b33002df376801ff58aaa94000bf8a86f92620f343baef38a580102395ae3abf9128d1047a0736ff9b83d456740ebbb4aeb3aa9737f18fb4afb4aa074fb26c4d702f42968888550a3bded8c05247e045b866baef0499f079fdaeef6538f31d44deafffdfd3afa2fb4ca9082b8f1c465371a9894dd8c243fb4847e004f5256b3e90e2edde4c9fb3082ddfe4d1e734cacd96ef0706bf63c9984e22dc98851bcccd1c3494351feb458c9c6af41c0044bea3c47552b1d992ae542b17a2d0bba1a096c78d169034ecb55b6e3a7263c26017f033031228833c1daefc0dedb8cf7c3e37c9c37ebfe42f3225c326e8bcfd338804c145b16e34e4").unwrap());
 
-               let onion_packet_2 = ChannelManager::encrypt_failure_packet(&onion_keys[3].shared_secret, &onion_packet_1.data[..]);
+               let onion_packet_2 = ChannelManager::encrypt_failure_packet(&onion_keys[3].shared_secret[..], &onion_packet_1.data[..]);
                assert_eq!(onion_packet_2.data, hex::decode("c49a1ce81680f78f5f2000cda36268de34a3f0a0662f55b4e837c83a8773c22aa081bab1616a0011585323930fa5b9fae0c85770a2279ff59ec427ad1bbff9001c0cd1497004bd2a0f68b50704cf6d6a4bf3c8b6a0833399a24b3456961ba00736785112594f65b6b2d44d9f5ea4e49b5e1ec2af978cbe31c67114440ac51a62081df0ed46d4a3df295da0b0fe25c0115019f03f15ec86fabb4c852f83449e812f141a9395b3f70b766ebbd4ec2fae2b6955bd8f32684c15abfe8fd3a6261e52650e8807a92158d9f1463261a925e4bfba44bd20b166d532f0017185c3a6ac7957adefe45559e3072c8dc35abeba835a8cb01a71a15c736911126f27d46a36168ca5ef7dccd4e2886212602b181463e0dd30185c96348f9743a02aca8ec27c0b90dca270").unwrap());
 
-               let onion_packet_3 = ChannelManager::encrypt_failure_packet(&onion_keys[2].shared_secret, &onion_packet_2.data[..]);
+               let onion_packet_3 = ChannelManager::encrypt_failure_packet(&onion_keys[2].shared_secret[..], &onion_packet_2.data[..]);
                assert_eq!(onion_packet_3.data, hex::decode("a5d3e8634cfe78b2307d87c6d90be6fe7855b4f2cc9b1dfb19e92e4b79103f61ff9ac25f412ddfb7466e74f81b3e545563cdd8f5524dae873de61d7bdfccd496af2584930d2b566b4f8d3881f8c043df92224f38cf094cfc09d92655989531524593ec6d6caec1863bdfaa79229b5020acc034cd6deeea1021c50586947b9b8e6faa83b81fbfa6133c0af5d6b07c017f7158fa94f0d206baf12dda6b68f785b773b360fd0497e16cc402d779c8d48d0fa6315536ef0660f3f4e1865f5b38ea49c7da4fd959de4e83ff3ab686f059a45c65ba2af4a6a79166aa0f496bf04d06987b6d2ea205bdb0d347718b9aeff5b61dfff344993a275b79717cd815b6ad4c0beb568c4ac9c36ff1c315ec1119a1993c4b61e6eaa0375e0aaf738ac691abd3263bf937e3").unwrap());
 
-               let onion_packet_4 = ChannelManager::encrypt_failure_packet(&onion_keys[1].shared_secret, &onion_packet_3.data[..]);
+               let onion_packet_4 = ChannelManager::encrypt_failure_packet(&onion_keys[1].shared_secret[..], &onion_packet_3.data[..]);
                assert_eq!(onion_packet_4.data, hex::decode("aac3200c4968f56b21f53e5e374e3a2383ad2b1b6501bbcc45abc31e59b26881b7dfadbb56ec8dae8857add94e6702fb4c3a4de22e2e669e1ed926b04447fc73034bb730f4932acd62727b75348a648a1128744657ca6a4e713b9b646c3ca66cac02cdab44dd3439890ef3aaf61708714f7375349b8da541b2548d452d84de7084bb95b3ac2345201d624d31f4d52078aa0fa05a88b4e20202bd2b86ac5b52919ea305a8949de95e935eed0319cf3cf19ebea61d76ba92532497fcdc9411d06bcd4275094d0a4a3c5d3a945e43305a5a9256e333e1f64dbca5fcd4e03a39b9012d197506e06f29339dfee3331995b21615337ae060233d39befea925cc262873e0530408e6990f1cbd233a150ef7b004ff6166c70c68d9f8c853c1abca640b8660db2921").unwrap());
 
-               let onion_packet_5 = ChannelManager::encrypt_failure_packet(&onion_keys[0].shared_secret, &onion_packet_4.data[..]);
+               let onion_packet_5 = ChannelManager::encrypt_failure_packet(&onion_keys[0].shared_secret[..], &onion_packet_4.data[..]);
                assert_eq!(onion_packet_5.data, hex::decode("9c5add3963fc7f6ed7f148623c84134b5647e1306419dbe2174e523fa9e2fbed3a06a19f899145610741c83ad40b7712aefaddec8c6baf7325d92ea4ca4d1df8bce517f7e54554608bf2bd8071a4f52a7a2f7ffbb1413edad81eeea5785aa9d990f2865dc23b4bc3c301a94eec4eabebca66be5cf638f693ec256aec514620cc28ee4a94bd9565bc4d4962b9d3641d4278fb319ed2b84de5b665f307a2db0f7fbb757366067d88c50f7e829138fde4f78d39b5b5802f1b92a8a820865af5cc79f9f30bc3f461c66af95d13e5e1f0381c184572a91dee1c849048a647a1158cf884064deddbf1b0b88dfe2f791428d0ba0f6fb2f04e14081f69165ae66d9297c118f0907705c9c4954a199bae0bb96fad763d690e7daa6cfda59ba7f2c8d11448b604d12d").unwrap());
        }
 
@@ -2924,6 +3382,7 @@ mod tests {
                chan_monitor: Arc<test_utils::TestChannelMonitor>,
                node: Arc<ChannelManager>,
                router: Router,
+               node_seed: [u8; 32],
                network_payment_count: Rc<RefCell<u8>>,
                network_chan_count: Rc<RefCell<u32>>,
        }
@@ -3595,7 +4054,7 @@ mod tests {
                        let chan_monitor = Arc::new(test_utils::TestChannelMonitor::new(chain_monitor.clone(), tx_broadcaster.clone()));
                        let node = ChannelManager::new(0, true, Network::Testnet, feeest.clone(), chan_monitor.clone(), chain_monitor.clone(), tx_broadcaster.clone(), Arc::clone(&logger), keys_manager.clone()).unwrap();
                        let router = Router::new(PublicKey::from_secret_key(&secp_ctx, &keys_manager.get_node_secret()), chain_monitor.clone(), Arc::clone(&logger));
-                       nodes.push(Node { chain_monitor, tx_broadcaster, chan_monitor, node, router,
+                       nodes.push(Node { chain_monitor, tx_broadcaster, chan_monitor, node, router, node_seed: seed,
                                network_payment_count: payment_count.clone(),
                                network_chan_count: chan_count.clone(),
                        });
@@ -6398,4 +6857,137 @@ mod tests {
                sign_msg!(unsigned_msg);
                assert!(nodes[0].router.handle_channel_announcement(&chan_announcement).is_err());
        }
+
+       struct VecWriter(Vec<u8>);
+       impl Writer for VecWriter {
+               fn write_all(&mut self, buf: &[u8]) -> Result<(), ::std::io::Error> {
+                       self.0.extend_from_slice(buf);
+                       Ok(())
+               }
+               fn size_hint(&mut self, size: usize) {
+                       self.0.reserve_exact(size);
+               }
+       }
+
+       #[test]
+       fn test_simple_manager_serialize_deserialize() {
+               let mut nodes = create_network(2);
+               create_announced_chan_between_nodes(&nodes, 0, 1);
+
+               let (our_payment_preimage, _) = route_payment(&nodes[0], &[&nodes[1]], 1000000);
+               let (_, our_payment_hash) = route_payment(&nodes[0], &[&nodes[1]], 1000000);
+
+               nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
+
+               let nodes_0_serialized = nodes[0].node.encode();
+               let mut chan_0_monitor_serialized = VecWriter(Vec::new());
+               nodes[0].chan_monitor.simple_monitor.monitors.lock().unwrap().iter().next().unwrap().1.write_for_disk(&mut chan_0_monitor_serialized).unwrap();
+
+               nodes[0].chan_monitor = Arc::new(test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone()));
+               let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..];
+               let (_, chan_0_monitor) = <(Sha256dHash, ChannelMonitor)>::read(&mut chan_0_monitor_read, Arc::new(test_utils::TestLogger::new())).unwrap();
+               assert!(chan_0_monitor_read.is_empty());
+
+               let mut nodes_0_read = &nodes_0_serialized[..];
+               let keys_manager = Arc::new(keysinterface::KeysManager::new(&nodes[0].node_seed, Network::Testnet, Arc::new(test_utils::TestLogger::new())));
+               let (_, nodes_0_deserialized) = {
+                       let mut channel_monitors = HashMap::new();
+                       channel_monitors.insert(chan_0_monitor.get_funding_txo().unwrap(), &chan_0_monitor);
+                       <(Sha256dHash, ChannelManager)>::read(&mut nodes_0_read, ChannelManagerReadArgs {
+                               keys_manager,
+                               fee_estimator: Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 }),
+                               monitor: nodes[0].chan_monitor.clone(),
+                               chain_monitor: nodes[0].chain_monitor.clone(),
+                               tx_broadcaster: nodes[0].tx_broadcaster.clone(),
+                               logger: Arc::new(test_utils::TestLogger::new()),
+                               channel_monitors: &channel_monitors,
+                       }).unwrap()
+               };
+               assert!(nodes_0_read.is_empty());
+
+               assert!(nodes[0].chan_monitor.add_update_monitor(chan_0_monitor.get_funding_txo().unwrap(), chan_0_monitor).is_ok());
+               nodes[0].node = Arc::new(nodes_0_deserialized);
+               check_added_monitors!(nodes[0], 1);
+
+               reconnect_nodes(&nodes[0], &nodes[1], false, (0, 0), (0, 0), (0, 0), (0, 0), (false, false));
+
+               fail_payment(&nodes[0], &[&nodes[1]], our_payment_hash);
+               claim_payment(&nodes[0], &[&nodes[1]], our_payment_preimage);
+       }
+
+       #[test]
+       fn test_manager_serialize_deserialize_inconsistent_monitor() {
+               // Test deserializing a ChannelManager with a out-of-date ChannelMonitor
+               let mut nodes = create_network(4);
+               create_announced_chan_between_nodes(&nodes, 0, 1);
+               create_announced_chan_between_nodes(&nodes, 2, 0);
+               let (_, _, channel_id, funding_tx) = create_announced_chan_between_nodes(&nodes, 0, 3);
+
+               let (our_payment_preimage, _) = route_payment(&nodes[2], &[&nodes[0], &nodes[1]], 1000000);
+
+               // Serialize the ChannelManager here, but the monitor we keep up-to-date
+               let nodes_0_serialized = nodes[0].node.encode();
+
+               route_payment(&nodes[0], &[&nodes[3]], 1000000);
+               nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
+               nodes[2].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
+               nodes[3].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
+
+               // Now the ChannelMonitor (which is now out-of-sync with ChannelManager for channel w/
+               // nodes[3])
+               let mut node_0_monitors_serialized = Vec::new();
+               for monitor in nodes[0].chan_monitor.simple_monitor.monitors.lock().unwrap().iter() {
+                       let mut writer = VecWriter(Vec::new());
+                       monitor.1.write_for_disk(&mut writer).unwrap();
+                       node_0_monitors_serialized.push(writer.0);
+               }
+
+               nodes[0].chan_monitor = Arc::new(test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone()));
+               let mut node_0_monitors = Vec::new();
+               for serialized in node_0_monitors_serialized.iter() {
+                       let mut read = &serialized[..];
+                       let (_, monitor) = <(Sha256dHash, ChannelMonitor)>::read(&mut read, Arc::new(test_utils::TestLogger::new())).unwrap();
+                       assert!(read.is_empty());
+                       node_0_monitors.push(monitor);
+               }
+
+               let mut nodes_0_read = &nodes_0_serialized[..];
+               let keys_manager = Arc::new(keysinterface::KeysManager::new(&nodes[0].node_seed, Network::Testnet, Arc::new(test_utils::TestLogger::new())));
+               let (_, nodes_0_deserialized) = <(Sha256dHash, ChannelManager)>::read(&mut nodes_0_read, ChannelManagerReadArgs {
+                       keys_manager,
+                       fee_estimator: Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 }),
+                       monitor: nodes[0].chan_monitor.clone(),
+                       chain_monitor: nodes[0].chain_monitor.clone(),
+                       tx_broadcaster: nodes[0].tx_broadcaster.clone(),
+                       logger: Arc::new(test_utils::TestLogger::new()),
+                       channel_monitors: &node_0_monitors.iter().map(|monitor| { (monitor.get_funding_txo().unwrap(), monitor) }).collect(),
+               }).unwrap();
+               assert!(nodes_0_read.is_empty());
+
+               { // Channel close should result in a commitment tx and an HTLC tx
+                       let txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
+                       assert_eq!(txn.len(), 2);
+                       assert_eq!(txn[0].input[0].previous_output.txid, funding_tx.txid());
+                       assert_eq!(txn[1].input[0].previous_output.txid, txn[0].txid());
+               }
+
+               for monitor in node_0_monitors.drain(..) {
+                       assert!(nodes[0].chan_monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor).is_ok());
+                       check_added_monitors!(nodes[0], 1);
+               }
+               nodes[0].node = Arc::new(nodes_0_deserialized);
+
+               // nodes[1] and nodes[2] have no lost state with nodes[0]...
+               reconnect_nodes(&nodes[0], &nodes[1], false, (0, 0), (0, 0), (0, 0), (0, 0), (false, false));
+               reconnect_nodes(&nodes[0], &nodes[2], false, (0, 0), (0, 0), (0, 0), (0, 0), (false, false));
+               //... and we can even still claim the payment!
+               claim_payment(&nodes[2], &[&nodes[0], &nodes[1]], our_payment_preimage);
+
+               nodes[3].node.peer_connected(&nodes[0].node.get_our_node_id());
+               let reestablish = get_event_msg!(nodes[3], MessageSendEvent::SendChannelReestablish, nodes[0].node.get_our_node_id());
+               nodes[0].node.peer_connected(&nodes[3].node.get_our_node_id());
+               if let Err(msgs::HandleError { action: Some(msgs::ErrorAction::SendErrorMessage { msg }), .. }) = nodes[0].node.handle_channel_reestablish(&nodes[3].node.get_our_node_id(), &reestablish) {
+                       assert_eq!(msg.channel_id, channel_id);
+               } else { panic!("Unexpected result"); }
+       }
 }
index 5adfe926ce278f8d7c9858f00c13823354340714..4972ae3221df921314e9ab940e24f4c9bb09f965 100644 (file)
@@ -16,6 +16,8 @@ use bitcoin::blockdata::transaction::{TxIn,TxOut,SigHashType,Transaction};
 use bitcoin::blockdata::transaction::OutPoint as BitcoinOutPoint;
 use bitcoin::blockdata::script::Script;
 use bitcoin::network::serialize;
+use bitcoin::network::serialize::BitcoinHash;
+use bitcoin::network::encodable::{ConsensusDecodable, ConsensusEncodable};
 use bitcoin::util::hash::Sha256dHash;
 use bitcoin::util::bip143;
 
@@ -31,7 +33,8 @@ use ln::chan_utils::HTLCOutputInCommitment;
 use chain::chaininterface::{ChainListener, ChainWatchInterface, BroadcasterInterface};
 use chain::transaction::OutPoint;
 use chain::keysinterface::SpendableOutputDescriptor;
-use util::ser::{Readable, Writer};
+use util::logger::Logger;
+use util::ser::{ReadableArgs, Readable, Writer, Writeable, WriterWriteAdaptor, U48};
 use util::sha2::Sha256;
 use util::{byte_utils, events};
 
@@ -112,12 +115,13 @@ pub struct SimpleManyChannelMonitor<Key> {
 }
 
 impl<Key : Send + cmp::Eq + hash::Hash> ChainListener for SimpleManyChannelMonitor<Key> {
-       fn block_connected(&self, _header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) {
+       fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) {
+               let block_hash = header.bitcoin_hash();
                let mut new_events: Vec<events::Event> = Vec::with_capacity(0);
                {
-                       let monitors = self.monitors.lock().unwrap();
-                       for monitor in monitors.values() {
-                               let (txn_outputs, spendable_outputs) = monitor.block_connected(txn_matched, height, &*self.broadcaster);
+                       let mut monitors = self.monitors.lock().unwrap();
+                       for monitor in monitors.values_mut() {
+                               let (txn_outputs, spendable_outputs) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster);
                                if spendable_outputs.len() > 0 {
                                        new_events.push(events::Event::SpendableOutputs {
                                                outputs: spendable_outputs,
@@ -239,6 +243,7 @@ const MIN_SERIALIZATION_VERSION: u8 = 1;
 ///
 /// You MUST ensure that no ChannelMonitors for a given channel anywhere contain out-of-date
 /// information and are actively monitoring the chain.
+#[derive(Clone)]
 pub struct ChannelMonitor {
        funding_txo: Option<(OutPoint, Script)>,
        commitment_transaction_number_obscure_factor: u64,
@@ -259,7 +264,7 @@ pub struct ChannelMonitor {
        /// spending. Thus, in order to claim them via revocation key, we track all the remote
        /// commitment transactions which we find on-chain, mapping them to the commitment number which
        /// can be used to derive the revocation key and claim the transactions.
-       remote_commitment_txn_on_chain: Mutex<HashMap<Sha256dHash, u64>>,
+       remote_commitment_txn_on_chain: HashMap<Sha256dHash, (u64, Vec<Script>)>,
        /// Cache used to make pruning of payment_preimages faster.
        /// Maps payment_hash values to commitment numbers for remote transactions for non-revoked
        /// remote transactions (ie should remain pretty small).
@@ -273,39 +278,22 @@ pub struct ChannelMonitor {
        prev_local_signed_commitment_tx: Option<LocalSignedTx>,
        current_local_signed_commitment_tx: Option<LocalSignedTx>,
 
+       // Used just for ChannelManager to make sure it has the latest channel data during
+       // deserialization
+       current_remote_commitment_number: u64,
+
        payment_preimages: HashMap<[u8; 32], [u8; 32]>,
 
        destination_script: Script,
-       secp_ctx: Secp256k1<secp256k1::All>, //TODO: dedup this a bit...
-}
-impl Clone for ChannelMonitor {
-       fn clone(&self) -> Self {
-               ChannelMonitor {
-                       funding_txo: self.funding_txo.clone(),
-                       commitment_transaction_number_obscure_factor: self.commitment_transaction_number_obscure_factor.clone(),
-
-                       key_storage: self.key_storage.clone(),
-                       their_htlc_base_key: self.their_htlc_base_key.clone(),
-                       their_delayed_payment_base_key: self.their_delayed_payment_base_key.clone(),
-                       their_cur_revocation_points: self.their_cur_revocation_points.clone(),
-
-                       our_to_self_delay: self.our_to_self_delay,
-                       their_to_self_delay: self.their_to_self_delay,
-
-                       old_secrets: self.old_secrets.clone(),
-                       remote_claimable_outpoints: self.remote_claimable_outpoints.clone(),
-                       remote_commitment_txn_on_chain: Mutex::new((*self.remote_commitment_txn_on_chain.lock().unwrap()).clone()),
-                       remote_hash_commitment_number: self.remote_hash_commitment_number.clone(),
 
-                       prev_local_signed_commitment_tx: self.prev_local_signed_commitment_tx.clone(),
-                       current_local_signed_commitment_tx: self.current_local_signed_commitment_tx.clone(),
-
-                       payment_preimages: self.payment_preimages.clone(),
-
-                       destination_script: self.destination_script.clone(),
-                       secp_ctx: self.secp_ctx.clone(),
-               }
-       }
+       // We simply modify last_block_hash in Channel's block_connected so that serialization is
+       // consistent but hopefully the users' copy handles block_connected in a consistent way.
+       // (we do *not*, however, update them in insert_combine to ensure any local user copies keep
+       // their last_block_hash from its state and not based on updated copies that didn't run through
+       // the full block_connected).
+       pub(crate) last_block_hash: Sha256dHash,
+       secp_ctx: Secp256k1<secp256k1::All>, //TODO: dedup this a bit...
+       logger: Arc<Logger>,
 }
 
 #[cfg(any(test, feature = "fuzztarget"))]
@@ -322,8 +310,10 @@ impl PartialEq for ChannelMonitor {
                        self.our_to_self_delay != other.our_to_self_delay ||
                        self.their_to_self_delay != other.their_to_self_delay ||
                        self.remote_claimable_outpoints != other.remote_claimable_outpoints ||
+                       self.remote_commitment_txn_on_chain != other.remote_commitment_txn_on_chain ||
                        self.remote_hash_commitment_number != other.remote_hash_commitment_number ||
                        self.prev_local_signed_commitment_tx != other.prev_local_signed_commitment_tx ||
+                       self.current_remote_commitment_number != other.current_remote_commitment_number ||
                        self.current_local_signed_commitment_tx != other.current_local_signed_commitment_tx ||
                        self.payment_preimages != other.payment_preimages ||
                        self.destination_script != other.destination_script
@@ -335,15 +325,13 @@ impl PartialEq for ChannelMonitor {
                                        return false
                                }
                        }
-                       let us = self.remote_commitment_txn_on_chain.lock().unwrap();
-                       let them = other.remote_commitment_txn_on_chain.lock().unwrap();
-                       *us == *them
+                       true
                }
        }
 }
 
 impl ChannelMonitor {
-       pub(super) fn new(revocation_base_key: &SecretKey, delayed_payment_base_key: &SecretKey, htlc_base_key: &SecretKey, our_to_self_delay: u16, destination_script: Script) -> ChannelMonitor {
+       pub(super) fn new(revocation_base_key: &SecretKey, delayed_payment_base_key: &SecretKey, htlc_base_key: &SecretKey, our_to_self_delay: u16, destination_script: Script, logger: Arc<Logger>) -> ChannelMonitor {
                ChannelMonitor {
                        funding_txo: None,
                        commitment_transaction_number_obscure_factor: 0,
@@ -364,16 +352,19 @@ impl ChannelMonitor {
 
                        old_secrets: [([0; 32], 1 << 48); 49],
                        remote_claimable_outpoints: HashMap::new(),
-                       remote_commitment_txn_on_chain: Mutex::new(HashMap::new()),
+                       remote_commitment_txn_on_chain: HashMap::new(),
                        remote_hash_commitment_number: HashMap::new(),
 
                        prev_local_signed_commitment_tx: None,
                        current_local_signed_commitment_tx: None,
+                       current_remote_commitment_number: 1 << 48,
 
                        payment_preimages: HashMap::new(),
-
                        destination_script: destination_script,
+
+                       last_block_hash: Default::default(),
                        secp_ctx: Secp256k1::new(),
+                       logger,
                }
        }
 
@@ -486,6 +477,7 @@ impl ChannelMonitor {
                        self.remote_hash_commitment_number.insert(htlc.payment_hash, commitment_number);
                }
                self.remote_claimable_outpoints.insert(unsigned_commitment_tx.txid(), htlc_outputs);
+               self.current_remote_commitment_number = commitment_number;
        }
 
        /// Informs this monitor of the latest local (ie broadcastable) commitment transaction. The
@@ -543,6 +535,8 @@ impl ChannelMonitor {
                if our_min_secret > other_min_secret {
                        self.provide_secret(other_min_secret, other.get_secret(other_min_secret).unwrap(), None)?;
                }
+               // TODO: We should use current_remote_commitment_number and the commitment number out of
+               // local transactions to decide how to merge
                if our_min_secret >= other_min_secret {
                        self.their_cur_revocation_points = other.their_cur_revocation_points;
                        for (txid, htlcs) in other.remote_claimable_outpoints.drain() {
@@ -556,6 +550,7 @@ impl ChannelMonitor {
                        }
                        self.payment_preimages = other.payment_preimages;
                }
+               self.current_remote_commitment_number = cmp::min(self.current_remote_commitment_number, other.current_remote_commitment_number);
                Ok(())
        }
 
@@ -597,6 +592,20 @@ impl ChannelMonitor {
                }
        }
 
+       /// Gets the sets of all outpoints which this ChannelMonitor expects to hear about spends of.
+       /// Generally useful when deserializing as during normal operation the return values of
+       /// block_connected are sufficient to ensure all relevant outpoints are being monitored (note
+       /// that the get_funding_txo outpoint and transaction must also be monitored for!).
+       pub fn get_monitored_outpoints(&self) -> Vec<(Sha256dHash, u32, &Script)> {
+               let mut res = Vec::with_capacity(self.remote_commitment_txn_on_chain.len() * 2);
+               for (ref txid, &(_, ref outputs)) in self.remote_commitment_txn_on_chain.iter() {
+                       for (idx, output) in outputs.iter().enumerate() {
+                               res.push(((*txid).clone(), idx as u32, output));
+                       }
+               }
+               res
+       }
+
        /// Serializes into a vec, with various modes for the exposed pub fns
        fn write<W: Writer>(&self, writer: &mut W, for_local_storage: bool) -> Result<(), ::std::io::Error> {
                //TODO: We still write out all the serialization here manually instead of using the fancy
@@ -608,8 +617,7 @@ impl ChannelMonitor {
                        &Some((ref outpoint, ref script)) => {
                                writer.write_all(&outpoint.txid[..])?;
                                writer.write_all(&byte_utils::be16_to_array(outpoint.index))?;
-                               writer.write_all(&byte_utils::be64_to_array(script.len() as u64))?;
-                               writer.write_all(&script[..])?;
+                               script.write(writer)?;
                        },
                        &None => {
                                // We haven't even been initialized...not sure why anyone is serializing us, but
@@ -619,7 +627,7 @@ impl ChannelMonitor {
                }
 
                // Set in initial Channel-object creation, so should always be set by now:
-               writer.write_all(&byte_utils::be48_to_array(self.commitment_transaction_number_obscure_factor))?;
+               U48(self.commitment_transaction_number_obscure_factor).write(writer)?;
 
                match self.key_storage {
                        KeyStorage::PrivMode { ref revocation_base_key, ref htlc_base_key, ref delayed_payment_base_key, ref prev_latest_per_commitment_point, ref latest_per_commitment_point } => {
@@ -684,7 +692,7 @@ impl ChannelMonitor {
                }
 
                writer.write_all(&byte_utils::be64_to_array(self.remote_claimable_outpoints.len() as u64))?;
-               for (txid, htlc_outputs) in self.remote_claimable_outpoints.iter() {
+               for (ref txid, ref htlc_outputs) in self.remote_claimable_outpoints.iter() {
                        writer.write_all(&txid[..])?;
                        writer.write_all(&byte_utils::be64_to_array(htlc_outputs.len() as u64))?;
                        for htlc_output in htlc_outputs.iter() {
@@ -692,19 +700,20 @@ impl ChannelMonitor {
                        }
                }
 
-               {
-                       let remote_commitment_txn_on_chain = self.remote_commitment_txn_on_chain.lock().unwrap();
-                       writer.write_all(&byte_utils::be64_to_array(remote_commitment_txn_on_chain.len() as u64))?;
-                       for (txid, commitment_number) in remote_commitment_txn_on_chain.iter() {
-                               writer.write_all(&txid[..])?;
-                               writer.write_all(&byte_utils::be48_to_array(*commitment_number))?;
+               writer.write_all(&byte_utils::be64_to_array(self.remote_commitment_txn_on_chain.len() as u64))?;
+               for (ref txid, &(commitment_number, ref txouts)) in self.remote_commitment_txn_on_chain.iter() {
+                       writer.write_all(&txid[..])?;
+                       writer.write_all(&byte_utils::be48_to_array(commitment_number))?;
+                       (txouts.len() as u64).write(writer)?;
+                       for script in txouts.iter() {
+                               script.write(writer)?;
                        }
                }
 
                if for_local_storage {
                        writer.write_all(&byte_utils::be64_to_array(self.remote_hash_commitment_number.len() as u64))?;
-                       for (payment_hash, commitment_number) in self.remote_hash_commitment_number.iter() {
-                               writer.write_all(payment_hash)?;
+                       for (ref payment_hash, commitment_number) in self.remote_hash_commitment_number.iter() {
+                               writer.write_all(*payment_hash)?;
                                writer.write_all(&byte_utils::be48_to_array(*commitment_number))?;
                        }
                } else {
@@ -713,9 +722,12 @@ impl ChannelMonitor {
 
                macro_rules! serialize_local_tx {
                        ($local_tx: expr) => {
-                               let tx_ser = serialize::serialize(&$local_tx.tx).unwrap();
-                               writer.write_all(&byte_utils::be64_to_array(tx_ser.len() as u64))?;
-                               writer.write_all(&tx_ser)?;
+                               if let Err(e) = $local_tx.tx.consensus_encode(&mut serialize::RawEncoder::new(WriterWriteAdaptor(writer))) {
+                                       match e {
+                                               serialize::Error::Io(e) => return Err(e),
+                                               _ => panic!("local tx must have been well-formed!"),
+                                       }
+                               }
 
                                writer.write_all(&$local_tx.revocation_key.serialize())?;
                                writer.write_all(&$local_tx.a_htlc_key.serialize())?;
@@ -746,23 +758,41 @@ impl ChannelMonitor {
                        writer.write_all(&[0; 1])?;
                }
 
+               if for_local_storage {
+                       writer.write_all(&byte_utils::be48_to_array(self.current_remote_commitment_number))?;
+               } else {
+                       writer.write_all(&byte_utils::be48_to_array(0))?;
+               }
+
                writer.write_all(&byte_utils::be64_to_array(self.payment_preimages.len() as u64))?;
                for payment_preimage in self.payment_preimages.values() {
                        writer.write_all(payment_preimage)?;
                }
 
-               writer.write_all(&byte_utils::be64_to_array(self.destination_script.len() as u64))?;
-               writer.write_all(&self.destination_script[..])?;
+               self.last_block_hash.write(writer)?;
+               self.destination_script.write(writer)?;
 
                Ok(())
        }
 
        /// Writes this monitor into the given writer, suitable for writing to disk.
+       ///
+       /// Note that the deserializer is only implemented for (Sha256dHash, ChannelMonitor), which
+       /// tells you the last block hash which was block_connect()ed. You MUST rescan any blocks along
+       /// the "reorg path" (ie not just starting at the same height but starting at the highest
+       /// common block that appears on your best chain as well as on the chain which contains the
+       /// last block hash returned) upon deserializing the object!
        pub fn write_for_disk<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
                self.write(writer, true)
        }
 
        /// Encodes this monitor into the given writer, suitable for sending to a remote watchtower
+       ///
+       /// Note that the deserializer is only implemented for (Sha256dHash, ChannelMonitor), which
+       /// tells you the last block hash which was block_connect()ed. You MUST rescan any blocks along
+       /// the "reorg path" (ie not just starting at the same height but starting at the highest
+       /// common block that appears on your best chain as well as on the chain which contains the
+       /// last block hash returned) upon deserializing the object!
        pub fn write_for_watchtower<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
                self.write(writer, false)
        }
@@ -792,11 +822,21 @@ impl ChannelMonitor {
                min
        }
 
+       pub(super) fn get_cur_remote_commitment_number(&self) -> u64 {
+               self.current_remote_commitment_number
+       }
+
+       pub(super) fn get_cur_local_commitment_number(&self) -> u64 {
+               if let &Some(ref local_tx) = &self.current_local_signed_commitment_tx {
+                       0xffff_ffff_ffff - ((((local_tx.tx.input[0].sequence as u64 & 0xffffff) << 3*8) | (local_tx.tx.lock_time as u64 & 0xffffff)) ^ self.commitment_transaction_number_obscure_factor)
+               } else { 0xffff_ffff_ffff }
+       }
+
        /// Attempts to claim a remote commitment transaction's outputs using the revocation key and
        /// data in remote_claimable_outpoints. Will directly claim any HTLC outputs which expire at a
        /// height > height + CLTV_SHARED_CLAIM_BUFFER. In any case, will install monitoring for
        /// HTLC-Success/HTLC-Timeout transactions.
-       fn check_spend_remote_transaction(&self, tx: &Transaction, height: u32) -> (Vec<Transaction>, (Sha256dHash, Vec<TxOut>), Vec<SpendableOutputDescriptor>) {
+       fn check_spend_remote_transaction(&mut self, tx: &Transaction, height: u32) -> (Vec<Transaction>, (Sha256dHash, Vec<TxOut>), Vec<SpendableOutputDescriptor>) {
                // Most secp and related errors trying to create keys means we have no hope of constructing
                // a spend transaction...so we return no transactions to broadcast
                let mut txn_to_broadcast = Vec::new();
@@ -936,7 +976,7 @@ impl ChannelMonitor {
                        if !inputs.is_empty() || !txn_to_broadcast.is_empty() { // ie we're confident this is actually ours
                                // We're definitely a remote commitment transaction!
                                watch_outputs.append(&mut tx.output.clone());
-                               self.remote_commitment_txn_on_chain.lock().unwrap().insert(commitment_txid, commitment_number);
+                               self.remote_commitment_txn_on_chain.insert(commitment_txid, (commitment_number, tx.output.iter().map(|output| { output.script_pubkey.clone() }).collect()));
                        }
                        if inputs.is_empty() { return (txn_to_broadcast, (commitment_txid, watch_outputs), spendable_outputs); } // Nothing to be done...probably a false positive/local tx
 
@@ -973,7 +1013,7 @@ impl ChannelMonitor {
                        // not being generated by the above conditional. Thus, to be safe, we go ahead and
                        // insert it here.
                        watch_outputs.append(&mut tx.output.clone());
-                       self.remote_commitment_txn_on_chain.lock().unwrap().insert(commitment_txid, commitment_number);
+                       self.remote_commitment_txn_on_chain.insert(commitment_txid, (commitment_number, tx.output.iter().map(|output| { output.script_pubkey.clone() }).collect()));
 
                        if let Some(revocation_points) = self.their_cur_revocation_points {
                                let revocation_point_option =
@@ -1276,7 +1316,24 @@ impl ChannelMonitor {
                (Vec::new(), Vec::new())
        }
 
-       fn block_connected(&self, txn_matched: &[&Transaction], height: u32, broadcaster: &BroadcasterInterface)-> (Vec<(Sha256dHash, Vec<TxOut>)>, Vec<SpendableOutputDescriptor>) {
+       /// Used by ChannelManager deserialization to broadcast the latest local state if it's copy of
+       /// the Channel was out-of-date.
+       pub(super) fn get_latest_local_commitment_txn(&self) -> Vec<Transaction> {
+               if let &Some(ref local_tx) = &self.current_local_signed_commitment_tx {
+                       let mut res = vec![local_tx.tx.clone()];
+                       match self.key_storage {
+                               KeyStorage::PrivMode { ref delayed_payment_base_key, ref prev_latest_per_commitment_point, .. } => {
+                                       res.append(&mut self.broadcast_by_local_state(local_tx, prev_latest_per_commitment_point, &Some(*delayed_payment_base_key)).0);
+                               },
+                               _ => panic!("Can only broadcast by local channelmonitor"),
+                       };
+                       res
+               } else {
+                       Vec::new()
+               }
+       }
+
+       fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface)-> (Vec<(Sha256dHash, Vec<TxOut>)>, Vec<SpendableOutputDescriptor>) {
                let mut watch_outputs = Vec::new();
                let mut spendable_outputs = Vec::new();
                for tx in txn_matched {
@@ -1300,9 +1357,8 @@ impl ChannelMonitor {
                                                txn = remote_txn;
                                        }
                                } else {
-                                       let remote_commitment_txn_on_chain = self.remote_commitment_txn_on_chain.lock().unwrap();
-                                       if let Some(commitment_number) = remote_commitment_txn_on_chain.get(&prevout.txid) {
-                                               let (tx, spendable_output) = self.check_spend_remote_htlc(tx, *commitment_number);
+                                       if let Some(&(commitment_number, _)) = self.remote_commitment_txn_on_chain.get(&prevout.txid) {
+                                               let (tx, spendable_output) = self.check_spend_remote_htlc(tx, commitment_number);
                                                if let Some(tx) = tx {
                                                        txn.push(tx);
                                                }
@@ -1337,6 +1393,7 @@ impl ChannelMonitor {
                                }
                        }
                }
+               self.last_block_hash = block_hash.clone();
                (watch_outputs, spendable_outputs)
        }
 
@@ -1373,27 +1430,10 @@ impl ChannelMonitor {
        }
 }
 
-impl<R: ::std::io::Read> Readable<R> for ChannelMonitor {
-       fn read(reader: &mut R) -> Result<Self, DecodeError> {
-               // TODO: read_to_end and then deserializing from that vector is really dumb, we should
-               // actually use the fancy serialization framework we have instead of hacking around it.
-               let mut datavec = Vec::new();
-               reader.read_to_end(&mut datavec)?;
-               let data = &datavec;
-
-               let mut read_pos = 0;
-               macro_rules! read_bytes {
-                       ($byte_count: expr) => {
-                               {
-                                       if ($byte_count as usize) > data.len() - read_pos {
-                                               return Err(DecodeError::ShortRead);
-                                       }
-                                       read_pos += $byte_count as usize;
-                                       &data[read_pos - $byte_count as usize..read_pos]
-                               }
-                       }
-               }
+const MAX_ALLOC_SIZE: usize = 64*1024;
 
+impl<R: ::std::io::Read> ReadableArgs<R, Arc<Logger>> for (Sha256dHash, ChannelMonitor) {
+       fn read(reader: &mut R, logger: Arc<Logger>) -> Result<Self, DecodeError> {
                let secp_ctx = Secp256k1::new();
                macro_rules! unwrap_obj {
                        ($key: expr) => {
@@ -1404,8 +1444,8 @@ impl<R: ::std::io::Read> Readable<R> for ChannelMonitor {
                        }
                }
 
-               let _ver = read_bytes!(1)[0];
-               let min_ver = read_bytes!(1)[0];
+               let _ver: u8 = Readable::read(reader)?;
+               let min_ver: u8 = Readable::read(reader)?;
                if min_ver > SERIALIZATION_VERSION {
                        return Err(DecodeError::UnknownVersion);
                }
@@ -1413,31 +1453,26 @@ impl<R: ::std::io::Read> Readable<R> for ChannelMonitor {
                // Technically this can fail and serialize fail a round-trip, but only for serialization of
                // barely-init'd ChannelMonitors that we can't do anything with.
                let outpoint = OutPoint {
-                       txid: Sha256dHash::from(read_bytes!(32)),
-                       index: byte_utils::slice_to_be16(read_bytes!(2)),
+                       txid: Readable::read(reader)?,
+                       index: Readable::read(reader)?,
                };
-               let script_len = byte_utils::slice_to_be64(read_bytes!(8));
-               let funding_txo = Some((outpoint, Script::from(read_bytes!(script_len).to_vec())));
-               let commitment_transaction_number_obscure_factor = byte_utils::slice_to_be48(read_bytes!(6));
+               let funding_txo = Some((outpoint, Readable::read(reader)?));
+               let commitment_transaction_number_obscure_factor = <U48 as Readable<R>>::read(reader)?.0;
 
-               let key_storage = match read_bytes!(1)[0] {
+               let key_storage = match <u8 as Readable<R>>::read(reader)? {
                        0 => {
-                               let revocation_base_key = unwrap_obj!(SecretKey::from_slice(&secp_ctx, read_bytes!(32)));
-                               let htlc_base_key = unwrap_obj!(SecretKey::from_slice(&secp_ctx, read_bytes!(32)));
-                               let delayed_payment_base_key = unwrap_obj!(SecretKey::from_slice(&secp_ctx, read_bytes!(32)));
-                               let prev_latest_per_commitment_point = match read_bytes!(1)[0] {
-                                               0 => None,
-                                               1 => {
-                                                       Some(unwrap_obj!(PublicKey::from_slice(&secp_ctx, read_bytes!(33))))
-                                               },
-                                               _ => return Err(DecodeError::InvalidValue),
+                               let revocation_base_key = Readable::read(reader)?;
+                               let htlc_base_key = Readable::read(reader)?;
+                               let delayed_payment_base_key = Readable::read(reader)?;
+                               let prev_latest_per_commitment_point = match <u8 as Readable<R>>::read(reader)? {
+                                       0 => None,
+                                       1 => Some(Readable::read(reader)?),
+                                       _ => return Err(DecodeError::InvalidValue),
                                };
-                               let latest_per_commitment_point = match read_bytes!(1)[0] {
-                                               0 => None,
-                                               1 => {
-                                                       Some(unwrap_obj!(PublicKey::from_slice(&secp_ctx, read_bytes!(33))))
-                                               },
-                                               _ => return Err(DecodeError::InvalidValue),
+                               let latest_per_commitment_point = match <u8 as Readable<R>>::read(reader)? {
+                                       0 => None,
+                                       1 => Some(Readable::read(reader)?),
+                                       _ => return Err(DecodeError::InvalidValue),
                                };
                                KeyStorage::PrivMode {
                                        revocation_base_key,
@@ -1450,45 +1485,41 @@ impl<R: ::std::io::Read> Readable<R> for ChannelMonitor {
                        _ => return Err(DecodeError::InvalidValue),
                };
 
-               let their_htlc_base_key = Some(unwrap_obj!(PublicKey::from_slice(&secp_ctx, read_bytes!(33))));
-               let their_delayed_payment_base_key = Some(unwrap_obj!(PublicKey::from_slice(&secp_ctx, read_bytes!(33))));
+               let their_htlc_base_key = Some(Readable::read(reader)?);
+               let their_delayed_payment_base_key = Some(Readable::read(reader)?);
 
                let their_cur_revocation_points = {
-                       let first_idx = byte_utils::slice_to_be48(read_bytes!(6));
+                       let first_idx = <U48 as Readable<R>>::read(reader)?.0;
                        if first_idx == 0 {
                                None
                        } else {
-                               let first_point = unwrap_obj!(PublicKey::from_slice(&secp_ctx, read_bytes!(33)));
-                               let second_point_slice = read_bytes!(33);
+                               let first_point = Readable::read(reader)?;
+                               let second_point_slice: [u8; 33] = Readable::read(reader)?;
                                if second_point_slice[0..32] == [0; 32] && second_point_slice[32] == 0 {
                                        Some((first_idx, first_point, None))
                                } else {
-                                       Some((first_idx, first_point, Some(unwrap_obj!(PublicKey::from_slice(&secp_ctx, second_point_slice)))))
+                                       Some((first_idx, first_point, Some(unwrap_obj!(PublicKey::from_slice(&secp_ctx, &second_point_slice)))))
                                }
                        }
                };
 
-               let our_to_self_delay = byte_utils::slice_to_be16(read_bytes!(2));
-               let their_to_self_delay = Some(byte_utils::slice_to_be16(read_bytes!(2)));
+               let our_to_self_delay: u16 = Readable::read(reader)?;
+               let their_to_self_delay: Option<u16> = Some(Readable::read(reader)?);
 
                let mut old_secrets = [([0; 32], 1 << 48); 49];
                for &mut (ref mut secret, ref mut idx) in old_secrets.iter_mut() {
-                       secret.copy_from_slice(read_bytes!(32));
-                       *idx = byte_utils::slice_to_be64(read_bytes!(8));
+                       *secret = Readable::read(reader)?;
+                       *idx = Readable::read(reader)?;
                }
 
                macro_rules! read_htlc_in_commitment {
                        () => {
                                {
-                                       let offered = match read_bytes!(1)[0] {
-                                               0 => false, 1 => true,
-                                               _ => return Err(DecodeError::InvalidValue),
-                                       };
-                                       let amount_msat = byte_utils::slice_to_be64(read_bytes!(8));
-                                       let cltv_expiry = byte_utils::slice_to_be32(read_bytes!(4));
-                                       let mut payment_hash = [0; 32];
-                                       payment_hash[..].copy_from_slice(read_bytes!(32));
-                                       let transaction_output_index = byte_utils::slice_to_be32(read_bytes!(4));
+                                       let offered: bool = Readable::read(reader)?;
+                                       let amount_msat: u64 = Readable::read(reader)?;
+                                       let cltv_expiry: u32 = Readable::read(reader)?;
+                                       let payment_hash: [u8; 32] = Readable::read(reader)?;
+                                       let transaction_output_index: u32 = Readable::read(reader)?;
 
                                        HTLCOutputInCommitment {
                                                offered, amount_msat, cltv_expiry, payment_hash, transaction_output_index
@@ -1497,14 +1528,12 @@ impl<R: ::std::io::Read> Readable<R> for ChannelMonitor {
                        }
                }
 
-               let remote_claimable_outpoints_len = byte_utils::slice_to_be64(read_bytes!(8));
-               if remote_claimable_outpoints_len > data.len() as u64 / 64 { return Err(DecodeError::BadLengthDescriptor); }
-               let mut remote_claimable_outpoints = HashMap::with_capacity(remote_claimable_outpoints_len as usize);
+               let remote_claimable_outpoints_len: u64 = Readable::read(reader)?;
+               let mut remote_claimable_outpoints = HashMap::with_capacity(cmp::min(remote_claimable_outpoints_len as usize, MAX_ALLOC_SIZE / 64));
                for _ in 0..remote_claimable_outpoints_len {
-                       let txid = Sha256dHash::from(read_bytes!(32));
-                       let outputs_count = byte_utils::slice_to_be64(read_bytes!(8));
-                       if outputs_count > data.len() as u64 / 32 { return Err(DecodeError::BadLengthDescriptor); }
-                       let mut outputs = Vec::with_capacity(outputs_count as usize);
+                       let txid: Sha256dHash = Readable::read(reader)?;
+                       let outputs_count: u64 = Readable::read(reader)?;
+                       let mut outputs = Vec::with_capacity(cmp::min(outputs_count as usize, MAX_ALLOC_SIZE / 32));
                        for _ in 0..outputs_count {
                                outputs.push(read_htlc_in_commitment!());
                        }
@@ -1513,24 +1542,26 @@ impl<R: ::std::io::Read> Readable<R> for ChannelMonitor {
                        }
                }
 
-               let remote_commitment_txn_on_chain_len = byte_utils::slice_to_be64(read_bytes!(8));
-               if remote_commitment_txn_on_chain_len > data.len() as u64 / 32 { return Err(DecodeError::BadLengthDescriptor); }
-               let mut remote_commitment_txn_on_chain = HashMap::with_capacity(remote_commitment_txn_on_chain_len as usize);
+               let remote_commitment_txn_on_chain_len: u64 = Readable::read(reader)?;
+               let mut remote_commitment_txn_on_chain = HashMap::with_capacity(cmp::min(remote_commitment_txn_on_chain_len as usize, MAX_ALLOC_SIZE / 32));
                for _ in 0..remote_commitment_txn_on_chain_len {
-                       let txid = Sha256dHash::from(read_bytes!(32));
-                       let commitment_number = byte_utils::slice_to_be48(read_bytes!(6));
-                       if let Some(_) = remote_commitment_txn_on_chain.insert(txid, commitment_number) {
+                       let txid: Sha256dHash = Readable::read(reader)?;
+                       let commitment_number = <U48 as Readable<R>>::read(reader)?.0;
+                       let outputs_count = <u64 as Readable<R>>::read(reader)?;
+                       let mut outputs = Vec::with_capacity(cmp::min(outputs_count as usize, MAX_ALLOC_SIZE / 8));
+                       for _ in 0..outputs_count {
+                               outputs.push(Readable::read(reader)?);
+                       }
+                       if let Some(_) = remote_commitment_txn_on_chain.insert(txid, (commitment_number, outputs)) {
                                return Err(DecodeError::InvalidValue);
                        }
                }
 
-               let remote_hash_commitment_number_len = byte_utils::slice_to_be64(read_bytes!(8));
-               if remote_hash_commitment_number_len > data.len() as u64 / 32 { return Err(DecodeError::BadLengthDescriptor); }
-               let mut remote_hash_commitment_number = HashMap::with_capacity(remote_hash_commitment_number_len as usize);
+               let remote_hash_commitment_number_len: u64 = Readable::read(reader)?;
+               let mut remote_hash_commitment_number = HashMap::with_capacity(cmp::min(remote_hash_commitment_number_len as usize, MAX_ALLOC_SIZE / 32));
                for _ in 0..remote_hash_commitment_number_len {
-                       let mut txid = [0; 32];
-                       txid[..].copy_from_slice(read_bytes!(32));
-                       let commitment_number = byte_utils::slice_to_be48(read_bytes!(6));
+                       let txid: [u8; 32] = Readable::read(reader)?;
+                       let commitment_number = <U48 as Readable<R>>::read(reader)?.0;
                        if let Some(_) = remote_hash_commitment_number.insert(txid, commitment_number) {
                                return Err(DecodeError::InvalidValue);
                        }
@@ -1539,29 +1570,29 @@ impl<R: ::std::io::Read> Readable<R> for ChannelMonitor {
                macro_rules! read_local_tx {
                        () => {
                                {
-                                       let tx_len = byte_utils::slice_to_be64(read_bytes!(8));
-                                       let tx_ser = read_bytes!(tx_len);
-                                       let tx: Transaction = unwrap_obj!(serialize::deserialize(tx_ser));
-                                       if serialize::serialize(&tx).unwrap() != tx_ser {
-                                               // We check that the tx re-serializes to the same form to ensure there is
-                                               // no extra data, and as rust-bitcoin doesn't handle the 0-input ambiguity
-                                               // all that well.
+                                       let tx = match Transaction::consensus_decode(&mut serialize::RawDecoder::new(reader.by_ref())) {
+                                               Ok(tx) => tx,
+                                               Err(e) => match e {
+                                                       serialize::Error::Io(ioe) => return Err(DecodeError::Io(ioe)),
+                                                       _ => return Err(DecodeError::InvalidValue),
+                                               },
+                                       };
+
+                                       if tx.input.is_empty() {
+                                               // Ensure tx didn't hit the 0-input ambiguity case.
                                                return Err(DecodeError::InvalidValue);
                                        }
 
-                                       let revocation_key = unwrap_obj!(PublicKey::from_slice(&secp_ctx, read_bytes!(33)));
-                                       let a_htlc_key = unwrap_obj!(PublicKey::from_slice(&secp_ctx, read_bytes!(33)));
-                                       let b_htlc_key = unwrap_obj!(PublicKey::from_slice(&secp_ctx, read_bytes!(33)));
-                                       let delayed_payment_key = unwrap_obj!(PublicKey::from_slice(&secp_ctx, read_bytes!(33)));
-                                       let feerate_per_kw = byte_utils::slice_to_be64(read_bytes!(8));
+                                       let revocation_key = Readable::read(reader)?;
+                                       let a_htlc_key = Readable::read(reader)?;
+                                       let b_htlc_key = Readable::read(reader)?;
+                                       let delayed_payment_key = Readable::read(reader)?;
+                                       let feerate_per_kw: u64 = Readable::read(reader)?;
 
-                                       let htlc_outputs_len = byte_utils::slice_to_be64(read_bytes!(8));
-                                       if htlc_outputs_len > data.len() as u64 / 128 { return Err(DecodeError::BadLengthDescriptor); }
-                                       let mut htlc_outputs = Vec::with_capacity(htlc_outputs_len as usize);
+                                       let htlc_outputs_len: u64 = Readable::read(reader)?;
+                                       let mut htlc_outputs = Vec::with_capacity(cmp::min(htlc_outputs_len as usize, MAX_ALLOC_SIZE / 128));
                                        for _ in 0..htlc_outputs_len {
-                                               htlc_outputs.push((read_htlc_in_commitment!(),
-                                                               unwrap_obj!(Signature::from_compact(&secp_ctx, read_bytes!(64))),
-                                                               unwrap_obj!(Signature::from_compact(&secp_ctx, read_bytes!(64)))));
+                                               htlc_outputs.push((read_htlc_in_commitment!(), Readable::read(reader)?, Readable::read(reader)?));
                                        }
 
                                        LocalSignedTx {
@@ -1572,7 +1603,7 @@ impl<R: ::std::io::Read> Readable<R> for ChannelMonitor {
                        }
                }
 
-               let prev_local_signed_commitment_tx = match read_bytes!(1)[0] {
+               let prev_local_signed_commitment_tx = match <u8 as Readable<R>>::read(reader)? {
                        0 => None,
                        1 => {
                                Some(read_local_tx!())
@@ -1580,7 +1611,7 @@ impl<R: ::std::io::Read> Readable<R> for ChannelMonitor {
                        _ => return Err(DecodeError::InvalidValue),
                };
 
-               let current_local_signed_commitment_tx = match read_bytes!(1)[0] {
+               let current_local_signed_commitment_tx = match <u8 as Readable<R>>::read(reader)? {
                        0 => None,
                        1 => {
                                Some(read_local_tx!())
@@ -1588,13 +1619,13 @@ impl<R: ::std::io::Read> Readable<R> for ChannelMonitor {
                        _ => return Err(DecodeError::InvalidValue),
                };
 
-               let payment_preimages_len = byte_utils::slice_to_be64(read_bytes!(8));
-               if payment_preimages_len > data.len() as u64 / 32 { return Err(DecodeError::InvalidValue); }
-               let mut payment_preimages = HashMap::with_capacity(payment_preimages_len as usize);
+               let current_remote_commitment_number = <U48 as Readable<R>>::read(reader)?.0;
+
+               let payment_preimages_len: u64 = Readable::read(reader)?;
+               let mut payment_preimages = HashMap::with_capacity(cmp::min(payment_preimages_len as usize, MAX_ALLOC_SIZE / 32));
                let mut sha = Sha256::new();
                for _ in 0..payment_preimages_len {
-                       let mut preimage = [0; 32];
-                       preimage[..].copy_from_slice(read_bytes!(32));
+                       let preimage: [u8; 32] = Readable::read(reader)?;
                        sha.reset();
                        sha.input(&preimage);
                        let mut hash = [0; 32];
@@ -1604,10 +1635,10 @@ impl<R: ::std::io::Read> Readable<R> for ChannelMonitor {
                        }
                }
 
-               let destination_script_len = byte_utils::slice_to_be64(read_bytes!(8));
-               let destination_script = Script::from(read_bytes!(destination_script_len).to_vec());
+               let last_block_hash: Sha256dHash = Readable::read(reader)?;
+               let destination_script = Readable::read(reader)?;
 
-               Ok(ChannelMonitor {
+               Ok((last_block_hash.clone(), ChannelMonitor {
                        funding_txo,
                        commitment_transaction_number_obscure_factor,
 
@@ -1621,17 +1652,20 @@ impl<R: ::std::io::Read> Readable<R> for ChannelMonitor {
 
                        old_secrets,
                        remote_claimable_outpoints,
-                       remote_commitment_txn_on_chain: Mutex::new(remote_commitment_txn_on_chain),
+                       remote_commitment_txn_on_chain,
                        remote_hash_commitment_number,
 
                        prev_local_signed_commitment_tx,
                        current_local_signed_commitment_tx,
+                       current_remote_commitment_number,
 
                        payment_preimages,
 
                        destination_script,
+                       last_block_hash,
                        secp_ctx,
-               })
+                       logger,
+               }))
        }
 
 }
@@ -1645,9 +1679,11 @@ mod tests {
        use ln::channelmonitor::ChannelMonitor;
        use ln::chan_utils::{HTLCOutputInCommitment, TxCreationKeys};
        use util::sha2::Sha256;
+       use util::test_utils::TestLogger;
        use secp256k1::key::{SecretKey,PublicKey};
        use secp256k1::{Secp256k1, Signature};
        use rand::{thread_rng,Rng};
+       use std::sync::Arc;
 
        #[test]
        fn test_per_commitment_storage() {
@@ -1655,6 +1691,7 @@ mod tests {
                let mut secrets: Vec<[u8; 32]> = Vec::new();
                let mut monitor: ChannelMonitor;
                let secp_ctx = Secp256k1::new();
+               let logger = Arc::new(TestLogger::new());
 
                macro_rules! test_secrets {
                        () => {
@@ -1670,7 +1707,7 @@ mod tests {
 
                {
                        // insert_secret correct sequence
-                       monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new());
+                       monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new(), logger.clone());
                        secrets.clear();
 
                        secrets.push([0; 32]);
@@ -1716,7 +1753,7 @@ mod tests {
 
                {
                        // insert_secret #1 incorrect
-                       monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new());
+                       monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new(), logger.clone());
                        secrets.clear();
 
                        secrets.push([0; 32]);
@@ -1732,7 +1769,7 @@ mod tests {
 
                {
                        // insert_secret #2 incorrect (#1 derived from incorrect)
-                       monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new());
+                       monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new(), logger.clone());
                        secrets.clear();
 
                        secrets.push([0; 32]);
@@ -1758,7 +1795,7 @@ mod tests {
 
                {
                        // insert_secret #3 incorrect
-                       monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new());
+                       monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new(), logger.clone());
                        secrets.clear();
 
                        secrets.push([0; 32]);
@@ -1784,7 +1821,7 @@ mod tests {
 
                {
                        // insert_secret #4 incorrect (1,2,3 derived from incorrect)
-                       monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new());
+                       monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new(), logger.clone());
                        secrets.clear();
 
                        secrets.push([0; 32]);
@@ -1830,7 +1867,7 @@ mod tests {
 
                {
                        // insert_secret #5 incorrect
-                       monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new());
+                       monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new(), logger.clone());
                        secrets.clear();
 
                        secrets.push([0; 32]);
@@ -1866,7 +1903,7 @@ mod tests {
 
                {
                        // insert_secret #6 incorrect (5 derived from incorrect)
-                       monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new());
+                       monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new(), logger.clone());
                        secrets.clear();
 
                        secrets.push([0; 32]);
@@ -1912,7 +1949,7 @@ mod tests {
 
                {
                        // insert_secret #7 incorrect
-                       monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new());
+                       monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new(), logger.clone());
                        secrets.clear();
 
                        secrets.push([0; 32]);
@@ -1958,7 +1995,7 @@ mod tests {
 
                {
                        // insert_secret #8 incorrect
-                       monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new());
+                       monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new(), logger.clone());
                        secrets.clear();
 
                        secrets.push([0; 32]);
@@ -2006,6 +2043,7 @@ mod tests {
        #[test]
        fn test_prune_preimages() {
                let secp_ctx = Secp256k1::new();
+               let logger = Arc::new(TestLogger::new());
                let dummy_sig = Signature::from_der(&secp_ctx, &hex::decode("3045022100fa86fa9a36a8cd6a7bb8f06a541787d51371d067951a9461d5404de6b928782e02201c8b7c334c10aed8976a3a465be9a28abff4cb23acbf00022295b378ce1fa3cd").unwrap()[..]).unwrap();
 
                macro_rules! dummy_keys {
@@ -2076,7 +2114,7 @@ mod tests {
 
                // Prune with one old state and a local commitment tx holding a few overlaps with the
                // old state.
-               let mut monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new());
+               let mut monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new(), logger.clone());
                monitor.set_their_to_self_delay(10);
 
                monitor.provide_latest_local_commitment_tx_info(dummy_tx.clone(), dummy_keys!(), 0, preimages_to_local_htlcs!(preimages[0..10]));
index 0d049f5a78d6bfcb68bd41a49ab06602e98069c3..9fe2cfb50012b3619d1b10ef0edcae65f0fddee9 100644 (file)
@@ -13,9 +13,9 @@ use bitcoin::blockdata::opcodes;
 
 use chain::chaininterface::{ChainError, ChainWatchInterface};
 use ln::channelmanager;
-use ln::msgs::{ErrorAction,HandleError,RoutingMessageHandler,NetAddress,GlobalFeatures};
+use ln::msgs::{DecodeError,ErrorAction,HandleError,RoutingMessageHandler,NetAddress,GlobalFeatures};
 use ln::msgs;
-use util::ser::Writeable;
+use util::ser::{Writeable, Readable};
 use util::logger::Logger;
 
 use std::cmp;
@@ -47,6 +47,37 @@ pub struct Route {
        pub hops: Vec<RouteHop>,
 }
 
+impl Writeable for Route {
+       fn write<W: ::util::ser::Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
+               (self.hops.len() as u8).write(writer)?;
+               for hop in self.hops.iter() {
+                       hop.pubkey.write(writer)?;
+                       hop.short_channel_id.write(writer)?;
+                       hop.fee_msat.write(writer)?;
+                       hop.cltv_expiry_delta.write(writer)?;
+               }
+               Ok(())
+       }
+}
+
+impl<R: ::std::io::Read> Readable<R> for Route {
+       fn read(reader: &mut R) -> Result<Route, DecodeError> {
+               let hops_count: u8 = Readable::read(reader)?;
+               let mut hops = Vec::with_capacity(hops_count as usize);
+               for _ in 0..hops_count {
+                       hops.push(RouteHop {
+                               pubkey: Readable::read(reader)?,
+                               short_channel_id: Readable::read(reader)?,
+                               fee_msat: Readable::read(reader)?,
+                               cltv_expiry_delta: Readable::read(reader)?,
+                       });
+               }
+               Ok(Route {
+                       hops
+               })
+       }
+}
+
 struct DirectionalChannelInfo {
        src_node_id: PublicKey,
        last_update: u32,
index 92cc66b81e30af9599f35bcd8cec1bcb088d2713..84cfa8d1633f98e78a996792d53c146d981be61a 100644 (file)
@@ -2,19 +2,19 @@
 //! as ChannelsManagers and ChannelMonitors.
 
 use std::result::Result;
-use std::io::Read;
+use std::io::{Read, Write};
 use std::collections::HashMap;
 use std::hash::Hash;
 
 use secp256k1::{Secp256k1, Signature};
-use secp256k1::key::PublicKey;
+use secp256k1::key::{PublicKey, SecretKey};
 use bitcoin::util::hash::Sha256dHash;
 use bitcoin::blockdata::script::Script;
 use std::marker::Sized;
 use ln::msgs::DecodeError;
 use util::byte_utils;
 
-use util::byte_utils::{be64_to_array, be32_to_array, be16_to_array, slice_to_be16, slice_to_be32, slice_to_be64};
+use util::byte_utils::{be64_to_array, be48_to_array, be32_to_array, be16_to_array, slice_to_be16, slice_to_be32, slice_to_be48, slice_to_be64};
 
 const MAX_BUF_SIZE: usize = 64 * 1024;
 
@@ -30,7 +30,7 @@ pub trait Writer {
        fn size_hint(&mut self, size: usize);
 }
 
-impl<W: ::std::io::Write> Writer for W {
+impl<W: Write> Writer for W {
        #[inline]
        fn write_all(&mut self, buf: &[u8]) -> Result<(), ::std::io::Error> {
                <Self as ::std::io::Write>::write_all(self, buf)
@@ -39,6 +39,20 @@ impl<W: ::std::io::Write> Writer for W {
        fn size_hint(&mut self, _size: usize) { }
 }
 
+pub(crate) struct WriterWriteAdaptor<'a, W: Writer + 'a>(pub &'a mut W);
+impl<'a, W: Writer + 'a> Write for WriterWriteAdaptor<'a, W> {
+       fn write_all(&mut self, buf: &[u8]) -> Result<(), ::std::io::Error> {
+               self.0.write_all(buf)
+       }
+       fn write(&mut self, buf: &[u8]) -> Result<usize, ::std::io::Error> {
+               self.0.write_all(buf)?;
+               Ok(buf.len())
+       }
+       fn flush(&mut self) -> Result<(), ::std::io::Error> {
+               Ok(())
+       }
+}
+
 struct VecWriter(Vec<u8>);
 impl Writer for VecWriter {
        fn write_all(&mut self, buf: &[u8]) -> Result<(), ::std::io::Error> {
@@ -82,6 +96,32 @@ pub trait Readable<R>
        fn read(reader: &mut R) -> Result<Self, DecodeError>;
 }
 
+/// A trait that various higher-level rust-lightning types implement allowing them to be read in
+/// from a Read given some additional set of arguments which is required to deserialize.
+pub trait ReadableArgs<R, P>
+       where Self: Sized,
+             R: Read
+{
+       /// Reads a Self in from the given Read
+       fn read(reader: &mut R, params: P) -> Result<Self, DecodeError>;
+}
+
+pub(crate) struct U48(pub u64);
+impl Writeable for U48 {
+       #[inline]
+       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
+               writer.write_all(&be48_to_array(self.0))
+       }
+}
+impl<R: Read> Readable<R> for U48 {
+       #[inline]
+       fn read(reader: &mut R) -> Result<U48, DecodeError> {
+               let mut buf = [0; 6];
+               reader.read_exact(&mut buf)?;
+               Ok(U48(slice_to_be48(&buf)))
+       }
+}
+
 macro_rules! impl_writeable_primitive {
        ($val_type:ty, $meth_write:ident, $len: expr, $meth_read:ident) => {
                impl Writeable for $val_type {
@@ -300,6 +340,24 @@ impl<R: Read> Readable<R> for PublicKey {
        }
 }
 
+impl Writeable for SecretKey {
+       fn write<W: Writer>(&self, w: &mut W) -> Result<(), ::std::io::Error> {
+               let mut ser = [0; 32];
+               ser.copy_from_slice(&self[..]);
+               ser.write(w)
+       }
+}
+
+impl<R: Read> Readable<R> for SecretKey {
+       fn read(r: &mut R) -> Result<Self, DecodeError> {
+               let buf: [u8; 32] = Readable::read(r)?;
+               match SecretKey::from_slice(&Secp256k1::without_caps(), &buf) {
+                       Ok(key) => Ok(key),
+                       Err(_) => return Err(DecodeError::InvalidValue),
+               }
+       }
+}
+
 impl Writeable for Sha256dHash {
        fn write<W: Writer>(&self, w: &mut W) -> Result<(), ::std::io::Error> {
                self.as_bytes().write(w)
index 09a319fb45951a043106f9ae6d7fe4d13b94779b..48e87b3bc2108a35b97282981b8738b5b87a93fb 100644 (file)
@@ -1,17 +1,19 @@
 macro_rules! impl_writeable {
        ($st:ident, $len: expr, {$($field:ident),*}) => {
-               impl Writeable for $st {
-                       fn write<W: Writer>(&self, w: &mut W) -> Result<(), ::std::io::Error> {
-                               w.size_hint($len);
+               impl ::util::ser::Writeable for $st {
+                       fn write<W: ::util::ser::Writer>(&self, w: &mut W) -> Result<(), ::std::io::Error> {
+                               if $len != 0 {
+                                       w.size_hint($len);
+                               }
                                $( self.$field.write(w)?; )*
                                Ok(())
                        }
                }
 
-               impl<R: Read> Readable<R> for $st {
-                       fn read(r: &mut R) -> Result<Self, DecodeError> {
+               impl<R: ::std::io::Read> ::util::ser::Readable<R> for $st {
+                       fn read(r: &mut R) -> Result<Self, ::ln::msgs::DecodeError> {
                                Ok(Self {
-                                       $($field: Readable::read(r)?),*
+                                       $($field: ::util::ser::Readable::read(r)?),*
                                })
                        }
                }
@@ -29,7 +31,7 @@ macro_rules! impl_writeable_len_match {
                        }
                }
 
-               impl<R: Read> Readable<R> for $st {
+               impl<R: ::std::io::Read> Readable<R> for $st {
                        fn read(r: &mut R) -> Result<Self, DecodeError> {
                                Ok(Self {
                                        $($field: Readable::read(r)?),*
index 4fa29fd8e95c94b3a52850dc5867aa4f0f1caed0..0eb49702a02dee95889679cc85556e8810b2d7df 100644 (file)
@@ -6,9 +6,10 @@ use ln::msgs;
 use ln::msgs::{HandleError};
 use util::events;
 use util::logger::{Logger, Level, Record};
-use util::ser::{Readable, Writer};
+use util::ser::{ReadableArgs, Writer};
 
 use bitcoin::blockdata::transaction::Transaction;
+use bitcoin::util::hash::Sha256dHash;
 
 use secp256k1::PublicKey;
 
@@ -55,7 +56,8 @@ impl channelmonitor::ManyChannelMonitor for TestChannelMonitor {
                // to a watchtower and disk...
                let mut w = VecWriter(Vec::new());
                monitor.write_for_disk(&mut w).unwrap();
-               assert!(channelmonitor::ChannelMonitor::read(&mut ::std::io::Cursor::new(&w.0)).unwrap() == monitor);
+               assert!(<(Sha256dHash, channelmonitor::ChannelMonitor)>::read(
+                               &mut ::std::io::Cursor::new(&w.0), Arc::new(TestLogger::new())).unwrap().1 == monitor);
                w.0.clear();
                monitor.write_for_watchtower(&mut w).unwrap(); // This at least shouldn't crash...
                self.added_monitors.lock().unwrap().push((funding_txo, monitor.clone()));