Skip to content

Commit cbf0872

Browse files
committed
WIP clickhouse repo
1 parent 5775d60 commit cbf0872

File tree

2 files changed

+209
-1
lines changed

2 files changed

+209
-1
lines changed
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
import { EventEmitter } from "node:stream";
2+
import type { ClickHouse } from "@internal/clickhouse";
3+
import type {
4+
IEventRepository,
5+
CreatableEvent,
6+
CompleteableTaskRun,
7+
TraceEventOptions,
8+
EventBuilder,
9+
TraceSummary,
10+
TraceDetailedSummary,
11+
RunPreparedEvent,
12+
TaskEventRecord,
13+
ExceptionEventProperties,
14+
} from "./eventRepository.types";
15+
import type { TaskEventStoreTable } from "./taskEventStore.server";
16+
import type { DynamicFlushScheduler } from "./dynamicFlushScheduler.server";
17+
18+
export type ClickhouseEventRepositoryConfig = {
19+
clickhouse: ClickHouse;
20+
batchSize?: number;
21+
flushInterval?: number;
22+
};
23+
24+
/**
25+
* ClickHouse-based implementation of the EventRepository.
26+
* This implementation stores events in ClickHouse for better analytics and performance.
27+
*/
28+
export class ClickhouseEventRepository implements IEventRepository {
29+
private _subscriberCount = 0;
30+
private _clickhouse: ClickHouse;
31+
private _config: ClickhouseEventRepositoryConfig;
32+
33+
constructor(config: ClickhouseEventRepositoryConfig) {
34+
this._clickhouse = config.clickhouse;
35+
this._config = config;
36+
}
37+
38+
// Properties
39+
get subscriberCount(): number {
40+
return this._subscriberCount;
41+
}
42+
43+
// Event insertion methods
44+
async insert(event: CreatableEvent): Promise<void> {
45+
throw new Error("ClickhouseEventRepository.insert not implemented");
46+
}
47+
48+
async insertImmediate(event: CreatableEvent): Promise<void> {
49+
throw new Error("ClickhouseEventRepository.insertImmediate not implemented");
50+
}
51+
52+
async insertMany(events: CreatableEvent[]): Promise<void> {
53+
throw new Error("ClickhouseEventRepository.insertMany not implemented");
54+
}
55+
56+
async insertManyImmediate(events: CreatableEvent[]): Promise<CreatableEvent[]> {
57+
throw new Error("ClickhouseEventRepository.insertManyImmediate not implemented");
58+
}
59+
60+
// Run event completion methods
61+
async completeSuccessfulRunEvent(params: {
62+
run: CompleteableTaskRun;
63+
endTime?: Date;
64+
}): Promise<void> {
65+
throw new Error("ClickhouseEventRepository.completeSuccessfulRunEvent not implemented");
66+
}
67+
68+
async completeCachedRunEvent(params: {
69+
run: CompleteableTaskRun;
70+
blockedRun: CompleteableTaskRun;
71+
spanId: string;
72+
parentSpanId: string;
73+
spanCreatedAt: Date;
74+
isError: boolean;
75+
endTime?: Date;
76+
}): Promise<void> {
77+
throw new Error("ClickhouseEventRepository.completeCachedRunEvent not implemented");
78+
}
79+
80+
async completeFailedRunEvent(params: {
81+
run: CompleteableTaskRun;
82+
endTime?: Date;
83+
exception: { message?: string; type?: string; stacktrace?: string };
84+
}): Promise<void> {
85+
throw new Error("ClickhouseEventRepository.completeFailedRunEvent not implemented");
86+
}
87+
88+
async completeExpiredRunEvent(params: {
89+
run: CompleteableTaskRun;
90+
endTime?: Date;
91+
ttl: string;
92+
}): Promise<void> {
93+
throw new Error("ClickhouseEventRepository.completeExpiredRunEvent not implemented");
94+
}
95+
96+
async createAttemptFailedRunEvent(params: {
97+
run: CompleteableTaskRun;
98+
endTime?: Date;
99+
attemptNumber: number;
100+
exception: { message?: string; type?: string; stacktrace?: string };
101+
}): Promise<void> {
102+
throw new Error("ClickhouseEventRepository.createAttemptFailedRunEvent not implemented");
103+
}
104+
105+
async cancelRunEvent(params: {
106+
reason: string;
107+
run: CompleteableTaskRun;
108+
cancelledAt: Date;
109+
}): Promise<void> {
110+
throw new Error("ClickhouseEventRepository.cancelRunEvent not implemented");
111+
}
112+
113+
async crashEvent(params: {
114+
event: TaskEventRecord;
115+
crashedAt: Date;
116+
exception: ExceptionEventProperties;
117+
}): Promise<void> {
118+
throw new Error("ClickhouseEventRepository.crashEvent not implemented");
119+
}
120+
121+
// Query methods
122+
async getTraceSummary(
123+
storeTable: TaskEventStoreTable,
124+
traceId: string,
125+
startCreatedAt: Date,
126+
endCreatedAt?: Date,
127+
options?: { includeDebugLogs?: boolean }
128+
): Promise<TraceSummary | undefined> {
129+
throw new Error("ClickhouseEventRepository.getTraceSummary not implemented");
130+
}
131+
132+
async getTraceDetailedSummary(
133+
storeTable: TaskEventStoreTable,
134+
traceId: string,
135+
startCreatedAt: Date,
136+
endCreatedAt?: Date,
137+
options?: { includeDebugLogs?: boolean }
138+
): Promise<TraceDetailedSummary | undefined> {
139+
throw new Error("ClickhouseEventRepository.getTraceDetailedSummary not implemented");
140+
}
141+
142+
async getRunEvents(
143+
storeTable: TaskEventStoreTable,
144+
runId: string,
145+
startCreatedAt: Date,
146+
endCreatedAt?: Date
147+
): Promise<RunPreparedEvent[]> {
148+
throw new Error("ClickhouseEventRepository.getRunEvents not implemented");
149+
}
150+
151+
async getSpan(
152+
storeTable: TaskEventStoreTable,
153+
spanId: string,
154+
traceId: string,
155+
startCreatedAt: Date,
156+
endCreatedAt?: Date,
157+
options?: { includeDebugLogs?: boolean }
158+
): Promise<any> {
159+
throw new Error("ClickhouseEventRepository.getSpan not implemented");
160+
}
161+
162+
// Event recording methods
163+
async recordEvent(
164+
message: string,
165+
options: TraceEventOptions & { duration?: number; parentId?: string }
166+
): Promise<CreatableEvent> {
167+
throw new Error("ClickhouseEventRepository.recordEvent not implemented");
168+
}
169+
170+
async traceEvent<TResult>(
171+
message: string,
172+
options: TraceEventOptions & { incomplete?: boolean; isError?: boolean },
173+
callback: (
174+
e: EventBuilder,
175+
traceContext: Record<string, string | undefined>,
176+
traceparent?: { traceId: string; spanId: string }
177+
) => Promise<TResult>
178+
): Promise<TResult> {
179+
throw new Error("ClickhouseEventRepository.traceEvent not implemented");
180+
}
181+
182+
// Subscription methods
183+
async subscribeToTrace(traceId: string): Promise<{
184+
unsubscribe: () => Promise<void>;
185+
eventEmitter: EventEmitter;
186+
}> {
187+
throw new Error("ClickhouseEventRepository.subscribeToTrace not implemented");
188+
}
189+
190+
// ID generation methods
191+
generateTraceId(): string {
192+
throw new Error("ClickhouseEventRepository.generateTraceId not implemented");
193+
}
194+
195+
generateSpanId(): string {
196+
throw new Error("ClickhouseEventRepository.generateSpanId not implemented");
197+
}
198+
}
199+
200+
/**
201+
* Factory function to create a ClickhouseEventRepository instance.
202+
* This can be used as an alternative to the PostgreSQL-based EventRepository.
203+
*/
204+
export function createClickhouseEventRepository(
205+
config: ClickhouseEventRepositoryConfig
206+
): IEventRepository {
207+
return new ClickhouseEventRepository(config);
208+
}

apps/webapp/app/v3/eventRepository.types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import type {
1818
import type { RedisWithClusterOptions } from "~/redis.server";
1919
import type { DynamicFlushScheduler } from "./dynamicFlushScheduler.server";
2020
import type { DetailedTraceEvent, TaskEventStoreTable } from "./taskEventStore.server";
21+
export type { ExceptionEventProperties };
2122

2223
// ============================================================================
2324
// Event Creation Types
@@ -261,7 +262,6 @@ export type TraceDetailedSummary = {
261262
export interface IEventRepository {
262263
// Properties
263264
readonly subscriberCount: number;
264-
readonly flushSchedulerStatus: ReturnType<DynamicFlushScheduler<CreatableEvent>["getStatus"]>;
265265

266266
// Event insertion methods
267267
insert(event: CreatableEvent): Promise<void>;

0 commit comments

Comments
 (0)