rust-lightning / fuzz / src / chanmon_consistency.rs
1 // This file is Copyright its original authors, visible in version control
2 // history.
3 //
4 // This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5 // or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7 // You may not use this file except in accordance with one or both of these
8 // licenses.
9
10 //! Test that monitor update failures don't get our channel state out of sync.
11 //! One of the biggest concerns with the monitor update failure handling code is that messages
12 //! resent after monitor updating is restored are delivered out-of-order, resulting in
13 //! commitment_signed messages having "invalid signatures".
14 //! To test this we stand up a network of three nodes and read bytes from the fuzz input to denote
15 //! actions such as sending payments, handling events, or changing monitor update return values on
16 //! a per-node basis. This should allow the fuzzer to find any cases where the ordering of actions results
17 //! in us getting out of sync with ourselves, and, assuming at least one of our receive- or
18 //! send-side handling is correct, other peers. We consider it a failure if any action results in a
18 //! send-side handling is correct, other peers. We consider it a failure if any action results in a
19 //! channel being force-closed.
20
21 use bitcoin::amount::Amount;
22 use bitcoin::blockdata::constants::genesis_block;
23 use bitcoin::blockdata::locktime::absolute::LockTime;
24 use bitcoin::blockdata::opcodes;
25 use bitcoin::blockdata::script::{Builder, ScriptBuf};
26 use bitcoin::blockdata::transaction::{Transaction, TxOut};
27 use bitcoin::network::Network;
28 use bitcoin::transaction::Version;
29
30 use bitcoin::hash_types::BlockHash;
31 use bitcoin::hashes::sha256::Hash as Sha256;
32 use bitcoin::hashes::sha256d::Hash as Sha256dHash;
33 use bitcoin::hashes::Hash as TraitImport;
34 use bitcoin::WPubkeyHash;
35
36 use lightning::blinded_path::payment::ReceiveTlvs;
37 use lightning::blinded_path::BlindedPath;
38 use lightning::chain;
39 use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
40 use lightning::chain::channelmonitor::{ChannelMonitor, MonitorEvent};
41 use lightning::chain::transaction::OutPoint;
42 use lightning::chain::{
43         chainmonitor, channelmonitor, BestBlock, ChannelMonitorUpdateStatus, Confirm, Watch,
44 };
45 use lightning::events;
46 use lightning::events::MessageSendEventsProvider;
47 use lightning::ln::channel::FEE_SPIKE_BUFFER_FEE_INCREASE_MULTIPLE;
48 use lightning::ln::channel_state::ChannelDetails;
49 use lightning::ln::channelmanager::{
50         ChainParameters, ChannelManager, ChannelManagerReadArgs, PaymentId, PaymentSendFailure,
51         RecipientOnionFields,
52 };
53 use lightning::ln::functional_test_utils::*;
54 use lightning::ln::msgs::{
55         self, ChannelMessageHandler, CommitmentUpdate, DecodeError, Init, UpdateAddHTLC,
56 };
57 use lightning::ln::script::ShutdownScript;
58 use lightning::ln::{ChannelId, PaymentHash, PaymentPreimage, PaymentSecret};
59 use lightning::offers::invoice::{BlindedPayInfo, UnsignedBolt12Invoice};
60 use lightning::offers::invoice_request::UnsignedInvoiceRequest;
61 use lightning::onion_message::messenger::{Destination, MessageRouter, OnionMessagePath};
62 use lightning::routing::router::{InFlightHtlcs, Path, Route, RouteHop, RouteParameters, Router};
63 use lightning::sign::{
64         EntropySource, InMemorySigner, KeyMaterial, NodeSigner, Recipient, SignerProvider,
65 };
66 use lightning::util::config::UserConfig;
67 use lightning::util::errors::APIError;
68 use lightning::util::hash_tables::*;
69 use lightning::util::logger::Logger;
70 use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
71 use lightning::util::test_channel_signer::{EnforcementState, TestChannelSigner};
72
73 use crate::utils::test_logger::{self, Output};
74 use crate::utils::test_persister::TestPersister;
75
76 use bitcoin::secp256k1::ecdh::SharedSecret;
77 use bitcoin::secp256k1::ecdsa::{RecoverableSignature, Signature};
78 use bitcoin::secp256k1::schnorr;
79 use bitcoin::secp256k1::{self, Message, PublicKey, Scalar, Secp256k1, SecretKey};
80
81 use bech32::u5;
82 use std::cmp::{self, Ordering};
83 use std::io::Cursor;
84 use std::mem;
85 use std::sync::atomic;
86 use std::sync::{Arc, Mutex};
87
88 const MAX_FEE: u32 = 10_000;
89 struct FuzzEstimator {
90         ret_val: atomic::AtomicU32,
91 }
92 impl FeeEstimator for FuzzEstimator {
93         fn get_est_sat_per_1000_weight(&self, conf_target: ConfirmationTarget) -> u32 {
94                 // We force-close channels if our counterparty sends us a feerate which is a small multiple
95                 // of our HighPriority fee estimate or smaller than our Background fee estimate. Thus, we
96                 // always return a HighPriority feerate here which is >= the maximum Normal feerate and a
97                 // Background feerate which is <= the minimum Normal feerate.
98                 match conf_target {
99                         ConfirmationTarget::OnChainSweep => MAX_FEE,
100                         ConfirmationTarget::ChannelCloseMinimum
101                         | ConfirmationTarget::AnchorChannelFee
102                         | ConfirmationTarget::MinAllowedAnchorChannelRemoteFee
103                         | ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee
104                         | ConfirmationTarget::OutputSpendingFee => 253,
105                         ConfirmationTarget::NonAnchorChannelFee => {
106                                 cmp::min(self.ret_val.load(atomic::Ordering::Acquire), MAX_FEE)
107                         },
108                 }
109         }
110 }
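// `ret_val` is the knob the fuzz input turns to move the non-anchor feerate around; the getter
// above clamps it to `MAX_FEE` and returns constants for every other target. A minimal sketch of
// how a driver would adjust it (the exact call site lives later in the fuzz loop, and the
// `Release` ordering here is an assumption paired with the `Acquire` load above):
//
//     fee_est.ret_val.store(new_fee, atomic::Ordering::Release);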
111
112 struct FuzzRouter {}
113
114 impl Router for FuzzRouter {
115         fn find_route(
116                 &self, _payer: &PublicKey, _params: &RouteParameters,
117                 _first_hops: Option<&[&ChannelDetails]>, _inflight_htlcs: InFlightHtlcs,
118         ) -> Result<Route, msgs::LightningError> {
119                 Err(msgs::LightningError {
120                         err: String::from("Not implemented"),
121                         action: msgs::ErrorAction::IgnoreError,
122                 })
123         }
124
125         fn create_blinded_payment_paths<T: secp256k1::Signing + secp256k1::Verification>(
126                 &self, _recipient: PublicKey, _first_hops: Vec<ChannelDetails>, _tlvs: ReceiveTlvs,
127                 _amount_msats: u64, _secp_ctx: &Secp256k1<T>,
128         ) -> Result<Vec<(BlindedPayInfo, BlindedPath)>, ()> {
129                 unreachable!()
130         }
131 }
132
133 impl MessageRouter for FuzzRouter {
134         fn find_path(
135                 &self, _sender: PublicKey, _peers: Vec<PublicKey>, _destination: Destination,
136         ) -> Result<OnionMessagePath, ()> {
137                 unreachable!()
138         }
139
140         fn create_blinded_paths<T: secp256k1::Signing + secp256k1::Verification>(
141                 &self, _recipient: PublicKey, _peers: Vec<PublicKey>, _secp_ctx: &Secp256k1<T>,
142         ) -> Result<Vec<BlindedPath>, ()> {
143                 unreachable!()
144         }
145 }
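// Routing is deliberately stubbed out: `find_route` returns an error and the blinded-path hooks
// are unreachable, because this fuzz target hand-builds every `Route` itself (see `send_payment`
// and `send_hop_payment` below).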
146
147 pub struct TestBroadcaster {}
148 impl BroadcasterInterface for TestBroadcaster {
149         fn broadcast_transactions(&self, _txs: &[&Transaction]) {}
150 }
151
152 pub struct VecWriter(pub Vec<u8>);
153 impl Writer for VecWriter {
154         fn write_all(&mut self, buf: &[u8]) -> Result<(), ::std::io::Error> {
155                 self.0.extend_from_slice(buf);
156                 Ok(())
157         }
158 }
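// `VecWriter` is an in-memory sink for LDK's `Writeable` trait; it is how this harness snapshots
// monitors and managers, e.g.:
//
//     let mut ser = VecWriter(Vec::new());
//     monitor.write(&mut ser).unwrap(); // `ser.0` now holds the serialized `ChannelMonitor`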
159
160 /// The LDK API requires that any time we tell it we're done persisting a `ChannelMonitor[Update]`
161 /// we never pass it in as the "latest" `ChannelMonitor` on startup. However, we can pass
162 /// out-of-date monitors as long as we never told LDK we finished persisting them, which we do by
163 /// storing both old `ChannelMonitor`s and ones that are "being persisted" here.
164 ///
165 /// Note that such "being persisted" `ChannelMonitor`s are stored in `ChannelManager` and will
166 /// simply be replayed on startup.
167 struct LatestMonitorState {
168         /// The latest monitor id which we told LDK we've persisted
169         persisted_monitor_id: u64,
170         /// The latest serialized `ChannelMonitor` that we told LDK we persisted.
171         persisted_monitor: Vec<u8>,
172         /// A set of (monitor id, serialized `ChannelMonitor`)s which we're currently "persisting",
173         /// from LDK's perspective.
174         pending_monitors: Vec<(u64, Vec<u8>)>,
175 }
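// How the fields above evolve is driven by `update_channel` below: a `Completed` update
// overwrites `persisted_monitor`/`persisted_monitor_id`, an `InProgress` update is parked in
// `pending_monitors`, and on reload only the `persisted_monitor` snapshot is handed back to LDK
// while pending entries are discarded and replayed.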
176
177 struct TestChainMonitor {
178         pub logger: Arc<dyn Logger>,
179         pub keys: Arc<KeyProvider>,
180         pub persister: Arc<TestPersister>,
181         pub chain_monitor: Arc<
182                 chainmonitor::ChainMonitor<
183                         TestChannelSigner,
184                         Arc<dyn chain::Filter>,
185                         Arc<TestBroadcaster>,
186                         Arc<FuzzEstimator>,
187                         Arc<dyn Logger>,
188                         Arc<TestPersister>,
189                 >,
190         >,
191         pub latest_monitors: Mutex<HashMap<OutPoint, LatestMonitorState>>,
192 }
193 impl TestChainMonitor {
194         pub fn new(
195                 broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>,
196                 persister: Arc<TestPersister>, keys: Arc<KeyProvider>,
197         ) -> Self {
198                 Self {
199                         chain_monitor: Arc::new(chainmonitor::ChainMonitor::new(
200                                 None,
201                                 broadcaster,
202                                 logger.clone(),
203                                 feeest,
204                                 Arc::clone(&persister),
205                         )),
206                         logger,
207                         keys,
208                         persister,
209                         latest_monitors: Mutex::new(new_hash_map()),
210                 }
211         }
212 }
213 impl chain::Watch<TestChannelSigner> for TestChainMonitor {
214         fn watch_channel(
215                 &self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<TestChannelSigner>,
216         ) -> Result<chain::ChannelMonitorUpdateStatus, ()> {
217                 let mut ser = VecWriter(Vec::new());
218                 monitor.write(&mut ser).unwrap();
219                 let monitor_id = monitor.get_latest_update_id();
220                 let res = self.chain_monitor.watch_channel(funding_txo, monitor);
221                 let state = match res {
222                         Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState {
223                                 persisted_monitor_id: monitor_id,
224                                 persisted_monitor: ser.0,
225                                 pending_monitors: Vec::new(),
226                         },
227                         Ok(chain::ChannelMonitorUpdateStatus::InProgress) => {
228                                 panic!("The test currently doesn't test initial-persistence via the async pipeline")
229                         },
230                         Ok(chain::ChannelMonitorUpdateStatus::UnrecoverableError) => panic!(),
231                         Err(()) => panic!(),
232                 };
233                 if self.latest_monitors.lock().unwrap().insert(funding_txo, state).is_some() {
234                         panic!("Already had monitor pre-watch_channel");
235                 }
236                 res
237         }
238
239         fn update_channel(
240                 &self, funding_txo: OutPoint, update: &channelmonitor::ChannelMonitorUpdate,
241         ) -> chain::ChannelMonitorUpdateStatus {
242                 let mut map_lock = self.latest_monitors.lock().unwrap();
243                 let map_entry = map_lock.get_mut(&funding_txo).expect("Didn't have monitor on update call");
244                 let latest_monitor_data = map_entry
245                         .pending_monitors
246                         .last()
247                         .as_ref()
248                         .map(|(_, data)| data)
249                         .unwrap_or(&map_entry.persisted_monitor);
250                 let deserialized_monitor =
251                         <(BlockHash, channelmonitor::ChannelMonitor<TestChannelSigner>)>::read(
252                                 &mut Cursor::new(&latest_monitor_data),
253                                 (&*self.keys, &*self.keys),
254                         )
255                         .unwrap()
256                         .1;
257                 deserialized_monitor
258                         .update_monitor(
259                                 update,
260                                 &&TestBroadcaster {},
261                                 &&FuzzEstimator { ret_val: atomic::AtomicU32::new(253) },
262                                 &self.logger,
263                         )
264                         .unwrap();
265                 let mut ser = VecWriter(Vec::new());
266                 deserialized_monitor.write(&mut ser).unwrap();
267                 let res = self.chain_monitor.update_channel(funding_txo, update);
268                 match res {
269                         chain::ChannelMonitorUpdateStatus::Completed => {
270                                 map_entry.persisted_monitor_id = update.update_id;
271                                 map_entry.persisted_monitor = ser.0;
272                         },
273                         chain::ChannelMonitorUpdateStatus::InProgress => {
274                                 map_entry.pending_monitors.push((update.update_id, ser.0));
275                         },
276                         chain::ChannelMonitorUpdateStatus::UnrecoverableError => panic!(),
277                 }
278                 res
279         }
280
281         fn release_pending_monitor_events(
282                 &self,
283         ) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)> {
284                 return self.chain_monitor.release_pending_monitor_events();
285         }
286 }
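// Note that `update_channel` above deliberately deserializes the latest monitor snapshot, applies
// the update, and re-serializes it, so every fuzz-driven update also exercises `ChannelMonitor`
// read/write compatibility.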
287
288 struct KeyProvider {
289         node_secret: SecretKey,
290         rand_bytes_id: atomic::AtomicU32,
291         enforcement_states: Mutex<HashMap<[u8; 32], Arc<Mutex<EnforcementState>>>>,
292 }
293
294 impl EntropySource for KeyProvider {
295         fn get_secure_random_bytes(&self) -> [u8; 32] {
296                 let id = self.rand_bytes_id.fetch_add(1, atomic::Ordering::Relaxed);
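                // Bytes 26..30 of the result carry `id` in little-endian, byte 30 is a fixed tag (11)
                // and byte 31 ties the output to this node's secret, so the "entropy" is deterministic
                // per node yet unique per call.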
297                 #[rustfmt::skip]
298                 let mut res = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 11, self.node_secret[31]];
299                 res[30 - 4..30].copy_from_slice(&id.to_le_bytes());
300                 res
301         }
302 }
303
304 impl NodeSigner for KeyProvider {
305         fn get_node_id(&self, recipient: Recipient) -> Result<PublicKey, ()> {
306                 let node_secret = match recipient {
307                         Recipient::Node => Ok(&self.node_secret),
308                         Recipient::PhantomNode => Err(()),
309                 }?;
310                 Ok(PublicKey::from_secret_key(&Secp256k1::signing_only(), node_secret))
311         }
312
313         fn ecdh(
314                 &self, recipient: Recipient, other_key: &PublicKey, tweak: Option<&Scalar>,
315         ) -> Result<SharedSecret, ()> {
316                 let mut node_secret = match recipient {
317                         Recipient::Node => Ok(self.node_secret.clone()),
318                         Recipient::PhantomNode => Err(()),
319                 }?;
320                 if let Some(tweak) = tweak {
321                         node_secret = node_secret.mul_tweak(tweak).map_err(|_| ())?;
322                 }
323                 Ok(SharedSecret::new(other_key, &node_secret))
324         }
325
326         fn get_inbound_payment_key_material(&self) -> KeyMaterial {
327                 #[rustfmt::skip]
328                 let random_bytes = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, self.node_secret[31]];
329                 KeyMaterial(random_bytes)
330         }
331
332         fn sign_invoice(
333                 &self, _hrp_bytes: &[u8], _invoice_data: &[u5], _recipient: Recipient,
334         ) -> Result<RecoverableSignature, ()> {
335                 unreachable!()
336         }
337
338         fn sign_bolt12_invoice_request(
339                 &self, _invoice_request: &UnsignedInvoiceRequest,
340         ) -> Result<schnorr::Signature, ()> {
341                 unreachable!()
342         }
343
344         fn sign_bolt12_invoice(
345                 &self, _invoice: &UnsignedBolt12Invoice,
346         ) -> Result<schnorr::Signature, ()> {
347                 unreachable!()
348         }
349
350         fn sign_gossip_message(
351                 &self, msg: lightning::ln::msgs::UnsignedGossipMessage,
352         ) -> Result<Signature, ()> {
353                 let msg_hash = Message::from_digest(Sha256dHash::hash(&msg.encode()[..]).to_byte_array());
354                 let secp_ctx = Secp256k1::signing_only();
355                 Ok(secp_ctx.sign_ecdsa(&msg_hash, &self.node_secret))
356         }
357 }
358
359 impl SignerProvider for KeyProvider {
360         type EcdsaSigner = TestChannelSigner;
361         #[cfg(taproot)]
362         type TaprootSigner = TestChannelSigner;
363
364         fn generate_channel_keys_id(
365                 &self, _inbound: bool, _channel_value_satoshis: u64, _user_channel_id: u128,
366         ) -> [u8; 32] {
367                 let id = self.rand_bytes_id.fetch_add(1, atomic::Ordering::Relaxed) as u8;
368                 [id; 32]
369         }
370
371         fn derive_channel_signer(
372                 &self, channel_value_satoshis: u64, channel_keys_id: [u8; 32],
373         ) -> Self::EcdsaSigner {
374                 let secp_ctx = Secp256k1::signing_only();
375                 let id = channel_keys_id[0];
376                 #[rustfmt::skip]
377                 let keys = InMemorySigner::new(
378                         &secp_ctx,
379                         SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, self.node_secret[31]]).unwrap(),
380                         SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, self.node_secret[31]]).unwrap(),
381                         SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6, self.node_secret[31]]).unwrap(),
382                         SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, self.node_secret[31]]).unwrap(),
383                         SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, self.node_secret[31]]).unwrap(),
384                         [id, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, self.node_secret[31]],
385                         channel_value_satoshis,
386                         channel_keys_id,
387                         channel_keys_id,
388                 );
389                 let revoked_commitment = self.make_enforcement_state_cell(keys.commitment_seed);
390                 TestChannelSigner::new_with_revoked(keys, revoked_commitment, false)
391         }
392
393         fn read_chan_signer(&self, buffer: &[u8]) -> Result<Self::EcdsaSigner, DecodeError> {
394                 let mut reader = std::io::Cursor::new(buffer);
395
396                 let inner: InMemorySigner = ReadableArgs::read(&mut reader, self)?;
397                 let state = self.make_enforcement_state_cell(inner.commitment_seed);
398
399                 Ok(TestChannelSigner::new_with_revoked(inner, state, false))
400         }
401
402         fn get_destination_script(&self, _channel_keys_id: [u8; 32]) -> Result<ScriptBuf, ()> {
403                 let secp_ctx = Secp256k1::signing_only();
404                 #[rustfmt::skip]
405                 let channel_monitor_claim_key = SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, self.node_secret[31]]).unwrap();
406                 let our_channel_monitor_claim_key_hash = WPubkeyHash::hash(
407                         &PublicKey::from_secret_key(&secp_ctx, &channel_monitor_claim_key).serialize(),
408                 );
409                 Ok(Builder::new()
410                         .push_opcode(opcodes::all::OP_PUSHBYTES_0)
411                         .push_slice(our_channel_monitor_claim_key_hash)
412                         .into_script())
413         }
414
415         fn get_shutdown_scriptpubkey(&self) -> Result<ShutdownScript, ()> {
416                 let secp_ctx = Secp256k1::signing_only();
417                 #[rustfmt::skip]
418                 let secret_key = SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, self.node_secret[31]]).unwrap();
419                 let pubkey_hash =
420                         WPubkeyHash::hash(&PublicKey::from_secret_key(&secp_ctx, &secret_key).serialize());
421                 Ok(ShutdownScript::new_p2wpkh(&pubkey_hash))
422         }
423 }
424
425 impl KeyProvider {
426         fn make_enforcement_state_cell(
427                 &self, commitment_seed: [u8; 32],
428         ) -> Arc<Mutex<EnforcementState>> {
429                 let mut revoked_commitments = self.enforcement_states.lock().unwrap();
430                 if !revoked_commitments.contains_key(&commitment_seed) {
431                         revoked_commitments
432                                 .insert(commitment_seed, Arc::new(Mutex::new(EnforcementState::new())));
433                 }
434                 let cell = revoked_commitments.get(&commitment_seed).unwrap();
435                 Arc::clone(cell)
436         }
437 }
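// Keying the enforcement state by commitment seed means a signer recreated via `read_chan_signer`
// shares revocation tracking with the original signer for the same channel, so
// `TestChannelSigner`'s sanity checks survive (de)serialization.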
438
439 #[inline]
440 fn check_api_err(api_err: APIError, sendable_bounds_violated: bool) {
441         match api_err {
442                 APIError::APIMisuseError { .. } => panic!("We can't misuse the API"),
443                 APIError::FeeRateTooHigh { .. } => panic!("We can't send too much fee?"),
444                 APIError::InvalidRoute { .. } => panic!("Our routes should work"),
445                 APIError::ChannelUnavailable { err } => {
446                         // Test the error against a list of errors we can hit, and reject
447                         // all others. If you hit this panic, the list of acceptable errors
448                         // is probably just stale and you should add new messages here.
449                         match err.as_str() {
450                                 "Peer for first hop currently disconnected" => {},
451                                 _ if err.starts_with("Cannot send less than our next-HTLC minimum - ") => {},
452                                 _ if err.starts_with("Cannot send more than our next-HTLC maximum - ") => {},
453                                 _ => panic!("{}", err),
454                         }
455                         assert!(sendable_bounds_violated);
456                 },
457                 APIError::MonitorUpdateInProgress => {
458                         // We can (obviously) temp-fail a monitor update
459                 },
460                 APIError::IncompatibleShutdownScript { .. } => {
461                         panic!("Cannot send an incompatible shutdown script")
462                 },
463         }
464 }
465 #[inline]
466 fn check_payment_err(send_err: PaymentSendFailure, sendable_bounds_violated: bool) {
467         match send_err {
468                 PaymentSendFailure::ParameterError(api_err) => {
469                         check_api_err(api_err, sendable_bounds_violated)
470                 },
471                 PaymentSendFailure::PathParameterError(per_path_results) => {
472                         for res in per_path_results {
473                                 if let Err(api_err) = res {
474                                         check_api_err(api_err, sendable_bounds_violated);
475                                 }
476                         }
477                 },
478                 PaymentSendFailure::AllFailedResendSafe(per_path_results) => {
479                         for api_err in per_path_results {
480                                 check_api_err(api_err, sendable_bounds_violated);
481                         }
482                 },
483                 PaymentSendFailure::PartialFailure { results, .. } => {
484                         for res in results {
485                                 if let Err(api_err) = res {
486                                         check_api_err(api_err, sendable_bounds_violated);
487                                 }
488                         }
489                 },
490                 PaymentSendFailure::DuplicatePayment => panic!(),
491         }
492 }
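// `sendable_bounds_violated` is what lets the above accept a `ChannelUnavailable` error: it is
// only tolerated when the attempted amount really fell outside the channel's advertised next-HTLC
// minimum/maximum (computed by the senders below); anything else panics and thus counts as a fuzz
// failure.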
493
494 type ChanMan<'a> = ChannelManager<
495         Arc<TestChainMonitor>,
496         Arc<TestBroadcaster>,
497         Arc<KeyProvider>,
498         Arc<KeyProvider>,
499         Arc<KeyProvider>,
500         Arc<FuzzEstimator>,
501         &'a FuzzRouter,
502         Arc<dyn Logger>,
503 >;
504
505 #[inline]
506 fn get_payment_secret_hash(
507         dest: &ChanMan, payment_id: &mut u8,
508 ) -> Option<(PaymentSecret, PaymentHash)> {
509         let mut payment_hash;
510         for _ in 0..256 {
511                 payment_hash = PaymentHash(Sha256::hash(&[*payment_id; 1]).to_byte_array());
512                 if let Ok(payment_secret) =
513                         dest.create_inbound_payment_for_hash(payment_hash, None, 3600, None)
514                 {
515                         return Some((payment_secret, payment_hash));
516                 }
517                 *payment_id = payment_id.wrapping_add(1);
518         }
519         None
520 }
521
522 #[inline]
523 fn send_noret(
524         source: &ChanMan, dest: &ChanMan, dest_chan_id: u64, amt: u64, payment_id: &mut u8,
525         payment_idx: &mut u64,
526 ) {
527         send_payment(source, dest, dest_chan_id, amt, payment_id, payment_idx);
528 }
529
530 #[inline]
531 fn send_payment(
532         source: &ChanMan, dest: &ChanMan, dest_chan_id: u64, amt: u64, payment_id: &mut u8,
533         payment_idx: &mut u64,
534 ) -> bool {
535         let (payment_secret, payment_hash) =
536                 if let Some((secret, hash)) = get_payment_secret_hash(dest, payment_id) {
537                         (secret, hash)
538                 } else {
539                         return true;
540                 };
541         let mut payment_id = [0; 32];
542         payment_id[0..8].copy_from_slice(&payment_idx.to_ne_bytes());
543         *payment_idx += 1;
544         let (min_value_sendable, max_value_sendable) = source
545                 .list_usable_channels()
546                 .iter()
547                 .find(|chan| chan.short_channel_id == Some(dest_chan_id))
548                 .map(|chan| (chan.next_outbound_htlc_minimum_msat, chan.next_outbound_htlc_limit_msat))
549                 .unwrap_or((0, 0));
550         if let Err(err) = source.send_payment_with_route(
551                 &Route {
552                         paths: vec![Path {
553                                 hops: vec![RouteHop {
554                                         pubkey: dest.get_our_node_id(),
555                                         node_features: dest.node_features(),
556                                         short_channel_id: dest_chan_id,
557                                         channel_features: dest.channel_features(),
558                                         fee_msat: amt,
559                                         cltv_expiry_delta: 200,
560                                         maybe_announced_channel: true,
561                                 }],
562                                 blinded_tail: None,
563                         }],
564                         route_params: None,
565                 },
566                 payment_hash,
567                 RecipientOnionFields::secret_only(payment_secret),
568                 PaymentId(payment_id),
569         ) {
570                 check_payment_err(err, amt > max_value_sendable || amt < min_value_sendable);
571                 false
572         } else {
573                 // Note that while the max is a strict upper-bound, we can occasionally send substantially
574                 // below the minimum, with some gap which is unusable immediately below the minimum. Thus,
575                 // we don't check against min_value_sendable here.
576                 assert!(amt <= max_value_sendable);
577                 true
578         }
579 }
580
581 #[inline]
582 fn send_hop_noret(
583         source: &ChanMan, middle: &ChanMan, middle_chan_id: u64, dest: &ChanMan, dest_chan_id: u64,
584         amt: u64, payment_id: &mut u8, payment_idx: &mut u64,
585 ) {
586         send_hop_payment(
587                 source,
588                 middle,
589                 middle_chan_id,
590                 dest,
591                 dest_chan_id,
592                 amt,
593                 payment_id,
594                 payment_idx,
595         );
596 }
597
598 #[inline]
599 fn send_hop_payment(
600         source: &ChanMan, middle: &ChanMan, middle_chan_id: u64, dest: &ChanMan, dest_chan_id: u64,
601         amt: u64, payment_id: &mut u8, payment_idx: &mut u64,
602 ) -> bool {
603         let (payment_secret, payment_hash) =
604                 if let Some((secret, hash)) = get_payment_secret_hash(dest, payment_id) {
605                         (secret, hash)
606                 } else {
607                         return true;
608                 };
609         let mut payment_id = [0; 32];
610         payment_id[0..8].copy_from_slice(&payment_idx.to_ne_bytes());
611         *payment_idx += 1;
612         let (min_value_sendable, max_value_sendable) = source
613                 .list_usable_channels()
614                 .iter()
615                 .find(|chan| chan.short_channel_id == Some(middle_chan_id))
616                 .map(|chan| (chan.next_outbound_htlc_minimum_msat, chan.next_outbound_htlc_limit_msat))
617                 .unwrap_or((0, 0));
618         let first_hop_fee = 50_000;
619         if let Err(err) = source.send_payment_with_route(
620                 &Route {
621                         paths: vec![Path {
622                                 hops: vec![
623                                         RouteHop {
624                                                 pubkey: middle.get_our_node_id(),
625                                                 node_features: middle.node_features(),
626                                                 short_channel_id: middle_chan_id,
627                                                 channel_features: middle.channel_features(),
628                                                 fee_msat: first_hop_fee,
629                                                 cltv_expiry_delta: 100,
630                                                 maybe_announced_channel: true,
631                                         },
632                                         RouteHop {
633                                                 pubkey: dest.get_our_node_id(),
634                                                 node_features: dest.node_features(),
635                                                 short_channel_id: dest_chan_id,
636                                                 channel_features: dest.channel_features(),
637                                                 fee_msat: amt,
638                                                 cltv_expiry_delta: 200,
639                                                 maybe_announced_channel: true,
640                                         },
641                                 ],
642                                 blinded_tail: None,
643                         }],
644                         route_params: None,
645                 },
646                 payment_hash,
647                 RecipientOnionFields::secret_only(payment_secret),
648                 PaymentId(payment_id),
649         ) {
650                 let sent_amt = amt + first_hop_fee;
651                 check_payment_err(err, sent_amt < min_value_sendable || sent_amt > max_value_sendable);
652                 false
653         } else {
654                 // Note that while the max is a strict upper-bound, we can occasionally send substantially
655                 // below the minimum, with some gap which is unusable immediately below the minimum. Thus,
656                 // we don't check against min_value_sendable here.
657                 assert!(amt + first_hop_fee <= max_value_sendable);
658                 true
659         }
660 }
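// A sketch of how the fuzz loop further down is expected to call these helpers (the concrete
// action-byte decoding appears later in the loop, so the exact arguments here are illustrative
// only):
//
//     send_noret(&nodes[0], &nodes[1], chan_a, 10_000_000, &mut p_id, &mut p_idx);
//     send_hop_noret(&nodes[0], &nodes[1], chan_a, &nodes[2], chan_b, 10_000_000, &mut p_id, &mut p_idx);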
661
662 #[inline]
663 pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
664         let out = SearchingOutput::new(underlying_out);
665         let broadcast = Arc::new(TestBroadcaster {});
666         let router = FuzzRouter {};
667
668         macro_rules! make_node {
669                 ($node_id: expr, $fee_estimator: expr) => {{
670                         let logger: Arc<dyn Logger> =
671                                 Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone()));
672                         let node_secret = SecretKey::from_slice(&[
673                                 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
674                                 0, 0, 1, $node_id,
675                         ])
676                         .unwrap();
677                         let keys_manager = Arc::new(KeyProvider {
678                                 node_secret,
679                                 rand_bytes_id: atomic::AtomicU32::new(0),
680                                 enforcement_states: Mutex::new(new_hash_map()),
681                         });
682                         let monitor = Arc::new(TestChainMonitor::new(
683                                 broadcast.clone(),
684                                 logger.clone(),
685                                 $fee_estimator.clone(),
686                                 Arc::new(TestPersister {
687                                         update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
688                                 }),
689                                 Arc::clone(&keys_manager),
690                         ));
691
692                         let mut config = UserConfig::default();
693                         config.channel_config.forwarding_fee_proportional_millionths = 0;
694                         config.channel_handshake_config.announced_channel = true;
695                         if anchors {
696                                 config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true;
697                                 config.manually_accept_inbound_channels = true;
698                         }
699                         let network = Network::Bitcoin;
700                         let best_block_timestamp = genesis_block(network).header.time;
701                         let params = ChainParameters { network, best_block: BestBlock::from_network(network) };
702                         (
703                                 ChannelManager::new(
704                                         $fee_estimator.clone(),
705                                         monitor.clone(),
706                                         broadcast.clone(),
707                                         &router,
708                                         Arc::clone(&logger),
709                                         keys_manager.clone(),
710                                         keys_manager.clone(),
711                                         keys_manager.clone(),
712                                         config,
713                                         params,
714                                         best_block_timestamp,
715                                 ),
716                                 monitor,
717                                 keys_manager,
718                         )
719                 }};
720         }
721
722         macro_rules! reload_node {
723                 ($ser: expr, $node_id: expr, $old_monitors: expr, $keys_manager: expr, $fee_estimator: expr) => {{
724                         let keys_manager = Arc::clone(&$keys_manager);
725                         let logger: Arc<dyn Logger> =
726                                 Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone()));
727                         let chain_monitor = Arc::new(TestChainMonitor::new(
728                                 broadcast.clone(),
729                                 logger.clone(),
730                                 $fee_estimator.clone(),
731                                 Arc::new(TestPersister {
732                                         update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
733                                 }),
734                                 Arc::clone(&$keys_manager),
735                         ));
736
737                         let mut config = UserConfig::default();
738                         config.channel_config.forwarding_fee_proportional_millionths = 0;
739                         config.channel_handshake_config.announced_channel = true;
740                         if anchors {
741                                 config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true;
742                                 config.manually_accept_inbound_channels = true;
743                         }
744
745                         let mut monitors = new_hash_map();
746                         let mut old_monitors = $old_monitors.latest_monitors.lock().unwrap();
747                         for (outpoint, mut prev_state) in old_monitors.drain() {
748                                 monitors.insert(
749                                         outpoint,
750                                         <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(
751                                                 &mut Cursor::new(&prev_state.persisted_monitor),
752                                                 (&*$keys_manager, &*$keys_manager),
753                                         )
754                                         .expect("Failed to read monitor")
755                                         .1,
756                                 );
757                                 // Wipe any `ChannelMonitor`s which we never told LDK we finished persisting,
758                                 // considering them discarded. LDK should replay these for us as they're stored in
759                                 // the `ChannelManager`.
760                                 prev_state.pending_monitors.clear();
761                                 chain_monitor.latest_monitors.lock().unwrap().insert(outpoint, prev_state);
762                         }
763                         let mut monitor_refs = new_hash_map();
764                         for (outpoint, monitor) in monitors.iter_mut() {
765                                 monitor_refs.insert(*outpoint, monitor);
766                         }
767
768                         let read_args = ChannelManagerReadArgs {
769                                 entropy_source: keys_manager.clone(),
770                                 node_signer: keys_manager.clone(),
771                                 signer_provider: keys_manager.clone(),
772                                 fee_estimator: $fee_estimator.clone(),
773                                 chain_monitor: chain_monitor.clone(),
774                                 tx_broadcaster: broadcast.clone(),
775                                 router: &router,
776                                 logger,
777                                 default_config: config,
778                                 channel_monitors: monitor_refs,
779                         };
780
781                         let res = (
782                                 <(BlockHash, ChanMan)>::read(&mut Cursor::new(&$ser.0), read_args)
783                                         .expect("Failed to read manager")
784                                         .1,
785                                 chain_monitor.clone(),
786                         );
787                         for (funding_txo, mon) in monitors.drain() {
788                                 assert_eq!(
789                                         chain_monitor.chain_monitor.watch_channel(funding_txo, mon),
790                                         Ok(ChannelMonitorUpdateStatus::Completed)
791                                 );
792                         }
793                         res
794                 }};
795         }
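// `reload_node!` simulates a restart: the `ChannelManager` is read back from its last
// serialization, only monitors we claimed to have finished persisting are given to the new
// `ChainMonitor`, and any still-pending monitor updates are dropped so that LDK has to replay
// them from the manager.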
796
797         let mut channel_txn = Vec::new();
798         macro_rules! make_channel {
799                 ($source: expr, $dest: expr, $dest_keys_manager: expr, $chan_id: expr) => {{
800                         let init_dest = Init {
801                                 features: $dest.init_features(),
802                                 networks: None,
803                                 remote_network_address: None,
804                         };
805                         $source.peer_connected(&$dest.get_our_node_id(), &init_dest, true).unwrap();
806                         let init_src = Init {
807                                 features: $source.init_features(),
808                                 networks: None,
809                                 remote_network_address: None,
810                         };
811                         $dest.peer_connected(&$source.get_our_node_id(), &init_src, false).unwrap();
812
813                         $source.create_channel($dest.get_our_node_id(), 100_000, 42, 0, None, None).unwrap();
814                         let open_channel = {
815                                 let events = $source.get_and_clear_pending_msg_events();
816                                 assert_eq!(events.len(), 1);
817                                 if let events::MessageSendEvent::SendOpenChannel { ref msg, .. } = events[0] {
818                                         msg.clone()
819                                 } else {
820                                         panic!("Wrong event type");
821                                 }
822                         };
823
824                         $dest.handle_open_channel(&$source.get_our_node_id(), &open_channel);
825                         let accept_channel = {
826                                 if anchors {
827                                         let events = $dest.get_and_clear_pending_events();
828                                         assert_eq!(events.len(), 1);
829                                         if let events::Event::OpenChannelRequest {
830                                                 ref temporary_channel_id,
831                                                 ref counterparty_node_id,
832                                                 ..
833                                         } = events[0]
834                                         {
835                                                 let mut random_bytes = [0u8; 16];
836                                                 random_bytes
837                                                         .copy_from_slice(&$dest_keys_manager.get_secure_random_bytes()[..16]);
838                                                 let user_channel_id = u128::from_be_bytes(random_bytes);
839                                                 $dest
840                                                         .accept_inbound_channel(
841                                                                 temporary_channel_id,
842                                                                 counterparty_node_id,
843                                                                 user_channel_id,
844                                                         )
845                                                         .unwrap();
846                                         } else {
847                                                 panic!("Wrong event type");
848                                         }
849                                 }
850                                 let events = $dest.get_and_clear_pending_msg_events();
851                                 assert_eq!(events.len(), 1);
852                                 if let events::MessageSendEvent::SendAcceptChannel { ref msg, .. } = events[0] {
853                                         msg.clone()
854                                 } else {
855                                         panic!("Wrong event type");
856                                 }
857                         };
858
859                         $source.handle_accept_channel(&$dest.get_our_node_id(), &accept_channel);
860                         let funding_output;
861                         {
862                                 let events = $source.get_and_clear_pending_events();
863                                 assert_eq!(events.len(), 1);
864                                 if let events::Event::FundingGenerationReady {
865                                         ref temporary_channel_id,
866                                         ref channel_value_satoshis,
867                                         ref output_script,
868                                         ..
869                                 } = events[0]
870                                 {
871                                         let tx = Transaction {
872                                                 version: Version($chan_id),
873                                                 lock_time: LockTime::ZERO,
874                                                 input: Vec::new(),
875                                                 output: vec![TxOut {
876                                                         value: Amount::from_sat(*channel_value_satoshis),
877                                                         script_pubkey: output_script.clone(),
878                                                 }],
879                                         };
880                                         funding_output = OutPoint { txid: tx.txid(), index: 0 };
881                                         $source
882                                                 .funding_transaction_generated(
883                                                         &temporary_channel_id,
884                                                         &$dest.get_our_node_id(),
885                                                         tx.clone(),
886                                                 )
887                                                 .unwrap();
888                                         channel_txn.push(tx);
889                                 } else {
890                                         panic!("Wrong event type");
891                                 }
892                         }
893
894                         let funding_created = {
895                                 let events = $source.get_and_clear_pending_msg_events();
896                                 assert_eq!(events.len(), 1);
897                                 if let events::MessageSendEvent::SendFundingCreated { ref msg, .. } = events[0] {
898                                         msg.clone()
899                                 } else {
900                                         panic!("Wrong event type");
901                                 }
902                         };
903                         $dest.handle_funding_created(&$source.get_our_node_id(), &funding_created);
904
905                         let funding_signed = {
906                                 let events = $dest.get_and_clear_pending_msg_events();
907                                 assert_eq!(events.len(), 1);
908                                 if let events::MessageSendEvent::SendFundingSigned { ref msg, .. } = events[0] {
909                                         msg.clone()
910                                 } else {
911                                         panic!("Wrong event type");
912                                 }
913                         };
914                         let events = $dest.get_and_clear_pending_events();
915                         assert_eq!(events.len(), 1);
916                         if let events::Event::ChannelPending { ref counterparty_node_id, .. } = events[0] {
917                                 assert_eq!(counterparty_node_id, &$source.get_our_node_id());
918                         } else {
919                                 panic!("Wrong event type");
920                         }
921
922                         $source.handle_funding_signed(&$dest.get_our_node_id(), &funding_signed);
923                         let events = $source.get_and_clear_pending_events();
924                         assert_eq!(events.len(), 1);
925                         if let events::Event::ChannelPending { ref counterparty_node_id, .. } = events[0] {
926                                 assert_eq!(counterparty_node_id, &$dest.get_our_node_id());
927                         } else {
928                                 panic!("Wrong event type");
929                         }
930
931                         funding_output
932                 }};
933         }
934
935         macro_rules! confirm_txn {
936                 ($node: expr) => {{
937                         let chain_hash = genesis_block(Network::Bitcoin).block_hash();
938                         let mut header = create_dummy_header(chain_hash, 42);
939                         let txdata: Vec<_> =
940                                 channel_txn.iter().enumerate().map(|(i, tx)| (i + 1, tx)).collect();
941                         $node.transactions_confirmed(&header, &txdata, 1);
942                         for _ in 2..100 {
943                                 header = create_dummy_header(header.block_hash(), 42);
944                         }
945                         $node.best_block_updated(&header, 99);
946                 }};
947         }
948
949         macro_rules! lock_fundings {
950                 ($nodes: expr) => {{
951                         let mut node_events = Vec::new();
952                         for node in $nodes.iter() {
953                                 node_events.push(node.get_and_clear_pending_msg_events());
954                         }
955                         for (idx, node_event) in node_events.iter().enumerate() {
956                                 for event in node_event {
957                                         if let events::MessageSendEvent::SendChannelReady { ref node_id, ref msg } =
958                                                 event
959                                         {
960                                                 for node in $nodes.iter() {
961                                                         if node.get_our_node_id() == *node_id {
962                                                                 node.handle_channel_ready(&$nodes[idx].get_our_node_id(), msg);
963                                                         }
964                                                 }
965                                         } else {
966                                                 panic!("Wrong event type");
967                                         }
968                                 }
969                         }
970
971                         for node in $nodes.iter() {
972                                 let events = node.get_and_clear_pending_msg_events();
973                                 for event in events {
974                                         if let events::MessageSendEvent::SendAnnouncementSignatures { .. } = event {
975                                         } else {
976                                                 panic!("Wrong event type");
977                                         }
978                                 }
979                         }
980                 }};
981         }
982
983         let fee_est_a = Arc::new(FuzzEstimator { ret_val: atomic::AtomicU32::new(253) });
984         let mut last_htlc_clear_fee_a = 253;
985         let fee_est_b = Arc::new(FuzzEstimator { ret_val: atomic::AtomicU32::new(253) });
986         let mut last_htlc_clear_fee_b = 253;
987         let fee_est_c = Arc::new(FuzzEstimator { ret_val: atomic::AtomicU32::new(253) });
988         let mut last_htlc_clear_fee_c = 253;
989
990         // 3 nodes is enough to hit all the possible cases, notably unknown-source-unknown-dest
991         // forwarding.
992         let (node_a, mut monitor_a, keys_manager_a) = make_node!(0, fee_est_a);
993         let (node_b, mut monitor_b, keys_manager_b) = make_node!(1, fee_est_b);
994         let (node_c, mut monitor_c, keys_manager_c) = make_node!(2, fee_est_c);
995
996         let mut nodes = [node_a, node_b, node_c];
997
998         let chan_1_funding = make_channel!(nodes[0], nodes[1], keys_manager_b, 0);
999         let chan_2_funding = make_channel!(nodes[1], nodes[2], keys_manager_c, 1);
1000
1001         for node in nodes.iter() {
1002                 confirm_txn!(node);
1003         }
1004
1005         lock_fundings!(nodes);
1006
1007         let chan_a = nodes[0].list_usable_channels()[0].short_channel_id.unwrap();
1008         let chan_b = nodes[2].list_usable_channels()[0].short_channel_id.unwrap();
1009
1010         let mut p_id: u8 = 0;
1011         let mut p_idx: u64 = 0;
1012
1013         let mut chan_a_disconnected = false;
1014         let mut chan_b_disconnected = false;
1015         let mut ab_events = Vec::new();
1016         let mut ba_events = Vec::new();
1017         let mut bc_events = Vec::new();
1018         let mut cb_events = Vec::new();
1019
1020         let mut node_a_ser = VecWriter(Vec::new());
1021         nodes[0].write(&mut node_a_ser).unwrap();
1022         let mut node_b_ser = VecWriter(Vec::new());
1023         nodes[1].write(&mut node_b_ser).unwrap();
1024         let mut node_c_ser = VecWriter(Vec::new());
1025         nodes[2].write(&mut node_c_ser).unwrap();
1026
1027         macro_rules! test_return {
1028                 () => {{
1029                         assert_eq!(nodes[0].list_channels().len(), 1);
1030                         assert_eq!(nodes[1].list_channels().len(), 2);
1031                         assert_eq!(nodes[2].list_channels().len(), 1);
1032                         return;
1033                 }};
1034         }
1035
1036         let mut read_pos = 0;
1037         macro_rules! get_slice {
1038                 ($len: expr) => {{
1039                         let slice_len = $len as usize;
1040                         if data.len() < read_pos + slice_len {
1041                                 test_return!();
1042                         }
1043                         read_pos += slice_len;
1044                         &data[read_pos - slice_len..read_pos]
1045                 }};
1046         }
1047
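        // Each pass through the loop below pulls bytes via `get_slice!` to choose the next action
        // (deliver messages, flip monitor-update results, send payments, disconnect/reconnect or
        // reload nodes, ...). When the fuzz input is exhausted, `test_return!` checks that every
        // node still lists its original channels, i.e. nothing was force-closed.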
1048         loop {
1049                 // Push any events from Node B onto ba_events and bc_events
1050                 macro_rules! push_excess_b_events {
1051                         ($excess_events: expr, $expect_drop_node: expr) => { {
1052                                 let a_id = nodes[0].get_our_node_id();
1053                                 let expect_drop_node: Option<usize> = $expect_drop_node;
1054                                 let expect_drop_id = if let Some(id) = expect_drop_node { Some(nodes[id].get_our_node_id()) } else { None };
1055                                 for event in $excess_events {
1056                                         let push_a = match event {
1057                                                 events::MessageSendEvent::UpdateHTLCs { ref node_id, .. } => {
1058                                                         if Some(*node_id) == expect_drop_id { panic!("peer_disconnected should drop msgs bound for the disconnected peer"); }
1059                                                         *node_id == a_id
1060                                                 },
1061                                                 events::MessageSendEvent::SendRevokeAndACK { ref node_id, .. } => {
1062                                                         if Some(*node_id) == expect_drop_id { panic!("peer_disconnected should drop msgs bound for the disconnected peer"); }
1063                                                         *node_id == a_id
1064                                                 },
1065                                                 events::MessageSendEvent::SendChannelReestablish { ref node_id, .. } => {
1066                                                         if Some(*node_id) == expect_drop_id { panic!("peer_disconnected should drop msgs bound for the disconnected peer"); }
1067                                                         *node_id == a_id
1068                                                 },
1069                                                 events::MessageSendEvent::SendChannelReady { .. } => continue,
1070                                                 events::MessageSendEvent::SendAnnouncementSignatures { .. } => continue,
1071                                                 events::MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
1072                                                         assert_eq!(msg.contents.flags & 2, 0); // The disable bit must never be set!
1073                                                         if Some(*node_id) == expect_drop_id { panic!("peer_disconnected should drop msgs bound for the disconnected peer"); }
1074                                                         *node_id == a_id
1075                                                 },
1076                                                 _ => panic!("Unhandled message event {:?}", event),
1077                                         };
1078                                         if push_a { ba_events.push(event); } else { bc_events.push(event); }
1079                                 }
1080                         } }
1081                 }
1082
1083                 // While delivering messages, we select across three possible message selection processes
1084                 // to ensure we get as much coverage as possible. See the individual enum variants for more
1085                 // details.
1086                 #[derive(PartialEq)]
1087                 enum ProcessMessages {
1088                         /// Deliver all available messages, including fetching any new messages from
1089                         /// `get_and_clear_pending_msg_events()` (which may have side effects).
1090                         AllMessages,
1091                         /// Call `get_and_clear_pending_msg_events()` first, and then deliver up to one
1092                         /// message (which may already be queued).
1093                         OneMessage,
1094                         /// Deliver up to one already-queued message. This avoids any potential side-effects
1095                         /// of `get_and_clear_pending_msg_events()` (e.g. freeing the HTLC holding cell),
1096                         /// potentially providing more coverage.
1097                         OnePendingMessage,
1098                 }
1099
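                // Deliver $node's outbound messages to the addressed peers: any of its
                // previously-queued messages plus, unless limited to `OnePendingMessage`,
                // freshly fetched ones. Returns whether any message was handled; leftovers
                // are pushed back onto the per-direction queues above.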
1100                 macro_rules! process_msg_events {
1101                         ($node: expr, $corrupt_forward: expr, $limit_events: expr) => { {
1102                                 let mut events = if $node == 1 {
1103                                         let mut new_events = Vec::new();
1104                                         mem::swap(&mut new_events, &mut ba_events);
1105                                         new_events.extend_from_slice(&bc_events[..]);
1106                                         bc_events.clear();
1107                                         new_events
1108                                 } else if $node == 0 {
1109                                         let mut new_events = Vec::new();
1110                                         mem::swap(&mut new_events, &mut ab_events);
1111                                         new_events
1112                                 } else {
1113                                         let mut new_events = Vec::new();
1114                                         mem::swap(&mut new_events, &mut cb_events);
1115                                         new_events
1116                                 };
1117                                 let mut new_events = Vec::new();
1118                                 if $limit_events != ProcessMessages::OnePendingMessage {
1119                                         new_events = nodes[$node].get_and_clear_pending_msg_events();
1120                                 }
1121                                 let mut had_events = false;
1122                                 let mut events_iter = events.drain(..).chain(new_events.drain(..));
1123                                 let mut extra_ev = None;
1124                                 for event in &mut events_iter {
1125                                         had_events = true;
1126                                         match event {
1127                                                 events::MessageSendEvent::UpdateHTLCs { node_id, updates: CommitmentUpdate { update_add_htlcs, update_fail_htlcs, update_fulfill_htlcs, update_fail_malformed_htlcs, update_fee, commitment_signed } } => {
1128                                                         for (idx, dest) in nodes.iter().enumerate() {
1129                                                                 if dest.get_our_node_id() == node_id {
1130                                                                         for update_add in update_add_htlcs.iter() {
1131                                                                                 out.locked_write(format!("Delivering update_add_htlc to node {}.\n", idx).as_bytes());
1132                                                                                 if !$corrupt_forward {
1133                                                                                         dest.handle_update_add_htlc(&nodes[$node].get_our_node_id(), update_add);
1134                                                                                 } else {
1135                                                                                         // Corrupt the update_add_htlc message so that its HMAC
1136                                                                                         // check will fail and we generate an
1137                                                                                         // update_fail_malformed_htlc instead of an
1138                                                                                         // update_fail_htlc as we do when we reject a payment.
1139                                                                                         let mut msg_ser = update_add.encode();
1140                                                                                         msg_ser[1000] ^= 0xff;
1141                                                                                         let new_msg = UpdateAddHTLC::read(&mut Cursor::new(&msg_ser)).unwrap();
1142                                                                                         dest.handle_update_add_htlc(&nodes[$node].get_our_node_id(), &new_msg);
1143                                                                                 }
1144                                                                         }
1145                                                                         for update_fulfill in update_fulfill_htlcs.iter() {
1146                                                                                 out.locked_write(format!("Delivering update_fulfill_htlc to node {}.\n", idx).as_bytes());
1147                                                                                 dest.handle_update_fulfill_htlc(&nodes[$node].get_our_node_id(), update_fulfill);
1148                                                                         }
1149                                                                         for update_fail in update_fail_htlcs.iter() {
1150                                                                                 out.locked_write(format!("Delivering update_fail_htlc to node {}.\n", idx).as_bytes());
1151                                                                                 dest.handle_update_fail_htlc(&nodes[$node].get_our_node_id(), update_fail);
1152                                                                         }
1153                                                                         for update_fail_malformed in update_fail_malformed_htlcs.iter() {
1154                                                                                 out.locked_write(format!("Delivering update_fail_malformed_htlc to node {}.\n", idx).as_bytes());
1155                                                                                 dest.handle_update_fail_malformed_htlc(&nodes[$node].get_our_node_id(), update_fail_malformed);
1156                                                                         }
1157                                                                         if let Some(msg) = update_fee {
1158                                                                                 out.locked_write(format!("Delivering update_fee to node {}.\n", idx).as_bytes());
1159                                                                                 dest.handle_update_fee(&nodes[$node].get_our_node_id(), &msg);
1160                                                                         }
1161                                                                         let processed_change = !update_add_htlcs.is_empty() || !update_fulfill_htlcs.is_empty() ||
1162                                                                                 !update_fail_htlcs.is_empty() || !update_fail_malformed_htlcs.is_empty();
1163                                                                         if $limit_events != ProcessMessages::AllMessages && processed_change {
1164                                                                                         // If we only want to process some messages, don't deliver the commitment_signed until later.
1165                                                                                 extra_ev = Some(events::MessageSendEvent::UpdateHTLCs { node_id, updates: CommitmentUpdate {
1166                                                                                         update_add_htlcs: Vec::new(),
1167                                                                                         update_fail_htlcs: Vec::new(),
1168                                                                                         update_fulfill_htlcs: Vec::new(),
1169                                                                                         update_fail_malformed_htlcs: Vec::new(),
1170                                                                                         update_fee: None,
1171                                                                                         commitment_signed
1172                                                                                 } });
1173                                                                                 break;
1174                                                                         }
1175                                                                         out.locked_write(format!("Delivering commitment_signed to node {}.\n", idx).as_bytes());
1176                                                                         dest.handle_commitment_signed(&nodes[$node].get_our_node_id(), &commitment_signed);
1177                                                                         break;
1178                                                                 }
1179                                                         }
1180                                                 },
1181                                                 events::MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
1182                                                         for (idx, dest) in nodes.iter().enumerate() {
1183                                                                 if dest.get_our_node_id() == *node_id {
1184                                                                         out.locked_write(format!("Delivering revoke_and_ack to node {}.\n", idx).as_bytes());
1185                                                                         dest.handle_revoke_and_ack(&nodes[$node].get_our_node_id(), msg);
1186                                                                 }
1187                                                         }
1188                                                 },
1189                                                 events::MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
1190                                                         for (idx, dest) in nodes.iter().enumerate() {
1191                                                                 if dest.get_our_node_id() == *node_id {
1192                                                                         out.locked_write(format!("Delivering channel_reestablish to node {}.\n", idx).as_bytes());
1193                                                                         dest.handle_channel_reestablish(&nodes[$node].get_our_node_id(), msg);
1194                                                                 }
1195                                                         }
1196                                                 },
1197                                                 events::MessageSendEvent::SendChannelReady { .. } => {
1198                                                         // Can be generated as a reestablish response
1199                                                 },
1200                                                 events::MessageSendEvent::SendAnnouncementSignatures { .. } => {
1201                                                         // Can be generated as a reestablish response
1202                                                 },
1203                                                 events::MessageSendEvent::SendChannelUpdate { ref msg, .. } => {
1204                                                         // When we reconnect we will resend a channel_update to make sure our
1205                                                         // counterparty has the latest parameters for receiving payments
1206                                                         // through us. We do, however, check that the message does not include
1207                                                         // the "disabled" bit, as we should never ever have a channel which is
1208                                                         // disabled when we send such an update (or it may indicate a channel
1209                                                         // force-close, which we should detect as an error).
1210                                                         assert_eq!(msg.contents.flags & 2, 0);
1211                                                 },
1212                                                 _ => if out.may_fail.load(atomic::Ordering::Acquire) {
1213                                                         return;
1214                                                 } else {
1215                                                         panic!("Unhandled message event {:?}", event)
1216                                                 },
1217                                         }
1218                                         if $limit_events != ProcessMessages::AllMessages {
1219                                                 break;
1220                                         }
1221                                 }
1222                                 if $node == 1 {
1223                                         push_excess_b_events!(extra_ev.into_iter().chain(events_iter), None);
1224                                 } else if $node == 0 {
1225                                         if let Some(ev) = extra_ev { ab_events.push(ev); }
1226                                         for event in events_iter { ab_events.push(event); }
1227                                 } else {
1228                                         if let Some(ev) = extra_ev { cb_events.push(ev); }
1229                                         for event in events_iter { cb_events.push(event); }
1230                                 }
1231                                 had_events
1232                         } }
1233                 }
1234
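                // Statement-position wrapper which discards the had-events return value so the
                // macro can be used directly in the command match arms below.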
1235                 macro_rules! process_msg_noret {
1236                         ($node: expr, $corrupt_forward: expr, $limit_events: expr) => {{
1237                                 process_msg_events!($node, $corrupt_forward, $limit_events);
1238                         }};
1239                 }
1240
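                // After node A ($counterparty_id == 0) or node C (otherwise) disconnects from
                // node B, flush the disconnected peer's pending message events, check that node
                // B dropped anything it had queued for that peer, and clear the link's queues.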
1241                 macro_rules! drain_msg_events_on_disconnect {
1242                         ($counterparty_id: expr) => {{
1243                                 if $counterparty_id == 0 {
1244                                         for event in nodes[0].get_and_clear_pending_msg_events() {
1245                                                 match event {
1246                                                         events::MessageSendEvent::UpdateHTLCs { .. } => {},
1247                                                         events::MessageSendEvent::SendRevokeAndACK { .. } => {},
1248                                                         events::MessageSendEvent::SendChannelReestablish { .. } => {},
1249                                                         events::MessageSendEvent::SendChannelReady { .. } => {},
1250                                                         events::MessageSendEvent::SendAnnouncementSignatures { .. } => {},
1251                                                         events::MessageSendEvent::SendChannelUpdate { ref msg, .. } => {
1252                                                                 assert_eq!(msg.contents.flags & 2, 0); // The disable bit must never be set!
1253                                                         },
1254                                                         _ => {
1255                                                                 if out.may_fail.load(atomic::Ordering::Acquire) {
1256                                                                         return;
1257                                                                 } else {
1258                                                                         panic!("Unhandled message event")
1259                                                                 }
1260                                                         },
1261                                                 }
1262                                         }
1263                                         push_excess_b_events!(
1264                                                 nodes[1].get_and_clear_pending_msg_events().drain(..),
1265                                                 Some(0)
1266                                         );
1267                                         ab_events.clear();
1268                                         ba_events.clear();
1269                                 } else {
1270                                         for event in nodes[2].get_and_clear_pending_msg_events() {
1271                                                 match event {
1272                                                         events::MessageSendEvent::UpdateHTLCs { .. } => {},
1273                                                         events::MessageSendEvent::SendRevokeAndACK { .. } => {},
1274                                                         events::MessageSendEvent::SendChannelReestablish { .. } => {},
1275                                                         events::MessageSendEvent::SendChannelReady { .. } => {},
1276                                                         events::MessageSendEvent::SendAnnouncementSignatures { .. } => {},
1277                                                         events::MessageSendEvent::SendChannelUpdate { ref msg, .. } => {
1278                                                                 assert_eq!(msg.contents.flags & 2, 0); // The disable bit must never be set!
1279                                                         },
1280                                                         _ => {
1281                                                                 if out.may_fail.load(atomic::Ordering::Acquire) {
1282                                                                         return;
1283                                                                 } else {
1284                                                                         panic!("Unhandled message event")
1285                                                                 }
1286                                                         },
1287                                                 }
1288                                         }
1289                                         push_excess_b_events!(
1290                                                 nodes[1].get_and_clear_pending_msg_events().drain(..),
1291                                                 Some(2)
1292                                         );
1293                                         bc_events.clear();
1294                                         cb_events.clear();
1295                                 }
1296                         }};
1297                 }
1298
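                // Handle $node's pending `Event`s: claim (or fail, if $fail) each
                // `PaymentClaimable` at most once, run any pending HTLC forwards, and ignore
                // the purely-informational events. Returns whether any event was processed.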
1299                 macro_rules! process_events {
1300                         ($node: expr, $fail: expr) => {{
1301                                 // If we get 256 payments we may have a hash collision, resulting in the
1302                                 // second claim/fail call not finding the duplicate-hash HTLC, so we have to
1303                                 // deduplicate the calls here.
1304                                 let mut claim_set = new_hash_map();
1305                                 let mut events = nodes[$node].get_and_clear_pending_events();
1306                                 // Sort events so that PendingHTLCsForwardable get processed last. This avoids a
1307                                 // case where we first process a PendingHTLCsForwardable, then claim/fail on a
1308                                 // PaymentClaimable, claiming/failing two HTLCs, but leaving a just-generated
1309                                 // PaymentClaimable event for the second HTLC in our pending_events (and breaking
1310                                 // our claim_set deduplication).
1311                                 events.sort_by(|a, b| {
1312                                         if let events::Event::PaymentClaimable { .. } = a {
1313                                                 if let events::Event::PendingHTLCsForwardable { .. } = b {
1314                                                         Ordering::Less
1315                                                 } else {
1316                                                         Ordering::Equal
1317                                                 }
1318                                         } else if let events::Event::PendingHTLCsForwardable { .. } = a {
1319                                                 if let events::Event::PaymentClaimable { .. } = b {
1320                                                         Ordering::Greater
1321                                                 } else {
1322                                                         Ordering::Equal
1323                                                 }
1324                                         } else {
1325                                                 Ordering::Equal
1326                                         }
1327                                 });
1328                                 let had_events = !events.is_empty();
1329                                 for event in events.drain(..) {
1330                                         match event {
1331                                                 events::Event::PaymentClaimable { payment_hash, .. } => {
1332                                                         if claim_set.insert(payment_hash.0, ()).is_none() {
1333                                                                 if $fail {
1334                                                                         nodes[$node].fail_htlc_backwards(&payment_hash);
1335                                                                 } else {
1336                                                                         nodes[$node].claim_funds(PaymentPreimage(payment_hash.0));
1337                                                                 }
1338                                                         }
1339                                                 },
1340                                                 events::Event::PaymentSent { .. } => {},
1341                                                 events::Event::PaymentClaimed { .. } => {},
1342                                                 events::Event::PaymentPathSuccessful { .. } => {},
1343                                                 events::Event::PaymentPathFailed { .. } => {},
1344                                                 events::Event::PaymentFailed { .. } => {},
1345                                                 events::Event::ProbeSuccessful { .. }
1346                                                 | events::Event::ProbeFailed { .. } => {
1347                                                         // Even though we don't explicitly send probes, because probes are
1348                                                         // detected based on hashing the payment hash+preimage, it's rather
1349                                                         // trivial for the fuzzer to build payments that accidentally end up
1350                                                         // looking like probes.
1351                                                 },
1352                                                 events::Event::PaymentForwarded { .. } if $node == 1 => {},
1353                                                 events::Event::ChannelReady { .. } => {},
1354                                                 events::Event::PendingHTLCsForwardable { .. } => {
1355                                                         nodes[$node].process_pending_htlc_forwards();
1356                                                 },
1357                                                 events::Event::HTLCHandlingFailed { .. } => {},
1358                                                 _ => {
1359                                                         if out.may_fail.load(atomic::Ordering::Acquire) {
1360                                                                 return;
1361                                                         } else {
1362                                                                 panic!("Unhandled event")
1363                                                         }
1364                                                 },
1365                                         }
1366                                 }
1367                                 had_events
1368                         }};
1369                 }
1370
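                // Statement-position wrapper around `process_events!`, mirroring
                // `process_msg_noret!` above.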
1371                 macro_rules! process_ev_noret {
1372                         ($node: expr, $fail: expr) => {{
1373                                 process_events!($node, $fail);
1374                         }};
1375                 }
1376
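                // Helpers for the 0xf0 through 0xfe commands below: complete a single pending
                // monitor update for a channel, picking either the oldest, the second-oldest,
                // or (via `Vec::pop`) the newest pending update, and record the monitor as
                // persisted if the completed update is newer than anything persisted so far.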
1377                 let complete_first = |v: &mut Vec<_>| if !v.is_empty() { Some(v.remove(0)) } else { None };
1378                 let complete_second = |v: &mut Vec<_>| if v.len() > 1 { Some(v.remove(1)) } else { None };
1379                 let complete_monitor_update =
1380                         |monitor: &Arc<TestChainMonitor>,
1381                          chan_funding,
1382                          compl_selector: &dyn Fn(&mut Vec<(u64, Vec<u8>)>) -> Option<(u64, Vec<u8>)>| {
1383                                 if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
1384                                         assert!(
1385                                                 state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
1386                                                 "updates should be sorted by id"
1387                                         );
1388                                         if let Some((id, data)) = compl_selector(&mut state.pending_monitors) {
1389                                                 monitor.chain_monitor.channel_monitor_updated(*chan_funding, id).unwrap();
1390                                                 if id > state.persisted_monitor_id {
1391                                                         state.persisted_monitor_id = id;
1392                                                         state.persisted_monitor = data;
1393                                                 }
1394                                         }
1395                                 }
1396                         };
1397
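                // Complete every pending update for the given channel, in order, leaving its
                // persisted monitor fully up to date.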
1398                 let complete_all_monitor_updates = |monitor: &Arc<TestChainMonitor>, chan_funding| {
1399                         if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
1400                                 assert!(
1401                                         state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
1402                                         "updates should be sorted by id"
1403                                 );
1404                                 for (id, data) in state.pending_monitors.drain(..) {
1405                                         monitor.chain_monitor.channel_monitor_updated(*chan_funding, id).unwrap();
1406                                         if id > state.persisted_monitor_id {
1407                                                 state.persisted_monitor_id = id;
1408                                                 state.persisted_monitor = data;
1409                                         }
1410                                 }
1411                         }
1412                 };
1413
1414                 let v = get_slice!(1)[0];
1415                 out.locked_write(format!("READ A BYTE! HANDLING INPUT {:x}...........\n", v).as_bytes());
1416                 match v {
1417                         // In general, we keep related message groups close together in binary form, allowing
1418                         // bit-twiddling mutations to have similar effects. This is probably overkill, but no
1419                         // harm in doing so.
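                        // Roughly: 0x00-0x0b toggle and complete async monitor persistence,
                        // 0x0c-0x0f disconnect/reconnect peers, 0x10-0x27 process messages and
                        // events per node, 0x2c-0x2e restart a node from its serialized state,
                        // 0x30-0x6d send payments of decreasing size, 0x80-0x89 tweak fee
                        // estimates, 0xf0-0xfe complete individual monitor updates out of
                        // order, and 0xff settles everything and checks the channels still work.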
1420                         0x00 => {
1421                                 *monitor_a.persister.update_ret.lock().unwrap() =
1422                                         ChannelMonitorUpdateStatus::InProgress
1423                         },
1424                         0x01 => {
1425                                 *monitor_b.persister.update_ret.lock().unwrap() =
1426                                         ChannelMonitorUpdateStatus::InProgress
1427                         },
1428                         0x02 => {
1429                                 *monitor_c.persister.update_ret.lock().unwrap() =
1430                                         ChannelMonitorUpdateStatus::InProgress
1431                         },
1432                         0x04 => {
1433                                 *monitor_a.persister.update_ret.lock().unwrap() =
1434                                         ChannelMonitorUpdateStatus::Completed
1435                         },
1436                         0x05 => {
1437                                 *monitor_b.persister.update_ret.lock().unwrap() =
1438                                         ChannelMonitorUpdateStatus::Completed
1439                         },
1440                         0x06 => {
1441                                 *monitor_c.persister.update_ret.lock().unwrap() =
1442                                         ChannelMonitorUpdateStatus::Completed
1443                         },
1444
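                        // Complete all pending monitor updates for a given (node, channel) pair.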
1445                         0x08 => complete_all_monitor_updates(&monitor_a, &chan_1_funding),
1446                         0x09 => complete_all_monitor_updates(&monitor_b, &chan_1_funding),
1447                         0x0a => complete_all_monitor_updates(&monitor_b, &chan_2_funding),
1448                         0x0b => complete_all_monitor_updates(&monitor_c, &chan_2_funding),
1449
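                        // Disconnect (0x0c/0x0d) and reconnect (0x0e/0x0f) the A<->B and B<->C
                        // links; disconnecting drops any messages in flight on the link.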
1450                         0x0c => {
1451                                 if !chan_a_disconnected {
1452                                         nodes[0].peer_disconnected(&nodes[1].get_our_node_id());
1453                                         nodes[1].peer_disconnected(&nodes[0].get_our_node_id());
1454                                         chan_a_disconnected = true;
1455                                         drain_msg_events_on_disconnect!(0);
1456                                 }
1457                         },
1458                         0x0d => {
1459                                 if !chan_b_disconnected {
1460                                         nodes[1].peer_disconnected(&nodes[2].get_our_node_id());
1461                                         nodes[2].peer_disconnected(&nodes[1].get_our_node_id());
1462                                         chan_b_disconnected = true;
1463                                         drain_msg_events_on_disconnect!(2);
1464                                 }
1465                         },
1466                         0x0e => {
1467                                 if chan_a_disconnected {
1468                                         let init_1 = Init {
1469                                                 features: nodes[1].init_features(),
1470                                                 networks: None,
1471                                                 remote_network_address: None,
1472                                         };
1473                                         nodes[0].peer_connected(&nodes[1].get_our_node_id(), &init_1, true).unwrap();
1474                                         let init_0 = Init {
1475                                                 features: nodes[0].init_features(),
1476                                                 networks: None,
1477                                                 remote_network_address: None,
1478                                         };
1479                                         nodes[1].peer_connected(&nodes[0].get_our_node_id(), &init_0, false).unwrap();
1480                                         chan_a_disconnected = false;
1481                                 }
1482                         },
1483                         0x0f => {
1484                                 if chan_b_disconnected {
1485                                         let init_2 = Init {
1486                                                 features: nodes[2].init_features(),
1487                                                 networks: None,
1488                                                 remote_network_address: None,
1489                                         };
1490                                         nodes[1].peer_connected(&nodes[2].get_our_node_id(), &init_2, true).unwrap();
1491                                         let init_1 = Init {
1492                                                 features: nodes[1].init_features(),
1493                                                 networks: None,
1494                                                 remote_network_address: None,
1495                                         };
1496                                         nodes[2].peer_connected(&nodes[1].get_our_node_id(), &init_1, false).unwrap();
1497                                         chan_b_disconnected = false;
1498                                 }
1499                         },
1500
1501                         0x10 => process_msg_noret!(0, true, ProcessMessages::AllMessages),
1502                         0x11 => process_msg_noret!(0, false, ProcessMessages::AllMessages),
1503                         0x12 => process_msg_noret!(0, true, ProcessMessages::OneMessage),
1504                         0x13 => process_msg_noret!(0, false, ProcessMessages::OneMessage),
1505                         0x14 => process_msg_noret!(0, true, ProcessMessages::OnePendingMessage),
1506                         0x15 => process_msg_noret!(0, false, ProcessMessages::OnePendingMessage),
1507
1508                         0x16 => process_ev_noret!(0, true),
1509                         0x17 => process_ev_noret!(0, false),
1510
1511                         0x18 => process_msg_noret!(1, true, ProcessMessages::AllMessages),
1512                         0x19 => process_msg_noret!(1, false, ProcessMessages::AllMessages),
1513                         0x1a => process_msg_noret!(1, true, ProcessMessages::OneMessage),
1514                         0x1b => process_msg_noret!(1, false, ProcessMessages::OneMessage),
1515                         0x1c => process_msg_noret!(1, true, ProcessMessages::OnePendingMessage),
1516                         0x1d => process_msg_noret!(1, false, ProcessMessages::OnePendingMessage),
1517
1518                         0x1e => process_ev_noret!(1, true),
1519                         0x1f => process_ev_noret!(1, false),
1520
1521                         0x20 => process_msg_noret!(2, true, ProcessMessages::AllMessages),
1522                         0x21 => process_msg_noret!(2, false, ProcessMessages::AllMessages),
1523                         0x22 => process_msg_noret!(2, true, ProcessMessages::OneMessage),
1524                         0x23 => process_msg_noret!(2, false, ProcessMessages::OneMessage),
1525                         0x24 => process_msg_noret!(2, true, ProcessMessages::OnePendingMessage),
1526                         0x25 => process_msg_noret!(2, false, ProcessMessages::OnePendingMessage),
1527
1528                         0x26 => process_ev_noret!(2, true),
1529                         0x27 => process_ev_noret!(2, false),
1530
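                        // 0x2c-0x2e simulate restarting a node: its peers are told it went away
                        // (dropping whatever was in flight on its links) and the node is rebuilt
                        // from its serialized ChannelManager state and previously-persisted
                        // monitors.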
1531                         0x2c => {
1532                                 if !chan_a_disconnected {
1533                                         nodes[1].peer_disconnected(&nodes[0].get_our_node_id());
1534                                         chan_a_disconnected = true;
1535                                         push_excess_b_events!(
1536                                                 nodes[1].get_and_clear_pending_msg_events().drain(..),
1537                                                 Some(0)
1538                                         );
1539                                         ab_events.clear();
1540                                         ba_events.clear();
1541                                 }
1542                                 let (new_node_a, new_monitor_a) =
1543                                         reload_node!(node_a_ser, 0, monitor_a, keys_manager_a, fee_est_a);
1544                                 nodes[0] = new_node_a;
1545                                 monitor_a = new_monitor_a;
1546                         },
1547                         0x2d => {
1548                                 if !chan_a_disconnected {
1549                                         nodes[0].peer_disconnected(&nodes[1].get_our_node_id());
1550                                         chan_a_disconnected = true;
1551                                         nodes[0].get_and_clear_pending_msg_events();
1552                                         ab_events.clear();
1553                                         ba_events.clear();
1554                                 }
1555                                 if !chan_b_disconnected {
1556                                         nodes[2].peer_disconnected(&nodes[1].get_our_node_id());
1557                                         chan_b_disconnected = true;
1558                                         nodes[2].get_and_clear_pending_msg_events();
1559                                         bc_events.clear();
1560                                         cb_events.clear();
1561                                 }
1562                                 let (new_node_b, new_monitor_b) =
1563                                         reload_node!(node_b_ser, 1, monitor_b, keys_manager_b, fee_est_b);
1564                                 nodes[1] = new_node_b;
1565                                 monitor_b = new_monitor_b;
1566                         },
1567                         0x2e => {
1568                                 if !chan_b_disconnected {
1569                                         nodes[1].peer_disconnected(&nodes[2].get_our_node_id());
1570                                         chan_b_disconnected = true;
1571                                         push_excess_b_events!(
1572                                                 nodes[1].get_and_clear_pending_msg_events().drain(..),
1573                                                 Some(2)
1574                                         );
1575                                         bc_events.clear();
1576                                         cb_events.clear();
1577                                 }
1578                                 let (new_node_c, new_monitor_c) =
1579                                         reload_node!(node_c_ser, 2, monitor_c, keys_manager_c, fee_est_c);
1580                                 nodes[2] = new_node_c;
1581                                 monitor_c = new_monitor_c;
1582                         },
1583
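                        // 0x30-0x6d send payments: one hop in each direction over each channel,
                        // plus two-hop payments forwarded through node B, at amounts stepping
                        // down from 10,000,000 msat to 1 msat.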
1584                         // 1/10th the channel size:
1585                         0x30 => send_noret(&nodes[0], &nodes[1], chan_a, 10_000_000, &mut p_id, &mut p_idx),
1586                         0x31 => send_noret(&nodes[1], &nodes[0], chan_a, 10_000_000, &mut p_id, &mut p_idx),
1587                         0x32 => send_noret(&nodes[1], &nodes[2], chan_b, 10_000_000, &mut p_id, &mut p_idx),
1588                         0x33 => send_noret(&nodes[2], &nodes[1], chan_b, 10_000_000, &mut p_id, &mut p_idx),
1589                         0x34 => send_hop_noret(
1590                                 &nodes[0], &nodes[1], chan_a, &nodes[2], chan_b, 10_000_000, &mut p_id, &mut p_idx,
1591                         ),
1592                         0x35 => send_hop_noret(
1593                                 &nodes[2], &nodes[1], chan_b, &nodes[0], chan_a, 10_000_000, &mut p_id, &mut p_idx,
1594                         ),
1595
1596                         0x38 => send_noret(&nodes[0], &nodes[1], chan_a, 1_000_000, &mut p_id, &mut p_idx),
1597                         0x39 => send_noret(&nodes[1], &nodes[0], chan_a, 1_000_000, &mut p_id, &mut p_idx),
1598                         0x3a => send_noret(&nodes[1], &nodes[2], chan_b, 1_000_000, &mut p_id, &mut p_idx),
1599                         0x3b => send_noret(&nodes[2], &nodes[1], chan_b, 1_000_000, &mut p_id, &mut p_idx),
1600                         0x3c => send_hop_noret(
1601                                 &nodes[0], &nodes[1], chan_a, &nodes[2], chan_b, 1_000_000, &mut p_id, &mut p_idx,
1602                         ),
1603                         0x3d => send_hop_noret(
1604                                 &nodes[2], &nodes[1], chan_b, &nodes[0], chan_a, 1_000_000, &mut p_id, &mut p_idx,
1605                         ),
1606
1607                         0x40 => send_noret(&nodes[0], &nodes[1], chan_a, 100_000, &mut p_id, &mut p_idx),
1608                         0x41 => send_noret(&nodes[1], &nodes[0], chan_a, 100_000, &mut p_id, &mut p_idx),
1609                         0x42 => send_noret(&nodes[1], &nodes[2], chan_b, 100_000, &mut p_id, &mut p_idx),
1610                         0x43 => send_noret(&nodes[2], &nodes[1], chan_b, 100_000, &mut p_id, &mut p_idx),
1611                         0x44 => send_hop_noret(
1612                                 &nodes[0], &nodes[1], chan_a, &nodes[2], chan_b, 100_000, &mut p_id, &mut p_idx,
1613                         ),
1614                         0x45 => send_hop_noret(
1615                                 &nodes[2], &nodes[1], chan_b, &nodes[0], chan_a, 100_000, &mut p_id, &mut p_idx,
1616                         ),
1617
1618                         0x48 => send_noret(&nodes[0], &nodes[1], chan_a, 10_000, &mut p_id, &mut p_idx),
1619                         0x49 => send_noret(&nodes[1], &nodes[0], chan_a, 10_000, &mut p_id, &mut p_idx),
1620                         0x4a => send_noret(&nodes[1], &nodes[2], chan_b, 10_000, &mut p_id, &mut p_idx),
1621                         0x4b => send_noret(&nodes[2], &nodes[1], chan_b, 10_000, &mut p_id, &mut p_idx),
1622                         0x4c => send_hop_noret(
1623                                 &nodes[0], &nodes[1], chan_a, &nodes[2], chan_b, 10_000, &mut p_id, &mut p_idx,
1624                         ),
1625                         0x4d => send_hop_noret(
1626                                 &nodes[2], &nodes[1], chan_b, &nodes[0], chan_a, 10_000, &mut p_id, &mut p_idx,
1627                         ),
1628
1629                         0x50 => send_noret(&nodes[0], &nodes[1], chan_a, 1_000, &mut p_id, &mut p_idx),
1630                         0x51 => send_noret(&nodes[1], &nodes[0], chan_a, 1_000, &mut p_id, &mut p_idx),
1631                         0x52 => send_noret(&nodes[1], &nodes[2], chan_b, 1_000, &mut p_id, &mut p_idx),
1632                         0x53 => send_noret(&nodes[2], &nodes[1], chan_b, 1_000, &mut p_id, &mut p_idx),
1633                         0x54 => send_hop_noret(
1634                                 &nodes[0], &nodes[1], chan_a, &nodes[2], chan_b, 1_000, &mut p_id, &mut p_idx,
1635                         ),
1636                         0x55 => send_hop_noret(
1637                                 &nodes[2], &nodes[1], chan_b, &nodes[0], chan_a, 1_000, &mut p_id, &mut p_idx,
1638                         ),
1639
1640                         0x58 => send_noret(&nodes[0], &nodes[1], chan_a, 100, &mut p_id, &mut p_idx),
1641                         0x59 => send_noret(&nodes[1], &nodes[0], chan_a, 100, &mut p_id, &mut p_idx),
1642                         0x5a => send_noret(&nodes[1], &nodes[2], chan_b, 100, &mut p_id, &mut p_idx),
1643                         0x5b => send_noret(&nodes[2], &nodes[1], chan_b, 100, &mut p_id, &mut p_idx),
1644                         0x5c => send_hop_noret(
1645                                 &nodes[0], &nodes[1], chan_a, &nodes[2], chan_b, 100, &mut p_id, &mut p_idx,
1646                         ),
1647                         0x5d => send_hop_noret(
1648                                 &nodes[2], &nodes[1], chan_b, &nodes[0], chan_a, 100, &mut p_id, &mut p_idx,
1649                         ),
1650
1651                         0x60 => send_noret(&nodes[0], &nodes[1], chan_a, 10, &mut p_id, &mut p_idx),
1652                         0x61 => send_noret(&nodes[1], &nodes[0], chan_a, 10, &mut p_id, &mut p_idx),
1653                         0x62 => send_noret(&nodes[1], &nodes[2], chan_b, 10, &mut p_id, &mut p_idx),
1654                         0x63 => send_noret(&nodes[2], &nodes[1], chan_b, 10, &mut p_id, &mut p_idx),
1655                         0x64 => send_hop_noret(
1656                                 &nodes[0], &nodes[1], chan_a, &nodes[2], chan_b, 10, &mut p_id, &mut p_idx,
1657                         ),
1658                         0x65 => send_hop_noret(
1659                                 &nodes[2], &nodes[1], chan_b, &nodes[0], chan_a, 10, &mut p_id, &mut p_idx,
1660                         ),
1661
1662                         0x68 => send_noret(&nodes[0], &nodes[1], chan_a, 1, &mut p_id, &mut p_idx),
1663                         0x69 => send_noret(&nodes[1], &nodes[0], chan_a, 1, &mut p_id, &mut p_idx),
1664                         0x6a => send_noret(&nodes[1], &nodes[2], chan_b, 1, &mut p_id, &mut p_idx),
1665                         0x6b => send_noret(&nodes[2], &nodes[1], chan_b, 1, &mut p_id, &mut p_idx),
1666                         0x6c => send_hop_noret(
1667                                 &nodes[0], &nodes[1], chan_a, &nodes[2], chan_b, 1, &mut p_id, &mut p_idx,
1668                         ),
1669                         0x6d => send_hop_noret(
1670                                 &nodes[2], &nodes[1], chan_b, &nodes[0], chan_a, 1, &mut p_id, &mut p_idx,
1671                         ),
1672
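                        // 0x80/0x84/0x88 bump a node's fee estimate by 250 sat/kW, capped
                        // relative to the feerate at the last full HTLC clear (scaled by the
                        // fee-spike-buffer multiple when not using anchors), while 0x81/0x85/0x89
                        // reset it to 253 sat/kW; either way the node may then send an update_fee.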
1673                         0x80 => {
1674                                 let mut max_feerate = last_htlc_clear_fee_a;
1675                                 if !anchors {
1676                                         max_feerate *= FEE_SPIKE_BUFFER_FEE_INCREASE_MULTIPLE as u32;
1677                                 }
1678                                 if fee_est_a.ret_val.fetch_add(250, atomic::Ordering::AcqRel) + 250 > max_feerate {
1679                                         fee_est_a.ret_val.store(max_feerate, atomic::Ordering::Release);
1680                                 }
1681                                 nodes[0].maybe_update_chan_fees();
1682                         },
1683                         0x81 => {
1684                                 fee_est_a.ret_val.store(253, atomic::Ordering::Release);
1685                                 nodes[0].maybe_update_chan_fees();
1686                         },
1687
1688                         0x84 => {
1689                                 let mut max_feerate = last_htlc_clear_fee_b;
1690                                 if !anchors {
1691                                         max_feerate *= FEE_SPIKE_BUFFER_FEE_INCREASE_MULTIPLE as u32;
1692                                 }
1693                                 if fee_est_b.ret_val.fetch_add(250, atomic::Ordering::AcqRel) + 250 > max_feerate {
1694                                         fee_est_b.ret_val.store(max_feerate, atomic::Ordering::Release);
1695                                 }
1696                                 nodes[1].maybe_update_chan_fees();
1697                         },
1698                         0x85 => {
1699                                 fee_est_b.ret_val.store(253, atomic::Ordering::Release);
1700                                 nodes[1].maybe_update_chan_fees();
1701                         },
1702
1703                         0x88 => {
1704                                 let mut max_feerate = last_htlc_clear_fee_c;
1705                                 if !anchors {
1706                                         max_feerate *= FEE_SPIKE_BUFFER_FEE_INCREASE_MULTIPLE as u32;
1707                                 }
1708                                 if fee_est_c.ret_val.fetch_add(250, atomic::Ordering::AcqRel) + 250 > max_feerate {
1709                                         fee_est_c.ret_val.store(max_feerate, atomic::Ordering::Release);
1710                                 }
1711                                 nodes[2].maybe_update_chan_fees();
1712                         },
1713                         0x89 => {
1714                                 fee_est_c.ret_val.store(253, atomic::Ordering::Release);
1715                                 nodes[2].maybe_update_chan_fees();
1716                         },
1717
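                        // Complete exactly one pending monitor update, chosen oldest-first,
                        // second-oldest, or newest-first, to exercise out-of-order completion.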
1718                         0xf0 => complete_monitor_update(&monitor_a, &chan_1_funding, &complete_first),
1719                         0xf1 => complete_monitor_update(&monitor_a, &chan_1_funding, &complete_second),
1720                         0xf2 => complete_monitor_update(&monitor_a, &chan_1_funding, &Vec::pop),
1721
1722                         0xf4 => complete_monitor_update(&monitor_b, &chan_1_funding, &complete_first),
1723                         0xf5 => complete_monitor_update(&monitor_b, &chan_1_funding, &complete_second),
1724                         0xf6 => complete_monitor_update(&monitor_b, &chan_1_funding, &Vec::pop),
1725
1726                         0xf8 => complete_monitor_update(&monitor_b, &chan_2_funding, &complete_first),
1727                         0xf9 => complete_monitor_update(&monitor_b, &chan_2_funding, &complete_second),
1728                         0xfa => complete_monitor_update(&monitor_b, &chan_2_funding, &Vec::pop),
1729
1730                         0xfc => complete_monitor_update(&monitor_c, &chan_2_funding, &complete_first),
1731                         0xfd => complete_monitor_update(&monitor_c, &chan_2_funding, &complete_second),
1732                         0xfe => complete_monitor_update(&monitor_c, &chan_2_funding, &Vec::pop),
1733
1734                         0xff => {
1735                                 // Test that no channel is in a stuck state where neither party can send funds even
1736                                 // after we resolve all pending events.
1737                                 // First make sure there are no pending monitor updates and further update
1738                                 // operations complete.
1739                                 *monitor_a.persister.update_ret.lock().unwrap() =
1740                                         ChannelMonitorUpdateStatus::Completed;
1741                                 *monitor_b.persister.update_ret.lock().unwrap() =
1742                                         ChannelMonitorUpdateStatus::Completed;
1743                                 *monitor_c.persister.update_ret.lock().unwrap() =
1744                                         ChannelMonitorUpdateStatus::Completed;
1745
1746                                 complete_all_monitor_updates(&monitor_a, &chan_1_funding);
1747                                 complete_all_monitor_updates(&monitor_b, &chan_1_funding);
1748                                 complete_all_monitor_updates(&monitor_b, &chan_2_funding);
1749                                 complete_all_monitor_updates(&monitor_c, &chan_2_funding);
1750
1751                                 // Next, make sure peers are all connected to each other
1752                                 if chan_a_disconnected {
1753                                         let init_1 = Init {
1754                                                 features: nodes[1].init_features(),
1755                                                 networks: None,
1756                                                 remote_network_address: None,
1757                                         };
1758                                         nodes[0].peer_connected(&nodes[1].get_our_node_id(), &init_1, true).unwrap();
1759                                         let init_0 = Init {
1760                                                 features: nodes[0].init_features(),
1761                                                 networks: None,
1762                                                 remote_network_address: None,
1763                                         };
1764                                         nodes[1].peer_connected(&nodes[0].get_our_node_id(), &init_0, false).unwrap();
1765                                         chan_a_disconnected = false;
1766                                 }
1767                                 if chan_b_disconnected {
1768                                         let init_2 = Init {
1769                                                 features: nodes[2].init_features(),
1770                                                 networks: None,
1771                                                 remote_network_address: None,
1772                                         };
1773                                         nodes[1].peer_connected(&nodes[2].get_our_node_id(), &init_2, true).unwrap();
1774                                         let init_1 = Init {
1775                                                 features: nodes[1].init_features(),
1776                                                 networks: None,
1777                                                 remote_network_address: None,
1778                                         };
1779                                         nodes[2].peer_connected(&nodes[1].get_our_node_id(), &init_1, false).unwrap();
1780                                         chan_b_disconnected = false;
1781                                 }
1782
1783                                 for i in 0..usize::MAX {
1784                                         if i == 100 {
1785                                                panic!("It may take many iterations to settle the state, but it should not take forever");
1786                                         }
1787                                         // Then, make sure any current forwards make their way to their destination
1788                                         if process_msg_events!(0, false, ProcessMessages::AllMessages) {
1789                                                 continue;
1790                                         }
1791                                         if process_msg_events!(1, false, ProcessMessages::AllMessages) {
1792                                                 continue;
1793                                         }
1794                                         if process_msg_events!(2, false, ProcessMessages::AllMessages) {
1795                                                 continue;
1796                                         }
1797                                         // ...making sure any pending PendingHTLCsForwardable events are handled and
1798                                         // payments claimed.
1799                                         if process_events!(0, false) {
1800                                                 continue;
1801                                         }
1802                                         if process_events!(1, false) {
1803                                                 continue;
1804                                         }
1805                                         if process_events!(2, false) {
1806                                                 continue;
1807                                         }
1808                                         break;
1809                                 }
1810
1811                                 // Finally, make sure that at least one end of each channel can make a substantial payment
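                                     // After settling, the spendable balance may sit largely on one side
                                     // of a channel, so it is enough for either direction to succeed.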
1812                                 assert!(
1813                                         send_payment(&nodes[0], &nodes[1], chan_a, 10_000_000, &mut p_id, &mut p_idx)
1814                                                 || send_payment(
1815                                                         &nodes[1], &nodes[0], chan_a, 10_000_000, &mut p_id, &mut p_idx
1816                                                 )
1817                                 );
1818                                 assert!(
1819                                         send_payment(&nodes[1], &nodes[2], chan_b, 10_000_000, &mut p_id, &mut p_idx)
1820                                                 || send_payment(
1821                                                         &nodes[2], &nodes[1], chan_b, 10_000_000, &mut p_id, &mut p_idx
1822                                                 )
1823                                 );
1824
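                                     // Record each node's current fee-estimator value as the fee
                                     // baseline for this settled, known-good state.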
1825                                 last_htlc_clear_fee_a = fee_est_a.ret_val.load(atomic::Ordering::Acquire);
1826                                 last_htlc_clear_fee_b = fee_est_b.ret_val.load(atomic::Ordering::Acquire);
1827                                 last_htlc_clear_fee_c = fee_est_c.ret_val.load(atomic::Ordering::Acquire);
1828                         },
1829                         _ => test_return!(),
1830                 }
1831
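                     // Re-serialize any node whose ChannelManager reports that it needs
                     // persistence, so later reload steps in the fuzz run deserialize an
                     // up-to-date snapshot.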
1832                 if nodes[0].get_and_clear_needs_persistence() {
1833                         node_a_ser.0.clear();
1834                         nodes[0].write(&mut node_a_ser).unwrap();
1835                 }
1836                 if nodes[1].get_and_clear_needs_persistence() {
1837                         node_b_ser.0.clear();
1838                         nodes[1].write(&mut node_b_ser).unwrap();
1839                 }
1840                 if nodes[2].get_and_clear_needs_persistence() {
1841                         node_c_ser.0.clear();
1842                         nodes[2].write(&mut node_c_ser).unwrap();
1843                 }
1844         }
1845 }
1846
1847 /// We actually have different behavior based on whether a certain log string has been seen, so we have
1848 /// to do a bit more tracking.
1849 #[derive(Clone)]
1850 struct SearchingOutput<O: Output> {
1851         output: O,
1852         may_fail: Arc<atomic::AtomicBool>,
1853 }
1854 impl<O: Output> Output for SearchingOutput<O> {
1855         fn locked_write(&self, data: &[u8]) {
1856         // We hit a design limitation of the LN state machine (see CONCURRENT_INBOUND_HTLC_FEE_BUFFER)
1857                 if std::str::from_utf8(data).unwrap().contains("Outbound update_fee HTLC buffer overflow - counterparty should force-close this channel") {
1858                         self.may_fail.store(true, atomic::Ordering::Release);
1859                 }
1860                 self.output.locked_write(data)
1861         }
1862 }
1863 impl<O: Output> SearchingOutput<O> {
1864         pub fn new(output: O) -> Self {
1865                 Self { output, may_fail: Arc::new(atomic::AtomicBool::new(false)) }
1866         }
1867 }
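
// Illustrative only (not part of the harness; the name `run_searching` is this
// example's own): SearchingOutput can wrap an arbitrary Output, the test can be
// run against the wrapper, and the caller can then ask whether the known
// fee-buffer log line was ever emitted during the run.
#[allow(dead_code)]
fn run_searching<O: Output>(data: &[u8], out: O) -> bool {
        let searching = SearchingOutput::new(out);
        do_test(data, searching.clone(), false);
        // If the sentinel log line was seen, a force-close is an expected
        // consequence of the CONCURRENT_INBOUND_HTLC_FEE_BUFFER limitation
        // rather than a bug in the monitor-update handling itself.
        searching.may_fail.load(atomic::Ordering::Acquire)
}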
1868
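// Run the same fuzz input twice, once with each setting of `do_test`'s boolean
// flag, so both configurations are exercised for every case.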
1869 pub fn chanmon_consistency_test<Out: Output>(data: &[u8], out: Out) {
1870         do_test(data, out.clone(), false);
1871         do_test(data, out, true);
1872 }
1873
1874 #[no_mangle]
1875 pub extern "C" fn chanmon_consistency_run(data: *const u8, datalen: usize) {
1876         do_test(unsafe { std::slice::from_raw_parts(data, datalen) }, test_logger::DevNull {}, false);
1877         do_test(unsafe { std::slice::from_raw_parts(data, datalen) }, test_logger::DevNull {}, true);
1878 }
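
// A minimal sketch of driving both entry points by hand (illustrative only;
// `example_run_both` is a hypothetical helper and is not how fuzz engines
// invoke these functions in practice):
#[allow(dead_code)]
fn example_run_both(data: &[u8]) {
        // The safe, generic entry point, writing logs to a no-op sink...
        chanmon_consistency_test(data, test_logger::DevNull {});
        // ...and the C-ABI entry point, called as an external driver would.
        chanmon_consistency_run(data.as_ptr(), data.len());
}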