Merge pull request #516 from TheBlueMatt/2020-02-checkin-arch
[rust-lightning] / lightning / src / ln / channelmanager.rs
index c515ceab9a8b5c174942e83e85ac3adb5089d219..1afce1f7bb99e3b9ebe207567caeb205108b59f8 100644 (file)
@@ -28,9 +28,9 @@ use secp256k1;
 use chain::chaininterface::{BroadcasterInterface,ChainListener,FeeEstimator};
 use chain::transaction::OutPoint;
 use ln::channel::{Channel, ChannelError};
-use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY};
+use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY};
+use ln::features::{InitFeatures, NodeFeatures};
 use ln::router::Route;
-use ln::features::InitFeatures;
 use ln::msgs;
 use ln::onion_utils;
 use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError};
@@ -152,7 +152,7 @@ pub struct PaymentHash(pub [u8;32]);
 #[derive(Hash, Copy, Clone, PartialEq, Eq, Debug)]
 pub struct PaymentPreimage(pub [u8;32]);
 
-type ShutdownResult = (Vec<Transaction>, Vec<(HTLCSource, PaymentHash)>);
+type ShutdownResult = (Option<OutPoint>, ChannelMonitorUpdate, Vec<(HTLCSource, PaymentHash)>);
 
 /// Error type returned across the channel_state mutex boundary. When an Err is generated for a
 /// Channel, we generally end up with a ChannelError::Close for which we have to close the channel
@@ -368,6 +368,10 @@ pub struct ChannelManager<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref,
        channel_state: Mutex<ChannelHolder<ChanSigner>>,
        our_network_key: SecretKey,
 
+       /// Used to track the last value sent in a node_announcement "timestamp" field. We ensure this
+       /// value increases strictly since we don't assume access to a time source.
+       last_node_announcement_serial: AtomicUsize,
+
        /// The bulk of our storage will eventually be here (channels and message queues and the like).
        /// If we are connected to a peer we always at least have an entry here, even if no channels
        /// are currently open with that peer.
@@ -498,8 +502,7 @@ macro_rules! break_chan_entry {
                                if let Some(short_id) = chan.get_short_channel_id() {
                                        $channel_state.short_to_id.remove(&short_id);
                                }
-                               break Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, chan.force_shutdown(), $self.get_channel_update(&chan).ok()))
-                       },
+                               break Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, chan.force_shutdown(true), $self.get_channel_update(&chan).ok())) },
                        Err(ChannelError::CloseDelayBroadcast { .. }) => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); }
                }
        }
@@ -518,7 +521,7 @@ macro_rules! try_chan_entry {
                                if let Some(short_id) = chan.get_short_channel_id() {
                                        $channel_state.short_to_id.remove(&short_id);
                                }
-                               return Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, chan.force_shutdown(), $self.get_channel_update(&chan).ok()))
+                               return Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, chan.force_shutdown(true), $self.get_channel_update(&chan).ok()))
                        },
                        Err(ChannelError::CloseDelayBroadcast { msg, update }) => {
                                log_error!($self, "Channel {} need to be shutdown but closing transactions not broadcast due to {}", log_bytes!($entry.key()[..]), msg);
@@ -536,11 +539,7 @@ macro_rules! try_chan_entry {
                                                ChannelMonitorUpdateErr::TemporaryFailure => {},
                                        }
                                }
-                               let mut shutdown_res = chan.force_shutdown();
-                               if shutdown_res.0.len() >= 1 {
-                                       log_error!($self, "You have a toxic local commitment transaction {} avaible in channel monitor, read comment in ChannelMonitor::get_latest_local_commitment_txn to be informed of manual action to take", shutdown_res.0[0].txid());
-                               }
-                               shutdown_res.0.clear();
+                               let shutdown_res = chan.force_shutdown(false);
                                return Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, shutdown_res, $self.get_channel_update(&chan).ok()))
                        }
                }
