Skip to content

Commit cdfb5b3

Browse files
committed
add message broker (dapr provider) #5
1 parent de9babd commit cdfb5b3

File tree

18 files changed

+232
-68
lines changed

18 files changed

+232
-68
lines changed

coolstore.sln.DotSettings

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@
88
<s:Boolean x:Key="/Default/UserDictionary/Words/=exprs/@EntryIndexedValue">True</s:Boolean>
99
<s:Boolean x:Key="/Default/UserDictionary/Words/=Grpc/@EntryIndexedValue">True</s:Boolean>
1010
<s:Boolean x:Key="/Default/UserDictionary/Words/=Mediat/@EntryIndexedValue">True</s:Boolean>
11-
<s:Boolean x:Key="/Default/UserDictionary/Words/=Migrator/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
11+
<s:Boolean x:Key="/Default/UserDictionary/Words/=Migrator/@EntryIndexedValue">True</s:Boolean>
12+
<s:Boolean x:Key="/Default/UserDictionary/Words/=pubsub/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>

src/BuildingBlocks/CoolStore.AppContracts/CoolStore.AppContracts.csproj

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.1" />
1313
<PackageReference Include="AspNetCore.HealthChecks.NpgSql" Version="5.0.1" />
1414
<PackageReference Include="AspNetCore.HealthChecks.Uris" Version="5.0.1" />
15-
<PackageReference Include="RestEase" Version="1.5.2" />
16-
<PackageReference Include="RestEase.HttpClientFactory" Version="1.5.2" />
1715
<PackageReference Include="RestEase.SourceGenerator" Version="1.5.2">
1816
<PrivateAssets>all</PrivateAssets>
1917
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using N8T.Core.Domain;
2+
using N8T.Infrastructure.Bus.Dapr;
3+
4+
namespace CoolStore.AppContracts.IntegrationEvents
5+
{
6+
[DaprPubSubName(PubSubName = "pubsub")]
7+
public class CustomerCreatedIntegrationEvent : IntegrationEventBase
8+
{
9+
public override void Flatten()
10+
{
11+
}
12+
}
13+
}

src/BuildingBlocks/N8T.Core/Domain/DomainEventBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public interface IDomainEventContext
1717

1818
public abstract class EventBase : IDomainEvent
1919
{
20-
public string EventType { get { return GetType().Name; } }
20+
public string EventType { get { return GetType().AssemblyQualifiedName; } }
2121
public DateTime CreatedAt { get; } = DateTime.UtcNow;
2222
public string CorrelationId { get; init; }
2323
public IDictionary<string, object> MetaData { get; } = new Dictionary<string, object>();
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using Dapr.Client;
5+
using Microsoft.Extensions.Logging;
6+
using N8T.Core.Domain;
7+
8+
namespace N8T.Infrastructure.Bus.Dapr
9+
{
10+
public class DaprEventBus : IEventBus
11+
{
12+
private readonly DaprClient _daprClient;
13+
private readonly ILogger<DaprEventBus> _logger;
14+
15+
public DaprEventBus(DaprClient daprClient, ILogger<DaprEventBus> logger)
16+
{
17+
_daprClient = daprClient ?? throw new ArgumentNullException(nameof(daprClient));
18+
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
19+
}
20+
21+
public async Task PublishAsync<TEvent>(TEvent @event, string[] topics = default,
22+
CancellationToken token = default) where TEvent : IntegrationEventBase
23+
{
24+
var attr = (DaprPubSubNameAttribute)Attribute.GetCustomAttribute(typeof(TEvent),
25+
typeof(DaprPubSubNameAttribute));
26+
27+
var pubsubName = "pubsub";
28+
29+
if (attr is not null)
30+
{
31+
pubsubName = attr.PubSubName;
32+
}
33+
34+
if (topics is null)
35+
{
36+
var topicName = @event.GetType().Name;
37+
38+
_logger.LogInformation("Publishing event {@Event} to {PubsubName}.{TopicName}", @event, pubsubName, topicName);
39+
await _daprClient.PublishEventAsync(pubsubName, topicName, @event, token);
40+
}
41+
else
42+
{
43+
foreach (var topicName in topics)
44+
{
45+
_logger.LogInformation("Publishing event {@Event} to {PubsubName}.{TopicName}", @event, pubsubName,
46+
topicName);
47+
await _daprClient.PublishEventAsync(pubsubName, topicName, @event, token);
48+
}
49+
}
50+
}
51+
52+
public Task SubscribeAsync<TMessage>(string[] topics = default, CancellationToken token = default) where TMessage : IntegrationEventBase
53+
{
54+
throw new System.NotImplementedException();
55+
}
56+
}
57+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
using System;
2+
3+
namespace N8T.Infrastructure.Bus.Dapr
4+
{
5+
public class DaprPubSubNameAttribute : Attribute
6+
{
7+
public string PubSubName { get; set; } = "pubsub";
8+
}
9+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
using Microsoft.Extensions.DependencyInjection;
2+
using N8T.Infrastructure.Bus.Dapr;
3+
4+
namespace N8T.Infrastructure.Bus
5+
{
6+
public static class Extensions
7+
{
8+
public static IServiceCollection AddMessageBroker(this IMvcBuilder mvcBuilder,
9+
string messageBrokerType = "dapr")
10+
{
11+
switch (messageBrokerType)
12+
{
13+
case "dapr":
14+
mvcBuilder.AddDapr();
15+
mvcBuilder.Services.AddScoped<IEventBus, DaprEventBus>();
16+
break;
17+
}
18+
19+
return mvcBuilder.Services;
20+
}
21+
}
22+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using System.Threading;
2+
using System.Threading.Tasks;
3+
using N8T.Core.Domain;
4+
5+
namespace N8T.Infrastructure.Bus
6+
{
7+
public interface IEventBus
8+
{
9+
Task PublishAsync<TEvent>(TEvent @event, string[] topics = default, CancellationToken token = default)
10+
where TEvent : IntegrationEventBase;
11+
12+
Task SubscribeAsync<TEvent>(string[] topics = default, CancellationToken token = default)
13+
where TEvent : IntegrationEventBase;
14+
}
15+
}

src/BuildingBlocks/N8T.Infrastructure/Dapr/Extensions.cs

Lines changed: 0 additions & 25 deletions
This file was deleted.

src/BuildingBlocks/N8T.Infrastructure/Extensions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public static class Extensions
2121
public static IServiceCollection AddCore(this IServiceCollection services, Type[] types = null,
2222
Action<IServiceCollection> doMoreActions = null)
2323
{
24+
services.AddHttpContextAccessor();
2425
services.AddCustomMediatR(types);
2526
services.AddCustomValidators(types);
2627

0 commit comments

Comments
 (0)