AnalyticDB シンクコネクタは、ApsaraMQ for Kafka の Topic からメッセージを読み取り、それらを AnalyticDB for MySQL または AnalyticDB for PostgreSQL データベースに書き込みます。このコネクタは、Function Compute を使用して、同じリージョン内のサービス間でデータを転送します。
仕組み
データは 3 つのコンポーネントを介して流れます。
ApsaraMQ for Kafka は、データソース Topic からメッセージを読み取ります。
Function Compute は、メッセージを受信し、それらをターゲットデータベースに書き込みます。
AnalyticDB は、指定されたテーブルにデータを保存します。
内部的には、コネクタは 5 つの Topic (オフセット、構成、ステータス、デッドレターキュー、およびエラーデータ用) と 1 つのコンシューマーグループを使用します。これらのリソースは、自動または手動で作成できます。
前提条件
開始する前に、以下を確認してください。
コネクタ機能が有効化されている ApsaraMQ for Kafka インスタンス
ApsaraMQ for Kafka インスタンス内の データソース Topic
AnalyticDB ターゲットデータベース:
AnalyticDB for MySQL: クラスター、データベースアカウント、クライアント接続、および データベース
AnalyticDB for PostgreSQL: インスタンス、データベースアカウント、および クライアント接続
コネクタの構成のために、以下の情報を収集してください。
| 情報 | 説明 | 例 |
|---|---|---|
| データソース Topic 名 | データのエクスポート元となる Kafka Topic | adb-test-input |
| AnalyticDB インスタンス ID | 宛先インスタンス ID | am-bp139yqk8u1ik**** |
| データベース名 | ターゲットデータベース | adb_demo |
| テーブル名 | エクスポートされたデータの宛先テーブル | user |
| データベースユーザー名 | データベースログインユーザー名 | adbmysql |
| データベースパスワード | データベースログインパスワード | ******** |
注意事項
データエクスポートは同じリージョンに限定されます。リージョン間エクスポートはサポートされていません。詳細については、「使用制限」をご参照ください。
Function Compute は無料リソースクォータを提供します。無料クォータを超える使用量については、「Function Compute の料金」に従って課金されます。
ApsaraMQ for Kafka は、メッセージを UTF-8 エンコードされた文字列としてシリアル化します。バイナリデータはサポートされていません。
ターゲットデータベースがプライベートエンドポイントを使用している場合、Function Compute サービスに同じ VPC と vSwitch を構成してください。そうしないと、Function Compute はデータベースに到達できません。詳細については、「サービスの更新」をご参照ください。
ApsaraMQ for Kafka は、コネクタを作成する際に、既に存在しない場合、自動的にサービスリンクロールを作成します。
関数実行の問題をトラブルシューティングするには、Function Compute のログ記録を使用します。詳細については、「ログ記録の設定」をご参照ください。
コネクタのセットアップ
コネクタをセットアップするには:
ステップ 1: (オプション) 必要な Topic とコンシューマーグループの作成
ApsaraMQ for Kafka がこれらのリソースを自動的に作成するには、この手順をスキップし、[リソース作成方法] を [自動] に設定します(ステップ 2 で)。
ご利用の ApsaraMQ for Kafka インスタンスがメジャーバージョン 0.10.2 を実行している場合、ローカルストレージエンジンを必要とする Topic は自動的に作成する必要があります。このバージョンでは、ローカルストレージ Topic の手動作成はサポートされていません。
コネクタには 5 つの内部 Topic と 1 つのコンシューマーグループが必要です。特定の構成が必要な場合にのみ、手動で作成してください。
必要な Topic
| Topic | 名前プレフィックス | パーティション | ストレージエンジン | ログクリーンアップポリシー |
|---|---|---|---|---|
| タスクオフセット | connect-offset | 1 より大きい | ローカルストレージ | コンパクト |
| タスク構成 | connect-config | 1 | ローカルストレージ | コンパクト |
| タスクステータス | connect-status | 6 (推奨) | ローカルストレージ | コンパクト |
| デッドレターキュー | connect-error | 6 (推奨) | ローカルストレージ または クラウドストレージ | 任意 |
| エラーデータ | connect-error | 6 (推奨) | ローカルストレージ または クラウドストレージ | 任意 |
ヒント: デッドレターキュー Topic とエラーデータ Topic は、リソースを節約するために同じ Topic を共有できます。
Topic の作成
ApsaraMQ for Kafka コンソールにログインします。
[概要] ページの [リソース分布] セクションで、インスタンスのリージョンを選択します。
Elastic Compute Service (ECS) インスタンスがデプロイされているリージョンに Topic を作成する必要があります。Topic はリージョンをまたいで使用できません。たとえば、メッセージのプロデューサーとコンシューマーが中国 (北京) リージョンにデプロイされた ECS インスタンスで実行されている場合、Topic も中国 (北京) リージョンに作成する必要があります。
[インスタンス] ページで、インスタンス名をクリックします。
左側のナビゲーションウィンドウで、[トピック] をクリックします。
[トピック] ページで、[トピックの作成] をクリックします。
[トピックの作成] パネルで、以下のパラメーターを設定し、[OK] をクリックします。
| パラメーター | 説明 |
|---|---|
| Name | トピック名。上記の表にある命名プレフィックスを使用します(例:connect-offset-kafka-adb-sink)。 |
| Partitions | パーティション数。上記の表で必要な値を確認してください。 |
| Storage Engine | ストレージエンジンのタイプ。Professional Edition インスタンスでのみ利用可能です。Standard Edition インスタンスでは、デフォルトで Cloud Storage が使用されます。オプションは以下のとおりです。Cloud Storage(Alibaba Cloud ディスクによる低遅延・高信頼性の 3 レプリカ分散ストレージ)または Local Storage(Apache Kafka ISR アルゴリズムによる 3 レプリカ分散ストレージ)。Standard (High Write) インスタンスでは、Cloud Storage のみがサポートされています。 |
| Message Type | メッセージの順序保証レベル。Normal Message(Cloud Storage のデフォルト):ブローカー障害時にパーティション内の順序が保たれない場合があります。Partitionally Ordered Message(Local Storage のデフォルト):ブローカー障害時にも順序が保たれますが、影響を受けたパーティションは復旧されるまで利用できません。 |
| Log Cleanup Policy | Storage Engine が Local Storage の場合に必須です。Delete(デフォルト):保存期間に基づいてメッセージを保持し、ストレージ使用率が 85 % を超えると最も古いメッセージを削除します。Compact:キーごとに最新の値のみを保持します。Kafka Connect を使用するコネクタ内部トピックにはこの設定が必要です。 重要 ログ圧縮済みトピックは、Kafka Connect や Confluent Schema Registry などの特定のクラウドネイティブコンポーネントでのみ使用できます。詳細については、「aliware-kafka-demos」をご参照ください。 |
| Description | 任意の説明です。 |
| Tag | 任意のタグです。 |
必要な 5 つの Topic をすべて作成するために繰り返します。
コンシューマーグループの作成
コネクタには、connect-<connector-name> (例: connect-kafka-adb-sink) という名前のコンシューマーグループが必要です。
ApsaraMQ for Kafka コンソールにログインします。
「[リソース配布]」セクションで、インスタンスのリージョンを選択します。
[インスタンス] ページで、インスタンス名をクリックします。
左側のナビゲーションウィンドウで、[グループ] をクリックします。
[グループ] ページで、[グループの作成] をクリックします。
「[グループの作成]」パネルで、[グループ ID] フィールドにグループ名を入力し、任意の説明とタグを追加してから、[OK] をクリックします。
ステップ 2: コネクタの作成とデプロイ
ApsaraMQ for Kafka コンソールにログインします。
[概要] ページの [リソース分布] セクションで、インスタンスのリージョンを選択します。
[インスタンス] ページで、インスタンス名をクリックします。
左側のナビゲーションウィンドウで、[コネクタ] をクリックします。
「コネクタ」ページで、「コネクタの作成」をクリックします。
[コネクタの作成] ウィザードを完了します。
基本情報の構成
| パラメーター | 説明 | 例 |
|---|---|---|
| 名前 | コネクタ名。1~48 文字: 数字、小文字、ハイフン (-)。ハイフンで開始することはできません。インスタンス内で一意である必要があります。 | kafka-adb-sink |
| インスタンス | ApsaraMQ for Kafka インスタンス。インスタンス名と ID を表示します。 | demo alikafka_post-cn-st21p8vj**** |
[次へ] をクリックします。
ソースサービスの構成
[Message Queue For Apache Kafka] をソースサービスとして選択し、以下のパラメーターを設定します。
| パラメーター | 説明 | 例 |
|---|---|---|
| [データソースのトピック] | データをエクスポートする元のトピックです。 | adb-test-input |
| [コンシューマースレッドの同時実行数] | 同時実行コンシューマースレッドの数です。デフォルト: 6。有効な値: 1、2、3、6、12。 | 6 |
| [コンシューマーオフセット] | 消費を開始する場所です。 [最も古いオフセット]:最初から開始します。 [最新のオフセット]:最新のメッセージから開始します。 | [最も古いオフセット] |
「[ランタイム環境の設定]」をクリックして、詳細設定を展開します。
| パラメーター | 説明 | 例 |
|---|---|---|
| VPC ID | エクスポートタスク用の VPC。ApsaraMQ for Kafka インスタンスの VPC にデフォルト設定されます。 | vpc-bp1xpdnd3l*** |
| vSwitch ID | エクスポートタスク用の vSwitch。インスタンスと同じ VPC 内にある必要があります。 | vsw-bp1d2jgg81*** |
| 障害処理ポリシー | メッセージ送信失敗時のアクション。[継続サブスクリプション]: メッセージの消費を継続し、エラーをログに記録します。[停止サブスクリプション]: メッセージの消費を停止し、エラーをログに記録します。ログの詳細については、「コネクタの管理」をご参照ください。エラーコードについては、「エラーコード」をご参照ください。 | サブスクリプションを続行 |
| リソース作成方法 | 必要な内部トピックとコンシューマーグループの作成方法。 自動: ApsaraMQ for Kafka が自動的に作成します。 手動: ステップ 1 で作成されたリソースを使用します。 | 自動 |
| コネクタコンシューマーグループ | コネクタ用のコンシューマーグループ。フォーマット: connect-<connector-name>。 | connect-kafka-adb-sink |
| タスクオフセットトピック | コンシューマオフセットを保存します。名前プレフィックス: connect-offset。パーティション: 1 より大きい。ストレージエンジン: ローカルストレージ。クリーンアップポリシー: コンパクト。 | connect-offset-kafka-adb-sink |
| タスク構成 Topic | タスク構成を保存します。名前プレフィックス: connect-config。パーティション: 1。ストレージエンジン: ローカルストレージ。クリーンアップポリシー: コンパクト。 | connect-config-kafka-adb-sink |
| タスクステータス Topic | タスクステータスを保存します。名前プレフィックス: connect-status。パーティション: 6 (推奨)。ストレージエンジン: ローカルストレージ。クリーンアップポリシー: コンパクト。 | connect-status-kafka-adb-sink |
| デッドレターキュー トピック | Kafka Connect フレームワークのエラーデータを保存します。名前プレフィックス: connect-error。パーティション: 6 (推奨)。ストレージエンジン: ローカルストレージ または クラウドストレージ。エラーデータ Topic と Topic を共有できます。 | connect-error-kafka-adb-sink |
| エラーデータ Topic | シンクコネクタのエラーデータを保存します。名前プレフィックス: connect-error。パーティション: 6 (推奨)。ストレージエンジン: ローカルストレージ または クラウドストレージ。デッドレターキュー Topic と Topic を共有できます。 | connect-error-kafka-adb-sink |
[Next] をクリックします。
宛先サービスの構成
宛先サービスとして AnalyticDB を選択し、以下のパラメーターを構成します。
| パラメーター | 説明 | 例 |
|---|---|---|
| インスタンスタイプ | ターゲットデータベースタイプ: AnalyticDB for MySQL または AnalyticDB for PostgreSQL。 | AnalyticDB for MySQL |
| AnalyticDB インスタンス ID | 宛先インスタンス ID。 | am-bp139yqk8u1ik**** |
| データベース名 | ターゲットデータベース。 | adb_demo |
| テーブル名 | エクスポートされたデータの宛先テーブル。 | user |
| データベースユーザー名 | データベースログインユーザー名。 | adbmysql |
| データベースパスワード | データベースのログインパスワード。パスワードを忘れた場合、AnalyticDB for MySQL では AnalyticDB for MySQL コンソールでリセットします。AnalyticDB for PostgreSQL では、[アカウント管理] に移動し、[パスワードのリセット] をクリックします。 | ******** |
データベースのユーザー名とパスワードは、ApsaraMQ for Kafka がエクスポートタスクを作成する際に環境変数として Function Compute に渡されます。ApsaraMQ for Kafka は、タスク作成後にこれらの認証情報を保存しません。
[作成] をクリックします。
[コネクタ] ページで、コネクタを見つけ、[アクション] 列の [デプロイ] をクリックします。
ステップ 3: Function Compute ネットワークの構成
デプロイ後、Function Compute はコネクタ用にサービス (名前: kafka-service-<connector_name>-<random_string>) と関数 (名前: fc-adb-<random_string>) を自動的に作成します。
[コネクタ] ページでコネクタを見つけ、[操作] 列の [関数の設定] をクリックします。これにより Function Compute コンソールが開きます。
Function Compute コンソールで、自動的に作成されたサービスを見つけ、ターゲットデータベースと一致するように VPC と vSwitch を構成します。詳細については、「サービスの更新」をご参照ください。
ステップ 4: AnalyticDB のホワイトリストへの VPC CIDR ブロックの追加
Function Compute で設定された VPC の CIDR ブロックを AnalyticDB のホワイトリストに追加します。CIDR ブロックは、[VPC コンソール] の [vSwitch] ページで確認できます — Function Compute サービスの VPC および vSwitch と一致する行に表示されます。
AnalyticDB for MySQL: AnalyticDB for MySQL コンソールでホワイトリストを構成します。詳細については、「IP アドレスホワイトリストの構成」をご参照ください。
AnalyticDB for PostgreSQL: AnalyticDB for PostgreSQL コンソールでホワイトリストを構成します。詳細については、「IP アドレスホワイトリストの構成」をご参照ください。
ステップ 5: データエクスポートの検証
テストメッセージの送信
メッセージコンテンツは有効な JSON である必要があります。各 JSON キーは宛先テーブルのカラム名にマップされ、各値はカラムデータにマップされます。テストメッセージを送信する前に、JSON キーが宛先テーブルのカラム名と一致することを確認してください。
「コネクタ」ページで、コネクタを見つけ、「操作」列の「テスト」をクリックします。
[メッセージの送信] パネルで、[送信方法] を選択します:
コンソール:
[メッセージキー] フィールドに、キー (例:
demo) を入力します。[メッセージ内容] フィールドに、JSON コンテンツ (例:
{"key": "test"}) を入力します。[指定パーティションへの送信] で、[はい] を選択して [パーティション ID] (例:
0) を入力して特定のパーティションをターゲットにするか、[いいえ] を選択して ApsaraMQ for Kafka にパーティションを割り当てさせます。 パーティション ID の詳細については、「パーティションステータスの表示」をご参照ください。
Docker:[Docker コンテナを実行してサンプルメッセージを生成する] セクションに表示されている Docker コマンドを実行します。
[SDK]: プログラミング言語用の SDK とテストメッセージを送信するためのアクセス方法を選択します。
結果の検証
メッセージ送信後、宛先テーブルをチェックしてデータがエクスポートされたことを確認します。
AnalyticDB for MySQL コンソールまたは AnalyticDB for PostgreSQL コンソールにログインします。
ターゲットデータベースに接続します。
「Data Management Service 5.0 コンソール」の [SQL コンソール] で、送信先テーブルを開き、エクスポートされたデータが存在することを確認してください。