@@ -568,7 +567,7 @@ macro_rules! handle_monitor_err {
                                // splitting hairs we'd prefer to claim payments that were to us, but we haven't
                                // given up the preimage yet, so might as well just wait until the payment is
                                // retried, avoiding the on-chain fees.
-                               let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", channel_id, chan.force_shutdown(), $self.get_channel_update(&chan).ok()));
+                               let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", channel_id, chan.force_shutdown(true), $self.get_channel_update(&chan).ok()));
                                res
                        },
                        ChannelMonitorUpdateErr::TemporaryFailure => {
@@ -665,6 +664,8 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                        }),
                        our_network_key: keys_manager.get_node_secret(),
 
+                       last_node_announcement_serial: AtomicUsize::new(0),
+
                        per_peer_state: RwLock::new(HashMap::new()),
 
                        pending_events: Mutex::new(Vec::new()),
@@ -690,12 +691,13 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
        ///
        /// Raises APIError::APIMisuseError when channel_value_satoshis > 2**24 or push_msat is
        /// greater than channel_value_satoshis * 1k or channel_value_satoshis is < 1000.
-       pub fn create_channel(&self, their_network_key: PublicKey, channel_value_satoshis: u64, push_msat: u64, user_id: u64) -> Result<(), APIError> {
+       pub fn create_channel(&self, their_network_key: PublicKey, channel_value_satoshis: u64, push_msat: u64, user_id: u64, override_config: Option<UserConfig>) -> Result<(), APIError> {
                if channel_value_satoshis < 1000 {
                        return Err(APIError::APIMisuseError { err: "channel_value must be at least 1000 satoshis" });
                }
 
-               let channel = Channel::new_outbound(&self.fee_estimator, &self.keys_manager, their_network_key, channel_value_satoshis, push_msat, user_id, Arc::clone(&self.logger), &self.default_configuration)?;
+               let config = if override_config.is_some() { override_config.as_ref().unwrap() } else { &self.default_configuration };
+               let channel = Channel::new_outbound(&self.fee_estimator, &self.keys_manager, their_network_key, channel_value_satoshis, push_msat, user_id, Arc::clone(&self.logger), config)?;
                let res = channel.get_open_channel(self.genesis_hash.clone(), &self.fee_estimator);
 
                let _ = self.total_consistency_lock.read().unwrap();
@@ -813,14 +815,17 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
 
        #[inline]
        fn finish_force_close_channel(&self, shutdown_res: ShutdownResult) {
-               let (local_txn, mut failed_htlcs) = shutdown_res;
-               log_trace!(self, "Finishing force-closure of channel with {} transactions to broadcast and {} HTLCs to fail", local_txn.len(), failed_htlcs.len());
+               let (funding_txo_option, monitor_update, mut failed_htlcs) = shutdown_res;
+               log_trace!(self, "Finishing force-closure of channel {} HTLCs to fail", failed_htlcs.len());
                for htlc_source in failed_htlcs.drain(..) {
                        self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
                }
-               for tx in local_txn {
-                       log_trace!(self, "Broadcast onchain {}", log_tx!(tx));
-                       self.tx_broadcaster.broadcast_transaction(&tx);
+               if let Some(funding_txo) = funding_txo_option {
+                       // There isn't anything we can do if we get an update failure - we're already
+                       // force-closing. The monitor update on the required in-memory copy should broadcast
+                       // the latest local state, which is the best we can do anyway. Thus, it is safe to
+                       // ignore the result here.
+                       let _ = self.monitor.update_monitor(funding_txo, monitor_update);
                }
        }
 
@@ -842,7 +847,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                        }
                };
                log_trace!(self, "Force-closing channel {}", log_bytes!(channel_id[..]));
-               self.finish_force_close_channel(chan.force_shutdown());
+               self.finish_force_close_channel(chan.force_shutdown(true));
                if let Ok(update) = self.get_channel_update(&chan) {
                        let mut channel_state = self.channel_state.lock().unwrap();
                        channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
@@ -1117,7 +1122,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                let unsigned = msgs::UnsignedChannelUpdate {
                        chain_hash: self.genesis_hash,
                        short_channel_id: short_channel_id,
-                       timestamp: chan.get_channel_update_count(),
+                       timestamp: chan.get_update_time_counter(),
                        flags: (!were_node_one) as u16 | ((!chan.is_live() as u16) << 1),
                        cltv_expiry_delta: CLTV_EXPIRY_DELTA,
                        htlc_minimum_msat: chan.get_our_htlc_minimum_msat(),
@@ -1261,7 +1266,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                                Some(mut chan) => {
                                        (chan.get_outbound_funding_created(funding_txo)
                                                .map_err(|e| if let ChannelError::Close(msg) = e {
-                                                       MsgHandleErrInternal::from_finish_shutdown(msg, chan.channel_id(), chan.force_shutdown(), None)
+                                                       MsgHandleErrInternal::from_finish_shutdown(msg, chan.channel_id(), chan.force_shutdown(true), None)
                                                } else { unreachable!(); })
                                        , chan)
                                },
@@ -1281,7 +1286,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                                ChannelMonitorUpdateErr::PermanentFailure => {
                                        {
                                                let mut channel_state = self.channel_state.lock().unwrap();
-                                               match handle_error!(self, Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", *temporary_channel_id, chan.force_shutdown(), None)), chan.get_their_node_id(), channel_state) {
+                                               match handle_error!(self, Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", *temporary_channel_id, chan.force_shutdown(true), None)), chan.get_their_node_id(), channel_state) {
                                                        Err(_) => { return; },
                                                        Ok(()) => unreachable!(),
                                                }
@@ -1333,6 +1338,57 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                })
        }
 
+       #[allow(dead_code)]
+       // Messages of up to 64KB should never end up more than half full with addresses, as that would
+       // be absurd. We ensure this by checking that at least 500 (our stated public contract on when
+       // broadcast_node_announcement panics) of the maximum-length addresses would fit in a 64KB
+       // message...
+       const HALF_MESSAGE_IS_ADDRS: u32 = ::std::u16::MAX as u32 / (msgs::NetAddress::MAX_LEN as u32 + 1) / 2;
+       #[deny(const_err)]
+       #[allow(dead_code)]
+       // ...by failing to compile if the number of addresses that would be half of a message is
+       // smaller than 500:
+       const STATIC_ASSERT: u32 = Self::HALF_MESSAGE_IS_ADDRS - 500;
+
+       /// Generates a signed node_announcement from the given arguments and creates a
+       /// BroadcastNodeAnnouncement event. Note that such messages will be ignored unless peers have
+       /// seen a channel_announcement from us (ie unless we have public channels open).
+       ///
+       /// RGB is a node "color" and alias is a printable human-readable string to describe this node
+       /// to humans. They carry no in-protocol meaning.
+       ///
+       /// addresses represent the set (possibly empty) of socket addresses on which this node accepts
+       /// incoming connections. These will be broadcast to the network, publicly tying these
+       /// addresses together. If you wish to preserve user privacy, addresses should likely contain
+       /// only Tor Onion addresses.
+       ///
+       /// Panics if addresses is absurdly large (more than 500).
+       pub fn broadcast_node_announcement(&self, rgb: [u8; 3], alias: [u8; 32], addresses: Vec<msgs::NetAddress>) {
+               let _ = self.total_consistency_lock.read().unwrap();
+
+               if addresses.len() > 500 {
+                       panic!("More than half the message size was taken up by public addresses!");
+               }
+
+               let announcement = msgs::UnsignedNodeAnnouncement {
+                       features: NodeFeatures::supported(),
+                       timestamp: self.last_node_announcement_serial.fetch_add(1, Ordering::AcqRel) as u32,
+                       node_id: self.get_our_node_id(),
+                       rgb, alias, addresses,
+                       excess_address_data: Vec::new(),
+                       excess_data: Vec::new(),
+               };
+               let msghash = hash_to_message!(&Sha256dHash::hash(&announcement.encode()[..])[..]);
+
+               let mut channel_state = self.channel_state.lock().unwrap();
+               channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastNodeAnnouncement {
+                       msg: msgs::NodeAnnouncement {
+                               signature: self.secp_ctx.sign(&msghash, &self.our_network_key),
+                               contents: announcement
+                       },
+               });
+       }
+
        /// Processes HTLCs which are pending waiting on random forward delay.
        ///
        /// Should only really ever be called in response to a PendingHTLCsForwardable event.
@@ -1460,7 +1516,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                                                                                        if let Some(short_id) = channel.get_short_channel_id() {
                                                                                                channel_state.short_to_id.remove(&short_id);
                                                                                        }
-                                                                                       Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, channel.force_shutdown(), self.get_channel_update(&channel).ok()))
+                                                                                       Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, channel.force_shutdown(true), self.get_channel_update(&channel).ok()))
                                                                                },
                                                                                ChannelError::CloseDelayBroadcast { .. } => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); }
                                                                        };
