このチュートリアルでは、SQL Server データベースから ApsaraMQ for Kafka へ行レベルの変更をストリームする変更データキャプチャ (CDC) パイプラインを構築する方法について説明します。このパイプラインは、Debezium SQL Serverソースコネクタと分散モードの Kafka Connect を使用します。
仕組み
Debezium SQL Serverソースコネクタは、CDC を介して SQL Server トランザクションログを読み取り、各挿入、更新、または削除を Kafkaメッセージに変換します。Kafka Connect は、コネクタを分散ワーカープロセスとして実行し、変更イベントを ApsaraMQ for Kafka の Topic にプッシュし、専用Topic にスキーマ履歴を追跡します。
データフロー:
SQL Server (CDC enabled) → Debezium source connector → Kafka Connect → ApsaraMQ for Kafka topics各監視対象テーブルは、<server-name>.<database>.<table> という名前の個別の Topic にマップされます。例えば、server1.testDB.products のようになります。
前提条件
開始する前に、次のコンポーネントを準備してください。
Debezium SQL Serverソースコネクタ -- Maven リポジトリからダウンロードします。ご利用の Kafka Connect のバージョンと互換性のあるバージョンを選択してください。
Kafka Connect 2.1.0 以降 -- Apache Kafka ダウンロードからダウンロードします。
説明 Debezium SQL Serverソースコネクタには Kafka Connect 2.1.0 以降が必要です。以前のバージョンはサポートされていません。Docker -- Docker Desktopからダウンロードします。
ステップ 1: Kafka Connect の構成
Debezium SQL Serverソースコネクタパッケージをローカルディレクトリに抽出します。
Kafka Connect 構成ファイル
connect-distributed.propertiesを開き、plugin.pathを抽出したコネクタを含むディレクトリに設定します。重要以前のバージョンの Kafka Connect では、
plugin.pathプロパティはサポートされていません。代わりにCLASSPATH環境変数を設定してください。export CLASSPATH=/kafka/connect/plugins/sqlserver-connector/*# 抽出されたコネクタ JAR を含むディレクトリへのパス plugin.path=/kafka/connect/plugins
ステップ 2: Kafka Connect の開始
インターネットアクセスのみ -- インターネット経由で ApsaraMQ for Kafka にアクセスする場合は、まず JAAS認証ファイルを構成してください。仮想プライベートクラウド (VPC) アクセスの場合は、このステップをスキップしてください。
export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"分散モードで Kafka Connect を開始します。
bin/connect-distributed.sh config/connect-distributed.properties
ステップ 3: Docker を使用した SQL Server のセットアップ
CDC には SQL Server 2016 SP1 以降が必要です。詳細については、「SQL Server 2016 SP1 リリースノート」および「変更データキャプチャについて」をご参照ください。
docker-compose-sqlserver.yaml をダウンロードします。
SQL Server コンテナを開始します。
docker-compose -f docker-compose-sqlserver.yaml upinventory.sql をダウンロードし、テストデータをデータベースにロードします。
cat inventory.sql | docker exec -i tutorial_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
ステップ 4: 既存テーブルの CDC の有効化 (オプション)
データベースに既に存在するテーブルから変更をキャプチャする場合は、データベースレベルとテーブルレベルで CDC を有効にしてください。
データベースレベルでの CDC の有効化
USE testDB
GO
EXEC sys.sp_cdc_enable_db
GO特定のテーブルの CDC の有効化
USE testDB
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'MyTable',
@role_name = N'MyRole',
@filegroup_name = N'MyDB_CT',
@supports_net_changes = 1
GOパラメーターは次のとおりです。
| パラメーター | 説明 |
|---|---|
@source_schema | ソーステーブルのスキーマ (例: dbo |
@source_name | 監視対象テーブルの名前 |
@role_name | 変更データへのアクセスを制御するデータベースロール |
@filegroup_name | 変更テーブルを保存するために使用されるファイルグループ |
@supports_net_changes | ネット変更クエリを有効にするには 1 に設定します。 |
CDC ステータスの確認
次のコマンドを実行して、CDC がアクティブであり、ご利用のアカウントが必要な権限を持っていることを確認してください。
EXEC sys.sp_cdc_help_change_data_capture
GO結果が空の場合、ご利用のアカウントは CDC が有効なテーブルにアクセスできません。ロール割り当てを確認してください。
SQL Server Agent が実行中であることの確認
CDC は SQL Server Agent に依存します。次のコマンドを実行して、そのステータスを確認してください。
EXEC master.dbo.xp_servicecontrol N'QUERYSTATE',N'SQLSERVERAGENT'出力が Running を示している場合、SQL Server Agent はアクティブです。
ステップ 5: ソースコネクタの構成と開始
ApsaraMQ for Kafka での Topic の作成
コネクタを開始する前に、ApsaraMQ for Kafka コンソールで必要な Topic を作成してください。コネクタは、<server-name>.<database>.<table> という名前の Topic に変更イベントを書き込みます。
このチュートリアルのテストデータベース (testDB に 4 つのテーブル) の場合、次の Topic を作成してください。
| Topic | 目的 |
|---|---|
server1 | コネクタのサーバーレベル Topic |
server1.testDB.customers | customers テーブルからの変更イベント |
server1.testDB.orders | orders テーブルからの変更イベント |
server1.testDB.products | products テーブルからの変更イベント |
server1.testDB.products_on_hand | products_on_hand テーブルからの変更イベント |
schema-changes-inventory | コネクタのスキーマ変更履歴 |
Topic の作成方法については、「Topic の作成」をご参照ください。または、CreateTopic API を呼び出します。
コネクタの構成
register-sqlserver.json をダウンロードします。
register-sqlserver.jsonを開き、アクセス方法に基づいて次のプロパティを更新します。VPC アクセス インターネットアクセス 主要なプロパティは次のとおりです。インターネットアクセスの場合、追加の SSL および SASL プロパティにより、ApsaraMQ for Kafka への暗号化された認証済み接続が可能になります。プロパティ 説明 database.history.kafka.bootstrap.serversご利用の ApsaraMQ for Kafka インスタンスのエンドポイント。VPC アクセスにはデフォルトエンドポイントを、インターネットアクセスには SSL エンドポイントを使用します。エンドポイントは ApsaraMQ for Kafka コンソールで確認できます。 database.server.nameすべての変更イベント Topic のプレフィックスとして使用される論理サーバー名。 server1に設定すると、server1.testDB.productsのような Topic が生成されます。database.history.kafka.topicコネクタがスキーマ変更履歴を保存する Topic。コネクタを開始する前に、この Topic をコンソールで作成してください。 "database.history.kafka.bootstrap.servers" : "<your-default-endpoint>", "database.server.name": "server1", "database.history.kafka.topic": "schema-changes-inventory""database.history.kafka.bootstrap.servers" : "<your-ssl-endpoint>", "database.server.name": "server1", "database.history.kafka.topic": "schema-changes-inventory", "database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks", "database.history.producer.ssl.truststore.password": "KafkaOnsClient", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "PLAIN", "database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks", "database.history.consumer.ssl.truststore.password": "KafkaOnsClient", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "PLAIN"
コネクタの開始
POST リクエストを送信して、Kafka Connect にコネクタを登録します。
curl -i -X POST \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
http://localhost:8083/connectors/ \
-d @register-sqlserver.json結果の確認
監視対象の SQL Server テーブルに行を挿入、更新、または削除します。
ApsaraMQ for Kafka コンソールで、[メッセージクエリ] ページに移動し、対応する Topic をクエリします。変更イベントがメッセージとして表示された場合、パイプラインは正常に動作しています。詳細については、「メッセージのクエリ」をご参照ください。
次のステップ
より多くのテーブルを監視するには、各テーブルの CDC を有効にし、ApsaraMQ for Kafka で対応する Topic を作成してください。
コネクタ構成オプションの完全なリストについては、「Debezium SQL Serverコネクタのドキュメント」をご参照ください。