Type absurdism 2020-01-443-absurdism
authorMatt Corallo <git@bluematt.me>
Tue, 21 Jan 2020 21:28:41 +0000 (16:28 -0500)
committerMatt Corallo <git@bluematt.me>
Tue, 21 Jan 2020 21:28:41 +0000 (16:28 -0500)
lightning-net-tokio/src/lib.rs
lightning/src/chain/chaininterface.rs
lightning/src/ln/functional_test_utils.rs
lightning/src/ln/functional_tests.rs

index faa831a19d82dbac77b0ac8b7d64c58c03d093ff..47e179189789f953f0944b017b0bc3a5ff9faaec 100644 (file)
@@ -43,7 +43,7 @@ pub struct Connection {
        id: u64,
 }
 impl Connection {
-       fn schedule_read(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<dyn ChannelMessageHandler>>>, us: Arc<Mutex<Self>>, reader: futures::stream::SplitStream<tokio_codec::Framed<TcpStream, tokio_codec::BytesCodec>>) {
+       fn schedule_read<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, us: Arc<Mutex<Self>>, reader: futures::stream::SplitStream<tokio_codec::Framed<TcpStream, tokio_codec::BytesCodec>>) {
                let us_ref = us.clone();
                let us_close_ref = us.clone();
                let peer_manager_ref = peer_manager.clone();
@@ -111,7 +111,7 @@ impl Connection {
        ///
        /// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
        /// ChannelManager and ChannelMonitor objects.
-       pub fn setup_inbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<dyn ChannelMessageHandler>>>, event_notify: mpsc::Sender<()>, stream: TcpStream) {
+       pub fn setup_inbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, event_notify: mpsc::Sender<()>, stream: TcpStream) {
                let (reader, us) = Self::new(event_notify, stream);
 
                if let Ok(_) = peer_manager.new_inbound_connection(SocketDescriptor::new(us.clone(), peer_manager.clone())) {
@@ -125,7 +125,7 @@ impl Connection {
        ///
        /// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
        /// ChannelManager and ChannelMonitor objects.
-       pub fn setup_outbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<dyn ChannelMessageHandler>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) {
+       pub fn setup_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) {
                let (reader, us) = Self::new(event_notify, stream);
 
                if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone(), peer_manager.clone())) {
@@ -143,7 +143,7 @@ impl Connection {
        ///
        /// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
        /// ChannelManager and ChannelMonitor objects.
-       pub fn connect_outbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<dyn ChannelMessageHandler>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) {
+       pub fn connect_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) {
                let connect_timeout = Delay::new(Instant::now() + Duration::from_secs(10)).then(|_| {
                        future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached"))
                });
@@ -158,19 +158,18 @@ impl Connection {
        }
 }
 
-#[derive(Clone)]
-pub struct SocketDescriptor {
+pub struct SocketDescriptor<CMH: ChannelMessageHandler + 'static> {
        conn: Arc<Mutex<Connection>>,
        id: u64,
-       peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<dyn ChannelMessageHandler>>>,
+       peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>,
 }
-impl SocketDescriptor {
-       fn new(conn: Arc<Mutex<Connection>>, peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<dyn ChannelMessageHandler>>>) -> Self {
+impl<CMH: ChannelMessageHandler> SocketDescriptor<CMH> {
+       fn new(conn: Arc<Mutex<Connection>>, peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>) -> Self {
                let id = conn.lock().unwrap().id;
                Self { conn, id, peer_manager }
        }
 }
-impl peer_handler::SocketDescriptor for SocketDescriptor {
+impl<CMH: ChannelMessageHandler> peer_handler::SocketDescriptor for SocketDescriptor<CMH> {
        fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
                macro_rules! schedule_read {
                        ($us_ref: expr) => {
@@ -257,13 +256,22 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
                us.read_paused = true;
        }
 }
-impl Eq for SocketDescriptor {}
-impl PartialEq for SocketDescriptor {
+impl<CMH: ChannelMessageHandler> Clone for SocketDescriptor<CMH> {
+       fn clone(&self) -> Self {
+               Self {
+                       conn: Arc::clone(&self.conn),
+                       id: self.id,
+                       peer_manager: Arc::clone(&self.peer_manager),
+               }
+       }
+}
+impl<CMH: ChannelMessageHandler> Eq for SocketDescriptor<CMH> {}
+impl<CMH: ChannelMessageHandler> PartialEq for SocketDescriptor<CMH> {
        fn eq(&self, o: &Self) -> bool {
                self.id == o.id
        }
 }
-impl Hash for SocketDescriptor {
+impl<CMH: ChannelMessageHandler> Hash for SocketDescriptor<CMH> {
        fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
                self.id.hash(state);
        }
index 45a9ee7f11df35399848206bed8365fdaf361c32..3216e2b63d72393240d5d63437fce53a2b304d45 100644 (file)
@@ -207,33 +207,26 @@ impl ChainWatchedUtil {
 
 /// Utility for notifying listeners about new blocks, and handling block rescans if new watch
 /// data is registered.
-pub struct BlockNotifier<'a> {
-       ref_listeners: Mutex<Vec<&'a ChainListener>>,
-       arc_listeners: Mutex<Vec<Arc<ChainListener>>>,
+pub struct BlockNotifier<'a, CL: std::ops::Deref<Target = ChainListener + 'a> + 'a> {
+       listeners: Mutex<Vec<CL>>,
        chain_monitor: Arc<ChainWatchInterface>,
+       phantom: ::std::marker::PhantomData<&'a ()>,
 }
 
-impl<'a> BlockNotifier<'a> {
+impl<'a, CL: ::std::ops::Deref<Target = ChainListener + 'a> + 'a> BlockNotifier<'a, CL> {
        /// Constructs a new BlockNotifier without any listeners.
-       pub fn new(chain_monitor: Arc<ChainWatchInterface>) -> BlockNotifier<'a> {
+       pub fn new(chain_monitor: Arc<ChainWatchInterface>) -> BlockNotifier<'a, CL> {
                BlockNotifier {
-                       ref_listeners: Mutex::new(Vec::new()),
-                       arc_listeners: Mutex::new(Vec::new()),
+                       listeners: Mutex::new(Vec::new()),
                        chain_monitor,
+                       phantom: ::std::marker::PhantomData,
                }
        }
 
        /// Register the given ref listener to receive events.
        // TODO: unregister
-       pub fn register_ref_listener(&self, listener: &'a ChainListener) {
-               let mut vec = self.ref_listeners.lock().unwrap();
-               vec.push(listener);
-       }
-
-       /// Register the given Arc listener to receive events.
-       // TODO: unregister
-       pub fn register_arc_listener(&self, listener: Arc<ChainListener>) {
-               let mut vec = self.arc_listeners.lock().unwrap();
+       pub fn register_listener(&self, listener: CL) {
+               let mut vec = self.listeners.lock().unwrap();
                vec.push(listener);
        }
 
@@ -258,14 +251,9 @@ impl<'a> BlockNotifier<'a> {
        pub fn block_connected_checked(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> bool {
                let last_seen = self.chain_monitor.reentered();
 
-               let ref_listeners = self.ref_listeners.lock().unwrap().clone();
-               for ref_listener in ref_listeners.iter() {
-                       ref_listener.block_connected(header, height, txn_matched, indexes_of_txn_matched);
-               }
-
-               let arc_listeners = self.arc_listeners.lock().unwrap().clone();
-               for arc_listener in arc_listeners.iter() {
-                       arc_listener.block_connected(header, height, txn_matched, indexes_of_txn_matched);
+               let listeners = self.listeners.lock().unwrap();
+               for listener in listeners.iter() {
+                       listener.block_connected(header, height, txn_matched, indexes_of_txn_matched);
                }
                return last_seen != self.chain_monitor.reentered();
        }
@@ -273,13 +261,8 @@ impl<'a> BlockNotifier<'a> {
 
        /// Notify listeners that a block was disconnected.
        pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
-               let ref_listeners = self.ref_listeners.lock().unwrap().clone();
-               for listener in ref_listeners.iter() {
-                       listener.block_disconnected(&header, disconnected_height);
-               }
-
-               let arc_listeners = self.arc_listeners.lock().unwrap().clone();
-               for listener in arc_listeners.iter() {
+               let listeners = self.listeners.lock().unwrap();
+               for listener in listeners.iter() {
                        listener.block_disconnected(&header, disconnected_height);
                }
        }
index 4a960cca55760ddce0881e4e74286a66ae06f273..751dde2464a46f2c9ce436707e8f59a03806b65b 100644 (file)
@@ -37,7 +37,7 @@ use std::sync::{Arc, Mutex};
 use std::mem;
 
 pub const CHAN_CONFIRM_DEPTH: u32 = 100;
-pub fn confirm_transaction<'a, 'b>(notifier: &'a chaininterface::BlockNotifier<'b>, chain: &chaininterface::ChainWatchInterfaceUtil, tx: &Transaction, chan_id: u32) {
+pub fn confirm_transaction<'a, 'b: 'a>(notifier: &'a chaininterface::BlockNotifier<'b, &'b chaininterface::ChainListener>, chain: &chaininterface::ChainWatchInterfaceUtil, tx: &Transaction, chan_id: u32) {
        assert!(chain.does_match_tx(tx));
        let mut header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
        notifier.block_connected_checked(&header, 1, &[tx; 1], &[chan_id; 1]);
@@ -47,7 +47,7 @@ pub fn confirm_transaction<'a, 'b>(notifier: &'a chaininterface::BlockNotifier<'
        }
 }
 
-pub fn connect_blocks<'a, 'b>(notifier: &'a chaininterface::BlockNotifier<'b>, depth: u32, height: u32, parent: bool, prev_blockhash: Sha256d) -> Sha256d {
+pub fn connect_blocks<'a, 'b>(notifier: &'a chaininterface::BlockNotifier<'b, &'b chaininterface::ChainListener>, depth: u32, height: u32, parent: bool, prev_blockhash: Sha256d) -> Sha256d {
        let mut header = BlockHeader { version: 0x2000000, prev_blockhash: if parent { prev_blockhash } else { Default::default() }, merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
        notifier.block_connected_checked(&header, height + 1, &Vec::new(), &Vec::new());
        for i in 2..depth + 1 {
@@ -68,7 +68,7 @@ pub struct NodeCfg {
 }
 
 pub struct Node<'a, 'b: 'a> {
-       pub block_notifier: chaininterface::BlockNotifier<'b>,
+       pub block_notifier: chaininterface::BlockNotifier<'b, &'b chaininterface::ChainListener>,
        pub chain_monitor: Arc<chaininterface::ChainWatchInterfaceUtil>,
        pub tx_broadcaster: Arc<test_utils::TestBroadcaster>,
        pub chan_monitor: &'b test_utils::TestChannelMonitor,
@@ -889,8 +889,8 @@ pub fn create_network<'a, 'b>(node_count: usize, cfgs: &'a Vec<NodeCfg>, chan_mg
 
        for i in 0..node_count {
                let block_notifier = chaininterface::BlockNotifier::new(cfgs[i].chain_monitor.clone());
-               block_notifier.register_ref_listener(&cfgs[i].chan_monitor.simple_monitor);
-               block_notifier.register_ref_listener(&chan_mgrs[i]);
+               block_notifier.register_listener(&cfgs[i].chan_monitor.simple_monitor as &chaininterface::ChainListener);
+               block_notifier.register_listener(&chan_mgrs[i] as &chaininterface::ChainListener);
                let router = Router::new(PublicKey::from_secret_key(&secp_ctx, &cfgs[i].keys_manager.get_node_secret()), cfgs[i].chain_monitor.clone(), cfgs[i].logger.clone() as Arc<Logger>);
                nodes.push(Node{ chain_monitor: cfgs[i].chain_monitor.clone(), block_notifier,
                                                                                 tx_broadcaster: cfgs[i].tx_broadcaster.clone(), chan_monitor: &cfgs[i].chan_monitor,
index 9342415587277d8c0eface5ae3ee41665b810d3f..b8137713516d7e93c546a63a4b1e81675cf5a93e 100644 (file)
@@ -3522,7 +3522,7 @@ fn test_no_txn_manager_serialize_deserialize() {
 
        assert!(nodes[0].chan_monitor.add_update_monitor(chan_0_monitor.get_funding_txo().unwrap(), chan_0_monitor).is_ok());
        nodes[0].node = &nodes_0_deserialized;
-       nodes[0].block_notifier.register_ref_listener(nodes[0].node);
+       nodes[0].block_notifier.register_listener(nodes[0].node);
        assert_eq!(nodes[0].node.list_channels().len(), 1);
        check_added_monitors!(nodes[0], 1);
 
@@ -6323,8 +6323,8 @@ fn test_data_loss_protect() {
        nodes[0].chain_monitor = chain_monitor;
 
        nodes[0].block_notifier = BlockNotifier::new(nodes[0].chain_monitor.clone());
-       nodes[0].block_notifier.register_ref_listener(&nodes[0].chan_monitor.simple_monitor);
-       nodes[0].block_notifier.register_ref_listener(nodes[0].node);
+       nodes[0].block_notifier.register_listener(&nodes[0].chan_monitor.simple_monitor);
+       nodes[0].block_notifier.register_listener(nodes[0].node);
 
        check_added_monitors!(nodes[0], 1);