Enforce doc link correctness during `cargo doc` runs and in CI
[rust-lightning] / lightning-block-sync / src / lib.rs
1 //! A lightweight client for keeping in sync with chain activity.
2 //!
3 //! Defines an [`SpvClient`] utility for polling one or more block sources for the best chain tip.
4 //! It is used to notify listeners of blocks connected or disconnected since the last poll. Useful
5 //! for keeping a Lightning node in sync with the chain.
6 //!
7 //! Defines a [`BlockSource`] trait, which is an asynchronous interface for retrieving block headers
8 //! and data.
9 //!
10 //! Enabling feature `rest-client` or `rpc-client` allows configuring the client to fetch blocks
11 //! using Bitcoin Core's REST or RPC interface, respectively.
12 //!
13 //! Both features support either blocking I/O using `std::net::TcpStream` or, with feature `tokio`,
14 //! non-blocking I/O using `tokio::net::TcpStream` from inside a Tokio runtime.
15 //!
16 //! [`SpvClient`]: struct.SpvClient.html
17 //! [`BlockSource`]: trait.BlockSource.html
18
// Crate-wide lints: unresolved intra-doc links and any use of `unsafe` are hard errors.
#![deny(broken_intra_doc_links)]
#![deny(unsafe_code)]

// Only compiled when at least one of the `rest-client` / `rpc-client` features is enabled.
#[cfg(any(feature = "rest-client", feature = "rpc-client"))]
pub mod http;

pub mod init;
pub mod poll;

// Only compiled with feature `rest-client`.
#[cfg(feature = "rest-client")]
pub mod rest;

// Only compiled with feature `rpc-client`.
#[cfg(feature = "rpc-client")]
pub mod rpc;

// Private module, built only alongside the REST and/or RPC clients.
#[cfg(any(feature = "rest-client", feature = "rpc-client"))]
mod convert;

// Test-only module; the unit tests in this file import their fixtures from it.
#[cfg(test)]
mod test_utils;

