すべてのプロダクト
Search
ドキュメントセンター

Tablestore:構成の説明

最終更新日:Apr 11, 2025

Tablestore Sink Connector を起動する前に、キーと値のペアを指定して Kafka Connect プロセスにパラメーターを渡す必要があります。このトピックでは、Tablestore Sink Connector を構成する方法を示す構成例とパラメーターの説明を提供します。

構成例

構成項目は、Kafka から Tablestore のデータテーブルまたは時系列テーブルにデータを同期するかどうかによって異なります。構成ファイルの構成例は、動作モードによって異なります。このセクションでは、Kafka から Tablestore のデータテーブルへのデータ同期の構成方法の例を示します。Tablestore の時系列テーブルにデータを同期するには、Kafka から Tablestore の時系列テーブルへのデータ同期に固有の構成項目を追加する必要があります。

スタンドアロンモード

次のサンプルコードは、スタンドアロンモードの Tablestore Sink Connector 用の .properties 形式の構成ファイルを構成する方法の例を示しています。

# コネクタ名を指定します。
name=tablestore-sink
# コネクタクラスを指定します。
connector.class=TableStoreSinkConnector
# タスクの最大数を指定します。
tasks.max=1
# データのエクスポート元の Kafka トピックのリストを指定します。
topics=test
# 次の Tablestore 接続パラメーターの値を指定します。
# Tablestore インスタンスのエンドポイント。
tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
# AccessKey ID と AccessKey シークレットで構成される AccessKey ペア。
tablestore.access.key.id=xxx
tablestore.access.key.secret=xxx
# Tablestore インスタンスの名前。
tablestore.instance.name=xxx

# 次のデータマッピングパラメーターを指定します。
# Kafka メッセージレコードの解析に使用するパーサーを指定します。
# Tablestore Sink Connector の DefaultEventParser は、Kafka Connect の Struct クラスと Map クラスをサポートしています。カスタム EventParser を使用することもできます。
event.parse.class=com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser

# エクスポート元のトピックのプレースホルダーとして文字列内で <topic> を使用できる、宛先 Tablestore テーブルの名前のフォーマット文字列を指定します。
# topics.assign.tables は table.name.format よりも優先順位が高く割り当てられます。 topics.assign.tables が指定されている場合は、table.name.format の構成は無視されます。
# たとえば、table.name.format が kafka_<topic> に設定され、データのエクスポート元の Kafka トピックの名前が test の場合、test トピックからの Kafka メッセージレコードは Tablestore の kafka_test という名前のテーブルにマッピングされます。
table.name.format=<topic>
# Kafka トピックと宛先 Tablestore テーブル間のマッピングを指定します。値は <topic>:<tablename> 形式である必要があります。トピック名とテーブル名はコロン (:) で区切ります。複数のマッピングを指定する場合は、カンマ (,) で区切ります。
# マッピングが指定されていない場合は、table.name.format の構成が使用されます。
# topics.assign.tables=test:test_kafka

# プライマリキーモードを指定します。有効な値: kafka、record_key、record_value。デフォルト値: kafka。
# kafka: <connect_topic>_<connect_partition> と <connect_offset> がデータテーブルのプライマリキーとして使用されます。
# record_key: レコードキーのフィールドがデータテーブルのプライマリキーとして使用されます。
# record_value: レコード値のフィールドがデータテーブルのプライマリキーとして使用されます。
primarykey.mode=kafka

# 宛先 Tablestore データテーブルのプライマリキー列の名前とデータ型を指定します。
# プライマリキー列名の形式は tablestore.<tablename>.primarykey.name です。プライマリキー列のデータ型の形式は tablestore.<tablename>.primarykey.type です。
# <tablename> はデータテーブル名のプレースホルダーです。
# プライマリキーモードが kafka の場合は、プライマリキー列の名前とデータ型を指定する必要はありません。デフォルトのプライマリキー列名 {"topic_partition","offset"} とプライマリキー列のデフォルトのデータ型 {string, integer} が使用されます。
# プライマリキーモードが record_key または record_value の場合は、プライマリキー列の名前とデータ型を指定する必要があります。
# tablestore.test.primarykey.name=A,B
# tablestore.test.primarykey.type=string,integer

# 属性列のホワイトリストを指定して、レコード値のフィールドをフィルタリングし、必要な属性列を取得します。
# デフォルトでは、属性列のホワイトリストは空です。レコード値のすべてのフィールドがデータテーブルの属性列として使用されます。
# 属性列名の形式は tablestore.<tablename>.columns.whitelist.name です。属性列のデータ型の形式は tablestore.<tablename>.columns.whitelist.type です。
# <tablename> はデータテーブル名のプレースホルダーです。
# tablestore.test.columns.whitelist.name=A,B
# tablestore.test.columns.whitelist.type=string,integer

