diff --git a/go.mod b/go.mod index 1703bcc..245973d 100644 --- a/go.mod +++ b/go.mod @@ -8,10 +8,19 @@ require ( github.com/oklog/ulid/v2 v2.1.0 github.com/redis/go-redis/v9 v9.7.0 github.com/rs/cors v1.11.1 + github.com/shirou/gopsutil/v4 v4.24.12 golang.org/x/sync v0.10.0 ) require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/ebitengine/purego v0.8.1 // indirect + github.com/go-ole/go-ole v1.2.6 // indirect + github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect + github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect + github.com/tklauser/go-sysconf v0.3.12 // indirect + github.com/tklauser/numcpus v0.6.1 // indirect + github.com/yusufpapurcu/wmi v1.2.4 // indirect + golang.org/x/sys v0.28.0 // indirect ) diff --git a/go.sum b/go.sum index 2ad9dde..64a3bfe 100644 --- a/go.sum +++ b/go.sum @@ -6,16 +6,48 @@ github.com/caarlos0/env/v11 v11.3.1 h1:cArPWC15hWmEt+gWk7YBi7lEXTXCvpaSdCiZE2X5m github.com/caarlos0/env/v11 v11.3.1/go.mod h1:qupehSf/Y0TUTsxKywqRt/vJjN5nz6vauiYEUUr8P4U= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/ebitengine/purego v0.8.1 h1:sdRKd6plj7KYW33EH5As6YKfe8m9zbN9JMrOjNVF/BE= +github.com/ebitengine/purego v0.8.1/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= +github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU= github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ= github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA= github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= +github.com/shirou/gopsutil/v4 v4.24.12 h1:qvePBOk20e0IKA1QXrIIU+jmk+zEiYVVx06WjBRlZo4= +github.com/shirou/gopsutil/v4 v4.24.12/go.mod h1:DCtMPAad2XceTeIAbGyVfycbYQNBGk2P8cvDi7/VN9o= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= +github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= +github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= +github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= +github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= +github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/images/home.png b/images/home.png index 9c15137..7b7e9d7 100644 Binary files a/images/home.png and b/images/home.png differ diff --git a/redis/heartbeater.go b/redis/heartbeater.go index 039ebef..574a56f 100644 --- a/redis/heartbeater.go +++ b/redis/heartbeater.go @@ -16,6 +16,8 @@ type HeartbeatData struct { HeartbeatAt time.Time `redis:"heartbeat_at"` Queues []byte `redis:"queues"` PID int `redis:"pid"` + MemoryUsage float64 `redis:"memory_usage"` + CPUUsage float64 `redis:"cpu_usage"` } func NewHeartBeater(rc redis.UniversalClient, opts ...OptFunc) *Heartbeater { @@ -47,6 +49,8 @@ func (s *Heartbeater) SendHeartbeat(ctx context.Context, data taskqueue.Heartbea HeartbeatAt: data.HeartbeatAt, Queues: queuesData, PID: data.PID, + MemoryUsage: data.MemoryUsage, + CPUUsage: data.CPUUsage, } _, err = s.client.TxPipelined(ctx, func(p redis.Pipeliner) error { @@ -107,6 +111,8 @@ func (s *Heartbeater) LastHeartbeats(ctx context.Context) ([]taskqueue.Heartbeat HeartbeatAt: hb.HeartbeatAt, Queues: queues, PID: hb.PID, + MemoryUsage: hb.MemoryUsage, + CPUUsage: hb.CPUUsage, }) } diff --git a/redis/jobstore_test.go b/redis/jobstore_test.go index 9312866..890a6b9 100644 --- a/redis/jobstore_test.go +++ b/redis/jobstore_test.go @@ -95,20 +95,14 @@ func TestStoreLastHeartbeats(t *testing.T) { { Name: "queue_1", Concurrency: 10, - MaxAttempts: 10, - Timeout: time.Minute * 10, }, { Name: "queue_2", Concurrency: 10, - MaxAttempts: 10, - Timeout: time.Minute, }, { Name: "queue_2", Concurrency: 10, - MaxAttempts: 10, - Timeout: time.Second * 30, }, }, PID: 12, diff --git a/redis/metrics.go b/redis/metrics.go index 5f5d122..168cc56 100644 --- a/redis/metrics.go +++ b/redis/metrics.go @@ -2,7 +2,9 @@ package redis import ( "context" + "sort" "strconv" + "strings" "time" "github.com/oshankkumar/taskqueue-go" @@ -82,7 +84,7 @@ func (m *MetricsBackend) QueryRangeCounterValues(ctx context.Context, mt taskque func (m *MetricsBackend) GaugeValue(ctx context.Context, mt taskqueue.Metric) (taskqueue.MetricValue, error) { metricName, labels := mt.Name, mt.Labels - key := redisKeyGaugeMetrics(m.namespace, metricName, labels) + key := redisZSetKeyGaugeMetrics(m.namespace, metricName, labels) result, err := m.client.ZRangeArgsWithScores(ctx, redis.ZRangeArgs{ Key: key, @@ -121,7 +123,7 @@ func (m *MetricsBackend) GaugeValue(ctx context.Context, mt taskqueue.Metric) (t func (m *MetricsBackend) RecordGauge(ctx context.Context, mt taskqueue.Metric, value float64, ts time.Time) error { metricName, labels := mt.Name, mt.Labels - key := redisKeyGaugeMetrics(m.namespace, metricName, labels) + key := redisZSetKeyGaugeMetrics(m.namespace, metricName, labels) score := ts.Unix() return m.client.ZAdd(ctx, key, redis.Z{ @@ -132,7 +134,7 @@ func (m *MetricsBackend) RecordGauge(ctx context.Context, mt taskqueue.Metric, v func (m *MetricsBackend) QueryRangeGaugeValues(ctx context.Context, mt taskqueue.Metric, start, end time.Time) (taskqueue.MetricRangeValue, error) { metricName, labels := mt.Name, mt.Labels - key := redisKeyGaugeMetrics(m.namespace, metricName, labels) + key := redisZSetKeyGaugeMetrics(m.namespace, metricName, labels) result, err := m.client.ZRangeArgsWithScores(ctx, redis.ZRangeArgs{ Key: key, @@ -170,26 +172,40 @@ func (m *MetricsBackend) QueryRangeGaugeValues(ctx context.Context, mt taskqueue return gaugeRange, nil } -func redisKeyGaugeMetrics(ns string, metricName string, labels map[string]string) string { - key := ns + ":gauge:" + metricName - for k, v := range labels { - key += ":" + k + ":" + v - } - return key +func redisZSetKeyGaugeMetrics(ns string, metricName string, labels map[string]string) string { + return redisKeyPrefixGaugeMetrics(ns, metricName, labels) + ":values" } func redisHashKeyCounterMetrics(ns string, metricName string, labels map[string]string) string { - key := ns + ":counter:" + metricName - for k, v := range labels { - key += ":" + k + ":" + v - } - return key + ":values" + return redisKeyPrefixCounterMetrics(ns, metricName, labels) + ":values" } func redisZSetKeyCounterMetrics(ns string, metricName string, labels map[string]string) string { - key := ns + ":counter:" + metricName - for k, v := range labels { - key += ":" + k + ":" + v + return redisKeyPrefixCounterMetrics(ns, metricName, labels) + ":timestamps" +} + +func redisKeyPrefixCounterMetrics(ns string, metricName string, labels map[string]string) string { + return ns + ":counter:" + metricName + ":" + joinLabels(labels, ":") +} + +func redisKeyPrefixGaugeMetrics(ns string, metricName string, labels map[string]string) string { + return ns + ":gauge:" + metricName + ":" + joinLabels(labels, ":") +} + +func sortedMapKeys(labels map[string]string) []string { + keys := make([]string, 0, len(labels)) + for k := range labels { + keys = append(keys, k) + } + sort.Strings(keys) + return keys +} + +func joinLabels(labels map[string]string, sep string) string { + keys := sortedMapKeys(labels) + tokens := make([]string, 0, len(keys)*2) + for _, k := range keys { + tokens = append(tokens, k, labels[k]) } - return key + ":timestamps" + return strings.Join(tokens, sep) } diff --git a/taskmanager/taskmanager.go b/taskmanager/taskmanager.go index b531ff6..e5b159b 100644 --- a/taskmanager/taskmanager.go +++ b/taskmanager/taskmanager.go @@ -206,8 +206,6 @@ func (s *Server) listActiveWorkers(w http.ResponseWriter, r *http.Request) { queues = append(queues, QueuesConfig{ QueueName: q.Name, Concurrency: q.Concurrency, - MaxAttempts: q.MaxAttempts, - Timeout: q.Timeout, }) } @@ -217,6 +215,8 @@ func (s *Server) listActiveWorkers(w http.ResponseWriter, r *http.Request) { HeartbeatAt: hb.HeartbeatAt, Queues: queues, PID: hb.PID, + MemoryUsage: hb.MemoryUsage, + CPUUsage: hb.CPUUsage, }) } diff --git a/taskmanager/taskqueue-web/src/components/WorkerList.vue b/taskmanager/taskqueue-web/src/components/WorkerList.vue index 1a6c7da..5cb5278 100644 --- a/taskmanager/taskqueue-web/src/components/WorkerList.vue +++ b/taskmanager/taskqueue-web/src/components/WorkerList.vue @@ -50,7 +50,7 @@ - +

