From: Matt Corallo <649246+TheBlueMatt@users.noreply.github.com> Date: Sat, 27 Oct 2018 14:46:12 +0000 (-0400) Subject: Merge pull request #223 from TheBlueMatt/2018-10-chanmanager-serialize X-Git-Tag: v0.0.12~283 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=8cc3be9eab9ff322d543e989f28c775a69d6b78f;hp=19a1a596857d2ca9d3217a2d224b44578c211855;p=rust-lightning Merge pull request #223 from TheBlueMatt/2018-10-chanmanager-serialize Implement and document Channel/ChannelManager (de)serialization --- diff --git a/fuzz/fuzz_targets/chanmon_deser_target.rs b/fuzz/fuzz_targets/chanmon_deser_target.rs index ba525081..9ddf52c6 100644 --- a/fuzz/fuzz_targets/chanmon_deser_target.rs +++ b/fuzz/fuzz_targets/chanmon_deser_target.rs @@ -1,13 +1,20 @@ // This file is auto-generated by gen_target.sh based on msg_target_template.txt // To modify it, modify msg_target_template.txt and run gen_target.sh instead. +extern crate bitcoin; extern crate lightning; +use bitcoin::util::hash::Sha256dHash; + use lightning::ln::channelmonitor; use lightning::util::reset_rng_state; -use lightning::util::ser::{Readable, Writer}; +use lightning::util::ser::{ReadableArgs, Writer}; + +mod utils; +use utils::test_logger; use std::io::Cursor; +use std::sync::Arc; struct VecWriter(Vec); impl Writer for VecWriter { @@ -23,10 +30,13 @@ impl Writer for VecWriter { #[inline] pub fn do_test(data: &[u8]) { reset_rng_state(); - if let Ok(monitor) = channelmonitor::ChannelMonitor::read(&mut Cursor::new(data)) { + let logger = Arc::new(test_logger::TestLogger{}); + if let Ok((latest_block_hash, monitor)) = <(Sha256dHash, channelmonitor::ChannelMonitor)>::read(&mut Cursor::new(data), logger.clone()) { let mut w = VecWriter(Vec::new()); monitor.write_for_disk(&mut w).unwrap(); - assert!(channelmonitor::ChannelMonitor::read(&mut Cursor::new(&w.0)).unwrap() == monitor); + let deserialized_copy = <(Sha256dHash, channelmonitor::ChannelMonitor)>::read(&mut Cursor::new(&w.0), logger.clone()).unwrap(); + assert!(latest_block_hash == deserialized_copy.0); + assert!(monitor == deserialized_copy.1); w.0.clear(); monitor.write_for_watchtower(&mut w).unwrap(); } diff --git a/src/chain/keysinterface.rs b/src/chain/keysinterface.rs index b3823e21..18b06936 100644 --- a/src/chain/keysinterface.rs +++ b/src/chain/keysinterface.rs @@ -78,6 +78,15 @@ pub struct ChannelKeys { pub commitment_seed: [u8; 32], } +impl_writeable!(ChannelKeys, 0, { + funding_key, + revocation_base_key, + payment_base_key, + delayed_payment_base_key, + htlc_base_key, + commitment_seed +}); + impl ChannelKeys { /// Generate a set of lightning keys needed to operate a channel by HKDF-expanding a given /// random 32-byte seed diff --git a/src/ln/channel.rs b/src/ln/channel.rs index e8ec23d9..2561fc27 100644 --- a/src/ln/channel.rs +++ b/src/ln/channel.rs @@ -4,7 +4,9 @@ use bitcoin::blockdata::transaction::{TxIn, TxOut, Transaction, SigHashType}; use bitcoin::blockdata::opcodes; use bitcoin::util::hash::{Sha256dHash, Hash160}; use bitcoin::util::bip143; -use bitcoin::network::serialize::BitcoinHash; +use bitcoin::network; +use bitcoin::network::serialize::{BitcoinHash, RawDecoder, RawEncoder}; +use bitcoin::network::encodable::{ConsensusEncodable, ConsensusDecodable}; use secp256k1::key::{PublicKey,SecretKey}; use secp256k1::{Secp256k1,Message,Signature}; @@ -13,7 +15,7 @@ use secp256k1; use crypto::digest::Digest; use ln::msgs; -use ln::msgs::{ErrorAction, HandleError}; +use ln::msgs::{DecodeError, ErrorAction, HandleError}; use ln::channelmonitor::ChannelMonitor; use ln::channelmanager::{PendingHTLCStatus, HTLCSource, HTLCFailReason, HTLCFailureMsg, PendingForwardHTLCInfo, RAACommitmentOrder}; use ln::chan_utils::{TxCreationKeys,HTLCOutputInCommitment,HTLC_SUCCESS_TX_WEIGHT,HTLC_TIMEOUT_TX_WEIGHT}; @@ -22,7 +24,7 @@ use chain::chaininterface::{FeeEstimator,ConfirmationTarget}; use chain::transaction::OutPoint; use chain::keysinterface::{ChannelKeys, KeysInterface}; use util::{transaction_utils,rng}; -use util::ser::Writeable; +use util::ser::{Readable, ReadableArgs, Writeable, Writer, WriterWriteAdaptor}; use util::sha2::Sha256; use util::logger::Logger; use util::errors::APIError; @@ -306,8 +308,9 @@ pub(super) struct Channel { /// could miss the funding_tx_confirmed_in block as well, but it serves as a useful fallback. funding_tx_confirmed_in: Option, short_channel_id: Option, - /// Used to deduplicate block_connected callbacks - last_block_connected: Sha256dHash, + /// Used to deduplicate block_connected callbacks, also used to verify consistency during + /// ChannelManager deserialization (hence pub(super)) + pub(super) last_block_connected: Sha256dHash, funding_tx_confirmations: u64, their_dust_limit_satoshis: u64, @@ -438,7 +441,7 @@ impl Channel { let secp_ctx = Secp256k1::new(); let channel_monitor = ChannelMonitor::new(&chan_keys.revocation_base_key, &chan_keys.delayed_payment_base_key, &chan_keys.htlc_base_key, BREAKDOWN_TIMEOUT, - keys_provider.get_destination_script()); + keys_provider.get_destination_script(), logger.clone()); Ok(Channel { user_id: user_id, @@ -600,7 +603,7 @@ impl Channel { let secp_ctx = Secp256k1::new(); let mut channel_monitor = ChannelMonitor::new(&chan_keys.revocation_base_key, &chan_keys.delayed_payment_base_key, &chan_keys.htlc_base_key, BREAKDOWN_TIMEOUT, - keys_provider.get_destination_script()); + keys_provider.get_destination_script(), logger.clone()); channel_monitor.set_their_base_keys(&msg.htlc_basepoint, &msg.delayed_payment_basepoint); channel_monitor.set_their_to_self_delay(msg.to_self_delay); @@ -2560,6 +2563,18 @@ impl Channel { self.feerate_per_kw } + pub fn get_cur_local_commitment_transaction_number(&self) -> u64 { + self.cur_local_commitment_transaction_number + 1 + } + + pub fn get_cur_remote_commitment_transaction_number(&self) -> u64 { + self.cur_remote_commitment_transaction_number + 1 - if self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32) != 0 { 1 } else { 0 } + } + + pub fn get_revoked_remote_commitment_transaction_number(&self) -> u64 { + self.cur_remote_commitment_transaction_number + 2 + } + //TODO: Testing purpose only, should be changed in another way after #81 #[cfg(test)] pub fn get_local_keys(&self) -> &ChannelKeys { @@ -2671,9 +2686,10 @@ impl Channel { /// Only returns an ErrorAction of DisconnectPeer, if Err. pub fn block_connected(&mut self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> Result, HandleError> { let non_shutdown_state = self.channel_state & (!MULTI_STATE_FLAGS); - if self.funding_tx_confirmations > 0 { - if header.bitcoin_hash() != self.last_block_connected { - self.last_block_connected = header.bitcoin_hash(); + if header.bitcoin_hash() != self.last_block_connected { + self.last_block_connected = header.bitcoin_hash(); + self.channel_monitor.last_block_hash = self.last_block_connected; + if self.funding_tx_confirmations > 0 { self.funding_tx_confirmations += 1; if self.funding_tx_confirmations == Channel::derive_minimum_depth(self.channel_value_satoshis*1000, self.value_to_self_msat) as u64 { let need_commitment_update = if non_shutdown_state == ChannelState::FundingSent as u32 { @@ -2754,6 +2770,8 @@ impl Channel { if Some(header.bitcoin_hash()) == self.funding_tx_confirmed_in { self.funding_tx_confirmations = Channel::derive_minimum_depth(self.channel_value_satoshis*1000, self.value_to_self_msat) as u64 - 1; } + self.last_block_connected = header.bitcoin_hash(); + self.channel_monitor.last_block_hash = self.last_block_connected; false } @@ -3227,6 +3245,494 @@ impl Channel { } } +const SERIALIZATION_VERSION: u8 = 1; +const MIN_SERIALIZATION_VERSION: u8 = 1; + +impl Writeable for InboundHTLCRemovalReason { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + match self { + &InboundHTLCRemovalReason::FailRelay(ref error_packet) => { + 0u8.write(writer)?; + error_packet.write(writer)?; + }, + &InboundHTLCRemovalReason::FailMalformed((ref onion_hash, ref err_code)) => { + 1u8.write(writer)?; + onion_hash.write(writer)?; + err_code.write(writer)?; + }, + &InboundHTLCRemovalReason::Fulfill(ref payment_preimage) => { + 2u8.write(writer)?; + payment_preimage.write(writer)?; + }, + } + Ok(()) + } +} + +impl Readable for InboundHTLCRemovalReason { + fn read(reader: &mut R) -> Result { + Ok(match >::read(reader)? { + 0 => InboundHTLCRemovalReason::FailRelay(Readable::read(reader)?), + 1 => InboundHTLCRemovalReason::FailMalformed((Readable::read(reader)?, Readable::read(reader)?)), + 2 => InboundHTLCRemovalReason::Fulfill(Readable::read(reader)?), + _ => return Err(DecodeError::InvalidValue), + }) + } +} + +impl Writeable for Channel { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + // Note that we write out as if remove_uncommitted_htlcs_and_mark_paused had just been + // called but include holding cell updates (and obviously we don't modify self). + + writer.write_all(&[SERIALIZATION_VERSION; 1])?; + writer.write_all(&[MIN_SERIALIZATION_VERSION; 1])?; + + self.user_id.write(writer)?; + + self.channel_id.write(writer)?; + (self.channel_state | ChannelState::PeerDisconnected as u32).write(writer)?; + self.channel_outbound.write(writer)?; + self.announce_publicly.write(writer)?; + self.channel_value_satoshis.write(writer)?; + + self.local_keys.write(writer)?; + self.shutdown_pubkey.write(writer)?; + + self.cur_local_commitment_transaction_number.write(writer)?; + self.cur_remote_commitment_transaction_number.write(writer)?; + self.value_to_self_msat.write(writer)?; + + self.received_commitment_while_awaiting_raa.write(writer)?; + + let mut dropped_inbound_htlcs = 0; + for htlc in self.pending_inbound_htlcs.iter() { + if let InboundHTLCState::RemoteAnnounced(_) = htlc.state { + dropped_inbound_htlcs += 1; + } + } + (self.pending_inbound_htlcs.len() as u64 - dropped_inbound_htlcs).write(writer)?; + for htlc in self.pending_inbound_htlcs.iter() { + htlc.htlc_id.write(writer)?; + htlc.amount_msat.write(writer)?; + htlc.cltv_expiry.write(writer)?; + htlc.payment_hash.write(writer)?; + match &htlc.state { + &InboundHTLCState::RemoteAnnounced(_) => {}, // Drop + &InboundHTLCState::AwaitingRemoteRevokeToAnnounce(ref htlc_state) => { + 1u8.write(writer)?; + htlc_state.write(writer)?; + }, + &InboundHTLCState::AwaitingAnnouncedRemoteRevoke(ref htlc_state) => { + 2u8.write(writer)?; + htlc_state.write(writer)?; + }, + &InboundHTLCState::Committed => { + 3u8.write(writer)?; + }, + &InboundHTLCState::LocalRemoved(ref removal_reason) => { + 4u8.write(writer)?; + removal_reason.write(writer)?; + }, + } + } + + macro_rules! write_option { + ($thing: expr) => { + match &$thing { + &None => 0u8.write(writer)?, + &Some(ref v) => { + 1u8.write(writer)?; + v.write(writer)?; + }, + } + } + } + + (self.pending_outbound_htlcs.len() as u64).write(writer)?; + for htlc in self.pending_outbound_htlcs.iter() { + htlc.htlc_id.write(writer)?; + htlc.amount_msat.write(writer)?; + htlc.cltv_expiry.write(writer)?; + htlc.payment_hash.write(writer)?; + htlc.source.write(writer)?; + write_option!(htlc.fail_reason); + match &htlc.state { + &OutboundHTLCState::LocalAnnounced(ref onion_packet) => { + 0u8.write(writer)?; + onion_packet.write(writer)?; + }, + &OutboundHTLCState::Committed => { + 1u8.write(writer)?; + }, + &OutboundHTLCState::RemoteRemoved => { + 2u8.write(writer)?; + }, + &OutboundHTLCState::AwaitingRemoteRevokeToRemove => { + 3u8.write(writer)?; + }, + &OutboundHTLCState::AwaitingRemovedRemoteRevoke => { + 4u8.write(writer)?; + }, + } + } + + (self.holding_cell_htlc_updates.len() as u64).write(writer)?; + for update in self.holding_cell_htlc_updates.iter() { + match update { + &HTLCUpdateAwaitingACK::AddHTLC { ref amount_msat, ref cltv_expiry, ref payment_hash, ref source, ref onion_routing_packet, time_created: _ } => { + 0u8.write(writer)?; + amount_msat.write(writer)?; + cltv_expiry.write(writer)?; + payment_hash.write(writer)?; + source.write(writer)?; + onion_routing_packet.write(writer)?; + // time_created is not serialized - we re-init the timeout upon deserialization + }, + &HTLCUpdateAwaitingACK::ClaimHTLC { ref payment_preimage, ref htlc_id } => { + 1u8.write(writer)?; + payment_preimage.write(writer)?; + htlc_id.write(writer)?; + }, + &HTLCUpdateAwaitingACK::FailHTLC { ref htlc_id, ref err_packet } => { + 2u8.write(writer)?; + htlc_id.write(writer)?; + err_packet.write(writer)?; + } + } + } + + self.monitor_pending_revoke_and_ack.write(writer)?; + self.monitor_pending_commitment_signed.write(writer)?; + match self.monitor_pending_order { + None => 0u8.write(writer)?, + Some(RAACommitmentOrder::CommitmentFirst) => 1u8.write(writer)?, + Some(RAACommitmentOrder::RevokeAndACKFirst) => 2u8.write(writer)?, + } + + (self.monitor_pending_forwards.len() as u64).write(writer)?; + for &(ref pending_forward, ref htlc_id) in self.monitor_pending_forwards.iter() { + pending_forward.write(writer)?; + htlc_id.write(writer)?; + } + + (self.monitor_pending_failures.len() as u64).write(writer)?; + for &(ref htlc_source, ref payment_hash, ref fail_reason) in self.monitor_pending_failures.iter() { + htlc_source.write(writer)?; + payment_hash.write(writer)?; + fail_reason.write(writer)?; + } + + write_option!(self.pending_update_fee); + write_option!(self.holding_cell_update_fee); + + self.next_local_htlc_id.write(writer)?; + (self.next_remote_htlc_id - dropped_inbound_htlcs).write(writer)?; + self.channel_update_count.write(writer)?; + self.feerate_per_kw.write(writer)?; + + (self.last_local_commitment_txn.len() as u64).write(writer)?; + for tx in self.last_local_commitment_txn.iter() { + if let Err(e) = tx.consensus_encode(&mut RawEncoder::new(WriterWriteAdaptor(writer))) { + match e { + network::serialize::Error::Io(e) => return Err(e), + _ => panic!("last_local_commitment_txn must have been well-formed!"), + } + } + } + + match self.last_sent_closing_fee { + Some((feerate, fee)) => { + 1u8.write(writer)?; + feerate.write(writer)?; + fee.write(writer)?; + }, + None => 0u8.write(writer)?, + } + + write_option!(self.funding_tx_confirmed_in); + write_option!(self.short_channel_id); + + self.last_block_connected.write(writer)?; + self.funding_tx_confirmations.write(writer)?; + + self.their_dust_limit_satoshis.write(writer)?; + self.our_dust_limit_satoshis.write(writer)?; + self.their_max_htlc_value_in_flight_msat.write(writer)?; + self.their_channel_reserve_satoshis.write(writer)?; + self.their_htlc_minimum_msat.write(writer)?; + self.our_htlc_minimum_msat.write(writer)?; + self.their_to_self_delay.write(writer)?; + self.their_max_accepted_htlcs.write(writer)?; + + write_option!(self.their_funding_pubkey); + write_option!(self.their_revocation_basepoint); + write_option!(self.their_payment_basepoint); + write_option!(self.their_delayed_payment_basepoint); + write_option!(self.their_htlc_basepoint); + write_option!(self.their_cur_commitment_point); + + write_option!(self.their_prev_commitment_point); + self.their_node_id.write(writer)?; + + write_option!(self.their_shutdown_scriptpubkey); + + self.channel_monitor.write_for_disk(writer)?; + Ok(()) + } +} + +impl ReadableArgs> for Channel { + fn read(reader: &mut R, logger: Arc) -> Result { + let _ver: u8 = Readable::read(reader)?; + let min_ver: u8 = Readable::read(reader)?; + if min_ver > SERIALIZATION_VERSION { + return Err(DecodeError::UnknownVersion); + } + + let user_id = Readable::read(reader)?; + + let channel_id = Readable::read(reader)?; + let channel_state = Readable::read(reader)?; + let channel_outbound = Readable::read(reader)?; + let announce_publicly = Readable::read(reader)?; + let channel_value_satoshis = Readable::read(reader)?; + + let local_keys = Readable::read(reader)?; + let shutdown_pubkey = Readable::read(reader)?; + + let cur_local_commitment_transaction_number = Readable::read(reader)?; + let cur_remote_commitment_transaction_number = Readable::read(reader)?; + let value_to_self_msat = Readable::read(reader)?; + + let received_commitment_while_awaiting_raa = Readable::read(reader)?; + + let pending_inbound_htlc_count: u64 = Readable::read(reader)?; + let mut pending_inbound_htlcs = Vec::with_capacity(cmp::min(pending_inbound_htlc_count as usize, OUR_MAX_HTLCS as usize)); + for _ in 0..pending_inbound_htlc_count { + pending_inbound_htlcs.push(InboundHTLCOutput { + htlc_id: Readable::read(reader)?, + amount_msat: Readable::read(reader)?, + cltv_expiry: Readable::read(reader)?, + payment_hash: Readable::read(reader)?, + state: match >::read(reader)? { + 1 => InboundHTLCState::AwaitingRemoteRevokeToAnnounce(Readable::read(reader)?), + 2 => InboundHTLCState::AwaitingAnnouncedRemoteRevoke(Readable::read(reader)?), + 3 => InboundHTLCState::Committed, + 4 => InboundHTLCState::LocalRemoved(Readable::read(reader)?), + _ => return Err(DecodeError::InvalidValue), + }, + }); + } + + macro_rules! read_option { () => { + match >::read(reader)? { + 0 => None, + 1 => Some(Readable::read(reader)?), + _ => return Err(DecodeError::InvalidValue), + } + } } + + let pending_outbound_htlc_count: u64 = Readable::read(reader)?; + let mut pending_outbound_htlcs = Vec::with_capacity(cmp::min(pending_outbound_htlc_count as usize, OUR_MAX_HTLCS as usize)); + for _ in 0..pending_outbound_htlc_count { + pending_outbound_htlcs.push(OutboundHTLCOutput { + htlc_id: Readable::read(reader)?, + amount_msat: Readable::read(reader)?, + cltv_expiry: Readable::read(reader)?, + payment_hash: Readable::read(reader)?, + source: Readable::read(reader)?, + fail_reason: read_option!(), + state: match >::read(reader)? { + 0 => OutboundHTLCState::LocalAnnounced(Box::new(Readable::read(reader)?)), + 1 => OutboundHTLCState::Committed, + 2 => OutboundHTLCState::RemoteRemoved, + 3 => OutboundHTLCState::AwaitingRemoteRevokeToRemove, + 4 => OutboundHTLCState::AwaitingRemovedRemoteRevoke, + _ => return Err(DecodeError::InvalidValue), + }, + }); + } + + let holding_cell_htlc_update_count: u64 = Readable::read(reader)?; + let mut holding_cell_htlc_updates = Vec::with_capacity(cmp::min(holding_cell_htlc_update_count as usize, OUR_MAX_HTLCS as usize*2)); + for _ in 0..holding_cell_htlc_update_count { + holding_cell_htlc_updates.push(match >::read(reader)? { + 0 => HTLCUpdateAwaitingACK::AddHTLC { + amount_msat: Readable::read(reader)?, + cltv_expiry: Readable::read(reader)?, + payment_hash: Readable::read(reader)?, + source: Readable::read(reader)?, + onion_routing_packet: Readable::read(reader)?, + time_created: Instant::now(), + }, + 1 => HTLCUpdateAwaitingACK::ClaimHTLC { + payment_preimage: Readable::read(reader)?, + htlc_id: Readable::read(reader)?, + }, + 2 => HTLCUpdateAwaitingACK::FailHTLC { + htlc_id: Readable::read(reader)?, + err_packet: Readable::read(reader)?, + }, + _ => return Err(DecodeError::InvalidValue), + }); + } + + let monitor_pending_revoke_and_ack = Readable::read(reader)?; + let monitor_pending_commitment_signed = Readable::read(reader)?; + + let monitor_pending_order = match >::read(reader)? { + 0 => None, + 1 => Some(RAACommitmentOrder::CommitmentFirst), + 2 => Some(RAACommitmentOrder::RevokeAndACKFirst), + _ => return Err(DecodeError::InvalidValue), + }; + + let monitor_pending_forwards_count: u64 = Readable::read(reader)?; + let mut monitor_pending_forwards = Vec::with_capacity(cmp::min(monitor_pending_forwards_count as usize, OUR_MAX_HTLCS as usize)); + for _ in 0..monitor_pending_forwards_count { + monitor_pending_forwards.push((Readable::read(reader)?, Readable::read(reader)?)); + } + + let monitor_pending_failures_count: u64 = Readable::read(reader)?; + let mut monitor_pending_failures = Vec::with_capacity(cmp::min(monitor_pending_failures_count as usize, OUR_MAX_HTLCS as usize)); + for _ in 0..monitor_pending_failures_count { + monitor_pending_failures.push((Readable::read(reader)?, Readable::read(reader)?, Readable::read(reader)?)); + } + + let pending_update_fee = read_option!(); + let holding_cell_update_fee = read_option!(); + + let next_local_htlc_id = Readable::read(reader)?; + let next_remote_htlc_id = Readable::read(reader)?; + let channel_update_count = Readable::read(reader)?; + let feerate_per_kw = Readable::read(reader)?; + + let last_local_commitment_txn_count: u64 = Readable::read(reader)?; + let mut last_local_commitment_txn = Vec::with_capacity(cmp::min(last_local_commitment_txn_count as usize, OUR_MAX_HTLCS as usize*2 + 1)); + for _ in 0..last_local_commitment_txn_count { + last_local_commitment_txn.push(match Transaction::consensus_decode(&mut RawDecoder::new(reader.by_ref())) { + Ok(tx) => tx, + Err(_) => return Err(DecodeError::InvalidValue), + }); + } + + let last_sent_closing_fee = match >::read(reader)? { + 0 => None, + 1 => Some((Readable::read(reader)?, Readable::read(reader)?)), + _ => return Err(DecodeError::InvalidValue), + }; + + let funding_tx_confirmed_in = read_option!(); + let short_channel_id = read_option!(); + + let last_block_connected = Readable::read(reader)?; + let funding_tx_confirmations = Readable::read(reader)?; + + let their_dust_limit_satoshis = Readable::read(reader)?; + let our_dust_limit_satoshis = Readable::read(reader)?; + let their_max_htlc_value_in_flight_msat = Readable::read(reader)?; + let their_channel_reserve_satoshis = Readable::read(reader)?; + let their_htlc_minimum_msat = Readable::read(reader)?; + let our_htlc_minimum_msat = Readable::read(reader)?; + let their_to_self_delay = Readable::read(reader)?; + let their_max_accepted_htlcs = Readable::read(reader)?; + + let their_funding_pubkey = read_option!(); + let their_revocation_basepoint = read_option!(); + let their_payment_basepoint = read_option!(); + let their_delayed_payment_basepoint = read_option!(); + let their_htlc_basepoint = read_option!(); + let their_cur_commitment_point = read_option!(); + + let their_prev_commitment_point = read_option!(); + let their_node_id = Readable::read(reader)?; + + let their_shutdown_scriptpubkey = read_option!(); + let (monitor_last_block, channel_monitor) = ReadableArgs::read(reader, logger.clone())?; + // We drop the ChannelMonitor's last block connected hash cause we don't actually bother + // doing full block connection operations on the internal CHannelMonitor copies + if monitor_last_block != last_block_connected { + return Err(DecodeError::InvalidValue); + } + + Ok(Channel { + user_id, + + channel_id, + channel_state, + channel_outbound, + secp_ctx: Secp256k1::new(), + announce_publicly, + channel_value_satoshis, + + local_keys, + shutdown_pubkey, + + cur_local_commitment_transaction_number, + cur_remote_commitment_transaction_number, + value_to_self_msat, + + received_commitment_while_awaiting_raa, + pending_inbound_htlcs, + pending_outbound_htlcs, + holding_cell_htlc_updates, + + monitor_pending_revoke_and_ack, + monitor_pending_commitment_signed, + monitor_pending_order, + monitor_pending_forwards, + monitor_pending_failures, + + pending_update_fee, + holding_cell_update_fee, + next_local_htlc_id, + next_remote_htlc_id, + channel_update_count, + feerate_per_kw, + + #[cfg(debug_assertions)] + max_commitment_tx_output_local: ::std::sync::Mutex::new((0, 0)), + #[cfg(debug_assertions)] + max_commitment_tx_output_remote: ::std::sync::Mutex::new((0, 0)), + + last_local_commitment_txn, + + last_sent_closing_fee, + + funding_tx_confirmed_in, + short_channel_id, + last_block_connected, + funding_tx_confirmations, + + their_dust_limit_satoshis, + our_dust_limit_satoshis, + their_max_htlc_value_in_flight_msat, + their_channel_reserve_satoshis, + their_htlc_minimum_msat, + our_htlc_minimum_msat, + their_to_self_delay, + their_max_accepted_htlcs, + + their_funding_pubkey, + their_revocation_basepoint, + their_payment_basepoint, + their_delayed_payment_basepoint, + their_htlc_basepoint, + their_cur_commitment_point, + + their_prev_commitment_point, + their_node_id, + + their_shutdown_scriptpubkey, + + channel_monitor, + + logger, + }) + } +} + #[cfg(test)] mod tests { use bitcoin::util::hash::{Sha256dHash, Hash160}; diff --git a/src/ln/channelmanager.rs b/src/ln/channelmanager.rs index ae9669c2..6230225b 100644 --- a/src/ln/channelmanager.rs +++ b/src/ln/channelmanager.rs @@ -23,14 +23,14 @@ use secp256k1; use chain::chaininterface::{BroadcasterInterface,ChainListener,ChainWatchInterface,FeeEstimator}; use chain::transaction::OutPoint; use ln::channel::{Channel, ChannelError}; -use ln::channelmonitor::{ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS}; +use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS}; use ln::router::{Route,RouteHop}; use ln::msgs; -use ln::msgs::{ChannelMessageHandler, HandleError}; +use ln::msgs::{ChannelMessageHandler, DecodeError, HandleError}; use chain::keysinterface::KeysInterface; use util::{byte_utils, events, internal_traits, rng}; use util::sha2::Sha256; -use util::ser::{Readable, Writeable}; +use util::ser::{Readable, ReadableArgs, Writeable, Writer}; use util::chacha20poly1305rfc::ChaCha20; use util::logger::Logger; use util::errors::APIError; @@ -41,11 +41,10 @@ use crypto::hmac::Hmac; use crypto::digest::Digest; use crypto::symmetriccipher::SynchronousStreamCipher; -use std::{ptr, mem}; -use std::collections::HashMap; -use std::collections::hash_map; +use std::{cmp, ptr, mem}; +use std::collections::{HashMap, hash_map, HashSet}; use std::io::Cursor; -use std::sync::{Mutex,MutexGuard,Arc}; +use std::sync::{Arc, Mutex, MutexGuard, RwLock}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::{Instant,Duration}; @@ -65,13 +64,12 @@ mod channel_held_info { use ln::msgs; use ln::router::Route; use secp256k1::key::SecretKey; - use secp256k1::ecdh::SharedSecret; /// Stores the info we will need to send when we want to forward an HTLC onwards #[derive(Clone)] // See Channel::revoke_and_ack for why, tl;dr: Rust bug pub struct PendingForwardHTLCInfo { pub(super) onion_packet: Option, - pub(super) incoming_shared_secret: SharedSecret, + pub(super) incoming_shared_secret: [u8; 32], pub(super) payment_hash: [u8; 32], pub(super) short_channel_id: u64, pub(super) amt_to_forward: u64, @@ -96,7 +94,7 @@ mod channel_held_info { pub struct HTLCPreviousHopData { pub(super) short_channel_id: u64, pub(super) htlc_id: u64, - pub(super) incoming_packet_shared_secret: SharedSecret, + pub(super) incoming_packet_shared_secret: [u8; 32], } /// Tracks the inbound corresponding to an outbound HTLC @@ -302,6 +300,25 @@ const ERR: () = "You need at least 32 bit pointers (well, usize, but we'll assum /// /// Implements ChannelMessageHandler, handling the multi-channel parts and passing things through /// to individual Channels. +/// +/// Implements Writeable to write out all channel state to disk. Implies peer_disconnected() for +/// all peers during write/read (though does not modify this instance, only the instance being +/// serialized). This will result in any channels which have not yet exchanged funding_created (ie +/// called funding_transaction_generated for outbound channels). +/// +/// Note that you can be a bit lazier about writing out ChannelManager than you can be with +/// ChannelMonitors. With ChannelMonitors you MUST write each monitor update out to disk before +/// returning from ManyChannelMonitor::add_update_monitor, with ChannelManagers, writing updates +/// happens out-of-band (and will prevent any other ChannelManager operations from occurring during +/// the serialization process). If the deserialized version is out-of-date compared to the +/// ChannelMonitors passed by reference to read(), those channels will be force-closed based on the +/// ChannelMonitor state and no funds will be lost (mod on-chain transaction fees). +/// +/// Note that the deserializer is only implemented for (Sha256dHash, ChannelManager), which +/// tells you the last block hash which was block_connect()ed. You MUST rescan any blocks along +/// the "reorg path" (ie call block_disconnected() until you get to a common block and then call +/// block_connected() to step towards your best block) upon deserialization before using the +/// object! pub struct ChannelManager { genesis_hash: Sha256dHash, fee_estimator: Arc, @@ -312,12 +329,17 @@ pub struct ChannelManager { announce_channels_publicly: bool, fee_proportional_millionths: u32, latest_block_height: AtomicUsize, + last_block_hash: Mutex, secp_ctx: Secp256k1, channel_state: Mutex, our_network_key: SecretKey, pending_events: Mutex>, + /// Used when we have to take a BIG lock to make sure everything is self-consistent. + /// Essentially just when we're serializing ourselves out. + /// Taken first everywhere where we are making changes before any other locks. + total_consistency_lock: RwLock<()>, keys_manager: Arc, @@ -405,7 +427,8 @@ impl ChannelManager { announce_channels_publicly, fee_proportional_millionths, - latest_block_height: AtomicUsize::new(0), //TODO: Get an init value (generally need to replay recent chain on chain_monitor registration) + latest_block_height: AtomicUsize::new(0), //TODO: Get an init value + last_block_hash: Mutex::new(Default::default()), secp_ctx, channel_state: Mutex::new(ChannelHolder{ @@ -419,6 +442,7 @@ impl ChannelManager { our_network_key: keys_manager.get_node_secret(), pending_events: Mutex::new(Vec::new()), + total_consistency_lock: RwLock::new(()), keys_manager, @@ -443,6 +467,8 @@ impl ChannelManager { pub fn create_channel(&self, their_network_key: PublicKey, channel_value_satoshis: u64, push_msat: u64, user_id: u64) -> Result<(), APIError> { let channel = Channel::new_outbound(&*self.fee_estimator, &self.keys_manager, their_network_key, channel_value_satoshis, push_msat, self.announce_channels_publicly, user_id, Arc::clone(&self.logger))?; let res = channel.get_open_channel(self.genesis_hash.clone(), &*self.fee_estimator); + + let _ = self.total_consistency_lock.read().unwrap(); let mut channel_state = self.channel_state.lock().unwrap(); match channel_state.by_id.entry(channel.channel_id()) { hash_map::Entry::Occupied(_) => { @@ -506,6 +532,8 @@ impl ChannelManager { /// /// May generate a SendShutdown message event on success, which should be relayed. pub fn close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> { + let _ = self.total_consistency_lock.read().unwrap(); + let (mut failed_htlcs, chan_option) = { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = channel_state_lock.borrow_parts(); @@ -568,6 +596,8 @@ impl ChannelManager { /// Force closes a channel, immediately broadcasting the latest local commitment transaction to /// the chain and rejecting new HTLCs on the given channel. pub fn force_close_channel(&self, channel_id: &[u8; 32]) { + let _ = self.total_consistency_lock.read().unwrap(); + let mut chan = { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = channel_state_lock.borrow_parts(); @@ -625,7 +655,8 @@ impl ChannelManager { } #[inline] - fn gen_rho_mu_from_shared_secret(shared_secret: &SharedSecret) -> ([u8; 32], [u8; 32]) { + fn gen_rho_mu_from_shared_secret(shared_secret: &[u8]) -> ([u8; 32], [u8; 32]) { + assert_eq!(shared_secret.len(), 32); ({ let mut hmac = Hmac::new(Sha256::new(), &[0x72, 0x68, 0x6f]); // rho hmac.input(&shared_secret[..]); @@ -643,7 +674,8 @@ impl ChannelManager { } #[inline] - fn gen_um_from_shared_secret(shared_secret: &SharedSecret) -> [u8; 32] { + fn gen_um_from_shared_secret(shared_secret: &[u8]) -> [u8; 32] { + assert_eq!(shared_secret.len(), 32); let mut hmac = Hmac::new(Sha256::new(), &[0x75, 0x6d]); // um hmac.input(&shared_secret[..]); let mut res = [0; 32]; @@ -652,7 +684,8 @@ impl ChannelManager { } #[inline] - fn gen_ammag_from_shared_secret(shared_secret: &SharedSecret) -> [u8; 32] { + fn gen_ammag_from_shared_secret(shared_secret: &[u8]) -> [u8; 32] { + assert_eq!(shared_secret.len(), 32); let mut hmac = Hmac::new(Sha256::new(), &[0x61, 0x6d, 0x6d, 0x61, 0x67]); // ammag hmac.input(&shared_secret[..]); let mut res = [0; 32]; @@ -691,7 +724,7 @@ impl ChannelManager { let mut res = Vec::with_capacity(route.hops.len()); Self::construct_onion_keys_callback(secp_ctx, route, session_priv, |shared_secret, _blinding_factor, ephemeral_pubkey, _| { - let (rho, mu) = ChannelManager::gen_rho_mu_from_shared_secret(&shared_secret); + let (rho, mu) = ChannelManager::gen_rho_mu_from_shared_secret(&shared_secret[..]); res.push(OnionKeys { #[cfg(test)] @@ -815,7 +848,7 @@ impl ChannelManager { /// Encrypts a failure packet. raw_packet can either be a /// msgs::DecodedOnionErrorPacket.encode() result or a msgs::OnionErrorPacket.data element. - fn encrypt_failure_packet(shared_secret: &SharedSecret, raw_packet: &[u8]) -> msgs::OnionErrorPacket { + fn encrypt_failure_packet(shared_secret: &[u8], raw_packet: &[u8]) -> msgs::OnionErrorPacket { let ammag = ChannelManager::gen_ammag_from_shared_secret(&shared_secret); let mut packet_crypted = Vec::with_capacity(raw_packet.len()); @@ -827,7 +860,8 @@ impl ChannelManager { } } - fn build_failure_packet(shared_secret: &SharedSecret, failure_type: u16, failure_data: &[u8]) -> msgs::DecodedOnionErrorPacket { + fn build_failure_packet(shared_secret: &[u8], failure_type: u16, failure_data: &[u8]) -> msgs::DecodedOnionErrorPacket { + assert_eq!(shared_secret.len(), 32); assert!(failure_data.len() <= 256 - 2); let um = ChannelManager::gen_um_from_shared_secret(&shared_secret); @@ -858,7 +892,7 @@ impl ChannelManager { } #[inline] - fn build_first_hop_failure_packet(shared_secret: &SharedSecret, failure_type: u16, failure_data: &[u8]) -> msgs::OnionErrorPacket { + fn build_first_hop_failure_packet(shared_secret: &[u8], failure_type: u16, failure_data: &[u8]) -> msgs::OnionErrorPacket { let failure_packet = ChannelManager::build_failure_packet(shared_secret, failure_type, failure_data); ChannelManager::encrypt_failure_packet(shared_secret, &failure_packet.encode()[..]) } @@ -886,7 +920,11 @@ impl ChannelManager { })), self.channel_state.lock().unwrap()); } - let shared_secret = SharedSecret::new(&self.secp_ctx, &msg.onion_routing_packet.public_key.unwrap(), &self.our_network_key); + let shared_secret = { + let mut arr = [0; 32]; + arr.copy_from_slice(&SharedSecret::new(&self.secp_ctx, &msg.onion_routing_packet.public_key.unwrap(), &self.our_network_key)[..]); + arr + }; let (rho, mu) = ChannelManager::gen_rho_mu_from_shared_secret(&shared_secret); let mut channel_state = None; @@ -963,7 +1001,7 @@ impl ChannelManager { onion_packet: None, payment_hash: msg.payment_hash.clone(), short_channel_id: 0, - incoming_shared_secret: shared_secret.clone(), + incoming_shared_secret: shared_secret, amt_to_forward: next_hop_data.data.amt_to_forward, outgoing_cltv_value: next_hop_data.data.outgoing_cltv_value, }) @@ -977,7 +1015,7 @@ impl ChannelManager { let blinding_factor = { let mut sha = Sha256::new(); sha.input(&new_pubkey.serialize()[..]); - sha.input(&shared_secret[..]); + sha.input(&shared_secret); let mut res = [0u8; 32]; sha.result(&mut res); match SecretKey::from_slice(&self.secp_ctx, &res) { @@ -1003,7 +1041,7 @@ impl ChannelManager { onion_packet: Some(outgoing_packet), payment_hash: msg.payment_hash.clone(), short_channel_id: next_hop_data.data.short_channel_id, - incoming_shared_secret: shared_secret.clone(), + incoming_shared_secret: shared_secret, amt_to_forward: next_hop_data.data.amt_to_forward, outgoing_cltv_value: next_hop_data.data.outgoing_cltv_value, }) @@ -1140,6 +1178,7 @@ impl ChannelManager { let (onion_payloads, htlc_msat, htlc_cltv) = ChannelManager::build_onion_payloads(&route, cur_height)?; let onion_packet = ChannelManager::construct_onion_packet(onion_payloads, onion_keys, &payment_hash); + let _ = self.total_consistency_lock.read().unwrap(); let mut channel_state = self.channel_state.lock().unwrap(); let id = match channel_state.short_to_id.get(&route.hops.first().unwrap().short_channel_id) { @@ -1196,6 +1235,8 @@ impl ChannelManager { /// May panic if the funding_txo is duplicative with some other channel (note that this should /// be trivially prevented by using unique funding transaction keys per-channel). pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_txo: OutPoint) { + let _ = self.total_consistency_lock.read().unwrap(); + let (chan, msg, chan_monitor) = { let mut channel_state = self.channel_state.lock().unwrap(); match channel_state.by_id.remove(temporary_channel_id) { @@ -1261,6 +1302,8 @@ impl ChannelManager { /// Should only really ever be called in response to an PendingHTLCsForwardable event. /// Will likely generate further events. pub fn process_pending_htlc_forwards(&self) { + let _ = self.total_consistency_lock.read().unwrap(); + let mut new_events = Vec::new(); let mut failed_forwards = Vec::new(); { @@ -1382,6 +1425,8 @@ impl ChannelManager { /// Indicates that the preimage for payment_hash is unknown or the received amount is incorrect after a PaymentReceived event. pub fn fail_htlc_backwards(&self, payment_hash: &[u8; 32], reason: PaymentFailReason) -> bool { + let _ = self.total_consistency_lock.read().unwrap(); + let mut channel_state = Some(self.channel_state.lock().unwrap()); let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(payment_hash); if let Some(mut sources) = removed_source { @@ -1477,6 +1522,8 @@ impl ChannelManager { let mut payment_hash = [0; 32]; sha.result(&mut payment_hash); + let _ = self.total_consistency_lock.read().unwrap(); + let mut channel_state = Some(self.channel_state.lock().unwrap()); let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(&payment_hash); if let Some(mut sources) = removed_source { @@ -1555,6 +1602,7 @@ impl ChannelManager { let mut close_results = Vec::new(); let mut htlc_forwards = Vec::new(); let mut htlc_failures = Vec::new(); + let _ = self.total_consistency_lock.read().unwrap(); { let mut channel_lock = self.channel_state.lock().unwrap(); @@ -1938,7 +1986,7 @@ impl ChannelManager { let amt_to_forward = htlc_msat - route_hop.fee_msat; htlc_msat = amt_to_forward; - let ammag = ChannelManager::gen_ammag_from_shared_secret(&shared_secret); + let ammag = ChannelManager::gen_ammag_from_shared_secret(&shared_secret[..]); let mut decryption_tmp = Vec::with_capacity(packet_decrypted.len()); decryption_tmp.resize(packet_decrypted.len(), 0); @@ -1949,7 +1997,7 @@ impl ChannelManager { let is_from_final_node = route.hops.last().unwrap().pubkey == route_hop.pubkey; if let Ok(err_packet) = msgs::DecodedOnionErrorPacket::read(&mut Cursor::new(&packet_decrypted)) { - let um = ChannelManager::gen_um_from_shared_secret(&shared_secret); + let um = ChannelManager::gen_um_from_shared_secret(&shared_secret[..]); let mut hmac = Hmac::new(Sha256::new(), &um); hmac.input(&err_packet.encode()[32..]); let mut calc_tag = [0u8; 32]; @@ -2359,6 +2407,7 @@ impl ChannelManager { /// Note: This API is likely to change! #[doc(hidden)] pub fn update_fee(&self, channel_id: [u8;32], feerate_per_kw: u64) -> Result<(), APIError> { + let _ = self.total_consistency_lock.read().unwrap(); let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = channel_state_lock.borrow_parts(); @@ -2416,6 +2465,7 @@ impl events::EventsProvider for ChannelManager { impl ChainListener for ChannelManager { fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) { + let _ = self.total_consistency_lock.read().unwrap(); let mut failed_channels = Vec::new(); { let mut channel_lock = self.channel_state.lock().unwrap(); @@ -2489,10 +2539,12 @@ impl ChainListener for ChannelManager { self.finish_force_close_channel(failure); } self.latest_block_height.store(height as usize, Ordering::Release); + *self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header.bitcoin_hash(); } /// We force-close the channel without letting our counterparty participate in the shutdown fn block_disconnected(&self, header: &BlockHeader) { + let _ = self.total_consistency_lock.read().unwrap(); let mut failed_channels = Vec::new(); { let mut channel_lock = self.channel_state.lock().unwrap(); @@ -2520,6 +2572,7 @@ impl ChainListener for ChannelManager { self.finish_force_close_channel(failure); } self.latest_block_height.fetch_sub(1, Ordering::AcqRel); + *self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header.bitcoin_hash(); } } @@ -2558,70 +2611,87 @@ macro_rules! handle_error { impl ChannelMessageHandler for ChannelManager { //TODO: Handle errors and close channel (or so) fn handle_open_channel(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_open_channel(their_node_id, msg), their_node_id) } fn handle_accept_channel(&self, their_node_id: &PublicKey, msg: &msgs::AcceptChannel) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_accept_channel(their_node_id, msg), their_node_id) } fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_funding_created(their_node_id, msg), their_node_id) } fn handle_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_funding_signed(their_node_id, msg), their_node_id) } fn handle_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_funding_locked(their_node_id, msg), their_node_id) } fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_shutdown(their_node_id, msg), their_node_id) } fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_closing_signed(their_node_id, msg), their_node_id) } fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) -> Result<(), msgs::HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_update_add_htlc(their_node_id, msg), their_node_id) } fn handle_update_fulfill_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_update_fulfill_htlc(their_node_id, msg), their_node_id) } fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_update_fail_htlc(their_node_id, msg), their_node_id) } fn handle_update_fail_malformed_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_update_fail_malformed_htlc(their_node_id, msg), their_node_id) } fn handle_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_commitment_signed(their_node_id, msg), their_node_id) } fn handle_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_revoke_and_ack(their_node_id, msg), their_node_id) } fn handle_update_fee(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFee) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_update_fee(their_node_id, msg), their_node_id) } fn handle_announcement_signatures(&self, their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_announcement_signatures(their_node_id, msg), their_node_id) } fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), HandleError> { + let _ = self.total_consistency_lock.read().unwrap(); handle_error!(self, self.internal_channel_reestablish(their_node_id, msg), their_node_id) } fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) { + let _ = self.total_consistency_lock.read().unwrap(); let mut failed_channels = Vec::new(); let mut failed_payments = Vec::new(); { @@ -2677,6 +2747,7 @@ impl ChannelMessageHandler for ChannelManager { } fn peer_connected(&self, their_node_id: &PublicKey) { + let _ = self.total_consistency_lock.read().unwrap(); let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = channel_state_lock.borrow_parts(); let pending_msg_events = channel_state.pending_msg_events; @@ -2701,6 +2772,8 @@ impl ChannelMessageHandler for ChannelManager { } fn handle_error(&self, their_node_id: &PublicKey, msg: &msgs::ErrorMessage) { + let _ = self.total_consistency_lock.read().unwrap(); + if msg.channel_id == [0; 32] { for chan in self.list_channels() { if chan.remote_network_id == *their_node_id { @@ -2713,6 +2786,391 @@ impl ChannelMessageHandler for ChannelManager { } } +const SERIALIZATION_VERSION: u8 = 1; +const MIN_SERIALIZATION_VERSION: u8 = 1; + +impl Writeable for PendingForwardHTLCInfo { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + if let &Some(ref onion) = &self.onion_packet { + 1u8.write(writer)?; + onion.write(writer)?; + } else { + 0u8.write(writer)?; + } + self.incoming_shared_secret.write(writer)?; + self.payment_hash.write(writer)?; + self.short_channel_id.write(writer)?; + self.amt_to_forward.write(writer)?; + self.outgoing_cltv_value.write(writer)?; + Ok(()) + } +} + +impl Readable for PendingForwardHTLCInfo { + fn read(reader: &mut R) -> Result { + let onion_packet = match >::read(reader)? { + 0 => None, + 1 => Some(msgs::OnionPacket::read(reader)?), + _ => return Err(DecodeError::InvalidValue), + }; + Ok(PendingForwardHTLCInfo { + onion_packet, + incoming_shared_secret: Readable::read(reader)?, + payment_hash: Readable::read(reader)?, + short_channel_id: Readable::read(reader)?, + amt_to_forward: Readable::read(reader)?, + outgoing_cltv_value: Readable::read(reader)?, + }) + } +} + +impl Writeable for HTLCFailureMsg { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + match self { + &HTLCFailureMsg::Relay(ref fail_msg) => { + 0u8.write(writer)?; + fail_msg.write(writer)?; + }, + &HTLCFailureMsg::Malformed(ref fail_msg) => { + 1u8.write(writer)?; + fail_msg.write(writer)?; + } + } + Ok(()) + } +} + +impl Readable for HTLCFailureMsg { + fn read(reader: &mut R) -> Result { + match >::read(reader)? { + 0 => Ok(HTLCFailureMsg::Relay(Readable::read(reader)?)), + 1 => Ok(HTLCFailureMsg::Malformed(Readable::read(reader)?)), + _ => Err(DecodeError::InvalidValue), + } + } +} + +impl Writeable for PendingHTLCStatus { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + match self { + &PendingHTLCStatus::Forward(ref forward_info) => { + 0u8.write(writer)?; + forward_info.write(writer)?; + }, + &PendingHTLCStatus::Fail(ref fail_msg) => { + 1u8.write(writer)?; + fail_msg.write(writer)?; + } + } + Ok(()) + } +} + +impl Readable for PendingHTLCStatus { + fn read(reader: &mut R) -> Result { + match >::read(reader)? { + 0 => Ok(PendingHTLCStatus::Forward(Readable::read(reader)?)), + 1 => Ok(PendingHTLCStatus::Fail(Readable::read(reader)?)), + _ => Err(DecodeError::InvalidValue), + } + } +} + +impl_writeable!(HTLCPreviousHopData, 0, { + short_channel_id, + htlc_id, + incoming_packet_shared_secret +}); + +impl Writeable for HTLCSource { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + match self { + &HTLCSource::PreviousHopData(ref hop_data) => { + 0u8.write(writer)?; + hop_data.write(writer)?; + }, + &HTLCSource::OutboundRoute { ref route, ref session_priv, ref first_hop_htlc_msat } => { + 1u8.write(writer)?; + route.write(writer)?; + session_priv.write(writer)?; + first_hop_htlc_msat.write(writer)?; + } + } + Ok(()) + } +} + +impl Readable for HTLCSource { + fn read(reader: &mut R) -> Result { + match >::read(reader)? { + 0 => Ok(HTLCSource::PreviousHopData(Readable::read(reader)?)), + 1 => Ok(HTLCSource::OutboundRoute { + route: Readable::read(reader)?, + session_priv: Readable::read(reader)?, + first_hop_htlc_msat: Readable::read(reader)?, + }), + _ => Err(DecodeError::InvalidValue), + } + } +} + +impl Writeable for HTLCFailReason { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + match self { + &HTLCFailReason::ErrorPacket { ref err } => { + 0u8.write(writer)?; + err.write(writer)?; + }, + &HTLCFailReason::Reason { ref failure_code, ref data } => { + 1u8.write(writer)?; + failure_code.write(writer)?; + data.write(writer)?; + } + } + Ok(()) + } +} + +impl Readable for HTLCFailReason { + fn read(reader: &mut R) -> Result { + match >::read(reader)? { + 0 => Ok(HTLCFailReason::ErrorPacket { err: Readable::read(reader)? }), + 1 => Ok(HTLCFailReason::Reason { + failure_code: Readable::read(reader)?, + data: Readable::read(reader)?, + }), + _ => Err(DecodeError::InvalidValue), + } + } +} + +impl_writeable!(HTLCForwardInfo, 0, { + prev_short_channel_id, + prev_htlc_id, + forward_info +}); + +impl Writeable for ChannelManager { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + let _ = self.total_consistency_lock.write().unwrap(); + + writer.write_all(&[SERIALIZATION_VERSION; 1])?; + writer.write_all(&[MIN_SERIALIZATION_VERSION; 1])?; + + self.genesis_hash.write(writer)?; + self.announce_channels_publicly.write(writer)?; + self.fee_proportional_millionths.write(writer)?; + (self.latest_block_height.load(Ordering::Acquire) as u32).write(writer)?; + self.last_block_hash.lock().unwrap().write(writer)?; + + let channel_state = self.channel_state.lock().unwrap(); + let mut unfunded_channels = 0; + for (_, channel) in channel_state.by_id.iter() { + if !channel.is_funding_initiated() { + unfunded_channels += 1; + } + } + ((channel_state.by_id.len() - unfunded_channels) as u64).write(writer)?; + for (_, channel) in channel_state.by_id.iter() { + if channel.is_funding_initiated() { + channel.write(writer)?; + } + } + + (channel_state.forward_htlcs.len() as u64).write(writer)?; + for (short_channel_id, pending_forwards) in channel_state.forward_htlcs.iter() { + short_channel_id.write(writer)?; + (pending_forwards.len() as u64).write(writer)?; + for forward in pending_forwards { + forward.write(writer)?; + } + } + + (channel_state.claimable_htlcs.len() as u64).write(writer)?; + for (payment_hash, previous_hops) in channel_state.claimable_htlcs.iter() { + payment_hash.write(writer)?; + (previous_hops.len() as u64).write(writer)?; + for previous_hop in previous_hops { + previous_hop.write(writer)?; + } + } + + Ok(()) + } +} + +/// Arguments for the creation of a ChannelManager that are not deserialized. +/// +/// At a high-level, the process for deserializing a ChannelManager and resuming normal operation +/// is: +/// 1) Deserialize all stored ChannelMonitors. +/// 2) Deserialize the ChannelManager by filling in this struct and calling <(Sha256dHash, +/// ChannelManager)>::read(reader, args). +/// This may result in closing some Channels if the ChannelMonitor is newer than the stored +/// ChannelManager state to ensure no loss of funds. Thus, transactions may be broadcasted. +/// 3) Register all relevant ChannelMonitor outpoints with your chain watch mechanism using +/// ChannelMonitor::get_monitored_outpoints and ChannelMonitor::get_funding_txo(). +/// 4) Reconnect blocks on your ChannelMonitors. +/// 5) Move the ChannelMonitors into your local ManyChannelMonitor. +/// 6) Disconnect/connect blocks on the ChannelManager. +/// 7) Register the new ChannelManager with your ChainWatchInterface (this does not happen +/// automatically as it does in ChannelManager::new()). +pub struct ChannelManagerReadArgs<'a> { + /// The keys provider which will give us relevant keys. Some keys will be loaded during + /// deserialization. + pub keys_manager: Arc, + + /// The fee_estimator for use in the ChannelManager in the future. + /// + /// No calls to the FeeEstimator will be made during deserialization. + pub fee_estimator: Arc, + /// The ManyChannelMonitor for use in the ChannelManager in the future. + /// + /// No calls to the ManyChannelMonitor will be made during deserialization. It is assumed that + /// you have deserialized ChannelMonitors separately and will add them to your + /// ManyChannelMonitor after deserializing this ChannelManager. + pub monitor: Arc, + /// The ChainWatchInterface for use in the ChannelManager in the future. + /// + /// No calls to the ChainWatchInterface will be made during deserialization. + pub chain_monitor: Arc, + /// The BroadcasterInterface which will be used in the ChannelManager in the future and may be + /// used to broadcast the latest local commitment transactions of channels which must be + /// force-closed during deserialization. + pub tx_broadcaster: Arc, + /// The Logger for use in the ChannelManager and which may be used to log information during + /// deserialization. + pub logger: Arc, + + + /// A map from channel funding outpoints to ChannelMonitors for those channels (ie + /// value.get_funding_txo() should be the key). + /// + /// If a monitor is inconsistent with the channel state during deserialization the channel will + /// be force-closed using the data in the channelmonitor and the Channel will be dropped. This + /// is true for missing channels as well. If there is a monitor missing for which we find + /// channel data Err(DecodeError::InvalidValue) will be returned. + /// + /// In such cases the latest local transactions will be sent to the tx_broadcaster included in + /// this struct. + pub channel_monitors: &'a HashMap, +} + +impl<'a, R : ::std::io::Read> ReadableArgs> for (Sha256dHash, ChannelManager) { + fn read(reader: &mut R, args: ChannelManagerReadArgs<'a>) -> Result { + let _ver: u8 = Readable::read(reader)?; + let min_ver: u8 = Readable::read(reader)?; + if min_ver > SERIALIZATION_VERSION { + return Err(DecodeError::UnknownVersion); + } + + let genesis_hash: Sha256dHash = Readable::read(reader)?; + let announce_channels_publicly: bool = Readable::read(reader)?; + let fee_proportional_millionths: u32 = Readable::read(reader)?; + let latest_block_height: u32 = Readable::read(reader)?; + let last_block_hash: Sha256dHash = Readable::read(reader)?; + + let mut closed_channels = Vec::new(); + + let channel_count: u64 = Readable::read(reader)?; + let mut funding_txo_set = HashSet::with_capacity(cmp::min(channel_count as usize, 128)); + let mut by_id = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); + let mut short_to_id = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); + for _ in 0..channel_count { + let mut channel: Channel = ReadableArgs::read(reader, args.logger.clone())?; + if channel.last_block_connected != last_block_hash { + return Err(DecodeError::InvalidValue); + } + + let funding_txo = channel.channel_monitor().get_funding_txo().ok_or(DecodeError::InvalidValue)?; + funding_txo_set.insert(funding_txo.clone()); + if let Some(monitor) = args.channel_monitors.get(&funding_txo) { + if channel.get_cur_local_commitment_transaction_number() != monitor.get_cur_local_commitment_number() || + channel.get_revoked_remote_commitment_transaction_number() != monitor.get_min_seen_secret() || + channel.get_cur_remote_commitment_transaction_number() != monitor.get_cur_remote_commitment_number() { + let mut force_close_res = channel.force_shutdown(); + force_close_res.0 = monitor.get_latest_local_commitment_txn(); + closed_channels.push(force_close_res); + } else { + if let Some(short_channel_id) = channel.get_short_channel_id() { + short_to_id.insert(short_channel_id, channel.channel_id()); + } + by_id.insert(channel.channel_id(), channel); + } + } else { + return Err(DecodeError::InvalidValue); + } + } + + for (ref funding_txo, ref monitor) in args.channel_monitors.iter() { + if !funding_txo_set.contains(funding_txo) { + closed_channels.push((monitor.get_latest_local_commitment_txn(), Vec::new())); + } + } + + let forward_htlcs_count: u64 = Readable::read(reader)?; + let mut forward_htlcs = HashMap::with_capacity(cmp::min(forward_htlcs_count as usize, 128)); + for _ in 0..forward_htlcs_count { + let short_channel_id = Readable::read(reader)?; + let pending_forwards_count: u64 = Readable::read(reader)?; + let mut pending_forwards = Vec::with_capacity(cmp::min(pending_forwards_count as usize, 128)); + for _ in 0..pending_forwards_count { + pending_forwards.push(Readable::read(reader)?); + } + forward_htlcs.insert(short_channel_id, pending_forwards); + } + + let claimable_htlcs_count: u64 = Readable::read(reader)?; + let mut claimable_htlcs = HashMap::with_capacity(cmp::min(claimable_htlcs_count as usize, 128)); + for _ in 0..claimable_htlcs_count { + let payment_hash = Readable::read(reader)?; + let previous_hops_len: u64 = Readable::read(reader)?; + let mut previous_hops = Vec::with_capacity(cmp::min(previous_hops_len as usize, 2)); + for _ in 0..previous_hops_len { + previous_hops.push(Readable::read(reader)?); + } + claimable_htlcs.insert(payment_hash, previous_hops); + } + + let channel_manager = ChannelManager { + genesis_hash, + fee_estimator: args.fee_estimator, + monitor: args.monitor, + chain_monitor: args.chain_monitor, + tx_broadcaster: args.tx_broadcaster, + + announce_channels_publicly, + fee_proportional_millionths, + latest_block_height: AtomicUsize::new(latest_block_height as usize), + last_block_hash: Mutex::new(last_block_hash), + secp_ctx: Secp256k1::new(), + + channel_state: Mutex::new(ChannelHolder { + by_id, + short_to_id, + next_forward: Instant::now(), + forward_htlcs, + claimable_htlcs, + pending_msg_events: Vec::new(), + }), + our_network_key: args.keys_manager.get_node_secret(), + + pending_events: Mutex::new(Vec::new()), + total_consistency_lock: RwLock::new(()), + keys_manager: args.keys_manager, + logger: args.logger, + }; + + for close_res in closed_channels.drain(..) { + channel_manager.finish_force_close_channel(close_res); + //TODO: Broadcast channel update for closed channels, but only after we've made a + //connection or two. + } + + Ok((last_block_hash.clone(), channel_manager)) + } +} + #[cfg(test)] mod tests { use chain::chaininterface; @@ -2720,8 +3178,8 @@ mod tests { use chain::chaininterface::ChainListener; use chain::keysinterface::KeysInterface; use chain::keysinterface; - use ln::channelmanager::{ChannelManager,OnionKeys,PaymentFailReason,RAACommitmentOrder}; - use ln::channelmonitor::{ChannelMonitorUpdateErr, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS}; + use ln::channelmanager::{ChannelManager,ChannelManagerReadArgs,OnionKeys,PaymentFailReason,RAACommitmentOrder}; + use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS, ManyChannelMonitor}; use ln::router::{Route, RouteHop, Router}; use ln::msgs; use ln::msgs::{ChannelMessageHandler,RoutingMessageHandler}; @@ -2729,7 +3187,7 @@ mod tests { use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; use util::errors::APIError; use util::logger::Logger; - use util::ser::Writeable; + use util::ser::{Writeable, Writer, ReadableArgs}; use bitcoin::util::hash::Sha256dHash; use bitcoin::blockdata::block::{Block, BlockHeader}; @@ -2889,22 +3347,22 @@ mod tests { // Returning Errors test vectors from BOLT 4 let onion_keys = build_test_onion_keys(); - let onion_error = ChannelManager::build_failure_packet(&onion_keys[4].shared_secret, 0x2002, &[0; 0]); + let onion_error = ChannelManager::build_failure_packet(&onion_keys[4].shared_secret[..], 0x2002, &[0; 0]); assert_eq!(onion_error.encode(), hex::decode("4c2fc8bc08510334b6833ad9c3e79cd1b52ae59dfe5c2a4b23ead50f09f7ee0b0002200200fe0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000").unwrap()); - let onion_packet_1 = ChannelManager::encrypt_failure_packet(&onion_keys[4].shared_secret, &onion_error.encode()[..]); + let onion_packet_1 = ChannelManager::encrypt_failure_packet(&onion_keys[4].shared_secret[..], &onion_error.encode()[..]); assert_eq!(onion_packet_1.data, hex::decode("a5e6bd0c74cb347f10cce367f949098f2457d14c046fd8a22cb96efb30b0fdcda8cb9168b50f2fd45edd73c1b0c8b33002df376801ff58aaa94000bf8a86f92620f343baef38a580102395ae3abf9128d1047a0736ff9b83d456740ebbb4aeb3aa9737f18fb4afb4aa074fb26c4d702f42968888550a3bded8c05247e045b866baef0499f079fdaeef6538f31d44deafffdfd3afa2fb4ca9082b8f1c465371a9894dd8c243fb4847e004f5256b3e90e2edde4c9fb3082ddfe4d1e734cacd96ef0706bf63c9984e22dc98851bcccd1c3494351feb458c9c6af41c0044bea3c47552b1d992ae542b17a2d0bba1a096c78d169034ecb55b6e3a7263c26017f033031228833c1daefc0dedb8cf7c3e37c9c37ebfe42f3225c326e8bcfd338804c145b16e34e4").unwrap()); - let onion_packet_2 = ChannelManager::encrypt_failure_packet(&onion_keys[3].shared_secret, &onion_packet_1.data[..]); + let onion_packet_2 = ChannelManager::encrypt_failure_packet(&onion_keys[3].shared_secret[..], &onion_packet_1.data[..]); assert_eq!(onion_packet_2.data, hex::decode("c49a1ce81680f78f5f2000cda36268de34a3f0a0662f55b4e837c83a8773c22aa081bab1616a0011585323930fa5b9fae0c85770a2279ff59ec427ad1bbff9001c0cd1497004bd2a0f68b50704cf6d6a4bf3c8b6a0833399a24b3456961ba00736785112594f65b6b2d44d9f5ea4e49b5e1ec2af978cbe31c67114440ac51a62081df0ed46d4a3df295da0b0fe25c0115019f03f15ec86fabb4c852f83449e812f141a9395b3f70b766ebbd4ec2fae2b6955bd8f32684c15abfe8fd3a6261e52650e8807a92158d9f1463261a925e4bfba44bd20b166d532f0017185c3a6ac7957adefe45559e3072c8dc35abeba835a8cb01a71a15c736911126f27d46a36168ca5ef7dccd4e2886212602b181463e0dd30185c96348f9743a02aca8ec27c0b90dca270").unwrap()); - let onion_packet_3 = ChannelManager::encrypt_failure_packet(&onion_keys[2].shared_secret, &onion_packet_2.data[..]); + let onion_packet_3 = ChannelManager::encrypt_failure_packet(&onion_keys[2].shared_secret[..], &onion_packet_2.data[..]); assert_eq!(onion_packet_3.data, hex::decode("a5d3e8634cfe78b2307d87c6d90be6fe7855b4f2cc9b1dfb19e92e4b79103f61ff9ac25f412ddfb7466e74f81b3e545563cdd8f5524dae873de61d7bdfccd496af2584930d2b566b4f8d3881f8c043df92224f38cf094cfc09d92655989531524593ec6d6caec1863bdfaa79229b5020acc034cd6deeea1021c50586947b9b8e6faa83b81fbfa6133c0af5d6b07c017f7158fa94f0d206baf12dda6b68f785b773b360fd0497e16cc402d779c8d48d0fa6315536ef0660f3f4e1865f5b38ea49c7da4fd959de4e83ff3ab686f059a45c65ba2af4a6a79166aa0f496bf04d06987b6d2ea205bdb0d347718b9aeff5b61dfff344993a275b79717cd815b6ad4c0beb568c4ac9c36ff1c315ec1119a1993c4b61e6eaa0375e0aaf738ac691abd3263bf937e3").unwrap()); - let onion_packet_4 = ChannelManager::encrypt_failure_packet(&onion_keys[1].shared_secret, &onion_packet_3.data[..]); + let onion_packet_4 = ChannelManager::encrypt_failure_packet(&onion_keys[1].shared_secret[..], &onion_packet_3.data[..]); assert_eq!(onion_packet_4.data, hex::decode("aac3200c4968f56b21f53e5e374e3a2383ad2b1b6501bbcc45abc31e59b26881b7dfadbb56ec8dae8857add94e6702fb4c3a4de22e2e669e1ed926b04447fc73034bb730f4932acd62727b75348a648a1128744657ca6a4e713b9b646c3ca66cac02cdab44dd3439890ef3aaf61708714f7375349b8da541b2548d452d84de7084bb95b3ac2345201d624d31f4d52078aa0fa05a88b4e20202bd2b86ac5b52919ea305a8949de95e935eed0319cf3cf19ebea61d76ba92532497fcdc9411d06bcd4275094d0a4a3c5d3a945e43305a5a9256e333e1f64dbca5fcd4e03a39b9012d197506e06f29339dfee3331995b21615337ae060233d39befea925cc262873e0530408e6990f1cbd233a150ef7b004ff6166c70c68d9f8c853c1abca640b8660db2921").unwrap()); - let onion_packet_5 = ChannelManager::encrypt_failure_packet(&onion_keys[0].shared_secret, &onion_packet_4.data[..]); + let onion_packet_5 = ChannelManager::encrypt_failure_packet(&onion_keys[0].shared_secret[..], &onion_packet_4.data[..]); assert_eq!(onion_packet_5.data, hex::decode("9c5add3963fc7f6ed7f148623c84134b5647e1306419dbe2174e523fa9e2fbed3a06a19f899145610741c83ad40b7712aefaddec8c6baf7325d92ea4ca4d1df8bce517f7e54554608bf2bd8071a4f52a7a2f7ffbb1413edad81eeea5785aa9d990f2865dc23b4bc3c301a94eec4eabebca66be5cf638f693ec256aec514620cc28ee4a94bd9565bc4d4962b9d3641d4278fb319ed2b84de5b665f307a2db0f7fbb757366067d88c50f7e829138fde4f78d39b5b5802f1b92a8a820865af5cc79f9f30bc3f461c66af95d13e5e1f0381c184572a91dee1c849048a647a1158cf884064deddbf1b0b88dfe2f791428d0ba0f6fb2f04e14081f69165ae66d9297c118f0907705c9c4954a199bae0bb96fad763d690e7daa6cfda59ba7f2c8d11448b604d12d").unwrap()); } @@ -2924,6 +3382,7 @@ mod tests { chan_monitor: Arc, node: Arc, router: Router, + node_seed: [u8; 32], network_payment_count: Rc>, network_chan_count: Rc>, } @@ -3595,7 +4054,7 @@ mod tests { let chan_monitor = Arc::new(test_utils::TestChannelMonitor::new(chain_monitor.clone(), tx_broadcaster.clone())); let node = ChannelManager::new(0, true, Network::Testnet, feeest.clone(), chan_monitor.clone(), chain_monitor.clone(), tx_broadcaster.clone(), Arc::clone(&logger), keys_manager.clone()).unwrap(); let router = Router::new(PublicKey::from_secret_key(&secp_ctx, &keys_manager.get_node_secret()), chain_monitor.clone(), Arc::clone(&logger)); - nodes.push(Node { chain_monitor, tx_broadcaster, chan_monitor, node, router, + nodes.push(Node { chain_monitor, tx_broadcaster, chan_monitor, node, router, node_seed: seed, network_payment_count: payment_count.clone(), network_chan_count: chan_count.clone(), }); @@ -6398,4 +6857,137 @@ mod tests { sign_msg!(unsigned_msg); assert!(nodes[0].router.handle_channel_announcement(&chan_announcement).is_err()); } + + struct VecWriter(Vec); + impl Writer for VecWriter { + fn write_all(&mut self, buf: &[u8]) -> Result<(), ::std::io::Error> { + self.0.extend_from_slice(buf); + Ok(()) + } + fn size_hint(&mut self, size: usize) { + self.0.reserve_exact(size); + } + } + + #[test] + fn test_simple_manager_serialize_deserialize() { + let mut nodes = create_network(2); + create_announced_chan_between_nodes(&nodes, 0, 1); + + let (our_payment_preimage, _) = route_payment(&nodes[0], &[&nodes[1]], 1000000); + let (_, our_payment_hash) = route_payment(&nodes[0], &[&nodes[1]], 1000000); + + nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); + + let nodes_0_serialized = nodes[0].node.encode(); + let mut chan_0_monitor_serialized = VecWriter(Vec::new()); + nodes[0].chan_monitor.simple_monitor.monitors.lock().unwrap().iter().next().unwrap().1.write_for_disk(&mut chan_0_monitor_serialized).unwrap(); + + nodes[0].chan_monitor = Arc::new(test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone())); + let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..]; + let (_, chan_0_monitor) = <(Sha256dHash, ChannelMonitor)>::read(&mut chan_0_monitor_read, Arc::new(test_utils::TestLogger::new())).unwrap(); + assert!(chan_0_monitor_read.is_empty()); + + let mut nodes_0_read = &nodes_0_serialized[..]; + let keys_manager = Arc::new(keysinterface::KeysManager::new(&nodes[0].node_seed, Network::Testnet, Arc::new(test_utils::TestLogger::new()))); + let (_, nodes_0_deserialized) = { + let mut channel_monitors = HashMap::new(); + channel_monitors.insert(chan_0_monitor.get_funding_txo().unwrap(), &chan_0_monitor); + <(Sha256dHash, ChannelManager)>::read(&mut nodes_0_read, ChannelManagerReadArgs { + keys_manager, + fee_estimator: Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 }), + monitor: nodes[0].chan_monitor.clone(), + chain_monitor: nodes[0].chain_monitor.clone(), + tx_broadcaster: nodes[0].tx_broadcaster.clone(), + logger: Arc::new(test_utils::TestLogger::new()), + channel_monitors: &channel_monitors, + }).unwrap() + }; + assert!(nodes_0_read.is_empty()); + + assert!(nodes[0].chan_monitor.add_update_monitor(chan_0_monitor.get_funding_txo().unwrap(), chan_0_monitor).is_ok()); + nodes[0].node = Arc::new(nodes_0_deserialized); + check_added_monitors!(nodes[0], 1); + + reconnect_nodes(&nodes[0], &nodes[1], false, (0, 0), (0, 0), (0, 0), (0, 0), (false, false)); + + fail_payment(&nodes[0], &[&nodes[1]], our_payment_hash); + claim_payment(&nodes[0], &[&nodes[1]], our_payment_preimage); + } + + #[test] + fn test_manager_serialize_deserialize_inconsistent_monitor() { + // Test deserializing a ChannelManager with a out-of-date ChannelMonitor + let mut nodes = create_network(4); + create_announced_chan_between_nodes(&nodes, 0, 1); + create_announced_chan_between_nodes(&nodes, 2, 0); + let (_, _, channel_id, funding_tx) = create_announced_chan_between_nodes(&nodes, 0, 3); + + let (our_payment_preimage, _) = route_payment(&nodes[2], &[&nodes[0], &nodes[1]], 1000000); + + // Serialize the ChannelManager here, but the monitor we keep up-to-date + let nodes_0_serialized = nodes[0].node.encode(); + + route_payment(&nodes[0], &[&nodes[3]], 1000000); + nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); + nodes[2].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); + nodes[3].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); + + // Now the ChannelMonitor (which is now out-of-sync with ChannelManager for channel w/ + // nodes[3]) + let mut node_0_monitors_serialized = Vec::new(); + for monitor in nodes[0].chan_monitor.simple_monitor.monitors.lock().unwrap().iter() { + let mut writer = VecWriter(Vec::new()); + monitor.1.write_for_disk(&mut writer).unwrap(); + node_0_monitors_serialized.push(writer.0); + } + + nodes[0].chan_monitor = Arc::new(test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone())); + let mut node_0_monitors = Vec::new(); + for serialized in node_0_monitors_serialized.iter() { + let mut read = &serialized[..]; + let (_, monitor) = <(Sha256dHash, ChannelMonitor)>::read(&mut read, Arc::new(test_utils::TestLogger::new())).unwrap(); + assert!(read.is_empty()); + node_0_monitors.push(monitor); + } + + let mut nodes_0_read = &nodes_0_serialized[..]; + let keys_manager = Arc::new(keysinterface::KeysManager::new(&nodes[0].node_seed, Network::Testnet, Arc::new(test_utils::TestLogger::new()))); + let (_, nodes_0_deserialized) = <(Sha256dHash, ChannelManager)>::read(&mut nodes_0_read, ChannelManagerReadArgs { + keys_manager, + fee_estimator: Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 }), + monitor: nodes[0].chan_monitor.clone(), + chain_monitor: nodes[0].chain_monitor.clone(), + tx_broadcaster: nodes[0].tx_broadcaster.clone(), + logger: Arc::new(test_utils::TestLogger::new()), + channel_monitors: &node_0_monitors.iter().map(|monitor| { (monitor.get_funding_txo().unwrap(), monitor) }).collect(), + }).unwrap(); + assert!(nodes_0_read.is_empty()); + + { // Channel close should result in a commitment tx and an HTLC tx + let txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap(); + assert_eq!(txn.len(), 2); + assert_eq!(txn[0].input[0].previous_output.txid, funding_tx.txid()); + assert_eq!(txn[1].input[0].previous_output.txid, txn[0].txid()); + } + + for monitor in node_0_monitors.drain(..) { + assert!(nodes[0].chan_monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor).is_ok()); + check_added_monitors!(nodes[0], 1); + } + nodes[0].node = Arc::new(nodes_0_deserialized); + + // nodes[1] and nodes[2] have no lost state with nodes[0]... + reconnect_nodes(&nodes[0], &nodes[1], false, (0, 0), (0, 0), (0, 0), (0, 0), (false, false)); + reconnect_nodes(&nodes[0], &nodes[2], false, (0, 0), (0, 0), (0, 0), (0, 0), (false, false)); + //... and we can even still claim the payment! + claim_payment(&nodes[2], &[&nodes[0], &nodes[1]], our_payment_preimage); + + nodes[3].node.peer_connected(&nodes[0].node.get_our_node_id()); + let reestablish = get_event_msg!(nodes[3], MessageSendEvent::SendChannelReestablish, nodes[0].node.get_our_node_id()); + nodes[0].node.peer_connected(&nodes[3].node.get_our_node_id()); + if let Err(msgs::HandleError { action: Some(msgs::ErrorAction::SendErrorMessage { msg }), .. }) = nodes[0].node.handle_channel_reestablish(&nodes[3].node.get_our_node_id(), &reestablish) { + assert_eq!(msg.channel_id, channel_id); + } else { panic!("Unexpected result"); } + } } diff --git a/src/ln/channelmonitor.rs b/src/ln/channelmonitor.rs index 5adfe926..4972ae32 100644 --- a/src/ln/channelmonitor.rs +++ b/src/ln/channelmonitor.rs @@ -16,6 +16,8 @@ use bitcoin::blockdata::transaction::{TxIn,TxOut,SigHashType,Transaction}; use bitcoin::blockdata::transaction::OutPoint as BitcoinOutPoint; use bitcoin::blockdata::script::Script; use bitcoin::network::serialize; +use bitcoin::network::serialize::BitcoinHash; +use bitcoin::network::encodable::{ConsensusDecodable, ConsensusEncodable}; use bitcoin::util::hash::Sha256dHash; use bitcoin::util::bip143; @@ -31,7 +33,8 @@ use ln::chan_utils::HTLCOutputInCommitment; use chain::chaininterface::{ChainListener, ChainWatchInterface, BroadcasterInterface}; use chain::transaction::OutPoint; use chain::keysinterface::SpendableOutputDescriptor; -use util::ser::{Readable, Writer}; +use util::logger::Logger; +use util::ser::{ReadableArgs, Readable, Writer, Writeable, WriterWriteAdaptor, U48}; use util::sha2::Sha256; use util::{byte_utils, events}; @@ -112,12 +115,13 @@ pub struct SimpleManyChannelMonitor { } impl ChainListener for SimpleManyChannelMonitor { - fn block_connected(&self, _header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) { + fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) { + let block_hash = header.bitcoin_hash(); let mut new_events: Vec = Vec::with_capacity(0); { - let monitors = self.monitors.lock().unwrap(); - for monitor in monitors.values() { - let (txn_outputs, spendable_outputs) = monitor.block_connected(txn_matched, height, &*self.broadcaster); + let mut monitors = self.monitors.lock().unwrap(); + for monitor in monitors.values_mut() { + let (txn_outputs, spendable_outputs) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster); if spendable_outputs.len() > 0 { new_events.push(events::Event::SpendableOutputs { outputs: spendable_outputs, @@ -239,6 +243,7 @@ const MIN_SERIALIZATION_VERSION: u8 = 1; /// /// You MUST ensure that no ChannelMonitors for a given channel anywhere contain out-of-date /// information and are actively monitoring the chain. +#[derive(Clone)] pub struct ChannelMonitor { funding_txo: Option<(OutPoint, Script)>, commitment_transaction_number_obscure_factor: u64, @@ -259,7 +264,7 @@ pub struct ChannelMonitor { /// spending. Thus, in order to claim them via revocation key, we track all the remote /// commitment transactions which we find on-chain, mapping them to the commitment number which /// can be used to derive the revocation key and claim the transactions. - remote_commitment_txn_on_chain: Mutex>, + remote_commitment_txn_on_chain: HashMap)>, /// Cache used to make pruning of payment_preimages faster. /// Maps payment_hash values to commitment numbers for remote transactions for non-revoked /// remote transactions (ie should remain pretty small). @@ -273,39 +278,22 @@ pub struct ChannelMonitor { prev_local_signed_commitment_tx: Option, current_local_signed_commitment_tx: Option, + // Used just for ChannelManager to make sure it has the latest channel data during + // deserialization + current_remote_commitment_number: u64, + payment_preimages: HashMap<[u8; 32], [u8; 32]>, destination_script: Script, - secp_ctx: Secp256k1, //TODO: dedup this a bit... -} -impl Clone for ChannelMonitor { - fn clone(&self) -> Self { - ChannelMonitor { - funding_txo: self.funding_txo.clone(), - commitment_transaction_number_obscure_factor: self.commitment_transaction_number_obscure_factor.clone(), - - key_storage: self.key_storage.clone(), - their_htlc_base_key: self.their_htlc_base_key.clone(), - their_delayed_payment_base_key: self.their_delayed_payment_base_key.clone(), - their_cur_revocation_points: self.their_cur_revocation_points.clone(), - - our_to_self_delay: self.our_to_self_delay, - their_to_self_delay: self.their_to_self_delay, - - old_secrets: self.old_secrets.clone(), - remote_claimable_outpoints: self.remote_claimable_outpoints.clone(), - remote_commitment_txn_on_chain: Mutex::new((*self.remote_commitment_txn_on_chain.lock().unwrap()).clone()), - remote_hash_commitment_number: self.remote_hash_commitment_number.clone(), - prev_local_signed_commitment_tx: self.prev_local_signed_commitment_tx.clone(), - current_local_signed_commitment_tx: self.current_local_signed_commitment_tx.clone(), - - payment_preimages: self.payment_preimages.clone(), - - destination_script: self.destination_script.clone(), - secp_ctx: self.secp_ctx.clone(), - } - } + // We simply modify last_block_hash in Channel's block_connected so that serialization is + // consistent but hopefully the users' copy handles block_connected in a consistent way. + // (we do *not*, however, update them in insert_combine to ensure any local user copies keep + // their last_block_hash from its state and not based on updated copies that didn't run through + // the full block_connected). + pub(crate) last_block_hash: Sha256dHash, + secp_ctx: Secp256k1, //TODO: dedup this a bit... + logger: Arc, } #[cfg(any(test, feature = "fuzztarget"))] @@ -322,8 +310,10 @@ impl PartialEq for ChannelMonitor { self.our_to_self_delay != other.our_to_self_delay || self.their_to_self_delay != other.their_to_self_delay || self.remote_claimable_outpoints != other.remote_claimable_outpoints || + self.remote_commitment_txn_on_chain != other.remote_commitment_txn_on_chain || self.remote_hash_commitment_number != other.remote_hash_commitment_number || self.prev_local_signed_commitment_tx != other.prev_local_signed_commitment_tx || + self.current_remote_commitment_number != other.current_remote_commitment_number || self.current_local_signed_commitment_tx != other.current_local_signed_commitment_tx || self.payment_preimages != other.payment_preimages || self.destination_script != other.destination_script @@ -335,15 +325,13 @@ impl PartialEq for ChannelMonitor { return false } } - let us = self.remote_commitment_txn_on_chain.lock().unwrap(); - let them = other.remote_commitment_txn_on_chain.lock().unwrap(); - *us == *them + true } } } impl ChannelMonitor { - pub(super) fn new(revocation_base_key: &SecretKey, delayed_payment_base_key: &SecretKey, htlc_base_key: &SecretKey, our_to_self_delay: u16, destination_script: Script) -> ChannelMonitor { + pub(super) fn new(revocation_base_key: &SecretKey, delayed_payment_base_key: &SecretKey, htlc_base_key: &SecretKey, our_to_self_delay: u16, destination_script: Script, logger: Arc) -> ChannelMonitor { ChannelMonitor { funding_txo: None, commitment_transaction_number_obscure_factor: 0, @@ -364,16 +352,19 @@ impl ChannelMonitor { old_secrets: [([0; 32], 1 << 48); 49], remote_claimable_outpoints: HashMap::new(), - remote_commitment_txn_on_chain: Mutex::new(HashMap::new()), + remote_commitment_txn_on_chain: HashMap::new(), remote_hash_commitment_number: HashMap::new(), prev_local_signed_commitment_tx: None, current_local_signed_commitment_tx: None, + current_remote_commitment_number: 1 << 48, payment_preimages: HashMap::new(), - destination_script: destination_script, + + last_block_hash: Default::default(), secp_ctx: Secp256k1::new(), + logger, } } @@ -486,6 +477,7 @@ impl ChannelMonitor { self.remote_hash_commitment_number.insert(htlc.payment_hash, commitment_number); } self.remote_claimable_outpoints.insert(unsigned_commitment_tx.txid(), htlc_outputs); + self.current_remote_commitment_number = commitment_number; } /// Informs this monitor of the latest local (ie broadcastable) commitment transaction. The @@ -543,6 +535,8 @@ impl ChannelMonitor { if our_min_secret > other_min_secret { self.provide_secret(other_min_secret, other.get_secret(other_min_secret).unwrap(), None)?; } + // TODO: We should use current_remote_commitment_number and the commitment number out of + // local transactions to decide how to merge if our_min_secret >= other_min_secret { self.their_cur_revocation_points = other.their_cur_revocation_points; for (txid, htlcs) in other.remote_claimable_outpoints.drain() { @@ -556,6 +550,7 @@ impl ChannelMonitor { } self.payment_preimages = other.payment_preimages; } + self.current_remote_commitment_number = cmp::min(self.current_remote_commitment_number, other.current_remote_commitment_number); Ok(()) } @@ -597,6 +592,20 @@ impl ChannelMonitor { } } + /// Gets the sets of all outpoints which this ChannelMonitor expects to hear about spends of. + /// Generally useful when deserializing as during normal operation the return values of + /// block_connected are sufficient to ensure all relevant outpoints are being monitored (note + /// that the get_funding_txo outpoint and transaction must also be monitored for!). + pub fn get_monitored_outpoints(&self) -> Vec<(Sha256dHash, u32, &Script)> { + let mut res = Vec::with_capacity(self.remote_commitment_txn_on_chain.len() * 2); + for (ref txid, &(_, ref outputs)) in self.remote_commitment_txn_on_chain.iter() { + for (idx, output) in outputs.iter().enumerate() { + res.push(((*txid).clone(), idx as u32, output)); + } + } + res + } + /// Serializes into a vec, with various modes for the exposed pub fns fn write(&self, writer: &mut W, for_local_storage: bool) -> Result<(), ::std::io::Error> { //TODO: We still write out all the serialization here manually instead of using the fancy @@ -608,8 +617,7 @@ impl ChannelMonitor { &Some((ref outpoint, ref script)) => { writer.write_all(&outpoint.txid[..])?; writer.write_all(&byte_utils::be16_to_array(outpoint.index))?; - writer.write_all(&byte_utils::be64_to_array(script.len() as u64))?; - writer.write_all(&script[..])?; + script.write(writer)?; }, &None => { // We haven't even been initialized...not sure why anyone is serializing us, but @@ -619,7 +627,7 @@ impl ChannelMonitor { } // Set in initial Channel-object creation, so should always be set by now: - writer.write_all(&byte_utils::be48_to_array(self.commitment_transaction_number_obscure_factor))?; + U48(self.commitment_transaction_number_obscure_factor).write(writer)?; match self.key_storage { KeyStorage::PrivMode { ref revocation_base_key, ref htlc_base_key, ref delayed_payment_base_key, ref prev_latest_per_commitment_point, ref latest_per_commitment_point } => { @@ -684,7 +692,7 @@ impl ChannelMonitor { } writer.write_all(&byte_utils::be64_to_array(self.remote_claimable_outpoints.len() as u64))?; - for (txid, htlc_outputs) in self.remote_claimable_outpoints.iter() { + for (ref txid, ref htlc_outputs) in self.remote_claimable_outpoints.iter() { writer.write_all(&txid[..])?; writer.write_all(&byte_utils::be64_to_array(htlc_outputs.len() as u64))?; for htlc_output in htlc_outputs.iter() { @@ -692,19 +700,20 @@ impl ChannelMonitor { } } - { - let remote_commitment_txn_on_chain = self.remote_commitment_txn_on_chain.lock().unwrap(); - writer.write_all(&byte_utils::be64_to_array(remote_commitment_txn_on_chain.len() as u64))?; - for (txid, commitment_number) in remote_commitment_txn_on_chain.iter() { - writer.write_all(&txid[..])?; - writer.write_all(&byte_utils::be48_to_array(*commitment_number))?; + writer.write_all(&byte_utils::be64_to_array(self.remote_commitment_txn_on_chain.len() as u64))?; + for (ref txid, &(commitment_number, ref txouts)) in self.remote_commitment_txn_on_chain.iter() { + writer.write_all(&txid[..])?; + writer.write_all(&byte_utils::be48_to_array(commitment_number))?; + (txouts.len() as u64).write(writer)?; + for script in txouts.iter() { + script.write(writer)?; } } if for_local_storage { writer.write_all(&byte_utils::be64_to_array(self.remote_hash_commitment_number.len() as u64))?; - for (payment_hash, commitment_number) in self.remote_hash_commitment_number.iter() { - writer.write_all(payment_hash)?; + for (ref payment_hash, commitment_number) in self.remote_hash_commitment_number.iter() { + writer.write_all(*payment_hash)?; writer.write_all(&byte_utils::be48_to_array(*commitment_number))?; } } else { @@ -713,9 +722,12 @@ impl ChannelMonitor { macro_rules! serialize_local_tx { ($local_tx: expr) => { - let tx_ser = serialize::serialize(&$local_tx.tx).unwrap(); - writer.write_all(&byte_utils::be64_to_array(tx_ser.len() as u64))?; - writer.write_all(&tx_ser)?; + if let Err(e) = $local_tx.tx.consensus_encode(&mut serialize::RawEncoder::new(WriterWriteAdaptor(writer))) { + match e { + serialize::Error::Io(e) => return Err(e), + _ => panic!("local tx must have been well-formed!"), + } + } writer.write_all(&$local_tx.revocation_key.serialize())?; writer.write_all(&$local_tx.a_htlc_key.serialize())?; @@ -746,23 +758,41 @@ impl ChannelMonitor { writer.write_all(&[0; 1])?; } + if for_local_storage { + writer.write_all(&byte_utils::be48_to_array(self.current_remote_commitment_number))?; + } else { + writer.write_all(&byte_utils::be48_to_array(0))?; + } + writer.write_all(&byte_utils::be64_to_array(self.payment_preimages.len() as u64))?; for payment_preimage in self.payment_preimages.values() { writer.write_all(payment_preimage)?; } - writer.write_all(&byte_utils::be64_to_array(self.destination_script.len() as u64))?; - writer.write_all(&self.destination_script[..])?; + self.last_block_hash.write(writer)?; + self.destination_script.write(writer)?; Ok(()) } /// Writes this monitor into the given writer, suitable for writing to disk. + /// + /// Note that the deserializer is only implemented for (Sha256dHash, ChannelMonitor), which + /// tells you the last block hash which was block_connect()ed. You MUST rescan any blocks along + /// the "reorg path" (ie not just starting at the same height but starting at the highest + /// common block that appears on your best chain as well as on the chain which contains the + /// last block hash returned) upon deserializing the object! pub fn write_for_disk(&self, writer: &mut W) -> Result<(), ::std::io::Error> { self.write(writer, true) } /// Encodes this monitor into the given writer, suitable for sending to a remote watchtower + /// + /// Note that the deserializer is only implemented for (Sha256dHash, ChannelMonitor), which + /// tells you the last block hash which was block_connect()ed. You MUST rescan any blocks along + /// the "reorg path" (ie not just starting at the same height but starting at the highest + /// common block that appears on your best chain as well as on the chain which contains the + /// last block hash returned) upon deserializing the object! pub fn write_for_watchtower(&self, writer: &mut W) -> Result<(), ::std::io::Error> { self.write(writer, false) } @@ -792,11 +822,21 @@ impl ChannelMonitor { min } + pub(super) fn get_cur_remote_commitment_number(&self) -> u64 { + self.current_remote_commitment_number + } + + pub(super) fn get_cur_local_commitment_number(&self) -> u64 { + if let &Some(ref local_tx) = &self.current_local_signed_commitment_tx { + 0xffff_ffff_ffff - ((((local_tx.tx.input[0].sequence as u64 & 0xffffff) << 3*8) | (local_tx.tx.lock_time as u64 & 0xffffff)) ^ self.commitment_transaction_number_obscure_factor) + } else { 0xffff_ffff_ffff } + } + /// Attempts to claim a remote commitment transaction's outputs using the revocation key and /// data in remote_claimable_outpoints. Will directly claim any HTLC outputs which expire at a /// height > height + CLTV_SHARED_CLAIM_BUFFER. In any case, will install monitoring for /// HTLC-Success/HTLC-Timeout transactions. - fn check_spend_remote_transaction(&self, tx: &Transaction, height: u32) -> (Vec, (Sha256dHash, Vec), Vec) { + fn check_spend_remote_transaction(&mut self, tx: &Transaction, height: u32) -> (Vec, (Sha256dHash, Vec), Vec) { // Most secp and related errors trying to create keys means we have no hope of constructing // a spend transaction...so we return no transactions to broadcast let mut txn_to_broadcast = Vec::new(); @@ -936,7 +976,7 @@ impl ChannelMonitor { if !inputs.is_empty() || !txn_to_broadcast.is_empty() { // ie we're confident this is actually ours // We're definitely a remote commitment transaction! watch_outputs.append(&mut tx.output.clone()); - self.remote_commitment_txn_on_chain.lock().unwrap().insert(commitment_txid, commitment_number); + self.remote_commitment_txn_on_chain.insert(commitment_txid, (commitment_number, tx.output.iter().map(|output| { output.script_pubkey.clone() }).collect())); } if inputs.is_empty() { return (txn_to_broadcast, (commitment_txid, watch_outputs), spendable_outputs); } // Nothing to be done...probably a false positive/local tx @@ -973,7 +1013,7 @@ impl ChannelMonitor { // not being generated by the above conditional. Thus, to be safe, we go ahead and // insert it here. watch_outputs.append(&mut tx.output.clone()); - self.remote_commitment_txn_on_chain.lock().unwrap().insert(commitment_txid, commitment_number); + self.remote_commitment_txn_on_chain.insert(commitment_txid, (commitment_number, tx.output.iter().map(|output| { output.script_pubkey.clone() }).collect())); if let Some(revocation_points) = self.their_cur_revocation_points { let revocation_point_option = @@ -1276,7 +1316,24 @@ impl ChannelMonitor { (Vec::new(), Vec::new()) } - fn block_connected(&self, txn_matched: &[&Transaction], height: u32, broadcaster: &BroadcasterInterface)-> (Vec<(Sha256dHash, Vec)>, Vec) { + /// Used by ChannelManager deserialization to broadcast the latest local state if it's copy of + /// the Channel was out-of-date. + pub(super) fn get_latest_local_commitment_txn(&self) -> Vec { + if let &Some(ref local_tx) = &self.current_local_signed_commitment_tx { + let mut res = vec![local_tx.tx.clone()]; + match self.key_storage { + KeyStorage::PrivMode { ref delayed_payment_base_key, ref prev_latest_per_commitment_point, .. } => { + res.append(&mut self.broadcast_by_local_state(local_tx, prev_latest_per_commitment_point, &Some(*delayed_payment_base_key)).0); + }, + _ => panic!("Can only broadcast by local channelmonitor"), + }; + res + } else { + Vec::new() + } + } + + fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface)-> (Vec<(Sha256dHash, Vec)>, Vec) { let mut watch_outputs = Vec::new(); let mut spendable_outputs = Vec::new(); for tx in txn_matched { @@ -1300,9 +1357,8 @@ impl ChannelMonitor { txn = remote_txn; } } else { - let remote_commitment_txn_on_chain = self.remote_commitment_txn_on_chain.lock().unwrap(); - if let Some(commitment_number) = remote_commitment_txn_on_chain.get(&prevout.txid) { - let (tx, spendable_output) = self.check_spend_remote_htlc(tx, *commitment_number); + if let Some(&(commitment_number, _)) = self.remote_commitment_txn_on_chain.get(&prevout.txid) { + let (tx, spendable_output) = self.check_spend_remote_htlc(tx, commitment_number); if let Some(tx) = tx { txn.push(tx); } @@ -1337,6 +1393,7 @@ impl ChannelMonitor { } } } + self.last_block_hash = block_hash.clone(); (watch_outputs, spendable_outputs) } @@ -1373,27 +1430,10 @@ impl ChannelMonitor { } } -impl Readable for ChannelMonitor { - fn read(reader: &mut R) -> Result { - // TODO: read_to_end and then deserializing from that vector is really dumb, we should - // actually use the fancy serialization framework we have instead of hacking around it. - let mut datavec = Vec::new(); - reader.read_to_end(&mut datavec)?; - let data = &datavec; - - let mut read_pos = 0; - macro_rules! read_bytes { - ($byte_count: expr) => { - { - if ($byte_count as usize) > data.len() - read_pos { - return Err(DecodeError::ShortRead); - } - read_pos += $byte_count as usize; - &data[read_pos - $byte_count as usize..read_pos] - } - } - } +const MAX_ALLOC_SIZE: usize = 64*1024; +impl ReadableArgs> for (Sha256dHash, ChannelMonitor) { + fn read(reader: &mut R, logger: Arc) -> Result { let secp_ctx = Secp256k1::new(); macro_rules! unwrap_obj { ($key: expr) => { @@ -1404,8 +1444,8 @@ impl Readable for ChannelMonitor { } } - let _ver = read_bytes!(1)[0]; - let min_ver = read_bytes!(1)[0]; + let _ver: u8 = Readable::read(reader)?; + let min_ver: u8 = Readable::read(reader)?; if min_ver > SERIALIZATION_VERSION { return Err(DecodeError::UnknownVersion); } @@ -1413,31 +1453,26 @@ impl Readable for ChannelMonitor { // Technically this can fail and serialize fail a round-trip, but only for serialization of // barely-init'd ChannelMonitors that we can't do anything with. let outpoint = OutPoint { - txid: Sha256dHash::from(read_bytes!(32)), - index: byte_utils::slice_to_be16(read_bytes!(2)), + txid: Readable::read(reader)?, + index: Readable::read(reader)?, }; - let script_len = byte_utils::slice_to_be64(read_bytes!(8)); - let funding_txo = Some((outpoint, Script::from(read_bytes!(script_len).to_vec()))); - let commitment_transaction_number_obscure_factor = byte_utils::slice_to_be48(read_bytes!(6)); + let funding_txo = Some((outpoint, Readable::read(reader)?)); + let commitment_transaction_number_obscure_factor = >::read(reader)?.0; - let key_storage = match read_bytes!(1)[0] { + let key_storage = match >::read(reader)? { 0 => { - let revocation_base_key = unwrap_obj!(SecretKey::from_slice(&secp_ctx, read_bytes!(32))); - let htlc_base_key = unwrap_obj!(SecretKey::from_slice(&secp_ctx, read_bytes!(32))); - let delayed_payment_base_key = unwrap_obj!(SecretKey::from_slice(&secp_ctx, read_bytes!(32))); - let prev_latest_per_commitment_point = match read_bytes!(1)[0] { - 0 => None, - 1 => { - Some(unwrap_obj!(PublicKey::from_slice(&secp_ctx, read_bytes!(33)))) - }, - _ => return Err(DecodeError::InvalidValue), + let revocation_base_key = Readable::read(reader)?; + let htlc_base_key = Readable::read(reader)?; + let delayed_payment_base_key = Readable::read(reader)?; + let prev_latest_per_commitment_point = match >::read(reader)? { + 0 => None, + 1 => Some(Readable::read(reader)?), + _ => return Err(DecodeError::InvalidValue), }; - let latest_per_commitment_point = match read_bytes!(1)[0] { - 0 => None, - 1 => { - Some(unwrap_obj!(PublicKey::from_slice(&secp_ctx, read_bytes!(33)))) - }, - _ => return Err(DecodeError::InvalidValue), + let latest_per_commitment_point = match >::read(reader)? { + 0 => None, + 1 => Some(Readable::read(reader)?), + _ => return Err(DecodeError::InvalidValue), }; KeyStorage::PrivMode { revocation_base_key, @@ -1450,45 +1485,41 @@ impl Readable for ChannelMonitor { _ => return Err(DecodeError::InvalidValue), }; - let their_htlc_base_key = Some(unwrap_obj!(PublicKey::from_slice(&secp_ctx, read_bytes!(33)))); - let their_delayed_payment_base_key = Some(unwrap_obj!(PublicKey::from_slice(&secp_ctx, read_bytes!(33)))); + let their_htlc_base_key = Some(Readable::read(reader)?); + let their_delayed_payment_base_key = Some(Readable::read(reader)?); let their_cur_revocation_points = { - let first_idx = byte_utils::slice_to_be48(read_bytes!(6)); + let first_idx = >::read(reader)?.0; if first_idx == 0 { None } else { - let first_point = unwrap_obj!(PublicKey::from_slice(&secp_ctx, read_bytes!(33))); - let second_point_slice = read_bytes!(33); + let first_point = Readable::read(reader)?; + let second_point_slice: [u8; 33] = Readable::read(reader)?; if second_point_slice[0..32] == [0; 32] && second_point_slice[32] == 0 { Some((first_idx, first_point, None)) } else { - Some((first_idx, first_point, Some(unwrap_obj!(PublicKey::from_slice(&secp_ctx, second_point_slice))))) + Some((first_idx, first_point, Some(unwrap_obj!(PublicKey::from_slice(&secp_ctx, &second_point_slice))))) } } }; - let our_to_self_delay = byte_utils::slice_to_be16(read_bytes!(2)); - let their_to_self_delay = Some(byte_utils::slice_to_be16(read_bytes!(2))); + let our_to_self_delay: u16 = Readable::read(reader)?; + let their_to_self_delay: Option = Some(Readable::read(reader)?); let mut old_secrets = [([0; 32], 1 << 48); 49]; for &mut (ref mut secret, ref mut idx) in old_secrets.iter_mut() { - secret.copy_from_slice(read_bytes!(32)); - *idx = byte_utils::slice_to_be64(read_bytes!(8)); + *secret = Readable::read(reader)?; + *idx = Readable::read(reader)?; } macro_rules! read_htlc_in_commitment { () => { { - let offered = match read_bytes!(1)[0] { - 0 => false, 1 => true, - _ => return Err(DecodeError::InvalidValue), - }; - let amount_msat = byte_utils::slice_to_be64(read_bytes!(8)); - let cltv_expiry = byte_utils::slice_to_be32(read_bytes!(4)); - let mut payment_hash = [0; 32]; - payment_hash[..].copy_from_slice(read_bytes!(32)); - let transaction_output_index = byte_utils::slice_to_be32(read_bytes!(4)); + let offered: bool = Readable::read(reader)?; + let amount_msat: u64 = Readable::read(reader)?; + let cltv_expiry: u32 = Readable::read(reader)?; + let payment_hash: [u8; 32] = Readable::read(reader)?; + let transaction_output_index: u32 = Readable::read(reader)?; HTLCOutputInCommitment { offered, amount_msat, cltv_expiry, payment_hash, transaction_output_index @@ -1497,14 +1528,12 @@ impl Readable for ChannelMonitor { } } - let remote_claimable_outpoints_len = byte_utils::slice_to_be64(read_bytes!(8)); - if remote_claimable_outpoints_len > data.len() as u64 / 64 { return Err(DecodeError::BadLengthDescriptor); } - let mut remote_claimable_outpoints = HashMap::with_capacity(remote_claimable_outpoints_len as usize); + let remote_claimable_outpoints_len: u64 = Readable::read(reader)?; + let mut remote_claimable_outpoints = HashMap::with_capacity(cmp::min(remote_claimable_outpoints_len as usize, MAX_ALLOC_SIZE / 64)); for _ in 0..remote_claimable_outpoints_len { - let txid = Sha256dHash::from(read_bytes!(32)); - let outputs_count = byte_utils::slice_to_be64(read_bytes!(8)); - if outputs_count > data.len() as u64 / 32 { return Err(DecodeError::BadLengthDescriptor); } - let mut outputs = Vec::with_capacity(outputs_count as usize); + let txid: Sha256dHash = Readable::read(reader)?; + let outputs_count: u64 = Readable::read(reader)?; + let mut outputs = Vec::with_capacity(cmp::min(outputs_count as usize, MAX_ALLOC_SIZE / 32)); for _ in 0..outputs_count { outputs.push(read_htlc_in_commitment!()); } @@ -1513,24 +1542,26 @@ impl Readable for ChannelMonitor { } } - let remote_commitment_txn_on_chain_len = byte_utils::slice_to_be64(read_bytes!(8)); - if remote_commitment_txn_on_chain_len > data.len() as u64 / 32 { return Err(DecodeError::BadLengthDescriptor); } - let mut remote_commitment_txn_on_chain = HashMap::with_capacity(remote_commitment_txn_on_chain_len as usize); + let remote_commitment_txn_on_chain_len: u64 = Readable::read(reader)?; + let mut remote_commitment_txn_on_chain = HashMap::with_capacity(cmp::min(remote_commitment_txn_on_chain_len as usize, MAX_ALLOC_SIZE / 32)); for _ in 0..remote_commitment_txn_on_chain_len { - let txid = Sha256dHash::from(read_bytes!(32)); - let commitment_number = byte_utils::slice_to_be48(read_bytes!(6)); - if let Some(_) = remote_commitment_txn_on_chain.insert(txid, commitment_number) { + let txid: Sha256dHash = Readable::read(reader)?; + let commitment_number = >::read(reader)?.0; + let outputs_count = >::read(reader)?; + let mut outputs = Vec::with_capacity(cmp::min(outputs_count as usize, MAX_ALLOC_SIZE / 8)); + for _ in 0..outputs_count { + outputs.push(Readable::read(reader)?); + } + if let Some(_) = remote_commitment_txn_on_chain.insert(txid, (commitment_number, outputs)) { return Err(DecodeError::InvalidValue); } } - let remote_hash_commitment_number_len = byte_utils::slice_to_be64(read_bytes!(8)); - if remote_hash_commitment_number_len > data.len() as u64 / 32 { return Err(DecodeError::BadLengthDescriptor); } - let mut remote_hash_commitment_number = HashMap::with_capacity(remote_hash_commitment_number_len as usize); + let remote_hash_commitment_number_len: u64 = Readable::read(reader)?; + let mut remote_hash_commitment_number = HashMap::with_capacity(cmp::min(remote_hash_commitment_number_len as usize, MAX_ALLOC_SIZE / 32)); for _ in 0..remote_hash_commitment_number_len { - let mut txid = [0; 32]; - txid[..].copy_from_slice(read_bytes!(32)); - let commitment_number = byte_utils::slice_to_be48(read_bytes!(6)); + let txid: [u8; 32] = Readable::read(reader)?; + let commitment_number = >::read(reader)?.0; if let Some(_) = remote_hash_commitment_number.insert(txid, commitment_number) { return Err(DecodeError::InvalidValue); } @@ -1539,29 +1570,29 @@ impl Readable for ChannelMonitor { macro_rules! read_local_tx { () => { { - let tx_len = byte_utils::slice_to_be64(read_bytes!(8)); - let tx_ser = read_bytes!(tx_len); - let tx: Transaction = unwrap_obj!(serialize::deserialize(tx_ser)); - if serialize::serialize(&tx).unwrap() != tx_ser { - // We check that the tx re-serializes to the same form to ensure there is - // no extra data, and as rust-bitcoin doesn't handle the 0-input ambiguity - // all that well. + let tx = match Transaction::consensus_decode(&mut serialize::RawDecoder::new(reader.by_ref())) { + Ok(tx) => tx, + Err(e) => match e { + serialize::Error::Io(ioe) => return Err(DecodeError::Io(ioe)), + _ => return Err(DecodeError::InvalidValue), + }, + }; + + if tx.input.is_empty() { + // Ensure tx didn't hit the 0-input ambiguity case. return Err(DecodeError::InvalidValue); } - let revocation_key = unwrap_obj!(PublicKey::from_slice(&secp_ctx, read_bytes!(33))); - let a_htlc_key = unwrap_obj!(PublicKey::from_slice(&secp_ctx, read_bytes!(33))); - let b_htlc_key = unwrap_obj!(PublicKey::from_slice(&secp_ctx, read_bytes!(33))); - let delayed_payment_key = unwrap_obj!(PublicKey::from_slice(&secp_ctx, read_bytes!(33))); - let feerate_per_kw = byte_utils::slice_to_be64(read_bytes!(8)); + let revocation_key = Readable::read(reader)?; + let a_htlc_key = Readable::read(reader)?; + let b_htlc_key = Readable::read(reader)?; + let delayed_payment_key = Readable::read(reader)?; + let feerate_per_kw: u64 = Readable::read(reader)?; - let htlc_outputs_len = byte_utils::slice_to_be64(read_bytes!(8)); - if htlc_outputs_len > data.len() as u64 / 128 { return Err(DecodeError::BadLengthDescriptor); } - let mut htlc_outputs = Vec::with_capacity(htlc_outputs_len as usize); + let htlc_outputs_len: u64 = Readable::read(reader)?; + let mut htlc_outputs = Vec::with_capacity(cmp::min(htlc_outputs_len as usize, MAX_ALLOC_SIZE / 128)); for _ in 0..htlc_outputs_len { - htlc_outputs.push((read_htlc_in_commitment!(), - unwrap_obj!(Signature::from_compact(&secp_ctx, read_bytes!(64))), - unwrap_obj!(Signature::from_compact(&secp_ctx, read_bytes!(64))))); + htlc_outputs.push((read_htlc_in_commitment!(), Readable::read(reader)?, Readable::read(reader)?)); } LocalSignedTx { @@ -1572,7 +1603,7 @@ impl Readable for ChannelMonitor { } } - let prev_local_signed_commitment_tx = match read_bytes!(1)[0] { + let prev_local_signed_commitment_tx = match >::read(reader)? { 0 => None, 1 => { Some(read_local_tx!()) @@ -1580,7 +1611,7 @@ impl Readable for ChannelMonitor { _ => return Err(DecodeError::InvalidValue), }; - let current_local_signed_commitment_tx = match read_bytes!(1)[0] { + let current_local_signed_commitment_tx = match >::read(reader)? { 0 => None, 1 => { Some(read_local_tx!()) @@ -1588,13 +1619,13 @@ impl Readable for ChannelMonitor { _ => return Err(DecodeError::InvalidValue), }; - let payment_preimages_len = byte_utils::slice_to_be64(read_bytes!(8)); - if payment_preimages_len > data.len() as u64 / 32 { return Err(DecodeError::InvalidValue); } - let mut payment_preimages = HashMap::with_capacity(payment_preimages_len as usize); + let current_remote_commitment_number = >::read(reader)?.0; + + let payment_preimages_len: u64 = Readable::read(reader)?; + let mut payment_preimages = HashMap::with_capacity(cmp::min(payment_preimages_len as usize, MAX_ALLOC_SIZE / 32)); let mut sha = Sha256::new(); for _ in 0..payment_preimages_len { - let mut preimage = [0; 32]; - preimage[..].copy_from_slice(read_bytes!(32)); + let preimage: [u8; 32] = Readable::read(reader)?; sha.reset(); sha.input(&preimage); let mut hash = [0; 32]; @@ -1604,10 +1635,10 @@ impl Readable for ChannelMonitor { } } - let destination_script_len = byte_utils::slice_to_be64(read_bytes!(8)); - let destination_script = Script::from(read_bytes!(destination_script_len).to_vec()); + let last_block_hash: Sha256dHash = Readable::read(reader)?; + let destination_script = Readable::read(reader)?; - Ok(ChannelMonitor { + Ok((last_block_hash.clone(), ChannelMonitor { funding_txo, commitment_transaction_number_obscure_factor, @@ -1621,17 +1652,20 @@ impl Readable for ChannelMonitor { old_secrets, remote_claimable_outpoints, - remote_commitment_txn_on_chain: Mutex::new(remote_commitment_txn_on_chain), + remote_commitment_txn_on_chain, remote_hash_commitment_number, prev_local_signed_commitment_tx, current_local_signed_commitment_tx, + current_remote_commitment_number, payment_preimages, destination_script, + last_block_hash, secp_ctx, - }) + logger, + })) } } @@ -1645,9 +1679,11 @@ mod tests { use ln::channelmonitor::ChannelMonitor; use ln::chan_utils::{HTLCOutputInCommitment, TxCreationKeys}; use util::sha2::Sha256; + use util::test_utils::TestLogger; use secp256k1::key::{SecretKey,PublicKey}; use secp256k1::{Secp256k1, Signature}; use rand::{thread_rng,Rng}; + use std::sync::Arc; #[test] fn test_per_commitment_storage() { @@ -1655,6 +1691,7 @@ mod tests { let mut secrets: Vec<[u8; 32]> = Vec::new(); let mut monitor: ChannelMonitor; let secp_ctx = Secp256k1::new(); + let logger = Arc::new(TestLogger::new()); macro_rules! test_secrets { () => { @@ -1670,7 +1707,7 @@ mod tests { { // insert_secret correct sequence - monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new()); + monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new(), logger.clone()); secrets.clear(); secrets.push([0; 32]); @@ -1716,7 +1753,7 @@ mod tests { { // insert_secret #1 incorrect - monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new()); + monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new(), logger.clone()); secrets.clear(); secrets.push([0; 32]); @@ -1732,7 +1769,7 @@ mod tests { { // insert_secret #2 incorrect (#1 derived from incorrect) - monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new()); + monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new(), logger.clone()); secrets.clear(); secrets.push([0; 32]); @@ -1758,7 +1795,7 @@ mod tests { { // insert_secret #3 incorrect - monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new()); + monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new(), logger.clone()); secrets.clear(); secrets.push([0; 32]); @@ -1784,7 +1821,7 @@ mod tests { { // insert_secret #4 incorrect (1,2,3 derived from incorrect) - monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new()); + monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new(), logger.clone()); secrets.clear(); secrets.push([0; 32]); @@ -1830,7 +1867,7 @@ mod tests { { // insert_secret #5 incorrect - monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new()); + monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new(), logger.clone()); secrets.clear(); secrets.push([0; 32]); @@ -1866,7 +1903,7 @@ mod tests { { // insert_secret #6 incorrect (5 derived from incorrect) - monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new()); + monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new(), logger.clone()); secrets.clear(); secrets.push([0; 32]); @@ -1912,7 +1949,7 @@ mod tests { { // insert_secret #7 incorrect - monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new()); + monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new(), logger.clone()); secrets.clear(); secrets.push([0; 32]); @@ -1958,7 +1995,7 @@ mod tests { { // insert_secret #8 incorrect - monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new()); + monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new(), logger.clone()); secrets.clear(); secrets.push([0; 32]); @@ -2006,6 +2043,7 @@ mod tests { #[test] fn test_prune_preimages() { let secp_ctx = Secp256k1::new(); + let logger = Arc::new(TestLogger::new()); let dummy_sig = Signature::from_der(&secp_ctx, &hex::decode("3045022100fa86fa9a36a8cd6a7bb8f06a541787d51371d067951a9461d5404de6b928782e02201c8b7c334c10aed8976a3a465be9a28abff4cb23acbf00022295b378ce1fa3cd").unwrap()[..]).unwrap(); macro_rules! dummy_keys { @@ -2076,7 +2114,7 @@ mod tests { // Prune with one old state and a local commitment tx holding a few overlaps with the // old state. - let mut monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new()); + let mut monitor = ChannelMonitor::new(&SecretKey::from_slice(&secp_ctx, &[42; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[43; 32]).unwrap(), &SecretKey::from_slice(&secp_ctx, &[44; 32]).unwrap(), 0, Script::new(), logger.clone()); monitor.set_their_to_self_delay(10); monitor.provide_latest_local_commitment_tx_info(dummy_tx.clone(), dummy_keys!(), 0, preimages_to_local_htlcs!(preimages[0..10])); diff --git a/src/ln/router.rs b/src/ln/router.rs index 0d049f5a..9fe2cfb5 100644 --- a/src/ln/router.rs +++ b/src/ln/router.rs @@ -13,9 +13,9 @@ use bitcoin::blockdata::opcodes; use chain::chaininterface::{ChainError, ChainWatchInterface}; use ln::channelmanager; -use ln::msgs::{ErrorAction,HandleError,RoutingMessageHandler,NetAddress,GlobalFeatures}; +use ln::msgs::{DecodeError,ErrorAction,HandleError,RoutingMessageHandler,NetAddress,GlobalFeatures}; use ln::msgs; -use util::ser::Writeable; +use util::ser::{Writeable, Readable}; use util::logger::Logger; use std::cmp; @@ -47,6 +47,37 @@ pub struct Route { pub hops: Vec, } +impl Writeable for Route { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + (self.hops.len() as u8).write(writer)?; + for hop in self.hops.iter() { + hop.pubkey.write(writer)?; + hop.short_channel_id.write(writer)?; + hop.fee_msat.write(writer)?; + hop.cltv_expiry_delta.write(writer)?; + } + Ok(()) + } +} + +impl Readable for Route { + fn read(reader: &mut R) -> Result { + let hops_count: u8 = Readable::read(reader)?; + let mut hops = Vec::with_capacity(hops_count as usize); + for _ in 0..hops_count { + hops.push(RouteHop { + pubkey: Readable::read(reader)?, + short_channel_id: Readable::read(reader)?, + fee_msat: Readable::read(reader)?, + cltv_expiry_delta: Readable::read(reader)?, + }); + } + Ok(Route { + hops + }) + } +} + struct DirectionalChannelInfo { src_node_id: PublicKey, last_update: u32, diff --git a/src/util/ser.rs b/src/util/ser.rs index 92cc66b8..84cfa8d1 100644 --- a/src/util/ser.rs +++ b/src/util/ser.rs @@ -2,19 +2,19 @@ //! as ChannelsManagers and ChannelMonitors. use std::result::Result; -use std::io::Read; +use std::io::{Read, Write}; use std::collections::HashMap; use std::hash::Hash; use secp256k1::{Secp256k1, Signature}; -use secp256k1::key::PublicKey; +use secp256k1::key::{PublicKey, SecretKey}; use bitcoin::util::hash::Sha256dHash; use bitcoin::blockdata::script::Script; use std::marker::Sized; use ln::msgs::DecodeError; use util::byte_utils; -use util::byte_utils::{be64_to_array, be32_to_array, be16_to_array, slice_to_be16, slice_to_be32, slice_to_be64}; +use util::byte_utils::{be64_to_array, be48_to_array, be32_to_array, be16_to_array, slice_to_be16, slice_to_be32, slice_to_be48, slice_to_be64}; const MAX_BUF_SIZE: usize = 64 * 1024; @@ -30,7 +30,7 @@ pub trait Writer { fn size_hint(&mut self, size: usize); } -impl Writer for W { +impl Writer for W { #[inline] fn write_all(&mut self, buf: &[u8]) -> Result<(), ::std::io::Error> { ::write_all(self, buf) @@ -39,6 +39,20 @@ impl Writer for W { fn size_hint(&mut self, _size: usize) { } } +pub(crate) struct WriterWriteAdaptor<'a, W: Writer + 'a>(pub &'a mut W); +impl<'a, W: Writer + 'a> Write for WriterWriteAdaptor<'a, W> { + fn write_all(&mut self, buf: &[u8]) -> Result<(), ::std::io::Error> { + self.0.write_all(buf) + } + fn write(&mut self, buf: &[u8]) -> Result { + self.0.write_all(buf)?; + Ok(buf.len()) + } + fn flush(&mut self) -> Result<(), ::std::io::Error> { + Ok(()) + } +} + struct VecWriter(Vec); impl Writer for VecWriter { fn write_all(&mut self, buf: &[u8]) -> Result<(), ::std::io::Error> { @@ -82,6 +96,32 @@ pub trait Readable fn read(reader: &mut R) -> Result; } +/// A trait that various higher-level rust-lightning types implement allowing them to be read in +/// from a Read given some additional set of arguments which is required to deserialize. +pub trait ReadableArgs + where Self: Sized, + R: Read +{ + /// Reads a Self in from the given Read + fn read(reader: &mut R, params: P) -> Result; +} + +pub(crate) struct U48(pub u64); +impl Writeable for U48 { + #[inline] + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + writer.write_all(&be48_to_array(self.0)) + } +} +impl Readable for U48 { + #[inline] + fn read(reader: &mut R) -> Result { + let mut buf = [0; 6]; + reader.read_exact(&mut buf)?; + Ok(U48(slice_to_be48(&buf))) + } +} + macro_rules! impl_writeable_primitive { ($val_type:ty, $meth_write:ident, $len: expr, $meth_read:ident) => { impl Writeable for $val_type { @@ -300,6 +340,24 @@ impl Readable for PublicKey { } } +impl Writeable for SecretKey { + fn write(&self, w: &mut W) -> Result<(), ::std::io::Error> { + let mut ser = [0; 32]; + ser.copy_from_slice(&self[..]); + ser.write(w) + } +} + +impl Readable for SecretKey { + fn read(r: &mut R) -> Result { + let buf: [u8; 32] = Readable::read(r)?; + match SecretKey::from_slice(&Secp256k1::without_caps(), &buf) { + Ok(key) => Ok(key), + Err(_) => return Err(DecodeError::InvalidValue), + } + } +} + impl Writeable for Sha256dHash { fn write(&self, w: &mut W) -> Result<(), ::std::io::Error> { self.as_bytes().write(w) diff --git a/src/util/ser_macros.rs b/src/util/ser_macros.rs index 09a319fb..48e87b3b 100644 --- a/src/util/ser_macros.rs +++ b/src/util/ser_macros.rs @@ -1,17 +1,19 @@ macro_rules! impl_writeable { ($st:ident, $len: expr, {$($field:ident),*}) => { - impl Writeable for $st { - fn write(&self, w: &mut W) -> Result<(), ::std::io::Error> { - w.size_hint($len); + impl ::util::ser::Writeable for $st { + fn write(&self, w: &mut W) -> Result<(), ::std::io::Error> { + if $len != 0 { + w.size_hint($len); + } $( self.$field.write(w)?; )* Ok(()) } } - impl Readable for $st { - fn read(r: &mut R) -> Result { + impl ::util::ser::Readable for $st { + fn read(r: &mut R) -> Result { Ok(Self { - $($field: Readable::read(r)?),* + $($field: ::util::ser::Readable::read(r)?),* }) } } @@ -29,7 +31,7 @@ macro_rules! impl_writeable_len_match { } } - impl Readable for $st { + impl Readable for $st { fn read(r: &mut R) -> Result { Ok(Self { $($field: Readable::read(r)?),* diff --git a/src/util/test_utils.rs b/src/util/test_utils.rs index 4fa29fd8..0eb49702 100644 --- a/src/util/test_utils.rs +++ b/src/util/test_utils.rs @@ -6,9 +6,10 @@ use ln::msgs; use ln::msgs::{HandleError}; use util::events; use util::logger::{Logger, Level, Record}; -use util::ser::{Readable, Writer}; +use util::ser::{ReadableArgs, Writer}; use bitcoin::blockdata::transaction::Transaction; +use bitcoin::util::hash::Sha256dHash; use secp256k1::PublicKey; @@ -55,7 +56,8 @@ impl channelmonitor::ManyChannelMonitor for TestChannelMonitor { // to a watchtower and disk... let mut w = VecWriter(Vec::new()); monitor.write_for_disk(&mut w).unwrap(); - assert!(channelmonitor::ChannelMonitor::read(&mut ::std::io::Cursor::new(&w.0)).unwrap() == monitor); + assert!(<(Sha256dHash, channelmonitor::ChannelMonitor)>::read( + &mut ::std::io::Cursor::new(&w.0), Arc::new(TestLogger::new())).unwrap().1 == monitor); w.0.clear(); monitor.write_for_watchtower(&mut w).unwrap(); // This at least shouldn't crash... self.added_monitors.lock().unwrap().push((funding_txo, monitor.clone()));