OTSStream is a plug-in that allows you to export incremental data from Tablestore. This topic describes how to configure an OTSStream sync node.

Background information

Unlike plug-ins that are used to export full data, OTSStream supports only the multi-version mode and cannot be used to export data in specified columns. Incremental data can be considered as operation logs that include both data and operation information. For more information, see Configure OTSStream Reader.

Note When you configure an OTSStream sync node, take note of the following points:
  • If the node is scheduled to run by day, it reads the data that was generated during the last 24 hours, excluding the data generated in the most recent 5 minutes. We recommend that you schedule the node to run at intervals of hours.
  • The end time that you specify cannot be later than the current system time. Therefore, set the end time at least 5 minutes earlier than the time at which the node is scheduled to run.
  • When the node is scheduled to run by day, the read data may be incomplete.
  • The node cannot be scheduled to run by week or month.

The time period from the start time to the end time must include the time when operations are performed on the Tablestore table. Assume that you inserted two data records into a Tablestore table at 16:20:00 on October 19, 2017. You can set the start time to 20171019161000 and the end time to 20171019162600.
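This windowing logic can be sketched as follows. The 10-minute and 6-minute margins are illustrative choices that reproduce the example above, not required values:

```python
from datetime import datetime, timedelta

FMT = "%Y%m%d%H%M%S"  # yyyymmddhh24miss

def window_around(op_time, before_min=10, after_min=6):
    # Build a start/end pair that brackets the operation time.
    # The margins here are illustrative, not fixed requirements.
    start = op_time - timedelta(minutes=before_min)
    end = op_time + timedelta(minutes=after_min)
    return start.strftime(FMT), end.strftime(FMT)

# Records inserted at 16:20:00 on October 19, 2017:
start, end = window_around(datetime(2017, 10, 19, 16, 20, 0))
# start == "20171019161000", end == "20171019162600"
```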

Create a connection

  1. Log on to the DataWorks console. Find the required workspace and click Data Integration.
    If you are using another service of DataWorks, click the More icon in the upper-left corner and choose All Products > Data Integration to go to the Data Integration page.
  2. Click Connection in the left-side navigation pane to go to the Workspace Management > Data Source page.
  3. Click New data source in the upper-right corner.
  4. In the Add data source dialog box, select OTS as the connection type.
  5. Set parameters for the Tablestore connection.
    Parameter Description
    Data Source Name The name of the connection. The name can contain letters, digits, and underscores (_), and must start with a letter.
    Data source description The description of the connection. The description can be up to 80 characters in length.
    Environment Valid values: Development and Production.
    Note This parameter is displayed only when the workspace is in standard mode.
    Endpoint The endpoint of the Tablestore instance.
    Table Store instance ID The ID of the Tablestore instance.
    AccessKey ID The AccessKey ID of the AccessKey pair. You can copy the AccessKey ID on the User Management page.
    AccessKey Secret The AccessKey secret of the AccessKey pair, which is equivalent to the logon password.
  6. Click Test connectivity.
  7. After the connection passes the connectivity test, click Complete.

Configure a sync node on the codeless user interface (UI)

  1. On the Data Source page, click the More icon in the upper-left corner and choose All Products > Data Integration.
  2. Click New offline synchronization on the homepage.
  3. In the Create Node dialog box, set the Node Name and Location parameters and click Commit.
  4. On the configuration tab of the batch sync node, select the connection to the source data store.
    Parameter Description
    Connection The name of the connection.
    Table The name of the table from which the incremental data is exported. You must enable the Stream feature for the table, either when you create the table or by calling the UpdateTable operation after the table is created.
    Start Timestamp The start time (included) of the incremental data, in the format yyyymmddhh24miss.
    End Timestamp The end time (excluded) of the incremental data, in the format yyyymmddhh24miss.
    State Table The name of the table that is used to store status records.
    Maximum Retries The maximum number of retries of each request that is used to read incremental data from Tablestore. Default value: 30.
    Export Time Information Specifies whether to export time series information, including the time when data is written.
  5. Configure the connection to the destination data store.
    Parameter Description
    Connection The name of the connection that you have configured.
    Table The table that you want to synchronize.
    Partition Key Column If the destination table is not partitioned, no partition key column is displayed.
    Writing Rule
    • Write with Original Data Deleted (Insert Overwrite): All data in the table or partition is deleted before data import. This rule is equivalent to the INSERT OVERWRITE statement.
    • Write with Original Data Retained (Insert Into): No data is deleted before data import. New data is always appended upon each run. This rule is equivalent to the INSERT INTO statement.
    Convert Empty Strings to Null Default value: No.
  6. Configure field mappings.
    Each field in the source table on the left corresponds with a field in the destination table on the right. You can click Add to add a field, or move the pointer over a field and click the Delete icon to delete the field.
  7. Configure channel control policies, such as the maximum transmission rate and the maximum number of concurrent threads, in the Channel section.
  8. Click the Save icon in the toolbar.
  9. Click the Run icon in the toolbar. Before you run the batch sync node, you must configure the custom parameters, such as the start time and end time.

Configure a batch sync node by using the code editor

To configure the batch sync node by using the code editor, click Switch to Code Editor in the toolbar, and then click OK.

Configure the batch sync node as needed. The following provides an example script:
{
  "type": "job",
  "version": "1.0",
  "configuration": {
    "reader": {
      "plugin": "otsstream",
      "parameter": {
        "datasource": "otsstream",// The name of the connection. Use the name of the connection that you have created.
        "dataTable": "person",// The name of the table from which the incremental data is exported. You must enable the Stream feature for the table when you create the table, or by calling the UpdateTable operation after you create the table.
        "startTimeString": "${startTime}",// The start time (included) in milliseconds of the incremental data. The format is yyyymmddhh24miss.
        "endTimeString": "${endTime}",// The end time (excluded) in milliseconds of the incremental data.
        "statusTable":"TableStoreStreamReaderStatusTable",// The name of the table that is used to store status records.
        "maxRetries": 30,// The maximum number of retries of each request.
        "isExportSequenceInfo": false,
      }
    },
    "writer": {
      "plugin": "odps",
      "parameter": {
        "datasource":"odps_first",// The name of the connection.
        "table": "person",// The name of the destination table.
        "truncate": true,
        "partition": "pt=${bdp.system.bizdate}",// The information about the partition.
        "column": [// The column to which data is written.
          "id",
          "colname",
          "version",
          "colvalue",
          "optype",
          "sequenceinfo"
        ]
      }
    },
    "setting": {
      "speed": {
        "mbps": 7,// The maximum transmission rate.
        "concurrent": 7// The maximum number of concurrent threads.
      }
    }
  }
}
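Note that the inline // comments in the script above are for explanation only and are not valid in strict JSON. As a minimal sketch (assuming comments never appear inside string values), you can strip them before validating a configuration offline:

```python
import json
import re

def strip_line_comments(text):
    # Remove trailing //-style comments; assumes "//" never occurs
    # inside a JSON string value.
    return re.sub(r"//[^\n]*", "", text)

annotated = '''{
  "type": "job",
  "version": "1.0",
  "configuration": {
    "reader": {
      "plugin": "otsstream",// reader plug-in name
      "parameter": {"maxRetries": 30}// maximum retries
    }
  }
}'''

config = json.loads(strip_line_comments(annotated))
```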
You can configure the start time and end time by using one of the following methods:
  • "startTimeString": "${startTime}",// The start time (included) in milliseconds of the incremental data. The format is yyyymmddhh24miss.

    "endTimeString": "${endTime}",// The end time (excluded) in milliseconds of the incremental data. The format is yyyymmddhh24miss.

  • "startTimestampMillis":"": The start time (included) in milliseconds of the incremental data.

    OTSStream Reader searches for the status records in the table that is specified by the statusTable parameter based on the time that is specified by the startTimestampMillis parameter and reads data from this time point.

    If OTSStream Reader cannot find status records of this time point in the table that is specified by the statusTable parameter, OTSStream Reader reads incremental data that is retained by the system from the first entry, and skips the data that is written earlier than the time that is specified by the startTimestampMillis parameter.

    "endTimestampMillis":" ": The end time (excluded) in milliseconds of the incremental data.

    OTSStream Reader reads data from the time that is specified by the startTimestampMillis parameter and stops reading when it reaches data that is written at or later than the time that is specified by the endTimestampMillis parameter.

    After OTSStream Reader reads all the incremental data, the reading process ends, even if the time that is specified by the endTimestampMillis parameter has not arrived.
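The relationship between the two parameter styles can be sketched as a conversion from a yyyymmddhh24miss string to an epoch timestamp in milliseconds. The UTC time zone below is an assumption; adjust it to match your instance:

```python
from datetime import datetime, timezone

def to_millis(time_string, tz=timezone.utc):
    # Convert a yyyymmddhh24miss string to epoch milliseconds.
    # The UTC time zone is an assumption; use your instance's time zone.
    dt = datetime.strptime(time_string, "%Y%m%d%H%M%S").replace(tzinfo=tz)
    return int(dt.timestamp() * 1000)

# The 16-minute window from the earlier example spans 960,000 milliseconds.
span = to_millis("20171019162600") - to_millis("20171019161000")
```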

If isExportSequenceInfo is set to true ("isExportSequenceInfo": true), the system exports an extra column for time series information. The time series information contains the time when data is written. The default value is false, which indicates that no time series information is exported.
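Because each exported record describes a single column-level operation rather than a full row, downstream processing often regroups the records into rows. A minimal sketch of such grouping (the record values and the "optype" codes below are illustrative; only the column names mirror the writer's column list in the example script):

```python
from collections import defaultdict

# Illustrative incremental records in multi-version mode: one record
# per column-level operation, keyed by primary key and column name.
records = [
    {"id": "1", "colname": "name", "version": 1, "colvalue": "alice", "optype": "U"},
    {"id": "1", "colname": "name", "version": 2, "colvalue": "bob", "optype": "U"},
    {"id": "1", "colname": "age", "version": 1, "colvalue": "30", "optype": "U"},
]

rows = defaultdict(dict)
for r in records:
    # Keep only the latest version of each column for each primary key.
    current = rows[r["id"]].get(r["colname"])
    if current is None or current[0] < r["version"]:
        rows[r["id"]][r["colname"]] = (r["version"], r["colvalue"])
```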