From 7e787913ad958e020724f93d2af85607bd34392b Mon Sep 17 00:00:00 2001 From: Sam Ainsworth Date: Mon, 20 Oct 2025 12:08:44 +0100 Subject: [PATCH 1/9] feat: implement Upsert functionality for entity management across data services --- .../ProcessFileClasses/ProcessCaasFile.cs | 34 ++++----- .../DataServices.Client/DataServiceClient.cs | 22 ++++++ .../DataServices.Client/IDataServiceClient.cs | 6 ++ .../DataServices.Core/DataServiceAccessor.cs | 76 +++++++++++++++++++ .../Interfaces/IDataServiceAccessor.cs | 7 ++ .../DataServices.Core/RequestHandler.cs | 57 ++++++++++++++ 6 files changed, 182 insertions(+), 20 deletions(-) diff --git a/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/ProcessCaasFile.cs b/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/ProcessCaasFile.cs index f122b72dd2..79a0f96b4a 100644 --- a/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/ProcessCaasFile.cs +++ b/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/ProcessCaasFile.cs @@ -175,34 +175,28 @@ private async Task UpdateOldDemographicRecord(BasicParticipantCsvRecord ba throw new FormatException("Unable to parse NHS Number"); } - var participant = await _participantDemographic.GetSingleByFilter(x => x.NhsNumber == nhsNumber); - - if (participant == null) - { - _logger.LogWarning("The participant could not be found, when trying to update old Participant"); - return false; - } - - basicParticipantCsvRecord.Participant.RecordInsertDateTime = participant.RecordInsertDateTime?.ToString("yyyy-MM-dd HH:mm:ss"); - var participantForUpdate = basicParticipantCsvRecord.Participant.ToParticipantDemographic(); - - participantForUpdate.RecordUpdateDateTime = DateTime.UtcNow; - participantForUpdate.ParticipantId = participant.ParticipantId; + // Use Upsert instead of separate Get + Update + // This handles both insert and update atomically at the database level + var participantForUpsert = basicParticipantCsvRecord.Participant.ToParticipantDemographic(); + participantForUpsert.RecordUpdateDateTime = DateTime.UtcNow; + // Note: For new records, RecordInsertDateTime will be set by the database + // For existing records, it will be preserved + // The ParticipantId will be set automatically by the database for new records - var updated = await _participantDemographic.Update(participantForUpdate); - if (updated) + var upserted = await _participantDemographic.Upsert(participantForUpsert); + if (upserted) { - _logger.LogInformation("updating old Demographic record was successful"); - return updated; + _logger.LogInformation("Upsert of Demographic record was successful for NHS Number: {NhsNumber}", nhsNumber); + return true; } - _logger.LogError("updating old Demographic record was not successful"); - throw new InvalidOperationException("updating old Demographic record was not successful"); + _logger.LogError("Upsert of Demographic record was not successful for NHS Number: {NhsNumber}", nhsNumber); + throw new InvalidOperationException($"Upsert of Demographic record was not successful for NHS Number: {nhsNumber}"); } catch (Exception ex) { - var errorDescription = $"Update participant function failed.\nMessage: {ex.Message}\nStack Trace: {ex.StackTrace}"; + var errorDescription = $"Upsert participant function failed.\nMessage: {ex.Message}\nStack Trace: {ex.StackTrace}"; _logger.LogError(ex, errorDescription); await CreateError(basicParticipantCsvRecord.Participant, name, errorDescription); } diff --git a/application/CohortManager/src/Functions/Shared/DataServices.Client/DataServiceClient.cs b/application/CohortManager/src/Functions/Shared/DataServices.Client/DataServiceClient.cs index 4ea66a5810..abc6267187 100644 --- a/application/CohortManager/src/Functions/Shared/DataServices.Client/DataServiceClient.cs +++ b/application/CohortManager/src/Functions/Shared/DataServices.Client/DataServiceClient.cs @@ -156,6 +156,28 @@ public async Task Update(TEntity entity) return true; } + public async Task Upsert(TEntity entity) + { + var jsonString = JsonSerializer.Serialize(entity); + + if (string.IsNullOrEmpty(jsonString)) + { + _logger.LogWarning("Unable to serialize upsert request body for entity of type {EntityType}", typeof(TEntity).FullName); + return false; + } + + // Use POST with a special endpoint or header to indicate upsert + // Option 1: Use a dedicated upsert endpoint + var upsertUrl = UrlBuilder(_baseUrl, "upsert"); + var result = await _httpClientFunction.SendPost(upsertUrl, jsonString); + + if (result.StatusCode != HttpStatusCode.OK) + { + return false; + } + return true; + } + private async Task GetJsonStringByFilter(Expression> predicate, bool returnOneRecord = false) { try diff --git a/application/CohortManager/src/Functions/Shared/DataServices.Client/IDataServiceClient.cs b/application/CohortManager/src/Functions/Shared/DataServices.Client/IDataServiceClient.cs index b0835cdbc9..b93a848a58 100644 --- a/application/CohortManager/src/Functions/Shared/DataServices.Client/IDataServiceClient.cs +++ b/application/CohortManager/src/Functions/Shared/DataServices.Client/IDataServiceClient.cs @@ -51,4 +51,10 @@ public interface IDataServiceClient /// the object that is being updated/param> /// a boolean representing if the record was updated successfully Task Update(TEntity entity); + /// + /// Upserts (Inserts or Updates) a single record atomically + /// + /// the object to be upserted + /// a boolean representing if the record was upserted successfully + Task Upsert(TEntity entity); } diff --git a/application/CohortManager/src/Functions/Shared/DataServices.Core/DataServiceAccessor.cs b/application/CohortManager/src/Functions/Shared/DataServices.Core/DataServiceAccessor.cs index cbff40513d..2f2ad835f2 100644 --- a/application/CohortManager/src/Functions/Shared/DataServices.Core/DataServiceAccessor.cs +++ b/application/CohortManager/src/Functions/Shared/DataServices.Core/DataServiceAccessor.cs @@ -141,10 +141,86 @@ public async Task Update(TEntity entity, Expression throw new MultipleRecordsFoundException("Multiple Records were updated by PUT request, Changes have been Rolled-back"); } return dbEntity!; + } + public async Task Upsert(TEntity entity, Expression> predicate) + { + int rowsAffected = 0; + var strategy = _context.Database.CreateExecutionStrategy(); + await strategy.ExecuteAsync( + async () => + { + using var transaction = await _context.Database.BeginTransactionAsync(); + try + { + // Check if entity exists + var existingEntity = await _context.Set().AsNoTracking().SingleOrDefaultAsync(predicate); + + if (existingEntity == null) + { + // Insert new entity + await _context.AddAsync(entity); + _logger.LogInformation("Inserting new entity in Upsert operation"); + } + else + { + // Update existing entity - preserve key fields + _logger.LogInformation("Updating existing entity in Upsert operation"); + + // Preserve ParticipantId (primary key) from existing record + var keyProperty = typeof(TEntity).GetProperty("ParticipantId"); + if (keyProperty != null) + { + var existingKey = keyProperty.GetValue(existingEntity); + keyProperty.SetValue(entity, existingKey); + _logger.LogDebug("Preserved ParticipantId: {ParticipantId}", existingKey); + } + + // Preserve RecordInsertDateTime from existing record to maintain audit trail + var insertDateProperty = typeof(TEntity).GetProperty("RecordInsertDateTime"); + if (insertDateProperty != null) + { + var existingInsertDate = insertDateProperty.GetValue(existingEntity); + if (existingInsertDate != null) + { + insertDateProperty.SetValue(entity, existingInsertDate); + _logger.LogDebug("Preserved RecordInsertDateTime: {RecordInsertDateTime}", existingInsertDate); + } + } + + _context.Update(entity); + } + + rowsAffected = await _context.SaveChangesAsync(); + + if (rowsAffected == 0) + { + _logger.LogWarning("Upsert resulted in 0 rows affected"); + await transaction.RollbackAsync(); + return; + } + else if (rowsAffected > 1) + { + _logger.LogError("Multiple records ({RowsAffected}) were affected during upsert operation. Rolling back transaction.", rowsAffected); + await transaction.RollbackAsync(); + return; + } + + await transaction.CommitAsync(); + _logger.LogInformation("Upsert operation completed successfully. Rows affected: {RowsAffected}", rowsAffected); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error during upsert operation"); + await transaction.RollbackAsync(); + throw; + } + } + ); + return rowsAffected > 0; } } diff --git a/application/CohortManager/src/Functions/Shared/DataServices.Core/Interfaces/IDataServiceAccessor.cs b/application/CohortManager/src/Functions/Shared/DataServices.Core/Interfaces/IDataServiceAccessor.cs index a150257d1f..8abfe7f45e 100644 --- a/application/CohortManager/src/Functions/Shared/DataServices.Core/Interfaces/IDataServiceAccessor.cs +++ b/application/CohortManager/src/Functions/Shared/DataServices.Core/Interfaces/IDataServiceAccessor.cs @@ -10,4 +10,11 @@ public interface IDataServiceAccessor Task InsertMany(IEnumerable entities); Task Remove(Expression> predicate); Task Update(TEntity entity, Expression> predicate); + /// + /// Upserts (Insert or Update) a single entity atomically using database MERGE + /// + /// The entity to upsert + /// The predicate to match existing records + /// True if successful, false otherwise + Task Upsert(TEntity entity, Expression> predicate); } diff --git a/application/CohortManager/src/Functions/Shared/DataServices.Core/RequestHandler.cs b/application/CohortManager/src/Functions/Shared/DataServices.Core/RequestHandler.cs index 44da47726a..c42f7dfa10 100644 --- a/application/CohortManager/src/Functions/Shared/DataServices.Core/RequestHandler.cs +++ b/application/CohortManager/src/Functions/Shared/DataServices.Core/RequestHandler.cs @@ -62,6 +62,11 @@ public async Task HandleRequest(HttpRequestData req, string? k return CreateErrorResponse(req,"No Key Provided for Deletion",HttpStatusCode.BadRequest); } case "POST": + // Check if this is an upsert request (special key "upsert") + if (key != null && key.Equals("upsert", StringComparison.OrdinalIgnoreCase)) + { + return await Upsert(req); + } return await Post(req); case "PUT": if (key != null) @@ -211,6 +216,58 @@ private async Task Post(HttpRequestData req) } + private async Task Upsert(HttpRequestData req) + { + if(!_authConfig.CanPost(req)) // Reuse Post authorization for Upsert + { + return CreateErrorResponse(req,UnauthorizedErrorMessage,HttpStatusCode.Unauthorized); + } + try + { + string jsonData; + using (StreamReader reader = new StreamReader(req.Body, Encoding.UTF8)) + { + jsonData = await reader.ReadToEndAsync(); + } + + var entityData = JsonSerializer.Deserialize(jsonData,jsonSerializerOptions); + if (entityData == null) + { + return CreateErrorResponse(req, "Failed to deserialize Record", HttpStatusCode.BadRequest); + } + + // Get the key value from the entity to create the predicate + var keyValue = _keyInfo.GetValue(entityData); + if (keyValue == null) + { + return CreateErrorResponse(req, "Entity key is null", HttpStatusCode.BadRequest); + } + + var keyPredicate = CreateGetByKeyExpression(keyValue.ToString()); + var result = await _dataServiceAccessor.Upsert(entityData, keyPredicate); + + if (!result) + { + return CreateErrorResponse(req,"Failed to Upsert Record",HttpStatusCode.InternalServerError); + } + + return CreateHttpResponse(req,new DataServiceResponse + { + JsonData = "Success" + }); + } + catch(JsonException je) + { + _logger.LogError(je, "Failed to deserialize Data, This is due to a badly formed request"); + return CreateErrorResponse(req,"Failed to deserialize Record",HttpStatusCode.BadRequest); + } + catch (Exception ex) + { + _logger.LogError(ex, "An unexpected error occurred while trying to upsert data"); + return CreateErrorResponse(req,"Failed to Upsert Record",HttpStatusCode.InternalServerError); + } + } + private async Task UpdateById(HttpRequestData req, string key) { if(!_authConfig.CanPut(req)) From e92a4b7b395909b67a3a7dd3cd3c477f2d0faa1a Mon Sep 17 00:00:00 2001 From: Sam Ainsworth Date: Thu, 23 Oct 2025 16:56:06 +0100 Subject: [PATCH 2/9] refactor: remove batched inserts of demographic data --- .../ProcessFileClasses/Batch.cs | 4 +-- .../ProcessFileClasses/ProcessCaasFile.cs | 34 ++++--------------- .../receiveCaasFile/Program.cs | 1 - .../processCaasFileTests.cs | 13 +++---- 4 files changed, 11 insertions(+), 41 deletions(-) diff --git a/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/Batch.cs b/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/Batch.cs index e4c167adb2..de1af0f730 100644 --- a/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/Batch.cs +++ b/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/Batch.cs @@ -10,12 +10,10 @@ public Batch() AddRecords = new ConcurrentQueue(); UpdateRecords = new ConcurrentQueue(); DeleteRecords = new ConcurrentQueue(); - DemographicData = new ConcurrentQueue(); } public ConcurrentQueue AddRecords { get; set; } public ConcurrentQueue UpdateRecords { get; set; } public ConcurrentQueue DeleteRecords { get; set; } - public ConcurrentQueue DemographicData { get; set; } -} \ No newline at end of file +} diff --git a/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/ProcessCaasFile.cs b/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/ProcessCaasFile.cs index 79a0f96b4a..d905a76434 100644 --- a/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/ProcessCaasFile.cs +++ b/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/ProcessCaasFile.cs @@ -14,7 +14,6 @@ public class ProcessCaasFile : IProcessCaasFile { private readonly ILogger _logger; private readonly IReceiveCaasFileHelper _receiveCaasFileHelper; - private readonly ICallDurableDemographicFunc _callDurableDemographicFunc; private readonly ICreateBasicParticipantData _createBasicParticipantData; private readonly IAddBatchToQueue _addBatchToQueue; private readonly IExceptionHandler _exceptionHandler; @@ -22,7 +21,6 @@ public class ProcessCaasFile : IProcessCaasFile private readonly IRecordsProcessedTracker _recordsProcessTracker; private readonly IValidateDates _validateDates; private readonly ReceiveCaasFileConfig _config; - private readonly string DemographicURI; public ProcessCaasFile( @@ -34,7 +32,6 @@ public ProcessCaasFile( IDataServiceClient participantDemographic, IRecordsProcessedTracker recordsProcessedTracker, IValidateDates validateDates, - ICallDurableDemographicFunc callDurableDemographicFunc, IOptions receiveCaasFileConfig ) { @@ -46,9 +43,7 @@ IOptions receiveCaasFileConfig _participantDemographic = participantDemographic; _recordsProcessTracker = recordsProcessedTracker; _validateDates = validateDates; - _callDurableDemographicFunc = callDurableDemographicFunc; _config = receiveCaasFileConfig.Value; - DemographicURI = _config.DemographicURI; } /// @@ -93,10 +88,7 @@ await Parallel.ForEachAsync(values, options, async (rec, cancellationToken) => await AddRecordToBatch(participant, currentBatch, name); }); - if (await _callDurableDemographicFunc.PostDemographicDataAsync(currentBatch.DemographicData.ToList(), DemographicURI, name)) - { - await AddBatchToQueue(currentBatch, name); - } + await AddBatchToQueue(currentBatch, name); } /// @@ -114,40 +106,26 @@ private async Task AddRecordToBatch(Participant participant, Batch currentBatch, FileName = fileName, Participant = participant }; - // take note: we don't need to add DemographicData to the queue for update because we loop through all updates in the UpdateParticipant method + + // Upsert demographic record immediately (no batching) + await UpdateOldDemographicRecord(basicParticipantCsvRecord, fileName); + + // Add to Service Bus queues based on record type switch (participant.RecordType?.Trim()) { - case Actions.New: - currentBatch.AddRecords.Enqueue(basicParticipantCsvRecord); - if (await UpdateOldDemographicRecord(basicParticipantCsvRecord, fileName)) - { - break; - } - currentBatch.DemographicData.Enqueue(participant.ToParticipantDemographic()); break; case Actions.Amended: - if (!await UpdateOldDemographicRecord(basicParticipantCsvRecord, fileName)) - { - currentBatch.DemographicData.Enqueue(participant.ToParticipantDemographic()); - currentBatch.UpdateRecords.Enqueue(basicParticipantCsvRecord); - break; - } currentBatch.UpdateRecords.Enqueue(basicParticipantCsvRecord); break; case Actions.Removed: - if (!await UpdateOldDemographicRecord(basicParticipantCsvRecord, fileName)) - { - currentBatch.DemographicData.Enqueue(participant.ToParticipantDemographic()); - } currentBatch.DeleteRecords.Enqueue(basicParticipantCsvRecord); break; default: await _exceptionHandler.CreateSchemaValidationException(basicParticipantCsvRecord, "RecordType was not set to an expected value"); break; } - } private async Task AddBatchToQueue(Batch currentBatch, string name) diff --git a/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/Program.cs b/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/Program.cs index db7f262421..287bb29928 100644 --- a/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/Program.cs +++ b/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/Program.cs @@ -31,7 +31,6 @@ services.AddScoped(); //Do not change the lifetime of this. services.AddSingleton(); services.AddScoped(); - services.AddScoped(); services.AddScoped(); services.AddScoped(); services.AddScoped(); //Do not change the lifetime of this. diff --git a/tests/UnitTests/CaasIntegrationTests/processCaasFileTest/processCaasFileTests.cs b/tests/UnitTests/CaasIntegrationTests/processCaasFileTest/processCaasFileTests.cs index d7ba0a86e4..2d3ebb7d1c 100644 --- a/tests/UnitTests/CaasIntegrationTests/processCaasFileTest/processCaasFileTests.cs +++ b/tests/UnitTests/CaasIntegrationTests/processCaasFileTest/processCaasFileTests.cs @@ -25,7 +25,6 @@ public class ProcessCaasFileTests private readonly Mock _mockHttpClientFunction = new(); - private readonly Mock _callDurableFunc = new(); private readonly ProcessCaasFile _processCaasFile; private readonly Mock> _config = new(); @@ -43,7 +42,6 @@ private ProcessCaasFile CreateProcessCaasFile(ReceiveCaasFileConfig config) _databaseClientParticipantMock.Object, _recordsProcessedTrackerMock.Object, _validateDates.Object, - _callDurableFunc.Object, _config.Object ); } @@ -80,7 +78,7 @@ public async Task ProcessRecords_ValidParticipants_ProcessesSuccessfully() It.IsAny())) .Returns(new Participant { NhsNumber = "1234567890", RecordType = Actions.New }); - _callDurableFunc.Setup(demo => demo.PostDemographicDataAsync(It.IsAny>(), It.IsAny(), It.IsAny())).ReturnsAsync(true); + _databaseClientParticipantMock.Setup(db => db.Upsert(It.IsAny())).ReturnsAsync(true); // Act await processCaasFile.ProcessRecords(participants, options, screeningService, fileName); @@ -114,8 +112,7 @@ public async Task ProcessRecords_Amend_sendOneRecordToUpdateQueue() _receiveCaasFileHelperMock.Setup(helper => helper.MapParticipant(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) .Returns(new Participant { NhsNumber = "1234567890", RecordType = Actions.Amended }); - _callDurableFunc.Setup(demo => demo.PostDemographicDataAsync(It.IsAny>(), It.IsAny(), It.IsAny())) - .ReturnsAsync(true); + _databaseClientParticipantMock.Setup(db => db.Upsert(It.IsAny())).ReturnsAsync(true); // Act await processCaasFile.ProcessRecords(participants, options, screeningService, fileName); @@ -195,8 +192,7 @@ public async Task AddRecordToBatch_UpdateRecord_addsRecordToBatch() { // Arrange var processCaasFile = CreateProcessCaasFile(GetDefaultConfig(true)); - _callDurableFunc.Setup(demo => demo.PostDemographicDataAsync(It.IsAny>(), It.IsAny(), It.IsAny())) - .ReturnsAsync(true); + _databaseClientParticipantMock.Setup(db => db.Upsert(It.IsAny())).ReturnsAsync(true); var updateParticipant = processCaasFile.GetType().GetMethod("UpdateOldDemographicRecord", BindingFlags.Instance | BindingFlags.NonPublic); var basicParticipantCsvRecord = new BasicParticipantCsvRecord() @@ -231,8 +227,7 @@ public async Task AddRecordToBatch_ValidNewRecord_AddsRecordToBatch() var participant = new Participant { NhsNumber = "1234567890", RecordType = Actions.New }; var currentBatch = new Batch(); - _callDurableFunc.Setup(m => m.PostDemographicDataAsync(It.IsAny>(), It.IsAny(), It.IsAny())) - .ReturnsAsync(true); + _databaseClientParticipantMock.Setup(db => db.Upsert(It.IsAny())).ReturnsAsync(true); var arguments = new object[] { participant, currentBatch, "testFile" }; From a33659cfe1e273e8870baea714dec5948b06607f Mon Sep 17 00:00:00 2001 From: Sam Ainsworth Date: Thu, 23 Oct 2025 16:58:53 +0100 Subject: [PATCH 3/9] feat: enhance Upsert operation to preserve primary keys using EF Core metadata --- .../DataServices.Core/DataServiceAccessor.cs | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/application/CohortManager/src/Functions/Shared/DataServices.Core/DataServiceAccessor.cs b/application/CohortManager/src/Functions/Shared/DataServices.Core/DataServiceAccessor.cs index 2f2ad835f2..2c6a30935d 100644 --- a/application/CohortManager/src/Functions/Shared/DataServices.Core/DataServiceAccessor.cs +++ b/application/CohortManager/src/Functions/Shared/DataServices.Core/DataServiceAccessor.cs @@ -169,13 +169,22 @@ await strategy.ExecuteAsync( // Update existing entity - preserve key fields _logger.LogInformation("Updating existing entity in Upsert operation"); - // Preserve ParticipantId (primary key) from existing record - var keyProperty = typeof(TEntity).GetProperty("ParticipantId"); - if (keyProperty != null) + // Preserve primary key(s) from existing record using EF Core metadata + var entityType = _context.Model.FindEntityType(typeof(TEntity)); + var keyProperties = entityType?.FindPrimaryKey()?.Properties; + + if (keyProperties != null) { - var existingKey = keyProperty.GetValue(existingEntity); - keyProperty.SetValue(entity, existingKey); - _logger.LogDebug("Preserved ParticipantId: {ParticipantId}", existingKey); + foreach (var keyProperty in keyProperties) + { + var clrProperty = typeof(TEntity).GetProperty(keyProperty.Name); + if (clrProperty != null) + { + var existingKey = clrProperty.GetValue(existingEntity); + clrProperty.SetValue(entity, existingKey); + _logger.LogDebug("Preserved primary key {KeyName}: {KeyValue}", keyProperty.Name, existingKey); + } + } } // Preserve RecordInsertDateTime from existing record to maintain audit trail From cf01a3c336b0a261fa93a864cf12d30f2cf740e7 Mon Sep 17 00:00:00 2001 From: Sam Ainsworth Date: Thu, 23 Oct 2025 17:09:53 +0100 Subject: [PATCH 4/9] feat: add Upsert method to DataServiceStaticCachedClient and update related tests --- .../DataServices.Client/DataServiceStaticCachedClient.cs | 5 +++++ .../processCaasFileTest/processCaasFileTests.cs | 8 ++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/application/CohortManager/src/Functions/Shared/DataServices.Client/DataServiceStaticCachedClient.cs b/application/CohortManager/src/Functions/Shared/DataServices.Client/DataServiceStaticCachedClient.cs index 029c35d032..2fc9365569 100644 --- a/application/CohortManager/src/Functions/Shared/DataServices.Client/DataServiceStaticCachedClient.cs +++ b/application/CohortManager/src/Functions/Shared/DataServices.Client/DataServiceStaticCachedClient.cs @@ -97,6 +97,11 @@ public Task Update(TEntity entity) throw new NotImplementedException(); } + public Task Upsert(TEntity entity) + { + throw new NotImplementedException(); + } + private Expression> CreateGetByKeyExpression(string filter) { var entityParameter = Expression.Parameter(typeof(TEntity)); diff --git a/tests/UnitTests/CaasIntegrationTests/processCaasFileTest/processCaasFileTests.cs b/tests/UnitTests/CaasIntegrationTests/processCaasFileTest/processCaasFileTests.cs index 2d3ebb7d1c..5a86d67088 100644 --- a/tests/UnitTests/CaasIntegrationTests/processCaasFileTest/processCaasFileTests.cs +++ b/tests/UnitTests/CaasIntegrationTests/processCaasFileTest/processCaasFileTests.cs @@ -210,9 +210,9 @@ public async Task AddRecordToBatch_UpdateRecord_addsRecordToBatch() _databaseClientParticipantMock.Verify(x => x.Update(It.IsAny()), Times.Never); - _loggerMock.Verify(x => x.Log(It.Is(l => l == LogLevel.Warning), + _loggerMock.Verify(x => x.Log(It.Is(l => l == LogLevel.Information), It.IsAny(), - It.Is((v, t) => v.ToString().Contains("The participant could not be found")), + It.Is((v, t) => v.ToString().Contains("Upsert of Demographic record was successful")), It.IsAny(), It.IsAny>()), Times.Once); @@ -258,7 +258,7 @@ public async Task UpdateRecords_ValidNewRecord_ThrowsError() _databaseClientParticipantMock.Setup(x => x.GetSingleByFilter(It.IsAny>>())).Returns(Task.FromResult(response)); - _databaseClientParticipantMock.Setup(x => x.Update(It.IsAny())) + _databaseClientParticipantMock.Setup(x => x.Upsert(It.IsAny())) .ThrowsAsync(new Exception("some exception")); var updateParticipant = processCaasFile.GetType().GetMethod("UpdateOldDemographicRecord", BindingFlags.Instance | BindingFlags.NonPublic); @@ -272,7 +272,7 @@ public async Task UpdateRecords_ValidNewRecord_ThrowsError() _loggerMock.Verify(x => x.Log( It.Is(l => l == LogLevel.Error), It.IsAny(), - It.Is((v, t) => v.ToString().Contains("Update participant function failed")), + It.Is((v, t) => v.ToString().Contains("Upsert participant function failed")), It.IsAny(), It.IsAny>()), Times.Once); From a90aa3d666252b32f92d2e221ca076e8b91f1c23 Mon Sep 17 00:00:00 2001 From: Sam Ainsworth Date: Fri, 24 Oct 2025 11:28:35 +0100 Subject: [PATCH 5/9] refactor: streamline Upsert operation by extracting methods for entity insertion and update --- .../DataServices.Core/DataServiceAccessor.cs | 127 +++++++++++------- 1 file changed, 79 insertions(+), 48 deletions(-) diff --git a/application/CohortManager/src/Functions/Shared/DataServices.Core/DataServiceAccessor.cs b/application/CohortManager/src/Functions/Shared/DataServices.Core/DataServiceAccessor.cs index 2c6a30935d..8b8a3342c5 100644 --- a/application/CohortManager/src/Functions/Shared/DataServices.Core/DataServiceAccessor.cs +++ b/application/CohortManager/src/Functions/Shared/DataServices.Core/DataServiceAccessor.cs @@ -155,64 +155,21 @@ await strategy.ExecuteAsync( try { - // Check if entity exists var existingEntity = await _context.Set().AsNoTracking().SingleOrDefaultAsync(predicate); if (existingEntity == null) { - // Insert new entity - await _context.AddAsync(entity); - _logger.LogInformation("Inserting new entity in Upsert operation"); + await InsertNewEntity(entity); } else { - // Update existing entity - preserve key fields - _logger.LogInformation("Updating existing entity in Upsert operation"); - - // Preserve primary key(s) from existing record using EF Core metadata - var entityType = _context.Model.FindEntityType(typeof(TEntity)); - var keyProperties = entityType?.FindPrimaryKey()?.Properties; - - if (keyProperties != null) - { - foreach (var keyProperty in keyProperties) - { - var clrProperty = typeof(TEntity).GetProperty(keyProperty.Name); - if (clrProperty != null) - { - var existingKey = clrProperty.GetValue(existingEntity); - clrProperty.SetValue(entity, existingKey); - _logger.LogDebug("Preserved primary key {KeyName}: {KeyValue}", keyProperty.Name, existingKey); - } - } - } - - // Preserve RecordInsertDateTime from existing record to maintain audit trail - var insertDateProperty = typeof(TEntity).GetProperty("RecordInsertDateTime"); - if (insertDateProperty != null) - { - var existingInsertDate = insertDateProperty.GetValue(existingEntity); - if (existingInsertDate != null) - { - insertDateProperty.SetValue(entity, existingInsertDate); - _logger.LogDebug("Preserved RecordInsertDateTime: {RecordInsertDateTime}", existingInsertDate); - } - } - - _context.Update(entity); + await UpdateExistingEntity(entity, existingEntity); } rowsAffected = await _context.SaveChangesAsync(); - if (rowsAffected == 0) + if (!ValidateRowsAffected(rowsAffected)) { - _logger.LogWarning("Upsert resulted in 0 rows affected"); - await transaction.RollbackAsync(); - return; - } - else if (rowsAffected > 1) - { - _logger.LogError("Multiple records ({RowsAffected}) were affected during upsert operation. Rolling back transaction.", rowsAffected); await transaction.RollbackAsync(); return; } @@ -222,9 +179,9 @@ await strategy.ExecuteAsync( } catch (Exception ex) { - _logger.LogError(ex, "Error during upsert operation"); + _logger.LogError(ex, "Error during upsert operation. Rolling back transaction."); await transaction.RollbackAsync(); - throw; + throw new InvalidOperationException("Upsert operation failed. See inner exception for details.", ex); } } ); @@ -232,6 +189,80 @@ await strategy.ExecuteAsync( return rowsAffected > 0; } + private async Task InsertNewEntity(TEntity entity) + { + await _context.AddAsync(entity); + _logger.LogInformation("Inserting new entity in Upsert operation"); + } + + private async Task UpdateExistingEntity(TEntity entity, TEntity existingEntity) + { + _logger.LogInformation("Updating existing entity in Upsert operation"); + + PreservePrimaryKeys(entity, existingEntity); + PreserveRecordInsertDateTime(entity, existingEntity); + + _context.Update(entity); + await Task.CompletedTask; + } + + private void PreservePrimaryKeys(TEntity entity, TEntity existingEntity) + { + var entityType = _context.Model.FindEntityType(typeof(TEntity)); + var keyProperties = entityType?.FindPrimaryKey()?.Properties; + + if (keyProperties == null) + { + return; + } + + var propertyNames = keyProperties.Select(kp => kp.Name).ToList(); + + foreach (var propertyName in propertyNames) + { + var clrProperty = typeof(TEntity).GetProperty(propertyName); + if (clrProperty != null) + { + var existingKey = clrProperty.GetValue(existingEntity); + clrProperty.SetValue(entity, existingKey); + _logger.LogDebug("Preserved primary key {KeyName}: {KeyValue}", propertyName, existingKey); + } + } + } + + private void PreserveRecordInsertDateTime(TEntity entity, TEntity existingEntity) + { + var insertDateProperty = typeof(TEntity).GetProperty("RecordInsertDateTime"); + if (insertDateProperty == null) + { + return; + } + + var existingInsertDate = insertDateProperty.GetValue(existingEntity); + if (existingInsertDate != null) + { + insertDateProperty.SetValue(entity, existingInsertDate); + _logger.LogDebug("Preserved RecordInsertDateTime: {RecordInsertDateTime}", existingInsertDate); + } + } + + private bool ValidateRowsAffected(int rowsAffected) + { + if (rowsAffected == 0) + { + _logger.LogWarning("Upsert resulted in 0 rows affected"); + return false; + } + + if (rowsAffected > 1) + { + _logger.LogError("Multiple records ({RowsAffected}) were affected during upsert operation. Rolling back transaction.", rowsAffected); + return false; + } + + return true; + } + } From ddc8cae03a72371f9bfa67b00e45ed6348bbf9d1 Mon Sep 17 00:00:00 2001 From: Sam Ainsworth Date: Fri, 24 Oct 2025 11:29:12 +0100 Subject: [PATCH 6/9] refactor: replace hardcoded success and error messages with constants in RequestHandler --- .../DataServices.Core/RequestHandler.cs | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/application/CohortManager/src/Functions/Shared/DataServices.Core/RequestHandler.cs b/application/CohortManager/src/Functions/Shared/DataServices.Core/RequestHandler.cs index c42f7dfa10..c99eaeeb63 100644 --- a/application/CohortManager/src/Functions/Shared/DataServices.Core/RequestHandler.cs +++ b/application/CohortManager/src/Functions/Shared/DataServices.Core/RequestHandler.cs @@ -27,6 +27,8 @@ public class RequestHandler : IRequestHandler where TEntity : }; private const string UnauthorizedErrorMessage = "Action was either Unauthorized or not enabled"; + private const string SuccessMessage = "Success"; + private const string DeserializationErrorMessage = "Failed to deserialize Record"; public RequestHandler(IDataServiceAccessor dataServiceAccessor, ILogger> logger, AuthenticationConfiguration authenticationConfiguration) { @@ -199,13 +201,13 @@ private async Task Post(HttpRequestData req) } return CreateHttpResponse(req,new DataServiceResponse { - JsonData = "Success" + JsonData = SuccessMessage }); } catch(JsonException je) { _logger.LogError(je, "Failed to get deserialize Data, This is due to a badly formed request"); - return CreateErrorResponse(req,"Failed to deserialize Record",HttpStatusCode.BadRequest); + return CreateErrorResponse(req, DeserializationErrorMessage, HttpStatusCode.BadRequest); } catch (Exception ex) { @@ -233,7 +235,7 @@ private async Task Upsert(HttpRequestData req) var entityData = JsonSerializer.Deserialize(jsonData,jsonSerializerOptions); if (entityData == null) { - return CreateErrorResponse(req, "Failed to deserialize Record", HttpStatusCode.BadRequest); + return CreateErrorResponse(req, DeserializationErrorMessage, HttpStatusCode.BadRequest); } // Get the key value from the entity to create the predicate @@ -253,13 +255,13 @@ private async Task Upsert(HttpRequestData req) return CreateHttpResponse(req,new DataServiceResponse { - JsonData = "Success" + JsonData = SuccessMessage }); } catch(JsonException je) { _logger.LogError(je, "Failed to deserialize Data, This is due to a badly formed request"); - return CreateErrorResponse(req,"Failed to deserialize Record",HttpStatusCode.BadRequest); + return CreateErrorResponse(req, DeserializationErrorMessage, HttpStatusCode.BadRequest); } catch (Exception ex) { @@ -294,13 +296,13 @@ private async Task UpdateById(HttpRequestData req, string key) } return CreateHttpResponse(req,new DataServiceResponse { - JsonData = "Success" + JsonData = SuccessMessage }); } catch(JsonException je) { _logger.LogError(je, "Failed to get deserialize Data, This is due to a badly formed request"); - return CreateErrorResponse(req,"Failed to deserialize Record",HttpStatusCode.BadRequest); + return CreateErrorResponse(req, DeserializationErrorMessage, HttpStatusCode.BadRequest); } catch (Exception ex) { @@ -323,7 +325,7 @@ private async Task DeleteById(HttpRequestData req, string key) } return CreateHttpResponse(req,new DataServiceResponse { - JsonData = "Success" + JsonData = SuccessMessage }); } @@ -336,6 +338,11 @@ private static bool IsJsonArray(string json) private Expression> CreateGetByKeyExpression(string filter) { + if (filter == null) + { + throw new ArgumentNullException(nameof(filter), "Filter parameter cannot be null"); + } + var entityParameter = Expression.Parameter(typeof(TEntity)); var entityKey = Expression.Property(entityParameter, _keyInfo.Name); var filterConstant = Expression.Constant(Convert.ChangeType(filter, ReflectionUtilities.GetPropertyType(typeof(TEntity), _keyInfo.Name))); From d45a6df776f1f484a357c5f8f8fb2739e920d971 Mon Sep 17 00:00:00 2001 From: Sam Ainsworth Date: Fri, 24 Oct 2025 11:46:11 +0100 Subject: [PATCH 7/9] refactor: remove NHS number from Upsert logging in ProcessCaasFile and remove redundant comments --- .../ProcessFileClasses/ProcessCaasFile.cs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/ProcessCaasFile.cs b/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/ProcessCaasFile.cs index d905a76434..35ec8a106b 100644 --- a/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/ProcessCaasFile.cs +++ b/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/ProcessCaasFile.cs @@ -153,24 +153,18 @@ private async Task UpdateOldDemographicRecord(BasicParticipantCsvRecord ba throw new FormatException("Unable to parse NHS Number"); } - // Use Upsert instead of separate Get + Update - // This handles both insert and update atomically at the database level var participantForUpsert = basicParticipantCsvRecord.Participant.ToParticipantDemographic(); participantForUpsert.RecordUpdateDateTime = DateTime.UtcNow; - // Note: For new records, RecordInsertDateTime will be set by the database - // For existing records, it will be preserved - // The ParticipantId will be set automatically by the database for new records - var upserted = await _participantDemographic.Upsert(participantForUpsert); if (upserted) { - _logger.LogInformation("Upsert of Demographic record was successful for NHS Number: {NhsNumber}", nhsNumber); + _logger.LogInformation("Upsert of Demographic record was successful"); return true; } - _logger.LogError("Upsert of Demographic record was not successful for NHS Number: {NhsNumber}", nhsNumber); - throw new InvalidOperationException($"Upsert of Demographic record was not successful for NHS Number: {nhsNumber}"); + _logger.LogError("Upsert of Demographic record was not successful"); + throw new InvalidOperationException("Upsert of Demographic record was not successful"); } catch (Exception ex) { From 26d0dbf94d3b90ca998a2521ba2314a630c34e0b Mon Sep 17 00:00:00 2001 From: Sam Ainsworth Date: Fri, 24 Oct 2025 12:02:47 +0100 Subject: [PATCH 8/9] fix: validate entity key string representation in Post method of RequestHandler --- .../Functions/Shared/DataServices.Core/RequestHandler.cs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/application/CohortManager/src/Functions/Shared/DataServices.Core/RequestHandler.cs b/application/CohortManager/src/Functions/Shared/DataServices.Core/RequestHandler.cs index c99eaeeb63..90dee921e4 100644 --- a/application/CohortManager/src/Functions/Shared/DataServices.Core/RequestHandler.cs +++ b/application/CohortManager/src/Functions/Shared/DataServices.Core/RequestHandler.cs @@ -245,7 +245,13 @@ private async Task Upsert(HttpRequestData req) return CreateErrorResponse(req, "Entity key is null", HttpStatusCode.BadRequest); } - var keyPredicate = CreateGetByKeyExpression(keyValue.ToString()); + var keyValueString = keyValue.ToString(); + if (string.IsNullOrEmpty(keyValueString)) + { + return CreateErrorResponse(req, "Entity key string representation is invalid", HttpStatusCode.BadRequest); + } + + var keyPredicate = CreateGetByKeyExpression(keyValueString); var result = await _dataServiceAccessor.Upsert(entityData, keyPredicate); if (!result) From b565c6f2d4a14fc9a96eab1ba7054d5bdf38b533 Mon Sep 17 00:00:00 2001 From: Sam Ainsworth Date: Fri, 24 Oct 2025 12:13:41 +0100 Subject: [PATCH 9/9] refactor: simplify conditional checks and logging in Update method of DataServiceAccessor --- .../Shared/DataServices.Core/DataServiceAccessor.cs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/application/CohortManager/src/Functions/Shared/DataServices.Core/DataServiceAccessor.cs b/application/CohortManager/src/Functions/Shared/DataServices.Core/DataServiceAccessor.cs index 8b8a3342c5..dfcd693682 100644 --- a/application/CohortManager/src/Functions/Shared/DataServices.Core/DataServiceAccessor.cs +++ b/application/CohortManager/src/Functions/Shared/DataServices.Core/DataServiceAccessor.cs @@ -105,6 +105,7 @@ public async Task Update(TEntity entity, Expression { return null; } + _context.Update(entity); rowsEffected = await _context.SaveChangesAsync(); @@ -113,12 +114,13 @@ public async Task Update(TEntity entity, Expression await transaction.RollbackAsync(); return existingEntity; } - else if (rowsEffected > 1) + + if (rowsEffected > 1) { await transaction.RollbackAsync(); return null; - } + await transaction.CommitAsync(); return entity; @@ -130,16 +132,19 @@ public async Task Update(TEntity entity, Expression _logger.LogWarning("Entity to be updated not found"); return null; } - else if(rowsEffected == 0 && dbEntity != null) + + if(rowsEffected == 0 && dbEntity != null) { _logger.LogError("Records where found to be updated but the update failed"); throw new MultipleRecordsFoundException("Records where found to be updated but the update failed"); } - else if(rowsEffected > 1) + + if(rowsEffected > 1) { _logger.LogError("Multiple Records were updated by PUT request, Changes have been Rolled-back"); throw new MultipleRecordsFoundException("Multiple Records were updated by PUT request, Changes have been Rolled-back"); } + return dbEntity!; }