diff --git a/google-apis-core/lib/google/apis/core/base_service.rb b/google-apis-core/lib/google/apis/core/base_service.rb index 22ee6553ce4..c19a27fd02b 100644 --- a/google-apis-core/lib/google/apis/core/base_service.rb +++ b/google-apis-core/lib/google/apis/core/base_service.rb @@ -350,6 +350,36 @@ def verify_universe_domain! true end + # Restarts An interrupted Resumable upload + # @param [String] bucket + # Name of the bucket where the upload is being performed. + # @param [IO, String] upload_source + # IO stream or filename containing content to upload + # @param [IO, String] upload_id + # unique id generated for an ongoing upload + + def restart_resumable_upload(bucket, upload_source, upload_id, options: nil) + command = make_storage_upload_command(:put, 'b/{bucket}/o', options) + command.upload_source = upload_source + command.upload_id = upload_id + command.params['bucket'] = bucket unless bucket.nil? + execute_or_queue_command(command) + end + + # Deletes An interrupted Resumable upload + # @param [String] bucket + # Name of the bucket where the upload is being performed. + # @param [IO, String] upload_id + # unique id generated for an ongoing upload + + def delete_resumable_upload(bucket, upload_id, options: nil) + command = make_storage_upload_command(:delete, 'b/{bucket}/o', options) + command.upload_id = upload_id + command.params['bucket'] = bucket unless bucket.nil? + command.delete_upload = options[:delete_upload] unless options[:delete_upload].nil? + execute_or_queue_command(command) + end + protected # Create a new upload command. diff --git a/google-apis-core/lib/google/apis/core/storage_upload.rb b/google-apis-core/lib/google/apis/core/storage_upload.rb index bff1b88f60c..365beaf9e5f 100644 --- a/google-apis-core/lib/google/apis/core/storage_upload.rb +++ b/google-apis-core/lib/google/apis/core/storage_upload.rb @@ -49,6 +49,14 @@ class StorageUploadCommand < ApiCommand # @return [Integer] attr_accessor :upload_chunk_size + # Unique upload_id of a resumable upload + # @return [String] + attr_accessor :upload_id + + # Boolean Value to specify is a resumable upload is to be deleted or not + # @return [Boolean] + attr_accessor :delete_upload + # Ensure the content is readable and wrapped in an IO instance. # # @return [void] @@ -61,7 +69,6 @@ def prepare! # asserting that it already has a body. Form encoding is never used # by upload requests. self.body = '' unless self.body - super if streamable?(upload_source) self.upload_io = upload_source @@ -73,6 +80,8 @@ def prepare! self.upload_content_type = type&.content_type end @close_io_on_finish = true + elsif !upload_id.nil? && delete_upload + @close_io_on_finish = false else fail Google::Apis::ClientError, 'Invalid upload source' end @@ -80,7 +89,7 @@ def prepare! # Close IO stream when command done. Only closes the stream if it was opened by the command. def release! - upload_io.close if @close_io_on_finish + upload_io.close if @close_io_on_finish && !upload_io.nil? end # Execute the command, retrying as necessary @@ -96,8 +105,16 @@ def execute(client) prepare! opencensus_begin_span @upload_chunk_size = options.upload_chunk_size + if upload_id.nil? + res = do_retry :initiate_resumable_upload, client + elsif delete_upload && !upload_id.nil? + construct_resumable_upload_url upload_id + res = do_retry :cancel_resumable_upload, client + else + construct_resumable_upload_url upload_id + res = do_retry :reinitiate_resumable_upload, client + end - do_retry :initiate_resumable_upload, client while @upload_incomplete res = do_retry :send_upload_command, client end @@ -131,6 +148,22 @@ def initiate_resumable_upload(client) error(e, rethrow: true) end + # Reinitiating resumable upload + def reinitiate_resumable_upload(client) + logger.debug { sprintf('Restarting resumable upload command to %s', url) } + check_resumable_upload client + upload_io.pos = @offset + end + + # Making resumable upload url from upload_id + def construct_resumable_upload_url(upload_id) + query_params = query.dup + query_params['uploadType'] = RESUMABLE + query_params['upload_id'] = upload_id + resumable_upload_params = query_params.map { |key, value| "#{key}=#{value}" }.join('&') + @upload_url = "#{url}&#{resumable_upload_params}" + end + # Send the actual content # # @param [HTTPClient] client @@ -160,6 +193,9 @@ def send_upload_command(client) @offset += current_chunk_size if @upload_incomplete success(result) rescue => e + logger.warn { + "error occured please use uploadId-#{response.headers['X-GUploader-UploadID']} to resume your upload" + } unless response.nil? upload_io.pos = @offset error(e, rethrow: true) end @@ -182,6 +218,59 @@ def process_response(status, header, body) super(status, header, body) end + def check_resumable_upload(client) + # Setting up request header + request_header = header.dup + request_header[CONTENT_RANGE_HEADER] = "bytes */#{upload_io.size}" + request_header[CONTENT_LENGTH_HEADER] = '0' + # Initiating call + response = client.put(@upload_url, header: request_header, follow_redirect: true) + handle_resumable_upload_http_response_codes(response) + end + + # Cancel resumable upload + def cancel_resumable_upload(client) + # Setting up request header + request_header = header.dup + request_header[CONTENT_LENGTH_HEADER] = '0' + # Initiating call + response = client.delete(@upload_url, header: request_header, follow_redirect: true) + handle_resumable_upload_http_response_codes(response) + + if !@upload_incomplete && (400..499).include?(response.code.to_i) + @close_io_on_finish = true + true # method returns true if upload is successfully cancelled + else + logger.debug { sprintf("Failed to cancel upload session. Response: #{response.code} - #{response.body}") } + end + + end + + def handle_resumable_upload_http_response_codes(response) + code = response.code.to_i + + case code + when 308 + if response.headers['Range'] + range = response.headers['Range'] + @offset = range.split('-').last.to_i + 1 + logger.debug { sprintf("Upload is incomplete. Bytes uploaded so far: #{range}") } + else + logger.debug { sprintf('No bytes uploaded yet.') } + end + @upload_incomplete = true + when 400..499 + # Upload is canceled + @upload_incomplete = false + when 200, 201 + # Upload is complete. + @upload_incomplete = false + else + logger.debug { sprintf("Unexpected response: #{response.code} - #{response.body}") } + @upload_incomplete = true + end + end + def streamable?(upload_source) upload_source.is_a?(IO) || upload_source.is_a?(StringIO) || upload_source.is_a?(Tempfile) end diff --git a/google-apis-core/lib/google/apis/options.rb b/google-apis-core/lib/google/apis/options.rb index b5a0c99f4d3..f1ac437c9ed 100644 --- a/google-apis-core/lib/google/apis/options.rb +++ b/google-apis-core/lib/google/apis/options.rb @@ -41,7 +41,8 @@ module Apis :quota_project, :query, :add_invocation_id_header, - :upload_chunk_size) + :upload_chunk_size + ) # General client options class ClientOptions diff --git a/google-apis-core/spec/google/apis/core/service_spec.rb b/google-apis-core/spec/google/apis/core/service_spec.rb index 53167a12700..0710d30a0d8 100644 --- a/google-apis-core/spec/google/apis/core/service_spec.rb +++ b/google-apis-core/spec/google/apis/core/service_spec.rb @@ -225,6 +225,95 @@ include_examples 'with options' end + context 'when making restart resumable upload' do + let(:bucket_name) { 'test_bucket' } + let(:file) { StringIO.new('Hello world' * 3) } + + let(:upload_id) { 'foo' } + let(:command) do + service.send( + :restart_resumable_upload, + bucket_name, file, upload_id, + options: { upload_chunk_size: 11} + ) + end + let(:upload_url) { "https://www.googleapis.com/upload/b/#{bucket_name}/o?uploadType=resumable&upload_id=#{upload_id}"} + context 'should complete the upload' do + before(:example) do + stub_request(:put, upload_url) + .with( + headers: { + 'Content-Length' => '0', + 'Content-Range' => 'bytes */33' + } + ) + .to_return( + status: [308, 'Resume Incomplete'], + headers: { 'Range' => 'bytes=0-21' } + ) + end + + before(:example) do + stub_request(:put, upload_url) + .with(headers: { 'Content-Range' => 'bytes 22-32/33' }) + .to_return(body: %(OK)) + end + + it 'should send request to upload url multiple times' do + command + expect(a_request(:put, upload_url)).to have_been_made.twice + end + end + context 'not restart resumable upload if upload is completed' do + before(:example) do + stub_request(:put, upload_url) + .with( + headers: { + 'Content-Length' => '0', + 'Content-Range' => 'bytes */33' + } + ) + .to_return(status: 200, headers: { 'Range' => 'bytes=0-32' }) + end + + before(:example) do + stub_request(:put, upload_url) + .with(headers: { 'Content-Range' => 'bytes */33' }) + .to_return(status: 200) + end + + it 'should not restart a upload' do + command + expect(a_request(:put, upload_url)).to have_been_made + end + end + end + + context 'delete resumable upload with upload_id' do + let(:bucket_name) { 'test_bucket' } + let(:upload_id) { 'foo' } + let(:command) do + service.send( + :delete_resumable_upload, + bucket_name, upload_id, + options: { upload_chunk_size: 11, delete_upload: true } + ) + end + + let(:upload_url) { "https://www.googleapis.com/upload/b/#{bucket_name}/o?uploadType=resumable&upload_id=#{upload_id}" } + before(:example) do + stub_request(:delete, upload_url) + .with(headers: { 'Content-Length' => '0' }) + .to_return(status: [499]) + end + + it 'should cancel a resumable upload' do + command + expect(a_request(:delete, upload_url)).to have_been_made + expect(command).to be_truthy + end + end + context 'with batch' do before(:example) do response = < 'bytes 11-21/22' }) .to_return(body: %(OK)) - end + end it 'should make requests multiple times' do command.options.upload_chunk_size = 11 @@ -129,6 +129,114 @@ end end + context('restart resumable upload with upload_url') do + let(:file) { StringIO.new('Hello world' * 3) } + let(:upload_id) { 'TestId' } + let(:upload_url) { "https://www.googleapis.com/zoo/animals?uploadType=resumable&upload_id=#{upload_id}" } + + before(:example) do + stub_request(:put, upload_url) + .with( + headers: { + 'Content-Length' => '0', + 'Content-Range' => 'bytes */33' + } + ) + .to_return( + status: [308, 'Resume Incomplete'], + headers: { 'Range' => 'bytes=0-21' } + ) + end + + before(:example) do + stub_request(:put, upload_url) + .with(headers: { 'Content-Range' => 'bytes 22-32/33' }) + .to_return(body: %(OK)) + end + + it 'should restart a resumable upload' do + command.options.upload_chunk_size = 11 + command.upload_id = upload_id + command.execute(client) + expect(a_request(:put, upload_url) + .with(body: 'Hello world')).to have_been_made + end + end + + context('should not restart resumable upload if upload is completed') do + let(:file) { StringIO.new('Hello world' * 3) } + let(:upload_id) { 'TestId' } + let(:upload_url) { "https://www.googleapis.com/zoo/animals?uploadType=resumable&upload_id=#{upload_id}" } + + before(:example) do + stub_request(:put, upload_url) + .with( + headers: { + 'Content-Length' => '0', + 'Content-Range' => 'bytes */33' + } + ) + .to_return(status: 200, headers: { 'Range' => 'bytes=0-32' }) + end + + before(:example) do + stub_request(:put, upload_url) + .with(headers: { 'Content-Range' => 'bytes */33' }) + .to_return(status: 200) + end + + it 'should not restart a upload' do + command.options.upload_chunk_size = 11 + command.upload_id = upload_id + + command.execute(client) + expect(a_request(:put, upload_url) + .with(body: 'Hello world')).to have_not_been_made + end + end + + context('delete resumable upload with upload_id') do + let(:file) { StringIO.new('Hello world' * 3) } + let(:upload_id) { 'TestId' } + let(:upload_url) { "https://www.googleapis.com/zoo/animals?uploadType=resumable&upload_id=#{upload_id}" } + + + before(:example) do + stub_request(:delete, upload_url) + .with(headers: { 'Content-Length' => '0' }) + .to_return(status: [499]) + end + + before(:example) do + stub_request(:put, upload_url) + .with( + headers: { + 'Content-Length' => '0', + 'Content-Range' => 'bytes */33' + } + ) + .to_return(status: [499]) + end + + it 'should cancel a resumable upload' do + command.options.upload_chunk_size = 11 + command.upload_id = upload_id + command.delete_upload = true + command.execute(client) + expect(a_request(:delete, upload_url)).to have_been_made + expect(command).to be_truthy + end + + it 'should not call resumable upload when upload is cancelled' do + command.options.upload_chunk_size = 11 + command.upload_id = upload_id + command.delete_upload = true + command.execute(client) + expect(a_request(:put, upload_url) + .with(body: 'Hello world')).to have_not_been_made + end + end + context('with chunking disabled') do let!(:file) { StringIO.new("Hello world")} include_examples 'should upload'