集中式日志监控系统

为什么需要日志管理系统

日志,对于任何系统都是重要的组成,作为程序猿,定位问题,查看系统的健康运载情况,都需要通过查询日志进行分析。

合理的软件架构往往都不会是单点的,即使在同一台应用服务器上,日志有不同的种类,nginx访问日志,操作系统,应用服务,业务逻辑等等。

没有日志管理系统时,我们如何分析日志:

tail,cat,grep,sed ,awk ,wc… 显然不可能登录到每一台应用服务器上敲命令。

于是建立一套集中式的方法,把不同来源的数据集中整合到一起,方便归纳分析,就成为解决以上痛点的方式方法。

ELK 简介

ELK 是 Elasticsearch、Logstash 和 Kibana 三种软件产品的首字母缩写。这三者都是开源软件,通常配合使用,而且又先后归于 Elastic.co 公司名下,所以被简称为 ELK Stack,目前ELK Stack 已经成为最流行的集中式日志解决方案。

Elasticsearch

Elasticsearch 是一个实时的分布式搜索和分析引擎,它可以用于全文搜索,结构化搜索以及分析。它是一个建立在全文搜索引擎 Apache Lucene 基础上的搜索引擎,使用 Java 语言编写

主要特点

  • 实时分析
  • 分布式实时文件存储,并将每一个字段都编入索引
  • 文档导向,所有的对象全部是文档
  • 高可用性,易扩展,支持集群(Cluster)、分片和复制(Shards 和 Replicas)。
  • 支持 JSON

Logstash

Logstash 是一个具有实时渠道能力的数据收集引擎。使用 JRuby 语言编写

主要特点

  • 几乎可以访问任何数据
  • 可以和多种外部应用结合
  • 支持弹性扩展

它由三个主要部分组成

  • Shipper-发送日志数据
  • Broker-收集数据,缺省内置 Redis
  • Indexer-数据写入

Kibana

Kibana 是一款基于 Apache 开源协议,为 Elasticsearch 提供分析和可视化的 Web 平台。它可以在 Elasticsearch 的索引中查找,交互数据,并生成各种维度的表图。

Elastic.co 在2016-10-27 发布了 Elastic Stack 5.0 以后目前更新的步伐还是很快的,并且Elastic.co 对部分好用插件开始收费。

整体架构图

我在对ELK有了一定了解探索后,决定先使用 ELK 5 之前比较成熟的方案做一次搭建,尝试及进一步熟悉了解集中式日志监控系统,也方便应用到我们的测试及生成环境中。

各模块版本

  • elasticsearch-2.4.4
  • logstash-2.3.4
  • kibana-4.6.3

另外还引入消息队列

  • zookeeper-3.4.9
  • kafka_2.11-0.10.2.0

Logstash 作为日志收集端,比较消耗 CPU 和内存资源,从Elastic.co的官网找到了更好的替代方案:Beats组件

Beats 作为日志shipper

  • Packetbeat(网络数据);
  • Metricbeat(从系统和服务收集指标。从CPU到内存,Redis到Nginx等等,Metricbeat是一种轻量级的方式来发送系统和服务统计信息);
  • Filebeat(日志文件);
  • Winlogbeat(搜集 Windows 事件日志数据)。
  • Heartbeat (使用主动探测监视服务的可用性。给出一个URL列表,Heartbeat询问一个简单的问题:你活着吗?Heartbeat将此信息和响应时间发送到弹性堆栈的其余部分进行进一步分析)

Beats 将搜集到的数据发送到 Logstash,经 Logstash 解析、过滤后,将其发送到 Elasticsearch 存储

相比 Logstash,Beats 所占系统的 CPU 和内存几乎可以忽略不计,这样解决了Logstash 在各服务器节点上占用系统资源高的问题。另外,Beats 和 Logstash 之间支持 SSL/TLS 加密传输,客户端和服务器双向认证,保证了通信安全。

部署图

部署过程

简单把部署过程及配置文件进行记录,方便后续回顾及优化

相关环境:

部署平台/环境:

  • linux centos7.2
  • jdk1.8.0_74

1.部署Elasticsearch集群

我从官网下载了 elasticsearch-2.4.4.tar.gz

