Skip to content

Commit bbc0c09

Browse files
committed
- Checkppint: Decouple event queue and dispatcher
1 parent 1c6e0b2 commit bbc0c09

File tree

6 files changed

+172
-113
lines changed

6 files changed

+172
-113
lines changed

src/SourceFlow/ClassDiagram.cd

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,27 +25,10 @@
2525
</Class>
2626
<Class Name="SourceFlow.Impl.EventQueue">
2727
<Position X="10.25" Y="3" Width="1.5" />
28-
<AssociationLine Name="aggregates" Type="SourceFlow.Aggregate.IAggregate" FixedFromPoint="true" FixedToPoint="true">
29-
<Path>
30-
<Point X="11.75" Y="3.469" />
31-
<Point X="12.125" Y="3.469" />
32-
<Point X="12.125" Y="1.875" />
33-
<Point X="14.75" Y="1.875" />
34-
</Path>
35-
</AssociationLine>
36-
<AssociationLine Name="projections" Type="SourceFlow.ViewModel.IProjection">
37-
<MemberNameLabel ManuallyPlaced="true">
38-
<Position X="-0.021" Y="0.453" />
39-
</MemberNameLabel>
40-
</AssociationLine>
4128
<TypeIdentifier>
42-
<HashCode>AABAAAgAAAAAAAgCAAAAAAAAAACAAAAAACAAAAAAAAA=</HashCode>
29+
<HashCode>AABAAAAAAAAAAAACAAAAAAAAAEAAAAAAAAAAAAAAAAA=</HashCode>
4330
<FileName>Impl\EventQueue.cs</FileName>
4431
</TypeIdentifier>
45-
<ShowAsCollectionAssociation>
46-
<Field Name="aggregates" />
47-
<Field Name="projections" />
48-
</ShowAsCollectionAssociation>
4932
<Lollipop Position="0.2" />
5033
</Class>
5134
<Class Name="SourceFlow.Impl.CommandBus">
@@ -240,7 +223,7 @@
240223
<Interface Name="SourceFlow.Messaging.Bus.IEventQueue">
241224
<Position X="10" Y="1.5" Width="1.5" />
242225
<TypeIdentifier>
243-
<HashCode>AABAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=</HashCode>
226+
<HashCode>AABAAAAAAAAAAAAAAAAAAAAAAEAAAAAAAAAAAAAAAAA=</HashCode>
244227
<FileName>Messaging\Bus\IEventQueue.cs</FileName>
245228
</TypeIdentifier>
246229
</Interface>
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading.Tasks;
5+
using Microsoft.Extensions.Logging;
6+
using SourceFlow.Aggregate;
7+
using SourceFlow.Messaging;
8+
using SourceFlow.Messaging.Bus;
9+
using SourceFlow.ViewModel;
10+
11+
namespace SourceFlow.Impl
12+
{
13+
/// <summary>
14+
/// EventDispatcher is responsible for dispatching events to the appropriate aggregates and view projections.
15+
/// </summary>
16+
internal class EventDispatcher : IEventDispatcher
17+
{
18+
/// <summary>
19+
/// Logger for the event queue to log events and errors.
20+
/// </summary>
21+
private readonly ILogger<IEventDispatcher> logger;
22+
23+
/// <summary>
24+
/// Represents a collection of view transforms used to modify or manipulate views.
25+
/// </summary>
26+
/// <remarks>This collection contains instances of objects implementing the <see
27+
/// cref="IProjection"/> interface. Each projection in the collection can be applied to alter the appearance
28+
29+
/// or behavior of a view.</remarks>
30+
private IEnumerable<IProjection> projections;
31+
32+
/// <summary>
33+
/// Represents a collection of aggregate root objects.
34+
/// </summary>
35+
/// <remarks>This field holds a read-only collection of objects that implement the <see cref="IAggregate"/>
36+
/// interface. It is intended to be used internally to manage or process aggregate roots within the context of the
37+
/// application.</remarks>
38+
private readonly IEnumerable<IAggregate> aggregates;
39+
40+
/// <summary>
41+
/// Initializes a new instance of the <see cref="EventDispatcher"/> class with the specified aggregates and view projections.
42+
/// </summary>
43+
/// <param name="aggregates"></param>
44+
/// <param name="projections"></param>
45+
/// <param name="logger"></param>
46+
/// <exception cref="ArgumentNullException"></exception>
47+
public EventDispatcher(IEnumerable<IAggregate> aggregates, IEnumerable<IProjection> projections, ILogger<IEventDispatcher> logger)
48+
{
49+
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
50+
this.aggregates = aggregates ?? throw new ArgumentNullException(nameof(aggregates));
51+
this.projections = projections ?? throw new ArgumentNullException(nameof(projections));
52+
}
53+
54+
/// <summary>
55+
/// Dequeues the event to all aggregates that can handle it.
56+
/// </summary>
57+
/// <typeparam name="TEvent"></typeparam>
58+
/// <param name="event"></param>
59+
/// <returns></returns>
60+
private async Task DequeueToAggregates<TEvent>(TEvent @event)
61+
where TEvent : IEvent
62+
{
63+
var tasks = new List<Task>();
64+
65+
foreach (var aggregate in aggregates)
66+
{
67+
var handlerType = typeof(ISubscribes<>).MakeGenericType(@event.GetType());
68+
if (!handlerType.IsAssignableFrom(aggregate.GetType()))
69+
continue;
70+
71+
var method = typeof(ISubscribes<>)
72+
.MakeGenericType(@event.GetType())
73+
.GetMethod(nameof(ISubscribes<TEvent>.Handle));
74+
75+
var task = (Task)method.Invoke(aggregate, new object[] { @event });
76+
77+
tasks.Add(task);
78+
79+
logger?.LogInformation("Action=Event_Disptcher_Aggregate, Event={Event}, Aggregate={Aggregate}, Handler:{Handler}",
80+
@event.GetType().Name, aggregate.GetType().Name, method.Name);
81+
}
82+
83+
await Task.WhenAll(tasks);
84+
}
85+
86+
/// <summary>
87+
/// Dequeues the event to all view projections that can handle it.
88+
/// </summary>
89+
/// <typeparam name="TEvent"></typeparam>
90+
/// <param name="event"></param>
91+
/// <returns></returns>
92+
public async Task DequeueToViews<TEvent>(TEvent @event)
93+
where TEvent : IEvent
94+
{
95+
var tasks = new List<Task>();
96+
97+
foreach (var projection in projections)
98+
{
99+
var projectionType = typeof(IProjectOn<>).MakeGenericType(@event.GetType());
100+
if (!projectionType.IsAssignableFrom(projection.GetType()))
101+
continue;
102+
103+
var method = typeof(IProjectOn<>)
104+
.MakeGenericType(@event.GetType())
105+
.GetMethod(nameof(IProjectOn<TEvent>.Apply));
106+
107+
var task = (Task)method.Invoke(projection, new object[] { @event });
108+
109+
tasks.Add(task);
110+
111+
logger?.LogInformation("Action=Event_Dispatcher_View, Event={Event}, Apply:{Apply}",
112+
@event.Name, projection.GetType().Name);
113+
}
114+
115+
if (!tasks.Any())
116+
return;
117+
118+
await Task.WhenAll(tasks);
119+
}
120+
121+
/// <summary>
122+
/// Dispatches the event to both aggregates and view projections.
123+
/// </summary>
124+
/// <param name="sender"></param>
125+
/// <param name="event"></param>
126+
public void Dispatch(object sender, IEvent @event)
127+
{
128+
DequeueToViews(@event).GetAwaiter().GetResult();
129+
DequeueToAggregates(@event).GetAwaiter().GetResult();
130+
logger?.LogInformation("Action=Event_Dispatcher_Complete, Event={Event}, Sender:{sender}",
131+
@event.Name, sender.GetType().Name);
132+
}
133+
}
134+
}

