すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:APS データ同期機能を使用した Kafka データの同期 (推奨)

最終更新日:Dec 06, 2025

AnalyticDB for MySQL は、AnalyticDB Pipeline Service (APS) データ同期機能を提供します。この機能を使用して Kafka データリンクを作成し、特定のオフセットから Kafka のデータをリアルタイムで取り込むことができます。この機能は、ほぼリアルタイムのデータ出力、完全な履歴データのアーカイブ、および弾力的な分析をサポートします。このトピックでは、Kafka データソースの追加、Kafka データリンクの作成と開始、そしてデータの分析とデータソースの管理方法について説明します。

前提条件

注意事項

  • JSON 形式の Kafka データのみ同期できます。

  • Kafka Topic 内のデータは、一定期間が経過すると自動的にクリアされます。データ同期タスクが失敗し、Topic のデータが期限切れになった場合、タスクを再開してもクリアされたデータを取得することはできません。これにより、データが失われる可能性があります。これを防ぐには、Topic のデータライフサイクルを延長してください。同期タスクが失敗した場合は、速やかにテクニカルサポートにご連絡ください。

  • サンプルの Kafka データが 8 KB を超える場合、Kafka API はデータを切り捨てます。これにより、システムがサンプルデータを JSON 形式に解析できなくなり、フィールドマッピング情報が自動的に生成されなくなります。

  • ソースの Kafka テーブルスキーマの変更は、AnalyticDB for MySQL の DDL 変更を自動的にトリガーしません。

  • データが取り込まれた後、書き込まれたデータを可視化するには、コミット操作を実行する必要があります。短いコミット操作間隔がジョブの安定性や読み書き性能に影響を与えるのを防ぐため、AnalyticDB for MySQL のデータ同期機能には、デフォルトで 5 分のコミット操作間隔が設定されています。したがって、初めてデータ同期ジョブを作成して開始した場合、最初のバッチの書き込みデータを表示するには、少なくとも 5 分待つ必要があります。

課金

AnalyticDB for MySQL データ同期機能を使用すると、以下の料金が発生します。

操作手順

データソースの作成

説明

すでに Kafka データソースを追加している場合は、このステップをスキップして「データリンクの作成」に進んでください。

  1. AnalyticDB for MySQL コンソールにログインします。コンソールの左上でリージョンを選択します。左側のナビゲーションウィンドウで、クラスターリスト をクリックします。管理するクラスターを見つけ、クラスター ID をクリックします。

  2. 左側のナビゲーションウィンドウで、データアクセス > データソースの管理 を選択します。

  3. 左上隅で、データソースの新規作成 をクリックします。

  4. データソースの新規作成 ページで、パラメーターを設定します。次の表にパラメーターを説明します。

    パラメーター

    説明

    データソースのタイプ

    [Kafka] を選択します。

    データソース名

    システムはデータソースタイプと現在時刻に基づいて名前を自動的に生成します。必要に応じて名前を変更できます。

    データソースの説明

    データレイクハウスのシナリオやビジネス上の制約など、データソースの説明を入力します。

    デプロイモード

    Alibaba Cloud インスタンスのみがサポートされています。

    Kafka インスタンス

    Kafka インスタンス ID。

    ApsaraMQ for Kafka コンソールにログインし、インスタンスリスト ページでインスタンス ID を表示します。

    Kafka Topic

    Kafka で作成された Topic の名前。

    ApsaraMQ for Kafka コンソールにログインし、宛先インスタンスの Topics ページで Topic 名を表示します。

    メッセージデータフォーマット

    Kafka メッセージのデータ形式。JSON のみがサポートされています。

  5. パラメーターを設定した後、作成 をクリックします。

