let (onion_payloads, htlc_msat, htlc_cltv) = ChannelManager::build_onion_payloads(&route, cur_height)?;
let onion_packet = ChannelManager::construct_onion_packet(onion_payloads, onion_keys, &payment_hash);
- let (first_hop_node_id, (update_add, commitment_signed, chan_monitor)) = {
+ let (first_hop_node_id, update_add, commitment_signed) = {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let channel_state = channel_state_lock.borrow_parts();
if !chan.is_live() {
return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected!"});
}
- chan.send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute {
+ match chan.send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute {
route: route.clone(),
session_priv: session_priv.clone(),
first_hop_htlc_msat: htlc_msat,
- }, onion_packet).map_err(|he| APIError::ChannelUnavailable{err: he.err})?
+ }, onion_packet).map_err(|he| APIError::ChannelUnavailable{err: he.err})? {
+ Some((update_add, commitment_signed, chan_monitor)) => {
+ if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+ unimplemented!();
+ }
+ Some((update_add, commitment_signed))
+ },
+ None => None,
+ }
};
let first_hop_node_id = route.hops.first().unwrap().pubkey;
match res {
- Some(msgs) => (first_hop_node_id, msgs),
+ Some((update_add, commitment_signed)) => {
+ (first_hop_node_id, update_add, commitment_signed)
+ },
None => return Ok(()),
}
};
- if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
- unimplemented!();
- }
-
let mut events = self.pending_events.lock().unwrap();
events.push(events::Event::UpdateHTLCs {
node_id: first_hop_node_id,
},
None => return
}
- }; // Release channel lock for install_watch_outpoint call,
+ };
+ // Because we have exclusive ownership of the channel here we can release the channel_state
+ // lock before add_update_monitor
if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
unimplemented!();
}
continue;
},
};
- new_events.push((Some(monitor), events::Event::UpdateHTLCs {
+ if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) {
+ unimplemented!();// but def dont push the event...
+ }
+ new_events.push(events::Event::UpdateHTLCs {
node_id: forward_chan.get_their_node_id(),
updates: msgs::CommitmentUpdate {
update_add_htlcs: add_htlc_msgs,
update_fee: None,
commitment_signed: commitment_msg,
},
- }));
+ });
}
} else {
for HTLCForwardInfo { prev_short_channel_id, prev_htlc_id, forward_info } in pending_forwards.drain(..) {
hash_map::Entry::Occupied(mut entry) => entry.get_mut().push(prev_hop_data),
hash_map::Entry::Vacant(entry) => { entry.insert(vec![prev_hop_data]); },
};
- new_events.push((None, events::Event::PaymentReceived {
+ new_events.push(events::Event::PaymentReceived {
payment_hash: forward_info.payment_hash,
amt: forward_info.amt_to_forward,
- }));
+ });
}
}
}
}
if new_events.is_empty() { return }
-
- new_events.retain(|event| {
- if let &Some(ref monitor) = &event.0 {
- if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor.clone()) {
- unimplemented!();// but def dont push the event...
- }
- }
- true
- });
-
let mut events = self.pending_events.lock().unwrap();
- events.reserve(new_events.len());
- for event in new_events.drain(..) {
- events.push(event.1);
- }
+ events.append(&mut new_events);
}
/// Indicates that the preimage for payment_hash is unknown after a PaymentReceived event.
let chan = channel_state.by_id.get_mut(&chan_id).unwrap();
match chan.get_update_fail_htlc_and_commit(htlc_id, err_packet) {
- Ok(msg) => (chan.get_their_node_id(), msg),
+ Ok(Some((msg, commitment_msg, chan_monitor))) => {
+ if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+ unimplemented!();
+ }
+ (chan.get_their_node_id(), Some((msg, commitment_msg)))
+ },
+ Ok(None) => (chan.get_their_node_id(), None),
Err(_e) => {
//TODO: Do something with e?
return;
};
match fail_msgs {
- Some((msg, commitment_msg, chan_monitor)) => {
+ Some((msg, commitment_msg)) => {
mem::drop(channel_state);
- if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
- unimplemented!();// but def dont push the event...
- }
-
let mut pending_events = self.pending_events.lock().unwrap();
pending_events.push(events::Event::UpdateHTLCs {
node_id,
let chan = channel_state.by_id.get_mut(&chan_id).unwrap();
match chan.get_update_fulfill_htlc_and_commit(htlc_id, payment_preimage) {
- Ok(msg) => (chan.get_their_node_id(), msg),
+ Ok((msgs, Some(chan_monitor))) => {
+ if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+ unimplemented!();// but def dont push the event...
+ }
+ (chan.get_their_node_id(), msgs)
+ },
+ Ok((msgs, None)) => (chan.get_their_node_id(), msgs),
Err(_e) => {
// TODO: There is probably a channel manager somewhere that needs to
// learn the preimage as the channel may be about to hit the chain.
};
mem::drop(channel_state);
- if let Some(chan_monitor) = fulfill_msgs.1 {
- if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
- unimplemented!();// but def dont push the event...
- }
- }
-
- if let Some((msg, commitment_msg)) = fulfill_msgs.0 {
+ if let Some((msg, commitment_msg)) = fulfill_msgs {
let mut pending_events = self.pending_events.lock().unwrap();
pending_events.push(events::Event::UpdateHTLCs {
node_id: node_id,
},
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.temporary_channel_id))
}
- }; // Release channel lock for install_watch_outpoint call,
- // note that this means if the remote end is misbehaving and sends a message for the same
- // channel back-to-back with funding_created, we'll end up thinking they sent a message
- // for a bogus channel.
+ };
+ // Because we have exclusive ownership of the channel here we can release the channel_state
+ // lock before add_update_monitor
if let Err(_e) = self.monitor.add_update_monitor(monitor_update.get_funding_txo().unwrap(), monitor_update) {
unimplemented!();
}
}
fn internal_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
- let (funding_txo, user_id, monitor) = {
+ let (funding_txo, user_id) = {
let mut channel_state = self.channel_state.lock().unwrap();
match channel_state.by_id.get_mut(&msg.channel_id) {
Some(chan) => {
return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
}
let chan_monitor = chan.funding_signed(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
- (chan.get_funding_txo().unwrap(), chan.get_user_id(), chan_monitor)
+ if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+ unimplemented!();
+ }
+ (chan.get_funding_txo().unwrap(), chan.get_user_id())
},
None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
}
};
- if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) {
- unimplemented!();
- }
let mut pending_events = self.pending_events.lock().unwrap();
pending_events.push(events::Event::FundingBroadcastSafe {
funding_txo: funding_txo,
}
fn internal_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) -> Result<(msgs::RevokeAndACK, Option<msgs::CommitmentSigned>), MsgHandleErrInternal> {
- let (revoke_and_ack, commitment_signed, chan_monitor) = {
+ let (revoke_and_ack, commitment_signed) = {
let mut channel_state = self.channel_state.lock().unwrap();
match channel_state.by_id.get_mut(&msg.channel_id) {
Some(chan) => {
//TODO: here and below MsgHandleErrInternal, #153 case
return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
}
- chan.commitment_signed(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?
+ let (revoke_and_ack, commitment_signed, chan_monitor) = chan.commitment_signed(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
+ if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+ unimplemented!();
+ }
+ (revoke_and_ack, commitment_signed)
},
None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
}
};
- if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
- unimplemented!();
- }
-
Ok((revoke_and_ack, commitment_signed))
}
fn internal_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<Option<msgs::CommitmentUpdate>, MsgHandleErrInternal> {
- let ((res, mut pending_forwards, mut pending_failures, chan_monitor), short_channel_id) = {
+ let ((res, mut pending_forwards, mut pending_failures), short_channel_id) = {
let mut channel_state = self.channel_state.lock().unwrap();
match channel_state.by_id.get_mut(&msg.channel_id) {
Some(chan) => {
//TODO: here and below MsgHandleErrInternal, #153 case
return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
}
- (chan.revoke_and_ack(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?, chan.get_short_channel_id().expect("RAA should only work on a short-id-available channel"))
+ let (res, pending_forwards, pending_failures, chan_monitor) = chan.revoke_and_ack(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
+ if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+ unimplemented!();
+ }
+ ((res, pending_forwards, pending_failures), chan.get_short_channel_id().expect("RAA should only work on a short-id-available channel"))
},
None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
}
};
- if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
- unimplemented!();
- }
for failure in pending_failures.drain(..) {
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
}
}
fn internal_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(Option<msgs::FundingLocked>, Option<msgs::RevokeAndACK>, Option<msgs::CommitmentUpdate>, RAACommitmentOrder), MsgHandleErrInternal> {
- let (res, chan_monitor) = {
+ let res = {
let mut channel_state = self.channel_state.lock().unwrap();
match channel_state.by_id.get_mut(&msg.channel_id) {
Some(chan) => {
}
let (funding_locked, revoke_and_ack, commitment_update, channel_monitor, order) = chan.channel_reestablish(msg)
.map_err(|e| MsgHandleErrInternal::from_chan_maybe_close(e, msg.channel_id))?;
- (Ok((funding_locked, revoke_and_ack, commitment_update, order)), channel_monitor)
+ if let Some(monitor) = channel_monitor {
+ if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) {
+ unimplemented!();
+ }
+ }
+ Ok((funding_locked, revoke_and_ack, commitment_update, order))
},
None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
}
};
- if let Some(monitor) = chan_monitor {
- if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) {
- unimplemented!();
- }
- }
+
res
}