@@ -196,7 +196,7 @@ inline napi_env NewEnv(v8::Local<v8::Context> context,
196196 return result;
197197}
198198
199- class ThreadSafeFunction : public node ::AsyncResource {
199+ class ThreadSafeFunction {
200200 public:
201201 ThreadSafeFunction (v8::Local<v8::Function> func,
202202 v8::Local<v8::Object> resource,
@@ -208,11 +208,13 @@ class ThreadSafeFunction : public node::AsyncResource {
208208 void * finalize_data_,
209209 napi_finalize finalize_cb_,
210210 napi_threadsafe_function_call_js call_js_cb_)
211- : AsyncResource(env_->isolate,
212- resource,
213- *v8::String::Utf8Value (env_->isolate, name)),
211+ : async_resource(std::in_place,
212+ env_->isolate,
213+ resource,
214+ *v8::String::Utf8Value (env_->isolate, name)),
214215 thread_count(thread_count_),
215216 is_closing(false ),
217+ is_closed(false ),
216218 dispatch_state(kDispatchIdle ),
217219 context(context_),
218220 max_queue_size(max_queue_size_),
@@ -226,36 +228,38 @@ class ThreadSafeFunction : public node::AsyncResource {
226228 env->Ref ();
227229 }
228230
229- ~ThreadSafeFunction () override {
230- node::RemoveEnvironmentCleanupHook (env->isolate , Cleanup, this );
231- env->Unref ();
232- }
231+ ~ThreadSafeFunction () { ReleaseResources (); }
233232
234233 // These methods can be called from any thread.
235234
236235 napi_status Push (void * data, napi_threadsafe_function_call_mode mode) {
237- node::Mutex::ScopedLock lock (this ->mutex );
236+ {
237+ node::Mutex::ScopedLock lock (this ->mutex );
238238
239- while (queue.size () >= max_queue_size && max_queue_size > 0 &&
240- !is_closing) {
241- if (mode == napi_tsfn_nonblocking) {
242- return napi_queue_full;
239+ while (queue.size () >= max_queue_size && max_queue_size > 0 &&
240+ !is_closing) {
241+ if (mode == napi_tsfn_nonblocking) {
242+ return napi_queue_full;
243+ }
244+ cond->Wait (lock);
243245 }
244- cond->Wait (lock);
245- }
246246
247- if (is_closing) {
247+ if (!is_closing) {
248+ queue.push (data);
249+ Send ();
250+ return napi_ok;
251+ }
248252 if (thread_count == 0 ) {
249253 return napi_invalid_arg;
250- } else {
251- thread_count--;
254+ }
255+ thread_count--;
256+ if (!is_closed || thread_count > 0 ) {
252257 return napi_closing;
253258 }
254- } else {
255- queue.push (data);
256- Send ();
257- return napi_ok;
258259 }
260+ // Make sure to release lock before destroying
261+ delete this ;
262+ return napi_closing;
259263 }
260264
261265 napi_status Acquire () {
@@ -271,31 +275,51 @@ class ThreadSafeFunction : public node::AsyncResource {
271275 }
272276
273277 napi_status Release (napi_threadsafe_function_release_mode mode) {
274- node::Mutex::ScopedLock lock (this ->mutex );
278+ {
279+ node::Mutex::ScopedLock lock (this ->mutex );
275280
276- if (thread_count == 0 ) {
277- return napi_invalid_arg;
278- }
281+ if (thread_count == 0 ) {
282+ return napi_invalid_arg;
283+ }
279284
280- thread_count--;
285+ thread_count--;
281286
282- if (thread_count == 0 || mode == napi_tsfn_abort) {
283- if (!is_closing) {
284- is_closing = (mode == napi_tsfn_abort);
285- if (is_closing && max_queue_size > 0 ) {
286- cond->Signal (lock);
287+ if (thread_count == 0 || mode == napi_tsfn_abort) {
288+ if (!is_closing) {
289+ is_closing = (mode == napi_tsfn_abort);
290+ if (is_closing && max_queue_size > 0 ) {
291+ cond->Signal (lock);
292+ }
293+ Send ();
287294 }
288- Send ();
289295 }
290- }
291296
297+ if (!is_closed || thread_count > 0 ) {
298+ return napi_ok;
299+ }
300+ }
301+ // Make sure to release lock before destroying
302+ delete this ;
292303 return napi_ok;
293304 }
294305
295- void EmptyQueueAndDelete () {
296- for (; !queue.empty (); queue.pop ()) {
297- call_js_cb (nullptr , nullptr , context, queue.front ());
306+ void EmptyQueueAndMaybeDelete () {
307+ {
308+ node::Mutex::ScopedLock lock (this ->mutex );
309+ for (; !queue.empty (); queue.pop ()) {
310+ call_js_cb (nullptr , nullptr , context, queue.front ());
311+ }
312+ if (thread_count > 0 ) {
313+ // At this point this TSFN is effectively done, but we need to keep
314+ // it alive for other threads that still have pointers to it until
315+ // they release them.
316+ // But we already release all the resources that we can at this point
317+ queue = {};
318+ ReleaseResources ();
319+ return ;
320+ }
298321 }
322+ // Make sure to release lock before destroying
299323 delete this ;
300324 }
301325
@@ -347,6 +371,16 @@ class ThreadSafeFunction : public node::AsyncResource {
347371 inline void * Context () { return context; }
348372
349373 protected:
374+ void ReleaseResources () {
375+ if (!is_closed) {
376+ is_closed = true ;
377+ ref.Reset ();
378+ node::RemoveEnvironmentCleanupHook (env->isolate , Cleanup, this );
379+ env->Unref ();
380+ async_resource.reset ();
381+ }
382+ }
383+
350384 void Dispatch () {
351385 bool has_more = true ;
352386
@@ -405,7 +439,7 @@ class ThreadSafeFunction : public node::AsyncResource {
405439
406440 if (popped_value) {
407441 v8::HandleScope scope (env->isolate );
408- CallbackScope cb_scope (this );
442+ AsyncResource:: CallbackScope cb_scope (&*async_resource );
409443 napi_value js_callback = nullptr ;
410444 if (!ref.IsEmpty ()) {
411445 v8::Local<v8::Function> js_cb =
@@ -422,10 +456,10 @@ class ThreadSafeFunction : public node::AsyncResource {
422456 void Finalize () {
423457 v8::HandleScope scope (env->isolate );
424458 if (finalize_cb) {
425- CallbackScope cb_scope (this );
459+ AsyncResource:: CallbackScope cb_scope (&*async_resource );
426460 env->CallFinalizer <false >(finalize_cb, finalize_data, context);
427461 }
428- EmptyQueueAndDelete ();
462+ EmptyQueueAndMaybeDelete ();
429463 }
430464
431465 void CloseHandlesAndMaybeDelete (bool set_closing = false ) {
@@ -497,19 +531,29 @@ class ThreadSafeFunction : public node::AsyncResource {
497531 }
498532
499533 private:
534+ // Needed because node::AsyncResource::CallbackScope is protected
535+ class AsyncResource : public node ::AsyncResource {
536+ public:
537+ using node::AsyncResource::AsyncResource;
538+ using node::AsyncResource::CallbackScope;
539+ };
540+
500541 static const unsigned char kDispatchIdle = 0 ;
501542 static const unsigned char kDispatchRunning = 1 << 0 ;
502543 static const unsigned char kDispatchPending = 1 << 1 ;
503544
504545 static const unsigned int kMaxIterationCount = 1000 ;
505546
547+ std::optional<AsyncResource> async_resource;
548+
506549 // These are variables protected by the mutex.
507550 node::Mutex mutex;
508551 std::unique_ptr<node::ConditionVariable> cond;
509552 std::queue<void *> queue;
510553 uv_async_t async;
511554 size_t thread_count;
512555 bool is_closing;
556+ bool is_closed;
513557 std::atomic_uchar dispatch_state;
514558
515559 // These are variables set once, upon creation, and then never again, which
0 commit comments