Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions frankenphp.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ var (
ErrScriptExecution = errors.New("error during PHP script execution")
ErrNotRunning = errors.New("FrankenPHP is not running. For proper configuration visit: https://frankenphp.dev/docs/config/#caddyfile-config")

isRunning bool
isRunning bool
onServerShutdown []func()

loggerMu sync.RWMutex
logger *slog.Logger
Expand Down Expand Up @@ -216,7 +217,7 @@ func Init(options ...Option) error {

// add registered external workers
for _, ew := range extensionWorkers {
options = append(options, WithWorkers(ew.Name(), ew.FileName(), ew.MinThreads(), WithWorkerEnv(ew.Env())))
options = append(options, WithWorkers(ew.name, ew.fileName, ew.num, ew.options...))
}

opt := &opt{}
Expand Down Expand Up @@ -291,6 +292,17 @@ func Init(options ...Option) error {
logger.LogAttrs(ctx, slog.LevelInfo, "embedded PHP app 📦", slog.String("path", EmbeddedAppPath))
}

// register the startup/shutdown hooks (mainly useful for extensions)
onServerShutdown = nil
for _, w := range opt.workers {
if w.onServerStartup != nil {
w.onServerStartup()
}
if w.onServerShutdown != nil {
onServerShutdown = append(onServerShutdown, w.onServerShutdown)
}
}

return nil
}

Expand All @@ -300,6 +312,11 @@ func Shutdown() {
return
}

// call the shutdown hooks (mainly useful for extensions)
for _, fn := range onServerShutdown {
fn()
}

drainWatcher()
drainAutoScaling()
drainPHPThreads()
Expand Down
38 changes: 38 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type workerOpt struct {
env PreparedEnv
watch []string
maxConsecutiveFailures int
onThreadReady func(int)
onThreadShutdown func(int)
onServerStartup func()
onServerShutdown func()
}

// WithNumThreads configures the number of PHP threads to start.
Expand Down Expand Up @@ -116,6 +120,40 @@ func WithWorkerMaxFailures(maxFailures int) WorkerOption {
}
}

func WithWorkerOnReady(f func(int)) WorkerOption {
return func(w *workerOpt) error {
w.onThreadReady = f

return nil
}
}

func WithWorkerOnShutdown(f func(int)) WorkerOption {
return func(w *workerOpt) error {
w.onThreadShutdown = f

return nil
}
}

// WithWorkerOnServerStartup adds a function to be called right after server startup. Useful for extensions.
func WithWorkerOnServerStartup(f func()) WorkerOption {
return func(w *workerOpt) error {
w.onServerStartup = f

return nil
}
}

// WithWorkerOnServerShutdown adds a function to be called right before server shutdown. Useful for extensions.
func WithWorkerOnServerShutdown(f func()) WorkerOption {
return func(w *workerOpt) error {
w.onServerShutdown = f

return nil
}
}

// WithLogger configures the global logger to use.
func WithLogger(l *slog.Logger) Option {
return func(o *opt) error {
Expand Down
8 changes: 8 additions & 0 deletions testdata/message-worker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php

while (frankenphp_handle_request(function ($message) {
echo $message;
return "received message: $message";
})) {
// continue handling requests
}
20 changes: 8 additions & 12 deletions threadworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,10 @@ type workerThread struct {
dummyContext *frankenPHPContext
workerContext *frankenPHPContext
backoff *exponentialBackoff
externalWorker Worker
isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet
}

func convertToWorkerThread(thread *phpThread, worker *worker) {
externalWorker := extensionWorkers[worker.name]

thread.setHandler(&workerThread{
state: thread.state,
thread: thread,
Expand All @@ -36,7 +33,6 @@ func convertToWorkerThread(thread *phpThread, worker *worker) {
minBackoff: 100 * time.Millisecond,
maxConsecutiveFailures: worker.maxConsecutiveFailures,
},
externalWorker: externalWorker,
})
worker.attachThread(thread)
}
Expand All @@ -45,27 +41,27 @@ func convertToWorkerThread(thread *phpThread, worker *worker) {
func (handler *workerThread) beforeScriptExecution() string {
switch handler.state.get() {
case stateTransitionRequested:
if handler.externalWorker != nil {
handler.externalWorker.OnServerShutdown(handler.thread.threadIndex)
if handler.worker.onThreadShutdown != nil {
handler.worker.onThreadShutdown(handler.thread.threadIndex)
}
handler.worker.detachThread(handler.thread)
return handler.thread.transitionToNewHandler()
case stateRestarting:
if handler.externalWorker != nil {
handler.externalWorker.OnShutdown(handler.thread.threadIndex)
if handler.worker.onThreadShutdown != nil {
handler.worker.onThreadShutdown(handler.thread.threadIndex)
}
handler.state.set(stateYielding)
handler.state.waitFor(stateReady, stateShuttingDown)
return handler.beforeScriptExecution()
case stateReady, stateTransitionComplete:
if handler.externalWorker != nil {
handler.externalWorker.OnReady(handler.thread.threadIndex)
if handler.worker.onThreadReady != nil {
handler.worker.onThreadReady(handler.thread.threadIndex)
}
setupWorkerScript(handler, handler.worker)
return handler.worker.fileName
case stateShuttingDown:
if handler.externalWorker != nil {
handler.externalWorker.OnServerShutdown(handler.thread.threadIndex)
if handler.worker.onThreadShutdown != nil {
handler.worker.onThreadShutdown(handler.thread.threadIndex)
}
handler.worker.detachThread(handler.thread)
// signal to stop
Expand Down
10 changes: 4 additions & 6 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type worker struct {
threadMutex sync.RWMutex
allowPathMatching bool
maxConsecutiveFailures int
onThreadReady func(int)
onThreadShutdown func(int)
}

var (
Expand Down Expand Up @@ -51,12 +53,6 @@ func initWorkers(opt []workerOpt) error {
convertToWorkerThread(thread, w)
go func() {
thread.state.waitFor(stateReady)

// create a pipe from the external worker to the main worker
// note: this is locked to the initial thread size the external worker requested
if workerThread, ok := thread.handler.(*workerThread); ok && workerThread.externalWorker != nil {
go startWorker(w, workerThread.externalWorker, thread)
}
workersReady.Done()
}()
}
Expand Down Expand Up @@ -131,6 +127,8 @@ func newWorker(o workerOpt) (*worker, error) {
threads: make([]*phpThread, 0, o.num),
allowPathMatching: allowPathMatching,
maxConsecutiveFailures: o.maxConsecutiveFailures,
onThreadReady: o.onThreadReady,
onThreadShutdown: o.onThreadShutdown,
}

return w, nil
Expand Down
Loading