すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:SQL Server から Kafka Connect を使用した ApsaraMQ for Kafka へのデータ同期

最終更新日:Mar 11, 2026

このチュートリアルでは、SQL Server データベースから ApsaraMQ for Kafka へ行レベルの変更をストリームする変更データキャプチャ (CDC) パイプラインを構築する方法について説明します。このパイプラインは、Debezium SQL Serverソースコネクタと分散モードの Kafka Connect を使用します。

仕組み

Debezium SQL Serverソースコネクタは、CDC を介して SQL Server トランザクションログを読み取り、各挿入、更新、または削除を Kafkaメッセージに変換します。Kafka Connect は、コネクタを分散ワーカープロセスとして実行し、変更イベントを ApsaraMQ for Kafka の Topic にプッシュし、専用Topic にスキーマ履歴を追跡します。

データフロー:

SQL Server (CDC enabled) → Debezium source connector → Kafka Connect → ApsaraMQ for Kafka topics

各監視対象テーブルは、<server-name>.<database>.<table> という名前の個別の Topic にマップされます。例えば、server1.testDB.products のようになります。

前提条件

開始する前に、次のコンポーネントを準備してください。

  • Debezium SQL Serverソースコネクタ -- Maven リポジトリからダウンロードします。ご利用の Kafka Connect のバージョンと互換性のあるバージョンを選択してください。

  • Kafka Connect 2.1.0 以降 -- Apache Kafka ダウンロードからダウンロードします。

    説明 Debezium SQL Serverソースコネクタには Kafka Connect 2.1.0 以降が必要です。以前のバージョンはサポートされていません。
  • Docker -- Docker Desktopからダウンロードします。

