Kafka データソースは、Kafka からのデータ読み取りおよび Kafka へのデータ書き込みのための双方向チャネルを提供します。本トピックでは、DataWorks が Kafka に対して提供するデータ同期機能について説明します。
サポートされるバージョン
DataWorks は、Alibaba Cloud Kafka および 0.10.2 ~ 3.6.x のセルフマネージド Kafka をサポートしています。
Kafka のバージョンが 0.10.2 より前の場合、データ同期はサポートされません。これは、これらのバージョンではパーティションオフセットの取得がサポートされておらず、またデータ構造がタイムスタンプをサポートしていないためです。
リアルタイム読み取り
-
サブスクリプション Serverless リソースグループ を使用する場合、タスク失敗を防ぐため、事前に必要な仕様を推定する必要があります。
トピックごとに 1 CU を推定します。さらに、スループットに基づいてリソースを推定します。
非圧縮 Kafka データの場合:トラフィック 10 MB/s あたり 1 CU を推定します。
圧縮 Kafka データの場合:トラフィック 10 MB/s あたり 2 CU を推定します。
JSON 解析を要する圧縮 Kafka データの場合:トラフィック 10 MB/s あたり 3 CU を推定します。
サブスクリプション Serverless リソースグループまたは旧バージョンのデータ統合専用リソースグループを使用する場合:
フェールオーバーに対するワークロードの許容度が高い場合は、クラスタースロット使用率が 80 % を超えないようにしてください。
フェールオーバーに対するワークロードの許容度が低い場合は、クラスタースロット使用率が 70 % を超えないようにしてください。
実際のリソース使用量は、データの内容や形式などの要因によって異なります。初期評価後に実際の使用状況に応じてリソースを調整できます。
制限事項
Kafka データソースは、Serverless リソースグループ(推奨) および データ統合専用リソースグループの旧バージョン をサポートしています。
単一テーブルからのオフライン読み取り
parameter.groupId および parameter.kafkaConfig.group.id の両方が設定されている場合、parameter.groupId が group.id よりも優先されます(kafkaConfig パラメーター内)。
単一テーブルへのリアルタイム書き込み
書き込み操作では重複排除はサポートされません。オフセットリセットまたはフェールオーバー後にタスクが再開された場合、重複したデータが書き込まれる可能性があります。
データベース全体へのリアルタイム書き込み
リアルタイムデータ同期タスクは、Serverless リソースグループ(推奨) および データ統合専用リソースグループの旧バージョン をサポートしています。
ソーステーブルにプライマリキーがある場合、そのプライマリキー値が Kafka レコードのキーとして使用されます。これにより、同一のプライマリキーに対する変更が順序通りに同一 Kafka パーティションに書き込まれることを保証します。
ソーステーブルにプライマリキーがない場合、以下の 2 つの選択肢があります。プライマリキーなしのテーブル同期を選択した場合、Kafka レコードのキーは空になります。テーブル変更を Kafka に順序通りに書き込むには、送信先 Kafka トピックのパーティション数を 1 にする必要があります。カスタムプライマリキーを選択した場合、1 つ以上の非プライマリキー列の組み合わせが Kafka レコードのキーとして使用されます。
-
以下の設定を Kafka データソースの拡張パラメーターに追加します。これにより、クラスターで例外が発生した場合でも、同一プライマリキーを持つレコードの変更が順序通りに同一パーティションに書き込まれることを保証します。
{"max.in.flight.requests.per.connection":1,"buffer.memory": 100554432}重要この設定により、同期性能が大幅に低下します。厳密な順序性および信頼性の要件と性能のバランスを取ってください。
Kafka へのリアルタイム同期におけるメッセージ全体のフォーマット、ハートビートメッセージのフォーマット、およびソースデータ変更に対応するメッセージのフォーマットについては、「付録:メッセージフォーマット」をご参照ください。
サポートされるフィールド型
Kafka は非構造化データストレージを提供します。Kafka レコードには通常、以下のフィールドが含まれます:key(キー)、value(値)、offset(オフセット)、timestamp(タイムスタンプ)、headers(ヘッダー)、partition(パーティション)。DataWorks が Kafka からデータを読み取る場合および Kafka にデータを書き込む場合の処理方法を以下に示します。
データ読み取り
DataWorks が Kafka からデータを読み取る場合、JSON 形式のデータを解析できます。以下の表に、各データモジュールの処理方法を示します。
Kafka レコードのデータモジュール | 処理後のデータ型 |
key | keyType 設定項目に依存します(データ同期タスク内)。keyType パラメーターの詳細については、付録の完全なパラメーター説明をご参照ください。 |
value | valueType 設定項目に依存します(データ同期タスク内)。valueType パラメーターの詳細については、付録の完全なパラメーター説明をご参照ください。 |
offset | Long |
timestamp | Long |
headers | String |
partition | Long |
データ書き込み
DataWorks が Kafka にデータを書き込む場合、JSON 形式またはテキスト形式での書き込みをサポートします。処理ポリシーは、データ同期タスクの種類によって異なり、以下の表に詳細を示します。
テキスト形式でデータを書き込む場合、フィールド名は含まれません。フィールド値は区切り文字で区切られます。
-
リアルタイム同期タスクの場合、DataWorks はデータベース変更メッセージ、業務日時、およびデータ定義言語(DDL)情報を含む組み込み JSON 形式を使用します。メッセージフォーマットの詳細については、「付録:メッセージフォーマット」をご参照ください。
同期タスクの種類 | Kafka に書き込まれる value のフォーマット | ソースフィールド型 | 書き込み操作の処理方法 |
オフライン同期 DataStudio のオフライン同期ノード | json | String | UTF-8 エンコード済み文字列 |
Boolean | "true" または "false" の UTF-8 エンコード済み文字列に変換されます | ||
Time/Date | yyyy-MM-dd HH:mm:ss 形式の UTF-8 エンコード済み文字列 | ||
Numeric | UTF-8 エンコード済み数値文字列 | ||
Byte stream | バイトストリームは UTF-8 エンコード済み文字列として扱われ、文字列に変換されます。 | ||
text | String | UTF-8 エンコード済み文字列 | |
Boolean | "true" または "false" の UTF-8 エンコード済み文字列に変換されます | ||
Time/Date | yyyy-MM-dd HH:mm:ss 形式の UTF-8 エンコード済み文字列 | ||
Numeric | UTF-8 エンコード済み数値文字列 | ||
Byte stream | バイトストリームは UTF-8 エンコード済み文字列として扱われ、文字列に変換されます。 | ||
リアルタイム同期:Kafka へのリアルタイム ETL DataStudio のリアルタイム同期ノード | json | String | UTF-8 エンコード済み文字列 |
Boolean | JSON ブール型 | ||
Time/Date |
| ||
Numeric | JSON 数値型 | ||
Byte stream | バイトストリームは Base64 エンコードされ、その後 UTF-8 エンコード済み文字列に変換されます。 | ||
text | String | UTF-8 エンコード済み文字列 | |
Boolean | "true" または "false" の UTF-8 エンコード済み文字列に変換されます | ||
Time/Date | yyyy-MM-dd HH:mm:ss 形式の UTF-8 エンコード済み文字列 | ||
Numeric | UTF-8 エンコード済み数値文字列 | ||
Byte stream | バイトストリームは Base64 エンコードされ、その後 UTF-8 エンコード済み文字列に変換されます。 | ||
リアルタイム同期: データベース全体の Kafka へのリアルタイム同期 増分データのみのリアルタイム同期 | 組み込み JSON 形式 | String | UTF-8 エンコード済み文字列 |
Boolean | JSON ブール型 | ||
Time/Date | 13 桁のミリ秒タイムスタンプ | ||
Numeric | JSON 数値 | ||
Byte stream | バイトストリームは Base64 エンコードされ、その後 UTF-8 エンコード済み文字列に変換されます。 | ||
同期ソリューション:Kafka へのワンクリックリアルタイム同期 フルオフライン同期 + 増分リアルタイム同期 | 組み込み JSON 形式 | String | UTF-8 エンコード済み文字列 |
Boolean | JSON ブール型 | ||
Time/Date | 13 桁のミリ秒タイムスタンプ | ||
Numeric | JSON 数値 | ||
Byte stream | バイトストリームは Base64 エンコードされ、その後 UTF-8 エンコード済み文字列に変換されます。 |
データソースの追加
DataWorks で同期タスクを開発する前に、データソース管理 の手順に従い、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールで パラメーターの説明を表示することで、各パラメーターの意味を確認できます。
データ同期タスクの開発
同期タスクの設定入口および設定手順については、以下の設定ガイドをご参照ください。
単一テーブル向けオフライン同期タスクの設定
手順の詳細については、「コードレス UI によるオフライン同期タスクの設定」および「コードエディタによるオフライン同期タスクの設定」をご参照ください。
コードエディタ向けの全パラメーター一覧およびスクリプトデモについては、「付録:スクリプトデモおよびパラメーター説明」をご参照ください。
単一テーブルまたはデータベース全体向けリアルタイム同期タスクの設定
手順については、「単一テーブル向けリアルタイム同期タスクの設定」および「データベース全体向けリアルタイム同期タスクの設定」をご参照ください。
認証設定
SSL
Kafka データソースの 特別な認証方式 を SSL または SASL_SSL に設定すると、Kafka クラスターで SSL 認証が有効になります。クライアントの truststore 証明書ファイルをアップロードし、truststore パスフレーズを入力する必要があります。
Kafka クラスターが Alibaba Cloud Kafka インスタンスである場合、「SSL 証明書アルゴリズムのスペックアップ手順」に従って、正しい truststore 証明書ファイルをダウンロードしてください。truststore パスフレーズは KafkaOnsClient です。
Kafka クラスターが EMR インスタンスである場合、「Kafka 接続への SSL 暗号化の適用」に従って、正しい truststore 証明書ファイルをダウンロードし、truststore パスフレーズを取得してください。
セルフマネージドクラスターの場合は、正しい truststore 証明書をアップロードし、正しい truststore パスフレーズを入力する必要があります。
keystore 証明書ファイル、keystore パスフレーズ、および SSL パスフレーズは、Kafka クラスターで双方向 SSL 認証が有効な場合のみ必要です。Kafka クラスターのサーバーは、クライアントの ID を認証するためにこれらを使用します。双方向 SSL 認証は、Kafka クラスターの server.properties ファイル内で ssl.client.auth=required が設定されている場合に有効になります。詳細については、「Kafka 接続への SSL 暗号化の適用」をご参照ください。
GSSAPI
Kafka データソースの設定時に Sasl Mechanism を GSSAPI に設定した場合、以下の 3 つの認証ファイルをアップロードする必要があります: JAAS 設定ファイル、Kerberos 設定ファイル、および Keytab ファイル。また、データ統合専用リソースグループの DNS/HOST 設定も行う必要があります。以下に、これらのファイルおよび DNS/HOST 設定の詳細を説明します。
Serverless リソースグループ の場合、内部 DNS 名前解決を使用してホストアドレス情報を設定する必要があります。詳細については、「内部 DNS 名前解決(PrivateZone)」をご参照ください。
-
JAAS 設定ファイル
JAAS ファイルは KafkaClient で始まり、その後にすべての設定項目を波括弧 {} で囲む必要があります。
波括弧内の最初の行は、使用するログインコンポーネントクラスを定義します。異なる SASL 認証方式では、ログインコンポーネントクラスは固定されています。以降の各設定項目は、key=value 形式で記述します。
-
最後の設定項目以外は、セミコロンで終了してはいけません。
-
最後の設定項目はセミコロンで終了する必要があります。また、閉じ波括弧
}の後にもセミコロンを追加する必要があります。
上記の形式要件を満たさない場合、JAAS 設定ファイルは解析できません。以下のコードは、典型的な JAAS 設定ファイルの形式例です。xxx のプレースホルダーは、実際の情報に置き換えてください。
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="xxx" storeKey=true serviceName="kafka-server" principal="kafka-client@EXAMPLE.COM"; };設定項目
説明
ログインモジュール
com.sun.security.auth.module.Krb5LoginModule に設定する必要があります。
useKeyTab
true に設定する必要があります。
keyTab
任意のパスを指定できます。同期タスク実行時に、データソース設定中にアップロードした keytab ファイルが自動的にローカルパスにダウンロードされます。システムは、このローカルパスを keyTab 設定項目の値として使用します。
storeKey
クライアントがキーを保存するかどうかを指定します。true または false を設定できます。データ同期には影響しません。
serviceName
Kafka サーバーの server.properties 設定ファイル内の sasl.kerberos.service.name 設定項目に対応します。必要に応じて設定してください。
principal
Kafka クライアントで使用される Kerberos プリンシパルです。アップロードした keytab ファイルに、このプリンシパルのキーが含まれていることを確認してください。
-
Kerberos 設定ファイル
Kerberos 設定ファイルには、[libdefaults] および [realms] の 2 つのモジュールが含まれている必要があります。
[libdefaults] モジュールでは、Kerberos 認証パラメーターを指定します。モジュール内の各設定項目は、key=value 形式で記述します。
-
[realms] セクションでは、KDC アドレスを指定します。複数の realm 定義を含めることができます。各 realm 定義は realm name= で始まります。
その後、波括弧で囲まれた一連の設定項目が続きます。各設定項目も key=value 形式で記述します。以下のコードは、典型的な Kerberos 設定ファイルの形式例です。xxx のプレースホルダーは、実際の情報に置き換えてください。
[libdefaults] default_realm = xxx [realms] xxx = { kdc = xxx }設定項目
説明
[libdefaults].default_realm
Kafka クラスターノードへのアクセス時に使用されるデフォルト realm です。通常、JAAS 設定ファイルで指定されたクライアントプリンシパルの realm と同じです。
その他の [libdefaults] パラメーター
[libdefaults] モジュールでは、ticket_lifetime などのその他の Kerberos 認証パラメーターを指定できます。必要に応じて設定してください。
[realms].realm name
JAAS 設定ファイルで指定されたクライアントプリンシパルの realm および [libdefaults].default_realm と同じである必要があります。JAAS 設定ファイル内のクライアントプリンシパルの realm と [libdefaults].default_realm が異なる場合、2 つの realms サブモジュールを含める必要があります。それぞれが、JAAS 設定ファイル内のクライアントプリンシパルの realm および [libdefaults].default_realm に対応します。
[realms].realm name.kdc
ip:port 形式で KDC アドレスおよびポートを指定します(例:kdc=10.0.0.1:88)。ポートを省略した場合、デフォルトポート 88 が使用されます(例:kdc=10.0.0.1)。
-
Keytab ファイル
Keytab ファイルには、JAAS 設定ファイルで指定されたプリンシパルのキーが含まれており、KDC で検証可能である必要があります。たとえば、現在の作業ディレクトリに client.keytab という名前のファイルがある場合、以下のコマンドを実行して、そのファイルに指定されたプリンシパルのキーが含まれていることを確認できます。
klist -ket ./client.keytab Keytab name: FILE:client.keytab KVNO Timestamp Principal ---- ------------------- ------------------------------------------------------ 7 2018-07-30T10:19:16 te**@**.com (des-cbc-md5) -
データ統合専用リソースグループの DNS および HOST 設定
Kerberos が有効な Kafka クラスターでは、ノードのホスト名は KDC(キー配布センター)内のプリンシパルの一部です。クライアントがノードに接続する際、ローカルの DNS および HOST 設定を使用してノードのプリンシパルを推論し、KDC からアクセス認証情報を要求します。したがって、Kerberos が有効な Kafka クラスターにアクセスするには、データ統合専用リソースグループが、これらの認証情報を取得するために適切な DNS および HOST 設定を持っている必要があります。
DNS 設定
データ統合専用リソースグループがアタッチされている VPC 内で Kafka クラスターノードの名前解決に PrivateZone インスタンスが使用されている場合、DataWorks コンソールで、データ統合専用リソースグループの VPC アタッチメントに IP アドレス 100.100.2.136 および 100.100.2.138 のカスタムルートを追加できます。これにより、Kafka クラスターノードの PrivateZone 名前解決設定がデータ統合専用リソースグループに適用されます。

