Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions caddy/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func (f *FrankenPHPApp) Start() error {
frankenphp.WithWorkerWatchMode(w.Watch),
frankenphp.WithWorkerMaxFailures(w.MaxConsecutiveFailures),
frankenphp.WithWorkerMaxThreads(w.MaxThreads),
frankenphp.WithWorkerHTTPDisabled(w.DisableHTTP),
)
} else {
workerOpts = append(
Expand All @@ -167,6 +168,7 @@ func (f *FrankenPHPApp) Start() error {
frankenphp.WithWorkerMaxFailures(w.MaxConsecutiveFailures),
frankenphp.WithWorkerMaxThreads(w.MaxThreads),
frankenphp.WithWorkerRequestOptions(w.requestOptions...),
frankenphp.WithWorkerHTTPDisabled(w.DisableHTTP),
)
}

Expand Down
9 changes: 8 additions & 1 deletion caddy/workerconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type workerConfig struct {
MatchPath []string `json:"match_path,omitempty"`
// MaxConsecutiveFailures sets the maximum number of consecutive failures before panicking (defaults to 6, set to -1 to never panick)
MaxConsecutiveFailures int `json:"max_consecutive_failures,omitempty"`
// DisableHTTP specifies if the worker handles HTTP requests
DisableHTTP bool `json:"http_disabled,omitempty"`

requestOptions []frankenphp.RequestOption
}
Expand Down Expand Up @@ -116,6 +118,11 @@ func parseWorkerConfig(d *caddyfile.Dispenser) (workerConfig, error) {
} else {
wc.Watch = append(wc.Watch, d.Val())
}
case "http_disabled":
if d.NextArg() {
return wc, d.ArgErr()
}
wc.DisableHTTP = true
case "match":
// provision the path so it's identical to Caddy match rules
// see: https://github.com/caddyserver/caddy/blob/master/modules/caddyhttp/matchers.go
Expand All @@ -140,7 +147,7 @@ func parseWorkerConfig(d *caddyfile.Dispenser) (workerConfig, error) {

wc.MaxConsecutiveFailures = v
default:
return wc, wrongSubDirectiveError("worker", "name, file, num, env, watch, match, max_consecutive_failures, max_threads", v)
return wc, wrongSubDirectiveError("worker", "name, file, num, env, watch, match, max_consecutive_failures, max_threads, http_disabled", v)
}
}

