
Only keep failed rows (#40)
* add ability to keep only failed rows
* cleanup
josephlewis42 authored Sep 7, 2018
1 parent bd68c15 commit 767c59d
Showing 8 changed files with 105 additions and 22 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,8 @@
## 4.1.0
- Added `skip_invalid_rows` configuration which will insert all valid rows of a BigQuery insert
and skip any invalid ones.
- Fixes [#5](https://github.com/logstash-plugins/logstash-output-google_bigquery/issues/5)

## 4.0.1
- Documentation cleanup

15 changes: 15 additions & 0 deletions docs/index.asciidoc
@@ -99,6 +99,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-key_path>> |<<string,string>>|*Obsolete*
| <<plugins-{type}s-{plugin}-project_id>> |<<string,string>>|Yes
| <<plugins-{type}s-{plugin}-service_account>> |<<string,string>>|__Deprecated__
| <<plugins-{type}s-{plugin}-skip_invalid_rows>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-table_prefix>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-table_separator>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-temp_directory>> |<<string,string>>|__Deprecated__
@@ -174,6 +175,9 @@ added[4.0.0]
* Default value is `"/tmp/bigquery"`.

The location to store events that could not be uploaded due to errors.
By default, if _any_ message in an insert is invalid, the entire insert will fail.
You can use <<plugins-{type}s-{plugin}-skip_invalid_rows>> to allow partial inserts.

Consider using an additional Logstash input to pipe the contents of
these to an alert platform so you can manually fix the events.
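
A hedged sketch of that suggestion (the paths and plugin options here are illustrative, not part of this commit): a second pipeline that tails the error files and forwards their contents for manual review.

```text
input {
  file {
    # error_directory default from this plugin; files are named
    # [table name]-[UNIX timestamp].log
    path => "/tmp/bigquery_errors/*.log"
  }
}
output {
  # forward to the alerting platform of your choice
  stdout { codec => rubydebug }
}
```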

@@ -298,6 +302,17 @@ deprecated[4.0.0, Replaced by `json_key_file` or by using ADC. See <<plugins-{ty

* Value type is <<string,string>>

[id="plugins-{type}s-{plugin}-skip_invalid_rows"]
===== `skip_invalid_rows`

added[4.1.0]

* Value type is <<boolean,boolean>>
* Default value is `false`

Insert all valid rows of a request, even if invalid rows exist.
The default value is `false`, which causes the entire request to fail if any invalid rows exist.
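
A minimal, illustrative configuration enabling the option (option values are placeholders, and other required settings such as the schema are omitted):

```text
output {
  google_bigquery {
    project_id        => "my-project"        # placeholder
    dataset           => "logs"              # placeholder
    json_key_file     => "/path/to/key.json" # placeholder
    skip_invalid_rows => true  # insert valid rows even if some are rejected
  }
}
```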

[id="plugins-{type}s-{plugin}-table_prefix"]
===== `table_prefix`

67 changes: 50 additions & 17 deletions lib/logstash/outputs/bigquery/streamclient.rb
@@ -36,18 +36,20 @@ def create_table(dataset, table, schema)
@bigquery.create table_info
end

def append(dataset, table, rows, ignore_unknown)
def append(dataset, table, rows, ignore_unknown, skip_invalid)
api_debug("Appending #{rows.length} rows", dataset, table)

request = build_append_request dataset, table, rows, ignore_unknown
request = build_append_request(dataset, table, rows, ignore_unknown, skip_invalid)

response = @bigquery.insertAll request
return true unless response.hasErrors
return [] unless response.hasErrors

failed_rows = []
response.getInsertErrors().entrySet().each{ |entry|
key = entry.getKey
errors = entry.getValue
failed_rows << rows[key]

errors = entry.getValue
errors.each{|bqError|
@logger.warn('Error while inserting',
key: key,
@@ -57,12 +59,13 @@ def append(dataset, table, rows, ignore_unknown)
}
}

false
failed_rows
end
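
The new contract is easier to see in isolation. A plain-Ruby sketch (no Java interop; `insert_errors` stands in for `InsertAllResponse#getInsertErrors`, which maps the zero-based index of each rejected row to its error list):

```ruby
# Rows submitted in a single insertAll request.
rows = [{ 'msg' => 'ok' }, { 'msg' => 'bad' }, { 'msg' => 'also ok' }]

# Stand-in for InsertAllResponse#getInsertErrors: the index of each
# rejected row mapped to the errors BigQuery reported for it.
insert_errors = { 1 => ['no such field'] }

# Mirror of the append logic above: collect the original row for
# every index that had errors.
failed_rows = insert_errors.keys.map { |idx| rows[idx] }

failed_rows # => [{ 'msg' => 'bad' }]
```

An empty array now signals success, which is why the caller writes an error file only when the returned list is non-empty.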

def build_append_request(dataset, table, rows, ignore_unknown)
def build_append_request(dataset, table, rows, ignore_unknown, skip_invalid)
request = com.google.cloud.bigquery.InsertAllRequest.newBuilder dataset, table
request.setIgnoreUnknownValues ignore_unknown
request.setSkipInvalidRows(skip_invalid)

rows.each { |serialized_row|
# deserialize rows into Java maps
@@ -75,7 +78,7 @@ def build_append_request(dataset, table, rows, ignore_unknown)

# raises an exception if the key file is invalid
def get_key_file_error(json_key_file)
return nil if json_key_file.nil? || json_key_file == ''
return nil if nil_or_empty?(json_key_file)

abs = ::File.absolute_path json_key_file
unless abs == json_key_file
@@ -94,26 +97,56 @@ def initialize_google_client(json_key_file, project_id)
err = get_key_file_error json_key_file
raise err unless err.nil?

if json_key_file.nil? || json_key_file.empty?
return com.google.cloud.bigquery.BigQueryOptions.getDefaultInstance().getService()
end

# TODO: set User-Agent

key_file = java.io.FileInputStream.new json_key_file
credentials = com.google.auth.oauth2.ServiceAccountCredentials.fromStream key_file
return com.google.cloud.bigquery.BigQueryOptions.newBuilder()
.setCredentials(credentials)
com.google.cloud.bigquery.BigQueryOptions.newBuilder()
.setCredentials(credentials(json_key_file))
.setHeaderProvider(http_headers)
.setRetrySettings(retry_settings)
.setProjectId(project_id)
.build()
.getService()
end

private

java_import 'com.google.auth.oauth2.GoogleCredentials'
def credentials(json_key_path)
return GoogleCredentials.getApplicationDefault() if nil_or_empty?(json_key_path)

key_file = java.io.FileInputStream.new(json_key_path)
GoogleCredentials.fromStream(key_file)
end

java_import 'com.google.api.gax.rpc.FixedHeaderProvider'
def http_headers
gem_name = 'logstash-output-google_bigquery'
gem_version = '4.1.0'
user_agent = "Elastic/#{gem_name} version/#{gem_version}"

FixedHeaderProvider.create({ 'User-Agent' => user_agent })
end

java_import 'com.google.api.gax.retrying.RetrySettings'
java_import 'org.threeten.bp.Duration'
def retry_settings
# backoff values taken from com.google.api.client.util.ExponentialBackOff
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.5)
.setMaxRetryDelay(Duration.ofSeconds(60))
.setInitialRpcTimeout(Duration.ofSeconds(20))
.setRpcTimeoutMultiplier(1.5)
.setMaxRpcTimeout(Duration.ofSeconds(20))
.setTotalTimeout(Duration.ofMinutes(15))
.build()
end
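
As a sanity check on those values, a small sketch (an illustrative helper, not part of the plugin) of the delay sequence they imply: each retry waits 1.5x longer than the last, starting at 500 ms and capped at 60 s.

```ruby
# Illustrative helper: the backoff delay (in ms) before each of the
# first `attempts` retries, per the settings above
# (initial 500 ms, multiplier 1.5, max 60 s).
def retry_delays_ms(attempts, initial: 500, multiplier: 1.5, max: 60_000)
  (0...attempts).map { |n| [(initial * multiplier**n).round, max].min }
end

retry_delays_ms(5) # => [500, 750, 1125, 1688, 2531]
```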

def api_debug(message, dataset, table)
@logger.debug(message, dataset: dataset, table: table)
end

def nil_or_empty?(param)
param.nil? || param.empty?
end
end
end
end
8 changes: 6 additions & 2 deletions lib/logstash/outputs/google_bigquery.rb
@@ -160,6 +160,10 @@ class LogStash::Outputs::GoogleBigQuery < LogStash::Outputs::Base
# Files names follow the pattern `[table name]-[UNIX timestamp].log`
config :error_directory, validate: :string, required: true, default: '/tmp/bigquery_errors'

# Insert all valid rows of a request, even if invalid rows exist. The default value is false,
# which causes the entire request to fail if any invalid rows exist.
config :skip_invalid_rows, validate: :boolean, default: false

# The following configuration options still exist to alert users that are using them
config :uploader_interval_secs, validate: :number, deprecated: 'No longer used.'
config :deleter_interval_secs, validate: :number, deprecated: 'No longer used.'
@@ -232,8 +236,8 @@ def publish(messages)

create_table_if_not_exists table

successful = @bq_client.append @dataset, table, messages, @ignore_unknown_values
write_to_errors_file(messages, table) unless successful
failed_rows = @bq_client.append(@dataset, table, messages, @ignore_unknown_values, @skip_invalid_rows)
write_to_errors_file(failed_rows, table) unless failed_rows.empty?
rescue StandardError => e
@logger.error 'Error uploading data.', :exception => e

2 changes: 1 addition & 1 deletion logstash-output-google_bigquery.gemspec
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-google_bigquery'
s.version = '4.0.1'
s.version = '4.1.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Writes events to Google BigQuery"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
8 changes: 8 additions & 0 deletions spec/fixtures/credentials.json
@@ -0,0 +1,8 @@
{
"//": "Dummy account from https://github.com/GoogleCloudPlatform/google-cloud-java/google-cloud-clients/google-cloud-core/src/test/java/com/google/cloud/ServiceOptionsTest.java",
"private_key_id": "somekeyid",
"private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC+K2hSuFpAdrJI\nnCgcDz2M7t7bjdlsadsasad+fvRSW6TjNQZ3p5LLQY1kSZRqBqylRkzteMOyHgaR\n0Pmxh3ILCND5men43j3h4eDbrhQBuxfEMalkG92sL+PNQSETY2tnvXryOvmBRwa/\nQP/9dJfIkIDJ9Fw9N4Bhhhp6mCcRpdQjV38H7JsyJ7lih/oNjECgYAt\nknddadwkwewcVxHFhcZJO+XWf6ofLUXpRwiTZakGMn8EE1uVa2LgczOjwWHGi99MFjxSer5m9\n1tCa3/KEGKiS/YL71JvjwX3mb+cewlkcmweBKZHM2JPTk0ZednFSpVZMtycjkbLa\ndYOS8V85AgMBewECggEBAKksaldajfDZDV6nGqbFjMiizAKJolr/M3OQw16K6o3/\n0S31xIe3sSlgW0+UbYlF4U8KifhManD1apVSC3csafaspP4RZUHFhtBywLO9pR5c\nr6S5aLp+gPWFyIp1pfXbWGvc5VY/v9x7ya1VEa6rXvLsKupSeWAW4tMj3eo/64ge\nsdaceaLYw52KeBYiT6+vpsnYrEkAHO1fF/LavbLLOFJmFTMxmsNaG0tuiJHgjshB\n82DpMCbXG9YcCgI/DbzuIjsdj2JC1cascSP//3PmefWysucBQe7Jryb6NQtASmnv\nCdDw/0jmZTEjpe4S1lxfHplAhHFtdgYTvyYtaLZiVVkCgYEA8eVpof2rceecw/I6\n5ng1q3Hl2usdWV/4mZMvR0fOemacLLfocX6IYxT1zA1FFJlbXSRsJMf/Qq39mOR2\nSpW+hr4jCoHeRVYLgsbggtrevGmILAlNoqCMpGZ6vDmJpq6ECV9olliDvpPgWOP+\nmYPDreFBGxWvQrADNbRt2dmGsrsCgYEAyUHqB2wvJHFqdmeBsaacewzV8x9WgmeX\ngUIi9REwXlGDW0Mz50dxpxcKCAYn65+7TCnY5O/jmL0VRxU1J2mSWyWTo1C+17L0\n3fUqjxL1pkefwecxwecvC+gFFYdJ4CQ/MHHXU81Lwl1iWdFCd2UoGddYaOF+KNeM\nHC7cmqra+JsCgYEAlUNywzq8nUg7282E+uICfCB0LfwejuymR93CtsFgb7cRd6ak\nECR8FGfCpH8ruWJINllbQfcHVCX47ndLZwqv3oVFKh6pAS/vVI4dpOepP8++7y1u\ncoOvtreXCX6XqfrWDtKIvv0vjlHBhhhp6mCcRpdQjV38H7JsyJ7lih/oNjECgYAt\nkndj5uNl5SiuVxHFhcZJO+XWf6ofLUregtevZakGMn8EE1uVa2AY7eafmoU/nZPT\n00YB0TBATdCbn/nBSuKDESkhSg9s2GEKQZG5hBmL5uCMfo09z3SfxZIhJdlerreP\nJ7gSidI12N+EZxYd4xIJh/HFDgp7RRO87f+WJkofMQKBgGTnClK1VMaCRbJZPriw\nEfeFCoOX75MxKwXs6xgrw4W//AYGGUjDt83lD6AZP6tws7gJ2IwY/qP7+lyhjEqN\nHtfPZRGFkGZsdaksdlaksd323423d+15/UvrlRSFPNj1tWQmNKkXyRDW4IG1Oa2p\nrALStNBx5Y9t0/LQnFI4w3aG\n-----END PRIVATE KEY-----\n",
"client_email": "someclientid@developer.gserviceaccount.com",
"client_id": "someclientid.apps.googleusercontent.com",
"type": "service_account"
}
18 changes: 18 additions & 0 deletions spec/outputs/bigquery/streamclient_spec.rb
@@ -0,0 +1,18 @@
# encoding: utf-8

require 'logstash/outputs/bigquery/streamclient'

describe LogStash::Outputs::BigQuery::StreamingClient do

# This test is mostly to make sure the Java types, signatures and classes
# haven't changed, since JRuby is very relaxed about them.
describe '#initialize' do
let(:logger) { spy('logger') }

it 'does not throw an error when initializing' do
key_file = ::File.join('spec', 'fixtures', 'credentials.json')
key_file = ::File.absolute_path(key_file)
LogStash::Outputs::BigQuery::StreamingClient.new(key_file, 'my-project', logger)
end
end
end
4 changes: 2 additions & 2 deletions spec/outputs/google_bigquery_spec.rb
@@ -74,7 +74,7 @@

it 'creates a table if it does not exist' do
allow(subject).to receive(:create_table_if_not_exists).and_return(nil)
allow(bq_client).to receive(:append).and_return(true)
allow(bq_client).to receive(:append).and_return([])
allow(subject).to receive(:write_to_errors_file).and_return(nil)
expect(subject).to receive(:create_table_if_not_exists)

@@ -83,7 +83,7 @@

it 'writes rows to a file on failed insert' do
allow(subject).to receive(:create_table_if_not_exists).and_return(nil)
allow(bq_client).to receive(:append).and_return(false)
allow(bq_client).to receive(:append).and_return([0])
allow(subject).to receive(:write_to_errors_file).and_return(nil)
expect(subject).to receive(:write_to_errors_file)
