use lightning_block_sync::http::HttpEndpoint;
use tokio_postgres::Config;
-pub(crate) const SCHEMA_VERSION: i32 = 14;
+pub(crate) const SCHEMA_VERSION: i32 = 15;
pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3; // three hours
pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks
// generate symlinks based on a 3-hour-granularity
tx.execute("UPDATE config SET db_schema = 14 WHERE id = 1", &[]).await.unwrap();
tx.commit().await.unwrap();
}
+ if schema >= 1 && schema <= 14 {
+ client.execute("ALTER TABLE channel_announcements ADD COLUMN funding_amount_sats bigint DEFAULT null", &[]).await.unwrap();
+ }
if schema <= 1 || schema > SCHEMA_VERSION {
panic!("Unknown schema in db: {}, we support up to {}", schema, SCHEMA_VERSION);
}
counter.channel_announcements += 1;
}
- let gossip_message = GossipMessage::ChannelAnnouncement(msg, None);
+ let mut funding_amount_sats = self.verifier.get_cached_funding_value(msg.contents.short_channel_id);
+ if funding_amount_sats.is_none() {
+ tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async {
+ funding_amount_sats = self.verifier.retrieve_funding_value(msg.contents.short_channel_id).await.ok();
+ })});
+ }
+ let funding_amount_sats = funding_amount_sats
+ .expect("If we've accepted a ChannelAnnouncement, we must be able to fetch the TXO for it");
+
+ let gossip_message = GossipMessage::ChannelAnnouncement(msg, funding_amount_sats, None);
if let Err(err) = self.sender.try_send(gossip_message) {
let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
#[cfg(test)]
tasks_spawned.push(_task);
},
- GossipMessage::ChannelAnnouncement(announcement, seen_override) => {
+ GossipMessage::ChannelAnnouncement(announcement, funding_value, seen_override) => {
let scid = announcement.contents.short_channel_id as i64;
// start with the type prefix, which is already known a priori
.execute("INSERT INTO channel_announcements (\
short_channel_id, \
announcement_signed, \
+ funding_amount_sats, \
seen \
- ) VALUES ($1, $2, TO_TIMESTAMP($3)) ON CONFLICT (short_channel_id) DO NOTHING", &[
+ ) VALUES ($1, $2, $3, TO_TIMESTAMP($3)) ON CONFLICT (short_channel_id) DO NOTHING", &[
&scid,
&announcement_signed,
+ &(funding_value as i64),
&(seen_override.unwrap() as f64)
])).await.unwrap().unwrap();
} else {
tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute("INSERT INTO channel_announcements (\
short_channel_id, \
+ funding_amount_sats, \
announcement_signed \
- ) VALUES ($1, $2) ON CONFLICT (short_channel_id) DO NOTHING", &[
+ ) VALUES ($1, $2, $3) ON CONFLICT (short_channel_id) DO NOTHING", &[
&scid,
+ &(funding_value as i64),
&announcement_signed
])).await.unwrap().unwrap();
}
pub(crate) enum GossipMessage {
NodeAnnouncement(NodeAnnouncement, Option<u32>),
// the second element is an optional override for the seen value
- ChannelAnnouncement(ChannelAnnouncement, Option<u32>),
+ ChannelAnnouncement(ChannelAnnouncement, u64, Option<u32>),
ChannelUpdate(ChannelUpdate, Option<u32>),
}
+use std::collections::HashMap;
use std::io::ErrorKind;
use std::ops::Deref;
use std::sync::Arc;
graph: Arc<NetworkGraph<L>>,
outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Arc<Self>, L>>,
peer_handler: Mutex<Option<GossipPeerManager<L>>>,
+ scid_value: Arc<Mutex<HashMap<u64, u64>>>,
logger: L
}
outbound_gossiper,
graph,
peer_handler: Mutex::new(None),
- logger
+ scid_value: Arc::new(Mutex::new(HashMap::new())),
+ logger,
}
}
pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager<L>) {
*self.peer_handler.lock().unwrap() = Some(peer_handler);
}
- async fn retrieve_utxo(client: Arc<RestClient>, short_channel_id: u64, logger: L) -> Result<TxOut, UtxoLookupError> {
+ pub(crate) fn get_cached_funding_value(&self, scid: u64) -> Option<u64> {
+ self.scid_value.lock().unwrap().get(&scid).map(|v| *v)
+ }
+
+ pub(crate) async fn retrieve_funding_value(&self, scid: u64) -> Result<u64, UtxoLookupError> {
+ Self::retrieve_cache_txo(Arc::clone(&self.rest_client), Some(Arc::clone(&self.scid_value)), scid, self.logger.clone())
+ .await.map(|txo| txo.value.to_sat())
+ }
+
+ async fn retrieve_cache_txo(client: Arc<RestClient>, scid_value: Option<Arc<Mutex<HashMap<u64, u64>>>>, short_channel_id: u64, logger: L) -> Result<TxOut, UtxoLookupError> {
let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes
let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32;
let output_index = (short_channel_id & 0xffff) as u16;
log_error!(logger, "Could't find output {} in transaction {}", output_index, transaction.compute_txid());
return Err(UtxoLookupError::UnknownTx);
}
- Ok(transaction.output.swap_remove(output_index as usize))
+ let txo = transaction.output.swap_remove(output_index as usize);
+ if let Some(scid_value) = scid_value {
+ scid_value.lock().unwrap().insert(short_channel_id, txo.value.to_sat());
+ }
+ Ok(txo)
}
async fn retrieve_block(client: Arc<RestClient>, block_height: u32, logger: L) -> Result<Block, UtxoLookupError> {
let graph_ref = Arc::clone(&self.graph);
let client_ref = Arc::clone(&self.rest_client);
let gossip_ref = Arc::clone(&self.outbound_gossiper);
+ let scid_value_cache_ref = Arc::clone(&self.scid_value);
let pm_ref = self.peer_handler.lock().unwrap().clone();
let logger_ref = self.logger.clone();
tokio::spawn(async move {
- let res = Self::retrieve_utxo(client_ref, short_channel_id, logger_ref).await;
+ let res = Self::retrieve_cache_txo(client_ref, Some(scid_value_cache_ref), short_channel_id, logger_ref).await;
fut.resolve(&*graph_ref, &*gossip_ref, res);
if let Some(pm) = pm_ref { pm.process_events(); }
});