すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:KafkaからSimple Log Serviceへのデータのインポート

最終更新日:Aug 30, 2024

このトピックでは、KafkaからSimple Log Serviceにデータをインポートする方法について説明します。 Simple Log Serviceにデータをインポートした後、Simple Log Serviceでデータを照会、分析、変換できます。

前提条件

  • Kafkaクラスターが利用可能です。

  • プロジェクトと Logstore が作成済みである必要があります。 詳細については、「プロジェクトの作成」および「Logstore の作成」をご参照ください。

サポートされているバージョン

Kafka 2.2.0以降のみがサポートされています。

データインポート設定の作成

  1. Simple Log Serviceコンソール.

  2. クイックデータのインポートセクションで、[データのインポート] をクリックします。 表示されるダイアログボックスの [データインポート] タブで、[Kafka-データインポート] をクリックします。

  3. プロジェクトとLogstoreを選択します。 そして、[次へ] をクリックします。

  4. データインポート設定のパラメーターを設定します。

    1. 設定のインポートステップで、次のパラメータを設定します。

      パラメーター

      説明

      ジョブ名

      インポートジョブのID。

      表示名

      インポートジョブの名前。

      ジョブの説明

      インポートジョブの説明。

      Endpoint

      Kafkaクラスターへの接続に使用されるアドレス。 アドレスは、Kafkaクラスター用に設定されているbootstrap.serversフィールドから取得できます。 複数の IP アドレスはカンマ (,) で区切ります。

      • Alibaba Cloud ApsaraMQ for Kafkaインスタンスによって提供されるKafkaクラスターを使用する場合、インスタンスエンドポイントのIPアドレスまたはドメイン名を入力する必要があります。

      • Alibaba Cloud Elastic Compute Service (ECS) インスタンスにデプロイされているKafkaクラスターを使用する場合、ECSインスタンスのIPアドレスを入力する必要があります。

      • 他のKafkaクラスターを使用する場合は、KafkaクラスターのブローカーのパブリックIPアドレスまたはドメイン名を入力する必要があります。

      トピック

      カフカのトピック。 複数入力するときは、コンマ ( , ) で区切ります。

      消費者グループ

      Alibaba Cloud ApsaraMQ for Kafkaインスタンスによって提供されるKafkaクラスターを使用し、フレキシブルグループ作成機能を有効にしない場合、コンシューマーグループを選択する必要があります。 機能の詳細については、「フレキシブルグループ作成機能の使用」をご参照ください。 コンシューマーグループの作成方法の詳細については、「コンシューマーグループの作成」をご参照ください。

      開始位置

      システムがデータのインポートを開始する位置。 有効な値:

      • 初期: システムは、存在する最初のKafkaデータエントリからデータのインポートを開始します。

      • Latest: 生成された最新のKafkaデータエントリからデータのインポートが開始されます。

      データフォーマット

      インポートするデータの形式。 有効な値:

      • シンプルモード: インポートするデータが1行形式の場合は、シンプルモードを選択します。

      • JSON文字列: インポートするデータがJSON形式の場合、JSON文字列を選択します。 インポートジョブは、インポートされたデータをキーと値のペアに解析し、データの最初のレイヤーのみを解析します。

      配列要素の解析

      配列要素の解析 をオンにすると、JSON配列形式のデータが配列要素に基づいて複数のデータに分割され、データがインポートされます。

      エンコード形式

      インポートするデータのエンコード形式または文字セット。 有効な値: UTF-8とGBK。

      VPCベースのインスタンスID

      ApsaraMQ for KafkaインスタンスまたはECSインスタンスが仮想プライベートクラウド (VPC) にある場合、VPCのIDを指定して、Simple Log ServiceがAlibaba cloudの内部ネットワークを介してKafkaクラスターからデータを読み取ることができます。

      Alibaba Cloudの内部ネットワークを介して読み取られたデータは、より高いセキュリティとネットワークの安定性を提供します。

      重要

      100.104.0.0/16 CIDRブロックからKafkaクラスターにアクセスできることを確認します。

      時間設定

      時刻フィールド

      ログ時間を記録するために使用される時間フィールド。 Kafkaデータに時間を表す列の名前を入力できます。

      正規表現で時間を抽出

      データ形式シンプルモードに設定した場合、Kafkaデータから時間を抽出する正規表現を指定する必要があります。

      たとえば、Kafkaデータエントリがmessage with time 2022-08-08 14:20:20の場合、正規表現to Extract Time\d \d \d \d-\d \d \d \d \d \d \d \d:\d \d:\d \d \dを設定できます

      時刻フィールドの形式

      時刻フィールドの値を解析するために使用される時刻の形式です。

      • Java SimpleDateFormatでサポートされている時間形式を指定できます。 例: yyyy-MM-dd HH:mm:ss。 時間形式の構文の詳細については、「Class SimpleDateFormat」をご参照ください。 一般的な時間形式の詳細については、「時間形式」をご参照ください。

      • エポック時間形式を指定できます。 有効な値: epoch、epochMillis、epochMacro、epochNano。

      タイムゾーン

      タイムフィールドのタイムゾーン。

      タイムフィールド形式をエポック時間形式に設定した場合、タイムゾーンを設定する必要はありません。

      デフォルトのタイムソース

      時間抽出情報が提供されていない場合、または時間抽出が失敗した場合、システムは指定した時間ソースを使用します。 有効な値: Current System TimeおよびKafka Message Timestamp。

      詳細設定

      ログコンテキスト

      ログコンテキストをオンにすると、コンテキストクエリ機能を使用できます。 ソースKafkaパーティションにインポートするデータのコンテキストを表示できます。

      通信プロトコル

      Kafkaクラスターへの接続に使用される通信プロトコルに関する情報。 インターネット経由でデータをインポートする場合は、Simple Log ServiceとKafkaクラスター間の接続を暗号化し、ユーザー認証を実装することをお勧めします。 次のサンプルコードに例を示します。

      protocolフィールドは、plaintext、ssl、sasl_plaintext、およびsasl_sslの値をサポートします。 推奨値はsasl_sslで、接続暗号化とユーザー認証が必要です。

      protocolをsasl_plaintexまたはsasl_sslに設定した場合、saslノードも設定する必要があります。 saslノードの下のmechanismフィールドは、PLAIN、SCRAM-SHA-256、およびSCRAM-SHA-512の値をサポートします。 このフィールドは、ユーザー名とパスワードの認証メカニズムを指定します。

      {
          "protocol":"sasl_plaintext",
           "sasl":{
              "mechanism":"PLAIN",
              "username":"xxx",
              "password":"yyy"
          }
      }

      プライベートドメイン解決

      ECSインスタンスにデプロイされたKafkaクラスターを使用し、クラスター内のブローカーが内部エンドポイントを介して相互に接続されている場合、ブローカーごとにECSインスタンスのエンドポイントとIPアドレスを指定する必要があります。 例:

      {
      "hostname#1":"192.168.XX.XX",
      "hostname#2":"192.168.XX.XX",
      "hostname#3":"192.168.XX.XX"
      }
    2. プレビューインポート結果をプレビューします。

    3. 結果を確認したら、をクリックします。次へ.

  5. データをプレビューし、インデックスを設定し、[次へ] をクリックします。

    デフォルトでは、Log Serviceでフルテキストインデックスが有効になっています。 手動モードまたは自動モードで収集したログに基づいてフィールドインデックスを設定することもできます。 自動モードでフィールドインデックスを設定するには、[自動インデックス生成] をクリックします。 これにより、Log Serviceは自動的にフィールドインデックスを作成します。 詳細については、「インデックスの作成」をご参照ください。

    重要

    ログをクエリおよび分析する場合は、フルテキストインデックス作成またはフィールドインデックス作成を有効にする必要があります。 フルテキストインデックスとフィールドインデックスの両方を有効にすると、フィールドインデックスのみが使用されます。

  6. [クエリログ] をクリックします。 クエリと分析ページで、Kafkaデータがインポートされているかどうかを確認します。

    約 1 分待ちます。 必要なKafkaデータが存在する場合、データがインポートされます。

