This topic describes how to use Data Integration to import data to Elasticsearch in offline mode.

Prerequisites

  1. An Alibaba Cloud account and its AccessKey pair are created.
  2. MaxCompute is activated, and a default MaxCompute connection is automatically created. The Alibaba Cloud account is used to log on to DataWorks.
  3. A workspace is created so that you can collaboratively develop workflows and maintain data and nodes in the workspace. For more information, see Create a workspace.
    Note If you want to create a data integration node as a RAM user, grant the required permissions to the RAM user. For more information, see Prepare a RAM user and Manage workspace members.
  4. The required connection is created. For more information, see Configure a connection.

Create a batch sync node

  1. Log on to the DataWorks console.
  2. In the left-side navigation pane, click Workspaces.
  3. Select the region where the required workspace resides. Find the required workspace and click Data Integration.
  4. On the homepage of Data Integration, click New offline synchronization to go to the DataStudio page.
  5. In the Create Node dialog box, set the Node Name and Location parameters.
    Note
    • The node name must be 1 to 128 characters in length.
    • Set the Location parameter to the directory where the created workflow resides. For more information, see Create a workflow.
  6. Click Commit.

Configure the batch sync node

  1. After you create the batch sync node, click the Switch to Code Editor icon in the toolbar.
  2. In the Confirm message, click OK to switch to the code editor.
  3. Edit code in the code editor based on the following sample code:
    {
      "configuration": {
        "setting": {
          "speed": {
            "concurrent": "1", // The number of concurrent threads.
            "mbps": "1" // The maximum transmission rate.
          }
        },
        "reader": {
          "parameter": {
            "connection": [
              {
                "table": [
                  "`es_table`" // The name of the source table.
                ],
                "datasource": "px_mysql_OK" // The name of the connection. We recommend that you use the name of an existing connection.
              }
            ],
            "column": [ // The columns in the source table.
              "col_ip",
              "col_double",
              "col_long",
              "col_integer",
              "col_keyword",
              "col_text",
              "col_geo_point",
              "col_date"
            ],
            "where": "" // The WHERE clause that filters the source data. Leave it empty to read all rows.
          },
          "plugin": "mysql"
        },
        "writer": {
          "parameter": {
            "cleanup": true, // Specifies whether to clear the existing data each time you import data to Elasticsearch. Set the parameter to true if you import all data or recreate the index, and to false if you import incremental data (see the incremental example after this procedure).
            "accessKey": "nimda", // The password that is used to access the Elasticsearch cluster. If the X-Pack plug-in is not used, enter an empty string. Alibaba Cloud Elasticsearch uses the X-Pack plug-in, so you must enter the password.
            "index": "datax_test", // The name of the destination index. If the index does not exist, the plug-in automatically creates it.
            "alias": "test-1-alias", // The alias to add to the index after the data is imported.
            "settings": {
              "index": {
                "number_of_replicas": 0,
                "number_of_shards": 1
              }
            },
            "batchSize": 1000, // The number of data records to write at a time.
            "accessId": "default", // The username that is used to access the Elasticsearch cluster. If the X-Pack plug-in is not used, enter an empty string. Alibaba Cloud Elasticsearch uses the X-Pack plug-in, so you must enter the username.
            "endpoint": "http://xxx.xxxx.xxx:xxxx", // The endpoint of the Elasticsearch cluster. You can view the endpoint in the Elasticsearch console.
            "splitter": ",", // The delimiter that is used to split array values when you import arrays.
            "indexType": "default", // The name of the type in the destination index.
            "aliasMode": "append", // The mode in which the alias is added after the data is imported. Set the parameter to append to add the new alias to the existing aliases, or to exclusive to retain only the new alias.
            "column": [ // The fields to write to Elasticsearch. The order of the fields must be the same as that of the columns in the reader.
              {
                "name": "col_ip", // The name of the destination field.
                "type": "ip" // The data type of the field in Elasticsearch.
              },
              {
                "name": "col_double",
                "type": "string"
              },
              {
                "name": "col_long",
                "type": "long"
              },
              {
                "name": "col_integer",
                "type": "integer"
              },
              {
                "name": "col_keyword",
                "type": "keyword"
              },
              {
                "name": "col_text",
                "type": "text" // The text type. The default analyzer is used.
              },
              {
                "name": "col_geo_point",
                "type": "geo_point"
              },
              {
                "name": "col_date",
                "type": "date"
              }
            ],
            "discovery": false // Specifies whether to enable automatic discovery of cluster nodes. Default value: false.
          },
          "plugin": "elasticsearch" // The name of the writer plug-in: ElasticsearchWriter. You do not need to change the value.
        }
      },
      "type": "job",
      "version": "1.0"
    }
  4. Click the Save icon and then the Run icon in the toolbar.
    Note
    • A sync node that imports data to Elasticsearch can be configured only in the code editor.
    • After you save the sync node, click the Run icon to run the node immediately. You can verify the result as shown after this procedure.

      You can also click the Submit icon to commit the sync node to the scheduling system. The scheduling system then automatically runs the node starting from the next day based on the scheduling parameters.
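
After the node succeeds, you can verify the result by querying the destination index, for example in the Kibana console or by calling the Elasticsearch _search API. The following request body is a minimal sketch; send it to http://<endpoint>/datax_test/_search, where the endpoint, username, and password are the ones configured in the sample code:

    {
      "query": {
        "match_all": {} // Match every document in the index.
      },
      "size": 1 // Return only one sample document.
    }

The response contains the total number of imported documents and one sample document, which you can use to confirm the field types.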

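If you commit the node to the scheduling system, a common variant is an incremental import that keeps the existing data and reads only the rows that changed since the last data timestamp. The following fragment is a minimal sketch of the fields that change, assuming a hypothetical gmt_modified column in the source table and a bizdate parameter defined in the scheduling properties of the node:

    "reader": {
      "parameter": {
        "where": "gmt_modified >= '${bizdate}'" // Read only the rows modified on or after the data timestamp. gmt_modified is a hypothetical column.
      }
    },
    "writer": {
      "parameter": {
        "cleanup": false // Do not clear the existing data when you import incremental data.
      }
    }

All other fields remain the same as in the preceding sample code.
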
Subsequent steps

For more information about how to configure sync nodes that use other types of connections, see the topics in Reader configuration and Writer configuration.