https://hub.docker.com/_/logstash/
https://www.docker.elastic.co/r/logstash
https://github.com/logstash-plugins
https://hub.docker.com/r/sebp/elk/
# docker run (append --net=host to share the host network namespace instead of port mapping)
docker run -d \
  -p 5044:5044 \
  --name logstash \
  --restart=always \
  -v /etc/localtime:/etc/localtime:ro \
  -v /data/docker/monitor/logstash/logstash.yml:/config/logstash.yml \
  -e "http.host=0.0.0.0" \
  docker.elastic.co/logstash/logstash:7.9.0 \
  logstash -f /config/logstash.yml
应用: monitor -> 添加服务: logstash
镜像: docker.elastic.co/logstash/logstash:7.9.0
命令:
logstash -f /config/logstash.yml
#环境变量:
http.host = 0.0.0.0
卷:
/data/docker/monitor/logstash/logstash.yml:/config/logstash.yml
/data/site:/data/site
/data/file:/data/file
端口:tcp5044-5044
保存ip: 升级或替换 不变
主机名: 使用容器名称
install plugin
# Install the codec / input / filter / output plugins used by the pipelines below.
logstash-plugin install logstash-codec-fluent logstash-codec-json logstash-codec-netflow logstash-codec-rubydebug
logstash-plugin install logstash-input-file logstash-input-elasticsearch logstash-input-redis logstash-input-beats logstash-integration-jdbc
logstash-plugin install logstash-filter-grok logstash-filter-geoip logstash-filter-kv logstash-filter-json logstash-filter-urldecode logstash-filter-useragent logstash-filter-dissect
logstash-plugin install logstash-output-rabbitmq logstash-integration-kafka logstash-output-influxdb logstash-output-opentsdb logstash-output-redis
# Build an offline package (for hosts without internet access) ...
logstash-plugin prepare-offline-pack --overwrite --output logstash-filter-multiline.zip logstash-filter-multiline
# ... then install it from the local zip; --no-verify skips signature verification.
# FIX: the original command had no argument, which is a usage error.
logstash-plugin install --no-verify file://$(pwd)/logstash-filter-multiline.zip
logstash-plugin list # show which plugins are currently installed
vi /data/docker/monitor/logstash/logstash.yml
# Pull events that Filebeat pushed into Redis (Redis acting as a broker/queue).
input {
redis {
data_type =>"list" # consume with BLPOP from a Redis list
key =>"filebeat:logstash" # list key the shipper writes to
host =>"redis"
port => 6379
password => "wdqdmm@r"
threads => "10" # number of consumer threads
db => 15
#codec => json # enable if the shipper sends raw JSON payloads
}
}
filter {
mutate {
# FIX: the original declared `gsub` twice inside the same mutate block;
# the second assignment silently overwrote the first, so the "message"
# substitution never ran. Merged into a single gsub array.
gsub => [
"message", "\\x", "\\\x",
"time", "[+]", "T"
]
replace => ["time","%{time}+08:00"] # append the +08:00 timezone offset
}
if "error" in [tags] {
# nginx error log, e.g. "2020/01/02 03:04:05 [error] 123#0: *45 msg, client: 1.2.3.4, ..."
grok {
match => {"message" => "(?<datetime>\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d) \[(?<errortype>\w+)\] \S+: \*\d+ (?<errormsg>[^,]+), \w+: %{IP:remotehost}, \w+: \w+, \w+: (?<request>[^,]+), \w+: \"%{IP:localhost}\""}
}
mutate {
remove_field => ["message"]
}
if [request] {
# split "GET /path HTTP/1.1" into method / uri / verb
ruby {
init => "@kname = ['method','uri','verb']"
code => "
new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split(' '))])
new_event.remove('@timestamp')
event.append(new_event)
"
}
}
}
else if "nginx" in [tags] {
# access log line is '|'-separated; map each column onto a named field
ruby {
init => "@kname =['http_x_forwarded_for','time_local','request','status','body_bytes_sent','request_body','content_length','http_referer','http_user_agent','http_cookie','remote_addr','hostname','upstream_addr','upstream_response_time','request_time']"
code => "new_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split('|'))])
new_event.remove('@timestamp')
event.append(new_event)
"
}
if [request] {
ruby {
init => "@kname = ['method','uri','verb']"
code => "
new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split(' '))])
new_event.remove('@timestamp')
event.append(new_event)
"
}
}
if [uri] {
# split "/path?a=1&b=2" into url_path / url_args
ruby{
init => "@kname = ['url_path','url_args']"
code => "
new_event = LogStash::Event.new(Hash[@kname.zip(event.get('uri').split('?'))])
new_event.remove('@timestamp')
event.append(new_event)
"
}
}
kv {
prefix =>"url_"
source =>"url_args"
field_split =>"&"
include_keys => ["uid","cip"] # keep only these query-string keys
remove_field => ["url_args","uri","request"]
}
mutate {
convert => [
"body_bytes_sent","integer",
"content_length","integer",
"upstream_response_time","float",
"request_time","float"
]
}
date {
# FIX: use HH (0-23 hour of day); the original pattern used hh
# (1-12 clock-hour), which fails to parse afternoon timestamps.
match => [ "time_local","dd/MMM/yyyy:HH:mm:ss Z" ]
locale => "en"
}
}
else if "fluentd" in [tags] {
json {
source => "message"
remove_field => ["beat"]
}
}
else {
json {
source => "message"
remove_field => ["beat","message"]
}
}
# Use the GeoIP database to locate public (non-RFC1918) client IPs.
# NOTE(review): this tests [message], but the nginx branch already consumed message,
# and geoip reads "xff" while the ruby split produces "http_x_forwarded_for" —
# verify which field actually carries the client IP before relying on this.
if [message] !~ "^127\.|^192\.168\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[01]\.|^10\." {
geoip {
source => "xff" # field holding the external client IP in the nginx log
database => "/data/site/shell/GeoLite2-City.mmdb"
# drop redundant geoip sub-fields
remove_field => ["[geoip][dma_code]", "[geoip][postal_code]", "[geoip][location]", "[geoip][latitude]", "[geoip][longitude]", "[geoip][country_code]", "[geoip][country_code2]", "[geoip][country_code3]", "[geoip][continent_code]", "[geoip][region_code]"]
target => "geoip"
}
mutate {
convert => [ "[geoip][coordinates]", "float" ]
}
}
}
# Route each log type (by tag) to its own daily Elasticsearch index.
output {
if "error" in [tags]{
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "logstash-error-%{+YYYY.MM.dd}"
}
}
else if "nginx" in [tags]{
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "logstash-nginx-%{+YYYY.MM.dd}"
}
}
else if "fluentd" in [tags]{
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "logstash-fluentd-%{+YYYY.MM.dd}"
}
}
else if "syslog" in [tags]{
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "logstash-syslog-%{+YYYY.MM.dd}"
}
}
else {
# catch-all for events with none of the known tags
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "logstash-log-%{+YYYY.MM.dd}"
}
}
}
input {
file { # read events from files on disk
path => "/data/file/logs/nginx/error.log" # path of the log file to read
type => "error" # event type used for routing in filter/output
start_position => "beginning" # read from the start of the file; "end" starts at the tail
}
file {
path => "/data/file/logs/nginx/nginx.log"
type => "nginx"
start_position => "beginning"
codec => json # decode each line as JSON
}
}
# Print events to the console (debugging only)
output {
stdout {
codec => rubydebug # pretty-print each event structure to stdout
}
}
# 监控Nginx日志
仅仅列了filter配置块,input与output参考上一个配置
# Parse nginx combined-format access log with a trailing quoted real-client field.
filter {
grok {
match => {
"message" => "%{HTTPD_COMBINEDLOG} \"%{DATA:realclient}\""
}
remove_field => "message" # structured fields extracted; drop the raw line
}
date {
# FIX: yyyy (calendar year) instead of YYYY (week-based year, wrong around
# New Year), and zero-padded 24h HH:mm:ss to match combined-log timestamps.
match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
remove_field => "timestamp" # @timestamp now carries the event time
}
}
# 监控Tomcat
仅仅列了filter配置块,input与output参考上一个配置
# Parse Tomcat access log in Apache common-log format.
filter {
grok {
match => {
"message" => "%{HTTPD_COMMONLOG}"
}
remove_field => "message" # structured fields extracted; drop the raw line
}
date {
# FIX: yyyy (calendar year) instead of YYYY (week-based year, wrong around
# New Year), and zero-padded 24h HH:mm:ss to match common-log timestamps.
match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
remove_field => "timestamp" # @timestamp now carries the event time
}
}
input {
file {
path => "/var/log/messages" # system log
type => "system"
start_position => "beginning"
}
file {
path => "/var/log/elasticsearch/yun.log" # java exception log
type => "es-error"
start_position => "beginning"
codec => multiline { # join multi-line entries (stack traces) into one event
pattern => "^\["
negate => true # lines NOT starting with '[' ...
what => "previous" # ... are appended to the previous event
}
}
}
# Skeleton: dispatch to different grok patterns based on message content.
# FIX: the original used "--" for an inline comment (Logstash comments must
# start with '#') and bare "xxxx" placeholders, neither of which parses;
# placeholders are now valid commented stubs.
filter{
if "start" in [message]{ # [message] is the raw event text
grok{
# match => { "message" => "<pattern for start events>" }
}
}else if "complete" in [message]{
grok{
# match => { "message" => "<pattern for complete events>" }
}
}else{
grok{
# match => { "message" => "<fallback pattern>" }
}
}
}
====================
配置解析:
Logstash 分为 Input、Output、Filter、Codec 等多种plugins。
Input:数据的输入源也支持多种插件,如elk官网的beats、file、graphite、http、kafka、redis、exec等等等、、、
Output:数据的输出目的也支持多种插件,如本文的elasticsearch,当然这可能也是最常用的一种输出。以及exec、stdout终端、graphite、http、zabbix、nagios、redmine等等、、、
Filter:使用过滤器根据日志事件的特征,对数据事件进行处理过滤后,在输出。支持grok、date、geoip、mutate、ruby、json、kv、csv、checksum、dns、drop、xml等等、、
Codec:编码插件,改变事件数据的表示方式,它可以作为对输入或输出运行该过滤。和其它产品结合,如rubydebug、graphite、fluent、nmap等等。
具体以上插件的细节可以去官网,介绍的挺详细的。下面说下该篇中的配置文件的含义:
input段:
file:使用file 作为输入源
path: 日志的路径,支持/var/log*.log,及[ “/var/log/messages”, “/var/log/*.log” ] 格式
start_position: 从文件的开始读取事件。另外还有end参数
ignore_older: 忽略早于24小时(默认值86400)的日志,设为0,即关闭该功能,以防止文件中的事件由于是早期的被logstash所忽略。
filter段:
grok:数据结构化转换工具
match:匹配条件格式,将nginx日志作为message变量,并应用grok条件NGINXACCESS进行转换
geoip:该过滤器从geoip中匹配ip字段,显示该ip的地理位置
source:ip来源字段,这里我们选择的是日志文件中的最后一个字段,如果你的是默认的nginx日志,选择第一个字段即可(注:这里写的字段是/opt/logstash/patterns/nginx 里面定义转换后的)
target:指定插入的logstash字段目标存储为geoip
database:geoip数据库的存放路径
add_field: 增加的字段,坐标经度
add_field: 增加的字段,坐标纬度
mutate: 数据的修改、删除、类型转换
convert: 将坐标转为float类型
convert: http的响应代码字段转换成 int
convert: http的传输字节转换成int
replace: 替换一个字段
remove_field: 移除message 的内容,因为数据已经过滤了一份,这里不必在用到该字段了。不然会相当于存两份
date: 时间处理,该插件很实用,主要是用你日志文件中事件的时间来对timestamp进行转换,导入老的数据必备!在这里曾让我困惑了很久哦。别再掉坑了
match:匹配到timestamp字段后,修改格式为dd/MMM/yyyy:HH:mm:ss Z
mutate:数据修改
remove_field: 移除timestamp字段。
output段:
elasticsearch:输出到es中
host: es的主机ip+端口或者es 的FQDN+端口
index: 为日志创建索引logstash-nginx-access-*,这里也就是kibana那里添加索引时的名称
GeoIP过滤器的版本4.0.0和更高版本使用MaxMind GeoLite2数据库并支持IPv4和IPv6查找。 4.0.0之前的版本使用传统的MaxMind GeoLite数据库,仅支持IPv4查找。
====================
https://grafana.com/grafana/dashboards/11119
https://www.linuxidc.com/Linux/2017-12/149811.htm
https://github.com/weixinqing/Logstash-example
https://www.jianshu.com/p/7aa55172c3e2
https://www.jianshu.com/p/d889aae7c72e