Handle Persister returning TemporaryFailure for new channels
[rust-lightning] / lightning / src / chain / chainmonitor.rs
index aeab80a7ae55bc05e603d5f1ef165f2edcd83a35..f1ce0f79ae9cd99860283ec72c00df49430c87a3 100644 (file)
@@ -27,19 +27,90 @@ use bitcoin::blockdata::block::{Block, BlockHeader};
 use bitcoin::hash_types::Txid;
 
 use chain;
-use chain::{Filter, WatchedOutput};
+use chain::{ChannelMonitorUpdateErr, Filter, WatchedOutput};
 use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-use chain::channelmonitor;
-use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, MonitorEvent, Persist, TransactionOutputs};
+use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs};
 use chain::transaction::{OutPoint, TransactionData};
 use chain::keysinterface::Sign;
 use util::logger::Logger;
 use util::events;
-use util::events::Event;
+use util::events::EventHandler;
+use ln::channelmanager::ChannelDetails;
 
-use std::collections::{HashMap, hash_map};
-use std::sync::RwLock;
-use std::ops::Deref;
+use prelude::*;
+use sync::{RwLock, RwLockReadGuard};
+use core::ops::Deref;
+
+/// `Persist` defines behavior for persisting channel monitors: this could mean
+/// writing once to disk, and/or uploading to one or more backup services.
+///
+/// Note that for every new monitor, you **must** persist the new `ChannelMonitor`
+/// to disk/backups. And, on every update, you **must** persist either the
+/// `ChannelMonitorUpdate` or the updated monitor itself. Otherwise, there is risk
+/// of situations such as revoking a transaction, then crashing before this
+/// revocation can be persisted, then unintentionally broadcasting a revoked
+/// transaction and losing money. This is a risk because previous channel states
+/// are toxic, so it's important that whatever channel state is persisted is
+/// kept up-to-date.
+pub trait Persist<ChannelSigner: Sign> {
+       /// Persist a new channel's data. The data can be stored any way you want, but
+       /// the identifier provided by Rust-Lightning is the channel's outpoint (and
+       /// it is up to you to maintain a correct mapping between the outpoint and the
+       /// stored channel data). Note that you **must** persist every new monitor to
+       /// disk. See the `Persist` trait documentation for more details.
+       ///
+       /// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`
+       /// and [`ChannelMonitorUpdateErr`] for requirements when returning errors.
+       ///
+       /// [`Writeable::write`]: crate::util::ser::Writeable::write
+       fn persist_new_channel(&self, id: OutPoint, data: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr>;
+
+       /// Update one channel's data. The provided `ChannelMonitor` has already
+       /// applied the given update.
+       ///
+       /// Note that on every update, you **must** persist either the
+       /// `ChannelMonitorUpdate` or the updated monitor itself to disk/backups. See
+       /// the `Persist` trait documentation for more details.
+       ///
+       /// If an implementer chooses to persist the updates only, they need to make
+       /// sure that all the updates are applied to the `ChannelMonitors` *before*
+       /// the set of channel monitors is given to the `ChannelManager`
+       /// deserialization routine. See [`ChannelMonitor::update_monitor`] for
+       /// applying a monitor update to a monitor. If full `ChannelMonitors` are
+       /// persisted, then there is no need to persist individual updates.
+       ///
+       /// Note that there could be a performance tradeoff between persisting complete
+       /// channel monitors on every update vs. persisting only updates and applying
+       /// them in batches. The size of each monitor grows `O(number of state updates)`
+       /// whereas updates are small and `O(1)`.
+       ///
+       /// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`,
+       /// [`Writeable::write`] on [`ChannelMonitorUpdate`] for writing out an update, and
+       /// [`ChannelMonitorUpdateErr`] for requirements when returning errors.
+       ///
+       /// [`Writeable::write`]: crate::util::ser::Writeable::write
+       fn update_persisted_channel(&self, id: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr>;
+}
+
+struct MonitorHolder<ChannelSigner: Sign> {
+       monitor: ChannelMonitor<ChannelSigner>,
+}
+
+/// A read-only reference to a current ChannelMonitor.
+///
+/// Note that this holds a mutex in [`ChainMonitor`] and may block other events until it is
+/// released.
+pub struct LockedChannelMonitor<'a, ChannelSigner: Sign> {
+       lock: RwLockReadGuard<'a, HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
+       funding_txo: OutPoint,
+}
+
+impl<ChannelSigner: Sign> Deref for LockedChannelMonitor<'_, ChannelSigner> {
+       type Target = ChannelMonitor<ChannelSigner>;
+       fn deref(&self) -> &ChannelMonitor<ChannelSigner> {
+               &self.lock.get(&self.funding_txo).expect("Checked at construction").monitor
+       }
+}
 
 /// An implementation of [`chain::Watch`] for monitoring channels.
 ///
