image: tag: latest-debug config: service: | [SERVICE] parsers_file /fluent-bit/etc/parsers.conf parsers_file /fluent-bit/etc/conf/custom_parsers.conf streams_file /fluent-bit/etc/conf/custom_streams.conf http_server on health_check on inputs: | # parser is either docker or cri (containerd) [INPUT] name tail path /var/log/pods/*ingress-nginx*/controller/0.log exclude_path */0.log. parser cri tag ingress mem_buf_limit 5MB skip_empty_lines on skip_long_lines on buffer_max_size 16k buffer_chunk_size 16k filters: | # prevent flood and smooth down index docs count # pass 1000 messages per second in average over 5 minutes (300 seconds) [FILTER] name throttle match * rate 1000 window 300 interval 1s [FILTER] name expect match ingress key_exists stream action exit [FILTER] name modify match ingress condition key_value_matches message ^\[Error\] set stream stderr [FILTER] name modify match ingress # @timestamp is enough for logs - remove k8s timestamp remove time # stream processor refuses stream field rename stream streamfix # field is log with docker vs. message with containerd # parse custom nginx log format as json # https://docs.fluentbit.io/manual/pipeline/filters/parser [FILTER] name parser match source.stdout key_name message parser json_no_time reserve_data true [FILTER] name modify match source.stdout # @timestamp is enough for logs - remove nginx timestamp remove time_local # elasticsearch refuses host field rename host vhost # provides upstream_ip upstream_port [FILTER] name parser match source.stdout key_name upstream_addr parser custom_upstream reserve_data true # provides method path http_version [FILTER] name parser match source.stdout key_name request parser split_request reserve_data true # provides page [FILTER] name parser match source.stdout key_name path parser strip_querystr reserve_data true preserve_key true # ingress-*-access - obfuscate tokens passwords codes # provides path_first and eventually path_second [FILTER] name parser match source.stdout key_name path parser obfus_secrets reserve_data true preserve_key false # concatenate path_first (and path_second when there is) back to path # https://docs.fluentbit.io/manual/pipeline/filters/lua [FILTER] name lua match source.stdout script concat_path.lua call concat_path [FILTER] name modify match source.* # clean-up obfuscation artifacts remove path_first remove path_second # revert back to original field name rename streamfix stream add sensor ingress@test outputs: | #[OUTPUT] # name stdout # match source.* [OUTPUT] name opensearch match source.stderr host {{log_host}} port {{log_port}} tls on tls.verify off http_user {{log_http_user}} http_passwd {{log_http_passwd}} index test-error suppress_type_name on #replace_dots on trace_error on [OUTPUT] name opensearch match source.stdout host {{log_host}} port {{log_port}} tls on tls.verify off http_user {{log_http_user}} http_passwd {{log_http_passwd}} index test-access suppress_type_name on #replace_dots on trace_error on customParsers: | [PARSER] name json_no_time format json [PARSER] name custom_upstream format regex regex ^(?[^:]*):(?[^ ]*)$ # split-up the request field [PARSER] name split_request format regex regex ^(?[^ ]*) (?[^ ]*) HTTP/(?[^ ]*) # help differenciate web pages - strip out the query string [PARSER] name strip_querystr format regex regex ^(?[^?]*) # obfuscate only the secret part # catch token= password= search_field=code&search_value= code= and avoid barcode= [PARSER] name obfus_secrets format regex #regex ^(?.+(token|password|search_field=code&search_value|[?&]code)=)[^&]+(?.*) regex ^(?.+(token|password|code&search_value|[?&]code)=)[^&]+(?.*) extraFiles: concat_path.lua: | function concat_path(tag, timestamp, record) if record.path_first then if record.path_second then record.path = record.path_first .. "OBFUSCATED" .. record.path_second else record.path = record.path_first .. "OBFUSCATED" end return 2, timestamp, record end end custom_streams.conf: | [STREAM_TASK] Name error_log Exec CREATE STREAM error WITH (tag='source.stderr') AS SELECT * from STREAM:tail.0 WHERE streamfix = 'stderr'; [STREAM_TASK] Name access_log Exec CREATE STREAM access WITH (tag='source.stdout') AS SELECT * from STREAM:tail.0 WHERE streamfix = 'stdout'; priorityClassName: {{priclass}} nodeSelector: app_type: ingress tolerations: - key: app_type operator: Equal value: ingress effect: NoSchedule