Data Transmission Service (DTS) は、ApsaraDB for MongoDB レプリカセットまたはシャードクラスターインスタンスから AnalyticDB for PostgreSQL インスタンスへデータを継続的に同期します。このトピックでは、完全データ同期と増分同期について説明します。
前提条件
開始する前に、以下を確認してください。
ソース ApsaraDB for MongoDB インスタンスのデータ総量よりも大きい利用可能なストレージ容量を持つ AnalyticDB for PostgreSQL インスタンスがあること。10% の余裕を確保するために、ソースデータサイズの110% の宛先ストレージをプロビジョニングしてください。インスタンスを作成するには、「インスタンスの作成」をご参照ください。
データを受信するために、宛先インスタンスにプライマリキー列を持つデータベース、スキーマ、およびテーブルが作成されていること。SQL 構文については、「SQL 構文」をご参照ください。
各送信先列のデータの型は、対応する MongoDB フィールドと互換である必要があります。たとえば、
_idフィールドが ObjectId 型の場合、送信先列の型は varchar でなければなりません。送信先列に _id または _value という名前を付けてはいけません。
(シャードクラスターソースの場合) すべてのシャードノードのエンドポイント、ユーザー名、およびパスワードがあること。すべてのシャードアカウントは同じ認証情報を使用する必要があります。シャードエンドポイントを取得するには、「シャードのエンドポイントを申請する」をご参照ください。
課金
| 同期タイプ | 料金 |
|---|---|
| 完全データ同期 | 無料 |
| 増分同期 | 有料。詳細については、「課金概要」をご参照ください。 |
同期タイプ
| 同期タイプ | 説明 |
|---|---|
| 完全データ同期 | DTS は、ソースインスタンス内の選択されたコレクションから宛先インスタンスへ既存のすべてのデータをコピーします。 |
| 増分同期 | 完全データ同期が完了すると、DTS はソースから送信先へ挿入、更新、および削除操作を継続的に適用します。ファイル増分データの場合、$set コマンドのみがサポートされています。 |
必要な権限
DTS で利用可能な増分同期方法は、データベースアカウントに付与された権限によって異なります。
| データベース | 必要な権限 | 増分同期方法 | リファレンス |
|---|---|---|---|
| ソース ApsaraDB for MongoDB インスタンス | ソース、admin、およびlocal データベースに対する読み取り権限 | Oplog (低遅延同期に推奨) または ChangeStream | MongoDB データベースユーザーの権限を管理する |
| 宛先 AnalyticDB for PostgreSQL インスタンス | ターゲットデータベースに対する読み取りおよび書き込み権限。初期アカウントまたは RDS_SUPERUSER 権限を持つアカウントを使用します。 | N/A | データベースアカウントの作成と管理 および ユーザーと権限の管理 |
Oplog と ChangeStream の選択:
| 方法 | 要件 | 備考 |
|---|---|---|
| Oplog(推奨) | Oplog が有効で、少なくとも 7 日分のデータを保持していること | セルフマネージド MongoDB および ApsaraDB for MongoDB のいずれもデフォルトで有効です。ログのプルが高速なため、レイテンシーが低くなります。 |
| ChangeStream | MongoDB 4.0 以降、かつ Change Streams が有効であること | 双方向同期には対応していません。エラスティックでない Amazon DocumentDB クラスターでは必須です(移行方法 を ChangeStream に、アーキテクチャー を シャードクラスター に設定してください)。 |
制限事項
ソースデータベースの制限
| 制限 | 詳細 |
|---|---|
| 帯域幅 | ソースデータベースサーバーは十分なアウトバウンド帯域幅を持っている必要があります。そうでない場合、同期速度に影響します。 |
| コレクション名マッピング | コレクションの名前マッピングを構成する場合、単一タスクは最大1,000のコレクションをサポートします。より多くのコレクションの場合は、複数のタスクを作成するか、データベース全体を同期してください。 |
_idシャードクラスター: `_id` の一意性 | _id各コレクションの `_id` フィールドは一意である必要があります。そうでない場合、データの不整合が発生する可能性があります。 |
| シャードクラスター: Mongos ノードの制限 | Mongos ノードの数は10を超えることはできません。 |
| シャードクラスター: 孤立したドキュメント | ソースインスタンスには孤立したドキュメントが含まれていてはなりません。孤立したドキュメントはデータの不整合とタスクの失敗を引き起こします。詳細については、MongoDB ドキュメント および 孤立したドキュメントを削除するにはどうすればよいですか? |
| サポートされていないソースタイプ | スタンドアロン ApsaraDB for MongoDB インスタンス、Azure Cosmos DB for MongoDB クラスター、および Amazon DocumentDB エラスティッククラスターはソースとしてサポートされていません。DTS は SRV エンドポイントを介して MongoDB データベースに接続できません。 |
| Oplog またはチェンジストリームの保持 | oplog を有効にし、少なくとも7日間のログデータを保持するか、チェンジストリームを有効にする必要があります。DTS が過去7日間の変更にアクセスできない場合、同期は失敗し、データの不整合または損失が発生する可能性があります。これは DTS SLA の対象外です。 |
同期中の制限:
完全データ同期中、データベースまたはコレクションのスキーマを変更したり、ARRAY 型のデータを変更したりしないでください。そうすると、タスクの失敗またはデータの不整合が発生します。
ソースがシャードクラスターの場合、タスク実行中にデータ分布を変更するコマンド—
shardCollection、reshardCollection、unshardCollection、moveCollection、またはmovePrimary—を実行しないでください。これにより、データの不整合が発生します。シャードクラスターソースでバランサーがアクティブな場合、同期中に遅延が増加する可能性があります。
その他の制限
| 制限事項 | 詳細 |
|---|---|
| オブジェクトの種類 | 同期対象として選択できるのは、コレクションのみです。 |
| プライマリキーの要件 | 送信先テーブルには、単一列のユニークなプライマリキー(複合プライマリキーは不可)を設定する必要があります。bson_value("_id") 式を、プライマリキー列に割り当てます。 |
| 予約済みのカラム名 | 送信先テーブルには、_id または _value という名前のカラムを含めることはできません。 |
| 追加最適化テーブル(AO テーブル) | 送信先テーブルは、追加最適化(AO)テーブルであってはなりません。 |
| 除外されるデータベース | DTS は、admin、config、および local データベースからのデータ同期を行いません。 |
| トランザクション | トランザクション情報は保持されません。トランザクションは、個別のレコードとして送信先に書き込まれます。 |
| FLOAT/DOUBLE の精度 | DTS は、FLOAT および DOUBLE カラムから値を取得する際に ROUND(COLUMN,PRECISION) を使用します。FLOAT のデフォルト精度は 38 桁、DOUBLE のデフォルト精度は 308 桁です。ご要件に適合しているかご確認ください。 |
| タスクの自動再開 | DTS は、失敗したタスクを最大 7 日間、自動的に再開しようと試行します。ワークロードを送信先に切り替える前に、失敗したタスクを停止またはリリースするか、送信先に対する DTS の書き込み権限を削除するために REVOKE を実行してください。これを実施しない場合、自動再開後にソースデータが送信先データを上書きする可能性があります。 |
| 遅延の計算 | DTS は、最新の同期済みデータタイムスタンプに基づいて増分同期の遅延を算出します。ソースで長期間書き込みが行われない場合、報告される遅延値は不正確になる可能性があります。遅延の測定値を更新するには、ソースに対して更新操作を実行してください。 |
| DTS タスクの障害発生時 | DTS タスクが失敗した場合、DTS サポートが 8 時間以内に復旧を試みます。復旧中、タスクが再起動される場合や、タスクパラメーターが変更される場合がありますが、データベースのパラメーターは変更されません。 |
同期タスクの作成
ステップ1: データ同期ページを開く
次のいずれかの方法を使用します。
DTS コンソール
DTS コンソールにログインします。
左側のナビゲーションウィンドウで、[データ同期] をクリックします。
左上隅で、同期タスクが実行されるリージョンを選択します。
DMS コンソール
手順は、DMS コンソールモードとレイアウトによって異なる場合があります。詳細については、「シンプルモード」および「DMS コンソールのレイアウトとスタイルをカスタマイズする」をご参照ください。
DMS コンソールにログインします。
上部ナビゲーションバーで、Data + AI の上にポインターを移動し、[DTS (DTS)] > [データ同期] を選択します。
[データ同期タスク] の横にあるドロップダウンリストから、同期インスタンスが存在するリージョンを選択します。
ステップ2: ソースおよび宛先データベースを構成する
[タスクの作成] をクリックして、以下のパラメーターを設定します。
| セクション | パラメーター | 説明 |
|---|---|---|
| 該当なし | タスク名 | タスクを識別するための説明的な名前を入力します。この名前は一意である必要はありません。このフィールドをスキップした場合、DTS が自動的に名前を生成します。 |
| ソースデータベース | 既存の接続を選択 | ソースインスタンスがすでに DTS に登録済みの場合、ドロップダウンリストから該当インスタンスを選択してください。選択後、DTS が残りのパラメーターを自動的に設定します。それ以外の場合は、以下のパラメーターを手動で設定してください。DMS コンソールで、「DMS データベースインスタンスの選択」から対象インスタンスを選択します。 |
| データベースタイプ | MongoDB を選択します。 | |
| アクセス方法 | Alibaba Cloud インスタンス を選択します。 | |
| インスタンスリージョン | ソース ApsaraDB for MongoDB インスタンスが配置されているリージョンを選択します。 | |
| Alibaba Cloud アカウント間でのデータ複製 | 現在の Alibaba Cloud アカウント配下のインスタンスを使用する場合は、いいえ を選択します。 | |
| アーキテクチャ | ソースインスタンスのアーキテクチャを選択します:レプリカセット または シャードクラスター。 シャードクラスター を選択した場合は、シャードアカウント および シャードパスワード も併せて設定してください。 | |
| 移行方法 | DTS がソースから増分データを読み取る方法を選択します:Oplog(oplog が有効な場合に推奨)または ChangeStream(MongoDB 4.0 以降、または Amazon DocumentDB の非エラスティッククラスター向け)。アーキテクチャ で シャードクラスター を選択し、移行方法 で ChangeStream を選択した場合、シャード認証情報は不要です。 | |
| インスタンス ID | ソース ApsaraDB for MongoDB インスタンスの ID を選択します。 | |
| 認証データベース | 認証データベースの名前を入力します。デフォルト値は admin です。 | |
| データベースアカウント | データベースアカウントを入力します。必要な権限については、「必要な権限」をご参照ください。 | |
| データベースパスワード | データベースアカウントのパスワードを入力します。 | |
| 暗号化 | 要件に応じて、暗号化なし、SSL 暗号化、または Mongo Atlas SSL を選択します。利用可能なオプションは、アクセス方法 および アーキテクチャ によって異なります。DTS コンソールに表示されるオプションが優先されます。 説明 アーキテクチャ が シャードクラスター かつ 移行方法 が Oplog の場合、SSL 暗号化 は利用できません。また、SSL 暗号化 を選択した自己管理型 MongoDB レプリカセットでは、CA 証明書をアップロードして接続を検証する必要があります。 | |
| 宛先データベース | 既存の接続を選択 | 宛先インスタンスがすでに DTS に登録済みの場合、ドロップダウンリストから該当インスタンスを選択してください。選択後、DTS が残りのパラメーターを自動的に設定します。それ以外の場合は、以下のパラメーターを手動で設定してください。DMS コンソールで、「DMS データベースインスタンスの選択」から対象インスタンスを選択します。 |
| データベースタイプ | AnalyticDB for PostgreSQL を選択します。 | |
| アクセス方法 | Alibaba Cloud インスタンス を選択します。 | |
| インスタンスリージョン | 宛先 AnalyticDB for PostgreSQL インスタンスが配置されているリージョンを選択します。 | |
| インスタンス ID | 宛先 AnalyticDB for PostgreSQL インスタンスの ID を選択します。 | |
| データベース名 | 同期データを受信する宛先インスタンス内のデータベース名を入力します。 | |
| データベースアカウント | データベースアカウントを入力します。必要な権限については、「必要な権限」をご参照ください。 | |
| データベースパスワード | データベースアカウントのパスワードを入力します。 |
ステップ3: 接続性をテストする
ページの下部で、[接続性のテストと続行] をクリックします。
DTS は、ソースおよびターゲットデータベースのセキュリティ設定にその CIDR ブロックを自動的に追加します。詳細については、「DTS サーバーの CIDR ブロックを追加する」をご参照ください。
ソースまたはターゲットデータベースがセルフマネージドであり、アクセス方式がAlibaba Cloudインスタンスではない場合、[DTSサーバーのCIDRブロック] ダイアログボックスで、[接続テスト] をクリックします。
ステップ4: 同期するオブジェクトを構成する
[オブジェクトの構成] ステップで、以下のパラメーターを設定します。
| パラメーター | 説明 |
|---|---|
| 同期タイプ | デフォルトでは、増分データ同期が選択されています。オプションで、[完全データ同期]を選択できます。スキーマ同期はサポートされていません。 |
| 同期する DDL および DML 操作 | 増分同期中にインスタンスレベルで同期する DDL 操作および DML 操作を選択します。コレクションレベルで設定するには、[選択されたオブジェクト] セクションでコレクションを右クリックします。 |
| 競合するテーブルの処理モード | [事前チェックとエラー報告] (デフォルト): 事前チェックは、宛先テーブルがソースコレクションと名前を共有していないことを検証します。重複する名前が存在する場合、タスクを開始できません。オブジェクト名マッピングを使用してオブジェクトの名前を変更します。詳細については、「オブジェクト名をマッピングする」をご参照ください。[エラーを無視して続行]: 重複名事前チェックをスキップします。 警告 これにより、データの不整合が発生する可能性があります。完全データ同期中、同じプライマリキーを持つレコードが宛先に存在する場合、DTS は既存の宛先レコードを保持します。増分データ同期中、DTS は既存の宛先レコードを上書きします。 |
| ソースオブジェクト | [ソースオブジェクト] からコレクションを選択し、 |
| 選択されたオブジェクト | 以下の手順で説明されているように、データベース名マッピング、テーブル名マッピング、およびフィールドマッピングを構成します。 |
データベース名マッピングを構成する:
[選択されたオブジェクト] で、同期するコレクションが含まれているデータベースを右クリックします。

