すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:AnalyticDB パイプラインサービス (APS) を使用した SLS データの同期 (推奨)

最終更新日:Dec 06, 2025

AnalyticDB for MySQL は、AnalyticDB パイプラインサービス (APS) 機能を提供します。この機能を使用して Simple Log Service (SLS) データリンクを作成し、指定された時間オフセットから Logstore のデータを AnalyticDB for MySQL クラスターにリアルタイムで同期できます。この機能は、ニアリアルタイムのデータ出力、全履歴データのアーカイブ、弾力的な分析などの要件を満たします。このトピックでは、SLS データソースの追加、SLS データリンクの作成、同期タスクの開始、データ分析の実行、およびデータソースの管理方法について説明します。

前提条件

注意事項

  • AnalyticDB for MySQL クラスター内のテーブルは、Simple Log Service の 1 つの Logstore とのみ同期できます。

  • データがインジェストされた後、書き込まれたデータを表示するには、コミット操作を実行する必要があります。短いコミット操作間隔がジョブの安定性や読み書き性能に影響を与えるのを防ぐため、AnalyticDB for MySQL のデータ同期機能には、デフォルトで 5 分のコミット操作間隔があります。したがって、初めてデータ同期ジョブを作成して開始する際には、最初のバッチの書き込みデータを表示するために少なくとも 5 分待つ必要があります。

課金情報

AnalyticDB for MySQL のデータ移行機能を使用して OSS にデータを移行すると、以下の料金が発生します。

手順

RAM 権限付与の設定

異なる Alibaba Cloud アカウント間で SLS データを AnalyticDB for MySQL に同期するには、ソースアカウントで Resource Access Management (RAM) ロールを作成し、その RAM ロールに正確な権限を付与し、RAM ロールの信頼ポリシーを変更する必要があります。同じアカウント内で SLS データを同期する場合は、このステップをスキップして「データソースの作成」に進んでください。

  1. RAM ロールを作成します。詳細については、「Alibaba Cloud アカウントの RAM ロールの作成」をご参照ください。

    説明

    [プリンシパル名] パラメーターを設定する際、[他のアカウント] を選択し、AnalyticDB for MySQL クラスターが含まれる Alibaba Cloud アカウントの ID を入力します。 アカウントセンターにログインし、[セキュリティ設定] ページで [アカウント ID] を確認できます。

  2. RAM ロールに AliyunAnalyticDBAccessingLogRolePolicy 権限を付与します。詳細については、「RAM ロールへの正確な権限の付与」をご参照ください。

  3. RAM ロールの信頼ポリシーを変更します。詳細については、「RAM ロールの信頼ポリシーの変更」をご参照ください。

    {
      "Statement": [
        {
          "Action": "sts:AssumeRole",
          "Effect": "Allow",
          "Principal": {
            "RAM": [
                "acs:ram::<Alibaba Cloud account ID>:root"
            ],
            "Service": [
                "<Alibaba Cloud account ID>@ads.aliyuncs.com"
            ]
          }
        }
      ],
      "Version": "1"
    }
    説明

    Alibaba Cloud アカウント ID は、ステップ 1 で指定した Alibaba Cloud アカウントの ID です。山括弧 (<>) は含めないでください。

データソースの作成

説明

