ダウンストリームアプリケーションが、イベント駆動型処理、検索インデックス構築、または分析などの目的でデータベース変更のリアルタイムフィードを必要とする場合、PolarDB-X 2.0 から Apache Kafka へ生の変更データをストリーミングすることで、アプリケーションの変更を加えることなく、プロデューサーとコンシューマーを分離できます。Data Transmission Service (DTS) は、ソースのバイナリログから INSERT、UPDATE、DELETE 操作をキャプチャし、それらを Kafka のトピックに配信します。これにより、ダウンストリームのコンシューマーは常に最新のデータビューを利用できます。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
MySQL 5.7 と互換性のある PolarDB-X 2.0 インスタンス
DTS がサポートするバージョンの Message Queue for Apache Kafka インスタンス — 詳細については、「データ同期シナリオの概要」をご参照ください。
Kafka インスタンスの利用可能なストレージ容量が、ソース PolarDB-X インスタンスの全データサイズを超えること
同期データを受信するための Kafka トピックが作成済みであること — 詳細については、「ステップ 1:トピックの作成」をご参照ください。
制限事項
ソースデータベースの要件
テーブルには PRIMARY KEY または一意制約 (UNIQUE constraint) を設定する必要があります。また、すべてのフィールドが一意である必要があります。これを満たさない場合、宛先に重複レコードが生成される可能性があります。
同期対象としてテーブルを選択し、宛先でのテーブル名またはカラム名の変更が必要な場合、1 つのタスクでサポートされるテーブル数は最大 5,000 個です。5,000 個を超える場合は、複数のタスクに分割するか、代わりにデータベース全体を同期してください。
同期対象として選択できるのはテーブルのみです。ビュー、トリガー、ストアドプロシージャは同期されません。
DTS は外部キー (foreign keys) を同期しません。ソース側で実行されたカスケード操作および削除操作は、宛先に反映されません。
バイナリログの要件
PolarDB-X 2.0 コンソールでバイナリロギングを有効化し、binlog_row_image を full に設定します。このパラメーターが正しく設定されていない場合、事前チェックは失敗し、タスクを開始できません。詳細については、「パラメーター設定」をご参照ください。
同期タイプに応じて、バイナリログの保持期間を設定します。
| 同期タイプ | 最低保持期間 |
|---|---|
| 増分同期のみ | 24 時間 |
| フル + 増分同期 | 7 日間(フル同期完了後は 24 時間以上に短縮可能) |
バイナリログが DTS によって処理される前にパージされた場合、以下の問題が発生する可能性があります。
タスクの失敗:DTS がバイナリログを取得できず、タスクが失敗する可能性があります。
データ損失または不整合:例外的な状況では、データの不整合または損失が発生する可能性があります。DTS は、サービスレベルアグリーメント (SLA) で定義された信頼性およびパフォーマンスを保証できません。タスクを開始する前に、上記の要件を満たすよう保持期間を設定してください。
その他の制限事項
同期対象に対する DDL 操作で pt-online-schema-change を使用しないでください。タスクが失敗する可能性があります。
同期中は、宛先 Kafka インスタンスへの書き込みを DTS 経由でのみ行ってください。他のツール経由での書き込みは、データの不整合を引き起こす可能性があります。他のツールが宛先に同時書き込みを行う状況で、DMS を使用してオンライン DDL 操作を実行すると、データ損失が発生する可能性があります。
テーブルの名前が変更され、その新しい名前が同期対象オブジェクトに含まれていない場合、DTS はそのテーブルの同期を停止します。再開するには、同期タスクにオブジェクトを追加するおよび同期対象オブジェクトを再選択します。
初期のフルデータ同期では、並列 INSERT 操作が使用されるため、宛先のテーブルが断片化 (fragmentation) します。フル同期完了後の宛先テーブルの表領域 (tablespace) サイズは、ソースより大きくなります。
可能であれば、ピーク時を避けてタスクを実行してください。フルデータ同期は、ソースおよび宛先データベースの読み取りおよび書き込み負荷を高めます。
DTS は定期的にソースデータベース内のdts_health_check.ha_health_checkテーブルを更新し、バイナリログの位置を進めます。
課金
| 同期タイプ | 料金 |
|---|---|
| スキーマ同期およびフルデータ同期 | 無料 |
| 増分データ同期 | 課金済み — 「課金概要 |
単一レコードのサイズ制限
Kafka は 10 MB を超えるレコードを拒否します。ソースの行がこの制限を超える場合、DTS は当該レコードを書き込めず、タスクが中断されます。これを回避するため、タスク設定時に大規模フィールドを含むカラムを除外してください。すでに大規模フィールドを含むテーブルが同期対象に含まれている場合は、当該テーブルを同期対象リストから削除し、再度追加して、 oversized カラムを除外するフィルター条件を設定してください。
同期可能な SQL 操作
DML 操作のみ:INSERT、UPDATE、DELETE。
DDL 情報は、DDL 情報を格納するトピック パラメーターを使用して、別途指定した Kafka トピックにルーティングします。
同期タスクの設定
ステップ 1:データ同期タスクページへ移動
Data Management (DMS) コンソール にログインします。
トップナビゲーションバーで、DTS をクリックします。
左側のナビゲーションウィンドウで、DTS (DTS) > データ同期 を選択します。
ナビゲーション オプションは、コンソール モードによって異なります。『シンプル モード』および『DMS コンソールのレイアウトとスタイルをカスタマイズする』をご参照ください。または、直接『データ同期タスク ページ』に移動します。
ステップ 2:リージョンの選択
データ同期タスク の右側で、同期インスタンスが配置されているリージョンを選択します。
新しい DTS コンソールでは、代わりにトップナビゲーションバーでリージョンを選択します。
ステップ 3:ソースおよび宛先データベースの設定
タスクの作成 をクリックします。表示されたページで、以下のパラメーターを設定します。
一般
| パラメーター | 説明 |
|---|---|
| タスク名 | タスクの名前です。DTS がデフォルト名を自動的に割り当てます。タスクを容易に識別できるように、意味のある名前を付けてください(一意である必要はありません)。 |
ソースデータベース
ソースデータベースを設定する前に、データベースアカウントが、同期対象のオブジェクトに対して SELECT、REPLICATION CLIENT、および REPLICATION SLAVE の必要な権限を持っていることを確認してください。権限の付与に関する詳細については、「PolarDB-X のデータ同期ツール」をご参照ください。
| パラメーター | 値 / 説明 |
|---|---|
| 既存の DMS データベースインスタンスを選択 | (任意)下記のパラメーターを自動入力するために、既存のインスタンスを選択します。 |
| データベースタイプ | PolarDB-X 2.0 を選択します。 |
| アクセス方法 | Alibaba Cloud インスタンス を選択します。 |
| インスタンスリージョン | ソース PolarDB-X インスタンスが配置されているリージョンです。 |
| インスタンス ID | ソース PolarDB-X インスタンスの ID です。 |
| データベースアカウント | SELECT、REPLICATION CLIENT、REPLICATION SLAVE 権限を持つアカウントです。 |
| データベースパスワード | データベースアカウントのパスワードです。 |
宛先データベース
| パラメーター | 値 / 説明 |
|---|---|
| 既存の DMS データベースインスタンスを選択 | (任意)下記のパラメーターを自動入力するために、既存のインスタンスを選択します。 |
| データベースタイプ | Kafka を選択します。 |
| アクセス方法 | Express Connect、VPN Gateway、または Smart Access Gateway を選択します。DTS は Message Queue for Apache Kafka を直接アクセス方法として一覧表示しません — 自己管理 Kafka クラスターとして設定してください。 |
| インスタンスリージョン | 宛先 Kafka インスタンスが配置されているリージョンです。 |
| 接続済み VPC | Kafka インスタンスの仮想プライベートクラウド (VPC) ID です。VPC ID を確認するには、Kafka コンソールに移動し、インスタンスの詳細ページを開き、「設定情報」セクションを確認してください。 |
| IP アドレスまたはドメイン名 | Kafka インスタンスの「デフォルトエンドポイント」フィールドから取得した IP アドレスです。これは、インスタンスの詳細ページの「基本情報」セクションで確認できます。 |
| ポート番号 | Kafka サービスポートです。デフォルト値:9092。 |
| データベースアカウント | Kafka アカウントです。VPC 経由で接続する場合は空白のままにしてください — VPC 接続のインスタンスでは認証は不要です。 |
| データベースパスワード | Kafka アカウントのパスワードです。VPC 接続のインスタンスの場合は空白のままにしてください。 |
| Kafka バージョン | 宛先 Kafka インスタンスのバージョンです。 |
| 暗号化 | セキュリティ要件に応じて、非暗号化 または SCRAM-SHA-256 を選択します。 |
| トピック | 同期データを受信するトピックです。 |
| DDL 情報を格納するトピック | (任意)DDL 情報専用の別途のトピックです。空白のままにした場合、DDL 情報は「トピック」で指定したトピックに書き込まれます。 |
| Kafka Schema Registry の使用 | Avro スキーマ管理のために Kafka Schema Registry を使用するかどうかを指定します。はい を選択し、必要に応じて Schema Registry の URL を入力します。それ以外の場合は いいえ を選択します。 |
ステップ 4:接続性のテスト
接続性のテストと続行 をクリックします。
DTS は、自動的にその CIDR ブロックを Alibaba Cloud データベースインスタンスのホワイトリストおよび Elastic Compute Service (ECS) でホストされるデータベースのセキュリティグループルールに追加します。オンプレミスまたはサードパーティのデータベースの場合は、DTS の CIDR ブロックを手動で追加してください。詳細については、「オンプレミスデータベースのセキュリティ設定への DTS サーバーの CIDR ブロックの追加」をご参照ください。
DTS CIDR ブロックをホワイトリストまたはセキュリティグループに追加すると、セキュリティリスクが発生します。続行する前に、強力な認証情報を使用する、公開ポートを最小限に抑える、API 呼び出しを監査する、ホワイトリストルールを定期的にレビューする、パブリックインターネット接続よりもプライベートネットワーク接続(Express Connect、VPN Gateway、または Smart Access Gateway)を優先するなどの対策を講じてください。
ステップ 5:同期対象および設定の構成
| パラメーター | 説明 |
|---|---|
| 同期タイプ | 増分データ同期 がデフォルトで選択されています。スキーマ同期 および フルデータ同期 も選択することで、増分同期のベースラインとして既存データを同期できます。 |
| 競合テーブルの処理モード | [事前チェックとエラー報告](デフォルト):ソースと送信先に同名のテーブルが存在する場合、事前チェックは失敗します。送信先テーブルを削除することなく名前の競合を解決するには、オブジェクト名マッピング を使用してください。[エラーを無視して続行]:名前の競合チェックをスキップします。完全同期中は既存の送信先レコードが保持され、増分同期中は上書きされます。データの不整合が生じる可能性があるため、注意して使用してください。 |
| Kafka 内のデータ形式 | DTS が Kafka トピックにレコードを書き込む際の形式です。ダウンストリームのコンシューマーがデータを読み取る方法に応じて選択してください:DTS Avro(スキーマ強制パイプラインに推奨):レコードは DTS Avro スキーマ定義を使用してシリアル化されます。GitHub 上のスキーマをご参照ください。Canal Json:レコードは Canal JSON 形式で保存されます。「Kafka クラスターのデータ形式」で、全フィールドのリファレンスをご確認ください。 |
| Kafka パーティションへのデータ送信ポリシー | 各レコードがどの Kafka パーティションにルーティングされるかを制御します。「Kafka パーティションへのデータ移行ポリシーの指定」をご参照ください。 重要 この機能は、ソースデータベースが PolarDB-X 1.0 の場合、サポートされていません。 |
| 宛先インスタンスにおけるオブジェクト名の大文字小文字 | 宛先におけるデータベース名、テーブル名、および列名の大文字小文字を決定します。「DTS デフォルトポリシー」がデフォルトで選択されます。詳細については、「宛先インスタンスでのオブジェクト名の大文字小文字の指定」をご参照ください。 |
| ソースオブジェクト | ソースオブジェクト からカラム、テーブル、またはデータベースを選択し、矢印アイコンをクリックして 選択済みオブジェクト に移動します。 |
| 選択済みオブジェクト | 単一のオブジェクトの名前を変更するには、そのオブジェクトを右クリックします。詳細については、「単一のオブジェクトの名前をマッピングする」をご参照ください。複数のオブジェクトの名前を一度に変更するには、[バッチ編集] をクリックします。詳細については、「複数のオブジェクト名を一度にマッピングする」をご参照ください。特定のオブジェクトについて同期する SQL 操作を選択するには、そのオブジェクトを右クリックして操作を選択します。行をフィルターするには、オブジェクトを右クリックして WHERE 句を指定します。詳細については、「フィルター条件の設定」をご参照ください。 |
ステップ 6:高度な設定の構成
次へ:高度な設定 をクリックします。
| パラメーター | 説明 |
|---|---|
| アラートの設定 | タスクが失敗した場合や同期遅延がしきい値を超えた場合に通知されるよう、アラートを設定します。[はい] を選択して、しきい値とアラート連絡先を設定します。詳細については、「新しい DTS タスクのモニタリングとアラート機能を設定する」をご参照ください。 |
| 失敗した接続の再試行時間範囲の指定 | DTS が接続失敗時に再試行を実行する時間です。範囲:10~1,440 分。デフォルト値:720 分。少なくとも 30 分に設定してください。複数のタスクが同じソースまたは宛先データベースを共有する場合、最も短い再試行時間が適用されます。再試行期間中も DTS インスタンスの課金が発生します。 |
| ETL の構成 | 抽出・変換・書き出し (extract, transform, and load) 変換を適用するかどうかを指定します。はいデータ移行またはデータ同期タスクにおける ETL の設定 を選択して、処理文を入力します。「」をご参照ください。 |
| 転送および逆再生タスクのハートビートテーブルに対する SQL 操作の削除有無 | DTS がソースデータベースにハートビート操作を書き込むかどうかを制御します。はい:ハートビート書き込みは抑制され、同期遅延の測定値が不正確になる可能性があります。いいえ:ハートビート書き込みが実行され、ソースデータベースの物理バックアップまたはクローン作成に影響を与える可能性があります。 |
ステップ 7:事前チェックの実行
次へ:タスク設定の保存と事前チェック をクリックします。
このタスクを作成するために使用される OpenAPI パラメーターをプレビューするには、ボタンにカーソルを合わせて、OpenAPI パラメーターのプレビュー をクリックします。
DTS はタスク開始前に事前チェックを実行します。項目が失敗した場合:
失敗した項目の横にある 詳細の表示 をクリックし、問題を修正してから、再チェック をクリックします。
無視可能な警告が表示された場合、警告の詳細の確認 をクリックし、ダイアログボックスで 無視 をクリックします。警告を無視するとデータの不整合が発生する可能性があるため、注意して進めてください。
ステップ 8:インスタンスの購入
事前チェックが 100 % に達するまで待ち、その後 次へ:インスタンスの購入 をクリックします。
| パラメーター | 説明 |
|---|---|
| 課金方法 | サブスクリプション:前払い方式。長期利用にコスト効率が良いです。従量課金:時間単位で課金されます。不要になった時点でインスタンスを解放すれば、課金を停止できます。 |
| リソースグループ | インスタンスのリソースグループです。デフォルトはデフォルト リソースグループです。詳細については、「Resource Management とは?」をご参照ください。 |
| インスタンスクラス | 同期仕様は、スループットを決定します。詳細については、「データ同期インスタンスの仕様」をご参照ください。 |
| サブスクリプション期間 | サブスクリプション を選択した場合に利用可能です。選択肢:1~9 か月、1 年、2 年、3 年、5 年。 |
Data Transmission Service (従量課金) 利用規約 をお読みになり、同意した上で、購入して開始 をクリックします。
タスクがタスクリストに表示されます。そこから進行状況を監視できます。