Change Mutex to RwLock in ChainMonitor
[rust-lightning] / lightning / src / chain / chainmonitor.rs
index 8483f5ca829ff3e1ad5aeecb11b5c97ee1a3a347..de826d054d3716bd41df35e03706ac3e900c1322 100644 (file)
@@ -29,7 +29,7 @@
 //! [`ChannelMonitor`]: ../channelmonitor/struct.ChannelMonitor.html
 //! [`MonitorEvent`]: ../channelmonitor/enum.MonitorEvent.html
 
-use bitcoin::blockdata::block::BlockHeader;
+use bitcoin::blockdata::block::{Block, BlockHeader};
 
 use chain;
 use chain::Filter;
@@ -43,7 +43,7 @@ use util::events;
 use util::events::Event;
 
 use std::collections::{HashMap, hash_map};
-use std::sync::Mutex;
+use std::sync::RwLock;
 use std::ops::Deref;
 
 /// An implementation of [`chain::Watch`] for monitoring channels.
@@ -64,7 +64,7 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
         P::Target: channelmonitor::Persist<ChannelSigner>,
 {
        /// The monitors
-       pub monitors: Mutex<HashMap<OutPoint, ChannelMonitor<ChannelSigner>>>,
+       pub monitors: RwLock<HashMap<OutPoint, ChannelMonitor<ChannelSigner>>>,
        chain_source: Option<C>,
        broadcaster: T,
        logger: L,
@@ -93,8 +93,8 @@ where C::Target: chain::Filter,
        /// [`chain::Watch::release_pending_monitor_events`]: ../trait.Watch.html#tymethod.release_pending_monitor_events
        /// [`chain::Filter`]: ../trait.Filter.html
        pub fn block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
-               let mut monitors = self.monitors.lock().unwrap();
-               for monitor in monitors.values_mut() {
+               let monitors = self.monitors.read().unwrap();
+               for monitor in monitors.values() {
                        let mut txn_outputs = monitor.block_connected(header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
 
                        if let Some(ref chain_source) = self.chain_source {
@@ -113,8 +113,8 @@ where C::Target: chain::Filter,
        ///
        /// [`ChannelMonitor::block_disconnected`]: ../channelmonitor/struct.ChannelMonitor.html#method.block_disconnected
        pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
-               let mut monitors = self.monitors.lock().unwrap();
-               for monitor in monitors.values_mut() {
+               let monitors = self.monitors.read().unwrap();
+               for monitor in monitors.values() {
                        monitor.block_disconnected(header, disconnected_height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
                }
        }
@@ -130,7 +130,7 @@ where C::Target: chain::Filter,
        /// [`chain::Filter`]: ../trait.Filter.html
        pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
                Self {
-                       monitors: Mutex::new(HashMap::new()),
+                       monitors: RwLock::new(HashMap::new()),
                        chain_source,
                        broadcaster,
                        logger,
@@ -140,6 +140,26 @@ where C::Target: chain::Filter,
        }
 }
 
+impl<ChannelSigner: Sign, C: Deref + Send + Sync, T: Deref + Send + Sync, F: Deref + Send + Sync, L: Deref + Send + Sync, P: Deref + Send + Sync>
+chain::Listen for ChainMonitor<ChannelSigner, C, T, F, L, P>
+where
+       ChannelSigner: Sign,
+       C::Target: chain::Filter,
+       T::Target: BroadcasterInterface,
+       F::Target: FeeEstimator,
+       L::Target: Logger,
+       P::Target: channelmonitor::Persist<ChannelSigner>,
+{
+       fn block_connected(&self, block: &Block, height: u32) {
+               let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
+               ChainMonitor::block_connected(self, &block.header, &txdata, height);
+       }
+
+       fn block_disconnected(&self, header: &BlockHeader, height: u32) {
+               ChainMonitor::block_disconnected(self, header, height);
+       }
+}
+
 impl<ChannelSigner: Sign, C: Deref + Sync + Send, T: Deref + Sync + Send, F: Deref + Sync + Send, L: Deref + Sync + Send, P: Deref + Sync + Send>
 chain::Watch<ChannelSigner> for ChainMonitor<ChannelSigner, C, T, F, L, P>
 where C::Target: chain::Filter,
@@ -157,7 +177,7 @@ where C::Target: chain::Filter,
        ///
        /// [`chain::Filter`]: ../trait.Filter.html
        fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr> {
-               let mut monitors = self.monitors.lock().unwrap();
+               let mut monitors = self.monitors.write().unwrap();
                let entry = match monitors.entry(funding_outpoint) {
                        hash_map::Entry::Occupied(_) => {
                                log_error!(self.logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
@@ -189,8 +209,8 @@ where C::Target: chain::Filter,
        /// `ChainMonitor` monitors lock.
        fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> Result<(), ChannelMonitorUpdateErr> {
                // Update the monitor that watches the channel referred to by the given outpoint.
-               let mut monitors = self.monitors.lock().unwrap();
-               match monitors.get_mut(&funding_txo) {
+               let monitors = self.monitors.read().unwrap();
+               match monitors.get(&funding_txo) {
                        None => {
                                log_error!(self.logger, "Failed to update channel monitor: no such monitor registered");
 
@@ -202,15 +222,15 @@ where C::Target: chain::Filter,
                                #[cfg(not(any(test, feature = "fuzztarget")))]
                                Err(ChannelMonitorUpdateErr::PermanentFailure)
                        },
-                       Some(orig_monitor) => {
-                               log_trace!(self.logger, "Updating Channel Monitor for channel {}", log_funding_info!(orig_monitor));
-                               let update_res = orig_monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger);
+                       Some(monitor) => {
+                               log_trace!(self.logger, "Updating Channel Monitor for channel {}", log_funding_info!(monitor));
+                               let update_res = monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger);
                                if let Err(e) = &update_res {
                                        log_error!(self.logger, "Failed to update channel monitor: {:?}", e);
                                }
                                // Even if updating the monitor returns an error, the monitor's state will
                                // still be changed. So, persist the updated monitor despite the error.
-                               let persist_res = self.persister.update_persisted_channel(funding_txo, &update, orig_monitor);
+                               let persist_res = self.persister.update_persisted_channel(funding_txo, &update, monitor);
                                if let Err(ref e) = persist_res {
                                        log_error!(self.logger, "Failed to persist channel monitor update: {:?}", e);
                                }
@@ -225,8 +245,8 @@ where C::Target: chain::Filter,
 
        fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
                let mut pending_monitor_events = Vec::new();
-               for chan in self.monitors.lock().unwrap().values_mut() {
-                       pending_monitor_events.append(&mut chan.get_and_clear_pending_monitor_events());
+               for monitor in self.monitors.read().unwrap().values() {
+                       pending_monitor_events.append(&mut monitor.get_and_clear_pending_monitor_events());
                }
                pending_monitor_events
        }
@@ -241,8 +261,8 @@ impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> even
 {
        fn get_and_clear_pending_events(&self) -> Vec<Event> {
                let mut pending_events = Vec::new();
-               for chan in self.monitors.lock().unwrap().values_mut() {
-                       pending_events.append(&mut chan.get_and_clear_pending_events());
+               for monitor in self.monitors.read().unwrap().values() {
+                       pending_events.append(&mut monitor.get_and_clear_pending_events());
                }
                pending_events
        }