Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 126 additions & 12 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -2205,6 +2205,84 @@ static int blob_request_pre_signed_url(struct flb_s3 *context,
return ret;
}

/*
 * Split a pre-signed URL into the host, request URI and TCP port needed to
 * send the request.
 *
 * ctx       plugin context; only ctx->ins is used, for error logging.
 * url       NUL-terminated pre-signed URL.
 * out_host  receives a newly created flb_sds_t holding the host part.
 * out_uri   receives a newly created flb_sds_t holding the URI part.
 * out_port  optional (may be NULL); receives the resolved port.
 *
 * Port resolution: an explicit port in the URL must be a decimal number in
 * the range 1-65535, otherwise the URL is rejected. When the splitter reports
 * no port, "https" maps to 443 and any other scheme to 80; if there is no
 * scheme either, the port is reported as 0.
 *
 * Returns 0 on success, in which case the caller owns *out_host and *out_uri
 * and must release them with flb_sds_destroy(). Returns -1 on failure; the
 * output arguments are left untouched in that case.
 */
int s3_parse_presigned_url(struct flb_s3 *ctx, const char *url,
                           flb_sds_t *out_host, flb_sds_t *out_uri,
                           int *out_port)
{
    int ret;
    int result = -1;
    char *scheme = NULL;
    char *host = NULL;
    char *port = NULL;
    char *uri = NULL;
    char *end = NULL;
    unsigned long parsed_port;
    int resolved_port = 0;
    flb_sds_t host_sds = NULL;
    flb_sds_t uri_sds = NULL;

    if (out_host == NULL || out_uri == NULL || url == NULL) {
        return -1;
    }

    ret = flb_utils_url_split(url, &scheme, &host, &port, &uri);
    if (ret == -1 || host == NULL || uri == NULL) {
        /* url was verified to be non-NULL above, so it is safe to print */
        flb_plg_error(ctx->ins, "Invalid pre signed URL: %s", url);
        goto cleanup;
    }

    if (port != NULL) {
        /*
         * Reject anything that is not a complete decimal number in the valid
         * TCP port range. Without this check a malformed port would silently
         * resolve to 0 (or a truncated value), which callers interpret as
         * "no port specified". An overflowing or negative input makes
         * strtoul() yield a value above 65535, so it is caught by the range
         * test as well.
         */
        parsed_port = strtoul(port, &end, 10);
        if (end == port || *end != '\0' ||
            parsed_port == 0 || parsed_port > 65535) {
            flb_plg_error(ctx->ins,
                          "Invalid port in pre signed URL: %s", port);
            goto cleanup;
        }
        resolved_port = (int) parsed_port;
    }
    else if (scheme != NULL) {
        if (strcasecmp(scheme, "https") == 0) {
            resolved_port = 443;
        }
        else {
            resolved_port = 80;
        }
    }

    host_sds = flb_sds_create(host);
    if (host_sds == NULL) {
        goto cleanup;
    }

    uri_sds = flb_sds_create(uri);
    if (uri_sds == NULL) {
        flb_sds_destroy(host_sds);
        goto cleanup;
    }

    /* Publish the results only once nothing else can fail */
    if (out_port != NULL) {
        *out_port = resolved_port;
    }
    *out_host = host_sds;
    *out_uri = uri_sds;
    result = 0;

cleanup:
    /*
     * The temporaries returned by the URL splitter are always released here,
     * on success and on failure alike. Any of them may still be NULL.
     */
    flb_free(scheme);
    flb_free(host);
    flb_free(port);
    flb_free(uri);

    return result;
}

static int blob_fetch_pre_signed_url(struct flb_s3 *context,
flb_sds_t *result_url,
char *format,
Expand Down Expand Up @@ -2467,8 +2545,14 @@ static int put_blob_object(struct flb_s3 *ctx,
int ret;
int num_headers = 0;
char *final_key;
flb_sds_t uri;
flb_sds_t uri = NULL;
flb_sds_t tmp;
flb_sds_t presigned_full = NULL;
flb_sds_t presigned_host = NULL;
const char *original_host = NULL;
char *original_upstream_host = NULL;
int original_upstream_port = 0;
int presigned_port = 0;
char final_body_md5[25];

if (ctx->authorization_endpoint_url == NULL) {
Expand Down Expand Up @@ -2499,13 +2583,33 @@ static int put_blob_object(struct flb_s3 *ctx,
uri = tmp;
}
else {
uri = NULL;
ret = blob_fetch_put_object_pre_signed_url(ctx, &presigned_full, (char *) tag, ctx->bucket, (char *) path);

if (ret != 0) {
return -1;
}

ret = blob_fetch_put_object_pre_signed_url(ctx, &uri, (char *) tag, ctx->bucket, (char *) path);
ret = s3_parse_presigned_url(ctx, presigned_full, &presigned_host, &uri, &presigned_port);
flb_sds_destroy(presigned_full);
presigned_full = NULL;

if (ret != 0) {
return -1;
}

if (presigned_port != 0 && presigned_port != ctx->port) {
flb_plg_error(ctx->ins, "Pre signed URL uses unexpected port %d", presigned_port);
flb_sds_destroy(presigned_host);
flb_sds_destroy(uri);
return -1;
}

original_host = ctx->s3_client->host;
original_upstream_host = ctx->s3_client->upstream->tcp_host;
original_upstream_port = ctx->s3_client->upstream->tcp_port;
ctx->s3_client->host = presigned_host;
ctx->s3_client->upstream->tcp_host = presigned_host;
ctx->s3_client->upstream->tcp_port = presigned_port != 0 ? presigned_port : ctx->port;
Comment on lines +2607 to +2612
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check how upstream connections are managed and whether tcp_host changes affect new connections
echo "=== Searching for tcp_host and tcp_port usage in upstream ==="
rg -n "tcp_host|tcp_port" src/flb_upstream.c -C 3

echo ""
echo "=== Checking for connection pool/reuse patterns ==="
rg -n "conn_get|conn_release|pool" src/ --type c -i | head -30

echo ""
echo "=== Looking for upstream socket/connection creation logic ==="
rg -n "upstream.*connect|upstream.*socket|flb_upstream_conn" src/ --type c | head -20

Repository: fluent/fluent-bit

Length of output: 8723


🏁 Script executed:

#!/bin/bash
# Check how connections are matched/selected from the pool
echo "=== Looking for connection pool selection/matching logic ==="
rg -n "flb_upstream_queue_get|conn_get_from|connection.*match" src/flb_upstream.c -A 10

echo ""
echo "=== Check upstream connection structure to see if host/port are part of matching ==="
rg -n "struct flb_upstream|struct flb_connection" include/ src/ --type c -A 15 | head -60

echo ""
echo "=== Verify if pool is keyed per upstream or per upstream+host+port ==="
rg -n "queue.*connection|pool.*connection" src/flb_upstream.c -i -B 2 -A 2

Repository: fluent/fluent-bit

Length of output: 7831


🏁 Script executed:

#!/bin/bash
# Check what happens after the host swap in S3 plugin
echo "=== Looking at code after host swap (lines 2607-2650 in s3.c) ==="
sed -n '2607,2650p' plugins/out_s3/s3.c

echo ""
echo "=== Checking what functions are called and if they use upstream ==="
rg -n "flb_upstream_conn_get|s3_client->upstream" plugins/out_s3/s3.c | head -20

Repository: fluent/fluent-bit

Length of output: 2903


🏁 Script executed:

#!/bin/bash
# Find what client_vtable->request does and if it uses flb_upstream_conn_get
echo "=== Search for s3_client_vtable and request function ==="
rg -n "client_vtable|vtable.*request" plugins/out_s3/ -A 5

echo ""
echo "=== Find request function implementation in S3 ==="
rg -n "^.*request.*\(.*s3_client" plugins/out_s3/s3.c -A 3 | head -40

echo ""
echo "=== Check if http_request or similar uses flb_upstream_conn_get ==="
rg -n "flb_upstream_conn_get" plugins/out_s3/ -B 3 -A 3

Repository: fluent/fluent-bit

Length of output: 3771


🏁 Script executed:

#!/bin/bash
# Find the client_vtable definition and request function implementation
echo "=== Find where client_vtable is defined ==="
rg -n "client_vtable.*=" plugins/out_s3/ -B 2 -A 10 | head -40

echo ""
echo "=== Find request function definition ==="
rg -n "\.request\s*=" plugins/out_s3/ -B 2 -A 2

echo ""
echo "=== Search for http_request_vtable or similar ==="
rg -n "struct.*vtable|s3.*request.*\(" plugins/out_s3/ --type c | grep -i "request\|vtable"

Repository: fluent/fluent-bit

Length of output: 814


🏁 Script executed:

#!/bin/bash
# Find s3_client structure and where client_vtable is assigned
echo "=== Find s3_client structure definition ==="
rg -n "struct.*s3_client|typedef.*s3_client" plugins/out_s3/ include/ --type c -A 15 | head -80

echo ""
echo "=== Search for where client_vtable is assigned ==="
rg -n "s3_client->client_vtable" plugins/out_s3/ -B 5 -A 2 | head -40

echo ""
echo "=== Look for http_request or http client implementation ==="
rg -n "flb_http_client_request\|http.*request" plugins/out_s3/ include/ --type c | head -20

Repository: fluent/fluent-bit

Length of output: 7946


🏁 Script executed:

#!/bin/bash
# Find flb_aws_client structure
echo "=== Find flb_aws_client structure ==="
rg -n "struct flb_aws_client|typedef.*flb_aws_client" include/ src/ --type c -A 20 | head -100

echo ""
echo "=== Look for aws_client in fluent-bit core ==="
fd "aws" include/ src/ --type f | head -20

Repository: fluent/fluent-bit

Length of output: 6862


🏁 Script executed:

#!/bin/bash
# Find the request_do implementation to see how it uses upstream
echo "=== Find request_do implementation ==="
rg -n "^static struct flb_http_client \*request_do" src/aws/flb_aws_util.c -A 50

echo ""
echo "=== Look for flb_upstream_conn_get in request_do or related code ==="
sed -n '66,200p' src/aws/flb_aws_util.c

Repository: fluent/fluent-bit

Length of output: 3771


🏁 Script executed:

#!/bin/bash
# Find the full request_do implementation
echo "=== Search for request_do implementation in aws files ==="
rg -n "request_do(" src/aws/ --type c -A 80 | grep -A 80 "^src/aws.*request_do"

echo ""
echo "=== Specifically look at lines around request_do in flb_aws_util.c ==="
wc -l src/aws/flb_aws_util.c

echo ""
echo "=== Get the whole flb_aws_util.c to find request_do ==="
cat src/aws/flb_aws_util.c | grep -n "request_do" -A 60 | head -100

Repository: fluent/fluent-bit

Length of output: 3925


🏁 Script executed:

#!/bin/bash
# Find request_do function body
echo "=== Find request_do function ==="
sed -n '1300,1326p' src/aws/flb_aws_util.c

echo ""
echo "=== Search backwards from end of file for request_do ==="
tail -100 src/aws/flb_aws_util.c | grep -n "request_do" -B 5 -A 50

Repository: fluent/fluent-bit

Length of output: 772


🏁 Script executed:

#!/bin/bash
# Search for request_do definition more broadly
echo "=== Search all files for request_do implementation ==="
rg -n "^[^/]*request_do\s*\(" src/ include/ --type c

echo ""
echo "=== Search for it as a static function ==="
rg -n "static.*request_do" src/ include/ --type c -A 50 | head -150

Repository: fluent/fluent-bit

Length of output: 530


🏁 Script executed:

#!/bin/bash
# Get the request_do implementation starting at line 342
echo "=== Get request_do function implementation ==="
sed -n '342,500p' src/aws/flb_aws_util.c

Repository: fluent/fluent-bit

Length of output: 5493


🏁 Script executed:

#!/bin/bash
# Check if connections in pool are already fully established (connected)
echo "=== Check connection state when retrieved from pool ==="
sed -n '750,790p' src/flb_upstream.c

Repository: fluent/fluent-bit

Length of output: 1692


Connection pooling prevents presigned URL host swap from working—requests will be sent to the original endpoint.

When a presigned URL is used, the code correctly saves the original tcp_host/tcp_port and swaps them (lines 2611–2612), then makes an HTTP request. However, the request uses flb_upstream_conn_get() to retrieve a connection from the pool. If a keepalive connection exists in the available queue, it will be recycled and returned with its socket (fd) already connected to the original host (from before the swap). Simply changing tcp_host/tcp_port variables does not reconnect an already-open socket—the request will be sent over the existing connection to the wrong endpoint.

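This is not a theoretical concern: whenever a keepalive connection to the original endpoint is available in the pool, the presigned URL request is sent over that socket to the original S3 endpoint instead of the host embedded in the presigned URL, so the upload is likely to fail.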

To fix this, either:

  • Invalidate pooled connections when presigned URLs require a different host, or
  • Create a fresh connection instead of retrieving from the pool for presigned URL requests
🤖 Prompt for AI Agents
In @plugins/out_s3/s3.c around lines 2607 - 2612, The presigned URL host/port
swap (setting ctx->s3_client->host, ctx->s3_client->upstream->tcp_host and
tcp_port using presigned_host/presigned_port) is insufficient because
flb_upstream_conn_get() may return an existing keepalive socket already
connected to the original endpoint; requests will then go to the wrong host. Fix
by detecting when presigned_host/presigned_port differ from the upstream's
original tcp_host/tcp_port and in that case avoid using pooled connections:
either invalidate or bypass the pool and force creation of a new connection for
this request (i.e., do not call flb_upstream_conn_get() for presigned requests,
or clear/mark the pool entry so a fresh socket is established), ensuring the
socket is actually connected to presigned_host/presigned_port before issuing the
HTTP request.

}

memset(final_body_md5, 0, sizeof(final_body_md5));
Expand All @@ -2514,8 +2618,8 @@ static int put_blob_object(struct flb_s3 *ctx,
final_body_md5, sizeof(final_body_md5));
if (ret != 0) {
flb_plg_error(ctx->ins, "Failed to create Content-MD5 header");
flb_sds_destroy(uri);
return -1;
ret = -1;
goto cleanup;
}
}

Expand All @@ -2527,8 +2631,7 @@ static int put_blob_object(struct flb_s3 *ctx,
ret = create_headers(ctx, final_body_md5, &headers, &num_headers, FLB_FALSE);
if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to create headers");
flb_sds_destroy(uri);
return -1;
goto cleanup;
}

c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT,
Expand All @@ -2545,10 +2648,9 @@ static int put_blob_object(struct flb_s3 *ctx,
*/
final_key = uri + strlen(ctx->bucket) + 1;
flb_plg_info(ctx->ins, "Successfully uploaded object %s", final_key);
flb_sds_destroy(uri);
flb_http_client_destroy(c);

return 0;
ret = 0;
goto cleanup;
}
flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size,
"PutObject", ctx->ins);
Expand All @@ -2559,9 +2661,21 @@ static int put_blob_object(struct flb_s3 *ctx,
}

flb_plg_error(ctx->ins, "PutObject request failed");
flb_sds_destroy(uri);
ret = -1;

return -1;
cleanup:
if (original_host != NULL) {
ctx->s3_client->host = original_host;
ctx->s3_client->upstream->tcp_host = original_upstream_host;
ctx->s3_client->upstream->tcp_port = original_upstream_port;
flb_sds_destroy(presigned_host);
}

if (uri != NULL) {
flb_sds_destroy(uri);
}

return ret;
}

static int abort_blob_upload(struct flb_s3 *ctx,
Expand Down
4 changes: 4 additions & 0 deletions plugins/out_s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ void multipart_upload_destroy(struct multipart_upload *m_upload);
struct flb_http_client *mock_s3_call(char *error_env_var, char *api);
int s3_plugin_under_test();

/*
 * Split a pre-signed URL into host, URI and port.
 *
 * On success returns 0 and stores newly created flb_sds_t strings in
 * *out_host and *out_uri; the caller releases them with flb_sds_destroy().
 * out_port may be NULL; when it is not, it receives the port taken from the
 * URL or, if the URL carries none, 443 for an "https" scheme and 80 for any
 * other scheme.
 *
 * Returns -1 on failure, e.g. when url, out_host or out_uri is NULL, when the
 * URL cannot be split, or when a string allocation fails.
 */
int s3_parse_presigned_url(struct flb_s3 *ctx, const char *url,
flb_sds_t *out_host, flb_sds_t *out_uri,
int *out_port);

int get_md5_base64(char *buf, size_t buf_size, char *md5_str, size_t md5_str_size);

int create_headers(struct flb_s3 *ctx, char *body_md5,
Expand Down
Loading
Loading