このトピックでは、データ統合を使用してオフラインデータを Elasticsearch にインポートする方法について説明します。
前提条件
Alibaba Cloud アカウントとその AccessKey ペアが作成されていること。詳細については、「DataWorks をアクティブ化する」の「前提条件」セクションをご参照ください。
MaxCompute がアクティブ化されていること。MaxCompute をアクティブ化すると、デフォルトの MaxCompute データソースが自動的に生成されます。Alibaba Cloud アカウントを使用して DataWorks コンソールにログオンします。
DataWorks コンソールでワークスペースが作成されていること。これにより、ワークスペース内の他のメンバーと共同でワークフローを開発し、ワークスペース内のデータとノードを維持できます。ワークスペースの作成方法については、「ワークスペースを作成する」をご参照ください。
説明RAM ユーザーとしてデータ統合ノードを作成する場合は、必要な権限を RAM ユーザーに付与します。RAM ユーザーの作成方法と RAM ユーザーへの権限の付与方法については、「RAM ユーザーを準備する」および「ワークスペースレベルのサービスに対する権限を管理する」をご参照ください。
必要なデータソースが準備されていること。詳細については、「MaxCompute データソース」をご参照ください。
バッチ同期ノードを作成する
DataStudio ページに移動します。
DataWorks コンソール にログオンします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションペインで、 を選択します。表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[データ開発に移動] をクリックします。
DataStudio ページの [スケジュールされたワークフロー] ペインで、目的のワークフローを見つけ、その名前をクリックします。[データ統合] を右クリックし、 を選択します。
[ノードの作成] ダイアログボックスで、[名前] パラメーターと [パス] パラメーターを設定します。
説明ノード名は 128 文字を超えることはできません。
[パス] パラメーターは、バッチ同期タスクを作成する自動トリガーワークフローを指定します。自動トリガーワークフローの作成方法については、「ワークフローを作成する」の「自動トリガーワークフローを作成する」セクションをご参照ください。
[確認] をクリックします。
バッチ同期ノードを設定する
バッチ同期ノードの設定タブで、上部ツールバーの [変換スクリプト] アイコンをクリックします。

[ヒント] メッセージで、[OK] をクリックしてコードエディターに切り替えます。
次のサンプルコードに基づいて、コードエディターでコードを編集します。
{ "configuration": { "setting": { "speed": { "concurrent": "1", // 並列スレッドの数。 "mbps": 1,// 最大転送速度。単位:MB/s。 } }, "reader": { "parameter": { "connection": [ { "table": [ "es_table" // ソーステーブルの名前。 ], "datasource": "px_mysql_OK" // ソースの名前。追加したソースの名前を使用することをお勧めします。 } ], "column": [ // データを読み取る列の名前。 "col_ip", "col_double", "col_long", "col_integer", "col_keyword", "col_text", "col_geo_point", "col_date" ], "where": "", // WHERE 句。 }, "plugin": "mysql" }, "writer": { "parameter": { "cleanup": true, // Elasticsearch にデータをインポートするたびに元のデータをクリアするかどうかを指定します。すべてのデータをインポートするか、インデックスを再作成する場合は、このパラメーターを true に設定します。増分データをインポートする場合は、このパラメーターを false に設定します。 "accessKey": "nimda", // X-Pack プラグインを使用する場合は、AccessKey シークレットを入力します。それ以外の場合は、ヌル文字列を入力します。X-Pack プラグインは Alibaba Cloud Elasticsearch に使用されます。したがって、AccessKey シークレットを入力する必要があります。 "index": "datax_test", // Elasticsearch クラスター内のインデックスの名前。インデックスがない場合は、プラグインによって自動的に作成されます。 "alias": "test-1-alias", // データのインポート後に追加するエイリアス。 "settings": { "index": { "number_of_replicas": 0, "number_of_shards": 1 } }, "batchSize": 1000, // 一度に書き込むデータレコードの数。 "accessId": "default", // X-Pack プラグインを使用する場合は、AccessKey ID を入力します。それ以外の場合は、ヌル文字列を入力します。X-Pack プラグインは Alibaba Cloud Elasticsearch に使用されます。したがって、AccessKey ID を入力する必要があります。 "endpoint": "http://xxx.xxxx.xxx:xxxx", // Elasticsearch クラスターのエンドポイント。Elasticsearch コンソールでエンドポイントを表示できます。 "splitter": ",", // 配列をインポートする場合は、区切り文字を指定します。 "indexType": "default", // Elasticsearch クラスター内のインデックスのタイプ名。 "aliasMode": "append", // データのインポート後にエイリアスを追加するモード。有効な値:append と exclusive。append:新しいエイリアスを追加します。exclusive:新しいエイリアスのみを保持します。 "column": [ // Elasticsearch の列。列の順序は、リーダーで指定された列の順序と同じです。 { "name": "col_ip", "type": "ip"// テキストタイプ。デフォルトのアナライザーが使用されます。 }, { "name": "col_double", "type": "string" }, { "name": "col_long", "type": "long" }, { "name": "col_integer", "type": "integer" }, { "name": "col_keyword", "type": "keyword" }, { "name": "col_text", "type": "text" }, { "name": "col_geo_point", "type": "geo_point" }, { "name": "col_date", "type": "date" } ], "discovery": false// 自動検出を有効にするかどうかを指定します。このパラメーターを true に設定します。 }, "plugin": "elasticsearch"// プラグイン名。名前は Elasticsearch Writer です。値を変更する必要はありません。 } }, "type": "job", "version": "1.0" }バッチ同期ノードの設定タブの上部ツールバーで、
アイコンをクリックし、次に
アイコンをクリックします。説明Elasticsearch にデータをインポートできるのは、コードエディターのみです。
バッチ同期ノードを保存した後に
アイコンをクリックすると、ノードはすぐに実行されます。
アイコンをクリックして、バッチ同期ノードをスケジューリングシステムにコミットすることもできます。スケジューリングシステムは、ノードに対して設定されたプロパティに基づいて、翌日以降にバッチ同期ノードを定期的に実行します。
追加情報
他のタイプのデータソースを使用する同期ノードの設定方法については、「MaxCompute データソース」のトピックをご参照ください。