From 85279647ae8474832b533838fa5057490415ec4c Mon Sep 17 00:00:00 2001 From: yash97 Date: Sun, 23 Jul 2023 21:56:02 -0500 Subject: [PATCH 1/2] adding forward input Signed-off-by: yash97 --- apis/fluentbit/v1alpha2/clusterinput_types.go | 2 + .../v1alpha2/plugins/input/forward.go | 65 +++++++++++++++++++ .../v1alpha2/zz_generated.deepcopy.go | 30 +++++++++ .../fluentbit.fluent.io_clusterinputs.yaml | 33 ++++++++++ .../fluentbit.fluent.io_clusterinputs.yaml | 33 ++++++++++ manifests/setup/fluent-operator-crd.yaml | 33 ++++++++++ manifests/setup/setup.yaml | 33 ++++++++++ 7 files changed, 229 insertions(+) create mode 100644 apis/fluentbit/v1alpha2/plugins/input/forward.go diff --git a/apis/fluentbit/v1alpha2/clusterinput_types.go b/apis/fluentbit/v1alpha2/clusterinput_types.go index 7112a2e69..57bf38b12 100644 --- a/apis/fluentbit/v1alpha2/clusterinput_types.go +++ b/apis/fluentbit/v1alpha2/clusterinput_types.go @@ -53,6 +53,8 @@ type InputSpec struct { FluentBitMetrics *input.FluentbitMetrics `json:"fluentBitMetrics,omitempty"` // CustomPlugin defines Custom Input configuration. CustomPlugin *custom.CustomPlugin `json:"customPlugin,omitempty"` + // Forward defines forward input plugin configuration + Forward *input.Forward `json:"forward,omitempty"` } // +kubebuilder:object:root=true diff --git a/apis/fluentbit/v1alpha2/plugins/input/forward.go b/apis/fluentbit/v1alpha2/plugins/input/forward.go new file mode 100644 index 000000000..467050846 --- /dev/null +++ b/apis/fluentbit/v1alpha2/plugins/input/forward.go @@ -0,0 +1,65 @@ +package input + +import ( + "strconv" + + "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins" + "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/params" +) + +// +kubebuilder:objct:generate:=true +// Forward defines the in_forward Input plugin that listens to TCP socket to recieve the event stream +type Forward struct { + Port int32 `json:"port,omitempty"` + Listen string `json:"listen,omitempty"` + // in_forward uses the tag value for incoming logs. If not set it uses tag from incoming log. + Tag string `json:"tag,omitempty"` + // Adds the prefix to incoming event's tag + TagPrefix string `json:"tagPrefix,omitempty"` + // Specify the path to unix socket to recieve a forward message. If set, Listen and port are ignnored. + UnixPath string `json:"unixPath,omitempty"` + // Set the permission of unix socket file. + UnixPerm string `json:"unixPerm,omitempty"` + // Specify maximum buffer memory size used to recieve a forward message. Provide value in bytes. + BufferMaxSize int `json:"bufferMaxSize,omitempty"` + BufferChunkSize int `json:"bufferchunkSize,omitempty"` + // Threaded mechanism allows input plugin to run in a separate thread which helps to desaturate the main pipeline. + Threaded string `json:"threaded,omitempty"` +} + +func (_ *Forward) Name() string { + return "forward" +} + +// Params implement Section() method +func (f *Forward) Params(_ plugins.SecretLoader) (*params.KVs, error) { + kvs := params.NewKVs() + if f.Port >= 0 && f.Port < 65535 { + kvs.Insert("Port", strconv.Itoa(int(f.Port))) + } + if f.Listen != "" { + kvs.Insert("Listen", f.Listen) + } + if f.Tag != "" { + kvs.Insert("Tag", f.Tag) + } + if f.TagPrefix != "" { + kvs.Insert("TagPrefix", f.TagPrefix) + } + if f.UnixPath != "" { + kvs.Insert("UnixPath", f.UnixPath) + } + if f.UnixPerm != "" { + kvs.Insert("UnixPerm", f.UnixPerm) + } + if f.BufferChunkSize > 0 { + kvs.Insert("BufferChunkSize", strconv.Itoa(f.BufferChunkSize)) + } + if f.BufferMaxSize > 0 { + kvs.Insert("BufferMaxSize", strconv.Itoa(f.BufferMaxSize)) + } + if f.Threaded != "" { + kvs.Insert("threaded", f.Threaded) + } + return kvs, nil +} diff --git a/apis/fluentbit/v1alpha2/zz_generated.deepcopy.go b/apis/fluentbit/v1alpha2/zz_generated.deepcopy.go index 4ba987718..9f565f074 100644 --- a/apis/fluentbit/v1alpha2/zz_generated.deepcopy.go +++ b/apis/fluentbit/v1alpha2/zz_generated.deepcopy.go @@ -1068,6 +1068,11 @@ func (in *InputSpec) DeepCopyInto(out *InputSpec) { *out = new(custom.CustomPlugin) **out = **in } + if in.Forward != nil { + in, out := &in.Forward, &out.Forward + *out = new(input.Forward) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputSpec. @@ -1501,6 +1506,11 @@ func (in *Service) DeepCopyInto(out *Service) { *out = new(bool) **out = **in } + if in.Storage != nil { + in, out := &in.Storage, &out.Storage + *out = new(Storage) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Service. @@ -1512,3 +1522,23 @@ func (in *Service) DeepCopy() *Service { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Storage) DeepCopyInto(out *Storage) { + *out = *in + if in.MaxChunksUp != nil { + in, out := &in.MaxChunksUp, &out.MaxChunksUp + *out = new(int64) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Storage. +func (in *Storage) DeepCopy() *Storage { + if in == nil { + return nil + } + out := new(Storage) + in.DeepCopyInto(out) + return out +} diff --git a/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusterinputs.yaml b/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusterinputs.yaml index 44a71dbf4..15986c20c 100644 --- a/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusterinputs.yaml +++ b/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusterinputs.yaml @@ -80,6 +80,39 @@ spec: tag: type: string type: object + forward: + description: Forward defines forward input plugin configuration + properties: + bufferMaxSize: + description: Specify maximum buffer memory size used to recieve + a forward message. Provide value in bytes. + type: integer + bufferchunkSize: + type: integer + listen: + type: string + port: + format: int32 + type: integer + tag: + description: in_forward uses the tag value for incoming logs. + If not set it uses tag from incoming log. + type: string + tagPrefix: + description: Adds the prefix to incoming event's tag + type: string + threaded: + description: Threaded mechanism allows input plugin to run in + a separate thread which helps to desaturate the main pipeline. + type: string + unixPath: + description: Specify the path to unix socket to recieve a forward + message. If set, Listen and port are ignnored. + type: string + unixPerm: + description: Set the permission of unix socket file. + type: string + type: object logLevel: enum: - "off" diff --git a/config/crd/bases/fluentbit.fluent.io_clusterinputs.yaml b/config/crd/bases/fluentbit.fluent.io_clusterinputs.yaml index 44a71dbf4..15986c20c 100644 --- a/config/crd/bases/fluentbit.fluent.io_clusterinputs.yaml +++ b/config/crd/bases/fluentbit.fluent.io_clusterinputs.yaml @@ -80,6 +80,39 @@ spec: tag: type: string type: object + forward: + description: Forward defines forward input plugin configuration + properties: + bufferMaxSize: + description: Specify maximum buffer memory size used to recieve + a forward message. Provide value in bytes. + type: integer + bufferchunkSize: + type: integer + listen: + type: string + port: + format: int32 + type: integer + tag: + description: in_forward uses the tag value for incoming logs. + If not set it uses tag from incoming log. + type: string + tagPrefix: + description: Adds the prefix to incoming event's tag + type: string + threaded: + description: Threaded mechanism allows input plugin to run in + a separate thread which helps to desaturate the main pipeline. + type: string + unixPath: + description: Specify the path to unix socket to recieve a forward + message. If set, Listen and port are ignnored. + type: string + unixPerm: + description: Set the permission of unix socket file. + type: string + type: object logLevel: enum: - "off" diff --git a/manifests/setup/fluent-operator-crd.yaml b/manifests/setup/fluent-operator-crd.yaml index 552aae100..896531420 100644 --- a/manifests/setup/fluent-operator-crd.yaml +++ b/manifests/setup/fluent-operator-crd.yaml @@ -1782,6 +1782,39 @@ spec: tag: type: string type: object + forward: + description: Forward defines forward input plugin configuration + properties: + bufferMaxSize: + description: Specify maximum buffer memory size used to recieve + a forward message. Provide value in bytes. + type: integer + bufferchunkSize: + type: integer + listen: + type: string + port: + format: int32 + type: integer + tag: + description: in_forward uses the tag value for incoming logs. + If not set it uses tag from incoming log. + type: string + tagPrefix: + description: Adds the prefix to incoming event's tag + type: string + threaded: + description: Threaded mechanism allows input plugin to run in + a separate thread which helps to desaturate the main pipeline. + type: string + unixPath: + description: Specify the path to unix socket to recieve a forward + message. If set, Listen and port are ignnored. + type: string + unixPerm: + description: Set the permission of unix socket file. + type: string + type: object logLevel: enum: - "off" diff --git a/manifests/setup/setup.yaml b/manifests/setup/setup.yaml index 4a2575e6a..9632c826e 100644 --- a/manifests/setup/setup.yaml +++ b/manifests/setup/setup.yaml @@ -1782,6 +1782,39 @@ spec: tag: type: string type: object + forward: + description: Forward defines forward input plugin configuration + properties: + bufferMaxSize: + description: Specify maximum buffer memory size used to recieve + a forward message. Provide value in bytes. + type: integer + bufferchunkSize: + type: integer + listen: + type: string + port: + format: int32 + type: integer + tag: + description: in_forward uses the tag value for incoming logs. + If not set it uses tag from incoming log. + type: string + tagPrefix: + description: Adds the prefix to incoming event's tag + type: string + threaded: + description: Threaded mechanism allows input plugin to run in + a separate thread which helps to desaturate the main pipeline. + type: string + unixPath: + description: Specify the path to unix socket to recieve a forward + message. If set, Listen and port are ignnored. + type: string + unixPerm: + description: Set the permission of unix socket file. + type: string + type: object logLevel: enum: - "off" From ea7d20649de28a36a9d2d1fe192e814bad2bad96 Mon Sep 17 00:00:00 2001 From: yash97 Date: Mon, 31 Jul 2023 02:18:17 -0500 Subject: [PATCH 2/2] adding test and validation Signed-off-by: yash97 --- .../v1alpha2/clusterinput_types_test.go | 31 +++++++++- .../v1alpha2/plugins/input/forward.go | 56 +++++++++++-------- .../plugins/input/zz_generated.deepcopy.go | 20 +++++++ .../v1alpha2/zz_generated.deepcopy.go | 2 +- .../fluentbit.fluent.io_clusterinputs.yaml | 16 +++++- .../fluentbit.fluent.io_clusterinputs.yaml | 16 +++++- manifests/setup/fluent-operator-crd.yaml | 16 +++++- manifests/setup/setup.yaml | 16 +++++- 8 files changed, 136 insertions(+), 37 deletions(-) diff --git a/apis/fluentbit/v1alpha2/clusterinput_types_test.go b/apis/fluentbit/v1alpha2/clusterinput_types_test.go index 60684ea6e..c5aceb539 100644 --- a/apis/fluentbit/v1alpha2/clusterinput_types_test.go +++ b/apis/fluentbit/v1alpha2/clusterinput_types_test.go @@ -142,6 +142,14 @@ var fluentbitExpected = `[Input] Tag logs.foo.bar scrape_interval 2 scrape_on_start true +[Input] + Name forward + Alias input1_alias + Port 433 + Listen 0.0.0.0 + Buffer_Chunk_Size 1M + Buffer_Max_Size 6M + threaded on ` func TestFluentbitMetricClusterInputList_Load(t *testing.T) { @@ -176,9 +184,28 @@ func TestFluentbitMetricClusterInputList_Load(t *testing.T) { }, }, } - + inputObj2 := &ClusterInput{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "fluentbit.fluent.io/v1alpha2", + Kind: "ClusterInput", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "input1", + Labels: labels, + }, + Spec: InputSpec{ + Alias: "input1_alias", + Forward: &input.Forward{ + Port: ptrInt32(int32(433)), + Listen: "0.0.0.0", + BufferChunkSize: "1M", + BufferMaxSize: "6M", + Threaded: "on", + }, + }, + } inputs := ClusterInputList{ - Items: []ClusterInput{*inputObj1}, + Items: []ClusterInput{*inputObj1, *inputObj2}, } i := 0 diff --git a/apis/fluentbit/v1alpha2/plugins/input/forward.go b/apis/fluentbit/v1alpha2/plugins/input/forward.go index 467050846..604dcabba 100644 --- a/apis/fluentbit/v1alpha2/plugins/input/forward.go +++ b/apis/fluentbit/v1alpha2/plugins/input/forward.go @@ -1,30 +1,42 @@ package input import ( - "strconv" + "fmt" "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins" "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/params" ) -// +kubebuilder:objct:generate:=true -// Forward defines the in_forward Input plugin that listens to TCP socket to recieve the event stream +// +kubebuilder:object:generate:=true + +// Forward defines the in_forward Input plugin that listens to TCP socket to recieve the event stream. +// **For full documentation, refer to https://docs.fluentbit.io/manual/pipeline/inputs/forward** type Forward struct { - Port int32 `json:"port,omitempty"` - Listen string `json:"listen,omitempty"` + // Port for forward plugin instance. + // +kubebuilder:validation:Minimum:=1 + // +kubebuilder:validation:Maximum:=65535 + Port *int32 `json:"port,omitempty"` + // Listener network interface. + Listen string `json:"listen,omitempty"` // in_forward uses the tag value for incoming logs. If not set it uses tag from incoming log. - Tag string `json:"tag,omitempty"` + Tag string `json:"tag,omitempty"` // Adds the prefix to incoming event's tag - TagPrefix string `json:"tagPrefix,omitempty"` + TagPrefix string `json:"tagPrefix,omitempty"` // Specify the path to unix socket to recieve a forward message. If set, Listen and port are ignnored. - UnixPath string `json:"unixPath,omitempty"` + UnixPath string `json:"unixPath,omitempty"` // Set the permission of unix socket file. - UnixPerm string `json:"unixPerm,omitempty"` - // Specify maximum buffer memory size used to recieve a forward message. Provide value in bytes. - BufferMaxSize int `json:"bufferMaxSize,omitempty"` - BufferChunkSize int `json:"bufferchunkSize,omitempty"` + UnixPerm string `json:"unixPerm,omitempty"` + // Specify maximum buffer memory size used to recieve a forward message. + // The value must be according to the Unit Size specification. + // +kubebuilder:validation:Pattern:="^\\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$" + BufferMaxSize string `json:"bufferMaxSize,omitempty"` + // Set the initial buffer size to store incoming data. + // This value is used too to increase buffer size as required. + // The value must be according to the Unit Size specification. + // +kubebuilder:validation:Pattern:="^\\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$" + BufferChunkSize string `json:"bufferchunkSize,omitempty"` // Threaded mechanism allows input plugin to run in a separate thread which helps to desaturate the main pipeline. - Threaded string `json:"threaded,omitempty"` + Threaded string `json:"threaded,omitempty"` } func (_ *Forward) Name() string { @@ -34,8 +46,8 @@ func (_ *Forward) Name() string { // Params implement Section() method func (f *Forward) Params(_ plugins.SecretLoader) (*params.KVs, error) { kvs := params.NewKVs() - if f.Port >= 0 && f.Port < 65535 { - kvs.Insert("Port", strconv.Itoa(int(f.Port))) + if f.Port != nil { + kvs.Insert("Port", fmt.Sprint(*f.Port)) } if f.Listen != "" { kvs.Insert("Listen", f.Listen) @@ -44,19 +56,19 @@ func (f *Forward) Params(_ plugins.SecretLoader) (*params.KVs, error) { kvs.Insert("Tag", f.Tag) } if f.TagPrefix != "" { - kvs.Insert("TagPrefix", f.TagPrefix) + kvs.Insert("Tag_Prefix", f.TagPrefix) } if f.UnixPath != "" { - kvs.Insert("UnixPath", f.UnixPath) + kvs.Insert("Unix_Path", f.UnixPath) } if f.UnixPerm != "" { - kvs.Insert("UnixPerm", f.UnixPerm) + kvs.Insert("Unix_Perm", f.UnixPerm) } - if f.BufferChunkSize > 0 { - kvs.Insert("BufferChunkSize", strconv.Itoa(f.BufferChunkSize)) + if f.BufferChunkSize != "" { + kvs.Insert("Buffer_Chunk_Size", f.BufferChunkSize) } - if f.BufferMaxSize > 0 { - kvs.Insert("BufferMaxSize", strconv.Itoa(f.BufferMaxSize)) + if f.BufferMaxSize != "" { + kvs.Insert("Buffer_Max_Size", f.BufferMaxSize) } if f.Threaded != "" { kvs.Insert("threaded", f.Threaded) diff --git a/apis/fluentbit/v1alpha2/plugins/input/zz_generated.deepcopy.go b/apis/fluentbit/v1alpha2/plugins/input/zz_generated.deepcopy.go index cf48e6c30..689a6ca83 100644 --- a/apis/fluentbit/v1alpha2/plugins/input/zz_generated.deepcopy.go +++ b/apis/fluentbit/v1alpha2/plugins/input/zz_generated.deepcopy.go @@ -68,6 +68,26 @@ func (in *FluentbitMetrics) DeepCopy() *FluentbitMetrics { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Forward) DeepCopyInto(out *Forward) { + *out = *in + if in.Port != nil { + in, out := &in.Port, &out.Port + *out = new(int32) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Forward. +func (in *Forward) DeepCopy() *Forward { + if in == nil { + return nil + } + out := new(Forward) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *NodeExporterMetrics) DeepCopyInto(out *NodeExporterMetrics) { *out = *in diff --git a/apis/fluentbit/v1alpha2/zz_generated.deepcopy.go b/apis/fluentbit/v1alpha2/zz_generated.deepcopy.go index 9f565f074..fa797d7ae 100644 --- a/apis/fluentbit/v1alpha2/zz_generated.deepcopy.go +++ b/apis/fluentbit/v1alpha2/zz_generated.deepcopy.go @@ -1071,7 +1071,7 @@ func (in *InputSpec) DeepCopyInto(out *InputSpec) { if in.Forward != nil { in, out := &in.Forward, &out.Forward *out = new(input.Forward) - **out = **in + (*in).DeepCopyInto(*out) } } diff --git a/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusterinputs.yaml b/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusterinputs.yaml index 15986c20c..5767962e8 100644 --- a/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusterinputs.yaml +++ b/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusterinputs.yaml @@ -85,14 +85,24 @@ spec: properties: bufferMaxSize: description: Specify maximum buffer memory size used to recieve - a forward message. Provide value in bytes. - type: integer + a forward message. The value must be according to the Unit Size + specification. + pattern: ^\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$ + type: string bufferchunkSize: - type: integer + description: Set the initial buffer size to store incoming data. + This value is used too to increase buffer size as required. + The value must be according to the Unit Size specification. + pattern: ^\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$ + type: string listen: + description: Listener network interface. type: string port: + description: Port for forward plugin instance. format: int32 + maximum: 65535 + minimum: 1 type: integer tag: description: in_forward uses the tag value for incoming logs. diff --git a/config/crd/bases/fluentbit.fluent.io_clusterinputs.yaml b/config/crd/bases/fluentbit.fluent.io_clusterinputs.yaml index 15986c20c..5767962e8 100644 --- a/config/crd/bases/fluentbit.fluent.io_clusterinputs.yaml +++ b/config/crd/bases/fluentbit.fluent.io_clusterinputs.yaml @@ -85,14 +85,24 @@ spec: properties: bufferMaxSize: description: Specify maximum buffer memory size used to recieve - a forward message. Provide value in bytes. - type: integer + a forward message. The value must be according to the Unit Size + specification. + pattern: ^\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$ + type: string bufferchunkSize: - type: integer + description: Set the initial buffer size to store incoming data. + This value is used too to increase buffer size as required. + The value must be according to the Unit Size specification. + pattern: ^\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$ + type: string listen: + description: Listener network interface. type: string port: + description: Port for forward plugin instance. format: int32 + maximum: 65535 + minimum: 1 type: integer tag: description: in_forward uses the tag value for incoming logs. diff --git a/manifests/setup/fluent-operator-crd.yaml b/manifests/setup/fluent-operator-crd.yaml index 896531420..b5a2ffb5f 100644 --- a/manifests/setup/fluent-operator-crd.yaml +++ b/manifests/setup/fluent-operator-crd.yaml @@ -1787,14 +1787,24 @@ spec: properties: bufferMaxSize: description: Specify maximum buffer memory size used to recieve - a forward message. Provide value in bytes. - type: integer + a forward message. The value must be according to the Unit Size + specification. + pattern: ^\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$ + type: string bufferchunkSize: - type: integer + description: Set the initial buffer size to store incoming data. + This value is used too to increase buffer size as required. + The value must be according to the Unit Size specification. + pattern: ^\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$ + type: string listen: + description: Listener network interface. type: string port: + description: Port for forward plugin instance. format: int32 + maximum: 65535 + minimum: 1 type: integer tag: description: in_forward uses the tag value for incoming logs. diff --git a/manifests/setup/setup.yaml b/manifests/setup/setup.yaml index 9632c826e..8238cc69d 100644 --- a/manifests/setup/setup.yaml +++ b/manifests/setup/setup.yaml @@ -1787,14 +1787,24 @@ spec: properties: bufferMaxSize: description: Specify maximum buffer memory size used to recieve - a forward message. Provide value in bytes. - type: integer + a forward message. The value must be according to the Unit Size + specification. + pattern: ^\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$ + type: string bufferchunkSize: - type: integer + description: Set the initial buffer size to store incoming data. + This value is used too to increase buffer size as required. + The value must be according to the Unit Size specification. + pattern: ^\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$ + type: string listen: + description: Listener network interface. type: string port: + description: Port for forward plugin instance. format: int32 + maximum: 65535 + minimum: 1 type: integer tag: description: in_forward uses the tag value for incoming logs.