# Kafka メッセージレコードを宛先 Tablestore テーブルに書き込む方法を指定します。
# 書き込みモードを指定します。有効な値: put と update。デフォルト値: put。
# put: 宛先テーブルのデータは Kafka メッセージレコードによって上書きされます。
# update: 宛先テーブルのデータは Kafka メッセージレコードによって更新されます。
insert.mode=put
# データの読み取り順序でデータを書き込むかどうかを指定します。デフォルト値: true。書き込みパフォーマンスを向上させるために、このオプションを無効にすることができます。
insert.order.enable=true
# 宛先テーブルを自動的に作成するかどうかを指定します。デフォルト値: false。
auto.create=false

# 削除モードを指定します。有効な値: none、row、column、row_and_column。デフォルト値: none。
# none: 削除操作は実行できません。
# row: 行を削除できます。
# column: 属性列を削除できます。
# row_and_column: 行と属性列を削除できます。
delete.mode=none

# データをデータテーブルに書き込むときに、メモリ内のバッファキューに含めることができる行の最大数を指定します。デフォルト値: 1024。このパラメーターの値は 2 の指数である必要があります。
buffer.size=1024
# データをデータテーブルに書き込むときに使用されるコールバックスレッドの数を指定します。デフォルト値 = vCPU の数 + 1。
# max.thread.count=
# データをデータテーブルに書き込むために送信できる同時書き込みリクエストの最大数を指定します。デフォルト値: 10。
max.concurrency=10
# データの書き込み先となるバケットの数を指定します。デフォルト値: 3。このパラメーターの値を大きくすると、同時書き込み機能を向上させることができます。ただし、このパラメーターの値を、指定した同時書き込みリクエストの最大数よりも大きい値に設定することはできません。
bucket.count=3
# データをデータテーブルに書き込むときに、バッファキューをリフレッシュする間隔を指定します。単位: ミリ秒。デフォルト値: 10000。
flush.Interval=10000

# ダーティデータの処理方法を指定します。
# Kafka メッセージレコードの解析時またはデータテーブルへの書き込み時にエラーが発生することがあります。次の 2 つのパラメーターを指定して、エラーの修正方法を決定できます。
# フォールトトレランス機能を指定します。有効な値: none と all。デフォルト値: none。
# none: エラーが発生すると、Tablestore Sink Connector を使用するデータインポートタスクが失敗します。
# all: エラーが報告されたメッセージレコードはスキップされ、ログに記録されます。
runtime.error.tolerance=none
# ダーティデータのログ記録方法を指定します。有効な値: ignore、kafka、tablestore。デフォルト値: ignore。
# ignore: すべてのエラーが無視されます。
# kafka: エラーが報告されたメッセージレコードとエラーメッセージは、別の Kafka トピックに格納されます。
# tablestore: エラーが報告されたメッセージレコードとエラーメッセージは、別の Tablestore データテーブルに格納されます。
runtime.error.mode=ignore

# runtime.error.mode を kafka に設定した場合は、Kafka クラスタアドレスとトピックを指定する必要があります。
# runtime.error.bootstrap.servers=localhost:9092
# runtime.error.topic.name=errors

# runtime.error.mode を tablestore に設定した場合は、Tablestore データテーブルの名前を指定する必要があります。
# runtime.error.table.name=errors

分散モード

次のサンプルコードは、分散モードの Tablestore Sink Connector 用の .json 形式の構成ファイルを構成する方法の例を示しています。