@@ -55,10 +126,9 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
         T::Target: BroadcasterInterface,
         F::Target: FeeEstimator,
         L::Target: Logger,
-        P::Target: channelmonitor::Persist<ChannelSigner>,
+        P::Target: Persist<ChannelSigner>,
 {
-       /// The monitors
-       pub monitors: RwLock<HashMap<OutPoint, ChannelMonitor<ChannelSigner>>>,
+       monitors: RwLock<HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
        chain_source: Option<C>,
        broadcaster: T,
        logger: L,
@@ -71,10 +141,10 @@ where C::Target: chain::Filter,
            T::Target: BroadcasterInterface,
            F::Target: FeeEstimator,
            L::Target: Logger,
-           P::Target: channelmonitor::Persist<ChannelSigner>,
+           P::Target: Persist<ChannelSigner>,
 {
        /// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
-       /// of a channel and reacting accordingly based on transactions in the connected block. See
+       /// of a channel and reacting accordingly based on transactions in the given chain data. See
        /// [`ChannelMonitor::block_connected`] for details. Any HTLCs that were resolved on chain will
        /// be returned by [`chain::Watch::release_pending_monitor_events`].
        ///
@@ -82,64 +152,14 @@ where C::Target: chain::Filter,
        /// calls must not exclude any transactions matching the new outputs nor any in-block
        /// descendants of such transactions. It is not necessary to re-fetch the block to obtain
        /// updated `txdata`.
-       pub fn block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
-               self.process_chain_data(header, txdata, |monitor, txdata| {
-                       monitor.block_connected(
-                               header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
-               });
-       }
-
-       /// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
-       /// of a channel and reacting accordingly to newly confirmed transactions. For details, see
-       /// [`ChannelMonitor::transactions_confirmed`].
-       ///
-       /// Used instead of [`block_connected`] by clients that are notified of transactions rather than
-       /// blocks. May be called before or after [`best_block_updated`] for transactions in the
-       /// corresponding block. See [`best_block_updated`] for further calling expectations.
-       ///
-       /// [`block_connected`]: Self::block_connected
-       /// [`best_block_updated`]: Self::best_block_updated
-       pub fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
-               self.process_chain_data(header, txdata, |monitor, txdata| {
-                       monitor.transactions_confirmed(
-                               header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
-               });
-       }
-
-       /// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
-       /// of a channel and reacting accordingly based on the new chain tip. For details, see
-       /// [`ChannelMonitor::best_block_updated`].
-       ///
-       /// Used instead of [`block_connected`] by clients that are notified of transactions rather than
-       /// blocks. May be called before or after [`transactions_confirmed`] for the corresponding
-       /// block.
-       ///
-       /// Must be called after new blocks become available for the most recent block. Intermediary
-       /// blocks, however, may be safely skipped. In the event of a chain re-organization, this only
-       /// needs to be called for the most recent block assuming `transaction_unconfirmed` is called
-       /// for any affected transactions.
-       ///
-       /// [`block_connected`]: Self::block_connected
-       /// [`transactions_confirmed`]: Self::transactions_confirmed
-       /// [`transaction_unconfirmed`]: Self::transaction_unconfirmed
-       pub fn best_block_updated(&self, header: &BlockHeader, height: u32) {
-               self.process_chain_data(header, &[], |monitor, txdata| {
-                       // While in practice there shouldn't be any recursive calls when given empty txdata,
-                       // it's still possible if a chain::Filter implementation returns a transaction.
-                       debug_assert!(txdata.is_empty());
-                       monitor.best_block_updated(
-                               header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
-               });
-       }
-
        fn process_chain_data<FN>(&self, header: &BlockHeader, txdata: &TransactionData, process: FN)
        where
                FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
        {
                let mut dependent_txdata = Vec::new();
-               let monitors = self.monitors.read().unwrap();
-               for monitor in monitors.values() {
-                       let mut txn_outputs = process(monitor, txdata);
+               let monitor_states = self.monitors.read().unwrap();
+               for monitor_state in monitor_states.values() {
+                       let mut txn_outputs = process(&monitor_state.monitor, txdata);
 
                        // Register any new outputs with the chain source for filtering, storing any dependent
                        // transactions from within the block that previously had not been included in txdata.
@@ -172,46 +192,6 @@ where C::Target: chain::Filter,
                }
        }
 
-       /// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
-       /// of a channel based on the disconnected block. See [`ChannelMonitor::block_disconnected`] for
-       /// details.
-       pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
-               let monitors = self.monitors.read().unwrap();
-               for monitor in monitors.values() {
-                       monitor.block_disconnected(header, disconnected_height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
-               }
-       }
-
-       /// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
-       /// of a channel based on transactions unconfirmed as a result of a chain reorganization. See
-       /// [`ChannelMonitor::transaction_unconfirmed`] for details.
-       ///
-       /// Used instead of [`block_disconnected`] by clients that are notified of transactions rather
-       /// than blocks. May be called before or after [`best_block_updated`] for transactions in the
-       /// corresponding block. See [`best_block_updated`] for further calling expectations.
-       ///
-       /// [`block_disconnected`]: Self::block_disconnected
-       /// [`best_block_updated`]: Self::best_block_updated
-       pub fn transaction_unconfirmed(&self, txid: &Txid) {
-               let monitors = self.monitors.read().unwrap();
-               for monitor in monitors.values() {
-                       monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
-               }
-       }
-
-       /// Returns the set of txids that should be monitored for re-organization out of the chain.
-       pub fn get_relevant_txids(&self) -> Vec<Txid> {
-               let mut txids = Vec::new();
-               let monitors = self.monitors.read().unwrap();
-               for monitor in monitors.values() {
-                       txids.append(&mut monitor.get_relevant_txids());
-               }
-
-               txids.sort_unstable();
-               txids.dedup();
-               txids
-       }
-
        /// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
        ///
        /// When an optional chain source implementing [`chain::Filter`] is provided, the chain monitor
@@ -229,35 +209,154 @@ where C::Target: chain::Filter,
                        persister,
                }
        }
+
+       /// Gets the balances in the contained [`ChannelMonitor`]s which are claimable on-chain or
+       /// claims which are awaiting confirmation.
+       ///
+       /// Includes the balances from each [`ChannelMonitor`] *except* those included in
+       /// `ignored_channels`, allowing you to filter out balances from channels which are still open
+       /// (and whose balance should likely be pulled from the [`ChannelDetails`]).
+       ///
+       /// See [`ChannelMonitor::get_claimable_balances`] for more details on the exact criteria for
+       /// inclusion in the return value.
+       pub fn get_claimable_balances(&self, ignored_channels: &[&ChannelDetails]) -> Vec<Balance> {
+               let mut ret = Vec::new();
+               let monitor_states = self.monitors.read().unwrap();
+               for (_, monitor_state) in monitor_states.iter().filter(|(funding_outpoint, _)| {
+                       for chan in ignored_channels {
+                               if chan.funding_txo.as_ref() == Some(funding_outpoint) {
+                                       return false;
+                               }
+                       }
+                       true
+               }) {
+                       ret.append(&mut monitor_state.monitor.get_claimable_balances());
+               }
+               ret
+       }
+
+       /// Gets the [`LockedChannelMonitor`] for a given funding outpoint, returning an `Err` if no
+       /// such [`ChannelMonitor`] is currently being monitored for.
+       ///
+       /// Note that the result holds a mutex over our monitor set, and should not be held
+       /// indefinitely.
+       pub fn get_monitor(&self, funding_txo: OutPoint) -> Result<LockedChannelMonitor<'_, ChannelSigner>, ()> {
+               let lock = self.monitors.read().unwrap();
+               if lock.get(&funding_txo).is_some() {
+                       Ok(LockedChannelMonitor { lock, funding_txo })
+               } else {
+                       Err(())
+               }
+       }
+
+       /// Lists the funding outpoint of each [`ChannelMonitor`] being monitored.
+       ///
+       /// Note that [`ChannelMonitor`]s are not removed when a channel is closed as they are always
+       /// monitoring for on-chain state resolutions.
+       pub fn list_monitors(&self) -> Vec<OutPoint> {
+               self.monitors.read().unwrap().keys().map(|outpoint| *outpoint).collect()
+       }
+
+       #[cfg(test)]
+       pub fn remove_monitor(&self, funding_txo: &OutPoint) -> ChannelMonitor<ChannelSigner> {
+               self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor
+       }
+
+       #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
+       pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
+               use util::events::EventsProvider;
+               let events = core::cell::RefCell::new(Vec::new());
+               let event_handler = |event: &events::Event| events.borrow_mut().push(event.clone());
+               self.process_pending_events(&event_handler);
+               events.into_inner()
+       }
 }
 