データインポート設定の表示

データインポート設定を作成した後、Simple Log Serviceコンソールで設定の詳細と関連する統計レポートを表示できます。

  1. [プロジェクト] セクションで、データインポート設定が属するプロジェクトをクリックします。

  2. データインポート設定が属するLogstoreを見つけてクリックし、データ収集 > データインポートデータインポート設定の名前をクリックします。

  3. インポート設定の概要ページで、データインポート設定の基本情報と統計レポートを表示します。

次のステップ

[インポート設定の概要] ページでは、データインポート設定に対して以下の操作を実行できます。

  • データインポート設定の変更

    データインポート設定を変更するには、[設定の編集] をクリックします。 詳細については、「データインポート設定の作成」をご参照ください。

  • データインポート設定の削除

    データインポート設定を削除するには、[設定の削除] をクリックします。

    警告

    データインポート設定が削除された後、復元することはできません。

  • インポートジョブの停止

    データインポートジョブを停止するには、[停止] をクリックします。

よくある質問

問題の説明

考えられる原因

解決策

プレビュー中にブローカー接続エラーが発生しました。 エラーコード: ブローカートランスポートの失敗。

  • Kafkaクラスターに接続するために指定されたアドレスが無効です。

  • Kafkaクラスターにアクセスするためにインポートジョブによって使用されるIPアドレスは、クラスターのホワイトリストに追加されません。 その結果、インポートジョブはクラスターにアクセスできません。

  • KafkaクラスターはAlibaba Cloudにデプロイされていますが、VPCベースのインスタンスIDパラメーターは設定されていません。

  • Kafkaクラスターに指定されたアドレスが有効であることを確認してください。

  • インポートジョブがKafkaクラスターにアクセスするために使用するIPアドレスをクラスターのホワイトリストに追加します。 詳細については、「IPアドレスのホワイトリスト」をご参照ください。

  • Alibaba Cloudの内部ネットワークを介してKafkaクラスターからデータをインポートする場合は、VPCベースのインスタンスIDパラメーターが設定されていることを確認してください。

