From 7d5efb2d7712f1cb0dbd41c5a190fd19453cc787 Mon Sep 17 00:00:00 2001 From: Johannes Vetter Date: Thu, 1 Jan 2026 14:30:16 -0800 Subject: [PATCH 1/2] Normalize endpoints to avoid endless memory growth --- .../plugins/retries/clock_skew.rb | 44 ++- .../aws/plugins/retries/clock_skew_spec.rb | 294 +++++++++++++++++- 2 files changed, 313 insertions(+), 25 deletions(-) diff --git a/gems/aws-sdk-core/lib/aws-sdk-core/plugins/retries/clock_skew.rb b/gems/aws-sdk-core/lib/aws-sdk-core/plugins/retries/clock_skew.rb index 877d0e30e4f..acb600079b4 100644 --- a/gems/aws-sdk-core/lib/aws-sdk-core/plugins/retries/clock_skew.rb +++ b/gems/aws-sdk-core/lib/aws-sdk-core/plugins/retries/clock_skew.rb @@ -3,10 +3,8 @@ module Aws module Plugins module Retries - # @api private class ClockSkew - CLOCK_SKEW_THRESHOLD = 5 * 60 # five minutes def initialize @@ -22,9 +20,9 @@ def initialize end # Gets the clock_correction in seconds to apply to a given endpoint - # @param endpoint [URI / String] + # @param endpoint [URI, String] def clock_correction(endpoint) - @mutex.synchronize { @endpoint_clock_corrections[endpoint.to_s] } + @mutex.synchronize { @endpoint_clock_corrections[normalized_endpoint(endpoint)] } end # The estimated skew factors in any clock skew from @@ -35,7 +33,7 @@ def clock_correction(endpoint) # Estimated Skew should not be used to correct clock skew errors # it should only be used to estimate TTL for a request def estimated_skew(endpoint) - @mutex.synchronize { @endpoint_estimated_skews[endpoint.to_s] } + @mutex.synchronize { @endpoint_estimated_skews[normalized_endpoint(endpoint)] } end # Determines whether a request has clock skew by comparing @@ -55,9 +53,9 @@ def update_clock_correction(context) endpoint = context.http_request.endpoint now_utc = Time.now.utc server_time = server_time(context.http_response) - if server_time && (now_utc - server_time).abs > CLOCK_SKEW_THRESHOLD - set_clock_correction(endpoint, server_time - now_utc) - end + return unless server_time && (now_utc - server_time).abs > CLOCK_SKEW_THRESHOLD + + set_clock_correction(normalized_endpoint(endpoint), server_time - now_utc) end # Called for every request @@ -69,20 +67,35 @@ def update_estimated_skew(context) now_utc = Time.now.utc server_time = server_time(context.http_response) return unless server_time + @mutex.synchronize do - @endpoint_estimated_skews[endpoint.to_s] = server_time - now_utc + @endpoint_estimated_skews[normalized_endpoint(endpoint)] = server_time - now_utc end end private + ## + # @param endpoint [URI, String] + # the endpoint to normalize + # + # @return [String] + # the endpoint's schema, host, and port - without any path or query arguments + def normalized_endpoint(endpoint) + uri = endpoint.is_a?(URI::Generic) ? endpoint : URI(endpoint.to_s) + + return endpoint.to_s unless uri.scheme && uri.host + + "#{uri.scheme}://#{uri.host}:#{uri.port}" + rescue URI::InvalidURIError + endpoint.to_s + end + # @param response [Seahorse::Client::Http::Response:] def server_time(response) - begin - Time.parse(response.headers['date']).utc - rescue - nil - end + Time.parse(response.headers['date']).utc + rescue StandardError + nil end # Sets the clock correction for an endpoint @@ -90,11 +103,10 @@ def server_time(response) # @param correction [Number] def set_clock_correction(endpoint, correction) @mutex.synchronize do - @endpoint_clock_corrections[endpoint.to_s] = correction + @endpoint_clock_corrections[normalized_endpoint(endpoint)] = correction end end end end end end - diff --git a/gems/aws-sdk-core/spec/aws/plugins/retries/clock_skew_spec.rb b/gems/aws-sdk-core/spec/aws/plugins/retries/clock_skew_spec.rb index cb15af005e0..d64b53f7256 100644 --- a/gems/aws-sdk-core/spec/aws/plugins/retries/clock_skew_spec.rb +++ b/gems/aws-sdk-core/spec/aws/plugins/retries/clock_skew_spec.rb @@ -6,12 +6,20 @@ module Aws module Plugins describe Retries::ClockSkew do + ## + # @param endpoint [URI, String] the endpoint the request points at + # @param server_time [String, nil] the timestamp returned by the server + def build_context(endpoint, server_time) + response = double('response', headers: { 'date' => server_time }) + request = double('request', endpoint: endpoint) + + double('context', http_response: response, http_request: request) + end + subject { Retries::ClockSkew.new } - let(:endpoint) { 'example_endpoint' } - let(:context) { double('context', http_response: response, http_request: request) } - let(:response) { double('http_response', headers: headers) } - let(:request) { double('http_request', endpoint: endpoint) } - let(:headers) { {'date' => server_time} } + + let(:endpoint) { 'example_endpoint' } + let(:context) { build_context(endpoint, server_time) } let(:server_time) { Time.now.utc.to_s } describe '#initialize' do @@ -24,6 +32,14 @@ module Plugins end end + describe '#clock_correction' do + [nil, '', ' '].each do |value| + it "handles #{value.inspect} as endpoint" do + expect(subject.clock_correction(value)).to eq(0) + end + end + end + describe '#clock_skewed?' do context 'server time is not set' do let(:server_time) { nil } @@ -46,7 +62,17 @@ module Plugins end end + describe '#estimated_skew' do + [nil, '', ' '].each do |value| + it "handles #{value.inspect} as endpoint" do + expect(subject.estimated_skew(value)).to be_nil + end + end + end + describe '#update_clock_correction' do + let(:server_time) { (Time.now.utc + 1000).to_s } + context 'server time is not set' do let(:server_time) { nil } it 'does not update the corrections' do @@ -64,20 +90,140 @@ module Plugins end context 'server time is off by more than the threshold' do - let(:server_time) { (Time.now.utc + 1000).to_s } it 'updates the corrections' do subject.update_clock_correction(context) expect(subject.clock_correction(endpoint)).to be_within(5).of(1000) end - it 'does not update corrections for other end points' do + it 'does not update corrections for other endpoints' do subject.update_clock_correction(context) expect(subject.clock_correction('other_endpoint')).to be 0 end end + + context 'with malformed URI endpoints' do + let(:malformed_endpoints) { ['ht!tp://invalid', '://missing-scheme', 'https://[invalid-ipv6'] } + + before do + malformed_endpoints.each do |endpoint| + subject.update_clock_correction(build_context(endpoint, server_time)) + end + end + + it 'handles malformed URIs gracefully' do + malformed_endpoints.each do |endpoint| + expect(subject.clock_correction(endpoint)).to be_within(5).of(1000) + end + end + + it 'tracks the endpoints as is' do + expect(subject.instance_variable_get(:@endpoint_clock_corrections).keys).to match_array(malformed_endpoints) + end + end + + context 'with incomplete URIs as separate endpoints' do + let(:incomplete_endpoints) { ['/just/a/path', 'bare-hostname', 'example.com'] } + + before do + incomplete_endpoints.each do |endpoint| + subject.update_clock_correction(build_context(endpoint, server_time)) + end + end + + it 'handles incomplete URIs as separate endpoints' do + incomplete_endpoints.each do |endpoint| + expect(subject.clock_correction(endpoint)).to be_within(5).of(1000) + end + end + + it 'tracks the endpoints as is' do + expect(subject.instance_variable_get(:@endpoint_clock_corrections).keys) + .to match_array(incomplete_endpoints) + end + end + + context 'with default ports' do + let(:port_variations) do + [ + 'https://s3.amazonaws.com/bucket/key', + URI('https://s3.amazonaws.com:443/bucket/other'), + 'http://example.com/path', + URI('http://example.com:80/different') + ] + end + + let(:tracked_endpoints) { %w[https://s3.amazonaws.com:443 http://example.com:80] } + + before do + port_variations.each do |endpoint| + subject.update_clock_correction(build_context(endpoint, server_time)) + end + end + + it 'tracks to correct correction for other endpoint per server', :aggregate_failures do + expect(subject.clock_correction('https://s3.amazonaws.com/new/path')).to be_within(5).of(1000) + expect(subject.clock_correction('http://example.com/new/path')).to be_within(5).of(1000) + end + + it 'tracks the endpoints as is' do + expect(subject.instance_variable_get(:@endpoint_clock_corrections).keys).to match_array(tracked_endpoints) + end + end + + context 'with IPv6 addresses' do + let(:ipv6_endpoints) do + [ + 'https://[2001:db8::1]/path', + URI('https://[2001:db8::1]/other/path'), + 'https://[2001:db8::1]:8080/different', + URI('https://[2001:db8::1]:8080/very/different') + ] + end + + let(:tracked_endpoints) { ['https://[2001:db8::1]:443', 'https://[2001:db8::1]:8080'] } + + before do + ipv6_endpoints.each do |endpoint| + subject.update_clock_correction(build_context(endpoint, server_time)) + end + end + + it 'tracks to correct correction for other endpoint per server', :aggregate_failures do + expect(subject.clock_correction('https://[2001:db8::1]/new')).to be_within(5).of(1000) + expect(subject.clock_correction('https://[2001:db8::1]:8080/new')).to be_within(5).of(1000) + end + + it 'tracks the endpoints as is' do + expect(subject.instance_variable_get(:@endpoint_clock_corrections).keys).to match_array(tracked_endpoints) + end + end + + context 'with case variations in scheme and host' do + let(:case_endpoints) { ['HTTP://EXAMPLE.COM/path', 'http://EXAMPLE.COM/other', 'http://example.com/path'] } + + before do + case_endpoints.each do |endpoint| + subject.update_clock_correction(build_context(endpoint, server_time)) + end + end + + it 'handles the endpoints correctly' do + case_endpoints.each do |endpoint| + expect(subject.clock_correction(endpoint)).to be_within(5).of(1000) + end + end + + it 'treats them as different endpoints while it consolidates the scheme' do + expect(subject.instance_variable_get(:@endpoint_clock_corrections).keys).to contain_exactly( + 'http://EXAMPLE.COM:80', 'http://example.com:80' + ) + end + end end describe '#update_estimated_skew' do + let(:server_time) { (Time.now.utc + 1000).to_s } + context 'server time is not set' do let(:server_time) { nil } it 'does not update the skew' do @@ -86,12 +232,142 @@ module Plugins end end - context 'server time is set' do - let(:server_time) { (Time.now.utc + 1000).to_s } + context 'server time matches the clients time' do + let(:server_time) { Time.now.utc.to_s } + it 'updates the skew' do + subject.update_estimated_skew(context) + expect(subject.estimated_skew(endpoint)).to be_a(Float) + end + end + + context 'server time is off by more than the threshold' do it 'updates the skew' do subject.update_estimated_skew(context) expect(subject.estimated_skew(endpoint)).to be_within(5).of(1000) end + + it 'does not update the skew for other endpoints' do + subject.update_estimated_skew(context) + expect(subject.estimated_skew('other_endpoint')).to be nil + end + end + + context 'with malformed URI endpoints' do + let(:malformed_endpoints) { ['ht!tp://invalid', '://missing-scheme', 'https://[invalid-ipv6'] } + + before do + malformed_endpoints.each do |endpoint| + subject.update_estimated_skew(build_context(endpoint, server_time)) + end + end + + it 'handles malformed URIs gracefully' do + malformed_endpoints.each do |endpoint| + expect(subject.estimated_skew(endpoint)).to be_within(5).of(1000) + end + end + + it 'tracks the endpoints as is' do + expect(subject.instance_variable_get(:@endpoint_estimated_skews).keys).to match_array(malformed_endpoints) + end + end + + context 'with incomplete URIs as separate endpoints' do + let(:incomplete_endpoints) { ['/just/a/path', 'bare-hostname', 'example.com'] } + + before do + incomplete_endpoints.each do |endpoint| + subject.update_estimated_skew(build_context(endpoint, server_time)) + end + end + + it 'handles incomplete URIs as separate endpoints' do + incomplete_endpoints.each do |endpoint| + expect(subject.estimated_skew(endpoint)).to be_within(5).of(1000) + end + end + + it 'tracks the endpoints as is' do + expect(subject.instance_variable_get(:@endpoint_estimated_skews).keys).to match_array(incomplete_endpoints) + end + end + + context 'with default ports' do + let(:port_variations) do + [ + 'https://s3.amazonaws.com/bucket/key', + URI('https://s3.amazonaws.com:443/bucket/other'), + 'http://example.com/path', + URI('http://example.com:80/different') + ] + end + + let(:tracked_endpoints) { %w[https://s3.amazonaws.com:443 http://example.com:80] } + + before do + port_variations.each do |endpoint| + subject.update_estimated_skew(build_context(endpoint, server_time)) + end + end + + it 'tracks to correct correction for other endpoint per server', :aggregate_failures do + expect(subject.estimated_skew('https://s3.amazonaws.com/new/path')).to be_within(5).of(1000) + expect(subject.estimated_skew('http://example.com/new/path')).to be_within(5).of(1000) + end + + it 'tracks the endpoints as is' do + expect(subject.instance_variable_get(:@endpoint_estimated_skews).keys).to match_array(tracked_endpoints) + end + end + + context 'with IPv6 addresses' do + let(:ipv6_endpoints) do + [ + 'https://[2001:db8::1]/path', + URI('https://[2001:db8::1]/other/path'), + 'https://[2001:db8::1]:8080/different', + URI('https://[2001:db8::1]:8080/very/different') + ] + end + + let(:tracked_endpoints) { ['https://[2001:db8::1]:443', 'https://[2001:db8::1]:8080'] } + + before do + ipv6_endpoints.each do |endpoint| + subject.update_estimated_skew(build_context(endpoint, server_time)) + end + end + + it 'tracks to correct correction for other endpoint per server', :aggregate_failures do + expect(subject.estimated_skew('https://[2001:db8::1]/new')).to be_within(5).of(1000) + expect(subject.estimated_skew('https://[2001:db8::1]:8080/new')).to be_within(5).of(1000) + end + + it 'tracks the endpoints as is' do + expect(subject.instance_variable_get(:@endpoint_estimated_skews).keys).to match_array(tracked_endpoints) + end + end + + context 'with case variations in scheme and host' do + let(:case_endpoints) { ['HTTP://EXAMPLE.COM/path', 'http://EXAMPLE.COM/other', 'http://example.com/path'] } + + before do + case_endpoints.each do |endpoint| + subject.update_estimated_skew(build_context(endpoint, server_time)) + end + end + + it 'handles the endpoints correctly' do + case_endpoints.each do |endpoint| + expect(subject.estimated_skew(endpoint)).to be_within(5).of(1000) + end + end + + it 'treats them as different endpoints while it consolidates the scheme' do + expect(subject.instance_variable_get(:@endpoint_estimated_skews).keys).to contain_exactly( + 'http://EXAMPLE.COM:80', 'http://example.com:80' + ) + end end end end From 33b9c34914cf1a705c4695510707dfe3dd8c1248 Mon Sep 17 00:00:00 2001 From: Johannes Vetter Date: Thu, 1 Jan 2026 17:29:34 -0800 Subject: [PATCH 2/2] Add CHANGELOG entry for memory leak fix --- gems/aws-sdk-core/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gems/aws-sdk-core/CHANGELOG.md b/gems/aws-sdk-core/CHANGELOG.md index 2281a6ec34d..856a364f38c 100644 --- a/gems/aws-sdk-core/CHANGELOG.md +++ b/gems/aws-sdk-core/CHANGELOG.md @@ -1,6 +1,8 @@ Unreleased Changes ------------------ +* Issue - Fix memory leak in ClockSkew retry plugin by normalizing endpoints to prevent unlimited hash growth. + 3.240.0 (2025-12-16) ------------------