use chain::chaininterface::{BroadcasterInterface,ChainListener,ChainWatchInterface,FeeEstimator};
use ln::channel::{Channel, ChannelKeys};
use ln::channelmonitor::ManyChannelMonitor;
-use ln::router::Route;
+use ln::router::{Route,RouteHop};
use ln::msgs;
use ln::msgs::{HandleError,ChannelMessageHandler,MsgEncodable,MsgDecodable};
use util::{byte_utils, events, internal_traits, rng};
use util::sha2::Sha256;
+use crypto;
use crypto::mac::{Mac,MacResult};
use crypto::hmac::Hmac;
use crypto::digest::Digest;
},
OutboundRoute {
route: Route,
+ session_priv: SecretKey,
},
/// Used for channel rebalancing
CycledRoute {
source_short_channel_id: u64,
incoming_packet_shared_secret: SharedSecret,
route: Route,
+ session_priv: SecretKey,
}
}
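// Editor's note (sketch, not part of the patch): session_priv is retained alongside
// the route so the sender can later re-derive the per-hop onion shared secrets, e.g.
// when peeling a returned error packet:
//
//   Self::construct_onion_keys_callback(&self.secp_ctx, &route, &session_priv,
//       |shared_secret, _, _, route_hop| {
//           // unwrap one encryption layer of the error packet per hop
//       })?;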
/// will be accepted on the given channel, and after additional timeout/the closing of all
/// pending HTLCs, the channel will be closed on chain.
pub fn close_channel(&self, channel_id: &Uint256) -> Result<msgs::Shutdown, HandleError> {
- let res = {
- let mut channel_state = self.channel_state.lock().unwrap();
+ let (res, chan_option) = {
+ let mut channel_state_lock = self.channel_state.lock().unwrap();
+ let channel_state = channel_state_lock.borrow_parts();
match channel_state.by_id.entry(channel_id.clone()) {
hash_map::Entry::Occupied(mut chan_entry) => {
let res = chan_entry.get_mut().get_shutdown()?;
if chan_entry.get().is_shutdown() {
- chan_entry.remove_entry();
- }
- res
+ if let Some(short_id) = chan_entry.get().get_short_channel_id() {
+ channel_state.short_to_id.remove(&short_id);
+ }
+ (res, Some(chan_entry.remove_entry().1))
+ } else { (res, None) }
},
hash_map::Entry::Vacant(_) => return Err(HandleError{err: "No such channel", msg: None})
}
// fail with unknown_next_peer, as the next-hop channel no longer exists
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), &payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 10, data: Vec::new() });
}
+ if let Some(chan) = chan_option {
+ if let Ok(update) = self.get_channel_update(&chan) {
+ let mut events = self.pending_events.lock().unwrap();
+ events.push(events::Event::BroadcastChannelUpdate {
+ msg: update
+ });
+ }
+ }
Ok(res.0)
}
res
}
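// Editor's usage sketch for close_channel (hypothetical caller; the wire/event
// plumbing is an assumption):
//
//   let shutdown_msg = channel_manager.close_channel(&channel_id)?;
//   // send `shutdown_msg` to the counterparty, then drain pending events so the
//   // Event::BroadcastChannelUpdate generated above reaches the gossip layer:
//   for event in channel_manager.get_and_clear_pending_events() {
//       if let events::Event::BroadcastChannelUpdate { msg } = event {
//           router.handle_channel_update(&msg).unwrap();
//       }
//   }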
- fn construct_onion_keys(secp_ctx: &Secp256k1, route: &Route, session_priv: &SecretKey) -> Result<Vec<OnionKeys>, HandleError> {
- let mut res = Vec::with_capacity(route.hops.len());
+ // can only fail if an intermediary hop has an invalid public key or session_priv is invalid
+ #[inline]
+ fn construct_onion_keys_callback<FType: FnMut(SharedSecret, [u8; 32], PublicKey, &RouteHop)> (secp_ctx: &Secp256k1, route: &Route, session_priv: &SecretKey, mut callback: FType) -> Result<(), HandleError> {
let mut blinded_priv = session_priv.clone();
let mut blinded_pub = secp_call!(PublicKey::from_secret_key(secp_ctx, &blinded_priv));
let mut first_iteration = true;
secp_call!(blinded_priv.mul_assign(secp_ctx, &secp_call!(SecretKey::from_slice(secp_ctx, &blinding_factor))));
blinded_pub = secp_call!(PublicKey::from_secret_key(secp_ctx, &blinded_priv));
+ callback(shared_secret, blinding_factor, ephemeral_pubkey, hop);
+ }
+
+ Ok(())
+ }
+
+ // can only fail if an intermediary hop has an invalid public key or session_priv is invalid
+ fn construct_onion_keys(secp_ctx: &Secp256k1, route: &Route, session_priv: &SecretKey) -> Result<Vec<OnionKeys>, HandleError> {
+ let mut res = Vec::with_capacity(route.hops.len());
+
+ Self::construct_onion_keys_callback(secp_ctx, route, session_priv, |shared_secret, _blinding_factor, ephemeral_pubkey, _| {
let (rho, mu) = ChannelManager::gen_rho_mu_from_shared_secret(&shared_secret);
res.push(OnionKeys {
#[cfg(test)]
- shared_secret: shared_secret,
+ shared_secret,
#[cfg(test)]
- blinding_factor: blinding_factor,
- ephemeral_pubkey: ephemeral_pubkey,
- rho: rho,
- mu: mu,
+ blinding_factor: _blinding_factor,
+ ephemeral_pubkey,
+ rho,
+ mu,
});
- }
+ })?;
Ok(res)
}
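// Editor's sketch of the per-hop derivation elided above (an assumption, matching the
// BOLT 4 scheme this code implements): each hop's shared secret is
// ECDH(blinded_priv, hop.pubkey), the blinding factor is
// SHA256(ephemeral_pubkey || shared_secret), and the ephemeral key is re-blinded by
// scalar-multiplying with that factor:
//
//   let shared_secret = SharedSecret::new(secp_ctx, &hop.pubkey, &blinded_priv);
//   let mut sha = Sha256::new();
//   sha.input(&blinded_pub.serialize_vec(secp_ctx, true)[..]);
//   sha.input(&shared_secret[..]);
//   let mut blinding_factor = [0u8; 32];
//   sha.result(&mut blinding_factor);
//   // then blinded_priv.mul_assign(...) and blinded_pub = from_secret_key(...),
//   // as in the loop body above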
}
/// Only fails if the channel does not yet have an assigned short_id.
- fn get_channel_update(&self, chan: &mut Channel) -> Result<msgs::ChannelUpdate, HandleError> {
+ fn get_channel_update(&self, chan: &Channel) -> Result<msgs::ChannelUpdate, HandleError> {
let short_channel_id = match chan.get_short_channel_id() {
None => return Err(HandleError{err: "Channel not yet established", msg: None}),
Some(id) => id,
})
}
- /// Sends a payment along a given route, returning the UpdateAddHTLC message to give to the
- /// first hop in route. Value parameters are provided via the last hop in route, see
- /// documentation for RouteHop fields for more info.
+ /// Sends a payment along a given route.
+ /// Value parameters are provided via the last hop in route, see documentation for RouteHop
+ /// fields for more info.
/// See-also docs on Channel::send_htlc_and_commit.
- pub fn send_payment(&self, route: Route, payment_hash: [u8; 32]) -> Result<Option<(msgs::UpdateAddHTLC, msgs::CommitmentSigned)>, HandleError> {
+ /// May generate a SendHTLCs event on success, which should be relayed.
+ pub fn send_payment(&self, route: Route, payment_hash: [u8; 32]) -> Result<(), HandleError> {
if route.hops.len() < 1 || route.hops.len() > 20 {
return Err(HandleError{err: "Route didn't go anywhere/had bogus size", msg: None});
}
let (onion_payloads, htlc_msat, htlc_cltv) = ChannelManager::build_onion_payloads(&route)?;
let onion_packet = ChannelManager::construct_onion_packet(onion_payloads, onion_keys, associated_data)?;
- let mut channel_state = self.channel_state.lock().unwrap();
- let id = match channel_state.short_to_id.get(&route.hops.first().unwrap().short_channel_id) {
- None => return Err(HandleError{err: "No channel available with first hop!", msg: None}),
- Some(id) => id.clone()
- };
- let res = {
- let chan = channel_state.by_id.get_mut(&id).unwrap();
- if chan.get_their_node_id() != route.hops.first().unwrap().pubkey {
- return Err(HandleError{err: "Node ID mismatch on first hop!", msg: None});
+ let (first_hop_node_id, (update_add, commitment_signed, chan_monitor)) = {
+ let mut channel_state = self.channel_state.lock().unwrap();
+ let id = match channel_state.short_to_id.get(&route.hops.first().unwrap().short_channel_id) {
+ None => return Err(HandleError{err: "No channel available with first hop!", msg: None}),
+ Some(id) => id.clone()
+ };
+ let res = {
+ let chan = channel_state.by_id.get_mut(&id).unwrap();
+ if chan.get_their_node_id() != route.hops.first().unwrap().pubkey {
+ return Err(HandleError{err: "Node ID mismatch on first hop!", msg: None});
+ }
+ chan.send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, onion_packet)?
+ };
+
+ let first_hop_node_id = route.hops.first().unwrap().pubkey;
+
+ if channel_state.claimable_htlcs.insert(payment_hash, PendingOutboundHTLC::OutboundRoute {
+ route,
+ session_priv,
+ }).is_some() {
+ // TODO: We need to track these better; we're not generating these ourselves, so a
+ // third party could make this happen:
+ panic!("payment_hash was repeated! Don't let this happen");
+ }
+
+ match res {
+ Some(msgs) => (first_hop_node_id, msgs),
+ None => return Ok(()),
}
- chan.send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, onion_packet)?
};
- if channel_state.claimable_htlcs.insert(payment_hash, PendingOutboundHTLC::OutboundRoute {
- route: route,
- }).is_some() {
- // TODO: We need to track these better, we're not generating these, so a
- // third-party might make this happen:
- panic!("payment_hash was repeated! Don't let this happen");
+ if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+ unimplemented!(); // maybe remove from claimable_htlcs?
}
- Ok(res)
+ let mut events = self.pending_events.lock().unwrap();
+ events.push(events::Event::SendHTLCs {
+ node_id: first_hop_node_id,
+ msgs: vec![update_add],
+ commitment_msg: commitment_signed,
+ });
+ Ok(())
}
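// Editor's usage sketch (caller names are assumptions): after send_payment() returns
// Ok(()), the HTLC and commitment update surface as an event rather than a return
// value, so the caller relays them:
//
//   channel_manager.send_payment(route, payment_hash)?;
//   for event in channel_manager.get_and_clear_pending_events() {
//       if let events::Event::SendHTLCs { node_id, msgs, commitment_msg } = event {
//           // send each update_add_htlc in `msgs`, then `commitment_msg`, to `node_id`
//       }
//   }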
/// Call this upon creation of a funding transaction for the given channel.
/// Panics if a funding transaction has already been provided for this channel.
pub fn funding_transaction_generated(&self, temporary_channel_id: &Uint256, funding_txo: (Sha256dHash, u16)) {
- let (chan, msg) = {
+ let (chan, msg, chan_monitor) = {
let mut channel_state = self.channel_state.lock().unwrap();
match channel_state.by_id.remove(&temporary_channel_id) {
Some(mut chan) => {
match chan.get_outbound_funding_created(funding_txo.0, funding_txo.1) {
Ok(funding_msg) => {
- (chan, funding_msg)
+ (chan, funding_msg.0, funding_msg.1)
},
Err(_e) => {
//TODO: Push e to pendingevents
None => return
}
}; // Release channel lock for install_watch_outpoint call,
- let chan_monitor = chan.channel_monitor();
- match self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
- Ok(()) => {},
- Err(_e) => {
- //TODO: Push e to pendingevents?
- return;
- }
- };
-
+ if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+ unimplemented!(); // maybe remove from claimable_htlcs?
+ }
{
let mut pending_events = self.pending_events.lock().unwrap();
pending_events.push(events::Event::SendFundingCreated {
}
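// Editor's usage sketch for funding_transaction_generated (the FundingGenerationReady
// event name and `output_index` are assumptions about the surrounding API): the caller
// builds the funding transaction, then hands the outpoint back:
//
//   // on events::Event::FundingGenerationReady { temporary_channel_id, .. }:
//   let funding_txo = (funding_tx.txid(), output_index);
//   channel_manager.funding_transaction_generated(&temporary_channel_id, funding_txo);
//   // a SendFundingCreated event is then queued for the counterparty, as above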
if !add_htlc_msgs.is_empty() {
- let commitment_msg = match forward_chan.send_commitment() {
- Ok(msg) => msg,
+ let (commitment_msg, monitor) = match forward_chan.send_commitment() {
+ Ok(res) => res,
Err(_) => {
//TODO: Handle...this is bad!
continue;
},
};
- new_events.push(events::Event::SendHTLCs {
+ new_events.push((Some(monitor), events::Event::SendHTLCs {
node_id: forward_chan.get_their_node_id(),
msgs: add_htlc_msgs,
commitment_msg: commitment_msg,
- });
+ }));
}
} else {
for forward_info in pending_forwards {
- new_events.push(events::Event::PaymentReceived {
+ new_events.push((None, events::Event::PaymentReceived {
payment_hash: forward_info.payment_hash,
amt: forward_info.amt_to_forward,
- });
+ }));
}
}
}
for failed_forward in failed_forwards.drain(..) {
match failed_forward.2 {
None => self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), &failed_forward.0, HTLCFailReason::Reason { failure_code: failed_forward.1, data: Vec::new() }),
- Some(chan_update) => self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), &failed_forward.0, HTLCFailReason::Reason { failure_code: failed_forward.1, data: chan_update.encode() }),
+ Some(chan_update) => self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), &failed_forward.0, HTLCFailReason::Reason { failure_code: failed_forward.1, data: chan_update.encode_with_len() }),
};
}
if new_events.is_empty() { return }
+ new_events.retain(|event| {
+ if let &Some(ref monitor) = &event.0 {
+ if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor.clone()) {
+ unimplemented!(); // but definitely don't push the event...
+ }
+ }
+ true
+ });
+
let mut events = self.pending_events.lock().unwrap();
events.reserve(new_events.len());
for event in new_events.drain(..) {
- events.push(event);
+ events.push(event.1);
}
}
};
match pending_htlc {
- PendingOutboundHTLC::CycledRoute { source_short_channel_id, incoming_packet_shared_secret, .. } => {
+ PendingOutboundHTLC::CycledRoute { source_short_channel_id, incoming_packet_shared_secret, route, session_priv } => {
+ channel_state.claimable_htlcs.insert(payment_hash.clone(), PendingOutboundHTLC::OutboundRoute {
+ route,
+ session_priv,
+ });
pending_htlc = PendingOutboundHTLC::IntermediaryHopData { source_short_channel_id, incoming_packet_shared_secret };
},
_ => {}
match pending_htlc {
PendingOutboundHTLC::CycledRoute { .. } => { panic!("WAT"); },
PendingOutboundHTLC::OutboundRoute { .. } => {
- //TODO: DECRYPT route from OutboundRoute
mem::drop(channel_state);
+
let mut pending_events = self.pending_events.lock().unwrap();
pending_events.push(events::Event::PaymentFailed {
payment_hash: payment_hash.clone()
};
match fail_msgs {
- Some(msgs) => {
+ Some((msg, commitment_msg, chan_monitor)) => {
mem::drop(channel_state);
+
+ if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+ unimplemented!(); // but definitely don't push the event...
+ }
+
let mut pending_events = self.pending_events.lock().unwrap();
pending_events.push(events::Event::SendFailHTLC {
node_id,
- msg: msgs.0,
- commitment_msg: msgs.1,
+ msg: msg,
+ commitment_msg: commitment_msg,
});
},
None => {},
};
match pending_htlc {
- PendingOutboundHTLC::CycledRoute { source_short_channel_id, incoming_packet_shared_secret, route } => {
+ PendingOutboundHTLC::CycledRoute { source_short_channel_id, incoming_packet_shared_secret, route, session_priv } => {
if from_user { // This was the end hop back to us
pending_htlc = PendingOutboundHTLC::IntermediaryHopData { source_short_channel_id, incoming_packet_shared_secret };
- channel_state.claimable_htlcs.insert(payment_hash, PendingOutboundHTLC::OutboundRoute { route });
+ channel_state.claimable_htlcs.insert(payment_hash, PendingOutboundHTLC::OutboundRoute { route, session_priv });
} else { // This came from the first upstream node
// Bank error in our favor! Maybe we should tell the user this somehow???
- pending_htlc = PendingOutboundHTLC::OutboundRoute { route };
+ pending_htlc = PendingOutboundHTLC::OutboundRoute { route, session_priv };
channel_state.claimable_htlcs.insert(payment_hash, PendingOutboundHTLC::IntermediaryHopData { source_short_channel_id, incoming_packet_shared_secret });
}
},
false
},
PendingOutboundHTLC::IntermediaryHopData { source_short_channel_id, .. } => {
- let (node_id, fulfill_msgs, monitor) = {
+ let (node_id, fulfill_msgs) = {
let chan_id = match channel_state.short_to_id.get(&source_short_channel_id) {
Some(chan_id) => chan_id.clone(),
None => return false
let chan = channel_state.by_id.get_mut(&chan_id).unwrap();
match chan.get_update_fulfill_htlc_and_commit(payment_preimage) {
- Ok(msg) => (chan.get_their_node_id(), msg, if from_user { Some(chan.channel_monitor()) } else { None }),
+ Ok(msg) => (chan.get_their_node_id(), msg),
Err(_e) => {
//TODO: Do something with e?
return false;
mem::drop(channel_state);
match fulfill_msgs {
- Some(msgs) => {
+ Some((msg, commitment_msg, chan_monitor)) => {
+ if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+ unimplemented!(); // but definitely don't push the event...
+ }
+
let mut pending_events = self.pending_events.lock().unwrap();
pending_events.push(events::Event::SendFulfillHTLC {
node_id: node_id,
- msg: msgs.0,
- commitment_msg: msgs.1,
+ msg,
+ commitment_msg,
});
},
None => {},
}
-
- //TODO: It may not be possible to handle add_update_monitor fails gracefully, maybe
- //it should return no Err? Sadly, panic!()s instead doesn't help much :(
- if from_user {
- match self.monitor.add_update_monitor(monitor.as_ref().unwrap().get_funding_txo().unwrap(), monitor.unwrap()) {
- Ok(()) => true,
- Err(_) => true,
- }
- } else { true }
+ true
},
}
}
pub fn get_our_node_id(&self) -> PublicKey {
PublicKey::from_secret_key(&self.secp_ctx, &self.our_network_key).unwrap()
}
+
+ /// Used to restore channels to normal operation after a
+ /// ChannelMonitorUpdateErr::TemporaryFailure was returned from a channel monitor update
+ /// operation.
+ pub fn test_restore_channel_monitor(&self) {
+ unimplemented!();
+ }
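// Editor's sketch of the intended flow (an assumption - the body above is still
// unimplemented!()): a monitor update that returns
// ChannelMonitorUpdateErr::TemporaryFailure pauses the channel, and once the monitor
// has been durably persisted out-of-band the user re-enables it:
//
//   if let Err(ChannelMonitorUpdateErr::TemporaryFailure) =
//           monitor.add_update_monitor(funding_txo, channel_monitor) {
//       // ...retry/complete the persistence, then:
//       channel_manager.test_restore_channel_monitor();
//   }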
}
impl events::EventsProvider for ChannelManager {
impl ChainListener for ChannelManager {
fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) {
- let mut new_funding_locked_messages = Vec::new();
+ let mut new_events = Vec::new();
{
let mut channel_state = self.channel_state.lock().unwrap();
let mut short_to_ids_to_insert = Vec::new();
- for channel in channel_state.by_id.values_mut() {
- match channel.block_connected(header, height, txn_matched, indexes_of_txn_matched) {
- Some(funding_locked) => {
- let announcement_sigs = match self.get_announcement_sigs(channel) {
- Ok(res) => res,
- Err(_e) => {
- //TODO: push e on events and blow up the channel (it has bad keys)
- continue;
+ let mut short_to_ids_to_remove = Vec::new();
+ channel_state.by_id.retain(|_, channel| {
+ if let Some(funding_locked) = channel.block_connected(header, height, txn_matched, indexes_of_txn_matched) {
+ let announcement_sigs = match self.get_announcement_sigs(channel) {
+ Ok(res) => res,
+ Err(_e) => {
+ //TODO: push e on events and blow up the channel (it has bad keys)
+ return true;
+ }
+ };
+ new_events.push(events::Event::SendFundingLocked {
+ node_id: channel.get_their_node_id(),
+ msg: funding_locked,
+ announcement_sigs: announcement_sigs
+ });
+ short_to_ids_to_insert.push((channel.get_short_channel_id().unwrap(), channel.channel_id()));
+ }
+ if let Some(funding_txo) = channel.get_funding_txo() {
+ for tx in txn_matched {
+ for inp in tx.input.iter() {
+ if inp.prev_hash == funding_txo.0 && inp.prev_index == funding_txo.1 as u32 {
+ if let Some(short_id) = channel.get_short_channel_id() {
+ short_to_ids_to_remove.push(short_id);
+ }
+ channel.force_shutdown();
+ if let Ok(update) = self.get_channel_update(&channel) {
+ new_events.push(events::Event::BroadcastChannelUpdate {
+ msg: update
+ });
+ }
+ return false;
}
- };
- new_funding_locked_messages.push(events::Event::SendFundingLocked {
- node_id: channel.get_their_node_id(),
- msg: funding_locked,
- announcement_sigs: announcement_sigs
+ }
+ }
+ }
+ if channel.channel_monitor().would_broadcast_at_height(height) {
+ if let Some(short_id) = channel.get_short_channel_id() {
+ short_to_ids_to_remove.push(short_id);
+ }
+ channel.force_shutdown();
+ if let Ok(update) = self.get_channel_update(&channel) {
+ new_events.push(events::Event::BroadcastChannelUpdate {
+ msg: update
});
- short_to_ids_to_insert.push((channel.get_short_channel_id().unwrap(), channel.channel_id()));
- },
- None => {}
+ }
+ return false;
}
- //TODO: Check if channel was closed (or disabled) here
+ true
+ });
+ for to_remove in short_to_ids_to_remove {
+ channel_state.short_to_id.remove(&to_remove);
}
for to_insert in short_to_ids_to_insert {
channel_state.short_to_id.insert(to_insert.0, to_insert.1);
}
}
let mut pending_events = self.pending_events.lock().unwrap();
- for funding_locked in new_funding_locked_messages.drain(..) {
+ for funding_locked in new_events.drain(..) {
pending_events.push(funding_locked);
}
}
//TODO: broke this - a node shouldn't be able to get their channel removed by sending a
//funding_created a second time, or long after the first (note this also leaves the
//short_to_id map in a busted state).
- let chan = {
+ let (chan, funding_msg, monitor_update) = {
let mut channel_state = self.channel_state.lock().unwrap();
match channel_state.by_id.remove(&msg.temporary_channel_id) {
Some(mut chan) => {
return Err(HandleError{err: "Got a message for a channel from the wrong node!", msg: None})
}
match chan.funding_created(msg) {
- Ok(funding_msg) => {
- (chan, funding_msg)
+ Ok((funding_msg, monitor_update)) => {
+ (chan, funding_msg, monitor_update)
},
Err(e) => {
return Err(e);
// note that this means if the remote end is misbehaving and sends a message for the same
// channel back-to-back with funding_created, we'll end up thinking they sent a message
// for a bogus channel.
- let chan_monitor = chan.0.channel_monitor();
- self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor)?;
+ if let Err(_e) = self.monitor.add_update_monitor(monitor_update.get_funding_txo().unwrap(), monitor_update) {
+ unimplemented!();
+ }
let mut channel_state = self.channel_state.lock().unwrap();
- channel_state.by_id.insert(chan.1.channel_id, chan.0);
- Ok(chan.1)
+ channel_state.by_id.insert(funding_msg.channel_id, chan);
+ Ok(funding_msg)
}
fn handle_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), HandleError> {
- let (funding_txo, user_id) = {
+ let (funding_txo, user_id, monitor) = {
let mut channel_state = self.channel_state.lock().unwrap();
match channel_state.by_id.get_mut(&msg.channel_id) {
Some(chan) => {
if chan.get_their_node_id() != *their_node_id {
return Err(HandleError{err: "Got a message for a channel from the wrong node!", msg: None})
}
- chan.funding_signed(&msg)?;
- (chan.get_funding_txo().unwrap(), chan.get_user_id())
+ let chan_monitor = chan.funding_signed(&msg)?;
+ (chan.get_funding_txo().unwrap(), chan.get_user_id(), chan_monitor)
},
None => return Err(HandleError{err: "Failed to find corresponding channel", msg: None})
}
};
+ if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) {
+ unimplemented!();
+ }
let mut pending_events = self.pending_events.lock().unwrap();
pending_events.push(events::Event::FundingBroadcastSafe {
funding_txo: funding_txo,
}
fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(Option<msgs::Shutdown>, Option<msgs::ClosingSigned>), HandleError> {
- let res = {
- let mut channel_state = self.channel_state.lock().unwrap();
+ let (res, chan_option) = {
+ let mut channel_state_lock = self.channel_state.lock().unwrap();
+ let channel_state = channel_state_lock.borrow_parts();
match channel_state.by_id.entry(msg.channel_id.clone()) {
hash_map::Entry::Occupied(mut chan_entry) => {
}
let res = chan_entry.get_mut().shutdown(&*self.fee_estimator, &msg)?;
if chan_entry.get().is_shutdown() {
- chan_entry.remove_entry();
- }
- res
+ if let Some(short_id) = chan_entry.get().get_short_channel_id() {
+ channel_state.short_to_id.remove(&short_id);
+ }
+ (res, Some(chan_entry.remove_entry().1))
+ } else { (res, None) }
},
hash_map::Entry::Vacant(_) => return Err(HandleError{err: "Failed to find corresponding channel", msg: None})
}
// fail with unknown_next_peer, as the next-hop channel no longer exists
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), &payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 10, data: Vec::new() });
}
+ if let Some(chan) = chan_option {
+ if let Ok(update) = self.get_channel_update(&chan) {
+ let mut events = self.pending_events.lock().unwrap();
+ events.push(events::Event::BroadcastChannelUpdate {
+ msg: update
+ });
+ }
+ }
Ok((res.0, res.1))
}
fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<Option<msgs::ClosingSigned>, HandleError> {
- let res = {
- let mut channel_state = self.channel_state.lock().unwrap();
+ let (res, chan_option) = {
+ let mut channel_state_lock = self.channel_state.lock().unwrap();
+ let channel_state = channel_state_lock.borrow_parts();
match channel_state.by_id.entry(msg.channel_id.clone()) {
hash_map::Entry::Occupied(mut chan_entry) => {
if chan_entry.get().get_their_node_id() != *their_node_id {
// also implies there are no pending HTLCs left on the channel, so we can
// fully delete it from tracking (the channel monitor is still around to
// watch for old state broadcasts)!
- chan_entry.remove_entry();
- }
- res
+ if let Some(short_id) = chan_entry.get().get_short_channel_id() {
+ channel_state.short_to_id.remove(&short_id);
+ }
+ (res, Some(chan_entry.remove_entry().1))
+ } else { (res, None) }
},
hash_map::Entry::Vacant(_) => return Err(HandleError{err: "Failed to find corresponding channel", msg: None})
}
if let Some(broadcast_tx) = res.1 {
self.tx_broadcaster.broadcast_transaction(&broadcast_tx);
}
+ if let Some(chan) = chan_option {
+ if let Ok(update) = self.get_channel_update(&chan) {
+ let mut events = self.pending_events.lock().unwrap();
+ events.push(events::Event::BroadcastChannelUpdate {
+ msg: update
+ });
+ }
+ }
Ok(res.0)
}
let chan = channel_state.by_id.get_mut(&forwarding_id).unwrap();
if !chan.is_live() {
let chan_update = self.get_channel_update(chan).unwrap();
- return_err!("Forwarding channel is not in a ready state.", 0x4000 | 10, &chan_update.encode()[..]);
+ return_err!("Forwarding channel is not in a ready state.", 0x4000 | 7, &chan_update.encode_with_len()[..]);
}
}
match claimable_htlcs_entry {
hash_map::Entry::Occupied(mut e) => {
let outbound_route = e.get_mut();
- let route = match outbound_route {
- &mut PendingOutboundHTLC::OutboundRoute { ref route } => {
- route.clone()
+ let (route, session_priv) = match outbound_route {
+ &mut PendingOutboundHTLC::OutboundRoute { ref route, ref session_priv } => {
+ (route.clone(), session_priv.clone())
},
_ => { panic!("WAT") },
};
*outbound_route = PendingOutboundHTLC::CycledRoute {
source_short_channel_id,
incoming_packet_shared_secret: shared_secret,
- route
+ route,
+ session_priv,
};
},
hash_map::Entry::Vacant(e) => {
if chan.get_their_node_id() != *their_node_id {
return Err(HandleError{err: "Got a message for a channel from the wrong node!", msg: None})
}
- chan.update_fulfill_htlc(&msg)?;
- chan.channel_monitor()
+ chan.update_fulfill_htlc(&msg)?
},
None => return Err(HandleError{err: "Failed to find corresponding channel", msg: None})
}
};
- self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor)?;
+ if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) {
+ unimplemented!();
+ }
Ok(())
}
- fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<(), HandleError> {
+ fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<Option<msgs::HTLCFailChannelUpdate>, HandleError> {
let mut channel_state = self.channel_state.lock().unwrap();
- match channel_state.by_id.get_mut(&msg.channel_id) {
+ let payment_hash = match channel_state.by_id.get_mut(&msg.channel_id) {
Some(chan) => {
if chan.get_their_node_id() != *their_node_id {
return Err(HandleError{err: "Got a message for a channel from the wrong node!", msg: None})
chan.update_fail_htlc(&msg, HTLCFailReason::ErrorPacket { err: msg.reason.clone() })
},
None => return Err(HandleError{err: "Failed to find corresponding channel", msg: None})
+ }?;
+
+ if let Some(pending_htlc) = channel_state.claimable_htlcs.get(&payment_hash) {
+ match pending_htlc {
+ &PendingOutboundHTLC::OutboundRoute { ref route, ref session_priv } => {
+ // Handle packed channel/node updates to pass back to the route handler
+ let mut packet_decrypted = msg.reason.data.clone();
+ let mut res = None;
+ Self::construct_onion_keys_callback(&self.secp_ctx, &route, &session_priv, |shared_secret, _, _, route_hop| {
+ if res.is_some() { return; }
+
+ let ammag = ChannelManager::gen_ammag_from_shared_secret(&shared_secret);
+
+ let mut decryption_tmp = Vec::with_capacity(packet_decrypted.len());
+ decryption_tmp.resize(packet_decrypted.len(), 0);
+ let mut chacha = ChaCha20::new(&ammag, &[0u8; 8]);
+ chacha.process(&packet_decrypted, &mut decryption_tmp[..]);
+ packet_decrypted = decryption_tmp;
+
+ if let Ok(err_packet) = msgs::DecodedOnionErrorPacket::decode(&packet_decrypted) {
+ if err_packet.failuremsg.len() >= 2 {
+ let um = ChannelManager::gen_um_from_shared_secret(&shared_secret);
+
+ let mut hmac = Hmac::new(Sha256::new(), &um);
+ hmac.input(&err_packet.encode()[32..]);
+ let mut calc_tag = [0u8; 32];
+ hmac.raw_result(&mut calc_tag);
+ if crypto::util::fixed_time_eq(&calc_tag, &err_packet.hmac) {
+ const UNKNOWN_CHAN: u16 = 0x4000|10;
+ const TEMP_CHAN_FAILURE: u16 = 0x4000|7;
+ match byte_utils::slice_to_be16(&err_packet.failuremsg[0..2]) {
+ TEMP_CHAN_FAILURE => {
+ if err_packet.failuremsg.len() >= 4 {
+ let update_len = byte_utils::slice_to_be16(&err_packet.failuremsg[2..4]) as usize;
+ if err_packet.failuremsg.len() >= 4 + update_len {
+ if let Ok(chan_update) = msgs::ChannelUpdate::decode(&err_packet.failuremsg[4..4 + update_len]) {
+ res = Some(msgs::HTLCFailChannelUpdate::ChannelUpdateMessage {
+ msg: chan_update,
+ });
+ }
+ }
+ }
+ },
+ UNKNOWN_CHAN => {
+ // No such next-hop. We know this came from the
+ // current node as the HMAC validated.
+ res = Some(msgs::HTLCFailChannelUpdate::ChannelClosed {
+ short_channel_id: route_hop.short_channel_id
+ });
+ },
+ _ => {}, //TODO: Enumerate all of these!
+ }
+ }
+ }
+ }
+ }).unwrap();
+ Ok(res)
+ },
+ _ => { Ok(None) },
+ }
+ } else {
+ Ok(None)
}
}
}
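// Editor's sketch of the decrypted failuremsg layout assumed by the parsing above
// (the per-hop "ammag"/"um" keys come from gen_ammag_from_shared_secret /
// gen_um_from_shared_secret as used in the loop):
//
//   failuremsg[0..2]       big-endian u16 failure code (e.g. 0x4000|7, 0x4000|10)
//   failuremsg[2..4]       big-endian u16 channel_update length (codes carrying one)
//   failuremsg[4..4+len]   the encoded msgs::ChannelUpdate itself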
fn handle_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) -> Result<(msgs::RevokeAndACK, Option<msgs::CommitmentSigned>), HandleError> {
- let (res, monitor) = {
+ let (revoke_and_ack, commitment_signed, chan_monitor) = {
let mut channel_state = self.channel_state.lock().unwrap();
match channel_state.by_id.get_mut(&msg.channel_id) {
Some(chan) => {
if chan.get_their_node_id() != *their_node_id {
return Err(HandleError{err: "Got a message for a channel from the wrong node!", msg: None})
}
- (chan.commitment_signed(&msg)?, chan.channel_monitor())
+ chan.commitment_signed(&msg)?
},
None => return Err(HandleError{err: "Failed to find corresponding channel", msg: None})
}
};
- //TODO: Only if we store HTLC sigs
- self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor)?;
+ if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+ unimplemented!();
+ }
- Ok(res)
+ Ok((revoke_and_ack, commitment_signed))
}
fn handle_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<Option<msgs::CommitmentUpdate>, HandleError> {
- let ((res, mut pending_forwards, mut pending_failures), monitor) = {
+ let (res, mut pending_forwards, mut pending_failures, chan_monitor) = {
let mut channel_state = self.channel_state.lock().unwrap();
match channel_state.by_id.get_mut(&msg.channel_id) {
Some(chan) => {
if chan.get_their_node_id() != *their_node_id {
return Err(HandleError{err: "Got a message for a channel from the wrong node!", msg: None})
}
- (chan.revoke_and_ack(&msg)?, chan.channel_monitor())
+ chan.revoke_and_ack(&msg)?
},
None => return Err(HandleError{err: "Failed to find corresponding channel", msg: None})
}
};
- self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor)?;
+ if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+ unimplemented!();
+ }
for failure in pending_failures.drain(..) {
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), &failure.0, failure.1);
}
}
fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let channel_state = channel_state_lock.borrow_parts();
- let short_to_id = channel_state.short_to_id;
- if no_connection_possible {
- channel_state.by_id.retain(move |_, chan| {
- if chan.get_their_node_id() == *their_node_id {
- match chan.get_short_channel_id() {
- Some(short_id) => {
+ let mut new_events = Vec::new();
+ {
+ let mut channel_state_lock = self.channel_state.lock().unwrap();
+ let channel_state = channel_state_lock.borrow_parts();
+ let short_to_id = channel_state.short_to_id;
+ if no_connection_possible {
+ channel_state.by_id.retain(|_, chan| {
+ if chan.get_their_node_id() == *their_node_id {
+ if let Some(short_id) = chan.get_short_channel_id() {
short_to_id.remove(&short_id);
- },
- None => {},
+ }
+ let txn_to_broadcast = chan.force_shutdown();
+ for tx in txn_to_broadcast {
+ self.tx_broadcaster.broadcast_transaction(&tx);
+ }
+ if let Ok(update) = self.get_channel_update(&chan) {
+ new_events.push(events::Event::BroadcastChannelUpdate {
+ msg: update
+ });
+ }
+ false
+ } else {
+ true
+ }
+ });
+ } else {
+ for chan in channel_state.by_id {
+ if chan.1.get_their_node_id() == *their_node_id {
+ //TODO: mark channel disabled (and maybe announce such after a timeout). Also
+ //fail and wipe any uncommitted outbound HTLCs, as those are dropped on
+ //reconnect.
}
- //TODO: get the latest commitment tx, any HTLC txn built on top of it, etc out
- //of the channel and throw those into the announcement blackhole.
- false
- } else {
- true
- }
- });
- } else {
- for chan in channel_state.by_id {
- if chan.1.get_their_node_id() == *their_node_id {
- //TODO: mark channel disabled (and maybe announce such after a timeout). Also
- //fail and wipe any uncommitted outbound HTLCs as those are considered after
- //reconnect.
}
}
}
+ if !new_events.is_empty() {
+ let mut pending_events = self.pending_events.lock().unwrap();
+ for event in new_events.drain(..) {
+ pending_events.push(event);
+ }
+ }
}
}
use std::default::Default;
use std::sync::{Arc, Mutex};
use std::time::Instant;
+ use std::mem;
fn build_test_onion_keys() -> Vec<OnionKeys> {
// Keys from BOLT 4, used in both test vector tests
};
node_a.node.handle_funding_signed(&node_b.node.get_our_node_id(), &funding_signed).unwrap();
+ {
+ let mut added_monitors = node_a.chan_monitor.added_monitors.lock().unwrap();
+ assert_eq!(added_monitors.len(), 1);
+ assert_eq!(added_monitors[0].0, funding_output);
+ added_monitors.clear();
+ }
let events_3 = node_a.node.get_and_clear_pending_events();
assert_eq!(events_3.len(), 1);
(chan_announcement.1, chan_announcement.2, chan_announcement.3, chan_announcement.4)
}
- fn close_channel(outbound_node: &Node, inbound_node: &Node, channel_id: &Uint256, funding_tx: Transaction, close_inbound_first: bool) {
+ fn close_channel(outbound_node: &Node, inbound_node: &Node, channel_id: &Uint256, funding_tx: Transaction, close_inbound_first: bool) -> (msgs::ChannelUpdate, msgs::ChannelUpdate) {
let (node_a, broadcaster_a) = if close_inbound_first { (&inbound_node.node, &inbound_node.tx_broadcaster) } else { (&outbound_node.node, &outbound_node.tx_broadcaster) };
let (node_b, broadcaster_b) = if close_inbound_first { (&outbound_node.node, &outbound_node.tx_broadcaster) } else { (&inbound_node.node, &inbound_node.tx_broadcaster) };
let (tx_a, tx_b);
let mut funding_tx_map = HashMap::new();
funding_tx_map.insert(funding_tx.txid(), funding_tx);
tx_a.verify(&funding_tx_map).unwrap();
+
+ let events_1 = node_a.get_and_clear_pending_events();
+ assert_eq!(events_1.len(), 1);
+ let as_update = match events_1[0] {
+ Event::BroadcastChannelUpdate { ref msg } => {
+ msg.clone()
+ },
+ _ => panic!("Unexpected event"),
+ };
+
+ let events_2 = node_b.get_and_clear_pending_events();
+ assert_eq!(events_2.len(), 1);
+ let bs_update = match events_2[0] {
+ Event::BroadcastChannelUpdate { ref msg } => {
+ msg.clone()
+ },
+ _ => panic!("Unexpected event"),
+ };
+
+ (as_update, bs_update)
}
struct SendEvent {
};
let mut payment_event = {
- let msgs = origin_node.node.send_payment(route, our_payment_hash).unwrap().unwrap();
- SendEvent {
- node_id: expected_route[0].node.get_our_node_id(),
- msgs: vec!(msgs.0),
- commitment_msg: msgs.1,
+ origin_node.node.send_payment(route, our_payment_hash).unwrap();
+ {
+ let mut added_monitors = origin_node.chan_monitor.added_monitors.lock().unwrap();
+ assert_eq!(added_monitors.len(), 1);
+ added_monitors.clear();
}
+
+ let mut events = origin_node.node.get_and_clear_pending_events();
+ assert_eq!(events.len(), 1);
+ SendEvent::from_event(events.remove(0))
};
let mut prev_node = origin_node;
_ => panic!("Unexpected event"),
}
} else {
- for event in events_2.drain(..) {
- payment_event = SendEvent::from_event(event);
+ {
+ let mut added_monitors = node.chan_monitor.added_monitors.lock().unwrap();
+ assert_eq!(added_monitors.len(), 1);
+ added_monitors.clear();
}
+ payment_event = SendEvent::from_event(events_2.remove(0));
assert_eq!(payment_event.msgs.len(), 1);
}
let mut next_msgs: Option<(msgs::UpdateFulfillHTLC, msgs::CommitmentSigned)> = None;
macro_rules! update_fulfill_dance {
- ($node: expr, $prev_node: expr) => {
+ ($node: expr, $prev_node: expr, $last_node: expr) => {
{
$node.node.handle_update_fulfill_htlc(&$prev_node.node.get_our_node_id(), &next_msgs.as_ref().unwrap().0).unwrap();
+ {
+ let mut added_monitors = $node.chan_monitor.added_monitors.lock().unwrap();
+ if $last_node {
+ assert_eq!(added_monitors.len(), 1);
+ } else {
+ assert_eq!(added_monitors.len(), 2);
+ assert!(added_monitors[0].0 != added_monitors[1].0);
+ }
+ added_monitors.clear();
+ }
let revoke_and_commit = $node.node.handle_commitment_signed(&$prev_node.node.get_our_node_id(), &next_msgs.as_ref().unwrap().1).unwrap();
{
let mut added_monitors = $node.chan_monitor.added_monitors.lock().unwrap();
- assert_eq!(added_monitors.len(), 2);
+ assert_eq!(added_monitors.len(), 1);
added_monitors.clear();
}
assert!($prev_node.node.handle_revoke_and_ack(&$node.node.get_our_node_id(), &revoke_and_commit.0).unwrap().is_none());
for node in expected_route.iter().rev() {
assert_eq!(expected_next_node, node.node.get_our_node_id());
if next_msgs.is_some() {
- update_fulfill_dance!(node, prev_node);
+ update_fulfill_dance!(node, prev_node, false);
}
let events = node.node.get_and_clear_pending_events();
}
assert_eq!(expected_next_node, origin_node.node.get_our_node_id());
- update_fulfill_dance!(origin_node, expected_route.first().unwrap());
+ update_fulfill_dance!(origin_node, expected_route.first().unwrap(), true);
let events = origin_node.node.get_and_clear_pending_events();
assert_eq!(events.len(), 1);
}
}
+ const TEST_FINAL_CLTV: u32 = 32;
+
fn route_payment(origin_node: &Node, expected_route: &[&Node], recv_value: u64) -> ([u8; 32], [u8; 32]) {
- let route = origin_node.router.get_route(&expected_route.last().unwrap().node.get_our_node_id(), &Vec::new(), recv_value, 142).unwrap();
+ let route = origin_node.router.get_route(&expected_route.last().unwrap().node.get_our_node_id(), &Vec::new(), recv_value, TEST_FINAL_CLTV).unwrap();
assert_eq!(route.hops.len(), expected_route.len());
for (node, hop) in expected_route.iter().zip(route.hops.iter()) {
assert_eq!(hop.pubkey, node.node.get_our_node_id());
}
fn route_over_limit(origin_node: &Node, expected_route: &[&Node], recv_value: u64) {
- let route = origin_node.router.get_route(&expected_route.last().unwrap().node.get_our_node_id(), &Vec::new(), recv_value, 142).unwrap();
+ let route = origin_node.router.get_route(&expected_route.last().unwrap().node.get_our_node_id(), &Vec::new(), recv_value, TEST_FINAL_CLTV).unwrap();
assert_eq!(route.hops.len(), expected_route.len());
for (node, hop) in expected_route.iter().zip(route.hops.iter()) {
assert_eq!(hop.pubkey, node.node.get_our_node_id());
claim_payment(&origin, expected_route, our_payment_preimage);
}
- fn send_failed_payment(origin_node: &Node, expected_route: &[&Node]) {
- let route = origin_node.router.get_route(&expected_route.last().unwrap().node.get_our_node_id(), &Vec::new(), 1000000, 142).unwrap();
- assert_eq!(route.hops.len(), expected_route.len());
- for (node, hop) in expected_route.iter().zip(route.hops.iter()) {
- assert_eq!(hop.pubkey, node.node.get_our_node_id());
- }
- let our_payment_hash = send_along_route(origin_node, route, expected_route, 1000000).1;
-
+ fn fail_payment(origin_node: &Node, expected_route: &[&Node], our_payment_hash: [u8; 32]) {
assert!(expected_route.last().unwrap().node.fail_htlc_backwards(&our_payment_hash));
+ {
+ let mut added_monitors = expected_route.last().unwrap().chan_monitor.added_monitors.lock().unwrap();
+ assert_eq!(added_monitors.len(), 1);
+ added_monitors.clear();
+ }
let mut next_msgs: Option<(msgs::UpdateFailHTLC, msgs::CommitmentSigned)> = None;
macro_rules! update_fail_dance {
- ($node: expr, $prev_node: expr) => {
+ ($node: expr, $prev_node: expr, $last_node: expr) => {
{
$node.node.handle_update_fail_htlc(&$prev_node.node.get_our_node_id(), &next_msgs.as_ref().unwrap().0).unwrap();
let revoke_and_commit = $node.node.handle_commitment_signed(&$prev_node.node.get_our_node_id(), &next_msgs.as_ref().unwrap().1).unwrap();
+
{
let mut added_monitors = $node.chan_monitor.added_monitors.lock().unwrap();
assert_eq!(added_monitors.len(), 1);
added_monitors.clear();
}
assert!($prev_node.node.handle_revoke_and_ack(&$node.node.get_our_node_id(), &revoke_and_commit.0).unwrap().is_none());
+ {
+ let mut added_monitors = $prev_node.chan_monitor.added_monitors.lock().unwrap();
+ assert_eq!(added_monitors.len(), 1);
+ added_monitors.clear();
+ }
let revoke_and_ack = $prev_node.node.handle_commitment_signed(&$node.node.get_our_node_id(), &revoke_and_commit.1.unwrap()).unwrap();
- assert!(revoke_and_ack.1.is_none());
{
let mut added_monitors = $prev_node.chan_monitor.added_monitors.lock().unwrap();
- assert_eq!(added_monitors.len(), 2);
+ assert_eq!(added_monitors.len(), 1);
added_monitors.clear();
}
+ assert!(revoke_and_ack.1.is_none());
+ assert!($node.node.get_and_clear_pending_events().is_empty());
assert!($node.node.handle_revoke_and_ack(&$prev_node.node.get_our_node_id(), &revoke_and_ack.0).unwrap().is_none());
{
let mut added_monitors = $node.chan_monitor.added_monitors.lock().unwrap();
- assert_eq!(added_monitors.len(), 1);
+ if $last_node {
+ assert_eq!(added_monitors.len(), 1);
+ } else {
+ assert_eq!(added_monitors.len(), 2);
+ assert!(added_monitors[0].0 != added_monitors[1].0);
+ }
added_monitors.clear();
}
}
for node in expected_route.iter().rev() {
assert_eq!(expected_next_node, node.node.get_our_node_id());
if next_msgs.is_some() {
- update_fail_dance!(node, prev_node);
+ update_fail_dance!(node, prev_node, false);
}
let events = node.node.get_and_clear_pending_events();
}
assert_eq!(expected_next_node, origin_node.node.get_our_node_id());
- update_fail_dance!(origin_node, expected_route.first().unwrap());
+ update_fail_dance!(origin_node, expected_route.first().unwrap(), true);
let events = origin_node.node.get_and_clear_pending_events();
assert_eq!(events.len(), 1);
send_payment(&nodes[3], &vec!(&nodes[2], &nodes[1])[..], 1000000);
// Test failure packets
- send_failed_payment(&nodes[0], &vec!(&nodes[1], &nodes[2], &nodes[3])[..]);
+ let payment_hash_1 = route_payment(&nodes[0], &vec!(&nodes[1], &nodes[2], &nodes[3])[..], 1000000).1;
+ fail_payment(&nodes[0], &vec!(&nodes[1], &nodes[2], &nodes[3])[..], payment_hash_1);
// Add a new channel that skips 3
let chan_4 = create_announced_chan_between_nodes(&nodes, 1, 3);
pubkey: nodes[1].node.get_our_node_id(),
short_channel_id: chan_4.0.contents.short_channel_id,
fee_msat: 1000000,
- cltv_expiry_delta: 142,
+ cltv_expiry_delta: TEST_FINAL_CLTV,
});
hops[1].fee_msat = chan_4.1.contents.fee_base_msat as u64 + chan_4.1.contents.fee_proportional_millionths as u64 * hops[2].fee_msat as u64 / 1000000;
hops[0].fee_msat = chan_3.0.contents.fee_base_msat as u64 + chan_3.0.contents.fee_proportional_millionths as u64 * hops[1].fee_msat as u64 / 1000000;
pubkey: nodes[1].node.get_our_node_id(),
short_channel_id: chan_2.0.contents.short_channel_id,
fee_msat: 1000000,
- cltv_expiry_delta: 142,
+ cltv_expiry_delta: TEST_FINAL_CLTV,
});
hops[1].fee_msat = chan_2.1.contents.fee_base_msat as u64 + chan_2.1.contents.fee_proportional_millionths as u64 * hops[2].fee_msat as u64 / 1000000;
hops[0].fee_msat = chan_3.1.contents.fee_base_msat as u64 + chan_3.1.contents.fee_proportional_millionths as u64 * hops[1].fee_msat as u64 / 1000000;
- let payment_preimage_2 = send_along_route(&nodes[1], Route { hops }, &vec!(&nodes[3], &nodes[2], &nodes[1])[..], 1000000).0;
+ let payment_hash_2 = send_along_route(&nodes[1], Route { hops }, &vec!(&nodes[3], &nodes[2], &nodes[1])[..], 1000000).1;
// Claim (and fail) the rebalances...
- claim_payment(&nodes[1], &vec!(&nodes[3], &nodes[2], &nodes[1])[..], payment_preimage_2);
+ fail_payment(&nodes[1], &vec!(&nodes[3], &nodes[2], &nodes[1])[..], payment_hash_2);
claim_payment(&nodes[1], &vec!(&nodes[2], &nodes[3], &nodes[1])[..], payment_preimage_1);
// Add a duplicate new channel from 2 to 4
assert_eq!(node.chan_monitor.added_monitors.lock().unwrap().len(), 0);
}
}
+
+ #[derive(PartialEq)]
+ enum HTLCType { NONE, TIMEOUT, SUCCESS }
+ fn test_txn_broadcast(node: &Node, chan: &(msgs::ChannelUpdate, msgs::ChannelUpdate, Uint256, Transaction), commitment_tx: Option<Transaction>, has_htlc_tx: HTLCType) -> Vec<Transaction> {
+ let mut node_txn = node.tx_broadcaster.txn_broadcasted.lock().unwrap();
+ assert!(node_txn.len() >= if commitment_tx.is_some() { 0 } else { 1 } + if has_htlc_tx == HTLCType::NONE { 0 } else { 1 });
+
+ let mut res = Vec::with_capacity(2);
+
+ if let Some(explicit_tx) = commitment_tx {
+ res.push(explicit_tx.clone());
+ } else {
+ for tx in node_txn.iter() {
+ if tx.input.len() == 1 && tx.input[0].prev_hash == chan.3.txid() {
+ let mut funding_tx_map = HashMap::new();
+ funding_tx_map.insert(chan.3.txid(), chan.3.clone());
+ tx.verify(&funding_tx_map).unwrap();
+ res.push(tx.clone());
+ }
+ }
+ }
+ assert_eq!(res.len(), 1);
+
+ if has_htlc_tx != HTLCType::NONE {
+ for tx in node_txn.iter() {
+ if tx.input.len() == 1 && tx.input[0].prev_hash == res[0].txid() {
+ let mut funding_tx_map = HashMap::new();
+ funding_tx_map.insert(res[0].txid(), res[0].clone());
+ tx.verify(&funding_tx_map).unwrap();
+ if has_htlc_tx == HTLCType::TIMEOUT {
+ assert!(tx.lock_time != 0);
+ } else {
+ assert!(tx.lock_time == 0);
+ }
+ res.push(tx.clone());
+ break;
+ }
+ }
+ assert_eq!(res.len(), 2);
+ }
+ node_txn.clear();
+ res
+ }
+
+ fn check_preimage_claim(node: &Node, prev_txn: &Vec<Transaction>) -> Vec<Transaction> {
+ let mut node_txn = node.tx_broadcaster.txn_broadcasted.lock().unwrap();
+
+ assert!(node_txn.len() >= 1);
+ assert_eq!(node_txn[0].input.len(), 1);
+ let mut found_prev = false;
+
+ for tx in prev_txn {
+ if node_txn[0].input[0].prev_hash == tx.txid() {
+ let mut funding_tx_map = HashMap::new();
+ funding_tx_map.insert(tx.txid(), tx.clone());
+ node_txn[0].verify(&funding_tx_map).unwrap();
+
+ assert!(node_txn[0].input[0].witness[2].len() > 106); // must spend an htlc output
+ assert_eq!(tx.input.len(), 1); // must spend a commitment tx
+
+ found_prev = true;
+ break;
+ }
+ }
+ assert!(found_prev);
+
+ let mut res = Vec::new();
+ mem::swap(&mut *node_txn, &mut res);
+ res
+ }
+
+ fn get_announce_close_broadcast_events(nodes: &Vec<Node>, a: usize, b: usize) {
+ let events_1 = nodes[a].node.get_and_clear_pending_events();
+ assert_eq!(events_1.len(), 1);
+ let as_update = match events_1[0] {
+ Event::BroadcastChannelUpdate { ref msg } => {
+ msg.clone()
+ },
+ _ => panic!("Unexpected event"),
+ };
+
+ let events_2 = nodes[b].node.get_and_clear_pending_events();
+ assert_eq!(events_2.len(), 1);
+ let bs_update = match events_2[0] {
+ Event::BroadcastChannelUpdate { ref msg } => {
+ msg.clone()
+ },
+ _ => panic!("Unexpected event"),
+ };
+
+ for node in nodes {
+ node.router.handle_channel_update(&as_update).unwrap();
+ node.router.handle_channel_update(&bs_update).unwrap();
+ }
+ }
+
+ #[test]
+ fn channel_monitor_network_test() {
+ // Simple test which builds a network of ChannelManagers, connects them to each other, and
+ // tests that ChannelMonitor is able to recover from various states.
+ let nodes = create_network(5);
+
+ // Create some initial channels
+ let chan_1 = create_announced_chan_between_nodes(&nodes, 0, 1);
+ let chan_2 = create_announced_chan_between_nodes(&nodes, 1, 2);
+ let chan_3 = create_announced_chan_between_nodes(&nodes, 2, 3);
+ let chan_4 = create_announced_chan_between_nodes(&nodes, 3, 4);
+
+ // Rebalance the network a bit by relaying one payment through all the channels...
+ send_payment(&nodes[0], &vec!(&nodes[1], &nodes[2], &nodes[3], &nodes[4])[..], 8000000);
+ send_payment(&nodes[0], &vec!(&nodes[1], &nodes[2], &nodes[3], &nodes[4])[..], 8000000);
+ send_payment(&nodes[0], &vec!(&nodes[1], &nodes[2], &nodes[3], &nodes[4])[..], 8000000);
+ send_payment(&nodes[0], &vec!(&nodes[1], &nodes[2], &nodes[3], &nodes[4])[..], 8000000);
+
+ // Simple case with no pending HTLCs:
+ nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), true);
+ {
+ let node_txn = test_txn_broadcast(&nodes[1], &chan_1, None, HTLCType::NONE);
+ let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
+ nodes[0].chain_monitor.block_connected_checked(&header, 1, &[&node_txn[0]; 1], &[4; 1]);
+ assert_eq!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().len(), 0);
+ }
+ get_announce_close_broadcast_events(&nodes, 0, 1);
+ assert_eq!(nodes[0].node.list_channels().len(), 0);
+ assert_eq!(nodes[1].node.list_channels().len(), 1);
+
+ // One pending HTLC is discarded by the force-close:
+ let payment_preimage_1 = route_payment(&nodes[1], &vec!(&nodes[2], &nodes[3])[..], 3000000).0;
+
+ // Simple case of one pending HTLC to HTLC-Timeout
+ nodes[1].node.peer_disconnected(&nodes[2].node.get_our_node_id(), true);
+ {
+ let node_txn = test_txn_broadcast(&nodes[1], &chan_2, None, HTLCType::TIMEOUT);
+ let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
+ nodes[2].chain_monitor.block_connected_checked(&header, 1, &[&node_txn[0]; 1], &[4; 1]);
+ assert_eq!(nodes[2].tx_broadcaster.txn_broadcasted.lock().unwrap().len(), 0);
+ }
+ get_announce_close_broadcast_events(&nodes, 1, 2);
+ assert_eq!(nodes[1].node.list_channels().len(), 0);
+ assert_eq!(nodes[2].node.list_channels().len(), 1);
+
+ macro_rules! claim_funds {
+ ($node: expr, $prev_node: expr, $preimage: expr) => {
+ {
+ assert!($node.node.claim_funds($preimage));
+ {
+ let mut added_monitors = $node.chan_monitor.added_monitors.lock().unwrap();
+ assert_eq!(added_monitors.len(), 1);
+ added_monitors.clear();
+ }
+
+ let events = $node.node.get_and_clear_pending_events();
+ assert_eq!(events.len(), 1);
+ match events[0] {
+ Event::SendFulfillHTLC { ref node_id, .. } => {
+ assert_eq!(*node_id, $prev_node.node.get_our_node_id());
+ },
+ _ => panic!("Unexpected event"),
+ };
+ }
+ }
+ }
+
+ // nodes[3] gets the preimage, but nodes[2] already disconnected, resulting in a nodes[2]
+ // HTLC-Timeout and a nodes[3] claim against it (+ its own announces)
+ nodes[2].node.peer_disconnected(&nodes[3].node.get_our_node_id(), true);
+ {
+ let node_txn = test_txn_broadcast(&nodes[2], &chan_3, None, HTLCType::TIMEOUT);
+
+ // Claim the payment on nodes[3], giving it knowledge of the preimage
+ claim_funds!(nodes[3], nodes[2], payment_preimage_1);
+
+ let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
+ nodes[3].chain_monitor.block_connected_checked(&header, 1, &[&node_txn[0]; 1], &[4; 1]);
+
+ check_preimage_claim(&nodes[3], &node_txn);
+ }
+ get_announce_close_broadcast_events(&nodes, 2, 3);
+ assert_eq!(nodes[2].node.list_channels().len(), 0);
+ assert_eq!(nodes[3].node.list_channels().len(), 1);
+
+ // One pending HTLC to time out:
+ let payment_preimage_2 = route_payment(&nodes[3], &vec!(&nodes[4])[..], 3000000).0;
+
+ {
+ let mut header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
+ nodes[3].chain_monitor.block_connected_checked(&header, 1, &Vec::new()[..], &[0; 0]);
+ for i in 2..TEST_FINAL_CLTV - 5 {
+ header = BlockHeader { version: 0x20000000, prev_blockhash: header.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
+ nodes[3].chain_monitor.block_connected_checked(&header, i, &Vec::new()[..], &[0; 0]);
+ }
+
+ let node_txn = test_txn_broadcast(&nodes[3], &chan_4, None, HTLCType::TIMEOUT);
+
+ // Claim the payment on nodes[4], giving it knowledge of the preimage
+ claim_funds!(nodes[4], nodes[3], payment_preimage_2);
+
+ header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
+ nodes[4].chain_monitor.block_connected_checked(&header, 1, &Vec::new()[..], &[0; 0]);
+ for i in 2..TEST_FINAL_CLTV - 5 {
+ header = BlockHeader { version: 0x20000000, prev_blockhash: header.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
+ nodes[4].chain_monitor.block_connected_checked(&header, i, &Vec::new()[..], &[0; 0]);
+ }
+
+ test_txn_broadcast(&nodes[4], &chan_4, None, HTLCType::SUCCESS);
+
+ header = BlockHeader { version: 0x20000000, prev_blockhash: header.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
+ nodes[4].chain_monitor.block_connected_checked(&header, TEST_FINAL_CLTV - 5, &[&node_txn[0]; 1], &[4; 1]);
+
+ check_preimage_claim(&nodes[4], &node_txn);
+ }
+ get_announce_close_broadcast_events(&nodes, 3, 4);
+ assert_eq!(nodes[3].node.list_channels().len(), 0);
+ assert_eq!(nodes[4].node.list_channels().len(), 0);
+
+ // Create some new channels:
+ let chan_5 = create_announced_chan_between_nodes(&nodes, 0, 1);
+
+ // A pending HTLC which will be revoked:
+ let payment_preimage_3 = route_payment(&nodes[0], &vec!(&nodes[1])[..], 3000000).0;
+ // Get the will-be-revoked local txn from nodes[0]
+ let revoked_local_txn = nodes[0].node.channel_state.lock().unwrap().by_id.iter().next().unwrap().1.last_local_commitment_txn.clone();
+ // Revoke the old state
+ claim_payment(&nodes[0], &vec!(&nodes[1])[..], payment_preimage_3);
+
+ {
+ let mut header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
+ nodes[1].chain_monitor.block_connected_checked(&header, 1, &vec![&revoked_local_txn[0]; 1], &[4; 1]);
+ {
+ let mut node_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap();
+ assert_eq!(node_txn.len(), 1);
+ assert_eq!(node_txn[0].input.len(), 1);
+
+ let mut funding_tx_map = HashMap::new();
+ funding_tx_map.insert(revoked_local_txn[0].txid(), revoked_local_txn[0].clone());
+ node_txn[0].verify(&funding_tx_map).unwrap();
+ node_txn.clear();
+ }
+
+ nodes[0].chain_monitor.block_connected_checked(&header, 1, &vec![&revoked_local_txn[0]; 1], &[4; 1]);
+ let node_txn = test_txn_broadcast(&nodes[0], &chan_5, Some(revoked_local_txn[0].clone()), HTLCType::TIMEOUT);
+ header = BlockHeader { version: 0x20000000, prev_blockhash: header.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
+ nodes[1].chain_monitor.block_connected_checked(&header, 1, &[&node_txn[1]; 1], &[4; 1]);
+
+ //TODO: At this point nodes[1] should claim the revoked HTLC-Timeout output, but that's
+ //not yet implemented in ChannelMonitor
+ }
+ get_announce_close_broadcast_events(&nodes, 0, 1);
+ assert_eq!(nodes[0].node.list_channels().len(), 0);
+ assert_eq!(nodes[1].node.list_channels().len(), 0);
+
+ // Check that we processed all pending events
+ for node in nodes {
+ assert_eq!(node.node.get_and_clear_pending_events().len(), 0);
+ assert_eq!(node.chan_monitor.added_monitors.lock().unwrap().len(), 0);
+ }
+ }
}