解压缩tar.tz

    cd /usr/local/elasticsearch
    tar -zxvf elasticsearch-2.4.4.tar.gz

修改配置文件

    cd /usr/local/elasticsearch/elasticsearch-2.4.4/config
    vim elasticsearch.yml

配置文件关键配置说明:

部分配置已经在部署图中有所说明,这里再补充一些字段说明:

 	#配置es的集群名称,不同的集群用名字来区分,es会自动发现在同一网段下的es,配置成相同集群名字的各个节点形成一个集群。如果在同一网段下有多个集群,就可以用这个属性来区分不同的集群。
    cluster.name: blu-es 

      #节点名称,es启动时会自动创建节点名称,自己配置下更方便维护吧
    node.name: es01
    
     #是否作为主节点
    node.master: true
    
    #是否存储数据 
    node.data: false

    # 默认情况下,ElasticSearch使用0.0.0.0地址,并为http传输开启9200-9300端口,为节点到节点的通信开启9300-9400端口,可以自行设置IP地址
    network.host: 我设置为内网的IP了

    # 输监听定制端口
    http.port: 9200

    # 数据文件存储路径 此路径要创建出来
    path.data: /home/elk/data

    # 日志文件存储路径,此路径要创建出来
    path.logs: /var/log/elasticsearch

ES环境启动

  • elasticsearch默认是不支持用root用户来启动的。

解决方案:

  • 1.新建专门的用户用来管理elasticsearch,线上环境确实也不建议使用root用户

  • 2.启动时,追加 Des.insecure.allow.root=true

      在 `/usr/local/elasticsearch/elasticsearch-2.4.4/bin/elasticsearch` 中增加 ES_JAVA_OPTS="-Des.insecure.allow.root=true"
    

启动ES环境

    cd /usr/local/elasticsearch/elasticsearch-2.4.4/bin
    sh ./elasticsearch -d
  • -d表示后台启动

Elasticsearch插件安装

在熟悉Elasticsearch过程中,了解到有许多不错的插件,这里把简单记录下插件的安装方法

  • 通过plugin 命令进行安装

      #head 方便对es进行各种操作的客户端,可以查看各个索引的数据量以及分片的状态,
      /usr/share/elasticsearch/bin/plugin install mobz/elasticsearch-head
      #kopf es的管理工具,也提供了对ES集群操作的API。
      /usr/share/elasticsearch/bin/plugin install lmenezes/elasticsearch-kopf
      #bigdesk 监控es状态的插
      /usr/share/elasticsearch/bin/plugin install hlstudio/bigdesk
    
  • 还可以到github上查找插件的源码进行手动安装,安装细节不再赘述。

  • 访问插件: /_plugin/插件名称

      如访问head
      http://ip:9200/_plugin/head/
    

Elasticsearch集群配置

主要是针对 elasticsearch.yml的配置,上述步骤已经对该文件部分关键字段说明,我在测试环境使用了3台做集群配置,通过head插件可以查看集群的状态:

ES 集群搭建OK~

2.部署kafka集群

Kafka集群是把状态保存在Zookeeper中的,首先要搭建Zookeeper集群。

  • Zookeeper通过复制来实现高可用性,只要集群中半数以上的节点处于可用状态,它就能够保证服务继续。所以搭建集群的服务器台数应该为(2*n+1)台。

1) Zookeeper的安装配置

上述说明zookeeper集群必须保证3台以上的服务器,我这里搭建3台zookeeper服务器

创建myid文件
  • myid 为服务器编号,用于标识服务器,这个值必须和dataDir目录下myid文件中的值保证一致
服务IP myid
10.11.1.11 11
10.11.1.12 12
10.11.1.13 13
为每台机器创建myid文件
# 10.11.1.11
echo 11 >/home/zookeeper/data/myid
# 10.11.1.12
echo 12 >/home/zookeeper/data/myid
# 10.11.1.13
echo 13 >/home/zookeeper/data/myid
我从官网下载了 zookeeper-3.4.9.tar.gz
解压缩tar.tz
    cd /usr/local/zookeeper
    tar -zxvf zookeeper-3.4.9.tar.gz