データリンクの作成

  1. 左側のナビゲーションウィンドウで、Simple Log Service / Kafka データ同期 をクリックします。

  2. 左上隅で、同期リンクの新規作成 をクリックします。

  3. 同期リンクの新規作成 ページで、データソースと宛先の設定ターゲットデータベースとターゲットテーブルの設定、および 同期設定 セクションを設定します。

    • データソースと宛先の設定 のパラメーターを次の表に示します。

      パラメーター

      説明

      データリンク名

      データリンクの名前。システムはデータソースタイプと現在時刻に基づいて名前を自動的に生成します。必要に応じて名前を変更できます。

      データソース

      既存の Kafka データソースを選択するか、新しいデータソースを作成します。

      送信先ポートタイプ

      有効な値:

      • データ湖-ユーザーOSS

      • データレイク - ADB レイクストレージ (推奨)

        重要

        ターゲットタイプを データレイク - ADB レイクストレージ に設定する場合、レイクストレージ機能を有効にする必要があります。

      ADB湖のストレージ

      AnalyticDB for MySQL レイクデータが配置されるレイクストレージの名前。

      ドロップダウンリストから宛先レイクストレージを選択します。レイクストレージが作成されていない場合は、ドロップダウンリストで 自動作成 をクリックして自動的に作成します。

      重要

      このパラメーターは、送信先ポートタイプデータレイク - ADB レイクストレージ に設定する場合に必要です。

      OSS パス

      AnalyticDB for MySQL レイクデータの OSS 内のストレージパス。

      重要
      • このパラメーターは、送信先ポートタイプデータ湖-ユーザーOSS に設定する場合に必要です。

      • 表示されるバケットは、AnalyticDB for MySQL クラスターと同じリージョンにあるすべてのバケットです。いずれかを選択できます。ストレージパスは慎重に計画してください。作成後に変更することはできません。

      • 空のフォルダーを選択してください。データの上書きを防ぐため、OSS パスは他のタスクの OSS パスとプレフィックス関係を持ってはいけません。たとえば、2 つのデータ同期タスクの OSS パスが oss://testBucketName/test/sls1/oss://testBucketName/test/ の場合、プレフィックス関係があるため、データ同期中にデータが上書きされます。

      ストレージフォーマット

      データストレージ形式。有効な値:

      • PAIMON

        重要

        この形式は、送信先ポートタイプデータ湖-ユーザーOSS に設定されている場合にのみサポートされます。

      • ICEBERG

    • ターゲットデータベースとターゲットテーブルの設定 のパラメーターを次の表に示します。

      パラメーター

      説明

      ライブラリ名

      AnalyticDB for MySQL のターゲットデータベースの名前。同じ名前のデータベースが存在しない場合は、新しいデータベースが作成されます。同じ名前のデータベースが存在する場合は、データは既存のデータベースに同期されます。データベースの命名規則の詳細については、「制限事項」をご参照ください。

      重要

      データソースと宛先の設定 セクションで、ストレージフォーマット [PAIMON] に設定した場合、既存のデータベースは次の条件を満たす必要があります。そうでない場合、データ同期タスクは失敗します。

      • 外部データベースである必要があります。データベースを作成する文は CREATE EXTERNAL DATABASE<database_name> である必要があります。

      • `CREATE DATABASE` 文の `DBPROPERTIES` パラメーターには catalog プロパティが含まれている必要があり、catalog の値は paimon である必要があります。

      • `CREATE DATABASE` 文の `DBPROPERTIES` パラメーターには adb.paimon.warehouse プロパティが含まれている必要があります。例:adb.paimon.warehouse=oss://testBucketName/aps/data

      • `CREATE DATABASE` 文の `DBPROPERTIES` パラメーターには LOCATION プロパティが含まれている必要があり、データベース名の後に .db を追加する必要があります。そうでない場合、XIHE クエリは失敗します。例:LOCATION=oss://testBucketName/aps/data/kafka_paimon_external_db.db/

        LOCATION に設定された OSS パスのバケットディレクトリが存在する必要があります。そうでない場合、データベースの作成は失敗します。

      テーブル名

      AnalyticDB for MySQL のターゲットテーブルの名前。データベース内に同じ名前のテーブルが存在しない場合は、新しいテーブルが作成されます。同じ名前のテーブルがすでに存在する場合、データ同期は失敗します。テーブルの命名規則の詳細については、「制限事項」をご参照ください。

      サンプルデータ

      Kafka Topic から最新のデータが自動的に取得され、サンプルデータとして使用されます。

      説明

      Kafka Topic のデータは JSON 形式である必要があります。他のデータ形式が存在する場合、データ同期中にエラーが発生します。

      JSON 解析階層

      JSON データで解析するネストレベルの数を設定します。有効な値:

      • 0:解析しない。

      • 1 (デフォルト):1 レベル解析する。

      • 2:2 レベル解析する。

      • 3:3 レベル解析する。

      • 4:4 レベル解析する。

      JSON のネスト解析ポリシーの詳細については、「JSON 解析レベルとスキーマ推論の例」をご参照ください。

      スキーマフィールドマッピング

      JSON 解析後のサンプルデータのスキーマ情報を表示します。必要に応じて、ターゲットフィールドの名前と型を調整したり、フィールドを追加または削除したりできます。

      パーティションキーの設定

      ターゲットテーブルのパーティションキーを設定します。データインジェストとクエリのパフォーマンスを確保するために、ログ時間またはビジネスロジックに基づいてパーティションを設定することを推奨します。パーティションキーを設定しない場合、ターゲットテーブルにはデフォルトでパーティションがありません。

      ターゲットパーティションキーは、時間形式を使用するか、パーティションフィールドを指定することでフォーマットできます。

      • 日時でパーティション分割するには、パーティションフィールド名に日時フィールドを選択します。フォーマット処理方法として [時間フォーマット] を選択し、ソースフィールドのフォーマットとターゲットパーティションのフォーマットを選択します。AnalyticDB for MySQL は、ソースフィールドのフォーマットに基づいてパーティションフィールドの値を識別し、それをターゲットパーティションのフォーマットに変換してパーティション分割します。たとえば、ソースフィールドが gmt_created で値が 1711358834、ソースフィールドのフォーマットが秒単位のタイムスタンプ、ターゲットパーティションのフォーマットが yyyyMMdd の場合、データは 20240325 でパーティション分割されます。

      • フィールド値でパーティション分割するには、フォーマット処理方法として [パーティションフィールドの指定] を選択します。

    • 同期設定 のパラメーターを次の表に示します。

      パラメーター

      説明

      増分同期の最初のコンシューマーオフセット

      同期タスクが開始されると、選択した時点から Kafka データの消費を開始します。有効な値:

      • 最も古いオフセット (begin_cursor):Kafka データの最も古い時点から自動的にデータを消費します。

      • 最新のオフセット (end_cursor):Kafka データの最新の時点から自動的にデータを消費します。

      • カスタムオフセット:任意の時点を選択できます。システムは、この時刻以降の Kafka の最初のデータから消費を開始します。

      ジョブ型リソースグループ

      タスクを実行するジョブリソースグループを指定します。

      増分同期に必要な ACU の数

      ジョブリソースグループの ACU 数を指定します。最小 ACU 数は 2 で、最大はジョブリソースグループの利用可能な最大コンピューティングリソースです。データインジェストのパフォーマンスとタスクの安定性を向上させるために、より多くの ACU を指定することを推奨します。

      説明

      データ同期タスクを作成すると、ジョブリソースグループから弾力的なリソースが使用されます。データ同期タスクは長時間リソースを占有するため、システムはタスクが使用するリソースをリソースグループから差し引きます。たとえば、ジョブリソースグループの最大 ACU が 48 で、すでに 8 ACU を使用する同期タスクを作成している場合、このリソースグループで別の同期タスクに選択できる最大 ACU 数は 40 です。

      詳細設定

      詳細設定により、同期タスクをカスタマイズできます。カスタム設定を行うには、テクニカルサポートにお問い合わせください。

  4. パラメーターを設定した後、送信 をクリックします。

