Skip to content

Commit 00e1cd1

Browse files
committed
Add CompositeBatchInterceptor
1 parent f25dbb6 commit 00e1cd1

File tree

1 file changed

+112
-0
lines changed

1 file changed

+112
-0
lines changed
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
using Microsoft.Extensions.Logging;
2+
using System;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Runtime.ExceptionServices;
6+
using System.Threading.Tasks;
7+
8+
namespace MicroBatchFramework
9+
{
10+
public class CompositeBatchInterceptor : IBatchInterceptor
11+
{
12+
readonly IBatchInterceptor[] interceptors;
13+
14+
public CompositeBatchInterceptor(params IBatchInterceptor[] interceptors)
15+
{
16+
this.interceptors = interceptors;
17+
}
18+
19+
public async ValueTask OnBatchEngineBeginAsync(IServiceProvider serviceProvider, ILogger<BatchEngine> logger)
20+
{
21+
var exceptions = new AggregateExceptionHolder();
22+
foreach (var item in interceptors)
23+
{
24+
try
25+
{
26+
await item.OnBatchEngineBeginAsync(serviceProvider, logger);
27+
}
28+
catch (Exception e)
29+
{
30+
exceptions.Add(e);
31+
}
32+
}
33+
exceptions.ThrowIfExists();
34+
}
35+
36+
public async ValueTask OnBatchEngineEndAsync()
37+
{
38+
var exceptions = new AggregateExceptionHolder();
39+
foreach (var item in interceptors)
40+
{
41+
try
42+
{
43+
await item.OnBatchEngineEndAsync();
44+
}
45+
catch (Exception e)
46+
{
47+
exceptions.Add(e);
48+
}
49+
}
50+
exceptions.ThrowIfExists();
51+
}
52+
53+
public async ValueTask OnBatchRunBeginAsync(BatchContext context)
54+
{
55+
var exceptions = new AggregateExceptionHolder();
56+
foreach (var item in interceptors)
57+
{
58+
try
59+
{
60+
await item.OnBatchRunBeginAsync(context);
61+
}
62+
catch (Exception e)
63+
{
64+
exceptions.Add(e);
65+
}
66+
}
67+
exceptions.ThrowIfExists();
68+
}
69+
70+
public async ValueTask OnBatchRunCompleteAsync(BatchContext context, string errorMessageIfFailed, Exception exceptionIfExists)
71+
{
72+
var exceptions = new AggregateExceptionHolder();
73+
foreach (var item in interceptors)
74+
{
75+
try
76+
{
77+
await item.OnBatchRunCompleteAsync(context, errorMessageIfFailed, exceptionIfExists);
78+
}
79+
catch (Exception e)
80+
{
81+
exceptions.Add(e);
82+
}
83+
}
84+
exceptions.ThrowIfExists();
85+
}
86+
}
87+
88+
internal struct AggregateExceptionHolder
89+
{
90+
List<ExceptionDispatchInfo> exceptions;
91+
92+
public void Add(Exception ex)
93+
{
94+
if (exceptions == null) exceptions = new List<ExceptionDispatchInfo>();
95+
exceptions.Add(ExceptionDispatchInfo.Capture(ex));
96+
}
97+
98+
public void ThrowIfExists()
99+
{
100+
if (exceptions == null) return;
101+
102+
if (exceptions.Count == 1)
103+
{
104+
exceptions[0].Throw();
105+
}
106+
else
107+
{
108+
throw new AggregateException(exceptions.Select(x => x.SourceException));
109+
}
110+
}
111+
}
112+
}

0 commit comments

Comments
 (0)