All Products
Search
Document Center

DataWorks:OTSStream data source

Last Updated:Nov 13, 2023

DataWorks provides OTSStream Reader for you to read incremental data from OTSStream data sources. This topic describes the capabilities of synchronizing data from OTSStream data sources.

Prepare an OTSStream environment before data synchronization

Before you use OTSStream Reader, make sure that the Stream feature is enabled for your source table. By default, the Stream feature is enabled for time series tables. You can enable the Stream feature for a table when you create the table. You can also call the UpdateTable operation in the Tablestore SDK to enable this feature after a table is created. The following sample code provides an example on how to enable the Stream feature:

SyncClient client = new SyncClient("", "", "", "");
# Method 1: Enable the Stream feature when you create a table.
CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta);
createTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); // The value 24 indicates that Tablestore retains incremental data for 24 hours. 
client.createTable(createTableRequest);
#Method 2: If you do not enable the Stream feature when you create a table, you can call the UpdateTable operation to enable this feature after the table is created.
UpdateTableRequest updateTableRequest = new UpdateTableRequest("tableName");
updateTableRequest.setStreamSpecification(new StreamSpecification(true, 24));
client.updateTable(updateTableRequest);

Take note of the following items when you call the UpdateTable operation in the Tablestore SDK to enable the Stream feature:

  • You can specify the expiration time for incremental data. This way, you can use OTSStream Reader to read incremental data from Tablestore. After you enable the Stream feature, the Tablestore server stores the operation logs of your Tablestore table. Each partition queues operation logs in sequence. Each operation log is recycled after the specified expiration time.

  • The Tablestore SDK provides several Stream-related API operations that are used to read operation logs. OTSStream Reader calls these API operations to read incremental data. If you use OTSStream Reader to read data in column mode, OTSStream Reader converts the incremental data into multiple 6-tuples. Each 6-tuple consists of pk, colName, version, colValue, opType, and sequenceInfo. If you use OTSStream Reader to read data in row mode, OTSStream Reader reads incremental data by row.

Supported read modes and data types

OTSStream Reader allows you to read incremental data from Tablestore in column or row mode. This section describes the data read processes and supported data types of the read modes.

Read data in column mode

In the multi-version mode of Tablestore, table data is organized in a three-level structure: row, column, and version. One row can have multiple columns, and the column name is not fixed. Each column can have multiple versions, and each version has a specific timestamp, which is the version number.

You can call the API operations of Tablestore to read and write data. Tablestore records the recent write and modify operations that are performed on table data to record the incremental data. Therefore, incremental data can be considered as a set of operation records.

Tablestore supports the following types of operations:

  • PutRow: writes a row. If the row exists, it is overwritten.

  • UpdateRow: updates a row without modifying other data of the original row. You can use UpdateRow to add column values, overwrite column values if a version of a column exists, or delete a specific version or all the versions of a column.

  • DeleteRow: deletes a row.

Tablestore generates incremental data records based on each type of operation. OTSStream Reader reads these records and converts them to a format supported by Data Integration.

Tablestore supports dynamic columns and the multi-version mode. Therefore, a row that is generated by OTSStream Reader is a version of a column rather than a row in Tablestore. After OTSStream Reader reads data from a row in Tablestore, OTSStream Reader converts the data to multiple rows. Each row includes the primary key value, column name, timestamp of the version for the column (version number), value of the version, and operation type. If the isExportSequenceInfo parameter is set to true, time series information is also included.

The following types of operations are defined for the data that is converted to the format supported by Data Integration:

  • U (UPDATE): writes a version of a column.

  • DO (DELETE_ONE_VERSION): deletes a version of a column.

  • DA (DELETE_ALL_VERSION): deletes all the versions of a column based on the primary key value and the column name.

  • DR (DELETE_ROW): deletes a row based on the primary key value.

The following table lists the data that is converted by OTSStream Reader from a table that has two primary key columns (pkName1 and pkName2).

pkName1

pkName2

columnName

timestamp

columnValue

opType

pk1_V1

pk2_V1

col_a

1441803688001

col_val1

U

pk1_V1

pk2_V1

col_a

1441803688002

col_val2

U

pk1_V1

pk2_V1

col_b

1441803688003

col_val3

U

pk1_V2

pk2_V2

col_a

1441803688000

-

DO

pk1_V2

pk2_V2

col_b

-

-

DA

pk1_V3

pk2_V3

-

-

-

DR

pk1_V3

pk2_V3

