すべてのプロダクト
Search
ドキュメントセンター

ApsaraDB for MongoDB:MongoShake を使用して MongoDB インスタンス間で一方向同期を実行する

最終更新日:Nov 09, 2025

Alibaba Cloud が開発したオープンソースツールである MongoShake を使用して、MongoDB データベース間でデータを同期できます。この機能は、データ分析、ディザスタリカバリ、およびアクティブ/アクティブシナリオに適しています。このトピックでは、ApsaraDB for MongoDB インスタンス間のリアルタイムデータ同期を例として、設定手順について説明します。

MongoShake の概要

MongoShake は、Alibaba Cloud が Go 言語で開発した汎用 Platform as a Service (PaaS) ツールです。MongoDB データベースから操作ログ (oplog) を読み取り、さまざまな目的でデータを複製します。

MongoShake は、ログデータをサブスクライブして消費する機能も提供します。SDK、Kafka、MetaQ などのさまざまなメソッドを使用して接続できます。これにより、ログのサブスクリプション、データセンターの同期、非同期キャッシュのエビクションなどのシナリオに適しています。

説明

MongoShake の詳細については、「GitHub の MongoShake ホームページ」をご参照ください。

サポートされているデータソース

ソースデータベース

ターゲットデータベース

ECS インスタンス上の自己管理 MongoDB データベース

ECS インスタンス上の自己管理 MongoDB データベース

オンプレミスの自己管理 MongoDB データベース

オンプレミスの自己管理 MongoDB データベース

ApsaraDB for MongoDB インスタンス

ApsaraDB for MongoDB インスタンス

サードパーティクラウドの MongoDB データベース

サードパーティクラウドの MongoDB データベース

使用上の注意

  • 完全なデータ同期が完了する前に、ソースデータベースでデータ定義言語 (DDL) 操作を実行しないでください。実行すると、データの不整合が発生する可能性があります。

  • ローカルデータベースは同期できません。admin データベースは同期できます。詳細については、「admin データベースから非 admin データベースへのビジネスデータの移行」をご参照ください。

データベースユーザーに必要な権限

データソース

必要な権限

ソース MongoDB インスタンス

readAnyDatabase 権限、ローカルデータベースに対する読み取り権限、および mongoshake データベースに対する readWrite 権限。

説明

mongoshake データベースは、増分同期が開始されると、MongoShake プログラムによってソースインスタンスに自動的に作成されます。

宛先 MongoDB インスタンス

readWriteAnyDatabase 権限、またはターゲットデータベースに対する readWrite 権限。

説明

MongoDB データベースユーザーの作成と権限付与の詳細については、「DMS を使用した MongoDB データベースユーザーの管理」または「db.createUser コマンド」をご参照ください。

準備

  1. 最適な同期パフォーマンスを確保するために、ソース ApsaraDB for MongoDB レプリカセットインスタンスが Virtual Private Cloud (VPC) を使用していることを確認してください。インスタンスがクラシックネットワークを使用している場合は、ネットワークタイプを VPC に切り替えます。詳細については、「インスタンスのネットワークタイプをクラシックネットワークから VPC に切り替える」をご参照ください。

  2. 同期先として ApsaraDB for MongoDB レプリカセットインスタンスを作成します。インスタンスを作成するときは、ソース ApsaraDB for MongoDB レプリカセットインスタンスと同じ VPC を選択して、ネットワーク遅延を最小限に抑えます。詳細については、「レプリカセットインスタンスの作成」をご参照ください。

  3. MongoShake を実行するための ECS インスタンスを作成します。インスタンスを作成するときは、ソース ApsaraDB for MongoDB インスタンスと同じ VPC を選択して、ネットワーク遅延を最小限に抑えます。詳細については、「ECS インスタンスの作成」をご参照ください。

  4. ECS インスタンスのプライベート IP アドレスをソースおよび宛先 MongoDB インスタンスのホワイトリストに追加します。ECS インスタンスがソースおよび宛先 MongoDB インスタンスに接続できることを確認してください。詳細については、「ホワイトリストの変更」をご参照ください。

