Rejigger BGP timeouts
authorMatt Corallo <git@bluematt.me>
Fri, 23 Aug 2019 04:54:39 +0000 (00:54 -0400)
committerMatt Corallo <git@bluematt.me>
Fri, 23 Aug 2019 05:34:42 +0000 (01:34 -0400)
src/bgp_client.rs
src/main.rs

index ed07bdc46989c4257311aed443319317da37be25..4d393c39de094026a263d7557392a1f289e3b564 100644 (file)
@@ -21,6 +21,7 @@ use tokio::timer::Delay;
 use futures::sync::mpsc;
 
 use crate::printer::{Printer, Stat};
+use crate::timeout_stream::TimeoutStream;
 
 struct Route {
        path: Vec<u32>,
@@ -231,77 +232,79 @@ impl BGPClient {
        }
 
        fn connect_given_client(addr: SocketAddr, timeout: Duration, printer: &'static Printer, client: Arc<BGPClient>) {
-               let connect_timeout = Delay::new(Instant::now() + timeout.clone()).then(|_| {
-                       future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached"))
-               });
-               let client_reconn = Arc::clone(&client);
-               tokio::spawn(TcpStream::connect(&addr).select(connect_timeout)
-                       .or_else(move |_| {
-                               Delay::new(Instant::now() + timeout / 10).then(|_| {
-                                       future::err(())
-                               })
-                       }).and_then(move |stream| {
-                               let (write, read) = Framed::new(stream.0, MsgCoder(printer)).split();
-                               let (mut sender, receiver) = mpsc::channel(10); // We never really should send more than 10 messages unless they're dumb
-                               tokio::spawn(write.sink_map_err(|_| { () }).send_all(receiver)
-                                       .then(|_| {
+               tokio::spawn(Delay::new(Instant::now() + timeout / 4).then(move |_| {
+                       let connect_timeout = Delay::new(Instant::now() + timeout.clone()).then(|_| {
+                               future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached"))
+                       });
+                       let client_reconn = Arc::clone(&client);
+                       TcpStream::connect(&addr).select(connect_timeout)
+                               .or_else(move |_| {
+                                       Delay::new(Instant::now() + timeout / 2).then(|_| {
                                                future::err(())
+                                       })
+                               }).and_then(move |stream| {
+                                       let (write, read) = Framed::new(stream.0, MsgCoder(printer)).split();
+                                       let (mut sender, receiver) = mpsc::channel(10); // We never really should send more than 10 messages unless they're dumb
+                                       tokio::spawn(write.sink_map_err(|_| { () }).send_all(receiver)
+                                               .then(|_| {
+                                                       future::err(())
+                                               }));
+                                       let _ = sender.try_send(Message::Open(Open {
+                                               version: 4,
+                                               peer_asn: 23456,
+                                               hold_timer: timeout.as_secs() as u16,
+                                               identifier: 0x453b1215, // 69.59.18.21
+                                               parameters: vec![OpenParameter::Capabilities(vec![
+                                                       OpenCapability::MultiProtocol((AFI::IPV4, SAFI::Unicast)),
+                                                       OpenCapability::MultiProtocol((AFI::IPV6, SAFI::Unicast)),
+                                                       OpenCapability::FourByteASN(397444),
+                                                       OpenCapability::RouteRefresh,
+                                                       OpenCapability::AddPath(vec![
+                                                               (AFI::IPV4, SAFI::Unicast, AddPathDirection::ReceivePaths),
+                                                               (AFI::IPV6, SAFI::Unicast, AddPathDirection::ReceivePaths)]),
+                                               ])]
                                        }));
-                               let _ = sender.try_send(Message::Open(Open {
-                                       version: 4,
-                                       peer_asn: 23456,
-                                       hold_timer: 120,
-                                       identifier: 0x453b1215, // 69.59.18.21
-                                       parameters: vec![OpenParameter::Capabilities(vec![
-                                               OpenCapability::MultiProtocol((AFI::IPV4, SAFI::Unicast)),
-                                               OpenCapability::MultiProtocol((AFI::IPV6, SAFI::Unicast)),
-                                               OpenCapability::FourByteASN(397444),
-                                               OpenCapability::RouteRefresh,
-                                               OpenCapability::AddPath(vec![
-                                                       (AFI::IPV4, SAFI::Unicast, AddPathDirection::ReceivePaths),
-                                                       (AFI::IPV6, SAFI::Unicast, AddPathDirection::ReceivePaths)]),
-                                       ])]
-                               }));
-                               read.for_each(move |bgp_msg| {
-                                       if client.shutdown.load(Ordering::Relaxed) {
-                                               return future::err(std::io::Error::new(std::io::ErrorKind::Other, "Shutting Down"));
-                                       }
-                                       match bgp_msg {
-                                               Message::Open(_) => {
-                                                       client.routes.lock().unwrap().v4_table.clear();
-                                                       client.routes.lock().unwrap().v6_table.clear();
-                                                       printer.add_line("Connected to BGP route provider".to_string(), false);
-                                               },
-                                               Message::KeepAlive => {
-                                                       let _ = sender.try_send(Message::KeepAlive);
-                                               },
-                                               Message::Update(mut upd) => {
-                                                       upd.normalize();
-                                                       let mut route_table = client.routes.lock().unwrap();
-                                                       for r in upd.withdrawn_routes {
-                                                               route_table.withdraw(r);
-                                                       }
-                                                       if let Some(path) = Self::map_attrs(upd.attributes) {
-                                                               let path_arc = Arc::new(path);
-                                                               for r in upd.announced_routes {
-                                                                       route_table.announce(r, Arc::clone(&path_arc));
+                                       TimeoutStream::new_persistent(read, timeout).for_each(move |bgp_msg| {
+                                               if client.shutdown.load(Ordering::Relaxed) {
+                                                       return future::err(std::io::Error::new(std::io::ErrorKind::Other, "Shutting Down"));
+                                               }
+                                               match bgp_msg {
+                                                       Message::Open(_) => {
+                                                               client.routes.lock().unwrap().v4_table.clear();
+                                                               client.routes.lock().unwrap().v6_table.clear();
+                                                               printer.add_line("Connected to BGP route provider".to_string(), false);
+                                                       },
+                                                       Message::KeepAlive => {
+                                                               let _ = sender.try_send(Message::KeepAlive);
+                                                       },
+                                                       Message::Update(mut upd) => {
+                                                               upd.normalize();
+                                                               let mut route_table = client.routes.lock().unwrap();
+                                                               for r in upd.withdrawn_routes {
+                                                                       route_table.withdraw(r);
                                                                }
-                                                       }
-                                                       printer.set_stat(Stat::V4RoutingTableSize(route_table.v4_table.len()));
-                                                       printer.set_stat(Stat::V6RoutingTableSize(route_table.v6_table.len()));
-                                               },
-                                               _ => {}
+                                                               if let Some(path) = Self::map_attrs(upd.attributes) {
+                                                                       let path_arc = Arc::new(path);
+                                                                       for r in upd.announced_routes {
+                                                                               route_table.announce(r, Arc::clone(&path_arc));
+                                                                       }
+                                                               }
+                                                               printer.set_stat(Stat::V4RoutingTableSize(route_table.v4_table.len()));
+                                                               printer.set_stat(Stat::V6RoutingTableSize(route_table.v6_table.len()));
+                                                       },
+                                                       _ => {}
+                                               }
+                                               future::ok(())
+                                       }).or_else(move |e| {
+                                               printer.add_line(format!("Got error from BGP stream: {:?}", e), true);
+                                               future::ok(())
+                                       })
+                               }).then(move |_| {
+                                       if !client_reconn.shutdown.load(Ordering::Relaxed) {
+                                               BGPClient::connect_given_client(addr, timeout, printer, client_reconn);
                                        }
                                        future::ok(())
-                               }).or_else(move |e| {
-                                       printer.add_line(format!("Got error from BGP stream: {:?}", e), true);
-                                       future::ok(())
                                })
-                       }).then(move |_| {
-                               if !client_reconn.shutdown.load(Ordering::Relaxed) {
-                                       BGPClient::connect_given_client(addr, timeout, printer, client_reconn);
-                               }
-                               future::ok(())
                        })
                );
        }
index 13053b5ce7effa16ef8c907bd5d70e03e22a3c31..b868570db764f9f702248adf3caeca973c4fe7be 100644 (file)
@@ -436,7 +436,7 @@ fn main() {
                        let store = unsafe { DATA_STORE.as_ref().unwrap() };
                        unsafe { PRINTER = Some(Box::new(Printer::new(store))) };
 
-                       let bgp_client = BGPClient::new(bgp_sockaddr, Duration::from_secs(600), unsafe { PRINTER.as_ref().unwrap() });
+                       let bgp_client = BGPClient::new(bgp_sockaddr, Duration::from_secs(60), unsafe { PRINTER.as_ref().unwrap() });
                        make_trusted_conn(trusted_sockaddr, Arc::clone(&bgp_client));
 
                        reader::read(store, unsafe { PRINTER.as_ref().unwrap() }, bgp_client);