This topic describes how to use Data Integration to import data to Elasticsearch in offline mode.

Prerequisites

  1. An Alibaba Cloud account and its AccessKey pair are created. For more information, see Prepare an Alibaba Cloud account.
  2. MaxCompute is activated so that a default MaxCompute connection is automatically created, and the Alibaba Cloud account is used to log on to the DataWorks console.
  3. A workspace is created so that you can create workflows in the workspace and create different types of nodes in the workflows to maintain and analyze data. For more information, see Create a workspace.
    Note If you want to create a data integration node as a Resource Access Management (RAM) user, grant the required permissions to the RAM user. For more information, see Prepare a RAM user and Manage workspace members.
  4. The required connection is added. For more information, see Configure a connection.

Create a batch sync node

  1. Log on to the DataWorks console. In the left-side navigation pane, click Workspaces. On the Workspaces page, find the target workspace and click Data Integration in the Actions column.
  2. On the Home Page that appears, click New Task to go to the DataStudio page.
  3. In the Create Node dialog box that appears, set Node Name and Location and click Commit.
    Note
    • The node name can be up to 128 characters in length.
    • Set Location to the directory where the created workflow resides. For more information about how to create a workflow, see Create a workflow.

Configure the batch sync node

  1. After creating the batch sync node, click the Switch to Code Editor icon in the toolbar on the node editing tab.
  2. In the Confirm dialog box that appears, click OK to switch to the code editor.
  3. Edit code as required in the code editor. The sample code is as follows:
    {
      "configuration": {
        "setting": {
          "speed": {
            "concurrent": "1", // The maximum number of concurrent threads.
            "mbps": "1" // The maximum transmission rate.
          }
        },
        "reader": {
          "parameter": {
            "connection": [
              {
                "table": [
                  "`es_table`" // The name of the source table.
                ],
                "datasource": "px_mysql_OK" // The name of the source connection. We recommend that you use the name of the connection that you have added.
              }
            ],
            "column": [ // The columns in the source table.
              "col_ip",
              "col_double",
              "col_long",
              "col_integer",
              "col_keyword",
              "col_text",
              "col_geo_point",
              "col_date"
            ],
            "where": "" // The WHERE clause.
          },
          "plugin": "mysql"
        },
        "writer": {
          "parameter": {
            "cleanup": true, // Specifies whether to clear the existing data each time data is imported to Elasticsearch. Set the value to true if you import all data or recreate the index, and to false if you import incremental data (see the sketch after this sample).
            "accessKey": "nimda", // The AccessKey secret for accessing Elasticsearch. If the X-Pack plug-in is used, enter the AccessKey secret. Otherwise, enter an empty string. Alibaba Cloud Elasticsearch uses the X-Pack plug-in, so the AccessKey secret is required here.
            "index": "datax_test", // The name of the Elasticsearch index. If the index does not exist, it is automatically created.
            "alias": "test-1-alias", // The alias of the index, which is added after data is imported.
            "settings": {
              "index": {
                "number_of_replicas": 0,
                "number_of_shards": 1
              }
            },
            "batchSize": 1000, // The number of data records to write at a time.
            "accessId": "default", // The AccessKey ID for accessing Elasticsearch. If the X-Pack plug-in is used, enter the AccessKey ID. Otherwise, enter an empty string. Alibaba Cloud Elasticsearch uses the X-Pack plug-in, so the AccessKey ID is required here.
            "endpoint": "http://xxx.xxxx.xxx:xxxx", // The endpoint for accessing Elasticsearch. You can view the endpoint in the Elasticsearch console.
            "splitter": ",", // The delimiter used to split source data. Specify a delimiter if you import arrays.
            "indexType": "default", // The type name in the Elasticsearch index.
            "aliasMode": "append", // The mode in which the alias is added after data is imported. Valid values: append and exclusive.
            "column": [ // The columns in Elasticsearch. The order of the columns must be the same as that of the columns in the reader.
              {
                "name": "col_ip", // The name of the column in Elasticsearch.
                "type": "ip" // The IP type.
              },
              {
                "name": "col_double",
                "type": "string"
              },
              {
                "name": "col_long",
                "type": "long"
              },
              {
                "name": "col_integer",
                "type": "integer"
              },
              {
                "name": "col_keyword",
                "type": "keyword"
              },
              {
                "name": "col_text",
                "type": "text"
              },
              {
                "name": "col_geo_point",
                "type": "geo_point"
              },
              {
                "name": "col_date",
                "type": "date"
              }
            ],
            "discovery": false // Specifies whether to enable automatic node discovery. In this example, the value is false.
          },
          "plugin": "elasticsearch" // The writer name. Keep the default value.
        }
      },
      "type": "job",
      "version": "1.0"
    }
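    The cleanup and where parameters in the preceding sample control whether each run rebuilds the index or appends to it. The following fragment is a minimal sketch of an incremental-import variant: only the two changed parameters are shown, the rest of the configuration stays the same, and the filter column and date value are illustrative and depend on your source table.
    {
      "reader": {
        "parameter": {
          "where": "col_date >= '2024-01-01'" // Illustrative filter: read only the rows added after a given date.
        }
      },
      "writer": {
        "parameter": {
          "cleanup": false // Keep the existing documents in the index and append the newly read rows.
        }
      }
    }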
  4. Click the Save icon and then the Run icon in the toolbar.
    Note
    • You can import data to Elasticsearch only in the code editor.
    • After you save the batch sync node, click the Run icon. The node runs immediately.

      You can also click the Submit icon to submit the sync node to the scheduling system. The scheduling system then automatically runs the node from the next day based on the configured scheduling properties.
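
      If you schedule the node to run daily, a common pattern is to make the WHERE clause depend on a DataWorks scheduling parameter so that each run reads only the rows of the previous business date. The following fragment is a sketch that assumes the built-in ${bdp.system.bizdate} parameter and that the source column col_date stores the date as a string in the yyyymmdd format; adjust the column and format to your table.
      {
        "reader": {
          "parameter": {
            "where": "col_date = '${bdp.system.bizdate}'" // Assumption: col_date stores the business date in the yyyymmdd format.
          }
        }
      }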

What to do next

For more information about how to configure sync nodes that use other types of connections, see Configure the reader and Configure the writer.