すべてのプロダクト
Search
ドキュメントセンター

DataWorks:Kafka データソース

最終更新日:Apr 02, 2026

Kafka データソースは、Kafka からのデータ読み取りおよび Kafka へのデータ書き込みのための双方向チャネルを提供します。本トピックでは、DataWorks が Kafka に対して提供するデータ同期機能について説明します。

サポートされるバージョン

DataWorks は、Alibaba Cloud Kafka および 0.10.2 ~ 3.6.x のセルフマネージド Kafka をサポートしています。

説明

Kafka のバージョンが 0.10.2 より前の場合、データ同期はサポートされません。これは、これらのバージョンではパーティションオフセットの取得がサポートされておらず、またデータ構造がタイムスタンプをサポートしていないためです。

リアルタイム読み取り

  • サブスクリプション Serverless リソースグループ を使用する場合、タスク失敗を防ぐため、事前に必要な仕様を推定する必要があります。

    トピックごとに 1 CU を推定します。さらに、スループットに基づいてリソースを推定します。

    • 非圧縮 Kafka データの場合:トラフィック 10 MB/s あたり 1 CU を推定します。

    • 圧縮 Kafka データの場合:トラフィック 10 MB/s あたり 2 CU を推定します。

    • JSON 解析を要する圧縮 Kafka データの場合:トラフィック 10 MB/s あたり 3 CU を推定します。

  • サブスクリプション Serverless リソースグループまたは旧バージョンのデータ統合専用リソースグループを使用する場合:

    • フェールオーバーに対するワークロードの許容度が高い場合は、クラスタースロット使用率が 80 % を超えないようにしてください。

    • フェールオーバーに対するワークロードの許容度が低い場合は、クラスタースロット使用率が 70 % を超えないようにしてください。

説明

実際のリソース使用量は、データの内容や形式などの要因によって異なります。初期評価後に実際の使用状況に応じてリソースを調整できます。

制限事項

Kafka データソースは、Serverless リソースグループ(推奨) および データ統合専用リソースグループの旧バージョン をサポートしています。

単一テーブルからのオフライン読み取り

parameter.groupId および parameter.kafkaConfig.group.id の両方が設定されている場合、parameter.groupIdgroup.id よりも優先されます(kafkaConfig パラメーター内)。

単一テーブルへのリアルタイム書き込み

書き込み操作では重複排除はサポートされません。オフセットリセットまたはフェールオーバー後にタスクが再開された場合、重複したデータが書き込まれる可能性があります。

データベース全体へのリアルタイム書き込み

  • リアルタイムデータ同期タスクは、Serverless リソースグループ(推奨) および データ統合専用リソースグループの旧バージョン をサポートしています。

  • ソーステーブルにプライマリキーがある場合、そのプライマリキー値が Kafka レコードのキーとして使用されます。これにより、同一のプライマリキーに対する変更が順序通りに同一 Kafka パーティションに書き込まれることを保証します。

  • ソーステーブルにプライマリキーがない場合、以下の 2 つの選択肢があります。プライマリキーなしのテーブル同期を選択した場合、Kafka レコードのキーは空になります。テーブル変更を Kafka に順序通りに書き込むには、送信先 Kafka トピックのパーティション数を 1 にする必要があります。カスタムプライマリキーを選択した場合、1 つ以上の非プライマリキー列の組み合わせが Kafka レコードのキーとして使用されます。

  • 以下の設定を Kafka データソースの拡張パラメーターに追加します。これにより、クラスターで例外が発生した場合でも、同一プライマリキーを持つレコードの変更が順序通りに同一パーティションに書き込まれることを保証します。

    {"max.in.flight.requests.per.connection":1,"buffer.memory": 100554432}

    重要

    この設定により、同期性能が大幅に低下します。厳密な順序性および信頼性の要件と性能のバランスを取ってください。

  • Kafka へのリアルタイム同期におけるメッセージ全体のフォーマット、ハートビートメッセージのフォーマット、およびソースデータ変更に対応するメッセージのフォーマットについては、「付録:メッセージフォーマット」をご参照ください。

サポートされるフィールド型

Kafka は非構造化データストレージを提供します。Kafka レコードには通常、以下のフィールドが含まれます:key(キー)、value(値)、offset(オフセット)、timestamp(タイムスタンプ)、headers(ヘッダー)、partition(パーティション)。DataWorks が Kafka からデータを読み取る場合および Kafka にデータを書き込む場合の処理方法を以下に示します。

データ読み取り

DataWorks が Kafka からデータを読み取る場合、JSON 形式のデータを解析できます。以下の表に、各データモジュールの処理方法を示します。

Kafka レコードのデータモジュール

処理後のデータ型

key

keyType 設定項目に依存します(データ同期タスク内)。keyType パラメーターの詳細については、付録の完全なパラメーター説明をご参照ください。

value

valueType 設定項目に依存します(データ同期タスク内)。valueType パラメーターの詳細については、付録の完全なパラメーター説明をご参照ください。

