Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 17 additions & 26 deletions docs/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,23 @@ It's recommended to use a local [OTEL Collector](https://opentelemetry.io/docs/c

Example configuration: https://github.com/andreasohlund/Docker/tree/main/otel-monitoring

The following metrics are available:

### Ingestion

#### Success or failure

- `sc.audit.ingestion.success` - Successfully ingested audit message count (Counter)
- `sc.audit.ingestion.retry` - Retried audit message count (Counter)
- `sc.audit.ingestion.failed` - Failed audit message count (Counter)

The above metrics also have the following attributes attached:

- `messaging.message.body.size` - The size of the message body in bytes
- `messaging.message.type` - The logical message type of the message if present

#### Details

- `sc.audit.ingestion.duration` - Audit message processing duration in milliseconds (Histogram)
- `sc.audit.ingestion.forwarded` - Number of forwarded audit messages, if forwarding is enabled (Counter)

### Batching

- `sc.audit.ingestion.batch_duration` - Batch processing duration in milliseconds (Histogram)
- Attributes:
- `ingestion.batch_size`
- `sc.audit.ingestion.consecutive_batch_failures` - Consecutive batch failures (Counter)
The following ingestion metrics with their corresponding dimensions are available:

- `sc.audit.ingestion.batch_duration_seconds` - Message batch processing duration in seconds
- `result` - Indicates the outcome of the batch: `full` (the full batch size was used, i.e. batch size == max concurrency of the transport), `partial` or `failed`
- `sc.audit.ingestion.message_duration_seconds` - Audit message processing duration in seconds
- `message.category` - Indicates the category of the message ingested: `audit-message`, `saga-update` or `control-message`
- `result` - Indicates the outcome of the operation: `success`, `failed` or `skipped` (if the message was filtered out and skipped)
- `sc.audit.ingestion.failures_total` - Failure counter
- `message.category` - Indicates the category of the message ingested: `audit-message`, `saga-update` or `control-message`
- `result` - Indicates how the failure was resolved: `retry` or `stored-poison`
- `sc.audit.ingestion.consecutive_batch_failure_total` - Consecutive batch failures

Example queries in PromQL for use in Grafana:

- Ingestion rate: `sum (rate(sc_audit_ingestion_message_duration_seconds_count[$__rate_interval])) by (exported_job)`
- Failure rate: `sum(rate(sc_audit_ingestion_failures_total[$__rate_interval])) by (exported_job,result)`
- Message duration: `histogram_quantile(0.9,sum(rate(sc_audit_ingestion_message_duration_seconds_bucket[$__rate_interval])) by (le,exported_job))`

## Monitoring

