Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 99 additions & 93 deletions src/ServiceControl.Audit.Persistence.RavenDB/DatabaseSetup.cs
Original file line number Diff line number Diff line change
@@ -1,120 +1,126 @@
namespace ServiceControl.Audit.Persistence.RavenDB
namespace ServiceControl.Audit.Persistence.RavenDB;

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Raven.Client.Documents;
using Raven.Client.Documents.Indexes;
using Raven.Client.Documents.Operations.Expiration;
using Raven.Client.Documents.Operations.Indexes;
using Raven.Client.Exceptions;
using Raven.Client.ServerWide;
using Raven.Client.ServerWide.Operations;
using Raven.Client.ServerWide.Operations.Configuration;
using Indexes;
using SagaAudit;

class DatabaseSetup(DatabaseConfiguration configuration)
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Raven.Client.Documents;
using Raven.Client.Documents.Indexes;
using Raven.Client.Documents.Operations.Expiration;
using Raven.Client.Documents.Operations.Indexes;
using Raven.Client.Exceptions;
using Raven.Client.ServerWide;
using Raven.Client.ServerWide.Operations;
using Raven.Client.ServerWide.Operations.Configuration;
using ServiceControl.Audit.Persistence.RavenDB.Indexes;
using ServiceControl.SagaAudit;

class DatabaseSetup(DatabaseConfiguration configuration)
public async Task Execute(IDocumentStore documentStore, CancellationToken cancellationToken)
{
public async Task Execute(IDocumentStore documentStore, CancellationToken cancellationToken)
{
await CreateDatabase(documentStore, configuration.Name, cancellationToken);
await UpdateDatabaseSettings(documentStore, configuration.Name, cancellationToken);
await CreateDatabase(documentStore, configuration.Name, cancellationToken);

await CreateIndexes(documentStore, cancellationToken);
await UpdateDatabaseSettings(documentStore, configuration.Name, cancellationToken);

await ConfigureExpiration(documentStore, cancellationToken);
}
await CreateIndexes(documentStore, configuration.EnableFullTextSearch, cancellationToken);

async Task CreateDatabase(IDocumentStore documentStore, string databaseName, CancellationToken cancellationToken)
{
var dbRecord = await documentStore.Maintenance.Server.SendAsync(new GetDatabaseRecordOperation(databaseName), cancellationToken);
await ConfigureExpiration(documentStore, cancellationToken);
}

if (dbRecord is null)
{
try
{
var databaseRecord = new DatabaseRecord(databaseName);
databaseRecord.Settings.Add("Indexing.Auto.SearchEngineType", "Corax");
databaseRecord.Settings.Add("Indexing.Static.SearchEngineType", "Corax");

await documentStore.Maintenance.Server.SendAsync(new CreateDatabaseOperation(databaseRecord), cancellationToken);
}
catch (ConcurrencyException)
{
// The database was already created before calling CreateDatabaseOperation
}
}
}
async Task CreateDatabase(IDocumentStore documentStore, string databaseName, CancellationToken cancellationToken)
{
var dbRecord = await documentStore.Maintenance.Server.SendAsync(new GetDatabaseRecordOperation(databaseName), cancellationToken);

async Task UpdateDatabaseSettings(IDocumentStore documentStore, string databaseName, CancellationToken cancellationToken)
if (dbRecord is null)
{
var dbRecord = await documentStore.Maintenance.Server.SendAsync(new GetDatabaseRecordOperation(databaseName), cancellationToken);

if (dbRecord is null)
try
{
throw new InvalidOperationException($"Database '{databaseName}' does not exist.");
}

var updated = false;
var databaseRecord = new DatabaseRecord(databaseName);

updated |= dbRecord.Settings.TryAdd("Indexing.Auto.SearchEngineType", "Corax");
updated |= dbRecord.Settings.TryAdd("Indexing.Static.SearchEngineType", "Corax");
SetSearchEngineType(databaseRecord, SearchEngineType.Corax);

if (updated)
await documentStore.Maintenance.Server.SendAsync(new CreateDatabaseOperation(databaseRecord), cancellationToken);
}
catch (ConcurrencyException)
{
await documentStore.Maintenance.ForDatabase(databaseName).SendAsync(new PutDatabaseSettingsOperation(databaseName, dbRecord.Settings), cancellationToken);
await documentStore.Maintenance.Server.SendAsync(new ToggleDatabasesStateOperation(databaseName, true), cancellationToken);
await documentStore.Maintenance.Server.SendAsync(new ToggleDatabasesStateOperation(databaseName, false), cancellationToken);
// The database was already created before calling CreateDatabaseOperation
}
}
}