-impl<ChannelSigner: Sign, C: Deref + Send + Sync, T: Deref + Send + Sync, F: Deref + Send + Sync, L: Deref + Send + Sync, P: Deref + Send + Sync>
+impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
 chain::Listen for ChainMonitor<ChannelSigner, C, T, F, L, P>
 where
-       ChannelSigner: Sign,
        C::Target: chain::Filter,
        T::Target: BroadcasterInterface,
        F::Target: FeeEstimator,
        L::Target: Logger,
-       P::Target: channelmonitor::Persist<ChannelSigner>,
+       P::Target: Persist<ChannelSigner>,
 {
        fn block_connected(&self, block: &Block, height: u32) {
+               let header = &block.header;
                let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
-               ChainMonitor::block_connected(self, &block.header, &txdata, height);
+               log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height);
+               self.process_chain_data(header, &txdata, |monitor, txdata| {
+                       monitor.block_connected(
+                               header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
+               });
        }
 
        fn block_disconnected(&self, header: &BlockHeader, height: u32) {
-               ChainMonitor::block_disconnected(self, header, height);
+               let monitor_states = self.monitors.read().unwrap();
+               log_debug!(self.logger, "Latest block {} at height {} removed via block_disconnected", header.block_hash(), height);
+               for monitor_state in monitor_states.values() {
+                       monitor_state.monitor.block_disconnected(
+                               header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
+               }
+       }
+}
+
+impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
+chain::Confirm for ChainMonitor<ChannelSigner, C, T, F, L, P>
+where
+       C::Target: chain::Filter,
+       T::Target: BroadcasterInterface,
+       F::Target: FeeEstimator,
+       L::Target: Logger,
+       P::Target: Persist<ChannelSigner>,
+{
+       fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
+               log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash());
+               self.process_chain_data(header, txdata, |monitor, txdata| {
+                       monitor.transactions_confirmed(
+                               header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
+               });
+       }
+
+       fn transaction_unconfirmed(&self, txid: &Txid) {
+               log_debug!(self.logger, "Transaction {} reorganized out of chain", txid);
+               let monitor_states = self.monitors.read().unwrap();
+               for monitor_state in monitor_states.values() {
+                       monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
+               }
+       }
+
+       fn best_block_updated(&self, header: &BlockHeader, height: u32) {
+               log_debug!(self.logger, "New best block {} at height {} provided via best_block_updated", header.block_hash(), height);
+               self.process_chain_data(header, &[], |monitor, txdata| {
+                       // While in practice there shouldn't be any recursive calls when given empty txdata,
+                       // it's still possible if a chain::Filter implementation returns a transaction.
+                       debug_assert!(txdata.is_empty());
+                       monitor.best_block_updated(
+                               header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
+               });
+       }
+
+       fn get_relevant_txids(&self) -> Vec<Txid> {
+               let mut txids = Vec::new();
+               let monitor_states = self.monitors.read().unwrap();
+               for monitor_state in monitor_states.values() {
+                       txids.append(&mut monitor_state.monitor.get_relevant_txids());
+               }
+
+               txids.sort_unstable();
+               txids.dedup();
+               txids
        }
 }
 
-impl<ChannelSigner: Sign, C: Deref + Sync + Send, T: Deref + Sync + Send, F: Deref + Sync + Send, L: Deref + Sync + Send, P: Deref + Sync + Send>
+impl<ChannelSigner: Sign, C: Deref , T: Deref , F: Deref , L: Deref , P: Deref >
 chain::Watch<ChannelSigner> for ChainMonitor<ChannelSigner, C, T, F, L, P>
 where C::Target: chain::Filter,
            T::Target: BroadcasterInterface,
            F::Target: FeeEstimator,
            L::Target: Logger,
-           P::Target: channelmonitor::Persist<ChannelSigner>,
+           P::Target: Persist<ChannelSigner>,
 {
        /// Adds the monitor that watches the channel referred to by the given outpoint.
        ///
@@ -273,9 +372,12 @@ where C::Target: chain::Filter,
                                return Err(ChannelMonitorUpdateErr::PermanentFailure)},
                        hash_map::Entry::Vacant(e) => e,
                };
