Elasticstack 核心

  • Elasticsearch: 准实时索引
  • Logstash: 收集数据,配置使用 Ruby DSL
  • Kibana 展示数据,查询聚合,生成报表
  • Kafka 消息队列,做为日志接入的缓冲区

流程

  • 集中汇总日志(Shipper, Broker, Indexer)
  • 保存并索引日志(Search & Storage)
  • 在需要的时候汇总查询(Web Interface)

可视化分析流程

安装

  • 系统: Ubuntu Server 14.04.4
  • JDK: 1.8.0_111
  • filebeat: 5.1.1
  • logstash: 5.1.1
  • elasticsearch: 5.1.1
  • kibana: 5.1.1
  • kafka: 0.10.1.0

安装 JDK

sudo add-apt-repository ppa:openjdk-r/ppa
sudo apt update
sudo apt install openjdk-8-jre
java -version
# openjdk version "1.8.0_111"

# or
sudo dd-apt-repository -y ppa:webupd8team/java
sudo apt update
sudo apt -y install oracle-java8-installer

使用包管理工具安装Elasticstack

本文将默认使用包管理工具安装

Elasticsearch

# 导入PGP Key
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
# 从APT库安装
sudo apt-get install apt-transport-https
echo "deb https://artifacts.elastic.co/packages/5.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-5.x.list
sudo apt-get update
sudo apt-get install elasticsearch

# 查看系统使用的什么服务管理器
ps -p 1

# 使用SysV init运行
# 设置自启动
sudo update-rc.d elasticsearch defaults 95 10
# 运行与停止
sudo service elasticsearch start
sudo service elasticsearch stop

# 使用systemd运行
# 设置自启动
sudo /bin/systemctl daemon-reload
sudo /bin/systemctl enable elasticsearch.service
# 运行与停止
sudo systemctl start elasticsearch.service
sudo systemctl stop elasticsearch.service

# 配置文件: /etc/elasticsearch/elasticsearch.yml
# log文件: /var/log/elasticsearch/

Kibana

# 前面已加导入GPG Key和加入APT源,所以这里直接安装
sudo apt-get install kibana

# 使用SysV init运行
# 自启动
sudo update-rc.d kibana defaults 96 9
# 运行与停止
sudo service kibana start
sudo service kibana stop

# 使用systemd运行
# 设置自启动
sudo /bin/systemctl daemon-reload
sudo /bin/systemctl enable kibana.service
# 运行与停止
sudo systemctl start kibana.service
sudo systemctl stop kibana.service

# 配置文件: /etc/kibana/kibana.yml
# log文件: /var/log/kibana/

Logstash

sudo apt-get install logstash
# 运行
sudo service logstash start

# 配置文件: /etc/logstash/logstash.yml
# log文件: /var/log/logstash/

源码安装运行Elasticstack(可选)

mkdir elasticstack && cd elasticstack
# 下载 elasticsearch
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.1.tar.gz
# 下载 logstash
wget https://artifacts.elastic.co/downloads/logstash/logstash-5.1.1.tar.gz
# 下载 kibana
wget https://artifacts.elastic.co/downloads/kibana/kibana-5.1.1-linux-x86_64.tar.gz

tar xvf elasticsearch-5.1.1.tar.gz
tar xvf logstash-5.1.1.tar.gz
tar xvf kibana-5.1.1-linux-x86_64.tar.gz

# 运行 elasticsearch
cd elasticsearch-5.1.1/bin; ./elasticsearch

# 运行 kibana
cd kibana-5.1.1-linux-x86_64/bin; ./kibana
# 访问 http://localhost:5601 访问kibana控制台
# ubuntu server中需要修改 kibana-5.1.1-linux-x86_64/config/kibana.yml
# 修改 server.host 后的值为 0.0.0.0
# 才可以用虚拟机的ip在外面访问

# 运行并测试logstash
cd logstash-5.1.1/
bin/logstash -e 'input { stdin { } } output { stdout { } }'
# 随便输入内容,输出为:
junnan.org
2016-12-21T06:04:48.197Z junnan.org

使用Elashticsearch索引数据

Elasticsearch 保存了整个文档(document)并对内容进行索引,便于用户进行搜索、排序和过滤等操作,展示文档使用JSON格式,如:

{
  "_index" : "customer",
  "_type" : "external",
  "_id" : "1",
  "_version" : 1,
  "found" : true,
  "_source" : { "name": "John Doe" }
}

Elasticsearch中存储数据的行为称为索引(indexing),而前面提到的文档,属于一种类型(type),类型会存在索引(index)中,与传统数据库的比较如下:

关系型数据库 Elasticsearch
Databases Indices
Tables Types
Rows Documents
Columns Fields