{
  "name": "tablestore-sink",
  "config": {
    // コネクタクラスを指定します。
    "connector.class":"TableStoreSinkConnector",
    // タスクの最大数を指定します。
    "tasks.max":"3",
    // データのエクスポート元の Kafka トピックのリストを指定します。
    "topics":"test",
    // 次の Tablestore 接続パラメーターの値を指定します。
    // Tablestore インスタンスのエンドポイント。
    "tablestore.endpoint":"https://xxx.xxx.ots.aliyuncs.com",
    // AccessKey ID と AccessKey シークレットで構成される AccessKey ペア。
    "tablestore.access.key.id":"xxx",
    "tablestore.access.key.secret":"xxx",
    // Tablestore インスタンスの名前。
    "tablestore.instance.name":"xxx",

    // 次のデータマッピングパラメーターを指定します。
    // Kafka メッセージレコードの解析に使用するパーサーを指定します。
    // Tablestore Sink Connector の DefaultEventParser は、Kafka Connect の Struct クラスと Map クラスをサポートしています。カスタム EventParser を使用することもできます。
    "event.parse.class":"com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser",

    // エクスポート元のトピックのプレースホルダーとして文字列内で <topic> を使用できる、宛先 Tablestore テーブルの名前のフォーマット文字列を指定します。
    // topics.assign.tables は table.name.format よりも優先順位が高く割り当てられます。 topics.assign.tables が指定されている場合は、table.name.format の構成は無視されます。
    // たとえば、table.name.format が kafka_<topic> に設定され、データのエクスポート元の Kafka トピックの名前が test の場合、test トピックからの Kafka メッセージレコードは Tablestore の kafka_test という名前のテーブルにマッピングされます。
    "table.name.format":"<topic>",
    // Kafka トピックと宛先 Tablestore テーブル間のマッピングを指定します。値は <topic>:<tablename> 形式である必要があります。トピック名とテーブル名はコロン (:) で区切ります。複数のマッピングを指定する場合は、カンマ (,) で区切ります。
    // マッピングが指定されていない場合は、table.name.format の構成が使用されます。
    // "topics.assign.tables":"test:test_kafka",

    // プライマリキーモードを指定します。有効な値: kafka、record_key、record_value。デフォルト値: kafka。
    // kafka: <connect_topic>_<connect_partition> と <connect_offset> がデータテーブルのプライマリキーとして使用されます。
    // record_key: レコードキーのフィールドがデータテーブルのプライマリキーとして使用されます。
    // record_value: レコード値のフィールドがデータテーブルのプライマリキーとして使用されます。
    "primarykey.mode":"kafka",

    // 宛先 Tablestore データテーブルのプライマリキー列の名前とデータ型を指定します。
    // プライマリキー列名の形式は tablestore.<tablename>.primarykey.name です。プライマリキー列のデータ型の形式は tablestore.<tablename>.primarykey.type です。
    // <tablename> はデータテーブル名のプレースホルダーです。
    // プライマリキーモードが kafka の場合は、プライマリキー列の名前とデータ型を指定する必要はありません。デフォルトのプライマリキー列名 {"topic_partition","offset"} とプライマリキー列のデフォルトのデータ型 {string, integer} が使用されます。
    // プライマリキーモードが record_key または record_value の場合は、プライマリキー列の名前とデータ型を指定する必要があります。
    // "tablestore.test.primarykey.name":"A,B",
    // "tablestore.test.primarykey.type":"string,integer",

    // 属性列のホワイトリストを指定して、レコード値のフィールドをフィルタリングし、必要な属性列を取得します。
    // デフォルトでは、属性列のホワイトリストは空です。レコード値のすべてのフィールドがデータテーブルの属性列として使用されます。
    // 属性列名の形式は tablestore.<tablename>.columns.whitelist.name です。属性列のデータ型の形式は tablestore.<tablename>.columns.whitelist.type です。
    // <tablename> はデータテーブル名のプレースホルダーです。
    // "tablestore.test.columns.whitelist.name":"A,B",
    // "tablestore.test.columns.whitelist.type":"string,integer",

    // Kafka メッセージレコードを宛先 Tablestore テーブルに書き込む方法を指定します。
    // 書き込みモードを指定します。有効な値: put と update。デフォルト値: put。
    // put: テーブル内のデータは Kafka メッセージレコードによって上書きされます。
    // update: テーブル内のデータは Kafka メッセージレコードによって更新されます。
    "insert.mode":"put",
    // データの読み取り順序でデータを書き込むかどうかを指定します。デフォルト値: true。書き込みパフォーマンスを向上させるために、このオプションを無効にすることができます。
    "insert.order.enable":"true",
    // 宛先テーブルを自動的に作成するかどうかを指定します。デフォルト値: false。
    "auto.create":"false",

    // 削除モードを指定します。有効な値: none、row、column、row_and_column。デフォルト値: none。
    // none: 削除操作は実行できません。
    // row: 行を削除できます。
    // column: 属性列を削除できます。
    // row_and_column: 行と属性列を削除できます。
    "delete.mode":"none",

    // データをデータテーブルに書き込むときに、メモリ内のバッファキューに含めることができる行の最大数を指定します。デフォルト値: 1024。このパラメーターの値は 2 の指数である必要があります。
    "buffer.size":"1024",
    // データをデータテーブルに書き込むときに使用されるコールバックスレッドの数を指定します。デフォルト値 = vCPU の数 + 1。
    // "max.thread.count":
    // データをデータテーブルに書き込むために送信できる同時書き込みリクエストの最大数を指定します。デフォルト値: 10。
    "max.concurrency":"10",
    // データの書き込み先となるバケットの数を指定します。デフォルト値: 3。このパラメーターの値を大きくすると、同時書き込み機能を向上させることができます。ただし、このパラメーターの値を、指定した同時書き込みリクエストの最大数よりも大きい値に設定することはできません。
    "bucket.count":"3",
    // データをデータテーブルに書き込むときに、バッファキューをリフレッシュする間隔を指定します。単位: ミリ秒。デフォルト値: 10000。
    "flush.Interval":"10000",

    // ダーティデータの処理方法を指定します。
    // Kafka メッセージレコードの解析時またはデータテーブルへの書き込み時にエラーが発生することがあります。次の 2 つのパラメーターを指定して、エラーの修正方法を決定できます。
    // フォールトトレランス機能を指定します。有効な値: none と all。デフォルト値: none。
    // none: エラーが発生すると、Tablestore Sink Connector を使用するデータインポートタスクが失敗します。
    // all: エラーが報告されたメッセージレコードはスキップされ、ログに記録されます。
    "runtime.error.tolerance":"none",
    // ダーティデータのログ記録方法を指定します。有効な値: ignore、kafka、tablestore。デフォルト値: ignore。
    // ignore: すべてのエラーが無視されます。
    // kafka: エラーが報告されたメッセージレコードとエラーメッセージは、別の Kafka トピックに格納されます。
    // tablestore: エラーが報告されたメッセージレコードとエラーメッセージは、別の Tablestore データテーブルに格納されます。
    "runtime.error.mode":"ignore"

    // runtime.error.mode を kafka に設定した場合は、Kafka クラスタアドレスとトピックを指定する必要があります。
    // "runtime.error.bootstrap.servers":"localhost:9092",
    // "runtime.error.topic.name":"errors",

    // runtime.error.mode を tablestore に設定した場合は、Tablestore データテーブルの名前を指定する必要があります。
    // "runtime.error.table.name":"errors",
  }
}

