Merge pull request #520 from TheBlueMatt/2020-02-events-in-monitors
authorMatt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Fri, 28 Feb 2020 20:10:38 +0000 (20:10 +0000)
committerGitHub <noreply@github.com>
Fri, 28 Feb 2020 20:10:38 +0000 (20:10 +0000)
Move events into ChannelMonitor from ManyChannelMonitor

lightning/src/chain/keysinterface.rs
lightning/src/ln/channelmonitor.rs
lightning/src/util/events.rs
lightning/src/util/ser.rs

index e2e4403b528997e7a4ce56fb2a4591af85631efa..544df015a8f48b404c390a63a909ec7aca3746ed 100644 (file)
@@ -88,6 +88,57 @@ pub enum SpendableOutputDescriptor {
        }
 }
 
+impl Writeable for SpendableOutputDescriptor {
+       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
+               match self {
+                       &SpendableOutputDescriptor::StaticOutput { ref outpoint, ref output } => {
+                               0u8.write(writer)?;
+                               outpoint.write(writer)?;
+                               output.write(writer)?;
+                       },
+                       &SpendableOutputDescriptor::DynamicOutputP2WSH { ref outpoint, ref key, ref witness_script, ref to_self_delay, ref output } => {
+                               1u8.write(writer)?;
+                               outpoint.write(writer)?;
+                               key.write(writer)?;
+                               witness_script.write(writer)?;
+                               to_self_delay.write(writer)?;
+                               output.write(writer)?;
+                       },
+                       &SpendableOutputDescriptor::DynamicOutputP2WPKH { ref outpoint, ref key, ref output } => {
+                               2u8.write(writer)?;
+                               outpoint.write(writer)?;
+                               key.write(writer)?;
+                               output.write(writer)?;
+                       },
+               }
+               Ok(())
+       }
+}
+
+impl<R: ::std::io::Read> Readable<R> for SpendableOutputDescriptor {
+       fn read(reader: &mut R) -> Result<Self, DecodeError> {
+               match Readable::read(reader)? {
+                       0u8 => Ok(SpendableOutputDescriptor::StaticOutput {
+                               outpoint: Readable::read(reader)?,
+                               output: Readable::read(reader)?,
+                       }),
+                       1u8 => Ok(SpendableOutputDescriptor::DynamicOutputP2WSH {
+                               outpoint: Readable::read(reader)?,
+                               key: Readable::read(reader)?,
+                               witness_script: Readable::read(reader)?,
+                               to_self_delay: Readable::read(reader)?,
+                               output: Readable::read(reader)?,
+                       }),
+                       2u8 => Ok(SpendableOutputDescriptor::DynamicOutputP2WPKH {
+                               outpoint: Readable::read(reader)?,
+                               key: Readable::read(reader)?,
+                               output: Readable::read(reader)?,
+                       }),
+                       _ => Err(DecodeError::InvalidValue),
+               }
+       }
+}
+
 /// A trait to describe an object which can get user secrets and key material.
 pub trait KeysInterface: Send + Sync {
        /// A type which implements ChannelKeys which will be returned by get_channel_keys.
index 7e83c3f12219d1dcb930eeec4a1f5e0917a8b109..1bc8c76b49e82765f73e550c41e17c3243a027fc 100644 (file)
@@ -37,7 +37,7 @@ use chain::chaininterface::{ChainListener, ChainWatchInterface, BroadcasterInter
 use chain::transaction::OutPoint;
 use chain::keysinterface::{SpendableOutputDescriptor, ChannelKeys};
 use util::logger::Logger;
-use util::ser::{ReadableArgs, Readable, Writer, Writeable, U48};
+use util::ser::{ReadableArgs, Readable, MaybeReadable, Writer, Writeable, U48};
 use util::{byte_utils, events};
 
 use std::collections::{HashMap, hash_map, HashSet};
@@ -222,7 +222,6 @@ pub struct SimpleManyChannelMonitor<Key, ChanSigner: ChannelKeys, T: Deref, F: D
        monitors: Mutex<HashMap<Key, ChannelMonitor<ChanSigner>>>,
        chain_monitor: Arc<ChainWatchInterface>,
        broadcaster: T,
-       pending_events: Mutex<Vec<events::Event>>,
        logger: Arc<Logger>,
        fee_estimator: F
 }
@@ -234,16 +233,10 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref +
 {
        fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) {
                let block_hash = header.bitcoin_hash();
-               let mut new_events: Vec<events::Event> = Vec::with_capacity(0);
                {
                        let mut monitors = self.monitors.lock().unwrap();
                        for monitor in monitors.values_mut() {
-                               let (txn_outputs, spendable_outputs) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
-                               if spendable_outputs.len() > 0 {
-                                       new_events.push(events::Event::SpendableOutputs {
-                                               outputs: spendable_outputs,
-                                       });
-                               }
+                               let txn_outputs = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
 
                                for (ref txid, ref outputs) in txn_outputs {
                                        for (idx, output) in outputs.iter().enumerate() {
@@ -252,8 +245,6 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref +
                                }
                        }
                }
-               let mut pending_events = self.pending_events.lock().unwrap();
-               pending_events.append(&mut new_events);
        }
 
        fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
@@ -276,7 +267,6 @@ impl<Key : Send + cmp::Eq + hash::Hash + 'static, ChanSigner: ChannelKeys, T: De
                        monitors: Mutex::new(HashMap::new()),
                        chain_monitor,
                        broadcaster,
-                       pending_events: Mutex::new(Vec::new()),
                        logger,
                        fee_estimator: feeest,
                };
@@ -362,10 +352,11 @@ impl<Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref, F: De
              F::Target: FeeEstimator
 {
        fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
-               let mut pending_events = self.pending_events.lock().unwrap();
-               let mut ret = Vec::new();
-               mem::swap(&mut ret, &mut *pending_events);
-               ret
+               let mut pending_events = Vec::new();
+               for chan in self.monitors.lock().unwrap().values_mut() {
+                       pending_events.append(&mut chan.get_and_clear_pending_events());
+               }
+               pending_events
        }
 }
 
