This is the second article in the ELK log archiving and analysis series, following part 1 on installing and configuring Elasticsearch. It covers the installation and configuration of Logstash; for the base environment setup, please refer to the first article.
1. Introduction to Logstash
The Logstash project was born in 2009. It is a log collection and processing framework, used mainly to collect, filter, and analyze server logs.
2. Installing Logstash
Online installation on Ubuntu (APT)
$ wget -qO - https://packages.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
$ echo "deb https://packages.elastic.co/logstash/2.3/debian stable main" | sudo tee -a /etc/apt/sources.list
$ sudo apt-get update && sudo apt-get install logstash
Online installation on RedHat/CentOS (YUM)
$ rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
$ vim /etc/yum.repos.d/logstash-2.3.repo
[logstash-2.3]
name=Logstash repository for 2.3.x packages
baseurl=https://packages.elastic.co/logstash/2.3/centos
gpgcheck=1
gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1
$ yum install logstash
3. Logstash Indexer Configuration
A central log collection service is built with Logstash: it listens on TCP ports, receives the logs sent by the servers, and buffers them in Redis.
After installation, the default configuration directory is /etc/logstash/conf.d. The servers shipping logs include both Windows and Linux machines; we recommend Filebeat as the shipper on Linux and nxlog on Windows. Log traffic is encrypted with SSL in transit.
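The configurations below assume a key pair under /etc/logstash/pki. If you do not have one yet, a self-signed server certificate can be generated with openssl, for example (the common name elk.wanglijie.cn is this setup's hostname; adjust it to yours):
$ mkdir -p /etc/logstash/pki
$ openssl req -x509 -newkey rsa:2048 -nodes -days 3650 \
    -subj "/CN=elk.wanglijie.cn" \
    -keyout /etc/logstash/pki/elk.wanglijie.cn.key \
    -out /etc/logstash/pki/elk.wanglijie.cn.crt
Note that with force_peer (used below) the shippers must present certificates signed by the CA listed in ssl_certificate_authorities; issuing those follows the same openssl workflow.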
Now create the configuration file that receives logs from Filebeat. This uses the beats input plugin with SSL transport encryption enabled.
$ vim 01-filebeat-input.conf
input {
beats {
#listening port
port => 5044
#enable SSL
ssl => true
ssl_certificate_authorities => ["/etc/logstash/pki/ca.crt"]
ssl_certificate => "/etc/logstash/pki/elk.wanglijie.cn.crt"
ssl_key => "/etc/logstash/pki/elk.wanglijie.cn.key"
ssl_verify_mode => "force_peer"
}
}
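On the Linux shippers, the matching Filebeat configuration (Filebeat 1.x syntax, the generation that pairs with Logstash 2.3) might look like the sketch below; the log path and certificate locations are illustrative assumptions. Because ssl_verify_mode is "force_peer" above, the client must present its own certificate:
$ vim /etc/filebeat/filebeat.yml
filebeat:
  prospectors:
    -
      paths:
        - /var/log/syslog
      # becomes the "type" field used by the filters later
      document_type: syslog
output:
  logstash:
    hosts: ["elk.wanglijie.cn:5044"]
    tls:
      # CA that signed the Logstash server certificate
      certificate_authorities: ["/etc/filebeat/pki/ca.crt"]
      # client certificate/key, required by ssl_verify_mode => "force_peer"
      certificate: "/etc/filebeat/pki/client.crt"
      certificate_key: "/etc/filebeat/pki/client.key"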
Next, configure a Logstash TCP input to receive the logs sent by nxlog.
$ vim 02-nxlog-input.conf
input {
tcp {
port => 5002
codec => "json"
ssl_extra_chain_certs => ["/etc/logstash/pki/ca.crt"]
ssl_cert => "/etc/logstash/pki/elk.wanglijie.cn.crt"
ssl_key => "/etc/logstash/pki/elk.wanglijie.cn.key"
ssl_enable => true
ssl_verify => false
}
}
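On the Windows side, a minimal nxlog sketch that ships the event log to this input over SSL could look like the following (the certificate path is an assumption, and since ssl_verify => false above, no client certificate is needed; the $type field drives the WindowsEventLog filter shown later):
<Extension json>
    Module  xm_json
</Extension>

<Input eventlog>
    Module  im_msvistalog
</Input>

<Output logstash>
    Module  om_ssl
    Host    elk.wanglijie.cn
    Port    5002
    CAFile  C:\nxlog\cert\ca.crt
    # serialize all fields as JSON, matching the codec of the tcp input
    Exec    $type = 'WindowsEventLog'; to_json();
</Output>

<Route eventlog_to_logstash>
    Path    eventlog => logstash
</Route>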
Finally, send the collected log events to Redis for temporary buffering.
$ vim 99-output.conf
output {
redis {
    host => "127.0.0.1"
    data_type => "list"
    key => "logstash"
    password => "GZcY*****Vm"
}
# stdout { codec => rubydebug }
}
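To confirm that events are arriving, check the length of the Redis list while the shippers are sending:
$ redis-cli -a 'GZcY*****Vm' llen logstash
The returned integer is the number of events currently buffered. The Center drains this list, so a value that only grows can also indicate that the Center has stopped consuming.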
4. Logstash Center Configuration
The Logstash indexer above only receives log events and pushes them into Redis. The Logstash Center then pulls them from Redis, runs them through the filters, and stores them in the Elasticsearch cluster.
Installing Logstash on the Center is the same as above. This section focuses on the Center's log processing and parsing configuration.
Pulling logs from Redis
Here spiped is used to map the Internet-facing Redis instance to local port 7480, as shown below.
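A sketch of the client-side spiped invocation (the remote endpoint and key file are assumptions; the matching spiped -d decryption daemon in front of Redis must use the same key):
# -e: accept plaintext locally on 7480, encrypt towards the remote target
$ spiped -e -s '[127.0.0.1]:7480' -t '[redis.wanglijie.cn]:6379' -k /etc/spiped/redis.key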
$ vim 01-input-cloud-redis.conf
input {
redis {
host => "127.0.0.1"
data_type => "list"
key => "logstash"
codec => json
port => 7480
password => "GZcY*****Vm"
}
}
Resolving IP geolocation with GeoIP
$ vim 02-geoip.conf
filter {
geoip {
source => "ip"
target => "geoip"
database => "/usr/share/GeoIP/GeoLiteCity.dat"
add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
}
}
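As in the Apache and Tomcat filters below, the coordinates should also be converted to float so Elasticsearch maps them correctly for Kibana's map visualizations; for example, appended to this same file:
filter {
  mutate {
    convert => [ "[geoip][coordinates]", "float" ]
  }
}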
Parsing Linux syslog
$ vim 10-syslog.conf
filter {
if [type] == "syslog" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
#match => { "message" => "%{SYSLOGLINE}" }
#add_field => [ "received_at", "%{@timestamp}" ]
#add_field => [ "received_from", "%{host}" ]
}
syslog_pri { }
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
if [type] == "syslog_cron" {
grok {
match => { "message" => "%{CRONLOG}" }
}
syslog_pri { }
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
if [type] == "syslog_pamsession" {
grok {
match => { "message" => "%{SYSLOGPAMSESSION}" }
}
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
}
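As an illustration of the grok pattern above, a made-up line such as:
May 12 23:20:01 web01 sshd[2212]: Failed password for invalid user admin from 10.0.0.8 port 4242 ssh2
is split into syslog_timestamp ("May 12 23:20:01"), syslog_hostname ("web01"), syslog_program ("sshd"), syslog_pid ("2212"), and syslog_message (the rest of the line); the date filter then sets @timestamp from the parsed syslog_timestamp.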
Processing Apache log files
$ vim 11-apache-log.conf
filter {
if [type] == "apache_access" {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
geoip {
source => "clientip"
target => "geoip"
database => "/usr/share/GeoIP/GeoLiteCity.dat"
add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
}
mutate {
convert => [ "[geoip][coordinates]", "float"]
}
}
if [type] == "apache_error" {
grok {
patterns_dir => ["/etc/logstash/patterns.d/"]
#match => { "message" => "%{APACHEERRORLOG}" }
match => { "message" => "%{HTTPD_ERRORLOG}"}
overwrite => ["message"]
}
}
}
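Not every Logstash release ships the HTTPD_ERRORLOG pattern, which is why patterns_dir is set above. If yours lacks it, a sketch of a pattern file for Apache 2.2-style error lines, adapted from logstash-patterns-core, could be:
$ vim /etc/logstash/patterns.d/httpd
HTTPDERROR_DATE %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}
HTTPD_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{LOGLEVEL:loglevel}\] (?:\[client %{IPORHOST:clientip}\] )?%{GREEDYDATA:errormsg}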
Processing Tomcat logs
$ vim 12-tomcat-log.conf
filter {
if [type] == "tomcat_catalina" and [message] !~ /(.+)/ {
drop { }
}
if [type] == "tomcat_catalina" and "multiline" in [tags] {
grok {
match => [ "message", "%{JAVASTACKTRACEPART}" ]
}
}
if [type] == "tomcat_access" {
grok {
match => { "message" => "%{COMMONAPACHELOG}" }
}
#Use GeoIP to resolve the client IP's geographical location
geoip {
source => "clientip"
target => "geoip"
database => "/usr/share/GeoIP/GeoLiteCity.dat"
add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
}
mutate {
convert => [ "[geoip][coordinates]", "float"]
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}
}
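The "multiline" tag tested above is attached to events whose continuation lines were merged into one event, e.g. by the multiline filter. A minimal sketch, assuming catalina.out entries start at column one and stack-trace lines are indented (it must live in a filter file loaded before this one):
filter {
  if [type] == "tomcat_catalina" {
    multiline {
      # indented lines belong to the previous event
      pattern => "^\s"
      what => "previous"
    }
  }
}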
Analyzing and processing Windows event logs
$ vim 20-windows-event-log-filter.conf
filter {
if [type] == "WindowsEventLog" {
mutate {
lowercase => [ "EventType", "FileName", "Hostname", "Severity" ]
}
mutate {
rename => [ "Hostname", "source_host" ]
}
mutate {
gsub => ["source_host","\.example\.com",""]
}
date {
match => [ "EventTime", "YYYY-MM-dd HH:mm:ss +0800" ]
timezone => "UTC"
}
mutate {
rename => [ "Severity", "eventlog_severity" ]
rename => [ "SeverityValue", "eventlog_severity_code" ]
rename => [ "Channel", "eventlog_channel" ]
rename => [ "SourceName", "eventlog_program" ]
rename => [ "SourceModuleName", "nxlog_input" ]
rename => [ "Category", "eventlog_category" ]
rename => [ "EventID", "eventlog_id" ]
rename => [ "RecordNumber", "eventlog_record_number" ]
rename => [ "ProcessID", "eventlog_pid" ]
}
if [SubjectUserName] =~ "." {
mutate {
replace => [ "AccountName", "%{SubjectUserName}" ]
}
}
if [TargetUserName] =~ "." {
mutate {
replace => [ "AccountName", "%{TargetUserName}" ]
}
}
if [FileName] =~ "." {
mutate {
replace => [ "eventlog_channel", "%{FileName}" ]
}
}
mutate {
lowercase => [ "AccountName", "eventlog_channel" ]
}
mutate {
remove_field => [ "SourceModuleType", "EventTimeWritten", "EventReceivedTime", "EventType" ]
}
}
}
Processing IIS access logs
This requires configuring nxlog with custom fields when collecting the IIS logs; see the sketch below.
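A hedged nxlog sketch for this uses xm_csv to name the W3C fields so they line up with the renames in the filter that follows. The field list must match the #Fields header of your site's IIS logs, the log path is an assumption, and the xm_json extension and om_ssl output from the earlier nxlog sketch are reused:
<Extension w3c>
    Module     xm_csv
    Fields     $date, $time, $s-ip, $cs-method, $cs-uri-stem, $cs-uri-query, $s-port, $cs-username, $c-ip, $csUser-Agent, $cs-Referer, $sc-status, $sc-substatus, $sc-win32-status, $timetaken
    Delimiter  ' '
</Extension>

<Input iis>
    Module  im_file
    File    "C:\inetpub\logs\LogFiles\W3SVC1\u_ex*.log"
    # drop W3C header lines; SourceName drives the IIS branch of the filter below
    Exec    if $raw_event =~ /^#/ drop();                               \
            else { w3c->parse_csv(); $SourceName = 'IIS'; to_json(); }
</Input>
Route the iis input to the om_ssl output shown earlier.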
$ vim 21-filter-iis.conf
filter {
if [SourceName] == "IIS" {
if [message] =~ "^#" {
drop {}
}
useragent {
add_tag => [ "UA" ]
source => "csUser-Agent"
}
if "UA" in [tags] {
mutate {
rename => [ "name", "browser_name" ]
}
}
mutate {
rename => [ "s-ip","serverip"]
rename => [ "cs-method","method" ]
rename => [ "cs-uri-stem","request"]
rename => [ "cs-uri-query","uri_query" ]
rename => [ "s-port","port"]
rename => [ "cs-username","username" ]
rename => [ "c-ip","clientip"]
rename => [ "cs-Referer","referer"]
rename => [ "sc-status","response" ]
rename => [ "sc-substatus","substatus"]
rename => [ "sc-win32-status","win32-status"]
rename => [ "timetaken","time_request" ]
}
geoip {
source => "clientip"
target => "geoip"
database => "/usr/share/GeoIP/GeoLiteCity.dat"
add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
}
mutate {
remove_field => [
"SourceModuleType",
"cs-Referer",
"cs-uri-query",
"cs-username",
"csUser-Agent",
"EventReceivedTime"
]
}
}
}
Outputting logs to the Elasticsearch cluster
$ vim 30-lumberjack-output.conf
output {
elasticsearch {
hosts => ["10.112.49.38:9200","10.112.49.99:9200","10.112.49.169:9200"]
codec => "json"
#sniffing => true
}
#stdout {
# codec => rubydebug
#}
}
Once the configuration is complete, restart Logstash and check its log files for errors.
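With the 2.3 packages, the whole conf.d directory can be validated before the restart, for example:
$ /opt/logstash/bin/logstash --configtest -f /etc/logstash/conf.d/
Configuration OK
$ sudo service logstash restart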