Skip to content

Commit 16f8e7e

Browse files
committed
Refactor Fiber async event handling and improve callback ownership management
1 parent 226130c commit 16f8e7e

File tree

1 file changed

+94
-30
lines changed

1 file changed

+94
-30
lines changed

Zend/zend_fibers.c

Lines changed: 94 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -715,14 +715,15 @@ static zend_always_inline zend_fiber_transfer zend_fiber_suspend_internal(zend_f
715715
return zend_fiber_switch_to(caller, value, false);
716716
}
717717

718-
/* {{{ libuv_add_callback */
718+
/////////////////////////////////////////////////////////////////////////////
719+
/// Async Event Integration
720+
/////////////////////////////////////////////////////////////////////////////
721+
719722
static bool zend_fiber_async_event_add_callback(zend_async_event_t *event, zend_async_event_callback_t *callback)
720723
{
721724
return zend_async_callbacks_push(event, callback);
722725
}
723726

724-
/* }}} */
725-
726727
/* {{{ libuv_remove_callback */
727728
static bool zend_fiber_async_event_del_callback(zend_async_event_t *event, zend_async_event_callback_t *callback)
728729
{
@@ -1009,26 +1010,34 @@ static void coroutine_entry_point(void)
10091010
zend_coroutine_t *coroutine = ZEND_ASYNC_CURRENT_COROUTINE;
10101011

10111012
if (!ZEND_COROUTINE_IS_FIBER(coroutine)) {
1012-
// throw error
10131013
zend_throw_error(zend_ce_fiber_error, "Coroutine entry point called for non-fiber coroutine");
10141014
return;
10151015
}
10161016

10171017
zend_fiber *fiber = coroutine->extended_data;
1018-
if (UNEXPECTED(fiber == NULL)) {
1019-
zend_throw_error(zend_ce_fiber_error, "Fiber coroutine has no associated fiber");
1018+
1019+
if (UNEXPECTED(fiber == NULL || fiber->fcall == NULL)) {
1020+
zend_throw_error(zend_ce_fiber_error, "Fiber coroutine has no callback function");
10201021
return;
10211022
}
10221023

1024+
/*
1025+
* SHARE OWNERSHIP of fcall from fiber.
1026+
* Fiber may be destroyed during execution,
1027+
* so we transfer fcall ownership to a local variable.
1028+
*/
1029+
zend_fcall_t *fcall = fiber->fcall;
1030+
10231031
bool is_bailout = false;
10241032
zend_object **exception_ptr = &EG(exception);
10251033
zend_object **prev_exception_ptr = &EG(prev_exception);
10261034
zend_object *exception = NULL;
10271035

10281036
zend_try
10291037
{
1030-
fiber->fci.retval = &fiber->result;
1031-
zend_call_function(&fiber->fci, &fiber->fci_cache);
1038+
// Result is saved in coroutine->result
1039+
fcall->fci.retval = &coroutine->result;
1040+
zend_call_function(&fcall->fci, &fcall->fci_cache);
10321041
}
10331042
zend_catch
10341043
{
@@ -1038,8 +1047,9 @@ static void coroutine_entry_point(void)
10381047

10391048
zend_try
10401049
{
1050+
// Get fiber again (may have been destroyed)
10411051
fiber = coroutine->extended_data;
1042-
zend_async_event_t * yield_event_base = NULL;
1052+
zend_async_event_t *yield_event_base = NULL;
10431053

10441054
if (fiber && fiber->yield_event) {
10451055
yield_event_base = &fiber->yield_event->base;
@@ -1052,7 +1062,7 @@ static void coroutine_entry_point(void)
10521062
// So, we need to verify that such handlers exist.
10531063
//
10541064
// If there are no handlers,
1055-
// the exception will reach the coroutines default handler and continue further along the chain.
1065+
// the exception will reach the coroutine's default handler and continue further along the chain.
10561066
//
10571067
if (EXPECTED(yield_event_base && yield_event_base->callbacks.length > 0)) {
10581068
//
@@ -1077,10 +1087,10 @@ static void coroutine_entry_point(void)
10771087
}
10781088
}
10791089

1080-
// Mark second parameter of zend_async_callbacks_notify as ZVAL
1090+
// Notify with result from coroutine->result
10811091
ZEND_ASYNC_EVENT_CLR_EXCEPTION_HANDLED(yield_event_base);
10821092
ZEND_ASYNC_EVENT_SET_ZVAL_RESULT(yield_event_base);
1083-
ZEND_ASYNC_CALLBACKS_NOTIFY(yield_event_base, &fiber->result, exception);
1093+
ZEND_ASYNC_CALLBACKS_NOTIFY(yield_event_base, &coroutine->result, exception);
10841094
zend_async_callbacks_free(yield_event_base);
10851095

10861096
if (exception && ZEND_ASYNC_EVENT_IS_EXCEPTION_HANDLED(yield_event_base)) {
@@ -1089,7 +1099,6 @@ static void coroutine_entry_point(void)
10891099
} else if (exception) {
10901100
zend_throw_exception_internal(exception);
10911101
}
1092-
10931102
}
10941103
}
10951104
zend_catch
@@ -1098,14 +1107,22 @@ static void coroutine_entry_point(void)
10981107
}
10991108
zend_end_try();
11001109

1101-
if (fiber) {
1102-
if (is_bailout) {
1103-
fiber->flags |= ZEND_FIBER_FLAG_BAILOUT;
1110+
// Update fiber flags if it still exists
1111+
fiber = coroutine->extended_data;
1112+
if (fiber && is_bailout) {
1113+
fiber->flags |= ZEND_FIBER_FLAG_BAILOUT;
1114+
}
1115+
1116+
// Cleanup fcall (Except in the case when the Fiber itself has already cleaned up this structure)
1117+
if (EXPECTED(fcall && !(fiber && fiber->fcall == NULL))) {
1118+
1119+
if (fiber) {
1120+
fiber->fcall = NULL;
11041121
}
11051122

1106-
/* Cleanup callback and unset field to prevent GC / duplicate dtor issues. */
1107-
zval_ptr_dtor(&fiber->fci.function_name);
1108-
ZVAL_UNDEF(&fiber->fci.function_name);
1123+
zval_ptr_dtor(&fcall->fci.function_name);
1124+
ZVAL_UNDEF(&fcall->fci.function_name);
1125+
efree(fcall);
11091126
}
11101127

11111128
if (is_bailout) {
@@ -1253,6 +1270,18 @@ static void zend_fiber_object_destroy(zend_object *object)
12531270
fiber->coroutine = NULL;
12541271
coroutine->extended_data = NULL;
12551272

1273+
/*
1274+
* If fcall is not NULL, ownership was NOT taken by
1275+
* coroutine_entry_point (coroutine did not start execution).
1276+
* In this case fiber must free fcall.
1277+
*/
1278+
if (fiber->fcall != NULL) {
1279+
zend_fcall_t *fcall = fiber->fcall;
1280+
fiber->fcall = NULL;
1281+
zval_ptr_dtor(&fcall->fci.function_name);
1282+
efree(fcall);
1283+
}
1284+
12561285
if (ZEND_COROUTINE_IS_FINISHED(coroutine)) {
12571286
ZEND_ASYNC_EVENT_RELEASE(&coroutine->event);
12581287
return;
@@ -1307,8 +1336,10 @@ static void zend_fiber_object_free(zend_object *object)
13071336
{
13081337
zend_fiber *fiber = (zend_fiber *) object;
13091338

1310-
zval_ptr_dtor(&fiber->fci.function_name);
1311-
zval_ptr_dtor(&fiber->result);
1339+
if (fiber->coroutine == NULL) {
1340+
zval_ptr_dtor(&fiber->fci.function_name);
1341+
zval_ptr_dtor(&fiber->result);
1342+
}
13121343

13131344
zend_object_std_dtor(&fiber->std);
13141345
}
@@ -1318,8 +1349,18 @@ static HashTable *zend_fiber_object_gc(zend_object *object, zval **table, int *n
13181349
zend_fiber *fiber = (zend_fiber *) object;
13191350
zend_get_gc_buffer *buf = zend_get_gc_buffer_create();
13201351

1321-
zend_get_gc_buffer_add_zval(buf, &fiber->fci.function_name);
1322-
zend_get_gc_buffer_add_zval(buf, &fiber->result);
1352+
/*
1353+
* For coroutine path add fcall to GC (if ownership not taken).
1354+
* For non-coroutine path add fci and result to GC.
1355+
*/
1356+
if (fiber->coroutine != NULL && fiber->fcall != NULL) {
1357+
zend_get_gc_buffer_add_zval(buf, &fiber->fcall->fci.function_name);
1358+
zend_get_gc_buffer_add_zval(buf, &fiber->coroutine->result);
1359+
/* result in coroutine->result - coroutine's GC handles it */
1360+
} else {
1361+
zend_get_gc_buffer_add_zval(buf, &fiber->fci.function_name);
1362+
zend_get_gc_buffer_add_zval(buf, &fiber->result);
1363+
}
13231364

13241365
if (fiber->context.status != ZEND_FIBER_STATUS_SUSPENDED || fiber->caller != NULL) {
13251366
zend_get_gc_buffer_use(buf, table, num);
@@ -1384,19 +1425,41 @@ ZEND_METHOD(Fiber, __construct)
13841425
RETURN_THROWS();
13851426
}
13861427

1387-
fiber->fci = fci;
1388-
fiber->fci_cache = fcc;
1428+
if (EXPECTED(fiber->coroutine != NULL)) {
1429+
/* Coroutine path: allocate fcall and store in fiber */
1430+
fiber->fcall = ecalloc(1, sizeof(zend_fcall_t));
1431+
fiber->fcall->fci = fci;
1432+
fiber->fcall->fci_cache = fcc;
13891433

1390-
// Keep a reference to closures or callable objects while the fiber is running.
1391-
Z_TRY_ADDREF(fiber->fci.function_name);
1434+
/* Keep a reference to closures or callable objects */
1435+
Z_TRY_ADDREF(fiber->fcall->fci.function_name);
1436+
} else {
1437+
/* Non-coroutine path: store directly in fiber */
1438+
fiber->fci = fci;
1439+
fiber->fci_cache = fcc;
1440+
1441+
/* Keep a reference to closures or callable objects */
1442+
Z_TRY_ADDREF(fiber->fci.function_name);
1443+
}
13921444
}
13931445

13941446
ZEND_METHOD(Fiber, start)
13951447
{
13961448
zend_fiber *fiber = (zend_fiber *) Z_OBJ_P(ZEND_THIS);
13971449

13981450
ZEND_PARSE_PARAMETERS_START(0, -1)
1399-
Z_PARAM_VARIADIC_WITH_NAMED(fiber->fci.params, fiber->fci.param_count, fiber->fci.named_params);
1451+
if (EXPECTED(fiber->coroutine != NULL)) {
1452+
/* Coroutine path: parameters go into fiber->fcall */
1453+
ZEND_ASSERT(fiber->fcall != NULL && "Fiber fcall must exist after constructor");
1454+
Z_PARAM_VARIADIC_WITH_NAMED(
1455+
fiber->fcall->fci.params,
1456+
fiber->fcall->fci.param_count,
1457+
fiber->fcall->fci.named_params
1458+
);
1459+
} else {
1460+
/* Non-coroutine path: parameters go into fiber->fci */
1461+
Z_PARAM_VARIADIC_WITH_NAMED(fiber->fci.params, fiber->fci.param_count, fiber->fci.named_params);
1462+
}
14001463
ZEND_PARSE_PARAMETERS_END();
14011464

14021465
if (UNEXPECTED(zend_fiber_switch_blocked())) {
@@ -1640,8 +1703,9 @@ ZEND_METHOD(Fiber, getReturn)
16401703
RETURN_THROWS();
16411704
}
16421705

1643-
if (!Z_ISUNDEF_P(&fiber->result)) {
1644-
RETURN_COPY_DEREF(&fiber->result);
1706+
/* Result is stored in coroutine->result */
1707+
if (!Z_ISUNDEF_P(&fiber->coroutine->result)) {
1708+
RETURN_COPY_DEREF(&fiber->coroutine->result);
16451709
}
16461710

16471711
RETURN_NULL();

0 commit comments

Comments
 (0)