Logstash Reference Guide (Configuration Examples for Filebeat Modules)
Published: 2019-06-20


Configuration Examples for Filebeat Modules

The examples in this section show how to build Logstash pipelines that parse data collected by Filebeat modules:

Apache 2 Logs

The Logstash pipeline configuration in this example shows how to ship and parse the access and error logs collected by the apache2 Filebeat module.

input {
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}
filter {
  if [fileset][module] == "apache2" {
    if [fileset][name] == "access" {
      grok {
        match => { "message" => ["%{IPORHOST:[apache2][access][remote_ip]} - %{DATA:[apache2][access][user_name]} \[%{HTTPDATE:[apache2][access][time]}\] \"%{WORD:[apache2][access][method]} %{DATA:[apache2][access][url]} HTTP/%{NUMBER:[apache2][access][http_version]}\" %{NUMBER:[apache2][access][response_code]} %{NUMBER:[apache2][access][body_sent][bytes]}( \"%{DATA:[apache2][access][referrer]}\")?( \"%{DATA:[apache2][access][agent]}\")?",
          "%{IPORHOST:[apache2][access][remote_ip]} - %{DATA:[apache2][access][user_name]} \\[%{HTTPDATE:[apache2][access][time]}\\] \"-\" %{NUMBER:[apache2][access][response_code]} -" ] }
        remove_field => "message"
      }
      mutate {
        add_field => { "read_timestamp" => "%{@timestamp}" }
      }
      date {
        match => [ "[apache2][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
        remove_field => "[apache2][access][time]"
      }
      useragent {
        source => "[apache2][access][agent]"
        target => "[apache2][access][user_agent]"
        remove_field => "[apache2][access][agent]"
      }
      geoip {
        source => "[apache2][access][remote_ip]"
        target => "[apache2][access][geoip]"
      }
    }
    else if [fileset][name] == "error" {
      grok {
        match => { "message" => ["\[%{APACHE_TIME:[apache2][error][timestamp]}\] \[%{LOGLEVEL:[apache2][error][level]}\]( \[client %{IPORHOST:[apache2][error][client]}\])? %{GREEDYDATA:[apache2][error][message]}",
          "\[%{APACHE_TIME:[apache2][error][timestamp]}\] \[%{DATA:[apache2][error][module]}:%{LOGLEVEL:[apache2][error][level]}\] \[pid %{NUMBER:[apache2][error][pid]}(:tid %{NUMBER:[apache2][error][tid]})?\]( \[client %{IPORHOST:[apache2][error][client]}\])? %{GREEDYDATA:[apache2][error][message1]}" ] }
        pattern_definitions => {
          "APACHE_TIME" => "%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}"
        }
        remove_field => "message"
      }
      mutate {
        rename => { "[apache2][error][message1]" => "[apache2][error][message]" }
      }
      date {
        match => [ "[apache2][error][timestamp]", "EEE MMM dd H:m:s YYYY", "EEE MMM dd H:m:s.SSSSSS YYYY" ]
        remove_field => "[apache2][error][timestamp]"
      }
    }
  }
}
output {
  elasticsearch {
    hosts => localhost
    manage_template => false
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
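As a rough illustration of what the first access-log grok pattern extracts, here is a minimal Python sketch. The regex is a simplified stand-in for the `IPORHOST`/`HTTPDATE` grok definitions (not the exact library patterns), and the sample log line is invented for the example.

```python
import re

# Simplified equivalent of the apache2 access grok pattern above.
# \S+ stands in for IPORHOST/DATA; [^\]]+ stands in for HTTPDATE.
ACCESS_RE = re.compile(
    r'(?P<remote_ip>\S+) - (?P<user_name>\S+) \[(?P<time>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<url>\S+) HTTP/(?P<http_version>[\d.]+)" '
    r'(?P<response_code>\d+) (?P<body_sent_bytes>\d+)'
    r'(?: "(?P<referrer>[^"]*)")?(?: "(?P<agent>[^"]*)")?'
)

# Invented sample line for illustration only.
line = ('203.0.113.4 - frank [10/Oct/2019:13:55:36 +0000] '
        '"GET /index.html HTTP/1.1" 200 2326 "-" "Mozilla/5.0"')
fields = ACCESS_RE.match(line).groupdict()
```

After the match, `fields` holds the same values the grok filter would place under `[apache2][access]`, e.g. `remote_ip`, `method`, and `response_code`; the `date`, `useragent`, and `geoip` filters then enrich those fields.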

MySQL Logs

The Logstash pipeline configuration in this example shows how to ship and parse the error and slow logs collected by the mysql Filebeat module.

input {
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}
filter {
  if [fileset][module] == "mysql" {
    if [fileset][name] == "error" {
      grok {
        match => { "message" => ["%{LOCALDATETIME:[mysql][error][timestamp]} (\[%{DATA:[mysql][error][level]}\] )?%{GREEDYDATA:[mysql][error][message]}",
          "%{TIMESTAMP_ISO8601:[mysql][error][timestamp]} %{NUMBER:[mysql][error][thread_id]} \[%{DATA:[mysql][error][level]}\] %{GREEDYDATA:[mysql][error][message1]}",
          "%{GREEDYDATA:[mysql][error][message2]}"] }
        pattern_definitions => {
          "LOCALDATETIME" => "[0-9]+ %{TIME}"
        }
        remove_field => "message"
      }
      mutate {
        rename => { "[mysql][error][message1]" => "[mysql][error][message]" }
      }
      mutate {
        rename => { "[mysql][error][message2]" => "[mysql][error][message]" }
      }
      date {
        match => [ "[mysql][error][timestamp]", "ISO8601", "YYMMdd H:m:s" ]
        remove_field => "[mysql][error][time]"
      }
    }
    else if [fileset][name] == "slowlog" {
      grok {
        match => { "message" => ["^# User@Host: %{USER:[mysql][slowlog][user]}(\[[^\]]+\])? @ %{HOSTNAME:[mysql][slowlog][host]} \[(%{IP:[mysql][slowlog][ip]})?\](\s*Id:\s* %{NUMBER:[mysql][slowlog][id]})?\n# Query_time: %{NUMBER:[mysql][slowlog][query_time][sec]}\s* Lock_time: %{NUMBER:[mysql][slowlog][lock_time][sec]}\s* Rows_sent: %{NUMBER:[mysql][slowlog][rows_sent]}\s* Rows_examined: %{NUMBER:[mysql][slowlog][rows_examined]}\n(SET timestamp=%{NUMBER:[mysql][slowlog][timestamp]};\n)?%{GREEDYMULTILINE:[mysql][slowlog][query]}"] }
        pattern_definitions => {
          "GREEDYMULTILINE" => "(.|\n)*"
        }
        remove_field => "message"
      }
      date {
        match => [ "[mysql][slowlog][timestamp]", "UNIX" ]
      }
      mutate {
        gsub => ["[mysql][slowlog][query]", "\n# Time: [0-9]+ [0-9][0-9]:[0-9][0-9]:[0-9][0-9](\\.[0-9]+)?$", ""]
      }
    }
  }
}
output {
  elasticsearch {
    hosts => localhost
    manage_template => false
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
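To make the multi-line slowlog grok pattern above more concrete, here is a minimal Python sketch of the same extraction. The regex is a simplified stand-in for the `USER`/`HOSTNAME`/`NUMBER` grok definitions, and the sample slowlog entry is invented for the example.

```python
import re

# Simplified equivalent of the mysql slowlog grok pattern above:
# a "# User@Host:" header, a "# Query_time:" stats line, an optional
# "SET timestamp=...;" line, then the query itself (GREEDYMULTILINE).
SLOWLOG_RE = re.compile(
    r'^# User@Host: (?P<user>\w+)(?:\[[^\]]+\])? @ (?P<host>\S*) \[(?P<ip>[^\]]*)\]'
    r'(?:\s*Id:\s*(?P<id>\d+))?\n'
    r'# Query_time: (?P<query_time>[\d.]+)\s+Lock_time: (?P<lock_time>[\d.]+)'
    r'\s+Rows_sent: (?P<rows_sent>\d+)\s+Rows_examined: (?P<rows_examined>\d+)\n'
    r'(?:SET timestamp=(?P<timestamp>\d+);\n)?(?P<query>(?:.|\n)*)'
)

# Invented sample entry for illustration only.
entry = (
    "# User@Host: appuser[appuser] @ localhost []  Id:    42\n"
    "# Query_time: 2.000000  Lock_time: 0.000000 Rows_sent: 1  Rows_examined: 100\n"
    "SET timestamp=1560000000;\n"
    "SELECT SLEEP(2);"
)
fields = SLOWLOG_RE.match(entry).groupdict()
```

The `timestamp` group feeds the `date` filter's `UNIX` match, and `query` is the field that the final `mutate`/`gsub` strips a trailing `# Time:` line from.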

Nginx Logs

The Logstash pipeline configuration in this example shows how to ship and parse the access and error logs collected by the nginx Filebeat module.

input {
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}
filter {
  if [fileset][module] == "nginx" {
    if [fileset][name] == "access" {
      grok {
        match => { "message" => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\""] }
        remove_field => "message"
      }
      mutate {
        add_field => { "read_timestamp" => "%{@timestamp}" }
      }
      date {
        match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
        remove_field => "[nginx][access][time]"
      }
      useragent {
        source => "[nginx][access][agent]"
        target => "[nginx][access][user_agent]"
        remove_field => "[nginx][access][agent]"
      }
      geoip {
        source => "[nginx][access][remote_ip]"
        target => "[nginx][access][geoip]"
      }
    }
    else if [fileset][name] == "error" {
      grok {
        match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] }
        remove_field => "message"
      }
      mutate {
        rename => { "@timestamp" => "read_timestamp" }
      }
      date {
        match => [ "[nginx][error][time]", "YYYY/MM/dd H:m:s" ]
        remove_field => "[nginx][error][time]"
      }
    }
  }
}
output {
  elasticsearch {
    hosts => localhost
    manage_template => false
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
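As a rough illustration of the nginx error-log grok pattern, here is a minimal Python sketch. The regex is a simplified stand-in for the `DATA`/`NUMBER`/`GREEDYDATA` grok definitions, and the sample log line is invented for the example.

```python
import re

# Simplified equivalent of the nginx error grok pattern above:
# timestamp, [level], pid#tid:, optional *connection_id, then the message.
ERROR_RE = re.compile(
    r'(?P<time>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(?P<level>\w+)\] '
    r'(?P<pid>\d+)#(?P<tid>\d+): (?:\*(?P<connection_id>\d+) )?(?P<message>.*)'
)

# Invented sample line for illustration only.
line = ('2019/06/20 12:00:00 [error] 1234#5678: *90 open() '
        '"/var/www/html/favicon.ico" failed (2: No such file or directory)')
fields = ERROR_RE.match(line).groupdict()
```

The extracted `time` value is what the pipeline's `date` filter parses with the `YYYY/MM/dd H:m:s` format before removing the field.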

System Logs

The Logstash pipeline configuration in this example shows how to ship and parse the system logs collected by the system Filebeat module.

input {
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}
filter {
  if [fileset][module] == "system" {
    if [fileset][name] == "auth" {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",
          "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",
          "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
          "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",
          "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}",
          "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][user][add][name]}, UID=%{NUMBER:[system][auth][user][add][uid]}, GID=%{NUMBER:[system][auth][user][add][gid]}, home=%{DATA:[system][auth][user][add][home]}, shell=%{DATA:[system][auth][user][add][shell]}$",
          "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] }
        pattern_definitions => {
          "GREEDYMULTILINE" => "(.|\n)*"
        }
        remove_field => "message"
      }
      date {
        match => [ "[system][auth][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      }
      geoip {
        source => "[system][auth][ssh][ip]"
        target => "[system][auth][ssh][geoip]"
      }
    }
    else if [fileset][name] == "syslog" {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
        pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
        remove_field => "message"
      }
      date {
        match => [ "[system][syslog][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      }
    }
  }
}
output {
  elasticsearch {
    hosts => localhost
    manage_template => false
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
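As a rough illustration of the first sshd auth grok pattern above (the "Accepted ... for user from ip port ... ssh2" form), here is a minimal Python sketch. The regex is a simplified stand-in for the `SYSLOGTIMESTAMP`/`SYSLOGHOST` grok definitions, and the sample log line is invented for the example.

```python
import re

# Simplified equivalent of the first sshd auth grok pattern above.
SSHD_RE = re.compile(
    r'(?P<timestamp>\w{3}\s+\d+ \d{2}:\d{2}:\d{2}) (?P<hostname>\S+) '
    r'sshd(?:\[(?P<pid>\d+)\])?: (?P<event>\w+) (?P<method>\w+) for '
    r'(?:invalid user )?(?P<user>\S+) from (?P<ip>\S+) port (?P<port>\d+) ssh2'
    r'(?:: (?P<signature>.*))?'
)

# Invented sample line for illustration only.
line = ('Jun 20 12:34:56 web-1 sshd[2211]: Accepted publickey for alice '
        'from 203.0.113.5 port 51234 ssh2: RSA SHA256:AbCdEf')
fields = SSHD_RE.match(line).groupdict()
```

The extracted `ip` value is the field the pipeline's `geoip` filter enriches, and `timestamp` is what the `date` filter parses with the two `MMM d HH:mm:ss` syslog formats.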

Reprinted from: http://osnlx.baihongyu.com/