一个Elasticsearch集群可以包含多个索引(indices),每个索引可以包含多个类型(types),每个类型可以包含多个文档(document),每个文档可以包含多个字段(fields)。

Elasticsearch和Lucene使用倒排索引的数据结构来完成索引(传统数据库使用红黑树或B树来完成)。默认情况下,文档的每个字段都会有对应的倒排索引,Elasticsearch也是用这个来进行检索的。

测试

以建立用户为例,通过Kibana的控制台里的Dev Tools,在console中输入如下数据来为每个用户的文档(document)建立索引,每个文档包含一个员工的姓名和年龄信息,类型为user。

PUT /users/user/1
{
    "name": "user1",
    "age": 29,
    "interests": ["art", "drive"]
}

PUT /users/user/2
{
    "name": "user2",
    "age": 31,
    "interests": ["game", "music"]
}

PUT /users/user/3
{
    "name": "user3",
    "age": 34,
    "interests": ["game", "drive"]
}

查询用户

GET /users/user/_search?q=name:user1

结果如下

{
  "took": 3,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 0.2876821,
    "hits": [
      {
        "_index": "users",
        "_type": "user",
        "_id": "1",
        "_score": 0.2876821,
        "_source": {
          "name": "user1",
          "age": 29,
          "interests": [
            "art",
            "drive"
          ]
        }
      }
    ]
  }
}

Elasticsearch根据相关性来对结果进行排序,结果中的_score为相关性分数。

除了简单的搜索,Elasticsearch还支持用DSL语句组合更复杂的搜索,如过滤、组合、全文、短语等等。还可以利用“聚合”来实现关系型数据库类似“GROUP BY“的操作,也支持推荐、定位、渗透、模糊及部分匹配。

使用Logstash收集日志

Logstash是用JRuby编写,使用基于消息的简单加架构,在JVM上运行,包含三大模块:

  • Collect: 数据输入(input)
  • Enrich: 数据处理(filter)
  • Transport: 数据输出(output)

Logstash利用名为FileWatch的Ruby Gem库来库见监听文件变化,这个库支持展开文件路径,而且会记录一个叫 .sincedb 的数据库文件来跟踪被监听的日志文件的当前读取位置。通过记录下来的 inode, major number, minor number 和 pos 就可以保证不漏过每一条日志。

使用包管理器安装后,配置文件将统一放在 /etc/logstash/conf.d/ 中,logstash列出目录下所有文件时是字母排序的,而配置段的filter和output都是顺序执行,所以推荐采用编号方式来命名配置文件。

收集系统日志的配置文件

# 01_syslog.conf
input {
  file {
    # 确定需要检测的文件
    path => [ "/var/log/*.log", "/var/log/messages", "/var/log/syslog", "/var/log/apt", "/var/log/fsck", "/var/log/faillog"]
    # 日志类型
    type => "syslog"
  }
}

output {
  # 输出到命令行,一般用于调试
  stdout {
    codec => rubydebug
  }
  # 输出到 elasticsearch,这里指定索引名为 system-log
  elasticsearch {
    hosts => "localhost:9200"
    index => "system-log"
  }
}
# 这里不使用service来重启服务,而是采用手动启动的方式来测试配置
cd /usr/share/logstash/
sudo ./bin/logstash -f /etc/logstash/conf.d/01_syslog.conf

重新登录一个新的ssh链接到测试主机,会看到之前测试启动的logstash控制台输出如下内容:

{
          "path" => "/var/log/auth.log",
    "@timestamp" => 2016-12-21T08:01:39.870Z,
      "@version" => "1",
          "host" => "ubuntu",
       "message" => "Dec 21 16:01:39 ubuntu sshd[2044]: Received disconnect from 192.168.1.92: 11: disconnected by user",
          "type" => "syslog",
          "tags" => []
}

信息无误,可以退出测试用的logstash,并重启包管理器安装的logstash, sudo service logstash restart

使用kibana查看索引数据

这时候到kibana的控制台->Management->Index Patterns中创建名为system-log的索引,然后就可以到Discover面板中看到数据了。

kibana面板介绍:

  • Discover: 探索数据
  • Visualize: 可视化统计
  • Dashboard: 仪表盘
  • Timelion: 时序
  • Management: 设置
  • Dev Tools: 开发工具,可以方便的测试内置接口

安装x-pack插件

x-pack插件提供如下功能:

  • 安全:用户权限管理
  • 警报:自动报警
  • 监控:监控 Elasticsearch 集群状态
  • 报告:发送报表、导出数据
  • 图表:可视化数据
# 为elasticsearch安装x-pack
cd /usr/share/elasticsearch/bin
sudo ./elasticsearch-plugin install x-pack
# 重启elasticsearch
sudo service elasticsearch restart

