Skip to content

Commit bd71cfc

Browse files
authored
Merge pull request #596 from splitio/sseclient-socket
Sseclient socket
2 parents f121022 + 1b0dab4 commit bd71cfc

File tree

8 files changed

+218
-31
lines changed

8 files changed

+218
-31
lines changed

.github/workflows/ci.yml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ jobs:
3535
with:
3636
fetch-depth: 0
3737

38+
- name: Set up Java
39+
uses: actions/setup-java@v2
40+
with:
41+
java-version: 17
42+
distribution: "temurin"
43+
3844
- name: Setup Ruby ${{ matrix.version }}
3945
uses: ruby/setup-ruby@v1
4046
with:
@@ -57,7 +63,7 @@ jobs:
5763

5864
- name: SonarQube Scan (Push)
5965
if: matrix.version == '3.2.2' && github.event_name == 'push'
60-
uses: SonarSource/sonarcloud-github-action@v1.9
66+
uses: SonarSource/sonarcloud-github-action@v5.0.0
6167
env:
6268
SONAR_TOKEN: ${{ secrets.SONARQUBE_TOKEN }}
6369
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
@@ -69,13 +75,14 @@ jobs:
6975
7076
- name: SonarQube Scan (Pull Request)
7177
if: matrix.version == '3.2.2' && github.event_name == 'pull_request'
72-
uses: SonarSource/sonarcloud-github-action@v1.9
78+
uses: SonarSource/sonarcloud-github-action@v5.0.0
7379
env:
7480
SONAR_TOKEN: ${{ secrets.SONARQUBE_TOKEN }}
7581
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
7682
with:
7783
projectBaseDir: .
7884
args: >
85+
-Dsonar.java.source=17
7986
-Dsonar.host.url=${{ secrets.SONARQUBE_HOST }}
8087
-Dsonar.projectVersion=${{ env.VERSION }}
8188
-Dsonar.pullrequest.key=${{ github.event.pull_request.number }}

CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
CHANGES
22

3+
8.10.0 (Nov 28, 2025)
4+
- Replaced socketry gem used in streaming feature with built-in socket lib.
5+
36
8.9.0 (Oct 8, 2025)
47
- Added new configuration for Fallback Treatments, which allows setting a treatment value and optional config to be returned in place of "control", either globally or by flag. Read more in our docs.
58

lib/splitclient-rb/sse/event_source/client.rb

Lines changed: 84 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
# frozen_string_literal: false
22

3-
require 'socketry'
3+
require 'socket'
4+
require 'openssl'
45
require 'uri'
6+
require 'timeout'
57

