use std::sync::{Arc, RwLock};
-use std::time::{SystemTime, UNIX_EPOCH};
use bitcoin::secp256k1::PublicKey;
use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::util::events::{MessageSendEvent, MessageSendEventsProvider};
use tokio::sync::mpsc;
+use tokio::sync::mpsc::error::TrySendError;
use crate::{GossipChainAccess, TestLogger};
-use crate::types::{DetectedGossipMessage, GossipMessage};
+use crate::types::GossipMessage;
pub(crate) struct GossipCounter {
pub(crate) channel_announcements: u64,
pub(crate) struct GossipRouter {
pub(crate) native_router: Arc<P2PGossipSync<Arc<NetworkGraph<Arc<TestLogger>>>, GossipChainAccess, Arc<TestLogger>>>,
pub(crate) counter: RwLock<GossipCounter>,
- pub(crate) sender: mpsc::Sender<DetectedGossipMessage>,
+ pub(crate) sender: mpsc::Sender<GossipMessage>,
}
impl MessageSendEventsProvider for GossipRouter {
}
fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
- let timestamp_seen = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
-
let mut counter = self.counter.write().unwrap();
let output_value = self.native_router.handle_channel_announcement(msg).map_err(|error| {
counter.channel_announcements += 1;
let gossip_message = GossipMessage::ChannelAnnouncement(msg.clone());
- let detected_gossip_message = DetectedGossipMessage {
- message: gossip_message,
- timestamp_seen: timestamp_seen as u32,
- };
- let sender = self.sender.clone();
- tokio::spawn(async move {
- let _ = sender.send(detected_gossip_message).await;
- });
+ if let Err(err) = self.sender.try_send(gossip_message) {
+ let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
+ tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
+ self.sender.send(gossip_message).await.unwrap();
+ })});
+ }
Ok(output_value)
}
fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
- let timestamp_seen = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
let output_value = self.native_router.handle_channel_update(msg)?;
let mut counter = self.counter.write().unwrap();
counter.channel_updates += 1;
let gossip_message = GossipMessage::ChannelUpdate(msg.clone());
- let detected_gossip_message = DetectedGossipMessage {
- message: gossip_message,
- timestamp_seen: timestamp_seen as u32,
- };
- let sender = self.sender.clone();
- tokio::spawn(async move {
- let _ = sender.send(detected_gossip_message).await;
- });
+
+ if let Err(err) = self.sender.try_send(gossip_message) {
+ let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
+ tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
+ self.sender.send(gossip_message).await.unwrap();
+ })});
+ }
Ok(output_value)
}
use tokio_postgres::NoTls;
use crate::{config, hex_utils, TestLogger};
-use crate::types::{DetectedGossipMessage, GossipMessage};
+use crate::types::GossipMessage;
pub(crate) struct GossipPersister {
- pub(crate) gossip_persistence_sender: mpsc::Sender<DetectedGossipMessage>,
- gossip_persistence_receiver: mpsc::Receiver<DetectedGossipMessage>,
+ pub(crate) gossip_persistence_sender: mpsc::Sender<GossipMessage>,
+ gossip_persistence_receiver: mpsc::Receiver<GossipMessage>,
server_sync_completion_sender: mpsc::Sender<()>,
network_graph: Arc<NetworkGraph<Arc<TestLogger>>>,
}
impl GossipPersister {
pub fn new(server_sync_completion_sender: mpsc::Sender<()>, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) -> Self {
let (gossip_persistence_sender, gossip_persistence_receiver) =
- mpsc::channel::<DetectedGossipMessage>(10000);
+ mpsc::channel::<GossipMessage>(100);
GossipPersister {
gossip_persistence_sender,
gossip_persistence_receiver,
// TODO: it would be nice to have some sort of timeout here so after 10 seconds of
// inactivity, some sort of message could be broadcast signaling the activation of request
// processing
- while let Some(detected_gossip_message) = &self.gossip_persistence_receiver.recv().await {
+ while let Some(gossip_message) = &self.gossip_persistence_receiver.recv().await {
i += 1; // count the persisted gossip messages
if i == 1 || i % persistence_log_threshold == 0 {
latest_graph_cache_time = Some(Instant::now());
}
- match &detected_gossip_message.message {
+ match &gossip_message {
GossipMessage::InitialSyncComplete => {
// signal to the server that it may now serve dynamic responses and calculate
// snapshots
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
-use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
+use std::time::{Duration, Instant};
use bitcoin::hashes::hex::ToHex;
use bitcoin::secp256k1::{PublicKey, SecretKey};
use crate::{config, TestLogger};
use crate::downloader::{GossipCounter, GossipRouter};
-use crate::types::{DetectedGossipMessage, GossipChainAccess, GossipMessage, GossipPeerManager};
+use crate::types::{GossipChainAccess, GossipMessage, GossipPeerManager};
use crate::verifier::ChainVerifier;
-pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<DetectedGossipMessage>, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) {
+pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessage>, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) {
let mut key = [0; 32];
let mut random_data = [0; 32];
thread_rng().fill_bytes(&mut key);
let sleep = tokio::time::sleep(Duration::from_secs(5));
sleep.await;
- let current_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
let router_clone = Arc::clone(&local_router);
{
if needs_to_notify_persister {
needs_to_notify_persister = false;
- let sender = local_persistence_sender.clone();
- tokio::spawn(async move {
- let _ = sender.send(DetectedGossipMessage {
- timestamp_seen: current_timestamp as u32,
- message: GossipMessage::InitialSyncComplete,
- }).await;
- });
+ local_persistence_sender.send(GossipMessage::InitialSyncComplete).await.unwrap();
}
}
});
pub(crate) type GossipChainAccess = Arc<ChainVerifier>;
pub(crate) type GossipPeerManager = Arc<PeerManager<lightning_net_tokio::SocketDescriptor, Arc<ErroringMessageHandler>, Arc<GossipRouter>, Arc<TestLogger>, Arc<IgnoringMessageHandler>>>;
+#[derive(Debug)]
pub(crate) enum GossipMessage {
ChannelAnnouncement(ChannelAnnouncement),
ChannelUpdate(ChannelUpdate),
InitialSyncComplete,
}
-pub(crate) struct DetectedGossipMessage {
- pub(crate) timestamp_seen: u32,
- pub(crate) message: GossipMessage,
-}
-
pub(crate) struct TestLogger {}
impl TestLogger {