パラメーター

次の表に、構成ファイルのパラメーターを示します。Kafka から Tablestore の時系列テーブルにデータを同期する場合にのみ、時系列関連のパラメーターを構成する必要があります。

Kafka Connect パラメーター

パラメーター

タイプ

必須

説明

name

string

はい

tablestore-sink

コネクタの名前。コネクタ名は一意である必要があります。

connector.class

class

はい

TableStoreSinkConnector

コネクタの Java クラス。

コネクタを使用する場合は、connector.class を使用してコネクタクラスを指定します。 connector.class は、コネクタクラスの完全名またはエイリアスに設定できます。コネクタクラスの完全名は com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector で、コネクタクラスのエイリアスは TableStoreSinkConnector です。例:

connector.class=com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector

tasks.max

integer

はい

3

コネクタに対して作成できるタスクの最大数。

タスクの最大数が作成に失敗した場合、作成されるタスクの数が少なくなる可能性があります。

key.converter

string

いいえ

org.apache.kafka.connect.json.JsonConverter

ワーカー構成ファイルで指定されたデフォルトのキーコンバーターを置き換えるために使用されるキーコンバーター。

value.converter

string

いいえ

org.apache.kafka.connect.json.JsonConverter

ワーカー構成ファイルで指定されたデフォルトの値コンバーターを置き換えるために使用される値コンバーター。

topics

list

はい

test

コネクタに指定できる Kafka トピックのリスト。複数の Kafka トピックはカンマ (,) で区切ります。

コネクタに指定されたトピックを管理するには、topics を指定する必要があります。

コネクタ接続パラメーター

パラメーター

タイプ

必須

説明

tablestore.endpoint

string

はい

https://xxx.xxx.ots.aliyuncs.com

Tablestore インスタンスのエンドポイント。詳細については、「エンドポイント」をご参照ください。

tablestore.mode

string

はい

timeseries

宛先テーブルのタイプ。デフォルト値: normal。有効な値:

  • normal: Tablestore のデータテーブル。

  • timeseries: Tablestore の時系列テーブル。

tablestore.access.key.id

string

はい

LTAn********************

アカウントの AccessKey ID と AccessKey シークレット。 AccessKey ID と AccessKey シークレットの取得方法の詳細については、「AccessKey ペアを作成する」をご参照ください。

tablestore.access.key.secret

string

はい

zbnK**************************

tablestore.auth.mode

string

はい

aksk

認証モード。デフォルト値: aksk。有効な値:

  • aksk: Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey シークレットを使用して認証します。このトピックでは、tablestore.auth.mode は aksk に設定されています。

  • sts: Security Token Service (STS) から取得した一時的なアクセス認証情報を使用して認証します。 Tablestore が Message Queue for Apache Kafka に接続されている場合は、tablestore.auth.mode を sts に設定します。これはデフォルト値です。

