
DataWorks:Use Tablestore Stream Reader to configure a synchronization task

Last Updated: Dec 01, 2023

Tablestore Stream Reader allows you to export incremental data from Tablestore. This topic describes how to configure a synchronization task by using Tablestore Stream Reader.

Background information

Unlike the readers that are used to read full data, Tablestore Stream Reader supports only the multi-version mode. If you use Tablestore Stream Reader to read incremental data, you cannot specify the columns from which you want to read data. Incremental data can be considered as operation logs that include data and operation information. For more information, see Tablestore Stream data source.
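
Each incremental record describes one operation on one column of one row. A record contains the primary key of the row, the name of the column, the version (a timestamp in milliseconds), the value of the column, and the type of the operation. The following records are a minimal sketch, assuming a source table named person whose primary key column is id; the values, including the operation types, are illustrative:

  id    colname    version          colvalue    optype
  1     name       1508400600000    Tom         U
  1     age        1508400600000    18          U

The column names match the destination columns that are used in the sample code later in this topic.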

Note

When you use Tablestore Stream Reader to configure a synchronization task, take note of the following points:

  • If the task is scheduled to run by day, the task reads the data that was generated during the last 24 hours, excluding the data that was generated in the last 5 minutes. We recommend that you schedule the task to run by hour.

  • The end time that you specify cannot be later than the current system time. Because the data that is generated in the last 5 minutes cannot be read, the end time must be at least 5 minutes earlier than the time when the task is scheduled to run.

  • When the task is scheduled to run by day, the data that is read may be incomplete.

  • The task cannot be scheduled to run by week or month.

The time period from the start time to the end time must include the time when operations are performed on the desired Tablestore table. For example, you inserted two data records into a Tablestore table at 16:20:00 on October 19, 2017. You can set the start time to 20171019161000 and the end time to 20171019162600.
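
In the code editor, this example time window corresponds to the following reader settings. This is a minimal sketch; the full sample code appears later in this topic:

  "startTimeString": "20171019161000",// 16:10:00 on October 19, 2017 (included). 
  "endTimeString": "20171019162600"// 16:26:00 on October 19, 2017 (excluded). 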

Add a data source

  1. Go to the Data Integration page.

    Log on to the DataWorks console. In the left-side navigation pane, click Data Integration. On the page that appears, select the desired workspace from the drop-down list and click Go to Data Integration.

  2. In the left-side navigation pane of the Data Integration page, click Data Source. The Data Source List page appears.

  3. Click Add Data Source.

  4. In the Add data source dialog box, select Tablestore as the data source type.

  5. In the Add OTS data source dialog box, configure the parameters.

    • Data Source Name: The name of the data source. The name can contain letters, digits, and underscores (_) and must start with a letter.

    • Data Source Description: The description of the data source. The description cannot exceed 80 characters in length.

    • Endpoint: The endpoint of Tablestore.

    • Table Store instance name: The name of the Tablestore instance.

    • AccessKey ID: The AccessKey ID of the AccessKey pair that you use to connect to the Tablestore instance. You can copy the AccessKey ID on the user management page.

    • AccessKey Secret: The AccessKey secret of the AccessKey pair that you use to connect to the Tablestore instance.

  6. Click Test Connectivity.

  7. If the connectivity test is successful, click Complete.

Configure a batch synchronization task on the codeless UI

  1. Go to the DataStudio page.

    Log on to the DataWorks console. In the left-side navigation pane, choose Data Modeling and Development > DataStudio. On the page that appears, select the desired workspace from the drop-down list and click Go to DataStudio.

  2. On the DataStudio page, find the desired workflow and click its name. Right-click Data Integration and choose Create Node > Offline synchronization.

  3. In the Create Node dialog box, configure the Name and Path parameters and click Confirm.

  4. Select Tablestore Stream for Source and MaxCompute(ODPS) for Destination, select a resource group that is used to run the synchronization task, and then test the connectivity.

  5. Configure the source and destination.

    Source:

    • Table: The name of the table from which you want to read incremental data. You can enable the Stream feature for a table when you create the table. You can also call the UpdateTable operation to enable this feature after the table is created.

    • Start Timestamp: The start time of the incremental data. The start time is the left boundary of the left-closed, right-open time range of the incremental data. Configure this parameter in the yyyymmddhh24miss format.

    • End Timestamp: The end time of the incremental data. The end time is the right boundary of the left-closed, right-open time range of the incremental data. Configure this parameter in the yyyymmddhh24miss format.

    • State Table: The name of the table that Tablestore Stream Reader uses to store status records.

    • Maximum Retries: The maximum number of retries for each request to read incremental data from Tablestore. Default value: 30.

    • Export Time Information: Specifies whether to read time series information. The time series information includes the time when data is written.

    Destination:

    • Table: The name of the table to which you want to write data.

    • Partition Key Column: If the destination table is a non-partitioned table, no partition key column is displayed.

    • Writing Rule: Select one of the following rules. In the code editor, this setting corresponds to the truncate parameter, as shown in the sketch after this step.

      • Write with Original Data Deleted (Insert Overwrite): All data in the table or partition is deleted before data import. This rule is equivalent to the INSERT OVERWRITE statement.

      • Write with Original Data Retained (Insert Into): No data is deleted before data import. New data is always appended. This rule is equivalent to the INSERT INTO statement.

    • Convert Empty Strings to Null: Specifies whether to convert empty strings to null. Default value: No.
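
    The following sketch, based on the sample code later in this topic, shows how the Writing Rule setting maps to the truncate parameter of the writer:

      "truncate": true// true: Write with Original Data Deleted (Insert Overwrite). false: Write with Original Data Retained (Insert Into). 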

  6. Configure field mappings.

    Fields in the source table on the left have a one-to-one mapping with fields in the destination table on the right. You can click Add to add a field. To remove a field, move the pointer over the field and click the Remove icon.

  7. Configure channel control policies.

  8. Click the Save icon in the top toolbar.

  9. Click the Run icon in the top toolbar. You must configure the custom parameters before you run the synchronization task, as shown in the following example.
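
    If you set the Start Timestamp and End Timestamp parameters to the variables ${startTime} and ${endTime}, assign values to the variables by using scheduling parameters. The following assignment is a sketch for an hourly schedule that reads the time window from 10 minutes to 5 minutes before the scheduled time. Verify the expressions against the scheduling parameter syntax of DataWorks:

      startTime=$[yyyymmddhh24miss-10/24/60] endTime=$[yyyymmddhh24miss-5/24/60]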

Configure a synchronization task by using the code editor

To configure the synchronization task by using the code editor, click the Conversion script icon in the top toolbar and click OK in the message that appears. The code editor appears.

Configure the synchronization task based on your business requirements. Sample code:

{
  "type": "job",
  "version": "1.0",
  "configuration": {
    "reader": {
      "plugin": "tablestore stream",
      "parameter": {
        "datasource": "tablestore stream",// The name of the data source. Use the name of the data source that you have added. 
        "dataTable": "person",// The name of the table from which the incremental data is exported. You can enable the Stream feature for a table when you create the table. You can also call the UpdateTable operation to enable this feature after the table is created. 
        "startTimeString": "${startTime}",// The start time (included) in milliseconds of the incremental data. The format is yyyymmddhh24miss. 
        "endTimeString": "${endTime}",// The end time (excluded) in milliseconds of the incremental data. 
        "statusTable": "TableStoreStreamReaderStatusTable",// The name of the table that is used to store status records. 
        "maxRetries": 30,// The maximum number of retries for each request. 
        "isExportSequenceInfo": false,
      }
    },
    "writer": {
      "plugin": "odps",
      "parameter": {
        "datasource": "odps_first",// The name of the data source. 
        "table": "person",// The name of the destination table. 
        "truncate": true,
        "partition": "pt=${bdp.system.bizdate}",// The information about the partition. 
        "column": [// The column to which data is written. 
          "id",
          "colname",
          "version",
          "colvalue",
          "optype",
          "sequenceinfo"
        ]
      }
    },
    "setting": {
      "speed": {
        "mbps": 7,// The maximum transmission rate. Unit: MB/s. 
        "concurrent": 7// The maximum number of parallel threads. 
      }
    }
  }
}

You can configure the start time and end time by using one of the following methods:

  • "startTimeString": "${startTime}": the start time (included) in milliseconds of the incremental data. The format is yyyymmddhh24miss.

    "endTimeString": "${endTime}": the end time (excluded) in milliseconds of the incremental data. The format is yyyymmddhh24miss.

  • "startTimestampMillis":"": the start time (included) in milliseconds of the incremental data.

    Tablestore Stream Reader searches for the status records in the table that is specified by the statusTable parameter based on the time that is specified by the startTimestampMillis parameter. Then, Tablestore Stream Reader reads data from this point in time.

    If Tablestore Stream Reader cannot find status records of this point in time in the table that is specified by the statusTable parameter, Tablestore Stream Reader reads incremental data that is retained by the system from the first entry, and skips the data that is written earlier than the time that is specified by the startTimestampMillis parameter.

    "endTimestampMillis":"": the end time (excluded) in milliseconds of the incremental data.

    Tablestore Stream Reader reads data from the time that is specified by the startTimestampMillis parameter and stops when it reads the first entry whose write time is equal to or later than the time that is specified by the endTimestampMillis parameter.

    When Tablestore Stream Reader reads all the incremental data, the reading process ends even if the time that is specified by the endTimestampMillis parameter has not arrived.
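
    The following sketch shows the millisecond-based variant. The timestamp values are illustrative and correspond to 16:10:00 (included) and 16:26:00 (excluded) on October 19, 2017 (UTC+8):

      "startTimestampMillis": "1508400600000",
      "endTimestampMillis": "1508401560000"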

If the isExportSequenceInfo parameter is set to true ("isExportSequenceInfo": true), the system exports an extra column for time series information. The time series information contains the time when data is written. The default value is false, which indicates that no time series information is exported.
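
If you set the isExportSequenceInfo parameter to true, reserve a column for the time series information in the destination table and map it in the column list of the writer. The sequenceinfo column in the sample code in this topic serves this purpose. A minimal sketch:

  "isExportSequenceInfo": true// An extra column that contains the time when data is written is exported. 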