1 // This file is Copyright its original authors, visible in version control
4 // This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5 // or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7 // You may not use this file except in accordance with one or both of these
11 * This file implements what you need to get networking up with LDK using
12 * standard POSIX APIs. Its not particularly effecient (in fact quite the
13 * opposite) but should be more than sufficient for most use-cases.
18 #include <lightning.h>
20 #include <sys/socket.h>
22 #include <netinet/in.h>
23 #include <netinet/tcp.h>
35 #define MSG_NOSIGNAL 0
39 #define MAX_CONNS 1024
40 struct SocketHandler {
41 struct LDKPeerManager ldk_peer_manager;
42 pthread_t socket_thread;
45 // pollfds ands sockcount may only be written to with a lock on
47 // sockcount mut always be the number of sockets in pollfds
48 // Items in pollfds may only be removed in the socket_thread, other threads
49 // may only append new file descriptors at the end (via register_socket).
50 pthread_mutex_t sockets_mutex;
51 struct pollfd pollfds[MAX_CONNS];
55 int register_socket(struct SocketHandler* handler, int fd, int is_listen_sock) {
56 int fd_flags = fcntl(fd, F_GETFL, 0);
57 if (fd_flags < 0) return -1;
58 if (fcntl(fd, F_SETFL, fd_flags | O_NONBLOCK) == -1) return -1;
60 if (!is_listen_sock) {
62 if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (void*)&opt, sizeof(opt))) return -1;
64 if (setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt))) return -1;
68 int lockres = pthread_mutex_lock(&handler->sockets_mutex);
71 handler->pollfds[handler->sockcount].fd = fd;
72 handler->pollfds[handler->sockcount].events = POLLIN;
73 handler->pollfds[handler->sockcount].revents = 0;
75 assert(handler->sockcount <= MAX_CONNS);
77 lockres = pthread_mutex_unlock(&handler->sockets_mutex);
81 write(handler->pipefds[1], &dummy, 1);
87 struct SocketHandler *handler;
91 static uintptr_t sock_send_data(void* desc, struct LDKu8slice data, bool resume_read) {
92 struct Descriptor *descriptor = (struct Descriptor*)desc;
93 ssize_t write_count = send(descriptor->fd, data.data, data.datalen, MSG_NOSIGNAL);
94 bool pause_read = false;
95 if (write_count <= 0) {
96 if (errno == EAGAIN || errno == EWOULDBLOCK) {
100 shutdown(descriptor->fd, SHUT_RDWR);
102 write(descriptor->handler->pipefds[1], &dummy, 1);
105 } else if (write_count < data.datalen) {
108 if (pause_read || resume_read) {
109 int lockres = pthread_mutex_lock(&descriptor->handler->sockets_mutex);
110 assert(lockres == 0);
111 for (int i = 0; i < descriptor->handler->sockcount; i++) {
112 if (descriptor->handler->pollfds[i].fd == descriptor->fd) {
114 descriptor->handler->pollfds[i].events = POLLOUT;
116 descriptor->handler->pollfds[i].events = POLLIN;
121 lockres = pthread_mutex_unlock(&descriptor->handler->sockets_mutex);
122 assert(lockres == 0);
124 write(descriptor->handler->pipefds[1], &dummy, 1);
129 static void sock_disconnect(void* desc) {
130 struct Descriptor *descriptor = (struct Descriptor*)desc;
131 shutdown(descriptor->fd, SHUT_RDWR);
133 write(descriptor->handler->pipefds[1], &dummy, 1);
135 static bool sock_eq(const void* desc, const struct LDKSocketDescriptor *other_arg) {
136 const struct Descriptor *descriptor = (const struct Descriptor*)desc;
137 const struct Descriptor *other_descriptor = (const struct Descriptor*)other_arg->this_arg;
138 return descriptor->fd == other_descriptor->fd;
140 static uint64_t sock_hash(const void* desc) {
141 const struct Descriptor *descriptor = (const struct Descriptor*)desc;
142 return (uint64_t)descriptor->fd;
144 static void sock_cloned(LDKSocketDescriptor *NONNULL_PTR ldk_desc) {
145 const struct Descriptor *descriptor = (const struct Descriptor*)ldk_desc->this_arg;
146 struct Descriptor *new_desc = malloc(sizeof(struct Descriptor));
147 new_desc->handler = descriptor->handler;
148 new_desc->fd = descriptor->fd;
149 ldk_desc->this_arg = (void*) new_desc;
151 static void sock_free(void* desc) {
155 static inline LDKSocketDescriptor get_descriptor(struct SocketHandler *handler, int fd) {
156 struct Descriptor *desc = malloc(sizeof(struct Descriptor));
157 desc->handler = handler;
159 LDKSocketDescriptor ret = {
160 .this_arg = (void*)desc,
161 .send_data = sock_send_data,
162 .disconnect_socket = sock_disconnect,
165 .cloned = sock_cloned,
171 static LDKCOption_NetAddressZ get_remote_network_address(int fd) {
172 struct sockaddr_storage sockaddr;
173 socklen_t remote_addr_len = sizeof(sockaddr);
174 if (getpeername(fd, (struct sockaddr*)&sockaddr, &remote_addr_len) == -1) {
175 return COption_NetAddressZ_none();
178 switch (sockaddr.ss_family) {
180 const struct sockaddr_in *remote_addr = (struct sockaddr_in*)&sockaddr;
182 memcpy(&addr, &remote_addr->sin_addr.s_addr, 4);
183 return COption_NetAddressZ_some(NetAddress_ipv4(addr, ntohs(remote_addr->sin_port)));
186 const struct sockaddr_in6 *remote_addr = (struct sockaddr_in6*)&sockaddr;
187 LDKSixteenBytes addr;
188 memcpy(&addr, &remote_addr->sin6_addr.s6_addr, 16);
189 return COption_NetAddressZ_some(NetAddress_ipv6(addr, ntohs(remote_addr->sin6_port)));
192 return COption_NetAddressZ_none();
196 static void *sock_thread_fn(void* arg) {
197 struct SocketHandler *handler = (struct SocketHandler*) arg;
200 struct pollfd pollfds[MAX_CONNS + 1];
203 int close_socks[MAX_CONNS];
204 int close_socks_count = 0;
206 uint8_t readbuf[8192];
208 while (!handler->should_exit) {
210 lockres = pthread_mutex_lock(&handler->sockets_mutex);
211 assert(lockres == 0);
212 memcpy(pollfds, handler->pollfds, sizeof(struct pollfd) * handler->sockcount);
213 fd_count = handler->sockcount;
214 lockres = pthread_mutex_unlock(&handler->sockets_mutex);
215 assert(lockres == 0);
217 pollfds[fd_count].fd = handler->pipefds[0];
218 pollfds[fd_count].events = POLLIN;
221 int pollres = poll(pollfds, fd_count, 10000);
222 assert(pollres != -1);
223 close_socks_count = 0;
225 read(pollfds[fd_count-1].fd, readbuf, sizeof(readbuf)); // Empty out the pipe
228 for (int i = 0; i < fd_count - 1; i++) {
229 if (pollfds[i].revents) {
230 LDKSocketDescriptor descriptor = get_descriptor(handler, pollfds[i].fd);
231 if (pollfds[i].revents & POLLIN) {
232 int readlen = read(pollfds[i].fd, readbuf, sizeof(readbuf));
234 if (errno == EAGAIN || errno == EWOULDBLOCK) {
236 } else if (errno == EINVAL || errno == ENOTCONN) {
237 // Assume we're a listening socket!
238 int newfd = accept(pollfds[i].fd, NULL, NULL);
240 // Received a new connection, register it!
241 LDKSocketDescriptor new_descriptor = get_descriptor(handler, newfd);
242 LDKCResult_NonePeerHandleErrorZ con_res = PeerManager_new_inbound_connection(&handler->ldk_peer_manager, new_descriptor, get_remote_network_address(newfd));
243 if (con_res.result_ok) {
244 if (register_socket(handler, newfd, 0))
245 shutdown(newfd, SHUT_RDWR);
248 CResult_NonePeerHandleErrorZ_free(con_res);
250 // Maybe it wasn't a listening socket, disconnect!
251 close_socks[close_socks_count++] = i;
254 close_socks[close_socks_count++] = i;
256 } else if (readlen == 0) {
258 close_socks[close_socks_count++] = i;
264 LDKCResult_boolPeerHandleErrorZ res = PeerManager_read_event(&handler->ldk_peer_manager, &descriptor, data);
266 if (*res.contents.result) {
267 lockres = pthread_mutex_lock(&handler->sockets_mutex);
268 assert(lockres == 0);
269 assert(handler->pollfds[i].fd == pollfds[i].fd); // Only we change fd order!
270 handler->pollfds[i].events = POLLOUT;
271 lockres = pthread_mutex_unlock(&handler->sockets_mutex);
272 assert(lockres == 0);
275 close_socks[close_socks_count++] = i;
277 CResult_boolPeerHandleErrorZ_free(res);
280 if (pollfds[i].revents & POLLOUT) {
281 LDKCResult_NonePeerHandleErrorZ res = PeerManager_write_buffer_space_avail(&handler->ldk_peer_manager, &descriptor);
282 if (!res.result_ok) {
283 close_socks[close_socks_count++] = i;
285 CResult_NonePeerHandleErrorZ_free(res);
287 SocketDescriptor_free(descriptor);
292 // We only do the actual socket disconnect handling in this thread,
293 // other threads may append to pollfds and call shutdown().
294 // Thus, in this thread, we first call socket_disconnected for each
295 // socket we're gonna remove, then we walk the sockets and close() each
296 // which should be disconnecting, shifting the remaining sockets down
298 for (int i = 0; i < close_socks_count; i++) {
299 LDKSocketDescriptor descriptor = get_descriptor(handler, handler->pollfds[close_socks[i]].fd);
300 PeerManager_socket_disconnected(&handler->ldk_peer_manager, &descriptor);
301 SocketDescriptor_free(descriptor);
304 lockres = pthread_mutex_lock(&handler->sockets_mutex);
305 assert(lockres == 0);
307 for (int i = 0; close_socks_count != 0 && i < handler->sockcount; i++) {
308 if (close_idx < close_socks_count && close_socks[close_idx] == i) {
309 close(handler->pollfds[i].fd);
312 handler->pollfds[i-close_idx] = handler->pollfds[i];
315 assert(close_idx == close_socks_count);
316 handler->sockcount -= close_socks_count;
317 lockres = pthread_mutex_unlock(&handler->sockets_mutex);
318 assert(lockres == 0);
320 PeerManager_process_events(&handler->ldk_peer_manager);
323 lockres = pthread_mutex_lock(&handler->sockets_mutex);
324 assert(lockres == 0);
325 for (int i = 0; i < handler->sockcount; i++) {
326 LDKSocketDescriptor descriptor = get_descriptor(handler, handler->pollfds[i].fd);
327 PeerManager_socket_disconnected(&handler->ldk_peer_manager, &descriptor);
328 SocketDescriptor_free(descriptor);
331 for (int i = 0; i < handler->sockcount; i++) {
332 close(handler->pollfds[i].fd);
334 close(handler->pipefds[0]);
335 close(handler->pipefds[1]);
336 handler->sockcount = 0;
337 lockres = pthread_mutex_unlock(&handler->sockets_mutex);
338 assert(lockres == 0);
344 void* init_socket_handling(const struct LDKPeerManager *NONNULL_PTR ldk_peer_manager) {
345 struct SocketHandler *handler = NULL;
347 handler = (struct SocketHandler*) calloc(1, sizeof(struct SocketHandler));
348 if (!handler) goto err;
349 handler->pipefds[0] = -1;
350 handler->pipefds[1] = -1;
352 handler->ldk_peer_manager = *ldk_peer_manager;
353 handler->should_exit = false;
355 if (pipe(handler->pipefds) != 0) goto err;
357 int fd_flags = fcntl(handler->pipefds[0], F_GETFL, 0);
358 if (fd_flags < 0) goto err;
359 if (fcntl(handler->pipefds[0], F_SETFL, fd_flags | O_NONBLOCK) == -1) goto err;
361 if (pthread_mutex_init(&handler->sockets_mutex, NULL) != 0) goto err;
362 if (pthread_create(&handler->socket_thread, NULL, sock_thread_fn, handler) != 0) {
363 pthread_mutex_destroy(&handler->sockets_mutex);
371 if (handler->pipefds[0] != -1) close(handler->pipefds[0]);
372 if (handler->pipefds[1] != -1) close(handler->pipefds[1]);
378 void interrupt_socket_handling(void* arg) {
379 struct SocketHandler *handler = (struct SocketHandler*) arg;
380 handler->should_exit = true;
382 write(handler->pipefds[1], &dummy, 1);
384 int ret = pthread_join(handler->socket_thread, &retval);
389 int socket_connect(void* arg, LDKPublicKey pubkey, struct sockaddr *addr, size_t addrlen) {
390 struct SocketHandler *handler = (struct SocketHandler*) arg;
392 int fd = socket(addr->sa_family, SOCK_STREAM, 0);
393 if (fd < 0) return -1;
395 struct timeval timeout;
398 if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout))) return -2;
399 if (connect(fd, addr, addrlen)) return -3;
400 timeout.tv_sec = 120;
401 if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout))) return -3;
403 if (register_socket(handler, fd, 0)) return -4;
405 LDKSocketDescriptor descriptor = get_descriptor(handler, fd);
406 LDKCResult_CVec_u8ZPeerHandleErrorZ con_res = PeerManager_new_outbound_connection(&handler->ldk_peer_manager, pubkey, descriptor, get_remote_network_address(fd));
407 if (con_res.result_ok) {
408 ssize_t write_count = send(fd, con_res.contents.result->data, con_res.contents.result->datalen, MSG_NOSIGNAL);
409 if (write_count != con_res.contents.result->datalen)
410 shutdown(fd, SHUT_RDWR);
412 shutdown(fd, SHUT_RDWR);
414 CResult_CVec_u8ZPeerHandleErrorZ_free(con_res);
419 int socket_bind(void* arg, struct sockaddr *addr, socklen_t addrlen) {
420 struct SocketHandler *handler = (struct SocketHandler*) arg;
421 int fd = socket(addr->sa_family, SOCK_STREAM, 0);
422 if (fd < 0) return -1;
425 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuseon, sizeof(reuseon))) return -2;
427 if (bind(fd, addr, addrlen)) return -3;
428 if (listen(fd, 32)) return -4;
430 if (register_socket(handler, fd, 1)) return -5;