From 33031b9b1b508014d37ebe1790053c022b80aeb6 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 3 Feb 2026 12:50:22 +0100 Subject: [PATCH] Fix RequestCaptureProxy usage of coroutines --- .../dev/restate/client/kotlin/ingress.kt | 2 +- .../reflection/kotlin/RequestCaptureProxy.kt | 39 +++++-------------- .../common/reflection/kotlin/reflections.kt | 2 +- .../main/kotlin/dev/restate/sdk/kotlin/api.kt | 2 +- .../kotlinapi/reflections/ReflectionTest.kt | 18 +++++++++ .../core/kotlinapi/reflections/testClasses.kt | 12 ++++++ 6 files changed, 43 insertions(+), 32 deletions(-) diff --git a/client-kotlin/src/main/kotlin/dev/restate/client/kotlin/ingress.kt b/client-kotlin/src/main/kotlin/dev/restate/client/kotlin/ingress.kt index 22275b59..cfd2b725 100644 --- a/client-kotlin/src/main/kotlin/dev/restate/client/kotlin/ingress.kt +++ b/client-kotlin/src/main/kotlin/dev/restate/client/kotlin/ingress.kt @@ -429,7 +429,7 @@ internal constructor( * @return a [KClientRequest] with the correct response type */ @Suppress("UNCHECKED_CAST") - fun request(block: suspend SVC.() -> Res): KClientRequest { + suspend fun request(block: suspend SVC.() -> Res): KClientRequest { return KClientRequestImpl( client, RequestCaptureProxy(clazz, key).capture(block as suspend SVC.() -> Any?).toRequest(), diff --git a/common-kotlin/src/main/kotlin/dev/restate/common/reflection/kotlin/RequestCaptureProxy.kt b/common-kotlin/src/main/kotlin/dev/restate/common/reflection/kotlin/RequestCaptureProxy.kt index d72295fa..c50ad3d1 100644 --- a/common-kotlin/src/main/kotlin/dev/restate/common/reflection/kotlin/RequestCaptureProxy.kt +++ b/common-kotlin/src/main/kotlin/dev/restate/common/reflection/kotlin/RequestCaptureProxy.kt @@ -10,10 +10,6 @@ package dev.restate.common.reflection.kotlin import dev.restate.common.reflections.ProxySupport import dev.restate.common.reflections.ReflectionUtils -import kotlin.coroutines.Continuation -import kotlin.coroutines.EmptyCoroutineContext -import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED -import kotlin.coroutines.startCoroutine /** * Captures method invocations on a proxy to extract invocation information. @@ -37,35 +33,20 @@ class RequestCaptureProxy(private val clazz: Class, private val * @param block the suspend lambda that invokes a method on the service proxy * @return the captured invocation information */ - fun capture(block: suspend SVC.() -> Any?): CapturedInvocation { - var capturedInvocation: CapturedInvocation? = null - + suspend fun capture(block: suspend SVC.() -> Any?): CapturedInvocation { val proxy = ProxySupport.createProxy(clazz) { invocation -> - capturedInvocation = invocation.captureInvocation(serviceName, key) - - // Return COROUTINE_SUSPENDED to prevent actual execution - COROUTINE_SUSPENDED - } - - // Invoke the block with the proxy to capture the method call. - // Since the proxy returns COROUTINE_SUSPENDED, we use startCoroutine - // which starts but doesn't block waiting for completion. - val capturingContinuation = - object : Continuation { - override val context = EmptyCoroutineContext - - override fun resumeWith(result: Result) { - // Do nothing - we're just capturing, the coroutine suspends immediately - } + throw invocation.captureInvocation(serviceName, key) } - val suspendBlock: suspend () -> Any? = { proxy.block() } - suspendBlock.startCoroutine(capturingContinuation) + try { + proxy.block() + } catch (e: CapturedInvocation) { + return e + } - return capturedInvocation - ?: error( - "Method invocation was not captured. Make sure to call ONLY a method of the service proxy." - ) + error( + "Method invocation was not captured. Make sure to call ONLY a method of the service proxy." + ) } } diff --git a/common-kotlin/src/main/kotlin/dev/restate/common/reflection/kotlin/reflections.kt b/common-kotlin/src/main/kotlin/dev/restate/common/reflection/kotlin/reflections.kt index 60e65bda..0b30577d 100644 --- a/common-kotlin/src/main/kotlin/dev/restate/common/reflection/kotlin/reflections.kt +++ b/common-kotlin/src/main/kotlin/dev/restate/common/reflection/kotlin/reflections.kt @@ -36,7 +36,7 @@ data class CapturedInvocation( val inputTypeTag: TypeTag<*>, val outputTypeTag: TypeTag<*>, val input: Any?, -) { +) : RuntimeException("CapturedInvocation message should not be used", null, false, false) { @Suppress("UNCHECKED_CAST") fun toRequest(): Request<*, *> { return Request.of(target, inputTypeTag as TypeTag, outputTypeTag as TypeTag, input) diff --git a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt index 4373f62a..45b8830f 100644 --- a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt +++ b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt @@ -1293,7 +1293,7 @@ internal constructor( * @return a [KRequest] with the correct response type */ @Suppress("UNCHECKED_CAST") - fun request(block: suspend SVC.() -> Res): KRequest { + suspend fun request(block: suspend SVC.() -> Res): KRequest { return KRequestImpl( RequestCaptureProxy(clazz, key).capture(block as suspend SVC.() -> Any?).toRequest() ) diff --git a/sdk-core/src/test/kotlin/dev/restate/sdk/core/kotlinapi/reflections/ReflectionTest.kt b/sdk-core/src/test/kotlin/dev/restate/sdk/core/kotlinapi/reflections/ReflectionTest.kt index 7946a864..e85b4908 100644 --- a/sdk-core/src/test/kotlin/dev/restate/sdk/core/kotlinapi/reflections/ReflectionTest.kt +++ b/sdk-core/src/test/kotlin/dev/restate/sdk/core/kotlinapi/reflections/ReflectionTest.kt @@ -220,6 +220,24 @@ class ReflectionTest : TestDefinitions.TestSuite { outputCmd(), END_MESSAGE, ), + testInvocation({ CornerCases() }, "callSuspendWithinProxy") + .withInput(startMessage(1, "mykey"), inputCmd()) + .onlyBidiStream() + .expectingOutput( + oneWayCallCmd( + 1, + Target.virtualObject( + "CornerCases", + "mykey", + "callSuspendWithinProxy", + ), + null, + null, + Slice.EMPTY, + ), + outputCmd(), + END_MESSAGE, + ), testInvocation({ CustomSerdeService() }, "echo") .withInput(startMessage(1), inputCmd(byteArrayOf(1))) .onlyBidiStream() diff --git a/sdk-core/src/test/kotlin/dev/restate/sdk/core/kotlinapi/reflections/testClasses.kt b/sdk-core/src/test/kotlin/dev/restate/sdk/core/kotlinapi/reflections/testClasses.kt index 8fa6b59f..4f08348c 100644 --- a/sdk-core/src/test/kotlin/dev/restate/sdk/core/kotlinapi/reflections/testClasses.kt +++ b/sdk-core/src/test/kotlin/dev/restate/sdk/core/kotlinapi/reflections/testClasses.kt @@ -15,6 +15,7 @@ import dev.restate.serde.SerdeFactory import dev.restate.serde.TypeRef import dev.restate.serde.TypeTag import dev.restate.serde.kotlinx.KotlinSerializationSerdeFactory +import kotlinx.coroutines.delay import kotlinx.serialization.Serializable @Service @@ -112,6 +113,17 @@ open class CornerCases { open suspend fun badReturnTypeInferred(): Unit { toVirtualObject(objectKey()).request { badReturnTypeInferred() }.send() } + + @Exclusive + open suspend fun callSuspendWithinProxy() { + toVirtualObject(objectKey()) + .request { + // Doing a suspend call within the proxy + delay(1) + callSuspendWithinProxy() + } + .send() + } } @Service