91 changes: 91 additions & 0 deletions apis/fluentd/v1alpha1/plugins/output/opensearch.go
@@ -28,4 +28,95 @@ type Opensearch struct {
Password *plugins.Secret `json:"password,omitempty"`
// Optional, Force certificate validation
SslVerify *bool `json:"sslVerify,omitempty"`
// Optional, Absolute path to CA certificate file
CAFile *string `json:"caFile,omitempty"`
// Optional, Absolute path to client Certificate file
ClientCert *string `json:"clientCert,omitempty"`
// Optional, Absolute path to client private Key file
ClientKey *string `json:"clientKey,omitempty"`
// Optional, password for ClientKey file
ClientKeyPassword *plugins.Secret `json:"clientKeyPassword,omitempty"`
// Optional, You can specify SSL/TLS version (default: TLSv1_2)
SslVersion *string `json:"sslVersion,omitempty"`
// Optional, Minimum SSL/TLS version
SslMinVersion *string `json:"sslMinVersion,omitempty"`
// Optional, Maximum SSL/TLS version
SslMaxVersion *string `json:"sslMaxVersion,omitempty"`
// Optional, Enable logging of 400 reason without enabling debug log level (default: false)
LogOs400Reason *bool `json:"logOs400Reason,omitempty"`
// Optional, HTTP request timeout in seconds (default: 5s)
// +kubebuilder:validation:Pattern:="^\\d+(s|m|h|d)$"
RequestTimeout *string `json:"requestTimeout,omitempty"`
// Optional, Indicates that the plugin should reset connection on any error (reconnect on next send) (default: false)
ReconnectOnError *bool `json:"reconnectOnError,omitempty"`
// Optional, Automatically reload connection after 10000 documents (default: true)
ReloadConnections *bool `json:"reloadConnections,omitempty"`
// Optional, When ReloadConnections is true, the number of operations after which the plugin will reload the connections (default: 10000)
ReloadAfter *uint32 `json:"reloadAfter,omitempty"`
// Optional, Indicates that the opensearch-transport will try to reload the nodes addresses if there is a failure while making the request (default: false)
ReloadOnFailure *bool `json:"reloadOnFailure,omitempty"`
// Optional, Number of times to retry obtaining the OpenSearch version (default: 15)
MaxRetryGetOsVersion *uint32 `json:"maxRetryGetOsVersion,omitempty"`
// Optional, Indicates whether to fail when max_retry_get_os_version is exceeded (default: true)
FailOnDetectingOsVersionRetryExceed *bool `json:"failOnDetectingOsVersionRetryExceed,omitempty"`
// Optional, Default OpenSearch version (default: 1)
DefaultOpensearchVersion *uint32 `json:"defaultOpensearchVersion,omitempty"`
// Optional, Validate OpenSearch version at startup (default: true)
VerifyOsVersionAtStartup *bool `json:"verifyOsVersionAtStartup,omitempty"`
// Optional, Always update the template, even if it already exists (default: false)
TemplateOverwrite *bool `json:"templateOverwrite,omitempty"`
// Optional, Number of times to retry putting the template (default: 10)
MaxRetryPuttingTemplate *uint32 `json:"maxRetryPuttingTemplate,omitempty"`
// Optional, Indicates whether to fail when max_retry_putting_template is exceeded (default: true)
FailOnPuttingTemplateRetryExceed *bool `json:"failOnPuttingTemplateRetryExceed,omitempty"`
// Optional, Provide a different sniffer class name
SnifferClassName *string `json:"snifferClassName,omitempty"`
// Optional, Provide a selector class name
SelectorClassName *string `json:"selectorClassName,omitempty"`
// Optional, You can specify HTTP backend (default: excon). Options: excon, typhoeus
HttpBackend *string `json:"httpBackend,omitempty"`
// Optional, With http_backend_excon_nonblock false, plugin uses excon with nonblock=false (default: true)
HttpBackendExconNonblock *bool `json:"httpBackendExconNonblock,omitempty"`
// Optional, You can specify the compression level (default: no_compression). Options: no_compression, best_compression, best_speed, default_compression
CompressionLevel *string `json:"compressionLevel,omitempty"`
// Optional, With default behavior, plugin uses Yajl as JSON encoder/decoder. Set to true to use Oj (default: false)
PreferOjSerializer *bool `json:"preferOjSerializer,omitempty"`
// Optional, Suppress '[types removal]' warnings on OpenSearch 2.x (default: true for OS2+)
SuppressTypeName *bool `json:"suppressTypeName,omitempty"`
// Optional, With content_type application/x-ndjson, plugin adds application/x-ndjson as Content-Type (default: application/json)
ContentType *string `json:"contentType,omitempty"`
// Optional, Include tag key in record (default: false)
IncludeTagKey *bool `json:"includeTagKey,omitempty"`
// Optional, Tag key name when include_tag_key is true (default: tag)
TagKey *string `json:"tagKey,omitempty"`
// Optional, Record accessor syntax to specify the field to use as _id in OpenSearch
IdKey *string `json:"idKey,omitempty"`
// Optional, Remove specified keys from the event record
RemoveKeys *string `json:"removeKeys,omitempty"`
// Optional, Remove keys when record is being updated
RemoveKeysOnUpdate *string `json:"removeKeysOnUpdate,omitempty"`
// Optional, The write operation (default: index). Options: index, create, update, upsert
WriteOperation *string `json:"writeOperation,omitempty"`
// Optional, When write_operation is not index, setting this to true makes the plugin emit an error event for records that do not include an _id field (default: false)
EmitErrorForMissingId *bool `json:"emitErrorForMissingId,omitempty"`
// Optional, Custom headers in Hash format
CustomHeaders *string `json:"customHeaders,omitempty"`
// Optional, Pipeline name
Pipeline *string `json:"pipeline,omitempty"`
// Optional, UTC index (default: false for local time)
UtcIndex *bool `json:"utcIndex,omitempty"`
// Optional, Suppress doc_wrap (default: false)
SuppressDocWrap *bool `json:"suppressDocWrap,omitempty"`
// Optional, List of exception classes to ignore
IgnoreExceptions *string `json:"ignoreExceptions,omitempty"`
// Optional, Backup chunk when ignore exception occurs (default: true)
ExceptionBackup *bool `json:"exceptionBackup,omitempty"`
// Optional, Configure bulk_message request splitting threshold size (default: -1 unlimited)
BulkMessageRequestThreshold *int32 `json:"bulkMessageRequestThreshold,omitempty"`
// Optional, Specify the application name for the rollover index to be created (default: default)
ApplicationName *string `json:"applicationName,omitempty"`
// Optional, Specify the index date pattern for creating a rollover index (default: now/d)
IndexDatePattern *string `json:"indexDatePattern,omitempty"`
// Optional, Use legacy template or not (default: false for composable templates)
UseLegacyTemplate *bool `json:"useLegacyTemplate,omitempty"`
}
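
Note for reviewers (not part of the diff): the new options follow the existing pointer-plus-omitempty pattern, so only explicitly set fields end up in the serialized spec and fluentd's own defaults apply otherwise. A minimal, runnable sketch using a hypothetical mirror struct and a local pointer helper; the field names are taken from the diff above, everything else is illustrative only.

package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical mirror of a few of the new fields; the real type lives in
// apis/fluentd/v1alpha1/plugins/output/opensearch.go.
type opensearchSketch struct {
	SslVerify      *bool   `json:"sslVerify,omitempty"`
	CAFile         *string `json:"caFile,omitempty"`
	RequestTimeout *string `json:"requestTimeout,omitempty"`
	WriteOperation *string `json:"writeOperation,omitempty"`
}

// ptrTo is a small local helper for taking the address of a literal value.
func ptrTo[T any](v T) *T { return &v }

func main() {
	cfg := opensearchSketch{
		SslVerify:      ptrTo(true),
		CAFile:         ptrTo("/etc/ssl/certs/ca.crt"),
		RequestTimeout: ptrTo("15s"), // must match ^\d+(s|m|h|d)$ per the kubebuilder pattern above
	}
	out, _ := json.Marshal(cfg)
	// Unset pointers (WriteOperation here) are dropped by omitempty.
	fmt.Println(string(out)) // {"sslVerify":true,"caFile":"/etc/ssl/certs/ca.crt","requestTimeout":"15s"}
}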
147 changes: 100 additions & 47 deletions apis/fluentd/v1alpha1/plugins/output/types.go
@@ -458,80 +458,133 @@ func (o *Output) elasticsearchDataStreamPlugin(parent *params.PluginStore, loade
}

func (o *Output) opensearchPlugin(parent *params.PluginStore, loader plugins.SecretLoader) (*params.PluginStore, error) {
if o.Opensearch.Host != nil {
parent.InsertPairs("host", fmt.Sprint(*o.Opensearch.Host))
if err := o.opensearchBasicConnection(parent, loader); err != nil {
return nil, err
}

if o.Opensearch.Port != nil {
parent.InsertPairs("port", fmt.Sprint(*o.Opensearch.Port))
o.opensearchIndexConfig(parent)
if err := o.opensearchSSLConfig(parent, loader); err != nil {
return nil, err
}
o.opensearchConnectionManagement(parent)
o.opensearchVersionDetection(parent)
o.opensearchTemplateManagement(parent)
o.opensearchPerformanceTuning(parent)
o.opensearchRecordHandling(parent)
o.opensearchAdvancedOptions(parent)

if o.Opensearch.Hosts != nil {
parent.InsertPairs("hosts", fmt.Sprint(*o.Opensearch.Hosts))
}
return parent, nil
}

func (o *Output) opensearchBasicConnection(parent *params.PluginStore, loader plugins.SecretLoader) error {
params.InsertPairs(parent, "host", o.Opensearch.Host)
params.InsertPairs(parent, "port", o.Opensearch.Port)
params.InsertPairs(parent, "hosts", o.Opensearch.Hosts)
if o.Opensearch.User != nil {
user, err := loader.LoadSecret(*o.Opensearch.User)
if err != nil {
return nil, err
return err
}
parent.InsertPairs("user", user)
}

if o.Opensearch.Password != nil {
pwd, err := loader.LoadSecret(*o.Opensearch.Password)
if err != nil {
return nil, err
return err
}
parent.InsertPairs("password", pwd)
}
params.InsertPairs(parent, "scheme", o.Opensearch.Scheme)
params.InsertPairs(parent, "path", o.Opensearch.Path)
return nil
}

if o.Opensearch.Scheme != nil {
parent.InsertPairs("scheme", fmt.Sprint(*o.Opensearch.Scheme))
}
func (o *Output) opensearchIndexConfig(parent *params.PluginStore) {
params.InsertPairs(parent, "index_name", o.Opensearch.IndexName)
params.InsertPairs(parent, "logstash_format", o.Opensearch.LogstashFormat)
params.InsertPairs(parent, "logstash_prefix", o.Opensearch.LogstashPrefix)
params.InsertPairs(parent, "index_date_pattern", o.Opensearch.IndexDatePattern)
params.InsertPairs(parent, "utc_index", o.Opensearch.UtcIndex)
}

if o.Opensearch.Path != nil {
parent.InsertPairs("path", fmt.Sprint(*o.Opensearch.Path))
func (o *Output) opensearchSSLConfig(parent *params.PluginStore, loader plugins.SecretLoader) error {
params.InsertPairs(parent, "ssl_verify", o.Opensearch.SslVerify)
params.InsertPairs(parent, "ca_file", o.Opensearch.CAFile)
params.InsertPairs(parent, "client_cert", o.Opensearch.ClientCert)
params.InsertPairs(parent, "client_key", o.Opensearch.ClientKey)
if o.Opensearch.ClientKeyPassword != nil {
pwd, err := loader.LoadSecret(*o.Opensearch.ClientKeyPassword)
if err != nil {
return err
}
parent.InsertPairs("client_key_pass", pwd)
}
params.InsertPairs(parent, "ssl_version", o.Opensearch.SslVersion)
params.InsertPairs(parent, "ssl_min_version", o.Opensearch.SslMinVersion)
params.InsertPairs(parent, "ssl_max_version", o.Opensearch.SslMaxVersion)
return nil
}

if o.Opensearch.IndexName != nil {
parent.InsertPairs("index_name", fmt.Sprint(*o.Opensearch.IndexName))
}
func (o *Output) opensearchConnectionManagement(parent *params.PluginStore) {
params.InsertPairs(parent, "log_os_400_reason", o.Opensearch.LogOs400Reason)
params.InsertPairs(parent, "request_timeout", o.Opensearch.RequestTimeout)
params.InsertPairs(parent, "reconnect_on_error", o.Opensearch.ReconnectOnError)
params.InsertPairs(parent, "reload_connections", o.Opensearch.ReloadConnections)
params.InsertPairs(parent, "reload_after", o.Opensearch.ReloadAfter)
params.InsertPairs(parent, "reload_on_failure", o.Opensearch.ReloadOnFailure)
}

if o.Opensearch.LogstashFormat != nil {
parent.InsertPairs("logstash_format", fmt.Sprint(*o.Opensearch.LogstashFormat))
}
func (o *Output) opensearchVersionDetection(parent *params.PluginStore) {
params.InsertPairs(parent, "max_retry_get_os_version", o.Opensearch.MaxRetryGetOsVersion)
params.InsertPairs(parent, "fail_on_detecting_os_version_retry_exceed", o.Opensearch.FailOnDetectingOsVersionRetryExceed)
params.InsertPairs(parent, "default_opensearch_version", o.Opensearch.DefaultOpensearchVersion)
params.InsertPairs(parent, "verify_os_version_at_startup", o.Opensearch.VerifyOsVersionAtStartup)
}

if o.Opensearch.LogstashPrefix != nil {
parent.InsertPairs("logstash_prefix", fmt.Sprint(*o.Opensearch.LogstashPrefix))
}
func (o *Output) opensearchTemplateManagement(parent *params.PluginStore) {
params.InsertPairs(parent, "template_overwrite", o.Opensearch.TemplateOverwrite)
params.InsertPairs(parent, "max_retry_putting_template", o.Opensearch.MaxRetryPuttingTemplate)
params.InsertPairs(parent, "fail_on_putting_template_retry_exceed", o.Opensearch.FailOnPuttingTemplateRetryExceed)
params.InsertPairs(parent, "use_legacy_template", o.Opensearch.UseLegacyTemplate)
}

if o.Opensearch.SslVerify != nil {
parent.InsertPairs("ssl_verify", fmt.Sprint(*o.Opensearch.SslVerify))
}
func (o *Output) opensearchPerformanceTuning(parent *params.PluginStore) {
params.InsertPairs(parent, "sniffer_class_name", o.Opensearch.SnifferClassName)
params.InsertPairs(parent, "selector_class_name", o.Opensearch.SelectorClassName)
params.InsertPairs(parent, "http_backend", o.Opensearch.HttpBackend)
params.InsertPairs(parent, "http_backend_excon_nonblock", o.Opensearch.HttpBackendExconNonblock)
params.InsertPairs(parent, "compression_level", o.Opensearch.CompressionLevel)
params.InsertPairs(parent, "prefer_oj_serializer", o.Opensearch.PreferOjSerializer)
params.InsertPairs(parent, "bulk_message_request_threshold", o.Opensearch.BulkMessageRequestThreshold)
}

return parent, nil
func (o *Output) opensearchRecordHandling(parent *params.PluginStore) {
params.InsertPairs(parent, "suppress_type_name", o.Opensearch.SuppressTypeName)
params.InsertPairs(parent, "content_type", o.Opensearch.ContentType)
params.InsertPairs(parent, "include_tag_key", o.Opensearch.IncludeTagKey)
params.InsertPairs(parent, "tag_key", o.Opensearch.TagKey)
params.InsertPairs(parent, "id_key", o.Opensearch.IdKey)
params.InsertPairs(parent, "remove_keys", o.Opensearch.RemoveKeys)
params.InsertPairs(parent, "remove_keys_on_update", o.Opensearch.RemoveKeysOnUpdate)
params.InsertPairs(parent, "write_operation", o.Opensearch.WriteOperation)
params.InsertPairs(parent, "emit_error_for_missing_id", o.Opensearch.EmitErrorForMissingId)
params.InsertPairs(parent, "suppress_doc_wrap", o.Opensearch.SuppressDocWrap)
}

func (o *Output) opensearchAdvancedOptions(parent *params.PluginStore) {
params.InsertPairs(parent, "custom_headers", o.Opensearch.CustomHeaders)
params.InsertPairs(parent, "pipeline", o.Opensearch.Pipeline)
params.InsertPairs(parent, "ignore_exceptions", o.Opensearch.IgnoreExceptions)
params.InsertPairs(parent, "exception_backup", o.Opensearch.ExceptionBackup)
params.InsertPairs(parent, "application_name", o.Opensearch.ApplicationName)
}

func (o *Output) kafka2Plugin(parent *params.PluginStore, _ plugins.SecretLoader) *params.PluginStore {
if o.Kafka.Brokers != nil {
parent.InsertPairs("brokers", fmt.Sprint(*o.Kafka.Brokers))
}
if o.Kafka.TopicKey != nil {
parent.InsertPairs("topic_key", fmt.Sprint(*o.Kafka.TopicKey))
}
if o.Kafka.DefaultTopic != nil {
parent.InsertPairs("default_topic", fmt.Sprint(*o.Kafka.DefaultTopic))
}
if o.Kafka.UseEventTime != nil {
parent.InsertPairs("use_event_time", fmt.Sprint(*o.Kafka.UseEventTime))
}
if o.Kafka.RequiredAcks != nil {
parent.InsertPairs("required_acks", fmt.Sprint(*o.Kafka.RequiredAcks))
}
if o.Kafka.CompressionCodec != nil {
parent.InsertPairs("compression_codec", fmt.Sprint(*o.Kafka.CompressionCodec))
}
params.InsertPairs(parent, "brokers", o.Kafka.Brokers)
params.InsertPairs(parent, "topic_key", o.Kafka.TopicKey)
params.InsertPairs(parent, "default_topic", o.Kafka.DefaultTopic)
params.InsertPairs(parent, "use_event_time", o.Kafka.UseEventTime)
params.InsertPairs(parent, "required_acks", o.Kafka.RequiredAcks)
params.InsertPairs(parent, "compression_codec", o.Kafka.CompressionCodec)

return parent
}
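
For context on the refactor above: every opensearch helper, and kafka2Plugin as well, now funnels through the generic params.InsertPairs(parent, key, value) instead of repeating an if-not-nil block per field. Below is a self-contained sketch of that pattern with local stand-ins for params.PluginStore and a reduced type set; the nil-skipping behavior is an assumption inferred from how the callers use it, not a quote of the real implementation.

package main

import (
	"fmt"
	"strconv"
)

// Minimal stand-in for params.PluginStore, just enough to show the pattern.
type pluginStore struct{ pairs [][2]string }

func (ps *pluginStore) InsertPairs(key, value string) {
	ps.pairs = append(ps.pairs, [2]string{key, value})
}

type valueType interface {
	*string | *bool | *int32 | *uint32
}

// insertPairs mirrors the generic params.InsertPairs: nil pointers are skipped,
// which is what lets the callers drop their per-field nil checks.
func insertPairs[T valueType](ps *pluginStore, key string, value T) {
	switch v := any(value).(type) {
	case *string:
		if v != nil {
			ps.InsertPairs(key, *v)
		}
	case *bool:
		if v != nil {
			ps.InsertPairs(key, strconv.FormatBool(*v))
		}
	case *int32:
		if v != nil {
			ps.InsertPairs(key, strconv.FormatInt(int64(*v), 10))
		}
	case *uint32:
		if v != nil {
			ps.InsertPairs(key, strconv.FormatUint(uint64(*v), 10))
		}
	}
}

func main() {
	host := "opensearch.example.local"
	var port *int32 // unset field: nothing is emitted for it

	ps := &pluginStore{}
	insertPairs(ps, "host", &host)
	insertPairs(ps, "port", port)
	fmt.Println(ps.pairs) // [[host opensearch.example.local]]
}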
4 changes: 3 additions & 1 deletion apis/fluentd/v1alpha1/plugins/params/model.go
@@ -36,7 +36,7 @@ func (ps *PluginStore) InsertPairs(key, value string) {
}

type ValueType interface {
*string | *bool | *int | *int16 | *uint16 | *uint32
*string | *bool | *int | *int16 | *int32 | *uint16 | *uint32
}

func InsertPairs[T ValueType](ps *PluginStore, key string, value T) {
@@ -52,6 +52,8 @@ func InsertPairs[T ValueType](ps *PluginStore, key string, value T) {
ps.InsertPairs(key, strconv.FormatInt(int64(*v), 10))
case *int16:
ps.InsertPairs(key, strconv.FormatInt(int64(*v), 10))
case *int32:
ps.InsertPairs(key, strconv.FormatInt(int64(*v), 10))
marcofranssen (Collaborator) commented on Nov 19, 2025:

Suggested change
-ps.InsertPairs(key, strconv.FormatInt(int64(*v), 10))
+ps.InsertPairs(key, strconv.FormatInt(int32(*v), 10))

Not sure; it looks like we use int64 everywhere, but not sure if that is correct either.

The PR author (Contributor) replied:

The conversion to int64/uint64 in these lines looks correct to me - it's required by strconv.FormatInt() and strconv.FormatUint().

The underlying inconsistency is in the type definitions across the project:

  • Input ports: mostly *int32
  • Output ports: mostly *uint32
  • Exception: monitor_agent.Port uses *int64
  • Other numeric fields: mixed *int16, *int32, *int64

The InsertPairs function must support all these types generically, hence the different cases. Standardizing types across the project could be a good idea for the future ;)

case *uint16:
ps.InsertPairs(key, strconv.FormatUint(uint64(*v), 10))
case *uint32:
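
As a concrete check on the thread above: strconv.FormatInt is declared as FormatInt(i int64, base int) string, so the widening conversion int64(*v) in the diff is required and the suggested int32(*v) variant would not compile; unsigned widths go through FormatUint the same way. A minimal runnable sketch:

package main

import (
	"fmt"
	"strconv"
)

func main() {
	var v int32 = 9200
	// FormatInt only accepts int64, so the case *int32 branch must widen first.
	fmt.Println(strconv.FormatInt(int64(v), 10)) // "9200"

	var u uint32 = 10000
	// Unsigned types widen to uint64 for FormatUint for the same reason.
	fmt.Println(strconv.FormatUint(uint64(u), 10)) // "10000"
}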