input { kafka { bootstrap_servers => "elk:9092,elk2:9092,elk3:9092" # kafka地址 topics => ["vipthink","message-notify-test","test_kfk"] # kafka中topic名称,记得topic #group_id => "prod_nginx_group" # 默认为"logstash" codec => "json" # 与Shipper端output配置项一致 consumer_threads => 1 # 消费的线程数 decorate_events => true # 在输出消息的时候回输出自身的信息,包括:消费消息的大小、topic来源以及consumer的group信息。 type => "logstash_mixins" } } output { if [type] == "logstash_mixins" { elasticsearch { action => "index" #The operation on ES hosts => ["elk:9200","elk2:9200","elk3:9200"] #ElasticSearch host, can be array. #index => "%{+YYYY.MM.dd}_%{[fields][log_topic]}" #The index to write data to, can be any string. #index => "%{+YYYY.MM}_%{[@metadata][topic]}" index => "%{[fields][type]}-%{+YYYY.MM.dd}" user => "elastic" password => "UpdZtZ9yC3kxzbToFDme" } } }