すべてのプロダクト
Search
ドキュメントセンター

Elasticsearch:MaxCompute から Alibaba Cloud ES へのデータ同期

最終更新日:Nov 09, 2025

Alibaba Cloud Elasticsearch (ES) を使用して、MaxCompute (ODPS) の大量のデータに対して、情報の取得、多次元クエリの実行、統計分析を行うことができます。このトピックでは、DataWorks のデータ統合サービスを使用して、大量の MaxCompute データをオフラインモードで Alibaba Cloud ES インスタンスに同期する方法について説明します。このプロセスは通常、数分しかかかりません。

背景情報

DataWorks は、データ開発、タスクスケジューリング、データ管理などの機能を統合した、ビッグデータ開発とガバナンスのための包括的なプラットフォームです。DataWorks の同期タスクを使用すると、さまざまなデータソースから Alibaba Cloud ES にデータを迅速に移動できます。

  • サポートされているデータソースは次のとおりです。

    • Alibaba Cloud データベース: ApsaraDB for RDS (MySQL、PostgreSQL、SQL Server)、ApsaraDB for MongoDB、および ApsaraDB for HBase

    • Alibaba Cloud PolarDB-X (DRDS からアップグレード)

    • Alibaba Cloud MaxCompute

    • Alibaba Cloud Object Storage Service (OSS)

    • Alibaba Cloud Tablestore

    • HDFS、Oracle、FTP、DB2 などの自己ホスト型データソース。

  • シナリオ:

前提条件

説明
  • データ同期は Alibaba Cloud ES インスタンスでのみサポートされています。セルフマネージド Elasticsearch クラスターはサポートされていません。

  • MaxCompute プロジェクト、ES インスタンス、および DataWorks ワークスペースは、同じリージョンにある必要があります。

  • ES インスタンス、MaxCompute プロジェクト、および DataWorks ワークスペースは、同じタイムゾーンにある必要があります。そうでない場合、時間関連のデータが同期された後、ソースデータと宛先データの間でタイムゾーンの差が生じる可能性があります。

課金

手順

ステップ 1: ソースデータの準備

MaxCompute でテーブルを作成し、そのテーブルにデータをインポートします。詳細については、「テーブルの作成」および「テーブルへのデータのインポート」をご参照ください。

このトピックでは、次のテーブルスキーマとデータを使用します。

  • テーブルスキーマ表结构

  • テーブルデータの一部表数据

ステップ 2: 専用リソースグループの購入と設定

データ統合専用リソースグループを購入します。次に、VPC とワークスペースをリソースグループにアタッチします。専用リソースグループは、高速で安定したデータ伝送を保証します。

  1. DataWorks コンソールにログインします。

  2. 上部のナビゲーションバーでリージョンを選択します。左側のナビゲーションウィンドウで、[リソースグループ] をクリックします。

  3. [専用リソースグループ] タブで、[レガシーリソースグループの作成] > [データ統合リソースグループ] をクリックします。

  4. [DataWorks 専用リソース (サブスクリプション)] 購入ページで、[専用リソースタイプ][データ統合専用リソース] に設定し、リソースグループの名前を入力してから、[今すぐ購入] をクリックして専用リソースグループを購入します。

    詳細については、「ステップ 1: データ統合専用リソースグループの作成」をご参照ください。

  5. 作成した専用リソースグループを見つけ、[アクション] 列で [ネットワーク設定] をクリックして VPC をアタッチします。詳細については、「VPC のアタッチ」をご参照ください。

    説明

    この例では、データ統合専用リソースグループを使用して VPC 経由でデータを同期します。データ統合専用リソースグループを使用してインターネット経由でデータを同期する方法の詳細については、「IP アドレスホワイトリストの設定」をご参照ください。

    専用リソースグループは、Elasticsearch クラスターが存在する VPC に接続する必要があります。これにより、専用リソースグループを使用してデータを同期できます。したがって、専用リソースグループを Elasticsearch クラスターの [VPC][ゾーン]、および [vSwitch] に関連付ける必要があります。Elasticsearch クラスターの VPC、ゾーン、および vSwitch を表示するには、「クラスターの基本情報の表示」をご参照ください。

    重要

    VPC を関連付けた後、VPC の [vSwitch CIDR ブロック] を Elasticsearch インスタンスの VPC 内部アクセスホワイトリストに追加する必要があります。詳細については、「Elasticsearch インスタンスのパブリックまたは内部アクセスホワイトリストの設定」をご参照ください。

  6. ページの左上隅にある戻るアイコンをクリックして、[リソースグループリスト] ページに戻ります。

  7. 作成した専用リソースグループの [操作] 列で、[ワークスペースのアタッチ] をクリックして、ターゲットワークスペースをリソースグループにアタッチします。

    詳細については、「ステップ 2: データ統合専用リソースグループとワークスペースの関連付け」をご参照ください。

ステップ 3: データソースの追加