tablestore.instance.name

string

はい

myotstest

Tablestore インスタンスの名前。

コネクタのデータマッピングパラメーター

パラメーター

タイプ

必須

説明

event.parse.class

class

はい

DefaultEventParser

EventParser の Java クラス。デフォルト値: DefaultEventParser。パーサーは Kafka メッセージレコードを解析して、データテーブルのプライマリキー列と属性列を取得します。

重要

Tablestore では、列値のサイズに制限があります。 string 型または binary 型のプライマリキー列の値は 1 KB を超えることはできず、属性列の値は 2 MB を超えることはできません。詳細については、「制限」をご参照ください。

データ型の変換後に列値が制限を超えた場合、Kafka メッセージレコードはダーティデータとして処理されます。

DefaultEventParser を使用するには、Kafka メッセージレコードのキーまたは値が Kafka Connect の Struct クラスまたは Map クラスである必要があります。 Struct で選択されたフィールドは、Tablestore Sink Connector でサポートされているデータ型である必要があります。フィールドは、データ型マッピングテーブルに基づいて Tablestore データ型のデータに変換され、データテーブルに書き込まれます。 Map の値のデータ型は、Tablestore Sink Connector でサポートされているデータ型である必要があります。 Tablestore Sink Connector は、Struct と Map で同じデータ型をサポートしています。 Map の値は binary 型のデータに変換され、データテーブルに書き込まれます。

Kafka メッセージレコードのデータ型が Tablestore Sink Connector と互換性がない場合は、com.aliyun.tablestore.kafka.connect.parsers.EventParser で定義されている操作を呼び出してパーサーを構成できます。

table.name.format

string

いいえ

kafka_<topic>

宛先 Tablestore データテーブルの名前のフォーマット文字列。デフォルト値: <topic>。 <topic> は、データのエクスポート元のトピックのプレースホルダーとして文字列内で使用できます。たとえば、table.name.format が kafka_<topic> に設定され、データのエクスポート元の Kafka トピックの名前が test の場合、test トピックからの Kafka メッセージレコードは Tablestore の kafka_test という名前のテーブルにマッピングされます。

topics.assign.tables は table.name.format よりも優先順位が高く割り当てられます。 topics.assign.tables が指定されている場合は、table.name.format の構成は無視されます。

topics.assign.tables

list

はい

test:destTable

<topic_1>:<tablename_1>,<topic_2>:<tablename_2> 形式で、トピックと宛先 Tablestore テーブル間のマッピングを指定します。複数のマッピングはカンマ (,) で区切ります。たとえば、test:destTable は、test という名前のトピックからのメッセージレコードが destTable という名前のデータテーブルに書き込まれることを指定します。

topics.assign.tables は table.name.format よりも優先順位が高く割り当てられます。 topics.assign.tables が指定されている場合は、table.name.format の構成は無視されます。

primarykey.mode

string

いいえ

kafka

データテーブルのプライマリキーモード。有効な値:

  • kafka: <connect_topic>_<connect_partition> と <connect_offset> がデータテーブルのプライマリキーとして使用されます。 Kafka トピック <connect_topic> とパーティション <connect_partition> はアンダースコア (_) で区切られ、<connect_offset> はパーティション内のメッセージレコードのオフセットを指定します。

  • record_key: レコードキーの Struct クラスのフィールドまたは Map クラスのキーがデータテーブルのプライマリキーとして使用されます。

  • record_value: レコード値の Struct クラスのフィールドまたは Map クラスのキーがデータテーブルのプライマリキーとして使用されます。

このパラメーターは、tablestore.<tablename>.primarykey.name および tablestore.<tablename>.primarykey.type と一緒に構成します。このパラメーターの値の大文字と小文字は区別されません。

tablestore.<tablename>.primarykey.name

list

いいえ

A,B

データテーブルのプライマリキー列名。 <tablename> はデータテーブル名のプレースホルダーです。このパラメーターの値には、カンマ (,) で区切られた 1 ~ 4 つのプライマリキー列名が含まれています。

プライマリキー列名は、プライマリキーモードによって異なります。

  • プライマリキーモードが kafka に設定されている場合、このパラメーターのデフォルト値は topic_partition,offset です。 kafka プライマリキーモードでは、プライマリキー列名を指定する必要はありません。プライマリキー列名が指定されている場合でも、デフォルトのプライマリキー列名が優先されます。

  • プライマリキーモードが record_key に設定されている場合、指定されたプライマリキー列名と同じ名前を持つ Struct クラスのフィールドまたは Map クラスのキーが、レコードキーからデータテーブルのプライマリキーとして抽出されます。 record_key プライマリキーモードでは、プライマリキー列名を指定する必要があります。

  • プライマリキーモードが record_value に設定されている場合、指定されたプライマリキー列名と同じ名前を持つ Struct クラスのフィールドまたは Map クラスのキーが、レコード値からデータテーブルのプライマリキーとして抽出されます。 record_value プライマリキーモードでは、プライマリキー列名を指定する必要があります。

