すべてのプロダクト
Search
ドキュメントセンター

Elasticsearch:DataWorks を使用して PolarDB-X (DRDS) から Alibaba Cloud ES にデータを同期する

最終更新日:Aug 18, 2025

Alibaba Cloud Elasticsearch を使用して、分散型リレーショナルデータベースサービス (DRDS) のアップグレードバージョンである PolarDB-X のデータに対して全文検索とセマンティック分析を実行できます。DataWorks の Data Integration サービスを使用すると、PolarDB-X から Alibaba Cloud ES に大量のデータを数分で同期できます。

背景情報

DataWorks は、データ開発、タスクスケジューリング、データ管理などの機能を統合した、ビッグデータ開発とガバナンスのための包括的なプラットフォームです。DataWorks の同期タスクを使用して、さまざまなデータソースから Alibaba Cloud ES にデータをすばやく移動できます。

  • 以下のデータソースがサポートされています。

    • Alibaba Cloud データベース: ApsaraDB for RDS (MySQL、PostgreSQL、SQL Server)、ApsaraDB for MongoDB、および ApsaraDB for HBase

    • Alibaba Cloud PolarDB-X (DRDS からアップグレード)

    • Alibaba Cloud MaxCompute

    • Alibaba Cloud Object Storage Service (OSS)

    • Alibaba Cloud Tablestore

    • セルフホスト型データソース (HDFS、Oracle、FTP、DB2 など)

  • シナリオ:

前提条件

説明
  • Alibaba Cloud ES にのみデータを同期できます。セルフマネージド Elasticsearch クラスタはサポートされていません。

  • PolarDB-X インスタンス、ES インスタンス、および DataWorks ワークスペースは、同じリージョンにある必要があります。

  • PolarDB-X インスタンス、ES インスタンス、および DataWorks ワークスペースは、同じタイムゾーンにある必要があります。そうでない場合、同期後にソースデータとデスティネーションデータの間にタイムゾーンの差が生じる可能性があります。

課金

手順

手順 1: ソースデータを準備する

  1. PolarDB-X 1.0 インスタンスにデータを挿入します。

    詳細については、 および 基本的な SQL 操作 を参照してください。次の図は、このトピックで使用されているテストデータを示しています。Test data

手順 2: 専用リソースグループを購入して作成する

Data Integration 専用リソースグループを購入し、VPC とワークスペースをリソースグループにアタッチします。専用リソースグループは、高速で安定したデータ転送を保証します。

  1. DataWorks コンソール にログインします。

  2. 上部のメニューバーで、リージョンを選択します。左側のナビゲーションウィンドウで、[リソースグループ] をクリックします。

  3. [リソースグループ] タブで、[リソースグループの作成] > [Data Integration リソースグループ] をクリックします。

  4. [DataWorks 専用リソース] [ (サブスクリプション)] ページで、[リソース] [タイプ][Data Integration 専用リソースグループ] に設定し、リソースグループの名前を入力して、[今すぐ購入] をクリックして専用リソースグループを購入します。

    詳細については、「手順 1: Data Integration 専用リソースグループを作成する」をご参照ください。

  5. 作成した専用リソースグループの [アクション] 列で、[ネットワーク設定] をクリックして、仮想プライベートクラウド (VPC) をアタッチします。詳細については、「VPC をアタッチする」をご参照ください。

    説明

    この例では、Data Integration 専用リソースグループを使用して、VPC 経由でデータを同期します。Data Integration 専用リソースグループを使用してインターネット経由でデータを同期する方法の詳細については、「IP アドレスホワイトリストを構成する」をご参照ください。

    データを同期するには、専用リソースグループが PolarDB-X インスタンスと Elasticsearch インスタンスが存在する VPC に接続されている必要があります。したがって、専用リソースグループを PolarDB-X インスタンスと Elasticsearch インスタンスの [VPC][ゾーン]、および [VSwitch] にアタッチする必要があります。インスタンスの VPC 情報を表示するには、「Elasticsearch インスタンスの基本情報を表示する」をご参照ください。

    重要

    VPC をアタッチした後、[VSwitch] の CIDR ブロックを PolarDB-X インスタンスと Elasticsearch インスタンスのプライベートアクセスホワイトリストに追加する必要があります。詳細については、「Elasticsearch インスタンスのパブリックまたはプライベートアクセスホワイトリストを構成する」をご参照ください。

  6. ページの左上隅にある戻るアイコンをクリックして、[リソースグループ] ページに戻ります。

  7. 作成した専用リソースグループを見つけ、[アクション] 列の [ワークスペースのアタッチ] をクリックして、ターゲットワークスペースをリソースグループにアタッチします。

    詳細については、「手順 2: Data Integration 専用リソースグループをワークスペースに関連付ける」をご参照ください。

手順 3: データソースを追加する

DataWorks の Data Integration サービスで PolarDB-X と Elasticsearch データソースを追加します。

  1. DataWorks の [Data Integration] ページに移動します。

    1. DataWorks コンソール にログインします。

    2. 左側のナビゲーションウィンドウで、[ワークスペース] をクリックします。

    3. ターゲットワークスペースの [操作] 列で、[クイックアクセス] > [Data Integration] を選択します。

  2. 左側のナビゲーションウィンドウで、[データソース] をクリックします。

  3. PolarDB-X データソースを追加します。

  4. [データソース] ページで、[データソースの追加] をクリックします。

  5. [データソースの追加] ページで、[DRDS] を検索して選択します。

  6. [DRDS データソースの追加] ページで、データソースのパラメータを構成し、接続性をテストします。接続テストが成功したら、[完了] をクリックします。

    詳細については、「PolarDB-X データソースを追加する」をご参照ください。

  7. 同じ方法で Elasticsearch データソースを追加します。詳細については、「Elasticsearch データソースを追加する」をご参照ください。

