use std::fs::OpenOptions;
use std::io::{BufWriter, Write};
use std::ops::Deref;
-use std::mem;
use std::sync::Arc;
use std::time::{Duration, Instant};
use lightning::log_info;
}
pub(crate) async fn persist_gossip(&mut self) {
- let mut client = crate::connect_to_db().await;
+ { // initialize the database
+ // this client instance is only used once
+ let mut client = crate::connect_to_db().await;
- {
- // initialize the database
let initialization = client
.execute(config::db_config_table_creation_query(), &[])
.await;
panic!("db init error: {}", initialization_error);
}
- let initialization = client
- .execute(config::db_announcement_table_creation_query(), &[])
- .await;
- if let Err(initialization_error) = initialization {
- panic!("db init error: {}", initialization_error);
- }
+ let table_creation_queries = [
+ config::db_announcement_table_creation_query(),
+ config::db_channel_update_table_creation_query(),
+ config::db_channel_update_table_creation_query(),
+ config::db_node_announcement_table_creation_query()
+ ];
- let initialization = client
- .execute(
- config::db_channel_update_table_creation_query(),
- &[],
- )
- .await;
- if let Err(initialization_error) = initialization {
- panic!("db init error: {}", initialization_error);
+ for current_table_creation_query in table_creation_queries {
+ let initialization = client
+ .execute(current_table_creation_query, &[])
+ .await;
+ if let Err(initialization_error) = initialization {
+ panic!("db init error: {}", initialization_error);
+ }
}
let initialization = client
insert_limiter.acquire().await.unwrap().forget();
let limiter_ref = Arc::clone(&insert_limiter);
+ let client = {
+ let mut connections_set = connections_cache.lock().await;
+ let client = if connections_set.is_empty() {
+ crate::connect_to_db().await
+ } else {
+ connections_set.pop().unwrap()
+ };
+ client
+ };
+
let connections_cache_ref = Arc::clone(&connections_cache);
match gossip_message {
+ GossipMessage::NodeAnnouncement(announcement, seen_override) => {
+ let public_key_hex = announcement.contents.node_id.to_string();
+
+ let mut announcement_signed = Vec::new();
+ announcement.write(&mut announcement_signed).unwrap();
+
+ let features = announcement.contents.features.encode();
+ let timestamp = announcement.contents.timestamp as i64;
+
+ let mut serialized_addresses = Vec::new();
+ announcement.contents.addresses.write(&mut serialized_addresses).unwrap();
+
+ let _task = self.tokio_runtime.spawn(async move {
+ if cfg!(test) && seen_override.is_some() {
+ tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
+ .execute("INSERT INTO node_announcements (\
+ public_key, \
+ features, \
+ socket_addresses, \
+ timestamp, \
+ announcement_signed, \
+ seen \
+ ) VALUES ($1, $2, $3, $4, $5, TO_TIMESTAMP($6))", &[
+ &public_key_hex,
+ &features,
+ &serialized_addresses,
+ ×tamp,
+ &announcement_signed,
+ &(seen_override.unwrap() as f64)
+ ])).await.unwrap().unwrap();
+ } else {
+ tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
+ .execute("INSERT INTO node_announcements (\
+ public_key, \
+ features, \
+ socket_addresses, \
+ timestamp, \
+ announcement_signed \
+ ) VALUES ($1, $2, $3, $4, $5)", &[
+ &public_key_hex,
+ &features,
+ &serialized_addresses,
+ ×tamp,
+ &announcement_signed,
+ ])).await.unwrap().unwrap();
+ }
+ let mut connections_set = connections_cache_ref.lock().await;
+ connections_set.push(client);
+ limiter_ref.add_permits(1);
+ });
+ #[cfg(test)]
+ tasks_spawned.push(_task);
+ },
GossipMessage::ChannelAnnouncement(announcement, seen_override) => {
let scid = announcement.contents.short_channel_id as i64;
announcement.write(&mut announcement_signed).unwrap();
let _task = self.tokio_runtime.spawn(async move {
- let client;
- {
- let mut connections_set = connections_cache_ref.lock().await;
- if connections_set.is_empty() {
- mem::drop(connections_set);
- client = crate::connect_to_db().await;
- } else {
- client = connections_set.pop().unwrap();
- }
- }
if cfg!(test) && seen_override.is_some() {
tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute("INSERT INTO channel_announcements (\
let _seen_timestamp = seen_override.unwrap_or(timestamp as u32) as f64;
let _task = self.tokio_runtime.spawn(async move {
- let client;
- {
- let mut connections_set = connections_cache_ref.lock().await;
- if connections_set.is_empty() {
- mem::drop(connections_set);
- client = crate::connect_to_db().await;
- } else {
- client = connections_set.pop().unwrap();
- }
- }
tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute(insertion_statement, &[
&scid,
}
#[cfg(test)]
for task in tasks_spawned {
- task.await;
+ task.await.unwrap();
}
}