Kafka データソースは、Kafka との間で双方向のデータ読み書きチャネルを提供します。このトピックでは、DataWorks が Kafka に対して提供するデータ同期機能について説明します。
サポートされているバージョン
DataWorks は、Alibaba Cloud Kafka および 0.10.2 から 3.6.x までのバージョンのセルフマネージド Kafka をサポートしています。
0.10.2 より前のバージョンの Kafka では、パーティションオフセットの取得がサポートされておらず、データ構造がタイムスタンプをサポートしていない可能性があるため、データ同期はサポートされていません。
リアルタイム読み取り
サブスクリプションの Serverless リソースグループを使用する場合、リソース不足によるタスクの失敗を防ぐために、必要な仕様を事前に見積もる必要があります。
Topic ごとに 1 CU を見積もります。また、トラフィックに基づいてリソースを見積もる必要があります:
非圧縮の Kafka データの場合、10 MB/s のトラフィックごとに 1 CU を見積もります。
圧縮された Kafka データの場合、10 MB/s のトラフィックごとに 2 CU を見積もります。
JSON 解析が必要な圧縮された Kafka データの場合、10 MB/s のトラフィックごとに 3 CU を見積もります。
サブスクリプションの Serverless リソースグループまたは旧バージョンの Data Integration 専用リソースグループを使用する場合:
ご利用のワークロードのフェールオーバーに対する許容度が高い場合、クラスターのスロット使用率は 80% を超えないようにしてください。
ご利用のワークロードのフェールオーバーに対する許容度が低い場合、クラスターのスロット使用率は 70% を超えないようにしてください。
実際のリソース使用量は、データの内容や形式などの要因によって異なります。最初に見積もった後、実際の実行時の状況に基づいてリソースを調整できます。
制限事項
Kafka データソースは、Serverless リソースグループ (推奨) および 旧バージョンの Data Integration 専用リソースグループをサポートしています。
単一テーブルからのオフライン読み取り
parameter.groupId と parameter.kafkaConfig.group.id の両方が設定されている場合、parameter.groupId が kafkaConfig パラメーター内の group.id よりも優先されます。
単一テーブルへのリアルタイム書き込み
書き込み操作ではデータ重複排除はサポートされていません。オフセットリセットまたはフェールオーバー後にタスクが再起動されると、重複データが書き込まれる可能性があります。
データベース全体のリアルタイム書き込み
リアルタイムデータ同期タスクは、Serverless リソースグループ (推奨) および 旧バージョンの Data Integration 専用リソースグループをサポートしています。
ソーステーブルにプライマリキーがある場合、プライマリキーの値が Kafka レコードのキーとして使用されます。これにより、同じプライマリキーへの変更が同じ Kafka パーティションに順序通りに書き込まれることが保証されます。
ソーステーブルにプライマリキーがない場合、2 つのオプションがあります。プライマリキーなしでテーブルを同期するオプションを選択した場合、Kafka レコードのキーは空になります。テーブルの変更が順序通りに Kafka に書き込まれることを保証するには、宛先の Kafka Topic は単一のパーティションのみを持つ必要があります。カスタムプライマリキーを選択した場合、1 つ以上の非プライマリキーフィールドの組み合わせが Kafka レコードのキーとして使用されます。
Kafka クラスターが例外を返し、同じプライマリキーへの変更が同じ Kafka パーティションに順序通りに書き込まれることを保証する必要がある場合は、Kafka データソースの拡張パラメーターに次の構成を追加します。
{"max.in.flight.requests.per.connection":1,"buffer.memory": 100554432}重要この構成は同期性能を大幅に低下させます。厳密な順序性と信頼性の必要性と性能のバランスを取る必要があります。
リアルタイム同期で Kafka に書き込まれるメッセージの全体的な形式、ハートビートメッセージの形式、およびソースデータの変更に対応するメッセージの形式の詳細については、「付録:メッセージ形式」をご参照ください。
サポートされているフィールドタイプ
Kafka は非構造化データストレージを提供します。Kafka レコードには通常、キー、値、オフセット、タイムスタンプ、ヘッダー、パーティションなどのデータフィールドが含まれます。DataWorks が Kafka からデータを読み書きする際、DataWorks は以下のポリシーに基づいてデータを処理します。
データの読み取り
DataWorks が Kafka からデータを読み取る際、JSON 形式でデータを解析できます。次の表は、各データモジュールがどのように処理されるかを示しています。
Kafka レコードデータモジュール | 処理されるデータの型 |
key | データ同期タスクの keyType 設定項目に依存します。keyType パラメーターの詳細については、付録の完全なパラメーター説明をご参照ください。 |
value | データ同期タスクの valueType 設定項目に依存します。valueType パラメーターの詳細については、付録の完全なパラメーター説明をご参照ください。 |
offset | Long |
timestamp | Long |
headers | String |
partition | Long |
データの書き込み
DataWorks が Kafka にデータを書き込む際、JSON またはテキスト形式でのデータ書き込みをサポートしています。データ処理ポリシーは、データ同期タスクの種類によって異なります。詳細は次の表をご参照ください。
テキスト形式でデータを書き込む場合、フィールド名は含まれません。フィールド値は区切り文字で区切られます。
リアルタイム同期タスクで Kafka にデータを書き込む場合、組み込み JSON 形式が使用されます。書き込まれるデータには、データベース変更メッセージ、ビジネス時間、DDL 情報など、すべての情報が含まれます。データ形式の詳細については、「付録:メッセージ形式」をご参照ください。
同期タスクの種類 | Kafka に書き込まれる value の形式 | ソースフィールドの型 | 書き込み操作の処理方法 |
オフライン同期 DataStudio のオフライン同期ノード | json | String | UTF-8 エンコード文字列 |
Boolean | UTF-8 エンコード文字列 "true" または "false" に変換 | ||
時刻/日付 | yyyy-MM-dd HH:mm:ss 形式の UTF-8 エンコード文字列 | ||
数値 | UTF-8 エンコード数値文字列 | ||
バイトストリーム | バイトストリームは UTF-8 エンコード文字列として扱われ、文字列に変換されます。 | ||
text | String | UTF-8 エンコード文字列 | |
Boolean | UTF-8 エンコード文字列 "true" または "false" に変換 | ||
時刻/日付 | yyyy-MM-dd HH:mm:ss 形式の UTF-8 エンコード文字列 | ||
数値 | UTF-8 エンコード数値文字列 | ||
バイトストリーム | バイトストリームは UTF-8 エンコード文字列として扱われ、文字列に変換されます。 | ||
リアルタイム同期:Kafka へのリアルタイム ETL DataStudio のリアルタイム同期ノード | json | String | UTF-8 エンコード文字列 |
Boolean | JSON ブール型 | ||
時刻/日付 |
| ||
数値 | JSON 数値型 | ||
バイトストリーム | バイトストリームは Base64 エンコードされた後、UTF-8 エンコード文字列に変換されます。 | ||
text | String | UTF-8 エンコード文字列 | |
Boolean | UTF-8 エンコード文字列 "true" または "false" に変換 | ||
時刻/日付 | yyyy-MM-dd HH:mm:ss 形式の UTF-8 エンコード文字列 | ||
数値 | UTF-8 エンコード数値文字列 | ||
バイトストリーム | バイトストリームは Base64 エンコードされた後、UTF-8 エンコード文字列に変換されます。 | ||
リアルタイム同期:データベース全体の Kafka へのリアルタイム同期 増分データのみのリアルタイム同期 | 組み込み JSON 形式 | String | UTF-8 エンコード文字列 |
Boolean | JSON ブール型 | ||
時刻/日付 | 13 桁のミリ秒タイムスタンプ | ||
数値 | JSON 数値 | ||
バイトストリーム | バイトストリームは Base64 エンコードされた後、UTF-8 エンコード文字列に変換されます。 | ||
同期ソリューション:ワンクリックでの Kafka へのリアルタイム同期 フルのオフライン同期 + 増分のリアルタイム同期 | 組み込み JSON 形式 | String | UTF-8 エンコード文字列 |
Boolean | JSON ブール型 | ||
時刻/日付 | 13 桁のミリ秒タイムスタンプ | ||
数値 | JSON 数値 | ||
バイトストリーム | バイトストリームは Base64 エンコードされた後、UTF-8 エンコード文字列に変換されます。 |
データソースの追加
DataWorks で同期タスクを開発する前に、「データソース管理」の手順に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールでパラメーターの説明を表示して、各パラメーターの意味を理解することができます。
データ同期タスクの開発
同期タスクの設定のエントリポイントと手順については、以下の設定ガイドをご参照ください。
単一テーブルのオフライン同期タスクの設定
手順の詳細については、「コードレス UI でのオフライン同期タスクの設定」および「コードエディタでのオフライン同期タスクの設定」をご参照ください。
コードエディタのパラメーターの完全なリストとスクリプトデモについては、「付録:スクリプトデモとパラメーターの説明」をご参照ください。
単一テーブルまたはデータベース全体のリアルタイム同期タスクの設定
手順の詳細については、「DataStudio でのリアルタイム同期タスクの設定」、「Data Integration でのリアルタイム同期タスクの設定」、および「データベース全体の同期タスクの設定」をご参照ください。
認証設定
SSL
Kafka データソースの [特別な認証方式] を [SSL] または [SASL_SSL] に設定すると、Kafka クラスターで SSL 認証が有効になります。クライアントトラストストア証明書ファイルをアップロードし、トラストストアパスフレーズを入力する必要があります。
Kafka クラスターが Alibaba Cloud Kafka インスタンスの場合、「SSL 証明書アルゴリズムのアップグレード手順」をご参照のうえ、正しいトラストストア証明書ファイルをダウンロードしてください。トラストストアパスフレーズは KafkaOnsClient です。
Kafka クラスターが EMR インスタンスの場合、「Kafka 接続での SSL 暗号化の使用」をご参照のうえ、正しいトラストストア証明書ファイルをダウンロードし、トラストストアパスフレーズを取得してください。
セルフマネージドクラスターの場合、正しいトラストストア証明書をアップロードし、正しいトラストストアパスフレーズを入力する必要があります。
キーストア証明書ファイル、キーストアパスフレーズ、および SSL パスフレーズは、Kafka クラスターで双方向 SSL 認証が有効になっている場合にのみ必要です。Kafka クラスターサーバーはこれを使用してクライアントの ID を認証します。双方向 SSL 認証は、Kafka クラスターの server.properties ファイルで ssl.client.auth=required が設定されている場合に有効になります。詳細については、「Kafka 接続での SSL 暗号化の使用」をご参照ください。
GSSAPI
Kafka データソースを設定する際に [Sasl メカニズム] を GSSAPI に設定した場合、[JAAS 設定ファイル]、[Kerberos 設定ファイル]、[Keytab ファイル] の 3 つの認証ファイルをアップロードする必要があります。また、専用リソースグループの [DNS/HOST] 設定も構成する必要があります。以下のセクションでは、これらのファイルと必要な DNS および HOST 設定について説明します。
Serverless リソースグループの場合、内部 DNS 解決を使用してホストアドレス情報を設定する必要があります。詳細については、「内部 DNS 解決 (PrivateZone)」をご参照ください。
JAAS 設定ファイル
JAAS ファイルは KafkaClient で始まり、その後にすべての中括弧 {} で囲まれた設定項目が続きます:
中括弧内の最初の行は、使用するログオンコンポーネントクラスを定義します。異なる SASL 認証メカニズムに対して、ログオンコンポーネントクラスは固定されています。後続の各設定項目は key=value 形式で記述されます。
最後の設定項目を除くすべての設定項目は、セミコロンで終わってはいけません。
最後の設定項目はセミコロンで終わる必要があります。また、閉じ中括弧
}の後にもセミコロンを追加する必要があります。
形式要件が満たされていない場合、JAAS 設定ファイルは解析できません。次のコードは、典型的な JAAS 設定ファイルの形式を示しています。xxx プレースホルダーを実際の情報に置き換えてください。
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="xxx" storeKey=true serviceName="kafka-server" principal="kafka-client@EXAMPLE.COM"; };設定項目
説明
ログオンモジュール
com.sun.security.auth.module.Krb5LoginModule を設定する必要があります。
useKeyTab
true に設定する必要があります。
keyTab
任意のパスを指定できます。同期タスクの実行時に、データソース設定時にアップロードされた keytab ファイルが自動的にローカルパスにダウンロードされます。このローカルファイルパスが、keytab 設定項目に設定されます。
storeKey
クライアントがキーを保存するかどうかを指定します。true または false に設定できます。データ同期には影響しません。
serviceName
Kafka サーバーの server.properties 設定ファイル内の sasl.kerberos.service.name 設定項目に対応します。必要に応じてこの項目を設定してください。
principal
Kafka クライアントが使用する Kerberos プリンシパル。必要に応じてこの項目を設定し、アップロードされた keytab ファイルにこのプリンシパルのキーが含まれていることを確認してください。
Kerberos 設定ファイル
Kerberos 設定ファイルには、[libdefaults] と [realms] の 2 つのモジュールが含まれている必要があります。
[libdefaults] モジュールは Kerberos 認証パラメーターを指定します。モジュール内の各設定項目は key=value 形式で記述されます。
[realms] モジュールは KDC アドレスを指定します。複数のレルムサブモジュールを含むことができます。各レルムサブモジュールはレルム名=で始まります。
その後に、中括弧で囲まれた設定項目のセットが続きます。各設定項目も key=value 形式で記述されます。次のコードは、典型的な Kerberos 設定ファイルの形式を示しています。xxx プレースホルダーを実際の情報に置き換えてください。
[libdefaults] default_realm = xxx [realms] xxx = { kdc = xxx }設定項目
説明
[libdefaults].default_realm
Kafka クラスターノードにアクセスする際に使用されるデフォルトのレルム。これは通常、JAAS 設定ファイルで指定されたクライアントプリンシパルのレルムと同じです。
その他の [libdefaults] パラメーター
[libdefaults] モジュールは、ticket_lifetime などの他の Kerberos 認証パラメーターを指定できます。必要に応じて設定してください。
[realms].realm name
JAAS 設定ファイルで指定されたクライアントプリンシパルのレルムおよび [libdefaults].default_realm と同じである必要があります。JAAS 設定ファイルのクライアントプリンシパルのレルムが [libdefaults].default_realm と異なる場合は、2 つのレルムサブモジュールを含める必要があります。これらのサブモジュールは、それぞれ JAAS 設定ファイルのクライアントプリンシパルのレルムと [libdefaults].default_realm に対応する必要があります。
[realms].realm name.kdc
ip:port 形式で KDC のアドレスとポートを指定します。例:kdc=10.0.0.1:88。ポートを省略した場合、デフォルトのポート 88 が使用されます。例:kdc=10.0.0.1。
Keytab ファイル
Keytab ファイルには、JAAS 設定ファイルで指定されたプリンシパルのキーが含まれている必要があり、KDC によって検証可能である必要があります。たとえば、現在の作業ディレクトリに client.keytab という名前のファイルがある場合、次のコマンドを実行して、Keytab ファイルに指定されたプリンシパルのキーが含まれているかどうかを確認できます。
klist -ket ./client.keytab Keytab name: FILE:client.keytab KVNO Timestamp Principal ---- ------------------- ------------------------------------------------------ 7 2018-07-30T10:19:16 te**@**.com (des-cbc-md5)専用リソースグループの DNS および HOST 設定
Kerberos 認証が有効になっている Kafka クラスターでは、クラスター内のノードのホスト名が、そのノードのためにキー配布センター (KDC) に登録されたプリンシパルの一部として使用されます。クライアントが Kafka クラスター内のノードにアクセスすると、ローカルの DNS および HOST 設定に基づいてノードのプリンシパルを推測し、KDC からノードのアクセス認証情報を取得します。Kerberos 認証が有効になっている Kafka クラスターに専用リソースグループを使用してアクセスする場合、KDC からクラスターノードのアクセス認証情報を取得できるように、DNS および HOST 設定を正しく構成する必要があります:
DNS 設定
専用リソースグループがアタッチされている VPC 内で、Kafka クラスターノードの名前解決に PrivateZone インスタンスが使用されている場合、DataWorks コンソールの専用リソースグループの VPC アタッチメントに、IP アドレス 100.100.2.136 および 100.100.2.138 のカスタムルートを追加できます。これにより、Kafka クラスターノードの PrivateZone 名前解決設定が専用リソースグループに適用されることが保証されます。