データ同期タスクの開始

  1. Simple Log Service / Kafka データ同期 ページで、作成したデータ同期タスクを見つけ、操作 列の スタート をクリックします。

  2. 左上隅で、クエリ をクリックします。タスクのステータスが 実行中 に変わると、タスクは正常に開始されています。

データ分析

データが同期された後、Spark Jar 開発機能を使用して AnalyticDB for MySQL のデータを分析できます。Spark 開発の詳細については、「Spark 開発エディター」および「オフライン Spark アプリケーション開発」をご参照ください。

  1. 左側のナビゲーションウィンドウで、ジョブを開発する > Spark Jar 開発 を選択します。

  2. デフォルトのテンプレートにサンプル文を入力し、実行 をクリックします。

    -- これは SparkSQL の一例です。内容を修正して、ご利用の Spark プログラムを実行してください。
    
    conf spark.driver.resourceSpec=medium;
    conf spark.executor.instances=2;
    conf spark.executor.resourceSpec=medium;
    conf spark.app.name=Spark SQL Test;
    conf spark.adb.connectors=oss;
    
    -- ここに SQL 文を記述します
    show tables from lakehouse20220413156_adbTest;
  3. 任意:アプリケーションリスト タブで、[操作] 列の ログ をクリックして、Spark SQL ジョブの実行ログを表示します。