MaxCompute と Elasticsearch をデータソースとして DataWorks のデータ統合サービスに追加します。

  1. DataWorks の [データ統合] ページに移動します。

    1. DataWorks コンソールにログインします。

    2. 左側のナビゲーションウィンドウで、[ワークスペース] をクリックします。

    3. ターゲットワークスペースの [操作] 列で、[クイックアクセス] > [データ統合] を選択します。

  2. 左側のナビゲーションウィンドウで、[データソース] をクリックします。

  3. MaxCompute データソースを追加します。

    1. [データソースリスト] ページで、[データソースの追加] をクリックします。

    2. [データソースの追加] ページで、[MaxCompute] を検索して選択します。

    3. [MaxCompute データソースの追加] ダイアログボックスで、[基本情報] セクションのデータソースパラメーターを設定します。

      詳細については、「MaxCompute データソースの追加」をご参照ください。

    4. [接続設定] セクションで、[接続性テスト] をクリックします。接続ステータスが [接続済み] の場合、接続は成功です。

    5. [完了] をクリックします。

  4. 同様に Elasticsearch データソースを追加します。詳細については、「Elasticsearch データソースの追加」をご参照ください。

ステップ 4: データ同期タスクの設定と実行

データ同期タスクは、専用リソースグループを使用して実行されます。リソースグループは、データ統合のデータソースからデータを取得し、Elasticsearch にデータを書き込みます。

説明
  1. DataWorks の [データ開発] ページに移動します。

    1. DataWorks コンソールにログインします。

    2. 左側のナビゲーションウィンドウで、[ワークスペース] をクリックします。

    3. ターゲットワークスペースの [操作] 列で、[クイックアクセス] > [データ開発] を選択します。

  2. オフライン同期タスクを作成します。

    1. 左側のナビゲーションウィンドウの [データ開発] (image アイコン) タブで、[作成] > [ワークフローの作成] を選択し、画面の指示に従ってワークフローを作成します。

    2. 作成したワークフローを右クリックし、[ノードの作成] > [データ統合] > [オフライン同期] を選択します。

    3. [ノードの作成] ダイアログボックスで、ノード名を入力し、[確認] をクリックします。

  3. ネットワークとリソースグループを設定します。

    1. [データソース] セクションで、[ソース] を MaxCompute (ODPS) に、[データソース名] をソースデータソースの名前に設定します。

    2. [マイリソースグループ] セクションで、専用リソースグループを選択します。

    3. [データ宛先] セクションで、[宛先] を Elasticsearch に、[データソース名] を宛先データソースの名前に設定します。

  4. [次へ] をクリックします。

  5. タスクを設定します。

    1. [データソース] セクションで、ソーステーブルを選択します。

    2. [データ宛先] セクションで、パラメーターを設定します。

    3. [フィールドマッピング] セクションで、[ソースフィールド][宛先フィールド] の間のマッピングを設定します。

    4. [チャネルコントロール] セクションで、チャネルパラメーターを設定します。

    詳細については、「コードレス UI を使用したバッチ同期タスクの設定」をご参照ください。

  6. タスクを実行します。

    1. (オプション) タスクのスケジューリングプロパティを設定します。右側のナビゲーションウィンドウで、[プロパティ] をクリックします。[プロパティ] タブで、必要に応じてパラメーターを設定します。パラメーターの詳細については、「スケジューリング設定」をご参照ください。

    2. ノード設定タブの右上隅にある保存アイコンをクリックして、タスクを保存します。

    3. ノード設定タブの右上隅にある送信アイコンをクリックして、タスクを送信します。

      スケジューリングプロパティを設定した場合、タスクはスケジュールに従って自動的に実行されます。ノード設定タブの右上隅にある実行アイコンをクリックして、タスクをすぐに実行することもできます。

      ログに Shell run successfully! が含まれている場合、タスクは正常に実行されています。次のコードはサンプルログを示しています。

      2023-10-31 16:52:35 INFO Exit code of the Shell command 0
      2023-10-31 16:52:35 INFO --- Invocation of Shell command completed ---
      2023-10-31 16:52:35 INFO Shell run successfully!
      2023-10-31 16:52:35 INFO Current task status: FINISH
      2023-10-31 16:52:35 INFO Cost time is: 33.106s

ステップ 5: データ同期結果の確認

Kibana コンソールで、同期されたデータを表示し、指定された条件に基づいてデータをクエリできます。

  1. ターゲットの Alibaba Cloud ES インスタンスの Kibana コンソールにログインします。

    詳細については、「Kibana コンソールへのログイン」をご参照ください。

  2. Kibana ページの左上隅にある 菜单.png アイコンをクリックし、[Dev Tools] を選択します。

  3. [コンソール] で、次のコマンドを実行して同期されたデータを表示します。

    POST /odps_index/_search?pretty
    {
    "query": { "match_all": {}}
    }
    説明

    odps_index は、データ同期スクリプトで設定した index フィールドの値です。

    データが同期されると、次のような結果が返されます。查看同步的数据

  4. 次のコマンドを実行して、ドキュメント内の category フィールドと brand フィールドを検索します。

    POST /odps_index/_search?pretty
    {
    "query": { "match_all": {} },
    "_source": ["category", "brand"]
    }
  5. 次のコマンドを実行して、categoryfresh produce であるドキュメントを検索します。

    POST /odps_index/_search?pretty
    {
    "query": { "match": {"category":"fresh produce"} }
    }
  6. 次のコマンドを実行して、trans_num フィールドでドキュメントをソートします。

    POST /odps_index/_search?pretty
    {
    "query": { "match_all": {} },
    "sort": { "trans_num": { "order": "desc" } }
    }

    コマンドとアクセス方法の詳細については、「Elastic.co ヘルプセンター」をご参照ください。