説明

ネットワークタイプが前述の要件を満たしていない場合は、ソースおよび宛先 MongoDB インスタンスのパブリックエンドポイントを申請できます。次に、ECS インスタンスのパブリック IP アドレスをソースインスタンスと宛先インスタンスのホワイトリストに追加します。これにより、インターネット経由でデータを同期できます。詳細については、「パブリックエンドポイントの申請」および「ホワイトリストの変更」をご参照ください。

手順

この例では、MongoShake はデフォルトで /test/mongoshake ディレクトリにインストールされます。

  1. ECS サーバーにログインします。

    説明

    ビジネスシナリオに基づいてログイン方法を選択します。詳細については、「ECS サーバーのログイン方法の概要」をご参照ください。

  2. 次のコマンドを実行して MongoShake プログラムをダウンロードし、mongoshake.tar.gz に名前を変更します。

    wget "http://docs-aliyun.cn-hangzhou.oss.aliyun-inc.com/assets/attach/196977/jp_ja/1608863913991/mongo-shake-v2.4.16.tar.gz" -O mongoshake.tar.gz
    説明

    このトピックでは、MongoShake 2.4.16 をダウンロードするためのリンクを提供します。MongoShake の最新バージョンをダウンロードするには、「リリースページ」をご参照ください。

  3. 次のコマンドを実行して、MongoShake パッケージを /test/mongoshake ディレクトリに解凍します。

    tar zxvf mongoshake.tar.gz && mv mongo-shake-v2.4.16 /test/mongoshake && cd /test/mongoshake/mongo-shake-v2.4.16
  4. vi collector.conf コマンドを実行して、MongoShake の collector.conf 設定ファイルを変更します。次の表に、主要なパラメーターを示します。

    パラメーター

    説明

    mongo_urls

    ソース MongoDB インスタンスの接続文字列 URI。この例では、データベースアカウントは test で、admin データベースに属しています。

    説明

    mongo_urls = mongodb://test:****@dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717,dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717

    説明

    パスワードにアットマーク (@) を含めることはできません。含めると、接続は失敗します。

    tunnel.address

    宛先 MongoDB インスタンスの接続文字列 URI。この例では、データベースアカウントは test で、admin データベースに属しています。

    説明

    tunnel.address = mongodb://test:****@dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717,dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717

    説明

    パスワードにアットマーク (@) を含めることはできません。含めると、接続は失敗します。

    sync_mode

    データ同期メソッド。有効な値:

    • all: 完全データ同期と増分データ同期の両方を実行します。

    • full: 完全データ同期のみを実行します。

    • incr: 増分データ同期のみを実行します。

    説明

    デフォルト値は incr です。

    sync_mode = all

    説明

    collector.conf ファイルのパラメーターの完全なリストについては、「付録」をご参照ください。

  5. 次のコマンドを実行して同期タスクを開始し、ログ情報を出力します。

    ./collector.linux -conf=collector.conf -verbose
  6. ログ情報を確認します。次のログエントリが表示されると、完全なデータ同期が完了し、増分データ同期が開始されます。

    [09:38:57 CST 2019/06/20] [INFO] (mongoshake/collector.(*ReplicationCoordinator).Run:80) finish full sync, start incr sync with timestamp: fullBeginTs[1560994443], fullFinishTs[1560994737]

MongoShake のステータスを監視する

増分データ同期が開始されたら、別のコマンドラインウィンドウを開き、次のコマンドを実行して MongoShake を監視します。

cd /test/mongoshake && ./mongoshake-stat --port=9100
説明

mongoshake-stat は Python スクリプトです。スクリプトを実行する前に、Python 2.7 をインストールしてください。詳細については、「Python 公式ウェブサイト」をご参照ください。

監視出力のサンプル:监控结果

パラメーターの説明:

パラメーター

説明

logs_get/sec

1 秒あたりに取得される oplog の数。

logs_repl/sec

1 秒あたりの oplog 再生操作の数。

