Data Transmission Service (DTS) は、MongoDB から Kafka クラスターへのデータ同期をサポートしています。このトピックでは、ソースデータベースとしてレプリカセットアーキテクチャを使用する MongoDB インスタンスと、ターゲットデータベースとして Message Queue for Apache Kafka インスタンスを使用して同期操作を実行する手順について説明します。
前提条件
宛先の Message Queue for Apache Kafka インスタンスを作成済みであること。
説明サポートされているソースデータベースとターゲットデータベースのバージョンについては、「同期ソリューションの概要」をご参照ください。
宛先の Message Queue for Apache Kafka インスタンスでデータを受信するための トピックを作成済みであること。
ソースデータベースが ApsaraDB for MongoDB シャードクラスターの場合、すべてのシャードノードのエンドポイントを申請する必要があります。シャードクラスターインスタンス内のシャードノードは、同じアカウントパスワードとエンドポイントを共有する必要があります。エンドポイントの申請方法の詳細については、「シャードのエンドポイントを申請する」をご参照ください。
考慮事項
タイプ | 説明 |
ソースデータベースの制限 |
|
その他の制限 |
|
課金
同期タイプ | タスク設定料金 |
完全データ同期 | 無料です。 |
増分データ同期 | 課金されます。詳細については、「課金の概要」をご参照ください。 |
同期タイプ
同期タイプ | 説明 |
完全同期 | ソースの ApsaraDB for MongoDB の同期オブジェクトのすべての既存データをターゲットの Kafka インスタンスに同期します。 説明 DATABASE と COLLECTION の完全同期がサポートされています。 |
増分同期 | 完全同期に加えて、ソースの ApsaraDB for MongoDB からターゲットの Kafka インスタンスに増分更新を同期できます。 Oplog の使用増分同期は、タスク開始後に作成されたデータベースをサポートしません。次の増分更新がサポートされています:
ChangeStream の使用次の増分更新がサポートされています:
|
データベースアカウントの権限
データベース | 必要な権限 | アカウントの作成と権限付与 |
ソース ApsaraDB for MongoDB | 同期するデータベース、admin データベース、および local データベースに対する読み取り権限。 |
手順
次のいずれかの方法でデータ同期ページに移動し、データ同期インスタンスが存在するリージョンを選択します。
DTS コンソール
DTS コンソールにログインします。
左側のナビゲーションウィンドウで、データ同期 をクリックします。
ページの左上隅で、データ同期タスクが存在するリージョンを選択します。
DMS コンソール
説明実際の操作は、DMS コンソールのモードとレイアウトによって異なる場合があります。詳細については、「シンプルモード」および「DMS コンソールのレイアウトとスタイルをカスタマイズする」をご参照ください。
DMS コンソールにログインします。
上部のナビゲーションバーで、[Data + AI] にポインターを合わせ、 を選択します。
データ同期タスク の右側にあるドロップダウンリストから、データ同期インスタンスが存在するリージョンを選択します。
タスクの作成 をクリックして、タスク設定ページに移動します。
ソースデータベースとターゲットデータベースを設定します。次の表にパラメーターを示します。
カテゴリ
設定
説明
なし
タスク名
DTS タスクの名前。DTS は自動的にタスク名を生成します。タスクを簡単に識別できるわかりやすい名前を指定することをお勧めします。一意のタスク名を指定する必要はありません。
移行元データベース
既存の接続情報の選択
DTS に登録されているデータベースインスタンスを使用する場合は、ドロップダウンリストからインスタンスを選択します。DTS は、インスタンスの次のデータベースパラメーターを自動的に入力します。詳細については、「データベース接続の管理」をご参照ください。
説明DMS コンソールでは、[DMS データベースインスタンスを選択] ドロップダウンリストからデータベースインスタンスを選択できます。
DTS へのインスタンスの登録に失敗した場合、または DTS に登録されているインスタンスを使用する必要がない場合は、次のデータベース情報を設定する必要があります。
データベースタイプ
MongoDB を選択します。
アクセス方法
Alibaba Cloud インスタンスを選択します。
インスタンスのリージョン
ソース ApsaraDB for MongoDB インスタンスのリージョンを選択します。
Alibaba Cloud アカウント間でデータを複製
この例では、現在の Alibaba Cloud アカウントのデータベースが使用されます。×を選択します。
アーキテクチャ
この例では レプリカセットを選択します。
説明ソース ApsaraDB for MongoDB インスタンスが シャードクラスターの場合は、Shardアカウントと Shardパスワードも入力する必要があります。
移行方法
ソースデータベースから増分データを同期するために使用されるメソッド。ビジネス要件に基づいてメソッドを選択します。有効な値:
Oplog (推奨):
このオプションは、ソースデータベースで oplog 機能が有効になっている場合に使用できます。
説明デフォルトでは、oplog 機能は自己管理 MongoDB データベースと ApsaraDB for MongoDB インスタンスの両方で有効になっています。この機能により、ログのプル速度が速いため、低遅延で増分データを同期できます。したがって、[移行方法] パラメーターには Oplog を選択することをお勧めします。
ChangeStream:
このオプションは、ソースデータベースで変更ストリームが有効になっている場合に使用できます。詳細については、「Change Streams」をご参照ください。
説明ソースデータベースが非エラスティック Amazon DocumentDB クラスターの場合、[移行方法] パラメーターは ChangeStream にのみ設定できます。
アーキテクチャ パラメーターに シャードクラスター を選択した場合、Shardアカウント および Shardパスワード パラメーターを設定する必要はありません。
インスタンス ID
ソース ApsaraDB for MongoDB のインスタンス ID を選択します。
認証データベース
ソース ApsaraDB for MongoDB インスタンスのデータベースアカウントのデータベース名を入力します。変更していない場合、デフォルトは admin です。
データベースアカウント
ソース ApsaraDB for MongoDB のデータベースアカウントを入力します。権限要件については、「データベースアカウントに必要な権限」をご参照ください。
データベースのパスワード
データベースへのアクセスに使用するパスワード。
暗号化
ソースデータベースへの接続を暗号化するかどうかを指定します。ビジネス要件に基づいて、非暗号化、SSL 暗号化、または Mongo Atlas SSL を選択できます。暗号化 パラメーターで使用できるオプションは、アクセス方法 および アーキテクチャ パラメーターに選択された値によって決まります。DTS コンソールに表示されるオプションが優先されます。
説明[アーキテクチャ] パラメーターが [シャードクラスター] に設定され、ApsaraDB for MongoDB データベースの [移行方法] パラメーターが Oplog に設定されている場合、[暗号化] パラメーター SSL 暗号化 は使用できません。
ソースデータベースが レプリカセット アーキテクチャを使用する自己管理 MongoDB データベースであり、アクセス方法 パラメーターが Alibaba Cloud インスタンス に設定されておらず、[暗号化] パラメーターが SSL 暗号化 に設定されている場合、認証局 (CA) 証明書をアップロードしてソースデータベースへの接続を検証できます。
移行先データベース
既存の接続情報の選択
DTS に登録されているデータベースインスタンスを使用する場合は、ドロップダウンリストからインスタンスを選択します。DTS は、インスタンスの次のデータベースパラメーターを自動的に入力します。詳細については、「データベース接続の管理」をご参照ください。
説明DMS コンソールでは、[DMS データベースインスタンスを選択] ドロップダウンリストからデータベースインスタンスを選択できます。
DTS へのインスタンスの登録に失敗した場合、または DTS に登録されているインスタンスを使用する必要がない場合は、次のデータベース情報を設定する必要があります。
データベースタイプ
Kafka を選択します。
アクセス方法
Alibaba Cloud インスタンスを選択します。
インスタンスのリージョン
宛先 Kafka インスタンスが存在するリージョンを選択します。
Kafka インスタンス ID
宛先 Kafka インスタンスの ID を選択します。
暗号化
ビジネスおよびセキュリティ要件に基づいて、非暗号化または SCRAM-SHA-256 を選択します。
トピック
ドロップダウンリストから、データを受信するために使用するトピックを選択します。
Kafka スキーマレジストリの使用
Kafka スキーマレジストリは、Avro スキーマを保存および取得するための RESTful インターフェイスを提供するメタデータサービスレイヤーです。
×: Kafka スキーマレジストリを使用しません。
○: Kafka スキーマレジストリを使用します。Avro スキーマ用に Kafka スキーマレジストリに登録されている URL または IP アドレスを入力する必要があります。
ページの下部にある 接続をテストして続行 をクリックします。
説明DTS サーバーからのアクセスを許可するために、DTS サーバーの CIDR ブロックがソースデータベースとターゲットデータベースのセキュリティ設定に自動または手動で追加できることを確認してください。詳細については、「DTS サーバーの IP アドレスをホワイトリストに追加する」をご参照ください。
ソースデータベースまたはターゲットデータベースが自己管理データベースであり、その アクセス方法 が Alibaba Cloud インスタンス に設定されていない場合、DTS サーバーの CIDR ブロック ダイアログボックスで 接続テスト をクリックします。
同期するオブジェクトを設定します。
オブジェクト設定 ステップで、同期するオブジェクトを設定します。
設定
説明
同期タイプ
デフォルトでは、[増分データ同期] が選択されています。[完全データ同期] のみを選択できます。[スキーマ同期] は選択できません。事前チェックが完了すると、DTS は選択したオブジェクトの既存データをソースデータベースからターゲットデータベースに同期します。既存データは、後続の増分同期の基礎となります。
競合するテーブルの処理モード
エラーの事前チェックと報告: ターゲットデータベースにソースデータベースのコレクションと同じ名前のコレクションが含まれているかどうかを確認します。ソースデータベースとターゲットデータベースに同じコレクション名のコレクションが含まれていない場合、事前チェックは合格します。それ以外の場合、事前チェック中にエラーが返され、データ同期インスタンスを開始できません。
説明ソースデータベースとターゲットデータベースに同じ名前のコレクションがあり、ターゲットデータベースのコレクションを削除または名前変更できない場合は、オブジェクト名マッピング機能を使用して、ターゲットデータベースに同期されるコレクションの名前を変更できます。詳細については、「同期するオブジェクトの名前を変更する」をご参照ください。
エラーを無視して続行: ソースデータベースとターゲットデータベースの同じコレクション名の事前チェックをスキップします。
警告エラーを無視して続行 を選択すると、データの不整合が発生し、ビジネスに潜在的なリスクが生じる可能性があります。
ターゲットデータベースのデータレコードがソースデータベースのデータレコードと同じプライマリキー値または一意キー値を持つ場合、DTS はそのデータレコードをターゲットデータベースに同期しません。ターゲットデータベースの既存のデータレコードが保持されます。
データの初期化に失敗したり、特定の列のみが同期されたり、データ同期インスタンスが失敗したりする可能性があります。
Kafka のデータ形式
Canal JSON のみがサポートされています。
説明Kafka が受信するデータは、3 つのシナリオに分類できます。
Kafka 圧縮形式
Kafka 圧縮データの圧縮形式。ビジネス要件に基づいて圧縮形式を選択します。有効な値:
LZ4 (デフォルト): 低圧縮率、高圧縮速度。
GZIP: 高圧縮率、低圧縮速度。
説明GZIP 圧縮は大量の CPU リソースを消費します。
Snappy: 中程度の圧縮率、中程度の圧縮速度。
Kafka パーティションへのデータ転送ポリシー
要件に基づいてポリシーを選択します。
メッセージ肯定応答メカニズム
要件に基づいてメッセージ確認メカニズムを選択します。
DDL 情報を格納するトピック
移行先インスタンスでのオブジェクト名の大文字化
宛先インスタンスのデータベース名とコレクション名の大文字/小文字。デフォルトでは、[DTS デフォルトポリシー] が選択されています。他のオプションを選択して、オブジェクト名の大文字/小文字がソースまたはターゲットデータベースのオブジェクト名のデフォルトの大文字/小文字と一致するようにすることができます。詳細については、「宛先インスタンスのオブジェクト名の大文字/小文字を指定する」をご参照ください。
ソースオブジェクト
ソースオブジェクト セクションから 1 つ以上のオブジェクトを選択し、
アイコンをクリックしてオブジェクトを 選択中のオブジェクト セクションに追加します。説明コレクションレベルで同期するオブジェクトを選択できます。
選択中のオブジェクト
この例では、追加の設定は必要ありません。
マッピング機能を使用して、ソースデータベースのコレクションの宛先 Kafka インスタンスのマッピング情報を設定できます。
次へ:詳細設定 をクリックして詳細設定を行います。
設定
説明
タスクのスケジュールに使用する専用クラスターの選択
専用クラスターを指定しない場合、DTS はデフォルトでタスクを共有クラスターにスケジュールします。データ同期インスタンスの安定性を向上させたい場合は、専用クラスターを購入してください。詳細については、「DTS 専用クラスターとは」をご参照ください。
失敗した接続の再試行時間
失敗した接続のリトライ時間範囲。データ同期タスクの開始後にソースまたはターゲットデータベースへの接続に失敗した場合、DTS は時間範囲内にすぐに接続をリトライします。有効な値: 10~1440。単位: 分。デフォルト値: 720。このパラメーターを 30 より大きい値に設定することをお勧めします。DTS が指定された時間範囲内にソースデータベースとターゲットデータベースに再接続すると、DTS はデータ同期タスクを再開します。それ以外の場合、データ同期タスクは失敗します。
説明同じソースまたはターゲットデータベースを持つ複数のデータ同期タスクに異なるリトライ時間範囲を指定した場合、最も短いリトライ時間範囲が優先されます。
DTS が接続をリトライすると、DTS インスタンスに対して課金されます。ビジネス要件に基づいてリトライ時間範囲を指定することをお勧めします。ソースインスタンスと宛先インスタンスがリリースされた後、できるだけ早く DTS インスタンスをリリースすることもできます。
移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。
その他の問題のリトライ時間範囲。たとえば、データ同期タスクの開始後に DDL または DML 操作が失敗した場合、DTS は時間範囲内にすぐに操作をリトライします。有効な値: 1~1440。単位: 分。デフォルト値: 10。このパラメーターを 10 より大きい値に設定することをお勧めします。失敗した操作が指定された時間範囲内に正常に実行されると、DTS はデータ同期タスクを再開します。それ以外の場合、データ同期タスクは失敗します。
重要移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。 パラメーターの値は、失敗した接続の再試行時間 パラメーターの値より小さくする必要があります。
更新操作後にドキュメント全体を取得するかどうか
増分データ同期中に、更新操作に対応するドキュメントの完全なデータを宛先に同期するかどうかを指定します。
説明この設定項目は、移行方法 が ChangeStream に設定されている場合にのみ使用できます。
○: 更新されたフィールドを含むドキュメントの完全なデータを同期します。
重要この機能は MongoDB のネイティブ機能に基づいており、ソースデータベースの負荷を増加させる可能性があります。これにより、増分データ収集の速度が低下し、同期インスタンスで遅延が発生する可能性があります。
DTS が完全なデータを取得できない場合、更新されたフィールドのデータのみが同期されます。
×: 更新されたフィールドのデータのみを同期します。
完全同期レートを制限するかどうか
完全データ同期中、DTS はソースデータベースとターゲットデータベースの読み取りおよび書き込みリソースを使用します。これにより、データベースサーバーの負荷が増加する可能性があります。完全データ同期タスクの 1 秒あたりのソースデータベースのクエリ率 QPS、1 秒あたりの完全移行の行数 RPS、および 1 秒あたりの完全移行データ量 (MB) BPS パラメーターを設定して、ターゲットデータベースサーバーの負荷を軽減できます。
説明このパラメーターは、同期タイプ パラメーターに 完全データ同期 が選択されている場合にのみ設定できます。
同期するデータのうち、同一テーブル内のプライマリキー_id のデータ型が一意かどうか
同期するデータの単一コレクション内のプライマリキー _id のデータ型は 1 つのみ。
説明このパラメーターは、同期タイプ パラメーターに 完全データ同期 が選択されている場合にのみ表示されます。
○: 完全データ同期中、DTS はソースデータベースから同期するデータのプライマリキーのデータ型をスキャンしません。
×: 完全データ同期中、DTS はソースデータベースから同期するデータのプライマリキーのデータ型をスキャンします。
増分同期率を制限するかどうか
増分データ同期のスロットリングを有効にするかどうかを指定します。ビジネス要件に基づいて、増分データ同期のスロットリングを有効にできます。スロットリングを設定するには、1 秒あたりの増分同期の行数 RPS および 1 秒あたりの増分同期データ量 (MB) BPS パラメーターを設定する必要があります。これにより、ターゲットデータベースサーバーの負荷が軽減されます。
環境タグ
要件に基づいてインスタンスを識別するための環境タグを選択できます。この例では、タグを選択する必要はありません。
ETL の設定
抽出・変換・書き出し (ETL) 機能を有効にするかどうかを指定します。詳細については、「ETL とは」をご参照ください。有効な値:
はい: ETL 機能を設定します。コードエディタにデータ処理ステートメントを入力できます。詳細については、「データ移行またはデータ同期タスクで ETL を設定する」をご参照ください。
いいえ: ETL 機能を設定しません。
監視アラート
データ同期インスタンスのアラートを設定するかどうかを指定します。タスクが失敗した場合、または同期遅延が指定されたしきい値を超えた場合、アラート連絡先は通知を受け取ります。有効な値:
いいえ: アラートを有効にしません。
はい: アラートを設定します。この場合、アラートのしきい値と アラート通知設定も設定する必要があります。詳細については、「モニタリングとアラートの設定」トピックの「DTS タスク作成時のモニタリングとアラートの設定」セクションをご参照ください。
タスク設定を保存し、事前チェックを実行します。
関連する API 操作を呼び出して DTS タスクを設定する際に指定するパラメーターを表示するには、次:タスク設定の保存と事前チェック にポインターを合わせ、OpenAPI パラメーターのプレビュー をクリックします。
パラメーターを表示する必要がない場合、または表示済みの場合、ページの下部にある 次:タスク設定の保存と事前チェック をクリックします。
説明データ同期タスクを開始する前に、DTS は事前チェックを実行します。タスクが事前チェックに合格した後にのみ、データ同期タスクを開始できます。
データ同期タスクが事前チェックに失敗した場合は、失敗した各項目の横にある [詳細の表示] をクリックします。チェック結果に基づいて原因を分析した後、問題をトラブルシューティングします。次に、事前チェックを再実行します。
事前チェック中に項目のアラートがトリガーされた場合:
アラート項目を無視できない場合は、失敗した項目の横にある [詳細の表示] をクリックして問題をトラブルシューティングします。次に、事前チェックを再度実行します。
アラート項目を無視できる場合は、[アラート詳細の確認] をクリックします。[詳細の表示] ダイアログボックスで、[無視] をクリックします。表示されるメッセージで、[OK] をクリックします。次に、[再度事前チェック] をクリックして事前チェックを再度実行します。アラート項目を無視すると、データの不整合が発生し、ビジネスに潜在的なリスクが生じる可能性があります。
インスタンスを購入します。
[成功率] が [100%] になるまで待ちます。次に、[次へ: インスタンスの購入] をクリックします。
[購入] ページで、データ同期タスクの課金方法とインスタンスクラスのパラメーターを設定します。次の表にパラメーターを示します。
セクション
パラメーター
説明
新しいインスタンスクラス
課金方法
サブスクリプション: データ同期インスタンスを作成するときにサブスクリプションの料金を支払います。サブスクリプション課金方法は、長期使用の場合、従量課金方法よりも費用対効果が高くなります。
従量課金: 従量課金インスタンスは時間単位で課金されます。従量課金方法は短期使用に適しています。従量課金データ同期インスタンスが不要になった場合は、インスタンスをリリースしてコストを削減できます。
リソースグループ設定
データ同期インスタンスが属するリソースグループ。デフォルト値: デフォルトのリソースグループ。詳細については、「Resource Management とは」をご参照ください。
インスタンスクラス
DTS は、同期速度が異なるインスタンスクラスを提供します。ビジネス要件に基づいてインスタンスクラスを選択できます。詳細については、「データ同期インスタンスのインスタンスクラス」をご参照ください。
サブスクリプション期間
サブスクリプション課金方法を選択した場合は、サブスクリプション期間と作成するデータ同期インスタンスの数を指定します。サブスクリプション期間は、1~9 か月、1 年、2 年、3 年、または 5 年です。
説明このパラメーターは、サブスクリプション 課金方法を選択した場合にのみ使用できます。
[Data Transmission Service (従量課金) サービス規約] を読み、選択します。
[購入して開始] をクリックします。表示されるダイアログボックスで、OK をクリックします。
タスクリストでタスクの進行状況を表示できます。
マッピング情報
選択中のオブジェクト エリアで、コレクションレベルの宛先 Topic 名にマウスポインターを合わせます。
宛先 Topic 名の横にある 編集 をクリックします。
表示される テーブルの編集 ダイアログボックスで、マッピング情報を設定します。
設定
説明
対象トピックの名前
ソースコレクションが同期される宛先 Topic の名前。デフォルトでは、ソースデータベースとターゲットデータベースの設定 ステップの 移行先データベース セクションで選択された トピック です。
重要入力するトピック名は、宛先 Kafka インスタンスに存在する必要があります。そうでない場合、データ同期は失敗します。
対象トピックの名前 を変更すると、データは入力した Topic に書き込まれます。
フィルタリング条件
詳細については、「フィルター条件の設定」をご参照ください。
パーティション数
宛先トピックにデータを書き込むためのパーティションの数。
[OK] をクリックします。
データ配信シナリオ
シナリオ 1: Oplog を使用した増分データの同期
主なインスタンス設定
移行方法 で、Oplog を選択します。
データ配信の例
ソース増分変更タイプ | ソース増分変更ステートメント | 宛先トピックが受信したデータ |
| | |
| | |
| | |
| | |
| | |
| |
シナリオ 2: ChangeStream を使用した増分データの同期 (更新されたフィールドのデータの同期)
主なインスタンス設定
移行方法 で、ChangeStream を選択します。[更新操作後に完全なドキュメントを取得] で、× を選択します。
データ配信の例
ソース増分変更タイプ | ソース増分変更ステートメント | 宛先トピックが受信したデータ |
| | |
| | |
| | |
| | |
| | |
| |
シナリオ 3: ChangeStream を使用した増分データの同期 (更新されたフィールドに対応するドキュメントの完全なデータの同期)
主なインスタンス設定
移行方法 を ChangeStream に設定し、[更新操作後に完全なドキュメントを取得] を ○ に設定します。
データ配信の例
ソース増分変更タイプ | ソース増分変更ステートメント | 宛先トピックが受信したデータ |
| | |
| | |
| | |
| | |
| | |
| |
特殊なケース
注意事項
更新イベントの fullDocument フィールドが欠落している場合、データ配信結果はOplog を使用して増分データを同期する場合と同じになります。
例
ソースベースデータ | ソース増分変更ステートメント | 宛先トピックが受信したデータ |
| |
よくある質問
[Kafka データ圧縮形式] を変更できますか?
はい。詳細については、「[同期するオブジェクトの変更]」をご参照ください。
[メッセージ確認メカニズム] を変更できますか?
はい。詳細については、「[同期するオブジェクトの変更]」をご参照ください。