use crate::ln::wire::Encode;
use crate::chain::keysinterface::{Sign, KeysInterface, KeysManager, Recipient};
use crate::util::config::{UserConfig, ChannelConfig};
-use crate::util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
+use crate::util::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
use crate::util::{byte_utils, events};
use crate::util::wakers::{Future, Notifier};
use crate::util::scid_utils::fake_scid;
}
let routing = match hop_data.format {
- msgs::OnionHopDataFormat::Legacy { .. } => {
- return Err(ReceiveError {
- err_code: 0x4000|0x2000|3,
- err_data: Vec::new(),
- msg: "We require payment_secrets",
- });
- },
msgs::OnionHopDataFormat::NonFinalNode { .. } => {
return Err(ReceiveError {
err_code: 0x4000|22,
};
let short_channel_id = match next_hop_data.format {
- msgs::OnionHopDataFormat::Legacy { short_channel_id } => short_channel_id,
msgs::OnionHopDataFormat::NonFinalNode { short_channel_id } => short_channel_id,
msgs::OnionHopDataFormat::FinalNode { .. } => {
return_err!("Final Node OnionHopData provided for us as an intermediary node", 0x4000 | 22, &[0;0]);
if chan.get().get_counterparty_node_id() != *counterparty_node_id {
return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
}
- try_chan_entry!(self, chan.get_mut().update_fee(&self.fee_estimator, &msg), chan);
+ try_chan_entry!(self, chan.get_mut().update_fee(&self.fee_estimator, &msg, &self.logger), chan);
},
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
}
#[cfg(any(test, fuzzing, feature = "_test_utils"))]
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
let events = core::cell::RefCell::new(Vec::new());
- let event_handler = |event: &events::Event| events.borrow_mut().push(event.clone());
+ let event_handler = |event: events::Event| events.borrow_mut().push(event);
self.process_pending_events(&event_handler);
events.into_inner()
}
pub fn clear_pending_payments(&self) {
self.pending_outbound_payments.lock().unwrap().clear()
}
+
+ /// Processes any events asynchronously in the order they were generated since the last call
+ /// using the given event handler.
+ ///
+ /// See the trait-level documentation of [`EventsProvider`] for requirements.
+ pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+ &self, handler: H
+ ) {
+ // We'll acquire our total consistency lock until the returned future completes so that
+ // we can be sure no other persists happen while processing events.
+ let _read_guard = self.total_consistency_lock.read().unwrap();
+
+ let mut result = NotifyOption::SkipPersist;
+
+ // TODO: This behavior should be documented. It's unintuitive that we query
+ // ChannelMonitors when clearing other events.
+ if self.process_pending_monitor_events() {
+ result = NotifyOption::DoPersist;
+ }
+
+ let pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
+ if !pending_events.is_empty() {
+ result = NotifyOption::DoPersist;
+ }
+
+ for event in pending_events {
+ handler(event).await;
+ }
+
+ if result == NotifyOption::DoPersist {
+ self.persistence_notifier.notify();
+ }
+ }
}
impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSendEventsProvider for ChannelManager<M, T, K, F, L>
result = NotifyOption::DoPersist;
}
- let mut pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
+ let pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
if !pending_events.is_empty() {
result = NotifyOption::DoPersist;
}
- for event in pending_events.drain(..) {
- handler.handle_event(&event);
+ for event in pending_events {
+ handler.handle_event(event);
}
result
});
}
- fn get_relevant_txids(&self) -> Vec<Txid> {
+ fn get_relevant_txids(&self) -> Vec<(Txid, Option<BlockHash>)> {
let channel_state = self.channel_state.lock().unwrap();
let mut res = Vec::with_capacity(channel_state.by_id.len());
for chan in channel_state.by_id.values() {
- if let Some(funding_txo) = chan.get_funding_txo() {
- res.push(funding_txo.txid);
+ if let (Some(funding_txo), block_hash) = (chan.get_funding_txo(), chan.get_funding_tx_confirmed_in()) {
+ res.push((funding_txo.txid, block_hash));
}
}
res
/// Blocks until ChannelManager needs to be persisted or a timeout is reached. It returns a bool
/// indicating whether persistence is necessary. Only one listener on
- /// `await_persistable_update` or `await_persistable_update_timeout` is guaranteed to be woken
- /// up.
+ /// [`await_persistable_update`], [`await_persistable_update_timeout`], or a future returned by
+ /// [`get_persistable_update_future`] is guaranteed to be woken up.
///
/// Note that this method is not available with the `no-std` feature.
+ ///
+ /// [`await_persistable_update`]: Self::await_persistable_update
+ /// [`await_persistable_update_timeout`]: Self::await_persistable_update_timeout
+ /// [`get_persistable_update_future`]: Self::get_persistable_update_future
#[cfg(any(test, feature = "std"))]
pub fn await_persistable_update_timeout(&self, max_wait: Duration) -> bool {
self.persistence_notifier.wait_timeout(max_wait)
}
/// Blocks until ChannelManager needs to be persisted. Only one listener on
- /// `await_persistable_update` or `await_persistable_update_timeout` is guaranteed to be woken
- /// up.
+ /// [`await_persistable_update`], `await_persistable_update_timeout`, or a future returned by
+ /// [`get_persistable_update_future`] is guaranteed to be woken up.
+ ///
+ /// [`await_persistable_update`]: Self::await_persistable_update
+ /// [`get_persistable_update_future`]: Self::get_persistable_update_future
pub fn await_persistable_update(&self) {
self.persistence_notifier.wait()
}
/// which you've already broadcasted the transaction.
///
/// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor
-pub struct ChannelManagerReadArgs<'a, Signer: 'a + Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
- where M::Target: chain::Watch<Signer>,
+pub struct ChannelManagerReadArgs<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
+ where M::Target: chain::Watch<<K::Target as KeysInterface>::Signer>,
T::Target: BroadcasterInterface,
- K::Target: KeysInterface<Signer = Signer>,
+ K::Target: KeysInterface,
F::Target: FeeEstimator,
L::Target: Logger,
{
/// this struct.
///
/// (C-not exported) because we have no HashMap bindings
- pub channel_monitors: HashMap<OutPoint, &'a mut ChannelMonitor<Signer>>,
+ pub channel_monitors: HashMap<OutPoint, &'a mut ChannelMonitor<<K::Target as KeysInterface>::Signer>>,
}
-impl<'a, Signer: 'a + Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
- ChannelManagerReadArgs<'a, Signer, M, T, K, F, L>
- where M::Target: chain::Watch<Signer>,
+impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
+ ChannelManagerReadArgs<'a, M, T, K, F, L>
+ where M::Target: chain::Watch<<K::Target as KeysInterface>::Signer>,
T::Target: BroadcasterInterface,
- K::Target: KeysInterface<Signer = Signer>,
+ K::Target: KeysInterface,
F::Target: FeeEstimator,
L::Target: Logger,
{
/// HashMap for you. This is primarily useful for C bindings where it is not practical to
/// populate a HashMap directly from C.
pub fn new(keys_manager: K, fee_estimator: F, chain_monitor: M, tx_broadcaster: T, logger: L, default_config: UserConfig,
- mut channel_monitors: Vec<&'a mut ChannelMonitor<Signer>>) -> Self {
+ mut channel_monitors: Vec<&'a mut ChannelMonitor<<K::Target as KeysInterface>::Signer>>) -> Self {
Self {
keys_manager, fee_estimator, chain_monitor, tx_broadcaster, logger, default_config,
channel_monitors: channel_monitors.drain(..).map(|monitor| { (monitor.get_funding_txo().0, monitor) }).collect()
// Implement ReadableArgs for an Arc'd ChannelManager to make it a bit easier to work with the
// SipmleArcChannelManager type:
impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
- ReadableArgs<ChannelManagerReadArgs<'a, <K::Target as KeysInterface>::Signer, M, T, K, F, L>> for (BlockHash, Arc<ChannelManager<M, T, K, F, L>>)
+ ReadableArgs<ChannelManagerReadArgs<'a, M, T, K, F, L>> for (BlockHash, Arc<ChannelManager<M, T, K, F, L>>)
where M::Target: chain::Watch<<K::Target as KeysInterface>::Signer>,
T::Target: BroadcasterInterface,
K::Target: KeysInterface,
F::Target: FeeEstimator,
L::Target: Logger,
{
- fn read<R: io::Read>(reader: &mut R, args: ChannelManagerReadArgs<'a, <K::Target as KeysInterface>::Signer, M, T, K, F, L>) -> Result<Self, DecodeError> {
+ fn read<R: io::Read>(reader: &mut R, args: ChannelManagerReadArgs<'a, M, T, K, F, L>) -> Result<Self, DecodeError> {
let (blockhash, chan_manager) = <(BlockHash, ChannelManager<M, T, K, F, L>)>::read(reader, args)?;
Ok((blockhash, Arc::new(chan_manager)))
}
}
impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
- ReadableArgs<ChannelManagerReadArgs<'a, <K::Target as KeysInterface>::Signer, M, T, K, F, L>> for (BlockHash, ChannelManager<M, T, K, F, L>)
+ ReadableArgs<ChannelManagerReadArgs<'a, M, T, K, F, L>> for (BlockHash, ChannelManager<M, T, K, F, L>)
where M::Target: chain::Watch<<K::Target as KeysInterface>::Signer>,
T::Target: BroadcasterInterface,
K::Target: KeysInterface,
F::Target: FeeEstimator,
L::Target: Logger,
{
- fn read<R: io::Read>(reader: &mut R, mut args: ChannelManagerReadArgs<'a, <K::Target as KeysInterface>::Signer, M, T, K, F, L>) -> Result<Self, DecodeError> {
+ fn read<R: io::Read>(reader: &mut R, mut args: ChannelManagerReadArgs<'a, M, T, K, F, L>) -> Result<Self, DecodeError> {
let _ver = read_ver_prefix!(reader, SERIALIZATION_VERSION);
let genesis_hash: BlockHash = Readable::read(reader)?;