「[データベース名]」を、送信先の AnalyticDB for PostgreSQL インスタンス内のスキーマの名前に変更します。

(オプション) [同期する DDL および DML 操作を選択] で、増分データ同期中に同期する操作を選択します。

[OK] をクリックします。
テーブル名マッピングを構成する:
[選択されたオブジェクト] で、コレクションを右クリックします。

[テーブル名] を、宛先インスタンス内のテーブルの名前に変更します。

(オプション) データの一部を同期するためのフィルター条件を指定します。詳細については、「フィルター条件を指定する」をご参照ください。

(オプション) [同期する DDL および DML 操作を選択] で、増分データ同期中に同期する操作を選択します。

フィールドマッピングを構成する:
DTS は、コレクション内の各フィールドに対して、デフォルトの bson_value() 式を生成します。次のようにマッピングを確認し、更新してください。
「[値の割り当て]」列では、
bson_value()式が引用符で囲まれたソースフィールド名を表示します。たとえば、bson_value("age")は、MongoDB のageフィールドをマップします。(オプション) 行の横にある
アイコンをクリックして、同期する必要がないフィールドを削除します。デフォルトの式が適合するかどうかに基づいてマッピングを構成します。
準拠する式を持つフィールド
[列名] を送信先列の名前に設定します。
各列の [タイプ] を選択します。タイプがソース フィールドと互換であることを確認してください。
(任意) [長さ] および [精度] を設定します。
すべての列に対して繰り返します。
準拠しない式を持つフィールド
説明たとえば、親子関係を持つネストされたフィールドの場合。
行の横にある
アイコンをクリックします。[+ 列の追加] をクリックします。

