Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions doc/api/dgram.md
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,8 @@ changes:
specified by the NAT.
* `sendBlockList` {net.BlockList} `sendBlockList` can be used for disabling outbound
access to specific IP addresses, IP ranges, or IP subnets.
* `msgCount` {integer} `msgCount` can be used to receive multiple messages at once via `recvmmsg`.
**Default:** `0`, disabled.
* `callback` {Function} Attached as a listener for `'message'` events. Optional.
* Returns: {dgram.Socket}

Expand Down
10 changes: 8 additions & 2 deletions lib/dgram.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ function Socket(type, listener) {
let sendBufferSize;
let receiveBlockList;
let sendBlockList;

let msgCount;
let options;
if (type !== null && typeof type === 'object') {
options = type;
Expand All @@ -138,9 +138,13 @@ function Socket(type, listener) {
}
sendBlockList = options.sendBlockList;
}
if (options.msgCount !== undefined) {
validateUint32(options.msgCount, 'options.msgCount');
msgCount = options.msgCount;
}
}

const handle = newHandle(type, lookup);
const handle = newHandle(type, lookup, msgCount);
handle[owner_symbol] = this;

this[async_id_symbol] = handle.getAsyncId();
Expand All @@ -162,6 +166,7 @@ function Socket(type, listener) {
sendBufferSize,
receiveBlockList,
sendBlockList,
msgCount,
};

if (options?.signal !== undefined) {
Expand Down Expand Up @@ -380,6 +385,7 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) {
}