修改配置文件
    cd /usr/local/zookeeper/zookeeper-3.4.9/conf
    cp zoo_sample.cfg zoo.cfg
    vim zoo.cfg


    # 这个时间是作为Zk服务器之间或客户端与服务器之间维持心跳的时间间隔,每隔tickTime时间就会发送一个心跳;最小 的session过期时间为2倍tickTime 
    tickTime=2000
    # 此配置表示,允许follower(相对于Leaderer言的“客户端”)连接并同步到Leader的初始化连接时间,以tickTime为单位。当初始化连接时间超过该值,则表示连接失败。
    initLimit=10
    # 此配置项表示Leader与Follower之间发送消息时,请求和应答时间长度。如果follower在设置时间内不能与leader通信,那么此follower将会被丢弃。
    syncLimit=5
    # 数据的存放路径
    dataDir=/home/zookeeper/data
    # the port at which the clients will connect
    clientPort=2181
    # the maximum number of client connections.
    # 最大的并发连接数限制,设置为0或者不设置该参数,表示不进行连接数的限制。
    #maxClientCnxns=60
    
    # 集群模式的配置参数
    # 第一个端口是master和slave之间的通信端口,默认是2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888
    server.11=10.11.1.11:2888:3888
    server.12=10.11.1.12:2888:3888
    server.13=10.11.1.13:2888:3888
    
    #
    # Be sure to read the maintenance section of the 
    # administrator guide before turning on autopurge.
    #
    # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1		

每台的zoo.cfg的配置相同,复制到每一台即可

启动zookeeper环境
在bin目录下执行
nohup ./zkServer.sh start &
查看状态
./zkServer.sh status
Using config: /usr/local/zookeeper/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: follower 
或者 Mode: leader 

2) kafka的安装配置

我从kafka的官网下载了 kafka_2.11-0.10.2.0.tgz
解压缩tar.tz
    cd /usr/local/kafka
    tar -zxvf kafka_2.11-0.10.2.0.tgz
修改配置文件
    vim /usr/local/kafka/kafka_2.11-0.10.2.0/config/server.properties

配置文件主要参数说明

    #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
    broker.id=1  
    
    #当前kafka对外提供服务的端口默认是9092
    port=9092 #不配置的话,默认为9092
    
    #这个是borker进行网络处理的线程数
    num.network.threads=3 
    
    #这个是borker进行I/O处理的线程数
    num.io.threads=8 
    
    #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
    log.dirs=/usr/local/kafka/kafka_2.11-0.10.2.0/logs 
    
    
    #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
    socket.send.buffer.bytes=102400 
    
    
    #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
    socket.receive.buffer.bytes=102400 
    
    #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
    socket.request.max.bytes=104857600 
    
    #默认的分区数,一个topic默认1个分区数
    num.partitions=6
    
    #默认消息的最大持久化时间(小时)
    log.retention.hours=60 
    
    #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
    log.segment.bytes=1073741824
    
    #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
    log.retention.check.interval.ms=300000 
    
    #设置zookeeper的连接端口
    zookeeper.connect=10.11.1.11:2181,10.11.1.12:2181,10.11.1.13:2181
    
    指定客户端连接zookeeper的最大超时时间
    zookeeper.connection.timeout.ms=6000
  • 另外2台的配置只是需要修改broker.id即可,3台服务器保证broker.id不相同。
配置主机名对应IP的解析

3台配置相同

    vim /etc/hosts
     
    10.11.1.11 server1
    10.11.1.12 server2
    10.11.1.13 server3
启动kafka环境
    nohup ./kafka-server-start.sh ../config/server.properties &

kafka集群搭建OK

3.部署logstash服务

在此架构中,logstash担任两种角色,也处于不同的层次

  • 对日志进行格式化等处理,对接转存到kafka集群中。
  • 作为(消费者)从kafka集群中拉取日志消息。同步到ES集群。

部署日志处理层的logstash

这里用到了 GeoLite, 可用于转换IP,变成地理位置信息。

从GeoLite官网下载 GeoLiteCity.dat.gz
从elastic官网下载 logstash-2.3.4.tar.gz
解压logstash-2.3.4.tar.gz
    cd /usr/local/logstash
    tar -zxvf logstash-2.3.4.tar.gz
解压GeoLiteCity.dat.gz
    cd /usr/local/logstash
    tar -zxvf GeoLiteCity.dat.gz
