Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding forward input plugin configuration #843

Merged
merged 2 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apis/fluentbit/v1alpha2/clusterinput_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type InputSpec struct {
FluentBitMetrics *input.FluentbitMetrics `json:"fluentBitMetrics,omitempty"`
// CustomPlugin defines Custom Input configuration.
CustomPlugin *custom.CustomPlugin `json:"customPlugin,omitempty"`
// Forward defines forward input plugin configuration
Forward *input.Forward `json:"forward,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
31 changes: 29 additions & 2 deletions apis/fluentbit/v1alpha2/clusterinput_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ var fluentbitExpected = `[Input]
Tag logs.foo.bar
scrape_interval 2
scrape_on_start true
[Input]
Name forward
Alias input1_alias
Port 433
Listen 0.0.0.0
Buffer_Chunk_Size 1M
Buffer_Max_Size 6M
threaded on
`

func TestFluentbitMetricClusterInputList_Load(t *testing.T) {
Expand Down Expand Up @@ -176,9 +184,28 @@ func TestFluentbitMetricClusterInputList_Load(t *testing.T) {
},
},
}

inputObj2 := &ClusterInput{
TypeMeta: metav1.TypeMeta{
APIVersion: "fluentbit.fluent.io/v1alpha2",
Kind: "ClusterInput",
},
ObjectMeta: metav1.ObjectMeta{
Name: "input1",
Labels: labels,
},
Spec: InputSpec{
Alias: "input1_alias",
Forward: &input.Forward{
Port: ptrInt32(int32(433)),
Listen: "0.0.0.0",
BufferChunkSize: "1M",
BufferMaxSize: "6M",
Threaded: "on",
},
},
}
inputs := ClusterInputList{
Items: []ClusterInput{*inputObj1},
Items: []ClusterInput{*inputObj1, *inputObj2},
}

i := 0
Expand Down
77 changes: 77 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/input/forward.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package input

import (
"fmt"

"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins"
"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/params"
)

// +kubebuilder:object:generate:=true

// Forward defines the in_forward Input plugin that listens to TCP socket to recieve the event stream.
// **For full documentation, refer to https://docs.fluentbit.io/manual/pipeline/inputs/forward**
type Forward struct {
// Port for forward plugin instance.
// +kubebuilder:validation:Minimum:=1
// +kubebuilder:validation:Maximum:=65535
Port *int32 `json:"port,omitempty"`
// Listener network interface.
Listen string `json:"listen,omitempty"`
// in_forward uses the tag value for incoming logs. If not set it uses tag from incoming log.
Tag string `json:"tag,omitempty"`
// Adds the prefix to incoming event's tag
TagPrefix string `json:"tagPrefix,omitempty"`
// Specify the path to unix socket to recieve a forward message. If set, Listen and port are ignnored.
UnixPath string `json:"unixPath,omitempty"`
// Set the permission of unix socket file.
UnixPerm string `json:"unixPerm,omitempty"`
// Specify maximum buffer memory size used to recieve a forward message.
// The value must be according to the Unit Size specification.
// +kubebuilder:validation:Pattern:="^\\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$"
BufferMaxSize string `json:"bufferMaxSize,omitempty"`
// Set the initial buffer size to store incoming data.
// This value is used too to increase buffer size as required.
// The value must be according to the Unit Size specification.
// +kubebuilder:validation:Pattern:="^\\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$"
BufferChunkSize string `json:"bufferchunkSize,omitempty"`
// Threaded mechanism allows input plugin to run in a separate thread which helps to desaturate the main pipeline.
Threaded string `json:"threaded,omitempty"`
}

func (_ *Forward) Name() string {
return "forward"
}

// Params implement Section() method
func (f *Forward) Params(_ plugins.SecretLoader) (*params.KVs, error) {
kvs := params.NewKVs()
if f.Port != nil {
kvs.Insert("Port", fmt.Sprint(*f.Port))
}
if f.Listen != "" {
kvs.Insert("Listen", f.Listen)
}
if f.Tag != "" {
kvs.Insert("Tag", f.Tag)
}
if f.TagPrefix != "" {
kvs.Insert("Tag_Prefix", f.TagPrefix)
}
if f.UnixPath != "" {
kvs.Insert("Unix_Path", f.UnixPath)
}
if f.UnixPerm != "" {
kvs.Insert("Unix_Perm", f.UnixPerm)
}
if f.BufferChunkSize != "" {
kvs.Insert("Buffer_Chunk_Size", f.BufferChunkSize)
}
if f.BufferMaxSize != "" {
kvs.Insert("Buffer_Max_Size", f.BufferMaxSize)
}
if f.Threaded != "" {
kvs.Insert("threaded", f.Threaded)
}
return kvs, nil
}
20 changes: 20 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/input/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions apis/fluentbit/v1alpha2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,49 @@ spec:
tag:
type: string
type: object
forward:
description: Forward defines forward input plugin configuration
properties:
bufferMaxSize:
description: Specify maximum buffer memory size used to recieve
a forward message. The value must be according to the Unit Size
specification.
pattern: ^\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$
type: string
bufferchunkSize:
description: Set the initial buffer size to store incoming data.
This value is used too to increase buffer size as required.
The value must be according to the Unit Size specification.
pattern: ^\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$
type: string
listen:
description: Listener network interface.
type: string
port:
description: Port for forward plugin instance.
format: int32
maximum: 65535
minimum: 1
type: integer
tag:
description: in_forward uses the tag value for incoming logs.
If not set it uses tag from incoming log.
type: string
tagPrefix:
description: Adds the prefix to incoming event's tag
type: string
threaded:
description: Threaded mechanism allows input plugin to run in
a separate thread which helps to desaturate the main pipeline.
type: string
unixPath:
description: Specify the path to unix socket to recieve a forward
message. If set, Listen and port are ignnored.
type: string
unixPerm:
description: Set the permission of unix socket file.
type: string
type: object
logLevel:
enum:
- "off"
Expand Down
43 changes: 43 additions & 0 deletions config/crd/bases/fluentbit.fluent.io_clusterinputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,49 @@ spec:
tag:
type: string
type: object
forward:
description: Forward defines forward input plugin configuration
properties:
bufferMaxSize:
description: Specify maximum buffer memory size used to recieve
a forward message. The value must be according to the Unit Size
specification.
pattern: ^\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$
type: string
bufferchunkSize:
description: Set the initial buffer size to store incoming data.
This value is used too to increase buffer size as required.
The value must be according to the Unit Size specification.
pattern: ^\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$
type: string
listen:
description: Listener network interface.
type: string
port:
description: Port for forward plugin instance.
format: int32
maximum: 65535
minimum: 1
type: integer
tag:
description: in_forward uses the tag value for incoming logs.
If not set it uses tag from incoming log.
type: string
tagPrefix:
description: Adds the prefix to incoming event's tag
type: string
threaded:
description: Threaded mechanism allows input plugin to run in
a separate thread which helps to desaturate the main pipeline.
type: string
unixPath:
description: Specify the path to unix socket to recieve a forward
message. If set, Listen and port are ignnored.
type: string
unixPerm:
description: Set the permission of unix socket file.
type: string
type: object
logLevel:
enum:
- "off"
Expand Down
43 changes: 43 additions & 0 deletions manifests/setup/fluent-operator-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1782,6 +1782,49 @@ spec:
tag:
type: string
type: object
forward:
description: Forward defines forward input plugin configuration
properties:
bufferMaxSize:
description: Specify maximum buffer memory size used to recieve
a forward message. The value must be according to the Unit Size
specification.
pattern: ^\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$
type: string
bufferchunkSize:
description: Set the initial buffer size to store incoming data.
This value is used too to increase buffer size as required.
The value must be according to the Unit Size specification.
pattern: ^\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$
type: string
listen:
description: Listener network interface.
type: string
port:
description: Port for forward plugin instance.
format: int32
maximum: 65535
minimum: 1
type: integer
tag:
description: in_forward uses the tag value for incoming logs.
If not set it uses tag from incoming log.
type: string
tagPrefix:
description: Adds the prefix to incoming event's tag
type: string
threaded:
description: Threaded mechanism allows input plugin to run in
a separate thread which helps to desaturate the main pipeline.
type: string
unixPath:
description: Specify the path to unix socket to recieve a forward
message. If set, Listen and port are ignnored.
type: string
unixPerm:
description: Set the permission of unix socket file.
type: string
type: object
logLevel:
enum:
- "off"
Expand Down
Loading
Loading