Data Transmission Service (DTS) を使用して、ApsaraDB for MongoDB シャードクラスターインスタンスから Function Compute 関数に増分データを同期できます。関数コードを記述して、関数に同期されたデータをさらに処理できます。
前提条件
ソースの ApsaraDB for MongoDB シャードクラスターインスタンスが作成されていること。詳細については、「シャードクラスターインスタンスを作成する」をご参照ください。
デスティネーションサービスと関数が作成されており、関数の [ハンドラータイプ] パラメーターが [イベントハンドラー] に設定されていること。関数の作成方法の詳細については、「関数を簡単に作成する」をご参照ください。
使用方法に関する注意事項
カテゴリ | 説明 |
ソースデータベースの制限 |
|
その他の制限 |
|
課金
同期タイプ | タスク構成料金 |
増分データ同期 | 課金されます。詳細については、「課金の概要」をご参照ください。 |
データ同期をサポートする操作
同期タイプ | 説明 |
増分データ同期 | oplog を使用するDTS タスクは、タスクの実行開始後に作成されたデータベースから増分データを同期しません。DTS は、次の操作によって生成された増分データを同期します。
変更ストリームを使用するDTS は、次の操作によって生成された増分データを同期します。
|
データベースアカウントに必要な権限
データベース | 必要な権限 | 参照 |
ソース ApsaraDB for MongoDB インスタンス | ソースデータベース、admin データベース、および local データベースに対する読み取り権限。 |
手順
次のいずれかの方法を使用して [データ同期] ページに移動し、データ同期インスタンスが存在するリージョンを選択します。
DTS コンソール
DTS コンソール にログオンします。
左側のナビゲーションウィンドウで、データ同期 をクリックします。
ページの左上隅で、データ同期タスクが存在するリージョンを選択します。
DMS コンソール
説明実際の操作は、DMS コンソールのモードとレイアウトによって異なる場合があります。詳細については、「シンプルモード」および「DMS コンソールのレイアウトとスタイルをカスタマイズする」をご参照ください。
DMS コンソール にログオンします。
上部のナビゲーションバーで、[データ + AI] にポインターを移動し、 を選択します。
データ同期タスク の右側にあるドロップダウンリストから、データ同期インスタンスが存在するリージョンを選択します。
タスクの作成 をクリックして、タスク構成ページに移動します。
ソースデータベースとデスティネーションデータベースを構成します。次の表にパラメーターを示します。
セクション
パラメーター
説明
該当なし
タスク名
DTS タスクの名前。DTS はタスク名を自動的に生成します。タスクを識別しやすい説明的な名前を指定することをお勧めします。一意のタスク名を指定する必要はありません。
移行元データベース
[既存の接続を選択]
DTS に登録されているデータベースインスタンスを使用する場合は、ドロップダウンリストからインスタンスを選択します。DTS は、インスタンスの次のデータベースパラメーターを自動的に入力します。詳細については、「データベース接続を管理する」をご参照ください。
説明DMS コンソールでは、[DMS データベースインスタンスを選択] ドロップダウンリストからデータベースインスタンスを選択できます。
インスタンスを DTS に登録できなかった場合、または DTS に登録されているインスタンスを使用する必要がない場合は、次のデータベース情報を構成する必要があります。
データベースタイプ
ソースデータベースのタイプ。MongoDB を選択します。
アクセス方法
ソースデータベースのアクセス方法。Alibaba Cloud インスタンス を選択します。
インスタンスのリージョン
ソース MongoDB インスタンスが存在するリージョン。
Alibaba Cloud アカウント間でデータを複製
Alibaba Cloud アカウント間でデータを同期するかどうかを指定します。この例では、× が選択されています。
アーキテクチャ
ソースデータベースがデプロイされているアーキテクチャ。[シャードクラスター] を選択します。
[移行方法]
ソースデータベースから増分データを同期するために使用される方法。ビジネス要件に基づいて方法を選択します。有効な値:
Oplog(推奨):
このオプションは、ソースデータベースで oplog 機能が有効になっている場合に選択できます。
説明デフォルトでは、自己管理 MongoDB データベースと ApsaraDB for MongoDB インスタンスの両方で oplog 機能が有効になっています。この機能により、ログのプル速度が速いため、低レイテンシで増分データを同期できます。そのため、[移行方法] パラメーターには Oplog を選択することをお勧めします。
ChangeStream:
このオプションは、ソースデータベースで変更ストリームが有効になっている場合に選択できます。詳細については、変更ストリーム を参照してください。
説明ソースデータベースが非エラスティック Amazon DocumentDB クラスターの場合、[移行方法] パラメーターは ChangeStream にのみ設定できます。
シャードクラスター を アーキテクチャ パラメーターに選択した場合、Shardアカウント パラメーターと Shardパスワード パラメーターを構成する必要はありません。
インスタンス ID
ソース MongoDB インスタンスの ID。
認証データベース
ソース ApsaraDB for MongoDB インスタンスのデータベースアカウントとパスワードを格納する認証データベースの名前。認証データベースの名前を事前に変更していない場合、デフォルト値は [admin] です。
データベースアカウント
ソース ApsaraDB for MongoDB インスタンスのデータベースアカウント。アカウントに必要な権限については、このトピックの「データベースアカウントに必要な権限」セクションを参照してください。
データベースのパスワード
データベースへのアクセスに使用するパスワード。
Shardアカウント
ソース ApsaraDB for MongoDB インスタンスのシャードへのアクセスに使用するアカウント。
Shardパスワード
ソース ApsaraDB for MongoDB インスタンスのシャードアカウントのパスワード。
暗号化
ソースデータベースへの接続を暗号化するかどうかを指定します。ビジネス要件に基づいて、非暗号化、SSL 暗号化、または Mongo Atlas SSL を選択できます。暗号化 パラメーターで使用できるオプションは、アクセス方法 パラメーターと アーキテクチャ パラメーターで選択した値によって決まります。DTS コンソールに表示されるオプションが優先されます。
説明[アーキテクチャ] パラメーターが [シャードクラスター] に設定され、[移行方法] パラメーターが ApsaraDB for MongoDB データベースの Oplog に設定されている場合、[暗号化] パラメーターの SSL 暗号化 は使用できません。
ソースデータベースが レプリカセット アーキテクチャを使用する自己管理 MongoDB データベースで、アクセス方法 パラメーターが Alibaba Cloud インスタンス に設定されておらず、[暗号化] パラメーターが SSL 暗号化 に設定されている場合、認証局 (CA) 証明書をアップロードしてソースデータベースへの接続を検証できます。
移行先データベース
[既存の接続を選択]
DTS に登録されているデータベースインスタンスを使用する場合は、ドロップダウンリストからインスタンスを選択します。DTS は、インスタンスの次のデータベースパラメーターを自動的に入力します。詳細については、「データベース接続を管理する」をご参照ください。
説明DMS コンソールでは、[DMS データベースインスタンスを選択] ドロップダウンリストからデータベースインスタンスを選択できます。
インスタンスを DTS に登録できなかった場合、または DTS に登録されているインスタンスを使用する必要がない場合は、次のデータベース情報を構成する必要があります。
データベースタイプ
デスティネーションデータベースのタイプ。Function Compute (FC) を選択します。
アクセス方法
デスティネーションデータベースのアクセス方法。Alibaba Cloud インスタンス を選択します。
インスタンスのリージョン
デスティネーションデータベースが存在するリージョン。デフォルトでは、ソースデータベースの インスタンスのリージョン パラメーターと同じ値になり、変更できません。
サービス
デスティネーション関数が属するサービスの名前。
関数
同期されたデータを受信するデスティネーション関数。
サービスのバージョンとエイリアス
サービスのバージョンまたはエイリアス。ビジネス要件に基づいてこのパラメーターを構成します。
デフォルトバージョン を選択した場合、サービスバージョン パラメーターの値は [LATEST] に固定されます。
バージョンの指定 を選択した場合、サービスバージョン パラメーターを構成する必要があります。
エイリアスの指定 を選択した場合、サービスのエイリアス パラメーターを構成する必要があります。
説明Function Compute の用語の詳細については、「用語」をご参照ください。
ページの下部にある 接続をテストして続行 をクリックします。
説明DTS サーバーの CIDR ブロックをソースデータベースとデスティネーションデータベースのセキュリティ設定に自動または手動で追加して、DTS サーバーからのアクセスを許可できるようにしてください。詳細については、「DTS サーバーの CIDR ブロックを追加する」をご参照ください。
ソースデータベースまたはデスティネーションデータベースが自己管理データベースであり、その アクセス方法 が Alibaba Cloud インスタンス に設定されていない場合は、DTS サーバーの CIDR ブロック ダイアログボックスの 接続テスト をクリックします。
同期するオブジェクトを構成します。
オブジェクト設定 ステップで、同期するオブジェクトを構成します。
パラメーター
説明
同期タイプ
デフォルトでは、増分データ同期 が選択されており、値を変更することはできません。
データ形式
デスティネーション関数に同期されるデータの格納形式。[Canal Json] 形式のみがサポートされています。
説明Canal JSON 形式のパラメーターの詳細については、「Kafka クラスターのデータ形式」トピックの「Canal Json」セクションを参照してください。
ソースオブジェクト
ソースオブジェクト セクションから 1 つ以上のオブジェクトを選択し、
アイコンをクリックして、選択中のオブジェクト セクションにオブジェクトを追加します。説明同期するオブジェクトとしてデータベースまたはコレクションを選択します。
選択中のオブジェクト
選択中のオブジェクト セクションで、同期するデータを確認します。
説明選択したオブジェクトを削除するには、選択中のオブジェクト セクションで削除するオブジェクトを選択し、
アイコンをクリックします。データベースまたはコレクション別に増分データを同期するには、選択中のオブジェクト を右クリックし、表示されるダイアログボックスで操作を選択します。
次へ:詳細設定 をクリックして、詳細設定を構成します。
パラメーター
説明
タスクのスケジュールに使用する専用クラスターの選択
デフォルトでは、専用クラスターを指定しない場合、DTS は共有クラスターにタスクをスケジュールします。データ同期インスタンスの安定性を向上させるには、専用クラスターを購入します。詳細については、「DTS 専用クラスターとは」をご参照ください。
失敗した接続の再試行時間
接続失敗時のリトライ時間の範囲。データ同期タスクの開始後にソースデータベースまたはデスティネーションデータベースに接続できない場合、DTS は指定された時間範囲内で直ちに接続を再試行します。有効な値:10 ~ 1440。単位:分。デフォルト値:720。このパラメーターには 30 より大きい値を設定することをお勧めします。指定された時間範囲内に DTS がソースデータベースとデスティネーションデータベースに再接続すると、DTS はデータ同期タスクを再開します。そうでない場合、データ同期タスクは失敗します。
説明同じソースデータベースまたはデスティネーションデータベースを持つ複数のデータ同期タスクに異なるリトライ時間の範囲を指定した場合、最も短いリトライ時間の範囲が優先されます。
DTS が接続を再試行すると、DTS インスタンスの料金が発生します。ビジネス要件に基づいてリトライ時間の範囲を指定することをお勧めします。ソースインスタンスとデスティネーションインスタンスが解放された後、できるだけ早く DTS インスタンスを解放することもできます。
移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。
その他の問題のリトライ時間の範囲。たとえば、データ同期タスクの開始後に DDL 操作または DML 操作の実行に失敗した場合、DTS は指定された時間範囲内で直ちに操作を再試行します。有効な値:1 ~ 1440。単位:分。デフォルト値:10。このパラメーターには 10 より大きい値を設定することをお勧めします。指定された時間範囲内に失敗した操作が正常に実行されると、DTS はデータ同期タスクを再開します。そうでない場合、データ同期タスクは失敗します。
重要移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。 パラメーターの値は、失敗した接続の再試行時間 パラメーターの値よりも小さくする必要があります。
[更新後にドキュメント全体を取得する]
増分データ同期中に、更新操作後のドキュメントの完全なデータをターゲットデータベースに同期するかどうかを指定します。
説明このパラメーターは、移行方法 を ChangeStream に設定した場合にのみ使用できます。
○: 更新操作の後、ドキュメントの完全なデータを同期します。
×: 更新されたデータのみを同期します。
増分同期率を制限するかどうか
増分データ同期の速度制限を有効にするかどうかを指定します。ビジネス要件に基づいて、増分データ同期の速度制限を有効にすることができます。速度制限を設定するには、1 秒あたりの増分同期の行数 RPS と 1 秒あたりの増分同期データ量 (MB) BPS パラメーターを設定する必要があります。これにより、ターゲットデータベースサーバーの負荷が軽減されます。
環境タグ
DTS インスタンスを識別するために使用される環境タグです。ビジネス要件に基づいて環境タグを選択できます。この例では、このパラメーターを設定する必要はありません。
ETL の設定
抽出、変換、書き出し(ETL)機能を有効にするかどうかを指定します。 詳細については、「ETL とは」をご参照ください。有効な値:
[はい]: ETL 機能を設定します。コードエディタにデータ処理ステートメントを入力できます。詳細については、「データ移行またはデータ同期タスクで ETL を設定する」をご参照ください。
[いいえ]: ETL 機能を構成しません。
監視とアラート
データ同期インスタンスのアラートを設定するかどうかを指定します。 タスクが失敗した場合、または同期遅延が指定されたしきい値を超えた場合、アラート連絡先に通知が送信されます。有効な値:
[いいえ]: アラートを有効にしません。
[はい]: アラートを設定します。 この場合、アラートのしきい値と アラート通知設定 も設定する必要があります。 詳細については、「DTS タスクの作成時にモニタリングとアラートを設定する」セクションのモニタリングとアラートを設定する Topic をご参照ください。
タスク設定を保存し、事前チェックを実行します。
DTS タスクを設定するために関連 API 操作を呼び出すときに指定するパラメーターを表示するには、次:タスク設定の保存と事前チェック にポインターを移動し、OpenAPI パラメーターのプレビュー をクリックします。
パラメーターを表示する必要がない場合、またはすでに表示済みの場合は、ページの下部にある 次:タスク設定の保存と事前チェック をクリックします。
説明データ同期タスクを開始する前に、DTS は事前チェックを実行します。タスクが事前チェックに合格した後にのみ、データ同期タスクを開始できます。
データ同期タスクが事前チェックに失敗した場合は、失敗した各項目の横にある [詳細の表示] をクリックします。チェック結果に基づいて原因を分析した後、問題をトラブルシューティングします。次に、事前チェックを再実行します。
事前チェック中に項目に対してアラートがトリガーされた場合:
アラート項目を無視できない場合は、失敗した項目の横にある [詳細の表示] をクリックして、問題をトラブルシューティングします。次に、事前チェックを再度実行します。
アラート項目を無視できる場合は、[アラートの詳細の確認] をクリックします。[詳細の表示] ダイアログボックスで、[無視] をクリックします。表示されるメッセージで、[OK] をクリックします。次に、[再チェック] をクリックして、事前チェックを再度実行します。アラート項目を無視すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。
インスタンスを購入します。
[成功率] が [100%] になったら、[次へ: インスタンスの購入] をクリックします。
[購入] ページで、データ同期タスクの課金方法とインスタンスクラスのパラメーターを設定します。次の表にパラメーターを示します。
セクション
パラメーター
説明
新しいインスタンスクラス
課金方法
サブスクリプション: データ同期インスタンスを作成するときにサブスクリプション料金を支払います。サブスクリプション課金方法は、長期使用の場合、従量課金方法よりも費用対効果が高くなります。
従量課金: 従量課金インスタンスは時間単位で課金されます。従量課金方法は、短期使用に適しています。従量課金データ同期インスタンスが不要になった場合は、インスタンスをリリースしてコストを削減できます。
リソースグループ設定
データ同期インスタンスが属するリソースグループ。デフォルト値: [デフォルトのリソースグループ]。詳細については、「リソース管理とは」をご参照ください。
インスタンスクラス
DTS は、同期速度が異なるインスタンスクラスを提供します。ビジネス要件に基づいてインスタンスクラスを選択できます。詳細については、「データ同期インスタンスのインスタンスクラス」をご参照ください。
サブスクリプション期間
サブスクリプション課金方法を選択した場合は、サブスクリプション期間と作成するデータ同期インスタンスの数を指定します。サブスクリプション期間は、1 ~ 9 か月、1 年、2 年、3 年、または 5 年です。
説明このパラメーターは、サブスクリプション 課金方法を選択した場合にのみ使用できます。
[Data Transmission Service (従量課金) サービス規約] を読んで選択します。
[購入して開始] をクリックします。表示されるダイアログボックスで、OK をクリックします。
タスクの進捗状況はタスクリストで確認できます。
宛先関数で受信されるデータのフォーマット
宛先関数で受信されるデータは、オブジェクト型です。ソースデータベースの増分データは、配列形式の Records フィールドに格納されます。配列内の各要素は、オブジェクト型のデータレコードを示します。次の表は、オブジェクト型のデータレコードのフィールドについて説明しています。
宛先関数は、次の 2 種類の SQL 操作を記録するデータを受信します。
DDL: CreateIndex、CreateCollection、DropIndex、DropCollection などのデータスキーマ変更に対する操作。
DML: INSERT、UPDATE、DELETE などのデータ管理に対する操作。
パラメータ | カテゴリ | 説明 |
| ブール値 | 操作が DDL 操作かどうかを示します。有効な値:
|
| 文字列 | SQL 操作のタイプ。
|
| 文字列 | MongoDB データベースの名前。 |
| 文字列 | MongoDB データベースのコレクションの名前。 |
| 文字列 | MongoDB データベースのプライマリキーの名前。値を _id に設定します。 |
| Long | ソースデータベースで操作が実行された時刻。値は 13 桁の UNIX タイムスタンプです。単位:ミリ秒。 説明 検索エンジンを使用して、UNIX タイムスタンプコンバーターを取得できます。 |
| Long | 宛先データベースで操作の実行が開始された時刻。値は 13 桁の UNIX タイムスタンプです。単位:ミリ秒。 説明 検索エンジンを使用して、UNIX タイムスタンプコンバーターを取得できます。 |
| オブジェクト配列 | オブジェクト型の要素を 1 つだけ含む配列。要素のキーは doc で、要素の値は JSON 文字列です。 説明 値をデシリアライズして、データレコードを取得します。 |
old | オブジェクト配列 | 元のデータが格納されている配列。フィールドのフォーマットは、data フィールドと同じです。 重要 このフィールドは、 フィールドの値が UPDATE の場合にのみ使用可能であり、 |
| Int | 操作のシリアル番号。 |
DDL 操作と宛先関数で受信されるデータの例
コレクションを作成する
SQL 文
db.createCollection("testCollection")宛先関数で受信されるデータ
{
'Records': [{
'data': [{
'doc': '{"create": "testCollection", "idIndex": {"v": 2, "key": {"_id": 1}, "name": "_id_"}}' // コレクション testCollection を作成し、_id をプライマリキーとして設定します。
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056437000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': 'testCollection',
'ts': 1694056437510
}]
}コレクションを削除する
SQL 文
db.testCollection.drop()宛先関数で受信されるデータ
{
'Records': [{
'data': [{
'doc': '{"drop": "testCollection"}' // コレクション testCollection を削除します。
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056577000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': 'testCollection',
'ts': 1694056577789
}]
}インデックスを作成する
SQL 文
db.testCollection.createIndex({name:1})宛先関数で受信されるデータ
{
'Records': [{
'data': [{
'doc': '{"createIndexes": "testCollection", "v": 2, "key": {"name": 1}, "name": "name_1"}' // コレクション testCollection にインデックス name_1 を作成します。
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056670000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': 'testCollection',
'ts': 1694056670719
}]
}インデックスを削除する
SQL 文
db.testCollection.dropIndex({name:1})宛先関数で受信されるデータ
{
'Records': [{
'data': [{
'doc': '{"dropIndexes": "testCollection", "index": "name_1"}' // コレクション testCollection からインデックス name_1 を削除します。
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056817000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': '$cmd',
'ts': 1694056818035
}]
}DML 操作と宛先関数で受信されるデータの例
データを挿入する
SQL 文
// 一度に複数のデータレコードを挿入します。
db.runCommand({insert: "user", documents: [{"name":"jack","age":20},{"name":"lili","age":20}]})
// 一度に 1 つのデータレコードを挿入します。
db.user.insert({"name":"jack","age":20})
db.user.insert({"name":"lili","age":20})宛先関数で受信されるデータ
{
'Records': [{
'data': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "jack", "age": 20}' // name が jack、age が 20 のデータレコードを挿入します。
}],
'pkNames': ['_id'],
'type': 'INSERT',
'es': 1694054783000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694054784427
}, {
'data': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "lili", "age": 20}' // name が lili、age が 20 のデータレコードを挿入します。
}],
'pkNames': ['_id'],
'type': 'INSERT',
'es': 1694054783000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694054784428
}]
}データを更新する
SQL 文
db.user.update({"name":"jack"},{$set:{"age":30}}) 宛先関数で受信されるデータ
{
'Records': [{
'data': [{
'doc': '{"$set": {"age": 30}}' // name が jack のデータレコードの age を 30 に更新します。
}],
'pkNames': ['_id'],
'old': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}' // 更新前のデータレコード。
}],
'type': 'UPDATE',
'es': 1694054989000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694054990555
}]
}データを削除する
SQL 文
db.user.remove({"name":"jack"})宛先関数で受信されるデータ
{
'Records': [{
'data': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}' // name が jack のデータレコードを削除します。
}],
'pkNames': ['_id'],
'type': 'DELETE',
'es': 1694055452000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694055452852
}]
}