diff --git a/opengemini-client-api/src/main/java/io/opengemini/client/api/Query.java b/opengemini-client-api/src/main/java/io/opengemini/client/api/Query.java index d6beb138..c30efb75 100644 --- a/opengemini-client-api/src/main/java/io/opengemini/client/api/Query.java +++ b/opengemini-client-api/src/main/java/io/opengemini/client/api/Query.java @@ -20,6 +20,9 @@ import lombok.Getter; import lombok.Setter; +import java.util.HashMap; +import java.util.Map; + @AllArgsConstructor @Getter @Setter @@ -44,6 +47,8 @@ public class Query { */ private Precision precision; + private Map attributes = new HashMap<>(); + public Query(String command) { this.command = command; } @@ -53,4 +58,19 @@ public Query(String command, String database, String retentionPolicy) { this.database = database; this.retentionPolicy = retentionPolicy; } + + public Query(String command, String database, String retentionPolicy, Precision precision) { + this.command = command; + this.database = database; + this.retentionPolicy = retentionPolicy; + this.precision = precision; + } + + public void setAttribute(String key, Object value) { + attributes.put(key, value); + } + + public Object getAttribute(String key) { + return attributes.get(key); + } } diff --git a/opengemini-client-api/src/main/java/io/opengemini/client/api/Write.java b/opengemini-client-api/src/main/java/io/opengemini/client/api/Write.java new file mode 100644 index 00000000..3acee234 --- /dev/null +++ b/opengemini-client-api/src/main/java/io/opengemini/client/api/Write.java @@ -0,0 +1,49 @@ +/* + * Copyright 2025 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.api; + +import lombok.Getter; +import lombok.Setter; + +import java.util.HashMap; +import java.util.Map; + +@Getter +@Setter +public class Write { + private String database; + private String retentionPolicy; + private String lineProtocol; + private String precision; + + private Map attributes = new HashMap<>(); + + public Write(String database, String retentionPolicy, String lineProtocol, String precision) { + this.database = database; + this.retentionPolicy = retentionPolicy; + this.lineProtocol = lineProtocol; + this.precision = precision; + } + + public void setAttribute(String key, Object value) { + attributes.put(key, value); + } + + public Object getAttribute(String key) { + return attributes.get(key); + } +} diff --git a/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java b/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java index 7621b907..b3bc786b 100644 --- a/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java +++ b/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java @@ -32,24 +32,28 @@ import io.opengemini.client.api.QueryResult; import io.opengemini.client.api.RetentionPolicy; import io.opengemini.client.api.RpConfig; +import io.opengemini.client.api.Write; import io.opengemini.client.common.BaseClient; import io.opengemini.client.common.CommandFactory; import io.opengemini.client.common.HeaderConst; import io.opengemini.client.common.JacksonService; import io.opengemini.client.common.ResultMapper; +import io.opengemini.client.interceptor.Interceptor; import org.apache.commons.lang3.StringUtils; import org.jetbrains.annotations.NotNull; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.StringJoiner; import java.util.concurrent.CompletableFuture; public class OpenGeminiClient extends BaseClient implements OpenGeminiAsyncClient { + private final List interceptors = new ArrayList<>(); protected final Configuration conf; - private final HttpClient client; public OpenGeminiClient(@NotNull Configuration conf) { @@ -57,6 +61,10 @@ public OpenGeminiClient(@NotNull Configuration conf) { this.conf = conf; AuthConfig authConfig = conf.getAuthConfig(); HttpClientConfig httpConfig = conf.getHttpConfig(); + if (httpConfig == null) { + httpConfig = new HttpClientConfig.Builder().build(); + conf.setHttpConfig(httpConfig); + } if (authConfig != null && authConfig.getAuthType().equals(AuthType.PASSWORD)) { httpConfig.addRequestFilter( new BasicAuthRequestFilter(authConfig.getUsername(), String.valueOf(authConfig.getPassword()))); @@ -64,6 +72,10 @@ public OpenGeminiClient(@NotNull Configuration conf) { this.client = HttpClientFactory.createHttpClient(httpConfig); } + public void addInterceptors(Interceptor... interceptors) { + Collections.addAll(this.interceptors, interceptors); + } + /** * {@inheritDoc} */ @@ -195,9 +207,21 @@ public CompletableFuture ping() { * * @param query the query to execute. */ - protected CompletableFuture executeQuery(Query query) { - String queryUrl = getQueryUrl(query); - return get(queryUrl).thenCompose(response -> convertResponse(response, QueryResult.class)); + public CompletableFuture executeQuery(Query query) { + CompletableFuture beforeFutures = CompletableFuture.allOf( + interceptors.stream() + .map(interceptor -> interceptor.queryBefore(query)) + .toArray(CompletableFuture[]::new) + ); + + return beforeFutures.thenCompose(voidResult -> executeHttpQuery(query).thenCompose(response -> { + CompletableFuture afterFutures = CompletableFuture.allOf( + interceptors.stream() + .map(interceptor -> interceptor.queryAfter(query, response)) + .toArray(CompletableFuture[]::new) + ); + return afterFutures.thenCompose(voidResult2 -> convertResponse(response, QueryResult.class)); + })); } /** @@ -217,9 +241,31 @@ protected CompletableFuture executePostQuery(Query query) { * @param retentionPolicy the name of the retention policy. * @param lineProtocol the line protocol string to write. */ - protected CompletableFuture executeWrite(String database, String retentionPolicy, String lineProtocol) { - String writeUrl = getWriteUrl(database, retentionPolicy); - return post(writeUrl, lineProtocol).thenCompose(response -> convertResponse(response, Void.class)); + public CompletableFuture executeWrite(String database, String retentionPolicy, String lineProtocol) { + Write write = new Write( + database, + retentionPolicy, + lineProtocol, + "ns" + ); + + CompletableFuture beforeFutures = CompletableFuture.allOf( + interceptors.stream() + .map(interceptor -> interceptor.writeBefore(write)) + .toArray(CompletableFuture[]::new) + ); + + return beforeFutures.thenCompose(voidResult -> + executeHttpWrite(write).thenCompose(response -> { + CompletableFuture afterFutures = CompletableFuture.allOf( + interceptors.stream() + .map(interceptor -> interceptor.writeAfter(write, response)) + .toArray(CompletableFuture[]::new) + ); + return afterFutures.thenCompose(voidResult2 -> + convertResponse(response, Void.class)); + }) + ); } /** @@ -258,7 +304,7 @@ private CompletableFuture get(String url) { private CompletableFuture post(String url, String body) { return client.post(buildUriWithPrefix(url), body == null ? new byte[0] : body.getBytes(StandardCharsets.UTF_8), - headers); + headers); } @Override @@ -270,4 +316,14 @@ public void close() throws IOException { public String toString() { return "OpenGeminiClient{" + "httpEngine=" + conf.getHttpConfig().engine() + '}'; } + + private CompletableFuture executeHttpQuery(Query query) { + String queryUrl = getQueryUrl(query); + return get(queryUrl); + } + + private CompletableFuture executeHttpWrite(Write write) { + String writeUrl = getWriteUrl(write.getDatabase(), write.getRetentionPolicy()); + return post(writeUrl, write.getLineProtocol()); + } } diff --git a/opengemini-client/src/main/java/io/opengemini/client/impl/OpenTelemetryConfig.java b/opengemini-client/src/main/java/io/opengemini/client/impl/OpenTelemetryConfig.java new file mode 100644 index 00000000..2aa5ce52 --- /dev/null +++ b/opengemini-client/src/main/java/io/opengemini/client/impl/OpenTelemetryConfig.java @@ -0,0 +1,30 @@ +/* + * Copyright 2025 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.impl; + +import io.opentelemetry.api.trace.Tracer; +import lombok.Getter; +import lombok.Setter; + +/** + * Configuration for OpenTelemetry tracing. + */ +@Getter +@Setter +public class OpenTelemetryConfig { + private volatile Tracer tracer; +} diff --git a/opengemini-client/src/main/java/io/opengemini/client/interceptor/Interceptor.java b/opengemini-client/src/main/java/io/opengemini/client/interceptor/Interceptor.java new file mode 100644 index 00000000..8584cd2f --- /dev/null +++ b/opengemini-client/src/main/java/io/opengemini/client/interceptor/Interceptor.java @@ -0,0 +1,64 @@ +/* + * Copyright 2025 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.interceptor; + +import io.github.openfacade.http.HttpResponse; +import io.opengemini.client.api.Query; +import io.opengemini.client.api.Write; + +import java.util.concurrent.CompletableFuture; + +/** + * Interceptor interface for OpenGemini client operations. + * Allows custom logic to be executed before and after query and write operations. + */ +public interface Interceptor { + + /** + * Executes before a query operation. + * + * @param query the query to be executed + * @return a CompletableFuture that completes when the interceptor logic is done + */ + CompletableFuture queryBefore(Query query); + + /** + * Executes after a query operation. + * + * @param query the query that was executed + * @param response the HTTP response from the query + * @return a CompletableFuture that completes when the interceptor logic is done + */ + CompletableFuture queryAfter(Query query, HttpResponse response); + + /** + * Executes before a write operation. + * + * @param write the write operation to be executed + * @return a CompletableFuture that completes when the interceptor logic is done + */ + CompletableFuture writeBefore(Write write); + + /** + * Executes after a write operation. + * + * @param write the write operation that was executed + * @param response the HTTP response from the write + * @return a CompletableFuture that completes when the interceptor logic is done + */ + CompletableFuture writeAfter(Write write, HttpResponse response); +} diff --git a/opengemini-client/src/main/java/io/opengemini/client/interceptor/OtelInterceptor.java b/opengemini-client/src/main/java/io/opengemini/client/interceptor/OtelInterceptor.java new file mode 100644 index 00000000..46acf55b --- /dev/null +++ b/opengemini-client/src/main/java/io/opengemini/client/interceptor/OtelInterceptor.java @@ -0,0 +1,102 @@ +/* + * Copyright 2025 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.interceptor; + +import io.github.openfacade.http.HttpResponse; +import io.opengemini.client.api.Query; +import io.opengemini.client.api.Write; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import lombok.Getter; +import lombok.Setter; + +import java.util.concurrent.CompletableFuture; + +@Getter +@Setter +public class OtelInterceptor implements Interceptor { + private io.opentelemetry.api.trace.Tracer tracer; + + @Override + public CompletableFuture queryBefore(Query query) { + return CompletableFuture.runAsync(() -> { + Span querySpan = tracer.spanBuilder("query") + .setSpanKind(SpanKind.CLIENT) + .setAttribute("database", query.getDatabase() != null ? query.getDatabase() : "unknown") + .setAttribute("command", query.getCommand()) + .startSpan(); + + query.setAttribute("querySpan", querySpan); + }); + } + + @Override + public CompletableFuture queryAfter(Query query, HttpResponse response) { + return CompletableFuture.runAsync(() -> { + Span querySpan = (Span) query.getAttribute("querySpan"); + if (querySpan == null) { + return; + } + + int statusCode = response.statusCode(); + querySpan.setAttribute("status_code", statusCode); + if (statusCode >= 400) { + String errorBody = response.bodyAsString(); + querySpan.setStatus(StatusCode.ERROR, "HTTP error: " + statusCode); + querySpan.setAttribute("error.message", errorBody); + } else { + querySpan.setStatus(StatusCode.OK); + } + querySpan.end(); + }); + } + + @Override + public CompletableFuture writeBefore(Write write) { + return CompletableFuture.runAsync(() -> { + Span writeSpan = tracer.spanBuilder("write") + .setSpanKind(SpanKind.CLIENT) + .setAttribute("database", write.getDatabase()) + .setAttribute("retention_policy", write.getRetentionPolicy()) + .setAttribute("command", write.getLineProtocol()) + .startSpan(); + write.setAttribute("writeSpan", writeSpan); + }); + } + + @Override + public CompletableFuture writeAfter(Write write, HttpResponse response) { + return CompletableFuture.runAsync(() -> { + Span writeSpan = (Span) write.getAttribute("writeSpan"); + if (writeSpan == null) { + return; + } + int statusCode = response.statusCode(); + writeSpan.setAttribute("status_code", statusCode); + + if (statusCode >= 400) { + String errorBody = response.bodyAsString(); + writeSpan.setStatus(StatusCode.ERROR, "HTTP error: " + statusCode); + writeSpan.setAttribute("error.message", errorBody); + } else { + writeSpan.setStatus(StatusCode.OK); + } + writeSpan.end(); + }); + } +} diff --git a/opengemini-client/src/main/java/io/opengemini/client/interceptor/package-info.java b/opengemini-client/src/main/java/io/opengemini/client/interceptor/package-info.java new file mode 100644 index 00000000..7ca25f7f --- /dev/null +++ b/opengemini-client/src/main/java/io/opengemini/client/interceptor/package-info.java @@ -0,0 +1,17 @@ +/* + * Copyright 2025 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.interceptor; diff --git a/pom.xml b/pom.xml index 70c3dca8..81b009f3 100644 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,9 @@ 2.1.0 5.13.0 4.12.0 + 1.32.0 + 1.28.0-alpha + 1.48.0 10.18.1 3.25.1 3.25.1 @@ -101,6 +104,11 @@ jackson-databind ${jackson.version} + + com.fasterxml.jackson.core + jackson-core + ${jackson.version} + io.grpc grpc-bom @@ -112,6 +120,26 @@ + + io.opentelemetry + opentelemetry-api + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-sdk + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-exporter-jaeger + ${opentelemetry-exporter.version} + + + io.opentelemetry + opentelemetry-semconv + ${opentelemetry-semconv.version} + org.projectlombok lombok