This topic describes the data types and parameters supported by OTSStream Reader, how it works, and how to configure it by using the codeless user interface (UI) and code editor.

OTSStream Reader allows you to export incremental data from Table Store. Incremental data can be considered as operation logs that include both data and operation information.

Unlike plug-ins that export full data, OTSStream Reader supports only the multi-version mode and cannot restrict the export to specified columns. Before you use OTSStream Reader, make sure that the Stream feature is enabled for the source table. You can enable this feature when you create the table or by calling the UpdateTable operation in the SDK.

The following sample code shows how to enable the Stream feature:
SyncClient client = new SyncClient("", "", "", "");
// Enable the Stream feature when you create a table.
CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta);
createTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); // The value 24 indicates that the incremental data is retained for 24 hours.
client.createTable(createTableRequest);
// If the Stream feature is not enabled when the table is created, enable it by calling the UpdateTable operation.
UpdateTableRequest updateTableRequest = new UpdateTableRequest("tableName");
updateTableRequest.setStreamSpecification(new StreamSpecification(true, 24));
client.updateTable(updateTableRequest);
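
To confirm that the Stream feature is active on a table, you can call the DescribeTable operation. The following is a minimal sketch; the StreamDetails getter names are assumptions based on the Table Store Java SDK, so verify them against your SDK version.

// Check the Stream settings of the table.
DescribeTableResponse describeResponse = client.describeTable(new DescribeTableRequest("tableName"));
StreamDetails streamDetails = describeResponse.getStreamDetails(); // Getter names are assumptions; verify against your SDK version.
System.out.println("Stream enabled: " + streamDetails.isEnableStream()
        + ", retention: " + streamDetails.getExpirationTime() + " hours");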

You can enable the Stream feature and set the expiration time by calling the UpdateTable operation in the SDK. After this feature is enabled, the Table Store server additionally stores your operation logs, and you can export the incremental data of Table Store. Each partition has a sequential operation log queue, and each operation log is removed by garbage collection after the specified expiration time.

The Table Store SDK provides several Stream API operations for reading these operation logs. OTSStream Reader obtains incremental data by calling these API operations. By default, OTSStream Reader transforms the incremental data into 6-tuples (pk, colName, version, colValue, opType, and sequenceInfo) and imports them to MaxCompute.
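
The following snippet outlines how these Stream API operations can be combined to read operation logs with the Table Store Java SDK. It is a simplified sketch: shard iterator paging and the ordering of parent and child shards are omitted, and the exact signatures should be checked against your SDK version.

// List the stream of the table and take its stream ID.
String streamId = client.listStream(new ListStreamRequest("tableName"))
        .getStreams().get(0).getStreamId();
// Enumerate the shards of the stream and read a batch of operation logs from each shard.
DescribeStreamResponse streamInfo = client.describeStream(new DescribeStreamRequest(streamId));
for (StreamShard shard : streamInfo.getShards()) {
    String iterator = client.getShardIterator(
            new GetShardIteratorRequest(streamId, shard.getShardId())).getShardIterator();
    GetStreamRecordResponse batch = client.getStreamRecord(new GetStreamRecordRequest(iterator));
    for (StreamRecord record : batch.getRecords()) {
        // Each record carries the primary key, the operation type (PUT, UPDATE, or DELETE),
        // and the affected columns with their versions.
        System.out.println(record.getRecordType() + " " + record.getPrimaryKey());
    }
}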

Export data by column

In the multi-version mode of Table Store, table data is organized in a three-level architecture: row, column, and version. One row can have multiple columns. The column name is not fixed, and each column can have multiple versions. Each version has a specific timestamp, that is, the version number.

You can perform read and write operations on table data by using the Table Store API. Table Store retains incremental data by recording the recent write and modify operations on table data. Therefore, incremental data can be considered as a set of operation records.

Table Store supports the following three types of modify operations:
  • PutRow: writes a row. If the row already exists, it is overwritten.
  • UpdateRow: updates a row without modifying other data of the original row. You can add column values, overwrite column values if the corresponding version of a column already exists, delete all the versions of a column, or delete a version of a column.
  • DeleteRow: deletes a row.

Table Store generates incremental data records based on each type of operation. OTSStream Reader reads these records and exports the data in the Data Integration format.
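
As an illustration of how these operations are issued, the following sketch uses the Table Store Java SDK with placeholder table, primary key, and column names taken from the example below; verify the exact signatures against your SDK version.

PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("pkName1", PrimaryKeyValue.fromString("pk1_V1"))
        .addPrimaryKeyColumn("pkName2", PrimaryKeyValue.fromString("pk2_V1"))
        .build();
// PutRow: writes a row and overwrites the row if it already exists.
RowPutChange put = new RowPutChange("tableName", pk);
put.addColumn("col_a", ColumnValue.fromString("col_val1"));
client.putRow(new PutRowRequest(put));
// UpdateRow: modifies only the specified columns of the row.
RowUpdateChange update = new RowUpdateChange("tableName", pk);
update.put("col_a", ColumnValue.fromString("col_val2")); // Write a new version of col_a.
update.deleteColumn("col_a", 1441803688000L);            // Delete one version of col_a.
update.deleteColumns("col_b");                           // Delete all the versions of col_b.
client.updateRow(new UpdateRowRequest(update));
// DeleteRow: deletes the whole row that matches the primary key.
client.deleteRow(new DeleteRowRequest(new RowDeleteChange("tableName", pk)));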

Table Store supports dynamic columns and the multi-version mode. Therefore, a row exported by OTSStream Reader corresponds to a version of a column rather than a row in Table Store. A row in Table Store may correspond to multiple exported rows. Each exported row includes the primary key value, column name, timestamp of the version for the column (version number), value of the version, and operation type. If the isExportSequenceInfo parameter is set to true, time series information is also included.

When the data is transformed to the Data Integration format, the following four types of operations are defined:
  • U (UPDATE): writes a version of a column.
  • DO (DELETE_ONE_VERSION): deletes a version of a column.
  • DA (DELETE_ALL_VERSION): deletes all the versions of a column based on the primary key and the column name.
  • DR (DELETE_ROW): deletes a row, that is, all the data of the row, based on the primary key.
In the following example, the table has two primary key columns: pkName1 and pkName2.
pkName1  pkName2  columnName  timestamp      columnValue  opType
pk1_V1   pk2_V1   col_a       1441803688001  col_val1     U
pk1_V1   pk2_V1   col_a       1441803688002  col_val2     U
pk1_V1   pk2_V1   col_b       1441803688003  col_val3     U
pk1_V2   pk2_V2   col_a       1441803688000  N/A          DO
pk1_V2   pk2_V2   col_b       N/A            N/A          DA
pk1_V3   pk2_V3   N/A         N/A            N/A          DR
pk1_V3   pk2_V3   col_a       1441803688005  col_val1     U

In this example, seven rows are exported, corresponding to three rows in the Table Store table. The primary keys for the three rows are (pk1_V1, pk2_V1), (pk1_V2, pk2_V2), and (pk1_V3, pk2_V3).

  • For the row whose primary key is (pk1_V1, pk2_V1), three operations are included: writing two versions of column col_a and one version of column col_b.
  • For the row whose primary key is (pk1_V2, pk2_V2), two operations are included: deleting one version of column col_a and deleting all versions of column col_b.
  • For the row whose primary key is (pk1_V3, pk2_V3), two operations are included: deleting the row and writing one version of column col_a.

Export data by row

You can also export data by row. In this mode, OTSStream Reader exports operation records as rows. You must specify the export mode and the columns to be exported, as shown in the following example.

"parameter": {
  # Set other parameters, such as datasource and table, as usual.
  "mode": "single_version_and_update_only", # The export mode.
  "column":[  # The columns to be exported from Table Store. You can specify the columns as needed.
          {
             "name": "uid"  # The name of the column, which can be a primary key column or a property column.
          },
          {
             "name": "name"  # The name of the column, which can be a primary key column or a property column.
          }
  ],
  "isExportSequenceInfo": false # Specifies whether to export time-series information. If you set the mode parameter to single_version_and_update_only, this parameter can only be set to false.
}
The data exported by row is closer to the data in the original rows, which facilitates further data processing. Note the following points when you export data by row:
  • The exported rows are extracted from operation records. Each exported row corresponds to a write or modify operation. If an operation modifies only some of the columns of a row, the operation record contains only the modified columns, and the other configured columns are empty. For example, with the preceding column configuration, an operation that modifies only the name column produces an exported row in which name has a value and the other configured property columns are empty.
  • If you export data by row, the version number of each column, that is, the timestamp of each column, cannot be exported. In addition, delete operations cannot be exported.

Data types

OTSStream Reader supports all Table Store data types, which are listed in the following table.

Category        OTSStream data type
Integer         INTEGER
Floating point  DOUBLE
String          STRING
Boolean         BOOLEAN
Binary          BINARY

Parameters

datasource: The connection name. It must be identical to the name of the added connection. You can add connections in the code editor. Required: yes. Default value: none.

table: The name of the table from which incremental data is exported. You must enable the Stream feature for the table when you create it, or by calling the UpdateTable operation after you create it. Required: yes. Default value: none.

statusTable: The name of the table that OTSStream Reader uses to store status records. These records help filter out the data that is not covered by the target range and improve export efficiency. Required: yes. Default value: none.
  • You do not need to create the table in advance. You only need to provide a table name. If the specified table does not exist, OTSStream Reader attempts to create it under your instance. If the table exists, OTSStream Reader checks whether the metadata of the table is as expected and throws an exception if it is not.
  • After a sync node is completed, you do not need to delete the table. The status records in the table can be used by the next sync node.
  • Data in the table automatically expires based on the time-to-live (TTL). Therefore, the data volume is small.
  • You can use the same table to store status records of multiple tables that are managed by the same instance. The status records are independent of each other.
  Configure a name that is easy to recognize, such as TableStoreStreamReaderStatusTable, and make sure that the name does not conflict with that of any business-related table.

startTimestampMillis: The start time (included) of the incremental data, in milliseconds. Required: no. Default value: none.
  • OTSStream Reader searches the table specified by the statusTable parameter for a status record that matches the time specified by the startTimestampMillis parameter and reads data from that point.
  • If no such status record is found, OTSStream Reader reads the incremental data retained by the system from the first entry and skips the data that was written earlier than the time specified by the startTimestampMillis parameter.

endTimestampMillis: The end time (excluded) of the incremental data, in milliseconds. Required: no. Default value: none.
  • OTSStream Reader reads data from the time specified by the startTimestampMillis parameter and stops when it encounters data that was written at or later than the time specified by the endTimestampMillis parameter.
  • If OTSStream Reader has read all the incremental data, it stops even before the time specified by the endTimestampMillis parameter is reached.

date: The date on which data is exported, in the yyyyMMdd format, for example, 20151111. You must specify either this parameter or the startTimestampMillis and endTimestampMillis parameters. For example, Alibaba Cloud Data Process Center schedules nodes by day, so the date parameter is provided. Required: no. Default value: none.

isExportSequenceInfo: Specifies whether to export time-series information, which includes the time when data was written. The default value is false, indicating that time-series information is not exported. Required: no. Default value: false.

maxRetries: The maximum number of retries for each request that reads incremental data from Table Store. Retries are performed at certain intervals, and 30 retries take approximately 5 minutes in total. Generally, you can use the default value. Required: no. Default value: 30.

startTimeString: The left boundary of the time range (left-closed and right-open) of the incremental data, in the yyyymmddhh24miss format. Required: no. Default value: none.

endTimeString: The right boundary of the time range (left-closed and right-open) of the incremental data, in the yyyymmddhh24miss format. Required: no. Default value: none.

mode: The export mode. If this parameter is set to single_version_and_update_only, data is exported by row. If this parameter is not specified, data is exported by column. Required: no. Default value: none.
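
For reference, the following hedged snippet shows how these parameters might be combined when you specify the time range in milliseconds instead of using the startTimeString and endTimeString parameters. All values are placeholders.

"parameter": {
  "datasource": "ots_datasource",  # A placeholder connection name.
  "table": "mytable",  # A placeholder table for which the Stream feature is enabled.
  "statusTable": "TableStoreStreamReaderStatusTable",
  "startTimestampMillis": 1577808000000,  # The start time (included), in milliseconds.
  "endTimestampMillis": 1577894400000,  # The end time (excluded), in milliseconds.
  "maxRetries": 30,
  "isExportSequenceInfo": false
}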

Configure OTSStream Reader by using the codeless UI

  1. Configure the connections.
    Configure the source and destination connections for the sync node in the Connections section.
    • Connection: the datasource parameter in the preceding parameter description. Select a connection type, and enter the name of a connection that has been configured in DataWorks.
    • Table: the table parameter in the preceding parameter description.
    • Start Timestamp: the start time (included) of the incremental data, in milliseconds.
    • End Timestamp: the end time (excluded) of the incremental data, in milliseconds.
    • State Table: the name of the table for storing status records.
    • Maximum Retries: the maxRetries parameter in the preceding parameter description. Default value: 30.
    • Export Time Information: the isExportSequenceInfo parameter in the preceding parameter description. By default, Export Time Information is not selected.
  2. Configure field mapping, that is, the column parameter in the preceding parameter description.
    Fields in the source table on the left have a one-to-one mapping with fields in the destination table on the right. You can click Add to add a field, or move the pointer over a field and click the Delete icon to delete the field.
    • Map Fields with the Same Name: click this button to establish mappings between fields with the same name. The data types of the mapped fields must match.
    • Map Fields in the Same Line: click this button to establish mappings between fields in the same row. The data types of the mapped fields must match.
    • Delete All Mappings: click this button to remove all mappings that have been established.
  3. Configure channel control policies in the Channel section.
    • Expected Maximum Concurrency: the maximum number of concurrent threads that the sync node uses to read data from or write data to the data stores. You can configure the concurrency for the node on the codeless UI.
    • Bandwidth Throttling: specifies whether to enable bandwidth throttling. You can enable bandwidth throttling and specify a maximum transmission rate to prevent heavy read workloads on the source. We recommend that you enable bandwidth throttling and set the maximum transmission rate to an appropriate value.
    • Dirty Data Records Allowed: the maximum number of dirty data records allowed.
    • Resource Group: the resource group used for running the sync node. If a large number of nodes, including this sync node, are deployed on the default resource group, the sync node may need to wait for resources. We recommend that you purchase an exclusive resource group for Data Integration or add a custom resource group. For more information, see DataWorks exclusive resources and Add a custom resource group.

Configure OTSStream Reader by using the code editor

In the following code, a node is configured to read data from Table Store. For more information about the parameters, see the preceding parameter description.

{
    "type":"job",
    "version":"2.0",// The version number.
    "steps":[
        {
            "stepType":"otsstream",// The reader type.
            "parameter":{
                "statusTable":"TableStoreStreamReaderStatusTable",// The name of the table for storing status records.
                "maxRetries":30,// The maximum number of retries on each request of reading incremental data from Table Store. It is set to 30 by default.
                "isExportSequenceInfo":false,// Specifies whether to export the time series information.
                "datasource":"$srcDatasource",// The connection name.
                "startTimeString":"${startTime}",// The start time (included) of the incremental data.
                "table":"",// The name of the table to be synchronized.
                "endTimeString":"${endTime}"// The end time (excluded) of the incremental data.
            },
            "name":"Reader",
            "category":"reader"
        },
        { 
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// The maximum number of dirty data records allowed.
        },
        "speed":{
            "throttle":false,// Specifies whether to enable bandwidth throttling. A value of false indicates that the bandwidth is not throttled. A value of true indicates that the bandwidth is throttled. The maximum transmission rate takes effect only if you set this parameter to true.
            "concurrent":1,// The maximum number of concurrent threads.
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}