public static async Task DeleteLegacySagaDetailsIndex(IDocumentStore documentStore, CancellationToken cancellationToken)
async Task UpdateDatabaseSettings(IDocumentStore documentStore, string databaseName, CancellationToken cancellationToken)
{
var databaseRecord = await documentStore.Maintenance.Server.SendAsync(new GetDatabaseRecordOperation(databaseName), cancellationToken) ?? throw new InvalidOperationException($"Database '{databaseName}' does not exist.");

if (!SetSearchEngineType(databaseRecord, SearchEngineType.Corax))
{
// If the SagaDetailsIndex exists but does not have a .Take(50000), then we remove the current SagaDetailsIndex and
// create a new one. If we do not remove the current one, then RavenDB will attempt to do a side-by-side migration.
// Doing a side-by-side migration results in the index never swapping if there is constant ingestion as RavenDB will wait.
// for the index to not be stale before swapping to the new index. Constant ingestion means the index will never be not-stale.
// This needs to stay in place until the next major version as the user could upgrade from an older version of the current
// Major (v5.x.x) which might still have the incorrect index.
var sagaDetailsIndexOperation = new GetIndexOperation("SagaDetailsIndex");
var sagaDetailsIndexDefinition = await documentStore.Maintenance.SendAsync(sagaDetailsIndexOperation, cancellationToken);
if (sagaDetailsIndexDefinition != null && !sagaDetailsIndexDefinition.Reduce.Contains("Take(50000)"))
{
await documentStore.Maintenance.SendAsync(new DeleteIndexOperation("SagaDetailsIndex"), cancellationToken);
}
return;
}

async Task CreateIndexes(IDocumentStore documentStore, CancellationToken cancellationToken)
await documentStore.Maintenance.ForDatabase(databaseName).SendAsync(new PutDatabaseSettingsOperation(databaseName, databaseRecord.Settings), cancellationToken);
await documentStore.Maintenance.Server.SendAsync(new ToggleDatabasesStateOperation(databaseName, true), cancellationToken);
await documentStore.Maintenance.Server.SendAsync(new ToggleDatabasesStateOperation(databaseName, false), cancellationToken);
}

public static async Task DeleteLegacySagaDetailsIndex(IDocumentStore documentStore, CancellationToken cancellationToken)
{
// If the SagaDetailsIndex exists but does not have a .Take(50000), then we remove the current SagaDetailsIndex and
// create a new one. If we do not remove the current one, then RavenDB will attempt to do a side-by-side migration.
// Doing a side-by-side migration results in the index never swapping if there is constant ingestion as RavenDB will wait.
// for the index to not be stale before swapping to the new index. Constant ingestion means the index will never be not-stale.
// This needs to stay in place until the next major version as the user could upgrade from an older version of the current
// Major (v5.x.x) which might still have the incorrect index.
var sagaDetailsIndexOperation = new GetIndexOperation(SagaDetailsIndexName);
var sagaDetailsIndexDefinition = await documentStore.Maintenance.SendAsync(sagaDetailsIndexOperation, cancellationToken);
if (sagaDetailsIndexDefinition != null && !sagaDetailsIndexDefinition.Reduce.Contains("Take(50000)"))
{
await DeleteLegacySagaDetailsIndex(documentStore, cancellationToken);
await documentStore.Maintenance.SendAsync(new DeleteIndexOperation(SagaDetailsIndexName), cancellationToken);
}
}