编辑获取日志并输出到kafka的配置文件

vim logstash_in_kafka.conf

    # 用于接收Beats组件传送的日志信息
    input {
        beats {
        port => 5044
        codec => "json"
    }
    }
    
    # 过滤日志内容,这里判断nginx日志时,增加ip转换的内容  
    filter {
        if [type] == "nginxacclog" {
     
        geoip {
            source => "clientip"
            target => "geoip"
            database => "/usr/local/logstash/GeoLiteCity.dat"
            add_field => [ "[geoip][coordinates]","%{[geoip][longitude]}" ]
            add_field => [ "[geoip][coordinates]","%{[geoip][latitude]}" ]
    }
    
        mutate {
            convert => [ "[geoip][coordinates]","float" ]
     
    }
    }
    }

    # 输出到kafka中
    output {
      kafka {
        workers => 2
        bootstrap_servers => "10.11.1.11:9092,10.11.1.12:9092,10.11.1.13:9092"
        topic_id => "peiyinlog"
    }
    }
  • workers:用于写入时的工作线程
  • bootstrap_servers:指定可用的kafka broker实例列表
  • topic_id:指定topic名称,可以在写入前手动在broker创建定义好分片数和副本数,也可以不提前创建,那么在logstash写入时会自动创建
  • topic,分片数和副本数则默认为broker配置文件中设置的。
启动logstash
    nohup ./logstash agent -f logstash_in_kafka.conf &

部署作为consumer的logstash

同样解压logstash-2.3.4.tar.gz,只是这里的配置文件不同

编辑从kafka获取日志内容,传输到ES集群的配置文件

vim kafka_to_es.conf

    # 从kafka获取日志内容
    input{
        kafka {
            zk_connect => "10.11.1.11:2181,10.11.1.12:2181,10.11.1.13:2181"
            group_id => "logstash"
            topic_id => "peiyinlog"
            reset_beginning => false
            consumer_threads => 50
            decorate_events => true
     
    }
     
    }
     
    # 删除一些不需要的字段  
    filter {
      if [type] == "nginxacclog" {
     
         mutate {
         remove_field => ["slbip","kafka","domain","serverip","url","@version","offset","input_type","count","source","fields","beat.hostname","host","tags"]
        }
    }
     
    }
     
    # 输出日志到ES集群
    output {
        if [type] == "nginxacclog" {
           # stdout {codec => rubydebug }
            elasticsearch {
                hosts => ["x.x.x.x:9200","x.x.x.x:9200"]
                index => "logstash-nginxacclog-%{+YYYY.MM.dd}"
                manage_template => true
                flush_size => 50000
                idle_flush_time => 10
                workers => 2
    }
     
    }
        if [type] == "messages" {
            elasticsearch {
                hosts => ["x.x.x.x:9200","x.x.x.x:9200"]
                index => "logstash-messages-%{+YYYY.MM.dd}"
                manage_template => true
                flush_size => 50000
                idle_flush_time => 30
                workers => 1
    }
     
    }
     
    }
启动logstash
    nohup ./logstash agent -f kafka_to_es.conf &

4.部署日志采集程序Filebeat

上述已说明使用beats组件作为日志采集程序,这里只使用了filebeat组件收集我们测试环境上的nginx日志centos操作系统日志,并传输到logstash中。

从elastic官网下载 filebeat-1.2.3-x86_64.tar.gz

解压logstash-2.3.4.tar.gz

    cd /usr/local/filebeat
    tar -zxvf ogstash-2.3.4.tar.gz

配置filebeat.yml 文件

    ################### Filebeat Configuration Example #########################
     
    ############################# Filebeat ######################################
     
    filebeat:
      prospectors:
        -
          paths:
            - /var/log/messages
     
          input_type: log
           
          document_type: messages
     
        -
          paths:
            - /var/log/nginx/access.log
           
          input_type: log
     
          document_type: nginxacclog
         
     
          multiline: 
              pattern: '^[[:space:]]'
              negate: true
              match: after
     
      registry_file: /var/lib/filebeat/registry
     
       
    ############################# Output ##########################################
       
    output:
      logstash: 
        hosts: ["x.x.x.x:5044","x.x.x.x:5044"]
       
     
    ############################# Shipper #########################################
       
    shipper: 
      name: "blu_test"
       
       
    ############################# Logging ######################################### 
       
    logging:  
      files:
        rotateeverybytes: 10485760 # = 10MB