Started At: @@ -69,6 +69,18 @@ {{ worker.pid }}

+ +

+ Memory Usage: + {{ worker.memoryUsage.toFixed(2) }}% +

+
+ +

+ CPU Usage: + {{ worker.cpuUsage.toFixed(2) }}% +

+
@@ -119,4 +131,9 @@ export default { max-height: 400px; overflow-y: auto; } + +.stats p { + font-size: 14px; + margin: 5px 0; +} diff --git a/taskmanager/taskqueue-web/src/pages/Home.vue b/taskmanager/taskqueue-web/src/pages/Home.vue index 4eb6e56..c0a5e36 100644 --- a/taskmanager/taskqueue-web/src/pages/Home.vue +++ b/taskmanager/taskqueue-web/src/pages/Home.vue @@ -31,7 +31,7 @@
-
+
diff --git a/taskmanager/types.go b/taskmanager/types.go index 62578ce..0e39d3e 100644 --- a/taskmanager/types.go +++ b/taskmanager/types.go @@ -48,10 +48,8 @@ type ListActiveWorkersResponse struct { } type QueuesConfig struct { - QueueName string `json:"queueName"` - Concurrency int `json:"concurrency"` - MaxAttempts int `json:"maxAttempts"` - Timeout time.Duration `json:"timeout"` + QueueName string `json:"queueName"` + Concurrency int `json:"concurrency"` } type ActiveWorker struct { @@ -60,6 +58,8 @@ type ActiveWorker struct { HeartbeatAt time.Time `json:"heartbeatAt"` Queues []QueuesConfig `json:"queues"` PID int `json:"pid"` + MemoryUsage float64 `json:"memoryUsage"` + CPUUsage float64 `json:"cpuUsage"` } type TogglePendingQueueStatusResponse struct { diff --git a/worker.go b/worker.go index 091e4e5..60f35f1 100644 --- a/worker.go +++ b/worker.go @@ -3,6 +3,8 @@ package taskqueue import ( "context" "errors" + "github.com/shirou/gopsutil/v4/cpu" + "github.com/shirou/gopsutil/v4/mem" "log/slog" "math" "os" @@ -313,23 +315,39 @@ func (w *Worker) startHeartBeat(ctx context.Context) { queues = append(queues, HeartbeatQueueData{ Name: h.queueName, Concurrency: h.jobOptions.Concurrency, - MaxAttempts: h.jobOptions.MaxAttempts, - Timeout: h.jobOptions.Timeout, }) } w.internalLogger.Info("starting heartbeat loop") - if err := w.heartBeater.SendHeartbeat(ctx, HeartbeatData{ - WorkerID: w.id, - StartedAt: w.startedAt, - HeartbeatAt: time.Now(), - Queues: queues, - PID: pid, - }); err != nil { - w.errorHandler(err) + sendHearBeat := func() { + var memUsed float64 + memStat, err := mem.VirtualMemoryWithContext(ctx) + if err == nil { + memUsed = memStat.UsedPercent + } + + var cpuUsed float64 + cpuStat, err := cpu.PercentWithContext(ctx, 0, false) + if err == nil { + cpuUsed = cpuStat[0] + } + + if err := w.heartBeater.SendHeartbeat(ctx, HeartbeatData{ + WorkerID: w.id, + StartedAt: w.startedAt, + HeartbeatAt: time.Now(), + Queues: queues, + PID: pid, + MemoryUsage: memUsed, + CPUUsage: cpuUsed, + }); err != nil { + w.errorHandler(err) + } } + sendHearBeat() + heartBeatTicker := time.NewTicker(time.Second * 10) defer heartBeatTicker.Stop() @@ -337,21 +355,13 @@ func (w *Worker) startHeartBeat(ctx context.Context) { select { case <-ctx.Done(): ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), time.Second*5) - defer cancel() if err := w.heartBeater.RemoveHeartbeat(ctx, w.id); err != nil { w.errorHandler(err) } + cancel() return case <-heartBeatTicker.C: - if err := w.heartBeater.SendHeartbeat(ctx, HeartbeatData{ - WorkerID: w.id, - StartedAt: w.startedAt, - HeartbeatAt: time.Now(), - Queues: queues, - PID: pid, - }); err != nil { - w.errorHandler(err) - } + sendHearBeat() } } } @@ -459,8 +469,6 @@ func (w *Worker) monitorQueues(ctx context.Context) { type HeartbeatQueueData struct { Name string Concurrency int - MaxAttempts int - Timeout time.Duration } type HeartbeatData struct { @@ -469,6 +477,8 @@ type HeartbeatData struct { HeartbeatAt time.Time Queues []HeartbeatQueueData PID int + MemoryUsage float64 + CPUUsage float64 } type HeartBeater interface {