logo

Flink on YARN单机部署全攻略:从环境配置到作业提交

作者:蛮不讲李2025.09.17 11:04浏览量:0

简介:本文详细阐述Flink on YARN单机部署的全流程,涵盖环境准备、依赖安装、配置优化及作业提交等关键环节,助力开发者快速搭建高效流处理环境。

一、环境准备与依赖安装

1.1 硬件与软件要求

单机部署Flink on YARN需满足基础硬件条件:建议CPU核心数≥4、内存≥16GB、磁盘空间≥100GB(用于存储日志及临时文件)。操作系统推荐CentOS 7/8或Ubuntu 20.04 LTS,需确保系统已安装Java 11(LTS版本)并配置JAVA_HOME环境变量。通过java -version验证安装,输出应包含11.x.x版本号。

1.2 Hadoop与YARN环境配置

YARN作为资源管理器,需单独部署Hadoop 3.x版本。步骤如下:

  1. 下载Hadoop:从Apache官网获取二进制包(如hadoop-3.3.4.tar.gz),解压至/opt/hadoop
  2. 配置环境变量:在~/.bashrc中添加:
    1. export HADOOP_HOME=/opt/hadoop
    2. export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
  3. 修改核心配置
    • etc/hadoop/core-site.xml:设置HDFS默认路径
      1. <property>
      2. <name>fs.defaultFS</name>
      3. <value>hdfs://localhost:9000</value>
      4. </property>
    • etc/hadoop/hdfs-site.xml:配置单节点HDFS
      1. <property>
      2. <name>dfs.replication</name>
      3. <value>1</value>
      4. </property>
    • etc/hadoop/mapred-site.xml:启用YARN作为MapReduce框架
      1. <property>
      2. <name>mapreduce.framework.name</name>
      3. <value>yarn</value>
      4. </property>
    • etc/hadoop/yarn-site.xml:调整内存分配
      1. <property>
      2. <name>yarn.nodemanager.resource.memory-mb</name>
      3. <value>8192</value>
      4. </property>
      5. <property>
      6. <name>yarn.scheduler.maximum-allocation-mb</name>
      7. <value>8192</value>
      8. </property>
  4. 格式化HDFS并启动服务
    1. hdfs namenode -format
    2. start-dfs.sh
    3. start-yarn.sh
    通过jps验证进程(应包含NameNodeDataNodeResourceManagerNodeManager)。

二、Flink安装与YARN集成

从Apache Flink官网下载与Hadoop兼容的版本(如flink-1.17.0-bin-scala_2.12.tgz),解压至/opt/flink。修改conf/flink-conf.yaml关键配置:

  1. # 指定YARN作为任务管理器
  2. jobmanager.rpc.address: localhost
  3. taskmanager.numberOfTaskSlots: 4 # 根据CPU核心数调整
  4. yarn.application.name: Flink-on-YARN-Demo

2.2 YARN会话模式配置

2.2.1 启动YARN会话

执行以下命令启动Flink集群:

  1. /opt/flink/bin/yarn-session.sh \
  2. -n 2 \ # 任务管理器数量
  3. -tm 2048 \ # 每个任务管理器内存(MB)
  4. -jm 1024 \ # JobManager内存(MB)
  5. -Dyarn.application.name=Flink-Session # 自定义应用名

成功启动后,终端会输出YARN应用ID(如application_123456789_0001)及Web UI地址(默认http://localhost:8088)。

2.2.2 客户端连接配置

若需从其他节点提交作业,需在flink-conf.yaml中添加:

  1. rest.address: localhost # 或YARN NodeManager的IP
  2. rest.port: 8081

通过netstat -tulnp | grep 8081验证端口监听状态。

三、作业提交与调试

3.1 示例作业开发

使用Maven创建Flink项目,依赖如下:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.flink</groupId>
  4. <artifactId>flink-streaming-java_2.12</artifactId>
  5. <version>1.17.0</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.flink</groupId>
  9. <artifactId>flink-clients_2.12</artifactId>
  10. <version>1.17.0</version>
  11. </dependency>
  12. </dependencies>

编写简单WordCount作业:

  1. public class YarnWordCount {
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. DataStream<String> text = env.readTextFile("hdfs://localhost:9000/input.txt");
  5. DataStream<Tuple2<String, Integer>> counts = text
  6. .flatMap(new Tokenizer())
  7. .keyBy(value -> value.f0)
  8. .sum(1);
  9. counts.print();
  10. env.execute("Flink YARN WordCount");
  11. }
  12. public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
  13. @Override
  14. public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
  15. for (String word : value.split("\\s+")) {
  16. out.collect(new Tuple2<>(word, 1));
  17. }
  18. }
  19. }
  20. }

3.2 作业提交与监控

3.2.1 提交到YARN会话

  1. /opt/flink/bin/flink run \
  2. -m yarn-cluster \ # 指定YARN模式
  3. -yn 2 \ # 任务管理器数量
  4. -yjm 1024 \ # JobManager内存
  5. -ytm 2048 \ # 任务管理器内存
  6. -c com.example.YarnWordCount \
  7. /path/to/your-job.jar

3.2.2 监控作业状态

  • YARN Web UI:访问http://localhost:8088,查看应用状态、日志及资源使用。
  • Flink Web UI:通过YARN应用日志中的tracking URL访问(如http://localhost:8081)。
  • 命令行工具
    1. yarn application -list # 列出所有应用
    2. yarn application -status <APP_ID> # 查看特定应用状态

四、常见问题与优化

4.1 内存不足错误

现象Container killed by YARN for exceeding memory limits
解决方案

  1. 调整yarn-site.xml中的yarn.nodemanager.resource.memory-mb
  2. 在提交作业时增加-ytm-yjm参数值。
  3. 检查Flink任务是否存在内存泄漏(如未关闭的DataSet)。

4.2 作业提交失败

现象Application application_xxx failed 2 times
排查步骤

  1. 查看YARN日志:yarn logs -applicationId <APP_ID>
  2. 检查HDFS权限:确保Flink用户有读写权限。
  3. 验证网络连通性:使用telnet localhost 8088测试端口可达性。

4.3 性能优化建议

  • 并行度调整:根据数据量设置env.setParallelism(N)
  • 检查点配置:启用检查点保障容错:
    1. env.enableCheckpointing(5000); // 每5秒一次
    2. env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  • 资源动态分配:在flink-conf.yaml中启用:
    1. taskmanager.memory.process.size: 4096m
    2. yarn.containers.vcores: 2 # 为每个TaskManager分配2个虚拟核

五、总结与扩展

Flink on YARN单机部署适合开发测试及轻量级生产场景。通过YARN的动态资源管理,可高效利用单机资源。未来可扩展至:

  1. 高可用配置:启用ZooKeeper实现JobManager HA。
  2. Kerberos集成:在安全环境中部署。
  3. Kubernetes迁移:将资源管理从YARN切换至K8s。

通过本文的详细步骤与优化建议,开发者可快速完成Flink on YARN的单机部署,并构建稳定的流处理管道。

相关文章推荐

发表评论