Flink – standalone 单节点独立集群模式

image2021-8-30_17-9-58.png

  • client 客户端提交任务给 JobManager
  • JobManager 负责申请任务运行锁需要的资源并管理任务和资源
  • JobManager 分发任务给 TaskManager 执行
  • TaskManager 定期向 JobManager 汇报状态

部署

环境

  • linux: centos7 
  • flink:flink-1.13.2-bin-scala_2.11.tgz
  • hadoop:hadoop-2.7.5.tar.gz
  • flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

在这个操作过程中,使用 hadoop 3.3.1 版本没有成功,会报错无法找到 hadoop,即使引入 flink-shaded-hadoop-3-3.1.1.7.2.9.0-173-9.0.jar 还是报错无法找到,所以后期降低了 hadoop 的版本,使用 2.7.5 的时候需要使用 jdk 1.7 ,否则启动的时候会有很多警告。后来经过实验发现 hadoop 3.3.1 也可以成功了,依然使用  flink-shaded-hadoop-2-uber-2.7.5-10.0.jar 。

安装

解压

将 flink-1.13.2-bin-scala_2.11.tgz 解压至 /usr/local/opt/ 目录下

tar -xvf flink-1.13.2-bin-scala_2.11.tgz -C  /usr/local/opt/ 

创建软连接:ln -s flink-1.13.2 flink

配置文件

配置 flink-conf.yaml

完成的配置文件内容可以参考:Apache Flink 1.11 Documentation: Configuration

这里需要主要需要配置的参数有:

    jobmanager.rpc.address: node01
    taskmanager.numberOfTaskSlots: 2
    web.submit.enable: true
    
    ### 配置历史服务
    jobmanager.archive.fs.dir: hdfs://node01:9000/flink/completed-jobs/ 
    historyserver.archive.fs.dir: hdfs://node01:9000/flink/completed-jobs/ 
    historyserver.web.address: node01
    historyserver.web.port: 8082
    historyserver.web.tmpdir: /usr/local/opt/flink/tmp/history
    
复制代码

flink-conf.yaml

配置 master

node01:8081
复制代码

配置 workers

node01
node02
node03
复制代码

添加 jar

将  flink-shaded-hadoop-2-uber-2.7.5-10.0.jar 放到目录  /usr/local/opt/flink/lib 下:这个 jar 的版本要与 hadoop 的版本一致

image2021-9-6_17-14-35.png

启动

  • 启动 flink 的 JobManager 和 taskManager : start-cluster.sh
  • 启动 historyserver : historyserver.sh start

这里遇到了一种情况纠缠了很长时间,就是启动了 historyserver 的时候,缺在页面出现了 404 的情况:

image2021-9-6_16-48-52.png

这些其实是 Apache Flink 1.11 Documentation: History Server 中列举的 api:

image2021-9-6_16-49-44.png

经过找到了历史文件配置的地址之后,发现里面的 jobs 目录需要在执行任务之后才会产生,所以才会刚开始是无法找到的。

021-09-07 15:40:51,009 ERROR org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher [] - Failed to update job overview.\
java.nio.file.NoSuchFileException: /usr/local/opt/flink/tmp/history/jobs/overview.json\
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:92) ~[?:?]\
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111) ~[?:?]\
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116) ~[?:?]\
    at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:219) ~[?:?]\
    at java.nio.file.Files.newByteChannel(Files.java:371) ~[?:?]\
    at java.nio.file.Files.createFile(Files.java:648) ~[?:?]\
    at org.apache.flink.runtime.webmonitor.history.HistoryServer.createOrGetFile(HistoryServer.java:324) ~[flink-dist_2.11-1.13.2.jar:1.13.2]\
    at org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher.updateJobOverview(HistoryServerArchiveFetcher.java:464) ~[flink-dist_2.11-1.13.2.jar:1.13.2]\
    at org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher.access$000(HistoryServerArchiveFetcher.java:74) ~[flink-dist_2.11-1.13.2.jar:1.13.2]\
    at org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher$JobArchiveFetcherTask.<init>(HistoryServerArchiveFetcher.java:199) ~[flink-dist_2.11-1.13.2.jar:1.13.2]\
    at org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher.<init>(HistoryServerArchiveFetcher.java:124) ~[flink-dist_2.11-1.13.2.jar:1.13.2]\
    at org.apache.flink.runtime.webmonitor.history.HistoryServer.<init>(HistoryServer.java:230) ~[flink-dist_2.11-1.13.2.jar:1.13.2]\
    at org.apache.flink.runtime.webmonitor.history.HistoryServer.<init>(HistoryServer.java:146) ~[flink-dist_2.11-1.13.2.jar:1.13.2]\
    at org.apache.flink.runtime.webmonitor.history.HistoryServer$1.call(HistoryServer.java:130) ~[flink-dist_2.11-1.13.2.jar:1.13.2]\
    at org.apache.flink.runtime.webmonitor.history.HistoryServer$1.call(HistoryServer.java:127) ~[flink-dist_2.11-1.13.2.jar:1.13.2]\
    at java.security.AccessController.doPrivileged(Native Method) ~[?:?]\
    at javax.security.auth.Subject.doAs(Subject.java:423) [?:?]\
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) [flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]\
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) [flink-dist_2.11-1.13.2.jar:1.13.2]\
    at org.apache.flink.runtime.webmonitor.history.HistoryServer.main(HistoryServer.java:126) [flink-dist_2.11-1.13.2.jar:1.13.2]
复制代码

红框中的内容都是执行任务之后才产生的。

image2021-9-6_16-53-28.png

测试

使用 flink examples 中的 jar 进行测试

./flink run /usr/local/opt/flink/examples/batch/WordCount.jar

  • 执行后生成 如上图 jobs,libs ,overviews 目录。
  • 8081 web 端口可以查到:complete job 中可以查询到已经完成的任务

image2021-9-6_17-28-20.png

  • 8082 端口 history web 查询到历史任务:8081 中的 complete job 会定期移动到历史服务中\

image2021-9-6_17-29-33.png

  • 在 hdfs 上可以看到执行过的历史记录:

image2021-9-6_18-45-50.png

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享