Alibaba Cloud は、幅広いクラウドストレージとデータベースサービスを提供します。 これらのサービスに保存されているデータの分析や検索をする場合、Data Integration を使用してデータを Elasticsearch にレプリケートしてから、データのクエリや分析をします。 Data Integration を使用すると、最小 5 分間隔でデータをレプリケートできます。

データのレプリケートにより、パブリックネットワークトラフィックが生成され、課金が発生する可能性があります。

前提条件

オンプレミスデータの分析や検索をするには、次の手順に従います。

  • テーブルの作成と表示 、およびデータのインポートを行います。 Hadoop から MaxCompute へデータを移行してから、データを同期します。 この例では、以下のテーブルスキームとデータを使用します。

  • Data Integration でレプリケートされたデータを保存するための Elasticsearch インスタンスを作成します。

  • Elasticsearch と同じ VPC を共有する ECS インスタンスを購入します。 この ECS インスタンスでデータを取得し、Elasticsearch タスクを実行します (これらのタスクは、Data Integration によって送信されます)。

  • Data Integration を有効化し、タスクを実行可能なリソースとして ECS インスタンスを Data Integration に登録します。

  • データ同期スクリプトを設定し、定期的に実行します。

手順

  1. Elasticsearch インスタンスと ECS インスタンスの作成
    1. IPv4 VPC の作成を行います。 この例では、VPC を中国 (杭州) リージョンに作成します。 インスタンス名は es_test_vpc、VSwitch 名は es_test_switch です。

    2. Elasticsearch コンソールにログインし 、Elasticsearch インスタンスを作成します。
      前の手順で作成した VPC と同じリージョンVPC VSwitch を選択するようにします。
    3. Elasticsearch インスタンスと同じ VPC にある ECS インスタンスを購入し、パブリック IP アドレスを割り当てるか、EIP を有効化します。 コストを節約するため、要件を満たす既存の ECS インスタンスを使用することを推奨します。

      この例では、杭州 (中国東部1) ゾーン F に ECS インスタンスを作成します。 [64-bit CentOS 7.4][パブリック IP の割り当て] を選択して、ネットワークを設定します (次図)。

      • CentOS 6、CentOS 7、または Aliyun Linux を使用することを推奨します。
      • ECS インスタンスで MaxCompute タスクまたはデータ同期タスクを実行する場合、ECS インスタンスの Python バージョンが 2.6 か 2.7 であることを確認する必要があります。 CentOS 5 をインストールすると、Python 2.4 もインストールされます。 他の CentOS バージョンの Python は 2.6 以降です。
      • ECS インスタンスにパブリック IP アドレスが割り当てられていることを確認します。
  2. データ同期タスクの設定
    1. DataWorks コンソールにログインしてプロジェクトを作成します。 この例では、 bigdata_DOC という名前の DataWorks プロジェクトを使用します。

      • Data Integration が既に有効化されている場合、次のページが表示されます。
      • Data Integration が有効化されていない場合、次のページが表示されます。Data Integration を有効化するには、次の手順を実行します。 このサービスを有効化すると、料金が発生します。料金設定ルールに基づいて見積もることができます。
    2. DataWorks プロジェクトの下にある [Data Integration] をクリックします。

    3. リソースグループを作成します。

      1. [Data Integration] ページの左側のナビゲーションウィンドウで、[Resource Group] を選択し、[Add Resource Group] をクリックします。

      2. リソースグループを追加するには、次の手順に従います。

        1. リソースグループを作成します。リソースグループ名を入力します。 この例では、リソースグループに es_test_resource という名前を付けます。
        2. サーバーを追加します。
          • ECS UUID:手順 3: インスタンスへの接続。 ECS インスタンスにログインし、dmidecode | grep UUID コマンドを実行して戻り値を取得します。

          • Machine IP/Machine CPU (Cores)/Memory Size (GB):ECS インスタンスのパブリック IP アドレス、CPU コア、メモリサイズを指定します。 ECS コンソーにログインし、ECS インスタンス名をクリックすると、[設定情報] に、情報が表示されます。

        3. エージェントをインストールします。次の手順に従って、エージェントのインストールを完了します。 この例では、VPC を使用します。 したがって、インスタンスのポート 8000 を開く必要はありません。
        4. 接続を確認します。接続が正常に確立されると、ステータスが [Available] に変わります。 ステータスが [Unavailable] の場合、ECS インスタンスにログインし、tail -f /home/admin/alisatasknode/logs/heartbeat.log コマンドを実行して DataWorks と ECS インスタンス間のハートビートメッセージがタイムアウトしたかどうかを確認します。
    4. データソースを追加します。

      1. [Data Integration] ページの左側のナビゲーションウィンドウで、[Data Source] を選択し、[Add Data Source] をクリックします。

      2. ソースタイプとして [MaxCompute] を選択します。

      3. データソースに関する情報を入力します。 この例では、odps_es という名前のデータソースを作成します (次図)。

        • ODPS workspace name:DataWorks の [Data Analytics] ページの左上隅のアイコンの右側に、テーブルのワークスペース名が表示されます (次図) 。

        • AccessKeyId /AccessKeySecrete:ユーザー名の上にポインターを移動し、[User Info] を選択します (次図)。 [Personal Account] ページでアバターの上にポインターを移動し、[accesskeys] をクリックします (次図)。
    5. 同期タスクを設定します。

      1. [Data Analytics] ページの左側のナビゲーションウィンドウで、[Data Analytics] アイコンをクリックし、[Business Flow] をクリックします。
      2. ターゲットビジネスフローをクリックし、[Data Integration] を選択し、[Create Data Integration Node] > [Data Sync] を選択してから、同期ノード名を入力します。
      3. 同期ノードが正常に作成されたら、新しい同期ノードページの上部にある [Switch to Script Mode] アイコンをクリックし、[Confirm] を選択します。
      4. [Script Mode] ページの上部にある [Apply Template] アイコンをクリックします。 [Source Type]、[Data Source]、[Destination Type]、[Data Source] のオプションに情報を入力し、[OK] をクリックして初期スクリプトを生成します。
      5. データ同期スクリプトの設定を行います。 Elasticsearch の設定ルールの詳細は、「writer プラグインの設定」をご参照ください。
        • 同期スクリプトの設定には、Reader、Writer、Setting の 3 つの部分があります。 Reader では、データを同期するソースクラウドサービスを設定します。 Write では、Elasticsearch の設定ファイルを設定します。 Setting では、パケットロスと最大同時タスクを設定します。
        • Endpoint には、Elasticsearch インスタンスのプライベートまたはパブリック IP アドレスを指定します。 この例では、プライベート IP アドレスを使用します。 したがって、ホワイトリストは不要です。 外部 IP アドレスを使用する場合、Elasticsearch の [ネットワークとスナップショット] ページで、Elasticsearch へのアクセスが許可されているパブリック IP アドレスを含むホワイトリストを設定する必要があります。 ホワイトリストには、DataWorks サーバーの IP アドレスと、使用するリソースグループを指定する必要があります。
        • Elasticsearch インスタンスへのログインに使用されるユーザー名とパスワードを Elasticsearch Writer の accessIdaccesskey に設定する必要があります。
        • Elasticsearch インスタンスのインデックス名を index に入力します。 Elasticsearch インスタンスのデータにアクセスするには、このインデックス名を使用する必要があります。 この例では、es_index という名前の インデックスを使用します。
        • MaxCompute テーブルがパーティションテーブルの場合、partition フィールドにパーティション情報を設定する必要があります。 この例のパーティション情報は、pt=1 です。
        サンプル設定コード:
        {
        "configuration": {
        "reader": {
        "plugin": "odps",
        "parameter": {
          "partition": "pt=1",
          "datasource": "odps_es",
          "column": [
            "create_time",
            "category",
            "brand",
            "buyer_id",
            "trans_num",
            "trans_amount",
            "click_cnt"
          ],
          "table": "hive_doc_good_sale"
        }
        },
        "writer": {
        "plugin": "elasticsearch",
        "parameter": {
          "accessId": "elastic",
          "endpoint": "http://es-cn-mpXXXXXXX.elasticsearch.aliyuncs.com:9200",
          "indexType": "elasticsearch",
          "accessKey": "XXXXXX",
          "cleanup": true,
          "discovery": false,
          "column": [
            {
              "name": "create_time",
              "type": "string"
            },
            {
              "name": "category",
              "type": "string"
            },
            {
              "name": "brand",
              "type": "string"
            },
            {
              "name": "buyer_id",
              "type": "string"
            },
            {
              "name": "trans_num",
              "type": "long"
            },
            {
              "name": "trans_amount",
              "type": "double"
            },
            {
              "name": "click_cnt",
              "type": "long"
            }
          ],
          "index": "es_index",
          "batchSize": 1000,
          "splitter": ",",
        }
        },
        "setting": {
        "errorLimit": {
          "record": "0"
        },
        "speed": {
          "throttle": false,
          "concurrent": 1,
          "mbps": "1",
          "dmu": 1
        }
        }
        },
        "Type": "job ",
        "version": "1.0"
        }
      6. スクリプトが同期されたら、[Run] をクリックして、ODPS データを Elasticsearch と同期させます。

  3. 結果の検証
    1. Elasticsearch コンソールにログインし、右上隅の Kibana コンソールをクリックし、[Dev Tools] を選択します。

    2. 次のコマンドを実行して、データが Elasticsearch に正常にレプリケートされたことを確認します。
      POST /es_index/_search? pretty
      {
      "query": { "match_all": {}}
      }

      es_index は、データ同期中の index フィールドの値です。

      データが正常に同期されると、次のページが表示されます。

    3. 次のコマンドを実行して、trans_num フィールドを基準にドキュメントをソートします。

      POST /es_index/_search? pretty
      {
      "query": { "match_all": {} },
      "sort": { "trans_num": { "order": "desc" } }
      }
    4. 次のコマンドを実行して、ドキュメント内の category フィールドと brand フィールドを検索します。

      POST /es_index/_search? pretty
      {
      "query": { "match_all": {} },
      "_source": ["category", "brand"]
      }
    5. 次のコマンドを実行して、 categoryfresh のドキュメントをクエリします。

      POST /es_index/_search? pretty
      {
      "query": { "match": {"category":"fresh"} }
      }

      詳細は、「Alibaba Cloud Elasticsearch インスタンスへのアクセス」と『Elastic help center』をご参照ください。

よくある質問

Elasticsearch インスタンスに接続するときにエラーが発生します
  1. 同期スクリプトを実行する前に、前の手順で作成したリソースグループが、右側の [configuration tasks resources group] メニューで選択されているかどうかを確認してください。

    • リソースグループが選択されている場合、次の手順に進みます。
    • リソースグループが選択されていない場合、右側の [configuration tasks resources group] メニューで、作成したリソースグループを選択し、[Run] をクリックします。
  2. endpoint、accessId、accesskey など、同期スクリプトの設定が正しいかどうかを確認してください。 endpoint には、Elasticsearch インスタンスのプライベートまたはパブリック IP アドレスを指定します。 パブリック IP アドレスを使用する場合、ホワイトリストを設定してください。 accessId には、Elasticsearch インスタンスへのアクセスに使用されるユーザー名を指定します。デフォルトは elastic です。 accesskey には、Elasticsearch インスタンスへのアクセスに使用されるパスワードを指定します。