68
module SplitIoClient
79
module SSE
@@ -36,12 +38,15 @@ def initialize(config,
3638

3739
def close(status = nil)
3840
unless connected?
39-
@config.logger.error('SSEClient already disconected.') if @config.debug_enabled
41+
@config.logger.debug('SSEClient already disconected.')
4042
return
4143
end
44+
@config.logger.debug("Closing SSEClient socket")
4245

4346
@connected.make_false
44-
@socket&.close
47+
@socket.sync_close = true if @socket.is_a? OpenSSL::SSL::SSLSocket
48+
@socket.close
49+
@config.logger.debug("SSEClient socket state #{@socket.state}") if @socket.is_a? OpenSSL::SSL::SSLSocket
4550
push_status(status)
4651
rescue StandardError => e
4752
@config.logger.error("SSEClient close Error: #{e.inspect}")
@@ -55,7 +60,6 @@ def start(url)
5560

5661
@uri = URI(url)
5762
latch = Concurrent::CountDownLatch.new(1)
58-
5963
connect_thread(latch)
6064

6165
return false unless latch.wait(CONNECT_TIMEOUT)
@@ -74,42 +78,73 @@ def connected?
7478

7579
def connect_thread(latch)
7680
@config.threads[:connect_stream] = Thread.new do
77-
@config.logger.info('Starting connect_stream thread ...') if @config.debug_enabled
81+
@config.logger.info('Starting connect_stream thread ...')
7882
new_status = connect_stream(latch)
7983
push_status(new_status)
80-
@config.logger.info('connect_stream thread finished.') if @config.debug_enabled
84+
@config.logger.info('connect_stream thread finished.')
8185
end
8286
end
8387

8488
def connect_stream(latch)
8589
return Constants::PUSH_NONRETRYABLE_ERROR unless socket_write(latch)
86-
8790
while connected? || @first_event.value
88-
begin
89-
partial_data = @socket.readpartial(10_000, timeout: @read_timeout)
90-
91-
read_first_event(partial_data, latch)
92-
93-
raise 'eof exception' if partial_data == :eof
94-
rescue Errno::EBADF, IOError => e
95-
@config.logger.error(e.inspect) if @config.debug_enabled
96-
return nil
97-
rescue StandardError => e
98-
return nil if ENV['SPLITCLIENT_ENV'] == 'test'
99-
100-
@config.logger.error("Error reading partial data: #{e.inspect}") if @config.debug_enabled
101-
return Constants::PUSH_RETRYABLE_ERROR
91+
begin
92+
if IO.select([@socket], nil, nil, @read_timeout)
93+
begin
94+
partial_data = @socket.readpartial(10_000)
95+
read_first_event(partial_data, latch)
96+
97+
raise 'eof exception' if partial_data == :eof
98+
rescue IO::WaitReadable => e
99+
@config.logger.debug("SSE client IO::WaitReadable transient error: #{e.inspect}")
100+
IO.select([@socket], nil, nil, @read_timeout)
101+
retry
102+
rescue Errno::EAGAIN => e
103+
@config.logger.debug("SSE client transient error: #{e.inspect}")
104+
IO.select([@socket], nil, nil, @read_timeout)
105+
retry
106+
rescue Errno::ETIMEDOUT => e
107+
@config.logger.error("SSE read operation timed out!: #{e.inspect}")
108+
return Constants::PUSH_RETRYABLE_ERROR
109+
rescue EOFError => e
110+
puts "SSE read operation EOF Exception!: #{e.inspect}"
111+
@config.logger.error("SSE read operation EOF Exception!: #{e.inspect}")
112+
raise 'eof exception'
113+
rescue Errno::EBADF, IOError => e
114+
@config.logger.error("SSE read operation EBADF or IOError: #{e.inspect}")
115+
return Constants::PUSH_RETRYABLE_ERROR
116+
rescue StandardError => e
117+
@config.logger.error("SSE read operation StandardError: #{e.inspect}")
118+
return nil if ENV['SPLITCLIENT_ENV'] == 'test'
119+
120+
@config.logger.error("Error reading partial data: #{e.inspect}")
121+
return Constants::PUSH_RETRYABLE_ERROR
122+
end
123+
else
124+
@config.logger.error("SSE read operation timed out, no data available.")
125+
return Constants::PUSH_RETRYABLE_ERROR
126+
end
127+
rescue Errno::EBADF
128+
@config.logger.debug("SSE socket is not connected (Errno::EBADF)")
129+
break
130+
rescue RuntimeError
131+
raise 'eof exception'
132+
rescue Exception => e
133+
@config.logger.debug("SSE socket is not connected: #{e.inspect}")
134+
break
102135
end
103136

104137
process_data(partial_data)
105138
end
139+
@config.logger.info("SSE read operation exited: #{connected?}")
140+
106141
nil
107142
end
108143

109144
def socket_write(latch)
110145
@first_event.make_true
111146
@socket = socket_connect
112-
@socket.write(build_request(@uri))
147+
@socket.puts(build_request(@uri))
113148
true
114149
rescue StandardError => e
115150
@config.logger.error("Error during connecting to #{@uri.host}. Error: #{e.inspect}")
@@ -130,6 +165,7 @@ def read_first_event(data, latch)
130165

131166
if response_code == OK_CODE && !error_event
132167
@connected.make_true
168+
@config.logger.debug("SSE client first event Connected is true")
133169
@telemetry_runtime_producer.record_streaming_event(Telemetry::Domain::Constants::SSE_CONNECTION_ESTABLISHED, nil)
134170
push_status(Constants::PUSH_CONNECTED)
135171
end
@@ -138,15 +174,37 @@ def read_first_event(data, latch)
138174
end
139175

140176
def socket_connect
141-
return Socketry::SSL::Socket.connect(@uri.host, @uri.port) if @uri.scheme.casecmp('https').zero?
177+
tcp_socket = TCPSocket.new(@uri.host, @uri.port)
178+
if @uri.scheme.casecmp('https').zero?
179+
begin
180+
ssl_context = OpenSSL::SSL::SSLContext.new
181+
ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context)
182+
ssl_socket.hostname = @uri.host
183+
184+
begin
185+
ssl_socket.connect_nonblock
186+
rescue IO::WaitReadable
187+
IO.select([ssl_socket])
188+
retry
189+
rescue IO::WaitWritable
190+
IO.select(nil, [ssl_socket])
191+
retry
192+
end
193+
return ssl_socket
194+
195+
rescue Exception => e
196+
@config.logger.error("socket connect error: #{e.inspect}")
197+
return nil
198+
end
199+
end
142200