Expand Down
29 changes: 28 additions & 1 deletion frankenphp.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@ frankenphp_config frankenphp_get_config() {
bool should_filter_var = 0;
__thread uintptr_t thread_index;
__thread bool is_worker_thread = false;
__thread bool is_http_thread = true;
__thread zval *os_environment = NULL;

void frankenphp_update_local_thread_context(bool is_worker) {
void frankenphp_update_local_thread_context(bool is_worker, bool httpEnabled) {
is_worker_thread = is_worker;
is_http_thread = httpEnabled;
}

static void frankenphp_update_request_context() {
Expand Down Expand Up @@ -168,6 +170,9 @@ static void frankenphp_release_temporary_streams() {

/* Adapted from php_request_shutdown */
static void frankenphp_worker_request_shutdown() {
if (!is_http_thread) {
return;
}
/* Flush all output buffers */
zend_try { php_output_end_all(); }
zend_end_try();
Expand Down Expand Up @@ -212,6 +217,9 @@ PHPAPI void get_full_env(zval *track_vars_array) {
/* Adapted from php_request_startup() */
static int frankenphp_worker_request_startup() {
int retval = SUCCESS;
if (!is_http_thread) {
return retval;
}

frankenphp_update_request_context();

Expand Down Expand Up @@ -486,6 +494,25 @@ PHP_FUNCTION(frankenphp_handle_request) {
RETURN_TRUE;
}

PHP_FUNCTION(frankenphp_send_request) {
zval *zv;
char *worker_name = NULL;
size_t worker_name_len = 0;

ZEND_PARSE_PARAMETERS_START(1, 2);
Z_PARAM_ZVAL(zv);
Z_PARAM_OPTIONAL
Z_PARAM_STRING(worker_name, worker_name_len);
ZEND_PARSE_PARAMETERS_END();

char *error = go_frankenphp_send_request(thread_index, zv, worker_name,
worker_name_len);
if (error) {
zend_throw_exception(spl_ce_RuntimeException, error, 0);
RETURN_THROWS();
}
}

PHP_FUNCTION(headers_send) {
zend_long response_code = 200;

Expand Down
4 changes: 4 additions & 0 deletions frankenphp.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var (
ErrMainThreadCreation = errors.New("error creating the main thread")
ErrScriptExecution = errors.New("error during PHP script execution")
ErrNotRunning = errors.New("FrankenPHP is not running. For proper configuration visit: https://frankenphp.dev/docs/config/#caddyfile-config")
ErrNotHTTPWorker = errors.New("worker is not an HTTP worker")

ErrInvalidRequestPath = ErrRejected{"invalid request path", http.StatusBadRequest}
ErrInvalidContentLengthHeader = ErrRejected{"invalid Content-Length header", http.StatusBadRequest}
Expand Down Expand Up @@ -399,6 +400,9 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error

// Detect if a worker is available to handle this request
if fc.worker != nil {
if !fc.worker.httpEnabled {
return ErrNotHTTPWorker
}
return fc.worker.handleRequest(ch)
}

Expand Down
2 changes: 1 addition & 1 deletion frankenphp.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ bool frankenphp_new_php_thread(uintptr_t thread_index);

bool frankenphp_shutdown_dummy_request(void);
int frankenphp_execute_script(char *file_name);
void frankenphp_update_local_thread_context(bool is_worker);
void frankenphp_update_local_thread_context(bool is_worker, bool httpEnabled);

int frankenphp_execute_script_cli(char *script, int argc, char **argv,
bool eval);
Expand Down
2 changes: 2 additions & 0 deletions frankenphp.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

function frankenphp_handle_request(callable $callback): bool {}

function frankenphp_send_request(mixed $message, string $workerName = ""): bool {}

function headers_send(int $status = 200): int {}

function frankenphp_finish_request(): bool {}
Expand Down
7 changes: 7 additions & 0 deletions frankenphp_arginfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_handle_request, 0, 1,
ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_send_request, 0, 1, IS_VOID, 0)
ZEND_ARG_TYPE_INFO(0, message, IS_MIXED, 0)
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, worker_name, IS_STRING, 0, "\"\"")
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_headers_send, 0, 0, IS_LONG, 0)
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, status, IS_LONG, 0, "200")
ZEND_END_ARG_INFO()
Expand Down Expand Up @@ -37,6 +42,7 @@ ZEND_END_ARG_INFO()


ZEND_FUNCTION(frankenphp_handle_request);
ZEND_FUNCTION(frankenphp_send_request);
ZEND_FUNCTION(headers_send);
ZEND_FUNCTION(frankenphp_finish_request);
ZEND_FUNCTION(frankenphp_request_headers);
Expand All @@ -46,6 +52,7 @@ ZEND_FUNCTION(mercure_publish);

static const zend_function_entry ext_functions[] = {
ZEND_FE(frankenphp_handle_request, arginfo_frankenphp_handle_request)
ZEND_FE(frankenphp_send_request, arginfo_frankenphp_send_request)
ZEND_FE(headers_send, arginfo_headers_send)
ZEND_FE(frankenphp_finish_request, arginfo_frankenphp_finish_request)
ZEND_FALIAS(fastcgi_finish_request, frankenphp_finish_request, arginfo_fastcgi_finish_request)
Expand Down
23 changes: 23 additions & 0 deletions frankenphp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,29 @@ func TestExecuteCLICode(t *testing.T) {
assert.Equal(t, stdoutStderrStr, `Hello World`)
}

func TestFrankenSendRequest(t *testing.T) {
var buf bytes.Buffer
handler := slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})
logger := slog.New(handler)
cwd, _ := os.Getwd()
workerFile := cwd + "/testdata/request-receiver.php"

runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) {
body, _ := testGet("http://example.com/request-sender.php?message=hi-from-go", handler, t)
assert.Equal(t, "request sent", body)
}, &testOptions{
logger: logger,
initOpts: []frankenphp.Option{frankenphp.WithWorkers(
"workerName",
workerFile,
1,
frankenphp.WithWorkerHTTPDisabled(true),
)},
})

assert.Contains(t, buf.String(), "hi-from-go")
}

func ExampleServeHTTP() {
if err := frankenphp.Init(); err != nil {
panic(err)
Expand Down
10 changes: 10 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type workerOpt struct {
onThreadShutdown func(int)
onServerStartup func()
onServerShutdown func()
httpDisabled bool
}

// WithContext sets the main context to use.
Expand Down Expand Up @@ -234,6 +235,15 @@ func WithWorkerOnServerShutdown(f func()) WorkerOption {
}
}

// AsHTTPWorker determines if the worker will handle HTTP requests (true by default).
func WithWorkerHTTPDisabled(isDisabled bool) WorkerOption {
return func(w *workerOpt) error {
w.httpDisabled = isDisabled

return nil
}
}

func withExtensionWorkers(w *extensionWorkers) WorkerOption {
return func(wo *workerOpt) error {
wo.extensionWorkers = w
Expand Down
4 changes: 2 additions & 2 deletions phpthread.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ func (thread *phpThread) pinCString(s string) *C.char {
return thread.pinString(s + "\x00")
}

func (*phpThread) updateContext(isWorker bool) {
C.frankenphp_update_local_thread_context(C.bool(isWorker))
func (*phpThread) updateContext(isWorker bool, httpEnabled bool) {
C.frankenphp_update_local_thread_context(C.bool(isWorker), C.bool(httpEnabled))
}

//export go_frankenphp_before_script_execution
Expand Down
7 changes: 7 additions & 0 deletions testdata/request-receiver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php

while (frankenphp_handle_request(function ($message) {
echo $message;
})) {
// keep handling requests
}
12 changes: 12 additions & 0 deletions testdata/request-sender.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

$message = $_GET["message"];
$workerName = $_GET["workerName"] ?? '';

frankenphp_send_request($message, $workerName);

// sleep to make sure request was received
// TODO: solve this test-restart race condition with Futures instead?
usleep(10_000);

echo "request sent";
2 changes: 1 addition & 1 deletion threadregular.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (handler *regularThread) beforeScriptExecution() string {
return handler.thread.transitionToNewHandler()

case state.TransitionComplete:
handler.thread.updateContext(false)
handler.thread.updateContext(false, true)
handler.state.Set(state.Ready)

return handler.waitForRequest()
Expand Down
11 changes: 9 additions & 2 deletions threadworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (handler *workerThread) beforeScriptExecution() string {
handler.state.WaitFor(state.Ready, state.ShuttingDown)
return handler.beforeScriptExecution()
case state.Ready, state.TransitionComplete:
handler.thread.updateContext(true)
handler.thread.updateContext(true, handler.worker.httpEnabled)
if handler.worker.onThreadReady != nil {
handler.worker.onThreadReady(handler.thread.threadIndex)
}
Expand Down Expand Up @@ -125,6 +125,13 @@ func setupWorkerScript(handler *workerThread, worker *worker) {
if globalLogger.Enabled(ctx, slog.LevelDebug) {
globalLogger.LogAttrs(ctx, slog.LevelDebug, "starting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex))
}

// non-http worker: instantly gets marked as ready
if !worker.httpEnabled {
metrics.ReadyWorker(handler.worker.name)
handler.thread.state.Set(state.Ready)
handler.isBootingScript = false
}
}

func tearDownWorkerScript(handler *workerThread, exitStatus int) {
Expand Down Expand Up @@ -298,7 +305,7 @@ func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t, retval *C.zval
thread.handler.(*workerThread).workerFrankenPHPContext = nil
thread.handler.(*workerThread).workerContext = nil

if globalLogger.Enabled(ctx, slog.LevelDebug) {
if globalLogger.Enabled(ctx, slog.LevelDebug) && thread.handler.(*workerThread).worker.httpEnabled {
if fc.request == nil {
fc.logger.LogAttrs(ctx, slog.LevelDebug, "request handling finished", slog.String("worker", fc.worker.name), slog.Int("thread", thread.threadIndex))
} else {
Expand Down
55 changes: 55 additions & 0 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package frankenphp
// #include "frankenphp.h"
import "C"
import (
"context"
"fmt"
"log/slog"
"os"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -32,6 +34,7 @@ type worker struct {
onThreadReady func(int)
onThreadShutdown func(int)
queuedRequests atomic.Int32
httpEnabled bool
}

var (
Expand Down Expand Up @@ -154,6 +157,7 @@ func newWorker(o workerOpt) (*worker, error) {
maxConsecutiveFailures: o.maxConsecutiveFailures,
onThreadReady: o.onThreadReady,
onThreadShutdown: o.onThreadShutdown,
httpEnabled: !o.httpDisabled,
}

w.requestOptions = append(
Expand Down Expand Up @@ -322,3 +326,54 @@ func (worker *worker) handleRequest(ch contextHolder) error {
}
}
}

//export go_frankenphp_send_request
func go_frankenphp_send_request(threadIndex C.uintptr_t, zv *C.zval, name *C.char, nameLen C.size_t) *C.char {
var w *worker
if nameLen != 0 {
w = getWorkerByName(C.GoStringN(name, C.int(nameLen)))
} else {
for _, worker := range workers {
if !worker.httpEnabled {
w = worker
break
}
}
}

if w == nil {
return phpThreads[threadIndex].pinCString("No worker found to handle this task: " + C.GoStringN(name, C.int(nameLen)))
}

if w.httpEnabled {
return phpThreads[threadIndex].pinCString("Cannot call frankenphp_send_request() on a HTTP worker: " + C.GoStringN(name, C.int(nameLen)))
}

message, err := goValue[any](zv)
if err != nil {
return phpThreads[threadIndex].pinCString("Failed to convert frankenphp_send_request() argument: " + err.Error())
}

fc := newFrankenPHPContext()
fc.logger = globalLogger
fc.worker = w
fc.responseWriter = nil
fc.handlerParameters = message

ctx := context.WithValue(context.Background(), contextKey, fc)

go func() {
err := w.handleRequest(contextHolder{ctx, fc})
if err != nil && globalLogger.Enabled(globalCtx, slog.LevelError) {
globalLogger.LogAttrs(
globalCtx,
slog.LevelError,
"error while handling non-http message",
slog.String("error", err.Error()),
slog.String("worker", w.name),
)
}
}()

return nil
}
Loading