Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions docs/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,28 @@ The following metrics are available:

### Ingestion

- `sc.audit.ingestion.count` - Successful ingested audit message count
- `sc.audit.ingestion.retry` - Retried audit message count
- `sc.audit.ingestion.failed` - Failed audit message count
- `sc.audit.ingestion.duration` - Audit message processing duration (in milliseconds)
- `sc.audit.ingestion.message_size` - Audit message body size (in kilobytes)
- `sc.audit.ingestion.forwarded_count` - Forwarded audit messages count
#### Success or failure

### Batching
- `sc.audit.ingestion.success` - Successful ingested audit message count (Counter)
- `sc.audit.ingestion.retry` - Retried audit message count (Counter)
- `sc.audit.ingestion.failed` - Failed audit message count (Counter)

The above metrics also have the following attributes attached:

- `messaging.message.body.size` - The size of the message body in bytes
- `messaging.message.type` - The logical message type of the message if present

- `sc.audit.ingestion.batch_duration` - Batch processing duration (in milliseconds)
- `sc.audit.ingestion.batch_size` - Batch size (number of messages)
- `sc.audit.ingestion.consecutive_batch_failures` - Consecutive batch failures
#### Details

### Storage
- `sc.audit.ingestion.duration` - Audit message processing duration in milliseconds (Histogram)
- `sc.audit.ingestion.forwarded` - Count of the number of forwarded audit messages if forwarding is enabled (Counter)

### Batching

- `sc.audit.ingestion.audits_count` - Stored audit message count
- `sc.audit.ingestion.sagas_count` - Stored sagas message count
- `sc.audit.ingestion.commit_duration` - Storage unit of work commit duration (in milliseconds)
- `sc.audit.ingestion.batch_duration` - Batch processing duration in milliseconds (Histogram)
- Attributes:
- `ingestion.batch_size`
- `sc.audit.ingestion.consecutive_batch_failures` - Consecutive batch failures (Counter)

## Monitoring

Expand Down
24 changes: 11 additions & 13 deletions src/ServiceControl.Audit/Auditing/AuditIngestion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ async Task EnsureStopped(CancellationToken cancellationToken = default)

async Task OnMessage(MessageContext messageContext, CancellationToken cancellationToken)
{
using (new DurationRecorder(ingestionDuration))
var tags = Telemetry.GetIngestedMessageTags(messageContext.Headers, messageContext.Body);
using (new DurationRecorder(ingestionDuration, tags))
{
if (settings.MessageFilter != null && settings.MessageFilter(messageContext))
{
Expand All @@ -187,8 +188,7 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati
await channel.Writer.WriteAsync(messageContext, cancellationToken);
await taskCompletionSource.Task;

ingestedMessagesCounter.Add(1);
messageSize.Record(messageContext.Body.Length / 1024.0);
successfulMessagesCounter.Add(1, tags);
}
}

Expand All @@ -210,15 +210,15 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
try
{
// as long as there is something to read this will fetch up to MaximumConcurrency items
while (channel.Reader.TryRead(out var context))
using (var recorder = new DurationRecorder(batchDuration))
{
contexts.Add(context);
}
while (channel.Reader.TryRead(out var context))
{
contexts.Add(context);
}

auditBatchSize.Record(contexts.Count);
recorder.Tags.Add("ingestion.batch_size", contexts.Count);

using (new DurationRecorder(auditBatchDuration))
{
await auditIngestor.Ingest(contexts);
}

Expand Down Expand Up @@ -293,10 +293,8 @@ public override async Task StopAsync(CancellationToken cancellationToken)
readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory;
readonly Settings settings;
readonly Channel<MessageContext> channel;
readonly Histogram<long> auditBatchSize = Telemetry.Meter.CreateHistogram<long>(Telemetry.CreateInstrumentName("ingestion", "batch_size"), description: "Audit ingestion average batch size");
readonly Histogram<double> auditBatchDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "batch_duration"), unit: "ms", "Average audit message batch processing duration");
readonly Histogram<double> messageSize = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "message_size"), unit: "kilobytes", description: "Average audit message body size");
readonly Counter<long> ingestedMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "count"), description: "Successful ingested audit message count");
readonly Histogram<double> batchDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "batch_duration"), unit: "ms", "Average audit message batch processing duration");
readonly Counter<long> successfulMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "success"), description: "Successful ingested audit message count");
readonly Histogram<long> consecutiveBatchFailuresCounter = Telemetry.Meter.CreateHistogram<long>(Telemetry.CreateInstrumentName("ingestion", "consecutive_batch_failures"), unit: "count", description: "Consecutive audit ingestion batch failure");
readonly Histogram<double> ingestionDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "duration"), unit: "ms", description: "Average incoming audit message processing duration");
readonly Watchdog watchdog;
Expand Down
11 changes: 7 additions & 4 deletions src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
using Infrastructure;
using NServiceBus.Logging;
using NServiceBus.Transport;
using ServiceControl.Audit.Persistence;
using ServiceControl.Configuration;
using Persistence;
using Configuration;
using ServiceControl.Infrastructure;