143-
Socketry::TCP::Socket.connect(@uri.host, @uri.port)
201+
tcp_socket
144202
end
145203

146204
def process_data(partial_data)
205+
@config.logger.debug("Event partial data: #{partial_data}")
147206
return if partial_data.nil? || partial_data == KEEP_ALIVE_RESPONSE
148207

149-
@config.logger.debug("Event partial data: #{partial_data}") if @config.debug_enabled
150208
events = @event_parser.parse(partial_data)
151209
events.each { |event| process_event(event) }
152210
rescue StandardError => e
@@ -162,7 +220,7 @@ def build_request(uri)
162220
req << "SplitSDKMachineName: #{@config.machine_name}\r\n"
163221
req << "SplitSDKClientKey: #{@api_key.split(//).last(4).join}\r\n" unless @api_key.nil?
164222
req << "Cache-Control: no-cache\r\n\r\n"
165-
@config.logger.debug("Request info: #{req}") if @config.debug_enabled
223+
@config.logger.debug("Request info: #{req}")
166224
req
167225
end
168226

lib/splitclient-rb/version.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
module SplitIoClient
2-
VERSION = '8.9.0'
2+
VERSION = '8.10.0'
33
end

sonar-project.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@ sonar.projectKey=ruby-client
22
sonar.projectKey=ruby-client
33
sonar.sources=lib
44
sonar.tests=spec
5+
sonar.java.source=17
56
sonar.ruby.coverage.reportPaths=coverage/.resultset.sonarqube.json

spec/sse/event_source/client_spec.rb

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
require 'spec_helper'
44
require 'http_server_mock'
5+
require 'rspec/mocks'
56

67
describe SplitIoClient::SSE::EventSource::Client do
78
subject { SplitIoClient::SSE::EventSource::Client }
@@ -221,6 +222,36 @@
221222
end
222223
end
223224

