CMH: 'static + Deref + Send + Sync,
RMH: 'static + Deref + Send + Sync,
OMH: 'static + Deref + Send + Sync,
- EH: 'static + EventHandler + Send,
+ EventHandlerFuture: core::future::Future<Output = ()>,
+ EventHandler: Fn(Event) -> EventHandlerFuture,
PS: 'static + Deref + Send,
- M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
+ M: 'static + Deref<Target = ChainMonitor<<K::Target as KeysInterface>::Signer, CF, T, F, L, P>> + Send + Sync,
CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
OMH::Target: 'static + OnionMessageHandler,
RMH::Target: 'static + RoutingMessageHandler,
UMH::Target: 'static + CustomMessageHandler,
- PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
+ PS::Target: 'static + Persister<'a, CW, T, K, F, L, SC>,
{
- let mut should_continue = true;
- define_run_body!(persister, event_handler, chain_monitor, channel_manager,
- gossip_sync, peer_manager, logger, scorer, should_continue, {
+ let mut should_break = true;
+ let async_event_handler = |event| {
+ let network_graph = gossip_sync.network_graph();
+ let event_handler = &event_handler;
+ async move {
+ if let Some(network_graph) = network_graph {
+ handle_network_graph_update(network_graph, &event)
+ }
+ event_handler(event).await;
+ }
+ };
+ define_run_body!(persister,
+ chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
+ channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
+ gossip_sync, peer_manager, logger, scorer, should_break, {
select_biased! {
_ = channel_manager.get_persistable_update_future().fuse() => true,
- cont = sleeper(Duration::from_millis(100)).fuse() => {
- should_continue = cont;
+ exit = sleeper(Duration::from_millis(100)).fuse() => {
+ should_break = exit;
false
}
}