Split Event, move MessageSendEvent push() inside channel_state lock
authorMatt Corallo <git@bluematt.me>
Fri, 19 Oct 2018 20:25:32 +0000 (16:25 -0400)
committerMatt Corallo <git@bluematt.me>
Sat, 27 Oct 2018 13:42:04 +0000 (09:42 -0400)
fuzz/fuzz_targets/full_stack_target.rs
src/ln/channelmanager.rs
src/ln/msgs.rs
src/ln/peer_handler.rs
src/util/events.rs
src/util/test_utils.rs

index ed5001a3e08f86b49d6751619fb5f2b8b681ac35..bf6de14aeed03bbd096824523e7bc4eb1e149eb3 100644 (file)
@@ -460,7 +460,7 @@ pub fn do_test(data: &[u8], logger: &Arc<Logger>) {
                        _ => return,
                }
                loss_detector.handler.process_events();
-               for event in loss_detector.handler.get_and_clear_pending_events() {
+               for event in loss_detector.manager.get_and_clear_pending_events() {
                        match event {
                                Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, output_script, .. } => {
                                        pending_funding_generation.push((temporary_channel_id, channel_value_satoshis, output_script));
@@ -473,11 +473,10 @@ pub fn do_test(data: &[u8], logger: &Arc<Logger>) {
                                },
                                Event::PaymentSent {..} => {},
                                Event::PaymentFailed {..} => {},
-
                                Event::PendingHTLCsForwardable {..} => {
                                        should_forward = true;
                                },
-                               _ => panic!("Unknown event"),
+                               Event::SpendableOutputs {..} => {},
                        }
                }
        }
index c71013f950d9509f318ee21fd851ab582f3d5644..bd663c9241235e7dee611f709e71541b9cf3ceac 100644 (file)
@@ -257,6 +257,9 @@ struct ChannelHolder {
        /// guarantees are made about the channels given here actually existing anymore by the time you
        /// go to read them!
        claimable_htlcs: HashMap<[u8; 32], Vec<HTLCPreviousHopData>>,
+       /// Messages to send to peers - pushed to in the same lock that they are generated in (except
+       /// for broadcast messages, where ordering isn't as strict).
+       pending_msg_events: Vec<events::MessageSendEvent>,
 }
 struct MutChannelHolder<'a> {
        by_id: &'a mut HashMap<[u8; 32], Channel>,
@@ -264,6 +267,7 @@ struct MutChannelHolder<'a> {
        next_forward: &'a mut Instant,
        forward_htlcs: &'a mut HashMap<u64, Vec<HTLCForwardInfo>>,
        claimable_htlcs: &'a mut HashMap<[u8; 32], Vec<HTLCPreviousHopData>>,
+       pending_msg_events: &'a mut Vec<events::MessageSendEvent>,
 }
 impl ChannelHolder {
        fn borrow_parts(&mut self) -> MutChannelHolder {
@@ -273,6 +277,7 @@ impl ChannelHolder {
                        next_forward: &mut self.next_forward,
                        forward_htlcs: &mut self.forward_htlcs,
                        claimable_htlcs: &mut self.claimable_htlcs,
+                       pending_msg_events: &mut self.pending_msg_events,
                }
        }
 }
@@ -397,6 +402,7 @@ impl ChannelManager {
                                next_forward: Instant::now(),
                                forward_htlcs: HashMap::new(),
                                claimable_htlcs: HashMap::new(),
+                               pending_msg_events: Vec::new(),
                        }),
                        our_network_key: keys_manager.get_node_secret(),
 
@@ -418,7 +424,7 @@ impl ChannelManager {
        /// create_channel call. Note that user_channel_id defaults to 0 for inbound channels, so you
        /// may wish to avoid using 0 for user_id here.
        ///
-       /// If successful, will generate a SendOpenChannel event, so you should probably poll
+       /// If successful, will generate a SendOpenChannel message event, so you should probably poll
        /// PeerManager::process_events afterwards.
        ///
        /// Raises APIError::APIMisuseError when channel_value_satoshis > 2**24 or push_msat being greater than channel_value_satoshis * 1k
@@ -436,9 +442,7 @@ impl ChannelManager {
                        },
                        hash_map::Entry::Vacant(entry) => { entry.insert(channel); }
                }
-
-               let mut events = self.pending_events.lock().unwrap();
-               events.push(events::Event::SendOpenChannel {
+               channel_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel {
                        node_id: their_network_key,
                        msg: res,
                });
@@ -488,25 +492,29 @@ impl ChannelManager {
        /// will be accepted on the given channel, and after additional timeout/the closing of all
        /// pending HTLCs, the channel will be closed on chain.
        ///
-       /// May generate a SendShutdown event on success, which should be relayed.
+       /// May generate a SendShutdown message event on success, which should be relayed.
        pub fn close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> {
-               let (mut res, node_id, chan_option) = {
+               let (mut failed_htlcs, chan_option) = {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = channel_state_lock.borrow_parts();
                        match channel_state.by_id.entry(channel_id.clone()) {
                                hash_map::Entry::Occupied(mut chan_entry) => {
-                                       let res = chan_entry.get_mut().get_shutdown()?;
+                                       let (shutdown_msg, failed_htlcs) = chan_entry.get_mut().get_shutdown()?;
+                                       channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
+                                               node_id: chan_entry.get().get_their_node_id(),
+                                               msg: shutdown_msg
+                                       });
                                        if chan_entry.get().is_shutdown() {
                                                if let Some(short_id) = chan_entry.get().get_short_channel_id() {
                                                        channel_state.short_to_id.remove(&short_id);
                                                }
-                                               (res, chan_entry.get().get_their_node_id(), Some(chan_entry.remove_entry().1))
-                                       } else { (res, chan_entry.get().get_their_node_id(), None) }
+                                               (failed_htlcs, Some(chan_entry.remove_entry().1))
+                                       } else { (failed_htlcs, None) }
                                },
                                hash_map::Entry::Vacant(_) => return Err(APIError::ChannelUnavailable{err: "No such channel"})
                        }
                };
-               for htlc_source in res.1.drain(..) {
+               for htlc_source in failed_htlcs.drain(..) {
                        // unknown_next_peer...I dunno who that is anymore....
                        self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 10, data: Vec::new() });
                }
@@ -516,16 +524,12 @@ impl ChannelManager {
                        } else { None }
                } else { None };
 
-               let mut events = self.pending_events.lock().unwrap();
                if let Some(update) = chan_update {
-                       events.push(events::Event::BroadcastChannelUpdate {
+                       let mut channel_state = self.channel_state.lock().unwrap();
+                       channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                msg: update
                        });
                }
-               events.push(events::Event::SendShutdown {
-                       node_id,
-                       msg: res.0
-               });
 
                Ok(())
        }
@@ -565,9 +569,9 @@ impl ChannelManager {
                        }
                };
                self.finish_force_close_channel(chan.force_shutdown());
-               let mut events = self.pending_events.lock().unwrap();
                if let Ok(update) = self.get_channel_update(&chan) {
-                       events.push(events::Event::BroadcastChannelUpdate {
+                       let mut channel_state = self.channel_state.lock().unwrap();
+                       channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                msg: update
                        });
                }
@@ -594,9 +598,9 @@ impl ChannelManager {
                                };
                                mem::drop(channel_state_lock);
                                self.finish_force_close_channel(chan.force_shutdown());
-                               let mut events = self.pending_events.lock().unwrap();
                                if let Ok(update) = self.get_channel_update(&chan) {
-                                       events.push(events::Event::BroadcastChannelUpdate {
+                                       let mut channel_state = self.channel_state.lock().unwrap();
+                                       channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                msg: update
                                        });
                                }
@@ -1096,7 +1100,7 @@ impl ChannelManager {
        /// payment_preimage tracking (which you should already be doing as they represent "proof of
        /// payment") and prevent double-sends yourself.
        ///
-       /// May generate a SendHTLCs event on success, which should be relayed.
+       /// May generate a SendHTLCs message event on success, which should be relayed.
        ///
        /// Raises APIError::RoutError when invalid route or forward parameter
        /// (cltv_delta, fee, node public key) is specified
@@ -1124,66 +1128,52 @@ impl ChannelManager {
                let (onion_payloads, htlc_msat, htlc_cltv) = ChannelManager::build_onion_payloads(&route, cur_height)?;
                let onion_packet = ChannelManager::construct_onion_packet(onion_payloads, onion_keys, &payment_hash);
 
-               let (first_hop_node_id, update_add, commitment_signed) = {
-                       let mut channel_state = self.channel_state.lock().unwrap();
-
-                       let id = match channel_state.short_to_id.get(&route.hops.first().unwrap().short_channel_id) {
-                               None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!"}),
-                               Some(id) => id.clone(),
-                       };
-
-                       let res = {
-                               let res = {
-                                       let chan = channel_state.by_id.get_mut(&id).unwrap();
-                                       if chan.get_their_node_id() != route.hops.first().unwrap().pubkey {
-                                               return Err(APIError::RouteError{err: "Node ID mismatch on first hop!"});
-                                       }
-                                       if chan.is_awaiting_monitor_update() {
-                                               return Err(APIError::MonitorUpdateFailed);
-                                       }
-                                       if !chan.is_live() {
-                                               return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected!"});
-                                       }
-                                       chan.send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute {
-                                               route: route.clone(),
-                                               session_priv: session_priv.clone(),
-                                               first_hop_htlc_msat: htlc_msat,
-                                       }, onion_packet).map_err(|he| APIError::ChannelUnavailable{err: he.err})?
-                               };
-                               match res {
-                                       Some((update_add, commitment_signed, chan_monitor)) => {
-                                               if let Err(e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
-                                                       self.handle_monitor_update_fail(channel_state, &id, e, RAACommitmentOrder::CommitmentFirst);
-                                                       return Err(APIError::MonitorUpdateFailed);
-                                               }
-                                               Some((update_add, commitment_signed))
-                                       },
-                                       None => None,
-                               }
-                       };
+               let mut channel_state = self.channel_state.lock().unwrap();
 
-                       let first_hop_node_id = route.hops.first().unwrap().pubkey;
+               let id = match channel_state.short_to_id.get(&route.hops.first().unwrap().short_channel_id) {
+                       None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!"}),
+                       Some(id) => id.clone(),
+               };
 
-                       match res {
-                               Some((update_add, commitment_signed)) => {
-                                       (first_hop_node_id, update_add, commitment_signed)
-                               },
-                               None => return Ok(()),
+               let res = {
+                       let chan = channel_state.by_id.get_mut(&id).unwrap();
+                       if chan.get_their_node_id() != route.hops.first().unwrap().pubkey {
+                               return Err(APIError::RouteError{err: "Node ID mismatch on first hop!"});
                        }
+                       if chan.is_awaiting_monitor_update() {
+                               return Err(APIError::MonitorUpdateFailed);
+                       }
+                       if !chan.is_live() {
+                               return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected!"});
+                       }
+                       chan.send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute {
+                               route: route.clone(),
+                               session_priv: session_priv.clone(),
+                               first_hop_htlc_msat: htlc_msat,
+                       }, onion_packet).map_err(|he| APIError::ChannelUnavailable{err: he.err})?
                };
+               match res {
+                       Some((update_add, commitment_signed, chan_monitor)) => {
+                               if let Err(e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+                                       self.handle_monitor_update_fail(channel_state, &id, e, RAACommitmentOrder::CommitmentFirst);
+                                       return Err(APIError::MonitorUpdateFailed);
+                               }
 
-               let mut events = self.pending_events.lock().unwrap();
-               events.push(events::Event::UpdateHTLCs {
-                       node_id: first_hop_node_id,
-                       updates: msgs::CommitmentUpdate {
-                               update_add_htlcs: vec![update_add],
-                               update_fulfill_htlcs: Vec::new(),
-                               update_fail_htlcs: Vec::new(),
-                               update_fail_malformed_htlcs: Vec::new(),
-                               update_fee: None,
-                               commitment_signed,
+                               channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+                                       node_id: route.hops.first().unwrap().pubkey,
+                                       updates: msgs::CommitmentUpdate {
+                                               update_add_htlcs: vec![update_add],
+                                               update_fulfill_htlcs: Vec::new(),
+                                               update_fail_htlcs: Vec::new(),
+                                               update_fail_malformed_htlcs: Vec::new(),
+                                               update_fee: None,
+                                               commitment_signed,
+                                       },
+                               });
                        },
-               });
+                       None => {},
+               }
+
                Ok(())
        }
 
