panic!("db init error: {}", initialization_error);
}
- let initialization = client
- .execute(config::db_announcement_table_creation_query(), &[])
- .await;
- if let Err(initialization_error) = initialization {
- panic!("db init error: {}", initialization_error);
- }
+ let table_creation_queries = [
+ config::db_announcement_table_creation_query(),
+ config::db_channel_update_table_creation_query(),
+ config::db_channel_update_table_creation_query(),
+ config::db_node_announcement_table_creation_query()
+ ];
- let initialization = client
- .execute(
- config::db_channel_update_table_creation_query(),
- &[],
- )
- .await;
- if let Err(initialization_error) = initialization {
- panic!("db init error: {}", initialization_error);
+ for current_table_creation_query in table_creation_queries {
+ let initialization = client
+ .execute(current_table_creation_query, &[])
+ .await;
+ if let Err(initialization_error) = initialization {
+ panic!("db init error: {}", initialization_error);
+ }
}
let initialization = client
let connections_cache_ref = Arc::clone(&connections_cache);
match gossip_message {
+ GossipMessage::NodeAnnouncement(announcement, seen_override) => {
+ let public_key_hex = announcement.contents.node_id.to_string();
+
+ let mut announcement_signed = Vec::new();
+ announcement.write(&mut announcement_signed).unwrap();
+
+ let features = announcement.contents.features.encode();
+ let timestamp = announcement.contents.timestamp as i64;
+
+ let mut serialized_addresses = Vec::new();
+ announcement.contents.addresses.write(&mut serialized_addresses).unwrap();
+
+ let _task = self.tokio_runtime.spawn(async move {
+ if cfg!(test) && seen_override.is_some() {
+ tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
+ .execute("INSERT INTO node_announcements (\
+ public_key, \
+ features, \
+ socket_addresses, \
+ timestamp, \
+ announcement_signed, \
+ seen \
+ ) VALUES ($1, $2, $3, $4, $5, TO_TIMESTAMP($6))", &[
+ &public_key_hex,
+ &features,
+ &serialized_addresses,
+ ×tamp,
+ &announcement_signed,
+ &(seen_override.unwrap() as f64)
+ ])).await.unwrap().unwrap();
+ } else {
+ tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
+ .execute("INSERT INTO node_announcements (\
+ public_key, \
+ features, \
+ socket_addresses, \
+ timestamp, \
+ announcement_signed \
+ ) VALUES ($1, $2, $3, $4, $5)", &[
+ &public_key_hex,
+ &features,
+ &serialized_addresses,
+ ×tamp,
+ &announcement_signed,
+ ])).await.unwrap().unwrap();
+ }
+ let mut connections_set = connections_cache_ref.lock().await;
+ connections_set.push(client);
+ limiter_ref.add_permits(1);
+ });
+ #[cfg(test)]
+ tasks_spawned.push(_task);
+ },
GossipMessage::ChannelAnnouncement(announcement, seen_override) => {
let scid = announcement.contents.short_channel_id as i64;
}
#[cfg(test)]
for task in tasks_spawned {
- task.await;
+ task.await.unwrap();
}
}