Flink on K8s:云原生架构部署分析
一、前言
容器化部署是如今业界流行的一项技术,基于Docker镜像运行能够让用户更加方便地对应用进行管理和运维。容器管理工具中最为流行的就是Kubernetes(K8s),而Flink也在最近的版本中支持了K8s部署模式。
与Standalone和YARN部署相比,Flink on K8s具有以下优势:
- 资源隔离:利用K8s Namespace和ResourceQuota实现多租户隔离
- 弹性伸缩:结合K8s HPA/VPA实现自动扩缩容
- 云原生生态:无缝对接K8s的监控、日志、服务发现等能力
- 简化运维:通过声明式API管理Flink作业生命周期
二、Flink on K8s 架构概述
2.1 原生K8s部署架构
Flink从1.10版本开始支持原生Kubernetes部署,基本原理与YARN类似:
如上图所示,Flink原生K8s部署的核心组件交互流程:
- Flink Client通过K8s Client向K8s ApiServer提交资源请求
- K8s ApiServer创建ConfigMap存储作业配置
- Flink Master Deployment启动Dispatcher和K8sResMngr(K8s资源管理器)
- JobMaster向K8sResMngr申请资源
- K8sResMngr通过K8s ApiServer创建TaskManager Pod
- TaskManager Pod启动后向JobMaster注册
- Flink Dashboard通过SVC访问Web UI
2.2 Flink与K8s组件映射
| Flink组件 | K8s资源类型 | 说明 |
|---|---|---|
| JobManager | Deployment + Service | 主控节点,通过Service暴露REST端口 |
| TaskManager | Pod | 工作节点,由JobManager动态申请创建 |
| 配置信息 | ConfigMap | 存储flink-conf.yaml等配置 |
| 持久化存储 | PersistentVolumeClaim | Checkpoint/Savepoint持久化 |
| 服务发现 | Service + DNS | 内部组件通信 |
三、三种部署模式详解
Flink on K8s同样支持三种部署模式:Session Mode、Per-Job Mode和Application Mode。
3.1 三种模式对比
| 对比维度 | Session Mode | Per-Job Mode | Application Mode |
|---|---|---|---|
| 集群生命周期 | 长期运行,多作业共享 | 随作业创建和销毁 | 随作业创建和销毁 |
| 资源隔离 | 弱(共享JobManager) | 强(独立JobManager) | 强(独立JobManager) |
| main方法执行位置 | Client | Client | JobManager |
| 资源申请时机 | 集群启动时 | 作业提交时 | 作业提交时 |
| 适用场景 | 短作业、测试环境 | 长作业、生产环境 | 推荐的生产环境方式 |
| K8s支持 | ✅ 完全支持 | ✅ 支持 | ✅ 推荐方式 |
3.2 Session Mode(会话模式)
Session Mode先启动一个长期运行的Flink集群,然后向该集群提交多个作业。
特点:
- 多个作业共享同一个JobManager
- 资源提前分配,作业启动快
- 资源隔离性差,一个作业故障可能影响其他作业
启动命令:
# 启动Flink Session集群./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-flink-session-Dkubernetes.namespace=flink-Dkubernetes.container.image=flink:1.17-scala_2.12提交作业:
./bin/flink run-tkubernetes-session -Dkubernetes.cluster-id=my-flink-session-Dkubernetes.namespace=flink-ccom.example.MyJob ./my-job.jar3.3 Per-Job Mode(单作业模式)
Per-Job Mode为每个作业单独创建一个Flink集群,作业完成后集群销毁。
特点:
- 每个作业独立JobManager,资源隔离好
- 作业提交时申请资源,启动稍慢
- main方法在Client执行,需要下载依赖到本地
启动命令:
./bin/flink run-tkubernetes-per-job -Dkubernetes.cluster-id=my-flink-job-Dkubernetes.namespace=flink-Dkubernetes.container.image=flink:1.17-scala_2.12-ccom.example.MyJob ./my-job.jar3.4 Application Mode(应用模式)⭐推荐
Application Mode是Flink 1.11引入的部署模式,也是K8s环境下最推荐的方式。
特点:
- main方法在JobManager上执行,减轻Client负担
- 每个作业独立JobManager,资源隔离好
- 作业jar包和依赖直接构建到镜像中,无需每次传输
启动命令:
./bin/flink run-application-tkubernetes-application -Dkubernetes.cluster-id=my-flink-app-Dkubernetes.namespace=flink-Dkubernetes.container.image=flink:1.17-scala_2.12 local:///opt/flink/usrlib/my-job.jar关键配置说明:
# K8s集群标识 kubernetes.cluster-id: my-flink-app # 命名空间 kubernetes.namespace: flink # 容器镜像 kubernetes.container.image: flink:1.17-scala_2.12 # 镜像拉取策略(Always/Never/IfNotPresent) kubernetes.container.image.pull-policy: IfNotPresent # JobManager资源 kubernetes.jobmanager.cpu: 1.0 kubernetes.jobmanager.memory: 1600m # TaskManager资源 kubernetes.taskmanager.cpu: 2.0 kubernetes.taskmanager.memory: 1728m # 每个TaskManager的Slot数 taskmanager.numberOfTaskSlots: 2四、Flink Kubernetes Operator
4.1 什么是Flink Kubernetes Operator
Flink Kubernetes Operator是Flink社区官方推出的K8s Operator,通过CRD(Custom Resource Definition)以声明式的方式管理Flink作业生命周期。
核心能力:
- 声明式部署:通过YAML定义Flink作业,无需手动执行命令
- 作业升级:支持Savepoint停启方式升级作业
- 自动故障恢复:监控作业状态,自动从Checkpoint恢复
- 弹性伸缩:支持手动和自动调整并行度
4.2 安装Flink Kubernetes Operator
前提条件:
- Kubernetes 1.9+
- Helm 3+
- 集群已安装cert-manager
安装步骤:
# 1. 添加Helm仓库helm repoaddflink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.7.0/# 2. 安装Operatorhelminstallflink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator# 3. 验证安装kubectl get pods-nflink# 应看到 flink-kubernetes-operator 的Pod运行中4.3 使用FlinkDeployment CRD部署作业
FlinkDeployment是Operator定义的核心CRD,用于描述Flink作业的配置。
Session集群示例:
apiVersion:flink.apache.org/v1beta1kind:FlinkDeploymentmetadata:name:session-clusternamespace:flinkspec:image:flink:1.17-scala_2.12flinkVersion:v1.17mode:nativeingress:template:"/{{namespace}}/{{name}}(/|$)(.*)"className:"nginx"annotations:nginx.ingress.kubernetes.io/rewrite-target:"/$2"jobManager:resource:memory:"2048m"cpu:1taskManager:resource:memory:"2048m"cpu:2replicas:2Application模式作业示例:
apiVersion:flink.apache.org/v1beta1kind:FlinkDeploymentmetadata:name:wordcount-jobnamespace:flinkspec:image:flink:1.17-scala_2.12flinkVersion:v1.17mode:nativejobManager:resource:memory:"2048m"cpu:1taskManager:resource:memory:"2048m"cpu:2replicas:2job:jarURI:local:///opt/flink/examples/streaming/WordCount.jarparallelism:2upgradeMode:savepointstate:running提交作业:
kubectl apply-fflink-deployment.yaml查看作业状态:
kubectl get flinkdeployments-nflink kubectl describe flinkdeployment wordcount-job-nflink五、高可用(HA)配置
5.1 K8s环境下HA架构
如上图所示,Flink on K8s的高可用架构包含以下组件:
- 多个JobManager:通过Leader Election选举主节点
- Zookeeper/K8s ConfigMap:存储Leader信息和元数据
- 分布式存储(S3/HDFS):存储Checkpoint和Savepoint
- TaskManager:连接Leader JobManager
5.2 基于K8s ConfigMap的HA配置
Flink支持使用K8s ConfigMap替代Zookeeper作为HA元数据存储。
# 启用HA high-availability: kubernetes high-availability.namespace: flink # JobManager副本数 kubernetes.jobmanager.replicas: 3 # 选举超时时间 high-availability.zookeeper.client.session-timeout: 60000 # Checkpoint存储路径 state.checkpoints.dir: s3://my-bucket/flink/checkpoints # Savepoint存储路径 state.savepoints.dir: s3://my-bucket/flink/savepoints5.3 基于Zookeeper的HA配置
# 启用Zookeeper HA high-availability: zookeeper high-availability.zookeeper.quorum: zk-0:2181,zk-1:2181,zk-2:2181 high-availability.zookeeper.path.root: /flink # JobManager元数据存储 high-availability.storageDir: s3://my-bucket/flink/ha六、持久化存储配置
6.1 Checkpoint与Savepoint持久化
在K8s环境中,Pod是临时的,因此必须将Checkpoint和Savepoint持久化到外部存储。
如上图所示,JobManager和TaskManager将状态快照存储到NAS(网络附加存储)等持久化存储中。
支持的外部存储:
- HDFS:hadoop分布式文件系统
- S3/MinIO:对象存储
- OSS:阿里云对象存储
- GCS:Google云存储
6.2 S3存储配置示例
# Checkpoint目录 state.checkpoints.dir: s3://my-bucket/flink/checkpoints # Savepoint目录 state.savepoints.dir: s3://my-bucket/flink/savepoints # S3访问配置 s3.access-key: your-access-key s3.secret-key: your-secret-key s3.endpoint: http://minio-service:90006.3 使用PersistentVolumeClaim
对于RocksDB状态后端,可以使用PVC挂载高性能本地磁盘或网络存储。
apiVersion:v1kind:PersistentVolumeClaimmetadata:name:flink-rocksdb-storagenamespace:flinkspec:accessModes:-ReadWriteOnceresources:requests:storage:100GistorageClassName:fast-ssd在FlinkDeployment中引用:
spec:taskManager:volumeClaimTemplates:-metadata:name:rocksdb-storagespec:accessModes:-ReadWriteOnceresources:requests:storage:100GiadditionalVolumes:-name:rocksdb-storagepersistentVolumeClaim:claimName:rocksdb-storageadditionalVolumeMounts:-name:rocksdb-storagemountPath:/data/rocksdb七、生产环境最佳实践
7.1 镜像构建最佳实践
推荐方式:将作业jar包和依赖构建到自定义镜像中。
FROM flink:1.17-scala_2.12 # 复制作业jar包 COPY target/my-flink-job-1.0.jar /opt/flink/usrlib/my-job.jar # 复制依赖配置 COPY conf/flink-conf.yaml /opt/flink/conf/ # 设置用户 USER flink构建并推送:
dockerbuild-tmy-registry/flink-job:1.0.dockerpush my-registry/flink-job:1.07.2 资源配置建议
| 组件 | CPU | 内存 | 说明 |
|---|---|---|---|
| JobManager | 1-2核 | 2-4GB | 控制节点,资源需求较小 |
| TaskManager | 2-8核 | 4-16GB | 工作节点,根据作业需求调整 |
| Slot数 | - | - | 建议与CPU核数1:1或1:2 |
7.3 监控与日志
如上图所示,生产环境Flink on K8s的监控与日志架构:
监控方案:
- Metrics:通过Prometheus + Grafana采集Flink指标
- Logging:使用Vector/Fluentd收集日志到ELK/Loki
- Alerting:配置告警规则,监控作业健康状态
Prometheus集成配置:
spec:jobManager:metrics:enabled:truereporterPrometheus:enabled:trueport:9249taskManager:metrics:enabled:truereporterPrometheus:enabled:trueport:92497.4 安全性配置
RBAC权限控制:
apiVersion:rbac.authorization.k8s.io/v1kind:Rolemetadata:namespace:flinkname:flink-operatorrules:-apiGroups:[""]resources:["pods","services","configmaps"]verbs:["get","list","watch","create","update","delete"]-apiGroups:["apps"]resources:["deployments"]verbs:["get","list","watch","create","update","delete"]网络策略:
apiVersion:networking.k8s.io/v1kind:NetworkPolicymetadata:name:flink-network-policynamespace:flinkspec:podSelector:matchLabels:app:flinkpolicyTypes:-Ingress-Egressingress:-from:-namespaceSelector:matchLabels:name:flinkports:-protocol:TCPport:6123八、常见问题与排查
8.1 常见问题
| 问题 | 原因 | 解决方案 |
|---|---|---|
| Pod启动失败 | 镜像拉取失败 | 检查镜像名称和拉取策略 |
| OOMKilled | 内存不足 | 调整taskmanager.memory配置 |
| Checkpoint失败 | 存储权限不足 | 检查S3/HDFS访问权限 |
| HA切换失败 | ConfigMap/ZK连接问题 | 检查网络连通性和配置 |
| 作业反压 | 下游处理能力不足 | 增加并行度或优化算子 |
8.2 调试命令
# 查看Flink Pod状态kubectl get pods-nflink-lapp=flink# 查看JobManager日志kubectl logs-nflink deployment/my-flink-app-jobmanager# 查看TaskManager日志kubectl logs-nflink pod/my-flink-app-taskmanager-1-2# 进入Pod调试kubectlexec-it-nflink pod/my-flink-app-taskmanager-1-2 -- /bin/bash# 查看FlinkDeployment状态kubectl get flinkdeployments-nflink-oyaml九、总结
本文详细讲解了Flink on Kubernetes的部署方式:
- 原生K8s部署:通过kubernetes-session.sh和flink run命令直接部署,三种模式(Session/Per-Job/Application)灵活选择
- Flink Kubernetes Operator:通过CRD以声明式方式管理Flink作业,推荐的生产环境方案
- 高可用配置:支持基于K8s ConfigMap和Zookeeper的HA方案
- 持久化存储:Checkpoint/Savepoint必须配置外部存储(S3/HDFS等)
- 生产实践:镜像构建、资源配置、监控日志、安全控制等最佳实践
Flink on K8s是云原生时代流处理部署的主流方向,掌握其部署和运维技能对于大数据工程师至关重要。
如果本文对你有帮助,欢迎点赞 👍 + 收藏 ⭐ + 关注 🔖,你的支持是我持续创作的动力!
