Skip to content

Commit 8ce14d2

Browse files
committed
feat: integrate OpenTelemetry observability
Signed-off-by: chenhuan <xiangyuyu_2024@qq.com>
1 parent 5dddefb commit 8ce14d2

File tree

5 files changed

+323
-9
lines changed

5 files changed

+323
-9
lines changed

opengemini-client/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,5 +72,17 @@
7272
<version>${vertx.version}</version>
7373
</dependency>
7474
</dependencies>
75+
<build>
76+
<plugins>
77+
<plugin>
78+
<groupId>org.apache.maven.plugins</groupId>
79+
<artifactId>maven-compiler-plugin</artifactId>
80+
<configuration>
81+
<source>9</source>
82+
<target>9</target>
83+
</configuration>
84+
</plugin>
85+
</plugins>
86+
</build>
7587

7688
</project>

opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,38 +32,50 @@
3232
import io.opengemini.client.api.QueryResult;
3333
import io.opengemini.client.api.RetentionPolicy;
3434
import io.opengemini.client.api.RpConfig;
35+
import io.opengemini.client.api.Write;
3536
import io.opengemini.client.common.BaseClient;
3637
import io.opengemini.client.common.CommandFactory;
3738
import io.opengemini.client.common.HeaderConst;
3839
import io.opengemini.client.common.JacksonService;
3940
import io.opengemini.client.common.ResultMapper;
41+
import io.opengemini.client.interceptor.Interceptor;
4042
import org.apache.commons.lang3.StringUtils;
4143
import org.jetbrains.annotations.NotNull;
4244

4345
import java.io.IOException;
4446
import java.nio.charset.StandardCharsets;
47+
import java.util.ArrayList;
48+
import java.util.Collections;
4549
import java.util.List;
4650
import java.util.Optional;
4751
import java.util.StringJoiner;
4852
import java.util.concurrent.CompletableFuture;
4953