# 为kibana安装x-pack
cd /usr/share/kibana/bin
sudo ./kibana-plugin install x-pack
# 重启kibana
sudo service kibana restart

安装完成后,再次访问kibana(http://主机ip:5601),会出现认证界面,默认用户名和密码为: elastic/changem, 登录成功后可以右下角点击用户名来修改密码。

同时从logstash输出到elasticsearch的配置中需要加入认证信息

elasticsearch {
  hosts => "localhost:9200"
  index => "system-log"
  user => "elastic"
  password => "changeme"
}

Kafka消息队列

Kafka是一个分布式、可分区、可复制的消息系统, 在Kafka集群中,没有『中心主节点』的概念,集群中所有的服务器都是对等的,因此可以在不做任何配置更改的情况下对服务器进行添加和删除,同样的消息生产者和消费都也能做到随意重启和机器的上下线。

Kafka相关概念

kafka核心组件工作流程

  • Consumer:用于从Broker中取出/消费Message
  • Producer:用于往Broker中发送/生产Message
  • Broker:Kafka中使用Broker来接受Producer和Consumer的请求,并把Message持久化到本地磁盘。每个Cluster当中会选举出一个Broker来担任Controller,负责处理Partition的Leader选举,协调Partition迁移等工作

以上组件在分布式环境下均可以是多个,支持故障转移。同时ZooKeeper仅和broker和consumer相关。broker的设计是无状态的,消费的状态信息依靠消费者自己维护,通过一个offset偏移量。client和server之间通信采用TCP协议。

发布消息通常有两种模式:队列模式(queuing)和发布-订阅模式(publish-subscribe)。队列模式中,consumers 可以同时从服务端读取消息,每个消息只被其中一个 consumer 读到;发布-订阅模式中消息被广播到所有的 consumer 中。更常见的是,每个 topic 都有若干数量的 consumer 组,每个组都是一个逻辑上的『订阅者』,为了容错和更好的稳定性,每个组由若干 consumer 组成。这其实就是一个发布-订阅模式,只不过订阅者是个组而不是单个 consumer。

kafka的Topic与Partition工作流程

消息是按照主题来提交到Partition当中的。Partition当中的消息是有序的,consumer从一个有序的分区消息队列中顺序获取消息。相关名次定义如下:

  • Topic:用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上
  • Partition:是Kafka中横向扩展和一切并行化的基础,每个Topic都至少被切分为1个Partition
  • offset:消息在Partition中的编号,编号顺序不跨Partition

  • 分区目的:Kafka中采用分区的设计有几个目的。一是可以处理更多的消息,不受单台服务器的限制。Topic拥有多个分区意味着它可以不受限的处理更多的数据。第二,分区可以作为并行处理的单元
  • offset:由消费者控制offset,因此分区本身所在broker是无状态的。消费者可以自由控制offset,很灵活
  • 同个分区内有序消费:每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。分区中的消息都被分配了一个序列号,称之为偏移量(offset),在每个分区中此偏移量都是唯一的

消息中间件性能对比

阿里中间件团队对 Kafka、RabbitMQ、RocketMQ三款消息中间件吞吐量的对比。 不断增加发送端的压力,直到系统吞吐量不再上升,而响应时间拉长。这时服务端已出现性能瓶颈,可以获得相应的系统最佳吞吐量。

同步发送性能对比

下载与配置

# 下载页面: https://kafka.apache.org/downloads
wget http://apache.mirrors.lucidnetworks.net/kafka/0.10.1.0/kafka_2.11-0.10.1.0.tgz
# 解压
tar xzf kafka_2.11-0.10.1.0.tgz
cd kafka_2.11-0.10.1.0/

zookeeper配置文件为: config/zookeeper.properties kafka配置文件为: config/server.properties

# 将两者的数据存放到统一的位置:
cd
mkdir -p data/kafka/zookeeper
mkdir -p data/kafka/logs

cd kafka_2.11-0.10.1.0
vim config/zookeeper.properties
# zookeeper.properties
dataDir=/home/jonas/data/kafka/zookeeper

vim config/server.properties
# server.properties
log.dirs=/home/jonas/data/kafka/logs
# 修改对外服务地址
advertised.listeners=PLAINTEXT://192.168.1.30:9092
# 允许删除 topic
delete.topic.enable=true
# 设定每个 topic 的分区数量
num.partitions=100
# 设定日志保留的时间
log.retention.hours=72

测试

# 启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动kafka
bin/kafka-server-start.sh config/server.properties
# 创建一个叫users的只有1个分区和一个副本的topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic users
# => Created topic "users".
# 查看已创建的topics
bin/kafka-topics.sh --list --zookeeper localhost:2181
# => users

# 创建一个向topic(users)发送消息的producer
bin/kafka-console-producer.sh --broker-list 192.168.1.30:9092 --topic users
# 新建一个窗口用来创建一个从topic(users)读取消息的consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic users --from-beginning

# 回到producer窗口随便输入内容,就会在consumer窗口中看到

使用nginx转发

将所有8080端口的消息转发到mq_pool的三台机器上(负载均衡)

upstream mq_pool{
  server other_ip1:9092 weight=1 max_fails=3 fail_timeout=30s;
  server other_ip2:9092 weight=1 max_fails=3 fail_timeout=30s;
  server localhost:9092 weight=1 max_fails=3 fail_timeout=30s;
}
server{
  listen 8080;
  allow all;
  proxy_pass mq_pool;
  proxy_connect_timeout 24h;
  proxy_timeout 24h;
}

外部测试

这里使用python写个小程序通过主机的ip连接到9092端口来进行发送message测试,当然在设置好nginx后,也可以通过8080端口来测试。

# 安装kafka-python
sudo pip install kafka-python
# main.py
from kafka import KafkaProducer
# 设置 Kafka 地址
producer = KafkaProducer(
    bootstrap_servers='192.168.1.30:9092')
# 向名为users的topic发送消息
producer.send('users', 'Hello World!')

通过执行python main.py查看consumer窗口中有没有看到Hello World!字样。

将kafka做为logstash和elasticsearch的缓冲区

这里使用kafka做为缓冲区的目的是为了防止logstash把日志直接发送给elasticsearch时,万一elasticsearch停掉而导致的数据丢失。

logstash将日志发送给kafka

这里的logstash相当于producer,确保zookeeper和kafka的服务已经启动,并修改之前的logstash收集系统日志的配置文件:

# /etc/logstash/conf.d/01_syslog.conf
input {
  file {
    # 确定需要检测的文件
    path => [ "/var/log/*.log", "/var/log/messages", "/var/log/syslog", "/var/log/apt", "/var/log/fsck", "/var/log/faillog"]
    # 日志类型
    type => "syslog"
    add_field => { "service" => "system-log"}
    # stat_interval => 1800
  }
}

output {
  # 输出到命令行,一般用于调试
  stdout {
    codec => rubydebug
  }

  # 输出到 elasticsearch,这里指定索引名为 system-log
  # elasticsearch {
  #   hosts => "localhost:9200"
  #   index => "system-log"
  # }

  # 输出到kafka, topic名称为logs
  kafka {
    topic_id => "logs"
    bootstrap_servers => "localhost:9092"
  }
}

新加的内容中 add_field是用来添加一个topic字段,用作以后导入elasticsearch的索引标识,stat_interval指每30分钟进行一次检测,测试的时候不需要。

kafka插件的其他配置项

  • acks 可以选的值为 0, 1, all,这里解释一下,0 表示不需要 server 返回就认为请求已完成;1 表示需要 leader 返回才认为请求完成;all 表示需要所有的服务器返回才认为请求完成
  • batch_size 单位是字节,如果是发送到同一分区,会攒够这个大小才发送一次请求
  • block_on_buffer_full 这个设置在缓冲区慢了之后阻塞还是直接报错
  • buffer_memory 发送给服务器之前的缓冲区大小,单位是字节
  • client_id 可以在这里设定有意义的名字,就不一定要用 ip 和 端口来区分
  • compression_type 压缩方式,默认是 none,其他可选的是 gzip 和 snappy

从kafka中导出数据到elasticsearch

# /etc/logstash/conf.d/02_kafka2es.conf
input {
  kafka {
    bootstrap_servers => "192.168.1.30:9092"
    topics => ["logs"]
  }
}
output {
  # for debugging
  stdout {
     codec => rubydebug
  }

  elasticsearch {
    hosts => "localhost:9200"
    index => "system-log"
    user => "elastic"
    password => "changeme"
  }
}

至此所有配置已准备完成,重新启用各个服务:

# 进入kafka的目录,启动zookeeper和kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

# 重启logstash
sudo service logstash restart

# 重启elasticsearch
sudo service elasticsearch restart
# 重启kibana
sudo service kibana restart

再次新建ssh连接登录和登出主机,查看kibana管理面板生成的数据。

遇到的问题

安装x-pack插件时显示javax.net.ssl.SSLException: java.lang.RuntimeException错误

这时候只需要执行一下 sudo update-ca-certificates -f 来更新证书即可。

其他链接

  • elasticstack - 数据分析与可视化套件
  • kafka - 由LinkedIn公司使用Scala编写的开源消息中间件项目
  • flume - 日志收集,apache基金会项目
  • scribe - 日志收集,facebook项目
  • rabbitMQ - 消息中间件项目
  • rocketMQ - 阿里巴巴参考kafka推出的消息消息中间件项目
  • Lucene - 全文搜索开源库

参考