Merge pull request #571 from ariard/2020-04-fix-minimalif
[rust-lightning] / lightning / src / ln / channelmanager.rs
index 5f7e903fd201c6da81c0eadf20dd61d6f98e2dd6..a471ca3f3675fcab67677381164c7dc69dd8903a 100644 (file)
@@ -28,9 +28,9 @@ use secp256k1;
 use chain::chaininterface::{BroadcasterInterface,ChainListener,FeeEstimator};
 use chain::transaction::OutPoint;
 use ln::channel::{Channel, ChannelError};
-use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY};
+use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY};
+use ln::features::{InitFeatures, NodeFeatures};
 use ln::router::Route;
-use ln::features::InitFeatures;
 use ln::msgs;
 use ln::onion_utils;
 use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError};
@@ -152,7 +152,7 @@ pub struct PaymentHash(pub [u8;32]);
 #[derive(Hash, Copy, Clone, PartialEq, Eq, Debug)]
 pub struct PaymentPreimage(pub [u8;32]);
 
-type ShutdownResult = (Vec<Transaction>, Vec<(HTLCSource, PaymentHash)>);
+type ShutdownResult = (Option<OutPoint>, ChannelMonitorUpdate, Vec<(HTLCSource, PaymentHash)>);
 
 /// Error type returned across the channel_state mutex boundary. When an Err is generated for a
 /// Channel, we generally end up with a ChannelError::Close for which we have to close the channel
@@ -368,6 +368,10 @@ pub struct ChannelManager<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref,
        channel_state: Mutex<ChannelHolder<ChanSigner>>,
        our_network_key: SecretKey,
 
+       /// Used to track the last value sent in a node_announcement "timestamp" field. We ensure this
+       /// value increases strictly since we don't assume access to a time source.
+       last_node_announcement_serial: AtomicUsize,
+
        /// The bulk of our storage will eventually be here (channels and message queues and the like).
        /// If we are connected to a peer we always at least have an entry here, even if no channels
        /// are currently open with that peer.
@@ -463,21 +467,41 @@ pub struct ChannelDetails {
 }
 
 macro_rules! handle_error {
-       ($self: ident, $internal: expr, $their_node_id: expr, $locked_channel_state: expr) => {
+       ($self: ident, $internal: expr, $their_node_id: expr) => {
                match $internal {
                        Ok(msg) => Ok(msg),
                        Err(MsgHandleErrInternal { err, shutdown_finish }) => {
+                               #[cfg(debug_assertions)]
+                               {
+                                       // In testing, ensure there are no deadlocks where the lock is already held upon
+                                       // entering the macro.
+                                       assert!($self.channel_state.try_lock().is_ok());
+                               }
+
+                               let mut msg_events = Vec::with_capacity(2);
+
                                if let Some((shutdown_res, update_option)) = shutdown_finish {
                                        $self.finish_force_close_channel(shutdown_res);
                                        if let Some(update) = update_option {
-                                               $locked_channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                               msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                        msg: update
                                                });
                                        }
                                }
+
                                log_error!($self, "{}", err.err);
                                if let msgs::ErrorAction::IgnoreError = err.action {
-                               } else { $locked_channel_state.pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: $their_node_id, action: err.action.clone() }); }
+                               } else {
+                                       msg_events.push(events::MessageSendEvent::HandleError {
+                                               node_id: $their_node_id,
+                                               action: err.action.clone()
+                                       });
+                               }
+
+                               if !msg_events.is_empty() {
+                                       $self.channel_state.lock().unwrap().pending_msg_events.append(&mut msg_events);
+                               }
+
                                // Return error in case higher-API need one
                                Err(err)
                        },
@@ -498,8 +522,7 @@ macro_rules! break_chan_entry {
                                if let Some(short_id) = chan.get_short_channel_id() {
                                        $channel_state.short_to_id.remove(&short_id);
                                }
-                               break Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, chan.force_shutdown(), $self.get_channel_update(&chan).ok()))
-                       },
+                               break Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, chan.force_shutdown(true), $self.get_channel_update(&chan).ok())) },
                        Err(ChannelError::CloseDelayBroadcast { .. }) => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); }
                }
        }