手順 4: データ同期タスクを構成して実行する

バッチ同期タスクは専用リソースグループで実行されます。リソースグループは Data Integration のデータソースからデータを取得し、Elasticsearch に書き込みます。

説明
  1. DataWorks の [データ開発] ページに移動します。

    1. DataWorks コンソール にログインします。

    2. 左側のナビゲーションウィンドウで、[ワークスペース] をクリックします。

    3. ターゲットワークスペースの [アクション] 列で、[クイックアクセス] > [データ開発] を選択します。

  2. バッチ同期タスクを作成します。

    1. 左側のナビゲーションウィンドウで、[データ開発] タブに移動します。image アイコンをクリックし、[新規] > [ビジネスフロー] を選択します。プロンプトに従ってビジネスフローを作成します。

    2. 作成したビジネスフローを右クリックし、[ノードの作成] > [バッチ同期] を選択します。

    3. [ノードの作成] ダイアログボックスで、ノードの名前を入力し、[確認] をクリックします。

  3. ネットワークとリソースを構成します。

    1. [ソース] セクションで、[ソース] を DRDS に設定し、[データソース] をデータを同期するデータソースの名前に設定します。

    2. [リソースグループ] セクションで、専用リソースグループを選択します。

    3. [デスティネーション] セクションで、[デスティネーション] を Elasticsearch に設定し、[データソース] をデータを同期するデータソースの名前に設定します。

  4. [次へ] をクリックします。

  5. タスクを構成します。

    1. [ソース] セクションで、データを同期するテーブルを選択します。

    2. [デスティネーション] セクションで、デスティネーションのパラメータを構成します。

    3. [フィールドマッピング] セクションで、[ソースフィールド][ターゲットフィールド] の間のマッピングを構成します。詳細については、「コードレス UI でオフライン同期タスクを構成する」をご参照ください。

      この例では、デフォルトの [ソースフィールド] が使用され、[デスティネーションフィールド] のみが変更されます。[デスティネーションフィールド] の右側にある 修改字段图标 アイコンをクリックします。表示されるダイアログボックスで、フィールド構成を入力します。

      {"name":"Name","type":"text"} // {"name":"名前", "type":"text"}
      {"name":"Platform","type":"text"} // {"name":"プラットフォーム", "type":"text"}
      {"name":"Year_of_Release","type":"date"} // {"name":"リリース年", "type":"date"}
      {"name":"Genre","type":"text"} // {"name":"ジャンル", "type":"text"}
      {"name":"Publisher","type":"text"} // {"name":"出版社", "type":"text"}
      {"name":"na_Sales","type":"float"} // {"name":"北米売上", "type":"float"}
      {"name":"EU_Sales","type":"float"} // {"name":"EU 売上", "type":"float"}
      {"name":"JP_Sales","type":"float"} // {"name":"日本売上", "type":"float"}
      {"name":"Other_Sales","type":"float"} // {"name":"その他売上", "type":"float"}
      {"name":"Global_Sales","type":"float"} // {"name":"グローバル売上", "type":"float"}
      {"name":"Critic_Score","type":"long"} // {"name":"批評家スコア", "type":"long"}
      {"name":"Critic_Count","type":"long"} // {"name":"批評家数", "type":"long"}
      {"name":"User_Score","type":"float"} // {"name":"ユーザー スコア", "type":"float"}
      {"name":"User_Count","type":"long"} // {"name":"ユーザー数", "type":"long"}
      {"name":"Developer","type":"text"} // {"name":"開発者", "type":"text"}
      {"name":"Rating","type":"text"} // {"name":"レーティング", "type":"text"}
      
    4. [チャネル制御] セクションで、チャネルパラメータを構成します。

    詳細については、「コードレス UI を使用してバッチ同期タスクを構成する」をご参照ください。

  6. タスクを実行します。

    1. (オプション) タスクのスケジューリングプロパティを構成します。ページの右側にある [スケジューリング構成] をクリックし、必要に応じてスケジューリングパラメータを構成します。詳細については、「スケジューリング構成」をご参照ください。

    2. ツールバーの [保存] アイコンをクリックして、タスクを保存します。

    3. ツールバーの [送信] アイコンをクリックして、タスクを送信します。

      タスクのスケジューリングプロパティを構成すると、タスクは定期的に実行されます。ツールバーの [実行] アイコンをクリックして、タスクをすぐに実行することもできます。

      ログに Shell run successfully! というメッセージが含まれている場合、タスクは成功です。

手順 5: データ同期の結果を表示する

  1. デスティネーション Alibaba Cloud ES インスタンスの Kibana コンソールにログインします。

    詳細については、「Kibana コンソールにログインする」をご参照ください。

  2. 左側のナビゲーションウィンドウで、[Dev Tools] をクリックします。

  3. [コンソール] で、次のコマンドを実行して、デスティネーションのデータエントリ数をクエリします。

    説明

    デスティネーションのデータエントリ数とソースのデータエントリ数を比較して、すべてのデータが同期されていることを確認できます。

    GET drdstest/_search
    {
      "query": {
        "match_all": {}
      }
    }

    コマンドが成功すると、次の結果が返されます。查看目标端数据量

  4. 特定のフィールドからデータを取得するには、次のコマンドを実行できます。

    GET drdstest/_search
    {
      "query": {
        "term": {
          "Publisher.keyword": {
            "value": "Nintendo"
          }
        }
      }
    }

    コマンドが正常に実行されると、次の出力が返されます。对字段进行数据检索