このトピックでは、データ伝送サービスを使用して OceanBase Database から RocketMQ インスタンスにデータを同期する方法について説明します。
背景
Message Queue for Apache RocketMQ は、Alibaba Cloud が Apache RocketMQ に基づいて構築した分散メッセージ指向ミドルウェアです。低レイテンシ、高い同時実行性、そして高い信頼性を備えています。データ伝送サービスのデータ同期機能により、OceanBase Database の物理テーブルと RocketMQ データソースの間のデータをリアルタイムで同期し、メッセージ処理機能を拡張できます。
2 種類のテナントのデータ形式の詳細については、「OceanBase Database からメッセージキューシステムにデータが転送される際に使用されるデータ形式」をご参照ください。
前提条件
データ伝送サービスには、クラウドリソースにアクセスするための権限があります。詳細については、「データ伝送のロールに権限を付与する」をご参照ください。
ソース OceanBase データベースにデータ同期用の専用データベースユーザーを作成し、そのユーザーに対応する権限を付与しています。詳細については、「データベースユーザーを作成する」をご参照ください。
制限事項
データ伝送サービスは、バージョン 4.x および 5.x の RocketMQ インスタンス(商用版とコミュニティ版を含む)をサポートしています。
物理テーブルのみ同期できます。
データ同期中、データ伝送サービスでは、新しいテーブルを作成する前にテーブルを削除できます。つまり、
DROP TABLEを実行してからCREATE TABLEを実行できます。データ伝送サービスでは、テーブルの名前を変更して新しいテーブルを作成することはできません。つまり、RENAME TABLE a TO a_tmpを実行することはできません。同期対象のテーブルの名前、およびテーブル内の列の名前には、漢字を含めることはできません。
データ伝送サービスは、オブジェクトのデータベース名、テーブル名、および列名が特殊文字を含まない ASCII エンコードである場合にのみ、オブジェクトの移行をサポートします。特殊文字は、改行、スペース、および次の文字です。 . | " ' ` ( ) = ; / & \.
データ伝送サービスは、スタンバイ OceanBase データベースをソースとしてサポートしていません。
考慮事項
ソースが V4.0.0 ~ V4.3.x(V4.2.5 BP1 を除く)の OceanBase Database で、増分同期を選択した場合は、生成列に STORED 属性を指定する必要があります。この属性を指定しないと、生成列に関する情報は増分ログに保存されず、増分同期のデータエラーが発生する可能性があります。
更新された行にラージオブジェクト(LOB)列が含まれている場合は、次の点に注意してください。
LOB 列が更新された場合は、
UPDATEまたはDELETE操作の前に LOB 列に格納されていた値を使用しないでください。LOB 列には、JSON、GIS、XML、ユーザー定義型(UDT)、および LONGTEXT や MEDIUMTEXT などの TEXT などのデータ型が格納されます。
LOB 列が更新されていない場合、
UPDATEまたはDELETE操作の前後で LOB 列に格納されている値は NULL です。
タスクのデータ転送が再開されると、RocketMQ インスタンス内の一部のデータ(最後の1分以内)が重複する可能性があります。そのため、ダウンストリームシステムでは重複排除が必要です。
ノード間またはクライアントとサーバー間のクロックが同期していない場合、増分同期中のレイテンシが不正確になる可能性があります。
たとえば、クロックが標準時刻よりも早い場合、レイテンシは負になる可能性があります。クロックが標準時刻よりも遅い場合、レイテンシは正になる可能性があります。
データ同期タスクの作成時に 増分同期 のみを選択した場合、データ伝送サービスでは、ソースデータベースのローカル増分ログが少なくとも 48 時間保持されている必要があります。
データ同期タスクの作成時に フル同期 と 増分同期 を選択した場合、データ伝送サービスでは、ソースデータベースのローカル増分ログが少なくとも 7 日間保持されている必要があります。保持されていない場合、データ伝送サービスが増分ログを取得できないため、データ同期タスクが失敗したり、ソースデータベースとターゲットデータベースのデータに不整合が生じたりする可能性があります。
サポートされているソースインスタンスタイプとターゲットインスタンスタイプ
次の表で、OB_MySQL は OceanBase Database の MySQL テナントを、OB_Oracle は OceanBase Database の Oracle テナントを表します。
ソース | ターゲット |
OB_MySQL (OceanBase クラスタインスタンス) | RocketMQ (Alibaba Cloud 上の RocketMQ インスタンス) |
OB_MySQL (サーバーレスインスタンス) | RocketMQ (Alibaba Cloud 上の RocketMQ インスタンス) |
OB_Oracle (OceanBase クラスタインスタンス) | RocketMQ (Alibaba Cloud 上の RocketMQ インスタンス) |
手順
ApsaraDB for OceanBase コンソール にログインし、データ同期タスクを購入します。
詳細については、「データ同期タスクを購入する」をご参照ください。
[データ伝送] > [データ同期] を選択します。表示されるページで、データ同期タスクの [構成] をクリックします。

