From 8053071ae09eca7dcc869f00b44e7ad0d151f502 Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sun, 12 Oct 2025 19:26:43 +0200 Subject: [PATCH 01/84] quic: for streams allow ReadableStream as body After the change the body parameter for all created QuicStreams can take a ReadableStream of Uint8Array, Arraybuffers etc.. We introduce a DataQueueFeeder class, that may be also used for other related mechanisms. A locally created unidirectional Stream can not have a reader. Therefore the interface is changed to handle this case. (undercovered when writing the tests). Furthermore, a ResumeStream must be added After AddStream in QuicSession, as the ResumeStream beforehand triggered with set_outbound is a no-op, as Stream was not added to Session beforehand. Fixes: https://github.com/nodejs/node/issues/60234 --- doc/api/quic.md | 6 +- lib/internal/quic/quic.js | 37 ++++ src/dataqueue/queue.cc | 81 +++++++- src/dataqueue/queue.h | 6 + src/quic/bindingdata.h | 9 +- src/quic/quic.cc | 3 + src/quic/session.cc | 2 + src/quic/streams.cc | 178 +++++++++++++++++- src/quic/streams.h | 60 +++++- ...t-quic-server-to-client-unidirectional.mjs | 109 +++++++++++ 10 files changed, 482 insertions(+), 9 deletions(-) create mode 100644 test/parallel/test-quic-server-to-client-unidirectional.mjs diff --git a/doc/api/quic.md b/doc/api/quic.md index 723c26c5bd0b99..9ea06e8e8bd1cf 100644 --- a/doc/api/quic.md +++ b/doc/api/quic.md @@ -475,7 +475,7 @@ added: v23.8.0 --> * `options` {Object} - * `body` {ArrayBuffer | ArrayBufferView | Blob} + * `body` {ArrayBuffer | ArrayBufferView | Blob | ReadableStream} * `sendOrder` {number} * Returns: {Promise} for a {quic.QuicStream} @@ -489,7 +489,7 @@ added: v23.8.0 --> * `options` {Object} - * `body` {ArrayBuffer | ArrayBufferView | Blob} + * `body` {ArrayBuffer | ArrayBufferView | Blob | ReadableStream} * `sendOrder` {number} * Returns: {Promise} for a {quic.QuicStream} @@ -820,7 +820,7 @@ The callback to invoke when the stream is reset. Read/write. added: v23.8.0 --> -* Type: {ReadableStream} +* Type: {ReadableStream | undefined} ### `stream.session` diff --git a/lib/internal/quic/quic.js b/lib/internal/quic/quic.js index 459906d270dab6..ba4ec5350f4691 100644 --- a/lib/internal/quic/quic.js +++ b/lib/internal/quic/quic.js @@ -34,6 +34,7 @@ const { Endpoint: Endpoint_, Http3Application: Http3, setCallbacks, + DataQueueFeeder, // The constants to be exposed to end users for various options. CC_ALGO_RENO_STR: CC_ALGO_RENO, @@ -114,6 +115,10 @@ const { buildNgHeaderString, } = require('internal/http2/util'); +const { + isReadableStream, +} = require('internal/webstreams/readablestream'); + const kEmptyObject = { __proto__: null }; const { @@ -546,6 +551,37 @@ setCallbacks({ function validateBody(body) { // TODO(@jasnell): Support streaming sources if (body === undefined) return body; + if (isReadableStream(body)) { + const feeder = new DataQueueFeeder(); + const reader = body.getReader(); + + const feeding = async () => { + await feeder.ready(); + let cont = true; + + while (cont) { + let read; + try { + read = await reader.read(); + } catch (error) { + feeder.error(error); + } + const { value, done } = read; + try { + cont = await feeder.submit(value, done); + } catch (error) { + reader.cancel(error.toString()); + break; + } + } + if (!cont) { + reader.releaseLock(); + } + + }; + feeding(); + return feeder; + } // Transfer ArrayBuffers... if (isArrayBuffer(body)) { return ArrayBufferPrototypeTransfer(body); @@ -578,6 +614,7 @@ function validateBody(body) { 'ArrayBuffer', 'ArrayBufferView', 'Blob', + 'ReadableStream', ], body); } diff --git a/src/dataqueue/queue.cc b/src/dataqueue/queue.cc index 537844806d3087..a41518fa54615c 100644 --- a/src/dataqueue/queue.cc +++ b/src/dataqueue/queue.cc @@ -18,6 +18,8 @@ #include #include +#include "../quic/streams.h" + namespace node { using v8::ArrayBufferView; @@ -1062,9 +1064,81 @@ class FdEntry final : public EntryImpl { friend class ReaderImpl; }; +} // namespace +// ============================================================================ + +class FeederEntry final : public EntryImpl { + public: + FeederEntry(DataQueueFeeder* feeder) : feeder_(feeder) { + } + + static std::unique_ptr Create(DataQueueFeeder* feeder) { + return std::make_unique(feeder); + } + + std::shared_ptr get_reader() override { + return ReaderImpl::Create(this); + } + + std::unique_ptr slice( + uint64_t start, std::optional end = std::nullopt) override { + // we are not idempotent + return std::unique_ptr(nullptr); + } + + std::optional size() const override { + return std::optional(); + } + + bool is_idempotent() const override { return false; } + + SET_NO_MEMORY_INFO() + SET_MEMORY_INFO_NAME(FeederEntry) + SET_SELF_SIZE(FeederEntry) + + private: + DataQueueFeeder* feeder_; + + class ReaderImpl final : public DataQueue::Reader, + public std::enable_shared_from_this { + public: + static std::shared_ptr Create(FeederEntry* entry) { + return std::make_shared(entry); + }; + + explicit ReaderImpl(FeederEntry* entry) : entry_(entry) { + } + + ~ReaderImpl() { + entry_->feeder_->DrainAndClose(); + } + + int Pull(Next next, + int options, + DataQueue::Vec* data, + size_t count, + size_t max_count_hint = bob::kMaxCountHint) override { + if (entry_->feeder_->Done()) { + std::move(next)(bob::STATUS_EOS, nullptr, 0, [](uint64_t) {}); + return bob::STATUS_EOS; + } + entry_->feeder_->addPendingPull( + DataQueueFeeder::PendingPull(std::move(next))); + entry_->feeder_->tryWakePulls(); + return bob::STATUS_WAIT; + } + + SET_NO_MEMORY_INFO() + SET_MEMORY_INFO_NAME(FeederEntry::Reader) + SET_SELF_SIZE(ReaderImpl) + + private: + FeederEntry* entry_; + }; +}; + // ============================================================================ -} // namespace std::shared_ptr DataQueue::CreateIdempotent( std::vector> list) { @@ -1138,6 +1212,11 @@ std::unique_ptr DataQueue::CreateFdEntry(Environment* env, return FdEntry::Create(env, path); } +std::unique_ptr DataQueue::CreateFeederEntry( + DataQueueFeeder* feeder) { + return FeederEntry::Create(feeder); +} + void DataQueue::Initialize(Environment* env, v8::Local target) { // Nothing to do here currently. } diff --git a/src/dataqueue/queue.h b/src/dataqueue/queue.h index a37bd27549986e..eb470f4c7bf10a 100644 --- a/src/dataqueue/queue.h +++ b/src/dataqueue/queue.h @@ -16,6 +16,8 @@ #include namespace node { + using v8::Local; + using v8::Value; // Represents a sequenced collection of data sources that can be // consumed as a single logical stream of data. Sources can be @@ -124,6 +126,8 @@ namespace node { // For non-idempotent DataQueues, only a single reader is ever allowed for // the DataQueue, and the data can only ever be read once. +class DataQueueFeeder; + class DataQueue : public MemoryRetainer { public: struct Vec { @@ -224,6 +228,8 @@ class DataQueue : public MemoryRetainer { static std::unique_ptr CreateFdEntry(Environment* env, v8::Local path); + static std::unique_ptr CreateFeederEntry(DataQueueFeeder* feeder); + // Creates a Reader for the given queue. If the queue is idempotent, // any number of readers can be created, all of which are guaranteed // to provide the same data. Otherwise, only a single reader is diff --git a/src/quic/bindingdata.h b/src/quic/bindingdata.h index 1b29a54a8c1199..7be72a45e08d24 100644 --- a/src/quic/bindingdata.h +++ b/src/quic/bindingdata.h @@ -15,7 +15,9 @@ #include #include "defs.h" -namespace node::quic { +namespace node { +class DataQueueFeeder; +namespace quic { class Endpoint; class Packet; @@ -24,6 +26,7 @@ class Packet; // The FunctionTemplates the BindingData will store for us. #define QUIC_CONSTRUCTORS(V) \ + V(dataqueuefeeder) \ V(endpoint) \ V(http3application) \ V(logstream) \ @@ -68,6 +71,7 @@ class Packet; V(ciphers, "ciphers") \ V(crl, "crl") \ V(cubic, "cubic") \ + V(dataqueuefeeder, "DataQueueFeeder") \ V(disable_stateless_reset, "disableStatelessReset") \ V(enable_connect_protocol, "enableConnectProtocol") \ V(enable_datagrams, "enableDatagrams") \ @@ -264,6 +268,7 @@ struct CallbackScope final : public CallbackScopeBase { explicit CallbackScope(T* ptr) : CallbackScopeBase(ptr->env()), ref(ptr) {} }; -} // namespace node::quic +} // namespace quic +} // namespace node #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS diff --git a/src/quic/quic.cc b/src/quic/quic.cc index edfb5dc9e66295..13fc8c2fda8a07 100644 --- a/src/quic/quic.cc +++ b/src/quic/quic.cc @@ -27,6 +27,7 @@ void CreatePerIsolateProperties(IsolateData* isolate_data, Endpoint::InitPerIsolate(isolate_data, target); Session::InitPerIsolate(isolate_data, target); Stream::InitPerIsolate(isolate_data, target); + DataQueueFeeder::InitPerIsolate(isolate_data, target); } void CreatePerContextProperties(Local target, @@ -38,6 +39,7 @@ void CreatePerContextProperties(Local target, Endpoint::InitPerContext(realm, target); Session::InitPerContext(realm, target); Stream::InitPerContext(realm, target); + DataQueueFeeder::InitPerContext(realm, target); } void RegisterExternalReferences(ExternalReferenceRegistry* registry) { @@ -45,6 +47,7 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) { Endpoint::RegisterExternalReferences(registry); Session::RegisterExternalReferences(registry); Stream::RegisterExternalReferences(registry); + DataQueueFeeder::RegisterExternalReferences(registry); } } // namespace quic diff --git a/src/quic/session.cc b/src/quic/session.cc index 882fd9bac0e54f..5abdbda2be76fa 100644 --- a/src/quic/session.cc +++ b/src/quic/session.cc @@ -1942,6 +1942,8 @@ BaseObjectPtr Session::CreateStream( if (auto stream = Stream::Create(this, id, std::move(data_source))) [[likely]] { AddStream(stream, option); + ResumeStream(id); // ok, we need to resume, as the Resume before fails + // as the stream was not added yet return stream; } return {}; diff --git a/src/quic/streams.cc b/src/quic/streams.cc index 8fe5b72ce1fe5b..9d65ff9a264666 100644 --- a/src/quic/streams.cc +++ b/src/quic/streams.cc @@ -17,9 +17,11 @@ namespace node { +using quic::BindingData; using v8::Array; using v8::ArrayBuffer; using v8::ArrayBufferView; +using v8::BackingStore; using v8::BigInt; using v8::Global; using v8::Integer; @@ -30,6 +32,8 @@ using v8::Nothing; using v8::Object; using v8::ObjectTemplate; using v8::SharedArrayBuffer; +using v8::TypedArray; +using v8::Uint8Array; using v8::Value; namespace quic { @@ -215,8 +219,17 @@ Maybe> Stream::GetDataQueueFromSource( entries.push_back(DataQueue::CreateInMemoryEntryFromBackingStore( std::move(backing), 0, backing->ByteLength())); return Just(DataQueue::CreateIdempotent(std::move(entries))); + } else if (DataQueueFeeder::HasInstance(env, value)) { + // a DataQueueFeeder + DataQueueFeeder* dataQueueFeeder; + ASSIGN_OR_RETURN_UNWRAP( + &dataQueueFeeder, value, Nothing>()); + std::shared_ptr dataQueue = DataQueue::Create(); + dataQueue->append(DataQueue::CreateFeederEntry(dataQueueFeeder)); + return Just(dataQueue); } - // TODO(jasnell): Add streaming sources... + + THROW_ERR_INVALID_ARG_TYPE(env, "Invalid data source type"); return Nothing>(); } @@ -367,9 +380,13 @@ struct Stream::Impl { // Returns a Blob::Reader that can be used to read data that has been // received on the stream. + // returns undefined if local unidirectional stream JS_METHOD(GetReader) { Stream* stream; ASSIGN_OR_RETURN_UNWRAP(&stream, args.This()); + if (stream->is_local_unidirectional()) { + return args.GetReturnValue().SetUndefined(); + } BaseObjectPtr reader = stream->get_reader(); if (reader) return args.GetReturnValue().Set(reader->object()); THROW_ERR_INVALID_STATE(Environment::GetCurrent(args), @@ -1319,6 +1336,165 @@ void Stream::Unschedule() { } } // namespace quic + +DataQueueFeeder::DataQueueFeeder(Environment* env, + Local object) + : AsyncWrap(env, object) { + MakeWeak(); +} + +void DataQueueFeeder::tryWakePulls() { + if (!readFinish_.IsEmpty()) { + Local resolver = readFinish_.Get(env()->isolate()); + // I do not think, that this can error... + (void)resolver->Resolve(env()->context(), v8::True(env()->isolate())); + readFinish_.Reset(); + } +} + +void DataQueueFeeder::DrainAndClose() { + if (done) return; + while (!pendingPulls_.empty()) { + auto& pending = pendingPulls_.front(); + auto pop = OnScopeLeave([this] { pendingPulls_.pop_front(); }); + pending.next(bob::STATUS_EOS, nullptr, 0, [](uint64_t) {}); + } + if (!readFinish_.IsEmpty()) { + Local resolver = readFinish_.Get(env()->isolate()); + (void)resolver->Resolve(env()->context(), v8::False(env()->isolate())); + readFinish_.Reset(); + } + done = true; +} + + +JS_METHOD_IMPL(DataQueueFeeder::New) { + DCHECK(args.IsConstructCall()); + auto env = Environment::GetCurrent(args); + new DataQueueFeeder(env, args.This()); +} + +JS_METHOD_IMPL(DataQueueFeeder::Ready) { + Environment* env = Environment::GetCurrent(args); + DataQueueFeeder* feeder; + ASSIGN_OR_RETURN_UNWRAP(&feeder, args.This()); + if (feeder->pendingPulls_.size() > 0) { + feeder->readFinish_.Reset(); + return; + } else { + Local readFinish = + Promise::Resolver::New(env->context()).ToLocalChecked(); + feeder->readFinish_.Reset(env->isolate(), readFinish); + args.GetReturnValue().Set(readFinish->GetPromise()); + return; + } +} + +JS_METHOD_IMPL(DataQueueFeeder::Submit) { + Environment* env = Environment::GetCurrent(args); + DataQueueFeeder* feeder; + ASSIGN_OR_RETURN_UNWRAP(&feeder, args.This()); + + bool done = false; + if (args[1]->IsBoolean() && args[1].As()->Value()) { + done = true; + } + if (!args[0].IsEmpty()) { + CHECK_GT(feeder->pendingPulls_.size(), 0); + auto chunk = args[0]; + + if (chunk->IsArrayBuffer()) { + auto buffer = chunk.As(); + chunk = Uint8Array::New(buffer, 0, buffer->ByteLength()); + } + if (!chunk->IsTypedArray()) { + THROW_ERR_INVALID_ARG_TYPE(env, + "Invalid data must be Arraybuffer or TypedArray"); + } + Local typedArray = chunk.As(); + // now we create a copy + // detaching, would not be a good idea for example, such + // a limitation is not given with W3C Webtransport + // if we do not do it here, a transform stream would + // be needed to do the copy in the Webtransport case. + // there may be also troubles, if multiple Uint8Array + // are derived in a parser from a single ArrayBuffer + size_t nread = typedArray->ByteLength(); + JS_TRY_ALLOCATE_BACKING(env, backingUniq, nread); + std::shared_ptr backing = std::move(backingUniq); + + auto originalStore = typedArray->Buffer()->GetBackingStore(); + const void* originalData = static_cast(originalStore->Data()) + + typedArray->ByteOffset(); + memcpy(backing->Data(), originalData, nread); + auto& pending = feeder->pendingPulls_.front(); + auto pop = OnScopeLeave([feeder] { feeder->pendingPulls_.pop_front(); }); + DataQueue::Vec vec; + vec.base = static_cast(backing->Data()); + vec.len = static_cast(nread); + pending.next(bob::STATUS_CONTINUE, &vec, 1, [backing](uint64_t) {}); + } + if (done) { + feeder->DrainAndClose(); + feeder->readFinish_.Reset(); + args.GetReturnValue().Set(v8::False(env->isolate())); + return; + } else { + if (feeder->pendingPulls_.size() > 0) { + feeder->readFinish_.Reset(); + args.GetReturnValue().Set(v8::True(env->isolate())); + return; + } else { + Local readFinish = + Promise::Resolver::New(env->context()).ToLocalChecked(); + feeder->readFinish_.Reset(env->isolate(), readFinish); + args.GetReturnValue().Set(readFinish->GetPromise()); + return; + } + } +} + +JS_METHOD_IMPL(DataQueueFeeder::Error) { + DataQueueFeeder* feeder; + ASSIGN_OR_RETURN_UNWRAP(&feeder, args.This()); + // FIXME, how should I pass on the error + // ResetStream must be send also + feeder->DrainAndClose(); +} + +JS_CONSTRUCTOR_IMPL(DataQueueFeeder, dataqueuefeeder_constructor_template, { + auto isolate = env->isolate(); + JS_NEW_CONSTRUCTOR(); + JS_INHERIT(AsyncWrap); + JS_CLASS(dataqueuefeeder); + SetProtoMethod(isolate, tmpl, "error", Error); + SetProtoMethod(isolate, tmpl, "submit", Submit); + SetProtoMethod(isolate, tmpl, "ready", Ready); +}) + + +void DataQueueFeeder::InitPerIsolate( + IsolateData* data, + Local target) { + // TODO(@jasnell): Implement the per-isolate state +} + +void DataQueueFeeder::InitPerContext(Realm* realm, Local target) { + SetConstructorFunction(realm->context(), + target, + "DataQueueFeeder", + GetConstructorTemplate(realm->env())); +} + +void DataQueueFeeder::RegisterExternalReferences( + ExternalReferenceRegistry* registry +) { + registry->Register(New); + registry->Register(Submit); + registry->Register(Error); + registry->Register(Ready); +} + } // namespace node #endif // OPENSSL_NO_QUIC diff --git a/src/quic/streams.h b/src/quic/streams.h index c230815d78e4be..a222e8585ae6b0 100644 --- a/src/quic/streams.h +++ b/src/quic/streams.h @@ -15,13 +15,18 @@ #include "bindingdata.h" #include "data.h" -namespace node::quic { +namespace node { + +using v8::Global; +using v8::Promise; +namespace quic { class Session; class Stream; using Ngtcp2Source = bob::SourceImpl; + // When a request to open a stream is made before a Session is able to actually // open a stream (either because the handshake is not yet sufficiently complete // or concurrency limits are temporarily reached) then the request to open the @@ -387,7 +392,58 @@ class Stream final : public AsyncWrap, void Schedule(Queue* queue); void Unschedule(); }; +} // namespace quic +class DataQueueFeeder final: public AsyncWrap { + public: +using Next = bob::Next; + + DataQueueFeeder(Environment* env, + v8::Local object); + + JS_CONSTRUCTOR(DataQueueFeeder); + JS_BINDING_INIT_BOILERPLATE(); + + static BaseObjectPtr Create(); + + void setDataQueue(std::shared_ptr queue) { + dataQueue_ = queue; + } + + void tryWakePulls(); + void DrainAndClose(); + + struct PendingPull { + Next next; + explicit PendingPull(Next next) + : next(std::move(next)) {} + }; + + void addPendingPull(PendingPull toAdd) { + pendingPulls_.emplace_back(std::move(toAdd)); + } + + bool Done() {return done;}; + + SET_NO_MEMORY_INFO() + SET_MEMORY_INFO_NAME(DataQueueFeeder) + SET_SELF_SIZE(DataQueueFeeder) + + JS_METHOD(New); + JS_METHOD(Submit); + JS_METHOD(Error); + JS_METHOD(Ready); + + + private: + std::shared_ptr dataQueue_; + Global readFinish_; + + + + std::deque pendingPulls_; + bool done = false; +}; -} // namespace node::quic +} // namespace node #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS diff --git a/test/parallel/test-quic-server-to-client-unidirectional.mjs b/test/parallel/test-quic-server-to-client-unidirectional.mjs new file mode 100644 index 00000000000000..933b8860ab8a22 --- /dev/null +++ b/test/parallel/test-quic-server-to-client-unidirectional.mjs @@ -0,0 +1,109 @@ +// Flags: --experimental-quic --no-warnings + +import { hasQuic, skip, mustCall } from '../common/index.mjs'; +import { ok, strictEqual, deepStrictEqual } from 'node:assert'; +import { readKey } from '../common/fixtures.mjs'; +import { TransformStream } from 'node:stream/web'; + +if (!hasQuic) { + skip('QUIC is not enabled'); +} + +// Import after the hasQuic check +const { listen, connect } = await import('node:quic'); +const { createPrivateKey } = await import('node:crypto'); + +const keys = createPrivateKey(readKey('agent1-key.pem')); +const certs = readKey('agent1-cert.pem'); + +// The opened promise should resolve when the client finished reading +const clientFinished = Promise.withResolvers(); + +// start demo data +// FIX ME: move the following to a central place +// if used in several tests +// taken from @fails-components/webtransport tests +// by the original author +function createBytesChunk(length) { + const workArray = new Array(length / 2); + for (let i = 0; i < length / 4; i++) { + workArray[2 * i + 1] = length % 0xffff; + workArray[2 * i] = i; + } + const helper = new Uint16Array(workArray); + const toreturn = new Uint8Array( + helper.buffer, + helper.byteOffset, + helper.byteLength + ); + return toreturn; +} + +// The number in the comments, help you identify the chunk, as it is the length first two bytes +// this is helpful, when debugging buffer passing +const KNOWN_BYTES_LONG = [ + createBytesChunk(60000), // 96, 234 + createBytesChunk(12), // 0, 12 + createBytesChunk(50000), // 195, 80 + createBytesChunk(1600), // 6, 64 + createBytesChunk(20000), // 78, 32 + createBytesChunk(30000), // 117, 48 +]; + +// end demo data + +function uint8concat(arrays) { + const length = arrays.reduce((acc, curr) => acc + curr.length, 0); + const result = new Uint8Array(length); + let pos = 0; + let array = 0; + while (pos < length) { + const curArr = arrays[array]; + const curLen = curArr.byteLength; + const dest = new Uint8Array(result.buffer, result.byteOffset + pos, curLen); + dest.set(curArr); + array++; + pos += curArr.byteLength; + } +} + +const serverEndpoint = await listen(async (serverSession) => { + await serverSession.opened; + const transformStream = new TransformStream(); + const sendStream = await serverSession.createUnidirectionalStream({ body: transformStream.readable }); + strictEqual(sendStream.direction, 'uni'); + const serverWritable = transformStream.writable; + const writer = serverWritable.getWriter(); + for (const chunk of KNOWN_BYTES_LONG) { + await writer.ready; + await writer.write(chunk); + } + await writer.ready; + await writer.close(); + serverSession.close(); +}, { keys, certs }); + +// The server must have an address to connect to after listen resolves. +ok(serverEndpoint.address !== undefined); + +const clientSession = await connect(serverEndpoint.address); + +clientSession.onstream = mustCall(async (stream) => { + strictEqual(stream.direction, 'uni', 'Expects an unidirectional stream'); + const reader = stream.readable.getReader(); + const readChunks = []; + while (true) { + const { done, value } = await reader.read(); + if (value) { + ok(value instanceof Uint8Array, 'Expects value to be a Uint8Array'); + readChunks.push(value); + } + if (done) break; + } + // Now compare what we got + deepStrictEqual(uint8concat(KNOWN_BYTES_LONG), uint8concat(readChunks)); + clientFinished.resolve(); +}, 1); + +await clientFinished.promise; +clientSession.close(); From 33389e6966737b90951a7875fe0299193e3ac6ad Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sun, 12 Oct 2025 20:10:22 +0200 Subject: [PATCH 02/84] quic: Fix linting --- src/dataqueue/queue.cc | 17 +++++++---------- src/dataqueue/queue.h | 4 ++-- src/quic/streams.cc | 35 +++++++++++++++-------------------- src/quic/streams.h | 20 ++++++-------------- 4 files changed, 30 insertions(+), 46 deletions(-) diff --git a/src/dataqueue/queue.cc b/src/dataqueue/queue.cc index a41518fa54615c..3d98a6499fecf9 100644 --- a/src/dataqueue/queue.cc +++ b/src/dataqueue/queue.cc @@ -1069,8 +1069,7 @@ class FdEntry final : public EntryImpl { class FeederEntry final : public EntryImpl { public: - FeederEntry(DataQueueFeeder* feeder) : feeder_(feeder) { - } + FeederEntry(DataQueueFeeder* feeder) : feeder_(feeder) {} static std::unique_ptr Create(DataQueueFeeder* feeder) { return std::make_unique(feeder); @@ -1081,9 +1080,9 @@ class FeederEntry final : public EntryImpl { } std::unique_ptr slice( - uint64_t start, std::optional end = std::nullopt) override { - // we are not idempotent - return std::unique_ptr(nullptr); + uint64_t start, std::optional end = std::nullopt) override { + // we are not idempotent + return std::unique_ptr(nullptr); } std::optional size() const override { @@ -1104,11 +1103,10 @@ class FeederEntry final : public EntryImpl { public: static std::shared_ptr Create(FeederEntry* entry) { return std::make_shared(entry); - }; - - explicit ReaderImpl(FeederEntry* entry) : entry_(entry) { } + explicit ReaderImpl(FeederEntry* entry) : entry_(entry) {} + ~ReaderImpl() { entry_->feeder_->DrainAndClose(); } @@ -1123,7 +1121,7 @@ class FeederEntry final : public EntryImpl { return bob::STATUS_EOS; } entry_->feeder_->addPendingPull( - DataQueueFeeder::PendingPull(std::move(next))); + DataQueueFeeder::PendingPull(std::move(next))); entry_->feeder_->tryWakePulls(); return bob::STATUS_WAIT; } @@ -1139,7 +1137,6 @@ class FeederEntry final : public EntryImpl { // ============================================================================ - std::shared_ptr DataQueue::CreateIdempotent( std::vector> list) { // Any entry is invalid for an idempotent DataQueue if any of the entries diff --git a/src/dataqueue/queue.h b/src/dataqueue/queue.h index eb470f4c7bf10a..dc9871b712753f 100644 --- a/src/dataqueue/queue.h +++ b/src/dataqueue/queue.h @@ -16,8 +16,8 @@ #include namespace node { - using v8::Local; - using v8::Value; +using v8::Local; +using v8::Value; // Represents a sequenced collection of data sources that can be // consumed as a single logical stream of data. Sources can be diff --git a/src/quic/streams.cc b/src/quic/streams.cc index 9d65ff9a264666..1057ea719bf817 100644 --- a/src/quic/streams.cc +++ b/src/quic/streams.cc @@ -229,7 +229,6 @@ Maybe> Stream::GetDataQueueFromSource( return Just(dataQueue); } - THROW_ERR_INVALID_ARG_TYPE(env, "Invalid data source type"); return Nothing>(); } @@ -1337,8 +1336,7 @@ void Stream::Unschedule() { } // namespace quic -DataQueueFeeder::DataQueueFeeder(Environment* env, - Local object) +DataQueueFeeder::DataQueueFeeder(Environment* env, Local object) : AsyncWrap(env, object) { MakeWeak(); } @@ -1367,7 +1365,6 @@ void DataQueueFeeder::DrainAndClose() { done = true; } - JS_METHOD_IMPL(DataQueueFeeder::New) { DCHECK(args.IsConstructCall()); auto env = Environment::GetCurrent(args); @@ -1383,7 +1380,7 @@ JS_METHOD_IMPL(DataQueueFeeder::Ready) { return; } else { Local readFinish = - Promise::Resolver::New(env->context()).ToLocalChecked(); + Promise::Resolver::New(env->context()).ToLocalChecked(); feeder->readFinish_.Reset(env->isolate(), readFinish); args.GetReturnValue().Set(readFinish->GetPromise()); return; @@ -1397,7 +1394,7 @@ JS_METHOD_IMPL(DataQueueFeeder::Submit) { bool done = false; if (args[1]->IsBoolean() && args[1].As()->Value()) { - done = true; + done = true; } if (!args[0].IsEmpty()) { CHECK_GT(feeder->pendingPulls_.size(), 0); @@ -1408,8 +1405,8 @@ JS_METHOD_IMPL(DataQueueFeeder::Submit) { chunk = Uint8Array::New(buffer, 0, buffer->ByteLength()); } if (!chunk->IsTypedArray()) { - THROW_ERR_INVALID_ARG_TYPE(env, - "Invalid data must be Arraybuffer or TypedArray"); + THROW_ERR_INVALID_ARG_TYPE( + env, "Invalid data must be Arraybuffer or TypedArray"); } Local typedArray = chunk.As(); // now we create a copy @@ -1423,10 +1420,10 @@ JS_METHOD_IMPL(DataQueueFeeder::Submit) { JS_TRY_ALLOCATE_BACKING(env, backingUniq, nread); std::shared_ptr backing = std::move(backingUniq); - auto originalStore = typedArray->Buffer()->GetBackingStore(); - const void* originalData = static_cast(originalStore->Data()) - + typedArray->ByteOffset(); - memcpy(backing->Data(), originalData, nread); + auto originalStore = typedArray->Buffer()->GetBackingStore(); + const void* originalData = + static_cast(originalStore->Data()) + typedArray->ByteOffset(); + memcpy(backing->Data(), originalData, nread); auto& pending = feeder->pendingPulls_.front(); auto pop = OnScopeLeave([feeder] { feeder->pendingPulls_.pop_front(); }); DataQueue::Vec vec; @@ -1446,7 +1443,7 @@ JS_METHOD_IMPL(DataQueueFeeder::Submit) { return; } else { Local readFinish = - Promise::Resolver::New(env->context()).ToLocalChecked(); + Promise::Resolver::New(env->context()).ToLocalChecked(); feeder->readFinish_.Reset(env->isolate(), readFinish); args.GetReturnValue().Set(readFinish->GetPromise()); return; @@ -1467,15 +1464,14 @@ JS_CONSTRUCTOR_IMPL(DataQueueFeeder, dataqueuefeeder_constructor_template, { JS_NEW_CONSTRUCTOR(); JS_INHERIT(AsyncWrap); JS_CLASS(dataqueuefeeder); - SetProtoMethod(isolate, tmpl, "error", Error); + SetProtoMethod(isolate, tmpl, "error", Error); SetProtoMethod(isolate, tmpl, "submit", Submit); - SetProtoMethod(isolate, tmpl, "ready", Ready); + SetProtoMethod(isolate, tmpl, "ready", Ready); }) -void DataQueueFeeder::InitPerIsolate( - IsolateData* data, - Local target) { +void DataQueueFeeder::InitPerIsolate(IsolateData* data, + Local target) { // TODO(@jasnell): Implement the per-isolate state } @@ -1487,8 +1483,7 @@ void DataQueueFeeder::InitPerContext(Realm* realm, Local target) { } void DataQueueFeeder::RegisterExternalReferences( - ExternalReferenceRegistry* registry -) { + ExternalReferenceRegistry* registry) { registry->Register(New); registry->Register(Submit); registry->Register(Error); diff --git a/src/quic/streams.h b/src/quic/streams.h index a222e8585ae6b0..038415686a6fbd 100644 --- a/src/quic/streams.h +++ b/src/quic/streams.h @@ -26,7 +26,6 @@ class Stream; using Ngtcp2Source = bob::SourceImpl; - // When a request to open a stream is made before a Session is able to actually // open a stream (either because the handshake is not yet sufficiently complete // or concurrency limits are temporarily reached) then the request to open the @@ -393,36 +392,32 @@ class Stream final : public AsyncWrap, void Unschedule(); }; } // namespace quic -class DataQueueFeeder final: public AsyncWrap { +class DataQueueFeeder final : public AsyncWrap { public: -using Next = bob::Next; + using Next = bob::Next; - DataQueueFeeder(Environment* env, - v8::Local object); + DataQueueFeeder(Environment* env, v8::Local object); JS_CONSTRUCTOR(DataQueueFeeder); JS_BINDING_INIT_BOILERPLATE(); static BaseObjectPtr Create(); - void setDataQueue(std::shared_ptr queue) { - dataQueue_ = queue; - } + void setDataQueue(std::shared_ptr queue) { dataQueue_ = queue; } void tryWakePulls(); void DrainAndClose(); struct PendingPull { Next next; - explicit PendingPull(Next next) - : next(std::move(next)) {} + explicit PendingPull(Next next) : next(std::move(next)) {} }; void addPendingPull(PendingPull toAdd) { pendingPulls_.emplace_back(std::move(toAdd)); } - bool Done() {return done;}; + bool Done() { return done; } SET_NO_MEMORY_INFO() SET_MEMORY_INFO_NAME(DataQueueFeeder) @@ -433,13 +428,10 @@ using Next = bob::Next; JS_METHOD(Error); JS_METHOD(Ready); - private: std::shared_ptr dataQueue_; Global readFinish_; - - std::deque pendingPulls_; bool done = false; }; From b8ee87ad4f7e2d3405da5354f1255f26d4f6dac4 Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sun, 12 Oct 2025 20:15:37 +0200 Subject: [PATCH 03/84] quic: Fix linting2 --- src/dataqueue/queue.cc | 6 ++---- src/quic/streams.cc | 5 ++--- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/dataqueue/queue.cc b/src/dataqueue/queue.cc index 3d98a6499fecf9..8c99977f942d74 100644 --- a/src/dataqueue/queue.cc +++ b/src/dataqueue/queue.cc @@ -1080,7 +1080,7 @@ class FeederEntry final : public EntryImpl { } std::unique_ptr slice( - uint64_t start, std::optional end = std::nullopt) override { + uint64_t start, std::optional end = std::nullopt) override { // we are not idempotent return std::unique_ptr(nullptr); } @@ -1107,9 +1107,7 @@ class FeederEntry final : public EntryImpl { explicit ReaderImpl(FeederEntry* entry) : entry_(entry) {} - ~ReaderImpl() { - entry_->feeder_->DrainAndClose(); - } + ~ReaderImpl() { entry_->feeder_->DrainAndClose(); } int Pull(Next next, int options, diff --git a/src/quic/streams.cc b/src/quic/streams.cc index 1057ea719bf817..ba72e23271eb05 100644 --- a/src/quic/streams.cc +++ b/src/quic/streams.cc @@ -1406,7 +1406,7 @@ JS_METHOD_IMPL(DataQueueFeeder::Submit) { } if (!chunk->IsTypedArray()) { THROW_ERR_INVALID_ARG_TYPE( - env, "Invalid data must be Arraybuffer or TypedArray"); + env, "Invalid data must be Arraybuffer or TypedArray"); } Local typedArray = chunk.As(); // now we create a copy @@ -1469,7 +1469,6 @@ JS_CONSTRUCTOR_IMPL(DataQueueFeeder, dataqueuefeeder_constructor_template, { SetProtoMethod(isolate, tmpl, "ready", Ready); }) - void DataQueueFeeder::InitPerIsolate(IsolateData* data, Local target) { // TODO(@jasnell): Implement the per-isolate state @@ -1483,7 +1482,7 @@ void DataQueueFeeder::InitPerContext(Realm* realm, Local target) { } void DataQueueFeeder::RegisterExternalReferences( - ExternalReferenceRegistry* registry) { + ExternalReferenceRegistry* registry) { registry->Register(New); registry->Register(Submit); registry->Register(Error); From bf5e7352761af8e3fd8ff3423dd07865b0e04048 Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sat, 18 Oct 2025 19:58:13 +0200 Subject: [PATCH 04/84] Test: catch close error --- .../test-quic-server-to-client-unidirectional.mjs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/test/parallel/test-quic-server-to-client-unidirectional.mjs b/test/parallel/test-quic-server-to-client-unidirectional.mjs index 933b8860ab8a22..eb7e39d40ef3fb 100644 --- a/test/parallel/test-quic-server-to-client-unidirectional.mjs +++ b/test/parallel/test-quic-server-to-client-unidirectional.mjs @@ -71,6 +71,9 @@ const serverEndpoint = await listen(async (serverSession) => { await serverSession.opened; const transformStream = new TransformStream(); const sendStream = await serverSession.createUnidirectionalStream({ body: transformStream.readable }); + sendStream.closed.catch(() => { + // ignore + }); strictEqual(sendStream.direction, 'uni'); const serverWritable = transformStream.writable; const writer = serverWritable.getWriter(); @@ -80,6 +83,9 @@ const serverEndpoint = await listen(async (serverSession) => { } await writer.ready; await writer.close(); + serverSession.closed.catch((err) => { + // ignore the error + }); serverSession.close(); }, { keys, certs }); @@ -100,10 +106,16 @@ clientSession.onstream = mustCall(async (stream) => { } if (done) break; } + stream.closed.catch(() => { + // ignore + }); // Now compare what we got deepStrictEqual(uint8concat(KNOWN_BYTES_LONG), uint8concat(readChunks)); clientFinished.resolve(); }, 1); await clientFinished.promise; +clientSession.closed.catch((err) => { + // ignore the error +}); clientSession.close(); From 016af681b240636427e76e57c060d520532b126d Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sat, 18 Oct 2025 20:06:05 +0200 Subject: [PATCH 05/84] quic: FIx several minor errors --- src/quic/session.cc | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/quic/session.cc b/src/quic/session.cc index 5abdbda2be76fa..282dc9f454fba1 100644 --- a/src/quic/session.cc +++ b/src/quic/session.cc @@ -562,9 +562,9 @@ struct Session::Impl final : public MemoryRetainer { // will remove themselves from the Session as soon as they are closed. // Note: we create a copy because the streams will remove themselves // while they are cleaning up which will invalidate the iterator. - StreamsMap streams = streams_; + StreamsMap streams = streams_; // needs to be a ref for (auto& stream : streams) stream.second->Destroy(last_error_); - DCHECK(streams.empty()); + DCHECK(streams_.empty()); // do not check our local copy // Clear the pending streams. while (!pending_bidi_stream_queue_.IsEmpty()) { @@ -1399,8 +1399,10 @@ void Session::Close(CloseMethod method) { Debug(this, "Closing with error: %s", impl_->last_error_); } - STAT_RECORD_TIMESTAMP(Stats, closing_at); - impl_->state_->closing = 1; + // This is done already in the implmentation + // STAT_RECORD_TIMESTAMP(Stats, closing_at); + // The next line would be prevent, that close of the implementation is executed! + // impl_->state_->closing = 1; // With both the DEFAULT and SILENT options, we will proceed to closing // the session immediately. All open streams will be immediately destroyed @@ -1451,7 +1453,8 @@ void Session::FinishClose() { // FinishClose() should be called only after, and as a result of, Close() // being called first. DCHECK(!is_destroyed()); - DCHECK(impl_->state_->closing); + // The next line does not make sense, as in the implementation is also checking if closing is not in progress + // DCHECK(impl_->state_->closing); // If impl_->Close() returns true, then the session can be destroyed // immediately without round-tripping through JavaScript. @@ -1469,8 +1472,7 @@ void Session::Destroy() { // being called first. DCHECK(impl_); DCHECK(impl_->state_->closing); - Debug(this, "Session destroyed"); - impl_.reset(); + Debug(this, "Session destroyed"); if (qlog_stream_ || keylog_stream_) { env()->SetImmediate( [qlog = qlog_stream_, keylog = keylog_stream_](Environment*) { @@ -1480,6 +1482,9 @@ void Session::Destroy() { } qlog_stream_.reset(); keylog_stream_.reset(); + impl_.reset(); // This can cause the session (so us) object to be garbage + // collected, so the session object may not be valid after + // this call. } PendingStream::PendingStreamQueue& Session::pending_bidi_stream_queue() const { @@ -2093,7 +2098,7 @@ void Session::RemoveStream(stream_id id) { // returns. if (impl_->state_->closing && impl_->state_->graceful_close) { FinishClose(); - CHECK(is_destroyed()); + // CHECK(is_destroyed()); // this will not work, our this pointer may not be valid anymore! } } From 360aba8c9d222b02b9fe2f8493e1f29d74fb01fa Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sat, 18 Oct 2025 20:09:05 +0200 Subject: [PATCH 06/84] Fix DataQueueFeeder --- src/quic/streams.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/quic/streams.cc b/src/quic/streams.cc index ba72e23271eb05..b80d67d38b7524 100644 --- a/src/quic/streams.cc +++ b/src/quic/streams.cc @@ -225,7 +225,7 @@ Maybe> Stream::GetDataQueueFromSource( ASSIGN_OR_RETURN_UNWRAP( &dataQueueFeeder, value, Nothing>()); std::shared_ptr dataQueue = DataQueue::Create(); - dataQueue->append(DataQueue::CreateFeederEntry(dataQueueFeeder)); + dataQueue->append(std::move(DataQueue::CreateFeederEntry(dataQueueFeeder))); return Just(dataQueue); } @@ -1396,7 +1396,8 @@ JS_METHOD_IMPL(DataQueueFeeder::Submit) { if (args[1]->IsBoolean() && args[1].As()->Value()) { done = true; } - if (!args[0].IsEmpty()) { + if (!args[0].IsEmpty() && + !args[0]->IsUndefined() && !args[0]->IsNull()) { CHECK_GT(feeder->pendingPulls_.size(), 0); auto chunk = args[0]; @@ -1407,6 +1408,7 @@ JS_METHOD_IMPL(DataQueueFeeder::Submit) { if (!chunk->IsTypedArray()) { THROW_ERR_INVALID_ARG_TYPE( env, "Invalid data must be Arraybuffer or TypedArray"); + return; } Local typedArray = chunk.As(); // now we create a copy From a178c969a1519265323bb80a1b31eb2c879cf417 Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sat, 18 Oct 2025 20:12:00 +0200 Subject: [PATCH 07/84] Fix Acknowledgement --- src/quic/streams.cc | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/quic/streams.cc b/src/quic/streams.cc index b80d67d38b7524..e7b91f38589c1c 100644 --- a/src/quic/streams.cc +++ b/src/quic/streams.cc @@ -1128,10 +1128,7 @@ void Stream::Acknowledge(size_t datalen) { Debug(this, "Acknowledging %zu bytes", datalen); - // ngtcp2 guarantees that offset must always be greater than the previously - // received offset. - DCHECK_GE(datalen, STAT_GET(Stats, max_offset_ack)); - STAT_SET(Stats, max_offset_ack, datalen); + STAT_SET(Stats, max_offset_ack, STAT_GET(Stats, max_offset_ack) + datalen); // Consumes the given number of bytes in the buffer. outbound_->Acknowledge(datalen); From f80300b7f5358728aba8846f226fb4dfe7e16a21 Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sun, 19 Oct 2025 16:01:06 +0200 Subject: [PATCH 08/84] Quic: fix Impl lifetime error --- src/quic/session.cc | 16 +++++++++++++--- src/quic/session.h | 4 +++- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/src/quic/session.cc b/src/quic/session.cc index 282dc9f454fba1..8e320e28d27095 100644 --- a/src/quic/session.cc +++ b/src/quic/session.cc @@ -542,7 +542,12 @@ struct Session::Impl final : public MemoryRetainer { local_address_(config.local_address), remote_address_(config.remote_address), application_(SelectApplication(session, config_)), - timer_(session_->env(), [this] { session_->OnTimeout(); }) { + timer_(session_->env(), [this] { + auto impl = session_->impl_; // we hold a reference to ourself, + // as the reference from session to us may go away + // while we call OnTimeout + session_->OnTimeout(); + }) { timer_.Unref(); } DISALLOW_COPY_AND_MOVE(Impl) @@ -558,11 +563,16 @@ struct Session::Impl final : public MemoryRetainer { state_->closing = 1; STAT_RECORD_TIMESTAMP(Stats, closing_at); + // we hold a reference to ourself, + // as the reference from session to us may go away + // while we destroy streams + auto impl = session_->impl_; + // Iterate through all of the known streams and close them. The streams // will remove themselves from the Session as soon as they are closed. // Note: we create a copy because the streams will remove themselves // while they are cleaning up which will invalidate the iterator. - StreamsMap streams = streams_; // needs to be a ref + StreamsMap streams = streams_; for (auto& stream : streams) stream.second->Destroy(last_error_); DCHECK(streams_.empty()); // do not check our local copy @@ -1297,7 +1307,7 @@ Session::Session(Endpoint* endpoint, : AsyncWrap(endpoint->env(), object, PROVIDER_QUIC_SESSION), side_(config.side), allocator_(BindingData::Get(env())), - impl_(std::make_unique(this, endpoint, config)), + impl_(std::make_shared(this, endpoint, config)), connection_(InitConnection()), tls_session_(tls_context->NewSession(this, session_ticket)) { DCHECK(impl_); diff --git a/src/quic/session.h b/src/quic/session.h index ddaddb8d18a7a7..8f2da867454057 100644 --- a/src/quic/session.h +++ b/src/quic/session.h @@ -497,7 +497,9 @@ class Session final : public AsyncWrap, private SessionTicket::AppData::Source { Side side_; ngtcp2_mem allocator_; - std::unique_ptr impl_; + std::shared_ptr impl_; // we need to have a shared ptr, there are situations + // where Impl calls the session and the session resets this pointer to Impl, + // in this case we need to hold a local shared ptr to prevent use after free QuicConnectionPointer connection_; std::unique_ptr tls_session_; BaseObjectPtr qlog_stream_; From 526d8d54333f4af2d364ba58c3d4f4852ca647b2 Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sun, 19 Oct 2025 17:28:58 +0200 Subject: [PATCH 09/84] Fix calling FinishClose multiple interleaving times --- src/quic/session.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/quic/session.cc b/src/quic/session.cc index 8e320e28d27095..616ea540e9e716 100644 --- a/src/quic/session.cc +++ b/src/quic/session.cc @@ -63,6 +63,7 @@ namespace quic { V(DATAGRAM, datagram, uint8_t) \ V(SESSION_TICKET, session_ticket, uint8_t) \ V(CLOSING, closing, uint8_t) \ + V(FINISH_CLOSING, finish_closing, uint8_t) \ V(GRACEFUL_CLOSE, graceful_close, uint8_t) \ V(SILENT_CLOSE, silent_close, uint8_t) \ V(STATELESS_RESET, stateless_reset, uint8_t) \ @@ -1403,7 +1404,7 @@ bool Session::is_destroyed_or_closing() const { void Session::Close(CloseMethod method) { if (is_destroyed()) return; - auto& stats_ = impl_->stats_; + // auto& stats_ = impl_->stats_; if (impl_->last_error_) { Debug(this, "Closing with error: %s", impl_->last_error_); @@ -1460,6 +1461,8 @@ void Session::Close(CloseMethod method) { } void Session::FinishClose() { + if (impl_->state_->finish_closing) return; // we were already called, avoids calling it twice + impl_->state_->finish_closing = 1; // FinishClose() should be called only after, and as a result of, Close() // being called first. DCHECK(!is_destroyed()); From 82b8bfd041cbe46acefce678c35b6864a64d5264 Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sun, 19 Oct 2025 19:43:17 +0200 Subject: [PATCH 10/84] quic: fix crash in DataFeeder impl --- src/dataqueue/queue.cc | 4 +++- src/quic/streams.cc | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/dataqueue/queue.cc b/src/dataqueue/queue.cc index 8c99977f942d74..dc9488a044f603 100644 --- a/src/dataqueue/queue.cc +++ b/src/dataqueue/queue.cc @@ -450,9 +450,11 @@ class NonIdempotentDataQueueReader final status == bob::Status::STATUS_EOS, vecs == nullptr && count == 0); if (status == bob::Status::STATUS_EOS) { + current_reader_ = nullptr; // must be done before erasing the entries + // as FdEntry's and FeederEntry's reader hold a pointer to the entry! + // and FeederEntry invoke it in destructor data_queue_->entries_.erase(data_queue_->entries_.begin()); ended_ = data_queue_->entries_.empty(); - current_reader_ = nullptr; if (!ended_) status = bob::Status::STATUS_CONTINUE; std::move(next)(status, nullptr, 0, [](uint64_t) {}); return; diff --git a/src/quic/streams.cc b/src/quic/streams.cc index e7b91f38589c1c..c2ba236802189a 100644 --- a/src/quic/streams.cc +++ b/src/quic/streams.cc @@ -225,7 +225,7 @@ Maybe> Stream::GetDataQueueFromSource( ASSIGN_OR_RETURN_UNWRAP( &dataQueueFeeder, value, Nothing>()); std::shared_ptr dataQueue = DataQueue::Create(); - dataQueue->append(std::move(DataQueue::CreateFeederEntry(dataQueueFeeder))); + dataQueue->append(DataQueue::CreateFeederEntry(dataQueueFeeder)); return Just(dataQueue); } @@ -1349,6 +1349,7 @@ void DataQueueFeeder::tryWakePulls() { void DataQueueFeeder::DrainAndClose() { if (done) return; + done = true; // do not do this several time, and note, it may be called several times. while (!pendingPulls_.empty()) { auto& pending = pendingPulls_.front(); auto pop = OnScopeLeave([this] { pendingPulls_.pop_front(); }); @@ -1359,7 +1360,6 @@ void DataQueueFeeder::DrainAndClose() { (void)resolver->Resolve(env()->context(), v8::False(env()->isolate())); readFinish_.Reset(); } - done = true; } JS_METHOD_IMPL(DataQueueFeeder::New) { From 33278c6a9814d4283a4671c9d35c727d143e66d6 Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sun, 19 Oct 2025 19:54:14 +0200 Subject: [PATCH 11/84] quic: Fix linting 3 --- src/dataqueue/queue.cc | 8 +++++--- src/quic/session.cc | 26 ++++++++++++++++---------- src/quic/session.h | 8 +++++--- src/quic/streams.cc | 6 ++++-- 4 files changed, 30 insertions(+), 18 deletions(-) diff --git a/src/dataqueue/queue.cc b/src/dataqueue/queue.cc index dc9488a044f603..ce982652ce43ae 100644 --- a/src/dataqueue/queue.cc +++ b/src/dataqueue/queue.cc @@ -450,9 +450,11 @@ class NonIdempotentDataQueueReader final status == bob::Status::STATUS_EOS, vecs == nullptr && count == 0); if (status == bob::Status::STATUS_EOS) { - current_reader_ = nullptr; // must be done before erasing the entries - // as FdEntry's and FeederEntry's reader hold a pointer to the entry! - // and FeederEntry invoke it in destructor + current_reader_ = nullptr; + // must be done before erasing the entries + // as FdEntry's and FeederEntry's reader hold + // a pointer to the entry! and FeederEntry + // invoke it in destructor data_queue_->entries_.erase(data_queue_->entries_.begin()); ended_ = data_queue_->entries_.empty(); if (!ended_) status = bob::Status::STATUS_CONTINUE; diff --git a/src/quic/session.cc b/src/quic/session.cc index 616ea540e9e716..78ba400353bbfc 100644 --- a/src/quic/session.cc +++ b/src/quic/session.cc @@ -544,7 +544,7 @@ struct Session::Impl final : public MemoryRetainer { remote_address_(config.remote_address), application_(SelectApplication(session, config_)), timer_(session_->env(), [this] { - auto impl = session_->impl_; // we hold a reference to ourself, + auto impl = session_->impl_; // we hold a reference to ourself, // as the reference from session to us may go away // while we call OnTimeout session_->OnTimeout(); @@ -567,7 +567,7 @@ struct Session::Impl final : public MemoryRetainer { // we hold a reference to ourself, // as the reference from session to us may go away // while we destroy streams - auto impl = session_->impl_; + auto impl = session_->impl_; // Iterate through all of the known streams and close them. The streams // will remove themselves from the Session as soon as they are closed. @@ -575,7 +575,7 @@ struct Session::Impl final : public MemoryRetainer { // while they are cleaning up which will invalidate the iterator. StreamsMap streams = streams_; for (auto& stream : streams) stream.second->Destroy(last_error_); - DCHECK(streams_.empty()); // do not check our local copy + DCHECK(streams_.empty()); // do not check our local copy // Clear the pending streams. while (!pending_bidi_stream_queue_.IsEmpty()) { @@ -1412,7 +1412,8 @@ void Session::Close(CloseMethod method) { // This is done already in the implmentation // STAT_RECORD_TIMESTAMP(Stats, closing_at); - // The next line would be prevent, that close of the implementation is executed! + // The next line would prevent, + // that close of the implementation is executed! // impl_->state_->closing = 1; // With both the DEFAULT and SILENT options, we will proceed to closing @@ -1461,12 +1462,15 @@ void Session::Close(CloseMethod method) { } void Session::FinishClose() { - if (impl_->state_->finish_closing) return; // we were already called, avoids calling it twice + if (impl_->state_->finish_closing) return; + // we were already called, avoids calling it twice impl_->state_->finish_closing = 1; - // FinishClose() should be called only after, and as a result of, Close() + // FinishClose() should be called only after, + // and as a result of, Close() // being called first. DCHECK(!is_destroyed()); - // The next line does not make sense, as in the implementation is also checking if closing is not in progress + // The next line does not make sense, as in the implementation + // is also checking if closing is not in progress // DCHECK(impl_->state_->closing); // If impl_->Close() returns true, then the session can be destroyed @@ -1481,11 +1485,12 @@ void Session::FinishClose() { } void Session::Destroy() { - // Destroy() should be called only after, and as a result of, Close() + // Destroy() should be called only after, + // and as a result of, Close() // being called first. DCHECK(impl_); DCHECK(impl_->state_->closing); - Debug(this, "Session destroyed"); + Debug(this, "Session destroyed"); if (qlog_stream_ || keylog_stream_) { env()->SetImmediate( [qlog = qlog_stream_, keylog = keylog_stream_](Environment*) { @@ -2111,7 +2116,8 @@ void Session::RemoveStream(stream_id id) { // returns. if (impl_->state_->closing && impl_->state_->graceful_close) { FinishClose(); - // CHECK(is_destroyed()); // this will not work, our this pointer may not be valid anymore! + // CHECK(is_destroyed()); + // this will not work, our this pointer may not be valid anymore! } } diff --git a/src/quic/session.h b/src/quic/session.h index 8f2da867454057..b7faef25cf5052 100644 --- a/src/quic/session.h +++ b/src/quic/session.h @@ -497,9 +497,11 @@ class Session final : public AsyncWrap, private SessionTicket::AppData::Source { Side side_; ngtcp2_mem allocator_; - std::shared_ptr impl_; // we need to have a shared ptr, there are situations - // where Impl calls the session and the session resets this pointer to Impl, - // in this case we need to hold a local shared ptr to prevent use after free + std::shared_ptr impl_; // we need to have a shared ptr, + // there are situations, where Impl calls the session and + // the session resets this pointer to Impl, + // in this case we need to hold a local shared ptr + // to prevent use after free QuicConnectionPointer connection_; std::unique_ptr tls_session_; BaseObjectPtr qlog_stream_; diff --git a/src/quic/streams.cc b/src/quic/streams.cc index c2ba236802189a..51f23381ef9473 100644 --- a/src/quic/streams.cc +++ b/src/quic/streams.cc @@ -1349,7 +1349,9 @@ void DataQueueFeeder::tryWakePulls() { void DataQueueFeeder::DrainAndClose() { if (done) return; - done = true; // do not do this several time, and note, it may be called several times. + done = true; + // do not do this several time, and note, + // it may be called several times. while (!pendingPulls_.empty()) { auto& pending = pendingPulls_.front(); auto pop = OnScopeLeave([this] { pendingPulls_.pop_front(); }); @@ -1393,7 +1395,7 @@ JS_METHOD_IMPL(DataQueueFeeder::Submit) { if (args[1]->IsBoolean() && args[1].As()->Value()) { done = true; } - if (!args[0].IsEmpty() && + if (!args[0].IsEmpty() && !args[0]->IsUndefined() && !args[0]->IsNull()) { CHECK_GT(feeder->pendingPulls_.size(), 0); auto chunk = args[0]; From f31ed613072cb233e9375365730290c1947b1a94 Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sun, 19 Oct 2025 20:00:56 +0200 Subject: [PATCH 12/84] quic: Fix linting 4 --- src/quic/session.cc | 2 +- src/quic/streams.cc | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/quic/session.cc b/src/quic/session.cc index 78ba400353bbfc..228f322ea688c4 100644 --- a/src/quic/session.cc +++ b/src/quic/session.cc @@ -548,7 +548,7 @@ struct Session::Impl final : public MemoryRetainer { // as the reference from session to us may go away // while we call OnTimeout session_->OnTimeout(); - }) { + }) { timer_.Unref(); } DISALLOW_COPY_AND_MOVE(Impl) diff --git a/src/quic/streams.cc b/src/quic/streams.cc index 51f23381ef9473..23bd79b0a3ea50 100644 --- a/src/quic/streams.cc +++ b/src/quic/streams.cc @@ -1395,8 +1395,7 @@ JS_METHOD_IMPL(DataQueueFeeder::Submit) { if (args[1]->IsBoolean() && args[1].As()->Value()) { done = true; } - if (!args[0].IsEmpty() && - !args[0]->IsUndefined() && !args[0]->IsNull()) { + if (!args[0].IsEmpty() && !args[0]->IsUndefined() && !args[0]->IsNull()) { CHECK_GT(feeder->pendingPulls_.size(), 0); auto chunk = args[0]; From fea4894a3ea73688730d63d1d0599347da25892f Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sun, 19 Oct 2025 22:20:49 +0200 Subject: [PATCH 13/84] quic: Move DataQueueFeeder parts to header for non-windows build --- src/quic/streams.cc | 26 -------------------------- src/quic/streams.h | 26 ++++++++++++++++++++++++++ 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/src/quic/streams.cc b/src/quic/streams.cc index 23bd79b0a3ea50..592aae3659701c 100644 --- a/src/quic/streams.cc +++ b/src/quic/streams.cc @@ -1338,32 +1338,6 @@ DataQueueFeeder::DataQueueFeeder(Environment* env, Local object) MakeWeak(); } -void DataQueueFeeder::tryWakePulls() { - if (!readFinish_.IsEmpty()) { - Local resolver = readFinish_.Get(env()->isolate()); - // I do not think, that this can error... - (void)resolver->Resolve(env()->context(), v8::True(env()->isolate())); - readFinish_.Reset(); - } -} - -void DataQueueFeeder::DrainAndClose() { - if (done) return; - done = true; - // do not do this several time, and note, - // it may be called several times. - while (!pendingPulls_.empty()) { - auto& pending = pendingPulls_.front(); - auto pop = OnScopeLeave([this] { pendingPulls_.pop_front(); }); - pending.next(bob::STATUS_EOS, nullptr, 0, [](uint64_t) {}); - } - if (!readFinish_.IsEmpty()) { - Local resolver = readFinish_.Get(env()->isolate()); - (void)resolver->Resolve(env()->context(), v8::False(env()->isolate())); - readFinish_.Reset(); - } -} - JS_METHOD_IMPL(DataQueueFeeder::New) { DCHECK(args.IsConstructCall()); auto env = Environment::GetCurrent(args); diff --git a/src/quic/streams.h b/src/quic/streams.h index 038415686a6fbd..67f52e21e1ea2a 100644 --- a/src/quic/streams.h +++ b/src/quic/streams.h @@ -436,6 +436,32 @@ class DataQueueFeeder final : public AsyncWrap { bool done = false; }; +void DataQueueFeeder::tryWakePulls() { + if (!readFinish_.IsEmpty()) { + Local resolver = readFinish_.Get(env()->isolate()); + // I do not think, that this can error... + (void)resolver->Resolve(env()->context(), v8::True(env()->isolate())); + readFinish_.Reset(); + } +} + +void DataQueueFeeder::DrainAndClose() { + if (done) return; + done = true; + // do not do this several time, and note, + // it may be called several times. + while (!pendingPulls_.empty()) { + auto& pending = pendingPulls_.front(); + auto pop = OnScopeLeave([this] { pendingPulls_.pop_front(); }); + pending.next(bob::STATUS_EOS, nullptr, 0, [](uint64_t) {}); + } + if (!readFinish_.IsEmpty()) { + Local resolver = readFinish_.Get(env()->isolate()); + (void)resolver->Resolve(env()->context(), v8::False(env()->isolate())); + readFinish_.Reset(); + } +} + } // namespace node #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS From cb37974f0f305de9edeba9aac7ba3aea580dd93c Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Mon, 20 Oct 2025 07:39:32 +0200 Subject: [PATCH 14/84] quic: Move DataQueueFeeder to class body --- src/quic/streams.h | 53 ++++++++++++++++++++++------------------------ 1 file changed, 25 insertions(+), 28 deletions(-) diff --git a/src/quic/streams.h b/src/quic/streams.h index 67f52e21e1ea2a..23b320788bf7cd 100644 --- a/src/quic/streams.h +++ b/src/quic/streams.h @@ -405,8 +405,31 @@ class DataQueueFeeder final : public AsyncWrap { void setDataQueue(std::shared_ptr queue) { dataQueue_ = queue; } - void tryWakePulls(); - void DrainAndClose(); + void tryWakePulls() { + if (!readFinish_.IsEmpty()) { + Local resolver = readFinish_.Get(env()->isolate()); + // I do not think, that this can error... + (void)resolver->Resolve(env()->context(), v8::True(env()->isolate())); + readFinish_.Reset(); + } + } + + void DrainAndClose() { + if (done) return; + done = true; + // do not do this several time, and note, + // it may be called several times. + while (!pendingPulls_.empty()) { + auto& pending = pendingPulls_.front(); + auto pop = OnScopeLeave([this] { pendingPulls_.pop_front(); }); + pending.next(bob::STATUS_EOS, nullptr, 0, [](uint64_t) {}); + } + if (!readFinish_.IsEmpty()) { + Local resolver = readFinish_.Get(env()->isolate()); + (void)resolver->Resolve(env()->context(), v8::False(env()->isolate())); + readFinish_.Reset(); + } + } struct PendingPull { Next next; @@ -436,32 +459,6 @@ class DataQueueFeeder final : public AsyncWrap { bool done = false; }; -void DataQueueFeeder::tryWakePulls() { - if (!readFinish_.IsEmpty()) { - Local resolver = readFinish_.Get(env()->isolate()); - // I do not think, that this can error... - (void)resolver->Resolve(env()->context(), v8::True(env()->isolate())); - readFinish_.Reset(); - } -} - -void DataQueueFeeder::DrainAndClose() { - if (done) return; - done = true; - // do not do this several time, and note, - // it may be called several times. - while (!pendingPulls_.empty()) { - auto& pending = pendingPulls_.front(); - auto pop = OnScopeLeave([this] { pendingPulls_.pop_front(); }); - pending.next(bob::STATUS_EOS, nullptr, 0, [](uint64_t) {}); - } - if (!readFinish_.IsEmpty()) { - Local resolver = readFinish_.Get(env()->isolate()); - (void)resolver->Resolve(env()->context(), v8::False(env()->isolate())); - readFinish_.Reset(); - } -} - } // namespace node #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS From cc57a303f50c82214b1b61d720e7da4550b4219c Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Mon, 20 Oct 2025 08:07:03 +0200 Subject: [PATCH 15/84] quic: Fix compilation on non-Windows --- src/quic/streams.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/quic/streams.h b/src/quic/streams.h index 23b320788bf7cd..532a07b201b96d 100644 --- a/src/quic/streams.h +++ b/src/quic/streams.h @@ -409,7 +409,7 @@ class DataQueueFeeder final : public AsyncWrap { if (!readFinish_.IsEmpty()) { Local resolver = readFinish_.Get(env()->isolate()); // I do not think, that this can error... - (void)resolver->Resolve(env()->context(), v8::True(env()->isolate())); + v8::Maybe ignoredResult = resolver->Resolve(env()->context(), v8::True(env()->isolate())); readFinish_.Reset(); } } @@ -426,7 +426,7 @@ class DataQueueFeeder final : public AsyncWrap { } if (!readFinish_.IsEmpty()) { Local resolver = readFinish_.Get(env()->isolate()); - (void)resolver->Resolve(env()->context(), v8::False(env()->isolate())); + v8::Maybe ignoredResult = resolver->Resolve(env()->context(), v8::False(env()->isolate())); readFinish_.Reset(); } } From d1c47e291af102cbd2ee297dfa71403536b4725d Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Mon, 20 Oct 2025 08:20:05 +0200 Subject: [PATCH 16/84] quic: Fix lint and unused variable --- src/quic/streams.h | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/quic/streams.h b/src/quic/streams.h index 532a07b201b96d..70b3c7c5762083 100644 --- a/src/quic/streams.h +++ b/src/quic/streams.h @@ -409,7 +409,8 @@ class DataQueueFeeder final : public AsyncWrap { if (!readFinish_.IsEmpty()) { Local resolver = readFinish_.Get(env()->isolate()); // I do not think, that this can error... - v8::Maybe ignoredResult = resolver->Resolve(env()->context(), v8::True(env()->isolate())); + [[maybe_unused]] v8::Maybe ignoredResult = + resolver->Resolve(env()->context(), v8::True(env()->isolate())); readFinish_.Reset(); } } @@ -426,7 +427,8 @@ class DataQueueFeeder final : public AsyncWrap { } if (!readFinish_.IsEmpty()) { Local resolver = readFinish_.Get(env()->isolate()); - v8::Maybe ignoredResult = resolver->Resolve(env()->context(), v8::False(env()->isolate())); + [[maybe_unused]] v8::Maybe ignoredResult = + resolver->Resolve(env()->context(), v8::False(env()->isolate())); readFinish_.Reset(); } } From 6eabd649d0bbda85b99bc67d1a6ae3f8a4e4c5b0 Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Mon, 20 Oct 2025 08:23:07 +0200 Subject: [PATCH 17/84] quic: Fix format-cpp --- src/quic/streams.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/quic/streams.h b/src/quic/streams.h index 70b3c7c5762083..850edc533d6cd6 100644 --- a/src/quic/streams.h +++ b/src/quic/streams.h @@ -410,7 +410,7 @@ class DataQueueFeeder final : public AsyncWrap { Local resolver = readFinish_.Get(env()->isolate()); // I do not think, that this can error... [[maybe_unused]] v8::Maybe ignoredResult = - resolver->Resolve(env()->context(), v8::True(env()->isolate())); + resolver->Resolve(env()->context(), v8::True(env()->isolate())); readFinish_.Reset(); } } @@ -428,7 +428,7 @@ class DataQueueFeeder final : public AsyncWrap { if (!readFinish_.IsEmpty()) { Local resolver = readFinish_.Get(env()->isolate()); [[maybe_unused]] v8::Maybe ignoredResult = - resolver->Resolve(env()->context(), v8::False(env()->isolate())); + resolver->Resolve(env()->context(), v8::False(env()->isolate())); readFinish_.Reset(); } } From fde040499aa9974ddde292c355c4f7518b134a61 Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sat, 1 Nov 2025 12:20:27 +0000 Subject: [PATCH 18/84] quic: Improve test coverage --- src/quic/streams.cc | 13 ++++++ src/quic/streams.h | 1 + .../test-quic-internal-dataqueuefeeder.mjs | 40 +++++++++++++++++++ ...t-quic-server-to-client-unidirectional.mjs | 2 +- 4 files changed, 55 insertions(+), 1 deletion(-) create mode 100644 test/parallel/test-quic-internal-dataqueuefeeder.mjs diff --git a/src/quic/streams.cc b/src/quic/streams.cc index 592aae3659701c..15af7b22db5aee 100644 --- a/src/quic/streams.cc +++ b/src/quic/streams.cc @@ -1433,6 +1433,17 @@ JS_METHOD_IMPL(DataQueueFeeder::Error) { feeder->DrainAndClose(); } +JS_METHOD_IMPL(DataQueueFeeder::AddFakePull) { + DataQueueFeeder* feeder; + ASSIGN_OR_RETURN_UNWRAP(&feeder, args.This()); + // this adds a fake pull for testing code, not to be used anywhere else + Next dummyNext = [](int, const DataQueue::Vec*, size_t, bob::Done) { + // intentionally empty + }; + feeder->addPendingPull(PendingPull(std::move(dummyNext))); + feeder->tryWakePulls(); +} + JS_CONSTRUCTOR_IMPL(DataQueueFeeder, dataqueuefeeder_constructor_template, { auto isolate = env->isolate(); JS_NEW_CONSTRUCTOR(); @@ -1441,6 +1452,7 @@ JS_CONSTRUCTOR_IMPL(DataQueueFeeder, dataqueuefeeder_constructor_template, { SetProtoMethod(isolate, tmpl, "error", Error); SetProtoMethod(isolate, tmpl, "submit", Submit); SetProtoMethod(isolate, tmpl, "ready", Ready); + SetProtoMethod(isolate, tmpl, "addFakePull", AddFakePull); }) void DataQueueFeeder::InitPerIsolate(IsolateData* data, @@ -1461,6 +1473,7 @@ void DataQueueFeeder::RegisterExternalReferences( registry->Register(Submit); registry->Register(Error); registry->Register(Ready); + registry->Register(AddFakePull); } } // namespace node diff --git a/src/quic/streams.h b/src/quic/streams.h index 850edc533d6cd6..4c5ff34f0e5de3 100644 --- a/src/quic/streams.h +++ b/src/quic/streams.h @@ -452,6 +452,7 @@ class DataQueueFeeder final : public AsyncWrap { JS_METHOD(Submit); JS_METHOD(Error); JS_METHOD(Ready); + JS_METHOD(AddFakePull); private: std::shared_ptr dataQueue_; diff --git a/test/parallel/test-quic-internal-dataqueuefeeder.mjs b/test/parallel/test-quic-internal-dataqueuefeeder.mjs new file mode 100644 index 00000000000000..926f08f8519137 --- /dev/null +++ b/test/parallel/test-quic-internal-dataqueuefeeder.mjs @@ -0,0 +1,40 @@ +// Flags: --expose-internals --experimental-quic --no-warnings +import { hasQuic, skip } from '../common/index.mjs'; +import { + rejects, +} from 'node:assert'; + +if (!hasQuic) { + skip('QUIC is not enabled'); +} + +const { internalBinding } = (await import('internal/test/binding')).default; + +const { + DataQueueFeeder, +} = internalBinding('quic'); + + +const feeder = new DataQueueFeeder(); +feeder.addFakePull(); +await feeder.ready(); +let lastprom = feeder.submit(new Uint8Array([1, 2, 3]), false); +feeder.addFakePull(); +await lastprom; +lastprom = feeder.submit(new Uint16Array([1, 2, 3]), false); +feeder.addFakePull(); +await lastprom; +lastprom = feeder.submit(new Uint32Array([1, 2, 3]), false); +feeder.addFakePull(); +await lastprom; +lastprom = feeder.submit(new Uint8Array([1, 2, 3]).buffer, false); +feeder.addFakePull(); +await lastprom; +await rejects(async () => feeder.submit({}, false), { + code: 'ERR_INVALID_ARG_TYPE', +}); +await rejects(async () => feeder.submit([], false), { + code: 'ERR_INVALID_ARG_TYPE', +}); +await feeder.submit(undefined, true); +feeder.error(new Error('Can we send an error')); diff --git a/test/parallel/test-quic-server-to-client-unidirectional.mjs b/test/parallel/test-quic-server-to-client-unidirectional.mjs index eb7e39d40ef3fb..77e251990883a4 100644 --- a/test/parallel/test-quic-server-to-client-unidirectional.mjs +++ b/test/parallel/test-quic-server-to-client-unidirectional.mjs @@ -45,7 +45,7 @@ const KNOWN_BYTES_LONG = [ createBytesChunk(60000), // 96, 234 createBytesChunk(12), // 0, 12 createBytesChunk(50000), // 195, 80 - createBytesChunk(1600), // 6, 64 + createBytesChunk(1600).buffer, // 6, 64 we use buffer here to increae test coverage createBytesChunk(20000), // 78, 32 createBytesChunk(30000), // 117, 48 ]; From da0298f69450560eceb2bfdc6c90a935d12fbbe9 Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sun, 2 Nov 2025 08:37:51 +0100 Subject: [PATCH 19/84] quic: Move helper functions to separate files --- test/common/quic/test-helpers.mjs | 45 +++++++++++++++++ ...t-quic-server-to-client-unidirectional.mjs | 48 +------------------ 2 files changed, 46 insertions(+), 47 deletions(-) create mode 100644 test/common/quic/test-helpers.mjs diff --git a/test/common/quic/test-helpers.mjs b/test/common/quic/test-helpers.mjs new file mode 100644 index 00000000000000..0cf4ebbf591fc6 --- /dev/null +++ b/test/common/quic/test-helpers.mjs @@ -0,0 +1,45 @@ +// start demo data +// taken from @fails-components/webtransport tests +// by the original author +function createBytesChunk(length) { + const workArray = new Array(length / 2); + for (let i = 0; i < length / 4; i++) { + workArray[2 * i + 1] = length % 0xffff; + workArray[2 * i] = i; + } + const helper = new Uint16Array(workArray); + const toreturn = new Uint8Array( + helper.buffer, + helper.byteOffset, + helper.byteLength + ); + return toreturn; +} + +// The number in the comments, help you identify the chunk, as it is the length first two bytes +// this is helpful, when debugging buffer passing +export const KNOWN_BYTES_LONG = [ + createBytesChunk(60000), // 96, 234 + createBytesChunk(12), // 0, 12 + createBytesChunk(50000), // 195, 80 + createBytesChunk(1600).buffer, // 6, 64 we use buffer here to increae test coverage + createBytesChunk(20000), // 78, 32 + createBytesChunk(30000), // 117, 48 +]; + +// end demo data + +export function uint8concat(arrays) { + const length = arrays.reduce((acc, curr) => acc + curr.length, 0); + const result = new Uint8Array(length); + let pos = 0; + let array = 0; + while (pos < length) { + const curArr = arrays[array]; + const curLen = curArr.byteLength; + const dest = new Uint8Array(result.buffer, result.byteOffset + pos, curLen); + dest.set(curArr); + array++; + pos += curArr.byteLength; + } +} \ No newline at end of file diff --git a/test/parallel/test-quic-server-to-client-unidirectional.mjs b/test/parallel/test-quic-server-to-client-unidirectional.mjs index 77e251990883a4..16624c6f81d123 100644 --- a/test/parallel/test-quic-server-to-client-unidirectional.mjs +++ b/test/parallel/test-quic-server-to-client-unidirectional.mjs @@ -3,6 +3,7 @@ import { hasQuic, skip, mustCall } from '../common/index.mjs'; import { ok, strictEqual, deepStrictEqual } from 'node:assert'; import { readKey } from '../common/fixtures.mjs'; +import { KNOWN_BYTES_LONG, uint8concat } from '../common/quic/test-helpers.mjs'; import { TransformStream } from 'node:stream/web'; if (!hasQuic) { @@ -19,53 +20,6 @@ const certs = readKey('agent1-cert.pem'); // The opened promise should resolve when the client finished reading const clientFinished = Promise.withResolvers(); -// start demo data -// FIX ME: move the following to a central place -// if used in several tests -// taken from @fails-components/webtransport tests -// by the original author -function createBytesChunk(length) { - const workArray = new Array(length / 2); - for (let i = 0; i < length / 4; i++) { - workArray[2 * i + 1] = length % 0xffff; - workArray[2 * i] = i; - } - const helper = new Uint16Array(workArray); - const toreturn = new Uint8Array( - helper.buffer, - helper.byteOffset, - helper.byteLength - ); - return toreturn; -} - -// The number in the comments, help you identify the chunk, as it is the length first two bytes -// this is helpful, when debugging buffer passing -const KNOWN_BYTES_LONG = [ - createBytesChunk(60000), // 96, 234 - createBytesChunk(12), // 0, 12 - createBytesChunk(50000), // 195, 80 - createBytesChunk(1600).buffer, // 6, 64 we use buffer here to increae test coverage - createBytesChunk(20000), // 78, 32 - createBytesChunk(30000), // 117, 48 -]; - -// end demo data - -function uint8concat(arrays) { - const length = arrays.reduce((acc, curr) => acc + curr.length, 0); - const result = new Uint8Array(length); - let pos = 0; - let array = 0; - while (pos < length) { - const curArr = arrays[array]; - const curLen = curArr.byteLength; - const dest = new Uint8Array(result.buffer, result.byteOffset + pos, curLen); - dest.set(curArr); - array++; - pos += curArr.byteLength; - } -} const serverEndpoint = await listen(async (serverSession) => { await serverSession.opened; From af5d78683c51ba55a98e24c03b8aa0026556648f Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sun, 2 Nov 2025 15:25:19 +0100 Subject: [PATCH 20/84] quic: add client to server unidirectional test --- ...t-quic-client-to-server-unidirectional.mjs | 82 +++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 test/parallel/test-quic-client-to-server-unidirectional.mjs diff --git a/test/parallel/test-quic-client-to-server-unidirectional.mjs b/test/parallel/test-quic-client-to-server-unidirectional.mjs new file mode 100644 index 00000000000000..4eb188d953425b --- /dev/null +++ b/test/parallel/test-quic-client-to-server-unidirectional.mjs @@ -0,0 +1,82 @@ +// Flags: --experimental-quic --no-warnings + +import { hasQuic, skip, mustCall } from '../common/index.mjs'; +import { ok, strictEqual, deepStrictEqual } from 'node:assert'; +import { readKey } from '../common/fixtures.mjs'; +import { KNOWN_BYTES_LONG, uint8concat } from '../common/quic/test-helpers.mjs'; +import { TransformStream } from 'node:stream/web'; + +if (!hasQuic) { + skip('QUIC is not enabled'); +} + +// Import after the hasQuic check +const { listen, connect } = await import('node:quic'); +const { createPrivateKey } = await import('node:crypto'); + +const keys = createPrivateKey(readKey('agent1-key.pem')); +const certs = readKey('agent1-cert.pem'); + +// The opened promise should resolve when the client finished reading +const serverFinished = Promise.withResolvers(); + + +const serverEndpoint = await listen(async (serverSession) => { + serverSession.onstream = mustCall(async (stream) => { + strictEqual(stream.direction, 'uni', 'Expects an unidirectional stream'); + const reader = stream.readable.getReader(); + const readChunks = []; + let readc = 0; + while (true) { + const { done, value } = await reader.read(); + // if (readc > 20) throw new Error("after read " + readc); + if (value) { + ok(value instanceof Uint8Array, 'Expects value to be a Uint8Array'); + readChunks.push(value); + } + if (done) break; + readc++; + } + stream.closed.catch(() => { + // ignore + }); + // Now compare what we got + deepStrictEqual(uint8concat(KNOWN_BYTES_LONG), uint8concat(readChunks)); + serverFinished.resolve(); + }, 1); + + await serverFinished.promise; + serverSession.closed.catch((err) => { + // ignore the error + }); + serverSession.close(); +}, { keys, certs }); + +// The server must have an address to connect to after listen resolves. +ok(serverEndpoint.address !== undefined); + +const clientSession = await connect(serverEndpoint.address); +await clientSession.opened; + + +const transformStream = new TransformStream(); +const sendStream = await clientSession.createUnidirectionalStream({ body: transformStream.readable }); +sendStream.closed.catch(() => { + // ignore +}); +strictEqual(sendStream.direction, 'uni'); +const clientWritable = transformStream.writable; +const writer = clientWritable.getWriter(); +let run = 0 +for (const chunk of KNOWN_BYTES_LONG) { + await writer.ready; + await writer.write(chunk); + run++; +} +await writer.ready; +await writer.close(); +clientSession.closed.catch((err) => { + // ignore the error +}); +clientSession.close(); +await serverFinished.promise; \ No newline at end of file From 963d492fd3e3e0045161f1fc44acc2320bb872d7 Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sun, 2 Nov 2025 15:42:02 +0100 Subject: [PATCH 21/84] quic: Fix new test and lint --- test/common/quic/test-helpers.mjs | 90 +++++++++---------- ...t-quic-client-to-server-unidirectional.mjs | 4 +- 2 files changed, 46 insertions(+), 48 deletions(-) diff --git a/test/common/quic/test-helpers.mjs b/test/common/quic/test-helpers.mjs index 0cf4ebbf591fc6..00c56c41016eea 100644 --- a/test/common/quic/test-helpers.mjs +++ b/test/common/quic/test-helpers.mjs @@ -1,45 +1,45 @@ -// start demo data -// taken from @fails-components/webtransport tests -// by the original author -function createBytesChunk(length) { - const workArray = new Array(length / 2); - for (let i = 0; i < length / 4; i++) { - workArray[2 * i + 1] = length % 0xffff; - workArray[2 * i] = i; - } - const helper = new Uint16Array(workArray); - const toreturn = new Uint8Array( - helper.buffer, - helper.byteOffset, - helper.byteLength - ); - return toreturn; -} - -// The number in the comments, help you identify the chunk, as it is the length first two bytes -// this is helpful, when debugging buffer passing -export const KNOWN_BYTES_LONG = [ - createBytesChunk(60000), // 96, 234 - createBytesChunk(12), // 0, 12 - createBytesChunk(50000), // 195, 80 - createBytesChunk(1600).buffer, // 6, 64 we use buffer here to increae test coverage - createBytesChunk(20000), // 78, 32 - createBytesChunk(30000), // 117, 48 -]; - -// end demo data - -export function uint8concat(arrays) { - const length = arrays.reduce((acc, curr) => acc + curr.length, 0); - const result = new Uint8Array(length); - let pos = 0; - let array = 0; - while (pos < length) { - const curArr = arrays[array]; - const curLen = curArr.byteLength; - const dest = new Uint8Array(result.buffer, result.byteOffset + pos, curLen); - dest.set(curArr); - array++; - pos += curArr.byteLength; - } -} \ No newline at end of file +// start demo data +// taken from @fails-components/webtransport tests +// by the original author +function createBytesChunk(length) { + const workArray = new Array(length / 2); + for (let i = 0; i < length / 4; i++) { + workArray[2 * i + 1] = length % 0xffff; + workArray[2 * i] = i; + } + const helper = new Uint16Array(workArray); + const toreturn = new Uint8Array( + helper.buffer, + helper.byteOffset, + helper.byteLength, + ); + return toreturn; +} + +// The number in the comments, help you identify the chunk, as it is the length first two bytes +// this is helpful, when debugging buffer passing +export const KNOWN_BYTES_LONG = [ + createBytesChunk(60000), // 96, 234 + createBytesChunk(12), // 0, 12 + createBytesChunk(50000), // 195, 80 + createBytesChunk(1600).buffer, // 6, 64 we use buffer here to increae test coverage + createBytesChunk(20000), // 78, 32 + createBytesChunk(30000), // 117, 48 +]; + +// end demo data + +export function uint8concat(arrays) { + const length = arrays.reduce((acc, curr) => acc + curr.length, 0); + const result = new Uint8Array(length); + let pos = 0; + let array = 0; + while (pos < length) { + const curArr = arrays[array]; + const curLen = curArr.byteLength; + const dest = new Uint8Array(result.buffer, result.byteOffset + pos, curLen); + dest.set(curArr); + array++; + pos += curArr.byteLength; + } +} diff --git a/test/parallel/test-quic-client-to-server-unidirectional.mjs b/test/parallel/test-quic-client-to-server-unidirectional.mjs index 4eb188d953425b..be5eaba1f3cef9 100644 --- a/test/parallel/test-quic-client-to-server-unidirectional.mjs +++ b/test/parallel/test-quic-client-to-server-unidirectional.mjs @@ -26,7 +26,6 @@ const serverEndpoint = await listen(async (serverSession) => { strictEqual(stream.direction, 'uni', 'Expects an unidirectional stream'); const reader = stream.readable.getReader(); const readChunks = []; - let readc = 0; while (true) { const { done, value } = await reader.read(); // if (readc > 20) throw new Error("after read " + readc); @@ -67,7 +66,6 @@ sendStream.closed.catch(() => { strictEqual(sendStream.direction, 'uni'); const clientWritable = transformStream.writable; const writer = clientWritable.getWriter(); -let run = 0 for (const chunk of KNOWN_BYTES_LONG) { await writer.ready; await writer.write(chunk); @@ -79,4 +77,4 @@ clientSession.closed.catch((err) => { // ignore the error }); clientSession.close(); -await serverFinished.promise; \ No newline at end of file +await serverFinished.promise; From 8b7bff652e54135a4faa808859ddb5f3e8c64737 Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sun, 2 Nov 2025 15:46:00 +0100 Subject: [PATCH 22/84] quic: Fix new test 2 --- test/parallel/test-quic-client-to-server-unidirectional.mjs | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/parallel/test-quic-client-to-server-unidirectional.mjs b/test/parallel/test-quic-client-to-server-unidirectional.mjs index be5eaba1f3cef9..a912a1e885933a 100644 --- a/test/parallel/test-quic-client-to-server-unidirectional.mjs +++ b/test/parallel/test-quic-client-to-server-unidirectional.mjs @@ -34,7 +34,6 @@ const serverEndpoint = await listen(async (serverSession) => { readChunks.push(value); } if (done) break; - readc++; } stream.closed.catch(() => { // ignore @@ -69,7 +68,6 @@ const writer = clientWritable.getWriter(); for (const chunk of KNOWN_BYTES_LONG) { await writer.ready; await writer.write(chunk); - run++; } await writer.ready; await writer.close(); From 0d51a8660a0f3e2dbd09fac8ea6e7dde90ce7be4 Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sun, 2 Nov 2025 20:03:24 +0100 Subject: [PATCH 23/84] quic: Fix use after free in next --- src/dataqueue/queue.cc | 44 ++++++++++++++++++++++++++++++++++++++++++ src/dataqueue/queue.h | 8 ++++++++ src/quic/streams.cc | 7 +++++++ src/quic/streams.h | 4 ++++ 4 files changed, 63 insertions(+) diff --git a/src/dataqueue/queue.cc b/src/dataqueue/queue.cc index ce982652ce43ae..f2420719acd261 100644 --- a/src/dataqueue/queue.cc +++ b/src/dataqueue/queue.cc @@ -36,6 +36,7 @@ class NonIdempotentDataQueueReader; class EntryImpl : public DataQueue::Entry { public: virtual std::shared_ptr get_reader() = 0; + virtual void clearPendingNext() = 0; }; class DataQueueImpl final : public DataQueue, @@ -159,6 +160,12 @@ class DataQueueImpl final : public DataQueue, return std::nullopt; } + void clearPendingNext() override { + for (const auto& entry : entries_) { + entry->clearPendingNext(); + } + } + void MemoryInfo(node::MemoryTracker* tracker) const override { tracker->TrackField( "entries", entries_, "std::vector>"); @@ -590,6 +597,10 @@ class EmptyEntry final : public EntryImpl { return std::make_unique(); } + void clearPendingNext() override { + // this is a noop for an empty object + } + std::optional size() const override { return 0; } bool is_idempotent() const override { return true; } @@ -702,6 +713,10 @@ class InMemoryEntry final : public EntryImpl { return makeEntry(start, byte_length_ - start); } + void clearPendingNext() override { + // this is a noop for an object, that immediately calls next. + } + std::optional size() const override { return byte_length_; } bool is_idempotent() const override { return true; } @@ -750,6 +765,11 @@ class DataQueueEntry : public EntryImpl { return std::make_unique(std::move(sliced)); } + void clearPendingNext() override { + // this is just calls clearPendingNext of the queue + data_queue_->clearPendingNext(); + } + // Returns the number of bytes represented by this Entry if it is // known. Certain types of entries, such as those backed by streams // might not know the size in advance and therefore cannot provide @@ -868,6 +888,10 @@ class FdEntry final : public EntryImpl { return std::make_unique(env_, path_, stat_, new_start, new_end); } + void clearPendingNext() override { + clearPendingNextImpl(); + } + std::optional size() const override { return end_ - start_; } bool is_idempotent() const override { return true; } @@ -885,6 +909,16 @@ class FdEntry final : public EntryImpl { uint64_t start_ = 0; uint64_t end_ = 0; + class ReaderImpl; + std::unordered_set readers_; + + void clearPendingNextImpl() { + // this should clear all pending pulls from the FD reader + for (ReaderImpl* p : readers_) { + p->ClearAllPendingPulls(); + } + } + bool is_modified(const uv_stat_t& other) { return other.st_size != stat_.st_size || other.st_mtim.tv_nsec != stat_.st_mtim.tv_nsec; @@ -932,12 +966,14 @@ class FdEntry final : public EntryImpl { : env_(handle->env()), handle_(std::move(handle)), entry_(entry) { handle_->PushStreamListener(this); handle_->env()->AddCleanupHook(cleanup, this); + entry_->readers_.insert(this); } ~ReaderImpl() override { handle_->env()->RemoveCleanupHook(cleanup, this); DrainAndClose(); handle_->RemoveStreamListener(this); + entry_->readers_.erase(this); } uv_buf_t OnStreamAlloc(size_t suggested_size) override { @@ -1062,6 +1098,10 @@ class FdEntry final : public EntryImpl { return std::move(pending_pulls_.front()); } + void ClearAllPendingPulls() { + pending_pulls_.clear(); + } + friend class FdEntry; }; @@ -1094,6 +1134,10 @@ class FeederEntry final : public EntryImpl { } bool is_idempotent() const override { return false; } + + void clearPendingNext() override { + if (feeder_) feeder_->clearPendingNext(); + } SET_NO_MEMORY_INFO() SET_MEMORY_INFO_NAME(FeederEntry) diff --git a/src/dataqueue/queue.h b/src/dataqueue/queue.h index dc9871b712753f..6029b4f1cb079e 100644 --- a/src/dataqueue/queue.h +++ b/src/dataqueue/queue.h @@ -190,6 +190,10 @@ class DataQueue : public MemoryRetainer { // idempotent and cannot preserve that quality, subsequent reads // must fail with an error when a variance is detected. virtual bool is_idempotent() const = 0; + + // calls if required readers to invalidate next function's + // which may hold captures to stream and session objects + virtual void clearPendingNext() = 0; }; // Creates an idempotent DataQueue with a pre-established collection @@ -299,6 +303,10 @@ class DataQueue : public MemoryRetainer { // been set, maybeCapRemaining() will return std::nullopt. virtual std::optional maybeCapRemaining() const = 0; + // calls all entries and readers to invalidate next function's + // which may hold captures to stream and session objects + virtual void clearPendingNext() = 0; + // BackpressureListeners only work on non-idempotent DataQueues. virtual void addBackpressureListener(BackpressureListener* listener) = 0; virtual void removeBackpressureListener(BackpressureListener* listener) = 0; diff --git a/src/quic/streams.cc b/src/quic/streams.cc index 15af7b22db5aee..7d1b55fe01f196 100644 --- a/src/quic/streams.cc +++ b/src/quic/streams.cc @@ -402,6 +402,13 @@ class Stream::Outbound final : public MemoryRetainer { queue_(std::move(queue)), reader_(queue_->get_reader()) {} + ~Outbound() { + // we need to clear all pending next's from the queue, as it holds pointers + // to Stream and Session, which will be invalidated and may cause use after + // free otherwise + queue_->clearPendingNext(); + } + void Acknowledge(size_t amount) { size_t remaining = std::min(amount, total_ - uncommitted_); while (remaining > 0 && head_ != nullptr) { diff --git a/src/quic/streams.h b/src/quic/streams.h index 4c5ff34f0e5de3..83b96fde8b473d 100644 --- a/src/quic/streams.h +++ b/src/quic/streams.h @@ -433,6 +433,10 @@ class DataQueueFeeder final : public AsyncWrap { } } + void clearPendingNext() { + pendingPulls_.clear(); + } + struct PendingPull { Next next; explicit PendingPull(Next next) : next(std::move(next)) {} From 79b1673ea34ac3bdf65da5d927d866c0ede82722 Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sun, 2 Nov 2025 20:20:58 +0100 Subject: [PATCH 24/84] quic: Fix lint --- src/dataqueue/queue.cc | 14 +++++--------- src/quic/streams.h | 4 +--- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/src/dataqueue/queue.cc b/src/dataqueue/queue.cc index f2420719acd261..db740572ffb2f6 100644 --- a/src/dataqueue/queue.cc +++ b/src/dataqueue/queue.cc @@ -161,9 +161,9 @@ class DataQueueImpl final : public DataQueue, } void clearPendingNext() override { - for (const auto& entry : entries_) { + for (const auto& entry : entries_) { entry->clearPendingNext(); - } + } } void MemoryInfo(node::MemoryTracker* tracker) const override { @@ -888,9 +888,7 @@ class FdEntry final : public EntryImpl { return std::make_unique(env_, path_, stat_, new_start, new_end); } - void clearPendingNext() override { - clearPendingNextImpl(); - } + void clearPendingNext() override { clearPendingNextImpl(); } std::optional size() const override { return end_ - start_; } @@ -1098,9 +1096,7 @@ class FdEntry final : public EntryImpl { return std::move(pending_pulls_.front()); } - void ClearAllPendingPulls() { - pending_pulls_.clear(); - } + void ClearAllPendingPulls() { pending_pulls_.clear(); } friend class FdEntry; }; @@ -1134,7 +1130,7 @@ class FeederEntry final : public EntryImpl { } bool is_idempotent() const override { return false; } - + void clearPendingNext() override { if (feeder_) feeder_->clearPendingNext(); } diff --git a/src/quic/streams.h b/src/quic/streams.h index 83b96fde8b473d..a98b5a986db9d4 100644 --- a/src/quic/streams.h +++ b/src/quic/streams.h @@ -433,9 +433,7 @@ class DataQueueFeeder final : public AsyncWrap { } } - void clearPendingNext() { - pendingPulls_.clear(); - } + void clearPendingNext() { pendingPulls_.clear(); } struct PendingPull { Next next; From eef7b1a21b7d3f798c59e1c6237de1ce87ea5e31 Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Mon, 3 Nov 2025 07:42:55 +0100 Subject: [PATCH 25/84] quic: Fix crash if queue_ is nullptr --- src/quic/streams.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/quic/streams.cc b/src/quic/streams.cc index 7d1b55fe01f196..3de75a56ff615e 100644 --- a/src/quic/streams.cc +++ b/src/quic/streams.cc @@ -406,7 +406,7 @@ class Stream::Outbound final : public MemoryRetainer { // we need to clear all pending next's from the queue, as it holds pointers // to Stream and Session, which will be invalidated and may cause use after // free otherwise - queue_->clearPendingNext(); + if (queue_) queue_->clearPendingNext(); } void Acknowledge(size_t amount) { From 481aaf7e85a6214fafe9e67e5f50f8a45fb85b0f Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sat, 8 Nov 2025 09:31:14 +0100 Subject: [PATCH 26/84] quic: Fix use after free in UpdateDataStats --- src/quic/session.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/quic/session.cc b/src/quic/session.cc index 228f322ea688c4..71f12a8ef27efc 100644 --- a/src/quic/session.cc +++ b/src/quic/session.cc @@ -2259,6 +2259,10 @@ void Session::ExtendOffset(size_t amount) { void Session::UpdateDataStats() { Debug(this, "Updating data stats"); + if (!impl_) { + Debug(this, "Updating data stats failed, impl gone!"); + return; + } auto& stats_ = impl_->stats_; ngtcp2_conn_info info; ngtcp2_conn_get_conn_info(*this, &info); From b4bff44b5e52c47f2e346c5268c3a47af3322ac6 Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sat, 8 Nov 2025 13:39:57 +0100 Subject: [PATCH 27/84] quic: Fix object lifetime in nested calls --- src/quic/streams.cc | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/quic/streams.cc b/src/quic/streams.cc index 3de75a56ff615e..cdbaf3b157fa7d 100644 --- a/src/quic/streams.cc +++ b/src/quic/streams.cc @@ -526,6 +526,12 @@ class Stream::Outbound final : public MemoryRetainer { // Always make sure next_pending_ is false when we're done. auto on_exit = OnScopeLeave([this] { next_pending_ = false; }); + // We need to hold a reference to stream and session + // so that it can not go away during the next calls. + BaseObjectPtr stream = BaseObjectPtr(stream_); + BaseObjectPtr session = + BaseObjectPtr(&stream_ ->session()); + // The status should never be wait here. DCHECK_NE(status, bob::Status::STATUS_WAIT); @@ -534,7 +540,7 @@ class Stream::Outbound final : public MemoryRetainer { // being asynchronous, our stream is blocking waiting for the data, // but we have an error! oh no! We need to error the stream. if (next_pending_) { - stream_->Destroy( + stream->Destroy( QuicError::ForNgtcp2Error(NGTCP2_INTERNAL_ERROR)); // We do not need to worry about calling MarkErrored in this case // since we are immediately destroying the stream which will @@ -553,7 +559,7 @@ class Stream::Outbound final : public MemoryRetainer { // in the uncommitted queue. We'll resume the stream so that the // session will try to read from it again. if (next_pending_) { - stream_->session().ResumeStream(stream_->id()); + session->ResumeStream(stream_->id()); } return; } @@ -577,7 +583,7 @@ class Stream::Outbound final : public MemoryRetainer { // Now that we have data, let's resume the stream so the session will // pull from it again. if (next_pending_) { - stream_->session().ResumeStream(stream_->id()); + stream->session().ResumeStream(stream_->id()); } }, bob::OPTIONS_SYNC, @@ -1406,7 +1412,10 @@ JS_METHOD_IMPL(DataQueueFeeder::Submit) { static_cast(originalStore->Data()) + typedArray->ByteOffset(); memcpy(backing->Data(), originalData, nread); auto& pending = feeder->pendingPulls_.front(); - auto pop = OnScopeLeave([feeder] { feeder->pendingPulls_.pop_front(); }); + auto pop = OnScopeLeave([feeder] { + if (feeder->pendingPulls_.size() > 0) + feeder->pendingPulls_.pop_front(); + }); DataQueue::Vec vec; vec.base = static_cast(backing->Data()); vec.len = static_cast(nread); From 4d8fce69a32aaa48e275871a7496066224cabc9b Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sun, 9 Nov 2025 15:16:52 +0100 Subject: [PATCH 28/84] quic: Fix race condition in Stream::Outbound::Pull --- src/quic/streams.cc | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/quic/streams.cc b/src/quic/streams.cc index cdbaf3b157fa7d..e07fb27f1df1eb 100644 --- a/src/quic/streams.cc +++ b/src/quic/streams.cc @@ -523,8 +523,12 @@ class Stream::Outbound final : public MemoryRetainer { // that the pull is sync but allow for it to be async. int ret = reader_->Pull( [this](auto status, auto vecs, auto count, auto done) { - // Always make sure next_pending_ is false when we're done. - auto on_exit = OnScopeLeave([this] { next_pending_ = false; }); + const bool last_next_pending_state = next_pending_; + next_pending_ = false; + // this ensures that next_pending is reset to false + // Note that for example ResumeStream below, may again call Pull + // so we have to erase next_pending_ to not block this call. + // Therefore the previous OnScopeLeave lead to a race condition. // We need to hold a reference to stream and session // so that it can not go away during the next calls. @@ -536,10 +540,10 @@ class Stream::Outbound final : public MemoryRetainer { DCHECK_NE(status, bob::Status::STATUS_WAIT); if (status < 0) { - // If next_pending_ is true then a pull from the reader ended up + // If next_pending_ was true then a pull from the reader ended up // being asynchronous, our stream is blocking waiting for the data, // but we have an error! oh no! We need to error the stream. - if (next_pending_) { + if (last_next_pending_state) { stream->Destroy( QuicError::ForNgtcp2Error(NGTCP2_INTERNAL_ERROR)); // We do not need to worry about calling MarkErrored in this case @@ -558,7 +562,8 @@ class Stream::Outbound final : public MemoryRetainer { // Here, there is no more data to read, but we will might have data // in the uncommitted queue. We'll resume the stream so that the // session will try to read from it again. - if (next_pending_) { + if (last_next_pending_state) { + fprintf(stderr, "next_pending ResumeStream EOS\n"); session->ResumeStream(stream_->id()); } return; @@ -578,11 +583,11 @@ class Stream::Outbound final : public MemoryRetainer { // bytes in the queue. Append(vecs, count, std::move(done)); - // If next_pending_ is true, then a pull from the reader ended up + // If next_pending_ was true, then a pull from the reader ended up // being asynchronous, our stream is blocking waiting for the data. // Now that we have data, let's resume the stream so the session will // pull from it again. - if (next_pending_) { + if (last_next_pending_state) { stream->session().ResumeStream(stream_->id()); } }, From 0edf3869b372d1ee9c652c90bbcbbfc296fb3b40 Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sun, 9 Nov 2025 15:21:59 +0100 Subject: [PATCH 29/84] quic: Fix format-cpp --- src/quic/streams.cc | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/quic/streams.cc b/src/quic/streams.cc index e07fb27f1df1eb..92db473379a533 100644 --- a/src/quic/streams.cc +++ b/src/quic/streams.cc @@ -534,7 +534,7 @@ class Stream::Outbound final : public MemoryRetainer { // so that it can not go away during the next calls. BaseObjectPtr stream = BaseObjectPtr(stream_); BaseObjectPtr session = - BaseObjectPtr(&stream_ ->session()); + BaseObjectPtr(&stream_->session()); // The status should never be wait here. DCHECK_NE(status, bob::Status::STATUS_WAIT); @@ -544,8 +544,7 @@ class Stream::Outbound final : public MemoryRetainer { // being asynchronous, our stream is blocking waiting for the data, // but we have an error! oh no! We need to error the stream. if (last_next_pending_state) { - stream->Destroy( - QuicError::ForNgtcp2Error(NGTCP2_INTERNAL_ERROR)); + stream->Destroy(QuicError::ForNgtcp2Error(NGTCP2_INTERNAL_ERROR)); // We do not need to worry about calling MarkErrored in this case // since we are immediately destroying the stream which will // release the outbound buffer anyway. @@ -1418,8 +1417,7 @@ JS_METHOD_IMPL(DataQueueFeeder::Submit) { memcpy(backing->Data(), originalData, nread); auto& pending = feeder->pendingPulls_.front(); auto pop = OnScopeLeave([feeder] { - if (feeder->pendingPulls_.size() > 0) - feeder->pendingPulls_.pop_front(); + if (feeder->pendingPulls_.size() > 0) feeder->pendingPulls_.pop_front(); }); DataQueue::Vec vec; vec.base = static_cast(backing->Data()); From 4d6c88918248894ec89e31c12cb3182717730d5b Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sun, 9 Nov 2025 16:55:18 +0100 Subject: [PATCH 30/84] quic: Remove debug message --- src/quic/streams.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/src/quic/streams.cc b/src/quic/streams.cc index 92db473379a533..be18a40f57c877 100644 --- a/src/quic/streams.cc +++ b/src/quic/streams.cc @@ -562,7 +562,6 @@ class Stream::Outbound final : public MemoryRetainer { // in the uncommitted queue. We'll resume the stream so that the // session will try to read from it again. if (last_next_pending_state) { - fprintf(stderr, "next_pending ResumeStream EOS\n"); session->ResumeStream(stream_->id()); } return; From 6e2d4ede3f00ade2a24a3d3027308f0a90ee97c2 Mon Sep 17 00:00:00 2001 From: Marten Richter Date: Sun, 9 Nov 2025 17:01:45 +0100 Subject: [PATCH 31/84] quic: Add ReadableStream to setOutbound (add to docs) --- doc/api/quic.md | 10 ++++++++++ lib/internal/quic/quic.js | 8 ++++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/doc/api/quic.md b/doc/api/quic.md index 9ea06e8e8bd1cf..563276e0863d03 100644 --- a/doc/api/quic.md +++ b/doc/api/quic.md @@ -832,6 +832,16 @@ added: v23.8.0 The session that created this stream. Read only. +### `stream.setOutbound(outbound)` + + + +* `outbound` {ArrayBuffer|SharedArrayBuffer|ArrayBufferView|Blob|ReadableStream} + +Set the outbound datasource. + ### `stream.stats`