すべてのプロダクト
Search
ドキュメントセンター

DataWorks:Kafka データソース

最終更新日:Jan 10, 2026

Kafka データソースは、Kafka との間で双方向のデータ読み書きチャネルを提供します。このトピックでは、DataWorks が Kafka に対して提供するデータ同期機能について説明します。

サポートされているバージョン

DataWorks は、Alibaba Cloud Kafka および 0.10.2 から 3.6.x までのバージョンのセルフマネージド Kafka をサポートしています。

説明

0.10.2 より前のバージョンの Kafka では、パーティションオフセットの取得がサポートされておらず、データ構造がタイムスタンプをサポートしていない可能性があるため、データ同期はサポートされていません。

リアルタイム読み取り

  • サブスクリプションの Serverless リソースグループを使用する場合、リソース不足によるタスクの失敗を防ぐために、必要な仕様を事前に見積もる必要があります。

    Topic ごとに 1 CU を見積もります。また、トラフィックに基づいてリソースを見積もる必要があります:

    • 非圧縮の Kafka データの場合、10 MB/s のトラフィックごとに 1 CU を見積もります。

    • 圧縮された Kafka データの場合、10 MB/s のトラフィックごとに 2 CU を見積もります。

    • JSON 解析が必要な圧縮された Kafka データの場合、10 MB/s のトラフィックごとに 3 CU を見積もります。

  • サブスクリプションの Serverless リソースグループまたは旧バージョンの Data Integration 専用リソースグループを使用する場合:

    • ご利用のワークロードのフェールオーバーに対する許容度が高い場合、クラスターのスロット使用率は 80% を超えないようにしてください。

    • ご利用のワークロードのフェールオーバーに対する許容度が低い場合、クラスターのスロット使用率は 70% を超えないようにしてください。

説明

実際のリソース使用量は、データの内容や形式などの要因によって異なります。最初に見積もった後、実際の実行時の状況に基づいてリソースを調整できます。

制限事項

Kafka データソースは、Serverless リソースグループ (推奨) および 旧バージョンの Data Integration 専用リソースグループをサポートしています。

単一テーブルからのオフライン読み取り

parameter.groupIdparameter.kafkaConfig.group.id の両方が設定されている場合、parameter.groupIdkafkaConfig パラメーター内の group.id よりも優先されます。

単一テーブルへのリアルタイム書き込み

書き込み操作ではデータ重複排除はサポートされていません。オフセットリセットまたはフェールオーバー後にタスクが再起動されると、重複データが書き込まれる可能性があります。

データベース全体のリアルタイム書き込み

  • リアルタイムデータ同期タスクは、Serverless リソースグループ (推奨) および 旧バージョンの Data Integration 専用リソースグループをサポートしています。

  • ソーステーブルにプライマリキーがある場合、プライマリキーの値が Kafka レコードのキーとして使用されます。これにより、同じプライマリキーへの変更が同じ Kafka パーティションに順序通りに書き込まれることが保証されます。

  • ソーステーブルにプライマリキーがない場合、2 つのオプションがあります。プライマリキーなしでテーブルを同期するオプションを選択した場合、Kafka レコードのキーは空になります。テーブルの変更が順序通りに Kafka に書き込まれることを保証するには、宛先の Kafka Topic は単一のパーティションのみを持つ必要があります。カスタムプライマリキーを選択した場合、1 つ以上の非プライマリキーフィールドの組み合わせが Kafka レコードのキーとして使用されます。

  • Kafka クラスターが例外を返し、同じプライマリキーへの変更が同じ Kafka パーティションに順序通りに書き込まれることを保証する必要がある場合は、Kafka データソースの拡張パラメーターに次の構成を追加します。

    {"max.in.flight.requests.per.connection":1,"buffer.memory": 100554432}

    重要

    この構成は同期性能を大幅に低下させます。厳密な順序性と信頼性の必要性と性能のバランスを取る必要があります。

  • リアルタイム同期で Kafka に書き込まれるメッセージの全体的な形式、ハートビートメッセージの形式、およびソースデータの変更に対応するメッセージの形式の詳細については、「付録:メッセージ形式」をご参照ください。

サポートされているフィールドタイプ

Kafka は非構造化データストレージを提供します。Kafka レコードには通常、キー、値、オフセット、タイムスタンプ、ヘッダー、パーティションなどのデータフィールドが含まれます。DataWorks が Kafka からデータを読み書きする際、DataWorks は以下のポリシーに基づいてデータを処理します。

データの読み取り

DataWorks が Kafka からデータを読み取る際、JSON 形式でデータを解析できます。次の表は、各データモジュールがどのように処理されるかを示しています。

Kafka レコードデータモジュール

処理されるデータの型

key

データ同期タスクの keyType 設定項目に依存します。keyType パラメーターの詳細については、付録の完全なパラメーター説明をご参照ください。

value

データ同期タスクの valueType 設定項目に依存します。valueType パラメーターの詳細については、付録の完全なパラメーター説明をご参照ください。

offset

Long

timestamp

Long

