ELK + Filebeat + Kafka Cluster Deployment
LiuSw


Preface

The business layer can write directly to the Kafka queue, so there is no need to worry about Elasticsearch's write throughput.

Key benefits of a message system

1. Decoupling
Lets you extend or modify the processing on either side independently, as long as both sides keep to the same interface contract.
2. Redundancy
A message queue persists data until it has been fully processed, which avoids the risk of data loss. In the "insert-get-delete" paradigm used by many message queues, your processing system must explicitly acknowledge that a message has been handled before it is removed from the queue, so the data is kept safely until you are done with it.
3. Scalability
Because the message queue decouples your processing stages, it is easy to raise the rate at which messages are enqueued and processed: just add more consumer processes.
4. Flexibility & peak handling
An application must keep working when traffic spikes, even though such bursts are uncommon.
Provisioning resources to stand by for peak load at all times would be an enormous waste. A message queue lets the key components absorb sudden load instead of collapsing under an unexpected burst of requests.
5. Recoverability
When one component of the system fails, the rest of the system is not affected.
The queue lowers the coupling between processes, so even if a consumer process dies, the messages already in the queue can still be processed after the system recovers.
6. Ordering guarantees
In most scenarios the order in which data is processed matters.
Most message queues are ordered by nature and can guarantee that data is processed in a specific order. (Kafka guarantees ordering within a partition.)
7. Buffering
Helps control and smooth the rate at which data flows through the system, resolving mismatches between the speed of producing and consuming messages.
8. Asynchronous communication
Often users do not want or need to process a message immediately.
The message queue provides an asynchronous mechanism: put a message on the queue without processing it right away, enqueue as many messages as you like, and process them later when needed.

Redis vs. Kafka

As we know, Redis spreads queue data by hashing the key, and when Redis is used as a cluster each application maps to a particular Redis instance; to some extent this skews the data and can lead to data loss.
From the Kafka cluster we deployed earlier, a Kafka topic can have multiple partitions that are evenly distributed across the Kafka cluster, so the data skew seen with Redis does not occur. Kafka also provides the redundancy that Redis lacks: if one machine in a Redis cluster goes down, data loss is quite likely, whereas Kafka data is spread evenly across the cluster hosts, so losing one machine does not affect availability. In addition, as a publish/subscribe messaging system, Kafka offers throughput in the millions of messages per second, persistence, and a distributed design.

Architecture diagram


Notes

1. A single Nginx can be used to proxy requests to Kibana;
2. Three es nodes form the es cluster, and Kibana is installed on all three (elasticsearch is abbreviated to es below); there are two logstash instances;
3. The three servers in the middle are my Kafka (ZooKeeper) cluster; the producer/consumer labels in the diagram are Kafka (ZooKeeper) concepts;
4. Filebeat is used to collect logs (Windows, Linux, etc.).

Roles

1.es1+zookeeper+kafka+logstash: 192.168.11.156

2.es2+zookeeper+kafka+logstash: 192.168.11.157

3.es3+zookeeper+kafka: 192.168.11.159

4.kibana: 192.168.11.156

5.filebeat: installed on each client machine

Software versions

1.es : 7.14.0

2.logstash : 7.14.0

3.kibana : 7.14.0

4.filebeat : 7.14.0

5.zookeeper : 3.4.14

6.kafka : 2.6.0 (Scala 2.13)

7.jdk : 1.8

Installation overview

1. Install and configure the ES cluster;
2. Configure the Logstash client (write data directly into the ES cluster; ship the system messages log);
3. Configure the Kafka + ZooKeeper cluster (Logstash writes data into the Kafka messaging system);
4. Deploy Kibana;
5. Install Filebeat;

Detailed installation steps

I. Install the es cluster

1. Upload elasticsearch-7.14.0-linux-x86_64.tar.gz to /data on the server and install the JDK;

2. Extract the package;

cd /data
tar -zxvf elasticsearch-7.14.0-linux-x86_64.tar.gz

3. Adjust the OS settings;

# create the elk user
useradd elk
# the elk user needs vm.max_map_count of at least 262144
sysctl -w vm.max_map_count=262144
# persist it in /etc/sysctl.conf
vm.max_map_count=262144
# edit /etc/security/limits.conf
# append the following values at the end of the file
* soft nofile 65536
* hard nofile 131072
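
These settings can be double-checked after logging in again as the elk user:

sysctl vm.max_map_count        # should report vm.max_map_count = 262144
ulimit -Sn; ulimit -Hn         # should report 65536 and 131072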

4. Edit the es config file;

# edit the relevant options
vi /data/elasticsearch-7.14.0/config/elasticsearch.yml
elasticsearch.yml
 
# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
#       Before you set out to tweak and tune the configuration, make sure you
#       understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please consult the documentation for further information on configuration options:
# https://www.elastic.co/guide/en/elasticsearch/reference/index.html
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
# cluster name
cluster.name: elk-boer
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
# node name; must be unique within the cluster
node.name: elk01
#
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# whether this node can act as master
node.master: true
#
# whether this node can store data
node.data: true
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
path.data: /data/elasticsearch-7.14.0/data
# 
#
# Path to log files:
#
#path.logs: /data/elasticsearch-7.14.0/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# By default Elasticsearch is only accessible on localhost. Set a different
# address here to expose this node on the network:
#
#network.host: 192.168.0.1
# address to listen on; leave as-is unless you have other requirements
network.host: 0.0.0.0
#
# By default Elasticsearch listens for HTTP traffic on the first free port it
# finds starting at 9200. Set a specific HTTP port here:
#
# es HTTP port
http.port: 9200
#
# For more information, consult the network module documentation.
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when this node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
#discovery.seed_hosts: ["host1", "host2"]
# cluster discovery hosts
discovery.seed_hosts: ["elk01", "elk02","elk03"]
#
# Bootstrap the cluster using an initial set of master-eligible nodes:
#
#cluster.initial_master_nodes: ["node-1", "node-2"]
# initial master-eligible nodes
cluster.initial_master_nodes: ["elk01", "elk02","elk03"]
#
# For more information, consult the discovery and cluster formation module documentation.
#
# ---------------------------------- Various -----------------------------------
#
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true
# CORS (cross-origin) settings
http.cors.enabled: true 
http.cors.allow-origin: "*"

5. Start es;

#!/bin/sh
# run as the elk user; -d starts it in the background; set ES_JAVA_OPTS to about half of the system memory
sudo -u elk ES_JAVA_OPTS="-Xms8192m -Xmx8192m" /data/elasticsearch-7.14.0/bin/elasticsearch -d

6. Access IP:9200;

[root@elk01 elasticsearch-7.14.0]# curl http://192.168.11.157:9200
{
  "name" : "elk02",
  "cluster_name" : "elk-boer",
  "cluster_uuid" : "QrvFk9tSQT2qPD7kBHPBLw",
  "version" : {
    "number" : "7.14.0",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "dd5a0a2acaa2045ff9624f3729fc8a6f40835aa1",
    "build_date" : "2021-07-29T20:49:32.864135063Z",
    "build_snapshot" : false,
    "lucene_version" : "8.9.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
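
Once all three nodes have been started this way, it is worth confirming that they actually formed one cluster (node addresses as listed in the Roles section):

curl "http://192.168.11.156:9200/_cluster/health?pretty"   # number_of_nodes should be 3 and status green
curl "http://192.168.11.156:9200/_cat/nodes?v"             # lists the three nodes and marks the elected master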

II. Install ZooKeeper

1. Install JDK 1.8 and extract the ZooKeeper archive;

2. Edit /etc/profile;

vim  /etc/profile

JAVA_HOME=/java/jdk1.8.0_161
PATH=$JAVA_HOME/bin:$PATH
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME
export PATH
export CLASSPATH

3. Run source to make the profile changes take effect;

source  /etc/profile
java -version

4. Edit the config file;

cd /data/zookeeper/conf
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg

5. Fill in the addresses of the three ZooKeeper servers; keep the default ports;

# example config file for zookeeper1
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/data/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=192.168.11.156:2888:3888
server.2=192.168.11.157:2888:3888
server.3=192.168.11.159:2888:3888

6. Create the myid file in the dataDir;

cd  /data/zookeeper/
echo 1 > myid   # on zookeeper1
echo 2 > myid   # on zookeeper2
echo 3 > myid   # on zookeeper3

7. Start ZooKeeper on all nodes;

cd  /data/zookeeper/bin
./zkServer.sh start
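
After starting ZooKeeper on all three servers, each node can report whether the ensemble elected a leader:

./zkServer.sh status   # one node should report Mode: leader, the other two Mode: follower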

8. Register ZooKeeper as a system service
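
The original post leaves this step empty; below is a minimal systemd unit sketch for it (the unit name, paths and JAVA_HOME are assumptions taken from the directories used earlier, adjust them to your layout):

# /etc/systemd/system/zookeeper.service  (hypothetical unit file)
[Unit]
Description=Apache ZooKeeper
After=network.target

[Service]
Type=forking
Environment=JAVA_HOME=/java/jdk1.8.0_161
ExecStart=/data/zookeeper/bin/zkServer.sh start
ExecStop=/data/zookeeper/bin/zkServer.sh stop
Restart=on-failure

[Install]
WantedBy=multi-user.target

Enable and start it with: systemctl daemon-reload && systemctl enable --now zookeeper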

III. Kafka

1. Extract the kafka_2.13-2.6.0.tgz package;

2. Edit the config file;

cd  /data/kafka/config
vim server.properties
broker.id=1 # broker ids on the three servers are 1, 2 and 3

listeners=PLAINTEXT://192.168.11.156:9092 # this server's address (default port)

advertised.listeners=PLAINTEXT://192.168.11.156:9092 # this server's address (default port)

log.dirs=/data/kafka/logs # log file location

zookeeper.connect=192.168.11.156:2181,192.168.11.157:2181,192.168.11.159:2181 # ZooKeeper cluster IPs (default port)

3. Start Kafka;

cd  /data/kafka
./bin/kafka-server-start.sh -daemon ./config/server.properties # -daemon runs it in the background
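
With the brokers up, the topics consumed later by Logstash can be created and exercised by hand (topic names follow the Logstash config below; the partition and replication counts here are only an example):

cd /data/kafka
./bin/kafka-topics.sh --create --bootstrap-server 192.168.11.156:9092 --topic monitoring-log --partitions 3 --replication-factor 2
./bin/kafka-topics.sh --list --bootstrap-server 192.168.11.156:9092
# quick smoke test: type a line in the producer, it should appear in the consumer
./bin/kafka-console-producer.sh --bootstrap-server 192.168.11.156:9092 --topic monitoring-log
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.11.156:9092 --topic monitoring-log --from-beginning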

IV. Install Logstash

1. Upload and extract logstash-7.14.0-linux-x86_64.tar.gz;

2. Edit the config file (create it if it does not exist);

input{
  kafka {
    bootstrap_servers => "192.168.11.156:9092,192.168.11.157:9092,192.168.11.159:9092" # Kafka broker addresses
    topics => "monitoring-log"
    group_id => "elk-boer"
    decorate_events => true # add Kafka metadata to the events
    consumer_threads => 1
    codec => "json" # decode the events as JSON, since the shippers send them JSON-encoded
    type => "monitoring-log"
  }
  kafka {
    bootstrap_servers => "192.168.11.156:9092,192.168.11.157:9092,192.168.11.159:9092" # Kafka broker addresses
    topics => "metricbeat"
    group_id => "elk-boer"
    decorate_events => true # add Kafka metadata to the events
    consumer_threads => 1
    codec => "json" # decode the events as JSON, since the shippers send them JSON-encoded
    type => "metricbeat"
  }
  kafka {
    bootstrap_servers => "192.168.11.156:9092,192.168.11.157:9092,192.168.11.159:9092" # Kafka broker addresses
    topics => "oracle-irm"
    group_id => "elk-boer"
    decorate_events => true # add Kafka metadata to the events
    consumer_threads => 1
    codec => "json" # decode the events as JSON, since the shippers send them JSON-encoded
    type => "oracle-irm"
  }
}

filter{
#  if [type] == "monitoring-log" {
#    grok {
#      match => {
#        "message" => "(?<Date>^%{DAY}\s%{MONTH}\s%{MONTHDAY}\s%{TIME}\s%{YEAR}).\[(?<info>[a-z]*)\].(?<fangfa>[%{WORD}-]*).(?<yunaiz>[%{WORD}-]*).(?<yuanyin>[%{WORD}-]+).(?<mess>.*)"
#      }
#      overwrite => ["message"]
#    }
#  }
}

output{
  if [type] == "monitoring-log" {
    elasticsearch {
      user => "elastic"
      password => "Root@123"
      hosts => ["192.168.11.156:9200",
                "192.168.11.157:9200",
                "192.168.11.159:9200"]
      index => "monitoring-log-%{+YYYY.MM}"
    }
  }
  if [type] == "metricbeat" {
    elasticsearch {
      user => "elastic"
      password => "Root@123"
      hosts => ["192.168.11.156:9200",
                "192.168.11.157:9200",
                "192.168.11.159:9200"]
      index => "metricbeat-%{+YYYY.MM}"
    }
  }
  if [type] == "oracle-irm" {
    elasticsearch {
      user => "elastic"
      password => "Root@123"
      hosts => ["192.168.11.156:9200",
                "192.168.11.157:9200",
                "192.168.11.159:9200"]
      index => "oracle-irm-%{+YYYY.MM}"
    }
  }
#  stdout {
#    codec => rubydebug
#  }
}
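
Before starting Logstash, the pipeline definition can be syntax-checked with the built-in config test (assuming Logstash was extracted to /data/logstash-7.14.0 like the other components):

cd /data/logstash-7.14.0
./bin/logstash -f config/logstash.conf --config.test_and_exit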

3. Start Logstash;

#!/bin/sh
nohup ./bin/logstash -f config/logstash.conf > ./nohup.out 2>&1 &
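
Once Filebeat (section VI) is writing to Kafka and Logstash is consuming, the end-to-end path can be confirmed by checking that the indices appear in es:

curl "http://192.168.11.156:9200/_cat/indices?v"   # monitoring-log-*, metricbeat-* and oracle-irm-* indices should show up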

V. Install Kibana

1. Upload and extract kibana-7.14.0-linux-x86_64.tar.gz to /data;

2. Edit the config file;

# Kibana is served by a back end server. This setting specifies the port to use.
#server.port: 5601

# Specifies the address to which the Kibana server will bind. IP addresses and host names are both valid values.
# The default is 'localhost', which usually means remote machines will not be able to connect.
# To allow connections from remote users, set this parameter to a non-loopback address.
#server.host: "localhost"
server.host: "0.0.0.0"

# Enables you to specify a path to mount Kibana at if you are running behind a proxy.
# Use the `server.rewriteBasePath` setting to tell Kibana if it should remove the basePath
# from requests it receives, and to prevent a deprecation warning at startup.
# This setting cannot end in a slash.
#server.basePath: ""

# Specifies whether Kibana should rewrite requests that are prefixed with
# `server.basePath` or require that they are rewritten by your reverse proxy.
# This setting was effectively always `false` before Kibana 6.3 and will
# default to `true` starting in Kibana 7.0.
#server.rewriteBasePath: false

# Specifies the public URL at which Kibana is available for end users. If
# `server.basePath` is configured this URL should end with the same basePath.
#server.publicBaseUrl: ""

# The maximum payload size in bytes for incoming server requests.
#server.maxPayload: 1048576

# The Kibana server's name. This is used for display purposes.
#server.name: "your-hostname"

# The URLs of the Elasticsearch instances to use for all your queries.

3. Start Kibana;

#!/bin/sh
# run as the elk user; write output to kibana.log
sudo -u elk nohup /data/kibana-7.14.0/bin/kibana -c /data/kibana-7.14.0/config/kibana.yml > /data/kibana-7.14.0/kibana.log 2>&1 &

VI. Install Filebeat

1. Upload and extract filebeat-7.14.0-linux-x86_64.tar.gz to /data;

2. Edit the config file;

cd /data/filebeat-7.14.0
vi filebeat.yml
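
The post does not show the filebeat.yml contents; below is a minimal sketch that ships one log file into the Kafka cluster used above (the input path is a placeholder; the topic must match one consumed by the Logstash pipeline):

filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/messages              # example path; point this at the logs to be collected

output.kafka:
  hosts: ["192.168.11.156:9092", "192.168.11.157:9092", "192.168.11.159:9092"]
  topic: "monitoring-log"              # one of the topics read by Logstash
  required_acks: 1
  compression: gzip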

3. Start Filebeat;

#!/bin/sh
nohup ./filebeat -e -c filebeat.yml > ./nohup.out 2>&1 &

4. To install the Windows version of Filebeat, upload filebeat-7.14.0-windows-x86.zip and unzip it;

5. Edit the config file as above;

6. Register it as a Windows service (adjust the install path as needed)

sc create filebeat-7.14.0 binpath= "D:\filebeat-7.14.0\filebeat.exe -e -c D:\filebeat-7.14.0\filebeat.yml" type= own start= auto displayname= filebeat-7.14.0

7. Start the Windows service;
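
For example:

sc start filebeat-7.14.0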

VII. Access Kibana and view the data

1.http://192.168.11.156:5601/

(Screenshot omitted: Kibana after the data has been ingested.)

Done (to be updated continuously)
