Kafka データを Tablestore テーブルにインポートするときに、エラーが発生する場合があります。Tablestore Sink Connector を使用するデータインポートタスクがエラー発生時にすぐに失敗しないようにするには、エラー処理ポリシーを構成できます。このトピックでは、Kafka Connect エラーと Tablestore Sink タスクエラーのエラー処理方法について説明します。
Kafka Connect エラー
このエラーは、Tablestore Sink Connector を使用するデータインポートタスクが実行される前に発生します。たとえば、デシリアライゼーションに Converter を使用する場合、または Kafka Transformations を使用してメッセージレコードに軽量の変更を実行する場合に、このエラーが発生する可能性があります。このエラーが発生した場合は、Kafka が提供するエラー処理オプションを構成できます。
このエラーをスキップするには、Kafka Connect 構成ファイル で errors.tolerance=all を指定します。Kafka Connect 設定ファイル
Tablestore Sink タスクエラー
このエラーは、Tablestore Sink Connector を使用するデータインポートタスクの進行中に発生します。たとえば、メッセージレコードの解析中、またはメッセージレコードの Tablestore への書き込み中に、このエラーが発生する可能性があります。このエラーが発生した場合は、Tablestore Sink Connector が提供するエラー処理オプションを構成できます。
このエラーをスキップするには、コネクタ構成ファイルで errors.tolerance=all を指定します。詳細については、「構成の説明」をご参照ください。また、エラーを報告するために使用されるメソッドを指定することもできます。エラーが報告されたメッセージレコードを Tablestore の別のデータテーブルに保存する場合は、次のパラメーターを構成します。
runtime.error.tolerance=all
runtime.error.mode=tablestore
runtime.error.table.name=error分散モードでは、REST API を使用してコネクタとタスクを管理できます。エラーが発生したためにコネクタまたはタスクが停止した場合は、コネクタまたはタスクを手動で再起動できます。
コネクタとタスクのステータスを確認します。
コネクタステータスの確認:
curl http://localhost:8083/connectors/{name}/statusタスクステータスの確認:
curl http://localhost:8083/connectors/{name}/tasks/{taskid}/status
上記のコマンドでは、
http://localhost:8083/connectorsは Kafka REST サービスのアドレス、name の値は構成ファイルのコネクタ名と同じである必要があり、taskid はコネクタステータス情報に含まれています。次のコマンドを実行して、taskid 値を取得できます。
curl http://localhost:8083/connectors/{name}/tasksコネクタまたはタスクを手動で再起動します。
コネクタの再起動:
curl -X POST http://localhost:8083/connectors/{name}/restartタスクの再起動:
curl -X POST http://localhost:8083/connectors/{name}/tasks/{taskId}/restart