logs_success/sec

1 秒あたりの成功した oplog 再生操作の数。

lsn.time

最後の oplog が送信された時刻。

lsn_ack.time

宛先が書き込み操作を確認した時刻。

lsn_ckpt.time

チェックポイントが永続化された時刻。

now.time

現在の時刻。

replset

ソースデータベースレプリカセットの名前。

admin データベースから非 admin データベースへのビジネスデータの移行

MongoDB では、admin データベースにビジネスデータを保存することは推奨されていません。これは、ロック動作や内部コマンドとの競合により、インスタンスのパフォーマンスが低下する可能性があるためです。

MongoShake は、admin データベースから非 admin データベースへのビジネスデータの同期をサポートしています。

手順セクションの手順に従います。ステップ 4 で collector.conf ファイルを変更するときに、次の設定項目を追加します:

filter.pass.special.db = admin

# admin データベースから newDB にすべてのビジネスコレクションを移行します。
transform.namespace = admin:newDB
# または、admin データベースの abc コレクションをターゲットデータベースの def コレクションに移行します。複数のルールを設定できます。
transform.namespace = admin.abc:target.def

付録

表 1. collector.conf パラメーター

カテゴリ

パラメーター

説明

なし

conf.version

現在の設定ファイルのバージョン番号。この値は変更しないでください。

conf.version = 4

グローバル設定オプション

id

同期タスクの ID。この値はカスタマイズできます。ログ名、再開可能な転送のためのチェックポイント情報を格納するデータベースの名前、および宛先データベースの名前に使用されます。

id = mongoshake

master_quorum

高可用性オプション。プライマリおよびスタンバイの MongoShake ノードが同じソースからデータを同期する場合、プライマリ MongoShake ノードに対してこのパラメーターを true に設定します。

有効な値:

  • true: 有効

  • false: 無効

説明

デフォルト値は false です。

master_quorum = false

full_sync.http_port

HTTP ポート。このポートを開くと、インターネットから完全同期の現在のステータスを表示できます。

説明

デフォルト値は 9101 です。

full_sync.http_port = 9101

incr_sync.http_port

HTTP ポート。このポートを開くと、インターネットから増分同期の現在のステータスを表示できます。

説明

デフォルト値は 9100 です。

incr_sync.http_port = 9100

system_profile_port

プロファイリングポート。内部スタック情報を表示するために使用されます。

system_profile_port = 9200

log.level

ログレベル。有効な値:

  • error: エラーレベルの情報を含むログ。

  • warning: 警告レベルの情報を含むログ。

  • info: 現在のシステムステータスを反映するログ。

  • debug: デバッグ情報を含むログ。

デフォルト値: info。

log.level = info

log.dir

ログファイルと PID ファイルのディレクトリ。このパラメーターが設定されていない場合、デフォルトで現在のパスの logs ディレクトリが使用されます。

説明

このパラメーターのパスは絶対パスである必要があります。

log.dir = ./logs/

log.file

ログファイルの名前。この値はカスタマイズできます。

説明

デフォルト値は collector.log です。

log.file = collector.log

log.flush

画面上のログの更新頻度。有効な値:

  • true: すべてのログエントリを出力します。これはパフォーマンスに影響します。

  • false: すべてのログが出力されることを保証しませんが、パフォーマンスは保証されます。

説明

デフォルト値は false です。

log.flush = false

sync_mode

データ同期メソッド。有効な値:

  • all: 完全データ同期と増分データ同期の両方を実行します。

  • full: 完全データ同期のみを実行します。

  • incr: 増分データ同期のみを実行します。

説明

デフォルト値は incr です。

sync_mode = all

mongo_urls

ソース MongoDB インスタンスの接続文字列 URI。この例では、データベースアカウントは test で、admin データベースに属しています。

説明

mongo_urls = mongodb://test:****@dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717,dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717

mongo_cs_url

ソースがシャードクラスターインスタンスの場合、Configserver (CS) ノードのエンドポイントを入力する必要があります。Configserver ノードのエンドポイントを申請するには、「シャードのエンドポイントの申請」をご参照ください。