// Private module, built only alongside the REST and/or RPC clients.
#[cfg(any(feature = "rest-client", feature = "rpc-client"))]
mod utils;
42
43 use crate::poll::{ChainTip, Poll, ValidatedBlockHeader};
44
45 use bitcoin::blockdata::block::{Block, BlockHeader};
46 use bitcoin::hash_types::BlockHash;
47 use bitcoin::util::uint::Uint256;
48
49 use lightning::chain;
50 use lightning::chain::Listen;
51
52 use std::future::Future;
53 use std::ops::Deref;
54 use std::pin::Pin;
55
56 /// Abstract type for retrieving block headers and data.
57 pub trait BlockSource : Sync + Send {
58         /// Returns the header for a given hash. A height hint may be provided in case a block source
59         /// cannot easily find headers based on a hash. This is merely a hint and thus the returned
60         /// header must have the same hash as was requested. Otherwise, an error must be returned.
61         ///
62         /// Implementations that cannot find headers based on the hash should return a `Transient` error
63         /// when `height_hint` is `None`.
64         fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, height_hint: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData>;
65
66         /// Returns the block for a given hash. A headers-only block source should return a `Transient`
67         /// error.
68         fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block>;
69
70         /// Returns the hash of the best block and, optionally, its height.
71         ///
72         /// When polling a block source, [`Poll`] implementations may pass the height to [`get_header`]
73         /// to allow for a more efficient lookup.
74         ///
75         /// [`Poll`]: poll/trait.Poll.html
76         /// [`get_header`]: #tymethod.get_header
77         fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<(BlockHash, Option<u32>)>;
78 }
79
/// Result type for `BlockSource` requests: either a `T` or a `BlockSourceError`.
type BlockSourceResult<T> = Result<T, BlockSourceError>;
82
// TODO: Replace with BlockSourceResult once `async` trait functions are supported. For details,
// see: https://areweasyncyet.rs.
/// Result type for asynchronous `BlockSource` requests.
///
/// A pinned, boxed future that is `Send`, may borrow data for `'a`, and resolves to a
/// `BlockSourceResult<T>`.
type AsyncBlockSourceResult<'a, T> = Pin<Box<dyn Future<Output = BlockSourceResult<T>> + 'a + Send>>;
87
/// Error type for `BlockSource` requests.
///
/// Transient errors may be resolved when re-polling, but no attempt will be made to re-poll on
/// persistent errors.
///
/// Built with `BlockSourceError::persistent` or `BlockSourceError::transient`; both fields are
/// private and are read back through `kind()` and `into_inner()`.
#[derive(Debug)]
pub struct BlockSourceError {
        /// Whether the error is persistent or transient.
        kind: BlockSourceErrorKind,
        /// The underlying error that this error originated from.
        error: Box<dyn std::error::Error + Send + Sync>,
}
97
/// The kind of `BlockSourceError`, either persistent or transient.
///
/// Equality on this field-less enum is total, so it derives `Eq` and `Hash` in addition to
/// `PartialEq`, allowing kinds to be used as set members or map keys.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum BlockSourceErrorKind {
        /// Indicates an error that won't resolve when retrying a request (e.g., invalid data).
        Persistent,

        /// Indicates an error that may resolve when retrying a request (e.g., unresponsive).
        Transient,
}
107
108 impl BlockSourceError {
109         /// Creates a new persistent error originated from the given error.
110         pub fn persistent<E>(error: E) -> Self
111         where E: Into<Box<dyn std::error::Error + Send + Sync>> {
112                 Self {
113                         kind: BlockSourceErrorKind::Persistent,
114                         error: error.into(),
115                 }
116         }
117
118         /// Creates a new transient error originated from the given error.
119         pub fn transient<E>(error: E) -> Self
120         where E: Into<Box<dyn std::error::Error + Send + Sync>> {
121                 Self {
122                         kind: BlockSourceErrorKind::Transient,
123                         error: error.into(),
124                 }
125         }
126
127         /// Returns the kind of error.
128         pub fn kind(&self) -> BlockSourceErrorKind {
129                 self.kind
130         }
131
132         /// Converts the error into the underlying error.
133         pub fn into_inner(self) -> Box<dyn std::error::Error + Send + Sync> {
134                 self.error
135         }
136 }
137
/// A block header and some associated data. This information should be available from most block
/// sources (and, notably, is available in Bitcoin Core's RPC and REST interfaces).
///
/// This is the header type returned by `BlockSource::get_header`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct BlockHeaderData {
        /// The block header itself.
        pub header: BlockHeader,

        /// The block height where the genesis block has height 0.
        pub height: u32,

        /// The total chain work in expected number of double-SHA256 hashes required to build a chain
        /// of equivalent weight.
        pub chainwork: Uint256,
}
152
/// A lightweight client for keeping a listener in sync with the chain, allowing for Simplified
/// Payment Verification (SPV).
///
/// The client is parameterized by a chain poller which is responsible for polling one or more block
/// sources for the best chain tip. During this process it detects any chain forks, determines which
/// constitutes the best chain, and updates the listener accordingly with any blocks that were
/// connected or disconnected since the last poll.
///
/// Block headers for the best chain are maintained in the parameterized cache, allowing for a
/// custom cache eviction policy. This offers flexibility to those sensitive to resource usage.
/// Hence, there is a trade-off between a lower memory footprint and potentially increased network
/// I/O as headers are re-fetched during fork detection.
pub struct SpvClient<'a, P: Poll, C: Cache, L: Deref>
where L::Target: chain::Listen {
        /// The best chain tip known to the client; replaced whenever a poll connects blocks.
        chain_tip: ValidatedBlockHeader,
        /// The poller queried for the best chain tip, headers, and blocks.
        chain_poller: P,
        /// Holds the header cache (borrowed for `'a`) and the listener to be notified.
        chain_notifier: ChainNotifier<'a, C, L>,
}
171
/// The `Cache` trait defines behavior for managing a block header cache, where block headers are
/// keyed by block hash.
///
/// Used by [`ChainNotifier`] to store headers along the best chain, which is important for ensuring
/// that blocks can be disconnected if they are no longer accessible from a block source (e.g., if
/// the block source does not store stale forks indefinitely).
///
/// Implementations may define how long to retain headers such that it's unlikely they will ever be
/// needed to disconnect a block.  In cases where block sources provide access to headers on stale
/// forks reliably, caches may be entirely unnecessary.
///
/// [`ChainNotifier`]: struct.ChainNotifier.html
pub trait Cache {
        /// Retrieves the block header keyed by the given block hash.
        ///
        /// Returns `None` when the cache holds no header for `block_hash`, in which case the caller
        /// falls back to fetching the header through the chain poller.
        fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader>;