プレビュー中にタイムアウトエラーが発生します。 エラーコード: preview request timed out.

データインポート設定で指定されているKafkaトピックにはデータが含まれていません。

Kafkaトピックにデータが含まれていない場合は、トピックにデータを書き込み、データを再度プレビューします。

インポートしたデータに文字化けが存在します。

データインポート設定で指定されたエンコード形式は、要件を満たしていません。

Kafkaデータの実際のエンコード形式に基づいて、データインポート設定を更新します。

既存の文字化け文字を処理するには、Logstoreとデータインポート設定を作成します。

Simple log Service に表示されるログ時刻が、インポートされたデータの実際の時刻と異なります。

データインポート設定で時間フィールドが指定されていないか、指定された時間形式またはタイムゾーンが無効です。

時間フィールドを指定するか、有効な時間形式とタイムゾーンを指定します。 詳細については、「データインポート設定の作成」をご参照ください。

データのインポート後に、データをクエリまたは分析できません。

  • データがクエリ期間内にありません。

  • インデックスが設定されていません。

  • 設定されたインデックスは有効になりませんでした。

  • クエリ対象のデータの時刻が、指定したクエリ期間内にあるかどうかを確認します。

    いいえの場合、クエリ時間範囲を調整し、データを再度クエリします。

  • データのインポート先の Logstore にインデックスが設定されているかどうかを確認します。

    いいえの場合、最初にインデックスを設定します。 詳細については、「インデックスの作成」および「Logstore に対するログの再インデックス」をご参照ください。

  • Logstoreにインデックスが設定されており、インポートされたデータの量が [data Processing Insight] ダッシュボードに期待どおりに表示されている場合、考えられる原因はインデックスが有効にならないことです。 この場合、データのインデックスを再作成します。 詳細については、「Logstore に対するログの再インデックス」をご参照ください。

インポートされたデータエントリの数が想定を下回っています。

一部のKafkaメッセージのサイズが3 MBを超えています。 Kafkaメッセージのサイズは、Data Processing Insightダッシュボードで確認できます。

各Kafkaメッセージのサイズが3 MBを超えないようにしてください。

インポート中に大きなレイテンシが存在します。

  • Kafkaクラスターの帯域幅制限に達しました。

  • インターネット経由でデータをインポートすると、ネットワークが不安定になります。

  • Kafkaトピックのパーティション数が少なすぎます。

  • Logstore 内のシャードの数が少なすぎます。

  • その他の考えられる原因の詳細については、「パフォーマンスの制限」をご参照ください。

  • Kafkaクラスター、特にAlibaba CloudにデプロイされているKafkaクラスターのトラフィックが帯域幅制限に達しているかどうかを確認します。 トラフィックが帯域幅制限に達した場合、またはそれに近づいた場合は、クラスターの帯域幅リソースをスケールアウトします。

  • Kafkaトピックのパーティション数が少なすぎる場合は、パーティション数を増やしてレイテンシを監視します。

  • Logstore内のシャードの数が少なすぎる場合は、シャードの数を増やしてレイテンシを監視します。 詳細については、「シャードの管理」をご参照ください。

エラー処理

項目

説明

ネットワーク接続エラーが発生しました。

インポートジョブは定期的にリトライされます。 ネットワーク接続が復元された後、インポートジョブは、以前のデータインポート中断のオフセットからデータを消費し続けます。

Kafkaトピックは存在しません。

インポートするデータを含むKafkaトピックが存在しない場合、インポートジョブはトピックをスキップします。 これは、他の通常のトピックのデータインポートには影響しません。

トピックが再作成されると、インポートジョブはトピック内のデータを期待どおりに消費し、約10分の遅延が発生します。