This topic describes the data types and parameters that are supported by OTSStream Reader and how to configure OTSStream Reader by using the codeless user interface (UI) and code editor.

Background information

OTSStream Reader reads incremental data from Tablestore. Incremental data can be considered as operation logs that include data and operation information.

Unlike the readers that are used to read full data, OTSStream Reader supports only the multi-version mode. If you use OTSStream Reader to read incremental data, you cannot specify the columns from which you want to read data. Before you use OTSStream Reader, make sure that the Stream feature is enabled for your source table. You can enable the Stream feature for a table when you create the table. You can also call the UpdateTable operation in the Tablestore SDK to enable this feature after a table is created.

The following sample code provides an example on how to enable the Stream feature:
SyncClient client = new SyncClient("", "", "", "");
// Enable the Stream feature when you create a table.
CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta);
createTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); // The value 24 indicates that Tablestore retains incremental data for 24 hours. 
client.createTable(createTableRequest);
// If you do not enable the Stream feature when you create a table, you can call the UpdateTable operation to enable this feature after the table is created.
UpdateTableRequest updateTableRequest = new UpdateTableRequest("tableName");
updateTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); 
client.updateTable(updateTableRequest);

You can enable the Stream feature and specify the expiration time of incremental data by calling the UpdateTable operation in the Tablestore SDK. After you enable the Stream feature, the Tablestore server stores the operation logs of your Tablestore table. Each partition queues operation logs in sequence. Each operation log is recycled after the specified expiration time.

The Tablestore SDK provides several Stream-related API operations that are used to read operation logs. OTSStream Reader calls these API operations to read incremental data. By default, OTSStream Reader converts the incremental data into multiple 6-tuples and sends them to a writer. Each 6-tuple consists of pk, colName, version, colValue, opType, and sequenceInfo.
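The 6-tuple can be pictured as a simple record. The following sketch (the field names come from the tuple described above; the sample values are hypothetical) shows one change record for a written column version:

```python
from collections import namedtuple

# The 6-tuple that OTSStream Reader emits for each change record
# (field names from the description above; sample values are hypothetical).
ChangeRecord = namedtuple(
    "ChangeRecord",
    ["pk", "colName", "version", "colValue", "opType", "sequenceInfo"],
)

record = ChangeRecord(
    pk=("pk1_V1", "pk2_V1"),   # primary key column values
    colName="col_a",           # name of the changed column
    version=1441803688001,     # timestamp that serves as the version number
    colValue="col_val1",       # value written for this version
    opType="U",                # U = a version of a column was written
    sequenceInfo=None,         # populated only when isExportSequenceInfo is true
)

print(record.opType)  # U
```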

Read data by column

In the multi-version mode of Tablestore, table data is organized in a three-level structure: row, column, and version. One row can have multiple columns, and the column name is not fixed. Each column can have multiple versions, and each version has a specific timestamp, which is the version number.

You can call the API operations of Tablestore to read and write data. Tablestore records the recent write and modify operations that are performed on table data as incremental data. Therefore, incremental data can be considered as a set of operation records.

Tablestore supports the following types of operations:
  • PutRow: writes a row. If the row exists, it is overwritten.
  • UpdateRow: updates a row without modifying other data of the original row. You can use UpdateRow to add column values, overwrite column values if a version of a column exists, or delete a specific version or all the versions of a column.
  • DeleteRow: deletes a row.

Tablestore generates incremental data records based on each type of operation. OTSStream Reader reads these records and converts them to a format supported by Data Integration.

Tablestore supports dynamic columns and the multi-version mode. Therefore, a row that is generated by OTSStream Reader is a version of a column rather than a row in Tablestore. After OTSStream Reader reads data from a row in Tablestore, OTSStream Reader converts the data to multiple rows. Each row includes the primary key value, column name, timestamp of the version for the column (version number), value of the version, and operation type. If the isExportSequenceInfo parameter is set to true, time series information is also included.

The following types of operations are defined for the data that is converted to the format supported by Data Integration:
  • U (UPDATE): writes a version of a column.
  • DO (DELETE_ONE_VERSION): deletes a version of a column.
  • DA (DELETE_ALL_VERSION): deletes all the versions of a column based on the primary key value and the column name.
  • DR (DELETE_ROW): deletes a row based on the primary key value.
The following table lists the data that is converted by OTSStream Reader from a table that has two primary key columns (pkName1 and pkName2).
pkName1  pkName2  columnName  timestamp      columnValue  opType
pk1_V1   pk2_V1   col_a       1441803688001  col_val1     U
pk1_V1   pk2_V1   col_a       1441803688002  col_val2     U
pk1_V1   pk2_V1   col_b       1441803688003  col_val3     U
pk1_V2   pk2_V2   col_a       1441803688000  -            DO
pk1_V2   pk2_V2   col_b       -              -            DA
pk1_V3   pk2_V3   -           -              -            DR
pk1_V3   pk2_V3   col_a       1441803688005  col_val1     U
In the preceding example, three rows in the Tablestore table are read and converted to seven rows. The primary keys for the three rows are (pk1_V1, pk2_V1), (pk1_V2, pk2_V2), and (pk1_V3, pk2_V3).
  • For the row whose primary key is (pk1_V1, pk2_V1), two versions of the col_a column and one version of the col_b column are written.
  • For the row whose primary key is (pk1_V2, pk2_V2), one version of the col_a column and all the versions of the col_b column are deleted.
  • For the row whose primary key is (pk1_V3, pk2_V3), one version of the col_a column is written, and the row is deleted.
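The flattening described above can be sketched as a short conversion routine. The input format is a simplified stand-in for Tablestore operation records, not the actual log format; the sample values match the table above:

```python
# Simplified stand-in for incremental operation records (not the real
# Tablestore log format). Each operation carries the primary key and a list
# of per-column changes: (columnName, timestamp, value, opType).
operations = [
    (("pk1_V1", "pk2_V1"), [("col_a", 1441803688001, "col_val1", "U"),
                            ("col_a", 1441803688002, "col_val2", "U"),
                            ("col_b", 1441803688003, "col_val3", "U")]),
    (("pk1_V3", "pk2_V3"), [(None, None, None, "DR"),
                            ("col_a", 1441803688005, "col_val1", "U")]),
]

def flatten(operations):
    """Emit one output row per column version or delete marker,
    as in the table above."""
    rows = []
    for (pk1, pk2), changes in operations:
        for col, ts, val, op in changes:
            rows.append((pk1, pk2, col, ts, val, op))
    return rows

rows = flatten(operations)
print(len(rows))  # 5: two Tablestore rows become five output rows
```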

Read data by row

You can also use OTSStream Reader to read data by row. In this mode, OTSStream Reader reads operation records as rows. You must set the mode parameter and specify the columns whose data you want to read.
"parameter": {
  # Set mode to single_version_and_update_only, set isExportSequenceInfo to false, and configure other parameters, such as datasource and table, based on your business requirements. 
  "mode": "single_version_and_update_only", # The read mode. 
  "column":[  # The columns whose data you want to read from Tablestore. You can specify the columns based on your business requirements. 
          {
             "name": "uid"  # The name of a column, which can be a primary key column or a property column. 
          },
          {
             "name": "name"  # The name of a column, which can be a primary key column or a property column. 
          }
  ],
  "isExportSequenceInfo": false  # Specifies whether to read time series information. If you set the mode parameter to single_version_and_update_only, this parameter can only be set to false. 
}
The data that is read by row is closer to the data in the original rows. This facilitates further data processing. If you read data by row, take note of the following points:
  • The rows that are read are extracted from operation records. Each row corresponds to a write or update operation. If you update only some of the columns for a row, the operation record contains only the columns that are updated.
  • The version numbers of columns, which are the timestamps of the columns, cannot be read. Delete operations cannot be read either.
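As a sketch of this mode, an update that touches only some of the configured columns yields a row in which the untouched columns are empty. The column names match the sample configuration above; the input record is hypothetical:

```python
# Columns configured in the "column" parameter of the sample above.
wanted = ["uid", "name"]

def to_row(updated_columns):
    """Project an operation record onto the configured columns; columns the
    operation did not touch stay absent (None)."""
    return {col: updated_columns.get(col) for col in wanted}

# An update operation that changed only "name":
print(to_row({"name": "Alice"}))  # {'uid': None, 'name': 'Alice'}
```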

Data types

OTSStream Reader supports all Tablestore data types. The following table lists the data types that are supported by OTSStream Reader.
Category        Tablestore data type
Integer         INTEGER
Floating point  DOUBLE
String          STRING
Boolean         BOOLEAN
Binary          BINARY

Parameters

Parameter Description Required Default value
dataSource The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor. Yes No default value
dataTable The name of the table from which you want to read incremental data. You can enable the Stream feature for a table when you create the table. You can also call the UpdateTable operation to enable this feature after the table is created. Yes No default value
statusTable The name of the table that OTSStream Reader uses to store status records. These records help OTSStream Reader skip data that does not need to be read, which improves read efficiency. Take note of the following points:
  • You do not need to create a status table in advance. You need only to provide a table name. If no such table exists on your instance, OTSStream Reader automatically creates one. If the table already exists, OTSStream Reader checks whether the metadata of the table meets the expectation. If the metadata does not meet the expectation, OTSStream Reader reports an error.
  • After a read task is completed, you do not need to delete the table. The status records in the table can be used for the next read task.
  • Time-to-live (TTL) is enabled for the table, and data automatically expires based on the TTL. This way, the table stores a small volume of data.
  • You can use the same status table to store the status records of multiple tables that are specified by the dataTable parameter and managed by the same instance. The status records are independent of each other.

You can configure a name similar to TableStoreStreamReaderStatusTable. Make sure that the name is different from the name of a business-related table.

Yes No default value
date The date on which the data that you want to read is generated. Specify this parameter in the yyyyMMdd format, such as 20151111. No No default value
isExportSequenceInfo Specifies whether to read time series information. The time series information includes the time when data is written. The default value is false, which indicates that time series information is not read. No false
maxRetries The maximum number of retries for each request to read incremental data from Tablestore. Default value: 30. Retries are performed at a specific interval. The total duration of 30 retries is approximately 5 minutes. You can keep the default settings. No 30
startTimeString The start time of the incremental data, in seconds. The start time is the left boundary of the left-closed, right-open time range of the incremental data. Specify this parameter in the yyyymmddhh24miss format. No No default value
endTimeString The end time of the incremental data, in seconds. The end time is the right boundary of the left-closed, right-open time range of the incremental data. Specify this parameter in the yyyymmddhh24miss format. No No default value
mode The read mode. If this parameter is set to single_version_and_update_only, data is read by row. No No default value
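For reference, the yyyymmddhh24miss pattern used by startTimeString and endTimeString corresponds to `%Y%m%d%H%M%S` in Python's strftime notation. A minimal sketch that builds a left-closed, right-open one-hour window (the sample date is arbitrary):

```python
from datetime import datetime, timedelta

# yyyymmddhh24miss: 4-digit year, month, day, 24-hour clock, minutes, seconds.
fmt = "%Y%m%d%H%M%S"

end = datetime(2015, 11, 11, 8, 0, 0)
start = end - timedelta(hours=1)  # left-closed, right-open one-hour window

print(start.strftime(fmt), end.strftime(fmt))  # 20151111070000 20151111080000
```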

Configure OTSStream Reader by using the codeless UI

  1. Configure data sources.
    Configure Source and Target for the synchronization node. Configure data sources
    Parameter Description
    Connection The name of the data source from which you want to read data. This parameter is equivalent to the dataSource parameter that is described in the preceding section.
    Table The name of the table from which you want to read data. This parameter is equivalent to the dataTable parameter that is described in the preceding section.
    Start Timestamp The start time of the incremental data, in seconds. The start time is the left boundary of the left-closed, right-open time range of the incremental data. Specify this parameter in the yyyymmddhh24miss format.
    Note The start time must be a point in time that is within seven days from the time when the synchronization node is configured.
    End Timestamp The end time of the incremental data, in seconds. The end time is the right boundary of the left-closed, right-open time range of the incremental data. Specify this parameter in the yyyymmddhh24miss format.
    State Table The name of the table that is used to store status records.
    Maximum Retries The maximum number of retries. This parameter is equivalent to the maxRetries parameter that is described in the preceding section. Default value: 30.
    Export Time Information Specifies whether to read time series information. This parameter is equivalent to the isExportSequenceInfo parameter that is described in the preceding section. Default value: false.
  2. Configure field mappings.
    Fields in the source on the left have a one-to-one mapping with fields in the destination on the right. You can click Add to add a field. To remove an added field, move the pointer over the field and click the Remove icon. Field mappings
    Operation Description
    Map Fields with the Same Name Click Map Fields with the Same Name to establish mappings between fields with the same name. The data types of the fields must match.
    Map Fields in the Same Line Click Map Fields in the Same Line to establish mappings between fields in the same row. The data types of the fields must match.
    Delete All Mappings Click Delete All Mappings to remove the mappings that are established.
  3. Configure channel control policies. Channel control
    Parameter Description
    Expected Maximum Concurrency The maximum number of parallel threads that the synchronization node uses to read data from the source or write data to the destination. You can configure the parallelism for the synchronization node on the codeless UI.
    Bandwidth Throttling Specifies whether to enable bandwidth throttling. You can enable bandwidth throttling and specify a maximum transmission rate to prevent heavy read workloads on the source. We recommend that you enable bandwidth throttling and set the maximum transmission rate to an appropriate value based on the configurations of the source.
    Dirty Data Records Allowed The maximum number of dirty data records allowed.
    Distributed Execution The distributed execution mode that allows you to split your node into pieces and distribute them to multiple Elastic Compute Service (ECS) instances for parallel execution. This speeds up synchronization. If you use a large number of parallel threads to run your synchronization node in distributed execution mode, excessive access requests are sent to the data sources. Therefore, before you use the distributed execution mode, you must evaluate the access load on the data sources. You can enable this mode only if you use an exclusive resource group for Data Integration. For more information, see Exclusive resource groups for Data Integration and Create and use an exclusive resource group for Data Integration.

Configure OTSStream Reader by using the code editor

In the following code, a synchronization node is configured to read data from Tablestore. For more information about the parameters, see the preceding parameter description. For more information about how to configure a synchronization node by using the code editor, see Create a sync node by using the code editor.
{
    "type":"job",
    "version":"2.0",// The version number. 
    "steps":[
        {
            "stepType":"otsstream",// The reader type. 
            "parameter":{
                "statusTable":"TableStoreStreamReaderStatusTable",// The name of the table that OTSStream Reader uses to store status records. 
                "maxRetries":30,// The maximum number of retries for each request to read incremental data from Tablestore. Default value: 30. 
                "isExportSequenceInfo":false,// Specifies whether to read time series information. 
                "datasource":"$srcDatasource",// The name of the data source. 
                "startTimeString":"${startTime}${hh}",// The start time of the incremental data, which is the same as the time when a synchronization node starts to run. Specify this parameter in the yyyymmddhh24miss format.
                "table":"",// The name of the table from which you want to read data. 
                "endTimeString":"${endTime}${hh}"// The end time of the incremental data, which is the same as the time when the running of a synchronization node is complete. Specify this parameter in the yyyymmddhh24miss format.
            },
            "name":"Reader",
            "category":"reader"
        },
        { 
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// The maximum number of dirty data records allowed. 
        },
        "speed":{
            "throttle":true,// Specifies whether to enable bandwidth throttling. The value false indicates that bandwidth throttling is disabled, and the value true indicates that bandwidth throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
            "concurrent":1 // The maximum number of parallel threads. 
            "mbps":"12"// The maximum transmission rate.

        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}