This is important for a number of reasons:
* Firstly, I hit this trying to implement rescan in the demo
bitcoinrpc client - if individual ChannelMonitors are out of
sync with each other, we cannot add them all into a
ManyChannelMonitor together and then rescan, but need to rescan
them individually without having to do a bunch of manual work.
Of the three return values in ChannelMonitor::block_connected,
only the HTLCsource stuff that is moved here makes no sense to
be exposed to the user.
* Secondly, the logic currently in ManyChannelMonitor cannot be
reproduced by the user! HTLCSource is deliberately an opaque
type but we use its data to decide which things to keep when
inserting into the HashMap. This would prevent a user from
properly implementing a replacement ManyChannelMonitor, which is
unacceptable.
* Finally, by moving the tracking into ChannelMonitor, we can
serialize them out, which prevents us from forgetting them when
loading from disk, though there are still other races which need
to be handled to make this fully safe (see TODOs in
ChannelManager).
This is safe as no two entries can have the same HTLCSource across
different channels (or, if they did, it would be a rather serious
bug), though note that, IIRC, when this code was added, the
HTLCSource field in the values was not present.
We also take this opportunity to rename the fetch function to match
our other event interfaces, makaing it clear that by calling the
function the set of HTLCUpdates will also be cleared.
- fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate> {
- return self.simple_monitor.fetch_pending_htlc_updated();
+ fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
+ return self.simple_monitor.get_and_clear_pending_htlcs_updated();
// restart. This is doubly true for the fail/fulfill-backs from monitor events!
{
//TODO: This behavior should be documented.
// restart. This is doubly true for the fail/fulfill-backs from monitor events!
{
//TODO: This behavior should be documented.
- for htlc_update in self.monitor.fetch_pending_htlc_updated() {
+ for htlc_update in self.monitor.get_and_clear_pending_htlcs_updated() {
if let Some(preimage) = htlc_update.payment_preimage {
log_trace!(self, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
if let Some(preimage) = htlc_update.payment_preimage {
log_trace!(self, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
// restart. This is doubly true for the fail/fulfill-backs from monitor events!
{
//TODO: This behavior should be documented.
// restart. This is doubly true for the fail/fulfill-backs from monitor events!
{
//TODO: This behavior should be documented.
- for htlc_update in self.monitor.fetch_pending_htlc_updated() {
+ for htlc_update in self.monitor.get_and_clear_pending_htlcs_updated() {
if let Some(preimage) = htlc_update.payment_preimage {
log_trace!(self, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
if let Some(preimage) = htlc_update.payment_preimage {
log_trace!(self, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
/// Simple structure send back by ManyChannelMonitor in case of HTLC detected onchain from a
/// forward channel and from which info are needed to update HTLC in a backward channel.
/// Simple structure send back by ManyChannelMonitor in case of HTLC detected onchain from a
/// forward channel and from which info are needed to update HTLC in a backward channel.
+#[derive(Clone, PartialEq)]
pub struct HTLCUpdate {
pub(super) payment_hash: PaymentHash,
pub(super) payment_preimage: Option<PaymentPreimage>,
pub(super) source: HTLCSource
}
pub struct HTLCUpdate {
pub(super) payment_hash: PaymentHash,
pub(super) payment_preimage: Option<PaymentPreimage>,
pub(super) source: HTLCSource
}
+impl_writeable!(HTLCUpdate, 0, { payment_hash, payment_preimage, source });
/// Simple trait indicating ability to track a set of ChannelMonitors and multiplex events between
/// them. Generally should be implemented by keeping a local SimpleManyChannelMonitor and passing
/// Simple trait indicating ability to track a set of ChannelMonitors and multiplex events between
/// them. Generally should be implemented by keeping a local SimpleManyChannelMonitor and passing
fn add_update_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor<ChanSigner>) -> Result<(), ChannelMonitorUpdateErr>;
/// Used by ChannelManager to get list of HTLC resolved onchain and which needed to be updated
fn add_update_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor<ChanSigner>) -> Result<(), ChannelMonitorUpdateErr>;
/// Used by ChannelManager to get list of HTLC resolved onchain and which needed to be updated
- /// with success or failure backward
- fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate>;
+ /// with success or failure.
+ ///
+ /// You should probably just call through to
+ /// ChannelMonitor::get_and_clear_pending_htlcs_updated() for each ChannelMonitor and return
+ /// the full list.
+ fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate>;
}
/// A simple implementation of a ManyChannelMonitor and ChainListener. Can be used to create a
}
/// A simple implementation of a ManyChannelMonitor and ChainListener. Can be used to create a
chain_monitor: Arc<ChainWatchInterface>,
broadcaster: Arc<BroadcasterInterface>,
pending_events: Mutex<Vec<events::Event>>,
chain_monitor: Arc<ChainWatchInterface>,
broadcaster: Arc<BroadcasterInterface>,
pending_events: Mutex<Vec<events::Event>>,
- pending_htlc_updated: Mutex<HashMap<PaymentHash, Vec<(HTLCSource, Option<PaymentPreimage>)>>>,
logger: Arc<Logger>,
fee_estimator: Arc<FeeEstimator>
}
logger: Arc<Logger>,
fee_estimator: Arc<FeeEstimator>
}
fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) {
let block_hash = header.bitcoin_hash();
let mut new_events: Vec<events::Event> = Vec::with_capacity(0);
fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) {
let block_hash = header.bitcoin_hash();
let mut new_events: Vec<events::Event> = Vec::with_capacity(0);
- let mut htlc_updated_infos = Vec::new();
{
let mut monitors = self.monitors.lock().unwrap();
for monitor in monitors.values_mut() {
{
let mut monitors = self.monitors.lock().unwrap();
for monitor in monitors.values_mut() {
- let (txn_outputs, spendable_outputs, mut htlc_updated) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
+ let (txn_outputs, spendable_outputs) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
if spendable_outputs.len() > 0 {
new_events.push(events::Event::SpendableOutputs {
outputs: spendable_outputs,
if spendable_outputs.len() > 0 {
new_events.push(events::Event::SpendableOutputs {
outputs: spendable_outputs,
self.chain_monitor.install_watch_outpoint((txid.clone(), idx as u32), &output.script_pubkey);
}
}
self.chain_monitor.install_watch_outpoint((txid.clone(), idx as u32), &output.script_pubkey);
}
}
- htlc_updated_infos.append(&mut htlc_updated);
- }
- }
- {
- // ChannelManager will just need to fetch pending_htlc_updated and pass state backward
- let mut pending_htlc_updated = self.pending_htlc_updated.lock().unwrap();
- for htlc in htlc_updated_infos.drain(..) {
- match pending_htlc_updated.entry(htlc.2) {
- hash_map::Entry::Occupied(mut e) => {
- // In case of reorg we may have htlc outputs solved in a different way so
- // we prefer to keep claims but don't store duplicate updates for a given
- // (payment_hash, HTLCSource) pair.
- let mut existing_claim = false;
- e.get_mut().retain(|htlc_data| {
- if htlc.0 == htlc_data.0 {
- if htlc_data.1.is_some() {
- existing_claim = true;
- true
- } else { false }
- } else { true }
- });
- if !existing_claim {
- e.get_mut().push((htlc.0, htlc.1));
- }
- }
- hash_map::Entry::Vacant(e) => {
- e.insert(vec![(htlc.0, htlc.1)]);
- }
- }
}
}
let mut pending_events = self.pending_events.lock().unwrap();
}
}
let mut pending_events = self.pending_events.lock().unwrap();
chain_monitor,
broadcaster,
pending_events: Mutex::new(Vec::new()),
chain_monitor,
broadcaster,
pending_events: Mutex::new(Vec::new()),
- pending_htlc_updated: Mutex::new(HashMap::new()),
logger,
fee_estimator: feeest,
};
logger,
fee_estimator: feeest,
};
- fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate> {
- let mut updated = self.pending_htlc_updated.lock().unwrap();
- let mut pending_htlcs_updated = Vec::with_capacity(updated.len());
- for (k, v) in updated.drain() {
- for htlc_data in v {
- pending_htlcs_updated.push(HTLCUpdate {
- payment_hash: k,
- payment_preimage: htlc_data.1,
- source: htlc_data.0,
- });
- }
+ fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
+ let mut pending_htlcs_updated = Vec::new();
+ for chan in self.monitors.lock().unwrap().values_mut() {
+ pending_htlcs_updated.append(&mut chan.get_and_clear_pending_htlcs_updated());
}
pending_htlcs_updated
}
}
pending_htlcs_updated
}
payment_preimages: HashMap<PaymentHash, PaymentPreimage>,
payment_preimages: HashMap<PaymentHash, PaymentPreimage>,
+ pending_htlcs_updated: Vec<HTLCUpdate>,
+
destination_script: Script,
// Thanks to data loss protection, we may be able to claim our non-htlc funds
// back, this is the script we have to spend from but we need to
destination_script: Script,
// Thanks to data loss protection, we may be able to claim our non-htlc funds
// back, this is the script we have to spend from but we need to
self.current_remote_commitment_number != other.current_remote_commitment_number ||
self.current_local_signed_commitment_tx != other.current_local_signed_commitment_tx ||
self.payment_preimages != other.payment_preimages ||
self.current_remote_commitment_number != other.current_remote_commitment_number ||
self.current_local_signed_commitment_tx != other.current_local_signed_commitment_tx ||
self.payment_preimages != other.payment_preimages ||
+ self.pending_htlcs_updated != other.pending_htlcs_updated ||
self.destination_script != other.destination_script ||
self.to_remote_rescue != other.to_remote_rescue ||
self.pending_claim_requests != other.pending_claim_requests ||
self.destination_script != other.destination_script ||
self.to_remote_rescue != other.to_remote_rescue ||
self.pending_claim_requests != other.pending_claim_requests ||
writer.write_all(&payment_preimage.0[..])?;
}
writer.write_all(&payment_preimage.0[..])?;
}
+ writer.write_all(&byte_utils::be64_to_array(self.pending_htlcs_updated.len() as u64))?;
+ for data in self.pending_htlcs_updated.iter() {
+ data.write(writer)?;
+ }
+
self.last_block_hash.write(writer)?;
self.destination_script.write(writer)?;
if let Some((ref to_remote_script, ref local_key)) = self.to_remote_rescue {
self.last_block_hash.write(writer)?;
self.destination_script.write(writer)?;
if let Some((ref to_remote_script, ref local_key)) = self.to_remote_rescue {
current_remote_commitment_number: 1 << 48,
payment_preimages: HashMap::new(),
current_remote_commitment_number: 1 << 48,
payment_preimages: HashMap::new(),
+ pending_htlcs_updated: Vec::new(),
+
destination_script: destination_script,
to_remote_rescue: None,
destination_script: destination_script,
to_remote_rescue: None,
+ /// Get the list of HTLCs who's status has been updated on chain. This should be called by
+ /// ChannelManager via ManyChannelMonitor::get_and_clear_pending_htlcs_updated().
+ pub fn get_and_clear_pending_htlcs_updated(&mut self) -> Vec<HTLCUpdate> {
+ let mut ret = Vec::new();
+ mem::swap(&mut ret, &mut self.pending_htlcs_updated);
+ ret
+ }
+
/// Can only fail if idx is < get_min_seen_secret
pub(super) fn get_secret(&self, idx: u64) -> Option<[u8; 32]> {
for i in 0..self.old_secrets.len() {
/// Can only fail if idx is < get_min_seen_secret
pub(super) fn get_secret(&self, idx: u64) -> Option<[u8; 32]> {
for i in 0..self.old_secrets.len() {
/// Eventually this should be pub and, roughly, implement ChainListener, however this requires
/// &mut self, as well as returns new spendable outputs and outpoints to watch for spending of
/// on-chain.
/// Eventually this should be pub and, roughly, implement ChainListener, however this requires
/// &mut self, as well as returns new spendable outputs and outpoints to watch for spending of
/// on-chain.
- fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator)-> (Vec<(Sha256dHash, Vec<TxOut>)>, Vec<SpendableOutputDescriptor>, Vec<(HTLCSource, Option<PaymentPreimage>, PaymentHash)>) {
+ fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator)-> (Vec<(Sha256dHash, Vec<TxOut>)>, Vec<SpendableOutputDescriptor>) {
for tx in txn_matched {
let mut output_val = 0;
for out in tx.output.iter() {
for tx in txn_matched {
let mut output_val = 0;
for out in tx.output.iter() {
log_trace!(self, "Block {} at height {} connected with {} txn matched", block_hash, height, txn_matched.len());
let mut watch_outputs = Vec::new();
let mut spendable_outputs = Vec::new();
log_trace!(self, "Block {} at height {} connected with {} txn matched", block_hash, height, txn_matched.len());
let mut watch_outputs = Vec::new();
let mut spendable_outputs = Vec::new();
- let mut htlc_updated = Vec::new();
let mut bump_candidates = HashSet::new();
for tx in txn_matched {
if tx.input.len() == 1 {
let mut bump_candidates = HashSet::new();
for tx in txn_matched {
if tx.input.len() == 1 {
// While all commitment/HTLC-Success/HTLC-Timeout transactions have one input, HTLCs
// can also be resolved in a few other ways which can have more than one output. Thus,
// we call is_resolving_htlc_output here outside of the tx.input.len() == 1 check.
// While all commitment/HTLC-Success/HTLC-Timeout transactions have one input, HTLCs
// can also be resolved in a few other ways which can have more than one output. Thus,
// we call is_resolving_htlc_output here outside of the tx.input.len() == 1 check.
- let mut updated = self.is_resolving_htlc_output(&tx, height);
- if updated.len() > 0 {
- htlc_updated.append(&mut updated);
- }
+ self.is_resolving_htlc_output(&tx, height);
// Scan all input to verify is one of the outpoint spent is of interest for us
let mut claimed_outputs_material = Vec::new();
// Scan all input to verify is one of the outpoint spent is of interest for us
let mut claimed_outputs_material = Vec::new();
},
OnchainEvent::HTLCUpdate { htlc_update } => {
log_trace!(self, "HTLC {} failure update has got enough confirmations to be passed upstream", log_bytes!((htlc_update.1).0));
},
OnchainEvent::HTLCUpdate { htlc_update } => {
log_trace!(self, "HTLC {} failure update has got enough confirmations to be passed upstream", log_bytes!((htlc_update.1).0));
- htlc_updated.push((htlc_update.0, None, htlc_update.1));
+ self.pending_htlcs_updated.push(HTLCUpdate {
+ payment_hash: htlc_update.1,
+ payment_preimage: None,
+ source: htlc_update.0,
+ });
},
OnchainEvent::ContentiousOutpoint { outpoint, .. } => {
self.claimable_outpoints.remove(&outpoint);
},
OnchainEvent::ContentiousOutpoint { outpoint, .. } => {
self.claimable_outpoints.remove(&outpoint);
for &(ref txid, ref output_scripts) in watch_outputs.iter() {
self.outputs_to_watch.insert(txid.clone(), output_scripts.iter().map(|o| o.script_pubkey.clone()).collect());
}
for &(ref txid, ref output_scripts) in watch_outputs.iter() {
self.outputs_to_watch.insert(txid.clone(), output_scripts.iter().map(|o| o.script_pubkey.clone()).collect());
}
- (watch_outputs, spendable_outputs, htlc_updated)
+ (watch_outputs, spendable_outputs)
}
fn block_disconnected(&mut self, height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator) {
}
fn block_disconnected(&mut self, height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator) {
/// Check if any transaction broadcasted is resolving HTLC output by a success or timeout on a local
/// or remote commitment tx, if so send back the source, preimage if found and payment_hash of resolved HTLC
/// Check if any transaction broadcasted is resolving HTLC output by a success or timeout on a local
/// or remote commitment tx, if so send back the source, preimage if found and payment_hash of resolved HTLC
- fn is_resolving_htlc_output(&mut self, tx: &Transaction, height: u32) -> Vec<(HTLCSource, Option<PaymentPreimage>, PaymentHash)> {
- let mut htlc_updated = Vec::new();
-
+ fn is_resolving_htlc_output(&mut self, tx: &Transaction, height: u32) {
'outer_loop: for input in &tx.input {
let mut payment_data = None;
let revocation_sig_claim = (input.witness.len() == 3 && HTLCType::scriptlen_to_htlctype(input.witness[2].len()) == Some(HTLCType::OfferedHTLC) && input.witness[1].len() == 33)
'outer_loop: for input in &tx.input {
let mut payment_data = None;
let revocation_sig_claim = (input.witness.len() == 3 && HTLCType::scriptlen_to_htlctype(input.witness[2].len()) == Some(HTLCType::OfferedHTLC) && input.witness[1].len() == 33)
let mut payment_preimage = PaymentPreimage([0; 32]);
if accepted_preimage_claim {
payment_preimage.0.copy_from_slice(&input.witness[3]);
let mut payment_preimage = PaymentPreimage([0; 32]);
if accepted_preimage_claim {
payment_preimage.0.copy_from_slice(&input.witness[3]);
- htlc_updated.push((source, Some(payment_preimage), payment_hash));
+ self.pending_htlcs_updated.push(HTLCUpdate {
+ source,
+ payment_preimage: Some(payment_preimage),
+ payment_hash
+ });
} else if offered_preimage_claim {
payment_preimage.0.copy_from_slice(&input.witness[1]);
} else if offered_preimage_claim {
payment_preimage.0.copy_from_slice(&input.witness[1]);
- htlc_updated.push((source, Some(payment_preimage), payment_hash));
+ self.pending_htlcs_updated.push(HTLCUpdate {
+ source,
+ payment_preimage: Some(payment_preimage),
+ payment_hash
+ });
} else {
log_info!(self, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height{})", log_bytes!(payment_hash.0), height + ANTI_REORG_DELAY - 1);
match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
} else {
log_info!(self, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height{})", log_bytes!(payment_hash.0), height + ANTI_REORG_DELAY - 1);
match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
}
/// Lightning security model (i.e being able to redeem/timeout HTLC or penalize coutnerparty onchain) lays on the assumption of claim transactions getting confirmed before timelock expiration
}
/// Lightning security model (i.e being able to redeem/timeout HTLC or penalize coutnerparty onchain) lays on the assumption of claim transactions getting confirmed before timelock expiration
+ let pending_htlcs_updated_len: u64 = Readable::read(reader)?;
+ let mut pending_htlcs_updated = Vec::with_capacity(cmp::min(pending_htlcs_updated_len as usize, MAX_ALLOC_SIZE / (32 + 8*3)));
+ for _ in 0..pending_htlcs_updated_len {
+ pending_htlcs_updated.push(Readable::read(reader)?);
+ }
+
let last_block_hash: Sha256dHash = Readable::read(reader)?;
let destination_script = Readable::read(reader)?;
let to_remote_rescue = match <u8 as Readable<R>>::read(reader)? {
let last_block_hash: Sha256dHash = Readable::read(reader)?;
let destination_script = Readable::read(reader)?;
let to_remote_rescue = match <u8 as Readable<R>>::read(reader)? {
current_remote_commitment_number,
payment_preimages,
current_remote_commitment_number,
payment_preimages,
destination_script,
to_remote_rescue,
destination_script,
to_remote_rescue,
self.update_ret.lock().unwrap().clone()
}
self.update_ret.lock().unwrap().clone()
}
- fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate> {
- return self.simple_monitor.fetch_pending_htlc_updated();
+ fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
+ return self.simple_monitor.get_and_clear_pending_htlcs_updated();