OTSStream插件主要用于导出Table Store增量数据,本文将为您介绍如何通过OTSStream配置同步任务。

背景信息

OTSStream插件与全量导出插件不同,增量导出插件仅支持多版本模式,且不支持指定列。增量数据可以看作操作日志,除数据本身外还附有操作信息。详情请参见OTSStream Reader

说明 OTSStream配置同步任务时,请注意以下问题:
  • 如果配置任务为日调度,您可以读取当前时间24小时以内的数据,但会丢失当前时间前5分钟的数据。建议您配置任务为小时调度。
  • 设置的结束时间不能超过系统显示的时间,即您设置的结束时间要比运行时间早5分钟。
  • 配置日调度会出现数据丢失的情况。
  • 不可以配置周期调度和月调度。

开始时间和结束时间需要包含操作Table Store表的时间。例如,20171019162000您向Table Store插入2条数据,则开始时间设置为20171019161000,结束时间设置为20171019162600

新增数据源

  1. 登录DataWorks控制台,单击相应工作空间后的进入数据集成
    如果您已在DataWorks的某个功能模块,请单击左上角的图标,选择 全部产品 > 数据集成,即可跳转至 数据集成页面。
  2. 单击左侧导航栏中的数据源,进入工作空间管理 > 数据源管理页面。
  3. 单击右上角的新增数据源
  4. 新增数据源对话框中,选择数据源类型为Table Store(OTS)
  5. 填写Table Store(OTS)数据源的各配置项。
    新增数据源
    参数 描述
    数据源名称 数据源名称必须以字母、数字、下划线组合,且不能以数字和下划线开头。
    数据源描述 对数据源进行简单描述,不得超过80个字符。
    适用环境 可以选择开发生产环境。
    说明 仅标准模式工作空间会显示此配置。
    Endpoint

    Table Store服务对应的Endpoint。

    Table Store实例ID Table Store服务对应的实例ID。
    AccessKey ID 访问密钥中的AccessKey ID,您可以进入用户信息管理页面进行复制。
    AccessKey Secret 访问密钥中的AccessKey Secret,相当于登录密码。
  6. 单击测试连通性
  7. 测试连通性通过后,单击完成

通过向导模式配置同步任务

  1. 数据源管理页面,单击左上角的图标,选择全部产品 > 数据集成
  2. 单击首页中的新建同步任务
  3. 新建节点对话框中,输入节点名称并选择目标文件夹,单击提交
  4. 进入离线同步节点的配置页面,选择数据来源。
    数据来源
    参数 描述
    数据源 输入数据源的名称。
    导出增量数据的表的名称。该表需要开启Stream,您可以在建表时开启,或使用UpdateTable接口开启。
    开始时间 增量数据的时间范围(左闭右开)的左边界,格式为yyyymmddhh24miss,单位为毫秒。
    结束时间 增量数据的时间范围(左闭右开)的右边界,格式为yyyymmddhh24miss,单位为毫秒。
    状态表 用于记录状态的表的名称。
    最大重试次数 从TableStore中读增量数据时,每次请求的最大重试次数,默认是30
    导出时序信息 是否导出时序信息,包括数据的写入时间等信息。
  5. 选择数据去向。
    数据去向.png
    参数 描述
    数据源 输入配置的数据源名称。
    选择需要同步的表。
    分区信息 此处需同步的表是非分区表,所以无分区信息。
    清理规则
    • 写入前清理已有数据:导数据之前,清空表或者分区的所有数据,相当于insert overwrite
    • 写入前保留已有数据:导数据之前,不清理任何数据,每次运行数据都是追加进去的,相当于insert into
    空字符串作为null 默认值为
  6. 字段映射。
    左侧的源头表字段和右侧的目标表字段为一一对应的关系。单击 添加一行可以增加单个字段,鼠标放至需要删除的字段上,即可单击 删除图标进行删除。 字段映射
  7. 通道控制。
    通道控制
  8. 单击工具栏中的保存图标。
  9. 单击工具栏中的运行图标,运行之前需要配置自定义参数。

通过脚本模式配置同步任务

如果您需要通过脚本模式配置此任务,单击工具栏中的 转换脚本,选择 确认即可进入脚本模式。 脚本模式
您可以根据自身进行配置,示例脚本如下。
{
  "type": "job",
  "version": "1.0",
  "configuration": {
    "reader": {
      "plugin": "otsstream",
      "parameter": {
        "datasource": "otsstream",//数据源名,需要与您添加的数据源名称保持一致。
        "dataTable": "person",//导出增量数据的表的名称。该表需要开启Stream,可以在建表时开启,或者使用UpdateTable接口开启。
        "startTimeString": "${startTime}",//增量数据的时间范围(左闭右开)的左边界,格式为yyyymmddhh24miss,单位毫秒。
        "endTimeString": "${endTime}",//运行时间。
        "statusTable": "TableStoreStreamReaderStatusTable",//用于记录状态的表的名称。
        "maxRetries": 30,//请求的最大重试次数。
        "isExportSequenceInfo": false,
      }
    },
    "writer": {
      "plugin": "odps",
      "parameter": {
        "datasource": "odps_first",//数据源名。
        "table": "person",//目标表名。
        "truncate": true,
        "partition": "pt=${bdp.system.bizdate}",//分区信息。
        "column": [//目标列名。
          "id",
          "colname",
          "version",
          "colvalue",
          "optype",
          "sequenceinfo"
        ]
      }
    },
    "setting": {
      "speed": {
        "mbps": 7,//作业速率上限。
        "concurrent": 7//并发数。
      }
    }
  }
}
关于运行时间参数和结束时间参数,有以下两种表现形式(配置任务时选择其中一种):
  • "startTimeString": "${startTime}":增量数据的时间范围(左闭右开)的左边界,格式为yyyymmddhh24miss,单位为毫秒。

    "endTimeString": "${endTime}":增量数据的时间范围(左闭右开)的右边界,格式为yyyymmddhh24miss,单位为毫秒。

  • "startTimestampMillis":"":增量数据的时间范围(左闭右开)的左边界,单位为毫秒。

    Reader插件会从statusTable中找对应startTimestampMillis的位点,从该点开始读取开始导出数据。

    如果statusTable中找不到对应的位点,则从系统保留的增量数据的第1条开始读取,并跳过写入时间小于startTimestampMillis的数据。

    "endTimestampMillis":" ":增量数据的时间范围(左闭右开)的右边界,单位为毫秒。

    Reader插件startTimestampMillis位置开始导出数据后,当遇到第1条时间戳大于等于endTimestampMillis的数据时,结束导出数据,导出完成。

    当读取完当前全部的增量数据时,结束读取,即使未达endTimestampMillis

如果配置isExportSequenceInfo项为true,如“isExportSequenceInfo”: true,则会导出时序信息,目标会多出1行,目标字段列则多1列。时序信息包含了数据的写入时间等,默认该值为false,即不导出。