col_a

1441803688005

col_val1

U

In the preceding example, three rows in the Tablestore table are read and converted to seven rows. The primary keys for the three rows are (pk1_V1, pk2_V1), (pk1_V2, pk2_V2), and (pk1_V3, pk2_V3).

  • For the row whose primary key is (pk1_V1, pk2_V1), two versions of the col_a column and one version of the col_b column are written.

  • For the row whose primary key is (pk1_V2, pk2_V2), one version of the col_a column and all the versions of the col_b column are deleted.

  • For the row whose primary key is (pk1_V3, pk2_V3), one version of the col_a column is written, and the row is deleted.

Read data in row mode

  • Wide tables

    You can also use OTSStream Reader to read data in row mode. In this mode, OTSStream Reader reads operation records as rows. You must configure the mode parameter and specify the columns whose data you want to read.

    "parameter": {
      # Set mode to single_version_and_update_only, set isExportSequenceInfo to false, and configure other parameters, such as datasource and table, based on your business requirements. 
      "mode": "single_version_and_update_only", # The read mode. 
      "column":[  # The columns whose data you want to read from Tablestore. You can specify the columns based on your business requirements. 
              {
                 "name": "uid"  # The name of a column, which can be a primary key column or a property column. 
              },
              {
                 "name": "name"  # The name of a column, which can be a primary key column or a property column. 
              },
      ],
      "isExportSequenceInfo": false, # Specifies whether to read time series information. If you set the mode parameter to single_version_and_update_only, this parameter can only be set to false. 
    }
  • Time series tables

    The Stream feature is automatically enabled when you create a time series table.

    OTSStream Reader allows you to read incremental data from a time series table. If the table is a time series table, you must configure the following parameters:

    "parameter": {
      # Configure the following parameters and other parameters, such as datasource and table, based on your business requirements. 
      "mode": "single_version_and_update_only", # The read mode. 
      "isTimeseriesTable":"true", # Specifies whether the table is a time series table. 
      "column":[  # The columns whose data you want to read from Tablestore. You can specify the columns based on your business requirements. 
              {
                "name": "_m_name" # The name of a metric name column. 
              },
              {
                "name": "_data_source" # The name of a data source column. 
              },
              {
                "name": "_tags" # The name of a tag column. Tags are converted into data of a string type. 
              },
              {
                "name": "tag1", # The name of a tag key. 
                "is_timeseries_tag":"true"  # Specifies whether the field is an internal field of the tags. 
              },
              {
                "name": "time" # The name of a timestamp column. 
              },
              {
                 "name": "name" # The name of a property column. 
              },
      ],
      "isExportSequenceInfo": false, # Specifies whether to read time series information. If you set the mode parameter to single_version_and_update_only, this parameter can only be set to false. 
    }

    The data that is read by row is closer to the data in the original rows. This facilitates further data processing. If you read data by row, take note of the following points:

    • The rows that are read are extracted from operation records. Each row corresponds to a write or update operation. If you update only some of the columns for a row, the operation record contains only the columns that are updated.

    • The version number of each column, which is the timestamp of each column, cannot be read or deleted.

Data type mappings

OTSStream Reader supports all Tablestore data types. The following table lists the data type mappings based on which OTSStream Reader converts data types.

Category

Tablestore data type

Integer

INTEGER

Floating point

DOUBLE

String

STRING

Boolean

BOOLEAN

Binary

BINARY

Develop a data synchronization task

Appendix: Code and parameters

Appendix: Configure a batch synchronization task by using the code editor

If you use the code editor to configure a batch synchronization task, you must configure parameters for the reader and writer of the related data source based on the format requirements in the code editor. For more information about the format requirements, see Configure a batch synchronization task by using the code editor. The following information describes the configuration details of parameters for the reader and writer in the code editor.

