Add a Headers-over-DNS client to lightning-block-sync.
[rust-lightning] / lightning-block-sync / src / lib.rs
1 //! An implementation of a simple SPV client which can interrogate abstract block sources to keep
2 //! lightning objects on the best chain.
3 //!
4 //! With feature `rpc-client` we provide a client which can fetch blocks from Bitcoin Core's RPC
5 //! interface.
6 //!
7 //! With feature `rest-client` we provide a client which can fetch blocks from Bitcoin Core's REST
8 //! interface.
9 //!
10 //! Both provided clients support either blocking TCP reads from std::net::TcpStream or, with
11 //! feature `tokio`, tokio::net::TcpStream inside a Tokio runtime.
12
13 #[cfg(any(feature = "rest-client", feature = "rpc-client"))]
14 mod utils;
15
16 #[cfg(any(feature = "rest-client", feature = "rpc-client"))]
17 pub mod http_clients;
18
19 pub mod dns_headers;
20
21 use lightning::chain::{chaininterface, keysinterface};
22 use lightning::chain::chaininterface::{BlockNotifierArc, ChainListener};
23 use lightning::ln::channelmonitor::{ChannelMonitor, ManyChannelMonitor};
24 use lightning::ln::channelmanager::SimpleArcChannelManager;
25
26 use bitcoin::hashes::hex::ToHex;
27
28 use bitcoin::blockdata::block::{Block, BlockHeader};
29 use bitcoin::util::hash::BitcoinHash;
30 use bitcoin::util::uint::Uint256;
31 use bitcoin::hash_types::BlockHash;
32
33 use std::future::Future;
34 use std::vec::Vec;
35 use std::pin::Pin;
36 use std::ops::DerefMut;
37
38 #[derive(Clone, Debug, PartialEq)]
39 /// A block header and some associated data. This information should be available from most block
40 /// sources (and, notably, is available in Bitcoin Core's RPC and REST interfaces).
41 pub struct BlockHeaderData {
42         /// The total chain work, in expected number of double-SHA256 hashes required to build a chain
43         /// of equivalent weight
44         pub chainwork: Uint256,
45         /// The block height, with the genesis block heigh set to 0
46         pub height: u32,
47         /// The block header itself
48         pub header: BlockHeader
49 }
50
/// Failure type for requests to block sources.
///
/// Implements `PartialEq`/`Eq` so results can be compared directly, and `Display` plus
/// `std::error::Error` so it can be propagated through standard error-handling code.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BlockSourceRespErr {
    /// Indicates a BlockSource provided bogus data. After this is returned once we will never
    /// bother polling the returning BlockSource for block data again, so use it sparingly.
    BogusData,
    /// Indicates the BlockSource isn't responsive or may be misconfigured but we want to continue
    /// polling it.
    NoResponse,
}

impl std::fmt::Display for BlockSourceRespErr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            BlockSourceRespErr::BogusData => "block source provided bogus data",
            BlockSourceRespErr::NoResponse => "block source gave no usable response",
        };
        f.write_str(description)
    }
}

