1 use crate::{BlockSource, BlockSourceResult, Cache, ChainListener, ChainNotifier};
2 use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader};
4 use bitcoin::blockdata::block::{Block, BlockHeader};
5 use bitcoin::hash_types::BlockHash;
6 use bitcoin::network::constants::Network;
8 /// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each
9 /// listener's view of the chain from its paired block hash to `block_source`'s best chain tip.
11 /// Upon success, the returned header can be used to initialize [`SpvClient`]. In the case of
12 /// failure, each listener may be left at a different block hash than the one it was originally
15 /// Useful during startup to bring the [`ChannelManager`] and each [`ChannelMonitor`] in sync before
16 /// switching to [`SpvClient`].
18 /// [`SpvClient`]: ../struct.SpvClient.html
19 /// [`ChannelManager`]: ../../lightning/ln/channelmanager/struct.ChannelManager.html
20 /// [`ChannelMonitor`]: ../../lightning/chain/channelmonitor/struct.ChannelMonitor.html
21 pub async fn sync_listeners<B: BlockSource, C: Cache>(
25 mut chain_listeners: Vec<(BlockHash, &mut dyn ChainListener)>,
26 ) -> BlockSourceResult<ValidatedBlockHeader> {
27 let (best_block_hash, best_block_height) = block_source.get_best_block().await?;
28 let new_header = block_source
29 .get_header(&best_block_hash, best_block_height).await?
30 .validate(best_block_hash)?;
32 // Fetch the header for the block hash paired with each listener.
33 let mut chain_listeners_with_old_headers = Vec::new();
34 for (old_block, chain_listener) in chain_listeners.drain(..) {
35 let old_header = match header_cache.look_up(&old_block) {
36 Some(header) => *header,
38 .get_header(&old_block, None).await?
41 chain_listeners_with_old_headers.push((old_header, chain_listener))
44 // Find differences and disconnect blocks for each listener individually.
45 let mut chain_poller = ChainPoller::new(block_source, network);
46 let mut chain_listeners_at_height = Vec::new();
47 let mut most_common_ancestor = None;
48 let mut most_connected_blocks = Vec::new();
49 for (old_header, chain_listener) in chain_listeners_with_old_headers.drain(..) {
50 // Disconnect any stale blocks, but keep them in the cache for the next iteration.
51 let header_cache = &mut ReadOnlyCache(header_cache);
52 let mut chain_notifier = ChainNotifier { header_cache };
54 chain_notifier.find_difference(new_header, &old_header, &mut chain_poller).await?;
55 chain_notifier.disconnect_blocks(
56 difference.disconnected_blocks,
57 &mut DynamicChainListener(chain_listener),
60 // Keep track of the most common ancestor and all blocks connected across all listeners.
61 chain_listeners_at_height.push((difference.common_ancestor.height, chain_listener));
62 if difference.connected_blocks.len() > most_connected_blocks.len() {
63 most_common_ancestor = Some(difference.common_ancestor);
64 most_connected_blocks = difference.connected_blocks;
68 // Connect new blocks for all listeners at once to avoid re-fetching blocks.
69 if let Some(common_ancestor) = most_common_ancestor {
70 let mut chain_notifier = ChainNotifier { header_cache };
71 let mut chain_listener = ChainListenerSet(chain_listeners_at_height);
72 chain_notifier.connect_blocks(
74 most_connected_blocks,
77 ).await.or_else(|(e, _)| Err(e))?;
83 /// A wrapper to make a cache read-only.
85 /// Used to prevent losing headers that may be needed to disconnect blocks common to more than one
87 struct ReadOnlyCache<'a, C: Cache>(&'a mut C);
89 impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> {
90 fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
91 self.0.look_up(block_hash)
94 fn block_connected(&mut self, _block_hash: BlockHash, _block_header: ValidatedBlockHeader) {
98 fn block_disconnected(&mut self, _block_hash: &BlockHash) -> Option<ValidatedBlockHeader> {
103 /// Wrapper for supporting dynamically sized chain listeners.
104 struct DynamicChainListener<'a>(&'a mut dyn ChainListener);
106 impl<'a> ChainListener for DynamicChainListener<'a> {
107 fn block_connected(&mut self, _block: &Block, _height: u32) {
111 fn block_disconnected(&mut self, header: &BlockHeader, height: u32) {
112 self.0.block_disconnected(header, height)
116 /// A set of dynamically sized chain listeners, each paired with a starting block height.
117 struct ChainListenerSet<'a>(Vec<(u32, &'a mut dyn ChainListener)>);
119 impl<'a> ChainListener for ChainListenerSet<'a> {
120 fn block_connected(&mut self, block: &Block, height: u32) {
121 for (starting_height, chain_listener) in self.0.iter_mut() {
122 if height > *starting_height {
123 chain_listener.block_connected(block, height);
128 fn block_disconnected(&mut self, _header: &BlockHeader, _height: u32) {
135 use crate::test_utils::{Blockchain, MockChainListener};
138 use bitcoin::network::constants::Network;
141 async fn sync_from_same_chain() {
142 let mut chain = Blockchain::default().with_height(4);
144 let mut listener_1 = MockChainListener::new()
145 .expect_block_connected(*chain.at_height(2))
146 .expect_block_connected(*chain.at_height(3))
147 .expect_block_connected(*chain.at_height(4));
148 let mut listener_2 = MockChainListener::new()
149 .expect_block_connected(*chain.at_height(3))
150 .expect_block_connected(*chain.at_height(4));
151 let mut listener_3 = MockChainListener::new()
152 .expect_block_connected(*chain.at_height(4));
154 let listeners = vec![
155 (chain.at_height(1).block_hash, &mut listener_1 as &mut dyn ChainListener),
156 (chain.at_height(2).block_hash, &mut listener_2 as &mut dyn ChainListener),
157 (chain.at_height(3).block_hash, &mut listener_3 as &mut dyn ChainListener),
159 let mut cache = chain.header_cache(0..=4);
160 match sync_listeners(&mut chain, Network::Bitcoin, &mut cache, listeners).await {
161 Ok(header) => assert_eq!(header, chain.tip()),
162 Err(e) => panic!("Unexpected error: {:?}", e),
167 async fn sync_from_different_chains() {
168 let mut main_chain = Blockchain::default().with_height(4);
169 let fork_chain_1 = main_chain.fork_at_height(1);
170 let fork_chain_2 = main_chain.fork_at_height(2);
171 let fork_chain_3 = main_chain.fork_at_height(3);
173 let mut listener_1 = MockChainListener::new()
174 .expect_block_disconnected(*fork_chain_1.at_height(4))
175 .expect_block_disconnected(*fork_chain_1.at_height(3))
176 .expect_block_disconnected(*fork_chain_1.at_height(2))
177 .expect_block_connected(*main_chain.at_height(2))
178 .expect_block_connected(*main_chain.at_height(3))
179 .expect_block_connected(*main_chain.at_height(4));
180 let mut listener_2 = MockChainListener::new()
181 .expect_block_disconnected(*fork_chain_2.at_height(4))
182 .expect_block_disconnected(*fork_chain_2.at_height(3))
183 .expect_block_connected(*main_chain.at_height(3))
184 .expect_block_connected(*main_chain.at_height(4));
185 let mut listener_3 = MockChainListener::new()
186 .expect_block_disconnected(*fork_chain_3.at_height(4))
187 .expect_block_connected(*main_chain.at_height(4));
189 let listeners = vec![
190 (fork_chain_1.tip().block_hash, &mut listener_1 as &mut dyn ChainListener),
191 (fork_chain_2.tip().block_hash, &mut listener_2 as &mut dyn ChainListener),
192 (fork_chain_3.tip().block_hash, &mut listener_3 as &mut dyn ChainListener),
194 let mut cache = fork_chain_1.header_cache(2..=4);
195 cache.extend(fork_chain_2.header_cache(3..=4));
196 cache.extend(fork_chain_3.header_cache(4..=4));
197 match sync_listeners(&mut main_chain, Network::Bitcoin, &mut cache, listeners).await {
198 Ok(header) => assert_eq!(header, main_chain.tip()),
199 Err(e) => panic!("Unexpected error: {:?}", e),
204 async fn sync_from_overlapping_chains() {
205 let mut main_chain = Blockchain::default().with_height(4);
206 let fork_chain_1 = main_chain.fork_at_height(1);
207 let fork_chain_2 = fork_chain_1.fork_at_height(2);
208 let fork_chain_3 = fork_chain_2.fork_at_height(3);
210 let mut listener_1 = MockChainListener::new()
211 .expect_block_disconnected(*fork_chain_1.at_height(4))
212 .expect_block_disconnected(*fork_chain_1.at_height(3))
213 .expect_block_disconnected(*fork_chain_1.at_height(2))
214 .expect_block_connected(*main_chain.at_height(2))
215 .expect_block_connected(*main_chain.at_height(3))
216 .expect_block_connected(*main_chain.at_height(4));
217 let mut listener_2 = MockChainListener::new()
218 .expect_block_disconnected(*fork_chain_2.at_height(4))
219 .expect_block_disconnected(*fork_chain_2.at_height(3))
220 .expect_block_disconnected(*fork_chain_2.at_height(2))
221 .expect_block_connected(*main_chain.at_height(2))
222 .expect_block_connected(*main_chain.at_height(3))
223 .expect_block_connected(*main_chain.at_height(4));
224 let mut listener_3 = MockChainListener::new()
225 .expect_block_disconnected(*fork_chain_3.at_height(4))
226 .expect_block_disconnected(*fork_chain_3.at_height(3))
227 .expect_block_disconnected(*fork_chain_3.at_height(2))
228 .expect_block_connected(*main_chain.at_height(2))
229 .expect_block_connected(*main_chain.at_height(3))
230 .expect_block_connected(*main_chain.at_height(4));
232 let listeners = vec![
233 (fork_chain_1.tip().block_hash, &mut listener_1 as &mut dyn ChainListener),
234 (fork_chain_2.tip().block_hash, &mut listener_2 as &mut dyn ChainListener),
235 (fork_chain_3.tip().block_hash, &mut listener_3 as &mut dyn ChainListener),
237 let mut cache = fork_chain_1.header_cache(2..=4);
238 cache.extend(fork_chain_2.header_cache(3..=4));
239 cache.extend(fork_chain_3.header_cache(4..=4));
240 match sync_listeners(&mut main_chain, Network::Bitcoin, &mut cache, listeners).await {
241 Ok(header) => assert_eq!(header, main_chain.tip()),
242 Err(e) => panic!("Unexpected error: {:?}", e),
247 async fn cache_connected_and_keep_disconnected_blocks() {
248 let mut main_chain = Blockchain::default().with_height(2);
249 let fork_chain = main_chain.fork_at_height(1);
250 let new_tip = main_chain.tip();
251 let old_tip = fork_chain.tip();
253 let mut listener = MockChainListener::new()
254 .expect_block_disconnected(*old_tip)
255 .expect_block_connected(*new_tip);
257 let listeners = vec![(old_tip.block_hash, &mut listener as &mut dyn ChainListener)];
258 let mut cache = fork_chain.header_cache(2..=2);
259 match sync_listeners(&mut main_chain, Network::Bitcoin, &mut cache, listeners).await {
261 assert!(cache.contains_key(&new_tip.block_hash));
262 assert!(cache.contains_key(&old_tip.block_hash));
264 Err(e) => panic!("Unexpected error: {:?}", e),