src/SourceFlow/Impl/EventQueue.cs

Lines changed: 8 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
using System;
2-
using System.Collections.Generic;
3-
using System.Linq;
42
using System.Threading.Tasks;
53
using Microsoft.Extensions.Logging;
64
using SourceFlow.Aggregate;
@@ -18,36 +16,21 @@ internal class EventQueue : IEventQueue
1816
/// <summary>
1917
/// Logger for the event queue to log events and errors.
2018
/// </summary>
21-
private readonly ILogger<EventQueue> logger;
19+
private readonly ILogger<IEventQueue> logger;
2220

2321
/// <summary>
24-
/// Represents a collection of view transforms used to modify or manipulate views.
22+
/// Represents event dispathers that can handle the dequeuing of events.
2523
/// </summary>
26-
/// <remarks>This collection contains instances of objects implementing the <see
27-
/// cref="IProjection"/> interface. Each projection in the collection can be applied to alter the appearance
28-
/// or behavior of a view.</remarks>
29-
private IEnumerable<IProjection> projections;
24+
public event EventHandler<IEvent> Handlers;
3025

3126
/// <summary>
32-
/// Represents a collection of aggregate root objects.
27+
/// Initializes a new instance of the <see cref="EventQueue"/> class with the specified logger.
3328
/// </summary>
34-
/// <remarks>This field holds a read-only collection of objects that implement the <see cref="IAggregate"/>
35-
/// interface. It is intended to be used internally to manage or process aggregate roots within the context of the
36-
/// application.</remarks>
37-
private readonly IEnumerable<IAggregate> aggregates;
38-
39-
/// <summary>
40-
/// Initializes a new instance of the <see cref="EventQueue"/> class with the specified aggregates and view projections.
41-
/// </summary>
42-
/// <param name="aggregates"></param>
43-
/// <param name="projections"></param>
4429
/// <param name="logger"></param>
4530
/// <exception cref="ArgumentNullException"></exception>
46-
public EventQueue(IEnumerable<IAggregate> aggregates, IEnumerable<IProjection> projections, ILogger<EventQueue> logger)
31+
public EventQueue(ILogger<IEventQueue> logger)
4732
{
4833
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
49-
this.aggregates = aggregates ?? throw new ArgumentNullException(nameof(aggregates));
50-
this.projections = projections ?? throw new ArgumentNullException(nameof(projections));
5134
}
5235

