Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.influxdb.v3.netty.rest;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.Promise;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {

private final SslContext sslCtx;

private final Promise<FullHttpResponse> promise;

private final String host;

private final Integer port;

public ClientChannelInitializer(@Nonnull String host, @Nonnull Integer port, @Nonnull Promise<FullHttpResponse> promise, @Nullable SslContext sslCtx) {
this.sslCtx = sslCtx;
this.promise = promise;
this.host = host;
this.port = port;
}

@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast("ssl", sslCtx.newHandler(ch.alloc(), host, port));
}
p.addLast(new HttpClientCodec());
p.addLast(new HttpObjectAggregator(1048576));
p.addLast(new ClientHandler(this.promise));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.influxdb.v3.netty.rest;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Promise;

public class ClientHandler extends SimpleChannelInboundHandler<FullHttpResponse> {

private final Promise<FullHttpResponse> promise;

public ClientHandler(Promise<FullHttpResponse> promise) {
this.promise = promise;
}

@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) {
// if (msg instanceof HttpResponse) {
// HttpResponse response = (HttpResponse) msg;
// System.err.println("{ START CONTENT");
// }
// if (msg instanceof HttpContent) {
// HttpContent content = (HttpContent) msg;
// System.err.print(content.content().toString(CharsetUtil.UTF_8));
// System.err.flush();
//
// if (content instanceof LastHttpContent) {
// System.err.println("} END OF CONTENT");
// }
// }
// System.out.println(msg);
this.promise.trySuccess(msg.retain());

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
this.promise.tryFailure(cause);
cause.printStackTrace();
ctx.close();
}

}
69 changes: 69 additions & 0 deletions examples/src/main/java/com/influxdb/v3/netty/rest/Main.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.influxdb.v3.netty.rest;

import com.influxdb.v3.client.InfluxDBClient;
import com.influxdb.v3.client.Point;
import com.influxdb.v3.client.config.ClientConfig;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpMethod;

import javax.net.ssl.SSLException;
import java.net.URISyntaxException;
import java.util.UUID;
import java.util.concurrent.ExecutionException;


public class Main {
public static void main(String[] args) throws URISyntaxException, SSLException, ExecutionException, InterruptedException {
ClientConfig clientConfig = configCloud();

var testId = UUID.randomUUID().toString();
// Temporarily use `ClientConfig` as a constructor argument.
try (RestClient restClient = new RestClient(clientConfig)) {

// Get server version.
System.out.println("Server version: " + restClient.getServerVersion());

// Write data
System.out.println("Write data with testId " + testId);
var p = Point.measurement("cpu_sonnh")
.setTag("host", "server1")
.setField("usage_idle", 90.0f)
.setField("testId", testId);
var lineProtocol = p.toLineProtocol();
restClient.write(lineProtocol);

// Read data
System.out.println("Read data with testId " + testId);
String query = String.format("SELECT * FROM \"cpu_sonnh\" WHERE \"testId\" = '%s'", testId);
InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig);
var stream = influxDBClient.queryPoints(query);
stream.findFirst().ifPresent(pointValues -> System.out.println("Field usage_idle: " + pointValues.getField("usage_idle")));

} catch (Exception e) {
throw new RuntimeException(e);
}
}

public static ClientConfig configCloud() {
String url = System.getenv("TESTING_INFLUXDB_URL");
String token = System.getenv("TESTING_INFLUXDB_TOKEN");
String database = System.getenv("TESTING_INFLUXDB_DATABASE");
return new ClientConfig.Builder()
.host(url)
.token(token.toCharArray())
.database(database)
.build();
}

// This is a docker container ran from scripts/influxdb-setup.sh, get the token and database, url from that script
public static ClientConfig configLocal() {
String url = "localhost";
String token = "apiv3_sMYBS-vRxl6UDMylb7m2u64G6R7g61jlGL76XnUJY3EaN4MD0tZd4DZOBhe6j-dYtoVhrC6PqGgI9Xiv8d3Psw";
String database = System.getenv("bucket0");
return new ClientConfig.Builder()
.host(url)
.token(token.toCharArray())
.database(database)
.build();
}
}
221 changes: 221 additions & 0 deletions examples/src/main/java/com/influxdb/v3/netty/rest/RestClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
package com.influxdb.v3.netty.rest;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.influxdb.v3.client.InfluxDBApiHttpException;
import com.influxdb.v3.client.config.ClientConfig;
import com.influxdb.v3.client.internal.Identity;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