この例では、データベースアカウントは test で、admin データベースに属しています。

mongo_cs_url = mongodb://test:****@dds-bp19f409d7512****-csxxx.mongodb.rds.aliyuncs.com:3717,dds-bp19f409d7512****-csxxx.mongodb.rds.aliyuncs.com:3717/admin

mongo_s_url

ソースがシャードクラスターインスタンスの場合、少なくとも 1 つの Mongos ノードのエンドポイントを入力する必要があります。複数の Mongos アドレスはカンマ (,) で区切ります。Mongos ノードのエンドポイントを申請するには、「シャードのエンドポイントの申請」をご参照ください。

この例では、データベースアカウントは test で、admin データベースに属しています。

mongos_s_url = mongodb://test:****@s-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717,s-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717/admin

tunnel

同期用のチャネルのタイプ。有効な値:

  • direct: 宛先 MongoDB インスタンスに直接データを同期します。

  • rpc: NET/RPC を使用してデータを同期します。

  • tcp: TCP を使用してデータを同期します。

  • file: ファイル転送を使用してデータを同期します。

  • kafka: Kafka を使用してデータを同期します。

  • mock: テスト専用です。データはチャネルに書き込まれません。

説明

デフォルト値は direct です。

tunnel = direct

tunnel.address

宛先のエンドポイント。次のアドレスがサポートされています:

  • tunnel パラメーターを direct に設定した場合、宛先 MongoDB インスタンスの接続文字列 URI を入力します。

  • tunnel パラメーターを rpc に設定した場合、宛先インスタンスの RPC 受信側アドレスを入力します。

  • tunnel パラメーターを tcp に設定した場合、宛先インスタンスの TCP 受信側アドレスを入力します。

  • tunnel パラメーターを file に設定した場合、宛先インスタンスのデータのファイルパスを入力します。

  • tunnel パラメーターを kafka に設定した場合、Kafka アドレスを入力します。例: topic@brokers1,brokers2

  • tunnel パラメーターを mock に設定した場合、このパラメーターは空のままにします。

この例では、データベースアカウントは test で、admin データベースに属しています。

tunnel.address = mongodb://test:****@dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717,dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717

tunnel.message

チャネル内のデータのタイプ。このパラメーターは、tunnel パラメーターが kafka または file に設定されている場合にのみ有効です。有効な値:

  • raw: デフォルトのタイプ。データは集約モードで書き込みおよび読み取りされます。

  • json: データを JSON 形式で Kafka に書き込みます。これにより、ユーザーは直接読み取ることができます。

  • bson: データを BSON バイナリ形式で Kafka に書き込みます。

説明

デフォルト値は raw です。

tunnel.message = raw

mongo_connect_mode

MongoDB インスタンスの接続モード。このパラメーターは、tunnel パラメーターが direct に設定されている場合にのみ有効です。有効な値:

  • primary: プライマリノードからデータをプルします。

  • secondaryPreferred: セカンダリノードからデータをプルします。

  • standalone: 指定された単一ノードからデータをプルします。

説明

デフォルト値は secondaryPreferred です。

mongo_connect_mode = secondaryPreferred

filter.namespace.black

データ同期のブラックリストを指定します。指定された名前空間は、ターゲットデータベースに同期されません。複数の名前空間はセミコロン (;) で区切ります。

説明

名前空間は、MongoDB のコレクションまたはインデックスの正規名です。これは、データベース名とコレクション名またはインデックス名の組み合わせです。例: mongodbtest.customer

filter.namespace.black = mongodbtest.customer;testdata.test123

filter.namespace.white

データ同期のホワイトリストを指定します。指定された名前空間のみがターゲットデータベースに同期されます。複数の名前空間はセミコロン (;) で区切ります。

filter.namespace.white = mongodbtest.customer;test123

filter.pass.special.db