offset

Long

timestamp

Long

headers

String

partition

Long

データ書き込み

DataWorks が Kafka にデータを書き込む場合、JSON 形式またはテキスト形式での書き込みをサポートします。処理ポリシーは、データ同期タスクの種類によって異なり、以下の表に詳細を示します。

重要
  • テキスト形式でデータを書き込む場合、フィールド名は含まれません。フィールド値は区切り文字で区切られます。

  • リアルタイム同期タスクの場合、DataWorks はデータベース変更メッセージ、業務日時、およびデータ定義言語(DDL)情報を含む組み込み JSON 形式を使用します。メッセージフォーマットの詳細については、「付録:メッセージフォーマット」をご参照ください。

同期タスクの種類

Kafka に書き込まれる value のフォーマット

ソースフィールド型

書き込み操作の処理方法

オフライン同期

DataStudio のオフライン同期ノード

json

String

UTF-8 エンコード済み文字列

Boolean

"true" または "false" の UTF-8 エンコード済み文字列に変換されます

Time/Date

yyyy-MM-dd HH:mm:ss 形式の UTF-8 エンコード済み文字列

Numeric

UTF-8 エンコード済み数値文字列

Byte stream

バイトストリームは UTF-8 エンコード済み文字列として扱われ、文字列に変換されます。

text

String

UTF-8 エンコード済み文字列

Boolean

"true" または "false" の UTF-8 エンコード済み文字列に変換されます

Time/Date

yyyy-MM-dd HH:mm:ss 形式の UTF-8 エンコード済み文字列

Numeric

UTF-8 エンコード済み数値文字列

Byte stream

バイトストリームは UTF-8 エンコード済み文字列として扱われ、文字列に変換されます。

リアルタイム同期:Kafka へのリアルタイム ETL

DataStudio のリアルタイム同期ノード

json

String

UTF-8 エンコード済み文字列

Boolean

JSON ブール型

Time/Date

  • ミリ秒未満の精度を持つ時刻値の場合:ミリ秒単位のタイムスタンプを表す 13 桁の JSON 整数に変換されます。

  • マイクロ秒またはナノ秒精度を持つ時刻値の場合:ミリ秒タイムスタンプを表す 13 桁の整数と、ナノ秒タイムスタンプを表す 6 桁の小数部を含む JSON 浮動小数点数に変換されます。

Numeric

JSON 数値型

Byte stream

バイトストリームは Base64 エンコードされ、その後 UTF-8 エンコード済み文字列に変換されます。

text

String

UTF-8 エンコード済み文字列

Boolean

"true" または "false" の UTF-8 エンコード済み文字列に変換されます

Time/Date

yyyy-MM-dd HH:mm:ss 形式の UTF-8 エンコード済み文字列

Numeric

UTF-8 エンコード済み数値文字列

Byte stream

バイトストリームは Base64 エンコードされ、その後 UTF-8 エンコード済み文字列に変換されます。

リアルタイム同期: データベース全体の Kafka へのリアルタイム同期

増分データのみのリアルタイム同期

組み込み JSON 形式

String

UTF-8 エンコード済み文字列

Boolean

JSON ブール型

Time/Date

13 桁のミリ秒タイムスタンプ

Numeric

JSON 数値

Byte stream

バイトストリームは Base64 エンコードされ、その後 UTF-8 エンコード済み文字列に変換されます。

同期ソリューション:Kafka へのワンクリックリアルタイム同期

フルオフライン同期 + 増分リアルタイム同期

組み込み JSON 形式

String

UTF-8 エンコード済み文字列

Boolean

JSON ブール型

Time/Date

13 桁のミリ秒タイムスタンプ

Numeric

JSON 数値

Byte stream

バイトストリームは Base64 エンコードされ、その後 UTF-8 エンコード済み文字列に変換されます。

データソースの追加

DataWorks で同期タスクを開発する前に、データソース管理 の手順に従い、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールで パラメーターの説明を表示することで、各パラメーターの意味を確認できます

データ同期タスクの開発

同期タスクの設定入口および設定手順については、以下の設定ガイドをご参照ください。

単一テーブル向けオフライン同期タスクの設定

単一テーブルまたはデータベース全体向けリアルタイム同期タスクの設定

手順については、「単一テーブル向けリアルタイム同期タスクの設定」および「データベース全体向けリアルタイム同期タスクの設定」をご参照ください。

認証設定

SSL

Kafka データソースの 特別な認証方式SSL または SASL_SSL に設定すると、Kafka クラスターで SSL 認証が有効になります。クライアントの truststore 証明書ファイルをアップロードし、truststore パスフレーズを入力する必要があります。

  • Kafka クラスターが Alibaba Cloud Kafka インスタンスである場合、「SSL 証明書アルゴリズムのスペックアップ手順」に従って、正しい truststore 証明書ファイルをダウンロードしてください。truststore パスフレーズは KafkaOnsClient です。

  • Kafka クラスターが EMR インスタンスである場合、「Kafka 接続への SSL 暗号化の適用」に従って、正しい truststore 証明書ファイルをダウンロードし、truststore パスフレーズを取得してください。

  • セルフマネージドクラスターの場合は、正しい truststore 証明書をアップロードし、正しい truststore パスフレーズを入力する必要があります。