impl std::error::Error for BlockSourceRespErr {}
61 /// Abstract type for a source of block header and block data.
62 pub trait BlockSource : Sync + Send {
63         /// Gets the header for a given hash. The height the header should be at is provided, though
64         /// note that you must return either the header with the requested hash, or an Err, not a
65         /// different header with the same eight.
66         ///
67         /// For sources which cannot find headers based on the hash, returning NoResponse when
68         /// height_hint is None is fine, though get_best_block() should never return a None for height
69         /// on the same source. Such a source should never be used in init_sync_chain_monitor as it
70         /// doesn't have any initial height information.
71         ///
72         /// Sadly rust's trait system hasn't grown the ability to take impl/differentially-sized return
73         /// values yet, so we have to Box + dyn the future.
74         fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, height_hint: Option<u32>) -> Pin<Box<dyn Future<Output = Result<BlockHeaderData, BlockSourceRespErr>> + 'a + Send>>;
75
76         /// Gets the block for a given hash. BlockSources may be headers-only, in which case they
77         /// should always return Err(BlockSourceRespErr::NoResponse) here.
78         /// Sadly rust's trait system hasn't grown the ability to take impl/differentially-sized return
79         /// values yet, so we have to Box + dyn the future.
80         fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> Pin<Box<dyn Future<Output = Result<Block, BlockSourceRespErr>> + 'a + Send>>;
81
82         /// Gets the best block hash and, optionally, its height.
83         /// Including the height doesn't impact the chain-scannling algorithm, but it is passed to
84         /// get_header() which may allow some BlockSources to more effeciently find the target header.
85         ///
86         /// Sadly rust's trait system hasn't grown the ability to take impl/differentially-sized return
87         /// values yet, so we have to Box + dyn the future.
88         fn get_best_block<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<(BlockHash, Option<u32>), BlockSourceRespErr>> + 'a + Send>>;
89 }
90
91 /// Stateless header checks on a given header.
92 #[inline]
93 fn stateless_check_header(header: &BlockHeader) -> Result<(), BlockSourceRespErr> {
94         if header.validate_pow(&header.target()).is_err() {
95                 Err(BlockSourceRespErr::BogusData)
96         } else { Ok(()) }
97 }
98
99 /// Check that child_header correctly builds on previous_header - the claimed work differential
100 /// matches the actual PoW in child_header and the difficulty transition is possible, ie within 4x.
101 /// Includes stateless header checks on previous_header.
102 fn check_builds_on(child_header: &BlockHeaderData, previous_header: &BlockHeaderData, mainnet: bool) -> Result<(), BlockSourceRespErr> {
103         if child_header.header.prev_blockhash != previous_header.header.bitcoin_hash() {
104                 return Err(BlockSourceRespErr::BogusData);
105         }
106
107         stateless_check_header(&previous_header.header)?;
108         let new_work = child_header.header.work();
109         if previous_header.height != child_header.height - 1 ||
110                         previous_header.chainwork + new_work != child_header.chainwork {
111                 return Err(BlockSourceRespErr::BogusData);
112         }
113         if mainnet {
114                 if child_header.height % 2016 == 0 {
115                         let prev_work = previous_header.header.work();
116                         if new_work > prev_work << 2 || new_work < prev_work >> 2 {
117                                 return Err(BlockSourceRespErr::BogusData)
118                         }
119                 } else if child_header.header.bits != previous_header.header.bits {
120                         return Err(BlockSourceRespErr::BogusData)
121                 }
122         }
123         Ok(())
124 }
125
126 enum ForkStep {
127         ForkPoint(BlockHeaderData),
128         DisconnectBlock(BlockHeaderData),
129         ConnectBlock(BlockHeaderData),
130 }
131 fn find_fork_step<'a>(steps_tx: &'a mut Vec<ForkStep>, current_header: BlockHeaderData, prev_header: &'a BlockHeaderData, block_source: &'a mut dyn BlockSource, head_blocks: &'a [BlockHeaderData], mainnet: bool) -> Pin<Box<dyn Future<Output=Result<(), BlockSourceRespErr>> + Send + 'a>> {
132         Box::pin(async move {
133                 if prev_header.header.prev_blockhash == current_header.header.prev_blockhash {
134                         // Found the fork, get the fork point header and we're done!
135                         steps_tx.push(ForkStep::DisconnectBlock(prev_header.clone()));
136                         steps_tx.push(ForkStep::ConnectBlock(current_header));
137                         if !head_blocks.is_empty() {
138                                 let new_prev_header = head_blocks.last().unwrap();
139                                 steps_tx.push(ForkStep::ForkPoint(new_prev_header.clone()));
140                         } else {
141                                 let new_prev_header = block_source.get_header(&prev_header.header.prev_blockhash, Some(prev_header.height - 1)).await?;
142                                 check_builds_on(&prev_header, &new_prev_header, mainnet)?;
143                                 steps_tx.push(ForkStep::ForkPoint(new_prev_header.clone()));
144                         }
145                 } else if current_header.height == 0 {
146                         // We're connect through genesis, we must be on a different chain!
147                         return Err(BlockSourceRespErr::BogusData);
148                 } else if prev_header.height < current_header.height {
149                         if prev_header.height + 1 == current_header.height &&
150                                         prev_header.header.bitcoin_hash() == current_header.header.prev_blockhash {
151                                 // Current header is the one above prev_header, we're done!
152                                 steps_tx.push(ForkStep::ConnectBlock(current_header));
153                         } else {
154                                 // Current is higher than the prev, walk current down by listing blocks we need to
155                                 // connect
156                                 let new_cur_header = block_source.get_header(&current_header.header.prev_blockhash, Some(current_header.height - 1)).await?;
157                                 check_builds_on(&current_header, &new_cur_header, mainnet)?;
158                                 steps_tx.push(ForkStep::ConnectBlock(current_header));
159                                 find_fork_step(steps_tx, new_cur_header, prev_header, block_source, head_blocks, mainnet).await?;
160                         }
161                 } else if prev_header.height > current_header.height {
162                         // Previous is higher, walk it back and recurse
163                         steps_tx.push(ForkStep::DisconnectBlock(prev_header.clone()));
164                         if !head_blocks.is_empty() {
165                                 let new_prev_header = head_blocks.last().unwrap();
166                                 let new_head_blocks = &head_blocks[..head_blocks.len() - 1];
167                                 find_fork_step(steps_tx, current_header, new_prev_header, block_source, new_head_blocks, mainnet).await?;
168                         } else {
169                                 let new_prev_header = block_source.get_header(&prev_header.header.prev_blockhash, Some(prev_header.height - 1)).await?;
170                                 check_builds_on(&prev_header, &new_prev_header, mainnet)?;
171                                 find_fork_step(steps_tx, current_header, &new_prev_header, block_source, head_blocks, mainnet).await?;
172                         }
173                 } else {
174                         // Target and current are at the same height, but we're not at fork yet, walk
175                         // both back and recurse
176                         let new_cur_header = block_source.get_header(&current_header.header.prev_blockhash, Some(current_header.height - 1)).await?;
177                         check_builds_on(&current_header, &new_cur_header, mainnet)?;
178                         steps_tx.push(ForkStep::ConnectBlock(current_header));
179                         steps_tx.push(ForkStep::DisconnectBlock(prev_header.clone()));
180                         if !head_blocks.is_empty() {
181                                 let new_prev_header = head_blocks.last().unwrap();
182                                 let new_head_blocks = &head_blocks[..head_blocks.len() - 1];
183                                 find_fork_step(steps_tx, new_cur_header, new_prev_header, block_source, new_head_blocks, mainnet).await?;
184                         } else {
185                                 let new_prev_header = block_source.get_header(&prev_header.header.prev_blockhash, Some(prev_header.height - 1)).await?;
186                                 check_builds_on(&prev_header, &new_prev_header, mainnet)?;
187                                 find_fork_step(steps_tx, new_cur_header, &new_prev_header, block_source, head_blocks, mainnet).await?;
188                         }
189                 }
190                 Ok(())
191         })
192 }
193 /// Walks backwards from current_header and prev_header finding the fork and sending ForkStep events
194 /// into the steps_tx Sender. There is no ordering guarantee between different ForkStep types, but
195 /// DisconnectBlock and ConnectBlock events are each in reverse, height-descending order.
196 async fn find_fork<'a>(current_header: BlockHeaderData, prev_header: &'a BlockHeaderData, block_source: &'a mut dyn BlockSource, mut head_blocks: &'a [BlockHeaderData], mainnet: bool) -> Result<Vec<ForkStep>, BlockSourceRespErr> {
197         let mut steps_tx = Vec::new();
198         if current_header.header == prev_header.header { return Ok(steps_tx); }
199
200         // If we have cached headers, they have to end with where we used to be
201         head_blocks = if !head_blocks.is_empty() {
202                 assert_eq!(head_blocks.last().unwrap(), prev_header);
203                 &head_blocks[..head_blocks.len() - 1]
204         } else { head_blocks };
205
206         find_fork_step(&mut steps_tx, current_header, &prev_header, block_source, head_blocks, mainnet).await?;
207         Ok(steps_tx)
208 }
209
210 /// A dummy trait for capturing an object which wants the chain to be replayed.
211 /// Implemented for lightning BlockNotifiers for general use, as well as
212 /// ChannelManagers and ChannelMonitors to allow for easy replaying of chain
213 /// data upon deserialization.
214 pub trait AChainListener {
215         fn a_block_connected(&mut self, block: &Block, height: u32);
216         fn a_block_disconnected(&mut self, header: &BlockHeader, height: u32);
217 }
218
219 impl AChainListener for &BlockNotifierArc {
220         fn a_block_connected(&mut self, block: &Block, height: u32) {
221                 self.block_connected(block, height);
222         }
223         fn a_block_disconnected(&mut self, header: &BlockHeader, height: u32) {
224                 self.block_disconnected(header, height);
225         }
226 }
227
228 impl<M, B, F> AChainListener for &SimpleArcChannelManager<M, B, F>
229                 where M: ManyChannelMonitor<keysinterface::InMemoryChannelKeys>,
230                       B: chaininterface::BroadcasterInterface, F: chaininterface::FeeEstimator {
231         fn a_block_connected(&mut self, block: &Block, height: u32) {
232                 let mut txn = Vec::with_capacity(block.txdata.len());
233                 let mut idxn = Vec::with_capacity(block.txdata.len());
234                 for (i, tx) in block.txdata.iter().enumerate() {
235                         txn.push(tx);
236                         idxn.push(i as u32);
237                 }
238                 self.block_connected(&block.header, height, &txn, &idxn);
239         }
240         fn a_block_disconnected(&mut self, header: &BlockHeader, height: u32) {
241                 self.block_disconnected(header, height);
242         }
243 }
244
245 impl<CS, B, F> AChainListener for (&mut ChannelMonitor<CS>, &B, &F)
246                 where CS: keysinterface::ChannelKeys,
247                       B: chaininterface::BroadcasterInterface, F: chaininterface::FeeEstimator {
248         fn a_block_connected(&mut self, block: &Block, height: u32) {
249                 let mut txn = Vec::with_capacity(block.txdata.len());
250                 for tx in block.txdata.iter() {
251                         txn.push(tx);
252                 }
253                 self.0.block_connected(&txn, height, &block.bitcoin_hash(), self.1, self.2);
254         }
255         fn a_block_disconnected(&mut self, header: &BlockHeader, height: u32) {
256                 self.0.block_disconnected(height, &header.bitcoin_hash(), self.1, self.2);
257         }
258 }
259
260 /// Finds the fork point between new_header and old_header, disconnecting blocks from old_header to
261 /// get to that point and then connecting blocks until we get to new_header.
262 ///
263 /// We validate the headers along the transition path, but don't fetch blocks until we've
264 /// disconnected to the fork point. Thus, we may return an Err() that includes where our tip ended
265 /// up which may not be new_header. Note that iff the returned Err has a BlockHeaderData, the
266 /// header transition from old_header to new_header is valid.
267 async fn sync_chain_monitor<CL : AChainListener + Sized>(new_header: BlockHeaderData, old_header: &BlockHeaderData, block_source: &mut dyn BlockSource, chain_notifier: &mut CL, head_blocks: &mut Vec<BlockHeaderData>, mainnet: bool)
268                 -> Result<(), (BlockSourceRespErr, Option<BlockHeaderData>)> {
269         let mut events = find_fork(new_header, old_header, block_source, &*head_blocks, mainnet).await.map_err(|e| (e, None))?;
270
271         let mut last_disconnect_tip = None;
272         let mut new_tip = None;
273         for event in events.iter() {
274                 match &event {
275                         &ForkStep::DisconnectBlock(ref header) => {
276                                 println!("Disconnecting block {}", header.header.bitcoin_hash());
277                                 if let Some(cached_head) = head_blocks.pop() {
278                                         assert_eq!(cached_head, *header);
279                                 }
280                                 chain_notifier.a_block_disconnected(&header.header, header.height);
281                                 last_disconnect_tip = Some(header.header.prev_blockhash);
282                         },
283                         &ForkStep::ForkPoint(ref header) => {
284                                 new_tip = Some(header.clone());
285                         },
286                         _ => {},
287                 }
288         }
289
290         // If we disconnected any blocks, we should have new tip data available, which should match our
291         // cached header data if it is available. If we didn't disconnect any blocks we shouldn't have
292         // set a ForkPoint as there is no fork.
293         assert_eq!(last_disconnect_tip.is_some(), new_tip.is_some());
294         if let &Some(ref tip_header) = &new_tip {
295                 if let Some(cached_head) = head_blocks.last() {
296                         assert_eq!(cached_head, tip_header);
297                 }
298                 debug_assert_eq!(tip_header.header.bitcoin_hash(), *last_disconnect_tip.as_ref().unwrap());
299         } else {
300                 // Set new_tip to indicate that we got a valid header chain we wanted to connect to, but
301                 // failed
302                 new_tip = Some(old_header.clone());
303         }
304
305         for event in events.drain(..).rev() {
306                 if let ForkStep::ConnectBlock(header_data) = event {
307                         let block = match block_source.get_block(&header_data.header.bitcoin_hash()).await {
308                                 Err(e) => return Err((e, new_tip)),
309                                 Ok(b) => b,
310                         };
311                         if block.header != header_data.header || !block.check_merkle_root() || !block.check_witness_commitment() {
312                                 return Err((BlockSourceRespErr::BogusData, new_tip));
313                         }
314                         println!("Connecting block {}", header_data.header.bitcoin_hash().to_hex());
315                         chain_notifier.a_block_connected(&block, header_data.height);
316                         head_blocks.push(header_data.clone());
317                         new_tip = Some(header_data);
318                 }
319         }
320         Ok(())
321 }
322
323 /// Do a one-time sync of a chain listener from a single *trusted* block source bringing its view
324 /// of the latest chain tip from old_block to new_block. This is useful on startup when you need
325 /// to bring each ChannelMonitor, as well as the overall ChannelManager, into sync with each other.
326 ///
327 /// Once you have them all at the same block, you should switch to using MicroSPVClient.
328 pub async fn init_sync_chain_monitor<CL : AChainListener + Sized, B: BlockSource>(new_block: BlockHash, old_block: BlockHash, block_source: &mut B, mut chain_notifier: CL) {
329         if &old_block[..] == &[0; 32] { return; }
330
331         let new_header = block_source.get_header(&new_block, None).await.unwrap();
332         assert_eq!(new_header.header.bitcoin_hash(), new_block);
333         stateless_check_header(&new_header.header).unwrap();
334         let old_header = block_source.get_header(&old_block, None).await.unwrap();
335         assert_eq!(old_header.header.bitcoin_hash(), old_block);
336         stateless_check_header(&old_header.header).unwrap();
337         sync_chain_monitor(new_header, &old_header, block_source, &mut chain_notifier, &mut Vec::new(), false).await.unwrap();
338 }
339
340 /// Keep the chain that a chain listener knows about up-to-date with the best chain from any of the
341 /// given block_sources.
342 ///
343 /// This implements a pretty bare-bones SPV client, checking all relevant commitments and finding
344 /// the heaviest chain, but not storing the full header chain, leading to some important
345 /// limitations.
346 ///
347 /// While we never check full difficulty transition logic, the mainnet option enables checking that
348 /// difficulty transitions only happen every two weeks and never shift difficulty more than 4x in
349 /// either direction, which is sufficient to prevent most minority hashrate attacks.
350 ///
351 /// We cache any headers which we connect until every block source is in agreement on the best tip.
352 /// This prevents one block source from being able to orphan us on a fork of its own creation by
353 /// not responding to requests for old headers on that fork. However, if one block source is
354 /// unreachable this may result in our memory usage growing in accordance with the chain.
355 pub struct MicroSPVClient<'a, B: DerefMut<Target=dyn BlockSource + 'a> + Sized + Sync + Send, CL : AChainListener + Sized> {
356         chain_tip: (BlockHash, BlockHeaderData),
357         block_sources: Vec<B>,
358         backup_block_sources: Vec<B>,
359         cur_blocks: Vec<Result<BlockHash, BlockSourceRespErr>>,
360         blocks_past_common_tip: Vec<BlockHeaderData>,
361         chain_notifier: CL,
362         mainnet: bool
363 }
364 impl<'a, B: DerefMut<Target=dyn BlockSource + 'a> + Sized + Sync + Send, CL : AChainListener + Sized> MicroSPVClient<'a, B, CL> {
365         /// Create a new MicroSPVClient with a set of block sources and a chain listener which will
366         /// receive updates of the new tip.
367         ///
368         /// We assume that at least one of the provided BlockSources can provide all neccessary headers
369         /// to disconnect from the given chain_tip back to its common ancestor with the best chain.
370         /// We assume that the height, hash, and chain work given in chain_tip are correct.
371         ///
372         /// `backup_block_sources` are never queried unless we learned, via some `block_sources` source
373         /// that there exists a better, valid header chain but we failed to fetch the blocks. This is
374         /// useful when you have a block source which is more censorship-resistant than others but
375         /// which only provides headers. In this case, we can use such source(s) to learn of a censorship
376         /// attack without giving up privacy by querying a privacy-losing block sources.
377         pub fn init(chain_tip: BlockHeaderData, block_sources: Vec<B>, backup_block_sources: Vec<B>, chain_notifier: CL, mainnet: bool) -> Self {
378                 let cur_blocks = vec![Err(BlockSourceRespErr::NoResponse); block_sources.len() + backup_block_sources.len()];
379                 let blocks_past_common_tip = Vec::new();
380                 Self {
381                         chain_tip: (chain_tip.header.bitcoin_hash(), chain_tip),
382                         block_sources, backup_block_sources, cur_blocks, blocks_past_common_tip, chain_notifier, mainnet
383                 }
384         }
385         /// Check each source for a new best tip and update the chain listener accordingly.
386         /// Returns true if some blocks were [dis]connected, false otherwise.
387         pub async fn poll_best_tip(&mut self) -> bool {
388                 let mut highest_valid_tip = self.chain_tip.1.chainwork;
389                 let mut blocks_connected = false;
390
391                 macro_rules! process_source {
392                         ($cur_hash: expr, $source: expr) => { {
393                                 if let Err(BlockSourceRespErr::BogusData) = $cur_hash {
394                                         // We gave up on this provider, move on.
395                                         continue;
396                                 }
397                                 macro_rules! handle_err {
398                                         ($err: expr) => {
399                                                 match $err {
400                                                         Ok(r) => r,
401                                                         Err(BlockSourceRespErr::BogusData) => {
402                                                                 $cur_hash = Err(BlockSourceRespErr::BogusData);
403                                                                 continue;
404                                                         },
405                                                         Err(BlockSourceRespErr::NoResponse) => {
406                                                                 continue;
407                                                         },
408                                                 }
409                                         }
410                                 }
411                                 let (new_hash, height_opt) = handle_err!($source.get_best_block().await);
412                                 if new_hash == self.chain_tip.0 {
413                                         $cur_hash = Ok(new_hash);
414                                         continue;
415                                 }
416                                 let new_header = handle_err!($source.get_header(&new_hash, height_opt).await);
417                                 if new_header.header.bitcoin_hash() != new_hash {
418                                         $cur_hash = Err(BlockSourceRespErr::BogusData);
419                                         continue;
420                                 }
421                                 handle_err!(stateless_check_header(&new_header.header));
422                                 if new_header.chainwork <= self.chain_tip.1.chainwork {
423                                         $cur_hash = Ok(new_hash);
424                                         continue;
425                                 }
426
427                                 let syncres = sync_chain_monitor(new_header.clone(), &self.chain_tip.1, &mut *$source, &mut self.chain_notifier, &mut self.blocks_past_common_tip, self.mainnet).await;
428                                 if let Err((e, new_tip)) = syncres {
429                                         if let Some(tip) = new_tip {
430                                                 let tiphash = tip.header.bitcoin_hash();
431                                                 if tiphash != self.chain_tip.0 {
432                                                         self.chain_tip = (tiphash, tip);
433                                                         blocks_connected = true;
434                                                 }
435                                                 // We set cur_hash to where we got to since we don't mind dropping our
436                                                 // block header cache if its on a fork that no block sources care about,
437                                                 // but we (may) want to continue trying to get the blocks from this source
438                                                 // the next time we poll.
439                                                 $cur_hash = Ok(tiphash);
440                                                 highest_valid_tip = std::cmp::max(highest_valid_tip, new_header.chainwork);
441                                         }
442                                         handle_err!(Err(e));
443                                 } else {
444                                         highest_valid_tip = std::cmp::max(highest_valid_tip, new_header.chainwork);
445                                         self.chain_tip = (new_hash, new_header);
446                                         $cur_hash = Ok(new_hash);
447                                         blocks_connected = true;
448                                 }
449                         } }
450                 }
451
452                 for (cur_hash, source) in self.cur_blocks.iter_mut().take(self.block_sources.len())
453                                 .zip(self.block_sources.iter_mut()) {
454                         process_source!(*cur_hash, *source);
455                 }
456
457                 if highest_valid_tip != self.chain_tip.1.chainwork {
458                         for (cur_hash, source) in self.cur_blocks.iter_mut().skip(self.block_sources.len())
459                                         .zip(self.backup_block_sources.iter_mut()) {
460                                 process_source!(*cur_hash, *source);
461                                 if highest_valid_tip == self.chain_tip.1.chainwork { break; }
462                         }
463                 }
464
465                 let mut common_tip = true;
466                 for cur_hash in self.cur_blocks.iter() {
467                         if let Ok(hash) = cur_hash {
468                                 if *hash != self.chain_tip.0 {
469                                         common_tip = false;
470                                         break;
471                                 }
472                         }
473                 }
474                 if common_tip {
475                         // All block sources have the same tip. Assume we will be able to trivially get old
476                         // headers and drop our reorg cache.
477                         self.blocks_past_common_tip.clear();
478                 }
479                 blocks_connected
480         }
481 }
482
483 #[cfg(test)]
484 mod tests {
485         use super::*;
486         use bitcoin::blockdata::block::{Block, BlockHeader};
487         use bitcoin::util::uint::Uint256;
488         use std::collections::HashMap;
489         use std::sync::{Arc, Mutex};
490
491         struct ChainListener {
492                 blocks_connected: Mutex<Vec<(BlockHash, u32)>>,
493                 blocks_disconnected: Mutex<Vec<(BlockHash, u32)>>,
494         }
495         impl AChainListener for Arc<ChainListener> {
496                 fn a_block_connected(&mut self, block: &Block, height: u32) {
497                         self.blocks_connected.lock().unwrap().push((block.header.bitcoin_hash(), height));
498                 }
499                 fn a_block_disconnected(&mut self, header: &BlockHeader, height: u32) {
500                         self.blocks_disconnected.lock().unwrap().push((header.bitcoin_hash(), height));
501                 }
502         }
503
504         #[derive(Clone)]
505         struct BlockData {
506                 block: Block,
507                 chainwork: Uint256,
508                 height: u32,
509         }
510         struct Blockchain {
511                 blocks: Mutex<HashMap<BlockHash, BlockData>>,
512                 best_block: Mutex<(BlockHash, Option<u32>)>,
513                 headers_only: bool,
514                 disallowed: Mutex<bool>,
515         }
516         impl BlockSource for &Blockchain {
517                 fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, height_hint: Option<u32>) -> Pin<Box<dyn Future<Output = Result<BlockHeaderData, BlockSourceRespErr>> + 'a + Send>> {
518                         if *self.disallowed.lock().unwrap() { unreachable!(); }
519                         Box::pin(async move {
520                                 match self.blocks.lock().unwrap().get(header_hash) {
521                                         Some(block) => {
522                                                 assert_eq!(Some(block.height), height_hint);
523                                                 Ok(BlockHeaderData {
524                                                         chainwork: block.chainwork,
525                                                         height: block.height,
526                                                         header: block.block.header.clone(),
527                                                 })
528                                         },
529                                         None => Err(BlockSourceRespErr::NoResponse),
530                                 }
531                         })
532                 }
533                 fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> Pin<Box<dyn Future<Output = Result<Block, BlockSourceRespErr>> + 'a + Send>> {
534                         if *self.disallowed.lock().unwrap() { unreachable!(); }
535                         Box::pin(async move {
536                                 if self.headers_only {
537                                         Err(BlockSourceRespErr::NoResponse)
538                                 } else {
539                                         match self.blocks.lock().unwrap().get(header_hash) {
540                                                 Some(block) => Ok(block.block.clone()),
541                                                 None => Err(BlockSourceRespErr::NoResponse),
542                                         }
543                                 }
544                         })
545                 }
546                 fn get_best_block<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<(BlockHash, Option<u32>), BlockSourceRespErr>> + 'a + Send>> {
547                         if *self.disallowed.lock().unwrap() { unreachable!(); }
548                         Box::pin(async move { Ok(self.best_block.lock().unwrap().clone()) })
549                 }
550         }
551
552         #[tokio::test]
553         async fn simple_block_connect() {
554                 let genesis = BlockData {
555                         block: bitcoin::blockdata::constants::genesis_block(bitcoin::network::constants::Network::Bitcoin),
556                         chainwork: Uint256::from_u64(0).unwrap(),
557                         height: 0,
558                 };
559
560                 // Build a chain based on genesis 1a, 2a, 3a, and 4a
561                 let block_1a = BlockData {
562                         block: Block {
563                                 header: BlockHeader {
564                                         version: 0,
565                                         prev_blockhash: genesis.block.bitcoin_hash(),
566                                         merkle_root: Default::default(), time: 0,
567                                         bits: genesis.block.header.bits,
568                                         nonce: 647569994,
569                                 },
570                                 txdata: Vec::new(),
571                         },
572                         chainwork: Uint256::from_u64(4295032833).unwrap(),
573                         height: 1
574                 };
575                 let block_1a_hash = block_1a.block.header.bitcoin_hash();
576                 let block_2a = BlockData {
577                         block: Block {
578                                 header: BlockHeader {
579                                         version: 0,
580                                         prev_blockhash: block_1a.block.bitcoin_hash(),
581                                         merkle_root: Default::default(), time: 4,
582                                         bits: genesis.block.header.bits,
583                                         nonce: 1185103332,
584                                 },
585                                 txdata: Vec::new(),
586                         },
587                         chainwork: Uint256::from_u64(4295032833 * 2).unwrap(),
588                         height: 2
589                 };
590                 let block_2a_hash = block_2a.block.header.bitcoin_hash();
591                 let block_3a = BlockData {
592                         block: Block {
593                                 header: BlockHeader {
594                                         version: 0,
595                                         prev_blockhash: block_2a.block.bitcoin_hash(),
596                                         merkle_root: Default::default(), time: 6,
597                                         bits: genesis.block.header.bits,
598                                         nonce: 198739431,
599                                 },
600                                 txdata: Vec::new(),
601                         },
602                         chainwork: Uint256::from_u64(4295032833 * 3).unwrap(),
603                         height: 3
604                 };
605                 let block_3a_hash = block_3a.block.header.bitcoin_hash();
606                 let block_4a = BlockData {
607                         block: Block {
608                                 header: BlockHeader {
609                                         version: 0,
610                                         prev_blockhash: block_3a.block.bitcoin_hash(),
611                                         merkle_root: Default::default(), time: 0,
612                                         bits: genesis.block.header.bits,
613                                         nonce: 590371681,
614                                 },
615                                 txdata: Vec::new(),
616                         },
617                         chainwork: Uint256::from_u64(4295032833 * 4).unwrap(),
618                         height: 4
619                 };
620                 let block_4a_hash = block_4a.block.header.bitcoin_hash();
621
622                 // Build a second chain based on genesis 1b, 2b, and 3b
623                 let block_1b = BlockData {
624                         block: Block {
625                                 header: BlockHeader {
626                                         version: 0,
627                                         prev_blockhash: genesis.block.bitcoin_hash(),
628                                         merkle_root: Default::default(), time: 6,
629                                         bits: genesis.block.header.bits,
630                                         nonce: 1347696353,
631                                 },
632                                 txdata: Vec::new(),
633                         },
634                         chainwork: Uint256::from_u64(4295032833).unwrap(),
635                         height: 1
636                 };
637                 let block_1b_hash = block_1b.block.header.bitcoin_hash();
638                 let block_2b = BlockData {
639                         block: Block {
640                                 header: BlockHeader {
641                                         version: 0,
642                                         prev_blockhash: block_1b.block.bitcoin_hash(),
643                                         merkle_root: Default::default(), time: 5,
644                                         bits: genesis.block.header.bits,
645                                         nonce: 144775545,
646                                 },
647                                 txdata: Vec::new(),
648                         },
649                         chainwork: Uint256::from_u64(4295032833 * 2).unwrap(),
650                         height: 2
651                 };
652                 let block_2b_hash = block_2b.block.header.bitcoin_hash();
653
654                 // Build a second chain based on 3a: 4c and 5c.
655                 let block_4c = BlockData {
656                         block: Block {
657                                 header: BlockHeader {
658                                         version: 0,
659                                         prev_blockhash: block_3a.block.bitcoin_hash(),
660                                         merkle_root: Default::default(), time: 17,
661                                         bits: genesis.block.header.bits,
662                                         nonce: 316634915,
663                                 },
664                                 txdata: Vec::new(),
665                         },
666                         chainwork: Uint256::from_u64(4295032833 * 4).unwrap(),
667                         height: 4
668                 };
669                 let block_4c_hash = block_4c.block.header.bitcoin_hash();
670                 let block_5c = BlockData {
671                         block: Block {
672                                 header: BlockHeader {
673                                         version: 0,
674                                         prev_blockhash: block_4c.block.bitcoin_hash(),
675                                         merkle_root: Default::default(), time: 3,
676                                         bits: genesis.block.header.bits,
677                                         nonce: 218413871,
678                                 },
679                                 txdata: Vec::new(),
680                         },
681                         chainwork: Uint256::from_u64(4295032833 * 5).unwrap(),
682                         height: 5
683                 };
684                 let block_5c_hash = block_5c.block.header.bitcoin_hash();
685
686                 // Create four block sources:
687                 // * chain_one and chain_two are general purpose block sources which we use to test reorgs,
688                 // * headers_chain only provides headers,
689                 // * and backup_chain is a backup which should not receive any queries (ie disallowed is
690                 //   false) until the headers_chain gets ahead of chain_one and chain_two.
691                 let mut blocks_one = HashMap::new();
692                 blocks_one.insert(genesis.block.header.bitcoin_hash(), genesis.clone());
693                 blocks_one.insert(block_1a_hash, block_1a.clone());
694                 blocks_one.insert(block_1b_hash, block_1b);
695                 blocks_one.insert(block_2b_hash, block_2b);
696                 let chain_one = Blockchain {
697                         blocks: Mutex::new(blocks_one), best_block: Mutex::new((block_2b_hash, Some(2))),
698                         headers_only: false, disallowed: Mutex::new(false)
699                 };
700
701                 let mut blocks_two = HashMap::new();
702                 blocks_two.insert(genesis.block.header.bitcoin_hash(), genesis.clone());
703                 blocks_two.insert(block_1a_hash, block_1a.clone());
704                 let chain_two = Blockchain {
705                         blocks: Mutex::new(blocks_two), best_block: Mutex::new((block_1a_hash, Some(1))),
706                         headers_only: false, disallowed: Mutex::new(false)
707                 };
708
709                 let mut blocks_three = HashMap::new();
710                 blocks_three.insert(genesis.block.header.bitcoin_hash(), genesis.clone());
711                 blocks_three.insert(block_1a_hash, block_1a.clone());
712                 let header_chain = Blockchain {
713                         blocks: Mutex::new(blocks_three), best_block: Mutex::new((block_1a_hash, Some(1))),
714                         headers_only: true, disallowed: Mutex::new(false)
715                 };
716
717                 let mut blocks_four = HashMap::new();
718                 blocks_four.insert(genesis.block.header.bitcoin_hash(), genesis);
719                 blocks_four.insert(block_1a_hash, block_1a);
720                 blocks_four.insert(block_2a_hash, block_2a.clone());
721                 blocks_four.insert(block_3a_hash, block_3a.clone());
722                 let backup_chain = Blockchain {
723                         blocks: Mutex::new(blocks_four), best_block: Mutex::new((block_3a_hash, Some(3))),
724                         headers_only: false, disallowed: Mutex::new(true)
725                 };
726
727                 // Stand up a client at block_1a with all four sources:
728                 let chain_notifier = Arc::new(ChainListener {
729                         blocks_connected: Mutex::new(Vec::new()), blocks_disconnected: Mutex::new(Vec::new())
730                 });
731                 let mut source_one = &chain_one;
732                 let mut source_two = &chain_two;
733                 let mut source_three = &header_chain;
734                 let mut source_four = &backup_chain;
735                 let mut client = MicroSPVClient::init((&chain_one).get_header(&block_1a_hash, Some(1)).await.unwrap(),
736                         vec![&mut source_one as &mut dyn BlockSource, &mut source_two as &mut dyn BlockSource, &mut source_three as &mut dyn BlockSource],
737                         vec![&mut source_four as &mut dyn BlockSource],
738                         Arc::clone(&chain_notifier), true);
739
740                 // Test that we will reorg onto 2b because chain_one knows about 1b + 2b
741                 assert!(client.poll_best_tip().await);
742                 assert_eq!(&chain_notifier.blocks_disconnected.lock().unwrap()[..], &[(block_1a_hash, 1)][..]);
743                 assert_eq!(&chain_notifier.blocks_connected.lock().unwrap()[..], &[(block_1b_hash, 1), (block_2b_hash, 2)][..]);
744                 assert_eq!(client.blocks_past_common_tip.len(), 2);
745                 assert_eq!(client.blocks_past_common_tip[0].header.bitcoin_hash(), block_1b_hash);
746                 assert_eq!(client.blocks_past_common_tip[1].header.bitcoin_hash(), block_2b_hash);
747
748                 // Test that even if chain_one (which we just got blocks from) stops responding to block or
749                 // header requests we can still reorg back because we never wiped our block cache as
750                 // chain_two always considered the "a" chain to contain the tip. We do this by simply
751                 // wiping the blocks chain_one knows about:
752                 chain_one.blocks.lock().unwrap().clear();
753                 chain_notifier.blocks_connected.lock().unwrap().clear();
754                 chain_notifier.blocks_disconnected.lock().unwrap().clear();
755
756                 // First test that nothing happens if nothing changes:
757                 assert!(!client.poll_best_tip().await);
758                 assert!(chain_notifier.blocks_disconnected.lock().unwrap().is_empty());
759                 assert!(chain_notifier.blocks_connected.lock().unwrap().is_empty());
760
761                 // Now add block 2a and 3a to chain_two and test that we reorg appropriately:
762                 chain_two.blocks.lock().unwrap().insert(block_2a_hash, block_2a.clone());
763                 chain_two.blocks.lock().unwrap().insert(block_3a_hash, block_3a.clone());
764                 *chain_two.best_block.lock().unwrap() = (block_3a_hash, Some(3));
765
766                 assert!(client.poll_best_tip().await);
767                 assert_eq!(&chain_notifier.blocks_disconnected.lock().unwrap()[..], &[(block_2b_hash, 2), (block_1b_hash, 1)][..]);
768                 assert_eq!(&chain_notifier.blocks_connected.lock().unwrap()[..], &[(block_1a_hash, 1), (block_2a_hash, 2), (block_3a_hash, 3)][..]);
769
770                 // Note that blocks_past_common_tip is not wiped as chain_one still returns 2a as its tip
771                 // (though a smarter MicroSPVClient may wipe 1a and 2a from the set eventually.
772                 assert_eq!(client.blocks_past_common_tip.len(), 3);
773                 assert_eq!(client.blocks_past_common_tip[0].header.bitcoin_hash(), block_1a_hash);
774                 assert_eq!(client.blocks_past_common_tip[1].header.bitcoin_hash(), block_2a_hash);
775                 assert_eq!(client.blocks_past_common_tip[2].header.bitcoin_hash(), block_3a_hash);
776
777                 chain_notifier.blocks_connected.lock().unwrap().clear();
778                 chain_notifier.blocks_disconnected.lock().unwrap().clear();
779
780                 // Test that after chain_one and header_chain consider 3a as their tip that we'll wipe our
781                 // block header cache:
782                 *chain_one.best_block.lock().unwrap() = (block_3a_hash, Some(3));
783                 *header_chain.best_block.lock().unwrap() = (block_3a_hash, Some(3));
784                 assert!(!client.poll_best_tip().await);
785                 assert!(chain_notifier.blocks_disconnected.lock().unwrap().is_empty());
786                 assert!(chain_notifier.blocks_connected.lock().unwrap().is_empty());
787
788                 assert!(client.blocks_past_common_tip.is_empty());
789
790                 // Test that setting the header chain to 4a does...almost nothing (though backup_chain
791                 // should now be queried) since we can't get the blocks from anywhere.
792                 header_chain.blocks.lock().unwrap().insert(block_2a_hash, block_2a);
793                 header_chain.blocks.lock().unwrap().insert(block_3a_hash, block_3a);
794                 header_chain.blocks.lock().unwrap().insert(block_4a_hash, block_4a.clone());
795                 *header_chain.best_block.lock().unwrap() = (block_4a_hash, Some(4));
796                 *backup_chain.disallowed.lock().unwrap() = false;
797
798                 assert!(!client.poll_best_tip().await);
799                 assert!(chain_notifier.blocks_disconnected.lock().unwrap().is_empty());
800                 assert!(chain_notifier.blocks_connected.lock().unwrap().is_empty());
801                 assert!(client.blocks_past_common_tip.is_empty());
802
803                 // But if backup_chain *also* has 4a, we'll fetch it from there:
804                 backup_chain.blocks.lock().unwrap().insert(block_4a_hash, block_4a);
805                 *backup_chain.best_block.lock().unwrap() = (block_4a_hash, Some(4));
806
807                 assert!(client.poll_best_tip().await);
808                 assert!(chain_notifier.blocks_disconnected.lock().unwrap().is_empty());
809                 assert_eq!(&chain_notifier.blocks_connected.lock().unwrap()[..], &[(block_4a_hash, 4)][..]);
810                 assert_eq!(client.blocks_past_common_tip.len(), 1);
811                 assert_eq!(client.blocks_past_common_tip[0].header.bitcoin_hash(), block_4a_hash);
812
813                 chain_notifier.blocks_connected.lock().unwrap().clear();
814                 chain_notifier.blocks_disconnected.lock().unwrap().clear();
815
816                 // Note that if only headers_chain has a reorg, we'll end up in a somewhat pessimal case
817                 // where we will disconnect and reconnect at each poll. We should fix this at some point by
818                 // making sure we can at least fetch one block before we disconnect, but short of using a
819                 // ton more memory there isn't much we can do in the case of two disconnects. We check that
820                 // the disconnect happens here on a one-block-disconnected reorg, even though its
821                 // non-normative behavior, as a good test of failing to reorg and returning back to the
822                 // best chain.
823                 header_chain.blocks.lock().unwrap().insert(block_4c_hash, block_4c);
824                 header_chain.blocks.lock().unwrap().insert(block_5c_hash, block_5c);
825                 *header_chain.best_block.lock().unwrap() = (block_5c_hash, Some(5));
826                 // We'll check the backup chain last, so don't give it 4a, as otherwise we'll connect it:
827                 *backup_chain.best_block.lock().unwrap() = (block_3a_hash, Some(3));
828
829                 assert!(client.poll_best_tip().await);
830                 assert_eq!(&chain_notifier.blocks_disconnected.lock().unwrap()[..], &[(block_4a_hash, 4)][..]);
831                 assert!(chain_notifier.blocks_connected.lock().unwrap().is_empty());
832
833                 chain_notifier.blocks_disconnected.lock().unwrap().clear();
834
835                 // Now reset the headers chain to 4a and test that we end up back there.
836                 *backup_chain.best_block.lock().unwrap() = (block_4a_hash, Some(4));
837                 *header_chain.best_block.lock().unwrap() = (block_4a_hash, Some(4));
838                 assert!(client.poll_best_tip().await);
839                 assert!(chain_notifier.blocks_disconnected.lock().unwrap().is_empty());
840                 assert_eq!(&chain_notifier.blocks_connected.lock().unwrap()[..], &[(block_4a_hash, 4)][..]);
841         }
842 }