headers

String

partition

Long

データの書き込み

DataWorks が Kafka にデータを書き込む際、JSON またはテキスト形式でのデータ書き込みをサポートしています。データ処理ポリシーは、データ同期タスクの種類によって異なります。詳細は次の表をご参照ください。

重要
  • テキスト形式でデータを書き込む場合、フィールド名は含まれません。フィールド値は区切り文字で区切られます。

  • リアルタイム同期タスクで Kafka にデータを書き込む場合、組み込み JSON 形式が使用されます。書き込まれるデータには、データベース変更メッセージ、ビジネス時間、DDL 情報など、すべての情報が含まれます。データ形式の詳細については、「付録:メッセージ形式」をご参照ください。

同期タスクの種類

Kafka に書き込まれる value の形式

ソースフィールドの型

書き込み操作の処理方法

オフライン同期

DataStudio のオフライン同期ノード

json

String

UTF-8 エンコード文字列

Boolean

UTF-8 エンコード文字列 "true" または "false" に変換

時刻/日付

yyyy-MM-dd HH:mm:ss 形式の UTF-8 エンコード文字列

数値

UTF-8 エンコード数値文字列

バイトストリーム

バイトストリームは UTF-8 エンコード文字列として扱われ、文字列に変換されます。

text

String

UTF-8 エンコード文字列

Boolean

UTF-8 エンコード文字列 "true" または "false" に変換

時刻/日付

yyyy-MM-dd HH:mm:ss 形式の UTF-8 エンコード文字列

数値

UTF-8 エンコード数値文字列

バイトストリーム

バイトストリームは UTF-8 エンコード文字列として扱われ、文字列に変換されます。

リアルタイム同期:Kafka へのリアルタイム ETL

DataStudio のリアルタイム同期ノード

json

String

UTF-8 エンコード文字列

Boolean

JSON ブール型

時刻/日付

  • ミリ秒未満の精度の時刻値の場合:ミリ秒単位のタイムスタンプを表す 13 桁の JSON 整数に変換されます。

  • マイクロ秒またはナノ秒の精度の時刻値の場合:ミリ秒のタイムスタンプを表す 13 桁の整数と、ナノ秒のタイムスタンプを表す 6 桁の小数部を含む JSON 浮動小数点数に変換されます。

数値

JSON 数値型

バイトストリーム

バイトストリームは Base64 エンコードされた後、UTF-8 エンコード文字列に変換されます。

text

String

UTF-8 エンコード文字列

Boolean

UTF-8 エンコード文字列 "true" または "false" に変換

時刻/日付

yyyy-MM-dd HH:mm:ss 形式の UTF-8 エンコード文字列

数値

UTF-8 エンコード数値文字列

バイトストリーム

バイトストリームは Base64 エンコードされた後、UTF-8 エンコード文字列に変換されます。

リアルタイム同期:データベース全体の Kafka へのリアルタイム同期

増分データのみのリアルタイム同期

組み込み JSON 形式

String

UTF-8 エンコード文字列

Boolean

JSON ブール型

時刻/日付

13 桁のミリ秒タイムスタンプ

数値

JSON 数値

バイトストリーム

バイトストリームは Base64 エンコードされた後、UTF-8 エンコード文字列に変換されます。

同期ソリューション:ワンクリックでの Kafka へのリアルタイム同期

フルのオフライン同期 + 増分のリアルタイム同期

組み込み JSON 形式

String

UTF-8 エンコード文字列

Boolean

JSON ブール型

時刻/日付

13 桁のミリ秒タイムスタンプ

数値

JSON 数値

バイトストリーム

バイトストリームは Base64 エンコードされた後、UTF-8 エンコード文字列に変換されます。

データソースの追加

DataWorks で同期タスクを開発する前に、「データソース管理」の手順に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールでパラメーターの説明を表示して、各パラメーターの意味を理解することができます

データ同期タスクの開発

同期タスクの設定のエントリポイントと手順については、以下の設定ガイドをご参照ください。

単一テーブルのオフライン同期タスクの設定

単一テーブルまたはデータベース全体のリアルタイム同期タスクの設定

手順の詳細については、「DataStudio でのリアルタイム同期タスクの設定」、「Data Integration でのリアルタイム同期タスクの設定」、および「データベース全体の同期タスクの設定」をご参照ください。

認証設定

SSL

Kafka データソースの [特別な認証方式][SSL] または [SASL_SSL] に設定すると、Kafka クラスターで SSL 認証が有効になります。クライアントトラストストア証明書ファイルをアップロードし、トラストストアパスフレーズを入力する必要があります。

  • Kafka クラスターが Alibaba Cloud Kafka インスタンスの場合、「SSL 証明書アルゴリズムのアップグレード手順」をご参照のうえ、正しいトラストストア証明書ファイルをダウンロードしてください。トラストストアパスフレーズは KafkaOnsClient です。

  • Kafka クラスターが EMR インスタンスの場合、「Kafka 接続での SSL 暗号化の使用」をご参照のうえ、正しいトラストストア証明書ファイルをダウンロードし、トラストストアパスフレーズを取得してください。

  • セルフマネージドクラスターの場合、正しいトラストストア証明書をアップロードし、正しいトラストストアパスフレーズを入力する必要があります。

