From 19e626a7ffdfbe8dd09e0f54092351e30c600ac9 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 12 Feb 2026 11:38:14 +1030 Subject: [PATCH 01/10] pytest: add fixture for checking packet sizes. This requires access to dumpcap. On Ubuntu, at least, this means you need to be in the "wireshark" group. We may also need: sudo ethtool -K lo gro off gso off tso off Signed-off-by: Rusty Russell --- tests/fixtures.py | 109 +++++++++++++++++++++++++++++++++++++++ tests/test_connection.py | 26 ++++++++++ 2 files changed, 135 insertions(+) diff --git a/tests/fixtures.py b/tests/fixtures.py index d3a4a114e471..7ff7e26d4408 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -7,6 +7,10 @@ import os import pytest import re +import shutil +import subprocess +import tempfile +import time @pytest.fixture @@ -82,3 +86,108 @@ def compat(): def is_compat(version): compat = CompatLevel() return compat(version) + + +def dumpcap_usable(): + def have_binary(name): + return shutil.which(name) is not None + + if not have_binary("dumpcap") or not have_binary("tshark"): + return False + + try: + with tempfile.TemporaryDirectory() as td: + pcap = Path(td) / "probe.pcap" + + proc = subprocess.Popen( + [ + "dumpcap", + "-i", "lo", + "-w", str(pcap), + "-f", "tcp", + ], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + time.sleep(0.2) + proc.terminate() + proc.wait(timeout=1) + + return pcap.exists() and pcap.stat().st_size > 0 + except (PermissionError, subprocess.SubprocessError, OSError): + return False + + +@pytest.fixture(scope="session") +def have_pcap_tools(): + if not dumpcap_usable(): + pytest.skip("dumpcap/tshark not available or insufficient privileges") + + +class TcpCapture: + def __init__(self, tmpdir): + self.tmpdir = Path(tmpdir) + self.pcap = self.tmpdir / "traffic.pcap" + self.proc = None + self.port = None + + def start(self, port): + assert self.proc is None, "capture already started" + self.port = int(port) + + self.proc = subprocess.Popen( + [ + "dumpcap", + "-i", "lo", + "-w", str(self.pcap), + "-f", f"tcp port {self.port}", + ], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + # allow filter attach + time.sleep(0.2) + + def stop(self): + if self.proc: + self.proc.terminate() + self.proc.wait(timeout=2) + self.proc = None + + def assert_constant_payload(self): + tshark_cmd = [ + "tshark", + "-r", str(self.pcap), + "-Y", "tcp.len > 0", + "-T", "fields", + "-e", "tcp.len", + ] + + out = subprocess.check_output(tshark_cmd, text=True) + lengths = [int(x) for x in out.splitlines() if x.strip()] + + assert lengths, f"No TCP payload packets captured on port {self.port}" + + uniq = set(lengths) + assert len(uniq) == 1, ( + f"Non-constant TCP payload sizes on port {self.port}: " + f"{sorted(uniq)}:" + + subprocess.check_output(["tshark", "-r", str(self.pcap)], text=True) + ) + + +@pytest.fixture +def tcp_capture(have_pcap_tools, tmp_path): + # You will need permissions. Most distributions have a group which has + # permissions to use dumpcap: + # $ ls -l /usr/bin/dumpcap + # -rwxr-xr-- 1 root wireshark 229112 Apr 16 2024 /usr/bin/dumpcap + # $ getcap /usr/bin/dumpcap + # /usr/bin/dumpcap cap_net_admin,cap_net_raw=eip + # So you just need to be in the wireshark group. + cap = TcpCapture(tmp_path) + yield cap + cap.stop() + cap.assert_constant_payload() diff --git a/tests/test_connection.py b/tests/test_connection.py index 20fa6259fbae..b93179f6ea21 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -4771,3 +4771,29 @@ def test_networkevents(node_factory, executor): 'type': 'connect_fail'}, {'created_index': 7, 'type': 'connect'}]} + + +@pytest.mark.xfail(strict=True) +def test_constant_packet_size(node_factory, tcp_capture): + """ + Test that TCP packets between nodes are constant size. This will be skipped unless + you can run `dumpcap` (usually means you have to be in the `wireshark` group). + """ + l1, l2, l3, l4 = node_factory.get_nodes(4) + + # Encrypted setup BOLT 8 has some short packets. + l1.connect(l2) + l2.connect(l3) + + tcp_capture.start(l2.port) + + # This gives us gossip send a recv, and channel establishment. + node_factory.join_nodes([l1, l2, l3, l4], wait_for_announce=True) + + # Forwarding, incoming and outgoing payments. + for src, dest in (l1, l2), (l2, l3), (l1, l4): + inv = dest.rpc.invoice(10000000, "test_constant_packet_size", "test_constant_packet_size") + src.rpc.xpay(inv['bolt11']) + + # Padding pings don't elicit a response + assert not l2.daemon.is_in_log("connectd: Unexpected pong") From b74ed6bbbdd2c4c7cacc04b65ddd49bd403332ea Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 12 Feb 2026 11:38:20 +1030 Subject: [PATCH 02/10] CI: run tests in the `wireshark` group so we can test packet sizes Signed-off-by: Rusty Russell --- .github/scripts/setup.sh | 13 +++++++++++++ .github/workflows/ci.yaml | 10 +++++----- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/.github/scripts/setup.sh b/.github/scripts/setup.sh index 2409cdf0d543..a719c52086c4 100755 --- a/.github/scripts/setup.sh +++ b/.github/scripts/setup.sh @@ -52,9 +52,11 @@ sudo apt-get -qq install --no-install-recommends --allow-unauthenticated -yy \ sudo \ tcl \ tclsh \ + tshark \ unzip \ valgrind \ wget \ + wireshark-common \ xsltproc \ systemtap-sdt-dev \ zlib1g-dev @@ -97,3 +99,14 @@ export PROTOC=/usr/local/bin/protoc export PATH=$PATH:/usr/local/bin env ls -lha /usr/local/bin + +# wireshark-common normally does this, but GH runners are special, so we +# do it explicitly +sudo groupadd -f wireshark +sudo chgrp wireshark /usr/bin/dumpcap +sudo chmod 750 /usr/bin/dumpcap +sudo setcap cap_net_raw,cap_net_admin=eip /usr/bin/dumpcap + +# Add ourselves to the wireshark group (still need "sg wireshark..." for it to take effect) +sudo usermod -aG wireshark "$(id -nu)" + diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 7dda61720455..b9a93a23e727 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -323,7 +323,7 @@ jobs: run: | env cat config.vars - uv run eatmydata pytest tests/test_downgrade.py -n ${PYTEST_PAR} ${PYTEST_OPTS} + sg wireshark "uv run eatmydata pytest tests/test_downgrade.py -n ${PYTEST_PAR} ${PYTEST_OPTS}" - name: Upload test results if: always() uses: actions/upload-artifact@v4 @@ -438,7 +438,7 @@ jobs: run: | env cat config.vars - VALGRIND=0 uv run eatmydata pytest tests/ -n ${PYTEST_PAR} ${PYTEST_OPTS} + VALGRIND=0 sg wireshark "uv run eatmydata pytest tests/ -n ${PYTEST_PAR} ${PYTEST_OPTS}" - name: Upload test results if: always() uses: actions/upload-artifact@v4 @@ -526,7 +526,7 @@ jobs: TEST_DEBUG: 1 PYTEST_PAR: 2 run: | - VALGRIND=1 uv run eatmydata pytest tests/ -n ${PYTEST_PAR} ${PYTEST_OPTS} ${{ matrix.PYTEST_OPTS }} + VALGRIND=1 sg wireshark "uv run eatmydata pytest tests/ -n ${PYTEST_PAR} ${PYTEST_OPTS} ${{ matrix.PYTEST_OPTS }}" - name: Upload test results if: always() uses: actions/upload-artifact@v4 @@ -617,7 +617,7 @@ jobs: env: PYTEST_PAR: 2 run: | - uv run eatmydata pytest tests/ -n ${PYTEST_PAR} ${PYTEST_OPTS} ${{ matrix.PYTEST_OPTS }} + sg wireshark "uv run eatmydata pytest tests/ -n ${PYTEST_PAR} ${PYTEST_OPTS} ${{ matrix.PYTEST_OPTS }}" - name: Upload test results if: always() uses: actions/upload-artifact@v4 @@ -748,7 +748,7 @@ jobs: run: | env cat config.vars - VALGRIND=0 uv run eatmydata pytest tests/ -n ${PYTEST_PAR} ${PYTEST_OPTS} + VALGRIND=0 sg wireshark "uv run eatmydata pytest tests/ -n ${PYTEST_PAR} ${PYTEST_OPTS}" - name: Upload test results if: always() uses: actions/upload-artifact@v4 From b9ff30972bd57c6bd8d248f06373a0552d5a86ae Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 12 Feb 2026 11:38:21 +1030 Subject: [PATCH 03/10] connectd: refactor outgoing loop. Give us a single "next message" function to call. This will be useful when we want to write more than one at a time. Signed-off-by: Rusty Russell --- connectd/multiplex.c | 58 ++++++++++++++++++++++++++------------------ 1 file changed, 34 insertions(+), 24 deletions(-) diff --git a/connectd/multiplex.c b/connectd/multiplex.c index a59b7ad33476..4135ad5f3be5 100644 --- a/connectd/multiplex.c +++ b/connectd/multiplex.c @@ -1085,6 +1085,33 @@ static void maybe_update_channelid(struct subd *subd, const u8 *msg) } } +static const u8 *next_msg_for_peer(struct peer *peer) +{ + const u8 *msg; + + msg = msg_dequeue(peer->peer_outq); + if (!msg) { + /* Draining? Don't send gossip. */ + if (peer->draining_state == WRITING_TO_PEER) + return NULL; + + /* If they want us to send gossip, do so now. */ + msg = maybe_gossip_msg(NULL, peer); + if (!msg) + return NULL; + } + + /* dev_disconnect can disable writes (discard everything) */ + if (peer->dev_writes_enabled) { + if (*peer->dev_writes_enabled == 0) { + return tal_free(msg); + } + (*peer->dev_writes_enabled)--; + } + + return msg; +} + static struct io_plan *write_to_peer(struct io_conn *peer_conn, struct peer *peer) { @@ -1094,10 +1121,8 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn, /* Free last sent one (if any) */ peer->sent_to_peer = tal_free(peer->sent_to_peer); - /* Pop tail of send queue */ - msg = msg_dequeue(peer->peer_outq); - - /* Still nothing to send? */ + /* Pop tail of send queue (or gossip) */ + msg = next_msg_for_peer(peer); if (!msg) { /* Draining? Shutdown socket (to avoid losing msgs) */ if (peer->draining_state == WRITING_TO_PEER) { @@ -1106,33 +1131,18 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn, return io_sock_shutdown(peer_conn); } - /* If they want us to send gossip, do so now. */ - msg = maybe_gossip_msg(NULL, peer); - if (!msg) { - /* Tell them to read again, */ - io_wake(&peer->subds); - io_wake(&peer->peer_in); + /* Tell them to read again, */ + io_wake(&peer->subds); + io_wake(&peer->peer_in); - /* Wait for them to wake us */ - return msg_queue_wait(peer_conn, peer->peer_outq, - write_to_peer, peer); - } + /* Wait for them to wake us */ + return msg_queue_wait(peer_conn, peer->peer_outq, write_to_peer, peer); } if (peer->draining_state == WRITING_TO_PEER) status_peer_debug(&peer->id, "draining, but sending %s.", peer_wire_name(fromwire_peektype(msg))); - /* dev_disconnect can disable writes */ - if (peer->dev_writes_enabled) { - if (*peer->dev_writes_enabled == 0) { - tal_free(msg); - /* Continue, to drain queue */ - return write_to_peer(peer_conn, peer); - } - (*peer->dev_writes_enabled)--; - } - return encrypt_and_send(peer, take(msg)); } From a401d11ef219e9810b447376e78dd8da81e88831 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 12 Feb 2026 11:38:23 +1030 Subject: [PATCH 04/10] connectd: refactor to break up "encrypt_and_send". Do all the special treatment of the message type first. Signed-off-by: Rusty Russell --- connectd/multiplex.c | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/connectd/multiplex.c b/connectd/multiplex.c index 4135ad5f3be5..40abb5082657 100644 --- a/connectd/multiplex.c +++ b/connectd/multiplex.c @@ -454,33 +454,30 @@ static u8 *process_batch_elements(struct peer *peer, const u8 *msg TAKES) return ret; } -static struct io_plan *encrypt_and_send(struct peer *peer, const u8 *msg TAKES) +/* --dev-disconnect can do magic things: if this returns non-NULL, + free msg and do that */ +static struct io_plan *msg_out_dev_disconnect(struct peer *peer, const u8 *msg) { int type = fromwire_peektype(msg); switch (dev_disconnect_out(&peer->id, type)) { case DEV_DISCONNECT_OUT_BEFORE: - if (taken(msg)) - tal_free(msg); return io_close(peer->to_peer); case DEV_DISCONNECT_OUT_AFTER: /* Disallow reads from now on */ peer->dev_read_enabled = false; free_all_subds(peer); drain_peer(peer); - break; + return NULL; case DEV_DISCONNECT_OUT_BLACKHOLE: /* Disable both reads and writes from now on */ peer->dev_read_enabled = false; peer->dev_writes_enabled = talz(peer, u32); - break; + return NULL; case DEV_DISCONNECT_OUT_NORMAL: - break; + return NULL; case DEV_DISCONNECT_OUT_DROP: - /* Drop this message and continue */ - if (taken(msg)) - tal_free(msg); - /* Tell them to read again, */ + /* Tell them to read again. */ io_wake(&peer->subds); return msg_queue_wait(peer->to_peer, peer->peer_outq, write_to_peer, peer); @@ -488,8 +485,14 @@ static struct io_plan *encrypt_and_send(struct peer *peer, const u8 *msg TAKES) peer->dev_read_enabled = false; peer->dev_writes_enabled = tal(peer, u32); *peer->dev_writes_enabled = 1; - break; + return NULL; } + abort(); +} + +static struct io_plan *encrypt_and_send(struct peer *peer, const u8 *msg TAKES) +{ + int type = fromwire_peektype(msg); set_urgent_flag(peer, is_urgent(type)); @@ -1116,6 +1119,8 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn, struct peer *peer) { const u8 *msg; + struct io_plan *dev_override; + assert(peer->to_peer == peer_conn); /* Free last sent one (if any) */ @@ -1143,6 +1148,11 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn, status_peer_debug(&peer->id, "draining, but sending %s.", peer_wire_name(fromwire_peektype(msg))); + dev_override = msg_out_dev_disconnect(peer, msg); + if (dev_override) { + tal_free(msg); + return dev_override; + } return encrypt_and_send(peer, take(msg)); } From 5229b99c221a18100b990e7003426d4df1969783 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 12 Feb 2026 11:38:25 +1030 Subject: [PATCH 05/10] connect: switch to using io_write_partial instead of io_write. This gives us finer control over write sizes: for now we just cap the write size. Signed-off-by: Rusty Russell --- connectd/connectd.c | 4 +++- connectd/connectd.h | 6 ++++-- connectd/multiplex.c | 36 +++++++++++++++++++++++++++--------- 3 files changed, 34 insertions(+), 12 deletions(-) diff --git a/connectd/connectd.c b/connectd/connectd.c index 44f81b6c2518..792b472f55aa 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -127,7 +127,9 @@ static struct peer *new_peer(struct daemon *daemon, peer->cs = *cs; peer->subds = tal_arr(peer, struct subd *, 0); peer->peer_in = NULL; - peer->sent_to_peer = NULL; + peer->encrypted_peer_out = NULL; + peer->encrypted_peer_out_off = 0; + peer->encrypted_peer_out_sent = 0; peer->urgent = false; peer->draining_state = NOT_DRAINING; peer->peer_in_lastmsg = -1; diff --git a/connectd/connectd.h b/connectd/connectd.h index 41a86cd10a77..451abf7a1d61 100644 --- a/connectd/connectd.h +++ b/connectd/connectd.h @@ -88,8 +88,10 @@ struct peer { /* Output buffer. */ struct msg_queue *peer_outq; - /* Peer sent buffer (for freeing after sending) */ - const u8 *sent_to_peer; + /* Encrypted peer sending buffer */ + const u8 *encrypted_peer_out; + size_t encrypted_peer_out_off; + size_t encrypted_peer_out_sent; /* We stream from the gossip_store for them, when idle */ struct gossip_state gs; diff --git a/connectd/multiplex.c b/connectd/multiplex.c index 40abb5082657..b19bffe99e78 100644 --- a/connectd/multiplex.c +++ b/connectd/multiplex.c @@ -26,6 +26,9 @@ #include #include +/* Maximum write(), to create uniform size packets. */ +#define MAX_MESSAGE_SIZE 1460 + struct subd { /* Owner: we are in peer->subds[] */ struct peer *peer; @@ -490,6 +493,19 @@ static struct io_plan *msg_out_dev_disconnect(struct peer *peer, const u8 *msg) abort(); } +/* (Continue) writing the encrypted_peer_out array */ +static struct io_plan *write_encrypted_to_peer(struct peer *peer) +{ + size_t max = tal_bytelen(peer->encrypted_peer_out) - peer->encrypted_peer_out_off; + if (max > MAX_MESSAGE_SIZE) + max = MAX_MESSAGE_SIZE; + return io_write_partial(peer->to_peer, + peer->encrypted_peer_out + peer->encrypted_peer_out_off, + max, + &peer->encrypted_peer_out_sent, + write_to_peer, peer); +} + static struct io_plan *encrypt_and_send(struct peer *peer, const u8 *msg TAKES) { int type = fromwire_peektype(msg); @@ -498,19 +514,15 @@ static struct io_plan *encrypt_and_send(struct peer *peer, const u8 *msg TAKES) /* Special message type directing us to process batch items. */ if (type == WIRE_PROTOCOL_BATCH_ELEMENT) { - peer->sent_to_peer = process_batch_elements(peer, msg); - if (!peer->sent_to_peer) + peer->encrypted_peer_out = process_batch_elements(peer, msg); + if (!peer->encrypted_peer_out) return io_close(peer->to_peer); } else { - peer->sent_to_peer = cryptomsg_encrypt_msg(peer, &peer->cs, msg); + peer->encrypted_peer_out = cryptomsg_encrypt_msg(peer, &peer->cs, msg); } - /* We free this and the encrypted version in next write_to_peer */ - return io_write(peer->to_peer, - peer->sent_to_peer, - tal_bytelen(peer->sent_to_peer), - write_to_peer, peer); + return write_encrypted_to_peer(peer); } /* Kicks off write_to_peer() to look for more gossip to send from store */ @@ -1123,8 +1135,14 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn, assert(peer->to_peer == peer_conn); + /* Write any remainder. */ + peer->encrypted_peer_out_off += peer->encrypted_peer_out_sent; + if (peer->encrypted_peer_out_off < tal_bytelen(peer->encrypted_peer_out)) + return write_encrypted_to_peer(peer); + /* Free last sent one (if any) */ - peer->sent_to_peer = tal_free(peer->sent_to_peer); + peer->encrypted_peer_out = tal_free(peer->encrypted_peer_out); + peer->encrypted_peer_out_off = 0; /* Pop tail of send queue (or gossip) */ msg = next_msg_for_peer(peer); From 5e5e8f6ced4cf66d4e3ad34490757f6e16a51dae Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 12 Feb 2026 21:55:42 +1030 Subject: [PATCH 06/10] connectd: don't toggle nagle on and off, leave it always off. We're doing our own buffering now. We leave the is_urgent() function for two commits in the future though. Signed-off-by: Rusty Russell --- connectd/connectd.c | 19 +++++++++++++++++++ connectd/multiplex.c | 40 +--------------------------------------- 2 files changed, 20 insertions(+), 39 deletions(-) diff --git a/connectd/connectd.c b/connectd/connectd.c index 792b472f55aa..61c1ef71c722 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -46,6 +46,7 @@ #include #include #include +#include #include #include #include @@ -507,6 +508,19 @@ static bool get_remote_address(struct io_conn *conn, return true; } +/* Nagle had a good idea of making networking more efficient by + * inserting a delay, creating a trap for every author of network code + * everywhere. + */ +static void set_tcp_no_delay(int fd) +{ + int val = 1; + if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) { + status_broken("setsockopt TCP_NODELAY=1 fd=%u: %s", + fd, strerror(errno)); + } +} + /*~ As so common in C, we need to bundle two args into a callback, so we * allocate a temporary structure to hold them: */ struct conn_in { @@ -639,6 +653,10 @@ static struct io_plan *connection_in(struct io_conn *conn, if (!get_remote_address(conn, &conn_in_arg.addr)) return io_close(conn); + /* Don't try to set TCP options on UNIX socket! */ + if (conn_in_arg.addr.itype == ADDR_INTERNAL_WIREADDR) + set_tcp_no_delay(io_conn_fd(conn)); + conn_in_arg.daemon = daemon; conn_in_arg.is_websocket = false; return conn_in(conn, &conn_in_arg); @@ -1175,6 +1193,7 @@ static void try_connect_one_addr(struct connecting *connect) goto next; } + set_tcp_no_delay(fd); connect->connect_attempted = true; /* This creates the new connection using our fd, with the initialization diff --git a/connectd/multiplex.c b/connectd/multiplex.c index b19bffe99e78..abc31d3b5d84 100644 --- a/connectd/multiplex.c +++ b/connectd/multiplex.c @@ -22,7 +22,6 @@ #include #include #include -#include #include #include @@ -294,42 +293,7 @@ void setup_peer_gossip_store(struct peer *peer, return; } -/* We're happy for the kernel to batch update and gossip messages, but a - * commitment message, for example, should be instantly sent. There's no - * great way of doing this, unfortunately. - * - * Setting TCP_NODELAY on Linux flushes the socket, which really means - * we'd want to toggle on then off it *after* sending. But Linux has - * TCP_CORK. On FreeBSD, it seems (looking at source) not to, so - * there we'd want to set it before the send, and reenable it - * afterwards. Even if this is wrong on other non-Linux platforms, it - * only means one extra packet. - */ -static void set_urgent_flag(struct peer *peer, bool urgent) -{ - int val; - - if (urgent == peer->urgent) - return; - - /* FIXME: We can't do this on websockets, but we could signal our - * websocket proxy via some magic message to do so! */ - if (peer->is_websocket != NORMAL_SOCKET) - return; - - val = urgent; - if (setsockopt(io_conn_fd(peer->to_peer), - IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0 - /* This actually happens in testing, where we blackhole the fd */ - && peer->daemon->dev_disconnect_fd == -1) { - status_broken("setsockopt TCP_NODELAY=1 fd=%u: %s", - io_conn_fd(peer->to_peer), - strerror(errno)); - } - peer->urgent = urgent; -} - -static bool is_urgent(enum peer_wire type) +static bool UNNEEDED is_urgent(enum peer_wire type) { switch (type) { case WIRE_INIT: @@ -510,8 +474,6 @@ static struct io_plan *encrypt_and_send(struct peer *peer, const u8 *msg TAKES) { int type = fromwire_peektype(msg); - set_urgent_flag(peer, is_urgent(type)); - /* Special message type directing us to process batch items. */ if (type == WIRE_PROTOCOL_BATCH_ELEMENT) { peer->encrypted_peer_out = process_batch_elements(peer, msg); From f8beb9271a5096dc934cf7c855a93fbb238720af Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 12 Feb 2026 21:55:46 +1030 Subject: [PATCH 07/10] connectd: pad messages with dummy pings if needed to make size uniform. Messages are now constant. Signed-off-by: Rusty Russell Changelog-Added: Protocol: we now pad all peer messages to make them the same length. --- connectd/connectd.c | 2 +- connectd/connectd.h | 2 +- connectd/multiplex.c | 190 ++++++++++++++++++++++++++++----------- tests/test_connection.py | 1 - 4 files changed, 141 insertions(+), 54 deletions(-) diff --git a/connectd/connectd.c b/connectd/connectd.c index 61c1ef71c722..2ff56077dea1 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -128,7 +128,7 @@ static struct peer *new_peer(struct daemon *daemon, peer->cs = *cs; peer->subds = tal_arr(peer, struct subd *, 0); peer->peer_in = NULL; - peer->encrypted_peer_out = NULL; + peer->encrypted_peer_out = tal_arr(peer, u8, 0); peer->encrypted_peer_out_off = 0; peer->encrypted_peer_out_sent = 0; peer->urgent = false; diff --git a/connectd/connectd.h b/connectd/connectd.h index 451abf7a1d61..1fdc95c4b1a6 100644 --- a/connectd/connectd.h +++ b/connectd/connectd.h @@ -89,7 +89,7 @@ struct peer { struct msg_queue *peer_outq; /* Encrypted peer sending buffer */ - const u8 *encrypted_peer_out; + u8 *encrypted_peer_out; size_t encrypted_peer_out_off; size_t encrypted_peer_out_sent; diff --git a/connectd/multiplex.c b/connectd/multiplex.c index abc31d3b5d84..0ac2f80df7e7 100644 --- a/connectd/multiplex.c +++ b/connectd/multiplex.c @@ -25,8 +25,8 @@ #include #include -/* Maximum write(), to create uniform size packets. */ -#define MAX_MESSAGE_SIZE 1460 +/* Size of write(), to create uniform size packets. */ +#define UNIFORM_MESSAGE_SIZE 1460 struct subd { /* Owner: we are in peer->subds[] */ @@ -360,9 +360,9 @@ static bool UNNEEDED is_urgent(enum peer_wire type) /* Process and eat protocol_batch_element messages, encrypt each element message * and return the encrypted messages as one long byte array. */ -static u8 *process_batch_elements(struct peer *peer, const u8 *msg TAKES) +static u8 *process_batch_elements(const tal_t *ctx, struct peer *peer, const u8 *msg TAKES) { - u8 *ret = tal_arr(peer, u8, 0); + u8 *ret = tal_arr(ctx, u8, 0); size_t ret_size = 0; const u8 *cursor = msg; size_t plen = tal_count(msg); @@ -457,34 +457,112 @@ static struct io_plan *msg_out_dev_disconnect(struct peer *peer, const u8 *msg) abort(); } +/* Do we have enough bytes without padding? */ +static bool have_full_encrypted_queue(const struct peer *peer) +{ + size_t bytes = tal_bytelen(peer->encrypted_peer_out) - peer->encrypted_peer_out_off; + return bytes >= UNIFORM_MESSAGE_SIZE; +} + +/* Do we have nothing in queue? */ +static bool have_empty_encrypted_queue(const struct peer *peer) +{ + size_t bytes = tal_bytelen(peer->encrypted_peer_out) - peer->encrypted_peer_out_off; + return bytes == 0; +} + /* (Continue) writing the encrypted_peer_out array */ static struct io_plan *write_encrypted_to_peer(struct peer *peer) { - size_t max = tal_bytelen(peer->encrypted_peer_out) - peer->encrypted_peer_out_off; - if (max > MAX_MESSAGE_SIZE) - max = MAX_MESSAGE_SIZE; + assert(have_full_encrypted_queue(peer)); return io_write_partial(peer->to_peer, peer->encrypted_peer_out + peer->encrypted_peer_out_off, - max, + UNIFORM_MESSAGE_SIZE, &peer->encrypted_peer_out_sent, write_to_peer, peer); } -static struct io_plan *encrypt_and_send(struct peer *peer, const u8 *msg TAKES) +/* Close the connection if this fails */ +static bool encrypt_append(struct peer *peer, const u8 *msg TAKES) { int type = fromwire_peektype(msg); + u8 *enc; + size_t prev_size; /* Special message type directing us to process batch items. */ if (type == WIRE_PROTOCOL_BATCH_ELEMENT) { - peer->encrypted_peer_out = process_batch_elements(peer, msg); - if (!peer->encrypted_peer_out) - return io_close(peer->to_peer); - } - else { - peer->encrypted_peer_out = cryptomsg_encrypt_msg(peer, &peer->cs, msg); + enc = process_batch_elements(tmpctx, peer, msg); + if (!enc) + return false; + } else { + enc = cryptomsg_encrypt_msg(tmpctx, &peer->cs, msg); } - return write_encrypted_to_peer(peer); + prev_size = tal_bytelen(peer->encrypted_peer_out); + tal_resize(&peer->encrypted_peer_out, prev_size + tal_bytelen(enc)); + memcpy(peer->encrypted_peer_out + prev_size, enc, tal_bytelen(enc)); + return true; +} + +static void pad_encrypted_queue(struct peer *peer) +{ + size_t needed, pingpad, bytes; + u8 *ping; + + /* BOLT #8: + * + * ``` + * +------------------------------- + * |2-byte encrypted message length| + * +------------------------------- + * | 16-byte MAC of the encrypted | + * | message length | + * +------------------------------- + * | | + * | | + * | encrypted Lightning | + * | message | + * | | + * +------------------------------- + * | 16-byte MAC of the | + * | Lightning message | + * +------------------------------- + * ``` + * + * The prefixed message length is encoded as a 2-byte big-endian integer, + * for a total maximum packet length of `2 + 16 + 65535 + 16` = `65569` bytes. + */ + assert(!have_full_encrypted_queue(peer)); + bytes = tal_bytelen(peer->encrypted_peer_out) - peer->encrypted_peer_out_off; + + needed = UNIFORM_MESSAGE_SIZE - bytes; + + /* BOLT #1: + * 1. type: 18 (`ping`) + * 2. data: + * * [`u16`:`num_pong_bytes`] + * * [`u16`:`byteslen`] + * * [`byteslen*byte`:`ignored`] + */ + /* So smallest possible ping is 6 bytes (2 byte type field) */ + if (needed < 2 + 16 + 16 + 6) + needed += UNIFORM_MESSAGE_SIZE; + + pingpad = needed - (2 + 16 + 16 + 6); + /* Note: we don't bother --dev-disconnect here */ + /* BOLT #1: + * A node receiving a `ping` message: + * - if `num_pong_bytes` is less than 65532: + * - MUST respond by sending a `pong` message, with `byteslen` equal to `num_pong_bytes`. + * - otherwise (`num_pong_bytes` is **not** less than 65532): + * - MUST ignore the `ping`. + */ + ping = make_ping(NULL, 65535, pingpad); + if (!encrypt_append(peer, take(ping))) + abort(); + + assert(have_full_encrypted_queue(peer)); + assert((tal_bytelen(peer->encrypted_peer_out) - peer->encrypted_peer_out_off) % UNIFORM_MESSAGE_SIZE == 0); } /* Kicks off write_to_peer() to look for more gossip to send from store */ @@ -1092,48 +1170,58 @@ static const u8 *next_msg_for_peer(struct peer *peer) static struct io_plan *write_to_peer(struct io_conn *peer_conn, struct peer *peer) { - const u8 *msg; - struct io_plan *dev_override; - assert(peer->to_peer == peer_conn); /* Write any remainder. */ peer->encrypted_peer_out_off += peer->encrypted_peer_out_sent; - if (peer->encrypted_peer_out_off < tal_bytelen(peer->encrypted_peer_out)) - return write_encrypted_to_peer(peer); - - /* Free last sent one (if any) */ - peer->encrypted_peer_out = tal_free(peer->encrypted_peer_out); - peer->encrypted_peer_out_off = 0; - - /* Pop tail of send queue (or gossip) */ - msg = next_msg_for_peer(peer); - if (!msg) { - /* Draining? Shutdown socket (to avoid losing msgs) */ - if (peer->draining_state == WRITING_TO_PEER) { - status_peer_debug(&peer->id, "draining done, shutting down"); - io_wake(&peer->peer_in); - return io_sock_shutdown(peer_conn); - } - - /* Tell them to read again, */ - io_wake(&peer->subds); - io_wake(&peer->peer_in); - - /* Wait for them to wake us */ - return msg_queue_wait(peer_conn, peer->peer_outq, write_to_peer, peer); + peer->encrypted_peer_out_sent = 0; + /* If all used, clean up */ + if (peer->encrypted_peer_out_off == tal_bytelen(peer->encrypted_peer_out)) { + peer->encrypted_peer_out_off = 0; + tal_resize(&peer->encrypted_peer_out, 0); } - if (peer->draining_state == WRITING_TO_PEER) - status_peer_debug(&peer->id, "draining, but sending %s.", - peer_wire_name(fromwire_peektype(msg))); - - dev_override = msg_out_dev_disconnect(peer, msg); - if (dev_override) { - tal_free(msg); - return dev_override; + while (!have_full_encrypted_queue(peer)) { + const u8 *msg; + struct io_plan *dev_override; + + /* Pop tail of send queue (or gossip) */ + msg = next_msg_for_peer(peer); + if (!msg) { + /* Nothing to send at all? We're done */ + if (have_empty_encrypted_queue(peer)) { + /* Draining? Shutdown socket (to avoid losing msgs) */ + if (peer->draining_state == WRITING_TO_PEER) { + status_peer_debug(&peer->id, "draining done, shutting down"); + io_wake(&peer->peer_in); + return io_sock_shutdown(peer_conn); + } + + /* Tell them to read again, */ + io_wake(&peer->subds); + io_wake(&peer->peer_in); + + /* Wait for them to wake us */ + return msg_queue_wait(peer_conn, peer->peer_outq, write_to_peer, peer); + } + /* OK, add padding. */ + pad_encrypted_queue(peer); + } else { + if (peer->draining_state == WRITING_TO_PEER) + status_peer_debug(&peer->id, "draining, but sending %s.", + peer_wire_name(fromwire_peektype(msg))); + + dev_override = msg_out_dev_disconnect(peer, msg); + if (dev_override) { + tal_free(msg); + return dev_override; + } + + if (!encrypt_append(peer, take(msg))) + return io_close(peer->to_peer); + } } - return encrypt_and_send(peer, take(msg)); + return write_encrypted_to_peer(peer); } static struct io_plan *read_from_subd(struct io_conn *subd_conn, diff --git a/tests/test_connection.py b/tests/test_connection.py index b93179f6ea21..5dd5b768f5c8 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -4773,7 +4773,6 @@ def test_networkevents(node_factory, executor): 'type': 'connect'}]} -@pytest.mark.xfail(strict=True) def test_constant_packet_size(node_factory, tcp_capture): """ Test that TCP packets between nodes are constant size. This will be skipped unless From 9b9e1dd0ae8ebdb9f21130ee98a3df1f9ff256d2 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 12 Feb 2026 21:55:46 +1030 Subject: [PATCH 08/10] connectd: don't flush messages unless we have something important. This replaces our previous nagle-based toggling. Signed-off-by: Rusty Russell --- connectd/connectd.c | 4 ++- connectd/connectd.h | 6 ++-- connectd/multiplex.c | 75 ++++++++++++++++++++++++++++++++++++-------- 3 files changed, 68 insertions(+), 17 deletions(-) diff --git a/connectd/connectd.c b/connectd/connectd.c index 2ff56077dea1..4e6cdd15d189 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -131,7 +131,9 @@ static struct peer *new_peer(struct daemon *daemon, peer->encrypted_peer_out = tal_arr(peer, u8, 0); peer->encrypted_peer_out_off = 0; peer->encrypted_peer_out_sent = 0; - peer->urgent = false; + peer->nonurgent_flush_timer = NULL; + peer->peer_out_urgent = 0; + peer->flushing_nonurgent = false; peer->draining_state = NOT_DRAINING; peer->peer_in_lastmsg = -1; peer->peer_outq = msg_queue_new(peer, false); diff --git a/connectd/connectd.h b/connectd/connectd.h index 1fdc95c4b1a6..55f7719a42d1 100644 --- a/connectd/connectd.h +++ b/connectd/connectd.h @@ -79,9 +79,6 @@ struct peer { /* Connections to the subdaemons */ struct subd **subds; - /* When socket has Nagle overridden */ - bool urgent; - /* Input buffer. */ u8 *peer_in; @@ -92,6 +89,9 @@ struct peer { u8 *encrypted_peer_out; size_t encrypted_peer_out_off; size_t encrypted_peer_out_sent; + size_t peer_out_urgent; + bool flushing_nonurgent; + struct oneshot *nonurgent_flush_timer; /* We stream from the gossip_store for them, when idle */ struct gossip_state gs; diff --git a/connectd/multiplex.c b/connectd/multiplex.c index 0ac2f80df7e7..751195134366 100644 --- a/connectd/multiplex.c +++ b/connectd/multiplex.c @@ -55,6 +55,7 @@ struct subd { }; /* FIXME: reorder! */ +static bool is_urgent(enum peer_wire type); static void destroy_connected_subd(struct subd *subd); static struct io_plan *write_to_peer(struct io_conn *peer_conn, struct peer *peer); @@ -98,10 +99,18 @@ static void close_subd_timeout(struct subd *subd) io_close(subd->conn); } +static void msg_to_peer_outq(struct peer *peer, const u8 *msg TAKES) +{ + if (is_urgent(fromwire_peektype(msg))) + peer->peer_out_urgent++; + + msg_enqueue(peer->peer_outq, msg); +} + void inject_peer_msg(struct peer *peer, const u8 *msg TAKES) { status_peer_io(LOG_IO_OUT, &peer->id, msg); - msg_enqueue(peer->peer_outq, msg); + msg_to_peer_outq(peer, msg); } static void drain_peer(struct peer *peer) @@ -293,7 +302,7 @@ void setup_peer_gossip_store(struct peer *peer, return; } -static bool UNNEEDED is_urgent(enum peer_wire type) +static bool is_urgent(enum peer_wire type) { switch (type) { case WIRE_INIT: @@ -629,7 +638,7 @@ static const u8 *maybe_gossip_msg(const tal_t *ctx, struct peer *peer) peer->gs.bytes_this_second += tal_bytelen(msgs[i]); status_peer_io(LOG_IO_OUT, &peer->id, msgs[i]); if (i > 0) - msg_enqueue(peer->peer_outq, take(msgs[i])); + msg_to_peer_outq(peer, take(msgs[i])); } return msgs[0]; } @@ -1145,7 +1154,11 @@ static const u8 *next_msg_for_peer(struct peer *peer) const u8 *msg; msg = msg_dequeue(peer->peer_outq); - if (!msg) { + if (msg) { + if (is_urgent(fromwire_peektype(msg))) { + peer->peer_out_urgent--; + } + } else { /* Draining? Don't send gossip. */ if (peer->draining_state == WRITING_TO_PEER) return NULL; @@ -1167,14 +1180,31 @@ static const u8 *next_msg_for_peer(struct peer *peer) return msg; } +static void nonurgent_flush(struct peer *peer) +{ + peer->nonurgent_flush_timer = NULL; + peer->flushing_nonurgent = true; + io_wake(peer->peer_outq); +} + static struct io_plan *write_to_peer(struct io_conn *peer_conn, struct peer *peer) { + bool do_flush; assert(peer->to_peer == peer_conn); + /* We always pad and send if we have an urgent msg or our + * non-urgent has gone off, or we're trying to close. */ + do_flush = (peer->peer_out_urgent != 0 + || peer->flushing_nonurgent + || peer->draining_state == WRITING_TO_PEER); + + peer->flushing_nonurgent = false; + /* Write any remainder. */ peer->encrypted_peer_out_off += peer->encrypted_peer_out_sent; peer->encrypted_peer_out_sent = 0; + /* If all used, clean up */ if (peer->encrypted_peer_out_off == tal_bytelen(peer->encrypted_peer_out)) { peer->encrypted_peer_out_off = 0; @@ -1188,19 +1218,36 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn, /* Pop tail of send queue (or gossip) */ msg = next_msg_for_peer(peer); if (!msg) { - /* Nothing to send at all? We're done */ - if (have_empty_encrypted_queue(peer)) { - /* Draining? Shutdown socket (to avoid losing msgs) */ - if (peer->draining_state == WRITING_TO_PEER) { - status_peer_debug(&peer->id, "draining done, shutting down"); - io_wake(&peer->peer_in); - return io_sock_shutdown(peer_conn); - } + /* Nothing in queue means nothing urgent in queue, surely! */ + assert(peer->peer_out_urgent == 0); + /* Draining? Shutdown socket (to avoid losing msgs) */ + if (have_empty_encrypted_queue(peer) + && peer->draining_state == WRITING_TO_PEER) { + status_peer_debug(&peer->id, "draining done, shutting down"); + io_wake(&peer->peer_in); + return io_sock_shutdown(peer_conn); + } + + /* If no urgent message, and not draining, we wait. */ + if (!do_flush) { /* Tell them to read again, */ io_wake(&peer->subds); io_wake(&peer->peer_in); + /* Set up a timer if not already set */ + if (!have_empty_encrypted_queue(peer) + && !peer->nonurgent_flush_timer) { + /* Bias towards larger values, but don't be too predictable */ + u32 max = pseudorand(1000); + u32 msec = 1000 - pseudorand(1 + max); + peer->nonurgent_flush_timer + = new_reltimer(&peer->daemon->timers, + peer, + time_from_msec(msec), + nonurgent_flush, peer); + } + /* Wait for them to wake us */ return msg_queue_wait(peer_conn, peer->peer_outq, write_to_peer, peer); } @@ -1221,6 +1268,8 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn, return io_close(peer->to_peer); } } + + peer->nonurgent_flush_timer = tal_free(peer->nonurgent_flush_timer); return write_encrypted_to_peer(peer); } @@ -1232,7 +1281,7 @@ static struct io_plan *read_from_subd_done(struct io_conn *subd_conn, maybe_update_channelid(subd, subd->in); /* Tell them to encrypt & write. */ - msg_enqueue(subd->peer->peer_outq, take(subd->in)); + msg_to_peer_outq(subd->peer, take(subd->in)); subd->in = NULL; /* Wait for them to wake us */ From c7effc73ec8344ad4ca4d693ef79a2ad4d5e1261 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 12 Feb 2026 21:55:46 +1030 Subject: [PATCH 09/10] connectd: only do lazy transmission for *definitely* non-urgent messages. Since we delay the others quite a lot (up to 1 second), it's better to consider most messages "urgent" and worth immediately transmitting. Signed-off-by: Rusty Russell --- connectd/multiplex.c | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/connectd/multiplex.c b/connectd/multiplex.c index 751195134366..1d3e0203bce2 100644 --- a/connectd/multiplex.c +++ b/connectd/multiplex.c @@ -305,6 +305,21 @@ void setup_peer_gossip_store(struct peer *peer, static bool is_urgent(enum peer_wire type) { switch (type) { + /* We are happy to batch UPDATE_ADD messages: it's the + * commitment signed which matters. */ + case WIRE_UPDATE_ADD_HTLC: + case WIRE_UPDATE_FULFILL_HTLC: + case WIRE_UPDATE_FAIL_HTLC: + case WIRE_UPDATE_FAIL_MALFORMED_HTLC: + case WIRE_UPDATE_FEE: + /* Gossip messages are also non-urgent */ + case WIRE_CHANNEL_ANNOUNCEMENT: + case WIRE_NODE_ANNOUNCEMENT: + case WIRE_CHANNEL_UPDATE: + return false; + + /* We don't delay for anything else, but we use a switch + * statement to make you think about new cases! */ case WIRE_INIT: case WIRE_ERROR: case WIRE_WARNING: @@ -328,17 +343,9 @@ static bool is_urgent(enum peer_wire type) case WIRE_CLOSING_SIGNED: case WIRE_CLOSING_COMPLETE: case WIRE_CLOSING_SIG: - case WIRE_UPDATE_ADD_HTLC: - case WIRE_UPDATE_FULFILL_HTLC: - case WIRE_UPDATE_FAIL_HTLC: - case WIRE_UPDATE_FAIL_MALFORMED_HTLC: - case WIRE_UPDATE_FEE: case WIRE_UPDATE_BLOCKHEIGHT: case WIRE_CHANNEL_REESTABLISH: case WIRE_ANNOUNCEMENT_SIGNATURES: - case WIRE_CHANNEL_ANNOUNCEMENT: - case WIRE_NODE_ANNOUNCEMENT: - case WIRE_CHANNEL_UPDATE: case WIRE_QUERY_SHORT_CHANNEL_IDS: case WIRE_REPLY_SHORT_CHANNEL_IDS_END: case WIRE_QUERY_CHANNEL_RANGE: @@ -351,9 +358,6 @@ static bool is_urgent(enum peer_wire type) case WIRE_SPLICE: case WIRE_SPLICE_ACK: case WIRE_SPLICE_LOCKED: - return false; - - /* These are time-sensitive, and so send without delay. */ case WIRE_PING: case WIRE_PONG: case WIRE_PROTOCOL_BATCH_ELEMENT: @@ -363,8 +367,8 @@ static bool is_urgent(enum peer_wire type) return true; }; - /* plugins can inject other messages; assume not urgent. */ - return false; + /* plugins can inject other messages. */ + return true; } /* Process and eat protocol_batch_element messages, encrypt each element message From c9493013622ef5a7078581bbee6c2ba567f193c0 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 12 Feb 2026 21:55:46 +1030 Subject: [PATCH 10/10] connectd: use membuf for more efficient output queue. This is exactly what membuf is for: it handles expansion much more neatly. Signed-off-by: Rusty Russell --- connectd/connectd.c | 5 +++-- connectd/connectd.h | 4 ++-- connectd/multiplex.c | 42 ++++++++++++++++-------------------------- 3 files changed, 21 insertions(+), 30 deletions(-) diff --git a/connectd/connectd.c b/connectd/connectd.c index 4e6cdd15d189..9fb5d33eaff5 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -128,8 +128,9 @@ static struct peer *new_peer(struct daemon *daemon, peer->cs = *cs; peer->subds = tal_arr(peer, struct subd *, 0); peer->peer_in = NULL; - peer->encrypted_peer_out = tal_arr(peer, u8, 0); - peer->encrypted_peer_out_off = 0; + membuf_init(&peer->encrypted_peer_out, + tal_arr(peer, u8, 0), 0, + membuf_tal_resize); peer->encrypted_peer_out_sent = 0; peer->nonurgent_flush_timer = NULL; peer->peer_out_urgent = 0; diff --git a/connectd/connectd.h b/connectd/connectd.h index 55f7719a42d1..1cce2c64d3aa 100644 --- a/connectd/connectd.h +++ b/connectd/connectd.h @@ -3,6 +3,7 @@ #include "config.h" #include #include +#include #include #include #include @@ -86,8 +87,7 @@ struct peer { struct msg_queue *peer_outq; /* Encrypted peer sending buffer */ - u8 *encrypted_peer_out; - size_t encrypted_peer_out_off; + MEMBUF(u8) encrypted_peer_out; size_t encrypted_peer_out_sent; size_t peer_out_urgent; bool flushing_nonurgent; diff --git a/connectd/multiplex.c b/connectd/multiplex.c index 1d3e0203bce2..85db8e5485f6 100644 --- a/connectd/multiplex.c +++ b/connectd/multiplex.c @@ -473,23 +473,21 @@ static struct io_plan *msg_out_dev_disconnect(struct peer *peer, const u8 *msg) /* Do we have enough bytes without padding? */ static bool have_full_encrypted_queue(const struct peer *peer) { - size_t bytes = tal_bytelen(peer->encrypted_peer_out) - peer->encrypted_peer_out_off; - return bytes >= UNIFORM_MESSAGE_SIZE; + return membuf_num_elems(&peer->encrypted_peer_out) >= UNIFORM_MESSAGE_SIZE; } /* Do we have nothing in queue? */ static bool have_empty_encrypted_queue(const struct peer *peer) { - size_t bytes = tal_bytelen(peer->encrypted_peer_out) - peer->encrypted_peer_out_off; - return bytes == 0; + return membuf_num_elems(&peer->encrypted_peer_out) == 0; } /* (Continue) writing the encrypted_peer_out array */ static struct io_plan *write_encrypted_to_peer(struct peer *peer) { - assert(have_full_encrypted_queue(peer)); + assert(membuf_num_elems(&peer->encrypted_peer_out) >= UNIFORM_MESSAGE_SIZE); return io_write_partial(peer->to_peer, - peer->encrypted_peer_out + peer->encrypted_peer_out_off, + membuf_elems(&peer->encrypted_peer_out), UNIFORM_MESSAGE_SIZE, &peer->encrypted_peer_out_sent, write_to_peer, peer); @@ -499,8 +497,8 @@ static struct io_plan *write_encrypted_to_peer(struct peer *peer) static bool encrypt_append(struct peer *peer, const u8 *msg TAKES) { int type = fromwire_peektype(msg); - u8 *enc; - size_t prev_size; + const u8 *enc; + size_t enclen; /* Special message type directing us to process batch items. */ if (type == WIRE_PROTOCOL_BATCH_ELEMENT) { @@ -510,16 +508,16 @@ static bool encrypt_append(struct peer *peer, const u8 *msg TAKES) } else { enc = cryptomsg_encrypt_msg(tmpctx, &peer->cs, msg); } - - prev_size = tal_bytelen(peer->encrypted_peer_out); - tal_resize(&peer->encrypted_peer_out, prev_size + tal_bytelen(enc)); - memcpy(peer->encrypted_peer_out + prev_size, enc, tal_bytelen(enc)); + enclen = tal_bytelen(enc); + memcpy(membuf_add(&peer->encrypted_peer_out, enclen), + enc, + enclen); return true; } static void pad_encrypted_queue(struct peer *peer) { - size_t needed, pingpad, bytes; + size_t needed, pingpad; u8 *ping; /* BOLT #8: @@ -545,10 +543,8 @@ static void pad_encrypted_queue(struct peer *peer) * The prefixed message length is encoded as a 2-byte big-endian integer, * for a total maximum packet length of `2 + 16 + 65535 + 16` = `65569` bytes. */ - assert(!have_full_encrypted_queue(peer)); - bytes = tal_bytelen(peer->encrypted_peer_out) - peer->encrypted_peer_out_off; - - needed = UNIFORM_MESSAGE_SIZE - bytes; + assert(membuf_num_elems(&peer->encrypted_peer_out) < UNIFORM_MESSAGE_SIZE); + needed = UNIFORM_MESSAGE_SIZE - membuf_num_elems(&peer->encrypted_peer_out); /* BOLT #1: * 1. type: 18 (`ping`) @@ -575,7 +571,7 @@ static void pad_encrypted_queue(struct peer *peer) abort(); assert(have_full_encrypted_queue(peer)); - assert((tal_bytelen(peer->encrypted_peer_out) - peer->encrypted_peer_out_off) % UNIFORM_MESSAGE_SIZE == 0); + assert(membuf_num_elems(&peer->encrypted_peer_out) % UNIFORM_MESSAGE_SIZE == 0); } /* Kicks off write_to_peer() to look for more gossip to send from store */ @@ -1205,16 +1201,10 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn, peer->flushing_nonurgent = false; - /* Write any remainder. */ - peer->encrypted_peer_out_off += peer->encrypted_peer_out_sent; + /* We wrote out some bytes from membuf. */ + membuf_consume(&peer->encrypted_peer_out, peer->encrypted_peer_out_sent); peer->encrypted_peer_out_sent = 0; - /* If all used, clean up */ - if (peer->encrypted_peer_out_off == tal_bytelen(peer->encrypted_peer_out)) { - peer->encrypted_peer_out_off = 0; - tal_resize(&peer->encrypted_peer_out, 0); - } - while (!have_full_encrypted_queue(peer)) { const u8 *msg; struct io_plan *dev_override;