Alibaba Cloud Elasticsearch を使用して、分散型リレーショナルデータベースサービス (DRDS) のアップグレードバージョンである PolarDB-X のデータに対して全文検索とセマンティック分析を実行できます。DataWorks の Data Integration サービスを使用すると、PolarDB-X から Alibaba Cloud ES に大量のデータを数分で同期できます。
背景情報
DataWorks は、データ開発、タスクスケジューリング、データ管理などの機能を統合した、ビッグデータ開発とガバナンスのための包括的なプラットフォームです。DataWorks の同期タスクを使用して、さまざまなデータソースから Alibaba Cloud ES にデータをすばやく移動できます。
以下のデータソースがサポートされています。
Alibaba Cloud データベース: ApsaraDB for RDS (MySQL、PostgreSQL、SQL Server)、ApsaraDB for MongoDB、および ApsaraDB for HBase
Alibaba Cloud PolarDB-X (DRDS からアップグレード)
Alibaba Cloud MaxCompute
Alibaba Cloud Object Storage Service (OSS)
Alibaba Cloud Tablestore
セルフホスト型データソース (HDFS、Oracle、FTP、DB2 など)
シナリオ:
データベースまたはテーブルから Alibaba Cloud Elasticsearch にビッグデータをオフラインモードで同期します。詳細については、「バッチ同期タスクを作成してデータベース内のすべてのデータを Elasticsearch に同期する」をご参照ください。
完全および増分ビッグデータをリアルタイムで Alibaba Cloud Elasticsearch に同期します。詳細については、「MySQL データベース全体をリアルタイムで Elasticsearch に同期する」をご参照ください。
前提条件
PolarDB-X インスタンスを作成済みであること。詳細については、 を参照してください。
Alibaba Cloud Elasticsearch クラスタを作成し、クラスタの自動インデックス作成機能を有効にしていること。詳細については、「Alibaba Cloud Elasticsearch クラスタを作成する」および「YML ファイルを構成する」をご参照ください。
DataWorks ワークスペースを作成済みであること。詳細については、「ワークスペースを作成する」をご参照ください。
Alibaba Cloud ES にのみデータを同期できます。セルフマネージド Elasticsearch クラスタはサポートされていません。
PolarDB-X インスタンス、ES インスタンス、および DataWorks ワークスペースは、同じリージョンにある必要があります。
PolarDB-X インスタンス、ES インスタンス、および DataWorks ワークスペースは、同じタイムゾーンにある必要があります。そうでない場合、同期後にソースデータとデスティネーションデータの間にタイムゾーンの差が生じる可能性があります。
課金
ES インスタンス料金の詳細については、「ES 課金項目」をご参照ください。
Data Integration 専用リソースグループの課金の詳細については、「Data Integration 専用リソースグループの課金 (サブスクリプション)」をご参照ください。
手順
手順 1: ソースデータを準備する
PolarDB-X 1.0 インスタンスにデータを挿入します。
詳細については、 および 基本的な SQL 操作 を参照してください。次の図は、このトピックで使用されているテストデータを示しています。