225+
it 'client timeout and reconnect' do
226+
stub_request(:get, 'https://sdk.split.io/api/splitChanges?s=1.3&since=-1&rbSince=-1')
227+
.with(headers: { 'Authorization' => 'Bearer client-spec-key' })
228+
.to_return(status: 200, body: '{"ff":{"d":[],"s":-1,"t":5564531221}, "rbs":{"d":[],"s":-1,"t":-1}}')
229+
stub_request(:get, 'https://sdk.split.io/api/splitChanges?s=1.3&since=5564531221&rbSince=-1')
230+
.with(headers: { 'Authorization' => 'Bearer client-spec-key' })
231+
.to_return(status: 200, body: '{"ff":{"d":[],"s":5564531221,"t":5564531221}, "rbs":{"d":[],"s":-1,"t":-1}}')
232+
233+
mock_server do |server|
234+
start_workers
235+
server.setup_response('/') do |_, res|
236+
send_stream_content(res, event_split_update)
237+
end
238+
239+
sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue, read_timeout: 0.1)
240+
connected = sse_client.start(server.base_uri)
241+
sleep 1
242+
expect(connected).to eq(true)
243+
expect(sse_client.connected?).to eq(true)
244+
expect(push_status_queue.pop(true)).to eq(SplitIoClient::Constants::PUSH_CONNECTED)
245+
sleep 3
246+
expect(log.string).to include 'SSE read operation timed out, no data available'
247+
expect(sse_client.connected?).to eq(true)
248+
sse_client.close
249+
expect(sse_client.connected?).to eq(false)
250+
251+
stop_workers
252+
end
253+
end
254+
224255
it 'first event - when server return 400' do
225256
mock_server do |server|
226257
server.setup_response('/') do |_, res|
@@ -236,6 +267,95 @@
236267
stop_workers
237268
end
238269
end
270+
271+
it 'test exceptions' do
272+
mock_server do |server|
273+
server.setup_response('/') do |_, res|
274+
send_stream_content(res, event_split_update)
275+
end
276+
start_workers
277+
278+
sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue)
279+
280+
sse_client.instance_variable_set(:@uri, URI(server.base_uri))
281+
latch = Concurrent::CountDownLatch.new(1)
282+
283+
allow(sse_client).to receive(:read_first_event).and_raise(Errno::ETIMEDOUT)
284+
sse_client.send(:connect_stream, latch)
285+
expect(log.string).to include 'SSE read operation timed out!'
286+
287+
allow(sse_client).to receive(:read_first_event).and_raise(EOFError)
288+
expect { sse_client.send(:connect_stream, latch) }.to raise_error(RuntimeError)
289+
expect(log.string).to include 'SSE read operation EOF Exception!'
290+
291+
allow(sse_client).to receive(:read_first_event).and_raise(Errno::EBADF)
292+
sse_client.send(:connect_stream, latch)
293+
expect(log.string).to include 'SSE read operation EBADF or IOError'
294+
295+
allow(sse_client).to receive(:read_first_event).and_raise(IOError)
296+
sse_client.send(:connect_stream, latch)
297+
expect(log.string).to include 'SSE read operation EBADF or IOError'
298+
299+
allow(sse_client).to receive(:read_first_event).and_raise(StandardError)
300+
sse_client.send(:connect_stream, latch)
301+
expect(log.string).to include 'SSE read operation StandardError:'
302+
303+
stop_workers
304+
end
305+
end
306+
307+
it 'test retry with EAGAIN exceptions' do
308+
mock_server do |server|
309+
server.setup_response('/') do |_, res|
310+
send_stream_content(res, event_occupancy)
311+
end
312+
start_workers
313+
314+
sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue)
315+
316+
sse_client.instance_variable_set(:@uri, URI(server.base_uri))
317+
latch = Concurrent::CountDownLatch.new(1)
318+
319+
allow(sse_client).to receive(:read_first_event).and_raise(Errno::EAGAIN)
320+
sleep(1)
321+
thr1 = Thread.new do
322+
sse_client.send(:connect_stream, latch)
323+
end
324+
sleep(1)
325+
allow(sse_client).to receive(:read_first_event).and_return(true)
326+
expect(log.string).to include 'SSE client transient error'
327+
328+
stop_workers
329+
end
330+
end
331+
332+
it 'test retry with IO::WaitReadable exceptions' do
333+
log2 = StringIO.new
334+
config2 = SplitIoClient::SplitConfig.new(logger: Logger.new(log2))
335+
336+
mock_server do |server|
337+
server.setup_response('/') do |_, res|
338+
send_stream_content(res, event_occupancy)
339+
end
340+
start_workers
341+
342+
sse_client2 = subject.new(config2, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue)
343+
344+
sse_client2.instance_variable_set(:@uri, URI(server.base_uri))
345+
latch = Concurrent::CountDownLatch.new(1)
346+
347+
allow(sse_client2).to receive(:read_first_event).and_raise(IO::EWOULDBLOCKWaitReadable)
348+
sleep(1)
349+
thr2 = Thread.new do
350+
sse_client2.send(:connect_stream, latch)
351+
end
352+
sleep(1)
353+
allow(sse_client2).to receive(:read_first_event).and_return(true)
354+
expect(log2.string).to include 'SSE client IO::WaitReadable transient error'
355+
356+
stop_workers
357+
end
358+
end
239359
end
240360

241361
private

spec/sse/sse_handler_spec.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848

4949
config.streaming_service_url = server.base_uri
5050
sse_handler = subject.new(config, splits_worker, segments_worker, sse_client)
51-
5251
connected = sse_handler.start('token-test', 'channel-test')
5352
expect(connected).to eq(true)
5453
expect(sse_handler.connected?).to eq(true)

0 commit comments

Comments
 (0)