特別なデータベースの同期を有効にします。通常の同期中、admin、local、mongoshake、config、system.views などのデータベースはシステムによって除外されます。特別な要件のためにこれらのデータベースの同期を有効にすることができます。複数のデータベース名はセミコロン (;) で区切ります。

filter.pass.special.db = admin;mongoshake

filter.ddl_enable

DDL 同期を有効にするかどうかを指定します。有効な値:

  • true: 有効

  • false: 無効

説明

この機能は、ソースが MongoDB シャードクラスターインスタンスの場合はサポートされていません。

filter.ddl_enable = false

checkpoint.storage.url

再開可能な転送をサポートするために、チェックポイントストレージの場所を設定します。これが設定されていない場合、プログラムはインスタンスタイプに基づいて次のデータベースに書き込みます:

  • MongoDB レプリカセットインスタンス: mongoshake データベースに書き込みます。

  • MongoDB シャードクラスターインスタンス: Configserver ノードの admin データベースに書き込みます。

この例では、データベースアカウントは test で、admin データベースに属しています。

checkpoint.storage.url = mongodb://test:****@dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717,dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717

checkpoint.storage.db

チェックポイントを格納するデータベースの名前。

説明

デフォルト値は mongoshake です。

checkpoint.storage.db = mongoshake

checkpoint.storage.collection

チェックポイントを格納するコレクションの名前。プライマリおよびスタンバイの MongoShake ノードが同じソースからデータを同期するように有効にする場合、重複名による競合を防ぐためにこのテーブル名を変更できます。

説明

デフォルト値は ckpt_default です。

checkpoint.storage.collection = ckpt_default

checkpoint.start_position

再開可能な転送の開始位置。チェックポイントが既に存在する場合、このパラメーターは無効です。値の形式は YYYY-MM-DDTHH:MM:SSZ です。

説明

デフォルト値は 1970-01-01T00:00:00Z です。

checkpoint.start_position = 1970-01-01T00:00:00Z

transform.namespace

ソースデータベースまたはコレクションの名前を変更し、それをターゲットデータベースに同期します。たとえば、ソースデータベースの Database A.Collection B の名前を Database C.Collection D に変更し、それをターゲットデータベースに同期します。

transform.namespace = fromA.fromB:toC.toD

完全データ同期オプション

full_sync.reader.collection_parallel

MongoShake が一度に同時にプルできるコレクションの最大数を設定します。

full_sync.reader.collection_parallel = 6

full_sync.reader.write_document_parallel

MongoShake が単一のコレクションに書き込むための同時スレッド数を設定します。

full_sync.reader.write_document_parallel = 8

full_sync.reader.document_batch_size

宛先へのドキュメントの単一書き込みのバッチサイズを設定します。たとえば、128 は、書き込まれる前に 128 個のドキュメントが集約されることを意味します。

full_sync.reader.document_batch_size = 128

full_sync.collection_exist_drop

ソースコレクションと同じ名前を持つ宛先データベース内のコレクションの処理方法を設定します。有効な値:

  • true: 同じ名前の宛先コレクションを削除してから同期します。

    警告

    この操作は宛先のコレクションを削除します。事前にデータをバックアップしてください。

  • false: 宛先データベースで同じ名前のコレクションが検出された場合、エラーを報告して終了します。

full_sync.collection_exist_drop = true

full_sync.create_index

同期完了後にインデックスを作成するかどうかを指定します。有効な値:

  • foreground: 前景インデックスを作成します。

  • background: バックグラウンドインデックスを作成します。

  • none: インデックスを作成しません。

full_sync.create_index = none

full_sync.executor.insert_on_dup_update

ターゲットデータベースに重複する _id フィールドが存在する場合に、INSERT 文を UPDATE 文に変更するかどうかを指定します。有効な値:

  • true: 変更

  • false: 変更しない

full_sync.executor.insert_on_dup_update = false

full_sync.executor.filter.orphan_document

ソースがシャードクラスターインスタンスの場合に孤立ドキュメントをフィルターするかどうかを指定します。有効な値:

  • true: フィルターする

  • false: フィルターしない