List<AbstractIndexCreationTask> indexList = [new FailedAuditImportIndex(), new SagaDetailsIndex()];
internal static async Task CreateIndexes(IDocumentStore documentStore, bool enableFreeTextSearch, CancellationToken cancellationToken)
{
await DeleteLegacySagaDetailsIndex(documentStore, cancellationToken);

if (configuration.EnableFullTextSearch)
{
indexList.Add(new MessagesViewIndexWithFullTextSearch());
await documentStore.Maintenance.SendAsync(new DeleteIndexOperation("MessagesViewIndex"), cancellationToken);
}
else
{
indexList.Add(new MessagesViewIndex());
await documentStore.Maintenance.SendAsync(new DeleteIndexOperation("MessagesViewIndexWithFullTextSearch"), cancellationToken);
}
List<AbstractIndexCreationTask> indexList = [new FailedAuditImportIndex(), new SagaDetailsIndex()];

await IndexCreation.CreateIndexesAsync(indexList, documentStore, null, null, cancellationToken);
if (enableFreeTextSearch)
{
indexList.Add(new MessagesViewIndexWithFullTextSearch());
await documentStore.Maintenance.SendAsync(new DeleteIndexOperation(MessagesViewIndexName), cancellationToken);
}
else
{
indexList.Add(new MessagesViewIndex());
await documentStore.Maintenance.SendAsync(new DeleteIndexOperation(MessagesViewIndexWithFulltextSearchName), cancellationToken);
}

await IndexCreation.CreateIndexesAsync(indexList, documentStore, null, null, cancellationToken);
}

async Task ConfigureExpiration(IDocumentStore documentStore, CancellationToken cancellationToken)
async Task ConfigureExpiration(IDocumentStore documentStore, CancellationToken cancellationToken)
{
var expirationConfig = new ExpirationConfiguration
{
var expirationConfig = new ExpirationConfiguration
{
Disabled = false,
DeleteFrequencyInSec = configuration.ExpirationProcessTimerInSeconds
};
Disabled = false,
DeleteFrequencyInSec = configuration.ExpirationProcessTimerInSeconds
};

await documentStore.Maintenance.SendAsync(new ConfigureExpirationOperation(expirationConfig), cancellationToken);
}
await documentStore.Maintenance.SendAsync(new ConfigureExpirationOperation(expirationConfig), cancellationToken);
}

bool SetSearchEngineType(DatabaseRecord database, SearchEngineType searchEngineType)
{
var updated = false;

updated |= database.Settings.TryAdd("Indexing.Auto.SearchEngineType", searchEngineType.ToString());
updated |= database.Settings.TryAdd("Indexing.Static.SearchEngineType", searchEngineType.ToString());

return updated;
}
}

internal const string MessagesViewIndexWithFulltextSearchName = "MessagesViewIndexWithFullTextSearch";
internal const string SagaDetailsIndexName = "SagaDetailsIndex";
internal const string MessagesViewIndexName = "MessagesViewIndex";
}
134 changes: 134 additions & 0 deletions src/ServiceControl.Audit.Persistence.Tests.RavenDB/IndexSetupTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
namespace ServiceControl.Audit.Persistence.Tests;

using System;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using Persistence.RavenDB;
using Persistence.RavenDB.Indexes;
using Raven.Client.Documents.Indexes;
using Raven.Client.Documents.Operations.Indexes;
using Raven.Client.Exceptions.Documents.Indexes;