既存のタスクの構成を参照する場合は、[構成の参照] をクリックします。詳細については、「データ同期タスクの構成を参照およびクリアする」をご参照ください。
[ソースとターゲットの選択] ページで、パラメーターを構成します。
パラメーター
説明
同期タスク名
数字と文字の組み合わせに設定することをお勧めします。スペースを含めることはできず、長さは 64 文字以下にする必要があります。
ソース
OceanBase データソースを作成済みの場合は、ドロップダウンリストから選択します。作成していない場合は、ドロップダウンリストの [新しいデータソース] をクリックし、右側に表示されるダイアログボックスで作成します。パラメーターの詳細については、「OceanBase データソースを作成する」をご参照ください。
ターゲット
RocketMQ データソースを作成済みの場合は、ドロップダウンリストから選択します。作成していない場合は、ドロップダウンリストの [新しいデータソース] をクリックし、右側に表示されるダイアログボックスで作成します。詳細については、「RocketMQ データソースを作成する」をご参照ください。
タグ(オプション)
ドロップダウンリストからターゲットタグを選択します。[タグの管理] をクリックして、タグを作成、変更、および削除することもできます。詳細については、「タグを使用してデータ同期タスクを管理する」をご参照ください。
[次へ] をクリックします。[同期タイプの選択] ページで、現在のデータ同期タスクの同期タイプを指定します。

サポートされている同期タイプは、[フル同期] と [増分同期] です。[増分同期] は [DML 同期] のみサポートしています。サポートされている DML 操作は、
INSERT、DELETE、およびUPDATEです。必要に応じてオプションを選択できます。詳細については、「DDL/DML 同期を構成する」をご参照ください。[次へ] をクリックします。[同期オブジェクトの選択] ページで、現在のデータ同期タスクで同期するオブジェクトを選択します。
[オブジェクトの指定] オプションまたは [一致ルール] オプションを使用して、同期オブジェクトを指定できます。このトピックでは、[オブジェクトの指定] オプションを使用して同期オブジェクトを指定する方法について説明します。一致ルールの構成方法については、「一致ルールを構成および変更する」トピックの「データベースとメッセージキューインスタンス間のデータ移行/同期のワイルドカードパターン」セクションをご参照ください。
説明[同期タイプの選択] 手順で [DDL 同期] を選択した場合は、[一致ルール] オプションを使用して同期オブジェクトを選択することをお勧めします。これにより、一致ルールを満たすすべての新しいオブジェクトが同期されます。[オブジェクトの指定] オプションを使用して同期オブジェクトを選択した場合、新しいオブジェクトまたは名前が変更されたオブジェクトは同期されません。
OceanBase データベースから RocketMQ インスタンスにデータを同期する場合、複数のテーブルを選択して複数のトピックにマッピングできます。
[同期オブジェクトの選択] セクションで、[オブジェクトの指定] を選択します。
左側のペインで、同期するオブジェクトを選択します。
[>] をクリックします。
[オブジェクトをトピックにマッピング] ダイアログボックスの [既存のトピック] ドロップダウンリストをクリックし、ターゲットトピックを選択します。
既存のトピックの名前を入力し、表示されたら選択することもできます。

[OK] をクリックします。
データ伝送サービスでは、テキストを使用してオブジェクトをインポートできます。また、オブジェクトのトピックの変更、行フィルターの設定、単一またはすべてのオブジェクトの削除も可能です。ターゲットデータベースのオブジェクトは、トピック > データベース > テーブルの構造で一覧表示されます。
説明[一致ルール] を選択して同期オブジェクトを指定する場合、オブジェクトの名前変更は、指定された一致ルールの構文に基づいて実装されます。操作エリアでは、フィルタリング条件を設定し、シャーディング列と同期する列を選択することのみ可能です。詳細については、「一致ルールを構成および変更する」をご参照ください。