5336
/// <summary>
@@ -62,75 +45,10 @@ public async Task Enqueue<TEvent>(TEvent @event)
6245
if (@event == null)
6346
throw new ArgumentNullException(nameof(@event));
6447

65-
await DequeueToViews(@event);
66-
await DequeueToAggregates(@event);
67-
}
68-
69-
/// <summary>
70-
/// Dequeues the event to all aggregates that can handle it.
71-
/// </summary>
72-
/// <typeparam name="TEvent"></typeparam>
73-
/// <param name="event"></param>
74-
/// <returns></returns>
75-
private async Task DequeueToAggregates<TEvent>(TEvent @event)
76-
where TEvent : IEvent
77-
{
78-
var tasks = new List<Task>();
79-
80-
foreach (var aggregate in aggregates)
81-
{
82-
var handlerType = typeof(ISubscribes<>).MakeGenericType(@event.GetType());
83-
if (!handlerType.IsAssignableFrom(aggregate.GetType()))
84-
continue;
85-
86-
var method = typeof(ISubscribes<>)
87-
.MakeGenericType(@event.GetType())
88-
.GetMethod(nameof(ISubscribes<TEvent>.Handle));
89-
90-
var task = (Task)method.Invoke(aggregate, new object[] { @event });
91-
92-
tasks.Add(task);
93-
94-
logger?.LogInformation("Action=Event_Enqueue, Event={Event}, Aggregate={Aggregate}, Handler:{Handler}",
95-
@event.GetType().Name, aggregate.GetType().Name, method.Name);
96-
}
97-
98-
await Task.WhenAll(tasks);
99-
}
100-
101-
/// <summary>
102-
/// Dequeues the event to all view projections that can handle it.
103-
/// </summary>
104-
/// <typeparam name="TEvent"></typeparam>
105-
/// <param name="event"></param>
106-
/// <returns></returns>
107-
public async Task DequeueToViews<TEvent>(TEvent @event)
108-
where TEvent : IEvent
109-
{
110-
var tasks = new List<Task>();
111-
112-
foreach (var projection in projections)
113-
{
114-
var projectionType = typeof(IProjectOn<>).MakeGenericType(@event.GetType());
115-
if (!projectionType.IsAssignableFrom(projection.GetType()))
116-
continue;
117-
118-
var method = typeof(IProjectOn<>)
119-
.MakeGenericType(@event.GetType())
120-
.GetMethod(nameof(IProjectOn<TEvent>.Apply));
121-
122-
var task = (Task)method.Invoke(projection, new object[] { @event });
123-
124-
tasks.Add(task);
125-
126-
logger?.LogInformation("Action=View_Projection, Event={Event}, Apply:{Apply}",
127-
@event.Name, projection.GetType().Name);
128-
}
129-
130-
if (!tasks.Any())
131-
return;
48+
Handlers?.Invoke(this, @event);
13249

133-
await Task.WhenAll(tasks);
50+
logger?.LogInformation("Action=Event_Enqueue, Event={Event}, Payload={Payload}",
51+
@event.GetType().Name, @event.Payload.GetType().Name);
13452
}
13553
}
13654
}

