--- /dev/null
+use serde_json;
+
+use serde_derive::Deserialize;
+
+use crate::utils::hex_to_uint256;
+use crate::{BlockHeaderData, BlockSource, BlockSourceRespErr};
+
+use bitcoin::hashes::hex::{ToHex, FromHex};
+use bitcoin::hash_types::{BlockHash, TxMerkleNode};
+
+use bitcoin::blockdata::block::{Block, BlockHeader};
+use bitcoin::consensus::encode;
+
+use std::convert::TryInto;
+use std::cmp;
+use std::future::Future;
+use std::pin::Pin;
+use std::net::ToSocketAddrs;
+use std::io::Write;
+use std::time::Duration;
+
+#[cfg(feature = "rpc-client")]
+use crate::utils::hex_to_vec;
+#[cfg(feature = "rpc-client")]
+use std::sync::atomic::{AtomicUsize, Ordering};
+#[cfg(feature = "rpc-client")]
+use base64;
+
+#[cfg(feature = "tokio")]
+use tokio::net::TcpStream;
+#[cfg(feature = "tokio")]
+use tokio::io::AsyncReadExt;
+
+#[cfg(not(feature = "tokio"))]
+use std::net::TcpStream;
+#[cfg(not(feature = "tokio"))]
+use std::io::Read;
+
+/// Splits an HTTP URI into its component parts - (is_ssl, hostname, port number, and HTTP path)
+fn split_uri<'a>(uri: &'a str) -> Option<(bool, &'a str, u16, &'a str)> {
+ let mut uri_iter = uri.splitn(2, ":");
+ let ssl = match uri_iter.next() {
+ Some("http") => false,
+ Some("https") => true,
+ _ => return None,
+ };
+ let mut host_path = match uri_iter.next() {
+ Some(r) => r,
+ None => return None,
+ };
+ host_path = host_path.trim_start_matches("/");
+ let mut host_path_iter = host_path.splitn(2, "/");
+ let (host_port_len, host, port) = match host_path_iter.next() {
+ Some(r) if !r.is_empty() => {
+ let is_v6_explicit = r.starts_with("[");
+ let mut iter = if is_v6_explicit {
+ r[1..].splitn(2, "]")
+ } else {
+ r.splitn(2, ":")
+ };
+ (r.len(), match iter.next() {
+ Some(host) => host,
+ None => return None,
+ }, match iter.next() {
+ Some(port) if !is_v6_explicit || !port.is_empty() => match if is_v6_explicit {
+ if port.as_bytes()[0] != ':' as u8 { return None; }
+ &port[1..]
+ } else { port }
+ .parse::<u16>() {
+ Ok(p) => p,
+ Err(_) => return None,
+ },
+ _ => if ssl { 443 } else { 80 },
+ })
+ },
+ _ => return None,
+ };
+ let path = &host_path[host_port_len..];
+
+ Some((ssl, host, port, path))
+}
+
+async fn read_http_resp(mut socket: TcpStream, max_resp: usize) -> Option<Vec<u8>> {
+ let mut resp = Vec::new();
+ let mut bytes_read = 0;
+ macro_rules! read_socket { () => { {
+ #[cfg(feature = "tokio")]
+ let res = socket.read(&mut resp[bytes_read..]).await;
+ #[cfg(not(feature = "tokio"))]
+ let res = socket.read(&mut resp[bytes_read..]);
+ match res {
+ Ok(0) => return None,
+ Ok(b) => b,
+ Err(_) => return None,
+ }
+ } } }
+
+ let mut actual_len = 0;
+ let mut ok_found = false;
+ let mut chunked = false;
+ // We expect the HTTP headers to fit in 8KB, and use resp as a temporary buffer for headers
+ // until we know our real length.
+ resp.extend_from_slice(&[0; 8192]);
+ 'read_headers: loop {
+ if bytes_read >= 8192 { return None; }
+ bytes_read += read_socket!();
+ for line in resp[..bytes_read].split(|c| *c == '\n' as u8 || *c == '\r' as u8) {
+ let content_header = b"Content-Length: ";
+ if line.len() > content_header.len() && line[..content_header.len()].eq_ignore_ascii_case(content_header) {
+ actual_len = match match std::str::from_utf8(&line[content_header.len()..]){
+ Ok(s) => s, Err(_) => return None,
+ }.parse() {
+ Ok(len) => len, Err(_) => return None,
+ };
+ }
+ let http_resp_1 = b"HTTP/1.1 200 ";
+ let http_resp_0 = b"HTTP/1.0 200 ";
+ if line.len() > http_resp_1.len() && (line[..http_resp_1.len()].eq_ignore_ascii_case(http_resp_1) ||
+ line[..http_resp_0.len()].eq_ignore_ascii_case(http_resp_0)) {
+ ok_found = true;
+ }
+ let transfer_encoding = b"Transfer-Encoding: ";
+ if line.len() > transfer_encoding.len() && line[..transfer_encoding.len()].eq_ignore_ascii_case(transfer_encoding) {
+ match &*String::from_utf8_lossy(&line[transfer_encoding.len()..]).to_ascii_lowercase() {
+ "chunked" => chunked = true,
+ _ => return None, // Unsupported
+ }
+ }
+ }
+ for (idx, window) in resp[..bytes_read].windows(4).enumerate() {
+ if window[0..2] == *b"\n\n" || window[0..2] == *b"\r\r" {
+ resp = resp.split_off(idx + 2);
+ resp.resize(bytes_read - idx - 2, 0);
+ break 'read_headers;
+ } else if window[0..4] == *b"\r\n\r\n" {
+ resp = resp.split_off(idx + 4);
+ resp.resize(bytes_read - idx - 4, 0);
+ break 'read_headers;
+ }
+ }
+ }
+ if !ok_found || (!chunked && (actual_len == 0 || actual_len > max_resp)) { return None; } // Sorry, not implemented
+ bytes_read = resp.len();
+ if !chunked {
+ resp.resize(actual_len, 0);
+ while bytes_read < actual_len {
+ bytes_read += read_socket!();
+ }
+ Some(resp)
+ } else {
+ actual_len = 0;
+ let mut chunk_remaining = 0;
+ 'read_bytes: loop {
+ if chunk_remaining == 0 {
+ let mut bytes_skipped = 0;
+ let mut finished_read = false;
+ let mut lineiter = resp[actual_len..bytes_read].split(|c| *c == '\n' as u8 || *c == '\r' as u8).peekable();
+ loop {
+ let line = match lineiter.next() { Some(line) => line, None => break };
+ if lineiter.peek().is_none() { // We haven't yet read to the end of this line
+ if line.len() > 8 {
+ // No reason to ever have a chunk length line longer than 4 chars
+ return None;
+ }
+ break;
+ }
+ bytes_skipped += line.len() + 1;
+ if line.len() == 0 { continue; } // Probably between the \r and \n
+ match usize::from_str_radix(&match std::str::from_utf8(line) {
+ Ok(s) => s, Err(_) => return None,
+ }, 16) {
+ Ok(chunklen) => {
+ if chunklen == 0 { finished_read = true; }
+ chunk_remaining = chunklen;
+ match lineiter.next() {
+ Some(l) if l.is_empty() => {
+ // Drop \r after \n
+ bytes_skipped += 1;
+ if actual_len + bytes_skipped > bytes_read {
+ // Go back and get more bytes so we can skip trailing \n
+ chunk_remaining = 0;
+ }
+ },
+ Some(_) => {},
+ None => {
+ // Go back and get more bytes so we can skip trailing \n
+ chunk_remaining = 0;
+ },
+ }
+ break;
+ },
+ Err(_) => return None,
+ }
+ }
+ if chunk_remaining != 0 {
+ bytes_read -= bytes_skipped;
+ resp.drain(actual_len..actual_len + bytes_skipped);
+ if actual_len + chunk_remaining > max_resp { return None; }
+ let already_in_chunk = cmp::min(bytes_read - actual_len, chunk_remaining);
+ actual_len += already_in_chunk;
+ chunk_remaining -= already_in_chunk;
+ continue 'read_bytes;
+ } else {
+ if finished_read {
+ // Note that we may leave some extra \r\ns to be read, but that's OK,
+ // we'll ignore then when parsing headers for the next request.
+ resp.resize(actual_len, 0);
+ return Some(resp);
+ } else {
+ // Need to read more bytes to figure out chunk length
+ }
+ }
+ }
+ resp.resize(bytes_read + cmp::max(10, chunk_remaining), 0);
+ let avail = read_socket!();
+ bytes_read += avail;
+ if chunk_remaining != 0 {
+ let chunk_read = cmp::min(chunk_remaining, avail);
+ chunk_remaining -= chunk_read;
+ actual_len += chunk_read;
+ }
+ }
+ }
+}
+
+#[cfg(feature = "rest-client")]
+pub struct RESTClient {
+ uri: String,
+}
+
+#[cfg(feature = "rest-client")]
+impl RESTClient {
+ pub fn new(uri: String) -> Option<Self> {
+ match split_uri(&uri) {
+ Some((ssl, _host, _port, _path)) if !ssl => Some(Self { uri }),
+ _ => None,
+ }
+ }
+
+ async fn make_raw_rest_call(&self, req_path: &str) -> Result<Vec<u8>, ()> {
+ let (ssl, host, port, path) = split_uri(&self.uri).unwrap();
+ if ssl { unreachable!(); }
+
+ let mut stream = match std::net::TcpStream::connect_timeout(&match (host, port).to_socket_addrs() {
+ Ok(mut sockaddrs) => match sockaddrs.next() { Some(sockaddr) => sockaddr, None => return Err(()) },
+ Err(_) => return Err(()),
+ }, Duration::from_secs(1)) {
+ Ok(stream) => stream,
+ Err(_) => return Err(()),
+ };
+ stream.set_write_timeout(Some(Duration::from_secs(1))).expect("Host kernel is uselessly old?");
+ stream.set_read_timeout(Some(Duration::from_secs(2))).expect("Host kernel is uselessly old?");
+
+ let req = format!("GET {}/{} HTTP/1.1\nHost: {}\nConnection: keep-alive\n\n", path, req_path, host);
+ match stream.write(req.as_bytes()) {
+ Ok(len) if len == req.len() => {},
+ _ => return Err(()),
+ }
+ #[cfg(feature = "tokio")]
+ let stream = TcpStream::from_std(stream).unwrap();
+ match read_http_resp(stream, 4_000_000).await {
+ Some(r) => Ok(r),
+ None => return Err(()),
+ }
+ }
+
+ async fn make_rest_call(&self, req_path: &str) -> Result<serde_json::Value, ()> {
+ let resp = self.make_raw_rest_call(req_path).await?;
+ let v: serde_json::Value = match serde_json::from_slice(&resp[..]) {
+ Ok(v) => v,
+ Err(_) => return Err(()),
+ };
+ if !v.is_object() {
+ return Err(());
+ }
+ Ok(v)
+ }
+}
+
+#[cfg(feature = "rpc-client")]
+pub struct RPCClient {
+ basic_auth: String,
+ uri: String,
+ id: AtomicUsize,
+}
+
+#[cfg(feature = "rpc-client")]
+impl RPCClient {
+ pub fn new(user_auth: &str, uri: String) -> Option<Self> {
+ match split_uri(&uri) {
+ Some((ssl, _host, _port, _path)) if !ssl => {
+ Some(Self {
+ basic_auth: "Basic ".to_string() + &base64::encode(user_auth),
+ uri,
+ id: AtomicUsize::new(0),
+ })
+ },
+ _ => None,
+ }
+ }
+
+ /// params entries must be pre-quoted if appropriate
+ async fn make_rpc_call(&self, method: &str, params: &[&str]) -> Result<serde_json::Value, ()> {
+ let (ssl, host, port, path) = split_uri(&self.uri).unwrap();
+ if ssl { unreachable!(); }
+
+ let mut stream = match std::net::TcpStream::connect_timeout(&match (host, port).to_socket_addrs() {
+ Ok(mut sockaddrs) => match sockaddrs.next() { Some(sockaddr) => sockaddr, None => return Err(()) },
+ Err(_) => return Err(()),
+ }, Duration::from_secs(1)) {
+ Ok(stream) => stream,
+ Err(_) => return Err(()),
+ };
+ stream.set_write_timeout(Some(Duration::from_secs(1))).expect("Host kernel is uselessly old?");
+ stream.set_read_timeout(Some(Duration::from_secs(2))).expect("Host kernel is uselessly old?");
+
+ let mut param_str = String::new();
+ for (idx, param) in params.iter().enumerate() {
+ param_str += param;
+ if idx != params.len() - 1 {
+ param_str += ",";
+ }
+ }
+ let req = "{\"method\":\"".to_string() + method + "\",\"params\":[" + ¶m_str + "],\"id\":" + &self.id.fetch_add(1, Ordering::AcqRel).to_string() + "}";
+
+ let req = format!("POST {} HTTP/1.1\r\nHost: {}\r\nAuthorization: {}\r\nConnection: keep-alive\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}", path, host, &self.basic_auth, req.len(), req);
+ match stream.write(req.as_bytes()) {
+ Ok(len) if len == req.len() => {},
+ _ => return Err(()),
+ }
+ #[cfg(feature = "tokio")]
+ let stream = TcpStream::from_std(stream).unwrap();
+ let resp = match read_http_resp(stream, 4_000_000).await {
+ Some(r) => r,
+ None => return Err(()),
+ };
+
+ let v: serde_json::Value = match serde_json::from_slice(&resp[..]) {
+ Ok(v) => v,
+ Err(_) => return Err(()),
+ };
+ if !v.is_object() {
+ return Err(());
+ }
+ let v_obj = v.as_object().unwrap();
+ if v_obj.get("error") != Some(&serde_json::Value::Null) {
+ return Err(());
+ }
+ if let Some(res) = v_obj.get("result") {
+ Ok((*res).clone())
+ } else {
+ Err(())
+ }
+ }
+}
+
+#[derive(Deserialize)]
+struct GetHeaderResponse {
+ pub chainwork: String,
+ pub height: u32,
+
+ pub version: u32,
+ pub merkleroot: String,
+ pub time: u32,
+ pub nonce: u32,
+ pub bits: String,
+ pub previousblockhash: String,
+}
+
+impl GetHeaderResponse {
+ /// Always returns BogusData if we return an Err
+ pub fn to_block_header(self) -> Result<BlockHeaderData, BlockSourceRespErr> {
+ let header = BlockHeader {
+ version: self.version,
+ prev_blockhash: BlockHash::from_hex(&self.previousblockhash).map_err(|_| BlockSourceRespErr::BogusData)?,
+ merkle_root: TxMerkleNode::from_hex(&self.merkleroot).map_err(|_| BlockSourceRespErr::BogusData)?,
+ time: self.time,
+ bits: u32::from_str_radix(&self.bits, 16).map_err(|_| BlockSourceRespErr::BogusData)?,
+ nonce: self.nonce,
+ };
+
+ Ok(BlockHeaderData {
+ chainwork: hex_to_uint256(&self.chainwork).ok_or(BlockSourceRespErr::BogusData)?,
+ height: self.height,
+ header,
+ })
+ }
+}
+
+#[cfg(feature = "rpc-client")]
+impl BlockSource for RPCClient {
+ fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height: Option<u32>) -> Pin<Box<dyn Future<Output = Result<BlockHeaderData, BlockSourceRespErr>> + 'a + Send>> {
+ let param = "\"".to_string() + &header_hash.to_hex() + "\"";
+ Box::pin(async move {
+ let res = self.make_rpc_call("getblockheader", &[¶m]).await;
+ if let Ok(mut v) = res {
+ if v.is_object() {
+ if let None = v.get("previousblockhash") {
+ // Got a request for genesis block, add a dummy previousblockhash
+ v.as_object_mut().unwrap().insert("previousblockhash".to_string(), serde_json::Value::String("".to_string()));
+ }
+ }
+ let deser_res: Result<GetHeaderResponse, _> = serde_json::from_value(v);
+ match deser_res {
+ Ok(resp) => resp.to_block_header(),
+ Err(_) => Err(BlockSourceRespErr::NoResponse),
+ }
+ } else { Err(BlockSourceRespErr::NoResponse) }
+ })
+ }
+
+ fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> Pin<Box<dyn Future<Output = Result<Block, BlockSourceRespErr>> + 'a + Send>> {
+ let param = "\"".to_string() + &header_hash.to_hex() + "\"";
+ Box::pin(async move {
+ let blockhex = self.make_rpc_call("getblock", &[¶m, "0"]).await.map_err(|_| BlockSourceRespErr::NoResponse)?;
+ let blockdata = hex_to_vec(blockhex.as_str().ok_or(BlockSourceRespErr::NoResponse)?).ok_or(BlockSourceRespErr::NoResponse)?;
+ let block: Block = encode::deserialize(&blockdata).map_err(|_| BlockSourceRespErr::NoResponse)?;
+ Ok(block)
+ })
+ }
+
+ fn get_best_block<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<(BlockHash, Option<u32>), BlockSourceRespErr>> + 'a + Send>> {
+ Box::pin(async move {
+ if let Ok(v) = self.make_rpc_call("getblockchaininfo", &[]).await {
+ let height = v["blocks"].as_u64().ok_or(BlockSourceRespErr::NoResponse)?
+ .try_into().map_err(|_| BlockSourceRespErr::NoResponse)?;
+ let blockstr = v["bestblockhash"].as_str().ok_or(BlockSourceRespErr::NoResponse)?;
+ Ok((BlockHash::from_hex(blockstr).map_err(|_| BlockSourceRespErr::NoResponse)?, Some(height)))
+ } else { Err(BlockSourceRespErr::NoResponse) }
+ })
+ }
+}
+
+#[cfg(feature = "rest-client")]
+impl BlockSource for RESTClient {
+ fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height: Option<u32>) -> Pin<Box<dyn Future<Output = Result<BlockHeaderData, BlockSourceRespErr>> + 'a + Send>> {
+ Box::pin(async move {
+ let reqpath = format!("headers/1/{}.json", header_hash.to_hex());
+ match self.make_rest_call(&reqpath).await {
+ Ok(serde_json::Value::Array(mut v)) if !v.is_empty() => {
+ let mut header = v.drain(..).next().unwrap();
+ if !header.is_object() { return Err(BlockSourceRespErr::NoResponse); }
+ if let None = header.get("previousblockhash") {
+ // Got a request for genesis block, add a dummy previousblockhash
+ header.as_object_mut().unwrap().insert("previousblockhash".to_string(), serde_json::Value::String("".to_string()));
+ }
+ let deser_res: Result<GetHeaderResponse, _> = serde_json::from_value(header);
+ match deser_res {
+ Ok(resp) => resp.to_block_header(),
+ Err(_) => Err(BlockSourceRespErr::NoResponse),
+ }
+ },
+ _ => Err(BlockSourceRespErr::NoResponse)
+ }
+ })
+ }
+
+ fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> Pin<Box<dyn Future<Output = Result<Block, BlockSourceRespErr>> + 'a + Send>> {
+ Box::pin(async move {
+ let reqpath = format!("block/{}.bin", header_hash.to_hex());
+ let blockdata = self.make_raw_rest_call(&reqpath).await.map_err(|_| BlockSourceRespErr::NoResponse)?;
+ let block: Block = encode::deserialize(&blockdata).map_err(|_| BlockSourceRespErr::NoResponse)?;
+ Ok(block)
+ })
+ }
+
+ fn get_best_block<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<(BlockHash, Option<u32>), BlockSourceRespErr>> + 'a + Send>> {
+ Box::pin(async move {
+ let v = self.make_rest_call("chaininfo.json").await.map_err(|_| BlockSourceRespErr::NoResponse)?;
+ let height = v["blocks"].as_u64().ok_or(BlockSourceRespErr::NoResponse)?
+ .try_into().map_err(|_| BlockSourceRespErr::NoResponse)?;
+ let blockstr = v["bestblockhash"].as_str().ok_or(BlockSourceRespErr::NoResponse)?;
+ Ok((BlockHash::from_hex(blockstr).map_err(|_| BlockSourceRespErr::NoResponse)?, Some(height)))
+ })
+ }
+}
+
+#[cfg(test)]
+#[test]
+fn test_split_uri() {
+ assert_eq!(split_uri("http://example.com:8080/path"), Some((false, "example.com", 8080, "/path")));
+ assert_eq!(split_uri("http:example.com:8080/path/b"), Some((false, "example.com", 8080, "/path/b")));
+ assert_eq!(split_uri("https://0.0.0.0/"), Some((true, "0.0.0.0", 443, "/")));
+ assert_eq!(split_uri("http:[0:bad::43]:80/"), Some((false, "0:bad::43", 80, "/")));
+ assert_eq!(split_uri("http:[::]"), Some((false, "::", 80, "")));
+ assert_eq!(split_uri("http://"), None);
+ assert_eq!(split_uri("http://example.com:70000/"), None);
+ assert_eq!(split_uri("ftp://example.com:80/"), None);
+ assert_eq!(split_uri("http://example.com"), Some((false, "example.com", 80, "")));
+}
--- /dev/null
+//! An implementation of a simple SPV client which can interrogate abstract block sources to keep
+//! lightning objects on the best chain.
+//!
+//! With feature `rpc-client` we provide a client which can fetch blocks from Bitcoin Core's RPC
+//! interface.
+//!
+//! With feature `rest-client` we provide a client which can fetch blocks from Bitcoin Core's REST
+//! interface.
+//!
+//! Both provided clients support either blocking TCP reads from std::net::TcpStream or, with
+//! feature `tokio`, tokio::net::TcpStream inside a Tokio runtime.
+
+#[cfg(any(feature = "rest-client", feature = "rpc-client"))]
+mod utils;
+
+#[cfg(any(feature = "rest-client", feature = "rpc-client"))]
+pub mod http_clients;
+
+use lightning::chain::{chaininterface, keysinterface};
+use lightning::chain::chaininterface::{BlockNotifierArc, ChainListener};
+use lightning::ln::channelmonitor::{ChannelMonitor, ManyChannelMonitor};
+use lightning::ln::channelmanager::SimpleArcChannelManager;
+
+use bitcoin::hashes::hex::ToHex;
+
+use bitcoin::blockdata::block::{Block, BlockHeader};
+use bitcoin::util::hash::BitcoinHash;
+use bitcoin::util::uint::Uint256;
+use bitcoin::hash_types::BlockHash;
+
+use std::future::Future;
+use std::vec::Vec;
+use std::pin::Pin;
+use std::ops::DerefMut;
+
+#[derive(Clone, Debug, PartialEq)]
+/// A block header and some associated data. This information should be available from most block
+/// sources (and, notably, is available in Bitcoin Core's RPC and REST interfaces).
+pub struct BlockHeaderData {
+ /// The total chain work, in expected number of double-SHA256 hashes required to build a chain
+ /// of equivalent weight
+ pub chainwork: Uint256,
+ /// The block height, with the genesis block heigh set to 0
+ pub height: u32,
+ /// The block header itself
+ pub header: BlockHeader
+}
+
+#[derive(Debug, Clone)]
+/// Failure type for requests to block sources.
+pub enum BlockSourceRespErr {
+ /// Indicates a BlockSource provided bogus data. After this is returned once we will never
+ /// bother polling the returning BlockSource for block data again, so use it sparingly.
+ BogusData,
+ /// Indicates the BlockSource isn't responsive or may be misconfigured but we want to continue
+ /// polling it.
+ NoResponse,
+}
+/// Abstract type for a source of block header and block data.
+pub trait BlockSource : Sync + Send {
+ /// Gets the header for a given hash. The height the header should be at is provided, though
+ /// note that you must return either the header with the requested hash, or an Err, not a
+ /// different header with the same eight.
+ ///
+ /// For sources which cannot find headers based on the hash, returning NoResponse when
+ /// height_hint is None is fine, though get_best_block() should never return a None for height
+ /// on the same source. Such a source should never be used in init_sync_chain_monitor as it
+ /// doesn't have any initial height information.
+ ///
+ /// Sadly rust's trait system hasn't grown the ability to take impl/differentially-sized return
+ /// values yet, so we have to Box + dyn the future.
+ fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, height_hint: Option<u32>) -> Pin<Box<dyn Future<Output = Result<BlockHeaderData, BlockSourceRespErr>> + 'a + Send>>;
+
+ /// Gets the block for a given hash. BlockSources may be headers-only, in which case they
+ /// should always return Err(BlockSourceRespErr::NoResponse) here.
+ /// Sadly rust's trait system hasn't grown the ability to take impl/differentially-sized return
+ /// values yet, so we have to Box + dyn the future.
+ fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> Pin<Box<dyn Future<Output = Result<Block, BlockSourceRespErr>> + 'a + Send>>;
+
+ /// Gets the best block hash and, optionally, its height.
+ /// Including the height doesn't impact the chain-scannling algorithm, but it is passed to
+ /// get_header() which may allow some BlockSources to more effeciently find the target header.
+ ///
+ /// Sadly rust's trait system hasn't grown the ability to take impl/differentially-sized return
+ /// values yet, so we have to Box + dyn the future.
+ fn get_best_block<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<(BlockHash, Option<u32>), BlockSourceRespErr>> + 'a + Send>>;
+}
+
+/// Stateless header checks on a given header.
+#[inline]
+fn stateless_check_header(header: &BlockHeader) -> Result<(), BlockSourceRespErr> {
+ if header.validate_pow(&header.target()).is_err() {
+ Err(BlockSourceRespErr::BogusData)
+ } else { Ok(()) }
+}
+
+/// Check that child_header correctly builds on previous_header - the claimed work differential
+/// matches the actual PoW in child_header and the difficulty transition is possible, ie within 4x.
+/// Includes stateless header checks on previous_header.
+fn check_builds_on(child_header: &BlockHeaderData, previous_header: &BlockHeaderData, mainnet: bool) -> Result<(), BlockSourceRespErr> {
+ if child_header.header.prev_blockhash != previous_header.header.bitcoin_hash() {
+ return Err(BlockSourceRespErr::BogusData);
+ }
+
+ stateless_check_header(&previous_header.header)?;
+ let new_work = child_header.header.work();
+ if previous_header.height != child_header.height - 1 ||
+ previous_header.chainwork + new_work != child_header.chainwork {
+ return Err(BlockSourceRespErr::BogusData);
+ }
+ if mainnet {
+ if child_header.height % 2016 == 0 {
+ let prev_work = previous_header.header.work();
+ if new_work > prev_work << 2 || new_work < prev_work >> 2 {
+ return Err(BlockSourceRespErr::BogusData)
+ }
+ } else if child_header.header.bits != previous_header.header.bits {
+ return Err(BlockSourceRespErr::BogusData)
+ }
+ }
+ Ok(())
+}
+
+enum ForkStep {
+ ForkPoint(BlockHeaderData),
+ DisconnectBlock(BlockHeaderData),
+ ConnectBlock(BlockHeaderData),
+}
+fn find_fork_step<'a>(steps_tx: &'a mut Vec<ForkStep>, current_header: BlockHeaderData, prev_header: &'a BlockHeaderData, block_source: &'a mut dyn BlockSource, head_blocks: &'a [BlockHeaderData], mainnet: bool) -> Pin<Box<dyn Future<Output=Result<(), BlockSourceRespErr>> + Send + 'a>> {
+ Box::pin(async move {
+ if prev_header.header.prev_blockhash == current_header.header.prev_blockhash {
+ // Found the fork, get the fork point header and we're done!
+ steps_tx.push(ForkStep::DisconnectBlock(prev_header.clone()));
+ steps_tx.push(ForkStep::ConnectBlock(current_header));
+ if !head_blocks.is_empty() {
+ let new_prev_header = head_blocks.last().unwrap();
+ steps_tx.push(ForkStep::ForkPoint(new_prev_header.clone()));
+ } else {
+ let new_prev_header = block_source.get_header(&prev_header.header.prev_blockhash, Some(prev_header.height - 1)).await?;
+ check_builds_on(&prev_header, &new_prev_header, mainnet)?;
+ steps_tx.push(ForkStep::ForkPoint(new_prev_header.clone()));
+ }
+ } else if current_header.height == 0 {
+ // We're connect through genesis, we must be on a different chain!
+ return Err(BlockSourceRespErr::BogusData);
+ } else if prev_header.height < current_header.height {
+ if prev_header.height + 1 == current_header.height &&
+ prev_header.header.bitcoin_hash() == current_header.header.prev_blockhash {
+ // Current header is the one above prev_header, we're done!
+ steps_tx.push(ForkStep::ConnectBlock(current_header));
+ } else {
+ // Current is higher than the prev, walk current down by listing blocks we need to
+ // connect
+ let new_cur_header = block_source.get_header(¤t_header.header.prev_blockhash, Some(current_header.height - 1)).await?;
+ check_builds_on(¤t_header, &new_cur_header, mainnet)?;
+ steps_tx.push(ForkStep::ConnectBlock(current_header));
+ find_fork_step(steps_tx, new_cur_header, prev_header, block_source, head_blocks, mainnet).await?;
+ }
+ } else if prev_header.height > current_header.height {
+ // Previous is higher, walk it back and recurse
+ steps_tx.push(ForkStep::DisconnectBlock(prev_header.clone()));
+ if !head_blocks.is_empty() {
+ let new_prev_header = head_blocks.last().unwrap();
+ let new_head_blocks = &head_blocks[..head_blocks.len() - 1];
+ find_fork_step(steps_tx, current_header, new_prev_header, block_source, new_head_blocks, mainnet).await?;
+ } else {
+ let new_prev_header = block_source.get_header(&prev_header.header.prev_blockhash, Some(prev_header.height - 1)).await?;
+ check_builds_on(&prev_header, &new_prev_header, mainnet)?;
+ find_fork_step(steps_tx, current_header, &new_prev_header, block_source, head_blocks, mainnet).await?;
+ }
+ } else {
+ // Target and current are at the same height, but we're not at fork yet, walk
+ // both back and recurse
+ let new_cur_header = block_source.get_header(¤t_header.header.prev_blockhash, Some(current_header.height - 1)).await?;
+ check_builds_on(¤t_header, &new_cur_header, mainnet)?;
+ steps_tx.push(ForkStep::ConnectBlock(current_header));
+ steps_tx.push(ForkStep::DisconnectBlock(prev_header.clone()));
+ if !head_blocks.is_empty() {
+ let new_prev_header = head_blocks.last().unwrap();
+ let new_head_blocks = &head_blocks[..head_blocks.len() - 1];
+ find_fork_step(steps_tx, new_cur_header, new_prev_header, block_source, new_head_blocks, mainnet).await?;
+ } else {
+ let new_prev_header = block_source.get_header(&prev_header.header.prev_blockhash, Some(prev_header.height - 1)).await?;
+ check_builds_on(&prev_header, &new_prev_header, mainnet)?;
+ find_fork_step(steps_tx, new_cur_header, &new_prev_header, block_source, head_blocks, mainnet).await?;
+ }
+ }
+ Ok(())
+ })
+}
+/// Walks backwards from current_header and prev_header finding the fork and sending ForkStep events
+/// into the steps_tx Sender. There is no ordering guarantee between different ForkStep types, but
+/// DisconnectBlock and ConnectBlock events are each in reverse, height-descending order.
+async fn find_fork<'a>(current_header: BlockHeaderData, prev_header: &'a BlockHeaderData, block_source: &'a mut dyn BlockSource, mut head_blocks: &'a [BlockHeaderData], mainnet: bool) -> Result<Vec<ForkStep>, BlockSourceRespErr> {
+ let mut steps_tx = Vec::new();
+ if current_header.header == prev_header.header { return Ok(steps_tx); }
+
+ // If we have cached headers, they have to end with where we used to be
+ head_blocks = if !head_blocks.is_empty() {
+ assert_eq!(head_blocks.last().unwrap(), prev_header);
+ &head_blocks[..head_blocks.len() - 1]
+ } else { head_blocks };
+
+ find_fork_step(&mut steps_tx, current_header, &prev_header, block_source, head_blocks, mainnet).await?;
+ Ok(steps_tx)
+}
+
+/// A dummy trait for capturing an object which wants the chain to be replayed.
+/// Implemented for lightning BlockNotifiers for general use, as well as
+/// ChannelManagers and ChannelMonitors to allow for easy replaying of chain
+/// data upon deserialization.
+pub trait AChainListener {
+ fn a_block_connected(&mut self, block: &Block, height: u32);
+ fn a_block_disconnected(&mut self, header: &BlockHeader, height: u32);
+}
+
+impl AChainListener for &BlockNotifierArc {
+ fn a_block_connected(&mut self, block: &Block, height: u32) {
+ self.block_connected(block, height);
+ }
+ fn a_block_disconnected(&mut self, header: &BlockHeader, height: u32) {
+ self.block_disconnected(header, height);
+ }
+}
+
+impl<M, B, F> AChainListener for &SimpleArcChannelManager<M, B, F>
+ where M: ManyChannelMonitor<keysinterface::InMemoryChannelKeys>,
+ B: chaininterface::BroadcasterInterface, F: chaininterface::FeeEstimator {
+ fn a_block_connected(&mut self, block: &Block, height: u32) {
+ let mut txn = Vec::with_capacity(block.txdata.len());
+ let mut idxn = Vec::with_capacity(block.txdata.len());
+ for (i, tx) in block.txdata.iter().enumerate() {
+ txn.push(tx);
+ idxn.push(i as u32);
+ }
+ self.block_connected(&block.header, height, &txn, &idxn);
+ }
+ fn a_block_disconnected(&mut self, header: &BlockHeader, height: u32) {
+ self.block_disconnected(header, height);
+ }
+}
+
+impl<CS, B, F> AChainListener for (&mut ChannelMonitor<CS>, &B, &F)
+ where CS: keysinterface::ChannelKeys,
+ B: chaininterface::BroadcasterInterface, F: chaininterface::FeeEstimator {
+ fn a_block_connected(&mut self, block: &Block, height: u32) {
+ let mut txn = Vec::with_capacity(block.txdata.len());
+ for tx in block.txdata.iter() {
+ txn.push(tx);
+ }
+ self.0.block_connected(&txn, height, &block.bitcoin_hash(), self.1, self.2);
+ }
+ fn a_block_disconnected(&mut self, header: &BlockHeader, height: u32) {
+ self.0.block_disconnected(height, &header.bitcoin_hash(), self.1, self.2);
+ }
+}
+
+/// Finds the fork point between new_header and old_header, disconnecting blocks from old_header to
+/// get to that point and then connecting blocks until we get to new_header.
+///
+/// We validate the headers along the transition path, but don't fetch blocks until we've
+/// disconnected to the fork point. Thus, we may return an Err() that includes where our tip ended
+/// up which may not be new_header. Note that iff the returned Err has a BlockHeaderData, the
+/// header transition from old_header to new_header is valid.
+async fn sync_chain_monitor<CL : AChainListener + Sized>(new_header: BlockHeaderData, old_header: &BlockHeaderData, block_source: &mut dyn BlockSource, chain_notifier: &mut CL, head_blocks: &mut Vec<BlockHeaderData>, mainnet: bool)
+ -> Result<(), (BlockSourceRespErr, Option<BlockHeaderData>)> {
+ let mut events = find_fork(new_header, old_header, block_source, &*head_blocks, mainnet).await.map_err(|e| (e, None))?;
+
+ let mut last_disconnect_tip = None;
+ let mut new_tip = None;
+ for event in events.iter() {
+ match &event {
+ &ForkStep::DisconnectBlock(ref header) => {
+ println!("Disconnecting block {}", header.header.bitcoin_hash());
+ if let Some(cached_head) = head_blocks.pop() {
+ assert_eq!(cached_head, *header);
+ }
+ chain_notifier.a_block_disconnected(&header.header, header.height);
+ last_disconnect_tip = Some(header.header.prev_blockhash);
+ },
+ &ForkStep::ForkPoint(ref header) => {
+ new_tip = Some(header.clone());
+ },
+ _ => {},
+ }
+ }
+
+ // If we disconnected any blocks, we should have new tip data available, which should match our
+ // cached header data if it is available. If we didn't disconnect any blocks we shouldn't have
+ // set a ForkPoint as there is no fork.
+ assert_eq!(last_disconnect_tip.is_some(), new_tip.is_some());
+ if let &Some(ref tip_header) = &new_tip {
+ if let Some(cached_head) = head_blocks.last() {
+ assert_eq!(cached_head, tip_header);
+ }
+ debug_assert_eq!(tip_header.header.bitcoin_hash(), *last_disconnect_tip.as_ref().unwrap());
+ } else {
+ // Set new_tip to indicate that we got a valid header chain we wanted to connect to, but
+ // failed
+ new_tip = Some(old_header.clone());
+ }
+
+ for event in events.drain(..).rev() {
+ if let ForkStep::ConnectBlock(header_data) = event {
+ let block = match block_source.get_block(&header_data.header.bitcoin_hash()).await {
+ Err(e) => return Err((e, new_tip)),
+ Ok(b) => b,
+ };
+ if block.header != header_data.header || !block.check_merkle_root() || !block.check_witness_commitment() {
+ return Err((BlockSourceRespErr::BogusData, new_tip));
+ }
+ println!("Connecting block {}", header_data.header.bitcoin_hash().to_hex());
+ chain_notifier.a_block_connected(&block, header_data.height);
+ head_blocks.push(header_data.clone());
+ new_tip = Some(header_data);
+ }
+ }
+ Ok(())
+}
+
+/// Do a one-time sync of a chain listener from a single *trusted* block source bringing its view
+/// of the latest chain tip from old_block to new_block. This is useful on startup when you need
+/// to bring each ChannelMonitor, as well as the overall ChannelManager, into sync with each other.
+///
+/// Once you have them all at the same block, you should switch to using MicroSPVClient.
+pub async fn init_sync_chain_monitor<CL : AChainListener + Sized, B: BlockSource>(new_block: BlockHash, old_block: BlockHash, block_source: &mut B, mut chain_notifier: CL) {
+ if &old_block[..] == &[0; 32] { return; }
+
+ let new_header = block_source.get_header(&new_block, None).await.unwrap();
+ assert_eq!(new_header.header.bitcoin_hash(), new_block);
+ stateless_check_header(&new_header.header).unwrap();
+ let old_header = block_source.get_header(&old_block, None).await.unwrap();
+ assert_eq!(old_header.header.bitcoin_hash(), old_block);
+ stateless_check_header(&old_header.header).unwrap();
+ sync_chain_monitor(new_header, &old_header, block_source, &mut chain_notifier, &mut Vec::new(), false).await.unwrap();
+}
+
+/// Keep the chain that a chain listener knows about up-to-date with the best chain from any of the
+/// given block_sources.
+///
+/// This implements a pretty bare-bones SPV client, checking all relevant commitments and finding
+/// the heaviest chain, but not storing the full header chain, leading to some important
+/// limitations.
+///
+/// While we never check full difficulty transition logic, the mainnet option enables checking that
+/// difficulty transitions only happen every two weeks and never shift difficulty more than 4x in
+/// either direction, which is sufficient to prevent most minority hashrate attacks.
+///
+/// We cache any headers which we connect until every block source is in agreement on the best tip.
+/// This prevents one block source from being able to orphan us on a fork of its own creation by
+/// not responding to requests for old headers on that fork. However, if one block source is
+/// unreachable this may result in our memory usage growing in accordance with the chain.
+pub struct MicroSPVClient<'a, B: DerefMut<Target=dyn BlockSource + 'a> + Sized + Sync + Send, CL : AChainListener + Sized> {
+ chain_tip: (BlockHash, BlockHeaderData),
+ block_sources: Vec<B>,
+ backup_block_sources: Vec<B>,
+ cur_blocks: Vec<Result<BlockHash, BlockSourceRespErr>>,
+ blocks_past_common_tip: Vec<BlockHeaderData>,
+ chain_notifier: CL,
+ mainnet: bool
+}
+impl<'a, B: DerefMut<Target=dyn BlockSource + 'a> + Sized + Sync + Send, CL : AChainListener + Sized> MicroSPVClient<'a, B, CL> {
+ /// Create a new MicroSPVClient with a set of block sources and a chain listener which will
+ /// receive updates of the new tip.
+ ///
+ /// We assume that at least one of the provided BlockSources can provide all neccessary headers
+ /// to disconnect from the given chain_tip back to its common ancestor with the best chain.
+ /// We assume that the height, hash, and chain work given in chain_tip are correct.
+ ///
+ /// `backup_block_sources` are never queried unless we learned, via some `block_sources` source
+ /// that there exists a better, valid header chain but we failed to fetch the blocks. This is
+ /// useful when you have a block source which is more censorship-resistant than others but
+ /// which only provides headers. In this case, we can use such source(s) to learn of a censorship
+ /// attack without giving up privacy by querying a privacy-losing block sources.
+ pub fn init(chain_tip: BlockHeaderData, block_sources: Vec<B>, backup_block_sources: Vec<B>, chain_notifier: CL, mainnet: bool) -> Self {
+ let cur_blocks = vec![Err(BlockSourceRespErr::NoResponse); block_sources.len() + backup_block_sources.len()];
+ let blocks_past_common_tip = Vec::new();
+ Self {
+ chain_tip: (chain_tip.header.bitcoin_hash(), chain_tip),
+ block_sources, backup_block_sources, cur_blocks, blocks_past_common_tip, chain_notifier, mainnet
+ }
+ }
+ /// Check each source for a new best tip and update the chain listener accordingly.
+ /// Returns true if some blocks were [dis]connected, false otherwise.
+ pub async fn poll_best_tip(&mut self) -> bool {
+ let mut highest_valid_tip = self.chain_tip.1.chainwork;
+ let mut blocks_connected = false;
+
+ macro_rules! process_source {
+ ($cur_hash: expr, $source: expr) => { {
+ if let Err(BlockSourceRespErr::BogusData) = $cur_hash {
+ // We gave up on this provider, move on.
+ continue;
+ }
+ macro_rules! handle_err {
+ ($err: expr) => {
+ match $err {
+ Ok(r) => r,
+ Err(BlockSourceRespErr::BogusData) => {
+ $cur_hash = Err(BlockSourceRespErr::BogusData);
+ continue;
+ },
+ Err(BlockSourceRespErr::NoResponse) => {
+ continue;
+ },
+ }
+ }
+ }
+ let (new_hash, height_opt) = handle_err!($source.get_best_block().await);
+ if new_hash == self.chain_tip.0 {
+ $cur_hash = Ok(new_hash);
+ continue;
+ }
+ let new_header = handle_err!($source.get_header(&new_hash, height_opt).await);
+ if new_header.header.bitcoin_hash() != new_hash {
+ $cur_hash = Err(BlockSourceRespErr::BogusData);
+ continue;
+ }
+ handle_err!(stateless_check_header(&new_header.header));
+ if new_header.chainwork <= self.chain_tip.1.chainwork {
+ $cur_hash = Ok(new_hash);
+ continue;
+ }
+
+ let syncres = sync_chain_monitor(new_header.clone(), &self.chain_tip.1, &mut *$source, &mut self.chain_notifier, &mut self.blocks_past_common_tip, self.mainnet).await;
+ if let Err((e, new_tip)) = syncres {
+ if let Some(tip) = new_tip {
+ let tiphash = tip.header.bitcoin_hash();
+ if tiphash != self.chain_tip.0 {
+ self.chain_tip = (tiphash, tip);
+ blocks_connected = true;
+ }
+ // We set cur_hash to where we got to since we don't mind dropping our
+ // block header cache if its on a fork that no block sources care about,
+ // but we (may) want to continue trying to get the blocks from this source
+ // the next time we poll.
+ $cur_hash = Ok(tiphash);
+ highest_valid_tip = std::cmp::max(highest_valid_tip, new_header.chainwork);
+ }
+ handle_err!(Err(e));
+ } else {
+ highest_valid_tip = std::cmp::max(highest_valid_tip, new_header.chainwork);
+ self.chain_tip = (new_hash, new_header);
+ $cur_hash = Ok(new_hash);
+ blocks_connected = true;
+ }
+ } }
+ }
+
+ for (cur_hash, source) in self.cur_blocks.iter_mut().take(self.block_sources.len())
+ .zip(self.block_sources.iter_mut()) {
+ process_source!(*cur_hash, *source);
+ }
+
+ if highest_valid_tip != self.chain_tip.1.chainwork {
+ for (cur_hash, source) in self.cur_blocks.iter_mut().skip(self.block_sources.len())
+ .zip(self.backup_block_sources.iter_mut()) {
+ process_source!(*cur_hash, *source);
+ if highest_valid_tip == self.chain_tip.1.chainwork { break; }
+ }
+ }
+
+ let mut common_tip = true;
+ for cur_hash in self.cur_blocks.iter() {
+ if let Ok(hash) = cur_hash {
+ if *hash != self.chain_tip.0 {
+ common_tip = false;
+ break;
+ }
+ }
+ }
+ if common_tip {
+ // All block sources have the same tip. Assume we will be able to trivially get old
+ // headers and drop our reorg cache.
+ self.blocks_past_common_tip.clear();
+ }
+ blocks_connected
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use bitcoin::blockdata::block::{Block, BlockHeader};
+ use bitcoin::util::uint::Uint256;
+ use std::collections::HashMap;
+ use std::sync::{Arc, Mutex};
+
+ struct ChainListener {
+ blocks_connected: Mutex<Vec<(BlockHash, u32)>>,
+ blocks_disconnected: Mutex<Vec<(BlockHash, u32)>>,
+ }
+ impl AChainListener for Arc<ChainListener> {
+ fn a_block_connected(&mut self, block: &Block, height: u32) {
+ self.blocks_connected.lock().unwrap().push((block.header.bitcoin_hash(), height));
+ }
+ fn a_block_disconnected(&mut self, header: &BlockHeader, height: u32) {
+ self.blocks_disconnected.lock().unwrap().push((header.bitcoin_hash(), height));
+ }
+ }
+
+ #[derive(Clone)]
+ struct BlockData {
+ block: Block,
+ chainwork: Uint256,
+ height: u32,
+ }
+ struct Blockchain {
+ blocks: Mutex<HashMap<BlockHash, BlockData>>,
+ best_block: Mutex<(BlockHash, Option<u32>)>,
+ headers_only: bool,
+ disallowed: Mutex<bool>,
+ }
+ impl BlockSource for &Blockchain {
+ fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, height_hint: Option<u32>) -> Pin<Box<dyn Future<Output = Result<BlockHeaderData, BlockSourceRespErr>> + 'a + Send>> {
+ if *self.disallowed.lock().unwrap() { unreachable!(); }
+ Box::pin(async move {
+ match self.blocks.lock().unwrap().get(header_hash) {
+ Some(block) => {
+ assert_eq!(Some(block.height), height_hint);
+ Ok(BlockHeaderData {
+ chainwork: block.chainwork,
+ height: block.height,
+ header: block.block.header.clone(),
+ })
+ },
+ None => Err(BlockSourceRespErr::NoResponse),
+ }
+ })
+ }
+ fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> Pin<Box<dyn Future<Output = Result<Block, BlockSourceRespErr>> + 'a + Send>> {
+ if *self.disallowed.lock().unwrap() { unreachable!(); }
+ Box::pin(async move {
+ if self.headers_only {
+ Err(BlockSourceRespErr::NoResponse)
+ } else {
+ match self.blocks.lock().unwrap().get(header_hash) {
+ Some(block) => Ok(block.block.clone()),
+ None => Err(BlockSourceRespErr::NoResponse),
+ }
+ }
+ })
+ }
+ fn get_best_block<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<(BlockHash, Option<u32>), BlockSourceRespErr>> + 'a + Send>> {
+ if *self.disallowed.lock().unwrap() { unreachable!(); }
+ Box::pin(async move { Ok(self.best_block.lock().unwrap().clone()) })
+ }
+ }
+
+ #[tokio::test]
+ async fn simple_block_connect() {
+ let genesis = BlockData {
+ block: bitcoin::blockdata::constants::genesis_block(bitcoin::network::constants::Network::Bitcoin),
+ chainwork: Uint256::from_u64(0).unwrap(),
+ height: 0,
+ };
+
+ // Build a chain based on genesis 1a, 2a, 3a, and 4a
+ let block_1a = BlockData {
+ block: Block {
+ header: BlockHeader {
+ version: 0,
+ prev_blockhash: genesis.block.bitcoin_hash(),
+ merkle_root: Default::default(), time: 0,
+ bits: genesis.block.header.bits,
+ nonce: 647569994,
+ },
+ txdata: Vec::new(),
+ },
+ chainwork: Uint256::from_u64(4295032833).unwrap(),
+ height: 1
+ };
+ let block_1a_hash = block_1a.block.header.bitcoin_hash();
+ let block_2a = BlockData {
+ block: Block {
+ header: BlockHeader {
+ version: 0,
+ prev_blockhash: block_1a.block.bitcoin_hash(),
+ merkle_root: Default::default(), time: 4,
+ bits: genesis.block.header.bits,
+ nonce: 1185103332,
+ },
+ txdata: Vec::new(),
+ },
+ chainwork: Uint256::from_u64(4295032833 * 2).unwrap(),
+ height: 2
+ };
+ let block_2a_hash = block_2a.block.header.bitcoin_hash();
+ let block_3a = BlockData {
+ block: Block {
+ header: BlockHeader {
+ version: 0,
+ prev_blockhash: block_2a.block.bitcoin_hash(),
+ merkle_root: Default::default(), time: 6,
+ bits: genesis.block.header.bits,
+ nonce: 198739431,
+ },
+ txdata: Vec::new(),
+ },
+ chainwork: Uint256::from_u64(4295032833 * 3).unwrap(),
+ height: 3
+ };
+ let block_3a_hash = block_3a.block.header.bitcoin_hash();
+ let block_4a = BlockData {
+ block: Block {
+ header: BlockHeader {
+ version: 0,
+ prev_blockhash: block_3a.block.bitcoin_hash(),
+ merkle_root: Default::default(), time: 0,
+ bits: genesis.block.header.bits,
+ nonce: 590371681,
+ },
+ txdata: Vec::new(),
+ },
+ chainwork: Uint256::from_u64(4295032833 * 4).unwrap(),
+ height: 4
+ };
+ let block_4a_hash = block_4a.block.header.bitcoin_hash();
+
+ // Build a second chain based on genesis 1b, 2b, and 3b
+ let block_1b = BlockData {
+ block: Block {
+ header: BlockHeader {
+ version: 0,
+ prev_blockhash: genesis.block.bitcoin_hash(),
+ merkle_root: Default::default(), time: 6,
+ bits: genesis.block.header.bits,
+ nonce: 1347696353,
+ },
+ txdata: Vec::new(),
+ },
+ chainwork: Uint256::from_u64(4295032833).unwrap(),
+ height: 1
+ };
+ let block_1b_hash = block_1b.block.header.bitcoin_hash();
+ let block_2b = BlockData {
+ block: Block {
+ header: BlockHeader {
+ version: 0,
+ prev_blockhash: block_1b.block.bitcoin_hash(),
+ merkle_root: Default::default(), time: 5,
+ bits: genesis.block.header.bits,
+ nonce: 144775545,
+ },
+ txdata: Vec::new(),
+ },
+ chainwork: Uint256::from_u64(4295032833 * 2).unwrap(),
+ height: 2
+ };
+ let block_2b_hash = block_2b.block.header.bitcoin_hash();
+
+ // Build a second chain based on 3a: 4c and 5c.
+ let block_4c = BlockData {
+ block: Block {
+ header: BlockHeader {
+ version: 0,
+ prev_blockhash: block_3a.block.bitcoin_hash(),
+ merkle_root: Default::default(), time: 17,
+ bits: genesis.block.header.bits,
+ nonce: 316634915,
+ },
+ txdata: Vec::new(),
+ },
+ chainwork: Uint256::from_u64(4295032833 * 4).unwrap(),
+ height: 4
+ };
+ let block_4c_hash = block_4c.block.header.bitcoin_hash();
+ let block_5c = BlockData {
+ block: Block {
+ header: BlockHeader {
+ version: 0,
+ prev_blockhash: block_4c.block.bitcoin_hash(),
+ merkle_root: Default::default(), time: 3,
+ bits: genesis.block.header.bits,
+ nonce: 218413871,
+ },
+ txdata: Vec::new(),
+ },
+ chainwork: Uint256::from_u64(4295032833 * 5).unwrap(),
+ height: 5
+ };
+ let block_5c_hash = block_5c.block.header.bitcoin_hash();
+
+ // Create four block sources:
+ // * chain_one and chain_two are general purpose block sources which we use to test reorgs,
+ // * headers_chain only provides headers,
+ // * and backup_chain is a backup which should not receive any queries (ie disallowed is
+ // false) until the headers_chain gets ahead of chain_one and chain_two.
+ let mut blocks_one = HashMap::new();
+ blocks_one.insert(genesis.block.header.bitcoin_hash(), genesis.clone());
+ blocks_one.insert(block_1a_hash, block_1a.clone());
+ blocks_one.insert(block_1b_hash, block_1b);
+ blocks_one.insert(block_2b_hash, block_2b);
+ let chain_one = Blockchain {
+ blocks: Mutex::new(blocks_one), best_block: Mutex::new((block_2b_hash, Some(2))),
+ headers_only: false, disallowed: Mutex::new(false)
+ };
+
+ let mut blocks_two = HashMap::new();
+ blocks_two.insert(genesis.block.header.bitcoin_hash(), genesis.clone());
+ blocks_two.insert(block_1a_hash, block_1a.clone());
+ let chain_two = Blockchain {
+ blocks: Mutex::new(blocks_two), best_block: Mutex::new((block_1a_hash, Some(1))),
+ headers_only: false, disallowed: Mutex::new(false)
+ };
+
+ let mut blocks_three = HashMap::new();
+ blocks_three.insert(genesis.block.header.bitcoin_hash(), genesis.clone());
+ blocks_three.insert(block_1a_hash, block_1a.clone());
+ let header_chain = Blockchain {
+ blocks: Mutex::new(blocks_three), best_block: Mutex::new((block_1a_hash, Some(1))),
+ headers_only: true, disallowed: Mutex::new(false)
+ };
+
+ let mut blocks_four = HashMap::new();
+ blocks_four.insert(genesis.block.header.bitcoin_hash(), genesis);
+ blocks_four.insert(block_1a_hash, block_1a);
+ blocks_four.insert(block_2a_hash, block_2a.clone());
+ blocks_four.insert(block_3a_hash, block_3a.clone());
+ let backup_chain = Blockchain {
+ blocks: Mutex::new(blocks_four), best_block: Mutex::new((block_3a_hash, Some(3))),
+ headers_only: false, disallowed: Mutex::new(true)
+ };
+
+ // Stand up a client at block_1a with all four sources:
+ let chain_notifier = Arc::new(ChainListener {
+ blocks_connected: Mutex::new(Vec::new()), blocks_disconnected: Mutex::new(Vec::new())
+ });
+ let mut source_one = &chain_one;
+ let mut source_two = &chain_two;
+ let mut source_three = &header_chain;
+ let mut source_four = &backup_chain;
+ let mut client = MicroSPVClient::init((&chain_one).get_header(&block_1a_hash, Some(1)).await.unwrap(),
+ vec![&mut source_one as &mut dyn BlockSource, &mut source_two as &mut dyn BlockSource, &mut source_three as &mut dyn BlockSource],
+ vec![&mut source_four as &mut dyn BlockSource],
+ Arc::clone(&chain_notifier), true);
+
+ // Test that we will reorg onto 2b because chain_one knows about 1b + 2b
+ assert!(client.poll_best_tip().await);
+ assert_eq!(&chain_notifier.blocks_disconnected.lock().unwrap()[..], &[(block_1a_hash, 1)][..]);
+ assert_eq!(&chain_notifier.blocks_connected.lock().unwrap()[..], &[(block_1b_hash, 1), (block_2b_hash, 2)][..]);
+ assert_eq!(client.blocks_past_common_tip.len(), 2);
+ assert_eq!(client.blocks_past_common_tip[0].header.bitcoin_hash(), block_1b_hash);
+ assert_eq!(client.blocks_past_common_tip[1].header.bitcoin_hash(), block_2b_hash);
+
+ // Test that even if chain_one (which we just got blocks from) stops responding to block or
+ // header requests we can still reorg back because we never wiped our block cache as
+ // chain_two always considered the "a" chain to contain the tip. We do this by simply
+ // wiping the blocks chain_one knows about:
+ chain_one.blocks.lock().unwrap().clear();
+ chain_notifier.blocks_connected.lock().unwrap().clear();
+ chain_notifier.blocks_disconnected.lock().unwrap().clear();
+
+ // First test that nothing happens if nothing changes:
+ assert!(!client.poll_best_tip().await);
+ assert!(chain_notifier.blocks_disconnected.lock().unwrap().is_empty());
+ assert!(chain_notifier.blocks_connected.lock().unwrap().is_empty());
+
+ // Now add block 2a and 3a to chain_two and test that we reorg appropriately:
+ chain_two.blocks.lock().unwrap().insert(block_2a_hash, block_2a.clone());
+ chain_two.blocks.lock().unwrap().insert(block_3a_hash, block_3a.clone());
+ *chain_two.best_block.lock().unwrap() = (block_3a_hash, Some(3));
+
+ assert!(client.poll_best_tip().await);
+ assert_eq!(&chain_notifier.blocks_disconnected.lock().unwrap()[..], &[(block_2b_hash, 2), (block_1b_hash, 1)][..]);
+ assert_eq!(&chain_notifier.blocks_connected.lock().unwrap()[..], &[(block_1a_hash, 1), (block_2a_hash, 2), (block_3a_hash, 3)][..]);
+
+ // Note that blocks_past_common_tip is not wiped as chain_one still returns 2a as its tip
+ // (though a smarter MicroSPVClient may wipe 1a and 2a from the set eventually.
+ assert_eq!(client.blocks_past_common_tip.len(), 3);
+ assert_eq!(client.blocks_past_common_tip[0].header.bitcoin_hash(), block_1a_hash);
+ assert_eq!(client.blocks_past_common_tip[1].header.bitcoin_hash(), block_2a_hash);
+ assert_eq!(client.blocks_past_common_tip[2].header.bitcoin_hash(), block_3a_hash);
+
+ chain_notifier.blocks_connected.lock().unwrap().clear();
+ chain_notifier.blocks_disconnected.lock().unwrap().clear();
+
+ // Test that after chain_one and header_chain consider 3a as their tip that we'll wipe our
+ // block header cache:
+ *chain_one.best_block.lock().unwrap() = (block_3a_hash, Some(3));
+ *header_chain.best_block.lock().unwrap() = (block_3a_hash, Some(3));
+ assert!(!client.poll_best_tip().await);
+ assert!(chain_notifier.blocks_disconnected.lock().unwrap().is_empty());
+ assert!(chain_notifier.blocks_connected.lock().unwrap().is_empty());
+
+ assert!(client.blocks_past_common_tip.is_empty());
+
+ // Test that setting the header chain to 4a does...almost nothing (though backup_chain
+ // should now be queried) since we can't get the blocks from anywhere.
+ header_chain.blocks.lock().unwrap().insert(block_2a_hash, block_2a);
+ header_chain.blocks.lock().unwrap().insert(block_3a_hash, block_3a);
+ header_chain.blocks.lock().unwrap().insert(block_4a_hash, block_4a.clone());
+ *header_chain.best_block.lock().unwrap() = (block_4a_hash, Some(4));
+ *backup_chain.disallowed.lock().unwrap() = false;
+
+ assert!(!client.poll_best_tip().await);
+ assert!(chain_notifier.blocks_disconnected.lock().unwrap().is_empty());
+ assert!(chain_notifier.blocks_connected.lock().unwrap().is_empty());
+ assert!(client.blocks_past_common_tip.is_empty());
+
+ // But if backup_chain *also* has 4a, we'll fetch it from there:
+ backup_chain.blocks.lock().unwrap().insert(block_4a_hash, block_4a);
+ *backup_chain.best_block.lock().unwrap() = (block_4a_hash, Some(4));
+
+ assert!(client.poll_best_tip().await);
+ assert!(chain_notifier.blocks_disconnected.lock().unwrap().is_empty());
+ assert_eq!(&chain_notifier.blocks_connected.lock().unwrap()[..], &[(block_4a_hash, 4)][..]);
+ assert_eq!(client.blocks_past_common_tip.len(), 1);
+ assert_eq!(client.blocks_past_common_tip[0].header.bitcoin_hash(), block_4a_hash);
+
+ chain_notifier.blocks_connected.lock().unwrap().clear();
+ chain_notifier.blocks_disconnected.lock().unwrap().clear();
+
+ // Note that if only headers_chain has a reorg, we'll end up in a somewhat pessimal case
+ // where we will disconnect and reconnect at each poll. We should fix this at some point by
+ // making sure we can at least fetch one block before we disconnect, but short of using a
+ // ton more memory there isn't much we can do in the case of two disconnects. We check that
+ // the disconnect happens here on a one-block-disconnected reorg, even though its
+ // non-normative behavior, as a good test of failing to reorg and returning back to the
+ // best chain.
+ header_chain.blocks.lock().unwrap().insert(block_4c_hash, block_4c);
+ header_chain.blocks.lock().unwrap().insert(block_5c_hash, block_5c);
+ *header_chain.best_block.lock().unwrap() = (block_5c_hash, Some(5));
+ // We'll check the backup chain last, so don't give it 4a, as otherwise we'll connect it:
+ *backup_chain.best_block.lock().unwrap() = (block_3a_hash, Some(3));
+
+ assert!(client.poll_best_tip().await);
+ assert_eq!(&chain_notifier.blocks_disconnected.lock().unwrap()[..], &[(block_4a_hash, 4)][..]);
+ assert!(chain_notifier.blocks_connected.lock().unwrap().is_empty());
+
+ chain_notifier.blocks_disconnected.lock().unwrap().clear();
+
+ // Now reset the headers chain to 4a and test that we end up back there.
+ *backup_chain.best_block.lock().unwrap() = (block_4a_hash, Some(4));
+ *header_chain.best_block.lock().unwrap() = (block_4a_hash, Some(4));
+ assert!(client.poll_best_tip().await);
+ assert!(chain_notifier.blocks_disconnected.lock().unwrap().is_empty());
+ assert_eq!(&chain_notifier.blocks_connected.lock().unwrap()[..], &[(block_4a_hash, 4)][..]);
+ }
+}