Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import lombok.Getter;
import lombok.Setter;

import java.util.HashMap;
import java.util.Map;

@AllArgsConstructor
@Getter
@Setter
Expand All @@ -44,6 +47,8 @@ public class Query {
*/
private Precision precision;

private Map<String, Object> attributes = new HashMap<>();

public Query(String command) {
this.command = command;
}
Expand All @@ -53,4 +58,19 @@ public Query(String command, String database, String retentionPolicy) {
this.database = database;
this.retentionPolicy = retentionPolicy;
}

public Query(String command, String database, String retentionPolicy, Precision precision) {
this.command = command;
this.database = database;
this.retentionPolicy = retentionPolicy;
this.precision = precision;
}

public void setAttribute(String key, Object value) {
attributes.put(key, value);
}

public Object getAttribute(String key) {
return attributes.get(key);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2025 openGemini Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opengemini.client.api;

import lombok.Getter;
import lombok.Setter;

import java.util.HashMap;
import java.util.Map;

@Getter
@Setter
public class Write {
private String database;
private String retentionPolicy;
private String lineProtocol;
private String precision;

private Map<String, Object> attributes = new HashMap<>();

public Write(String database, String retentionPolicy, String lineProtocol, String precision) {
this.database = database;
this.retentionPolicy = retentionPolicy;
this.lineProtocol = lineProtocol;
this.precision = precision;
}

public void setAttribute(String key, Object value) {
attributes.put(key, value);
}

public Object getAttribute(String key) {
return attributes.get(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,38 +32,50 @@
import io.opengemini.client.api.QueryResult;
import io.opengemini.client.api.RetentionPolicy;
import io.opengemini.client.api.RpConfig;
import io.opengemini.client.api.Write;
import io.opengemini.client.common.BaseClient;
import io.opengemini.client.common.CommandFactory;
import io.opengemini.client.common.HeaderConst;
import io.opengemini.client.common.JacksonService;
import io.opengemini.client.common.ResultMapper;
import io.opengemini.client.interceptor.Interceptor;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;

public class OpenGeminiClient extends BaseClient implements OpenGeminiAsyncClient {
private final List<Interceptor> interceptors = new ArrayList<>();
protected final Configuration conf;

private final HttpClient client;

public OpenGeminiClient(@NotNull Configuration conf) {
super(conf);
this.conf = conf;
AuthConfig authConfig = conf.getAuthConfig();
HttpClientConfig httpConfig = conf.getHttpConfig();
if (httpConfig == null) {
httpConfig = new HttpClientConfig.Builder().build();
conf.setHttpConfig(httpConfig);
}
if (authConfig != null && authConfig.getAuthType().equals(AuthType.PASSWORD)) {
httpConfig.addRequestFilter(
new BasicAuthRequestFilter(authConfig.getUsername(), String.valueOf(authConfig.getPassword())));
}
this.client = HttpClientFactory.createHttpClient(httpConfig);
}

public void addInterceptors(Interceptor... interceptors) {
Collections.addAll(this.interceptors, interceptors);
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -195,9 +207,21 @@ public CompletableFuture<Pong> ping() {
*
* @param query the query to execute.
*/
protected CompletableFuture<QueryResult> executeQuery(Query query) {
String queryUrl = getQueryUrl(query);
return get(queryUrl).thenCompose(response -> convertResponse(response, QueryResult.class));
public CompletableFuture<QueryResult> executeQuery(Query query) {
CompletableFuture<Void> beforeFutures = CompletableFuture.allOf(
interceptors.stream()
.map(interceptor -> interceptor.queryBefore(query))
.toArray(CompletableFuture[]::new)
);

return beforeFutures.thenCompose(voidResult -> executeHttpQuery(query).thenCompose(response -> {
CompletableFuture<Void> afterFutures = CompletableFuture.allOf(
interceptors.stream()
.map(interceptor -> interceptor.queryAfter(query, response))
.toArray(CompletableFuture[]::new)
);
return afterFutures.thenCompose(voidResult2 -> convertResponse(response, QueryResult.class));
}));
}

/**
Expand All @@ -217,9 +241,31 @@ protected CompletableFuture<QueryResult> executePostQuery(Query query) {
* @param retentionPolicy the name of the retention policy.
* @param lineProtocol the line protocol string to write.
*/
protected CompletableFuture<Void> executeWrite(String database, String retentionPolicy, String lineProtocol) {
String writeUrl = getWriteUrl(database, retentionPolicy);
return post(writeUrl, lineProtocol).thenCompose(response -> convertResponse(response, Void.class));
public CompletableFuture<Void> executeWrite(String database, String retentionPolicy, String lineProtocol) {
Write write = new Write(
database,
retentionPolicy,
lineProtocol,
"ns"
);

CompletableFuture<Void> beforeFutures = CompletableFuture.allOf(
interceptors.stream()
.map(interceptor -> interceptor.writeBefore(write))
.toArray(CompletableFuture[]::new)
);

return beforeFutures.thenCompose(voidResult ->
executeHttpWrite(write).thenCompose(response -> {
CompletableFuture<Void> afterFutures = CompletableFuture.allOf(
interceptors.stream()
.map(interceptor -> interceptor.writeAfter(write, response))
.toArray(CompletableFuture[]::new)
);
return afterFutures.thenCompose(voidResult2 ->
convertResponse(response, Void.class));
})
);
}

/**
Expand Down Expand Up @@ -258,7 +304,7 @@ private CompletableFuture<HttpResponse> get(String url) {

private CompletableFuture<HttpResponse> post(String url, String body) {
return client.post(buildUriWithPrefix(url), body == null ? new byte[0] : body.getBytes(StandardCharsets.UTF_8),
headers);
headers);
}

@Override
Expand All @@ -270,4 +316,14 @@ public void close() throws IOException {
public String toString() {
return "OpenGeminiClient{" + "httpEngine=" + conf.getHttpConfig().engine() + '}';
}

private CompletableFuture<HttpResponse> executeHttpQuery(Query query) {
String queryUrl = getQueryUrl(query);
return get(queryUrl);
}

private CompletableFuture<HttpResponse> executeHttpWrite(Write write) {
String writeUrl = getWriteUrl(write.getDatabase(), write.getRetentionPolicy());
return post(writeUrl, write.getLineProtocol());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2025 openGemini Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opengemini.client.impl;

import io.opentelemetry.api.trace.Tracer;
import lombok.Getter;
import lombok.Setter;

/**
* Configuration for OpenTelemetry tracing.
*/
@Getter
@Setter
public class OpenTelemetryConfig {
private volatile Tracer tracer;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2025 openGemini Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opengemini.client.interceptor;

import io.github.openfacade.http.HttpResponse;
import io.opengemini.client.api.Query;
import io.opengemini.client.api.Write;

import java.util.concurrent.CompletableFuture;

/**
* Interceptor interface for OpenGemini client operations.
* Allows custom logic to be executed before and after query and write operations.
*/
public interface Interceptor {

/**
* Executes before a query operation.
*
* @param query the query to be executed
* @return a CompletableFuture that completes when the interceptor logic is done
*/
CompletableFuture<Void> queryBefore(Query query);

/**
* Executes after a query operation.
*
* @param query the query that was executed
* @param response the HTTP response from the query
* @return a CompletableFuture that completes when the interceptor logic is done
*/
CompletableFuture<Void> queryAfter(Query query, HttpResponse response);

/**
* Executes before a write operation.
*
* @param write the write operation to be executed
* @return a CompletableFuture that completes when the interceptor logic is done
*/
CompletableFuture<Void> writeBefore(Write write);

/**
* Executes after a write operation.
*
* @param write the write operation that was executed
* @param response the HTTP response from the write
* @return a CompletableFuture that completes when the interceptor logic is done
*/
CompletableFuture<Void> writeAfter(Write write, HttpResponse response);
}
Loading