Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion examples/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ dependencies {
implementation(project(":sdk-api-kotlin"))
implementation(project(":sdk-serde-jackson"))

implementation(libs.jackson.jsr310)
implementation(libs.jackson.parameter.names)

implementation(libs.kotlinx.coroutines.core)
Expand Down
66 changes: 66 additions & 0 deletions sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ import kotlin.coroutines.Continuation
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.startCoroutine
import kotlin.random.Random
import kotlin.time.Clock
import kotlin.time.Duration
import kotlin.time.ExperimentalTime
import kotlin.time.Instant
import kotlinx.coroutines.currentCoroutineContext

/**
Expand Down Expand Up @@ -211,6 +214,23 @@ sealed interface Context {
* @return the [Random] instance.
*/
fun random(): RestateRandom

/**
* Returns the current time as a deterministic [Instant].
*
* <p>This method returns the current timestamp in a way that is consistent across replays. The
* time is captured using [Context.runBlock], ensuring that the same value is returned during
* replay as was returned during the original execution.
*
* @return the recorded [Instant]
* @see Clock.System.now
*/
@ExperimentalTime
suspend fun instantNow(): Instant {
return runBlock(name = "Clock.System.now()", typeTag = typeTag<Instant>()) {
Clock.System.now()
}
}
}

/**
Expand Down Expand Up @@ -797,6 +817,52 @@ suspend fun random(): RestateRandom {
return context().random()
}

/**
* Get [RestateClock], that deterministically records the time.
*
* @see RestateClock.now
*/
@ExperimentalTime
@org.jetbrains.annotations.ApiStatus.Experimental
fun clock(): RestateClock {
return RestateClockImpl
}

@ExperimentalTime
@org.jetbrains.annotations.ApiStatus.Experimental
interface RestateClock {
/**
* Returns the current time as a deterministic [Instant].
*
* <p>This method returns the current timestamp in a way that is consistent across replays. The
* time is captured using [runBlock], ensuring that the same value is returned during replay as
* was returned during the original execution.
*
* @return the recorded [Instant]
* @throws IllegalStateException if called outside a Restate handler
* @see Clock.System.now
*/
suspend fun now(): Instant
}

@ExperimentalTime
@org.jetbrains.annotations.ApiStatus.Experimental
private object RestateClockImpl : RestateClock {
override suspend fun now(): Instant {
return context().instantNow()
}
}

/**
* Get [RestateClock], that deterministically records the time.
*
* @see RestateClock.now
*/
@ExperimentalTime
@get:org.jetbrains.annotations.ApiStatus.Experimental
val Clock.Companion.Restate: RestateClock
get() = clock()

/**
* Causes the current execution of the function invocation to sleep for the given duration.
*
Expand Down
15 changes: 15 additions & 0 deletions sdk-api/src/main/java/dev/restate/sdk/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import dev.restate.serde.Serde;
import dev.restate.serde.TypeTag;
import java.time.Duration;
import java.time.Instant;

/**
* This interface exposes the Restate functionalities to Restate services. It can be used to
Expand Down Expand Up @@ -484,6 +485,20 @@ default <T> Awakeable<T> awakeable(Class<T> clazz) {
*/
RestateRandom random();

/**
* Returns the current time as a deterministic {@link Instant}.
*
* <p>This method returns the current timestamp in a way that is consistent across replays. The
* time is captured using {@link Context#run}, ensuring that the same value is returned during
* replay as was returned during the original execution.
*
* @return the recorded {@link Instant}
* @see Instant#now()
*/
default Instant instantNow() {
return run("Instant.now()", Instant.class, Instant::now);
}

/**
* @return the current context
* @throws NullPointerException if called outside a Restate Handler
Expand Down
16 changes: 16 additions & 0 deletions sdk-api/src/main/java/dev/restate/sdk/Restate.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import dev.restate.serde.Serde;
import dev.restate.serde.TypeTag;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Optional;
import org.jspecify.annotations.NonNull;
Expand Down Expand Up @@ -98,6 +99,21 @@ public static RestateRandom random() {
return Context.current().random();
}

/**
* Returns the current time as a deterministic {@link Instant}.
*
* <p>This method returns the current timestamp in a way that is consistent across replays. The
* time is captured using {@link Restate#run}, ensuring that the same value is returned during
* replay as was returned during the original execution.
*
* @return the recorded {@link Instant}
* @see Instant#now()
*/
@org.jetbrains.annotations.ApiStatus.Experimental
public static Instant instantNow() {
return Context.current().instantNow();
}

