Skip to content
This repository was archived by the owner on Jul 28, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 28 additions & 16 deletions src/ServiceLayer.Mesh/Functions/FileDiscoveryFunction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class FileDiscoveryFunction(
IMeshInboxService meshInboxService,
ServiceLayerDbContext serviceLayerDbContext,
IFileExtractQueueClient fileExtractQueueClient)
: MeshFileFunctionBase(serviceLayerDbContext)
{
[Function("FileDiscoveryFunction")]
public async Task Run([TimerTrigger("%FileDiscoveryTimerExpression%")] TimerInfo myTimer)
Expand All @@ -26,28 +27,16 @@ public async Task Run([TimerTrigger("%FileDiscoveryTimerExpression%")] TimerInfo
// TODO - check if response.IsSuccessful before proceeding to dereference the Response.Messages
foreach (var messageId in response.Response.Messages)
{
await using var transaction = await serviceLayerDbContext.Database.BeginTransactionAsync();
await using var transaction = await ServiceLayerDbContext.Database.BeginTransactionAsync();

var existing = await serviceLayerDbContext.MeshFiles
var existing = await ServiceLayerDbContext.MeshFiles
.AnyAsync(f => f.FileId == messageId);

if (!existing)
{
var file = new MeshFile
{
FileId = messageId,
FileType = MeshFileType.NbssAppointmentEvents,
MailboxId = configuration.NbssMeshMailboxId,
Status = MeshFileStatus.Discovered,
FirstSeenUtc = DateTime.UtcNow,
LastUpdatedUtc = DateTime.UtcNow
};

serviceLayerDbContext.MeshFiles.Add(file);

await serviceLayerDbContext.SaveChangesAsync();
await transaction.CommitAsync();
var file = await CreateMeshFile(messageId);

await transaction.CommitAsync();
await fileExtractQueueClient.EnqueueFileExtractAsync(file);
}
else
Expand All @@ -56,4 +45,27 @@ public async Task Run([TimerTrigger("%FileDiscoveryTimerExpression%")] TimerInfo
}
}
}

private async Task<MeshFile> CreateMeshFile(string messageId)
{
var now = DateTime.UtcNow;

var file = new MeshFile
{
FileId = messageId,
FileType = MeshFileType.NbssAppointmentEvents,
MailboxId = configuration.NbssMeshMailboxId,
Status = MeshFileStatus.Discovered,
FirstSeenUtc = now,
LastUpdatedUtc = now
};

ServiceLayerDbContext.MeshFiles.Add(file);

await UpdateMeshFile(file, MeshFileStatus.Discovered);

return file;
}

protected override FileEventSource Source => FileEventSource.DiscoveryFunction;
}
25 changes: 8 additions & 17 deletions src/ServiceLayer.Mesh/Functions/FileExtractFunction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,22 @@ public class FileExtractFunction(
ServiceLayerDbContext serviceLayerDbContext,
IFileTransformQueueClient fileTransformQueueClient,
IFileExtractQueueClient fileExtractQueueClient,
IMeshFilesBlobStore meshFileBlobStore)
IMeshFilesBlobStore meshFileBlobStore) : MeshFileFunctionBase(serviceLayerDbContext)
{
[Function("FileExtractFunction")]
public async Task Run([QueueTrigger("%FileExtractQueueName%")] FileExtractQueueMessage message)
{
logger.LogInformation("{FunctionName} started. Processing fileId: {FileId}", nameof(FileExtractFunction), message.FileId);

await using var transaction = await serviceLayerDbContext.Database.BeginTransactionAsync();
await using var transaction = await ServiceLayerDbContext.Database.BeginTransactionAsync();

var file = await GetFileAsync(message.FileId);
if (file == null || !IsFileSuitableForExtraction(file))
{
return;
}

await UpdateFileStatusForExtraction(file);
await UpdateMeshFile(file, MeshFileStatus.Extracting);
await transaction.CommitAsync();

try
Expand All @@ -47,7 +47,7 @@ public async Task Run([QueueTrigger("%FileExtractQueueName%")] FileExtractQueueM

private async Task<MeshFile?> GetFileAsync(string fileId)
{
var file = await serviceLayerDbContext.MeshFiles
var file = await ServiceLayerDbContext.MeshFiles
.FirstOrDefaultAsync(f => f.FileId == fileId);

if (file == null)
Expand Down Expand Up @@ -76,13 +76,6 @@ private bool IsFileSuitableForExtraction(MeshFile file)
return true;
}

private async Task UpdateFileStatusForExtraction(MeshFile file)
{
file.Status = MeshFileStatus.Extracting;
file.LastUpdatedUtc = DateTime.UtcNow;
await serviceLayerDbContext.SaveChangesAsync();
}

private async Task ProcessFileExtraction(MeshFile file)
{
var meshResponse = await meshInboxService.GetMessageByIdAsync(file.MailboxId, file.FileId);
Expand All @@ -100,19 +93,17 @@ private async Task ProcessFileExtraction(MeshFile file)
}

file.BlobPath = blobPath;
file.Status = MeshFileStatus.Extracted;
file.LastUpdatedUtc = DateTime.UtcNow;
await serviceLayerDbContext.SaveChangesAsync();
await UpdateMeshFile(file, MeshFileStatus.Extracted);

await fileTransformQueueClient.EnqueueFileTransformAsync(file);
}

private async Task HandleExtractionError(MeshFile file, FileExtractQueueMessage message, Exception ex)
{
logger.LogError(ex, "An exception occurred during file extraction for fileId: {FileId}", message.FileId);
file.Status = MeshFileStatus.FailedExtract;
file.LastUpdatedUtc = DateTime.UtcNow;
await serviceLayerDbContext.SaveChangesAsync();
await UpdateMeshFile(file, MeshFileStatus.FailedExtract);
await fileExtractQueueClient.SendToPoisonQueueAsync(message);
}

protected override FileEventSource Source => FileEventSource.ExtractFunction;
}
16 changes: 9 additions & 7 deletions src/ServiceLayer.Mesh/Functions/FileRetryFunction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class FileRetryFunction(
ServiceLayerDbContext serviceLayerDbContext,
IFileExtractQueueClient fileExtractQueueClient,
IFileTransformQueueClient fileTransformQueueClient,
IFileRetryFunctionConfiguration configuration)
IFileRetryFunctionConfiguration configuration) : MeshFileFunctionBase(serviceLayerDbContext)
{
[Function("FileRetryFunction")]
public async Task Run([TimerTrigger("%FileRetryTimerExpression%")] TimerInfo myTimer)
Expand All @@ -28,7 +28,7 @@ public async Task Run([TimerTrigger("%FileRetryTimerExpression%")] TimerInfo myT

private async Task RetryStaleExtractions(DateTime staleDateTimeUtc)
{
var staleFiles = await serviceLayerDbContext.MeshFiles
var staleFiles = await ServiceLayerDbContext.MeshFiles
.Where(f =>
(f.Status == MeshFileStatus.Discovered || f.Status == MeshFileStatus.Extracting)
&& f.LastUpdatedUtc <= staleDateTimeUtc)
Expand All @@ -39,15 +39,15 @@ private async Task RetryStaleExtractions(DateTime staleDateTimeUtc)
foreach (var file in staleFiles)
{
await fileExtractQueueClient.EnqueueFileExtractAsync(file);
file.LastUpdatedUtc = DateTime.UtcNow;
await serviceLayerDbContext.SaveChangesAsync();
await UpdateMeshFile(file, file.Status);
await ServiceLayerDbContext.SaveChangesAsync();
logger.LogInformation("FileRetryFunction: File {FileFileId} enqueued to Extract queue", file.FileId);
}
}

private async Task RetryStaleTransformations(DateTime staleDateTimeUtc)
{
var staleFiles = await serviceLayerDbContext.MeshFiles
var staleFiles = await ServiceLayerDbContext.MeshFiles
.Where(f =>
(f.Status == MeshFileStatus.Extracted || f.Status == MeshFileStatus.Transforming)
&& f.LastUpdatedUtc <= staleDateTimeUtc)
Expand All @@ -58,9 +58,11 @@ private async Task RetryStaleTransformations(DateTime staleDateTimeUtc)
foreach (var file in staleFiles)
{
await fileTransformQueueClient.EnqueueFileTransformAsync(file);
file.LastUpdatedUtc = DateTime.UtcNow;
await serviceLayerDbContext.SaveChangesAsync();
await UpdateMeshFile(file, file.Status);
await ServiceLayerDbContext.SaveChangesAsync();
logger.LogInformation("FileRetryFunction: File {FileFileId} enqueued to Transform queue", file.FileId);
}
}

protected override FileEventSource Source => FileEventSource.RetryFunction;
}
25 changes: 8 additions & 17 deletions src/ServiceLayer.Mesh/Functions/FileTransformFunction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class FileTransformFunction(
ServiceLayerDbContext serviceLayerDbContext,
IFileTransformQueueClient fileTransformQueueClient,
IMeshFilesBlobStore meshFileBlobStore,
IEnumerable<IFileTransformer> fileTransformers)
IEnumerable<IFileTransformer> fileTransformers) : MeshFileFunctionBase(serviceLayerDbContext)
{
private static readonly JsonSerializerOptions ValidationErrorJsonOptions = new()
{
Expand All @@ -34,15 +34,15 @@ public async Task Run([QueueTrigger("%FileTransformQueueName%")] FileTransformQu
logger.LogInformation("{FunctionName} started. Processing fileId: {FileId}", nameof(FileTransformFunction),
message.FileId);

await using var transaction = await serviceLayerDbContext.Database.BeginTransactionAsync();
await using var transaction = await ServiceLayerDbContext.Database.BeginTransactionAsync();

var file = await GetFileAsync(message.FileId);
if (file == null || !IsFileSuitableForTransformation(file))
{
return;
}

await UpdateFileStatusForTransformation(file);
await UpdateMeshFile(file, MeshFileStatus.Transforming);
await transaction.CommitAsync();

try
Expand All @@ -57,7 +57,7 @@ public async Task Run([QueueTrigger("%FileTransformQueueName%")] FileTransformQu

private async Task<MeshFile?> GetFileAsync(string fileId)
{
var file = await serviceLayerDbContext.MeshFiles
var file = await ServiceLayerDbContext.MeshFiles
.FirstOrDefaultAsync(f => f.FileId == fileId);

if (file == null)
Expand Down Expand Up @@ -87,13 +87,6 @@ private bool IsFileSuitableForTransformation(MeshFile file)
return true;
}

private async Task UpdateFileStatusForTransformation(MeshFile file)
{
file.Status = MeshFileStatus.Transforming;
file.LastUpdatedUtc = DateTime.UtcNow;
await serviceLayerDbContext.SaveChangesAsync();
}

private async Task ProcessFileTransformation(MeshFile file)
{
var transformer = GetTransformerFor(file.FileType);
Expand All @@ -107,9 +100,7 @@ private async Task ProcessFileTransformation(MeshFile file)
throw new InvalidOperationException("Validation errors encountered");
}

file.Status = MeshFileStatus.Transformed;
file.LastUpdatedUtc = DateTime.UtcNow;
await serviceLayerDbContext.SaveChangesAsync();
await UpdateMeshFile(file, MeshFileStatus.Transformed);
}

private IFileTransformer GetTransformerFor(MeshFileType type)
Expand All @@ -129,9 +120,9 @@ private IFileTransformer GetTransformerFor(MeshFileType type)
private async Task HandleTransformationError(MeshFile file, FileTransformQueueMessage message, Exception ex)
{
logger.LogError(ex, "An exception occurred during file transformation for fileId: {FileId}", message.FileId);
file.Status = MeshFileStatus.FailedTransform;
file.LastUpdatedUtc = DateTime.UtcNow;
await serviceLayerDbContext.SaveChangesAsync();
await UpdateMeshFile(file, MeshFileStatus.FailedTransform);
await fileTransformQueueClient.SendToPoisonQueueAsync(message);
}

protected override FileEventSource Source => FileEventSource.TransformFunction;
}
31 changes: 31 additions & 0 deletions src/ServiceLayer.Mesh/Functions/MeshFileFunctionBase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using ServiceLayer.Data;
using ServiceLayer.Data.Models;

namespace ServiceLayer.Mesh.Functions;

public abstract class MeshFileFunctionBase(ServiceLayerDbContext serviceLayerDbContext)
{
protected ServiceLayerDbContext ServiceLayerDbContext { get; } = serviceLayerDbContext;

protected abstract FileEventSource Source { get; }

protected async Task UpdateMeshFile(MeshFile file, MeshFileStatus status)
{
var now = DateTime.UtcNow;

file.Status = status;
file.LastUpdatedUtc = now;

var fileEvent = new MeshFileEvent
{
FileId = file.FileId,
Status = status,
TimestampUtc = now,
Source = Source
};

ServiceLayerDbContext.MeshFileEvents.Add(fileEvent);

await ServiceLayerDbContext.SaveChangesAsync();
}
}
Loading
Loading