既存のデータソースを使用する場合は、このステップをスキップして「データリンクの作成」に進んでください。

  1. AnalyticDB for MySQL コンソールにログインします。コンソールの左上で、リージョンを選択します。左側のナビゲーションウィンドウで、クラスターリスト をクリックします。管理したいクラスターを見つけ、クラスター ID をクリックします。

  2. 左側のナビゲーションウィンドウで、データアクセス > データソースの管理 を選択します。

  3. 左上隅で、データソースの新規作成 をクリックします。

  4. データソースの新規作成 ページで、パラメーターを設定します。次の表にパラメーターの説明を示します。

    パラメーター

    説明

    データソースのタイプ

    [SLS] を選択します。

    データソース名

    システムはデータソースタイプと現在時刻に基づいて名前を生成します。必要に応じて名前を変更できます。

    データソースの説明

    データレイクハウスのシナリオやビジネス上の制約など、データソースの説明。

    デプロイモード

    現在、Alibaba Cloud インスタンスのみがサポートされています。

    SLS プロジェクトのリージョン

    SLS プロジェクトが存在するリージョン。

    Alibaba Cloud プライマリアカウントをクロスするかどうか

    SLS データソースは、別の Alibaba Cloud アカウントから AnalyticDB for MySQL へのデータ同期をサポートしています。

    • いいえ:現在のアカウント内の SLS データを AnalyticDB for MySQL に同期します。

    • はい:別のアカウントから SLS データを AnalyticDB for MySQL に同期します。このオプションを選択した場合、Alibaba Cloud プライマリアカウントをクロスするAlibaba Cloud をクロスするプライマリアカウントのロール名 を指定する必要があります。

      説明
      • Alibaba Cloud プライマリアカウントをクロスする:ソースが属する Alibaba Cloud アカウントの ID。

      • Alibaba Cloud をクロスするプライマリアカウントのロール名:ソースアカウントで作成された RAM ロールの名前。これはステップ 1 で作成した RAM ロールです。

    SLS project

    ソース SLS プロジェクト。

    重要

    SLS プロジェクトリストには、Alibaba Cloud アカウントとその RAM ユーザー配下のすべてのプロジェクトが表示されます。Alibaba Cloud アカウントに属するプロジェクトを選択した場合、RAM ユーザーがそのプロジェクトに対する権限を持っていることを確認してください。そうでない場合、データは AnalyticDB for MySQL に同期できません。

    SLS Logstore

    ソース SLS Logstore。

  5. パラメーターを設定した後、作成 をクリックします。