/**
* @see Context#invocationHandle(String, TypeTag)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import static dev.restate.sdk.core.statemachine.ProtoUtils.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.InstanceOfAssertFactories.STRING;
import static org.assertj.core.api.InstanceOfAssertFactories.type;

import com.google.protobuf.ByteString;
import dev.restate.sdk.common.RetryPolicy;
import dev.restate.sdk.common.TerminalException;
import dev.restate.sdk.core.generated.protocol.Protocol;
Expand Down Expand Up @@ -43,6 +45,10 @@ protected abstract TestInvocationBuilder awaitAllSideEffectWithSecondFailing(
protected abstract TestInvocationBuilder failingSideEffectWithRetryPolicy(
String reason, RetryPolicy retryPolicy);

protected abstract TestInvocationBuilder instantNow();

protected abstract void assertIsInstant(ByteString bytes);

@Override
public Stream<TestDefinitions.TestDefinition> definitions() {
return Stream.of(
Expand Down Expand Up @@ -301,6 +307,26 @@ public Stream<TestDefinitions.TestDefinition> definitions() {
.onlyBidiStream()
.assertingOutput(
actualOutputMessages ->
assertThat(actualOutputMessages).element(2).isEqualTo(suspensionMessage(1))));
assertThat(actualOutputMessages).element(2).isEqualTo(suspensionMessage(1))),
this.instantNow()
.withInput(startMessage(1), inputCmd())
.onlyBidiStream()
.assertingOutput(
msgs ->
assertThat(msgs)
.satisfiesExactly(
msg ->
assertThat(msg)
.asInstanceOf(type(Protocol.RunCommandMessage.class))
.returns(1, Protocol.RunCommandMessage::getResultCompletionId),
msg ->
assertThat(msg)
.asInstanceOf(type(Protocol.ProposeRunCompletionMessage.class))
.returns(
1,
Protocol.ProposeRunCompletionMessage::getResultCompletionId)
.extracting(Protocol.ProposeRunCompletionMessage::getValue)
.satisfies(this::assertIsInstant),
msg -> assertThat(msg).isEqualTo(suspensionMessage(1)))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,19 @@
package dev.restate.sdk.core.javaapi;

import static dev.restate.sdk.core.javaapi.JavaAPITests.testDefinitionForService;
import static org.assertj.core.api.Assertions.assertThat;

import com.google.protobuf.ByteString;
import dev.restate.common.Slice;
import dev.restate.sdk.DurableFuture;
import dev.restate.sdk.Restate;
import dev.restate.sdk.common.RetryPolicy;
import dev.restate.sdk.core.SideEffectTestSuite;
import dev.restate.sdk.core.TestDefinitions.TestInvocationBuilder;
import dev.restate.sdk.core.TestSerdes;
import dev.restate.serde.Serde;
import dev.restate.serde.jackson.JacksonSerdeFactory;
import java.time.Instant;
import java.util.List;
import java.util.Objects;

Expand Down Expand Up @@ -159,4 +165,25 @@ protected TestInvocationBuilder failingSideEffectWithRetryPolicy(
return null;
});
}

@Override
protected TestInvocationBuilder instantNow() {
return testDefinitionForService(
"InstantNow",
Serde.VOID,
TestSerdes.STRING,
(ctx, unused) -> {
var instant = Restate.instantNow();
return null;
});
}

@Override
protected void assertIsInstant(ByteString bytes) {
Instant instant =
JacksonSerdeFactory.DEFAULT
.create(Instant.class)
.deserialize(Slice.wrap(bytes.asReadOnlyByteBuffer()));
assertThat(instant).isNotNull().isBefore(Instant.now());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.core.kotlinapi

import com.google.protobuf.ByteString
import dev.restate.common.Slice
import dev.restate.sdk.Restate
import dev.restate.sdk.common.RetryPolicy
import dev.restate.sdk.core.SideEffectTestSuite
import dev.restate.sdk.core.TestDefinitions
Expand All @@ -18,15 +21,19 @@ import dev.restate.sdk.endpoint.definition.HandlerType
import dev.restate.sdk.endpoint.definition.ServiceDefinition
import dev.restate.sdk.endpoint.definition.ServiceType
import dev.restate.sdk.kotlin.*
import dev.restate.sdk.kotlin.awaitAll
import dev.restate.sdk.kotlin.runAsync
import dev.restate.sdk.kotlin.runBlock
import dev.restate.serde.kotlinx.*
import dev.restate.serde.kotlinx.KotlinSerializationSerdeFactory
import dev.restate.serde.kotlinx.jsonSerde
import dev.restate.serde.kotlinx.typeTag
import java.util.*
import kotlin.coroutines.coroutineContext
import kotlin.time.Clock
import kotlin.time.ExperimentalTime
import kotlin.time.Instant
import kotlin.time.toJavaInstant
import kotlin.time.toKotlinDuration
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import org.assertj.core.api.Assertions

class SideEffectTest : SideEffectTestSuite() {

Expand Down Expand Up @@ -128,4 +135,17 @@ class SideEffectTest : SideEffectTestSuite() {
throw IllegalStateException(reason)
}
}

@OptIn(ExperimentalTime::class)
override fun instantNow() =
testDefinitionForService<Unit, Instant>("InstantNow") { ctx, _: Unit -> Clock.Restate.now() }

@OptIn(ExperimentalTime::class)
override fun assertIsInstant(bytes: ByteString) {
val instant =
KotlinSerializationSerdeFactory()
.create(typeTag<Instant>())
.deserialize(Slice.wrap(bytes.asReadOnlyByteBuffer()))
Assertions.assertThat(instant.toJavaInstant()).isNotNull().isBefore(java.time.Instant.now())
}
}
1 change: 1 addition & 0 deletions sdk-serde-jackson/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies {
implementation(project(":common"))

api(libs.jackson.databind)
implementation(libs.jackson.jsr310)
implementation(libs.jackson.core)

implementation(libs.victools.jsonschema.generator)
Expand Down