すべてのプロダクト
Search
ドキュメントセンター

Elasticsearch:DataWorks を使用して Hadoop から Alibaba Cloud ES にデータを同期する

最終更新日:Nov 09, 2025

Hadoop でインタラクティブなビッグデータ分析やクエリを実行する際にレイテンシーが高い場合は、データを Alibaba Cloud Elasticsearch に同期することで、クエリと分析を高速化できます。Elasticsearch は、特にアドホッククエリなど、複数のタイプのクエリに数秒で応答できます。このトピックでは、DataWorks のデータ統合サービスを使用して、Hadoop から Alibaba Cloud ES に大量のデータを同期する方法について説明します。

背景情報

DataWorks は、データ開発、タスクスケジューリング、データ管理などの機能を統合した、ビッグデータ開発とガバナンスのための包括的なプラットフォームです。DataWorks の同期タスクを使用すると、さまざまなデータソースから Alibaba Cloud ES にデータを迅速に移動できます。

  • サポートされているデータソースは次のとおりです:

    • Alibaba Cloud データベース: ApsaraDB for RDS (MySQL、PostgreSQL、SQL Server)、ApsaraDB for MongoDB、および ApsaraDB for HBase

    • Alibaba Cloud PolarDB-X (DRDS からアップグレード)

    • Alibaba Cloud MaxCompute

    • Alibaba Cloud Object Storage Service (OSS)

    • Alibaba Cloud Tablestore

    • HDFS、Oracle、FTP、DB2 などの自己ホスト型データソース。

  • シナリオ:

前提条件

  • Alibaba Cloud Elasticsearch クラスターが作成され、クラスターの自動インデックス作成機能が有効になっていること。詳細については、「Alibaba Cloud Elasticsearch クラスターの作成」および「YML ファイルの設定」をご参照ください。

    説明

    データを同期できるのは Alibaba Cloud ES インスタンスのみです。セルフマネージド Elasticsearch クラスターはサポートされていません。

  • DataWorks ワークスペースが作成されていること。詳細については、「ワークスペースの作成」をご参照ください。

説明
  • Hadoop クラスターが存在し、データが含まれていること。

  • Hadoop クラスター、ES インスタンス、および DataWorks ワークスペースは、同じリージョンにある必要があります。

  • Hadoop クラスター、ES インスタンス、および DataWorks ワークスペースは、同じタイムゾーンにある必要があります。そうでない場合、時間関連のデータを同期すると、ソースと宛先のデータにタイムゾーンの差が生じる可能性があります。

課金

手順

ステップ 1: 専用リソースグループの購入と作成

データ統合専用リソースグループを購入し、リソースグループを VPC とワークスペースに関連付けます。専用リソースグループは、高速で安定したデータ転送を保証します。

  1. DataWorks コンソールにログインします。

  2. 上部のナビゲーションバーでリージョンを選択します。左側のナビゲーションウィンドウで、[リソースグループ] をクリックします。

  3. [専用リソースグループ] タブで、[旧バージョンのリソースグループを作成] > [データ統合リソースグループ] をクリックします。

  4. [DataWorks 専用リソース (サブスクリプション)] 購入ページで、[専用リソースタイプ][データ統合専用リソース] に設定し、リソースグループ名を入力して、[今すぐ購入] をクリックします。

    詳細については、「ステップ 1: データ統合専用リソースグループを作成する」をご参照ください。

  5. 作成した専用リソースグループの [アクション] 列で、[ネットワーク設定] をクリックして、仮想プライベートクラウド (VPC) をリソースグループにアタッチします。詳細については、「VPC のアタッチ」をご参照ください。

    説明

    この例では、データ統合専用リソースグループを使用して VPC 経由でデータを同期します。データ統合専用リソースグループを使用してインターネット経由でデータを同期する方法については、「IP アドレスホワイトリストの設定」をご参照ください。

    専用リソースグループは、Hadoop クラスターが存在する VPC と Elasticsearch クラスターが存在する VPC に接続する必要があります。これにより、専用リソースグループを使用してデータを同期できます。したがって、専用リソースグループを Hadoop クラスターと Elasticsearch クラスターの [VPC][ゾーン]、および [vSwitch] に関連付ける必要があります。Elasticsearch クラスターの VPC、ゾーン、および vSwitch を表示する方法については、「クラスターの基本情報を表示する」をご参照ください。

    重要

    VPC をアタッチした後、VPC の [vSwitch CIDR ブロック] を Hadoop クラスターと ES インスタンスの内部向けアクセスホワイトリストに追加する必要があります。詳細については、「ES インスタンスのパブリックまたはプライベートアクセスホワイトリストを設定する」をご参照ください。

  6. ページの左上隅にある戻るアイコンをクリックして、[リソースグループ] ページに戻ります。

  7. 作成した専用リソースグループの [アクション] 列で、[ワークスペースのアタッチ] をクリックして、リソースグループをターゲットワークスペースにアタッチします。

    詳細については、「ステップ 2: データ統合専用リソースグループをワークスペースに関連付ける」をご参照ください。

