Serialize ChannelManager events
[rust-lightning] / lightning / src / ln / channelmanager.rs
index cbcccb82f933f7416c7ad5a44e3a5c196ec51391..09798eb9bbab22b47c752605e97c71ce7f4e75bd 100644 (file)
@@ -4,7 +4,7 @@
 //! responsible for tracking which channels are open, HTLCs are in flight and reestablishing those
 //! upon reconnect to the relevant peer(s).
 //!
-//! It does not manage routing logic (see ln::router for that) nor does it manage constructing
+//! It does not manage routing logic (see routing::router::get_route for that) nor does it manage constructing
 //! on-chain transactions (it only monitors the chain to watch for any force-closes that might
 //! imply it needs to fail HTLCs/payments/channels it manages).
 
@@ -14,30 +14,31 @@ use bitcoin::blockdata::constants::genesis_block;
 use bitcoin::network::constants::Network;
 use bitcoin::util::hash::BitcoinHash;
 
-use bitcoin_hashes::{Hash, HashEngine};
-use bitcoin_hashes::hmac::{Hmac, HmacEngine};
-use bitcoin_hashes::sha256::Hash as Sha256;
-use bitcoin_hashes::sha256d::Hash as Sha256dHash;
-use bitcoin_hashes::cmp::fixed_time_eq;
+use bitcoin::hashes::{Hash, HashEngine};
+use bitcoin::hashes::hmac::{Hmac, HmacEngine};
+use bitcoin::hashes::sha256::Hash as Sha256;
+use bitcoin::hashes::sha256d::Hash as Sha256dHash;
+use bitcoin::hashes::cmp::fixed_time_eq;
+use bitcoin::hash_types::BlockHash;
 
-use secp256k1::key::{SecretKey,PublicKey};
-use secp256k1::Secp256k1;
-use secp256k1::ecdh::SharedSecret;
-use secp256k1;
+use bitcoin::secp256k1::key::{SecretKey,PublicKey};
+use bitcoin::secp256k1::Secp256k1;
+use bitcoin::secp256k1::ecdh::SharedSecret;
+use bitcoin::secp256k1;
 
 use chain::chaininterface::{BroadcasterInterface,ChainListener,FeeEstimator};
 use chain::transaction::OutPoint;
 use ln::channel::{Channel, ChannelError};
-use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY, HTLC_FAIL_BACK_BUFFER};
+use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, ManyChannelMonitor, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY};
 use ln::features::{InitFeatures, NodeFeatures};
-use ln::router::{Route, RouteHop};
+use routing::router::{Route, RouteHop};
 use ln::msgs;
 use ln::onion_utils;
 use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError};
 use chain::keysinterface::{ChannelKeys, KeysInterface, KeysManager, InMemoryChannelKeys};
 use util::config::UserConfig;
 use util::{byte_utils, events};
-use util::ser::{Readable, ReadableArgs, Writeable, Writer};
+use util::ser::{Readable, ReadableArgs, MaybeReadable, Writeable, Writer};
 use util::chacha20::{ChaCha20, ChaChaReader};
 use util::logger::Logger;
 use util::errors::APIError;
@@ -251,7 +252,7 @@ impl MsgHandleErrInternal {
                                                },
                                        },
                                },
-                               ChannelError::CloseDelayBroadcast { msg, .. } => LightningError {
+                               ChannelError::CloseDelayBroadcast(msg) => LightningError {
                                        err: msg,
                                        action: msgs::ErrorAction::SendErrorMessage {
                                                msg: msgs::ErrorMessage {
@@ -375,7 +376,7 @@ pub struct ChannelManager<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref,
         F::Target: FeeEstimator,
 {
        default_configuration: UserConfig,
-       genesis_hash: Sha256dHash,
+       genesis_hash: BlockHash,
        fee_estimator: F,
        monitor: M,
        tx_broadcaster: T,
@@ -384,7 +385,7 @@ pub struct ChannelManager<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref,
        pub(super) latest_block_height: AtomicUsize,
        #[cfg(not(test))]
        latest_block_height: AtomicUsize,
-       last_block_hash: Mutex<Sha256dHash>,
+       last_block_hash: Mutex<BlockHash>,
        secp_ctx: Secp256k1<secp256k1::All>,
 
        #[cfg(test)]
@@ -574,8 +575,9 @@ macro_rules! break_chan_entry {
                                if let Some(short_id) = chan.get_short_channel_id() {
                                        $channel_state.short_to_id.remove(&short_id);
                                }
-                               break Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, chan.force_shutdown(true), $self.get_channel_update(&chan).ok())) },
-                       Err(ChannelError::CloseDelayBroadcast { .. }) => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); }
+                               break Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, chan.force_shutdown(true), $self.get_channel_update(&chan).ok()))
+                       },
+                       Err(ChannelError::CloseDelayBroadcast(_)) => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); }
                }
        }
 }
@@ -595,22 +597,12 @@ macro_rules! try_chan_entry {
                                }
                                return Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, chan.force_shutdown(true), $self.get_channel_update(&chan).ok()))
                        },
-                       Err(ChannelError::CloseDelayBroadcast { msg, update }) => {
+                       Err(ChannelError::CloseDelayBroadcast(msg)) => {
                                log_error!($self, "Channel {} need to be shutdown but closing transactions not broadcast due to {}", log_bytes!($entry.key()[..]), msg);
                                let (channel_id, mut chan) = $entry.remove_entry();
                                if let Some(short_id) = chan.get_short_channel_id() {
                                        $channel_state.short_to_id.remove(&short_id);
                                }
-                               if let Err(e) = $self.monitor.update_monitor(chan.get_funding_txo().unwrap(), update) {
-                                       match e {
-                                               // Upstream channel is dead, but we want at least to fail backward HTLCs to save
-                                               // downstream channels. In case of PermanentFailure, we are not going to be able
-                                               // to claim back to_remote output on remote commitment transaction. Doesn't
-                                               // make a difference here, we are concern about HTLCs circuit, not onchain funds.
-                                               ChannelMonitorUpdateErr::PermanentFailure => {},
-                                               ChannelMonitorUpdateErr::TemporaryFailure => {},
-                                       }
-                               }
                                let shutdown_res = chan.force_shutdown(false);
                                return Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, shutdown_res, $self.get_channel_update(&chan).ok()))
                        }
@@ -827,7 +819,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
        }
 
        /// Gets the list of usable channels, in random order. Useful as an argument to
-       /// Router::get_route to ensure non-announced channels are used.
+       /// get_route to ensure non-announced channels are used.
        ///
        /// These are guaranteed to have their is_live value set to true, see the documentation for
        /// ChannelDetails::is_live for more info on exactly what the criteria are.
@@ -1039,7 +1031,11 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
 
                                // OUR PAYMENT!
                                // final_expiry_too_soon
-                               if (msg.cltv_expiry as u64) < self.latest_block_height.load(Ordering::Acquire) as u64 + (CLTV_CLAIM_BUFFER + LATENCY_GRACE_PERIOD_BLOCKS) as u64 {
+                               // We have to have some headroom to broadcast on chain if we have the preimage, so make sure we have at least
+                               // HTLC_FAIL_BACK_BUFFER blocks to go.
+                               // Also, ensure that, in the case of an unknown payment hash, our payment logic has enough time to fail the HTLC backward
+                               // before our onchain logic triggers a channel closure (see the HTLC_FAIL_BACK_BUFFER rationale).
+                               if (msg.cltv_expiry as u64) <= self.latest_block_height.load(Ordering::Acquire) as u64 + HTLC_FAIL_BACK_BUFFER as u64 + 1 {
                                        return_err!("The final CLTV expiry is too soon to handle", 17, &[0;0]);
                                }
                                // final_incorrect_htlc_amount
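
A minimal sketch of the tightened final-hop check above, assuming only that HTLC_FAIL_BACK_BUFFER is the block-count constant imported from ln::channelmonitor; the numeric value used here is a placeholder, not the library's:

    // Sketch only: the value below is a placeholder, not the real constant.
    const HTLC_FAIL_BACK_BUFFER: u32 = 72;

    /// Accept a final-hop HTLC only if its expiry leaves enough blocks to either
    /// broadcast on-chain with the preimage or fail it backward in time.
    fn final_cltv_acceptable(cltv_expiry: u32, latest_block_height: u32) -> bool {
        // Mirrors the rejection condition `cltv_expiry <= height + HTLC_FAIL_BACK_BUFFER + 1`.
        (cltv_expiry as u64) > latest_block_height as u64 + HTLC_FAIL_BACK_BUFFER as u64 + 1
    }

    fn main() {
        let height = 600_000u32;
        assert!(!final_cltv_acceptable(height + HTLC_FAIL_BACK_BUFFER + 1, height)); // rejected: too soon
        assert!(final_cltv_acceptable(height + HTLC_FAIL_BACK_BUFFER + 2, height));  // enough headroom
    }
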
@@ -1163,13 +1159,20 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                                                break Some(("Forwarding node has tampered with the intended HTLC values or origin node has an obsolete cltv_expiry_delta", 0x1000 | 13, Some(self.get_channel_update(chan).unwrap())));
                                        }
                                        let cur_height = self.latest_block_height.load(Ordering::Acquire) as u32 + 1;
-                                       // We want to have at least LATENCY_GRACE_PERIOD_BLOCKS to fail prior to going on chain CLAIM_BUFFER blocks before expiration
-                                       if msg.cltv_expiry <= cur_height + CLTV_CLAIM_BUFFER + LATENCY_GRACE_PERIOD_BLOCKS as u32 { // expiry_too_soon
+                                       // Theoretically, our channel counterparty shouldn't send us an HTLC expiring now, but we want to be robust wrt
+                                       // counterparty packet sanitization (see the HTLC_FAIL_BACK_BUFFER rationale)
+                                       if msg.cltv_expiry <= cur_height + HTLC_FAIL_BACK_BUFFER as u32 { // expiry_too_soon
                                                break Some(("CLTV expiry is too close", 0x1000 | 14, Some(self.get_channel_update(chan).unwrap())));
                                        }
                                        if msg.cltv_expiry > cur_height + CLTV_FAR_FAR_AWAY as u32 { // expiry_too_far
                                                break Some(("CLTV expiry is too far in the future", 21, None));
                                        }
+                                       // In theory, we would be safe against unintentional channel closure if we only required a margin of LATENCY_GRACE_PERIOD_BLOCKS.
+                                       // But, to be safe against policy reception, we use a longer delay.
+                                       if (*outgoing_cltv_value) as u64 <= (cur_height + HTLC_FAIL_BACK_BUFFER) as u64 {
+                                               break Some(("Outgoing CLTV value is too soon", 0x1000 | 14, Some(self.get_channel_update(chan).unwrap())));
+                                       }
+
                                        break None;
                                }
                                {
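
A hedged sketch of the forwarding-path checks in this hunk, written as a standalone helper; HTLC_FAIL_BACK_BUFFER and CLTV_FAR_FAR_AWAY stand in for the real constants and their values here are placeholders:

    // Placeholder values, for illustration only.
    const HTLC_FAIL_BACK_BUFFER: u32 = 72;
    const CLTV_FAR_FAR_AWAY: u32 = 14 * 24 * 6;

    enum ForwardCltvError {
        ExpiryTooSoon,   // 0x1000 | 14 above
        ExpiryTooFar,    // 21 above
        OutgoingTooSoon, // also 0x1000 | 14
    }

    fn check_forward_cltv(incoming_expiry: u32, outgoing_expiry: u32, cur_height: u32)
        -> Result<(), ForwardCltvError>
    {
        if incoming_expiry <= cur_height + HTLC_FAIL_BACK_BUFFER {
            return Err(ForwardCltvError::ExpiryTooSoon);
        }
        if incoming_expiry > cur_height + CLTV_FAR_FAR_AWAY {
            return Err(ForwardCltvError::ExpiryTooFar);
        }
        // Require the same buffer on the outgoing leg, so we can fail backward
        // before our own on-chain logic would force-close the outbound channel.
        if outgoing_expiry as u64 <= (cur_height + HTLC_FAIL_BACK_BUFFER) as u64 {
            return Err(ForwardCltvError::OutgoingTooSoon);
        }
        Ok(())
    }
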
@@ -1496,7 +1499,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                }
 
                let announcement = msgs::UnsignedNodeAnnouncement {
-                       features: NodeFeatures::supported(),
+                       features: NodeFeatures::known(),
                        timestamp: self.last_node_announcement_serial.fetch_add(1, Ordering::AcqRel) as u32,
                        node_id: self.get_our_node_id(),
                        rgb, alias, addresses,
@@ -1653,7 +1656,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                                                                                        }
                                                                                        Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, channel.force_shutdown(true), self.get_channel_update(&channel).ok()))
                                                                                },
-                                                                               ChannelError::CloseDelayBroadcast { .. } => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); }
+                                                                               ChannelError::CloseDelayBroadcast(_) => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); }
                                                                        };
                                                                        handle_errors.push((their_node_id, err));
                                                                        continue;
@@ -1866,7 +1869,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                                                        .. } => {
                                                // we get a fail_malformed_htlc from the first hop
                                                // TODO: We'd like to generate a PaymentFailureNetworkUpdate for temporary
-                                               // failures here, but that would be insufficient as Router::get_route
+                                               // failures here, but that would be insufficient as get_route
                                                // generally ignores its view of our own channels as we provide them via
                                                // ChannelDetails.
                                                // TODO: For non-temporary failures, we really should be closing the
@@ -3605,6 +3608,12 @@ impl<ChanSigner: ChannelKeys + Writeable, M: Deref, T: Deref, K: Deref, F: Deref
                        peer_state.latest_features.write(writer)?;
                }
 
+               let events = self.pending_events.lock().unwrap();
+               (events.len() as u64).write(writer)?;
+               for event in events.iter() {
+                       event.write(writer)?;
+               }
+
                (self.last_node_announcement_serial.load(Ordering::Acquire) as u32).write(writer)?;
 
                Ok(())
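
The event write added here follows the crate's usual length-prefixed layout: a u64 count, then each element in order. A self-contained sketch of that layout, with the util::ser Writeable/Writer machinery stubbed down to std::io so it compiles on its own:

    use std::io::{self, Write};

    // Minimal stand-in for util::ser::Writeable, just enough for the sketch.
    trait Writeable {
        fn write<W: Write>(&self, w: &mut W) -> io::Result<()>;
    }

    impl Writeable for u64 {
        fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
            w.write_all(&self.to_be_bytes())
        }
    }

    // Mirrors `(events.len() as u64).write(writer)?` followed by the per-event loop.
    fn write_collection<W: Write, T: Writeable>(w: &mut W, items: &[T]) -> io::Result<()> {
        (items.len() as u64).write(w)?;
        for item in items {
            item.write(w)?;
        }
        Ok(())
    }
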
@@ -3675,20 +3684,20 @@ pub struct ChannelManagerReadArgs<'a, ChanSigner: 'a + ChannelKeys, M: Deref, T:
 // Implement ReadableArgs for an Arc'd ChannelManager to make it a bit easier to work with the
 // SimpleArcChannelManager type:
 impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: Deref>
-       ReadableArgs<ChannelManagerReadArgs<'a, ChanSigner, M, T, K, F>> for (Sha256dHash, Arc<ChannelManager<ChanSigner, M, T, K, F>>)
+       ReadableArgs<ChannelManagerReadArgs<'a, ChanSigner, M, T, K, F>> for (BlockHash, Arc<ChannelManager<ChanSigner, M, T, K, F>>)
        where M::Target: ManyChannelMonitor<ChanSigner>,
         T::Target: BroadcasterInterface,
         K::Target: KeysInterface<ChanKeySigner = ChanSigner>,
         F::Target: FeeEstimator,
 {
        fn read<R: ::std::io::Read>(reader: &mut R, args: ChannelManagerReadArgs<'a, ChanSigner, M, T, K, F>) -> Result<Self, DecodeError> {
-               let (blockhash, chan_manager) = <(Sha256dHash, ChannelManager<ChanSigner, M, T, K, F>)>::read(reader, args)?;
+               let (blockhash, chan_manager) = <(BlockHash, ChannelManager<ChanSigner, M, T, K, F>)>::read(reader, args)?;
                Ok((blockhash, Arc::new(chan_manager)))
        }
 }
 
 impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: Deref>
-       ReadableArgs<ChannelManagerReadArgs<'a, ChanSigner, M, T, K, F>> for (Sha256dHash, ChannelManager<ChanSigner, M, T, K, F>)
+       ReadableArgs<ChannelManagerReadArgs<'a, ChanSigner, M, T, K, F>> for (BlockHash, ChannelManager<ChanSigner, M, T, K, F>)
        where M::Target: ManyChannelMonitor<ChanSigner>,
         T::Target: BroadcasterInterface,
         K::Target: KeysInterface<ChanKeySigner = ChanSigner>,
@@ -3701,9 +3710,9 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
                        return Err(DecodeError::UnknownVersion);
                }
 
-               let genesis_hash: Sha256dHash = Readable::read(reader)?;
+               let genesis_hash: BlockHash = Readable::read(reader)?;
                let latest_block_height: u32 = Readable::read(reader)?;
-               let last_block_hash: Sha256dHash = Readable::read(reader)?;
+               let last_block_hash: BlockHash = Readable::read(reader)?;
 
                let mut failed_htlcs = Vec::new();
 
@@ -3751,12 +3760,13 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
                        }
                }
 
+               const MAX_ALLOC_SIZE: usize = 1024 * 64;
                let forward_htlcs_count: u64 = Readable::read(reader)?;
                let mut forward_htlcs = HashMap::with_capacity(cmp::min(forward_htlcs_count as usize, 128));
                for _ in 0..forward_htlcs_count {
                        let short_channel_id = Readable::read(reader)?;
                        let pending_forwards_count: u64 = Readable::read(reader)?;
-                       let mut pending_forwards = Vec::with_capacity(cmp::min(pending_forwards_count as usize, 128));
+                       let mut pending_forwards = Vec::with_capacity(cmp::min(pending_forwards_count as usize, MAX_ALLOC_SIZE/mem::size_of::<HTLCForwardInfo>()));
                        for _ in 0..pending_forwards_count {
                                pending_forwards.push(Readable::read(reader)?);
                        }
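
MAX_ALLOC_SIZE bounds how much memory an on-disk length prefix can make us preallocate; the same bound is reused for the claimable-HTLC, per-peer and pending-event vectors in the hunks below. A generic sketch of the pattern (element type and usage are illustrative):

    use std::cmp;
    use std::mem;

    const MAX_ALLOC_SIZE: usize = 1024 * 64; // same 64 KiB bound as the diff

    /// Pre-reserve at most MAX_ALLOC_SIZE bytes worth of `T`s (T assumed non-zero-sized);
    /// the Vec still grows normally if the stream really does contain more entries.
    fn bounded_capacity<T>(claimed_len: u64) -> usize {
        cmp::min(claimed_len as usize, MAX_ALLOC_SIZE / mem::size_of::<T>())
    }

    fn read_u64s(claimed_len: u64) -> Vec<u64> {
        let mut v = Vec::with_capacity(bounded_capacity::<u64>(claimed_len));
        for i in 0..claimed_len {
            v.push(i); // stand-in for `Readable::read(reader)?`
        }
        v
    }
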
@@ -3768,7 +3778,7 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
                for _ in 0..claimable_htlcs_count {
                        let payment_hash = Readable::read(reader)?;
                        let previous_hops_len: u64 = Readable::read(reader)?;
-                       let mut previous_hops = Vec::with_capacity(cmp::min(previous_hops_len as usize, 2));
+                       let mut previous_hops = Vec::with_capacity(cmp::min(previous_hops_len as usize, MAX_ALLOC_SIZE/mem::size_of::<ClaimableHTLC>()));
                        for _ in 0..previous_hops_len {
                                previous_hops.push(Readable::read(reader)?);
                        }
@@ -3776,7 +3786,7 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
                }
 
                let peer_count: u64 = Readable::read(reader)?;
-               let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, 128));
+               let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex<PeerState>)>()));
                for _ in 0..peer_count {
                        let peer_pubkey = Readable::read(reader)?;
                        let peer_state = PeerState {
@@ -3785,6 +3795,15 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
                        per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
                }
 
+               let event_count: u64 = Readable::read(reader)?;
+               let mut pending_events_read: Vec<events::Event> = Vec::with_capacity(cmp::min(event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<events::Event>()));
+               for _ in 0..event_count {
+                       match MaybeReadable::read(reader)? {
+                               Some(event) => pending_events_read.push(event),
+                               None => continue,
+                       }
+               }
+
                let last_node_announcement_serial: u32 = Readable::read(reader)?;
 
                let channel_manager = ChannelManager {
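
Pending events are read back through MaybeReadable rather than Readable, so an entry this version does not recognize deserializes to None and is skipped instead of failing the whole ChannelManager read. A toy, self-contained sketch of that skip-on-None pattern; the trait and event type below are stand-ins, not the util::ser or events::Event definitions:

    use std::io::Read;

    #[derive(Debug)]
    struct ShortRead;

    // Stand-in for util::ser::MaybeReadable.
    trait MaybeReadable: Sized {
        fn read<R: Read>(reader: &mut R) -> Result<Option<Self>, ShortRead>;
    }

    struct ToyEvent(u8);

    impl MaybeReadable for ToyEvent {
        fn read<R: Read>(reader: &mut R) -> Result<Option<Self>, ShortRead> {
            let mut tag = [0u8; 1];
            reader.read_exact(&mut tag).map_err(|_| ShortRead)?;
            match tag[0] {
                0 => Ok(None),              // unrecognized entry: drop it
                t => Ok(Some(ToyEvent(t))), // recognized entry
            }
        }
    }

    fn read_events<R: Read>(reader: &mut R, count: u64) -> Result<Vec<ToyEvent>, ShortRead> {
        let mut out: Vec<ToyEvent> = Vec::new();
        for _ in 0..count {
            match MaybeReadable::read(reader)? {
                Some(ev) => out.push(ev),
                None => continue, // same skip as the event loop above
            }
        }
        Ok(out)
    }
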
@@ -3810,7 +3829,7 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
 
                        per_peer_state: RwLock::new(per_peer_state),
 
-                       pending_events: Mutex::new(Vec::new()),
+                       pending_events: Mutex::new(pending_events_read),
                        total_consistency_lock: RwLock::new(()),
                        keys_manager: args.keys_manager,
                        logger: args.logger,