Data Transmission Service (DTS) を使用して、ApsaraDB for MongoDB レプリカセットインスタンスの増分変更を、Function Compute 関数に直接ストリーミングします。各 INSERT、UPDATE、DELETE、DDL イベントは Canal JSON 形式で配信されるため、関数コードでデータを処理・変換・送信先へ転送できます。別途コンシューマー層を構築する必要はありません。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
ApsaraDB for MongoDB レプリカセットインスタンス。詳細については、「レプリカセットインスタンスの作成」をご参照ください。
Function Compute サービスおよび、「ハンドラータイプ」が [イベントハンドラー] に設定された関数。詳細については、「関数をすばやく作成する」をご参照ください。
課金
増分データ同期は有料機能です。詳細については、「課金概要」をご参照ください。
同期タイプ | タスク構成料金 |
増分データ同期 | 課金対象です。詳細については、「課金概要」をご参照ください。 |
制限事項
ソースデータベース
| 制約事項 | 詳細 |
|---|---|
| 帯域幅 | ソースデータベースをホストするサーバーには、十分なアウトバウンド帯域幅が必要です。そうでない場合、同期速度に影響が出ます。 |
| 主キーまたは一意キー | 同期対象のコレクションには、主キー制約 (PRIMARY KEY constraint) または一意制約 (UNIQUE constraint) が設定されている必要があります。また、すべてのフィールド値が一意である必要があります。この条件を満たさないと、ターゲット関数で重複レコードが受信される可能性があります。 |
| 単一レコードサイズ | レコードあたり最大 16 MB です。これより大きいレコードでは DTS がエラーを報告します。抽出・変換・書き出し (ETL) 機能を使用して、大規模なフィールドを除外してください。 |
| コレクション数 | タスクあたり最大 1,000 コレクションです。1,000 を超えるコレクションを同期する場合は、複数のタスクを構成するか、データベースレベルでの同期を実行してください。 |
| 非対応ソース | Azure Cosmos DB for MongoDB クラスターおよび Amazon DocumentDB エラスティッククラスターはサポートされていません。 |
| oplog / change streams | ソースデータベースでは oplog を有効化し、少なくとも 7 日分のログデータを保持する必要があります。または、過去 7 日分をカバーするように change streams を有効化する必要があります。いずれの条件も満たされない場合、DTS が変更を検知できない可能性があり、データの不整合または損失を引き起こすことがあります。これは DTS のサービスレベルアグリーメント (SLA) の対象外です。 |
| change streams に対応する MongoDB バージョン | change streams は MongoDB 4.0 以降を必要とします。change streams を使用した双方向同期はサポートされていません。 |
| Amazon DocumentDB(非エラスティック) | change streams を有効化し、移行方法 を ChangeStream に、アーキテクチャ を シャードクラスター に設定します。 |
| SRV エンドポイント | DTS は SRV エンドポイント経由で MongoDB に接続できません。 |
その他の制限事項
DTS は
admin、config、localデータベースを同期できません。完全データ同期はサポートされていません — 増分同期のみ対応しています。
クロスリージョン同期はサポートされていません。
オブジェクト名マッピング機能はサポートされていません。
データエラーを回避するため、1 つのターゲット関数に対して DTS タスクは 1 つだけ割り当ててください。
トランザクションコンテキストは保持されません。各トランザクションは個別のレコードに変換されます。
DTS タスクが失敗した場合、DTS テクニカルサポートが 8 時間以内に復旧を試みます。復旧中、タスクは再起動され、タスクパラメーター(データベースパラメーターではない)が変更される場合があります。
特殊ケース(自己管理 MongoDB)
タスク実行中にプライマリ/セカンダリ スイッチオーバーが発生すると、タスクが失敗します。
同期遅延は、最新の同期済みレコードのタイムスタンプと現在のソースタイムスタンプとの差分で算出されます。ソースで長期間書き込みが行われていない場合、報告される遅延値は不正確になる可能性があります。遅延値を更新するには、ソースデータベースにレコードを書き込んでください。
データベース全体を同期する場合は、ハートビートテーブルを作成してください。DTS はハートビートテーブルを毎秒更新するため、遅延値の精度が保たれます。
サポートされる操作
DTS が同期する操作は、選択した 移行方法 によって異なります。
| 操作 | oplog | change streams |
|---|---|---|
| INSERT | 対応 | 対応 |
| UPDATE | 対応 | 対応 |
| DELETE | 対応 | 対応 |
| CREATE COLLECTION | 対応 | 非対応 |
| CREATE INDEX | 対応 | 非対応 |
| DROP DATABASE | 対応 | 対応 |
| DROP COLLECTION | 対応 | 対応 |
| DROP INDEX | 対応 | 非対応 |
| RENAME COLLECTION | 対応 | 対応 |
oplog を使用する場合
DTS タスクは、タスク開始後に作成されたデータベースからの増分データを同期しません。DTS が同期する増分データは、以下の操作によって生成されます。
CREATE COLLECTION および CREATE INDEX
データベース、コレクション、およびインデックス の DROP
RENAME COLLECTION
コレクション内のドキュメントに対する挿入、更新、削除操作
説明ドキュメントの増分データを同期する場合、
$setコマンドを使用した更新操作のみが対応します。
change streams を使用する場合
DTS が同期する増分データは、以下の操作によって生成されます。
DROP DATABASE および DROP COLLECTION
RENAME COLLECTION
コレクション内のドキュメントに対する挿入、更新、削除操作
説明ドキュメントの増分データを同期する場合、
$setコマンドを使用した更新操作のみが対応します。
ファイルの増分データを同期する場合、$set コマンドのみが対応します。タスク開始後に作成されたデータベースの増分データは同期されません(oplog モードに適用)。必要なデータベースアカウント権限
| データベース | 必要な権限 | 参考情報 |
|---|---|---|
| ソース MongoDB インスタンス | ソース、admin、および local データベースの読み取り | MongoDB データベースユーザーの権限管理 |
同期タスクの作成
ステップ 1:データ同期ページを開く
以下のいずれかのコンソールを使用します。
DTS コンソール
DMS コンソール
具体的なナビゲーション手順は、DMSコンソールモードとレイアウトによって異なります。「シンプルモード」および「DMSコンソールのレイアウトとスタイルをカスタマイズする」をご参照ください。
ステップ 2:ソースおよびターゲットデータベースの構成
タスクの作成 をクリックし、以下の表に記載されているパラメーターを構成します。
一般
| パラメーター | 説明 |
|---|---|
| タスク名 | DTS タスクの名前です。DTS がデフォルト名を生成します。タスクを識別しやすいように、意味のある名前を指定してください。名前の重複は許可されます。 |
ソースデータベース
| パラメーター | 説明 |
|---|---|
| 既存の接続を選択 | ドロップダウンリストから登録済みのデータベースインスタンスを選択するか、手動で接続を構成します。 |
| データベースタイプ | MongoDB を選択します。 |
| アクセス方法 | Alibaba Cloud インスタンス を選択します。 |
| インスタンスリージョン | ソース MongoDB インスタンスが配置されているリージョンです。 |
| Alibaba Cloud アカウント間でのデータ複製 | 同一アカウント内での同期の場合は、いいえ を選択します。 |
| アーキテクチャ | レプリカセット を選択します。 |
| 移行方法 | DTS がソースから増分データを読み取る方法。「[Oplog]」(推奨):ソースで oplog 機能が有効になっている場合に利用可能です。Oplog は高速なログプルにより低遅延を実現します。「[ChangeStream]」:チェンジストリームが有効になっている場合に利用可能です。詳細については、「Change Streams」をご参照ください。[シャードクラスタ] を [アーキテクチャ] で選択した場合、[シャードアカウント] パラメーターおよび [シャードパスワード] パラメーターは不要です。 |
| インスタンス ID | ソース MongoDB インスタンスの ID です。 |
| 認証データベース | アカウントおよびパスワードを格納するデータベースです。デフォルト値:admin。 |
| データベースアカウント | 必要な読み取り権限を持つアカウントです。 |
| データベースパスワード | データベースアカウントのパスワードです。 |
| 暗号化 | 接続暗号化: 暗号化なし、SSL 暗号化、または Mongo Atlas SSL。利用可能なオプションは、アクセス方法 および アーキテクチャ の値によって異なります。アーキテクチャ が シャードクラスター で、移行方法 が Oplog の場合、SSL 暗号化は利用できません。Alibaba Cloud インスタンスアクセスを使用しない自己管理 MongoDB レプリカセットの場合、SSL 暗号化 を選択した際に CA 証明書をアップロードできます。 |
ターゲットデータベース
| パラメーター | 説明 |
|---|---|
| 既存の接続を選択 | ドロップダウンリストから登録済みのインスタンスを選択するか、手動で構成します。 |
| データベースタイプ | Function Compute を選択します。 |
| アクセス方法 | Alibaba Cloud インスタンス を選択します。 |
| インスタンスリージョン | デフォルトでソースと同じリージョンになります。変更できません。 |
| サービス | ターゲット関数を含む Function Compute サービスです。 |
| 関数 | 同期データを受信するターゲット関数です。 |
| サービスバージョンおよびエイリアス | データを受信する関数のバージョンを決定します。デフォルトバージョン を選択すると、サービスバージョンが LATEST に固定されます。指定バージョン を選択した場合は、サービスバージョン パラメーターを構成する必要があります。指定エイリアス を選択した場合は、サービスエイリアス パラメーターを構成する必要があります。Function Compute の用語については、「用語」をご参照ください。 |
ステップ 3:接続性のテスト
ページ下部の 接続性のテストと続行 をクリックします。
DTS サーバーの CIDR ブロックを、ソースおよびターゲットデータベースのセキュリティ設定に追加する必要があります。詳細については、「DTS サーバーの CIDR ブロックをオンプレミスデータベースのセキュリティ設定に追加する」をご参照ください。ソースまたはターゲットが、[Alibaba Cloud Instance] アクセスを使用しない自己管理データベースである場合、[接続テスト] を [DTS サーバーの CIDR ブロック] ダイアログボックスでクリックします。
ステップ 4:同期対象オブジェクトの構成
[オブジェクトの設定] ステップで、以下のパラメーターを設定します。
| パラメーター | 説明 |
|---|---|
| 同期タイプ | 固定値:増分データ同期。 |
| データ形式 | 固定値:Canal Json。フィールドの説明については、「Kafka クラスターのデータ形式」をご参照ください。 |
| ソースオブジェクト | データベースまたはコレクションを選択し、右向き矢印アイコンをクリックして 選択済みオブジェクト に移動します。 |
| 選択済みオブジェクト | 同期対象のオブジェクトを確認します。オブジェクトを削除するには、該当オブジェクトを選択し、左向き矢印アイコンをクリックします。データベースレベルまたはコレクションレベルの同期を構成するには、選択済みオブジェクト 内で右クリックします。 |
次へ:高度な設定 をクリックします。
ステップ 5:高度な設定の構成
| パラメーター | 説明 |
|---|---|
| タスクスケジューリング専用クラスター | デフォルトでは、DTS はタスクを共有クラスターにスケジュールします。安定性を向上させるには、専用クラスターを購入してください。詳細については、「DTS 専用クラスターとは」をご参照ください。 |
| 接続失敗時の再試行時間 | ソースまたはターゲットデータベースに到達不能な場合の DTS の再試行時間です。有効範囲:10~1,440 分。デフォルト値:720 分。30 分を超える値を設定してください。複数のタスクが同一データベースを共有する場合、最も短い再試行時間が適用されます。再試行期間中は、DTS インスタンスに対して課金されます。 |
| その他の問題発生時の再試行時間 | DDL または DML 操作が失敗した場合の DTS の再試行時間です。有効範囲:1~1,440 分。デフォルト値:10 分。10 分を超える値を設定してください。この値は 接続失敗時の再試行時間 よりも短くする必要があります。 |
| 更新後の完全ドキュメントの取得 | 移行方法 が ChangeStream の場合にのみ利用可能です。はい:更新後に完全なドキュメントを同期します。いいえ:変更されたフィールドのみを同期します。 |
| 増分データ同期のスロットリングを有効化 | スループットを制限してターゲットの負荷を軽減します。増分データ同期の RPS および 増分同期のデータ同期速度 (MB/s) を構成します。 |
| 環境タグ | DTS インスタンスを識別するための任意のタグです。 |
| ETL の構成 | 送信先に到達する前にデータをフィルターまたは変換するには、ETL 機能を有効化します。詳細については、「ETL とは」および「データ移行またはデータ同期タスクで ETL を設定する」をご参照ください。 |
| モニタリングとアラート | タスク失敗または遅延がしきい値を超えた場合にアラートを設定します。詳細については、「DTS タスク作成時のモニタリングとアラートの設定」をご参照ください。 |
ステップ 6:事前チェックの実行
次へ:タスク設定の保存と事前チェック をクリックします。
このタスク構成の OpenAPI パラメーターをプレビューするには、次へ:タスク設定の保存と事前チェック にポインターを合わせ、OpenAPI パラメーターのプレビュー をクリックします。
DTS はタスク開始前に事前チェックを実行します。事前チェックが失敗した場合:
各失敗項目について、詳細の表示 をクリックして問題を解決し、その後 再チェック をクリックします。
無視可能なアラート項目については、アラート詳細の確認 > 無視 > OK > 再チェック の順にクリックします。アラートを無視すると、データの不整合が発生する可能性があります。
ステップ 7:インスタンスの購入
成功率 が 100% になるまで待ち、その後 次へ:インスタンスの購入 をクリックします。
購入ページで、以下のパラメーターを構成します。
| パラメーター | 説明 |
|---|---|
| 課金方法 | サブスクリプション:前払い方式。長期利用に適しています。従量課金:時間単位で課金されます。短期利用に適しています。不要になったらインスタンスを解放して、課金を回避してください。 |
| リソースグループ設定 | インスタンスのリソースグループです。デフォルト値:デフォルトリソースグループResource Management とは |
| インスタンスクラス | 同期スループットクラス。詳細については、「データ同期インスタンスのインスタンスクラス」をご参照ください。 |
| サブスクリプション期間 | 課金方法 が サブスクリプション の場合に利用可能です。選択肢:1~9 か月、または 1、2、3、5 年です。 |
Data Transmission Service (従量課金) サービス利用規約 に同意します。
購入して開始 をクリックし、ダイアログボックスで OK をクリックします。
タスクがタスク一覧に表示されます。そこから進捗状況を監視できます。
次のステップ
レコードが 16 MB を超える場合、タスクでエラーが報告されます。同期オブジェクトを変更するか、ETL を使用して large なレコードをフィルターします。「既存のデータ同期タスクの ETL 構成を変更する」および「同期対象のオブジェクトを変更する」をご参照ください。
受信データを処理するための関数コードを作成してください。詳細については、「概要」をご参照ください。
関数が受信するデータ形式
DTS はデータを Object として関数に配信します。増分レコードは Records フィールドに配列として格納されます。配列の各要素は、以下のフィールドを持つ Object です。
関数は、2 種類の操作を受信します。
DDL:スキーマ変更 — CreateIndex、CreateCollection、DropIndex、DropCollection。
DML:データ変更 — INSERT、UPDATE、DELETE。
| フィールド | 型 | 説明 |
|---|---|---|
isDdl | ブール値 | True(DDL の場合)、False(DML の場合)。 |
type | 文字列 | DML: DELETE、UPDATE、または INSERT。DDL: DDL。 |
database | 文字列 | MongoDB のデータベース名です。 |
table | 文字列 | コレクション名です。 |
pkNames | 文字列 | プライマリキー名です。MongoDB の場合は常に _id です。 |
es | Long | ソースデータベースで操作が実行されたときの UNIX タイムスタンプ(ミリ秒単位)です。 |
ts | Long | DTS がターゲットへの書き込みを開始したときの UNIX タイムスタンプ(ミリ秒単位)です。 |
data | Object 配列 | 1 要素の配列です。要素には doc というキーと、JSON 文字列を値とするペアが含まれます。値をデシリアライズしてレコードを取得します。 |
old | Object 配列 | 元のデータが格納される配列です。フィールドの形式は data フィールドと同じです。type フィールドの値が UPDATE の場合にのみ利用可能です。 |
id | Int | 操作のシリアル番号です。 |
DDL の例
コレクションの作成
コレクションの削除
インデックスの作成
インデックスの削除
DML の例
データの挿入
SQL ステートメント:
// 複数のレコードを一度に挿入
db.runCommand({insert: "user", documents: [{"name":"jack","age":20},{"name":"lili","age":20}]})
// 1 件ずつレコードを挿入
db.user.insert({"name":"jack","age":20})
db.user.insert({"name":"lili","age":20})関数が受信するデータ:
{
"Records": [
{
"data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}, \"name\": \"jack\", \"age\": 20}"}],
"pkNames": ["_id"],
"type": "INSERT",
"es": 1694054783000,
"database": "MongoDBTest",
"id": 0,
"isDdl": false,
"table": "user",
"ts": 1694054784427
},
{
"data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}, \"name\": \"lili\", \"age\": 20}"}],
"pkNames": ["_id"],
"type": "INSERT",
"es": 1694054783000,
"database": "MongoDBTest",
"id": 0,
"isDdl": false,
"table": "user",
"ts": 1694054784428
}
]
}データの更新
SQL ステートメント:
db.user.update({"name":"jack"},{$set:{"age":30}})関数が受信するデータ:
{
"Records": [{
"data": [{"doc": "{\"$set\": {\"age\": 30}}"}],
"pkNames": ["_id"],
"old": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
"type": "UPDATE",
"es": 1694054989000,
"database": "MongoDBTest",
"id": 0,
"isDdl": false,
"table": "user",
"ts": 1694054990555
}]
}データの削除
SQL ステートメント:
db.user.remove({"name":"jack"})関数が受信するデータ:
{
"Records": [{
"data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
"pkNames": ["_id"],
"type": "DELETE",
"es": 1694055452000,
"database": "MongoDBTest",
"id": 0,
"isDdl": false,
"table": "user",
"ts": 1694055452852
}]
}