HOST 設定
専用リソースグループがアタッチされている VPC 内で、Kafka クラスターノードの名前解決に PrivateZone インスタンスが使用されていない場合、DataWorks コンソールの専用リソースグループのネットワーク設定で、各 Kafka クラスターノードの IP アドレスとドメイン名のマッピングをホスト設定に追加する必要があります。

PLAIN
Kafka データソースを設定する際に [Sasl メカニズム] を PLAIN に設定した場合、JAAS ファイルは KafkaClient で始まり、その後にすべての中括弧 {} で囲まれた設定項目が続きます。
中括弧内の最初の行は、使用するログオンコンポーネントクラスを定義します。異なる SASL 認証メカニズムに対して、ログオンコンポーネントクラスは固定されています。後続の各設定項目は key=value 形式で記述されます。
最後の設定項目を除くすべての設定項目は、セミコロンで終わってはいけません。
最後の設定項目はセミコロンで終わる必要があります。また、閉じ中括弧 "}" の後にもセミコロンを追加する必要があります。
形式要件が満たされていない場合、JAAS 設定ファイルは解析できません。次のコードは、典型的な JAAS 設定ファイルの形式を示しています。xxx プレースホルダーを実際の情報に置き換えてください。
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="xxx"
password="xxx";
};設定項目 | 説明 |
ログオンモジュール | org.apache.kafka.common.security.plain.PlainLoginModule を設定する必要があります。 |
username | ユーザー名。必要に応じてこの項目を設定してください。 |
password | パスワード。必要に応じてこの項目を設定してください。 |
よくある質問
付録:スクリプトデモとパラメーターの説明
コードエディタを使用したバッチ同期タスクの設定
コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを設定する必要があります。詳細については、「コードエディタでのタスクの設定」をご参照ください。以下の情報は、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明しています。
Reader スクリプトデモ
次のコードは、Kafka からデータを読み取るための JSON 設定を示しています。
{
"type": "job",
"steps": [
{
"stepType": "kafka",
"parameter": {
"server": "host:9093",
"column": [
"__key__",
"__value__",
"__partition__",
"__offset__",
"__timestamp__",
"'123'",
"event_id",
"tag.desc"
],
"kafkaConfig": {
"group.id": "demo_test"
},
"topic": "topicName",
"keyType": "ByteArray",
"valueType": "ByteArray",
"beginDateTime": "20190416000000",
"endDateTime": "20190416000006",
"skipExceedRecord": "true"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "stream",
"parameter": {
"print": false,
"fieldDelimiter": ","
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle": true,//throttle が false の場合、mbps パラメーターは効果がなく、レートは制限されません。throttle が true の場合、レートは制限されます。
"concurrent": 1,//同時実行スレッド数。
"mbps":"12"//最大転送レート。1 mbps は 1 MB/s に相当します。
}
}
}Reader スクリプトパラメーター
パラメーター | 説明 | 必須 |
datasource | データソース名。コードエディタはデータソースの追加をサポートしています。このパラメーターの値は、追加されたデータソースの名前と同じである必要があります。 | はい |
server | ip:port 形式の Kafka ブローカーサーバーのアドレス。 server は 1 つしか設定できませんが、DataWorks が Kafka クラスター内のすべてのブローカーの IP アドレスに接続できることを確認する必要があります。 | はい |
topic | Kafka Topic。Topic は、Kafka が処理するメッセージフィードの集約です。 | はい |
column | 読み取る Kafka データ。定数列、データ列、属性列がサポートされています。
| はい |
keyType | Kafka キーの型。有効な値:BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT。 | いいえ |
valueType | Kafka 値の型。有効な値:BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT。 | いいえ |
beginDateTime | データ消費の開始オフセット。これは時間範囲の左境界 (含む) です。yyyymmddhhmmss 形式の時間文字列です。[スケジューリングパラメーター] と併用できます。詳細については、「サポートされているスケジューリングパラメーターの形式」をご参照ください。 説明 この機能は Kafka 0.10.2 以降でサポートされています。 | このパラメーターまたは beginOffset のいずれかを指定する必要があります。 説明 beginDateTime と endDateTime は一緒に使用されます。 |
endDateTime | データ消費の終了オフセット。これは時間範囲の右境界 (含まない) です。yyyymmddhhmmss 形式の時間文字列です。[スケジューリングパラメーター] と併用できます。詳細については、「サポートされているスケジューリングパラメーターの形式」をご参照ください。 説明 この機能は Kafka 0.10.2 以降でサポートされています。 | このパラメーターまたは endOffset のいずれかを指定する必要があります。 説明 endDateTime と beginDateTime は一緒に使用されます。 |
beginOffset | データ消費の開始オフセット。次の形式で設定できます:
| このパラメーターまたは beginDateTime のいずれかを指定する必要があります。 |
endOffset | データ消費の終了オフセット。これは、データ消費タスクが終了するタイミングを制御するために使用されます。 | このパラメーターまたは endDateTime のいずれかを指定する必要があります。 |
skipExceedRecord | Kafka は
| いいえ。デフォルト値は false です。 |
partition | Kafka Topic には複数のパーティション (partition) があります。デフォルトでは、データ同期タスクは Topic 内のすべてのパーティションをカバーするオフセット範囲からデータを読み取ります。partition を指定して、単一のパーティションのオフセット範囲からのみデータを読み取ることもできます。 | いいえ。デフォルト値はありません。 |
kafkaConfig | データ消費のために KafkaConsumer クライアントを作成する際に、bootstrap.servers、auto.commit.interval.ms、session.timeout.ms などの拡張パラメーターを指定できます。kafkaConfig を使用して、KafkaConsumer の消費動作を制御できます。 | いいえ |
encoding | keyType または valueType が STRING に設定されている場合、このパラメーターで指定されたエンコーディングが文字列の解析に使用されます。 | いいえ。デフォルト値は UTF-8 です。 |
waitTIme | コンシューマーオブジェクトが 1 回の試行で Kafka からデータをプルするのを待つ最大時間 (秒単位)。 | いいえ。デフォルト値は 60 です。 |
stopWhenPollEmpty | 有効な値は true と false です。このパラメーターが true に設定され、コンシューマーが Kafka から空のデータをプルした場合 (通常は Topic 内のすべてのデータが読み取られたか、ネットワークまたは Kafka クラスターの可用性の問題が原因)、タスクはすぐに停止します。それ以外の場合は、データが再び読み取られるまで再試行します。 | いいえ。デフォルト値は true です。 |
stopWhenReachEndOffset | このパラメーターは、stopWhenPollEmpty が true の場合にのみ有効です。有効な値は true と false です。
| いいえ。デフォルト値は false です。 説明 これは過去のロジックとの互換性のためです。V0.10.2 より前の Kafka バージョンでは、Kafka Topic のすべてのパーティションから最新データが読み取られたかどうかをチェックできません。ただし、一部のコードエディタタスクは、V0.10.2 より前の Kafka バージョンからデータを読み取っている可能性があります。 |
次の表は、kafkaConfig パラメーターについて説明しています。
パラメーター | 説明 |
fetch.min.bytes | コンシューマーがブローカーから取得できるメッセージの最小バイト数を指定します。データは、十分なデータがある場合にのみコンシューマーに返されます。 |
fetch.max.wait.ms | ブローカーがデータを返すのを待つ最大時間。デフォルト値は 500 ミリ秒です。データは、fetch.min.bytes または fetch.max.wait.ms のいずれかの条件が先に満たされた時点で返されます。 |
max.partition.fetch.bytes | ブローカーが各 partition からコンシューマーに返すことができる最大バイト数を指定します。デフォルト値は 1 MB です。 |
session.timeout.ms | コンシューマーがサービスを受けられなくなる前にサーバーから切断できる時間を指定します。デフォルト値は 30 秒です。 |
auto.offset.reset | オフセットがないか、無効なオフセット (コンシューマーが長時間非アクティブで、オフセットを持つレコードが期限切れで削除されたため) で読み取る場合にコンシューマーが実行するアクション。デフォルト値は none で、オフセットは自動的にリセットされないことを意味します。earliest に変更でき、これはコンシューマーが最も古いオフセットから partition レコードを読み取ることを意味します。 |
max.poll.records | poll メソッドの 1 回の呼び出しで返すことができるメッセージの数。 |
key.deserializer | メッセージキーの逆シリアル化メソッド。例:org.apache.kafka.common.serialization.StringDeserializer。 |
value.deserializer | データ値の逆シリアル化メソッド。例:org.apache.kafka.common.serialization.StringDeserializer。 |
ssl.truststore.location | SSL ルート証明書のパス。 |
ssl.truststore.password | ルート証明書ストアのパスワード。Alibaba Cloud Kafka を使用している場合は、これを KafkaOnsClient に設定します。 |
security.protocol | アクセスプロトコル。現在、SASL_SSL プロトコルのみがサポートされています。 |
sasl.mechanism | SASL 認証方式。Alibaba Cloud Kafka を使用している場合は、PLAIN を使用します。 |
java.security.auth.login.config | SASL 認証ファイルのパス。 |
Writer スクリプトデモ
次のコードは、Kafka にデータを書き込むための JSON 設定を示しています。
{
"type":"job",
"version":"2.0",//バージョン番号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"Kafka",//プラグイン名。
"parameter":{
"server": "ip:9092", //Kafka のサーバーアドレス。
"keyIndex": 0, //キーとして使用する列。k を小文字にしたキャメルケース命名規則に従う必要があります。
"valueIndex": 1, //値として使用する列。現在、ソースデータから 1 つの列を選択するか、このパラメーターを空にすることができます。空の場合、すべてのソースデータが使用されます。
//たとえば、ODPS テーブルの 2、3、4 番目の列を kafkaValue として使用するには、新しい ODPS テーブルを作成し、元の ODPS テーブルのデータをクリーンアップして新しいテーブルに統合し、その新しいテーブルを同期に使用します。
"keyType": "Integer", //Kafka キーの型。
"valueType": "Short", //Kafka 値の型。
"topic": "t08", //Kafka Topic。
"batchSize": 1024 //一度に Kafka に書き込まれるデータ量 (バイト単位)。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//エラーレコード数。
},
"speed":{
"throttle":true,//throttle が false の場合、mbps パラメーターは効果がなく、レートは制限されません。throttle が true の場合、レートは制限されます。
"concurrent":1, //同時実行ジョブ数。
"mbps":"12"//最大転送レート。1 mbps は 1 MB/s に相当します。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}Writer スクリプトパラメーター
パラメーター | 説明 | 必須 |
datasource | データソース名。コードエディタはデータソースの追加をサポートしています。このパラメーターの値は、追加されたデータソースの名前と同じである必要があります。 | はい |
server | ip:port 形式の Kafka サーバーのアドレス。 | はい |
topic | Kafka Topic。Kafka が処理するさまざまなメッセージフィードのカテゴリです。 Kafka クラスターに公開される各メッセージにはカテゴリがあり、これは Topic と呼ばれます。Topic はメッセージのグループのコレクションです。 | はい |
valueIndex | Kafka ライターで値として使用される列。指定しない場合、デフォルトですべての列が連結されて値を形成します。区切り文字は fieldDelimiter で指定されます。 | いいえ |
writeMode | valueIndex が設定されていない場合、このパラメーターは、ソースレコードのすべての列を連結して Kafka レコードの値を形成する形式を決定します。有効な値は text と JSON です。デフォルト値は text です。
たとえば、ソースレコードに a、b、c の値を持つ 3 つの列があり、writeMode が text に、fieldDelimiter が # に設定されている場合、書き込まれる Kafka レコードの値は文字列 a#b#c になります。writeMode が JSON に設定され、column が [{"name":"col1"},{"name":"col2"},{"name":"col3"}] に設定されている場合、書き込まれる Kafka レコードの値は文字列 {"col1":"a","col2":"b","col3":"c"} になります。 valueIndex が設定されている場合、このパラメーターは無効です。 | いいえ |
column | データが書き込まれる宛先テーブルのフィールドをコンマで区切って指定します。例: valueIndex が設定されておらず、writeMode が JSON に設定されている場合、このパラメーターはソースレコードの列の値の JSON 構造におけるフィールド名を定義します。例:
valueIndex が設定されている場合、または writeMode が text に設定されている場合、このパラメーターは無効です。 | valueIndex が設定されておらず、writeMode が JSON に設定されている場合に必須です。 |
partition | データが書き込まれる Kafka Topic 内のパーティションの番号を指定します。これは 0 以上の整数でなければなりません。 | いいえ |
keyIndex | Kafka ライターでキーとして使用される列。 keyIndex パラメーターの値は 0 以上の整数でなければなりません。そうでない場合、タスクは失敗します。 | いいえ |
keyIndexes | Kafka レコードのキーとして使用されるソースレコード内の列の序数の配列。 列の序数は 0 から始まります。たとえば、[0,1,2] は、設定されたすべての列番号の値をコンマで連結して Kafka レコードのキーを形成します。これが指定されていない場合、Kafka レコードのキーは null になり、データは Topic のパーティションにラウンドロビン方式で書き込まれます。このパラメーターまたは keyIndex のいずれか一方のみを指定できます。 | いいえ |
fieldDelimiter | writeMode が text に設定され、valueIndex が設定されていない場合、ソースレコードのすべての列は、このパラメーターで指定された列区切り文字を使用して連結され、Kafka レコードの値を形成します。区切り文字として単一の文字または複数の文字を設定できます。Unicode 文字は \u0001 の形式で設定できます。\t や \n などのエスケープ文字がサポートされています。デフォルト値は \t です。 writeMode が text に設定されていない場合、または valueIndex が設定されている場合、このパラメーターは無効です。 | いいえ |
keyType | Kafka キーの型。有効な値:BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT。 | はい |
valueType | Kafka 値の型。有効な値:BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT。 | はい |
nullKeyFormat | keyIndex または keyIndexes で指定されたソース列の値が null の場合、このパラメーターで指定された文字列に置き換えられます。これが設定されていない場合、置き換えは行われません。 | いいえ |
nullValueFormat | ソース列の値が null の場合、Kafka レコードの値を組み立てる際に、このパラメーターで指定された文字列に置き換えられます。これが設定されていない場合、置き換えは行われません。 | いいえ |
acks | Kafka プロデューサーを初期化する際の acks 設定。これにより、書き込み成功の確認応答方式が決まります。デフォルトでは、acks パラメーターは all に設定されます。acks の有効な値は次のとおりです:
| いいえ |
付録:Kafka への書き込みメッセージ形式の定義
リアルタイム同期タスクを設定して実行すると、ソースデータベースから読み取られたデータが JSON 形式で Kafka Topic に書き込まれます。まず、指定されたソーステーブル内のすべての既存データが対応する Kafka Topic に書き込まれます。その後、タスクはリアルタイム同期を開始し、増分データを継続的に Topic に書き込みます。ソーステーブルからの増分 DDL 変更情報も JSON 形式で Kafka Topic に書き込まれます。Kafka に書き込まれたメッセージのステータスと変更情報を取得できます。詳細については、「付録:メッセージ形式」をご参照ください。
オフライン同期タスクによって Kafka に書き込まれるデータの JSON 構造では、payload.sequenceId、payload.timestamp.eventTime および payload.timestamp.checkpointTime フィールドは -1 に設定されます。
付録:JSON フィールドタイプ
[writeMode] が JSON に設定されている場合、column パラメーターの type フィールドを使用して JSON データ型を指定できます。書き込み操作中に、システムはソースレコードの列の値を指定された型に変換しようとします。この変換が失敗すると、ダーティデータが生成されます。
有効な値 | 説明 |
JSON_STRING | ソースレコードの列の値を文字列に変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列の値が整数 |
JSON_NUMBER | ソースレコードの列の値を数値に変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列の値が文字列 |
JSON_BOOL | ソースレコードの列の値をブール値に変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列の値が文字列 |
JSON_ARRAY | ソースレコードの列の値を JSON 配列に変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列の値が文字列 |
JSON_MAP | ソースレコードの列の値を JSON オブジェクトに変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列の値が文字列 |
JSON_BASE64 | ソースレコードの列の値のバイナリバイトコンテンツを BASE64 エンコード文字列に変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列の値が長さ 2 のバイト配列で、16 進数で |
JSON_HEX | ソースレコードの列の値のバイナリバイトコンテンツを 16 進数文字列に変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列の値が長さ 2 のバイト配列で、16 進数で |