すべてのプロダクト
Search
ドキュメントセンター

DataWorks:Kafka テーブルを ApsaraDB for OceanBase にリアルタイムで同期する

最終更新日:Oct 29, 2025

単一テーブルのリアルタイム同期タスクは、ソース Kafka の指定された Topic のコンテンツ構造に基づいて、宛先の ApsaraDB for OceanBase テーブル構造を初期化します。その後、タスクは指定された Kafka Topic から ApsaraDB for OceanBase に既存データを同期し、増分データをリアルタイムで継続的に同期します。このトピックでは、Kafka Topic データを ApsaraDB for OceanBase にリアルタイムで同期する方法について説明します。

前提条件

手順

ステップ 1: 同期タスクタイプを選択する

  1. Data Integration ページに移動します。

    DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、[Data Integration] > [Data Integration] を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[Data Integration に移動] をクリックします。

  2. 左側のナビゲーションウィンドウで [同期タスク] をクリックし、ページの上部にある [同期タスクの作成] をクリックして、同期タスク作成ページに移動します。次の基本情報を設定します:

    • データソースと宛先: KafkaApsaraDB for OceanBase

    • 新しいタスク名: 同期タスク名をカスタマイズします。

    • 同期タイプ: 単一テーブルリアルタイム

ステップ 2: ネットワークとリソースを設定する

  1. [ネットワークとリソースの設定] セクションで、同期タスクに使用する [リソースグループ] を選択します。[タスクリソース使用量] の CU 数を割り当てることができます。

  2. [ソースデータソース] で、追加した kafka データソースを選択します。[宛先データソース] で、追加した ApsaraDB for OceanBase データソースを選択します。次に、[接続性テスト] をクリックします。image

  3. ソースデータソースと宛先データソースの両方が正常に接続されたことを確認したら、[次へ] をクリックします。

ステップ 3: 同期リンクを設定する

1. Kafka ソースを設定する

ページ上部の Kafka データソースをクリックして、[Kafka ソース情報] を編集します。

image

  1. [Kafka ソース情報] セクションで、同期する Kafka クラスター内の Topic を選択します。

    他のパラメーターはデフォルト値を保持するか、ビジネス要件に基づいて設定を変更します。

  2. 右上隅の [データサンプリング] をクリックします。

    表示されるダイアログボックスで、[開始時間][サンプル数] を指定し、[収集開始] ボタンをクリックして、指定した Kafka Topic からデータをサンプリングします。Topic 内のデータをプレビューして、後続のデータ処理ノードのデータプレビューと視覚的な設定のための入力を提供することもできます。

  3. [出力フィールド設定] セクションで、必要に応じて同期するフィールドを選択します。

2. データ処理ノードを編集する

image アイコンをクリックして、データ処理メソッドを追加できます。サポートされているデータ処理メソッドは、データマスキング文字列の置換データフィルタリングJSON 解析フィールドの編集と値の割り当て です。ビジネス要件に基づいてデータ処理メソッドを配置できます。同期タスクが実行されると、指定した処理順序に基づいてデータが処理されます。

image

データ処理ノードの設定が完了したら、右上隅の [データ出力プレビュー] ボタンをクリックできます。表示されるダイアログボックスで、[アップストリーム出力を再度取得] をクリックして、現在のデータ処理ノードで処理された後の Kafka Topic サンプルデータの結果をシミュレートします。

image

説明

データ出力プレビューは、Kafka ソースの [データサンプリング] に強く依存します。データ出力プレビューを実行する前に、Kafka ソースフォームでデータサンプリングを完了する必要があります。

3. ApsaraDB for OceanBase 宛先情報を設定する

ページ上部の ApsaraDB for OceanBase データ宛先をクリックして、[OceanBase 宛先情報] を編集します。

image

  1. [OceanBase 宛先情報] セクションで、書き込む OceanBase テーブルについて [テーブルを自動的に作成] するか [既存のテーブルを使用] するかを選択します。

    • テーブルを自動的に作成することを選択した場合、デフォルトでデータソーステーブルと同じ名前のテーブルが作成されます。宛先テーブル名は手動で変更できます。

    • 既存のテーブルを使用することを選択した場合、ドロップダウンリストから同期する宛先テーブルを選択します。

  2. (オプション) 宛先テーブルのスキーマを変更します。

    宛先テーブルパラメーターで [テーブルを自動的に作成] を選択した場合は、[テーブルスキーマの編集] をクリックします。表示されるダイアログボックスで、自動的に作成される宛先テーブルのスキーマを編集します。また、[先祖ノードの出力列に基づいてテーブルスキーマを再生成] をクリックして、先祖ノードの出力列に基づいてスキーマを再生成することもできます。生成されたスキーマから列を選択し、その列をプライマリキーとして設定できます。

    説明

    宛先テーブルにはプライマリキーが必要です。そうでない場合、設定は保存できません。

  3. ソースのフィールドと宛先のフィールド間のマッピングを設定します。

    上記の設定が完了すると、システムは [同名マッピング] の原則に基づいて、ソースのフィールドと宛先のフィールド間のマッピングを自動的に確立します。ビジネス要件に基づいてマッピングを変更できます。ソースの 1 つのフィールドは、宛先の複数のフィールドにマッピングできます。ソースの複数のフィールドを、宛先の同じフィールドにマッピングすることはできません。ソースのフィールドにマッピングされた宛先のフィールドがない場合、ソースのフィールドのデータは宛先に同期されません。

