https://hub.docker.com/_/logstash/

https://www.docker.elastic.co/r/logstash

https://github.com/logstash-plugins

https://hub.docker.com/r/sebp/elk/

# docker run  # --net=host

docker run -d  -p 5044:5044  --name logstash  --restart=always  -v /etc/localtime:/etc/localtime:ro  -v /data/docker/monitor/logstash/logstash.yml:/config/logstash.yml  -e "http.host=0.0.0.0"   docker.elastic.co/logstash/logstash:7.9.0  logstash -f  /config/logstash.yml   


应用: monitor -> 添加服务: logstash

镜像: docker.elastic.co/logstash/logstash:7.9.0

命令:

logstash -f  /config/logstash.yml

#环境变量:

http.host = 0.0.0.0

卷:

/data/docker/monitor/logstash/logstash.yml:/config/logstash.yml

/data/site:/data/site

/data/file:/data/file

端口:tcp5044-5044

保存ip: 升级或替换 不变

主机名: 使用容器名称


install  plugin

logstash-plugin install logstash-codec-fluent logstash-codec-json logstash-codec-netflow logstash-codec-rubydebug
logstash-plugin install logstash-input-file logstash-input-elasticsearch logstash-input-redis logstash-input-beats logstash-integration-jdbc
logstash-plugin install logstash-filter-grok logstash-filter-geoip logstash-filter-kv logstash-filter-json logstash-filter-urldecode logstash-filter-useragent logstash-filter-dissect 
logstash-plugin install logstash-output-rabbitmq logstash-integration-kafka logstash-output-influxdb logstash-output-opentsdb logstash-output-redis
logstash-plugin prepare-offline-pack --overwrite --output logstash-filter-multiline.zip logstash-filter-multiline
logstash-plugin install --no-verify


logstash-plugin list # 看看logstash都安装了哪些插件


vi  /data/docker/monitor/logstash/logstash.yml

input {
    redis {
        data_type =>"list"
        key =>"filebeat:logstash"
        host =>"redis"
        port => 6379
        password => "wdqdmm@r"
        threads => "10"
        db => 15
        #codec => json
        } 
}

filter {
    mutate {
        gsub => ["message", "\\x", "\\\x"]
        gsub => [
         "time", "[+]", "T"
        ]
        replace => ["time","%{time}+08:00"]   
    }
    
    if "error" in [tags] {
	  grok {
		match => {"message" => "(?<datetime>\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d) \[(?<errortype>\w+)\] \S+: \*\d+ (?<errormsg>[^,]+), \w+: %{IP:remotehost}, \w+: \w+, \w+: (?<request>[^,]+), \w+: \"%{IP:localhost}\""}
	  }
	  mutate {
		remove_field => ["message"]
	  }
	  if [request] {
	    ruby {
		  init => "@kname = ['method','uri','verb']"
		  code => "
			new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split(' '))])
			new_event.remove('@timestamp')
			event.append(new_event)
		  "
	    }
      }
      
    }
    
    else if "nginx" in [tags] {
    
      ruby {
		init => "@kname =['http_x_forwarded_for','time_local','request','status','body_bytes_sent','request_body','content_length','http_referer','http_user_agent','http_cookie','remote_addr','hostname','upstream_addr','upstream_response_time','request_time']"
		code => "new_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split('|'))])
		new_event.remove('@timestamp')
		event.append(new_event)
		"
	  }
	  
      if [request] {
	    ruby {
		  init => "@kname = ['method','uri','verb']"
		  code => "
			new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split(' '))])
			new_event.remove('@timestamp')
			event.append(new_event)
		  "
	    }
      } 
      
      if [uri] {
	    ruby{
		  init => "@kname = ['url_path','url_args']"
		  code => "
			new_event = LogStash::Event.new(Hash[@kname.zip(event.get('uri').split('?'))])
			new_event.remove('@timestamp')
			event.append(new_event)
		  "
	    }
      }
      
      kv {
	    prefix =>"url_"
	    source =>"url_args"
	    field_split =>"&"
	    include_keys => ["uid","cip"]
	    remove_field => ["url_args","uri","request"]
      }
   
      mutate {
	    convert => [
		  "body_bytes_sent","integer",
		  "content_length","integer",
		  "upstream_response_time","float",
		  "request_time","float"
        ]
      }
  
      date {
   	    match => [ "time_local","dd/MMM/yyyy:hh:mm:ss Z" ]
	    locale => "en" 
      }
   
    }
    
    else if "fluentd" in [tags] {
    
      json {
            source => "message"
            remove_field => ["beat"]
        }
        
    }
    
    else {
    	
      json {
            source => "message"
            remove_field => ["beat","message"]
        }
        
    }
    