        /// Called when a block has been connected to the best chain to ensure it is available to be
        /// disconnected later if needed.
        fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader);

        /// Called when a block has been disconnected from the best chain. Once disconnected, a block's
        /// header is no longer needed and thus can be removed.
        ///
        /// When `Some` header is returned, the caller asserts that it equals the header of the block
        /// being disconnected.
        fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option<ValidatedBlockHeader>;
}
196
/// Unbounded cache of block headers keyed by block hash.
///
/// A plain `HashMap`: its `Cache` implementation inserts on connect and removes on disconnect,
/// with no other eviction.
pub type UnboundedCache = std::collections::HashMap<BlockHash, ValidatedBlockHeader>;
199
200 impl Cache for UnboundedCache {
201         fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
202                 self.get(block_hash)
203         }
204
205         fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) {
206                 self.insert(block_hash, block_header);
207         }
208
209         fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option<ValidatedBlockHeader> {
210                 self.remove(block_hash)
211         }
212 }
213
214 impl<'a, P: Poll, C: Cache, L: Deref> SpvClient<'a, P, C, L> where L::Target: chain::Listen {
215         /// Creates a new SPV client using `chain_tip` as the best known chain tip.
216         ///
217         /// Subsequent calls to [`poll_best_tip`] will poll for the best chain tip using the given chain
218         /// poller, which may be configured with one or more block sources to query. At least one block
219         /// source must provide headers back from the best chain tip to its common ancestor with
220         /// `chain_tip`.
221         /// * `header_cache` is used to look up and store headers on the best chain
222         /// * `chain_listener` is notified of any blocks connected or disconnected
223         ///
224         /// [`poll_best_tip`]: struct.SpvClient.html#method.poll_best_tip
225         pub fn new(
226                 chain_tip: ValidatedBlockHeader,
227                 chain_poller: P,
228                 header_cache: &'a mut C,
229                 chain_listener: L,
230         ) -> Self {
231                 let chain_notifier = ChainNotifier { header_cache, chain_listener };
232                 Self { chain_tip, chain_poller, chain_notifier }
233         }
234
235         /// Polls for the best tip and updates the chain listener with any connected or disconnected
236         /// blocks accordingly.
237         ///
238         /// Returns the best polled chain tip relative to the previous best known tip and whether any
239         /// blocks were indeed connected or disconnected.
240         pub async fn poll_best_tip(&mut self) -> BlockSourceResult<(ChainTip, bool)> {
241                 let chain_tip = self.chain_poller.poll_chain_tip(self.chain_tip).await?;
242                 let blocks_connected = match chain_tip {
243                         ChainTip::Common => false,
244                         ChainTip::Better(chain_tip) => {
245                                 debug_assert_ne!(chain_tip.block_hash, self.chain_tip.block_hash);
246                                 debug_assert!(chain_tip.chainwork > self.chain_tip.chainwork);
247                                 self.update_chain_tip(chain_tip).await
248                         },
249                         ChainTip::Worse(chain_tip) => {
250                                 debug_assert_ne!(chain_tip.block_hash, self.chain_tip.block_hash);
251                                 debug_assert!(chain_tip.chainwork <= self.chain_tip.chainwork);
252                                 false
253                         },
254                 };
255                 Ok((chain_tip, blocks_connected))
256         }
257
258         /// Updates the chain tip, syncing the chain listener with any connected or disconnected
259         /// blocks. Returns whether there were any such blocks.
260         async fn update_chain_tip(&mut self, best_chain_tip: ValidatedBlockHeader) -> bool {
261                 match self.chain_notifier.synchronize_listener(
262                         best_chain_tip, &self.chain_tip, &mut self.chain_poller).await
263                 {
264                         Ok(_) => {
265                                 self.chain_tip = best_chain_tip;
266                                 true
267                         },
268                         Err((_, Some(chain_tip))) if chain_tip.block_hash != self.chain_tip.block_hash => {
269                                 self.chain_tip = chain_tip;
270                                 true
271                         },
272                         Err(_) => false,
273                 }
274         }
275 }
276
/// Notifies [listeners] of blocks that have been connected or disconnected from the chain.
///
/// The header cache is borrowed mutably for `'a`; it is updated as blocks are connected and
/// disconnected.
///
/// [listeners]: ../../lightning/chain/trait.Listen.html
pub struct ChainNotifier<'a, C: Cache, L: Deref> where L::Target: chain::Listen {
        /// Cache for looking up headers before fetching from a block source.
        header_cache: &'a mut C,

        /// Listener that will be notified of connected or disconnected blocks.
        chain_listener: L,
}
287
/// Changes made to the chain between subsequent polls that transformed it from having one chain tip
/// to another.
///
/// Blocks are given in height-descending order. Therefore, blocks are first disconnected in order
/// before new blocks are connected in reverse order.
///
/// Produced by `ChainNotifier::find_difference`, which fills both lists while walking back from
/// the two tips, so neither list contains the common ancestor itself.
struct ChainDifference {
        /// The most recent ancestor common between the chain tips.
        ///
        /// If there are any disconnected blocks, this is where the chain forked.
        common_ancestor: ValidatedBlockHeader,