[列名]、[タイプ]、[長さ]、および[精度]を設定します。
bson_value()式を [値の割り当て] に入力します。ネストされたフィールドの場合、階層の各レベルを指定します(「フィールドマッピングの例」を参照)。重要送信先テーブルのプライマリキー列に
bson_value("_id")を割り当てます。各列に対して、
bson_value()式で完全なフィールドパスを指定します。最上位レベルのフィールドのみを指定した場合(例:bson_value("person"))は、DTS がサブフィールドに増分データを書き込まなくなります。
すべての列に対して繰り返します。
[OK] をクリックします。
ステップ5: 詳細設定を構成する
[次へ: 詳細設定] をクリックして、以下のパラメーターを設定します。
| パラメーター | 説明 |
|---|---|
| タスクスケジューリング用の専用クラスター | デフォルトでは、DTS は共有クラスターを使用します。安定性を向上させるには、専用クラスターを購入してください。詳細については、「DTS 専用クラスターとは」をご参照ください。 |
| 失敗した接続の再試行時間 | DTS が失敗した接続を再試行する時間範囲。有効な値: 10~1,440分。デフォルト: 720分。これを少なくとも30分に設定してください。同じソースまたは宛先が複数のタスクで共有されている場合、最短の構成された再試行時間が優先されます。DTS は再試行期間中、インスタンスに対して課金します。 |
| その他の問題の再試行時間 | DTS が失敗した DDL または DML 操作を再試行する時間範囲。有効な値: 1~1,440分。デフォルト: 10分。これを少なくとも10分に設定してください。この値は [失敗した接続の再試行時間] よりも小さくする必要があります。 |
| 完全データ同期のスロットリングを有効にする | ソースデータベースへの1秒あたりのクエリ数 (QPS)、完全データ移行の RPS、および完全移行のデータ移行速度 (MB/秒) を設定することで、完全データ同期中のリソース使用量を制限します。[完全データ同期] が選択されている場合にのみ利用可能です。 |
| 同期するデータのテーブル内のプライマリキー `_id` のデータ型は1つのみ | コレクション内のすべてのドキュメントで _id フィールドが単一のデータの型を持つかどうかを指定します。 はい: DTS はデータの型のスキャンをスキップし、コレクションごとに 1 つの _id 型のドキュメントのみを同期します。 いいえ: DTS はすべての _id データの型をスキャンし、すべてのドキュメントを同期します。 [フルデータ同期] が選択されている場合にのみ使用できます。 |
| 増分データ同期のスロットリングを有効にする | 増分データ同期の RPS および増分同期のデータ同期速度 (MB/秒) を設定することで、増分データ同期中のリソース使用量を制限します。 |
| 環境タグ | DTS インスタンスに環境ラベルをタグ付けします。オプション。 |
| ETL を構成する | 抽出・変換・書き出し (ETL) 機能を有効にして、同期中にデータ変換を適用します。詳細については、「ETL とは」および「データ移行またはデータ同期タスクで ETL を構成する」をご参照ください。 |
| モニタリングとアラート | タスクが失敗した場合、または同期遅延がしきい値を超過した場合に連絡先に通知するアラートを構成します。詳細については、「DTS タスクを作成する際のモニタリングとアラートの構成」をご参照ください。 |
ステップ6: 事前チェックを実行する
[次へ:タスク設定の保存と事前チェック] をクリックします。
このタスク構成の API パラメーターを表示するには、ボタンにポインターを合わせ、[OpenAPI パラメーターのプレビュー] をクリックします。
DTS はタスクを開始する前に事前チェックを実行します。タスクは事前チェックが合格した後にのみ開始されます。
事前チェックが失敗した場合、各失敗した項目の横にある[詳細の表示]をクリックし、問題を解決して、事前チェックを再度実行してください。
事前チェックでアラートがトリガーされた場合:アラートを無視できない場合は、これを解決して再実行します。無視できる場合は、[アラートの詳細を確認] > [無視] > [OK] をクリックし、その後 [再び事前チェック] をクリックします。
ステップ7: インスタンスを購入して開始する
[成功率] が [100%] になるまで待ってから、[次へ:インスタンスの購入] をクリックします。
[購入] ページで、以下のパラメーターを設定します。
| セクション | パラメーター | 説明 |
|---|---|---|
| 新しいインスタンスクラス | 課金方法 | [サブスクリプション]: 固定期間の前払い。長期利用に費用対効果が高い。[従量課金]: 時間単位で課金されます。短期利用に適しています。不要になったらインスタンスをリリースして課金を停止します。 |
| リソースグループ設定 | インスタンスのリソースグループを選択します。デフォルト: デフォルトリソースグループ。詳細については、「Resource Management とは | |
| インスタンスクラス | 必要な同期速度に基づいてインスタンスクラスを選択します。詳細については、「データ同期インスタンスのインスタンスクラス」をご参照ください。 | |
| サブスクリプション期間 | サブスクリプション期間と作成するインスタンス数を指定します。1~9か月、または1、2、3、5年間利用可能です。[サブスクリプション] 課金方法でのみ利用可能です。 |
読み取り、選択します。[Data Transmission Service(従量課金)利用規約]
[購入および開始] をクリックします。ダイアログボックスで、[OK] をクリックします。
タスクはタスクリストに表示されます。そこで同期進行状況を監視できます。
フィールドマッピングの例
この例は、ネストされた MongoDB ドキュメントから AnalyticDB for PostgreSQL のフラット列へフィールドをマッピングする方法を示しています。
ソースドキュメントの構造:
{
"_id": "62cd344c85c1ea6a2a9f****",
"person": {
"name": "neo",
"age": 26,
"sex": "male"
}
}宛先テーブルスキーマ:
| 列名 | タイプ | 注意事項 |
|---|---|---|
| mongo_id | varchar | プライマリキー列 |
| person_name | varchar | |
| person_age | decimal |
DTS でのフィールドマッピング構成:
| 列名 | タイプ | 値の割り当て |
|---|---|---|
| mongo_id | STRING | bson_value("_id") |
| person_name | STRING | bson_value("person","name") |
| person_age | DECIMAL | bson_value("person","age") |
同期後、宛先テーブルには以下が含まれます:
| mongo_id | person_name | person_age |
|---|---|---|
| 62cd344c85c1ea6a2a9f**** | neo | 26 |
ネストされたフィールドに対しては、常に bson_value() で完全なフィールド パスを指定する必要があります。代わりに bson_value("person") を使用して bson_value("person","name") を使用しない場合、person オブジェクト全体が単一の値としてマップされ、DTS が個々のサブフィールドに対して増分更新を書き込むことを防ぎます。