src/SourceFlow/IocExtensions.cs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,21 @@ public static void UseSourceFlow(this IServiceCollection services, Action<ISourc
5151
c.GetService<ICommandStore>(),
5252
c.GetService<ILogger<ICommandBus>>()));
5353

54-
services.AddSingleton<IEventQueue, EventQueue>(c => new EventQueue(
55-
c.GetServices<IAggregate>(),
56-
c.GetServices<IProjection>(),
57-
c.GetService<ILogger<EventQueue>>()));
54+
services.AddSingleton<IEventDispatcher, EventDispatcher>(c => new EventDispatcher(
55+
c.GetServices<IAggregate>(),
56+
c.GetServices<IProjection>(),
57+
c.GetService<ILogger<IEventDispatcher>>())
58+
);
59+
60+
services.AddSingleton<IEventQueue, EventQueue>(c =>
61+
{
62+
var queue = new EventQueue(
63+
c.GetService<ILogger<IEventQueue>>());
64+
65+
var dispatcher = c.GetService<IEventDispatcher>();
66+
queue.Handlers += dispatcher.Dispatch;
67+
return queue;
68+
});
5869

5970
services.AddSingleton<IAggregateFactory, AggregateFactory>();
6071
services.AddSingleton<ICommandPublisher, CommandPublisher>(c => new CommandPublisher(c.GetService<ICommandBus>()));
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace SourceFlow.Messaging.Bus
2+
{
3+
public interface IEventDispatcher
4+
{
5+
void Dispatch(object sender, IEvent @event);
6+
}
7+
}

src/SourceFlow/Messaging/Bus/IEventQueue.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
1+
using System;
12
using System.Threading.Tasks;
23

34
namespace SourceFlow.Messaging.Bus
45
{
56
public interface IEventQueue
67
{
8+
/// <summary>
9+
/// Handlers that are invoked to dispatch an event that is dequeued from the event queue.
10+
/// </summary>
11+
event EventHandler<IEvent> Handlers;
12+
713
/// <summary>
814
/// Enqueues an event in order to publish to subcribers.
915
/// </summary>

0 commit comments

Comments
 (0)