keystore 証明書ファイル、keystore パスフレーズ、および SSL パスフレーズは、Kafka クラスターで双方向 SSL 認証が有効な場合のみ必要です。Kafka クラスターのサーバーは、クライアントの ID を認証するためにこれらを使用します。双方向 SSL 認証は、Kafka クラスターの server.properties ファイル内で ssl.client.auth=required が設定されている場合に有効になります。詳細については、「Kafka 接続への SSL 暗号化の適用」をご参照ください。

GSSAPI

Kafka データソースの設定時に Sasl Mechanism を GSSAPI に設定した場合、以下の 3 つの認証ファイルをアップロードする必要があります: JAAS 設定ファイルKerberos 設定ファイル、および Keytab ファイル。また、データ統合専用リソースグループの DNS/HOST 設定も行う必要があります。以下に、これらのファイルおよび DNS/HOST 設定の詳細を説明します。

説明

Serverless リソースグループ の場合、内部 DNS 名前解決を使用してホストアドレス情報を設定する必要があります。詳細については、「内部 DNS 名前解決(PrivateZone)」をご参照ください。

  • JAAS 設定ファイル

    JAAS ファイルは KafkaClient で始まり、その後にすべての設定項目を波括弧 {} で囲む必要があります。

    • 波括弧内の最初の行は、使用するログインコンポーネントクラスを定義します。異なる SASL 認証方式では、ログインコンポーネントクラスは固定されています。以降の各設定項目は、key=value 形式で記述します。

    • 最後の設定項目以外は、セミコロンで終了してはいけません。

    • 最後の設定項目はセミコロンで終了する必要があります。また、閉じ波括弧 } の後にもセミコロンを追加する必要があります。

    上記の形式要件を満たさない場合、JAAS 設定ファイルは解析できません。以下のコードは、典型的な JAAS 設定ファイルの形式例です。xxx のプレースホルダーは、実際の情報に置き換えてください。

    KafkaClient {
       com.sun.security.auth.module.Krb5LoginModule required
       useKeyTab=true
       keyTab="xxx"
       storeKey=true
       serviceName="kafka-server"
       principal="kafka-client@EXAMPLE.COM";
    };

    設定項目

    説明

    ログインモジュール

    com.sun.security.auth.module.Krb5LoginModule に設定する必要があります。

    useKeyTab

    true に設定する必要があります。

    keyTab

    任意のパスを指定できます。同期タスク実行時に、データソース設定中にアップロードした keytab ファイルが自動的にローカルパスにダウンロードされます。システムは、このローカルパスを keyTab 設定項目の値として使用します。

    storeKey

    クライアントがキーを保存するかどうかを指定します。true または false を設定できます。データ同期には影響しません。

    serviceName

    Kafka サーバーの server.properties 設定ファイル内の sasl.kerberos.service.name 設定項目に対応します。必要に応じて設定してください。

    principal

    Kafka クライアントで使用される Kerberos プリンシパルです。アップロードした keytab ファイルに、このプリンシパルのキーが含まれていることを確認してください。

  • Kerberos 設定ファイル

    Kerberos 設定ファイルには、[libdefaults] および [realms] の 2 つのモジュールが含まれている必要があります。

    • [libdefaults] モジュールでは、Kerberos 認証パラメーターを指定します。モジュール内の各設定項目は、key=value 形式で記述します。

    • [realms] セクションでは、KDC アドレスを指定します。複数の realm 定義を含めることができます。各 realm 定義は realm name= で始まります。

    その後、波括弧で囲まれた一連の設定項目が続きます。各設定項目も key=value 形式で記述します。以下のコードは、典型的な Kerberos 設定ファイルの形式例です。xxx のプレースホルダーは、実際の情報に置き換えてください。

    [libdefaults]
      default_realm = xxx
    
    [realms]
      xxx = {
        kdc = xxx
      }

    設定項目

    説明

    [libdefaults].default_realm

    Kafka クラスターノードへのアクセス時に使用されるデフォルト realm です。通常、JAAS 設定ファイルで指定されたクライアントプリンシパルの realm と同じです。

    その他の [libdefaults] パラメーター

    [libdefaults] モジュールでは、ticket_lifetime などのその他の Kerberos 認証パラメーターを指定できます。必要に応じて設定してください。

    [realms].realm name

    JAAS 設定ファイルで指定されたクライアントプリンシパルの realm および [libdefaults].default_realm と同じである必要があります。JAAS 設定ファイル内のクライアントプリンシパルの realm と [libdefaults].default_realm が異なる場合、2 つの realms サブモジュールを含める必要があります。それぞれが、JAAS 設定ファイル内のクライアントプリンシパルの realm および [libdefaults].default_realm に対応します。

    [realms].realm name.kdc

    ip:port 形式で KDC アドレスおよびポートを指定します(例:kdc=10.0.0.1:88)。ポートを省略した場合、デフォルトポート 88 が使用されます(例:kdc=10.0.0.1)。

  • Keytab ファイル

    Keytab ファイルには、JAAS 設定ファイルで指定されたプリンシパルのキーが含まれており、KDC で検証可能である必要があります。たとえば、現在の作業ディレクトリに client.keytab という名前のファイルがある場合、以下のコマンドを実行して、そのファイルに指定されたプリンシパルのキーが含まれていることを確認できます。

    klist -ket ./client.keytab
    
    Keytab name: FILE:client.keytab
    KVNO Timestamp           Principal
    ---- ------------------- ------------------------------------------------------
       7 2018-07-30T10:19:16 te**@**.com (des-cbc-md5)
  • データ統合専用リソースグループの DNS および HOST 設定

    Kerberos が有効な Kafka クラスターでは、ノードのホスト名は KDC(キー配布センター)内のプリンシパルの一部です。クライアントがノードに接続する際、ローカルの DNS および HOST 設定を使用してノードのプリンシパルを推論し、KDC からアクセス認証情報を要求します。したがって、Kerberos が有効な Kafka クラスターにアクセスするには、データ統合専用リソースグループが、これらの認証情報を取得するために適切な DNS および HOST 設定を持っている必要があります。

    • DNS 設定

      データ統合専用リソースグループがアタッチされている VPC 内で Kafka クラスターノードの名前解決に PrivateZone インスタンスが使用されている場合、DataWorks コンソールで、データ統合専用リソースグループの VPC アタッチメントに IP アドレス 100.100.2.136 および 100.100.2.138 のカスタムルートを追加できます。これにより、Kafka クラスターノードの PrivateZone 名前解決設定がデータ統合専用リソースグループに適用されます。

    • HOST 設定

      データ統合専用リソースグループがアタッチされている VPC で PrivateZone インスタンスが名前解決に使用されていない場合、各 Kafka ノードの IP アドレスとホスト名を手動でマッピングする必要があります。DataWorks コンソールで、データ統合専用リソースグループのネットワーク設定内の Host 設定セクションに、これらのマッピングを追加します。