public final class RestClient implements AutoCloseable {

private Channel channel;

private final EventLoopGroup eventLoopGroup;

private final Promise<FullHttpResponse> promise;

private final Map<String, String> defaultHeader = new HashMap<>();

private final Integer port;

private final String host;

private SslContext sslCtx;

private final ObjectMapper objectMapper = new ObjectMapper();

private final ClientConfig config;

private final Map<String, String> defaultHeaders;

final String userAgent;

final String baseUrl;

private static final Logger LOG = LoggerFactory.getLogger(com.influxdb.v3.netty.rest.RestClient.class);

public RestClient(ClientConfig config) throws URISyntaxException, SSLException {
URI uri = new URI(config.getHost());
String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
int port = uri.getPort();
if (port == -1) {
if ("http".equalsIgnoreCase(scheme)) {
port = 80;
} else if ("https".equalsIgnoreCase(scheme)) {
port = 443;
}
}
this.port = port;
this.host = uri.getHost();

this.baseUrl = host.endsWith("/") ? host : String.format("%s/", host);

// user agent version
this.userAgent = Identity.getUserAgent();

this.config = config;

if ("https".equalsIgnoreCase(scheme)) {
this.sslCtx = SslContextBuilder.forClient().build();
}

this.eventLoopGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());

this.promise = this.eventLoopGroup.next().newPromise();

this.defaultHeaders = config.getHeaders() != null ? Map.copyOf(config.getHeaders()) : null;

this.channel = createChannel();
}

public Channel createChannel() {
//fixme handler follow-redirect
int timeoutMillis = (int) this.config.getWriteTimeout().toMillis();
Bootstrap b = new Bootstrap();
return b.group(this.eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
.handler(new ClientChannelInitializer(this.host, this.port, this.promise, this.sslCtx))
.remoteAddress(this.host, this.port)
.connect()
.syncUninterruptibly()
.channel();
}

public String getServerVersion() throws InterruptedException, ExecutionException, JsonProcessingException {
FullHttpResponse response = this.request(HttpMethod.GET, "/ping", null, null);

String version = response.headers().get("x-influxdb-version");
if (version == null) {
return "unknown";
}
//fixme get version from the body
return version;
}

public void write(String lineProtocol) throws InterruptedException, ExecutionException, JsonProcessingException {
var header = new HashMap<String, String>();
header.put("content-type", "text/plain; charset=utf-8");

header.put("content-length", String.valueOf(lineProtocol.getBytes(StandardCharsets.UTF_8).length));
header.putAll(this.defaultHeader);

QueryStringEncoder encoder = new QueryStringEncoder("/api/v2/write");
encoder.addParam("bucket", "bucket0");
encoder.addParam("precision", "ns");

this.request(HttpMethod.POST, encoder.toString(), header, lineProtocol);
}

public FullHttpResponse request(@Nonnull HttpMethod method, @Nonnull String path, @Nullable Map<String, String> header, @Nullable String body) throws InterruptedException, ExecutionException, JsonProcessingException {
var content = Unpooled.EMPTY_BUFFER;
if (body != null) {
content = Unpooled.copiedBuffer(body, CharsetUtil.UTF_8);
}
HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, path, content);

if (this.defaultHeaders != null) {
this.defaultHeaders.forEach((s, s2) -> request.headers().set(s, s2));
}

request.headers().set("user-agent", this.userAgent);
request.headers().set("host", this.host);
if (this.config.getToken() != null && this.config.getToken().length > 0) {
String authScheme = config.getAuthScheme();
if (authScheme == null) {
authScheme = "Token";
}
request.headers().set("authorization", String.format("%s %s", authScheme, new String(this.config.getToken())));
}
request.headers().set("connection", "closed");

if (header != null) {
header.forEach(request.headers()::set);
}


if (!this.channel.isOpen()) {
this.channel = createChannel();
}

this.channel.writeAndFlush(request).channel().closeFuture().sync();
FullHttpResponse fullHttpResponse = this.promise.get();

int statusCode = fullHttpResponse.status().code();
if (statusCode < 200 || statusCode >= 300) {
String reason = "";
var jsonString = fullHttpResponse.content().toString(CharsetUtil.UTF_8);
if (!jsonString.isEmpty()) {
try {
final JsonNode root = objectMapper.readTree(jsonString);
final List<String> possibilities = List.of("message", "error_message", "error");
for (final String field : possibilities) {
final JsonNode node = root.findValue(field);
if (node != null) {
reason = node.asText();
break;
}
}
} catch (JsonProcessingException e) {
LOG.debug("Can't parse msg from response {}", fullHttpResponse);
}
}

if (reason.isEmpty()) {
for (String s : List.of("X-Platform-Error-Code", "X-Influx-Error", "X-InfluxDb-Error")) {
if (fullHttpResponse.headers().contains(s.toLowerCase())) {
reason = fullHttpResponse.headers().get(s);
break;
}
}
}

// if (reason.isEmpty()) {
// reason = body;
// }

if (reason.isEmpty()) {
reason = HttpResponseStatus.valueOf(statusCode).reasonPhrase();
}
String message = String.format("HTTP status code: %d; Message: %s", statusCode, reason);
throw new InfluxDBApiHttpException(message, null, statusCode);
}

return fullHttpResponse;
}

@Override
public void close() {
channel.close();
eventLoopGroup.shutdownGracefully();
}
}


6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@
<version>${netty-handler.version}</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler-proxy</artifactId>
<version>${netty-handler.version}</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
Expand Down
Loading
Loading