キーストア証明書ファイル、キーストアパスフレーズ、および SSL パスフレーズは、Kafka クラスターで双方向 SSL 認証が有効になっている場合にのみ必要です。Kafka クラスターサーバーはこれを使用してクライアントの ID を認証します。双方向 SSL 認証は、Kafka クラスターの server.properties ファイルで ssl.client.auth=required が設定されている場合に有効になります。詳細については、「Kafka 接続での SSL 暗号化の使用」をご参照ください。

GSSAPI

Kafka データソースを設定する際に [Sasl メカニズム] を GSSAPI に設定した場合、[JAAS 設定ファイル][Kerberos 設定ファイル][Keytab ファイル] の 3 つの認証ファイルをアップロードする必要があります。また、専用リソースグループの [DNS/HOST] 設定も構成する必要があります。以下のセクションでは、これらのファイルと必要な DNS および HOST 設定について説明します。

説明

Serverless リソースグループの場合、内部 DNS 解決を使用してホストアドレス情報を設定する必要があります。詳細については、「内部 DNS 解決 (PrivateZone)」をご参照ください。

  • JAAS 設定ファイル

    JAAS ファイルは KafkaClient で始まり、その後にすべての中括弧 {} で囲まれた設定項目が続きます:

    • 中括弧内の最初の行は、使用するログオンコンポーネントクラスを定義します。異なる SASL 認証メカニズムに対して、ログオンコンポーネントクラスは固定されています。後続の各設定項目は key=value 形式で記述されます。

    • 最後の設定項目を除くすべての設定項目は、セミコロンで終わってはいけません。

    • 最後の設定項目はセミコロンで終わる必要があります。また、閉じ中括弧 } の後にもセミコロンを追加する必要があります。

    形式要件が満たされていない場合、JAAS 設定ファイルは解析できません。次のコードは、典型的な JAAS 設定ファイルの形式を示しています。xxx プレースホルダーを実際の情報に置き換えてください。

    KafkaClient {
       com.sun.security.auth.module.Krb5LoginModule required
       useKeyTab=true
       keyTab="xxx"
       storeKey=true
       serviceName="kafka-server"
       principal="kafka-client@EXAMPLE.COM";
    };

    設定項目

    説明

    ログオンモジュール

    com.sun.security.auth.module.Krb5LoginModule を設定する必要があります。

    useKeyTab

    true に設定する必要があります。

    keyTab

    任意のパスを指定できます。同期タスクの実行時に、データソース設定時にアップロードされた keytab ファイルが自動的にローカルパスにダウンロードされます。このローカルファイルパスが、keytab 設定項目に設定されます。

    storeKey

    クライアントがキーを保存するかどうかを指定します。true または false に設定できます。データ同期には影響しません。

    serviceName

    Kafka サーバーの server.properties 設定ファイル内の sasl.kerberos.service.name 設定項目に対応します。必要に応じてこの項目を設定してください。

    principal

    Kafka クライアントが使用する Kerberos プリンシパル。必要に応じてこの項目を設定し、アップロードされた keytab ファイルにこのプリンシパルのキーが含まれていることを確認してください。

  • Kerberos 設定ファイル

    Kerberos 設定ファイルには、[libdefaults] と [realms] の 2 つのモジュールが含まれている必要があります。

    • [libdefaults] モジュールは Kerberos 認証パラメーターを指定します。モジュール内の各設定項目は key=value 形式で記述されます。

    • [realms] モジュールは KDC アドレスを指定します。複数のレルムサブモジュールを含むことができます。各レルムサブモジュールはレルム名=で始まります。

    その後に、中括弧で囲まれた設定項目のセットが続きます。各設定項目も key=value 形式で記述されます。次のコードは、典型的な Kerberos 設定ファイルの形式を示しています。xxx プレースホルダーを実際の情報に置き換えてください。

    [libdefaults]
      default_realm = xxx
    
    [realms]
      xxx = {
        kdc = xxx
      }

    設定項目

    説明

    [libdefaults].default_realm

    Kafka クラスターノードにアクセスする際に使用されるデフォルトのレルム。これは通常、JAAS 設定ファイルで指定されたクライアントプリンシパルのレルムと同じです。

    その他の [libdefaults] パラメーター

    [libdefaults] モジュールは、ticket_lifetime などの他の Kerberos 認証パラメーターを指定できます。必要に応じて設定してください。

    [realms].realm name

    JAAS 設定ファイルで指定されたクライアントプリンシパルのレルムおよび [libdefaults].default_realm と同じである必要があります。JAAS 設定ファイルのクライアントプリンシパルのレルムが [libdefaults].default_realm と異なる場合は、2 つのレルムサブモジュールを含める必要があります。これらのサブモジュールは、それぞれ JAAS 設定ファイルのクライアントプリンシパルのレルムと [libdefaults].default_realm に対応する必要があります。

    [realms].realm name.kdc

    ip:port 形式で KDC のアドレスとポートを指定します。例:kdc=10.0.0.1:88。ポートを省略した場合、デフォルトのポート 88 が使用されます。例:kdc=10.0.0.1。

  • Keytab ファイル

    Keytab ファイルには、JAAS 設定ファイルで指定されたプリンシパルのキーが含まれている必要があり、KDC によって検証可能である必要があります。たとえば、現在の作業ディレクトリに client.keytab という名前のファイルがある場合、次のコマンドを実行して、Keytab ファイルに指定されたプリンシパルのキーが含まれているかどうかを確認できます。

    klist -ket ./client.keytab
    
    Keytab name: FILE:client.keytab
    KVNO Timestamp           Principal
    ---- ------------------- ------------------------------------------------------
       7 2018-07-30T10:19:16 te**@**.com (des-cbc-md5)
  • 専用リソースグループの DNS および HOST 設定

    Kerberos 認証が有効になっている Kafka クラスターでは、クラスター内のノードのホスト名が、そのノードのためにキー配布センター (KDC) に登録されたプリンシパルの一部として使用されます。クライアントが Kafka クラスター内のノードにアクセスすると、ローカルの DNS および HOST 設定に基づいてノードのプリンシパルを推測し、KDC からノードのアクセス認証情報を取得します。Kerberos 認証が有効になっている Kafka クラスターに専用リソースグループを使用してアクセスする場合、KDC からクラスターノードのアクセス認証情報を取得できるように、DNS および HOST 設定を正しく構成する必要があります:

    • DNS 設定

      専用リソースグループがアタッチされている VPC 内で、Kafka クラスターノードの名前解決に PrivateZone インスタンスが使用されている場合、DataWorks コンソールの専用リソースグループの VPC アタッチメントに、IP アドレス 100.100.2.136 および 100.100.2.138 のカスタムルートを追加できます。これにより、Kafka クラスターノードの PrivateZone 名前解決設定が専用リソースグループに適用されることが保証されます。

    • HOST 設定

      専用リソースグループがアタッチされている VPC 内で、Kafka クラスターノードの名前解決に PrivateZone インスタンスが使用されていない場合、DataWorks コンソールの専用リソースグループのネットワーク設定で、各 Kafka クラスターノードの IP アドレスとドメイン名のマッピングをホスト設定に追加する必要があります。

