From a2fb3ccc08f27634b9e27c612c52f591f7bcaf84 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 26 Oct 2018 14:35:50 -0400 Subject: [PATCH] Implement and document Channel/ChannelManager (de)serialization --- src/chain/keysinterface.rs | 9 + src/ln/channel.rs | 511 ++++++++++++++++++++++++++++++++++++- src/ln/channelmanager.rs | 421 +++++++++++++++++++++++++++++- src/ln/channelmonitor.rs | 10 +- src/util/ser_macros.rs | 10 +- 5 files changed, 935 insertions(+), 26 deletions(-) diff --git a/src/chain/keysinterface.rs b/src/chain/keysinterface.rs index b3823e215..18b069369 100644 --- a/src/chain/keysinterface.rs +++ b/src/chain/keysinterface.rs @@ -78,6 +78,15 @@ pub struct ChannelKeys { pub commitment_seed: [u8; 32], } +impl_writeable!(ChannelKeys, 0, { + funding_key, + revocation_base_key, + payment_base_key, + delayed_payment_base_key, + htlc_base_key, + commitment_seed +}); + impl ChannelKeys { /// Generate a set of lightning keys needed to operate a channel by HKDF-expanding a given /// random 32-byte seed diff --git a/src/ln/channel.rs b/src/ln/channel.rs index d4b30f0c4..2561fc279 100644 --- a/src/ln/channel.rs +++ b/src/ln/channel.rs @@ -4,7 +4,9 @@ use bitcoin::blockdata::transaction::{TxIn, TxOut, Transaction, SigHashType}; use bitcoin::blockdata::opcodes; use bitcoin::util::hash::{Sha256dHash, Hash160}; use bitcoin::util::bip143; -use bitcoin::network::serialize::BitcoinHash; +use bitcoin::network; +use bitcoin::network::serialize::{BitcoinHash, RawDecoder, RawEncoder}; +use bitcoin::network::encodable::{ConsensusEncodable, ConsensusDecodable}; use secp256k1::key::{PublicKey,SecretKey}; use secp256k1::{Secp256k1,Message,Signature}; @@ -13,7 +15,7 @@ use secp256k1; use crypto::digest::Digest; use ln::msgs; -use ln::msgs::{ErrorAction, HandleError}; +use ln::msgs::{DecodeError, ErrorAction, HandleError}; use ln::channelmonitor::ChannelMonitor; use ln::channelmanager::{PendingHTLCStatus, HTLCSource, HTLCFailReason, HTLCFailureMsg, PendingForwardHTLCInfo, RAACommitmentOrder}; use ln::chan_utils::{TxCreationKeys,HTLCOutputInCommitment,HTLC_SUCCESS_TX_WEIGHT,HTLC_TIMEOUT_TX_WEIGHT}; @@ -22,7 +24,7 @@ use chain::chaininterface::{FeeEstimator,ConfirmationTarget}; use chain::transaction::OutPoint; use chain::keysinterface::{ChannelKeys, KeysInterface}; use util::{transaction_utils,rng}; -use util::ser::Writeable; +use util::ser::{Readable, ReadableArgs, Writeable, Writer, WriterWriteAdaptor}; use util::sha2::Sha256; use util::logger::Logger; use util::errors::APIError; @@ -306,8 +308,9 @@ pub(super) struct Channel { /// could miss the funding_tx_confirmed_in block as well, but it serves as a useful fallback. funding_tx_confirmed_in: Option, short_channel_id: Option, - /// Used to deduplicate block_connected callbacks - last_block_connected: Sha256dHash, + /// Used to deduplicate block_connected callbacks, also used to verify consistency during + /// ChannelManager deserialization (hence pub(super)) + pub(super) last_block_connected: Sha256dHash, funding_tx_confirmations: u64, their_dust_limit_satoshis: u64, @@ -2683,10 +2686,10 @@ impl Channel { /// Only returns an ErrorAction of DisconnectPeer, if Err. pub fn block_connected(&mut self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> Result, HandleError> { let non_shutdown_state = self.channel_state & (!MULTI_STATE_FLAGS); - if self.funding_tx_confirmations > 0 { - if header.bitcoin_hash() != self.last_block_connected { - self.last_block_connected = header.bitcoin_hash(); - self.channel_monitor.last_block_hash = self.last_block_connected; + if header.bitcoin_hash() != self.last_block_connected { + self.last_block_connected = header.bitcoin_hash(); + self.channel_monitor.last_block_hash = self.last_block_connected; + if self.funding_tx_confirmations > 0 { self.funding_tx_confirmations += 1; if self.funding_tx_confirmations == Channel::derive_minimum_depth(self.channel_value_satoshis*1000, self.value_to_self_msat) as u64 { let need_commitment_update = if non_shutdown_state == ChannelState::FundingSent as u32 { @@ -2767,6 +2770,8 @@ impl Channel { if Some(header.bitcoin_hash()) == self.funding_tx_confirmed_in { self.funding_tx_confirmations = Channel::derive_minimum_depth(self.channel_value_satoshis*1000, self.value_to_self_msat) as u64 - 1; } + self.last_block_connected = header.bitcoin_hash(); + self.channel_monitor.last_block_hash = self.last_block_connected; false } @@ -3240,6 +3245,494 @@ impl Channel { } } +const SERIALIZATION_VERSION: u8 = 1; +const MIN_SERIALIZATION_VERSION: u8 = 1; + +impl Writeable for InboundHTLCRemovalReason { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + match self { + &InboundHTLCRemovalReason::FailRelay(ref error_packet) => { + 0u8.write(writer)?; + error_packet.write(writer)?; + }, + &InboundHTLCRemovalReason::FailMalformed((ref onion_hash, ref err_code)) => { + 1u8.write(writer)?; + onion_hash.write(writer)?; + err_code.write(writer)?; + }, + &InboundHTLCRemovalReason::Fulfill(ref payment_preimage) => { + 2u8.write(writer)?; + payment_preimage.write(writer)?; + }, + } + Ok(()) + } +} + +impl Readable for InboundHTLCRemovalReason { + fn read(reader: &mut R) -> Result { + Ok(match >::read(reader)? { + 0 => InboundHTLCRemovalReason::FailRelay(Readable::read(reader)?), + 1 => InboundHTLCRemovalReason::FailMalformed((Readable::read(reader)?, Readable::read(reader)?)), + 2 => InboundHTLCRemovalReason::Fulfill(Readable::read(reader)?), + _ => return Err(DecodeError::InvalidValue), + }) + } +} + +impl Writeable for Channel { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + // Note that we write out as if remove_uncommitted_htlcs_and_mark_paused had just been + // called but include holding cell updates (and obviously we don't modify self). + + writer.write_all(&[SERIALIZATION_VERSION; 1])?; + writer.write_all(&[MIN_SERIALIZATION_VERSION; 1])?; + + self.user_id.write(writer)?; + + self.channel_id.write(writer)?; + (self.channel_state | ChannelState::PeerDisconnected as u32).write(writer)?; + self.channel_outbound.write(writer)?; + self.announce_publicly.write(writer)?; + self.channel_value_satoshis.write(writer)?; + + self.local_keys.write(writer)?; + self.shutdown_pubkey.write(writer)?; + + self.cur_local_commitment_transaction_number.write(writer)?; + self.cur_remote_commitment_transaction_number.write(writer)?; + self.value_to_self_msat.write(writer)?; + + self.received_commitment_while_awaiting_raa.write(writer)?; + + let mut dropped_inbound_htlcs = 0; + for htlc in self.pending_inbound_htlcs.iter() { + if let InboundHTLCState::RemoteAnnounced(_) = htlc.state { + dropped_inbound_htlcs += 1; + } + } + (self.pending_inbound_htlcs.len() as u64 - dropped_inbound_htlcs).write(writer)?; + for htlc in self.pending_inbound_htlcs.iter() { + htlc.htlc_id.write(writer)?; + htlc.amount_msat.write(writer)?; + htlc.cltv_expiry.write(writer)?; + htlc.payment_hash.write(writer)?; + match &htlc.state { + &InboundHTLCState::RemoteAnnounced(_) => {}, // Drop + &InboundHTLCState::AwaitingRemoteRevokeToAnnounce(ref htlc_state) => { + 1u8.write(writer)?; + htlc_state.write(writer)?; + }, + &InboundHTLCState::AwaitingAnnouncedRemoteRevoke(ref htlc_state) => { + 2u8.write(writer)?; + htlc_state.write(writer)?; + }, + &InboundHTLCState::Committed => { + 3u8.write(writer)?; + }, + &InboundHTLCState::LocalRemoved(ref removal_reason) => { + 4u8.write(writer)?; + removal_reason.write(writer)?; + }, + } + } + + macro_rules! write_option { + ($thing: expr) => { + match &$thing { + &None => 0u8.write(writer)?, + &Some(ref v) => { + 1u8.write(writer)?; + v.write(writer)?; + }, + } + } + } + + (self.pending_outbound_htlcs.len() as u64).write(writer)?; + for htlc in self.pending_outbound_htlcs.iter() { + htlc.htlc_id.write(writer)?; + htlc.amount_msat.write(writer)?; + htlc.cltv_expiry.write(writer)?; + htlc.payment_hash.write(writer)?; + htlc.source.write(writer)?; + write_option!(htlc.fail_reason); + match &htlc.state { + &OutboundHTLCState::LocalAnnounced(ref onion_packet) => { + 0u8.write(writer)?; + onion_packet.write(writer)?; + }, + &OutboundHTLCState::Committed => { + 1u8.write(writer)?; + }, + &OutboundHTLCState::RemoteRemoved => { + 2u8.write(writer)?; + }, + &OutboundHTLCState::AwaitingRemoteRevokeToRemove => { + 3u8.write(writer)?; + }, + &OutboundHTLCState::AwaitingRemovedRemoteRevoke => { + 4u8.write(writer)?; + }, + } + } + + (self.holding_cell_htlc_updates.len() as u64).write(writer)?; + for update in self.holding_cell_htlc_updates.iter() { + match update { + &HTLCUpdateAwaitingACK::AddHTLC { ref amount_msat, ref cltv_expiry, ref payment_hash, ref source, ref onion_routing_packet, time_created: _ } => { + 0u8.write(writer)?; + amount_msat.write(writer)?; + cltv_expiry.write(writer)?; + payment_hash.write(writer)?; + source.write(writer)?; + onion_routing_packet.write(writer)?; + // time_created is not serialized - we re-init the timeout upon deserialization + }, + &HTLCUpdateAwaitingACK::ClaimHTLC { ref payment_preimage, ref htlc_id } => { + 1u8.write(writer)?; + payment_preimage.write(writer)?; + htlc_id.write(writer)?; + }, + &HTLCUpdateAwaitingACK::FailHTLC { ref htlc_id, ref err_packet } => { + 2u8.write(writer)?; + htlc_id.write(writer)?; + err_packet.write(writer)?; + } + } + } + + self.monitor_pending_revoke_and_ack.write(writer)?; + self.monitor_pending_commitment_signed.write(writer)?; + match self.monitor_pending_order { + None => 0u8.write(writer)?, + Some(RAACommitmentOrder::CommitmentFirst) => 1u8.write(writer)?, + Some(RAACommitmentOrder::RevokeAndACKFirst) => 2u8.write(writer)?, + } + + (self.monitor_pending_forwards.len() as u64).write(writer)?; + for &(ref pending_forward, ref htlc_id) in self.monitor_pending_forwards.iter() { + pending_forward.write(writer)?; + htlc_id.write(writer)?; + } + + (self.monitor_pending_failures.len() as u64).write(writer)?; + for &(ref htlc_source, ref payment_hash, ref fail_reason) in self.monitor_pending_failures.iter() { + htlc_source.write(writer)?; + payment_hash.write(writer)?; + fail_reason.write(writer)?; + } + + write_option!(self.pending_update_fee); + write_option!(self.holding_cell_update_fee); + + self.next_local_htlc_id.write(writer)?; + (self.next_remote_htlc_id - dropped_inbound_htlcs).write(writer)?; + self.channel_update_count.write(writer)?; + self.feerate_per_kw.write(writer)?; + + (self.last_local_commitment_txn.len() as u64).write(writer)?; + for tx in self.last_local_commitment_txn.iter() { + if let Err(e) = tx.consensus_encode(&mut RawEncoder::new(WriterWriteAdaptor(writer))) { + match e { + network::serialize::Error::Io(e) => return Err(e), + _ => panic!("last_local_commitment_txn must have been well-formed!"), + } + } + } + + match self.last_sent_closing_fee { + Some((feerate, fee)) => { + 1u8.write(writer)?; + feerate.write(writer)?; + fee.write(writer)?; + }, + None => 0u8.write(writer)?, + } + + write_option!(self.funding_tx_confirmed_in); + write_option!(self.short_channel_id); + + self.last_block_connected.write(writer)?; + self.funding_tx_confirmations.write(writer)?; + + self.their_dust_limit_satoshis.write(writer)?; + self.our_dust_limit_satoshis.write(writer)?; + self.their_max_htlc_value_in_flight_msat.write(writer)?; + self.their_channel_reserve_satoshis.write(writer)?; + self.their_htlc_minimum_msat.write(writer)?; + self.our_htlc_minimum_msat.write(writer)?; + self.their_to_self_delay.write(writer)?; + self.their_max_accepted_htlcs.write(writer)?; + + write_option!(self.their_funding_pubkey); + write_option!(self.their_revocation_basepoint); + write_option!(self.their_payment_basepoint); + write_option!(self.their_delayed_payment_basepoint); + write_option!(self.their_htlc_basepoint); + write_option!(self.their_cur_commitment_point); + + write_option!(self.their_prev_commitment_point); + self.their_node_id.write(writer)?; + + write_option!(self.their_shutdown_scriptpubkey); + + self.channel_monitor.write_for_disk(writer)?; + Ok(()) + } +} + +impl ReadableArgs> for Channel { + fn read(reader: &mut R, logger: Arc) -> Result { + let _ver: u8 = Readable::read(reader)?; + let min_ver: u8 = Readable::read(reader)?; + if min_ver > SERIALIZATION_VERSION { + return Err(DecodeError::UnknownVersion); + } + + let user_id = Readable::read(reader)?; + + let channel_id = Readable::read(reader)?; + let channel_state = Readable::read(reader)?; + let channel_outbound = Readable::read(reader)?; + let announce_publicly = Readable::read(reader)?; + let channel_value_satoshis = Readable::read(reader)?; + + let local_keys = Readable::read(reader)?; + let shutdown_pubkey = Readable::read(reader)?; + + let cur_local_commitment_transaction_number = Readable::read(reader)?; + let cur_remote_commitment_transaction_number = Readable::read(reader)?; + let value_to_self_msat = Readable::read(reader)?; + + let received_commitment_while_awaiting_raa = Readable::read(reader)?; + + let pending_inbound_htlc_count: u64 = Readable::read(reader)?; + let mut pending_inbound_htlcs = Vec::with_capacity(cmp::min(pending_inbound_htlc_count as usize, OUR_MAX_HTLCS as usize)); + for _ in 0..pending_inbound_htlc_count { + pending_inbound_htlcs.push(InboundHTLCOutput { + htlc_id: Readable::read(reader)?, + amount_msat: Readable::read(reader)?, + cltv_expiry: Readable::read(reader)?, + payment_hash: Readable::read(reader)?, + state: match >::read(reader)? { + 1 => InboundHTLCState::AwaitingRemoteRevokeToAnnounce(Readable::read(reader)?), + 2 => InboundHTLCState::AwaitingAnnouncedRemoteRevoke(Readable::read(reader)?), + 3 => InboundHTLCState::Committed, + 4 => InboundHTLCState::LocalRemoved(Readable::read(reader)?), + _ => return Err(DecodeError::InvalidValue), + }, + }); + } + + macro_rules! read_option { () => { + match >::read(reader)? { + 0 => None, + 1 => Some(Readable::read(reader)?), + _ => return Err(DecodeError::InvalidValue), + } + } } + + let pending_outbound_htlc_count: u64 = Readable::read(reader)?; + let mut pending_outbound_htlcs = Vec::with_capacity(cmp::min(pending_outbound_htlc_count as usize, OUR_MAX_HTLCS as usize)); + for _ in 0..pending_outbound_htlc_count { + pending_outbound_htlcs.push(OutboundHTLCOutput { + htlc_id: Readable::read(reader)?, + amount_msat: Readable::read(reader)?, + cltv_expiry: Readable::read(reader)?, + payment_hash: Readable::read(reader)?, + source: Readable::read(reader)?, + fail_reason: read_option!(), + state: match >::read(reader)? { + 0 => OutboundHTLCState::LocalAnnounced(Box::new(Readable::read(reader)?)), + 1 => OutboundHTLCState::Committed, + 2 => OutboundHTLCState::RemoteRemoved, + 3 => OutboundHTLCState::AwaitingRemoteRevokeToRemove, + 4 => OutboundHTLCState::AwaitingRemovedRemoteRevoke, + _ => return Err(DecodeError::InvalidValue), + }, + }); + } + + let holding_cell_htlc_update_count: u64 = Readable::read(reader)?; + let mut holding_cell_htlc_updates = Vec::with_capacity(cmp::min(holding_cell_htlc_update_count as usize, OUR_MAX_HTLCS as usize*2)); + for _ in 0..holding_cell_htlc_update_count { + holding_cell_htlc_updates.push(match >::read(reader)? { + 0 => HTLCUpdateAwaitingACK::AddHTLC { + amount_msat: Readable::read(reader)?, + cltv_expiry: Readable::read(reader)?, + payment_hash: Readable::read(reader)?, + source: Readable::read(reader)?, + onion_routing_packet: Readable::read(reader)?, + time_created: Instant::now(), + }, + 1 => HTLCUpdateAwaitingACK::ClaimHTLC { + payment_preimage: Readable::read(reader)?, + htlc_id: Readable::read(reader)?, + }, + 2 => HTLCUpdateAwaitingACK::FailHTLC { + htlc_id: Readable::read(reader)?, + err_packet: Readable::read(reader)?, + }, + _ => return Err(DecodeError::InvalidValue), + }); + } + + let monitor_pending_revoke_and_ack = Readable::read(reader)?; + let monitor_pending_commitment_signed = Readable::read(reader)?; + + let monitor_pending_order = match >::read(reader)? { + 0 => None, + 1 => Some(RAACommitmentOrder::CommitmentFirst), + 2 => Some(RAACommitmentOrder::RevokeAndACKFirst), + _ => return Err(DecodeError::InvalidValue), + }; + + let monitor_pending_forwards_count: u64 = Readable::read(reader)?; + let mut monitor_pending_forwards = Vec::with_capacity(cmp::min(monitor_pending_forwards_count as usize, OUR_MAX_HTLCS as usize)); + for _ in 0..monitor_pending_forwards_count { + monitor_pending_forwards.push((Readable::read(reader)?, Readable::read(reader)?)); + } + + let monitor_pending_failures_count: u64 = Readable::read(reader)?; + let mut monitor_pending_failures = Vec::with_capacity(cmp::min(monitor_pending_failures_count as usize, OUR_MAX_HTLCS as usize)); + for _ in 0..monitor_pending_failures_count { + monitor_pending_failures.push((Readable::read(reader)?, Readable::read(reader)?, Readable::read(reader)?)); + } + + let pending_update_fee = read_option!(); + let holding_cell_update_fee = read_option!(); + + let next_local_htlc_id = Readable::read(reader)?; + let next_remote_htlc_id = Readable::read(reader)?; + let channel_update_count = Readable::read(reader)?; + let feerate_per_kw = Readable::read(reader)?; + + let last_local_commitment_txn_count: u64 = Readable::read(reader)?; + let mut last_local_commitment_txn = Vec::with_capacity(cmp::min(last_local_commitment_txn_count as usize, OUR_MAX_HTLCS as usize*2 + 1)); + for _ in 0..last_local_commitment_txn_count { + last_local_commitment_txn.push(match Transaction::consensus_decode(&mut RawDecoder::new(reader.by_ref())) { + Ok(tx) => tx, + Err(_) => return Err(DecodeError::InvalidValue), + }); + } + + let last_sent_closing_fee = match >::read(reader)? { + 0 => None, + 1 => Some((Readable::read(reader)?, Readable::read(reader)?)), + _ => return Err(DecodeError::InvalidValue), + }; + + let funding_tx_confirmed_in = read_option!(); + let short_channel_id = read_option!(); + + let last_block_connected = Readable::read(reader)?; + let funding_tx_confirmations = Readable::read(reader)?; + + let their_dust_limit_satoshis = Readable::read(reader)?; + let our_dust_limit_satoshis = Readable::read(reader)?; + let their_max_htlc_value_in_flight_msat = Readable::read(reader)?; + let their_channel_reserve_satoshis = Readable::read(reader)?; + let their_htlc_minimum_msat = Readable::read(reader)?; + let our_htlc_minimum_msat = Readable::read(reader)?; + let their_to_self_delay = Readable::read(reader)?; + let their_max_accepted_htlcs = Readable::read(reader)?; + + let their_funding_pubkey = read_option!(); + let their_revocation_basepoint = read_option!(); + let their_payment_basepoint = read_option!(); + let their_delayed_payment_basepoint = read_option!(); + let their_htlc_basepoint = read_option!(); + let their_cur_commitment_point = read_option!(); + + let their_prev_commitment_point = read_option!(); + let their_node_id = Readable::read(reader)?; + + let their_shutdown_scriptpubkey = read_option!(); + let (monitor_last_block, channel_monitor) = ReadableArgs::read(reader, logger.clone())?; + // We drop the ChannelMonitor's last block connected hash cause we don't actually bother + // doing full block connection operations on the internal CHannelMonitor copies + if monitor_last_block != last_block_connected { + return Err(DecodeError::InvalidValue); + } + + Ok(Channel { + user_id, + + channel_id, + channel_state, + channel_outbound, + secp_ctx: Secp256k1::new(), + announce_publicly, + channel_value_satoshis, + + local_keys, + shutdown_pubkey, + + cur_local_commitment_transaction_number, + cur_remote_commitment_transaction_number, + value_to_self_msat, + + received_commitment_while_awaiting_raa, + pending_inbound_htlcs, + pending_outbound_htlcs, + holding_cell_htlc_updates, + + monitor_pending_revoke_and_ack, + monitor_pending_commitment_signed, + monitor_pending_order, + monitor_pending_forwards, + monitor_pending_failures, + + pending_update_fee, + holding_cell_update_fee, + next_local_htlc_id, + next_remote_htlc_id, + channel_update_count, + feerate_per_kw, + + #[cfg(debug_assertions)] + max_commitment_tx_output_local: ::std::sync::Mutex::new((0, 0)), + #[cfg(debug_assertions)] + max_commitment_tx_output_remote: ::std::sync::Mutex::new((0, 0)), + + last_local_commitment_txn, + + last_sent_closing_fee, + + funding_tx_confirmed_in, + short_channel_id, + last_block_connected, + funding_tx_confirmations, + + their_dust_limit_satoshis, + our_dust_limit_satoshis, + their_max_htlc_value_in_flight_msat, + their_channel_reserve_satoshis, + their_htlc_minimum_msat, + our_htlc_minimum_msat, + their_to_self_delay, + their_max_accepted_htlcs, + + their_funding_pubkey, + their_revocation_basepoint, + their_payment_basepoint, + their_delayed_payment_basepoint, + their_htlc_basepoint, + their_cur_commitment_point, + + their_prev_commitment_point, + their_node_id, + + their_shutdown_scriptpubkey, + + channel_monitor, + + logger, + }) + } +} + #[cfg(test)] mod tests { use bitcoin::util::hash::{Sha256dHash, Hash160}; diff --git a/src/ln/channelmanager.rs b/src/ln/channelmanager.rs index f0fd7ad2d..c8122ca83 100644 --- a/src/ln/channelmanager.rs +++ b/src/ln/channelmanager.rs @@ -23,14 +23,14 @@ use secp256k1; use chain::chaininterface::{BroadcasterInterface,ChainListener,ChainWatchInterface,FeeEstimator}; use chain::transaction::OutPoint; use ln::channel::{Channel, ChannelError}; -use ln::channelmonitor::{ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS}; +use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS}; use ln::router::{Route,RouteHop}; use ln::msgs; -use ln::msgs::{ChannelMessageHandler, HandleError}; +use ln::msgs::{ChannelMessageHandler, DecodeError, HandleError}; use chain::keysinterface::KeysInterface; use util::{byte_utils, events, internal_traits, rng}; use util::sha2::Sha256; -use util::ser::{Readable, Writeable}; +use util::ser::{Readable, ReadableArgs, Writeable, Writer}; use util::chacha20poly1305rfc::ChaCha20; use util::logger::Logger; use util::errors::APIError; @@ -41,9 +41,8 @@ use crypto::hmac::Hmac; use crypto::digest::Digest; use crypto::symmetriccipher::SynchronousStreamCipher; -use std::{ptr, mem}; -use std::collections::HashMap; -use std::collections::hash_map; +use std::{cmp, ptr, mem}; +use std::collections::{HashMap, hash_map, HashSet}; use std::io::Cursor; use std::sync::{Arc, Mutex, MutexGuard, RwLock}; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -301,6 +300,25 @@ const ERR: () = "You need at least 32 bit pointers (well, usize, but we'll assum /// /// Implements ChannelMessageHandler, handling the multi-channel parts and passing things through /// to individual Channels. +/// +/// Implements Writeable to write out all channel state to disk. Implies peer_disconnected() for +/// all peers during write/read (though does not modify this instance, only the instance being +/// serialized). This will result in any channels which have not yet exchanged funding_created (ie +/// called funding_transaction_generated for outbound channels). +/// +/// Note that you can be a bit lazier about writing out ChannelManager than you can be with +/// ChannelMonitors. With ChannelMonitors you MUST write each monitor update out to disk before +/// returning from ManyChannelMonitor::add_update_monitor, with ChannelManagers, writing updates +/// happens out-of-band (and will prevent any other ChannelManager operations from occurring during +/// the serialization process). If the deserialized version is out-of-date compared to the +/// ChannelMonitors passed by reference to read(), those channels will be force-closed based on the +/// ChannelMonitor state and no funds will be lost (mod on-chain transaction fees). +/// +/// Note that the deserializer is only implemented for (Sha256dHash, ChannelManager), which +/// tells you the last block hash which was block_connect()ed. You MUST rescan any blocks along +/// the "reorg path" (ie call block_disconnected() until you get to a common block and then call +/// block_connected() to step towards your best block) upon deserialization before using the +/// object! pub struct ChannelManager { genesis_hash: Sha256dHash, fee_estimator: Arc, @@ -311,6 +329,7 @@ pub struct ChannelManager { announce_channels_publicly: bool, fee_proportional_millionths: u32, latest_block_height: AtomicUsize, + last_block_hash: Mutex, secp_ctx: Secp256k1, channel_state: Mutex, @@ -408,7 +427,8 @@ impl ChannelManager { announce_channels_publicly, fee_proportional_millionths, - latest_block_height: AtomicUsize::new(0), //TODO: Get an init value (generally need to replay recent chain on chain_monitor registration) + latest_block_height: AtomicUsize::new(0), //TODO: Get an init value + last_block_hash: Mutex::new(Default::default()), secp_ctx, channel_state: Mutex::new(ChannelHolder{ @@ -2519,6 +2539,7 @@ impl ChainListener for ChannelManager { self.finish_force_close_channel(failure); } self.latest_block_height.store(height as usize, Ordering::Release); + *self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header.bitcoin_hash(); } /// We force-close the channel without letting our counterparty participate in the shutdown @@ -2551,6 +2572,7 @@ impl ChainListener for ChannelManager { self.finish_force_close_channel(failure); } self.latest_block_height.fetch_sub(1, Ordering::AcqRel); + *self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header.bitcoin_hash(); } } @@ -2764,6 +2786,391 @@ impl ChannelMessageHandler for ChannelManager { } } +const SERIALIZATION_VERSION: u8 = 1; +const MIN_SERIALIZATION_VERSION: u8 = 1; + +impl Writeable for PendingForwardHTLCInfo { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + if let &Some(ref onion) = &self.onion_packet { + 1u8.write(writer)?; + onion.write(writer)?; + } else { + 0u8.write(writer)?; + } + self.incoming_shared_secret.write(writer)?; + self.payment_hash.write(writer)?; + self.short_channel_id.write(writer)?; + self.amt_to_forward.write(writer)?; + self.outgoing_cltv_value.write(writer)?; + Ok(()) + } +} + +impl Readable for PendingForwardHTLCInfo { + fn read(reader: &mut R) -> Result { + let onion_packet = match >::read(reader)? { + 0 => None, + 1 => Some(msgs::OnionPacket::read(reader)?), + _ => return Err(DecodeError::InvalidValue), + }; + Ok(PendingForwardHTLCInfo { + onion_packet, + incoming_shared_secret: Readable::read(reader)?, + payment_hash: Readable::read(reader)?, + short_channel_id: Readable::read(reader)?, + amt_to_forward: Readable::read(reader)?, + outgoing_cltv_value: Readable::read(reader)?, + }) + } +} + +impl Writeable for HTLCFailureMsg { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + match self { + &HTLCFailureMsg::Relay(ref fail_msg) => { + 0u8.write(writer)?; + fail_msg.write(writer)?; + }, + &HTLCFailureMsg::Malformed(ref fail_msg) => { + 1u8.write(writer)?; + fail_msg.write(writer)?; + } + } + Ok(()) + } +} + +impl Readable for HTLCFailureMsg { + fn read(reader: &mut R) -> Result { + match >::read(reader)? { + 0 => Ok(HTLCFailureMsg::Relay(Readable::read(reader)?)), + 1 => Ok(HTLCFailureMsg::Malformed(Readable::read(reader)?)), + _ => Err(DecodeError::InvalidValue), + } + } +} + +impl Writeable for PendingHTLCStatus { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + match self { + &PendingHTLCStatus::Forward(ref forward_info) => { + 0u8.write(writer)?; + forward_info.write(writer)?; + }, + &PendingHTLCStatus::Fail(ref fail_msg) => { + 1u8.write(writer)?; + fail_msg.write(writer)?; + } + } + Ok(()) + } +} + +impl Readable for PendingHTLCStatus { + fn read(reader: &mut R) -> Result { + match >::read(reader)? { + 0 => Ok(PendingHTLCStatus::Forward(Readable::read(reader)?)), + 1 => Ok(PendingHTLCStatus::Fail(Readable::read(reader)?)), + _ => Err(DecodeError::InvalidValue), + } + } +} + +impl_writeable!(HTLCPreviousHopData, 0, { + short_channel_id, + htlc_id, + incoming_packet_shared_secret +}); + +impl Writeable for HTLCSource { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + match self { + &HTLCSource::PreviousHopData(ref hop_data) => { + 0u8.write(writer)?; + hop_data.write(writer)?; + }, + &HTLCSource::OutboundRoute { ref route, ref session_priv, ref first_hop_htlc_msat } => { + 1u8.write(writer)?; + route.write(writer)?; + session_priv.write(writer)?; + first_hop_htlc_msat.write(writer)?; + } + } + Ok(()) + } +} + +impl Readable for HTLCSource { + fn read(reader: &mut R) -> Result { + match >::read(reader)? { + 0 => Ok(HTLCSource::PreviousHopData(Readable::read(reader)?)), + 1 => Ok(HTLCSource::OutboundRoute { + route: Readable::read(reader)?, + session_priv: Readable::read(reader)?, + first_hop_htlc_msat: Readable::read(reader)?, + }), + _ => Err(DecodeError::InvalidValue), + } + } +} + +impl Writeable for HTLCFailReason { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + match self { + &HTLCFailReason::ErrorPacket { ref err } => { + 0u8.write(writer)?; + err.write(writer)?; + }, + &HTLCFailReason::Reason { ref failure_code, ref data } => { + 1u8.write(writer)?; + failure_code.write(writer)?; + data.write(writer)?; + } + } + Ok(()) + } +} + +impl Readable for HTLCFailReason { + fn read(reader: &mut R) -> Result { + match >::read(reader)? { + 0 => Ok(HTLCFailReason::ErrorPacket { err: Readable::read(reader)? }), + 1 => Ok(HTLCFailReason::Reason { + failure_code: Readable::read(reader)?, + data: Readable::read(reader)?, + }), + _ => Err(DecodeError::InvalidValue), + } + } +} + +impl_writeable!(HTLCForwardInfo, 0, { + prev_short_channel_id, + prev_htlc_id, + forward_info +}); + +impl Writeable for ChannelManager { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + let _ = self.total_consistency_lock.write().unwrap(); + + writer.write_all(&[SERIALIZATION_VERSION; 1])?; + writer.write_all(&[MIN_SERIALIZATION_VERSION; 1])?; + + self.genesis_hash.write(writer)?; + self.announce_channels_publicly.write(writer)?; + self.fee_proportional_millionths.write(writer)?; + (self.latest_block_height.load(Ordering::Acquire) as u32).write(writer)?; + self.last_block_hash.lock().unwrap().write(writer)?; + + let channel_state = self.channel_state.lock().unwrap(); + let mut unfunded_channels = 0; + for (_, channel) in channel_state.by_id.iter() { + if !channel.is_funding_initiated() { + unfunded_channels += 1; + } + } + ((channel_state.by_id.len() - unfunded_channels) as u64).write(writer)?; + for (_, channel) in channel_state.by_id.iter() { + if channel.is_funding_initiated() { + channel.write(writer)?; + } + } + + (channel_state.forward_htlcs.len() as u64).write(writer)?; + for (short_channel_id, pending_forwards) in channel_state.forward_htlcs.iter() { + short_channel_id.write(writer)?; + (pending_forwards.len() as u64).write(writer)?; + for forward in pending_forwards { + forward.write(writer)?; + } + } + + (channel_state.claimable_htlcs.len() as u64).write(writer)?; + for (payment_hash, previous_hops) in channel_state.claimable_htlcs.iter() { + payment_hash.write(writer)?; + (previous_hops.len() as u64).write(writer)?; + for previous_hop in previous_hops { + previous_hop.write(writer)?; + } + } + + Ok(()) + } +} + +/// Arguments for the creation of a ChannelManager that are not deserialized. +/// +/// At a high-level, the process for deserializing a ChannelManager and resuming normal operation +/// is: +/// 1) Deserialize all stored ChannelMonitors. +/// 2) Deserialize the ChannelManager by filling in this struct and calling <(Sha256dHash, +/// ChannelManager)>::read(reader, args). +/// This may result in closing some Channels if the ChannelMonitor is newer than the stored +/// ChannelManager state to ensure no loss of funds. Thus, transactions may be broadcasted. +/// 3) Register all relevant ChannelMonitor outpoints with your chain watch mechanism using +/// ChannelMonitor::get_monitored_outpoints and ChannelMonitor::get_funding_txo(). +/// 4) Reconnect blocks on your ChannelMonitors. +/// 5) Move the ChannelMonitors into your local ManyChannelMonitor. +/// 6) Disconnect/connect blocks on the ChannelManager. +/// 7) Register the new ChannelManager with your ChainWatchInterface (this does not happen +/// automatically as it does in ChannelManager::new()). +pub struct ChannelManagerReadArgs<'a> { + /// The keys provider which will give us relevant keys. Some keys will be loaded during + /// deserialization. + pub keys_manager: Arc, + + /// The fee_estimator for use in the ChannelManager in the future. + /// + /// No calls to the FeeEstimator will be made during deserialization. + pub fee_estimator: Arc, + /// The ManyChannelMonitor for use in the ChannelManager in the future. + /// + /// No calls to the ManyChannelMonitor will be made during deserialization. It is assumed that + /// you have deserialized ChannelMonitors separately and will add them to your + /// ManyChannelMonitor after deserializing this ChannelManager. + pub monitor: Arc, + /// The ChainWatchInterface for use in the ChannelManager in the future. + /// + /// No calls to the ChainWatchInterface will be made during deserialization. + pub chain_monitor: Arc, + /// The BroadcasterInterface which will be used in the ChannelManager in the future and may be + /// used to broadcast the latest local commitment transactions of channels which must be + /// force-closed during deserialization. + pub tx_broadcaster: Arc, + /// The Logger for use in the ChannelManager and which may be used to log information during + /// deserialization. + pub logger: Arc, + + + /// A map from channel funding outpoints to ChannelMonitors for those channels (ie + /// value.get_funding_txo() should be the key). + /// + /// If a monitor is inconsistent with the channel state during deserialization the channel will + /// be force-closed using the data in the channelmonitor and the Channel will be dropped. This + /// is true for missing channels as well. If there is a monitor missing for which we find + /// channel data Err(DecodeError::InvalidValue) will be returned. + /// + /// In such cases the latest local transactions will be sent to the tx_broadcaster included in + /// this struct. + pub channel_monitors: &'a HashMap, +} + +impl<'a, R : ::std::io::Read> ReadableArgs> for (Sha256dHash, ChannelManager) { + fn read(reader: &mut R, args: ChannelManagerReadArgs<'a>) -> Result { + let _ver: u8 = Readable::read(reader)?; + let min_ver: u8 = Readable::read(reader)?; + if min_ver > SERIALIZATION_VERSION { + return Err(DecodeError::UnknownVersion); + } + + let genesis_hash: Sha256dHash = Readable::read(reader)?; + let announce_channels_publicly: bool = Readable::read(reader)?; + let fee_proportional_millionths: u32 = Readable::read(reader)?; + let latest_block_height: u32 = Readable::read(reader)?; + let last_block_hash: Sha256dHash = Readable::read(reader)?; + + let mut closed_channels = Vec::new(); + + let channel_count: u64 = Readable::read(reader)?; + let mut funding_txo_set = HashSet::with_capacity(cmp::min(channel_count as usize, 128)); + let mut by_id = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); + let mut short_to_id = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); + for _ in 0..channel_count { + let mut channel: Channel = ReadableArgs::read(reader, args.logger.clone())?; + if channel.last_block_connected != last_block_hash { + return Err(DecodeError::InvalidValue); + } + + let funding_txo = channel.channel_monitor().get_funding_txo().ok_or(DecodeError::InvalidValue)?; + funding_txo_set.insert(funding_txo.clone()); + if let Some(monitor) = args.channel_monitors.get(&funding_txo) { + if channel.get_cur_local_commitment_transaction_number() != monitor.get_cur_local_commitment_number() || + channel.get_revoked_remote_commitment_transaction_number() != monitor.get_min_seen_secret() || + channel.get_cur_remote_commitment_transaction_number() != monitor.get_cur_remote_commitment_number() { + let mut force_close_res = channel.force_shutdown(); + force_close_res.0 = monitor.get_latest_local_commitment_txn(); + closed_channels.push(force_close_res); + } else { + if let Some(short_channel_id) = channel.get_short_channel_id() { + short_to_id.insert(short_channel_id, channel.channel_id()); + } + by_id.insert(channel.channel_id(), channel); + } + } else { + return Err(DecodeError::InvalidValue); + } + } + + for (ref funding_txo, ref monitor) in args.channel_monitors.iter() { + if !funding_txo_set.contains(funding_txo) { + closed_channels.push((monitor.get_latest_local_commitment_txn(), Vec::new())); + } + } + + let forward_htlcs_count: u64 = Readable::read(reader)?; + let mut forward_htlcs = HashMap::with_capacity(cmp::min(forward_htlcs_count as usize, 128)); + for _ in 0..forward_htlcs_count { + let short_channel_id = Readable::read(reader)?; + let pending_forwards_count: u64 = Readable::read(reader)?; + let mut pending_forwards = Vec::with_capacity(cmp::min(pending_forwards_count as usize, 128)); + for _ in 0..pending_forwards_count { + pending_forwards.push(Readable::read(reader)?); + } + forward_htlcs.insert(short_channel_id, pending_forwards); + } + + let claimable_htlcs_count: u64 = Readable::read(reader)?; + let mut claimable_htlcs = HashMap::with_capacity(cmp::min(claimable_htlcs_count as usize, 128)); + for _ in 0..claimable_htlcs_count { + let payment_hash = Readable::read(reader)?; + let previous_hops_len: u64 = Readable::read(reader)?; + let mut previous_hops = Vec::with_capacity(cmp::min(previous_hops_len as usize, 2)); + for _ in 0..previous_hops_len { + previous_hops.push(Readable::read(reader)?); + } + claimable_htlcs.insert(payment_hash, previous_hops); + } + + let channel_manager = ChannelManager { + genesis_hash, + fee_estimator: args.fee_estimator, + monitor: args.monitor, + chain_monitor: args.chain_monitor, + tx_broadcaster: args.tx_broadcaster, + + announce_channels_publicly, + fee_proportional_millionths, + latest_block_height: AtomicUsize::new(latest_block_height as usize), + last_block_hash: Mutex::new(last_block_hash), + secp_ctx: Secp256k1::new(), + + channel_state: Mutex::new(ChannelHolder { + by_id, + short_to_id, + next_forward: Instant::now(), + forward_htlcs, + claimable_htlcs, + pending_msg_events: Vec::new(), + }), + our_network_key: args.keys_manager.get_node_secret(), + + pending_events: Mutex::new(Vec::new()), + total_consistency_lock: RwLock::new(()), + keys_manager: args.keys_manager, + logger: args.logger, + }; + + for close_res in closed_channels.drain(..) { + channel_manager.finish_force_close_channel(close_res); + //TODO: Broadcast channel update for closed channels, but only after we've made a + //connection or two. + } + + Ok((last_block_hash.clone(), channel_manager)) + } +} + #[cfg(test)] mod tests { use chain::chaininterface; diff --git a/src/ln/channelmonitor.rs b/src/ln/channelmonitor.rs index 594045f46..4972ae322 100644 --- a/src/ln/channelmonitor.rs +++ b/src/ln/channelmonitor.rs @@ -692,7 +692,7 @@ impl ChannelMonitor { } writer.write_all(&byte_utils::be64_to_array(self.remote_claimable_outpoints.len() as u64))?; - for (txid, htlc_outputs) in self.remote_claimable_outpoints.iter() { + for (ref txid, ref htlc_outputs) in self.remote_claimable_outpoints.iter() { writer.write_all(&txid[..])?; writer.write_all(&byte_utils::be64_to_array(htlc_outputs.len() as u64))?; for htlc_output in htlc_outputs.iter() { @@ -701,9 +701,9 @@ impl ChannelMonitor { } writer.write_all(&byte_utils::be64_to_array(self.remote_commitment_txn_on_chain.len() as u64))?; - for (txid, (commitment_number, txouts)) in self.remote_commitment_txn_on_chain.iter() { + for (ref txid, &(commitment_number, ref txouts)) in self.remote_commitment_txn_on_chain.iter() { writer.write_all(&txid[..])?; - writer.write_all(&byte_utils::be48_to_array(*commitment_number))?; + writer.write_all(&byte_utils::be48_to_array(commitment_number))?; (txouts.len() as u64).write(writer)?; for script in txouts.iter() { script.write(writer)?; @@ -712,8 +712,8 @@ impl ChannelMonitor { if for_local_storage { writer.write_all(&byte_utils::be64_to_array(self.remote_hash_commitment_number.len() as u64))?; - for (payment_hash, commitment_number) in self.remote_hash_commitment_number.iter() { - writer.write_all(payment_hash)?; + for (ref payment_hash, commitment_number) in self.remote_hash_commitment_number.iter() { + writer.write_all(*payment_hash)?; writer.write_all(&byte_utils::be48_to_array(*commitment_number))?; } } else { diff --git a/src/util/ser_macros.rs b/src/util/ser_macros.rs index 900270f31..48e87b3bc 100644 --- a/src/util/ser_macros.rs +++ b/src/util/ser_macros.rs @@ -1,7 +1,7 @@ macro_rules! impl_writeable { ($st:ident, $len: expr, {$($field:ident),*}) => { - impl Writeable for $st { - fn write(&self, w: &mut W) -> Result<(), ::std::io::Error> { + impl ::util::ser::Writeable for $st { + fn write(&self, w: &mut W) -> Result<(), ::std::io::Error> { if $len != 0 { w.size_hint($len); } @@ -10,10 +10,10 @@ macro_rules! impl_writeable { } } - impl Readable for $st { - fn read(r: &mut R) -> Result { + impl ::util::ser::Readable for $st { + fn read(r: &mut R) -> Result { Ok(Self { - $($field: Readable::read(r)?),* + $($field: ::util::ser::Readable::read(r)?),* }) } } -- 2.39.5