操作
説明
オブジェクトのインポート
右側のリストで、右上隅にある オブジェクトのインポート をクリックします。
表示されるダイアログボックスで、[OK] をクリックします。
重要この操作は以前の選択を上書きします。注意して進めてください。
同期オブジェクトのインポート同期オブジェクトの設定をダウンロードしてインポートする ダイアログボックスで、同期するオブジェクトをインポートします。CSV ファイルをインポートして、行フィルター条件、フィルター列、およびシャーディング列を設定できます。詳細については、「」をご参照ください。
検証する をクリックします。
検証に成功したら、[OK] をクリックします。
トピックの変更
データ伝送サービスでは、ターゲットデータベースのオブジェクトのトピックを変更できます。詳細については、「トピックを変更する」をご参照ください。
設定の構成
WHERE句を使用して行ごとにデータをフィルタリングし、シャーディング列と同期する列を選択できます。[設定] ダイアログボックスでは、次の操作を実行できます。
[行フィルター] セクションで、標準 SQL
WHERE句を指定して、行ごとにデータをフィルタリングします。詳細については、「SQL 条件を使用してデータをフィルタリングする」をご参照ください。[シャーディング列] ドロップダウンリストから、使用するシャーディング列を選択します。複数のフィールドをシャーディング列として選択できます。このパラメーターはオプションです。
特に指定がない限り、プライマリキーをシャーディング列として選択します。プライマリキーの負荷が分散されていない場合は、一意の識別子を持つ負荷分散フィールドをシャーディング列として選択して、パフォーマンスの問題を回避します。シャーディング列は、次の目的で使用できます。
負荷分散: ターゲットテーブルが同時書き込みをサポートしている場合、メッセージの送信に使用されるスレッドは、シャーディング列に基づいて認識できます。
順序性: データ伝送サービスは、シャーディング列の値が同じであれば、メッセージが順番に受信されることを保証します。順序性は、列に対して DML 文を実行する順序を指定します。
[列の選択] セクションで、同期する列を選択します。詳細については、「列フィルタリング」をご参照ください。
1 つまたはすべてのオブジェクトを削除する
データ伝送サービスでは、データマッピング中に右側のリストに追加された 1 つまたはすべての同期オブジェクトを削除できます。
単一の同期オブジェクトを削除する
右側のリストで、削除するオブジェクトにマウスポインターを置き、[削除] をクリックして同期オブジェクトを削除します。
すべての同期オブジェクトを削除する
右側のリストで、右上隅にある [すべて削除] をクリックします。表示されるダイアログボックスで、[OK] をクリックしてすべての同期オブジェクトを削除します。
[次へ] をクリックします。[同期オプション] ページで、パラメーターを構成します。
フル同期
次の表に、完全同期[同期タイプの選択] ページで を選択した場合にのみ表示されるフル同期パラメーターを示します。

パラメーター
説明
読み取り同時実行性
フル同期中にソースからデータを読み取るための同時実行性。最大値は 512 です。同時実行性が高いと、ソースに過度のストレスがかかり、ビジネスに影響を与える可能性があります。
書き込み同時実行性
フル同期中にデスティネーションにデータを書き込むための同時実行性。最大値は 512 です。書き込み同時実行性が高いと、ターゲットに過度のストレスがかかり、ビジネスに影響を与える可能性があります。
フル同期のレート制限
必要に応じて、フル同期のレートを制限するかどうかを選択できます。フル同期のレートを制限することを選択した場合は、1 秒あたりのレコード数(RPS)と 1 秒あたりのバイト数(BPS)を指定する必要があります。RPS は、フル同期中にデスティネーションに同期されるデータ行の最大数を指定し、BPS は、フル同期中にデスティネーションに同期されるデータの最大バイト数を指定します。
説明ここで指定した RPS 値と BPS 値は、調整のためだけにあります。実際のフル同期のパフォーマンスは、ソースとデスティネーションの設定やインスタンスの仕様などの要因によって異なります。
増分同期
次の表に、差分同期同期の種類を選択 ページで を選択した場合にのみ表示される増分同期パラメーターを示します。

