use chain::transaction::OutPoint;
use chain::keysinterface::{SpendableOutputDescriptor, ChannelKeys};
use util::logger::Logger;
-use util::ser::{ReadableArgs, Readable, Writer, Writeable, U48};
+use util::ser::{ReadableArgs, Readable, MaybeReadable, Writer, Writeable, U48};
use util::{byte_utils, events};
use std::collections::{HashMap, hash_map, HashSet};
///
/// If you're using this for local monitoring of your own channels, you probably want to use
/// `OutPoint` as the key, which will give you a ManyChannelMonitor implementation.
-pub struct SimpleManyChannelMonitor<Key, ChanSigner: ChannelKeys, T: Deref> where T::Target: BroadcasterInterface {
+pub struct SimpleManyChannelMonitor<Key, ChanSigner: ChannelKeys, T: Deref, F: Deref>
+ where T::Target: BroadcasterInterface,
+ F::Target: FeeEstimator
+{
#[cfg(test)] // Used in ChannelManager tests to manipulate channels directly
pub monitors: Mutex<HashMap<Key, ChannelMonitor<ChanSigner>>>,
#[cfg(not(test))]
monitors: Mutex<HashMap<Key, ChannelMonitor<ChanSigner>>>,
chain_monitor: Arc<ChainWatchInterface>,
broadcaster: T,
- pending_events: Mutex<Vec<events::Event>>,
logger: Arc<Logger>,
- fee_estimator: Arc<FeeEstimator>
+ fee_estimator: F
}
-impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref + Sync + Send> ChainListener for SimpleManyChannelMonitor<Key, ChanSigner, T>
- where T::Target: BroadcasterInterface
+impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref + Sync + Send, F: Deref + Sync + Send>
+ ChainListener for SimpleManyChannelMonitor<Key, ChanSigner, T, F>
+ where T::Target: BroadcasterInterface,
+ F::Target: FeeEstimator
{
fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) {
let block_hash = header.bitcoin_hash();
- let mut new_events: Vec<events::Event> = Vec::with_capacity(0);
{
let mut monitors = self.monitors.lock().unwrap();
for monitor in monitors.values_mut() {
- let (txn_outputs, spendable_outputs) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
- if spendable_outputs.len() > 0 {
- new_events.push(events::Event::SpendableOutputs {
- outputs: spendable_outputs,
- });
- }
+ let txn_outputs = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
for (ref txid, ref outputs) in txn_outputs {
for (idx, output) in outputs.iter().enumerate() {
}
}
}
- let mut pending_events = self.pending_events.lock().unwrap();
- pending_events.append(&mut new_events);
}
fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
}
}
-impl<Key : Send + cmp::Eq + hash::Hash + 'static, ChanSigner: ChannelKeys, T: Deref> SimpleManyChannelMonitor<Key, ChanSigner, T>
- where T::Target: BroadcasterInterface
+impl<Key : Send + cmp::Eq + hash::Hash + 'static, ChanSigner: ChannelKeys, T: Deref, F: Deref> SimpleManyChannelMonitor<Key, ChanSigner, T, F>
+ where T::Target: BroadcasterInterface,
+ F::Target: FeeEstimator
{
/// Creates a new object which can be used to monitor several channels given the chain
/// interface with which to register to receive notifications.
- pub fn new(chain_monitor: Arc<ChainWatchInterface>, broadcaster: T, logger: Arc<Logger>, feeest: Arc<FeeEstimator>) -> SimpleManyChannelMonitor<Key, ChanSigner, T> {
+ pub fn new(chain_monitor: Arc<ChainWatchInterface>, broadcaster: T, logger: Arc<Logger>, feeest: F) -> SimpleManyChannelMonitor<Key, ChanSigner, T, F> {
let res = SimpleManyChannelMonitor {
monitors: Mutex::new(HashMap::new()),
chain_monitor,
broadcaster,
- pending_events: Mutex::new(Vec::new()),
logger,
fee_estimator: feeest,
};
}
}
-impl<ChanSigner: ChannelKeys, T: Deref + Sync + Send> ManyChannelMonitor<ChanSigner> for SimpleManyChannelMonitor<OutPoint, ChanSigner, T>
- where T::Target: BroadcasterInterface
+impl<ChanSigner: ChannelKeys, T: Deref + Sync + Send, F: Deref + Sync + Send> ManyChannelMonitor<ChanSigner> for SimpleManyChannelMonitor<OutPoint, ChanSigner, T, F>
+ where T::Target: BroadcasterInterface,
+ F::Target: FeeEstimator
{
fn add_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor<ChanSigner>) -> Result<(), ChannelMonitorUpdateErr> {
match self.add_monitor_by_key(funding_txo, monitor) {
}
}
-impl<Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref> events::EventsProvider for SimpleManyChannelMonitor<Key, ChanSigner, T>
- where T::Target: BroadcasterInterface
+impl<Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref, F: Deref> events::EventsProvider for SimpleManyChannelMonitor<Key, ChanSigner, T, F>
+ where T::Target: BroadcasterInterface,
+ F::Target: FeeEstimator
{
fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
- let mut pending_events = self.pending_events.lock().unwrap();
- let mut ret = Vec::new();
- mem::swap(&mut ret, &mut *pending_events);
- ret
+ let mut pending_events = Vec::new();
+ for chan in self.monitors.lock().unwrap().values_mut() {
+ pending_events.append(&mut chan.get_and_clear_pending_events());
+ }
+ pending_events
}
}
///
/// You MUST ensure that no ChannelMonitors for a given channel anywhere contain out-of-date
/// information and are actively monitoring the chain.
+///
+/// Pending Events or updated HTLCs which have not yet been read out by
+/// get_and_clear_pending_htlcs_updated or get_and_clear_pending_events are serialized to disk and
+/// reloaded at deserialize-time. Thus, you must ensure that, when handling events, all events
+/// gotten are fully handled before re-serializing the new state.
pub struct ChannelMonitor<ChanSigner: ChannelKeys> {
latest_update_id: u64,
commitment_transaction_number_obscure_factor: u64,
payment_preimages: HashMap<PaymentHash, PaymentPreimage>,
pending_htlcs_updated: Vec<HTLCUpdate>,
+ pending_events: Vec<events::Event>,
destination_script: Script,
// Thanks to data loss protection, we may be able to claim our non-htlc funds
self.current_local_signed_commitment_tx != other.current_local_signed_commitment_tx ||
self.payment_preimages != other.payment_preimages ||
self.pending_htlcs_updated != other.pending_htlcs_updated ||
+ self.pending_events.len() != other.pending_events.len() || // We trust events to round-trip properly
self.destination_script != other.destination_script ||
self.to_remote_rescue != other.to_remote_rescue ||
self.pending_claim_requests != other.pending_claim_requests ||
data.write(writer)?;
}
+ writer.write_all(&byte_utils::be64_to_array(self.pending_events.len() as u64))?;
+ for event in self.pending_events.iter() {
+ event.write(writer)?;
+ }
+
self.last_block_hash.write(writer)?;
self.destination_script.write(writer)?;
if let Some((ref to_remote_script, ref local_key)) = self.to_remote_rescue {
payment_preimages: HashMap::new(),
pending_htlcs_updated: Vec::new(),
+ pending_events: Vec::new(),
destination_script: destination_script.clone(),
to_remote_rescue: None,
// Prune HTLCs from the previous remote commitment tx so we don't generate failure/fulfill
// events for now-revoked/fulfilled HTLCs.
- // TODO: We should probably consider whether we're really getting the next secret here.
if let Storage::Local { ref mut prev_remote_commitment_txid, .. } = self.key_storage {
if let Some(txid) = prev_remote_commitment_txid.take() {
for &mut (_, ref mut source) in self.remote_claimable_outpoints.get_mut(&txid).unwrap() {
ret
}
+ /// Gets the list of pending events which were generated by previous actions, clearing the list
+ /// in the process.
+ ///
+ /// This is called by ManyChannelMonitor::get_and_clear_pending_events() and is equivalent to
+ /// EventsProvider::get_and_clear_pending_events() except that it requires &mut self as we do
+ /// no internal locking in ChannelMonitors.
+ pub fn get_and_clear_pending_events(&mut self) -> Vec<events::Event> {
+ let mut ret = Vec::new();
+ mem::swap(&mut ret, &mut self.pending_events);
+ ret
+ }
+
/// Can only fail if idx is < get_min_seen_secret
pub(super) fn get_secret(&self, idx: u64) -> Option<[u8; 32]> {
self.commitment_secrets.get_secret(idx)
/// HTLC-Success/HTLC-Timeout transactions.
/// Return updates for HTLC pending in the channel and failed automatically by the broadcast of
/// revoked remote commitment tx
- fn check_spend_remote_transaction(&mut self, tx: &Transaction, height: u32, fee_estimator: &FeeEstimator) -> (Vec<Transaction>, (Sha256dHash, Vec<TxOut>), Vec<SpendableOutputDescriptor>) {
+ fn check_spend_remote_transaction<F: Deref>(&mut self, tx: &Transaction, height: u32, fee_estimator: F) -> (Vec<Transaction>, (Sha256dHash, Vec<TxOut>), Vec<SpendableOutputDescriptor>)
+ where F::Target: FeeEstimator
+ {
// Most secp and related errors trying to create keys means we have no hope of constructing
// a spend transaction...so we return no transactions to broadcast
let mut txn_to_broadcast = Vec::new();
}
/// Attempts to claim a remote HTLC-Success/HTLC-Timeout's outputs using the revocation key
- fn check_spend_remote_htlc(&mut self, tx: &Transaction, commitment_number: u64, height: u32, fee_estimator: &FeeEstimator) -> (Option<Transaction>, Option<SpendableOutputDescriptor>) {
+ fn check_spend_remote_htlc<F: Deref>(&mut self, tx: &Transaction, commitment_number: u64, height: u32, fee_estimator: F) -> (Option<Transaction>, Option<SpendableOutputDescriptor>)
+ where F::Target: FeeEstimator
+ {
//TODO: send back new outputs to guarantee pending_claim_request consistency
if tx.input.len() != 1 || tx.output.len() != 1 {
return (None, None)
/// Eventually this should be pub and, roughly, implement ChainListener, however this requires
/// &mut self, as well as returns new spendable outputs and outpoints to watch for spending of
/// on-chain.
- fn block_connected<B: Deref>(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: &FeeEstimator)-> (Vec<(Sha256dHash, Vec<TxOut>)>, Vec<SpendableOutputDescriptor>)
- where B::Target: BroadcasterInterface
+ fn block_connected<B: Deref, F: Deref>(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: F)-> Vec<(Sha256dHash, Vec<TxOut>)>
+ where B::Target: BroadcasterInterface,
+ F::Target: FeeEstimator
{
for tx in txn_matched {
let mut output_val = 0;
};
if funding_txo.is_none() || (prevout.txid == funding_txo.as_ref().unwrap().0.txid && prevout.vout == funding_txo.as_ref().unwrap().0.index as u32) {
if (tx.input[0].sequence >> 8*3) as u8 == 0x80 && (tx.lock_time >> 8*3) as u8 == 0x20 {
- let (remote_txn, new_outputs, mut spendable_output) = self.check_spend_remote_transaction(&tx, height, fee_estimator);
+ let (remote_txn, new_outputs, mut spendable_output) = self.check_spend_remote_transaction(&tx, height, &*fee_estimator);
txn = remote_txn;
spendable_outputs.append(&mut spendable_output);
if !new_outputs.1.is_empty() {
}
} else {
if let Some(&(commitment_number, _)) = self.remote_commitment_txn_on_chain.get(&prevout.txid) {
- let (tx, spendable_output) = self.check_spend_remote_htlc(&tx, commitment_number, height, fee_estimator);
+ let (tx, spendable_output) = self.check_spend_remote_htlc(&tx, commitment_number, height, &*fee_estimator);
if let Some(tx) = tx {
txn.push(tx);
}
for first_claim_txid in bump_candidates.iter() {
if let Some((new_timer, new_feerate)) = {
if let Some(claim_material) = self.pending_claim_requests.get(first_claim_txid) {
- if let Some((new_timer, new_feerate, bump_tx)) = self.bump_claim_tx(height, &claim_material, fee_estimator) {
+ if let Some((new_timer, new_feerate, bump_tx)) = self.bump_claim_tx(height, &claim_material, &*fee_estimator) {
broadcaster.broadcast_transaction(&bump_tx);
Some((new_timer, new_feerate))
} else { None }
for &(ref txid, ref output_scripts) in watch_outputs.iter() {
self.outputs_to_watch.insert(txid.clone(), output_scripts.iter().map(|o| o.script_pubkey.clone()).collect());
}
- (watch_outputs, spendable_outputs)
+
+ if spendable_outputs.len() > 0 {
+ self.pending_events.push(events::Event::SpendableOutputs {
+ outputs: spendable_outputs,
+ });
+ }
+
+ watch_outputs
}
- fn block_disconnected<B: Deref>(&mut self, height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: &FeeEstimator)
- where B::Target: BroadcasterInterface
+ fn block_disconnected<B: Deref, F: Deref>(&mut self, height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: F)
+ where B::Target: BroadcasterInterface,
+ F::Target: FeeEstimator
{
log_trace!(self, "Block {} at height {} disconnected", block_hash, height);
let mut bump_candidates = HashMap::new();
}
}
for (_, claim_material) in bump_candidates.iter_mut() {
- if let Some((new_timer, new_feerate, bump_tx)) = self.bump_claim_tx(height, &claim_material, fee_estimator) {
+ if let Some((new_timer, new_feerate, bump_tx)) = self.bump_claim_tx(height, &claim_material, &*fee_estimator) {
claim_material.height_timer = new_timer;
claim_material.feerate_previous = new_feerate;
broadcaster.broadcast_transaction(&bump_tx);
/// Lightning security model (i.e being able to redeem/timeout HTLC or penalize coutnerparty onchain) lays on the assumption of claim transactions getting confirmed before timelock expiration
/// (CSV or CLTV following cases). In case of high-fee spikes, claim tx may stuck in the mempool, so you need to bump its feerate quickly using Replace-By-Fee or Child-Pay-For-Parent.
- fn bump_claim_tx(&self, height: u32, cached_claim_datas: &ClaimTxBumpMaterial, fee_estimator: &FeeEstimator) -> Option<(u32, u64, Transaction)> {
+ fn bump_claim_tx<F: Deref>(&self, height: u32, cached_claim_datas: &ClaimTxBumpMaterial, fee_estimator: F) -> Option<(u32, u64, Transaction)>
+ where F::Target: FeeEstimator
+ {
if cached_claim_datas.per_input_material.len() == 0 { return None } // But don't prune pending claiming request yet, we may have to resurrect HTLCs
let mut inputs = Vec::new();
for outp in cached_claim_datas.per_input_material.keys() {
pending_htlcs_updated.push(Readable::read(reader)?);
}
+ let pending_events_len: u64 = Readable::read(reader)?;
+ let mut pending_events = Vec::with_capacity(cmp::min(pending_events_len as usize, MAX_ALLOC_SIZE / mem::size_of::<events::Event>()));
+ for _ in 0..pending_events_len {
+ if let Some(event) = MaybeReadable::read(reader)? {
+ pending_events.push(event);
+ }
+ }
+
let last_block_hash: Sha256dHash = Readable::read(reader)?;
let destination_script = Readable::read(reader)?;
let to_remote_rescue = match <u8 as Readable<R>>::read(reader)? {
payment_preimages,
pending_htlcs_updated,
+ pending_events,
destination_script,
to_remote_rescue,