すべてのプロダクト
Search
ドキュメントセンター

ApsaraDB for ClickHouse:ApsaraMQ for Kafka インスタンスからのデータの同期

最終更新日:Jun 04, 2025

このトピックでは、ApsaraMQ for Kafka インスタンスから ApsaraDB for ClickHouse クラスターにデータをリアルタイムで同期する方法について説明します。

制限事項

ApsaraMQ for Kafka インスタンスまたは Elastic Compute Service (ECS) インスタンスにデプロイされた自己管理 Kafka クラスタからのデータのみを ApsaraDB for ClickHouse インスタンスに同期できます。

前提条件

  • 同期先 ApsaraDB for ClickHouse クラスター:

    • 同期先クラスターが作成されていること。ソース ApsaraMQ for Kafka インスタンスと同期先クラスターは同じリージョンにデプロイされ、同じ VPC を使用していること。詳細については、「ApsaraDB for ClickHouse クラスターを作成する」をご参照ください。

    • 同期先クラスターのデータベースにログインするためのアカウントが作成されており、データベースに対する操作を実行する権限を持っていること。詳細については、「アカウント管理」をご参照ください。

  • ソース ApsaraMQ for Kafka インスタンス:

使用上の注意

  • トピックが ApsaraDB for ClickHouse Kafka 外部テーブルによってサブスクライブされている場合、そのトピックは他のコンシューマーによって消費されることはできません。

  • Kafka 外部テーブル、マテリアライズドビュー、およびローカルテーブルを作成する場合は、3 つのテーブルのフィールドタイプが一致している必要があります。

手順

この例では、ApsaraMQ for Kafka インスタンスから、Community 互換 Edition を実行する ApsaraDB for ClickHouse クラスターのデフォルトデータベース内の kafka_table_distributed 分散テーブルにデータを同期する方法を示します。

ステップ 1: 同期の原則を理解する

ApsaraDB for ClickHouse は、組み込みの Kafka テーブルエンジンとマテリアライズドビューメカニズムを使用して、ApsaraMQ for Kafka インスタンスからデータを同期します。これにより、リアルタイムのデータ消費とストレージを実現します。次の図は、同期プロセスを示しています。

  • Kafka トピック: 同期するソースデータを指定します。

  • ApsaraDB for ClickHouse Kafka 外部テーブル: 指定された Kafka トピックからソースデータをプルします。

  • マテリアライズドビュー: Kafka 外部テーブルからソースデータを読み取り、ApsaraDB for ClickHouse ローカルテーブルにデータを挿入します。

  • ローカルテーブル: 同期されたデータを格納します。

ステップ 2: ApsaraDB for ClickHouse データベースにログインする

ApsaraDB for ClickHouse データベースへのログイン方法の詳細については、「DMS を使用して ApsaraDB for ClickHouse クラスターに接続する」をご参照ください。

ステップ 3: Kafka 外部テーブルを作成する

ApsaraDB for ClickHouse は、組み込みの Kafka テーブルエンジンを使用して、指定された Kafka トピックからソースデータをプルします。Kafka 外部テーブルには、次の特徴があります。

  • デフォルトでは、Kafka 外部テーブルを直接クエリすることはできません。

  • Kafka 外部テーブルは Kafka データの消費にのみ使用され、データを格納しません。マテリアライズドビューを使用してデータを処理し、同期先テーブルにデータを挿入して格納する必要があります。

テーブル作成のサンプル文:

重要

Kafka 外部テーブルのフィールド形式は、Kafka データ型と一致している必要があります。

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port1,host:port2,host:port3',
    kafka_topic_list = 'topic_name1,topic_name2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_num_consumers = N,]
    [kafka_thread_per_consumer = 1,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_auto_offset_reset = N]

次の表は、上記の文のパラメーターについて説明しています。

パラメーター

必須

説明

kafka_broker_list

はい

ApsaraMQ for Kafka インスタンスへの接続に使用するエンドポイント。エンドポイントはカンマ (,) で区切ります。エンドポイントの表示方法の詳細については、「エンドポイントを表示する」をご参照ください。

  • ApsaraMQ for Kafka インスタンスを使用する場合、ApsaraDB for ClickHouse はデフォルトでインスタンスのドメイン名を解析します。

  • 自己管理 Kafka クラスタを使用する場合、ApsaraDB for ClickHouse は固定形式の IP アドレスまたはカスタムドメイン名を使用してクラスタに接続します。サポートされているドメイン名のルールは次のとおりです。

    1. .com で終わるドメイン名。

    2. .local で終わり、kafka、mysql、rabbitmq のいずれかのキーワードを含むドメイン名。

kafka_topic_list

はい

ApsaraMQ for Kafka インスタンス内のトピックの名前。トピックの名前はカンマ (,) で区切ります。トピックの名前の表示方法の詳細については、「ステップ 1: トピックを作成する」をご参照ください。

kafka_group_name

はい

ApsaraMQ for Kafka インスタンス内のコンシューマーグループの名前。詳細については、「ステップ 2: グループを作成する」をご参照ください。

kafka_format

はい

ApsaraDB for ClickHouse でサポートされているメッセージ本文の形式。

説明

ApsaraDB for ClickHouse でサポートされているメッセージ本文の形式の詳細については、入力データと出力データの形式 をご参照ください。

kafka_row_delimiter

いいえ

行を区切るために使用する行区切り文字。デフォルト値は \n です。データが書き込まれる実際の区切り形式に基づいて、このパラメーターを設定することもできます。

kafka_num_consumers

いいえ

テーブル内のデータを消費するコンシューマーの数。デフォルト値: 1。

説明
  • 1 つのコンシューマーのスループットが不十分な場合は、より多くのコンシューマーを指定します。

  • パーティションごとに 1 つのコンシューマーしか割り当てることができないため、コンシューマーの数はトピック内のパーティションの数を超えることはできません。

kafka_thread_per_consumer

いいえ

各コンシューマーが消費のために独立したスレッドを開始するかどうかを指定します。デフォルト値: 0。有効な値:

  • 0: すべてのコンシューマーが消費に 1 つのスレッドを使用します。

  • 1: 各コンシューマーが消費のために独立したスレッドを開始します。

消費速度を向上させる方法の詳細については、Kafka パフォーマンスの最適化 をご参照ください。

kafka_max_block_size

いいえ

各バッチでテーブルに書き込むことができる Kafka メッセージの最大サイズ。デフォルト値: 65536。単位: バイト。

kafka_skip_broken_messages

いいえ

Kafka メッセージパーサーのダーティデータに対する許容範囲。デフォルト値: 0。kafka_skip_broken_messages=N を N に設定すると、Kafka エンジンは解析できない N 個の Kafka メッセージをスキップします。1 つのメッセージは 1 行のデータに相当します。

kafka_commit_every_batch

いいえ

コミット操作を実行する頻度を指定します。デフォルト値: 0。有効な値:

  • 0: ブロック全体が書き込まれた後にのみ、コミット操作が実行されます。

  • 1: データのバッチが書き込まれた後に、コミット操作が実行されます。

kafka_auto_offset_reset

いいえ

データソースとして使用される ApsaraMQ for Kafka インスタンス内のデータの消費を開始するオフセット。有効な値:

  • earliest (デフォルト値): ApsaraMQ for Kafka インスタンス内のデータは最も古いオフセットから消費されます。

  • latest: ApsaraMQ for Kafka インスタンス内のデータは最新のオフセットから消費されます。

説明

このパラメーターは、バージョン 21.8 の ApsaraDB for ClickHouse クラスターではサポートされていません。

パラメーターの詳細については、Kafka をご参照ください。

サンプル文:

CREATE TABLE default.kafka_src_table ON CLUSTER `default`
(-- テーブルスキーマを定義するフィールド
    id Int32,
    name String               
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-****1-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-****-3-vpc.alikafka.aliyuncs.com:9092',
    kafka_topic_list = 'testforCK',
    kafka_group_name = 'GroupForTestCK',
    kafka_format = 'CSV';

ステップ 4: 格納用の同期先テーブルを作成する

テーブル作成文は、クラスターのエディションによって異なります。

Enterprise Edition クラスターの場合は、ローカルテーブルのみを作成する必要があります。Community 互換 Edition クラスターの場合は、ビジネス要件に基づいて分散テーブルを作成できます。テーブル作成文の詳細については、「CREATE TABLE」をご参照ください。サンプル文:

Enterprise Edition

CREATE TABLE default.kafka_table_local ON CLUSTER default (
  id Int32,
  name String
) ENGINE = MergeTree()
ORDER BY (id);

上記の文を実行したときに ON CLUSTER is not allowed for Replicated database というエラーメッセージが表示された場合は、クラスターのマイナーエンジンバージョンを更新することでこの問題を解決できます。詳細については、「マイナーエンジンバージョンを更新する」をご参照ください。

Community 互換 Edition

Single レプリカ Edition クラスターと Double レプリカ Edition クラスターで使用されるテーブルエンジンは異なります。使用しているレプリカタイプに基づいてテーブルエンジンを選択してください。

重要

Double レプリカ Edition クラスターでテーブルを作成する場合は、テーブルで MergeTree ファミリーの Replicated テーブルエンジンを使用していることを確認してください。テーブルで非 Replicated テーブルエンジンを使用すると、テーブル上のデータはレプリカ間で複製されません。これにより、データの不整合が発生する可能性があります。

Single レプリカ Edition

  1. ローカルテーブルを作成します。

    CREATE TABLE default.kafka_table_local ON CLUSTER default (
      id Int32,
      name String
    ) ENGINE = MergeTree()
    ORDER BY (id);
  2. オプション。分散テーブルを作成します。

    ローカルテーブルにのみデータをインポートする場合は、この手順をスキップします。

    マルチノードクラスターを使用する場合は、分散テーブルを作成することをお勧めします。

    CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local
    ENGINE = Distributed(default, default, kafka_table_local, id);

Double レプリカ Edition

  1. ローカルテーブルを作成します。

    CREATE TABLE default.kafka_table_local ON CLUSTER default (
      id Int32,
      name String
    ) ENGINE = ReplicatedMergeTree()
    ORDER BY (id);
  2. オプション。分散テーブルを作成します。

    ローカルテーブルにのみデータをインポートする場合は、この手順をスキップします。

    マルチノードクラスターを使用する場合は、分散テーブルを作成することをお勧めします。

    CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local
    ENGINE = Distributed(default, default, kafka_table_local, id);

ステップ 5:マテリアライズドビューを作成する

同期中、ApsaraDB for ClickHouse は、マテリアライズドビューを使用して Kafka 外部テーブルからソースデータを読み取り、ApsaraDB for ClickHouse ローカルテーブルにデータを挿入します。

マテリアライズドビューを作成するためのステートメントの例:

重要

SELECT 句のフィールドがデスティネーションテーブルスキーマと一致することを確認してください。または、変換関数を使用してデータ形式を変換し、スキーマの一貫性を確保することもできます。

CREATE MATERIALIZED VIEW <view_name> ON CLUSTER default TO <dest_table> AS SELECT * FROM <src_table>;

次の表にパラメーターを示します。

パラメーター名

必須

説明

view_name

はい

ビューの名前。

consumer

dest_table

はい

Kafka データを格納するために使用されるデスティネーションテーブル。

  • Community 互換 Edition クラスター:

    • マルチノードクラスターを使用する場合は、分散テーブルにデータをインポートすることをお勧めします。

    • デスティネーションテーブルがローカルテーブルの場合、データはローカルテーブルに格納されます。

  • Enterprise Edition クラスター:Enterprise Edition クラスターは、分散テーブルをサポートしていません。データはローカルテーブルに格納されます。

  • Community 互換 Edition:kafka_table_distributed

  • Enterprise Edition:kafka_table_local

src_table

はい

Kafka 外部テーブル。

kafka_src_table

ステートメントの例:

Enterprise Edition

CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_local AS SELECT * FROM kafka_src_table;

Community 互換 Edition

次のステートメント例を実行して、ソースデータを kafka_table_distributed 分散テーブルに格納できます。

CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_distributed AS SELECT * FROM kafka_src_table;

ステップ 6: データが同期されているかを確認する

  1. ApsaraMQ for Kafka インスタンスの Topic にメッセージを送信します。

    1. ApsaraMQ for Kafka コンソール にログインします。

    2. [インスタンス] ページで、管理するインスタンスの名前をクリックします。

    3. [トピック] ページで、必要なトピックを見つけ、[アクション] 列で [詳細] > [メッセージの送信] を選択します。

    4. [メッセージの作成と消費への送信] パネルで、送信するメッセージの内容を指定します。

      この例では、1,a2,b というメッセージが送信されます。

    5. [OK] をクリックします。

  2. ApsaraDB for ClickHouse データベースにログインし、分散テーブルをクエリして、データが同期されているかどうかを確認します。

    ApsaraDB for ClickHouse データベースへのログイン方法の詳細については、「DMS を使用して ClickHouse に接続する」をご参照ください。

    データ検証用のサンプル文:

    Enterprise Edition

    SELECT * FROM kafka_table_local; 

    Community 互換 Edition

    次の文は、分散テーブルをクエリする方法を示しています。

    • デスティネーションテーブルがローカルテーブルの場合、文中の分散テーブル名をローカルテーブル名に置き換える必要があります。

    • 複数のノードを持つ Community 互換 Edition クラスタを使用する場合は、分散テーブルからデータをクエリすることをお勧めします。そうしないと、クラスタ内の 1 つのノードからのみデータがクエリされ、結果が不完全になります。

    SELECT * FROM kafka_table_distributed; 

    クエリを実行して成功の結果を受け取った場合、ApsaraMQ for Kafka インスタンスから ApsaraDB for ClickHouse クラスタにデータが同期されていることを示します。

    サンプル結果:

    ┌─id─┬─name─┐
    │  1 │  a   │
    │  2 │  b   │
    └────┴──────┘

    クエリ結果が期待値と異なる場合は、「(オプション) ステップ 7: Kafka 外部テーブルの消費ステータスを表示する」に記載されている手順に従って、問題のトラブルシューティングを行うことができます。

(オプション) ステップ 7: Kafka 外部テーブルの消費ステータスを表示する

同期されたデータが ApsaraMQ for Kafka インスタンスのデータと一致しない場合は、システムテーブルを使用して Kafka 外部テーブルの消費ステータスを確認し、メッセージ消費の問題をトラブルシューティングできます。

Community 互換 Edition および Enterprise Edition V23.8 以降

system.kafka_consumers システムテーブルをクエリすることで、Kafka 外部テーブルの消費ステータスを表示します。サンプル文:

select * from system.kafka_consumers;

次の表は、system.kafka_consumers テーブルのフィールドについて説明しています。

フィールド名

説明

database

Kafka 外部テーブルが配置されているデータベース。

table

Kafka 外部テーブルの名前。

consumer_id

Kafka コンシューマー ID。

1 つのテーブルを複数のコンシューマーが消費できます。コンシューマーは、Kafka 外部テーブルの作成時に kafka_num_consumers パラメーターで指定されます。

assignments.topic

Kafka トピック。

assignments.partition_id

Kafka パーティション ID。

パーティションは 1 つのコンシューマーにのみ割り当てることができます。

assignments.current_offset

現在のオフセット。

exceptions.time

最新の 10 件の例外のタイムスタンプ。

exceptions.text

最新の 10 件の例外のテキスト。

last_poll_time

最新のポーリングのタイムスタンプ。

num_messages_read

コンシューマーが読み取ったメッセージの数。

last_commit_time

最新のコミットのタイムスタンプ。

num_commits

コンシューマーのコミットの総数。

last_rebalance_time

最新の Kafka リバランスのタイムスタンプ。

num_rebalance_revocations

コンシューマーのパーティションが取り消された回数。

num_rebalance_assignments

コンシューマーが Kafka クラスタに割り当てられた回数。

is_currently_used

コンシューマーが使用中かどうかを示します。

last_used

コンシューマーが最後に使用された時刻。単位: マイクロ秒。

rdkafka_stat

データベースの内部統計情報。詳細については、librdkafka をご参照ください。

デフォルト値は 3000 で、3 秒ごとに統計情報が生成されることを示します。

説明

ApsaraDB for ClickHouse が statistics_interval_ms=0 に設定されている場合、Kafka 外部テーブルの統計情報収集を無効にすることができます。

Community 互換 Edition V23.8 より前

system.kafka システムテーブルをクエリすることで、Kafka 外部テーブルの消費ステータスを表示します。サンプル文:

SELECT * FROM system.kafka;

次の表は、system.kafka テーブルのフィールドについて説明しています。

フィールド名

説明

database

Kafka 外部テーブルが配置されているデータベースの名前。

table

Kafka 外部テーブルの名前。

topic

Kafka 外部テーブルが消費するトピックの名前。

consumer_group

Kafka 外部テーブルが消費するグループの名前。

last_read_message_count

Kafka 外部テーブルがプルしたメッセージの数。

status

Kafka 外部テーブルの消費ステータス。有効な値:

  • ビューなし: Kafka 外部テーブルのビューは作成されていません。

  • ビューをアタッチ: Kafka 外部テーブルのビューが作成されています。

  • 正常: 正常な状態。

    Kafka 外部テーブルがデータを消費すると、Kafka 外部テーブルのステータスは 正常 になります。

  • 解析をスキップ: 解析エラーはスキップされます。

  • エラー: 消費の例外が発生しています。

exception

エラーの詳細。

説明

statuserror に設定されている場合、エラーの詳細が返されます。

よくある質問