diff --git a/.github/scripts/setup.sh b/.github/scripts/setup.sh
index 2409cdf0d543..a719c52086c4 100755
--- a/.github/scripts/setup.sh
+++ b/.github/scripts/setup.sh
@@ -52,9 +52,11 @@ sudo apt-get -qq install --no-install-recommends --allow-unauthenticated -yy \
 	sudo \
 	tcl \
 	tclsh \
+	tshark \
 	unzip \
 	valgrind \
 	wget \
+	wireshark-common \
 	xsltproc \
 	systemtap-sdt-dev \
 	zlib1g-dev
@@ -97,3 +99,14 @@ export PROTOC=/usr/local/bin/protoc
 export PATH=$PATH:/usr/local/bin
 env
 ls -lha /usr/local/bin
+
+# wireshark-common normally does this, but GH runners are special, so we
+# do it explicitly
+sudo groupadd -f wireshark
+sudo chgrp wireshark /usr/bin/dumpcap
+sudo chmod 750 /usr/bin/dumpcap
+sudo setcap cap_net_raw,cap_net_admin=eip /usr/bin/dumpcap
+
+# Add ourselves to the wireshark group (still need "sg wireshark..." for it to take effect)
+sudo usermod -aG wireshark "$(id -nu)"
+
diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 7dda61720455..b9a93a23e727 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -323,7 +323,7 @@ jobs:
        run: |
          env
          cat config.vars
-         uv run eatmydata pytest tests/test_downgrade.py -n ${PYTEST_PAR} ${PYTEST_OPTS}
+         sg wireshark "uv run eatmydata pytest tests/test_downgrade.py -n ${PYTEST_PAR} ${PYTEST_OPTS}"
      - name: Upload test results
        if: always()
        uses: actions/upload-artifact@v4
@@ -438,7 +438,7 @@ jobs:
        run: |
          env
          cat config.vars
-         VALGRIND=0 uv run eatmydata pytest tests/ -n ${PYTEST_PAR} ${PYTEST_OPTS}
+         VALGRIND=0 sg wireshark "uv run eatmydata pytest tests/ -n ${PYTEST_PAR} ${PYTEST_OPTS}"
      - name: Upload test results
        if: always()
        uses: actions/upload-artifact@v4
@@ -526,7 +526,7 @@ jobs:
          TEST_DEBUG: 1
          PYTEST_PAR: 2
        run: |
-         VALGRIND=1 uv run eatmydata pytest tests/ -n ${PYTEST_PAR} ${PYTEST_OPTS} ${{ matrix.PYTEST_OPTS }}
+         VALGRIND=1 sg wireshark "uv run eatmydata pytest tests/ -n ${PYTEST_PAR} ${PYTEST_OPTS} ${{ matrix.PYTEST_OPTS }}"
      - name: Upload test results
        if: always()
        uses: actions/upload-artifact@v4
@@ -617,7 +617,7 @@ jobs:
        env:
          PYTEST_PAR: 2
        run: |
-         uv run eatmydata pytest tests/ -n ${PYTEST_PAR} ${PYTEST_OPTS} ${{ matrix.PYTEST_OPTS }}
+         sg wireshark "uv run eatmydata pytest tests/ -n ${PYTEST_PAR} ${PYTEST_OPTS} ${{ matrix.PYTEST_OPTS }}"
      - name: Upload test results
        if: always()
        uses: actions/upload-artifact@v4
@@ -748,7 +748,7 @@ jobs:
        run: |
          env
          cat config.vars
-         VALGRIND=0 uv run eatmydata pytest tests/ -n ${PYTEST_PAR} ${PYTEST_OPTS}
+         VALGRIND=0 sg wireshark "uv run eatmydata pytest tests/ -n ${PYTEST_PAR} ${PYTEST_OPTS}"
      - name: Upload test results
        if: always()
        uses: actions/upload-artifact@v4
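(Note on the ci.yaml changes above: `usermod -aG wireshark` in setup.sh only edits the group database, so the already-running CI shell never picks the group up; that is why every pytest invocation is wrapped in `sg wireshark "..."`. A minimal sketch of how to check this from Python, assuming a Unix host; not part of the patch:)

```python
# Check whether the wireshark group is in this process's supplementary
# groups (i.e. whether dumpcap would be runnable without "sg wireshark").
import grp
import os

def in_wireshark_group() -> bool:
    try:
        gid = grp.getgrnam("wireshark").gr_gid
    except KeyError:
        return False  # no such group on this system
    return gid in os.getgroups()

if __name__ == "__main__":
    print("wireshark group active:", in_wireshark_group())
```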
diff --git a/connectd/connectd.c b/connectd/connectd.c
index 44f81b6c2518..a2e0e7d322a8 100644
--- a/connectd/connectd.c
+++ b/connectd/connectd.c
@@ -46,6 +46,7 @@
 #include
 #include
 #include
+#include <netinet/tcp.h>
 #include
 #include
 #include
@@ -127,8 +128,13 @@ static struct peer *new_peer(struct daemon *daemon,
 	peer->cs = *cs;
 	peer->subds = tal_arr(peer, struct subd *, 0);
 	peer->peer_in = NULL;
-	peer->sent_to_peer = NULL;
-	peer->urgent = false;
+	membuf_init(&peer->encrypted_peer_out,
+		    tal_arr(peer, u8, 0), 0,
+		    membuf_tal_resize);
+	peer->encrypted_peer_out_sent = 0;
+	peer->nonurgent_flush_timer = NULL;
+	peer->peer_out_urgent = 0;
+	peer->flushing_nonurgent = false;
 	peer->draining_state = NOT_DRAINING;
 	peer->peer_in_lastmsg = -1;
 	peer->peer_outq = msg_queue_new(peer, false);
@@ -505,6 +511,19 @@ static bool get_remote_address(struct io_conn *conn,
 	return true;
 }
 
+/* Nagle had a good idea of making networking more efficient by
+ * inserting a delay, creating a trap for every author of network code
+ * everywhere.
+ */
+static void set_tcp_no_delay(int fd)
+{
+	int val = 1;
+	if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) {
+		status_broken("setsockopt TCP_NODELAY=1 fd=%u: %s",
+			      fd, strerror(errno));
+	}
+}
+
 /*~ As so common in C, we need to bundle two args into a callback, so we
  * allocate a temporary structure to hold them: */
 struct conn_in {
@@ -526,6 +545,10 @@ static struct io_plan *conn_in(struct io_conn *conn,
 					     time_from_sec(daemon->timeout_secs),
 					     conn_timeout, conn);
 
+	/* Don't try to set TCP options on the local (websocket proxy) socket! */
+	if (!conn_in_arg->is_websocket)
+		set_tcp_no_delay(io_conn_fd(conn));
+
 	/*~ The crypto handshake differs depending on whether you received or
 	 * initiated the socket connection, so there are two entry points.
 	 * Note, again, the notleak() to avoid our simplistic leak detection
@@ -1173,6 +1196,7 @@ static void try_connect_one_addr(struct connecting *connect)
 		goto next;
 	}
 
+	set_tcp_no_delay(fd);
 	connect->connect_attempted = true;
 
 	/* This creates the new connection using our fd, with the initialization
diff --git a/connectd/connectd.h b/connectd/connectd.h
index 41a86cd10a77..1cce2c64d3aa 100644
--- a/connectd/connectd.h
+++ b/connectd/connectd.h
@@ -3,6 +3,7 @@
 #include "config.h"
 #include
 #include
+#include <ccan/membuf/membuf.h>
 #include
 #include
 #include
@@ -79,17 +80,20 @@ struct peer {
 	/* Connections to the subdaemons */
 	struct subd **subds;
 
-	/* When socket has Nagle overridden */
-	bool urgent;
-
 	/* Input buffer. */
 	u8 *peer_in;
 
 	/* Output buffer. */
 	struct msg_queue *peer_outq;
 
-	/* Peer sent buffer (for freeing after sending) */
-	const u8 *sent_to_peer;
+	/* Encrypted peer sending buffer, and how much of it we have sent */
+	MEMBUF(u8) encrypted_peer_out;
+	size_t encrypted_peer_out_sent;
+	/* Number of urgent messages currently in peer_outq */
+	size_t peer_out_urgent;
+	/* True once the non-urgent flush timer has fired */
+	bool flushing_nonurgent;
+	struct oneshot *nonurgent_flush_timer;
 
 	/* We stream from the gossip_store for them, when idle */
 	struct gossip_state gs;
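(The new `set_tcp_no_delay()` turns Nagle off once, for the lifetime of the connection, instead of toggling TCP_NODELAY per-message as connectd previously did — see the `set_urgent_flag()` removal in multiplex.c below. The same socket option is exposed in Python; a sketch for experimentation, not CLN code:)

```python
import socket

def set_tcp_no_delay(sock: socket.socket) -> None:
    # Disable Nagle's algorithm: each send() is pushed to the wire
    # immediately, rather than waiting to be coalesced with later writes.
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
```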
diff --git a/connectd/multiplex.c b/connectd/multiplex.c
index a59b7ad33476..85db8e5485f6 100644
--- a/connectd/multiplex.c
+++ b/connectd/multiplex.c
@@ -22,10 +22,12 @@
 #include
 #include
 #include
-#include <netinet/tcp.h>
 #include
 #include
 
+/* Size of each write(), to create uniform-size packets (the typical Ethernet TCP MSS). */
+#define UNIFORM_MESSAGE_SIZE 1460
+
 struct subd {
 	/* Owner: we are in peer->subds[] */
 	struct peer *peer;
@@ -53,6 +55,7 @@
 };
 
 /* FIXME: reorder! */
+static bool is_urgent(enum peer_wire type);
 static void destroy_connected_subd(struct subd *subd);
 static struct io_plan *write_to_peer(struct io_conn *peer_conn,
 				     struct peer *peer);
@@ -96,10 +99,18 @@ static void close_subd_timeout(struct subd *subd)
 	io_close(subd->conn);
 }
 
+static void msg_to_peer_outq(struct peer *peer, const u8 *msg TAKES)
+{
+	if (is_urgent(fromwire_peektype(msg)))
+		peer->peer_out_urgent++;
+
+	msg_enqueue(peer->peer_outq, msg);
+}
+
 void inject_peer_msg(struct peer *peer, const u8 *msg TAKES)
 {
 	status_peer_io(LOG_IO_OUT, &peer->id, msg);
-	msg_enqueue(peer->peer_outq, msg);
+	msg_to_peer_outq(peer, msg);
 }
 
 static void drain_peer(struct peer *peer)
@@ -291,44 +302,24 @@ void setup_peer_gossip_store(struct peer *peer,
 	return;
 }
 
-/* We're happy for the kernel to batch update and gossip messages, but a
- * commitment message, for example, should be instantly sent.  There's no
- * great way of doing this, unfortunately.
- *
- * Setting TCP_NODELAY on Linux flushes the socket, which really means
- * we'd want to toggle on then off it *after* sending.  But Linux has
- * TCP_CORK.  On FreeBSD, it seems (looking at source) not to, so
- * there we'd want to set it before the send, and reenable it
- * afterwards.  Even if this is wrong on other non-Linux platforms, it
- * only means one extra packet.
- */
-static void set_urgent_flag(struct peer *peer, bool urgent)
-{
-	int val;
-
-	if (urgent == peer->urgent)
-		return;
-
-	/* FIXME: We can't do this on websockets, but we could signal our
-	 * websocket proxy via some magic message to do so! */
-	if (peer->is_websocket != NORMAL_SOCKET)
-		return;
-
-	val = urgent;
-	if (setsockopt(io_conn_fd(peer->to_peer),
-		       IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0
-	    /* This actually happens in testing, where we blackhole the fd */
-	    && peer->daemon->dev_disconnect_fd == -1) {
-		status_broken("setsockopt TCP_NODELAY=1 fd=%u: %s",
-			      io_conn_fd(peer->to_peer),
-			      strerror(errno));
-	}
-	peer->urgent = urgent;
-}
-
 static bool is_urgent(enum peer_wire type)
 {
 	switch (type) {
+	/* We are happy to batch UPDATE_ADD messages: it's the
+	 * commitment_signed which matters. */
+	case WIRE_UPDATE_ADD_HTLC:
+	case WIRE_UPDATE_FULFILL_HTLC:
+	case WIRE_UPDATE_FAIL_HTLC:
+	case WIRE_UPDATE_FAIL_MALFORMED_HTLC:
+	case WIRE_UPDATE_FEE:
+	/* Gossip messages are also non-urgent */
+	case WIRE_CHANNEL_ANNOUNCEMENT:
+	case WIRE_NODE_ANNOUNCEMENT:
+	case WIRE_CHANNEL_UPDATE:
+		return false;
+
+	/* We don't delay for anything else, but we use a switch
+	 * statement to make you think about new cases! */
 	case WIRE_INIT:
 	case WIRE_ERROR:
 	case WIRE_WARNING:
@@ -352,17 +343,9 @@ static bool is_urgent(enum peer_wire type)
 	case WIRE_CLOSING_SIGNED:
 	case WIRE_CLOSING_COMPLETE:
 	case WIRE_CLOSING_SIG:
-	case WIRE_UPDATE_ADD_HTLC:
-	case WIRE_UPDATE_FULFILL_HTLC:
-	case WIRE_UPDATE_FAIL_HTLC:
-	case WIRE_UPDATE_FAIL_MALFORMED_HTLC:
-	case WIRE_UPDATE_FEE:
 	case WIRE_UPDATE_BLOCKHEIGHT:
 	case WIRE_CHANNEL_REESTABLISH:
 	case WIRE_ANNOUNCEMENT_SIGNATURES:
-	case WIRE_CHANNEL_ANNOUNCEMENT:
-	case WIRE_NODE_ANNOUNCEMENT:
-	case WIRE_CHANNEL_UPDATE:
 	case WIRE_QUERY_SHORT_CHANNEL_IDS:
 	case WIRE_REPLY_SHORT_CHANNEL_IDS_END:
 	case WIRE_QUERY_CHANNEL_RANGE:
@@ -375,9 +358,6 @@ static bool is_urgent(enum peer_wire type)
 	case WIRE_SPLICE:
 	case WIRE_SPLICE_ACK:
 	case WIRE_SPLICE_LOCKED:
-		return false;
-
-	/* These are time-sensitive, and so send without delay. */
 	case WIRE_PING:
 	case WIRE_PONG:
 	case WIRE_PROTOCOL_BATCH_ELEMENT:
@@ -387,15 +367,15 @@ static bool is_urgent(enum peer_wire type)
 		return true;
 	};
 
-	/* plugins can inject other messages; assume not urgent. */
-	return false;
+	/* plugins can inject other messages. */
+	return true;
 }
 
 /* Process and eat protocol_batch_element messages, encrypt each element message
  * and return the encrypted messages as one long byte array. */
-static u8 *process_batch_elements(struct peer *peer, const u8 *msg TAKES)
+static u8 *process_batch_elements(const tal_t *ctx, struct peer *peer, const u8 *msg TAKES)
 {
-	u8 *ret = tal_arr(peer, u8, 0);
+	u8 *ret = tal_arr(ctx, u8, 0);
 	size_t ret_size = 0;
 	const u8 *cursor = msg;
 	size_t plen = tal_count(msg);
@@ -454,33 +434,30 @@ static u8 *process_batch_elements(struct peer *peer, const u8 *msg TAKES)
 	return ret;
 }
 
-static struct io_plan *encrypt_and_send(struct peer *peer, const u8 *msg TAKES)
+/* --dev-disconnect can do magic things: if this returns non-NULL, the
+ * caller must free msg and return that plan instead. */
+static struct io_plan *msg_out_dev_disconnect(struct peer *peer, const u8 *msg)
 {
 	int type = fromwire_peektype(msg);
 
 	switch (dev_disconnect_out(&peer->id, type)) {
 	case DEV_DISCONNECT_OUT_BEFORE:
-		if (taken(msg))
-			tal_free(msg);
 		return io_close(peer->to_peer);
 	case DEV_DISCONNECT_OUT_AFTER:
 		/* Disallow reads from now on */
 		peer->dev_read_enabled = false;
 		free_all_subds(peer);
 		drain_peer(peer);
-		break;
+		return NULL;
 	case DEV_DISCONNECT_OUT_BLACKHOLE:
 		/* Disable both reads and writes from now on */
 		peer->dev_read_enabled = false;
 		peer->dev_writes_enabled = talz(peer, u32);
-		break;
+		return NULL;
 	case DEV_DISCONNECT_OUT_NORMAL:
-		break;
+		return NULL;
 	case DEV_DISCONNECT_OUT_DROP:
-		/* Drop this message and continue */
-		if (taken(msg))
-			tal_free(msg);
-		/* Tell them to read again, */
+		/* Tell them to read again. */
 		io_wake(&peer->subds);
 		return msg_queue_wait(peer->to_peer, peer->peer_outq,
 				      write_to_peer, peer);
@@ -488,26 +465,113 @@ static struct io_plan *encrypt_and_send(struct peer *peer, const u8 *msg TAKES)
 		peer->dev_read_enabled = false;
 		peer->dev_writes_enabled = tal(peer, u32);
 		*peer->dev_writes_enabled = 1;
-		break;
+		return NULL;
 	}
+	abort();
+}
+
+/* Do we have enough bytes without padding? */
+static bool have_full_encrypted_queue(const struct peer *peer)
+{
+	return membuf_num_elems(&peer->encrypted_peer_out) >= UNIFORM_MESSAGE_SIZE;
+}
 
-	set_urgent_flag(peer, is_urgent(type));
+/* Do we have nothing in queue? */
+static bool have_empty_encrypted_queue(const struct peer *peer)
+{
+	return membuf_num_elems(&peer->encrypted_peer_out) == 0;
+}
+
+/* (Continue) writing the encrypted_peer_out array */
+static struct io_plan *write_encrypted_to_peer(struct peer *peer)
+{
+	assert(membuf_num_elems(&peer->encrypted_peer_out) >= UNIFORM_MESSAGE_SIZE);
+	return io_write_partial(peer->to_peer,
+				membuf_elems(&peer->encrypted_peer_out),
+				UNIFORM_MESSAGE_SIZE,
+				&peer->encrypted_peer_out_sent,
+				write_to_peer, peer);
+}
+
+/* Returns false on failure: the caller should close the connection */
+static bool encrypt_append(struct peer *peer, const u8 *msg TAKES)
+{
+	int type = fromwire_peektype(msg);
+	const u8 *enc;
+	size_t enclen;
 
 	/* Special message type directing us to process batch items. */
 	if (type == WIRE_PROTOCOL_BATCH_ELEMENT) {
-		peer->sent_to_peer = process_batch_elements(peer, msg);
-		if (!peer->sent_to_peer)
-			return io_close(peer->to_peer);
-	}
-	else {
-		peer->sent_to_peer = cryptomsg_encrypt_msg(peer, &peer->cs, msg);
+		enc = process_batch_elements(tmpctx, peer, msg);
+		if (!enc)
+			return false;
+	} else {
+		enc = cryptomsg_encrypt_msg(tmpctx, &peer->cs, msg);
 	}
 
-	/* We free this and the encrypted version in next write_to_peer */
+	enclen = tal_bytelen(enc);
+	memcpy(membuf_add(&peer->encrypted_peer_out, enclen),
+	       enc,
+	       enclen);
+	return true;
+}
+
+static void pad_encrypted_queue(struct peer *peer)
+{
+	size_t needed, pingpad;
+	u8 *ping;
+
+	/* BOLT #8:
+	 *
+	 * ```
+	 * +-------------------------------
+	 * |2-byte encrypted message length|
+	 * +-------------------------------
+	 * |  16-byte MAC of the encrypted |
+	 * |        message length         |
+	 * +-------------------------------
+	 * |                               |
+	 * |                               |
+	 * |      encrypted Lightning      |
+	 * |            message            |
+	 * |                               |
+	 * +-------------------------------
+	 * |     16-byte MAC of the        |
+	 * |      Lightning message        |
+	 * +-------------------------------
+	 * ```
+	 *
+	 * The prefixed message length is encoded as a 2-byte big-endian integer,
+	 * for a total maximum packet length of `2 + 16 + 65535 + 16` = `65569` bytes.
+	 */
+	assert(membuf_num_elems(&peer->encrypted_peer_out) < UNIFORM_MESSAGE_SIZE);
+	needed = UNIFORM_MESSAGE_SIZE - membuf_num_elems(&peer->encrypted_peer_out);
 
-	return io_write(peer->to_peer,
-			peer->sent_to_peer,
-			tal_bytelen(peer->sent_to_peer),
-			write_to_peer, peer);
+	/* BOLT #1:
+	 * 1. type: 18 (`ping`)
+	 * 2. data:
+	 *    * [`u16`:`num_pong_bytes`]
+	 *    * [`u16`:`byteslen`]
+	 *    * [`byteslen*byte`:`ignored`]
+	 */
+	/* So the smallest possible ping is 6 bytes: the 2-byte type plus the
+	 * two u16 fields, with no ignored bytes. */
+	if (needed < 2 + 16 + 16 + 6)
+		needed += UNIFORM_MESSAGE_SIZE;
+
+	pingpad = needed - (2 + 16 + 16 + 6);
+	/* Note: we don't bother with --dev-disconnect here */
+	/* BOLT #1:
+	 * A node receiving a `ping` message:
+	 *   - if `num_pong_bytes` is less than 65532:
+	 *     - MUST respond by sending a `pong` message, with `byteslen` equal to `num_pong_bytes`.
+	 *   - otherwise (`num_pong_bytes` is **not** less than 65532):
+	 *     - MUST ignore the `ping`.
+	 */
+	ping = make_ping(NULL, 65535, pingpad);
+	if (!encrypt_append(peer, take(ping)))
+		abort();
+
+	assert(have_full_encrypted_queue(peer));
+	assert(membuf_num_elems(&peer->encrypted_peer_out) % UNIFORM_MESSAGE_SIZE == 0);
 }
 
 /* Kicks off write_to_peer() to look for more gossip to send from store */
@@ -574,7 +638,7 @@ static const u8 *maybe_gossip_msg(const tal_t *ctx, struct peer *peer)
 			peer->gs.bytes_this_second += tal_bytelen(msgs[i]);
 			status_peer_io(LOG_IO_OUT, &peer->id, msgs[i]);
 			if (i > 0)
-				msg_enqueue(peer->peer_outq, take(msgs[i]));
+				msg_to_peer_outq(peer, take(msgs[i]));
 		}
 	return msgs[0];
 }
@@ -1085,55 +1149,122 @@ static void maybe_update_channelid(struct subd *subd, const u8 *msg)
 	}
 }
 
-static struct io_plan *write_to_peer(struct io_conn *peer_conn,
-				     struct peer *peer)
+static const u8 *next_msg_for_peer(struct peer *peer)
 {
 	const u8 *msg;
 
-	assert(peer->to_peer == peer_conn);
-
-	/* Free last sent one (if any) */
-	peer->sent_to_peer = tal_free(peer->sent_to_peer);
-
 	/* Pop tail of send queue */
 	msg = msg_dequeue(peer->peer_outq);
-
-	/* Still nothing to send? */
-	if (!msg) {
-		/* Draining?  Shutdown socket (to avoid losing msgs) */
-		if (peer->draining_state == WRITING_TO_PEER) {
-			status_peer_debug(&peer->id, "draining done, shutting down");
-			io_wake(&peer->peer_in);
-			return io_sock_shutdown(peer_conn);
+	if (msg) {
+		if (is_urgent(fromwire_peektype(msg))) {
+			peer->peer_out_urgent--;
 		}
+	} else {
+		/* Draining?  Don't send gossip. */
+		if (peer->draining_state == WRITING_TO_PEER)
+			return NULL;
 
 		/* If they want us to send gossip, do so now. */
 		msg = maybe_gossip_msg(NULL, peer);
-		if (!msg) {
-			/* Tell them to read again, */
-			io_wake(&peer->subds);
-			io_wake(&peer->peer_in);
-
-			/* Wait for them to wake us */
-			return msg_queue_wait(peer_conn, peer->peer_outq,
-					      write_to_peer, peer);
-		}
+		if (!msg)
+			return NULL;
 	}
 
-	if (peer->draining_state == WRITING_TO_PEER)
-		status_peer_debug(&peer->id, "draining, but sending %s.",
-				  peer_wire_name(fromwire_peektype(msg)));
-
-	/* dev_disconnect can disable writes */
+	/* dev_disconnect can disable writes (discard everything) */
 	if (peer->dev_writes_enabled) {
 		if (*peer->dev_writes_enabled == 0) {
-			tal_free(msg);
-			/* Continue, to drain queue */
-			return write_to_peer(peer_conn, peer);
+			return tal_free(msg);
 		}
 		(*peer->dev_writes_enabled)--;
 	}
 
-	return encrypt_and_send(peer, take(msg));
+	return msg;
+}
+
+static void nonurgent_flush(struct peer *peer)
+{
+	peer->nonurgent_flush_timer = NULL;
+	peer->flushing_nonurgent = true;
+	io_wake(peer->peer_outq);
+}
+
+static struct io_plan *write_to_peer(struct io_conn *peer_conn,
+				     struct peer *peer)
+{
+	bool do_flush;
+
+	assert(peer->to_peer == peer_conn);
+
+	/* We always pad and send if we have an urgent msg, or our
+	 * non-urgent flush timer has gone off, or we're trying to close. */
+	do_flush = (peer->peer_out_urgent != 0
+		    || peer->flushing_nonurgent
+		    || peer->draining_state == WRITING_TO_PEER);
+
+	peer->flushing_nonurgent = false;
+
+	/* Consume whatever the last (partial) write actually sent. */
+	membuf_consume(&peer->encrypted_peer_out, peer->encrypted_peer_out_sent);
+	peer->encrypted_peer_out_sent = 0;
+
+	while (!have_full_encrypted_queue(peer)) {
+		const u8 *msg;
+		struct io_plan *dev_override;
+
+		/* Pop tail of send queue (or gossip) */
+		msg = next_msg_for_peer(peer);
+		if (!msg) {
+			/* Nothing in queue means nothing urgent in queue, surely! */
+			assert(peer->peer_out_urgent == 0);
+
+			/* Draining?  Shutdown socket (to avoid losing msgs) */
+			if (have_empty_encrypted_queue(peer)
+			    && peer->draining_state == WRITING_TO_PEER) {
+				status_peer_debug(&peer->id, "draining done, shutting down");
+				io_wake(&peer->peer_in);
+				return io_sock_shutdown(peer_conn);
+			}
+
+			/* If no urgent message, and not draining, we wait. */
+			if (!do_flush) {
+				/* Tell them to read again. */
+				io_wake(&peer->subds);
+				io_wake(&peer->peer_in);
+
+				/* Set up a timer if not already set */
+				if (!have_empty_encrypted_queue(peer)
+				    && !peer->nonurgent_flush_timer) {
+					/* Bias towards larger values, but don't be too predictable */
+					u32 max = pseudorand(1000);
+					u32 msec = 1000 - pseudorand(1 + max);
+					peer->nonurgent_flush_timer
+						= new_reltimer(&peer->daemon->timers,
+							       peer,
+							       time_from_msec(msec),
+							       nonurgent_flush, peer);
+				}
+
+				/* Wait for them to wake us */
+				return msg_queue_wait(peer_conn, peer->peer_outq, write_to_peer, peer);
+			}
+			/* OK, add padding. */
+			pad_encrypted_queue(peer);
+		} else {
+			if (peer->draining_state == WRITING_TO_PEER)
+				status_peer_debug(&peer->id, "draining, but sending %s.",
+						  peer_wire_name(fromwire_peektype(msg)));
+
+			dev_override = msg_out_dev_disconnect(peer, msg);
+			if (dev_override) {
+				tal_free(msg);
+				return dev_override;
+			}
+
+			if (!encrypt_append(peer, take(msg)))
+				return io_close(peer->to_peer);
+		}
+	}
+
+	peer->nonurgent_flush_timer = tal_free(peer->nonurgent_flush_timer);
+	return write_encrypted_to_peer(peer);
 }
 
 static struct io_plan *read_from_subd(struct io_conn *subd_conn,
@@ -1144,7 +1275,7 @@ static struct io_plan *read_from_subd_done(struct io_conn *subd_conn,
 	maybe_update_channelid(subd, subd->in);
 
 	/* Tell them to encrypt & write. */
-	msg_enqueue(subd->peer->peer_outq, take(subd->in));
+	msg_to_peer_outq(subd->peer, take(subd->in));
 	subd->in = NULL;
 
 	/* Wait for them to wake us */
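(The arithmetic in `pad_encrypted_queue()` can be checked exhaustively: for every possible residual queue size, the chosen ping padding brings the buffer to an exact multiple of UNIFORM_MESSAGE_SIZE. A quick standalone check using the same constants as the C code; the names are mine, not CLN's:)

```python
UNIFORM = 1460           # UNIFORM_MESSAGE_SIZE
OVERHEAD = 2 + 16 + 16   # BOLT 8 framing: length field + its MAC + message MAC
MIN_PING = 6             # type (2) + num_pong_bytes (2) + byteslen (2)

for queued in range(UNIFORM):  # pad_encrypted_queue() asserts queued < UNIFORM
    needed = UNIFORM - queued
    if needed < OVERHEAD + MIN_PING:
        # Too little room for even an empty ping: pad out to two writes.
        needed += UNIFORM
    pingpad = needed - (OVERHEAD + MIN_PING)
    assert 0 <= pingpad <= 65531  # fits the ping's u16 byteslen field
    assert (queued + OVERHEAD + MIN_PING + pingpad) % UNIFORM == 0
```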
diff --git a/tests/fixtures.py b/tests/fixtures.py
index d3a4a114e471..7ff7e26d4408 100644
--- a/tests/fixtures.py
+++ b/tests/fixtures.py
@@ -7,6 +7,10 @@
 import os
 import pytest
 import re
+import shutil
+import subprocess
+import tempfile
+import time
 
 
 @pytest.fixture
@@ -82,3 +86,108 @@ def compat():
 def is_compat(version):
     compat = CompatLevel()
     return compat(version)
+
+
+def dumpcap_usable():
+    def have_binary(name):
+        return shutil.which(name) is not None
+
+    if not have_binary("dumpcap") or not have_binary("tshark"):
+        return False
+
+    try:
+        with tempfile.TemporaryDirectory() as td:
+            pcap = Path(td) / "probe.pcap"
+
+            proc = subprocess.Popen(
+                [
+                    "dumpcap",
+                    "-i", "lo",
+                    "-w", str(pcap),
+                    "-f", "tcp",
+                ],
+                stdout=subprocess.DEVNULL,
+                stderr=subprocess.DEVNULL,
+            )
+
+            time.sleep(0.2)
+            proc.terminate()
+            proc.wait(timeout=1)
+
+            return pcap.exists() and pcap.stat().st_size > 0
+    except (PermissionError, subprocess.SubprocessError, OSError):
+        return False
+
+
+@pytest.fixture(scope="session")
+def have_pcap_tools():
+    if not dumpcap_usable():
+        pytest.skip("dumpcap/tshark not available or insufficient privileges")
+
+
+class TcpCapture:
+    def __init__(self, tmpdir):
+        self.tmpdir = Path(tmpdir)
+        self.pcap = self.tmpdir / "traffic.pcap"
+        self.proc = None
+        self.port = None
+
+    def start(self, port):
+        assert self.proc is None, "capture already started"
+        self.port = int(port)
+
+        self.proc = subprocess.Popen(
+            [
+                "dumpcap",
+                "-i", "lo",
+                "-w", str(self.pcap),
+                "-f", f"tcp port {self.port}",
+            ],
+            stdout=subprocess.DEVNULL,
+            stderr=subprocess.DEVNULL,
+        )
+
+        # Give dumpcap a moment to attach its capture filter.
+        time.sleep(0.2)
+
+    def stop(self):
+        if self.proc:
+            self.proc.terminate()
+            self.proc.wait(timeout=2)
+            self.proc = None
+
+    def assert_constant_payload(self):
+        tshark_cmd = [
+            "tshark",
+            "-r", str(self.pcap),
+            "-Y", "tcp.len > 0",
+            "-T", "fields",
+            "-e", "tcp.len",
+        ]
+
+        out = subprocess.check_output(tshark_cmd, text=True)
+        lengths = [int(x) for x in out.splitlines() if x.strip()]
+
+        assert lengths, f"No TCP payload packets captured on port {self.port}"
+
+        uniq = set(lengths)
+        assert len(uniq) == 1, (
+            f"Non-constant TCP payload sizes on port {self.port}: "
+            f"{sorted(uniq)}:\n"
+            + subprocess.check_output(["tshark", "-r", str(self.pcap)], text=True)
+        )
+
+
+@pytest.fixture
+def tcp_capture(have_pcap_tools, tmp_path):
+    # You will need permissions.  Most distributions have a group which has
+    # permission to use dumpcap:
+    #   $ ls -l /usr/bin/dumpcap
+    #   -rwxr-xr-- 1 root wireshark 229112 Apr 16  2024 /usr/bin/dumpcap
+    #   $ getcap /usr/bin/dumpcap
+    #   /usr/bin/dumpcap cap_net_admin,cap_net_raw=eip
+    # So you just need to be in the wireshark group.
+    cap = TcpCapture(tmp_path)
+    yield cap
+    cap.stop()
+    cap.assert_constant_payload()
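(For debugging outside the fixture, the same tshark invocation works standalone. A sketch, assuming tshark is on PATH and a capture file already exists:)

```python
import subprocess

def payload_sizes(pcap_path: str) -> set:
    out = subprocess.check_output(
        ["tshark", "-r", pcap_path,         # read from a saved capture
         "-Y", "tcp.len > 0",               # only segments carrying payload
         "-T", "fields", "-e", "tcp.len"],  # print just the payload length
        text=True)
    return {int(x) for x in out.splitlines() if x.strip()}

# With this patch applied, you'd expect a single value, e.g. {1460}.
```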
diff --git a/tests/test_connection.py b/tests/test_connection.py
index 20fa6259fbae..5dd5b768f5c8 100644
--- a/tests/test_connection.py
+++ b/tests/test_connection.py
@@ -4771,3 +4771,28 @@ def test_networkevents(node_factory, executor):
                             'type': 'connect_fail'},
                            {'created_index': 7,
                             'type': 'connect'}]}
+
+
+def test_constant_packet_size(node_factory, tcp_capture):
+    """
+    Test that TCP packets between nodes are a constant size.  Skipped unless
+    you can run `dumpcap` (which usually means being in the `wireshark` group).
+    """
+    l1, l2, l3, l4 = node_factory.get_nodes(4)
+
+    # The BOLT 8 encrypted handshake has some short packets, so connect first.
+    l1.connect(l2)
+    l2.connect(l3)
+
+    tcp_capture.start(l2.port)
+
+    # This gives us gossip send and recv, and channel establishment.
+    node_factory.join_nodes([l1, l2, l3, l4], wait_for_announce=True)
+
+    # Forwarding, incoming and outgoing payments.
+    for src, dest in (l1, l2), (l2, l3), (l1, l4):
+        inv = dest.rpc.invoice(10000000, "test_constant_packet_size", "test_constant_packet_size")
+        src.rpc.xpay(inv['bolt11'])
+
+    # Padding pings (num_pong_bytes >= 65532) don't elicit a pong.
+    assert not l2.daemon.is_in_log("connectd: Unexpected pong")
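(The final assertion leans on the BOLT #1 rule that a ping with num_pong_bytes >= 65532 must be ignored, so the padding pings never generate return traffic. A toy illustration of that rule; wire format per BOLT #1, but the helper names are mine, not CLN's:)

```python
import struct

PING, PONG = 18, 19

def make_ping(num_pong_bytes: int, ignored_len: int) -> bytes:
    # type (u16) | num_pong_bytes (u16) | byteslen (u16) | ignored bytes
    return struct.pack(">HHH", PING, num_pong_bytes, ignored_len) + bytes(ignored_len)

def pong_reply(ping: bytes):
    _, num_pong_bytes, _ = struct.unpack_from(">HHH", ping)
    if num_pong_bytes >= 65532:
        return None  # MUST ignore: this is what makes pings safe as padding
    return struct.pack(">HH", PONG, num_pong_bytes) + bytes(num_pong_bytes)

# connectd's padding pings use num_pong_bytes = 65535, so no pong comes back:
assert pong_reply(make_ping(65535, 12)) is None
```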