Data Transmission Service (DTS) を使用すると、ApsaraDB RDS for MySQL インスタンスから ApsaraMQ for Kafka インスタンスへデータを移行できます。既存の履歴データと継続的な変更の両方を、アプリケーションのダウンタイムなしで Kafka トピックにストリーミングします。
仕組み
DTS は、最大 3 つの連続したフェーズで移行を実行します:
スキーマ移行:DTS はソースの MySQL インスタンスからテーブル定義を読み取り、送信先を準備します。
完全データ移行:DTS は、選択したテーブルのすべての既存行を、指定された Kafka トピックにコピーします。このフェーズ中に同時に INSERT 操作を行うと、送信先のテーブルスペースがソースよりも大きくなる可能性があります。
増分データ移行:完全データ移行が完了すると、DTS は MySQL のバイナリログを読み取り、INSERT、UPDATE、DELETE イベントをリアルタイムで Kafka トピックにストリーミングします。このフェーズは継続的に実行され、タスクが自動的に停止することはないため、アプリケーションは中断なくソースへの書き込みを続けることができます。
DTS は外部キーを移行しません。ソースでトリガーされたカスケード操作と削除操作は複製されません。
前提条件
開始する前に、以下が準備できていることを確認してください:
ApsaraDB RDS for MySQL インスタンス (ソース) と ApsaraMQ for Kafka インスタンス (送信先) の両方が作成済みであること。RDS インスタンスを作成する必要がある場合は、「ApsaraDB RDS for MySQL インスタンスの作成」をご参照ください。
送信先の ApsaraMQ for Kafka インスタンスで利用可能なストレージが、ソースデータの合計サイズを超えていること。
移行されたデータを受信するために、送信先の ApsaraMQ for Kafka インスタンスにトピックが作成されていること。「ステップ 3:リソースの作成」の「ステップ 1:トピックの作成」セクションをご参照ください (ステップ 3:リソースの作成)。
バージョン 0.10.1.0 から 2.0 の Kafka クラスターが実行されていること。
移行するオブジェクトに対する読み取り権限がソースデータベースアカウントに付与されていること。必要な権限を付与します:
GRANT SELECT ON <database_name>.* TO '<dts_user>'@'%';
制限事項
ソースデータベース
| 制約 | 詳細 |
|---|---|
| 帯域幅 | ソースサーバーには十分なアウトバウンド帯域幅が必要です。帯域幅が低いと移行速度が低下します。 |
| プライマリキー要件 | テーブルには、一意のフィールド値を持つ PRIMARY KEY または UNIQUE 制約が必要です。これらがない場合、送信先に重複したレコードが含まれる可能性があります。 |
| テーブル数 (名前変更時) | テーブルを移行オブジェクトとして選択し、送信先でテーブルまたは列の名前を変更する場合、1 つのタスクでサポートされるテーブルは最大 1,000 です。1,000 を超えるテーブルの場合は、複数のタスクに分割するか、データベース全体を移行してください。 |
| 移行中の DDL 操作 | スキーマ移行または完全データ移行中に、データベースまたはテーブルスキーマを変更する DDL 文を実行しないでください。タスクは失敗します。 |
| 完全データ移行のみの移行中の書き込み | 増分データ移行を含めない場合は、移行中にソースへの書き込みを行わないでください。一貫性を保証するには、スキーマ移行、完全データ移行、増分データ移行を一緒に選択してください。 |
| バイナリログデータ | 物理バックアップの復元やカスケード操作によって生成された変更データはバイナリログに記録されず、移行されません。 |
| MySQL 8.0.23 以降の不可視カラム | DTS は不可視カラムを読み取ることができず、データ損失が発生します。タスクを開始する前に ALTER TABLE <table_name> ALTER COLUMN <column_name> SET VISIBLE; を実行してください。プライマリキーのないテーブルは自動的に不可視のプライマリキーを生成します。これらも可視にしてください。詳細については、「Invisible Columns」および「Generated Invisible Primary Keys」をご参照ください。 |
| FLOAT/DOUBLE の精度 | DTS は ROUND(COLUMN, PRECISION) を使用して値を取得します。精度を指定しない場合、DTS は FLOAT を 38 桁、DOUBLE を 308 桁に設定します。これらのデフォルト値が要件を満たしていることを確認してください。 |
増分移行のためのバイナリログ要件
増分データ移行を有効にする前に、これらの MySQL 設定を確認および構成してください:
| 設定 | 必須値 | 確認方法 |
|---|---|---|
binlog_format | row | SHOW GLOBAL VARIABLES LIKE 'binlog_format'; |
binlog_row_image | full | SHOW GLOBAL VARIABLES LIKE 'binlog_row_image'; |
log_slave_updates | ON (自己管理のデュアルプライマリクラスターのみ) | SHOW GLOBAL VARIABLES LIKE 'log_slave_updates'; |
| バイナリログ保持期間 (RDS for MySQL) | 少なくとも 3 日 (7 日を推奨) | バイナリログの自動削除パラメーターの設定 |
| バイナリログ保持期間 (セルフマネージド MySQL) | 少なくとも 7 日 | SHOW VARIABLES LIKE 'expire_logs_days'; |
セルフマネージド MySQL インスタンスの場合、対応するコマンドを実行して非準拠の設定を修正できます:
-- binlog 形式を設定
SET GLOBAL binlog_format = 'ROW';
-- binlog row image を設定
SET GLOBAL binlog_row_image = 'FULL';ApsaraDB RDS for MySQL インスタンスの場合、バイナリログパラメーターは SET GLOBAL コマンドを直接実行するのではなく、RDS コンソールを通じて変更する必要があります。バイナリログの保持期間が最小値を下回る設定の場合、DTS がログを読み取れずにタスクが失敗する可能性があります。例外的なケースでは、データの不整合や損失が発生する可能性があります。これは、DTS とのサービスレベルアグリーメント (SLA) に影響します。
その他の制限事項
移行するオブジェクトとしてテーブルのみを選択できます。
ソースおよび送信先サーバーへの負荷を軽減するため、オフピーク時に移行を実行してください。
本番ワークロードを送信先に切り替える前に、過去 7 日間に失敗した DTS タスクを停止またはリリースしてください。DTS は失敗したタスクを自動的に再開しようとし、送信先のデータを上書きする可能性があります。または、送信先に対する DTS アカウントの書き込み権限を取り消してください。
移行中に他のソースから送信先にデータが書き込まれると、データの不整合が発生します。
移行中に送信先の Kafka クラスターがアップグレードまたはダウングレードされた場合は、移行タスクを再起動してください。
ソースの RDS for MySQL インスタンスで TDE (透過的データ暗号化) が有効になっている場合、スキーマ移行、完全データ移行、増分データ移行はすべてサポートされます。代わりに EncDB 機能が有効になっている場合、完全データ移行はサポートされません。
DTS タスクが失敗した場合、DTS テクニカルサポートは 8 時間以内に復旧を試みます。復旧中、タスクが再起動されたり、タスクレベルのパラメーターが変更されたりする可能性があります。データベースパラメーターは変更されません。
特殊なケース
セルフマネージド MySQL:
タスク実行中にプライマリ/セカンダリ スイッチオーバーが発生した場合、タスクは失敗します。
ソースで長期間 DML 操作が実行されない場合、移行遅延が不正確に表示されることがあります。ソースで任意の DML 操作を実行して遅延を更新してください。データベース全体を移行した場合は、毎秒更新を受け取るハートビートテーブルを作成してください。
DTS は、バイナリログの位置を進めるために、ソースで定期的に
CREATE DATABASE IF NOT EXISTS \`test\`を実行します。
ApsaraDB RDS for MySQL:
読み取り専用の RDS for MySQL V5.6 インスタンス (トランザクションログを記録しない) は、増分データ移行のソースとして使用できません。
DTS は、バイナリログの位置を進めるために、ソースで定期的に
CREATE DATABASE IF NOT EXISTS \`test\`を実行します。
課金
| 移行タイプ | タスク構成料金 | インターネットトラフィック料金 |
|---|---|---|
| スキーマ移行 + 完全データ移行 | 無料 | データがインターネットを経由してAlibaba Cloudを通過する場合にのみ課金されます。詳細については、「課金概要」をご参照ください。 |
| 増分データ移行 | 課金済み。「課金概要」を参照してください。 | 上記と同じです。 |
増分移行でサポートされる SQL 操作
| 操作タイプ | SQL 文 |
|---|---|
| DML | INSERT, UPDATE, DELETE |
| DDL | CREATE TABLE, ALTER TABLE, DROP TABLE, RENAME TABLE, TRUNCATE TABLE |
| CREATE VIEW, ALTER VIEW, DROP VIEW | |
| CREATE PROCEDURE, ALTER PROCEDURE, DROP PROCEDURE | |
| CREATE FUNCTION, DROP FUNCTION, CREATE TRIGGER, DROP TRIGGER | |
| CREATE INDEX, DROP INDEX |
Kafka メッセージフォーマット
DTS は、2 つのフォーマットのいずれかで Kafka にレコードを書き込みます。ご利用のコンシューマー実装に一致するフォーマットを選択してください。
DTS Avro
レコードは DTS Avro スキーマ定義に従います。GitHub でスキーマをご参照ください。
Canal JSON
レコードは Canal JSON 形式で書き込まれます。次の例は、典型的な INSERT イベントを示しています:
{
"data": [
{
"id": "1",
"name": "example_value",
"updated_at": "2026-01-01 00:00:00"
}
],
"database": "example_db",
"es": 1735689600000,
"id": 1,
"isDdl": false,
"mysqlType": {
"id": "int(11)",
"name": "varchar(255)",
"updated_at": "datetime"
},
"old": null,
"pkNames": ["id"],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"updated_at": 93
},
"table": "example_table",
"ts": 1735689600123,
"type": "INSERT"
}UPDATE イベントの場合、old フィールドには変更前の値 (変更前の列の値) が含まれます。
完全なパラメーターの詳細とサンプルコンシューマーコードについては、「Canal JSON データ形式」をご参照ください。
移行タスクの作成
ステップ 1:データ移行ページに移動
いずれかのコンソールを使用します:
DTS コンソール
DMS コンソール
ステップ 2:ソースデータベースとターゲットデータベースの構成
[タスクの作成] をクリックします。
警告ソースと送信先を構成した後、続行する前にページの上部に表示される [使用制限] をお読みください。このステップをスキップすると、タスクが失敗したり、データの不整合が発生したりする可能性があります。
次の表のパラメーターを構成します。
ソースデータベース
| パラメーター | 説明 |
|---|---|
| タスク名 | DTS タスクの名前。DTS はデフォルト名を生成します。タスクを識別しやすくするために、説明的な名前を使用してください。一意性は必須ではありません。 |
| 既存の接続を選択 | ソースインスタンスがすでに DTS に登録されている場合は、リストから選択してください。DTS が残りのパラメーターを自動入力します。それ以外の場合は、以下のパラメーターを設定してください。 |
| データベースタイプ | MySQL を選択します。 |
| アクセス方法 | Alibaba Cloud インスタンス を選択します。 |
| インスタンスリージョン | ソース ApsaraDB RDS for MySQL インスタンスが存在するリージョン。 |
| Alibaba Cloudアカウント全体でのデータの複製 | 同一アカウント間の移行の場合は、[いいえ] を選択します。 |
| RDS インスタンス ID | ソース ApsaraDB RDS for MySQL インスタンスの ID。 |
| データベースアカウント | 移行されるオブジェクトに対する読み取り権限を持つデータベースアカウント。 |
| データベースパスワード | データベースアカウントのパスワード。 |
| 暗号化 | 「[暗号化なし]」または「[SSL 暗号化]」を選択します。SSL を使用するには、まず RDS インスタンスで SSL 暗号化を有効化する必要があります。詳細については、「クラウド証明書を使用して SSL 暗号化を有効化する」をご参照ください。 |
宛先データベース
| パラメーター | 説明 |
|---|---|
| 既存の接続を選択 | 宛先インスタンスがすでに DTS に登録されている場合は、リストから選択します。登録されていない場合は、以下のパラメーターを設定します。 |
| データベースタイプ | [Kafka] を選択します。 |
| アクセス方法 | [Alibaba Cloud インスタンス] を選択します。 |
| インスタンスリージョン | 宛先の ApsaraMQ for Kafka インスタンスが存在するリージョン。 |
| Kafka インスタンス ID | 宛先の ApsaraMQ for Kafka インスタンスの ID。 |
| 暗号化 | セキュリティ要件に基づいて、[暗号化なし] または [SCRAM-SHA-256] を選択します。 |
| トピック | 移行データを受信するトピック。ドロップダウンリストから選択します。 |
| DDL 情報を格納するトピック | DDL イベントデータを格納するトピック。このパラメーターを空白のままにした場合、DDL 情報は [トピック] で指定されたトピックに書き込まれます。 |
| Kafka Schema Registry を使用 | Kafka Schema Registry を使用してスキーマ管理を行うかどうかを指定します。[いいえ] または [はい]アラート通知設定) を選択します。[はい] を選択した場合は、ご利用の Avro スキーマ用に Kafka Schema Registry に登録されている URL または IP アドレスを入力します。 |
[接続をテストして続行] をクリックします。
DTS はソースおよび送信先にアクセスできる必要があります。両方のデータベースのセキュリティ設定に DTS サーバーの CIDR ブロックを追加します。詳細については、「DTS サーバーの CIDR ブロックを追加する」をご参照ください。ソースまたは送信先が Alibaba Cloud インスタンス以外のアクセス方法を使用する自己管理データベースである場合は、[接続テスト] を [DTS サーバーの CIDR ブロック] ダイアログボックスでクリックします。
ステップ 3:移行オブジェクトの選択と設定の構成
[オブジェクトの構成] ページで、次のパラメーターを設定します。
| パラメーター | 説明 |
|---|---|
| 移行タイプ | ニーズに応じて移行タイプを選択します:- スキーマ移行 + 完全データ移行のみ:[スキーマ移行] と [完全データ移行] を選択します。- 継続的なレプリケーションを伴うゼロダウンタイム移行:[スキーマ移行]、[完全データ移行]、および [増分データ移行] を選択します。 説明 送信先が ApsaraMQ for Kafka インスタンスの場合、スキーマ移行はサポートされません。増分データ移行を選択しない場合は、移行中にソースへの書き込みを行わないでください。 |
| 競合テーブルの処理モード | - 事前チェックとエラーの報告: ソースと同じ名前のテーブルが既に送信先に存在する場合、事前チェックは失敗します。名前衝突を解決するには、オブジェクト名マッピング を使用してください。<br>- エラーを無視して続行: 名前衝突のチェックをスキップします。完全なデータ移行中は、送信先の競合するレコードが保持されます。増分データ移行中は、それらが上書きされます。注意して使用してください。 |
| Kafka のデータ形式 | レコードが Kafka に書き込まれる形式:- [DTS Avro]:レコードは DTS Avro スキーマに従います。GitHub でスキーマ定義をご参照ください。- [Canal JSON]:レコードは Canal JSON 形式で書き込まれます。パラメーターの詳細とサンプルコードについては、「Canal JSON データ形式」をご参照ください。 |
| Kafka データ圧縮形式 | Kafka メッセージの圧縮アルゴリズム:- [LZ4] (デフォルト):低い圧縮率と高い圧縮速度。- [GZIP]:高い圧縮率と低い圧縮速度。GZIP は大量の CPU リソースを消費します。- [Snappy]:中程度の圧縮率と中程度の圧縮速度。 |
| Kafka パーティションへのデータ転送ポリシー | DTS がレコードを Kafka パーティションにルーティングする方法。詳細については、「Kafka パーティションへのデータ移行ポリシーの指定」をご参照ください。 |
| メッセージ確認応答メカニズム | DTS が Kafka に書き込む際に使用する確認応答ポリシー。詳細については、「メッセージ確認応答メカニズム」をご参照ください。 |
| 宛先インスタンスにおけるオブジェクト名の大文字小文字の区別 | 送信先のデータベース名、テーブル名、および列名の大文字小文字の処理ポリシーです。デフォルトは [DTS デフォルトポリシー] です。詳しくは、「オブジェクト名の大文字小文字の指定」をご参照ください。 |
| ソースオブジェクト | [ソースオブジェクト] から 1 つ以上のテーブルを選択し、矢印アイコンをクリックして [選択したオブジェクト] に移動します。テーブルのみ選択できます。 |
| 選択したオブジェクト | ソーステーブルを特定のトピックにルーティングしたり、パーティションを構成したり、パーティションキーを設定したりするには、オブジェクト名マッピング機能を使用します。詳細については、「オブジェクト名マッピング機能の使用」をご参照ください。オブジェクトに対する特定の DML または DDL 操作を移行するには、[選択したオブジェクト] でオブジェクトを右クリックし、含める操作を選択します。 |
[次へ:詳細設定] をクリックします。
| パラメーター | 説明 |
|---|---|
| タスクスケジューリング用の専用クラスター | デフォルトでは、DTS はタスクを共有クラスターでスケジュールします。より高い安定性を確保するには、専用クラスターをご購入ください。詳細については、「DTS 専用クラスターとは」をご参照ください。 |
| 接続失敗時の再試行時間 | 接続失敗後に DTS が再試行する時間。有効な値:10~1,440 分。デフォルト:720。少なくとも 30 に設定してください。この時間内に DTS が再接続すると、タスクは再開されます。そうでない場合、タスクは失敗します。 説明 複数のタスクが同じソースまたは送信先を共有する場合、最後に設定された再試行時間が適用されます。再試行中も課金されます。 |
| その他の問題に対する再試行時間 | DDL または DML 操作の失敗後に DTS が再試行する時間。有効な値:1~1,440 分。デフォルト:10。少なくとも 10 に設定してください。この値は [接続失敗時の再試行時間] より小さくする必要があります。 |
| 完全データ移行の流量制御を有効にする | 完全データ移行中の読み取り/書き込みスループットを制限して、ソースへの負荷を軽減します。[ソースデータベースへの秒間クエリ数 (QPS)]、[完全データ移行の RPS]、および [完全移行のデータ移行速度 (MB/s)] を構成します。[完全データ移行] が選択されている場合にのみ利用可能です。 |
| 増分データ移行の流量制御を有効にする | 増分データ移行中のスループットを制限します。[増分データ移行の RPS] と [増分移行のデータ移行速度 (MB/s)] を構成します。[増分データ移行] が選択されている場合にのみ利用可能です。 |
| 順方向および逆方向タスクのハートビートテーブルに対する SQL 操作を削除するかどうか | DTS がソースにハートビート SQL を書き込むかどうかを制御します:- [はい]:DTS はハートビート SQL を書き込みません。コンソールでの移行遅延が高く表示されることがあります。- [いいえ]:DTS はハートビート SQL を書き込みます。ソースでの物理バックアップおよびクローニング操作が影響を受ける可能性があります。 |
| 環境タグ | DTS インスタンスを環境 (例:本番またはテスト) 別に分類するためのオプションのタグ。 |
| ETL の構成 | 抽出・変換・書き出し (ETL) 変換を適用するかどうか。コードエディタでデータ処理文を記述するには、[はい] を選択します。詳細については、「データ移行または同期タスクで ETL を設定する」をご参照ください。 |
| モニタリングとアラート | タスクの失敗または移行遅延が大きい場合にアラートを設定するかどうか。[はい] を選択し、アラートのしきい値と通知先の連絡先を設定します。「モニタリングとアラートの設定」をご参照ください。 |
ステップ 4:事前チェックの実行
このタスクの API パラメーターをプレビューするには、[次へ:タスク設定を保存して事前チェック] にカーソルを合わせ、[OpenAPI パラメーターのプレビュー] をクリックします。
[次へ:タスク設定を保存して事前チェック] をクリックします。DTS はタスクを開始する前に自動事前チェックを実行します。いずれかの項目が失敗した場合:
失敗した項目の横にある [詳細を表示] をクリックし、問題を修正してから [再度事前チェック] をクリックします。
アラート項目が重大でなく、無視できる場合は、[アラート詳細の確認] > [無視] > [OK] をクリックし、その後 [再度事前チェック] をクリックします。アラートを無視すると、データの不整合につながる可能性があります。
ステップ 5:インスタンスの購入とタスクの開始
[成功率] が 100% に達するのを待ってから、[次へ:インスタンスの購入] をクリックします。
[インスタンスの購入] ページで、以下を構成します:
| セクション | パラメーター | 説明 |
|---|---|---|
| 新しいインスタンスクラス | リソースグループ | 移行インスタンスのリソースグループ。デフォルト:デフォルトリソースグループResource Management とは |
| インスタンスクラス | 移行スループットはインスタンスクラスによって決まります。「データ移行インスタンスのインスタンスクラス」をご参照ください。 |
[Data Transmission Service (従量課金) サービス規約] を読み、チェックボックスを選択して同意します。
[購入して開始] をクリックし、確認ダイアログで [OK] をクリックします。
タスクの確認
[データ移行] ページで、タスクのステータスを確認します:
[完了]:タスクにはスキーマ移行と完全データ移行のみが含まれていました。増分移行は実行されていません。
[実行中]:タスクには増分データ移行が含まれています。タスクは継続的に実行され、自動停止しません。
オブジェクト名マッピング機能の使用
この機能を使用して、ソーステーブルのデータを特定の Kafka トピックにルーティングしたり、パーティション数を制御したり、ハッシュベースのルーティングのためのパーティションキーを設定したりします。
[選択したオブジェクト] セクションで、テーブル名にカーソルを合わせます。
右クリックして [編集] を選択します。
ダイアログボックスで、次のパラメーターを構成します。
| パラメーター | 説明 |
|---|---|
| テーブル名 | このソーステーブルからデータを受信する Kafka トピックの名前。デフォルトは 宛先データベース 設定で指定されたトピックです。 重要 送信先が ApsaraMQ for Kafka の場合、トピックはインスタンスに既に存在している必要があります。スキーマ移行が含まれる自己管理 Kafka クラスターの場合、DTS は自動的にトピックを作成します。この値を変更すると、テーブルのデータが指定されたトピックにルーティングされます。 |
| フィルター条件 | 行を選択的に移行するための SQL フィルター条件。詳細については、「フィルター条件の指定」をご参照ください。 |
| パーティション数 | ターゲットトピックのパーティション数。 |
| パーティションキー | パーティションルーティングのハッシュ値を計算するために使用される 1 つ以上の列。[Kafka パーティションへのデータ送信ポリシー] が [主キーのハッシュ値に基づいてデータを個別のパーティションに送信] に設定されている場合にのみ使用できます。[すべてのテーブルを同期] を選択する前に、これをオフにしてください。 |
[OK] をクリックします。
オブジェクト名マッピングを使用してオブジェクトの名前を変更すると、それに依存する他のオブジェクトの移行が失敗する可能性があります。