Code for OTSStream Reader

  • Read data in column mode

    {
        "type":"job",
        "version":"2.0",// The version number. 
        "steps":[
            {
                "stepType":"otsstream",// The plug-in name. 
                "parameter":{
                    "datasource":"$srcDatasource",// The name of the data source. 
                    "dataTable":"",// The name of the table. 
                    "statusTable":"TableStoreStreamReaderStatusTable",// The name of the table that OTSStream Reader uses to store status records. 
                    "maxRetries":30,// The maximum number of retries for each request to read incremental data from Tablestore. Default value: 30. 
                    "isExportSequenceInfo":false,// Specifies whether to read time series information. 
                    "startTimeString":"${startTime}${hh}",// The start time of the incremental data, which is the same as the time when a synchronization task starts to run. Configure this parameter in the yyyymmddhh24miss format.
                    "endTimeString":"${endTime}${hh}"// The end time of the incremental data, which is the same as the time when the running of a synchronization task is complete. Configure this parameter in the yyyymmddhh24miss format.
                },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"stream",
                "parameter":{},
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"// The maximum number of dirty data records allowed. 
            },
            "speed":{
                "throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
                "concurrent":1,// The maximum number of parallel threads. 
                "mbps":"12"// The maximum transmission rate. Unit: MB/s. 
    
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }
  • Read data from a wide table in row mode

    {
        "type":"job",
        "version":"2.0",// The version number. 
        "steps":[
            {
                "stepType":"otsstream",// The plug-in name. 
                "parameter":{
                    "datasource":"$srcDatasource",// The name of the data source. 
                    "dataTable":"",// The name of the table. 
                    "statusTable":"TableStoreStreamReaderStatusTable",// The name of the table that OTSStream Reader uses to store status records. 
                    "maxRetries":30,// The maximum number of retries for each request to read incremental data from Tablestore. Default value: 30. 
                    "isExportSequenceInfo":false,// Specifies whether to read time series information. 
                    "startTimeString":"${startTime}${hh}",// The start time of the incremental data, which is the same as the time when a synchronization task starts to run. Configure this parameter in the yyyymmddhh24miss format.
                    "endTimeString":"${endTime}${hh}"// The end time of the incremental data, which is the same as the time when the running of a synchronization task is complete. Configure this parameter in the yyyymmddhh24miss format.
                    "mode": "single_version_and_update_only",
                    "column":[
                            {
                                "name":"pId"
                            },
                            {
                                "name": "uId"
                            },
                            {
                                "name":"col0"
                            },
                            {
                                "name": "col1"
                            }
                        ],
                        },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"stream",
                "parameter":{},
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"// The maximum number of dirty data records allowed. 
            },
            "speed":{
                "throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
                "concurrent":1,// The maximum number of parallel threads. 
                "mbps":"12" // The maximum transmission rate.
    
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }
  • Read data from a time series table in row mode

    {
        "type":"job",
        "version":"2.0",// The version number. 
        "steps":[
            {
                "stepType":"otsstream",// The plug-in name. 
                "parameter":{
                    "datasource":"$srcDatasource",// The name of the data source. 
                    "dataTable":"",// The name of the table. 
                    "statusTable":"TableStoreStreamReaderStatusTable",// The name of the table that OTSStream Reader uses to store status records. 
                    "maxRetries":30,// The maximum number of retries for each request to read incremental data from Tablestore. Default value: 30. 
                    "isExportSequenceInfo":false,// Specifies whether to read time series information. 
                    "startTimeString":"${startTime}${hh}",// The start time of the incremental data, which is the same as the time when a synchronization task starts to run. Configure this parameter in the yyyymmddhh24miss format.
                    "endTimeString":"${endTime}${hh}"// The end time of the incremental data, which is the same as the time when the running of a synchronization task is complete. Configure this parameter in the yyyymmddhh24miss format.
                    "mode": "single_version_and_update_only",
                    "isTimeseriesTable":"true",
                    "column": [
                              {
                                "name": "_m_name"
                              },
                              {
                                "name": "_data_source",
                              },
                              {
                                "name": "_tags",
                              },
                              {
                                "name": "string_column",
                              }
                        ]
                        },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"stream",
                "parameter":{},
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"// The maximum number of dirty data records allowed. 
            },
            "speed":{
                "throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
                "concurrent":1,// The maximum number of parallel threads. 
                "mbps":"12"// The maximum transmission rate. Unit: MB/s. 
    
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }

Parameters in code for OTSStream Reader

Parameter

Description

Required

Default value

dataSource

The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor.

Yes

No default value

dataTable

The name of the table from which you want to read incremental data. The Stream feature must be enabled for the table. You can enable the Stream feature for a table when you create the table. You can also call the UpdateTable operation to enable this feature after the table is created.

Yes

No default value

statusTable

The name of the table that OTSStream Reader uses to store status records. These records help find the data that is not required and improve read efficiency. If the specified table does not exist, OTSStream Reader automatically creates such a table. After an offline read task is completed, you do not need to delete the table. The status records in the table can be used for the next read task.

  • You do not need to create a status table. You need to only provide a table name. OTSStream Reader attempts to create a status table on your instance. If no such table exists, OTSStream Reader automatically creates one. If the table already exists, OTSStream Reader determines whether the metadata of the table meets the expectation. If the metadata does not meet the expectation, OTSStream Reader reports an error.

  • After a read task is completed, you do not need to delete the table. The status records in the table can be used for the next read task.

  • Time-to-live (TTL) is enabled for the table, and data automatically expires after the TTL elapses. This way, the table stores a small volume of data.

  • You can use the same status table to store the status records of multiple tables that are specified by the dataTable parameter and managed by the same instance. The status records are independent of each other.

You can configure a name similar to TableStoreStreamReaderStatusTable. Make sure that the name is different from the name of a business-related table.

Yes

No default value

startTimestampMillis

The start time of the incremental data, in milliseconds. The start time is the left boundary of the left-closed, right-open time range of the incremental data.

  • OTSStream Reader searches for the status records in the table that is specified by the statusTable parameter based on the time that is specified by the startTimestampMillis parameter and reads data from the time.

  • If OTSStream Reader cannot find the status records, OTSStream Reader reads the incremental data that is retained by the system from the first entry, and skips the data that is written later than the time that is specified by the startTimestampMillis parameter.

No

No default value

endTimestampMillis

The end time of the incremental data, in milliseconds. The end time is the right boundary of the left-closed, right-open time range of the incremental data.

  • OTSStream Reader exports data from the time specified by the startTimestampMillis parameter and stops exporting data when the timestamp of a data record is later than or equal to the time specified by the endTimestampMillis parameter.

  • After all the incremental data is read, OTSStream Reader stops reading data even before the time specified by the endTimestampMillis parameter.

No

No default value

date

The date on which the data that you want to read is generated. Configure this parameter in the yyyyMMdd format, such as 20151111. You must configure the date parameter, the startTimestampMillis and endTimestampMillis parameters, or the startTimeString and endTimeString parameters. For example, Alibaba Cloud Data Process Center schedules tasks by day. Therefore, you need to configure the date parameter and do not need to configure the startTimestampMillis and endTimestampMillis parameters or the startTimeString and endTimeString parameters.

No

No default value

isExportSequenceInfo

Specifies whether to read time series information. The time series information includes the time when data is written. The default value is false, which indicates that time series information is not read.

No

false

maxRetries

The maximum number of retries for each request to read incremental data from Tablestore. Default value: 30. Retries are performed at a specific interval. The total duration of 30 retries is approximately 5 minutes. You can keep the default settings.

No

30

startTimeString

The start time of the incremental data, in seconds. The start time is the left boundary of the left-closed, right-open time range of the incremental data. Configure this parameter in the yyyymmddhh24miss format.

No

No default value

endTimeString

The end time of the incremental data, in seconds. The end time is the right boundary of the left-closed, right-open time range of the incremental data. Configure this parameter in the yyyymmddhh24miss format.

No

No default value

enableSeekIterator

Specifies whether to determine the offset from which OTSStream Reader starts to read incremental data. If incremental data is frequently read, OTSStream Reader automatically determines the offset based on the offset from which data is read last time. If OTSStream Reader is not run before, data is read from the start time of incremental data. By default, incremental data is stored for seven days. Before the start time arrives, no data is exported. You can add "enableSeekIterator": true to the configurations of OTSStream Reader to help you find the start time from which OTSStream Reader starts to read incremental data.

No

false

mode

The read mode. If this parameter is set to single_version_and_update_only, data is read in row mode.

No

No default value

isTimeseriesTable

Specifies whether the table is a time series table. This parameter takes effect only when the mode parameter is set to single_version_and_update_only.

No

false

column

The names of columns from which you want to read data when you set the mode parameter to single_version_and_update_only. Sample code:

"column":[
    {"name":"pk1"},
	{"name":"col1"},
	{"name":"col2","dataType":"new"},
	{"name":"col2","dataType":"old"},
	{"name":"col2","dataType":"latest"}
],
  • The name field specifies the name of the column from which you want to read data. This field is required.

  • The dataType field specifies the type of data that you want to read. The default type is new. This field is optional. The dataType field supports the following enumeration types:

    • new: indicates data in the current column after an update.

    • old: indicates data in the current column before an update.

    • latest: indicates the latest data in the current column.

Note

If you want to read data in row mode, you must configure this parameter. Otherwise, data cannot be read.

  • Yes in row mode

  • No in column mode

No default value