Data Transmission Service (DTS) を使用すると、ApsaraDB for MongoDB の変更データをストリーミング方式で Message Queue for Apache Kafka インスタンスに送信できます。以下の手順では、MongoDB レプリカセットをソース、Kafka インスタンスをターゲットとして同期タスクを作成する方法を説明します。
本手順の対象範囲:
サポートされるアーキテクチャ:レプリカセットおよびシャードクラスター
増分同期方法:Oplog(推奨)および Change Streams
データ形式:Canal JSON 形式で Kafka トピックに配信
同期範囲:コレクション単位での完全同期および増分同期
課金:完全データ同期は無料、増分データ同期は課金対象
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
Message Queue for Apache Kafka インスタンス
ターゲット Kafka インスタンス内に、データ受信用に作成済みの トピック
(シャードクラスターのみ) すべてのシャードノードにエンドポイントを適用し、同じアカウントパスワードとエンドポイントを共有する必要があります。 詳細については、「シャードのエンドポイントを申請する」をご参照ください。
サポートされているソースおよびターゲットデータベースのバージョンについては、「同期ソリューションの概要」をご参照ください。
制限事項
同期タスクを作成する前に、以下の制限事項を確認してください。
ソースデータベースの制限
ソースサーバーには十分なアウトバウンド帯域幅が必要です。帯域幅が不足していると、同期速度が低下します。
コレクションの名前マッピングを設定する場合、1 つのタスクで同期できるコレクション数は最大 1,000 個です。この上限を超えるとリクエストエラーが発生します。1,000 個を超えるコレクションを同期する場合は、複数のタスクを作成するか、名前マッピングを適用せずにデータベース全体を同期してください。
シャードクラスターをソースとする場合:各同期対象コレクションの
_idフィールドは一意である必要があります。一意でない場合、データの不整合が発生する可能性があります。シャードクラスターをソースとする場合:mongos ノードの数は 10 個を超えてはならず、インスタンス内に孤立ドキュメントが存在してはなりません。「よくある質問」トピックをご参照いただき、孤立ドキュメントの削除方法をご確認ください。
スタンドアロンの ApsaraDB for MongoDB インスタンス、Azure Cosmos DB for MongoDB クラスター、Amazon DocumentDB エラスティッククラスターは、ソースとしてサポートされていません。
DTS は SRV エンドポイント経由で MongoDB に接続できません。
oplog は有効化され、ログデータを少なくとも 7 日間保持する必要があります。または、Change Streams を有効化し、DTS が過去 7 日間のデータ変更をサブスクライブできるようにする必要があります。いずれの条件も満たさない場合、DTS がソースの変更を取得できず、サービスレベルアグリーメント(SLA)の範囲外でデータ損失または不整合が発生する可能性があります。
重要- ソースデータベースの変更を記録するには、oplog を使用することを推奨します(推奨)。 - Change Streams を使用するには、MongoDB 4.0 以降が必要です。Change Streams を使用した双方向同期はサポートされていません。 - 非エラスティックの Amazon DocumentDB クラスターでは、Change Streams を有効化し、移行方法 を ChangeStream に、アーキテクチャ を シャードクラスター に設定します。
完全データ同期中は、データベースまたはコレクションのスキーマを変更したり、ARRAY 型のデータを変更したりしないでください。
シャードクラスターをソースとする場合、同期中に以下のコマンドを実行しないでください:
shardCollection、reshardCollection、unshardCollection、moveCollection、movePrimary。これらのコマンドはデータの不整合を引き起こす可能性があります。ソースデータベースがシャードクラスター構成の MongoDB インスタンスであり、その balancer がデータのバランス調整を行っている場合、インスタンスに遅延が発生する可能性があります。
その他の制限
コレクション単位の同期のみがサポートされています。
admin、config、localデータベースは同期できません。1 件のレコードが 10 MB を超えると、タスクは失敗します。
トランザクションは保持されません。DTS は各トランザクションをターゲット側で個別のレコードに変換します。
DTS タスク実行中に、ターゲット Kafka インスタンスのブローカーノードが追加または削除された場合、DTS タスクを再起動してください。
DTS がソースおよびターゲットインスタンスに接続できることを確認してください。たとえば、データベースインスタンスのセキュリティ設定や、自己管理型 Kafka インスタンスの
server.propertiesファイル内のlistenersおよびadvertised.listenersパラメーターが、DTS からのアクセスを制限していないことを確認してください。CPU 使用率がソースおよびターゲットデータベースともに 30 % 未満となるオフピーク時間帯に同期を実行してください。
DTS は、実行期間が 7 日間 未満の失敗したインスタンスを自動的に再試行します。トラフィックをターゲットに切り替える前に、同期インスタンスを停止またはリリースして、自動再開によるターゲットデータの上書きを防止してください。
DTS は、ターゲットデータベース内の最新同期データのタイムスタンプとソースデータベース内の現在時刻に基づいて、増分データ同期の同期遅延を計算します。ソースデータベースで長期間更新操作が行われない場合、同期遅延の値が不正確になる可能性があります。同期遅延が過度に高くなった場合は、ソースデータベースで更新操作を実行して遅延値を更新してください。
DTS タスクの実行に失敗した場合、DTS テクニカルサポートが 8 時間 以内にタスクの復旧を試みます。復旧中はタスクが再起動されることがあり、タスクのパラメーターが変更される場合があります。変更されるのは DTS タスクのパラメーターのみであり、データベースのパラメーターは変更されません。変更される可能性のあるパラメーターには、「インスタンスパラメーターの変更」セクションのパラメーターが含まれますが、これに限定されません。
増分同期方法として Oplog を選択したシャードクラスターをソースとする場合、DTS はシャード間の書き込み順序をターゲット Kafka トピックに対して保証しません。
シャードクラスターのソースの場合、完全データ同期中に MongoDB バランサーを無効化します。完全同期が完了し、増分同期が開始された後のみ、再度有効化してください。詳細については、「ApsaraDB for MongoDB バランサーの管理」をご参照ください。
課金
| 同期タイプ | 料金 |
|---|---|
| 完全データ同期 | 無料 |
| 増分データ同期 | 課金概要 |
同期タイプおよびサポートされる操作
完全同期 では、選択した MongoDB コレクションからすべての既存データをターゲット Kafka トピックにコピーします。DTS は DATABASE および COLLECTION オブジェクトをサポートしています。
増分同期 では、完全同期完了後に変更イベントを継続的に配信します。サポートされる操作は、増分同期方法によって異なります。
| 操作 | Oplog | Change Streams |
|---|---|---|
| INSERT | サポート | サポート |
| UPDATE | サポート | サポート |
| DELETE | サポート | サポート |
| CREATE COLLECTION / INDEX | サポート | 非サポート |
| DROP DATABASE / COLLECTION / INDEX | サポート | DROP DATABASE と DROP COLLECTION のみ |
| RENAME COLLECTION | サポート | サポート |
増分同期では、タスク開始後に作成されたデータベースは対象となりません。
必要なデータベースアカウント権限
| データベース | 必要な権限 |
|---|---|
| ソース ApsaraDB for MongoDB | 同期対象データベース、admin データベース、および local データベースに対する読み取り権限 |
アカウント作成手順については、「アカウント管理」をご参照ください。
同期タスクの作成
ステップ 1:データ同期ページを開く
DTS コンソールまたは DMS コンソールのいずれかを使用します。
DTS コンソール
DMS コンソール
手順はDMSコンソールのモードによって異なる場合があります。「シンプルモード」と「ビジネス要件に基づくDMSコンソールのレイアウトとスタイルのカスタマイズ」をご参照ください。
ステップ 2:ソースおよびターゲットデータベースの構成
タスクの作成 をクリックし、以下の表に示すパラメーターを構成します。
一般
| パラメーター | 説明 |
|---|---|
| タスク名 | DTS タスクの名前です。DTS が自動的に名前を生成します。タスクを識別しやすいように、意味のある名前を指定してください。名前は一意である必要はありません。 |
ソースデータベース
| パラメーター | 説明 |
|---|---|
| 既存の接続を選択 | ソースインスタンスが DTS に登録済みの場合、ドロップダウンリストから選択してください。DTS が残りのフィールドを自動的に入力します。それ以外の場合は、以下のフィールドを構成してください。DMS コンソールでは、DMS データベースインスタンスの選択 リストから選択します。 |
| データベースタイプ | MongoDB を選択します。 |
| アクセス方法 | Alibaba Cloud インスタンス を選択します。 |
| インスタンス リージョン | ソース ApsaraDB for MongoDB インスタンスのリージョンを選択します。 |
| Alibaba Cloud アカウント間でのデータ複製 | ソースデータベースが現在の Alibaba Cloud アカウントに属している場合は、いいえ を選択します。 |
| アーキテクチャ | 本例では レプリカセット を選択します。ソースが シャードクラスター の場合、シャードアカウント および シャードパスワード も入力してください。 |
| 移行方法 | 増分データの同期方法です。Oplog は、oplog 機能が有効化されている場合に推奨されます。ChangeStream は、ソースで Change Streams が有効化されている場合に利用可能です。詳細については、「Change Streams」をご参照ください。注: 非エラスティックの Amazon DocumentDB クラスターでは、ChangeStream のみがサポートされます。アーキテクチャ を シャードクラスター に、移行方法 を ChangeStream に設定した場合、シャードアカウント および シャードパスワード フィールドは不要です。 |
| インスタンス ID | ソース ApsaraDB for MongoDB インスタンスのインスタンス ID を選択します。 |
| 認証データベース | アカウント認証情報が格納されているデータベースです。デフォルトは admin です。 |
| データベースアカウント | ソースデータベースのアカウントです。「必要なデータベースアカウント権限」をご参照ください。 |
| データベースパスワード | データベースアカウントのパスワードです。 |
| 暗号化 | 接続暗号化タイプを指定します:暗号化なし、SSL 暗号化、または Mongo Atlas SSL。利用可能なオプションは、アクセス方法 および アーキテクチャ の値によって異なります。注: アーキテクチャ が シャードクラスター で、移行方法 が Oplog の場合、SSL 暗号化は利用できません。自己管理型 MongoDB で レプリカセット アーキテクチャを採用し、SSL 暗号化 を選択した場合、CA 証明書をアップロードして接続を検証してください。 |
ターゲットデータベース
| パラメーター | 説明 |
|---|---|
| 既存の接続を選択 | ターゲット Kafka インスタンスが DTS に登録済みの場合、ドロップダウンリストから選択してください。それ以外の場合は、以下のフィールドを構成してください。 |
| データベースタイプ | Kafka を選択します。 |
| アクセス方法 | Alibaba Cloud インスタンス を選択します。 |
| インスタンスリージョン | ターゲット Kafka インスタンスのリージョンを選択します。 |
| Kafka インスタンス ID | ターゲット Kafka インスタンス ID を選択します。 |
| 暗号化 | セキュリティ要件に応じて、暗号化なし または SCRAM-SHA-256 を選択します。 |
| トピック | 同期データを受信するトピックを選択します。 |
| Kafka Schema Registry の使用 | Kafka Schema Registry は、Avro スキーマを保存および取得するための RESTful メタデータサービスです。いいえ を選択するとスキップされ、はいアラート通知設定 を選択すると、Schema Registry の URL または IP アドレスを指定してください。 |
ステップ 3:接続性のテスト
ページ下部の 接続性のテストと続行 をクリックします。
許可されている場合、DTS は、ソースおよびターゲットデータベースのセキュリティ設定に、DTS サーバーの CIDR ブロックを自動的に追加します。手動設定については、「DTS サーバーの CIDR ブロックをオンプレミスデータベースのセキュリティ設定に追加する」をご参照ください。
Alibaba Cloud インスタンス をアクセス方法として使用しない自己管理型データベースの場合、接続性のテスト を DTS サーバーの CIDR ブロック ダイアログボックス内でクリックしてください。
ステップ 4:同期対象オブジェクトの構成
オブジェクトの構成 ステップで、以下のパラメーターを設定します。
| パラメーター | 説明 |
|---|---|
| 同期タイプ | 増分データ同期 がデフォルトで選択されています。必要に応じて、完全データ同期 も選択できます。スキーマ同期 は利用できません。完全同期が有効化されている場合、DTS はまずすべての既存データをコピーし、その後で増分同期を開始します。 |
| 競合テーブルの処理モード | 事前チェックおよびエラー報告:ターゲットにソースと同じ名前のコレクションが存在する場合、事前チェックが失敗します。名前競合を解決するには、「オブジェクト名マッピング」をご利用ください。エラーを無視して続行:名前競合チェックをスキップします。ターゲットのレコードとソースのレコードでプライマリキーまたは一意キーが一致する場合、ターゲットのレコードが保持されます。 警告 このオプションはデータの不整合を引き起こす可能性があります。 |
| Kafka 内のデータ形式 | サポートされるのは Canal JSON のみです。 |
| Kafka データ圧縮形式 | Kafka に書き込まれるデータの圧縮形式です。オプション: LZ4(デフォルト — 圧縮率が低く、高速)、GZIP(圧縮率が高く、低速、CPU 使用率が高い)、Snappy(バランス型)。 |
| Kafka パーティションへのデータ配信ポリシー | 要件に基づいてパーティションルーティングポリシーを選択します。 |
| メッセージ確認応答メカニズム | 要件に応じて、「メッセージ確認応答メカニズム」を選択してください。 |
| DDL 情報を格納するトピック | DDL イベントを格納するトピックを選択します。空欄のままにした場合、DDL イベントはデータイベントと同じトピックに書き込まれます。 |
| 宛先インスタンスにおけるオブジェクト名の大文字小文字の区別 | 宛先におけるデータベースおよびコレクション名の大文字小文字の区別を制御します。デフォルトは DTS デフォルトポリシー宛先インスタンスにおけるオブジェクト名の大文字小文字の指定 です。「」をご参照ください。 |
| ソースオブジェクト | ソースオブジェクト セクションからオブジェクトを選択し、矢印アイコンをクリックして 選択済みオブジェクト に移動します。コレクション単位の選択のみがサポートされています。 |
ステップ 5:高度な設定の構成
次へ:高度な設定 をクリックし、以下のパラメーターを構成します。
| パラメーター | 説明 |
|---|---|
| タスクスケジューリング専用クラスター | デフォルトでは、DTS はタスクを共有クラスターでスケジュールします。より高い安定性を得るために、専用クラスター を購入してください。 |
| 接続失敗時の再試行時間 | ソースまたはターゲットに到達できない場合の DTS の再試行時間です。範囲:10~1440 分。デフォルト:720 分。このパラメーターは 30 を超える値に設定することを推奨します。同一データベースを共有する複数のタスクで異なる再試行時間を指定した場合、最も短い値が有効になります。再試行時間は課金対象です。 |
| その他の問題発生時の再試行時間 | DDL または DML 操作が失敗した場合の DTS の再試行時間です。範囲:1~1440 分。デフォルト:10 分。最低でも 10 分以上に設定してください。接続失敗時の再試行時間 よりも短く設定する必要があります。 |
| 更新後のドキュメント全体を取得 | 移行方法 が ChangeStream の場合にのみ利用可能です。はい:各更新に対してドキュメント全体を同期しますが、これによりソース負荷が増加し、遅延が発生する可能性があります。DTS がドキュメント全体を取得できない場合、変更されたフィールドのみが送信されます。いいえ:変更されたフィールドのみを同期します。 |
| 完全データ同期のスロットリング有効化 | 有効化した場合、ソースデータベースへの QPS、完全データ移行の RPS、および 完全移行のデータ移行速度(MB/s) を構成して、ターゲットへの負荷を軽減します。完全データ同期 が選択されている場合にのみ利用可能です。 |
| 同期対象データのテーブルにおいて、プライマリキー _id は 1 つのデータ型のみ | 完全データ同期 が選択されている場合にのみ利用可能です。はい:完全同期中に DTS は _id フィールドのデータ型のスキャンをスキップします。いいえ:DTS は _id フィールドのデータ型をスキャンします。 |
| 増分データ同期のスロットリング有効化 | 有効化した場合、増分データ同期の RPS および 増分同期のデータ同期速度(MB/s) を構成して、ターゲットへの負荷を軽減します。 |
| 環境タグ | インスタンスを識別するための任意のタグです。 |
| ETL の構成 | ETL (抽出・変換・書き出し) ロジックを適用するには、有効化します。コードエディタにデータ処理文を入力します。詳細については、「ETL をデータ移行またはデータ同期タスクで設定する」をご参照ください。 |
| モニタリングとアラート | 有効にすると、アラートのしきい値と通知設定を指定します。タスクが失敗した場合、または同期遅延がしきい値を超えた場合、DTS はアラート連絡先に通知します。詳細については、「モニタリングとアラートの設定」をご参照ください。 |
ステップ 6:事前チェックの実行
次へ:タスク設定の保存と事前チェック をクリックします。
この構成の API パラメーターをプレビューするには、ボタンにカーソルを合わせ、OpenAPI パラメーターのプレビュー をクリックしてください。
事前チェックが成功するまで、タスクを開始できません。
事前チェックが失敗した場合、各失敗項目の横にある 詳細の表示 をクリックし、問題を解決した後、再チェック をクリックしてください。
項目に対してアラートが発行された場合:進行する前に、無視できないアラートを修正してください。無視可能なアラートの場合は、アラート詳細の確認 をクリックし、ダイアログで 無視 を選択して確認し、再チェック をクリックしてください。アラートを無視すると、データの不整合が発生する可能性があります。
ステップ 7:インスタンスの購入および開始
成功率 が 100 % に達するのを待ってから、次へ:インスタンスの購入 をクリックします。
購入 ページで、以下のパラメーターを構成します。
パラメーター 説明 課金方法 サブスクリプション:前払い方式。長期利用に適しています。従量課金:時間単位で課金されます。短期利用に適しています。不要になったらインスタンスをリリースして、課金を停止してください。 リソースグループ設定 同期インスタンスのリソースグループです。デフォルト: デフォルトリソースグループ。「Resource Management とは? インスタンスクラス 同期速度の階層です。「データ同期インスタンスのインスタンスクラス」をご参照ください。 サブスクリプション期間 サブスクリプション 課金に利用可能です。オプション:1~9 か月、1 年、2 年、3 年、または 5 年。 Data Transmission Service(従量課金)サービス利用規約 をお読みになり、同意してください。
購入して開始 をクリックし、確認ダイアログで OK をクリックします。
タスクがタスク一覧に表示されます。そこで進捗状況を監視してください。
コレクションからトピックへのマッピングの構成
デフォルトでは、すべてのコレクションがターゲットデータベース構成で選択したトピックに書き込まれます。特定のコレクションを別のトピックにルーティングするには、以下の手順を実行します。
選択済みオブジェクト エリアで、コレクションレベルのターゲットトピック名の上にポインターを合わせます。
トピック名の横にある 編集 をクリックします。
テーブルの編集 ダイアログで、以下の設定を構成します。
パラメーター 説明 ターゲットトピック名 このコレクションのターゲットトピックです。トピックは Kafka インスタンス内に存在する必要があります。変更した場合、データは新しいトピックに書き込まれます。デフォルト:ターゲットデータベース構成時に選択したトピックです。 フィルター条件 オプションの行フィルターです。「フィルター条件の設定」をご参照ください。 パーティション数 トピックにデータを書き込む際のパーティション数です。 OK をクリックします。
データ配信の例
MongoDB の各増分変更は Canal JSON メッセージとしてシリアル化され、構成済みの Kafka トピックに配信されます。メッセージ構造は、増分同期方法および更新設定によって異なります。
シナリオの選択
| 目的 | 構成 |
|---|---|
| 低遅延同期および完全な DDL サポート | 移行方法 を Oplog に設定(シナリオ 1) |
| 部分的なドキュメント更新を伴う Change Streams | 移行方法 を ChangeStream に、更新後のドキュメント全体を取得 を いいえ に設定(シナリオ 2) |
| 各更新でドキュメント全体を含む Change Streams | 移行方法 を ChangeStream に、更新後のドキュメント全体を取得 を はい に設定(シナリオ 3) |
Canal JSON メッセージのフィールド
すべてのシナリオで、同じトップレベルの Canal JSON エンベロープが使用されます。以下のフィールドがすべてのメッセージに含まれます。
| フィールド | 型 | 説明 |
|---|---|---|
database | string | ソースデータベース名 |
table | string | ソースコレクション名 |
type | string | 操作タイプ:INSERT、UPDATE、DELETE、または DDL |
isDdl | boolean | true は DDL イベント(コレクションの削除、名前変更)用、false は DML イベント用 |
es | number | ソースデータベース内のイベントタイムスタンプ(UNIX ミリ秒) |
ts | number | DTS がイベントを処理したタイムスタンプ(UNIX ミリ秒) |
id | number | DTS 内部イベント ID |
pkNames | array | プライマリキーのフィールド名(通常は ["_id"]) |
data | array | 操作後のドキュメントデータ。部分更新(Oplog または full document 取得なしの ChangeStream)の場合、MongoDB 更新演算子($set、$unset)を使用して変更されたフィールドのみが含まれます。null は DDL イベント用です。 |
old | array | 更新前のドキュメント状態:通常 _id フィールドのみが含まれます。null は INSERT および DDL イベント用です。 |
sql | object または null | DDL 文の詳細(isDdl: true の場合)。null は DML イベント用です。 |
gtid | null | MongoDB には該当せず(MySQL 互換性のために予約)。 |
mysqlType | null | MongoDB には該当しません。 |
serverId | null | MongoDB には該当しません。 |
sqlType | null | MongoDB には該当しません。 |
シナリオ 1:Oplog
移行方法 を Oplog に設定します。
| ソースの変更タイプ | ソース文 | ターゲットトピックで受信されるデータ |
|---|---|---|
insert | db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}}) | 以下に例を示します |
update $set | db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}}) | 以下に例を示します |
update $set new field | db.kafka_test.update({"cid":"a"},{$set:{"salary":100}}) | 以下に例を示します |
update $unset (remove field) | db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}}) | 以下に例を示します |
delete | db.kafka_test.deleteOne({"cid":"a"}) | 以下に例を示します |
ddl drop | db.kafka_test.drop() | 以下に例を示します |
シナリオ 2:ChangeStream — 更新されたフィールドのみ
移行方法 を ChangeStream に設定します。更新後のドキュメント全体を取得 を いいえ に設定します。
INSERT および DELETE メッセージはシナリオ 1 と同一です。UPDATE メッセージには、変更されたフィールドのみが含まれます。
シナリオ 3:ChangeStream — 更新時にドキュメント全体
移行方法 を ChangeStream に設定します。更新後のドキュメント全体を取得 を はい に設定します。
UPDATE イベントでは、変更されたフィールドではなく、変更後のドキュメント全体が配信されます。
特殊ケース:ChangeStream における fullDocument の欠落
ChangeStream の UPDATE イベントの fullDocument フィールドが欠落している場合(たとえば、シャードコレクションでドキュメントがシャード間を移動した場合など)、配信されるメッセージは Oplog の動作($set または $unset 演算子を用いた部分更新)にフォールバックします。
例:fullDocument が欠落しているシャードコレクションの更新
ソースの基本データ:
use admin
db.runCommand({ enablesharding:"dts_test" })ソースの増分変更:
use dts_test
sh.shardCollection("dts_test.cstest",{"name":"hashed"})
db.cstest.insert({"_id":1,"name":"a"})
db.cstest.updateOne({"_id":1,"name":"a"},{$set:{"name":"b"}})よくある質問
タスク作成後に Kafka データ圧縮形式を変更できますか?
はい。 「同期対象オブジェクトの変更」をご参照ください。
タスク作成後にメッセージ確認応答メカニズムを変更できますか?
はい。 「同期対象オブジェクトの変更」をご参照ください。