Skip to content

Commit b4e3886

Browse files
committed
#53: Add separate queue for resumed coroutines to improve event cleanup stability
The previous clean_events_for_resumed_coroutines algorithm was unstable due to complex tracking of circular buffer changes during coroutine additions and buffer reallocations. This commit introduces a dedicated resumed_coroutines queue that provides a more reliable approach. Changes: - Add resumed_coroutines circular buffer to module globals - Initialize resumed_coroutines queue in PHP_RINIT_FUNCTION - Track resumed coroutines in async_scheduler_coroutine_enqueue() and async_coroutine_resume() - Replace complex clean_events_for_resumed_coroutines() with simple process_resumed_coroutines() - Update scheduler_next_tick() and fiber_entry() to use new queue-based approach - Add proper cleanup in both engine_shutdown() and async_scheduler_dtor() to prevent memory leaks Benefits: - Eliminates race conditions from buffer reallocation tracking - Simplifies logic by checking queue contents instead of head pointer changes - Improves code readability and maintainability - Fixes memory leak of 8192 bytes in circular buffer allocation
1 parent fc5db30 commit b4e3886

File tree

5 files changed

+22
-35
lines changed

5 files changed

+22
-35
lines changed

async.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -962,6 +962,7 @@ PHP_RINIT_FUNCTION(async) /* {{{ */
962962
ZEND_ASYNC_INITIALIZE;
963963
circular_buffer_ctor(&ASYNC_G(microtasks), 64, sizeof(zend_async_microtask_t *), &zend_std_allocator);
964964
circular_buffer_ctor(&ASYNC_G(coroutine_queue), 128, sizeof(zend_coroutine_t *), &zend_std_allocator);
965+
circular_buffer_ctor(&ASYNC_G(resumed_coroutines), 64, sizeof(zend_coroutine_t *), &zend_std_allocator);
965966
zend_hash_init(&ASYNC_G(coroutines), 128, NULL, NULL, 0);
966967

967968
ASYNC_G(reactor_started) = false;

async_API.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ static void engine_shutdown(void)
208208

209209
circular_buffer_dtor(&ASYNC_G(microtasks));
210210
circular_buffer_dtor(&ASYNC_G(coroutine_queue));
211+
circular_buffer_dtor(&ASYNC_G(resumed_coroutines));
211212
zend_hash_destroy(&ASYNC_G(coroutines));
212213

213214
if (ASYNC_G(root_context) != NULL) {

coroutine.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -700,6 +700,9 @@ void async_coroutine_resume(zend_coroutine_t *coroutine, zend_object *error, con
700700
}
701701

702702
coroutine->waker->status = ZEND_ASYNC_WAKER_QUEUED;
703+
704+
// Add to resumed_coroutines queue for event cleanup
705+
circular_buffer_push(&ASYNC_G(resumed_coroutines), &coroutine, true);
703706
}
704707