ステップ 2: データソースの追加

  1. DataWorks の [データ統合] ページに移動します。

    1. DataWorks コンソールにログインします。

    2. 左側のナビゲーションウィンドウで、[ワークスペース] をクリックします。

    3. ターゲットワークスペースの [操作] 列で、[クイックアクセス] > [データ統合] を選択します。

  2. 左側のナビゲーションウィンドウで、[データソース] をクリックします。

  3. Hadoop 分散ファイルシステム (HDFS) データソースを追加します。

    1. [データソース] ページで、[データソースの追加] をクリックします。

    2. [データソースの追加] ダイアログボックスで、[HDFS] を検索して選択します。

    3. [HDFS データソースの追加] ページで、データソースパラメーターを設定します。

      詳細については、「HDFS データソースの追加」をご参照ください。

    4. [接続テスト] をクリックします。[接続済み] のステータスは、接続が成功したことを示します。

    5. [完了] をクリックします。

  4. 同様に Elasticsearch データソースを追加します。詳細については、「Elasticsearch データソースの追加」をご参照ください。

ステップ 3: バッチデータ同期タスクの設定と実行

バッチ同期タスクは、専用リソースグループを使用して実行されます。リソースグループは、ソースからデータを取得し、ES インスタンスにデータを書き込みます。

説明
  1. DataWorks のデータ開発ページに移動します。

    1. DataWorks コンソールにログインします。

    2. 左側のナビゲーションウィンドウで、[ワークスペース] をクリックします。

    3. 宛先ワークスペースの [操作] 列で、[移動] [データ開発] を選択します。

  2. オフライン同期タスクを作成します。

    1. 左側のナビゲーションウィンドウのデータ開発 (image アイコン) タブで、[作成] > [ワークフローの作成] を選択し、画面の指示に従ってワークフローを作成します。

    2. 作成したワークフローを右クリックし、[ノードの作成] > [データ統合] > [オフライン同期] を選択します。

    3. [ノードの作成] ダイアログボックスで、ノード名を入力し、[確認] をクリックします。

  3. ネットワークとリソースを設定します。

    1. [データソース] セクションで、[データソース] を HDFS に設定し、[データソース名] を同期するデータソースの名前に設定します。

    2. [マイリソースグループ] セクションで、専用リソースグループを選択します。

    3. [データ宛先] セクションで、[データ宛先] を ES に設定し、[データソース名] を同期するデータソースの名前に設定します。

  4. 次へをクリックします。

  5. タスクを設定します。

    1. [ソース] セクションで、データを同期するテーブルを選択します。

    2. [データ宛先] セクションで、パラメーターを設定します。

    3. [フィールドマッピング] セクションで、[ソースフィールド][宛先フィールド] にマッピングします。

    4. [チャネルコントロール] セクションで、チャネルパラメーターを設定します。

    詳細については、「コードレス UI を使用してオフライン同期タスクを設定する」をご参照ください。

  6. タスクを実行します。

    1. (オプション) タスクのスケジューリングプロパティを設定します。ページの右側で [スケジューリング設定] をクリックし、必要に応じてスケジューリングパラメーターを設定します。パラメーターの詳細については、「スケジューリング設定」をご参照ください。

    2. ノード設定タブの左上隅にある保存アイコンをクリックして、タスクを保存します。

    3. ノード設定タブの左上隅にある送信アイコンをクリックして、タスクを送信します。

      タスクのスケジューリングプロパティを設定した場合、タスクはスケジュールされた間隔で自動的に実行されます。ノード設定タブの左上隅にある実行アイコンをクリックして、タスクをすぐに実行することもできます。

      実行ログに Shell run successfully! メッセージが表示された場合、タスクは正常に実行されています。

ステップ 4: データ同期結果の確認

  1. 宛先の Alibaba Cloud ES インスタンスの Kibana コンソールにログインします。

    詳細については、「Kibana コンソールへのログイン」をご参照ください。

  2. 左側のナビゲーションウィンドウで、[Dev Tools] をクリックします。

  3. [コンソール] で、次のコマンドを実行して同期されたデータを表示します:

    POST /hive_esdoc_good_sale/_search?pretty
    {
    "query": { "match_all": {}}
    }
    説明

    hive_esdoc_good_sale は、データ同期スクリプトの index フィールドに設定した値です。

    データが同期されると、次の結果が返されます。View the synchronized data