        /// Blocks that were disconnected from the chain since the last poll.
        disconnected_blocks: Vec<ValidatedBlockHeader>,

        /// Blocks that were connected to the chain since the last poll.
        connected_blocks: Vec<ValidatedBlockHeader>,
}
305
306 impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L> where L::Target: chain::Listen {
307         /// Finds the first common ancestor between `new_header` and `old_header`, disconnecting blocks
308         /// from `old_header` to get to that point and then connecting blocks until `new_header`.
309         ///
310         /// Validates headers along the transition path, but doesn't fetch blocks until the chain is
311         /// disconnected to the fork point. Thus, this may return an `Err` that includes where the tip
312         /// ended up which may not be `new_header`. Note that the returned `Err` contains `Some` header
313         /// if and only if the transition from `old_header` to `new_header` is valid.
314         async fn synchronize_listener<P: Poll>(
315                 &mut self,
316                 new_header: ValidatedBlockHeader,
317                 old_header: &ValidatedBlockHeader,
318                 chain_poller: &mut P,
319         ) -> Result<(), (BlockSourceError, Option<ValidatedBlockHeader>)> {
320                 let difference = self.find_difference(new_header, old_header, chain_poller).await
321                         .map_err(|e| (e, None))?;
322                 self.disconnect_blocks(difference.disconnected_blocks);
323                 self.connect_blocks(
324                         difference.common_ancestor,
325                         difference.connected_blocks,
326                         chain_poller,
327                 ).await
328         }
329
330         /// Returns the changes needed to produce the chain with `current_header` as its tip from the
331         /// chain with `prev_header` as its tip.
332         ///
333         /// Walks backwards from `current_header` and `prev_header`, finding the common ancestor.
334         async fn find_difference<P: Poll>(
335                 &self,
336                 current_header: ValidatedBlockHeader,
337                 prev_header: &ValidatedBlockHeader,
338                 chain_poller: &mut P,
339         ) -> BlockSourceResult<ChainDifference> {
340                 let mut disconnected_blocks = Vec::new();
341                 let mut connected_blocks = Vec::new();
342                 let mut current = current_header;
343                 let mut previous = *prev_header;
344                 loop {
345                         // Found the common ancestor.
346                         if current.block_hash == previous.block_hash {
347                                 break;
348                         }
349
350                         // Walk back the chain, finding blocks needed to connect and disconnect. Only walk back
351                         // the header with the greater height, or both if equal heights.
352                         let current_height = current.height;
353                         let previous_height = previous.height;
354                         if current_height <= previous_height {
355                                 disconnected_blocks.push(previous);
356                                 previous = self.look_up_previous_header(chain_poller, &previous).await?;
357                         }
358                         if current_height >= previous_height {
359                                 connected_blocks.push(current);
360                                 current = self.look_up_previous_header(chain_poller, &current).await?;
361                         }
362                 }
363
364                 let common_ancestor = current;
365                 Ok(ChainDifference { common_ancestor, disconnected_blocks, connected_blocks })
366         }
367
368         /// Returns the previous header for the given header, either by looking it up in the cache or
369         /// fetching it if not found.
370         async fn look_up_previous_header<P: Poll>(
371                 &self,
372                 chain_poller: &mut P,
373                 header: &ValidatedBlockHeader,
374         ) -> BlockSourceResult<ValidatedBlockHeader> {
375                 match self.header_cache.look_up(&header.header.prev_blockhash) {
376                         Some(prev_header) => Ok(*prev_header),
377                         None => chain_poller.look_up_previous_header(header).await,
378                 }
379         }
380
381         /// Notifies the chain listeners of disconnected blocks.
382         fn disconnect_blocks(&mut self, mut disconnected_blocks: Vec<ValidatedBlockHeader>) {
383                 for header in disconnected_blocks.drain(..) {
384                         if let Some(cached_header) = self.header_cache.block_disconnected(&header.block_hash) {
385                                 assert_eq!(cached_header, header);
386                         }
387                         self.chain_listener.block_disconnected(&header.header, header.height);
388                 }
389         }
390
391         /// Notifies the chain listeners of connected blocks.
392         async fn connect_blocks<P: Poll>(
393                 &mut self,
394                 mut new_tip: ValidatedBlockHeader,
395                 mut connected_blocks: Vec<ValidatedBlockHeader>,
396                 chain_poller: &mut P,
397         ) -> Result<(), (BlockSourceError, Option<ValidatedBlockHeader>)> {
398                 for header in connected_blocks.drain(..).rev() {
399                         let block = chain_poller
400                                 .fetch_block(&header).await
401                                 .or_else(|e| Err((e, Some(new_tip))))?;
402                         debug_assert_eq!(block.block_hash, header.block_hash);
403
404                         self.header_cache.block_connected(header.block_hash, header);
405                         self.chain_listener.block_connected(&block, header.height);
406                         new_tip = header;
407                 }
408
409                 Ok(())
410         }
411 }
412
#[cfg(test)]
mod spv_client_tests {
        use crate::test_utils::{Blockchain, NullChainListener};
        use super::*;