データソースの管理

左側のナビゲーションウィンドウで、データアクセス > データソースの管理 を選択します。操作 列で次の操作を実行できます。

操作

説明

リンクの新規作成

このデータソースのデータ同期またはデータ移行タスクを作成するページにすばやく移動します。

見る

データソースの詳細な構成を表示します。

編集

名前や説明など、データソースのプロパティを編集します。

削除

現在のデータソースを削除します。

説明

データソースにデータ同期またはデータ移行タスクが存在する場合、データソースを直接削除することはできません。まず Simple Log Service / Kafka データ同期 ページに移動し、ターゲットの同期タスクを見つけ、操作 列の 削除 をクリックしてデータ同期またはデータ移行タスクを削除する必要があります。

JSON 解析レベルとスキーマ推論の例

解析レベルは、JSON データで解析するネストレベルの数を指定します。たとえば、ユーザーが次の JSON データを Kafka に送信します。

{
  "name" : "zhangle",
  "age" : 18,
  "device" : {
    "os" : {
        "test":lag,
        "member":{
             "fa":zhangsan,
             "mo":limei
       }
     },
    "brand" : "none",
    "version" : "11.4.2"
  }
}

以下のセクションでは、レベル 0 から 4 の解析結果を示します。

レベル 0 の解析

データは解析されません。元の JSON データが直接出力されます。

JSON フィールド

ターゲットフィールド名

__value__

{ "name" : "zhangle","age" : 18, "device" : { "os" : { "test":lag,"member":{ "fa":zhangsan,"mo":limei }},"brand": "none","version" : "11.4.2" }}

__value__

レベル 1 の解析

JSON データの第 1 レベルが解析されます。

JSON フィールド

ターゲットフィールド名

name

zhangle

name

age

18

age

device

{ "os" : { "test":lag,"member":{ "fa":zhangsan,"mo":limei }},"brand": "none","version" : "11.4.2" }

device

レベル 2 の解析

JSON データの第 2 レベルが解析されます。フィールドがネストされていない場合、直接出力されます。たとえば、name と age フィールドは直接出力されます。フィールドがネストされている場合、そのサブフィールドが出力されます。たとえば、device フィールドはネストされているため、そのサブフィールド device.osdevice.brand、および device.version が出力されます。

重要

ターゲットフィールド名にはピリオド (.) を含めることができないため、ピリオドは自動的にアンダースコア (_) に置き換えられます。

JSON フィールド

ターゲットフィールド名

name

zhangle

name

age

18

age

device.os

{ "test":lag,"member":{ "fa":zhangsan,"mo":limei }}

device_os

device.brand

none

device_brand

device.version

11.4.2

device_version

レベル 3 の解析

JSON フィールド

ターゲットフィールド名

name

zhangle

name

age

18

age

device.os.test

lag

device_os_test

device.os.member

{ "fa":zhangsan,"mo":limei }

device_os_member

device.brand

none

device_brand

device.version

11.4.2

device_version

レベル 4 の解析

JSON フィールド

ターゲットフィールド名

name

zhangle

name

age

18

age

device.os.test

lag

device_os_test

device.os.member.fa

zhangsan

device_os_member_fa

device.os.member.mo

lime

device_os_member_mo

device.brand

none

device_brand

device.version

11.4.2

device_version