use lightning::chain::chaininterface::{BroadcasterInterface,ConfirmationTarget,ChainListener,FeeEstimator,ChainWatchInterfaceUtil,ChainWatchInterface};
use lightning::chain::keysinterface::{KeysInterface, InMemoryChannelKeys};
use lightning::ln::channelmonitor;
-use lightning::ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, HTLCUpdate};
+use lightning::ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, MonitorEvent};
use lightning::ln::channelmanager::{ChannelManager, PaymentHash, PaymentPreimage, PaymentSecret, ChannelManagerReadArgs};
use lightning::ln::features::{ChannelFeatures, InitFeatures, NodeFeatures};
use lightning::ln::msgs::{CommitmentUpdate, ChannelMessageHandler, ErrorAction, UpdateAddHTLC, Init};
self.update_ret.lock().unwrap().clone()
}
- fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
- return self.simple_monitor.get_and_clear_pending_htlcs_updated();
+ fn get_and_clear_pending_monitor_events(&self) -> Vec<MonitorEvent> {
+ return self.simple_monitor.get_and_clear_pending_monitor_events();
}
}
use chain::chaininterface::{BroadcasterInterface,ChainListener,FeeEstimator};
use chain::transaction::OutPoint;
use ln::channel::{Channel, ChannelError};
-use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, ManyChannelMonitor, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY};
+use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, ManyChannelMonitor, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY, MonitorEvent};
use ln::features::{InitFeatures, NodeFeatures};
use routing::router::{Route, RouteHop};
use ln::msgs;
Err(e) => { Err(APIError::APIMisuseError { err: e.err })}
}
}
+
+	/// Process any pending monitor events from the ManyChannelMonitor: HTLCs to claim or fail backwards, and channels whose commitment transaction the monitor has broadcast.
+ fn process_pending_monitor_events(&self) {
+ let mut failed_channels = Vec::new();
+ {
+ for monitor_event in self.monitor.get_and_clear_pending_monitor_events() {
+ match monitor_event {
+ MonitorEvent::HTLCEvent(htlc_update) => {
+ if let Some(preimage) = htlc_update.payment_preimage {
+ log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
+ self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
+ } else {
+ log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
+ self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
+ }
+ },
+ MonitorEvent::CommitmentTxBroadcasted(funding_outpoint) => {
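+						// The ChannelMonitor has already broadcast (or is about to broadcast) the
+						// latest local commitment transaction, so all we do here is drop the channel
+						// from our in-memory maps and announce the closure via a channel update.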
+ let mut channel_lock = self.channel_state.lock().unwrap();
+ let channel_state = &mut *channel_lock;
+ let by_id = &mut channel_state.by_id;
+ let short_to_id = &mut channel_state.short_to_id;
+ let pending_msg_events = &mut channel_state.pending_msg_events;
+ if let Some(mut chan) = by_id.remove(&funding_outpoint.to_channel_id()) {
+ if let Some(short_id) = chan.get_short_channel_id() {
+ short_to_id.remove(&short_id);
+ }
+ failed_channels.push(chan.force_shutdown(false));
+ if let Ok(update) = self.get_channel_update(&chan) {
+ pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ msg: update
+ });
+ }
+ }
+ },
+ }
+ }
+ }
+
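+		// Finish force-closures outside the scope above so no channel_state lock is held
+		// while finish_force_close_channel runs.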
+ for failure in failed_channels.drain(..) {
+ self.finish_force_close_channel(failure);
+ }
+ }
}
impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> events::MessageSendEventsProvider for ChannelManager<ChanSigner, M, T, K, F, L>
L::Target: Logger,
{
fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
- // TODO: Event release to users and serialization is currently race-y: it's very easy for a
- // user to serialize a ChannelManager with pending events in it and lose those events on
- // restart. This is doubly true for the fail/fulfill-backs from monitor events!
- {
- //TODO: This behavior should be documented.
- for htlc_update in self.monitor.get_and_clear_pending_htlcs_updated() {
- if let Some(preimage) = htlc_update.payment_preimage {
- log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
- self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
- } else {
- log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
- self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
- }
- }
- }
+ //TODO: This behavior should be documented. It's non-intuitive that we query
+ // ChannelMonitors when clearing other events.
+ self.process_pending_monitor_events();
let mut ret = Vec::new();
let mut channel_state = self.channel_state.lock().unwrap();
L::Target: Logger,
{
fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
- // TODO: Event release to users and serialization is currently race-y: it's very easy for a
- // user to serialize a ChannelManager with pending events in it and lose those events on
- // restart. This is doubly true for the fail/fulfill-backs from monitor events!
- {
- //TODO: This behavior should be documented.
- for htlc_update in self.monitor.get_and_clear_pending_htlcs_updated() {
- if let Some(preimage) = htlc_update.payment_preimage {
- log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
- self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
- } else {
- log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
- self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
- }
- }
- }
+ //TODO: This behavior should be documented. It's non-intuitive that we query
+ // ChannelMonitors when clearing other events.
+ self.process_pending_monitor_events();
let mut ret = Vec::new();
let mut pending_events = self.pending_events.lock().unwrap();
}
}
}
- if channel.is_funding_initiated() && channel.channel_monitor().would_broadcast_at_height(height, &self.logger) {
- if let Some(short_id) = channel.get_short_channel_id() {
- short_to_id.remove(&short_id);
- }
- // If would_broadcast_at_height() is true, the channel_monitor will broadcast
- // the latest local tx for us, so we should skip that here (it doesn't really
- // hurt anything, but does make tests a bit simpler).
- failed_channels.push(channel.force_shutdown(false));
- if let Ok(update) = self.get_channel_update(&channel) {
- pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
- msg: update
- });
- }
- return false;
- }
true
});
#[derive(Debug)]
pub struct MonitorUpdateError(pub &'static str);
+/// An event to be processed by the ChannelManager.
+#[derive(PartialEq)]
+pub enum MonitorEvent {
+ /// A monitor event containing an HTLCUpdate.
+ HTLCEvent(HTLCUpdate),
+
+	/// A monitor event indicating that the channel's commitment transaction was broadcast.
+ CommitmentTxBroadcasted(OutPoint),
+}
+
/// Simple structure sent back by ManyChannelMonitor when an HTLC from a forward channel is
/// detected on-chain, carrying the information needed to update the HTLC in a backward channel.
#[derive(Clone, PartialEq)]
}
}
- fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
- let mut pending_htlcs_updated = Vec::new();
+ fn get_and_clear_pending_monitor_events(&self) -> Vec<MonitorEvent> {
+ let mut pending_monitor_events = Vec::new();
for chan in self.monitors.lock().unwrap().values_mut() {
- pending_htlcs_updated.append(&mut chan.get_and_clear_pending_htlcs_updated());
+ pending_monitor_events.append(&mut chan.get_and_clear_pending_monitor_events());
}
- pending_htlcs_updated
+ pending_monitor_events
}
}
/// information and are actively monitoring the chain.
///
/// Pending Events or updated HTLCs which have not yet been read out by
-/// get_and_clear_pending_htlcs_updated or get_and_clear_pending_events are serialized to disk and
+/// get_and_clear_pending_monitor_events or get_and_clear_pending_events are serialized to disk and
/// reloaded at deserialize-time. Thus, you must ensure that, when handling events, all events
/// gotten are fully handled before re-serializing the new state.
pub struct ChannelMonitor<ChanSigner: ChannelKeys> {
payment_preimages: HashMap<PaymentHash, PaymentPreimage>,
- pending_htlcs_updated: Vec<HTLCUpdate>,
+ pending_monitor_events: Vec<MonitorEvent>,
pending_events: Vec<events::Event>,
// Used to track onchain events, i.e transactions parts of channels confirmed on chain, on which
/// with success or failure.
///
/// You should probably just call through to
- /// ChannelMonitor::get_and_clear_pending_htlcs_updated() for each ChannelMonitor and return
+ /// ChannelMonitor::get_and_clear_pending_monitor_events() for each ChannelMonitor and return
/// the full list.
- fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate>;
+ fn get_and_clear_pending_monitor_events(&self) -> Vec<MonitorEvent>;
}
#[cfg(any(test, feature = "fuzztarget"))]
self.current_local_commitment_number != other.current_local_commitment_number ||
self.current_local_commitment_tx != other.current_local_commitment_tx ||
self.payment_preimages != other.payment_preimages ||
- self.pending_htlcs_updated != other.pending_htlcs_updated ||
+ self.pending_monitor_events != other.pending_monitor_events ||
self.pending_events.len() != other.pending_events.len() || // We trust events to round-trip properly
self.onchain_events_waiting_threshold_conf != other.onchain_events_waiting_threshold_conf ||
self.outputs_to_watch != other.outputs_to_watch ||
writer.write_all(&payment_preimage.0[..])?;
}
- writer.write_all(&byte_utils::be64_to_array(self.pending_htlcs_updated.len() as u64))?;
- for data in self.pending_htlcs_updated.iter() {
- data.write(writer)?;
+ writer.write_all(&byte_utils::be64_to_array(self.pending_monitor_events.len() as u64))?;
+ for event in self.pending_monitor_events.iter() {
+ match event {
+ MonitorEvent::HTLCEvent(upd) => {
+ 0u8.write(writer)?;
+ upd.write(writer)?;
+ },
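+				// For CommitmentTxBroadcasted only the discriminant byte is written; the outpoint
+				// is rebuilt from funding_info when the monitor is deserialized.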
+ MonitorEvent::CommitmentTxBroadcasted(_) => 1u8.write(writer)?
+ }
}
writer.write_all(&byte_utils::be64_to_array(self.pending_events.len() as u64))?;
current_local_commitment_number: 0xffff_ffff_ffff - ((((local_tx_sequence & 0xffffff) << 3*8) | (local_tx_locktime as u64 & 0xffffff)) ^ commitment_transaction_number_obscure_factor),
payment_preimages: HashMap::new(),
- pending_htlcs_updated: Vec::new(),
+ pending_monitor_events: Vec::new(),
pending_events: Vec::new(),
onchain_events_waiting_threshold_conf: HashMap::new(),
for tx in self.get_latest_local_commitment_txn(logger).iter() {
broadcaster.broadcast_transaction(tx);
}
+ self.pending_monitor_events.push(MonitorEvent::CommitmentTxBroadcasted(self.funding_info.0));
}
/// Used in Channel to cheat wrt the update_ids since it plays games, will be removed soon!
}
	/// Get the list of pending monitor events, including HTLCs whose status has been updated on chain. This should be called by
- /// ChannelManager via ManyChannelMonitor::get_and_clear_pending_htlcs_updated().
- pub fn get_and_clear_pending_htlcs_updated(&mut self) -> Vec<HTLCUpdate> {
+ /// ChannelManager via ManyChannelMonitor::get_and_clear_pending_monitor_events().
+ pub fn get_and_clear_pending_monitor_events(&mut self) -> Vec<MonitorEvent> {
let mut ret = Vec::new();
- mem::swap(&mut ret, &mut self.pending_htlcs_updated);
+ mem::swap(&mut ret, &mut self.pending_monitor_events);
ret
}
claimable_outpoints.push(ClaimRequest { absolute_timelock: height, aggregable: false, outpoint: BitcoinOutPoint { txid: self.funding_info.0.txid.clone(), vout: self.funding_info.0.index as u32 }, witness_data: InputMaterial::Funding { funding_redeemscript: self.funding_redeemscript.clone() }});
}
if should_broadcast {
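+			// Queue a CommitmentTxBroadcasted event for the ChannelManager before we try to sign
+			// and broadcast our latest local commitment transaction below.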
+ self.pending_monitor_events.push(MonitorEvent::CommitmentTxBroadcasted(self.funding_info.0));
if let Some(commitment_tx) = self.onchain_tx_handler.get_fully_signed_local_tx(&self.funding_redeemscript) {
+ self.local_tx_signed = true;
let (mut new_outpoints, new_outputs, _) = self.broadcast_by_local_state(&commitment_tx, &self.current_local_commitment_tx);
if !new_outputs.is_empty() {
watch_outputs.push((self.current_local_commitment_tx.txid.clone(), new_outputs));
match ev {
OnchainEvent::HTLCUpdate { htlc_update } => {
log_trace!(logger, "HTLC {} failure update has got enough confirmations to be passed upstream", log_bytes!((htlc_update.1).0));
- self.pending_htlcs_updated.push(HTLCUpdate {
+ self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate {
payment_hash: htlc_update.1,
payment_preimage: None,
source: htlc_update.0,
- });
+ }));
},
OnchainEvent::MaturingOutput { descriptor } => {
log_trace!(logger, "Descriptor {} has got enough confirmations to be passed upstream", log_spendable!(descriptor));
}
}
}
+
self.onchain_tx_handler.block_connected(txn_matched, claimable_outpoints, height, &*broadcaster, &*fee_estimator, &*logger);
self.last_block_hash = block_hash.clone();
self.last_block_hash = block_hash.clone();
}
- pub(super) fn would_broadcast_at_height<L: Deref>(&self, height: u32, logger: &L) -> bool where L::Target: Logger {
+ fn would_broadcast_at_height<L: Deref>(&self, height: u32, logger: &L) -> bool where L::Target: Logger {
// We need to consider all HTLCs which are:
// * in any unrevoked remote commitment transaction, as they could broadcast said
// transactions and we'd end up in a race, or
if let Some((source, payment_hash)) = payment_data {
let mut payment_preimage = PaymentPreimage([0; 32]);
if accepted_preimage_claim {
- if !self.pending_htlcs_updated.iter().any(|update| update.source == source) {
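+					// Only queue a single HTLCEvent for any given HTLC source.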
+ if !self.pending_monitor_events.iter().any(
+ |update| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) {
payment_preimage.0.copy_from_slice(&input.witness[3]);
- self.pending_htlcs_updated.push(HTLCUpdate {
+ self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate {
source,
payment_preimage: Some(payment_preimage),
payment_hash
- });
+ }));
}
} else if offered_preimage_claim {
- if !self.pending_htlcs_updated.iter().any(|update| update.source == source) {
+ if !self.pending_monitor_events.iter().any(
+ |update| if let &MonitorEvent::HTLCEvent(ref upd) = update {
+ upd.source == source
+ } else { false }) {
payment_preimage.0.copy_from_slice(&input.witness[1]);
- self.pending_htlcs_updated.push(HTLCUpdate {
+ self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate {
source,
payment_preimage: Some(payment_preimage),
payment_hash
- });
+ }));
}
} else {
					log_info!(logger, "Failing HTLC with payment_hash {} timed out by a spend tx, waiting for confirmation (at height {})", log_bytes!(payment_hash.0), height + ANTI_REORG_DELAY - 1);
}
}
- let pending_htlcs_updated_len: u64 = Readable::read(reader)?;
- let mut pending_htlcs_updated = Vec::with_capacity(cmp::min(pending_htlcs_updated_len as usize, MAX_ALLOC_SIZE / (32 + 8*3)));
- for _ in 0..pending_htlcs_updated_len {
- pending_htlcs_updated.push(Readable::read(reader)?);
+ let pending_monitor_events_len: u64 = Readable::read(reader)?;
+ let mut pending_monitor_events = Vec::with_capacity(cmp::min(pending_monitor_events_len as usize, MAX_ALLOC_SIZE / (32 + 8*3)));
+ for _ in 0..pending_monitor_events_len {
+ let ev = match <u8 as Readable>::read(reader)? {
+ 0 => MonitorEvent::HTLCEvent(Readable::read(reader)?),
+ 1 => MonitorEvent::CommitmentTxBroadcasted(funding_info.0),
+ _ => return Err(DecodeError::InvalidValue)
+ };
+ pending_monitor_events.push(ev);
}
let pending_events_len: u64 = Readable::read(reader)?;
current_local_commitment_number,
payment_preimages,
- pending_htlcs_updated,
+ pending_monitor_events,
pending_events,
onchain_events_waiting_threshold_conf,
// CLTV expires at TEST_FINAL_CLTV + 1 (current height) + 1 (added in send_payment for
// buffer space).
- {
+ let (close_chan_update_1, close_chan_update_2) = {
let mut header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
nodes[3].block_notifier.block_connected_checked(&header, 2, &Vec::new()[..], &[0; 0]);
for i in 3..TEST_FINAL_CLTV + 2 + LATENCY_GRACE_PERIOD_BLOCKS + 1 {
header = BlockHeader { version: 0x20000000, prev_blockhash: header.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
nodes[3].block_notifier.block_connected_checked(&header, i, &Vec::new()[..], &[0; 0]);
}
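+	// The channel is now force-closed by the ChannelMonitor (via a CommitmentTxBroadcasted monitor
+	// event) rather than by ChannelManager::block_connected, so the resulting
+	// BroadcastChannelUpdate only appears once we poll the ChannelManager for message events.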
+ let events = nodes[3].node.get_and_clear_pending_msg_events();
+ assert_eq!(events.len(), 1);
+ let close_chan_update_1 = match events[0] {
+ MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
+ msg.clone()
+ },
+ _ => panic!("Unexpected event"),
+ };
check_added_monitors!(nodes[3], 1);
// Clear bumped claiming txn spending node 2 commitment tx. Bumped txn are generated after reaching some height timer.
header = BlockHeader { version: 0x20000000, prev_blockhash: header.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
nodes[4].block_notifier.block_connected_checked(&header, i, &Vec::new()[..], &[0; 0]);
}
-
+ let events = nodes[4].node.get_and_clear_pending_msg_events();
+ assert_eq!(events.len(), 1);
+ let close_chan_update_2 = match events[0] {
+ MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
+ msg.clone()
+ },
+ _ => panic!("Unexpected event"),
+ };
check_added_monitors!(nodes[4], 1);
test_txn_broadcast(&nodes[4], &chan_4, None, HTLCType::SUCCESS);
nodes[4].block_notifier.block_connected(&Block { header, txdata: vec![node_txn[0].clone()] }, TEST_FINAL_CLTV - 5);
check_preimage_claim(&nodes[4], &node_txn);
- }
- get_announce_close_broadcast_events(&nodes, 3, 4);
+ (close_chan_update_1, close_chan_update_2)
+ };
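+	// Apply each node's broadcast ChannelUpdate to the other node's network graph handler
+	// (previously handled by get_announce_close_broadcast_events).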
+ nodes[3].net_graph_msg_handler.handle_channel_update(&close_chan_update_2).unwrap();
+ nodes[4].net_graph_msg_handler.handle_channel_update(&close_chan_update_1).unwrap();
assert_eq!(nodes[3].node.list_channels().len(), 0);
assert_eq!(nodes[4].node.list_channels().len(), 0);
}
header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
nodes[1].block_notifier.block_connected(&Block { header, txdata: claim_txn }, CHAN_CONFIRM_DEPTH + 1);
- // ChannelManager only polls ManyChannelMonitor::get_and_clear_pending_htlcs_updated when we
+ // ChannelManager only polls ManyChannelMonitor::get_and_clear_pending_monitor_events when we
// probe it for events, so we probe non-message events here (which should still end up empty):
assert_eq!(nodes[1].node.get_and_clear_pending_events().len(), 0);
} else {
use ln::features::{ChannelFeatures, InitFeatures};
use ln::msgs;
use ln::msgs::OptionalField;
-use ln::channelmonitor::HTLCUpdate;
+use ln::channelmonitor::MonitorEvent;
use util::enforcing_trait_impls::EnforcingChannelKeys;
use util::events;
use util::logger::{Logger, Level, Record};
ret
}
- fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
- return self.simple_monitor.get_and_clear_pending_htlcs_updated();
+ fn get_and_clear_pending_monitor_events(&self) -> Vec<MonitorEvent> {
+ return self.simple_monitor.get_and_clear_pending_monitor_events();
}
}