Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ internal constructor(
* @return a [KClientRequest] with the correct response type
*/
@Suppress("UNCHECKED_CAST")
fun <Res> request(block: suspend SVC.() -> Res): KClientRequest<Any?, Res> {
suspend fun <Res> request(block: suspend SVC.() -> Res): KClientRequest<Any?, Res> {
return KClientRequestImpl(
client,
RequestCaptureProxy(clazz, key).capture(block as suspend SVC.() -> Any?).toRequest(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ package dev.restate.common.reflection.kotlin

import dev.restate.common.reflections.ProxySupport
import dev.restate.common.reflections.ReflectionUtils
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.startCoroutine

/**
* Captures method invocations on a proxy to extract invocation information.
Expand All @@ -37,35 +33,20 @@ class RequestCaptureProxy<SVC : Any>(private val clazz: Class<SVC>, private val
* @param block the suspend lambda that invokes a method on the service proxy
* @return the captured invocation information
*/
fun capture(block: suspend SVC.() -> Any?): CapturedInvocation {
var capturedInvocation: CapturedInvocation? = null

suspend fun capture(block: suspend SVC.() -> Any?): CapturedInvocation {
val proxy =
ProxySupport.createProxy(clazz) { invocation ->
capturedInvocation = invocation.captureInvocation(serviceName, key)

// Return COROUTINE_SUSPENDED to prevent actual execution
COROUTINE_SUSPENDED
}

// Invoke the block with the proxy to capture the method call.
// Since the proxy returns COROUTINE_SUSPENDED, we use startCoroutine
// which starts but doesn't block waiting for completion.
val capturingContinuation =
object : Continuation<Any?> {
override val context = EmptyCoroutineContext

override fun resumeWith(result: Result<Any?>) {
// Do nothing - we're just capturing, the coroutine suspends immediately
}
throw invocation.captureInvocation(serviceName, key)
}

val suspendBlock: suspend () -> Any? = { proxy.block() }
suspendBlock.startCoroutine(capturingContinuation)
try {
proxy.block()
} catch (e: CapturedInvocation) {
return e
}

return capturedInvocation
?: error(
"Method invocation was not captured. Make sure to call ONLY a method of the service proxy."
)
error(
"Method invocation was not captured. Make sure to call ONLY a method of the service proxy."
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ data class CapturedInvocation(
val inputTypeTag: TypeTag<*>,
val outputTypeTag: TypeTag<*>,
val input: Any?,
) {
) : RuntimeException("CapturedInvocation message should not be used", null, false, false) {
@Suppress("UNCHECKED_CAST")
fun toRequest(): Request<*, *> {
return Request.of(target, inputTypeTag as TypeTag<Any?>, outputTypeTag as TypeTag<Any?>, input)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1293,7 +1293,7 @@ internal constructor(
* @return a [KRequest] with the correct response type
*/
@Suppress("UNCHECKED_CAST")
fun <Res> request(block: suspend SVC.() -> Res): KRequest<Any?, Res> {
suspend fun <Res> request(block: suspend SVC.() -> Res): KRequest<Any?, Res> {
return KRequestImpl(
RequestCaptureProxy(clazz, key).capture(block as suspend SVC.() -> Any?).toRequest()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,24 @@ class ReflectionTest : TestDefinitions.TestSuite {
outputCmd(),
END_MESSAGE,
),
testInvocation({ CornerCases() }, "callSuspendWithinProxy")
.withInput(startMessage(1, "mykey"), inputCmd())
.onlyBidiStream()
.expectingOutput(
oneWayCallCmd(
1,
Target.virtualObject(
"CornerCases",
"mykey",
"callSuspendWithinProxy",
),
null,
null,
Slice.EMPTY,
),
outputCmd(),
END_MESSAGE,
),
testInvocation({ CustomSerdeService() }, "echo")
.withInput(startMessage(1), inputCmd(byteArrayOf(1)))
.onlyBidiStream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import dev.restate.serde.SerdeFactory
import dev.restate.serde.TypeRef
import dev.restate.serde.TypeTag
import dev.restate.serde.kotlinx.KotlinSerializationSerdeFactory
import kotlinx.coroutines.delay
import kotlinx.serialization.Serializable

@Service
Expand Down Expand Up @@ -112,6 +113,17 @@ open class CornerCases {
open suspend fun badReturnTypeInferred(): Unit {
toVirtualObject<CornerCases>(objectKey()).request { badReturnTypeInferred() }.send()
}

@Exclusive
open suspend fun callSuspendWithinProxy() {
toVirtualObject<CornerCases>(objectKey())
.request {
// Doing a suspend call within the proxy
delay(1)
callSuspendWithinProxy()
}
.send()
}
}

@Service
Expand Down