このトピックでは、KafkaからSimple Log Serviceにデータをインポートする方法について説明します。 Simple Log Serviceにデータをインポートした後、Simple Log Serviceでデータを照会、分析、変換できます。
前提条件
Kafkaクラスターが利用可能です。
プロジェクトと Logstore が作成済みである必要があります。 詳細については、「プロジェクトの作成」および「Logstore の作成」をご参照ください。
サポートされているバージョン
Kafka 2.2.0以降のみがサポートされています。
データインポート設定の作成
クイックデータのインポートセクションで、[データのインポート] をクリックします。 表示されるダイアログボックスの [データインポート] タブで、[Kafka-データインポート] をクリックします。
プロジェクトとLogstoreを選択します。 そして、[次へ] をクリックします。
データインポート設定のパラメーターを設定します。
設定のインポートステップで、次のパラメータを設定します。
パラメーター
説明
ジョブ名
インポートジョブのID。
表示名
インポートジョブの名前。
ジョブの説明
インポートジョブの説明。
Endpoint
Kafkaクラスターへの接続に使用されるアドレス。 アドレスは、Kafkaクラスター用に設定されているbootstrap.serversフィールドから取得できます。 複数の IP アドレスはカンマ (,) で区切ります。
Alibaba Cloud ApsaraMQ for Kafkaインスタンスによって提供されるKafkaクラスターを使用する場合、インスタンスエンドポイントのIPアドレスまたはドメイン名を入力する必要があります。
Alibaba Cloud Elastic Compute Service (ECS) インスタンスにデプロイされているKafkaクラスターを使用する場合、ECSインスタンスのIPアドレスを入力する必要があります。
他のKafkaクラスターを使用する場合は、KafkaクラスターのブローカーのパブリックIPアドレスまたはドメイン名を入力する必要があります。
トピック
カフカのトピック。 複数入力するときは、コンマ ( , ) で区切ります。
消費者グループ
Alibaba Cloud ApsaraMQ for Kafkaインスタンスによって提供されるKafkaクラスターを使用し、フレキシブルグループ作成機能を有効にしない場合、コンシューマーグループを選択する必要があります。 機能の詳細については、「フレキシブルグループ作成機能の使用」をご参照ください。 コンシューマーグループの作成方法の詳細については、「コンシューマーグループの作成」をご参照ください。
開始位置
システムがデータのインポートを開始する位置。 有効な値:
初期: システムは、存在する最初のKafkaデータエントリからデータのインポートを開始します。
Latest: 生成された最新のKafkaデータエントリからデータのインポートが開始されます。
データフォーマット
インポートするデータの形式。 有効な値:
シンプルモード: インポートするデータが1行形式の場合は、シンプルモードを選択します。
JSON文字列: インポートするデータがJSON形式の場合、JSON文字列を選択します。 インポートジョブは、インポートされたデータをキーと値のペアに解析し、データの最初のレイヤーのみを解析します。
配列要素の解析
配列要素の解析 をオンにすると、JSON配列形式のデータが配列要素に基づいて複数のデータに分割され、データがインポートされます。
エンコード形式
インポートするデータのエンコード形式または文字セット。 有効な値: UTF-8とGBK。
VPCベースのインスタンスID
ApsaraMQ for KafkaインスタンスまたはECSインスタンスが仮想プライベートクラウド (VPC) にある場合、VPCのIDを指定して、Simple Log ServiceがAlibaba cloudの内部ネットワークを介してKafkaクラスターからデータを読み取ることができます。
Alibaba Cloudの内部ネットワークを介して読み取られたデータは、より高いセキュリティとネットワークの安定性を提供します。
重要100.104.0.0/16 CIDRブロックからKafkaクラスターにアクセスできることを確認します。
時間設定
時刻フィールド
ログ時間を記録するために使用される時間フィールド。 Kafkaデータに時間を表す列の名前を入力できます。
正規表現で時間を抽出
データ形式をシンプルモードに設定した場合、Kafkaデータから時間を抽出する正規表現を指定する必要があります。
たとえば、Kafkaデータエントリが
message with time 2022-08-08 14:20:20の場合、正規表現to Extract Timeを\d \d \d \d-\d \d \d \d \d \d \d \d:\d \d:\d \d \dを設定できます。時刻フィールドの形式
時刻フィールドの値を解析するために使用される時刻の形式です。
Java SimpleDateFormatでサポートされている時間形式を指定できます。 例: yyyy-MM-dd HH:mm:ss。 時間形式の構文の詳細については、「Class SimpleDateFormat」をご参照ください。 一般的な時間形式の詳細については、「時間形式」をご参照ください。
エポック時間形式を指定できます。 有効な値: epoch、epochMillis、epochMacro、epochNano。
タイムゾーン
タイムフィールドのタイムゾーン。
タイムフィールド形式をエポック時間形式に設定した場合、タイムゾーンを設定する必要はありません。
デフォルトのタイムソース
時間抽出情報が提供されていない場合、または時間抽出が失敗した場合、システムは指定した時間ソースを使用します。 有効な値: Current System TimeおよびKafka Message Timestamp。
詳細設定
ログコンテキスト
ログコンテキストをオンにすると、コンテキストクエリ機能を使用できます。 ソースKafkaパーティションにインポートするデータのコンテキストを表示できます。
通信プロトコル
Kafkaクラスターへの接続に使用される通信プロトコルに関する情報。 インターネット経由でデータをインポートする場合は、Simple Log ServiceとKafkaクラスター間の接続を暗号化し、ユーザー認証を実装することをお勧めします。 次のサンプルコードに例を示します。
protocolフィールドは、plaintext、ssl、sasl_plaintext、およびsasl_sslの値をサポートします。 推奨値はsasl_sslで、接続暗号化とユーザー認証が必要です。
protocolをsasl_plaintexまたはsasl_sslに設定した場合、saslノードも設定する必要があります。 saslノードの下のmechanismフィールドは、PLAIN、SCRAM-SHA-256、およびSCRAM-SHA-512の値をサポートします。 このフィールドは、ユーザー名とパスワードの認証メカニズムを指定します。
{ "protocol":"sasl_plaintext", "sasl":{ "mechanism":"PLAIN", "username":"xxx", "password":"yyy" } }プライベートドメイン解決
ECSインスタンスにデプロイされたKafkaクラスターを使用し、クラスター内のブローカーが内部エンドポイントを介して相互に接続されている場合、ブローカーごとにECSインスタンスのエンドポイントとIPアドレスを指定する必要があります。 例:
{ "hostname#1":"192.168.XX.XX", "hostname#2":"192.168.XX.XX", "hostname#3":"192.168.XX.XX" }プレビューインポート結果をプレビューします。
結果を確認したら、をクリックします。次へ.
データをプレビューし、インデックスを設定し、[次へ] をクリックします。
デフォルトでは、Log Serviceでフルテキストインデックスが有効になっています。 手動モードまたは自動モードで収集したログに基づいてフィールドインデックスを設定することもできます。 自動モードでフィールドインデックスを設定するには、[自動インデックス生成] をクリックします。 これにより、Log Serviceは自動的にフィールドインデックスを作成します。 詳細については、「インデックスの作成」をご参照ください。
重要ログをクエリおよび分析する場合は、フルテキストインデックス作成またはフィールドインデックス作成を有効にする必要があります。 フルテキストインデックスとフィールドインデックスの両方を有効にすると、フィールドインデックスのみが使用されます。
[クエリログ] をクリックします。 クエリと分析ページで、Kafkaデータがインポートされているかどうかを確認します。
約 1 分待ちます。 必要なKafkaデータが存在する場合、データがインポートされます。
データインポート設定の表示
データインポート設定を作成した後、Simple Log Serviceコンソールで設定の詳細と関連する統計レポートを表示できます。
[プロジェクト] セクションで、データインポート設定が属するプロジェクトをクリックします。
データインポート設定が属するLogstoreを見つけてクリックし、データインポート設定の名前をクリックします。
インポート設定の概要ページで、データインポート設定の基本情報と統計レポートを表示します。
次のステップ
[インポート設定の概要] ページでは、データインポート設定に対して以下の操作を実行できます。
データインポート設定の変更
データインポート設定を変更するには、[設定の編集] をクリックします。 詳細については、「データインポート設定の作成」をご参照ください。
データインポート設定の削除
データインポート設定を削除するには、[設定の削除] をクリックします。
警告データインポート設定が削除された後、復元することはできません。
インポートジョブの停止
データインポートジョブを停止するには、[停止] をクリックします。
よくある質問
問題の説明 | 考えられる原因 | 解決策 |
プレビュー中にブローカー接続エラーが発生しました。 エラーコード: ブローカートランスポートの失敗。 |
|
|
プレビュー中にタイムアウトエラーが発生します。 エラーコード: preview request timed out. | データインポート設定で指定されているKafkaトピックにはデータが含まれていません。 | Kafkaトピックにデータが含まれていない場合は、トピックにデータを書き込み、データを再度プレビューします。 |
インポートしたデータに文字化けが存在します。 | データインポート設定で指定されたエンコード形式は、要件を満たしていません。 | Kafkaデータの実際のエンコード形式に基づいて、データインポート設定を更新します。 既存の文字化け文字を処理するには、Logstoreとデータインポート設定を作成します。 |
Simple log Service に表示されるログ時刻が、インポートされたデータの実際の時刻と異なります。 | データインポート設定で時間フィールドが指定されていないか、指定された時間形式またはタイムゾーンが無効です。 | 時間フィールドを指定するか、有効な時間形式とタイムゾーンを指定します。 詳細については、「データインポート設定の作成」をご参照ください。 |
データのインポート後に、データをクエリまたは分析できません。 |
|
|
インポートされたデータエントリの数が想定を下回っています。 | 一部のKafkaメッセージのサイズが3 MBを超えています。 Kafkaメッセージのサイズは、Data Processing Insightダッシュボードで確認できます。 | 各Kafkaメッセージのサイズが3 MBを超えないようにしてください。 |
インポート中に大きなレイテンシが存在します。 |
|
|
エラー処理
項目 | 説明 |
ネットワーク接続エラーが発生しました。 | インポートジョブは定期的にリトライされます。 ネットワーク接続が復元された後、インポートジョブは、以前のデータインポート中断のオフセットからデータを消費し続けます。 |
Kafkaトピックは存在しません。 | インポートするデータを含むKafkaトピックが存在しない場合、インポートジョブはトピックをスキップします。 これは、他の通常のトピックのデータインポートには影響しません。 トピックが再作成されると、インポートジョブはトピック内のデータを期待どおりに消費し、約10分の遅延が発生します。 |