Data Transmission Service (DTS) を使用して、PolarDB-X 1.0 インスタンスの変更データをリアルタイムで ApsaraMQ for Kafka のトピックにストリーミングします。DTS は同期処理を分散型のサブタスクとして実行し、各アタッチされた ApsaraDB RDS for MySQL インスタンスに対して 1 つのサブタスクを割り当てます。これにより、アプリケーションの変更を加えることなく、CDC イベントを Kafka コンシューマーにファンアウトできます。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
PolarDB-X 1.0 インスタンスが必要です。詳細については、「PolarDB-X 1.0 インスタンスの作成」をご参照ください。
同期データを受信するための宛先 ApsaraMQ for Kafka インスタンス内のトピックが必要です。詳細については、「クイックスタート概要」をご参照ください。
ApsaraMQ for Kafka インスタンスに、PolarDB-X 1.0 からの完全なデータセットを格納できる十分な空きストレージ容量がある必要があります。
同期対象オブジェクトに対する読み取り権限が、データベースアカウントに付与されている必要があります。詳細については、「アカウントの管理」をご参照ください。
制限事項
ソースデータベースの要件
テーブルには主キー制約 (PRIMARY KEY constraint) または一意制約 (UNIQUE constraint) が設定されている必要があり、すべてのフィールドが一意である必要があります。これを満たさない場合、ターゲットデータベースに重複したデータレコードが生成される可能性があります。一意制約のみを持つテーブルでは、スキーマ同期はサポートされません。主キー制約を持つテーブルの同期を推奨します。セカンダリインデックスを持つテーブルは同期できません。
同期対象としてテーブル(データベース全体ではなく)を選択し、宛先でテーブル名またはカラム名を変更する必要がある場合、1 つのタスクで最大 5,000 個のテーブルをサポートします。それ以上のテーブル数が必要な場合は、複数のタスクに分割するか、データベース全体を同期してください。
PolarDB-X 1.0 にアタッチされた ApsaraDB RDS for MySQL インスタンスは、以下のバイナリログ要件を満たす必要があります:PolarDB-X 1.0
バイナリロギングが有効化されており、
binlog_row_imageがfullに設定されている必要があります。該当しない場合、事前チェックに失敗し、タスクを開始できません。これらの設定を確認するには、ソースデータベースで次の SQL を実行します。sql -- バイナリロギングのステータスを確認 SHOW VARIABLES LIKE 'log_bin'; -- binlog_row_image の値を確認(FULL である必要があります) SHOW VARIABLES LIKE 'binlog_row_image';いずれかの設定が不適切な場合は、各アタッチされた RDS インスタンスの MySQL 構成ファイル (
my.cnf) を更新します。log_bin = ON binlog_format = ROW binlog_row_image = FULL増分同期のみの場合:バイナリログは少なくとも 24 時間保持する必要があります。
完全同期+増分同期の場合:バイナリログは少なくとも 7 日間保持する必要があります。DTS がバイナリログを読み取れない場合、タスクは失敗し、データ損失が発生する可能性があります。完全データ同期が完了した後は、保持期間を 24 時間以上に設定できます。前述の要件に従ってバイナリログの保持期間を設定してください。そうしないと、Data Transmission Service (DTS) のサービスレベル合意 (SLA) で定められたサービスの信頼性およびパフォーマンスを保証できません。
運用上の制限
同期中に以下の操作を実行しないでください。これらの操作はタスクの失敗またはデータの不整合を引き起こす可能性があります。
アタッチされた ApsaraDB RDS for MySQL インスタンスのスペックアップ/スペックダウン
ApsaraDB RDS for MySQL インスタンス内の論理データベースおよび論理テーブルに対応する物理データベースおよび物理テーブルのディストリビューションの変更
シャードキーの変更
同期対象オブジェクトに対する DDL 操作
gh-ost または pt-online-schema-change を使用した DDL 操作
DTS は同期中、セッションレベルで外部キー制約チェックおよびカスケード操作を無効化します。ソースでカスケード更新またはカスケード削除を実行した場合、データの不整合が発生する可能性があります。
同期中に PolarDB-X 1.0 インスタンスのネットワークタイプが変更された場合、DTS タスクのネットワーク接続設定を更新してください。PolarDB-X 1.0
完全同期のみの場合:タスク実行中にソースデータベースへの書き込みを行わないでください。整合性を確保するため、スキーマ同期、完全データ同期、増分データ同期を同時に実行してください。
宛先へのデータ書き込みは DTS を通じてのみ行ってください。他のツールを使用して書き込んだ場合、DMS オンライン DDL 操作を実行するとデータ損失が発生する可能性があります。
PolarDB-X 1.0 同期は分散型同期として実行されます。各アタッチされた ApsaraDB RDS for MySQL インスタンスは、1 つの DTS サブタスクに対応します。タスクのトポロジでサブタスクのステータスを監視してください。
宛先の ApsaraMQ for Kafka インスタンスがスペックアップ/スペックダウンされた場合、同期を再開するためにインスタンスを再起動してください。
可能であれば、ピーク時を避けて同期を実行してください。初期完全データ同期では、ソースおよび宛先の両方で読み取りおよび書き込み負荷が増加します。
初期完全データ同期が完了した後、同時 INSERT 操作による断片化の影響で、宛先の表領域がソースよりも大きくなる場合があります。
サポートされる同期トポロジ
単方向・一対一
単方向・一対多
単方向・カスケード
単方向・多対一
詳細については、「同期トポロジ」をご参照ください。
同期可能な SQL 操作
| 操作タイプ | SQL ステートメント |
|---|---|
| DML | INSERT、UPDATE、DELETE |
同期タスクの作成
DTS コンソールの「データ同期」ページに移動します。
説明または、Data Management (DMS) コンソールにログインし、上部ナビゲーションバーの Data + AI 上にポインターを合わせ、DTS (DTS) > データ同期 を選択します。
左上隅で、同期インスタンスを配置するリージョンを選択します。
[タスクの作成] をクリックし、ソースおよび宛先データベースを構成します。
タスク設定
パラメーター 説明 タスク名 説明的な名前を入力します。DTS が自動的に名前を生成します。名前は一意である必要はありません。 ソースデータベース
パラメーター 説明 DMS データベースインスタンスの選択 任意。既存のインスタンスを選択すると、DTS が残りのフィールドを自動的に設定します。 データベースタイプ PolarDB-X 1.0 を選択します。 アクセス方法 Alibaba Cloud インスタンス を選択します。 インスタンスリージョン ソース PolarDB-X 1.0 インスタンスのリージョンです。 Alibaba Cloudアカウント全体でのデータの複製 同一アカウント内での同期の場合は いいえ を選択します。 インスタンス ID ソース PolarDB-X 1.0 インスタンスの ID です。 データベースアカウント 同期対象オブジェクトに対する読み取り権限を持つデータベースアカウントです。 データベースパスワード データベースアカウントのパスワードです。 宛先データベース
パラメーター 説明 DMS データベースインスタンスの選択 任意。既存のインスタンスを選択すると、DTS が残りのフィールドを自動的に設定します。 データベースタイプ Kafka を選択します。 アクセス方法 Express Connect、VPN Gateway、または Smart Access Gateway を選択します。Alibaba Cloud インスタンス はサポートされていません。 インスタンスリージョン 宛先 ApsaraMQ for Kafka インスタンスのリージョンです。 接続済み VPC ApsaraMQ for Kafka インスタンスの仮想プライベートクラウド (VPC) ID です。VPC ID を確認するには、ApsaraMQ for Kafka コンソールのインスタンス詳細ページに移動し、構成情報 を インスタンス情報 タブで確認します。 ドメイン名または IP アドレス ApsaraMQ for Kafka インスタンスの IP アドレス。IP を確認するには、ApsaraMQ for Kafka コンソールでインスタンスの詳細ページに移動し、[インスタンス情報] タブの エンドポイント情報 の下にある [デフォルトエンドポイント] フィールドから IP をコピーします。 ポート番号 ApsaraMQ for Kafka インスタンスのサービスポートです。デフォルト値: 9092。データベースアカウント ApsaraMQ for Kafka インスタンスのアカウントです。アクセス制御リスト (ACL) 認証が有効な場合にのみ必要です。詳細については、「SASL ユーザーへの権限付与」をご参照ください。 データベースパスワード Kafka アカウントのパスワードです。ACL 認証が有効な場合にのみ必要です。 Kafka バージョン 宛先 ApsaraMQ for Kafka インスタンスのバージョンです。 暗号化 ビジネスおよびセキュリティ要件に応じて、暗号化なし または SCRAM-SHA-256 を選択します。 トピック 同期データを受信するトピックです。ドロップダウンリストから選択します。 DDL 情報を格納するトピック DDL 情報を格納するトピックです。空白のままにした場合、DDL 情報は トピック で指定されたトピックに格納されます。 Kafka Schema Registry の使用 Avro スキーマの保存に Kafka Schema Registry を使用するかどうかを指定します。はい を選択して Schema Registry の URL を指定すると有効化され、いいえ を選択すると無効化されます。 「[接続性のテストと続行]」をクリックします。DTS は、自動的にそのサーバーの CIDR ブロックを Alibaba Cloud データベースインスタンスのホワイトリスト、または Elastic Compute Service (ECS) インスタンスのセキュリティグループルールに追加します。データセンター内またはサードパーティプロバイダーがホストする自己管理データベースの場合は、DTS サーバーの CIDR ブロックをデータベースのホワイトリストに手動で追加します。詳細については、「DTS サーバーの CIDR ブロックをオンプレミスデータベースのセキュリティ設定に追加する」をご参照ください。
警告DTS サーバーの CIDR ブロックをホワイトリストまたはセキュリティグループに追加すると、セキュリティリスクが発生する可能性があります。予防措置として、認証情報を強化し、公開ポートを制限し、API 呼び出しを認証し、ホワイトリストルールを定期的に監査し、Express Connect、VPN Gateway、または Smart Access Gateway 経由での接続を検討してください。
同期対象オブジェクトを選択し、同期設定を構成します。
パラメーター 説明 同期タイプ デフォルトでは 増分データ同期 が選択されています。また、スキーマ同期 および 完全データ同期 も必ず選択してください。この 3 つを同時に実行することで、まず既存データがロードされ、増分同期の整合性を確保するためのベースラインが形成されます。 競合するテーブルの処理モード 事前チェックとエラー報告(デフォルト):宛先に同名のテーブルが存在する場合、事前チェックが失敗します。エラーを無視して続行:チェックをスキップしますが、データの不整合を引き起こす可能性があるため、慎重に使用してください。 Kafka 内のデータ形式 DTS Avro(PolarDB-X 1.0 のデフォルトかつ唯一のオプション):データは DTS Avro スキーマ定義に基づいて解析されます。Canal JSON は PolarDB-X 1.0 ではサポートされていません。DTS Avro スキーマの詳細については、「GitHub」をご参照ください。 Kafka パーティションへのデータ送信ポリシー サポートされていません。 宛先インスタンスにおけるオブジェクト名の大文字小文字 宛先におけるデータベース名、テーブル名、カラム名の大文字小文字を制御します。デフォルト: DTS デフォルトポリシー送信先インスタンスにおけるオブジェクト名の大文字小文字の指定。詳細については、「」をご参照ください。 ソースオブジェクト ソースオブジェクト からオブジェクトを選択し、矢印アイコンをクリックして 選択済みオブジェクト に移動します。データベース全体ではなく、個別のテーブルを選択してください。データベース全体を選択した場合、DTS は CREATE TABLE および DROP TABLE 操作を同期しません。 選択済みオブジェクト 単一のオブジェクトの名前を変更するには、そのオブジェクトを右クリックしてマッピングオプションを選択します。 複数のオブジェクトの名前を一度に変更するには、[一括編集] をクリックします。 条件に基づいて行をフィルター処理するには、オブジェクトを右クリックして WHERE 句を指定します。 「オブジェクト名をマップする」および「フィルター条件を指定する」をご参照ください。 [次へ:高度な設定] をクリックし、以下のオプションを構成します。
パラメーター 説明 DTS インスタンスを専用クラスターから共有クラスターへ移行 DTS 専用クラスターが不要な場合は、空白のままにします。DTS 専用クラスターとは モニタリングとアラート タスクが失敗した場合、または同期遅延がしきい値を超えた場合に通知を受信するには、[はい] を選択します。アラートのしきい値と連絡先を設定します。詳細については、「モニタリングとアラートの設定」をご参照ください。 接続失敗時の再試行時間 DTS が接続失敗時に再試行を行う時間枠です。範囲:10~1440 分。デフォルト:720 分。最低でも 30 分以上に設定してください。複数のタスクが同じソースまたは宛先データベースを共有する場合、最も短い再試行時間枠が優先されます。DTS が接続を再試行する際には、DTS インスタンスの課金が発生します。ビジネス要件に応じて再試行時間を設定することを推奨します。また、ソースおよび宛先インスタンスがリリースされた直後に、DTS インスタンスもできるだけ早くリリースしてください。 ソースおよび宛先データベースでその他の問題が発生した際の再試行待機時間 DTS が失敗した DML または DDL 操作を再試行する時間枠です。範囲:1~1440 分。デフォルト:10 分。最低でも 10 分以上に設定し、接続失敗時の再試行時間 の設定より短く設定してください。 ETL の構成 「[はい]」を選択して、抽出・変換・書き出し (ETL) 変換を適用し、コードエディタに処理ステートメントを入力します。詳しくは、「ETL の設定」をご参照ください。 タスク設定を保存し、事前チェックを実行します。
このタスク構成の API パラメーターをプレビューするには、[次へ:タスク設定の保存と事前チェック] 上にポインターを合わせ、[OpenAPI パラメーターのプレビュー] をクリックします。
[次へ:タスク設定の保存と事前チェック] をクリックします。
説明事前チェックに合格した場合にのみタスクが開始されます。いずれかの項目が不合格となった場合は、[詳細の表示] をクリックして原因を確認し、問題を修正してから再度事前チェックを実行してください。安全に無視できるアラート項目については、[アラートの詳細の確認]、次に [無視]、さらに [再チェック] をクリックしてください。
成功率 が 100 % になるまで待ち、その後 [次へ:インスタンスの購入] をクリックします。
[購入] ページで、課金およびインスタンス設定を構成します。
パラメーター 説明 課金方法 サブスクリプション:固定期間の前払い方式で、長期利用にコスト効率的です。従量課金:時間単位で課金され、短期利用に適しています。不要になった時点でインスタンスをリリースして課金を停止できます。 リソースグループ設定 同期インスタンスのリソースグループです。デフォルト: デフォルトリソースグループ。詳細については、「Resource Management とは? インスタンスクラス 同期スループット階層。詳細については、「データ同期インスタンスのインスタンスクラス」をご参照ください。 サブスクリプション期間 サブスクリプション 課金方法でのみ利用可能です。選択肢:1~9 か月、1 年、2 年、3 年、5 年。 Data Transmission Service (従量課金) 利用規約 を読み、同意します。
[購入して開始] をクリックし、ダイアログボックスで [OK] をクリックします。
タスクがタスクリストに表示されます。そこから同期の進行状況を監視できます。