Send announcement_signatures msgs out-of-band for ordered delivery
[rust-lightning] / src / ln / channelmanager.rs
index 7e736ba15d2b4a4feda53bb94e1aab597e99217b..4777a3e810a220349d0f66bbaa921e77e489d9d4 100644 (file)
@@ -22,11 +22,12 @@ use secp256k1;
 
 use chain::chaininterface::{BroadcasterInterface,ChainListener,ChainWatchInterface,FeeEstimator};
 use chain::transaction::OutPoint;
-use ln::channel::{Channel, ChannelError, ChannelKeys};
+use ln::channel::{Channel, ChannelError};
 use ln::channelmonitor::{ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS};
 use ln::router::{Route,RouteHop};
 use ln::msgs;
 use ln::msgs::{ChannelMessageHandler, HandleError, RAACommitmentOrder};
+use chain::keysinterface::KeysInterface;
 use util::{byte_utils, events, internal_traits, rng};
 use util::sha2::Sha256;
 use util::ser::{Readable, Writeable};
@@ -256,6 +257,9 @@ struct ChannelHolder {
        /// guarantees are made about the channels given here actually existing anymore by the time you
        /// go to read them!
        claimable_htlcs: HashMap<[u8; 32], Vec<HTLCPreviousHopData>>,
+       /// Messages to send to peers - pushed to in the same lock that they are generated in (except
+       /// for broadcast messages, where ordering isn't as strict).
+       pending_msg_events: Vec<events::MessageSendEvent>,
 }
 struct MutChannelHolder<'a> {
        by_id: &'a mut HashMap<[u8; 32], Channel>,
@@ -263,6 +267,7 @@ struct MutChannelHolder<'a> {
        next_forward: &'a mut Instant,
        forward_htlcs: &'a mut HashMap<u64, Vec<HTLCForwardInfo>>,
        claimable_htlcs: &'a mut HashMap<[u8; 32], Vec<HTLCPreviousHopData>>,
+       pending_msg_events: &'a mut Vec<events::MessageSendEvent>,
 }
 impl ChannelHolder {
        fn borrow_parts(&mut self) -> MutChannelHolder {
@@ -272,6 +277,7 @@ impl ChannelHolder {
                        next_forward: &mut self.next_forward,
                        forward_htlcs: &mut self.forward_htlcs,
                        claimable_htlcs: &mut self.claimable_htlcs,
+                       pending_msg_events: &mut self.pending_msg_events,
                }
        }
 }
@@ -301,6 +307,8 @@ pub struct ChannelManager {
 
        pending_events: Mutex<Vec<events::Event>>,
 
+       keys_manager: Arc<KeysInterface>,
+
        logger: Arc<Logger>,
 }
 
