使用logstash清洗日志
logstash是一个开源的数据收集引擎,集成了多种数据输入和输出方式。 可以理解成是一种数据管道。每种方式是以插件的形式集成在logstash中。
简单举例
logstash本身包括三部分,input, filter, output。
- input :数据输入
- filter:数据转化,过滤,分析
- output:数据输出
简单举例,下面文件保存在logstash-test.conf文件中
1 2 3 4 5 |
input { stdin{} } filter { } output { stdout {codec => rubydebug} } |
在上面的例子中input是标准输入,filter没有做任何的转化,output是标准输出,不过输出格式是rubydebug。在logstash安装目录下,运行 bin/logstash -f logstash-test.conf。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
[paydev@a01.logstat.premium.m6.youku]$ bin/logstash -f logstash-test.conf hi,bruceding { "message" => "hi,bruceding", "@version" => "1", "@timestamp" => "2015-08-30T06:50:16.460Z", "host" => "a01.logstat.premium.m6.youku" } this is a test { "message" => "this is a test", "@version" => "1", "@timestamp" => "2015-08-30T06:50:27.075Z", "host" => "a01.logstat.premium.m6.youku" } |
可以看到输出是格式化的,内容放在了message中。
清洗日志
利用filter插件,logstash可以处理一切格式化的东西。介绍下是如何处理应用日志的。上面的例子中,内容都放在了message中,我们可以利用filter把message中固定字段都提取出来。
1 2 3 4 5 |
input { #stdin{} file { path => "/opt/logstash-1.4.2/test.log" start_position => "beginning" } } filter { } output { stdout {codec => rubydebug} } |
使用了file插件,path标识了文件的文件,当然path也可以是数组的形式存在,指定多个文件路径,其中也可以是用通配符。 start_position说明从哪个位置解析文件,默认值为end。
1 2 3 4 5 6 7 8 |
[paydev@a01.logstat.premium.m6.youku]$ bin/logstash -f logstash-test.conf { "message" => "00:00:00.1257 [21wsqn 63756]\t[AppResponse::log][LN:315]\t[ERROR]\t[2015-08-30 00:00:00]\tBase_Action::response on line 196\terror:0\tPOST request:/account/get_auto_renew_account_by_id\tresponse:{\"error\":0,\"cost\":0.018793821334839,\"msg\":\"fail\"}\ta:5:{s:4:\"ytid\";s:9:\"332538449\";s:8:\"platform\";s:1:\"1\";s:7:\"pub_key\";s:4:\"P005\";s:9:\"sign_type\";s:3:\"MD5\";s:4:\"sign\";s:32:\"8eedb17f9819806d4a11e653816b095c\";}\t", "@version" => "1", "@timestamp" => "2015-08-30T07:09:05.994Z", "host" => "a01.logstat.premium.m6.youku", "path" => "/opt/logstash-1.4.2/test.log" } |
使用grok可以把非结构化的数据转化成结构化数据,便于使用。虽然test.log中是非结构化的数据,但是它确是标准化的,因为每一行都遵循相同的格式。grok其实就用正则匹配的方式,把需要的字段提取出来,并赋上有意义的字段名称。
修改filter部分
1 2 3 4 5 |
filter { grok { match => { "message" => "%{TIME}\s\[%{DATA:sessionid}\]\s\[%{GREEDYDATA}\]\s\[%{TIMESTAMP_ISO8601:time}\]\s%{GREEDYDATA:method} on line %{INT:line}\serror:%{INT:errcode}\s%{DATA:request_method}\srequest:%{DATA:url}\s%{GREEDYDATA:value}" } } } |
结果如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
[paydev@a01.logstat.premium.m6.youku]$ bin/logstash -f logstash-test.conf { "message" => "00:00:00.1257 [21wsqn 63756]\t[AppResponse::log][LN:315]\t[ERROR]\t[2015-08-30 00:00:00]\tBase_Action::response on line 196\terror:0\tPOST request:/account/get_auto_renew_account_by_id\tresponse:{\"error\":0,\"cost\":0.018793821334839,\"msg\":\"fail\"}\ta:5:{s:4:\"ytid\";s:9:\"332538449\";s:8:\"platform\";s:1:\"1\";s:7:\"pub_key\";s:4:\"P005\";s:9:\"sign_type\";s:3:\"MD5\";s:4:\"sign\";s:32:\"8eedb17f9819806d4a11e653816b095c\";}\t", "@version" => "1", "@timestamp" => "2015-08-30T07:24:18.428Z", "host" => "a01.logstat.premium.m6.youku", "path" => "/opt/logstash-1.4.2/test.log", "sessionid" => "21wsqn 63756", "time" => "2015-08-30 00:00:00", "method" => "Base_Action::response", "line" => "196", "errcode" => "0", "request_method" => "POST", "url" => "/account/get_auto_renew_account_by_id", "value" => "response:{\"error\":0,\"cost\":0.018793821334839,\"msg\":\"fail\"}\ta:5:{s:4:\"ytid\";s:9:\"332538449\";s:8:\"platform\";s:1:\"1\";s:7:\"pub_key\";s:4:\"P005\";s:9:\"sign_type\";s:3:\"MD5\";s:4:\"sign\";s:32:\"8eedb17f9819806d4a11e653816b095c\";}\t" } |
grok默认提供了很多patterns,大多数情况下已经够用了。位置在logstash安装目录下patterns/grok-patterns。 可以在这里http://grokdebug.herokuapp.com/进行grok调试。也可以自己定义pattern, 放在grok-patterns中,就可以使用。对nginx日志的处理,就是自定义的pattern。
日志输出
我们把处理过的日志输出到elasticsearch中,便于查询。默认情况下,elasticsearch会创建logstash-YYYY.MM.DD形式的索引。
1 2 3 4 5 6 |
output { elasticsearch { host => "10.xxx.xxx.xxx" protocol => http } } |
复杂举例
目前我们会把订单实时交易同步到elasticsearch中,便于在kibana中查询。实时订单通过rabbitmq进处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
input { rabbitmq { host => "10.100.xxx.xxx" ack => true durable => true exchange => youku_premium_order_exchange password => "xxxxxx" queue => youku_premium_order_logstash_queue user => xxxxxx auto_delete => false type => youku_premium_order vhost => ykvip } } |
由于rabbitmq的内容格式是json,通过json插件进行解码。关注下@timestamp这个字段。默认情况下,这个是logstash处理时的时间戳。很多情况下,这不是我们想要的,有时需要自定义的字段来替换这个字段的值。这可以通过date插件实现。在本例中,我们使用完成时间来替换的。完整的filter如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
filter { json { source => "message" } mutate { add_field => [ "mytimestamp", "%{[body][completetime]}"] } date { match => [ "mytimestamp", "yyyy-MM-dd HH:mm:ss" ] timezone => "Asia/Shanghai" remove_field => ["mytimestamp"] locale => "en" } } |
delta airlines overweight baggage fees Delta