MySQL とApsaraMQ for Kafka 間でデータを同期するための JDBC コネクターの作成方法を説明します。
前提条件
-
EventBridge が有効化され、必要な許可が付与されていること。詳細については、「EventBridge の有効化と許可の付与」をご参照ください。
-
Object Storage Service (OSS) が有効化され、バケットが作成されていること。詳細については、「バケットの作成」をご参照ください。
-
Serverless App Engine (SAE) が有効化されていること。詳細については、「準備」をご参照ください。
-
VPC と vSwitch が作成されていること。詳細については、「IPv4 ベースの VPC の作成」をご参照ください。
-
ApsaraMQ for Kafka インスタンスを購入し、デプロイ済みであること。詳細については、「インスタンスの購入とデプロイ」をご参照ください。
ステップ1:テーブルの作成
-
ApsaraDB RDS コンソールにログインし、ApsaraDB RDS for MySQL インスタンスを作成します。詳細については、「ApsaraDB RDS for MySQL インスタンスの作成」をご参照ください。
インスタンスを作成する際、前提条件で指定された VPC と同じものを選択し、VPC の CIDR ブロックをホワイトリストに追加します。インスタンス設定ステップでは、ネットワークタイプを VPC に設定します。対応する VPC と プライマリ vSwitch を選択します。ホワイトリストに追加をはいに設定し、同じ VPC 内の ECS インスタンスが RDS インスタンスにアクセスできるようにします。
-
インスタンスの作成後、「インスタンス」ページでターゲットインスタンスをクリックします。次に、インスタンス詳細ページのナビゲーションペインで、以下の操作を実行します。
-
アカウントの作成をクリックするか、既存のアカウントを使用します。詳細については、「アカウントとデータベースの作成」をご参照ください。
-
データベースの作成をクリックするか、既存のデータベースを使用します。詳細については、「アカウントとデータベースの作成」をご参照ください。
-
データベース接続をクリックし、内部エンドポイントとポート番号を記録します。
-
-
インスタンス詳細ページで、データベースへのログインをクリックして Data Management Service (DMS) プラットフォームに移動します。ターゲットデータベースをクリックし、SQL ステートメントを使用してテーブルを作成します。たとえば、id 列と number 列を持つテーブルを作成するには、次のコマンドを実行します。詳細については、「SQL コマンド」をご参照ください。
CREATE TABLE sql_table(id INT ,number INT);重要テーブルを作成する際、列をプライマリキーとして設定し、自動インクリメントするように設定します。詳細については、「テーブルスキーマのクエリと変更」をご参照ください。
ステップ 2:コネクタの作成
ソースコネクタ
-
JDBC コネクタファイルをダウンロードし、作成した OSS バケットにアップロードします。詳細については、「ファイルのアップロード」をご参照ください。
重要JDBC コネクタファイルをダウンロードする際は、Java 8 と互換性のあるバージョンを選択してください。
ApsaraMQ for Kafka コンソール にログオンします。リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
-
左側のナビゲーションペインで、を選択します。
-
タスクリスト ページで、タスクリストの作成 をクリックします。
-
タスクの作成 パネルで、Task Name を設定し、次のパラメーターを設定します。
-
タスクの作成
-
Source (ソース) ステップで、データプロバイダー を Apache Kafka Connect に設定し、次へ をクリックします。
-
コネクタの設定 ステップでは、以下のパラメーターを設定し、次へ をクリックします。
サブセクション
パラメーター
説明
Kafka Connect プラグイン
バケット
JDBC コネクタファイルをアップロードした OSS バケットを選択します。
ファイル
アップロードした .zip ファイルを選択します。
ApsaraMQ for Kafka リソース
Kafka パラメーター設定
ソースコネクタを選択します。
ApsaraMQ for Kafka インスタンス
「前提条件」セクションで作成したインスタンスを選択します。
VPC
VPC ID を選択します。
vSwitch
vSwitch ID を選択します。
セキュリティグループ
セキュリティグループを選択します。
Kafka Connect 設定
ZIP パッケージ内の .properties ファイルを解析
properties ファイルの作成 を選択し、.zip パッケージからソースコネクタの .properties ファイルを選択します。 パスは
/etc/source-xxx.propertiesです。すべてのコネクタパラメータについては、「JDBC ソースコネクタの設定プロパティ」をご参照ください。
-
インスタンス設定 ステップで、次のパラメーターを設定し、次へ をクリックします。
サブセクション
パラメーター
説明
ワーカー仕様
[ Worker 仕様]
ワーカーの仕様を選択します。
[Worker の最小数]
値を 1 に設定します。
[Worker の最大数]
値を 1 に設定します。
Kafka Connect ワーカー設定
Apache Kafka コネクタワーカーの依存関係を自動的に作成
推奨。このオプションを選択すると、選択した ApsaraMQ for Kafka インスタンスで Kafka Connect を実行するために必要な内部トピックとコンシューマーグループが自動的に作成されます。以下の必須設定の値も自動的に入力されます:
-
オフセットトピック:ソースデータのオフセットを格納します。命名規則:
connect-eb-offset-<task_name>。 -
設定トピック:コネクタとタスクの設定情報を格納します。命名規則:
connect-eb-config-<task_name>。 -
ステータストピック:コネクタとタスクのステータス情報を格納します。命名規則:
connect-eb-status-<task_name>。 -
Kafka Connect コンシューマーグループ:Kafka Connect ワーカーが内部トピックからメッセージを消費するために使用するコンシューマーグループ。命名規則:
connect-eb-cluster-<task_name>。 -
Kafka ソースコネクタコンシューマーグループ:シンクコネクタがトピックからデータを消費するために使用します。このパラメーターはソースコネクタには適用されません。命名規則:
connector-eb-cluster-<task_name>-<connector_name>。
-
-
実行設定 ステップで、Log Shipping を Log Service への送信 または Kafka への送信 に設定します。ロールの付与 カードで、ロール設定 を設定し、その後 保存 をクリックします。
重要AliyunSAEFullAccess 権限ポリシーがアタッチされたロールを選択することを推奨します。そうしないと、タスクの実行に失敗する可能性があります。
-
-
タスクのプロパティ
このタスクの再試行ポリシーとデッドレターキューを設定します。詳細については、「再試行とデッドレターキュー」をご参照ください。
タスクステータスが実行中に変わるまで待ちます。これは、コネクタが正常に実行されていることを示します。
-
シンクコネクター
-
JDBC コネクターファイルをダウンロードし、作成した OSS バケットにアップロードします。 詳細については、「ファイルのアップロード」をご参照ください。
重要JDBC コネクターファイルをダウンロードする際は、Java 8 と互換性のあるバージョンを選択してください。
ApsaraMQ for Kafka コンソール にログオンします。リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
-
左側のナビゲーションペインで、 を選択します。
-
タスクリスト ページで、タスクリストの作成 をクリックします。
-
タスクの作成 パネルで、Task Name を設定し、以下のパラメーターを設定します。
-
タスクの作成
-
Source (ソース) ステップで、データプロバイダー を [Apache Kafka Connect] に設定し、次へ をクリックします。
-
コネクタの設定 ステップで、以下のパラメーターを設定し、次へ をクリックします。
パラメーター
パラメーター
説明
Kafka Connect プラグイン
バケット
JDBC コネクターファイルをアップロードした OSS バケットを選択します。
ファイル
アップロードした JDBC コネクターファイルを選択します。
ApsaraMQ for Kafka のリソース
ApsaraMQ for Kafka パラメーター
シンクコネクターを選択します。
ApsaraMQ for Kafka インスタンス
「前提条件」で作成したインスタンスを選択します。
VPC
VPC ID を選択します。
vSwitch
vSwitch ID を選択します。
セキュリティグループ
セキュリティグループを選択します。
Kafka Connect 設定
ZIP パッケージ内の .properties ファイルを解析
properties ファイルの作成 を選択します。.zip パッケージからシンクコネクターの .properties ファイルを選択します。ファイルは /etc/sink-xxx.properties です。
コネクター設定プロパティの完全なリストについては、「JDBC シンクコネクターの設定プロパティ」をご参照ください。
-
インスタンス設定 ステップで、以下のパラメーターを設定し、次へ をクリックします。
パラメーター
パラメーター
説明
ワーカー仕様
[ Worker 仕様]
適切なワーカー仕様を選択します。
[Worker の最小数]
このパラメーターを 1 に設定します。
[Worker の最大数]
値を 1 に設定します。この値は tasks.max の値を超えることはできません。
Kafka Connect ワーカー設定
Kafka Connect ワーカーの依存関係を自動的に作成
このオプションを選択することを推奨します。これにより、選択した ApsaraMQ for Kafka インスタンスで Kafka Connect を実行するために必要な内部トピックとコンシューマーグループが自動的に作成されます。また、次の項目を含む必須構成も自動的に入力されます:
-
オフセットトピック:ソースデータのオフセットを格納します。命名規則:
connect-eb-offset-<task_name>。 -
設定トピック:コネクターとタスクの設定情報を格納します。命名規則:
connect-eb-config-<task_name>。 -
ステータストピック:コネクターとタスクのステータス情報を格納します。命名規則:
connect-eb-status-<task_name>。 -
Kafka Connect コンシューマーグループ:Kafka Connect ワーカーが内部トピックからメッセージを消費するために使用するコンシューマーグループ。命名規則:
connect-eb-cluster-<task_name>。 -
シンクコネクターにのみ適用され、ソースのトピックからデータを消費します。命名規則:
connector-eb-cluster-<task_name>-<connector_name>。
-
-
実行設定 セクションで、Log Shipping を Log Service への送信 または Kafka への送信 に設定します。ロールの付与 カードで、必要な ロール設定 を設定し、保存 をクリックします。
重要AliyunSAEFullAccess 権限ポリシーがアタッチされたロールを選択することを推奨します。そうしないと、タスクが失敗する可能性があります。
-
-
タスクのプロパティ
このタスクの再試行ポリシーとデッドレターキューを設定します。詳細については、「再試行とデッドレターキュー」をご参照ください。
タスクのステータスが 実行中 に変わるまで待ちます。実行中 ステータスは、コネクターが正しく動作していることを示します。
-
ステップ3:コネクターのテスト
ソースコネクター
-
Data Management Service (DMS) コンソールで、「ステップ1:テーブルの作成」で作成したテーブルにレコードを挿入します。たとえば、id が 12、number が 20 のレコードを挿入します。コマンドは次のとおりです。
INSERT INTO sql_table(id, number) VALUES(12,20); -
ApsaraMQ for Kafka コンソールにログインします。[インスタンス] ページで、ターゲットインスタンスをクリックします。
-
インスタンス詳細ページで、ターゲットトピック、次に メッセージ検索 タブの順にクリックし、挿入されたメッセージを表示します。メッセージ値のサンプルは次のとおりです。
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":true,"field":"number"}],"optional":false,"name":"sql_table"},"payload":{"id":12,"number":20}}
シンクコネクター
-
ApsaraMQ for Kafka コンソールにログインします。[インスタンス] ページで、ターゲットインスタンスをクリックします。
-
左側のナビゲーションペインで [トピック] をクリックし、次にターゲットトピックをクリックします。
-
[トピック詳細] ページの右上隅で、[メッセージの送信] をクリックします。
-
[メッセージの送信と消費を開始] パネルで、メッセージの内容を設定します。たとえば、id が 13、number が 14 のレコードを宛先テーブルに追加する場合、メッセージの内容は次のとおりです。
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":true,"field":"number"}],"optional":false,"name":"sql_table"},"payload":{"id":13,"number":14}} -
Data Management Service (DMS) コンソールで、データが宛先テーブルに存在することを確認します。
宛先テーブル
sql_tableには、id=13, number=14のレコードとid=12, number=20のレコードが 2 つ存在します。これにより、シンクコネクターがデータを正しく書き込んでいることが確認できます。
一般的なエラー
シナリオ1:すべてのタスクの実行失敗
エラーメッセージ:
All tasks under connector mongo-source failed, please check the error trace of the task.
解決策:イベントストリーミングタスクの詳細ページで、基本情報 セクションの 診断 をクリックします。コネクターのモニタリングページにリダイレクトされ、そこで失敗したタスクに関する詳細なエラー情報を表示できます。
シナリオ2:Kafka Connect の終了
エラーメッセージ:
Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.
解決策:ステータスの更新には時間がかかる場合があります。ページを更新してみてください。ステータスが引き続き [失敗] の場合は、次の手順に従ってエラー情報を表示してください。
-
イベントストリーミングタスクの詳細ページの Worker 情報 セクションで、SAE アプリケーション の横にあるインスタンス名をクリックして、SAE アプリケーションの詳細ページを開きます。
-
Basic Information ページで、[インスタンスのデプロイメント情報] タブをクリックします。
-
インスタンスの Actions 列で [Webshell] をクリックし、Kafka Connect のランタイム環境にログインします。
-
vi /home/admin/connector-bootstrap.logコマンドを実行してコネクタの起動ログを表示し、エラーメッセージを確認します。 -
vi /opt/kafka/logs/connect.logコマンドを実行して、コネクタのランタイムログを表示します。 エラー情報を見つけるには、ERROR または WARN を検索します。
-
エラーメッセージに基づいて問題を解決した後、タスクを再起動できます。
シナリオ3:コネクターパラメーター検証の失敗
エラーメッセージ:
Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`
解決策:エラーメッセージを使用して無効なパラメーターを特定し、更新します。エラーメッセージに基づいて無効なパラメーターが見つからない場合は、シナリオ2の説明に従って Kafka Connect のランタイム環境にログインしてください。次に、次のコマンドを実行してパラメーターを検証します。コマンド内の {connectorType} を、実際のコネクタータイプに置き換えてください。
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/{connectorType}/config/validate
このコマンドは、各コネクターパラメーターの検証結果を返します。パラメーターの検証が失敗した場合、errors 属性は空ではありません。
"value":{
"name":"snapshot.mode",
"value":null,
"recommended_values":[
"never",
"initial_only",
"when_needed",
"initial",
"schema_only",
"schema_only_recovery"
],
"errors":[
"Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
],
"visible":true
}