Expand Down
4 changes: 3 additions & 1 deletion src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
<PackageVersion Include="NUnit" Version="4.3.2" />
<PackageVersion Include="NUnit.Analyzers" Version="4.6.0" />
<PackageVersion Include="NUnit3TestAdapter" Version="5.0.0" />
<PackageVersion Include="OpenTelemetry.Exporter.Console" Version="1.9.0" />
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.9.0" />
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.9.0" />
<PackageVersion Include="Particular.Approvals" Version="2.0.1" />
Expand All @@ -63,6 +64,7 @@
<PackageVersion Include="ReactiveUI.WPF" Version="20.1.63" />
<PackageVersion Include="ServiceControl.Contracts" Version="5.0.0" />
<PackageVersion Include="System.Configuration.ConfigurationManager" Version="8.0.1" />
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="9.0.2" />
<PackageVersion Include="System.Diagnostics.PerformanceCounter" Version="8.0.1" />
<PackageVersion Include="System.DirectoryServices.AccountManagement" Version="8.0.1" />
<PackageVersion Include="System.Linq.Async" Version="6.0.1" />
Expand All @@ -89,4 +91,4 @@
<GlobalPackageReference Include="Microsoft.Build.CopyOnWrite" Version="1.0.334" />
<GlobalPackageReference Include="Particular.Packaging" Version="4.2.2" />
</ItemGroup>
</Project>
</Project>
64 changes: 29 additions & 35 deletions src/ServiceControl.Audit/Auditing/AuditIngestion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
{
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Infrastructure.Settings;
using Metrics;
using Microsoft.Extensions.Hosting;
using NServiceBus;
using NServiceBus.Logging;
Expand All @@ -26,7 +26,8 @@ public AuditIngestion(
AuditIngestionCustomCheck.State ingestionState,
AuditIngestor auditIngestor,
IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
IHostApplicationLifetime applicationLifetime)
IHostApplicationLifetime applicationLifetime,
IngestionMetrics metrics)
{
inputEndpoint = settings.AuditQueue;
this.transportCustomization = transportCustomization;
Expand All @@ -35,21 +36,24 @@ public AuditIngestion(
this.unitOfWorkFactory = unitOfWorkFactory;
this.settings = settings;
this.applicationLifetime = applicationLifetime;
this.metrics = metrics;

if (!transportSettings.MaxConcurrency.HasValue)
{
throw new ArgumentException("MaxConcurrency is not set in TransportSettings");
}

channel = Channel.CreateBounded<MessageContext>(new BoundedChannelOptions(transportSettings.MaxConcurrency.Value)
MaxBatchSize = transportSettings.MaxConcurrency.Value;

channel = Channel.CreateBounded<MessageContext>(new BoundedChannelOptions(MaxBatchSize)
{
SingleReader = true,
SingleWriter = false,
AllowSynchronousContinuations = false,
FullMode = BoundedChannelFullMode.Wait
});

errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, settings.LoggingSettings, OnCriticalError);
errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, settings.LoggingSettings, OnCriticalError, metrics);

watchdog = new Watchdog(
"audit message ingestion",
Expand Down Expand Up @@ -190,22 +194,21 @@ async Task EnsureStopped(CancellationToken cancellationToken)

async Task OnMessage(MessageContext messageContext, CancellationToken cancellationToken)
{
var tags = Telemetry.GetIngestedMessageTags(messageContext.Headers, messageContext.Body);
using (new DurationRecorder(ingestionDuration, tags))
using var messageIngestionMetrics = metrics.BeginIngestion(messageContext);

if (settings.MessageFilter != null && settings.MessageFilter(messageContext))
{
if (settings.MessageFilter != null && settings.MessageFilter(messageContext))
{
return;
}
messageIngestionMetrics.Skipped();
return;
}

var taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
messageContext.SetTaskCompletionSource(taskCompletionSource);
var taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
messageContext.SetTaskCompletionSource(taskCompletionSource);

await channel.Writer.WriteAsync(messageContext, cancellationToken);
await taskCompletionSource.Task;
await channel.Writer.WriteAsync(messageContext, cancellationToken);
_ = await taskCompletionSource.Task;

successfulMessagesCounter.Add(1, tags);
}
messageIngestionMetrics.Success();
}

public override async Task StartAsync(CancellationToken cancellationToken)
Expand All @@ -218,27 +221,24 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);
var contexts = new List<MessageContext>(MaxBatchSize);

while (await channel.Reader.WaitToReadAsync(stoppingToken))
{
// will only enter here if there is something to read.
try
{
using var batchMetrics = metrics.BeginBatch(MaxBatchSize);

// as long as there is something to read this will fetch up to MaxBatchSize (the transport's MaxConcurrency) items
using (var recorder = new DurationRecorder(batchDuration))
while (channel.Reader.TryRead(out var context))
{
while (channel.Reader.TryRead(out var context))
{
contexts.Add(context);
}

recorder.Tags.Add("ingestion.batch_size", contexts.Count);

await auditIngestor.Ingest(contexts);
contexts.Add(context);
}

consecutiveBatchFailuresCounter.Record(0);
await auditIngestor.Ingest(contexts);

batchMetrics.Complete(contexts.Count);
}
catch (Exception e)
{
Expand All @@ -255,9 +255,6 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
}

logger.Info("Ingesting messages failed", e);

// no need to do interlocked increment since this is running sequential
consecutiveBatchFailuresCounter.Record(consecutiveBatchFailures++);
}
finally
{
Expand Down Expand Up @@ -298,8 +295,8 @@ public override async Task StopAsync(CancellationToken cancellationToken)

TransportInfrastructure transportInfrastructure;
IMessageReceiver messageReceiver;
long consecutiveBatchFailures = 0;

readonly int MaxBatchSize;
readonly SemaphoreSlim startStopSemaphore = new(1);
readonly string inputEndpoint;
readonly ITransportCustomization transportCustomization;
Expand All @@ -309,12 +306,9 @@ public override async Task StopAsync(CancellationToken cancellationToken)
readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory;
readonly Settings settings;
readonly Channel<MessageContext> channel;
readonly Histogram<double> batchDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "batch_duration"), unit: "ms", "Average audit message batch processing duration");
readonly Counter<long> successfulMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "success"), description: "Successful ingested audit message count");
readonly Histogram<long> consecutiveBatchFailuresCounter = Telemetry.Meter.CreateHistogram<long>(Telemetry.CreateInstrumentName("ingestion", "consecutive_batch_failures"), unit: "count", description: "Consecutive audit ingestion batch failure");
readonly Histogram<double> ingestionDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "duration"), unit: "ms", description: "Average incoming audit message processing duration");
readonly Watchdog watchdog;
readonly IHostApplicationLifetime applicationLifetime;
readonly IngestionMetrics metrics;

static readonly ILog logger = LogManager.GetLogger<AuditIngestion>();

Expand Down
Loading
Loading