パラメーター
説明
書き込み同時実行性
増分同期中にターゲットにデータを書き込むための同時実行性。最大値は 512 です。書き込み同時実行性が高いと、ターゲットに過度のストレスがかかり、ビジネスに影響を与える可能性があります。
増分同期のレート制限
必要に応じて、増分同期のレートを制限するかどうかを選択できます。増分同期のレートを制限することを選択した場合は、RPS と BPS を指定する必要があります。RPS は、増分同期中にターゲットに同期されるデータ行の最大数を指定し、BPS は、増分同期中にターゲットに同期されるデータの最大バイト数を指定します。
説明ここで指定した RPS 値と BPS 値は、調整のためだけにあります。実際の増分同期のパフォーマンスは、ソースとターゲットの設定やインスタンスの仕様などの要因によって異なります。
増分同期の開始タイムスタンプ
[フル同期] を選択した場合は、このパラメーターは使用できません。
[増分同期] を選択したが [フル同期] を選択していない場合は、データの同期を開始する時刻を指定します。デフォルト値は現在のシステム時刻です。詳細については、「増分同期のタイムスタンプを設定する」をご参照ください。
詳細パラメーター

パラメーター
説明
シリアル化方法
ターゲット RocketMQ インスタンスにデータを同期するためのメッセージ形式。有効な値: [デフォルト]、[canal]、[dataworks] (バージョン 2.0 対応)、[shareplex]、[defaultextendcolumntype]、[debezium]、[debeziumflatten]、および [debeziumsmt]。詳細については、「データ形式」をご参照ください。
重要OceanBase Database の MySQL テナントのみが [debezium]、[debeziumflatten]、[debeziumsmt] をサポートしています。
パーティションルール
RocketMQ インスタンスにデータを同期する場合、[ハッシュ] のみサポートされます。[ハッシュ] は、データ伝送サービスがハッシュアルゴリズムを使用して、プライマリキーまたはシャーディング列の値に基づいて RocketMQ のメッセージキューを選択することを示します。
ビジネスシステム識別(オプション)
[シリアル化方法] で [dataworks] を選択した場合にのみ、データのソースビジネスシステムを識別するこのパラメーターが表示されます。ビジネスシステム識別子は 1 ~ 20 文字で構成されます。
ターゲット

パラメーター
説明
プロデューサーグループの名前を入力してください
複数のトピックにデータを書き込むことができるプロデューサーグループを指定します。
メッセージ追跡を許可するかどうか
メッセージ追跡を許可するかどうかを指定します。メッセージ追跡が有効になっている場合、プロデューサーによる RocketMQ サーバーへの送信からコンシューマーによる消費までのメッセージのライフサイクル全体におけるノード時間とステータスを含む完全なタスク情報を追跡できます。メッセージ追跡機能は、本番環境でのトラブルシューティングのためのデータサポートを提供します。
[事前チェック] をクリックします。
事前チェック中、データ転送サービスはソースとターゲット間の接続を検出します。 事前チェック中にエラーが返された場合は、次の操作を実行できます。
問題を特定してトラブルシューティングし、再度事前チェックを実行します。
失敗した事前チェック項目の [アクション] 列にある [スキップ] をクリックします。操作の結果を尋ねるダイアログボックスで、
[OK] をクリックします。
事前チェックに合格したら、[タスクの開始] をクリックします。
タスクを今すぐ開始する必要がない場合は、[保存] をクリックします。[同期タスク] ページで、または後でバッチ操作を実行することで、タスクを手動で開始できます。バッチ操作の詳細については、「データ同期タスクのバッチ操作を実行する」をご参照ください。
データ伝送サービスでは、同期タスクの実行中に同期オブジェクトを変更できます。詳細については、「同期オブジェクトとそのフィルター条件を表示および変更する」をご参照ください。データ同期タスクが開始されると、選択した同期タイプに基づいて実行されます。詳細については、「データ同期タスクの詳細を表示する」をご参照ください。
ネットワーク障害またはプロセスの起動が遅いなどの理由でデータ同期タスクの実行に例外が発生した場合は、同期タスクの [同期タスク] ページまたは [詳細] ページで [復元] をクリックできます。