full_sync.executor.filter.orphan_document = false

full_sync.executor.majority_enable

宛先でマジョリティ書き込み機能を有効にするかどうかを指定します。有効な値:

  • true: 有効

  • false: 無効

full_sync.executor.majority_enable = false

増分データ同期オプション

incr_sync.mongo_fetch_method

増分データをプルするためのメソッドを設定します。有効な値:

  • oplog: ソースデータベースから oplog をプルします。

  • change_stream: ソースデータベースから変更イベントをプルします。このメソッドは MongoDB 4.0 以降でのみサポートされます。

デフォルト値: oplog

incr_sync.mongo_fetch_method = oplog

incr_sync.oplog.gids

クラウドクラスターの双方向レプリケーションを設定するために使用されます。

incr_sync.oplog.gids = xxxxxxxxxxxx

incr_sync.shard_key

MongoShake が内部で同時実行性を処理する方法。このパラメーターは変更しないでください。

incr_sync.shard_key = collection

incr_sync.worker

oplog を転送するための同時スレッド数。ホストのパフォーマンスが十分な場合は、スレッド数を増やすことができます。

説明

ソースがシャードクラスターインスタンスの場合、スレッド数はシャードの数と等しくなければなりません。

incr_sync.worker = 8

incr_sync.worker.oplog_compressor

データ圧縮を有効にして、ネットワーク帯域幅の消費を削減します。有効な値:

  • none: 圧縮なし

  • gzip: gzip 形式で圧縮

  • zlib: zlib 形式で圧縮

  • deflate: deflate 形式で圧縮

説明

このパラメーターは、tunnel パラメーターが direct に設定されていない場合にのみ使用できます。tunnel が direct に設定されている場合、このパラメーターを none に設定します。

incr_sync.worker.oplog_compressor = none

incr_sync.target_delay

ソースと宛先の間に遅延同期を設定します。ソースの変更は通常、リアルタイムで宛先に同期されます。偶発的な操作を防ぐために、このパラメーターを設定して同期を遅延させることができます。たとえば、incr_sync.target_delay = 1800 は 30 分の遅延を設定します。単位は秒です。

説明

値 0 は、遅延同期が無効であることを示します。

incr_sync.target_delay = 1800

incr_sync.worker.batch_queue_size

MongoShake の内部キューの設定パラメーター。必要がない限り変更しないでください。

incr_sync.worker.batch_queue_size = 64

incr_sync.adaptive.batching_max_size

incr_sync.adaptive.batching_max_size = 1024

incr_sync.fetcher.buffer_capacity

incr_sync.fetcher.buffer_capacity = 256

MongoDB 同期オプション (direct モードのみ)

incr_sync.executor.upsert

UPDATE (重複フィールド) または一意なインデックスが存在しない場合に INSERT 文を _id 文に変更するかどうかを指定します。有効な値:

  • true: 変更

  • false: 変更しない

incr_sync.executor.upsert = false

incr_sync.executor.insert_on_dup_update

INSERT (重複フィールド) または一意なインデックスが存在しない場合に UPDATE 文を _id 文に変更するかどうかを指定します。有効な値:

  • true: 変更

  • false: 変更しない

incr_sync.executor.insert_on_dup_update = false

incr_sync.conflict_write_to

同期中に書き込み競合が発生した場合に、競合するドキュメントを記録するかどうかを指定します。有効な値:

  • none: 記録しない

  • db: 競合ログを mongoshake_conflict に書き込みます

  • sdk: 競合ログを SDK に書き込みます

incr_sync.conflict_write_to = none

incr_sync.executor.majority_enable

宛先でマジョリティ書き込みを有効にするかどうかを指定します。有効な値:

  • true: 有効

  • false: 無効

説明

この機能を有効にすると、パフォーマンスに影響します。

incr_sync.executor.majority_enable = false

よくある質問

まず「MongoShake よくある質問」をご参照ください。MongoShake の使用中に他の問題が発生した場合は、「GitHub Issues」で直接フィードバックを提供してください。