このトピックでは、Alibaba Cloud Function Compute を使用して、ApsaraMQ for Kafka インスタンスのデータソース トピックから AnalyticDB for MySQL または AnalyticDB for PostgreSQL データベースにデータをエクスポートするための AnalyticDB シンクコネクタを作成する方法について説明します。
前提条件
次の要件を満たしている必要があります。
ApsaraMQ for Kafka
ApsaraMQ for Kafka インスタンスでコネクタ機能が有効になっていること。詳細については、コネクタ機能の有効化 をご参照ください。
ApsaraMQ for Kafka インスタンスにデータソース トピックが作成されていること。詳細については、手順 1: トピックの作成 をご参照ください。
Function Compute
Function Compute がアクティブ化されていること。詳細については、関数をすばやく作成する をご参照ください。
AnalyticDB for MySQL および AnalyticDB for PostgreSQL
AnalyticDB for MySQL データベースにデータをエクスポートする場合、AnalyticDB for MySQL コンソール で、クラスタとデータベース アカウントを作成し、クラスタに接続し、クラスタ内にデータベースを作成していることを確認してください。詳細については、クラスタの作成、データベース アカウントの作成、AnalyticDB for MySQL クラスタへの接続、および データベースの作成 をご参照ください。
AnalyticDB for PostgreSQL データベースにデータをエクスポートする場合、AnalyticDB for PostgreSQL コンソール で、インスタンスとデータベース アカウントを作成し、データベースに接続していることを確認してください。詳細については、インスタンスの作成、データベース アカウントの作成、および クライアント接続 をご参照ください。
使用上の注意
同じリージョン内の Function Compute を介して、ApsaraMQ for Kafka インスタンスのデータソース トピックから AnalyticDB for MySQL または AnalyticDB for PostgreSQL データベースにのみデータをエクスポートできます。コネクタの制限については、制限 をご参照ください。
AnalyticDB シンクコネクタは、Function Compute を使用してデータをエクスポートします。 Function Compute は無料のリソース枠を提供します。使用量がこの無料枠を超えた場合、Function Compute の請求ルールに基づいて超過分が課金されます。詳細については、請求の概要 をご参照ください。
Function Compute では、関数呼び出しのログをクエリして問題のトラブルシューティングを行うことができます。詳細については、ロギングの設定 をご参照ください。
ApsaraMQ for Kafka は、転送のためにメッセージを UTF-8 エンコードの文字列にシリアル化します。 Message Queue for Apache Kafka はバイナリ データをサポートしていません。
AnalyticDB シンクコネクタの宛先データベースのプライベート エンドポイントを指定する場合は、Function Compute コンソール で、対応する関数の宛先データベースと同じ仮想プライベート クラウド (VPC) と vSwitch を指定する必要があります。そうしないと、Function Compute は宛先データベースにアクセスできません。詳細については、サービスの更新 をご参照ください。
コネクタを作成すると、ApsaraMQ for Kafka によってサービスリンク ロールが作成されます。
サービスリンク ロールが使用できない場合、ApsaraMQ for Kafka は、AnalyticDB シンクコネクタを使用して ApsaraMQ for Kafka から AnalyticDB for MySQL または AnalyticDB for PostgreSQL にデータをエクスポートするために、自動的にサービスリンク ロールを作成します。
サービスリンク ロールが使用可能な場合、ApsaraMQ for Kafka は新しいサービスリンク ロールを作成しません。
サービスリンク ロールの詳細については、サービスリンク ロール をご参照ください。
手順
このセクションでは、AnalyticDB シンクコネクタを使用して、ApsaraMQ for Kafka インスタンスのデータソース トピックから AnalyticDB for MySQL または AnalyticDB for PostgreSQL データベースにデータをエクスポートする方法について説明します。
オプション: AnalyticDB シンクコネクタに必要なトピックとグループを作成します。
トピックとグループを手動で作成したくない場合は、この手順をスキップし、次の手順で [リソース作成方法] パラメータを [自動] に設定します。
重要AnalyticDB シンクコネクタに必要な特定のトピックでは、ローカル ストレージ エンジンを使用する必要があります。 ApsaraMQ for Kafka インスタンスのメジャー バージョンが 0.10.2 の場合、ローカル ストレージ エンジンを使用するトピックは手動で作成できません。このバージョンでは、これらのトピックは自動的に作成する必要があります。
Function Compute と AnalyticDB for MySQL または AnalyticDB for PostgreSQL を設定します。
結果を確認します。
AnalyticDB シンクコネクタに必要なトピックの作成
ApsaraMQ for Kafka コンソールで、AnalyticDB シンクコネクタに必要な 5 つのトピックを手動で作成できます。 5 つのトピックは、タスク オフセット トピック、タスク設定トピック、タスクステータス トピック、デッドレター キュー トピック、およびエラーデータ トピックです。 5 つのトピックは、ストレージ エンジンとパーティションの数で異なります。詳細については、ソース サービスの設定手順のパラメータ をご参照ください。
ApsaraMQ for Kafka コンソール にログインします。
リソースの分布概要 ページの セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
重要トピックは、Elastic Compute Service (ECS) インスタンスがデプロイされているリージョンに作成する必要があります。トピックはリージョンをまたがって使用できません。たとえば、メッセージのプロデューサーとコンシューマーが中国 (北京) リージョンにデプロイされている ECS インスタンスで実行されている場合、トピックも中国 (北京) リージョンに作成する必要があります。
インスタンスリスト ページで、管理するインスタンスの名前をクリックします。
左側のナビゲーション ペインで、トピック管理 をクリックします。
トピック管理 ページで、トピックの作成 をクリックします。
トピックの作成 パネルで、トピックのプロパティを指定し、[OK] をクリックします。
パラメータ
説明
例
名前
トピック名。
demo
記述
トピックの説明。
デモ テスト
パーティションの数
トピック内のパーティションの数。
12
ストレージエンジン
説明Professional Edition インスタンスを使用している場合にのみ、ストレージ エンジン タイプを指定できます。 Standard Edition インスタンスを使用している場合、デフォルトでクラウド ストレージが選択されます。
トピックにメッセージを格納するために使用されるストレージ エンジンのタイプ。
ApsaraMQ for Kafka は、次のタイプのストレージ エンジンをサポートしています。
クラウドストレージ: この値を選択すると、システムはトピックに Alibaba Cloud ディスクを使用し、分散モードで 3 つのレプリカにデータを格納します。このストレージ エンジンは、低レイテンシ、高パフォーマンス、長期の耐久性、および高信頼性を備えています。インスタンスの作成時に 仕様タイプ パラメータを Standard Edition (High Write) に設定した場合、このパラメータは クラウドストレージ にのみ設定できます。
ローカルストレージ: この値を選択すると、システムはオープン ソースの Apache Kafka の同期レプリカ (ISR) アルゴリズムを使用し、分散モードで 3 つのレプリカにデータを格納します。
クラウドストレージ
メッセージタイプ
トピックのメッセージ タイプ。有効な値:
通常のメッセージ: デフォルトでは、同じキーを持つメッセージは、メッセージが送信された順序で同じパーティションに格納されます。クラスタ内のブローカーに障害が発生した場合、パーティションに格納されているメッセージの順序は保持されない場合があります。 ストレージエンジン パラメータを クラウドストレージ に設定すると、このパラメータは自動的に 通常のメッセージ に設定されます。
パーティション順位メッセージ: デフォルトでは、同じキーを持つメッセージは、メッセージが送信された順序で同じパーティションに格納されます。クラスタ内のブローカーに障害が発生した場合でも、メッセージはメッセージが送信された順序でパーティションに格納されます。一部のパーティションのメッセージは、パーティションが復元されるまで送信できません。 ストレージエンジン パラメータを ローカルストレージ に設定すると、このパラメータは自動的に パーティション順位メッセージ に設定されます。
通常のメッセージ
ログリリースポリシー
トピックで使用されるログ クリーンアップ ポリシー。
ストレージエンジン パラメータを ローカルストレージ に設定した場合、ログリリースポリシー パラメータを設定する必要があります。 ApsaraMQ for Kafka Professional Edition インスタンスを使用している場合にのみ、[ストレージ エンジン] パラメータを [ローカル ストレージ] に設定できます。
ApsaraMQ for Kafka は、次のログ クリーンアップ ポリシーを提供します。
Delete: デフォルトのログ クリーンアップ ポリシー。システムに十分なストレージ容量がある場合、メッセージは最大保存期間に基づいて保存されます。ストレージ使用率が 85% を超えると、システムは最も古い保存メッセージを削除してサービスの可用性を確保します。
Compact: Apache Kafka で使用されるログ圧縮ポリシー。ログ圧縮により、同じキーを持つメッセージの最新の値が保持されます。このポリシーは、障害が発生したシステムの復元や、システムの再起動後のキャッシュの再読み込みなどのシナリオに適しています。たとえば、Kafka Connect または Confluent Schema Registry を使用する場合、システム ステータスと設定に関する情報をログ圧縮トピックに格納する必要があります。
重要ログ圧縮トピックは、Kafka Connect や Confluent Schema Registry などの特定のクラウド ネイティブ コンポーネントでのみ使用できます。詳細については、aliware-kafka-demos をご参照ください。
Compact
タグ
トピックに添付するタグ。
demo
トピックが作成されると、トピック管理 ページでトピックを表示できます。
AnalyticDB シンクコネクタに必要なグループの作成
ApsaraMQ for Kafka コンソールで、AnalyticDB シンクコネクタに必要なグループを手動で作成できます。グループの名前は、connect-タスク名 形式である必要があります。詳細については、ソース サービスの設定手順のパラメータ をご参照ください。
ApsaraMQ for Kafka コンソール にログインします。
リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
インスタンスリスト ページで、管理するインスタンスの名前をクリックします。
左側のナビゲーション ペインで、Group の管理 をクリックします。
Group の管理 ページで、グループの作成 をクリックします。
グループの作成 パネルで、Group ID フィールドにグループ名を入力し、記述 フィールドにグループの説明を入力し、グループにタグを添付して、[OK] をクリックします。
コンシューマー グループが作成されると、Group の管理 ページでコンシューマー グループを表示できます。
AnalyticDB シンクコネクタの作成とデプロイ
ApsaraMQ for Kafka コンソール にログインします。
リソースの分布概要 ページの セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
左側のナビゲーション ペインで、Connector タスクリスト をクリックします。
Connector タスクリスト ページで、Connector の作成 をクリックします。
Connector の作成 ウィザードで、次の手順を実行します。
基本情報の設定 手順で、次の表に記載されているパラメータを設定し、次へ をクリックします。
パラメータ
説明
例
名前
コネクタの名前。コネクタ名を指定する際は、次のルールに注意してください。
コネクタ名は 1 ~ 48 文字である必要があります。数字、小文字、ハイフン (-) を使用できますが、ハイフン (-) で始めることはできません。
各コネクタ名は、ApsaraMQ for Kafka インスタンス内で一意である必要があります。
コネクタ タスクで使用されるグループの名前は、connect-タスク名 形式である必要があります。このようなグループをまだ作成していない場合、Message Queue for Apache Kafka によって自動的に作成されます。
kafka-adb-sink
インスタンス
Message Queue for Apache Kafka インスタンスに関する情報。デフォルトでは、インスタンスの名前と ID が表示されます。
demo alikafka_post-cn-st21p8vj****
ソースサービスの設定 手順で、ソース サービスとして [Message Queue for Apache Kafka] を選択し、次の表に記載されているパラメータを設定して、次へ をクリックします。
表 1. ソース サービスの設定手順のパラメータ
パラメータ
説明
例
データソース Topic
データをエクスポートするデータソース トピックの名前。
adb-test-input
コンシューマースレッドの同時発生数
データソース トピックからデータをエクスポートするために使用される同時コンシューマー スレッドの数。デフォルト値: 6。有効な値:
1
2
3
6
12
6
消費の開始位置
消費が開始されるオフセット。有効な値:
一番古いオフセット: 消費は最も古いオフセットから開始されます。
一番新しいオフセット: 消費は最新のオフセットから開始されます。
一番古いオフセット
VPC ID
データ エクスポート タスクが実行される VPC の ID。 実行環境の設定 をクリックしてパラメータを表示します。デフォルト値は、ApsaraMQ for Kafka インスタンスのデプロイ時に指定した VPC ID です。値を変更する必要はありません。
vpc-bp1xpdnd3l***
VSwitch ID
データ エクスポート タスクが実行される vSwitch の ID。 実行環境の設定 をクリックしてパラメータを表示します。 vSwitch は、ApsaraMQ for Kafka インスタンスと同じ VPC にデプロイする必要があります。デフォルト値は、ApsaraMQ for Kafka インスタンスのデプロイ時に指定した vSwitch ID です。
vsw-bp1d2jgg81***
失敗の処理
エラーによってメッセージの送信に失敗した場合に、パーティションへのサブスクリプションを保持するかどうかを指定します。 実行環境の設定 をクリックしてパラメータを表示します。有効な値:
サブスクリプションの継続: エラーが発生したパーティションへのサブスクリプションを保持し、ログを返します。
サブスクリプションの停止: エラーが発生したパーティションへのサブスクリプションを停止し、ログを返します。
サブスクリプションの継続
リソースの作成方法
AnalyticDB シンクコネクタに必要なトピックとグループを作成する方法。 実行環境の設定 をクリックしてパラメータを表示します。有効な値:
自動作成
手動で作成します
自動作成
Connector コンシューマーグループ
コネクタで使用されるグループ。グループ 実行環境の設定 をクリックしてパラメータを表示します。グループ名は、connect-タスク名 形式である必要があります。
connect-kafka-adb-sink
タスクサイトの Topic
コンシューマー オフセットを格納するために使用されるトピック。 実行環境の設定 をクリックしてパラメータを表示します。
トピック: トピック名は connect-offset で始めることをお勧めします。
パーティション: トピック内のパーティションの数は 1 より大きい必要があります。
ストレージ エンジン: トピックのストレージ エンジンは、ローカル ストレージに設定する必要があります。
cleanup.policy: トピックのログ クリーンアップ ポリシーは、圧縮に設定する必要があります。
connect-offset-kafka-adb-sink
タスク設定の Topic
タスク設定を格納するために使用されるトピック。 実行環境の設定 をクリックしてパラメータを表示します。
トピック: トピック名は connect-config で始めることをお勧めします。
パーティション: トピックには 1 つのパーティションのみを含めることができます。
ストレージ エンジン: トピックのストレージ エンジンは、ローカル ストレージに設定する必要があります。
cleanup.policy: トピックのログ クリーンアップ ポリシーは、圧縮に設定する必要があります。
connect-config-kafka-adb-sink
タスクステータスの Topic
タスクステータスを格納するために使用されるトピック。 実行環境の設定 をクリックしてパラメータを表示します。
トピック: トピック名は connect-status で始めることをお勧めします。
パーティション: トピック内のパーティションの数を 6 に設定することをお勧めします。
ストレージ エンジン: トピックのストレージ エンジンは、ローカル ストレージに設定する必要があります。
cleanup.policy: トピックのログ クリーンアップ ポリシーは、圧縮に設定する必要があります。
connect-status-kafka-adb-sink
デッドレターキューの Topic
Kafka Connect フレームワークのエラー データを格納するために使用されるトピック。 実行環境の設定 をクリックしてパラメータを表示します。トピック リソースを節約するために、デッドレター キュー トピックと エラーデータ トピック の両方としてトピックを作成できます。
トピック: トピック名は connect-error で始めることをお勧めします。
パーティション: トピック内のパーティションの数を 6 に設定することをお勧めします。
ストレージ エンジン: トピックのストレージ エンジンは、ローカル ストレージまたはクラウド ストレージに設定できます。
connect-error-kafka-adb-sink
例外データ Topic
シンクコネクタのエラー データを格納するために使用されるトピック。 実行環境の設定 をクリックしてパラメータを表示します。トピック リソースを節約するために、デッドレター キュー トピック とエラーデータ トピックの両方としてトピックを作成できます。
トピック: トピック名は connect-error で始めることをお勧めします。
パーティション: トピック内のパーティションの数を 6 に設定することをお勧めします。
ストレージ エンジン: トピックのストレージ エンジンは、ローカル ストレージまたはクラウド ストレージに設定できます。
connect-error-kafka-adb-sink
ターゲットサービスの設定 手順で、宛先サービスとして [AnalyticDB] を選択し、次の表に記載されているパラメータを設定して、作成 をクリックします。
パラメータ
説明
例
インスタンスタイプ
宛先データベース インスタンスのタイプ。有効な値: MySQL Edition および PostgreSQL Edition。
AnalyticDB for MySQL
AnalyticDB インスタンス ID
宛先 AnalyticDB for MySQL または AnalyticDB for PostgreSQL インスタンスの ID。
am-bp139yqk8u1ik****
データベース名
宛先データベースの名前。
adb_demo
テーブル名
エクスポートされたデータが格納される宛先データベース内のテーブルの名前。
user
データベースのユーザー名
宛先データベースにログインするために使用するユーザー名。
adbmysql
データベースのパスワード
宛先データベースにログインするために使用するパスワード。パスワードは、宛先 AnalyticDB for MySQL AnalyticDB for PostgreSQL インスタンスの作成時に指定されます。パスワードを忘れた場合は、リセットできます。
AnalyticDB for MySQL データベース アカウントのパスワードをリセットする場合、特権アカウントのパスワードのリセット に記載されている手順を実行します。
AnalyticDB for PostgreSQL データベース アカウントのパスワードをリセットする場合、AnalyticDB for PostgreSQL コンソール にログインし、宛先インスタンスをクリックします。左側のナビゲーション ペインで、[アカウント管理] をクリックし、パスワードをリセットするデータベース アカウントを見つけて、[アクション] 列の [パスワードのリセット] をクリックします。
********
説明ユーザー名とパスワードは、ApsaraMQ for Kafka がデータ エクスポート タスクを作成するときに、環境変数として Function Compute の関数に渡されます。タスクが作成されると、ApsaraMQ for Kafka はユーザー名またはパスワードを保存しません。
コネクタが作成されると、Connector タスクリスト ページで表示できます。
Connector タスクリスト ページに移動し、作成したコネクタを見つけて、デプロイ操作 列の をクリックします。
関連する Function Compute サービスの設定
ApsaraMQ for Kafka コンソールで AnalyticDB シンクコネクタが作成およびデプロイされると、Function Compute によってコネクタの関数サービスと関数が自動的に作成されます。関数サービスは、kafka-service-<connector_name>-<ランダムな文字列> 形式で名前が付けられ、関数は fc-adb-<ランダムな文字列> 形式で名前が付けられます。
Connector タスクリスト ページで、Function Compute サービスを設定するコネクタを見つけて、関数の設定操作 列の をクリックします。
ページが Function Compute コンソールにリダイレクトされます。
Function Compute コンソール で、自動的に作成されたサービスを見つけて、サービスの VPC と vSwitch を設定します。詳細については、サービスの更新 をご参照ください。
AnalyticDB for MySQL または AnalyticDB for PostgreSQL の設定
Function Compute サービスがデプロイされた後、Function Compute コンソールで指定した VPC の CIDR ブロックを、宛先 AnalyticDB for MySQL または AnalyticDB for PostgreSQL インスタンスのホワイトリストに追加する必要があります。 CIDR ブロックは、VPC コンソール の vSwitch ページで表示できます。 CIDR ブロックは、Function Compute サービスの VPC と vSwitch が存在する行にあります。
クラスタのホワイトリストを設定するには、AnalyticDB for MySQL コンソール にログインする必要があります。詳細については、IP アドレス ホワイトリストの設定 をご参照ください。
クラスタのホワイトリストを設定するには、AnalyticDB for PostgreSQL コンソール にログインする必要があります。詳細については、IP アドレス ホワイトリストの設定 をご参照ください。
テスト メッセージの送信
ApsaraMQ for Kafka インスタンスのデータソース トピックにメッセージを送信して、トピック内のデータが AnalyticDB for MySQL または AnalyticDB for PostgreSQL にエクスポートできるかどうかを確認できます。
[メッセージの内容] パラメータの値は JSON 形式である必要があり、キーと値のペアに解析されます。キーは宛先データベース テーブルの列名であり、値は列のデータです。したがって、メッセージの内容の各キーが宛先データベース テーブルの対応する列名を持っていることを確認してください。 ApsaraMQ for Kafka AnalyticDB for MySQL コンソール または AnalyticDB for PostgreSQL コンソール にログインして宛先データベースに接続し、宛先データベース テーブルの列名を確認できます。
Connector タスクリスト ページで、管理するコネクタを見つけて、テスト操作 列の をクリックします。
メッセージの送信 パネルで、テスト メッセージを送信するためのパラメータを設定します。
送信方法 パラメータを コンソール に設定した場合、次の手順を実行します。
メッセージキー フィールドに、メッセージ キーを入力します。例: demo。
メッセージの内容 フィールドに、メッセージの内容を入力します。例: {"key": "test"}。
指定されたパーティションに送信 パラメータを設定して、テスト メッセージを特定のパーティションに送信するかどうかを指定します。
テスト メッセージを特定のパーティションに送信する場合、はい をクリックし、パーティション ID フィールドにパーティション ID を入力します。例: 0。パーティション ID をクエリする方法については、パーティション ステータスの表示 をご参照ください。
テスト メッセージを特定のパーティションに送信したくない場合は、いいえ をクリックします。
送信方法 パラメータを Docker に設定した場合、Docker コンテナーを実行してサンプルメッセージを生成する セクションの Docker コマンドを実行して、テスト メッセージを送信します。
送信方法 パラメータを SDK に設定した場合、必要なプログラミング言語またはフレームワークの SDK と、テスト メッセージを送受信するためのアクセス方法を選択します。
データ エクスポート結果の確認
ApsaraMQ for Kafka インスタンスのデータソース トピックにメッセージを送信した後、AnalyticDB for MySQL コンソール または AnalyticDB for PostgreSQL コンソール にログインし、デスティネーション データベースに接続します。Data Management Service 5.0 コンソール の [sqlconsole] コマンド ウィンドウで、デスティネーション テーブルをクリックして、データソース トピックのデータが正常にエクスポートされたかどうかを確認します。
次の図は、ApsaraMQ for Kafka から AnalyticDB for MySQL へのデータ エクスポート タスクの結果を示しています: 