diff --git a/lib/splitclient-rb/sse/event_source/client.rb b/lib/splitclient-rb/sse/event_source/client.rb index 022894d6..dacee06f 100644 --- a/lib/splitclient-rb/sse/event_source/client.rb +++ b/lib/splitclient-rb/sse/event_source/client.rb @@ -1,7 +1,9 @@ # frozen_string_literal: false -require 'socketry' +require 'socket' +require 'openssl' require 'uri' +require 'timeout' module SplitIoClient module SSE @@ -36,12 +38,12 @@ def initialize(config, def close(status = nil) unless connected? - @config.logger.error('SSEClient already disconected.') if @config.debug_enabled + @config.logger.debug('SSEClient already disconected.') return end @connected.make_false - @socket&.close + @socket.close push_status(status) rescue StandardError => e @config.logger.error("SSEClient close Error: #{e.inspect}") @@ -74,42 +76,63 @@ def connected? def connect_thread(latch) @config.threads[:connect_stream] = Thread.new do - @config.logger.info('Starting connect_stream thread ...') if @config.debug_enabled + @config.logger.info('Starting connect_stream thread ...') new_status = connect_stream(latch) push_status(new_status) - @config.logger.info('connect_stream thread finished.') if @config.debug_enabled + @config.logger.info('connect_stream thread finished.') end end def connect_stream(latch) return Constants::PUSH_NONRETRYABLE_ERROR unless socket_write(latch) - while connected? || @first_event.value - begin - partial_data = @socket.readpartial(10_000, timeout: @read_timeout) - - read_first_event(partial_data, latch) - - raise 'eof exception' if partial_data == :eof - rescue Errno::EBADF, IOError => e - @config.logger.error(e.inspect) if @config.debug_enabled - return nil - rescue StandardError => e - return nil if ENV['SPLITCLIENT_ENV'] == 'test' - - @config.logger.error("Error reading partial data: #{e.inspect}") if @config.debug_enabled + if IO.select([@socket], nil, nil, @read_timeout) + begin + partial_data = @socket.readpartial(10_000) + read_first_event(partial_data, latch) + + raise 'eof exception' if partial_data == :eof + rescue IO::WaitReadable => e + @config.logger.debug("SSE client transient error: #{e.inspect}") + IO.select([@socket], nil, nil, @read_timeout) + retry + rescue Errno::ETIMEDOUT => e + @config.logger.error("SSE read operation timed out!: #{e.inspect}") + return Constants::PUSH_RETRYABLE_ERROR + rescue EOFError => e + @config.logger.error("SSE read operation EOF Exception!: #{e.inspect}") + raise 'eof exception' + rescue Errno::EAGAIN => e + puts "transient error" + @config.logger.debug("SSE client transient error: #{e.inspect}") + IO.select([@socket], nil, nil, @read_timeout) + retry + rescue Errno::EBADF, IOError => e + @config.logger.error("SSE read operation EBADF or IOError: #{e.inspect}") + return nil + rescue StandardError => e + @config.logger.error("SSE read operation StandardError: #{e.inspect}") + return nil if ENV['SPLITCLIENT_ENV'] == 'test' + + @config.logger.error("Error reading partial data: #{e.inspect}") + return Constants::PUSH_RETRYABLE_ERROR + end + else + @config.logger.error("SSE read operation timed out, no data available.") return Constants::PUSH_RETRYABLE_ERROR end process_data(partial_data) end + @config.logger.info("SSE read operation exited: #{connected?}") + nil end def socket_write(latch) @first_event.make_true @socket = socket_connect - @socket.write(build_request(@uri)) + @socket.puts(build_request(@uri)) true rescue StandardError => e @config.logger.error("Error during connecting to #{@uri.host}. Error: #{e.inspect}") @@ -130,6 +153,7 @@ def read_first_event(data, latch) if response_code == OK_CODE && !error_event @connected.make_true + @config.logger.debug("SSE client first event Connected is true") @telemetry_runtime_producer.record_streaming_event(Telemetry::Domain::Constants::SSE_CONNECTION_ESTABLISHED, nil) push_status(Constants::PUSH_CONNECTED) end @@ -138,15 +162,37 @@ def read_first_event(data, latch) end def socket_connect - return Socketry::SSL::Socket.connect(@uri.host, @uri.port) if @uri.scheme.casecmp('https').zero? + tcp_socket = TCPSocket.new(@uri.host, @uri.port) + if @uri.scheme.casecmp('https').zero? + begin + ssl_context = OpenSSL::SSL::SSLContext.new + ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context) + ssl_socket.hostname = @uri.host + + begin + ssl_socket.connect_nonblock + rescue IO::WaitReadable + IO.select([ssl_socket]) + retry + rescue IO::WaitWritable + IO.select(nil, [ssl_socket]) + retry + end + return ssl_socket + + rescue Exception => e + @config.logger.error("socket connect error: #{e.inspect}") + return nil + end + end - Socketry::TCP::Socket.connect(@uri.host, @uri.port) + tcp_socket end def process_data(partial_data) + @config.logger.debug("Event partial data: #{partial_data}") return if partial_data.nil? || partial_data == KEEP_ALIVE_RESPONSE - @config.logger.debug("Event partial data: #{partial_data}") if @config.debug_enabled events = @event_parser.parse(partial_data) events.each { |event| process_event(event) } rescue StandardError => e @@ -162,7 +208,7 @@ def build_request(uri) req << "SplitSDKMachineName: #{@config.machine_name}\r\n" req << "SplitSDKClientKey: #{@api_key.split(//).last(4).join}\r\n" unless @api_key.nil? req << "Cache-Control: no-cache\r\n\r\n" - @config.logger.debug("Request info: #{req}") if @config.debug_enabled + @config.logger.debug("Request info: #{req}") req end diff --git a/lib/splitclient-rb/version.rb b/lib/splitclient-rb/version.rb index 1afbdd0a..015056f0 100644 --- a/lib/splitclient-rb/version.rb +++ b/lib/splitclient-rb/version.rb @@ -1,3 +1,3 @@ module SplitIoClient - VERSION = '8.9.0' + VERSION = '8.10.0-rc5' end diff --git a/spec/sse/event_source/client_spec.rb b/spec/sse/event_source/client_spec.rb index 1c199b09..9498632c 100644 --- a/spec/sse/event_source/client_spec.rb +++ b/spec/sse/event_source/client_spec.rb @@ -2,6 +2,7 @@ require 'spec_helper' require 'http_server_mock' +require 'rspec/mocks' describe SplitIoClient::SSE::EventSource::Client do subject { SplitIoClient::SSE::EventSource::Client } @@ -221,6 +222,36 @@ end end + it 'client timeout and reconnect' do + stub_request(:get, 'https://sdk.split.io/api/splitChanges?s=1.3&since=-1&rbSince=-1') + .with(headers: { 'Authorization' => 'Bearer client-spec-key' }) + .to_return(status: 200, body: '{"ff":{"d":[],"s":-1,"t":5564531221}, "rbs":{"d":[],"s":-1,"t":-1}}') + stub_request(:get, 'https://sdk.split.io/api/splitChanges?s=1.3&since=5564531221&rbSince=-1') + .with(headers: { 'Authorization' => 'Bearer client-spec-key' }) + .to_return(status: 200, body: '{"ff":{"d":[],"s":5564531221,"t":5564531221}, "rbs":{"d":[],"s":-1,"t":-1}}') + + mock_server do |server| + start_workers + server.setup_response('/') do |_, res| + send_stream_content(res, event_split_update) + end + + sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue, read_timeout: 0.1) + connected = sse_client.start(server.base_uri) + sleep 1 + expect(connected).to eq(true) + expect(sse_client.connected?).to eq(true) + expect(push_status_queue.pop(true)).to eq(SplitIoClient::Constants::PUSH_CONNECTED) + sleep 3 + expect(log.string).to include 'SSE read operation timed out, no data available' + expect(sse_client.connected?).to eq(true) + sse_client.close + expect(sse_client.connected?).to eq(false) + + stop_workers + end + end + it 'first event - when server return 400' do mock_server do |server| server.setup_response('/') do |_, res| @@ -236,6 +267,72 @@ stop_workers end end + + it 'test exceptions' do + mock_server do |server| + server.setup_response('/') do |_, res| + send_stream_content(res, event_split_update) + end + start_workers + + sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue) + + sse_client.instance_variable_set(:@uri, URI(server.base_uri)) + latch = Concurrent::CountDownLatch.new(1) + + allow(sse_client).to receive(:read_first_event).and_raise(Errno::ETIMEDOUT) + sse_client.send(:connect_stream, latch) + expect(log.string).to include 'SSE read operation timed out!' + + allow(sse_client).to receive(:read_first_event).and_raise(EOFError) + expect { sse_client.send(:connect_stream, latch) }.to raise_error(RuntimeError) + expect(log.string).to include 'SSE read operation EOF Exception!' + + allow(sse_client).to receive(:read_first_event).and_raise(Errno::EBADF) + sse_client.send(:connect_stream, latch) + expect(log.string).to include 'SSE read operation EBADF or IOError' + + allow(sse_client).to receive(:read_first_event).and_raise(IOError) + sse_client.send(:connect_stream, latch) + expect(log.string).to include 'SSE read operation EBADF or IOError' + + allow(sse_client).to receive(:read_first_event).and_raise(StandardError) + sse_client.send(:connect_stream, latch) + expect(log.string).to include 'SSE read operation StandardError:' + + stop_workers + end + end + + it 'test retry with EAGAIN and IO::WaitReadable exceptions' do + mock_server do |server| + server.setup_response('/') do |_, res| + send_stream_content(res, event_split_update) + end + start_workers + + sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue) + + sse_client.instance_variable_set(:@uri, URI(server.base_uri)) + latch = Concurrent::CountDownLatch.new(1) + + allow(sse_client).to receive(:read_first_event).and_raise(Errno::EAGAIN) + thr1 = Thread.new do + sse_client.send(:connect_stream, latch) + end + allow(sse_client).to receive(:read_first_event).and_return(true) + expect(thr1.status).to eq('run') + + allow(sse_client).to receive(:read_first_event).and_raise(IO::WaitReadable) + thr2 = Thread.new do + sse_client.send(:connect_stream, latch) + end + allow(sse_client).to receive(:read_first_event).and_return(true) + expect(thr2.status).to eq('run') + + stop_workers + end + end end private diff --git a/splitclient-rb.gemspec b/splitclient-rb.gemspec index 3c818df9..742c6d3f 100644 --- a/splitclient-rb.gemspec +++ b/splitclient-rb.gemspec @@ -59,6 +59,5 @@ Gem::Specification.new do |spec| spec.add_runtime_dependency 'lru_redux', '~> 1.1' spec.add_runtime_dependency 'net-http-persistent', '>= 2.9', '< 5.0' spec.add_runtime_dependency 'redis', '>= 4.0.0', '< 6.0' - spec.add_runtime_dependency 'socketry', '>= 0.4', '< 1.0' spec.add_runtime_dependency 'thread_safe', '~> 0.3' end