3232import io .opengemini .client .api .QueryResult ;
3333import io .opengemini .client .api .RetentionPolicy ;
3434import io .opengemini .client .api .RpConfig ;
35+ import io .opengemini .client .api .Write ;
3536import io .opengemini .client .common .BaseClient ;
3637import io .opengemini .client .common .CommandFactory ;
3738import io .opengemini .client .common .HeaderConst ;
3839import io .opengemini .client .common .JacksonService ;
3940import io .opengemini .client .common .ResultMapper ;
41+ import io .opengemini .client .interceptor .Interceptor ;
4042import org .apache .commons .lang3 .StringUtils ;
4143import org .jetbrains .annotations .NotNull ;
4244
4345import java .io .IOException ;
4446import java .nio .charset .StandardCharsets ;
47+ import java .util .ArrayList ;
48+ import java .util .Collections ;
4549import java .util .List ;
4650import java .util .Optional ;
4751import java .util .StringJoiner ;
4852import java .util .concurrent .CompletableFuture ;
4953
5054public class OpenGeminiClient extends BaseClient implements OpenGeminiAsyncClient {
55+ private final List <Interceptor > interceptors = new ArrayList <>();
5156 protected final Configuration conf ;
52-
5357 private final HttpClient client ;
5458
5559 public OpenGeminiClient (@ NotNull Configuration conf ) {
5660 super (conf );
5761 this .conf = conf ;
5862 AuthConfig authConfig = conf .getAuthConfig ();
5963 HttpClientConfig httpConfig = conf .getHttpConfig ();
64+ if (httpConfig == null ) {
65+ httpConfig = new HttpClientConfig .Builder ().build ();
66+ conf .setHttpConfig (httpConfig );
67+ }
6068 if (authConfig != null && authConfig .getAuthType ().equals (AuthType .PASSWORD )) {
6169 httpConfig .addRequestFilter (
6270 new BasicAuthRequestFilter (authConfig .getUsername (), String .valueOf (authConfig .getPassword ())));
6371 }
6472 this .client = HttpClientFactory .createHttpClient (httpConfig );
6573 }
6674
75+ public void addInterceptors (Interceptor ... interceptors ) {
76+ Collections .addAll (this .interceptors , interceptors );
77+ }
78+
6779 /**
6880 * {@inheritDoc}
6981 */
@@ -195,9 +207,21 @@ public CompletableFuture<Pong> ping() {
195207 *
196208 * @param query the query to execute.
197209 */
198- protected CompletableFuture <QueryResult > executeQuery (Query query ) {
199- String queryUrl = getQueryUrl (query );
200- return get (queryUrl ).thenCompose (response -> convertResponse (response , QueryResult .class ));
210+ public CompletableFuture <QueryResult > executeQuery (Query query ) {
211+ CompletableFuture <Void > beforeFutures = CompletableFuture .allOf (
212+ interceptors .stream ()
213+ .map (interceptor -> interceptor .queryBefore (query ))
214+ .toArray (CompletableFuture []::new )
215+ );
216+
217+ return beforeFutures .thenCompose (voidResult -> executeHttpQuery (query ).thenCompose (response -> {
218+ CompletableFuture <Void > afterFutures = CompletableFuture .allOf (
219+ interceptors .stream ()
220+ .map (interceptor -> interceptor .queryAfter (query , response ))
221+ .toArray (CompletableFuture []::new )
222+ );
223+ return afterFutures .thenCompose (voidResult2 -> convertResponse (response , QueryResult .class ));
224+ }));
201225 }
202226
203227 /**
@@ -217,9 +241,31 @@ protected CompletableFuture<QueryResult> executePostQuery(Query query) {
217241 * @param retentionPolicy the name of the retention policy.
218242 * @param lineProtocol the line protocol string to write.
219243 */
220- protected CompletableFuture <Void > executeWrite (String database , String retentionPolicy , String lineProtocol ) {
221- String writeUrl = getWriteUrl (database , retentionPolicy );
222- return post (writeUrl , lineProtocol ).thenCompose (response -> convertResponse (response , Void .class ));
244+ public CompletableFuture <Void > executeWrite (String database , String retentionPolicy , String lineProtocol ) {
245+ Write write = new Write (
246+ database ,
247+ retentionPolicy ,
248+ lineProtocol ,
249+ "ns"
250+ );
251+
252+ CompletableFuture <Void > beforeFutures = CompletableFuture .allOf (
253+ interceptors .stream ()
254+ .map (interceptor -> interceptor .writeBefore (write ))
255+ .toArray (CompletableFuture []::new )
256+ );
257+
258+ return beforeFutures .thenCompose (voidResult ->
259+ executeHttpWrite (write ).thenCompose (response -> {
260+ CompletableFuture <Void > afterFutures = CompletableFuture .allOf (
261+ interceptors .stream ()
262+ .map (interceptor -> interceptor .writeAfter (write , response ))
263+ .toArray (CompletableFuture []::new )
264+ );
265+ return afterFutures .thenCompose (voidResult2 ->
266+ convertResponse (response , Void .class ));
267+ })
268+ );
223269 }
224270
225271 /**
@@ -258,7 +304,7 @@ private CompletableFuture<HttpResponse> get(String url) {
258304
259305 private CompletableFuture <HttpResponse > post (String url , String body ) {
260306 return client .post (buildUriWithPrefix (url ), body == null ? new byte [0 ] : body .getBytes (StandardCharsets .UTF_8 ),
261- headers );
307+ headers );
262308 }
263309
264310 @ Override
@@ -270,4 +316,14 @@ public void close() throws IOException {
270316 public String toString () {
271317 return "OpenGeminiClient{" + "httpEngine=" + conf .getHttpConfig ().engine () + '}' ;
272318 }
319+
320+ private CompletableFuture <HttpResponse > executeHttpQuery (Query query ) {
321+ String queryUrl = getQueryUrl (query );
322+ return get (queryUrl );
323+ }
324+
325+ private CompletableFuture <HttpResponse > executeHttpWrite (Write write ) {
326+ String writeUrl = getWriteUrl (write .getDatabase (), write .getRetentionPolicy ());
327+ return post (writeUrl , write .getLineProtocol ());
328+ }
273329}
0 commit comments