f638894dd3590c9d0c583452c7e03aba8275cb15
[rapid-gossip-sync-server] / src / persistence.rs
1 use std::fs::OpenOptions;
2 use std::io::{BufWriter, Write};
3 use std::ops::Deref;
4 use std::sync::Arc;
5 use std::time::{Duration, Instant};
6 use lightning::log_info;
7 use lightning::routing::gossip::NetworkGraph;
8 use lightning::util::logger::Logger;
9 use lightning::util::ser::Writeable;
10 use tokio::sync::mpsc;
11 use tokio_postgres::NoTls;
12
13 use crate::config;
14 use crate::types::GossipMessage;
15
16 const POSTGRES_INSERT_TIMEOUT: Duration = Duration::from_secs(15);
17
18 pub(crate) struct GossipPersister<L: Deref> where L::Target: Logger {
19         gossip_persistence_receiver: mpsc::Receiver<GossipMessage>,
20         network_graph: Arc<NetworkGraph<L>>,
21         logger: L
22 }
23
24 impl<L: Deref> GossipPersister<L> where L::Target: Logger {
25         pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> (Self, mpsc::Sender<GossipMessage>) {
26                 let (gossip_persistence_sender, gossip_persistence_receiver) =
27                         mpsc::channel::<GossipMessage>(100);
28                 (GossipPersister {
29                         gossip_persistence_receiver,
30                         network_graph,
31                         logger
32                 }, gossip_persistence_sender)
33         }
34
35         pub(crate) async fn persist_gossip(&mut self) {
36                 let connection_config = config::db_connection_config();
37                 let (mut client, connection) =
38                         connection_config.connect(NoTls).await.unwrap();
39
40                 tokio::spawn(async move {
41                         if let Err(e) = connection.await {
42                                 panic!("connection error: {}", e);
43                         }
44                 });
45
46                 {
47                         // initialize the database
48                         let initialization = client
49                                 .execute(config::db_config_table_creation_query(), &[])
50                                 .await;
51                         if let Err(initialization_error) = initialization {
52                                 panic!("db init error: {}", initialization_error);
53                         }
54
55                         let cur_schema = client.query("SELECT db_schema FROM config WHERE id = $1", &[&1]).await.unwrap();
56                         if !cur_schema.is_empty() {
57                                 config::upgrade_db(cur_schema[0].get(0), &mut client).await;
58                         }
59
60                         let initialization = client
61                                 .execute(
62                                         // TODO: figure out a way to fix the id value without Postgres complaining about
63                                         // its value not being default
64                                         "INSERT INTO config (id, db_schema) VALUES ($1, $2) ON CONFLICT (id) DO NOTHING",
65                                         &[&1, &config::SCHEMA_VERSION]
66                                 ).await;
67                         if let Err(initialization_error) = initialization {
68                                 panic!("db init error: {}", initialization_error);
69                         }
70
71                         let initialization = client
72                                 .execute(config::db_announcement_table_creation_query(), &[])
73                                 .await;
74                         if let Err(initialization_error) = initialization {
75                                 panic!("db init error: {}", initialization_error);
76                         }
77
78                         let initialization = client
79                                 .execute(
80                                         config::db_channel_update_table_creation_query(),
81                                         &[],
82                                 )
83                                 .await;
84                         if let Err(initialization_error) = initialization {
85                                 panic!("db init error: {}", initialization_error);
86                         }
87
88                         let initialization = client
89                                 .batch_execute(config::db_index_creation_query())
90                                 .await;
91                         if let Err(initialization_error) = initialization {
92                                 panic!("db init error: {}", initialization_error);
93                         }
94                 }
95
96                 // print log statement every minute
97                 let mut latest_persistence_log = Instant::now() - Duration::from_secs(60);
98                 let mut i = 0u32;
99                 let mut latest_graph_cache_time = Instant::now();
100                 // TODO: it would be nice to have some sort of timeout here so after 10 seconds of
101                 // inactivity, some sort of message could be broadcast signaling the activation of request
102                 // processing
103                 while let Some(gossip_message) = &self.gossip_persistence_receiver.recv().await {
104                         i += 1; // count the persisted gossip messages
105
106                         if latest_persistence_log.elapsed().as_secs() >= 60 {
107                                 log_info!(self.logger, "Persisting gossip message #{}", i);
108                                 latest_persistence_log = Instant::now();
109                         }
110
111                         // has it been ten minutes? Just cache it
112                         if latest_graph_cache_time.elapsed().as_secs() >= 600 {
113                                 self.persist_network_graph();
114                                 latest_graph_cache_time = Instant::now();
115                         }
116
117                         match &gossip_message {
118                                 GossipMessage::ChannelAnnouncement(announcement) => {
119                                         let scid = announcement.contents.short_channel_id as i64;
120
121                                         // start with the type prefix, which is already known a priori
122                                         let mut announcement_signed = Vec::new();
123                                         announcement.write(&mut announcement_signed).unwrap();
124
125                                         tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
126                                                 .execute("INSERT INTO channel_announcements (\
127                                                         short_channel_id, \
128                                                         announcement_signed \
129                                                 ) VALUES ($1, $2) ON CONFLICT (short_channel_id) DO NOTHING", &[
130                                                         &scid,
131                                                         &announcement_signed
132                                                 ])).await.unwrap().unwrap();
133                                 }
134                                 GossipMessage::ChannelUpdate(update) => {
135                                         let scid = update.contents.short_channel_id as i64;
136
137                                         let timestamp = update.contents.timestamp as i64;
138
139                                         let direction = (update.contents.flags & 1) == 1;
140                                         let disable = (update.contents.flags & 2) > 0;
141
142                                         let cltv_expiry_delta = update.contents.cltv_expiry_delta as i32;
143                                         let htlc_minimum_msat = update.contents.htlc_minimum_msat as i64;
144                                         let fee_base_msat = update.contents.fee_base_msat as i32;
145                                         let fee_proportional_millionths =
146                                                 update.contents.fee_proportional_millionths as i32;
147                                         let htlc_maximum_msat = update.contents.htlc_maximum_msat as i64;
148
149                                         // start with the type prefix, which is already known a priori
150                                         let mut update_signed = Vec::new();
151                                         update.write(&mut update_signed).unwrap();
152
153                                         tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
154                                                 .execute("INSERT INTO channel_updates (\
155                                                         short_channel_id, \
156                                                         timestamp, \
157                                                         channel_flags, \
158                                                         direction, \
159                                                         disable, \
160                                                         cltv_expiry_delta, \
161                                                         htlc_minimum_msat, \
162                                                         fee_base_msat, \
163                                                         fee_proportional_millionths, \
164                                                         htlc_maximum_msat, \
165                                                         blob_signed \
166                                                 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)  ON CONFLICT DO NOTHING", &[
167                                                         &scid,
168                                                         &timestamp,
169                                                         &(update.contents.flags as i16),
170                                                         &direction,
171                                                         &disable,
172                                                         &cltv_expiry_delta,
173                                                         &htlc_minimum_msat,
174                                                         &fee_base_msat,
175                                                         &fee_proportional_millionths,
176                                                         &htlc_maximum_msat,
177                                                         &update_signed
178                                                 ])).await.unwrap().unwrap();
179                                 }
180                         }
181                 }
182         }
183
184         fn persist_network_graph(&self) {
185                 log_info!(self.logger, "Caching network graph…");
186                 let cache_path = config::network_graph_cache_path();
187                 let file = OpenOptions::new()
188                         .create(true)
189                         .write(true)
190                         .truncate(true)
191                         .open(&cache_path)
192                         .unwrap();
193                 self.network_graph.remove_stale_channels_and_tracking();
194                 let mut writer = BufWriter::new(file);
195                 self.network_graph.write(&mut writer).unwrap();
196                 writer.flush().unwrap();
197                 log_info!(self.logger, "Cached network graph!");
198         }
199 }