Data Transmission Service (DTS) は、ApsaraDB for MongoDB のシャードクラスターから増分変更をキャプチャし、各変更イベントを Canal JSON フォーマットで Function Compute (FC) の関数に配信します。関数コードを記述して、イベントを処理、変換、またはダウンストリームに転送します。
関数が受信する内容
各呼び出しは、Records 配列を持つオブジェクトを渡します。配列内のすべての要素は、1 つの変更イベントを表します。
| フィールド | タイプ | 説明 |
|---|---|---|
isDdl | ブール値 | True は DDL 操作用です。 False は DML 操作用です。 |
type | 文字列 | DML: INSERT、UPDATE、または DELETE。DDL: DDL |
database | 文字列 | MongoDB データベース名 |
table | 文字列 | コレクション名 |
pkNames | 文字列 | プライマリキー名。MongoDB の場合は常に _id です |
es | Long | 13 桁の UNIX タイムスタンプ (ミリ秒) — ソースで操作が発生した時刻 |
ts | Long | 13 桁の UNIX タイムスタンプ (ミリ秒) — DTS が宛先への書き込みを開始した時刻 |
data | オブジェクト配列 | 1 つの要素を持つ配列。要素には doc キーがあり、その値はドキュメントを表す JSON 文字列です。レコードを読み取るには、値を逆シリアル化します |
old | オブジェクト配列 | data と同じフォーマットです。type が UPDATE の場合にのみ存在し、更新前のドキュメントの状態が含まれます |
id | Int | 操作のシリアル番号 |
関数は、次の 2 種類の操作を受け取ります。
DDL — スキーマの変更:
CreateIndex、CreateCollection、DropIndex、DropCollectionDML — データの変更:
INSERT、UPDATE、DELETE
ペイロードの例
コレクションの作成 (DDL)
db.createCollection("testCollection"){
"Records": [{
"data": [{"doc": "{\"create\": \"testCollection\", \"idIndex\": {\"v\": 2, \"key\": {\"_id\": 1}, \"name\": \"_id_\"}}"}],
"pkNames": ["_id"],
"type": "DDL",
"es": 1694056437000,
"database": "MongoDBTest",
"id": 0,
"isDdl": true,
"table": "testCollection",
"ts": 1694056437510
}]
}コレクションの削除 (DDL)
db.testCollection.drop(){
"Records": [{
"data": [{"doc": "{\"drop\": \"testCollection\"}"}],
"pkNames": ["_id"],
"type": "DDL",
"es": 1694056577000,
"database": "MongoDBTest",
"id": 0,
"isDdl": true,
"table": "testCollection",
"ts": 1694056577789
}]
}インデックスの作成 (DDL)
db.testCollection.createIndex({name: 1}){
"Records": [{
"data": [{"doc": "{\"createIndexes\": \"testCollection\", \"v\": 2, \"key\": {\"name\": 1}, \"name\": \"name_1\"}"}],
"pkNames": ["_id"],
"type": "DDL",
"es": 1694056670000,
"database": "MongoDBTest",
"id": 0,
"isDdl": true,
"table": "testCollection",
"ts": 1694056670719
}]
}インデックスの削除 (DDL)
db.testCollection.dropIndex({name: 1}){
"Records": [{
"data": [{"doc": "{\"dropIndexes\": \"testCollection\", \"index\": \"name_1\"}"}],
"pkNames": ["_id"],
"type": "DDL",
"es": 1694056817000,
"database": "MongoDBTest",
"id": 0,
"isDdl": true,
"table": "$cmd",
"ts": 1694056818035
}]
}ドキュメントの挿入 (DML)
// バッチ挿入
db.runCommand({insert: "user", documents: [{"name": "jack", "age": 20}, {"name": "lili", "age": 20}]})
// 単一挿入
db.user.insert({"name": "jack", "age": 20}){
"Records": [
{
"data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}, \"name\": \"jack\", \"age\": 20}"}],
"pkNames": ["_id"],
"type": "INSERT",
"es": 1694054783000,
"database": "MongoDBTest",
"id": 0,
"isDdl": false,
"table": "user",
"ts": 1694054784427
},
{
"data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}, \"name\": \"lili\", \"age\": 20}"}],
"pkNames": ["_id"],
"type": "INSERT",
"es": 1694054783000,
"database": "MongoDBTest",
"id": 0,
"isDdl": false,
"table": "user",
"ts": 1694054784428
}
]
}ドキュメントの更新 (DML)
db.user.update({"name": "jack"}, {$set: {"age": 30}}){
"Records": [{
"data": [{"doc": "{\"$set\": {\"age\": 30}}"}],
"pkNames": ["_id"],
"old": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
"type": "UPDATE",
"es": 1694054989000,
"database": "MongoDBTest",
"id": 0,
"isDdl": false,
"table": "user",
"ts": 1694054990555
}]
}UPDATE 操作では、DTS が増分データを同期する際に $set コマンドのみが同期的に実行されます。ドキュメントの削除 (DML)
db.user.remove({"name": "jack"}){
"Records": [{
"data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
"pkNames": ["_id"],
"type": "DELETE",
"es": 1694055452000,
"database": "MongoDBTest",
"id": 0,
"isDdl": false,
"table": "user",
"ts": 1694055452852
}]
}制限事項
同期タスクを作成する前に、これらの制約を確認してください。
範囲の制約:
増分データ同期のみがサポートされています。完全データ同期はサポートされていません。
リージョン間同期はサポートされていません。
DTS は
admin、config、またはlocalデータベースからデータを同期できません。オブジェクトマッピングはサポートされていません。
トランザクション情報は保持されません。トランザクションは、宛先で個別のレコードに変換されます。
ソースデータベースの制約:
ソースは、10 個以下の Mongos ノードを持つシャードクラスターアーキテクチャを使用する必要があります。
ソースは、Azure Cosmos DB for MongoDB クラスターまたは Amazon DocumentDB エラスティッククラスターであってはなりません。
同期するコレクションには、PRIMARY KEY 制約または UNIQUE 制約が必要であり、すべてのフィールドが一意である必要があります。そうでない場合、ターゲットデータベースに重複したデータレコードが含まれる可能性があります。
単一のドキュメントは 16 MB を超えることはできません。このサイズを超えるドキュメントは、宛先関数に書き込むことができず、エラーをトリガーします。必要に応じて、抽出、変換、ロード (ETL) 機能を使用して大きなフィールドをフィルター処理します。
タスクごとに最大 1,000 個のコレクションを同期します。より多くのコレクションを同期するには、複数のタスクを作成するか、データベースレベルで同期します。
同期タスクの実行中にソースインスタンスをスケーリングすることはできません。
DTS は SRV エンドポイントを介して MongoDB データベースに接続できません。
ソースデータベースのバランサーがアクティブな場合、タスクに遅延が発生する可能性があります。
ソースデータベースがシャードクラスターアーキテクチャを使用する自社運用 MongoDB データベースである場合、[アクセス方法] パラメーターを [Express Connect、VPN Gateway、または Smart Access Gateway] または [Cloud Enterprise Network (CEN)] に設定します。
制約の記述:
INSERT 操作の場合、挿入されるデータにはシャードキーが含まれている必要があります。
UPDATE 操作の場合、シャードキーは変更できません。
宛先関数ごとに 1 つの DTS タスクのみを設定します。複数のタスクが同じ関数に書き込むと、データエラーが発生する可能性があります。
oplog と変更ストリームの要件:
oplog を有効にし、少なくとも 7 日間のログデータを保持するか、変更ストリームを有効にして少なくとも過去 7 日間の変更をカバーする必要があります。どちらの条件も満たされない場合、DTS はソースの変更をキャプチャできず、DTS サービスレベルアグリーメント (SLA) の対象外となるデータの不整合や損失を引き起こす可能性があります。
変更ストリームの制限 (該当する場合):
変更ストリームには MongoDB 4.0 以降が必要です。
変更ストリームを使用する場合、双方向同期はサポートされていません。
非エラスティック Amazon DocumentDB クラスターの場合は、変更ストリームを使用します。[移行方法] を [ChangeStream] に、[アーキテクチャ] を [シャードクラスター] に設定します。
新しいデータベース:
DTS は、同期タスクの開始後に作成されたデータベースからの増分データを同期しません。
サポートされる操作
キャプチャされる操作は、移行方法によって異なります。
oplog の使用 (推奨):
CREATE COLLECTION、CREATE INDEXDROP DATABASE、DROP COLLECTION、DROP INDEXRENAME COLLECTIONドキュメントレベルの INSERT、UPDATE、DELETE
変更ストリームの使用:
DROP DATABASE、DROP COLLECTIONRENAME COLLECTIONドキュメントレベルの INSERT、UPDATE、DELETE
課金
増分データ同期は課金対象です。料金の詳細については、「課金概要」をご参照ください。
| 課金方法 | 説明 |
|---|---|
| サブスクリプション | 1〜9 か月、または 1、2、3、5 年分を前払いします。長期利用の場合、より費用対効果が高くなります |
| 従量課金 | 時間単位で課金されます。不要になったインスタンスをリリースすると、課金が停止します |
DTS は、接続再試行期間中もインスタンスに対して課金します。
前提条件
開始する前に、以下を確認してください。
実行中の ApsaraDB for MongoDB シャードクラスターインスタンス。詳細については、「シャードクラスターインスタンスの作成」をご参照ください
Function Compute サービスおよび関数で、ハンドラータイプ が [イベントハンドラー] に設定されています。詳細については、「関数をすばやく作成する」をご参照ください。
ソース ApsaraDB for MongoDB インスタンス上のデータベースアカウントに、ソース、
admin、およびlocalデータベースに対する読み取り権限があること。詳細については、「MongoDB データベースユーザーの権限管理」をご参照ください
同期タスクの作成
ステップ 1: データ同期ページの表示
いずれかのコンソールを使用して [データ同期] ページに移動します。
DTS コンソール
DMS コンソール
正確なナビゲーションパスは、お客様の DMS コンソールのレイアウトによって異なります。「シンプルモード」および「DMS コンソールのレイアウトとスタイルのカスタマイズ」を参照してください。
ステップ 2: ソースデータベースとターゲットデータベースの設定
[タスクの作成] をクリックし、次のパラメーターを設定します。
| セクション | パラメーター | 説明 |
|---|---|---|
| 該当なし | タスク名 | DTS が自動生成する名前です。タスクを容易に識別できるよう、意味のある名前を指定してください。一意性は必須ではありません。 |
| ソースデータベース | 既存の接続を選択 | 登録済みのデータベースインスタンスを選択すると、以下のパラメーターが自動で入力されます。または、手動で設定することもできます。 |
| データベースタイプ | MongoDB | |
| アクセス方法 | Alibaba Cloud インスタンス | |
| インスタンスリージョン | ソース MongoDB インスタンスのリージョン | |
| Alibaba Cloud アカウント間でのデータ複製 | 同一アカウント内での同期を行う場合は、いいえ を選択します。 | |
| アーキテクチャ | シャードクラスター | |
| 移行方法 | DTS による増分データの取得方法です。オプション:推奨の Oplog、または下記の「詳細」をご参照ください。ChangeStream | |
| インスタンス ID | ソース MongoDB インスタンスの ID | |
| 認証用データベース | アカウントの認証情報を格納するデータベースです。デフォルト値:admin | |
| データベースアカウント | 必要な読み取り権限を持つソースデータベースのアカウント | |
| データベースパスワード | データベースアカウントのパスワード | |
| シャードアカウント | ソースインスタンス内のシャードにアクセスするためのアカウント | |
| シャードパスワード | シャードアカウントのパスワード | |
| 暗号化 | 接続時の暗号化モードです。オプション: 非暗号化、SSL 暗号化、Mongo Atlas SSL。利用可能なオプションは、アクセス方法 および アーキテクチャ の選択内容によって異なります。アーキテクチャ が シャードクラスター かつ 移行方法 が Oplog の場合、SSL 暗号化 | |
| 送信先データベース | 既存の接続を選択 | 登録済みの Function Compute インスタンスを選択すると、以下のパラメーターが自動で入力されます。または、手動で設定することもできます。 |
| データベースタイプ | Function Compute | |
| アクセス方法 | Alibaba Cloud インスタンス | |
| インスタンスリージョン | ソースリージョンと一致します。変更できません。 | |
| サービス | 送信先関数を含む Function Compute サービス | |
| 関数 | 同期されたデータを受信する関数 | |
| サービスバージョンおよびエイリアス | サービスのバージョンまたはエイリアスです。オプション: デフォルトバージョン(固定値:LATEST)、指定したバージョン(サービスバージョン の指定が必要)、指定したエイリアス(サービスエイリアス用語 |
移行方法の選択:
| 方法 | 使用する状況 |
|---|---|
| Oplog (推奨) | ソースで oplog が有効になっている場合 (自社運用 MongoDB と ApsaraDB for MongoDB の両方でデフォルト)。ログのプルが高速なため、同期遅延が短縮されます |
| ChangeStream | oplog が無効になっている場合、または非エラスティック Amazon DocumentDB クラスターを使用している場合 (変更ストリームが必要)。MongoDB 4.0 以降が必要です。双方向同期はサポートされていません |
[アーキテクチャ] が [シャードクラスター] で、[移行方法] が [ChangeStream] の場合、[シャードアカウント] と [シャードパスワード] パラメーターは不要です。
ステップ 3: 接続テスト
[接続テストと次へ] をクリックします。
DTS サーバーの CIDR ブロックを、ソースおよび送信先のセキュリティグループまたは許可リストに追加することを確認してください。詳細については、「DTS サーバーの CIDR ブロックをオンプレミスデータベースのセキュリティ設定に追加する」をご参照ください。ソースまたは送信先でセルフマネージドアクセス方法を使用している場合は、まず「[接続テスト]」を「[DTS サーバーの CIDR ブロック]」ダイアログでクリックします。
ステップ 4: 同期するオブジェクトの選択
[オブジェクトの設定] ステップで、以下を設定します。
| パラメーター | 説明 |
|---|---|
| 同期タイプ | [増分データ同期] に固定されています。変更できません |
| データ形式 | [Canal Json] に固定されています。フィールドの説明については、「Kafka クラスター Topic のデータ形式」の Canal Json セクションをご参照ください |
| ソースオブジェクト | 同期するデータベースまたはコレクションを選択し、 |
| 選択したオブジェクト | 選択したオブジェクトを確認します。 |
ステップ 5: 詳細設定
[次へ: 詳細設定] をクリックし、以下を設定します。
| パラメーター | 説明 |
|---|---|
| タスクスケジューリング用の専用クラスター | デフォルトでは、DTS はタスクを共有クラスターにスケジュールします。より高い安定性を得るために、専用クラスターをご購入ください。詳しくは、「DTS 専用クラスターとは |
| 接続失敗時の再試行時間 | ソースまたは宛先が到達不能な場合に DTS が再試行する時間。範囲: 10〜1440 分。デフォルト: 720。少なくとも 30 分に設定してください。複数のタスクが同じソースまたは宛先を共有する場合、最も短い再試行時間が優先されます |
| その他の問題に対する再試行時間 | DTS が失敗した DDL または DML 操作を再試行する時間。範囲: 1〜1440 分。デフォルト: 10。少なくとも 10 分に設定してください。[接続失敗時の再試行時間] |
| 更新後にドキュメント全体を取得 | ChangeStream のみ。[はい]: 更新後に完全なドキュメントを送信します。[いいえ]: 変更されたフィールドのみを送信します |
| 増分データ同期のスループット制限を有効化 | 送信先のロードを軽減するために、同期スループットを制限します。[増分データ同期の RPS] と [増分同期のデータ同期速度 (MB/s)] |
| 環境タグ | DTS インスタンスの環境を識別するためのタグ。オプション |
| ETL の設定 | ETL 機能を有効化して、転送中のデータを変換します。詳細については、「ETL とは?」および「データ移行またはデータ同期タスクで ETL を設定する |
| モニタリングとアラート | タスクが失敗した場合、または同期遅延がしきい値を超えた場合にアラートを送信します。詳細については、「DTS タスクの作成時にモニタリングとアラートを設定する |
ステップ 6: 事前チェックの実行
[次へ: タスク設定の保存と事前チェック] をクリックします。
このタスク設定の API パラメーターをプレビューするには、[次へ: タスク設定の保存と事前チェック] にポインターを合わせ、[OpenAPI パラメーターのプレビュー] をクリックします。
DTS は、同期タスクを開始する前に事前チェックを実行します。タスクは、事前チェックに合格した後にのみ開始されます。
項目が失敗した場合: 失敗した項目の横にある [詳細の表示] をクリックし、報告された問題を修正してから、[再度事前チェック] をクリックします。
アラートがトリガーされた場合:
アラートを無視できない場合: [詳細の表示] をクリックし、問題を修正して、事前チェックを再実行します。
アラートを無視できる場合: [アラート詳細の確認] をクリックし、ダイアログで [無視] をクリックし、[OK] で確認してから、[再度事前チェック] をクリックします。アラートを無視すると、データの不整合が発生する可能性があります。
ステップ 7: インスタンスの購入
[成功率] が [100%] に達するまで待ってから、[次へ: インスタンスの購入] をクリックします。
[購入] ページで、次の項目を設定します。
| セクション | パラメーター | 説明 |
|---|---|---|
| 新規インスタンスクラス | 課金方法 | サブスクリプション または 従量課金 |
| リソースグループ設定 | インスタンスのリソースグループ。デフォルト: デフォルトリソースグループResource Management とは | |
| インスタンスクラス | 同期速度を決定します。詳細については、「データ同期インスタンスのインスタンスクラス | |
| サブスクリプション期間 | サブスクリプション 課金でのみ利用可能です。オプション: 1〜9 か月、または 1、2、3、5 年 |
[Data Transmission Service (従量課金) 利用規約] を読んで選択します。
[購入して開始] をクリックし、確認ダイアログで [OK] をクリックします。
タスクリストでタスクの進捗状況を追跡します。
次のステップ
Function Compute のイベントハンドラーを記述して、Canal JSON ペイロードを処理します。
isDdlフィールドを使用して DDL と DML のロジックを分岐させ、各data要素のdoc文字列を逆シリアル化してドキュメントフィールドにアクセスします。関数への大規模フィールドのドキュメントの負荷を軽減するには、データが関数に到達する前に ETL フィルターを設定します。詳細については、「データ移行またはデータ同期タスクで ETL を設定する」をご参照ください。
同期の健全性をモニターするには、[高度な設定] でアラートの設定を行うか、タスク作成後にアラートの設定を行います。詳細については、「DTS タスクを作成するときにモニタリングとアラートの設定を行う」をご参照ください。
タスクが失敗した場合、DTS テクニカルサポートは 8 時間以内にそのタスクを復元しようと試みます。復元中に、タスクが再起動され、タスクパラメーター(データベースパラメーターではない)が調整される場合があります。変更可能なパラメーターのリストについては、「インスタンスパラメーターの変更」をご参照ください。