@@ -518,7 +541,7 @@ macro_rules! try_chan_entry {
                                if let Some(short_id) = chan.get_short_channel_id() {
                                        $channel_state.short_to_id.remove(&short_id);
                                }
-                               return Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, chan.force_shutdown(), $self.get_channel_update(&chan).ok()))
+                               return Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, chan.force_shutdown(true), $self.get_channel_update(&chan).ok()))
                        },
                        Err(ChannelError::CloseDelayBroadcast { msg, update }) => {
                                log_error!($self, "Channel {} need to be shutdown but closing transactions not broadcast due to {}", log_bytes!($entry.key()[..]), msg);
@@ -536,11 +559,7 @@ macro_rules! try_chan_entry {
                                                ChannelMonitorUpdateErr::TemporaryFailure => {},
                                        }
                                }
-                               let mut shutdown_res = chan.force_shutdown();
-                               if shutdown_res.0.len() >= 1 {
-                                       log_error!($self, "You have a toxic local commitment transaction {} avaible in channel monitor, read comment in ChannelMonitor::get_latest_local_commitment_txn to be informed of manual action to take", shutdown_res.0[0].txid());
-                               }
-                               shutdown_res.0.clear();
+                               let shutdown_res = chan.force_shutdown(false);
                                return Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, shutdown_res, $self.get_channel_update(&chan).ok()))
                        }
                }
@@ -568,7 +587,7 @@ macro_rules! handle_monitor_err {
                                // splitting hairs we'd prefer to claim payments that were to us, but we haven't
                                // given up the preimage yet, so might as well just wait until the payment is
                                // retried, avoiding the on-chain fees.
-                               let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", channel_id, chan.force_shutdown(), $self.get_channel_update(&chan).ok()));
+                               let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", channel_id, chan.force_shutdown(true), $self.get_channel_update(&chan).ok()));
                                res
                        },
                        ChannelMonitorUpdateErr::TemporaryFailure => {
@@ -665,6 +684,8 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                        }),
                        our_network_key: keys_manager.get_node_secret(),
 
+                       last_node_announcement_serial: AtomicUsize::new(0),
+
                        per_peer_state: RwLock::new(HashMap::new()),
 
                        pending_events: Mutex::new(Vec::new()),
@@ -814,14 +835,17 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
 
        #[inline]
        fn finish_force_close_channel(&self, shutdown_res: ShutdownResult) {
-               let (local_txn, mut failed_htlcs) = shutdown_res;
-               log_trace!(self, "Finishing force-closure of channel with {} transactions to broadcast and {} HTLCs to fail", local_txn.len(), failed_htlcs.len());
+               let (funding_txo_option, monitor_update, mut failed_htlcs) = shutdown_res;
+               log_trace!(self, "Finishing force-closure of channel {} HTLCs to fail", failed_htlcs.len());
                for htlc_source in failed_htlcs.drain(..) {
                        self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
                }
-               for tx in local_txn {
-                       log_trace!(self, "Broadcast onchain {}", log_tx!(tx));
-                       self.tx_broadcaster.broadcast_transaction(&tx);
+               if let Some(funding_txo) = funding_txo_option {
+                       // There isn't anything we can do if we get an update failure - we're already
+                       // force-closing. The monitor update on the required in-memory copy should broadcast
+                       // the latest local state, which is the best we can do anyway. Thus, it is safe to
+                       // ignore the result here.
+                       let _ = self.monitor.update_monitor(funding_txo, monitor_update);
                }
        }
 
@@ -843,7 +867,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                        }
                };
                log_trace!(self, "Force-closing channel {}", log_bytes!(channel_id[..]));
