Require DB insertions to complete in fifteen seconds
[rapid-gossip-sync-server] / src / persistence.rs
1 use std::fs::OpenOptions;
2 use std::io::{BufWriter, Write};
3 use std::sync::Arc;
4 use std::time::{Duration, Instant};
5 use lightning::routing::gossip::NetworkGraph;
6 use lightning::util::ser::Writeable;
7 use tokio::sync::mpsc;
8 use tokio_postgres::NoTls;
9
10 use crate::{config, TestLogger};
11 use crate::types::GossipMessage;
12
13 const POSTGRES_INSERT_TIMEOUT: Duration = Duration::from_secs(15);
14
15 pub(crate) struct GossipPersister {
16         gossip_persistence_receiver: mpsc::Receiver<GossipMessage>,
17         network_graph: Arc<NetworkGraph<TestLogger>>,
18 }
19
20 impl GossipPersister {
21         pub fn new(network_graph: Arc<NetworkGraph<TestLogger>>) -> (Self, mpsc::Sender<GossipMessage>) {
22                 let (gossip_persistence_sender, gossip_persistence_receiver) =
23                         mpsc::channel::<GossipMessage>(100);
24                 (GossipPersister {
25                         gossip_persistence_receiver,
26                         network_graph
27                 }, gossip_persistence_sender)
28         }
29
30         pub(crate) async fn persist_gossip(&mut self) {
31                 let connection_config = config::db_connection_config();
32                 let (mut client, connection) =
33                         connection_config.connect(NoTls).await.unwrap();
34
35                 tokio::spawn(async move {
36                         if let Err(e) = connection.await {
37                                 panic!("connection error: {}", e);
38                         }
39                 });
40
41                 {
42                         // initialize the database
43                         let initialization = client
44                                 .execute(config::db_config_table_creation_query(), &[])
45                                 .await;
46                         if let Err(initialization_error) = initialization {
47                                 panic!("db init error: {}", initialization_error);
48                         }
49
50                         let cur_schema = client.query("SELECT db_schema FROM config WHERE id = $1", &[&1]).await.unwrap();
51                         if !cur_schema.is_empty() {
52                                 config::upgrade_db(cur_schema[0].get(0), &mut client).await;
53                         }
54
55                         let initialization = client
56                                 .execute(
57                                         // TODO: figure out a way to fix the id value without Postgres complaining about
58                                         // its value not being default
59                                         "INSERT INTO config (id, db_schema) VALUES ($1, $2) ON CONFLICT (id) DO NOTHING",
60                                         &[&1, &config::SCHEMA_VERSION]
61                                 ).await;
62                         if let Err(initialization_error) = initialization {
63                                 panic!("db init error: {}", initialization_error);
64                         }
65
66                         let initialization = client
67                                 .execute(config::db_announcement_table_creation_query(), &[])
68                                 .await;
69                         if let Err(initialization_error) = initialization {
70                                 panic!("db init error: {}", initialization_error);
71                         }
72
73                         let initialization = client
74                                 .execute(
75                                         config::db_channel_update_table_creation_query(),
76                                         &[],
77                                 )
78                                 .await;
79                         if let Err(initialization_error) = initialization {
80                                 panic!("db init error: {}", initialization_error);
81                         }
82
83                         let initialization = client
84                                 .batch_execute(config::db_index_creation_query())
85                                 .await;
86                         if let Err(initialization_error) = initialization {
87                                 panic!("db init error: {}", initialization_error);
88                         }
89                 }
90
91                 // print log statement every minute
92                 let mut latest_persistence_log = Instant::now() - Duration::from_secs(60);
93                 let mut i = 0u32;
94                 let mut latest_graph_cache_time = Instant::now();
95                 // TODO: it would be nice to have some sort of timeout here so after 10 seconds of
96                 // inactivity, some sort of message could be broadcast signaling the activation of request
97                 // processing
98                 while let Some(gossip_message) = &self.gossip_persistence_receiver.recv().await {
99                         i += 1; // count the persisted gossip messages
100
101                         if latest_persistence_log.elapsed().as_secs() >= 60 {
102                                 println!("Persisting gossip message #{}", i);
103                                 latest_persistence_log = Instant::now();
104                         }
105
106                         // has it been ten minutes? Just cache it
107                         if latest_graph_cache_time.elapsed().as_secs() >= 600 {
108                                 self.persist_network_graph();
109                                 latest_graph_cache_time = Instant::now();
110                         }
111
112                         match &gossip_message {
113                                 GossipMessage::ChannelAnnouncement(announcement) => {
114                                         let scid = announcement.contents.short_channel_id as i64;
115
116                                         // start with the type prefix, which is already known a priori
117                                         let mut announcement_signed = Vec::new();
118                                         announcement.write(&mut announcement_signed).unwrap();
119
120                                         tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
121                                                 .execute("INSERT INTO channel_announcements (\
122                                                         short_channel_id, \
123                                                         announcement_signed \
124                                                 ) VALUES ($1, $2) ON CONFLICT (short_channel_id) DO NOTHING", &[
125                                                         &scid,
126                                                         &announcement_signed
127                                                 ])).await.unwrap().unwrap();
128                                 }
129                                 GossipMessage::ChannelUpdate(update) => {
130                                         let scid = update.contents.short_channel_id as i64;
131
132                                         let timestamp = update.contents.timestamp as i64;
133
134                                         let direction = (update.contents.flags & 1) == 1;
135                                         let disable = (update.contents.flags & 2) > 0;
136
137                                         let cltv_expiry_delta = update.contents.cltv_expiry_delta as i32;
138                                         let htlc_minimum_msat = update.contents.htlc_minimum_msat as i64;
139                                         let fee_base_msat = update.contents.fee_base_msat as i32;
140                                         let fee_proportional_millionths =
141                                                 update.contents.fee_proportional_millionths as i32;
142                                         let htlc_maximum_msat = update.contents.htlc_maximum_msat as i64;
143
144                                         // start with the type prefix, which is already known a priori
145                                         let mut update_signed = Vec::new();
146                                         update.write(&mut update_signed).unwrap();
147
148                                         tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
149                                                 .execute("INSERT INTO channel_updates (\
150                                                         short_channel_id, \
151                                                         timestamp, \
152                                                         channel_flags, \
153                                                         direction, \
154                                                         disable, \
155                                                         cltv_expiry_delta, \
156                                                         htlc_minimum_msat, \
157                                                         fee_base_msat, \
158                                                         fee_proportional_millionths, \
159                                                         htlc_maximum_msat, \
160                                                         blob_signed \
161                                                 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)  ON CONFLICT DO NOTHING", &[
162                                                         &scid,
163                                                         &timestamp,
164                                                         &(update.contents.flags as i16),
165                                                         &direction,
166                                                         &disable,
167                                                         &cltv_expiry_delta,
168                                                         &htlc_minimum_msat,
169                                                         &fee_base_msat,
170                                                         &fee_proportional_millionths,
171                                                         &htlc_maximum_msat,
172                                                         &update_signed
173                                                 ])).await.unwrap().unwrap();
174                                 }
175                         }
176                 }
177         }
178
179         fn persist_network_graph(&self) {
180                 println!("Caching network graph…");
181                 let cache_path = config::network_graph_cache_path();
182                 let file = OpenOptions::new()
183                         .create(true)
184                         .write(true)
185                         .truncate(true)
186                         .open(&cache_path)
187                         .unwrap();
188                 self.network_graph.remove_stale_channels_and_tracking();
189                 let mut writer = BufWriter::new(file);
190                 self.network_graph.write(&mut writer).unwrap();
191                 writer.flush().unwrap();
192                 println!("Cached network graph!");
193         }
194 }