
Configure OTSStream syncing task

Last Updated: Apr 04, 2018

The OTSStream plug-in is mainly used to export incremental data from Table Store, including both the data itself and the associated operation records.

Unlike the full export plug-in, the incremental export plug-in supports only the multi-version mode and does not support specifying columns, because of how incremental export works. For more information, see Configuring OTSStream Reader.
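
Because only the multi-version mode is supported, each exported record describes a single cell operation rather than a whole row. As a rough sketch (the column names follow the script-mode example later in this article; the values and the operation type code are illustrative assumptions, not guaranteed output), an update to one attribute column might be exported as a record like this:

    {
        "id": "person001",// primary key value of the affected row
        "colname": "age",// name of the attribute column
        "version": 1508400600000,// cell version, a timestamp in milliseconds
        "colvalue": "30",// value written to the cell
        "optype": "U",// operation type (assumed code for an update)
        "sequenceinfo": "..."// time sequence information, exported only if enabled
    }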

When configuring synchronization tasks with OTSStream, consider the following:

  • You can only read data that was generated at least five minutes before the current time and within the last 24 hours.

  • The end time cannot be later than the current system time; that is, the end time you set must be at least five minutes earlier than the time the task runs.

  • Configuring daily scheduling may result in data loss, because incremental data can only be read within 24 hours of being generated.

  • Weekly and monthly scheduling are not supported.

For example:

The start time and end time must cover the time when the Table Store table was manipulated. For example, if you insert two data records into Table Store at 20171019162000, you can set the start time to 20171019161000 and the end time to 20171019162600.
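
In script mode (described later in this article), this window is expressed through the reader's time parameters. A minimal sketch using the times from this example:

    "startTimeString": "20171019161000",// left boundary: 16:10, before the data is written
    "endTimeString": "20171019162600"// right boundary: 16:26, after the data is written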

Procedure

  1. Add a Table Store data source.

  2. Manipulate a Table Store table. For more information, see What is Table Store.

  3. Create a synchronization task.

  4. Run the synchronization task through scheduling.

Configure a synchronization task in wizard mode

  1. Log on to the DTplus console as a project administrator and click Enter Project on the DataWorks project operation pane.

  2. Go to the Data Integration page and select Wizard Mode.

    • Wizard Mode: The wizard mode provides a visual interface for configuring a synchronization task in five steps: selecting a source, selecting a target, mapping fields, controlling the channel, and previewing and saving the configuration. The interfaces displayed in these steps may vary with the data source, and a wizard-mode configuration can be switched to script mode.

    • Script Mode: On the script mode page, you can select an appropriate template that contains the key parameters of a synchronization task, and then enter the required information. However, a script-mode configuration cannot be switched back to wizard mode.

  3. Select a source: Select the OTSStream data source and the source table “person”. The data browsing area is collapsed by default. Click Next.

    • Data Source: The name of the data source.

    • Table: The name of the table from which the incremental data is exported. For this table, you must enable the Stream function at table creation or through the UpdateTable interface.

    • Start Time: The left boundary of the time range (left-inclusive and right-exclusive) of incremental data. The format is yyyymmddhh24miss and the unit is millisecond.

    • End Time: The right boundary of the time range (left-inclusive and right-exclusive) of incremental data. The format is yyyymmddhh24miss and the unit is millisecond.

    • Status Table: The name of the table for recording the status.

    • Max Retries: The maximum number of retries of each request for reading incremental data from Table Store. The default value is 30.

    • Export time sequence information: Specifies whether to export time sequence information, which includes the data writing time.
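
    These wizard fields correspond one-to-one to the reader parameters used in script mode (the full script appears later in this article). A condensed sketch of the mapping, using the same example values:

        "parameter": {
            "datasource": "otsstream",// Data Source
            "dataTable": "person",// Table
            "startTimeString": "${startTime}",// Start Time
            "endTimeString": "${endTime}",// End Time
            "statusTable": "TableStoreStreamReaderStatusTable",// Status Table
            "maxRetries": 30,// Max Retries
            "isExportSequenceInfo": true// Export time sequence information
        }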

  4. Select a target: Select the MaxCompute data source and the target table “person”, and then click Next.

  5. Map fields: Configure the field mappings and click Next. Each Source Table Field on the left must have a one-to-one correspondence with a Target Table Field on the right.

  6. Control the channel: Click Next to set a maximum job rate and dirty data checking rules.

    • Maximum Job Rate: The maximum possible rate of data synchronization jobs. The actual job rate depends on the network environment and database configuration.

    • Number of Concurrent Jobs: Maximum job rate = number of concurrent jobs × transfer rate of a single concurrent job. When the maximum job rate is specified, select the number of concurrent jobs as follows:

      • If your data source is an online business database, we recommend that you do not set a large number of concurrent jobs, to avoid impacting the online database.

      • If you require a high data synchronization rate, we recommend that you set the maximum job rate and the number of concurrent jobs to large values.
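
    For example, applying the formula above, 7 concurrent jobs transferring roughly 1 MB/s each (a hypothetical per-job rate) give a maximum job rate of about 7 MB/s, which matches the speed setting used in the script-mode example below:

        "speed": {
            "mbps": 7,// The maximum job rate.
            "concurrent": 7// The number of concurrent jobs.
        }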

  7. Preview and save the configuration: After completing the preceding steps, scroll up and down to review the task configuration, and click Save after a thorough check.

  8. Manually run the synchronization task.

Configure a synchronization task in script mode

    {
      "type": "job",
      "version": "1.0",
      "configuration": {
        "reader": {
          "plugin": "otsstream",
          "parameter": {
            "datasource": "otsstream",// The data source name, which must be consistent with the name of the added data source.
            "dataTable": "person",// The name of the table from which the incremental data is exported. The Stream function must be enabled for this table at table creation or through the UpdateTable interface.
            "startTimeString": "${startTime}",// The left boundary (inclusive) of the incremental data time range. The format is yyyymmddhh24miss and the unit is millisecond.
            "endTimeString": "${endTime}",// The right boundary (exclusive) of the incremental data time range.
            "statusTable": "TableStoreStreamReaderStatusTable",// The name of the table for recording the status.
            "maxRetries": 30,// The maximum number of retries of each request.
            "isExportSequenceInfo": true,// Whether to export time sequence information.
            "mode": "ots",
            "column": [// The column names.
              "id",
              "colname",
              "version",
              "colvalue",
              "optype",
              "sequenceinfo"
            ]
          }
        },
        "writer": {
          "plugin": "odps",
          "parameter": {
            "datasource": "odps_first",// The data source name.
            "table": "person",// The target table name.
            "truncate": true,// Whether to clear existing data in the target partition before writing.
            "partition": "pt=${bdp.system.bizdate}",// The partition information.
            "column": [// The target column names.
              "id",
              "colname",
              "version",
              "colvalue",
              "optype",
              "sequenceinfo"
            ]
          }
        },
        "setting": {
          "speed": {
            "mbps": 7,// The maximum job rate.
            "concurrent": 7// The number of concurrent jobs.
          }
        }
      }
    }
  • There are two ways to express the start time and end time of the incremental data range:

    1. "startTimeString": "${startTime}": The left boundary of the incremental data time range. The format is yyyymmddhh24miss and the unit is millisecond.
    2. "endTimeString": "${endTime}": The right boundary of the incremental data time range. The format is yyyymmddhh24miss and the unit is millisecond.

    Alternatively (a brief sketch of this form follows this list):

    1. "startTimestampMillis": "": The left boundary of the incremental data time range, in milliseconds. The Reader plug-in looks up the point corresponding to startTimestampMillis in the statusTable and starts reading and exporting data from that point. If the Reader plug-in cannot find a corresponding point in the statusTable, it reads the retained incremental data from the beginning and skips any data whose write time is earlier than startTimestampMillis.
    2. "endTimestampMillis": "": The right boundary of the incremental data time range, in milliseconds. The Reader plug-in exports data starting from startTimestampMillis and stops when it encounters the first data entry whose timestamp is greater than or equal to endTimestampMillis. When the Reader plug-in has read all the incremental data, it stops reading even if it has not reached endTimestampMillis.
    3. The values are timestamps in milliseconds.

  • If isExportSequenceInfo is set to true, the system exports an extra column containing time sequence information, which includes the data writing time. The default value is false (not exported).
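
  A minimal sketch of the second form, using millisecond timestamps that correspond (assuming the UTC+8 time zone) to the 20171019161000 to 20171019162600 window from the earlier example; the exact values are illustrative:

      "startTimestampMillis": "1508400600000",// left boundary in milliseconds
      "endTimestampMillis": "1508401560000"// right boundary in milliseconds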

Run the synchronization task and check data quality on the target

Note: You can consult Table Store personnel about how operation records are generated. For example, when two data records are inserted (put) into the person table in Table Store, four operation records are displayed, because each insertion (put) operation is equivalent to one delete operation and one update operation.
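
As a purely illustrative sketch (the operation type codes and values here are assumptions, not documented output), the four operation records produced by two put operations might look like this in the target table:

    // delete record and update record generated by the first put
    { "id": "1", "colname": "col1", "version": 1508400600000, "colvalue": null, "optype": "<delete>", "sequenceinfo": "..." }
    { "id": "1", "colname": "col1", "version": 1508400600000, "colvalue": "v1", "optype": "<update>", "sequenceinfo": "..." }
    // delete record and update record generated by the second put
    { "id": "2", "colname": "col1", "version": 1508400600000, "colvalue": null, "optype": "<delete>", "sequenceinfo": "..." }
    { "id": "2", "colname": "col1", "version": 1508400600000, "colvalue": "v2", "optype": "<update>", "sequenceinfo": "..." }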

Configure a scheduling task

  1. The following uses a minute-level scheduling task as an example.

    Set minute-level scheduling to run between 00:00 and 23:59, with startTime=$[yyyymmddhh24miss-10/24/60] (10 minutes before the system time) and endTime=$[yyyymmddhh24miss-5/24/60] (5 minutes before the system time). For parameter settings, see System scheduling parameters.
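
    For example (the run time here is hypothetical), if an instance of this task runs at 2017-10-19 16:20:00, the scheduling parameters resolve as follows, keeping the window at least five minutes behind the current time as required above:

        "startTimeString": "${startTime}",// resolves to 20171019161000 (10 minutes before 16:20:00)
        "endTimeString": "${endTime}"// resolves to 20171019161500 (5 minutes before 16:20:00)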

  2. The generated task is displayed in Operation Center. (Figure: CycleTask_En)

  3. A running instance of the task is displayed in Operation Center. (Figure: CycleInstance_En)