Tablestore データテーブルのプライマリキー列は順次です。 tablestore.<tablename>.primarykey.name を定義するときは、プライマリキー列の順序に注意する必要があります。たとえば、PRIMARY KEY (A, B, C) と PRIMARY KEY (A, C, B) はスキーマが異なります。

tablestore.<tablename>.primarykey.type

list

いいえ

string, integer

データテーブルのプライマリキー列のデータ型。 <tablename> はデータテーブル名のプレースホルダーです。このパラメーターの値には、プライマリキー列の 1 ~ 4 つのデータ型が含まれています。プライマリキー列のデータ型はカンマ (,) で区切ります。プライマリキー列のデータ型の順序は、tablestore.<tablename>.primarykey.name で指定されたプライマリキー列名の順序に対応している必要があります。このパラメーターの値の大文字と小文字は区別されません。有効な値: integer、string、binary、auto_increment。

プライマリキー列のデータ型は、プライマリキーモードによって異なります。

  • プライマリキーモードが kafka に設定されている場合、このパラメーターのデフォルト値は string, integer です。

    kafka プライマリキーモードでは、プライマリキー列のデータ型を指定する必要はありません。プライマリキー列のデータ型が指定されている場合でも、デフォルトのプライマリキー列のデータ型が優先されます。

  • プライマリキーモードが record_key または record_value に設定されている場合は、プライマリキー列のデータ型を指定する必要があります。

    指定されたプライマリキー列のデータ型が Kafka スキーマで定義されたデータ型と競合する場合、解析エラーが発生します。この場合、ランタイムエラーパラメーターを構成してエラーを修正できます。

    このパラメーターが auto_increment に設定されている場合、データがデータテーブルに書き込まれるときに、Kafka メッセージレコードのフィールドが自動採番プライマリキー列としてデータテーブルに挿入されます。

tablestore.<tablename>.columns.whitelist.name

list

いいえ

A,B

属性列ホワイトリストの属性列名。 <tablename> はデータテーブル名のプレースホルダーです。属性列名はカンマ (,) で区切ります。

このパラメーターを構成しない場合、レコード値の Struct クラスのすべてのフィールドまたは Map クラスのすべてのキーがデータテーブルの属性列として使用されます。このパラメーターを構成すると、レコード値のフィールドは、指定された属性列ホワイトリストに基づいてフィルタリングされ、必要な属性列が取得されます。

tablestore.<tablename>.columns.whitelist.type

list

いいえ

string, integer

属性列ホワイトリストの属性列のデータ型。 <tablename> はデータテーブル名のプレースホルダーです。属性列のデータ型はカンマ (,) で区切ります。属性列のデータ型の順序は、tablestore.<tablename>.columns.whitelist.name で指定された属性列名の順序に対応している必要があります。このパラメーターの値の大文字と小文字は区別されません。有効な値: integer、string、binary、boolean、double。

コネクタ書き込みパラメーター

パラメーター

タイプ

必須

説明

insert.mode

string

いいえ

put

書き込みモード。デフォルト値: put。有効な値:

  • put: 既存のデータは、テーブルに書き込むデータ行によって上書きされます。この値は、Tablestore の PutRow 操作に対応します。

  • update: データ行を更新すると、属性列が行に追加されるか、既存の属性列の値が更新されます。この値は、Tablestore の UpdateRow 操作に対応します。

このパラメーターの値の大文字と小文字は区別されません。

insert.order.enable

boolean

いいえ

true

データがデータテーブルに読み取られた順序で書き込まれるかどうかを指定します。デフォルト値: true。有効な値:

  • true: Kafka メッセージレコードは、メッセージレコードが読み取られた順序でデータテーブルに書き込まれます。

  • false: Kafka メッセージレコードは、特定の順序なしでデータテーブルに書き込まれます。これにより、書き込みパフォーマンスが向上します。

auto.create

boolean

いいえ

false

宛先テーブルを自動的に作成するかどうかを指定します。データテーブルまたは時系列テーブルを自動的に作成できます。デフォルト値: false。有効な値:

  • true: システムは宛先 Tablestore テーブルを自動的に作成します。

  • false: システムは宛先 Tablestore テーブルを自動的に作成しません。