-               if let Err(e) = self.persister.persist_new_channel(funding_outpoint, &monitor) {
-                       log_error!(self.logger, "Failed to persist new channel data");
-                       return Err(e);
+               let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor);
+               if persist_res.is_err() {
+                       log_error!(self.logger, "Failed to persist new channel data: {:?}", persist_res);
+               }
+               if persist_res == Err(ChannelMonitorUpdateErr::PermanentFailure) {
+                       return persist_res;
                }
                {
                        let funding_txo = monitor.get_funding_txo();
@@ -285,8 +387,8 @@ where C::Target: chain::Filter,
                                monitor.load_outputs_to_watch(chain_source);
                        }
                }
-               entry.insert(monitor);
-               Ok(())
+               entry.insert(MonitorHolder { monitor });
+               persist_res
        }
 
        /// Note that we persist the given `ChannelMonitor` update while holding the
@@ -306,7 +408,8 @@ where C::Target: chain::Filter,
                                #[cfg(not(any(test, feature = "fuzztarget")))]
                                Err(ChannelMonitorUpdateErr::PermanentFailure)
                        },
-                       Some(monitor) => {
+                       Some(monitor_state) => {
+                               let monitor = &monitor_state.monitor;
                                log_trace!(self.logger, "Updating Channel Monitor for channel {}", log_funding_info!(monitor));
                                let update_res = monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger);
                                if let Err(e) = &update_res {
@@ -329,8 +432,8 @@ where C::Target: chain::Filter,
 
        fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
                let mut pending_monitor_events = Vec::new();
-               for monitor in self.monitors.read().unwrap().values() {
-                       pending_monitor_events.append(&mut monitor.get_and_clear_pending_monitor_events());
+               for monitor_state in self.monitors.read().unwrap().values() {
+                       pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
                }
                pending_monitor_events
        }
@@ -341,14 +444,22 @@ impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> even
              T::Target: BroadcasterInterface,
              F::Target: FeeEstimator,
              L::Target: Logger,
-             P::Target: channelmonitor::Persist<ChannelSigner>,
+             P::Target: Persist<ChannelSigner>,
 {
-       fn get_and_clear_pending_events(&self) -> Vec<Event> {
+       /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
+       ///
+       /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
+       /// order to handle these events.
+       ///
+       /// [`SpendableOutputs`]: events::Event::SpendableOutputs
+       fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
                let mut pending_events = Vec::new();
-               for monitor in self.monitors.read().unwrap().values() {
-                       pending_events.append(&mut monitor.get_and_clear_pending_events());
+               for monitor_state in self.monitors.read().unwrap().values() {
+                       pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
+               }
+               for event in pending_events.drain(..) {
+                       handler.handle_event(&event);
                }
-               pending_events
        }
 }
 
@@ -357,7 +468,6 @@ mod tests {
        use ::{check_added_monitors, get_local_commitment_txn};
        use ln::features::InitFeatures;
        use ln::functional_test_utils::*;
-       use util::events::EventsProvider;
        use util::events::MessageSendEventsProvider;
        use util::test_utils::{OnRegisterOutput, TxOutReference};
 
@@ -379,7 +489,7 @@ mod tests {
                let (commitment_tx, htlc_tx) = {
                        let payment_preimage = route_payment(&nodes[0], &vec!(&nodes[1])[..], 5_000_000).0;
                        let mut txn = get_local_commitment_txn!(nodes[0], channel.2);
-                       claim_payment(&nodes[0], &vec!(&nodes[1])[..], payment_preimage, 5_000_000);
+                       claim_payment(&nodes[0], &vec!(&nodes[1])[..], payment_preimage);
 
                        assert_eq!(txn.len(), 2);
                        (txn.remove(0), txn.remove(0))