Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions caddy/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ func TestAutoScaleWorkerThreads(t *testing.T) {
frankenphp {
max_threads 10
num_threads 2
worker ../testdata/sleep.php 1
worker ../testdata/sleep.php {
num 1
max_threads 3
}
}
}

Expand Down Expand Up @@ -128,8 +131,8 @@ func TestAutoScaleWorkerThreads(t *testing.T) {
}
}

// assert that there are now more threads than before
assert.NotEqual(t, amountOfThreads, 2)
assert.NotEqual(t, amountOfThreads, 2, "at least one thread should have been auto-scaled")
assert.LessOrEqual(t, amountOfThreads, 4, "at most 3 max_threads + 1 regular thread should be present")
}

// Note this test requires at least 2x40MB available memory for the process
Expand Down
1 change: 1 addition & 0 deletions caddy/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (f *FrankenPHPApp) Start() error {
frankenphp.WithWorkerEnv(w.Env),
frankenphp.WithWorkerWatchMode(w.Watch),
frankenphp.WithWorkerMaxFailures(w.MaxConsecutiveFailures),
frankenphp.WithWorkerMaxThreads(w.MaxThreads),
}

opts = append(opts, frankenphp.WithWorkers(w.Name, repl.ReplaceKnown(w.FileName, ""), w.Num, workerOpts...))
Expand Down
15 changes: 14 additions & 1 deletion caddy/workerconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type workerConfig struct {
FileName string `json:"file_name,omitempty"`
// Num sets the number of workers to start.
Num int `json:"num,omitempty"`
// MaxThreads sets the maximum number of threads for this worker.
MaxThreads int `json:"max_threads,omitempty"`
// Env sets an extra environment variable to the given value. Can be specified more than once for multiple environment variables.
Env map[string]string `json:"env,omitempty"`
// Directories to watch for file changes
Expand Down Expand Up @@ -85,6 +87,17 @@ func parseWorkerConfig(d *caddyfile.Dispenser) (workerConfig, error) {
}

wc.Num = int(v)
case "max_threads":
if !d.NextArg() {
return wc, d.ArgErr()
}

v, err := strconv.ParseUint(d.Val(), 10, 32)
if err != nil {
return wc, d.WrapErr(err)
}

wc.MaxThreads = int(v)
case "env":
args := d.RemainingArgs()
if len(args) != 2 {
Expand Down Expand Up @@ -125,7 +138,7 @@ func parseWorkerConfig(d *caddyfile.Dispenser) (workerConfig, error) {

wc.MaxConsecutiveFailures = v
default:
return wc, wrongSubDirectiveError("worker", "name, file, num, env, watch, match, max_consecutive_failures", v)
return wc, wrongSubDirectiveError("worker", "name, file, num, env, watch, match, max_consecutive_failures, max_threads", v)
}
}

Expand Down
25 changes: 24 additions & 1 deletion frankenphp.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func Config() PHPConfig {

func calculateMaxThreads(opt *opt) (numWorkers int, _ error) {
maxProcs := runtime.GOMAXPROCS(0) * 2
maxThreadsFromWorkers := 0

for i, w := range opt.workers {
if w.num <= 0 {
Expand All @@ -164,12 +165,34 @@ func calculateMaxThreads(opt *opt) (numWorkers int, _ error) {
metrics.TotalWorkers(w.name, w.num)

numWorkers += opt.workers[i].num

if w.maxThreads > 0 {
if w.maxThreads < w.num {
return 0, fmt.Errorf("worker max_threads (%d) must be greater or equal to worker num (%d) (%q)", w.maxThreads, w.num, w.fileName)
}

if w.maxThreads > opt.maxThreads && opt.maxThreads > 0 {
return 0, fmt.Errorf("worker max_threads (%d) cannot be greater than total max_threads (%d) (%q)", w.maxThreads, opt.maxThreads, w.fileName)
}

maxThreadsFromWorkers += w.maxThreads - w.num
}
}

numThreadsIsSet := opt.numThreads > 0
maxThreadsIsSet := opt.maxThreads != 0
maxThreadsIsAuto := opt.maxThreads < 0 // maxthreads < 0 signifies auto mode (see phpmaintread.go)

// if max_threads is only defined in workers, scale up to the sum of all worker max_threads
if !maxThreadsIsSet && maxThreadsFromWorkers > 0 {
maxThreadsIsSet = true
if numThreadsIsSet {
opt.maxThreads = opt.numThreads + maxThreadsFromWorkers
} else {
opt.maxThreads = numWorkers + 1 + maxThreadsFromWorkers
}
}

if numThreadsIsSet && !maxThreadsIsSet {
opt.maxThreads = opt.numThreads
if opt.numThreads <= numWorkers {
Expand All @@ -188,7 +211,7 @@ func calculateMaxThreads(opt *opt) (numWorkers int, _ error) {
return numWorkers, nil
}

if !numThreadsIsSet {
if !maxThreadsIsSet && !numThreadsIsSet {
if numWorkers >= maxProcs {
// Start at least as many threads as workers, and keep a free thread to handle requests in non-worker mode
opt.numThreads = numWorkers + 1
Expand Down
10 changes: 10 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type workerOpt struct {
name string
fileName string
num int
maxThreads int
env PreparedEnv
watch []string
maxConsecutiveFailures int
Expand Down Expand Up @@ -159,6 +160,15 @@ func WithWorkerEnv(env map[string]string) WorkerOption {
}
}

// WithWorkerMaxThreads sets the max number of threads for this specific worker
func WithWorkerMaxThreads(num int) WorkerOption {
return func(w *workerOpt) error {
w.maxThreads = num

return nil
}
}

// WithWorkerWatchMode sets directories to watch for file changes
func WithWorkerWatchMode(watch []string) WorkerOption {
return func(w *workerOpt) error {
Expand Down
19 changes: 19 additions & 0 deletions phpmainthread_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,23 +284,42 @@ func TestCorrectThreadCalculation(t *testing.T) {
testThreadCalculation(t, 2, -1, &opt{maxThreads: -1, workers: oneWorkerThread})
testThreadCalculation(t, 2, -1, &opt{numThreads: 2, maxThreads: -1})

// max_threads should be thread minimum + sum of worker max_threads
testThreadCalculation(t, 2, 6, &opt{workers: []workerOpt{{num: 1, maxThreads: 5}}})
testThreadCalculation(t, 6, 9, &opt{workers: []workerOpt{{num: 1, maxThreads: 4}, {num: 4, maxThreads: 4}}})
testThreadCalculation(t, 10, 14, &opt{numThreads: 10, workers: []workerOpt{{num: 1, maxThreads: 4}, {num: 3, maxThreads: 4}}})

// max_threads should remain equal to overall max_threads
testThreadCalculation(t, 2, 5, &opt{maxThreads: 5, workers: []workerOpt{{num: 1, maxThreads: 3}}})
testThreadCalculation(t, 3, 5, &opt{maxThreads: 5, workers: []workerOpt{{num: 1, maxThreads: 4}, {num: 1, maxThreads: 4}}})

// not enough num threads
testThreadCalculationError(t, &opt{numThreads: 1, workers: oneWorkerThread})
testThreadCalculationError(t, &opt{numThreads: 1, maxThreads: 1, workers: oneWorkerThread})

// not enough max_threads
testThreadCalculationError(t, &opt{numThreads: 2, maxThreads: 1})
testThreadCalculationError(t, &opt{maxThreads: 1, workers: oneWorkerThread})

// worker max_threads is bigger than overall max_threads
testThreadCalculationError(t, &opt{maxThreads: 5, workers: []workerOpt{{num: 1, maxThreads: 10}}})

// worker max_threads is smaller than num_threads
testThreadCalculationError(t, &opt{workers: []workerOpt{{num: 3, maxThreads: 2}}})
}

func testThreadCalculation(t *testing.T, expectedNumThreads int, expectedMaxThreads int, o *opt) {
t.Helper()

_, err := calculateMaxThreads(o)
assert.NoError(t, err, "no error should be returned")
assert.Equal(t, expectedNumThreads, o.numThreads, "num_threads must be correct")
assert.Equal(t, expectedMaxThreads, o.maxThreads, "max_threads must be correct")
}

func testThreadCalculationError(t *testing.T, o *opt) {
t.Helper()

_, err := calculateMaxThreads(o)
assert.Error(t, err, "configuration must error")
}
16 changes: 13 additions & 3 deletions scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,21 @@ func startUpscalingThreads(maxScaledThreads int, scale chan *frankenPHPContext,
}

// if the request has been stalled long enough, scale
if fc.worker != nil {
scaleWorkerThread(fc.worker)
} else {
if fc.worker == nil {
scaleRegularThread()
continue
}

// check for max worker threads here again in case requests overflowed while waiting
if fc.worker.isAtThreadLimit() {
if globalLogger.Enabled(globalCtx, slog.LevelInfo) {
globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "cannot scale worker thread, max threads reached for worker", slog.String("worker", fc.worker.name))
}

continue
}

scaleWorkerThread(fc.worker)
case <-done:
return
}
Expand Down
22 changes: 21 additions & 1 deletion worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type worker struct {
name string
fileName string
num int
maxThreads int
env PreparedEnv
requestChan chan contextHolder
threads []*phpThread
Expand Down Expand Up @@ -127,6 +128,7 @@ func newWorker(o workerOpt) (*worker, error) {
name: o.name,
fileName: absFileName,
num: o.num,
maxThreads: o.maxThreads,
env: o.env,
requestChan: make(chan contextHolder),
threads: make([]*phpThread, 0, o.num),
Expand Down Expand Up @@ -228,6 +230,19 @@ func (worker *worker) countThreads() int {
return l
}

// check if max_threads has been reached
func (worker *worker) isAtThreadLimit() bool {
if worker.maxThreads <= 0 {
return false
}

worker.threadMutex.RLock()
atMaxThreads := len(worker.threads) >= worker.maxThreads
worker.threadMutex.RUnlock()

return atMaxThreads
}

func (worker *worker) handleRequest(ch contextHolder) error {
metrics.StartWorkerRequest(worker.name)

Expand All @@ -250,14 +265,19 @@ func (worker *worker) handleRequest(ch contextHolder) error {
// if no thread was available, mark the request as queued and apply the scaling strategy
metrics.QueuedWorkerRequest(worker.name)
for {
workerScaleChan := scaleChan
if worker.isAtThreadLimit() {
workerScaleChan = nil // max_threads for this worker reached, do not attempt scaling
}

select {
case worker.requestChan <- ch:
metrics.DequeuedWorkerRequest(worker.name)
<-ch.frankenPHPContext.done
metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt))

return nil
case scaleChan <- ch.frankenPHPContext:
case workerScaleChan <- ch.frankenPHPContext:
// the request has triggered scaling, continue to wait for a thread
case <-timeoutChan(maxWaitTime):
// the request has timed out stalling
Expand Down
Loading