diff --git a/Readme.md b/Readme.md index befdbc5..b617936 100644 --- a/Readme.md +++ b/Readme.md @@ -3,6 +3,7 @@ [![GoDoc](https://godoc.org/github.com/oshankkumar/taskqueue-go?status.svg)](https://godoc.org/github.com/oshankkumar/taskqueue-go) [![License: MIT](https://img.shields.io/badge/license-MIT-green.svg)](https://opensource.org/licenses/MIT) ![Build Status](https://github.com/oshankkumar/taskqueue-go/actions/workflows/go.yml/badge.svg?branch=main) +![Build Status](https://github.com/oshankkumar/taskqueue-go/actions/workflows/build_taskmanager.yml/badge.svg?branch=main) [![Go Report Card](https://goreportcard.com/badge/github.com/oshankkumar/taskqueue-go)](https://goreportcard.com/report/github.com/oshankkumar/taskqueue-go) **TaskQueue-Go** is a high-performance, distributed task queue library for Go, designed to simplify background job diff --git a/examples/basic-worker/main.go b/examples/basic-worker/main.go index 098dc05..d733b18 100644 --- a/examples/basic-worker/main.go +++ b/examples/basic-worker/main.go @@ -60,7 +60,7 @@ func main() { fmt.Printf("job processed queue_name=email_queue job_id=%s\n", job.ID) return nil - }), taskqueue.WithConcurrency(8), taskqueue.WithMaxAttempts(1)) + }), taskqueue.WithConcurrency(8), taskqueue.WithMaxAttempts(4)) worker.RegisterHandler("payment_queue", taskqueue.HandlerFunc(func(ctx context.Context, job *taskqueue.Job) error { time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond) @@ -72,19 +72,19 @@ func main() { paymentProcessed.Add(1) fmt.Printf("job processed queue_name=payment_queue job_id=%s\n", job.ID) return nil - }), taskqueue.WithConcurrency(8), taskqueue.WithMaxAttempts(1)) + }), taskqueue.WithConcurrency(8), taskqueue.WithMaxAttempts(4)) worker.RegisterHandler("push_notification_queue", taskqueue.HandlerFunc(func(ctx context.Context, job *taskqueue.Job) error { time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond) if rand.Intn(100) < 30 { - return errors.New("something bad happened") + return taskqueue.ErrSkipRetry{Err: errors.New("something bad happened"), SkipReason: "Don't want to send outdated notification"} } notifyProcessed.Add(1) fmt.Printf("job processed queue_name=push_notification_queue job_id=%s\n", job.ID) return nil - }), taskqueue.WithConcurrency(8), taskqueue.WithMaxAttempts(1)) + }), taskqueue.WithConcurrency(8), taskqueue.WithMaxAttempts(4)) ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer cancel() diff --git a/redis/luascripts/dequeue_inline.lua b/redis/luascripts/dequeue_inline.lua index f57a5d2..9050970 100644 --- a/redis/luascripts/dequeue_inline.lua +++ b/redis/luascripts/dequeue_inline.lua @@ -23,6 +23,12 @@ redis.call("ZADD", queueKey, "XX", newScore, jobID) -- Fetch job details from the hash local jobKey = jobKeyPrefix .. jobID + +local fieldsAdded = redis.call("HSET", jobKey, "status", "Active") +if fieldsAdded ~= 0 then + return { err = "Failed in updating the job status to Active" } +end + local jobDetails = redis.call("HGETALL", jobKey) -- Return the job ID and its details diff --git a/redis/redisq.go b/redis/redisq.go index 5893943..8fac4dd 100644 --- a/redis/redisq.go +++ b/redis/redisq.go @@ -165,10 +165,10 @@ func (q *Queue) Ack(ctx context.Context, job *taskqueue.Job, opts *taskqueue.Ack } func (q *Queue) Nack(ctx context.Context, job *taskqueue.Job, opts *taskqueue.NackOptions) error { - if opts.MaxAttemptsExceeded { - return q.moveToDead(ctx, job, opts) + if opts.ShouldRetry { + return q.retry(ctx, job, opts) } - return q.retry(ctx, job, opts) + return q.moveToDead(ctx, job, opts) } func (q *Queue) moveToDead(ctx context.Context, job *taskqueue.Job, opts *taskqueue.NackOptions) error { diff --git a/redis/redisq_test.go b/redis/redisq_test.go index 5f7a1d1..9e273d9 100644 --- a/redis/redisq_test.go +++ b/redis/redisq_test.go @@ -3,7 +3,6 @@ package redis import ( "bytes" "context" - "os" "testing" "time" @@ -26,16 +25,13 @@ const testPayload = `{ "phone": "+1 (823) 515-3571" }` -func TestRedisQueueEnqueue(t *testing.T) { - redisAddr := os.Getenv("REDIS_ADDR") - if redisAddr == "" { - t.Skip("skipping test since REDIS_ADDR is not set") - } - - client := redis.NewClient(&redis.Options{Addr: redisAddr}) +func TestRedisQueue(t *testing.T) { + client := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) q := NewQueue(client, WithCompletedJobTTL(time.Minute*30)) + client.Del(context.Background(), redisKeyPendingQueue(taskqueue.DefaultNameSpace, "test_redis_queue")) + job := taskqueue.NewJob() job.Payload = []byte(testPayload) @@ -60,7 +56,11 @@ func TestRedisQueueEnqueue(t *testing.T) { } if deqJob.ID != job.ID { - t.Fatal("expected ID to be equal to job ID") + t.Fatalf("expected ID to be equal to job ID, expected: %s got: %s", job.ID, deqJob.ID) + } + + if deqJob.Status != taskqueue.JobStatusActive { + t.Error("expected status to be active after dequeue got:", deqJob.Status) } deqJob.Status = taskqueue.JobStatusCompleted diff --git a/redis/redisqwithjobstorage.go b/redis/redisqwithjobstorage.go index 72ded19..bbdccf7 100644 --- a/redis/redisqwithjobstorage.go +++ b/redis/redisqwithjobstorage.go @@ -127,10 +127,10 @@ func (q *QueueWithExternalJobStorage) Nack(ctx context.Context, job *taskqueue.J return err } - if opts.MaxAttemptsExceeded { - return q.nackDead(ctx, job.ID, opts) + if opts.ShouldRetry { + return q.nack(ctx, job.ID, opts) } - return q.nack(ctx, job.ID, opts) + return q.nackDead(ctx, job.ID, opts) } func (q *QueueWithExternalJobStorage) nackDead(ctx context.Context, jobID string, opts *taskqueue.NackOptions) error { diff --git a/taskqueue.go b/taskqueue.go index 312db16..5ca3c24 100644 --- a/taskqueue.go +++ b/taskqueue.go @@ -126,9 +126,9 @@ type AckOptions struct { } type NackOptions struct { - QueueName string - RetryAfter time.Duration - MaxAttemptsExceeded bool + QueueName string + RetryAfter time.Duration + ShouldRetry bool } type Acker interface { diff --git a/worker.go b/worker.go index 60f35f1..d9b123a 100644 --- a/worker.go +++ b/worker.go @@ -3,13 +3,15 @@ package taskqueue import ( "context" "errors" - "github.com/shirou/gopsutil/v4/cpu" - "github.com/shirou/gopsutil/v4/mem" + "fmt" "log/slog" "math" "os" "sync" "time" + + "github.com/shirou/gopsutil/v4/cpu" + "github.com/shirou/gopsutil/v4/mem" ) type Handler interface { @@ -255,6 +257,19 @@ func (w *Worker) dequeueJob(ctx context.Context, jobCh chan<- *Job, h *queueHand } } +type ErrSkipRetry struct { + Err error + SkipReason string +} + +func (e ErrSkipRetry) Error() string { + return fmt.Sprintf("skip retry: %s, reason: %s", e.Err, e.SkipReason) +} + +func (e ErrSkipRetry) Unwrap() error { + return e.Err +} + func (w *Worker) processJob(ctx context.Context, job *Job, h *queueHandler) error { ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), h.jobOptions.Timeout) defer cancel() @@ -271,16 +286,17 @@ func (w *Worker) processJob(ctx context.Context, job *Job, h *queueHandler) erro job.UpdatedAt = time.Now() job.Attempts++ + var skipErr ErrSkipRetry switch { case jobErr == nil: job.Status = JobStatusCompleted - case job.Attempts >= h.jobOptions.MaxAttempts: + case job.Attempts >= h.jobOptions.MaxAttempts, errors.As(jobErr, &skipErr): job.Status = JobStatusDead default: job.Status = JobStatusFailed } - if jobErr == nil { + if job.Status == JobStatusCompleted { _ = w.metricsBackend.IncrementCounter(ctx, Metric{Name: MetricJobProcessedCount}, 1, time.Now()) return w.queue.Ack(ctx, job, &AckOptions{QueueName: h.queueName}) } @@ -288,9 +304,9 @@ func (w *Worker) processJob(ctx context.Context, job *Job, h *queueHandler) erro _ = w.metricsBackend.IncrementCounter(ctx, Metric{Name: MetricJobFailedCount}, 1, time.Now()) nackOpts := &NackOptions{ - QueueName: h.queueName, - RetryAfter: h.jobOptions.BackoffFunc(job.Attempts), - MaxAttemptsExceeded: job.Attempts >= h.jobOptions.MaxAttempts, + QueueName: h.queueName, + RetryAfter: h.jobOptions.BackoffFunc(job.Attempts), + ShouldRetry: job.Status == JobStatusFailed, } return w.queue.Nack(ctx, job, nackOpts) @@ -348,7 +364,7 @@ func (w *Worker) startHeartBeat(ctx context.Context) { sendHearBeat() - heartBeatTicker := time.NewTicker(time.Second * 10) + heartBeatTicker := time.NewTicker(heartBeatInterval) defer heartBeatTicker.Stop() for { @@ -466,6 +482,8 @@ func (w *Worker) monitorQueues(ctx context.Context) { } } +const heartBeatInterval = time.Second * 30 + type HeartbeatQueueData struct { Name string Concurrency int