-               self.finish_force_close_channel(chan.force_shutdown());
+               self.finish_force_close_channel(chan.force_shutdown(true));
                if let Ok(update) = self.get_channel_update(&chan) {
                        let mut channel_state = self.channel_state.lock().unwrap();
                        channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
@@ -1118,7 +1142,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                let unsigned = msgs::UnsignedChannelUpdate {
                        chain_hash: self.genesis_hash,
                        short_channel_id: short_channel_id,
-                       timestamp: chan.get_channel_update_count(),
+                       timestamp: chan.get_update_time_counter(),
                        flags: (!were_node_one) as u16 | ((!chan.is_live() as u16) << 1),
                        cltv_expiry_delta: CLTV_EXPIRY_DELTA,
                        htlc_minimum_msat: chan.get_our_htlc_minimum_msat(),
@@ -1187,9 +1211,8 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
 
                let _ = self.total_consistency_lock.read().unwrap();
 
-               let mut channel_lock = self.channel_state.lock().unwrap();
                let err: Result<(), _> = loop {
-
+                       let mut channel_lock = self.channel_state.lock().unwrap();
                        let id = match channel_lock.short_to_id.get(&route.hops.first().unwrap().short_channel_id) {
                                None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!"}),
                                Some(id) => id.clone(),
@@ -1238,7 +1261,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                        return Ok(());
                };
 
-               match handle_error!(self, err, route.hops.first().unwrap().pubkey, channel_lock) {
+               match handle_error!(self, err, route.hops.first().unwrap().pubkey) {
                        Ok(_) => unreachable!(),
                        Err(e) => { Err(APIError::ChannelUnavailable { err: e.err }) }
                }
@@ -1257,18 +1280,17 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                let _ = self.total_consistency_lock.read().unwrap();
 
                let (mut chan, msg, chan_monitor) = {
-                       let mut channel_state = self.channel_state.lock().unwrap();
-                       let (res, chan) = match channel_state.by_id.remove(temporary_channel_id) {
+                       let (res, chan) = match self.channel_state.lock().unwrap().by_id.remove(temporary_channel_id) {
                                Some(mut chan) => {
                                        (chan.get_outbound_funding_created(funding_txo)
                                                .map_err(|e| if let ChannelError::Close(msg) = e {
-                                                       MsgHandleErrInternal::from_finish_shutdown(msg, chan.channel_id(), chan.force_shutdown(), None)
+                                                       MsgHandleErrInternal::from_finish_shutdown(msg, chan.channel_id(), chan.force_shutdown(true), None)
                                                } else { unreachable!(); })
                                        , chan)
                                },
                                None => return
                        };
-                       match handle_error!(self, res, chan.get_their_node_id(), channel_state) {
+                       match handle_error!(self, res, chan.get_their_node_id()) {
                                Ok(funding_msg) => {
                                        (chan, funding_msg.0, funding_msg.1)
                                },
@@ -1280,12 +1302,9 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                if let Err(e) = self.monitor.add_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
                        match e {
                                ChannelMonitorUpdateErr::PermanentFailure => {
-                                       {
-                                               let mut channel_state = self.channel_state.lock().unwrap();
-                                               match handle_error!(self, Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", *temporary_channel_id, chan.force_shutdown(), None)), chan.get_their_node_id(), channel_state) {
-                                                       Err(_) => { return; },
-                                                       Ok(()) => unreachable!(),
-                                               }
+                                       match handle_error!(self, Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", *temporary_channel_id, chan.force_shutdown(true), None)), chan.get_their_node_id()) {
+                                               Err(_) => { return; },
+                                               Ok(()) => unreachable!(),
                                        }
                                },
                                ChannelMonitorUpdateErr::TemporaryFailure => {
@@ -1334,6 +1353,57 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                })
        }
 
+       #[allow(dead_code)]
+       // Messages of up to 64KB should never end up more than half full with addresses, as that would
+       // be absurd. We ensure this by checking that at least 500 (our stated public contract on when
+       // broadcast_node_announcement panics) of the maximum-length addresses would fit in a 64KB
+       // message...
+       const HALF_MESSAGE_IS_ADDRS: u32 = ::std::u16::MAX as u32 / (msgs::NetAddress::MAX_LEN as u32 + 1) / 2;
+       #[deny(const_err)]
+       #[allow(dead_code)]
+       // ...by failing to compile if the number of addresses that would be half of a message is
+       // smaller than 500:
+       const STATIC_ASSERT: u32 = Self::HALF_MESSAGE_IS_ADDRS - 500;
+
+       /// Generates a signed node_announcement from the given arguments and creates a
+       /// BroadcastNodeAnnouncement event. Note that such messages will be ignored unless peers have
+       /// seen a channel_announcement from us (ie unless we have public channels open).
+       ///
+       /// RGB is a node "color" and alias is a printable human-readable string to describe this node
+       /// to humans. They carry no in-protocol meaning.
+       ///
+       /// addresses represent the set (possibly empty) of socket addresses on which this node accepts
+       /// incoming connections. These will be broadcast to the network, publicly tying these
+       /// addresses together. If you wish to preserve user privacy, addresses should likely contain
+       /// only Tor Onion addresses.
+       ///
+       /// Panics if addresses is absurdly large (more than 500).
+       pub fn broadcast_node_announcement(&self, rgb: [u8; 3], alias: [u8; 32], addresses: Vec<msgs::NetAddress>) {
+               let _ = self.total_consistency_lock.read().unwrap();
+
+               if addresses.len() > 500 {
+                       panic!("More than half the message size was taken up by public addresses!");
+               }
+
+               let announcement = msgs::UnsignedNodeAnnouncement {
+                       features: NodeFeatures::supported(),
+                       timestamp: self.last_node_announcement_serial.fetch_add(1, Ordering::AcqRel) as u32,
+                       node_id: self.get_our_node_id(),
+                       rgb, alias, addresses,
+                       excess_address_data: Vec::new(),
+                       excess_data: Vec::new(),
+               };
+               let msghash = hash_to_message!(&Sha256dHash::hash(&announcement.encode()[..])[..]);
+
+               let mut channel_state = self.channel_state.lock().unwrap();
+               channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastNodeAnnouncement {
+                       msg: msgs::NodeAnnouncement {
+                               signature: self.secp_ctx.sign(&msghash, &self.our_network_key),
+                               contents: announcement
+                       },
+               });
+       }
+
        /// Processes HTLCs which are pending waiting on random forward delay.
        ///
        /// Should only really ever be called in response to a PendingHTLCsForwardable event.
@@ -1461,14 +1531,12 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                                                                                        if let Some(short_id) = channel.get_short_channel_id() {
                                                                                                channel_state.short_to_id.remove(&short_id);
                                                                                        }
-                                                                                       Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, channel.force_shutdown(), self.get_channel_update(&channel).ok()))
+                                                                                       Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, channel.force_shutdown(true), self.get_channel_update(&channel).ok()))
                                                                                },
                                                                                ChannelError::CloseDelayBroadcast { .. } => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); }
                                                                        };
