Initial s3 import (#7)
* [CHG] config file and cleaned

* [CHG] refactoring global config

* [ADD] s3 lister

* [WIP] initial s3 download

* [FIX] config on inputs

* [WIP] since and to

* [ADD] since and to

* [FIX] stop process

* [ADD] debug message on sqs deletion

* [DEL] TODOs because they are done

* [ADD] doc

* [FIX] incorrect package

* Json parser (#8)

* [ADD] allow to convert from any type to a new one

* [ADD] benchmark

* [CHG] use original type instead of converting to string

* [ADD] json log parser

* [ADD] log parsers on read

* [ADD] waf log parser

* [FIX] PR comments
mpucholblasco authored Apr 23, 2019
1 parent 41cebb8 commit 5111d1e
Showing 47 changed files with 1,887 additions and 423 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,5 +1,6 @@
/.idea
/build
/logs

.DS_Store
/s3logsbeat
51 changes: 49 additions & 2 deletions README.md
@@ -33,7 +33,7 @@ S3logsbeat has the following features:
* High availability: you can have several S3logsbeat running in parallel
* Reliability: SQS messages are only deleted when output contains all events
* Avoid duplicates on supported outputs
* Supported S3 log parsers: ELB, ALB, CloudFront
* Several supported S3 log formats (see [Supported log formats](#supported-log-formats))
* Extra fields based on S3 key
* Delayed shutdown based on timeout and pending messages to be acked by outputs
* Limited amount of resources: ~20MB RAM in my tests
@@ -159,9 +159,56 @@ IAM policy:
}
```

### Initial import
You may already have S3 log files when you configure an SQS queue to import new files via `s3logsbeat`. If this is the case,
you can import those files by using the `s3imports` command and a configuration file like this:
```yaml
s3logsbeat:
  inputs:
    # S3 inputs (only taken into account when command `s3imports` is executed)
    -
      type: s3
      # S3
      buckets:
        - s3://mybucket/mypath
      log_format: alb
      # Optional fields extractor from key. E.g. key=staging-myapp/eu-west-1/2018/06/01/
      key_regex_fields: ^(?P<environment>[^\-]+)-(?P<application>[^/]+)/(?P<awsregion>[^/]+)
      since: 2018-10-15T01:00 # ISO8601 format - optional
      to: 2018-11-20T01:00 # ISO8601 format - optional
```
Running `./s3logsbeat s3imports -c config.yml` imports the S3 objects that already exist in your buckets. This command
is not executed as a daemon: it exits when all S3 objects have been processed. SQS inputs present in the configuration are
ignored when the `s3imports` command is executed.

This command is useful for a first import. However, take care: in combination with the standard mode of `s3logsbeat` it
can generate duplicates. To avoid this problem you can:
* Use `@metadata._id` to avoid duplicates on Elasticsearch (see section [Avoid duplicates](#avoid-duplicates)).
* Configure the SQS queue and S3 event notifications. Wait until the first message is present on the queue. Via the console or
CLI, inspect that message without deleting it (it will reappear later). Then edit the YAML configuration, set the `to` property
to one second before the timestamp obtained, and execute the `s3imports` command (see the sketch after this list).
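
For example, assuming the first message on the queue references an object created shortly after `2018-11-20T01:00` (bucket, path and timestamps below are hypothetical), the import window could be closed like this:
```yaml
s3logsbeat:
  inputs:
    -
      type: s3
      buckets:
        - s3://mybucket/mypath
      log_format: alb
      since: 2018-10-15T01:00 # ISO8601 format - optional
      to: 2018-11-20T01:00    # just before the first object notified via SQS
```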

### Supported log formats
`s3logsbeat` supports the following log formats:
* `elb`: parses Elastic Load Balancer (classic ELB) logs.
* `alb`: parses Application Load Balancer (ALB) logs.
* `cloudfront`: parses CloudFront logs.
* `waf`: parses WAF logs.
* `json`: parses JSON logs. Requires the following options (set via parameter `log_format_options`; see the sketch after this list):
  * `timestamp_field`: field that contains the timestamp of the log event. Mandatory.
  * `timestamp_format`: format in which the timestamp is represented and from which it is converted into a date/time. See [Supported timestamp formats](#supported-timestamp-formats). Mandatory.
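
A minimal sketch for a JSON input might look like this (bucket, path and the `time` field name are hypothetical, and the nesting of `log_format_options` as a mapping is an assumption based on the options listed above):
```yaml
s3logsbeat:
  inputs:
    -
      type: s3
      buckets:
        - s3://mybucket/myjsonlogs
      log_format: json
      log_format_options:
        timestamp_field: time          # JSON field holding the event timestamp
        timestamp_format: timeISO8601  # see Supported timestamp formats
```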

### Supported timestamp formats
The following timestamp formats are supported:
* `timeUnixMilliseconds`: long or string with epoch milliseconds.
* `timeISO8601`: string with ISO8601 format.
* `time:layout`: string whose layout is given after the `time:` prefix. Valid layouts are those accepted by [time.Parse](https://golang.org/pkg/time/#Parse) (see the sketch below).
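
For instance, a timestamp such as `25/Nov/2018:16:13:54 +0000` could be declared with a `time:` layout as follows (the `request_time` field name is hypothetical):
```yaml
log_format_options:
  timestamp_field: request_time
  timestamp_format: "time:02/Jan/2006:15:04:05 -0700" # Go reference layout after the `time:` prefix
```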

### Example of events

#### ALB
The following log event example is generated when `log_format: alb` is present:
```
{
"_index" : "yourindex-2018.09.14",
@@ -205,7 +252,7 @@ IAM policy:
These fields correspond to the ones defined by AWS on [this page](https://docs.aws.amazon.com/es_es/elasticloadbalancing/latest/application/load-balancer-access-logs.html).

#### CloudFront
Events generated from CloudFront logs are in the following form:
The following log event example is generated when `log_format: cloudfront` is present:
```
{
"_index" : "yourindex-2018.09.02",
23 changes: 0 additions & 23 deletions TODO.md

This file was deleted.

43 changes: 42 additions & 1 deletion aws/s3object.go
@@ -1,6 +1,15 @@
package aws

import "fmt"
import (
	"fmt"
	"regexp"

	"github.com/aws/aws-sdk-go/service/s3"
)

var (
	s3uriRE = regexp.MustCompile(`^s3://(?P<bucket>[^/]+)/(?P<key>.*)$`)
)

// S3Object represents an object on S3
type S3Object struct {
@@ -16,7 +25,39 @@ func NewS3Object(bucket, key string) *S3Object {
}
}

// NewS3ObjectFromURI creates a new S3 object from a URI with format:
// s3://bucket/path
func NewS3ObjectFromURI(uri string) (*S3Object, error) {
	re := s3uriRE.Copy()
	match := re.FindStringSubmatch(uri)
	if match == nil {
		return nil, fmt.Errorf("Incorrect S3 URI %s", uri)
	}

	return &S3Object{
		Bucket: match[1],
		Key:    match[2],
	}, nil
}

// String converts current object into string
func (s *S3Object) String() string {
	return fmt.Sprintf("S3Object{Bucket:%s, Key: %s}", s.Bucket, s.Key)
}

// S3ObjectWithOriginal represents an object on S3 and includes information from original
type S3ObjectWithOriginal struct {
	*s3.Object
	*S3Object
}

// NewS3ObjectWithOriginal creates a new S3 object which includes the original obtained from AWS
func NewS3ObjectWithOriginal(bucket string, original *s3.Object) *S3ObjectWithOriginal {
	return &S3ObjectWithOriginal{
		original,
		&S3Object{
			Bucket: bucket,
			Key:    *original.Key,
		},
	}
}
28 changes: 28 additions & 0 deletions aws/s3object_test.go
@@ -0,0 +1,28 @@
// +build !integration

package aws

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestS3ObjectInvalidURI(t *testing.T) {
	_, err := NewS3ObjectFromURI(`s3://no`)
	assert.Error(t, err)
}

func TestS3ObjectURIWithoutPath(t *testing.T) {
	s3object, err := NewS3ObjectFromURI(`s3://valid.com/`)
	assert.NoError(t, err)
	assert.Equal(t, "valid.com", s3object.Bucket)
	assert.Equal(t, "", s3object.Key)
}

func TestS3ObjectURIComplete(t *testing.T) {
	s3object, err := NewS3ObjectFromURI(`s3://valid.com/a/b/c`)
	assert.NoError(t, err)
	assert.Equal(t, "valid.com", s3object.Bucket)
	assert.Equal(t, "a/b/c", s3object.Key)
}
21 changes: 21 additions & 0 deletions aws/s3reader.go
@@ -21,6 +21,8 @@ type s3readcloser struct {
	c []io.Closer
}

type s3ObjectHandler func(*S3ObjectWithOriginal) error

// NewS3 is a construct function for creating the object
// with session
func NewS3(session *session.Session) *S3 {
@@ -45,6 +47,25 @@ func (s *S3) GetReadCloser(o *S3Object) (io.ReadCloser, error) {
	return newS3ReadCloser(output.Body, o.Key)
}

// ListObjects lists objects present on o.Bucket and prefix o.Key
func (s *S3) ListObjects(o *S3Object, oh s3ObjectHandler) (int, error) {
	received := 0
	s3ListObjectsInput := &s3.ListObjectsInput{
		Bucket: aws.String(o.Bucket),
		Prefix: aws.String(o.Key),
	}
	err := s.client.ListObjectsPages(s3ListObjectsInput, func(res *s3.ListObjectsOutput, last bool) (shouldContinue bool) {
		received += len(res.Contents)
		for _, r := range res.Contents {
			if err := oh(NewS3ObjectWithOriginal(o.Bucket, r)); err != nil {
				return false
			}
		}
		return true
	})
	return received, err
}

func newS3ReadCloser(i io.ReadCloser, key string) (io.ReadCloser, error) {
	s := &s3readcloser{
		i: i,
6 changes: 3 additions & 3 deletions beater/acker.go
@@ -11,21 +11,21 @@ type eventACKer struct {
}

type successLogger interface {
	Published(sqsMessages []*pipeline.SQSMessage)
	Published(privateElements []pipeline.S3ObjectProcessNotifications)
}

func newEventACKer(out successLogger) *eventACKer {
	return &eventACKer{out: out}
}

func (a *eventACKer) ackEvents(data []interface{}) {
	states := make([]*pipeline.SQSMessage, 0, len(data))
	states := make([]pipeline.S3ObjectProcessNotifications, 0, len(data))
	for _, datum := range data {
		if datum == nil {
			continue
		}

		st, ok := datum.(*pipeline.SQSMessage)
		st, ok := datum.(pipeline.S3ObjectProcessNotifications)
		if !ok {
			continue
		}
8 changes: 8 additions & 0 deletions beater/beat.go
@@ -0,0 +1,8 @@
package beater

import "flag"

var (
	once            = flag.Bool("once", false, "Run s3logsbeat only once until all inputs will be read")
	keepSQSMessages = flag.Bool("keepsqsmessages", false, "Do not delete SQS messages when processed (set for testing)")
)
6 changes: 3 additions & 3 deletions beater/channels.go
@@ -11,7 +11,7 @@ import (

type registrarLogger struct {
	done chan struct{}
	ch   chan<- []*pipeline.SQSMessage
	ch   chan<- []pipeline.S3ObjectProcessNotifications
}

type finishedLogger struct {
@@ -35,15 +35,15 @@ func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger {

func (l *registrarLogger) Close() { close(l.done) }

func (l *registrarLogger) Published(sqsMessages []*pipeline.SQSMessage) {
func (l *registrarLogger) Published(privateElements []pipeline.S3ObjectProcessNotifications) {
	select {
	case <-l.done:
		// set ch to nil, so no more events will be send after channel close signal
		// has been processed the first time.
		// Note: nil channels will block, so only done channel will be actively
		// report 'closed'.
		l.ch = nil
	case l.ch <- sqsMessages:
	case l.ch <- privateElements:
	}
}

