このトピックでは、Data Transmission Service (DTS) を使用して、ApsaraDB for MongoDB レプリカセットインスタンスから Function Compute 関数に増分データを同期する方法について説明します。 関数コードを記述して、関数に同期されたデータをさらに処理できます。
前提条件
ソースの ApsaraDB for MongoDB レプリカセットインスタンスが作成されていること。 詳細については、「レプリカセットインスタンスを作成する」をご参照ください。
デスティネーションサービスと関数が作成されており、関数の [ハンドラータイプ] パラメーターが [イベントハンドラー] に設定されていること。 関数の作成方法の詳細については、「関数をすばやく作成する」をご参照ください。
使用上の注意
カテゴリ | 説明 |
ソースデータベースの制限 |
|
その他の制限 |
|
特別なケース | ソースデータベースが自己管理 MongoDB データベースの場合、次の制限事項に注意してください。
説明 同期対象のオブジェクトとしてデータベース全体を選択した場合は、ハートビートテーブルを作成できます。 ハートビートテーブルは毎秒更新またはデータを受信します。 |
課金
同期タイプ | タスク構成料金 |
増分データ同期 | 課金されます。 詳細については、「課金の概要」をご参照ください。 |
データ同期をサポートする操作
同期タイプ | 説明 |
増分データ同期 | oplog を使用するDTS タスクは、タスクの実行開始後に作成されたデータベースから増分データを同期しません。 DTS は、次の操作によって生成された増分データを同期します。
変更ストリームを使用するDTS は、次の操作によって生成された増分データを同期します。
|
データベースアカウントに必要な権限
データベース | 必要な権限 | 参照 |
ソース MongoDB インスタンス | source、admin、および local データベースに対する読み取り権限。 |
手順
次のいずれかの方法を使用して [データ同期] ページに移動し、データ同期インスタンスが存在するリージョンを選択します。
DTS コンソール
DTS コンソール にログオンします。
左側のナビゲーションウィンドウで、データ同期 をクリックします。
ページの左上隅で、データ同期タスクが存在するリージョンを選択します。
DMS コンソール
説明実際の操作は、DMS コンソールのモードとレイアウトによって異なる場合があります。 詳細については、「シンプルモード」および「DMS コンソールのレイアウトとスタイルをカスタマイズする」をご参照ください。
DMS コンソール にログオンします。
上部のナビゲーションバーで、ポインターを [データ + AI] に移動し、 を選択します。
データ同期タスク の右側にあるドロップダウンリストから、データ同期インスタンスが存在するリージョンを選択します。
タスクの作成 をクリックして、タスク構成ページに移動します。
ソースデータベースとデスティネーションデータベースを構成します。 次の表にパラメーターを示します。
セクション
パラメーター
説明
該当なし
タスク名
DTS タスクの名前。 DTS はタスク名を自動的に生成します。 タスクを簡単に識別できる説明的な名前を指定することをお勧めします。 一意のタスク名を指定する必要はありません。
移行元データベース
[既存の接続を選択]
DTS に登録されているデータベースインスタンスを使用する場合は、ドロップダウンリストからインスタンスを選択します。 DTS は、インスタンスの次のデータベースパラメーターを自動的に入力します。 詳細については、「データベース接続を管理する」をご参照ください。
説明DMS コンソールでは、[DMS データベースインスタンスを選択] ドロップダウンリストからデータベースインスタンスを選択できます。
インスタンスを DTS に登録できなかった場合、または DTS に登録されているインスタンスを使用する必要がない場合は、次のデータベース情報を構成する必要があります。
データベースタイプ
ソースデータベースのタイプ。 MongoDB を選択します。
アクセス方法
ソースデータベースのアクセス方法。 Alibaba Cloud インスタンス を選択します。
インスタンスのリージョン
ソース MongoDB インスタンスが存在するリージョン。
Alibaba Cloud アカウント間でデータを複製
Alibaba Cloud アカウント間でデータを同期するかどうかを指定します。 この例では、× が選択されています。
アーキテクチャ
ソースデータベースがデプロイされているアーキテクチャ。 レプリカセット を選択します。
[移行方法]
ソースデータベースから増分データを同期するために使用される方法。 ビジネス要件に基づいて方法を選択します。 有効な値:
Oplog (推奨):
このオプションは、ソースデータベースで oplog 機能が有効になっている場合に利用できます。
説明デフォルトでは、自己管理 MongoDB データベースと ApsaraDB for MongoDB インスタンスの両方で oplog 機能が有効になっています。 この機能により、ログのプル速度が速いため、低レイテンシで増分データを同期できます。 したがって、[移行方法] パラメーターには Oplog を選択することをお勧めします。
ChangeStream:
このオプションは、ソースデータベースで変更ストリームが有効になっている場合に利用できます。 詳細については、「変更ストリーム」をご参照ください。
説明ソースデータベースが非エラスティック Amazon DocumentDB クラスターの場合、[移行方法] パラメーターは ChangeStream のみに設定できます。
シャードクラスター を アーキテクチャ パラメーターに選択した場合、Shardアカウント パラメーターと Shardパスワード パラメーターを構成する必要はありません。
インスタンス ID
ソース MongoDB インスタンスの ID。
認証データベース
ソース ApsaraDB for MongoDB インスタンスのデータベースアカウントとパスワードを格納する認証データベースの名前。 認証データベースの名前を事前に変更していない場合、デフォルト値は [admin] です。
データベースアカウント
ソース ApsaraDB for MongoDB インスタンスのデータベースアカウント。 アカウントに必要な権限については、このトピックの「データベースアカウントに必要な権限」セクションをご参照ください。
データベースのパスワード
データベースへのアクセスに使用されるパスワード。
暗号化
ソースデータベースへの接続を暗号化するかどうかを指定します。 ビジネス要件に基づいて、非暗号化、SSL 暗号化、または Mongo Atlas SSL を選択できます。 暗号化 パラメーターで使用できるオプションは、アクセス方法 パラメーターと アーキテクチャ パラメーターで選択された値によって決まります。 DTS コンソールに表示されるオプションが優先されます。
説明[アーキテクチャ] パラメーターが [シャーディングクラスター] に設定され、ApsaraDB for MongoDB データベースの [移行方法] パラメーターが Oplog に設定されている場合、[暗号化] パラメーター SSL 暗号化 は使用できません。
ソースデータベースが レプリカセット アーキテクチャを使用する自己管理 MongoDB データベースで、アクセス方法 パラメーターが Alibaba Cloud インスタンス に設定されておらず、[暗号化] パラメーターが SSL 暗号化 に設定されている場合、認証局 (CA) 証明書をアップロードして、ソースデータベースへの接続を検証できます。
移行先データベース
[既存の接続を選択]
DTS に登録されているデータベースインスタンスを使用する場合は、ドロップダウンリストからインスタンスを選択します。 DTS は、インスタンスの次のデータベースパラメーターを自動的に入力します。 詳細については、「データベース接続を管理する」をご参照ください。
説明DMS コンソールでは、[DMS データベースインスタンスを選択] ドロップダウンリストからデータベースインスタンスを選択できます。
インスタンスを DTS に登録できなかった場合、または DTS に登録されているインスタンスを使用する必要がない場合は、次のデータベース情報を構成する必要があります。
データベースタイプ
デスティネーションデータベースのタイプ。 Function Compute (FC) を選択します。
アクセス方法
デスティネーションデータベースのアクセス方法。 Alibaba Cloud インスタンス を選択します。
インスタンスのリージョン
デスティネーションデータベースが存在するリージョン。 デフォルトでは、ソースデータベースの インスタンスのリージョン パラメーターと同じ値で、変更できません。
サービス
デスティネーション関数が属するサービスの名前。
関数
同期されたデータを受信するデスティネーション関数。
サービスのバージョンとエイリアス
サービスのバージョンまたはエイリアス。 ビジネス要件に基づいてこのパラメーターを構成します。
デフォルトバージョン を選択した場合、サービスバージョン パラメーターの値は [LATEST] に固定されます。
バージョンの指定 を選択した場合、サービスバージョン パラメーターを構成する必要があります。
エイリアスの指定 を選択した場合、サービスのエイリアス パラメーターを構成する必要があります。
説明Function Compute の用語の詳細については、「用語」をご参照ください。
ページの下部にある 接続をテストして続行 をクリックします。
説明DTS サーバーの CIDR ブロックをソースデータベースとデスティネーションデータベースのセキュリティ設定に自動または手動で追加して、DTS サーバーからのアクセスを許可できるようにしてください。 詳細については、「DTS サーバーの CIDR ブロックを追加する」をご参照ください。
ソースデータベースまたはデスティネーションデータベースが自己管理データベースで、その アクセス方法 が Alibaba Cloud インスタンス に設定されていない場合は、DTS サーバーの CIDR ブロック ダイアログボックスの 接続テスト をクリックします。
同期するオブジェクトを構成します。
オブジェクト設定 ステップで、同期するオブジェクトを構成します。
パラメーター
説明
同期タイプ
デフォルトでは、増分データ同期 が選択されており、値を変更することはできません。
データ形式
デスティネーション関数に同期されるデータの格納形式。 [Canal Json] 形式のみがサポートされています。
説明Kafka クラスタのデータ形式Canal JSON 形式のパラメーターの詳細については、「Kafka クラスターのデータ形式」トピックの「Canal Json」セクションをご参照ください。
ソースオブジェクト
ソースオブジェクト セクションから 1 つ以上のオブジェクトを選択し、
アイコンをクリックして、選択中のオブジェクト セクションにオブジェクトを追加します。説明同期対象のオブジェクトとしてデータベースまたはコレクションを選択します。
選択中のオブジェクト
選択中のオブジェクト セクションで、同期するデータを確認します。
説明選択したオブジェクトを削除するには、選択中のオブジェクト セクションで削除するオブジェクトを選択し、
アイコンをクリックします。データベースまたはコレクション別に増分データを同期するには、選択中のオブジェクト を右クリックし、表示されるダイアログボックスで操作を選択します。
次へ:詳細設定 をクリックして、詳細設定を構成します。
パラメーター
説明
タスクのスケジュールに使用する専用クラスターの選択
デフォルトでは、専用クラスターを指定しない場合、DTS は共有クラスターにタスクをスケジュールします。 データ同期インスタンスの安定性を向上させるには、専用クラスターを購入します。 詳細については、「DTS 専用クラスターとは」をご参照ください。
失敗した接続の再試行時間
失敗した接続の再試行時間の範囲。 データ同期タスクの開始後にソースデータベースまたはデスティネーションデータベースに接続できない場合、DTS は指定された時間範囲内で直ちに接続を再試行します。 有効な値: 10 ~ 1440。 単位: 分。 デフォルト値: 720。 このパラメーターは 30 より大きい値に設定することをお勧めします。 DTS が指定された時間範囲内にソースデータベースとデスティネーションデータベースに再接続すると、DTS はデータ同期タスクを再開します。 そうしないと、データ同期タスクは失敗します。
説明同じソースデータベースまたはデスティネーションデータベースを持つ複数のデータ同期タスクに異なる再試行時間の範囲を指定した場合、最も短い再試行時間の範囲が優先されます。
DTS が接続を再試行すると、DTS インスタンスの料金が発生します。 ビジネス要件に基づいて再試行時間の範囲を指定することをお勧めします。 また、ソースインスタンスとデスティネーションインスタンスが解放された後、できるだけ早く DTS インスタンスを解放することもできます。
移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。
その他の問題の再試行時間の範囲。 たとえば、データ同期タスクの開始後に DDL または DML 操作の実行に失敗した場合、DTS は指定された時間範囲内で直ちに操作を再試行します。 有効な値: 1 ~ 1440。 単位: 分。 デフォルト値: 10。 このパラメーターは 10 より大きい値に設定することをお勧めします。 失敗した操作が指定された時間範囲内で正常に実行されると、DTS はデータ同期タスクを再開します。 そうしないと、データ同期タスクは失敗します。
重要移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。 パラメーターの値は、失敗した接続の再試行時間 パラメーターの値よりも小さくなければなりません。
[更新後にドキュメント全体を取得する。]
増分データ同期中に、更新操作後にドキュメントの完全なデータをデスティネーションデータベースに同期するかどうかを指定します。
説明このパラメーターは、移行方法 を ChangeStream に設定した場合にのみ使用できます。
○: 更新操作後にドキュメントの完全なデータを同期します。
×: 更新されたデータのみを同期します。
増分同期率を制限するかどうか
増分データ同期の調整を有効にするかどうかを指定します。 ビジネス要件に基づいて、増分データ同期の調整を有効にできます。 調整を構成するには、1 秒あたりの増分同期の行数 RPS パラメーターと 1 秒あたりの増分同期データ量 (MB) BPS パラメーターを構成する必要があります。 これにより、デスティネーションデータベースサーバーの負荷が軽減されます。
環境タグ
DTS インスタンスを識別するために使用される環境タグ。 ビジネス要件に基づいて環境タグを選択できます。 この例では、このパラメーターを構成する必要はありません。
ETL の設定
抽出・変換・書き出し (ETL) 機能を有効にするかどうかを指定します。 詳細については、「ETL とは」をご参照ください。 有効な値:
[はい]: ETL 機能を構成します。 コードエディタにデータ処理文を入力できます。 詳細については、「データ移行タスクまたはデータ同期タスクで ETL を構成する」をご参照ください。
[いいえ]: ETL 機能を構成しません。
[監視とアラート]
データ同期インスタンスのアラートを構成するかどうかを指定します。 タスクが失敗した場合、または同期レイテンシが指定されたしきい値を超えた場合、アラート連絡先に通知が送信されます。 有効な値:
[いいえ]: アラートを有効にしません。
[はい]: アラートを構成します。 この場合、アラートのしきい値と アラート通知設定 も構成する必要があります。 詳細については、「監視とアラートを構成する」トピックの「DTS タスクを作成するときに監視とアラートを構成する」セクションをご参照ください。
タスク設定を保存し、事前チェックを実行します。
関連する API 操作を呼び出して DTS タスクを構成するときに指定するパラメーターを表示するには、次:タスク設定の保存と事前チェック にポインターを移動し、OpenAPI パラメーターのプレビュー をクリックします。
パラメーターを表示する必要がない場合、またはすでに表示している場合は、ページの下部にある 次:タスク設定の保存と事前チェック をクリックします。
説明データ同期タスクを開始する前に、DTS は事前チェックを実行します。 データ同期タスクは、タスクが事前チェックに合格した後にのみ開始できます。
データ同期タスクが事前チェックに失敗した場合は、失敗した各項目の横にある [詳細の表示] をクリックします。 チェック結果に基づいて原因を分析した後、問題をトラブルシューティングします。 その後、事前チェックを再実行します。
事前チェック中に項目のアラートがトリガーされた場合:
アラート項目を無視できない場合は、失敗した項目の横にある [詳細の表示] をクリックして、問題をトラブルシューティングします。 その後、事前チェックを再度実行します。
アラート項目を無視できる場合は、[アラート詳細の確認] をクリックします。 [詳細の表示] ダイアログボックスで、[無視] をクリックします。 表示されるメッセージで、[OK] をクリックします。 その後、[再事前チェック] をクリックして、事前チェックを再度実行します。 アラート項目を無視すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。
インスタンスを購入します。
[成功率] が [100%] になるまで待ちます。 その後、[次へ: インスタンスの購入] をクリックします。
[購入] ページで、データ同期タスクの [課金方法] パラメーターと [インスタンスクラス] パラメーターを構成します。 次の表にパラメーターを示します。
セクション
パラメーター
説明
新しいインスタンスクラス
課金方法
サブスクリプション: データ同期インスタンスを作成するときにサブスクリプションの料金を支払います。 サブスクリプション課金方法は、長期使用の場合、従量課金方法よりも費用対効果が高くなります。
従量課金: 従量課金インスタンスは 1 時間単位で課金されます。 従量課金方法は、短期使用に適しています。 従量課金データ同期インスタンスが不要になった場合は、インスタンスを解放してコストを削減できます。
リソースグループ設定
データ同期インスタンスが属するリソースグループ。 デフォルト値: [デフォルトリソースグループ]。 詳細については、「リソース管理とは」をご参照ください。
インスタンスクラス
DTS は、同期速度が異なるインスタンスクラスを提供します。 ビジネス要件に基づいてインスタンスクラスを選択できます。 詳細については、「データ同期インスタンスのインスタンスクラス」をご参照ください。
サブスクリプション期間
サブスクリプション課金方法を選択した場合は、サブスクリプション期間と作成するデータ同期インスタンスの数を指定します。 サブスクリプション期間は、1 ~ 9 か月、1 年、2 年、3 年、または 5 年にすることができます。
説明このパラメーターは、サブスクリプション 課金方法を選択した場合にのみ使用できます。
[データ伝送サービス (従量課金) サービス条件] を読んで選択します。
[購入して開始] をクリックします。 表示されるダイアログボックスで、OK をクリックします。
タスクリストでタスクの進捗状況を確認できます。
次のステップ
同期される単一のデータ入力のサイズが 16 MB を超えると、DTS タスクでエラーが報告されます。同期するオブジェクトを変更するか、ETL 機能を使用してサイズの大きいデータ入力を除外できます。詳細については、「既存のデータ同期タスクの ETL 構成を変更する」セクション(データ移行または同期タスクでの ETL の構成に関する Topic)と「同期するオブジェクトを変更する」をご参照ください。
ビジネス要件に基づいて関数コードを記述します。 詳細については、「概要」をご参照ください。
デスティネーション関数が受信するデータの形式
デスティネーション関数が受信するデータは、オブジェクトタイプです。 ソースデータベースの増分データは、配列形式の Records フィールドに格納されます。 配列内の各要素は、オブジェクトタイプのデータレコードを示します。 次の表に、オブジェクトタイプのデータレコードのフィールドを示します。
デスティネーション関数は、次の 2 種類の SQL 操作を記録するデータを受信します。
DDL: CreateIndex、CreateCollection、DropIndex、DropCollection などのデータスキーマ変更の操作。
DML: INSERT、UPDATE、DELETE などのデータ管理の操作。
パラメーター | カテゴリ | 説明 |
| ブール値 | 操作が DDL 操作かどうかを示します。 有効な値:
|
| 文字列 | SQL 操作のタイプ。
|
| 文字列 | MongoDB データベースの名前。 |
| 文字列 | MongoDB データベースのコレクションの名前。 |
| 文字列 | MongoDB データベースのプライマリキーの名前。 値を _id に設定します。 |
| Long | ソースデータベースで操作が実行された時刻。 値は 13 桁の UNIX タイムスタンプです。 単位: ミリ秒。 説明 検索エンジンを使用して、UNIX タイムスタンプコンバーターを取得できます。 |
| Long | デスティネーションデータベースで操作の実行が開始された時刻。 値は 13 桁の UNIX タイムスタンプです。 単位: ミリ秒。 説明 検索エンジンを使用して、UNIX タイムスタンプコンバーターを取得できます。 |
| オブジェクト配列 | オブジェクトタイプの要素が 1 つだけ含まれている配列。 要素のキーは doc で、要素の値は JSON 文字列です。 説明 値を逆シリアル化して、データレコードを取得します。 |
old | オブジェクト配列 | 元のデータが格納されている配列。 フィールドの形式は、data フィールドの形式と同じです。 重要 このフィールドは、 フィールドの値が UPDATE の場合にのみ使用でき、 |
| Int | 操作のシリアル番号。 |
DDL 操作とデスティネーション関数が受信するデータの例
コレクションを作成する
SQL 文
db.createCollection("testCollection")デスティネーション関数が受信するデータ
{
'Records': [{
'data': [{
'doc': '{"create": "testCollection", "idIndex": {"v": 2, "key": {"_id": 1}, "name": "_id_"}}' // コレクション testCollection を作成します。
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056437000, // ソース MongoDB データベースで操作が実行された時刻。
'database': 'MongoDBTest', // 操作が実行されたデータベースの名前。
'id': 0, // 操作のシリアル番号。
'isDdl': True, // 操作が DDL 操作かどうかを示します。
'table': 'testCollection', // 操作が実行されたコレクションの名前。
'ts': 1694056437510 // デスティネーション Function Compute 関数で操作の実行が開始された時刻。
}]
}コレクションを削除する
SQL 文
db.testCollection.drop()デスティネーション関数が受信するデータ
{
'Records': [{
'data': [{
'doc': '{"drop": "testCollection"}' // コレクション testCollection を削除します。
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056577000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': 'testCollection',
'ts': 1694056577789
}]
}インデックスを作成する
SQL 文
db.testCollection.createIndex({name:1})デスティネーション関数が受信するデータ
{
'Records': [{
'data': [{
'doc': '{"createIndexes": "testCollection", "v": 2, "key": {"name": 1}, "name": "name_1"}' // コレクション testCollection にインデックス name_1 を作成します。
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056670000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': 'testCollection',
'ts': 1694056670719
}]
}インデックスを削除する
SQL 文
db.testCollection.dropIndex({name:1})デスティネーション関数が受信するデータ
{
'Records': [{
'data': [{
'doc': '{"dropIndexes": "testCollection", "index": "name_1"}' // コレクション testCollection からインデックス name_1 を削除します。
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056817000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': '$cmd',
'ts': 1694056818035
}]
}DML 操作とデスティネーション関数が受信するデータの例
データを挿入する
SQL 文
// 一度に複数のデータレコードを挿入します。
db.runCommand({insert: "user", documents: [{"name":"jack","age":20},{"name":"lili","age":20}]})
// 一度に 1 つのデータレコードを挿入します。
db.user.insert({"name":"jack","age":20})
db.user.insert({"name":"lili","age":20})デスティネーション関数が受信するデータ
{
'Records': [{
'data': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "jack", "age": 20}' // コレクション user にデータレコード {"name":"jack","age":20} を挿入します。
}],
'pkNames': ['_id'],
'type': 'INSERT',
'es': 1694054783000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False, // 操作が DML 操作かどうかを示します。
'table': 'user', // 操作が実行されたコレクションの名前。
'ts': 1694054784427
}, {
'data': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "lili", "age": 20}' // コレクション user にデータレコード {"name":"lili","age":20} を挿入します。
}],
'pkNames': ['_id'],
'type': 'INSERT',
'es': 1694054783000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694054784428
}]
}データを更新する
SQL 文
db.user.update({"name":"jack"},{$set:{"age":30}}) デスティネーション関数が受信するデータ
{
'Records': [{
'data': [{
'doc': '{"$set": {"age": 30}}' // name が jack であるデータレコードの age を 30 に更新します。
}],
'pkNames': ['_id'],
'old': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}' // 更新されるデータレコード。
}],
'type': 'UPDATE',
'es': 1694054989000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694054990555
}]
}データを削除する
SQL 文
db.user.remove({"name":"jack"})デスティネーション関数が受信するデータ
{
'Records': [{
'data': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}' // name が jack であるデータレコードを削除します。
}],
'pkNames': ['_id'],
'type': 'DELETE',
'es': 1694055452000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694055452852
}]
}