Skip to content

Commit 51769e5

Browse files
committed
feat: add OpenTelemetry's config
Signed-off-by: chenhuan <xiangyuyu_2024@qq.com>
1 parent fce3458 commit 51769e5

File tree

8 files changed

+438
-3
lines changed

8 files changed

+438
-3
lines changed

opengemini-client-api/src/main/java/io/opengemini/client/api/Point.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import lombok.Setter;
2121

2222
import java.math.BigDecimal;
23+
import java.util.HashMap;
2324
import java.util.Map;
2425

2526
@Getter
@@ -40,6 +41,36 @@ public class Point {
4041
private Map<String, String> tags;
4142
private Map<String, Object> fields;
4243

44+
public Point() {
45+
this.fields = new HashMap<>();
46+
this.tags = new HashMap<>();
47+
}
48+
49+
public Point measurement(String measurement) {
50+
this.measurement = measurement;
51+
return this;
52+
}
53+
54+
public Point addTag(String key, String value) {
55+
if (key != null && value != null) {
56+
this.tags.put(key, value);
57+
}
58+
return this;
59+
}
60+
61+
public Point addField(String key, Object value) {
62+
if (key != null && value != null) {
63+
this.fields.put(key, value);
64+
}
65+
return this;
66+
}
67+
68+
public Point time(long time, Precision precision) {
69+
this.time = time;
70+
this.precision = precision;
71+
return this;
72+
}
73+
4374
/**
4475
* Calculate the line protocol string for this point
4576
*

opengemini-client-api/src/main/java/io/opengemini/client/api/Query.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import lombok.Getter;
2121
import lombok.Setter;
2222

23+
import java.util.HashMap;
24+
import java.util.Map;
25+
2326
@AllArgsConstructor
2427
@Getter
2528
@Setter
@@ -44,6 +47,8 @@ public class Query {
4447
*/
4548
private Precision precision;
4649

50+
private Map<String, Object> attributes = new HashMap<>();
51+
4752
public Query(String command) {
4853
this.command = command;
4954
}
@@ -53,4 +58,19 @@ public Query(String command, String database, String retentionPolicy) {
5358
this.database = database;
5459
this.retentionPolicy = retentionPolicy;
5560
}
61+
62+
public Query(String command, String database, String retentionPolicy, Precision precision) {
63+
this.command = command;
64+
this.database = database;
65+
this.retentionPolicy = retentionPolicy;
66+
this.precision = precision;
67+
}
68+
69+
public void setAttribute(String key, Object value) {
70+
attributes.put(key, value);
71+
}
72+
73+
public Object getAttribute(String key) {
74+
return attributes.get(key);
75+
}
5676
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2024 openGemini Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.opengemini.client.api;
18+
19+
import lombok.Getter;
20+
import lombok.Setter;
21+
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
25+
@Getter
26+
@Setter
27+
public class Write {
28+
private String database;
29+
private String retentionPolicy;
30+
private String measurement;
31+
private String lineProtocol;
32+
private String precision;
33+
34+
private Map<String, Object> attributes = new HashMap<>();
35+
36+
public Write(String database, String retentionPolicy, String measurement,
37+
String lineProtocol, String precision) {
38+
this.database = database;
39+
this.retentionPolicy = retentionPolicy;
40+
this.measurement = measurement;
41+
this.lineProtocol = lineProtocol;
42+
this.precision = precision;
43+
}
44+
45+
public void setAttribute(String key, Object value) {
46+
attributes.put(key, value);
47+
}
48+
49+
public Object getAttribute(String key) {
50+
return attributes.get(key);
51+
}
52+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2024 openGemini Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.opengemini.client.impl;
18+
19+
import io.opentelemetry.api.OpenTelemetry;
20+
import io.opentelemetry.api.common.Attributes;
21+
import io.opentelemetry.api.trace.Tracer;
22+
import io.opentelemetry.exporter.jaeger.JaegerGrpcSpanExporter;
23+
import io.opentelemetry.sdk.OpenTelemetrySdk;
24+
import io.opentelemetry.sdk.resources.Resource;
25+
import io.opentelemetry.sdk.trace.SdkTracerProvider;
26+
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
27+
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
28+
29+
/**
30+
* Configuration for OpenTelemetry tracing.
31+
*/
32+
public class OpenTelemetryConfig {
33+
private static volatile OpenTelemetry openTelemetry;
34+
private static volatile Tracer tracer;
35+
36+
public static synchronized void initialize() {
37+
if (openTelemetry != null) {
38+
return;
39+
}
40+
41+
try {
42+
JaegerGrpcSpanExporter jaegerExporter = JaegerGrpcSpanExporter.builder()
43+
.setEndpoint("http://localhost:14250")
44+
.build();
45+
46+
BatchSpanProcessor spanProcessor = BatchSpanProcessor.builder(jaegerExporter)
47+
.setScheduleDelay(100, java.util.concurrent.TimeUnit.MILLISECONDS)
48+
.build();
49+
50+
SdkTracerProvider tracerProvider = SdkTracerProvider.builder()
51+
.addSpanProcessor(spanProcessor)
52+
.setResource(Resource.create(
53+
Attributes.of(ResourceAttributes.SERVICE_NAME, "opengemini-client-java")
54+
))
55+
.build();
56+
57+
openTelemetry = OpenTelemetrySdk.builder()
58+
.setTracerProvider(tracerProvider)
59+
.build();
60+
61+
tracer = openTelemetry.getTracer("opengemini-client-java");
62+
} catch (Exception e) {
63+
// Fallback to no-op implementation
64+
openTelemetry = OpenTelemetry.noop();
65+
tracer = openTelemetry.getTracer("opengemini-client-java");
66+
}
67+
}
68+
69+
public static Tracer getTracer() {
70+
if (tracer == null) {
71+
initialize();
72+
}
73+
return tracer;
74+
}
75+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2024 openGemini Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.opengemini.client.interceptor;
18+
19+
import io.github.openfacade.http.HttpResponse;
20+
import io.opengemini.client.api.Query;
21+
import io.opengemini.client.api.Write;
22+
23+
import java.util.concurrent.CompletableFuture;
24+
25+
/**
26+
* Interceptor interface for OpenGemini client operations.
27+
* Allows custom logic to be executed before and after query and write operations.
28+
*/
29+
public interface Interceptor {
30+
31+
/**
32+
* Executes before a query operation.
33+
*
34+
* @param query the query to be executed
35+
* @return a CompletableFuture that completes when the interceptor logic is done
36+
*/
37+
CompletableFuture<Void> queryBefore(Query query);
38+
39+
/**
40+
* Executes after a query operation.
41+
*
42+
* @param query the query that was executed
43+
* @param response the HTTP response from the query
44+
* @return a CompletableFuture that completes when the interceptor logic is done
45+
*/
46+
CompletableFuture<Void> queryAfter(Query query, HttpResponse response);
47+
48+
/**
49+
* Executes before a write operation.
50+
*
51+
* @param write the write operation to be executed
52+
* @return a CompletableFuture that completes when the interceptor logic is done
53+
*/
54+
CompletableFuture<Void> writeBefore(Write write);
55+
56+
/**
57+
* Executes after a write operation.
58+
*
59+
* @param write the write operation that was executed
60+
* @param response the HTTP response from the write
61+
* @return a CompletableFuture that completes when the interceptor logic is done
62+
*/
63+
CompletableFuture<Void> writeAfter(Write write, HttpResponse response);
64+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright 2024 openGemini Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.opengemini.client.interceptor;
18+
19+
import io.github.openfacade.http.HttpResponse;
20+
import io.opengemini.client.api.Query;
21+
import io.opengemini.client.api.Write;
22+
import io.opengemini.client.impl.OpenTelemetryConfig;
23+
import io.opentelemetry.api.trace.Span;
24+
import io.opentelemetry.api.trace.SpanKind;
25+
import io.opentelemetry.api.trace.StatusCode;
26+
27+
import java.util.concurrent.CompletableFuture;
28+
29+
public class OtelInterceptor implements Interceptor {
30+
private static final io.opentelemetry.api.trace.Tracer tracer = OpenTelemetryConfig.getTracer();
31+
32+
@Override
33+
public CompletableFuture<Void> queryBefore(Query query) {
34+
return CompletableFuture.runAsync(() -> {
35+
Span querySpan = tracer.spanBuilder("query")
36+
.setSpanKind(SpanKind.CLIENT)
37+
.setAttribute("database", query.getDatabase() != null ? query.getDatabase() : "unknown")
38+
.setAttribute("command", query.getCommand())
39+
.startSpan();
40+
41+
query.setAttribute("querySpan", querySpan);
42+
});
43+
}
44+
45+
@Override
46+
public CompletableFuture<Void> queryAfter(Query query, HttpResponse response) {
47+
return CompletableFuture.runAsync(() -> {
48+
Span querySpan = (Span) query.getAttribute("querySpan");
49+
if (querySpan == null) {
50+
return;
51+
}
52+
53+
int statusCode = response.statusCode();
54+
querySpan.setAttribute("status_code", statusCode);
55+
if (statusCode >= 400) {
56+
String errorBody = response.bodyAsString();
57+
querySpan.setStatus(StatusCode.ERROR, "HTTP error: " + statusCode);
58+
querySpan.setAttribute("error.message", errorBody);
59+
querySpan.setStatus(StatusCode.ERROR, "Query failed: " + errorBody);
60+
} else {
61+
querySpan.setStatus(StatusCode.OK);
62+
}
63+
querySpan.end();
64+
});
65+
}
66+
67+
@Override
68+
public CompletableFuture<Void> writeBefore(Write write) {
69+
return CompletableFuture.runAsync(() -> {
70+
Span writeSpan = tracer.spanBuilder("write")
71+
.setSpanKind(SpanKind.CLIENT)
72+
.setAttribute("database", write.getDatabase())
73+
.setAttribute("retention_policy", write.getRetentionPolicy())
74+
.setAttribute("measurement", write.getMeasurement())
75+
.setAttribute("command", write.getLineProtocol())
76+
.startSpan();
77+
write.setAttribute("writeSpan", writeSpan);
78+
});
79+
}
80+
81+
@Override
82+
public CompletableFuture<Void> writeAfter(Write write, HttpResponse response) {
83+
return CompletableFuture.runAsync(() -> {
84+
Span writeSpan = (Span) write.getAttribute("writeSpan");
85+
if (writeSpan == null) {
86+
return;
87+
}
88+
int statusCode = response.statusCode();
89+
writeSpan.setAttribute("status_code", statusCode);
90+
91+
if (statusCode >= 400) {
92+
String errorBody = response.bodyAsString();
93+
writeSpan.setStatus(StatusCode.ERROR, "HTTP error: " + statusCode);
94+
writeSpan.setAttribute("error.message", errorBody);
95+
writeSpan.setStatus(StatusCode.ERROR, "Write failed: " + errorBody);
96+
} else {
97+
writeSpan.setStatus(StatusCode.OK);
98+
}
99+
writeSpan.end();
100+
});
101+
}
102+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Copyright 2024 openGemini Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.opengemini.client.interceptor;

0 commit comments

Comments
 (0)