手順 2: 専用リソースグループを購入して作成する
Data Integration 専用リソースグループを購入し、VPC とワークスペースをリソースグループにアタッチします。専用リソースグループは、高速で安定したデータ転送を保証します。
DataWorks コンソール にログインします。
上部のメニューバーで、リージョンを選択します。左側のナビゲーションウィンドウで、[リソースグループ] をクリックします。
[リソースグループ] タブで、 をクリックします。
[DataWorks 専用リソース] [ (サブスクリプション)] ページで、[リソース] [タイプ] を [Data Integration 専用リソースグループ] に設定し、リソースグループの名前を入力して、[今すぐ購入] をクリックして専用リソースグループを購入します。
詳細については、「手順 1: Data Integration 専用リソースグループを作成する」をご参照ください。
作成した専用リソースグループの [アクション] 列で、[ネットワーク設定] をクリックして、仮想プライベートクラウド (VPC) をアタッチします。詳細については、「VPC をアタッチする」をご参照ください。
説明この例では、Data Integration 専用リソースグループを使用して、VPC 経由でデータを同期します。Data Integration 専用リソースグループを使用してインターネット経由でデータを同期する方法の詳細については、「IP アドレスホワイトリストを構成する」をご参照ください。
データを同期するには、専用リソースグループが PolarDB-X インスタンスと Elasticsearch インスタンスが存在する VPC に接続されている必要があります。したがって、専用リソースグループを PolarDB-X インスタンスと Elasticsearch インスタンスの [VPC]、[ゾーン]、および [VSwitch] にアタッチする必要があります。インスタンスの VPC 情報を表示するには、「Elasticsearch インスタンスの基本情報を表示する」をご参照ください。
重要VPC をアタッチした後、[VSwitch] の CIDR ブロックを PolarDB-X インスタンスと Elasticsearch インスタンスのプライベートアクセスホワイトリストに追加する必要があります。詳細については、「Elasticsearch インスタンスのパブリックまたはプライベートアクセスホワイトリストを構成する」をご参照ください。
ページの左上隅にある戻るアイコンをクリックして、[リソースグループ] ページに戻ります。
作成した専用リソースグループを見つけ、[アクション] 列の [ワークスペースのアタッチ] をクリックして、ターゲットワークスペースをリソースグループにアタッチします。
詳細については、「手順 2: Data Integration 専用リソースグループをワークスペースに関連付ける」をご参照ください。
手順 3: データソースを追加する
DataWorks の Data Integration サービスで PolarDB-X と Elasticsearch データソースを追加します。
DataWorks の [Data Integration] ページに移動します。
DataWorks コンソール にログインします。
左側のナビゲーションウィンドウで、[ワークスペース] をクリックします。
ターゲットワークスペースの [操作] 列で、 を選択します。
左側のナビゲーションウィンドウで、[データソース] をクリックします。
PolarDB-X データソースを追加します。
[データソース] ページで、[データソースの追加] をクリックします。
[データソースの追加] ページで、[DRDS] を検索して選択します。
[DRDS データソースの追加] ページで、データソースのパラメータを構成し、接続性をテストします。接続テストが成功したら、[完了] をクリックします。
詳細については、「PolarDB-X データソースを追加する」をご参照ください。
同じ方法で Elasticsearch データソースを追加します。詳細については、「Elasticsearch データソースを追加する」をご参照ください。
手順 4: データ同期タスクを構成して実行する
バッチ同期タスクは専用リソースグループで実行されます。リソースグループは Data Integration のデータソースからデータを取得し、Elasticsearch に書き込みます。
コードレス UI またはコードエディタを使用して、バッチ同期タスクを構成できます。このトピックでは、コードレス UI を例として使用します。コードエディタを使用してバッチ同期タスクを構成する方法の詳細については、「コードエディタを使用してバッチ同期タスクを構成する」および「Elasticsearch Writer」をご参照ください。
このトピックでは、従来のデータ開発 (DataStudio) でオフライン同期タスクを作成する方法について説明します。
DataWorks の [データ開発] ページに移動します。
DataWorks コンソール にログインします。
左側のナビゲーションウィンドウで、[ワークスペース] をクリックします。
ターゲットワークスペースの [アクション] 列で、 を選択します。
バッチ同期タスクを作成します。
左側のナビゲーションウィンドウで、[データ開発] タブに移動します。
アイコンをクリックし、 を選択します。プロンプトに従ってビジネスフローを作成します。作成したビジネスフローを右クリックし、 を選択します。
[ノードの作成] ダイアログボックスで、ノードの名前を入力し、[確認] をクリックします。
ネットワークとリソースを構成します。
[ソース] セクションで、[ソース] を DRDS に設定し、[データソース] をデータを同期するデータソースの名前に設定します。
[リソースグループ] セクションで、専用リソースグループを選択します。
[デスティネーション] セクションで、[デスティネーション] を Elasticsearch に設定し、[データソース] をデータを同期するデータソースの名前に設定します。
[次へ] をクリックします。
タスクを構成します。
[ソース] セクションで、データを同期するテーブルを選択します。
[デスティネーション] セクションで、デスティネーションのパラメータを構成します。
[フィールドマッピング] セクションで、[ソースフィールド] と [ターゲットフィールド] の間のマッピングを構成します。詳細については、「コードレス UI でオフライン同期タスクを構成する」をご参照ください。
この例では、デフォルトの [ソースフィールド] が使用され、[デスティネーションフィールド] のみが変更されます。[デスティネーションフィールド] の右側にある
アイコンをクリックします。表示されるダイアログボックスで、フィールド構成を入力します。{"name":"Name","type":"text"} // {"name":"名前", "type":"text"} {"name":"Platform","type":"text"} // {"name":"プラットフォーム", "type":"text"} {"name":"Year_of_Release","type":"date"} // {"name":"リリース年", "type":"date"} {"name":"Genre","type":"text"} // {"name":"ジャンル", "type":"text"} {"name":"Publisher","type":"text"} // {"name":"出版社", "type":"text"} {"name":"na_Sales","type":"float"} // {"name":"北米売上", "type":"float"} {"name":"EU_Sales","type":"float"} // {"name":"EU 売上", "type":"float"} {"name":"JP_Sales","type":"float"} // {"name":"日本売上", "type":"float"} {"name":"Other_Sales","type":"float"} // {"name":"その他売上", "type":"float"} {"name":"Global_Sales","type":"float"} // {"name":"グローバル売上", "type":"float"} {"name":"Critic_Score","type":"long"} // {"name":"批評家スコア", "type":"long"} {"name":"Critic_Count","type":"long"} // {"name":"批評家数", "type":"long"} {"name":"User_Score","type":"float"} // {"name":"ユーザー スコア", "type":"float"} {"name":"User_Count","type":"long"} // {"name":"ユーザー数", "type":"long"} {"name":"Developer","type":"text"} // {"name":"開発者", "type":"text"} {"name":"Rating","type":"text"} // {"name":"レーティング", "type":"text"}[チャネル制御] セクションで、チャネルパラメータを構成します。
詳細については、「コードレス UI を使用してバッチ同期タスクを構成する」をご参照ください。
タスクを実行します。
(オプション) タスクのスケジューリングプロパティを構成します。ページの右側にある [スケジューリング構成] をクリックし、必要に応じてスケジューリングパラメータを構成します。詳細については、「スケジューリング構成」をご参照ください。
ツールバーの [保存] アイコンをクリックして、タスクを保存します。
ツールバーの [送信] アイコンをクリックして、タスクを送信します。
タスクのスケジューリングプロパティを構成すると、タスクは定期的に実行されます。ツールバーの [実行] アイコンをクリックして、タスクをすぐに実行することもできます。
ログに
Shell run successfully!というメッセージが含まれている場合、タスクは成功です。
手順 5: データ同期の結果を表示する
デスティネーション Alibaba Cloud ES インスタンスの Kibana コンソールにログインします。
詳細については、「Kibana コンソールにログインする」をご参照ください。
左側のナビゲーションウィンドウで、[Dev Tools] をクリックします。
[コンソール] で、次のコマンドを実行して、デスティネーションのデータエントリ数をクエリします。
説明デスティネーションのデータエントリ数とソースのデータエントリ数を比較して、すべてのデータが同期されていることを確認できます。
GET drdstest/_search { "query": { "match_all": {} } }コマンドが成功すると、次の結果が返されます。

特定のフィールドからデータを取得するには、次のコマンドを実行できます。
GET drdstest/_search { "query": { "term": { "Publisher.keyword": { "value": "Nintendo" } } } }コマンドが正常に実行されると、次の出力が返されます。
