1212#include " threadpoolwork-inl.h"
1313#include " util-inl.h"
1414
15+ #include < atomic>
1516#include < cinttypes>
1617
1718namespace node {
@@ -116,12 +117,65 @@ using v8::Value;
116117 } \
117118 } while (0 )
118119
120+ class AbortError {
121+ public:
122+ static MaybeLocal<Object> New (
123+ Isolate* isolate,
124+ std::string_view message = " The operation was aborted" ,
125+ Local<Value> cause = Local<Value>()) {
126+ Local<String> js_msg;
127+ Local<Object> error_obj;
128+ Local<Context> context = isolate->GetCurrentContext ();
129+ Environment* env = Environment::GetCurrent (isolate);
130+
131+ if (!String::NewFromUtf8 (isolate,
132+ message.data (),
133+ NewStringType::kNormal ,
134+ static_cast <int >(message.size ()))
135+ .ToLocal (&js_msg) ||
136+ !Exception::Error (js_msg)->ToObject (context).ToLocal (&error_obj)) {
137+ return MaybeLocal<Object>();
138+ }
139+
140+ Local<String> error_name;
141+ if (!String::NewFromUtf8 (isolate, " AbortError" ).ToLocal (&error_name)) {
142+ return MaybeLocal<Object>();
143+ }
144+ if (error_obj->Set (context, env->name_string (), error_name).IsNothing ()) {
145+ return MaybeLocal<Object>();
146+ }
147+
148+ Local<String> code_key;
149+ Local<String> code_value;
150+ if (!String::NewFromUtf8 (isolate, " code" ).ToLocal (&code_key) ||
151+ !String::NewFromUtf8 (isolate, " ABORT_ERR" ).ToLocal (&code_value)) {
152+ return MaybeLocal<Object>();
153+ }
154+ if (error_obj->Set (context, code_key, code_value).IsNothing ()) {
155+ return MaybeLocal<Object>();
156+ }
157+
158+ if (!cause.IsEmpty () && !cause->IsUndefined ()) {
159+ if (error_obj->Set (context, env->cause_string (), cause).IsNothing ()) {
160+ return MaybeLocal<Object>();
161+ }
162+ }
163+
164+ return error_obj;
165+ }
166+ };
167+
119168inline MaybeLocal<Object> CreateSQLiteError (Isolate* isolate,
120- const char * message) {
169+ std::string_view message) {
121170 Local<String> js_msg;
122171 Local<Object> e;
123172 Environment* env = Environment::GetCurrent (isolate);
124- if (!String::NewFromUtf8 (isolate, message).ToLocal (&js_msg) ||
173+
174+ if (!String::NewFromUtf8 (isolate,
175+ message.data (),
176+ NewStringType::kNormal ,
177+ static_cast <int >(message.size ()))
178+ .ToLocal (&js_msg) ||
125179 !Exception::Error (js_msg)
126180 ->ToObject (isolate->GetCurrentContext ())
127181 .ToLocal (&e) ||
@@ -131,6 +185,7 @@ inline MaybeLocal<Object> CreateSQLiteError(Isolate* isolate,
131185 .IsNothing ()) {
132186 return MaybeLocal<Object>();
133187 }
188+
134189 return e;
135190}
136191
@@ -433,16 +488,21 @@ class BackupJob : public ThreadPoolWork {
433488 std::string destination_name,
434489 std::string dest_db,
435490 int pages,
436- Local<Function> progressFunc)
491+ Local<Function> progress_func,
492+ Local<Object> abort_signal = Local<Object>())
437493 : ThreadPoolWork(env, " node_sqlite3.BackupJob" ),
438494 env_(env),
439495 source_(source),
440496 pages_(pages),
441497 source_db_(std::move(source_db)),
442498 destination_name_(std::move(destination_name)),
443- dest_db_(std::move(dest_db)) {
499+ dest_db_(std::move(dest_db)),
500+ is_aborted_(false ) {
444501 resolver_.Reset (env->isolate (), resolver);
445- progressFunc_.Reset (env->isolate (), progressFunc);
502+ progress_func_.Reset (env->isolate (), progress_func);
503+ if (!abort_signal.IsEmpty ()) {
504+ abort_signal_.Reset (env->isolate (), abort_signal);
505+ }
446506 }
447507
448508 void ScheduleBackup () {
@@ -471,6 +531,10 @@ class BackupJob : public ThreadPoolWork {
471531 }
472532
473533 void DoThreadPoolWork () override {
534+ if (is_aborted_.load (std::memory_order_acquire)) {
535+ backup_status_ = SQLITE_INTERRUPT;
536+ return ;
537+ }
474538 backup_status_ = sqlite3_backup_step (backup_, pages_);
475539 }
476540
@@ -479,6 +543,12 @@ class BackupJob : public ThreadPoolWork {
479543 Local<Promise::Resolver> resolver =
480544 Local<Promise::Resolver>::New (env ()->isolate (), resolver_);
481545
546+ if (is_aborted_.load (std::memory_order_acquire) ||
547+ backup_status_ == SQLITE_INTERRUPT) {
548+ HandleAbortError (resolver);
549+ return ;
550+ }
551+
482552 if (!(backup_status_ == SQLITE_OK || backup_status_ == SQLITE_DONE ||
483553 backup_status_ == SQLITE_BUSY || backup_status_ == SQLITE_LOCKED)) {
484554 HandleBackupError (resolver, backup_status_);
@@ -489,7 +559,7 @@ class BackupJob : public ThreadPoolWork {
489559 int remaining_pages = sqlite3_backup_remaining (backup_);
490560 if (remaining_pages != 0 ) {
491561 Local<Function> fn =
492- Local<Function>::New (env ()->isolate (), progressFunc_ );
562+ Local<Function>::New (env ()->isolate (), progress_func_ );
493563 if (!fn.IsEmpty ()) {
494564 Local<Object> progress_info = Object::New (env ()->isolate ());
495565 if (progress_info
@@ -515,6 +585,14 @@ class BackupJob : public ThreadPoolWork {
515585 return ;
516586 }
517587 }
588+ if (CheckAbortSignal ()) {
589+ // TODO(@lluisemper): BackupJob does not implement proper async context
590+ // tracking yet.
591+ // Consider inheriting from AsyncWrap and using CallbackScope to
592+ // propagate async context, similar to other ThreadPoolWork items.
593+ HandleAbortError (resolver);
594+ return ;
595+ }
518596
519597 // There's still work to do
520598 this ->ScheduleWork ();
@@ -548,6 +626,10 @@ class BackupJob : public ThreadPoolWork {
548626 sqlite3_close_v2 (dest_);
549627 dest_ = nullptr ;
550628 }
629+
630+ if (!abort_signal_.IsEmpty ()) {
631+ abort_signal_.Reset ();
632+ }
551633 }
552634
553635 private:
@@ -573,19 +655,73 @@ class BackupJob : public ThreadPoolWork {
573655 resolver->Reject (env ()->context (), e).ToChecked ();
574656 }
575657
658+ inline MaybeLocal<Object> CreateAbortError (
659+ Isolate* isolate,
660+ std::string_view message = " The operation was aborted" ) {
661+ Environment* env = Environment::GetCurrent (isolate);
662+ HandleScope scope (isolate);
663+ Local<Value> cause;
664+
665+ if (!abort_signal_.IsEmpty ()) {
666+ Local<Object> signal = abort_signal_.Get (isolate);
667+ Local<String> reason_key = env->reason_string ();
668+
669+ if (!signal->Get (isolate->GetCurrentContext (), reason_key)
670+ .ToLocal (&cause)) {
671+ cause = Local<Value>();
672+ }
673+ }
674+
675+ return AbortError::New (isolate, message, cause);
676+ }
677+
678+ void HandleAbortError (Local<Promise::Resolver> resolver) {
679+ Local<Object> e;
680+ if (!CreateAbortError (env ()->isolate ()).ToLocal (&e)) {
681+ Finalize ();
682+ return ;
683+ }
684+
685+ Finalize ();
686+ resolver->Reject (env ()->context (), e).ToChecked ();
687+ }
688+
689+ bool CheckAbortSignal () {
690+ if (abort_signal_.IsEmpty ()) {
691+ return false ;
692+ }
693+
694+ Isolate* isolate = env ()->isolate ();
695+ HandleScope scope (isolate);
696+ Local<Object> signal = abort_signal_.Get (isolate);
697+
698+ Local<Value> aborted_value;
699+ if (signal->Get (env ()->context (), env ()->aborted_string ())
700+ .ToLocal (&aborted_value)) {
701+ if (aborted_value->BooleanValue (isolate)) {
702+ is_aborted_.store (true , std::memory_order_release);
703+ return true ;
704+ }
705+ }
706+
707+ return false ;
708+ }
709+
576710 Environment* env () const { return env_; }
577711
578712 Environment* env_;
579713 DatabaseSync* source_;
580714 Global<Promise::Resolver> resolver_;
581- Global<Function> progressFunc_;
715+ Global<Function> progress_func_;
716+ Global<Object> abort_signal_;
582717 sqlite3* dest_ = nullptr ;
583718 sqlite3_backup* backup_ = nullptr ;
584719 int pages_;
585720 int backup_status_ = SQLITE_OK;
586721 std::string source_db_;
587722 std::string destination_name_;
588723 std::string dest_db_;
724+ std::atomic<bool > is_aborted_;
589725};
590726
591727UserDefinedFunction::UserDefinedFunction (Environment* env,
@@ -1538,7 +1674,8 @@ void Backup(const FunctionCallbackInfo<Value>& args) {
15381674 int rate = 100 ;
15391675 std::string source_db = " main" ;
15401676 std::string dest_db = " main" ;
1541- Local<Function> progressFunc = Local<Function>();
1677+ Local<Function> progress_func = Local<Function>();
1678+ Local<Object> abort_signal = Local<Object>();
15421679
15431680 if (args.Length () > 2 ) {
15441681 if (!args[2 ]->IsObject ()) {
@@ -1610,7 +1747,13 @@ void Backup(const FunctionCallbackInfo<Value>& args) {
16101747 " The \" options.progress\" argument must be a function." );
16111748 return ;
16121749 }
1613- progressFunc = progress_v.As <Function>();
1750+ progress_func = progress_v.As <Function>();
1751+ }
1752+
1753+ Local<Value> signal_v;
1754+ if (!options->Get (env->context (), env->signal_string ())
1755+ .ToLocal (&signal_v)) {
1756+ return ;
16141757 }
16151758 }
16161759
@@ -1627,7 +1770,8 @@ void Backup(const FunctionCallbackInfo<Value>& args) {
16271770 dest_path.value (),
16281771 std::move (dest_db),
16291772 rate,
1630- progressFunc);
1773+ progress_func,
1774+ abort_signal);
16311775 db->AddBackup (job);
16321776 job->ScheduleBackup ();
16331777}
0 commit comments