PLAIN

Kafka データソースを設定する際に、Sasl Mechanism を PLAIN に設定した場合、JAAS ファイルは KafkaClient で始まり、その後にすべての設定項目を波括弧 {} で囲む必要があります。

  • 波括弧内の最初の行は、使用するログインコンポーネントクラスを定義します。異なる SASL 認証方式では、ログインコンポーネントクラスは固定されています。以降の各設定項目は、key=value 形式で記述します。

  • 最後の設定項目以外は、セミコロンで終了してはいけません。

  • 最後の設定項目はセミコロンで終了する必要があります。また、閉じ波括弧 "}" の後にもセミコロンを追加する必要があります。

上記の形式要件を満たさない場合、JAAS 設定ファイルは解析できません。以下のコードは、典型的な JAAS 設定ファイルの形式例です。xxx のプレースホルダーは、実際の情報に置き換えてください。

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="xxx"
  password="xxx";
};

設定項目

説明

ログインモジュール

org.apache.kafka.common.security.plain.PlainLoginModul に設定する必要があります。

username

ユーザー名です。必要に応じて設定してください。

password

パスワードです。必要に応じて設定してください。

よくある質問

付録:スクリプトデモおよびパラメーター説明

コードエディタを使用したバッチ同期タスクの設定

コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式要件に従って、関連するパラメーターをスクリプト内に設定する必要があります。詳細については、「コードエディタの使用」をご参照ください。以下に、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに関連する必須パラメーターを説明します。

リーダースクリプトデモ

以下は、Kafka からデータを読み取るための JSON 設定例です。