-
HOST 設定
データ統合専用リソースグループがアタッチされている VPC で PrivateZone インスタンスが名前解決に使用されていない場合、各 Kafka ノードの IP アドレスとホスト名を手動でマッピングする必要があります。DataWorks コンソールで、データ統合専用リソースグループのネットワーク設定内の Host 設定セクションに、これらのマッピングを追加します。

PLAIN
Kafka データソースを設定する際に、Sasl Mechanism を PLAIN に設定した場合、JAAS ファイルは KafkaClient で始まり、その後にすべての設定項目を波括弧 {} で囲む必要があります。
波括弧内の最初の行は、使用するログインコンポーネントクラスを定義します。異なる SASL 認証方式では、ログインコンポーネントクラスは固定されています。以降の各設定項目は、key=value 形式で記述します。
-
最後の設定項目以外は、セミコロンで終了してはいけません。
最後の設定項目はセミコロンで終了する必要があります。また、閉じ波括弧 "}" の後にもセミコロンを追加する必要があります。
上記の形式要件を満たさない場合、JAAS 設定ファイルは解析できません。以下のコードは、典型的な JAAS 設定ファイルの形式例です。xxx のプレースホルダーは、実際の情報に置き換えてください。
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="xxx"
password="xxx";
};
設定項目 | 説明 |
ログインモジュール | org.apache.kafka.common.security.plain.PlainLoginModul に設定する必要があります。 |
username | ユーザー名です。必要に応じて設定してください。 |
password | パスワードです。必要に応じて設定してください。 |
よくある質問
付録:スクリプトデモおよびパラメーター説明
コードエディタを使用したバッチ同期タスクの設定
コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式要件に従って、関連するパラメーターをスクリプト内に設定する必要があります。詳細については、「コードエディタの使用」をご参照ください。以下に、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに関連する必須パラメーターを説明します。
リーダースクリプトデモ
以下は、Kafka からデータを読み取るための JSON 設定例です。
{
"type": "job",
"steps": [
{
"stepType": "kafka",
"parameter": {
"server": "host:9093",
"column": [
"__key__",
"__value__",
"__partition__",
"__offset__",
"__timestamp__",
"'123'",
"event_id",
"tag.desc"
],
"kafkaConfig": {
"group.id": "demo_test"
},
"topic": "topicName",
"keyType": "ByteArray",
"valueType": "ByteArray",
"beginDateTime": "20190416000000",
"endDateTime": "20190416000006",
"skipExceedRecord": "true"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "stream",
"parameter": {
"print": false,
"fieldDelimiter": ","
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle": true,//throttle が false の場合、mbps パラメーターは無効となり、レート制限が行われません。throttle が true の場合、レートが制限されます。
"concurrent": 1,//並行スレッド数。
"mbps":"12"//最大転送レート。1 mbps は 1 MB/s に相当します。
}
}
}リーダースクリプトパラメーター
パラメーター | 説明 | 必須 |
datasource | データソースの名称です。コードエディタではデータソースの追加が可能です。このパラメーターの値は、追加したデータソースの名称と一致している必要があります。 | はい |
server | Kafka ブローカーサーバーのアドレスを ip:port 形式で指定します。 1 つの server のみを設定できますが、DataWorks が Kafka クラスター内のすべてのブローカーの IP アドレスに接続できることを確認する必要があります。 | はい |
topic | Kafka トピックです。トピックとは、Kafka が処理するメッセージフィードの集約です。 | はい |
column |
Kafka から読み取るデータです。定数列、データ列、属性列がサポートされます。
|
はい |
keyType |
Kafka キーの型です。有効な値:BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT。 |
いいえ |
valueType |
Kafka 値の型です。有効な値:BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT。 |
いいえ |
beginDateTime | データ消費の開始オフセットです。これは、期間の左境界(包含)です。yyyymmddhhmmss 形式の時刻文字列です。スケジューリングパラメーター と併用できます。詳細については、「スケジューリングパラメーターのサポート形式」をご参照ください。 説明 この機能は Kafka 0.10.2 以降でサポートされています。 | このパラメーターまたは beginOffset のいずれかを指定する必要があります。 説明 beginDateTime および endDateTime は併用されます。 |
endDateTime | データ消費の終了オフセットです。これは、期間の右境界(除外)です。yyyymmddhhmmss 形式の時刻文字列です。スケジューリングパラメーター と併用できます。詳細については、「スケジューリングパラメーターのサポート形式」をご参照ください。 説明 この機能は Kafka 0.10.2 以降でサポートされています。 | このパラメーターまたは endOffset のいずれかを指定する必要があります。 説明 endDateTime および beginDateTime は併用されます。 |
beginOffset |
データ消費の開始オフセットです。以下の形式で設定できます。
|
このパラメーターまたは beginDateTime のいずれかを指定する必要があります。 |
endOffset | データ消費の終了オフセットです。データ消費タスクの終了タイミングを制御するために使用されます。 | このパラメーターまたは endDateTime のいずれかを指定する必要があります。 |
skipExceedRecord |
Kafka は
|
いいえ。デフォルト値は false です。 |
partition | Kafka トピックには複数のパーティション(partition)があります。デフォルトでは、データ同期タスクはトピック内のすべてのパーティションをカバーするオフセット範囲からデータを読み取ります。ただし、partition を指定することで、単一パーティションのオフセット範囲からのみデータを読み取ることもできます。 | いいえ。デフォルト値はありません。 |
kafkaConfig | KafkaConsumer クライアントを作成する際に、bootstrap.servers、auto.commit.interval.ms、session.timeout.ms などの拡張パラメーターを指定できます。kafkaConfig を使用することで、KafkaConsumer の消費動作を制御できます。 | いいえ |
encoding | keyType または valueType が STRING に設定されている場合、このパラメーターで指定されたエンコーディングを使用して文字列を解析します。 | いいえ。デフォルト値は UTF-8 です。 |
waitTIme | コンシューマーオブジェクトが Kafka からデータを 1 回の試行で取得する際の最大待機時間(秒)です。 | いいえ。デフォルト値は 60 です。 |
stopWhenPollEmpty | 有効な値は true および false です。このパラメーターを true に設定し、コンシューマーが Kafka から空のデータを取得した場合(通常はトピック内のすべてのデータが読み取られた場合、またはネットワークまたは Kafka クラスターの可用性の問題による場合)、タスクは直ちに停止します。それ以外の場合、再度データが取得されるまで再試行します。 | いいえ。デフォルト値は true です。 |
stopWhenReachEndOffset |
このパラメーターは、stopWhenPollEmpty が true の場合にのみ有効です。有効な値は true および false です。
|
いいえ。デフォルト値は false です。 説明
このパラメーターは下位互換性のために提供されています。Kafka のバージョンが 0.10.2 より前の場合、Kafka トピックのすべてのパーティションの最新 |
以下の表に、kafkaConfig パラメーターを示します。
パラメーター | 説明 |
fetch.min.bytes |
コンシューマーがブローカーから取得できるデータの最小量(バイト単位)です。十分な量のデータが利用可能になった場合にのみ、データがコンシューマーに返されます。 |
fetch.max.wait.ms |
ブローカーがデータを返すまでの最大待機時間です。デフォルト値は 500 ms です。ブローカーは、fetch.min.bytes または fetch.max.wait.ms のいずれかの条件が満たされた時点でデータを返します。 |
max.partition.fetch.bytes | ブローカーがコンシューマーに各 partition から返すことができる最大バイト数を指定します。デフォルト値は 1 MB です。 |
session.timeout.ms | コンシューマーがサーバーから切断された場合にサービスを受けることができなくなるまでの時間を指定します。デフォルト値は 30 秒です。 |
auto.offset.reset | オフセットがない場合、または無効なオフセット(コンシューマーが長期間非アクティブであり、オフセットを持つレコードが期限切れになって削除された場合)で読み取る際に、コンシューマーが実行するアクションです。デフォルト値は none であり、オフセットは自動的にリセットされません。これを earliest に変更すると、コンシューマーは partition レコードを最小オフセットから読み取ります。 |
max.poll.records | poll メソッドの 1 回の呼び出しで返されるメッセージ数です。 |
key.deserializer | メッセージキーの逆シリアル化方法です(例:org.apache.kafka.common.serialization.StringDeserializer)。 |
value.deserializer | データ値の逆シリアル化方法です(例:org.apache.kafka.common.serialization.StringDeserializer)。 |
ssl.truststore.location | SSL ルート証明書のパスです。 |
ssl.truststore.password | ルート証明書ストアのパスワードです。Alibaba Cloud Kafka を使用する場合、この値を KafkaOnsClient に設定します。 |
security.protocol | アクセスプロトコルです。現在、SASL_SSL プロトコルのみがサポートされています。 |
sasl.mechanism | SASL 認証方式です。Alibaba Cloud Kafka を使用する場合、PLAIN を使用します。 |
java.security.auth.login.config | SASL 認証ファイルのパスです。 |
ライタースクリプトデモ
以下は、Kafka にデータを書き込むための JSON 設定例です。
{
"type":"job",
"version":"2.0",//バージョン番号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"Kafka",//プラグイン名。
"parameter":{
"server": "ip:9092", //Kafka のサーバーアドレス。
"keyIndex": 0, //キーとして使用する列。キャメルケース命名規則に従う必要があります(k は小文字)。
"valueIndex": 1, //値として使用する列。現在、ソースデータから 1 列のみ選択できます。または、このパラメーターを空のままにできます。空の場合は、ソースデータ全体が使用されます。
//たとえば、ODPS テーブルの 2 番目、3 番目、4 番目の列を kafkaValue として使用する場合、元の ODPS テーブルからデータをクリーンアップおよび統合して新しい ODPS テーブルを作成し、その新しいテーブルを同期に使用します。
"keyType": "Integer", //Kafka キーの型。
"valueType": "Short", //Kafka 値の型。
"topic": "t08", //Kafka トピック。
"batchSize": 1024 //Kafka に一度に書き込まれるデータ量(バイト単位)。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//エラーとなるレコード数。
},
"speed":{
"throttle":true,//throttle が false の場合、mbps パラメーターは無効となり、レート制限が行われません。throttle が true の場合、レートが制限されます。
"concurrent":1, //同時ジョブ数。
"mbps":"12"//最大転送レート。1 mbps は 1 MB/s に相当します。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}ライタースクリプトパラメーター
パラメーター | 説明 | 必須 |
datasource | データソースの名称です。コードエディタではデータソースの追加が可能です。このパラメーターの値は、追加したデータソースの名称と一致している必要があります。 | はい |
server | Kafka サーバーのアドレスを ip:port 形式で指定します。 | はい |
topic | Kafka トピックです。Kafka が処理するさまざまなメッセージフィードのカテゴリです。 Kafka クラスターに公開される各メッセージにはカテゴリがあり、これをトピックと呼びます。トピックとは、一連のメッセージの集合です。 | はい |
valueIndex | Kafka ライターで値として使用する列です。指定しない場合、デフォルトですべての列が連結されて値が形成されます。区切り文字は fieldDelimiter で指定します。 | いいえ |
writeMode | valueIndex が設定されていない場合、このパラメーターはソースレコードのすべての列を連結して Kafka レコードの値を形成する際のフォーマットを決定します。有効な値は text および JSON です。デフォルト値は text です。
たとえば、ソースレコードに a、b、c の値を持つ 3 つの列がある場合、writeMode が text に設定され、fieldDelimiter が # に設定されている場合、書き込まれる Kafka レコードの値は文字列 a#b#c になります。writeMode が JSON に設定され、column が [{"name":"col1"},{"name":"col2"},{"name":"col3"}] に設定されている場合、書き込まれる Kafka レコードの値は文字列 {"col1":"a","col2":"b","col3":"c"} になります。 valueIndex が設定されている場合、このパラメーターは無効です。 | いいえ |
column |
データを書き込む先の宛先テーブルのフィールドをカンマで区切って指定します。例: valueIndex が設定されておらず、writeMode が JSON に設定されている場合、このパラメーターはソースレコードの列値の JSON 構造におけるフィールド名を定義します。例:
valueIndex が設定されている場合、または writeMode が text に設定されている場合、このパラメーターは無効です。 |
valueIndex が設定されておらず、writeMode が JSON に設定されている場合に必須です。 |
partition | Kafka トピックのパーティション番号を指定します。これは 0 以上の整数である必要があります。 | いいえ |
keyIndex | Kafka ライターでキーとして使用する列です。 keyIndex パラメーターの値は 0 以上の整数である必要があります。それ以外の場合、タスクは失敗します。 | いいえ |
keyIndexes | ソースレコードの列の序数(0 から始まる)の配列で、Kafka レコードのキーとして使用します。 たとえば、[0,1,2] を指定すると、すべての設定された列番号の値がカンマで連結されて Kafka レコードのキーが形成されます。指定しない場合、Kafka レコードのキーは null になり、データはトピックのパーティションにラウンドロビン方式で書き込まれます。このパラメーターと keyIndex のどちらか一方のみを指定できます。 | いいえ |
fieldDelimiter | writeMode が text に設定され、valueIndex が設定されていない場合、ソースレコードのすべての列がこのパラメーターで指定された列区切り文字を使用して連結され、Kafka レコードの値が形成されます。1 文字または複数文字を区切り文字として設定できます。Unicode 文字を \u0001 の形式で設定できます。エスケープ文字(\t、\n など)もサポートされます。デフォルト値は \t です。 writeMode が text に設定されていない場合、または valueIndex が設定されている場合、このパラメーターは無効です。 | いいえ |
keyType | Kafka キーの型です。有効な値:BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT。 | はい |
valueType | Kafka 値の型です。有効な値:BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT。 | はい |
nullKeyFormat | keyIndex または keyIndexes で指定されたソース列の値が null の場合、このパラメーターで指定された文字列に置き換えられます。設定されていない場合、置き換えは行われません。 | いいえ |
nullValueFormat |
ソース列の値が |
いいえ |
acks | Kafka プロデューサーの初期化時の acks 設定です。これは、正常な書き込みに対する確認応答方法を決定します。デフォルトでは、acks パラメーターは all に設定されています。acks の有効な値は以下のとおりです。
| いいえ |
付録:Kafka への書き込みにおけるメッセージフォーマットの定義
リアルタイム同期タスクを設定して実行すると、ソースデータベースから読み取ったデータが JSON 形式で Kafka トピックに書き込まれます。まず、指定されたソーステーブルの既存データが対応する Kafka トピックに書き込まれます。その後、タスクはリアルタイム同期を開始し、増分データを継続的にトピックに書き込みます。ソーステーブルからの増分 DDL 変更情報も JSON 形式で Kafka トピックに書き込まれます。Kafka に書き込まれたメッセージのステータスおよび変更情報を取得できます。詳細については、「付録:メッセージフォーマット」をご参照ください。
オフライン同期タスクの場合、JSON 構造内の payload.sequenceId、payload.timestamp.eventTime、および payload.timestamp.checkpointTime フィールドは -1 に設定されます。
付録: JSON フィールド型
writeMode が JSON に設定されている場合、column パラメーターの type フィールドを使用して JSON データ型を指定できます。書き込み操作中、システムはソースレコードの列値を指定された型に変換しようとします。この変換が失敗した場合、ダーティデータが生成されます。
有効な値 | 説明 |
JSON_STRING |
ソースレコードの列値を文字列に変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列値が整数 |
JSON_NUMBER |
ソースレコードの列値を数値に変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列値が文字列 |
JSON_BOOL | ソースレコードの列値をブール値に変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列値が文字列 |
JSON_ARRAY |
ソースレコードの列値を JSON 配列に変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列値が文字列 |
JSON_MAP |
ソースレコードの列値を JSON オブジェクトに変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列値が文字列 |
JSON_BASE64 |
ソースレコードの列値のバイナリコンテンツを BASE64 エンコードされた文字列に変換し、JSON フィールドに書き込みます。たとえば、列値が長さ 2 のバイト配列であり、16 進数で |
JSON_HEX |
ソースレコードの列値のバイナリコンテンツを 16 進数文字列に変換し、JSON フィールドに書き込みます。たとえば、列値が長さ 2 のバイト配列であり、16 進数で |