diff --git a/go.mod b/go.mod
index 1703bcc..245973d 100644
--- a/go.mod
+++ b/go.mod
@@ -8,10 +8,19 @@ require (
github.com/oklog/ulid/v2 v2.1.0
github.com/redis/go-redis/v9 v9.7.0
github.com/rs/cors v1.11.1
+ github.com/shirou/gopsutil/v4 v4.24.12
golang.org/x/sync v0.10.0
)
require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
+ github.com/ebitengine/purego v0.8.1 // indirect
+ github.com/go-ole/go-ole v1.2.6 // indirect
+ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
+ github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
+ github.com/tklauser/go-sysconf v0.3.12 // indirect
+ github.com/tklauser/numcpus v0.6.1 // indirect
+ github.com/yusufpapurcu/wmi v1.2.4 // indirect
+ golang.org/x/sys v0.28.0 // indirect
)
diff --git a/go.sum b/go.sum
index 2ad9dde..64a3bfe 100644
--- a/go.sum
+++ b/go.sum
@@ -6,16 +6,48 @@ github.com/caarlos0/env/v11 v11.3.1 h1:cArPWC15hWmEt+gWk7YBi7lEXTXCvpaSdCiZE2X5m
github.com/caarlos0/env/v11 v11.3.1/go.mod h1:qupehSf/Y0TUTsxKywqRt/vJjN5nz6vauiYEUUr8P4U=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
+github.com/ebitengine/purego v0.8.1 h1:sdRKd6plj7KYW33EH5As6YKfe8m9zbN9JMrOjNVF/BE=
+github.com/ebitengine/purego v0.8.1/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ=
+github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
+github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
+github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
+github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
+github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA=
github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
+github.com/shirou/gopsutil/v4 v4.24.12 h1:qvePBOk20e0IKA1QXrIIU+jmk+zEiYVVx06WjBRlZo4=
+github.com/shirou/gopsutil/v4 v4.24.12/go.mod h1:DCtMPAad2XceTeIAbGyVfycbYQNBGk2P8cvDi7/VN9o=
+github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
+github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU=
+github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
+github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
+github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
+github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
+github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
+golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
+golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/images/home.png b/images/home.png
index 9c15137..7b7e9d7 100644
Binary files a/images/home.png and b/images/home.png differ
diff --git a/redis/heartbeater.go b/redis/heartbeater.go
index 039ebef..574a56f 100644
--- a/redis/heartbeater.go
+++ b/redis/heartbeater.go
@@ -16,6 +16,8 @@ type HeartbeatData struct {
HeartbeatAt time.Time `redis:"heartbeat_at"`
Queues []byte `redis:"queues"`
PID int `redis:"pid"`
+ MemoryUsage float64 `redis:"memory_usage"`
+ CPUUsage float64 `redis:"cpu_usage"`
}
func NewHeartBeater(rc redis.UniversalClient, opts ...OptFunc) *Heartbeater {
@@ -47,6 +49,8 @@ func (s *Heartbeater) SendHeartbeat(ctx context.Context, data taskqueue.Heartbea
HeartbeatAt: data.HeartbeatAt,
Queues: queuesData,
PID: data.PID,
+ MemoryUsage: data.MemoryUsage,
+ CPUUsage: data.CPUUsage,
}
_, err = s.client.TxPipelined(ctx, func(p redis.Pipeliner) error {
@@ -107,6 +111,8 @@ func (s *Heartbeater) LastHeartbeats(ctx context.Context) ([]taskqueue.Heartbeat
HeartbeatAt: hb.HeartbeatAt,
Queues: queues,
PID: hb.PID,
+ MemoryUsage: hb.MemoryUsage,
+ CPUUsage: hb.CPUUsage,
})
}
diff --git a/redis/jobstore_test.go b/redis/jobstore_test.go
index 9312866..890a6b9 100644
--- a/redis/jobstore_test.go
+++ b/redis/jobstore_test.go
@@ -95,20 +95,14 @@ func TestStoreLastHeartbeats(t *testing.T) {
{
Name: "queue_1",
Concurrency: 10,
- MaxAttempts: 10,
- Timeout: time.Minute * 10,
},
{
Name: "queue_2",
Concurrency: 10,
- MaxAttempts: 10,
- Timeout: time.Minute,
},
{
Name: "queue_2",
Concurrency: 10,
- MaxAttempts: 10,
- Timeout: time.Second * 30,
},
},
PID: 12,
diff --git a/redis/metrics.go b/redis/metrics.go
index 5f5d122..168cc56 100644
--- a/redis/metrics.go
+++ b/redis/metrics.go
@@ -2,7 +2,9 @@ package redis
import (
"context"
+ "sort"
"strconv"
+ "strings"
"time"
"github.com/oshankkumar/taskqueue-go"
@@ -82,7 +84,7 @@ func (m *MetricsBackend) QueryRangeCounterValues(ctx context.Context, mt taskque
func (m *MetricsBackend) GaugeValue(ctx context.Context, mt taskqueue.Metric) (taskqueue.MetricValue, error) {
metricName, labels := mt.Name, mt.Labels
- key := redisKeyGaugeMetrics(m.namespace, metricName, labels)
+ key := redisZSetKeyGaugeMetrics(m.namespace, metricName, labels)
result, err := m.client.ZRangeArgsWithScores(ctx, redis.ZRangeArgs{
Key: key,
@@ -121,7 +123,7 @@ func (m *MetricsBackend) GaugeValue(ctx context.Context, mt taskqueue.Metric) (t
func (m *MetricsBackend) RecordGauge(ctx context.Context, mt taskqueue.Metric, value float64, ts time.Time) error {
metricName, labels := mt.Name, mt.Labels
- key := redisKeyGaugeMetrics(m.namespace, metricName, labels)
+ key := redisZSetKeyGaugeMetrics(m.namespace, metricName, labels)
score := ts.Unix()
return m.client.ZAdd(ctx, key, redis.Z{
@@ -132,7 +134,7 @@ func (m *MetricsBackend) RecordGauge(ctx context.Context, mt taskqueue.Metric, v
func (m *MetricsBackend) QueryRangeGaugeValues(ctx context.Context, mt taskqueue.Metric, start, end time.Time) (taskqueue.MetricRangeValue, error) {
metricName, labels := mt.Name, mt.Labels
- key := redisKeyGaugeMetrics(m.namespace, metricName, labels)
+ key := redisZSetKeyGaugeMetrics(m.namespace, metricName, labels)
result, err := m.client.ZRangeArgsWithScores(ctx, redis.ZRangeArgs{
Key: key,
@@ -170,26 +172,40 @@ func (m *MetricsBackend) QueryRangeGaugeValues(ctx context.Context, mt taskqueue
return gaugeRange, nil
}
-func redisKeyGaugeMetrics(ns string, metricName string, labels map[string]string) string {
- key := ns + ":gauge:" + metricName
- for k, v := range labels {
- key += ":" + k + ":" + v
- }
- return key
+func redisZSetKeyGaugeMetrics(ns string, metricName string, labels map[string]string) string {
+ return redisKeyPrefixGaugeMetrics(ns, metricName, labels) + ":values"
}
func redisHashKeyCounterMetrics(ns string, metricName string, labels map[string]string) string {
- key := ns + ":counter:" + metricName
- for k, v := range labels {
- key += ":" + k + ":" + v
- }
- return key + ":values"
+ return redisKeyPrefixCounterMetrics(ns, metricName, labels) + ":values"
}
func redisZSetKeyCounterMetrics(ns string, metricName string, labels map[string]string) string {
- key := ns + ":counter:" + metricName
- for k, v := range labels {
- key += ":" + k + ":" + v
+ return redisKeyPrefixCounterMetrics(ns, metricName, labels) + ":timestamps"
+}
+
+func redisKeyPrefixCounterMetrics(ns string, metricName string, labels map[string]string) string {
+ return ns + ":counter:" + metricName + ":" + joinLabels(labels, ":")
+}
+
+func redisKeyPrefixGaugeMetrics(ns string, metricName string, labels map[string]string) string {
+ return ns + ":gauge:" + metricName + ":" + joinLabels(labels, ":")
+}
+
+func sortedMapKeys(labels map[string]string) []string {
+ keys := make([]string, 0, len(labels))
+ for k := range labels {
+ keys = append(keys, k)
+ }
+ sort.Strings(keys)
+ return keys
+}
+
+func joinLabels(labels map[string]string, sep string) string {
+ keys := sortedMapKeys(labels)
+ tokens := make([]string, 0, len(keys)*2)
+ for _, k := range keys {
+ tokens = append(tokens, k, labels[k])
}
- return key + ":timestamps"
+ return strings.Join(tokens, sep)
}
diff --git a/taskmanager/taskmanager.go b/taskmanager/taskmanager.go
index b531ff6..e5b159b 100644
--- a/taskmanager/taskmanager.go
+++ b/taskmanager/taskmanager.go
@@ -206,8 +206,6 @@ func (s *Server) listActiveWorkers(w http.ResponseWriter, r *http.Request) {
queues = append(queues, QueuesConfig{
QueueName: q.Name,
Concurrency: q.Concurrency,
- MaxAttempts: q.MaxAttempts,
- Timeout: q.Timeout,
})
}
@@ -217,6 +215,8 @@ func (s *Server) listActiveWorkers(w http.ResponseWriter, r *http.Request) {
HeartbeatAt: hb.HeartbeatAt,
Queues: queues,
PID: hb.PID,
+ MemoryUsage: hb.MemoryUsage,
+ CPUUsage: hb.CPUUsage,
})
}
diff --git a/taskmanager/taskqueue-web/src/components/WorkerList.vue b/taskmanager/taskqueue-web/src/components/WorkerList.vue
index 1a6c7da..5cb5278 100644
--- a/taskmanager/taskqueue-web/src/components/WorkerList.vue
+++ b/taskmanager/taskqueue-web/src/components/WorkerList.vue
@@ -50,7 +50,7 @@
Started At:
@@ -69,6 +69,18 @@
{{ worker.pid }}
+ Memory Usage:
+ {{ worker.memoryUsage.toFixed(2) }}%
+
+ CPU Usage:
+ {{ worker.cpuUsage.toFixed(2) }}%
+