        use bitcoin::network::constants::Network;

        /// Polling a chain built `without_headers` fails with a persistent error and leaves the
        /// client's tip untouched.
        #[tokio::test]
        async fn poll_from_chain_without_headers() {
                let mut chain = Blockchain::default().with_height(3).without_headers();
                let best_tip = chain.at_height(1);

                let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
                let mut cache = UnboundedCache::new();
                let mut listener = NullChainListener {};
                let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener);

                let error = match client.poll_best_tip().await {
                        Ok(_) => panic!("Expected error"),
                        Err(e) => e,
                };
                assert_eq!(error.kind(), BlockSourceErrorKind::Persistent);
                assert_eq!(error.into_inner().as_ref().to_string(), "header not found");
                assert_eq!(client.chain_tip, best_tip);
        }

        /// Polling when the client already sits at the chain's tip reports `ChainTip::Common` and
        /// connects nothing.
        #[tokio::test]
        async fn poll_from_chain_with_common_tip() {
                let mut chain = Blockchain::default().with_height(3);
                let common_tip = chain.tip();

                let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
                let mut cache = UnboundedCache::new();
                let mut listener = NullChainListener {};
                let mut client = SpvClient::new(common_tip, poller, &mut cache, &mut listener);

                let (chain_tip, blocks_connected) = client.poll_best_tip().await
                        .unwrap_or_else(|e| panic!("Unexpected error: {:?}", e));
                assert_eq!(chain_tip, ChainTip::Common);
                assert!(!blocks_connected);
                assert_eq!(client.chain_tip, common_tip);
        }

        /// Polling from an older tip reports the better tip, connects blocks, and moves the client
        /// to the new tip.
        #[tokio::test]
        async fn poll_from_chain_with_better_tip() {
                let mut chain = Blockchain::default().with_height(3);
                let new_tip = chain.tip();
                let old_tip = chain.at_height(1);

                let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
                let mut cache = UnboundedCache::new();
                let mut listener = NullChainListener {};
                let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener);

                let (chain_tip, blocks_connected) = client.poll_best_tip().await
                        .unwrap_or_else(|e| panic!("Unexpected error: {:?}", e));
                assert_eq!(chain_tip, ChainTip::Better(new_tip));
                assert!(blocks_connected);
                assert_eq!(client.chain_tip, new_tip);
        }

        /// With the chain built `without_blocks(2..)`, the better tip is still reported but no
        /// block gets connected and the client stays at its old tip.
        #[tokio::test]
        async fn poll_from_chain_with_better_tip_and_without_any_new_blocks() {
                let mut chain = Blockchain::default().with_height(3).without_blocks(2..);
                let new_tip = chain.tip();
                let old_tip = chain.at_height(1);

                let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
                let mut cache = UnboundedCache::new();
                let mut listener = NullChainListener {};
                let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener);

                let (chain_tip, blocks_connected) = client.poll_best_tip().await
                        .unwrap_or_else(|e| panic!("Unexpected error: {:?}", e));
                assert_eq!(chain_tip, ChainTip::Better(new_tip));
                assert!(!blocks_connected);
                assert_eq!(client.chain_tip, old_tip);
        }

        /// With the chain built `without_blocks(3..)`, the client advances only as far as the
        /// header at height 2.
        #[tokio::test]
        async fn poll_from_chain_with_better_tip_and_without_some_new_blocks() {
                let mut chain = Blockchain::default().with_height(3).without_blocks(3..);
                let new_tip = chain.tip();
                let old_tip = chain.at_height(1);

                let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
                let mut cache = UnboundedCache::new();
                let mut listener = NullChainListener {};
                let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener);

                let (chain_tip, blocks_connected) = client.poll_best_tip().await
                        .unwrap_or_else(|e| panic!("Unexpected error: {:?}", e));
                assert_eq!(chain_tip, ChainTip::Better(new_tip));
                assert!(blocks_connected);
                assert_eq!(client.chain_tip, chain.at_height(2));
        }

        /// Polling a chain whose tip was disconnected reports `ChainTip::Worse` and keeps the
        /// client's tip.
        #[tokio::test]
        async fn poll_from_chain_with_worse_tip() {
                let mut chain = Blockchain::default().with_height(3);
                let best_tip = chain.tip();
                chain.disconnect_tip();
                let worse_tip = chain.tip();

                let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
                let mut cache = UnboundedCache::new();
                let mut listener = NullChainListener {};
                let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener);

                let (chain_tip, blocks_connected) = client.poll_best_tip().await
                        .unwrap_or_else(|e| panic!("Unexpected error: {:?}", e));
                assert_eq!(chain_tip, ChainTip::Worse(worse_tip));
                assert!(!blocks_connected);
                assert_eq!(client.chain_tip, best_tip);
        }
}
539
#[cfg(test)]
mod chain_notifier_tests {
        use crate::test_utils::{Blockchain, MockChainListener};
        use super::*;

