
- client 客户端提交任务给 JobManager
- JobManager 负责申请任务运行锁需要的资源并管理任务和资源
- JobManager 分发任务给 TaskManager 执行
- TaskManager 定期向 JobManager 汇报状态
部署
环境
- linux: centos7
- flink:flink-1.13.2-bin-scala_2.11.tgz
- hadoop:hadoop-2.7.5.tar.gz
- flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
在这个操作过程中,使用 hadoop 3.3.1 版本没有成功,会报错无法找到 hadoop,即使引入 flink-shaded-hadoop-3-3.1.1.7.2.9.0-173-9.0.jar 还是报错无法找到,所以后期降低了 hadoop 的版本,使用 2.7.5 的时候需要使用 jdk 1.7 ,否则启动的时候会有很多警告。后来经过实验发现 hadoop 3.3.1 也可以成功了,依然使用 flink-shaded-hadoop-2-uber-2.7.5-10.0.jar 。
安装
解压
将 flink-1.13.2-bin-scala_2.11.tgz 解压至 /usr/local/opt/ 目录下
tar -xvf flink-1.13.2-bin-scala_2.11.tgz -C /usr/local/opt/
创建软连接:ln -s flink-1.13.2 flink
配置文件
配置 flink-conf.yaml
完成的配置文件内容可以参考:Apache Flink 1.11 Documentation: Configuration
这里需要主要需要配置的参数有:
jobmanager.rpc.address: node01
taskmanager.numberOfTaskSlots: 2
web.submit.enable: true
### 配置历史服务
jobmanager.archive.fs.dir: hdfs://node01:9000/flink/completed-jobs/
historyserver.archive.fs.dir: hdfs://node01:9000/flink/completed-jobs/
historyserver.web.address: node01
historyserver.web.port: 8082
historyserver.web.tmpdir: /usr/local/opt/flink/tmp/history
复制代码
flink-conf.yaml
配置 master
node01:8081
复制代码
配置 workers
node01
node02
node03
复制代码
添加 jar
将 flink-shaded-hadoop-2-uber-2.7.5-10.0.jar 放到目录 /usr/local/opt/flink/lib 下:这个 jar 的版本要与 hadoop 的版本一致

启动
- 启动 flink 的 JobManager 和 taskManager : start-cluster.sh
- 启动 historyserver : historyserver.sh start
这里遇到了一种情况纠缠了很长时间,就是启动了 historyserver 的时候,缺在页面出现了 404 的情况:

这些其实是 Apache Flink 1.11 Documentation: History Server 中列举的 api:

经过找到了历史文件配置的地址之后,发现里面的 jobs 目录需要在执行任务之后才会产生,所以才会刚开始是无法找到的。
021-09-07 15:40:51,009 ERROR org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher [] - Failed to update job overview.\
java.nio.file.NoSuchFileException: /usr/local/opt/flink/tmp/history/jobs/overview.json\
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:92) ~[?:?]\
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111) ~[?:?]\
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116) ~[?:?]\
at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:219) ~[?:?]\
at java.nio.file.Files.newByteChannel(Files.java:371) ~[?:?]\
at java.nio.file.Files.createFile(Files.java:648) ~[?:?]\
at org.apache.flink.runtime.webmonitor.history.HistoryServer.createOrGetFile(HistoryServer.java:324) ~[flink-dist_2.11-1.13.2.jar:1.13.2]\
at org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher.updateJobOverview(HistoryServerArchiveFetcher.java:464) ~[flink-dist_2.11-1.13.2.jar:1.13.2]\
at org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher.access$000(HistoryServerArchiveFetcher.java:74) ~[flink-dist_2.11-1.13.2.jar:1.13.2]\
at org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher$JobArchiveFetcherTask.<init>(HistoryServerArchiveFetcher.java:199) ~[flink-dist_2.11-1.13.2.jar:1.13.2]\
at org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher.<init>(HistoryServerArchiveFetcher.java:124) ~[flink-dist_2.11-1.13.2.jar:1.13.2]\
at org.apache.flink.runtime.webmonitor.history.HistoryServer.<init>(HistoryServer.java:230) ~[flink-dist_2.11-1.13.2.jar:1.13.2]\
at org.apache.flink.runtime.webmonitor.history.HistoryServer.<init>(HistoryServer.java:146) ~[flink-dist_2.11-1.13.2.jar:1.13.2]\
at org.apache.flink.runtime.webmonitor.history.HistoryServer$1.call(HistoryServer.java:130) ~[flink-dist_2.11-1.13.2.jar:1.13.2]\
at org.apache.flink.runtime.webmonitor.history.HistoryServer$1.call(HistoryServer.java:127) ~[flink-dist_2.11-1.13.2.jar:1.13.2]\
at java.security.AccessController.doPrivileged(Native Method) ~[?:?]\
at javax.security.auth.Subject.doAs(Subject.java:423) [?:?]\
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) [flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]\
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) [flink-dist_2.11-1.13.2.jar:1.13.2]\
at org.apache.flink.runtime.webmonitor.history.HistoryServer.main(HistoryServer.java:126) [flink-dist_2.11-1.13.2.jar:1.13.2]
复制代码
红框中的内容都是执行任务之后才产生的。

测试
使用 flink examples 中的 jar 进行测试
./flink run /usr/local/opt/flink/examples/batch/WordCount.jar
- 执行后生成 如上图 jobs,libs ,overviews 目录。
- 8081 web 端口可以查到:complete job 中可以查询到已经完成的任务

- 8082 端口 history web 查询到历史任务:8081 中的 complete job 会定期移动到历史服务中\

- 在 hdfs 上可以看到执行过的历史记录:








![[DDD]读书笔记《领域驱动设计:软件核心复杂性应对之道》(2)模型驱动设计的构造块①-一一网](https://www.proyy.com/skycj/data/images/2021-09-13/d67c2b242c4ec33e43bbf4815208acfe.jpg)













![[桜井宁宁]COS和泉纱雾超可爱写真福利集-一一网](https://www.proyy.com/skycj/data/images/2020-12-13/4d3cf227a85d7e79f5d6b4efb6bde3e8.jpg)

![[桜井宁宁] 爆乳奶牛少女cos写真-一一网](https://www.proyy.com/skycj/data/images/2020-12-13/d40483e126fcf567894e89c65eaca655.jpg)