这里把nginx的access日志源改为json格式,方便后续处理

    log_format json '{ "@timestamp":"$time_local",'
             '"clientip":"$remote_addr",'
             '"remote_user": "$remote_user", '
             '"http_x_forwarded_for":"$http_x_forwarded_for",'
             '"serverip":"$server_addr",'
             '"size":$body_bytes_sent,'
             '"request_time":$request_time,'
             '"domain":"$host",'
             '"request": "$request", '
             '"method":"$request_method",'
             '"requesturi":"$request_uri",'
             '"url":"$uri",'
             '"appversion":"$HTTP_APP_VERSION",'
             '"referer":"$http_referer",'
             '"agent":"$http_user_agent",'
             '"status":"$status"}';

重启nginx服务

    nginx -s reload

启动 filebeat

    cd /usr/local/filebeat/filebeat-1.2.3-x86_64
    nohup ./filebeat start &

5.安装配置kibana

安装kibana

kibana最早是为了代替logstash-web 用来查看 ES 中的数据,用PHP编写的web

k2 是k1作者使用ruby重写

K3 是纯前端框架搭建,使用angularjs 编写

K4 是使用node.js编写

目前最新的版本已经是K5

本次部署依然选用了比较成熟的k4,安装也相对比较简单

从elastic官网下载 kibana-4.6.3-linux-x86_64.tar.gz
解压kibana-4.6.3-linux-x86_64.tar.gz
    cd /usr/local/kibana
    tar -zxvf kibana-4.6.3-linux-x86_64.tar.gz
配置kibana.yml 文件
    cd /usr/local/kibana/kibana-4.6.3-linux-x86_64/config
    vim kibana.yml

一堆参数,只需修改这3个,即可启动

    # Kibana is served by a back end server. This controls which port to use.
    server.port: 5601
    
    # The host to bind the server to.
    server.host: "0.0.0.0"
    
    # The Elasticsearch instance to use for all your queries.
    elasticsearch.url: "http://IP:9200"
启动kibana
    nohup ./kibana &

访问kibana,配置日志索引

  • 访问地址: http://IP:5601

默认情况下,Kibana 认为你要访问的是通过 Logstash 导入 Elasticsearch 的数据。这时候你可以用默认的 logstash-* 作为你的 index pattern。通配符(*) 匹配索引名中零到多个字符。如果你的 Elasticsearch 索引有其他命名约定,输入合适的 pattern。pattern 也开始是最简单的单个索引的名字。

如果一个新索引是定期生成,而且索引名中带有时间戳,选择 Use event times to create index names 选项,然后再选择 Index pattern interval。这可以提高搜索性能,Kibana 会至搜索你指定的时间范围内的索引。在你用 Logstash 输出数据给 Elasticsearch 的情况下尤其有效。

点击 Create 添加 index pattern。第一个被添加的 pattern 会自动被设置为默认值。如果你有多个 index pattern 的时候,你可以在 Settings > Indices 里设置具体哪个是默认值。

kibana简单介绍

简单介绍下kibana的三个模块

Discover

可以从 Discover 页面以交互方式探索日志数据,可以过滤搜索结果以及查看文档数据。 还可以查看与搜索查询匹配的文档数,并获取字段值统计信息,可以配置时间字段,则日志随时间的分布将显示在页面顶部的直方图中。

Visualize

Visualize 用来构建显示相关可视化的仪表板.Kibana可视化基于Elasticsearch查询。 通过使用一系列Elasticsearch聚合来提取和处理数据

Dashboard

Kibana仪表板显示已保存的可视化对象的集合。 可以根据需要安排和调整可视化对象,并保存仪表板,以便重新加载和共享。

我在Visualize中制作了一些简单的统计,绘制了一个Dashboard

目前对Kibana及ES尚未深入了解,后续有一定掌握之后再做总结分享。

后续优化:

  • 部署的的相关应用统一管理等
  • ES 调优
  • kafka集群调优
  • kibana扩展
  • kibana接入到nginx中
  • 等等

参考资料: