pub fn resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
where L::Target: Logger {
let announcement = {
+ let mut pending_checks = graph.pending_checks.internal.lock().unwrap();
let mut async_messages = self.state.lock().unwrap();
if async_messages.channel_announce.is_none() {
async_messages.complete = Some(result);
return;
}
+ let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() {
+ ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents,
+ ChannelAnnouncement::Unsigned(msg) => &msg,
+ };
+
+ pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state));
async_messages.channel_announce.take().unwrap()
};
}
}
+struct PendingChecksContext {
+ channels: HashMap<u64, Weak<Mutex<UtxoMessages>>>,
+}
+
+impl PendingChecksContext {
+ fn lookup_completed(&mut self,
+ msg: &msgs::UnsignedChannelAnnouncement, completed_state: &Weak<Mutex<UtxoMessages>>
+ ) {
+ if let hash_map::Entry::Occupied(e) = self.channels.entry(msg.short_channel_id) {
+ if Weak::ptr_eq(e.get(), &completed_state) {
+ e.remove();
+ }
+ }
+ }
+}
+
/// A set of messages which are pending UTXO lookups for processing.
pub(super) struct PendingChecks {
+ internal: Mutex<PendingChecksContext>,
}
impl PendingChecks {
pub(super) fn new() -> Self {
- PendingChecks {}
+ PendingChecks { internal: Mutex::new(PendingChecksContext {
+ channels: HashMap::new(),
+ }) }
+ }
+
+ fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement,
+ full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option<Weak<Mutex<UtxoMessages>>>,
+ pending_channels: &mut HashMap<u64, Weak<Mutex<UtxoMessages>>>
+ ) -> Result<(), msgs::LightningError> {
+ match pending_channels.entry(msg.short_channel_id) {
+ hash_map::Entry::Occupied(mut e) => {
+ // There's already a pending lookup for the given SCID. Check if the messages
+ // are the same and, if so, return immediately (don't bother spawning another
+ // lookup if we haven't gotten that far yet).
+ match Weak::upgrade(&e.get()) {
+ Some(pending_msgs) => {
+ let pending_matches = match &pending_msgs.lock().unwrap().channel_announce {
+ Some(ChannelAnnouncement::Full(pending_msg)) => Some(pending_msg) == full_msg,
+ Some(ChannelAnnouncement::Unsigned(pending_msg)) => pending_msg == msg,
+ None => {
+ // This shouldn't actually be reachable. We set the
+ // `channel_announce` field under the same lock as setting the
+ // channel map entry. Still, we can just treat it as
+ // non-matching and let the new request fly.
+ debug_assert!(false);
+ false
+ },
+ };
+ if pending_matches {
+ return Err(LightningError {
+ err: "Channel announcement is already being checked".to_owned(),
+ action: ErrorAction::IgnoreDuplicateGossip,
+ });
+ } else {
+ // The earlier lookup is a different message. If we have another
+ // request in-flight now replace the original.
+ // Note that in the replace case whether to replace is somewhat
+ // arbitrary - both results will be handled, we're just updating the
+ // value that will be compared to future lookups with the same SCID.
+ if let Some(item) = replacement {
+ *e.get_mut() = item;
+ }
+ }
+ },
+ None => {
+ // The earlier lookup already resolved. We can't be sure its the same
+ // so just remove/replace it and move on.
+ if let Some(item) = replacement {
+ *e.get_mut() = item;
+ } else { e.remove(); }
+ },
+ }
+ },
+ hash_map::Entry::Vacant(v) => {
+ if let Some(item) = replacement { v.insert(item); }
+ },
+ }
+ Ok(())
}
pub(super) fn check_channel_announcement<U: Deref>(&self,
}
};
+ Self::check_replace_previous_entry(msg, full_msg, None,
+ &mut self.internal.lock().unwrap().channels)?;
+
match utxo_lookup {
&None => {
// Tentatively accept, potentially exposing us to DoS attacks
match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) {
UtxoResult::Sync(res) => handle_result(res),
UtxoResult::Async(future) => {
+ let mut pending_checks = self.internal.lock().unwrap();
let mut async_messages = future.state.lock().unwrap();
if let Some(res) = async_messages.complete.take() {
// In the unlikely event the future resolved before we managed to get it,
// handle the result in-line.
handle_result(res)
} else {
+ Self::check_replace_previous_entry(msg, full_msg,
+ Some(Arc::downgrade(&future.state)), &mut pending_checks.channels)?;
async_messages.channel_announce = Some(
if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) }
else { ChannelAnnouncement::Unsigned(msg.clone()) });