Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
95e3c51
Initial version
donald-pinckney Jan 14, 2026
569286a
remove discovery, update notes
donald-pinckney Jan 14, 2026
9d4ba4a
remove notes
donald-pinckney Jan 14, 2026
4688e95
Add initializeWorker in plugin interface.
donald-pinckney Jan 14, 2026
9007d0f
Re-do shutdownWorkerFactory design with chain
donald-pinckney Jan 14, 2026
6a1ae39
rename runWorkerFactory -> startWorkerFactory
donald-pinckney Jan 14, 2026
057dc14
Move plugins around
donald-pinckney Jan 14, 2026
fb2f2e6
add startWorker and shutdownWorker
donald-pinckney Jan 14, 2026
55482cb
rename again
donald-pinckney Jan 15, 2026
195c241
Improved SimplePlugin builder design
donald-pinckney Jan 15, 2026
26a2c0a
Remove default implementations, and add in startWorkerFactory and shu…
donald-pinckney Jan 15, 2026
9a666e0
Don't return the builders
donald-pinckney Jan 15, 2026
218c79b
Remove ServiceStubsSupplier and checked exception
donald-pinckney Jan 15, 2026
3550c55
Cleanup applyClientPluginConfiguration
donald-pinckney Jan 15, 2026
6aefa1a
array instead of list
donald-pinckney Jan 15, 2026
861712e
Require ClientPlugin in WorkflowClientOptions
donald-pinckney Jan 15, 2026
a5a1ceb
Seaparate out WorkflowClientPlugin and WorkflowServiceStubsPlugin
donald-pinckney Jan 16, 2026
0eb2ec8
Lift up to a generic ServiceStubsPlugin
donald-pinckney Jan 16, 2026
b4a09c5
Add ScheduleClientPlugin, and rename WorkflowClientPlugin.configureCl…
donald-pinckney Jan 16, 2026
b9264c6
Remove ServiceStubsPlugin super interface
donald-pinckney Jan 16, 2026
cf149a0
Document order of validation and plugin application
donald-pinckney Jan 16, 2026
8b4b73f
abstract SimplePlugin
donald-pinckney Jan 16, 2026
ac094ec
Add data converters, activities, workflows, Nexus services
donald-pinckney Jan 16, 2026
9760c33
bug fix
donald-pinckney Jan 16, 2026
44227e8
Add duplicate warnings
donald-pinckney Jan 16, 2026
f1d3642
cleanup tests some
donald-pinckney Jan 16, 2026
76d0482
first stab at replayer
donald-pinckney Jan 19, 2026
e8ed0c1
warn on duplicant *instances* not *types*
donald-pinckney Jan 20, 2026
450a282
remove configure/initialize for replay
donald-pinckney Jan 21, 2026
82f0934
revert WorkflowReplayer.java
donald-pinckney Jan 21, 2026
4bd410a
no default
donald-pinckney Jan 21, 2026
c036ec1
Run plugin configures *before* setting the plugins and the defaults.
donald-pinckney Jan 21, 2026
fe261ba
Remove checked exceptions and simplify handling of errors
donald-pinckney Jan 21, 2026
27749dd
Change types of the chains
donald-pinckney Jan 21, 2026
7480385
Added overloade with public WorkflowExecutionHistory
donald-pinckney Jan 21, 2026
19eab2f
simplify error handling in the case of shutdown, and document it.
donald-pinckney Jan 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.temporal.internal.sync.StubMarker;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.serviceclient.WorkflowServiceStubsPlugin;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.*;
import java.lang.annotation.Annotation;
Expand All @@ -36,9 +37,13 @@
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class WorkflowClientInternalImpl implements WorkflowClient, WorkflowClientInternal {

private static final Logger log = LoggerFactory.getLogger(WorkflowClientInternalImpl.class);

private final GenericWorkflowClient genericClient;
private final WorkflowClientOptions options;
private final ManualActivityCompletionClientFactory manualActivityCompletionClientFactory;
Expand All @@ -65,7 +70,22 @@ public static WorkflowClient newInstance(

WorkflowClientInternalImpl(
WorkflowServiceStubs workflowServiceStubs, WorkflowClientOptions options) {
options = WorkflowClientOptions.newBuilder(options).validateAndBuildWithDefaults();
// Extract WorkflowClientPlugins from service stubs plugins (propagation)
WorkflowClientPlugin[] propagatedPlugins =
extractClientPlugins(workflowServiceStubs.getOptions().getPlugins());

// Merge propagated plugins with client-specified plugins
WorkflowClientPlugin[] mergedPlugins = mergePlugins(propagatedPlugins, options.getPlugins());

// Apply plugin configuration phase (forward order) on user-provided options,
// so plugins see unmodified state before defaults and plugin merging
WorkflowClientOptions.Builder builder = WorkflowClientOptions.newBuilder(options);
for (WorkflowClientPlugin plugin : mergedPlugins) {
plugin.configureWorkflowClient(builder);
}
// Set merged plugins after configuration, then validate
builder.setPlugins(mergedPlugins);
options = builder.validateAndBuildWithDefaults();
workflowServiceStubs =
new NamespaceInjectWorkflowServiceStubs(workflowServiceStubs, options.getNamespace());
this.options = options;
Expand Down Expand Up @@ -771,4 +791,56 @@ public NexusStartWorkflowResponse startNexus(
WorkflowInvocationHandler.closeAsyncInvocation();
}
}

/**
* Extracts WorkflowClientPlugins from service stubs plugins. Only plugins that also implement
* {@link WorkflowClientPlugin} are included. This enables plugin propagation from service stubs
* to workflow client.
*/
private static WorkflowClientPlugin[] extractClientPlugins(
WorkflowServiceStubsPlugin[] stubsPlugins) {
if (stubsPlugins == null || stubsPlugins.length == 0) {
return new WorkflowClientPlugin[0];
}
List<WorkflowClientPlugin> clientPlugins = new ArrayList<>();
for (WorkflowServiceStubsPlugin plugin : stubsPlugins) {
if (plugin instanceof WorkflowClientPlugin) {
clientPlugins.add((WorkflowClientPlugin) plugin);
}
}
return clientPlugins.toArray(new WorkflowClientPlugin[0]);
}

/**
* Merges propagated plugins with explicitly specified plugins. Propagated plugins come first
* (from service stubs), followed by client-specific plugins.
*/
private static WorkflowClientPlugin[] mergePlugins(
WorkflowClientPlugin[] propagated, WorkflowClientPlugin[] explicit) {
boolean propagatedEmpty = propagated == null || propagated.length == 0;
boolean explicitEmpty = explicit == null || explicit.length == 0;
if (propagatedEmpty && explicitEmpty) {
return new WorkflowClientPlugin[0];
}
if (propagatedEmpty) {
return explicit;
}
if (explicitEmpty) {
return propagated;
}
// Warn about duplicate plugin instances (same object in both lists)
Set<WorkflowClientPlugin> propagatedSet = new HashSet<>(Arrays.asList(propagated));
for (WorkflowClientPlugin p : explicit) {
if (propagatedSet.contains(p)) {
log.warn(
"Plugin instance {} is present in both propagated plugins (from service stubs) and "
+ "explicit plugins. It will run twice which may not be the intended behavior.",
p.getName());
}
}
WorkflowClientPlugin[] merged = new WorkflowClientPlugin[propagated.length + explicit.length];
System.arraycopy(propagated, 0, merged, 0, propagated.length);
System.arraycopy(explicit, 0, merged, propagated.length, explicit.length);
return merged;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.temporal.client;

import io.temporal.api.enums.v1.QueryRejectCondition;
import io.temporal.common.Experimental;
import io.temporal.common.context.ContextPropagator;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.GlobalDataConverter;
Expand Down Expand Up @@ -47,6 +48,7 @@ public static final class Builder {
private String binaryChecksum;
private List<ContextPropagator> contextPropagators;
private QueryRejectCondition queryRejectCondition;
private WorkflowClientPlugin[] plugins;

private Builder() {}

Expand All @@ -61,6 +63,7 @@ private Builder(WorkflowClientOptions options) {
binaryChecksum = options.binaryChecksum;
contextPropagators = options.contextPropagators;
queryRejectCondition = options.queryRejectCondition;
plugins = options.plugins;
}

public Builder setNamespace(String namespace) {
Expand Down Expand Up @@ -132,6 +135,24 @@ public Builder setQueryRejectCondition(QueryRejectCondition queryRejectCondition
return this;
}

/**
* Sets the workflow client plugins to use with this client. Plugins can modify client
* configuration.
*
* <p>Plugins that also implement {@link io.temporal.worker.WorkerPlugin} are automatically
* propagated to workers created from this client.
*
* @param plugins the workflow client plugins to use
* @return this builder for chaining
* @see WorkflowClientPlugin
* @see io.temporal.worker.WorkerPlugin
*/
@Experimental
public Builder setPlugins(WorkflowClientPlugin... plugins) {
this.plugins = Objects.requireNonNull(plugins);
return this;
}

public WorkflowClientOptions build() {
return new WorkflowClientOptions(
namespace,
Expand All @@ -140,9 +161,21 @@ public WorkflowClientOptions build() {
identity,
binaryChecksum,
contextPropagators,
queryRejectCondition);
queryRejectCondition,
plugins == null ? EMPTY_PLUGINS : plugins);
}

/**
* Validates options and builds with defaults applied.
*
* <p>Note: If plugins are configured via {@link #setPlugins(WorkflowClientPlugin...)}, they
* will have an opportunity to modify options after this method is called, when the options are
* passed to {@link WorkflowClient#newInstance}. This means validation performed here occurs
* before plugin modifications. In most cases, users should simply call {@link #build()} and let
* the client creation handle validation.
*
* @return validated options with defaults applied
*/
public WorkflowClientOptions validateAndBuildWithDefaults() {
String name = identity == null ? ManagementFactory.getRuntimeMXBean().getName() : identity;
return new WorkflowClientOptions(
Expand All @@ -154,7 +187,8 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {
contextPropagators == null ? EMPTY_CONTEXT_PROPAGATORS : contextPropagators,
queryRejectCondition == null
? QueryRejectCondition.QUERY_REJECT_CONDITION_UNSPECIFIED
: queryRejectCondition);
: queryRejectCondition,
plugins == null ? EMPTY_PLUGINS : plugins);
}
}

Expand All @@ -163,6 +197,8 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {

private static final List<ContextPropagator> EMPTY_CONTEXT_PROPAGATORS = Collections.emptyList();

private static final WorkflowClientPlugin[] EMPTY_PLUGINS = new WorkflowClientPlugin[0];

private final String namespace;

private final DataConverter dataConverter;
Expand All @@ -177,21 +213,25 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {

private final QueryRejectCondition queryRejectCondition;

private final WorkflowClientPlugin[] plugins;

private WorkflowClientOptions(
String namespace,
DataConverter dataConverter,
WorkflowClientInterceptor[] interceptors,
String identity,
String binaryChecksum,
List<ContextPropagator> contextPropagators,
QueryRejectCondition queryRejectCondition) {
QueryRejectCondition queryRejectCondition,
WorkflowClientPlugin[] plugins) {
this.namespace = namespace;
this.dataConverter = dataConverter;
this.interceptors = interceptors;
this.identity = identity;
this.binaryChecksum = binaryChecksum;
this.contextPropagators = contextPropagators;
this.queryRejectCondition = queryRejectCondition;
this.plugins = plugins;
}

/**
Expand Down Expand Up @@ -236,6 +276,19 @@ public QueryRejectCondition getQueryRejectCondition() {
return queryRejectCondition;
}

/**
* Returns the workflow client plugins configured for this client.
*
* <p>Plugins that also implement {@link io.temporal.worker.WorkerPlugin} are automatically
* propagated to workers created from this client.
*
* @return the array of workflow client plugins, never null
*/
@Experimental
public WorkflowClientPlugin[] getPlugins() {
return plugins;
}

@Override
public String toString() {
return "WorkflowClientOptions{"
Expand All @@ -256,6 +309,8 @@ public String toString() {
+ contextPropagators
+ ", queryRejectCondition="
+ queryRejectCondition
+ ", plugins="
+ Arrays.toString(plugins)
+ '}';
}

Expand All @@ -270,7 +325,8 @@ public boolean equals(Object o) {
&& com.google.common.base.Objects.equal(identity, that.identity)
&& com.google.common.base.Objects.equal(binaryChecksum, that.binaryChecksum)
&& com.google.common.base.Objects.equal(contextPropagators, that.contextPropagators)
&& queryRejectCondition == that.queryRejectCondition;
&& queryRejectCondition == that.queryRejectCondition
&& Arrays.equals(plugins, that.plugins);
}

@Override
Expand All @@ -282,6 +338,7 @@ public int hashCode() {
identity,
binaryChecksum,
contextPropagators,
queryRejectCondition);
queryRejectCondition,
Arrays.hashCode(plugins));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.client;

import io.temporal.common.Experimental;
import io.temporal.common.SimplePlugin;
import javax.annotation.Nonnull;

/**
* Plugin interface for customizing Temporal workflow client configuration.
*
* <p>This interface is separate from {@link
* io.temporal.serviceclient.WorkflowServiceStubs.ServiceStubsPlugin} to allow plugins that only
* need to configure the workflow client without affecting the underlying gRPC connection.
*
* <p>Plugins that implement both {@code ServiceStubsPlugin} and {@code WorkflowClientPlugin} will
* have their service stubs configuration applied when creating the service stubs, and their client
* configuration applied when creating the workflow client.
*
* <p>Plugins that also implement {@link io.temporal.worker.WorkerPlugin} are automatically
* propagated from the client to workers created from that client.
*
* <p>Example implementation:
*
* <pre>{@code
* public class LoggingPlugin extends SimplePlugin {
* public LoggingPlugin() {
* super("my-org.logging");
* }
*
* @Override
* public void configureClient(WorkflowClientOptions.Builder builder) {
* // Add custom interceptor
* builder.setInterceptors(new LoggingInterceptor());
* }
* }
* }</pre>
*
* @see io.temporal.serviceclient.WorkflowServiceStubs.ServiceStubsPlugin
* @see io.temporal.worker.WorkerPlugin
* @see SimplePlugin
*/
@Experimental
public interface WorkflowClientPlugin {

/**
* Returns a unique name for this plugin. Used for logging and duplicate detection. Recommended
* format: "organization.plugin-name" (e.g., "io.temporal.tracing")
*
* @return fully qualified plugin name
*/
@Nonnull
String getName();

/**
* Allows the plugin to modify workflow client options before the client is created. Called during
* configuration phase in forward (registration) order.
*
* @param builder the options builder to modify
*/
void configureWorkflowClient(@Nonnull WorkflowClientOptions.Builder builder);
}
Loading
Loading