データリンクの作成

  1. 左側のナビゲーションウィンドウで、Simple Log Service / Kafka データ同期 をクリックします。

  2. 左上隅で、同期リンクの新規作成 をクリックします。

  3. 同期リンクの新規作成 ページで、データソースと宛先の設定ターゲットデータベースとターゲットテーブルの設定、および 同期設定 のパラメーターを設定します。

    • 次の表に [ソースと宛先] のパラメーターの説明を示します。

      パラメーター

      説明

      データリンク名

      データリンクの名前。システムはデータソースタイプと現在時刻に基づいて名前を生成します。必要に応じて名前を変更できます。

      データソース

      既存の SLS データソースを選択するか、新しいデータソースを作成します。

      送信先ポートタイプ

      次のオプションがサポートされています:

      • データ湖-ユーザーOSS .

      • データレイク - ADB レイクストレージ (推奨).

        重要

        データレイク - ADB レイクストレージ を選択した場合、レイクストレージ機能を有効にする必要があります。

      ADB湖のストレージ

      AnalyticDB for MySQLレイクデータが存在するレイクストレージの名前。

      ドロップダウンリストから宛先のレイクストレージを選択します。レイクストレージが作成されていない場合は、ドロップダウンリストの 自動作成 をクリックして自動的に作成します。

      重要

      このパラメーターは、送信先ポートタイプデータレイク - ADB レイクストレージ に設定されている場合にのみ必須です。

      OSS パス

      AnalyticDB for MySQLデータレイクハウスデータの OSS 内のストレージパス。

      重要
      • このパラメーターは、送信先ポートタイプデータ湖-ユーザーOSS に設定した場合に必須です。

      • 表示されるバケットは、AnalyticDB for MySQL クラスターと同じリージョン内のすべてのバケットです。いずれかを選択できます。ストレージパスは慎重に計画してください。作成後に変更することはできません。

      • 空のフォルダを選択してください。データの上書きを防ぐため、OSS パスは他のタスクの OSS パスとプレフィックス関係を持ってはいけません。例えば、2 つのデータ同期タスクの OSS パスが oss://testBucketName/test/sls1/oss://testBucketName/test/ の場合、これらはプレフィックス関係にあり、データ同期中にデータが上書きされる原因となります。

      ストレージフォーマット

      データストレージのフォーマット。次のオプションがサポートされています:

      • PAIMON。

        重要

        このフォーマットは、送信先ポートタイプデータ湖-ユーザーOSS に設定されている場合にのみサポートされます。

      • ICEBERG。

    • 次の表に ターゲットデータベースとターゲットテーブルの設定 のパラメーターの説明を示します。

      パラメーター

      説明

      ライブラリ名

      AnalyticDB for MySQL の宛先データベースの名前。同じ名前のデータベースが存在しない場合は、新しいデータベースが作成されます。同じ名前のデータベースが存在する場合は、データは既存のデータベースに同期されます。命名規則の詳細については、「制限事項」をご参照ください。

      重要

      データソースと宛先の設定 セクションで、ストレージフォーマット [PAIMON] に設定した場合、既存のデータベースは次の条件を満たす必要があります。そうでない場合、データ同期タスクは失敗します。

      • 外部データベースである必要があります。データベースを作成するステートメントは CREATE EXTERNAL DATABASE<database_name> である必要があります。

      • `CREATE DATABASE` ステートメントの `DBPROPERTIES` パラメーターには、catalog プロパティが含まれている必要があり、catalog の値は paimon である必要があります。

      • `CREATE DATABASE` ステートメントの `DBPROPERTIES` パラメーターには、adb.paimon.warehouse プロパティが含まれている必要があります。例:adb.paimon.warehouse=oss://testBucketName/aps/data

      • `CREATE DATABASE` ステートメントの `DBPROPERTIES` パラメーターには、LOCATION プロパティが含まれている必要があり、データベース名の後に .db を追加する必要があります。そうでない場合、XIHE クエリは失敗します。例:LOCATION=oss://testBucketName/aps/data/kafka_paimon_external_db.db/

        LOCATION に設定された OSS パスのバケットディレクトリが存在する必要があります。そうでない場合、データベースの作成は失敗します。

      テーブル名

      AnalyticDB for MySQL の宛先テーブルの名前。データベースに同じ名前のテーブルが存在しない場合は、新しいテーブルが作成されます。同じ名前のテーブルが既に存在する場合、データ同期は失敗します。命名規則の詳細については、「制限事項」をご参照ください。

      スキーマフィールドマッピング

      デフォルトでは、フィールドは Simple Log Service の配信タスク設定から取得されます。Logstore に配信タスクが設定されていない場合、フィールドはデフォルトで最新のログデータから取得されます。

      • サポートされているデータの型:BOOLEAN、INT、BIGINT、FLOAT、DOUBLE、STRING。

      • SLS の予約済みフィールドの同期がサポートされています。詳細については、「予約済みフィールド」をご参照ください。

      重要
      • 宛先フィールド名の変更はサポートされていません。

      • タスクが開始されている (実行中または完了) 場合、既存の列は変更できませんが、新しい列を追加することはできます。タスクが作成されたが開始されていない場合は、列を変更できます。

      パーティションキーの設定

      宛先テーブルのパーティションキーを設定します。データインジェストとクエリのパフォーマンスを確保するために、ログ時間またはビジネスロジックに基づいてパーティションを設定することを推奨します。パーティションキーを設定しない場合、宛先テーブルにはデフォルトでパーティションがありません。

      宛先パーティションキーは、時間フォーマットを使用するか、パーティションフィールドを指定することでフォーマットできます。

      • 日時でパーティション分割するには、パーティションフィールド名に日時フィールドを選択します。フォーマット処理方法には、時間フォーマットを選択し、ソースフィールドのフォーマットと宛先パーティションのフォーマットを選択します。AnalyticDB for MySQL は、ソースフィールドのフォーマットに基づいてパーティションフィールドの値を識別し、それを宛先パーティションのフォーマットに変換してパーティション分割します。例えば、ソースフィールドが gmt_created で値が 1711358834、ソースフィールドのフォーマットが秒単位のタイムスタンプ、宛先パーティションのフォーマットが yyyyMMdd の場合、データは 20240325 でパーティション分割されます。

      • フィールド値でパーティション分割するには、フォーマット処理方法にパーティションフィールドの指定を選択します。

    • 次の表に 同期設定 のパラメーターの説明を示します。

      パラメーター

      説明

      増分同期の最初のコンシューマーオフセット

      同期タスクが開始されると、選択した時点から SLS データを消費します。有効な値:

      • 最も早いオフセット (begin_cursor):SLS データの最も早い時点から自動的にデータを消費します。

      • 最新のオフセット (end_cursor):SLS データの最新の時点から自動的にデータを取得します。

      • カスタムオフセット:任意の時点を選択できます。システムは、この時点以降の SLS の最初のエントリからデータの消費を開始します。

      ジョブ型リソースグループ

      タスクを実行するためのジョブリソースグループを指定します。

      増分同期に必要な ACU の数

      ジョブリソースグループの ACU 数を指定します。最小 ACU 数は 2 で、最大はジョブリソースグループの利用可能な最大コンピューティングリソースです。データインジェストのパフォーマンスとタスクの安定性を向上させるために、より多くの ACU を指定することを推奨します。

      説明

      データ同期タスクを作成すると、ジョブリソースグループから弾性リソースが使用されます。データ同期タスクはリソースを長時間占有するため、システムはタスクが使用するリソースをリソースグループから差し引きます。例えば、ジョブリソースグループの最大 ACU が 48 で、既に 8 ACU を使用する同期タスクを作成している場合、このリソースグループで別の同期タスクに選択できる最大 ACU 数は 40 です。

      詳細設定

      詳細設定では、同期タスクをカスタマイズできます。カスタム構成を使用するには、テクニカルサポートにお問い合わせください。

  4. パラメーターを設定した後、送信 をクリックします。

