Merge pull request #349 from ariard/2019-07-data_loss
authorMatt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Tue, 6 Aug 2019 21:12:57 +0000 (21:12 +0000)
committerGitHub <noreply@github.com>
Tue, 6 Aug 2019 21:12:57 +0000 (21:12 +0000)
Implement option_data_loss_protect on both sides

.gitignore
.travis.yml
fuzz/fuzz_targets/full_stack_target.rs
net-tokio/src/lib.rs
src/ln/channel.rs
src/ln/functional_test_utils.rs
src/ln/functional_tests.rs
src/ln/peer_handler.rs

index c2c882ddf49b8dd4032826867eed5bc28cbffddb..c795d9b53373103e2a75b2acfdcb1c27facb24e7 100644 (file)
@@ -1,6 +1,6 @@
 /target/
+/net-tokio/target/
 **/*.rs.bk
 Cargo.lock
-/target/
-**/*.rs.bk
-.idea
\ No newline at end of file
+.idea
+
index af5fa721456691c401dbaa8100daa6fa53454887..ef0aa839cf51195cb0304a482a9df86b74b4f008 100644 (file)
@@ -12,7 +12,7 @@ before_install:
 
 script:
   - RUSTFLAGS="-C link-dead-code" cargo build --verbose
-  - rm target/debug/lightning-* # Make sure we drop old test binaries
+  - rm -f target/debug/lightning-* # Make sure we drop old test binaries
   - RUSTFLAGS="-C link-dead-code" cargo test --verbose
   - if [ "$(rustup show | grep default | grep 1.34.2)" != "" ]; then cd fuzz && cargo test --verbose && ./travis-fuzz.sh; fi
   - if [ "$(rustup show | grep default | grep stable)" != "" ]; then cd net-tokio && cargo build --verbose && cd ..; fi
index acd757882b9825a77e2685ff74dae19608fead07..6145d003b199f5007c1b563266b30dd0da2f9c1d 100644 (file)
@@ -49,7 +49,7 @@ use std::collections::{HashMap, hash_map};
 use std::cmp;
 use std::hash::Hash;
 use std::sync::Arc;
