@@ -198,7 +198,7 @@ class BufferFinalizer : private Finalizer {
198198 ~BufferFinalizer () { env ()->Unref (); }
199199};
200200
201- class ThreadSafeFunction : public node ::AsyncResource {
201+ class ThreadSafeFunction {
202202 public:
203203 ThreadSafeFunction (v8::Local<v8::Function> func,
204204 v8::Local<v8::Object> resource,
@@ -210,11 +210,13 @@ class ThreadSafeFunction : public node::AsyncResource {
210210 void * finalize_data_,
211211 napi_finalize finalize_cb_,
212212 napi_threadsafe_function_call_js call_js_cb_)
213- : AsyncResource(env_->isolate,
214- resource,
215- *v8::String::Utf8Value (env_->isolate, name)),
213+ : async_resource(std::in_place,
214+ env_->isolate,
215+ resource,
216+ *v8::String::Utf8Value (env_->isolate, name)),
216217 thread_count(thread_count_),
217218 is_closing(false ),
219+ is_closed(false ),
218220 dispatch_state(kDispatchIdle ),
219221 context(context_),
220222 max_queue_size(max_queue_size_),
@@ -228,36 +230,38 @@ class ThreadSafeFunction : public node::AsyncResource {
228230 env->Ref ();
229231 }
230232
231- ~ThreadSafeFunction () override {
232- node::RemoveEnvironmentCleanupHook (env->isolate , Cleanup, this );
233- env->Unref ();
234- }
233+ ~ThreadSafeFunction () { ReleaseResources (); }
235234
236235 // These methods can be called from any thread.
237236
238237 napi_status Push (void * data, napi_threadsafe_function_call_mode mode) {
239- node::Mutex::ScopedLock lock (this ->mutex );
238+ {
239+ node::Mutex::ScopedLock lock (this ->mutex );
240240
241- while (queue.size () >= max_queue_size && max_queue_size > 0 &&
242- !is_closing) {
243- if (mode == napi_tsfn_nonblocking) {
244- return napi_queue_full;
241+ while (queue.size () >= max_queue_size && max_queue_size > 0 &&
242+ !is_closing) {
243+ if (mode == napi_tsfn_nonblocking) {
244+ return napi_queue_full;
245+ }
246+ cond->Wait (lock);
245247 }
246- cond->Wait (lock);
247- }
248248
249- if (is_closing) {
249+ if (!is_closing) {
250+ queue.push (data);
251+ Send ();
252+ return napi_ok;
253+ }
250254 if (thread_count == 0 ) {
251255 return napi_invalid_arg;
252- } else {
253- thread_count--;
256+ }
257+ thread_count--;
258+ if (!is_closed || thread_count > 0 ) {
254259 return napi_closing;
255260 }
256- } else {
257- queue.push (data);
258- Send ();
259- return napi_ok;
260261 }
262+ // Make sure to release lock before destroying
263+ delete this ;
264+ return napi_closing;
261265 }
262266
263267 napi_status Acquire () {
@@ -273,31 +277,51 @@ class ThreadSafeFunction : public node::AsyncResource {
273277 }
274278
275279 napi_status Release (napi_threadsafe_function_release_mode mode) {
276- node::Mutex::ScopedLock lock (this ->mutex );
280+ {
281+ node::Mutex::ScopedLock lock (this ->mutex );
277282
278- if (thread_count == 0 ) {
279- return napi_invalid_arg;
280- }
283+ if (thread_count == 0 ) {
284+ return napi_invalid_arg;
285+ }
281286
282- thread_count--;
287+ thread_count--;
283288
284- if (thread_count == 0 || mode == napi_tsfn_abort) {
285- if (!is_closing) {
286- is_closing = (mode == napi_tsfn_abort);
287- if (is_closing && max_queue_size > 0 ) {
288- cond->Signal (lock);
289+ if (thread_count == 0 || mode == napi_tsfn_abort) {
290+ if (!is_closing) {
291+ is_closing = (mode == napi_tsfn_abort);
292+ if (is_closing && max_queue_size > 0 ) {
293+ cond->Signal (lock);
294+ }
295+ Send ();
289296 }
290- Send ();
291297 }
292- }
293298
299+ if (!is_closed || thread_count > 0 ) {
300+ return napi_ok;
301+ }
302+ }
303+ // Make sure to release lock before destroying
304+ delete this ;
294305 return napi_ok;
295306 }
296307
297- void EmptyQueueAndDelete () {
298- for (; !queue.empty (); queue.pop ()) {
299- call_js_cb (nullptr , nullptr , context, queue.front ());
308+ void EmptyQueueAndMaybeDelete () {
309+ {
310+ node::Mutex::ScopedLock lock (this ->mutex );
311+ for (; !queue.empty (); queue.pop ()) {
312+ call_js_cb (nullptr , nullptr , context, queue.front ());
313+ }
314+ if (thread_count > 0 ) {
315+ // At this point this TSFN is effectively done, but we need to keep
316+ // it alive for other threads that still have pointers to it until
317+ // they release them.
318+ // But we already release all the resources that we can at this point
319+ queue = {};
320+ ReleaseResources ();
321+ return ;
322+ }
300323 }
324+ // Make sure to release lock before destroying
301325 delete this ;
302326 }
303327
@@ -349,6 +373,16 @@ class ThreadSafeFunction : public node::AsyncResource {
349373 inline void * Context () { return context; }
350374
351375 protected:
376+ void ReleaseResources () {
377+ if (!is_closed) {
378+ is_closed = true ;
379+ ref.Reset ();
380+ node::RemoveEnvironmentCleanupHook (env->isolate , Cleanup, this );
381+ env->Unref ();
382+ async_resource.reset ();
383+ }
384+ }
385+
352386 void Dispatch () {
353387 bool has_more = true ;
354388
@@ -407,7 +441,7 @@ class ThreadSafeFunction : public node::AsyncResource {
407441
408442 if (popped_value) {
409443 v8::HandleScope scope (env->isolate );
410- CallbackScope cb_scope (this );
444+ AsyncResource:: CallbackScope cb_scope (&*async_resource );
411445 napi_value js_callback = nullptr ;
412446 if (!ref.IsEmpty ()) {
413447 v8::Local<v8::Function> js_cb =
@@ -424,10 +458,10 @@ class ThreadSafeFunction : public node::AsyncResource {
424458 void Finalize () {
425459 v8::HandleScope scope (env->isolate );
426460 if (finalize_cb) {
427- CallbackScope cb_scope (this );
461+ AsyncResource:: CallbackScope cb_scope (&*async_resource );
428462 env->CallFinalizer <false >(finalize_cb, finalize_data, context);
429463 }
430- EmptyQueueAndDelete ();
464+ EmptyQueueAndMaybeDelete ();
431465 }
432466
433467 void CloseHandlesAndMaybeDelete (bool set_closing = false ) {
@@ -499,19 +533,29 @@ class ThreadSafeFunction : public node::AsyncResource {
499533 }
500534
501535 private:
536+ // Needed because node::AsyncResource::CallbackScope is protected
537+ class AsyncResource : public node ::AsyncResource {
538+ public:
539+ using node::AsyncResource::AsyncResource;
540+ using node::AsyncResource::CallbackScope;
541+ };
542+
502543 static const unsigned char kDispatchIdle = 0 ;
503544 static const unsigned char kDispatchRunning = 1 << 0 ;
504545 static const unsigned char kDispatchPending = 1 << 1 ;
505546
506547 static const unsigned int kMaxIterationCount = 1000 ;
507548
549+ std::optional<AsyncResource> async_resource;
550+
508551 // These are variables protected by the mutex.
509552 node::Mutex mutex;
510553 std::unique_ptr<node::ConditionVariable> cond;
511554 std::queue<void *> queue;
512555 uv_async_t async;
513556 size_t thread_count;
514557 bool is_closing;
558+ bool is_closed;
515559 std::atomic_uchar dispatch_state;
516560
517561 // These are variables set once, upon creation, and then never again, which
0 commit comments