-                                                                       match handle_error!(self, err, their_node_id, channel_state) {
-                                                                               Ok(_) => unreachable!(),
-                                                                               Err(_) => { continue; },
-                                                                       }
+                                                                       handle_errors.push((their_node_id, err));
+                                                                       continue;
                                                                }
                                                        };
                                                        if let Err(e) = self.monitor.update_monitor(chan.get().get_funding_txo().unwrap(), monitor_update) {
@@ -1524,11 +1592,8 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                        };
                }
 
-               if handle_errors.len() > 0 {
-                       let mut channel_state_lock = self.channel_state.lock().unwrap();
-                       for (their_node_id, err) in handle_errors.drain(..) {
-                               let _ = handle_error!(self, err, their_node_id, channel_state_lock);
-                       }
+               for (their_node_id, err) in handle_errors.drain(..) {
+                       let _ = handle_error!(self, err, their_node_id);
                }
 
                if new_events.is_empty() { return }
@@ -1780,7 +1845,8 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                        return;
                };
 
-               let _ = handle_error!(self, err, their_node_id, channel_state_lock);
+               mem::drop(channel_state_lock);
+               let _ = handle_error!(self, err, their_node_id);
        }
 
        /// Gets the node_id held by this ChannelManager
@@ -1964,7 +2030,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                                        // channel, not the temporary_channel_id. This is compatible with ourselves, but the
                                        // spec is somewhat ambiguous here. Not a huge deal since we'll send error messages for
                                        // any messages referencing a previously-closed channel anyway.
