From: Matt Corallo Date: Tue, 21 Jan 2020 21:28:41 +0000 (-0500) Subject: Type absurdism X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=6ca8c485e07139d41d68f60a1e2a7baf3252a558;p=rust-lightning Type absurdism --- diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index faa831a19..47e179189 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -43,7 +43,7 @@ pub struct Connection { id: u64, } impl Connection { - fn schedule_read(peer_manager: Arc>>, us: Arc>, reader: futures::stream::SplitStream>) { + fn schedule_read(peer_manager: Arc, Arc>>, us: Arc>, reader: futures::stream::SplitStream>) { let us_ref = us.clone(); let us_close_ref = us.clone(); let peer_manager_ref = peer_manager.clone(); @@ -111,7 +111,7 @@ impl Connection { /// /// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on /// ChannelManager and ChannelMonitor objects. - pub fn setup_inbound(peer_manager: Arc>>, event_notify: mpsc::Sender<()>, stream: TcpStream) { + pub fn setup_inbound(peer_manager: Arc, Arc>>, event_notify: mpsc::Sender<()>, stream: TcpStream) { let (reader, us) = Self::new(event_notify, stream); if let Ok(_) = peer_manager.new_inbound_connection(SocketDescriptor::new(us.clone(), peer_manager.clone())) { @@ -125,7 +125,7 @@ impl Connection { /// /// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on /// ChannelManager and ChannelMonitor objects. - pub fn setup_outbound(peer_manager: Arc>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) { + pub fn setup_outbound(peer_manager: Arc, Arc>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) { let (reader, us) = Self::new(event_notify, stream); if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone(), peer_manager.clone())) { @@ -143,7 +143,7 @@ impl Connection { /// /// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on /// ChannelManager and ChannelMonitor objects. - pub fn connect_outbound(peer_manager: Arc>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) { + pub fn connect_outbound(peer_manager: Arc, Arc>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) { let connect_timeout = Delay::new(Instant::now() + Duration::from_secs(10)).then(|_| { future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached")) }); @@ -158,19 +158,18 @@ impl Connection { } } -#[derive(Clone)] -pub struct SocketDescriptor { +pub struct SocketDescriptor { conn: Arc>, id: u64, - peer_manager: Arc>>, + peer_manager: Arc, Arc>>, } -impl SocketDescriptor { - fn new(conn: Arc>, peer_manager: Arc>>) -> Self { +impl SocketDescriptor { + fn new(conn: Arc>, peer_manager: Arc, Arc>>) -> Self { let id = conn.lock().unwrap().id; Self { conn, id, peer_manager } } } -impl peer_handler::SocketDescriptor for SocketDescriptor { +impl peer_handler::SocketDescriptor for SocketDescriptor { fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize { macro_rules! schedule_read { ($us_ref: expr) => { @@ -257,13 +256,22 @@ impl peer_handler::SocketDescriptor for SocketDescriptor { us.read_paused = true; } } -impl Eq for SocketDescriptor {} -impl PartialEq for SocketDescriptor { +impl Clone for SocketDescriptor { + fn clone(&self) -> Self { + Self { + conn: Arc::clone(&self.conn), + id: self.id, + peer_manager: Arc::clone(&self.peer_manager), + } + } +} +impl Eq for SocketDescriptor {} +impl PartialEq for SocketDescriptor { fn eq(&self, o: &Self) -> bool { self.id == o.id } } -impl Hash for SocketDescriptor { +impl Hash for SocketDescriptor { fn hash(&self, state: &mut H) { self.id.hash(state); } diff --git a/lightning/src/chain/chaininterface.rs b/lightning/src/chain/chaininterface.rs index 45a9ee7f1..3216e2b63 100644 --- a/lightning/src/chain/chaininterface.rs +++ b/lightning/src/chain/chaininterface.rs @@ -207,33 +207,26 @@ impl ChainWatchedUtil { /// Utility for notifying listeners about new blocks, and handling block rescans if new watch /// data is registered. -pub struct BlockNotifier<'a> { - ref_listeners: Mutex>, - arc_listeners: Mutex>>, +pub struct BlockNotifier<'a, CL: std::ops::Deref + 'a> { + listeners: Mutex>, chain_monitor: Arc, + phantom: ::std::marker::PhantomData<&'a ()>, } -impl<'a> BlockNotifier<'a> { +impl<'a, CL: ::std::ops::Deref + 'a> BlockNotifier<'a, CL> { /// Constructs a new BlockNotifier without any listeners. - pub fn new(chain_monitor: Arc) -> BlockNotifier<'a> { + pub fn new(chain_monitor: Arc) -> BlockNotifier<'a, CL> { BlockNotifier { - ref_listeners: Mutex::new(Vec::new()), - arc_listeners: Mutex::new(Vec::new()), + listeners: Mutex::new(Vec::new()), chain_monitor, + phantom: ::std::marker::PhantomData, } } /// Register the given ref listener to receive events. // TODO: unregister - pub fn register_ref_listener(&self, listener: &'a ChainListener) { - let mut vec = self.ref_listeners.lock().unwrap(); - vec.push(listener); - } - - /// Register the given Arc listener to receive events. - // TODO: unregister - pub fn register_arc_listener(&self, listener: Arc) { - let mut vec = self.arc_listeners.lock().unwrap(); + pub fn register_listener(&self, listener: CL) { + let mut vec = self.listeners.lock().unwrap(); vec.push(listener); } @@ -258,14 +251,9 @@ impl<'a> BlockNotifier<'a> { pub fn block_connected_checked(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> bool { let last_seen = self.chain_monitor.reentered(); - let ref_listeners = self.ref_listeners.lock().unwrap().clone(); - for ref_listener in ref_listeners.iter() { - ref_listener.block_connected(header, height, txn_matched, indexes_of_txn_matched); - } - - let arc_listeners = self.arc_listeners.lock().unwrap().clone(); - for arc_listener in arc_listeners.iter() { - arc_listener.block_connected(header, height, txn_matched, indexes_of_txn_matched); + let listeners = self.listeners.lock().unwrap(); + for listener in listeners.iter() { + listener.block_connected(header, height, txn_matched, indexes_of_txn_matched); } return last_seen != self.chain_monitor.reentered(); } @@ -273,13 +261,8 @@ impl<'a> BlockNotifier<'a> { /// Notify listeners that a block was disconnected. pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) { - let ref_listeners = self.ref_listeners.lock().unwrap().clone(); - for listener in ref_listeners.iter() { - listener.block_disconnected(&header, disconnected_height); - } - - let arc_listeners = self.arc_listeners.lock().unwrap().clone(); - for listener in arc_listeners.iter() { + let listeners = self.listeners.lock().unwrap(); + for listener in listeners.iter() { listener.block_disconnected(&header, disconnected_height); } } diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index 4a960cca5..751dde246 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -37,7 +37,7 @@ use std::sync::{Arc, Mutex}; use std::mem; pub const CHAN_CONFIRM_DEPTH: u32 = 100; -pub fn confirm_transaction<'a, 'b>(notifier: &'a chaininterface::BlockNotifier<'b>, chain: &chaininterface::ChainWatchInterfaceUtil, tx: &Transaction, chan_id: u32) { +pub fn confirm_transaction<'a, 'b: 'a>(notifier: &'a chaininterface::BlockNotifier<'b, &'b chaininterface::ChainListener>, chain: &chaininterface::ChainWatchInterfaceUtil, tx: &Transaction, chan_id: u32) { assert!(chain.does_match_tx(tx)); let mut header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 }; notifier.block_connected_checked(&header, 1, &[tx; 1], &[chan_id; 1]); @@ -47,7 +47,7 @@ pub fn confirm_transaction<'a, 'b>(notifier: &'a chaininterface::BlockNotifier<' } } -pub fn connect_blocks<'a, 'b>(notifier: &'a chaininterface::BlockNotifier<'b>, depth: u32, height: u32, parent: bool, prev_blockhash: Sha256d) -> Sha256d { +pub fn connect_blocks<'a, 'b>(notifier: &'a chaininterface::BlockNotifier<'b, &'b chaininterface::ChainListener>, depth: u32, height: u32, parent: bool, prev_blockhash: Sha256d) -> Sha256d { let mut header = BlockHeader { version: 0x2000000, prev_blockhash: if parent { prev_blockhash } else { Default::default() }, merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 }; notifier.block_connected_checked(&header, height + 1, &Vec::new(), &Vec::new()); for i in 2..depth + 1 { @@ -68,7 +68,7 @@ pub struct NodeCfg { } pub struct Node<'a, 'b: 'a> { - pub block_notifier: chaininterface::BlockNotifier<'b>, + pub block_notifier: chaininterface::BlockNotifier<'b, &'b chaininterface::ChainListener>, pub chain_monitor: Arc, pub tx_broadcaster: Arc, pub chan_monitor: &'b test_utils::TestChannelMonitor, @@ -889,8 +889,8 @@ pub fn create_network<'a, 'b>(node_count: usize, cfgs: &'a Vec, chan_mg for i in 0..node_count { let block_notifier = chaininterface::BlockNotifier::new(cfgs[i].chain_monitor.clone()); - block_notifier.register_ref_listener(&cfgs[i].chan_monitor.simple_monitor); - block_notifier.register_ref_listener(&chan_mgrs[i]); + block_notifier.register_listener(&cfgs[i].chan_monitor.simple_monitor as &chaininterface::ChainListener); + block_notifier.register_listener(&chan_mgrs[i] as &chaininterface::ChainListener); let router = Router::new(PublicKey::from_secret_key(&secp_ctx, &cfgs[i].keys_manager.get_node_secret()), cfgs[i].chain_monitor.clone(), cfgs[i].logger.clone() as Arc); nodes.push(Node{ chain_monitor: cfgs[i].chain_monitor.clone(), block_notifier, tx_broadcaster: cfgs[i].tx_broadcaster.clone(), chan_monitor: &cfgs[i].chan_monitor, diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 934241558..b81377135 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -3522,7 +3522,7 @@ fn test_no_txn_manager_serialize_deserialize() { assert!(nodes[0].chan_monitor.add_update_monitor(chan_0_monitor.get_funding_txo().unwrap(), chan_0_monitor).is_ok()); nodes[0].node = &nodes_0_deserialized; - nodes[0].block_notifier.register_ref_listener(nodes[0].node); + nodes[0].block_notifier.register_listener(nodes[0].node); assert_eq!(nodes[0].node.list_channels().len(), 1); check_added_monitors!(nodes[0], 1); @@ -6323,8 +6323,8 @@ fn test_data_loss_protect() { nodes[0].chain_monitor = chain_monitor; nodes[0].block_notifier = BlockNotifier::new(nodes[0].chain_monitor.clone()); - nodes[0].block_notifier.register_ref_listener(&nodes[0].chan_monitor.simple_monitor); - nodes[0].block_notifier.register_ref_listener(nodes[0].node); + nodes[0].block_notifier.register_listener(&nodes[0].chan_monitor.simple_monitor); + nodes[0].block_notifier.register_listener(nodes[0].node); check_added_monitors!(nodes[0], 1);