AnalyticDB for MySQL は、AnalyticDB パイプラインサービス (APS) 機能を提供します。この機能を使用して Simple Log Service (SLS) データリンクを作成し、指定された時間オフセットから Logstore のデータを AnalyticDB for MySQL クラスターにリアルタイムで同期できます。この機能は、ニアリアルタイムのデータ出力、全履歴データのアーカイブ、弾力的な分析などの要件を満たします。このトピックでは、SLS データソースの追加、SLS データリンクの作成、同期タスクの開始、データ分析の実行、およびデータソースの管理方法について説明します。
前提条件
AnalyticDB for MySQL Enterprise Edition、Basic Edition、または Data Lakehouse Edition のクラスターが作成されていること。
AnalyticDB for MySQL クラスターのジョブリソースグループが作成されていること。
AnalyticDB for MySQL クラスターのデータベースアカウントが作成されていること。
Alibaba Cloud アカウントを使用する場合、特権アカウントを作成するだけで済みます。
Resource Access Management (RAM) ユーザーを使用する場合、特権アカウントと標準アカウントを作成し、標準アカウントを RAM ユーザーに関連付ける必要があります。
Simple Log Service (SLS) を有効化し、ご利用の AnalyticDB for MySQL クラスターと同じリージョンにプロジェクトと Logstore を作成済みであること。詳細については、「LoongCollector を使用した ECS テキストログの収集と分析」をご参照ください。
注意事項
AnalyticDB for MySQL クラスター内のテーブルは、Simple Log Service の 1 つの Logstore とのみ同期できます。
データがインジェストされた後、書き込まれたデータを表示するには、コミット操作を実行する必要があります。短いコミット操作間隔がジョブの安定性や読み書き性能に影響を与えるのを防ぐため、AnalyticDB for MySQL のデータ同期機能には、デフォルトで 5 分のコミット操作間隔があります。したがって、初めてデータ同期ジョブを作成して開始する際には、最初のバッチの書き込みデータを表示するために少なくとも 5 分待つ必要があります。
課金情報
AnalyticDB for MySQL のデータ移行機能を使用して OSS にデータを移行すると、以下の料金が発生します。
AnalyticDB for MySQL の AnalyticDB コンピュートユニット (ACU) の弾性リソース料金。詳細については、「Data Lakehouse Edition の課金項目」および「Enterprise Edition と Basic Edition の課金項目」をご参照ください。
OSS のストレージ料金、GET リクエスト、および PUT などのその他のリクエスト。詳細については、「課金の概要」をご参照ください。
手順
ステップ 1 (任意):RAM 権限付与の設定。
ステップ 2:データソースの作成。
ステップ 3:データリンクの作成。
ステップ 4:データ同期タスクの開始。
ステップ 5:データ分析の実行。
ステップ 6 (任意):データソースの管理。
RAM 権限付与の設定
異なる Alibaba Cloud アカウント間で SLS データを AnalyticDB for MySQL に同期するには、ソースアカウントで Resource Access Management (RAM) ロールを作成し、その RAM ロールに正確な権限を付与し、RAM ロールの信頼ポリシーを変更する必要があります。同じアカウント内で SLS データを同期する場合は、このステップをスキップして「データソースの作成」に進んでください。
RAM ロールを作成します。詳細については、「Alibaba Cloud アカウントの RAM ロールの作成」をご参照ください。
説明[プリンシパル名] パラメーターを設定する際、[他のアカウント] を選択し、AnalyticDB for MySQL クラスターが含まれる Alibaba Cloud アカウントの ID を入力します。 アカウントセンターにログインし、[セキュリティ設定] ページで [アカウント ID] を確認できます。
RAM ロールに AliyunAnalyticDBAccessingLogRolePolicy 権限を付与します。詳細については、「RAM ロールへの正確な権限の付与」をご参照ください。
RAM ロールの信頼ポリシーを変更します。詳細については、「RAM ロールの信頼ポリシーの変更」をご参照ください。
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "RAM": [ "acs:ram::<Alibaba Cloud account ID>:root" ], "Service": [ "<Alibaba Cloud account ID>@ads.aliyuncs.com" ] } } ], "Version": "1" }説明Alibaba Cloud アカウント ID は、ステップ 1 で指定した Alibaba Cloud アカウントの ID です。山括弧 (<>) は含めないでください。
データソースの作成
既存のデータソースを使用する場合は、このステップをスキップして「データリンクの作成」に進んでください。
AnalyticDB for MySQL コンソールにログインします。コンソールの左上で、リージョンを選択します。左側のナビゲーションウィンドウで、クラスターリスト をクリックします。管理したいクラスターを見つけ、クラスター ID をクリックします。
左側のナビゲーションウィンドウで、データアクセス > データソースの管理 を選択します。
左上隅で、データソースの新規作成 をクリックします。
データソースの新規作成 ページで、パラメーターを設定します。次の表にパラメーターの説明を示します。
パラメーター
説明
データソースのタイプ
[SLS] を選択します。
データソース名
システムはデータソースタイプと現在時刻に基づいて名前を生成します。必要に応じて名前を変更できます。
データソースの説明
データレイクハウスのシナリオやビジネス上の制約など、データソースの説明。
デプロイモード
現在、Alibaba Cloud インスタンスのみがサポートされています。
SLS プロジェクトのリージョン
SLS プロジェクトが存在するリージョン。
Alibaba Cloud プライマリアカウントをクロスするかどうか
SLS データソースは、別の Alibaba Cloud アカウントから AnalyticDB for MySQL へのデータ同期をサポートしています。
いいえ:現在のアカウント内の SLS データを AnalyticDB for MySQL に同期します。
はい:別のアカウントから SLS データを AnalyticDB for MySQL に同期します。このオプションを選択した場合、Alibaba Cloud プライマリアカウントをクロスする と Alibaba Cloud をクロスするプライマリアカウントのロール名 を指定する必要があります。
説明Alibaba Cloud プライマリアカウントをクロスする:ソースが属する Alibaba Cloud アカウントの ID。
Alibaba Cloud をクロスするプライマリアカウントのロール名:ソースアカウントで作成された RAM ロールの名前。これはステップ 1 で作成した RAM ロールです。
SLS project
ソース SLS プロジェクト。
重要SLS プロジェクトリストには、Alibaba Cloud アカウントとその RAM ユーザー配下のすべてのプロジェクトが表示されます。Alibaba Cloud アカウントに属するプロジェクトを選択した場合、RAM ユーザーがそのプロジェクトに対する権限を持っていることを確認してください。そうでない場合、データは AnalyticDB for MySQL に同期できません。
SLS Logstore
ソース SLS Logstore。
パラメーターを設定した後、作成 をクリックします。
データリンクの作成
左側のナビゲーションウィンドウで、Simple Log Service / Kafka データ同期 をクリックします。
左上隅で、同期リンクの新規作成 をクリックします。
同期リンクの新規作成 ページで、データソースと宛先の設定、ターゲットデータベースとターゲットテーブルの設定、および 同期設定 のパラメーターを設定します。
次の表に [ソースと宛先] のパラメーターの説明を示します。
パラメーター
説明
データリンク名
データリンクの名前。システムはデータソースタイプと現在時刻に基づいて名前を生成します。必要に応じて名前を変更できます。
データソース
既存の SLS データソースを選択するか、新しいデータソースを作成します。
送信先ポートタイプ
次のオプションがサポートされています:
データ湖-ユーザーOSS .
データレイク - ADB レイクストレージ (推奨).
重要データレイク - ADB レイクストレージ を選択した場合、レイクストレージ機能を有効にする必要があります。
ADB湖のストレージ
AnalyticDB for MySQLレイクデータが存在するレイクストレージの名前。
ドロップダウンリストから宛先のレイクストレージを選択します。レイクストレージが作成されていない場合は、ドロップダウンリストの 自動作成 をクリックして自動的に作成します。
重要このパラメーターは、送信先ポートタイプ が データレイク - ADB レイクストレージ に設定されている場合にのみ必須です。
OSS パス
AnalyticDB for MySQLデータレイクハウスデータの OSS 内のストレージパス。
重要このパラメーターは、送信先ポートタイプ を データ湖-ユーザーOSS に設定した場合に必須です。
表示されるバケットは、AnalyticDB for MySQL クラスターと同じリージョン内のすべてのバケットです。いずれかを選択できます。ストレージパスは慎重に計画してください。作成後に変更することはできません。
空のフォルダを選択してください。データの上書きを防ぐため、OSS パスは他のタスクの OSS パスとプレフィックス関係を持ってはいけません。例えば、2 つのデータ同期タスクの OSS パスが
oss://testBucketName/test/sls1/とoss://testBucketName/test/の場合、これらはプレフィックス関係にあり、データ同期中にデータが上書きされる原因となります。
ストレージフォーマット
データストレージのフォーマット。次のオプションがサポートされています:
PAIMON。
重要このフォーマットは、送信先ポートタイプ が データ湖-ユーザーOSS に設定されている場合にのみサポートされます。
ICEBERG。
次の表に ターゲットデータベースとターゲットテーブルの設定 のパラメーターの説明を示します。
パラメーター
説明
ライブラリ名
AnalyticDB for MySQL の宛先データベースの名前。同じ名前のデータベースが存在しない場合は、新しいデータベースが作成されます。同じ名前のデータベースが存在する場合は、データは既存のデータベースに同期されます。命名規則の詳細については、「制限事項」をご参照ください。
重要データソースと宛先の設定 セクションで、ストレージフォーマット を [PAIMON] に設定した場合、既存のデータベースは次の条件を満たす必要があります。そうでない場合、データ同期タスクは失敗します。
外部データベースである必要があります。データベースを作成するステートメントは
CREATE EXTERNAL DATABASE<database_name>である必要があります。`CREATE DATABASE` ステートメントの `DBPROPERTIES` パラメーターには、
catalogプロパティが含まれている必要があり、catalogの値はpaimonである必要があります。`CREATE DATABASE` ステートメントの `DBPROPERTIES` パラメーターには、
adb.paimon.warehouseプロパティが含まれている必要があります。例:adb.paimon.warehouse=oss://testBucketName/aps/data。`CREATE DATABASE` ステートメントの `DBPROPERTIES` パラメーターには、
LOCATIONプロパティが含まれている必要があり、データベース名の後に.dbを追加する必要があります。そうでない場合、XIHE クエリは失敗します。例:LOCATION=oss://testBucketName/aps/data/kafka_paimon_external_db.db/。LOCATIONに設定された OSS パスのバケットディレクトリが存在する必要があります。そうでない場合、データベースの作成は失敗します。
テーブル名
AnalyticDB for MySQL の宛先テーブルの名前。データベースに同じ名前のテーブルが存在しない場合は、新しいテーブルが作成されます。同じ名前のテーブルが既に存在する場合、データ同期は失敗します。命名規則の詳細については、「制限事項」をご参照ください。
スキーマフィールドマッピング
デフォルトでは、フィールドは Simple Log Service の配信タスク設定から取得されます。Logstore に配信タスクが設定されていない場合、フィールドはデフォルトで最新のログデータから取得されます。
サポートされているデータの型:BOOLEAN、INT、BIGINT、FLOAT、DOUBLE、STRING。
SLS の予約済みフィールドの同期がサポートされています。詳細については、「予約済みフィールド」をご参照ください。
重要宛先フィールド名の変更はサポートされていません。
タスクが開始されている (実行中または完了) 場合、既存の列は変更できませんが、新しい列を追加することはできます。タスクが作成されたが開始されていない場合は、列を変更できます。
パーティションキーの設定
宛先テーブルのパーティションキーを設定します。データインジェストとクエリのパフォーマンスを確保するために、ログ時間またはビジネスロジックに基づいてパーティションを設定することを推奨します。パーティションキーを設定しない場合、宛先テーブルにはデフォルトでパーティションがありません。
宛先パーティションキーは、時間フォーマットを使用するか、パーティションフィールドを指定することでフォーマットできます。
日時でパーティション分割するには、パーティションフィールド名に日時フィールドを選択します。フォーマット処理方法には、時間フォーマットを選択し、ソースフィールドのフォーマットと宛先パーティションのフォーマットを選択します。AnalyticDB for MySQL は、ソースフィールドのフォーマットに基づいてパーティションフィールドの値を識別し、それを宛先パーティションのフォーマットに変換してパーティション分割します。例えば、ソースフィールドが gmt_created で値が 1711358834、ソースフィールドのフォーマットが秒単位のタイムスタンプ、宛先パーティションのフォーマットが yyyyMMdd の場合、データは 20240325 でパーティション分割されます。
フィールド値でパーティション分割するには、フォーマット処理方法にパーティションフィールドの指定を選択します。
次の表に 同期設定 のパラメーターの説明を示します。
パラメーター
説明
増分同期の最初のコンシューマーオフセット
同期タスクが開始されると、選択した時点から SLS データを消費します。有効な値:
最も早いオフセット (begin_cursor):SLS データの最も早い時点から自動的にデータを消費します。
最新のオフセット (end_cursor):SLS データの最新の時点から自動的にデータを取得します。
カスタムオフセット:任意の時点を選択できます。システムは、この時点以降の SLS の最初のエントリからデータの消費を開始します。
ジョブ型リソースグループ
タスクを実行するためのジョブリソースグループを指定します。
増分同期に必要な ACU の数
ジョブリソースグループの ACU 数を指定します。最小 ACU 数は 2 で、最大はジョブリソースグループの利用可能な最大コンピューティングリソースです。データインジェストのパフォーマンスとタスクの安定性を向上させるために、より多くの ACU を指定することを推奨します。
説明データ同期タスクを作成すると、ジョブリソースグループから弾性リソースが使用されます。データ同期タスクはリソースを長時間占有するため、システムはタスクが使用するリソースをリソースグループから差し引きます。例えば、ジョブリソースグループの最大 ACU が 48 で、既に 8 ACU を使用する同期タスクを作成している場合、このリソースグループで別の同期タスクに選択できる最大 ACU 数は 40 です。
詳細設定
詳細設定では、同期タスクをカスタマイズできます。カスタム構成を使用するには、テクニカルサポートにお問い合わせください。
パラメーターを設定した後、送信 をクリックします。
データ同期タスクの開始
Simple Log Service / Kafka データ同期 ページで、作成したデータ同期タスクを見つけ、操作 列の スタート をクリックします。
左上隅で、クエリ をクリックします。タスクのステータスが 実行中 に変わると、タスクは正常に開始されています。
データ分析
データが同期された後、Spark Jar 開発機能を使用して AnalyticDB for MySQL のデータを分析できます。Spark 開発の詳細については、「Spark 開発エディター」および「オフライン Spark アプリケーション開発」をご参照ください。
左側のナビゲーションウィンドウで、 を選択します。
デフォルトのテンプレートにサンプルステートメントを入力し、実行 をクリックします。
-- これは SparkSQL の一例です。内容を修正して Spark プログラムを実行してください。 conf spark.driver.resourceSpec=medium; conf spark.executor.instances=2; conf spark.executor.resourceSpec=medium; conf spark.app.name=Spark SQL Test; conf spark.adb.connectors=oss; -- ここに SQL ステートメントを記述します show tables from lakehouse20220413156_adbTest;任意:アプリケーションリスト タブで、[操作] 列の ログ をクリックして、Spark SQL ジョブの実行ログを表示します。
データソースの管理
左側のナビゲーションウィンドウで、データアクセス > データソースの管理 を選択します。操作 列で次の操作を実行できます。
操作 | 説明 |
リンクの新規作成 | このデータソースのデータ同期またはデータ移行タスクを作成するページにすばやく移動します。 |
見る | データソースの詳細な構成を表示します。 |
編集 | 名前や説明など、データソースのプロパティを編集します。 |
削除 | 現在のデータソースを削除します。 説明 データソースにデータ同期タスクまたはデータ移行タスクが存在する場合、データソースを直接削除することはできません。まず Simple Log Service / Kafka データ同期 ページに移動し、ターゲットの同期タスクを見つけ、操作 列の 削除 をクリックしてデータ同期タスクまたはデータ移行タスクを削除する必要があります。 |