-                                       return Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", funding_msg.channel_id, chan.force_shutdown(), None));
+                                       return Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", funding_msg.channel_id, chan.force_shutdown(true), None));
                                },
                                ChannelMonitorUpdateErr::TemporaryFailure => {
                                        // There's no problem signing a counterparty's funding transaction if our monitor
@@ -2524,9 +2590,9 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
        #[doc(hidden)]
        pub fn update_fee(&self, channel_id: [u8;32], feerate_per_kw: u64) -> Result<(), APIError> {
                let _ = self.total_consistency_lock.read().unwrap();
-               let mut channel_state_lock = self.channel_state.lock().unwrap();
                let their_node_id;
                let err: Result<(), _> = loop {
+                       let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = &mut *channel_state_lock;
 
                        match channel_state.by_id.entry(channel_id) {
@@ -2565,7 +2631,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                        return Ok(())
                };
 
-               match handle_error!(self, err, their_node_id, channel_state_lock) {
+               match handle_error!(self, err, their_node_id) {
                        Ok(_) => unreachable!(),
                        Err(e) => { Err(APIError::APIMisuseError { err: e.err })}
                }
@@ -2684,7 +2750,7 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
                                                                // It looks like our counterparty went on-chain. We go ahead and
                                                                // broadcast our latest local state as well here, just in case its
                                                                // some kind of SPV attack, though we expect these to be dropped.
-                                                               failed_channels.push(channel.force_shutdown());
+                                                               failed_channels.push(channel.force_shutdown(true));
                                                                if let Ok(update) = self.get_channel_update(&channel) {
                                                                        pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                                msg: update
@@ -2699,11 +2765,10 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
                                        if let Some(short_id) = channel.get_short_channel_id() {
                                                short_to_id.remove(&short_id);
                                        }
-                                       failed_channels.push(channel.force_shutdown());
                                        // If would_broadcast_at_height() is true, the channel_monitor will broadcast
                                        // the latest local tx for us, so we should skip that here (it doesn't really
                                        // hurt anything, but does make tests a bit simpler).
-                                       failed_channels.last_mut().unwrap().0 = Vec::new();
+                                       failed_channels.push(channel.force_shutdown(false));
                                        if let Ok(update) = self.get_channel_update(&channel) {
                                                pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                        msg: update
@@ -2719,6 +2784,18 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
                }
                self.latest_block_height.store(height as usize, Ordering::Release);
                *self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header_hash;
+               loop {
+                       // Update last_node_announcement_serial to be the max of its current value and the
+                       // block timestamp. This should keep us close to the current time without relying on
+                       // having an explicit local time source.
+                       // Just in case we end up in a race, we loop until we either successfully update
+                       // last_node_announcement_serial or decide we don't need to.
+                       let old_serial = self.last_node_announcement_serial.load(Ordering::Acquire);
+                       if old_serial >= header.time as usize { break; }
+                       if self.last_node_announcement_serial.compare_exchange(old_serial, header.time as usize, Ordering::AcqRel, Ordering::Relaxed).is_ok() {
+                               break;
+                       }
+               }
        }
 
        /// We force-close the channel without letting our counterparty participate in the shutdown
@@ -2735,7 +2812,7 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
                                        if let Some(short_id) = v.get_short_channel_id() {
                                                short_to_id.remove(&short_id);
                                        }
-                                       failed_channels.push(v.force_shutdown());
+                                       failed_channels.push(v.force_shutdown(true));
                                        if let Ok(update) = self.get_channel_update(&v) {
                                                pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                        msg: update
@@ -2764,146 +2841,82 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
 {
        fn handle_open_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::OpenChannel) {
                let _ = self.total_consistency_lock.read().unwrap();
-               let res = self.internal_open_channel(their_node_id, their_features, msg);
-               if res.is_err() {
-                       let mut channel_state_lock = self.channel_state.lock().unwrap();
-                       let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
-               }
+               let _ = handle_error!(self, self.internal_open_channel(their_node_id, their_features, msg), *their_node_id);
        }
 
        fn handle_accept_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::AcceptChannel) {
                let _ = self.total_consistency_lock.read().unwrap();
-               let res = self.internal_accept_channel(their_node_id, their_features, msg);
-               if res.is_err() {
-                       let mut channel_state_lock = self.channel_state.lock().unwrap();
-                       let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
-               }
+               let _ = handle_error!(self, self.internal_accept_channel(their_node_id, their_features, msg), *their_node_id);
        }
 
        fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) {
                let _ = self.total_consistency_lock.read().unwrap();
-               let res = self.internal_funding_created(their_node_id, msg);
-               if res.is_err() {
-                       let mut channel_state_lock = self.channel_state.lock().unwrap();
-                       let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
-               }
+               let _ = handle_error!(self, self.internal_funding_created(their_node_id, msg), *their_node_id);
        }
 
        fn handle_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) {
                let _ = self.total_consistency_lock.read().unwrap();
-               let res = self.internal_funding_signed(their_node_id, msg);
-               if res.is_err() {
-                       let mut channel_state_lock = self.channel_state.lock().unwrap();
-                       let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
-               }
+               let _ = handle_error!(self, self.internal_funding_signed(their_node_id, msg), *their_node_id);
        }
 
        fn handle_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) {
                let _ = self.total_consistency_lock.read().unwrap();
-               let res = self.internal_funding_locked(their_node_id, msg);
-               if res.is_err() {
-                       let mut channel_state_lock = self.channel_state.lock().unwrap();
-                       let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
-               }
+               let _ = handle_error!(self, self.internal_funding_locked(their_node_id, msg), *their_node_id);
        }
 
        fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) {
                let _ = self.total_consistency_lock.read().unwrap();
-               let res = self.internal_shutdown(their_node_id, msg);
-               if res.is_err() {
-                       let mut channel_state_lock = self.channel_state.lock().unwrap();
-                       let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
-               }
+               let _ = handle_error!(self, self.internal_shutdown(their_node_id, msg), *their_node_id);
        }
 
        fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) {
                let _ = self.total_consistency_lock.read().unwrap();
-               let res = self.internal_closing_signed(their_node_id, msg);
-               if res.is_err() {
-                       let mut channel_state_lock = self.channel_state.lock().unwrap();
-                       let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
-               }
+               let _ = handle_error!(self, self.internal_closing_signed(their_node_id, msg), *their_node_id);
        }
 
        fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) {
                let _ = self.total_consistency_lock.read().unwrap();
-               let res = self.internal_update_add_htlc(their_node_id, msg);
-               if res.is_err() {
-                       let mut channel_state_lock = self.channel_state.lock().unwrap();
-                       let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
-               }
+               let _ = handle_error!(self, self.internal_update_add_htlc(their_node_id, msg), *their_node_id);
        }
 
        fn handle_update_fulfill_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) {
                let _ = self.total_consistency_lock.read().unwrap();
-               let res = self.internal_update_fulfill_htlc(their_node_id, msg);
-               if res.is_err() {
-                       let mut channel_state_lock = self.channel_state.lock().unwrap();
-                       let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
-               }
+               let _ = handle_error!(self, self.internal_update_fulfill_htlc(their_node_id, msg), *their_node_id);
        }
 
        fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) {
                let _ = self.total_consistency_lock.read().unwrap();
-               let res = self.internal_update_fail_htlc(their_node_id, msg);
-               if res.is_err() {
-                       let mut channel_state_lock = self.channel_state.lock().unwrap();
-                       let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
-               }
+               let _ = handle_error!(self, self.internal_update_fail_htlc(their_node_id, msg), *their_node_id);
        }
 
        fn handle_update_fail_malformed_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) {
                let _ = self.total_consistency_lock.read().unwrap();
-               let res = self.internal_update_fail_malformed_htlc(their_node_id, msg);
-               if res.is_err() {
-                       let mut channel_state_lock = self.channel_state.lock().unwrap();
-                       let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
-               }
+               let _ = handle_error!(self, self.internal_update_fail_malformed_htlc(their_node_id, msg), *their_node_id);
        }
 
        fn handle_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) {
                let _ = self.total_consistency_lock.read().unwrap();
-               let res = self.internal_commitment_signed(their_node_id, msg);
-               if res.is_err() {
-                       let mut channel_state_lock = self.channel_state.lock().unwrap();
-                       let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
-               }
+               let _ = handle_error!(self, self.internal_commitment_signed(their_node_id, msg), *their_node_id);
        }
 
        fn handle_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) {
                let _ = self.total_consistency_lock.read().unwrap();
-               let res = self.internal_revoke_and_ack(their_node_id, msg);
-               if res.is_err() {
-                       let mut channel_state_lock = self.channel_state.lock().unwrap();
-                       let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
-               }
+               let _ = handle_error!(self, self.internal_revoke_and_ack(their_node_id, msg), *their_node_id);
        }
 
        fn handle_update_fee(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFee) {
                let _ = self.total_consistency_lock.read().unwrap();
-               let res = self.internal_update_fee(their_node_id, msg);
-               if res.is_err() {
-                       let mut channel_state_lock = self.channel_state.lock().unwrap();
-                       let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
-               }
+               let _ = handle_error!(self, self.internal_update_fee(their_node_id, msg), *their_node_id);
        }
 
        fn handle_announcement_signatures(&self, their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) {
                let _ = self.total_consistency_lock.read().unwrap();
-               let res = self.internal_announcement_signatures(their_node_id, msg);
-               if res.is_err() {
-                       let mut channel_state_lock = self.channel_state.lock().unwrap();
-                       let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
-               }
+               let _ = handle_error!(self, self.internal_announcement_signatures(their_node_id, msg), *their_node_id);
        }
 
        fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) {
                let _ = self.total_consistency_lock.read().unwrap();
-               let res = self.internal_channel_reestablish(their_node_id, msg);
-               if res.is_err() {
-                       let mut channel_state_lock = self.channel_state.lock().unwrap();
-                       let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
-               }
+               let _ = handle_error!(self, self.internal_channel_reestablish(their_node_id, msg), *their_node_id);
        }
 
        fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) {
@@ -2923,7 +2936,7 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
                                                if let Some(short_id) = chan.get_short_channel_id() {
                                                        short_to_id.remove(&short_id);
                                                }
-                                               failed_channels.push(chan.force_shutdown());
+                                               failed_channels.push(chan.force_shutdown(true));
                                                if let Ok(update) = self.get_channel_update(&chan) {
                                                        pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                msg: update
@@ -2970,6 +2983,7 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
                                        &events::MessageSendEvent::SendShutdown { ref node_id, .. } => node_id != their_node_id,
                                        &events::MessageSendEvent::SendChannelReestablish { ref node_id, .. } => node_id != their_node_id,
                                        &events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true,
+                                       &events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true,
                                        &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
                                        &events::MessageSendEvent::HandleError { ref node_id, .. } => node_id != their_node_id,
                                        &events::MessageSendEvent::PaymentFailureNetworkUpdate { .. } => true,
@@ -3288,6 +3302,8 @@ impl<ChanSigner: ChannelKeys + Writeable, M: Deref, T: Deref, K: Deref, F: Deref
                        peer_state.latest_features.write(writer)?;
                }
 
+               (self.last_node_announcement_serial.load(Ordering::Acquire) as u32).write(writer)?;
+
                Ok(())
        }
 }