4. アラートルールを設定する

同期タスクの失敗がビジネスデータの同期に遅延を引き起こすのを防ぐために、同期タスクに異なるアラートルールを設定できます。

  1. ページの右上隅にある [アラートルールの設定] をクリックして、[アラートルールの設定] パネルに移動します。

  2. [アラートルールの設定] パネルで、[アラートルールの追加] をクリックします。[アラートルールの追加] ダイアログボックスで、パラメーターを設定してアラートルールを設定します。

    説明

    このステップで設定したアラートルールは、同期タスクによって生成されるリアルタイム同期サブタスクに対して有効になります。同期タスクの設定が完了したら、「リアルタイム同期タスクの管理」を参照してリアルタイム同期タスクページに移動し、リアルタイム同期サブタスクに設定されたアラートルールを変更できます。

  3. アラートルールを管理します。

    作成されたアラートルールを有効または無効にできます。また、アラートの重大度レベルに基づいて、異なるアラート受信者を指定することもできます。

5. 詳細パラメーターを設定する

DataWorks では、特定のパラメーターの設定を変更できます。ビジネス要件に基づいてこれらのパラメーターの値を変更できます。

説明

予期しないエラーやデータ品質の問題を防ぐために、パラメーターの値を変更する前に、パラメーターの意味を理解することをお勧めします。

  1. 設定ページの右上隅にある [詳細パラメーターの設定] をクリックします。

  2. [詳細パラメーターの設定] パネルで、目的のパラメーターの値を変更します。

6. リソースグループを設定する

ページの右上隅にある [リソースグループの設定] をクリックして、現在の同期タスクの実行に使用されるリソースグループを表示および変更できます。

7. 同期タスクのテストを実行する

上記の設定が完了したら、設定ページの右上隅にある [シミュレーション実行] をクリックして、同期タスクがサンプリングされたデータを宛先テーブルに同期できるようにします。同期結果は宛先テーブルで表示できます。同期タスクの特定の設定が無効である場合、テスト実行中に例外が発生した場合、またはダーティデータが生成された場合、システムはリアルタイムでエラーを報告します。これにより、同期タスクの設定を確認し、期待される結果が早期に得られるかどうかを判断できます。

  1. 表示されるダイアログボックスで、指定したテーブルからのデータサンプリングのパラメーター ([開始位置] および [サンプリングデータレコード] パラメーターを含む) を設定します。

  2. [収集開始] をクリックして、同期タスクがソースからデータをサンプリングできるようにします。

  3. [プレビュー] をクリックして、同期タスクがサンプリングされたデータを宛先に同期できるようにします。

8. 同期タスクを実行する

  1. 同期タスクの設定が完了したら、ページの下部にある [完了] をクリックします。

  2. [タスク] セクションの [同期タスク] ページで、作成した同期タスクを見つけ、[操作] 列の [開始] をクリックします。

  3. [Tasks] セクションで同期タスクの [名前または ID] をクリックし、詳細な実行プロセスを表示します。

同期タスクの O&M 操作を実行する

同期タスクのステータスを表示する

データ同期ソリューションが作成された後、タスクページに移動して、ワークスペースで作成されたすべてのデータ同期ソリューションと各ソリューションの基本情報を表示できます。

image

  • アクション列で同期タスクを [開始] または [停止] できます。また、[その他] ドロップダウンリストで同期タスクを [編集] または [表示] することもできます。

  • 開始されたタスクについては、[実行概要] で基本的な実行ステータスを確認したり、対応する概要エリアをクリックして実行の詳細を表示したりできます。

image

Kafka から ApsaraDB for OceanBase への単一テーブルリアルタイム同期タスクは、2 つのステップで構成されます:

  • スキーマ移行: 宛先テーブルの作成方法 (既存のテーブルまたは自動テーブル作成) が含まれます。自動テーブル作成が選択されている場合、テーブル作成用の DDL が表示されます。

  • リアルタイムデータ同期: リアルタイムの読み取り/書き込みトラフィック、ダーティデータ、フェールオーバー、操作ログなどのリアルタイム同期の統計情報が含まれます。

同期タスクを再実行する

特別な場合、同期するフィールド、宛先テーブルのフィールド、またはテーブル名情報を変更したい場合は、目的の同期タスクの [操作] 列にある [再実行] をクリックすることもできます。これにより、システムは宛先に行われた変更を同期します。すでに同期されていて変更されていないテーブルのデータは、再度同期されません。

  • 同期タスクの設定を変更せずに直接 [再実行] をクリックすると、システムが同期タスクを再実行します。

  • 同期タスクの設定を変更してから [完了] をクリックします。同期タスクの操作列に表示される [更新を適用] をクリックして、最新の設定を有効にするために同期タスクを再実行します。