@@ -1963,7 +2019,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                                        // channel, not the temporary_channel_id. This is compatible with ourselves, but the
                                        // spec is somewhat ambiguous here. Not a huge deal since we'll send error messages for
                                        // any messages referencing a previously-closed channel anyway.
-                                       return Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", funding_msg.channel_id, chan.force_shutdown(), None));
+                                       return Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", funding_msg.channel_id, chan.force_shutdown(true), None));
                                },
                                ChannelMonitorUpdateErr::TemporaryFailure => {
                                        // There's no problem signing a counterparty's funding transaction if our monitor
@@ -2683,7 +2739,7 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
                                                                // It looks like our counterparty went on-chain. We go ahead and
                                                                // broadcast our latest local state as well here, just in case its
                                                                // some kind of SPV attack, though we expect these to be dropped.
-                                                               failed_channels.push(channel.force_shutdown());
+                                                               failed_channels.push(channel.force_shutdown(true));
                                                                if let Ok(update) = self.get_channel_update(&channel) {
                                                                        pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                                msg: update
@@ -2698,11 +2754,10 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
                                        if let Some(short_id) = channel.get_short_channel_id() {
                                                short_to_id.remove(&short_id);
                                        }
-                                       failed_channels.push(channel.force_shutdown());
                                        // If would_broadcast_at_height() is true, the channel_monitor will broadcast
                                        // the latest local tx for us, so we should skip that here (it doesn't really
                                        // hurt anything, but does make tests a bit simpler).
-                                       failed_channels.last_mut().unwrap().0 = Vec::new();
+                                       failed_channels.push(channel.force_shutdown(false));
                                        if let Ok(update) = self.get_channel_update(&channel) {
                                                pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                        msg: update
@@ -2718,6 +2773,18 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
                }
                self.latest_block_height.store(height as usize, Ordering::Release);
                *self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header_hash;
+               loop {
+                       // Update last_node_announcement_serial to be the max of its current value and the
+                       // block timestamp. This should keep us close to the current time without relying on
+                       // having an explicit local time source.
+                       // Just in case we end up in a race, we loop until we either successfully update
+                       // last_node_announcement_serial or decide we don't need to.
+                       let old_serial = self.last_node_announcement_serial.load(Ordering::Acquire);
+                       if old_serial >= header.time as usize { break; }
+                       if self.last_node_announcement_serial.compare_exchange(old_serial, header.time as usize, Ordering::AcqRel, Ordering::Relaxed).is_ok() {
+                               break;
+                       }
+               }
        }
 
        /// We force-close the channel without letting our counterparty participate in the shutdown
@@ -2734,7 +2801,7 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
                                        if let Some(short_id) = v.get_short_channel_id() {
                                                short_to_id.remove(&short_id);
                                        }
-                                       failed_channels.push(v.force_shutdown());
+                                       failed_channels.push(v.force_shutdown(true));
                                        if let Ok(update) = self.get_channel_update(&v) {
                                                pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                        msg: update
@@ -2922,7 +2989,7 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
                                                if let Some(short_id) = chan.get_short_channel_id() {
                                                        short_to_id.remove(&short_id);
                                                }
-                                               failed_channels.push(chan.force_shutdown());
+                                               failed_channels.push(chan.force_shutdown(true));
                                                if let Ok(update) = self.get_channel_update(&chan) {
                                                        pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                msg: update
@@ -2969,6 +3036,7 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
                                        &events::MessageSendEvent::SendShutdown { ref node_id, .. } => node_id != their_node_id,
                                        &events::MessageSendEvent::SendChannelReestablish { ref node_id, .. } => node_id != their_node_id,
                                        &events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true,
+                                       &events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true,
                                        &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
                                        &events::MessageSendEvent::HandleError { ref node_id, .. } => node_id != their_node_id,
                                        &events::MessageSendEvent::PaymentFailureNetworkUpdate { .. } => true,
@@ -3061,8 +3129,8 @@ impl Writeable for PendingHTLCInfo {
        }
 }
 
-impl<R: ::std::io::Read> Readable<R> for PendingHTLCInfo {
-       fn read(reader: &mut R) -> Result<PendingHTLCInfo, DecodeError> {
+impl Readable for PendingHTLCInfo {
+       fn read<R: ::std::io::Read>(reader: &mut R) -> Result<PendingHTLCInfo, DecodeError> {
                Ok(PendingHTLCInfo {
                        onion_packet: Readable::read(reader)?,
                        incoming_shared_secret: Readable::read(reader)?,
@@ -3090,9 +3158,9 @@ impl Writeable for HTLCFailureMsg {
        }
 }
 
-impl<R: ::std::io::Read> Readable<R> for HTLCFailureMsg {
-       fn read(reader: &mut R) -> Result<HTLCFailureMsg, DecodeError> {
-               match <u8 as Readable<R>>::read(reader)? {
+impl Readable for HTLCFailureMsg {
+       fn read<R: ::std::io::Read>(reader: &mut R) -> Result<HTLCFailureMsg, DecodeError> {
+               match <u8 as Readable>::read(reader)? {
                        0 => Ok(HTLCFailureMsg::Relay(Readable::read(reader)?)),
                        1 => Ok(HTLCFailureMsg::Malformed(Readable::read(reader)?)),
                        _ => Err(DecodeError::InvalidValue),
@@ -3116,9 +3184,9 @@ impl Writeable for PendingHTLCStatus {
        }
 }
 
-impl<R: ::std::io::Read> Readable<R> for PendingHTLCStatus {
-       fn read(reader: &mut R) -> Result<PendingHTLCStatus, DecodeError> {
-               match <u8 as Readable<R>>::read(reader)? {
+impl Readable for PendingHTLCStatus {
+       fn read<R: ::std::io::Read>(reader: &mut R) -> Result<PendingHTLCStatus, DecodeError> {
+               match <u8 as Readable>::read(reader)? {
                        0 => Ok(PendingHTLCStatus::Forward(Readable::read(reader)?)),
                        1 => Ok(PendingHTLCStatus::Fail(Readable::read(reader)?)),
                        _ => Err(DecodeError::InvalidValue),
@@ -3150,9 +3218,9 @@ impl Writeable for HTLCSource {
        }
 }
 
-impl<R: ::std::io::Read> Readable<R> for HTLCSource {
-       fn read(reader: &mut R) -> Result<HTLCSource, DecodeError> {
-               match <u8 as Readable<R>>::read(reader)? {
+impl Readable for HTLCSource {
+       fn read<R: ::std::io::Read>(reader: &mut R) -> Result<HTLCSource, DecodeError> {
+               match <u8 as Readable>::read(reader)? {
                        0 => Ok(HTLCSource::PreviousHopData(Readable::read(reader)?)),
                        1 => Ok(HTLCSource::OutboundRoute {
                                route: Readable::read(reader)?,
@@ -3181,9 +3249,9 @@ impl Writeable for HTLCFailReason {
        }
 }
 
-impl<R: ::std::io::Read> Readable<R> for HTLCFailReason {
-       fn read(reader: &mut R) -> Result<HTLCFailReason, DecodeError> {
-               match <u8 as Readable<R>>::read(reader)? {
+impl Readable for HTLCFailReason {
+       fn read<R: ::std::io::Read>(reader: &mut R) -> Result<HTLCFailReason, DecodeError> {
+               match <u8 as Readable>::read(reader)? {
                        0 => Ok(HTLCFailReason::LightningError { err: Readable::read(reader)? }),
                        1 => Ok(HTLCFailReason::Reason {
                                failure_code: Readable::read(reader)?,
@@ -3213,9 +3281,9 @@ impl Writeable for HTLCForwardInfo {
        }
 }
 
-impl<R: ::std::io::Read> Readable<R> for HTLCForwardInfo {
-       fn read(reader: &mut R) -> Result<HTLCForwardInfo, DecodeError> {
-               match <u8 as Readable<R>>::read(reader)? {
+impl Readable for HTLCForwardInfo {
+       fn read<R: ::std::io::Read>(reader: &mut R) -> Result<HTLCForwardInfo, DecodeError> {
+               match <u8 as Readable>::read(reader)? {
                        0 => Ok(HTLCForwardInfo::AddHTLC {
                                prev_short_channel_id: Readable::read(reader)?,
                                prev_htlc_id: Readable::read(reader)?,
@@ -3287,6 +3355,8 @@ impl<ChanSigner: ChannelKeys + Writeable, M: Deref, T: Deref, K: Deref, F: Deref
                        peer_state.latest_features.write(writer)?;
                }
 
+               (self.last_node_announcement_serial.load(Ordering::Acquire) as u32).write(writer)?;
+
                Ok(())
        }
 }
@@ -3352,14 +3422,29 @@ pub struct ChannelManagerReadArgs<'a, ChanSigner: 'a + ChannelKeys, M: Deref, T:
        pub channel_monitors: &'a mut HashMap<OutPoint, &'a mut ChannelMonitor<ChanSigner>>,
 }
 
-impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>, M: Deref, T: Deref, K: Deref, F: Deref>
-       ReadableArgs<R, ChannelManagerReadArgs<'a, ChanSigner, M, T, K, F>> for (Sha256dHash, ChannelManager<ChanSigner, M, T, K, F>)
+// Implement ReadableArgs for an Arc'd ChannelManager to make it a bit easier to work with the
+// SipmleArcChannelManager type:
+impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: Deref>
+       ReadableArgs<ChannelManagerReadArgs<'a, ChanSigner, M, T, K, F>> for (Sha256dHash, Arc<ChannelManager<ChanSigner, M, T, K, F>>)
        where M::Target: ManyChannelMonitor<ChanSigner>,
         T::Target: BroadcasterInterface,
         K::Target: KeysInterface<ChanKeySigner = ChanSigner>,
         F::Target: FeeEstimator,
 {
-       fn read(reader: &mut R, args: ChannelManagerReadArgs<'a, ChanSigner, M, T, K, F>) -> Result<Self, DecodeError> {
+       fn read<R: ::std::io::Read>(reader: &mut R, args: ChannelManagerReadArgs<'a, ChanSigner, M, T, K, F>) -> Result<Self, DecodeError> {
+               let (blockhash, chan_manager) = <(Sha256dHash, ChannelManager<ChanSigner, M, T, K, F>)>::read(reader, args)?;
+               Ok((blockhash, Arc::new(chan_manager)))
+       }
+}
+
+impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: Deref>
+       ReadableArgs<ChannelManagerReadArgs<'a, ChanSigner, M, T, K, F>> for (Sha256dHash, ChannelManager<ChanSigner, M, T, K, F>)
+       where M::Target: ManyChannelMonitor<ChanSigner>,
+        T::Target: BroadcasterInterface,
+        K::Target: KeysInterface<ChanKeySigner = ChanSigner>,
+        F::Target: FeeEstimator,
+{
+       fn read<R: ::std::io::Read>(reader: &mut R, args: ChannelManagerReadArgs<'a, ChanSigner, M, T, K, F>) -> Result<Self, DecodeError> {
                let _ver: u8 = Readable::read(reader)?;
                let min_ver: u8 = Readable::read(reader)?;
                if min_ver > SERIALIZATION_VERSION {
@@ -3370,7 +3455,7 @@ impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>, M: Deref, T
                let latest_block_height: u32 = Readable::read(reader)?;
                let last_block_hash: Sha256dHash = Readable::read(reader)?;
 
-               let mut closed_channels = Vec::new();
+               let mut failed_htlcs = Vec::new();
 
                let channel_count: u64 = Readable::read(reader)?;
                let mut funding_txo_set = HashSet::with_capacity(cmp::min(channel_count as usize, 128));
@@ -3385,13 +3470,20 @@ impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>, M: Deref, T
                        let funding_txo = channel.get_funding_txo().ok_or(DecodeError::InvalidValue)?;
                        funding_txo_set.insert(funding_txo.clone());
                        if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) {
-                               if channel.get_cur_local_commitment_transaction_number() != monitor.get_cur_local_commitment_number() ||
-                                               channel.get_revoked_remote_commitment_transaction_number() != monitor.get_min_seen_secret() ||
-                                               channel.get_cur_remote_commitment_transaction_number() != monitor.get_cur_remote_commitment_number() ||
-                                               channel.get_latest_monitor_update_id() != monitor.get_latest_update_id() {
-                                       let mut force_close_res = channel.force_shutdown();
-                                       force_close_res.0 = monitor.get_latest_local_commitment_txn();
-                                       closed_channels.push(force_close_res);
+                               if channel.get_cur_local_commitment_transaction_number() < monitor.get_cur_local_commitment_number() ||
+                                               channel.get_revoked_remote_commitment_transaction_number() < monitor.get_min_seen_secret() ||
+                                               channel.get_cur_remote_commitment_transaction_number() < monitor.get_cur_remote_commitment_number() ||
+                                               channel.get_latest_monitor_update_id() > monitor.get_latest_update_id() {
+                                       // If the channel is ahead of the monitor, return InvalidValue:
+                                       return Err(DecodeError::InvalidValue);
+                               } else if channel.get_cur_local_commitment_transaction_number() > monitor.get_cur_local_commitment_number() ||
+                                               channel.get_revoked_remote_commitment_transaction_number() > monitor.get_min_seen_secret() ||
+                                               channel.get_cur_remote_commitment_transaction_number() > monitor.get_cur_remote_commitment_number() ||
+                                               channel.get_latest_monitor_update_id() < monitor.get_latest_update_id() {
+                                       // But if the channel is behind of the monitor, close the channel:
+                                       let (_, _, mut new_failed_htlcs) = channel.force_shutdown(true);
+                                       failed_htlcs.append(&mut new_failed_htlcs);
+                                       monitor.broadcast_latest_local_commitment_txn(&args.tx_broadcaster);
                                } else {
                                        if let Some(short_channel_id) = channel.get_short_channel_id() {
                                                short_to_id.insert(short_channel_id, channel.channel_id());
@@ -3405,7 +3497,7 @@ impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>, M: Deref, T
 
                for (ref funding_txo, ref mut monitor) in args.channel_monitors.iter_mut() {
                        if !funding_txo_set.contains(funding_txo) {
-                               closed_channels.push((monitor.get_latest_local_commitment_txn(), Vec::new()));
+                               monitor.broadcast_latest_local_commitment_txn(&args.tx_broadcaster);
                        }
                }
 
@@ -3443,6 +3535,8 @@ impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>, M: Deref, T
                        per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
                }
 
+               let last_node_announcement_serial: u32 = Readable::read(reader)?;
+
                let channel_manager = ChannelManager {
                        genesis_hash,
                        fee_estimator: args.fee_estimator,
@@ -3462,6 +3556,8 @@ impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>, M: Deref, T
                        }),
                        our_network_key: args.keys_manager.get_node_secret(),
 
+                       last_node_announcement_serial: AtomicUsize::new(last_node_announcement_serial as usize),
+
                        per_peer_state: RwLock::new(per_peer_state),
 
                        pending_events: Mutex::new(Vec::new()),
@@ -3471,12 +3567,13 @@ impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>, M: Deref, T
                        default_configuration: args.default_config,
                };
 
-               for close_res in closed_channels.drain(..) {
-                       channel_manager.finish_force_close_channel(close_res);
-                       //TODO: Broadcast channel update for closed channels, but only after we've made a
-                       //connection or two.
+               for htlc_source in failed_htlcs.drain(..) {
+                       channel_manager.fail_htlc_backwards_internal(channel_manager.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
                }
 
+               //TODO: Broadcast channel update for closed channels, but only after we've made a
+               //connection or two.
+
                Ok((last_block_hash.clone(), channel_manager))
        }
 }