本文为您介绍如何提交Flink全托管Python作业至集群。
上传资源
Python API作业运行前,需要您按照以下步骤将Python作业文件或Python依赖上传到Flink全托管开发控制台。
- 登录实时计算管理控制台。
- 在Flink全托管页签,单击目标工作空间操作列下的控制台。
- 在左侧导航栏,单击资源上传。
- 单击上传资源,选择您要上传的Python作业文件或Python依赖。
作业提交
- 登录Flink全托管开发控制台,新建作业。
- 登录实时计算管理控制台。
- 在Flink全托管页签,单击目标工作空间操作列下的控制台。
- 在左侧导航栏,单击作业开发。
- 单击新建。
- 在新建文件对话框,填写作业配置信息。
作业参数 说明 文件名称 作业的名称。 说明 作业名称在当前项目中必须保持唯一。文件类型 文件类型需要选择为流作业/PYTHON。 流作业和批作业均支持以下类型:- SQL
- JAR
- PYTHON
说明 实时计算引擎VVR 3.0.1及以上版本支持批作业。部署目标 选择作业需要部署的集群。 说明 Python作业仅支持Per-Job集群。存储位置 指定该作业的代码文件所属的文件夹。 您还可以在现有文件夹右侧,单击
图标,新建子文件夹。
- 单击确认。
- 在作业开发页面,填写基本配置信息。您可以直接填写以下配置信息,也可以单击YAML直接修改配置信息。配置参数解释如下表所示。
参数 说明 部署目标 您可以修改创建作业时已选择的部署目标。 Python文件地址 Python作业文件,可以为.py文件或者.zip文件。 Entry Module 程序的入口类。如果Python作业文件为.py文件,则该项不需要填写;如果Python作业文件为.zip文件,则需要在此处输入您的Entry Module,例如example.word_count。 Entry Point Main Arguments 作业参数。 Python Libraries 第三方Python包。第三方Python包会被添加到Python worker进程的PYTHONPATH中,从而在Python自定义函数中可以直接访问。如何使用第三方Python包,详情请参见使用第三方Python包。 Python Archives 存档文件,目前仅支持ZIP格式的文件,例如 .zip、.jar、.whl和.egg等。 存档文件会被解压到Python worker进程的工作目录下。如果存档文件所在的压缩包名称为mydata.zip,则在Python自定义函数中可以编写以下代码来访问mydata.zip存档文件。def map(): with open("mydata.zip/mydata/data.txt") as f: ...
请参见使用自定义的Python虚拟环境和使用数据文件,了解更多关于Python Archives的信息。
附加依赖文件 上传JAR包、数据文件等,上传的依赖文件默认会被上传到作业运行节点的/flink/usrlib/目录下。 说明 Session集群不支持设置附加依赖文件,仅Per-Job集群支持设置附加依赖文件。 - 在作业开发页面右侧,单击高级配置,根据业务需要填写配置信息。参数解释如下表所示。
类别 配置项 说明 常规配置 引擎版本 从VVR 3.0.3版本(对应Flink 1.12版本)开始,VVP支持同时运行多个不同引擎版本的SQL作业。如果您的作业已使用了Flink 1.12及更早版本的引擎,您需要按照以下情况进行处理: - Flink 1.12版本:停止后启动作业,系统将自动将引擎升级为vvr-3.0.3-flink-1.12版本。
- Flink 1.11或Flink 1.10版本:手动将作业引擎版本升级到vvr-3.0.3-flink-1.12或vvr-4.0.7-flink-1.13版本后重启作业,否则会在启动作业时超时报错。
编辑标签 您可以为作业设置标签。后续可以通过搜索的方式,快速找到目标作业。 Flink配置 Checkpoint间隔 定时执行Checkpoint的时间间隔。如果不填写,将会关闭Checkpoint。 两次Checkpoint之间的最短时间间隔 两次Checkpoint之间的最短时间间隔,如果Checkpoint最大并行度是1,则该配置确保两个Checkpoint之间有一个最短时间间隔。 开启Unaligned Checkpoint 开启Unaligned Checkpoint会大大减少反压情况下Checkpoint的总执行时间。但是也会导致增大单次Checkpoint的大小。 Flink重启策略配置 当有Task失败时,如果没有开启Checkpoint,JobManager进程不会重启。如果开启了Checkpoint,则JobManager进程会重启。该参数取值如下: - Failure Rate:基于失败率重启。
选择基于失败率重启后,您还需要设置检测Failure Rate的时间间隔、时间间隔内的最大失败次数和每次重启时间间隔。
- Fixed Delay:固定间隔重启。
选择基于固定间隔重启后,您还需要设置尝试重启的次数和每次重启时间间隔。
- No Restarts(默认值):不会重启。
更多Flink配置 在此设置其他Flink配置。例如 taskmanager.numberOfTaskSlots: 1
。日志配置 开启日志归档 默认已开启日志归档功能。开启日志归档后,您可以在作业探查页面查看历史作业实例的日志,详情请参见查看历史作业实例日志。 说明- 在VVR 3.x版本,仅VVR 3.0.7及以上版本支持开启日志归档功能。
- 在VVR 4.x版本,仅VVR 4.0.11及以上版本支持开启日志归档功能。
归档日志有效期(天) 日志归档有效期默认为7天。 Root Log Level 日志级别从低到高顺序如下: - TRACE:比DEBUG更细粒度的信息。
- DEBUG:系统运行状态的信息。
- INFO:重要或者您感兴趣的信息。
- WARN:系统可能出现潜在错误的信息。
- ERROR:系统出现错误和异常的信息。
Log Levels 填写日志名称和日志级别。 Logging Profile 日志模板,可以选择系统模板,也可以选择用户配置。 - 在作业开发页面右上角,单击上线。