#使用geoip库定位ip
 if [message] !~ "^127\.|^192\.168\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[01]\.|^10\." { 
    geoip {
        source => "xff" #nginx日志中外部访问ip对应字段
        database => "/data/site/shell/GeoLite2-City.mmdb"
        #去掉显示geoip显示的多余信息
        remove_field => ["[geoip][dma_code]", "[geoip][postal_code]", "[geoip][location]", "[geoip][latitude]", "[geoip][longitude]", "[geoip][country_code]", "[geoip][country_code2]", "[geoip][country_code3]", "[geoip][continent_code]", "[geoip][region_code]"]
        target => "geoip"
     }
    mutate { 
        convert => [ "[geoip][coordinates]", "float" ] 
    }
  } 
  
}

# 不同的日志输出到不同的位置
output {
     if "error" in [tags]{
        elasticsearch {
           hosts => ["elasticsearch:9200"]
           index => "logstash-error-%{+YYYY.MM.dd}"
        }
    }
     else if "nginx" in [tags]{
        elasticsearch {
           hosts => ["elasticsearch:9200"]
           index => "logstash-nginx-%{+YYYY.MM.dd}"
        }
    }
     else if "fluentd" in [tags]{
        elasticsearch {
           hosts => ["elasticsearch:9200"]
           index => "logstash-fluentd-%{+YYYY.MM.dd}"
        }
    }
     else if "syslog" in [tags]{
        elasticsearch {
           hosts => ["elasticsearch:9200"]
           index => "logstash-syslog-%{+YYYY.MM.dd}"
        }
    }
     else {
     	elasticsearch {
           hosts => ["elasticsearch:9200"]
           index => "logstash-log-%{+YYYY.MM.dd}"
        }
    }
}


input {

    file {                                                                  # 使用file作为数据输入

      path => "/data/file/logs/nginx/error.log"     # 设定读入数据的路径

      type => "error"                                              # 设定读入数据的路径

      start_position => "beginning"                       # 从文件的开始处读取,end从文件末尾开始读取 

    }

    file {

       path => "/data/file/logs/nginx/nginx.log"

       type => "nginx"

       start_position => "beginning"

      codec => json

    }

}


# 输出至屏幕

output {                                                    

    stdout {

        codec => rubydebug                       # 输出至屏幕

    }

}


# 监控Nginx日志

仅仅列了filter配置块,input与output参考上一个配置

filter {

    grok {

            match => {

                    "message" => "%{HTTPD_COMBINEDLOG} \"%{DATA:realclient}\""

            }

            remove_field => "message"

    }

    date {

            match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]

            remove_field => "timestamp"

    }

}

# 监控Tomcat

仅仅列了filter配置块,input与output参考上一个配置

filter {

    grok {

            match => {

                    "message" => "%{HTTPD_COMMONLOG}"

            }

            remove_field => "message"

    }

    date {

            match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]

            remove_field => "timestamp"

    }


input {

      file {

          path => "/var/log/messages"              #系统日志

          type => "system"

          start_position => "beginning"

      }

      file {

                    path => "/var/log/elasticsearch/yun.log"  #java异常日志

                    type => "es-error"

                    start_position => "beginning"  

                    codec => multiline {                  #多行优化处理

                    pattern => "^\["

                    negate => true

                    what => "previous"

                }

            }

}


