`rustfmt`: `fuzz/src/chanmon_consistency.rs`
[rust-lightning] / fuzz / src / chanmon_consistency.rs
1 // This file is Copyright its original authors, visible in version control
2 // history.
3 //
4 // This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5 // or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7 // You may not use this file except in accordance with one or both of these
8 // licenses.
9
10 //! Test that monitor update failures don't get our channel state out of sync.
11 //! One of the biggest concern with the monitor update failure handling code is that messages
12 //! resent after monitor updating is restored are delivered out-of-order, resulting in
13 //! commitment_signed messages having "invalid signatures".
14 //! To test this we stand up a network of three nodes and read bytes from the fuzz input to denote
15 //! actions such as sending payments, handling events, or changing monitor update return values on
16 //! a per-node basis. This should allow it to find any cases where the ordering of actions results
17 //! in us getting out of sync with ourselves, and, assuming at least one of our recieve- or
18 //! send-side handling is correct, other peers. We consider it a failure if any action results in a
19 //! channel being force-closed.
20
21 use bitcoin::amount::Amount;
22 use bitcoin::blockdata::constants::genesis_block;
23 use bitcoin::blockdata::locktime::absolute::LockTime;
24 use bitcoin::blockdata::opcodes;
25 use bitcoin::blockdata::script::{Builder, ScriptBuf};
26 use bitcoin::blockdata::transaction::{Transaction, TxOut};
27 use bitcoin::network::Network;
28 use bitcoin::transaction::Version;
29
30 use bitcoin::hash_types::BlockHash;
31 use bitcoin::hashes::sha256::Hash as Sha256;
32 use bitcoin::hashes::sha256d::Hash as Sha256dHash;
33 use bitcoin::hashes::Hash as TraitImport;
34 use bitcoin::WPubkeyHash;
35
36 use lightning::blinded_path::payment::ReceiveTlvs;
37 use lightning::blinded_path::BlindedPath;
38 use lightning::chain;
39 use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
40 use lightning::chain::channelmonitor::{ChannelMonitor, MonitorEvent};
41 use lightning::chain::transaction::OutPoint;
42 use lightning::chain::{
43         chainmonitor, channelmonitor, BestBlock, ChannelMonitorUpdateStatus, Confirm, Watch,
44 };
45 use lightning::events;
46 use lightning::events::MessageSendEventsProvider;
47 use lightning::ln::channel::FEE_SPIKE_BUFFER_FEE_INCREASE_MULTIPLE;
48 use lightning::ln::channel_state::ChannelDetails;
49 use lightning::ln::channelmanager::{
50         ChainParameters, ChannelManager, ChannelManagerReadArgs, PaymentId, PaymentSendFailure,
51         RecipientOnionFields,
52 };
53 use lightning::ln::functional_test_utils::*;
54 use lightning::ln::msgs::{
55         self, ChannelMessageHandler, CommitmentUpdate, DecodeError, Init, UpdateAddHTLC,
56 };
57 use lightning::ln::script::ShutdownScript;
58 use lightning::ln::{ChannelId, PaymentHash, PaymentPreimage, PaymentSecret};
59 use lightning::offers::invoice::{BlindedPayInfo, UnsignedBolt12Invoice};
60 use lightning::offers::invoice_request::UnsignedInvoiceRequest;
61 use lightning::onion_message::messenger::{Destination, MessageRouter, OnionMessagePath};
62 use lightning::routing::router::{InFlightHtlcs, Path, Route, RouteHop, RouteParameters, Router};
63 use lightning::sign::{
64         EntropySource, InMemorySigner, KeyMaterial, NodeSigner, Recipient, SignerProvider,
65 };
66 use lightning::util::config::UserConfig;
67 use lightning::util::errors::APIError;
68 use lightning::util::hash_tables::*;
69 use lightning::util::logger::Logger;
70 use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
71 use lightning::util::test_channel_signer::{EnforcementState, TestChannelSigner};
72
73 use crate::utils::test_logger::{self, Output};
74 use crate::utils::test_persister::TestPersister;
75
76 use bitcoin::secp256k1::ecdh::SharedSecret;
77 use bitcoin::secp256k1::ecdsa::{RecoverableSignature, Signature};
78 use bitcoin::secp256k1::schnorr;
79 use bitcoin::secp256k1::{self, Message, PublicKey, Scalar, Secp256k1, SecretKey};
80
81 use bech32::u5;
82 use std::cmp::{self, Ordering};
83 use std::io::Cursor;
84 use std::mem;
85 use std::sync::atomic;
86 use std::sync::{Arc, Mutex};
87
88 const MAX_FEE: u32 = 10_000;
89 struct FuzzEstimator {
90         ret_val: atomic::AtomicU32,
91 }
92 impl FeeEstimator for FuzzEstimator {
93         fn get_est_sat_per_1000_weight(&self, conf_target: ConfirmationTarget) -> u32 {
94                 // We force-close channels if our counterparty sends us a feerate which is a small multiple
95                 // of our HighPriority fee estimate or smaller than our Background fee estimate. Thus, we
96                 // always return a HighPriority feerate here which is >= the maximum Normal feerate and a
97                 // Background feerate which is <= the minimum Normal feerate.
98                 match conf_target {
99                         ConfirmationTarget::OnChainSweep => MAX_FEE,
100                         ConfirmationTarget::ChannelCloseMinimum
101                         | ConfirmationTarget::AnchorChannelFee
102                         | ConfirmationTarget::MinAllowedAnchorChannelRemoteFee
103                         | ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee
104                         | ConfirmationTarget::OutputSpendingFee => 253,
105                         ConfirmationTarget::NonAnchorChannelFee => {
106                                 cmp::min(self.ret_val.load(atomic::Ordering::Acquire), MAX_FEE)
107                         },
108                 }
109         }
110 }
111
112 struct FuzzRouter {}
113
114 impl Router for FuzzRouter {
115         fn find_route(
116                 &self, _payer: &PublicKey, _params: &RouteParameters,
117                 _first_hops: Option<&[&ChannelDetails]>, _inflight_htlcs: InFlightHtlcs,
118         ) -> Result<Route, msgs::LightningError> {
119                 Err(msgs::LightningError {
120                         err: String::from("Not implemented"),
121                         action: msgs::ErrorAction::IgnoreError,
122                 })
123         }
124
125         fn create_blinded_payment_paths<T: secp256k1::Signing + secp256k1::Verification>(
126                 &self, _recipient: PublicKey, _first_hops: Vec<ChannelDetails>, _tlvs: ReceiveTlvs,
127                 _amount_msats: u64, _secp_ctx: &Secp256k1<T>,
128         ) -> Result<Vec<(BlindedPayInfo, BlindedPath)>, ()> {
129                 unreachable!()
130         }
131 }
132
133 impl MessageRouter for FuzzRouter {
134         fn find_path(
135                 &self, _sender: PublicKey, _peers: Vec<PublicKey>, _destination: Destination,
136         ) -> Result<OnionMessagePath, ()> {
137                 unreachable!()
138         }
139
140         fn create_blinded_paths<T: secp256k1::Signing + secp256k1::Verification>(
141                 &self, _recipient: PublicKey, _peers: Vec<PublicKey>, _secp_ctx: &Secp256k1<T>,
142         ) -> Result<Vec<BlindedPath>, ()> {
143                 unreachable!()
144         }
145 }
146
147 pub struct TestBroadcaster {}
148 impl BroadcasterInterface for TestBroadcaster {
149         fn broadcast_transactions(&self, _txs: &[&Transaction]) {}
150 }
151
152 pub struct VecWriter(pub Vec<u8>);
153 impl Writer for VecWriter {
154         fn write_all(&mut self, buf: &[u8]) -> Result<(), ::std::io::Error> {
155                 self.0.extend_from_slice(buf);
156                 Ok(())
157         }
158 }
159
160 /// The LDK API requires that any time we tell it we're done persisting a `ChannelMonitor[Update]`
161 /// we never pass it in as the "latest" `ChannelMonitor` on startup. However, we can pass
162 /// out-of-date monitors as long as we never told LDK we finished persisting them, which we do by
163 /// storing both old `ChannelMonitor`s and ones that are "being persisted" here.
164 ///
165 /// Note that such "being persisted" `ChannelMonitor`s are stored in `ChannelManager` and will
166 /// simply be replayed on startup.
167 struct LatestMonitorState {
168         /// The latest monitor id which we told LDK we've persisted
169         persisted_monitor_id: u64,
170         /// The latest serialized `ChannelMonitor` that we told LDK we persisted.
171         persisted_monitor: Vec<u8>,
172         /// A set of (monitor id, serialized `ChannelMonitor`)s which we're currently "persisting",
173         /// from LDK's perspective.
174         pending_monitors: Vec<(u64, Vec<u8>)>,
175 }
176
177 struct TestChainMonitor {
178         pub logger: Arc<dyn Logger>,
179         pub keys: Arc<KeyProvider>,
180         pub persister: Arc<TestPersister>,
181         pub chain_monitor: Arc<
182                 chainmonitor::ChainMonitor<
183                         TestChannelSigner,
184                         Arc<dyn chain::Filter>,
185                         Arc<TestBroadcaster>,
186                         Arc<FuzzEstimator>,
187                         Arc<dyn Logger>,
188                         Arc<TestPersister>,
189                 >,
190         >,
191         pub latest_monitors: Mutex<HashMap<OutPoint, LatestMonitorState>>,
192 }
193 impl TestChainMonitor {
194         pub fn new(
195                 broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>,
196                 persister: Arc<TestPersister>, keys: Arc<KeyProvider>,
197         ) -> Self {
198                 Self {
199                         chain_monitor: Arc::new(chainmonitor::ChainMonitor::new(
200                                 None,
201                                 broadcaster,
202                                 logger.clone(),
203                                 feeest,
204                                 Arc::clone(&persister),
205                         )),
206                         logger,
207                         keys,
208                         persister,
209                         latest_monitors: Mutex::new(new_hash_map()),
210                 }
211         }
212 }
213 impl chain::Watch<TestChannelSigner> for TestChainMonitor {
214         fn watch_channel(
215                 &self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<TestChannelSigner>,
216         ) -> Result<chain::ChannelMonitorUpdateStatus, ()> {
217                 let mut ser = VecWriter(Vec::new());
218                 monitor.write(&mut ser).unwrap();
219                 let monitor_id = monitor.get_latest_update_id();
220                 let res = self.chain_monitor.watch_channel(funding_txo, monitor);
221                 let state = match res {
222                         Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState {
223                                 persisted_monitor_id: monitor_id,
224                                 persisted_monitor: ser.0,
225                                 pending_monitors: Vec::new(),
226                         },
227                         Ok(chain::ChannelMonitorUpdateStatus::InProgress) => {
228                                 panic!("The test currently doesn't test initial-persistence via the async pipeline")
229                         },
230                         Ok(chain::ChannelMonitorUpdateStatus::UnrecoverableError) => panic!(),
231                         Err(()) => panic!(),
232                 };
233                 if self.latest_monitors.lock().unwrap().insert(funding_txo, state).is_some() {
234                         panic!("Already had monitor pre-watch_channel");
235                 }
236                 res
237         }
238
239         fn update_channel(
240                 &self, funding_txo: OutPoint, update: &channelmonitor::ChannelMonitorUpdate,
241         ) -> chain::ChannelMonitorUpdateStatus {
242                 let mut map_lock = self.latest_monitors.lock().unwrap();
243                 let map_entry = map_lock.get_mut(&funding_txo).expect("Didn't have monitor on update call");
244                 let latest_monitor_data = map_entry
245                         .pending_monitors
246                         .last()
247                         .as_ref()
248                         .map(|(_, data)| data)
249                         .unwrap_or(&map_entry.persisted_monitor);
250                 let deserialized_monitor =
251                         <(BlockHash, channelmonitor::ChannelMonitor<TestChannelSigner>)>::read(
252                                 &mut Cursor::new(&latest_monitor_data),
253                                 (&*self.keys, &*self.keys),
254                         )
255                         .unwrap()
256                         .1;
257                 deserialized_monitor
258                         .update_monitor(
259                                 update,
260                                 &&TestBroadcaster {},
261                                 &&FuzzEstimator { ret_val: atomic::AtomicU32::new(253) },
262                                 &self.logger,
263                         )
264                         .unwrap();
265                 let mut ser = VecWriter(Vec::new());
266                 deserialized_monitor.write(&mut ser).unwrap();
267                 let res = self.chain_monitor.update_channel(funding_txo, update);
268                 match res {
269                         chain::ChannelMonitorUpdateStatus::Completed => {
270                                 map_entry.persisted_monitor_id = update.update_id;
271                                 map_entry.persisted_monitor = ser.0;
272                         },
273                         chain::ChannelMonitorUpdateStatus::InProgress => {
274                                 map_entry.pending_monitors.push((update.update_id, ser.0));
275                         },
276                         chain::ChannelMonitorUpdateStatus::UnrecoverableError => panic!(),
277                 }
278                 res
279         }
280
281         fn release_pending_monitor_events(
282                 &self,
283         ) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)> {
284                 return self.chain_monitor.release_pending_monitor_events();
285         }
286 }
287
288 struct KeyProvider {
289         node_secret: SecretKey,
290         rand_bytes_id: atomic::AtomicU32,
291         enforcement_states: Mutex<HashMap<[u8; 32], Arc<Mutex<EnforcementState>>>>,
292 }
293
294 impl EntropySource for KeyProvider {
295         fn get_secure_random_bytes(&self) -> [u8; 32] {
296                 let id = self.rand_bytes_id.fetch_add(1, atomic::Ordering::Relaxed);
297                 #[rustfmt::skip]
298                 let mut res = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 11, self.node_secret[31]];
299                 res[30 - 4..30].copy_from_slice(&id.to_le_bytes());
300                 res
301         }
302 }
303
304 impl NodeSigner for KeyProvider {
305         fn get_node_id(&self, recipient: Recipient) -> Result<PublicKey, ()> {
306                 let node_secret = match recipient {
307                         Recipient::Node => Ok(&self.node_secret),
308                         Recipient::PhantomNode => Err(()),
309                 }?;
310                 Ok(PublicKey::from_secret_key(&Secp256k1::signing_only(), node_secret))
311         }
312
313         fn ecdh(
314                 &self, recipient: Recipient, other_key: &PublicKey, tweak: Option<&Scalar>,
315         ) -> Result<SharedSecret, ()> {
316                 let mut node_secret = match recipient {
317                         Recipient::Node => Ok(self.node_secret.clone()),
318                         Recipient::PhantomNode => Err(()),
319                 }?;
320                 if let Some(tweak) = tweak {
321                         node_secret = node_secret.mul_tweak(tweak).map_err(|_| ())?;
322                 }
323                 Ok(SharedSecret::new(other_key, &node_secret))
324         }
325
326         fn get_inbound_payment_key_material(&self) -> KeyMaterial {
327                 #[rustfmt::skip]
328                 let random_bytes = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, self.node_secret[31]];
329                 KeyMaterial(random_bytes)
330         }
331
332         fn sign_invoice(
333                 &self, _hrp_bytes: &[u8], _invoice_data: &[u5], _recipient: Recipient,
334         ) -> Result<RecoverableSignature, ()> {
335                 unreachable!()
336         }
337
338         fn sign_bolt12_invoice_request(
339                 &self, _invoice_request: &UnsignedInvoiceRequest,
340         ) -> Result<schnorr::Signature, ()> {
341                 unreachable!()
342         }
343
344         fn sign_bolt12_invoice(
345                 &self, _invoice: &UnsignedBolt12Invoice,
346         ) -> Result<schnorr::Signature, ()> {
347                 unreachable!()
348         }
349
350         fn sign_gossip_message(
351                 &self, msg: lightning::ln::msgs::UnsignedGossipMessage,
352         ) -> Result<Signature, ()> {
353                 let msg_hash = Message::from_digest(Sha256dHash::hash(&msg.encode()[..]).to_byte_array());
354                 let secp_ctx = Secp256k1::signing_only();
355                 Ok(secp_ctx.sign_ecdsa(&msg_hash, &self.node_secret))
356         }
357 }
358
359 impl SignerProvider for KeyProvider {
360         type EcdsaSigner = TestChannelSigner;
361         #[cfg(taproot)]
362         type TaprootSigner = TestChannelSigner;
363
364         fn generate_channel_keys_id(
365                 &self, _inbound: bool, _channel_value_satoshis: u64, _user_channel_id: u128,
366         ) -> [u8; 32] {
367                 let id = self.rand_bytes_id.fetch_add(1, atomic::Ordering::Relaxed) as u8;
368                 [id; 32]
369         }
370
371         fn derive_channel_signer(
372                 &self, channel_value_satoshis: u64, channel_keys_id: [u8; 32],
373         ) -> Self::EcdsaSigner {
374                 let secp_ctx = Secp256k1::signing_only();
375                 let id = channel_keys_id[0];
376                 #[rustfmt::skip]
377                 let keys = InMemorySigner::new(
378                         &secp_ctx,
379                         SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, self.node_secret[31]]).unwrap(),
380                         SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, self.node_secret[31]]).unwrap(),
381                         SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6, self.node_secret[31]]).unwrap(),
382                         SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, self.node_secret[31]]).unwrap(),
383                         SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, self.node_secret[31]]).unwrap(),
384                         [id, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, self.node_secret[31]],
385                         channel_value_satoshis,
386                         channel_keys_id,
387                         channel_keys_id,
388                 );
389                 let revoked_commitment = self.make_enforcement_state_cell(keys.commitment_seed);
390                 TestChannelSigner::new_with_revoked(keys, revoked_commitment, false)
391         }
392
393         fn read_chan_signer(&self, buffer: &[u8]) -> Result<Self::EcdsaSigner, DecodeError> {
394                 let mut reader = std::io::Cursor::new(buffer);
395
396                 let inner: InMemorySigner = ReadableArgs::read(&mut reader, self)?;
397                 let state = self.make_enforcement_state_cell(inner.commitment_seed);
398
399                 Ok(TestChannelSigner {
400                         inner,
401                         state,
402                         disable_revocation_policy_check: false,
403                         available: Arc::new(Mutex::new(true)),
404                 })
405         }
406
407         fn get_destination_script(&self, _channel_keys_id: [u8; 32]) -> Result<ScriptBuf, ()> {
408                 let secp_ctx = Secp256k1::signing_only();
409                 #[rustfmt::skip]
410                 let channel_monitor_claim_key = SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, self.node_secret[31]]).unwrap();
411                 let our_channel_monitor_claim_key_hash = WPubkeyHash::hash(
412                         &PublicKey::from_secret_key(&secp_ctx, &channel_monitor_claim_key).serialize(),
413                 );
414                 Ok(Builder::new()
415                         .push_opcode(opcodes::all::OP_PUSHBYTES_0)
416                         .push_slice(our_channel_monitor_claim_key_hash)
417                         .into_script())
418         }
419
420         fn get_shutdown_scriptpubkey(&self) -> Result<ShutdownScript, ()> {
421                 let secp_ctx = Secp256k1::signing_only();
422                 #[rustfmt::skip]
423                 let secret_key = SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, self.node_secret[31]]).unwrap();
424                 let pubkey_hash =
425                         WPubkeyHash::hash(&PublicKey::from_secret_key(&secp_ctx, &secret_key).serialize());
426                 Ok(ShutdownScript::new_p2wpkh(&pubkey_hash))
427         }
428 }
429
430 impl KeyProvider {
431         fn make_enforcement_state_cell(
432                 &self, commitment_seed: [u8; 32],
433         ) -> Arc<Mutex<EnforcementState>> {
434                 let mut revoked_commitments = self.enforcement_states.lock().unwrap();
435                 if !revoked_commitments.contains_key(&commitment_seed) {
436                         revoked_commitments
437                                 .insert(commitment_seed, Arc::new(Mutex::new(EnforcementState::new())));
438                 }
439                 let cell = revoked_commitments.get(&commitment_seed).unwrap();
440                 Arc::clone(cell)
441         }
442 }
443
444 #[inline]
445 fn check_api_err(api_err: APIError, sendable_bounds_violated: bool) {
446         match api_err {
447                 APIError::APIMisuseError { .. } => panic!("We can't misuse the API"),
448                 APIError::FeeRateTooHigh { .. } => panic!("We can't send too much fee?"),
449                 APIError::InvalidRoute { .. } => panic!("Our routes should work"),
450                 APIError::ChannelUnavailable { err } => {
451                         // Test the error against a list of errors we can hit, and reject
452                         // all others. If you hit this panic, the list of acceptable errors
453                         // is probably just stale and you should add new messages here.
454                         match err.as_str() {
455                                 "Peer for first hop currently disconnected" => {},
456                                 _ if err.starts_with("Cannot send less than our next-HTLC minimum - ") => {},
457                                 _ if err.starts_with("Cannot send more than our next-HTLC maximum - ") => {},
458                                 _ => panic!("{}", err),
459                         }
460                         assert!(sendable_bounds_violated);
461                 },
462                 APIError::MonitorUpdateInProgress => {
463                         // We can (obviously) temp-fail a monitor update
464                 },
465                 APIError::IncompatibleShutdownScript { .. } => {
466                         panic!("Cannot send an incompatible shutdown script")
467                 },
468         }
469 }
470 #[inline]
471 fn check_payment_err(send_err: PaymentSendFailure, sendable_bounds_violated: bool) {
472         match send_err {
473                 PaymentSendFailure::ParameterError(api_err) => {
474                         check_api_err(api_err, sendable_bounds_violated)
475                 },
476                 PaymentSendFailure::PathParameterError(per_path_results) => {
477                         for res in per_path_results {
478                                 if let Err(api_err) = res {
479                                         check_api_err(api_err, sendable_bounds_violated);
480                                 }
481                         }
482                 },
483                 PaymentSendFailure::AllFailedResendSafe(per_path_results) => {
484                         for api_err in per_path_results {
485                                 check_api_err(api_err, sendable_bounds_violated);
486                         }
487                 },
488                 PaymentSendFailure::PartialFailure { results, .. } => {
489                         for res in results {
490                                 if let Err(api_err) = res {
491                                         check_api_err(api_err, sendable_bounds_violated);
492                                 }
493                         }
494                 },
495                 PaymentSendFailure::DuplicatePayment => panic!(),
496         }
497 }
498
499 type ChanMan<'a> = ChannelManager<
500         Arc<TestChainMonitor>,
501         Arc<TestBroadcaster>,
502         Arc<KeyProvider>,
503         Arc<KeyProvider>,
504         Arc<KeyProvider>,
505         Arc<FuzzEstimator>,
506         &'a FuzzRouter,
507         Arc<dyn Logger>,
508 >;
509
510 #[inline]
511 fn get_payment_secret_hash(
512         dest: &ChanMan, payment_id: &mut u8,
513 ) -> Option<(PaymentSecret, PaymentHash)> {
514         let mut payment_hash;
515         for _ in 0..256 {
516                 payment_hash = PaymentHash(Sha256::hash(&[*payment_id; 1]).to_byte_array());
517                 if let Ok(payment_secret) =
518                         dest.create_inbound_payment_for_hash(payment_hash, None, 3600, None)
519                 {
520                         return Some((payment_secret, payment_hash));
521                 }
522                 *payment_id = payment_id.wrapping_add(1);
523         }
524         None
525 }
526
527 #[inline]
528 fn send_noret(
529         source: &ChanMan, dest: &ChanMan, dest_chan_id: u64, amt: u64, payment_id: &mut u8,
530         payment_idx: &mut u64,
531 ) {
532         send_payment(source, dest, dest_chan_id, amt, payment_id, payment_idx);
533 }
534
535 #[inline]
536 fn send_payment(
537         source: &ChanMan, dest: &ChanMan, dest_chan_id: u64, amt: u64, payment_id: &mut u8,
538         payment_idx: &mut u64,
539 ) -> bool {
540         let (payment_secret, payment_hash) =
541                 if let Some((secret, hash)) = get_payment_secret_hash(dest, payment_id) {
542                         (secret, hash)
543                 } else {
544                         return true;
545                 };
546         let mut payment_id = [0; 32];
547         payment_id[0..8].copy_from_slice(&payment_idx.to_ne_bytes());
548         *payment_idx += 1;
549         let (min_value_sendable, max_value_sendable) = source
550                 .list_usable_channels()
551                 .iter()
552                 .find(|chan| chan.short_channel_id == Some(dest_chan_id))
553                 .map(|chan| (chan.next_outbound_htlc_minimum_msat, chan.next_outbound_htlc_limit_msat))
554                 .unwrap_or((0, 0));
555         if let Err(err) = source.send_payment_with_route(
556                 &Route {
557                         paths: vec![Path {
558                                 hops: vec![RouteHop {
559                                         pubkey: dest.get_our_node_id(),
560                                         node_features: dest.node_features(),
561                                         short_channel_id: dest_chan_id,
562                                         channel_features: dest.channel_features(),
563                                         fee_msat: amt,
564                                         cltv_expiry_delta: 200,
565                                         maybe_announced_channel: true,
566                                 }],
567                                 blinded_tail: None,
568                         }],
569                         route_params: None,
570                 },
571                 payment_hash,
572                 RecipientOnionFields::secret_only(payment_secret),
573                 PaymentId(payment_id),
574         ) {
575                 check_payment_err(err, amt > max_value_sendable || amt < min_value_sendable);
576                 false
577         } else {
578                 // Note that while the max is a strict upper-bound, we can occasionally send substantially
579                 // below the minimum, with some gap which is unusable immediately below the minimum. Thus,
580                 // we don't check against min_value_sendable here.
581                 assert!(amt <= max_value_sendable);
582                 true
583         }
584 }
585
586 #[inline]
587 fn send_hop_noret(
588         source: &ChanMan, middle: &ChanMan, middle_chan_id: u64, dest: &ChanMan, dest_chan_id: u64,
589         amt: u64, payment_id: &mut u8, payment_idx: &mut u64,
590 ) {
591         send_hop_payment(
592                 source,
593                 middle,
594                 middle_chan_id,
595                 dest,
596                 dest_chan_id,
597                 amt,
598                 payment_id,
599                 payment_idx,
600         );
601 }
602
603 #[inline]
604 fn send_hop_payment(
605         source: &ChanMan, middle: &ChanMan, middle_chan_id: u64, dest: &ChanMan, dest_chan_id: u64,
606         amt: u64, payment_id: &mut u8, payment_idx: &mut u64,
607 ) -> bool {
608         let (payment_secret, payment_hash) =
609                 if let Some((secret, hash)) = get_payment_secret_hash(dest, payment_id) {
610                         (secret, hash)
611                 } else {
612                         return true;
613                 };
614         let mut payment_id = [0; 32];
615         payment_id[0..8].copy_from_slice(&payment_idx.to_ne_bytes());
616         *payment_idx += 1;
617         let (min_value_sendable, max_value_sendable) = source
618                 .list_usable_channels()
619                 .iter()
620                 .find(|chan| chan.short_channel_id == Some(middle_chan_id))
621                 .map(|chan| (chan.next_outbound_htlc_minimum_msat, chan.next_outbound_htlc_limit_msat))
622                 .unwrap_or((0, 0));
623         let first_hop_fee = 50_000;
624         if let Err(err) = source.send_payment_with_route(
625                 &Route {
626                         paths: vec![Path {
627                                 hops: vec![
628                                         RouteHop {
629                                                 pubkey: middle.get_our_node_id(),
630                                                 node_features: middle.node_features(),
631                                                 short_channel_id: middle_chan_id,
632                                                 channel_features: middle.channel_features(),
633                                                 fee_msat: first_hop_fee,
634                                                 cltv_expiry_delta: 100,
635                                                 maybe_announced_channel: true,
636                                         },
637                                         RouteHop {
638                                                 pubkey: dest.get_our_node_id(),
639                                                 node_features: dest.node_features(),
640                                                 short_channel_id: dest_chan_id,
641                                                 channel_features: dest.channel_features(),
642                                                 fee_msat: amt,
643                                                 cltv_expiry_delta: 200,
644                                                 maybe_announced_channel: true,
645                                         },
646                                 ],
647                                 blinded_tail: None,
648                         }],
649                         route_params: None,
650                 },
651                 payment_hash,
652                 RecipientOnionFields::secret_only(payment_secret),
653                 PaymentId(payment_id),
654         ) {
655                 let sent_amt = amt + first_hop_fee;
656                 check_payment_err(err, sent_amt < min_value_sendable || sent_amt > max_value_sendable);
657                 false
658         } else {
659                 // Note that while the max is a strict upper-bound, we can occasionally send substantially
660                 // below the minimum, with some gap which is unusable immediately below the minimum. Thus,
661                 // we don't check against min_value_sendable here.
662                 assert!(amt + first_hop_fee <= max_value_sendable);
663                 true
664         }
665 }
666
667 #[inline]
668 pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
669         let out = SearchingOutput::new(underlying_out);
670         let broadcast = Arc::new(TestBroadcaster {});
671         let router = FuzzRouter {};
672
673         macro_rules! make_node {
674                 ($node_id: expr, $fee_estimator: expr) => {{
675                         let logger: Arc<dyn Logger> =
676                                 Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone()));
677                         let node_secret = SecretKey::from_slice(&[
678                                 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
679                                 0, 0, 1, $node_id,
680                         ])
681                         .unwrap();
682                         let keys_manager = Arc::new(KeyProvider {
683                                 node_secret,
684                                 rand_bytes_id: atomic::AtomicU32::new(0),
685                                 enforcement_states: Mutex::new(new_hash_map()),
686                         });
687                         let monitor = Arc::new(TestChainMonitor::new(
688                                 broadcast.clone(),
689                                 logger.clone(),
690                                 $fee_estimator.clone(),
691                                 Arc::new(TestPersister {
692                                         update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
693                                 }),
694                                 Arc::clone(&keys_manager),
695                         ));
696
697                         let mut config = UserConfig::default();
698                         config.channel_config.forwarding_fee_proportional_millionths = 0;
699                         config.channel_handshake_config.announced_channel = true;
700                         if anchors {
701                                 config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true;
702                                 config.manually_accept_inbound_channels = true;
703                         }
704                         let network = Network::Bitcoin;
705                         let best_block_timestamp = genesis_block(network).header.time;
706                         let params = ChainParameters { network, best_block: BestBlock::from_network(network) };
707                         (
708                                 ChannelManager::new(
709                                         $fee_estimator.clone(),
710                                         monitor.clone(),
711                                         broadcast.clone(),
712                                         &router,
713                                         Arc::clone(&logger),
714                                         keys_manager.clone(),
715                                         keys_manager.clone(),
716                                         keys_manager.clone(),
717                                         config,
718                                         params,
719                                         best_block_timestamp,
720                                 ),
721                                 monitor,
722                                 keys_manager,
723                         )
724                 }};
725         }
726
727         macro_rules! reload_node {
728                 ($ser: expr, $node_id: expr, $old_monitors: expr, $keys_manager: expr, $fee_estimator: expr) => {{
729                         let keys_manager = Arc::clone(&$keys_manager);
730                         let logger: Arc<dyn Logger> =
731                                 Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone()));
732                         let chain_monitor = Arc::new(TestChainMonitor::new(
733                                 broadcast.clone(),
734                                 logger.clone(),
735                                 $fee_estimator.clone(),
736                                 Arc::new(TestPersister {
737                                         update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
738                                 }),
739                                 Arc::clone(&$keys_manager),
740                         ));
741
742                         let mut config = UserConfig::default();
743                         config.channel_config.forwarding_fee_proportional_millionths = 0;
744                         config.channel_handshake_config.announced_channel = true;
745                         if anchors {
746                                 config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true;
747                                 config.manually_accept_inbound_channels = true;
748                         }
749
750                         let mut monitors = new_hash_map();
751                         let mut old_monitors = $old_monitors.latest_monitors.lock().unwrap();
752                         for (outpoint, mut prev_state) in old_monitors.drain() {
753                                 monitors.insert(
754                                         outpoint,
755                                         <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(
756                                                 &mut Cursor::new(&prev_state.persisted_monitor),
757                                                 (&*$keys_manager, &*$keys_manager),
758                                         )
759                                         .expect("Failed to read monitor")
760                                         .1,
761                                 );
762                                 // Wipe any `ChannelMonitor`s which we never told LDK we finished persisting,
763                                 // considering them discarded. LDK should replay these for us as they're stored in
764                                 // the `ChannelManager`.
765                                 prev_state.pending_monitors.clear();
766                                 chain_monitor.latest_monitors.lock().unwrap().insert(outpoint, prev_state);
767                         }
768                         let mut monitor_refs = new_hash_map();
769                         for (outpoint, monitor) in monitors.iter_mut() {
770                                 monitor_refs.insert(*outpoint, monitor);
771                         }
772
773                         let read_args = ChannelManagerReadArgs {
774                                 entropy_source: keys_manager.clone(),
775                                 node_signer: keys_manager.clone(),
776                                 signer_provider: keys_manager.clone(),
777                                 fee_estimator: $fee_estimator.clone(),
778                                 chain_monitor: chain_monitor.clone(),
779                                 tx_broadcaster: broadcast.clone(),
780                                 router: &router,
781                                 logger,
782                                 default_config: config,
783                                 channel_monitors: monitor_refs,
784                         };
785
786                         let res = (
787                                 <(BlockHash, ChanMan)>::read(&mut Cursor::new(&$ser.0), read_args)
788                                         .expect("Failed to read manager")
789                                         .1,
790                                 chain_monitor.clone(),
791                         );
792                         for (funding_txo, mon) in monitors.drain() {
793                                 assert_eq!(
794                                         chain_monitor.chain_monitor.watch_channel(funding_txo, mon),
795                                         Ok(ChannelMonitorUpdateStatus::Completed)
796                                 );
797                         }
798                         res
799                 }};
800         }
801
802         let mut channel_txn = Vec::new();
803         macro_rules! make_channel {
804                 ($source: expr, $dest: expr, $dest_keys_manager: expr, $chan_id: expr) => {{
805                         let init_dest = Init {
806                                 features: $dest.init_features(),
807                                 networks: None,
808                                 remote_network_address: None,
809                         };
810                         $source.peer_connected(&$dest.get_our_node_id(), &init_dest, true).unwrap();
811                         let init_src = Init {
812                                 features: $source.init_features(),
813                                 networks: None,
814                                 remote_network_address: None,
815                         };
816                         $dest.peer_connected(&$source.get_our_node_id(), &init_src, false).unwrap();
817
818                         $source.create_channel($dest.get_our_node_id(), 100_000, 42, 0, None, None).unwrap();
819                         let open_channel = {
820                                 let events = $source.get_and_clear_pending_msg_events();
821                                 assert_eq!(events.len(), 1);
822                                 if let events::MessageSendEvent::SendOpenChannel { ref msg, .. } = events[0] {
823                                         msg.clone()
824                                 } else {
825                                         panic!("Wrong event type");
826                                 }
827                         };
828
829                         $dest.handle_open_channel(&$source.get_our_node_id(), &open_channel);
830                         let accept_channel = {
831                                 if anchors {
832                                         let events = $dest.get_and_clear_pending_events();
833                                         assert_eq!(events.len(), 1);
834                                         if let events::Event::OpenChannelRequest {
835                                                 ref temporary_channel_id,
836                                                 ref counterparty_node_id,
837                                                 ..
838                                         } = events[0]
839                                         {
840                                                 let mut random_bytes = [0u8; 16];
841                                                 random_bytes
842                                                         .copy_from_slice(&$dest_keys_manager.get_secure_random_bytes()[..16]);
843                                                 let user_channel_id = u128::from_be_bytes(random_bytes);
844                                                 $dest
845                                                         .accept_inbound_channel(
846                                                                 temporary_channel_id,
847                                                                 counterparty_node_id,
848                                                                 user_channel_id,
849                                                         )
850                                                         .unwrap();
851                                         } else {
852                                                 panic!("Wrong event type");
853                                         }
854                                 }
855                                 let events = $dest.get_and_clear_pending_msg_events();
856                                 assert_eq!(events.len(), 1);
857                                 if let events::MessageSendEvent::SendAcceptChannel { ref msg, .. } = events[0] {
858                                         msg.clone()
859                                 } else {
860                                         panic!("Wrong event type");
861                                 }
862                         };
863
864                         $source.handle_accept_channel(&$dest.get_our_node_id(), &accept_channel);
865                         let funding_output;
866                         {
867                                 let events = $source.get_and_clear_pending_events();
868                                 assert_eq!(events.len(), 1);
869                                 if let events::Event::FundingGenerationReady {
870                                         ref temporary_channel_id,
871                                         ref channel_value_satoshis,
872                                         ref output_script,
873                                         ..
874                                 } = events[0]
875                                 {
876                                         let tx = Transaction {
877                                                 version: Version($chan_id),
878                                                 lock_time: LockTime::ZERO,
879                                                 input: Vec::new(),
880                                                 output: vec![TxOut {
881                                                         value: Amount::from_sat(*channel_value_satoshis),
882                                                         script_pubkey: output_script.clone(),
883                                                 }],
884                                         };
885                                         funding_output = OutPoint { txid: tx.txid(), index: 0 };
886                                         $source
887                                                 .funding_transaction_generated(
888                                                         &temporary_channel_id,
889                                                         &$dest.get_our_node_id(),
890                                                         tx.clone(),
891                                                 )
892                                                 .unwrap();
893                                         channel_txn.push(tx);
894                                 } else {
895                                         panic!("Wrong event type");
896                                 }
897                         }
898
899                         let funding_created = {
900                                 let events = $source.get_and_clear_pending_msg_events();
901                                 assert_eq!(events.len(), 1);
902                                 if let events::MessageSendEvent::SendFundingCreated { ref msg, .. } = events[0] {
903                                         msg.clone()
904                                 } else {
905                                         panic!("Wrong event type");
906                                 }
907                         };
908                         $dest.handle_funding_created(&$source.get_our_node_id(), &funding_created);
909
910                         let funding_signed = {
911                                 let events = $dest.get_and_clear_pending_msg_events();
912                                 assert_eq!(events.len(), 1);
913                                 if let events::MessageSendEvent::SendFundingSigned { ref msg, .. } = events[0] {
914                                         msg.clone()
915                                 } else {
916                                         panic!("Wrong event type");
917                                 }
918                         };
919                         let events = $dest.get_and_clear_pending_events();
920                         assert_eq!(events.len(), 1);
921                         if let events::Event::ChannelPending { ref counterparty_node_id, .. } = events[0] {
922                                 assert_eq!(counterparty_node_id, &$source.get_our_node_id());
923                         } else {
924                                 panic!("Wrong event type");
925                         }
926
927                         $source.handle_funding_signed(&$dest.get_our_node_id(), &funding_signed);
928                         let events = $source.get_and_clear_pending_events();
929                         assert_eq!(events.len(), 1);
930                         if let events::Event::ChannelPending { ref counterparty_node_id, .. } = events[0] {
931                                 assert_eq!(counterparty_node_id, &$dest.get_our_node_id());
932                         } else {
933                                 panic!("Wrong event type");
934                         }
935
936                         funding_output
937                 }};
938         }
939
940         macro_rules! confirm_txn {
941                 ($node: expr) => {{
942                         let chain_hash = genesis_block(Network::Bitcoin).block_hash();
943                         let mut header = create_dummy_header(chain_hash, 42);
944                         let txdata: Vec<_> =
945                                 channel_txn.iter().enumerate().map(|(i, tx)| (i + 1, tx)).collect();
946                         $node.transactions_confirmed(&header, &txdata, 1);
947                         for _ in 2..100 {
948                                 header = create_dummy_header(header.block_hash(), 42);
949                         }
950                         $node.best_block_updated(&header, 99);
951                 }};
952         }
953
954         macro_rules! lock_fundings {
955                 ($nodes: expr) => {{
956                         let mut node_events = Vec::new();
957                         for node in $nodes.iter() {
958                                 node_events.push(node.get_and_clear_pending_msg_events());
959                         }
960                         for (idx, node_event) in node_events.iter().enumerate() {
961                                 for event in node_event {
962                                         if let events::MessageSendEvent::SendChannelReady { ref node_id, ref msg } =
963                                                 event
964                                         {
965                                                 for node in $nodes.iter() {
966                                                         if node.get_our_node_id() == *node_id {
967                                                                 node.handle_channel_ready(&$nodes[idx].get_our_node_id(), msg);
968                                                         }
969                                                 }
970                                         } else {
971                                                 panic!("Wrong event type");
972                                         }
973                                 }
974                         }
975
976                         for node in $nodes.iter() {
977                                 let events = node.get_and_clear_pending_msg_events();
978                                 for event in events {
979                                         if let events::MessageSendEvent::SendAnnouncementSignatures { .. } = event {
980                                         } else {
981                                                 panic!("Wrong event type");
982                                         }
983                                 }
984                         }
985                 }};
986         }
987
988         let fee_est_a = Arc::new(FuzzEstimator { ret_val: atomic::AtomicU32::new(253) });
989         let mut last_htlc_clear_fee_a = 253;
990         let fee_est_b = Arc::new(FuzzEstimator { ret_val: atomic::AtomicU32::new(253) });
991         let mut last_htlc_clear_fee_b = 253;
992         let fee_est_c = Arc::new(FuzzEstimator { ret_val: atomic::AtomicU32::new(253) });
993         let mut last_htlc_clear_fee_c = 253;
994
995         // 3 nodes is enough to hit all the possible cases, notably unknown-source-unknown-dest
996         // forwarding.
997         let (node_a, mut monitor_a, keys_manager_a) = make_node!(0, fee_est_a);
998         let (node_b, mut monitor_b, keys_manager_b) = make_node!(1, fee_est_b);
999         let (node_c, mut monitor_c, keys_manager_c) = make_node!(2, fee_est_c);
1000
1001         let mut nodes = [node_a, node_b, node_c];
1002
1003         let chan_1_funding = make_channel!(nodes[0], nodes[1], keys_manager_b, 0);
1004         let chan_2_funding = make_channel!(nodes[1], nodes[2], keys_manager_c, 1);
1005
1006         for node in nodes.iter() {
1007                 confirm_txn!(node);
1008         }
1009
1010         lock_fundings!(nodes);
1011
1012         let chan_a = nodes[0].list_usable_channels()[0].short_channel_id.unwrap();
1013         let chan_b = nodes[2].list_usable_channels()[0].short_channel_id.unwrap();
1014
1015         let mut p_id: u8 = 0;
1016         let mut p_idx: u64 = 0;
1017
1018         let mut chan_a_disconnected = false;
1019         let mut chan_b_disconnected = false;
1020         let mut ab_events = Vec::new();
1021         let mut ba_events = Vec::new();
1022         let mut bc_events = Vec::new();
1023         let mut cb_events = Vec::new();
1024
1025         let mut node_a_ser = VecWriter(Vec::new());
1026         nodes[0].write(&mut node_a_ser).unwrap();
1027         let mut node_b_ser = VecWriter(Vec::new());
1028         nodes[1].write(&mut node_b_ser).unwrap();
1029         let mut node_c_ser = VecWriter(Vec::new());
1030         nodes[2].write(&mut node_c_ser).unwrap();
1031
1032         macro_rules! test_return {
1033                 () => {{
1034                         assert_eq!(nodes[0].list_channels().len(), 1);
1035                         assert_eq!(nodes[1].list_channels().len(), 2);
1036                         assert_eq!(nodes[2].list_channels().len(), 1);
1037                         return;
1038                 }};
1039         }
1040
1041         let mut read_pos = 0;
1042         macro_rules! get_slice {
1043                 ($len: expr) => {{
1044                         let slice_len = $len as usize;
1045                         if data.len() < read_pos + slice_len {
1046                                 test_return!();
1047                         }
1048                         read_pos += slice_len;
1049                         &data[read_pos - slice_len..read_pos]
1050                 }};
1051         }
1052
1053         loop {
1054                 // Push any events from Node B onto ba_events and bc_events
1055                 macro_rules! push_excess_b_events {
1056                         ($excess_events: expr, $expect_drop_node: expr) => { {
1057                                 let a_id = nodes[0].get_our_node_id();
1058                                 let expect_drop_node: Option<usize> = $expect_drop_node;
1059                                 let expect_drop_id = if let Some(id) = expect_drop_node { Some(nodes[id].get_our_node_id()) } else { None };
1060                                 for event in $excess_events {
1061                                         let push_a = match event {
1062                                                 events::MessageSendEvent::UpdateHTLCs { ref node_id, .. } => {
1063                                                         if Some(*node_id) == expect_drop_id { panic!("peer_disconnected should drop msgs bound for the disconnected peer"); }
1064                                                         *node_id == a_id
1065                                                 },
1066                                                 events::MessageSendEvent::SendRevokeAndACK { ref node_id, .. } => {
1067                                                         if Some(*node_id) == expect_drop_id { panic!("peer_disconnected should drop msgs bound for the disconnected peer"); }
1068                                                         *node_id == a_id
1069                                                 },
1070                                                 events::MessageSendEvent::SendChannelReestablish { ref node_id, .. } => {
1071                                                         if Some(*node_id) == expect_drop_id { panic!("peer_disconnected should drop msgs bound for the disconnected peer"); }
1072                                                         *node_id == a_id
1073                                                 },
1074                                                 events::MessageSendEvent::SendChannelReady { .. } => continue,
1075                                                 events::MessageSendEvent::SendAnnouncementSignatures { .. } => continue,
1076                                                 events::MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
1077                                                         assert_eq!(msg.contents.flags & 2, 0); // The disable bit must never be set!
1078                                                         if Some(*node_id) == expect_drop_id { panic!("peer_disconnected should drop msgs bound for the disconnected peer"); }
1079                                                         *node_id == a_id
1080                                                 },
1081                                                 _ => panic!("Unhandled message event {:?}", event),
1082                                         };
1083                                         if push_a { ba_events.push(event); } else { bc_events.push(event); }
1084                                 }
1085                         } }
1086                 }
1087
1088                 // While delivering messages, we select across three possible message selection processes
1089                 // to ensure we get as much coverage as possible. See the individual enum variants for more
1090                 // details.
1091                 #[derive(PartialEq)]
1092                 enum ProcessMessages {
1093                         /// Deliver all available messages, including fetching any new messages from
1094                         /// `get_and_clear_pending_msg_events()` (which may have side effects).
1095                         AllMessages,
1096                         /// Call `get_and_clear_pending_msg_events()` first, and then deliver up to one
1097                         /// message (which may already be queued).
1098                         OneMessage,
1099                         /// Deliver up to one already-queued message. This avoids any potential side-effects
1100                         /// of `get_and_clear_pending_msg_events()` (eg freeing the HTLC holding cell), which
1101                         /// provides potentially more coverage.
1102                         OnePendingMessage,
1103                 }
1104
1105                 macro_rules! process_msg_events {
1106                         ($node: expr, $corrupt_forward: expr, $limit_events: expr) => { {
1107                                 let mut events = if $node == 1 {
1108                                         let mut new_events = Vec::new();
1109                                         mem::swap(&mut new_events, &mut ba_events);
1110                                         new_events.extend_from_slice(&bc_events[..]);
1111                                         bc_events.clear();
1112                                         new_events
1113                                 } else if $node == 0 {
1114                                         let mut new_events = Vec::new();
1115                                         mem::swap(&mut new_events, &mut ab_events);
1116                                         new_events
1117                                 } else {
1118                                         let mut new_events = Vec::new();
1119                                         mem::swap(&mut new_events, &mut cb_events);
1120                                         new_events
1121                                 };
1122                                 let mut new_events = Vec::new();
1123                                 if $limit_events != ProcessMessages::OnePendingMessage {
1124                                         new_events = nodes[$node].get_and_clear_pending_msg_events();
1125                                 }
1126                                 let mut had_events = false;
1127                                 let mut events_iter = events.drain(..).chain(new_events.drain(..));
1128                                 let mut extra_ev = None;
1129                                 for event in &mut events_iter {
1130                                         had_events = true;
1131                                         match event {
1132                                                 events::MessageSendEvent::UpdateHTLCs { node_id, updates: CommitmentUpdate { update_add_htlcs, update_fail_htlcs, update_fulfill_htlcs, update_fail_malformed_htlcs, update_fee, commitment_signed } } => {
1133                                                         for (idx, dest) in nodes.iter().enumerate() {
1134                                                                 if dest.get_our_node_id() == node_id {
1135                                                                         for update_add in update_add_htlcs.iter() {
1136                                                                                 out.locked_write(format!("Delivering update_add_htlc to node {}.\n", idx).as_bytes());
1137                                                                                 if !$corrupt_forward {
1138                                                                                         dest.handle_update_add_htlc(&nodes[$node].get_our_node_id(), update_add);
1139                                                                                 } else {
1140                                                                                         // Corrupt the update_add_htlc message so that its HMAC
1141                                                                                         // check will fail and we generate a
1142                                                                                         // update_fail_malformed_htlc instead of an
1143                                                                                         // update_fail_htlc as we do when we reject a payment.
1144                                                                                         let mut msg_ser = update_add.encode();
1145                                                                                         msg_ser[1000] ^= 0xff;
1146                                                                                         let new_msg = UpdateAddHTLC::read(&mut Cursor::new(&msg_ser)).unwrap();
1147                                                                                         dest.handle_update_add_htlc(&nodes[$node].get_our_node_id(), &new_msg);
1148                                                                                 }
1149                                                                         }
1150                                                                         for update_fulfill in update_fulfill_htlcs.iter() {
1151                                                                                 out.locked_write(format!("Delivering update_fulfill_htlc to node {}.\n", idx).as_bytes());
1152                                                                                 dest.handle_update_fulfill_htlc(&nodes[$node].get_our_node_id(), update_fulfill);
1153                                                                         }
1154                                                                         for update_fail in update_fail_htlcs.iter() {
1155                                                                                 out.locked_write(format!("Delivering update_fail_htlc to node {}.\n", idx).as_bytes());
1156                                                                                 dest.handle_update_fail_htlc(&nodes[$node].get_our_node_id(), update_fail);
1157                                                                         }
1158                                                                         for update_fail_malformed in update_fail_malformed_htlcs.iter() {
1159                                                                                 out.locked_write(format!("Delivering update_fail_malformed_htlc to node {}.\n", idx).as_bytes());
1160                                                                                 dest.handle_update_fail_malformed_htlc(&nodes[$node].get_our_node_id(), update_fail_malformed);
1161                                                                         }
1162                                                                         if let Some(msg) = update_fee {
1163                                                                                 out.locked_write(format!("Delivering update_fee to node {}.\n", idx).as_bytes());
1164                                                                                 dest.handle_update_fee(&nodes[$node].get_our_node_id(), &msg);
1165                                                                         }
1166                                                                         let processed_change = !update_add_htlcs.is_empty() || !update_fulfill_htlcs.is_empty() ||
1167                                                                                 !update_fail_htlcs.is_empty() || !update_fail_malformed_htlcs.is_empty();
1168                                                                         if $limit_events != ProcessMessages::AllMessages && processed_change {
1169                                                                                 // If we only want to process some messages, don't deliver the CS until later.
1170                                                                                 extra_ev = Some(events::MessageSendEvent::UpdateHTLCs { node_id, updates: CommitmentUpdate {
1171                                                                                         update_add_htlcs: Vec::new(),
1172                                                                                         update_fail_htlcs: Vec::new(),
1173                                                                                         update_fulfill_htlcs: Vec::new(),
1174                                                                                         update_fail_malformed_htlcs: Vec::new(),
1175                                                                                         update_fee: None,
1176                                                                                         commitment_signed
1177                                                                                 } });
1178                                                                                 break;
1179                                                                         }
1180                                                                         out.locked_write(format!("Delivering commitment_signed to node {}.\n", idx).as_bytes());
1181                                                                         dest.handle_commitment_signed(&nodes[$node].get_our_node_id(), &commitment_signed);
1182                                                                         break;
1183                                                                 }
1184                                                         }
1185                                                 },
1186                                                 events::MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
1187                                                         for (idx, dest) in nodes.iter().enumerate() {
1188                                                                 if dest.get_our_node_id() == *node_id {
1189                                                                         out.locked_write(format!("Delivering revoke_and_ack to node {}.\n", idx).as_bytes());
1190                                                                         dest.handle_revoke_and_ack(&nodes[$node].get_our_node_id(), msg);
1191                                                                 }
1192                                                         }
1193                                                 },
1194                                                 events::MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
1195                                                         for (idx, dest) in nodes.iter().enumerate() {
1196                                                                 if dest.get_our_node_id() == *node_id {
1197                                                                         out.locked_write(format!("Delivering channel_reestablish to node {}.\n", idx).as_bytes());
1198                                                                         dest.handle_channel_reestablish(&nodes[$node].get_our_node_id(), msg);
1199                                                                 }
1200                                                         }
1201                                                 },
1202                                                 events::MessageSendEvent::SendChannelReady { .. } => {
1203                                                         // Can be generated as a reestablish response
1204                                                 },
1205                                                 events::MessageSendEvent::SendAnnouncementSignatures { .. } => {
1206                                                         // Can be generated as a reestablish response
1207                                                 },
1208                                                 events::MessageSendEvent::SendChannelUpdate { ref msg, .. } => {
1209                                                         // When we reconnect we will resend a channel_update to make sure our
1210                                                         // counterparty has the latest parameters for receiving payments
1211                                                         // through us. We do, however, check that the message does not include
1212                                                         // the "disabled" bit, as we should never ever have a channel which is
1213                                                         // disabled when we send such an update (or it may indicate channel
1214                                                         // force-close which we should detect as an error).
1215                                                         assert_eq!(msg.contents.flags & 2, 0);
1216                                                 },
1217                                                 _ => if out.may_fail.load(atomic::Ordering::Acquire) {
1218                                                         return;
1219                                                 } else {
1220                                                         panic!("Unhandled message event {:?}", event)
1221                                                 },
1222                                         }
1223                                         if $limit_events != ProcessMessages::AllMessages {
1224                                                 break;
1225                                         }
1226                                 }
1227                                 if $node == 1 {
1228                                         push_excess_b_events!(extra_ev.into_iter().chain(events_iter), None);
1229                                 } else if $node == 0 {
1230                                         if let Some(ev) = extra_ev { ab_events.push(ev); }
1231                                         for event in events_iter { ab_events.push(event); }
1232                                 } else {
1233                                         if let Some(ev) = extra_ev { cb_events.push(ev); }
1234                                         for event in events_iter { cb_events.push(event); }
1235                                 }
1236                                 had_events
1237                         } }
1238                 }
1239
1240                 macro_rules! process_msg_noret {
1241                         ($node: expr, $corrupt_forward: expr, $limit_events: expr) => {{
1242                                 process_msg_events!($node, $corrupt_forward, $limit_events);
1243                         }};
1244                 }
1245
1246                 macro_rules! drain_msg_events_on_disconnect {
1247                         ($counterparty_id: expr) => {{
1248                                 if $counterparty_id == 0 {
1249                                         for event in nodes[0].get_and_clear_pending_msg_events() {
1250                                                 match event {
1251                                                         events::MessageSendEvent::UpdateHTLCs { .. } => {},
1252                                                         events::MessageSendEvent::SendRevokeAndACK { .. } => {},
1253                                                         events::MessageSendEvent::SendChannelReestablish { .. } => {},
1254                                                         events::MessageSendEvent::SendChannelReady { .. } => {},
1255                                                         events::MessageSendEvent::SendAnnouncementSignatures { .. } => {},
1256                                                         events::MessageSendEvent::SendChannelUpdate { ref msg, .. } => {
1257                                                                 assert_eq!(msg.contents.flags & 2, 0); // The disable bit must never be set!
1258                                                         },
1259                                                         _ => {
1260                                                                 if out.may_fail.load(atomic::Ordering::Acquire) {
1261                                                                         return;
1262                                                                 } else {
1263                                                                         panic!("Unhandled message event")
1264                                                                 }
1265                                                         },
1266                                                 }
1267                                         }
1268                                         push_excess_b_events!(
1269                                                 nodes[1].get_and_clear_pending_msg_events().drain(..),
1270                                                 Some(0)
1271                                         );
1272                                         ab_events.clear();
1273                                         ba_events.clear();
1274                                 } else {
1275                                         for event in nodes[2].get_and_clear_pending_msg_events() {
1276                                                 match event {
1277                                                         events::MessageSendEvent::UpdateHTLCs { .. } => {},
1278                                                         events::MessageSendEvent::SendRevokeAndACK { .. } => {},
1279                                                         events::MessageSendEvent::SendChannelReestablish { .. } => {},
1280                                                         events::MessageSendEvent::SendChannelReady { .. } => {},
1281                                                         events::MessageSendEvent::SendAnnouncementSignatures { .. } => {},
1282                                                         events::MessageSendEvent::SendChannelUpdate { ref msg, .. } => {
1283                                                                 assert_eq!(msg.contents.flags & 2, 0); // The disable bit must never be set!
1284                                                         },
1285                                                         _ => {
1286                                                                 if out.may_fail.load(atomic::Ordering::Acquire) {
1287                                                                         return;
1288                                                                 } else {
1289                                                                         panic!("Unhandled message event")
1290                                                                 }
1291                                                         },
1292                                                 }
1293                                         }
1294                                         push_excess_b_events!(
1295                                                 nodes[1].get_and_clear_pending_msg_events().drain(..),
1296                                                 Some(2)
1297                                         );
1298                                         bc_events.clear();
1299                                         cb_events.clear();
1300                                 }
1301                         }};
1302                 }
1303
1304                 macro_rules! process_events {
1305                         ($node: expr, $fail: expr) => {{
1306                                 // In case we get 256 payments we may have a hash collision, resulting in the
1307                                 // second claim/fail call not finding the duplicate-hash HTLC, so we have to
1308                                 // deduplicate the calls here.
1309                                 let mut claim_set = new_hash_map();
1310                                 let mut events = nodes[$node].get_and_clear_pending_events();
1311                                 // Sort events so that PendingHTLCsForwardable get processed last. This avoids a
1312                                 // case where we first process a PendingHTLCsForwardable, then claim/fail on a
1313                                 // PaymentClaimable, claiming/failing two HTLCs, but leaving a just-generated
1314                                 // PaymentClaimable event for the second HTLC in our pending_events (and breaking
1315                                 // our claim_set deduplication).
1316                                 events.sort_by(|a, b| {
1317                                         if let events::Event::PaymentClaimable { .. } = a {
1318                                                 if let events::Event::PendingHTLCsForwardable { .. } = b {
1319                                                         Ordering::Less
1320                                                 } else {
1321                                                         Ordering::Equal
1322                                                 }
1323                                         } else if let events::Event::PendingHTLCsForwardable { .. } = a {
1324                                                 if let events::Event::PaymentClaimable { .. } = b {
1325                                                         Ordering::Greater
1326                                                 } else {
1327                                                         Ordering::Equal
1328                                                 }
1329                                         } else {
1330                                                 Ordering::Equal
1331                                         }
1332                                 });
1333                                 let had_events = !events.is_empty();
1334                                 for event in events.drain(..) {
1335                                         match event {
1336                                                 events::Event::PaymentClaimable { payment_hash, .. } => {
1337                                                         if claim_set.insert(payment_hash.0, ()).is_none() {
1338                                                                 if $fail {
1339                                                                         nodes[$node].fail_htlc_backwards(&payment_hash);
1340                                                                 } else {
1341                                                                         nodes[$node].claim_funds(PaymentPreimage(payment_hash.0));
1342                                                                 }
1343                                                         }
1344                                                 },
1345                                                 events::Event::PaymentSent { .. } => {},
1346                                                 events::Event::PaymentClaimed { .. } => {},
1347                                                 events::Event::PaymentPathSuccessful { .. } => {},
1348                                                 events::Event::PaymentPathFailed { .. } => {},
1349                                                 events::Event::PaymentFailed { .. } => {},
1350                                                 events::Event::ProbeSuccessful { .. }
1351                                                 | events::Event::ProbeFailed { .. } => {
1352                                                         // Even though we don't explicitly send probes, because probes are
1353                                                         // detected based on hashing the payment hash+preimage, its rather
1354                                                         // trivial for the fuzzer to build payments that accidentally end up
1355                                                         // looking like probes.
1356                                                 },
1357                                                 events::Event::PaymentForwarded { .. } if $node == 1 => {},
1358                                                 events::Event::ChannelReady { .. } => {},
1359                                                 events::Event::PendingHTLCsForwardable { .. } => {
1360                                                         nodes[$node].process_pending_htlc_forwards();
1361                                                 },
1362                                                 events::Event::HTLCHandlingFailed { .. } => {},
1363                                                 _ => {
1364                                                         if out.may_fail.load(atomic::Ordering::Acquire) {
1365                                                                 return;
1366                                                         } else {
1367                                                                 panic!("Unhandled event")
1368                                                         }
1369                                                 },
1370                                         }
1371                                 }
1372                                 had_events
1373                         }};
1374                 }
1375
1376                 macro_rules! process_ev_noret {
1377                         ($node: expr, $fail: expr) => {{
1378                                 process_events!($node, $fail);
1379                         }};
1380                 }
1381
1382                 let complete_first = |v: &mut Vec<_>| if !v.is_empty() { Some(v.remove(0)) } else { None };
1383                 let complete_second = |v: &mut Vec<_>| if v.len() > 1 { Some(v.remove(1)) } else { None };
1384                 let complete_monitor_update =
1385                         |monitor: &Arc<TestChainMonitor>,
1386                          chan_funding,
1387                          compl_selector: &dyn Fn(&mut Vec<(u64, Vec<u8>)>) -> Option<(u64, Vec<u8>)>| {
1388                                 if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
1389                                         assert!(
1390                                                 state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
1391                                                 "updates should be sorted by id"
1392                                         );
1393                                         if let Some((id, data)) = compl_selector(&mut state.pending_monitors) {
1394                                                 monitor.chain_monitor.channel_monitor_updated(*chan_funding, id).unwrap();
1395                                                 if id > state.persisted_monitor_id {
1396                                                         state.persisted_monitor_id = id;
1397                                                         state.persisted_monitor = data;
1398                                                 }
1399                                         }
1400                                 }
1401                         };
1402
1403                 let complete_all_monitor_updates = |monitor: &Arc<TestChainMonitor>, chan_funding| {
1404                         if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
1405                                 assert!(
1406                                         state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
1407                                         "updates should be sorted by id"
1408                                 );
1409                                 for (id, data) in state.pending_monitors.drain(..) {
1410                                         monitor.chain_monitor.channel_monitor_updated(*chan_funding, id).unwrap();
1411                                         if id > state.persisted_monitor_id {
1412                                                 state.persisted_monitor_id = id;
1413                                                 state.persisted_monitor = data;
1414                                         }
1415                                 }
1416                         }
1417                 };
1418
1419                 let v = get_slice!(1)[0];
1420                 out.locked_write(format!("READ A BYTE! HANDLING INPUT {:x}...........\n", v).as_bytes());
1421                 match v {
1422                         // In general, we keep related message groups close together in binary form, allowing
1423                         // bit-twiddling mutations to have similar effects. This is probably overkill, but no
1424                         // harm in doing so.
1425                         0x00 => {
1426                                 *monitor_a.persister.update_ret.lock().unwrap() =
1427                                         ChannelMonitorUpdateStatus::InProgress
1428                         },
1429                         0x01 => {
1430                                 *monitor_b.persister.update_ret.lock().unwrap() =
1431                                         ChannelMonitorUpdateStatus::InProgress
1432                         },
1433                         0x02 => {
1434                                 *monitor_c.persister.update_ret.lock().unwrap() =
1435                                         ChannelMonitorUpdateStatus::InProgress
1436                         },
1437                         0x04 => {
1438                                 *monitor_a.persister.update_ret.lock().unwrap() =
1439                                         ChannelMonitorUpdateStatus::Completed
1440                         },
1441                         0x05 => {
1442                                 *monitor_b.persister.update_ret.lock().unwrap() =
1443                                         ChannelMonitorUpdateStatus::Completed
1444                         },
1445                         0x06 => {
1446                                 *monitor_c.persister.update_ret.lock().unwrap() =
1447                                         ChannelMonitorUpdateStatus::Completed
1448                         },
1449
1450                         0x08 => complete_all_monitor_updates(&monitor_a, &chan_1_funding),
1451                         0x09 => complete_all_monitor_updates(&monitor_b, &chan_1_funding),
1452                         0x0a => complete_all_monitor_updates(&monitor_b, &chan_2_funding),
1453                         0x0b => complete_all_monitor_updates(&monitor_c, &chan_2_funding),
1454
1455                         0x0c => {
1456                                 if !chan_a_disconnected {
1457                                         nodes[0].peer_disconnected(&nodes[1].get_our_node_id());
1458                                         nodes[1].peer_disconnected(&nodes[0].get_our_node_id());
1459                                         chan_a_disconnected = true;
1460                                         drain_msg_events_on_disconnect!(0);
1461                                 }
1462                         },
1463                         0x0d => {
1464                                 if !chan_b_disconnected {
1465                                         nodes[1].peer_disconnected(&nodes[2].get_our_node_id());
1466                                         nodes[2].peer_disconnected(&nodes[1].get_our_node_id());
1467                                         chan_b_disconnected = true;
1468                                         drain_msg_events_on_disconnect!(2);
1469                                 }
1470                         },
1471                         0x0e => {
1472                                 if chan_a_disconnected {
1473                                         let init_1 = Init {
1474                                                 features: nodes[1].init_features(),
1475                                                 networks: None,
1476                                                 remote_network_address: None,
1477                                         };
1478                                         nodes[0].peer_connected(&nodes[1].get_our_node_id(), &init_1, true).unwrap();
1479                                         let init_0 = Init {
1480                                                 features: nodes[0].init_features(),
1481                                                 networks: None,
1482                                                 remote_network_address: None,
1483                                         };
1484                                         nodes[1].peer_connected(&nodes[0].get_our_node_id(), &init_0, false).unwrap();
1485                                         chan_a_disconnected = false;
1486                                 }
1487                         },
1488                         0x0f => {
1489                                 if chan_b_disconnected {
1490                                         let init_2 = Init {
1491                                                 features: nodes[2].init_features(),
1492                                                 networks: None,
1493                                                 remote_network_address: None,
1494                                         };
1495                                         nodes[1].peer_connected(&nodes[2].get_our_node_id(), &init_2, true).unwrap();
1496                                         let init_1 = Init {
1497                                                 features: nodes[1].init_features(),
1498                                                 networks: None,
1499                                                 remote_network_address: None,
1500                                         };
1501                                         nodes[2].peer_connected(&nodes[1].get_our_node_id(), &init_1, false).unwrap();
1502                                         chan_b_disconnected = false;
1503                                 }
1504                         },
1505
1506                         0x10 => process_msg_noret!(0, true, ProcessMessages::AllMessages),
1507                         0x11 => process_msg_noret!(0, false, ProcessMessages::AllMessages),
1508                         0x12 => process_msg_noret!(0, true, ProcessMessages::OneMessage),
1509                         0x13 => process_msg_noret!(0, false, ProcessMessages::OneMessage),
1510                         0x14 => process_msg_noret!(0, true, ProcessMessages::OnePendingMessage),
1511                         0x15 => process_msg_noret!(0, false, ProcessMessages::OnePendingMessage),
1512
1513                         0x16 => process_ev_noret!(0, true),
1514                         0x17 => process_ev_noret!(0, false),
1515
1516                         0x18 => process_msg_noret!(1, true, ProcessMessages::AllMessages),
1517                         0x19 => process_msg_noret!(1, false, ProcessMessages::AllMessages),
1518                         0x1a => process_msg_noret!(1, true, ProcessMessages::OneMessage),
1519                         0x1b => process_msg_noret!(1, false, ProcessMessages::OneMessage),
1520                         0x1c => process_msg_noret!(1, true, ProcessMessages::OnePendingMessage),
1521                         0x1d => process_msg_noret!(1, false, ProcessMessages::OnePendingMessage),
1522
1523                         0x1e => process_ev_noret!(1, true),
1524                         0x1f => process_ev_noret!(1, false),
1525
1526                         0x20 => process_msg_noret!(2, true, ProcessMessages::AllMessages),
1527                         0x21 => process_msg_noret!(2, false, ProcessMessages::AllMessages),
1528                         0x22 => process_msg_noret!(2, true, ProcessMessages::OneMessage),
1529                         0x23 => process_msg_noret!(2, false, ProcessMessages::OneMessage),
1530                         0x24 => process_msg_noret!(2, true, ProcessMessages::OnePendingMessage),
1531                         0x25 => process_msg_noret!(2, false, ProcessMessages::OnePendingMessage),
1532
1533                         0x26 => process_ev_noret!(2, true),
1534                         0x27 => process_ev_noret!(2, false),
1535
1536                         0x2c => {
1537                                 if !chan_a_disconnected {
1538                                         nodes[1].peer_disconnected(&nodes[0].get_our_node_id());
1539                                         chan_a_disconnected = true;
1540                                         push_excess_b_events!(
1541                                                 nodes[1].get_and_clear_pending_msg_events().drain(..),
1542                                                 Some(0)
1543                                         );
1544                                         ab_events.clear();
1545                                         ba_events.clear();
1546                                 }
1547                                 let (new_node_a, new_monitor_a) =
1548                                         reload_node!(node_a_ser, 0, monitor_a, keys_manager_a, fee_est_a);
1549                                 nodes[0] = new_node_a;
1550                                 monitor_a = new_monitor_a;
1551                         },
1552                         0x2d => {
1553                                 if !chan_a_disconnected {
1554                                         nodes[0].peer_disconnected(&nodes[1].get_our_node_id());
1555                                         chan_a_disconnected = true;
1556                                         nodes[0].get_and_clear_pending_msg_events();
1557                                         ab_events.clear();
1558                                         ba_events.clear();
1559                                 }
1560                                 if !chan_b_disconnected {
1561                                         nodes[2].peer_disconnected(&nodes[1].get_our_node_id());
1562                                         chan_b_disconnected = true;
1563                                         nodes[2].get_and_clear_pending_msg_events();
1564                                         bc_events.clear();
1565                                         cb_events.clear();
1566                                 }
1567                                 let (new_node_b, new_monitor_b) =
1568                                         reload_node!(node_b_ser, 1, monitor_b, keys_manager_b, fee_est_b);
1569                                 nodes[1] = new_node_b;
1570                                 monitor_b = new_monitor_b;
1571                         },
1572                         0x2e => {
1573                                 if !chan_b_disconnected {
1574                                         nodes[1].peer_disconnected(&nodes[2].get_our_node_id());
1575                                         chan_b_disconnected = true;
1576                                         push_excess_b_events!(
1577                                                 nodes[1].get_and_clear_pending_msg_events().drain(..),
1578                                                 Some(2)
1579                                         );
1580                                         bc_events.clear();
1581                                         cb_events.clear();
1582                                 }
1583                                 let (new_node_c, new_monitor_c) =
1584                                         reload_node!(node_c_ser, 2, monitor_c, keys_manager_c, fee_est_c);
1585                                 nodes[2] = new_node_c;
1586                                 monitor_c = new_monitor_c;
1587                         },
1588
1589                         // 1/10th the channel size:
1590                         0x30 => send_noret(&nodes[0], &nodes[1], chan_a, 10_000_000, &mut p_id, &mut p_idx),
1591                         0x31 => send_noret(&nodes[1], &nodes[0], chan_a, 10_000_000, &mut p_id, &mut p_idx),
1592                         0x32 => send_noret(&nodes[1], &nodes[2], chan_b, 10_000_000, &mut p_id, &mut p_idx),
1593                         0x33 => send_noret(&nodes[2], &nodes[1], chan_b, 10_000_000, &mut p_id, &mut p_idx),
1594                         0x34 => send_hop_noret(
1595                                 &nodes[0], &nodes[1], chan_a, &nodes[2], chan_b, 10_000_000, &mut p_id, &mut p_idx,
1596                         ),
1597                         0x35 => send_hop_noret(
1598                                 &nodes[2], &nodes[1], chan_b, &nodes[0], chan_a, 10_000_000, &mut p_id, &mut p_idx,
1599                         ),
1600
1601                         0x38 => send_noret(&nodes[0], &nodes[1], chan_a, 1_000_000, &mut p_id, &mut p_idx),
1602                         0x39 => send_noret(&nodes[1], &nodes[0], chan_a, 1_000_000, &mut p_id, &mut p_idx),
1603                         0x3a => send_noret(&nodes[1], &nodes[2], chan_b, 1_000_000, &mut p_id, &mut p_idx),
1604                         0x3b => send_noret(&nodes[2], &nodes[1], chan_b, 1_000_000, &mut p_id, &mut p_idx),
1605                         0x3c => send_hop_noret(
1606                                 &nodes[0], &nodes[1], chan_a, &nodes[2], chan_b, 1_000_000, &mut p_id, &mut p_idx,
1607                         ),
1608                         0x3d => send_hop_noret(
1609                                 &nodes[2], &nodes[1], chan_b, &nodes[0], chan_a, 1_000_000, &mut p_id, &mut p_idx,
1610                         ),
1611
1612                         0x40 => send_noret(&nodes[0], &nodes[1], chan_a, 100_000, &mut p_id, &mut p_idx),
1613                         0x41 => send_noret(&nodes[1], &nodes[0], chan_a, 100_000, &mut p_id, &mut p_idx),
1614                         0x42 => send_noret(&nodes[1], &nodes[2], chan_b, 100_000, &mut p_id, &mut p_idx),
1615                         0x43 => send_noret(&nodes[2], &nodes[1], chan_b, 100_000, &mut p_id, &mut p_idx),
1616                         0x44 => send_hop_noret(
1617                                 &nodes[0], &nodes[1], chan_a, &nodes[2], chan_b, 100_000, &mut p_id, &mut p_idx,
1618                         ),
1619                         0x45 => send_hop_noret(
1620                                 &nodes[2], &nodes[1], chan_b, &nodes[0], chan_a, 100_000, &mut p_id, &mut p_idx,
1621                         ),
1622
1623                         0x48 => send_noret(&nodes[0], &nodes[1], chan_a, 10_000, &mut p_id, &mut p_idx),
1624                         0x49 => send_noret(&nodes[1], &nodes[0], chan_a, 10_000, &mut p_id, &mut p_idx),
1625                         0x4a => send_noret(&nodes[1], &nodes[2], chan_b, 10_000, &mut p_id, &mut p_idx),
1626                         0x4b => send_noret(&nodes[2], &nodes[1], chan_b, 10_000, &mut p_id, &mut p_idx),
1627                         0x4c => send_hop_noret(
1628                                 &nodes[0], &nodes[1], chan_a, &nodes[2], chan_b, 10_000, &mut p_id, &mut p_idx,
1629                         ),
1630                         0x4d => send_hop_noret(
1631                                 &nodes[2], &nodes[1], chan_b, &nodes[0], chan_a, 10_000, &mut p_id, &mut p_idx,
1632                         ),
1633
1634                         0x50 => send_noret(&nodes[0], &nodes[1], chan_a, 1_000, &mut p_id, &mut p_idx),
1635                         0x51 => send_noret(&nodes[1], &nodes[0], chan_a, 1_000, &mut p_id, &mut p_idx),
1636                         0x52 => send_noret(&nodes[1], &nodes[2], chan_b, 1_000, &mut p_id, &mut p_idx),
1637                         0x53 => send_noret(&nodes[2], &nodes[1], chan_b, 1_000, &mut p_id, &mut p_idx),
1638                         0x54 => send_hop_noret(
1639                                 &nodes[0], &nodes[1], chan_a, &nodes[2], chan_b, 1_000, &mut p_id, &mut p_idx,
1640                         ),
1641                         0x55 => send_hop_noret(
1642                                 &nodes[2], &nodes[1], chan_b, &nodes[0], chan_a, 1_000, &mut p_id, &mut p_idx,
1643                         ),
1644
1645                         0x58 => send_noret(&nodes[0], &nodes[1], chan_a, 100, &mut p_id, &mut p_idx),
1646                         0x59 => send_noret(&nodes[1], &nodes[0], chan_a, 100, &mut p_id, &mut p_idx),
1647                         0x5a => send_noret(&nodes[1], &nodes[2], chan_b, 100, &mut p_id, &mut p_idx),
1648                         0x5b => send_noret(&nodes[2], &nodes[1], chan_b, 100, &mut p_id, &mut p_idx),
1649                         0x5c => send_hop_noret(
1650                                 &nodes[0], &nodes[1], chan_a, &nodes[2], chan_b, 100, &mut p_id, &mut p_idx,
1651                         ),
1652                         0x5d => send_hop_noret(
1653                                 &nodes[2], &nodes[1], chan_b, &nodes[0], chan_a, 100, &mut p_id, &mut p_idx,
1654                         ),
1655
1656                         0x60 => send_noret(&nodes[0], &nodes[1], chan_a, 10, &mut p_id, &mut p_idx),
1657                         0x61 => send_noret(&nodes[1], &nodes[0], chan_a, 10, &mut p_id, &mut p_idx),
1658                         0x62 => send_noret(&nodes[1], &nodes[2], chan_b, 10, &mut p_id, &mut p_idx),
1659                         0x63 => send_noret(&nodes[2], &nodes[1], chan_b, 10, &mut p_id, &mut p_idx),
1660                         0x64 => send_hop_noret(
1661                                 &nodes[0], &nodes[1], chan_a, &nodes[2], chan_b, 10, &mut p_id, &mut p_idx,
1662                         ),
1663                         0x65 => send_hop_noret(
1664                                 &nodes[2], &nodes[1], chan_b, &nodes[0], chan_a, 10, &mut p_id, &mut p_idx,
1665                         ),
1666
1667                         0x68 => send_noret(&nodes[0], &nodes[1], chan_a, 1, &mut p_id, &mut p_idx),
1668                         0x69 => send_noret(&nodes[1], &nodes[0], chan_a, 1, &mut p_id, &mut p_idx),
1669                         0x6a => send_noret(&nodes[1], &nodes[2], chan_b, 1, &mut p_id, &mut p_idx),
1670                         0x6b => send_noret(&nodes[2], &nodes[1], chan_b, 1, &mut p_id, &mut p_idx),
1671                         0x6c => send_hop_noret(
1672                                 &nodes[0], &nodes[1], chan_a, &nodes[2], chan_b, 1, &mut p_id, &mut p_idx,
1673                         ),
1674                         0x6d => send_hop_noret(
1675                                 &nodes[2], &nodes[1], chan_b, &nodes[0], chan_a, 1, &mut p_id, &mut p_idx,
1676                         ),
1677
1678                         0x80 => {
1679                                 let mut max_feerate = last_htlc_clear_fee_a;
1680                                 if !anchors {
1681                                         max_feerate *= FEE_SPIKE_BUFFER_FEE_INCREASE_MULTIPLE as u32;
1682                                 }
1683                                 if fee_est_a.ret_val.fetch_add(250, atomic::Ordering::AcqRel) + 250 > max_feerate {
1684                                         fee_est_a.ret_val.store(max_feerate, atomic::Ordering::Release);
1685                                 }
1686                                 nodes[0].maybe_update_chan_fees();
1687                         },
1688                         0x81 => {
1689                                 fee_est_a.ret_val.store(253, atomic::Ordering::Release);
1690                                 nodes[0].maybe_update_chan_fees();
1691                         },
1692
1693                         0x84 => {
1694                                 let mut max_feerate = last_htlc_clear_fee_b;
1695                                 if !anchors {
1696                                         max_feerate *= FEE_SPIKE_BUFFER_FEE_INCREASE_MULTIPLE as u32;
1697                                 }
1698                                 if fee_est_b.ret_val.fetch_add(250, atomic::Ordering::AcqRel) + 250 > max_feerate {
1699                                         fee_est_b.ret_val.store(max_feerate, atomic::Ordering::Release);
1700                                 }
1701                                 nodes[1].maybe_update_chan_fees();
1702                         },
1703                         0x85 => {
1704                                 fee_est_b.ret_val.store(253, atomic::Ordering::Release);
1705                                 nodes[1].maybe_update_chan_fees();
1706                         },
1707
1708                         0x88 => {
1709                                 let mut max_feerate = last_htlc_clear_fee_c;
1710                                 if !anchors {
1711                                         max_feerate *= FEE_SPIKE_BUFFER_FEE_INCREASE_MULTIPLE as u32;
1712                                 }
1713                                 if fee_est_c.ret_val.fetch_add(250, atomic::Ordering::AcqRel) + 250 > max_feerate {
1714                                         fee_est_c.ret_val.store(max_feerate, atomic::Ordering::Release);
1715                                 }
1716                                 nodes[2].maybe_update_chan_fees();
1717                         },
1718                         0x89 => {
1719                                 fee_est_c.ret_val.store(253, atomic::Ordering::Release);
1720                                 nodes[2].maybe_update_chan_fees();
1721                         },
1722
1723                         0xf0 => complete_monitor_update(&monitor_a, &chan_1_funding, &complete_first),
1724                         0xf1 => complete_monitor_update(&monitor_a, &chan_1_funding, &complete_second),
1725                         0xf2 => complete_monitor_update(&monitor_a, &chan_1_funding, &Vec::pop),
1726
1727                         0xf4 => complete_monitor_update(&monitor_b, &chan_1_funding, &complete_first),
1728                         0xf5 => complete_monitor_update(&monitor_b, &chan_1_funding, &complete_second),
1729                         0xf6 => complete_monitor_update(&monitor_b, &chan_1_funding, &Vec::pop),
1730
1731                         0xf8 => complete_monitor_update(&monitor_b, &chan_2_funding, &complete_first),
1732                         0xf9 => complete_monitor_update(&monitor_b, &chan_2_funding, &complete_second),
1733                         0xfa => complete_monitor_update(&monitor_b, &chan_2_funding, &Vec::pop),
1734
1735                         0xfc => complete_monitor_update(&monitor_c, &chan_2_funding, &complete_first),
1736                         0xfd => complete_monitor_update(&monitor_c, &chan_2_funding, &complete_second),
1737                         0xfe => complete_monitor_update(&monitor_c, &chan_2_funding, &Vec::pop),
1738
1739                         0xff => {
1740                                 // Test that no channel is in a stuck state where neither party can send funds even
1741                                 // after we resolve all pending events.
1742                                 // First make sure there are no pending monitor updates and further update
1743                                 // operations complete.
1744                                 *monitor_a.persister.update_ret.lock().unwrap() =
1745                                         ChannelMonitorUpdateStatus::Completed;
1746                                 *monitor_b.persister.update_ret.lock().unwrap() =
1747                                         ChannelMonitorUpdateStatus::Completed;
1748                                 *monitor_c.persister.update_ret.lock().unwrap() =
1749                                         ChannelMonitorUpdateStatus::Completed;
1750
1751                                 complete_all_monitor_updates(&monitor_a, &chan_1_funding);
1752                                 complete_all_monitor_updates(&monitor_b, &chan_1_funding);
1753                                 complete_all_monitor_updates(&monitor_b, &chan_2_funding);
1754                                 complete_all_monitor_updates(&monitor_c, &chan_2_funding);
1755
1756                                 // Next, make sure peers are all connected to each other
1757                                 if chan_a_disconnected {
1758                                         let init_1 = Init {
1759                                                 features: nodes[1].init_features(),
1760                                                 networks: None,
1761                                                 remote_network_address: None,
1762                                         };
1763                                         nodes[0].peer_connected(&nodes[1].get_our_node_id(), &init_1, true).unwrap();
1764                                         let init_0 = Init {
1765                                                 features: nodes[0].init_features(),
1766                                                 networks: None,
1767                                                 remote_network_address: None,
1768                                         };
1769                                         nodes[1].peer_connected(&nodes[0].get_our_node_id(), &init_0, false).unwrap();
1770                                         chan_a_disconnected = false;
1771                                 }
1772                                 if chan_b_disconnected {
1773                                         let init_2 = Init {
1774                                                 features: nodes[2].init_features(),
1775                                                 networks: None,
1776                                                 remote_network_address: None,
1777                                         };
1778                                         nodes[1].peer_connected(&nodes[2].get_our_node_id(), &init_2, true).unwrap();
1779                                         let init_1 = Init {
1780                                                 features: nodes[1].init_features(),
1781                                                 networks: None,
1782                                                 remote_network_address: None,
1783                                         };
1784                                         nodes[2].peer_connected(&nodes[1].get_our_node_id(), &init_1, false).unwrap();
1785                                         chan_b_disconnected = false;
1786                                 }
1787
1788                                 for i in 0..std::usize::MAX {
1789                                         if i == 100 {
1790                                                 panic!("It may take may iterations to settle the state, but it should not take forever");
1791                                         }
1792                                         // Then, make sure any current forwards make their way to their destination
1793                                         if process_msg_events!(0, false, ProcessMessages::AllMessages) {
1794                                                 continue;
1795                                         }
1796                                         if process_msg_events!(1, false, ProcessMessages::AllMessages) {
1797                                                 continue;
1798                                         }
1799                                         if process_msg_events!(2, false, ProcessMessages::AllMessages) {
1800                                                 continue;
1801                                         }
1802                                         // ...making sure any pending PendingHTLCsForwardable events are handled and
1803                                         // payments claimed.
1804                                         if process_events!(0, false) {
1805                                                 continue;
1806                                         }
1807                                         if process_events!(1, false) {
1808                                                 continue;
1809                                         }
1810                                         if process_events!(2, false) {
1811                                                 continue;
1812                                         }
1813                                         break;
1814                                 }
1815
1816                                 // Finally, make sure that at least one end of each channel can make a substantial payment
1817                                 assert!(
1818                                         send_payment(&nodes[0], &nodes[1], chan_a, 10_000_000, &mut p_id, &mut p_idx)
1819                                                 || send_payment(
1820                                                         &nodes[1], &nodes[0], chan_a, 10_000_000, &mut p_id, &mut p_idx
1821                                                 )
1822                                 );
1823                                 assert!(
1824                                         send_payment(&nodes[1], &nodes[2], chan_b, 10_000_000, &mut p_id, &mut p_idx)
1825                                                 || send_payment(
1826                                                         &nodes[2], &nodes[1], chan_b, 10_000_000, &mut p_id, &mut p_idx
1827                                                 )
1828                                 );
1829
1830                                 last_htlc_clear_fee_a = fee_est_a.ret_val.load(atomic::Ordering::Acquire);
1831                                 last_htlc_clear_fee_b = fee_est_b.ret_val.load(atomic::Ordering::Acquire);
1832                                 last_htlc_clear_fee_c = fee_est_c.ret_val.load(atomic::Ordering::Acquire);
1833                         },
1834                         _ => test_return!(),
1835                 }
1836
1837                 if nodes[0].get_and_clear_needs_persistence() == true {
1838                         node_a_ser.0.clear();
1839                         nodes[0].write(&mut node_a_ser).unwrap();
1840                 }
1841                 if nodes[1].get_and_clear_needs_persistence() == true {
1842                         node_b_ser.0.clear();
1843                         nodes[1].write(&mut node_b_ser).unwrap();
1844                 }
1845                 if nodes[2].get_and_clear_needs_persistence() == true {
1846                         node_c_ser.0.clear();
1847                         nodes[2].write(&mut node_c_ser).unwrap();
1848                 }
1849         }
1850 }
1851
1852 /// We actually have different behavior based on if a certain log string has been seen, so we have
1853 /// to do a bit more tracking.
1854 #[derive(Clone)]
1855 struct SearchingOutput<O: Output> {
1856         output: O,
1857         may_fail: Arc<atomic::AtomicBool>,
1858 }
1859 impl<O: Output> Output for SearchingOutput<O> {
1860         fn locked_write(&self, data: &[u8]) {
1861                 // We hit a design limitation of LN state machine (see CONCURRENT_INBOUND_HTLC_FEE_BUFFER)
1862                 if std::str::from_utf8(data).unwrap().contains("Outbound update_fee HTLC buffer overflow - counterparty should force-close this channel") {
1863                         self.may_fail.store(true, atomic::Ordering::Release);
1864                 }
1865                 self.output.locked_write(data)
1866         }
1867 }
1868 impl<O: Output> SearchingOutput<O> {
1869         pub fn new(output: O) -> Self {
1870                 Self { output, may_fail: Arc::new(atomic::AtomicBool::new(false)) }
1871         }
1872 }
1873
1874 pub fn chanmon_consistency_test<Out: Output>(data: &[u8], out: Out) {
1875         do_test(data, out.clone(), false);
1876         do_test(data, out, true);
1877 }
1878
1879 #[no_mangle]
1880 pub extern "C" fn chanmon_consistency_run(data: *const u8, datalen: usize) {
1881         do_test(unsafe { std::slice::from_raw_parts(data, datalen) }, test_logger::DevNull {}, false);
1882         do_test(unsafe { std::slice::from_raw_parts(data, datalen) }, test_logger::DevNull {}, true);
1883 }