From: Matt Corallo <649246+TheBlueMatt@users.noreply.github.com> Date: Tue, 6 Aug 2019 21:12:57 +0000 (+0000) Subject: Merge pull request #349 from ariard/2019-07-data_loss X-Git-Tag: v0.0.12~189 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=56f16eaa4df0de13a59f6524562a8d8a45b6c2ea;hp=41def659c0089ba3f0c7d85726b379f400bb6eb2;p=rust-lightning Merge pull request #349 from ariard/2019-07-data_loss Implement option_data_loss_protect on both sides --- diff --git a/.gitignore b/.gitignore index c2c882ddf..c795d9b53 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ /target/ +/net-tokio/target/ **/*.rs.bk Cargo.lock -/target/ -**/*.rs.bk -.idea \ No newline at end of file +.idea + diff --git a/.travis.yml b/.travis.yml index af5fa7214..ef0aa839c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,7 +12,7 @@ before_install: script: - RUSTFLAGS="-C link-dead-code" cargo build --verbose - - rm target/debug/lightning-* # Make sure we drop old test binaries + - rm -f target/debug/lightning-* # Make sure we drop old test binaries - RUSTFLAGS="-C link-dead-code" cargo test --verbose - if [ "$(rustup show | grep default | grep 1.34.2)" != "" ]; then cd fuzz && cargo test --verbose && ./travis-fuzz.sh; fi - if [ "$(rustup show | grep default | grep stable)" != "" ]; then cd net-tokio && cargo build --verbose && cd ..; fi diff --git a/fuzz/fuzz_targets/full_stack_target.rs b/fuzz/fuzz_targets/full_stack_target.rs index acd757882..6145d003b 100644 --- a/fuzz/fuzz_targets/full_stack_target.rs +++ b/fuzz/fuzz_targets/full_stack_target.rs @@ -49,7 +49,7 @@ use std::collections::{HashMap, hash_map}; use std::cmp; use std::hash::Hash; use std::sync::Arc; -use std::sync::atomic::{AtomicU8,AtomicUsize,Ordering}; +use std::sync::atomic::{AtomicU64,AtomicUsize,Ordering}; #[inline] pub fn slice_to_be16(v: &[u8]) -> u16 { @@ -124,9 +124,8 @@ struct Peer<'a> { peers_connected: &'a RefCell<[bool; 256]>, } impl<'a> SocketDescriptor for Peer<'a> { - fn send_data(&mut self, data: &Vec, write_offset: usize, _resume_read: bool) -> usize { - assert!(write_offset < data.len()); - data.len() - write_offset + fn send_data(&mut self, data: &[u8], _resume_read: bool) -> usize { + data.len() } fn disconnect_socket(&mut self) { assert!(self.peers_connected.borrow()[self.id as usize]); @@ -236,7 +235,7 @@ impl<'a> Drop for MoneyLossDetector<'a> { struct KeyProvider { node_secret: SecretKey, - counter: AtomicU8, + counter: AtomicU64, } impl KeysInterface for KeyProvider { fn get_node_secret(&self) -> SecretKey { @@ -256,7 +255,7 @@ impl KeysInterface for KeyProvider { } fn get_channel_keys(&self, inbound: bool) -> ChannelKeys { - let ctr = self.counter.fetch_add(1, Ordering::Relaxed); + let ctr = self.counter.fetch_add(1, Ordering::Relaxed) as u8; if inbound { ChannelKeys { funding_key: SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, ctr]).unwrap(), @@ -279,13 +278,14 @@ impl KeysInterface for KeyProvider { } fn get_session_key(&self) -> SecretKey { - let ctr = self.counter.fetch_add(1, Ordering::Relaxed); + let ctr = self.counter.fetch_add(1, Ordering::Relaxed) as u8; SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 13, ctr]).unwrap() } fn get_channel_id(&self) -> [u8; 32] { let ctr = self.counter.fetch_add(1, Ordering::Relaxed); - [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, ctr] + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + (ctr >> 8*7) as u8, (ctr >> 8*6) as u8, (ctr >> 8*5) as u8, (ctr >> 8*4) as u8, (ctr >> 8*3) as u8, (ctr >> 8*2) as u8, (ctr >> 8*1) as u8, 14, (ctr >> 8*0) as u8] } } @@ -326,7 +326,7 @@ pub fn do_test(data: &[u8], logger: &Arc) { let broadcast = Arc::new(TestBroadcaster{}); let monitor = channelmonitor::SimpleManyChannelMonitor::new(watch.clone(), broadcast.clone(), Arc::clone(&logger), fee_est.clone()); - let keys_manager = Arc::new(KeyProvider { node_secret: our_network_key.clone(), counter: AtomicU8::new(0) }); + let keys_manager = Arc::new(KeyProvider { node_secret: our_network_key.clone(), counter: AtomicU64::new(0) }); let mut config = UserConfig::new(); config.channel_options.fee_proportional_millionths = slice_to_be32(get_slice!(4)); config.channel_options.announced_channel = get_slice!(1)[0] != 0; diff --git a/net-tokio/src/lib.rs b/net-tokio/src/lib.rs index 54752fdb3..0bc36b280 100644 --- a/net-tokio/src/lib.rs +++ b/net-tokio/src/lib.rs @@ -128,7 +128,7 @@ impl Connection { let (reader, us) = Self::new(event_notify, stream); if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone(), peer_manager.clone())) { - if SocketDescriptor::new(us.clone(), peer_manager.clone()).send_data(&initial_send, 0, true) == initial_send.len() { + if SocketDescriptor::new(us.clone(), peer_manager.clone()).send_data(&initial_send, true) == initial_send.len() { Self::schedule_read(peer_manager, us, reader); } else { println!("Failed to write first full message to socket!"); @@ -170,7 +170,7 @@ impl SocketDescriptor { } } impl peer_handler::SocketDescriptor for SocketDescriptor { - fn send_data(&mut self, data: &Vec, write_offset: usize, resume_read: bool) -> usize { + fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize { macro_rules! schedule_read { ($us_ref: expr) => { tokio::spawn(future::lazy(move || -> Result<(), ()> { @@ -211,20 +211,20 @@ impl peer_handler::SocketDescriptor for SocketDescriptor { let us_ref = self.clone(); schedule_read!(us_ref); } - if data.len() == write_offset { return 0; } + if data.is_empty() { return 0; } if us.writer.is_none() { us.read_paused = true; return 0; } - let mut bytes = bytes::BytesMut::with_capacity(data.len() - write_offset); - bytes.put(&data[write_offset..]); + let mut bytes = bytes::BytesMut::with_capacity(data.len()); + bytes.put(data); let write_res = us.writer.as_mut().unwrap().start_send(bytes.freeze()); match write_res { Ok(res) => { match res { AsyncSink::Ready => { - data.len() - write_offset + data.len() }, AsyncSink::NotReady(_) => { us.read_paused = true; diff --git a/src/ln/channel.rs b/src/ln/channel.rs index 5c0ca8fd6..f52347121 100644 --- a/src/ln/channel.rs +++ b/src/ln/channel.rs @@ -1674,10 +1674,9 @@ impl Channel { if inbound_htlc_count + 1 > OUR_MAX_HTLCS as u32 { return Err(ChannelError::Close("Remote tried to push more than our max accepted HTLCs")); } - //TODO: Spec is unclear if this is per-direction or in total (I assume per direction): // Check our_max_htlc_value_in_flight_msat if htlc_inbound_value_msat + msg.amount_msat > Channel::get_our_max_htlc_value_in_flight_msat(self.channel_value_satoshis) { - return Err(ChannelError::Close("Remote HTLC add would put them over their max HTLC value in flight")); + return Err(ChannelError::Close("Remote HTLC add would put them over our max HTLC value")); } // Check our_channel_reserve_satoshis (we're getting paid, so they have to at least meet // the reserve_satoshis we told them to always have as direct payment so that they lose @@ -3359,16 +3358,15 @@ impl Channel { if outbound_htlc_count + 1 > self.their_max_accepted_htlcs as u32 { return Err(ChannelError::Ignore("Cannot push more than their max accepted HTLCs")); } - //TODO: Spec is unclear if this is per-direction or in total (I assume per direction): // Check their_max_htlc_value_in_flight_msat if htlc_outbound_value_msat + amount_msat > self.their_max_htlc_value_in_flight_msat { - return Err(ChannelError::Ignore("Cannot send value that would put us over the max HTLC value in flight")); + return Err(ChannelError::Ignore("Cannot send value that would put us over the max HTLC value in flight our peer will accept")); } // Check self.their_channel_reserve_satoshis (the amount we must keep as // reserve for them to have something to claim if we misbehave) if self.value_to_self_msat < self.their_channel_reserve_satoshis * 1000 + amount_msat + htlc_outbound_value_msat { - return Err(ChannelError::Ignore("Cannot send value that would put us over the reserve value")); + return Err(ChannelError::Ignore("Cannot send value that would put us over their reserve value")); } //TODO: Check cltv_expiry? Do this in channel manager? diff --git a/src/ln/functional_test_utils.rs b/src/ln/functional_test_utils.rs index 001da7009..fe32a1ef2 100644 --- a/src/ln/functional_test_utils.rs +++ b/src/ln/functional_test_utils.rs @@ -740,7 +740,7 @@ pub fn route_over_limit(origin_node: &Node, expected_route: &[&Node], recv_value let err = origin_node.node.send_payment(route, our_payment_hash).err().unwrap(); match err { - APIError::ChannelUnavailable{err} => assert_eq!(err, "Cannot send value that would put us over the max HTLC value in flight"), + APIError::ChannelUnavailable{err} => assert_eq!(err, "Cannot send value that would put us over the max HTLC value in flight our peer will accept"), _ => panic!("Unknown error variants"), }; } diff --git a/src/ln/functional_tests.rs b/src/ln/functional_tests.rs index 9debe08fa..5446c80aa 100644 --- a/src/ln/functional_tests.rs +++ b/src/ln/functional_tests.rs @@ -1236,7 +1236,7 @@ fn do_channel_reserve_test(test_recv: bool) { assert!(route.hops.iter().rev().skip(1).all(|h| h.fee_msat == feemsat)); let err = nodes[0].node.send_payment(route, our_payment_hash).err().unwrap(); match err { - APIError::ChannelUnavailable{err} => assert_eq!(err, "Cannot send value that would put us over the max HTLC value in flight"), + APIError::ChannelUnavailable{err} => assert_eq!(err, "Cannot send value that would put us over the max HTLC value in flight our peer will accept"), _ => panic!("Unknown error variants"), } } @@ -1272,7 +1272,7 @@ fn do_channel_reserve_test(test_recv: bool) { let (route, our_payment_hash, _) = get_route_and_payment_hash!(recv_value + 1); let err = nodes[0].node.send_payment(route.clone(), our_payment_hash).err().unwrap(); match err { - APIError::ChannelUnavailable{err} => assert_eq!(err, "Cannot send value that would put us over the reserve value"), + APIError::ChannelUnavailable{err} => assert_eq!(err, "Cannot send value that would put us over their reserve value"), _ => panic!("Unknown error variants"), } } @@ -1297,7 +1297,7 @@ fn do_channel_reserve_test(test_recv: bool) { { let (route, our_payment_hash, _) = get_route_and_payment_hash!(recv_value_2 + 1); match nodes[0].node.send_payment(route, our_payment_hash).err().unwrap() { - APIError::ChannelUnavailable{err} => assert_eq!(err, "Cannot send value that would put us over the reserve value"), + APIError::ChannelUnavailable{err} => assert_eq!(err, "Cannot send value that would put us over their reserve value"), _ => panic!("Unknown error variants"), } } @@ -1361,7 +1361,7 @@ fn do_channel_reserve_test(test_recv: bool) { { let (route, our_payment_hash, _) = get_route_and_payment_hash!(recv_value_22+1); match nodes[0].node.send_payment(route, our_payment_hash).err().unwrap() { - APIError::ChannelUnavailable{err} => assert_eq!(err, "Cannot send value that would put us over the reserve value"), + APIError::ChannelUnavailable{err} => assert_eq!(err, "Cannot send value that would put us over their reserve value"), _ => panic!("Unknown error variants"), } } @@ -5019,7 +5019,7 @@ fn test_update_add_htlc_bolt2_sender_exceed_max_htlc_value_in_flight() { let err = nodes[0].node.send_payment(route, our_payment_hash); if let Err(APIError::ChannelUnavailable{err}) = err { - assert_eq!(err, "Cannot send value that would put us over the max HTLC value in flight"); + assert_eq!(err, "Cannot send value that would put us over the max HTLC value in flight our peer will accept"); } else { assert!(false); } @@ -5143,7 +5143,7 @@ fn test_update_add_htlc_bolt2_receiver_check_max_in_flight_msat() { let err = nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &updates.update_add_htlcs[0]); if let Err(msgs::HandleError{err, action: Some(msgs::ErrorAction::SendErrorMessage {..})}) = err { - assert_eq!(err,"Remote HTLC add would put them over their max HTLC value in flight"); + assert_eq!(err,"Remote HTLC add would put them over our max HTLC value"); } else { assert!(false); } diff --git a/src/ln/peer_handler.rs b/src/ln/peer_handler.rs index 0e390bd32..8094f2561 100644 --- a/src/ln/peer_handler.rs +++ b/src/ln/peer_handler.rs @@ -45,12 +45,13 @@ pub struct MessageHandler { /// careful to ensure you don't have races whereby you might register a new connection with an fd /// the same as a yet-to-be-disconnect_event()-ed. pub trait SocketDescriptor : cmp::Eq + hash::Hash + Clone { - /// Attempts to send some data from the given Vec starting at the given offset to the peer. + /// Attempts to send some data from the given slice to the peer. + /// /// Returns the amount of data which was sent, possibly 0 if the socket has since disconnected. /// Note that in the disconnected case, a disconnect_event must still fire and further write /// attempts may occur until that time. /// - /// If the returned size is smaller than data.len() - write_offset, a write_available event must + /// If the returned size is smaller than data.len(), a write_available event must /// trigger the next time more data can be written. Additionally, until the a send_data event /// completes fully, no further read_events should trigger on the same peer! /// @@ -58,7 +59,7 @@ pub trait SocketDescriptor : cmp::Eq + hash::Hash + Clone { /// events should be paused to prevent DoS in the send buffer), resume_read may be set /// indicating that read events on this descriptor should resume. A resume_read of false does /// *not* imply that further read events should be paused. - fn send_data(&mut self, data: &Vec, write_offset: usize, resume_read: bool) -> usize; + fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize; /// Disconnect the socket pointed to by this SocketDescriptor. Once this function returns, no /// more calls to write_event, read_event or disconnect_event may be made with this descriptor. /// No disconnect_event should be generated as a result of this call, though obviously races @@ -387,7 +388,8 @@ impl PeerManager { }; let should_be_reading = peer.pending_outbound_buffer.len() < MSG_BUFF_SIZE; - let data_sent = descriptor.send_data(next_buff, peer.pending_outbound_buffer_first_msg_offset, should_be_reading); + let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..]; + let data_sent = descriptor.send_data(pending, should_be_reading); peer.pending_outbound_buffer_first_msg_offset += data_sent; if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false } } { @@ -1118,9 +1120,8 @@ mod tests { } impl SocketDescriptor for FileDescriptor { - fn send_data(&mut self, data: &Vec, write_offset: usize, _resume_read: bool) -> usize { - assert!(write_offset < data.len()); - data.len() - write_offset + fn send_data(&mut self, data: &[u8], _resume_read: bool) -> usize { + data.len() } fn disconnect_socket(&mut self) {}