Data Integration を使用すると、MySQL、Oracle、PolarDB などのソースから Hologres などの宛先にデータベース全体をリアルタイムで同期できます。このトピックでは、MySQL をソース、Kafka を宛先として使用する例を取り上げ、MySQL データベース全体を Kafka に完全同期および増分同期する方法について説明します。
前提条件
Serverless リソースグループまたはデータ統合専用リソースグループを購入します。
MySQL および Kafka データソースを作成します。詳細については、「データソース設定」をご参照ください。
リソースグループとデータソース間のネットワーク接続を確立します。詳細については、「ネットワーク接続ソリューションの概要」をご参照ください。
操作手順
1. 同期タスクタイプの選択
Data Integration ページに移動します。
DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[データ統合に移動] をクリックします。
左側のナビゲーションウィンドウで、[同期タスク] をクリックします。次に、ページの上部にある [同期タスクの作成] をクリックします。次の基本情報を設定します。
ソース:
MySQL。[宛先]:
Kafka。[タスク名]: 同期タスクのカスタム名を入力します。
[タスクタイプ]:
リアルタイムデータベース全体の同期。[同期手順]: [完全初期化] と [増分同期] を選択します。
2. ネットワークとリソースの設定
[ネットワークとリソースの構成] セクションで、同期タスクの [リソースグループ] を選択します。[タスクリソースの使用量] で CU (コンピューティングユニット) を割り当てることができます。
[ソース] をご利用の
MySQLデータソースに、[宛先] をご利用のKafkaデータソースに設定します。次に、[接続性テスト] をクリックします。
ソースと宛先の両方のデータソースの接続性テストが成功したら、[次へ] をクリックします。
3. 同期するデータベースとテーブルの選択
[ソーステーブル] セクションで、同期するソーステーブルを選択します。
アイコンをクリックして、右側の [選択されたテーブル] セクションに移動します。

4. 宛先 Topic へのマッピング
データを同期するテーブルを選択すると、選択したテーブルが [宛先テーブルのマッピングルール] セクションに自動的に表示されます。宛先テーブルのプロパティはマッピング待ちの状態です。ソーステーブルと宛先テーブル間のマッピングを手動で定義して、データの読み取りと書き込みの関係を決定する必要があります。その後、[操作] 列の [リフレッシュ] をクリックできます。ソーステーブルと宛先テーブル間のマッピングを直接リフレッシュできます。また、宛先テーブルに関連する設定を構成した後に、ソーステーブルと宛先テーブル間のマッピングをリフレッシュすることもできます。
同期するテーブルを選択し、[マッピング結果の一括リフレッシュ] をクリックします。マッピングルールが設定されていない場合、宛先テーブルのデフォルトの命名規則は
${SourceDBName}_${TableName}です。この名前のテーブルが宛先に存在しない場合、新しいテーブルが自動的に作成されます。[宛先 Topic 名のカスタムマッピングルール] 列で、[設定] ボタンをクリックして、宛先 Topic の命名規則を設定します。
組み込み変数と手動で入力した文字列を組み合わせて、[宛先 Topic 名] を作成できます。組み込み変数を編集することもできます。たとえば、ソーステーブル名にサフィックスを追加して [宛先 Topic 名] を作成する新しい Topic 命名規則を作成できます。
[データ書き込みのキーの値] 列で、[設定] ボタンをクリックして書き込みキーを設定します。
a. 宛先 Topic のフィールドの追加と値の割り当て
宛先 Topic のステータスが [作成待ち] の場合、元のテーブルスキーマのフィールドに加えて、新しいフィールドを追加できます。次の手順を実行します。
単一のテーブルにフィールドを追加して値を割り当てるには、[宛先 Topic フィールドへの値の割り当て] 列の [設定] ボタンをクリックします。[追加フィールド] ページで、[フィールドの追加] をクリックしてフィールドを追加し、値を割り当てます。
一括で値を割り当てるには: 複数のテーブルを選択します。リストの下部で、 を選択して、複数の宛先テーブルにまたがる同じフィールドに値を割り当てます。
説明定数と変数を割り当てることができます。
アイコンをクリックして、割り当てモードを切り替えます。
b. DML 処理ルールの設定
Data Integration は、デフォルトの DML 処理ルールを提供します。ビジネス要件に基づいて、宛先テーブルの DML 処理ルールを設定することもできます。
単一の宛先テーブルの DML 処理ルールを設定する: DML 処理ルールを設定する宛先テーブルを見つけ、[DML ルールの設定] 列の [設定] をクリックして、テーブルの DML 処理ルールを設定します。
一度に複数の宛先テーブルの DML 処理ルールを設定する: DML 処理ルールを設定する宛先テーブルを選択し、ページの下部にある [一括変更] をクリックしてから、[DML ルールの設定] をクリックします。
c. 宛先 Topic プロパティの設定
宛先 Topic 名の列にある
アイコンをクリックして、[パーティション数] や [レプリカ数] などの Topic プロパティを設定します。
d. ソース分割列の設定
[ソース分割列] のドロップダウンリストから、ソーステーブルのフィールドまたは [分割しない] を選択します。
e. 完全同期の実行
同期タスクタイプを選択したときに、[同期ステップ] セクションの [完全同期] チェックボックスをオンにした場合、このステップで特定のテーブルの完全同期を無効にすることができます。
5. アラートルールの設定
同期タスクの失敗によって業務データの同期に遅延が発生するのを防ぐために、同期タスクにさまざまなアラートルールを設定できます。
ページの右上隅にある [アラートルールの設定] をクリックして、[リアルタイム同期サブノードのアラートルール設定] パネルに移動します。
[アラートルールの設定] パネルで、[アラートルールの追加] をクリックします。[アラートルールの追加] ダイアログボックスで、パラメーターを設定してアラートルールを構成します。
説明このステップで設定したアラートルールは、同期タスクによって生成されるリアルタイム同期サブタスクに対して有効になります。同期タスクの設定が完了した後、「リアルタイム同期タスクの管理」を参照して、リアルタイム同期サブタスクに設定されたアラートルールを変更できます。
アラートルールの管理
作成されたアラートルールを有効または無効にできます。また、アラートの重大度レベルに基づいて、異なるアラート受信者を指定することもできます。
6. 詳細パラメーターの設定
ビジネス要件に基づいて、同期タスクに設定された特定のパラメーターの値を変更できます。たとえば、[最大読み取り接続数] パラメーターに適切な値を指定して、現在の同期タスクがソースデータベースに過度の負荷をかけ、データ生成に影響が及ぶのを防ぐことができます。
予期しないエラーやデータ品質の問題を防ぐため、パラメーターの値を変更する前に、パラメーターの意味を理解することを推奨します。
設定ページの右上隅にある [詳細パラメーターの設定] をクリックします。
[詳細パラメーターの設定] パネルで、目的のパラメーターの値を変更します。
7. DDL 処理ルールの設定
ソースに対して DDL 操作が実行される場合があります。ページの右上隅にある [DDL 機能の設定] をクリックして、ビジネス要件に基づいてソースからの DDL メッセージを処理するルールを設定できます。
詳細については、「DDL メッセージを処理するルールの設定」をご参照ください。
8. リソースグループの表示と変更
ページの右上隅にある [リソースグループの設定] をクリックして、現在の同期タスクの実行に使用されるリソースグループを表示および変更できます。
9. 同期タスクの実行
同期タスクの設定が完了したら、ページの下部にある [完了] をクリックします。
[データ統合] ページの [同期タスク] セクションで、作成した同期タスクを見つけ、[操作] 列の [開始] をクリックします。
[タスク] セクションの同期タスクの [名前/ID] をクリックし、同期タスクの詳細な実行プロセスを表示します。
同期タスクの O&M (運用保守)
タスク実行ステータスの表示
同期タスクを作成した後、[同期タスク] ページで同期タスクのリストとその基本情報を表示できます。
[操作] 列では、同期タスクを [開始] または [停止] できます。[その他] メニューでは、[編集] や [表示] などの他の操作を実行できます。
実行中のタスクについては、[実行概要] セクションでその基本ステータスを表示し、概要内のエリアをクリックして実行の詳細を表示できます。

