This topic describes the data types and parameters that OTSStream Reader supports and how to configure it by using the codeless user interface (UI) and code editor.

Background information

OTSStream Reader allows you to export incremental data from Tablestore. Incremental data can be considered as operation logs that include both the data and the operation information.

Unlike plug-ins for exporting full data, OTSStream Reader only supports the multi-version mode. In addition, OTSStream Reader cannot export the data of specified columns. Before you use OTSStream Reader, make sure that the Stream feature is enabled for your source table. You can enable this feature when you create the table or by using the UpdateTable operation in the SDK.

The following example describes how to enable the Stream feature:
SyncClient client = new SyncClient("", "", "", "");
// Enable the Stream feature when you create a table.
CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta);
createTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); // The value 24 indicates that the incremental data is retained for 24 hours.
client.createTable(createTableRequest);

// If the feature is not enabled when the table is created, enable it by calling the UpdateTable operation.
UpdateTableRequest updateTableRequest = new UpdateTableRequest("tableName");
updateTableRequest.setStreamSpecification(new StreamSpecification(true, 24));
client.updateTable(updateTableRequest);

You can enable the Stream feature and set the log expiration time by calling the UpdateTable operation in the SDK. After the feature is enabled, the Tablestore server additionally saves your operation logs. Each partition has a sequential operation log queue, and each log entry is removed by garbage collection after the specified expiration time.

The Tablestore SDK provides several Stream API operations for reading these operation logs, and OTSStream Reader obtains incremental data by calling them. By default, OTSStream Reader transforms each piece of incremental data into a 6-tuple (pk, colName, version, colValue, opType, sequenceInfo) and imports it to MaxCompute.
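As a concrete sketch, each exported 6-tuple can be modeled as a plain value class like the following (a hypothetical illustration with assumed field types; StreamRecord is not a class in the Tablestore SDK or Data Integration):

```java
// Hypothetical model of one exported record; field names follow the 6-tuple
// above. This is an illustration only, not an SDK or Data Integration class.
public class StreamRecord {
    String pk;           // primary key value(s) of the row
    String colName;      // column name
    Long version;        // version number (timestamp); null if not applicable
    String colValue;     // column value; null if not applicable
    String opType;       // U, DO, DA, or DR
    String sequenceInfo; // time series info; set only if isExportSequenceInfo is true

    StreamRecord(String pk, String colName, Long version,
                 String colValue, String opType, String sequenceInfo) {
        this.pk = pk;
        this.colName = colName;
        this.version = version;
        this.colValue = colValue;
        this.opType = opType;
        this.sequenceInfo = sequenceInfo;
    }

    public static void main(String[] args) {
        StreamRecord r = new StreamRecord("pk1_V1|pk2_V1", "col_a",
                1441803688001L, "col_val1", "U", null);
        System.out.println(r.pk + " " + r.colName + " " + r.opType);
    }
}
```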

Export data by column

In the multi-version mode of Tablestore, table data is organized in a three-level architecture: row, column, and version. One row can have multiple columns. The column name is not fixed, and each column can have multiple versions. Each version has a specific timestamp, that is, the version number.

You can perform read/write operations by using the Tablestore API. Tablestore stores the incremental data by storing the records of recent write and modify operations on table data. Incremental data can be considered as a set of operation records.

Tablestore supports the following three operations:
  • PutRow: writes a row. If the row already exists, it is overwritten.
  • UpdateRow: updates a row without modifying other data of the original row. You can add column values, overwrite column values if the corresponding version of a column already exists, delete all the versions of a column, or delete a version of a column.
  • DeleteRow: deletes a row.

Tablestore generates an incremental data record for each of these operation types. OTSStream Reader reads these records and exports them as data types that Data Integration supports.

Tablestore supports dynamic columns and the multi-version mode. Therefore, a row that is exported by OTSStream Reader corresponds to a version of a column rather than a row in Tablestore. A row in Tablestore may correspond to multiple exported rows. Each exported row includes the primary key value, column name, timestamp of the version for the column (version number), value of the version, and operation type. If the isExportSequenceInfo parameter is set to true, time series information is also included.

After data is converted to types that Data Integration supports, each exported record carries one of the following operation types:
  • U (UPDATE): writes a version of a column.
  • DO (DELETE_ONE_VERSION): deletes a version of a column.
  • DA (DELETE_ALL_VERSION): deletes all the versions of a column based on the primary key and the column name.
  • DR (DELETE_ROW): deletes a row based on the primary key.
In the following example, the table has two primary key columns: pkName1 and pkName2.
pkName1  pkName2  columnName  timestamp      columnValue  opType
pk1_V1   pk2_V1   col_a       1441803688001  col_val1     U
pk1_V1   pk2_V1   col_a       1441803688002  col_val2     U
pk1_V1   pk2_V1   col_b       1441803688003  col_val3     U
pk1_V2   pk2_V2   col_a       1441803688000  N/A          DO
pk1_V2   pk2_V2   col_b       N/A            N/A          DA
pk1_V3   pk2_V3   N/A         N/A            N/A          DR
pk1_V3   pk2_V3   col_a       1441803688005  col_val1     U
In this example, seven rows are exported, corresponding to three rows in the Tablestore table. The primary keys for the three rows are (pk1_V1, pk2_V1), (pk1_V2, pk2_V2), and (pk1_V3, pk2_V3).
  • For the row whose primary key is (pk1_V1, pk2_V1), three operations are included: writing two versions of column col_a and one version of column col_b.
  • For the row whose primary key is (pk1_V2, pk2_V2), two operations are included: deleting one version of column col_a and deleting all versions of column col_b.
  • For the row whose primary key is (pk1_V3, pk2_V3), two operations are included: deleting the row and writing one version of column col_a.
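The expansion illustrated by the first row above can be sketched in Java as follows (a simplified illustration of the by-column mapping, not OTSStream Reader's actual implementation; ExpandByColumn and its array layout are assumptions for this sketch):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch: one write operation on a row expands into one exported
// row per written column version, each tagged with opType "U".
public class ExpandByColumn {
    // Each element of written: {columnName, version, value}.
    static List<String[]> expand(String pk, String[][] written) {
        List<String[]> rows = new ArrayList<>();
        for (String[] w : written) {
            // Exported row: primary key, column name, version, value, opType.
            rows.add(new String[]{pk, w[0], w[1], w[2], "U"});
        }
        return rows;
    }

    public static void main(String[] args) {
        // Writing two versions of col_a and one version of col_b
        // yields three exported rows, as in the example table.
        List<String[]> rows = expand("pk1_V1|pk2_V1", new String[][]{
            {"col_a", "1441803688001", "col_val1"},
            {"col_a", "1441803688002", "col_val2"},
            {"col_b", "1441803688003", "col_val3"}
        });
        System.out.println(rows.size()); // prints 3
    }
}
```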

Export data by row

You can also export data by row. In this mode, OTSStream Reader exports operation records as rows. You must specify this mode and the columns to be exported.
"parameter": {
  // Set other parameters, such as datasource and table, as usual.
  "mode": "single_version_and_update_only", // The export mode.
  "column": [ // The columns to be exported from Tablestore. You can specify the columns as needed.
    {
      "name": "uid" // The name of the column, which can be a primary key column or a property column.
    },
    {
      "name": "name" // The name of the column, which can be a primary key column or a property column.
    }
  ],
  "isExportSequenceInfo": false // Specifies whether to export time series information. If you set the mode parameter to single_version_and_update_only, this parameter can only be set to false.
}
The data that is exported by row is closer to the data in the original rows, which facilitates further data processing. Take note of the following points when you export data by row:
  • The exported rows are extracted from operation records. Each exported row corresponds to one write or modify operation. If you modify only some of the columns of a row, the operation record contains only the modified columns, and the other columns are empty.
  • If you export data by row, the version number (timestamp) of each column cannot be exported. In addition, delete operations are not exported.
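The by-row behavior described above can be sketched as follows (a simplified illustration, not OTSStream Reader's actual implementation; ExpandByRow and its data layout are assumptions for this sketch):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of single_version_and_update_only mode: the column values
// of one write operation are folded into a single exported row. Columns that
// the operation did not touch stay empty (null), and version numbers are
// dropped, matching the limitations described above.
public class ExpandByRow {
    // requestedColumns: the columns configured in the "column" parameter.
    // written: {columnName, value} pairs from one write operation.
    static Map<String, String> toRow(String[] requestedColumns, String[][] written) {
        Map<String, String> row = new HashMap<>();
        for (String col : requestedColumns) {
            row.put(col, null); // unmodified columns are exported as empty
        }
        for (String[] w : written) {
            if (row.containsKey(w[0])) {
                row.put(w[0], w[1]); // keep only the value; the version is dropped
            }
        }
        return row;
    }

    public static void main(String[] args) {
        // A write that touches only "uid" leaves "name" empty in the exported row.
        Map<String, String> row = toRow(new String[]{"uid", "name"},
                                        new String[][]{{"uid", "u_01"}});
        System.out.println(row.get("uid"));  // prints u_01
        System.out.println(row.get("name")); // prints null
    }
}
```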

Data types

OTSStream Reader supports all Tablestore data types. The following table describes the data types that OTSStream Reader supports.
Category OTSStream data type
Integer INTEGER
Floating point DOUBLE
String STRING
Boolean BOOLEAN
Binary BINARY

Parameters

Parameter Description Required Default value
dataSource The connection name. It must be the same as the name of the created connection. You can create connections in the code editor. Yes N/A
dataTable The name of the table from which incremental data is exported. You must enable the Stream feature for a table when you create the table, or by calling the UpdateTable operation after you create the table. Yes N/A
statusTable The name of the table that OTSStream Reader uses to store status records. These records help filter out data that is outside the target range and improve export efficiency.
  • You do not need to create the table in advance. Provide only a table name. If the specified table does not exist, OTSStream Reader attempts to create it under your instance. If the table already exists, OTSStream Reader checks whether the metadata of the table is as expected and throws an exception if it is not.
  • After a sync node is completed, you do not need to delete the table. The status records in the table can be used by the next sync node.
  • Data in the table automatically expires based on the time-to-live (TTL). Therefore, the data volume is small.
  • You can use the same table to store the status records of multiple tables that are managed by the same instance. The status records are independent of each other.

Configure a name that is easy to recognize, such as TableStoreStreamReaderStatusTable, and make sure that the name does not conflict with that of any business-related table.

Yes N/A
startTimestampMillis The left boundary of the time range (left-closed, right-open) of incremental data, in milliseconds.
  • OTSStream Reader searches the table that is specified by the statusTable parameter for a status record at the time that is specified by the startTimestampMillis parameter and reads data from that point.
  • If no status record is found for that point in time, OTSStream Reader reads the incremental data that is retained by the system from the first entry and skips the data that was written earlier than the time that is specified by the startTimestampMillis parameter.
No N/A
endTimestampMillis The right boundary of the time range (left-closed, right-open) of incremental data, in milliseconds.
  • OTSStream Reader reads data from the time that is specified by the startTimestampMillis parameter and stops when it reaches data whose timestamp is later than or equal to the time that is specified by the endTimestampMillis parameter.
  • If OTSStream Reader has read all the incremental data, it stops reading even before the time that is specified by the endTimestampMillis parameter.
No N/A
date The date when data is exported. The format is yyyyMMdd, for example, 20151111. You must specify this parameter or the startTimestampMillis and endTimestampMillis parameters. For example, Alibaba Cloud Data Process Center schedules nodes by day. Therefore, the date parameter is provided. No N/A
isExportSequenceInfo Specifies whether to export time-series information. Time-series information includes the time when data is written. The default value is false, indicating that time series information is not exported. No false
maxRetries The maximum number of retries of each request for reading incremental data from Tablestore. The default value is 30. Retries are performed at specific intervals. The total time of 30 retries is approximately 5 minutes. Generally, you can use the default value. No 30
startTimeString The start time of the sync node, in the format of yyyymmddhh24miss. It specifies the left boundary of the time range (left-closed, right-open) of incremental data. No N/A
endTimeString The end time of the sync node, in the format of yyyymmddhh24miss. It specifies the right boundary of the time range (left-closed, right-open) of incremental data. No N/A
mode The export mode. If this parameter is set to single_version_and_update_only, data is exported by row. If this parameter is not specified, data is exported by column. No N/A
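For example, if a node is scheduled by day, a parameter block that uses the date parameter instead of an explicit time range might look like the following (a sketch; the datasource and dataTable values are placeholders):

```json
"parameter": {
    "statusTable": "TableStoreStreamReaderStatusTable", // The name of the table for storing status records.
    "datasource": "my_tablestore_connection", // Placeholder connection name.
    "dataTable": "my_source_table", // Placeholder source table name.
    "date": "20151111", // Export the incremental data of this day (yyyyMMdd).
    "isExportSequenceInfo": false,
    "maxRetries": 30
}
```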

Configure OTSStream Reader by using the codeless UI

  1. Configure the connections.
    Configure the connections to the source and destination data stores for the sync node.
    GUI element Description
    Connection The datasource parameter in the preceding parameter description. Select a connection type and select the name of a connection that you have configured in DataWorks.
    Table The dataTable parameter in the preceding parameter description.
    Start Timestamp The start time of the sync node, in the format of yyyymmddhh24miss. It specifies the left boundary of the time range (left-closed, right-open) of incremental data.
    Note The start time must be a point in time that is within seven days from the time when the sync node is configured.
    End Timestamp The end time of the sync node, in the format of yyyymmddhh24miss. It specifies the right boundary of the time range (left-closed, right-open) of incremental data.
    State Table The name of the table for storing status records.
    Maximum Retries The maxRetries parameter in the preceding parameter description. Default value: 30.
    Export Time Information The isExportSequenceInfo parameter in the preceding parameter description. By default, Export Time Information is not selected.
  2. Configure field mapping. It is equivalent to setting the column parameter in the preceding parameter description.
    Fields in the source table on the left have a one-to-one mapping with fields in the destination table on the right. You can click Add to add a field. To delete a field, move the pointer over the field and click the Delete icon.
    GUI element Description
    Map Fields with the Same Name Click Map Fields with the Same Name to establish a mapping between fields with the same name. The data types of the fields must match.
    Map Fields in the Same Line Click Map Fields in the Same Line to establish a mapping between fields in the same row. The data types of the fields must match.
    Delete All Mappings Click Delete All Mappings to remove mappings that have been established.
  3. Configure channel control policies.
    GUI element Description
    Expected Maximum Concurrency The maximum number of concurrent threads that the sync node uses to read data from or write data to data stores. You can configure the concurrency for the node on the codeless UI.
    Bandwidth Throttling Specifies whether to enable bandwidth throttling. You can enable bandwidth throttling and set a maximum transmission rate to avoid heavy read workload of the source. We recommend that you enable bandwidth throttling and set the maximum transmission rate to a proper value.
    Dirty Data Records Allowed The maximum number of dirty data records allowed.

Configure OTSStream Reader by using the code editor

The following example shows how to configure a sync node to read data from Tablestore. For more information about the parameters, see the preceding parameter description. For more information about how to configure a sync node by using the code editor, see Create a sync node by using the code editor.
{
    "type":"job",
    "version":"2.0", // The version number.
    "steps":[
        {
            "stepType":"otsstream",// The reader type.
            "parameter":{
                "statusTable":"TableStoreStreamReaderStatusTable",// The name of the table for storing status records.
                "maxRetries":30,// The maximum number of retries on each request of reading incremental data from Tablestore. It is set to 30 by default.
                "isExportSequenceInfo":false,// Specifies whether to export the time series information.
                "datasource":"$srcDatasource",// The connection name.
                "startTimeString":"${startTime}",// The left boundary of the time range (left-closed, right-open) of incremental data.
                "table":"",// The name of the source table.
                "endTimeString":"${endTime}"// The right boundary of the time range (left-closed, right-open) of incremental data.
            },
            "name":"Reader",
            "category":"reader"
        },
        { 
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0" // The maximum number of dirty data records allowed.
        },
        "speed":{
            "concurrent":1, // The maximum number of concurrent threads.
            "throttle":false // Specifies whether to enable bandwidth throttling. A value of false indicates that the bandwidth is not throttled. A value of true indicates that the bandwidth is throttled. The maximum transmission rate takes effect only if you set this parameter to true.
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}