このトピックでは、Data Transmission Service (DTS) を使用して ApsaraDB RDS for MySQL インスタンスから ApsaraMQ for Kafka インスタンスにデータを同期する方法について説明します。
前提条件
ソース ApsaraDB RDS for MySQL インスタンスと宛先 ApsaraMQ for Kafka インスタンスが作成されていること。
説明ApsaraDB RDS for MySQL インスタンスの作成方法の詳細については、「ApsaraDB RDS for MySQL インスタンスの作成」をご参照ください。
ソースインスタンスと宛先インスタンスでサポートされているバージョンの詳細については、「データ同期シナリオの概要」をご参照ください。
宛先 ApsaraMQ for Kafka インスタンスで、同期されたデータを受信するためのトピックが作成されていること。詳細については、「ステップ 3: リソースの作成」トピックの「ステップ 1: トピックの作成」セクションをご参照ください。
宛先 ApsaraMQ for Kafka インスタンスの使用可能なストレージ容量が、ソース ApsaraDB RDS for MySQL インスタンスのデータの合計サイズよりも大きいこと。
使用上の注意
DTS は、ソースデータベースから宛先データベースに外部キーを同期しません。そのため、ソースデータベースのカスケード操作と削除操作は、宛先データベースに同期されません。
カテゴリ | 説明 |
ソースデータベースの制限 |
|
その他の制限 |
|
特殊なケース |
|
課金
同期タイプ | タスク構成料金 |
スキーマ同期と完全データ同期 | 無料。 |
増分同期 | 有料。詳細については、「課金概要」をご参照ください。 |
単一レコードのサイズ制限
Kafka に書き込むことができる単一レコードの最大サイズは 10 MB です。そのため、ソースデータの行のサイズが 10 MB を超える場合、DTS が Kafka にレコードを書き込むことができないため、関連する DTS タスクは中断されます。このシナリオでは、大きなフィールドを含むテーブル全体を同期するのではなく、テーブルの一部のフィールドのみを同期することをお勧めします。 DTS タスクを構成する場合は、これらの大きなフィールドのレコードを除外する必要があります。大きなフィールドを含むテーブルがタスクのオブジェクトに含まれている場合は、テーブルを削除し、テーブルをオブジェクトに再度追加してから、フィルタ条件を指定して大きなフィールドを除外する必要があります。
サポートされている同期トポロジ
一方向 1 対 1 同期
一方向 1 対多同期
一方向多対 1 同期
DTS でサポートされている同期トポロジの詳細については、「同期トポロジ」をご参照ください。
同期可能な SQL 操作
操作タイプ | SQL 文 |
DML | INSERT、UPDATE、および DELETE |
DDL |
|
手順
次のいずれかの方法を使用して [データ同期] ページに移動し、データ同期インスタンスが存在するリージョンを選択します。
DTS コンソール
DTS コンソール にログインします。
左側のナビゲーションウィンドウで、データ同期 をクリックします。
ページの左上隅で、データ同期インスタンスが存在するリージョンを選択します。
DMS コンソール
説明実際の操作は、DMS コンソールのモードとレイアウトによって異なる場合があります。詳細については、「シンプルモード」および「DMS コンソールのレイアウトとスタイルのカスタマイズ」をご参照ください。
DMS コンソール にログインします。
上部のナビゲーションバーで、[データ + AI] にポインタを移動し、
を選択します。データ同期タスク の右側にあるドロップダウンリストから、データ同期インスタンスが存在するリージョンを選択します。
タスクの作成 をクリックして、タスク構成ページに移動します。
ソースデータベースと宛先データベースを構成します。次の表にパラメータを示します。
警告ソースデータベースと宛先データベースを構成した後、ページに表示される [制限] を読んでおくことをお勧めします。そうでない場合、タスクが失敗したり、データの不整合が発生したりする可能性があります。
セクション
パラメータ
説明
該当なし
タスク名
DTS タスクの名前。 DTS はタスク名を自動的に生成します。タスクを簡単に識別できる説明的な名前を指定することをお勧めします。一意のタスク名を指定する必要はありません。
移行元データベース
[既存の接続を選択]
DTS に登録されているデータベースインスタンスを使用する場合は、ドロップダウンリストからインスタンスを選択します。 DTS は、インスタンスの次のデータベースパラメータを自動的に入力します。詳細については、「データベース接続の管理」をご参照ください。
説明DMS コンソールでは、[DMS データベースインスタンスを選択] ドロップダウンリストからデータベースインスタンスを選択できます。
インスタンスを DTS に登録できなかった場合、または DTS に登録されているインスタンスを使用する必要がない場合は、次のデータベース情報を構成する必要があります。
データベースタイプ
ソースデータベースのタイプ。 MySQL を選択します。
アクセス方法
ソースデータベースのアクセス方法。 Alibaba Cloud インスタンス を選択します。
インスタンスのリージョン
ソース ApsaraDB RDS for MySQL インスタンスが存在するリージョン。
Alibaba Cloud アカウント間でデータを複製
Alibaba Cloud アカウント間でデータを同期するかどうかを指定します。この例では、× が選択されています。
RDS インスタンス ID
ソース ApsaraDB RDS for MySQL インスタンスの ID。
データベースアカウント
ソース ApsaraDB RDS for MySQL インスタンスのデータベースアカウント。アカウントには、同期対象のオブジェクトに対する読み取り権限が必要です。
データベースのパスワード
データベースへのアクセスに使用するパスワード。
暗号化
データベースへの接続を暗号化するかどうかを指定します。ビジネス要件に基づいて、[非暗号化] または [SSL 暗号化] を選択できます。このパラメータを [SSL 暗号化] に設定する場合は、DTS タスクを構成する前に、ApsaraDB RDS for MySQL インスタンスで SSL 暗号化を有効にする必要があります。詳細については、「クラウド証明書を使用して SSL 暗号化を有効にする」をご参照ください。
移行先データベース
[既存の接続を選択]
DTS に登録されているデータベースインスタンスを使用する場合は、ドロップダウンリストからインスタンスを選択します。 DTS は、インスタンスの次のデータベースパラメータを自動的に入力します。詳細については、「データベース接続の管理」をご参照ください。
説明DMS コンソールでは、[DMS データベースインスタンスを選択] ドロップダウンリストからデータベースインスタンスを選択できます。
インスタンスを DTS に登録できなかった場合、または DTS に登録されているインスタンスを使用する必要がない場合は、次のデータベース情報を構成する必要があります。
データベースタイプ
宛先データベースのタイプ。 Kafka を選択します。
アクセス方法
ターゲットデータベースのアクセス方法。 Alibaba Cloud インスタンス を選択します。
インスタンスのリージョン
宛先 ApsaraMQ for Kafka インスタンスが存在するリージョン。
Kafka インスタンス ID
宛先 ApsaraMQ for Kafka インスタンスの ID。
暗号化
Kafka クラスタへの接続を暗号化するかどうかを指定します。ビジネスとセキュリティの要件に応じて、非暗号化 または SCRAM-SHA-256 を選択します。
トピック
同期されたデータを受信するために使用されるトピック。ドロップダウンリストからトピックを選択します。
DDL 情報を格納するトピック
DDL 情報を格納するために使用されるトピック。ドロップダウンリストからトピックを選択します。このパラメータを構成しない場合、DDL 情報は トピック パラメータで指定されたトピックに格納されます。
Kafka スキーマレジストリの使用
Kafka Schema Registry を使用するかどうかを指定します。 Kafka Schema Registry は、メタデータのサービス層を提供します。 Avro スキーマを格納および取得するための RESTful API を提供します。有効な値:
×。
○。このパラメータで [はい] を選択した場合は、Avro スキーマ用に Kafka Schema Registry に登録されている URL または IP アドレスを入力する必要があります。
ページの下部にある 接続をテストして続行 をクリックします。
説明DTS サーバーの CIDR ブロックをソースデータベースと宛先データベースのセキュリティ設定に自動または手動で追加して、DTS サーバーからのアクセスを許可できることを確認してください。詳細については、「DTS サーバーの CIDR ブロックの追加」をご参照ください。
ソースデータベースまたは宛先データベースが自己管理データベースであり、その アクセス方法 が Alibaba Cloud インスタンス に設定されていない場合は、DTS サーバーの CIDR ブロック ダイアログボックスの 接続テスト をクリックします。
同期するオブジェクトを構成します。
オブジェクト設定 ステップで、同期するオブジェクトを構成します。
パラメータ
説明
同期タイプ
同期タイプ。デフォルトでは、[増分同期] が選択されています。 [スキーマ同期] と [完全データ同期] も選択する必要があります。事前チェックが完了すると、DTS は選択したオブジェクトの既存データをソースデータベースから宛先クラスタに同期します。既存データは、後続の増分同期の基礎となります。
説明宛先データベースが ApsaraMQ for Kafka インスタンスの場合、[スキーマ同期] を選択することはできません。
競合するテーブルの処理モード
エラーの事前チェックと報告: 宛先データベースにソースデータベースのテーブルと同じ名前のテーブルが含まれているかどうかを確認します。ソースデータベースと宛先データベースに同じテーブル名のテーブルが含まれていない場合、事前チェックは合格です。そうでない場合、事前チェック中にエラーが返され、データ同期タスクを開始できません。
説明ソースデータベースと宛先データベースに同じ名前のテーブルが含まれており、宛先データベースのテーブルを削除または名前変更できない場合は、オブジェクト名マッピング機能を使用して、宛先データベースに同期されるテーブルの名前を変更できます。詳細については、「オブジェクト名のマッピング」をご参照ください。
エラーを無視して続行: ソースデータベースと宛先データベースの同じテーブル名の事前チェックをスキップします。
警告エラーを無視して続行 を選択すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。
ソースデータベースと宛先データベースのスキーマが同じで、宛先データベースのデータレコードのプライマリキー値または一意キー値がソースデータベースのデータレコードと同じである場合:
完全データ同期中、DTS はデータレコードを宛先データベースに同期しません。宛先データベースの既存のデータレコードは保持されます。
増分同期中、DTS はデータレコードを宛先データベースに同期します。宛先データベースの既存のデータレコードは上書きされます。
ソースデータベースと宛先データベースのスキーマが異なる場合、データの初期化に失敗する可能性があります。この場合、一部の列のみが同期されるか、データ同期タスクが失敗します。注意して進めてください。
[Kafka のデータ形式]
ApsaraMQ for Kafka インスタンスにデータを格納する形式。
[DTS Avro] を選択すると、DTS Avro のスキーマ定義に基づいてデータが解析されます。詳細については、「GitHub」をご参照ください。
[Canal JSON] を選択すると、データは Canal JSON 形式で保存されます。関連パラメーターと例の詳細については、「Kafka クラスターのデータ形式」Topic の Canal JSON セクションをご参照ください。
[Kafka データ圧縮形式]
Kafka 圧縮データの圧縮形式。ビジネス要件に基づいて圧縮形式を選択します。有効な値:
[LZ4] (デフォルト): 圧縮率が低く、圧縮速度が速い。
[GZIP]: 圧縮率が高く、圧縮速度が遅い。
説明GZIP 圧縮は大量の CPU リソースを消費します。
[Snappy]: 中程度の圧縮率と中程度の圧縮速度。
[Kafka パーティションへのデータ送信ポリシー]
Kafka パーティションにデータを同期するためのポリシー。ビジネス要件に基づいてポリシーを選択します。詳細については、「Kafka パーティションへのデータ移行ポリシーの指定」をご参照ください。
[メッセージ確認応答メカニズム]
ビジネス要件に基づいてこのパラメータを指定します。詳細については、「メッセージ確認応答メカニズム」をご参照ください。
移行先インスタンスでのオブジェクト名の大文字化
宛先インスタンスのデータベース名、テーブル名、および列名の大文字と小文字の区別。デフォルトでは、[DTS デフォルトポリシー] が選択されています。オブジェクト名の大文字と小文字の区別がソースデータベースまたは宛先データベースと一致するように、他のオプションを選択できます。詳細については、「宛先インスタンスのオブジェクト名の大文字と小文字の区別の指定」をご参照ください。
ソースオブジェクト
ソースオブジェクト セクションから 1 つ以上のオブジェクトを選択し、
アイコンをクリックして、選択中のオブジェクト セクションにオブジェクトを追加します。
説明同期対象のオブジェクトとしてテーブルを選択できます。
選択中のオブジェクト
この例では、このパラメータを構成する必要はありません。オブジェクト名マッピング機能を使用して、ソーステーブルのデータの同期先のトピックの名前、トピックのパーティション数、パーティションキーなど、宛先 ApsaraMQ for Kafka インスタンスに関する情報を指定できます。詳細については、このトピックの「オブジェクト名マッピング機能の使用」セクションをご参照ください。
説明特定のデータベースまたはテーブルで実行される SQL 操作を選択するには、選択中のオブジェクト セクションでオブジェクトを右クリックします。表示されるダイアログボックスで、同期する SQL 操作を選択します。
オブジェクト名マッピング機能を使用してオブジェクトの名前を変更すると、そのオブジェクトに依存する他のオブジェクトが同期されない場合があります。
次へ:詳細設定 をクリックして、詳細設定を構成します。
パラメータ
説明
タスクのスケジュールに使用する専用クラスターの選択
デフォルトでは、専用クラスタを指定しない場合、DTS は共有クラスタにタスクをスケジュールします。データ同期タスクの安定性を向上させるには、専用クラスタを購入します。詳細については、「DTS 専用クラスタとは」をご参照ください。
失敗した接続の再試行時間
接続失敗時の再試行時間の範囲。データ同期タスクの開始後にソースデータベースまたは宛先データベースへの接続に失敗した場合、DTS は指定された時間範囲内で直ちに接続を再試行します。有効な値: 10 ~ 1440。単位: 分。デフォルト値: 720。このパラメータは 30 より大きい値に設定することをお勧めします。 DTS が指定された時間範囲内にソースデータベースと宛先データベースに再接続すると、DTS はデータ同期タスクを再開します。そうでない場合、データ同期タスクは失敗します。
説明ソースデータベースまたは宛先データベースが同じ複数のデータ同期タスクに異なる再試行時間の範囲を指定した場合、最も短い再試行時間の範囲が優先されます。
DTS が接続を再試行すると、DTS インスタンスの料金が発生します。ビジネス要件に基づいて再試行時間の範囲を指定することをお勧めします。また、ソースインスタンスと宛先インスタンスが解放された後、できるだけ早く DTS インスタンスを解放することもできます。
移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。
その他の問題の再試行時間の範囲。たとえば、データ同期タスクの開始後に DDL 操作または DML 操作の実行に失敗した場合、DTS は指定された時間範囲内で直ちに操作を再試行します。有効な値: 1 ~ 1440。単位: 分。デフォルト値: 10。このパラメータは 10 より大きい値に設定することをお勧めします。失敗した操作が指定された時間範囲内で正常に実行されると、DTS はデータ同期タスクを再開します。そうでない場合、データ同期タスクは失敗します。
重要移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。 パラメータの値は、失敗した接続の再試行時間 パラメータの値よりも小さくなければなりません。
完全同期レートを制限するかどうか
完全データ同期中、DTS はソースデータベースと宛先データベースの読み取りリソースと書き込みリソースを使用します。これにより、データベースサーバーの負荷が増加する可能性があります。完全データ同期タスクの 1 秒あたりのソースデータベースのクエリ率 QPS、1 秒あたりの完全移行の行数 RPS、および 1 秒あたりの完全移行データ量 (MB) BPS パラメータを構成して、宛先データベースサーバーの負荷を軽減できます。
説明同期タイプ パラメータで 完全データ同期 が選択されている場合にのみ、このパラメータを構成できます。
増分同期率を制限するかどうか
増分同期のスロットリングを有効にするかどうかを指定します。ビジネス要件に基づいて、増分同期のスロットリングを有効にできます。スロットリングを構成するには、1 秒あたりの増分同期の行数 RPS と 1 秒あたりの増分同期データ量 (MB) BPS パラメータを構成する必要があります。これにより、宛先データベースサーバーの負荷が軽減されます。
[順方向タスクと逆方向タスクのハートビートテーブルの SQL 操作を削除するかどうか]
DTS インスタンスの実行中に、ハートビートテーブルの SQL 操作をソースデータベースに書き込むかどうかを指定します。有効な値:
[はい]: ハートビートテーブルの SQL 操作を書き込みません。この場合、DTS インスタンスのレイテンシが表示される場合があります。
[いいえ]: ハートビートテーブルの SQL 操作を書き込みます。この場合、ソースデータベースの物理バックアップやクローニングなどの機能が影響を受ける可能性があります。
環境タグ
DTS インスタンスを識別するために使用される環境タグ。ビジネス要件に基づいて環境タグを選択できます。この例では、このパラメータを構成する必要はありません。
ETL の設定
抽出、変換、ロード (ETL) 機能を有効にするかどうかを指定します。詳細については、「ETL とは」をご参照ください。有効な値:
[はい]: ETL 機能を構成します。コードエディタにデータ処理文を入力できます。詳細については、「データ移行タスクまたはデータ同期タスクでの ETL の構成」をご参照ください。
[いいえ]: ETL 機能を構成しません。
[監視とアラート]
データ同期タスクのアラートを構成するかどうかを指定します。タスクが失敗した場合、または同期レイテンシが指定されたしきい値を超えた場合、アラート連絡先に通知が送信されます。有効な値:
[いいえ]: アラートを有効にしません。
[はい]: アラートを構成します。この場合、アラートしきい値と アラート通知設定 も構成する必要があります。詳細については、「監視とアラートの構成」トピックの「DTS タスクの作成時に監視とアラートを構成する」セクションをご参照ください。
タスク設定を保存し、事前チェックを実行します。
関連する API 操作を呼び出して DTS タスクを構成するときに指定するパラメータを表示するには、次:タスク設定の保存と事前チェック にポインタを移動し、OpenAPI パラメーターのプレビュー をクリックします。
パラメータを表示する必要がない場合、またはすでに表示している場合は、ページの下部にある 次:タスク設定の保存と事前チェック をクリックします。
説明データ同期タスクを開始する前に、DTS は事前チェックを実行します。タスクが事前チェックに合格した後にのみ、データ同期タスクを開始できます。
データ同期タスクが事前チェックに失敗した場合は、失敗した各項目の横にある [詳細の表示] をクリックします。チェック結果に基づいて原因を分析した後、問題をトラブルシューティングします。その後、事前チェックを再実行します。
事前チェック中に項目のアラートがトリガーされた場合:
アラート項目を無視できない場合は、失敗した項目の横にある [詳細の表示] をクリックして、問題をトラブルシューティングします。その後、事前チェックを再実行します。
アラート項目を無視できる場合は、[アラート詳細の確認] をクリックします。 [詳細の表示] ダイアログボックスで、[無視] をクリックします。表示されるメッセージで、[OK] をクリックします。その後、[再チェック] をクリックして、事前チェックを再実行します。アラート項目を無視すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。
インスタンスを購入します。
[成功率] が [100%] になるまで待ちます。その後、[次へ: インスタンスの購入] をクリックします。
[購入] ページで、データ同期インスタンスの [課金方法] パラメータと [インスタンスクラス] パラメータを構成します。次の表にパラメータを示します。
セクション
パラメータ
説明
新しいインスタンスクラス
課金方法
サブスクリプション: データ同期インスタンスの作成時にサブスクリプション料金を支払います。サブスクリプション課金方法は、長期使用の場合、従量課金方法よりも費用対効果が高くなります。
従量課金: 従量課金インスタンスは 1 時間単位で課金されます。従量課金方法は、短期使用に適しています。従量課金データ同期インスタンスが不要になった場合は、インスタンスを解放してコストを削減できます。
リソースグループ設定
データ同期インスタンスが属するリソースグループ。デフォルト値: [デフォルトのリソースグループ]。詳細については、「リソース管理とは」をご参照ください。
インスタンスクラス
DTS では、同期速度が異なるインスタンスクラスが提供されています。ビジネス要件に基づいてインスタンスクラスを選択できます。詳細については、「データ同期インスタンスのインスタンスクラス」をご参照ください。
サブスクリプション期間
サブスクリプション課金方式を選択した場合、サブスクリプション期間と作成するデータ同期インスタンスの数を指定します。 サブスクリプション期間は、1~9か月、1年、2年、3年、または5年です。
説明このパラメーターは、サブスクリプション 課金方式を選択した場合にのみ使用できます。
[Data Transmission Service(従量課金制)サービス利用規約] をよく読んで選択してください。
[購入して開始] をクリックします。表示されるダイアログボックスで、OK をクリックします。
タスクリストでタスクの進捗状況を確認できます。
オブジェクト名マッピング機能を使用する
選択中のオブジェクト セクションで、Topic 名にポインターを移動します。
編集 を右クリックします。
テーブルの編集 ダイアログボックスで、次の表に示すパラメーターを設定します。
パラメーター
説明
テーブル名
ソーステーブルのデータが同期される Topic の名前。デフォルトでは、このパラメーターは ソースデータベースとターゲットデータベースの設定 ステップの 移行先データベース セクションにある トピック の値に設定されています。
重要ターゲットデータベースが ApsaraMQ for Kafka インスタンスの場合、Topic はターゲットの ApsaraMQ for Kafka インスタンスに存在する必要があります。存在しない場合、データ同期は失敗します。ターゲットが自己管理 Kafka クラスタであり、スキーマ同期がデータ同期インスタンスに含まれている場合、DTS はターゲットデータベースに指定された Topic を作成しようとします。
ソーステーブルの テーブル名 パラメーターの値をターゲットデータベースの Topic 名に変更すると、テーブルのデータは指定された Topic に書き込まれます。
フィルタリング条件
詳細については、「フィルター条件を指定する」をご参照ください。
パーティション数
データが同期される Topic のパーティション数。
パーティションキー
Kafka パーティションへのデータ転送ポリシー パラメーターを 主キーのハッシュ値に基づいて、データを個別のパーティションに転送 に設定した場合、このパラメーターを設定して、1 つ以上の列をパーティションキーとして指定し、ハッシュ値を計算します。 DTS は、計算されたハッシュ値に基づいて、ターゲット Topic の各パーティションに異なる行を配信します。
説明列の パーティションキー を選択するには、まず すべてのテーブルを同期 をオフにする必要があります。
[OK] をクリックします。
よくある質問
[Kafka データ圧縮形式] を変更できますか。
はい。詳細については、「[同期対象オブジェクトの変更]」をご参照ください。
[メッセージ受信確認メカニズム] を変更できますか。
はい。詳細については、「[同期対象オブジェクトの変更]」をご参照ください。