if (cluster.isWorker && !exclusive) {
// TODO(theanarkh): support recvmmsg
bindServerHandle(this, {
address: ip,
port: port,
Expand Down
6 changes: 3 additions & 3 deletions lib/internal/dgram.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ function lookup6(lookup, address, callback) {
return lookup(address || '::1', 6, callback);
}

function newHandle(type, lookup) {
function newHandle(type, lookup, mmsgCount) {
if (lookup === undefined) {
if (dns === undefined) {
dns = require('dns');
Expand All @@ -40,14 +40,14 @@ function newHandle(type, lookup) {
}

if (type === 'udp4') {
const handle = new UDP();
const handle = new UDP(mmsgCount);

handle.lookup = FunctionPrototypeBind(lookup4, handle, lookup);
return handle;
}

if (type === 'udp6') {
const handle = new UDP();
const handle = new UDP(mmsgCount);

handle.lookup = FunctionPrototypeBind(lookup6, handle, lookup);
handle.bind = handle.bind6;
Expand Down
59 changes: 52 additions & 7 deletions src/udp_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,20 +142,34 @@ void UDPWrapBase::RegisterExternalReferences(
registry->Register(RecvStop);
}

UDPWrap::UDPWrap(Environment* env, Local<Object> object)
UDPWrap::UDPWrap(Environment* env, Local<Object> object, uint32_t msg_count)
: HandleWrap(env,
object,
reinterpret_cast<uv_handle_t*>(&handle_),
AsyncWrap::PROVIDER_UDPWRAP) {
AsyncWrap::PROVIDER_UDPWRAP),
msg_count_(msg_count),
mmsg_buf_(uv_buf_init(nullptr, 0)) {
object->SetAlignedPointerInInternalField(
UDPWrapBase::kUDPWrapBaseField, static_cast<UDPWrapBase*>(this));

int r = uv_udp_init(env->event_loop(), &handle_);
int r;
if (msg_count > 0) {
r = uv_udp_init_ex(
env->event_loop(), &handle_, AF_UNSPEC | UV_UDP_RECVMMSG);
} else {
r = uv_udp_init(env->event_loop(), &handle_);
}
CHECK_EQ(r, 0); // can't fail anyway

set_listener(this);
}

UDPWrap::~UDPWrap() {
// Libuv does not release the memory of memory which allocated
// by handle->alloc_cb when we call close in handle->read_cb,
// so we should release the memory here if necessary.
if (using_recvmmsg()) {
release_buf();
}
}

void UDPWrap::Initialize(Local<Object> target,
Local<Value> unused,
Expand Down Expand Up @@ -270,8 +284,12 @@ void UDPWrap::RegisterExternalReferences(ExternalReferenceRegistry* registry) {

void UDPWrap::New(const FunctionCallbackInfo<Value>& args) {
CHECK(args.IsConstructCall());
uint32_t msg_count = 0;
if (args[0]->IsUint32()) {
msg_count = args[0].As<Uint32>()->Value();
}
Environment* env = Environment::GetCurrent(args);
new UDPWrap(env, args.This());
new UDPWrap(env, args.This(), msg_count);
}


Expand Down Expand Up @@ -741,6 +759,19 @@ void UDPWrap::OnAlloc(uv_handle_t* handle,
}

uv_buf_t UDPWrap::OnAlloc(size_t suggested_size) {
if (using_recvmmsg()) {
CHECK(mmsg_buf_.base == nullptr);
if (msg_count_ > SIZE_MAX / suggested_size) {
return mmsg_buf_;
}
suggested_size *= msg_count_;
void* base = malloc(suggested_size);
if (base == nullptr) {
return mmsg_buf_;
}
mmsg_buf_ = uv_buf_init(reinterpret_cast<char*>(base), suggested_size);
return mmsg_buf_;
}
return env()->allocate_managed_buffer(suggested_size);
}

Expand All @@ -759,7 +790,17 @@ void UDPWrap::OnRecv(ssize_t nread,
unsigned int flags) {
Environment* env = this->env();
Isolate* isolate = env->isolate();
std::unique_ptr<BackingStore> bs = env->release_managed_buffer(buf_);
std::unique_ptr<BackingStore> bs;

auto cleanup = OnScopeLeave([&]() {
if (using_recvmmsg() && (nread <= 0 || (flags & UV_UDP_MMSG_FREE))) {
release_buf();
}
});

if (!using_recvmmsg()) {
bs = env->release_managed_buffer(buf_);
}
if (nread == 0 && addr == nullptr) {
return;
}
Expand All @@ -778,6 +819,10 @@ void UDPWrap::OnRecv(ssize_t nread,
return;
} else if (nread == 0) {
bs = ArrayBuffer::NewBackingStore(isolate, 0);
} else if (using_recvmmsg()) {
bs = ArrayBuffer::NewBackingStore(
isolate, nread, BackingStoreInitializationMode::kUninitialized);
memcpy(bs->Data(), buf_.base, nread);
} else if (static_cast<size_t>(nread) != bs->ByteLength()) {
CHECK_LE(static_cast<size_t>(nread), bs->ByteLength());
std::unique_ptr<BackingStore> old_bs = std::move(bs);
Expand Down
22 changes: 19 additions & 3 deletions src/udp_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,15 @@ class UDPWrap final : public HandleWrap,
const uv_buf_t& buf,
const sockaddr* addr,
unsigned int flags) override;
bool using_recvmmsg() {
return uv_udp_using_recvmmsg(reinterpret_cast<uv_udp_t*>(&handle_));
}
void release_buf() {
if (mmsg_buf_.base != nullptr) {
free(mmsg_buf_.base);
mmsg_buf_ = uv_buf_init(nullptr, 0);
}
}
ReqWrap<uv_udp_send_t>* CreateSendWrap(size_t msg_size) override;
void OnSendDone(ReqWrap<uv_udp_send_t>* wrap, int status) override;

Expand All @@ -178,7 +187,11 @@ class UDPWrap final : public HandleWrap,
static v8::MaybeLocal<v8::Object> Instantiate(Environment* env,
AsyncWrap* parent,
SocketType type);
SET_NO_MEMORY_INFO()
void MemoryInfo(MemoryTracker* tracker) const override {
if (mmsg_buf_.base != nullptr) {
tracker->TrackFieldWithSize("mmsg_buf", mmsg_buf_.len);
}
}
SET_MEMORY_INFO_NAME(UDPWrap)
SET_SELF_SIZE(UDPWrap)

Expand All @@ -189,7 +202,9 @@ class UDPWrap final : public HandleWrap,
int (*F)(const typename T::HandleType*, sockaddr*, int*)>
friend void GetSockOrPeerName(const v8::FunctionCallbackInfo<v8::Value>&);

UDPWrap(Environment* env, v8::Local<v8::Object> object);
UDPWrap(Environment* env, v8::Local<v8::Object> object, uint32_t msg_count);

~UDPWrap();

static void DoBind(const v8::FunctionCallbackInfo<v8::Value>& args,
int family);
Expand All @@ -213,7 +228,8 @@ class UDPWrap final : public HandleWrap,
unsigned int flags);

uv_udp_t handle_;

uint32_t msg_count_;
uv_buf_t mmsg_buf_;
bool current_send_has_callback_;
v8::Local<v8::Object> current_send_req_wrap_;
};
Expand Down
30 changes: 30 additions & 0 deletions test/parallel/test-dgram-udp-recvmmsg-close.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const dgram = require('dgram');
const message_to_send = 'A message to send';

const server = dgram.createSocket({ type: 'udp4', msgCount: 3 });
const client = dgram.createSocket({ type: 'udp4' });

client.on('close', common.mustCall());
server.on('close', common.mustCall());

server.on('message', common.mustCall((msg) => {
assert.strictEqual(msg.toString(), message_to_send.toString());
// The server will release the memory which allocated by handle->alloc_cb
server.close();
client.close();
}));

server.on('listening', common.mustCall(() => {
for (let i = 0; i < 2; i++) {
client.send(message_to_send,
0,
message_to_send.length,
server.address().port,
server.address().address);
}
}));

server.bind(0);
61 changes: 61 additions & 0 deletions test/parallel/test-dgram-udp-recvmmsg.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const dgram = require('dgram');
const message_to_send = 'A message to send';

[
-1,
1.1,
NaN,
undefined,
{},
[],
null,
function() {},
Symbol(),
true,
Infinity,
].forEach((msgCount) => {
try {
dgram.createSocket({ type: 'udp4', msgCount });
} catch (e) {
assert.ok(/ERR_OUT_OF_RANGE|ERR_INVALID_ARG_TYPE/i.test(e.code));
}
});

[
0,
1,
].forEach((msgCount) => {
const socket = dgram.createSocket({ type: 'udp4', msgCount });
socket.close();
});

const server = dgram.createSocket({ type: 'udp4', msgCount: 3 });
const client = dgram.createSocket({ type: 'udp4' });

client.on('close', common.mustCall());
server.on('close', common.mustCall());

let done = 0;
const count = 2;
server.on('message', common.mustCall((msg) => {
assert.strictEqual(msg.toString(), message_to_send.toString());
if (++done === count) {
client.close();
server.close();
}
}, count));

server.on('listening', common.mustCall(() => {
for (let i = 0; i < count; i++) {
client.send(message_to_send,
0,
message_to_send.length,
server.address().port,
server.address().address);
}
}));

server.bind(0);
Loading