@@ -3386,7 +3402,7 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
                let latest_block_height: u32 = Readable::read(reader)?;
                let last_block_hash: Sha256dHash = Readable::read(reader)?;
 
-               let mut closed_channels = Vec::new();
+               let mut failed_htlcs = Vec::new();
 
                let channel_count: u64 = Readable::read(reader)?;
                let mut funding_txo_set = HashSet::with_capacity(cmp::min(channel_count as usize, 128));
@@ -3401,13 +3417,20 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
                        let funding_txo = channel.get_funding_txo().ok_or(DecodeError::InvalidValue)?;
                        funding_txo_set.insert(funding_txo.clone());
                        if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) {
-                               if channel.get_cur_local_commitment_transaction_number() != monitor.get_cur_local_commitment_number() ||
-                                               channel.get_revoked_remote_commitment_transaction_number() != monitor.get_min_seen_secret() ||
-                                               channel.get_cur_remote_commitment_transaction_number() != monitor.get_cur_remote_commitment_number() ||
-                                               channel.get_latest_monitor_update_id() != monitor.get_latest_update_id() {
-                                       let mut force_close_res = channel.force_shutdown();
-                                       force_close_res.0 = monitor.get_latest_local_commitment_txn();
-                                       closed_channels.push(force_close_res);
+                               if channel.get_cur_local_commitment_transaction_number() < monitor.get_cur_local_commitment_number() ||
+                                               channel.get_revoked_remote_commitment_transaction_number() < monitor.get_min_seen_secret() ||
+                                               channel.get_cur_remote_commitment_transaction_number() < monitor.get_cur_remote_commitment_number() ||
+                                               channel.get_latest_monitor_update_id() > monitor.get_latest_update_id() {
+                                       // If the channel is ahead of the monitor, return InvalidValue:
+                                       return Err(DecodeError::InvalidValue);
+                               } else if channel.get_cur_local_commitment_transaction_number() > monitor.get_cur_local_commitment_number() ||
+                                               channel.get_revoked_remote_commitment_transaction_number() > monitor.get_min_seen_secret() ||
+                                               channel.get_cur_remote_commitment_transaction_number() > monitor.get_cur_remote_commitment_number() ||
+                                               channel.get_latest_monitor_update_id() < monitor.get_latest_update_id() {
+                                       // But if the channel is behind of the monitor, close the channel:
+                                       let (_, _, mut new_failed_htlcs) = channel.force_shutdown(true);
+                                       failed_htlcs.append(&mut new_failed_htlcs);
+                                       monitor.broadcast_latest_local_commitment_txn(&args.tx_broadcaster);
                                } else {
                                        if let Some(short_channel_id) = channel.get_short_channel_id() {
                                                short_to_id.insert(short_channel_id, channel.channel_id());
@@ -3421,7 +3444,7 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
 
                for (ref funding_txo, ref mut monitor) in args.channel_monitors.iter_mut() {
                        if !funding_txo_set.contains(funding_txo) {
-                               closed_channels.push((monitor.get_latest_local_commitment_txn(), Vec::new()));
+                               monitor.broadcast_latest_local_commitment_txn(&args.tx_broadcaster);
                        }
                }
 
@@ -3459,6 +3482,8 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
                        per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
                }
 
+               let last_node_announcement_serial: u32 = Readable::read(reader)?;
+
                let channel_manager = ChannelManager {
                        genesis_hash,
                        fee_estimator: args.fee_estimator,
@@ -3478,6 +3503,8 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
                        }),
                        our_network_key: args.keys_manager.get_node_secret(),
 
+                       last_node_announcement_serial: AtomicUsize::new(last_node_announcement_serial as usize),
+
                        per_peer_state: RwLock::new(per_peer_state),
 
                        pending_events: Mutex::new(Vec::new()),
@@ -3487,12 +3514,13 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
                        default_configuration: args.default_config,
                };
 
-               for close_res in closed_channels.drain(..) {
-                       channel_manager.finish_force_close_channel(close_res);
-                       //TODO: Broadcast channel update for closed channels, but only after we've made a
-                       //connection or two.
+               for htlc_source in failed_htlcs.drain(..) {
+                       channel_manager.fail_htlc_backwards_internal(channel_manager.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
                }
 
+               //TODO: Broadcast channel update for closed channels, but only after we've made a
+               //connection or two.
+
                Ok((last_block_hash.clone(), channel_manager))
        }
 }