MaxCompute数据源作为数据中枢,为您提供读取和写入MaxCompute双向通道的功能。
使用限制
离线读
- MaxCompute Reader支持读取分区表、非分区表,不支持读取虚拟视图、不支持同步外部表。
- 离线读MaxCompute分区表时,不支持直接对分区字段进行字段映射配置,需要在配置数据来源时指定待同步数据的分区信息。
例如,分区表t0其字段包含id、name两个字段,一级分区为pt,二级分区为ds。读取t0的pt=1,ds=hangzhou分区数据时,您需要在配置数据来源时指定分区值为pt=1,ds=hangzhou,后续字段映射配置时进行id、name字段的映射配置。
- MaxCompute Reader不支持数据过滤功能。
如果您在数据同步过程中,需要过滤符合条件的数据,请创建新表并写入过滤数据后,同步新表中的数据。
离线写
当数据有null值时,MaxCompute Writer不支持VARCHAR类型。
实时写
- 实时数据同步任务仅支持使用独享数据集成资源组。
- 实时同步节点目前仅支持同步PolarDB、Oracle、MySQL数据源至MaxCompute。
- 实时数据同步任务暂不支持同步没有主键的表。
- 当实时同步至MaxCompute默认数据源(一般为
odps_first
)时,默认使用临时AK进行同步,临时AK超过7天会自动过期,同时,将导致任务运行失败。平台检测到临时AK导致任务失败时会自动重启任务,如果任务配置了该类型的监控报警,您将会收到报警信息。 - 一键实时同步至MaxCompute任务配置当天仅能查询历史全量数据,增量数据需要等待第二天merge完成后才可在MaxCompute查询。
- 一键实时同步至MaxCompute任务每天会生成一个全量分区,为避免数据过多占用存储资源,本方案任务自动建立的MaxCompute表,默认生命周期为30天。如果时长不满足您的业务需求,可以在配置同步任务时单击对应的MaxCompute表名修改生命周期。
- 数据集成使用MaxCompute引擎同步数据通道进行数据上传和下载(同步数据通道SLA详情请参见数据传输服务(上传)场景与工具),请根据MaxCompute引擎同步数据通道SLA评估数据同步业务技术选型。
- 一键实时同步至MaxCompute,按实例模式同步时,独享数据集成资源组规格最低需要为8C16G。
- 仅支持与当前工作空间同地域的自建MaxCompute数据源,跨地域的MaxCompute项目在测试数据源服务连通性时可以正常连通,但同步任务执行时,在MaxCompute建表阶段会报引擎不存在的错误。说明 使用自建MaxCompute数据源时,DataWorks项目仍然需要绑定MaxCompute引擎,否则将无法创建MaxCompute SQL节点,导致全量同步标done节点创建失败。
注意事项
如果目标表列没有配置和源端的列映射,则同步任务执行完成后,此列值为空,即使此目标列建表时指定了默认值,列值仍为空。
支持的字段类型
支持MaxCompute的1.0数据类型、2.0数据类型、Hive兼容数据类型。不同数据类型版本支持的字段类型详情如下。
1.0数据类型支持的字段
字段类型 | 离线读 | 离线写 | 实时写 |
---|---|---|---|
BIGINT | 支持 | 支持 | 支持 |
DOUBLE | 支持 | 支持 | 支持 |
DECIMAL | 支持 | 支持 | 支持 |
STRING | 支持 | 支持 | 支持 |
DATETIME | 支持 | 支持 | 支持 |
BOOLEAN | 支持 | 支持 | 支持 |
ARRAY | 支持 | 支持 | 支持 |
MAP | 支持 | 支持 | 支持 |
STRUCT | 支持 | 支持 | 支持 |
2.0数据类型、Hive兼容数据类型支持的字段
字段类型 | 离线读(MaxCompute Reader) | 离线写(MaxCompute Writer) | 实时写 |
---|---|---|---|
TINYINT | 支持 | 支持 | 支持 |
SMALLINT | 支持 | 支持 | 支持 |
INT | 支持 | 支持 | 支持 |
BIGINT | 支持 | 支持 | 支持 |
BINARY | 支持 | 支持 | 支持 |
FLOAT | 支持 | 支持 | 支持 |
DOUBLE | 支持 | 支持 | 支持 |
DECIMAL(pecision,scale) | 支持 | 支持 | 支持 |
VARCHAR(n) | 支持 | 支持 | 支持 |
CHAR(n) | 不支持 | 支持 | 支持 |
STRING | 支持 | 支持 | 支持 |
DATE | 支持 | 支持 | 支持 |
DATETIME | 支持 | 支持 | 支持 |
TIMESTAMP | 支持 | 支持 | 支持 |
BOOLEAN | 支持 | 支持 | 支持 |
ARRAY | 支持 | 支持 | 支持 |
MAP | 支持 | 支持 | 支持 |
STRUCT | 支持 | 支持 | 支持 |
数据类型转换说明
类型分类 | 数据集成配置类型 | 数据库数据类型 |
---|---|---|
整数类 | LONG | BIGINT、INT、TINYINT和SMALLINT |
布尔类 | BOOLEAN | BOOLEAN |
日期时间类 | DATE | DATETIME、TIMESTAMP和DATE |
浮点类 | DOUBLE | FLOAT、DOUBLE和DECIMAL |
二进制类 | BYTES | BINARY |
复杂类 | STRING | ARRAY、MAP和STRUCT |
数据同步前准备:MaxCompute环境准备
读取或写入MaxCompute表数据时,您可以根据需要选择是否开启相关属性。
连接MaxCompute并开启项目级配置
- 登录MaxCompute客户端,详情请参见使用客户端(odpscmd)连接。
- 开启MaxCompute项目级相关配置:请确认是否已拥有对应的操作权限,您可使用Project Owner账号执行相关操作,关于MaxCompute权限说明,详情请参见角色规划。
开启acid属性
setproject odps.sql.acid.table.enable=true;
(可选)开启2.0数据类型
setproject odps.sql.type.system.odps2=true;
(可选)创建账号
- 创建个人Accesskey ID和Accesskey Secret,操作详情请参见准备阿里云账号。
- 创建MaxCompute数据源,详情请参见配置MaxCompute数据源。
数据同步任务开发:MaxCompute同步流程引导
MaxCompute数据同步任务的配置入口和通用配置流程指导可参见下文的配置指导,详细的配置参数解释可在配置界面查看对应参数的文案提示。
绑定MaxCompute引擎与创建数据源
- 当您为DataWorks工作空间绑定MaxCompute计算引擎时,DataWorks将自动基于您MaxCompute计算引擎绑定信息,在数据源配置界面新增该计算引擎数据源。此外,您也可以将其他MaxCompute项目添加为当前工作空间数据源,以便同步读取数据时使用,操作流程请参见创建与管理数据源。
- DataWorks工作空间绑定第一个MaxCompute引擎时,DataWorks在数据源管理页面自动生成的默认的数据源名为odps_first。若再次为工作空间额外再绑定一个MaxCompute引擎实例时,DataWorks仍会为您新建MaxCompute计算引擎数据源,此时,为区别第一个引擎数据源,后添加的引擎其数据源命名格式为0_regionId_引擎名称。
- 默认添加的计算引擎数据源,其对应的MaxCompute项目名称,均为当前工作空间绑定的MaxCompute计算引擎项目名称。您可以前往工作空间管理页面,查看计算引擎绑定信息。若要修改工作空间计算引擎绑定信息,请确认无运行中的任务(包括数据集成或数据开发等一切和DataWorks相关的任务),查看MaxCompute计算引擎信息,详情请参见绑定MaxCompute引擎。
单表离线同步任务配置指导
- 操作流程请参见通过向导模式配置离线同步任务、通过脚本模式配置离线同步任务。
- 脚本模式配置的全量参数和脚本Demo请参见下文的附录:脚本Demo与参数说明。
单表实时同步任务配置指导
操作流程请参见配置单表增量数据实时同步、DataStudio侧实时同步任务配置。
整库离线、整库(实时)全增量、整库(实时)分库分表等整库级别同步配置指导
操作流程请参见数据集成侧同步任务配置。
常见问题
- 读取MaxCompute(ODPS)表数据时,添加一行注意事项
- 读取MaxCompute(ODPS)表数据时,如何同步分区字段?
- 读取MaxCompuet(ODPS)表数据时,如何同步多个分区数据?
- MaxCompute如何实现列筛选、重排序和补空等
- MaxCompute列配置错误的处理
- MaxCompute分区配置注意事项
- MaxCompute任务重跑和failover
附录:脚本Demo与参数说明
附录:离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要在任务脚本中按照脚本的统一格式要求编写脚本中的reader参数和writer参数,脚本模式的统一要求请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下的数据源的Reader参数和Writer参数的指导详情。
MaxCompute Reader脚本Demo
{
"type":"job",
"version":"2.0",
"steps":[
{
"stepType":"odps",//插件名。
"parameter":{
"partition":[],//读取数据所在的分区。
"isCompress":false,//是否压缩。
"datasource":"",//数据源。
"column":[//源头表的列信息。
"id"
],
"emptyAsNull":true,
"table":""//表名。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
如果您需要指定MaxCompute的Tunnel Endpoint,可以通过脚本模式手动配置数据源。将上述示例中的"datasource":"",
替换为数据源的具体参数,示例如下。 "accessId":"*******************",
"accessKey":"*******************",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com",
"project":"*****",
MaxCompute Reader脚本参数
参数 | 描述 | 是否必选 | 默认值 |
---|---|---|---|
datasource | 数据源名称。脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。 | 是 | 无 |
table | 读取数据表的表名称(大小写不敏感)。 | 是 | 无 |
partition | 读取的数据所在的分区信息。
例如,分区表test包含pt=1,ds=hangzhou、pt=1,ds=shanghai、pt=2,ds=hangzhou、pt=2,ds=beijing四个分区,则读取不同分区数据的配置如下:
此外,您还可以根据实际需求设置分区数据的获取条件:
说明 /*query*/ 表示将其后填写的内容识别为一个where条件。 | 如果表为分区表,则必填。如果表为非分区表,则不能填写。 | 无 |
column | 读取MaxCompute源头表的列信息。例如表test的字段为id、name和age:
| 是 | 无 |
MaxCompute Writer脚本Demo
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"odps",//插件名。
"parameter":{
"partition":"",//分区信息。
"truncate":true,//清理规则。
"compress":false,//是否压缩。
"datasource":"odps_first",//数据源名。
"column": [//源端列名。
"id",
"name",
"age",
"sex",
"salary",
"interest"
],
"emptyAsNull":false,//空字符串是否作为null。
"table":""//表名。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数,表示脏数据的最大容忍条数。
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
如果您需要指定MaxCompute的Tunnel Endpoint,可以通过脚本模式手动配置数据源:将上述示例中的"datasource":"",
替换为数据源的具体参数,示例如下。"accessId":"<yourAccessKeyId>",
"accessKey":"<yourAccessKeySecret>",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com",
"project":"**********",
MaxCompute Writer脚本参数
参数 | 描述 | 是否必选 | 默认值 |
---|---|---|---|
datasource | 数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。 | 是 | 无 |
table | 写入的数据表的表名称(大小写不敏感),不支持填写多张表。 | 是 | 无 |
partition | 需要写入数据表的分区信息,必须指定到最后一级分区。例如把数据写入一个三级分区表,必须配置到最后一级分区,例如pt=20150101, type=1, biz=2 :
| 如果表为分区表,则必填。如果表为非分区表,则不能填写。 | 无 |
column | 需要导入的字段列表。当导入全部字段时,可以配置为"column": ["*"] 。当需要插入部分MaxCompute列,则填写部分列,例如"column": ["id","name"] :
| 是 | 无 |
truncate | 通过配置"truncate": "true" 保证写入的幂等性。即当出现写入失败再次运行时,MaxCompute Writer将清理前述数据,并导入新数据,可以保证每次重跑之后的数据都保持一致 。 因为利用MaxCompute SQL进行数据清理工作,SQL无法保证原子性,所以truncate选项不是原子操作。当多个任务同时向一个Table或Partition清理分区时,可能出现并发时序问题,请务必注意。 针对该类问题,建议您尽量不要多个作业DDL同时操作同一个分区,或者在多个并发作业启动前,提前创建分区。 | 是 | 无 |