OTSStream插件主要用于导出Table Store增量数据,本文将为您介绍如何通过OTSStream配置同步任务。
背景信息
OTSStream插件与全量导出插件不同,增量导出插件仅支持多版本模式,且不支持指定列。增量数据可以看作操作日志,除数据本身外还附有操作信息。详情请参见OTSStream Reader。
- 如果配置任务为日调度,您可以读取当前时间24小时以内的数据,但会丢失当前时间前5分钟的数据。建议您配置任务为小时调度。
- 设置的结束时间不能超过系统显示的时间,即您设置的结束时间要比运行时间早5分钟。
- 配置日调度会出现数据丢失的情况。
- 不可以配置周期调度和月调度。
开始时间和结束时间需要包含操作Table Store表的时间。例如,20171019162000您向Table Store插入2条数据,则开始时间设置为20171019161000,结束时间设置为20171019162600。
新增数据源
通过向导模式配置同步任务
通过脚本模式配置同步任务

{
"type": "job",
"version": "1.0",
"configuration": {
"reader": {
"plugin": "otsstream",
"parameter": {
"datasource": "otsstream",//数据源名,需要与您添加的数据源名称保持一致。
"dataTable": "person",//导出增量数据的表的名称。该表需要开启Stream,可以在建表时开启,或者使用UpdateTable接口开启。
"startTimeString": "${startTime}",//增量数据的时间范围(左闭右开)的左边界,格式为yyyymmddhh24miss,单位毫秒。
"endTimeString": "${endTime}",//运行时间。
"statusTable": "TableStoreStreamReaderStatusTable",//用于记录状态的表的名称。
"maxRetries": 30,//请求的最大重试次数。
"isExportSequenceInfo": false,
}
},
"writer": {
"plugin": "odps",
"parameter": {
"datasource": "odps_first",//数据源名。
"table": "person",//目标表名。
"truncate": true,
"partition": "pt=${bdp.system.bizdate}",//分区信息。
"column": [//目标列名。
"id",
"colname",
"version",
"colvalue",
"optype",
"sequenceinfo"
]
}
},
"setting": {
"speed": {
"mbps": 7,//作业速率上限。
"concurrent": 7//并发数。
}
}
}
}
"startTimeString": "${startTime}"
:增量数据的时间范围(左闭右开)的左边界,格式为yyyymmddhh24miss,单位为毫秒。"endTimeString": "${endTime}"
:增量数据的时间范围(左闭右开)的右边界,格式为yyyymmddhh24miss,单位为毫秒。"startTimestampMillis":""
:增量数据的时间范围(左闭右开)的左边界,单位为毫秒。Reader插件会从statusTable中找对应startTimestampMillis的位点,从该点开始读取开始导出数据。
如果statusTable中找不到对应的位点,则从系统保留的增量数据的第1条开始读取,并跳过写入时间小于startTimestampMillis的数据。
"endTimestampMillis":" "
:增量数据的时间范围(左闭右开)的右边界,单位为毫秒。Reader插件startTimestampMillis位置开始导出数据后,当遇到第1条时间戳大于等于endTimestampMillis的数据时,结束导出数据,导出完成。
当读取完当前全部的增量数据时,结束读取,即使未达endTimestampMillis。
如果配置isExportSequenceInfo项为true,如“isExportSequenceInfo”: true
,则会导出时序信息,目标会多出1行,目标字段列则多1列。时序信息包含了数据的写入时间等,默认该值为false,即不导出。