Flink Cluster Setup
LiuSw


Flink is a framework and distributed processing engine for stateful computation over unbounded and bounded data streams.

Before installing Flink, a Hadoop cluster environment must already be in place.

Cluster Nodes

Server    Address         Role            Notes
hadoop1   192.168.11.81   master, worker  32G 12C 800G
hadoop2   192.168.11.82   master, worker  32G 12C 800G
hadoop3   192.168.11.83   worker          32G 12C 800G

1. Basic Environment Setup

Disable the firewall

systemctl stop firewalld
systemctl disable firewalld

Disable SELinux

sed -i 's/SELINUX=enforcing/SELINUX=disabled/' /etc/selinux/config
setenforce 0

Add hosts entries (on every node)

vi /etc/hosts
192.168.11.81 hadoop1
192.168.11.82 hadoop2
192.168.11.83 hadoop3
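Since the same entries go onto every node, the edit can be scripted; a minimal sketch (the function takes the target file as a parameter so it can be tried on a scratch file first) that appends each entry only if it is not already present:

```shell
# add_cluster_hosts FILE - append the three cluster entries to FILE,
# skipping any line that is already present (safe to re-run).
add_cluster_hosts() {
    for entry in "192.168.11.81 hadoop1" \
                 "192.168.11.82 hadoop2" \
                 "192.168.11.83 hadoop3"; do
        grep -qxF "$entry" "$1" 2>/dev/null || echo "$entry" >> "$1"
    done
}

# e.g.  add_cluster_hosts /etc/hosts   # run as root on each node
```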

2. Install the Flink Cluster

Download the Flink release

https://archive.apache.org/dist/flink/
https://archive.apache.org/dist/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgz

Upload and extract the Flink archive

tar -zxvf flink-1.14.4-bin-scala_2.12.tgz -C /data

Configure environment variables (the quoted 'EOF' keeps $PATH and $FLINK_HOME from being expanded while the file is written, so /etc/profile gets the variables verbatim):

cat >>/etc/profile <<'EOF'
# flink
export FLINK_HOME=/data/flink-1.14.4
export PATH=$PATH:$FLINK_HOME/bin
EOF

source /etc/profile

Configure Flink parameters

cd $FLINK_HOME/conf
vi flink-conf.yaml
# add
jobmanager.rpc.address: hadoop1

Configure the worker nodes

vi workers
# add
hadoop2
hadoop3

Note: the configuration above must be copied to the other nodes as well.
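The copy can be scripted; a sketch (node names from the table above, root login and passwordless ssh assumed) that prints the scp commands to run:

```shell
# conf_sync_cmds - print the scp commands that replicate the conf
# directory to the other nodes; pipe the output to sh to execute them.
conf_sync_cmds() {
    flink_home="${FLINK_HOME:-/data/flink-1.14.4}"
    for node in hadoop2 hadoop3; do
        echo "scp -r $flink_home/conf root@$node:$flink_home/"
    done
}

# conf_sync_cmds | sh   # requires passwordless ssh to each node
```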

Start the cluster

cd $FLINK_HOME/bin
./start-cluster.sh

Stop the cluster

cd $FLINK_HOME/bin
./stop-cluster.sh

3. Web UI

http://192.168.11.81:8081
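Besides the browser, the JobManager's monitoring REST API can confirm the cluster is up; a sketch that extracts the TaskManager count from the /overview endpoint (same host and port as the web UI):

```shell
# tm_count - extract the "taskmanagers" field from /overview JSON on stdin
tm_count() {
    grep -o '"taskmanagers":[0-9]*' | cut -d: -f2
}

# Wait until the expected number of TaskManagers (2 here) has registered:
#   until curl -s http://192.168.11.81:8081/overview | tm_count | grep -qx 2; do
#       sleep 2
#   done
```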

4. Flink Standalone HA Mode

cd $FLINK_HOME/conf
vi masters
# modify
hadoop2:8081
hadoop1:8081
vi workers
# modify
hadoop1
hadoop2
hadoop3
cd $FLINK_HOME/conf
vi flink-conf.yaml
# modify
# basic settings
jobmanager.rpc.address: hadoop1
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 3
parallelism.default: 5

# use ZooKeeper for HA coordination
high-availability: zookeeper
high-availability.storageDir: hdfs:///user/flink/recovery
high-availability.zookeeper.quorum: 192.168.11.160:2181
high-availability.zookeeper.path.root: /flink

# checkpoint state backend and storage directory
state.backend: filesystem
state.checkpoints.dir: hdfs://hadoop1:9000/flink/flink-checkpoints
jobmanager.execution.failover-strategy: region

# REST and network settings
rest.port: 8081
rest.address: hadoop1

# advanced settings: temporary file directory
io.tmp.dirs: /tmp

# HistoryServer settings
jobmanager.archive.fs.dir: hdfs:///user/flink/recovery/completed-jobs/
historyserver.web.address: hadoop2
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs:///user/flink/recovery/completed-jobs/
historyserver.archive.fs.refresh-interval: 1000
cd $FLINK_HOME/conf
vi zoo.cfg
# modify
server.1=192.168.11.160:2888:3888
server.2=192.168.11.161:2888:3888
server.3=192.168.11.162:2888:3888
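If you run the ZooKeeper quorum yourself, each server also needs a myid file in its data directory whose number matches its server.N line above; a small helper (the /var/lib/zookeeper path in the usage line is an assumption, use whatever dataDir your zoo.cfg configures):

```shell
# write_myid ID DATADIR - create DATADIR and write the server id into
# DATADIR/myid (run with ID 1, 2, 3 on the respective ZooKeeper hosts).
write_myid() {
    mkdir -p "$2" && echo "$1" > "$2/myid"
}

# e.g. on 192.168.11.161:  write_myid 2 /var/lib/zookeeper
```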

Download the dependency jar matching your Hadoop and Flink versions, upload it to the $FLINK_HOME/lib directory, and distribute it to the other nodes.

Download link

https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.6.5-10.0/
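The jar inside that directory follows Maven's artifactId-version naming; a sketch that composes the full download URL for a given shaded-Hadoop version (2.6.5-10.0 matches the directory above):

```shell
# uber_jar_url [VERSION] - print the full Maven Central URL of the
# flink-shaded-hadoop-2-uber jar for VERSION (default 2.6.5-10.0).
uber_jar_url() {
    v="${1:-2.6.5-10.0}"
    base="https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber"
    echo "$base/$v/flink-shaded-hadoop-2-uber-$v.jar"
}

# wget -P $FLINK_HOME/lib "$(uber_jar_url)"   # then distribute to all nodes
```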

Start the cluster

cd $FLINK_HOME/bin
./start-cluster.sh

Verify

[root@hadoop1 conf]# jps
7616 NodeManager
7074 DataNode
7331 ResourceManager
708 TaskManagerRunner
10340 TaskManagerRunner
25132 Jps
7955 Master
20599 TaskManagerRunner # Flink process on the master node
6939 NameNode
9243 StandaloneSessionClusterEntrypoint # Flink process on the master node
8123 TaskManagerRunner
9565 TaskManagerRunner
[root@hadoop2 ~]# jps
5378 Worker
7331 Jps
5140 NodeManager
6107 TaskManagerRunner # Flink process on the standby node
5022 DataNode
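With both masters up, HA can be exercised by killing the active JobManager (the StandaloneSessionClusterEntrypoint process) and checking that the standby takes over; a sketch of the PID extraction plus usage:

```shell
# jm_pid - print the JobManager PID from jps output read on stdin
jm_pid() {
    awk '/StandaloneSessionClusterEntrypoint/ {print $1}'
}

# On the active master:
#   jps | jm_pid | xargs kill
# After a few seconds ZooKeeper re-elects a leader, and the standby
# master's web UI (e.g. http://hadoop2:8081) should answer as the new leader.
```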