Alibaba Cloud が開発したオープンソースツールである MongoShake を使用して、MongoDB データベース間でデータを同期できます。この機能は、データ分析、ディザスタリカバリ、およびアクティブ/アクティブシナリオに適しています。このトピックでは、ApsaraDB for MongoDB インスタンス間のリアルタイムデータ同期を例として、設定手順について説明します。
MongoShake の概要
MongoShake は、Alibaba Cloud が Go 言語で開発した汎用 Platform as a Service (PaaS) ツールです。MongoDB データベースから操作ログ (oplog) を読み取り、さまざまな目的でデータを複製します。
MongoShake は、ログデータをサブスクライブして消費する機能も提供します。SDK、Kafka、MetaQ などのさまざまなメソッドを使用して接続できます。これにより、ログのサブスクリプション、データセンターの同期、非同期キャッシュのエビクションなどのシナリオに適しています。
MongoShake の詳細については、「GitHub の MongoShake ホームページ」をご参照ください。
サポートされているデータソース
ソースデータベース | ターゲットデータベース |
ECS インスタンス上の自己管理 MongoDB データベース | ECS インスタンス上の自己管理 MongoDB データベース |
オンプレミスの自己管理 MongoDB データベース | オンプレミスの自己管理 MongoDB データベース |
ApsaraDB for MongoDB インスタンス | ApsaraDB for MongoDB インスタンス |
サードパーティクラウドの MongoDB データベース | サードパーティクラウドの MongoDB データベース |
使用上の注意
完全なデータ同期が完了する前に、ソースデータベースでデータ定義言語 (DDL) 操作を実行しないでください。実行すると、データの不整合が発生する可能性があります。
ローカルデータベースは同期できません。admin データベースは同期できます。詳細については、「admin データベースから非 admin データベースへのビジネスデータの移行」をご参照ください。
データベースユーザーに必要な権限
データソース | 必要な権限 |
ソース MongoDB インスタンス | readAnyDatabase 権限、ローカルデータベースに対する読み取り権限、および mongoshake データベースに対する readWrite 権限。 説明 mongoshake データベースは、増分同期が開始されると、MongoShake プログラムによってソースインスタンスに自動的に作成されます。 |
宛先 MongoDB インスタンス | readWriteAnyDatabase 権限、またはターゲットデータベースに対する readWrite 権限。 |
MongoDB データベースユーザーの作成と権限付与の詳細については、「DMS を使用した MongoDB データベースユーザーの管理」または「db.createUser コマンド」をご参照ください。
準備
最適な同期パフォーマンスを確保するために、ソース ApsaraDB for MongoDB レプリカセットインスタンスが Virtual Private Cloud (VPC) を使用していることを確認してください。インスタンスがクラシックネットワークを使用している場合は、ネットワークタイプを VPC に切り替えます。詳細については、「インスタンスのネットワークタイプをクラシックネットワークから VPC に切り替える」をご参照ください。
同期先として ApsaraDB for MongoDB レプリカセットインスタンスを作成します。インスタンスを作成するときは、ソース ApsaraDB for MongoDB レプリカセットインスタンスと同じ VPC を選択して、ネットワーク遅延を最小限に抑えます。詳細については、「レプリカセットインスタンスの作成」をご参照ください。
MongoShake を実行するための ECS インスタンスを作成します。インスタンスを作成するときは、ソース ApsaraDB for MongoDB インスタンスと同じ VPC を選択して、ネットワーク遅延を最小限に抑えます。詳細については、「ECS インスタンスの作成」をご参照ください。
ECS インスタンスのプライベート IP アドレスをソースおよび宛先 MongoDB インスタンスのホワイトリストに追加します。ECS インスタンスがソースおよび宛先 MongoDB インスタンスに接続できることを確認してください。詳細については、「ホワイトリストの変更」をご参照ください。
ネットワークタイプが前述の要件を満たしていない場合は、ソースおよび宛先 MongoDB インスタンスのパブリックエンドポイントを申請できます。次に、ECS インスタンスのパブリック IP アドレスをソースインスタンスと宛先インスタンスのホワイトリストに追加します。これにより、インターネット経由でデータを同期できます。詳細については、「パブリックエンドポイントの申請」および「ホワイトリストの変更」をご参照ください。
手順
この例では、MongoShake はデフォルトで /test/mongoshake ディレクトリにインストールされます。
ECS サーバーにログインします。
説明ビジネスシナリオに基づいてログイン方法を選択します。詳細については、「ECS サーバーのログイン方法の概要」をご参照ください。
次のコマンドを実行して MongoShake プログラムをダウンロードし、
mongoshake.tar.gzに名前を変更します。wget "http://docs-aliyun.cn-hangzhou.oss.aliyun-inc.com/assets/attach/196977/jp_ja/1608863913991/mongo-shake-v2.4.16.tar.gz" -O mongoshake.tar.gz説明このトピックでは、MongoShake 2.4.16 をダウンロードするためのリンクを提供します。MongoShake の最新バージョンをダウンロードするには、「リリースページ」をご参照ください。
次のコマンドを実行して、MongoShake パッケージを /test/mongoshake ディレクトリに解凍します。
tar zxvf mongoshake.tar.gz && mv mongo-shake-v2.4.16 /test/mongoshake && cd /test/mongoshake/mongo-shake-v2.4.16vi collector.confコマンドを実行して、MongoShake の collector.conf 設定ファイルを変更します。次の表に、主要なパラメーターを示します。パラメーター
説明
例
mongo_urls
ソース MongoDB インスタンスの接続文字列 URI。この例では、データベースアカウントは test で、admin データベースに属しています。
説明VPC エンドポイントを使用して相互接続し、ネットワーク遅延を最小限に抑えます。
接続文字列 URI 形式の詳細については、「レプリカセットインスタンスの接続の説明」をご参照ください。
mongo_urls = mongodb://test:****@dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717,dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717説明パスワードにアットマーク (@) を含めることはできません。含めると、接続は失敗します。
tunnel.address
宛先 MongoDB インスタンスの接続文字列 URI。この例では、データベースアカウントは test で、admin データベースに属しています。
説明VPC エンドポイントを使用して相互接続し、ネットワーク遅延を最小限に抑えます。
接続文字列 URI 形式の詳細については、「レプリカセットインスタンスの接続の説明」をご参照ください。
tunnel.address = mongodb://test:****@dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717,dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717説明パスワードにアットマーク (@) を含めることはできません。含めると、接続は失敗します。
sync_mode
データ同期メソッド。有効な値:
all: 完全データ同期と増分データ同期の両方を実行します。
full: 完全データ同期のみを実行します。
incr: 増分データ同期のみを実行します。
説明デフォルト値は incr です。
sync_mode = all説明collector.conf ファイルのパラメーターの完全なリストについては、「付録」をご参照ください。
次のコマンドを実行して同期タスクを開始し、ログ情報を出力します。
./collector.linux -conf=collector.conf -verboseログ情報を確認します。次のログエントリが表示されると、完全なデータ同期が完了し、増分データ同期が開始されます。
[09:38:57 CST 2019/06/20] [INFO] (mongoshake/collector.(*ReplicationCoordinator).Run:80) finish full sync, start incr sync with timestamp: fullBeginTs[1560994443], fullFinishTs[1560994737]
MongoShake のステータスを監視する
増分データ同期が開始されたら、別のコマンドラインウィンドウを開き、次のコマンドを実行して MongoShake を監視します。
cd /test/mongoshake && ./mongoshake-stat --port=9100mongoshake-stat は Python スクリプトです。スクリプトを実行する前に、Python 2.7 をインストールしてください。詳細については、「Python 公式ウェブサイト」をご参照ください。
監視出力のサンプル:
パラメーターの説明:
パラメーター | 説明 |
logs_get/sec | 1 秒あたりに取得される oplog の数。 |
logs_repl/sec | 1 秒あたりの oplog 再生操作の数。 |
logs_success/sec | 1 秒あたりの成功した oplog 再生操作の数。 |
lsn.time | 最後の oplog が送信された時刻。 |
lsn_ack.time | 宛先が書き込み操作を確認した時刻。 |
lsn_ckpt.time | チェックポイントが永続化された時刻。 |
now.time | 現在の時刻。 |
replset | ソースデータベースレプリカセットの名前。 |
admin データベースから非 admin データベースへのビジネスデータの移行
MongoDB では、admin データベースにビジネスデータを保存することは推奨されていません。これは、ロック動作や内部コマンドとの競合により、インスタンスのパフォーマンスが低下する可能性があるためです。
MongoShake は、admin データベースから非 admin データベースへのビジネスデータの同期をサポートしています。
手順セクションの手順に従います。ステップ 4 で collector.conf ファイルを変更するときに、次の設定項目を追加します:
filter.pass.special.db = admin
# admin データベースから newDB にすべてのビジネスコレクションを移行します。
transform.namespace = admin:newDB
# または、admin データベースの abc コレクションをターゲットデータベースの def コレクションに移行します。複数のルールを設定できます。
transform.namespace = admin.abc:target.def付録
表 1. collector.conf パラメーター
カテゴリ | パラメーター | 説明 | 例 |
なし | conf.version | 現在の設定ファイルのバージョン番号。この値は変更しないでください。 |
|
グローバル設定オプション | id | 同期タスクの ID。この値はカスタマイズできます。ログ名、再開可能な転送のためのチェックポイント情報を格納するデータベースの名前、および宛先データベースの名前に使用されます。 |
|
master_quorum | 高可用性オプション。プライマリおよびスタンバイの MongoShake ノードが同じソースからデータを同期する場合、プライマリ MongoShake ノードに対してこのパラメーターを 有効な値:
説明 デフォルト値は false です。 |
| |
full_sync.http_port | HTTP ポート。このポートを開くと、インターネットから完全同期の現在のステータスを表示できます。 説明 デフォルト値は 9101 です。 |
| |
incr_sync.http_port | HTTP ポート。このポートを開くと、インターネットから増分同期の現在のステータスを表示できます。 説明 デフォルト値は 9100 です。 |
| |
system_profile_port | プロファイリングポート。内部スタック情報を表示するために使用されます。 |
| |
log.level | ログレベル。有効な値:
デフォルト値: info。 |
| |
log.dir | ログファイルと PID ファイルのディレクトリ。このパラメーターが設定されていない場合、デフォルトで現在のパスの logs ディレクトリが使用されます。 説明 このパラメーターのパスは絶対パスである必要があります。 |
| |
log.file | ログファイルの名前。この値はカスタマイズできます。 説明 デフォルト値は collector.log です。 |
| |
log.flush | 画面上のログの更新頻度。有効な値:
説明 デフォルト値は false です。 |
| |
sync_mode | データ同期メソッド。有効な値:
説明 デフォルト値は incr です。 |
| |
mongo_urls | ソース MongoDB インスタンスの接続文字列 URI。この例では、データベースアカウントは test で、admin データベースに属しています。 説明
|
| |
mongo_cs_url | ソースがシャードクラスターインスタンスの場合、Configserver (CS) ノードのエンドポイントを入力する必要があります。Configserver ノードのエンドポイントを申請するには、「シャードのエンドポイントの申請」をご参照ください。 この例では、データベースアカウントは test で、admin データベースに属しています。 |
| |
mongo_s_url | ソースがシャードクラスターインスタンスの場合、少なくとも 1 つの Mongos ノードのエンドポイントを入力する必要があります。複数の Mongos アドレスはカンマ (,) で区切ります。Mongos ノードのエンドポイントを申請するには、「シャードのエンドポイントの申請」をご参照ください。 この例では、データベースアカウントは test で、admin データベースに属しています。 |
| |
tunnel | 同期用のチャネルのタイプ。有効な値:
説明 デフォルト値は direct です。 |
| |
tunnel.address | 宛先のエンドポイント。次のアドレスがサポートされています:
この例では、データベースアカウントは test で、admin データベースに属しています。 |
| |
tunnel.message | チャネル内のデータのタイプ。このパラメーターは、tunnel パラメーターが
説明 デフォルト値は raw です。 |
| |
mongo_connect_mode | MongoDB インスタンスの接続モード。このパラメーターは、tunnel パラメーターが
説明 デフォルト値は secondaryPreferred です。 |
| |
filter.namespace.black | データ同期のブラックリストを指定します。指定された名前空間は、ターゲットデータベースに同期されません。複数の名前空間はセミコロン (;) で区切ります。 説明 名前空間は、MongoDB のコレクションまたはインデックスの正規名です。これは、データベース名とコレクション名またはインデックス名の組み合わせです。例: |
| |
filter.namespace.white | データ同期のホワイトリストを指定します。指定された名前空間のみがターゲットデータベースに同期されます。複数の名前空間はセミコロン (;) で区切ります。 |
| |
filter.pass.special.db | 特別なデータベースの同期を有効にします。通常の同期中、admin、local、mongoshake、config、system.views などのデータベースはシステムによって除外されます。特別な要件のためにこれらのデータベースの同期を有効にすることができます。複数のデータベース名はセミコロン (;) で区切ります。 |
| |
filter.ddl_enable | DDL 同期を有効にするかどうかを指定します。有効な値:
説明 この機能は、ソースが MongoDB シャードクラスターインスタンスの場合はサポートされていません。 |
| |
checkpoint.storage.url | 再開可能な転送をサポートするために、チェックポイントストレージの場所を設定します。これが設定されていない場合、プログラムはインスタンスタイプに基づいて次のデータベースに書き込みます:
この例では、データベースアカウントは test で、admin データベースに属しています。 |
| |
checkpoint.storage.db | チェックポイントを格納するデータベースの名前。 説明 デフォルト値は mongoshake です。 |
| |
checkpoint.storage.collection | チェックポイントを格納するコレクションの名前。プライマリおよびスタンバイの MongoShake ノードが同じソースからデータを同期するように有効にする場合、重複名による競合を防ぐためにこのテーブル名を変更できます。 説明 デフォルト値は ckpt_default です。 |
| |
checkpoint.start_position | 再開可能な転送の開始位置。チェックポイントが既に存在する場合、このパラメーターは無効です。値の形式は 説明 デフォルト値は 1970-01-01T00:00:00Z です。 |
| |
transform.namespace | ソースデータベースまたはコレクションの名前を変更し、それをターゲットデータベースに同期します。たとえば、ソースデータベースの |
| |
完全データ同期オプション | full_sync.reader.collection_parallel | MongoShake が一度に同時にプルできるコレクションの最大数を設定します。 |
|
full_sync.reader.write_document_parallel | MongoShake が単一のコレクションに書き込むための同時スレッド数を設定します。 |
| |
full_sync.reader.document_batch_size | 宛先へのドキュメントの単一書き込みのバッチサイズを設定します。たとえば、128 は、書き込まれる前に 128 個のドキュメントが集約されることを意味します。 |
| |
full_sync.collection_exist_drop | ソースコレクションと同じ名前を持つ宛先データベース内のコレクションの処理方法を設定します。有効な値:
|
| |
full_sync.create_index | 同期完了後にインデックスを作成するかどうかを指定します。有効な値:
|
| |
full_sync.executor.insert_on_dup_update | ターゲットデータベースに重複する
|
| |
full_sync.executor.filter.orphan_document | ソースがシャードクラスターインスタンスの場合に孤立ドキュメントをフィルターするかどうかを指定します。有効な値:
|
| |
full_sync.executor.majority_enable | 宛先でマジョリティ書き込み機能を有効にするかどうかを指定します。有効な値:
|
| |
増分データ同期オプション | incr_sync.mongo_fetch_method | 増分データをプルするためのメソッドを設定します。有効な値:
デフォルト値: oplog |
|
incr_sync.oplog.gids | クラウドクラスターの双方向レプリケーションを設定するために使用されます。 |
| |
incr_sync.shard_key | MongoShake が内部で同時実行性を処理する方法。このパラメーターは変更しないでください。 |
| |
incr_sync.worker | oplog を転送するための同時スレッド数。ホストのパフォーマンスが十分な場合は、スレッド数を増やすことができます。 説明 ソースがシャードクラスターインスタンスの場合、スレッド数はシャードの数と等しくなければなりません。 |
| |
incr_sync.worker.oplog_compressor | データ圧縮を有効にして、ネットワーク帯域幅の消費を削減します。有効な値:
説明 このパラメーターは、tunnel パラメーターが |
| |
incr_sync.target_delay | ソースと宛先の間に遅延同期を設定します。ソースの変更は通常、リアルタイムで宛先に同期されます。偶発的な操作を防ぐために、このパラメーターを設定して同期を遅延させることができます。たとえば、 説明 値 0 は、遅延同期が無効であることを示します。 |
| |
incr_sync.worker.batch_queue_size | MongoShake の内部キューの設定パラメーター。必要がない限り変更しないでください。 |
| |
incr_sync.adaptive.batching_max_size |
| ||
incr_sync.fetcher.buffer_capacity |
| ||
MongoDB 同期オプション ( | incr_sync.executor.upsert |
|
|
incr_sync.executor.insert_on_dup_update |
|
| |
incr_sync.conflict_write_to | 同期中に書き込み競合が発生した場合に、競合するドキュメントを記録するかどうかを指定します。有効な値:
|
| |
incr_sync.executor.majority_enable | 宛先でマジョリティ書き込みを有効にするかどうかを指定します。有効な値:
説明 この機能を有効にすると、パフォーマンスに影響します。 |
|
よくある質問
まず「MongoShake よくある質問」をご参照ください。MongoShake の使用中に他の問題が発生した場合は、「GitHub Issues」で直接フィードバックを提供してください。