class AuditIngestionFaultPolicy
Expand All @@ -35,16 +35,19 @@ public AuditIngestionFaultPolicy(IFailedAuditStorage failedAuditStorage, Logging

public async Task<ErrorHandleResult> OnError(ErrorContext errorContext, CancellationToken cancellationToken = default)
{
var tags = Telemetry.GetIngestedMessageTags(errorContext.Message.Headers, errorContext.Message.Body);

//Same as recoverability policy in NServiceBusFactory
if (errorContext.ImmediateProcessingFailures < 3)
{
retryCounter.Add(1);
retryCounter.Add(1, tags);
return ErrorHandleResult.RetryRequired;
}

await StoreFailedMessageDocument(errorContext, cancellationToken);

failedCounter.Add(1);
failedCounter.Add(1, tags);

return ErrorHandleResult.Handled;
}

Expand Down
3 changes: 2 additions & 1 deletion src/ServiceControl.Audit/Auditing/AuditIngestor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Persistence.UnitOfWork;
using Recoverability;
using SagaAudit;
using ServiceControl.Infrastructure;
using ServiceControl.Transports;

public class AuditIngestor
Expand Down Expand Up @@ -131,7 +132,7 @@ public async Task VerifyCanReachForwardingAddress()
readonly Settings settings;
readonly Lazy<IMessageDispatcher> messageDispatcher;
readonly string logQueueAddress;
readonly Counter<long> forwardedMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "forwarded_count"), description: "Audit ingestion forwarded message count");
readonly Counter<long> forwardedMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "forwarded"), description: "Audit ingestion forwarded message count");

static readonly ILog Log = LogManager.GetLogger<AuditIngestor>();
}
Expand Down
16 changes: 2 additions & 14 deletions src/ServiceControl.Audit/Auditing/AuditPersister.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
{
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using System.Text.Json;
using System.Threading.Tasks;
using Infrastructure;
Expand Down Expand Up @@ -61,14 +60,10 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
}

await unitOfWork.RecordProcessedMessage(processedMessage, context.Body);

storedAuditsCounter.Add(1);
}
else if (context.Extensions.TryGet(out SagaSnapshot sagaSnapshot))
{
await unitOfWork.RecordSagaSnapshot(sagaSnapshot);

storedSagasCounter.Add(1);
}

storedContexts.Add(context);
Expand Down Expand Up @@ -105,11 +100,8 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo

try
{
using (new DurationRecorder(commitDuration))
{
// this can throw even though dispose is never supposed to throw
await unitOfWork.DisposeAsync();
}
// this can throw even though dispose is never supposed to throw
await unitOfWork.DisposeAsync();
}
catch (Exception e)
{
Expand Down Expand Up @@ -252,10 +244,6 @@ await messageDispatcher.Value.Dispatch(new TransportOperations(messagesToEmit.To
}
}

readonly Counter<long> storedAuditsCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "audits_count"), description: "Stored audit message count");
readonly Counter<long> storedSagasCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "sagas_count"), description: "Stored saga state count");
readonly Histogram<double> commitDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "commit_duration"), unit: "ms", description: "Storage unit of work commit duration");

static readonly ILog Logger = LogManager.GetLogger<AuditPersister>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace ServiceControl.Audit;
using NServiceBus.Transport;
using Persistence;
using Transports;
using ServiceControl.Infrastructure;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;

Expand Down
6 changes: 3 additions & 3 deletions src/ServiceControl.Audit/Infrastructure/DurationRecorder.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
namespace ServiceControl.Audit;
namespace ServiceControl.Infrastructure;

using System;
using System.Diagnostics;
using System.Diagnostics.Metrics;

record DurationRecorder(Histogram<double> Histogram) : IDisposable
record DurationRecorder(Histogram<double> Histogram, TagList Tags = default) : IDisposable
{
readonly Stopwatch sw = Stopwatch.StartNew();

public void Dispose() => Histogram.Record(sw.ElapsedMilliseconds);
public void Dispose() => Histogram.Record(sw.ElapsedMilliseconds, Tags);
}
19 changes: 16 additions & 3 deletions src/ServiceControl.Audit/Infrastructure/Telemetry.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
namespace ServiceControl.Audit;
namespace ServiceControl.Infrastructure;

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using NServiceBus;
using OpenTelemetry.Metrics;

static class Telemetry
Expand All @@ -10,8 +14,17 @@ static class Telemetry

public static string CreateInstrumentName(string instrumentNamespace, string instrumentName) => $"sc.audit.{instrumentNamespace}.{instrumentName}".ToLower();

public static void AddAuditIngestionMeters(this MeterProviderBuilder builder)
public static void AddAuditIngestionMeters(this MeterProviderBuilder builder) => builder.AddMeter(MeterName);

public static TagList GetIngestedMessageTags(IDictionary<string, string> headers, ReadOnlyMemory<byte> body)
{
builder.AddMeter(MeterName);
var tags = new TagList { { "messaging.message.body.size", body.Length } };

if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageType))
{
tags.Add("messaging.message.type", messageType);
}

return tags;
}
}