-use std::sync::atomic::{AtomicU8,AtomicUsize,Ordering};
+use std::sync::atomic::{AtomicU64,AtomicUsize,Ordering};
 
 #[inline]
 pub fn slice_to_be16(v: &[u8]) -> u16 {
@@ -124,9 +124,8 @@ struct Peer<'a> {
        peers_connected: &'a RefCell<[bool; 256]>,
 }
 impl<'a> SocketDescriptor for Peer<'a> {
-       fn send_data(&mut self, data: &Vec<u8>, write_offset: usize, _resume_read: bool) -> usize {
-               assert!(write_offset < data.len());
-               data.len() - write_offset
+       fn send_data(&mut self, data: &[u8], _resume_read: bool) -> usize {
+               data.len()
        }
        fn disconnect_socket(&mut self) {
                assert!(self.peers_connected.borrow()[self.id as usize]);
@@ -236,7 +235,7 @@ impl<'a> Drop for MoneyLossDetector<'a> {
 
 struct KeyProvider {
        node_secret: SecretKey,
-       counter: AtomicU8,
+       counter: AtomicU64,
 }
 impl KeysInterface for KeyProvider {
        fn get_node_secret(&self) -> SecretKey {
@@ -256,7 +255,7 @@ impl KeysInterface for KeyProvider {
        }
 
        fn get_channel_keys(&self, inbound: bool) -> ChannelKeys {
-               let ctr = self.counter.fetch_add(1, Ordering::Relaxed);
+               let ctr = self.counter.fetch_add(1, Ordering::Relaxed) as u8;
                if inbound {
                        ChannelKeys {
                                funding_key:               SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, ctr]).unwrap(),
@@ -279,13 +278,14 @@ impl KeysInterface for KeyProvider {
        }
 
        fn get_session_key(&self) -> SecretKey {
-               let ctr = self.counter.fetch_add(1, Ordering::Relaxed);
+               let ctr = self.counter.fetch_add(1, Ordering::Relaxed) as u8;
                SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 13, ctr]).unwrap()
        }
 
        fn get_channel_id(&self) -> [u8; 32] {
                let ctr = self.counter.fetch_add(1, Ordering::Relaxed);
-               [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, ctr]
+               [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+               (ctr >> 8*7) as u8, (ctr >> 8*6) as u8, (ctr >> 8*5) as u8, (ctr >> 8*4) as u8, (ctr >> 8*3) as u8, (ctr >> 8*2) as u8, (ctr >> 8*1) as u8, 14, (ctr >> 8*0) as u8]
        }
 }
 
@@ -326,7 +326,7 @@ pub fn do_test(data: &[u8], logger: &Arc<Logger>) {
        let broadcast = Arc::new(TestBroadcaster{});
        let monitor = channelmonitor::SimpleManyChannelMonitor::new(watch.clone(), broadcast.clone(), Arc::clone(&logger), fee_est.clone());
 
-       let keys_manager = Arc::new(KeyProvider { node_secret: our_network_key.clone(), counter: AtomicU8::new(0) });
+       let keys_manager = Arc::new(KeyProvider { node_secret: our_network_key.clone(), counter: AtomicU64::new(0) });
        let mut config = UserConfig::new();
        config.channel_options.fee_proportional_millionths =  slice_to_be32(get_slice!(4));
        config.channel_options.announced_channel = get_slice!(1)[0] != 0;
index 54752fdb3684e423c8ce1b8c257f18ec0da76b65..0bc36b28075f887476ac782d52a955c80b56cb0c 100644 (file)
@@ -128,7 +128,7 @@ impl Connection {
                let (reader, us) = Self::new(event_notify, stream);
 
                if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone(), peer_manager.clone())) {
-                       if SocketDescriptor::new(us.clone(), peer_manager.clone()).send_data(&initial_send, 0, true) == initial_send.len() {
+                       if SocketDescriptor::new(us.clone(), peer_manager.clone()).send_data(&initial_send, true) == initial_send.len() {
                                Self::schedule_read(peer_manager, us, reader);
                        } else {
                                println!("Failed to write first full message to socket!");
@@ -170,7 +170,7 @@ impl SocketDescriptor {
        }
 }
 impl peer_handler::SocketDescriptor for SocketDescriptor {
-       fn send_data(&mut self, data: &Vec<u8>, write_offset: usize, resume_read: bool) -> usize {
+       fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
                macro_rules! schedule_read {
                        ($us_ref: expr) => {
                                tokio::spawn(future::lazy(move || -> Result<(), ()> {
@@ -211,20 +211,20 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
                        let us_ref = self.clone();
                        schedule_read!(us_ref);
                }
-               if data.len() == write_offset { return 0; }
+               if data.is_empty() { return 0; }
                if us.writer.is_none() {
                        us.read_paused = true;
                        return 0;
                }
 
-               let mut bytes = bytes::BytesMut::with_capacity(data.len() - write_offset);
-               bytes.put(&data[write_offset..]);
+               let mut bytes = bytes::BytesMut::with_capacity(data.len());
+               bytes.put(data);
                let write_res = us.writer.as_mut().unwrap().start_send(bytes.freeze());
                match write_res {
                        Ok(res) => {
                                match res {
                                        AsyncSink::Ready => {
-                                               data.len() - write_offset
+                                               data.len()
                                        },
                                        AsyncSink::NotReady(_) => {
                                                us.read_paused = true;
index 5c0ca8fd6609cfe07204ffed6d3e6c9d7a11268b..f523471210abaf4650700d42814f75d8c75112f4 100644 (file)
@@ -1674,10 +1674,9 @@ impl Channel {
                if inbound_htlc_count + 1 > OUR_MAX_HTLCS as u32 {
                        return Err(ChannelError::Close("Remote tried to push more than our max accepted HTLCs"));
                }
-               //TODO: Spec is unclear if this is per-direction or in total (I assume per direction):
                // Check our_max_htlc_value_in_flight_msat
                if htlc_inbound_value_msat + msg.amount_msat > Channel::get_our_max_htlc_value_in_flight_msat(self.channel_value_satoshis) {
-                       return Err(ChannelError::Close("Remote HTLC add would put them over their max HTLC value in flight"));
+                       return Err(ChannelError::Close("Remote HTLC add would put them over our max HTLC value"));
                }
                // Check our_channel_reserve_satoshis (we're getting paid, so they have to at least meet
                // the reserve_satoshis we told them to always have as direct payment so that they lose
@@ -3359,16 +3358,15 @@ impl Channel {
                if outbound_htlc_count + 1 > self.their_max_accepted_htlcs as u32 {
                        return Err(ChannelError::Ignore("Cannot push more than their max accepted HTLCs"));
                }
-               //TODO: Spec is unclear if this is per-direction or in total (I assume per direction):
                // Check their_max_htlc_value_in_flight_msat
                if htlc_outbound_value_msat + amount_msat > self.their_max_htlc_value_in_flight_msat {
-                       return Err(ChannelError::Ignore("Cannot send value that would put us over the max HTLC value in flight"));
+                       return Err(ChannelError::Ignore("Cannot send value that would put us over the max HTLC value in flight our peer will accept"));
                }
 
                // Check self.their_channel_reserve_satoshis (the amount we must keep as
                // reserve for them to have something to claim if we misbehave)
                if self.value_to_self_msat < self.their_channel_reserve_satoshis * 1000 + amount_msat + htlc_outbound_value_msat {
-                       return Err(ChannelError::Ignore("Cannot send value that would put us over the reserve value"));
+                       return Err(ChannelError::Ignore("Cannot send value that would put us over their reserve value"));
                }
 
                //TODO: Check cltv_expiry? Do this in channel manager?
index 001da7009b8a67e05ff22ae9719da7ce869434b9..fe32a1ef24bf9f5d239044e446eab284f1295bae 100644 (file)
@@ -740,7 +740,7 @@ pub fn route_over_limit(origin_node: &Node, expected_route: &[&Node], recv_value
 
        let err = origin_node.node.send_payment(route, our_payment_hash).err().unwrap();
        match err {
-               APIError::ChannelUnavailable{err} => assert_eq!(err, "Cannot send value that would put us over the max HTLC value in flight"),
+               APIError::ChannelUnavailable{err} => assert_eq!(err, "Cannot send value that would put us over the max HTLC value in flight our peer will accept"),
                _ => panic!("Unknown error variants"),
        };
 }
index 9debe08fa28889f74dc97379b1412a27eca32eb2..5446c80aad821d8960703f58ca4fd902cd4ff345 100644 (file)
@@ -1236,7 +1236,7 @@ fn do_channel_reserve_test(test_recv: bool) {
                assert!(route.hops.iter().rev().skip(1).all(|h| h.fee_msat == feemsat));
                let err = nodes[0].node.send_payment(route, our_payment_hash).err().unwrap();
                match err {
-                       APIError::ChannelUnavailable{err} => assert_eq!(err, "Cannot send value that would put us over the max HTLC value in flight"),
+                       APIError::ChannelUnavailable{err} => assert_eq!(err, "Cannot send value that would put us over the max HTLC value in flight our peer will accept"),
                        _ => panic!("Unknown error variants"),
                }
        }
@@ -1272,7 +1272,7 @@ fn do_channel_reserve_test(test_recv: bool) {
                let (route, our_payment_hash, _) = get_route_and_payment_hash!(recv_value + 1);
                let err = nodes[0].node.send_payment(route.clone(), our_payment_hash).err().unwrap();
                match err {
-                       APIError::ChannelUnavailable{err} => assert_eq!(err, "Cannot send value that would put us over the reserve value"),
+                       APIError::ChannelUnavailable{err} => assert_eq!(err, "Cannot send value that would put us over their reserve value"),
                        _ => panic!("Unknown error variants"),
                }
        }
@@ -1297,7 +1297,7 @@ fn do_channel_reserve_test(test_recv: bool) {
        {
                let (route, our_payment_hash, _) = get_route_and_payment_hash!(recv_value_2 + 1);
                match nodes[0].node.send_payment(route, our_payment_hash).err().unwrap() {
-                       APIError::ChannelUnavailable{err} => assert_eq!(err, "Cannot send value that would put us over the reserve value"),
+                       APIError::ChannelUnavailable{err} => assert_eq!(err, "Cannot send value that would put us over their reserve value"),
                        _ => panic!("Unknown error variants"),
                }
        }
@@ -1361,7 +1361,7 @@ fn do_channel_reserve_test(test_recv: bool) {
        {
                let (route, our_payment_hash, _) = get_route_and_payment_hash!(recv_value_22+1);
                match nodes[0].node.send_payment(route, our_payment_hash).err().unwrap() {
-                       APIError::ChannelUnavailable{err} => assert_eq!(err, "Cannot send value that would put us over the reserve value"),
+                       APIError::ChannelUnavailable{err} => assert_eq!(err, "Cannot send value that would put us over their reserve value"),
                        _ => panic!("Unknown error variants"),
                }
        }
@@ -5019,7 +5019,7 @@ fn test_update_add_htlc_bolt2_sender_exceed_max_htlc_value_in_flight() {
        let err = nodes[0].node.send_payment(route, our_payment_hash);
 
        if let Err(APIError::ChannelUnavailable{err}) = err {
-               assert_eq!(err, "Cannot send value that would put us over the max HTLC value in flight");
+               assert_eq!(err, "Cannot send value that would put us over the max HTLC value in flight our peer will accept");
        } else {
                assert!(false);
        }
@@ -5143,7 +5143,7 @@ fn test_update_add_htlc_bolt2_receiver_check_max_in_flight_msat() {
        let err = nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &updates.update_add_htlcs[0]);
 
        if let Err(msgs::HandleError{err, action: Some(msgs::ErrorAction::SendErrorMessage {..})}) = err {
-               assert_eq!(err,"Remote HTLC add would put them over their max HTLC value in flight");
+               assert_eq!(err,"Remote HTLC add would put them over our max HTLC value");
        } else {
                assert!(false);
        }
index 0e390bd32fc9d777dbd7dddbc45e071683c69514..8094f256195d2fa5e1dd5c20c1f44ef3c3489570 100644 (file)
@@ -45,12 +45,13 @@ pub struct MessageHandler {
 /// careful to ensure you don't have races whereby you might register a new connection with an fd
 /// the same as a yet-to-be-disconnect_event()-ed.
 pub trait SocketDescriptor : cmp::Eq + hash::Hash + Clone {
-       /// Attempts to send some data from the given Vec starting at the given offset to the peer.
+       /// Attempts to send some data from the given slice to the peer.
+       ///
        /// Returns the amount of data which was sent, possibly 0 if the socket has since disconnected.
        /// Note that in the disconnected case, a disconnect_event must still fire and further write
        /// attempts may occur until that time.
        ///
-       /// If the returned size is smaller than data.len() - write_offset, a write_available event must
+       /// If the returned size is smaller than data.len(), a write_available event must
        /// trigger the next time more data can be written. Additionally, until the a send_data event
        /// completes fully, no further read_events should trigger on the same peer!
        ///
@@ -58,7 +59,7 @@ pub trait SocketDescriptor : cmp::Eq + hash::Hash + Clone {
        /// events should be paused to prevent DoS in the send buffer), resume_read may be set
        /// indicating that read events on this descriptor should resume. A resume_read of false does
        /// *not* imply that further read events should be paused.
-       fn send_data(&mut self, data: &Vec<u8>, write_offset: usize, resume_read: bool) -> usize;
+       fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize;
        /// Disconnect the socket pointed to by this SocketDescriptor. Once this function returns, no
        /// more calls to write_event, read_event or disconnect_event may be made with this descriptor.
        /// No disconnect_event should be generated as a result of this call, though obviously races
@@ -387,7 +388,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                };
 
                                let should_be_reading = peer.pending_outbound_buffer.len() < MSG_BUFF_SIZE;
-                               let data_sent = descriptor.send_data(next_buff, peer.pending_outbound_buffer_first_msg_offset, should_be_reading);
+                               let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
+                               let data_sent = descriptor.send_data(pending, should_be_reading);
                                peer.pending_outbound_buffer_first_msg_offset += data_sent;
                                if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false }
                        } {
@@ -1118,9 +1120,8 @@ mod tests {
        }
 
        impl SocketDescriptor for FileDescriptor {
-               fn send_data(&mut self, data: &Vec<u8>, write_offset: usize, _resume_read: bool) -> usize {
-                       assert!(write_offset < data.len());
-                       data.len() - write_offset
+               fn send_data(&mut self, data: &[u8], _resume_read: bool) -> usize {
+                       data.len()
                }
 
                fn disconnect_socket(&mut self) {}