/// We allow callers to either always notify by constructing with `notify_on_drop` or choose to
/// notify or not based on whether relevant changes have been made, providing a closure to
/// `optionally_notify` which returns a `NotifyOption`.
-struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> {
+struct PersistenceNotifierGuard<'a, F: FnMut() -> NotifyOption> {
event_persist_notifier: &'a Notifier,
needs_persist_flag: &'a AtomicBool,
should_persist: F,
/// This must always be called if the changes included a `ChannelMonitorUpdate`, as well as in
/// other cases where losing the changes on restart may result in a force-close or otherwise
/// isn't ideal.
- fn notify_on_drop<C: AChannelManager>(cm: &'a C) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> {
+ fn notify_on_drop<C: AChannelManager>(cm: &'a C) -> PersistenceNotifierGuard<'a, impl FnMut() -> NotifyOption> {
Self::optionally_notify(cm, || -> NotifyOption { NotifyOption::DoPersist })
}
- fn optionally_notify<F: Fn() -> NotifyOption, C: AChannelManager>(cm: &'a C, persist_check: F)
- -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> {
+ fn optionally_notify<F: FnMut() -> NotifyOption, C: AChannelManager>(cm: &'a C, mut persist_check: F)
+ -> PersistenceNotifierGuard<'a, impl FnMut() -> NotifyOption> {
let read_guard = cm.get_cm().total_consistency_lock.read().unwrap();
let force_notify = cm.get_cm().process_background_events();
}
}
-impl<'a, F: Fn() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> {
+impl<'a, F: FnMut() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> {
fn drop(&mut self) {
match (self.should_persist)() {
NotifyOption::DoPersist => {
return Err(());
}
- let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
+ let mut res = Ok(());
- // If we have too many peers connected which don't have funded channels, disconnect the
- // peer immediately (as long as it doesn't have funded channels). If we have a bunch of
- // unfunded channels taking up space in memory for disconnected peers, we still let new
- // peers connect, but we'll reject new channels from them.
- let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected);
- let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS;
+ PersistenceNotifierGuard::optionally_notify(self, || {
+ // If we have too many peers connected which don't have funded channels, disconnect the
+ // peer immediately (as long as it doesn't have funded channels). If we have a bunch of
+ // unfunded channels taking up space in memory for disconnected peers, we still let new
+ // peers connect, but we'll reject new channels from them.
+ let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected);
+ let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS;
- {
- let mut peer_state_lock = self.per_peer_state.write().unwrap();
- match peer_state_lock.entry(counterparty_node_id.clone()) {
- hash_map::Entry::Vacant(e) => {
- if inbound_peer_limited {
- return Err(());
- }
- e.insert(Mutex::new(PeerState {
- channel_by_id: HashMap::new(),
- inbound_channel_request_by_id: HashMap::new(),
- latest_features: init_msg.features.clone(),
- pending_msg_events: Vec::new(),
- in_flight_monitor_updates: BTreeMap::new(),
- monitor_update_blocked_actions: BTreeMap::new(),
- actions_blocking_raa_monitor_updates: BTreeMap::new(),
- is_connected: true,
- }));
- },
- hash_map::Entry::Occupied(e) => {
- let mut peer_state = e.get().lock().unwrap();
- peer_state.latest_features = init_msg.features.clone();
-
- let best_block_height = self.best_block.read().unwrap().height();
- if inbound_peer_limited &&
- Self::unfunded_channel_count(&*peer_state, best_block_height) ==
- peer_state.channel_by_id.len()
- {
- return Err(());
- }
+ {
+ let mut peer_state_lock = self.per_peer_state.write().unwrap();
+ match peer_state_lock.entry(counterparty_node_id.clone()) {
+ hash_map::Entry::Vacant(e) => {
+ if inbound_peer_limited {
+ res = Err(());
+ return NotifyOption::SkipPersistNoEvents;
+ }
+ e.insert(Mutex::new(PeerState {
+ channel_by_id: HashMap::new(),
+ inbound_channel_request_by_id: HashMap::new(),
+ latest_features: init_msg.features.clone(),
+ pending_msg_events: Vec::new(),
+ in_flight_monitor_updates: BTreeMap::new(),
+ monitor_update_blocked_actions: BTreeMap::new(),
+ actions_blocking_raa_monitor_updates: BTreeMap::new(),
+ is_connected: true,
+ }));
+ },
+ hash_map::Entry::Occupied(e) => {
+ let mut peer_state = e.get().lock().unwrap();
+ peer_state.latest_features = init_msg.features.clone();
+
+ let best_block_height = self.best_block.read().unwrap().height();
+ if inbound_peer_limited &&
+ Self::unfunded_channel_count(&*peer_state, best_block_height) ==
+ peer_state.channel_by_id.len()
+ {
+ res = Err(());
+ return NotifyOption::SkipPersistNoEvents;
+ }
- debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice");
- peer_state.is_connected = true;
- },
+ debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice");
+ peer_state.is_connected = true;
+ },
+ }
}
- }
- log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id));
+ log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id));
- let per_peer_state = self.per_peer_state.read().unwrap();
- if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) {
- let mut peer_state_lock = peer_state_mutex.lock().unwrap();
- let peer_state = &mut *peer_state_lock;
- let pending_msg_events = &mut peer_state.pending_msg_events;
+ let per_peer_state = self.per_peer_state.read().unwrap();
+ if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) {
+ let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+ let peer_state = &mut *peer_state_lock;
+ let pending_msg_events = &mut peer_state.pending_msg_events;
- peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)|
- if let ChannelPhase::Funded(chan) = phase { Some(chan) } else {
- // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted
- // (so won't be recovered after a crash), they shouldn't exist here and we would never need to
- // worry about closing and removing them.
- debug_assert!(false);
- None
- }
- ).for_each(|chan| {
- pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
- node_id: chan.context.get_counterparty_node_id(),
- msg: chan.get_channel_reestablish(&self.logger),
+ peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)|
+ if let ChannelPhase::Funded(chan) = phase { Some(chan) } else {
+ // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted
+ // (so won't be recovered after a crash), they shouldn't exist here and we would never need to
+ // worry about closing and removing them.
+ debug_assert!(false);
+ None
+ }
+ ).for_each(|chan| {
+ pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
+ node_id: chan.context.get_counterparty_node_id(),
+ msg: chan.get_channel_reestablish(&self.logger),
+ });
});
- });
- }
- //TODO: Also re-broadcast announcement_signatures
- Ok(())
+ }
+
+ return NotifyOption::SkipPersistHandleEvents;
+ //TODO: Also re-broadcast announcement_signatures
+ });
+ res
}
fn handle_error(&self, counterparty_node_id: &PublicKey, msg: &msgs::ErrorMessage) {