From 22c7729e0eece7f382698420f9fd766d3985a69c Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Sat, 24 Jan 2026 12:15:39 -0800 Subject: [PATCH 01/13] feat(workflows): Make telemetry opt-in via WithOpenTelemetry() - Add WorkflowTelemetryOptions class with EnableSensitiveData property - Add WorkflowTelemetryContext to manage ActivitySource lifecycle - Add WithOpenTelemetry() extension method on WorkflowBuilder - Update all workflow components to use telemetry context: - WorkflowBuilder, Workflow, Executor - InProcessRunnerContext, InProcessRunner - LockstepRunEventStream, StreamingRunEventStream - All edge runners (Direct, FanIn, FanOut, Response) - Telemetry is now disabled by default - Users must call WithOpenTelemetry() to enable spans/activities BREAKING CHANGE: Workflow telemetry is now opt-in. Users who relied on automatic telemetry must add .WithOpenTelemetry() to their workflow builder. --- .../Execution/DirectEdgeRunner.cs | 2 +- .../Execution/EdgeRunner.cs | 6 +- .../Execution/FanInEdgeRunner.cs | 2 +- .../Execution/FanOutEdgeRunner.cs | 2 +- .../Execution/IRunnerContext.cs | 3 + .../Execution/ISuperStepRunner.cs | 3 + .../Execution/LockstepRunEventStream.cs | 5 +- .../Execution/ResponseEdgeRunner.cs | 2 +- .../Execution/StreamingRunEventStream.cs | 5 +- .../Microsoft.Agents.AI.Workflows/Executor.cs | 6 +- .../IWorkflowContextWithTelemetry.cs | 16 +++++ .../InProc/InProcessRunner.cs | 4 ++ .../InProc/InProcessRunnerContext.cs | 14 ++-- .../Observability/ActivityNames.cs | 2 +- .../Observability/WorkflowTelemetryContext.cs | 61 ++++++++++++++++ .../Observability/WorkflowTelemetryOptions.cs | 23 ++++++ .../OpenTelemetryWorkflowBuilderExtensions.cs | 70 +++++++++++++++++++ .../Microsoft.Agents.AI.Workflows/Workflow.cs | 10 ++- .../WorkflowBuilder.cs | 17 +++-- .../ObservabilityTests.cs | 26 ++++++- .../TestRunContext.cs | 3 + 21 files changed, 248 insertions(+), 34 deletions(-) create mode 100644 dotnet/src/Microsoft.Agents.AI.Workflows/IWorkflowContextWithTelemetry.cs create mode 100644 dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs create mode 100644 dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryOptions.cs create mode 100644 dotnet/src/Microsoft.Agents.AI.Workflows/OpenTelemetryWorkflowBuilderExtensions.cs diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/DirectEdgeRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/DirectEdgeRunner.cs index ee303c500b..ac082ad0e6 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/DirectEdgeRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/DirectEdgeRunner.cs @@ -14,7 +14,7 @@ private async ValueTask FindRouterAsync(IStepTracer? tracer) => await protected internal override async ValueTask ChaseEdgeAsync(MessageEnvelope envelope, IStepTracer? stepTracer) { - using var activity = s_activitySource.StartActivity(ActivityNames.EdgeGroupProcess); + using var activity = this.StartActivity(ActivityNames.EdgeGroupProcess); activity? .SetTag(Tags.EdgeGroupType, nameof(DirectEdgeRunner)) .SetTag(Tags.MessageSourceId, this.EdgeData.SourceId) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs index d71fa539b3..e04fc7465d 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs @@ -2,6 +2,7 @@ using System.Diagnostics; using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows.Observability; using Microsoft.Shared.Diagnostics; namespace Microsoft.Agents.AI.Workflows.Execution; @@ -14,9 +15,6 @@ internal interface IStatefulEdgeRunner internal abstract class EdgeRunner { - protected static readonly string s_namespace = typeof(EdgeRunner).Namespace!; - protected static readonly ActivitySource s_activitySource = new(s_namespace); - // TODO: Can this be sync? protected internal abstract ValueTask ChaseEdgeAsync(MessageEnvelope envelope, IStepTracer? stepTracer); } @@ -26,4 +24,6 @@ internal abstract class EdgeRunner( { protected IRunnerContext RunContext { get; } = Throw.IfNull(runContext); protected TEdgeData EdgeData { get; } = Throw.IfNull(edgeData); + + protected Activity? StartActivity(string name) => this.RunContext.TelemetryContext.StartActivity(name); } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeRunner.cs index 02c0252af3..09ea6b5960 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeRunner.cs @@ -19,7 +19,7 @@ internal sealed class FanInEdgeRunner(IRunnerContext runContext, FanInEdgeData e { Debug.Assert(!envelope.IsExternal, "FanIn edges should never be chased from external input"); - using var activity = s_activitySource.StartActivity(ActivityNames.EdgeGroupProcess); + using var activity = this.StartActivity(ActivityNames.EdgeGroupProcess); activity? .SetTag(Tags.EdgeGroupType, nameof(FanInEdgeRunner)) .SetTag(Tags.MessageTargetId, this.EdgeData.SinkId); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanOutEdgeRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanOutEdgeRunner.cs index aa6133955d..c8fe40bbe2 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanOutEdgeRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanOutEdgeRunner.cs @@ -13,7 +13,7 @@ internal sealed class FanOutEdgeRunner(IRunnerContext runContext, FanOutEdgeData { protected internal override async ValueTask ChaseEdgeAsync(MessageEnvelope envelope, IStepTracer? stepTracer) { - using var activity = s_activitySource.StartActivity(ActivityNames.EdgeGroupProcess); + using var activity = this.StartActivity(ActivityNames.EdgeGroupProcess); activity? .SetTag(Tags.EdgeGroupType, nameof(FanOutEdgeRunner)) .SetTag(Tags.MessageSourceId, this.EdgeData.SourceId); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/IRunnerContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/IRunnerContext.cs index f3fc762336..addfbccf7a 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/IRunnerContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/IRunnerContext.cs @@ -3,11 +3,14 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows.Observability; namespace Microsoft.Agents.AI.Workflows.Execution; internal interface IRunnerContext : IExternalRequestSink, ISuperStepJoinContext { + WorkflowTelemetryContext TelemetryContext { get; } + ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default); ValueTask SendMessageAsync(string sourceId, object message, string? targetId = null, CancellationToken cancellationToken = default); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ISuperStepRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ISuperStepRunner.cs index a7923a7d9b..fce4d9636a 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ISuperStepRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ISuperStepRunner.cs @@ -3,6 +3,7 @@ using System; using System.Threading; using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows.Observability; namespace Microsoft.Agents.AI.Workflows.Execution; @@ -12,6 +13,8 @@ internal interface ISuperStepRunner string StartExecutorId { get; } + WorkflowTelemetryContext TelemetryContext { get; } + bool HasUnservicedRequests { get; } bool HasUnprocessedMessages { get; } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs index b47a692113..248ae5175c 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs @@ -13,9 +13,6 @@ namespace Microsoft.Agents.AI.Workflows.Execution; internal sealed class LockstepRunEventStream : IRunEventStream { - private static readonly string s_namespace = typeof(LockstepRunEventStream).Namespace!; - private static readonly ActivitySource s_activitySource = new(s_namespace); - private readonly CancellationTokenSource _stopCancellation = new(); private readonly InputWaiter _inputWaiter = new(); private int _isDisposed; @@ -53,7 +50,7 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe this._stepRunner.OutgoingEvents.EventRaised += OnWorkflowEventAsync; - using Activity? activity = s_activitySource.StartActivity(ActivityNames.WorkflowRun); + using Activity? activity = this._stepRunner.TelemetryContext.StartActivity(ActivityNames.WorkflowRun); activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.RunId, this._stepRunner.RunId); try diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ResponseEdgeRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ResponseEdgeRunner.cs index 55e85b8b14..c5058cd180 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ResponseEdgeRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ResponseEdgeRunner.cs @@ -23,7 +23,7 @@ public static ResponseEdgeRunner ForPort(IRunnerContext runContext, RequestPort { Debug.Assert(envelope.IsExternal, "Input edges should only be chased from external input"); - using var activity = s_activitySource.StartActivity(ActivityNames.EdgeGroupProcess); + using var activity = this.StartActivity(ActivityNames.EdgeGroupProcess); activity? .SetTag(Tags.EdgeGroupType, nameof(ResponseEdgeRunner)) .SetTag(Tags.MessageSourceId, envelope.SourceId) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs index ca0cc52641..49193ff34a 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs @@ -17,9 +17,6 @@ namespace Microsoft.Agents.AI.Workflows.Execution; /// internal sealed class StreamingRunEventStream : IRunEventStream { - private static readonly string s_namespace = typeof(StreamingRunEventStream).Namespace!; - private static readonly ActivitySource s_activitySource = new(s_namespace); - private readonly Channel _eventChannel; private readonly ISuperStepRunner _stepRunner; private readonly InputWaiter _inputWaiter; @@ -63,7 +60,7 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) // Subscribe to events - they will flow directly to the channel as they're raised this._stepRunner.OutgoingEvents.EventRaised += OnEventRaisedAsync; - using Activity? activity = s_activitySource.StartActivity(ActivityNames.WorkflowRun); + using Activity? activity = this._stepRunner.TelemetryContext.StartActivity(ActivityNames.WorkflowRun); activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.RunId, this._stepRunner.RunId); try diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs index 647dbcd852..59034e8e7e 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs @@ -24,9 +24,6 @@ public abstract class Executor : IIdentified /// public string Id { get; } - private static readonly string s_namespace = typeof(Executor).Namespace!; - private static readonly ActivitySource s_activitySource = new(s_namespace); - // TODO: Add overloads for binding with a configuration/options object once the Configured hierarchy goes away. /// @@ -121,7 +118,8 @@ internal MessageRouter Router /// An exception is generated while handling the message. public async ValueTask ExecuteAsync(object message, TypeId messageType, IWorkflowContext context, CancellationToken cancellationToken = default) { - using var activity = s_activitySource.StartActivity(ActivityNames.ExecutorProcess, ActivityKind.Internal); + WorkflowTelemetryContext telemetryContext = (context as IWorkflowContextWithTelemetry)?.TelemetryContext ?? WorkflowTelemetryContext.Disabled; + using var activity = telemetryContext.StartActivity(ActivityNames.ExecutorProcess + " " + this.Id); activity?.SetTag(Tags.ExecutorId, this.Id) .SetTag(Tags.ExecutorType, this.GetType().FullName) .SetTag(Tags.MessageType, messageType.TypeName) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/IWorkflowContextWithTelemetry.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/IWorkflowContextWithTelemetry.cs new file mode 100644 index 0000000000..3e79706a61 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/IWorkflowContextWithTelemetry.cs @@ -0,0 +1,16 @@ +// Copyright (c) Microsoft. All rights reserved. + +using Microsoft.Agents.AI.Workflows.Observability; + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Internal interface that extends IWorkflowContext to provide access to telemetry context. +/// +internal interface IWorkflowContextWithTelemetry : IWorkflowContext +{ + /// + /// Gets the telemetry context for the workflow. + /// + WorkflowTelemetryContext TelemetryContext { get; } +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs index 8c7149b0be..be77c23605 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs @@ -9,6 +9,7 @@ using System.Threading.Tasks; using Microsoft.Agents.AI.Workflows.Checkpointing; using Microsoft.Agents.AI.Workflows.Execution; +using Microsoft.Agents.AI.Workflows.Observability; using Microsoft.Shared.Diagnostics; namespace Microsoft.Agents.AI.Workflows.InProc; @@ -70,6 +71,9 @@ private InProcessRunner(Workflow workflow, ICheckpointManager? checkpointManager /// public string StartExecutorId { get; } + /// + public WorkflowTelemetryContext TelemetryContext => this.Workflow.TelemetryContext; + private readonly HashSet _knownValidInputTypes; public async ValueTask IsValidInputTypeAsync(Type messageType, CancellationToken cancellationToken = default) { diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs index 1750f779f2..7c020abf98 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs @@ -66,6 +66,8 @@ public InProcessRunnerContext( this.OutgoingEvents = outgoingEvents; } + public WorkflowTelemetryContext TelemetryContext => this._workflow.TelemetryContext; + public async ValueTask EnsureExecutorAsync(string executorId, IStepTracer? tracer, CancellationToken cancellationToken = default) { this.CheckEnded(); @@ -178,12 +180,12 @@ public ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken ca return this.OutgoingEvents.EnqueueAsync(workflowEvent); } - private static readonly string s_namespace = typeof(IWorkflowContext).Namespace!; - private static readonly ActivitySource s_activitySource = new(s_namespace); - public async ValueTask SendMessageAsync(string sourceId, object message, string? targetId = null, CancellationToken cancellationToken = default) { - using Activity? activity = s_activitySource.StartActivity(ActivityNames.MessageSend, ActivityKind.Producer); + using Activity? activity = this._workflow.TelemetryContext.StartActivity(ActivityNames.MessageSend, ActivityKind.Producer); + activity?.SetTag(Tags.MessageSourceId, sourceId); + if (targetId is not null) { activity?.SetTag(Tags.MessageTargetId, targetId); } + // Create a carrier for trace context propagation var traceContext = activity is null ? null : new Dictionary(); if (traceContext is not null) @@ -242,8 +244,10 @@ private sealed class BoundContext( InProcessRunnerContext RunnerContext, string ExecutorId, OutputFilter outputFilter, - Dictionary? traceContext) : IWorkflowContext + Dictionary? traceContext) : IWorkflowContextWithTelemetry { + public WorkflowTelemetryContext TelemetryContext => RunnerContext._workflow.TelemetryContext; + public ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default) => RunnerContext.AddEventAsync(workflowEvent, cancellationToken); public ValueTask SendMessageAsync(object message, string? targetId = null, CancellationToken cancellationToken = default) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs index a845915a96..9d2912ecd3 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs @@ -5,7 +5,7 @@ namespace Microsoft.Agents.AI.Workflows.Observability; internal static class ActivityNames { public const string WorkflowBuild = "workflow.build"; - public const string WorkflowRun = "workflow.run"; + public const string WorkflowRun = "workflow_invoke"; public const string MessageSend = "message.send"; public const string ExecutorProcess = "executor.process"; public const string EdgeGroupProcess = "edge_group.process"; diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs new file mode 100644 index 0000000000..0dea7fbe5d --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs @@ -0,0 +1,61 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Diagnostics; + +namespace Microsoft.Agents.AI.Workflows.Observability; + +/// +/// Internal context for workflow telemetry, holding the enabled state and configuration options. +/// +internal sealed class WorkflowTelemetryContext +{ + /// + /// Gets a shared instance representing disabled telemetry. + /// + public static WorkflowTelemetryContext Disabled { get; } = new(); + + /// + /// Gets a value indicating whether telemetry is enabled. + /// + public bool IsEnabled { get; } + + /// + /// Gets the telemetry options. + /// + public WorkflowTelemetryOptions Options { get; } + + /// + /// Gets the activity source used for creating telemetry spans. + /// + public ActivitySource? ActivitySource { get; } + + private WorkflowTelemetryContext() + { + this.IsEnabled = false; + this.Options = new WorkflowTelemetryOptions(); + this.ActivitySource = null; + } + + /// + /// Initializes a new instance of the class with telemetry enabled. + /// + /// The source name for the activity source. + /// The telemetry options. + public WorkflowTelemetryContext(string sourceName, WorkflowTelemetryOptions options) + { + this.IsEnabled = true; + this.Options = options; + this.ActivitySource = new ActivitySource(sourceName); + } + + /// + /// Starts an activity if telemetry is enabled, otherwise returns null. + /// + /// The activity name. + /// The activity kind. + /// An activity if telemetry is enabled and the activity is sampled, otherwise null. + public Activity? StartActivity(string name, ActivityKind kind = ActivityKind.Internal) + { + return this.ActivitySource?.StartActivity(name, kind); + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryOptions.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryOptions.cs new file mode 100644 index 0000000000..843ac03eff --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryOptions.cs @@ -0,0 +1,23 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.Agents.AI.Workflows.Observability; + +/// +/// Configuration options for workflow telemetry. +/// +public sealed class WorkflowTelemetryOptions +{ + /// + /// Gets or sets a value indicating whether potentially sensitive information should be included in telemetry. + /// + /// + /// if potentially sensitive information should be included in telemetry; + /// if telemetry shouldn't include raw inputs and outputs. + /// The default value is . + /// + /// + /// By default, telemetry includes metadata but not raw inputs and outputs, + /// such as message content and executor data. + /// + public bool EnableSensitiveData { get; set; } +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/OpenTelemetryWorkflowBuilderExtensions.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/OpenTelemetryWorkflowBuilderExtensions.cs new file mode 100644 index 0000000000..27103d81a1 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/OpenTelemetryWorkflowBuilderExtensions.cs @@ -0,0 +1,70 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using Microsoft.Agents.AI.Workflows.Observability; +using Microsoft.Shared.Diagnostics; + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Provides extension methods for adding OpenTelemetry instrumentation to instances. +/// +public static class OpenTelemetryWorkflowBuilderExtensions +{ + private const string DefaultSourceName = "Microsoft.Agents.AI.Workflows"; + + /// + /// Enables OpenTelemetry instrumentation for the workflow, providing comprehensive observability for workflow operations. + /// + /// The to which OpenTelemetry support will be added. + /// + /// An optional source name that will be used to identify telemetry data from this workflow. + /// If not specified, a default source name will be used. + /// + /// + /// An optional callback that provides additional configuration of the instance. + /// This allows for fine-tuning telemetry behavior such as enabling sensitive data collection. + /// + /// The with OpenTelemetry instrumentation enabled, enabling method chaining. + /// is . + /// + /// + /// This extension adds comprehensive telemetry capabilities to workflows, including: + /// + /// Distributed tracing of workflow execution + /// Executor invocation and processing spans + /// Edge routing and message delivery spans + /// Workflow build and validation spans + /// Error tracking and exception details + /// + /// + /// + /// By default, workflow telemetry is disabled. Call this method to enable telemetry collection. + /// + /// + /// + /// + /// var workflow = new WorkflowBuilder(startExecutor) + /// .AddEdge(executor1, executor2) + /// .WithOpenTelemetry("MyApp.Workflows", cfg => cfg.EnableSensitiveData = true) + /// .Build(); + /// + /// + public static WorkflowBuilder WithOpenTelemetry( + this WorkflowBuilder builder, + string? sourceName = null, + Action? configure = null) + { + Throw.IfNull(builder); + + WorkflowTelemetryOptions options = new(); + configure?.Invoke(options); + + string effectiveSourceName = string.IsNullOrEmpty(sourceName) ? DefaultSourceName : sourceName!; + WorkflowTelemetryContext context = new(effectiveSourceName, options); + + builder.SetTelemetryContext(context); + + return builder; + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs index 456838b9eb..af6c8d2b63 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs @@ -7,6 +7,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Agents.AI.Workflows.Checkpointing; +using Microsoft.Agents.AI.Workflows.Observability; using Microsoft.Shared.Diagnostics; namespace Microsoft.Agents.AI.Workflows; @@ -66,6 +67,11 @@ public Dictionary ReflectPorts() /// public string? Description { get; internal init; } + /// + /// Gets the telemetry context for the workflow. + /// + internal WorkflowTelemetryContext TelemetryContext { get; } + internal bool AllowConcurrent => this.ExecutorBindings.Values.All(registration => registration.SupportsConcurrentSharedExecution); internal IEnumerable NonConcurrentExecutorIds => @@ -78,11 +84,13 @@ public Dictionary ReflectPorts() /// The unique identifier of the starting executor for the workflow. Cannot be null. /// Optional human-readable name for the workflow. /// Optional description of what the workflow does. - internal Workflow(string startExecutorId, string? name = null, string? description = null) + /// Optional telemetry context for the workflow. + internal Workflow(string startExecutorId, string? name = null, string? description = null, WorkflowTelemetryContext? telemetryContext = null) { this.StartExecutorId = Throw.IfNull(startExecutorId); this.Name = name; this.Description = description; + this.TelemetryContext = telemetryContext ?? WorkflowTelemetryContext.Disabled; } private bool _needsReset; diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs index 4b6980d433..494cf28c24 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs @@ -38,9 +38,7 @@ private readonly record struct EdgeConnection(string SourceId, string TargetId) private readonly string _startExecutorId; private string? _name; private string? _description; - - private static readonly string s_namespace = typeof(WorkflowBuilder).Namespace!; - private static readonly ActivitySource s_activitySource = new(s_namespace); + private WorkflowTelemetryContext _telemetryContext = WorkflowTelemetryContext.Disabled; /// /// Initializes a new instance of the WorkflowBuilder class with the specified starting executor. @@ -137,6 +135,15 @@ public WorkflowBuilder WithDescription(string description) return this; } + /// + /// Sets the telemetry context for the workflow. + /// + /// The telemetry context to use. + internal void SetTelemetryContext(WorkflowTelemetryContext context) + { + this._telemetryContext = Throw.IfNull(context); + } + /// /// Binds the specified executor (via registration) to the workflow, allowing it to participate in workflow execution. /// @@ -563,7 +570,7 @@ private Workflow BuildInternal(bool validateOrphans, Activity? activity = null) activity?.AddEvent(new ActivityEvent(EventNames.BuildValidationCompleted)); - var workflow = new Workflow(this._startExecutorId, this._name, this._description) + var workflow = new Workflow(this._startExecutorId, this._name, this._description, this._telemetryContext) { ExecutorBindings = this._executorBindings, Edges = this._edges, @@ -601,7 +608,7 @@ private Workflow BuildInternal(bool validateOrphans, Activity? activity = null) /// or if the start executor is not bound. public Workflow Build(bool validateOrphans = true) { - using Activity? activity = s_activitySource.StartActivity(ActivityNames.WorkflowBuild); + using Activity? activity = this._telemetryContext.StartActivity(ActivityNames.WorkflowBuild); var workflow = this.BuildInternal(validateOrphans, activity); diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs index 8ab6280b46..bd302999b1 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs @@ -67,7 +67,7 @@ private static Workflow CreateWorkflow() WorkflowBuilder builder = new(uppercase); builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse); - return builder.Build(); + return builder.WithOpenTelemetry().Build(); } private static Dictionary GetExpectedActivityNameCounts() => @@ -122,12 +122,12 @@ private async Task TestWorkflowEndToEndActivitiesAsync(string executionEnvironme { var activityName = kvp.Key; var expectedCount = kvp.Value; - var actualCount = capturedActivities.Count(a => a.OperationName == activityName); + var actualCount = capturedActivities.Count(a => a.OperationName.StartsWith(activityName, StringComparison.Ordinal)); actualCount.Should().Be(expectedCount, $"Activity '{activityName}' should occur {expectedCount} times."); } // Verify WorkflowRun activity events include workflow lifecycle events - var workflowRunActivity = capturedActivities.First(a => a.OperationName == ActivityNames.WorkflowRun); + var workflowRunActivity = capturedActivities.First(a => a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)); var activityEvents = workflowRunActivity.Events.ToList(); activityEvents.Should().Contain(e => e.Name == EventNames.WorkflowStarted, "activity should have workflow started event"); activityEvents.Should().Contain(e => e.Name == EventNames.WorkflowCompleted, "activity should have workflow completed event"); @@ -183,4 +183,24 @@ public async Task CreatesWorkflowActivities_WithCorrectNameAsync() tags.Should().ContainKey(Tags.WorkflowId); tags.Should().ContainKey(Tags.WorkflowDefinition); } + + [Fact] + public async Task TelemetryDisabledByDefault_CreatesNoActivitiesAsync() + { + // Arrange + // Create a test activity to correlate captured activities + using var testActivity = new Activity("ObservabilityTest").Start(); + + // Act - Build workflow WITHOUT calling WithOpenTelemetry() + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + WorkflowBuilder builder = new(uppercase); + var workflow = builder.Build(); // No WithOpenTelemetry() call + await Task.Delay(100); // Allow time for activities to be captured + + // Assert - No activities should be created + var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); + capturedActivities.Should().BeEmpty("No activities should be created when telemetry is disabled (default)."); + } } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestRunContext.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestRunContext.cs index 57375b8341..9c8ef41a94 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestRunContext.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestRunContext.cs @@ -5,6 +5,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Agents.AI.Workflows.Execution; +using Microsoft.Agents.AI.Workflows.Observability; namespace Microsoft.Agents.AI.Workflows.UnitTests; @@ -88,6 +89,8 @@ ValueTask IRunnerContext.AdvanceAsync(CancellationToken cancellatio public bool WithCheckpointing => throw new NotSupportedException(); public bool ConcurrentRunsEnabled => throw new NotSupportedException(); + WorkflowTelemetryContext IRunnerContext.TelemetryContext => WorkflowTelemetryContext.Disabled; + ValueTask IRunnerContext.EnsureExecutorAsync(string executorId, IStepTracer? tracer, CancellationToken cancellationToken) => new(this.Executors[executorId]); From f2fc5d50317f49273a2aab8a8a90120447528eec Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Sun, 25 Jan 2026 21:38:24 -0800 Subject: [PATCH 02/13] refactor: Pass telemetry context as parameter instead of via interface - Remove IWorkflowContextWithTelemetry interface - Add internal ExecuteAsync overload that accepts WorkflowTelemetryContext - Public ExecuteAsync delegates with WorkflowTelemetryContext.Disabled - InProcessRunner passes TelemetryContext when calling ExecuteAsync - BoundContext now implements IWorkflowContext (not the removed interface) --- .../Microsoft.Agents.AI.Workflows/Executor.cs | 6 ++++-- .../IWorkflowContextWithTelemetry.cs | 16 ---------------- .../InProc/InProcessRunner.cs | 1 + .../InProc/InProcessRunnerContext.cs | 4 +--- 4 files changed, 6 insertions(+), 21 deletions(-) delete mode 100644 dotnet/src/Microsoft.Agents.AI.Workflows/IWorkflowContextWithTelemetry.cs diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs index 59034e8e7e..faeb209b78 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs @@ -116,9 +116,11 @@ internal MessageRouter Router /// A ValueTask representing the asynchronous operation, wrapping the output from the executor. /// No handler found for the message type. /// An exception is generated while handling the message. - public async ValueTask ExecuteAsync(object message, TypeId messageType, IWorkflowContext context, CancellationToken cancellationToken = default) + public ValueTask ExecuteAsync(object message, TypeId messageType, IWorkflowContext context, CancellationToken cancellationToken = default) + => this.ExecuteAsync(message, messageType, context, WorkflowTelemetryContext.Disabled, cancellationToken); + + internal async ValueTask ExecuteAsync(object message, TypeId messageType, IWorkflowContext context, WorkflowTelemetryContext telemetryContext, CancellationToken cancellationToken = default) { - WorkflowTelemetryContext telemetryContext = (context as IWorkflowContextWithTelemetry)?.TelemetryContext ?? WorkflowTelemetryContext.Disabled; using var activity = telemetryContext.StartActivity(ActivityNames.ExecutorProcess + " " + this.Id); activity?.SetTag(Tags.ExecutorId, this.Id) .SetTag(Tags.ExecutorType, this.GetType().FullName) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/IWorkflowContextWithTelemetry.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/IWorkflowContextWithTelemetry.cs deleted file mode 100644 index 3e79706a61..0000000000 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/IWorkflowContextWithTelemetry.cs +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. - -using Microsoft.Agents.AI.Workflows.Observability; - -namespace Microsoft.Agents.AI.Workflows; - -/// -/// Internal interface that extends IWorkflowContext to provide access to telemetry context. -/// -internal interface IWorkflowContextWithTelemetry : IWorkflowContext -{ - /// - /// Gets the telemetry context for the workflow. - /// - WorkflowTelemetryContext TelemetryContext { get; } -} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs index be77c23605..4198a00a98 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs @@ -205,6 +205,7 @@ await executor.ExecuteAsync( envelope.Message, envelope.MessageType, this.RunContext.Bind(receiverId, envelope.TraceContext), + this.TelemetryContext, cancellationToken ).ConfigureAwait(false); } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs index 7c020abf98..1da76ad3a9 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs @@ -244,10 +244,8 @@ private sealed class BoundContext( InProcessRunnerContext RunnerContext, string ExecutorId, OutputFilter outputFilter, - Dictionary? traceContext) : IWorkflowContextWithTelemetry + Dictionary? traceContext) : IWorkflowContext { - public WorkflowTelemetryContext TelemetryContext => RunnerContext._workflow.TelemetryContext; - public ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default) => RunnerContext.AddEventAsync(workflowEvent, cancellationToken); public ValueTask SendMessageAsync(object message, string? targetId = null, CancellationToken cancellationToken = default) From a12399225f2aedf77129e9bc06582edb90c13099 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Mon, 26 Jan 2026 10:58:21 -0800 Subject: [PATCH 03/13] Add optional ActivitySource parameter to WithOpenTelemetry Allow users to provide their own ActivitySource when enabling telemetry, giving them better control over the ActivitySource lifecycle. When not provided, the framework creates one internally (existing behavior). Changes: - Add optional activitySource parameter to WithOpenTelemetry() extension - Update WorkflowTelemetryContext to accept external ActivitySource - Add unit test for user-provided ActivitySource scenario --- .../Observability/WorkflowTelemetryContext.cs | 11 ++++-- .../OpenTelemetryWorkflowBuilderExtensions.cs | 14 +++++-- .../ObservabilityTests.cs | 37 +++++++++++++++++++ 3 files changed, 56 insertions(+), 6 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs index 0dea7fbe5d..ea7bbb3ebe 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs @@ -39,13 +39,18 @@ private WorkflowTelemetryContext() /// /// Initializes a new instance of the class with telemetry enabled. /// - /// The source name for the activity source. + /// The source name for the activity source. Used only when is . /// The telemetry options. - public WorkflowTelemetryContext(string sourceName, WorkflowTelemetryOptions options) + /// + /// An optional activity source to use. If provided, this activity source will be used directly + /// and the caller retains ownership (responsible for disposal). If , a new + /// activity source will be created using . + /// + public WorkflowTelemetryContext(string sourceName, WorkflowTelemetryOptions options, ActivitySource? activitySource = null) { this.IsEnabled = true; this.Options = options; - this.ActivitySource = new ActivitySource(sourceName); + this.ActivitySource = activitySource ?? new ActivitySource(sourceName); } /// diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/OpenTelemetryWorkflowBuilderExtensions.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/OpenTelemetryWorkflowBuilderExtensions.cs index 27103d81a1..e988b5a4ec 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/OpenTelemetryWorkflowBuilderExtensions.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/OpenTelemetryWorkflowBuilderExtensions.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft. All rights reserved. using System; +using System.Diagnostics; using Microsoft.Agents.AI.Workflows.Observability; using Microsoft.Shared.Diagnostics; @@ -19,12 +20,18 @@ public static class OpenTelemetryWorkflowBuilderExtensions /// The to which OpenTelemetry support will be added. /// /// An optional source name that will be used to identify telemetry data from this workflow. - /// If not specified, a default source name will be used. + /// If not specified, a default source name will be used. This parameter is ignored when + /// is provided. /// /// /// An optional callback that provides additional configuration of the instance. /// This allows for fine-tuning telemetry behavior such as enabling sensitive data collection. /// + /// + /// An optional to use for telemetry. If provided, this activity source will be used + /// directly and the caller retains ownership (responsible for disposal). If , a new + /// activity source will be created using . + /// /// The with OpenTelemetry instrumentation enabled, enabling method chaining. /// is . /// @@ -53,7 +60,8 @@ public static class OpenTelemetryWorkflowBuilderExtensions public static WorkflowBuilder WithOpenTelemetry( this WorkflowBuilder builder, string? sourceName = null, - Action? configure = null) + Action? configure = null, + ActivitySource? activitySource = null) { Throw.IfNull(builder); @@ -61,7 +69,7 @@ public static WorkflowBuilder WithOpenTelemetry( configure?.Invoke(options); string effectiveSourceName = string.IsNullOrEmpty(sourceName) ? DefaultSourceName : sourceName!; - WorkflowTelemetryContext context = new(effectiveSourceName, options); + WorkflowTelemetryContext context = new(effectiveSourceName, options, activitySource); builder.SetTelemetryContext(context); diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs index bd302999b1..ca84c2fdec 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs @@ -203,4 +203,41 @@ public async Task TelemetryDisabledByDefault_CreatesNoActivitiesAsync() var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); capturedActivities.Should().BeEmpty("No activities should be created when telemetry is disabled (default)."); } + + [Fact] + public async Task WithOpenTelemetry_UsesProvidedActivitySourceAsync() + { + // Arrange + using var testActivity = new Activity("ObservabilityTest").Start(); + using var userActivitySource = new ActivitySource("UserProvidedSource"); + + // Set up a separate listener for the user-provided source + ConcurrentBag userActivities = []; + using var userListener = new ActivityListener + { + ShouldListenTo = source => source.Name == "UserProvidedSource", + Sample = (ref ActivityCreationOptions options) => ActivitySamplingResult.AllData, + ActivityStarted = activity => userActivities.Add(activity), + }; + ActivitySource.AddActivityListener(userListener); + + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + // Act + WorkflowBuilder builder = new(uppercase); + var workflow = builder.WithOpenTelemetry(activitySource: userActivitySource).Build(); + + Run run = await InProcessExecution.Default.RunAsync(workflow, "Hello"); + await run.DisposeAsync(); + + await Task.Delay(100); + + // Assert + var capturedActivities = userActivities.Where(a => a.RootId == testActivity.RootId).ToList(); + capturedActivities.Should().NotBeEmpty("Activities should be created with user-provided ActivitySource."); + capturedActivities.Should().OnlyContain( + a => a.Source.Name == "UserProvidedSource", + "All activities should come from the user-provided ActivitySource."); + } } From e5421a7e6bda91536a87c21ec9f906d97525cc06 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Mon, 26 Jan 2026 12:34:11 -0800 Subject: [PATCH 04/13] Add component-level telemetry control with disable flags Allow users to selectively disable specific activity types via WorkflowTelemetryOptions. All activities are enabled by default. New disable flags: - DisableWorkflowBuild: Disables workflow.build activities - DisableWorkflowRun: Disables workflow_invoke activities - DisableExecutorProcess: Disables executor.process activities - DisableEdgeGroupProcess: Disables edge_group.process activities - DisableMessageSend: Disables message.send activities Added helper methods to WorkflowTelemetryContext for each activity type and updated all activity creation sites to use them. --- .../Execution/EdgeRunner.cs | 2 +- .../Execution/LockstepRunEventStream.cs | 2 +- .../Execution/StreamingRunEventStream.cs | 2 +- .../Microsoft.Agents.AI.Workflows/Executor.cs | 2 +- .../InProc/InProcessRunnerContext.cs | 2 +- .../Observability/WorkflowTelemetryContext.cs | 71 ++++++++ .../Observability/WorkflowTelemetryOptions.cs | 45 +++++ .../WorkflowBuilder.cs | 2 +- .../ObservabilityTests.cs | 154 ++++++++++++++++++ 9 files changed, 276 insertions(+), 6 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs index e04fc7465d..3300015570 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs @@ -25,5 +25,5 @@ internal abstract class EdgeRunner( protected IRunnerContext RunContext { get; } = Throw.IfNull(runContext); protected TEdgeData EdgeData { get; } = Throw.IfNull(edgeData); - protected Activity? StartActivity(string name) => this.RunContext.TelemetryContext.StartActivity(name); + protected Activity? StartActivity(string name) => this.RunContext.TelemetryContext.StartEdgeGroupProcessActivity(); } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs index 248ae5175c..250a9ee612 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs @@ -50,7 +50,7 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe this._stepRunner.OutgoingEvents.EventRaised += OnWorkflowEventAsync; - using Activity? activity = this._stepRunner.TelemetryContext.StartActivity(ActivityNames.WorkflowRun); + using Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity(); activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.RunId, this._stepRunner.RunId); try diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs index 49193ff34a..4cce8df844 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs @@ -60,7 +60,7 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) // Subscribe to events - they will flow directly to the channel as they're raised this._stepRunner.OutgoingEvents.EventRaised += OnEventRaisedAsync; - using Activity? activity = this._stepRunner.TelemetryContext.StartActivity(ActivityNames.WorkflowRun); + using Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity(); activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.RunId, this._stepRunner.RunId); try diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs index faeb209b78..2f060f6419 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs @@ -121,7 +121,7 @@ internal MessageRouter Router internal async ValueTask ExecuteAsync(object message, TypeId messageType, IWorkflowContext context, WorkflowTelemetryContext telemetryContext, CancellationToken cancellationToken = default) { - using var activity = telemetryContext.StartActivity(ActivityNames.ExecutorProcess + " " + this.Id); + using var activity = telemetryContext.StartExecutorProcessActivity(this.Id); activity?.SetTag(Tags.ExecutorId, this.Id) .SetTag(Tags.ExecutorType, this.GetType().FullName) .SetTag(Tags.MessageType, messageType.TypeName) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs index 1da76ad3a9..93a1fda209 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs @@ -182,7 +182,7 @@ public ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken ca public async ValueTask SendMessageAsync(string sourceId, object message, string? targetId = null, CancellationToken cancellationToken = default) { - using Activity? activity = this._workflow.TelemetryContext.StartActivity(ActivityNames.MessageSend, ActivityKind.Producer); + using Activity? activity = this._workflow.TelemetryContext.StartMessageSendActivity(); activity?.SetTag(Tags.MessageSourceId, sourceId); if (targetId is not null) { activity?.SetTag(Tags.MessageTargetId, targetId); } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs index ea7bbb3ebe..012157afe2 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs @@ -63,4 +63,75 @@ public WorkflowTelemetryContext(string sourceName, WorkflowTelemetryOptions opti { return this.ActivitySource?.StartActivity(name, kind); } + + /// + /// Starts a workflow build activity if enabled. + /// + /// An activity if workflow build telemetry is enabled, otherwise null. + public Activity? StartWorkflowBuildActivity() + { + if (this.Options.DisableWorkflowBuild) + { + return null; + } + + return this.ActivitySource?.StartActivity(ActivityNames.WorkflowBuild); + } + + /// + /// Starts a workflow run activity if enabled. + /// + /// An activity if workflow run telemetry is enabled, otherwise null. + public Activity? StartWorkflowRunActivity() + { + if (this.Options.DisableWorkflowRun) + { + return null; + } + + return this.ActivitySource?.StartActivity(ActivityNames.WorkflowRun); + } + + /// + /// Starts an executor process activity if enabled. + /// + /// The executor identifier to include in the activity name. + /// An activity if executor process telemetry is enabled, otherwise null. + public Activity? StartExecutorProcessActivity(string executorId) + { + if (this.Options.DisableExecutorProcess) + { + return null; + } + + return this.ActivitySource?.StartActivity(ActivityNames.ExecutorProcess + " " + executorId); + } + + /// + /// Starts an edge group process activity if enabled. + /// + /// An activity if edge group process telemetry is enabled, otherwise null. + public Activity? StartEdgeGroupProcessActivity() + { + if (this.Options.DisableEdgeGroupProcess) + { + return null; + } + + return this.ActivitySource?.StartActivity(ActivityNames.EdgeGroupProcess); + } + + /// + /// Starts a message send activity if enabled. + /// + /// An activity if message send telemetry is enabled, otherwise null. + public Activity? StartMessageSendActivity() + { + if (this.Options.DisableMessageSend) + { + return null; + } + + return this.ActivitySource?.StartActivity(ActivityNames.MessageSend, ActivityKind.Producer); + } } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryOptions.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryOptions.cs index 843ac03eff..832c024de4 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryOptions.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryOptions.cs @@ -20,4 +20,49 @@ public sealed class WorkflowTelemetryOptions /// such as message content and executor data. /// public bool EnableSensitiveData { get; set; } + + /// + /// Gets or sets a value indicating whether workflow build activities should be disabled. + /// + /// + /// to disable workflow.build activities; + /// to enable them. The default value is . + /// + public bool DisableWorkflowBuild { get; set; } + + /// + /// Gets or sets a value indicating whether workflow run activities should be disabled. + /// + /// + /// to disable workflow_invoke activities; + /// to enable them. The default value is . + /// + public bool DisableWorkflowRun { get; set; } + + /// + /// Gets or sets a value indicating whether executor process activities should be disabled. + /// + /// + /// to disable executor.process activities; + /// to enable them. The default value is . + /// + public bool DisableExecutorProcess { get; set; } + + /// + /// Gets or sets a value indicating whether edge group process activities should be disabled. + /// + /// + /// to disable edge_group.process activities; + /// to enable them. The default value is . + /// + public bool DisableEdgeGroupProcess { get; set; } + + /// + /// Gets or sets a value indicating whether message send activities should be disabled. + /// + /// + /// to disable message.send activities; + /// to enable them. The default value is . + /// + public bool DisableMessageSend { get; set; } } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs index 494cf28c24..36a3468e2d 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs @@ -608,7 +608,7 @@ private Workflow BuildInternal(bool validateOrphans, Activity? activity = null) /// or if the start executor is not bound. public Workflow Build(bool validateOrphans = true) { - using Activity? activity = this._telemetryContext.StartActivity(ActivityNames.WorkflowBuild); + using Activity? activity = this._telemetryContext.StartWorkflowBuildActivity(); var workflow = this.BuildInternal(validateOrphans, activity); diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs index ca84c2fdec..f196169b11 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs @@ -240,4 +240,158 @@ public async Task WithOpenTelemetry_UsesProvidedActivitySourceAsync() a => a.Source.Name == "UserProvidedSource", "All activities should come from the user-provided ActivitySource."); } + + [Fact] + public async Task DisableWorkflowBuild_PreventsWorkflowBuildActivityAsync() + { + // Arrange + using var testActivity = new Activity("ObservabilityTest").Start(); + + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + // Act + WorkflowBuilder builder = new(uppercase); + var workflow = builder.WithOpenTelemetry(configure: opts => opts.DisableWorkflowBuild = true).Build(); + + await Task.Delay(100); + + // Assert + var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); + capturedActivities.Should().NotContain( + a => a.OperationName.StartsWith(ActivityNames.WorkflowBuild, StringComparison.Ordinal), + "WorkflowBuild activity should be disabled."); + } + + [Fact] + public async Task DisableWorkflowRun_PreventsWorkflowRunActivityAsync() + { + // Arrange + using var testActivity = new Activity("ObservabilityTest").Start(); + + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + // Act + WorkflowBuilder builder = new(uppercase); + builder.WithOutputFrom(uppercase); + var workflow = builder.WithOpenTelemetry(configure: opts => opts.DisableWorkflowRun = true).Build(); + + Run run = await InProcessExecution.Default.RunAsync(workflow, "Hello"); + await run.DisposeAsync(); + + await Task.Delay(100); + + // Assert + var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); + capturedActivities.Should().NotContain( + a => a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal), + "WorkflowRun activity should be disabled."); + capturedActivities.Should().Contain( + a => a.OperationName.StartsWith(ActivityNames.WorkflowBuild, StringComparison.Ordinal), + "Other activities should still be created."); + } + + [Fact] + public async Task DisableExecutorProcess_PreventsExecutorProcessActivityAsync() + { + // Arrange + using var testActivity = new Activity("ObservabilityTest").Start(); + + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + // Act + WorkflowBuilder builder = new(uppercase); + builder.WithOutputFrom(uppercase); + var workflow = builder.WithOpenTelemetry(configure: opts => opts.DisableExecutorProcess = true).Build(); + + Run run = await InProcessExecution.Default.RunAsync(workflow, "Hello"); + await run.DisposeAsync(); + + await Task.Delay(100); + + // Assert + var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); + capturedActivities.Should().NotContain( + a => a.OperationName.StartsWith(ActivityNames.ExecutorProcess, StringComparison.Ordinal), + "ExecutorProcess activity should be disabled."); + capturedActivities.Should().Contain( + a => a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal), + "Other activities should still be created."); + } + + [Fact] + public async Task DisableEdgeGroupProcess_PreventsEdgeGroupProcessActivityAsync() + { + // Arrange + using var testActivity = new Activity("ObservabilityTest").Start(); + var workflow = CreateWorkflowWithDisabledEdges(); + + // Act + Run run = await InProcessExecution.Default.RunAsync(workflow, "Hello"); + await run.DisposeAsync(); + + await Task.Delay(100); + + // Assert + var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); + capturedActivities.Should().NotContain( + a => a.OperationName.StartsWith(ActivityNames.EdgeGroupProcess, StringComparison.Ordinal), + "EdgeGroupProcess activity should be disabled."); + capturedActivities.Should().Contain( + a => a.OperationName.StartsWith(ActivityNames.ExecutorProcess, StringComparison.Ordinal), + "Other activities should still be created."); + } + + [Fact] + public async Task DisableMessageSend_PreventsMessageSendActivityAsync() + { + // Arrange + using var testActivity = new Activity("ObservabilityTest").Start(); + var workflow = CreateWorkflowWithDisabledMessages(); + + // Act + Run run = await InProcessExecution.Default.RunAsync(workflow, "Hello"); + await run.DisposeAsync(); + + await Task.Delay(100); + + // Assert + var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); + capturedActivities.Should().NotContain( + a => a.OperationName.StartsWith(ActivityNames.MessageSend, StringComparison.Ordinal), + "MessageSend activity should be disabled."); + capturedActivities.Should().Contain( + a => a.OperationName.StartsWith(ActivityNames.ExecutorProcess, StringComparison.Ordinal), + "Other activities should still be created."); + } + + private static Workflow CreateWorkflowWithDisabledEdges() + { + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + Func reverseFunc = s => new string(s.Reverse().ToArray()); + var reverse = reverseFunc.BindAsExecutor("ReverseTextExecutor"); + + WorkflowBuilder builder = new(uppercase); + builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse); + + return builder.WithOpenTelemetry(configure: opts => opts.DisableEdgeGroupProcess = true).Build(); + } + + private static Workflow CreateWorkflowWithDisabledMessages() + { + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + Func reverseFunc = s => new string(s.Reverse().ToArray()); + var reverse = reverseFunc.BindAsExecutor("ReverseTextExecutor"); + + WorkflowBuilder builder = new(uppercase); + builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse); + + return builder.WithOpenTelemetry(configure: opts => opts.DisableMessageSend = true).Build(); + } } From 94fa28e8dd2dad0224a90e96f908bda211e28c37 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Mon, 26 Jan 2026 14:22:48 -0800 Subject: [PATCH 05/13] Implement EnableSensitiveData to log executor input/output When EnableSensitiveData is true in WorkflowTelemetryOptions, executor input and output are logged as JSON-serialized attributes in the executor.process activity. New activity tags: - executor.input: JSON serialized input message - executor.output: JSON serialized output result (non-void only) Added suppression attributes for AOT/trimming warnings since this is an opt-in feature for debugging/diagnostics. --- .../Microsoft.Agents.AI.Workflows/Executor.cs | 30 +++++++++ .../Observability/Tags.cs | 2 + .../ObservabilityTests.cs | 64 +++++++++++++++++++ 3 files changed, 96 insertions(+) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs index 2f060f6419..106fbb2369 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs @@ -4,6 +4,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Reflection; +using System.Text.Json; using System.Threading; using System.Threading.Tasks; using Microsoft.Agents.AI.Workflows.Checkpointing; @@ -127,6 +128,11 @@ internal MessageRouter Router .SetTag(Tags.MessageType, messageType.TypeName) .CreateSourceLinks(context.TraceContext); + if (activity is not null && telemetryContext.Options.EnableSensitiveData) + { + activity.SetTag(Tags.ExecutorInput, SerializeForTelemetry(message)); + } + await context.AddEventAsync(new ExecutorInvokedEvent(this.Id, message), cancellationToken).ConfigureAwait(false); CallResult? result = await this.Router.RouteMessageAsync(message, context, requireRoute: true, cancellationToken) @@ -160,6 +166,11 @@ internal MessageRouter Router return null; // Void result. } + if (activity is not null && telemetryContext.Options.EnableSensitiveData) + { + activity.SetTag(Tags.ExecutorOutput, SerializeForTelemetry(result.Result)); + } + // If we had a real return type, raise it as a SendMessage; TODO: Should we have a way to disable this behaviour? if (result.Result is not null && this.Options.AutoSendMessageHandlerResultObject) { @@ -234,6 +245,25 @@ internal bool CanOutput(Type messageType) return false; } + + [System.Diagnostics.CodeAnalysis.UnconditionalSuppressMessage("ReflectionAnalysis", "IL3050:RequiresDynamicCode", Justification = "Telemetry serialization is optional and only used when explicitly enabled.")] + [System.Diagnostics.CodeAnalysis.UnconditionalSuppressMessage("Trimming", "IL2026:Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access", Justification = "Telemetry serialization is optional and only used when explicitly enabled.")] + private static string? SerializeForTelemetry(object? value) + { + if (value is null) + { + return null; + } + + try + { + return JsonSerializer.Serialize(value, value.GetType()); + } + catch (JsonException) + { + return $"[Unserializable: {value.GetType().FullName}]"; + } + } } /// diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs index 9acba99eea..a1ac2f3c69 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs @@ -14,6 +14,8 @@ internal static class Tags public const string RunId = "run.id"; public const string ExecutorId = "executor.id"; public const string ExecutorType = "executor.type"; + public const string ExecutorInput = "executor.input"; + public const string ExecutorOutput = "executor.output"; public const string MessageType = "message.type"; public const string EdgeGroupType = "edge_group.type"; public const string MessageSourceId = "message.source_id"; diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs index f196169b11..1e7f857fbe 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs @@ -394,4 +394,68 @@ private static Workflow CreateWorkflowWithDisabledMessages() return builder.WithOpenTelemetry(configure: opts => opts.DisableMessageSend = true).Build(); } + + [Fact] + public async Task EnableSensitiveData_LogsExecutorInputAndOutputAsync() + { + // Arrange + using var testActivity = new Activity("ObservabilityTest").Start(); + + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + // Act + WorkflowBuilder builder = new(uppercase); + builder.WithOutputFrom(uppercase); + var workflow = builder.WithOpenTelemetry(configure: opts => opts.EnableSensitiveData = true).Build(); + + Run run = await InProcessExecution.Default.RunAsync(workflow, "hello"); + await run.DisposeAsync(); + + await Task.Delay(100); + + // Assert + var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); + var executorActivity = capturedActivities.FirstOrDefault( + a => a.OperationName.StartsWith(ActivityNames.ExecutorProcess, StringComparison.Ordinal)); + + executorActivity.Should().NotBeNull("ExecutorProcess activity should be created."); + + var tags = executorActivity!.Tags.ToDictionary(t => t.Key, t => t.Value); + tags.Should().ContainKey(Tags.ExecutorInput, "Input should be logged when EnableSensitiveData is true."); + tags.Should().ContainKey(Tags.ExecutorOutput, "Output should be logged when EnableSensitiveData is true."); + tags[Tags.ExecutorInput].Should().Contain("hello", "Input should contain the input value."); + tags[Tags.ExecutorOutput].Should().Contain("HELLO", "Output should contain the transformed value."); + } + + [Fact] + public async Task EnableSensitiveData_Disabled_DoesNotLogInputOutputAsync() + { + // Arrange + using var testActivity = new Activity("ObservabilityTest").Start(); + + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + // Act - EnableSensitiveData is false by default + WorkflowBuilder builder = new(uppercase); + builder.WithOutputFrom(uppercase); + var workflow = builder.WithOpenTelemetry().Build(); + + Run run = await InProcessExecution.Default.RunAsync(workflow, "hello"); + await run.DisposeAsync(); + + await Task.Delay(100); + + // Assert + var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); + var executorActivity = capturedActivities.FirstOrDefault( + a => a.OperationName.StartsWith(ActivityNames.ExecutorProcess, StringComparison.Ordinal)); + + executorActivity.Should().NotBeNull("ExecutorProcess activity should be created."); + + var tags = executorActivity!.Tags.ToDictionary(t => t.Key, t => t.Value); + tags.Should().NotContainKey(Tags.ExecutorInput, "Input should NOT be logged when EnableSensitiveData is false."); + tags.Should().NotContainKey(Tags.ExecutorOutput, "Output should NOT be logged when EnableSensitiveData is false."); + } } From 86b268b8d259f2640d83f71c0d81d3b4ae770c4d Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Mon, 26 Jan 2026 14:54:30 -0800 Subject: [PATCH 06/13] Refactor activity start methods to centralize tagging logic Move tagging logic into WorkflowTelemetryContext methods: - StartExecutorProcessActivity now accepts executorId, executorType, messageType, and message; sets all tags including executor.input when EnableSensitiveData is true - Added SetExecutorOutput method to set executor.output after execution - StartMessageSendActivity now accepts sourceId, targetId, and message; sets all tags including message.content when EnableSensitiveData is true Simplified Executor.cs and InProcessRunnerContext.cs by removing inline tagging code. Added message.content tag constant. --- .../Microsoft.Agents.AI.Workflows/Executor.cs | 37 +------- .../InProc/InProcessRunnerContext.cs | 4 +- .../Observability/Tags.cs | 1 + .../Observability/WorkflowTelemetryContext.cs | 86 +++++++++++++++++-- .../ObservabilityTests.cs | 68 +++++++++++++++ .../workflow_as_agent_human_in_the_loop.py | 5 +- 6 files changed, 154 insertions(+), 47 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs index 106fbb2369..9a1e860977 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs @@ -4,7 +4,6 @@ using System.Collections.Generic; using System.Diagnostics; using System.Reflection; -using System.Text.Json; using System.Threading; using System.Threading.Tasks; using Microsoft.Agents.AI.Workflows.Checkpointing; @@ -122,16 +121,8 @@ internal MessageRouter Router internal async ValueTask ExecuteAsync(object message, TypeId messageType, IWorkflowContext context, WorkflowTelemetryContext telemetryContext, CancellationToken cancellationToken = default) { - using var activity = telemetryContext.StartExecutorProcessActivity(this.Id); - activity?.SetTag(Tags.ExecutorId, this.Id) - .SetTag(Tags.ExecutorType, this.GetType().FullName) - .SetTag(Tags.MessageType, messageType.TypeName) - .CreateSourceLinks(context.TraceContext); - - if (activity is not null && telemetryContext.Options.EnableSensitiveData) - { - activity.SetTag(Tags.ExecutorInput, SerializeForTelemetry(message)); - } + using var activity = telemetryContext.StartExecutorProcessActivity(this.Id, this.GetType().FullName, messageType.TypeName, message); + activity?.CreateSourceLinks(context.TraceContext); await context.AddEventAsync(new ExecutorInvokedEvent(this.Id, message), cancellationToken).ConfigureAwait(false); @@ -166,10 +157,7 @@ internal MessageRouter Router return null; // Void result. } - if (activity is not null && telemetryContext.Options.EnableSensitiveData) - { - activity.SetTag(Tags.ExecutorOutput, SerializeForTelemetry(result.Result)); - } + telemetryContext.SetExecutorOutput(activity, result.Result); // If we had a real return type, raise it as a SendMessage; TODO: Should we have a way to disable this behaviour? if (result.Result is not null && this.Options.AutoSendMessageHandlerResultObject) @@ -245,25 +233,6 @@ internal bool CanOutput(Type messageType) return false; } - - [System.Diagnostics.CodeAnalysis.UnconditionalSuppressMessage("ReflectionAnalysis", "IL3050:RequiresDynamicCode", Justification = "Telemetry serialization is optional and only used when explicitly enabled.")] - [System.Diagnostics.CodeAnalysis.UnconditionalSuppressMessage("Trimming", "IL2026:Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access", Justification = "Telemetry serialization is optional and only used when explicitly enabled.")] - private static string? SerializeForTelemetry(object? value) - { - if (value is null) - { - return null; - } - - try - { - return JsonSerializer.Serialize(value, value.GetType()); - } - catch (JsonException) - { - return $"[Unserializable: {value.GetType().FullName}]"; - } - } } /// diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs index 93a1fda209..e7c506f888 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs @@ -182,9 +182,7 @@ public ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken ca public async ValueTask SendMessageAsync(string sourceId, object message, string? targetId = null, CancellationToken cancellationToken = default) { - using Activity? activity = this._workflow.TelemetryContext.StartMessageSendActivity(); - activity?.SetTag(Tags.MessageSourceId, sourceId); - if (targetId is not null) { activity?.SetTag(Tags.MessageTargetId, targetId); } + using Activity? activity = this._workflow.TelemetryContext.StartMessageSendActivity(sourceId, targetId, message); // Create a carrier for trace context propagation var traceContext = activity is null ? null : new Dictionary(); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs index a1ac2f3c69..4b40e46f8c 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs @@ -17,6 +17,7 @@ internal static class Tags public const string ExecutorInput = "executor.input"; public const string ExecutorOutput = "executor.output"; public const string MessageType = "message.type"; + public const string MessageContent = "message.content"; public const string EdgeGroupType = "edge_group.type"; public const string MessageSourceId = "message.source_id"; public const string MessageTargetId = "message.target_id"; diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs index 012157afe2..65ff2cb7df 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft. All rights reserved. using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Text.Json; namespace Microsoft.Agents.AI.Workflows.Observability; @@ -93,18 +95,49 @@ public WorkflowTelemetryContext(string sourceName, WorkflowTelemetryOptions opti } /// - /// Starts an executor process activity if enabled. + /// Starts an executor process activity if enabled, with all standard tags set. /// - /// The executor identifier to include in the activity name. + /// The executor identifier. + /// The executor type name. + /// The message type name. + /// The input message. Logged only when is true. /// An activity if executor process telemetry is enabled, otherwise null. - public Activity? StartExecutorProcessActivity(string executorId) + public Activity? StartExecutorProcessActivity(string executorId, string? executorType, string messageType, object? message) { if (this.Options.DisableExecutorProcess) { return null; } - return this.ActivitySource?.StartActivity(ActivityNames.ExecutorProcess + " " + executorId); + Activity? activity = this.ActivitySource?.StartActivity(ActivityNames.ExecutorProcess + " " + executorId); + if (activity is null) + { + return null; + } + + activity.SetTag(Tags.ExecutorId, executorId) + .SetTag(Tags.ExecutorType, executorType) + .SetTag(Tags.MessageType, messageType); + + if (this.Options.EnableSensitiveData) + { + activity.SetTag(Tags.ExecutorInput, SerializeForTelemetry(message)); + } + + return activity; + } + + /// + /// Sets the executor output tag on an activity when sensitive data logging is enabled. + /// + /// The activity to set the output on. + /// The output value to log. + public void SetExecutorOutput(Activity? activity, object? output) + { + if (activity is not null && this.Options.EnableSensitiveData) + { + activity.SetTag(Tags.ExecutorOutput, SerializeForTelemetry(output)); + } } /// @@ -122,16 +155,55 @@ public WorkflowTelemetryContext(string sourceName, WorkflowTelemetryOptions opti } /// - /// Starts a message send activity if enabled. + /// Starts a message send activity if enabled, with all standard tags set. /// + /// The source executor identifier. + /// The target executor identifier, if any. + /// The message being sent. Logged only when is true. /// An activity if message send telemetry is enabled, otherwise null. - public Activity? StartMessageSendActivity() + public Activity? StartMessageSendActivity(string sourceId, string? targetId, object? message) { if (this.Options.DisableMessageSend) { return null; } - return this.ActivitySource?.StartActivity(ActivityNames.MessageSend, ActivityKind.Producer); + Activity? activity = this.ActivitySource?.StartActivity(ActivityNames.MessageSend, ActivityKind.Producer); + if (activity is null) + { + return null; + } + + activity.SetTag(Tags.MessageSourceId, sourceId); + if (targetId is not null) + { + activity.SetTag(Tags.MessageTargetId, targetId); + } + + if (this.Options.EnableSensitiveData) + { + activity.SetTag(Tags.MessageContent, SerializeForTelemetry(message)); + } + + return activity; + } + + [UnconditionalSuppressMessage("ReflectionAnalysis", "IL3050:RequiresDynamicCode", Justification = "Telemetry serialization is optional and only used when explicitly enabled.")] + [UnconditionalSuppressMessage("Trimming", "IL2026:Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access", Justification = "Telemetry serialization is optional and only used when explicitly enabled.")] + private static string? SerializeForTelemetry(object? value) + { + if (value is null) + { + return null; + } + + try + { + return JsonSerializer.Serialize(value, value.GetType()); + } + catch (JsonException) + { + return $"[Unserializable: {value.GetType().FullName}]"; + } } } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs index 1e7f857fbe..5ca56f00e8 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs @@ -458,4 +458,72 @@ public async Task EnableSensitiveData_Disabled_DoesNotLogInputOutputAsync() tags.Should().NotContainKey(Tags.ExecutorInput, "Input should NOT be logged when EnableSensitiveData is false."); tags.Should().NotContainKey(Tags.ExecutorOutput, "Output should NOT be logged when EnableSensitiveData is false."); } + + [Fact] + public async Task EnableSensitiveData_LogsMessageSendContentAsync() + { + // Arrange + using var testActivity = new Activity("ObservabilityTest").Start(); + + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + Func reverseFunc = s => new string(s.Reverse().ToArray()); + var reverse = reverseFunc.BindAsExecutor("ReverseTextExecutor"); + + // Act + WorkflowBuilder builder = new(uppercase); + builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse); + var workflow = builder.WithOpenTelemetry(configure: opts => opts.EnableSensitiveData = true).Build(); + + Run run = await InProcessExecution.Default.RunAsync(workflow, "hello"); + await run.DisposeAsync(); + + await Task.Delay(100); + + // Assert + var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); + var messageSendActivity = capturedActivities.FirstOrDefault( + a => a.OperationName.StartsWith(ActivityNames.MessageSend, StringComparison.Ordinal)); + + messageSendActivity.Should().NotBeNull("MessageSend activity should be created."); + + var tags = messageSendActivity!.Tags.ToDictionary(t => t.Key, t => t.Value); + tags.Should().ContainKey(Tags.MessageContent, "Message content should be logged when EnableSensitiveData is true."); + tags.Should().ContainKey(Tags.MessageSourceId, "Source ID should be logged."); + } + + [Fact] + public async Task EnableSensitiveData_Disabled_DoesNotLogMessageContentAsync() + { + // Arrange + using var testActivity = new Activity("ObservabilityTest").Start(); + + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + Func reverseFunc = s => new string(s.Reverse().ToArray()); + var reverse = reverseFunc.BindAsExecutor("ReverseTextExecutor"); + + // Act - EnableSensitiveData is false by default + WorkflowBuilder builder = new(uppercase); + builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse); + var workflow = builder.WithOpenTelemetry().Build(); + + Run run = await InProcessExecution.Default.RunAsync(workflow, "hello"); + await run.DisposeAsync(); + + await Task.Delay(100); + + // Assert + var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); + var messageSendActivity = capturedActivities.FirstOrDefault( + a => a.OperationName.StartsWith(ActivityNames.MessageSend, StringComparison.Ordinal)); + + messageSendActivity.Should().NotBeNull("MessageSend activity should be created."); + + var tags = messageSendActivity!.Tags.ToDictionary(t => t.Key, t => t.Value); + tags.Should().NotContainKey(Tags.MessageContent, "Message content should NOT be logged when EnableSensitiveData is false."); + tags.Should().ContainKey(Tags.MessageSourceId, "Source ID should still be logged."); + } } diff --git a/python/samples/getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py b/python/samples/getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py index 3850cf74e7..21907af0e3 100644 --- a/python/samples/getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py +++ b/python/samples/getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py @@ -7,8 +7,7 @@ from pathlib import Path from typing import Any -from agent_framework.azure import AzureOpenAIChatClient -from azure.identity import AzureCliCredential +from agent_framework.openai import OpenAIChatClient # Ensure local getting_started package can be imported when running as a script. _SAMPLES_ROOT = Path(__file__).resolve().parents[3] @@ -105,7 +104,7 @@ async def main() -> None: .register_executor( lambda: Worker( id="sub-worker", - chat_client=AzureOpenAIChatClient(credential=AzureCliCredential()), + chat_client=OpenAIChatClient(), ), name="worker", ) From a5c393c8cb064d6103f7af5564f280e636a52098 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Mon, 26 Jan 2026 15:19:32 -0800 Subject: [PATCH 07/13] Revert Python changes --- .../workflows/agents/workflow_as_agent_human_in_the_loop.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/samples/getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py b/python/samples/getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py index 21907af0e3..3850cf74e7 100644 --- a/python/samples/getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py +++ b/python/samples/getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py @@ -7,7 +7,8 @@ from pathlib import Path from typing import Any -from agent_framework.openai import OpenAIChatClient +from agent_framework.azure import AzureOpenAIChatClient +from azure.identity import AzureCliCredential # Ensure local getting_started package can be imported when running as a script. _SAMPLES_ROOT = Path(__file__).resolve().parents[3] @@ -104,7 +105,7 @@ async def main() -> None: .register_executor( lambda: Worker( id="sub-worker", - chat_client=OpenAIChatClient(), + chat_client=AzureOpenAIChatClient(credential=AzureCliCredential()), ), name="worker", ) From 2979edc2e0c1f0d37d99227301499f6f2c02d662 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Tue, 27 Jan 2026 10:47:49 -0800 Subject: [PATCH 08/13] Update samples and code cleanup --- .../FoundryAgents_Step01.1_Basics/Program.cs | 1 - .../Observability/ApplicationInsights/Program.cs | 8 +++++++- .../Workflows/Observability/AspireDashboard/Program.cs | 8 +++++++- .../Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs | 1 - .../CosmosDBCollectionFixture.cs | 2 -- 5 files changed, 14 insertions(+), 6 deletions(-) diff --git a/dotnet/samples/GettingStarted/FoundryAgents/FoundryAgents_Step01.1_Basics/Program.cs b/dotnet/samples/GettingStarted/FoundryAgents/FoundryAgents_Step01.1_Basics/Program.cs index 9a7ee0736a..91ff29c369 100644 --- a/dotnet/samples/GettingStarted/FoundryAgents/FoundryAgents_Step01.1_Basics/Program.cs +++ b/dotnet/samples/GettingStarted/FoundryAgents/FoundryAgents_Step01.1_Basics/Program.cs @@ -6,7 +6,6 @@ using Azure.AI.Projects.OpenAI; using Azure.Identity; using Microsoft.Agents.AI; -using Microsoft.Extensions.AI; string endpoint = Environment.GetEnvironmentVariable("AZURE_FOUNDRY_PROJECT_ENDPOINT") ?? throw new InvalidOperationException("AZURE_FOUNDRY_PROJECT_ENDPOINT is not set."); string deploymentName = Environment.GetEnvironmentVariable("AZURE_FOUNDRY_PROJECT_DEPLOYMENT_NAME") ?? "gpt-4o-mini"; diff --git a/dotnet/samples/GettingStarted/Workflows/Observability/ApplicationInsights/Program.cs b/dotnet/samples/GettingStarted/Workflows/Observability/ApplicationInsights/Program.cs index f7894f707a..a05a5cddf6 100644 --- a/dotnet/samples/GettingStarted/Workflows/Observability/ApplicationInsights/Program.cs +++ b/dotnet/samples/GettingStarted/Workflows/Observability/ApplicationInsights/Program.cs @@ -35,8 +35,10 @@ private static async Task Main() using var traceProvider = Sdk.CreateTracerProviderBuilder() .SetResourceBuilder(resourceBuilder) - .AddSource("Microsoft.Agents.AI.Workflows*") .AddSource(SourceName) + // The following source is only required if not specifying + // the `activitySource` in the WithOpenTelemetry call below + .AddSource("Microsoft.Agents.AI.Workflows*") .AddAzureMonitorTraceExporter(options => options.ConnectionString = applicationInsightsConnectionString) .Build(); @@ -51,6 +53,10 @@ private static async Task Main() // Build the workflow by connecting executors sequentially var workflow = new WorkflowBuilder(uppercase) .AddEdge(uppercase, reverse) + .WithOpenTelemetry( + // Set `EnableSensitiveData` to true to include message content in traces + configure: cfg => cfg.EnableSensitiveData = true, + activitySource: s_activitySource) .Build(); // Execute the workflow with input data diff --git a/dotnet/samples/GettingStarted/Workflows/Observability/AspireDashboard/Program.cs b/dotnet/samples/GettingStarted/Workflows/Observability/AspireDashboard/Program.cs index c04a397c55..06eebfba9d 100644 --- a/dotnet/samples/GettingStarted/Workflows/Observability/AspireDashboard/Program.cs +++ b/dotnet/samples/GettingStarted/Workflows/Observability/AspireDashboard/Program.cs @@ -37,8 +37,10 @@ private static async Task Main() using var traceProvider = Sdk.CreateTracerProviderBuilder() .SetResourceBuilder(resourceBuilder) - .AddSource("Microsoft.Agents.AI.Workflows*") .AddSource(SourceName) + // The following source is only required if not specifying + // the `activitySource` in the WithOpenTelemetry call below + .AddSource("Microsoft.Agents.AI.Workflows*") .AddOtlpExporter(options => options.Endpoint = new Uri(otlpEndpoint)) .Build(); @@ -53,6 +55,10 @@ private static async Task Main() // Build the workflow by connecting executors sequentially var workflow = new WorkflowBuilder(uppercase) .AddEdge(uppercase, reverse) + .WithOpenTelemetry( + // Set `EnableSensitiveData` to true to include message content in traces + configure: cfg => cfg.EnableSensitiveData = true, + activitySource: s_activitySource) .Build(); // Execute the workflow with input data diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs index 3300015570..4a2d6c72ad 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs @@ -2,7 +2,6 @@ using System.Diagnostics; using System.Threading.Tasks; -using Microsoft.Agents.AI.Workflows.Observability; using Microsoft.Shared.Diagnostics; namespace Microsoft.Agents.AI.Workflows.Execution; diff --git a/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/CosmosDBCollectionFixture.cs b/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/CosmosDBCollectionFixture.cs index 195c433de5..d6825ad30d 100644 --- a/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/CosmosDBCollectionFixture.cs +++ b/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/CosmosDBCollectionFixture.cs @@ -1,7 +1,5 @@ // Copyright (c) Microsoft. All rights reserved. -using Xunit; - namespace Microsoft.Agents.AI.CosmosNoSql.UnitTests; /// From b1167bb2757c846f85a49ad87e1801c3dbe7eb8d Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Tue, 27 Jan 2026 11:04:43 -0800 Subject: [PATCH 09/13] Fix file formatting --- .../Observability/WorkflowTelemetryContext.cs | 2 +- .../Observability/WorkflowTelemetryOptions.cs | 2 +- .../OpenTelemetryWorkflowBuilderExtensions.cs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs index 65ff2cb7df..a9071dfdb4 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. using System.Diagnostics; using System.Diagnostics.CodeAnalysis; diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryOptions.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryOptions.cs index 832c024de4..b32f0c0f66 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryOptions.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryOptions.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. namespace Microsoft.Agents.AI.Workflows.Observability; diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/OpenTelemetryWorkflowBuilderExtensions.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/OpenTelemetryWorkflowBuilderExtensions.cs index e988b5a4ec..93d199a488 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/OpenTelemetryWorkflowBuilderExtensions.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/OpenTelemetryWorkflowBuilderExtensions.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. using System; using System.Diagnostics; From 4289b751c9c5154332b1771a385342c896c8a8eb Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Tue, 27 Jan 2026 11:07:50 -0800 Subject: [PATCH 10/13] Add comment --- dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs index 9a1e860977..8068c986a8 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs @@ -157,6 +157,9 @@ internal MessageRouter Router return null; // Void result. } + // Output is not available if executor does not return anything, in which case + // messages sent in the handlers of this executor will be set in the message + // send activities. telemetryContext.SetExecutorOutput(activity, result.Result); // If we had a real return type, raise it as a SendMessage; TODO: Should we have a way to disable this behaviour? From 11b944da8aa13844ef7817864d4d27bc5ff9fc8e Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Thu, 5 Feb 2026 21:14:27 -0800 Subject: [PATCH 11/13] Add telemetry configuration to declarative workflow --- .../DeclarativeWorkflowOptions.cs | 20 ++ .../Interpreter/WorkflowActionVisitor.cs | 15 + .../DeclarativeWorkflowOptionsTest.cs | 310 ++++++++++++++++++ .../ObservabilityTests.cs | 6 - 4 files changed, 345 insertions(+), 6 deletions(-) create mode 100644 dotnet/tests/Microsoft.Agents.AI.Workflows.Declarative.UnitTests/DeclarativeWorkflowOptionsTest.cs diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/DeclarativeWorkflowOptions.cs b/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/DeclarativeWorkflowOptions.cs index 638bed1f90..1a1b29a6e6 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/DeclarativeWorkflowOptions.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/DeclarativeWorkflowOptions.cs @@ -1,5 +1,8 @@ // Copyright (c) Microsoft. All rights reserved. +using System; +using System.Diagnostics; +using Microsoft.Agents.AI.Workflows.Observability; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -41,4 +44,21 @@ public sealed class DeclarativeWorkflowOptions(WorkflowAgentProvider agentProvid /// Gets the used to create loggers for workflow components. /// public ILoggerFactory LoggerFactory { get; init; } = NullLoggerFactory.Instance; + + /// + /// Gets the telemetry source name for OpenTelemetry instrumentation. + /// If , telemetry is disabled unless is provided. + /// + public string? TelemetrySourceName { get; init; } + + /// + /// Gets the callback to configure telemetry options. + /// + public Action? ConfigureTelemetry { get; init; } + + /// + /// Gets an optional for telemetry. + /// If provided, the caller retains ownership and is responsible for disposal. + /// + public ActivitySource? TelemetryActivitySource { get; init; } } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/Interpreter/WorkflowActionVisitor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/Interpreter/WorkflowActionVisitor.cs index b6bcd458d8..5fb656d003 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/Interpreter/WorkflowActionVisitor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/Interpreter/WorkflowActionVisitor.cs @@ -51,6 +51,21 @@ public Workflow Complete() this._workflowModel.Build(builder); + // Apply telemetry if configured + if (this._workflowOptions.TelemetrySourceName is not null || this._workflowOptions.TelemetryActivitySource is not null) + { + builder.WorkflowBuilder.WithOpenTelemetry( + this._workflowOptions.TelemetrySourceName, + this._workflowOptions.ConfigureTelemetry, + this._workflowOptions.TelemetryActivitySource); + } + else if (this._workflowOptions.ConfigureTelemetry is not null) + { + throw new InvalidOperationException( + $"{nameof(DeclarativeWorkflowOptions.ConfigureTelemetry)} was provided but telemetry is not enabled. " + + $"Set {nameof(DeclarativeWorkflowOptions.TelemetrySourceName)} or {nameof(DeclarativeWorkflowOptions.TelemetryActivitySource)} to enable telemetry."); + } + // Build final workflow return builder.WorkflowBuilder.Build(validateOrphans: false); } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.Declarative.UnitTests/DeclarativeWorkflowOptionsTest.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.Declarative.UnitTests/DeclarativeWorkflowOptionsTest.cs new file mode 100644 index 0000000000..ab4b5a68be --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.Declarative.UnitTests/DeclarativeWorkflowOptionsTest.cs @@ -0,0 +1,310 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows.Observability; +using Microsoft.Extensions.AI; +using Microsoft.Extensions.Logging.Abstractions; +using Moq; + +namespace Microsoft.Agents.AI.Workflows.Declarative.UnitTests; + +/// +/// Tests for telemetry configuration. +/// +[Collection("DeclarativeWorkflowOptionsTest")] +public sealed class DeclarativeWorkflowOptionsTest : IDisposable +{ + // These constants mirror Microsoft.Agents.AI.Workflows.Observability.ActivityNames + // which is internal and not accessible from this test project. + private const string WorkflowBuildActivityName = "workflow.build"; + private const string WorkflowRunActivityName = "workflow_invoke"; + + private const string SimpleWorkflowYaml = """ + kind: Workflow + trigger: + kind: OnConversationStart + id: test_workflow + actions: + - kind: EndConversation + id: end_all + """; + + private readonly ActivitySource _activitySource = new("TestSource"); + private readonly ActivityListener _activityListener; + private readonly ConcurrentBag _capturedActivities = []; + + public DeclarativeWorkflowOptionsTest() + { + this._activityListener = new ActivityListener + { + ShouldListenTo = source => + source.Name.Contains(typeof(Workflow).Namespace!) || + source.Name == "TestSource" || + source.Name == "Test.Workflows", + Sample = (ref ActivityCreationOptions options) => ActivitySamplingResult.AllData, + ActivityStarted = activity => this._capturedActivities.Add(activity), + }; + ActivitySource.AddActivityListener(this._activityListener); + } + + public void Dispose() + { + this._activityListener.Dispose(); + this._activitySource.Dispose(); + } + + [Fact] + public void TelemetrySourceName_DefaultIsNull() + { + // Arrange + Mock mockProvider = CreateMockProvider(); + + // Act + DeclarativeWorkflowOptions options = new(mockProvider.Object); + + // Assert + Assert.Null(options.TelemetrySourceName); + } + + [Fact] + public void TelemetrySourceName_CanBeSet() + { + // Arrange + Mock mockProvider = CreateMockProvider(); + const string SourceName = "MyApp.Workflows"; + + // Act + DeclarativeWorkflowOptions options = new(mockProvider.Object) + { + TelemetrySourceName = SourceName + }; + + // Assert + Assert.Equal(SourceName, options.TelemetrySourceName); + } + + [Fact] + public void ConfigureTelemetry_DefaultIsNull() + { + // Arrange + Mock mockProvider = CreateMockProvider(); + + // Act + DeclarativeWorkflowOptions options = new(mockProvider.Object); + + // Assert + Assert.Null(options.ConfigureTelemetry); + } + + [Fact] + public void ConfigureTelemetry_CanBeSet() + { + // Arrange + Mock mockProvider = CreateMockProvider(); + bool callbackInvoked = false; + + // Act + DeclarativeWorkflowOptions options = new(mockProvider.Object) + { + ConfigureTelemetry = opt => + { + callbackInvoked = true; + opt.EnableSensitiveData = true; + } + }; + + // Assert + Assert.NotNull(options.ConfigureTelemetry); + WorkflowTelemetryOptions telemetryOptions = new(); + options.ConfigureTelemetry(telemetryOptions); + Assert.True(callbackInvoked); + Assert.True(telemetryOptions.EnableSensitiveData); + } + + [Fact] + public void TelemetryActivitySource_DefaultIsNull() + { + // Arrange + Mock mockProvider = CreateMockProvider(); + + // Act + DeclarativeWorkflowOptions options = new(mockProvider.Object); + + // Assert + Assert.Null(options.TelemetryActivitySource); + } + + [Fact] + public void TelemetryActivitySource_CanBeSet() + { + // Arrange + Mock mockProvider = CreateMockProvider(); + + // Act + DeclarativeWorkflowOptions options = new(mockProvider.Object) + { + TelemetryActivitySource = this._activitySource + }; + + // Assert + Assert.Same(this._activitySource, options.TelemetryActivitySource); + } + + [Fact] + public async Task BuildWorkflow_WithTelemetrySourceName_AppliesTelemetryAsync() + { + // Arrange + using Activity testActivity = new Activity("TelemetrySourceNameTest").Start()!; + Mock mockProvider = CreateMockProvider(); + const string SourceName = "Test.Workflows"; + DeclarativeWorkflowOptions options = new(mockProvider.Object) + { + TelemetrySourceName = SourceName, + LoggerFactory = NullLoggerFactory.Instance + }; + + // Act + using StringReader reader = new(SimpleWorkflowYaml); + Workflow workflow = DeclarativeWorkflowBuilder.Build(reader, options); + + await using Run run = await InProcessExecution.RunAsync(workflow, "test input"); + + // Assert + Activity[] capturedActivities = this._capturedActivities + .Where(a => a.RootId == testActivity.RootId) + .ToArray(); + + Assert.NotEmpty(capturedActivities); + Assert.Contains(capturedActivities, a => a.OperationName.StartsWith(WorkflowBuildActivityName, StringComparison.Ordinal)); + Assert.Contains(capturedActivities, a => a.OperationName.StartsWith(WorkflowRunActivityName, StringComparison.Ordinal)); + } + + [Fact] + public async Task BuildWorkflow_WithTelemetryActivitySource_AppliesTelemetryAsync() + { + // Arrange + using Activity testActivity = new Activity("TelemetryActivitySourceTest").Start()!; + Mock mockProvider = CreateMockProvider(); + DeclarativeWorkflowOptions options = new(mockProvider.Object) + { + TelemetryActivitySource = this._activitySource, + LoggerFactory = NullLoggerFactory.Instance + }; + + // Act + using StringReader reader = new(SimpleWorkflowYaml); + Workflow workflow = DeclarativeWorkflowBuilder.Build(reader, options); + + await using Run run = await InProcessExecution.RunAsync(workflow, "test input"); + + // Assert + Activity[] capturedActivities = this._capturedActivities + .Where(a => a.RootId == testActivity.RootId && a.Source.Name == "TestSource") + .ToArray(); + + Assert.NotEmpty(capturedActivities); + Assert.All(capturedActivities, a => Assert.Equal("TestSource", a.Source.Name)); + } + + [Fact] + public async Task BuildWorkflow_WithConfigureTelemetry_AppliesConfigurationAsync() + { + // Arrange + using Activity testActivity = new Activity("ConfigureTelemetryTest").Start()!; + Mock mockProvider = CreateMockProvider(); + bool configureInvoked = false; + DeclarativeWorkflowOptions options = new(mockProvider.Object) + { + TelemetrySourceName = "Test.Workflows", + ConfigureTelemetry = opt => + { + configureInvoked = true; + opt.EnableSensitiveData = true; + }, + LoggerFactory = NullLoggerFactory.Instance + }; + + // Act + using StringReader reader = new(SimpleWorkflowYaml); + Workflow workflow = DeclarativeWorkflowBuilder.Build(reader, options); + + await using Run run = await InProcessExecution.RunAsync(workflow, "test input"); + + // Assert + Assert.True(configureInvoked); + + Activity[] capturedActivities = this._capturedActivities + .Where(a => a.RootId == testActivity.RootId) + .ToArray(); + + Assert.NotEmpty(capturedActivities); + Assert.Contains(capturedActivities, a => a.OperationName.StartsWith(WorkflowBuildActivityName, StringComparison.Ordinal)); + Assert.Contains(capturedActivities, a => a.OperationName.StartsWith(WorkflowRunActivityName, StringComparison.Ordinal)); + } + + [Fact] + public async Task BuildWorkflow_WithoutTelemetry_DoesNotCreateActivitiesAsync() + { + // Arrange + using Activity testActivity = new Activity("NoTelemetryTest").Start()!; + Mock mockProvider = CreateMockProvider(); + DeclarativeWorkflowOptions options = new(mockProvider.Object) + { + LoggerFactory = NullLoggerFactory.Instance + }; + + // Act + using StringReader reader = new(SimpleWorkflowYaml); + Workflow workflow = DeclarativeWorkflowBuilder.Build(reader, options); + + await using Run run = await InProcessExecution.RunAsync(workflow, "test input"); + + // Assert - No workflow activities should be created when telemetry is disabled + Activity[] capturedActivities = this._capturedActivities + .Where(a => a.RootId == testActivity.RootId && + (a.OperationName.StartsWith(WorkflowBuildActivityName, StringComparison.Ordinal) || + a.OperationName.StartsWith(WorkflowRunActivityName, StringComparison.Ordinal))) + .ToArray(); + + Assert.Empty(capturedActivities); + } + + [Fact] + public void BuildWorkflow_WithConfigureTelemetryOnly_ThrowsInvalidOperationException() + { + // Arrange + Mock mockProvider = CreateMockProvider(); + DeclarativeWorkflowOptions options = new(mockProvider.Object) + { + ConfigureTelemetry = opt => opt.EnableSensitiveData = true, + LoggerFactory = NullLoggerFactory.Instance + }; + + // Act & Assert + using StringReader reader = new(SimpleWorkflowYaml); + InvalidOperationException exception = Assert.Throws( + () => DeclarativeWorkflowBuilder.Build(reader, options)); + + Assert.Contains(nameof(DeclarativeWorkflowOptions.ConfigureTelemetry), exception.Message); + Assert.Contains(nameof(DeclarativeWorkflowOptions.TelemetrySourceName), exception.Message); + Assert.Contains(nameof(DeclarativeWorkflowOptions.TelemetryActivitySource), exception.Message); + } + + private static Mock CreateMockProvider() + { + Mock mockAgentProvider = new(MockBehavior.Strict); + mockAgentProvider + .Setup(provider => provider.CreateConversationAsync(It.IsAny())) + .Returns(() => Task.FromResult(Guid.NewGuid().ToString("N"))); + mockAgentProvider + .Setup(provider => provider.CreateMessageAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ChatMessage(ChatRole.Assistant, "Test response"))); + return mockAgentProvider; + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs index 5ca56f00e8..66de68e24c 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs @@ -111,8 +111,6 @@ private async Task TestWorkflowEndToEndActivitiesAsync(string executionEnvironme Run run = await executionEnvironment.RunAsync(workflow, "Hello, World!"); await run.DisposeAsync(); - await Task.Delay(100); // Allow time for activities to be captured - // Assert var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); capturedActivities.Should().HaveCount(8, "Exactly 8 activities should be created."); @@ -166,8 +164,6 @@ public async Task CreatesWorkflowActivities_WithCorrectNameAsync() // Act CreateWorkflow(); - await Task.Delay(100); // Allow time for activities to be captured - // Assert var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); capturedActivities.Should().HaveCount(1, "Exactly 1 activity should be created."); @@ -197,8 +193,6 @@ public async Task TelemetryDisabledByDefault_CreatesNoActivitiesAsync() WorkflowBuilder builder = new(uppercase); var workflow = builder.Build(); // No WithOpenTelemetry() call - await Task.Delay(100); // Allow time for activities to be captured - // Assert - No activities should be created var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); capturedActivities.Should().BeEmpty("No activities should be created when telemetry is disabled (default)."); From 6643a2d1031d8058d0ba0fd46d324be0d107e14b Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Thu, 5 Feb 2026 21:18:44 -0800 Subject: [PATCH 12/13] Remove delays in tests --- .../ObservabilityTests.cs | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs index 66de68e24c..cd2dff5bbd 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs @@ -225,8 +225,6 @@ public async Task WithOpenTelemetry_UsesProvidedActivitySourceAsync() Run run = await InProcessExecution.Default.RunAsync(workflow, "Hello"); await run.DisposeAsync(); - await Task.Delay(100); - // Assert var capturedActivities = userActivities.Where(a => a.RootId == testActivity.RootId).ToList(); capturedActivities.Should().NotBeEmpty("Activities should be created with user-provided ActivitySource."); @@ -248,8 +246,6 @@ public async Task DisableWorkflowBuild_PreventsWorkflowBuildActivityAsync() WorkflowBuilder builder = new(uppercase); var workflow = builder.WithOpenTelemetry(configure: opts => opts.DisableWorkflowBuild = true).Build(); - await Task.Delay(100); - // Assert var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); capturedActivities.Should().NotContain( @@ -274,8 +270,6 @@ public async Task DisableWorkflowRun_PreventsWorkflowRunActivityAsync() Run run = await InProcessExecution.Default.RunAsync(workflow, "Hello"); await run.DisposeAsync(); - await Task.Delay(100); - // Assert var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); capturedActivities.Should().NotContain( @@ -303,8 +297,6 @@ public async Task DisableExecutorProcess_PreventsExecutorProcessActivityAsync() Run run = await InProcessExecution.Default.RunAsync(workflow, "Hello"); await run.DisposeAsync(); - await Task.Delay(100); - // Assert var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); capturedActivities.Should().NotContain( @@ -326,8 +318,6 @@ public async Task DisableEdgeGroupProcess_PreventsEdgeGroupProcessActivityAsync( Run run = await InProcessExecution.Default.RunAsync(workflow, "Hello"); await run.DisposeAsync(); - await Task.Delay(100); - // Assert var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); capturedActivities.Should().NotContain( @@ -349,8 +339,6 @@ public async Task DisableMessageSend_PreventsMessageSendActivityAsync() Run run = await InProcessExecution.Default.RunAsync(workflow, "Hello"); await run.DisposeAsync(); - await Task.Delay(100); - // Assert var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); capturedActivities.Should().NotContain( @@ -406,8 +394,6 @@ public async Task EnableSensitiveData_LogsExecutorInputAndOutputAsync() Run run = await InProcessExecution.Default.RunAsync(workflow, "hello"); await run.DisposeAsync(); - await Task.Delay(100); - // Assert var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); var executorActivity = capturedActivities.FirstOrDefault( @@ -439,8 +425,6 @@ public async Task EnableSensitiveData_Disabled_DoesNotLogInputOutputAsync() Run run = await InProcessExecution.Default.RunAsync(workflow, "hello"); await run.DisposeAsync(); - await Task.Delay(100); - // Assert var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); var executorActivity = capturedActivities.FirstOrDefault( @@ -473,8 +457,6 @@ public async Task EnableSensitiveData_LogsMessageSendContentAsync() Run run = await InProcessExecution.Default.RunAsync(workflow, "hello"); await run.DisposeAsync(); - await Task.Delay(100); - // Assert var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); var messageSendActivity = capturedActivities.FirstOrDefault( @@ -507,8 +489,6 @@ public async Task EnableSensitiveData_Disabled_DoesNotLogMessageContentAsync() Run run = await InProcessExecution.Default.RunAsync(workflow, "hello"); await run.DisposeAsync(); - await Task.Delay(100); - // Assert var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); var messageSendActivity = capturedActivities.FirstOrDefault( From e8d29d5f908965aa8c780fbf70b4f486aed419ec Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Thu, 5 Feb 2026 21:43:22 -0800 Subject: [PATCH 13/13] Address comments --- .../DeclarativeWorkflowOptions.cs | 14 ++-- .../Interpreter/WorkflowActionVisitor.cs | 9 +-- .../Execution/DirectEdgeRunner.cs | 2 +- .../Execution/EdgeRunner.cs | 2 +- .../Execution/FanInEdgeRunner.cs | 2 +- .../Execution/FanOutEdgeRunner.cs | 2 +- .../Execution/ResponseEdgeRunner.cs | 2 +- .../Observability/WorkflowTelemetryContext.cs | 43 ++++++----- .../OpenTelemetryWorkflowBuilderExtensions.cs | 17 ++--- .../DeclarativeWorkflowOptionsTest.cs | 71 +++---------------- .../ObservabilityTests.cs | 4 +- 11 files changed, 55 insertions(+), 113 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/DeclarativeWorkflowOptions.cs b/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/DeclarativeWorkflowOptions.cs index 1a1b29a6e6..ab7794f1b8 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/DeclarativeWorkflowOptions.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/DeclarativeWorkflowOptions.cs @@ -45,12 +45,6 @@ public sealed class DeclarativeWorkflowOptions(WorkflowAgentProvider agentProvid /// public ILoggerFactory LoggerFactory { get; init; } = NullLoggerFactory.Instance; - /// - /// Gets the telemetry source name for OpenTelemetry instrumentation. - /// If , telemetry is disabled unless is provided. - /// - public string? TelemetrySourceName { get; init; } - /// /// Gets the callback to configure telemetry options. /// @@ -59,6 +53,14 @@ public sealed class DeclarativeWorkflowOptions(WorkflowAgentProvider agentProvid /// /// Gets an optional for telemetry. /// If provided, the caller retains ownership and is responsible for disposal. + /// If but is set, a shared default + /// activity source named "Microsoft.Agents.AI.Workflows" will be used. /// public ActivitySource? TelemetryActivitySource { get; init; } + + /// + /// Gets a value indicating whether telemetry is enabled. + /// Telemetry is enabled when either or is set. + /// + internal bool IsTelemetryEnabled => this.ConfigureTelemetry is not null || this.TelemetryActivitySource is not null; } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/Interpreter/WorkflowActionVisitor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/Interpreter/WorkflowActionVisitor.cs index f00be49031..a90b1bd9c9 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/Interpreter/WorkflowActionVisitor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/Interpreter/WorkflowActionVisitor.cs @@ -52,19 +52,12 @@ public Workflow Complete() this._workflowModel.Build(builder); // Apply telemetry if configured - if (this._workflowOptions.TelemetrySourceName is not null || this._workflowOptions.TelemetryActivitySource is not null) + if (this._workflowOptions.IsTelemetryEnabled) { builder.WorkflowBuilder.WithOpenTelemetry( - this._workflowOptions.TelemetrySourceName, this._workflowOptions.ConfigureTelemetry, this._workflowOptions.TelemetryActivitySource); } - else if (this._workflowOptions.ConfigureTelemetry is not null) - { - throw new InvalidOperationException( - $"{nameof(DeclarativeWorkflowOptions.ConfigureTelemetry)} was provided but telemetry is not enabled. " + - $"Set {nameof(DeclarativeWorkflowOptions.TelemetrySourceName)} or {nameof(DeclarativeWorkflowOptions.TelemetryActivitySource)} to enable telemetry."); - } // Build final workflow return builder.WorkflowBuilder.Build(validateOrphans: false); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/DirectEdgeRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/DirectEdgeRunner.cs index ac082ad0e6..db643ab441 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/DirectEdgeRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/DirectEdgeRunner.cs @@ -14,7 +14,7 @@ private async ValueTask FindRouterAsync(IStepTracer? tracer) => await protected internal override async ValueTask ChaseEdgeAsync(MessageEnvelope envelope, IStepTracer? stepTracer) { - using var activity = this.StartActivity(ActivityNames.EdgeGroupProcess); + using var activity = this.StartActivity(); activity? .SetTag(Tags.EdgeGroupType, nameof(DirectEdgeRunner)) .SetTag(Tags.MessageSourceId, this.EdgeData.SourceId) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs index 4a2d6c72ad..309072c32f 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs @@ -24,5 +24,5 @@ internal abstract class EdgeRunner( protected IRunnerContext RunContext { get; } = Throw.IfNull(runContext); protected TEdgeData EdgeData { get; } = Throw.IfNull(edgeData); - protected Activity? StartActivity(string name) => this.RunContext.TelemetryContext.StartEdgeGroupProcessActivity(); + protected Activity? StartActivity() => this.RunContext.TelemetryContext.StartEdgeGroupProcessActivity(); } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeRunner.cs index 09ea6b5960..be80ef34de 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeRunner.cs @@ -19,7 +19,7 @@ internal sealed class FanInEdgeRunner(IRunnerContext runContext, FanInEdgeData e { Debug.Assert(!envelope.IsExternal, "FanIn edges should never be chased from external input"); - using var activity = this.StartActivity(ActivityNames.EdgeGroupProcess); + using var activity = this.StartActivity(); activity? .SetTag(Tags.EdgeGroupType, nameof(FanInEdgeRunner)) .SetTag(Tags.MessageTargetId, this.EdgeData.SinkId); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanOutEdgeRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanOutEdgeRunner.cs index c8fe40bbe2..d1102d6554 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanOutEdgeRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanOutEdgeRunner.cs @@ -13,7 +13,7 @@ internal sealed class FanOutEdgeRunner(IRunnerContext runContext, FanOutEdgeData { protected internal override async ValueTask ChaseEdgeAsync(MessageEnvelope envelope, IStepTracer? stepTracer) { - using var activity = this.StartActivity(ActivityNames.EdgeGroupProcess); + using var activity = this.StartActivity(); activity? .SetTag(Tags.EdgeGroupType, nameof(FanOutEdgeRunner)) .SetTag(Tags.MessageSourceId, this.EdgeData.SourceId); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ResponseEdgeRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ResponseEdgeRunner.cs index e9b8ee7fce..969509e40b 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ResponseEdgeRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ResponseEdgeRunner.cs @@ -25,7 +25,7 @@ public static ResponseEdgeRunner ForPort(IRunnerContext runContext, string execu { Debug.Assert(envelope.IsExternal, "Input edges should only be chased from external input"); - using var activity = this.StartActivity(ActivityNames.EdgeGroupProcess); + using var activity = this.StartActivity(); activity? .SetTag(Tags.EdgeGroupType, nameof(ResponseEdgeRunner)) .SetTag(Tags.MessageSourceId, envelope.SourceId) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs index a9071dfdb4..e4b8d7a851 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs @@ -11,6 +11,9 @@ namespace Microsoft.Agents.AI.Workflows.Observability; /// internal sealed class WorkflowTelemetryContext { + private const string DefaultSourceName = "Microsoft.Agents.AI.Workflows"; + private static readonly ActivitySource s_defaultActivitySource = new(DefaultSourceName); + /// /// Gets a shared instance representing disabled telemetry. /// @@ -29,30 +32,29 @@ internal sealed class WorkflowTelemetryContext /// /// Gets the activity source used for creating telemetry spans. /// - public ActivitySource? ActivitySource { get; } + public ActivitySource ActivitySource { get; } private WorkflowTelemetryContext() { this.IsEnabled = false; this.Options = new WorkflowTelemetryOptions(); - this.ActivitySource = null; + this.ActivitySource = s_defaultActivitySource; } /// /// Initializes a new instance of the class with telemetry enabled. /// - /// The source name for the activity source. Used only when is . /// The telemetry options. /// /// An optional activity source to use. If provided, this activity source will be used directly - /// and the caller retains ownership (responsible for disposal). If , a new - /// activity source will be created using . + /// and the caller retains ownership (responsible for disposal). If , the + /// shared default activity source will be used. /// - public WorkflowTelemetryContext(string sourceName, WorkflowTelemetryOptions options, ActivitySource? activitySource = null) + public WorkflowTelemetryContext(WorkflowTelemetryOptions options, ActivitySource? activitySource = null) { this.IsEnabled = true; this.Options = options; - this.ActivitySource = activitySource ?? new ActivitySource(sourceName); + this.ActivitySource = activitySource ?? s_defaultActivitySource; } /// @@ -63,7 +65,12 @@ public WorkflowTelemetryContext(string sourceName, WorkflowTelemetryOptions opti /// An activity if telemetry is enabled and the activity is sampled, otherwise null. public Activity? StartActivity(string name, ActivityKind kind = ActivityKind.Internal) { - return this.ActivitySource?.StartActivity(name, kind); + if (!this.IsEnabled) + { + return null; + } + + return this.ActivitySource.StartActivity(name, kind); } /// @@ -72,12 +79,12 @@ public WorkflowTelemetryContext(string sourceName, WorkflowTelemetryOptions opti /// An activity if workflow build telemetry is enabled, otherwise null. public Activity? StartWorkflowBuildActivity() { - if (this.Options.DisableWorkflowBuild) + if (!this.IsEnabled || this.Options.DisableWorkflowBuild) { return null; } - return this.ActivitySource?.StartActivity(ActivityNames.WorkflowBuild); + return this.ActivitySource.StartActivity(ActivityNames.WorkflowBuild); } /// @@ -86,12 +93,12 @@ public WorkflowTelemetryContext(string sourceName, WorkflowTelemetryOptions opti /// An activity if workflow run telemetry is enabled, otherwise null. public Activity? StartWorkflowRunActivity() { - if (this.Options.DisableWorkflowRun) + if (!this.IsEnabled || this.Options.DisableWorkflowRun) { return null; } - return this.ActivitySource?.StartActivity(ActivityNames.WorkflowRun); + return this.ActivitySource.StartActivity(ActivityNames.WorkflowRun); } /// @@ -104,12 +111,12 @@ public WorkflowTelemetryContext(string sourceName, WorkflowTelemetryOptions opti /// An activity if executor process telemetry is enabled, otherwise null. public Activity? StartExecutorProcessActivity(string executorId, string? executorType, string messageType, object? message) { - if (this.Options.DisableExecutorProcess) + if (!this.IsEnabled || this.Options.DisableExecutorProcess) { return null; } - Activity? activity = this.ActivitySource?.StartActivity(ActivityNames.ExecutorProcess + " " + executorId); + Activity? activity = this.ActivitySource.StartActivity(ActivityNames.ExecutorProcess + " " + executorId); if (activity is null) { return null; @@ -146,12 +153,12 @@ public void SetExecutorOutput(Activity? activity, object? output) /// An activity if edge group process telemetry is enabled, otherwise null. public Activity? StartEdgeGroupProcessActivity() { - if (this.Options.DisableEdgeGroupProcess) + if (!this.IsEnabled || this.Options.DisableEdgeGroupProcess) { return null; } - return this.ActivitySource?.StartActivity(ActivityNames.EdgeGroupProcess); + return this.ActivitySource.StartActivity(ActivityNames.EdgeGroupProcess); } /// @@ -163,12 +170,12 @@ public void SetExecutorOutput(Activity? activity, object? output) /// An activity if message send telemetry is enabled, otherwise null. public Activity? StartMessageSendActivity(string sourceId, string? targetId, object? message) { - if (this.Options.DisableMessageSend) + if (!this.IsEnabled || this.Options.DisableMessageSend) { return null; } - Activity? activity = this.ActivitySource?.StartActivity(ActivityNames.MessageSend, ActivityKind.Producer); + Activity? activity = this.ActivitySource.StartActivity(ActivityNames.MessageSend, ActivityKind.Producer); if (activity is null) { return null; diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/OpenTelemetryWorkflowBuilderExtensions.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/OpenTelemetryWorkflowBuilderExtensions.cs index 93d199a488..ffa0f0362d 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/OpenTelemetryWorkflowBuilderExtensions.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/OpenTelemetryWorkflowBuilderExtensions.cs @@ -12,25 +12,18 @@ namespace Microsoft.Agents.AI.Workflows; /// public static class OpenTelemetryWorkflowBuilderExtensions { - private const string DefaultSourceName = "Microsoft.Agents.AI.Workflows"; - /// /// Enables OpenTelemetry instrumentation for the workflow, providing comprehensive observability for workflow operations. /// /// The to which OpenTelemetry support will be added. - /// - /// An optional source name that will be used to identify telemetry data from this workflow. - /// If not specified, a default source name will be used. This parameter is ignored when - /// is provided. - /// /// /// An optional callback that provides additional configuration of the instance. /// This allows for fine-tuning telemetry behavior such as enabling sensitive data collection. /// /// /// An optional to use for telemetry. If provided, this activity source will be used - /// directly and the caller retains ownership (responsible for disposal). If , a new - /// activity source will be created using . + /// directly and the caller retains ownership (responsible for disposal). If , a shared + /// default activity source named "Microsoft.Agents.AI.Workflows" will be used. /// /// The with OpenTelemetry instrumentation enabled, enabling method chaining. /// is . @@ -53,13 +46,12 @@ public static class OpenTelemetryWorkflowBuilderExtensions /// /// var workflow = new WorkflowBuilder(startExecutor) /// .AddEdge(executor1, executor2) - /// .WithOpenTelemetry("MyApp.Workflows", cfg => cfg.EnableSensitiveData = true) + /// .WithOpenTelemetry(cfg => cfg.EnableSensitiveData = true) /// .Build(); /// /// public static WorkflowBuilder WithOpenTelemetry( this WorkflowBuilder builder, - string? sourceName = null, Action? configure = null, ActivitySource? activitySource = null) { @@ -68,8 +60,7 @@ public static WorkflowBuilder WithOpenTelemetry( WorkflowTelemetryOptions options = new(); configure?.Invoke(options); - string effectiveSourceName = string.IsNullOrEmpty(sourceName) ? DefaultSourceName : sourceName!; - WorkflowTelemetryContext context = new(effectiveSourceName, options, activitySource); + WorkflowTelemetryContext context = new(options, activitySource); builder.SetTelemetryContext(context); diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.Declarative.UnitTests/DeclarativeWorkflowOptionsTest.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.Declarative.UnitTests/DeclarativeWorkflowOptionsTest.cs index ab4b5a68be..73c4792e32 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.Declarative.UnitTests/DeclarativeWorkflowOptionsTest.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.Declarative.UnitTests/DeclarativeWorkflowOptionsTest.cs @@ -25,6 +25,9 @@ public sealed class DeclarativeWorkflowOptionsTest : IDisposable private const string WorkflowBuildActivityName = "workflow.build"; private const string WorkflowRunActivityName = "workflow_invoke"; + // The default activity source name used by the workflow telemetry context. + private const string DefaultTelemetrySourceName = "Microsoft.Agents.AI.Workflows"; + private const string SimpleWorkflowYaml = """ kind: Workflow trigger: @@ -44,9 +47,8 @@ public DeclarativeWorkflowOptionsTest() this._activityListener = new ActivityListener { ShouldListenTo = source => - source.Name.Contains(typeof(Workflow).Namespace!) || - source.Name == "TestSource" || - source.Name == "Test.Workflows", + source.Name == DefaultTelemetrySourceName || + source.Name == "TestSource", Sample = (ref ActivityCreationOptions options) => ActivitySamplingResult.AllData, ActivityStarted = activity => this._capturedActivities.Add(activity), }; @@ -59,36 +61,6 @@ public void Dispose() this._activitySource.Dispose(); } - [Fact] - public void TelemetrySourceName_DefaultIsNull() - { - // Arrange - Mock mockProvider = CreateMockProvider(); - - // Act - DeclarativeWorkflowOptions options = new(mockProvider.Object); - - // Assert - Assert.Null(options.TelemetrySourceName); - } - - [Fact] - public void TelemetrySourceName_CanBeSet() - { - // Arrange - Mock mockProvider = CreateMockProvider(); - const string SourceName = "MyApp.Workflows"; - - // Act - DeclarativeWorkflowOptions options = new(mockProvider.Object) - { - TelemetrySourceName = SourceName - }; - - // Assert - Assert.Equal(SourceName, options.TelemetrySourceName); - } - [Fact] public void ConfigureTelemetry_DefaultIsNull() { @@ -157,15 +129,14 @@ public void TelemetryActivitySource_CanBeSet() } [Fact] - public async Task BuildWorkflow_WithTelemetrySourceName_AppliesTelemetryAsync() + public async Task BuildWorkflow_WithDefaultTelemetry_AppliesTelemetryAsync() { // Arrange - using Activity testActivity = new Activity("TelemetrySourceNameTest").Start()!; + using Activity testActivity = new Activity("DefaultTelemetryTest").Start()!; Mock mockProvider = CreateMockProvider(); - const string SourceName = "Test.Workflows"; DeclarativeWorkflowOptions options = new(mockProvider.Object) { - TelemetrySourceName = SourceName, + ConfigureTelemetry = _ => { }, LoggerFactory = NullLoggerFactory.Instance }; @@ -177,7 +148,7 @@ public async Task BuildWorkflow_WithTelemetrySourceName_AppliesTelemetryAsync() // Assert Activity[] capturedActivities = this._capturedActivities - .Where(a => a.RootId == testActivity.RootId) + .Where(a => a.RootId == testActivity.RootId && a.Source.Name == DefaultTelemetrySourceName) .ToArray(); Assert.NotEmpty(capturedActivities); @@ -221,7 +192,6 @@ public async Task BuildWorkflow_WithConfigureTelemetry_AppliesConfigurationAsync bool configureInvoked = false; DeclarativeWorkflowOptions options = new(mockProvider.Object) { - TelemetrySourceName = "Test.Workflows", ConfigureTelemetry = opt => { configureInvoked = true; @@ -240,7 +210,7 @@ public async Task BuildWorkflow_WithConfigureTelemetry_AppliesConfigurationAsync Assert.True(configureInvoked); Activity[] capturedActivities = this._capturedActivities - .Where(a => a.RootId == testActivity.RootId) + .Where(a => a.RootId == testActivity.RootId && a.Source.Name == DefaultTelemetrySourceName) .ToArray(); Assert.NotEmpty(capturedActivities); @@ -275,27 +245,6 @@ public async Task BuildWorkflow_WithoutTelemetry_DoesNotCreateActivitiesAsync() Assert.Empty(capturedActivities); } - [Fact] - public void BuildWorkflow_WithConfigureTelemetryOnly_ThrowsInvalidOperationException() - { - // Arrange - Mock mockProvider = CreateMockProvider(); - DeclarativeWorkflowOptions options = new(mockProvider.Object) - { - ConfigureTelemetry = opt => opt.EnableSensitiveData = true, - LoggerFactory = NullLoggerFactory.Instance - }; - - // Act & Assert - using StringReader reader = new(SimpleWorkflowYaml); - InvalidOperationException exception = Assert.Throws( - () => DeclarativeWorkflowBuilder.Build(reader, options)); - - Assert.Contains(nameof(DeclarativeWorkflowOptions.ConfigureTelemetry), exception.Message); - Assert.Contains(nameof(DeclarativeWorkflowOptions.TelemetrySourceName), exception.Message); - Assert.Contains(nameof(DeclarativeWorkflowOptions.TelemetryActivitySource), exception.Message); - } - private static Mock CreateMockProvider() { Mock mockAgentProvider = new(MockBehavior.Strict); diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs index cd2dff5bbd..e7a99d5ca2 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs @@ -192,7 +192,7 @@ public async Task TelemetryDisabledByDefault_CreatesNoActivitiesAsync() var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); WorkflowBuilder builder = new(uppercase); - var workflow = builder.Build(); // No WithOpenTelemetry() call + builder.Build(); // No WithOpenTelemetry() call // Assert - No activities should be created var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); capturedActivities.Should().BeEmpty("No activities should be created when telemetry is disabled (default)."); @@ -244,7 +244,7 @@ public async Task DisableWorkflowBuild_PreventsWorkflowBuildActivityAsync() // Act WorkflowBuilder builder = new(uppercase); - var workflow = builder.WithOpenTelemetry(configure: opts => opts.DisableWorkflowBuild = true).Build(); + builder.WithOpenTelemetry(configure: opts => opts.DisableWorkflowBuild = true).Build(); // Assert var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList();