Merge pull request #568 from jkczyz/2020-03-handle-error-deadlock
author Matt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Thu, 2 Apr 2020 20:06:00 +0000 (20:06 +0000)
committer GitHub <noreply@github.com>
Thu, 2 Apr 2020 20:06:00 +0000 (20:06 +0000)
Fix deadlock in ChannelManager's handle_error!()

ARCH.md [new file with mode: 0644]
README.md
lightning/src/ln/channelmanager.rs
lightning/src/ln/channelmonitor.rs
lightning/src/ln/functional_tests.rs
lightning/src/ln/onchaintx.rs
lightning/src/ln/peer_handler.rs
lightning/src/util/macro_logger.rs

diff --git a/ARCH.md b/ARCH.md
new file mode 100644 (file)
index 0000000..dba91e7
--- /dev/null
+++ b/ARCH.md
@@ -0,0 +1,62 @@
+Rust-Lightning is broken into a number of high-level structures with APIs to hook them
+together, as well as APIs for you, the user, to provide external data.
+
+The two most important structures which nearly every application of Rust-Lightning will
+need to use are `ChannelManager` and `ChannelMonitor`. `ChannelManager` holds multiple
+channels, routes payments between them, and exposes a simple API to make and receive
+payments. Individual `ChannelMonitor`s monitor the on-chain state of a channel, punish
+counterparties if they misbehave, and force-close channels if they contain unresolved
+HTLCs which are near expiration. The `ManyChannelMonitor` API provides a way for you to
+receive `ChannelMonitorUpdate`s from `ChannelManager` and persist them to disk before the
+channel steps forward.
+
+There are two additional important structures that you may use either on the same device
+as the `ChannelManager` or on a separate one. `Router` handles receiving channel and
+node announcements and calculates routes for sending payments. `PeerManager` handles the
+authenticated and encrypted communication protocol, monitoring for liveness of peers,
+routing messages to `ChannelManager` and `Router` instances directly, and receiving
+messages from them via the `EventsProvider` interface.
+
+These structs communicate with each other using a public API, so that you can easily add
+a proxy in between for special handling. Further, APIs for key generation, transaction
+broadcasting, block fetching, and fee estimation must be implemented and the data
+provided by you, the user.
+
+The library does not rely on the presence of a runtime environment outside of access to
+heap, atomic integers, and basic Mutex primitives. This means the library will never
+spawn threads or take any action whatsoever except when you call into it. Thus,
+`ChannelManager` and `PeerManager` have public functions which you should call on a timer,
+network reads and writes are external and provided by you, and the library relies only on
+block time for current time knowledge.
+
+At a high level, some of the common interfaces fit together as follows:
+
+
+```
+
+                     -----------------
+                     | KeysInterface |  --------------
+                     -----------------  | UserConfig |
+         --------------------       |   --------------
+  /------| MessageSendEvent |       |   |     ----------------
+ |       --------------------       |   |     | FeeEstimator |
+ |   (as MessageSendEventsProvider) |   |     ----------------
+ |                         ^        |   |    /          |      ------------------------
+ |                          \       |   |   /      ---------> | BroadcasterInterface |
+ |                           \      |   |  /      /     |     ------------------------
+ |                            \     v   v v      /      v        ^
+ |    (as                      ------------------       ----------------------
+ |    ChannelMessageHandler)-> | ChannelManager | ----> | ManyChannelMonitor |
+ v               /             ------------------       ----------------------
+--------------- /                ^         (as EventsProvider)   ^
+| PeerManager |-                 |              \     /         /
+---------------                  |        -------\---/----------
+ |              -----------------------  /        \ /
+ |              | ChainWatchInterface | -          v
+ |              -----------------------        ---------
+ |                            |                | Event |
+(as RoutingMessageHandler)    v                ---------
+  \                   ----------
+   -----------------> | Router |
+                      ----------
+```
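
The ARCH.md text above stresses that the library never spawns threads and only acts when called into: timers, network I/O, and block data are all driven by the application. Below is a minimal sketch of that calling convention; the trait and method names are placeholders invented for illustration, not the library's actual API at this commit.

```rust
// Hypothetical glue (names invented): the application owns the loop, the library
// only reacts to the calls made from it.
trait NodePieces {
    fn process_network_io(&mut self);    // your sockets <-> PeerManager reads/writes
    fn process_chain_updates(&mut self); // your block source -> block_connected/disconnected
    fn tick_timers(&mut self);           // periodic PeerManager/ChannelManager maintenance
    fn drain_events(&mut self);          // MessageSendEventsProvider + EventsProvider output
}

fn run<N: NodePieces>(node: &mut N) {
    loop {
        node.process_network_io();
        node.process_chain_updates();
        node.tick_timers();
        node.drain_events();
    }
}
```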
index f6fba666c337d39d0c0e768e93e1d0fbd7a8453b..7dec713f637e23c5eb316c1ba56b7d6e93a9e724 100644 (file)
--- a/README.md
+++ b/README.md
@@ -46,6 +46,6 @@ Contributors are warmly welcome, see [CONTRIBUTING.md](CONTRIBUTING.md).
 Project Architecture
 ---------------------
 
-COMING SOON.
+For a Rust-Lightning high-level API introduction, see [ARCH.md](ARCH.md).
 
 License is Apache-2.0.
index e9dfe5a5680c60249d02954865141a8054e0cc96..a471ca3f3675fcab67677381164c7dc69dd8903a 100644 (file)
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -3417,10 +3417,17 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
                        let funding_txo = channel.get_funding_txo().ok_or(DecodeError::InvalidValue)?;
                        funding_txo_set.insert(funding_txo.clone());
                        if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) {
-                               if channel.get_cur_local_commitment_transaction_number() != monitor.get_cur_local_commitment_number() ||
-                                               channel.get_revoked_remote_commitment_transaction_number() != monitor.get_min_seen_secret() ||
-                                               channel.get_cur_remote_commitment_transaction_number() != monitor.get_cur_remote_commitment_number() ||
-                                               channel.get_latest_monitor_update_id() != monitor.get_latest_update_id() {
+                               if channel.get_cur_local_commitment_transaction_number() < monitor.get_cur_local_commitment_number() ||
+                                               channel.get_revoked_remote_commitment_transaction_number() < monitor.get_min_seen_secret() ||
+                                               channel.get_cur_remote_commitment_transaction_number() < monitor.get_cur_remote_commitment_number() ||
+                                               channel.get_latest_monitor_update_id() > monitor.get_latest_update_id() {
+                                       // If the channel is ahead of the monitor, return InvalidValue:
+                                       return Err(DecodeError::InvalidValue);
+                               } else if channel.get_cur_local_commitment_transaction_number() > monitor.get_cur_local_commitment_number() ||
+                                               channel.get_revoked_remote_commitment_transaction_number() > monitor.get_min_seen_secret() ||
+                                               channel.get_cur_remote_commitment_transaction_number() > monitor.get_cur_remote_commitment_number() ||
+                                               channel.get_latest_monitor_update_id() < monitor.get_latest_update_id() {
+                                       // But if the channel is behind the monitor, close the channel:
                                        let (_, _, mut new_failed_htlcs) = channel.force_shutdown(true);
                                        failed_htlcs.append(&mut new_failed_htlcs);
                                        monitor.broadcast_latest_local_commitment_txn(&args.tx_broadcaster);
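
For orientation, the mixed comparison directions above come from how the two counters move: commitment transaction numbers count down from 2^48 - 1 as the channel advances, while monitor update ids count up. A standalone sketch of the same classification (type and field names invented for illustration):

```rust
// Sketch only: classify a deserialized Channel against its ChannelMonitor.
struct Snapshot {
    cur_local_commitment_number: u64, // counts down as the channel advances
    latest_update_id: u64,            // counts up as updates are applied
}

enum Resync {
    /// The channel is ahead of the monitor we were handed: the monitor is stale,
    /// so deserialization is refused (DecodeError::InvalidValue in the diff above).
    MonitorStale,
    /// The monitor is ahead of the channel: force-close and have the monitor
    /// broadcast the latest local commitment transaction.
    ChannelStale,
    InSync,
}

fn classify(channel: &Snapshot, monitor: &Snapshot) -> Resync {
    if channel.cur_local_commitment_number < monitor.cur_local_commitment_number
        || channel.latest_update_id > monitor.latest_update_id
    {
        Resync::MonitorStale
    } else if channel.cur_local_commitment_number > monitor.cur_local_commitment_number
        || channel.latest_update_id < monitor.latest_update_id
    {
        Resync::ChannelStale
    } else {
        Resync::InSync
    }
}
```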
index 9b25da5b4c9fd23ce938f551f798021a17efd592..2ac113b9845eef43391f5758ebad8335f82af453 100644 (file)
--- a/lightning/src/ln/channelmonitor.rs
+++ b/lightning/src/ln/channelmonitor.rs
@@ -290,22 +290,15 @@ impl<Key : Send + cmp::Eq + hash::Hash + 'static, ChanSigner: ChannelKeys, T: De
                        hash_map::Entry::Occupied(_) => return Err(MonitorUpdateError("Channel monitor for given key is already present")),
                        hash_map::Entry::Vacant(e) => e,
                };
-               match monitor.key_storage {
-                       Storage::Local { ref funding_info, .. } => {
-                               match funding_info {
-                                       &None => {
-                                               return Err(MonitorUpdateError("Try to update a useless monitor without funding_txo !"));
-                                       },
-                                       &Some((ref outpoint, ref script)) => {
-                                               log_trace!(self, "Got new Channel Monitor for channel {}", log_bytes!(outpoint.to_channel_id()[..]));
-                                               self.chain_monitor.install_watch_tx(&outpoint.txid, script);
-                                               self.chain_monitor.install_watch_outpoint((outpoint.txid, outpoint.index as u32), script);
-                                       },
-                               }
+               match monitor.onchain_detection.funding_info {
+                       None => {
+                               return Err(MonitorUpdateError("Try to update a useless monitor without funding_txo !"));
+                       },
+                       Some((ref outpoint, ref script)) => {
+                               log_trace!(self, "Got new Channel Monitor for channel {}", log_bytes!(outpoint.to_channel_id()[..]));
+                               self.chain_monitor.install_watch_tx(&outpoint.txid, script);
+                               self.chain_monitor.install_watch_outpoint((outpoint.txid, outpoint.index as u32), script);
                        },
-                       Storage::Watchtower { .. } => {
-                               self.chain_monitor.watch_all_txn();
-                       }
                }
                for (txid, outputs) in monitor.get_outputs_to_watch().iter() {
                        for (idx, script) in outputs.iter().enumerate() {
@@ -321,7 +314,7 @@ impl<Key : Send + cmp::Eq + hash::Hash + 'static, ChanSigner: ChannelKeys, T: De
                let mut monitors = self.monitors.lock().unwrap();
                match monitors.get_mut(&key) {
                        Some(orig_monitor) => {
-                               log_trace!(self, "Updating Channel Monitor for channel {}", log_funding_info!(orig_monitor.key_storage));
+                               log_trace!(self, "Updating Channel Monitor for channel {}", log_funding_info!(orig_monitor.onchain_detection));
                                orig_monitor.update_monitor(update, &self.broadcaster)
                        },
                        None => Err(MonitorUpdateError("No such monitor registered"))
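
The update path above is where the persistence requirement described in ARCH.md bites: a `ChannelMonitorUpdate` should be written durably before the channel is allowed to step forward. A hypothetical sketch of that ordering follows (names invented; this is not the rust-lightning API, and fsync/error handling are kept minimal):

```rust
use std::fs;
use std::io;
use std::path::PathBuf;

// Hypothetical persistence helper: write the serialized update to disk first,
// and only then apply it in memory (in the real code, the update_monitor() call
// shown above).
struct UpdateStore {
    dir: PathBuf,
}

impl UpdateStore {
    fn persist_then_apply(
        &self,
        channel_id: &str,
        update_id: u64,
        serialized_update: &[u8],
        apply: impl FnOnce() -> Result<(), io::Error>,
    ) -> Result<(), io::Error> {
        let path = self.dir.join(format!("{}_{}.monupdate", channel_id, update_id));
        fs::write(&path, serialized_update)?; // a real store would also fsync
        apply()
    }
}
```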
@@ -398,44 +391,17 @@ pub(crate) const LATENCY_GRACE_PERIOD_BLOCKS: u32 = 3;
 /// keeping bumping another claim tx to solve the outpoint.
 pub(crate) const ANTI_REORG_DELAY: u32 = 6;
 
-enum Storage<ChanSigner: ChannelKeys> {
-       Local {
-               keys: ChanSigner,
-               funding_key: SecretKey,
-               revocation_base_key: SecretKey,
-               htlc_base_key: SecretKey,
-               delayed_payment_base_key: SecretKey,
-               payment_base_key: SecretKey,
-               funding_info: Option<(OutPoint, Script)>,
-               current_remote_commitment_txid: Option<Sha256dHash>,
-               prev_remote_commitment_txid: Option<Sha256dHash>,
-       },
-       Watchtower {
-               revocation_base_key: PublicKey,
-               htlc_base_key: PublicKey,
-       }
+struct OnchainDetection<ChanSigner: ChannelKeys> {
+       keys: ChanSigner,
+       funding_info: Option<(OutPoint, Script)>,
+       current_remote_commitment_txid: Option<Sha256dHash>,
+       prev_remote_commitment_txid: Option<Sha256dHash>,
 }
 
 #[cfg(any(test, feature = "fuzztarget"))]
-impl<ChanSigner: ChannelKeys> PartialEq for Storage<ChanSigner> {
+impl<ChanSigner: ChannelKeys> PartialEq for OnchainDetection<ChanSigner> {
        fn eq(&self, other: &Self) -> bool {
-               match *self {
-                       Storage::Local { ref keys, .. } => {
-                               let k = keys;
-                               match *other {
-                                       Storage::Local { ref keys, .. } => keys.pubkeys() == k.pubkeys(),
-                                       Storage::Watchtower { .. } => false,
-                               }
-                       },
-                       Storage::Watchtower {ref revocation_base_key, ref htlc_base_key} => {
-                               let (rbk, hbk) = (revocation_base_key, htlc_base_key);
-                               match *other {
-                                       Storage::Local { .. } => false,
-                                       Storage::Watchtower {ref revocation_base_key, ref htlc_base_key} =>
-                                               revocation_base_key == rbk && htlc_base_key == hbk,
-                               }
-                       },
-               }
+               self.keys.pubkeys() == other.keys.pubkeys()
        }
 }
 
@@ -766,7 +732,7 @@ pub struct ChannelMonitor<ChanSigner: ChannelKeys> {
        broadcasted_remote_payment_script: Option<(Script, SecretKey)>,
        shutdown_script: Script,
 
-       key_storage: Storage<ChanSigner>,
+       onchain_detection: OnchainDetection<ChanSigner>,
        their_htlc_base_key: Option<PublicKey>,
        their_delayed_payment_base_key: Option<PublicKey>,
        funding_redeemscript: Option<Script>,
@@ -819,9 +785,9 @@ pub struct ChannelMonitor<ChanSigner: ChannelKeys> {
        outputs_to_watch: HashMap<Sha256dHash, Vec<Script>>,
 
        #[cfg(test)]
-       pub onchain_tx_handler: OnchainTxHandler,
+       pub onchain_tx_handler: OnchainTxHandler<ChanSigner>,
        #[cfg(not(test))]
-       onchain_tx_handler: OnchainTxHandler,
+       onchain_tx_handler: OnchainTxHandler<ChanSigner>,
 
        // We simply modify last_block_hash in Channel's block_connected so that serialization is
        // consistent but hopefully the users' copy handles block_connected in a consistent way.
@@ -843,7 +809,7 @@ impl<ChanSigner: ChannelKeys> PartialEq for ChannelMonitor<ChanSigner> {
                        self.destination_script != other.destination_script ||
                        self.broadcasted_local_revokable_script != other.broadcasted_local_revokable_script ||
                        self.broadcasted_remote_payment_script != other.broadcasted_remote_payment_script ||
-                       self.key_storage != other.key_storage ||
+                       self.onchain_detection != other.onchain_detection ||
                        self.their_htlc_base_key != other.their_htlc_base_key ||
                        self.their_delayed_payment_base_key != other.their_delayed_payment_base_key ||
                        self.funding_redeemscript != other.funding_redeemscript ||
@@ -903,30 +869,19 @@ impl<ChanSigner: ChannelKeys + Writeable> ChannelMonitor<ChanSigner> {
                }
                self.shutdown_script.write(writer)?;
 
-               match self.key_storage {
-                       Storage::Local { ref keys, ref funding_key, ref revocation_base_key, ref htlc_base_key, ref delayed_payment_base_key, ref payment_base_key, ref funding_info, ref current_remote_commitment_txid, ref prev_remote_commitment_txid } => {
-                               writer.write_all(&[0; 1])?;
-                               keys.write(writer)?;
-                               writer.write_all(&funding_key[..])?;
-                               writer.write_all(&revocation_base_key[..])?;
-                               writer.write_all(&htlc_base_key[..])?;
-                               writer.write_all(&delayed_payment_base_key[..])?;
-                               writer.write_all(&payment_base_key[..])?;
-                               match funding_info  {
-                                       &Some((ref outpoint, ref script)) => {
-                                               writer.write_all(&outpoint.txid[..])?;
-                                               writer.write_all(&byte_utils::be16_to_array(outpoint.index))?;
-                                               script.write(writer)?;
-                                       },
-                                       &None => {
-                                               debug_assert!(false, "Try to serialize a useless Local monitor !");
-                                       },
-                               }
-                               current_remote_commitment_txid.write(writer)?;
-                               prev_remote_commitment_txid.write(writer)?;
+               self.onchain_detection.keys.write(writer)?;
+               match self.onchain_detection.funding_info  {
+                       Some((ref outpoint, ref script)) => {
+                               writer.write_all(&outpoint.txid[..])?;
+                               writer.write_all(&byte_utils::be16_to_array(outpoint.index))?;
+                               script.write(writer)?;
+                       },
+                       None => {
+                               debug_assert!(false, "Try to serialize a useless Local monitor !");
                        },
-                       Storage::Watchtower { .. } => unimplemented!(),
                }
+               self.onchain_detection.current_remote_commitment_txid.write(writer)?;
+               self.onchain_detection.prev_remote_commitment_txid.write(writer)?;
 
                writer.write_all(&self.their_htlc_base_key.as_ref().unwrap().serialize())?;
                writer.write_all(&self.their_delayed_payment_base_key.as_ref().unwrap().serialize())?;
@@ -1121,13 +1076,16 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                        logger: Arc<Logger>) -> ChannelMonitor<ChanSigner> {
 
                assert!(commitment_transaction_number_obscure_factor <= (1 << 48));
-               let funding_key = keys.funding_key().clone();
-               let revocation_base_key = keys.revocation_base_key().clone();
-               let htlc_base_key = keys.htlc_base_key().clone();
-               let delayed_payment_base_key = keys.delayed_payment_base_key().clone();
-               let payment_base_key = keys.payment_base_key().clone();
                let our_channel_close_key_hash = Hash160::hash(&shutdown_pubkey.serialize());
                let shutdown_script = Builder::new().push_opcode(opcodes::all::OP_PUSHBYTES_0).push_slice(&our_channel_close_key_hash[..]).into_script();
+
+               let onchain_detection = OnchainDetection {
+                       keys: keys.clone(),
+                       funding_info: Some(funding_info.clone()),
+                       current_remote_commitment_txid: None,
+                       prev_remote_commitment_txid: None,
+               };
+
                ChannelMonitor {
                        latest_update_id: 0,
                        commitment_transaction_number_obscure_factor,
@@ -1137,17 +1095,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                        broadcasted_remote_payment_script: None,
                        shutdown_script,
 
-                       key_storage: Storage::Local {
-                               keys,
-                               funding_key,
-                               revocation_base_key,
-                               htlc_base_key,
-                               delayed_payment_base_key,
-                               payment_base_key,
-                               funding_info: Some(funding_info),
-                               current_remote_commitment_txid: None,
-                               prev_remote_commitment_txid: None,
-                       },
+                       onchain_detection: onchain_detection,
                        their_htlc_base_key: Some(their_htlc_base_key.clone()),
                        their_delayed_payment_base_key: Some(their_delayed_payment_base_key.clone()),
                        funding_redeemscript: Some(funding_redeemscript),
@@ -1173,7 +1121,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                        onchain_events_waiting_threshold_conf: HashMap::new(),
                        outputs_to_watch: HashMap::new(),
 
-                       onchain_tx_handler: OnchainTxHandler::new(destination_script.clone(), logger.clone()),
+                       onchain_tx_handler: OnchainTxHandler::new(destination_script.clone(), keys, logger.clone()),
 
                        last_block_hash: Default::default(),
                        secp_ctx: Secp256k1::new(),
@@ -1191,11 +1139,9 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
 
                // Prune HTLCs from the previous remote commitment tx so we don't generate failure/fulfill
                // events for now-revoked/fulfilled HTLCs.
-               if let Storage::Local { ref mut prev_remote_commitment_txid, .. } = self.key_storage {
-                       if let Some(txid) = prev_remote_commitment_txid.take() {
-                               for &mut (_, ref mut source) in self.remote_claimable_outpoints.get_mut(&txid).unwrap() {
-                                       *source = None;
-                               }
+               if let Some(txid) = self.onchain_detection.prev_remote_commitment_txid.take() {
+                       for &mut (_, ref mut source) in self.remote_claimable_outpoints.get_mut(&txid).unwrap() {
+                               *source = None;
                        }
                }
 
@@ -1250,10 +1196,8 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                let new_txid = unsigned_commitment_tx.txid();
                log_trace!(self, "Tracking new remote commitment transaction with txid {} at commitment number {} with {} HTLC outputs", new_txid, commitment_number, htlc_outputs.len());
                log_trace!(self, "New potential remote commitment transaction: {}", encode::serialize_hex(unsigned_commitment_tx));
-               if let Storage::Local { ref mut current_remote_commitment_txid, ref mut prev_remote_commitment_txid, .. } = self.key_storage {
-                       *prev_remote_commitment_txid = current_remote_commitment_txid.take();
-                       *current_remote_commitment_txid = Some(new_txid);
-               }
+               self.onchain_detection.prev_remote_commitment_txid = self.onchain_detection.current_remote_commitment_txid.take();
+               self.onchain_detection.current_remote_commitment_txid = Some(new_txid);
                self.remote_claimable_outpoints.insert(new_txid, htlc_outputs);
                self.current_remote_commitment_number = commitment_number;
                //TODO: Merge this into the other per-remote-transaction output storage stuff
@@ -1278,18 +1222,13 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
        }
 
        pub(super) fn provide_rescue_remote_commitment_tx_info(&mut self, their_revocation_point: PublicKey) {
-               match self.key_storage {
-                       Storage::Local { ref payment_base_key, ref keys, .. } => {
-                               if let Ok(payment_key) = chan_utils::derive_public_key(&self.secp_ctx, &their_revocation_point, &keys.pubkeys().payment_basepoint) {
-                                       let to_remote_script =  Builder::new().push_opcode(opcodes::all::OP_PUSHBYTES_0)
-                                               .push_slice(&Hash160::hash(&payment_key.serialize())[..])
-                                               .into_script();
-                                       if let Ok(to_remote_key) = chan_utils::derive_private_key(&self.secp_ctx, &their_revocation_point, &payment_base_key) {
-                                               self.broadcasted_remote_payment_script = Some((to_remote_script, to_remote_key));
-                                       }
-                               }
-                       },
-                       Storage::Watchtower { .. } => {}
+               if let Ok(payment_key) = chan_utils::derive_public_key(&self.secp_ctx, &their_revocation_point, &self.onchain_detection.keys.pubkeys().payment_basepoint) {
+                       let to_remote_script =  Builder::new().push_opcode(opcodes::all::OP_PUSHBYTES_0)
+                               .push_slice(&Hash160::hash(&payment_key.serialize())[..])
+                               .into_script();
+                       if let Ok(to_remote_key) = chan_utils::derive_private_key(&self.secp_ctx, &their_revocation_point, &self.onchain_detection.keys.payment_base_key()) {
+                               self.broadcasted_remote_payment_script = Some((to_remote_script, to_remote_key));
+                       }
                }
        }
 
@@ -1395,17 +1334,10 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
 
        /// Gets the funding transaction outpoint of the channel this ChannelMonitor is monitoring for.
        pub fn get_funding_txo(&self) -> Option<OutPoint> {
-               match self.key_storage {
-                       Storage::Local { ref funding_info, .. } => {
-                               match funding_info {
-                                       &Some((outpoint, _)) => Some(outpoint),
-                                       &None => None
-                               }
-                       },
-                       Storage::Watchtower { .. } => {
-                               return None;
-                       }
+               if let Some((outp, _)) = self.onchain_detection.funding_info {
+                       return Some(outp)
                }
+               None
        }
 
        /// Gets a list of txids, with their output scripts (in the order they appear in the
@@ -1495,18 +1427,11 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                if commitment_number >= self.get_min_seen_secret() {
                        let secret = self.get_secret(commitment_number).unwrap();
                        let per_commitment_key = ignore_error!(SecretKey::from_slice(&secret));
-                       let (revocation_pubkey, revocation_key, b_htlc_key, local_payment_key) = match self.key_storage {
-                               Storage::Local { ref keys, ref revocation_base_key, ref payment_base_key, .. } => {
-                                       let per_commitment_point = PublicKey::from_secret_key(&self.secp_ctx, &per_commitment_key);
-                                       (ignore_error!(chan_utils::derive_public_revocation_key(&self.secp_ctx, &per_commitment_point, &keys.pubkeys().revocation_basepoint)),
-                                       ignore_error!(chan_utils::derive_private_revocation_key(&self.secp_ctx, &per_commitment_key, &revocation_base_key)),
-                                       ignore_error!(chan_utils::derive_public_key(&self.secp_ctx, &per_commitment_point, &keys.pubkeys().htlc_basepoint)),
-                                       ignore_error!(chan_utils::derive_private_key(&self.secp_ctx, &per_commitment_point, &payment_base_key)))
-                               },
-                               Storage::Watchtower { .. } => {
-                                       unimplemented!()
-                               },
-                       };
+                       let per_commitment_point = PublicKey::from_secret_key(&self.secp_ctx, &per_commitment_key);
+                       let revocation_pubkey = ignore_error!(chan_utils::derive_public_revocation_key(&self.secp_ctx, &per_commitment_point, &self.onchain_detection.keys.pubkeys().revocation_basepoint));
+                       let revocation_key = ignore_error!(chan_utils::derive_private_revocation_key(&self.secp_ctx, &per_commitment_key, &self.onchain_detection.keys.revocation_base_key()));
+                       let b_htlc_key = ignore_error!(chan_utils::derive_public_key(&self.secp_ctx, &per_commitment_point, &self.onchain_detection.keys.pubkeys().htlc_basepoint));
+                       let local_payment_key = ignore_error!(chan_utils::derive_private_key(&self.secp_ctx, &per_commitment_point, &self.onchain_detection.keys.payment_base_key()));
                        let delayed_key = ignore_error!(chan_utils::derive_public_key(&self.secp_ctx, &PublicKey::from_secret_key(&self.secp_ctx, &per_commitment_key), &self.their_delayed_payment_base_key.unwrap()));
                        let a_htlc_key = match self.their_htlc_base_key {
                                None => return (claimable_outpoints, (commitment_txid, watch_outputs)),
@@ -1582,13 +1507,11 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                                                }
                                        }
                                }
-                               if let Storage::Local { ref current_remote_commitment_txid, ref prev_remote_commitment_txid, .. } = self.key_storage {
-                                       if let &Some(ref txid) = current_remote_commitment_txid {
-                                               check_htlc_fails!(txid, "current");
-                                       }
-                                       if let &Some(ref txid) = prev_remote_commitment_txid {
-                                               check_htlc_fails!(txid, "remote");
-                                       }
+                               if let Some(ref txid) = self.onchain_detection.current_remote_commitment_txid {
+                                       check_htlc_fails!(txid, "current");
+                               }
+                               if let Some(ref txid) = self.onchain_detection.prev_remote_commitment_txid {
+                                       check_htlc_fails!(txid, "remote");
                                }
                                // No need to check local commitment txn, symmetric HTLCSource must be present as per-htlc data on remote commitment tx
                        }
@@ -1647,13 +1570,11 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                                        }
                                }
                        }
-                       if let Storage::Local { ref current_remote_commitment_txid, ref prev_remote_commitment_txid, .. } = self.key_storage {
-                               if let &Some(ref txid) = current_remote_commitment_txid {
-                                       check_htlc_fails!(txid, "current", 'current_loop);
-                               }
-                               if let &Some(ref txid) = prev_remote_commitment_txid {
-                                       check_htlc_fails!(txid, "previous", 'prev_loop);
-                               }
+                       if let Some(ref txid) = self.onchain_detection.current_remote_commitment_txid {
+                               check_htlc_fails!(txid, "current", 'current_loop);
+                       }
+                       if let Some(ref txid) = self.onchain_detection.prev_remote_commitment_txid {
+                               check_htlc_fails!(txid, "previous", 'prev_loop);
                        }
 
                        if let Some(revocation_points) = self.their_cur_revocation_points {
@@ -1663,19 +1584,14 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                                                if revocation_points.0 == commitment_number + 1 { Some(point) } else { None }
                                        } else { None };
                                if let Some(revocation_point) = revocation_point_option {
-                                       let (revocation_pubkey, b_htlc_key, htlc_privkey, local_payment_key) = match self.key_storage {
-                                               Storage::Local { ref keys, ref htlc_base_key, ref payment_base_key, .. } => {
-                                                       (ignore_error!(chan_utils::derive_public_revocation_key(&self.secp_ctx, revocation_point, &keys.pubkeys().revocation_basepoint)),
-                                                       ignore_error!(chan_utils::derive_public_key(&self.secp_ctx, revocation_point, &keys.pubkeys().htlc_basepoint)),
-                                                       ignore_error!(chan_utils::derive_private_key(&self.secp_ctx, revocation_point, &htlc_base_key)),
-                                                       ignore_error!(chan_utils::derive_private_key(&self.secp_ctx, revocation_point, &payment_base_key)))
-                                               },
-                                               Storage::Watchtower { .. } => { unimplemented!() }
-                                       };
+                                       let revocation_pubkey = ignore_error!(chan_utils::derive_public_revocation_key(&self.secp_ctx, revocation_point, &self.onchain_detection.keys.pubkeys().revocation_basepoint));
+                                       let b_htlc_key = ignore_error!(chan_utils::derive_public_key(&self.secp_ctx, revocation_point, &self.onchain_detection.keys.pubkeys().htlc_basepoint));
+                                       let htlc_privkey = ignore_error!(chan_utils::derive_private_key(&self.secp_ctx, revocation_point, &self.onchain_detection.keys.htlc_base_key()));
                                        let a_htlc_key = match self.their_htlc_base_key {
                                                None => return (claimable_outpoints, (commitment_txid, watch_outputs)),
                                                Some(their_htlc_base_key) => ignore_error!(chan_utils::derive_public_key(&self.secp_ctx, revocation_point, &their_htlc_base_key)),
                                        };
+                                       let local_payment_key = ignore_error!(chan_utils::derive_private_key(&self.secp_ctx, revocation_point, &self.onchain_detection.keys.payment_base_key()));
 
                                        self.broadcasted_remote_payment_script = {
                                                // Note that the Network here is ignored as we immediately drop the address for the
@@ -1726,13 +1642,8 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                let secret = if let Some(secret) = self.get_secret(commitment_number) { secret } else { return (Vec::new(), None); };
                let per_commitment_key = ignore_error!(SecretKey::from_slice(&secret));
                let per_commitment_point = PublicKey::from_secret_key(&self.secp_ctx, &per_commitment_key);
-               let (revocation_pubkey, revocation_key) = match self.key_storage {
-                       Storage::Local { ref keys, ref revocation_base_key, .. } => {
-                               (ignore_error!(chan_utils::derive_public_revocation_key(&self.secp_ctx, &per_commitment_point, &keys.pubkeys().revocation_basepoint)),
-                               ignore_error!(chan_utils::derive_private_revocation_key(&self.secp_ctx, &per_commitment_key, revocation_base_key)))
-                       },
-                       Storage::Watchtower { .. } => { unimplemented!() }
-               };
+               let revocation_pubkey = ignore_error!(chan_utils::derive_public_revocation_key(&self.secp_ctx, &per_commitment_point, &self.onchain_detection.keys.pubkeys().revocation_basepoint));
+               let revocation_key = ignore_error!(chan_utils::derive_private_revocation_key(&self.secp_ctx, &per_commitment_key, &self.onchain_detection.keys.revocation_base_key()));
                let delayed_key = match self.their_delayed_payment_base_key {
                        None => return (Vec::new(), None),
                        Some(their_delayed_payment_base_key) => ignore_error!(chan_utils::derive_public_key(&self.secp_ctx, &per_commitment_point, &their_delayed_payment_base_key)),
@@ -1745,53 +1656,51 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                (claimable_outpoints, Some((htlc_txid, tx.output.clone())))
        }
 
-       fn broadcast_by_local_state(&self, local_tx: &LocalSignedTx, delayed_payment_base_key: &SecretKey) -> (Vec<Transaction>, Vec<TxOut>, Option<(Script, SecretKey, Script)>) {
+       fn broadcast_by_local_state(&self, local_tx: &LocalSignedTx) -> (Vec<Transaction>, Vec<TxOut>, Option<(Script, SecretKey, Script)>) {
                let mut res = Vec::with_capacity(local_tx.htlc_outputs.len());
                let mut watch_outputs = Vec::with_capacity(local_tx.htlc_outputs.len());
 
                let redeemscript = chan_utils::get_revokeable_redeemscript(&local_tx.revocation_key, self.their_to_self_delay.unwrap(), &local_tx.delayed_payment_key);
-               let broadcasted_local_revokable_script = if let Ok(local_delayedkey) = chan_utils::derive_private_key(&self.secp_ctx, &local_tx.per_commitment_point, delayed_payment_base_key) {
+               let broadcasted_local_revokable_script = if let Ok(local_delayedkey) = chan_utils::derive_private_key(&self.secp_ctx, &local_tx.per_commitment_point, self.onchain_detection.keys.delayed_payment_base_key()) {
                        Some((redeemscript.to_v0_p2wsh(), local_delayedkey, redeemscript))
                } else { None };
 
-               if let &Storage::Local { ref htlc_base_key, .. } = &self.key_storage {
-                       for &(ref htlc, ref sigs, _) in local_tx.htlc_outputs.iter() {
-                               if let Some(transaction_output_index) = htlc.transaction_output_index {
-                                       if let &Some(ref their_sig) = sigs {
-                                               if htlc.offered {
-                                                       log_trace!(self, "Broadcasting HTLC-Timeout transaction against local commitment transactions");
-                                                       let mut htlc_timeout_tx = chan_utils::build_htlc_transaction(&local_tx.txid, local_tx.feerate_per_kw, self.their_to_self_delay.unwrap(), htlc, &local_tx.delayed_payment_key, &local_tx.revocation_key);
+               for &(ref htlc, ref sigs, _) in local_tx.htlc_outputs.iter() {
+                       if let Some(transaction_output_index) = htlc.transaction_output_index {
+                               if let &Some(ref their_sig) = sigs {
+                                       if htlc.offered {
+                                               log_trace!(self, "Broadcasting HTLC-Timeout transaction against local commitment transactions");
+                                               let mut htlc_timeout_tx = chan_utils::build_htlc_transaction(&local_tx.txid, local_tx.feerate_per_kw, self.their_to_self_delay.unwrap(), htlc, &local_tx.delayed_payment_key, &local_tx.revocation_key);
+                                               let (our_sig, htlc_script) = match
+                                                               chan_utils::sign_htlc_transaction(&mut htlc_timeout_tx, their_sig, &None, htlc, &local_tx.a_htlc_key, &local_tx.b_htlc_key, &local_tx.revocation_key, &local_tx.per_commitment_point, &self.onchain_detection.keys.htlc_base_key(), &self.secp_ctx) {
+                                                       Ok(res) => res,
+                                                       Err(_) => continue,
+                                               };
+
+                                               let mut per_input_material = HashMap::with_capacity(1);
+                                               per_input_material.insert(htlc_timeout_tx.input[0].previous_output, InputMaterial::LocalHTLC { witness_script: htlc_script, sigs: (*their_sig, our_sig), preimage: None, amount: htlc.amount_msat / 1000});
+                                               //TODO: with option_simplified_commitment track outpoint too
+                                               log_trace!(self, "Outpoint {}:{} is being being claimed", htlc_timeout_tx.input[0].previous_output.vout, htlc_timeout_tx.input[0].previous_output.txid);
+                                               res.push(htlc_timeout_tx);
+                                       } else {
+                                               if let Some(payment_preimage) = self.payment_preimages.get(&htlc.payment_hash) {
+                                                       log_trace!(self, "Broadcasting HTLC-Success transaction against local commitment transactions");
+                                                       let mut htlc_success_tx = chan_utils::build_htlc_transaction(&local_tx.txid, local_tx.feerate_per_kw, self.their_to_self_delay.unwrap(), htlc, &local_tx.delayed_payment_key, &local_tx.revocation_key);
                                                        let (our_sig, htlc_script) = match
-                                                                       chan_utils::sign_htlc_transaction(&mut htlc_timeout_tx, their_sig, &None, htlc, &local_tx.a_htlc_key, &local_tx.b_htlc_key, &local_tx.revocation_key, &local_tx.per_commitment_point, htlc_base_key, &self.secp_ctx) {
+                                                                       chan_utils::sign_htlc_transaction(&mut htlc_success_tx, their_sig, &Some(*payment_preimage), htlc, &local_tx.a_htlc_key, &local_tx.b_htlc_key, &local_tx.revocation_key, &local_tx.per_commitment_point, &self.onchain_detection.keys.htlc_base_key(), &self.secp_ctx) {
                                                                Ok(res) => res,
                                                                Err(_) => continue,
                                                        };
 
                                                        let mut per_input_material = HashMap::with_capacity(1);
-                                                       per_input_material.insert(htlc_timeout_tx.input[0].previous_output, InputMaterial::LocalHTLC { witness_script: htlc_script, sigs: (*their_sig, our_sig), preimage: None, amount: htlc.amount_msat / 1000});
+                                                       per_input_material.insert(htlc_success_tx.input[0].previous_output, InputMaterial::LocalHTLC { witness_script: htlc_script, sigs: (*their_sig, our_sig), preimage: Some(*payment_preimage), amount: htlc.amount_msat / 1000});
                                                        //TODO: with option_simplified_commitment track outpoint too
-                                                       log_trace!(self, "Outpoint {}:{} is being being claimed", htlc_timeout_tx.input[0].previous_output.vout, htlc_timeout_tx.input[0].previous_output.txid);
-                                                       res.push(htlc_timeout_tx);
-                                               } else {
-                                                       if let Some(payment_preimage) = self.payment_preimages.get(&htlc.payment_hash) {
-                                                               log_trace!(self, "Broadcasting HTLC-Success transaction against local commitment transactions");
-                                                               let mut htlc_success_tx = chan_utils::build_htlc_transaction(&local_tx.txid, local_tx.feerate_per_kw, self.their_to_self_delay.unwrap(), htlc, &local_tx.delayed_payment_key, &local_tx.revocation_key);
-                                                               let (our_sig, htlc_script) = match
-                                                                               chan_utils::sign_htlc_transaction(&mut htlc_success_tx, their_sig, &Some(*payment_preimage), htlc, &local_tx.a_htlc_key, &local_tx.b_htlc_key, &local_tx.revocation_key, &local_tx.per_commitment_point, htlc_base_key, &self.secp_ctx) {
-                                                                       Ok(res) => res,
-                                                                       Err(_) => continue,
-                                                               };
-
-                                                               let mut per_input_material = HashMap::with_capacity(1);
-                                                               per_input_material.insert(htlc_success_tx.input[0].previous_output, InputMaterial::LocalHTLC { witness_script: htlc_script, sigs: (*their_sig, our_sig), preimage: Some(*payment_preimage), amount: htlc.amount_msat / 1000});
-                                                               //TODO: with option_simplified_commitment track outpoint too
-                                                               log_trace!(self, "Outpoint {}:{} is being being claimed", htlc_success_tx.input[0].previous_output.vout, htlc_success_tx.input[0].previous_output.txid);
-                                                               res.push(htlc_success_tx);
-                                                       }
+                                                       log_trace!(self, "Outpoint {}:{} is being being claimed", htlc_success_tx.input[0].previous_output.vout, htlc_success_tx.input[0].previous_output.txid);
+                                                       res.push(htlc_success_tx);
                                                }
-                                               watch_outputs.push(local_tx.tx.without_valid_witness().output[transaction_output_index as usize].clone());
-                                       } else { panic!("Should have sigs for non-dust local tx outputs!") }
-                               }
+                                       }
+                                       watch_outputs.push(local_tx.tx.without_valid_witness().output[transaction_output_index as usize].clone());
+                               } else { panic!("Should have sigs for non-dust local tx outputs!") }
                        }
                }
 
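
To summarize the branch structure in the loop above: offered HTLCs on our own commitment are reclaimed with HTLC-Timeout transactions, while received HTLCs can only be claimed with HTLC-Success once the payment preimage is known. A compact restatement of that decision (sketch only, types invented):

```rust
// Which second-stage transaction claims a local-commitment HTLC output.
enum LocalHtlcClaim {
    Timeout,           // we offered the HTLC; reclaim it after its expiry
    Success([u8; 32]), // we received the HTLC; claim it with the payment preimage
}

fn claim_kind(offered: bool, preimage: Option<[u8; 32]>) -> Option<LocalHtlcClaim> {
    if offered {
        Some(LocalHtlcClaim::Timeout)
    } else {
        // Without the preimage there is nothing to broadcast for a received HTLC.
        preimage.map(LocalHtlcClaim::Success)
    }
}
```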
@@ -1840,38 +1749,12 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                // HTLCs set may differ between last and previous local commitment txn, in case of one them hitting chain, ensure we cancel all HTLCs backward
                let mut is_local_tx = false;
 
-               if let &mut Some(ref mut local_tx) = &mut self.current_local_signed_commitment_tx {
-                       if local_tx.txid == commitment_txid {
-                               match self.key_storage {
-                                       Storage::Local { ref funding_key, .. } => {
-                                               local_tx.tx.add_local_sig(funding_key, self.funding_redeemscript.as_ref().unwrap(), self.channel_value_satoshis.unwrap(), &self.secp_ctx);
-                                       },
-                                       _ => {},
-                               }
-                       }
-               }
                if let &Some(ref local_tx) = &self.current_local_signed_commitment_tx {
                        if local_tx.txid == commitment_txid {
                                is_local_tx = true;
                                log_trace!(self, "Got latest local commitment tx broadcast, searching for available HTLCs to claim");
-                               assert!(local_tx.tx.has_local_sig());
-                               match self.key_storage {
-                                       Storage::Local { ref delayed_payment_base_key, .. } => {
-                                               let mut res = self.broadcast_by_local_state(local_tx, delayed_payment_base_key);
-                                               append_onchain_update!(res);
-                                       },
-                                       Storage::Watchtower { .. } => { }
-                               }
-                       }
-               }
-               if let &mut Some(ref mut local_tx) = &mut self.prev_local_signed_commitment_tx {
-                       if local_tx.txid == commitment_txid {
-                               match self.key_storage {
-                                       Storage::Local { ref funding_key, .. } => {
-                                               local_tx.tx.add_local_sig(funding_key, self.funding_redeemscript.as_ref().unwrap(), self.channel_value_satoshis.unwrap(), &self.secp_ctx);
-                                       },
-                                       _ => {},
-                               }
+                               let mut res = self.broadcast_by_local_state(local_tx);
+                               append_onchain_update!(res);
                        }
                }
                if let &Some(ref local_tx) = &self.prev_local_signed_commitment_tx {
@@ -1879,13 +1762,8 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                                is_local_tx = true;
                                log_trace!(self, "Got previous local commitment tx broadcast, searching for available HTLCs to claim");
                                assert!(local_tx.tx.has_local_sig());
-                               match self.key_storage {
-                                       Storage::Local { ref delayed_payment_base_key, .. } => {
-                                               let mut res = self.broadcast_by_local_state(local_tx, delayed_payment_base_key);
-                                               append_onchain_update!(res);
-                                       },
-                                       Storage::Watchtower { .. } => { }
-                               }
+                               let mut res = self.broadcast_by_local_state(local_tx);
+                               append_onchain_update!(res);
                        }
                }
 
@@ -1928,23 +1806,13 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                // tracking state and panic!()ing if we get an update after force-closure/local-tx signing.
                log_trace!(self, "Getting signed latest local commitment transaction!");
                if let &mut Some(ref mut local_tx) = &mut self.current_local_signed_commitment_tx {
-                       match self.key_storage {
-                               Storage::Local { ref funding_key, .. } => {
-                                       local_tx.tx.add_local_sig(funding_key, self.funding_redeemscript.as_ref().unwrap(), self.channel_value_satoshis.unwrap(), &self.secp_ctx);
-                               },
-                               _ => {},
-                       }
+                       local_tx.tx.add_local_sig(&self.onchain_detection.keys.funding_key(), self.funding_redeemscript.as_ref().unwrap(), self.channel_value_satoshis.unwrap(), &self.secp_ctx);
                }
                if let &Some(ref local_tx) = &self.current_local_signed_commitment_tx {
                        let mut res = vec![local_tx.tx.with_valid_witness().clone()];
-                       match self.key_storage {
-                               Storage::Local { ref delayed_payment_base_key, .. } => {
-                                       res.append(&mut self.broadcast_by_local_state(local_tx, delayed_payment_base_key).0);
-                                       // We throw away the generated waiting_first_conf data as we aren't (yet) confirmed and we don't actually know what the caller wants to do.
-                                       // The data will be re-generated and tracked in check_spend_local_transaction if we get a confirmation.
-                               },
-                               _ => panic!("Can only broadcast by local channelmonitor"),
-                       };
+                       res.append(&mut self.broadcast_by_local_state(local_tx).0);
+                       // We throw away the generated waiting_first_conf data as we aren't (yet) confirmed and we don't actually know what the caller wants to do.
+                       // The data will be re-generated and tracked in check_spend_local_transaction if we get a confirmation.
                        res
                } else {
                        Vec::new()
@@ -1979,14 +1847,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                                // which is an easy way to filter out any potential non-matching txn for lazy
                                // filters.
                                let prevout = &tx.input[0].previous_output;
-                               let funding_txo = match self.key_storage {
-                                       Storage::Local { ref funding_info, .. } => {
-                                               funding_info.clone()
-                                       }
-                                       Storage::Watchtower { .. } => {
-                                               unimplemented!();
-                                       }
-                               };
+                               let funding_txo = self.onchain_detection.funding_info.clone();
                                if funding_txo.is_none() || (prevout.txid == funding_txo.as_ref().unwrap().0.txid && prevout.vout == funding_txo.as_ref().unwrap().0.index as u32) {
                                        if (tx.input[0].sequence >> 8*3) as u8 == 0x80 && (tx.lock_time >> 8*3) as u8 == 0x20 {
                                                let (mut new_outpoints, new_outputs) = self.check_spend_remote_transaction(&tx, height);
@@ -2027,30 +1888,20 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                } else { false };
                if let Some(ref mut cur_local_tx) = self.current_local_signed_commitment_tx {
                        if should_broadcast {
-                               match self.key_storage {
-                                       Storage::Local { ref funding_key, .. } => {
-                                               cur_local_tx.tx.add_local_sig(funding_key, self.funding_redeemscript.as_ref().unwrap(), self.channel_value_satoshis.unwrap(), &self.secp_ctx);
-                                       },
-                                       _ => {}
-                               }
+                               cur_local_tx.tx.add_local_sig(&self.onchain_detection.keys.funding_key(), self.funding_redeemscript.as_ref().unwrap(), self.channel_value_satoshis.unwrap(), &self.secp_ctx);
                        }
                }
                if let Some(ref cur_local_tx) = self.current_local_signed_commitment_tx {
                        if should_broadcast {
                                log_trace!(self, "Broadcast onchain {}", log_tx!(cur_local_tx.tx.with_valid_witness()));
                                broadcaster.broadcast_transaction(&cur_local_tx.tx.with_valid_witness());
-                               match self.key_storage {
-                                       Storage::Local { ref delayed_payment_base_key, .. } => {
-                                               let (txs, new_outputs, _) = self.broadcast_by_local_state(&cur_local_tx, delayed_payment_base_key);
-                                               if !new_outputs.is_empty() {
-                                                       watch_outputs.push((cur_local_tx.txid.clone(), new_outputs));
-                                               }
-                                               for tx in txs {
-                                                       log_trace!(self, "Broadcast onchain {}", log_tx!(tx));
-                                                       broadcaster.broadcast_transaction(&tx);
-                                               }
-                                       },
-                                       Storage::Watchtower { .. } => { },
+                               let (txs, new_outputs, _) = self.broadcast_by_local_state(&cur_local_tx);
+                               if !new_outputs.is_empty() {
+                                       watch_outputs.push((cur_local_tx.txid.clone(), new_outputs));
+                               }
+                               for tx in txs {
+                                       log_trace!(self, "Broadcast onchain {}", log_tx!(tx));
+                                       broadcaster.broadcast_transaction(&tx);
                                }
                        }
                }
@@ -2151,16 +2002,14 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                        scan_commitment!(cur_local_tx.htlc_outputs.iter().map(|&(ref a, _, _)| a), true);
                }
 
-               if let Storage::Local { ref current_remote_commitment_txid, ref prev_remote_commitment_txid, .. } = self.key_storage {
-                       if let &Some(ref txid) = current_remote_commitment_txid {
-                               if let Some(ref htlc_outputs) = self.remote_claimable_outpoints.get(txid) {
-                                       scan_commitment!(htlc_outputs.iter().map(|&(ref a, _)| a), false);
-                               }
+               if let Some(ref txid) = self.onchain_detection.current_remote_commitment_txid {
+                       if let Some(ref htlc_outputs) = self.remote_claimable_outpoints.get(txid) {
+                               scan_commitment!(htlc_outputs.iter().map(|&(ref a, _)| a), false);
                        }
-                       if let &Some(ref txid) = prev_remote_commitment_txid {
-                               if let Some(ref htlc_outputs) = self.remote_claimable_outpoints.get(txid) {
-                                       scan_commitment!(htlc_outputs.iter().map(|&(ref a, _)| a), false);
-                               }
+               }
+               if let Some(ref txid) = self.onchain_detection.prev_remote_commitment_txid {
+                       if let Some(ref htlc_outputs) = self.remote_claimable_outpoints.get(txid) {
+                               scan_commitment!(htlc_outputs.iter().map(|&(ref a, _)| a), false);
                        }
                }
 
@@ -2201,7 +2050,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
 
                        macro_rules! check_htlc_valid_remote {
                                ($remote_txid: expr, $htlc_output: expr) => {
-                                       if let &Some(txid) = $remote_txid {
+                                       if let Some(txid) = $remote_txid {
                                                for &(ref pending_htlc, ref pending_source) in self.remote_claimable_outpoints.get(&txid).unwrap() {
                                                        if pending_htlc.payment_hash == $htlc_output.payment_hash && pending_htlc.amount_msat == $htlc_output.amount_msat {
                                                                if let &Some(ref source) = pending_source {
@@ -2228,13 +2077,9 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                                                                // resolve the source HTLC with the original sender.
                                                                payment_data = Some(((*source).clone(), htlc_output.payment_hash));
                                                        } else if !$local_tx {
-                                                               if let Storage::Local { ref current_remote_commitment_txid, .. } = self.key_storage {
-                                                                       check_htlc_valid_remote!(current_remote_commitment_txid, htlc_output);
-                                                               }
+                                                                       check_htlc_valid_remote!(self.onchain_detection.current_remote_commitment_txid, htlc_output);
                                                                if payment_data.is_none() {
-                                                                       if let Storage::Local { ref prev_remote_commitment_txid, .. } = self.key_storage {
-                                                                               check_htlc_valid_remote!(prev_remote_commitment_txid, htlc_output);
-                                                                       }
+                                                                       check_htlc_valid_remote!(self.onchain_detection.prev_remote_commitment_txid, htlc_output);
                                                                }
                                                        }
                                                        if payment_data.is_none() {
@@ -2405,36 +2250,23 @@ impl<ChanSigner: ChannelKeys + Readable> ReadableArgs<Arc<Logger>> for (Sha256dH
                };
                let shutdown_script = Readable::read(reader)?;
 
-               let key_storage = match <u8 as Readable>::read(reader)? {
-                       0 => {
-                               let keys = Readable::read(reader)?;
-                               let funding_key = Readable::read(reader)?;
-                               let revocation_base_key = Readable::read(reader)?;
-                               let htlc_base_key = Readable::read(reader)?;
-                               let delayed_payment_base_key = Readable::read(reader)?;
-                               let payment_base_key = Readable::read(reader)?;
-                               // Technically this can fail and serialize fail a round-trip, but only for serialization of
-                               // barely-init'd ChannelMonitors that we can't do anything with.
-                               let outpoint = OutPoint {
-                                       txid: Readable::read(reader)?,
-                                       index: Readable::read(reader)?,
-                               };
-                               let funding_info = Some((outpoint, Readable::read(reader)?));
-                               let current_remote_commitment_txid = Readable::read(reader)?;
-                               let prev_remote_commitment_txid = Readable::read(reader)?;
-                               Storage::Local {
-                                       keys,
-                                       funding_key,
-                                       revocation_base_key,
-                                       htlc_base_key,
-                                       delayed_payment_base_key,
-                                       payment_base_key,
-                                       funding_info,
-                                       current_remote_commitment_txid,
-                                       prev_remote_commitment_txid,
-                               }
-                       },
-                       _ => return Err(DecodeError::InvalidValue),
+               let onchain_detection = {
+                       let keys = Readable::read(reader)?;
+                       // Technically this read can fail, and with it serialization can fail to round-trip, but only
+                       // for barely-init'd ChannelMonitors that we can't do anything with.
+                       let outpoint = OutPoint {
+                               txid: Readable::read(reader)?,
+                               index: Readable::read(reader)?,
+                       };
+                       let funding_info = Some((outpoint, Readable::read(reader)?));
+                       let current_remote_commitment_txid = Readable::read(reader)?;
+                       let prev_remote_commitment_txid = Readable::read(reader)?;
+                       OnchainDetection {
+                               keys,
+                               funding_info,
+                               current_remote_commitment_txid,
+                               prev_remote_commitment_txid,
+                       }
                };
 
                let their_htlc_base_key = Some(Readable::read(reader)?);
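For readers tracking the `Storage` removal: the reads above imply roughly the following shape for the new `OnchainDetection` struct. This is a minimal sketch inferred from the deserialization code only; the field types, trait bounds, and import paths are my assumptions, not the actual definition from the patch.

```
// Sketch only: shape inferred from the deserialization above, not copied from the source.
// Import paths are best guesses for this vintage of the crate.
use bitcoin::blockdata::script::Script;
use bitcoin_hashes::sha256d::Hash as Sha256dHash;
use chain::keysinterface::ChannelKeys;
use chain::transaction::OutPoint;

struct OnchainDetection<ChanSigner: ChannelKeys> {
	// The channel's signer, replacing the per-key fields of the old Storage::Local variant.
	keys: ChanSigner,
	// Funding outpoint plus its script, once the funding transaction is known.
	funding_info: Option<(OutPoint, Script)>,
	// Txids of the latest and previous remote commitment transactions we may have to claim from.
	current_remote_commitment_txid: Option<Sha256dHash>,
	prev_remote_commitment_txid: Option<Sha256dHash>,
}
```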
@@ -2645,7 +2477,7 @@ impl<ChanSigner: ChannelKeys + Readable> ReadableArgs<Arc<Logger>> for (Sha256dH
                        broadcasted_remote_payment_script,
                        shutdown_script,
 
-                       key_storage,
+                       onchain_detection,
                        their_htlc_base_key,
                        their_delayed_payment_base_key,
                        funding_redeemscript,
@@ -2903,7 +2735,7 @@ mod tests {
                for (idx, inp) in claim_tx.input.iter_mut().zip(inputs_des.iter()).enumerate() {
                        sign_input!(sighash_parts, inp.0, idx as u32, 0, inp.1, sum_actual_sigs);
                }
-               assert_eq!(base_weight + OnchainTxHandler::get_witnesses_weight(&inputs_des[..]),  claim_tx.get_weight() + /* max_length_sig */ (73 * inputs_des.len() - sum_actual_sigs));
+               assert_eq!(base_weight + OnchainTxHandler::<InMemoryChannelKeys>::get_witnesses_weight(&inputs_des[..]),  claim_tx.get_weight() + /* max_length_sig */ (73 * inputs_des.len() - sum_actual_sigs));
 
                // Claim tx with 1 offered HTLCs, 3 received HTLCs
                claim_tx.input.clear();
@@ -2925,7 +2757,7 @@ mod tests {
                for (idx, inp) in claim_tx.input.iter_mut().zip(inputs_des.iter()).enumerate() {
                        sign_input!(sighash_parts, inp.0, idx as u32, 0, inp.1, sum_actual_sigs);
                }
-               assert_eq!(base_weight + OnchainTxHandler::get_witnesses_weight(&inputs_des[..]),  claim_tx.get_weight() + /* max_length_sig */ (73 * inputs_des.len() - sum_actual_sigs));
+               assert_eq!(base_weight + OnchainTxHandler::<InMemoryChannelKeys>::get_witnesses_weight(&inputs_des[..]),  claim_tx.get_weight() + /* max_length_sig */ (73 * inputs_des.len() - sum_actual_sigs));
 
                // Justice tx with 1 revoked HTLC-Success tx output
                claim_tx.input.clear();
@@ -2945,7 +2777,7 @@ mod tests {
                for (idx, inp) in claim_tx.input.iter_mut().zip(inputs_des.iter()).enumerate() {
                        sign_input!(sighash_parts, inp.0, idx as u32, 0, inp.1, sum_actual_sigs);
                }
-               assert_eq!(base_weight + OnchainTxHandler::get_witnesses_weight(&inputs_des[..]), claim_tx.get_weight() + /* max_length_isg */ (73 * inputs_des.len() - sum_actual_sigs));
+               assert_eq!(base_weight + OnchainTxHandler::<InMemoryChannelKeys>::get_witnesses_weight(&inputs_des[..]), claim_tx.get_weight() + /* max_length_sig */ (73 * inputs_des.len() - sum_actual_sigs));
        }
 
        // Further testing is done in the ChannelManager integration tests.
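A small worked example of the arithmetic behind the `max_length_sig` assertions above: `get_witnesses_weight` prices every signature at the 73-byte maximum, while `claim_tx` carries the real (often shorter) signatures, so the estimate overshoots by exactly the per-signature shortfall. The concrete signature lengths below are made up for illustration.

```
fn main() {
	// Pretend three inputs whose real DER signatures serialized to 71, 72 and 71 bytes.
	let actual_sig_lens = [71usize, 72, 71];
	let n = actual_sig_lens.len();
	let sum_actual_sigs: usize = actual_sig_lens.iter().sum();
	// Witness bytes count one weight unit each, so the estimator's overshoot is:
	let slack = 73 * n - sum_actual_sigs;
	assert_eq!(slack, (73 - 71) + (73 - 72) + (73 - 71));
	// ...which is the term the assertions add back onto claim_tx.get_weight().
}
```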
index 76749c3b37ad44c18df96c4ff476450247b37ddc..eb1194c0112adcbea3a158f59e8d6c9bf5273cc0 100644 (file)
@@ -3909,6 +3909,13 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() {
        create_announced_chan_between_nodes(&nodes, 2, 0, InitFeatures::supported(), InitFeatures::supported());
        let (_, _, channel_id, funding_tx) = create_announced_chan_between_nodes(&nodes, 0, 3, InitFeatures::supported(), InitFeatures::supported());
 
+       let mut node_0_stale_monitors_serialized = Vec::new();
+       for monitor in nodes[0].chan_monitor.simple_monitor.monitors.lock().unwrap().iter() {
+               let mut writer = test_utils::TestVecWriter(Vec::new());
+               monitor.1.write_for_disk(&mut writer).unwrap();
+               node_0_stale_monitors_serialized.push(writer.0);
+       }
+
        let (our_payment_preimage, _) = route_payment(&nodes[2], &[&nodes[0], &nodes[1]], 1000000);
 
        // Serialize the ChannelManager here, but the monitor we keep up-to-date
@@ -3931,6 +3938,15 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() {
        fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 };
        new_chan_monitor = test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone(), Arc::new(test_utils::TestLogger::new()), &fee_estimator);
        nodes[0].chan_monitor = &new_chan_monitor;
+
+       let mut node_0_stale_monitors = Vec::new();
+       for serialized in node_0_stale_monitors_serialized.iter() {
+               let mut read = &serialized[..];
+               let (_, monitor) = <(Sha256dHash, ChannelMonitor<EnforcingChannelKeys>)>::read(&mut read, Arc::new(test_utils::TestLogger::new())).unwrap();
+               assert!(read.is_empty());
+               node_0_stale_monitors.push(monitor);
+       }
+
        let mut node_0_monitors = Vec::new();
        for serialized in node_0_monitors_serialized.iter() {
                let mut read = &serialized[..];
@@ -3939,9 +3955,25 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() {
                node_0_monitors.push(monitor);
        }
 
-       let mut nodes_0_read = &nodes_0_serialized[..];
        keys_manager = test_utils::TestKeysInterface::new(&nodes[0].node_seed, Network::Testnet, Arc::new(test_utils::TestLogger::new()));
-       let (_, nodes_0_deserialized_tmp) = <(Sha256dHash, ChannelManager<EnforcingChannelKeys, &test_utils::TestChannelMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator>)>::read(&mut nodes_0_read, ChannelManagerReadArgs {
+
+       let mut nodes_0_read = &nodes_0_serialized[..];
+       if let Err(msgs::DecodeError::InvalidValue) =
+               <(Sha256dHash, ChannelManager<EnforcingChannelKeys, &test_utils::TestChannelMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator>)>::read(&mut nodes_0_read, ChannelManagerReadArgs {
+               default_config: UserConfig::default(),
+               keys_manager: &keys_manager,
+               fee_estimator: &fee_estimator,
+               monitor: nodes[0].chan_monitor,
+               tx_broadcaster: nodes[0].tx_broadcaster.clone(),
+               logger: Arc::new(test_utils::TestLogger::new()),
+               channel_monitors: &mut node_0_stale_monitors.iter_mut().map(|monitor| { (monitor.get_funding_txo().unwrap(), monitor) }).collect(),
+       }) { } else {
+               panic!("Reading a ChannelManager with stale monitors should fail with DecodeError::InvalidValue; reaching here indicates a bug");
+       };
+
+       let mut nodes_0_read = &nodes_0_serialized[..];
+       let (_, nodes_0_deserialized_tmp) =
+               <(Sha256dHash, ChannelManager<EnforcingChannelKeys, &test_utils::TestChannelMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator>)>::read(&mut nodes_0_read, ChannelManagerReadArgs {
                default_config: UserConfig::default(),
                keys_manager: &keys_manager,
                fee_estimator: &fee_estimator,
index 8323a58fb5cb231196d707197ebd389d46cfc2e9..02dc2ebc50b69644db51374219def8137151d3ec 100644 (file)
@@ -17,6 +17,7 @@ use ln::msgs::DecodeError;
 use ln::channelmonitor::{ANTI_REORG_DELAY, CLTV_SHARED_CLAIM_BUFFER, InputMaterial, ClaimRequest};
 use ln::chan_utils::HTLCType;
 use chain::chaininterface::{FeeEstimator, BroadcasterInterface, ConfirmationTarget, MIN_RELAY_FEE_SAT_PER_1000_WEIGHT};
+use chain::keysinterface::ChannelKeys;
 use util::logger::Logger;
 use util::ser::{ReadableArgs, Readable, Writer, Writeable};
 use util::byte_utils;
@@ -138,10 +139,11 @@ macro_rules! subtract_high_prio_fee {
 
 /// OnchainTxHandler receives claiming requests, aggregates them if it is sound to do so, broadcasts them, and
 /// does RBF bumping if possible.
-#[derive(Clone)]
-pub struct OnchainTxHandler {
+pub struct OnchainTxHandler<ChanSigner: ChannelKeys> {
        destination_script: Script,
 
+       key_storage: ChanSigner,
+
        // Used to track claiming requests. If claim tx doesn't confirm before height timer expiration we need to bump
        // it (RBF or CPFP). If an input has been part of an aggregate tx at first claim try, we need to keep it within
        // another bumped aggregate tx to comply with RBF rules. We may have multiple claiming txn in flight for the
@@ -175,10 +177,12 @@ pub struct OnchainTxHandler {
        logger: Arc<Logger>
 }
 
-impl Writeable for OnchainTxHandler {
-       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
+impl<ChanSigner: ChannelKeys + Writeable> OnchainTxHandler<ChanSigner> {
+       pub(crate) fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
                self.destination_script.write(writer)?;
 
+               self.key_storage.write(writer)?;
+
                writer.write_all(&byte_utils::be64_to_array(self.pending_claim_requests.len() as u64))?;
                for (ref ancestor_claim_txid, claim_tx_data) in self.pending_claim_requests.iter() {
                        ancestor_claim_txid.write(writer)?;
@@ -214,10 +218,12 @@ impl Writeable for OnchainTxHandler {
        }
 }
 
-impl ReadableArgs<Arc<Logger>> for OnchainTxHandler {
+impl<ChanSigner: ChannelKeys + Readable> ReadableArgs<Arc<Logger>> for OnchainTxHandler<ChanSigner> {
        fn read<R: ::std::io::Read>(reader: &mut R, logger: Arc<Logger>) -> Result<Self, DecodeError> {
                let destination_script = Readable::read(reader)?;
 
+               let key_storage = Readable::read(reader)?;
+
                let pending_claim_requests_len: u64 = Readable::read(reader)?;
                let mut pending_claim_requests = HashMap::with_capacity(cmp::min(pending_claim_requests_len as usize, MAX_ALLOC_SIZE / 128));
                for _ in 0..pending_claim_requests_len {
@@ -263,6 +269,7 @@ impl ReadableArgs<Arc<Logger>> for OnchainTxHandler {
 
                Ok(OnchainTxHandler {
                        destination_script,
+                       key_storage,
                        claimable_outpoints,
                        pending_claim_requests,
                        onchain_events_waiting_threshold_conf,
@@ -272,10 +279,14 @@ impl ReadableArgs<Arc<Logger>> for OnchainTxHandler {
        }
 }
 
-impl OnchainTxHandler {
-       pub(super) fn new(destination_script: Script, logger: Arc<Logger>) -> Self {
+impl<ChanSigner: ChannelKeys> OnchainTxHandler<ChanSigner> {
+       pub(super) fn new(destination_script: Script, keys: ChanSigner, logger: Arc<Logger>) -> Self {
+
+               let key_storage = keys;
+
                OnchainTxHandler {
                        destination_script,
+                       key_storage,
                        pending_claim_requests: HashMap::new(),
                        claimable_outpoints: HashMap::new(),
                        onchain_events_waiting_threshold_conf: HashMap::new(),
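Since the handler is now generic over `ChanSigner` and serializes its `key_storage`, round-tripping it requires a signer type that is both `Writeable` and `Readable`. Below is a rough, crate-internal sketch of such a round-trip; the variable names and the assumption that `InMemoryChannelKeys` satisfies those bounds are mine, not the patch's.

```
// Hypothetical sketch: `keys: InMemoryChannelKeys`, `destination_script: Script` and
// `logger: Arc<Logger>` are assumed to be in scope; TestVecWriter is the Vec-backed
// Writer used by the functional tests above.
let handler: OnchainTxHandler<InMemoryChannelKeys> =
	OnchainTxHandler::new(destination_script.clone(), keys.clone(), logger.clone());

let mut writer = test_utils::TestVecWriter(Vec::new());
handler.write(&mut writer).unwrap();

let mut reader = &writer.0[..];
let restored: OnchainTxHandler<InMemoryChannelKeys> =
	ReadableArgs::read(&mut reader, logger.clone()).unwrap();
```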
index 12cf937bc0f07785fbb58d93db58355dbbf0dce9..fc6d1cbcda13883864382bb0cb07d35e6c3e1d32 100644 (file)
@@ -354,7 +354,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                        InitSyncTracker::NoSyncRequested => {},
                                        InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
                                                let steps = ((MSG_BUFF_SIZE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
-                                               let all_messages = self.message_handler.route_handler.get_next_channel_announcements(0, steps);
+                                               let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
                                                for &(ref announce, ref update_a, ref update_b) in all_messages.iter() {
                                                        encode_and_send_msg!(announce);
                                                        encode_and_send_msg!(update_a);
@@ -1149,8 +1149,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
 
 #[cfg(test)]
 mod tests {
+       use secp256k1::Signature;
+       use bitcoin::BitcoinHash;
+       use bitcoin::network::constants::Network;
+       use bitcoin::blockdata::constants::genesis_block;
        use ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor};
        use ln::msgs;
+       use ln::features::ChannelFeatures;
        use util::events;
        use util::test_utils;
        use util::logger::Logger;
@@ -1161,7 +1166,9 @@ mod tests {
        use rand::{thread_rng, Rng};
 
        use std;
+       use std::cmp::min;
        use std::sync::{Arc, Mutex};
+       use std::sync::atomic::{AtomicUsize, Ordering};
 
        #[derive(Clone)]
        struct FileDescriptor {
@@ -1199,7 +1206,7 @@ mod tests {
                chan_handlers
        }
 
-       fn create_network<'a>(peer_count: usize, chan_handlers: &'a Vec<test_utils::TestChannelMessageHandler>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler>> {
+       fn create_network<'a>(peer_count: usize, chan_handlers: &'a Vec<test_utils::TestChannelMessageHandler>, routing_handlers: Option<&'a Vec<Arc<msgs::RoutingMessageHandler>>>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler>> {
                let mut peers = Vec::new();
                let mut rng = thread_rng();
                let logger : Arc<Logger> = Arc::new(test_utils::TestLogger::new());
@@ -1207,13 +1214,15 @@ mod tests {
                rng.fill_bytes(&mut ephemeral_bytes);
 
                for i in 0..peer_count {
-                       let router = test_utils::TestRoutingMessageHandler::new();
+                       let router = if let Some(routers) = routing_handlers { routers[i].clone() } else {
+                               Arc::new(test_utils::TestRoutingMessageHandler::new())
+                       };
                        let node_id = {
                                let mut key_slice = [0;32];
                                rng.fill_bytes(&mut key_slice);
                                SecretKey::from_slice(&key_slice).unwrap()
                        };
-                       let msg_handler = MessageHandler { chan_handler: &chan_handlers[i], route_handler: Arc::new(router) };
+                       let msg_handler = MessageHandler { chan_handler: &chan_handlers[i], route_handler: router };
                        let peer = PeerManager::new(msg_handler, node_id, &ephemeral_bytes, Arc::clone(&logger));
                        peers.push(peer);
                }
@@ -1221,7 +1230,7 @@ mod tests {
                peers
        }
 
-       fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler>) {
+       fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler>) -> (FileDescriptor, FileDescriptor) {
                let secp_ctx = Secp256k1::new();
                let a_id = PublicKey::from_secret_key(&secp_ctx, &peer_a.our_node_secret);
                let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
@@ -1231,6 +1240,7 @@ mod tests {
                assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
                assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
                assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+               (fd_a.clone(), fd_b.clone())
        }
 
        #[test]
@@ -1239,7 +1249,7 @@ mod tests {
                // push a DisconnectPeer event to remove the node flagged by id
                let chan_handlers = create_chan_handlers(2);
                let chan_handler = test_utils::TestChannelMessageHandler::new();
-               let mut peers = create_network(2, &chan_handlers);
+               let mut peers = create_network(2, &chan_handlers, None);
                establish_connection(&peers[0], &peers[1]);
                assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
 
@@ -1256,11 +1266,12 @@ mod tests {
                peers[0].process_events();
                assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
        }
+
        #[test]
-       fn test_timer_tick_occured(){
+       fn test_timer_tick_occurred() {
                // Create peers, a vector of two peer managers, perform initial set up and check that peers[0] has one Peer.
                let chan_handlers = create_chan_handlers(2);
-               let peers = create_network(2, &chan_handlers);
+               let peers = create_network(2, &chan_handlers, None);
                establish_connection(&peers[0], &peers[1]);
                assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
 
@@ -1272,4 +1283,138 @@ mod tests {
                peers[0].timer_tick_occured();
                assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
        }
+
+       pub struct TestRoutingMessageHandler {
+               pub chan_upds_recvd: AtomicUsize,
+               pub chan_anns_recvd: AtomicUsize,
+               pub chan_anns_sent: AtomicUsize,
+       }
+
+       impl TestRoutingMessageHandler {
+               pub fn new() -> Self {
+                       TestRoutingMessageHandler {
+                               chan_upds_recvd: AtomicUsize::new(0),
+                               chan_anns_recvd: AtomicUsize::new(0),
+                               chan_anns_sent: AtomicUsize::new(0),
+                       }
+               }
+
+       }
+       impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
+               fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, msgs::LightningError> {
+                       Err(msgs::LightningError { err: "", action: msgs::ErrorAction::IgnoreError })
+               }
+               fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, msgs::LightningError> {
+                       self.chan_anns_recvd.fetch_add(1, Ordering::AcqRel);
+                       Err(msgs::LightningError { err: "", action: msgs::ErrorAction::IgnoreError })
+               }
+               fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, msgs::LightningError> {
+                       self.chan_upds_recvd.fetch_add(1, Ordering::AcqRel);
+                       Err(msgs::LightningError { err: "", action: msgs::ErrorAction::IgnoreError })
+               }
+               fn handle_htlc_fail_channel_update(&self, _update: &msgs::HTLCFailChannelUpdate) {}
+               fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(msgs::ChannelAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate)> {
+                       let mut chan_anns = Vec::new();
+                       const TOTAL_UPDS: u64 = 100;
+                       let end: u64 =  min(starting_point + batch_amount as u64, TOTAL_UPDS - self.chan_anns_sent.load(Ordering::Acquire) as u64);
+                       for i in starting_point..end {
+                               let chan_upd_1 = get_dummy_channel_update(i);
+                               let chan_upd_2 = get_dummy_channel_update(i);
+                               let chan_ann = get_dummy_channel_announcement(i);
+
+                               chan_anns.push((chan_ann, chan_upd_1, chan_upd_2));
+                       }
+
+                       self.chan_anns_sent.fetch_add(chan_anns.len(), Ordering::AcqRel);
+                       chan_anns
+               }
+
+               fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> {
+                       Vec::new()
+               }
+
+               fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool {
+                       true
+               }
+       }
+
+       fn get_dummy_channel_announcement(short_chan_id: u64) -> msgs::ChannelAnnouncement {
+               use secp256k1::ffi::Signature as FFISignature;
+               let secp_ctx = Secp256k1::new();
+               let network = Network::Testnet;
+               let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
+               let node_2_privkey = SecretKey::from_slice(&[41; 32]).unwrap();
+               let node_1_btckey = SecretKey::from_slice(&[40; 32]).unwrap();
+               let node_2_btckey = SecretKey::from_slice(&[39; 32]).unwrap();
+               let unsigned_ann = msgs::UnsignedChannelAnnouncement {
+                       features: ChannelFeatures::supported(),
+                       chain_hash: genesis_block(network).header.bitcoin_hash(),
+                       short_channel_id: short_chan_id,
+                       node_id_1: PublicKey::from_secret_key(&secp_ctx, &node_1_privkey),
+                       node_id_2: PublicKey::from_secret_key(&secp_ctx, &node_2_privkey),
+                       bitcoin_key_1: PublicKey::from_secret_key(&secp_ctx, &node_1_btckey),
+                       bitcoin_key_2: PublicKey::from_secret_key(&secp_ctx, &node_2_btckey),
+                       excess_data: Vec::new(),
+               };
+
+               msgs::ChannelAnnouncement {
+                       node_signature_1: Signature::from(FFISignature::new()),
+                       node_signature_2: Signature::from(FFISignature::new()),
+                       bitcoin_signature_1: Signature::from(FFISignature::new()),
+                       bitcoin_signature_2: Signature::from(FFISignature::new()),
+                       contents: unsigned_ann,
+               }
+       }
+
+       fn get_dummy_channel_update(short_chan_id: u64) -> msgs::ChannelUpdate {
+               use secp256k1::ffi::Signature as FFISignature;
+               let network = Network::Testnet;
+               msgs::ChannelUpdate {
+                       signature: Signature::from(FFISignature::new()),
+                       contents: msgs::UnsignedChannelUpdate {
+                               chain_hash: genesis_block(network).header.bitcoin_hash(),
+                               short_channel_id: short_chan_id,
+                               timestamp: 0,
+                               flags: 0,
+                               cltv_expiry_delta: 0,
+                               htlc_minimum_msat: 0,
+                               fee_base_msat: 0,
+                               fee_proportional_millionths: 0,
+                               excess_data: vec![],
+                       }
+               }
+       }
+
+       #[test]
+       fn test_do_attempt_write_data() {
+               // Create 2 peers with custom TestRoutingMessageHandlers and connect them.
+               let chan_handlers = create_chan_handlers(2);
+               let mut routing_handlers: Vec<Arc<msgs::RoutingMessageHandler>> = Vec::new();
+               let mut routing_handlers_concrete: Vec<Arc<TestRoutingMessageHandler>> = Vec::new();
+               for _ in 0..2 {
+                       let routing_handler = Arc::new(TestRoutingMessageHandler::new());
+                       routing_handlers.push(routing_handler.clone());
+                       routing_handlers_concrete.push(routing_handler.clone());
+               }
+               let peers = create_network(2, &chan_handlers, Some(&routing_handlers));
+
+               // By calling establish_connection, we trigger do_attempt_write_data between
+               // the peers. Previously this function would mistakenly enter an infinite loop
+               // when there were more channel messages available than could fit into a peer's
+               // outbound buffer. Such a regression would now be caught by this test, because the
+               // custom RoutingMessageHandlers above intentionally return more channel messages
+               // than can fit into a peer's buffer.
+               let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
+
+               // Have each peer read the messages that the other peer just wrote to them.
+               peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap();
+               peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap();
+
+               // Check that each peer has received the expected number of channel updates and channel
+               // announcements.
+               assert_eq!(routing_handlers_concrete[0].chan_upds_recvd.load(Ordering::Acquire), 100);
+               assert_eq!(routing_handlers_concrete[0].chan_anns_recvd.load(Ordering::Acquire), 50);
+               assert_eq!(routing_handlers_concrete[1].chan_upds_recvd.load(Ordering::Acquire), 100);
+               assert_eq!(routing_handlers_concrete[1].chan_anns_recvd.load(Ordering::Acquire), 50);
+       }
 }
index 5a70241428bacf3d974d86bc96b349ec37ac544f..d16bd48aebb53a77ca39dbb935b36fb4104560fc 100644 (file)
@@ -66,14 +66,7 @@ impl<'a, T> std::fmt::Display for DebugFundingInfo<'a, T> {
 }
 macro_rules! log_funding_info {
        ($key_storage: expr) => {
-               match $key_storage {
-                       Storage::Local { ref funding_info, .. } => {
-                               ::util::macro_logger::DebugFundingInfo(&funding_info)
-                       },
-                       Storage::Watchtower { .. } => {
-                               ::util::macro_logger::DebugFundingInfo(&None)
-                       }
-               }
+               ::util::macro_logger::DebugFundingInfo(&$key_storage.funding_info)
        }
 }
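With the `Storage` match gone, `log_funding_info!` only needs an expression that exposes a `funding_info` field. A hypothetical call site after this change (the surrounding log line is illustrative, not taken from the patch):

```
// e.g. inside ChannelMonitor, handing the macro the struct that owns funding_info
// (the monitor's onchain_detection field) instead of a Storage enum to match on.
log_trace!(self, "Block connected, checking {}", log_funding_info!(self.onchain_detection));
```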