From bd2c839a73817b1464f3e9192a944ad646617123 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 17 Oct 2018 08:47:33 -0400 Subject: [PATCH] Move channel monitor updates inside the channel_state lock This really, really sucks as it defeats almost all of the cross-channel parallelism we'd intended to have - waiting on a client to update a watchtower for an unrelated channel to process any messages is really shitty. We should revisit this with per-channel locks as a compile-time option post-0.1. --- src/ln/channelmanager.rs | 137 ++++++++++++++++++++------------------- 1 file changed, 69 insertions(+), 68 deletions(-) diff --git a/src/ln/channelmanager.rs b/src/ln/channelmanager.rs index 774fdf6aa..dd6c7357c 100644 --- a/src/ln/channelmanager.rs +++ b/src/ln/channelmanager.rs @@ -1096,7 +1096,7 @@ impl ChannelManager { let (onion_payloads, htlc_msat, htlc_cltv) = ChannelManager::build_onion_payloads(&route, cur_height)?; let onion_packet = ChannelManager::construct_onion_packet(onion_payloads, onion_keys, &payment_hash); - let (first_hop_node_id, (update_add, commitment_signed, chan_monitor)) = { + let (first_hop_node_id, update_add, commitment_signed) = { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = channel_state_lock.borrow_parts(); @@ -1113,25 +1113,31 @@ impl ChannelManager { if !chan.is_live() { return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected!"}); } - chan.send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute { + match chan.send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute { route: route.clone(), session_priv: session_priv.clone(), first_hop_htlc_msat: htlc_msat, - }, onion_packet).map_err(|he| APIError::ChannelUnavailable{err: he.err})? + }, onion_packet).map_err(|he| APIError::ChannelUnavailable{err: he.err})? { + Some((update_add, commitment_signed, chan_monitor)) => { + if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { + unimplemented!(); + } + Some((update_add, commitment_signed)) + }, + None => None, + } }; let first_hop_node_id = route.hops.first().unwrap().pubkey; match res { - Some(msgs) => (first_hop_node_id, msgs), + Some((update_add, commitment_signed)) => { + (first_hop_node_id, update_add, commitment_signed) + }, None => return Ok(()), } }; - if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { - unimplemented!(); - } - let mut events = self.pending_events.lock().unwrap(); events.push(events::Event::UpdateHTLCs { node_id: first_hop_node_id, @@ -1184,7 +1190,9 @@ impl ChannelManager { }, None => return } - }; // Release channel lock for install_watch_outpoint call, + }; + // Because we have exclusive ownership of the channel here we can release the channel_state + // lock before add_update_monitor if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { unimplemented!(); } @@ -1299,7 +1307,10 @@ impl ChannelManager { continue; }, }; - new_events.push((Some(monitor), events::Event::UpdateHTLCs { + if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) { + unimplemented!();// but def dont push the event... + } + new_events.push(events::Event::UpdateHTLCs { node_id: forward_chan.get_their_node_id(), updates: msgs::CommitmentUpdate { update_add_htlcs: add_htlc_msgs, @@ -1309,7 +1320,7 @@ impl ChannelManager { update_fee: None, commitment_signed: commitment_msg, }, - })); + }); } } else { for HTLCForwardInfo { prev_short_channel_id, prev_htlc_id, forward_info } in pending_forwards.drain(..) { @@ -1322,10 +1333,10 @@ impl ChannelManager { hash_map::Entry::Occupied(mut entry) => entry.get_mut().push(prev_hop_data), hash_map::Entry::Vacant(entry) => { entry.insert(vec![prev_hop_data]); }, }; - new_events.push((None, events::Event::PaymentReceived { + new_events.push(events::Event::PaymentReceived { payment_hash: forward_info.payment_hash, amt: forward_info.amt_to_forward, - })); + }); } } } @@ -1339,21 +1350,8 @@ impl ChannelManager { } if new_events.is_empty() { return } - - new_events.retain(|event| { - if let &Some(ref monitor) = &event.0 { - if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor.clone()) { - unimplemented!();// but def dont push the event... - } - } - true - }); - let mut events = self.pending_events.lock().unwrap(); - events.reserve(new_events.len()); - for event in new_events.drain(..) { - events.push(event.1); - } + events.append(&mut new_events); } /// Indicates that the preimage for payment_hash is unknown after a PaymentReceived event. @@ -1416,7 +1414,13 @@ impl ChannelManager { let chan = channel_state.by_id.get_mut(&chan_id).unwrap(); match chan.get_update_fail_htlc_and_commit(htlc_id, err_packet) { - Ok(msg) => (chan.get_their_node_id(), msg), + Ok(Some((msg, commitment_msg, chan_monitor))) => { + if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { + unimplemented!(); + } + (chan.get_their_node_id(), Some((msg, commitment_msg))) + }, + Ok(None) => (chan.get_their_node_id(), None), Err(_e) => { //TODO: Do something with e? return; @@ -1425,13 +1429,9 @@ impl ChannelManager { }; match fail_msgs { - Some((msg, commitment_msg, chan_monitor)) => { + Some((msg, commitment_msg)) => { mem::drop(channel_state); - if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { - unimplemented!();// but def dont push the event... - } - let mut pending_events = self.pending_events.lock().unwrap(); pending_events.push(events::Event::UpdateHTLCs { node_id, @@ -1496,7 +1496,13 @@ impl ChannelManager { let chan = channel_state.by_id.get_mut(&chan_id).unwrap(); match chan.get_update_fulfill_htlc_and_commit(htlc_id, payment_preimage) { - Ok(msg) => (chan.get_their_node_id(), msg), + Ok((msgs, Some(chan_monitor))) => { + if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { + unimplemented!();// but def dont push the event... + } + (chan.get_their_node_id(), msgs) + }, + Ok((msgs, None)) => (chan.get_their_node_id(), msgs), Err(_e) => { // TODO: There is probably a channel manager somewhere that needs to // learn the preimage as the channel may be about to hit the chain. @@ -1507,13 +1513,7 @@ impl ChannelManager { }; mem::drop(channel_state); - if let Some(chan_monitor) = fulfill_msgs.1 { - if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { - unimplemented!();// but def dont push the event... - } - } - - if let Some((msg, commitment_msg)) = fulfill_msgs.0 { + if let Some((msg, commitment_msg)) = fulfill_msgs { let mut pending_events = self.pending_events.lock().unwrap(); pending_events.push(events::Event::UpdateHTLCs { node_id: node_id, @@ -1626,10 +1626,9 @@ impl ChannelManager { }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.temporary_channel_id)) } - }; // Release channel lock for install_watch_outpoint call, - // note that this means if the remote end is misbehaving and sends a message for the same - // channel back-to-back with funding_created, we'll end up thinking they sent a message - // for a bogus channel. + }; + // Because we have exclusive ownership of the channel here we can release the channel_state + // lock before add_update_monitor if let Err(_e) = self.monitor.add_update_monitor(monitor_update.get_funding_txo().unwrap(), monitor_update) { unimplemented!(); } @@ -1646,7 +1645,7 @@ impl ChannelManager { } fn internal_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> { - let (funding_txo, user_id, monitor) = { + let (funding_txo, user_id) = { let mut channel_state = self.channel_state.lock().unwrap(); match channel_state.by_id.get_mut(&msg.channel_id) { Some(chan) => { @@ -1655,14 +1654,14 @@ impl ChannelManager { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id)); } let chan_monitor = chan.funding_signed(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?; - (chan.get_funding_txo().unwrap(), chan.get_user_id(), chan_monitor) + if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { + unimplemented!(); + } + (chan.get_funding_txo().unwrap(), chan.get_user_id()) }, None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id)) } }; - if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) { - unimplemented!(); - } let mut pending_events = self.pending_events.lock().unwrap(); pending_events.push(events::Event::FundingBroadcastSafe { funding_txo: funding_txo, @@ -2042,7 +2041,7 @@ impl ChannelManager { } fn internal_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) -> Result<(msgs::RevokeAndACK, Option), MsgHandleErrInternal> { - let (revoke_and_ack, commitment_signed, chan_monitor) = { + let (revoke_and_ack, commitment_signed) = { let mut channel_state = self.channel_state.lock().unwrap(); match channel_state.by_id.get_mut(&msg.channel_id) { Some(chan) => { @@ -2050,20 +2049,20 @@ impl ChannelManager { //TODO: here and below MsgHandleErrInternal, #153 case return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id)); } - chan.commitment_signed(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))? + let (revoke_and_ack, commitment_signed, chan_monitor) = chan.commitment_signed(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?; + if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { + unimplemented!(); + } + (revoke_and_ack, commitment_signed) }, None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id)) } }; - if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { - unimplemented!(); - } - Ok((revoke_and_ack, commitment_signed)) } fn internal_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result, MsgHandleErrInternal> { - let ((res, mut pending_forwards, mut pending_failures, chan_monitor), short_channel_id) = { + let ((res, mut pending_forwards, mut pending_failures), short_channel_id) = { let mut channel_state = self.channel_state.lock().unwrap(); match channel_state.by_id.get_mut(&msg.channel_id) { Some(chan) => { @@ -2071,14 +2070,15 @@ impl ChannelManager { //TODO: here and below MsgHandleErrInternal, #153 case return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id)); } - (chan.revoke_and_ack(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?, chan.get_short_channel_id().expect("RAA should only work on a short-id-available channel")) + let (res, pending_forwards, pending_failures, chan_monitor) = chan.revoke_and_ack(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?; + if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { + unimplemented!(); + } + ((res, pending_forwards, pending_failures), chan.get_short_channel_id().expect("RAA should only work on a short-id-available channel")) }, None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id)) } }; - if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { - unimplemented!(); - } for failure in pending_failures.drain(..) { self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2); } @@ -2169,7 +2169,7 @@ impl ChannelManager { } fn internal_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(Option, Option, Option, RAACommitmentOrder), MsgHandleErrInternal> { - let (res, chan_monitor) = { + let res = { let mut channel_state = self.channel_state.lock().unwrap(); match channel_state.by_id.get_mut(&msg.channel_id) { Some(chan) => { @@ -2178,16 +2178,17 @@ impl ChannelManager { } let (funding_locked, revoke_and_ack, commitment_update, channel_monitor, order) = chan.channel_reestablish(msg) .map_err(|e| MsgHandleErrInternal::from_chan_maybe_close(e, msg.channel_id))?; - (Ok((funding_locked, revoke_and_ack, commitment_update, order)), channel_monitor) + if let Some(monitor) = channel_monitor { + if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) { + unimplemented!(); + } + } + Ok((funding_locked, revoke_and_ack, commitment_update, order)) }, None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id)) } }; - if let Some(monitor) = chan_monitor { - if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) { - unimplemented!(); - } - } + res } -- 2.39.5