delete.mode

string

いいえ

none

削除モード。このパラメーターの構成は、データがデータテーブルに同期され、プライマリキーモードが record_key に設定されている場合にのみ有効になります。デフォルト値: none。有効な値:

  • none: 削除操作は実行できません。

  • row: 行を削除できます。レコード値が空の場合、対応する行が削除されます。

  • column: 属性列を削除できます。レコード値の Struct クラスのフィールド値または Map クラスのキー値が空の場合、対応する属性列が削除されます。

  • row_and_column: 行と属性列を削除できます。

このパラメーターの値の大文字と小文字は区別されません。

このパラメーターは、insert.mode パラメーターの値に基づいて指定されます。詳細については、「付録: 削除構文」をご参照ください。

buffer.size

integer

いいえ

1024

データがデータテーブルに書き込まれるときに、メモリ内のバッファキューに含めることができる行の最大数。デフォルト値: 1024。このパラメーターの値は 2 の指数である必要があります。

max.thread.count

integer

いいえ

3

データがデータテーブルに書き込まれるときに使用されるコールバックスレッドの数。デフォルト値 = vCPU の数 + 1

max.concurrency

integer

いいえ

10

データがデータテーブルに書き込まれるために送信できる同時書き込みリクエストの最大数。デフォルト値: 10。

bucket.count

integer

いいえ

3

データの書き込み先となるバケットの数。デフォルト値: 3。このパラメーターの値を大きくすると、同時書き込み機能を向上させることができます。ただし、このパラメーターの値を、指定した同時書き込みリクエストの最大数よりも大きい値に設定することはできません。

flush.Interval

integer

いいえ

10000

データがデータテーブルに書き込まれるときに、バッファキューがリフレッシュされる間隔。単位: ミリ秒。デフォルト値: 10000。

コネクタランタイムエラーパラメーター

パラメーター

タイプ

必須

説明

runtime.error.tolerance

string

いいえ

none

Kafka メッセージレコードの解析時またはテーブルへの書き込み時にエラーが発生した場合に使用されるエラー処理ポリシー。デフォルト値: none。有効な値:

  • none: エラーが発生すると、Tablestore Sink Connector を使用するデータインポートタスクが失敗します。

  • all: エラーが報告されたメッセージレコードはスキップされ、ログに記録されます。

このパラメーターの値の大文字と小文字は区別されません。

runtime.error.mode

string

いいえ

ignore

Kafka メッセージレコードの解析時またはテーブルへの書き込み時にエラーが報告されたメッセージレコードの処理方法を指定します。デフォルト値: ignore。有効な値:

  • ignore: すべてのエラーが無視されます。

  • kafka: エラーが報告されたメッセージレコードとエラーメッセージは、別の Kafka トピックに格納されます。この場合、runtime.error.bootstrap.servers と runtime.error.topic.name を指定する必要があります。新しいトピックのエラーが報告された Kafka メッセージレコードのキーと値は、データのエクスポート元のトピックのメッセージレコードのキーと値と同じです。 ErrorInfo フィールドはヘッダーに含まれており、エラーメッセージをログに記録します。

  • tablestore: エラーが報告されたメッセージレコードとエラーメッセージは、別の Tablestore データテーブルに格納されます。この場合、runtime.error.table.name を指定する必要があります。エラーが報告されたメッセージレコードとエラーメッセージをログに記録するために使用されるデータテーブルのプライマリキー列は、topic_partition (string 型) と offset (integer 型) です。データテーブルの属性列は、key (bytes 型)、value (bytes 型)、error_info (string 型) です。

runtime.error.mode が kafka に設定されている場合は、Kafka メッセージレコードのヘッダー、キー、値をシリアル化する必要があります。 runtime.error.mode が tablestore に設定されている場合は、Kafka メッセージレコードのキーと値をシリアル化する必要があります。デフォルトでは、org.apache.kafka.connect.json.JsonConverter がデータとスキーマのシリアル化に使用され、schemas.enable は true に設定されています。 JsonConverter を使用してデータを逆シリアル化し、元のデータを取得できます。 Converter の詳細については、「Kafka Converter」をご参照ください。

runtime.error.bootstrap.servers

string

いいえ

localhost:9092

エラーが報告されたメッセージレコードとエラーメッセージが格納される Kafka クラスタのアドレス。

runtime.error.topic.name

string

いいえ

errors

エラーが報告されたメッセージレコードとエラーメッセージを格納する Kafka トピックの名前。

runtime.error.table.name

string

いいえ

errors

エラーが報告されたメッセージレコードとエラーメッセージを格納する Tablestore テーブルの名前。

