Tablestore Sink Connector を起動する前に、キーと値のペアを指定して Kafka Connect プロセスにパラメーターを渡す必要があります。このトピックでは、Tablestore Sink Connector を構成する方法を示す構成例とパラメーターの説明を提供します。
構成例
構成項目は、Kafka から Tablestore のデータテーブルまたは時系列テーブルにデータを同期するかどうかによって異なります。構成ファイルの構成例は、動作モードによって異なります。このセクションでは、Kafka から Tablestore のデータテーブルへのデータ同期の構成方法の例を示します。Tablestore の時系列テーブルにデータを同期するには、Kafka から Tablestore の時系列テーブルへのデータ同期に固有の構成項目を追加する必要があります。
スタンドアロンモード
次のサンプルコードは、スタンドアロンモードの Tablestore Sink Connector 用の .properties 形式の構成ファイルを構成する方法の例を示しています。
# コネクタ名を指定します。
name=tablestore-sink
# コネクタクラスを指定します。
connector.class=TableStoreSinkConnector
# タスクの最大数を指定します。
tasks.max=1
# データのエクスポート元の Kafka トピックのリストを指定します。
topics=test
# 次の Tablestore 接続パラメーターの値を指定します。
# Tablestore インスタンスのエンドポイント。
tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
# AccessKey ID と AccessKey シークレットで構成される AccessKey ペア。
tablestore.access.key.id=xxx
tablestore.access.key.secret=xxx
# Tablestore インスタンスの名前。
tablestore.instance.name=xxx
# 次のデータマッピングパラメーターを指定します。
# Kafka メッセージレコードの解析に使用するパーサーを指定します。
# Tablestore Sink Connector の DefaultEventParser は、Kafka Connect の Struct クラスと Map クラスをサポートしています。カスタム EventParser を使用することもできます。
event.parse.class=com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser
# エクスポート元のトピックのプレースホルダーとして文字列内で <topic> を使用できる、宛先 Tablestore テーブルの名前のフォーマット文字列を指定します。
# topics.assign.tables は table.name.format よりも優先順位が高く割り当てられます。 topics.assign.tables が指定されている場合は、table.name.format の構成は無視されます。
# たとえば、table.name.format が kafka_<topic> に設定され、データのエクスポート元の Kafka トピックの名前が test の場合、test トピックからの Kafka メッセージレコードは Tablestore の kafka_test という名前のテーブルにマッピングされます。
table.name.format=<topic>
# Kafka トピックと宛先 Tablestore テーブル間のマッピングを指定します。値は <topic>:<tablename> 形式である必要があります。トピック名とテーブル名はコロン (:) で区切ります。複数のマッピングを指定する場合は、カンマ (,) で区切ります。
# マッピングが指定されていない場合は、table.name.format の構成が使用されます。
# topics.assign.tables=test:test_kafka
# プライマリキーモードを指定します。有効な値: kafka、record_key、record_value。デフォルト値: kafka。
# kafka: <connect_topic>_<connect_partition> と <connect_offset> がデータテーブルのプライマリキーとして使用されます。
# record_key: レコードキーのフィールドがデータテーブルのプライマリキーとして使用されます。
# record_value: レコード値のフィールドがデータテーブルのプライマリキーとして使用されます。
primarykey.mode=kafka
# 宛先 Tablestore データテーブルのプライマリキー列の名前とデータ型を指定します。
# プライマリキー列名の形式は tablestore.<tablename>.primarykey.name です。プライマリキー列のデータ型の形式は tablestore.<tablename>.primarykey.type です。
# <tablename> はデータテーブル名のプレースホルダーです。
# プライマリキーモードが kafka の場合は、プライマリキー列の名前とデータ型を指定する必要はありません。デフォルトのプライマリキー列名 {"topic_partition","offset"} とプライマリキー列のデフォルトのデータ型 {string, integer} が使用されます。
# プライマリキーモードが record_key または record_value の場合は、プライマリキー列の名前とデータ型を指定する必要があります。
# tablestore.test.primarykey.name=A,B
# tablestore.test.primarykey.type=string,integer
# 属性列のホワイトリストを指定して、レコード値のフィールドをフィルタリングし、必要な属性列を取得します。
# デフォルトでは、属性列のホワイトリストは空です。レコード値のすべてのフィールドがデータテーブルの属性列として使用されます。
# 属性列名の形式は tablestore.<tablename>.columns.whitelist.name です。属性列のデータ型の形式は tablestore.<tablename>.columns.whitelist.type です。
# <tablename> はデータテーブル名のプレースホルダーです。
# tablestore.test.columns.whitelist.name=A,B
# tablestore.test.columns.whitelist.type=string,integer
# Kafka メッセージレコードを宛先 Tablestore テーブルに書き込む方法を指定します。
# 書き込みモードを指定します。有効な値: put と update。デフォルト値: put。
# put: 宛先テーブルのデータは Kafka メッセージレコードによって上書きされます。
# update: 宛先テーブルのデータは Kafka メッセージレコードによって更新されます。
insert.mode=put
# データの読み取り順序でデータを書き込むかどうかを指定します。デフォルト値: true。書き込みパフォーマンスを向上させるために、このオプションを無効にすることができます。
insert.order.enable=true
# 宛先テーブルを自動的に作成するかどうかを指定します。デフォルト値: false。
auto.create=false
# 削除モードを指定します。有効な値: none、row、column、row_and_column。デフォルト値: none。
# none: 削除操作は実行できません。
# row: 行を削除できます。
# column: 属性列を削除できます。
# row_and_column: 行と属性列を削除できます。
delete.mode=none
# データをデータテーブルに書き込むときに、メモリ内のバッファキューに含めることができる行の最大数を指定します。デフォルト値: 1024。このパラメーターの値は 2 の指数である必要があります。
buffer.size=1024
# データをデータテーブルに書き込むときに使用されるコールバックスレッドの数を指定します。デフォルト値 = vCPU の数 + 1。
# max.thread.count=
# データをデータテーブルに書き込むために送信できる同時書き込みリクエストの最大数を指定します。デフォルト値: 10。
max.concurrency=10
# データの書き込み先となるバケットの数を指定します。デフォルト値: 3。このパラメーターの値を大きくすると、同時書き込み機能を向上させることができます。ただし、このパラメーターの値を、指定した同時書き込みリクエストの最大数よりも大きい値に設定することはできません。
bucket.count=3
# データをデータテーブルに書き込むときに、バッファキューをリフレッシュする間隔を指定します。単位: ミリ秒。デフォルト値: 10000。
flush.Interval=10000
# ダーティデータの処理方法を指定します。
# Kafka メッセージレコードの解析時またはデータテーブルへの書き込み時にエラーが発生することがあります。次の 2 つのパラメーターを指定して、エラーの修正方法を決定できます。
# フォールトトレランス機能を指定します。有効な値: none と all。デフォルト値: none。
# none: エラーが発生すると、Tablestore Sink Connector を使用するデータインポートタスクが失敗します。
# all: エラーが報告されたメッセージレコードはスキップされ、ログに記録されます。
runtime.error.tolerance=none
# ダーティデータのログ記録方法を指定します。有効な値: ignore、kafka、tablestore。デフォルト値: ignore。
# ignore: すべてのエラーが無視されます。
# kafka: エラーが報告されたメッセージレコードとエラーメッセージは、別の Kafka トピックに格納されます。
# tablestore: エラーが報告されたメッセージレコードとエラーメッセージは、別の Tablestore データテーブルに格納されます。
runtime.error.mode=ignore
# runtime.error.mode を kafka に設定した場合は、Kafka クラスタアドレスとトピックを指定する必要があります。
# runtime.error.bootstrap.servers=localhost:9092
# runtime.error.topic.name=errors
# runtime.error.mode を tablestore に設定した場合は、Tablestore データテーブルの名前を指定する必要があります。
# runtime.error.table.name=errors
分散モード
次のサンプルコードは、分散モードの Tablestore Sink Connector 用の .json 形式の構成ファイルを構成する方法の例を示しています。
{
"name": "tablestore-sink",
"config": {
// コネクタクラスを指定します。
"connector.class":"TableStoreSinkConnector",
// タスクの最大数を指定します。
"tasks.max":"3",
// データのエクスポート元の Kafka トピックのリストを指定します。
"topics":"test",
// 次の Tablestore 接続パラメーターの値を指定します。
// Tablestore インスタンスのエンドポイント。
"tablestore.endpoint":"https://xxx.xxx.ots.aliyuncs.com",
// AccessKey ID と AccessKey シークレットで構成される AccessKey ペア。
"tablestore.access.key.id":"xxx",
"tablestore.access.key.secret":"xxx",
// Tablestore インスタンスの名前。
"tablestore.instance.name":"xxx",
// 次のデータマッピングパラメーターを指定します。
// Kafka メッセージレコードの解析に使用するパーサーを指定します。
// Tablestore Sink Connector の DefaultEventParser は、Kafka Connect の Struct クラスと Map クラスをサポートしています。カスタム EventParser を使用することもできます。
"event.parse.class":"com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser",
// エクスポート元のトピックのプレースホルダーとして文字列内で <topic> を使用できる、宛先 Tablestore テーブルの名前のフォーマット文字列を指定します。
// topics.assign.tables は table.name.format よりも優先順位が高く割り当てられます。 topics.assign.tables が指定されている場合は、table.name.format の構成は無視されます。
// たとえば、table.name.format が kafka_<topic> に設定され、データのエクスポート元の Kafka トピックの名前が test の場合、test トピックからの Kafka メッセージレコードは Tablestore の kafka_test という名前のテーブルにマッピングされます。
"table.name.format":"<topic>",
// Kafka トピックと宛先 Tablestore テーブル間のマッピングを指定します。値は <topic>:<tablename> 形式である必要があります。トピック名とテーブル名はコロン (:) で区切ります。複数のマッピングを指定する場合は、カンマ (,) で区切ります。
// マッピングが指定されていない場合は、table.name.format の構成が使用されます。
// "topics.assign.tables":"test:test_kafka",
// プライマリキーモードを指定します。有効な値: kafka、record_key、record_value。デフォルト値: kafka。
// kafka: <connect_topic>_<connect_partition> と <connect_offset> がデータテーブルのプライマリキーとして使用されます。
// record_key: レコードキーのフィールドがデータテーブルのプライマリキーとして使用されます。
// record_value: レコード値のフィールドがデータテーブルのプライマリキーとして使用されます。
"primarykey.mode":"kafka",
// 宛先 Tablestore データテーブルのプライマリキー列の名前とデータ型を指定します。
// プライマリキー列名の形式は tablestore.<tablename>.primarykey.name です。プライマリキー列のデータ型の形式は tablestore.<tablename>.primarykey.type です。
// <tablename> はデータテーブル名のプレースホルダーです。
// プライマリキーモードが kafka の場合は、プライマリキー列の名前とデータ型を指定する必要はありません。デフォルトのプライマリキー列名 {"topic_partition","offset"} とプライマリキー列のデフォルトのデータ型 {string, integer} が使用されます。
// プライマリキーモードが record_key または record_value の場合は、プライマリキー列の名前とデータ型を指定する必要があります。
// "tablestore.test.primarykey.name":"A,B",
// "tablestore.test.primarykey.type":"string,integer",
// 属性列のホワイトリストを指定して、レコード値のフィールドをフィルタリングし、必要な属性列を取得します。
// デフォルトでは、属性列のホワイトリストは空です。レコード値のすべてのフィールドがデータテーブルの属性列として使用されます。
// 属性列名の形式は tablestore.<tablename>.columns.whitelist.name です。属性列のデータ型の形式は tablestore.<tablename>.columns.whitelist.type です。
// <tablename> はデータテーブル名のプレースホルダーです。
// "tablestore.test.columns.whitelist.name":"A,B",
// "tablestore.test.columns.whitelist.type":"string,integer",
// Kafka メッセージレコードを宛先 Tablestore テーブルに書き込む方法を指定します。
// 書き込みモードを指定します。有効な値: put と update。デフォルト値: put。
// put: テーブル内のデータは Kafka メッセージレコードによって上書きされます。
// update: テーブル内のデータは Kafka メッセージレコードによって更新されます。
"insert.mode":"put",
// データの読み取り順序でデータを書き込むかどうかを指定します。デフォルト値: true。書き込みパフォーマンスを向上させるために、このオプションを無効にすることができます。
"insert.order.enable":"true",
// 宛先テーブルを自動的に作成するかどうかを指定します。デフォルト値: false。
"auto.create":"false",
// 削除モードを指定します。有効な値: none、row、column、row_and_column。デフォルト値: none。
// none: 削除操作は実行できません。
// row: 行を削除できます。
// column: 属性列を削除できます。
// row_and_column: 行と属性列を削除できます。
"delete.mode":"none",
// データをデータテーブルに書き込むときに、メモリ内のバッファキューに含めることができる行の最大数を指定します。デフォルト値: 1024。このパラメーターの値は 2 の指数である必要があります。
"buffer.size":"1024",
// データをデータテーブルに書き込むときに使用されるコールバックスレッドの数を指定します。デフォルト値 = vCPU の数 + 1。
// "max.thread.count":
// データをデータテーブルに書き込むために送信できる同時書き込みリクエストの最大数を指定します。デフォルト値: 10。
"max.concurrency":"10",
// データの書き込み先となるバケットの数を指定します。デフォルト値: 3。このパラメーターの値を大きくすると、同時書き込み機能を向上させることができます。ただし、このパラメーターの値を、指定した同時書き込みリクエストの最大数よりも大きい値に設定することはできません。
"bucket.count":"3",
// データをデータテーブルに書き込むときに、バッファキューをリフレッシュする間隔を指定します。単位: ミリ秒。デフォルト値: 10000。
"flush.Interval":"10000",
// ダーティデータの処理方法を指定します。
// Kafka メッセージレコードの解析時またはデータテーブルへの書き込み時にエラーが発生することがあります。次の 2 つのパラメーターを指定して、エラーの修正方法を決定できます。
// フォールトトレランス機能を指定します。有効な値: none と all。デフォルト値: none。
// none: エラーが発生すると、Tablestore Sink Connector を使用するデータインポートタスクが失敗します。
// all: エラーが報告されたメッセージレコードはスキップされ、ログに記録されます。
"runtime.error.tolerance":"none",
// ダーティデータのログ記録方法を指定します。有効な値: ignore、kafka、tablestore。デフォルト値: ignore。
// ignore: すべてのエラーが無視されます。
// kafka: エラーが報告されたメッセージレコードとエラーメッセージは、別の Kafka トピックに格納されます。
// tablestore: エラーが報告されたメッセージレコードとエラーメッセージは、別の Tablestore データテーブルに格納されます。
"runtime.error.mode":"ignore"
// runtime.error.mode を kafka に設定した場合は、Kafka クラスタアドレスとトピックを指定する必要があります。
// "runtime.error.bootstrap.servers":"localhost:9092",
// "runtime.error.topic.name":"errors",
// runtime.error.mode を tablestore に設定した場合は、Tablestore データテーブルの名前を指定する必要があります。
// "runtime.error.table.name":"errors",
}
}
パラメーター
次の表に、構成ファイルのパラメーターを示します。Kafka から Tablestore の時系列テーブルにデータを同期する場合にのみ、時系列関連のパラメーターを構成する必要があります。
Kafka Connect パラメーター
パラメーター | タイプ | 必須 | 例 | 説明 |
name | string | はい | tablestore-sink | コネクタの名前。コネクタ名は一意である必要があります。 |
connector.class | class | はい | TableStoreSinkConnector | コネクタの Java クラス。 コネクタを使用する場合は、connector.class を使用してコネクタクラスを指定します。 connector.class は、コネクタクラスの完全名またはエイリアスに設定できます。コネクタクラスの完全名は com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector で、コネクタクラスのエイリアスは TableStoreSinkConnector です。例:
|
tasks.max | integer | はい | 3 | コネクタに対して作成できるタスクの最大数。 タスクの最大数が作成に失敗した場合、作成されるタスクの数が少なくなる可能性があります。 |
key.converter | string | いいえ | org.apache.kafka.connect.json.JsonConverter | ワーカー構成ファイルで指定されたデフォルトのキーコンバーターを置き換えるために使用されるキーコンバーター。 |
value.converter | string | いいえ | org.apache.kafka.connect.json.JsonConverter | ワーカー構成ファイルで指定されたデフォルトの値コンバーターを置き換えるために使用される値コンバーター。 |
topics | list | はい | test | コネクタに指定できる Kafka トピックのリスト。複数の Kafka トピックはカンマ (,) で区切ります。 コネクタに指定されたトピックを管理するには、topics を指定する必要があります。 |
コネクタ接続パラメーター
パラメーター | タイプ | 必須 | 例 | 説明 |
tablestore.endpoint | string | はい | https://xxx.xxx.ots.aliyuncs.com | Tablestore インスタンスのエンドポイント。詳細については、「エンドポイント」をご参照ください。 |
tablestore.mode | string | はい | timeseries | 宛先テーブルのタイプ。デフォルト値: normal。有効な値:
|
tablestore.access.key.id | string | はい | LTAn******************** | アカウントの AccessKey ID と AccessKey シークレット。 AccessKey ID と AccessKey シークレットの取得方法の詳細については、「AccessKey ペアを作成する」をご参照ください。 |
tablestore.access.key.secret | string | はい | zbnK************************** | |
tablestore.auth.mode | string | はい | aksk | 認証モード。デフォルト値: aksk。有効な値:
|
tablestore.instance.name | string | はい | myotstest | Tablestore インスタンスの名前。 |
コネクタのデータマッピングパラメーター
パラメーター | タイプ | 必須 | 例 | 説明 |
event.parse.class | class | はい | DefaultEventParser | EventParser の Java クラス。デフォルト値: DefaultEventParser。パーサーは Kafka メッセージレコードを解析して、データテーブルのプライマリキー列と属性列を取得します。 重要 Tablestore では、列値のサイズに制限があります。 string 型または binary 型のプライマリキー列の値は 1 KB を超えることはできず、属性列の値は 2 MB を超えることはできません。詳細については、「制限」をご参照ください。 データ型の変換後に列値が制限を超えた場合、Kafka メッセージレコードはダーティデータとして処理されます。 DefaultEventParser を使用するには、Kafka メッセージレコードのキーまたは値が Kafka Connect の Struct クラスまたは Map クラスである必要があります。 Struct で選択されたフィールドは、Tablestore Sink Connector でサポートされているデータ型である必要があります。フィールドは、データ型マッピングテーブルに基づいて Tablestore データ型のデータに変換され、データテーブルに書き込まれます。 Map の値のデータ型は、Tablestore Sink Connector でサポートされているデータ型である必要があります。 Tablestore Sink Connector は、Struct と Map で同じデータ型をサポートしています。 Map の値は binary 型のデータに変換され、データテーブルに書き込まれます。 Kafka メッセージレコードのデータ型が Tablestore Sink Connector と互換性がない場合は、com.aliyun.tablestore.kafka.connect.parsers.EventParser で定義されている操作を呼び出してパーサーを構成できます。 |
table.name.format | string | いいえ | kafka_<topic> | 宛先 Tablestore データテーブルの名前のフォーマット文字列。デフォルト値: <topic>。 <topic> は、データのエクスポート元のトピックのプレースホルダーとして文字列内で使用できます。たとえば、table.name.format が kafka_<topic> に設定され、データのエクスポート元の Kafka トピックの名前が test の場合、test トピックからの Kafka メッセージレコードは Tablestore の kafka_test という名前のテーブルにマッピングされます。 topics.assign.tables は table.name.format よりも優先順位が高く割り当てられます。 topics.assign.tables が指定されている場合は、table.name.format の構成は無視されます。 |
topics.assign.tables | list | はい | test:destTable |
topics.assign.tables は table.name.format よりも優先順位が高く割り当てられます。 topics.assign.tables が指定されている場合は、table.name.format の構成は無視されます。 |
primarykey.mode | string | いいえ | kafka | データテーブルのプライマリキーモード。有効な値:
このパラメーターは、tablestore.<tablename>.primarykey.name および tablestore.<tablename>.primarykey.type と一緒に構成します。このパラメーターの値の大文字と小文字は区別されません。 |
tablestore.<tablename>.primarykey.name | list | いいえ | A,B | データテーブルのプライマリキー列名。 <tablename> はデータテーブル名のプレースホルダーです。このパラメーターの値には、カンマ (,) で区切られた 1 ~ 4 つのプライマリキー列名が含まれています。 プライマリキー列名は、プライマリキーモードによって異なります。
Tablestore データテーブルのプライマリキー列は順次です。 tablestore.<tablename>.primarykey.name を定義するときは、プライマリキー列の順序に注意する必要があります。たとえば、PRIMARY KEY (A, B, C) と PRIMARY KEY (A, C, B) はスキーマが異なります。 |
tablestore.<tablename>.primarykey.type | list | いいえ | string, integer | データテーブルのプライマリキー列のデータ型。 <tablename> はデータテーブル名のプレースホルダーです。このパラメーターの値には、プライマリキー列の 1 ~ 4 つのデータ型が含まれています。プライマリキー列のデータ型はカンマ (,) で区切ります。プライマリキー列のデータ型の順序は、tablestore.<tablename>.primarykey.name で指定されたプライマリキー列名の順序に対応している必要があります。このパラメーターの値の大文字と小文字は区別されません。有効な値: integer、string、binary、auto_increment。 プライマリキー列のデータ型は、プライマリキーモードによって異なります。
|
tablestore.<tablename>.columns.whitelist.name | list | いいえ | A,B | 属性列ホワイトリストの属性列名。 <tablename> はデータテーブル名のプレースホルダーです。属性列名はカンマ (,) で区切ります。 このパラメーターを構成しない場合、レコード値の Struct クラスのすべてのフィールドまたは Map クラスのすべてのキーがデータテーブルの属性列として使用されます。このパラメーターを構成すると、レコード値のフィールドは、指定された属性列ホワイトリストに基づいてフィルタリングされ、必要な属性列が取得されます。 |
tablestore.<tablename>.columns.whitelist.type | list | いいえ | string, integer | 属性列ホワイトリストの属性列のデータ型。 |
コネクタ書き込みパラメーター
パラメーター | タイプ | 必須 | 例 | 説明 |
insert.mode | string | いいえ | put | 書き込みモード。デフォルト値: put。有効な値:
このパラメーターの値の大文字と小文字は区別されません。 |
insert.order.enable | boolean | いいえ | true | データがデータテーブルに読み取られた順序で書き込まれるかどうかを指定します。デフォルト値: true。有効な値:
|
auto.create | boolean | いいえ | false | 宛先テーブルを自動的に作成するかどうかを指定します。データテーブルまたは時系列テーブルを自動的に作成できます。デフォルト値: false。有効な値:
|
delete.mode | string | いいえ | none | 削除モード。このパラメーターの構成は、データがデータテーブルに同期され、プライマリキーモードが record_key に設定されている場合にのみ有効になります。デフォルト値: none。有効な値:
このパラメーターの値の大文字と小文字は区別されません。 このパラメーターは、insert.mode パラメーターの値に基づいて指定されます。詳細については、「付録: 削除構文」をご参照ください。 |
buffer.size | integer | いいえ | 1024 | データがデータテーブルに書き込まれるときに、メモリ内のバッファキューに含めることができる行の最大数。デフォルト値: 1024。このパラメーターの値は 2 の指数である必要があります。 |
max.thread.count | integer | いいえ | 3 | データがデータテーブルに書き込まれるときに使用されるコールバックスレッドの数。デフォルト値 = |
max.concurrency | integer | いいえ | 10 | データがデータテーブルに書き込まれるために送信できる同時書き込みリクエストの最大数。デフォルト値: 10。 |
bucket.count | integer | いいえ | 3 | データの書き込み先となるバケットの数。デフォルト値: 3。このパラメーターの値を大きくすると、同時書き込み機能を向上させることができます。ただし、このパラメーターの値を、指定した同時書き込みリクエストの最大数よりも大きい値に設定することはできません。 |
flush.Interval | integer | いいえ | 10000 | データがデータテーブルに書き込まれるときに、バッファキューがリフレッシュされる間隔。単位: ミリ秒。デフォルト値: 10000。 |
コネクタランタイムエラーパラメーター
パラメーター | タイプ | 必須 | 例 | 説明 |
runtime.error.tolerance | string | いいえ | none | Kafka メッセージレコードの解析時またはテーブルへの書き込み時にエラーが発生した場合に使用されるエラー処理ポリシー。デフォルト値: none。有効な値:
このパラメーターの値の大文字と小文字は区別されません。 |
runtime.error.mode | string | いいえ | ignore | Kafka メッセージレコードの解析時またはテーブルへの書き込み時にエラーが報告されたメッセージレコードの処理方法を指定します。デフォルト値: ignore。有効な値:
runtime.error.mode が kafka に設定されている場合は、Kafka メッセージレコードのヘッダー、キー、値をシリアル化する必要があります。 runtime.error.mode が tablestore に設定されている場合は、Kafka メッセージレコードのキーと値をシリアル化する必要があります。デフォルトでは、org.apache.kafka.connect.json.JsonConverter がデータとスキーマのシリアル化に使用され、schemas.enable は true に設定されています。 JsonConverter を使用してデータを逆シリアル化し、元のデータを取得できます。 Converter の詳細については、「Kafka Converter」をご参照ください。 |
runtime.error.bootstrap.servers | string | いいえ | localhost:9092 | エラーが報告されたメッセージレコードとエラーメッセージが格納される Kafka クラスタのアドレス。 |
runtime.error.topic.name | string | いいえ | errors | エラーが報告されたメッセージレコードとエラーメッセージを格納する Kafka トピックの名前。 |
runtime.error.table.name | string | いいえ | errors | エラーが報告されたメッセージレコードとエラーメッセージを格納する Tablestore テーブルの名前。 |
時系列関連パラメーター
パラメーター | タイプ | 必須 | 例 | 説明 |
tablestore.timeseries.<tablename>.measurement | string | はい | mName | JSON 形式のデータの指定されたキーに対応する値が、_m_name フィールドの値として時系列テーブルに書き込まれることを指定します。 tablestore.timeseries.<tablename>.measurement が <topic> に設定されている場合、Kafka メッセージレコードの topic キーに対応する値が、_m_name フィールドの値として時系列テーブルに書き込まれます。 パラメーター内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメーター名を変更します。たとえば、時系列テーブルの名前が test の場合、パラメーター名は tablestore.timeseries.test.measurement です。 |
tablestore.timeseries.<tablename>.dataSource | string | はい | ds | JSON 形式のデータの ds キーに対応する値が、_data_source フィールドの値として時系列テーブルに書き込まれることを指定します。 パラメーター内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメーター名を変更します。 |
tablestore.timeseries.<tablename>.tags | list | はい | region,level | JSON 形式のデータの region キーと level キーに対応する値が、tags フィールドの値として時系列テーブルに書き込まれることを指定します。 パラメーター内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメーター名を変更します。 |
tablestore.timeseries.<tablename>.time | string | はい | timestamp | JSON 形式のデータの timestamp キーに対応する値が、_time フィールドの値として時系列テーブルに書き込まれることを指定します。 パラメーター内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメーター名を変更します。 |
tablestore.timeseries.<tablename>.time.unit | string | はい | MILLISECONDS | tablestore.timeseries.<tablename>.time パラメーターの値の単位。有効な値: SECONDS、MILLISECONDS、MICROSECONDS、NANOSECONDS。 パラメーター内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメーター名を変更します。 |
tablestore.timeseries.<tablename>.field.name | list | いいえ | cpu,io | JSON 形式のデータの cpu キーと io キーが、_field_name の名前として時系列テーブルに書き込まれ、JSON 形式のデータの cpu キーと io キーに対応する値が、_field_name の値として時系列テーブルに書き込まれることを指定します。 パラメーター内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメーター名を変更します。 |
tablestore.timeseries.<tablename>.field.type | string | いいえ | double,integer | tablestore.timeseries.<tablename>.field.name で指定されたフィールドのデータ型。有効な値: double、integer、string、binary、boolean。複数のデータ型はカンマ (,) で区切ります。 パラメーター内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメーター名を変更します。 |
tablestore.timeseries.mapAll | boolean | いいえ | false | JSON 形式のデータのプライマリキーフィールドと時間フィールド以外のフィールドが、フィールドとして時系列テーブルに書き込まれるかどうかを指定します。 tablestore.timeseries.mapAll が false に設定されている場合は、tablestore.timeseries.<tablename>.field.name パラメーターと tablestore.timeseries.<tablename>.field.type パラメーターを構成する必要があります。 |
tablestore.timeseries.toLowerCase | boolean | いいえ | true | フィールドのキーが時系列テーブルに書き込まれる前に小文字に変換されるかどうかを指定します。フィールドのキーは、プライマリキーフィールドまたは時間フィールド以外のキー、または tablestore.timeseries.<tablename>.field.name で指定されたキーです。 |
tablestore.timeseries.rowsPerBatch | integer | いいえ | 50 | 1 つのリクエストで Tablestore に書き込むことができる行の最大数。最大値とデフォルト値は 200 です。 |
付録: Kafka と Tablestore 間のデータ型マッピング
次の表に、Kafka と Tablestore のデータ型間のマッピングを示します。
Kafka スキーマタイプ | Tablestore データ型 |
STRING | STRING |
INT8、INT16、INT32、INT64 | INTEGER |
FLOAT32、FLOAT64 | DOUBLE |
BOOLEAN | BOOLEAN |
BYTES | BINARY |
付録: 削除構文
この機能は、Kafka から Tablestore のデータテーブルにデータを同期する場合にのみサポートされます。
次の表に、メッセージレコードに空の値が含まれており、Kafka から Tablestore のデータテーブルにデータが同期されている場合の、書き込みモード (insert.mode) と削除モード (delete.mode) の構成に基づいて Tablestore データテーブルにデータを書き込むために使用されるメソッドを示します。
insert.mode | put | update | ||||||
delete.mode | none | row | column | row_and_column | none | row | column | row_and_column |
空の値 | 上書き | 行の削除 | 上書き | 行の削除 | ダーティデータ | 行の削除 | ダーティデータ | 行の削除 |
値のすべてのフィールドが空 | 上書き | 上書き | 上書き | 上書き | ダーティデータ | ダーティデータ | 列の削除 | 列の削除 |
値の一部のフィールドが空 | 上書き | 上書き | 上書き | 上書き | 空の値を無視 | 空の値を無視 | 列の削除 | 列の削除 |