filter{

    if "start" in [message]{     --message就是指原始消息

        grok{

            match => xxxxxxxxx

        }

    }else if "complete" in [message]{

        grok{

            xxxxxxxxxx

        }

    }else{

        grok{

            xxxxxxx

        }

    }

 

}


====================

配置解析:

Logstash 分为 Input、Output、Filter、Codec 等多种plugins。

Input:数据的输入源也支持多种插件,如elk官网的beats、file、graphite、http、kafka、redis、exec等等等、、、

Output:数据的输出目的也支持多种插件,如本文的elasticsearch,当然这可能也是最常用的一种输出。以及exec、stdout终端、graphite、http、zabbix、nagios、redmine等等、、、

Filter:使用过滤器根据日志事件的特征,对数据事件进行处理过滤后,在输出。支持grok、date、geoip、mutate、ruby、json、kv、csv、checksum、dns、drop、xml等等、、

Codec:编码插件,改变事件数据的表示方式,它可以作为对输入或输出运行该过滤。和其它产品结合,如rubydebug、graphite、fluent、nmap等等。

具体以上插件的细节可以去官网,介绍的挺详细的。下面说下该篇中的配置文件的含义:

input段:

 file:使用file 作为输入源

  path: 日志的路径,支持/var/log*.log,及[ “/var/log/messages”, “/var/log/*.log” ] 格式

  start_position: 从文件的开始读取事件。另外还有end参数

  ignore_older: 忽略早于24小时(默认值86400)的日志,设为0,即关闭该功能,以防止文件中的事件由于是早期的被logstash所忽略。

filter段:

 grok:数据结构化转换工具

  match:匹配条件格式,将nginx日志作为message变量,并应用grok条件NGINXACCESS进行转换

 geoip:该过滤器从geoip中匹配ip字段,显示该ip的地理位置

  source:ip来源字段,这里我们选择的是日志文件中的最后一个字段,如果你的是默认的nginx日志,选择第一个字段即可(注:这里写的字段是/opt/logstash/patterns/nginx 里面定义转换后的)

  target:指定插入的logstash字断目标存储为geoip

  database:geoip数据库的存放路径

  add_field: 增加的字段,坐标经度

  add_field: 增加的字段,坐标纬度

 mutate: 数据的修改、删除、类型转换

  convert: 将坐标转为float类型

  convert: http的响应代码字段转换成 int

  convert: http的传输字节转换成int

  replace: 替换一个字段

  remove_field: 移除message 的内容,因为数据已经过滤了一份,这里不必在用到该字段了。不然会相当于存两份

 date: 时间处理,该插件很实用,主要是用你日志文件中事件的事件来对timestamp进行转换,导入老的数据必备!在这里曾让我困惑了很久哦。别再掉坑了

  match:匹配到timestamp字段后,修改格式为dd/MMM/yyyy:HH:mm:ss Z

 mutate:数据修改

  remove_field: 移除timestamp字段。

output段:

 elasticsearch:输出到es中

  host: es的主机ip+端口或者es 的FQDN+端口

  index: 为日志创建索引logstash-nginx-access-*,这里也就是kibana那里添加索引时的名称

GeoIP过滤器的版本4.0.0和更高版本使用MaxMind GeoLite2数据库并支持IPv4和IPv6查找。 4.0.0之前的版本使用传统的MaxMind GeoLite数据库,仅支持IPv4查找。

====================


https://grafana.com/grafana/dashboards/11119

https://www.linuxidc.com/Linux/2017-12/149811.htm

https://github.com/weixinqing/Logstash-example

https://www.jianshu.com/p/7aa55172c3e2

https://www.jianshu.com/p/d889aae7c72e


签名:这个人很懒,什么也没有留下!
最新回复 (0)
返回