All Products
Search
Document Center

DataWorks:Use Data Integration to import data to Elasticsearch

Last Updated:Aug 28, 2023

This topic describes how to use Data Integration to import offline data to Elasticsearch.

Prerequisites

  1. An Alibaba Cloud account and its AccessKey pair are created. For more information, see the "Prerequisites" section in Activate DataWorks.

  2. MaxCompute is activated. After you activate MaxCompute, a default MaxCompute data source is automatically generated. The Alibaba Cloud account is used to log on to the DataWorks console.

  3. A workspace is created in the DataWorks console. This way, you can collaborate with other members in the workspace to develop workflows and maintain data and nodes in the workspace. For information about how to create a workspace, see Create a workspace.

    Note

    If you want to create a data integration node as a RAM user, grant the required permissions to the RAM user. For information about how to create a RAM user and grant permissions to the RAM user, see Prepare a RAM user and Manage permissions on workspace-level services.

  4. The required data sources are prepared. For more information, see Add a data source.

Create a batch synchronization node

  1. Log on to the DataWorks console.

  2. In the left-side navigation pane, click Workspaces.

  3. In the top navigation bar, select the region in which the created workspace resides. On the Workspaces page, find the workspace and choose Shortcuts > Data Development in the Actions column.

  4. In the Scheduled Workflow pane of the DataStudio page, find the desired workflow and click its name. Right-click Data Integration and choose Create Node > Offline synchronization.

  5. In the Create Node dialog box, configure the Name and Path parameters.

    Note
    • The node name cannot exceed 128 characters in length.

    • The Path parameter specifies the auto triggered workflow in which you want to create the batch synchronization node. For information about how to create an auto triggered workflow, see the "Create an auto triggered workflow" section in Create a workflow.

  6. Click Confirm.

Configure the batch synchronization node

  1. On the configuration tab of the batch synchronization node, click the Conversion script icon in the top toolbar.

    转换脚本
  2. In the Tips message, click OK to switch to the code editor.

  3. Edit code in the code editor based on the following sample code:

    {
    "configuration": {
    "setting": {
      "speed": {
        "concurrent": "1", // The number of parallel threads. 
        "mbps": "1" // The maximum transmission rate. 
      }
    },
    "reader": {
      "parameter": {
        "connection": [
          {
            "table": [
              "es_table" // The name of the source table. 
            ],
            "datasource": "px_mysql_OK" // The name of the source. We recommend that you use the name of the added source. 
          }
        ],
        "column": [ // The names of the columns from which you want to read data. 
          "col_ip",
          "col_double",
          "col_long",
          "col_integer",
          "col_keyword",
          "col_text",
          "col_geo_point",
          "col_date"
        ],
        "where": "", // The WHERE clause. 
      },
      "plugin": "mysql"
    },
    "writer": {
      "parameter": {
        "cleanup": true, // Specifies whether to clear the original data each time you import data to Elasticsearch. If you want to import all data or recreate an index, set this parameter to true. If you want to import incremental data, set this parameter to false. 
        "accessKey": "nimda", // If the X-Pack plug-in is used, enter the AccessKey secret. Otherwise, enter a null string. The X-Pack plug-in is used for Alibaba Cloud Elasticsearch. Therefore, you must enter the AccessKey secret. 
        "index": "datax_test", // The name of the index in the Elasticsearch cluster. If no index is available, the plug-in automatically creates one. 
        "alias": "test-1-alias", // The alias that you want to add after data is imported. 
        "settings": {
          "index": {
            "number_of_replicas": 0,
            "number_of_shards": 1
          }
        },
        "batchSize": 1000, // The number of data records to write at a time. 
        "accessId": "default", // If the X-Pack plug-in is used, enter the AccessKey ID. Otherwise, enter a null string. The X-Pack plug-in is used for Alibaba Cloud Elasticsearch. Therefore, you must enter the AccessKey ID. 
        "endpoint": "http://xxx.xxxx.xxx:xxxx", // The endpoint of the Elasticsearch cluster. You can view the endpoint in the Elasticsearch console. 
        "splitter": ",", // If you want to import arrays, specify a delimiter. 
        "indexType": "default", // The type name of the index in the Elasticsearch cluster. 
        "aliasMode": "append", // The mode in which you want to add an alias after data is imported. Valid values: append and exclusive. append: Add a new alias. exclusive: Retain only the new alias. 
        "column": [ // The columns in Elasticsearch. The sequence of the columns is the same as that of the columns specified in the reader. 
          {
            "name": "col_ip",// This field corresponds to the name attribute column in Tablestore. 
            "type": "ip"// The text type. The default analyzer is used. 
          },
          {
            "name": "col_double",
            "type": "string"
          },
          {
            "name": "col_long",
            "type": "long"
          },
          {
            "name": "col_integer",
            "type": "integer"
          },
          {
            "name": "col_keyword",
            "type": "keyword"
          },
          {
            "name": "col_text",
            "type": "text"
          },
          {
            "name": "col_geo_point",
            "type": "geo_point"
          },
          {
            "name": "col_date",
            "type": "date"
          }
        ],
        "discovery": false// Specifies whether to enable automatic discovery. Set this parameter to true. 
      },
      "plugin": "elasticsearch"// The plug-in name. The name is Elasticsearch Writer. You do not need to change the value. 
    }
    },
    "type": "job",
    "version": "1.0"
    }
  4. In the top toolbar of the configuration tab of the batch synchronization node, click the 保存 icon and then the 运行 icon.

    Note
    • You can import data to Elasticsearch only in the code editor.

    • If you click the 运行 icon after you save the batch synchronization node, the node is immediately run.

      You can also click the 提交 icon to commit the batch synchronization node to the scheduling system. The scheduling system periodically runs the batch synchronization node from the next day based on the properties configured for the node.

Additional information

For information about how to configure synchronization nodes that use other types of data sources, see the topics in Reader configuration and Writer configuration.