PLAIN

Kafka データソースを設定する際に [Sasl メカニズム] を PLAIN に設定した場合、JAAS ファイルは KafkaClient で始まり、その後にすべての中括弧 {} で囲まれた設定項目が続きます。

  • 中括弧内の最初の行は、使用するログオンコンポーネントクラスを定義します。異なる SASL 認証メカニズムに対して、ログオンコンポーネントクラスは固定されています。後続の各設定項目は key=value 形式で記述されます。

  • 最後の設定項目を除くすべての設定項目は、セミコロンで終わってはいけません。

  • 最後の設定項目はセミコロンで終わる必要があります。また、閉じ中括弧 "}" の後にもセミコロンを追加する必要があります。

形式要件が満たされていない場合、JAAS 設定ファイルは解析できません。次のコードは、典型的な JAAS 設定ファイルの形式を示しています。xxx プレースホルダーを実際の情報に置き換えてください。

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="xxx"
  password="xxx";
};

設定項目

説明

ログオンモジュール

org.apache.kafka.common.security.plain.PlainLoginModule を設定する必要があります。

username

ユーザー名。必要に応じてこの項目を設定してください。

password

パスワード。必要に応じてこの項目を設定してください。

よくある質問

付録:スクリプトデモとパラメーターの説明

コードエディタを使用したバッチ同期タスクの設定

コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを設定する必要があります。詳細については、「コードエディタでのタスクの設定」をご参照ください。以下の情報は、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明しています。

Reader スクリプトデモ

次のコードは、Kafka からデータを読み取るための JSON 設定を示しています。

