From ca19712f87441656b913f99bf65fd719801ff2aa Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Sat, 1 Nov 2025 21:33:33 +0100 Subject: [PATCH 01/20] Cleanup. --- backoff.go | 51 ---------------------------- backoff_test.go | 41 ---------------------- cgi.go | 6 ++-- env.go | 5 ++- frankenphp.c | 14 ++++---- frankenphp.h | 10 +----- internal/backoff/backoff.go | 58 ++++++++++++++++++++++++++++++++ internal/backoff/backoff_test.go | 41 ++++++++++++++++++++++ phpthread.go | 4 +++ threadregular.go | 1 + threadworker.go | 23 +++++++------ 11 files changed, 128 insertions(+), 126 deletions(-) delete mode 100644 backoff.go delete mode 100644 backoff_test.go create mode 100644 internal/backoff/backoff.go create mode 100644 internal/backoff/backoff_test.go diff --git a/backoff.go b/backoff.go deleted file mode 100644 index a4bce80fa4..0000000000 --- a/backoff.go +++ /dev/null @@ -1,51 +0,0 @@ -package frankenphp - -import ( - "sync" - "time" -) - -type exponentialBackoff struct { - backoff time.Duration - failureCount int - mu sync.RWMutex - maxBackoff time.Duration - minBackoff time.Duration - maxConsecutiveFailures int -} - -// recordSuccess resets the backoff and failureCount -func (e *exponentialBackoff) recordSuccess() { - e.mu.Lock() - e.failureCount = 0 - e.backoff = e.minBackoff - e.mu.Unlock() -} - -// recordFailure increments the failure count and increases the backoff, it returns true if maxConsecutiveFailures has been reached -func (e *exponentialBackoff) recordFailure() bool { - e.mu.Lock() - e.failureCount += 1 - if e.backoff < e.minBackoff { - e.backoff = e.minBackoff - } - - e.backoff = min(e.backoff*2, e.maxBackoff) - - e.mu.Unlock() - return e.maxConsecutiveFailures != -1 && e.failureCount >= e.maxConsecutiveFailures -} - -// wait sleeps for the backoff duration if failureCount is non-zero. -// NOTE: this is not tested and should be kept 'obviously correct' (i.e., simple) -func (e *exponentialBackoff) wait() { - e.mu.RLock() - if e.failureCount == 0 { - e.mu.RUnlock() - - return - } - e.mu.RUnlock() - - time.Sleep(e.backoff) -} diff --git a/backoff_test.go b/backoff_test.go deleted file mode 100644 index 5ced2e4cd5..0000000000 --- a/backoff_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package frankenphp - -import ( - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -func TestExponentialBackoff_Reset(t *testing.T) { - e := &exponentialBackoff{ - maxBackoff: 5 * time.Second, - minBackoff: 500 * time.Millisecond, - maxConsecutiveFailures: 3, - } - - assert.False(t, e.recordFailure()) - assert.False(t, e.recordFailure()) - e.recordSuccess() - - e.mu.RLock() - defer e.mu.RUnlock() - assert.Equal(t, 0, e.failureCount, "expected failureCount to be reset to 0") - assert.Equal(t, e.backoff, e.minBackoff, "expected backoff to be reset to minBackoff") -} - -func TestExponentialBackoff_Trigger(t *testing.T) { - e := &exponentialBackoff{ - maxBackoff: 500 * 3 * time.Millisecond, - minBackoff: 500 * time.Millisecond, - maxConsecutiveFailures: 3, - } - - assert.False(t, e.recordFailure()) - assert.False(t, e.recordFailure()) - assert.True(t, e.recordFailure()) - - e.mu.RLock() - defer e.mu.RUnlock() - assert.Equal(t, e.failureCount, e.maxConsecutiveFailures, "expected failureCount to be maxConsecutiveFailures") - assert.Equal(t, e.backoff, e.maxBackoff, "expected backoff to be maxBackoff") -} diff --git a/cgi.go b/cgi.go index 4c11a285ab..6c36cf6986 100644 --- a/cgi.go +++ b/cgi.go @@ -277,13 +277,13 @@ func splitPos(path string, splitPath []string) int { // See: https://github.com/php/php-src/blob/345e04b619c3bc11ea17ee02cdecad6ae8ce5891/main/SAPI.h#L72 // //export go_update_request_info -func go_update_request_info(threadIndex C.uintptr_t, info *C.sapi_request_info) C.bool { +func go_update_request_info(threadIndex C.uintptr_t, info *C.sapi_request_info) { thread := phpThreads[threadIndex] fc := thread.getRequestContext() request := fc.request if request == nil { - return C.bool(fc.worker != nil) + return } authUser, authPassword, ok := request.BasicAuth() @@ -311,8 +311,6 @@ func go_update_request_info(threadIndex C.uintptr_t, info *C.sapi_request_info) info.request_uri = thread.pinCString(request.URL.RequestURI()) info.proto_num = C.int(request.ProtoMajor*1000 + request.ProtoMinor) - - return C.bool(fc.worker != nil) } // SanitizedPathJoin performs filepath.Join(root, reqPath) that diff --git a/env.go b/env.go index 9e6fbfdfe5..3ac9a3adf6 100644 --- a/env.go +++ b/env.go @@ -1,10 +1,9 @@ package frankenphp // #cgo nocallback frankenphp_init_persistent_string -// #cgo nocallback frankenphp_add_assoc_str_ex // #cgo noescape frankenphp_init_persistent_string -// #cgo noescape frankenphp_add_assoc_str_ex // #include "frankenphp.h" +// #include import "C" import ( "os" @@ -98,7 +97,7 @@ func go_getfullenv(threadIndex C.uintptr_t, trackVarsArray *C.zval) { env := getSandboxedEnv(thread) for key, val := range env { - C.frankenphp_add_assoc_str_ex(trackVarsArray, toUnsafeChar(key), C.size_t(len(key)), val) + C.add_assoc_str_ex(trackVarsArray, toUnsafeChar(key), C.size_t(len(key)), val) } } diff --git a/frankenphp.c b/frankenphp.c index 3d124eced1..99eca7703f 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -51,7 +51,6 @@ frankenphp_version frankenphp_get_version() { frankenphp_config frankenphp_get_config() { return (frankenphp_config){ - frankenphp_get_version(), #ifdef ZTS true, #else @@ -75,6 +74,10 @@ __thread uintptr_t thread_index; __thread bool is_worker_thread = false; __thread zval *os_environment = NULL; +void frankenphp_update_local_thread_context(bool is_worker) { + is_worker_thread = is_worker; +} + static void frankenphp_update_request_context() { /* the server context is stored on the go side, still SG(server_context) needs * to not be NULL */ @@ -82,7 +85,7 @@ static void frankenphp_update_request_context() { /* status It is not reset by zend engine, set it to 200. */ SG(sapi_headers).http_response_code = 200; - is_worker_thread = go_update_request_info(thread_index, &SG(request_info)); + go_update_request_info(thread_index, &SG(request_info)); } static void frankenphp_free_request_context() { @@ -206,11 +209,6 @@ PHPAPI void get_full_env(zval *track_vars_array) { go_getfullenv(thread_index, track_vars_array); } -void frankenphp_add_assoc_str_ex(zval *track_vars_array, char *key, - size_t keylen, zend_string *val) { - add_assoc_str_ex(track_vars_array, key, keylen, val); -} - /* Adapted from php_request_startup() */ static int frankenphp_worker_request_startup() { int retval = SUCCESS; @@ -610,7 +608,7 @@ static char *frankenphp_read_cookies(void) { } /* all variables with well defined keys can safely be registered like this */ -void frankenphp_register_trusted_var(zend_string *z_key, char *value, +static inline void frankenphp_register_trusted_var(zend_string *z_key, char *value, size_t val_len, HashTable *ht) { if (value == NULL) { zval empty; diff --git a/frankenphp.h b/frankenphp.h index c17df6061a..efbd5fc48f 100644 --- a/frankenphp.h +++ b/frankenphp.h @@ -23,12 +23,6 @@ typedef struct ht_key_value_pair { size_t val_len; } ht_key_value_pair; -typedef struct php_variable { - const char *var; - size_t data_len; - char *data; -} php_variable; - typedef struct frankenphp_version { unsigned char major_version; unsigned char minor_version; @@ -40,7 +34,6 @@ typedef struct frankenphp_version { frankenphp_version frankenphp_get_version(); typedef struct frankenphp_config { - frankenphp_version version; bool zts; bool zend_signals; bool zend_max_execution_timers; @@ -52,6 +45,7 @@ bool frankenphp_new_php_thread(uintptr_t thread_index); bool frankenphp_shutdown_dummy_request(void); int frankenphp_execute_script(char *file_name); +void frankenphp_update_local_thread_context(bool is_worker); int frankenphp_execute_script_cli(char *script, int argc, char **argv, bool eval); @@ -65,8 +59,6 @@ void frankenphp_register_variable_safe(char *key, char *var, size_t val_len, zend_string *frankenphp_init_persistent_string(const char *string, size_t len); int frankenphp_reset_opcache(void); int frankenphp_get_current_memory_limit(); -void frankenphp_add_assoc_str_ex(zval *track_vars_array, char *key, - size_t keylen, zend_string *val); void frankenphp_register_single(zend_string *z_key, char *value, size_t val_len, zval *track_vars_array); diff --git a/internal/backoff/backoff.go b/internal/backoff/backoff.go new file mode 100644 index 0000000000..9139623879 --- /dev/null +++ b/internal/backoff/backoff.go @@ -0,0 +1,58 @@ +package backoff + +import ( + "sync" + "time" +) + +type ExponentialBackoff struct { + backoff time.Duration + failureCount int + mu sync.RWMutex + MaxBackoff time.Duration + MinBackoff time.Duration + MaxConsecutiveFailures int +} + +// recordSuccess resets the backoff and failureCount +func (e *ExponentialBackoff) RecordSuccess() { + e.mu.Lock() + e.failureCount = 0 + e.backoff = e.MinBackoff + e.mu.Unlock() +} + +// recordFailure increments the failure count and increases the backoff, it returns true if MaxConsecutiveFailures has been reached +func (e *ExponentialBackoff) RecordFailure() bool { + e.mu.Lock() + e.failureCount += 1 + if e.backoff < e.MinBackoff { + e.backoff = e.MinBackoff + } + + e.backoff = min(e.backoff*2, e.MaxBackoff) + + e.mu.Unlock() + return e.MaxConsecutiveFailures != -1 && e.failureCount >= e.MaxConsecutiveFailures +} + +// wait sleeps for the backoff duration if failureCount is non-zero. +// NOTE: this is not tested and should be kept 'obviously correct' (i.e., simple) +func (e *ExponentialBackoff) Wait() { + e.mu.RLock() + if e.failureCount == 0 { + e.mu.RUnlock() + + return + } + e.mu.RUnlock() + + time.Sleep(e.backoff) +} + +func (e *ExponentialBackoff) FailureCount() int { + e.mu.RLock() + defer e.mu.RUnlock() + + return e.failureCount +} \ No newline at end of file diff --git a/internal/backoff/backoff_test.go b/internal/backoff/backoff_test.go new file mode 100644 index 0000000000..b82efc4bd4 --- /dev/null +++ b/internal/backoff/backoff_test.go @@ -0,0 +1,41 @@ +package backoff + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestExponentialBackoff_Reset(t *testing.T) { + e := &ExponentialBackoff{ + MaxBackoff: 5 * time.Second, + MinBackoff: 500 * time.Millisecond, + MaxConsecutiveFailures: 3, + } + + assert.False(t, e.RecordFailure()) + assert.False(t, e.RecordFailure()) + e.RecordSuccess() + + e.mu.RLock() + defer e.mu.RUnlock() + assert.Equal(t, 0, e.failureCount, "expected failureCount to be reset to 0") + assert.Equal(t, e.backoff, e.MinBackoff, "expected backoff to be reset to MinBackoff") +} + +func TestExponentialBackoff_Trigger(t *testing.T) { + e := &ExponentialBackoff{ + MaxBackoff: 500 * 3 * time.Millisecond, + MinBackoff: 500 * time.Millisecond, + MaxConsecutiveFailures: 3, + } + + assert.False(t, e.RecordFailure()) + assert.False(t, e.RecordFailure()) + assert.True(t, e.RecordFailure()) + + e.mu.RLock() + defer e.mu.RUnlock() + assert.Equal(t, e.failureCount, e.MaxConsecutiveFailures, "expected failureCount to be MaxConsecutiveFailures") + assert.Equal(t, e.backoff, e.MaxBackoff, "expected backoff to be MaxBackoff") +} diff --git a/phpthread.go b/phpthread.go index a60aa8f0aa..2691aa54bf 100644 --- a/phpthread.go +++ b/phpthread.go @@ -132,6 +132,10 @@ func (thread *phpThread) pinCString(s string) *C.char { return thread.pinString(s + "\x00") } +func (*phpThread) updateContext(isWorker bool) { + C.frankenphp_update_local_thread_context(C.bool(isWorker)) +} + //export go_frankenphp_before_script_execution func go_frankenphp_before_script_execution(threadIndex C.uintptr_t) *C.char { thread := phpThreads[threadIndex] diff --git a/threadregular.go b/threadregular.go index 88cef7e79d..a8ca03543c 100644 --- a/threadregular.go +++ b/threadregular.go @@ -34,6 +34,7 @@ func (handler *regularThread) beforeScriptExecution() string { detachRegularThread(handler.thread) return handler.thread.transitionToNewHandler() case stateTransitionComplete: + handler.thread.updateContext(false) handler.state.set(stateReady) return handler.waitForRequest() case stateReady: diff --git a/threadworker.go b/threadworker.go index 5a59f9278b..6d3ca6d2ce 100644 --- a/threadworker.go +++ b/threadworker.go @@ -9,6 +9,8 @@ import ( "path/filepath" "time" "unsafe" + + "github.com/dunglas/frankenphp/internal/backoff" ) // representation of a thread assigned to a worker script @@ -20,7 +22,7 @@ type workerThread struct { worker *worker dummyContext *frankenPHPContext workerContext *frankenPHPContext - backoff *exponentialBackoff + backoff *backoff.ExponentialBackoff isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet } @@ -29,10 +31,10 @@ func convertToWorkerThread(thread *phpThread, worker *worker) { state: thread.state, thread: thread, worker: worker, - backoff: &exponentialBackoff{ - maxBackoff: 1 * time.Second, - minBackoff: 100 * time.Millisecond, - maxConsecutiveFailures: worker.maxConsecutiveFailures, + backoff: &backoff.ExponentialBackoff{ + MaxBackoff: 1 * time.Second, + MinBackoff: 100 * time.Millisecond, + MaxConsecutiveFailures: worker.maxConsecutiveFailures, }, }) worker.attachThread(thread) @@ -55,6 +57,7 @@ func (handler *workerThread) beforeScriptExecution() string { handler.state.waitFor(stateReady, stateShuttingDown) return handler.beforeScriptExecution() case stateReady, stateTransitionComplete: + handler.thread.updateContext(true) if handler.worker.onThreadReady != nil { handler.worker.onThreadReady(handler.thread.threadIndex) } @@ -88,7 +91,7 @@ func (handler *workerThread) name() string { } func setupWorkerScript(handler *workerThread, worker *worker) { - handler.backoff.wait() + handler.backoff.Wait() metrics.StartWorker(worker.name) if handler.state.is(stateReady) { @@ -128,7 +131,7 @@ func tearDownWorkerScript(handler *workerThread, exitStatus int) { // on exit status 0 we just run the worker script again if exitStatus == 0 && !handler.isBootingScript { metrics.StopWorker(worker.name, StopReasonRestart) - handler.backoff.recordSuccess() + handler.backoff.RecordSuccess() logger.LogAttrs(ctx, slog.LevelDebug, "restarting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("exit_status", exitStatus)) return @@ -147,12 +150,12 @@ func tearDownWorkerScript(handler *workerThread, exitStatus int) { logger.LogAttrs(ctx, slog.LevelError, "worker script has not reached frankenphp_handle_request()", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex)) // panic after exponential backoff if the worker has never reached frankenphp_handle_request - if handler.backoff.recordFailure() { + if handler.backoff.RecordFailure() { if !watcherIsEnabled && !handler.state.is(stateReady) { - logger.LogAttrs(ctx, slog.LevelError, "too many consecutive worker failures", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("failures", handler.backoff.failureCount)) + logger.LogAttrs(ctx, slog.LevelError, "too many consecutive worker failures", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("failures", handler.backoff.FailureCount())) panic("too many consecutive worker failures") } - logger.LogAttrs(ctx, slog.LevelWarn, "many consecutive worker failures", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("failures", handler.backoff.failureCount)) + logger.LogAttrs(ctx, slog.LevelWarn, "many consecutive worker failures", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("failures", handler.backoff.FailureCount())) } } From 5b7fbab3b195153586eb2ed92b40b4a772b05f2f Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Sat, 1 Nov 2025 21:34:46 +0100 Subject: [PATCH 02/20] Formatting. --- frankenphp.c | 5 +++-- internal/backoff/backoff.go | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/frankenphp.c b/frankenphp.c index 99eca7703f..a2b1d36a69 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -608,8 +608,9 @@ static char *frankenphp_read_cookies(void) { } /* all variables with well defined keys can safely be registered like this */ -static inline void frankenphp_register_trusted_var(zend_string *z_key, char *value, - size_t val_len, HashTable *ht) { +static inline void frankenphp_register_trusted_var(zend_string *z_key, + char *value, size_t val_len, + HashTable *ht) { if (value == NULL) { zval empty; ZVAL_EMPTY_STRING(&empty); diff --git a/internal/backoff/backoff.go b/internal/backoff/backoff.go index 9139623879..555eea99c2 100644 --- a/internal/backoff/backoff.go +++ b/internal/backoff/backoff.go @@ -55,4 +55,4 @@ func (e *ExponentialBackoff) FailureCount() int { defer e.mu.RUnlock() return e.failureCount -} \ No newline at end of file +} From 0cebb7479da06e0e1d2605cfb1e92fa0fa96909e Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Sat, 1 Nov 2025 22:05:21 +0100 Subject: [PATCH 03/20] Moves state to own module. --- debugstate.go | 14 ++- state.go => internal/state/state.go | 118 +++++++++--------- state_test.go => internal/state/state_test.go | 18 +-- phpmainthread.go | 25 ++-- phpmainthread_test.go | 7 +- phpthread.go | 34 ++--- scaling.go | 17 +-- scaling_test.go | 9 +- threadinactive.go | 22 ++-- threadregular.go | 22 ++-- threadtasks_test.go | 16 +-- threadworker.go | 35 +++--- worker.go | 9 +- 13 files changed, 185 insertions(+), 161 deletions(-) rename state.go => internal/state/state.go (54%) rename state_test.go => internal/state/state_test.go (74%) diff --git a/debugstate.go b/debugstate.go index a7941ac79c..d50f747040 100644 --- a/debugstate.go +++ b/debugstate.go @@ -1,5 +1,9 @@ package frankenphp +import ( + state "github.com/dunglas/frankenphp/internal/state" +) + // EXPERIMENTAL: ThreadDebugState prints the state of a single PHP thread - debugging purposes only type ThreadDebugState struct { Index int @@ -23,7 +27,7 @@ func DebugState() FrankenPHPDebugState { ReservedThreadCount: 0, } for _, thread := range phpThreads { - if thread.state.is(stateReserved) { + if thread.state.Is(state.StateReserved) { fullState.ReservedThreadCount++ continue } @@ -38,9 +42,9 @@ func threadDebugState(thread *phpThread) ThreadDebugState { return ThreadDebugState{ Index: thread.threadIndex, Name: thread.name(), - State: thread.state.name(), - IsWaiting: thread.state.isInWaitingState(), - IsBusy: !thread.state.isInWaitingState(), - WaitingSinceMilliseconds: thread.state.waitTime(), + State: thread.state.Name(), + IsWaiting: thread.state.IsInWaitingState(), + IsBusy: !thread.state.IsInWaitingState(), + WaitingSinceMilliseconds: thread.state.WaitTime(), } } diff --git a/state.go b/internal/state/state.go similarity index 54% rename from state.go rename to internal/state/state.go index 71970c8388..d7d811e4f5 100644 --- a/state.go +++ b/internal/state/state.go @@ -6,46 +6,46 @@ import ( "time" ) -type stateID uint8 +type StateID uint8 const ( - // livecycle states of a thread - stateReserved stateID = iota - stateBooting - stateBootRequested - stateShuttingDown - stateDone - - // these states are 'stable' and safe to transition from at any time - stateInactive - stateReady - - // states necessary for restarting workers - stateRestarting - stateYielding - - // states necessary for transitioning between different handlers - stateTransitionRequested - stateTransitionInProgress - stateTransitionComplete + // livecycle States of a thread + StateReserved StateID = iota + StateBooting + StateBootRequested + StateShuttingDown + StateDone + + // these States are 'stable' and safe to transition from at any time + StateInactive + StateReady + + // States necessary for restarting workers + StateRestarting + StateYielding + + // States necessary for transitioning between different handlers + StateTransitionRequested + StateTransitionInProgress + StateTransitionComplete ) -var stateNames = map[stateID]string{ - stateReserved: "reserved", - stateBooting: "booting", - stateInactive: "inactive", - stateReady: "ready", - stateShuttingDown: "shutting down", - stateDone: "done", - stateRestarting: "restarting", - stateYielding: "yielding", - stateTransitionRequested: "transition requested", - stateTransitionInProgress: "transition in progress", - stateTransitionComplete: "transition complete", -} - -type threadState struct { - currentState stateID +var stateNames = map[StateID]string{ + StateReserved: "reserved", + StateBooting: "booting", + StateInactive: "inactive", + StateReady: "ready", + StateShuttingDown: "shutting down", + StateDone: "done", + StateRestarting: "restarting", + StateYielding: "yielding", + StateTransitionRequested: "transition requested", + StateTransitionInProgress: "transition in progress", + StateTransitionComplete: "transition complete", +} + +type ThreadState struct { + currentState StateID mu sync.RWMutex subscribers []stateSubscriber // how long threads have been waiting in stable states @@ -54,19 +54,19 @@ type threadState struct { } type stateSubscriber struct { - states []stateID + states []StateID ch chan struct{} } -func newThreadState() *threadState { - return &threadState{ - currentState: stateReserved, +func NewThreadState() *ThreadState { + return &ThreadState{ + currentState: StateReserved, subscribers: []stateSubscriber{}, mu: sync.RWMutex{}, } } -func (ts *threadState) is(state stateID) bool { +func (ts *ThreadState) Is(state StateID) bool { ts.mu.RLock() ok := ts.currentState == state ts.mu.RUnlock() @@ -74,7 +74,7 @@ func (ts *threadState) is(state stateID) bool { return ok } -func (ts *threadState) compareAndSwap(compareTo stateID, swapTo stateID) bool { +func (ts *ThreadState) CompareAndSwap(compareTo StateID, swapTo StateID) bool { ts.mu.Lock() ok := ts.currentState == compareTo if ok { @@ -86,11 +86,11 @@ func (ts *threadState) compareAndSwap(compareTo stateID, swapTo stateID) bool { return ok } -func (ts *threadState) name() string { - return stateNames[ts.get()] +func (ts *ThreadState) Name() string { + return stateNames[ts.Get()] } -func (ts *threadState) get() stateID { +func (ts *ThreadState) Get() StateID { ts.mu.RLock() id := ts.currentState ts.mu.RUnlock() @@ -98,14 +98,14 @@ func (ts *threadState) get() stateID { return id } -func (ts *threadState) set(nextState stateID) { +func (ts *ThreadState) Set(nextState StateID) { ts.mu.Lock() ts.currentState = nextState ts.notifySubscribers(nextState) ts.mu.Unlock() } -func (ts *threadState) notifySubscribers(nextState stateID) { +func (ts *ThreadState) notifySubscribers(nextState StateID) { if len(ts.subscribers) == 0 { return } @@ -122,7 +122,7 @@ func (ts *threadState) notifySubscribers(nextState stateID) { } // block until the thread reaches a certain state -func (ts *threadState) waitFor(states ...stateID) { +func (ts *ThreadState) WaitFor(states ...StateID) { ts.mu.Lock() if slices.Contains(states, ts.currentState) { ts.mu.Unlock() @@ -138,15 +138,15 @@ func (ts *threadState) waitFor(states ...stateID) { } // safely request a state change from a different goroutine -func (ts *threadState) requestSafeStateChange(nextState stateID) bool { +func (ts *ThreadState) RequestSafeStateChange(nextState StateID) bool { ts.mu.Lock() switch ts.currentState { // disallow state changes if shutting down or done - case stateShuttingDown, stateDone, stateReserved: + case StateShuttingDown, StateDone, StateReserved: ts.mu.Unlock() return false // ready and inactive are safe states to transition from - case stateReady, stateInactive: + case StateReady, StateInactive: ts.currentState = nextState ts.notifySubscribers(nextState) ts.mu.Unlock() @@ -155,12 +155,12 @@ func (ts *threadState) requestSafeStateChange(nextState stateID) bool { ts.mu.Unlock() // wait for the state to change to a safe state - ts.waitFor(stateReady, stateInactive, stateShuttingDown) - return ts.requestSafeStateChange(nextState) + ts.WaitFor(StateReady, StateInactive, StateShuttingDown) + return ts.RequestSafeStateChange(nextState) } // markAsWaiting hints that the thread reached a stable state and is waiting for requests or shutdown -func (ts *threadState) markAsWaiting(isWaiting bool) { +func (ts *ThreadState) MarkAsWaiting(isWaiting bool) { ts.mu.Lock() if isWaiting { ts.isWaiting = true @@ -172,7 +172,7 @@ func (ts *threadState) markAsWaiting(isWaiting bool) { } // isWaitingState returns true if a thread is waiting for a request or shutdown -func (ts *threadState) isInWaitingState() bool { +func (ts *ThreadState) IsInWaitingState() bool { ts.mu.RLock() isWaiting := ts.isWaiting ts.mu.RUnlock() @@ -180,7 +180,7 @@ func (ts *threadState) isInWaitingState() bool { } // waitTime returns the time since the thread is waiting in a stable state in ms -func (ts *threadState) waitTime() int64 { +func (ts *ThreadState) WaitTime() int64 { ts.mu.RLock() waitTime := int64(0) if ts.isWaiting { @@ -189,3 +189,9 @@ func (ts *threadState) waitTime() int64 { ts.mu.RUnlock() return waitTime } + +func (ts *ThreadState) SetWaitTime(t time.Time) { + ts.mu.Lock() + ts.waitingSince = t + ts.mu.Unlock() +} diff --git a/state_test.go b/internal/state/state_test.go similarity index 74% rename from state_test.go rename to internal/state/state_test.go index 7055d35f65..9a5e844153 100644 --- a/state_test.go +++ b/internal/state/state_test.go @@ -11,13 +11,13 @@ func Test2GoroutinesYieldToEachOtherViaStates(t *testing.T) { threadState := &threadState{currentState: stateBooting} go func() { - threadState.waitFor(stateInactive) + threadState.WaitFor(stateInactive) assert.True(t, threadState.is(stateInactive)) - threadState.set(stateReady) + threadstate.Set(stateReady) }() - threadState.set(stateInactive) - threadState.waitFor(stateReady) + threadstate.Set(stateInactive) + threadState.WaitFor(stateReady) assert.True(t, threadState.is(stateReady)) } @@ -25,16 +25,16 @@ func TestStateShouldHaveCorrectAmountOfSubscribers(t *testing.T) { threadState := &threadState{currentState: stateBooting} // 3 subscribers waiting for different states - go threadState.waitFor(stateInactive) - go threadState.waitFor(stateInactive, stateShuttingDown) - go threadState.waitFor(stateShuttingDown) + go threadState.WaitFor(stateInactive) + go threadState.WaitFor(stateInactive, StateShuttingDown) + go threadState.WaitFor(StateShuttingDown) assertNumberOfSubscribers(t, threadState, 3) - threadState.set(stateInactive) + threadstate.Set(stateInactive) assertNumberOfSubscribers(t, threadState, 1) - assert.True(t, threadState.compareAndSwap(stateInactive, stateShuttingDown)) + assert.True(t, threadstate.CompareAndSwap(stateInactive, stateShuttingDown)) assertNumberOfSubscribers(t, threadState, 0) } diff --git a/phpmainthread.go b/phpmainthread.go index 3154bb77ba..b8081afe42 100644 --- a/phpmainthread.go +++ b/phpmainthread.go @@ -15,12 +15,13 @@ import ( "github.com/dunglas/frankenphp/internal/memory" "github.com/dunglas/frankenphp/internal/phpheaders" + state "github.com/dunglas/frankenphp/internal/state" ) // represents the main PHP thread // the thread needs to keep running as long as all other threads are running type phpMainThread struct { - state *threadState + state *state.ThreadState done chan struct{} numThreads int maxThreads int @@ -40,7 +41,7 @@ var ( // and reserves a fixed number of possible PHP threads func initPHPThreads(numThreads int, numMaxThreads int, phpIni map[string]string) (*phpMainThread, error) { mainThread = &phpMainThread{ - state: newThreadState(), + state: state.NewThreadState(), done: make(chan struct{}), numThreads: numThreads, maxThreads: numMaxThreads, @@ -84,11 +85,11 @@ func initPHPThreads(numThreads int, numMaxThreads int, phpIni map[string]string) func drainPHPThreads() { doneWG := sync.WaitGroup{} doneWG.Add(len(phpThreads)) - mainThread.state.set(stateShuttingDown) + mainThread.state.Set(state.StateShuttingDown) close(mainThread.done) for _, thread := range phpThreads { // shut down all reserved threads - if thread.state.compareAndSwap(stateReserved, stateDone) { + if thread.state.CompareAndSwap(state.StateReserved, state.StateDone) { doneWG.Done() continue } @@ -100,8 +101,8 @@ func drainPHPThreads() { } doneWG.Wait() - mainThread.state.set(stateDone) - mainThread.state.waitFor(stateReserved) + mainThread.state.Set(state.StateDone) + mainThread.state.WaitFor(state.StateReserved) phpThreads = nil } @@ -110,7 +111,7 @@ func (mainThread *phpMainThread) start() error { return ErrMainThreadCreation } - mainThread.state.waitFor(stateReady) + mainThread.state.WaitFor(state.StateReady) // cache common request headers as zend_strings (HTTP_ACCEPT, HTTP_USER_AGENT, etc.) mainThread.commonHeaders = make(map[string]*C.zend_string, len(phpheaders.CommonRequestHeaders)) @@ -129,13 +130,13 @@ func (mainThread *phpMainThread) start() error { func getInactivePHPThread() *phpThread { for _, thread := range phpThreads { - if thread.state.is(stateInactive) { + if thread.state.Is(state.StateInactive) { return thread } } for _, thread := range phpThreads { - if thread.state.compareAndSwap(stateReserved, stateBootRequested) { + if thread.state.CompareAndSwap(state.StateReserved, state.StateBootRequested) { thread.boot() return thread } @@ -151,8 +152,8 @@ func go_frankenphp_main_thread_is_ready() { mainThread.maxThreads = mainThread.numThreads } - mainThread.state.set(stateReady) - mainThread.state.waitFor(stateDone) + mainThread.state.Set(state.StateReady) + mainThread.state.WaitFor(state.StateDone) } // max_threads = auto @@ -176,7 +177,7 @@ func (mainThread *phpMainThread) setAutomaticMaxThreads() { //export go_frankenphp_shutdown_main_thread func go_frankenphp_shutdown_main_thread() { - mainThread.state.set(stateReserved) + mainThread.state.Set(state.StateReserved) } //export go_get_custom_php_ini diff --git a/phpmainthread_test.go b/phpmainthread_test.go index a49484b61f..75385e9f23 100644 --- a/phpmainthread_test.go +++ b/phpmainthread_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/dunglas/frankenphp/internal/phpheaders" + state "github.com/dunglas/frankenphp/internal/state" "github.com/stretchr/testify/assert" ) @@ -25,7 +26,7 @@ func TestStartAndStopTheMainThreadWithOneInactiveThread(t *testing.T) { assert.Len(t, phpThreads, 1) assert.Equal(t, 0, phpThreads[0].threadIndex) - assert.True(t, phpThreads[0].state.is(stateInactive)) + assert.True(t, phpThreads[0].state.Is(state.StateInactive)) drainPHPThreads() assert.Nil(t, phpThreads) @@ -159,7 +160,7 @@ func TestFinishBootingAWorkerScript(t *testing.T) { // boot the worker worker := getDummyWorker("transition-worker-1.php") convertToWorkerThread(phpThreads[0], worker) - phpThreads[0].state.waitFor(stateReady) + phpThreads[0].state.WaitFor(state.StateReady) assert.NotNil(t, phpThreads[0].handler.(*workerThread).dummyContext) assert.Nil(t, phpThreads[0].handler.(*workerThread).workerContext) @@ -225,7 +226,7 @@ func allPossibleTransitions(worker1Path string, worker2Path string) []func(*phpT convertToRegularThread, func(thread *phpThread) { thread.shutdown() }, func(thread *phpThread) { - if thread.state.is(stateReserved) { + if thread.state.Is(state.StateReserved) { thread.boot() } }, diff --git a/phpthread.go b/phpthread.go index 2691aa54bf..592e4f5756 100644 --- a/phpthread.go +++ b/phpthread.go @@ -9,6 +9,8 @@ import ( "runtime" "sync" "unsafe" + + state "github.com/dunglas/frankenphp/internal/state" ) // representation of the actual underlying PHP thread @@ -20,7 +22,7 @@ type phpThread struct { drainChan chan struct{} handlerMu sync.Mutex handler threadHandler - state *threadState + state *state.ThreadState sandboxedEnv map[string]*C.zend_string } @@ -36,16 +38,16 @@ func newPHPThread(threadIndex int) *phpThread { return &phpThread{ threadIndex: threadIndex, requestChan: make(chan *frankenPHPContext), - state: newThreadState(), + state: state.NewThreadState(), } } // boot starts the underlying PHP thread func (thread *phpThread) boot() { // thread must be in reserved state to boot - if !thread.state.compareAndSwap(stateReserved, stateBooting) && !thread.state.compareAndSwap(stateBootRequested, stateBooting) { - logger.Error("thread is not in reserved state: " + thread.state.name()) - panic("thread is not in reserved state: " + thread.state.name()) + if !thread.state.CompareAndSwap(state.StateReserved, state.StateBooting) && !thread.state.CompareAndSwap(state.StateBootRequested, state.StateBooting) { + logger.Error("thread is not in reserved state: " + thread.state.Name()) + panic("thread is not in reserved state: " + thread.state.Name()) } // boot threads as inactive @@ -60,22 +62,22 @@ func (thread *phpThread) boot() { panic("unable to create thread") } - thread.state.waitFor(stateInactive) + thread.state.WaitFor(state.StateInactive) } // shutdown the underlying PHP thread func (thread *phpThread) shutdown() { - if !thread.state.requestSafeStateChange(stateShuttingDown) { + if !thread.state.RequestSafeStateChange(state.StateShuttingDown) { // already shutting down or done return } close(thread.drainChan) - thread.state.waitFor(stateDone) + thread.state.WaitFor(state.StateDone) thread.drainChan = make(chan struct{}) // threads go back to the reserved state from which they can be booted again - if mainThread.state.is(stateReady) { - thread.state.set(stateReserved) + if mainThread.state.Is(state.StateReady) { + thread.state.Set(state.StateReserved) } } @@ -84,22 +86,22 @@ func (thread *phpThread) shutdown() { func (thread *phpThread) setHandler(handler threadHandler) { thread.handlerMu.Lock() defer thread.handlerMu.Unlock() - if !thread.state.requestSafeStateChange(stateTransitionRequested) { + if !thread.state.RequestSafeStateChange(state.StateTransitionRequested) { // no state change allowed == shutdown or done return } close(thread.drainChan) - thread.state.waitFor(stateTransitionInProgress) + thread.state.WaitFor(state.StateTransitionInProgress) thread.handler = handler thread.drainChan = make(chan struct{}) - thread.state.set(stateTransitionComplete) + thread.state.Set(state.StateTransitionComplete) } // transition to a new handler safely // is triggered by setHandler and executed on the PHP thread func (thread *phpThread) transitionToNewHandler() string { - thread.state.set(stateTransitionInProgress) - thread.state.waitFor(stateTransitionComplete) + thread.state.Set(state.StateTransitionInProgress) + thread.state.WaitFor(state.StateTransitionComplete) // execute beforeScriptExecution of the new handler return thread.handler.beforeScriptExecution() } @@ -166,5 +168,5 @@ func go_frankenphp_after_script_execution(threadIndex C.uintptr_t, exitStatus C. func go_frankenphp_on_thread_shutdown(threadIndex C.uintptr_t) { thread := phpThreads[threadIndex] thread.Unpin() - thread.state.set(stateDone) + thread.state.Set(state.StateDone) } diff --git a/scaling.go b/scaling.go index 57e6c598b9..6c70ccc41d 100644 --- a/scaling.go +++ b/scaling.go @@ -11,6 +11,7 @@ import ( "time" "github.com/dunglas/frankenphp/internal/cpu" + state "github.com/dunglas/frankenphp/internal/state" ) const ( @@ -64,7 +65,7 @@ func addRegularThread() (*phpThread, error) { return nil, ErrMaxThreadsReached } convertToRegularThread(thread) - thread.state.waitFor(stateReady, stateShuttingDown, stateReserved) + thread.state.WaitFor(state.StateReady, state.StateShuttingDown, state.StateReserved) return thread, nil } @@ -74,7 +75,7 @@ func addWorkerThread(worker *worker) (*phpThread, error) { return nil, ErrMaxThreadsReached } convertToWorkerThread(thread, worker) - thread.state.waitFor(stateReady, stateShuttingDown, stateReserved) + thread.state.WaitFor(state.StateReady, state.StateShuttingDown, state.StateReserved) return thread, nil } @@ -83,7 +84,7 @@ func scaleWorkerThread(worker *worker) { scalingMu.Lock() defer scalingMu.Unlock() - if !mainThread.state.is(stateReady) { + if !mainThread.state.Is(state.StateReady) { return } @@ -108,7 +109,7 @@ func scaleRegularThread() { scalingMu.Lock() defer scalingMu.Unlock() - if !mainThread.state.is(stateReady) { + if !mainThread.state.Is(state.StateReady) { return } @@ -189,18 +190,18 @@ func deactivateThreads() { thread := autoScaledThreads[i] // the thread might have been stopped otherwise, remove it - if thread.state.is(stateReserved) { + if thread.state.Is(state.StateReserved) { autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...) continue } - waitTime := thread.state.waitTime() + waitTime := thread.state.WaitTime() if stoppedThreadCount > maxTerminationCount || waitTime == 0 { continue } // convert threads to inactive if they have been idle for too long - if thread.state.is(stateReady) && waitTime > maxThreadIdleTime.Milliseconds() { + if thread.state.Is(state.StateReady) && waitTime > maxThreadIdleTime.Milliseconds() { convertToInactiveThread(thread) stoppedThreadCount++ autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...) @@ -212,7 +213,7 @@ func deactivateThreads() { // TODO: Completely stopping threads is more memory efficient // Some PECL extensions like #1296 will prevent threads from fully stopping (they leak memory) // Reactivate this if there is a better solution or workaround - // if thread.state.is(stateInactive) && waitTime > maxThreadIdleTime.Milliseconds() { + // if thread.state.Is(state.StateInactive) && waitTime > maxThreadIdleTime.Milliseconds() { // logger.LogAttrs(nil, slog.LevelDebug, "auto-stopping thread", slog.Int("thread", thread.threadIndex)) // thread.shutdown() // stoppedThreadCount++ diff --git a/scaling_test.go b/scaling_test.go index 89e04b51ee..f64c0f60c8 100644 --- a/scaling_test.go +++ b/scaling_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + state "github.com/dunglas/frankenphp/internal/state" "github.com/stretchr/testify/assert" ) @@ -20,7 +21,7 @@ func TestScaleARegularThreadUpAndDown(t *testing.T) { // scale up scaleRegularThread() - assert.Equal(t, stateReady, autoScaledThread.state.get()) + assert.Equal(t, state.StateReady, autoScaledThread.state.Get()) assert.IsType(t, ®ularThread{}, autoScaledThread.handler) // on down-scale, the thread will be marked as inactive @@ -49,7 +50,7 @@ func TestScaleAWorkerThreadUpAndDown(t *testing.T) { // scale up scaleWorkerThread(getWorkerByPath(workerPath)) - assert.Equal(t, stateReady, autoScaledThread.state.get()) + assert.Equal(t, state.StateReady, autoScaledThread.state.Get()) // on down-scale, the thread will be marked as inactive setLongWaitTime(autoScaledThread) @@ -60,7 +61,5 @@ func TestScaleAWorkerThreadUpAndDown(t *testing.T) { } func setLongWaitTime(thread *phpThread) { - thread.state.mu.Lock() - thread.state.waitingSince = time.Now().Add(-time.Hour) - thread.state.mu.Unlock() + thread.state.SetWaitTime(time.Now().Add(-time.Hour)) } diff --git a/threadinactive.go b/threadinactive.go index 912d339fee..42ac57a248 100644 --- a/threadinactive.go +++ b/threadinactive.go @@ -1,5 +1,9 @@ package frankenphp +import ( + state "github.com/dunglas/frankenphp/internal/state" +) + // representation of a thread with no work assigned to it // implements the threadHandler interface // each inactive thread weighs around ~350KB @@ -15,22 +19,22 @@ func convertToInactiveThread(thread *phpThread) { func (handler *inactiveThread) beforeScriptExecution() string { thread := handler.thread - switch thread.state.get() { - case stateTransitionRequested: + switch thread.state.Get() { + case state.StateTransitionRequested: return thread.transitionToNewHandler() - case stateBooting, stateTransitionComplete: - thread.state.set(stateInactive) + case state.StateBooting, state.StateTransitionComplete: + thread.state.Set(state.StateInactive) // wait for external signal to start or shut down - thread.state.markAsWaiting(true) - thread.state.waitFor(stateTransitionRequested, stateShuttingDown) - thread.state.markAsWaiting(false) + thread.state.MarkAsWaiting(true) + thread.state.WaitFor(state.StateTransitionRequested, state.StateShuttingDown) + thread.state.MarkAsWaiting(false) return handler.beforeScriptExecution() - case stateShuttingDown: + case state.StateShuttingDown: // signal to stop return "" } - panic("unexpected state: " + thread.state.name()) + panic("unexpected state: " + thread.state.Name()) } func (handler *inactiveThread) afterScriptExecution(int) { diff --git a/threadregular.go b/threadregular.go index a8ca03543c..bbcba0e0a1 100644 --- a/threadregular.go +++ b/threadregular.go @@ -2,13 +2,15 @@ package frankenphp import ( "sync" + + state "github.com/dunglas/frankenphp/internal/state" ) // representation of a non-worker PHP thread // executes PHP scripts in a web context // implements the threadHandler interface type regularThread struct { - state *threadState + state *state.ThreadState thread *phpThread requestContext *frankenPHPContext } @@ -29,22 +31,22 @@ func convertToRegularThread(thread *phpThread) { // beforeScriptExecution returns the name of the script or an empty string on shutdown func (handler *regularThread) beforeScriptExecution() string { - switch handler.state.get() { - case stateTransitionRequested: + switch handler.state.Get() { + case state.StateTransitionRequested: detachRegularThread(handler.thread) return handler.thread.transitionToNewHandler() - case stateTransitionComplete: + case state.StateTransitionComplete: handler.thread.updateContext(false) - handler.state.set(stateReady) + handler.state.Set(state.StateReady) return handler.waitForRequest() - case stateReady: + case state.StateReady: return handler.waitForRequest() - case stateShuttingDown: + case state.StateShuttingDown: detachRegularThread(handler.thread) // signal to stop return "" } - panic("unexpected state: " + handler.state.name()) + panic("unexpected state: " + handler.state.Name()) } func (handler *regularThread) afterScriptExecution(int) { @@ -63,7 +65,7 @@ func (handler *regularThread) waitForRequest() string { // clear any previously sandboxed env clearSandboxedEnv(handler.thread) - handler.state.markAsWaiting(true) + handler.state.MarkAsWaiting(true) var fc *frankenPHPContext select { @@ -74,7 +76,7 @@ func (handler *regularThread) waitForRequest() string { } handler.requestContext = fc - handler.state.markAsWaiting(false) + handler.state.MarkAsWaiting(false) // set the scriptFilename that should be executed return fc.scriptFilename diff --git a/threadtasks_test.go b/threadtasks_test.go index d81c555350..e774a41b12 100644 --- a/threadtasks_test.go +++ b/threadtasks_test.go @@ -2,6 +2,8 @@ package frankenphp import ( "sync" + + state "github.com/dunglas/frankenphp/internal/state" ) // representation of a thread that handles tasks directly assigned by go @@ -41,23 +43,23 @@ func convertToTaskThread(thread *phpThread) *taskThread { func (handler *taskThread) beforeScriptExecution() string { thread := handler.thread - switch thread.state.get() { - case stateTransitionRequested: + switch thread.state.Get() { + case state.StateTransitionRequested: return thread.transitionToNewHandler() - case stateBooting, stateTransitionComplete: - thread.state.set(stateReady) + case state.StateBooting, state.StateTransitionComplete: + thread.state.Set(state.StateReady) handler.waitForTasks() return handler.beforeScriptExecution() - case stateReady: + case state.StateReady: handler.waitForTasks() return handler.beforeScriptExecution() - case stateShuttingDown: + case state.StateShuttingDown: // signal to stop return "" } - panic("unexpected state: " + thread.state.name()) + panic("unexpected state: " + thread.state.Name()) } func (handler *taskThread) afterScriptExecution(int) { diff --git a/threadworker.go b/threadworker.go index 6d3ca6d2ce..fc70d7f452 100644 --- a/threadworker.go +++ b/threadworker.go @@ -11,13 +11,14 @@ import ( "unsafe" "github.com/dunglas/frankenphp/internal/backoff" + state "github.com/dunglas/frankenphp/internal/state" ) // representation of a thread assigned to a worker script // executes the PHP worker script in a loop // implements the threadHandler interface type workerThread struct { - state *threadState + state *state.ThreadState thread *phpThread worker *worker dummyContext *frankenPHPContext @@ -42,28 +43,28 @@ func convertToWorkerThread(thread *phpThread, worker *worker) { // beforeScriptExecution returns the name of the script or an empty string on shutdown func (handler *workerThread) beforeScriptExecution() string { - switch handler.state.get() { - case stateTransitionRequested: + switch handler.state.Get() { + case state.StateTransitionRequested: if handler.worker.onThreadShutdown != nil { handler.worker.onThreadShutdown(handler.thread.threadIndex) } handler.worker.detachThread(handler.thread) return handler.thread.transitionToNewHandler() - case stateRestarting: + case state.StateRestarting: if handler.worker.onThreadShutdown != nil { handler.worker.onThreadShutdown(handler.thread.threadIndex) } - handler.state.set(stateYielding) - handler.state.waitFor(stateReady, stateShuttingDown) + handler.state.Set(state.StateYielding) + handler.state.WaitFor(state.StateReady, state.StateShuttingDown) return handler.beforeScriptExecution() - case stateReady, stateTransitionComplete: + case state.StateReady, state.StateTransitionComplete: handler.thread.updateContext(true) if handler.worker.onThreadReady != nil { handler.worker.onThreadReady(handler.thread.threadIndex) } setupWorkerScript(handler, handler.worker) return handler.worker.fileName - case stateShuttingDown: + case state.StateShuttingDown: if handler.worker.onThreadShutdown != nil { handler.worker.onThreadShutdown(handler.thread.threadIndex) } @@ -71,7 +72,7 @@ func (handler *workerThread) beforeScriptExecution() string { // signal to stop return "" } - panic("unexpected state: " + handler.state.name()) + panic("unexpected state: " + handler.state.Name()) } func (handler *workerThread) afterScriptExecution(exitStatus int) { @@ -94,7 +95,7 @@ func setupWorkerScript(handler *workerThread, worker *worker) { handler.backoff.Wait() metrics.StartWorker(worker.name) - if handler.state.is(stateReady) { + if handler.state.Is(state.StateReady) { metrics.ReadyWorker(handler.worker.name) } @@ -151,7 +152,7 @@ func tearDownWorkerScript(handler *workerThread, exitStatus int) { // panic after exponential backoff if the worker has never reached frankenphp_handle_request if handler.backoff.RecordFailure() { - if !watcherIsEnabled && !handler.state.is(stateReady) { + if !watcherIsEnabled && !handler.state.Is(state.StateReady) { logger.LogAttrs(ctx, slog.LevelError, "too many consecutive worker failures", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("failures", handler.backoff.FailureCount())) panic("too many consecutive worker failures") } @@ -176,14 +177,14 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) { } // worker threads are 'ready' after they first reach frankenphp_handle_request() - // 'stateTransitionComplete' is only true on the first boot of the worker script, + // 'state.StateTransitionComplete' is only true on the first boot of the worker script, // while 'isBootingScript' is true on every boot of the worker script - if handler.state.is(stateTransitionComplete) { + if handler.state.Is(state.StateTransitionComplete) { metrics.ReadyWorker(handler.worker.name) - handler.state.set(stateReady) + handler.state.Set(state.StateReady) } - handler.state.markAsWaiting(true) + handler.state.MarkAsWaiting(true) var fc *frankenPHPContext select { @@ -192,7 +193,7 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) { // flush the opcache when restarting due to watcher or admin api // note: this is done right before frankenphp_handle_request() returns 'false' - if handler.state.is(stateRestarting) { + if handler.state.Is(state.StateRestarting) { C.frankenphp_reset_opcache() } @@ -202,7 +203,7 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) { } handler.workerContext = fc - handler.state.markAsWaiting(false) + handler.state.MarkAsWaiting(false) if fc.request == nil { logger.LogAttrs(ctx, slog.LevelDebug, "request handling started", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex)) diff --git a/worker.go b/worker.go index ee468bd6cd..f44a154e46 100644 --- a/worker.go +++ b/worker.go @@ -9,6 +9,7 @@ import ( "time" "github.com/dunglas/frankenphp/internal/fastabs" + state "github.com/dunglas/frankenphp/internal/state" "github.com/dunglas/frankenphp/internal/watcher" ) @@ -52,7 +53,7 @@ func initWorkers(opt []workerOpt) error { thread := getInactivePHPThread() convertToWorkerThread(thread, w) go func() { - thread.state.waitFor(stateReady) + thread.state.WaitFor(state.StateReady) workersReady.Done() }() } @@ -146,7 +147,7 @@ func drainWorkerThreads() []*phpThread { worker.threadMutex.RLock() ready.Add(len(worker.threads)) for _, thread := range worker.threads { - if !thread.state.requestSafeStateChange(stateRestarting) { + if !thread.state.RequestSafeStateChange(state.StateRestarting) { ready.Done() // no state change allowed == thread is shutting down // we'll proceed to restart all other threads anyways @@ -155,7 +156,7 @@ func drainWorkerThreads() []*phpThread { close(thread.drainChan) drainedThreads = append(drainedThreads, thread) go func(thread *phpThread) { - thread.state.waitFor(stateYielding) + thread.state.WaitFor(state.StateYielding) ready.Done() }(thread) } @@ -182,7 +183,7 @@ func RestartWorkers() { for _, thread := range threadsToRestart { thread.drainChan = make(chan struct{}) - thread.state.set(stateReady) + thread.state.Set(state.StateReady) } } From 6dc34328ba8686116383a89af481f2d35ec27d20 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Sat, 1 Nov 2025 22:13:02 +0100 Subject: [PATCH 04/20] Refactoring. --- debugstate.go | 4 +-- internal/state/state.go | 56 ++++++++++++++++++------------------ internal/state/state_test.go | 31 ++++++++++---------- phpmainthread.go | 22 +++++++------- phpmainthread_test.go | 8 +++--- phpthread.go | 26 ++++++++--------- scaling.go | 16 +++++------ scaling_test.go | 6 ++-- threadinactive.go | 12 ++++---- threadregular.go | 12 ++++---- threadtasks_test.go | 12 ++++---- threadworker.go | 26 ++++++++--------- worker.go | 10 +++---- 13 files changed, 121 insertions(+), 120 deletions(-) diff --git a/debugstate.go b/debugstate.go index d50f747040..c18813ec1b 100644 --- a/debugstate.go +++ b/debugstate.go @@ -1,7 +1,7 @@ package frankenphp import ( - state "github.com/dunglas/frankenphp/internal/state" + "github.com/dunglas/frankenphp/internal/state" ) // EXPERIMENTAL: ThreadDebugState prints the state of a single PHP thread - debugging purposes only @@ -27,7 +27,7 @@ func DebugState() FrankenPHPDebugState { ReservedThreadCount: 0, } for _, thread := range phpThreads { - if thread.state.Is(state.StateReserved) { + if thread.state.Is(state.Reserved) { fullState.ReservedThreadCount++ continue } diff --git a/internal/state/state.go b/internal/state/state.go index d7d811e4f5..33b365a672 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -1,4 +1,4 @@ -package frankenphp +package state import ( "slices" @@ -10,38 +10,38 @@ type StateID uint8 const ( // livecycle States of a thread - StateReserved StateID = iota - StateBooting - StateBootRequested - StateShuttingDown - StateDone + Reserved StateID = iota + Booting + BootRequested + ShuttingDown + Done // these States are 'stable' and safe to transition from at any time - StateInactive - StateReady + Inactive + Ready // States necessary for restarting workers - StateRestarting - StateYielding + Restarting + Yielding // States necessary for transitioning between different handlers - StateTransitionRequested - StateTransitionInProgress - StateTransitionComplete + TransitionRequested + TransitionInProgress + TransitionComplete ) var stateNames = map[StateID]string{ - StateReserved: "reserved", - StateBooting: "booting", - StateInactive: "inactive", - StateReady: "ready", - StateShuttingDown: "shutting down", - StateDone: "done", - StateRestarting: "restarting", - StateYielding: "yielding", - StateTransitionRequested: "transition requested", - StateTransitionInProgress: "transition in progress", - StateTransitionComplete: "transition complete", + Reserved: "reserved", + Booting: "booting", + Inactive: "inactive", + Ready: "ready", + ShuttingDown: "shutting down", + Done: "done", + Restarting: "restarting", + Yielding: "yielding", + TransitionRequested: "transition requested", + TransitionInProgress: "transition in progress", + TransitionComplete: "transition complete", } type ThreadState struct { @@ -60,7 +60,7 @@ type stateSubscriber struct { func NewThreadState() *ThreadState { return &ThreadState{ - currentState: StateReserved, + currentState: Reserved, subscribers: []stateSubscriber{}, mu: sync.RWMutex{}, } @@ -142,11 +142,11 @@ func (ts *ThreadState) RequestSafeStateChange(nextState StateID) bool { ts.mu.Lock() switch ts.currentState { // disallow state changes if shutting down or done - case StateShuttingDown, StateDone, StateReserved: + case ShuttingDown, Done, Reserved: ts.mu.Unlock() return false // ready and inactive are safe states to transition from - case StateReady, StateInactive: + case Ready, Inactive: ts.currentState = nextState ts.notifySubscribers(nextState) ts.mu.Unlock() @@ -155,7 +155,7 @@ func (ts *ThreadState) RequestSafeStateChange(nextState StateID) bool { ts.mu.Unlock() // wait for the state to change to a safe state - ts.WaitFor(StateReady, StateInactive, StateShuttingDown) + ts.WaitFor(Ready, Inactive, ShuttingDown) return ts.RequestSafeStateChange(nextState) } diff --git a/internal/state/state_test.go b/internal/state/state_test.go index 9a5e844153..3da5266038 100644 --- a/internal/state/state_test.go +++ b/internal/state/state_test.go @@ -1,4 +1,4 @@ -package frankenphp +package state import ( "testing" @@ -8,37 +8,38 @@ import ( ) func Test2GoroutinesYieldToEachOtherViaStates(t *testing.T) { - threadState := &threadState{currentState: stateBooting} + threadState := &ThreadState{currentState: Booting} go func() { - threadState.WaitFor(stateInactive) - assert.True(t, threadState.is(stateInactive)) - threadstate.Set(stateReady) + threadState.WaitFor(Inactive) + assert.True(t, threadState.Is(Inactive)) + threadState.Set(Ready) }() - threadstate.Set(stateInactive) - threadState.WaitFor(stateReady) - assert.True(t, threadState.is(stateReady)) + threadState.Set(Inactive) + threadState.WaitFor(Ready) + assert.True(t, threadState.Is(Ready)) } func TestStateShouldHaveCorrectAmountOfSubscribers(t *testing.T) { - threadState := &threadState{currentState: stateBooting} + threadState := &ThreadState{currentState: Booting} // 3 subscribers waiting for different states - go threadState.WaitFor(stateInactive) - go threadState.WaitFor(stateInactive, StateShuttingDown) - go threadState.WaitFor(StateShuttingDown) + go threadState.WaitFor(Inactive) + go threadState.WaitFor(Inactive, ShuttingDown) + go threadState.WaitFor(ShuttingDown) assertNumberOfSubscribers(t, threadState, 3) - threadstate.Set(stateInactive) + threadState.Set(Inactive) assertNumberOfSubscribers(t, threadState, 1) - assert.True(t, threadstate.CompareAndSwap(stateInactive, stateShuttingDown)) + assert.True(t, threadState.CompareAndSwap(Inactive, ShuttingDown)) assertNumberOfSubscribers(t, threadState, 0) } -func assertNumberOfSubscribers(t *testing.T, threadState *threadState, expected int) { +func assertNumberOfSubscribers(t *testing.T, threadState *ThreadState, expected int) { + t.Helper() for range 10_000 { // wait for 1 second max time.Sleep(100 * time.Microsecond) threadState.mu.RLock() diff --git a/phpmainthread.go b/phpmainthread.go index b8081afe42..a2299a7092 100644 --- a/phpmainthread.go +++ b/phpmainthread.go @@ -15,7 +15,7 @@ import ( "github.com/dunglas/frankenphp/internal/memory" "github.com/dunglas/frankenphp/internal/phpheaders" - state "github.com/dunglas/frankenphp/internal/state" + "github.com/dunglas/frankenphp/internal/state" ) // represents the main PHP thread @@ -85,11 +85,11 @@ func initPHPThreads(numThreads int, numMaxThreads int, phpIni map[string]string) func drainPHPThreads() { doneWG := sync.WaitGroup{} doneWG.Add(len(phpThreads)) - mainThread.state.Set(state.StateShuttingDown) + mainThread.state.Set(state.ShuttingDown) close(mainThread.done) for _, thread := range phpThreads { // shut down all reserved threads - if thread.state.CompareAndSwap(state.StateReserved, state.StateDone) { + if thread.state.CompareAndSwap(state.Reserved, state.Done) { doneWG.Done() continue } @@ -101,8 +101,8 @@ func drainPHPThreads() { } doneWG.Wait() - mainThread.state.Set(state.StateDone) - mainThread.state.WaitFor(state.StateReserved) + mainThread.state.Set(state.Done) + mainThread.state.WaitFor(state.Reserved) phpThreads = nil } @@ -111,7 +111,7 @@ func (mainThread *phpMainThread) start() error { return ErrMainThreadCreation } - mainThread.state.WaitFor(state.StateReady) + mainThread.state.WaitFor(state.Ready) // cache common request headers as zend_strings (HTTP_ACCEPT, HTTP_USER_AGENT, etc.) mainThread.commonHeaders = make(map[string]*C.zend_string, len(phpheaders.CommonRequestHeaders)) @@ -130,13 +130,13 @@ func (mainThread *phpMainThread) start() error { func getInactivePHPThread() *phpThread { for _, thread := range phpThreads { - if thread.state.Is(state.StateInactive) { + if thread.state.Is(state.Inactive) { return thread } } for _, thread := range phpThreads { - if thread.state.CompareAndSwap(state.StateReserved, state.StateBootRequested) { + if thread.state.CompareAndSwap(state.Reserved, state.BootRequested) { thread.boot() return thread } @@ -152,8 +152,8 @@ func go_frankenphp_main_thread_is_ready() { mainThread.maxThreads = mainThread.numThreads } - mainThread.state.Set(state.StateReady) - mainThread.state.WaitFor(state.StateDone) + mainThread.state.Set(state.Ready) + mainThread.state.WaitFor(state.Done) } // max_threads = auto @@ -177,7 +177,7 @@ func (mainThread *phpMainThread) setAutomaticMaxThreads() { //export go_frankenphp_shutdown_main_thread func go_frankenphp_shutdown_main_thread() { - mainThread.state.Set(state.StateReserved) + mainThread.state.Set(state.Reserved) } //export go_get_custom_php_ini diff --git a/phpmainthread_test.go b/phpmainthread_test.go index 75385e9f23..3cb7de132e 100644 --- a/phpmainthread_test.go +++ b/phpmainthread_test.go @@ -13,7 +13,7 @@ import ( "time" "github.com/dunglas/frankenphp/internal/phpheaders" - state "github.com/dunglas/frankenphp/internal/state" + "github.com/dunglas/frankenphp/internal/state" "github.com/stretchr/testify/assert" ) @@ -26,7 +26,7 @@ func TestStartAndStopTheMainThreadWithOneInactiveThread(t *testing.T) { assert.Len(t, phpThreads, 1) assert.Equal(t, 0, phpThreads[0].threadIndex) - assert.True(t, phpThreads[0].state.Is(state.StateInactive)) + assert.True(t, phpThreads[0].state.Is(state.Inactive)) drainPHPThreads() assert.Nil(t, phpThreads) @@ -160,7 +160,7 @@ func TestFinishBootingAWorkerScript(t *testing.T) { // boot the worker worker := getDummyWorker("transition-worker-1.php") convertToWorkerThread(phpThreads[0], worker) - phpThreads[0].state.WaitFor(state.StateReady) + phpThreads[0].state.WaitFor(state.Ready) assert.NotNil(t, phpThreads[0].handler.(*workerThread).dummyContext) assert.Nil(t, phpThreads[0].handler.(*workerThread).workerContext) @@ -226,7 +226,7 @@ func allPossibleTransitions(worker1Path string, worker2Path string) []func(*phpT convertToRegularThread, func(thread *phpThread) { thread.shutdown() }, func(thread *phpThread) { - if thread.state.Is(state.StateReserved) { + if thread.state.Is(state.Reserved) { thread.boot() } }, diff --git a/phpthread.go b/phpthread.go index 592e4f5756..7af9e13acc 100644 --- a/phpthread.go +++ b/phpthread.go @@ -10,7 +10,7 @@ import ( "sync" "unsafe" - state "github.com/dunglas/frankenphp/internal/state" + "github.com/dunglas/frankenphp/internal/state" ) // representation of the actual underlying PHP thread @@ -45,7 +45,7 @@ func newPHPThread(threadIndex int) *phpThread { // boot starts the underlying PHP thread func (thread *phpThread) boot() { // thread must be in reserved state to boot - if !thread.state.CompareAndSwap(state.StateReserved, state.StateBooting) && !thread.state.CompareAndSwap(state.StateBootRequested, state.StateBooting) { + if !thread.state.CompareAndSwap(state.Reserved, state.Booting) && !thread.state.CompareAndSwap(state.BootRequested, state.Booting) { logger.Error("thread is not in reserved state: " + thread.state.Name()) panic("thread is not in reserved state: " + thread.state.Name()) } @@ -62,22 +62,22 @@ func (thread *phpThread) boot() { panic("unable to create thread") } - thread.state.WaitFor(state.StateInactive) + thread.state.WaitFor(state.Inactive) } // shutdown the underlying PHP thread func (thread *phpThread) shutdown() { - if !thread.state.RequestSafeStateChange(state.StateShuttingDown) { + if !thread.state.RequestSafeStateChange(state.ShuttingDown) { // already shutting down or done return } close(thread.drainChan) - thread.state.WaitFor(state.StateDone) + thread.state.WaitFor(state.Done) thread.drainChan = make(chan struct{}) // threads go back to the reserved state from which they can be booted again - if mainThread.state.Is(state.StateReady) { - thread.state.Set(state.StateReserved) + if mainThread.state.Is(state.Ready) { + thread.state.Set(state.Reserved) } } @@ -86,22 +86,22 @@ func (thread *phpThread) shutdown() { func (thread *phpThread) setHandler(handler threadHandler) { thread.handlerMu.Lock() defer thread.handlerMu.Unlock() - if !thread.state.RequestSafeStateChange(state.StateTransitionRequested) { + if !thread.state.RequestSafeStateChange(state.TransitionRequested) { // no state change allowed == shutdown or done return } close(thread.drainChan) - thread.state.WaitFor(state.StateTransitionInProgress) + thread.state.WaitFor(state.TransitionInProgress) thread.handler = handler thread.drainChan = make(chan struct{}) - thread.state.Set(state.StateTransitionComplete) + thread.state.Set(state.TransitionComplete) } // transition to a new handler safely // is triggered by setHandler and executed on the PHP thread func (thread *phpThread) transitionToNewHandler() string { - thread.state.Set(state.StateTransitionInProgress) - thread.state.WaitFor(state.StateTransitionComplete) + thread.state.Set(state.TransitionInProgress) + thread.state.WaitFor(state.TransitionComplete) // execute beforeScriptExecution of the new handler return thread.handler.beforeScriptExecution() } @@ -168,5 +168,5 @@ func go_frankenphp_after_script_execution(threadIndex C.uintptr_t, exitStatus C. func go_frankenphp_on_thread_shutdown(threadIndex C.uintptr_t) { thread := phpThreads[threadIndex] thread.Unpin() - thread.state.Set(state.StateDone) + thread.state.Set(state.Done) } diff --git a/scaling.go b/scaling.go index 6c70ccc41d..a2c253063a 100644 --- a/scaling.go +++ b/scaling.go @@ -11,7 +11,7 @@ import ( "time" "github.com/dunglas/frankenphp/internal/cpu" - state "github.com/dunglas/frankenphp/internal/state" + "github.com/dunglas/frankenphp/internal/state" ) const ( @@ -65,7 +65,7 @@ func addRegularThread() (*phpThread, error) { return nil, ErrMaxThreadsReached } convertToRegularThread(thread) - thread.state.WaitFor(state.StateReady, state.StateShuttingDown, state.StateReserved) + thread.state.WaitFor(state.Ready, state.ShuttingDown, state.Reserved) return thread, nil } @@ -75,7 +75,7 @@ func addWorkerThread(worker *worker) (*phpThread, error) { return nil, ErrMaxThreadsReached } convertToWorkerThread(thread, worker) - thread.state.WaitFor(state.StateReady, state.StateShuttingDown, state.StateReserved) + thread.state.WaitFor(state.Ready, state.ShuttingDown, state.Reserved) return thread, nil } @@ -84,7 +84,7 @@ func scaleWorkerThread(worker *worker) { scalingMu.Lock() defer scalingMu.Unlock() - if !mainThread.state.Is(state.StateReady) { + if !mainThread.state.Is(state.Ready) { return } @@ -109,7 +109,7 @@ func scaleRegularThread() { scalingMu.Lock() defer scalingMu.Unlock() - if !mainThread.state.Is(state.StateReady) { + if !mainThread.state.Is(state.Ready) { return } @@ -190,7 +190,7 @@ func deactivateThreads() { thread := autoScaledThreads[i] // the thread might have been stopped otherwise, remove it - if thread.state.Is(state.StateReserved) { + if thread.state.Is(state.Reserved) { autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...) continue } @@ -201,7 +201,7 @@ func deactivateThreads() { } // convert threads to inactive if they have been idle for too long - if thread.state.Is(state.StateReady) && waitTime > maxThreadIdleTime.Milliseconds() { + if thread.state.Is(state.Ready) && waitTime > maxThreadIdleTime.Milliseconds() { convertToInactiveThread(thread) stoppedThreadCount++ autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...) @@ -213,7 +213,7 @@ func deactivateThreads() { // TODO: Completely stopping threads is more memory efficient // Some PECL extensions like #1296 will prevent threads from fully stopping (they leak memory) // Reactivate this if there is a better solution or workaround - // if thread.state.Is(state.StateInactive) && waitTime > maxThreadIdleTime.Milliseconds() { + // if thread.state.Is(state.Inactive) && waitTime > maxThreadIdleTime.Milliseconds() { // logger.LogAttrs(nil, slog.LevelDebug, "auto-stopping thread", slog.Int("thread", thread.threadIndex)) // thread.shutdown() // stoppedThreadCount++ diff --git a/scaling_test.go b/scaling_test.go index f64c0f60c8..da7a4d2992 100644 --- a/scaling_test.go +++ b/scaling_test.go @@ -6,7 +6,7 @@ import ( "testing" "time" - state "github.com/dunglas/frankenphp/internal/state" + "github.com/dunglas/frankenphp/internal/state" "github.com/stretchr/testify/assert" ) @@ -21,7 +21,7 @@ func TestScaleARegularThreadUpAndDown(t *testing.T) { // scale up scaleRegularThread() - assert.Equal(t, state.StateReady, autoScaledThread.state.Get()) + assert.Equal(t, state.Ready, autoScaledThread.state.Get()) assert.IsType(t, ®ularThread{}, autoScaledThread.handler) // on down-scale, the thread will be marked as inactive @@ -50,7 +50,7 @@ func TestScaleAWorkerThreadUpAndDown(t *testing.T) { // scale up scaleWorkerThread(getWorkerByPath(workerPath)) - assert.Equal(t, state.StateReady, autoScaledThread.state.Get()) + assert.Equal(t, state.Ready, autoScaledThread.state.Get()) // on down-scale, the thread will be marked as inactive setLongWaitTime(autoScaledThread) diff --git a/threadinactive.go b/threadinactive.go index 42ac57a248..f825c7a185 100644 --- a/threadinactive.go +++ b/threadinactive.go @@ -1,7 +1,7 @@ package frankenphp import ( - state "github.com/dunglas/frankenphp/internal/state" + "github.com/dunglas/frankenphp/internal/state" ) // representation of a thread with no work assigned to it @@ -20,17 +20,17 @@ func (handler *inactiveThread) beforeScriptExecution() string { thread := handler.thread switch thread.state.Get() { - case state.StateTransitionRequested: + case state.TransitionRequested: return thread.transitionToNewHandler() - case state.StateBooting, state.StateTransitionComplete: - thread.state.Set(state.StateInactive) + case state.Booting, state.TransitionComplete: + thread.state.Set(state.Inactive) // wait for external signal to start or shut down thread.state.MarkAsWaiting(true) - thread.state.WaitFor(state.StateTransitionRequested, state.StateShuttingDown) + thread.state.WaitFor(state.TransitionRequested, state.ShuttingDown) thread.state.MarkAsWaiting(false) return handler.beforeScriptExecution() - case state.StateShuttingDown: + case state.ShuttingDown: // signal to stop return "" } diff --git a/threadregular.go b/threadregular.go index bbcba0e0a1..82e5ca3600 100644 --- a/threadregular.go +++ b/threadregular.go @@ -3,7 +3,7 @@ package frankenphp import ( "sync" - state "github.com/dunglas/frankenphp/internal/state" + "github.com/dunglas/frankenphp/internal/state" ) // representation of a non-worker PHP thread @@ -32,16 +32,16 @@ func convertToRegularThread(thread *phpThread) { // beforeScriptExecution returns the name of the script or an empty string on shutdown func (handler *regularThread) beforeScriptExecution() string { switch handler.state.Get() { - case state.StateTransitionRequested: + case state.TransitionRequested: detachRegularThread(handler.thread) return handler.thread.transitionToNewHandler() - case state.StateTransitionComplete: + case state.TransitionComplete: handler.thread.updateContext(false) - handler.state.Set(state.StateReady) + handler.state.Set(state.Ready) return handler.waitForRequest() - case state.StateReady: + case state.Ready: return handler.waitForRequest() - case state.StateShuttingDown: + case state.ShuttingDown: detachRegularThread(handler.thread) // signal to stop return "" diff --git a/threadtasks_test.go b/threadtasks_test.go index e774a41b12..65e97bb6b4 100644 --- a/threadtasks_test.go +++ b/threadtasks_test.go @@ -3,7 +3,7 @@ package frankenphp import ( "sync" - state "github.com/dunglas/frankenphp/internal/state" + "github.com/dunglas/frankenphp/internal/state" ) // representation of a thread that handles tasks directly assigned by go @@ -44,18 +44,18 @@ func (handler *taskThread) beforeScriptExecution() string { thread := handler.thread switch thread.state.Get() { - case state.StateTransitionRequested: + case state.TransitionRequested: return thread.transitionToNewHandler() - case state.StateBooting, state.StateTransitionComplete: - thread.state.Set(state.StateReady) + case state.Booting, state.TransitionComplete: + thread.state.Set(state.Ready) handler.waitForTasks() return handler.beforeScriptExecution() - case state.StateReady: + case state.Ready: handler.waitForTasks() return handler.beforeScriptExecution() - case state.StateShuttingDown: + case state.ShuttingDown: // signal to stop return "" } diff --git a/threadworker.go b/threadworker.go index fc70d7f452..c50e917ddd 100644 --- a/threadworker.go +++ b/threadworker.go @@ -11,7 +11,7 @@ import ( "unsafe" "github.com/dunglas/frankenphp/internal/backoff" - state "github.com/dunglas/frankenphp/internal/state" + "github.com/dunglas/frankenphp/internal/state" ) // representation of a thread assigned to a worker script @@ -44,27 +44,27 @@ func convertToWorkerThread(thread *phpThread, worker *worker) { // beforeScriptExecution returns the name of the script or an empty string on shutdown func (handler *workerThread) beforeScriptExecution() string { switch handler.state.Get() { - case state.StateTransitionRequested: + case state.TransitionRequested: if handler.worker.onThreadShutdown != nil { handler.worker.onThreadShutdown(handler.thread.threadIndex) } handler.worker.detachThread(handler.thread) return handler.thread.transitionToNewHandler() - case state.StateRestarting: + case state.Restarting: if handler.worker.onThreadShutdown != nil { handler.worker.onThreadShutdown(handler.thread.threadIndex) } - handler.state.Set(state.StateYielding) - handler.state.WaitFor(state.StateReady, state.StateShuttingDown) + handler.state.Set(state.Yielding) + handler.state.WaitFor(state.Ready, state.ShuttingDown) return handler.beforeScriptExecution() - case state.StateReady, state.StateTransitionComplete: + case state.Ready, state.TransitionComplete: handler.thread.updateContext(true) if handler.worker.onThreadReady != nil { handler.worker.onThreadReady(handler.thread.threadIndex) } setupWorkerScript(handler, handler.worker) return handler.worker.fileName - case state.StateShuttingDown: + case state.ShuttingDown: if handler.worker.onThreadShutdown != nil { handler.worker.onThreadShutdown(handler.thread.threadIndex) } @@ -95,7 +95,7 @@ func setupWorkerScript(handler *workerThread, worker *worker) { handler.backoff.Wait() metrics.StartWorker(worker.name) - if handler.state.Is(state.StateReady) { + if handler.state.Is(state.Ready) { metrics.ReadyWorker(handler.worker.name) } @@ -152,7 +152,7 @@ func tearDownWorkerScript(handler *workerThread, exitStatus int) { // panic after exponential backoff if the worker has never reached frankenphp_handle_request if handler.backoff.RecordFailure() { - if !watcherIsEnabled && !handler.state.Is(state.StateReady) { + if !watcherIsEnabled && !handler.state.Is(state.Ready) { logger.LogAttrs(ctx, slog.LevelError, "too many consecutive worker failures", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("failures", handler.backoff.FailureCount())) panic("too many consecutive worker failures") } @@ -177,11 +177,11 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) { } // worker threads are 'ready' after they first reach frankenphp_handle_request() - // 'state.StateTransitionComplete' is only true on the first boot of the worker script, + // 'state.TransitionComplete' is only true on the first boot of the worker script, // while 'isBootingScript' is true on every boot of the worker script - if handler.state.Is(state.StateTransitionComplete) { + if handler.state.Is(state.TransitionComplete) { metrics.ReadyWorker(handler.worker.name) - handler.state.Set(state.StateReady) + handler.state.Set(state.Ready) } handler.state.MarkAsWaiting(true) @@ -193,7 +193,7 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) { // flush the opcache when restarting due to watcher or admin api // note: this is done right before frankenphp_handle_request() returns 'false' - if handler.state.Is(state.StateRestarting) { + if handler.state.Is(state.Restarting) { C.frankenphp_reset_opcache() } diff --git a/worker.go b/worker.go index f44a154e46..96ba45e338 100644 --- a/worker.go +++ b/worker.go @@ -9,7 +9,7 @@ import ( "time" "github.com/dunglas/frankenphp/internal/fastabs" - state "github.com/dunglas/frankenphp/internal/state" + "github.com/dunglas/frankenphp/internal/state" "github.com/dunglas/frankenphp/internal/watcher" ) @@ -53,7 +53,7 @@ func initWorkers(opt []workerOpt) error { thread := getInactivePHPThread() convertToWorkerThread(thread, w) go func() { - thread.state.WaitFor(state.StateReady) + thread.state.WaitFor(state.Ready) workersReady.Done() }() } @@ -147,7 +147,7 @@ func drainWorkerThreads() []*phpThread { worker.threadMutex.RLock() ready.Add(len(worker.threads)) for _, thread := range worker.threads { - if !thread.state.RequestSafeStateChange(state.StateRestarting) { + if !thread.state.RequestSafeStateChange(state.Restarting) { ready.Done() // no state change allowed == thread is shutting down // we'll proceed to restart all other threads anyways @@ -156,7 +156,7 @@ func drainWorkerThreads() []*phpThread { close(thread.drainChan) drainedThreads = append(drainedThreads, thread) go func(thread *phpThread) { - thread.state.WaitFor(state.StateYielding) + thread.state.WaitFor(state.Yielding) ready.Done() }(thread) } @@ -183,7 +183,7 @@ func RestartWorkers() { for _, thread := range threadsToRestart { thread.drainChan = make(chan struct{}) - thread.state.Set(state.StateReady) + thread.state.Set(state.Ready) } } From 7c8813ee6d39eeafbecaa07fae54fbb586faa61a Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Sat, 1 Nov 2025 22:37:35 +0100 Subject: [PATCH 05/20] Also moves php_headers test. --- internal/phpheaders/phpheaders_test.go | 22 ++++++++++++++++++++++ phpmainthread_test.go | 15 --------------- 2 files changed, 22 insertions(+), 15 deletions(-) create mode 100644 internal/phpheaders/phpheaders_test.go diff --git a/internal/phpheaders/phpheaders_test.go b/internal/phpheaders/phpheaders_test.go new file mode 100644 index 0000000000..a741ec38fc --- /dev/null +++ b/internal/phpheaders/phpheaders_test.go @@ -0,0 +1,22 @@ +package phpheaders + +import ( + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAllCommonHeadersAreCorrect(t *testing.T) { + fakeRequest := httptest.NewRequest("GET", "http://localhost", nil) + + for header, phpHeader := range CommonRequestHeaders { + // verify that common and uncommon headers return the same result + expectedPHPHeader := GetUnCommonHeader(header) + assert.Equal(t, phpHeader+"\x00", expectedPHPHeader, "header is not well formed: "+phpHeader) + + // net/http will capitalize lowercase headers, verify that headers are capitalized + fakeRequest.Header.Add(header, "foo") + assert.Contains(t, fakeRequest.Header, header, "header is not correctly capitalized: "+header) + } +} diff --git a/phpmainthread_test.go b/phpmainthread_test.go index 3cb7de132e..81d94c2d35 100644 --- a/phpmainthread_test.go +++ b/phpmainthread_test.go @@ -12,7 +12,6 @@ import ( "testing" "time" - "github.com/dunglas/frankenphp/internal/phpheaders" "github.com/dunglas/frankenphp/internal/state" "github.com/stretchr/testify/assert" ) @@ -237,20 +236,6 @@ func allPossibleTransitions(worker1Path string, worker2Path string) []func(*phpT } } -func TestAllCommonHeadersAreCorrect(t *testing.T) { - fakeRequest := httptest.NewRequest("GET", "http://localhost", nil) - - for header, phpHeader := range phpheaders.CommonRequestHeaders { - // verify that common and uncommon headers return the same result - expectedPHPHeader := phpheaders.GetUnCommonHeader(header) - assert.Equal(t, phpHeader+"\x00", expectedPHPHeader, "header is not well formed: "+phpHeader) - - // net/http will capitalize lowercase headers, verify that headers are capitalized - fakeRequest.Header.Add(header, "foo") - assert.Contains(t, fakeRequest.Header, header, "header is not correctly capitalized: "+header) - } -} - func TestCorrectThreadCalculation(t *testing.T) { maxProcs := runtime.GOMAXPROCS(0) * 2 oneWorkerThread := []workerOpt{{num: 1}} From 1436802e7f49f04639e73a11dbca091c0dfb8c6d Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Thu, 6 Nov 2025 22:39:28 +0100 Subject: [PATCH 06/20] tests with -vet=off --- .github/workflows/docker.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/docker.yaml b/.github/workflows/docker.yaml index 97bed1630a..a755bb9e55 100644 --- a/.github/workflows/docker.yaml +++ b/.github/workflows/docker.yaml @@ -213,7 +213,7 @@ jobs: run: | docker run --platform="${PLATFORM}" --rm \ "$(jq -r ".\"builder-${VARIANT}\".\"containerimage.config.digest\"" <<< "${METADATA}")" \ - sh -c "./go.sh test ${RACE} -v $(./go.sh list ./... | grep -v github.com/dunglas/frankenphp/internal/testext | grep -v github.com/dunglas/frankenphp/internal/extgen | tr '\n' ' ') && cd caddy && ../go.sh test ${RACE} -v ./..." + sh -c "./go.sh test ${RACE} -v -vet=off $(./go.sh list ./... | grep -v github.com/dunglas/frankenphp/internal/testext | grep -v github.com/dunglas/frankenphp/internal/extgen | tr '\n' ' ') && cd caddy && ../go.sh test ${RACE} -v ./..." env: METADATA: ${{ steps.build.outputs.metadata }} PLATFORM: ${{ matrix.platform }} From 7c79d7a463946560dc3a96db69a9b25e808fccb6 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Thu, 6 Nov 2025 22:51:41 +0100 Subject: [PATCH 07/20] tests with ./go.sh vet before. --- .github/workflows/docker.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/docker.yaml b/.github/workflows/docker.yaml index a755bb9e55..b32e45f5b1 100644 --- a/.github/workflows/docker.yaml +++ b/.github/workflows/docker.yaml @@ -213,7 +213,7 @@ jobs: run: | docker run --platform="${PLATFORM}" --rm \ "$(jq -r ".\"builder-${VARIANT}\".\"containerimage.config.digest\"" <<< "${METADATA}")" \ - sh -c "./go.sh test ${RACE} -v -vet=off $(./go.sh list ./... | grep -v github.com/dunglas/frankenphp/internal/testext | grep -v github.com/dunglas/frankenphp/internal/extgen | tr '\n' ' ') && cd caddy && ../go.sh test ${RACE} -v ./..." + sh -c "./go.sh vet ./... && ./go.sh test ${RACE} -v $(./go.sh list ./... | grep -v github.com/dunglas/frankenphp/internal/testext | grep -v github.com/dunglas/frankenphp/internal/extgen | tr '\n' ' ') && cd caddy && ../go.sh test ${RACE} -v ./..." env: METADATA: ${{ steps.build.outputs.metadata }} PLATFORM: ${{ matrix.platform }} From 87123bd4095cadfb6c4fd40c9ff9e115298790ed Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Mon, 10 Nov 2025 19:26:45 +0100 Subject: [PATCH 08/20] import C test. --- .github/workflows/docker.yaml | 2 +- internal/backoff/backoff.go | 1 + internal/phpheaders/phpheaders.go | 1 + internal/state/state.go | 1 + 4 files changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/docker.yaml b/.github/workflows/docker.yaml index b32e45f5b1..97bed1630a 100644 --- a/.github/workflows/docker.yaml +++ b/.github/workflows/docker.yaml @@ -213,7 +213,7 @@ jobs: run: | docker run --platform="${PLATFORM}" --rm \ "$(jq -r ".\"builder-${VARIANT}\".\"containerimage.config.digest\"" <<< "${METADATA}")" \ - sh -c "./go.sh vet ./... && ./go.sh test ${RACE} -v $(./go.sh list ./... | grep -v github.com/dunglas/frankenphp/internal/testext | grep -v github.com/dunglas/frankenphp/internal/extgen | tr '\n' ' ') && cd caddy && ../go.sh test ${RACE} -v ./..." + sh -c "./go.sh test ${RACE} -v $(./go.sh list ./... | grep -v github.com/dunglas/frankenphp/internal/testext | grep -v github.com/dunglas/frankenphp/internal/extgen | tr '\n' ' ') && cd caddy && ../go.sh test ${RACE} -v ./..." env: METADATA: ${{ steps.build.outputs.metadata }} PLATFORM: ${{ matrix.platform }} diff --git a/internal/backoff/backoff.go b/internal/backoff/backoff.go index 555eea99c2..73182f4a37 100644 --- a/internal/backoff/backoff.go +++ b/internal/backoff/backoff.go @@ -1,5 +1,6 @@ package backoff +import "C" import ( "sync" "time" diff --git a/internal/phpheaders/phpheaders.go b/internal/phpheaders/phpheaders.go index 64bc3a0832..5d7709baad 100644 --- a/internal/phpheaders/phpheaders.go +++ b/internal/phpheaders/phpheaders.go @@ -1,5 +1,6 @@ package phpheaders +import "C" import ( "strings" diff --git a/internal/state/state.go b/internal/state/state.go index 33b365a672..5c6d5dadc8 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -1,5 +1,6 @@ package state +import "C" import ( "slices" "sync" From 4161623736b02759f5a1867bc465a400e284dced Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Tue, 11 Nov 2025 21:07:50 +0100 Subject: [PATCH 09/20] adds turns state into string --- internal/state/state.go | 60 ++++++++++++++++------------------------- 1 file changed, 23 insertions(+), 37 deletions(-) diff --git a/internal/state/state.go b/internal/state/state.go index 5c6d5dadc8..52c2d0b156 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -7,46 +7,32 @@ import ( "time" ) -type StateID uint8 +type State string const ( // livecycle States of a thread - Reserved StateID = iota - Booting - BootRequested - ShuttingDown - Done + Reserved State = "reserved" + Booting State = "booting" + BootRequested State = "boot requested" + ShuttingDown State = "shutting down" + Done State = "done" // these States are 'stable' and safe to transition from at any time - Inactive - Ready + Inactive State = "inactive" + Ready State = "ready" // States necessary for restarting workers - Restarting - Yielding + Restarting State = "restarting" + Yielding State = "yielding" // States necessary for transitioning between different handlers - TransitionRequested - TransitionInProgress - TransitionComplete + TransitionRequested State = "transition requested" + TransitionInProgress State = "transition in progress" + TransitionComplete State = "transition complete" ) -var stateNames = map[StateID]string{ - Reserved: "reserved", - Booting: "booting", - Inactive: "inactive", - Ready: "ready", - ShuttingDown: "shutting down", - Done: "done", - Restarting: "restarting", - Yielding: "yielding", - TransitionRequested: "transition requested", - TransitionInProgress: "transition in progress", - TransitionComplete: "transition complete", -} - type ThreadState struct { - currentState StateID + currentState State mu sync.RWMutex subscribers []stateSubscriber // how long threads have been waiting in stable states @@ -55,7 +41,7 @@ type ThreadState struct { } type stateSubscriber struct { - states []StateID + states []State ch chan struct{} } @@ -67,7 +53,7 @@ func NewThreadState() *ThreadState { } } -func (ts *ThreadState) Is(state StateID) bool { +func (ts *ThreadState) Is(state State) bool { ts.mu.RLock() ok := ts.currentState == state ts.mu.RUnlock() @@ -75,7 +61,7 @@ func (ts *ThreadState) Is(state StateID) bool { return ok } -func (ts *ThreadState) CompareAndSwap(compareTo StateID, swapTo StateID) bool { +func (ts *ThreadState) CompareAndSwap(compareTo State, swapTo State) bool { ts.mu.Lock() ok := ts.currentState == compareTo if ok { @@ -88,10 +74,10 @@ func (ts *ThreadState) CompareAndSwap(compareTo StateID, swapTo StateID) bool { } func (ts *ThreadState) Name() string { - return stateNames[ts.Get()] + return string(ts.Get()) } -func (ts *ThreadState) Get() StateID { +func (ts *ThreadState) Get() State { ts.mu.RLock() id := ts.currentState ts.mu.RUnlock() @@ -99,14 +85,14 @@ func (ts *ThreadState) Get() StateID { return id } -func (ts *ThreadState) Set(nextState StateID) { +func (ts *ThreadState) Set(nextState State) { ts.mu.Lock() ts.currentState = nextState ts.notifySubscribers(nextState) ts.mu.Unlock() } -func (ts *ThreadState) notifySubscribers(nextState StateID) { +func (ts *ThreadState) notifySubscribers(nextState State) { if len(ts.subscribers) == 0 { return } @@ -123,7 +109,7 @@ func (ts *ThreadState) notifySubscribers(nextState StateID) { } // block until the thread reaches a certain state -func (ts *ThreadState) WaitFor(states ...StateID) { +func (ts *ThreadState) WaitFor(states ...State) { ts.mu.Lock() if slices.Contains(states, ts.currentState) { ts.mu.Unlock() @@ -139,7 +125,7 @@ func (ts *ThreadState) WaitFor(states ...StateID) { } // safely request a state change from a different goroutine -func (ts *ThreadState) RequestSafeStateChange(nextState StateID) bool { +func (ts *ThreadState) RequestSafeStateChange(nextState State) bool { ts.mu.Lock() switch ts.currentState { // disallow state changes if shutting down or done From a36547bc2ffb620ec6cc43f4c6ff4a4e276ef858 Mon Sep 17 00:00:00 2001 From: Alexander Stecher <45872305+AlliBalliBaba@users.noreply.github.com> Date: Thu, 13 Nov 2025 23:38:54 +0100 Subject: [PATCH 10/20] suggestion: simplify exponential backoff (#1970) * removes backoff. * Adjusts comment. * Suggestions by @dunglas * Removes 'max_consecutive_failures' * Removes 'max_consecutive_failures' * Adjusts warning. * Disables the logger in tests. * Revert "Adjusts warning." This reverts commit e93a6a930129e938d076fc5176a283f6b4b45852. * Revert "Removes 'max_consecutive_failures'" This reverts commit ba28ea0e4ada8639095bac8273961edf4c3b4cb2. * Revert "Removes 'max_consecutive_failures'" This reverts commit 32e649caf7a0f0b987cac3ffd37f6855497355d7. * Only fails on max failures again. * Restores failure timings. --- frankenphp_test.go | 10 +++--- internal/backoff/backoff.go | 59 -------------------------------- internal/backoff/backoff_test.go | 41 ---------------------- phpmainthread_test.go | 13 ++++--- testdata/failing-worker.php | 17 ++------- threadworker.go | 41 +++++++++++++--------- worker.go | 25 ++++++++++---- 7 files changed, 57 insertions(+), 149 deletions(-) delete mode 100644 internal/backoff/backoff.go delete mode 100644 internal/backoff/backoff_test.go diff --git a/frankenphp_test.go b/frankenphp_test.go index 7b4b44dbce..f7e0a171cf 100644 --- a/frankenphp_test.go +++ b/frankenphp_test.go @@ -601,10 +601,12 @@ func testRequestHeaders(t *testing.T, opts *testOptions) { } func TestFailingWorker(t *testing.T) { - runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, i int) { - body, _ := testGet("http://example.com/failing-worker.php", handler, t) - assert.Contains(t, body, "ok") - }, &testOptions{workerScript: "failing-worker.php"}) + err := frankenphp.Init( + frankenphp.WithLogger(slog.New(slog.NewTextHandler(io.Discard, nil))), + frankenphp.WithWorkers("failing worker", "testdata/failing-worker.php", 4, frankenphp.WithWorkerMaxFailures(1)), + frankenphp.WithNumThreads(5), + ) + assert.Error(t, err, "should return an immediate error if workers fail on startup") } func TestEnv(t *testing.T) { diff --git a/internal/backoff/backoff.go b/internal/backoff/backoff.go deleted file mode 100644 index 73182f4a37..0000000000 --- a/internal/backoff/backoff.go +++ /dev/null @@ -1,59 +0,0 @@ -package backoff - -import "C" -import ( - "sync" - "time" -) - -type ExponentialBackoff struct { - backoff time.Duration - failureCount int - mu sync.RWMutex - MaxBackoff time.Duration - MinBackoff time.Duration - MaxConsecutiveFailures int -} - -// recordSuccess resets the backoff and failureCount -func (e *ExponentialBackoff) RecordSuccess() { - e.mu.Lock() - e.failureCount = 0 - e.backoff = e.MinBackoff - e.mu.Unlock() -} - -// recordFailure increments the failure count and increases the backoff, it returns true if MaxConsecutiveFailures has been reached -func (e *ExponentialBackoff) RecordFailure() bool { - e.mu.Lock() - e.failureCount += 1 - if e.backoff < e.MinBackoff { - e.backoff = e.MinBackoff - } - - e.backoff = min(e.backoff*2, e.MaxBackoff) - - e.mu.Unlock() - return e.MaxConsecutiveFailures != -1 && e.failureCount >= e.MaxConsecutiveFailures -} - -// wait sleeps for the backoff duration if failureCount is non-zero. -// NOTE: this is not tested and should be kept 'obviously correct' (i.e., simple) -func (e *ExponentialBackoff) Wait() { - e.mu.RLock() - if e.failureCount == 0 { - e.mu.RUnlock() - - return - } - e.mu.RUnlock() - - time.Sleep(e.backoff) -} - -func (e *ExponentialBackoff) FailureCount() int { - e.mu.RLock() - defer e.mu.RUnlock() - - return e.failureCount -} diff --git a/internal/backoff/backoff_test.go b/internal/backoff/backoff_test.go deleted file mode 100644 index b82efc4bd4..0000000000 --- a/internal/backoff/backoff_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package backoff - -import ( - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -func TestExponentialBackoff_Reset(t *testing.T) { - e := &ExponentialBackoff{ - MaxBackoff: 5 * time.Second, - MinBackoff: 500 * time.Millisecond, - MaxConsecutiveFailures: 3, - } - - assert.False(t, e.RecordFailure()) - assert.False(t, e.RecordFailure()) - e.RecordSuccess() - - e.mu.RLock() - defer e.mu.RUnlock() - assert.Equal(t, 0, e.failureCount, "expected failureCount to be reset to 0") - assert.Equal(t, e.backoff, e.MinBackoff, "expected backoff to be reset to MinBackoff") -} - -func TestExponentialBackoff_Trigger(t *testing.T) { - e := &ExponentialBackoff{ - MaxBackoff: 500 * 3 * time.Millisecond, - MinBackoff: 500 * time.Millisecond, - MaxConsecutiveFailures: 3, - } - - assert.False(t, e.RecordFailure()) - assert.False(t, e.RecordFailure()) - assert.True(t, e.RecordFailure()) - - e.mu.RLock() - defer e.mu.RUnlock() - assert.Equal(t, e.failureCount, e.MaxConsecutiveFailures, "expected failureCount to be MaxConsecutiveFailures") - assert.Equal(t, e.backoff, e.MaxBackoff, "expected backoff to be MaxBackoff") -} diff --git a/phpmainthread_test.go b/phpmainthread_test.go index 81d94c2d35..e6cafe0647 100644 --- a/phpmainthread_test.go +++ b/phpmainthread_test.go @@ -175,9 +175,9 @@ func TestFinishBootingAWorkerScript(t *testing.T) { func TestReturnAnErrorIf2WorkersHaveTheSameFileName(t *testing.T) { workers = []*worker{} - w, err1 := newWorker(workerOpt{fileName: "filename.php", maxConsecutiveFailures: defaultMaxConsecutiveFailures}) + w, err1 := newWorker(workerOpt{fileName: "filename.php"}) workers = append(workers, w) - _, err2 := newWorker(workerOpt{fileName: "filename.php", maxConsecutiveFailures: defaultMaxConsecutiveFailures}) + _, err2 := newWorker(workerOpt{fileName: "filename.php"}) assert.NoError(t, err1) assert.Error(t, err2, "two workers cannot have the same filename") @@ -185,9 +185,9 @@ func TestReturnAnErrorIf2WorkersHaveTheSameFileName(t *testing.T) { func TestReturnAnErrorIf2ModuleWorkersHaveTheSameName(t *testing.T) { workers = []*worker{} - w, err1 := newWorker(workerOpt{fileName: "filename.php", name: "workername", maxConsecutiveFailures: defaultMaxConsecutiveFailures}) + w, err1 := newWorker(workerOpt{fileName: "filename.php", name: "workername"}) workers = append(workers, w) - _, err2 := newWorker(workerOpt{fileName: "filename2.php", name: "workername", maxConsecutiveFailures: defaultMaxConsecutiveFailures}) + _, err2 := newWorker(workerOpt{fileName: "filename2.php", name: "workername"}) assert.NoError(t, err1) assert.Error(t, err2, "two workers cannot have the same name") @@ -198,9 +198,8 @@ func getDummyWorker(fileName string) *worker { workers = []*worker{} } worker, _ := newWorker(workerOpt{ - fileName: testDataPath + "/" + fileName, - num: 1, - maxConsecutiveFailures: defaultMaxConsecutiveFailures, + fileName: testDataPath + "/" + fileName, + num: 1, }) workers = append(workers, worker) return worker diff --git a/testdata/failing-worker.php b/testdata/failing-worker.php index 108d2ff865..0bb001f1ed 100644 --- a/testdata/failing-worker.php +++ b/testdata/failing-worker.php @@ -1,18 +1,7 @@ = 0 && startupFailChan != nil && !watcherIsEnabled && handler.failureCount >= worker.maxConsecutiveFailures { + select { + case startupFailChan <- fmt.Errorf("worker failure: script %s has not reached frankenphp_handle_request()", worker.fileName): + handler.thread.state.Set(state.ShuttingDown) + return } - logger.LogAttrs(ctx, slog.LevelWarn, "many consecutive worker failures", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("failures", handler.backoff.FailureCount())) } + + if watcherIsEnabled { + // worker script has probably failed due to script changes while watcher is enabled + logger.LogAttrs(ctx, slog.LevelWarn, "(watcher enabled) worker script has not reached frankenphp_handle_request()", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex)) + } else { + // rare case where worker script has failed on a restart during normal operation + // this can happen if startup success depends on external resources + logger.LogAttrs(ctx, slog.LevelWarn, "worker script has failed on restart", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex)) + } + + // wait a bit and try again (exponential backoff) + backoffDuration := time.Duration(handler.failureCount*handler.failureCount*100) * time.Millisecond + if backoffDuration > time.Second { + backoffDuration = time.Second + } + handler.failureCount++ + time.Sleep(backoffDuration) } // waitForWorkerRequest is called during frankenphp_handle_request in the php worker script. @@ -171,6 +177,7 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) { // Clear the first dummy request created to initialize the worker if handler.isBootingScript { handler.isBootingScript = false + handler.failureCount = 0 if !C.frankenphp_shutdown_dummy_request() { panic("Not in CGI context") } diff --git a/worker.go b/worker.go index 96ba45e338..13e49111c2 100644 --- a/worker.go +++ b/worker.go @@ -31,41 +31,52 @@ type worker struct { var ( workers []*worker watcherIsEnabled bool + startupFailChan chan (error) ) func initWorkers(opt []workerOpt) error { workers = make([]*worker, 0, len(opt)) - workersReady := sync.WaitGroup{} directoriesToWatch := getDirectoriesToWatch(opt) watcherIsEnabled = len(directoriesToWatch) > 0 + totalThreadsToStart := 0 for _, o := range opt { w, err := newWorker(o) if err != nil { return err } + totalThreadsToStart += w.num workers = append(workers, w) } + startupFailChan = make(chan error, totalThreadsToStart) + var workersReady sync.WaitGroup for _, w := range workers { - workersReady.Add(w.num) for i := 0; i < w.num; i++ { thread := getInactivePHPThread() convertToWorkerThread(thread, w) - go func() { - thread.state.WaitFor(state.Ready) - workersReady.Done() - }() + workersReady.Go(func() { + thread.state.WaitFor(state.Ready, state.ShuttingDown, state.Done) + }) } } workersReady.Wait() + select { + case err := <-startupFailChan: + // at least 1 worker has failed, shut down and return an error + Shutdown() + return fmt.Errorf("failed to initialize workers: %w", err) + default: + // all workers started successfully + startupFailChan = nil + } + if !watcherIsEnabled { return nil } - watcherIsEnabled = true if err := watcher.InitWatcher(directoriesToWatch, RestartWorkers, logger); err != nil { return err } From 42b2ffacf40b4865c6288f6323480a9f45fbd6e5 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Thu, 13 Nov 2025 23:45:40 +0100 Subject: [PATCH 11/20] changes log to the documented version. --- threadworker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/threadworker.go b/threadworker.go index 1fba6f2402..f4ac1e1580 100644 --- a/threadworker.go +++ b/threadworker.go @@ -142,7 +142,7 @@ func tearDownWorkerScript(handler *workerThread, exitStatus int) { if worker.maxConsecutiveFailures >= 0 && startupFailChan != nil && !watcherIsEnabled && handler.failureCount >= worker.maxConsecutiveFailures { select { - case startupFailChan <- fmt.Errorf("worker failure: script %s has not reached frankenphp_handle_request()", worker.fileName): + case startupFailChan <- fmt.Errorf("too many consecutive failures: worker %s has not reached frankenphp_handle_request()", worker.fileName): handler.thread.state.Set(state.ShuttingDown) return } From 26e1408f54043c24ae0f50c42fcb3e4214c8f2f4 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Fri, 14 Nov 2025 22:06:04 +0100 Subject: [PATCH 12/20] go linting. --- threadworker.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/threadworker.go b/threadworker.go index f4ac1e1580..4cf64351cb 100644 --- a/threadworker.go +++ b/threadworker.go @@ -141,11 +141,9 @@ func tearDownWorkerScript(handler *workerThread, exitStatus int) { } if worker.maxConsecutiveFailures >= 0 && startupFailChan != nil && !watcherIsEnabled && handler.failureCount >= worker.maxConsecutiveFailures { - select { - case startupFailChan <- fmt.Errorf("too many consecutive failures: worker %s has not reached frankenphp_handle_request()", worker.fileName): - handler.thread.state.Set(state.ShuttingDown) - return - } + startupFailChan <- fmt.Errorf("too many consecutive failures: worker %s has not reached frankenphp_handle_request()", worker.fileName) + handler.thread.state.Set(state.ShuttingDown) + return } if watcherIsEnabled { From bcd482a643f7f6b3db254cd97f614886973470b1 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Fri, 21 Nov 2025 23:13:15 +0100 Subject: [PATCH 13/20] Resolve merge conflicts. --- phpthread.go | 4 ++-- threadregular.go | 4 ++-- threadworker.go | 7 ++++--- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/phpthread.go b/phpthread.go index f701a4a451..1726cf9d18 100644 --- a/phpthread.go +++ b/phpthread.go @@ -46,7 +46,7 @@ func newPHPThread(threadIndex int) *phpThread { func (thread *phpThread) boot() { // thread must be in reserved state to boot if !thread.state.CompareAndSwap(state.Reserved, state.Booting) && !thread.state.CompareAndSwap(state.BootRequested, state.Booting) { - panic("thread is not in reserved state: " + thread.state.name()) + panic("thread is not in reserved state: " + thread.state.Name()) } // boot threads as inactive @@ -84,7 +84,7 @@ func (thread *phpThread) shutdown() { func (thread *phpThread) setHandler(handler threadHandler) { thread.handlerMu.Lock() defer thread.handlerMu.Unlock() - if !thread.state.requestSafeStateChange(stateTransitionRequested) { + if !thread.state.RequestSafeStateChange(state.TransitionRequested) { // no state change allowed == shutdown or done return } diff --git a/threadregular.go b/threadregular.go index 09889f5d11..e65e0302aa 100644 --- a/threadregular.go +++ b/threadregular.go @@ -13,7 +13,7 @@ import ( type regularThread struct { contextHolder - state *state.ThreadState + state *state.ThreadState thread *phpThread } @@ -53,7 +53,7 @@ func (handler *regularThread) beforeScriptExecution() string { return "" } - panic("unexpected state: " + handler.state.name()) + panic("unexpected state: " + handler.state.Name()) } func (handler *regularThread) afterScriptExecution(_ int) { diff --git a/threadworker.go b/threadworker.go index 2a182adb10..d6f119a353 100644 --- a/threadworker.go +++ b/threadworker.go @@ -4,6 +4,7 @@ package frankenphp import "C" import ( "context" + "fmt" "log/slog" "path/filepath" "time" @@ -16,7 +17,7 @@ import ( // executes the PHP worker script in a loop // implements the threadHandler interface type workerThread struct { - state *state.ThreadState + state *state.ThreadState thread *phpThread worker *worker dummyFrankenPHPContext *frankenPHPContext @@ -24,7 +25,7 @@ type workerThread struct { workerFrankenPHPContext *frankenPHPContext workerContext context.Context isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet - failureCount int // number of consecutive startup failures + failureCount int // number of consecutive startup failures } func convertToWorkerThread(thread *phpThread, worker *worker) { @@ -178,7 +179,7 @@ func tearDownWorkerScript(handler *workerThread, exitStatus int) { // rare case where worker script has failed on a restart during normal operation // this can happen if startup success depends on external resources if globalLogger.Enabled(globalCtx, slog.LevelWarn) { - globalLogger.LogAttrs(globalCtx, slog.LevelWarn, "worker script has failed on restart", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("failures", handler.failureCount)) + globalLogger.LogAttrs(globalCtx, slog.LevelWarn, "worker script has failed on restart", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("failures", handler.failureCount)) } } From 083ef0e669d75c8ab9ffa2d7b1751e6cbafebaad Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Mon, 1 Dec 2025 22:22:08 +0100 Subject: [PATCH 14/20] test --- caddy/workerconfig.go | 16 +++++++++++++++- options.go | 12 ++++++++++++ worker.go | 2 ++ 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/caddy/workerconfig.go b/caddy/workerconfig.go index baf9ad86b2..5de7bfdc2e 100644 --- a/caddy/workerconfig.go +++ b/caddy/workerconfig.go @@ -38,12 +38,14 @@ type workerConfig struct { MatchPath []string `json:"match_path,omitempty"` // MaxConsecutiveFailures sets the maximum number of consecutive failures before panicking (defaults to 6, set to -1 to never panick) MaxConsecutiveFailures int `json:"max_consecutive_failures,omitempty"` + // IsHTTPWorker specifies if the worker handles HTTP requests + IsHTTPWorker bool `json:"http,omitempty"` requestOptions []frankenphp.RequestOption } func parseWorkerConfig(d *caddyfile.Dispenser) (workerConfig, error) { - wc := workerConfig{} + wc := workerConfig{IsHTTPWorker:true} if d.NextArg() { wc.FileName = d.Val() } @@ -116,6 +118,18 @@ func parseWorkerConfig(d *caddyfile.Dispenser) (workerConfig, error) { } else { wc.Watch = append(wc.Watch, d.Val()) } + case "http": + if !d.NextArg() { + wc.HTTP = true + } + v, err := strconv.ParseBool(d.Val()) + if err != nil { + return err + } + if d.NextArg() { + return d.ArgErr() + } + wc.HTTP = v case "match": // provision the path so it's identical to Caddy match rules // see: https://github.com/caddyserver/caddy/blob/master/modules/caddyhttp/matchers.go diff --git a/options.go b/options.go index b9751ad8b2..1eefcec5b9 100644 --- a/options.go +++ b/options.go @@ -44,6 +44,7 @@ type workerOpt struct { onThreadShutdown func(int) onServerStartup func() onServerShutdown func() + isHTTP bool } // WithContext sets the main context to use. @@ -90,6 +91,7 @@ func WithWorkers(name, fileName string, num int, options ...WorkerOption) Option env: PrepareEnv(nil), watch: []string{}, maxConsecutiveFailures: defaultMaxConsecutiveFailures, + isHTTP: true } for _, option := range options { @@ -234,6 +236,16 @@ func WithWorkerOnServerShutdown(f func()) WorkerOption { } } +// AsHTTPWorker determines if the worker will handle HTTP requests (true by default). +func AsHTTPWorker(isHTTP bool) WorkerOption { + return func(w *workerOpt) error { + w.isHTTP = isHTTP + + return nil + } +} + + func withExtensionWorkers(w *extensionWorkers) WorkerOption { return func(wo *workerOpt) error { wo.extensionWorkers = w diff --git a/worker.go b/worker.go index 5b83f67582..8ac5a69bfc 100644 --- a/worker.go +++ b/worker.go @@ -31,6 +31,7 @@ type worker struct { onThreadReady func(int) onThreadShutdown func(int) queuedRequests atomic.Int32 + isHTTP bool } var ( @@ -140,6 +141,7 @@ func newWorker(o workerOpt) (*worker, error) { maxConsecutiveFailures: o.maxConsecutiveFailures, onThreadReady: o.onThreadReady, onThreadShutdown: o.onThreadShutdown, + isHTTP o.isHTTP } w.requestOptions = append( From 4969a4bdb62e5dccb57c34d1e8d0297390fd5962 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Mon, 1 Dec 2025 22:49:43 +0100 Subject: [PATCH 15/20] inverts configuration logic. --- caddy/app.go | 2 ++ caddy/workerconfig.go | 21 +++++++-------------- frankenphp.c | 10 +++++++++- frankenphp.go | 4 ++++ frankenphp.h | 2 +- options.go | 8 +++----- phpthread.go | 4 ++-- threadregular.go | 2 +- threadworker.go | 2 +- worker.go | 4 ++-- 10 files changed, 32 insertions(+), 27 deletions(-) diff --git a/caddy/app.go b/caddy/app.go index e9c31c9fe4..cd32859c95 100644 --- a/caddy/app.go +++ b/caddy/app.go @@ -158,6 +158,7 @@ func (f *FrankenPHPApp) Start() error { frankenphp.WithWorkerWatchMode(w.Watch), frankenphp.WithWorkerMaxFailures(w.MaxConsecutiveFailures), frankenphp.WithWorkerMaxThreads(w.MaxThreads), + frankenphp.WithWorkerHTTPDisabled(w.DisableHTTP), ) } else { workerOpts = append( @@ -167,6 +168,7 @@ func (f *FrankenPHPApp) Start() error { frankenphp.WithWorkerMaxFailures(w.MaxConsecutiveFailures), frankenphp.WithWorkerMaxThreads(w.MaxThreads), frankenphp.WithWorkerRequestOptions(w.requestOptions...), + frankenphp.WithWorkerHTTPDisabled(w.DisableHTTP), ) } diff --git a/caddy/workerconfig.go b/caddy/workerconfig.go index 5de7bfdc2e..116a64b20d 100644 --- a/caddy/workerconfig.go +++ b/caddy/workerconfig.go @@ -38,14 +38,14 @@ type workerConfig struct { MatchPath []string `json:"match_path,omitempty"` // MaxConsecutiveFailures sets the maximum number of consecutive failures before panicking (defaults to 6, set to -1 to never panick) MaxConsecutiveFailures int `json:"max_consecutive_failures,omitempty"` - // IsHTTPWorker specifies if the worker handles HTTP requests - IsHTTPWorker bool `json:"http,omitempty"` + // DisableHTTP specifies if the worker handles HTTP requests + DisableHTTP bool `json:"http_disabled,omitempty"` requestOptions []frankenphp.RequestOption } func parseWorkerConfig(d *caddyfile.Dispenser) (workerConfig, error) { - wc := workerConfig{IsHTTPWorker:true} + wc := workerConfig{} if d.NextArg() { wc.FileName = d.Val() } @@ -118,18 +118,11 @@ func parseWorkerConfig(d *caddyfile.Dispenser) (workerConfig, error) { } else { wc.Watch = append(wc.Watch, d.Val()) } - case "http": - if !d.NextArg() { - wc.HTTP = true + case "http_disabled": + if d.NextArg() { + return wc, d.ArgErr() } - v, err := strconv.ParseBool(d.Val()) - if err != nil { - return err - } - if d.NextArg() { - return d.ArgErr() - } - wc.HTTP = v + wc.DisableHTTP = true case "match": // provision the path so it's identical to Caddy match rules // see: https://github.com/caddyserver/caddy/blob/master/modules/caddyhttp/matchers.go diff --git a/frankenphp.c b/frankenphp.c index 04782a9b65..cec54824f6 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -72,10 +72,12 @@ frankenphp_config frankenphp_get_config() { bool should_filter_var = 0; __thread uintptr_t thread_index; __thread bool is_worker_thread = false; +__thread bool is_http_thread = true; __thread zval *os_environment = NULL; -void frankenphp_update_local_thread_context(bool is_worker) { +void frankenphp_update_local_thread_context(bool is_worker, bool httpEnabled) { is_worker_thread = is_worker; + is_http_thread = httpEnabled; } static void frankenphp_update_request_context() { @@ -168,6 +170,9 @@ static void frankenphp_release_temporary_streams() { /* Adapted from php_request_shutdown */ static void frankenphp_worker_request_shutdown() { + if (!is_http_thread){ + return; + } /* Flush all output buffers */ zend_try { php_output_end_all(); } zend_end_try(); @@ -212,6 +217,9 @@ PHPAPI void get_full_env(zval *track_vars_array) { /* Adapted from php_request_startup() */ static int frankenphp_worker_request_startup() { int retval = SUCCESS; + if (!is_http_thread){ + return retval; + } frankenphp_update_request_context(); diff --git a/frankenphp.go b/frankenphp.go index d71f25a0b2..8717ef6675 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -48,6 +48,7 @@ var ( ErrMainThreadCreation = errors.New("error creating the main thread") ErrScriptExecution = errors.New("error during PHP script execution") ErrNotRunning = errors.New("FrankenPHP is not running. For proper configuration visit: https://frankenphp.dev/docs/config/#caddyfile-config") + ErrNotHTTPWorker = errors.New("worker is not an HTTP worker") ErrInvalidRequestPath = ErrRejected{"invalid request path", http.StatusBadRequest} ErrInvalidContentLengthHeader = ErrRejected{"invalid Content-Length header", http.StatusBadRequest} @@ -399,6 +400,9 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error // Detect if a worker is available to handle this request if fc.worker != nil { + if !fc.worker.httpEnabled { + return ErrNotHTTPWorker + } return fc.worker.handleRequest(ch) } diff --git a/frankenphp.h b/frankenphp.h index efbd5fc48f..b0e5f8c857 100644 --- a/frankenphp.h +++ b/frankenphp.h @@ -45,7 +45,7 @@ bool frankenphp_new_php_thread(uintptr_t thread_index); bool frankenphp_shutdown_dummy_request(void); int frankenphp_execute_script(char *file_name); -void frankenphp_update_local_thread_context(bool is_worker); +void frankenphp_update_local_thread_context(bool is_worker, bool httpEnabled); int frankenphp_execute_script_cli(char *script, int argc, char **argv, bool eval); diff --git a/options.go b/options.go index 1eefcec5b9..58ca6dbf32 100644 --- a/options.go +++ b/options.go @@ -44,7 +44,7 @@ type workerOpt struct { onThreadShutdown func(int) onServerStartup func() onServerShutdown func() - isHTTP bool + httpDisabled bool } // WithContext sets the main context to use. @@ -91,7 +91,6 @@ func WithWorkers(name, fileName string, num int, options ...WorkerOption) Option env: PrepareEnv(nil), watch: []string{}, maxConsecutiveFailures: defaultMaxConsecutiveFailures, - isHTTP: true } for _, option := range options { @@ -237,15 +236,14 @@ func WithWorkerOnServerShutdown(f func()) WorkerOption { } // AsHTTPWorker determines if the worker will handle HTTP requests (true by default). -func AsHTTPWorker(isHTTP bool) WorkerOption { +func WithWorkerHTTPDisabled(isDisabled bool) WorkerOption { return func(w *workerOpt) error { - w.isHTTP = isHTTP + w.httpDisabled = isDisabled return nil } } - func withExtensionWorkers(w *extensionWorkers) WorkerOption { return func(wo *workerOpt) error { wo.extensionWorkers = w diff --git a/phpthread.go b/phpthread.go index 1726cf9d18..8377bf7a41 100644 --- a/phpthread.go +++ b/phpthread.go @@ -143,8 +143,8 @@ func (thread *phpThread) pinCString(s string) *C.char { return thread.pinString(s + "\x00") } -func (*phpThread) updateContext(isWorker bool) { - C.frankenphp_update_local_thread_context(C.bool(isWorker)) +func (*phpThread) updateContext(isWorker bool, httpEnabled bool) { + C.frankenphp_update_local_thread_context(C.bool(isWorker), C.bool(httpEnabled)) } //export go_frankenphp_before_script_execution diff --git a/threadregular.go b/threadregular.go index ad44c57170..7dd228bdc8 100644 --- a/threadregular.go +++ b/threadregular.go @@ -42,7 +42,7 @@ func (handler *regularThread) beforeScriptExecution() string { return handler.thread.transitionToNewHandler() case state.TransitionComplete: - handler.thread.updateContext(false) + handler.thread.updateContext(false, true) handler.state.Set(state.Ready) return handler.waitForRequest() diff --git a/threadworker.go b/threadworker.go index ae7e4545f2..fec81487d1 100644 --- a/threadworker.go +++ b/threadworker.go @@ -54,7 +54,7 @@ func (handler *workerThread) beforeScriptExecution() string { handler.state.WaitFor(state.Ready, state.ShuttingDown) return handler.beforeScriptExecution() case state.Ready, state.TransitionComplete: - handler.thread.updateContext(true) + handler.thread.updateContext(true, handler.worker.httpEnabled) if handler.worker.onThreadReady != nil { handler.worker.onThreadReady(handler.thread.threadIndex) } diff --git a/worker.go b/worker.go index c03fccd9b5..5412608d5c 100644 --- a/worker.go +++ b/worker.go @@ -32,7 +32,7 @@ type worker struct { onThreadReady func(int) onThreadShutdown func(int) queuedRequests atomic.Int32 - isHTTP bool + httpEnabled bool } var ( @@ -155,7 +155,7 @@ func newWorker(o workerOpt) (*worker, error) { maxConsecutiveFailures: o.maxConsecutiveFailures, onThreadReady: o.onThreadReady, onThreadShutdown: o.onThreadShutdown, - isHTTP o.isHTTP + httpEnabled: !o.httpDisabled, } w.requestOptions = append( From 75f658bc8e26e2eab343fcfce36ac20c997c1a25 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Mon, 1 Dec 2025 23:37:35 +0100 Subject: [PATCH 16/20] adds working implementation. --- caddy/workerconfig.go | 2 +- frankenphp.c | 19 +++++++++++++++ frankenphp.stub.php | 2 ++ frankenphp_arginfo.h | 7 ++++++ testdata/request-receiver.php | 7 ++++++ testdata/request-sender.php | 8 ++++++ threadworker.go | 6 +++++ worker.go | 46 +++++++++++++++++++++++++++++++++++ 8 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 testdata/request-receiver.php create mode 100644 testdata/request-sender.php diff --git a/caddy/workerconfig.go b/caddy/workerconfig.go index 116a64b20d..0a35145d8e 100644 --- a/caddy/workerconfig.go +++ b/caddy/workerconfig.go @@ -147,7 +147,7 @@ func parseWorkerConfig(d *caddyfile.Dispenser) (workerConfig, error) { wc.MaxConsecutiveFailures = v default: - return wc, wrongSubDirectiveError("worker", "name, file, num, env, watch, match, max_consecutive_failures, max_threads", v) + return wc, wrongSubDirectiveError("worker", "name, file, num, env, watch, match, max_consecutive_failures, max_threads, http_disabled", v) } } diff --git a/frankenphp.c b/frankenphp.c index cec54824f6..cdbd0f3bbf 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -494,6 +494,25 @@ PHP_FUNCTION(frankenphp_handle_request) { RETURN_TRUE; } +PHP_FUNCTION(frankenphp_send_request) { + zval *zv; + char *worker_name = NULL; + size_t worker_name_len = 0; + + ZEND_PARSE_PARAMETERS_START(1, 2); + Z_PARAM_ZVAL(zv); + Z_PARAM_OPTIONAL + Z_PARAM_STRING(worker_name, worker_name_len); + ZEND_PARSE_PARAMETERS_END(); + + char *error = go_frankenphp_send_request(thread_index, zv, worker_name, + worker_name_len); + if (error) { + zend_throw_exception(spl_ce_RuntimeException, error, 0); + RETURN_THROWS(); + } +} + PHP_FUNCTION(headers_send) { zend_long response_code = 200; diff --git a/frankenphp.stub.php b/frankenphp.stub.php index 60ac5d5885..c790c30a69 100644 --- a/frankenphp.stub.php +++ b/frankenphp.stub.php @@ -4,6 +4,8 @@ function frankenphp_handle_request(callable $callback): bool {} +function frankenphp_send_request(mixed $message, string $workerName = ""): bool {} + function headers_send(int $status = 200): int {} function frankenphp_finish_request(): bool {} diff --git a/frankenphp_arginfo.h b/frankenphp_arginfo.h index 558c6e3cf6..22a7d97888 100644 --- a/frankenphp_arginfo.h +++ b/frankenphp_arginfo.h @@ -5,6 +5,11 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_handle_request, 0, 1, ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0) ZEND_END_ARG_INFO() +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_send_request, 0, 1, IS_VOID, 0) + ZEND_ARG_TYPE_INFO(0, message, IS_MIXED, 0) + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, worker_name, IS_STRING, 0, "\"\"") +ZEND_END_ARG_INFO() + ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_headers_send, 0, 0, IS_LONG, 0) ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, status, IS_LONG, 0, "200") ZEND_END_ARG_INFO() @@ -37,6 +42,7 @@ ZEND_END_ARG_INFO() ZEND_FUNCTION(frankenphp_handle_request); +ZEND_FUNCTION(frankenphp_send_request); ZEND_FUNCTION(headers_send); ZEND_FUNCTION(frankenphp_finish_request); ZEND_FUNCTION(frankenphp_request_headers); @@ -46,6 +52,7 @@ ZEND_FUNCTION(mercure_publish); static const zend_function_entry ext_functions[] = { ZEND_FE(frankenphp_handle_request, arginfo_frankenphp_handle_request) + ZEND_FE(frankenphp_send_request, arginfo_frankenphp_send_request) ZEND_FE(headers_send, arginfo_headers_send) ZEND_FE(frankenphp_finish_request, arginfo_frankenphp_finish_request) ZEND_FALIAS(fastcgi_finish_request, frankenphp_finish_request, arginfo_fastcgi_finish_request) diff --git a/testdata/request-receiver.php b/testdata/request-receiver.php new file mode 100644 index 0000000000..cca381da9d --- /dev/null +++ b/testdata/request-receiver.php @@ -0,0 +1,7 @@ + Date: Tue, 2 Dec 2025 00:22:26 +0100 Subject: [PATCH 17/20] adds test --- frankenphp_test.go | 23 +++++++++++++++++++++++ threadworker.go | 5 ++--- worker.go | 5 ++++- 3 files changed, 29 insertions(+), 4 deletions(-) diff --git a/frankenphp_test.go b/frankenphp_test.go index 9328b27835..98757ec7b2 100644 --- a/frankenphp_test.go +++ b/frankenphp_test.go @@ -749,6 +749,29 @@ func TestExecuteCLICode(t *testing.T) { assert.Equal(t, stdoutStderrStr, `Hello World`) } +func TestFrankenSendRequest(t *testing.T) { + var buf bytes.Buffer + handler := slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug}) + logger := slog.New(handler) + cwd, _ := os.Getwd() + workerFile := cwd + "/testdata/request-receiver.php" + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/request-sender.php?message=hi-from-go", handler, t) + assert.Equal(t, "request sent", body) + }, &testOptions{ + logger: logger, + initOpts: []frankenphp.Option{frankenphp.WithWorkers( + "workerName", + workerFile, + 1, + frankenphp.WithWorkerHTTPDisabled(true), + )}, + }) + + assert.Contains(t, buf.String(), "hi-from-go") +} + func ExampleServeHTTP() { if err := frankenphp.Init(); err != nil { panic(err) diff --git a/threadworker.go b/threadworker.go index bb6d60e126..438fecf3ff 100644 --- a/threadworker.go +++ b/threadworker.go @@ -128,7 +128,6 @@ func setupWorkerScript(handler *workerThread, worker *worker) { // non-http worker: instantly mark as ready if !worker.httpEnabled { - handler.isBootingScript = false handler.thread.state.Set(state.Ready) } } @@ -160,7 +159,7 @@ func tearDownWorkerScript(handler *workerThread, exitStatus int) { // worker has thrown a fatal error or has not reached frankenphp_handle_request metrics.StopWorker(worker.name, StopReasonCrash) - if !handler.isBootingScript { + if !handler.isBootingScript || !worker.httpEnabled { // fatal error (could be due to exit(1), timeouts, etc.) if globalLogger.Enabled(globalCtx, slog.LevelDebug) { globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "restarting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("exit_status", exitStatus)) @@ -304,7 +303,7 @@ func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t, retval *C.zval thread.handler.(*workerThread).workerFrankenPHPContext = nil thread.handler.(*workerThread).workerContext = nil - if globalLogger.Enabled(ctx, slog.LevelDebug) { + if globalLogger.Enabled(ctx, slog.LevelDebug) && thread.handler.(*workerThread).worker.httpEnabled { if fc.request == nil { fc.logger.LogAttrs(ctx, slog.LevelDebug, "request handling finished", slog.String("worker", fc.worker.name), slog.Int("thread", thread.threadIndex)) } else { diff --git a/worker.go b/worker.go index add8d73a9b..15b6daf3c3 100644 --- a/worker.go +++ b/worker.go @@ -3,6 +3,7 @@ package frankenphp // #include "frankenphp.h" import "C" import ( + "context" "fmt" "log/slog" "os" @@ -355,8 +356,10 @@ func go_frankenphp_send_request(threadIndex C.uintptr_t, zv *C.zval, name *C.cha fc.responseWriter = nil fc.handlerParameters = message + ctx := context.WithValue(context.Background(), contextKey, fc) + go func() { - err := w.handleRequest(contextHolder{phpThreads[threadIndex].context(), fc}) + err := w.handleRequest(contextHolder{ctx, fc}) if err != nil && globalLogger.Enabled(globalCtx, slog.LevelError) { globalLogger.LogAttrs( globalCtx, From ad6bb653ec35654baceb99948a2e00897b76c601 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Tue, 2 Dec 2025 22:17:57 +0100 Subject: [PATCH 18/20] error cases and formatting. --- frankenphp.c | 4 ++-- threadworker.go | 6 ++++-- worker.go | 4 ++++ 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/frankenphp.c b/frankenphp.c index cdbd0f3bbf..9fd4c12d2f 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -170,7 +170,7 @@ static void frankenphp_release_temporary_streams() { /* Adapted from php_request_shutdown */ static void frankenphp_worker_request_shutdown() { - if (!is_http_thread){ + if (!is_http_thread) { return; } /* Flush all output buffers */ @@ -217,7 +217,7 @@ PHPAPI void get_full_env(zval *track_vars_array) { /* Adapted from php_request_startup() */ static int frankenphp_worker_request_startup() { int retval = SUCCESS; - if (!is_http_thread){ + if (!is_http_thread) { return retval; } diff --git a/threadworker.go b/threadworker.go index 438fecf3ff..09a64daed3 100644 --- a/threadworker.go +++ b/threadworker.go @@ -126,9 +126,11 @@ func setupWorkerScript(handler *workerThread, worker *worker) { globalLogger.LogAttrs(ctx, slog.LevelDebug, "starting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex)) } - // non-http worker: instantly mark as ready + // non-http worker: instantly gets marked as ready if !worker.httpEnabled { + metrics.ReadyWorker(handler.worker.name) handler.thread.state.Set(state.Ready) + handler.isBootingScript = false } } @@ -159,7 +161,7 @@ func tearDownWorkerScript(handler *workerThread, exitStatus int) { // worker has thrown a fatal error or has not reached frankenphp_handle_request metrics.StopWorker(worker.name, StopReasonCrash) - if !handler.isBootingScript || !worker.httpEnabled { + if !handler.isBootingScript { // fatal error (could be due to exit(1), timeouts, etc.) if globalLogger.Enabled(globalCtx, slog.LevelDebug) { globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "restarting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("exit_status", exitStatus)) diff --git a/worker.go b/worker.go index 15b6daf3c3..c5cba528ec 100644 --- a/worker.go +++ b/worker.go @@ -345,6 +345,10 @@ func go_frankenphp_send_request(threadIndex C.uintptr_t, zv *C.zval, name *C.cha return phpThreads[threadIndex].pinCString("No worker found to handle this task: " + C.GoStringN(name, C.int(nameLen))) } + if w.httpEnabled == nil { + return phpThreads[threadIndex].pinCString("Cannot call frankenphp_send_request() on a HTTP worker: " + C.GoStringN(name, C.int(nameLen))) + } + message, err := goValue[any](zv) if err != nil { return phpThreads[threadIndex].pinCString("Failed to convert frankenphp_send_request() argument: " + err.Error()) From 00d2a4963bfbd883e7cfcbcdc91c280ceca8bad7 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Tue, 2 Dec 2025 23:38:03 +0100 Subject: [PATCH 19/20] Fixes tests. --- worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker.go b/worker.go index c5cba528ec..1283281e10 100644 --- a/worker.go +++ b/worker.go @@ -345,7 +345,7 @@ func go_frankenphp_send_request(threadIndex C.uintptr_t, zv *C.zval, name *C.cha return phpThreads[threadIndex].pinCString("No worker found to handle this task: " + C.GoStringN(name, C.int(nameLen))) } - if w.httpEnabled == nil { + if w.httpEnabled { return phpThreads[threadIndex].pinCString("Cannot call frankenphp_send_request() on a HTTP worker: " + C.GoStringN(name, C.int(nameLen))) } From c4aedb8b85c305e88d815e748800b1f5c2614ada Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Wed, 3 Dec 2025 00:05:39 +0100 Subject: [PATCH 20/20] Solves race condition. --- testdata/request-sender.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/testdata/request-sender.php b/testdata/request-sender.php index 3b81594ff0..427e24d279 100644 --- a/testdata/request-sender.php +++ b/testdata/request-sender.php @@ -5,4 +5,8 @@ frankenphp_send_request($message, $workerName); +// sleep to make sure request was received +// TODO: solve this test-restart race condition with Futures instead? +usleep(10_000); + echo "request sent";