データ同期タスクの開始

  1. Simple Log Service / Kafka データ同期 ページで、作成したデータ同期タスクを見つけ、操作 列の スタート をクリックします。

  2. 左上隅で、クエリ をクリックします。タスクのステータスが 実行中 に変わると、タスクは正常に開始されています。

データ分析

データが同期された後、Spark Jar 開発機能を使用して AnalyticDB for MySQL のデータを分析できます。Spark 開発の詳細については、「Spark 開発エディター」および「オフライン Spark アプリケーション開発」をご参照ください。

  1. 左側のナビゲーションウィンドウで、ジョブを開発する > Spark Jar 開発 を選択します。

  2. デフォルトのテンプレートにサンプルステートメントを入力し、実行 をクリックします。

    -- これは SparkSQL の一例です。内容を修正して Spark プログラムを実行してください。
    
    conf spark.driver.resourceSpec=medium;
    conf spark.executor.instances=2;
    conf spark.executor.resourceSpec=medium;
    conf spark.app.name=Spark SQL Test;
    conf spark.adb.connectors=oss;
    
    -- ここに SQL ステートメントを記述します
    show tables from lakehouse20220413156_adbTest;
  3. 任意:アプリケーションリスト タブで、[操作] 列の ログ をクリックして、Spark SQL ジョブの実行ログを表示します。

データソースの管理

左側のナビゲーションウィンドウで、データアクセス > データソースの管理 を選択します。操作 列で次の操作を実行できます。

操作

説明

リンクの新規作成

このデータソースのデータ同期またはデータ移行タスクを作成するページにすばやく移動します。

見る

データソースの詳細な構成を表示します。

編集

名前や説明など、データソースのプロパティを編集します。

削除

現在のデータソースを削除します。

説明

データソースにデータ同期タスクまたはデータ移行タスクが存在する場合、データソースを直接削除することはできません。まず Simple Log Service / Kafka データ同期 ページに移動し、ターゲットの同期タスクを見つけ、操作 列の 削除 をクリックしてデータ同期タスクまたはデータ移行タスクを削除する必要があります。