Skip to content

Commit b45c815

Browse files
committed
node-api: fix data race and use-after-free in napi_threadsafe_function
Other threads can still hold a valid handle to the tsfn after finalization if finalization was triggered by - release with napi_tsfn_abort, or - environment shutdown Handle this by: - protecting finalization itself with the mutex - if necessary, delay deletion after finalization to when thread_count drops to 0 - releasing all resources as soon as possible before deletion Fixes: #55706
1 parent def4c28 commit b45c815

File tree

1 file changed

+84
-40
lines changed

1 file changed

+84
-40
lines changed

src/node_api.cc

Lines changed: 84 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ inline napi_env NewEnv(v8::Local<v8::Context> context,
196196
return result;
197197
}
198198

199-
class ThreadSafeFunction : public node::AsyncResource {
199+
class ThreadSafeFunction {
200200
public:
201201
ThreadSafeFunction(v8::Local<v8::Function> func,
202202
v8::Local<v8::Object> resource,
@@ -208,11 +208,13 @@ class ThreadSafeFunction : public node::AsyncResource {
208208
void* finalize_data_,
209209
napi_finalize finalize_cb_,
210210
napi_threadsafe_function_call_js call_js_cb_)
211-
: AsyncResource(env_->isolate,
212-
resource,
213-
*v8::String::Utf8Value(env_->isolate, name)),
211+
: async_resource(std::in_place,
212+
env_->isolate,
213+
resource,
214+
*v8::String::Utf8Value(env_->isolate, name)),
214215
thread_count(thread_count_),
215216
is_closing(false),
217+
is_closed(false),
216218
dispatch_state(kDispatchIdle),
217219
context(context_),
218220
max_queue_size(max_queue_size_),
@@ -226,36 +228,38 @@ class ThreadSafeFunction : public node::AsyncResource {
226228
env->Ref();
227229
}
228230

229-
~ThreadSafeFunction() override {
230-
node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this);
231-
env->Unref();
232-
}
231+
~ThreadSafeFunction() { ReleaseResources(); }
233232

234233
// These methods can be called from any thread.
235234

236235
napi_status Push(void* data, napi_threadsafe_function_call_mode mode) {
237-
node::Mutex::ScopedLock lock(this->mutex);
236+
{
237+
node::Mutex::ScopedLock lock(this->mutex);
238238

239-
while (queue.size() >= max_queue_size && max_queue_size > 0 &&
240-
!is_closing) {
241-
if (mode == napi_tsfn_nonblocking) {
242-
return napi_queue_full;
239+
while (queue.size() >= max_queue_size && max_queue_size > 0 &&
240+
!is_closing) {
241+
if (mode == napi_tsfn_nonblocking) {
242+
return napi_queue_full;
243+
}
244+
cond->Wait(lock);
243245
}
244-
cond->Wait(lock);
245-
}
246246

247-
if (is_closing) {
247+
if (!is_closing) {
248+
queue.push(data);
249+
Send();
250+
return napi_ok;
251+
}
248252
if (thread_count == 0) {
249253
return napi_invalid_arg;
250-
} else {
251-
thread_count--;
254+
}
255+
thread_count--;
256+
if (!is_closed || thread_count > 0) {
252257
return napi_closing;
253258
}
254-
} else {
255-
queue.push(data);
256-
Send();
257-
return napi_ok;
258259
}
260+
// Make sure to release lock before destroying
261+
delete this;
262+
return napi_closing;
259263
}
260264

261265
napi_status Acquire() {
@@ -271,31 +275,51 @@ class ThreadSafeFunction : public node::AsyncResource {
271275
}
272276

273277
napi_status Release(napi_threadsafe_function_release_mode mode) {
274-
node::Mutex::ScopedLock lock(this->mutex);
278+
{
279+
node::Mutex::ScopedLock lock(this->mutex);
275280

276-
if (thread_count == 0) {
277-
return napi_invalid_arg;
278-
}
281+
if (thread_count == 0) {
282+
return napi_invalid_arg;
283+
}
279284

280-
thread_count--;
285+
thread_count--;
281286

282-
if (thread_count == 0 || mode == napi_tsfn_abort) {
283-
if (!is_closing) {
284-
is_closing = (mode == napi_tsfn_abort);
285-
if (is_closing && max_queue_size > 0) {
286-
cond->Signal(lock);
287+
if (thread_count == 0 || mode == napi_tsfn_abort) {
288+
if (!is_closing) {
289+
is_closing = (mode == napi_tsfn_abort);
290+
if (is_closing && max_queue_size > 0) {
291+
cond->Signal(lock);
292+
}
293+
Send();
287294
}
288-
Send();
289295
}
290-
}
291296

297+
if (!is_closed || thread_count > 0) {
298+
return napi_ok;
299+
}
300+
}
301+
// Make sure to release lock before destroying
302+
delete this;
292303
return napi_ok;
293304
}
294305

295-
void EmptyQueueAndDelete() {
296-
for (; !queue.empty(); queue.pop()) {
297-
call_js_cb(nullptr, nullptr, context, queue.front());
306+
void EmptyQueueAndMaybeDelete() {
307+
{
308+
node::Mutex::ScopedLock lock(this->mutex);
309+
for (; !queue.empty(); queue.pop()) {
310+
call_js_cb(nullptr, nullptr, context, queue.front());
311+
}
312+
if (thread_count > 0) {
313+
// At this point this TSFN is effectively done, but we need to keep
314+
// it alive for other threads that still have pointers to it until
315+
// they release them.
316+
// But we already release all the resources that we can at this point
317+
queue = {};
318+
ReleaseResources();
319+
return;
320+
}
298321
}
322+
// Make sure to release lock before destroying
299323
delete this;
300324
}
301325

@@ -347,6 +371,16 @@ class ThreadSafeFunction : public node::AsyncResource {
347371
inline void* Context() { return context; }
348372

349373
protected:
374+
void ReleaseResources() {
375+
if (!is_closed) {
376+
is_closed = true;
377+
ref.Reset();
378+
node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this);
379+
env->Unref();
380+
async_resource.reset();
381+
}
382+
}
383+
350384
void Dispatch() {
351385
bool has_more = true;
352386

@@ -405,7 +439,7 @@ class ThreadSafeFunction : public node::AsyncResource {
405439

406440
if (popped_value) {
407441
v8::HandleScope scope(env->isolate);
408-
CallbackScope cb_scope(this);
442+
AsyncResource::CallbackScope cb_scope(&*async_resource);
409443
napi_value js_callback = nullptr;
410444
if (!ref.IsEmpty()) {
411445
v8::Local<v8::Function> js_cb =
@@ -422,10 +456,10 @@ class ThreadSafeFunction : public node::AsyncResource {
422456
void Finalize() {
423457
v8::HandleScope scope(env->isolate);
424458
if (finalize_cb) {
425-
CallbackScope cb_scope(this);
459+
AsyncResource::CallbackScope cb_scope(&*async_resource);
426460
env->CallFinalizer<false>(finalize_cb, finalize_data, context);
427461
}
428-
EmptyQueueAndDelete();
462+
EmptyQueueAndMaybeDelete();
429463
}
430464

431465
void CloseHandlesAndMaybeDelete(bool set_closing = false) {
@@ -497,19 +531,29 @@ class ThreadSafeFunction : public node::AsyncResource {
497531
}
498532

499533
private:
534+
// Needed because node::AsyncResource::CallbackScope is protected
535+
class AsyncResource : public node::AsyncResource {
536+
public:
537+
using node::AsyncResource::AsyncResource;
538+
using node::AsyncResource::CallbackScope;
539+
};
540+
500541
static const unsigned char kDispatchIdle = 0;
501542
static const unsigned char kDispatchRunning = 1 << 0;
502543
static const unsigned char kDispatchPending = 1 << 1;
503544

504545
static const unsigned int kMaxIterationCount = 1000;
505546

547+
std::optional<AsyncResource> async_resource;
548+
506549
// These are variables protected by the mutex.
507550
node::Mutex mutex;
508551
std::unique_ptr<node::ConditionVariable> cond;
509552
std::queue<void*> queue;
510553
uv_async_t async;
511554
size_t thread_count;
512555
bool is_closing;
556+
bool is_closed;
513557
std::atomic_uchar dispatch_state;
514558

515559
// These are variables set once, upon creation, and then never again, which

0 commit comments

Comments
 (0)