705708
void async_coroutine_cancel(zend_coroutine_t *zend_coroutine,

php_async.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ ZEND_BEGIN_MODULE_GLOBALS(async)
7373
circular_buffer_t microtasks;
7474
/* Queue of coroutine_queue */
7575
circular_buffer_t coroutine_queue;
76+
/* Queue of resumed coroutines for event cleanup */
77+
circular_buffer_t resumed_coroutines;
7678
/* List of coroutines */
7779
HashTable coroutines;
7880
/* The transfer structure is used to return to the main execution context. */

scheduler.c

Lines changed: 15 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -302,34 +302,19 @@ static zend_always_inline void return_to_main(zend_fiber_transfer *transfer)
302302
/// COROUTINE QUEUE MANAGEMENT
303303
///////////////////////////////////////////////////////////
304304

305-
static zend_always_inline void clean_events_for_resumed_coroutines(const circular_buffer_t *queue, const void *previous_data, size_t previous_count, size_t previous_head)
305+
static zend_always_inline void process_resumed_coroutines(void)
306306
{
307-
const size_t new_head = queue->head;
308-
const size_t mask = queue->capacity - 1;
309-
const size_t item_size = queue->item_size;
310-
size_t current;
311-
312-
// Check if reallocation occurred
313-
if (queue->data != previous_data) {
314-
// After reallocation: tail should be 0, old elements at [0, previous_count)
315-
ZEND_ASSERT(queue->tail == 0 && "After reallocation tail should be 0");
316-
current = previous_count;
317-
} else {
318-
// No reallocation: use original head
319-
current = previous_head;
320-
}
321-
322-
while (current != new_head) {
323-
zend_coroutine_t *coroutine = *(zend_coroutine_t**)((char*)queue->data + current * item_size);
307+
circular_buffer_t *resumed_queue = &ASYNC_G(resumed_coroutines);
308+
zend_coroutine_t *coroutine = NULL;
324309

325-
if (coroutine != NULL && coroutine->waker != NULL) {
310+
while (circular_buffer_pop_ptr(resumed_queue, (void**)&coroutine) == SUCCESS) {
311+
if (EXPECTED(coroutine != NULL && coroutine->waker != NULL)) {
326312
ZEND_ASYNC_WAKER_CLEAN_EVENTS(coroutine->waker);
327313
}
328-
329-
current = (current + 1) & mask;
330314
}
331315
}
332316

317+
333318
static zend_always_inline async_coroutine_t *next_coroutine(void)
334319
{
335320
async_coroutine_t *coroutine;
@@ -705,6 +690,7 @@ static void async_scheduler_dtor(void)
705690
OBJ_RELEASE(&async_coroutine->std);
706691

707692
zval_c_buffer_cleanup(&ASYNC_G(coroutine_queue));
693+
zval_c_buffer_cleanup(&ASYNC_G(resumed_coroutines));
708694
zval_c_buffer_cleanup(&ASYNC_G(microtasks));
709695

710696
zval *current;
@@ -1056,6 +1042,9 @@ void async_scheduler_coroutine_enqueue(zend_coroutine_t *coroutine)
10561042
async_throw_error("Failed to enqueue coroutine");
10571043
} else {
10581044
coroutine->waker->status = ZEND_ASYNC_WAKER_QUEUED;
1045+
1046+
// Add to resumed_coroutines queue for event cleanup
1047+
circular_buffer_push_ptr_with_resize(&ASYNC_G(resumed_coroutines), coroutine);
10591048
}
10601049

10611050
//
@@ -1087,14 +1076,10 @@ static zend_always_inline void scheduler_next_tick(void)
10871076
ASYNC_G(last_reactor_tick) = current_time;
10881077
const circular_buffer_t * queue = &ASYNC_G(coroutine_queue);
10891078

1090-
const void *previous_data = queue->data;
1091-
const size_t previous_count = circular_buffer_count(queue);
1092-
const size_t previous_head = queue->head;
1093-
10941079
has_handles = ZEND_ASYNC_REACTOR_EXECUTE(circular_buffer_is_not_empty(queue));
10951080

1096-
if (previous_head != queue->head) {
1097-
clean_events_for_resumed_coroutines(queue, previous_data, previous_count, previous_head);
1081+
if (circular_buffer_is_not_empty(&ASYNC_G(resumed_coroutines))) {
1082+
process_resumed_coroutines();
10981083
}
10991084

11001085
TRY_HANDLE_SUSPEND_EXCEPTION();
@@ -1317,16 +1302,11 @@ ZEND_STACK_ALIGNED void fiber_entry(zend_fiber_transfer *transfer)
13171302
execute_microtasks();
13181303
TRY_HANDLE_EXCEPTION();
13191304

1320-
const void *previous_data = coroutine_queue->data;
1321-
const size_t previous_count = circular_buffer_count(coroutine_queue);
1322-
const size_t previous_head = coroutine_queue->head;
1323-
1324-
has_next_coroutine = previous_count > 0;
1325-
1305+
has_next_coroutine = circular_buffer_count(coroutine_queue) > 0;
13261306
has_handles = ZEND_ASYNC_REACTOR_EXECUTE(has_next_coroutine);
13271307

1328-
if (previous_head != coroutine_queue->head) {
1329-
clean_events_for_resumed_coroutines(coroutine_queue, previous_data, previous_count, previous_head);
1308+
if (circular_buffer_is_not_empty(&ASYNC_G(resumed_coroutines))) {
1309+
process_resumed_coroutines();
13301310
}
13311311

13321312
TRY_HANDLE_EXCEPTION();

0 commit comments

Comments
 (0)