@@ -792,6 +783,11 @@ impl<R: ::std::io::Read> Readable<R> for ChannelMonitorUpdateStep {
 ///
 /// You MUST ensure that no ChannelMonitors for a given channel anywhere contain out-of-date
 /// information and are actively monitoring the chain.
+///
+/// Pending Events or updated HTLCs which have not yet been read out by
+/// get_and_clear_pending_htlcs_updated or get_and_clear_pending_events are serialized to disk and
+/// reloaded at deserialize-time. Thus, you must ensure that, when handling events, all events
+/// gotten are fully handled before re-serializing the new state.
 pub struct ChannelMonitor<ChanSigner: ChannelKeys> {
        latest_update_id: u64,
        commitment_transaction_number_obscure_factor: u64,
@@ -835,6 +831,7 @@ pub struct ChannelMonitor<ChanSigner: ChannelKeys> {
        payment_preimages: HashMap<PaymentHash, PaymentPreimage>,
 
        pending_htlcs_updated: Vec<HTLCUpdate>,
+       pending_events: Vec<events::Event>,
 
        destination_script: Script,
        // Thanks to data loss protection, we may be able to claim our non-htlc funds
@@ -948,6 +945,7 @@ impl<ChanSigner: ChannelKeys> PartialEq for ChannelMonitor<ChanSigner> {
                        self.current_local_signed_commitment_tx != other.current_local_signed_commitment_tx ||
                        self.payment_preimages != other.payment_preimages ||
                        self.pending_htlcs_updated != other.pending_htlcs_updated ||
+                       self.pending_events.len() != other.pending_events.len() || // We trust events to round-trip properly
                        self.destination_script != other.destination_script ||
                        self.to_remote_rescue != other.to_remote_rescue ||
                        self.pending_claim_requests != other.pending_claim_requests ||
@@ -1135,6 +1133,11 @@ impl<ChanSigner: ChannelKeys + Writeable> ChannelMonitor<ChanSigner> {
                        data.write(writer)?;
                }
 
+               writer.write_all(&byte_utils::be64_to_array(self.pending_events.len() as u64))?;
+               for event in self.pending_events.iter() {
+                       event.write(writer)?;
+               }
+
                self.last_block_hash.write(writer)?;
                self.destination_script.write(writer)?;
                if let Some((ref to_remote_script, ref local_key)) = self.to_remote_rescue {
@@ -1267,6 +1270,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
 
                        payment_preimages: HashMap::new(),
                        pending_htlcs_updated: Vec::new(),
+                       pending_events: Vec::new(),
 
                        destination_script: destination_script.clone(),
                        to_remote_rescue: None,
@@ -1560,6 +1564,18 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                ret
        }
 
+       /// Gets the list of pending events which were generated by previous actions, clearing the list
+       /// in the process.
+       ///
+       /// This is called by ManyChannelMonitor::get_and_clear_pending_events() and is equivalent to
+       /// EventsProvider::get_and_clear_pending_events() except that it requires &mut self as we do
+       /// no internal locking in ChannelMonitors.
+       pub fn get_and_clear_pending_events(&mut self) -> Vec<events::Event> {
+               let mut ret = Vec::new();
+               mem::swap(&mut ret, &mut self.pending_events);
+               ret
+       }
+
        /// Can only fail if idx is < get_min_seen_secret
        pub(super) fn get_secret(&self, idx: u64) -> Option<[u8; 32]> {
                self.commitment_secrets.get_secret(idx)
@@ -2534,7 +2550,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
        /// Eventually this should be pub and, roughly, implement ChainListener, however this requires
        /// &mut self, as well as returns new spendable outputs and outpoints to watch for spending of
        /// on-chain.
-       fn block_connected<B: Deref, F: Deref>(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: F)-> (Vec<(Sha256dHash, Vec<TxOut>)>, Vec<SpendableOutputDescriptor>)
+       fn block_connected<B: Deref, F: Deref>(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: F)-> Vec<(Sha256dHash, Vec<TxOut>)>
                where B::Target: BroadcasterInterface,
                      F::Target: FeeEstimator
        {
@@ -2767,7 +2783,14 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                for &(ref txid, ref output_scripts) in watch_outputs.iter() {
                        self.outputs_to_watch.insert(txid.clone(), output_scripts.iter().map(|o| o.script_pubkey.clone()).collect());
                }
-               (watch_outputs, spendable_outputs)
+
+               if spendable_outputs.len() > 0 {
+                       self.pending_events.push(events::Event::SpendableOutputs {
+                               outputs: spendable_outputs,
+                       });
+               }
+
+               watch_outputs
        }
 
        fn block_disconnected<B: Deref, F: Deref>(&mut self, height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: F)
@@ -3369,6 +3392,14 @@ impl<R: ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>> ReadableArgs<R,
                        pending_htlcs_updated.push(Readable::read(reader)?);
                }
 
+               let pending_events_len: u64 = Readable::read(reader)?;
+               let mut pending_events = Vec::with_capacity(cmp::min(pending_events_len as usize, MAX_ALLOC_SIZE / mem::size_of::<events::Event>()));
+               for _ in 0..pending_events_len {
+                       if let Some(event) = MaybeReadable::read(reader)? {
+                               pending_events.push(event);
+                       }
+               }
+
                let last_block_hash: Sha256dHash = Readable::read(reader)?;
                let destination_script = Readable::read(reader)?;
                let to_remote_rescue = match <u8 as Readable<R>>::read(reader)? {
@@ -3471,6 +3502,7 @@ impl<R: ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>> ReadableArgs<R,
 
                        payment_preimages,
                        pending_htlcs_updated,
+                       pending_events,
 
                        destination_script,
                        to_remote_rescue,
index eda6fc7ee4eab89068a0b7f7a5848c5a3d2e2810..d165f200adbe5616fd914e4fb647097fd40035a3 100644 (file)
@@ -4,18 +4,12 @@
 //! Because we don't have a built-in runtime, it's up to the client to call events at a time in the
 //! future, as well as generate and broadcast funding transactions handle payment preimages and a
 //! few other things.
-//!
-//! Note that many events are handled for you by PeerHandler, so in the common design of having a
-//! PeerManager which marshalls messages to ChannelManager and Router you only need to call
-//! process_events on the PeerHandler and then get_and_clear_pending_events and handle the events
-//! that bubble up to the surface. If, however, you do not have a PeerHandler managing a
-//! ChannelManager you need to handle all of the events which may be generated.
-//TODO: We need better separation of event types ^
 
 use ln::msgs;
 use ln::channelmanager::{PaymentPreimage, PaymentHash};
 use chain::transaction::OutPoint;
 use chain::keysinterface::SpendableOutputDescriptor;
+use util::ser::{Writeable, Writer, MaybeReadable, Readable};
 
 use bitcoin::blockdata::script::Script;
 
@@ -24,6 +18,10 @@ use secp256k1::key::PublicKey;
 use std::time::Duration;
 
 /// An Event which you should probably take some action in response to.
+///
+/// Note that while Writeable and Readable are implemented for Event, you probably shouldn't use
+/// them directly as they don't round-trip exactly (for example FundingGenerationReady is never
+/// written as it makes no sense to respond to it after reconnecting to peers).
 pub enum Event {
        /// Used to indicate that the client should generate a funding transaction with the given
        /// parameters and then call ChannelManager::funding_transaction_generated.
@@ -108,6 +106,91 @@ pub enum Event {
        },
 }
 
+impl Writeable for Event {
+       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
+               match self {
+                       &Event::FundingGenerationReady { .. } => {
+                               0u8.write(writer)?;
+                               // We never write out FundingGenerationReady events as, upon disconnection, peers
+                               // drop any channels which have not yet exchanged funding_signed.
+                       },
+                       &Event::FundingBroadcastSafe { ref funding_txo, ref user_channel_id } => {
+                               1u8.write(writer)?;
+                               funding_txo.write(writer)?;
+                               user_channel_id.write(writer)?;
+                       },
+                       &Event::PaymentReceived { ref payment_hash, ref amt } => {
+                               2u8.write(writer)?;
+                               payment_hash.write(writer)?;
+                               amt.write(writer)?;
+                       },
+                       &Event::PaymentSent { ref payment_preimage } => {
+                               3u8.write(writer)?;
+                               payment_preimage.write(writer)?;
+                       },
+                       &Event::PaymentFailed { ref payment_hash, ref rejected_by_dest,
+                               #[cfg(test)]
+                               ref error_code,
+                       } => {
+                               4u8.write(writer)?;
+                               payment_hash.write(writer)?;
+                               rejected_by_dest.write(writer)?;
+                               #[cfg(test)]
+                               error_code.write(writer)?;
+                       },
+                       &Event::PendingHTLCsForwardable { time_forwardable: _ } => {
+                               5u8.write(writer)?;
+                               // We don't write the time_fordwardable out at all, as we presume when the user
+                               // deserializes us at least that much time has elapsed.
+                       },
+                       &Event::SpendableOutputs { ref outputs } => {
+                               6u8.write(writer)?;
+                               (outputs.len() as u64).write(writer)?;
+                               for output in outputs.iter() {
+                                       output.write(writer)?;
+                               }
+                       },
+               }
+               Ok(())
+       }
+}
+impl<R: ::std::io::Read> MaybeReadable<R> for Event {
+       fn read(reader: &mut R) -> Result<Option<Self>, msgs::DecodeError> {
+               match Readable::read(reader)? {
+                       0u8 => Ok(None),
+                       1u8 => Ok(Some(Event::FundingBroadcastSafe {
+                                       funding_txo: Readable::read(reader)?,
+                                       user_channel_id: Readable::read(reader)?,
+                               })),
+                       2u8 => Ok(Some(Event::PaymentReceived {
+                                       payment_hash: Readable::read(reader)?,
+                                       amt: Readable::read(reader)?,
+                               })),
+                       3u8 => Ok(Some(Event::PaymentSent {
+                                       payment_preimage: Readable::read(reader)?,
+                               })),
+                       4u8 => Ok(Some(Event::PaymentFailed {
+                                       payment_hash: Readable::read(reader)?,
+                                       rejected_by_dest: Readable::read(reader)?,
+                                       #[cfg(test)]
+                                       error_code: Readable::read(reader)?,
+                               })),
+                       5u8 => Ok(Some(Event::PendingHTLCsForwardable {
+                                       time_forwardable: Duration::from_secs(0)
+                               })),
+                       6u8 => {
+                               let outputs_len: u64 = Readable::read(reader)?;
+                               let mut outputs = Vec::new();
+                               for _ in 0..outputs_len {
+                                       outputs.push(Readable::read(reader)?);
+                               }
+                               Ok(Some(Event::SpendableOutputs { outputs }))
+                       },
+                       _ => Err(msgs::DecodeError::InvalidValue)
+               }
+       }
+}
+
 /// An event generated by ChannelManager which indicates a message should be sent to a peer (or
 /// broadcast to most peers).
 /// These events are handled by PeerManager::process_events if you are using a PeerManager.
index 44fdd1c0cf4d3097d27e7abebb83e0a397f0ac86..96936fe95416fd06a1061e323dbbe1952cb09c09 100644 (file)
@@ -11,7 +11,7 @@ use std::cmp;
 use secp256k1::Signature;
 use secp256k1::key::{PublicKey, SecretKey};
 use bitcoin::blockdata::script::Script;
-use bitcoin::blockdata::transaction::{OutPoint, Transaction};
+use bitcoin::blockdata::transaction::{OutPoint, Transaction, TxOut};
 use bitcoin::consensus;
 use bitcoin::consensus::Encodable;
 use bitcoin_hashes::sha256d::Hash as Sha256dHash;
@@ -191,6 +191,15 @@ pub trait ReadableArgs<R, P>
        fn read(reader: &mut R, params: P) -> Result<Self, DecodeError>;
 }
 
+/// A trait that various rust-lightning types implement allowing them to (maybe) be read in from a Read
+pub trait MaybeReadable<R>
+       where Self: Sized,
+             R: Read
+{
+       /// Reads a Self in from the given Read
+       fn read(reader: &mut R) -> Result<Option<Self>, DecodeError>;
+}
+
 pub(crate) struct U48(pub u64);
 impl Writeable for U48 {
        #[inline]
@@ -627,26 +636,32 @@ impl<R: Read> Readable<R> for OutPoint {
        }
 }
 
-impl Writeable for Transaction {
-       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
-               match self.consensus_encode(WriterWriteAdaptor(writer)) {
-                       Ok(_) => Ok(()),
-                       Err(consensus::encode::Error::Io(e)) => Err(e),
-                       Err(_) => panic!("We shouldn't get a consensus::encode::Error unless our Write generated an std::io::Error"),
+macro_rules! impl_consensus_ser {
+       ($bitcoin_type: ty) => {
+               impl Writeable for $bitcoin_type {
+                       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
+                               match self.consensus_encode(WriterWriteAdaptor(writer)) {
+                                       Ok(_) => Ok(()),
+                                       Err(consensus::encode::Error::Io(e)) => Err(e),
+                                       Err(_) => panic!("We shouldn't get a consensus::encode::Error unless our Write generated an std::io::Error"),
+                               }
+                       }
                }
-       }
-}
 
-impl<R: Read> Readable<R> for Transaction {
-       fn read(r: &mut R) -> Result<Self, DecodeError> {
-               match consensus::encode::Decodable::consensus_decode(r) {
-                       Ok(t) => Ok(t),
-                       Err(consensus::encode::Error::Io(ref e)) if e.kind() == ::std::io::ErrorKind::UnexpectedEof => Err(DecodeError::ShortRead),
-                       Err(consensus::encode::Error::Io(e)) => Err(DecodeError::Io(e)),
-                       Err(_) => Err(DecodeError::InvalidValue),
+               impl<R: Read> Readable<R> for $bitcoin_type {
+                       fn read(r: &mut R) -> Result<Self, DecodeError> {
+                               match consensus::encode::Decodable::consensus_decode(r) {
+                                       Ok(t) => Ok(t),
+                                       Err(consensus::encode::Error::Io(ref e)) if e.kind() == ::std::io::ErrorKind::UnexpectedEof => Err(DecodeError::ShortRead),
+                                       Err(consensus::encode::Error::Io(e)) => Err(DecodeError::Io(e)),
+                                       Err(_) => Err(DecodeError::InvalidValue),
+                               }
+                       }
                }
        }
 }
+impl_consensus_ser!(Transaction);
+impl_consensus_ser!(TxOut);
 
 impl<R: Read, T: Readable<R>> Readable<R> for Mutex<T> {
        fn read(r: &mut R) -> Result<Self, DecodeError> {