From 767c59d7aaa1ca35b8f343f6348418de930312a4 Mon Sep 17 00:00:00 2001
From: Joseph Lewis III
Date: Fri, 7 Sep 2018 09:26:11 -0700
Subject: [PATCH] Only keep failed rows (#40)

* add ability to keep only failed rows

* cleanup
---
 CHANGELOG.md                                  |  5 ++
 docs/index.asciidoc                           | 15 +++++
 lib/logstash/outputs/bigquery/streamclient.rb | 67 ++++++++++++++-----
 lib/logstash/outputs/google_bigquery.rb       |  8 ++-
 logstash-output-google_bigquery.gemspec       |  2 +-
 spec/fixtures/credentials.json                |  8 +++
 spec/outputs/bigquery/streamclient_spec.rb    | 18 +++++
 spec/outputs/google_bigquery_spec.rb          |  4 +-
 8 files changed, 105 insertions(+), 22 deletions(-)
 create mode 100644 spec/fixtures/credentials.json
 create mode 100644 spec/outputs/bigquery/streamclient_spec.rb

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2e8bd52..5b1251e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 4.1.0
+  - Added the `skip_invalid_rows` configuration option, which inserts all valid rows of a BigQuery request
+    and skips any invalid ones.
+  - Fixes [#5](https://github.com/logstash-plugins/logstash-output-google_bigquery/issues/5)
+
 ## 4.0.1
   - Documentation cleanup

diff --git a/docs/index.asciidoc b/docs/index.asciidoc
index a001a7b..a8920dc 100644
--- a/docs/index.asciidoc
+++ b/docs/index.asciidoc
@@ -99,6 +99,7 @@ This plugin supports the following configuration options plus the <>
 | <> |<>|*Obsolete*
 | <> |<>|Yes
 | <> |<>|__Deprecated__
+| <<plugins-{type}s-{plugin}-skip_invalid_rows>> |<<boolean,boolean>>|No
 | <> |<>|No
 | <> |<>|No
 | <> |<>|__Deprecated__
@@ -174,6 +175,9 @@ added[4.0.0]
 * Default value is `"/tmp/bigquery"`.
 
 The location to store events that could not be uploaded due to errors.
+By default, if _any_ message in an insert is invalid, the whole insert will fail.
+You can use <<plugins-{type}s-{plugin}-skip_invalid_rows>> to allow partial inserts.
+
 Consider using an additional Logstash input to pipe the contents of
 these to an alert platform so you can manually fix the events.
 
@@ -298,6 +302,17 @@ deprecated[4.0.0, Replaced by `json_key_file` or by using ADC. See <>
 
+[id="plugins-{type}s-{plugin}-skip_invalid_rows"]
+===== `skip_invalid_rows`
+
+added[4.1.0]
+
+ * Value type is <<boolean,boolean>>
+ * Default value is `false`
+
+Insert all valid rows of a request, even if invalid rows exist.
+The default value is `false`, which causes the entire request to fail if any invalid rows exist.
+
 [id="plugins-{type}s-{plugin}-table_prefix"]
 ===== `table_prefix`
 
diff --git a/lib/logstash/outputs/bigquery/streamclient.rb b/lib/logstash/outputs/bigquery/streamclient.rb
index 5d46b26..01b4ee8 100644
--- a/lib/logstash/outputs/bigquery/streamclient.rb
+++ b/lib/logstash/outputs/bigquery/streamclient.rb
@@ -36,18 +36,20 @@ def create_table(dataset, table, schema)
        @bigquery.create table_info
      end
 
-      def append(dataset, table, rows, ignore_unknown)
+      def append(dataset, table, rows, ignore_unknown, skip_invalid)
        api_debug("Appending #{rows.length} rows", dataset, table)
 
-        request = build_append_request dataset, table, rows, ignore_unknown
+        request = build_append_request(dataset, table, rows, ignore_unknown, skip_invalid)
 
        response = @bigquery.insertAll request
-        return true unless response.hasErrors
+        return [] unless response.hasErrors
 
+        failed_rows = []
        response.getInsertErrors().entrySet().each{ |entry|
          key = entry.getKey
-          errors = entry.getValue
+          failed_rows << rows[key]
 
+          errors = entry.getValue
          errors.each{|bqError|
            @logger.warn('Error while inserting',
                         key: key,
@@ -57,12 +59,13 @@ def append(dataset, table, rows, ignore_unknown)
          }
        }
 
-        false
+        failed_rows
      end
 
-      def build_append_request(dataset, table, rows, ignore_unknown)
+      def build_append_request(dataset, table, rows, ignore_unknown, skip_invalid)
        request = com.google.cloud.bigquery.InsertAllRequest.newBuilder dataset, table
        request.setIgnoreUnknownValues ignore_unknown
+        request.setSkipInvalidRows(skip_invalid)
 
        rows.each { |serialized_row|
          # deserialize rows into Java maps
@@ -75,7 +78,7 @@
 
      # raises an exception if the key file is invalid
      def get_key_file_error(json_key_file)
-        return nil if json_key_file.nil? || json_key_file == ''
+        return nil if nil_or_empty?(json_key_file)
 
        abs = ::File.absolute_path json_key_file
        unless abs == json_key_file
@@ -94,16 +97,10 @@ def initialize_google_client(json_key_file, project_id)
        err = get_key_file_error json_key_file
        raise err unless err.nil?
 
-        if json_key_file.nil? || json_key_file.empty?
-          return com.google.cloud.bigquery.BigQueryOptions.getDefaultInstance().getService()
-        end
-
-        # TODO: set User-Agent
-
-        key_file = java.io.FileInputStream.new json_key_file
-        credentials = com.google.auth.oauth2.ServiceAccountCredentials.fromStream key_file
-        return com.google.cloud.bigquery.BigQueryOptions.newBuilder()
-            .setCredentials(credentials)
+        com.google.cloud.bigquery.BigQueryOptions.newBuilder()
+            .setCredentials(credentials(json_key_file))
+            .setHeaderProvider(http_headers)
+            .setRetrySettings(retry_settings)
            .setProjectId(project_id)
            .build()
            .getService()
@@ -111,9 +108,45 @@
 
      private
 
+      java_import 'com.google.auth.oauth2.GoogleCredentials'
+      def credentials(json_key_path)
+        return GoogleCredentials.getApplicationDefault() if nil_or_empty?(json_key_path)
+
+        key_file = java.io.FileInputStream.new(json_key_path)
+        GoogleCredentials.fromStream(key_file)
+      end
+
+      java_import 'com.google.api.gax.rpc.FixedHeaderProvider'
+      def http_headers
+        gem_name = 'logstash-output-google_bigquery'
+        gem_version = '4.1.0'
+        user_agent = "Elastic/#{gem_name} version/#{gem_version}"
+
+        FixedHeaderProvider.create({ 'User-Agent' => user_agent })
+      end
+
+      java_import 'com.google.api.gax.retrying.RetrySettings'
+      java_import 'org.threeten.bp.Duration'
+      def retry_settings
+        # backoff values taken from com.google.api.client.util.ExponentialBackOff
+        RetrySettings.newBuilder()
+            .setInitialRetryDelay(Duration.ofMillis(500))
+            .setRetryDelayMultiplier(1.5)
+            .setMaxRetryDelay(Duration.ofSeconds(60))
+            .setInitialRpcTimeout(Duration.ofSeconds(20))
+            .setRpcTimeoutMultiplier(1.5)
+            .setMaxRpcTimeout(Duration.ofSeconds(20))
+            .setTotalTimeout(Duration.ofMinutes(15))
+            .build()
+      end
+
      def api_debug(message, dataset, table)
        @logger.debug(message, dataset: dataset, table: table)
      end
+
+      def nil_or_empty?(param)
+        param.nil? || param.empty?
+      end
    end
  end
 end
diff --git a/lib/logstash/outputs/google_bigquery.rb b/lib/logstash/outputs/google_bigquery.rb
index de5a6a5..dec0f45 100644
--- a/lib/logstash/outputs/google_bigquery.rb
+++ b/lib/logstash/outputs/google_bigquery.rb
@@ -160,6 +160,10 @@ class LogStash::Outputs::GoogleBigQuery < LogStash::Outputs::Base
  # Files names follow the pattern `[table name]-[UNIX timestamp].log`
  config :error_directory, validate: :string, required: true, default: '/tmp/bigquery_errors'
 
+  # Insert all valid rows of a request, even if invalid rows exist. The default value is false,
+  # which causes the entire request to fail if any invalid rows exist.
+  config :skip_invalid_rows, validate: :boolean, default: false
+
  # The following configuration options still exist to alert users that are using them
  config :uploader_interval_secs, validate: :number, deprecated: 'No longer used.'
  config :deleter_interval_secs, validate: :number, deprecated: 'No longer used.'
@@ -232,8 +236,8 @@ def publish(messages)
 
    create_table_if_not_exists table
 
-    successful = @bq_client.append @dataset, table, messages, @ignore_unknown_values
-    write_to_errors_file(messages, table) unless successful
+    failed_rows = @bq_client.append(@dataset, table, messages, @ignore_unknown_values, @skip_invalid_rows)
+    write_to_errors_file(failed_rows, table) unless failed_rows.empty?
  rescue StandardError => e
    @logger.error 'Error uploading data.', :exception => e
 
diff --git a/logstash-output-google_bigquery.gemspec b/logstash-output-google_bigquery.gemspec
index f12b3f2..ea17c92 100644
--- a/logstash-output-google_bigquery.gemspec
+++ b/logstash-output-google_bigquery.gemspec
@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name        = 'logstash-output-google_bigquery'
-  s.version     = '4.0.1'
+  s.version     = '4.1.0'
   s.licenses    = ['Apache License (2.0)']
   s.summary     = "Writes events to Google BigQuery"
   s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
diff --git a/spec/fixtures/credentials.json b/spec/fixtures/credentials.json
new file mode 100644
index 0000000..5488bf4
--- /dev/null
+++ b/spec/fixtures/credentials.json
@@ -0,0 +1,8 @@
+{
+  "//": "Dummy account from https://github.com/GoogleCloudPlatform/google-cloud-java/google-cloud-clients/google-cloud-core/src/test/java/com/google/cloud/ServiceOptionsTest.java",
+  "private_key_id": "somekeyid",
+  "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC+K2hSuFpAdrJI\nnCgcDz2M7t7bjdlsadsasad+fvRSW6TjNQZ3p5LLQY1kSZRqBqylRkzteMOyHgaR\n0Pmxh3ILCND5men43j3h4eDbrhQBuxfEMalkG92sL+PNQSETY2tnvXryOvmBRwa/\nQP/9dJfIkIDJ9Fw9N4Bhhhp6mCcRpdQjV38H7JsyJ7lih/oNjECgYAt\nknddadwkwewcVxHFhcZJO+XWf6ofLUXpRwiTZakGMn8EE1uVa2LgczOjwWHGi99MFjxSer5m9\n1tCa3/KEGKiS/YL71JvjwX3mb+cewlkcmweBKZHM2JPTk0ZednFSpVZMtycjkbLa\ndYOS8V85AgMBewECggEBAKksaldajfDZDV6nGqbFjMiizAKJolr/M3OQw16K6o3/\n0S31xIe3sSlgW0+UbYlF4U8KifhManD1apVSC3csafaspP4RZUHFhtBywLO9pR5c\nr6S5aLp+gPWFyIp1pfXbWGvc5VY/v9x7ya1VEa6rXvLsKupSeWAW4tMj3eo/64ge\nsdaceaLYw52KeBYiT6+vpsnYrEkAHO1fF/LavbLLOFJmFTMxmsNaG0tuiJHgjshB\n82DpMCbXG9YcCgI/DbzuIjsdj2JC1cascSP//3PmefWysucBQe7Jryb6NQtASmnv\nCdDw/0jmZTEjpe4S1lxfHplAhHFtdgYTvyYtaLZiVVkCgYEA8eVpof2rceecw/I6\n5ng1q3Hl2usdWV/4mZMvR0fOemacLLfocX6IYxT1zA1FFJlbXSRsJMf/Qq39mOR2\nSpW+hr4jCoHeRVYLgsbggtrevGmILAlNoqCMpGZ6vDmJpq6ECV9olliDvpPgWOP+\nmYPDreFBGxWvQrADNbRt2dmGsrsCgYEAyUHqB2wvJHFqdmeBsaacewzV8x9WgmeX\ngUIi9REwXlGDW0Mz50dxpxcKCAYn65+7TCnY5O/jmL0VRxU1J2mSWyWTo1C+17L0\n3fUqjxL1pkefwecxwecvC+gFFYdJ4CQ/MHHXU81Lwl1iWdFCd2UoGddYaOF+KNeM\nHC7cmqra+JsCgYEAlUNywzq8nUg7282E+uICfCB0LfwejuymR93CtsFgb7cRd6ak\nECR8FGfCpH8ruWJINllbQfcHVCX47ndLZwqv3oVFKh6pAS/vVI4dpOepP8++7y1u\ncoOvtreXCX6XqfrWDtKIvv0vjlHBhhhp6mCcRpdQjV38H7JsyJ7lih/oNjECgYAt\nkndj5uNl5SiuVxHFhcZJO+XWf6ofLUregtevZakGMn8EE1uVa2AY7eafmoU/nZPT\n00YB0TBATdCbn/nBSuKDESkhSg9s2GEKQZG5hBmL5uCMfo09z3SfxZIhJdlerreP\nJ7gSidI12N+EZxYd4xIJh/HFDgp7RRO87f+WJkofMQKBgGTnClK1VMaCRbJZPriw\nEfeFCoOX75MxKwXs6xgrw4W//AYGGUjDt83lD6AZP6tws7gJ2IwY/qP7+lyhjEqN\nHtfPZRGFkGZsdaksdlaksd323423d+15/UvrlRSFPNj1tWQmNKkXyRDW4IG1Oa2p\nrALStNBx5Y9t0/LQnFI4w3aG\n-----END PRIVATE KEY-----\n",
+  "client_email": "someclientid@developer.gserviceaccount.com",
+  "client_id": "someclientid.apps.googleusercontent.com",
+  "type": "service_account"
+}
diff --git a/spec/outputs/bigquery/streamclient_spec.rb b/spec/outputs/bigquery/streamclient_spec.rb
new file mode 100644
index 0000000..1b1640e
--- /dev/null
+++ b/spec/outputs/bigquery/streamclient_spec.rb
@@ -0,0 +1,18 @@
+# encoding: utf-8
+
+require 'logstash/outputs/bigquery/streamclient'
+
+describe LogStash::Outputs::BigQuery::StreamingClient do
+
+  # This test is mostly to make sure the Java types, signatures and classes
+  # haven't changed being that JRuby is very relaxed.
+  describe '#initialize' do
+    let(:logger) { spy('logger') }
+
+    it 'does not throw an error when initializing' do
+      key_file = ::File.join('spec', 'fixtures', 'credentials.json')
+      key_file = ::File.absolute_path(key_file)
+      LogStash::Outputs::BigQuery::StreamingClient.new(key_file, 'my-project', logger)
+    end
+  end
+end
diff --git a/spec/outputs/google_bigquery_spec.rb b/spec/outputs/google_bigquery_spec.rb
index c97f784..b5d957c 100644
--- a/spec/outputs/google_bigquery_spec.rb
+++ b/spec/outputs/google_bigquery_spec.rb
@@ -74,7 +74,7 @@
 
  it 'creates a table if it does not exist' do
    allow(subject).to receive(:create_table_if_not_exists).and_return(nil)
-    allow(bq_client).to receive(:append).and_return(true)
+    allow(bq_client).to receive(:append).and_return([])
    allow(subject).to receive(:write_to_errors_file).and_return(nil)
 
    expect(subject).to receive(:create_table_if_not_exists)
@@ -83,7 +83,7 @@
 
  it 'writes rows to a file on failed insert' do
    allow(subject).to receive(:create_table_if_not_exists).and_return(nil)
-    allow(bq_client).to receive(:append).and_return(false)
+    allow(bq_client).to receive(:append).and_return([0])
    allow(subject).to receive(:write_to_errors_file).and_return(nil)
 
    expect(subject).to receive(:write_to_errors_file)
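
Usage sketch: a minimal pipeline configuration exercising the new option might look like the following, assuming the plugin's usual 4.x settings (`project_id`, `dataset`, `csv_schema`, `json_key_file`, `error_directory`); every value here is a placeholder rather than something taken from this patch.

    output {
      google_bigquery {
        project_id        => "my-project"                               # placeholder
        dataset           => "logs"                                     # placeholder
        csv_schema        => "path:STRING,status:INTEGER,score:FLOAT"   # placeholder schema
        json_key_file     => "/path/to/key.json"                        # placeholder; omit to use ADC
        error_directory   => "/tmp/bigquery_errors"
        skip_invalid_rows => true                                       # new in 4.1.0
      }
    }

With `skip_invalid_rows => true`, `append` returns only the rows BigQuery rejected, so `publish` writes just those rows to `error_directory` instead of the entire batch.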