5054
public class OpenGeminiClient extends BaseClient implements OpenGeminiAsyncClient {
55+
private final List<Interceptor> interceptors = new ArrayList<>();
5156
protected final Configuration conf;
52-
5357
private final HttpClient client;
5458

5559
public OpenGeminiClient(@NotNull Configuration conf) {
5660
super(conf);
5761
this.conf = conf;
5862
AuthConfig authConfig = conf.getAuthConfig();
5963
HttpClientConfig httpConfig = conf.getHttpConfig();
64+
if (httpConfig == null) {
65+
httpConfig = new HttpClientConfig.Builder().build();
66+
conf.setHttpConfig(httpConfig);
67+
}
6068
if (authConfig != null && authConfig.getAuthType().equals(AuthType.PASSWORD)) {
6169
httpConfig.addRequestFilter(
6270
new BasicAuthRequestFilter(authConfig.getUsername(), String.valueOf(authConfig.getPassword())));
6371
}
6472
this.client = HttpClientFactory.createHttpClient(httpConfig);
6573
}
6674

75+
public void addInterceptors(Interceptor... interceptors) {
76+
Collections.addAll(this.interceptors, interceptors);
77+
}
78+
6779
/**
6880
* {@inheritDoc}
6981
*/
@@ -195,9 +207,21 @@ public CompletableFuture<Pong> ping() {
195207
*
196208
* @param query the query to execute.
197209
*/
198-
protected CompletableFuture<QueryResult> executeQuery(Query query) {
199-
String queryUrl = getQueryUrl(query);
200-
return get(queryUrl).thenCompose(response -> convertResponse(response, QueryResult.class));
210+
public CompletableFuture<QueryResult> executeQuery(Query query) {
211+
CompletableFuture<Void> beforeFutures = CompletableFuture.allOf(
212+
interceptors.stream()
213+
.map(interceptor -> interceptor.queryBefore(query))
214+
.toArray(CompletableFuture[]::new)
215+
);
216+
217+
return beforeFutures.thenCompose(voidResult -> executeHttpQuery(query).thenCompose(response -> {
218+
CompletableFuture<Void> afterFutures = CompletableFuture.allOf(
219+
interceptors.stream()
220+
.map(interceptor -> interceptor.queryAfter(query, response))
221+
.toArray(CompletableFuture[]::new)
222+
);
223+
return afterFutures.thenCompose(voidResult2 -> convertResponse(response, QueryResult.class));
224+
}));
201225
}
202226

203227
/**
@@ -217,9 +241,32 @@ protected CompletableFuture<QueryResult> executePostQuery(Query query) {
217241
* @param retentionPolicy the name of the retention policy.
218242
* @param lineProtocol the line protocol string to write.
219243
*/
220-
protected CompletableFuture<Void> executeWrite(String database, String retentionPolicy, String lineProtocol) {
221-
String writeUrl = getWriteUrl(database, retentionPolicy);
222-
return post(writeUrl, lineProtocol).thenCompose(response -> convertResponse(response, Void.class));
244+
public CompletableFuture<Void> executeWrite(String database, String retentionPolicy, String lineProtocol) {
245+
Write write = new Write(
246+
database,
247+
retentionPolicy,
248+
"default_measurement",
249+
lineProtocol,
250+
"ns"
251+
);
252+
253+
CompletableFuture<Void> beforeFutures = CompletableFuture.allOf(
254+
interceptors.stream()
255+
.map(interceptor -> interceptor.writeBefore(write))
256+
.toArray(CompletableFuture[]::new)
257+
);
258+
259+
return beforeFutures.thenCompose(voidResult ->
260+
executeHttpWrite(write).thenCompose(response -> {
261+
CompletableFuture<Void> afterFutures = CompletableFuture.allOf(
262+
interceptors.stream()
263+
.map(interceptor -> interceptor.writeAfter(write, response))
264+
.toArray(CompletableFuture[]::new)
265+
);
266+
return afterFutures.thenCompose(voidResult2 ->
267+
convertResponse(response, Void.class));
268+
})
269+
);
223270
}
224271

225272
/**
@@ -258,7 +305,7 @@ private CompletableFuture<HttpResponse> get(String url) {
258305

259306
private CompletableFuture<HttpResponse> post(String url, String body) {
260307
return client.post(buildUriWithPrefix(url), body == null ? new byte[0] : body.getBytes(StandardCharsets.UTF_8),
261-
headers);
308+
headers);
262309
}
263310

264311
@Override
@@ -270,4 +317,14 @@ public void close() throws IOException {
270317
public String toString() {
271318
return "OpenGeminiClient{" + "httpEngine=" + conf.getHttpConfig().engine() + '}';
272319
}
320+
321+
private CompletableFuture<HttpResponse> executeHttpQuery(Query query) {
322+
String queryUrl = getQueryUrl(query);
323+
return get(queryUrl);
324+
}
325+
326+
private CompletableFuture<HttpResponse> executeHttpWrite(Write write) {
327+
String writeUrl = getWriteUrl(write.getDatabase(), write.getRetentionPolicy());
328+
return post(writeUrl, write.getLineProtocol());
329+
}
273330
}

opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClientFactory.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@
2121
import io.opengemini.client.api.BatchConfig;
2222
import io.opengemini.client.api.Configuration;
2323
import io.opengemini.client.api.OpenGeminiException;
24+
import io.opengemini.client.interceptor.Interceptor;
25+
import io.opengemini.client.interceptor.OtelInterceptor;
2426
import org.jetbrains.annotations.NotNull;
2527

28+
import java.util.List;
29+
2630
public class OpenGeminiClientFactory {
2731
public static OpenGeminiClient create(@NotNull Configuration configuration) throws OpenGeminiException {
2832
if (configuration.getAddresses() == null || configuration.getAddresses().isEmpty()) {
@@ -54,4 +58,13 @@ public static OpenGeminiClient create(@NotNull Configuration configuration) thro
5458
}
5559
return new OpenGeminiClient(configuration);
5660
}
61+
62+
public static OpenGeminiClient createClientWithInterceptors(Configuration config) throws OpenGeminiException {
63+
OpenGeminiClient client = create(config);
64+
List<Interceptor> interceptors = List.of(
65+
new OtelInterceptor()
66+
);
67+
client.addInterceptors(interceptors.toArray(new Interceptor[0]));
68+
return client;
69+
}
5770
}
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/*
2+
* Copyright 2024 openGemini Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.opengemini.client.interceptor;
18+
19+
import io.github.openfacade.http.HttpClientConfig;
20+
import io.opengemini.client.api.Address;
21+
import io.opengemini.client.api.AuthConfig;
22+
import io.opengemini.client.api.AuthType;
23+
import io.opengemini.client.api.Configuration;
24+
import io.opengemini.client.api.Query;
25+
import io.opengemini.client.api.QueryResult;
26+
import io.opengemini.client.api.Write;
27+
import io.opengemini.client.impl.OpenGeminiClient;
28+
import io.opengemini.client.impl.OpenGeminiClientFactory;
29+
import io.opengemini.client.impl.OpenTelemetryConfig;
30+
import lombok.Getter;
31+
import lombok.Setter;
32+
import org.junit.jupiter.api.Assertions;
33+
import org.junit.jupiter.api.BeforeAll;
34+
import org.junit.jupiter.api.BeforeEach;
35+
import org.junit.jupiter.api.Test;
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
38+
39+
import java.time.Duration;
40+
import java.util.Collections;
41+
import java.util.List;
42+
import java.util.concurrent.CompletableFuture;
43+
import java.util.concurrent.ExecutionException;
44+
import java.util.concurrent.TimeUnit;
45+
46+
/**
47+
* Example demonstrating OpenGemini client usage with interceptors.
48+
*/
49+
50+
@Getter
51+
public class TracingIntegrationTest {
52+
@Setter
53+
private String database;
54+
@Setter
55+
private String retentionPolicy;
56+
@Setter
57+
private String lineProtocol;
58+
private static final Logger LOG = LoggerFactory.getLogger(TracingIntegrationTest.class);
59+
60+
private OpenGeminiClient openGeminiClient;
61+
62+
@BeforeEach
63+
void setUp() {
64+
HttpClientConfig httpConfig = new HttpClientConfig.Builder()
65+
.connectTimeout(Duration.ofSeconds(3))
66+
.timeout(Duration.ofSeconds(3))
67+
.build();
68+
Configuration configuration = Configuration.builder()
69+
.addresses(Collections.singletonList(new Address("127.0.0.1", 8086)))
70+
.httpConfig(httpConfig)
71+
.authConfig(new AuthConfig(AuthType.PASSWORD, "test", "testPwd123@".toCharArray(), null))
72+
.gzipEnabled(false)
73+
.build();
74+
this.openGeminiClient = new OpenGeminiClient(configuration);
75+
}
76+
77+
@Test
78+
void testClientCreation() {
79+
Configuration config = new Configuration();
80+
config.setAddresses(List.of(new Address("localhost", 8086)));
81+
if (config.getHttpConfig() == null) {
82+
config.setHttpConfig(new HttpClientConfig.Builder().build());
83+
}
84+
85+
Assertions.assertDoesNotThrow(() -> {
86+
OpenGeminiClient client = OpenGeminiClientFactory.createClientWithInterceptors(config);
87+
Assertions.assertNotNull(client, "OpenGeminiClient should be created successfully");
88+
client.close();
89+
}, "Client creation should not throw an exception");
90+
}
91+
92+
@Test
93+
void testDatabaseCreation() {
94+
Configuration config = new Configuration();
95+
config.setAddresses(List.of(new Address("localhost", 8086)));
96+
if (config.getHttpConfig() == null) {
97+
config.setHttpConfig(new HttpClientConfig.Builder().build());
98+
}
99+
100+
Assertions.assertDoesNotThrow(() -> {
101+
try (OpenGeminiClient client = OpenGeminiClientFactory.createClientWithInterceptors(config)) {
102+
Query createDbQuery = new Query("CREATE DATABASE test_db");
103+
client.query(createDbQuery).get(10, TimeUnit.SECONDS);
104+
}
105+
}, "Database creation should not throw an exception");
106+
}
107+
108+
@Test
109+
void testQueryOperation() {
110+
Configuration config = new Configuration();
111+
config.setAddresses(java.util.Collections.singletonList(new Address("localhost", 8086)));
112+
if (config.getHttpConfig() == null) {
113+
config.setHttpConfig(new HttpClientConfig.Builder().build());
114+
}
115+
116+
Assertions.assertDoesNotThrow(() -> {
117+
try (OpenGeminiClient client = OpenGeminiClientFactory.createClientWithInterceptors(config)) {
118+
Query createDbQuery = new Query("CREATE DATABASE test_db");
119+
client.query(createDbQuery).get(10, TimeUnit.SECONDS);
120+
121+
Query showDbQuery = new Query("SHOW DATABASES");
122+
QueryResult result = client.query(showDbQuery).get(10, TimeUnit.SECONDS);
123+
Assertions.assertNotNull(result, "Query result should not be null");
124+
}
125+
}, "Query operation should not throw an exception");
126+
}
127+
128+
@BeforeAll
129+
static void initializeTracing() {
130+
OpenTelemetryConfig.initialize();
131+
}
132+
133+
@Test
134+
void testWriteOperation() {
135+
Configuration config = new Configuration();
136+
config.setAddresses(java.util.Collections.singletonList(
137+
new Address("localhost", 8086)));
138+
139+
if (config.getHttpConfig() == null) {
140+
config.setHttpConfig(new HttpClientConfig.Builder().build());
141+
}
142+
143+
Assertions.assertDoesNotThrow(() -> {
144+
try (OpenGeminiClient client = OpenGeminiClientFactory.createClientWithInterceptors(config)) {
145+
Query createDbQuery = new Query("CREATE DATABASE test_db");
146+
client.query(createDbQuery).get(10, TimeUnit.SECONDS);
147+
148+
Thread.sleep(1000);
149+
150+
Write write = new Write(
151+
"test_db",
152+
"autogen",
153+
"temperature",
154+
"temperature,location=room1 value=25.5 " + System.currentTimeMillis(),
155+
"ns"
156+
);
157+
158+
client.executeWrite(
159+
write.getDatabase(),
160+
write.getRetentionPolicy(),
161+
write.getLineProtocol()
162+
).get(10, TimeUnit.SECONDS);
163+
}
164+
}, "Write operation should not throw an exception");
165+
}
166+
167+
@Test
168+
void testTracingIntegration() throws ExecutionException, InterruptedException {
169+
String databaseTestName = "tracing_test_db";
170+
CompletableFuture<Void> createdb = openGeminiClient.createDatabase(databaseTestName);
171+
createdb.get();
172+
173+
Assertions.assertDoesNotThrow(() -> {
174+
175+
Write write = new Write(
176+
"tracing_test_db",
177+
"autogen",
178+
"tracing_measurement",
179+
"tracing_measurement,tag=test value=8 " + System.currentTimeMillis(),
180+
"ns"
181+
);
182+
183+
openGeminiClient.executeWrite(
184+
write.getDatabase(),
185+
write.getRetentionPolicy(),
186+
write.getLineProtocol()
187+
).get(10, TimeUnit.SECONDS);
188+
189+
Query query = new Query("SELECT * FROM tracing_measurement");
190+
openGeminiClient.query(query).get(10, TimeUnit.SECONDS);
191+
192+
}, "Tracing integration should not throw an exception");
193+
}
194+
}

0 commit comments

Comments
 (0)