+ pending_monitor_events: Mutex::new(Vec::new()),
+ highest_chain_height: AtomicUsize::new(0),
+ }
+ }
+
+ /// Gets the balances in the contained [`ChannelMonitor`]s which are claimable on-chain,
+ /// as well as claims which are still awaiting confirmation.
+ ///
+ /// Includes the balances from each [`ChannelMonitor`] *except* those included in
+ /// `ignored_channels`, allowing you to filter out balances from channels which are still open
+ /// (and whose balance should likely be pulled from the [`ChannelDetails`]).
+ ///
+ /// See [`ChannelMonitor::get_claimable_balances`] for more details on the exact criteria for
+ /// inclusion in the return value.
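+ ///
+ /// A minimal usage sketch (hypothetical; `channel_manager` and `chain_monitor` are assumed
+ /// to be your `ChannelManager` and this `ChainMonitor`, respectively):
+ ///
+ /// ```ignore
+ /// // Skip channels which are still open, as their balance is better read from the
+ /// // `ChannelDetails` returned by `list_channels`.
+ /// let open_channels = channel_manager.list_channels();
+ /// let ignored: Vec<&ChannelDetails> = open_channels.iter().collect();
+ /// let balances = chain_monitor.get_claimable_balances(&ignored);
+ /// ```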
+ pub fn get_claimable_balances(&self, ignored_channels: &[&ChannelDetails]) -> Vec<Balance> {
+ let mut ret = Vec::new();
+ let monitor_states = self.monitors.read().unwrap();
+ for (_, monitor_state) in monitor_states.iter().filter(|(funding_outpoint, _)| {
+ for chan in ignored_channels {
+ if chan.funding_txo.as_ref() == Some(funding_outpoint) {
+ return false;
+ }
+ }
+ true
+ }) {
+ ret.append(&mut monitor_state.monitor.get_claimable_balances());
+ }
+ ret
+ }
+
+ /// Gets the [`LockedChannelMonitor`] for a given funding outpoint, returning an `Err` if no
+ /// [`ChannelMonitor`] is currently being monitored for that outpoint.
+ ///
+ /// Note that the result holds a read lock on our monitor set, and thus should not be held
+ /// indefinitely.
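+ ///
+ /// For example (a hedged sketch; `chain_monitor` and `funding_txo` are assumed to be in
+ /// scope):
+ ///
+ /// ```ignore
+ /// if let Ok(monitor) = chain_monitor.get_monitor(funding_txo) {
+ ///     let balances = monitor.get_claimable_balances();
+ ///     // `monitor`, and the read lock it holds, are dropped at the end of this scope.
+ /// }
+ /// ```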
+ pub fn get_monitor(&self, funding_txo: OutPoint) -> Result<LockedChannelMonitor<'_, ChannelSigner>, ()> {
+ let lock = self.monitors.read().unwrap();
+ if lock.get(&funding_txo).is_some() {
+ Ok(LockedChannelMonitor { lock, funding_txo })
+ } else {
+ Err(())
+ }
+ }
+
+ /// Lists the funding outpoint of each [`ChannelMonitor`] being monitored.
+ ///
+ /// Note that [`ChannelMonitor`]s are not removed when a channel is closed, as they continue
+ /// monitoring for the channel's state to be resolved on-chain.
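+ ///
+ /// A sketch of walking every monitored channel (assuming a `chain_monitor` in scope):
+ ///
+ /// ```ignore
+ /// for funding_txo in chain_monitor.list_monitors() {
+ ///     let monitor = chain_monitor.get_monitor(funding_txo)
+ ///         .expect("outpoint was listed just above");
+ ///     // ... inspect `monitor`, dropping it promptly to release the read lock ...
+ /// }
+ /// ```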
+ pub fn list_monitors(&self) -> Vec<OutPoint> {
+ self.monitors.read().unwrap().keys().map(|outpoint| *outpoint).collect()
+ }
+
+ #[cfg(test)]
+ pub fn remove_monitor(&self, funding_txo: &OutPoint) -> ChannelMonitor<ChannelSigner> {
+ self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor
+ }
+
+ /// Indicates the persistence of a [`ChannelMonitor`] has completed after
+ /// [`ChannelMonitorUpdateErr::TemporaryFailure`] was returned from an update operation.
+ ///
+ /// Thus, the anticipated use is, at a high level:
+ /// 1) This [`ChainMonitor`] calls [`Persist::update_persisted_channel`] which stores the
+ /// update to disk and begins updating any remote (e.g. watchtower/backup) copies,
+ /// returning [`ChannelMonitorUpdateErr::TemporaryFailure`],
+ /// 2) once all remote copies are updated, you call this function with the
+ /// `completed_update_id` of the update which finished (see the sketch below); once all
+ /// pending updates have completed, the channel will be re-enabled.
+ // Note that we re-enable only after `UpdateOrigin::OffChain` updates complete, we don't
+ // care about `UpdateOrigin::ChainSync` updates for the channel state being updated. We
+ // only care about `UpdateOrigin::ChainSync` for returning `MonitorEvent`s.
+ ///
+ /// Returns an [`APIError::APIMisuseError`] if `funding_txo` does not match any currently
+ /// registered [`ChannelMonitor`]s.
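+ ///
+ /// A minimal sketch of step 2), assuming the `MonitorUpdateId` passed to
+ /// [`Persist::update_persisted_channel`] was stashed alongside the in-flight remote write:
+ ///
+ /// ```ignore
+ /// // ... all remote (e.g. watchtower/backup) copies have now been written ...
+ /// chain_monitor.channel_monitor_updated(funding_txo, update_id)
+ ///     .expect("the ChannelMonitor must still be registered");
+ /// ```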
+ pub fn channel_monitor_updated(&self, funding_txo: OutPoint, completed_update_id: MonitorUpdateId) -> Result<(), APIError> {
+ let monitors = self.monitors.read().unwrap();
+ let monitor_data = if let Some(mon) = monitors.get(&funding_txo) { mon } else {
+ return Err(APIError::APIMisuseError { err: format!("No ChannelMonitor matching funding outpoint {:?} found", funding_txo) });
+ };
+ let mut pending_monitor_updates = monitor_data.pending_monitor_updates.lock().unwrap();
+ pending_monitor_updates.retain(|update_id| *update_id != completed_update_id);
+
+ match completed_update_id {
+ MonitorUpdateId { contents: UpdateOrigin::OffChain(_) } => {
+ // Note that we only check for `UpdateOrigin::OffChain` failures here - if
+ // we're being told that an `UpdateOrigin::OffChain` monitor update completed,
+ // we only care about ensuring we don't tell the `ChannelManager` to restore
+ // the channel to normal operation until all `UpdateOrigin::OffChain` updates
+ // complete.
+ // If there's some `UpdateOrigin::ChainSync` update still pending that's okay
+ // - we can still update our channel state, just as long as we don't return
+ // `MonitorEvent`s from the monitor back to the `ChannelManager` until they
+ // complete.
+ let monitor_is_pending_updates = monitor_data.has_pending_offchain_updates(&pending_monitor_updates);
+ if monitor_is_pending_updates || monitor_data.channel_perm_failed.load(Ordering::Acquire) {
+ // If there are still monitor updates pending (or an old monitor update
+ // finished after a later one perm-failed), we cannot yet construct an
+ // UpdateCompleted event.
+ return Ok(());
+ }
+ self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted {
+ funding_txo,
+ monitor_update_id: monitor_data.monitor.get_latest_update_id(),
+ });
+ },
+ MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {
+ if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) {
+ monitor_data.last_chain_persist_height.store(self.highest_chain_height.load(Ordering::Acquire), Ordering::Release);
+ // The next time release_pending_monitor_events is called, any events for this
+ // ChannelMonitor will be returned.
+ }
+ },
+ }
+ Ok(())
+ }
+
+ /// This wrapper avoids having to update some of our tests for now, as they assume the
+ /// direct chain::Watch API wherein a monitor is marked fully-updated by a single call to
+ /// channel_monitor_updated with the highest update ID.
+ #[cfg(any(test, feature = "fuzztarget"))]
+ pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) {
+ self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted {
+ funding_txo,
+ monitor_update_id,
+ });
+ }
+
+ #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
+ pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
+ use util::events::EventsProvider;
+ let events = core::cell::RefCell::new(Vec::new());
+ let event_handler = |event: &events::Event| events.borrow_mut().push(event.clone());
+ self.process_pending_events(&event_handler);
+ events.into_inner()
+ }
+}
+
+impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
+chain::Listen for ChainMonitor<ChannelSigner, C, T, F, L, P>
+where
+ C::Target: chain::Filter,
+ T::Target: BroadcasterInterface,
+ F::Target: FeeEstimator,
+ L::Target: Logger,
+ P::Target: Persist<ChannelSigner>,
+{
+ fn block_connected(&self, block: &Block, height: u32) {
+ let header = &block.header;
+ let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
+ log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height);
+ self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
+ monitor.block_connected(
+ header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
+ });
+ }
+
+ fn block_disconnected(&self, header: &BlockHeader, height: u32) {
+ let monitor_states = self.monitors.read().unwrap();
+ log_debug!(self.logger, "Latest block {} at height {} removed via block_disconnected", header.block_hash(), height);
+ for monitor_state in monitor_states.values() {
+ monitor_state.monitor.block_disconnected(
+ header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
+ }
+ }
+}
+
+impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
+chain::Confirm for ChainMonitor<ChannelSigner, C, T, F, L, P>
+where
+ C::Target: chain::Filter,
+ T::Target: BroadcasterInterface,
+ F::Target: FeeEstimator,
+ L::Target: Logger,
+ P::Target: Persist<ChannelSigner>,
+{
+ fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
+ log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash());
+ self.process_chain_data(header, None, txdata, |monitor, txdata| {
+ monitor.transactions_confirmed(
+ header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
+ });
+ }
+
+ fn transaction_unconfirmed(&self, txid: &Txid) {
+ log_debug!(self.logger, "Transaction {} reorganized out of chain", txid);
+ let monitor_states = self.monitors.read().unwrap();
+ for monitor_state in monitor_states.values() {
+ monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger);