すべてのプロダクト
Search
ドキュメントセンター

DataWorks:Kafka から Hologres にデータを同期するためのリアルタイム ETL 同期タスクを作成する

最終更新日:Apr 02, 2025

DataWorks では、リアルタイム ETL (抽出、変換、書き出し) 同期タスクを使用して、Kafka から Hologres にデータを同期できます。このタスクは、指定された Kafka トピックの構造に基づいて、宛先 Hologres テーブルのスキーマを初期化します。次に、指定された Kafka トピックから宛先 Hologres テーブルに一度に全データを同期し、トピックから宛先 Hologres テーブルにリアルタイムで増分データを同期します。このトピックでは、Kafka から Hologres にデータを同期するためのリアルタイム ETL 同期タスクを作成する方法について説明します。

制限事項

  • 使用する Kafka サービスのバージョンは 0.10.2 から 2.2.0 の範囲内である必要があります。

  • この例で作成される同期タスクは、データ統合専用リソースグループ のみを使用して実行できます。

データソースを準備する

Kafka データソースを準備する

DataWorks に Kafka データソースを追加します。詳細については、「Kafka データソース」をご参照ください。

Hologres データソースを準備する

  • DataWorks にデータソースとして追加する Hologres インスタンスに関する情報を取得する

    Hologres コンソール にログインします。左側のナビゲーションウィンドウで、[インスタンス] をクリックします。[インスタンス] ページで、DataWorks にデータソースとして追加する Hologres インスタンスを見つけ、インスタンス名をクリックします。[インスタンスの詳細] ページで、Hologres インスタンスに関する次の情報を取得します。インスタンス ID とリージョン。Hologres インスタンスで VPC (仮想プライベートクラウド) ネットワークタイプが有効になっている場合は、VPC ID と vSwitch ID も取得できます。

  • Hologres データソースを準備する

    詳細については、「Hologres データソースを追加する」をご参照ください。

データ統合専用リソースグループを作成し、リソースグループとデータソース間のネットワーク接続を確立する

データ同期を行う前に、データ統合専用リソースグループとデータソース間のネットワーク接続を確立する必要があります。詳細については、「ネットワーク接続を構成する」をご参照ください。

説明

Kafka と Hologres は、次のネットワークタイプをサポートしています。

  • Kafka: インターネットまたは指定された VPC

  • Hologres: インターネット、指定された VPC、または AnyTunnel

  • データ統合専用リソースグループと Hologres または Kafka データソースが同じリージョンにある場合は、AnyTunnel または SingleTunnel に基づいてリージョン内の VPC を使用して、リソースグループとデータソース間のネットワーク接続を確立できます。Hologres または Kafka データソースのネットワークタイプが SingleTunnel の場合は、次の手順を実行してネットワーク接続を確立する必要があります。

    1. データ統合専用リソースグループを VPC に関連付け、リソースグループのカスタムルートを追加します。

    2. 必要な IP アドレスまたは CIDR ブロックを Hologres または Kafka データソースの IP アドレスホワイトリストに追加します。

  • データ統合専用リソースグループと Hologres または Kafka データソースが異なるリージョンにある場合は、インターネット経由でリソースグループとデータソース間のネットワーク接続を確立できます。このようなネットワーク接続を確立するには、データ統合専用リソースグループの Elastic IP アドレス ([EIP]) をデータソースの IP アドレスホワイトリストに追加する必要があります。

手順 1:データ統合専用リソースグループを VPC に関連付け、リソースグループのカスタムルートを追加する

説明

インターネット経由でデータ統合専用リソースグループとデータソース間のネットワーク接続を確立する場合は、この手順をスキップできます。

  1. データ統合専用リソースグループを VPC に関連付けます。

    1. DataWorks コンソールの リソースグループ ページ に移動し、使用するデータ統合専用リソースグループを見つけ、[アクション] 列の [ネットワーク設定] をクリックします。

    2. 表示されるページの [VPC バインディング] タブで、[VPC の関連付けを追加] をクリックします。[VPC の関連付けを追加] パネルで、次のパラメータを構成して、リソースグループを VPC に関連付けます。

      • VPC: [VPC] ドロップダウンリストから、データソースが存在する VPC を選択します。

      • ゾーンと VSwitch: データソースが存在するゾーンと VSwitch を優先的に選択します。データソースが存在するゾーンが [ゾーン] ドロップダウンリストに表示されない場合は、ランダムなゾーンとランダムな VSwitch を選択します。ただし、選択した VSwitch が属する VPC が、データソースが存在する VPC に接続できることを確認する必要があります。

      • セキュリティグループ: ビジネス要件に基づいてセキュリティグループを選択します。選択したセキュリティグループは、次の要件を満たしている必要があります。

        • セキュリティグループのインバウンドルールで、9092 から 9094 までの範囲の Kafka データソースの HTTP ポートからのアクセスが許可されます。ECS コンソール にログインして、セキュリティグループを表示できます。

        • データ統合専用リソースグループが関連付けられている VSwitch の CIDR ブロックが、インバウンドルールで権限付与オブジェクトとして指定されています。

  2. データ統合専用リソースグループのカスタムルートを追加します。

    説明

    前のサブステップでデータソースが存在するゾーンと VSwitch を選択した場合は、このサブステップをスキップできます。別のゾーンと別の VSwitch を選択した場合は、このサブステップで操作を実行して、データ統合専用リソースグループのカスタムルートを追加する必要があります。

    1. DataWorks コンソールの リソースグループ ページ に移動し、使用するデータ統合専用リソースグループを見つけ、[アクション] 列の [ネットワーク設定] をクリックします。

    2. 表示されるページの [VPC バインディング] タブで、VPC の関連付けレコードを見つけ、[アクション] 列の [カスタムルート] をクリックします。

    3. [カスタムルート] パネルで、[ルートを追加] をクリックします。[ルートを追加] ダイアログボックスで、次のパラメータを構成して、データ統合専用リソースグループのカスタムルートを追加します。

      • 宛先 VPC: データソースが存在するリージョンと VPC を選択します。

      • 宛先 VSwitch: データソースが存在する VSwitch を選択します。

手順 2:データソースの IP アドレスホワイトリストを構成する

  1. 必要な IP アドレスまたは CIDR ブロックを取得します。このサブセクションでは、ApsaraMQ for Kafka インスタンスに基づいて追加された Kafka データソースを使用します。

    必要な IP アドレスまたは CIDR ブロックを取得する方法については、「IP アドレスホワイトリストを構成する」をご参照ください。

  2. 必要な IP アドレスまたは CIDR ブロックをデータソースの IP アドレスホワイトリストに追加します。

    1. EIP または CIDR ブロックを Kafka データソースの IP アドレスホワイトリストに追加します。

      ApsaraMQ for Kafka コンソール にログインします。使用する ApsaraMQ for Kafka インスタンスを見つけ、インスタンスの [ホワイトリスト管理] ページに移動します。このページで、[ホワイトリストの作成] をクリックして、EIP または CIDR ブロックを IP アドレスホワイトリストに追加します。

    2. EIP または CIDR ブロックを Hologres データソースの IP アドレスホワイトリストに追加します。

      HoloWeb コンソール にログインします。使用する Hologres インスタンスを見つけ、[セキュリティセンター] タブに移動します。左側のナビゲーションウィンドウで、[IP アドレスホワイトリスト] をクリックします。[IP アドレスホワイトリスト] ページで、[IP アドレスをホワイトリストに追加] をクリックします。[IP アドレスをホワイトリストに追加] ダイアログボックスで、[IP アドレス] フィールドに追加する EIP または CIDR ブロックを入力し、[OK] をクリックします。holo

同期タスクを作成および構成する

  1. DataWorks コンソールにログインし、データ統合 ページに移動します。[データ統合] ページの上部で、ソースタイプと宛先タイプを選択し、[作成] をクリックします。

  2. [データ同期ソリューションの作成] ページの [基本設定] セクションと [ネットワークとリソースの構成] セクションで、同期タスクの基本情報とリソースグループを構成します。

    1. [新しいノード名]: ビジネス要件に基づいて名前を指定します。

    2. [同期方法]: [単一テーブルのリアルタイム同期] を選択します。

    3. [ネットワークとリソースの構成]: このセクションで、準備した Kafka データソース、Hologres データソース、およびデータ統合専用リソースグループを選択します。次に、[すべてのリソースグループとデータソースの接続をテスト] をクリックして、リソースグループとデータソース間のネットワーク接続をテストします。

  3. Kafka データソースを構成します。

    構成ページの上部にあるウィザードで [kafkasource] をクリックし、Kafka データソースを構成します。配置来源

    1. Kafka データソースの基本情報を構成します。

      • データを同期する Kafka トピックを選択します。

      • 他のパラメータのデフォルト値を保持するか、ビジネス要件に基づいて構成を変更します。

    2. [データサンプリング] をクリックします。

      表示されるダイアログボックスで、[開始位置] パラメータと [サンプリングされたデータレコード] パラメータを構成し、[収集開始] をクリックします。システムは、指定した Kafka トピックからデータをサンプリングします。Kafka トピックのデータをプレビューできます。Kafka トピックのデータは、データ処理ノードのデータプレビューと可視化構成の入力データとして使用されます。

  4. データ処理ノードを構成します。

    添加 アイコンをクリックして、データ処理方法を追加できます。次のデータ処理方法がサポートされています。[データマスキングルール][文字列置換ルール][フィルタ条件][JSON 解析][フィールドの編集と値の割り当て]。ビジネス要件に基づいてデータ処理方法を調整できます。同期タスクが実行されると、指定した処理順序に基づいてデータが処理されます。数据处理 データ処理ノードを構成した後、構成ページの右上隅にある [データ出力のプレビュー] をクリックできます。[データ出力のプレビュー] ダイアログボックスで、[先祖ノードの出力を再取得] をクリックして、データ処理ノードが指定された Kafka トピックからサンプリングされたデータを処理し、処理結果をプレビューできるようにします。

    [データ出力のプレビュー] ダイアログボックスで、入力データを変更するか、[手動でデータを構築] をクリックして入力データをカスタマイズできます。次に、[プレビュー] をクリックして、データ処理ノードによって入力データが処理された後に生成された結果をプレビューできます。データ処理ノードで例外が発生した場合、またはダーティデータが生成された場合、システムはリアルタイムでエラーを報告します。これにより、データ処理ノードの構成を確認し、できるだけ早く期待される結果が得られるかどうかを判断できます。

    説明

    データ処理ノードによって入力データが処理された後に生成された結果をプレビューする前に、Kafka データソースの [データサンプリング] 設定を構成する必要があります。

    输出预览

  5. Hologres データソースを構成します。

    構成ページの上部にあるウィザードで [hologres] をクリックし、Hologres データソースを構成します。去向信息

    1. [基本情報] セクションで、パラメータを構成します。

      • データを書き込む Hologres スキーマを選択します。

      • [宛先テーブル] パラメータを構成します。[テーブルの作成] または [既存のテーブルを使用] を選択できます。

      • テーブル名を入力するか、[テーブル名] ドロップダウンリストからテーブル名を選択します。

    2. [宛先テーブル] を [テーブルの作成] に設定した場合に自動的に作成される宛先 Hologres テーブルのスキーマを編集します。

      [宛先テーブル] パラメータで [テーブルの作成] を選択した場合は、[テーブルスキーマの編集] をクリックします。表示されるダイアログボックスで、自動的に作成される宛先 Hologres テーブルのスキーマを編集します。[先祖ノードの出力カラムに基づいてテーブルスキーマを再生成] をクリックして、先祖ノードの出力カラムに基づいてテーブルスキーマを再生成することもできます。生成されたテーブルスキーマからカラムを選択し、カラムをプライマリキーとして構成できます。

      説明

      宛先 Hologres テーブルにはプライマリキーが必要です。そうでない場合、構成を保存できません。

      编辑建表结构

    3. ソースのフィールドと宛先のフィールド間のマッピングを構成します。

      [宛先テーブル] パラメータを [既存のテーブルを使用] に設定して Hologres データソースの基本情報を構成した後、またはテーブルスキーマ設定を保存した後、システムは [同一名マッピング] 原則に基づいて、ソースのカラムと宛先のカラム間のマッピングを自動的に確立します。ビジネス要件に基づいてマッピングを変更できます。ソースの 1 つのカラムを宛先の複数カラムにマッピングできます。ソースの複数カラムを宛先の同じカラムにマッピングすることはできません。ソースのカラムに宛先にマッピングされたカラムがない場合、ソースのカラムのデータは宛先に同期されません。字段映射

    4. 先祖データ処理ノードによって生成された動的カラムの処理ポリシーを構成します。

      データ処理ノードによって生成された動的カラムの処理ポリシーは、先祖データ処理ノードによって生成された動的カラムの処理方法を制御するために使用されます。[JSON 解析ノード] のみ動的カラムを生成できます。[動的出力フィールド] セクションで [JSON 解析] ノードを構成する場合は、[JSON 解析ノードによって生成された動的カラムの処理ポリシー] を構成する必要があります。

      動的カラムとは、固定カラム名のないカラムのことです。同期タスクは、ソースの入力データに基づいて動的カラムの名前と値を自動的に解析し、カラムから宛先にデータを同期します。次の表に、動的カラムでサポートされている処理ポリシーを示します。

      パラメータ

      説明

      カラムを追加

      宛先 Hologres テーブルに動的カラムと同じ名前のカラムがない場合、システムは宛先 Hologres テーブルにカラムを自動的に追加し、動的カラムのデータを追加されたカラムに書き込みます。

      無視

      宛先 Hologres テーブルに動的カラムと同じ名前のカラムがない場合、システムは動的カラムを無視し、ソーステーブルにマッピングされたカラムを持つ他のカラムのデータを宛先 Hologres テーブルに書き込みます。

      エラーを報告

      宛先 Hologres テーブルに動的カラムと同じ名前のカラムがない場合、エラーが報告され、同期タスクは停止します。

  6. 詳細パラメータを構成します。

    構成ページの右上隅にある [詳細パラメータの構成] をクリックします。[詳細パラメータの構成] ダイアログボックスで、並列処理やメモリリソースなどの項目を構成します。指定された Kafka トピックのデータ量とトピックのパーティション数に基づいて各項目を構成できます。次の手順に基づいて項目を構成することをお勧めします。

    • Kafka からデータを読み取るために使用される並列スレッド数 = Kafka トピックのパーティション数

    • Hologres にデータを書き込むために使用される並列スレッド数 = Kafka トピックのパーティション数

    • メモリサイズ = 1.5 GB + (256 MB × Kafka トピックのパーティション数)

    • 分散実行モードを有効にする条件: Kafka トピックに含まれるパーティションが 12 個を超える

    • サブタスク数 = Kafka トピックのパーティション数を 6 で割った商を切り上げた値

    • 単一サブタスクの Kafka からデータを読み取るために使用される並列スレッド数 = 6

    • 単一サブタスクの Hologres にデータを書き込むために使用される並列スレッド数 = 6

    • 単一サブタスクが占有できるメモリサイズ = 2 GB

    同期タスクのパフォーマンスとリソース消費量は、ソースと宛先のデータ量、ネットワーク環境、DataWorks の負荷などの要因の影響を受けます。ビジネス要件に基づいて、前述の手順を参照して項目の設定を変更できます。

  7. アラートルールを構成します。

    同期タスクで発生する例外をできるだけ早く特定して対応するために、同期タスクに異なるアラートルールを構成できます。

    1. 構成ページの右上隅にある [アラートルールの構成] をクリックして、同期タスクによって生成されるリアルタイム同期サブタスクのアラートルールを構成します。

    2. [アラートルールの構成] をクリックします。

      アラートルールの構成方法の詳細については、「リアルタイム同期ノードのアラートルールを構成するためのベストプラクティス」をご参照ください。

      説明

      [アラートの理由] パラメータを [DDL 通知] に設定した場合、[適用可能な DDL] パラメータには [カラムを追加] のみ選択できます。システムがデータ処理ノードによって動的カラムが生成されたことを検出した場合、エラーがトリガーされます。トリガー条件は、宛先 Hologres テーブルにカラムを追加する操作ではありません。新增报警

    3. アラートルールを管理します。

      アラートルールを有効または無効にすることができます。アラートの重大度レベルに基づいて異なるアラート受信者を指定して、システムがアラート通知を送信できるようにすることもできます。报警规则

  8. リソースグループを構成します。

    構成ページの右上隅にある [リソースグループの構成] をクリックし、同期タスクの実行に使用するデータ統合専用リソースグループを構成します。

  9. 同期タスクのテストを実行します。

    上記の構成が完了したら、構成ページの右上隅にある [シミュレート実行を実行] をクリックして、同期タスクがサンプリングされたデータを宛先 Hologres テーブルに同期できるようにします。宛先 Hologres テーブルで同期結果を表示できます。同期タスクの特定の構成が無効である場合、テスト実行中に例外が発生した場合、またはダーティデータが生成された場合、システムはリアルタイムでエラーを報告します。これにより、同期タスクの構成を確認し、できるだけ早く期待される結果が得られるかどうかを判断できます。模拟运行

    1. 構成ページの右上隅にある [シミュレート実行を実行] をクリックします。表示されるダイアログボックスで、指定された Kafka トピックからのデータサンプリングのパラメータ ([開始位置] パラメータと [サンプリングされたデータレコード] パラメータを含む) を構成します。

    2. [収集開始] をクリックして、同期タスクが指定された Kafka トピックからデータをサンプリングできるようにします。

    3. [プレビュー] をクリックして、同期タスクがサンプリングされたデータを宛先 Hologres テーブルに同期できるようにします。

上記の構成が完了したら、[構成の完了] をクリックします。

同期タスクの O&M を実行する

同期タスクを開始する

同期タスクの構成が完了すると、[データ統合] ページに移動します。作成された同期タスクを見つけて、[アクション] 列の [開始] をクリックして、同期タスクを開始できます。启动同步任务

同期タスクの実行ステータスを表示する

同期タスクの構成が完了すると、[データ統合] ページでタスクを見つけて、[タスク名] をクリックするか、[アクション] 列の [実行の詳細] をクリックして、タスクの実行の詳細を表示できます。実行の詳細ページには、同期タスクに関する次の情報が表示されます。任务详情

  • [基本情報]: データソースやリソースグループなど、同期タスクに関する基本情報を表示できます。

  • [実行ステータス]: 同期タスクには、[スキーマ移行][リアルタイム同期] のステージがあります。各ステージの同期タスクの実行ステータスを表示できます。

  • [詳細]: [スキーマ移行] タブと [リアルタイム同期] タブで、スキーマ移行ステージとリアルタイム同期ステージの同期タスクの詳細を表示できます。

    • [スキーマ移行]: このタブには、宛先テーブルの生成方法などの情報が表示されます。宛先テーブルの生成方法には、[既存のテーブルを使用] と [テーブルの作成] があります。宛先テーブルの生成方法が [テーブルの作成] の場合、テーブルの作成に使用される DDL 文が表示されます。

    • [リアルタイム同期]: このタブには、リアルタイムの読み取りおよび書き込みトラフィック、ダーティデータ情報、フェールオーバー、操作ログなど、リアルタイム同期に関する統計が表示されます。

同期タスクを再実行する

  • 同期タスクを直接再実行する

    [データ統合] ページの [ノード] セクションで同期タスクを見つけ、[詳細] > [再実行][アクション] 列の を選択して、同期タスクの構成を変更せずに再実行します。

  • 同期タスクの構成を変更してから再実行する

    [データ統合] ページの [ノード] セクションで同期タスクを見つけ、同期タスクの構成を変更してから、[完了] をクリックします。同期タスクの [アクション] 列に表示される [更新の適用] をクリックして、最新の構成を反映するために同期タスクを再実行します。