1212#include " threadpoolwork-inl.h"
1313#include " util-inl.h"
1414
15+ #include < atomic>
1516#include < cinttypes>
1617
1718namespace node {
@@ -115,13 +116,67 @@ using v8::Value;
115116 UNREACHABLE (" Bad SQLite value" ); \
116117 } \
117118 } while (0 )
119+ // TODO(@lluisemper) This is a copy of node::AbortError, use js native
120+ // AbortError constructor to allow instanceof checks in JS.
121+ class AbortError {
122+ public:
123+ static MaybeLocal<Object> New (
124+ Isolate* isolate,
125+ std::string_view message = " The operation was aborted" ,
126+ Local<Value> cause = Local<Value>()) {
127+ Local<String> js_msg;
128+ Local<Object> error_obj;
129+ Local<Context> context = isolate->GetCurrentContext ();
130+ Environment* env = Environment::GetCurrent (isolate);
131+
132+ if (!String::NewFromUtf8 (isolate,
133+ message.data (),
134+ NewStringType::kNormal ,
135+ static_cast <int >(message.size ()))
136+ .ToLocal (&js_msg) ||
137+ !Exception::Error (js_msg)->ToObject (context).ToLocal (&error_obj)) {
138+ return MaybeLocal<Object>();
139+ }
140+
141+ Local<String> error_name;
142+ if (!String::NewFromUtf8 (isolate, " AbortError" ).ToLocal (&error_name)) {
143+ return MaybeLocal<Object>();
144+ }
145+ if (error_obj->Set (context, env->name_string (), error_name).IsNothing ()) {
146+ return MaybeLocal<Object>();
147+ }
148+
149+ Local<String> code_key;
150+ Local<String> code_value;
151+ if (!String::NewFromUtf8 (isolate, " code" ).ToLocal (&code_key) ||
152+ !String::NewFromUtf8 (isolate, " ABORT_ERR" ).ToLocal (&code_value)) {
153+ return MaybeLocal<Object>();
154+ }
155+ if (error_obj->Set (context, code_key, code_value).IsNothing ()) {
156+ return MaybeLocal<Object>();
157+ }
158+
159+ if (!cause.IsEmpty () && !cause->IsUndefined ()) {
160+ if (error_obj->Set (context, env->cause_string (), cause).IsNothing ()) {
161+ return MaybeLocal<Object>();
162+ }
163+ }
164+
165+ return error_obj;
166+ }
167+ };
118168
119169inline MaybeLocal<Object> CreateSQLiteError (Isolate* isolate,
120- const char * message) {
170+ std::string_view message) {
121171 Local<String> js_msg;
122172 Local<Object> e;
123173 Environment* env = Environment::GetCurrent (isolate);
124- if (!String::NewFromUtf8 (isolate, message).ToLocal (&js_msg) ||
174+
175+ if (!String::NewFromUtf8 (isolate,
176+ message.data (),
177+ NewStringType::kNormal ,
178+ static_cast <int >(message.size ()))
179+ .ToLocal (&js_msg) ||
125180 !Exception::Error (js_msg)
126181 ->ToObject (isolate->GetCurrentContext ())
127182 .ToLocal (&e) ||
@@ -131,6 +186,7 @@ inline MaybeLocal<Object> CreateSQLiteError(Isolate* isolate,
131186 .IsNothing ()) {
132187 return MaybeLocal<Object>();
133188 }
189+
134190 return e;
135191}
136192
@@ -433,16 +489,21 @@ class BackupJob : public ThreadPoolWork {
433489 std::string destination_name,
434490 std::string dest_db,
435491 int pages,
436- Local<Function> progressFunc)
492+ Local<Function> progress_func,
493+ Local<Object> abort_signal = Local<Object>())
437494 : ThreadPoolWork(env, " node_sqlite3.BackupJob" ),
438495 env_(env),
439496 source_(source),
440497 pages_(pages),
441498 source_db_(std::move(source_db)),
442499 destination_name_(std::move(destination_name)),
443- dest_db_(std::move(dest_db)) {
500+ dest_db_(std::move(dest_db)),
501+ is_aborted_(false ) {
444502 resolver_.Reset (env->isolate (), resolver);
445- progressFunc_.Reset (env->isolate (), progressFunc);
503+ progress_func_.Reset (env->isolate (), progress_func);
504+ if (!abort_signal.IsEmpty ()) {
505+ abort_signal_.Reset (env->isolate (), abort_signal);
506+ }
446507 }
447508
448509 void ScheduleBackup () {
@@ -471,6 +532,10 @@ class BackupJob : public ThreadPoolWork {
471532 }
472533
473534 void DoThreadPoolWork () override {
535+ if (is_aborted_.load (std::memory_order_acquire)) {
536+ backup_status_ = SQLITE_INTERRUPT;
537+ return ;
538+ }
474539 backup_status_ = sqlite3_backup_step (backup_, pages_);
475540 }
476541
@@ -479,6 +544,12 @@ class BackupJob : public ThreadPoolWork {
479544 Local<Promise::Resolver> resolver =
480545 Local<Promise::Resolver>::New (env ()->isolate (), resolver_);
481546
547+ if (is_aborted_.load (std::memory_order_acquire) ||
548+ backup_status_ == SQLITE_INTERRUPT) {
549+ HandleAbortError (resolver);
550+ return ;
551+ }
552+
482553 if (!(backup_status_ == SQLITE_OK || backup_status_ == SQLITE_DONE ||
483554 backup_status_ == SQLITE_BUSY || backup_status_ == SQLITE_LOCKED)) {
484555 HandleBackupError (resolver, backup_status_);
@@ -489,7 +560,7 @@ class BackupJob : public ThreadPoolWork {
489560 int remaining_pages = sqlite3_backup_remaining (backup_);
490561 if (remaining_pages != 0 ) {
491562 Local<Function> fn =
492- Local<Function>::New (env ()->isolate (), progressFunc_ );
563+ Local<Function>::New (env ()->isolate (), progress_func_ );
493564 if (!fn.IsEmpty ()) {
494565 Local<Object> progress_info = Object::New (env ()->isolate ());
495566 if (progress_info
@@ -515,6 +586,14 @@ class BackupJob : public ThreadPoolWork {
515586 return ;
516587 }
517588 }
589+ if (CheckAbortSignal ()) {
590+ // TODO(@lluisemper): BackupJob does not implement proper async context
591+ // tracking yet.
592+ // Consider inheriting from AsyncWrap and using CallbackScope to
593+ // propagate async context, similar to other ThreadPoolWork items.
594+ HandleAbortError (resolver);
595+ return ;
596+ }
518597
519598 // There's still work to do
520599 this ->ScheduleWork ();
@@ -548,6 +627,10 @@ class BackupJob : public ThreadPoolWork {
548627 sqlite3_close_v2 (dest_);
549628 dest_ = nullptr ;
550629 }
630+
631+ if (!abort_signal_.IsEmpty ()) {
632+ abort_signal_.Reset ();
633+ }
551634 }
552635
553636 private:
@@ -573,19 +656,73 @@ class BackupJob : public ThreadPoolWork {
573656 resolver->Reject (env ()->context (), e).ToChecked ();
574657 }
575658
659+ inline MaybeLocal<Object> CreateAbortError (
660+ Isolate* isolate,
661+ std::string_view message = " The operation was aborted" ) {
662+ Environment* env = Environment::GetCurrent (isolate);
663+ HandleScope scope (isolate);
664+ Local<Value> cause;
665+
666+ if (!abort_signal_.IsEmpty ()) {
667+ Local<Object> signal = abort_signal_.Get (isolate);
668+ Local<String> reason_key = env->reason_string ();
669+
670+ if (!signal->Get (isolate->GetCurrentContext (), reason_key)
671+ .ToLocal (&cause)) {
672+ cause = Local<Value>();
673+ }
674+ }
675+
676+ return AbortError::New (isolate, message, cause);
677+ }
678+
679+ void HandleAbortError (Local<Promise::Resolver> resolver) {
680+ Local<Object> e;
681+ if (!CreateAbortError (env ()->isolate ()).ToLocal (&e)) {
682+ Finalize ();
683+ return ;
684+ }
685+
686+ Finalize ();
687+ resolver->Reject (env ()->context (), e).ToChecked ();
688+ }
689+
690+ bool CheckAbortSignal () {
691+ if (abort_signal_.IsEmpty ()) {
692+ return false ;
693+ }
694+
695+ Isolate* isolate = env ()->isolate ();
696+ HandleScope scope (isolate);
697+ Local<Object> signal = abort_signal_.Get (isolate);
698+
699+ Local<Value> aborted_value;
700+ if (signal->Get (env ()->context (), env ()->aborted_string ())
701+ .ToLocal (&aborted_value)) {
702+ if (aborted_value->BooleanValue (isolate)) {
703+ is_aborted_.store (true , std::memory_order_release);
704+ return true ;
705+ }
706+ }
707+
708+ return false ;
709+ }
710+
576711 Environment* env () const { return env_; }
577712
578713 Environment* env_;
579714 DatabaseSync* source_;
580715 Global<Promise::Resolver> resolver_;
581- Global<Function> progressFunc_;
716+ Global<Function> progress_func_;
717+ Global<Object> abort_signal_;
582718 sqlite3* dest_ = nullptr ;
583719 sqlite3_backup* backup_ = nullptr ;
584720 int pages_;
585721 int backup_status_ = SQLITE_OK;
586722 std::string source_db_;
587723 std::string destination_name_;
588724 std::string dest_db_;
725+ std::atomic<bool > is_aborted_;
589726};
590727
591728UserDefinedFunction::UserDefinedFunction (Environment* env,
@@ -1538,7 +1675,8 @@ void Backup(const FunctionCallbackInfo<Value>& args) {
15381675 int rate = 100 ;
15391676 std::string source_db = " main" ;
15401677 std::string dest_db = " main" ;
1541- Local<Function> progressFunc = Local<Function>();
1678+ Local<Function> progress_func = Local<Function>();
1679+ Local<Object> abort_signal = Local<Object>();
15421680
15431681 if (args.Length () > 2 ) {
15441682 if (!args[2 ]->IsObject ()) {
@@ -1610,7 +1748,13 @@ void Backup(const FunctionCallbackInfo<Value>& args) {
16101748 " The \" options.progress\" argument must be a function." );
16111749 return ;
16121750 }
1613- progressFunc = progress_v.As <Function>();
1751+ progress_func = progress_v.As <Function>();
1752+ }
1753+
1754+ Local<Value> signal_v;
1755+ if (!options->Get (env->context (), env->signal_string ())
1756+ .ToLocal (&signal_v)) {
1757+ return ;
16141758 }
16151759 }
16161760
@@ -1627,7 +1771,8 @@ void Backup(const FunctionCallbackInfo<Value>& args) {
16271771 dest_path.value (),
16281772 std::move (dest_db),
16291773 rate,
1630- progressFunc);
1774+ progress_func,
1775+ abort_signal);
16311776 db->AddBackup (job);
16321777 job->ScheduleBackup ();
16331778}
0 commit comments