{
    "type": "job",
    "steps": [
        {
            "stepType": "kafka",
            "parameter": {
                "server": "host:9093",
                "column": [
                    "__key__",
                    "__value__",
                    "__partition__",
                    "__offset__",
                    "__timestamp__",
                    "'123'",
                    "event_id",
                    "tag.desc"
                ],
                "kafkaConfig": {
                    "group.id": "demo_test"
                },
                "topic": "topicName",
                "keyType": "ByteArray",
                "valueType": "ByteArray",
                "beginDateTime": "20190416000000",
                "endDateTime": "20190416000006",
                "skipExceedRecord": "true"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {
                "print": false,
                "fieldDelimiter": ","
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle": true,//throttle が false の場合、mbps パラメーターは効果がなく、レートは制限されません。throttle が true の場合、レートは制限されます。
            "concurrent": 1,//同時実行スレッド数。
            "mbps":"12"//最大転送レート。1 mbps は 1 MB/s に相当します。
        }
    }
}

Reader スクリプトパラメーター

パラメーター

説明

必須

datasource

データソース名。コードエディタはデータソースの追加をサポートしています。このパラメーターの値は、追加されたデータソースの名前と同じである必要があります。

はい

server

ip:port 形式の Kafka ブローカーサーバーのアドレス。

server は 1 つしか設定できませんが、DataWorks が Kafka クラスター内のすべてのブローカーの IP アドレスに接続できることを確認する必要があります。

はい

topic

Kafka Topic。Topic は、Kafka が処理するメッセージフィードの集約です。

はい

column

読み取る Kafka データ。定数列、データ列、属性列がサポートされています。

  • 定数列:単一引用符で囲まれた列。例:["'abc'", "'123'"]

  • データ列

    • データが JSON 形式の場合、["event_id"] のように JSON オブジェクトのプロパティを取得できます。

    • データが JSON 形式の場合、["tag.desc"] のように JSON オブジェクトのネストされたサブプロパティを取得できます。

  • 属性列

    • __key__:メッセージのキー。

    • __value__:メッセージの完全な内容。

    • __partition__:現在のメッセージが存在するパーティション。

    • __headers__:現在のメッセージのヘッダー。

    • __offset__:現在のメッセージのオフセット。

    • __timestamp__:現在のメッセージのタイムスタンプ。

    次のコードは完全な例を示しています。

    "column": [
        "__key__",
        "__value__",
        "__partition__",
        "__offset__",
        "__timestamp__",
        "'123'",
        "event_id",
        "tag.desc"
        ]

はい

keyType

Kafka キーの型。有効な値:BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT。

いいえ

valueType

Kafka 値の型。有効な値:BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT。

いいえ

beginDateTime

データ消費の開始オフセット。これは時間範囲の左境界 (含む) です。yyyymmddhhmmss 形式の時間文字列です。[スケジューリングパラメーター] と併用できます。詳細については、「サポートされているスケジューリングパラメーターの形式」をご参照ください。

説明

この機能は Kafka 0.10.2 以降でサポートされています。

このパラメーターまたは beginOffset のいずれかを指定する必要があります。

説明

beginDateTimeendDateTime は一緒に使用されます。

endDateTime

データ消費の終了オフセット。これは時間範囲の右境界 (含まない) です。yyyymmddhhmmss 形式の時間文字列です。[スケジューリングパラメーター] と併用できます。詳細については、「サポートされているスケジューリングパラメーターの形式」をご参照ください。

説明

この機能は Kafka 0.10.2 以降でサポートされています。

このパラメーターまたは endOffset のいずれかを指定する必要があります。

説明

endDateTimebeginDateTime は一緒に使用されます。

beginOffset

データ消費の開始オフセット。次の形式で設定できます:

  • 数値、たとえば 15553274。これは消費の開始オフセットを示します。

  • seekToBeginning:最も古いオフセットからデータを消費することを示します。

  • seekToLastkafkaConfig 設定の group.id で指定されたグループ ID に対して保存されたオフセットからデータを読み取ることを示します。グループオフセットはクライアントによって定期的に Kafka サーバーに自動コミットされることに注意してください。そのため、タスクが失敗して再実行されると、データが重複したり失われたりする可能性があります。skipExceedRecord パラメーターが true に設定されている場合、タスクは読み取られた最後のいくつかのレコードを破棄する可能性があります。この破棄されたデータのグループオフセットはすでにサーバーにコミットされているため、このデータは次のタスク実行では読み取れません。

  • seekToEnd:最新のオフセットからデータを消費することを示します。これにより、空のデータが読み取られます。

このパラメーターまたは beginDateTime のいずれかを指定する必要があります。

endOffset

データ消費の終了オフセット。これは、データ消費タスクが終了するタイミングを制御するために使用されます。

このパラメーターまたは endDateTime のいずれかを指定する必要があります。

skipExceedRecord

Kafka は public ConsumerRecords<K, V> poll(final Duration timeout) を使用してデータを消費します。1 回の poll 呼び出しで、endOffset または endDateTime の範囲外のデータがフェッチされることがあります。skipExceedRecord は、超過したデータを宛先に書き込むかどうかを制御します。データ消費は自動オフセットコミットを使用するため、以下を推奨します:

  • 0.10.2 より前の Kafka バージョンの場合:skipExceedRecord を false に設定します。

  • Kafka 0.10.2 以降の場合:skipExceedRecord を true に設定します。

いいえ。デフォルト値は false です。

partition

Kafka Topic には複数のパーティション (partition) があります。デフォルトでは、データ同期タスクは Topic 内のすべてのパーティションをカバーするオフセット範囲からデータを読み取ります。partition を指定して、単一のパーティションのオフセット範囲からのみデータを読み取ることもできます。

いいえ。デフォルト値はありません。

kafkaConfig

データ消費のために KafkaConsumer クライアントを作成する際に、bootstrap.serversauto.commit.interval.mssession.timeout.ms などの拡張パラメーターを指定できます。kafkaConfig を使用して、KafkaConsumer の消費動作を制御できます。

いいえ

encoding

keyType または valueType が STRING に設定されている場合、このパラメーターで指定されたエンコーディングが文字列の解析に使用されます。

いいえ。デフォルト値は UTF-8 です。

waitTIme

コンシューマーオブジェクトが 1 回の試行で Kafka からデータをプルするのを待つ最大時間 (秒単位)。

いいえ。デフォルト値は 60 です。

stopWhenPollEmpty

有効な値は true と false です。このパラメーターが true に設定され、コンシューマーが Kafka から空のデータをプルした場合 (通常は Topic 内のすべてのデータが読み取られたか、ネットワークまたは Kafka クラスターの可用性の問題が原因)、タスクはすぐに停止します。それ以外の場合は、データが再び読み取られるまで再試行します。

いいえ。デフォルト値は true です。

stopWhenReachEndOffset

このパラメーターは、stopWhenPollEmpty が true の場合にのみ有効です。有効な値は true と false です。

  • このパラメーターが true に設定され、コンシューマーが Kafka から空のデータをプルした場合、Kafka Topic パーティションの最新データが読み取られたかどうかがチェックされます。すべてのパーティションから最新データが読み取られた場合、タスクはすぐに停止します。それ以外の場合は、Kafka Topic からデータをプルし続けます。

  • このパラメーターが false に設定され、コンシューマーが Kafka から空のデータをプルした場合、チェックは実行されず、タスクはすぐに停止します。

いいえ。デフォルト値は false です。

説明

これは過去のロジックとの互換性のためです。V0.10.2 より前の Kafka バージョンでは、Kafka Topic のすべてのパーティションから最新データが読み取られたかどうかをチェックできません。ただし、一部のコードエディタタスクは、V0.10.2 より前の Kafka バージョンからデータを読み取っている可能性があります。

次の表は、kafkaConfig パラメーターについて説明しています。

パラメーター

説明

fetch.min.bytes

コンシューマーがブローカーから取得できるメッセージの最小バイト数を指定します。データは、十分なデータがある場合にのみコンシューマーに返されます。

fetch.max.wait.ms

ブローカーがデータを返すのを待つ最大時間。デフォルト値は 500 ミリ秒です。データは、fetch.min.bytes または fetch.max.wait.ms のいずれかの条件が先に満たされた時点で返されます。

max.partition.fetch.bytes

ブローカーが各 partition からコンシューマーに返すことができる最大バイト数を指定します。デフォルト値は 1 MB です。

session.timeout.ms

コンシューマーがサービスを受けられなくなる前にサーバーから切断できる時間を指定します。デフォルト値は 30 秒です。

auto.offset.reset

オフセットがないか、無効なオフセット (コンシューマーが長時間非アクティブで、オフセットを持つレコードが期限切れで削除されたため) で読み取る場合にコンシューマーが実行するアクション。デフォルト値は none で、オフセットは自動的にリセットされないことを意味します。earliest に変更でき、これはコンシューマーが最も古いオフセットから partition レコードを読み取ることを意味します。

max.poll.records

poll メソッドの 1 回の呼び出しで返すことができるメッセージの数。

key.deserializer

メッセージキーの逆シリアル化メソッド。例:org.apache.kafka.common.serialization.StringDeserializer

value.deserializer

データ値の逆シリアル化メソッド。例:org.apache.kafka.common.serialization.StringDeserializer

ssl.truststore.location

SSL ルート証明書のパス。

ssl.truststore.password

ルート証明書ストアのパスワード。Alibaba Cloud Kafka を使用している場合は、これを KafkaOnsClient に設定します。

security.protocol

アクセスプロトコル。現在、SASL_SSL プロトコルのみがサポートされています。

sasl.mechanism

SASL 認証方式。Alibaba Cloud Kafka を使用している場合は、PLAIN を使用します。

java.security.auth.login.config

SASL 認証ファイルのパス。

Writer スクリプトデモ

次のコードは、Kafka にデータを書き込むための JSON 設定を示しています。

{
  "type":"job",
  "version":"2.0",//バージョン番号。
  "steps":[
    {
      "stepType":"stream",
      "parameter":{},
      "name":"Reader",
      "category":"reader"
    },
    {
      "stepType":"Kafka",//プラグイン名。
      "parameter":{
          "server": "ip:9092", //Kafka のサーバーアドレス。
          "keyIndex": 0, //キーとして使用する列。k を小文字にしたキャメルケース命名規則に従う必要があります。
          "valueIndex": 1, //値として使用する列。現在、ソースデータから 1 つの列を選択するか、このパラメーターを空にすることができます。空の場合、すべてのソースデータが使用されます。
          //たとえば、ODPS テーブルの 2、3、4 番目の列を kafkaValue として使用するには、新しい ODPS テーブルを作成し、元の ODPS テーブルのデータをクリーンアップして新しいテーブルに統合し、その新しいテーブルを同期に使用します。
          "keyType": "Integer", //Kafka キーの型。
          "valueType": "Short", //Kafka 値の型。
          "topic": "t08", //Kafka Topic。
          "batchSize": 1024 //一度に Kafka に書き込まれるデータ量 (バイト単位)。
        },
      "name":"Writer",
      "category":"writer"
    }
  ],
  "setting":{
      "errorLimit":{
      "record":"0"//エラーレコード数。
    },
    "speed":{
        "throttle":true,//throttle が false の場合、mbps パラメーターは効果がなく、レートは制限されません。throttle が true の場合、レートは制限されます。
        "concurrent":1, //同時実行ジョブ数。
        "mbps":"12"//最大転送レート。1 mbps は 1 MB/s に相当します。
    }
   },
    "order":{
    "hops":[
        {
            "from":"Reader",
            "to":"Writer"
        }
      ]
    }
}

Writer スクリプトパラメーター

パラメーター

説明

必須

datasource

データソース名。コードエディタはデータソースの追加をサポートしています。このパラメーターの値は、追加されたデータソースの名前と同じである必要があります。

はい

server

ip:port 形式の Kafka サーバーのアドレス。

はい

topic

Kafka Topic。Kafka が処理するさまざまなメッセージフィードのカテゴリです。

Kafka クラスターに公開される各メッセージにはカテゴリがあり、これは Topic と呼ばれます。Topic はメッセージのグループのコレクションです。

はい

valueIndex

Kafka ライターで値として使用される列。指定しない場合、デフォルトですべての列が連結されて値を形成します。区切り文字は fieldDelimiter で指定されます。

いいえ

writeMode

valueIndex が設定されていない場合、このパラメーターは、ソースレコードのすべての列を連結して Kafka レコードの値を形成する形式を決定します。有効な値は text と JSON です。デフォルト値は text です。

  • text に設定すると、すべての列が fieldDelimiter で指定された区切り文字を使用して連結されます。

  • JSON に設定すると、すべての列が column パラメーターで指定されたフィールド名に基づいて JSON 文字列に連結されます。

たとえば、ソースレコードに a、b、c の値を持つ 3 つの列があり、writeMode が text に、fieldDelimiter が # に設定されている場合、書き込まれる Kafka レコードの値は文字列 a#b#c になります。writeMode が JSON に設定され、column が [{"name":"col1"},{"name":"col2"},{"name":"col3"}] に設定されている場合、書き込まれる Kafka レコードの値は文字列 {"col1":"a","col2":"b","col3":"c"} になります。

valueIndex が設定されている場合、このパラメーターは無効です。

いいえ

column

データが書き込まれる宛先テーブルのフィールドをコンマで区切って指定します。例:"column": ["id", "name", "age"]

valueIndex が設定されておらず、writeMode が JSON に設定されている場合、このパラメーターはソースレコードの列の値の JSON 構造におけるフィールド名を定義します。例:"column": [{"name":id","type":"JSON_NUMBER"}, {"name":"name","type":"JSON_STRING"}, {"name":"age","type":"JSON_NUMBER"}]

  • ソースレコードの列数が column で設定されたフィールド名の数より多い場合、書き込み中にデータは切り捨てられます。例:

    ソースレコードに a、b、c の値を持つ 3 つの列があり、column が [{"name":"col1","type":"JSON_STRING"},{"name":"col2","type":"JSON_STRING"}] として設定されている場合、書き込まれる Kafka レコードの値は文字列 {"col1":"a","col2":"b"} になります。

  • ソースレコードの列数が column で設定されたフィールド名の数より少ない場合、column 設定の余分なフィールド名は null または nullValueFormat で指定された文字列で埋められます。例:

    ソースレコードに a、b の値を持つ 2 つの列があり、column が [{"name":"col1","type":"JSON_STRING"},{"name":"col2","type":"JSON_STRING"},{"name":"col3","type":"JSON_STRING"}] として設定されている場合、書き込まれる Kafka レコードの値は文字列 {"col1":"a","col2":"b","col3":null} になります。valueIndex が設定されている場合、または writeMode が text に設定されている場合、このパラメーターは無効です。

  • JSON フィールドタイプが設定されていない場合、デフォルトのフィールドタイプは JSON_STRING です。

  • JSON フィールドタイプの有効な値については、「付録:JSON フィールドタイプ」をご参照ください。

valueIndex が設定されている場合、または writeMode が text に設定されている場合、このパラメーターは無効です。

valueIndex が設定されておらず、writeMode が JSON に設定されている場合に必須です。

partition

データが書き込まれる Kafka Topic 内のパーティションの番号を指定します。これは 0 以上の整数でなければなりません。

いいえ

keyIndex

Kafka ライターでキーとして使用される列。

keyIndex パラメーターの値は 0 以上の整数でなければなりません。そうでない場合、タスクは失敗します。

いいえ

keyIndexes

Kafka レコードのキーとして使用されるソースレコード内の列の序数の配列。

列の序数は 0 から始まります。たとえば、[0,1,2] は、設定されたすべての列番号の値をコンマで連結して Kafka レコードのキーを形成します。これが指定されていない場合、Kafka レコードのキーは null になり、データは Topic のパーティションにラウンドロビン方式で書き込まれます。このパラメーターまたは keyIndex のいずれか一方のみを指定できます。

いいえ

fieldDelimiter

writeMode が text に設定され、valueIndex が設定されていない場合、ソースレコードのすべての列は、このパラメーターで指定された列区切り文字を使用して連結され、Kafka レコードの値を形成します。区切り文字として単一の文字または複数の文字を設定できます。Unicode 文字は \u0001 の形式で設定できます。\t\n などのエスケープ文字がサポートされています。デフォルト値は \t です。

writeMode が text に設定されていない場合、または valueIndex が設定されている場合、このパラメーターは無効です。

いいえ

keyType

Kafka キーの型。有効な値:BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT。

はい

valueType

Kafka 値の型。有効な値:BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT。

はい

nullKeyFormat

keyIndex または keyIndexes で指定されたソース列の値が null の場合、このパラメーターで指定された文字列に置き換えられます。これが設定されていない場合、置き換えは行われません。

いいえ

nullValueFormat

ソース列の値が null の場合、Kafka レコードの値を組み立てる際に、このパラメーターで指定された文字列に置き換えられます。これが設定されていない場合、置き換えは行われません。

いいえ

acks

Kafka プロデューサーを初期化する際の acks 設定。これにより、書き込み成功の確認応答方式が決まります。デフォルトでは、acks パラメーターは all に設定されます。acks の有効な値は次のとおりです:

  • 0:書き込み成功の確認応答なし。

  • 1:プライマリレプリカへの書き込み成功の確認応答。

  • all:すべてのレプリカへの書き込み成功の確認応答。

いいえ

付録:Kafka への書き込みメッセージ形式の定義

リアルタイム同期タスクを設定して実行すると、ソースデータベースから読み取られたデータが JSON 形式で Kafka Topic に書き込まれます。まず、指定されたソーステーブル内のすべての既存データが対応する Kafka Topic に書き込まれます。その後、タスクはリアルタイム同期を開始し、増分データを継続的に Topic に書き込みます。ソーステーブルからの増分 DDL 変更情報も JSON 形式で Kafka Topic に書き込まれます。Kafka に書き込まれたメッセージのステータスと変更情報を取得できます。詳細については、「付録:メッセージ形式」をご参照ください。

説明

オフライン同期タスクによって Kafka に書き込まれるデータの JSON 構造では、payload.sequenceId、payload.timestamp.eventTime および payload.timestamp.checkpointTime フィールドは -1 に設定されます。

付録:JSON フィールドタイプ

[writeMode] が JSON に設定されている場合、column パラメーターの type フィールドを使用して JSON データ型を指定できます。書き込み操作中に、システムはソースレコードの列の値を指定された型に変換しようとします。この変換が失敗すると、ダーティデータが生成されます。

有効な値

説明

JSON_STRING

ソースレコードの列の値を文字列に変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列の値が整数 123 で、column が [{"name":"col1","type":"JSON_STRING"}] として設定されている場合、書き込まれる Kafka レコードの値は文字列 {"col1":"123"} になります。

JSON_NUMBER

ソースレコードの列の値を数値に変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列の値が文字列 1.23 で、column が [{"name":"col1","type":"JSON_NUMBER"}] として設定されている場合、書き込まれる Kafka レコードの値は文字列 {"col1":1.23} になります。

JSON_BOOL

ソースレコードの列の値をブール値に変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列の値が文字列 true で、column が [{"name":"col1","type":"JSON_BOOL"} として設定されている場合、書き込まれる Kafka レコードの値は文字列 {"col1":true} になります

JSON_ARRAY

ソースレコードの列の値を JSON 配列に変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列の値が文字列 [1,2,3] で、column が [{"name":"col1","type":"JSON_ARRAY"}] として設定されている場合、書き込まれる Kafka レコードの値は文字列 {"col1":[1,2,3]} になります。

JSON_MAP

ソースレコードの列の値を JSON オブジェクトに変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列の値が文字列 {"k1":"v1"} で、column が [{"name":"col1","type":"JSON_MAP"}] として設定されている場合、書き込まれる Kafka レコードの値は文字列 {"col1":{"k1":"v1"}} になります。

JSON_BASE64

ソースレコードの列の値のバイナリバイトコンテンツを BASE64 エンコード文字列に変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列の値が長さ 2 のバイト配列で、16 進数で 0x01 0x02 と表される場合、column が [{"name":"col1","type":"JSON_BASE64"}] として設定されていると、書き込まれる Kafka レコードの値は文字列 {"col1":"AQI="} になります。

JSON_HEX

ソースレコードの列の値のバイナリバイトコンテンツを 16 進数文字列に変換し、JSON フィールドに書き込みます。たとえば、ソースレコードの列の値が長さ 2 のバイト配列で、16 進数で 0x01 0x02 と表される場合、column が [{"name":"col1","type":"JSON_HEX"}] として設定されていると、書き込まれる Kafka レコードの値は文字列 {"col1":"0102"} になります。