        use bitcoin::network::constants::Network;

        /// Syncing forward along a single chain connects the blocks at heights 2 and 3.
        #[tokio::test]
        async fn sync_from_same_chain() {
                let mut chain = Blockchain::default().with_height(3);

                let new_tip = chain.tip();
                let old_tip = chain.at_height(1);
                let chain_listener = &MockChainListener::new()
                        .expect_block_connected(*chain.at_height(2))
                        .expect_block_connected(*new_tip);
                let mut notifier = ChainNotifier {
                        header_cache: &mut chain.header_cache(0..=1),
                        chain_listener,
                };
                let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
                if let Err((e, _)) = notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await {
                        panic!("Unexpected error: {:?}", e);
                }
        }

        /// Syncing between chains of different networks fails with a persistent error once the
        /// genesis block is reached.
        #[tokio::test]
        async fn sync_from_different_chains() {
                let mut test_chain = Blockchain::with_network(Network::Testnet).with_height(1);
                let main_chain = Blockchain::with_network(Network::Bitcoin).with_height(1);

                let new_tip = test_chain.tip();
                let old_tip = main_chain.tip();
                let chain_listener = &MockChainListener::new();
                let mut notifier = ChainNotifier {
                        header_cache: &mut main_chain.header_cache(0..=1),
                        chain_listener,
                };
                let mut poller = poll::ChainPoller::new(&mut test_chain, Network::Testnet);
                let error = match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await {
                        Ok(_) => panic!("Expected error"),
                        Err((e, _)) => e,
                };
                assert_eq!(error.kind(), BlockSourceErrorKind::Persistent);
                assert_eq!(error.into_inner().as_ref().to_string(), "genesis block reached");
        }

        /// Syncing to a fork of equal length disconnects the old tip and connects the new one.
        #[tokio::test]
        async fn sync_from_equal_length_fork() {
                let main_chain = Blockchain::default().with_height(2);
                let mut fork_chain = main_chain.fork_at_height(1);

                let new_tip = fork_chain.tip();
                let old_tip = main_chain.tip();
                let chain_listener = &MockChainListener::new()
                        .expect_block_disconnected(*old_tip)
                        .expect_block_connected(*new_tip);
                let mut notifier = ChainNotifier {
                        header_cache: &mut main_chain.header_cache(0..=2),
                        chain_listener,
                };
                let mut poller = poll::ChainPoller::new(&mut fork_chain, Network::Testnet);
                if let Err((e, _)) = notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await {
                        panic!("Unexpected error: {:?}", e);
                }
        }

