use tokio::sync::mpsc;
use tokio_postgres::NoTls;
-use crate::{config, hex_utils, TestLogger};
+use crate::{config, TestLogger};
use crate::types::GossipMessage;
+const POSTGRES_INSERT_TIMEOUT: Duration = Duration::from_secs(15);
+
pub(crate) struct GossipPersister {
gossip_persistence_receiver: mpsc::Receiver<GossipMessage>,
network_graph: Arc<NetworkGraph<TestLogger>>,
match &gossip_message {
GossipMessage::ChannelAnnouncement(announcement) => {
- let scid = announcement.contents.short_channel_id;
- let scid_hex = hex_utils::hex_str(&scid.to_be_bytes());
- // scid is 8 bytes
- // block height is the first three bytes
- // to obtain block height, shift scid right by 5 bytes (40 bits)
- let block_height = (scid >> 5 * 8) as i32;
- let chain_hash = announcement.contents.chain_hash.as_ref();
- let chain_hash_hex = hex_utils::hex_str(chain_hash);
+ let scid = announcement.contents.short_channel_id as i64;
// start with the type prefix, which is already known a priori
let mut announcement_signed = Vec::new();
announcement.write(&mut announcement_signed).unwrap();
- let result = client
+ tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute("INSERT INTO channel_announcements (\
short_channel_id, \
- block_height, \
- chain_hash, \
announcement_signed \
- ) VALUES ($1, $2, $3, $4) ON CONFLICT (short_channel_id) DO NOTHING", &[
- &scid_hex,
- &block_height,
- &chain_hash_hex,
+ ) VALUES ($1, $2) ON CONFLICT (short_channel_id) DO NOTHING", &[
+ &scid,
&announcement_signed
- ]).await;
- if result.is_err() {
- panic!("error: {}", result.err().unwrap());
- }
+ ])).await.unwrap().unwrap();
}
GossipMessage::ChannelUpdate(update) => {
- let scid = update.contents.short_channel_id;
- let scid_hex = hex_utils::hex_str(&scid.to_be_bytes());
-
- let chain_hash = update.contents.chain_hash.as_ref();
- let chain_hash_hex = hex_utils::hex_str(chain_hash);
+ let scid = update.contents.short_channel_id as i64;
let timestamp = update.contents.timestamp as i64;
- let channel_flags = update.contents.flags as i32;
- let direction = channel_flags & 1;
- let disable = (channel_flags & 2) > 0;
-
- let composite_index = format!("{}:{}:{}", scid_hex, timestamp, direction);
+ let direction = (update.contents.flags & 1) == 1;
+ let disable = (update.contents.flags & 2) > 0;
let cltv_expiry_delta = update.contents.cltv_expiry_delta as i32;
let htlc_minimum_msat = update.contents.htlc_minimum_msat as i64;
let mut update_signed = Vec::new();
update.write(&mut update_signed).unwrap();
- let result = client
+ tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute("INSERT INTO channel_updates (\
- composite_index, \
- chain_hash, \
short_channel_id, \
timestamp, \
channel_flags, \
fee_proportional_millionths, \
htlc_maximum_msat, \
blob_signed \
- ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) ON CONFLICT (composite_index) DO NOTHING", &[
- &composite_index,
- &chain_hash_hex,
- &scid_hex,
+ ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING", &[
+ &scid,
×tamp,
- &channel_flags,
+ &(update.contents.flags as i16),
&direction,
&disable,
&cltv_expiry_delta,
&fee_proportional_millionths,
&htlc_maximum_msat,
&update_signed
- ]).await;
- if result.is_err() {
- panic!("error: {}", result.err().unwrap());
- }
+ ])).await.unwrap().unwrap();
}
}
}
.truncate(true)
.open(&cache_path)
.unwrap();
- self.network_graph.remove_stale_channels();
+ self.network_graph.remove_stale_channels_and_tracking();
let mut writer = BufWriter::new(file);
self.network_graph.write(&mut writer).unwrap();
writer.flush().unwrap();