前言
在之前一篇文章中关于Logstash安装使用已经演示过读写的功能了,Logstash不单是对数据进行读取和输出的功能,另一个强大的功能就是对数据的清洗,也就是俗称的过滤,那么此篇文章就是介绍Logstash使用Grok来进行对数据的清洗过滤!
Grok介绍
grok是用正则表达式来捕获关键数据的,grok是一个十分强大的logstash filter插件,他可以通过正则解析任意文本,将非结构化日志数据弄成结构化和方便车查询的结构,他是目前logstash中解析非结构化日志数据最好的方式。
Grok的语法规则
%{语法:语义}
1.
语法是指匹配的模式,例如使用NUMBER模式就可以匹配出数字,IP模式则会匹配出127.0.0.1这样的IP地址
如:输入的内容为127.0.0.1 [07/Feb/2020:16:24:19 +0800] “GET /HTTP /1.1” 430 5039
那么使用%{IP:clientip}匹配模式将获得的结果为:clientip:127.0.0.1
那么使用%{HTTPDATE:timestamp}匹配模式将获得结果为:timestamp:07/Feb/2020:16:24:19 +0800
那么使用%{QS:referrer}匹配模式将获得的结果为:referrer:“GET /HTTP /1.1”
这些匹配模式定义模板是存放在/logstash-6.5.4/vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns
这里有一些常用的软件的匹配模式,其中grok-patterns是最基础的匹配模式,查看一下里面是啥
也就是定义了一些正则表达式罢了!
Grok语法编写
在编写过滤脚本前,先提供一个Grok的在线语法检测工具 Grok在线语法检测
这个网站可能比较慢,科学上网的可以用这个,不会科学上网的可以用国内的 国内Grok在线语法检测效果一样的
将nginx的日志放入进去最为输入数据
192.168.0.99 – – [30/Sep/2020:10:00:01 +0000] “GET / HTTP/1.1” 200 971 “-” “Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36” “-”
%{IP:clientip}
1.
2.
3.
4.
那么更多匹配写法演示如下
注意这里的一些符号问题,如空格、{}、-,这些符号需要转义一下
空格
\
–
\-
[
\[
1.
2.
3.
4.
5.
6.
否则无法匹配,通过上面的匹配演示,我们将输入的内容分成五个部分,即五个字段,将输入内容分割为不同的数据字段,这对于日后解析和查询日志非常有用,这正式使用grok的目的,Lostash默认提供近200个匹配模式,(其实也就是定义好的正则表达式)然我们来使用。
Logstash使用filter过滤数据
1.grok字段匹配
vi test-grok.conf
input{
stdin{}
}
filter{
grok{
match => [“message”,”%{IP:clientip}\ \-\-\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}”]
}
}
output{
stdout{codec=>”rubydebug”}
}
2.启动使用test-grok.conf配置文件的Logstash
输入:
192.168.0.99 — [30/Sep/2020:10:00:01 +0000] “GET / HTTP/1.1” 200 971 “-” “Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36” “-”
1.
2.
3.排除掉不需要的字段
input{
stdin{}
}
filter{
grok{
match => [“message”,”%{IP:clientip}\ \-\-\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}”]
remove_field => [“message”]
}
}
output{
stdout{codec=>”rubydebug”}
}
4.日期格式化
input{
stdin{}
}
filter{
grok{
match => [“message”,”%{IP:clientip}\ \-\-\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}”]
remove_field => [“message”]
}
date{
match => [“timestamp”,”dd/MMM/yyyy:HH:mm:ss Z”]
#match => [ “time”,”MMM d HH:mm:ss”, “MMM dd HH:mm:ss”, “ISO8601″] %{TIMESTAMP_ISO8601:time}
}
}
output{
stdout{codec=>”rubydebug”}
}
那么格式化后存在两个timestamp,一个为@timestamp另一个是timestamp,那么这里的@timestamp是logstash日志收集的时间,timestamp是massage中的时间,且使用date后会将timestamp的值付给@timestamp,那么此时可以将其中一个去除!
5.去除timestamp
input{
stdin{}
}
filter{
grok{
match => [“message”,”%{IP:clientip}\ \-\-\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}”]
remove_field => [“message”]
}
date{
match => [“timestamp”,”dd/MMM/yyyy:HH:mm:ss Z”]
}
mutate{
remove_field => [“timestamp”]
}
}
output{
stdout{codec=>”rubydebug”}
}
这里注意:经过grok匹配后的remove_field 中不能直接将timestamp剔除否则在date中无法将timestamp的值赋予给@timestamp只能在时间替换后再将冗余的timestamp字段剔除掉!(呆了@的字段属于元数据字段,这些字段是不能被剔除的吧!)
那么在输出的数据中就没有两个代表时间的字段了!
6.指定字段替换内容
数据修改(mutate)可以使用gsub通过正则表达式替换字段中匹配到的值,只对字符串有效,下面演示gsub 的使用
input{
stdin{}
}
filter{
grok{
match => [“message”,”%{IP:clientip}\ \-\-\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}”]
remove_field => [“message”]
}
date{
match => [“timestamp”,”dd/MMM/yyyy:HH:mm:ss Z”]
}
mutate{
remove_field => [“timestamp”]
gsub => [“host”, “master”,”yyy”]
}
}
output{
stdout{codec=>”rubydebug”}
}
匹配替换成功!
6.指定规则分割
input{
stdin{}
}
filter{
grok{
match => [“message”,”%{IP:clientip}\ \-\-\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}”]
#remove_field => [“message”]
}
date{
match => [“timestamp”,”dd/MMM/yyyy:HH:mm:ss Z”]
}
mutate{
remove_field => [“timestamp”]
gsub => [“host”, “master”,”yyy”]
split => [“message”,”/”]
}
}
output{
stdout{codec=>”rubydebug”}
}
7.字段重命名
input{
stdin{}
}
filter{
grok{
match => [“message”,”%{IP:clientip}\ \-\-\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}”]
#remove_field => [“message”]
}
date{
match => [“timestamp”,”dd/MMM/yyyy:HH:mm:ss Z”]
}
mutate{
remove_field => [“timestamp”]
gsub => [“host”, “master”,”yyy”]
split => [“new_message”,”/”]
rename => {“message” => “new_message”}
}
}
output{
stdout{codec=>”rubydebug”}
}
在这里不难看出mutate中的配置是存在一定的优先级的,这里重命名的优先级是高于拆分的
8.IP详细信息分析插件
input{
stdin{}
}
filter{
grok{
match => [“message”,”%{IP:clientip}\ \-\-\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}”]
#remove_field => [“message”]
}
date{
match => [“timestamp”,”dd/MMM/yyyy:HH:mm:ss Z”]
}
mutate{
remove_field => [“timestamp”]
gsub => [“host”, “master”,”yyy”]
split => [“new_message”,”/”]
rename => {“message” => “new_message”}
}
geoip{
source => “clientip”
}
}
output{
stdout{codec=>”rubydebug”}
}
提取部分geoip产生的数据
input{
stdin{}
}
filter{
grok{
match => [“message”,”%{IP:clientip}\ \-\-\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}”]
#remove_field => [“message”]
}
date{
match => [“timestamp”,”dd/MMM/yyyy:HH:mm:ss Z”]
}
mutate{
remove_field => [“timestamp”]
gsub => [“host”, “master”,”yyy”]
split => [“new_message”,”/”]
rename => {“message” => “new_message”}
}
geoip{
source => “clientip”
fields => [“city_name”,”region_name”,”country_name”,”ip”,”longitude”,”timezone”]
}
}
output{
stdout{codec=>”rubydebug”}
}
9.匹配任何内容
input{
stdin{}
}
filter{
grok{
match => [“message”,”%{IP:clientip}\ \-\-\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}\ %{GREEDYDATA:oth1}\ %{GREEDYDATA:oth2}\ %{GREEDYDATA:oth3}”]
#remove_field => [“message”]
}
date{
match => [“timestamp”,”dd/MMM/yyyy:HH:mm:ss Z”]
}
mutate{
remove_field => [“timestamp”]
gsub => [“host”, “master”,”yyy”]
split => [“new_message”,”/”]
rename => {“message” => “new_message”}
}
geoip{
source => “clientip”
fields => [“city_name”,”region_name”,”country_name”,”ip”,”longitude”,”timezone”]
}
}
output{
stdout{codec=>”rubydebug”}
}
这里就任意匹配了其他的一些字段