{
    "type": "job",
    "steps": [
        {
            "stepType": "kafka",
            "parameter": {
                "server": "host:9093",
                "column": [
                    "__key__",
                    "__value__",
                    "__partition__",
                    "__offset__",
                    "__timestamp__",
                    "'123'",
                    "event_id",
                    "tag.desc"
                ],
                "kafkaConfig": {
                    "group.id": "demo_test"
                },
                "topic": "topicName",
                "keyType": "ByteArray",
                "valueType": "ByteArray",
                "beginDateTime": "20190416000000",
                "endDateTime": "20190416000006",
                "skipExceedRecord": "true"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {
                "print": false,
                "fieldDelimiter": ","
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle": true,//throttle が false の場合、mbps パラメーターは無効となり、レート制限が行われません。throttle が true の場合、レートが制限されます。
            "concurrent": 1,//並行スレッド数。
            "mbps":"12"//最大転送レート。1 mbps は 1 MB/s に相当します。
        }
    }
}

リーダースクリプトパラメーター

パラメーター

説明

必須

datasource

データソースの名称です。コードエディタではデータソースの追加が可能です。このパラメーターの値は、追加したデータソースの名称と一致している必要があります。

はい

server

Kafka ブローカーサーバーのアドレスを ip:port 形式で指定します。

1 つの server のみを設定できますが、DataWorks が Kafka クラスター内のすべてのブローカーの IP アドレスに接続できることを確認する必要があります。

はい

topic

Kafka トピックです。トピックとは、Kafka が処理するメッセージフィードの集約です。

はい

column

Kafka から読み取るデータです。定数列、データ列、属性列がサポートされます。

  • 定数列:シングルクォートで囲まれた列(例:["'abc'", "'123'"])。

  • データ列

    • データが JSON 形式の場合、JSON オブジェクトのプロパティを取得できます(例:["event_id"])。

    • データが JSON 形式の場合、JSON オブジェクトのネストされたサブプロパティを取得できます(例:["tag.desc"])。

  • 属性列

    • __key__:メッセージのキー。

    • __value__:メッセージの完全な内容。

    • __partition__:現在のメッセージが存在する partition

    • __headers__:現在のメッセージのヘッダー。

    • __offset__:現在のメッセージの offset

    • __timestamp__:現在のメッセージの timestamp

    以下のコードは完全な例です。

    "column": [
        "__key__",
        "__value__",
        "__partition__",
        "__offset__",
        "__timestamp__",
        "'123'",
        "event_id",
        "tag.desc"
        ]

はい

keyType

Kafka キーの型です。有効な値:BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT。

いいえ

valueType

Kafka 値の型です。有効な値:BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT。

いいえ

beginDateTime

データ消費の開始オフセットです。これは、期間の左境界(包含)です。yyyymmddhhmmss 形式の時刻文字列です。スケジューリングパラメーター と併用できます。詳細については、「スケジューリングパラメーターのサポート形式」をご参照ください。

説明

この機能は Kafka 0.10.2 以降でサポートされています。

このパラメーターまたは beginOffset のいずれかを指定する必要があります。

説明

beginDateTime および endDateTime は併用されます。

endDateTime

データ消費の終了オフセットです。これは、期間の右境界(除外)です。yyyymmddhhmmss 形式の時刻文字列です。スケジューリングパラメーター と併用できます。詳細については、「スケジューリングパラメーターのサポート形式」をご参照ください。

説明

この機能は Kafka 0.10.2 以降でサポートされています。

このパラメーターまたは endOffset のいずれかを指定する必要があります。

説明

endDateTime および beginDateTime は併用されます。

beginOffset

データ消費の開始オフセットです。以下の形式で設定できます。

  • 数値(例:15553274):消費の開始オフセットを示します。

  • seekToBeginning:最小オフセットからデータを消費することを示します。

  • seekToLastkafkaConfig > group.id で指定されたグループ ID の保存済み offset からデータを読み取ります。注:グループオフセットは定期的にコミットされます。タスクが失敗して再実行された場合、データが重複または欠落する可能性があります。skipExceedRecord パラメーターを true に設定した場合、最後の数レコードが破棄される可能性があります。この破棄されたデータのオフセットはすでにコミットされているため、次回の実行では再読み取りできません。

  • seekToEnd:最大オフセットからデータを消費することを示します。これは空のデータを読み取ります。

このパラメーターまたは beginDateTime のいずれかを指定する必要があります。

endOffset

データ消費の終了オフセットです。データ消費タスクの終了タイミングを制御するために使用されます。

このパラメーターまたは endDateTime のいずれかを指定する必要があります。

skipExceedRecord

Kafka は public ConsumerRecords<K, V> poll(final Duration timeout) メソッドを使用してデータを消費します。1 回の poll 呼び出しで取得されるデータは、endOffset または endDateTime を超える場合があります。skipExceedRecord パラメーターは、この超過データを送信先に書き込むかどうかを制御します。オフセットは自動的にコミットされるため、以下のとおり推奨します。

  • Kafka のバージョンが 0.10.2 より前の場合:skipExceedRecord を false に設定してください。

  • Kafka 0.10.2 以降の場合:skipExceedRecord を true に設定してください。

いいえ。デフォルト値は false です。

partition

Kafka トピックには複数のパーティション(partition)があります。デフォルトでは、データ同期タスクはトピック内のすべてのパーティションをカバーするオフセット範囲からデータを読み取ります。ただし、partition を指定することで、単一パーティションのオフセット範囲からのみデータを読み取ることもできます。

いいえ。デフォルト値はありません。

kafkaConfig

KafkaConsumer クライアントを作成する際に、bootstrap.serversauto.commit.interval.mssession.timeout.ms などの拡張パラメーターを指定できます。kafkaConfig を使用することで、KafkaConsumer の消費動作を制御できます。

いいえ

encoding

keyType または valueType が STRING に設定されている場合、このパラメーターで指定されたエンコーディングを使用して文字列を解析します。

いいえ。デフォルト値は UTF-8 です。

waitTIme

コンシューマーオブジェクトが Kafka からデータを 1 回の試行で取得する際の最大待機時間(秒)です。

いいえ。デフォルト値は 60 です。

stopWhenPollEmpty

有効な値は true および false です。このパラメーターを true に設定し、コンシューマーが Kafka から空のデータを取得した場合(通常はトピック内のすべてのデータが読み取られた場合、またはネットワークまたは Kafka クラスターの可用性の問題による場合)、タスクは直ちに停止します。それ以外の場合、再度データが取得されるまで再試行します。

いいえ。デフォルト値は true です。

stopWhenReachEndOffset

このパラメーターは、stopWhenPollEmpty が true の場合にのみ有効です。有効な値は true および false です。

  • true:コンシューマーが Kafka から空のデータを取得した場合、システムは Kafka トピックパーティションの最新 offset がすべて読み取られたかどうかをチェックします。読み取られている場合、タスクは直ちに停止します。そうでない場合、タスクは Kafka トピックから引き続きデータを取得します。

  • false:コンシューマーが Kafka から空のデータを取得した場合、システムはチェックを行わず、タスクを直ちに停止します。

いいえ。デフォルト値は false です。

説明

このパラメーターは下位互換性のために提供されています。Kafka のバージョンが 0.10.2 より前の場合、Kafka トピックのすべてのパーティションの最新 offset が読み取られたかどうかをチェックできません。ただし、一部の script mode タスクでは、Kafka のバージョンが 0.10.2 より前のものからデータを読み取る場合があります。

以下の表に、kafkaConfig パラメーターを示します。

パラメーター

説明

fetch.min.bytes

コンシューマーがブローカーから取得できるデータの最小量(バイト単位)です。十分な量のデータが利用可能になった場合にのみ、データがコンシューマーに返されます。

fetch.max.wait.ms

ブローカーがデータを返すまでの最大待機時間です。デフォルト値は 500 ms です。ブローカーは、fetch.min.bytes または fetch.max.wait.ms のいずれかの条件が満たされた時点でデータを返します。

max.partition.fetch.bytes

ブローカーがコンシューマーに各 partition から返すことができる最大バイト数を指定します。デフォルト値は 1 MB です。

session.timeout.ms

コンシューマーがサーバーから切断された場合にサービスを受けることができなくなるまでの時間を指定します。デフォルト値は 30 秒です。

auto.offset.reset

オフセットがない場合、または無効なオフセット(コンシューマーが長期間非アクティブであり、オフセットを持つレコードが期限切れになって削除された場合)で読み取る際に、コンシューマーが実行するアクションです。デフォルト値は none であり、オフセットは自動的にリセットされません。これを earliest に変更すると、コンシューマーは partition レコードを最小オフセットから読み取ります。

max.poll.records

poll メソッドの 1 回の呼び出しで返されるメッセージ数です。

key.deserializer

メッセージキーの逆シリアル化方法です(例:org.apache.kafka.common.serialization.StringDeserializer)。

value.deserializer

データ値の逆シリアル化方法です(例:org.apache.kafka.common.serialization.StringDeserializer)。

ssl.truststore.location

SSL ルート証明書のパスです。

ssl.truststore.password

ルート証明書ストアのパスワードです。Alibaba Cloud Kafka を使用する場合、この値を KafkaOnsClient に設定します。

security.protocol

アクセスプロトコルです。現在、SASL_SSL プロトコルのみがサポートされています。

sasl.mechanism

SASL 認証方式です。Alibaba Cloud Kafka を使用する場合、PLAIN を使用します。

java.security.auth.login.config

SASL 認証ファイルのパスです。

ライタースクリプトデモ

以下は、Kafka にデータを書き込むための JSON 設定例です。

{
  "type":"job",
  "version":"2.0",//バージョン番号。
  "steps":[
    {
      "stepType":"stream",
      "parameter":{},
      "name":"Reader",
      "category":"reader"
    },
    {
      "stepType":"Kafka",//プラグイン名。
      "parameter":{
          "server": "ip:9092", //Kafka のサーバーアドレス。
          "keyIndex": 0, //キーとして使用する列。キャメルケース命名規則に従う必要があります(k は小文字)。
          "valueIndex": 1, //値として使用する列。現在、ソースデータから 1 列のみ選択できます。または、このパラメーターを空のままにできます。空の場合は、ソースデータ全体が使用されます。
          //たとえば、ODPS テーブルの 2 番目、3 番目、4 番目の列を kafkaValue として使用する場合、元の ODPS テーブルからデータをクリーンアップおよび統合して新しい ODPS テーブルを作成し、その新しいテーブルを同期に使用します。
          "keyType": "Integer", //Kafka キーの型。
          "valueType": "Short", //Kafka 値の型。
          "topic": "t08", //Kafka トピック。
          "batchSize": 1024 //Kafka に一度に書き込まれるデータ量(バイト単位)。
        },
      "name":"Writer",
      "category":"writer"
    }
  ],
  "setting":{
      "errorLimit":{
      "record":"0"//エラーとなるレコード数。
    },
    "speed":{
        "throttle":true,//throttle が false の場合、mbps パラメーターは無効となり、レート制限が行われません。throttle が true の場合、レートが制限されます。
        "concurrent":1, //同時ジョブ数。
        "mbps":"12"//最大転送レート。1 mbps は 1 MB/s に相当します。
    }
   },
    "order":{
    "hops":[
        {
            "from":"Reader",
            "to":"Writer"
        }
      ]
    }
}

ライタースクリプトパラメーター

パラメーター

説明

必須

datasource

データソースの名称です。コードエディタではデータソースの追加が可能です。このパラメーターの値は、追加したデータソースの名称と一致している必要があります。

はい

server

Kafka サーバーのアドレスを ip:port 形式で指定します。

はい

topic

Kafka トピックです。Kafka が処理するさまざまなメッセージフィードのカテゴリです。

Kafka クラスターに公開される各メッセージにはカテゴリがあり、これをトピックと呼びます。トピックとは、一連のメッセージの集合です。

はい

valueIndex

Kafka ライターで値として使用する列です。指定しない場合、デフォルトですべての列が連結されて値が形成されます。区切り文字は fieldDelimiter で指定します。

いいえ

writeMode

valueIndex が設定されていない場合、このパラメーターはソースレコードのすべての列を連結して Kafka レコードの値を形成する際のフォーマットを決定します。有効な値は text および JSON です。デフォルト値は text です。

  • text に設定した場合、すべての列が fieldDelimiter で指定された区切り文字を使用して連結されます。

  • JSON に設定した場合、column パラメーターで指定されたフィールド名に基づいて、すべての列が JSON 文字列として連結されます。

たとえば、ソースレコードに a、b、c の値を持つ 3 つの列がある場合、writeMode が text に設定され、fieldDelimiter が # に設定されている場合、書き込まれる Kafka レコードの値は文字列 a#b#c になります。writeMode が JSON に設定され、column が [{"name":"col1"},{"name":"col2"},{"name":"col3"}] に設定されている場合、書き込まれる Kafka レコードの値は文字列 {"col1":"a","col2":"b","col3":"c"} になります。

valueIndex が設定されている場合、このパラメーターは無効です。

いいえ

column

データを書き込む先の宛先テーブルのフィールドをカンマで区切って指定します。例:"column": ["id", "name", "age"]

valueIndex が設定されておらず、writeMode が JSON に設定されている場合、このパラメーターはソースレコードの列値の JSON 構造におけるフィールド名を定義します。例:"column": [{"name":id","type":"JSON_NUMBER"}, {"name":"name","type":"JSON_STRING"}, {"name":"age","type":"JSON_NUMBER"}]

  • ソースレコードの列数が column で設定されたフィールド名の数より多い場合、書き込み時にデータが切り捨てられます。例:

    ソースレコードに a、b、c の値を持つ 3 つの列があり、column が [{"name":"col1","type":"JSON_STRING"},{"name":"col2","type":"JSON_STRING"}] に設定されている場合、書き込まれる Kafka レコードの値は文字列 {"col1":"a","col2":"b"} になります。

  • ソースレコードの列数が column で定義されたフィールド数より少ない場合、余分なフィールドには null または nullValueFormat の文字列が挿入されます。例:

    ソースレコードに a および b の値を持つ 2 つの列があり、column が [{"name":"col1","type":"JSON_STRING"},{"name":"col2","type":"JSON_STRING"},{"name":"col3","type":"JSON_STRING"}] に設定されている場合、書き込まれる Kafka レコードの値は文字列 {"col1":"a","col2":"b","col3":null} になります。valueIndex が設定されている場合、または writeMode が text に設定されている場合、このパラメーターは無効です。

  • JSON フィールド型が設定されていない場合、デフォルトのフィールド型は JSON_STRING です。

  • JSON フィールド型の有効な値については、「付録:JSON フィールド型」をご参照ください。

valueIndex が設定されている場合、または writeMode が text に設定されている場合、このパラメーターは無効です。

valueIndex が設定されておらず、writeMode が JSON に設定されている場合に必須です。

partition

Kafka トピックのパーティション番号を指定します。これは 0 以上の整数である必要があります。

いいえ

keyIndex

Kafka ライターでキーとして使用する列です。

keyIndex パラメーターの値は 0 以上の整数である必要があります。それ以外の場合、タスクは失敗します。

いいえ

keyIndexes

ソースレコードの列の序数(0 から始まる)の配列で、Kafka レコードのキーとして使用します。

たとえば、[0,1,2] を指定すると、すべての設定された列番号の値がカンマで連結されて Kafka レコードのキーが形成されます。指定しない場合、Kafka レコードのキーは null になり、データはトピックのパーティションにラウンドロビン方式で書き込まれます。このパラメーターと keyIndex のどちらか一方のみを指定できます。

いいえ

fieldDelimiter

writeMode が text に設定され、valueIndex が設定されていない場合、ソースレコードのすべての列がこのパラメーターで指定された列区切り文字を使用して連結され、Kafka レコードの値が形成されます。1 文字または複数文字を区切り文字として設定できます。Unicode 文字を \u0001 の形式で設定できます。エスケープ文字(\t\n など)もサポートされます。デフォルト値は \t です。

writeMode が text に設定されていない場合、または valueIndex が設定されている場合、このパラメーターは無効です。

いいえ

keyType

Kafka キーの型です。有効な値:BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT。

はい

valueType

Kafka 値の型です。有効な値:BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT。

はい

nullKeyFormat

keyIndex または keyIndexes で指定されたソース列の値が null の場合、このパラメーターで指定された文字列に置き換えられます。設定されていない場合、置き換えは行われません。

いいえ

nullValueFormat

ソース列の値が null の場合、Kafka レコードの値を構築する際に、このパラメーターで指定された文字列に置き換えられます。このパラメーターが設定されていない場合、置き換えは行われません。

いいえ

acks

Kafka プロデューサーの初期化時の acks 設定です。これは、正常な書き込みに対する確認応答方法を決定します。デフォルトでは、acks パラメーターは all に設定されています。acks の有効な値は以下のとおりです。

  • 0:正常な書き込みに対する確認応答を行いません。

  • 1:プライマリレプリカへの正常な書き込みに対する確認応答を行います。

  • all:すべてのレプリカへの正常な書き込みに対する確認応答を行います。

いいえ

付録:Kafka への書き込みにおけるメッセージフォーマットの定義

リアルタイム同期タスクを設定して実行すると、ソースデータベースから読み取ったデータが JSON 形式で Kafka トピックに書き込まれます。まず、指定されたソーステーブルの既存データが対応する Kafka トピックに書き込まれます。その後、タスクはリアルタイム同期を開始し、増分データを継続的にトピックに書き込みます。ソーステーブルからの増分 DDL 変更情報も JSON 形式で Kafka トピックに書き込まれます。Kafka に書き込まれたメッセージのステータスおよび変更情報を取得できます。詳細については、「付録:メッセージフォーマット」をご参照ください。

説明

オフライン同期タスクの場合、JSON 構造内の payload.sequenceIdpayload.timestamp.eventTime、および payload.timestamp.checkpointTime フィールドは -1 に設定されます。

付録: JSON フィールド型

writeMode が JSON に設定されている場合、column パラメーターの type フィールドを使用して JSON データ型を指定できます。書き込み操作中、システムはソースレコードの列値を指定された型に変換しようとします。この変換が失敗した場合、ダーティデータが生成されます。

有効な値

説明

JSON_STRING

ソースレコードの列値を文字列に変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列値が整数 123 であり、column パラメーターが [{"name":"col1","type":"JSON_STRING"}] に設定されている場合、Kafka に書き込まれるレコードは {"col1":"123"} になります。

JSON_NUMBER

ソースレコードの列値を数値に変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列値が文字列 1.23 であり、column パラメーターが [{"name":"col1","type":"JSON_NUMBER"}] に設定されている場合、Kafka に書き込まれるレコードは {"col1":1.23} になります。

JSON_BOOL

ソースレコードの列値をブール値に変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列値が文字列 true であり、column が [{"name":"col1","type":"JSON_BOOL"} に設定されている場合、Kafka に書き込まれるレコードの値は文字列 {"col1":true} になります。

JSON_ARRAY

ソースレコードの列値を JSON 配列に変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列値が文字列 [1,2,3] であり、column パラメーターが [{"name":"col1","type":"JSON_ARRAY"}] に設定されている場合、Kafka に書き込まれるレコードは {"col1":[1,2,3]} になります。

JSON_MAP

ソースレコードの列値を JSON オブジェクトに変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列値が文字列 {"k1":"v1"} であり、column パラメーターが [{"name":"col1","type":"JSON_MAP"}] に設定されている場合、Kafka に書き込まれるレコードは {"col1":{"k1":"v1"}} になります。

JSON_BASE64

ソースレコードの列値のバイナリコンテンツを BASE64 エンコードされた文字列に変換し、JSON フィールドに書き込みます。たとえば、列値が長さ 2 のバイト配列であり、16 進数で 0x01 0x02 で表される場合、column パラメーターが [{"name":"col1","type":"JSON_BASE64"}] に設定されている場合、Kafka に書き込まれるレコードは {"col1":"AQI="} になります。

JSON_HEX

ソースレコードの列値のバイナリコンテンツを 16 進数文字列に変換し、JSON フィールドに書き込みます。たとえば、列値が長さ 2 のバイト配列であり、16 進数で 0x01 0x02 で表される場合、column パラメーターが [{"name":"col1","type":"JSON_HEX"}] に設定されている場合、Kafka に書き込まれるレコードは {"col1":"0102"} になります。