Data Transmission Service (DTS) を使用して、PolarDB for MySQL クラスターのデータ変更をリアルタイムで Message Queue for Apache Kafka のトピックにストリーミングします。DTS は初期の完全データセットおよびその後のすべての増分変更をキャプチャし、Kafka コンシューマーがソースデータと常に同期した状態を維持します。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
PolarDB for MySQL クラスターが存在すること。「従量課金クラスターの購入」または「サブスクリプションクラスターの購入」をご参照ください。
同期データを受信するためのトピックが作成済みの Message Queue for Apache Kafka インスタンスが存在すること。「ステップ 1:トピックの作成」をご参照ください。
DTS で使用されるデータベースアカウントに、同期対象オブジェクトに対する読み取り権限が付与済みであること。
制限事項
ソースデータベースの要件
テーブルには PRIMARY KEY またはすべてのフィールドが一意である一意制約 (UNIQUE constraint) が必要です。これを満たさない場合、送信先に重複レコードが発生する可能性があります。
送信先データベースでテーブルまたはカラムの名前を変更する場合、1 つのタスクで同期できるテーブル数は最大 1,000 個です。それ以上のテーブルを同期する場合は、複数のタスクを設定するか、代わりにデータベース全体を同期してください。
増分データの同期が必要な場合は、バイナリロギングを有効化し、
loose_polar_log_binパラメーターを [有効] に設定する必要があります。詳細については、「バイナリロギングの有効化」および「パラメーターの変更」をご参照ください。
PolarDB for MySQL クラスターでバイナリロギングを有効化すると、バイナリログファイルのストレージ料金が発生します。
ソースクラスターの読み取り専用ノードは、同期タスクに含めることはできません。
DTS は、バイナリログ位置を進めるために、ソースデータベースに対して定期的に
CREATE DATABASE IF NOT EXISTS \`test\`を実行します。これは想定された動作です。
バイナリログの保持期間
バイナリログの保持期間が不十分な場合、DTS がログ位置を失い、タスクが中断されたり、データの不整合が発生したりする可能性があります。
| タスクタイプ | 最低保持期間 |
|---|---|
| 増分データ同期のみ | 24 時間 |
| 完全 + 増分データ同期 | 7 日間 |
完全データ同期が完了した後は、保持期間を 24 時間以上に設定できます。前述の要件に基づいてバイナリログの保持期間を設定してください。これを満たさない場合、DTS の SLA ではサービスの信頼性およびパフォーマンスを保証しません。
単一レコードのサイズ制限
Kafka は 10 MB を超えるレコードを拒否します。ソースの行がこの制限を超える場合、DTS はそのレコードを書き込めず、タスクが中断されます。
大規模フィールドを含むテーブルを処理するには、以下のいずれかの方法を使用してください。
当該テーブルをタスク対象オブジェクトから完全に除外する
当該テーブルを含めるが、大規模フィールドのカラムを除外するフィルター条件を追加する
すでに大規模フィールドを含むテーブルを追加済みの場合は、選択済みオブジェクトから削除し、再度追加したうえでフィルター条件を設定してください。
同期中の DDL 操作
タスク実行中にソーステーブルに対して pt-online-schema-change を実行しないでください。これにより同期が失敗する可能性があります。
ターゲットデータベースへのデータ書き込みには、DTS のみを使用することを推奨します。これにより、ソースデータベースとターゲットデータベース間のデータの不整合を防ぐことができます。ターゲットデータベースへの書き込みが DTS のみである場合、Data Management (DMS) を使用してソーステーブルに対してロックフリー DDL 操作を実行できます。DTS 以外のツールを使用してターゲットデータベースにデータを書き込む場合、DMS を使用してオンライン DDL 操作を実行すると、ターゲットデータベースでデータ損失が発生する可能性があります。
サポートされる同期トポロジ
1 対 1
1 対多
多対 1
カスケード
詳細については、「同期トポロジ」をご参照ください。
同期可能な SQL 操作
| タイプ | ステートメント |
|---|---|
| DML | INSERT、UPDATE、DELETE |
| DDL | CREATE TABLE、ALTER TABLE、DROP TABLE、RENAME TABLE、TRUNCATE TABLE |
| DDL | CREATE VIEW、ALTER VIEW、DROP VIEW |
| DDL | CREATE PROCEDURE、ALTER PROCEDURE、DROP PROCEDURE |
| DDL | CREATE FUNCTION、DROP FUNCTION、CREATE TRIGGER、DROP TRIGGER |
| DDL | CREATE INDEX、DROP INDEX |
同期タスクの作成
ステップ 1:データ同期タスクページを開く
DMS コンソールにログインします。
上部ナビゲーションバーで、DTS をクリックします。
左側ナビゲーションウィンドウで、DTS (DTS) > データ同期 を選択します。
また、新しい DTS コンソールで、データ同期タスクページ に直接移動することもできます。コンソールのナビゲーションは、使用しているレイアウトモードによって異なる場合があります。詳細については、「シンプルモード」をご参照ください。
ステップ 2:リージョンの選択
データ同期タスク の横にあるドロップダウンリストから、同期インスタンスを配置するリージョンを選択します。
新しい DTS コンソールでは、代わりに上部ナビゲーションバーからリージョンを選択します。
ステップ 3:ソースおよび送信先データベースの設定
タスクの作成 をクリックします。続行する前に、ページ上部に表示される制限事項を確認し、ソースおよび送信先の接続を設定します。
ソースデータベース
| パラメーター | 説明 |
|---|---|
| タスク名 | 自動生成されます。タスクを識別できるように、意味のある名前を指定してください(一意である必要はありません)。 |
| インスタンスの選択 | 既存のインスタンスを選択して設定を再利用するか、新規接続を構成してください。 |
| データベースタイプ | PolarDB for MySQL を選択します。 |
| アクセス方法 | Alibaba Cloud インスタンス を選択します。 |
| インスタンスリージョン | ソース PolarDB for MySQL クラスターのリージョン。 |
| Alibaba Cloud アカウント間でのデータ複製 | 同一アカウント内での同期の場合は、いいえ を選択します。 |
| PolarDB クラスター ID | ソースクラスターの ID。 |
| データベースアカウント | 同期対象オブジェクトに対する読み取り権限を持つアカウント。 |
| データベースパスワード | データベースアカウントのパスワード。 |
| 暗号化 | セキュリティ要件に応じて、非暗号化 または SSL 暗号化 を選択します。SSL を使用する場合は、事前にクラスターで SSL 暗号化を有効化してください。 |
宛先データベース
| パラメーター | 説明 |
|---|---|
| インスタンスの選択 | 既存のインスタンスを選択して設定を再利用するか、新規接続を構成してください。 |
| データベースタイプ | Kafka を選択します。 |
| アクセス方法 | Express Connect、VPN Gateway、または Smart Access Gateway を選択します。 |
| インスタンスリージョン | 送信先 Kafka インスタンスのリージョン。 |
| 接続済み VPC | Kafka インスタンスの仮想プライベートクラウド (VPC) ID。確認方法:Message Queue for Apache Kafka コンソールに移動し、インスタンスの詳細ページを開き、設定情報 セクションを参照してください。 |
| IP アドレス | Kafka インスタンスの デフォルトエンドポイント から取得した IP アドレス。確認方法:インスタンスの詳細ページで、エンドポイント情報 セクションを参照してください。 |
| ポート番号 | Kafka インスタンスのサービスポート。デフォルト値:9092。 |
| データベースアカウント | Kafka アカウント。インスタンスタイプが VPC インスタンス の場合は、空白のままにしてください。 |
| データベースパスワード | Kafka アカウントのパスワード。インスタンスタイプが VPC インスタンス の場合は、空白のままにしてください。 |
| Kafka バージョン | 送信先 Kafka インスタンスのバージョン。 |
| 暗号化 | 非暗号化 または SCRAM-SHA-256 を選択します。 |
| トピック | 同期データを受信するトピック。ドロップダウンリストから選択します。 |
| DDL 情報を格納するトピック | DDL イベントメッセージを格納するトピック。空白のままにした場合、DDL 情報はデータと同じトピックに格納されます。 |
| Kafka Schema Registry の使用 | Avro スキーマの保存に Kafka Schema Registry を使用する場合は、はい を選択します。選択した場合は、Schema Registry の URL または IP アドレスを入力してください。いいえ を選択するとスキップされます。 |
Message Queue for Apache Kafka は、インスタンスタイプとして直接選択できません。Express Connect、VPN Gateway、または Smart Access Gateway を介して、セルフマネージド Kafka エンドポイントとして接続してください。
ステップ 4:接続性のテスト
送信先でホワイトリストが使用されている場合は、最初に DTS サーバーの CIDR ブロックをホワイトリストに追加します。詳細については、「オンプレミスデータベースのセキュリティ設定への DTS サーバー CIDR ブロックの追加」をご参照ください。
接続性のテストと続行 をクリックします。
ステップ 5:オブジェクトの選択と設定の構成
| パラメーター | 説明 |
|---|---|
| タスク段階 | 増分データ同期がデフォルトで選択されています。変更のストリーミング前に初期データセットをキャプチャするには、スキーマ同期とフルデータ同期も選択してください。 |
| 競合するテーブルの処理モード | 事前チェックとエラー報告:送信先にソースと同じ名前のテーブルが存在する場合、事前チェックは失敗します。このオプションは、予期しない上書きを避けるために使用します。エラーを無視して続行:競合チェックをスキップします。ソースデータベースとターゲットデータベースのスキーマが同じで、データレコードが送信先の既存レコードと同じプライマリキー値を持つ場合、次のように処理されます。フルデータ同期中は、DTS はそのレコードを同期せず、送信先の既存レコードが保持されます。増分データ同期中は、DTS はそのレコードを同期し、送信先の既存レコードは上書きされます。スキーマが異なる場合、データが初期化されず、一部のカラムのみが同期されるか、タスクが失敗する可能性があります。 |
| Kafka でのデータフォーマット | DTS Avro:データは DTS Avro スキーマを使用してシリアル化されます。GitHub 上のスキーマ定義をご参照ください。Canal JSON:データは Canal JSON フォーマットで保存されます。詳細については、「Canal JSON フォーマットのリファレンス」をご参照ください。 |
| Kafka パーティションへのデータ転送ポリシー | 順序付けとスループットの要件に基づいて、パーティションルーティングポリシーを選択します。詳細については、「Kafka パーティションへのデータ移行ポリシーの指定」をご参照ください。 |
| 宛先インスタンスにおけるオブジェクト名の大文字/小文字 | オブジェクト名の大文字と小文字の区別の指定Kafka メッセージ内のデータベース名、テーブル名、カラム名の大文字/小文字を制御します。デフォルトは DTS のデフォルトポリシーです。詳細については、をご参照ください。 |
| ソースオブジェクト | ソースオブジェクト パネルからテーブルを選択し、矢印アイコンをクリックして 選択したオブジェクト に移動します。テーブルのみ選択できます。 |
| 選択したオブジェクト | オブジェクトを右クリックして名前を変更したり、特定の SQL 操作をフィルターしたりします。バッチ編集 をクリックすると、複数のオブジェクトの名前を一度に変更できます。行をフィルターするには、オブジェクトを右クリックして WHERE 条件を指定します。 |
ステップ 6:高度な設定の構成
次へ:高度な設定 をクリックします。
| パラメーター | 説明 |
|---|---|
| アラートの設定 | タスクの失敗や同期遅延がしきい値を超えた際に通知を受ける場合は、はい を選択します。しきい値およびアラート連絡先を指定してください。 |
| 失敗した接続の再試行時間 | DTS がタスクを失敗と見なすまでの失敗した接続の再試行時間。範囲:10~1,440 分。デフォルト値:720 分。少なくとも 30 分に設定してください。複数のタスクが同じソースまたは送信先を共有する場合、これらのタスクの中で最も短い再試行時間が適用されます。 |
| ETL の構成 | [はい] を選択して、同期中にデータを変換します。コードエディタに処理文を入力します。詳細については、「ETL の設定」をご参照ください。 |
| 転送および逆再生タスクのハートビートテーブルに対する SQL 操作の削除 | はい:DTS はハートビートテーブル操作をソースに書き込みません。同期遅延が表示される場合があります。いいえ:DTS はハートビートテーブル操作をソースに書き込みます。これにより、ソースデータベースの物理バックアップおよびクローン作成に影響が出る可能性があります。 |
ステップ 7:事前チェックの実行
次へ:タスク設定の保存と事前チェック をクリックします。
タスクを開始する前に、DTS が事前チェックを実行します。各失敗項目について:
詳細の表示 をクリックして原因を確認します。
問題を修正し、再チェック をクリックします。
警告項目(ブロッキングではないもの)については:
可能であれば問題を修正してください。
スキップする場合は:項目の横にある 警告詳細の確認 をクリックし、無視 をクリックし、OK をクリックした後、再チェック をクリックします。
警告項目を無視すると、データの不整合が発生し、ビジネスにリスクを及ぼす可能性があります。
ステップ 8:インスタンスの購入
事前チェックの成功率达到 100 % になるまで待ち、次へ:インスタンスの購入 をクリックします。
必要な同期スループットに基づいて、[インスタンスクラス] を選択します。「データ同期インスタンスの仕様」をご参照ください。
ステップ 9:タスクの開始
Data Transmission Service (従量課金) 利用規約 を読み、チェックボックスをオンにします。
購入して開始 をクリックします。
タスクがタスクリストに表示されます。そこから進行状況および同期遅延を監視できます。
注意事項
データ同期を開始する前に、ソースおよび送信先データベースのパフォーマンスへの影響を評価してください。影響を最小限に抑えるため、ピーク時間帯を避けて同期を実行してください。初期完全データ同期中は、DTS がソースおよび送信先データベースの読み取りおよび書き込みリソースを使用するため、データベースサーバーの負荷が高まる可能性があります。
完全データ同期中、DTS は同時 INSERT 操作を実行するため、送信先テーブルの断片化が発生する可能性があります。完全同期完了後の送信先表領域は、ソースよりも大きくなる可能性があります。
送信先データベースへのデータ書き込みには、DTS のみを使用することを推奨します。DTS 以外のツールが同時に送信先 Kafka トピックに書き込む場合、DMS を使用したロックフリー DDL 操作によって送信先でデータ損失が発生する可能性があります。
次のステップ
オブジェクト名のマップ — 送信先に書き込まれる際に、テーブルまたは列の名前を変更します
SQL 条件を使用してデータをフィルターする — WHERE 条件を使用して行のサブセットを同期する
モニタリングとアラートの設定 — タスクの失敗および遅延の急増に関する通知を設定します
Kafka クラスターのデータ形式 — Kafka トピックに書き込まれる完全なメッセージスキーマについて理解します