MySQL データベース全体から Kafka へのリアルタイム同期タスクは、次の 3 つのステップで構成されます。
スキーマ移行: 既存の Topic を使用するか、新しい Topic を自動的に作成して、宛先 Topic がどのように作成されるかを示します。Topic が自動的に作成される場合、そのデータ定義言語 (DDL) 文が表示されます。
完全データ初期化: オフラインで同期されたテーブル、同期の進行状況、書き込まれたレコード数に関する情報を表示します。
リアルタイムデータ同期: リアルタイムの進行状況、DDL レコード、データ操作言語 (DML) レコード、アラート情報などのリアルタイム同期統計を表示します。
同期タスクの再実行
特殊なケース、例えばソースへのテーブルの追加や削除、宛先テーブルのスキーマや名前の変更などが発生した場合、同期タスクの [操作] 列にある [その他] をクリックし、続いて [再実行] をクリックすることで、変更を反映したタスクを再実行できます。再実行プロセス中、同期タスクは新しく追加されたテーブルから宛先へのデータ同期のみ、またはマッピングされたソーステーブルからスキーマや名前が変更された宛先テーブルへのデータ同期のみを行います。
タスクの設定を変更せずに同期タスクを再実行する場合は、[操作] 列の [その他] をクリックし、[再実行] をクリックしてタスクを再実行し、完全同期と増分同期を再度実行します。
タスクにテーブルを追加または削除した後に同期タスクを再実行する場合は、変更後に [完了] をクリックします。この場合、同期タスクの [操作] 列に [更新の適用] が表示されます。[更新の適用] をクリックして、システムに同期タスクの再実行をトリガーします。再実行プロセス中、同期タスクは新しく追加されたテーブルから宛先にデータを同期します。元のテーブルのデータは再度同期されません。
付録:Kafka メッセージフォーマット
リアルタイム同期タスクを設定して実行すると、ソースデータベースのデータが JSON フォーマットで Kafka Topic に書き込まれます。タスクはまず、指定されたソーステーブルのすべての既存データを対応する Kafka Topic に書き込みます。その後、リアルタイムプロセスを開始して、増分データを継続的に Topic に書き込みます。ソーステーブルからの増分 DDL 変更も JSON フォーマットで Topic に書き込まれます。メッセージフォーマットの詳細については、「付録:メッセージフォーマット」をご参照ください。
完全同期タスクによって Kafka に書き込まれたデータの場合、JSON 構造内の payload.sequenceId、payload.timestamp.eventTime、および payload.timestamp.checkpointTime フィールドはすべて -1 に設定されます。