
Import data to ElasticSearch using Data Integration

Last Updated: Apr 04, 2018

Data Integration is a reliable, secure, cost-effective, and elastically scalable data synchronization platform provided by Alibaba Group. It can be used across heterogeneous data storage systems and provides offline (full/incremental) data access channels in different network environments for more than 20 data sources. For more information about data source types, see Supported data sources.

This article explains how to use Data Integration to import offline data into ElasticSearch.

Prerequisites

  1. Activate an Alibaba Cloud primary account, and create an AccessKey for this account.

  2. Activate MaxCompute to automatically generate a default MaxCompute data source, and log on to the DataWorks console using the primary account.

  3. Create a project so that you and your team can collaborate in it to maintain data and tasks. You must create a project before using DataWorks.

    Note:

    If you want to create data integration tasks using a sub-account, grant related permissions to the sub-account. For more information, see Prepare a RAM account.

Procedure

The following example synchronizes MySQL data to ElasticSearch in script mode.
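Before configuring the task, you can optionally confirm that the source table exists and has the expected columns. The following is a minimal Python sketch using the pymysql library; the host, port, user, password, and database values are placeholders you must replace with your own connection details. The table name `es_table` matches the one used in the configuration later in this procedure.

    # Minimal sketch: confirm the MySQL source table and its columns
    # before configuring the sync task. Connection details below are
    # placeholders -- replace them with your own values.
    import pymysql

    connection = pymysql.connect(
        host="your-mysql-host",    # assumption: replace with your host
        port=3306,
        user="your-user",
        password="your-password",
        database="your-database",
    )
    try:
        with connection.cursor() as cursor:
            # List the columns of the source table referenced in the config.
            cursor.execute("DESCRIBE `es_table`")
            for column_name, column_type, *_ in cursor.fetchall():
                print(column_name, column_type)
    finally:
        connection.close()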

  1. Log on to the DataWorks console as a developer and click Enter Project.


  2. Click Data Integration from the menu, and navigate to the Sync Task page.

  3. Select New > Script Mode on the page.


  4. Select a Source Type and a Target Type in the Import Template dialog box that appears.


  5. Click Confirm to enter the script mode configuration page and perform the configuration as needed. If you have any questions, click Help Manual in the upper-right corner.

    {
      "configuration": {
        "setting": {
          "speed": {
            "concurrent": "1", //Number of concurrent tasks
            "mbps": "1" //Maximum job rate
          }
        },
        "reader": {
          "parameter": {
            "connection": [
              {
                "table": [
                  "`es_table`" //Source table name
                ],
                "datasource": "px_mysql_OK" //Data source name. We recommend that you use the same name as the data source you added.
              }
            ],
            "column": [ //Column names of the source table
              "col_ip",
              "col_double",
              "col_long",
              "col_integer",
              "col_keyword",
              "col_text",
              "col_geo_point",
              "col_date"
            ],
            "where": "" //Filtering condition
          },
          "plugin": "mysql"
        },
        "writer": {
          "parameter": {
            "cleanup": true, //Whether to clear the existing data each time data is imported to ElasticSearch. Set it to true for a full import or index rebuild, and to false for incremental synchronization.
            "accessKey": "nimda", //If the X-PACK plug-in is used, enter the password here; otherwise, enter an empty string. Alibaba Cloud ElasticSearch uses the X-PACK plug-in, so a password is required.
            "index": "datax_test", //Index name in ElasticSearch. If it does not exist, the plug-in creates it automatically.
            "alias": "test-1-alias", //Alias written after the data is imported.
            "settings": {
              "index": {
                "number_of_replicas": 0,
                "number_of_shards": 1
              }
            },
            "batchSize": 1000, //Number of data records per batch.
            "accessId": "default", //If the X-PACK plug-in is used, enter the username here; otherwise, enter an empty string. Alibaba Cloud ElasticSearch uses the X-PACK plug-in, so a username is required.
            "endpoint": "http://xxx.xxxx.xxx:xxxx", //URL of ElasticSearch, which can be found in the console.
            "splitter": ",", //Delimiter used when the inserted data is an array.
            "indexType": "default", //Type name under the corresponding ElasticSearch index.
            "aliasMode": "append", //Mode of adding an alias after the data is imported: append or exclusive.
            "column": [ //Column names in ElasticSearch, in the same order as the columns in the Reader.
              {
                "name": "col_ip", //Corresponds to the column of the same name in the Reader.
                "type": "ip" //IP type.
              },
              {
                "name": "col_double",
                "type": "string"
              },
              {
                "name": "col_long",
                "type": "long"
              },
              {
                "name": "col_integer",
                "type": "integer"
              },
              {
                "name": "col_keyword",
                "type": "keyword"
              },
              {
                "name": "col_text",
                "type": "text" //Text type, using the default analyzer.
              },
              {
                "name": "col_geo_point",
                "type": "geo_point"
              },
              {
                "name": "col_date",
                "type": "date"
              }
            ],
            "discovery": false //Whether to enable automatic node discovery.
          },
          "plugin": "elasticsearch" //Name of the Writer plug-in: ElasticSearchWriter. Do not modify.
        }
      },
      "type": "job",
      "version": "1.0"
    }
  6. Click Save.

    Note:

    • ElasticSearch only supports importing data in script mode.

    • If you want to choose a new template, click Import Template in the toolbar. Note that the existing content is overwritten once the new template is imported.

  7. Click Run to run the synchronization task. To check that the data arrived, see the verification sketch after the following note.

Note:

After you save a synchronization task, click Run to run it immediately. You can also click Submit on the right to submit the task to the scheduling system, which automatically and periodically runs the task starting from the next day, according to the configured properties. For more information about scheduling configurations, see Scheduling configuration.
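After the task succeeds, you can verify the result by querying the target index. The following is a minimal Python sketch using the elasticsearch client library; the endpoint is a placeholder you must replace with your own, while the index name datax_test, alias test-1-alias, and X-PACK credentials come from the configuration above.

    # Minimal sketch: verify the import by counting documents in the
    # target index and checking its alias. The endpoint is a
    # placeholder -- replace it with your own value.
    from elasticsearch import Elasticsearch

    es = Elasticsearch(
        ["http://xxx.xxxx.xxx:xxxx"],    # assumption: your ElasticSearch endpoint
        http_auth=("default", "nimda"),  # X-PACK username/password from the config
    )

    # Count the documents written to the index named in the configuration.
    print(es.count(index="datax_test")["count"])

    # Check that the alias (test-1-alias) was attached after the import.
    print(es.indices.get_alias(index="datax_test"))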
