diff --git a/caddy/admin_test.go b/caddy/admin_test.go index 345ceff80..17f211cd2 100644 --- a/caddy/admin_test.go +++ b/caddy/admin_test.go @@ -94,7 +94,10 @@ func TestAutoScaleWorkerThreads(t *testing.T) { frankenphp { max_threads 10 num_threads 2 - worker ../testdata/sleep.php 1 + worker ../testdata/sleep.php { + num 1 + max_threads 3 + } } } @@ -128,8 +131,8 @@ func TestAutoScaleWorkerThreads(t *testing.T) { } } - // assert that there are now more threads than before - assert.NotEqual(t, amountOfThreads, 2) + assert.NotEqual(t, amountOfThreads, 2, "at least one thread should have been auto-scaled") + assert.LessOrEqual(t, amountOfThreads, 4, "at most 3 max_threads + 1 regular thread should be present") } // Note this test requires at least 2x40MB available memory for the process diff --git a/caddy/app.go b/caddy/app.go index ba7e60664..ec15982ed 100644 --- a/caddy/app.go +++ b/caddy/app.go @@ -149,6 +149,7 @@ func (f *FrankenPHPApp) Start() error { frankenphp.WithWorkerEnv(w.Env), frankenphp.WithWorkerWatchMode(w.Watch), frankenphp.WithWorkerMaxFailures(w.MaxConsecutiveFailures), + frankenphp.WithWorkerMaxThreads(w.MaxThreads), } opts = append(opts, frankenphp.WithWorkers(w.Name, repl.ReplaceKnown(w.FileName, ""), w.Num, workerOpts...)) diff --git a/caddy/workerconfig.go b/caddy/workerconfig.go index 7bffd18bc..265889836 100644 --- a/caddy/workerconfig.go +++ b/caddy/workerconfig.go @@ -28,6 +28,8 @@ type workerConfig struct { FileName string `json:"file_name,omitempty"` // Num sets the number of workers to start. Num int `json:"num,omitempty"` + // MaxThreads sets the maximum number of threads for this worker. + MaxThreads int `json:"max_threads,omitempty"` // Env sets an extra environment variable to the given value. Can be specified more than once for multiple environment variables. Env map[string]string `json:"env,omitempty"` // Directories to watch for file changes @@ -85,6 +87,17 @@ func parseWorkerConfig(d *caddyfile.Dispenser) (workerConfig, error) { } wc.Num = int(v) + case "max_threads": + if !d.NextArg() { + return wc, d.ArgErr() + } + + v, err := strconv.ParseUint(d.Val(), 10, 32) + if err != nil { + return wc, d.WrapErr(err) + } + + wc.MaxThreads = int(v) case "env": args := d.RemainingArgs() if len(args) != 2 { @@ -125,7 +138,7 @@ func parseWorkerConfig(d *caddyfile.Dispenser) (workerConfig, error) { wc.MaxConsecutiveFailures = v default: - return wc, wrongSubDirectiveError("worker", "name, file, num, env, watch, match, max_consecutive_failures", v) + return wc, wrongSubDirectiveError("worker", "name, file, num, env, watch, match, max_consecutive_failures, max_threads", v) } } diff --git a/frankenphp.go b/frankenphp.go index 3950e7abd..3a66fcaff 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -155,6 +155,7 @@ func Config() PHPConfig { func calculateMaxThreads(opt *opt) (numWorkers int, _ error) { maxProcs := runtime.GOMAXPROCS(0) * 2 + maxThreadsFromWorkers := 0 for i, w := range opt.workers { if w.num <= 0 { @@ -164,12 +165,34 @@ func calculateMaxThreads(opt *opt) (numWorkers int, _ error) { metrics.TotalWorkers(w.name, w.num) numWorkers += opt.workers[i].num + + if w.maxThreads > 0 { + if w.maxThreads < w.num { + return 0, fmt.Errorf("worker max_threads (%d) must be greater or equal to worker num (%d) (%q)", w.maxThreads, w.num, w.fileName) + } + + if w.maxThreads > opt.maxThreads && opt.maxThreads > 0 { + return 0, fmt.Errorf("worker max_threads (%d) cannot be greater than total max_threads (%d) (%q)", w.maxThreads, opt.maxThreads, w.fileName) + } + + maxThreadsFromWorkers += w.maxThreads - w.num + } } numThreadsIsSet := opt.numThreads > 0 maxThreadsIsSet := opt.maxThreads != 0 maxThreadsIsAuto := opt.maxThreads < 0 // maxthreads < 0 signifies auto mode (see phpmaintread.go) + // if max_threads is only defined in workers, scale up to the sum of all worker max_threads + if !maxThreadsIsSet && maxThreadsFromWorkers > 0 { + maxThreadsIsSet = true + if numThreadsIsSet { + opt.maxThreads = opt.numThreads + maxThreadsFromWorkers + } else { + opt.maxThreads = numWorkers + 1 + maxThreadsFromWorkers + } + } + if numThreadsIsSet && !maxThreadsIsSet { opt.maxThreads = opt.numThreads if opt.numThreads <= numWorkers { @@ -188,7 +211,7 @@ func calculateMaxThreads(opt *opt) (numWorkers int, _ error) { return numWorkers, nil } - if !numThreadsIsSet { + if !maxThreadsIsSet && !numThreadsIsSet { if numWorkers >= maxProcs { // Start at least as many threads as workers, and keep a free thread to handle requests in non-worker mode opt.numThreads = numWorkers + 1 diff --git a/options.go b/options.go index abf16f0ff..3ba6dbe5a 100644 --- a/options.go +++ b/options.go @@ -34,6 +34,7 @@ type workerOpt struct { name string fileName string num int + maxThreads int env PreparedEnv watch []string maxConsecutiveFailures int @@ -159,6 +160,15 @@ func WithWorkerEnv(env map[string]string) WorkerOption { } } +// WithWorkerMaxThreads sets the max number of threads for this specific worker +func WithWorkerMaxThreads(num int) WorkerOption { + return func(w *workerOpt) error { + w.maxThreads = num + + return nil + } +} + // WithWorkerWatchMode sets directories to watch for file changes func WithWorkerWatchMode(watch []string) WorkerOption { return func(w *workerOpt) error { diff --git a/phpmainthread_test.go b/phpmainthread_test.go index edbd7b3a6..4ad502fa9 100644 --- a/phpmainthread_test.go +++ b/phpmainthread_test.go @@ -284,6 +284,15 @@ func TestCorrectThreadCalculation(t *testing.T) { testThreadCalculation(t, 2, -1, &opt{maxThreads: -1, workers: oneWorkerThread}) testThreadCalculation(t, 2, -1, &opt{numThreads: 2, maxThreads: -1}) + // max_threads should be thread minimum + sum of worker max_threads + testThreadCalculation(t, 2, 6, &opt{workers: []workerOpt{{num: 1, maxThreads: 5}}}) + testThreadCalculation(t, 6, 9, &opt{workers: []workerOpt{{num: 1, maxThreads: 4}, {num: 4, maxThreads: 4}}}) + testThreadCalculation(t, 10, 14, &opt{numThreads: 10, workers: []workerOpt{{num: 1, maxThreads: 4}, {num: 3, maxThreads: 4}}}) + + // max_threads should remain equal to overall max_threads + testThreadCalculation(t, 2, 5, &opt{maxThreads: 5, workers: []workerOpt{{num: 1, maxThreads: 3}}}) + testThreadCalculation(t, 3, 5, &opt{maxThreads: 5, workers: []workerOpt{{num: 1, maxThreads: 4}, {num: 1, maxThreads: 4}}}) + // not enough num threads testThreadCalculationError(t, &opt{numThreads: 1, workers: oneWorkerThread}) testThreadCalculationError(t, &opt{numThreads: 1, maxThreads: 1, workers: oneWorkerThread}) @@ -291,9 +300,17 @@ func TestCorrectThreadCalculation(t *testing.T) { // not enough max_threads testThreadCalculationError(t, &opt{numThreads: 2, maxThreads: 1}) testThreadCalculationError(t, &opt{maxThreads: 1, workers: oneWorkerThread}) + + // worker max_threads is bigger than overall max_threads + testThreadCalculationError(t, &opt{maxThreads: 5, workers: []workerOpt{{num: 1, maxThreads: 10}}}) + + // worker max_threads is smaller than num_threads + testThreadCalculationError(t, &opt{workers: []workerOpt{{num: 3, maxThreads: 2}}}) } func testThreadCalculation(t *testing.T, expectedNumThreads int, expectedMaxThreads int, o *opt) { + t.Helper() + _, err := calculateMaxThreads(o) assert.NoError(t, err, "no error should be returned") assert.Equal(t, expectedNumThreads, o.numThreads, "num_threads must be correct") @@ -301,6 +318,8 @@ func testThreadCalculation(t *testing.T, expectedNumThreads int, expectedMaxThre } func testThreadCalculationError(t *testing.T, o *opt) { + t.Helper() + _, err := calculateMaxThreads(o) assert.Error(t, err, "configuration must error") } diff --git a/scaling.go b/scaling.go index 3f541c279..af7c6c29d 100644 --- a/scaling.go +++ b/scaling.go @@ -171,11 +171,21 @@ func startUpscalingThreads(maxScaledThreads int, scale chan *frankenPHPContext, } // if the request has been stalled long enough, scale - if fc.worker != nil { - scaleWorkerThread(fc.worker) - } else { + if fc.worker == nil { scaleRegularThread() + continue } + + // check for max worker threads here again in case requests overflowed while waiting + if fc.worker.isAtThreadLimit() { + if globalLogger.Enabled(globalCtx, slog.LevelInfo) { + globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "cannot scale worker thread, max threads reached for worker", slog.String("worker", fc.worker.name)) + } + + continue + } + + scaleWorkerThread(fc.worker) case <-done: return } diff --git a/worker.go b/worker.go index 0a1dbda11..cf6ea9dbf 100644 --- a/worker.go +++ b/worker.go @@ -18,6 +18,7 @@ type worker struct { name string fileName string num int + maxThreads int env PreparedEnv requestChan chan contextHolder threads []*phpThread @@ -127,6 +128,7 @@ func newWorker(o workerOpt) (*worker, error) { name: o.name, fileName: absFileName, num: o.num, + maxThreads: o.maxThreads, env: o.env, requestChan: make(chan contextHolder), threads: make([]*phpThread, 0, o.num), @@ -228,6 +230,19 @@ func (worker *worker) countThreads() int { return l } +// check if max_threads has been reached +func (worker *worker) isAtThreadLimit() bool { + if worker.maxThreads <= 0 { + return false + } + + worker.threadMutex.RLock() + atMaxThreads := len(worker.threads) >= worker.maxThreads + worker.threadMutex.RUnlock() + + return atMaxThreads +} + func (worker *worker) handleRequest(ch contextHolder) error { metrics.StartWorkerRequest(worker.name) @@ -250,6 +265,11 @@ func (worker *worker) handleRequest(ch contextHolder) error { // if no thread was available, mark the request as queued and apply the scaling strategy metrics.QueuedWorkerRequest(worker.name) for { + workerScaleChan := scaleChan + if worker.isAtThreadLimit() { + workerScaleChan = nil // max_threads for this worker reached, do not attempt scaling + } + select { case worker.requestChan <- ch: metrics.DequeuedWorkerRequest(worker.name) @@ -257,7 +277,7 @@ func (worker *worker) handleRequest(ch contextHolder) error { metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt)) return nil - case scaleChan <- ch.frankenPHPContext: + case workerScaleChan <- ch.frankenPHPContext: // the request has triggered scaling, continue to wait for a thread case <-timeoutChan(maxWaitTime): // the request has timed out stalling