        /// Syncing to a shorter fork disconnects two blocks of the main chain before connecting
        /// the fork's tip.
        #[tokio::test]
        async fn sync_from_shorter_fork() {
                let main_chain = Blockchain::default().with_height(3);
                let mut fork_chain = main_chain.fork_at_height(1);
                fork_chain.disconnect_tip();

                let new_tip = fork_chain.tip();
                let old_tip = main_chain.tip();
                let chain_listener = &MockChainListener::new()
                        .expect_block_disconnected(*old_tip)
                        .expect_block_disconnected(*main_chain.at_height(2))
                        .expect_block_connected(*new_tip);
                let mut notifier = ChainNotifier {
                        header_cache: &mut main_chain.header_cache(0..=3),
                        chain_listener,
                };
                let mut poller = poll::ChainPoller::new(&mut fork_chain, Network::Testnet);
                if let Err((e, _)) = notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await {
                        panic!("Unexpected error: {:?}", e);
                }
        }

        /// Syncing to a longer fork disconnects the old tip and connects two blocks of the fork.
        #[tokio::test]
        async fn sync_from_longer_fork() {
                let mut main_chain = Blockchain::default().with_height(3);
                let mut fork_chain = main_chain.fork_at_height(1);
                main_chain.disconnect_tip();

                let new_tip = fork_chain.tip();
                let old_tip = main_chain.tip();
                let chain_listener = &MockChainListener::new()
                        .expect_block_disconnected(*old_tip)
                        .expect_block_connected(*fork_chain.at_height(2))
                        .expect_block_connected(*new_tip);
                let mut notifier = ChainNotifier {
                        header_cache: &mut main_chain.header_cache(0..=2),
                        chain_listener,
                };
                let mut poller = poll::ChainPoller::new(&mut fork_chain, Network::Testnet);
                if let Err((e, _)) = notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await {
                        panic!("Unexpected error: {:?}", e);
                }
        }

        /// When the chain is built `without_headers`, syncing fails and the error carries no tip.
        #[tokio::test]
        async fn sync_from_chain_without_headers() {
                let mut chain = Blockchain::default().with_height(3).without_headers();

                let new_tip = chain.tip();
                let old_tip = chain.at_height(1);
                let chain_listener = &MockChainListener::new();
                let mut notifier = ChainNotifier {
                        header_cache: &mut chain.header_cache(0..=1),
                        chain_listener,
                };
                let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
                let tip = match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await {
                        Ok(_) => panic!("Expected error"),
                        Err((_, tip)) => tip,
                };
                assert_eq!(tip, None);
        }

        /// When the chain is built `without_blocks(2..)`, syncing fails with the tip still at the
        /// old header.
        #[tokio::test]
        async fn sync_from_chain_without_any_new_blocks() {
                let mut chain = Blockchain::default().with_height(3).without_blocks(2..);

                let new_tip = chain.tip();
                let old_tip = chain.at_height(1);
                let chain_listener = &MockChainListener::new();
                let mut notifier = ChainNotifier {
                        header_cache: &mut chain.header_cache(0..=3),
                        chain_listener,
                };
                let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
                let tip = match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await {
                        Ok(_) => panic!("Expected error"),
                        Err((_, tip)) => tip,
                };
                assert_eq!(tip, Some(old_tip));
        }

        /// When the chain is built `without_blocks(3..)`, the block at height 2 is connected and
        /// the error reports that header as the tip reached.
        #[tokio::test]
        async fn sync_from_chain_without_some_new_blocks() {
                let mut chain = Blockchain::default().with_height(3).without_blocks(3..);

                let new_tip = chain.tip();
                let old_tip = chain.at_height(1);
                let chain_listener = &MockChainListener::new()
                        .expect_block_connected(*chain.at_height(2));
                let mut notifier = ChainNotifier {
                        header_cache: &mut chain.header_cache(0..=3),
                        chain_listener,
                };
                let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
                let tip = match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await {
                        Ok(_) => panic!("Expected error"),
                        Err((_, tip)) => tip,
                };
                assert_eq!(tip, Some(chain.at_height(2)));
        }
}