
Configure OTSStream Reader

Last Updated: Apr 17, 2018

The OTSStream Reader plug-in is mainly used for exporting Table Store incremental data. Incremental data can be seen as operation logs that include both data and operation information.

Unlike the full export plug-in, the incremental export plug-in only supports multi-version mode and does not support exporting specified columns. This is a consequence of how incremental export works. See the export format description below for more information.

Before using the plug-in, make sure that the Stream feature is enabled. You can enable the feature when creating the table, or enable it later using the SDK UpdateTable API.

How to enable Stream:

  SyncClient client = new SyncClient("", "", "", "");
  // Enable Stream when you create the table:
  CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta);
  createTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); // 24 means that the incremental data is retained for 24 hours
  client.createTable(createTableRequest);
  // If Stream is not enabled when the table is created, you can enable it with UpdateTable:
  UpdateTableRequest updateTableRequest = new UpdateTableRequest("tableName");
  updateTableRequest.setStreamSpecification(new StreamSpecification(true, 24));
  client.updateTable(updateTableRequest);

Implementation principle:

You can enable Stream and set the expiration time with the SDK UpdateTable API to turn on the incremental feature. When the incremental feature is enabled, the Table Store server additionally saves your operation logs. Each partition has a sequential operation log queue, and each operation log is removed by garbage collection after the expiration time you specified.

The Table Store SDK provides several Stream-related APIs for reading these operation logs. The incremental plug-in uses these Table Store SDK APIs to get the incremental data, transforms it into multiple 6-tuples (pk, colName, version, colValue, opType, sequenceInfo), and imports them into MaxCompute.
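
For illustration only, each exported record can be pictured as a simple Java value class holding the six fields named above; the class itself is hypothetical and is not part of the plug-in's source code:

  // Illustrative sketch only: models one exported record, that is, one version of one column.
  // The class name is hypothetical; the field names follow the 6-tuple described above.
  public class StreamRecordTuple {
      private final String pk;            // primary key value(s) of the row
      private final String colName;       // column name (column names are dynamic in Table Store)
      private final long version;         // version number, that is, the timestamp of this value
      private final String colValue;      // the value of this version (empty for delete operations)
      private final String opType;        // operation type: U, DO, DA, or DR (described below)
      private final String sequenceInfo;  // time sequence information, exported only when isExportSequenceInfo is true

      public StreamRecordTuple(String pk, String colName, long version,
                               String colValue, String opType, String sequenceInfo) {
          this.pk = pk;
          this.colName = colName;
          this.version = version;
          this.colValue = colValue;
          this.opType = opType;
          this.sequenceInfo = sequenceInfo;
      }
  }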

The format of the export data:

In the Table Store multi-version mode, table data is organized in three levels: row > column > version. A row can have multiple columns, the column names are not fixed, and each column can have multiple versions. Each version has a specific timestamp (the version number).

You can read and write data with the Table Store API. Table Store records incremental data by recording your recent write (data change) operations on the table, so the incremental data can also be seen as a series of operation records.
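
As a conceptual sketch only (plain Java collections, not Table Store SDK types), the three-level structure of a single row can be pictured as a map from column names to version-to-value maps:

  // Conceptual illustration of the row > column > version model; not a Table Store SDK class.
  import java.util.HashMap;
  import java.util.Map;
  import java.util.NavigableMap;
  import java.util.TreeMap;

  public class MultiVersionRowSketch {
      public static void main(String[] args) {
          // A row maps each column name to its versions; each version (a timestamp) maps to a value.
          Map<String, NavigableMap<Long, String>> row = new HashMap<>();
          row.computeIfAbsent("col_a", k -> new TreeMap<>()).put(1441803688001L, "col_val1");
          row.computeIfAbsent("col_a", k -> new TreeMap<>()).put(1441803688002L, "col_val2");
          row.computeIfAbsent("col_b", k -> new TreeMap<>()).put(1441803688003L, "col_val3");
          System.out.println(row); // two versions of col_a and one version of col_b
      }
  }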

Table Store has three types of data change operations: PutRow, UpdateRow, and DeleteRow:

  • PutRow: write a row. If the row already exists, it is overwritten.

  • UpdateRow: update a row without changing the other data of the original row. The update may include adding or overwriting some column values (overwriting happens if the corresponding version of the corresponding column already exists), deleting all the versions of a column, or deleting one version of a column.

  • DeleteRow: delete a row.

Table Store generates corresponding incremental data records according to each type of operation. The Reader plug-in reads these records and exports the data in the DataX format.

Because Table Store has dynamic columns and multiple versions, a row exported by the Reader plug-in does not correspond to a row in Table Store but to one version of one column in Table Store. A row in Table Store can therefore be exported as multiple rows. Each exported row includes the primary key value, the column name, the timestamp of the version under that column (the version number), the value of the version, and the operation type. If isExportSequenceInfo is set to true, the time sequence information is also included.

When the data is transformed into the DataX format, we define four types of operation as follows (a small illustrative sketch follows the list):

  • U (UPDATE): write a version of a column.

  • DO (DELETE_ONE_VERSION): delete a version of a column.

  • DA (DELETE_ALL_VERSION): delete all the versions of a column. Delete all the versions of the corresponding column according to primary key and column name.

  • DR (DELETE_ROW): delete a row. Delete all the data of the row according to primary key.
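
For reference only, these four operation types can be summarized as a small Java enum; the enum is illustrative and not part of the plug-in's source code:

  // Illustrative only: the four operation types that appear in the exported records.
  public enum OpType {
      U,   // UPDATE: write one version of one column
      DO,  // DELETE_ONE_VERSION: delete one version of one column
      DA,  // DELETE_ALL_VERSION: delete all versions of one column, located by primary key and column name
      DR   // DELETE_ROW: delete the whole row, located by primary key
  }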

Assume that the table has two primary key columns, named pkName1 and pkName2. The exported data is as follows:

pkName1 | pkName2 | columnName | timestamp | columnValue | opType
pk1_V1 | pk2_V1 | col_a | 1441803688001 | col_val1 | U
pk1_V1 | pk2_V1 | col_a | 1441803688002 | col_val2 | U
pk1_V1 | pk2_V1 | col_b | 1441803688003 | col_val3 | U
pk1_V2 | pk2_V2 | col_a | 1441803688000 | | DO
pk1_V2 | pk2_V2 | col_b | | | DA
pk1_V3 | pk2_V3 | | | | DR
pk1_V3 | pk2_V3 | col_a | 1441803688005 | col_val1 | U

Assume that the exported data contains the seven rows shown above, which correspond to three rows in the Table Store table with primary keys (pk1_V1, pk2_V1), (pk1_V2, pk2_V2), and (pk1_V3, pk2_V3).

  • For the row whose primary key is (pk1_V1, pk2_V1), three operations were performed: writing two versions of the col_a column and one version of the col_b column.

  • For the row whose primary key is (pk1_V2, pk2_V2), two operations were performed: deleting one version of the col_a column and deleting all the versions of the col_b column.

  • For the row whose primary key is (pk1_V3, pk2_V3), two operations were performed: deleting the whole row and then writing one version of the col_a column.

Currently OTSStream Reader supports all OTS types. The conversion list for OTS types is as follows:

Category | OTSStream data type
Integer | Integer
Floating point | Double
String | String
Boolean | Boolean
Binary | Binary

Parameter description

  • dataSource: The name of the data source. It must be identical to the name of the data source you added. Adding a data source is supported in script mode. (Required: Yes. Default value: None)

  • dataTable: The name of the table from which incremental data is exported. The Stream feature must be enabled on the table; you can enable it when creating the table or later with the UpdateTable API. (Required: Yes. Default value: None)

  • statusTable: The name of the table that the Reader plug-in uses to record its status. The recorded status reduces the scanning of data outside the target range and thereby speeds up the export. Note the following:
    - You do not need to create the table; you only need to provide a name. The Reader plug-in tries to create it under your instance: if the table does not exist, it is created; if it already exists, the plug-in checks whether the table's Meta matches expectations and throws an exception if it does not.
    - Do not delete the table after an offline export task completes. The statuses recorded in it are reused by the next export task.
    - The table has TTL enabled and its data expires automatically, so its data volume can be considered small.
    - Reader configurations of different dataTables under one instance can share the same statusTable; the status messages they record are independent of each other.
    In short, configure a name such as TableStoreStreamReaderStatusTable, and make sure it does not conflict with any business table. (Required: Yes. Default value: None)

  • startTimestampMillis: The left boundary of the time range (left-closed, right-open) of the incremental data, in milliseconds.
    - The Reader plug-in looks up the position corresponding to startTimestampMillis in the statusTable and starts reading and exporting data from that position.
    - If no corresponding position is found in the statusTable, the plug-in reads from the first entry of the incremental data retained by the system and skips data written earlier than startTimestampMillis.
    (Required: No. Default value: None)

  • endTimestampMillis: The right boundary of the time range (left-closed, right-open) of the incremental data, in milliseconds.
    - The Reader plug-in starts exporting data from the startTimestampMillis position and stops when it encounters data whose timestamp is equal to or greater than endTimestampMillis.
    - The read finishes once all the incremental data has been read, even if endTimestampMillis has not been reached.
    (Required: No. Default value: None)

  • date: A date in yyyyMMdd format, for example 20151111, meaning that the data of that day is exported. If date is not specified, startTimestampMillis and endTimestampMillis must be specified, and vice versa. For example, Alibaba Cloud Data Process Center scheduling only supports day-level scheduling, so this configuration works like startTimestampMillis and endTimestampMillis. A small sketch after this parameter list shows one way to derive the equivalent millisecond boundaries for a given date. (Required: No. Default value: None)

  • isExportSequenceInfo: Whether to export time sequence information, which includes the time the data was written. The default value is false, which means the time sequence information is not exported. (Required: No. Default value: None)

  • maxRetries: The maximum number of retries for each request when reading incremental data from Table Store. The default value is 30. Retries are separated by intervals, and the total time of 30 retries is approximately 5 minutes, which generally does not need to be changed. (Required: No. Default value: None)
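
As a hedged sketch (assuming that the date parameter covers the whole specified day, and that you choose the time zone appropriate for your data), the following example shows one way to derive millisecond boundaries equivalent to a date such as 20151111:

  // Illustrative sketch: derive the millisecond boundaries of one day, for example 20151111.
  // Assumptions: the date parameter covers that whole day; the time zone below is an example only.
  import java.time.LocalDate;
  import java.time.ZoneId;
  import java.time.format.DateTimeFormatter;

  public class DayRangeSketch {
      public static void main(String[] args) {
          LocalDate day = LocalDate.parse("20151111", DateTimeFormatter.BASIC_ISO_DATE);
          ZoneId zone = ZoneId.of("Asia/Shanghai"); // assumption: use the time zone of your data
          long startTimestampMillis = day.atStartOfDay(zone).toInstant().toEpochMilli();
          long endTimestampMillis = day.plusDays(1).atStartOfDay(zone).toInstant().toEpochMilli();
          // Left-closed, right-open range [startTimestampMillis, endTimestampMillis)
          System.out.println(startTimestampMillis + " - " + endTimestampMillis);
      }
  }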

Development in wizard mode

Currently, development in wizard mode is not supported.

Reader configuration template

  {
    "type": "job",
    "version": "1.0",
    "configuration": {
      "setting": {
        "errorLimit": {
          "record": "0"
        },
        "speed": {
          "mbps": "1",
          "concurrent": "1"
        }
      },
      "reader": {
        "plugin": "otsstream",
        "parameter": {
          "datasource": "",
          "table": "",
          "dataTable": "",
          "statusTable": "TableStoreStreamReaderStatusTable",
          "startTimestampMillis": "",
          "endTimestampMillis": "",
          "date": "",
          "isExportSequenceInfo": true,
          "maxRetries": 30
        }
      },
      "writer": {}
    }
  }