在将Kafka数据导入到表格存储的过程中可能产生错误,如果您不希望导致Sink Task立即失败,您可以配置错误处理策略。

可能产生的错误类型如下:
  • Kafka Connect Error

    此类错误发生在Sink Task执行数据导入前,例如使用Converter进行反序列化或者使用Kafka Transformations对消息记录进行轻量级修改时产生错误,您可以配置由Kafka提供的错误处理选项。

    如果您想要跳过此类错误,请在connector配置文件中配置属性errors.tolerance=all。更多信息,请参见Kafka Connect Configs

  • Tablestore Sink Task Error

    此类错误发生在Sink Task执行数据导入时,例如解析消息记录或者写入Tablestore时产生错误,您可以配置由Tablestore Sink Connector提供的错误处理选项。

    如果您想要跳过此类错误,请在connector配置文件中配置属性errors.tolerance=all。更多信息,请参见配置说明。同时,您还可以选择错误报告的方式,如果需要将产生错误的消息记录保存到Tablestore中独立的一张数据表中,请进行如下配置:
    runtime.error.tolerance=all
    runtime.error.mode=tablestore
    runtime.error.table.name=error
    在distributed模式下,您也可以通过REST API来管理connector和task。如果发生错误导致connector或者task停止,您可以选择手动重启connector或者task。
    1. 检查connector和task状态。
      • 查看connector状态。
        curl http://localhost:8083/connectors/{name}/status
      • 查看task状态。
        curl http://localhost:8083/connectors/{name}/tasks/{taskid}/status

      其中http://localhost:8083/connectors为Kafka REST服务的地址,name必须与配置文件中的name(连接器名称)相同,taskid包含在connector状态信息中。

      您还可以通过执行以下命令获取taskid。
      curl http://localhost:8083/connectors/{name}/tasks
    2. 手动重启connector或者task。
      • 重启connector。
        curl -X POST http://localhost:8083/connectors/{name}/restart
      • 重启task。
        curl -X POST http://localhost:8083/connectors/{name}/tasks/{taskId}/restart