Initial checkin
[dnsseed-rust] / src / timeout_stream.rs
1 use tokio::prelude::*;
2 use tokio::timer::Delay;
3
4 use std::time::{Duration, Instant};
5
6 pub struct TimeoutStream<S> where S : Stream {
7         stream: S,
8         next_deadline: Delay,
9         timeout: Duration,
10 }
11
12 impl<S> TimeoutStream<S> where S : Stream {
13         pub fn new(stream: S, timeout: Duration) -> Self {
14                 let next_deadline = Delay::new(Instant::now() + timeout);
15                 Self {
16                         stream,
17                         next_deadline,
18                         timeout,
19                 }
20         }
21 }
22
23 impl<S> Stream for TimeoutStream<S> where S : Stream {
24         type Item = S::Item;
25         type Error = S::Error;
26         fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
27                 match self.next_deadline.poll() {
28                         Ok(Async::Ready(_)) => Ok(Async::Ready(None)),
29                         Ok(Async::NotReady) => {
30                                 match self.stream.poll() {
31                                         Ok(Async::Ready(v)) => {
32                                                 self.next_deadline.reset(Instant::now() + self.timeout);
33                                                 Ok(Async::Ready(v))
34                                         },
35                                         Ok(Async::NotReady) => Ok(Async::NotReady),
36                                         Err(e) => Err(e),
37                                 }
38                         },
39                         Err(_) => Ok(Async::Ready(None)), // TODO: If I want to upstream TimeoutStream this is gonna need some love
40                 }
41         }
42 }