@@ -1194,15 +1184,6 @@ impl ChannelManager {
        /// May panic if the funding_txo is duplicative with some other channel (note that this should
        /// be trivially prevented by using unique funding transaction keys per-channel).
        pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_txo: OutPoint) {
-               macro_rules! add_pending_event {
-                       ($event: expr) => {
-                               {
-                                       let mut pending_events = self.pending_events.lock().unwrap();
-                                       pending_events.push($event);
-                               }
-                       }
-               }
-
                let (chan, msg, chan_monitor) = {
                        let mut channel_state = self.channel_state.lock().unwrap();
                        match channel_state.by_id.remove(temporary_channel_id) {
@@ -1213,8 +1194,7 @@ impl ChannelManager {
                                                },
                                                Err(e) => {
                                                        log_error!(self, "Got bad signatures: {}!", e.err);
-                                                       mem::drop(channel_state);
-                                                       add_pending_event!(events::Event::HandleError {
+                                                       channel_state.pending_msg_events.push(events::MessageSendEvent::HandleError {
                                                                node_id: chan.get_their_node_id(),
                                                                action: e.action,
                                                        });
@@ -1230,12 +1210,12 @@ impl ChannelManager {
                if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
                        unimplemented!();
                }
-               add_pending_event!(events::Event::SendFundingCreated {
+
+               let mut channel_state = self.channel_state.lock().unwrap();
+               channel_state.pending_msg_events.push(events::MessageSendEvent::SendFundingCreated {
                        node_id: chan.get_their_node_id(),
                        msg: msg,
                });
-
-               let mut channel_state = self.channel_state.lock().unwrap();
                match channel_state.by_id.entry(chan.channel_id()) {
                        hash_map::Entry::Occupied(_) => {
                                panic!("Generated duplicate funding txid?");
@@ -1344,7 +1324,7 @@ impl ChannelManager {
                                                if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) {
                                                        unimplemented!();// but def dont push the event...
                                                }
-                                               new_events.push(events::Event::UpdateHTLCs {
+                                               channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
                                                        node_id: forward_chan.get_their_node_id(),
                                                        updates: msgs::CommitmentUpdate {
                                                                update_add_htlcs: add_htlc_msgs,
@@ -1407,19 +1387,20 @@ impl ChannelManager {
        /// to fail and take the channel_state lock for each iteration (as we take ownership and may
        /// drop it). In other words, no assumptions are made that entries in claimable_htlcs point to
        /// still-available channels.
-       fn fail_htlc_backwards_internal(&self, mut channel_state: MutexGuard<ChannelHolder>, source: HTLCSource, payment_hash: &[u8; 32], onion_error: HTLCFailReason) {
+       fn fail_htlc_backwards_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder>, source: HTLCSource, payment_hash: &[u8; 32], onion_error: HTLCFailReason) {
                match source {
                        HTLCSource::OutboundRoute { .. } => {
-                               mem::drop(channel_state);
+                               mem::drop(channel_state_lock);
                                if let &HTLCFailReason::ErrorPacket { ref err } = &onion_error {
                                        let (channel_update, payment_retryable) = self.process_onion_failure(&source, err.data.clone());
-                                       let mut pending_events = self.pending_events.lock().unwrap();
-                                       if let Some(channel_update) = channel_update {
-                                               pending_events.push(events::Event::PaymentFailureNetworkUpdate {
-                                                       update: channel_update,
-                                               });
+                                       if let Some(update) = channel_update {
+                                               self.channel_state.lock().unwrap().pending_msg_events.push(
+                                                       events::MessageSendEvent::PaymentFailureNetworkUpdate {
+                                                               update,
+                                                       }
+                                               );
                                        }
-                                       pending_events.push(events::Event::PaymentFailed {
+                                       self.pending_events.lock().unwrap().push(events::Event::PaymentFailed {
                                                payment_hash: payment_hash.clone(),
                                                rejected_by_dest: !payment_retryable,
                                        });
@@ -1438,35 +1419,21 @@ impl ChannelManager {
                                        }
                                };
 
-                               let (node_id, fail_msgs) = {
-                                       let chan_id = match channel_state.short_to_id.get(&short_channel_id) {
-                                               Some(chan_id) => chan_id.clone(),
-                                               None => return
-                                       };
+                               let channel_state = channel_state_lock.borrow_parts();
 
-                                       let chan = channel_state.by_id.get_mut(&chan_id).unwrap();
-                                       match chan.get_update_fail_htlc_and_commit(htlc_id, err_packet) {
-                                               Ok(Some((msg, commitment_msg, chan_monitor))) => {
-                                                       if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
-                                                               unimplemented!();
-                                                       }
-                                                       (chan.get_their_node_id(), Some((msg, commitment_msg)))
-                                               },
-                                               Ok(None) => (chan.get_their_node_id(), None),
-                                               Err(_e) => {
-                                                       //TODO: Do something with e?
-                                                       return;
-                                               },
-                                       }
+                               let chan_id = match channel_state.short_to_id.get(&short_channel_id) {
+                                       Some(chan_id) => chan_id.clone(),
+                                       None => return
                                };
 
-                               match fail_msgs {
-                                       Some((msg, commitment_msg)) => {
-                                               mem::drop(channel_state);
-
-                                               let mut pending_events = self.pending_events.lock().unwrap();
-                                               pending_events.push(events::Event::UpdateHTLCs {
-                                                       node_id,
+                               let chan = channel_state.by_id.get_mut(&chan_id).unwrap();
+                               match chan.get_update_fail_htlc_and_commit(htlc_id, err_packet) {
+                                       Ok(Some((msg, commitment_msg, chan_monitor))) => {
+                                               if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+                                                       unimplemented!();
+                                               }
+                                               channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+                                                       node_id: chan.get_their_node_id(),
                                                        updates: msgs::CommitmentUpdate {
                                                                update_add_htlcs: Vec::new(),
                                                                update_fulfill_htlcs: Vec::new(),
@@ -1477,7 +1444,11 @@ impl ChannelManager {
                                                        },
                                                });
                                        },
-                                       None => {},
+                                       Ok(None) => {},
+                                       Err(_e) => {
+                                               //TODO: Do something with e?
+                                               return;
+                                       },
                                }
                        },
                }
@@ -1504,10 +1475,10 @@ impl ChannelManager {
                        true
                } else { false }
        }
-       fn claim_funds_internal(&self, mut channel_state: MutexGuard<ChannelHolder>, source: HTLCSource, payment_preimage: [u8; 32]) {
+       fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder>, source: HTLCSource, payment_preimage: [u8; 32]) {
                match source {
                        HTLCSource::OutboundRoute { .. } => {
-                               mem::drop(channel_state);
+                               mem::drop(channel_state_lock);
                                let mut pending_events = self.pending_events.lock().unwrap();
                                pending_events.push(events::Event::PaymentSent {
                                        payment_preimage
@@ -1515,49 +1486,46 @@ impl ChannelManager {
                        },
                        HTLCSource::PreviousHopData(HTLCPreviousHopData { short_channel_id, htlc_id, .. }) => {
                                //TODO: Delay the claimed_funds relaying just like we do outbound relay!
-                               let (node_id, fulfill_msgs) = {
-                                       let chan_id = match channel_state.short_to_id.get(&short_channel_id) {
-                                               Some(chan_id) => chan_id.clone(),
-                                               None => {
-                                                       // TODO: There is probably a channel manager somewhere that needs to
-                                                       // learn the preimage as the channel already hit the chain and that's
-                                                       // why its missing.
-                                                       return
-                                               }
-                                       };
+                               let channel_state = channel_state_lock.borrow_parts();
+
+                               let chan_id = match channel_state.short_to_id.get(&short_channel_id) {
+                                       Some(chan_id) => chan_id.clone(),
+                                       None => {
+                                               // TODO: There is probably a channel manager somewhere that needs to
+                                               // learn the preimage as the channel already hit the chain and that's
+                                               // why its missing.
+                                               return
+                                       }
+                               };
 
-                                       let chan = channel_state.by_id.get_mut(&chan_id).unwrap();
-                                       match chan.get_update_fulfill_htlc_and_commit(htlc_id, payment_preimage) {
-                                               Ok((msgs, Some(chan_monitor))) => {
+                               let chan = channel_state.by_id.get_mut(&chan_id).unwrap();
+                               match chan.get_update_fulfill_htlc_and_commit(htlc_id, payment_preimage) {
+                                       Ok((msgs, monitor_option)) => {
+                                               if let Some(chan_monitor) = monitor_option {
                                                        if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
                                                                unimplemented!();// but def dont push the event...
                                                        }
-                                                       (chan.get_their_node_id(), msgs)
-                                               },
-                                               Ok((msgs, None)) => (chan.get_their_node_id(), msgs),
-                                               Err(_e) => {
-                                                       // TODO: There is probably a channel manager somewhere that needs to
-                                                       // learn the preimage as the channel may be about to hit the chain.
-                                                       //TODO: Do something with e?
-                                                       return
-                                               },
-                                       }
-                               };
-
-                               mem::drop(channel_state);
-                               if let Some((msg, commitment_msg)) = fulfill_msgs {
-                                       let mut pending_events = self.pending_events.lock().unwrap();
-                                       pending_events.push(events::Event::UpdateHTLCs {
-                                               node_id: node_id,
-                                               updates: msgs::CommitmentUpdate {
-                                                       update_add_htlcs: Vec::new(),
-                                                       update_fulfill_htlcs: vec![msg],
-                                                       update_fail_htlcs: Vec::new(),
-                                                       update_fail_malformed_htlcs: Vec::new(),
-                                                       update_fee: None,
-                                                       commitment_signed: commitment_msg,
                                                }
-                                       });
+                                               if let Some((msg, commitment_signed)) = msgs {
+                                                       channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+                                                               node_id: chan.get_their_node_id(),
+                                                               updates: msgs::CommitmentUpdate {
+                                                                       update_add_htlcs: Vec::new(),
+                                                                       update_fulfill_htlcs: vec![msg],
+                                                                       update_fail_htlcs: Vec::new(),
+                                                                       update_fail_malformed_htlcs: Vec::new(),
+                                                                       update_fee: None,
+                                                                       commitment_signed,
+                                                               }
+                                                       });
+                                               }
+                                       },
+                                       Err(_e) => {
+                                               // TODO: There is probably a channel manager somewhere that needs to
+                                               // learn the preimage as the channel may be about to hit the chain.
+                                               //TODO: Do something with e?
+                                               return
+                                       },
                                }
                        },
                }
@@ -1572,7 +1540,6 @@ impl ChannelManager {
        /// ChannelMonitorUpdateErr::TemporaryFailure was returned from a channel monitor update
        /// operation.
        pub fn test_restore_channel_monitor(&self) {
-               let mut new_events = Vec::new();
                let mut close_results = Vec::new();
                let mut htlc_forwards = Vec::new();
                let mut htlc_failures = Vec::new();
@@ -1581,6 +1548,7 @@ impl ChannelManager {
                        let mut channel_lock = self.channel_state.lock().unwrap();
                        let channel_state = channel_lock.borrow_parts();
                        let short_to_id = channel_state.short_to_id;
+                       let pending_msg_events = channel_state.pending_msg_events;
                        channel_state.by_id.retain(|_, channel| {
                                if channel.is_awaiting_monitor_update() {
                                        let chan_monitor = channel.channel_monitor();
@@ -1592,7 +1560,7 @@ impl ChannelManager {
                                                                }
                                                                close_results.push(channel.force_shutdown());
                                                                if let Ok(update) = self.get_channel_update(&channel) {
-                                                                       new_events.push(events::Event::BroadcastChannelUpdate {
+                                                                       pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                                msg: update
                                                                        });
                                                                }
@@ -1609,7 +1577,7 @@ impl ChannelManager {
 
                                                macro_rules! handle_cs { () => {
                                                        if let Some(update) = commitment_update {
-                                                               new_events.push(events::Event::UpdateHTLCs {
+                                                               pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
                                                                        node_id: channel.get_their_node_id(),
                                                                        updates: update,
                                                                });
@@ -1617,7 +1585,7 @@ impl ChannelManager {
                                                } }
                                                macro_rules! handle_raa { () => {
                                                        if let Some(revoke_and_ack) = raa {
-                                                               new_events.push(events::Event::SendRevokeAndACK {
+                                                               pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
                                                                        node_id: channel.get_their_node_id(),
                                                                        msg: revoke_and_ack,
                                                                });
@@ -1647,8 +1615,6 @@ impl ChannelManager {
                for res in close_results.drain(..) {
                        self.finish_force_close_channel(res);
                }
-
-               self.pending_events.lock().unwrap().append(&mut new_events);
        }
 
        fn internal_open_channel(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<msgs::AcceptChannel, MsgHandleErrInternal> {
@@ -1802,8 +1768,8 @@ impl ChannelManager {
                }
                if let Some(chan) = chan_option {
                        if let Ok(update) = self.get_channel_update(&chan) {
-                               let mut events = self.pending_events.lock().unwrap();
-                               events.push(events::Event::BroadcastChannelUpdate {
+                               let mut channel_state = self.channel_state.lock().unwrap();
+                               channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                        msg: update
                                });
                        }
@@ -1842,8 +1808,8 @@ impl ChannelManager {
                }
                if let Some(chan) = chan_option {
                        if let Ok(update) = self.get_channel_update(&chan) {
-                               let mut events = self.pending_events.lock().unwrap();
-                               events.push(events::Event::BroadcastChannelUpdate {
+                               let mut channel_state = self.channel_state.lock().unwrap();
+                               channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                        msg: update
                                });
                        }
@@ -2223,42 +2189,43 @@ impl ChannelManager {
        }
 
        fn internal_announcement_signatures(&self, their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) -> Result<(), MsgHandleErrInternal> {
-               let (chan_announcement, chan_update) = {
-                       let mut channel_state = self.channel_state.lock().unwrap();
-                       match channel_state.by_id.get_mut(&msg.channel_id) {
-                               Some(chan) => {
-                                       if chan.get_their_node_id() != *their_node_id {
-                                               return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
-                                       }
-                                       if !chan.is_usable() {
-                                               return Err(MsgHandleErrInternal::from_no_close(HandleError{err: "Got an announcement_signatures before we were ready for it", action: Some(msgs::ErrorAction::IgnoreError)}));
-                                       }
+               let mut channel_state_lock = self.channel_state.lock().unwrap();
+               let channel_state = channel_state_lock.borrow_parts();
 
-                                       let our_node_id = self.get_our_node_id();
-                                       let (announcement, our_bitcoin_sig) = chan.get_channel_announcement(our_node_id.clone(), self.genesis_hash.clone())
-                                               .map_err(|e| MsgHandleErrInternal::from_chan_maybe_close(e, msg.channel_id))?;
+               match channel_state.by_id.get_mut(&msg.channel_id) {
+                       Some(chan) => {
+                               if chan.get_their_node_id() != *their_node_id {
+                                       return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
+                               }
+                               if !chan.is_usable() {
+                                       return Err(MsgHandleErrInternal::from_no_close(HandleError{err: "Got an announcement_signatures before we were ready for it", action: Some(msgs::ErrorAction::IgnoreError)}));
+                               }
 
-                                       let were_node_one = announcement.node_id_1 == our_node_id;
-                                       let msghash = Message::from_slice(&Sha256dHash::from_data(&announcement.encode()[..])[..]).unwrap();
-                                       let bad_sig_action = MsgHandleErrInternal::send_err_msg_close_chan("Bad announcement_signatures node_signature", msg.channel_id);
-                                       secp_call!(self.secp_ctx.verify(&msghash, &msg.node_signature, if were_node_one { &announcement.node_id_2 } else { &announcement.node_id_1 }), bad_sig_action);
-                                       secp_call!(self.secp_ctx.verify(&msghash, &msg.bitcoin_signature, if were_node_one { &announcement.bitcoin_key_2 } else { &announcement.bitcoin_key_1 }), bad_sig_action);
+                               let our_node_id = self.get_our_node_id();
+                               let (announcement, our_bitcoin_sig) = chan.get_channel_announcement(our_node_id.clone(), self.genesis_hash.clone())
+                                       .map_err(|e| MsgHandleErrInternal::from_chan_maybe_close(e, msg.channel_id))?;
 
-                                       let our_node_sig = self.secp_ctx.sign(&msghash, &self.our_network_key);
+                               let were_node_one = announcement.node_id_1 == our_node_id;
+                               let msghash = Message::from_slice(&Sha256dHash::from_data(&announcement.encode()[..])[..]).unwrap();
+                               let bad_sig_action = MsgHandleErrInternal::send_err_msg_close_chan("Bad announcement_signatures node_signature", msg.channel_id);
+                               secp_call!(self.secp_ctx.verify(&msghash, &msg.node_signature, if were_node_one { &announcement.node_id_2 } else { &announcement.node_id_1 }), bad_sig_action);
+                               secp_call!(self.secp_ctx.verify(&msghash, &msg.bitcoin_signature, if were_node_one { &announcement.bitcoin_key_2 } else { &announcement.bitcoin_key_1 }), bad_sig_action);
 
-                                       (msgs::ChannelAnnouncement {
+                               let our_node_sig = self.secp_ctx.sign(&msghash, &self.our_network_key);
+
+                               channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement {
+                                       msg: msgs::ChannelAnnouncement {
                                                node_signature_1: if were_node_one { our_node_sig } else { msg.node_signature },
                                                node_signature_2: if were_node_one { msg.node_signature } else { our_node_sig },
                                                bitcoin_signature_1: if were_node_one { our_bitcoin_sig } else { msg.bitcoin_signature },
                                                bitcoin_signature_2: if were_node_one { msg.bitcoin_signature } else { our_bitcoin_sig },
                                                contents: announcement,
-                                       }, self.get_channel_update(chan).unwrap()) // can only fail if we're not in a ready state
-                               },
-                               None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
-                       }
-               };
-               let mut pending_events = self.pending_events.lock().unwrap();
-               pending_events.push(events::Event::BroadcastChannelAnnouncement { msg: chan_announcement, update_msg: chan_update });
+                                       },
+                                       update_msg: self.get_channel_update(chan).unwrap(), // can only fail if we're not in a ready state
+                               });
+                       },
+                       None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
+               }
                Ok(())
        }
 
@@ -2292,7 +2259,9 @@ impl ChannelManager {
        /// Note: This API is likely to change!
        #[doc(hidden)]
        pub fn update_fee(&self, channel_id: [u8;32], feerate_per_kw: u64) -> Result<(), APIError> {
-               let mut channel_state = self.channel_state.lock().unwrap();
+               let mut channel_state_lock = self.channel_state.lock().unwrap();
+               let channel_state = channel_state_lock.borrow_parts();
+
                match channel_state.by_id.get_mut(&channel_id) {
                        None => return Err(APIError::APIMisuseError{err: "Failed to find corresponding channel"}),
                        Some(chan) => {
@@ -2309,8 +2278,7 @@ impl ChannelManager {
                                        if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
                                                unimplemented!();
                                        }
-                                       let mut pending_events = self.pending_events.lock().unwrap();
-                                       pending_events.push(events::Event::UpdateHTLCs {
+                                       channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
                                                node_id: chan.get_their_node_id(),
                                                updates: msgs::CommitmentUpdate {
                                                        update_add_htlcs: Vec::new(),
@@ -2328,10 +2296,19 @@ impl ChannelManager {
        }
 }
 
+impl events::MessageSendEventsProvider for ChannelManager {
+       fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
+               let mut ret = Vec::new();
+               let mut channel_state = self.channel_state.lock().unwrap();
+               mem::swap(&mut ret, &mut channel_state.pending_msg_events);
+               ret
+       }
+}
+
 impl events::EventsProvider for ChannelManager {
        fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
-               let mut pending_events = self.pending_events.lock().unwrap();
                let mut ret = Vec::new();
+               let mut pending_events = self.pending_events.lock().unwrap();
                mem::swap(&mut ret, &mut *pending_events);
                ret
        }
@@ -2339,24 +2316,24 @@ impl events::EventsProvider for ChannelManager {
 
 impl ChainListener for ChannelManager {
        fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) {
-               let mut new_events = Vec::new();
                let mut failed_channels = Vec::new();
                {
                        let mut channel_lock = self.channel_state.lock().unwrap();
                        let channel_state = channel_lock.borrow_parts();
                        let short_to_id = channel_state.short_to_id;
+                       let pending_msg_events = channel_state.pending_msg_events;
                        channel_state.by_id.retain(|_, channel| {
                                let chan_res = channel.block_connected(header, height, txn_matched, indexes_of_txn_matched);
                                if let Ok(Some(funding_locked)) = chan_res {
                                        let announcement_sigs = self.get_announcement_sigs(channel);
-                                       new_events.push(events::Event::SendFundingLocked {
+                                       pending_msg_events.push(events::MessageSendEvent::SendFundingLocked {
                                                node_id: channel.get_their_node_id(),
                                                msg: funding_locked,
                                                announcement_sigs: announcement_sigs
                                        });
                                        short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id());
                                } else if let Err(e) = chan_res {
-                                       new_events.push(events::Event::HandleError {
+                                       pending_msg_events.push(events::MessageSendEvent::HandleError {
                                                node_id: channel.get_their_node_id(),
                                                action: e.action,
                                        });
@@ -2376,7 +2353,7 @@ impl ChainListener for ChannelManager {
                                                                // some kind of SPV attack, though we expect these to be dropped.
                                                                failed_channels.push(channel.force_shutdown());
                                                                if let Ok(update) = self.get_channel_update(&channel) {
-                                                                       new_events.push(events::Event::BroadcastChannelUpdate {
+                                                                       pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                                msg: update
                                                                        });
                                                                }
@@ -2395,7 +2372,7 @@ impl ChainListener for ChannelManager {
                                        // hurt anything, but does make tests a bit simpler).
                                        failed_channels.last_mut().unwrap().0 = Vec::new();
                                        if let Ok(update) = self.get_channel_update(&channel) {
-                                               new_events.push(events::Event::BroadcastChannelUpdate {
+                                               pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                        msg: update
                                                });
                                        }
@@ -2407,21 +2384,17 @@ impl ChainListener for ChannelManager {
                for failure in failed_channels.drain(..) {
                        self.finish_force_close_channel(failure);
                }
-               let mut pending_events = self.pending_events.lock().unwrap();
-               for funding_locked in new_events.drain(..) {
-                       pending_events.push(funding_locked);
-               }
                self.latest_block_height.store(height as usize, Ordering::Release);
        }
 
        /// We force-close the channel without letting our counterparty participate in the shutdown
        fn block_disconnected(&self, header: &BlockHeader) {
-               let mut new_events = Vec::new();
                let mut failed_channels = Vec::new();
                {
                        let mut channel_lock = self.channel_state.lock().unwrap();
                        let channel_state = channel_lock.borrow_parts();
                        let short_to_id = channel_state.short_to_id;
+                       let pending_msg_events = channel_state.pending_msg_events;
                        channel_state.by_id.retain(|_,  v| {
                                if v.block_disconnected(header) {
                                        if let Some(short_id) = v.get_short_channel_id() {
@@ -2429,7 +2402,7 @@ impl ChainListener for ChannelManager {
                                        }
                                        failed_channels.push(v.force_shutdown());
                                        if let Ok(update) = self.get_channel_update(&v) {
-                                               new_events.push(events::Event::BroadcastChannelUpdate {
+                                               pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                        msg: update
                                                });
                                        }
@@ -2442,12 +2415,6 @@ impl ChainListener for ChannelManager {
                for failure in failed_channels.drain(..) {
                        self.finish_force_close_channel(failure);
                }
-               if !new_events.is_empty() {
-                       let mut pending_events = self.pending_events.lock().unwrap();
-                       for funding_locked in new_events.drain(..) {
-                               pending_events.push(funding_locked);
-                       }
-               }
                self.latest_block_height.fetch_sub(1, Ordering::AcqRel);
        }
 }
@@ -2551,13 +2518,13 @@ impl ChannelMessageHandler for ChannelManager {
        }
 
        fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) {
-               let mut new_events = Vec::new();
                let mut failed_channels = Vec::new();
                let mut failed_payments = Vec::new();
                {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = channel_state_lock.borrow_parts();
                        let short_to_id = channel_state.short_to_id;
+                       let pending_msg_events = channel_state.pending_msg_events;
                        if no_connection_possible {
                                channel_state.by_id.retain(|_, chan| {
                                        if chan.get_their_node_id() == *their_node_id {
@@ -2566,7 +2533,7 @@ impl ChannelMessageHandler for ChannelManager {
                                                }
                                                failed_channels.push(chan.force_shutdown());
                                                if let Ok(update) = self.get_channel_update(&chan) {
-                                                       new_events.push(events::Event::BroadcastChannelUpdate {
+                                                       pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                msg: update
                                                        });
                                                }
@@ -2598,12 +2565,6 @@ impl ChannelMessageHandler for ChannelManager {
                for failure in failed_channels.drain(..) {
                        self.finish_force_close_channel(failure);
                }
-               if !new_events.is_empty() {
-                       let mut pending_events = self.pending_events.lock().unwrap();
-                       for event in new_events.drain(..) {
-                               pending_events.push(event);
-                       }
-               }
                for (chan_update, mut htlc_sources) in failed_payments {
                        for (htlc_source, payment_hash) in htlc_sources.drain(..) {
                                self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source, &payment_hash, HTLCFailReason::Reason { failure_code: 0x1000 | 7, data: chan_update.clone() });
@@ -2658,7 +2619,7 @@ mod tests {
        use ln::msgs;
        use ln::msgs::{ChannelMessageHandler,RoutingMessageHandler};
        use util::test_utils;
-       use util::events::{Event, EventsProvider};
+       use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
        use util::errors::APIError;
        use util::logger::Logger;
        use util::ser::Writeable;
@@ -2863,6 +2824,7 @@ mod tests {
                fn drop(&mut self) {
                        if !::std::thread::panicking() {
                                // Check that we processed all pending events
+                               assert_eq!(self.node.get_and_clear_pending_msg_events().len(), 0);
                                assert_eq!(self.node.get_and_clear_pending_events().len(), 0);
                                assert_eq!(self.chan_monitor.added_monitors.lock().unwrap().len(), 0);
                        }
@@ -2882,10 +2844,10 @@ mod tests {
        fn create_chan_between_nodes_with_value_init(node_a: &Node, node_b: &Node, channel_value: u64, push_msat: u64) -> Transaction {
                node_a.node.create_channel(node_b.node.get_our_node_id(), channel_value, push_msat, 42).unwrap();
 
-               let events_1 = node_a.node.get_and_clear_pending_events();
+               let events_1 = node_a.node.get_and_clear_pending_msg_events();
                assert_eq!(events_1.len(), 1);
                let accept_chan = match events_1[0] {
-                       Event::SendOpenChannel { ref node_id, ref msg } => {
+                       MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
                                assert_eq!(*node_id, node_b.node.get_our_node_id());
                                node_b.node.handle_open_channel(&node_a.node.get_our_node_id(), msg).unwrap()
                        },
@@ -2919,10 +2881,10 @@ mod tests {
                        _ => panic!("Unexpected event"),
                }
 
-               let events_3 = node_a.node.get_and_clear_pending_events();
+               let events_3 = node_a.node.get_and_clear_pending_msg_events();
                assert_eq!(events_3.len(), 1);
                let funding_signed = match events_3[0] {
-                       Event::SendFundingCreated { ref node_id, ref msg } => {
+                       MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
                                assert_eq!(*node_id, node_b.node.get_our_node_id());
                                let res = node_b.node.handle_funding_created(&node_a.node.get_our_node_id(), msg).unwrap();
                                let mut added_monitors = node_b.chan_monitor.added_monitors.lock().unwrap();
@@ -2957,10 +2919,10 @@ mod tests {
 
        fn create_chan_between_nodes_with_value_confirm(node_a: &Node, node_b: &Node, tx: &Transaction) -> ((msgs::FundingLocked, msgs::AnnouncementSignatures), [u8; 32]) {
                confirm_transaction(&node_b.chain_monitor, &tx, tx.version);
-               let events_5 = node_b.node.get_and_clear_pending_events();
+               let events_5 = node_b.node.get_and_clear_pending_msg_events();
                assert_eq!(events_5.len(), 1);
                match events_5[0] {
-                       Event::SendFundingLocked { ref node_id, ref msg, ref announcement_sigs } => {
+                       MessageSendEvent::SendFundingLocked { ref node_id, ref msg, ref announcement_sigs } => {
                                assert_eq!(*node_id, node_a.node.get_our_node_id());
                                assert!(announcement_sigs.is_none());
                                node_a.node.handle_funding_locked(&node_b.node.get_our_node_id(), msg).unwrap()
@@ -2971,10 +2933,10 @@ mod tests {
                let channel_id;
 
                confirm_transaction(&node_a.chain_monitor, &tx, tx.version);
-               let events_6 = node_a.node.get_and_clear_pending_events();
+               let events_6 = node_a.node.get_and_clear_pending_msg_events();
                assert_eq!(events_6.len(), 1);
                (match events_6[0] {
-                       Event::SendFundingLocked { ref node_id, ref msg, ref announcement_sigs } => {
+                       MessageSendEvent::SendFundingLocked { ref node_id, ref msg, ref announcement_sigs } => {
                                channel_id = msg.channel_id.clone();
                                assert_eq!(*node_id, node_b.node.get_our_node_id());
                                (msg.clone(), announcement_sigs.clone().unwrap())
@@ -2996,20 +2958,20 @@ mod tests {
                        bs_announcement_sigs
                };
 
-               let events_7 = node_b.node.get_and_clear_pending_events();
+               let events_7 = node_b.node.get_and_clear_pending_msg_events();
                assert_eq!(events_7.len(), 1);
                let (announcement, bs_update) = match events_7[0] {
-                       Event::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
+                       MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
                                (msg, update_msg)
                        },
                        _ => panic!("Unexpected event"),
                };
 
                node_a.node.handle_announcement_signatures(&node_b.node.get_our_node_id(), &bs_announcement_sigs).unwrap();
-               let events_8 = node_a.node.get_and_clear_pending_events();
+               let events_8 = node_a.node.get_and_clear_pending_msg_events();
                assert_eq!(events_8.len(), 1);
                let as_update = match events_8[0] {
-                       Event::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
+                       MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
                                assert!(*announcement == *msg);
                                update_msg
                        },
@@ -3052,10 +3014,10 @@ mod tests {
                let (tx_a, tx_b);
 
                node_a.close_channel(channel_id).unwrap();
-               let events_1 = node_a.get_and_clear_pending_events();
+               let events_1 = node_a.get_and_clear_pending_msg_events();
                assert_eq!(events_1.len(), 1);
                let shutdown_a = match events_1[0] {
-                       Event::SendShutdown { ref node_id, ref msg } => {
+                       MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
                                assert_eq!(node_id, &node_b.get_our_node_id());
                                msg.clone()
                        },
@@ -3091,19 +3053,19 @@ mod tests {
                assert_eq!(tx_a, tx_b);
                check_spends!(tx_a, funding_tx);
 
-               let events_2 = node_a.get_and_clear_pending_events();
+               let events_2 = node_a.get_and_clear_pending_msg_events();
                assert_eq!(events_2.len(), 1);
                let as_update = match events_2[0] {
-                       Event::BroadcastChannelUpdate { ref msg } => {
+                       MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
                                msg.clone()
                        },
                        _ => panic!("Unexpected event"),
                };
 
-               let events_3 = node_b.get_and_clear_pending_events();
+               let events_3 = node_b.get_and_clear_pending_msg_events();
                assert_eq!(events_3.len(), 1);
                let bs_update = match events_3[0] {
-                       Event::BroadcastChannelUpdate { ref msg } => {
+                       MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
                                msg.clone()
                        },
                        _ => panic!("Unexpected event"),
@@ -3126,9 +3088,9 @@ mod tests {
                        SendEvent { node_id: node_id, msgs: updates.update_add_htlcs, commitment_msg: updates.commitment_signed }
                }
 
-               fn from_event(event: Event) -> SendEvent {
+               fn from_event(event: MessageSendEvent) -> SendEvent {
                        match event {
-                               Event::UpdateHTLCs { node_id, updates } => SendEvent::from_commitment_update(node_id, updates),
+                               MessageSendEvent::UpdateHTLCs { node_id, updates } => SendEvent::from_commitment_update(node_id, updates),
                                _ => panic!("Unexpected event type!"),
                        }
                }
@@ -3158,6 +3120,7 @@ mod tests {
                                check_added_monitors!($node_b, 1);
                                if $fail_backwards {
                                        assert!($node_a.node.get_and_clear_pending_events().is_empty());
+                                       assert!($node_a.node.get_and_clear_pending_msg_events().is_empty());
                                }
                                assert!($node_a.node.handle_revoke_and_ack(&$node_b.node.get_our_node_id(), &bs_revoke_and_ack).unwrap().is_none());
                                {
@@ -3195,7 +3158,7 @@ mod tests {
                        origin_node.node.send_payment(route, our_payment_hash).unwrap();
                        check_added_monitors!(origin_node, 1);
 
-                       let mut events = origin_node.node.get_and_clear_pending_events();
+                       let mut events = origin_node.node.get_and_clear_pending_msg_events();
                        assert_eq!(events.len(), 1);
                        SendEvent::from_event(events.remove(0))
                };
@@ -3218,9 +3181,9 @@ mod tests {
                        node.node.channel_state.lock().unwrap().next_forward = Instant::now();
                        node.node.process_pending_htlc_forwards();
 
-                       let mut events_2 = node.node.get_and_clear_pending_events();
-                       assert_eq!(events_2.len(), 1);
                        if idx == expected_route.len() - 1 {
+                               let events_2 = node.node.get_and_clear_pending_events();
+                               assert_eq!(events_2.len(), 1);
                                match events_2[0] {
                                        Event::PaymentReceived { ref payment_hash, amt } => {
                                                assert_eq!(our_payment_hash, *payment_hash);
@@ -3229,6 +3192,8 @@ mod tests {
                                        _ => panic!("Unexpected event"),
                                }
                        } else {
+                               let mut events_2 = node.node.get_and_clear_pending_msg_events();
+                               assert_eq!(events_2.len(), 1);
                                check_added_monitors!(node, 1);
                                payment_event = SendEvent::from_event(events_2.remove(0));
                                assert_eq!(payment_event.msgs.len(), 1);
@@ -3267,11 +3232,11 @@ mod tests {
                                update_fulfill_dance!(node, prev_node, false);
                        }
 
-                       let events = node.node.get_and_clear_pending_events();
+                       let events = node.node.get_and_clear_pending_msg_events();
                        if !skip_last || idx != expected_route.len() - 1 {
                                assert_eq!(events.len(), 1);
                                match events[0] {
-                                       Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
+                                       MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
                                                assert!(update_add_htlcs.is_empty());
                                                assert_eq!(update_fulfill_htlcs.len(), 1);
                                                assert!(update_fail_htlcs.is_empty());
@@ -3367,11 +3332,11 @@ mod tests {
                                update_fail_dance!(node, prev_node, skip_last && idx == expected_route.len() - 1);
                        }
 
-                       let events = node.node.get_and_clear_pending_events();
+                       let events = node.node.get_and_clear_pending_msg_events();
                        if !skip_last || idx != expected_route.len() - 1 {
                                assert_eq!(events.len(), 1);
                                match events[0] {
-                                       Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
+                                       MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
                                                assert!(update_add_htlcs.is_empty());
                                                assert!(update_fulfill_htlcs.is_empty());
                                                assert_eq!(update_fail_htlcs.len(), 1);
@@ -3477,10 +3442,10 @@ mod tests {
                nodes[0].node.update_fee(channel_id, get_feerate!(nodes[0]) + 20).unwrap();
                check_added_monitors!(nodes[0], 1);
 
-               let events_0 = nodes[0].node.get_and_clear_pending_events();
+               let events_0 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_0.len(), 1);
                let (update_msg, commitment_signed) = match events_0[0] { // (1)
-                       Event::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, ref commitment_signed, .. }, .. } => {
+                       MessageSendEvent::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, ref commitment_signed, .. }, .. } => {
                                (update_fee.as_ref(), commitment_signed)
                        },
                        _ => panic!("Unexpected event"),
@@ -3494,7 +3459,7 @@ mod tests {
                check_added_monitors!(nodes[1], 1);
 
                let payment_event = {
-                       let mut events_1 = nodes[1].node.get_and_clear_pending_events();
+                       let mut events_1 = nodes[1].node.get_and_clear_pending_msg_events();
                        assert_eq!(events_1.len(), 1);
                        SendEvent::from_event(events_1.remove(0))
                };
@@ -3573,10 +3538,10 @@ mod tests {
                nodes[0].node.update_fee(channel_id, get_feerate!(nodes[0]) + 20).unwrap();
                check_added_monitors!(nodes[0], 1);
 
-               let events_0 = nodes[0].node.get_and_clear_pending_events();
+               let events_0 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_0.len(), 1);
                let update_msg = match events_0[0] { // (1)
-                       Event::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, .. }, .. } => {
+                       MessageSendEvent::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, .. }, .. } => {
                                update_fee.as_ref()
                        },
                        _ => panic!("Unexpected event"),
@@ -3590,7 +3555,7 @@ mod tests {
                check_added_monitors!(nodes[1], 1);
 
                let payment_event = {
-                       let mut events_1 = nodes[1].node.get_and_clear_pending_events();
+                       let mut events_1 = nodes[1].node.get_and_clear_pending_msg_events();
                        assert_eq!(events_1.len(), 1);
                        SendEvent::from_event(events_1.remove(0))
                };
@@ -3647,10 +3612,10 @@ mod tests {
                nodes[0].node.update_fee(channel_id, initial_feerate + 20).unwrap();
                check_added_monitors!(nodes[0], 1);
 
-               let events_0 = nodes[0].node.get_and_clear_pending_events();
+               let events_0 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_0.len(), 1);
                let (update_msg_1, commitment_signed_1) = match events_0[0] { // (1)
-                       Event::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, ref commitment_signed, .. }, .. } => {
+                       MessageSendEvent::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, ref commitment_signed, .. }, .. } => {
                                (update_fee.as_ref().unwrap(), commitment_signed)
                        },
                        _ => panic!("Unexpected event"),
@@ -3665,6 +3630,7 @@ mod tests {
                // transaction:
                nodes[0].node.update_fee(channel_id, initial_feerate + 40).unwrap();
                assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
+               assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
 
                // Create the (3) update_fee message that nodes[0] will generate before it does...
                let mut update_msg_2 = msgs::UpdateFee {
@@ -3729,10 +3695,10 @@ mod tests {
                let feerate = get_feerate!(nodes[0]);
                nodes[0].node.update_fee(channel_id, feerate+20).unwrap();
 
-               let events_0 = nodes[0].node.get_and_clear_pending_events();
+               let events_0 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_0.len(), 1);
                let (update_msg, commitment_signed) = match events_0[0] {
-                               Event::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => {
+                               MessageSendEvent::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => {
                                (update_fee.as_ref(), commitment_signed)
                        },
                        _ => panic!("Unexpected event"),
@@ -3777,10 +3743,10 @@ mod tests {
                let feerate = get_feerate!(nodes[0]);
                nodes[0].node.update_fee(channel_id, feerate+20).unwrap();
 
-               let events_0 = nodes[0].node.get_and_clear_pending_events();
+               let events_0 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_0.len(), 1);
                let (update_msg, commitment_signed) = match events_0[0] {
-                               Event::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => {
+                               MessageSendEvent::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => {
                                (update_fee.as_ref(), commitment_signed)
                        },
                        _ => panic!("Unexpected event"),
@@ -3802,8 +3768,8 @@ mod tests {
                        assert_eq!(added_monitors.len(), 0);
                        added_monitors.clear();
                }
-               let events = nodes[0].node.get_and_clear_pending_events();
-               assert_eq!(events.len(), 0);
+               assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
+               assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
                // node[1] has nothing to do
 
                let resp_option = nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &revoke_msg).unwrap();
@@ -3894,10 +3860,10 @@ mod tests {
                let feerate = get_feerate!(nodes[0]);
                nodes[0].node.update_fee(channel_id, feerate+20).unwrap();
 
-               let events_0 = nodes[0].node.get_and_clear_pending_events();
+               let events_0 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_0.len(), 1);
                let (update_msg, commitment_signed) = match events_0[0] {
-                               Event::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => {
+                               MessageSendEvent::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => {
                                (update_fee.as_ref(), commitment_signed)
                        },
                        _ => panic!("Unexpected event"),
@@ -3917,10 +3883,10 @@ mod tests {
 
                // Create and deliver (4)...
                nodes[0].node.update_fee(channel_id, feerate+30).unwrap();
-               let events_0 = nodes[0].node.get_and_clear_pending_events();
+               let events_0 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_0.len(), 1);
                let (update_msg, commitment_signed) = match events_0[0] {
-                               Event::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => {
+                               MessageSendEvent::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => {
                                (update_fee.as_ref(), commitment_signed)
                        },
                        _ => panic!("Unexpected event"),
@@ -4191,19 +4157,19 @@ mod tests {
        }
 
        fn get_announce_close_broadcast_events(nodes: &Vec<Node>, a: usize, b: usize) {
-               let events_1 = nodes[a].node.get_and_clear_pending_events();
+               let events_1 = nodes[a].node.get_and_clear_pending_msg_events();
                assert_eq!(events_1.len(), 1);
                let as_update = match events_1[0] {
-                       Event::BroadcastChannelUpdate { ref msg } => {
+                       MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
                                msg.clone()
                        },
                        _ => panic!("Unexpected event"),
                };
 
-               let events_2 = nodes[b].node.get_and_clear_pending_events();
+               let events_2 = nodes[b].node.get_and_clear_pending_msg_events();
                assert_eq!(events_2.len(), 1);
                let bs_update = match events_2[0] {
-                       Event::BroadcastChannelUpdate { ref msg } => {
+                       MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
                                msg.clone()
                        },
                        _ => panic!("Unexpected event"),
@@ -4262,7 +4228,7 @@ mod tests {
 
                macro_rules! expect_forward {
                        ($node: expr) => {{
-                               let mut events = $node.node.get_and_clear_pending_events();
+                               let mut events = $node.node.get_and_clear_pending_msg_events();
                                assert_eq!(events.len(), 1);
                                check_added_monitors!($node, 1);
                                let payment_event = SendEvent::from_event(events.remove(0));
@@ -4345,7 +4311,7 @@ mod tests {
                        nodes[0].node.send_payment(route_1, our_payment_hash_1).unwrap();
                        check_added_monitors!(nodes[0], 1);
 
-                       let mut events = nodes[0].node.get_and_clear_pending_events();
+                       let mut events = nodes[0].node.get_and_clear_pending_msg_events();
                        assert_eq!(events.len(), 1);
                        SendEvent::from_event(events.remove(0))
                };
@@ -4421,8 +4387,8 @@ mod tests {
                // this will also stuck in the holding cell
                nodes[0].node.send_payment(route_22, our_payment_hash_22).unwrap();
                check_added_monitors!(nodes[0], 0);
-               let events = nodes[0].node.get_and_clear_pending_events();
-               assert_eq!(events.len(), 0);
+               assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
+               assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
 
                // flush the pending htlc
                let (as_revoke_and_ack, as_commitment_signed) = nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &payment_event_1.commitment_msg).unwrap();
@@ -4541,10 +4507,10 @@ mod tests {
                                        assert!($node.node.claim_funds($preimage));
                                        check_added_monitors!($node, 1);
 
-                                       let events = $node.node.get_and_clear_pending_events();
+                                       let events = $node.node.get_and_clear_pending_msg_events();
                                        assert_eq!(events.len(), 1);
                                        match events[0] {
-                                               Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fail_htlcs, .. } } => {
+                                               MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fail_htlcs, .. } } => {
                                                        assert!(update_add_htlcs.is_empty());
                                                        assert!(update_fail_htlcs.is_empty());
                                                        assert_eq!(*node_id, $prev_node.node.get_our_node_id());
@@ -4835,10 +4801,10 @@ mod tests {
                route_payment(&nodes[0], &[&nodes[1]], 10000000);
                nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id);
                {
-                       let events = nodes[0].node.get_and_clear_pending_events();
+                       let events = nodes[0].node.get_and_clear_pending_msg_events();
                        assert_eq!(events.len(), 1);
                        match events[0] {
-                               Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
+                               MessageSendEvent::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
                                        assert_eq!(flags & 0b10, 0b10);
                                },
                                _ => panic!("Unexpected event"),
@@ -4852,10 +4818,10 @@ mod tests {
                nodes[1].chain_monitor.block_connected_checked(&header, 1, &[&node_txn[0], &node_txn[1]], &[1; 2]);
 
                {
-                       let events = nodes[1].node.get_and_clear_pending_events();
+                       let events = nodes[1].node.get_and_clear_pending_msg_events();
                        assert_eq!(events.len(), 1);
                        match events[0] {
-                               Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
+                               MessageSendEvent::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
                                        assert_eq!(flags & 0b10, 0b10);
                                },
                                _ => panic!("Unexpected event"),
@@ -4882,7 +4848,7 @@ mod tests {
                        nodes[0].node.send_payment(route, our_payment_hash).unwrap();
                        check_added_monitors!(nodes[0], 1);
 
-                       let mut events = nodes[0].node.get_and_clear_pending_events();
+                       let mut events = nodes[0].node.get_and_clear_pending_msg_events();
                        assert_eq!(events.len(), 1);
                        SendEvent::from_event(events.remove(0))
                };
@@ -4900,7 +4866,7 @@ mod tests {
                nodes[1].node.channel_state.lock().unwrap().next_forward = Instant::now();
                nodes[1].node.process_pending_htlc_forwards();
 
-               let mut events_2 = nodes[1].node.get_and_clear_pending_events();
+               let mut events_2 = nodes[1].node.get_and_clear_pending_msg_events();
                assert_eq!(events_2.len(), 1);
                payment_event = SendEvent::from_event(events_2.remove(0));
                assert_eq!(payment_event.msgs.len(), 1);
@@ -4915,10 +4881,10 @@ mod tests {
                // transaction and ensure nodes[1] doesn't fail-backwards (this was originally a bug!).
 
                nodes[2].node.force_close_channel(&payment_event.commitment_msg.channel_id);
-               let events_3 = nodes[2].node.get_and_clear_pending_events();
+               let events_3 = nodes[2].node.get_and_clear_pending_msg_events();
                assert_eq!(events_3.len(), 1);
                match events_3[0] {
-                       Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
+                       MessageSendEvent::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
                                assert_eq!(flags & 0b10, 0b10);
                        },
                        _ => panic!("Unexpected event"),
@@ -4936,11 +4902,11 @@ mod tests {
                let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
                nodes[1].chain_monitor.block_connected_checked(&header, 1, &[&tx], &[1]);
 
-               let events_4 = nodes[1].node.get_and_clear_pending_events();
+               let events_4 = nodes[1].node.get_and_clear_pending_msg_events();
                // Note no UpdateHTLCs event here from nodes[1] to nodes[0]!
                assert_eq!(events_4.len(), 1);
                match events_4[0] {
-                       Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
+                       MessageSendEvent::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
                                assert_eq!(flags & 0b10, 0b10);
                        },
                        _ => panic!("Unexpected event"),
@@ -4985,10 +4951,10 @@ mod tests {
                        nodes[0].node.block_disconnected(&headers.pop().unwrap());
                }
                {
-                       let events = nodes[0].node.get_and_clear_pending_events();
+                       let events = nodes[0].node.get_and_clear_pending_msg_events();
                        assert_eq!(events.len(), 1);
                        match events[0] {
-                               Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
+                               MessageSendEvent::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
                                        assert_eq!(flags & 0b10, 0b10);
                                },
                                _ => panic!("Unexpected event"),
@@ -5196,7 +5162,7 @@ mod tests {
                        nodes[0].node.send_payment(route.clone(), payment_hash_1).unwrap();
                        check_added_monitors!(nodes[0], 1);
 
-                       let mut events = nodes[0].node.get_and_clear_pending_events();
+                       let mut events = nodes[0].node.get_and_clear_pending_msg_events();
                        assert_eq!(events.len(), 1);
                        SendEvent::from_event(events.remove(0))
                };
@@ -5273,10 +5239,10 @@ mod tests {
                nodes[1].node.claim_funds(payment_preimage_1);
                check_added_monitors!(nodes[1], 1);
 
-               let events_3 = nodes[1].node.get_and_clear_pending_events();
+               let events_3 = nodes[1].node.get_and_clear_pending_msg_events();
                assert_eq!(events_3.len(), 1);
                let (update_fulfill_htlc, commitment_signed) = match events_3[0] {
-                       Event::UpdateHTLCs { ref node_id, ref updates } => {
+                       MessageSendEvent::UpdateHTLCs { ref node_id, ref updates } => {
                                assert_eq!(*node_id, nodes[0].node.get_our_node_id());
                                assert!(updates.update_add_htlcs.is_empty());
                                assert!(updates.update_fail_htlcs.is_empty());
@@ -5384,10 +5350,10 @@ mod tests {
                nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
 
                confirm_transaction(&nodes[0].chain_monitor, &tx, tx.version);
-               let events_1 = nodes[0].node.get_and_clear_pending_events();
+               let events_1 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_1.len(), 1);
                match events_1[0] {
-                       Event::SendFundingLocked { ref node_id, msg: _, ref announcement_sigs } => {
+                       MessageSendEvent::SendFundingLocked { ref node_id, msg: _, ref announcement_sigs } => {
                                assert_eq!(*node_id, nodes[1].node.get_our_node_id());
                                assert!(announcement_sigs.is_none());
                        },
@@ -5395,10 +5361,10 @@ mod tests {
                }
 
                confirm_transaction(&nodes[1].chain_monitor, &tx, tx.version);
-               let events_2 = nodes[1].node.get_and_clear_pending_events();
+               let events_2 = nodes[1].node.get_and_clear_pending_msg_events();
                assert_eq!(events_2.len(), 1);
                match events_2[0] {
-                       Event::SendFundingLocked { ref node_id, msg: _, ref announcement_sigs } => {
+                       MessageSendEvent::SendFundingLocked { ref node_id, msg: _, ref announcement_sigs } => {
                                assert_eq!(*node_id, nodes[0].node.get_our_node_id());
                                assert!(announcement_sigs.is_none());
                        },
@@ -5434,20 +5400,20 @@ mod tests {
                nodes[0].node.send_payment(route.clone(), payment_hash_2).unwrap();
                check_added_monitors!(nodes[0], 1);
 
-               let events_1 = nodes[0].node.get_and_clear_pending_events();
+               let events_1 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_1.len(), 1);
                match events_1[0] {
-                       Event::UpdateHTLCs { .. } => {},
+                       MessageSendEvent::UpdateHTLCs { .. } => {},
                        _ => panic!("Unexpected event"),
                }
 
                assert!(nodes[1].node.claim_funds(payment_preimage_1));
                check_added_monitors!(nodes[1], 1);
 
-               let events_2 = nodes[1].node.get_and_clear_pending_events();
+               let events_2 = nodes[1].node.get_and_clear_pending_msg_events();
                assert_eq!(events_2.len(), 1);
                match events_2[0] {
-                       Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
+                       MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
                                assert_eq!(*node_id, nodes[0].node.get_our_node_id());
                                assert!(update_add_htlcs.is_empty());
                                assert_eq!(update_fulfill_htlcs.len(), 1);
@@ -5566,10 +5532,10 @@ mod tests {
                if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route, payment_hash_1) {} else { panic!(); }
                check_added_monitors!(nodes[0], 1);
 
-               let events_1 = nodes[0].node.get_and_clear_pending_events();
+               let events_1 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_1.len(), 1);
                match events_1[0] {
-                       Event::BroadcastChannelUpdate { .. } => {},
+                       MessageSendEvent::BroadcastChannelUpdate { .. } => {},
                        _ => panic!("Unexpected event"),
                };
 
@@ -5592,8 +5558,8 @@ mod tests {
                if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route.clone(), payment_hash_1) {} else { panic!(); }
                check_added_monitors!(nodes[0], 1);
 
-               let events_1 = nodes[0].node.get_and_clear_pending_events();
-               assert!(events_1.is_empty());
+               assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
+               assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
                assert_eq!(nodes[0].node.list_channels().len(), 1);
 
                if disconnect {
@@ -5606,7 +5572,7 @@ mod tests {
                nodes[0].node.test_restore_channel_monitor();
                check_added_monitors!(nodes[0], 1);
 
-               let mut events_2 = nodes[0].node.get_and_clear_pending_events();
+               let mut events_2 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_2.len(), 1);
                let payment_event = SendEvent::from_event(events_2.pop().unwrap());
                assert_eq!(payment_event.node_id, nodes[1].node.get_our_node_id());
@@ -5633,8 +5599,8 @@ mod tests {
                if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route, payment_hash_2) {} else { panic!(); }
                check_added_monitors!(nodes[0], 1);
 
-               let events_4 = nodes[0].node.get_and_clear_pending_events();
-               assert!(events_4.is_empty());
+               assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
+               assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
                assert_eq!(nodes[0].node.list_channels().len(), 1);
 
                if disconnect {
@@ -5648,10 +5614,10 @@ mod tests {
                nodes[0].node.test_restore_channel_monitor();
                check_added_monitors!(nodes[0], 1);
 
-               let events_5 = nodes[0].node.get_and_clear_pending_events();
+               let events_5 = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events_5.len(), 1);
                match events_5[0] {
-                       Event::BroadcastChannelUpdate { .. } => {},
+                       MessageSendEvent::BroadcastChannelUpdate { .. } => {},
                        _ => panic!("Unexpected event"),
                }
 
@@ -5700,18 +5666,18 @@ mod tests {
                if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route.clone(), payment_hash_2) {} else { panic!(); }
                check_added_monitors!(nodes[0], 1);
 
-               let events_1 = nodes[0].node.get_and_clear_pending_events();
-               assert!(events_1.is_empty());
+               assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
+               assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
                assert_eq!(nodes[0].node.list_channels().len(), 1);
 
                // Claim the previous payment, which will result in a update_fulfill_htlc/CS from nodes[1]
                // but nodes[0] won't respond since it is frozen.
                assert!(nodes[1].node.claim_funds(payment_preimage_1));
                check_added_monitors!(nodes[1], 1);
-               let events_2 = nodes[1].node.get_and_clear_pending_events();
+               let events_2 = nodes[1].node.get_and_clear_pending_msg_events();
                assert_eq!(events_2.len(), 1);
                let (bs_initial_fulfill, bs_initial_commitment_signed) = match events_2[0] {
-                       Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
+                       MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
                                assert_eq!(*node_id, nodes[0].node.get_our_node_id());
                                assert!(update_add_htlcs.is_empty());
                                assert_eq!(update_fulfill_htlcs.len(), 1);
@@ -5769,8 +5735,8 @@ mod tests {
                } } }
 
                let (payment_event, initial_revoke_and_ack) = if disconnect_count & !disconnect_flags > 0 {
-                       let events_4 = nodes[0].node.get_and_clear_pending_events();
-                       assert!(events_4.is_empty());
+                       assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
+                       assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
 
                        let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
                        assert_eq!(reestablish_1.len(), 1);
@@ -5833,10 +5799,10 @@ mod tests {
 
                        (SendEvent::from_commitment_update(nodes[1].node.get_our_node_id(), as_resp.2.unwrap()), as_resp.1.unwrap())
                } else {
-                       let mut events_4 = nodes[0].node.get_and_clear_pending_events();
+                       let mut events_4 = nodes[0].node.get_and_clear_pending_msg_events();
                        assert_eq!(events_4.len(), 2);
                        (SendEvent::from_event(events_4.remove(0)), match events_4[0] {
-                               Event::SendRevokeAndACK { ref node_id, ref msg } => {
+                               MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
                                        assert_eq!(*node_id, nodes[1].node.get_our_node_id());
                                        msg.clone()
                                },
index b5db51cf2bb4a200c58e6e03c0bf52a65626b54c..29ff1a8760539ea6cffbe235a36118bb837b9360 100644 (file)
@@ -521,7 +521,7 @@ pub enum RAACommitmentOrder {
 ///
 /// Messages MAY be called in parallel when they originate from different their_node_ids, however
 /// they MUST NOT be called in parallel when the two calls have the same their_node_id.
-pub trait ChannelMessageHandler : events::EventsProvider + Send + Sync {
+pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Sync {
        //Channel init:
        /// Handle an incoming open_channel message from the given peer.
        fn handle_open_channel(&self, their_node_id: &PublicKey, msg: &OpenChannel) -> Result<AcceptChannel, HandleError>;
index 82784efe5fc037c97cb28da923b7f00e235e993e..631e48aa1eb5bcd8d855aff83c6c5fd15b4733b4 100644 (file)
@@ -12,13 +12,13 @@ use ln::msgs;
 use util::ser::{Writeable, Writer, Readable};
 use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
 use util::byte_utils;
-use util::events::{EventsProvider,Event};
+use util::events::{MessageSendEvent};
 use util::logger::Logger;
 
 use std::collections::{HashMap,hash_map,LinkedList};
 use std::sync::{Arc, Mutex};
 use std::sync::atomic::{AtomicUsize, Ordering};
-use std::{cmp,error,mem,hash,fmt};
+use std::{cmp,error,hash,fmt};
 
 /// Provides references to trait impls which handle different types of messages.
 pub struct MessageHandler {
@@ -127,7 +127,6 @@ impl<Descriptor: SocketDescriptor> PeerHolder<Descriptor> {
 pub struct PeerManager<Descriptor: SocketDescriptor> {
        message_handler: MessageHandler,
        peers: Mutex<PeerHolder<Descriptor>>,
-       pending_events: Mutex<Vec<Event>>,
        our_node_secret: SecretKey,
        initial_syncs_sent: AtomicUsize,
        logger: Arc<Logger>,
@@ -164,7 +163,6 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                PeerManager {
                        message_handler: message_handler,
                        peers: Mutex::new(PeerHolder { peers: HashMap::new(), node_id_to_descriptor: HashMap::new() }),
-                       pending_events: Mutex::new(Vec::new()),
                        our_node_secret: our_node_secret,
                        initial_syncs_sent: AtomicUsize::new(0),
                        logger,
@@ -757,13 +755,12 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
        /// Checks for any events generated by our handlers and processes them. May be needed after eg
        /// calls to ChannelManager::process_pending_htlc_forward.
        pub fn process_events(&self) {
-               let mut upstream_events = Vec::new();
                {
                        // TODO: There are some DoS attacks here where you can flood someone's outbound send
                        // buffer by doing things like announcing channels on another node. We should be willing to
                        // drop optional-ish messages when send buffers get full!
 
-                       let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_events();
+                       let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
                        let mut peers = self.peers.lock().unwrap();
                        for event in events_generated.drain(..) {
                                macro_rules! get_peer_for_forwarding {
@@ -790,15 +787,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                        }
                                }
                                match event {
-                                       Event::FundingGenerationReady {..} => { /* Hand upstream */ },
-                                       Event::FundingBroadcastSafe {..} => { /* Hand upstream */ },
-                                       Event::PaymentReceived {..} => { /* Hand upstream */ },
-                                       Event::PaymentSent {..} => { /* Hand upstream */ },
-                                       Event::PaymentFailed {..} => { /* Hand upstream */ },
-                                       Event::PendingHTLCsForwardable {..} => { /* Hand upstream */ },
-                                       Event::SpendableOutputs { .. } => { /* Hand upstream */ },
-
-                                       Event::SendOpenChannel { ref node_id, ref msg } => {
+                                       MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
                                                log_trace!(self, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.temporary_channel_id));
@@ -807,9 +796,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                        });
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 32)));
                                                Self::do_attempt_write_data(&mut descriptor, peer);
-                                               continue;
                                        },
-                                       Event::SendFundingCreated { ref node_id, ref msg } => {
+                                       MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
                                                log_trace!(self, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.temporary_channel_id),
@@ -820,9 +808,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                        });
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 34)));
                                                Self::do_attempt_write_data(&mut descriptor, peer);
-                                               continue;
                                        },
-                                       Event::SendFundingLocked { ref node_id, ref msg, ref announcement_sigs } => {
+                                       MessageSendEvent::SendFundingLocked { ref node_id, ref msg, ref announcement_sigs } => {
                                                log_trace!(self, "Handling SendFundingLocked event in peer_handler for node {}{} for channel {}",
                                                                log_pubkey!(node_id),
                                                                if announcement_sigs.is_some() { " with announcement sigs" } else { "" },
@@ -836,9 +823,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                        &None => {},
                                                }
                                                Self::do_attempt_write_data(&mut descriptor, peer);
-                                               continue;
                                        },
-                                       Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
+                                       MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
                                                log_trace!(self, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
                                                                log_pubkey!(node_id),
                                                                update_add_htlcs.len(),
@@ -865,9 +851,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                }
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(commitment_signed, 132)));
                                                Self::do_attempt_write_data(&mut descriptor, peer);
-                                               continue;
                                        },
-                                       Event::SendRevokeAndACK { ref node_id, ref msg } => {
+                                       MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
                                                log_trace!(self, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
@@ -876,9 +861,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                        });
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133)));
                                                Self::do_attempt_write_data(&mut descriptor, peer);
-                                               continue;
                                        },
-                                       Event::SendShutdown { ref node_id, ref msg } => {
+                                       MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
                                                log_trace!(self, "Handling Shutdown event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
@@ -887,9 +871,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                        });
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38)));
                                                Self::do_attempt_write_data(&mut descriptor, peer);
-                                               continue;
                                        },
-                                       Event::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
+                                       MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
                                                log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
                                                if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() {
                                                        let encoded_msg = encode_msg!(msg, 256);
@@ -912,9 +895,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                Self::do_attempt_write_data(&mut (*descriptor).clone(), peer);
                                                        }
                                                }
-                                               continue;
                                        },
-                                       Event::BroadcastChannelUpdate { ref msg } => {
+                                       MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
                                                log_trace!(self, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id);
                                                if self.message_handler.route_handler.handle_channel_update(msg).is_ok() {
                                                        let encoded_msg = encode_msg!(msg, 258);
@@ -927,13 +909,11 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                Self::do_attempt_write_data(&mut (*descriptor).clone(), peer);
                                                        }
                                                }
-                                               continue;
                                        },
-                                       Event::PaymentFailureNetworkUpdate { ref update } => {
+                                       MessageSendEvent::PaymentFailureNetworkUpdate { ref update } => {
                                                self.message_handler.route_handler.handle_htlc_fail_channel_update(update);
-                                               continue;
                                        },
-                                       Event::HandleError { ref node_id, ref action } => {
+                                       MessageSendEvent::HandleError { ref node_id, ref action } => {
                                                if let Some(ref action) = *action {
                                                        match *action {
                                                                msgs::ErrorAction::DisconnectPeer { ref msg } => {
@@ -955,9 +935,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                                self.message_handler.chan_handler.peer_disconnected(&node_id, false);
                                                                        }
                                                                },
-                                                               msgs::ErrorAction::IgnoreError => {
-                                                                       continue;
-                                                               },
+                                                               msgs::ErrorAction::IgnoreError => {},
                                                                msgs::ErrorAction::SendErrorMessage { ref msg } => {
                                                                        log_trace!(self, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
                                                                                        log_pubkey!(node_id),
@@ -972,18 +950,10 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                } else {
                                                        log_error!(self, "Got no-action HandleError Event in peer_handler for node {}, no such events should ever be generated!", log_pubkey!(node_id));
                                                }
-                                               continue;
                                        }
                                }
-
-                               upstream_events.push(event);
                        }
                }
-
-               let mut pending_events = self.pending_events.lock().unwrap();
-               for event in upstream_events.drain(..) {
-                       pending_events.push(event);
-               }
        }
 
        /// Indicates that the given socket descriptor's connection is now closed.
@@ -1014,15 +984,6 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
        }
 }
 
-impl<Descriptor: SocketDescriptor> EventsProvider for PeerManager<Descriptor> {
-       fn get_and_clear_pending_events(&self) -> Vec<Event> {
-               let mut pending_events = self.pending_events.lock().unwrap();
-               let mut ret = Vec::new();
-               mem::swap(&mut ret, &mut *pending_events);
-               ret
-       }
-}
-
 #[cfg(test)]
 mod tests {
        use ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor};
@@ -1094,7 +1055,7 @@ mod tests {
                let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret);
 
                let chan_handler = test_utils::TestChannelMessageHandler::new();
-               chan_handler.pending_events.lock().unwrap().push(events::Event::HandleError {
+               chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::HandleError {
                        node_id: their_id,
                        action: Some(msgs::ErrorAction::DisconnectPeer { msg: None }),
                });
index 3a077a4b5ba9caad01086b23bad35a404217a57f..261ee57ccc08fca9ca413416d6317ad5d150f9fc 100644 (file)
@@ -24,7 +24,6 @@ use std::time::Instant;
 
 /// An Event which you should probably take some action in response to.
 pub enum Event {
-       // Events a user will probably have to handle
        /// Used to indicate that the client should generate a funding transaction with the given
        /// parameters and then call ChannelManager::funding_transaction_generated.
        /// Generated in ChannelManager message handling.
@@ -97,13 +96,14 @@ pub enum Event {
                /// The outputs which you should store as spendable by you.
                outputs: Vec<SpendableOutputDescriptor>,
        },
+}
 
-       // Events indicating the network loop should send a message to a peer:
-       // TODO: Move these into a separate struct and make a top-level enum
+/// An event generated by ChannelManager which indicates a message should be sent to a peer (or
+/// broadcast to most peers).
+/// These events are handled by PeerManager::process_events if you are using a PeerManager.
+pub enum MessageSendEvent {
        /// Used to indicate that we've initialted a channel open and should send the open_channel
        /// message provided to the given peer.
-       ///
-       /// This event is handled by PeerManager::process_events if you are using a PeerManager.
        SendOpenChannel {
                /// The node_id of the node which should receive this message
                node_id: PublicKey,
@@ -111,8 +111,6 @@ pub enum Event {
                msg: msgs::OpenChannel,
        },
        /// Used to indicate that a funding_created message should be sent to the peer with the given node_id.
-       ///
-       /// This event is handled by PeerManager::process_events if you are using a PeerManager.
        SendFundingCreated {
                /// The node_id of the node which should receive this message
                node_id: PublicKey,
@@ -120,8 +118,6 @@ pub enum Event {
                msg: msgs::FundingCreated,
        },
        /// Used to indicate that a funding_locked message should be sent to the peer with the given node_id.
-       ///
-       /// This event is handled by PeerManager::process_events if you are using a PeerManager.
        SendFundingLocked {
                /// The node_id of the node which should receive these message(s)
                node_id: PublicKey,
@@ -132,8 +128,6 @@ pub enum Event {
        },
        /// Used to indicate that a series of HTLC update messages, as well as a commitment_signed
        /// message should be sent to the peer with the given node_id.
-       ///
-       /// This event is handled by PeerManager::process_events if you are using a PeerManager.
        UpdateHTLCs {
                /// The node_id of the node which should receive these message(s)
                node_id: PublicKey,
@@ -141,8 +135,6 @@ pub enum Event {
                updates: msgs::CommitmentUpdate,
        },
        /// Used to indicate that a revoke_and_ack message should be sent to the peer with the given node_id.
-       ///
-       /// This event is handled by PeerManager::process_events if you are using a PeerManager.
        SendRevokeAndACK {
                /// The node_id of the node which should receive this message
                node_id: PublicKey,
@@ -150,8 +142,6 @@ pub enum Event {
                msg: msgs::RevokeAndACK,
        },
        /// Used to indicate that a shutdown message should be sent to the peer with the given node_id.
-       ///
-       /// This event is handled by PeerManager::process_events if you are using a PeerManager.
        SendShutdown {
                /// The node_id of the node which should receive this message
                node_id: PublicKey,
@@ -160,8 +150,6 @@ pub enum Event {
        },
        /// Used to indicate that a channel_announcement and channel_update should be broadcast to all
        /// peers (except the peer with node_id either msg.contents.node_id_1 or msg.contents.node_id_2).
-       ///
-       /// This event is handled by PeerManager::process_events if you are using a PeerManager.
        BroadcastChannelAnnouncement {
                /// The channel_announcement which should be sent.
                msg: msgs::ChannelAnnouncement,
@@ -169,17 +157,11 @@ pub enum Event {
                update_msg: msgs::ChannelUpdate,
        },
        /// Used to indicate that a channel_update should be broadcast to all peers.
-       ///
-       /// This event is handled by PeerManager::process_events if you are using a PeerManager.
        BroadcastChannelUpdate {
                /// The channel_update which should be sent.
                msg: msgs::ChannelUpdate,
        },
-
-       //Error handling
        /// Broadcast an error downstream to be handled
-       ///
-       /// This event is handled by PeerManager::process_events if you are using a PeerManager.
        HandleError {
                /// The node_id of the node which should receive this message
                node_id: PublicKey,
@@ -188,14 +170,19 @@ pub enum Event {
        },
        /// When a payment fails we may receive updates back from the hop where it failed. In such
        /// cases this event is generated so that we can inform the router of this information.
-       ///
-       /// This event is handled by PeerManager::process_events if you are using a PeerManager.
        PaymentFailureNetworkUpdate {
                /// The channel/node update which should be sent to router
                update: msgs::HTLCFailChannelUpdate,
        }
 }
 
+/// A trait indicating an object may generate message send events
+pub trait MessageSendEventsProvider {
+       /// Gets the list of pending events which were generated by previous actions, clearing the list
+       /// in the process.
+       fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent>;
+}
+
 /// A trait indicating an object may generate events
 pub trait EventsProvider {
        /// Gets the list of pending events which were generated by previous actions, clearing the list
index 2577bc9f9badae0f6bd2acbb89f2011424cf1e4e..31a19776a2c2f6a00c83a90ea207054dcb597c33 100644 (file)
@@ -74,7 +74,7 @@ impl chaininterface::BroadcasterInterface for TestBroadcaster {
 }
 
 pub struct TestChannelMessageHandler {
-       pub pending_events: Mutex<Vec<events::Event>>,
+       pub pending_events: Mutex<Vec<events::MessageSendEvent>>,
 }
 
 impl TestChannelMessageHandler {
@@ -141,8 +141,8 @@ impl msgs::ChannelMessageHandler for TestChannelMessageHandler {
        fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {}
 }
 
-impl events::EventsProvider for TestChannelMessageHandler {
-       fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
+impl events::MessageSendEventsProvider for TestChannelMessageHandler {
+       fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
                let mut pending_events = self.pending_events.lock().unwrap();
                let mut ret = Vec::new();
                mem::swap(&mut ret, &mut *pending_events);