AnalyticDB for MySQL 2.0数据源为您提供读取和写入AnalyticDB for MySQL 2.0的双向通道,本文为您介绍DataWorks的AnalyticDB for MySQL 2.0数据同步的能力支持情况。

使用限制

  • 离线同步支持读取视图(VIEW)表。
  • AnalyticDB for MySQL 2.0 Reader不支持multivalue,否则同步任务会直接异常退出。

支持的字段类型

离线读

AnalyticDB for MySQL 2.0 Reader针对AnalyticDB for MySQL 2.0类型的转换列表,如下所示。
AnalyticDB for MySQL 2.0类型DataX类型MaxCompute类型
BIGINTLONGBIGINT
TINYINTLONGINT
TIMESTAMPDATEDATETIME
VARCHARSTRINGSTRING
SMALLINTLONGINT
INTLONGINT
FLOATSTRINGDOUBLE
DOUBLESTRINGDOUBLE
DATEDATEDATETIME
TIMEDATEDATETIME

离线写

AnalyticDB for MySQL 2.0 Writer针对AnalyticDB for MySQL 2.0类型的转换列表,如下所示。
类型AnalyticDB for MySQL 2.0数据类型
整数类INT、TINYINT、SMALLINT、BIGINT
浮点类FLOAT和DOUBLE
字符串类VARCHAR
日期时间类DATE和TIMESTAMP
布尔类BOOLEAN

数据同步任务开发

AnalyticDB for MySQL 2.0数据同步任务的配置入口和通用配置流程指导可参见下文的配置指导,详细的配置参数解释可在配置界面查看对应参数的文案提示。

创建数据源

在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建与管理数据源

单表离线同步任务配置指导

整库离线读同步配置指导

操作流程请参见数据集成侧同步任务配置

附录:脚本Demo与参数说明

附录:离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要在任务脚本中按照脚本的统一格式要求编写脚本中的reader参数和writer参数,脚本模式的统一要求请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下的数据源的Reader参数和Writer参数的指导详情。

Reader脚本Demo

{
    "type": "job",
    "steps": [
        {
            "stepType": "ads",
            "parameter": {
                "datasource": "ads_demo",
                "table": "th_test",
                "column": [
                    "id",
                    "testtinyint",
                    "testbigint",
                    "testdate",
                    "testtime",
                    "testtimestamp",
                    "testvarchar",
                    "testdouble",
                    "testfloat"
                ],
                "odps": {
                    "accessId": "<yourAccessKeyId>",
                    "accessKey": "<yourAccessKeySecret>",
                    "account": "*********@aliyun.com",
                    "odpsServer": " http://service.cn.maxcompute.aliyun-inc.com/api",
                    "tunnelServer": "http://dt.cn-shanghai.maxcompute.aliyun-inc.com",
                    "accountType": "aliyun",
                    "project": "odps_test"
                },
                "mode": "ODPS"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {},
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": ""
        },
        "speed": {
            "concurrent": 2,
            "throttle": true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
                  "mbps":"12"//限流
        }
    }
}

Reader脚本参数

参数描述是否必选默认值
table需要导出的表的名称。
column列名,如果没有,则为全部。*
limit限制导出的记录数。
wherewhere条件,方便添加筛选条件,此处的String会被直接作为SQL条件添加到查询语句中,例如where id < 100
mode目前支持Select和ODPS2种导入类型。
  • Select:使用limit分页。
  • ODPS:使用ODPS DUMP来导出数据,需要有ODPS的访问权限。
Select
odps.accessKey当mode=ODPS时必填,AnalyticDB for MySQL 2.0访问ODPS使用的云账号AccessKey,需要有Describe、Create、Select、Alter、Update和Drop权限。
odps.accessId当mode=ODPS时必填,AnalyticDB for MySQL 2.0访问ODPS使用的云账号AccessID,需要有Describe、Create、Select、Alter、Update和Drop权限。
odps.odpsServer当mode=ODPS时必填,ODPS API地址。
odps.tunnelServer当mode=ODPS时必填,ODPS Tunnel地址。
odps.project当mode=ODPS时必填,ODPS Project名称。
odps.accountType当mode=ODPS时生效,ODPS访问账号类型。aliyun

Writer脚本Demo

{
    "type":"job",
    "version":"2.0",
    "steps":[ 
        {
            "stepType":"stream",
            "parameter":{
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"ads",//插件名。
            "parameter":{
                "partition":"",//目标表的分区名称。
                "datasource":"",//数据源。
                "column":[//字段。
                     "id"
                ],
                "writeMode":"insert",//写入模式。
                "batchSize":"256",//一次性批量提交的记录数大小。
                "table":"",//表名。
                "overWrite":"true"//AnalyticDB for MySQL 2.0写入是否覆盖当前写入的表,true为覆盖写入,false为不覆盖。(追加)写入。当 writeMode 为 Load 时,该值才会生效。
                "options.ignoreEmptySource": true //忽略源头数据空文件报错信息,任务不报错,默认true,不配置默认是true,设置为false读不到源头数据任务会报错
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数。
        },
        "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1, //作业并发数。
            "mbps":"12"//限流
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Writer脚本参数

参数描述必选默认值
连接urlAnalyticDB for MySQL 2.0连接信息,格式为Address:Port
数据库AnalyticDB for MySQL 2.0的数据库名称。
Access IdAnalyticDB for MySQL 2.0对应的AccessKey Id
Access KeyAnalyticDB for MySQL 2.0对应的AccessKey Secret
datasource数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须与添加的数据源名称保持一致。
table目标表的表名称。
partition目标表的分区名称,当目标表为普通表,需要指定该字段。
writeModeAnalyticDB for MySQL 2.0 Writer实现了两种模式向AnalyticDB for MySQL 2.0导入数据。
  • Insert模式,在主键冲突情况下新的记录会覆盖旧的记录。
  • Load模式,数据通过第三方系统中转落地导入方式。
column目的表字段列表,可以为["*"],或者具体的字段列表,例如["a", "b", "c"]
suffixAnalyticDB for MySQL 2.0 url配置项的格式为ip:port,此部分为您定制的连接串,是可选参数。实际在AnalyticDB for MySQL 2.0数据库访问时,会变成JDBC数据库连接串。例如配置suffix为autoReconnect=true&failOverReadOnly=false&maxReconnects=10
batchSizeAnalyticDB for MySQL 2.0提交数据写的批量条数,当writeMode为insert时,该值才会生效。writeMode为insert时,为必选。
bufferSizeDataX数据收集缓冲区大小,缓冲区的目的是积累一个较大的Buffer,源头的数据首先进入到此Buffer中进行排序,排序完成后再提交至AnalyticDB for MySQL 2.0。排序是根据AnalyticDB for MySQL 2.0的分区列模式进行的,排序的目的是数据顺序对AnalyticDB for MySQL 2.0服务端更友好(出于性能考虑)。

BufferSize缓冲区中的数据会经过batchSize批量提交至AnalyticDB for MySQL 2.0,通常需要设置bufferSize为batchSize数量的多倍。当writeMode为insert时,该值才会生效。

writeMode为insert时,为必选。默认不配置不开启此功能。