@@ -373,7 +381,7 @@ impl ChannelManager {
        /// Non-proportional fees are fixed according to our risk using the provided fee estimator.
        ///
        /// panics if channel_value_satoshis is >= `MAX_FUNDING_SATOSHIS`!
-       pub fn new(our_network_key: SecretKey, fee_proportional_millionths: u32, announce_channels_publicly: bool, network: Network, feeest: Arc<FeeEstimator>, monitor: Arc<ManyChannelMonitor>, chain_monitor: Arc<ChainWatchInterface>, tx_broadcaster: Arc<BroadcasterInterface>, logger: Arc<Logger>) -> Result<Arc<ChannelManager>, secp256k1::Error> {
+       pub fn new(fee_proportional_millionths: u32, announce_channels_publicly: bool, network: Network, feeest: Arc<FeeEstimator>, monitor: Arc<ManyChannelMonitor>, chain_monitor: Arc<ChainWatchInterface>, tx_broadcaster: Arc<BroadcasterInterface>, logger: Arc<Logger>, keys_manager: Arc<KeysInterface>) -> Result<Arc<ChannelManager>, secp256k1::Error> {
                let secp_ctx = Secp256k1::new();
 
                let res = Arc::new(ChannelManager {
@@ -394,11 +402,14 @@ impl ChannelManager {
                                next_forward: Instant::now(),
                                forward_htlcs: HashMap::new(),
                                claimable_htlcs: HashMap::new(),
+                               pending_msg_events: Vec::new(),
                        }),
-                       our_network_key,
+                       our_network_key: keys_manager.get_node_secret(),
 
                        pending_events: Mutex::new(Vec::new()),
 
+                       keys_manager,
+
                        logger,
                });
                let weak_res = Arc::downgrade(&res);
@@ -413,32 +424,12 @@ impl ChannelManager {
        /// create_channel call. Note that user_channel_id defaults to 0 for inbound channels, so you
        /// may wish to avoid using 0 for user_id here.
        ///
-       /// If successful, will generate a SendOpenChannel event, so you should probably poll
+       /// If successful, will generate a SendOpenChannel message event, so you should probably poll
        /// PeerManager::process_events afterwards.
        ///
        /// Raises APIError::APIMisuseError when channel_value_satoshis > 2**24 or push_msat being greater than channel_value_satoshis * 1k
        pub fn create_channel(&self, their_network_key: PublicKey, channel_value_satoshis: u64, push_msat: u64, user_id: u64) -> Result<(), APIError> {
-               let chan_keys = if cfg!(feature = "fuzztarget") {
-                       ChannelKeys {
-                               funding_key:               SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(),
-                               revocation_base_key:       SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(),
-                               payment_base_key:          SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(),
-                               delayed_payment_base_key:  SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(),
-                               htlc_base_key:             SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(),
-                               channel_close_key:         SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(),
-                               channel_monitor_claim_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(),
-                               commitment_seed: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
-                       }
-               } else {
-                       let mut key_seed = [0u8; 32];
-                       rng::fill_bytes(&mut key_seed);
-                       match ChannelKeys::new_from_seed(&key_seed) {
-                               Ok(key) => key,
-                               Err(_) => panic!("RNG is busted!")
-                       }
-               };
-
-               let channel = Channel::new_outbound(&*self.fee_estimator, chan_keys, their_network_key, channel_value_satoshis, push_msat, self.announce_channels_publicly, user_id, Arc::clone(&self.logger))?;
+               let channel = Channel::new_outbound(&*self.fee_estimator, &self.keys_manager, their_network_key, channel_value_satoshis, push_msat, self.announce_channels_publicly, user_id, Arc::clone(&self.logger))?;
                let res = channel.get_open_channel(self.genesis_hash.clone(), &*self.fee_estimator);
                let mut channel_state = self.channel_state.lock().unwrap();
                match channel_state.by_id.entry(channel.channel_id()) {
@@ -451,9 +442,7 @@ impl ChannelManager {
                        },
                        hash_map::Entry::Vacant(entry) => { entry.insert(channel); }
                }
-
-               let mut events = self.pending_events.lock().unwrap();
-               events.push(events::Event::SendOpenChannel {
+               channel_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel {
                        node_id: their_network_key,
                        msg: res,
                });
@@ -503,25 +492,29 @@ impl ChannelManager {
        /// will be accepted on the given channel, and after additional timeout/the closing of all
        /// pending HTLCs, the channel will be closed on chain.
        ///
-       /// May generate a SendShutdown event on success, which should be relayed.
+       /// May generate a SendShutdown message event on success, which should be relayed.
        pub fn close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> {
-               let (mut res, node_id, chan_option) = {
+               let (mut failed_htlcs, chan_option) = {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = channel_state_lock.borrow_parts();
                        match channel_state.by_id.entry(channel_id.clone()) {
                                hash_map::Entry::Occupied(mut chan_entry) => {
-                                       let res = chan_entry.get_mut().get_shutdown()?;
+                                       let (shutdown_msg, failed_htlcs) = chan_entry.get_mut().get_shutdown()?;
+                                       channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
+                                               node_id: chan_entry.get().get_their_node_id(),
+                                               msg: shutdown_msg
+                                       });
                                        if chan_entry.get().is_shutdown() {
                                                if let Some(short_id) = chan_entry.get().get_short_channel_id() {
                                                        channel_state.short_to_id.remove(&short_id);
                                                }
-                                               (res, chan_entry.get().get_their_node_id(), Some(chan_entry.remove_entry().1))
-                                       } else { (res, chan_entry.get().get_their_node_id(), None) }
+                                               (failed_htlcs, Some(chan_entry.remove_entry().1))
+                                       } else { (failed_htlcs, None) }
                                },
                                hash_map::Entry::Vacant(_) => return Err(APIError::ChannelUnavailable{err: "No such channel"})
                        }
                };
-               for htlc_source in res.1.drain(..) {
+               for htlc_source in failed_htlcs.drain(..) {
                        // unknown_next_peer...I dunno who that is anymore....
                        self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 10, data: Vec::new() });
                }
@@ -531,16 +524,12 @@ impl ChannelManager {
                        } else { None }
                } else { None };
 
-               let mut events = self.pending_events.lock().unwrap();
                if let Some(update) = chan_update {
-                       events.push(events::Event::BroadcastChannelUpdate {
+                       let mut channel_state = self.channel_state.lock().unwrap();
+                       channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                msg: update
                        });
                }
-               events.push(events::Event::SendShutdown {
-                       node_id,
-                       msg: res.0
-               });
 
                Ok(())
        }
@@ -580,9 +569,9 @@ impl ChannelManager {
                        }
                };
                self.finish_force_close_channel(chan.force_shutdown());
-               let mut events = self.pending_events.lock().unwrap();
                if let Ok(update) = self.get_channel_update(&chan) {
-                       events.push(events::Event::BroadcastChannelUpdate {
+                       let mut channel_state = self.channel_state.lock().unwrap();
+                       channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                msg: update
                        });
                }
@@ -609,9 +598,9 @@ impl ChannelManager {
                                };
                                mem::drop(channel_state_lock);
                                self.finish_force_close_channel(chan.force_shutdown());
-                               let mut events = self.pending_events.lock().unwrap();
                                if let Ok(update) = self.get_channel_update(&chan) {
-                                       events.push(events::Event::BroadcastChannelUpdate {
+                                       let mut channel_state = self.channel_state.lock().unwrap();
+                                       channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                msg: update
                                        });
                                }
@@ -1111,7 +1100,7 @@ impl ChannelManager {
        /// payment_preimage tracking (which you should already be doing as they represent "proof of
        /// payment") and prevent double-sends yourself.
        ///
-       /// May generate a SendHTLCs event on success, which should be relayed.
+       /// May generate a SendHTLCs message event on success, which should be relayed.
        ///
        /// Raises APIError::RoutError when invalid route or forward parameter
        /// (cltv_delta, fee, node public key) is specified
@@ -1139,66 +1128,52 @@ impl ChannelManager {
                let (onion_payloads, htlc_msat, htlc_cltv) = ChannelManager::build_onion_payloads(&route, cur_height)?;
                let onion_packet = ChannelManager::construct_onion_packet(onion_payloads, onion_keys, &payment_hash);
 
-               let (first_hop_node_id, update_add, commitment_signed) = {
-                       let mut channel_state = self.channel_state.lock().unwrap();
-
-                       let id = match channel_state.short_to_id.get(&route.hops.first().unwrap().short_channel_id) {
-                               None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!"}),
-                               Some(id) => id.clone(),
-                       };
-
-                       let res = {
-                               let res = {
-                                       let chan = channel_state.by_id.get_mut(&id).unwrap();
-                                       if chan.get_their_node_id() != route.hops.first().unwrap().pubkey {
-                                               return Err(APIError::RouteError{err: "Node ID mismatch on first hop!"});
-                                       }
-                                       if chan.is_awaiting_monitor_update() {
-                                               return Err(APIError::MonitorUpdateFailed);
-                                       }
-                                       if !chan.is_live() {
-                                               return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected!"});
-                                       }
-                                       chan.send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute {
-                                               route: route.clone(),
-                                               session_priv: session_priv.clone(),
-                                               first_hop_htlc_msat: htlc_msat,
-                                       }, onion_packet).map_err(|he| APIError::ChannelUnavailable{err: he.err})?
-                               };
-                               match res {
-                                       Some((update_add, commitment_signed, chan_monitor)) => {
-                                               if let Err(e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
-                                                       self.handle_monitor_update_fail(channel_state, &id, e, RAACommitmentOrder::CommitmentFirst);
-                                                       return Err(APIError::MonitorUpdateFailed);
-                                               }
-                                               Some((update_add, commitment_signed))
-                                       },
-                                       None => None,
-                               }
-                       };
+               let mut channel_state = self.channel_state.lock().unwrap();
 
-                       let first_hop_node_id = route.hops.first().unwrap().pubkey;
+               let id = match channel_state.short_to_id.get(&route.hops.first().unwrap().short_channel_id) {
+                       None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!"}),
+                       Some(id) => id.clone(),
+               };
 
-                       match res {
-                               Some((update_add, commitment_signed)) => {
-                                       (first_hop_node_id, update_add, commitment_signed)
-                               },
-                               None => return Ok(()),
+               let res = {
+                       let chan = channel_state.by_id.get_mut(&id).unwrap();
+                       if chan.get_their_node_id() != route.hops.first().unwrap().pubkey {
+                               return Err(APIError::RouteError{err: "Node ID mismatch on first hop!"});
+                       }
+                       if chan.is_awaiting_monitor_update() {
+                               return Err(APIError::MonitorUpdateFailed);
                        }
+                       if !chan.is_live() {
+                               return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected!"});
+                       }
+                       chan.send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute {
+                               route: route.clone(),
+                               session_priv: session_priv.clone(),
+                               first_hop_htlc_msat: htlc_msat,
+                       }, onion_packet).map_err(|he| APIError::ChannelUnavailable{err: he.err})?
                };
+               match res {
+                       Some((update_add, commitment_signed, chan_monitor)) => {
+                               if let Err(e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+                                       self.handle_monitor_update_fail(channel_state, &id, e, RAACommitmentOrder::CommitmentFirst);
+                                       return Err(APIError::MonitorUpdateFailed);
+                               }
 
-               let mut events = self.pending_events.lock().unwrap();
-               events.push(events::Event::UpdateHTLCs {
-                       node_id: first_hop_node_id,
-                       updates: msgs::CommitmentUpdate {
-                               update_add_htlcs: vec![update_add],
-                               update_fulfill_htlcs: Vec::new(),
-                               update_fail_htlcs: Vec::new(),
-                               update_fail_malformed_htlcs: Vec::new(),
-                               update_fee: None,
-                               commitment_signed,
+                               channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+                                       node_id: route.hops.first().unwrap().pubkey,
+                                       updates: msgs::CommitmentUpdate {
+                                               update_add_htlcs: vec![update_add],
+                                               update_fulfill_htlcs: Vec::new(),
+                                               update_fail_htlcs: Vec::new(),
+                                               update_fail_malformed_htlcs: Vec::new(),
+                                               update_fee: None,
+                                               commitment_signed,
+                                       },
+                               });
                        },
-               });
+                       None => {},
+               }
+
                Ok(())
        }
 
@@ -1209,15 +1184,6 @@ impl ChannelManager {
        /// May panic if the funding_txo is duplicative with some other channel (note that this should
        /// be trivially prevented by using unique funding transaction keys per-channel).
        pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_txo: OutPoint) {
-               macro_rules! add_pending_event {
-                       ($event: expr) => {
-                               {
-                                       let mut pending_events = self.pending_events.lock().unwrap();
-                                       pending_events.push($event);
-                               }
-                       }
-               }
-
                let (chan, msg, chan_monitor) = {
                        let mut channel_state = self.channel_state.lock().unwrap();
                        match channel_state.by_id.remove(temporary_channel_id) {
@@ -1228,8 +1194,7 @@ impl ChannelManager {
                                                },
                                                Err(e) => {
                                                        log_error!(self, "Got bad signatures: {}!", e.err);
-                                                       mem::drop(channel_state);
-                                                       add_pending_event!(events::Event::HandleError {
+                                                       channel_state.pending_msg_events.push(events::MessageSendEvent::HandleError {
                                                                node_id: chan.get_their_node_id(),
                                                                action: e.action,
                                                        });
@@ -1245,12 +1210,12 @@ impl ChannelManager {
                if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
                        unimplemented!();
                }
-               add_pending_event!(events::Event::SendFundingCreated {
+
+               let mut channel_state = self.channel_state.lock().unwrap();
+               channel_state.pending_msg_events.push(events::MessageSendEvent::SendFundingCreated {
                        node_id: chan.get_their_node_id(),
                        msg: msg,
                });
-
-               let mut channel_state = self.channel_state.lock().unwrap();
                match channel_state.by_id.entry(chan.channel_id()) {
                        hash_map::Entry::Occupied(_) => {
                                panic!("Generated duplicate funding txid?");
@@ -1359,7 +1324,7 @@ impl ChannelManager {
                                                if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) {
                                                        unimplemented!();// but def dont push the event...
                                                }
-                                               new_events.push(events::Event::UpdateHTLCs {
+                                               channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
                                                        node_id: forward_chan.get_their_node_id(),
                                                        updates: msgs::CommitmentUpdate {
                                                                update_add_htlcs: add_htlc_msgs,
@@ -1422,19 +1387,20 @@ impl ChannelManager {
        /// to fail and take the channel_state lock for each iteration (as we take ownership and may
        /// drop it). In other words, no assumptions are made that entries in claimable_htlcs point to
        /// still-available channels.
-       fn fail_htlc_backwards_internal(&self, mut channel_state: MutexGuard<ChannelHolder>, source: HTLCSource, payment_hash: &[u8; 32], onion_error: HTLCFailReason) {
+       fn fail_htlc_backwards_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder>, source: HTLCSource, payment_hash: &[u8; 32], onion_error: HTLCFailReason) {
                match source {
                        HTLCSource::OutboundRoute { .. } => {
-                               mem::drop(channel_state);
+                               mem::drop(channel_state_lock);
                                if let &HTLCFailReason::ErrorPacket { ref err } = &onion_error {
                                        let (channel_update, payment_retryable) = self.process_onion_failure(&source, err.data.clone());
-                                       let mut pending_events = self.pending_events.lock().unwrap();
-                                       if let Some(channel_update) = channel_update {
-                                               pending_events.push(events::Event::PaymentFailureNetworkUpdate {
-                                                       update: channel_update,
-                                               });
+                                       if let Some(update) = channel_update {
+                                               self.channel_state.lock().unwrap().pending_msg_events.push(
+                                                       events::MessageSendEvent::PaymentFailureNetworkUpdate {
+                                                               update,
+                                                       }
+                                               );
                                        }
-                                       pending_events.push(events::Event::PaymentFailed {
+                                       self.pending_events.lock().unwrap().push(events::Event::PaymentFailed {
                                                payment_hash: payment_hash.clone(),
                                                rejected_by_dest: !payment_retryable,
                                        });
@@ -1453,35 +1419,21 @@ impl ChannelManager {
                                        }
                                };
 
-                               let (node_id, fail_msgs) = {
-                                       let chan_id = match channel_state.short_to_id.get(&short_channel_id) {
-                                               Some(chan_id) => chan_id.clone(),
-                                               None => return
-                                       };
+                               let channel_state = channel_state_lock.borrow_parts();
 
-                                       let chan = channel_state.by_id.get_mut(&chan_id).unwrap();
-                                       match chan.get_update_fail_htlc_and_commit(htlc_id, err_packet) {
-                                               Ok(Some((msg, commitment_msg, chan_monitor))) => {
-                                                       if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
-                                                               unimplemented!();
-                                                       }
-                                                       (chan.get_their_node_id(), Some((msg, commitment_msg)))
-                                               },
-                                               Ok(None) => (chan.get_their_node_id(), None),
-                                               Err(_e) => {
-                                                       //TODO: Do something with e?
-                                                       return;
-                                               },
-                                       }
+                               let chan_id = match channel_state.short_to_id.get(&short_channel_id) {
+                                       Some(chan_id) => chan_id.clone(),
+                                       None => return
                                };
 
-                               match fail_msgs {
-                                       Some((msg, commitment_msg)) => {
-                                               mem::drop(channel_state);
-
-                                               let mut pending_events = self.pending_events.lock().unwrap();
-                                               pending_events.push(events::Event::UpdateHTLCs {
-                                                       node_id,
+                               let chan = channel_state.by_id.get_mut(&chan_id).unwrap();
+                               match chan.get_update_fail_htlc_and_commit(htlc_id, err_packet) {
+                                       Ok(Some((msg, commitment_msg, chan_monitor))) => {
+                                               if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+                                                       unimplemented!();
+                                               }
+                                               channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+                                                       node_id: chan.get_their_node_id(),
                                                        updates: msgs::CommitmentUpdate {
                                                                update_add_htlcs: Vec::new(),
                                                                update_fulfill_htlcs: Vec::new(),
@@ -1492,7 +1444,11 @@ impl ChannelManager {
                                                        },
                                                });
                                        },
-                                       None => {},
+                                       Ok(None) => {},
+                                       Err(_e) => {
+                                               //TODO: Do something with e?
+                                               return;
+                                       },
                                }
                        },
                }
@@ -1519,10 +1475,10 @@ impl ChannelManager {
                        true
                } else { false }
        }
-       fn claim_funds_internal(&self, mut channel_state: MutexGuard<ChannelHolder>, source: HTLCSource, payment_preimage: [u8; 32]) {
+       fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder>, source: HTLCSource, payment_preimage: [u8; 32]) {
                match source {
                        HTLCSource::OutboundRoute { .. } => {
-                               mem::drop(channel_state);
+                               mem::drop(channel_state_lock);
                                let mut pending_events = self.pending_events.lock().unwrap();
                                pending_events.push(events::Event::PaymentSent {
                                        payment_preimage
@@ -1530,49 +1486,46 @@ impl ChannelManager {
                        },
                        HTLCSource::PreviousHopData(HTLCPreviousHopData { short_channel_id, htlc_id, .. }) => {
                                //TODO: Delay the claimed_funds relaying just like we do outbound relay!
-                               let (node_id, fulfill_msgs) = {
-                                       let chan_id = match channel_state.short_to_id.get(&short_channel_id) {
-                                               Some(chan_id) => chan_id.clone(),
-                                               None => {
-                                                       // TODO: There is probably a channel manager somewhere that needs to
-                                                       // learn the preimage as the channel already hit the chain and that's
-                                                       // why its missing.
-                                                       return
-                                               }
-                                       };
+                               let channel_state = channel_state_lock.borrow_parts();
+
+                               let chan_id = match channel_state.short_to_id.get(&short_channel_id) {
+                                       Some(chan_id) => chan_id.clone(),
+                                       None => {
+                                               // TODO: There is probably a channel manager somewhere that needs to
+                                               // learn the preimage as the channel already hit the chain and that's
+                                               // why its missing.
+                                               return
+                                       }
+                               };
 
-                                       let chan = channel_state.by_id.get_mut(&chan_id).unwrap();
-                                       match chan.get_update_fulfill_htlc_and_commit(htlc_id, payment_preimage) {
-                                               Ok((msgs, Some(chan_monitor))) => {
+                               let chan = channel_state.by_id.get_mut(&chan_id).unwrap();
+                               match chan.get_update_fulfill_htlc_and_commit(htlc_id, payment_preimage) {
+                                       Ok((msgs, monitor_option)) => {
+                                               if let Some(chan_monitor) = monitor_option {
                                                        if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
                                                                unimplemented!();// but def dont push the event...
                                                        }
-                                                       (chan.get_their_node_id(), msgs)
-                                               },
-                                               Ok((msgs, None)) => (chan.get_their_node_id(), msgs),
-                                               Err(_e) => {
-                                                       // TODO: There is probably a channel manager somewhere that needs to
-                                                       // learn the preimage as the channel may be about to hit the chain.
-                                                       //TODO: Do something with e?
-                                                       return
-                                               },
-                                       }
-                               };
-
-                               mem::drop(channel_state);
-                               if let Some((msg, commitment_msg)) = fulfill_msgs {
-                                       let mut pending_events = self.pending_events.lock().unwrap();
-                                       pending_events.push(events::Event::UpdateHTLCs {
-                                               node_id: node_id,
-                                               updates: msgs::CommitmentUpdate {
-                                                       update_add_htlcs: Vec::new(),
-                                                       update_fulfill_htlcs: vec![msg],
-                                                       update_fail_htlcs: Vec::new(),
-                                                       update_fail_malformed_htlcs: Vec::new(),
-                                                       update_fee: None,
-                                                       commitment_signed: commitment_msg,
                                                }
-                                       });
+                                               if let Some((msg, commitment_signed)) = msgs {
+                                                       channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+                                                               node_id: chan.get_their_node_id(),
+                                                               updates: msgs::CommitmentUpdate {
+                                                                       update_add_htlcs: Vec::new(),
+                                                                       update_fulfill_htlcs: vec![msg],
+                                                                       update_fail_htlcs: Vec::new(),
+                                                                       update_fail_malformed_htlcs: Vec::new(),
+                                                                       update_fee: None,
+                                                                       commitment_signed,
+                                                               }
+                                                       });
+                                               }
+                                       },
+                                       Err(_e) => {
+                                               // TODO: There is probably a channel manager somewhere that needs to
+                                               // learn the preimage as the channel may be about to hit the chain.
+                                               //TODO: Do something with e?
+                                               return
+                                       },
                                }
                        },
                }
@@ -1587,7 +1540,6 @@ impl ChannelManager {
        /// ChannelMonitorUpdateErr::TemporaryFailure was returned from a channel monitor update
        /// operation.
        pub fn test_restore_channel_monitor(&self) {
-               let mut new_events = Vec::new();
                let mut close_results = Vec::new();
                let mut htlc_forwards = Vec::new();
                let mut htlc_failures = Vec::new();
@@ -1596,6 +1548,7 @@ impl ChannelManager {
                        let mut channel_lock = self.channel_state.lock().unwrap();
                        let channel_state = channel_lock.borrow_parts();
                        let short_to_id = channel_state.short_to_id;
+                       let pending_msg_events = channel_state.pending_msg_events;
                        channel_state.by_id.retain(|_, channel| {
                                if channel.is_awaiting_monitor_update() {
                                        let chan_monitor = channel.channel_monitor();
@@ -1607,7 +1560,7 @@ impl ChannelManager {
                                                                }
                                                                close_results.push(channel.force_shutdown());
                                                                if let Ok(update) = self.get_channel_update(&channel) {
-                                                                       new_events.push(events::Event::BroadcastChannelUpdate {
+                                                                       pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                                msg: update
                                                                        });
                                                                }
@@ -1624,7 +1577,7 @@ impl ChannelManager {
 
                                                macro_rules! handle_cs { () => {
                                                        if let Some(update) = commitment_update {
-                                                               new_events.push(events::Event::UpdateHTLCs {
+                                                               pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
                                                                        node_id: channel.get_their_node_id(),
                                                                        updates: update,
                                                                });
@@ -1632,7 +1585,7 @@ impl ChannelManager {
                                                } }
                                                macro_rules! handle_raa { () => {
                                                        if let Some(revoke_and_ack) = raa {
-                                                               new_events.push(events::Event::SendRevokeAndACK {
+                                                               pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
                                                                        node_id: channel.get_their_node_id(),
                                                                        msg: revoke_and_ack,
                                                                });
@@ -1662,44 +1615,28 @@ impl ChannelManager {
                for res in close_results.drain(..) {
                        self.finish_force_close_channel(res);
                }
-
-               self.pending_events.lock().unwrap().append(&mut new_events);
        }
 
-       fn internal_open_channel(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<msgs::AcceptChannel, MsgHandleErrInternal> {
+       fn internal_open_channel(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> {
                if msg.chain_hash != self.genesis_hash {
                        return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash", msg.temporary_channel_id.clone()));
                }
-               let mut channel_state = self.channel_state.lock().unwrap();
-               if channel_state.by_id.contains_key(&msg.temporary_channel_id) {
-                       return Err(MsgHandleErrInternal::send_err_msg_no_close("temporary_channel_id collision!", msg.temporary_channel_id.clone()));
-               }
-
-               let chan_keys = if cfg!(feature = "fuzztarget") {
-                       ChannelKeys {
-                               funding_key:               SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0]).unwrap(),
-                               revocation_base_key:       SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0]).unwrap(),
-                               payment_base_key:          SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0]).unwrap(),
-                               delayed_payment_base_key:  SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 0]).unwrap(),
-                               htlc_base_key:             SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, 0]).unwrap(),
-                               channel_close_key:         SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6, 0]).unwrap(),
-                               channel_monitor_claim_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0]).unwrap(),
-                               commitment_seed: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
-                       }
-               } else {
-                       let mut key_seed = [0u8; 32];
-                       rng::fill_bytes(&mut key_seed);
-                       match ChannelKeys::new_from_seed(&key_seed) {
-                               Ok(key) => key,
-                               Err(_) => panic!("RNG is busted!")
-                       }
-               };
 
-               let channel = Channel::new_from_req(&*self.fee_estimator, chan_keys, their_node_id.clone(), msg, 0, false, self.announce_channels_publicly, Arc::clone(&self.logger))
+               let channel = Channel::new_from_req(&*self.fee_estimator, &self.keys_manager, their_node_id.clone(), msg, 0, false, self.announce_channels_publicly, Arc::clone(&self.logger))
                        .map_err(|e| MsgHandleErrInternal::from_chan_no_close(e, msg.temporary_channel_id))?;
-               let accept_msg = channel.get_accept_channel();
-               channel_state.by_id.insert(channel.channel_id(), channel);
-               Ok(accept_msg)
+               let mut channel_state_lock = self.channel_state.lock().unwrap();
+               let channel_state = channel_state_lock.borrow_parts();
+               match channel_state.by_id.entry(channel.channel_id()) {
+                       hash_map::Entry::Occupied(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("temporary_channel_id collision!", msg.temporary_channel_id.clone())),
+                       hash_map::Entry::Vacant(entry) => {
+                               channel_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel {
+                                       node_id: their_node_id.clone(),
+                                       msg: channel.get_accept_channel(),
+                               });
+                               entry.insert(channel);
+                       }
+               }
+               Ok(())
        }
 
        fn internal_accept_channel(&self, their_node_id: &PublicKey, msg: &msgs::AcceptChannel) -> Result<(), MsgHandleErrInternal> {
@@ -1729,7 +1666,7 @@ impl ChannelManager {
                Ok(())
        }
 
-       fn internal_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<msgs::FundingSigned, MsgHandleErrInternal> {
+       fn internal_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), MsgHandleErrInternal> {
                let (chan, funding_msg, monitor_update) = {
                        let mut channel_state = self.channel_state.lock().unwrap();
                        match channel_state.by_id.entry(msg.temporary_channel_id.clone()) {
@@ -1755,16 +1692,21 @@ impl ChannelManager {
                if let Err(_e) = self.monitor.add_update_monitor(monitor_update.get_funding_txo().unwrap(), monitor_update) {
                        unimplemented!();
                }
-               let mut channel_state = self.channel_state.lock().unwrap();
+               let mut channel_state_lock = self.channel_state.lock().unwrap();
+               let channel_state = channel_state_lock.borrow_parts();
                match channel_state.by_id.entry(funding_msg.channel_id) {
                        hash_map::Entry::Occupied(_) => {
                                return Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id", funding_msg.channel_id))
                        },
                        hash_map::Entry::Vacant(e) => {
+                               channel_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
+                                       node_id: their_node_id.clone(),
+                                       msg: funding_msg,
+                               });
                                e.insert(chan);
                        }
                }
-               Ok(funding_msg)
+               Ok(())
        }
 
        fn internal_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
@@ -1793,8 +1735,9 @@ impl ChannelManager {
                Ok(())
        }
 
-       fn internal_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) -> Result<Option<msgs::AnnouncementSignatures>, MsgHandleErrInternal> {
-               let mut channel_state = self.channel_state.lock().unwrap();
+       fn internal_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) -> Result<(), MsgHandleErrInternal> {
+               let mut channel_state_lock = self.channel_state.lock().unwrap();
+               let channel_state = channel_state_lock.borrow_parts();
                match channel_state.by_id.get_mut(&msg.channel_id) {
                        Some(chan) => {
                                if chan.get_their_node_id() != *their_node_id {
@@ -1803,10 +1746,16 @@ impl ChannelManager {
                                }
                                chan.funding_locked(&msg)
                                        .map_err(|e| MsgHandleErrInternal::from_chan_maybe_close(e, msg.channel_id))?;
-                               return Ok(self.get_announcement_sigs(chan));
+                               if let Some(announcement_sigs) = self.get_announcement_sigs(chan) {
+                                       channel_state.pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures {
+                                               node_id: their_node_id.clone(),
+                                               msg: announcement_sigs,
+                                       });
+                               }
+                               Ok(())
                        },
-                       None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
-               };
+                       None => Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
+               }
        }
 
        fn internal_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(Option<msgs::Shutdown>, Option<msgs::ClosingSigned>), MsgHandleErrInternal> {
@@ -1837,8 +1786,8 @@ impl ChannelManager {
                }
                if let Some(chan) = chan_option {
                        if let Ok(update) = self.get_channel_update(&chan) {
-                               let mut events = self.pending_events.lock().unwrap();
-                               events.push(events::Event::BroadcastChannelUpdate {
+                               let mut channel_state = self.channel_state.lock().unwrap();
+                               channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                        msg: update
                                });
                        }
@@ -1877,8 +1826,8 @@ impl ChannelManager {
                }
                if let Some(chan) = chan_option {
                        if let Ok(update) = self.get_channel_update(&chan) {
-                               let mut events = self.pending_events.lock().unwrap();
-                               events.push(events::Event::BroadcastChannelUpdate {
+                               let mut channel_state = self.channel_state.lock().unwrap();
+                               channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                        msg: update
                                });
                        }
@@ -2258,42 +2207,43 @@ impl ChannelManager {
        }
 
        fn internal_announcement_signatures(&self, their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) -> Result<(), MsgHandleErrInternal> {
-               let (chan_announcement, chan_update) = {
-                       let mut channel_state = self.channel_state.lock().unwrap();
-                       match channel_state.by_id.get_mut(&msg.channel_id) {
-                               Some(chan) => {
-                                       if chan.get_their_node_id() != *their_node_id {
-                                               return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
-                                       }
-                                       if !chan.is_usable() {
-                                               return Err(MsgHandleErrInternal::from_no_close(HandleError{err: "Got an announcement_signatures before we were ready for it", action: Some(msgs::ErrorAction::IgnoreError)}));
-                                       }
+               let mut channel_state_lock = self.channel_state.lock().unwrap();
+               let channel_state = channel_state_lock.borrow_parts();
 
-                                       let our_node_id = self.get_our_node_id();
-                                       let (announcement, our_bitcoin_sig) = chan.get_channel_announcement(our_node_id.clone(), self.genesis_hash.clone())
-                                               .map_err(|e| MsgHandleErrInternal::from_chan_maybe_close(e, msg.channel_id))?;
+               match channel_state.by_id.get_mut(&msg.channel_id) {
+                       Some(chan) => {
+                               if chan.get_their_node_id() != *their_node_id {
+                                       return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
+                               }
+                               if !chan.is_usable() {
+                                       return Err(MsgHandleErrInternal::from_no_close(HandleError{err: "Got an announcement_signatures before we were ready for it", action: Some(msgs::ErrorAction::IgnoreError)}));
+                               }
 
-                                       let were_node_one = announcement.node_id_1 == our_node_id;
-                                       let msghash = Message::from_slice(&Sha256dHash::from_data(&announcement.encode()[..])[..]).unwrap();
-                                       let bad_sig_action = MsgHandleErrInternal::send_err_msg_close_chan("Bad announcement_signatures node_signature", msg.channel_id);
-                                       secp_call!(self.secp_ctx.verify(&msghash, &msg.node_signature, if were_node_one { &announcement.node_id_2 } else { &announcement.node_id_1 }), bad_sig_action);
-                                       secp_call!(self.secp_ctx.verify(&msghash, &msg.bitcoin_signature, if were_node_one { &announcement.bitcoin_key_2 } else { &announcement.bitcoin_key_1 }), bad_sig_action);
+                               let our_node_id = self.get_our_node_id();
+                               let (announcement, our_bitcoin_sig) = chan.get_channel_announcement(our_node_id.clone(), self.genesis_hash.clone())
+                                       .map_err(|e| MsgHandleErrInternal::from_chan_maybe_close(e, msg.channel_id))?;
+
+                               let were_node_one = announcement.node_id_1 == our_node_id;
+                               let msghash = Message::from_slice(&Sha256dHash::from_data(&announcement.encode()[..])[..]).unwrap();
+                               let bad_sig_action = MsgHandleErrInternal::send_err_msg_close_chan("Bad announcement_signatures node_signature", msg.channel_id);
+                               secp_call!(self.secp_ctx.verify(&msghash, &msg.node_signature, if were_node_one { &announcement.node_id_2 } else { &announcement.node_id_1 }), bad_sig_action);
+                               secp_call!(self.secp_ctx.verify(&msghash, &msg.bitcoin_signature, if were_node_one { &announcement.bitcoin_key_2 } else { &announcement.bitcoin_key_1 }), bad_sig_action);
 
-                                       let our_node_sig = self.secp_ctx.sign(&msghash, &self.our_network_key);
+                               let our_node_sig = self.secp_ctx.sign(&msghash, &self.our_network_key);
 
-                                       (msgs::ChannelAnnouncement {
+                               channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement {
+                                       msg: msgs::ChannelAnnouncement {
                                                node_signature_1: if were_node_one { our_node_sig } else { msg.node_signature },
                                                node_signature_2: if were_node_one { msg.node_signature } else { our_node_sig },
                                                bitcoin_signature_1: if were_node_one { our_bitcoin_sig } else { msg.bitcoin_signature },
                                                bitcoin_signature_2: if were_node_one { msg.bitcoin_signature } else { our_bitcoin_sig },
                                                contents: announcement,
-                                       }, self.get_channel_update(chan).unwrap()) // can only fail if we're not in a ready state
-                               },
-                               None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
-                       }
-               };
-               let mut pending_events = self.pending_events.lock().unwrap();
-               pending_events.push(events::Event::BroadcastChannelAnnouncement { msg: chan_announcement, update_msg: chan_update });
+                                       },
+                                       update_msg: self.get_channel_update(chan).unwrap(), // can only fail if we're not in a ready state
+                               });
+                       },
+                       None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
+               }
                Ok(())
        }
 
@@ -2327,7 +2277,9 @@ impl ChannelManager {
        /// Note: This API is likely to change!
        #[doc(hidden)]
        pub fn update_fee(&self, channel_id: [u8;32], feerate_per_kw: u64) -> Result<(), APIError> {
-               let mut channel_state = self.channel_state.lock().unwrap();
+               let mut channel_state_lock = self.channel_state.lock().unwrap();
+               let channel_state = channel_state_lock.borrow_parts();
+
                match channel_state.by_id.get_mut(&channel_id) {
                        None => return Err(APIError::APIMisuseError{err: "Failed to find corresponding channel"}),
                        Some(chan) => {
@@ -2344,8 +2296,7 @@ impl ChannelManager {
                                        if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
                                                unimplemented!();
                                        }
-                                       let mut pending_events = self.pending_events.lock().unwrap();
-                                       pending_events.push(events::Event::UpdateHTLCs {
+                                       channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
                                                node_id: chan.get_their_node_id(),
                                                updates: msgs::CommitmentUpdate {
                                                        update_add_htlcs: Vec::new(),
@@ -2363,10 +2314,19 @@ impl ChannelManager {
        }
 }
 
+impl events::MessageSendEventsProvider for ChannelManager {
+       fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
+               let mut ret = Vec::new();
+               let mut channel_state = self.channel_state.lock().unwrap();
+               mem::swap(&mut ret, &mut channel_state.pending_msg_events);
+               ret
+       }
+}
+
 impl events::EventsProvider for ChannelManager {
        fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
-               let mut pending_events = self.pending_events.lock().unwrap();
                let mut ret = Vec::new();
+               let mut pending_events = self.pending_events.lock().unwrap();
                mem::swap(&mut ret, &mut *pending_events);
                ret
        }
@@ -2374,24 +2334,28 @@ impl events::EventsProvider for ChannelManager {
 
 impl ChainListener for ChannelManager {
        fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) {
-               let mut new_events = Vec::new();
                let mut failed_channels = Vec::new();
                {
                        let mut channel_lock = self.channel_state.lock().unwrap();
                        let channel_state = channel_lock.borrow_parts();
                        let short_to_id = channel_state.short_to_id;
+                       let pending_msg_events = channel_state.pending_msg_events;
                        channel_state.by_id.retain(|_, channel| {
                                let chan_res = channel.block_connected(header, height, txn_matched, indexes_of_txn_matched);
                                if let Ok(Some(funding_locked)) = chan_res {
-                                       let announcement_sigs = self.get_announcement_sigs(channel);
-                                       new_events.push(events::Event::SendFundingLocked {
+                                       pending_msg_events.push(events::MessageSendEvent::SendFundingLocked {
                                                node_id: channel.get_their_node_id(),
                                                msg: funding_locked,
-                                               announcement_sigs: announcement_sigs
                                        });
+                                       if let Some(announcement_sigs) = self.get_announcement_sigs(channel) {
+                                               pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures {
+                                                       node_id: channel.get_their_node_id(),
+                                                       msg: announcement_sigs,
+                                               });
+                                       }
                                        short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id());
                                } else if let Err(e) = chan_res {
-                                       new_events.push(events::Event::HandleError {
+                                       pending_msg_events.push(events::MessageSendEvent::HandleError {
                                                node_id: channel.get_their_node_id(),
                                                action: e.action,
                                        });
@@ -2411,7 +2375,7 @@ impl ChainListener for ChannelManager {
                                                                // some kind of SPV attack, though we expect these to be dropped.
                                                                failed_channels.push(channel.force_shutdown());
                                                                if let Ok(update) = self.get_channel_update(&channel) {
-                                                                       new_events.push(events::Event::BroadcastChannelUpdate {
+                                                                       pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                                msg: update
                                                                        });
                                                                }
@@ -2430,7 +2394,7 @@ impl ChainListener for ChannelManager {
                                        // hurt anything, but does make tests a bit simpler).
                                        failed_channels.last_mut().unwrap().0 = Vec::new();
                                        if let Ok(update) = self.get_channel_update(&channel) {
-                                               new_events.push(events::Event::BroadcastChannelUpdate {
+                                               pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                        msg: update
                                                });
                                        }
@@ -2442,21 +2406,17 @@ impl ChainListener for ChannelManager {
                for failure in failed_channels.drain(..) {
                        self.finish_force_close_channel(failure);
                }
-               let mut pending_events = self.pending_events.lock().unwrap();
-               for funding_locked in new_events.drain(..) {
-                       pending_events.push(funding_locked);
-               }
                self.latest_block_height.store(height as usize, Ordering::Release);
        }
 
        /// We force-close the channel without letting our counterparty participate in the shutdown
        fn block_disconnected(&self, header: &BlockHeader) {
-               let mut new_events = Vec::new();
                let mut failed_channels = Vec::new();
                {
                        let mut channel_lock = self.channel_state.lock().unwrap();
                        let channel_state = channel_lock.borrow_parts();
                        let short_to_id = channel_state.short_to_id;
+                       let pending_msg_events = channel_state.pending_msg_events;
                        channel_state.by_id.retain(|_,  v| {
                                if v.block_disconnected(header) {
                                        if let Some(short_id) = v.get_short_channel_id() {
@@ -2464,7 +2424,7 @@ impl ChainListener for ChannelManager {
                                        }
                                        failed_channels.push(v.force_shutdown());
                                        if let Ok(update) = self.get_channel_update(&v) {
-                                               new_events.push(events::Event::BroadcastChannelUpdate {
+                                               pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                        msg: update
                                                });
                                        }
@@ -2477,12 +2437,6 @@ impl ChainListener for ChannelManager {
                for failure in failed_channels.drain(..) {
                        self.finish_force_close_channel(failure);
                }
-               if !new_events.is_empty() {
-                       let mut pending_events = self.pending_events.lock().unwrap();
-                       for funding_locked in new_events.drain(..) {
-                               pending_events.push(funding_locked);
-                       }
-               }
                self.latest_block_height.fetch_sub(1, Ordering::AcqRel);
        }
 }
@@ -2521,7 +2475,7 @@ macro_rules! handle_error {
 
 impl ChannelMessageHandler for ChannelManager {
        //TODO: Handle errors and close channel (or so)
-       fn handle_open_channel(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<msgs::AcceptChannel, HandleError> {
+       fn handle_open_channel(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<(), HandleError> {
                handle_error!(self, self.internal_open_channel(their_node_id, msg), their_node_id)
        }
 
@@ -2529,7 +2483,7 @@ impl ChannelMessageHandler for ChannelManager {
                handle_error!(self, self.internal_accept_channel(their_node_id, msg), their_node_id)
        }
 
-       fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<msgs::FundingSigned, HandleError> {
+       fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), HandleError> {
                handle_error!(self, self.internal_funding_created(their_node_id, msg), their_node_id)
        }
 
@@ -2537,7 +2491,7 @@ impl ChannelMessageHandler for ChannelManager {
                handle_error!(self, self.internal_funding_signed(their_node_id, msg), their_node_id)
        }
 
-       fn handle_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) -> Result<Option<msgs::AnnouncementSignatures>, HandleError> {
+       fn handle_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) -> Result<(), HandleError> {
                handle_error!(self, self.internal_funding_locked(their_node_id, msg), their_node_id)
        }
 
@@ -2586,13 +2540,13 @@ impl ChannelMessageHandler for ChannelManager {
        }
 
        fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) {
-               let mut new_events = Vec::new();
                let mut failed_channels = Vec::new();
                let mut failed_payments = Vec::new();
                {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = channel_state_lock.borrow_parts();
                        let short_to_id = channel_state.short_to_id;
+                       let pending_msg_events = channel_state.pending_msg_events;
                        if no_connection_possible {
                                channel_state.by_id.retain(|_, chan| {
                                        if chan.get_their_node_id() == *their_node_id {
@@ -2601,7 +2555,7 @@ impl ChannelMessageHandler for ChannelManager {
                                                }
                                                failed_channels.push(chan.force_shutdown());
                                                if let Ok(update) = self.get_channel_update(&chan) {
-                                                       new_events.push(events::Event::BroadcastChannelUpdate {
+                                                       pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                msg: update
                                                        });
                                                }
@@ -2633,12 +2587,6 @@ impl ChannelMessageHandler for ChannelManager {
                for failure in failed_channels.drain(..) {
                        self.finish_force_close_channel(failure);
                }
-               if !new_events.is_empty() {
-                       let mut pending_events = self.pending_events.lock().unwrap();
-                       for event in new_events.drain(..) {
-                               pending_events.push(event);
-                       }
-               }
                for (chan_update, mut htlc_sources) in failed_payments {
                        for (htlc_source, payment_hash) in htlc_sources.drain(..) {
                                self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source, &payment_hash, HTLCFailReason::Reason { failure_code: 0x1000 | 7, data: chan_update.clone() });
@@ -2685,13 +2633,15 @@ mod tests {
        use chain::chaininterface;
        use chain::transaction::OutPoint;
        use chain::chaininterface::ChainListener;
+       use chain::keysinterface::KeysInterface;
+       use chain::keysinterface;
        use ln::channelmanager::{ChannelManager,OnionKeys,PaymentFailReason};
        use ln::channelmonitor::{ChannelMonitorUpdateErr, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS};
        use ln::router::{Route, RouteHop, Router};
        use ln::msgs;
        use ln::msgs::{ChannelMessageHandler,RoutingMessageHandler};
        use util::test_utils;
-       use util::events::{Event, EventsProvider};
+       use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
        use util::errors::APIError;
        use util::logger::Logger;
        use util::ser::Writeable;
@@ -2896,6 +2846,7 @@ mod tests {
                fn drop(&mut self) {
                        if !::std::thread::panicking() {
                                // Check that we processed all pending events
+                               assert_eq!(self.node.get_and_clear_pending_msg_events().len(), 0);
                                assert_eq!(self.node.get_and_clear_pending_events().len(), 0);
                                assert_eq!(self.chan_monitor.added_monitors.lock().unwrap().len(), 0);
                        }
@@ -2912,20 +2863,26 @@ mod tests {
                (announcement, as_update, bs_update, channel_id, tx)
        }
 
+       macro_rules! get_event_msg {
+               ($node: expr, $event_type: path, $node_id: expr) => {
+                       {
+                               let events = $node.node.get_and_clear_pending_msg_events();
+                               assert_eq!(events.len(), 1);
+                               match events[0] {
+                                       $event_type { ref node_id, ref msg } => {
+                                               assert_eq!(*node_id, $node_id);
+                                               (*msg).clone()
+                                       },
+                                       _ => panic!("Unexpected event"),
+                               }
+                       }
+               }
+       }
+
        fn create_chan_between_nodes_with_value_init(node_a: &Node, node_b: &Node, channel_value: u64, push_msat: u64) -> Transaction {
                node_a.node.create_channel(node_b.node.get_our_node_id(), channel_value, push_msat, 42).unwrap();
-
-               let events_1 = node_a.node.get_and_clear_pending_events();
-               assert_eq!(events_1.len(), 1);
-               let accept_chan = match events_1[0] {
-                       Event::SendOpenChannel { ref node_id, ref msg } => {
-                               assert_eq!(*node_id, node_b.node.get_our_node_id());
-                               node_b.node.handle_open_channel(&node_a.node.get_our_node_id(), msg).unwrap()
-                       },
-                       _ => panic!("Unexpected event"),
-               };
-
-               node_a.node.handle_accept_channel(&node_b.node.get_our_node_id(), &accept_chan).unwrap();
+               node_b.node.handle_open_channel(&node_a.node.get_our_node_id(), &get_event_msg!(node_a, MessageSendEvent::SendOpenChannel, node_b.node.get_our_node_id())).unwrap();
+               node_a.node.handle_accept_channel(&node_b.node.get_our_node_id(), &get_event_msg!(node_b, MessageSendEvent::SendAcceptChannel, node_a.node.get_our_node_id())).unwrap();
 
                let chan_id = *node_a.network_chan_count.borrow();
                let tx;
@@ -2952,22 +2909,15 @@ mod tests {
                        _ => panic!("Unexpected event"),
                }
 
-               let events_3 = node_a.node.get_and_clear_pending_events();
-               assert_eq!(events_3.len(), 1);
-               let funding_signed = match events_3[0] {
-                       Event::SendFundingCreated { ref node_id, ref msg } => {
-                               assert_eq!(*node_id, node_b.node.get_our_node_id());
-                               let res = node_b.node.handle_funding_created(&node_a.node.get_our_node_id(), msg).unwrap();
-                               let mut added_monitors = node_b.chan_monitor.added_monitors.lock().unwrap();
-                               assert_eq!(added_monitors.len(), 1);
-                               assert_eq!(added_monitors[0].0, funding_output);
-                               added_monitors.clear();
-                               res
-                       },
-                       _ => panic!("Unexpected event"),
-               };
+               node_b.node.handle_funding_created(&node_a.node.get_our_node_id(), &get_event_msg!(node_a, MessageSendEvent::SendFundingCreated, node_b.node.get_our_node_id())).unwrap();
+               {
+                       let mut added_monitors = node_b.chan_monitor.added_monitors.lock().unwrap();
+                       assert_eq!(added_monitors.len(), 1);
+                       assert_eq!(added_monitors[0].0, funding_output);
+                       added_monitors.clear();
+               }
 
-               node_a.node.handle_funding_signed(&node_b.node.get_our_node_id(), &funding_signed).unwrap();
+               node_a.node.handle_funding_signed(&node_b.node.get_our_node_id(), &get_event_msg!(node_b, MessageSendEvent::SendFundingSigned, node_a.node.get_our_node_id())).unwrap();
                {
                        let mut added_monitors = node_a.chan_monitor.added_monitors.lock().unwrap();
                        assert_eq!(added_monitors.len(), 1);
@@ -2990,30 +2940,27 @@ mod tests {
 
        fn create_chan_between_nodes_with_value_confirm(node_a: &Node, node_b: &Node, tx: &Transaction) -> ((msgs::FundingLocked, msgs::AnnouncementSignatures), [u8; 32]) {
                confirm_transaction(&node_b.chain_monitor, &tx, tx.version);
-               let events_5 = node_b.node.get_and_clear_pending_events();
-               assert_eq!(events_5.len(), 1);
-               match events_5[0] {
-                       Event::SendFundingLocked { ref node_id, ref msg, ref announcement_sigs } => {
-                               assert_eq!(*node_id, node_a.node.get_our_node_id());
-                               assert!(announcement_sigs.is_none());
-                               node_a.node.handle_funding_locked(&node_b.node.get_our_node_id(), msg).unwrap()
-                       },
-                       _ => panic!("Unexpected event"),
-               };
+               node_a.node.handle_funding_locked(&node_b.node.get_our_node_id(), &get_event_msg!(node_b, MessageSendEvent::SendFundingLocked, node_a.node.get_our_node_id())).unwrap();
 
                let channel_id;
 
                confirm_transaction(&node_a.chain_monitor, &tx, tx.version);
-               let events_6 = node_a.node.get_and_clear_pending_events();
-               assert_eq!(events_6.len(), 1);
-               (match events_6[0] {
-                       Event::SendFundingLocked { ref node_id, ref msg, ref announcement_sigs } => {
+               let events_6 = node_a.node.get_and_clear_pending_msg_events();
+               assert_eq!(events_6.len(), 2);
+               ((match events_6[0] {
+                       MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => {
                                channel_id = msg.channel_id.clone();
                                assert_eq!(*node_id, node_b.node.get_our_node_id());
-                               (msg.clone(), announcement_sigs.clone().unwrap())
+                               msg.clone()
+                       },
+                       _ => panic!("Unexpected event"),
+               }, match events_6[1] {
+                       MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
+                               assert_eq!(*node_id, node_b.node.get_our_node_id());
+                               msg.clone()
                        },
                        _ => panic!("Unexpected event"),
-               }, channel_id)
+               }), channel_id)
        }
 
        fn create_chan_between_nodes_with_value_a(node_a: &Node, node_b: &Node, channel_value: u64, push_msat: u64) -> ((msgs::FundingLocked, msgs::AnnouncementSignatures), [u8; 32], Transaction) {
@@ -3023,26 +2970,24 @@ mod tests {
        }
 
        fn create_chan_between_nodes_with_value_b(node_a: &Node, node_b: &Node, as_funding_msgs: &(msgs::FundingLocked, msgs::AnnouncementSignatures)) -> (msgs::ChannelAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate) {
-               let bs_announcement_sigs = {
-                       let bs_announcement_sigs = node_b.node.handle_funding_locked(&node_a.node.get_our_node_id(), &as_funding_msgs.0).unwrap().unwrap();
-                       node_b.node.handle_announcement_signatures(&node_a.node.get_our_node_id(), &as_funding_msgs.1).unwrap();
-                       bs_announcement_sigs
-               };
+               node_b.node.handle_funding_locked(&node_a.node.get_our_node_id(), &as_funding_msgs.0).unwrap();
+               let bs_announcement_sigs = get_event_msg!(node_b, MessageSendEvent::SendAnnouncementSignatures, node_a.node.get_our_node_id());
+               node_b.node.handle_announcement_signatures(&node_a.node.get_our_node_id(), &as_funding_msgs.1).unwrap();
 
-               let events_7 = node_b.node.get_and_clear_pending_events();
+               let events_7 = node_b.node.get_and_clear_pending_msg_events();
                assert_eq!(events_7.len(), 1);
                let (announcement, bs_update) = match events_7[0] {
-                       Event::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
+                       MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
                                (msg, update_msg)
                        },
                        _ => panic!("Unexpected event"),
                };
 
                node_a.node.handle_announcement_signatures(&node_b.node.get_our_node_id(), &bs_announcement_sigs).unwrap();
-               let events_8 = node_a.node.get_and_clear_pending_events();
+               let events_8 = node_a.node.get_and_clear_pending_msg_events();
                assert_eq!(events_8.len(), 1);
                let as_update = match events_8[0] {
-                       Event::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
+                       MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
                                assert!(*announcement == *msg);
                                update_msg
                        },
@@ -3085,10 +3030,10 @@ mod tests {
                let (tx_a, tx_b);
 
                node_a.close_channel(channel_id).unwrap();
-               let events_1 = node_a.get_and_clear_pending_events();
+               let events_1 = node_a.get_and_clear_pending_msg_events();
                assert_eq!(events_1.len(), 1);
                let shutdown_a = match events_1[0] {
-                       Event::SendShutdown { ref node_id, ref msg } => {
+                       MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
                                assert_eq!(node_id, &node_b.get_our_node_id());
                                msg.clone()
                        },
@@ -3124,19 +3069,19 @@ mod tests {
                assert_eq!(tx_a, tx_b);
                check_spends!(tx_a, funding_tx);
 
-               let events_2 = node_a.get_and_clear_pending_events();
+               let events_2 = node_a.get_and_clear_pending_msg_events();
                assert_eq!(events_2.len(), 1);
                let as_update = match events_2[0] {
-                       Event::BroadcastChannelUpdate { ref msg } => {
+                       MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
                                msg.clone()
                        },
                        _ => panic!("Unexpected event"),
                };
 
-               let events_3 = node_b.get_and_clear_pending_events();
+               let events_3 = node_b.get_and_clear_pending_msg_events();
                assert_eq!(events_3.len(), 1);
                let bs_update = match events_3[0] {
-                       Event::BroadcastChannelUpdate { ref msg } => {
+                       MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
                                msg.clone()
                        },
                        _ => panic!("Unexpected event"),
@@ -3159,9 +3104,9 @@ mod tests {
                        SendEvent { node_id: node_id, msgs: updates.update_add_htlcs, commitment_msg: updates.commitment_signed }
                }
 
-               fn from_event(event: Event) -> SendEvent {
+               fn from_event(event: MessageSendEvent) -> SendEvent {
                        match event {
-                               Event::UpdateHTLCs { node_id, updates } => SendEvent::from_commitment_update(node_id, updates),
+                               MessageSendEvent::UpdateHTLCs { node_id, updates } => SendEvent::from_commitment_update(node_id, updates),
                                _ => panic!("Unexpected event type!"),
                        }
                }
@@ -3191,6 +3136,7 @@ mod tests {
                                check_added_monitors!($node_b, 1);
                                if $fail_backwards {
                                        assert!($node_a.node.get_and_clear_pending_events().is_empty());
+                                       assert!($node_a.node.get_and_clear_pending_msg_events().is_empty());
                                }
                                assert!($node_a.node.handle_revoke_and_ack(&$node_b.node.get_our_node_id(), &bs_revoke_and_ack).unwrap().is_none());
                                {
@@ -3228,7 +3174,7 @@ mod tests {
                        origin_node.node.send_payment(route, our_payment_hash).unwrap();
                        check_added_monitors!(origin_node, 1);
 
-                       let mut events = origin_node.node.get_and_clear_pending_events();
+                       let mut events = origin_node.node.get_and_clear_pending_msg_events();
                        assert_eq!(events.len(), 1);
                        SendEvent::from_event(events.remove(0))
                };
@@ -3251,9 +3197,9 @@ mod tests {
                        node.node.channel_state.lock().unwrap().next_forward = Instant::now();
                        node.node.process_pending_htlc_forwards();
 
-                       let mut events_2 = node.node.get_and_clear_pending_events();
-                       assert_eq!(events_2.len(), 1);
                        if idx == expected_route.len() - 1 {
+                               let events_2 = node.node.get_and_clear_pending_events();
+                               assert_eq!(events_2.len(), 1);
                                match events_2[0] {
                                        Event::PaymentReceived { ref payment_hash, amt } => {
                                                assert_eq!(our_payment_hash, *payment_hash);
@@ -3262,6 +3208,8 @@ mod tests {
                                        _ => panic!("Unexpected event"),
                                }
                        } else {
+                               let mut events_2 = node.node.get_and_clear_pending_msg_events();
+                               assert_eq!(events_2.len(), 1);
                                check_added_monitors!(node, 1);
                                payment_event = SendEvent::from_event(events_2.remove(0));
                                assert_eq!(payment_event.msgs.len(), 1);
@@ -3300,11 +3248,11 @@ mod tests {
                                update_fulfill_dance!(node, prev_node, false);
                        }
 
-                       let events = node.node.get_and_clear_pending_events();
+                       let events = node.node.get_and_clear_pending_msg_events();
                        if !skip_last || idx != expected_route.len() - 1 {
                                assert_eq!(events.len(), 1);
                                match events[0] {
-                                       Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
+                                       MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
                                                assert!(update_add_htlcs.is_empty());
                                                assert_eq!(update_fulfill_htlcs.len(), 1);
                                                assert!(update_fail_htlcs.is_empty());
@@ -3400,11 +3348,11 @@ mod tests {
                                update_fail_dance!(node, prev_node, skip_last && idx == expected_route.len() - 1);
                        }
 
-                       let events = node.node.get_and_clear_pending_events();
+                       let events = node.node.get_and_clear_pending_msg_events();
                        if !skip_last || idx != expected_route.len() - 1 {
                                assert_eq!(events.len(), 1);
                                match events[0] {
-                                       Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
+                                       MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
                                                assert!(update_add_htlcs.is_empty());
                                                assert!(update_fulfill_htlcs.is_empty());
                                                assert_eq!(update_fail_htlcs.len(), 1);
@@ -3457,14 +3405,12 @@ mod tests {
                        let feeest = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 });
                        let chain_monitor = Arc::new(chaininterface::ChainWatchInterfaceUtil::new(Network::Testnet, Arc::clone(&logger)));
                        let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())});
+                       let mut seed = [0; 32];
+                       rng.fill_bytes(&mut seed);
+                       let keys_manager = Arc::new(keysinterface::KeysManager::new(&seed, Network::Testnet, Arc::clone(&logger)));
                        let chan_monitor = Arc::new(test_utils::TestChannelMonitor::new(chain_monitor.clone(), tx_broadcaster.clone()));
-                       let node_id = {
-                               let mut key_slice = [0; 32];
-                               rng.fill_bytes(&mut key_slice);
-                               SecretKey::from_slice(&secp_ctx, &key_slice).unwrap()
-                       };
-                       let node = ChannelManager::new(node_id.clone(), 0, true, Network::Testnet, feeest.clone(), chan_monitor.clone(), chain_monitor.clone(), tx_broadcaster.clone(), Arc::clone(&logger)).unwrap();
-                       let router = Router::new(PublicKey::from_secret_key(&secp_ctx, &node_id), chain_monitor.clone(), Arc::clone(&logger));
+                       let node = ChannelManager::new(0, true, Network::Testnet, feeest.clone(), chan_monitor.clone(), chain_monitor.clone(), tx_broadcaster.clone(), Arc::clone(&logger), keys_manager.clone()).unwrap();
+                       let router = Router::new(PublicKey::from_secret_key(&secp_ctx, &keys_manager.get_node_secret()), chain_monitor.clone(), Arc::clone(&logger));
                        nodes.push(Node { chain_monitor, tx_broadcaster, chan_monitor, node, router,
                                network_payment_count: payment_count.clone(),
                                network_chan_count: chan_count.clone(),
@@ -3512,10 +3458,10 @@ mod tests {
                nodes[0].node.update_fee(channel_id, get_feerate!(nodes[0]) + 20).unwrap();
                check_added_monitors!(nodes[0], 1);
 
-               let events_0 = nodes[0].node.get_and_clear_pending_events();
+               let events_0 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_0.len(), 1);
                let (update_msg, commitment_signed) = match events_0[0] { // (1)
-                       Event::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, ref commitment_signed, .. }, .. } => {
+                       MessageSendEvent::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, ref commitment_signed, .. }, .. } => {
                                (update_fee.as_ref(), commitment_signed)
                        },
                        _ => panic!("Unexpected event"),
@@ -3529,7 +3475,7 @@ mod tests {
                check_added_monitors!(nodes[1], 1);
 
                let payment_event = {
-                       let mut events_1 = nodes[1].node.get_and_clear_pending_events();
+                       let mut events_1 = nodes[1].node.get_and_clear_pending_msg_events();
                        assert_eq!(events_1.len(), 1);
                        SendEvent::from_event(events_1.remove(0))
                };
@@ -3608,10 +3554,10 @@ mod tests {
                nodes[0].node.update_fee(channel_id, get_feerate!(nodes[0]) + 20).unwrap();
                check_added_monitors!(nodes[0], 1);
 
-               let events_0 = nodes[0].node.get_and_clear_pending_events();
+               let events_0 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_0.len(), 1);
                let update_msg = match events_0[0] { // (1)
-                       Event::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, .. }, .. } => {
+                       MessageSendEvent::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, .. }, .. } => {
                                update_fee.as_ref()
                        },
                        _ => panic!("Unexpected event"),
@@ -3625,7 +3571,7 @@ mod tests {
                check_added_monitors!(nodes[1], 1);
 
                let payment_event = {
-                       let mut events_1 = nodes[1].node.get_and_clear_pending_events();
+                       let mut events_1 = nodes[1].node.get_and_clear_pending_msg_events();
                        assert_eq!(events_1.len(), 1);
                        SendEvent::from_event(events_1.remove(0))
                };
@@ -3682,10 +3628,10 @@ mod tests {
                nodes[0].node.update_fee(channel_id, initial_feerate + 20).unwrap();
                check_added_monitors!(nodes[0], 1);
 
-               let events_0 = nodes[0].node.get_and_clear_pending_events();
+               let events_0 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_0.len(), 1);
                let (update_msg_1, commitment_signed_1) = match events_0[0] { // (1)
-                       Event::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, ref commitment_signed, .. }, .. } => {
+                       MessageSendEvent::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, ref commitment_signed, .. }, .. } => {
                                (update_fee.as_ref().unwrap(), commitment_signed)
                        },
                        _ => panic!("Unexpected event"),
@@ -3700,6 +3646,7 @@ mod tests {
                // transaction:
                nodes[0].node.update_fee(channel_id, initial_feerate + 40).unwrap();
                assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
+               assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
 
                // Create the (3) update_fee message that nodes[0] will generate before it does...
                let mut update_msg_2 = msgs::UpdateFee {
@@ -3764,10 +3711,10 @@ mod tests {
                let feerate = get_feerate!(nodes[0]);
                nodes[0].node.update_fee(channel_id, feerate+20).unwrap();
 
-               let events_0 = nodes[0].node.get_and_clear_pending_events();
+               let events_0 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_0.len(), 1);
                let (update_msg, commitment_signed) = match events_0[0] {
-                               Event::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => {
+                               MessageSendEvent::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => {
                                (update_fee.as_ref(), commitment_signed)
                        },
                        _ => panic!("Unexpected event"),
@@ -3812,10 +3759,10 @@ mod tests {
                let feerate = get_feerate!(nodes[0]);
                nodes[0].node.update_fee(channel_id, feerate+20).unwrap();
 
-               let events_0 = nodes[0].node.get_and_clear_pending_events();
+               let events_0 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_0.len(), 1);
                let (update_msg, commitment_signed) = match events_0[0] {
-                               Event::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => {
+                               MessageSendEvent::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => {
                                (update_fee.as_ref(), commitment_signed)
                        },
                        _ => panic!("Unexpected event"),
@@ -3837,8 +3784,8 @@ mod tests {
                        assert_eq!(added_monitors.len(), 0);
                        added_monitors.clear();
                }
-               let events = nodes[0].node.get_and_clear_pending_events();
-               assert_eq!(events.len(), 0);
+               assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
+               assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
                // node[1] has nothing to do
 
                let resp_option = nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &revoke_msg).unwrap();
@@ -3929,10 +3876,10 @@ mod tests {
                let feerate = get_feerate!(nodes[0]);
                nodes[0].node.update_fee(channel_id, feerate+20).unwrap();
 
-               let events_0 = nodes[0].node.get_and_clear_pending_events();
+               let events_0 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_0.len(), 1);
                let (update_msg, commitment_signed) = match events_0[0] {
-                               Event::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => {
+                               MessageSendEvent::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => {
                                (update_fee.as_ref(), commitment_signed)
                        },
                        _ => panic!("Unexpected event"),
@@ -3952,10 +3899,10 @@ mod tests {
 
                // Create and deliver (4)...
                nodes[0].node.update_fee(channel_id, feerate+30).unwrap();
-               let events_0 = nodes[0].node.get_and_clear_pending_events();
+               let events_0 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_0.len(), 1);
                let (update_msg, commitment_signed) = match events_0[0] {
-                               Event::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => {
+                               MessageSendEvent::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => {
                                (update_fee.as_ref(), commitment_signed)
                        },
                        _ => panic!("Unexpected event"),
@@ -4226,19 +4173,19 @@ mod tests {
        }
 
        fn get_announce_close_broadcast_events(nodes: &Vec<Node>, a: usize, b: usize) {
-               let events_1 = nodes[a].node.get_and_clear_pending_events();
+               let events_1 = nodes[a].node.get_and_clear_pending_msg_events();
                assert_eq!(events_1.len(), 1);
                let as_update = match events_1[0] {
-                       Event::BroadcastChannelUpdate { ref msg } => {
+                       MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
                                msg.clone()
                        },
                        _ => panic!("Unexpected event"),
                };
 
-               let events_2 = nodes[b].node.get_and_clear_pending_events();
+               let events_2 = nodes[b].node.get_and_clear_pending_msg_events();
                assert_eq!(events_2.len(), 1);
                let bs_update = match events_2[0] {
-                       Event::BroadcastChannelUpdate { ref msg } => {
+                       MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
                                msg.clone()
                        },
                        _ => panic!("Unexpected event"),
@@ -4297,7 +4244,7 @@ mod tests {
 
                macro_rules! expect_forward {
                        ($node: expr) => {{
-                               let mut events = $node.node.get_and_clear_pending_events();
+                               let mut events = $node.node.get_and_clear_pending_msg_events();
                                assert_eq!(events.len(), 1);
                                check_added_monitors!($node, 1);
                                let payment_event = SendEvent::from_event(events.remove(0));
@@ -4380,7 +4327,7 @@ mod tests {
                        nodes[0].node.send_payment(route_1, our_payment_hash_1).unwrap();
                        check_added_monitors!(nodes[0], 1);
 
-                       let mut events = nodes[0].node.get_and_clear_pending_events();
+                       let mut events = nodes[0].node.get_and_clear_pending_msg_events();
                        assert_eq!(events.len(), 1);
                        SendEvent::from_event(events.remove(0))
                };
@@ -4456,8 +4403,8 @@ mod tests {
                // this will also stuck in the holding cell
                nodes[0].node.send_payment(route_22, our_payment_hash_22).unwrap();
                check_added_monitors!(nodes[0], 0);
-               let events = nodes[0].node.get_and_clear_pending_events();
-               assert_eq!(events.len(), 0);
+               assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
+               assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
 
                // flush the pending htlc
                let (as_revoke_and_ack, as_commitment_signed) = nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &payment_event_1.commitment_msg).unwrap();
@@ -4576,10 +4523,10 @@ mod tests {
                                        assert!($node.node.claim_funds($preimage));
                                        check_added_monitors!($node, 1);
 
-                                       let events = $node.node.get_and_clear_pending_events();
+                                       let events = $node.node.get_and_clear_pending_msg_events();
                                        assert_eq!(events.len(), 1);
                                        match events[0] {
-                                               Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fail_htlcs, .. } } => {
+                                               MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fail_htlcs, .. } } => {
                                                        assert!(update_add_htlcs.is_empty());
                                                        assert!(update_fail_htlcs.is_empty());
                                                        assert_eq!(*node_id, $prev_node.node.get_our_node_id());
@@ -4870,10 +4817,10 @@ mod tests {
                route_payment(&nodes[0], &[&nodes[1]], 10000000);
                nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id);
                {
-                       let events = nodes[0].node.get_and_clear_pending_events();
+                       let events = nodes[0].node.get_and_clear_pending_msg_events();
                        assert_eq!(events.len(), 1);
                        match events[0] {
-                               Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
+                               MessageSendEvent::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
                                        assert_eq!(flags & 0b10, 0b10);
                                },
                                _ => panic!("Unexpected event"),
@@ -4887,10 +4834,10 @@ mod tests {
                nodes[1].chain_monitor.block_connected_checked(&header, 1, &[&node_txn[0], &node_txn[1]], &[1; 2]);
 
                {
-                       let events = nodes[1].node.get_and_clear_pending_events();
+                       let events = nodes[1].node.get_and_clear_pending_msg_events();
                        assert_eq!(events.len(), 1);
                        match events[0] {
-                               Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
+                               MessageSendEvent::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
                                        assert_eq!(flags & 0b10, 0b10);
                                },
                                _ => panic!("Unexpected event"),
@@ -4917,7 +4864,7 @@ mod tests {
                        nodes[0].node.send_payment(route, our_payment_hash).unwrap();
                        check_added_monitors!(nodes[0], 1);
 
-                       let mut events = nodes[0].node.get_and_clear_pending_events();
+                       let mut events = nodes[0].node.get_and_clear_pending_msg_events();
                        assert_eq!(events.len(), 1);
                        SendEvent::from_event(events.remove(0))
                };
@@ -4935,7 +4882,7 @@ mod tests {
                nodes[1].node.channel_state.lock().unwrap().next_forward = Instant::now();
                nodes[1].node.process_pending_htlc_forwards();
 
-               let mut events_2 = nodes[1].node.get_and_clear_pending_events();
+               let mut events_2 = nodes[1].node.get_and_clear_pending_msg_events();
                assert_eq!(events_2.len(), 1);
                payment_event = SendEvent::from_event(events_2.remove(0));
                assert_eq!(payment_event.msgs.len(), 1);
@@ -4950,10 +4897,10 @@ mod tests {
                // transaction and ensure nodes[1] doesn't fail-backwards (this was originally a bug!).
 
                nodes[2].node.force_close_channel(&payment_event.commitment_msg.channel_id);
-               let events_3 = nodes[2].node.get_and_clear_pending_events();
+               let events_3 = nodes[2].node.get_and_clear_pending_msg_events();
                assert_eq!(events_3.len(), 1);
                match events_3[0] {
-                       Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
+                       MessageSendEvent::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
                                assert_eq!(flags & 0b10, 0b10);
                        },
                        _ => panic!("Unexpected event"),
@@ -4971,11 +4918,11 @@ mod tests {
                let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
                nodes[1].chain_monitor.block_connected_checked(&header, 1, &[&tx], &[1]);
 
-               let events_4 = nodes[1].node.get_and_clear_pending_events();
+               let events_4 = nodes[1].node.get_and_clear_pending_msg_events();
                // Note no UpdateHTLCs event here from nodes[1] to nodes[0]!
                assert_eq!(events_4.len(), 1);
                match events_4[0] {
-                       Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
+                       MessageSendEvent::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
                                assert_eq!(flags & 0b10, 0b10);
                        },
                        _ => panic!("Unexpected event"),
@@ -5020,10 +4967,10 @@ mod tests {
                        nodes[0].node.block_disconnected(&headers.pop().unwrap());
                }
                {
-                       let events = nodes[0].node.get_and_clear_pending_events();
+                       let events = nodes[0].node.get_and_clear_pending_msg_events();
                        assert_eq!(events.len(), 1);
                        match events[0] {
-                               Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
+                               MessageSendEvent::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
                                        assert_eq!(flags & 0b10, 0b10);
                                },
                                _ => panic!("Unexpected event"),
@@ -5066,9 +5013,14 @@ mod tests {
 
                for chan_msgs in resp_1.drain(..) {
                        if pre_all_htlcs {
-                               let a = node_a.node.handle_funding_locked(&node_b.node.get_our_node_id(), &chan_msgs.0.unwrap());
-                               let _announcement_sigs_opt = a.unwrap();
-                               //TODO: Test announcement_sigs re-sending when we've implemented it
+                               node_a.node.handle_funding_locked(&node_b.node.get_our_node_id(), &chan_msgs.0.unwrap()).unwrap();
+                               let announcement_event = node_a.node.get_and_clear_pending_msg_events();
+                               if !announcement_event.is_empty() {
+                                       assert_eq!(announcement_event.len(), 1);
+                                       if let MessageSendEvent::SendAnnouncementSignatures { .. } = announcement_event[0] {
+                                               //TODO: Test announcement_sigs re-sending
+                                       } else { panic!("Unexpected event!"); }
+                               }
                        } else {
                                assert!(chan_msgs.0.is_none());
                        }
@@ -5115,8 +5067,14 @@ mod tests {
 
                for chan_msgs in resp_2.drain(..) {
                        if pre_all_htlcs {
-                               let _announcement_sigs_opt = node_b.node.handle_funding_locked(&node_a.node.get_our_node_id(), &chan_msgs.0.unwrap()).unwrap();
-                               //TODO: Test announcement_sigs re-sending when we've implemented it
+                               node_b.node.handle_funding_locked(&node_a.node.get_our_node_id(), &chan_msgs.0.unwrap()).unwrap();
+                               let announcement_event = node_b.node.get_and_clear_pending_msg_events();
+                               if !announcement_event.is_empty() {
+                                       assert_eq!(announcement_event.len(), 1);
+                                       if let MessageSendEvent::SendAnnouncementSignatures { .. } = announcement_event[0] {
+                                               //TODO: Test announcement_sigs re-sending
+                                       } else { panic!("Unexpected event!"); }
+                               }
                        } else {
                                assert!(chan_msgs.0.is_none());
                        }
@@ -5231,7 +5189,7 @@ mod tests {
                        nodes[0].node.send_payment(route.clone(), payment_hash_1).unwrap();
                        check_added_monitors!(nodes[0], 1);
 
-                       let mut events = nodes[0].node.get_and_clear_pending_events();
+                       let mut events = nodes[0].node.get_and_clear_pending_msg_events();
                        assert_eq!(events.len(), 1);
                        SendEvent::from_event(events.remove(0))
                };
@@ -5308,10 +5266,10 @@ mod tests {
                nodes[1].node.claim_funds(payment_preimage_1);
                check_added_monitors!(nodes[1], 1);
 
-               let events_3 = nodes[1].node.get_and_clear_pending_events();
+               let events_3 = nodes[1].node.get_and_clear_pending_msg_events();
                assert_eq!(events_3.len(), 1);
                let (update_fulfill_htlc, commitment_signed) = match events_3[0] {
-                       Event::UpdateHTLCs { ref node_id, ref updates } => {
+                       MessageSendEvent::UpdateHTLCs { ref node_id, ref updates } => {
                                assert_eq!(*node_id, nodes[0].node.get_our_node_id());
                                assert!(updates.update_add_htlcs.is_empty());
                                assert!(updates.update_fail_htlcs.is_empty());
@@ -5419,23 +5377,21 @@ mod tests {
                nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
 
                confirm_transaction(&nodes[0].chain_monitor, &tx, tx.version);
-               let events_1 = nodes[0].node.get_and_clear_pending_events();
+               let events_1 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_1.len(), 1);
                match events_1[0] {
-                       Event::SendFundingLocked { ref node_id, msg: _, ref announcement_sigs } => {
+                       MessageSendEvent::SendFundingLocked { ref node_id, msg: _ } => {
                                assert_eq!(*node_id, nodes[1].node.get_our_node_id());
-                               assert!(announcement_sigs.is_none());
                        },
                        _ => panic!("Unexpected event"),
                }
 
                confirm_transaction(&nodes[1].chain_monitor, &tx, tx.version);
-               let events_2 = nodes[1].node.get_and_clear_pending_events();
+               let events_2 = nodes[1].node.get_and_clear_pending_msg_events();
                assert_eq!(events_2.len(), 1);
                match events_2[0] {
-                       Event::SendFundingLocked { ref node_id, msg: _, ref announcement_sigs } => {
+                       MessageSendEvent::SendFundingLocked { ref node_id, msg: _ } => {
                                assert_eq!(*node_id, nodes[0].node.get_our_node_id());
-                               assert!(announcement_sigs.is_none());
                        },
                        _ => panic!("Unexpected event"),
                }
@@ -5469,20 +5425,20 @@ mod tests {
                nodes[0].node.send_payment(route.clone(), payment_hash_2).unwrap();
                check_added_monitors!(nodes[0], 1);
 
-               let events_1 = nodes[0].node.get_and_clear_pending_events();
+               let events_1 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_1.len(), 1);
                match events_1[0] {
-                       Event::UpdateHTLCs { .. } => {},
+                       MessageSendEvent::UpdateHTLCs { .. } => {},
                        _ => panic!("Unexpected event"),
                }
 
                assert!(nodes[1].node.claim_funds(payment_preimage_1));
                check_added_monitors!(nodes[1], 1);
 
-               let events_2 = nodes[1].node.get_and_clear_pending_events();
+               let events_2 = nodes[1].node.get_and_clear_pending_msg_events();
                assert_eq!(events_2.len(), 1);
                match events_2[0] {
-                       Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
+                       MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
                                assert_eq!(*node_id, nodes[0].node.get_our_node_id());
                                assert!(update_add_htlcs.is_empty());
                                assert_eq!(update_fulfill_htlcs.len(), 1);
@@ -5601,10 +5557,10 @@ mod tests {
                if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route, payment_hash_1) {} else { panic!(); }
                check_added_monitors!(nodes[0], 1);
 
-               let events_1 = nodes[0].node.get_and_clear_pending_events();
+               let events_1 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_1.len(), 1);
                match events_1[0] {
-                       Event::BroadcastChannelUpdate { .. } => {},
+                       MessageSendEvent::BroadcastChannelUpdate { .. } => {},
                        _ => panic!("Unexpected event"),
                };
 
@@ -5627,8 +5583,8 @@ mod tests {
                if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route.clone(), payment_hash_1) {} else { panic!(); }
                check_added_monitors!(nodes[0], 1);
 
-               let events_1 = nodes[0].node.get_and_clear_pending_events();
-               assert!(events_1.is_empty());
+               assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
+               assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
                assert_eq!(nodes[0].node.list_channels().len(), 1);
 
                if disconnect {
@@ -5641,7 +5597,7 @@ mod tests {
                nodes[0].node.test_restore_channel_monitor();
                check_added_monitors!(nodes[0], 1);
 
-               let mut events_2 = nodes[0].node.get_and_clear_pending_events();
+               let mut events_2 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_2.len(), 1);
                let payment_event = SendEvent::from_event(events_2.pop().unwrap());
                assert_eq!(payment_event.node_id, nodes[1].node.get_our_node_id());
@@ -5668,8 +5624,8 @@ mod tests {
                if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route, payment_hash_2) {} else { panic!(); }
                check_added_monitors!(nodes[0], 1);
 
-               let events_4 = nodes[0].node.get_and_clear_pending_events();
-               assert!(events_4.is_empty());
+               assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
+               assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
                assert_eq!(nodes[0].node.list_channels().len(), 1);
 
                if disconnect {
@@ -5683,10 +5639,10 @@ mod tests {
                nodes[0].node.test_restore_channel_monitor();
                check_added_monitors!(nodes[0], 1);
 
-               let events_5 = nodes[0].node.get_and_clear_pending_events();
+               let events_5 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_5.len(), 1);
                match events_5[0] {
-                       Event::BroadcastChannelUpdate { .. } => {},
+                       MessageSendEvent::BroadcastChannelUpdate { .. } => {},
                        _ => panic!("Unexpected event"),
                }
 
@@ -5735,18 +5691,18 @@ mod tests {
                if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route.clone(), payment_hash_2) {} else { panic!(); }
                check_added_monitors!(nodes[0], 1);
 
-               let events_1 = nodes[0].node.get_and_clear_pending_events();
-               assert!(events_1.is_empty());
+               assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
+               assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
                assert_eq!(nodes[0].node.list_channels().len(), 1);
 
                // Claim the previous payment, which will result in a update_fulfill_htlc/CS from nodes[1]
                // but nodes[0] won't respond since it is frozen.
                assert!(nodes[1].node.claim_funds(payment_preimage_1));
                check_added_monitors!(nodes[1], 1);
-               let events_2 = nodes[1].node.get_and_clear_pending_events();
+               let events_2 = nodes[1].node.get_and_clear_pending_msg_events();
                assert_eq!(events_2.len(), 1);
                let (bs_initial_fulfill, bs_initial_commitment_signed) = match events_2[0] {
-                       Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
+                       MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
                                assert_eq!(*node_id, nodes[0].node.get_our_node_id());
                                assert!(update_add_htlcs.is_empty());
                                assert_eq!(update_fulfill_htlcs.len(), 1);
@@ -5804,8 +5760,8 @@ mod tests {
                } } }
 
                let (payment_event, initial_revoke_and_ack) = if disconnect_count & !disconnect_flags > 0 {
-                       let events_4 = nodes[0].node.get_and_clear_pending_events();
-                       assert!(events_4.is_empty());
+                       assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
+                       assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
 
                        let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
                        assert_eq!(reestablish_1.len(), 1);
@@ -5868,10 +5824,10 @@ mod tests {
 
                        (SendEvent::from_commitment_update(nodes[1].node.get_our_node_id(), as_resp.2.unwrap()), as_resp.1.unwrap())
                } else {
-                       let mut events_4 = nodes[0].node.get_and_clear_pending_events();
+                       let mut events_4 = nodes[0].node.get_and_clear_pending_msg_events();
                        assert_eq!(events_4.len(), 2);
                        (SendEvent::from_event(events_4.remove(0)), match events_4[0] {
-                               Event::SendRevokeAndACK { ref node_id, ref msg } => {
+                               MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
                                        assert_eq!(*node_id, nodes[1].node.get_our_node_id());
                                        msg.clone()
                                },