use util::errors::APIError;
use core::{cmp, mem};
+use std::cell::RefCell;
use std::collections::{HashMap, hash_map, HashSet};
use std::io::{Cursor, Read};
use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard};
/// Process pending events from the `chain::Watch`, returning whether any events were processed.
fn process_pending_monitor_events(&self) -> bool {
let mut failed_channels = Vec::new();
- let has_pending_monitor_events = {
- let pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
- let has_pending_monitor_events = !pending_monitor_events.is_empty();
- for monitor_event in pending_monitor_events {
- match monitor_event {
- MonitorEvent::HTLCEvent(htlc_update) => {
- if let Some(preimage) = htlc_update.payment_preimage {
- log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
- self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
- } else {
- log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
- self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
+ let pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
+ let has_pending_monitor_events = !pending_monitor_events.is_empty();
+ for monitor_event in pending_monitor_events {
+ match monitor_event {
+ MonitorEvent::HTLCEvent(htlc_update) => {
+ if let Some(preimage) = htlc_update.payment_preimage {
+ log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
+ self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
+ } else {
+ log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
+ self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
+ }
+ },
+ MonitorEvent::CommitmentTxBroadcasted(funding_outpoint) => {
+ let mut channel_lock = self.channel_state.lock().unwrap();
+ let channel_state = &mut *channel_lock;
+ let by_id = &mut channel_state.by_id;
+ let short_to_id = &mut channel_state.short_to_id;
+ let pending_msg_events = &mut channel_state.pending_msg_events;
+ if let Some(mut chan) = by_id.remove(&funding_outpoint.to_channel_id()) {
+ if let Some(short_id) = chan.get_short_channel_id() {
+ short_to_id.remove(&short_id);
}
- },
- MonitorEvent::CommitmentTxBroadcasted(funding_outpoint) => {
- let mut channel_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_lock;
- let by_id = &mut channel_state.by_id;
- let short_to_id = &mut channel_state.short_to_id;
- let pending_msg_events = &mut channel_state.pending_msg_events;
- if let Some(mut chan) = by_id.remove(&funding_outpoint.to_channel_id()) {
- if let Some(short_id) = chan.get_short_channel_id() {
- short_to_id.remove(&short_id);
- }
- failed_channels.push(chan.force_shutdown(false));
- if let Ok(update) = self.get_channel_update(&chan) {
- pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
- msg: update
- });
- }
- pending_msg_events.push(events::MessageSendEvent::HandleError {
- node_id: chan.get_counterparty_node_id(),
- action: msgs::ErrorAction::SendErrorMessage {
- msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() }
- },
+ failed_channels.push(chan.force_shutdown(false));
+ if let Ok(update) = self.get_channel_update(&chan) {
+ pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ msg: update
});
}
- },
- }
+ pending_msg_events.push(events::MessageSendEvent::HandleError {
+ node_id: chan.get_counterparty_node_id(),
+ action: msgs::ErrorAction::SendErrorMessage {
+ msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() }
+ },
+ });
+ }
+ },
}
- has_pending_monitor_events
- };
+ }
for failure in failed_channels.drain(..) {
self.finish_force_close_channel(failure);
}
/// Check the holding cell in each channel and free any pending HTLCs in them if possible.
+ /// Returns whether there were any updates such as if pending HTLCs were freed or a monitor
+ /// update was applied.
+ ///
/// This should only apply to HTLCs which were added to the holding cell because we were
/// waiting on a monitor update to finish. In that case, we don't want to free the holding cell
/// directly in `channel_monitor_updated` as it may introduce deadlocks calling back into user
/// code to inform them of a channel monitor update.
- fn check_free_holding_cells(&self) {
+ fn check_free_holding_cells(&self) -> bool {
+ let mut has_monitor_update = false;
let mut failed_htlcs = Vec::new();
let mut handle_errors = Vec::new();
{
by_id.retain(|channel_id, chan| {
match chan.maybe_free_holding_cell_htlcs(&self.logger) {
- Ok((None, ref htlcs)) if htlcs.is_empty() => true,
Ok((commitment_opt, holding_cell_failed_htlcs)) => {
- failed_htlcs.push((holding_cell_failed_htlcs, *channel_id));
+ if !holding_cell_failed_htlcs.is_empty() {
+ failed_htlcs.push((holding_cell_failed_htlcs, *channel_id));
+ }
if let Some((commitment_update, monitor_update)) = commitment_opt {
if let Err(e) = self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) {
+ has_monitor_update = true;
let (res, close_channel) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), channel_id);
handle_errors.push((chan.get_counterparty_node_id(), res));
if close_channel { return false; }
}
});
}
+
+ let has_update = has_monitor_update || !failed_htlcs.is_empty();
for (failures, channel_id) in failed_htlcs.drain(..) {
self.fail_holding_cell_htlcs(failures, channel_id);
}
for (counterparty_node_id, err) in handle_errors.drain(..) {
let _ = handle_error!(self, err, counterparty_node_id);
}
+
+ has_update
}
/// Handle a list of channel failures during a block_connected or block_disconnected call,
L::Target: Logger,
{
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
- //TODO: This behavior should be documented. It's non-intuitive that we query
- // ChannelMonitors when clearing other events.
- self.process_pending_monitor_events();
+ let events = RefCell::new(Vec::new());
+ PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
+ let mut result = NotifyOption::SkipPersist;
- self.check_free_holding_cells();
+ // TODO: This behavior should be documented. It's unintuitive that we query
+ // ChannelMonitors when clearing other events.
+ if self.process_pending_monitor_events() {
+ result = NotifyOption::DoPersist;
+ }
- let mut ret = Vec::new();
- let mut channel_state = self.channel_state.lock().unwrap();
- mem::swap(&mut ret, &mut channel_state.pending_msg_events);
- ret
+ if self.check_free_holding_cells() {
+ result = NotifyOption::DoPersist;
+ }
+
+ let mut pending_events = Vec::new();
+ let mut channel_state = self.channel_state.lock().unwrap();
+ mem::swap(&mut pending_events, &mut channel_state.pending_msg_events);
+
+ if !pending_events.is_empty() {
+ events.replace(pending_events);
+ }
+
+ result
+ });
+ events.into_inner()
}
}
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
let _consistency_lock = self.total_consistency_lock.write().unwrap();
- writer.write_all(&[SERIALIZATION_VERSION; 1])?;
- writer.write_all(&[MIN_SERIALIZATION_VERSION; 1])?;
+ write_ver_prefix!(writer, SERIALIZATION_VERSION, MIN_SERIALIZATION_VERSION);
self.genesis_hash.write(writer)?;
{
session_priv.write(writer)?;
}
+ write_tlv_fields!(writer, {});
+
Ok(())
}
}
L::Target: Logger,
{
fn read<R: ::std::io::Read>(reader: &mut R, mut args: ChannelManagerReadArgs<'a, Signer, M, T, K, F, L>) -> Result<Self, DecodeError> {
- let _ver: u8 = Readable::read(reader)?;
- let min_ver: u8 = Readable::read(reader)?;
- if min_ver > SERIALIZATION_VERSION {
- return Err(DecodeError::UnknownVersion);
- }
+ let _ver = read_ver_prefix!(reader, SERIALIZATION_VERSION);
let genesis_hash: BlockHash = Readable::read(reader)?;
let best_block_height: u32 = Readable::read(reader)?;
}
}
+ read_tlv_fields!(reader, {}, {});
+
let mut secp_ctx = Secp256k1::new();
secp_ctx.seeded_randomize(&args.keys_manager.get_secure_random_bytes());