時系列関連パラメーター

パラメーター

タイプ

必須

説明

tablestore.timeseries.<tablename>.measurement

string

はい

mName

JSON 形式のデータの指定されたキーに対応する値が、_m_name フィールドの値として時系列テーブルに書き込まれることを指定します。

tablestore.timeseries.<tablename>.measurement が <topic> に設定されている場合、Kafka メッセージレコードの topic キーに対応する値が、_m_name フィールドの値として時系列テーブルに書き込まれます。

パラメーター内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメーター名を変更します。たとえば、時系列テーブルの名前が test の場合、パラメーター名は tablestore.timeseries.test.measurement です。

tablestore.timeseries.<tablename>.dataSource

string

はい

ds

JSON 形式のデータの ds キーに対応する値が、_data_source フィールドの値として時系列テーブルに書き込まれることを指定します。

パラメーター内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメーター名を変更します。

tablestore.timeseries.<tablename>.tags

list

はい

region,level

JSON 形式のデータの region キーと level キーに対応する値が、tags フィールドの値として時系列テーブルに書き込まれることを指定します。

パラメーター内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメーター名を変更します。

tablestore.timeseries.<tablename>.time

string

はい

timestamp

JSON 形式のデータの timestamp キーに対応する値が、_time フィールドの値として時系列テーブルに書き込まれることを指定します。

パラメーター内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメーター名を変更します。

tablestore.timeseries.<tablename>.time.unit

string

はい

MILLISECONDS

tablestore.timeseries.<tablename>.time パラメーターの値の単位。有効な値: SECONDS、MILLISECONDS、MICROSECONDS、NANOSECONDS。

パラメーター内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメーター名を変更します。

tablestore.timeseries.<tablename>.field.name

list

いいえ

cpu,io

JSON 形式のデータの cpu キーと io キーが、_field_name の名前として時系列テーブルに書き込まれ、JSON 形式のデータの cpu キーと io キーに対応する値が、_field_name の値として時系列テーブルに書き込まれることを指定します。

パラメーター内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメーター名を変更します。

tablestore.timeseries.<tablename>.field.type

string

いいえ

double,integer

tablestore.timeseries.<tablename>.field.name で指定されたフィールドのデータ型。有効な値: double、integer、string、binary、boolean。複数のデータ型はカンマ (,) で区切ります。

パラメーター内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメーター名を変更します。

tablestore.timeseries.mapAll

boolean

いいえ

false

JSON 形式のデータのプライマリキーフィールドと時間フィールド以外のフィールドが、フィールドとして時系列テーブルに書き込まれるかどうかを指定します。

tablestore.timeseries.mapAll が false に設定されている場合は、tablestore.timeseries.<tablename>.field.name パラメーターと tablestore.timeseries.<tablename>.field.type パラメーターを構成する必要があります。

tablestore.timeseries.toLowerCase

boolean

いいえ

true

フィールドのキーが時系列テーブルに書き込まれる前に小文字に変換されるかどうかを指定します。フィールドのキーは、プライマリキーフィールドまたは時間フィールド以外のキー、または tablestore.timeseries.<tablename>.field.name で指定されたキーです。

tablestore.timeseries.rowsPerBatch

integer

いいえ

50

1 つのリクエストで Tablestore に書き込むことができる行の最大数。最大値とデフォルト値は 200 です。

付録: Kafka と Tablestore 間のデータ型マッピング

次の表に、Kafka と Tablestore のデータ型間のマッピングを示します。

Kafka スキーマタイプ

Tablestore データ型

STRING

STRING

INT8、INT16、INT32、INT64

INTEGER

FLOAT32、FLOAT64

DOUBLE

BOOLEAN

BOOLEAN

BYTES

BINARY

付録: 削除構文

説明

この機能は、Kafka から Tablestore のデータテーブルにデータを同期する場合にのみサポートされます。

次の表に、メッセージレコードに空の値が含まれており、Kafka から Tablestore のデータテーブルにデータが同期されている場合の、書き込みモード (insert.mode) と削除モード (delete.mode) の構成に基づいて Tablestore データテーブルにデータを書き込むために使用されるメソッドを示します。

insert.mode

put

update

delete.mode

none

row

column

row_and_column

none

row

column

row_and_column

空の値

上書き

行の削除

上書き

行の削除

ダーティデータ

行の削除

ダーティデータ

行の削除

値のすべてのフィールドが空

上書き

上書き

上書き

上書き

ダーティデータ

ダーティデータ

列の削除

列の削除

値の一部のフィールドが空

上書き

上書き

上書き

上書き

空の値を無視

空の値を無視

列の削除

列の削除