すべてのプロダクト
Search
ドキュメントセンター

DataWorks:データ統合を使用して Elasticsearch にデータをインポートする

最終更新日:Apr 09, 2025

このトピックでは、データ統合を使用してオフラインデータを Elasticsearch にインポートする方法について説明します。

前提条件

  1. Alibaba Cloud アカウントとその AccessKey ペアが作成されていること。詳細については、「DataWorks をアクティブ化する」の「前提条件」セクションをご参照ください。

  2. MaxCompute がアクティブ化されていること。MaxCompute をアクティブ化すると、デフォルトの MaxCompute データソースが自動的に生成されます。Alibaba Cloud アカウントを使用して DataWorks コンソールにログオンします。

  3. DataWorks コンソールでワークスペースが作成されていること。これにより、ワークスペース内の他のメンバーと共同でワークフローを開発し、ワークスペース内のデータとノードを維持できます。ワークスペースの作成方法については、「ワークスペースを作成する」をご参照ください。

    説明

    RAM ユーザーとしてデータ統合ノードを作成する場合は、必要な権限を RAM ユーザーに付与します。RAM ユーザーの作成方法と RAM ユーザーへの権限の付与方法については、「RAM ユーザーを準備する」および「ワークスペースレベルのサービスに対する権限を管理する」をご参照ください。

  4. 必要なデータソースが準備されていること。詳細については、「MaxCompute データソース」をご参照ください。

バッチ同期ノードを作成する

  1. DataStudio ページに移動します。

    DataWorks コンソール にログオンします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションペインで、[データ開発とガバナンス] > [データ開発] を選択します。表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[データ開発に移動] をクリックします。

  2. DataStudio ページの [スケジュールされたワークフロー] ペインで、目的のワークフローを見つけ、その名前をクリックします。[データ統合] を右クリックし、[ノードの作成] > [オフライン同期] を選択します。

  3. [ノードの作成] ダイアログボックスで、[名前] パラメーターと [パス] パラメーターを設定します。

    説明
    • ノード名は 128 文字を超えることはできません。

    • [パス] パラメーターは、バッチ同期タスクを作成する自動トリガーワークフローを指定します。自動トリガーワークフローの作成方法については、「ワークフローを作成する」の「自動トリガーワークフローを作成する」セクションをご参照ください。

  4. [確認] をクリックします。

バッチ同期ノードを設定する

  1. バッチ同期ノードの設定タブで、上部ツールバーの [変換スクリプト] アイコンをクリックします。

    转换脚本

  2. [ヒント] メッセージで、[OK] をクリックしてコードエディターに切り替えます。

  3. 次のサンプルコードに基づいて、コードエディターでコードを編集します。

    {
    "configuration": {
    "setting": {
      "speed": {
        "concurrent": "1", // 並列スレッドの数。
        "mbps": 1,// 最大転送速度。単位:MB/s。
      }
    },
    "reader": {
      "parameter": {
        "connection": [
          {
            "table": [
              "es_table" // ソーステーブルの名前。
            ],
            "datasource": "px_mysql_OK" // ソースの名前。追加したソースの名前を使用することをお勧めします。
          }
        ],
        "column": [ // データを読み取る列の名前。
          "col_ip",
          "col_double",
          "col_long",
          "col_integer",
          "col_keyword",
          "col_text",
          "col_geo_point",
          "col_date"
        ],
        "where": "", // WHERE 句。
      },
      "plugin": "mysql"
    },
    "writer": {
      "parameter": {
        "cleanup": true, // Elasticsearch にデータをインポートするたびに元のデータをクリアするかどうかを指定します。すべてのデータをインポートするか、インデックスを再作成する場合は、このパラメーターを true に設定します。増分データをインポートする場合は、このパラメーターを false に設定します。
        "accessKey": "nimda", // X-Pack プラグインを使用する場合は、AccessKey シークレットを入力します。それ以外の場合は、ヌル文字列を入力します。X-Pack プラグインは Alibaba Cloud Elasticsearch に使用されます。したがって、AccessKey シークレットを入力する必要があります。
        "index": "datax_test", // Elasticsearch クラスター内のインデックスの名前。インデックスがない場合は、プラグインによって自動的に作成されます。
        "alias": "test-1-alias", // データのインポート後に追加するエイリアス。
        "settings": {
          "index": {
            "number_of_replicas": 0,
            "number_of_shards": 1
          }
        },
        "batchSize": 1000, // 一度に書き込むデータレコードの数。
        "accessId": "default", // X-Pack プラグインを使用する場合は、AccessKey ID を入力します。それ以外の場合は、ヌル文字列を入力します。X-Pack プラグインは Alibaba Cloud Elasticsearch に使用されます。したがって、AccessKey ID を入力する必要があります。
        "endpoint": "http://xxx.xxxx.xxx:xxxx", // Elasticsearch クラスターのエンドポイント。Elasticsearch コンソールでエンドポイントを表示できます。
        "splitter": ",", // 配列をインポートする場合は、区切り文字を指定します。
        "indexType": "default", // Elasticsearch クラスター内のインデックスのタイプ名。
        "aliasMode": "append", // データのインポート後にエイリアスを追加するモード。有効な値:append と exclusive。append:新しいエイリアスを追加します。exclusive:新しいエイリアスのみを保持します。
        "column": [ // Elasticsearch の列。列の順序は、リーダーで指定された列の順序と同じです。
          {
            "name": "col_ip",
            "type": "ip"// テキストタイプ。デフォルトのアナライザーが使用されます。
          },
          {
            "name": "col_double",
            "type": "string"
          },
          {
            "name": "col_long",
            "type": "long"
          },
          {
            "name": "col_integer",
            "type": "integer"
          },
          {
            "name": "col_keyword",
            "type": "keyword"
          },
          {
            "name": "col_text",
            "type": "text"
          },
          {
            "name": "col_geo_point",
            "type": "geo_point"
          },
          {
            "name": "col_date",
            "type": "date"
          }
        ],
        "discovery": false// 自動検出を有効にするかどうかを指定します。このパラメーターを true に設定します。
      },
      "plugin": "elasticsearch"// プラグイン名。名前は Elasticsearch Writer です。値を変更する必要はありません。
    }
    },
    "type": "job",
    "version": "1.0"
    }
  4. バッチ同期ノードの設定タブの上部ツールバーで、保存 アイコンをクリックし、次に 运行 アイコンをクリックします。

    説明
    • Elasticsearch にデータをインポートできるのは、コードエディターのみです。

    • バッチ同期ノードを保存した後に 运行 アイコンをクリックすると、ノードはすぐに実行されます。

      提交 アイコンをクリックして、バッチ同期ノードをスケジューリングシステムにコミットすることもできます。スケジューリングシステムは、ノードに対して設定されたプロパティに基づいて、翌日以降にバッチ同期ノードを定期的に実行します。

追加情報

他のタイプのデータソースを使用する同期ノードの設定方法については、「MaxCompute データソース」のトピックをご参照ください。