ステップ 1: Kafka Connect の構成

  1. Debezium SQL Serverソースコネクタパッケージをローカルディレクトリに抽出します。

  2. Kafka Connect 構成ファイル connect-distributed.properties を開き、plugin.path を抽出したコネクタを含むディレクトリに設定します。

    重要

    以前のバージョンの Kafka Connect では、plugin.path プロパティはサポートされていません。代わりに CLASSPATH 環境変数を設定してください。

    export CLASSPATH=/kafka/connect/plugins/sqlserver-connector/*
    # 抽出されたコネクタ JAR を含むディレクトリへのパス
    plugin.path=/kafka/connect/plugins

ステップ 2: Kafka Connect の開始

  1. インターネットアクセスのみ -- インターネット経由で ApsaraMQ for Kafka にアクセスする場合は、まず JAAS認証ファイルを構成してください。仮想プライベートクラウド (VPC) アクセスの場合は、このステップをスキップしてください。

    export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"
  2. 分散モードで Kafka Connect を開始します。

    bin/connect-distributed.sh config/connect-distributed.properties

ステップ 3: Docker を使用した SQL Server のセットアップ

重要

CDC には SQL Server 2016 SP1 以降が必要です。詳細については、「SQL Server 2016 SP1 リリースノート」および「変更データキャプチャについて」をご参照ください。

  1. docker-compose-sqlserver.yaml をダウンロードします。

  2. SQL Server コンテナを開始します。

    docker-compose -f docker-compose-sqlserver.yaml up
  3. inventory.sql をダウンロードし、テストデータをデータベースにロードします。

    cat inventory.sql | docker exec -i tutorial_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'

ステップ 4: 既存テーブルの CDC の有効化 (オプション)

データベースに既に存在するテーブルから変更をキャプチャする場合は、データベースレベルとテーブルレベルで CDC を有効にしてください。

データベースレベルでの CDC の有効化

USE testDB
GO
EXEC sys.sp_cdc_enable_db
GO

特定のテーブルの CDC の有効化

USE testDB
GO

EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name   = N'MyTable',
@role_name     = N'MyRole',
@filegroup_name = N'MyDB_CT',
@supports_net_changes = 1
GO

パラメーターは次のとおりです。

パラメーター説明
@source_schemaソーステーブルのスキーマ (例: dbo
@source_name監視対象テーブルの名前
@role_name変更データへのアクセスを制御するデータベースロール
@filegroup_name変更テーブルを保存するために使用されるファイルグループ
@supports_net_changesネット変更クエリを有効にするには 1 に設定します。

CDC ステータスの確認

次のコマンドを実行して、CDC がアクティブであり、ご利用のアカウントが必要な権限を持っていることを確認してください。

EXEC sys.sp_cdc_help_change_data_capture
GO

結果が空の場合、ご利用のアカウントは CDC が有効なテーブルにアクセスできません。ロール割り当てを確認してください。

SQL Server Agent が実行中であることの確認

CDC は SQL Server Agent に依存します。次のコマンドを実行して、そのステータスを確認してください。

EXEC master.dbo.xp_servicecontrol N'QUERYSTATE',N'SQLSERVERAGENT'

出力が Running を示している場合、SQL Server Agent はアクティブです。

ステップ 5: ソースコネクタの構成と開始

ApsaraMQ for Kafka での Topic の作成

コネクタを開始する前に、ApsaraMQ for Kafka コンソールで必要な Topic を作成してください。コネクタは、<server-name>.<database>.<table> という名前の Topic に変更イベントを書き込みます。

このチュートリアルのテストデータベース (testDB に 4 つのテーブル) の場合、次の Topic を作成してください。

Topic目的
server1コネクタのサーバーレベル Topic
server1.testDB.customerscustomers テーブルからの変更イベント
server1.testDB.ordersorders テーブルからの変更イベント
server1.testDB.productsproducts テーブルからの変更イベント
server1.testDB.products_on_handproducts_on_hand テーブルからの変更イベント
schema-changes-inventoryコネクタのスキーマ変更履歴

Topic の作成方法については、「Topic の作成」をご参照ください。または、CreateTopic API を呼び出します。

コネクタの構成

  1. register-sqlserver.json をダウンロードします。

  2. register-sqlserver.json を開き、アクセス方法に基づいて次のプロパティを更新します。VPC アクセス インターネットアクセス 主要なプロパティは次のとおりです。インターネットアクセスの場合、追加の SSL および SASL プロパティにより、ApsaraMQ for Kafka への暗号化された認証済み接続が可能になります。

    プロパティ説明
    database.history.kafka.bootstrap.serversご利用の ApsaraMQ for Kafka インスタンスのエンドポイント。VPC アクセスにはデフォルトエンドポイントを、インターネットアクセスには SSL エンドポイントを使用します。エンドポイントは ApsaraMQ for Kafka コンソールで確認できます。
    database.server.nameすべての変更イベント Topic のプレフィックスとして使用される論理サーバー名。server1 に設定すると、server1.testDB.products のような Topic が生成されます。
    database.history.kafka.topicコネクタがスキーマ変更履歴を保存する Topic。コネクタを開始する前に、この Topic をコンソールで作成してください。
    "database.history.kafka.bootstrap.servers" : "<your-default-endpoint>",
    "database.server.name": "server1",
    "database.history.kafka.topic": "schema-changes-inventory"
    "database.history.kafka.bootstrap.servers" : "<your-ssl-endpoint>",
    "database.server.name": "server1",
    "database.history.kafka.topic": "schema-changes-inventory",
    "database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks",
    "database.history.producer.ssl.truststore.password": "KafkaOnsClient",
    "database.history.producer.security.protocol": "SASL_SSL",
    "database.history.producer.sasl.mechanism": "PLAIN",
    "database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks",
    "database.history.consumer.ssl.truststore.password": "KafkaOnsClient",
    "database.history.consumer.security.protocol": "SASL_SSL",
    "database.history.consumer.sasl.mechanism": "PLAIN"

コネクタの開始

POST リクエストを送信して、Kafka Connect にコネクタを登録します。

curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ \
  -d @register-sqlserver.json

結果の確認

  1. 監視対象の SQL Server テーブルに行を挿入、更新、または削除します。

  2. ApsaraMQ for Kafka コンソールで、[メッセージクエリ] ページに移動し、対応する Topic をクエリします。変更イベントがメッセージとして表示された場合、パイプラインは正常に動作しています。詳細については、「メッセージのクエリ」をご参照ください。

次のステップ

  • より多くのテーブルを監視するには、各テーブルの CDC を有効にし、ApsaraMQ for Kafka で対応する Topic を作成してください。

  • コネクタ構成オプションの完全なリストについては、「Debezium SQL Serverコネクタのドキュメント」をご参照ください。