[TestFixture]
class IndexSetupTests : PersistenceTestFixture
{
[Test]
public async Task Corax_should_be_the_default_search_engine_type()
{
var indexes = await configuration.DocumentStore.Maintenance.SendAsync(new GetIndexesOperation(0, int.MaxValue));

foreach (var index in indexes)
{
var indexStats = await configuration.DocumentStore.Maintenance.SendAsync(new GetIndexStatisticsOperation(DatabaseSetup.MessagesViewIndexWithFulltextSearchName));
Assert.That(indexStats.SearchEngineType, Is.EqualTo(SearchEngineType.Corax), $"{index.Name} is not using Corax");
}
}

[Test]
public async Task Free_text_search_index_should_be_used_by_default()
{
var freeTextIndex = await configuration.DocumentStore.Maintenance.SendAsync(new GetIndexOperation(DatabaseSetup.MessagesViewIndexWithFulltextSearchName));
var nonFreeTextIndex = await configuration.DocumentStore.Maintenance.SendAsync(new GetIndexOperation(DatabaseSetup.MessagesViewIndexName));

Assert.That(nonFreeTextIndex, Is.Null);
Assert.That(freeTextIndex, Is.Not.Null);
}

[Test]
public async Task Free_text_search_index_can_be_opted_out_from()
{
await DatabaseSetup.CreateIndexes(configuration.DocumentStore, false, TestTimeoutCancellationToken);

var freeTextIndex = await configuration.DocumentStore.Maintenance.SendAsync(new GetIndexOperation(DatabaseSetup.MessagesViewIndexWithFulltextSearchName));
var nonFreeTextIndex = await configuration.DocumentStore.Maintenance.SendAsync(new GetIndexOperation(DatabaseSetup.MessagesViewIndexName));

Assert.That(freeTextIndex, Is.Null);
Assert.That(nonFreeTextIndex, Is.Not.Null);
}

[Test]
public async Task Indexes_should_be_reset_on_setup()
{
var index = new MessagesViewIndexWithFullTextSearch { Configuration = { ["Indexing.Static.SearchEngineType"] = SearchEngineType.Lucene.ToString() } };

var indexWithCustomConfigStats = await UpdateIndex(index);

Assert.That(indexWithCustomConfigStats.SearchEngineType, Is.EqualTo(SearchEngineType.Lucene));

await DatabaseSetup.CreateIndexes(configuration.DocumentStore, true, TestTimeoutCancellationToken);

await WaitForIndexDefinitionUpdate(indexWithCustomConfigStats);

var indexAfterResetStats = await configuration.DocumentStore.Maintenance.SendAsync(new GetIndexStatisticsOperation(index.IndexName));

Assert.That(indexAfterResetStats.SearchEngineType, Is.EqualTo(SearchEngineType.Corax));
}

[Test]
public async Task Indexes_should_not_be_reset_on_setup_when_locked_as_ignore()
{
var index = new MessagesViewIndexWithFullTextSearch
{
Configuration = { ["Indexing.Static.SearchEngineType"] = SearchEngineType.Lucene.ToString() },
LockMode = IndexLockMode.LockedIgnore
};

var indexStatsBefore = await UpdateIndex(index);

Assert.That(indexStatsBefore.SearchEngineType, Is.EqualTo(SearchEngineType.Lucene));

await DatabaseSetup.CreateIndexes(configuration.DocumentStore, true, TestTimeoutCancellationToken);

// raven will ignore the update since index was locked, so best we can do is wait a bit and check that settings hasn't changed
await Task.Delay(1000);

var indexStatsAfter = await configuration.DocumentStore.Maintenance.SendAsync(new GetIndexStatisticsOperation(index.IndexName));
Assert.That(indexStatsAfter.SearchEngineType, Is.EqualTo(SearchEngineType.Lucene));
}

[Test]
public async Task Indexes_should_not_be_reset_on_setup_when_locked_as_error()
{
var index = new MessagesViewIndexWithFullTextSearch
{
Configuration = { ["Indexing.Static.SearchEngineType"] = SearchEngineType.Lucene.ToString() },
LockMode = IndexLockMode.LockedError
};

await UpdateIndex(index);

Assert.ThrowsAsync<IndexCreationException>(async () => await DatabaseSetup.CreateIndexes(configuration.DocumentStore, true, TestTimeoutCancellationToken));
}

async Task<IndexStats> UpdateIndex(IAbstractIndexCreationTask index)
{
var statsBefore = await configuration.DocumentStore.Maintenance.SendAsync(new GetIndexStatisticsOperation(index.IndexName), TestTimeoutCancellationToken);

await IndexCreation.CreateIndexesAsync([index], configuration.DocumentStore, null, null, TestTimeoutCancellationToken);

return await WaitForIndexDefinitionUpdate(statsBefore);
}

async Task<IndexStats> WaitForIndexDefinitionUpdate(IndexStats oldStats)
{
while (true)
{
try
{
var newStats = await configuration.DocumentStore.Maintenance.SendAsync(new GetIndexStatisticsOperation(oldStats.Name), TestTimeoutCancellationToken);

if (newStats.CreatedTimestamp > oldStats.CreatedTimestamp)
{
return newStats;
}
}
catch (OperationCanceledException)
{
// keep going since we can get this if we query right when the update happens
}

await Task.Delay(TimeSpan.FromMilliseconds(100), TestTimeoutCancellationToken);
}
}
}
Loading