このトピックでは、ApsaraMQ for Kafka インスタンスから ApsaraDB for ClickHouse クラスターにデータをリアルタイムで同期する方法について説明します。
制限事項
ApsaraMQ for Kafka インスタンスまたは Elastic Compute Service (ECS) インスタンスにデプロイされた自己管理 Kafka クラスタからのデータのみを ApsaraDB for ClickHouse インスタンスに同期できます。
前提条件
同期先 ApsaraDB for ClickHouse クラスター:
同期先クラスターが作成されていること。ソース ApsaraMQ for Kafka インスタンスと同期先クラスターは同じリージョンにデプロイされ、同じ VPC を使用していること。詳細については、「ApsaraDB for ClickHouse クラスターを作成する」をご参照ください。
同期先クラスターのデータベースにログインするためのアカウントが作成されており、データベースに対する操作を実行する権限を持っていること。詳細については、「アカウント管理」をご参照ください。
ソース ApsaraMQ for Kafka インスタンス:
トピックが作成されていること。詳細については、「ステップ 1: トピックを作成する」をご参照ください。
コンシューマーグループが作成されていること。詳細については、「ステップ 2: グループを作成する」をご参照ください。
使用上の注意
トピックが ApsaraDB for ClickHouse Kafka 外部テーブルによってサブスクライブされている場合、そのトピックは他のコンシューマーによって消費されることはできません。
Kafka 外部テーブル、マテリアライズドビュー、およびローカルテーブルを作成する場合は、3 つのテーブルのフィールドタイプが一致している必要があります。
手順
この例では、ApsaraMQ for Kafka インスタンスから、Community 互換 Edition を実行する ApsaraDB for ClickHouse クラスターのデフォルトデータベース内の kafka_table_distributed 分散テーブルにデータを同期する方法を示します。
ステップ 1: 同期の原則を理解する
ApsaraDB for ClickHouse は、組み込みの Kafka テーブルエンジンとマテリアライズドビューメカニズムを使用して、ApsaraMQ for Kafka インスタンスからデータを同期します。これにより、リアルタイムのデータ消費とストレージを実現します。次の図は、同期プロセスを示しています。
Kafka トピック: 同期するソースデータを指定します。
ApsaraDB for ClickHouse Kafka 外部テーブル: 指定された Kafka トピックからソースデータをプルします。
マテリアライズドビュー: Kafka 外部テーブルからソースデータを読み取り、ApsaraDB for ClickHouse ローカルテーブルにデータを挿入します。
ローカルテーブル: 同期されたデータを格納します。
ステップ 2: ApsaraDB for ClickHouse データベースにログインする
ApsaraDB for ClickHouse データベースへのログイン方法の詳細については、「DMS を使用して ApsaraDB for ClickHouse クラスターに接続する」をご参照ください。
ステップ 3: Kafka 外部テーブルを作成する
ApsaraDB for ClickHouse は、組み込みの Kafka テーブルエンジンを使用して、指定された Kafka トピックからソースデータをプルします。Kafka 外部テーブルには、次の特徴があります。
デフォルトでは、Kafka 外部テーブルを直接クエリすることはできません。
Kafka 外部テーブルは Kafka データの消費にのみ使用され、データを格納しません。マテリアライズドビューを使用してデータを処理し、同期先テーブルにデータを挿入して格納する必要があります。
テーブル作成のサンプル文:
Kafka 外部テーブルのフィールド形式は、Kafka データ型と一致している必要があります。
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port1,host:port2,host:port3',
kafka_topic_list = 'topic_name1,topic_name2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_num_consumers = N,]
[kafka_thread_per_consumer = 1,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_auto_offset_reset = N]次の表は、上記の文のパラメーターについて説明しています。
パラメーター | 必須 | 説明 |
kafka_broker_list | はい | ApsaraMQ for Kafka インスタンスへの接続に使用するエンドポイント。エンドポイントはカンマ (,) で区切ります。エンドポイントの表示方法の詳細については、「エンドポイントを表示する」をご参照ください。
|
kafka_topic_list | はい | ApsaraMQ for Kafka インスタンス内のトピックの名前。トピックの名前はカンマ (,) で区切ります。トピックの名前の表示方法の詳細については、「ステップ 1: トピックを作成する」をご参照ください。 |
kafka_group_name | はい | ApsaraMQ for Kafka インスタンス内のコンシューマーグループの名前。詳細については、「ステップ 2: グループを作成する」をご参照ください。 |
kafka_format | はい | ApsaraDB for ClickHouse でサポートされているメッセージ本文の形式。 説明 ApsaraDB for ClickHouse でサポートされているメッセージ本文の形式の詳細については、入力データと出力データの形式 をご参照ください。 |
kafka_row_delimiter | いいえ | 行を区切るために使用する行区切り文字。デフォルト値は \n です。データが書き込まれる実際の区切り形式に基づいて、このパラメーターを設定することもできます。 |
kafka_num_consumers | いいえ | テーブル内のデータを消費するコンシューマーの数。デフォルト値: 1。 説明
|
kafka_thread_per_consumer | いいえ | 各コンシューマーが消費のために独立したスレッドを開始するかどうかを指定します。デフォルト値: 0。有効な値:
消費速度を向上させる方法の詳細については、Kafka パフォーマンスの最適化 をご参照ください。 |
kafka_max_block_size | いいえ | 各バッチでテーブルに書き込むことができる Kafka メッセージの最大サイズ。デフォルト値: 65536。単位: バイト。 |
kafka_skip_broken_messages | いいえ | Kafka メッセージパーサーのダーティデータに対する許容範囲。デフォルト値: 0。 |
kafka_commit_every_batch | いいえ | コミット操作を実行する頻度を指定します。デフォルト値: 0。有効な値:
|
kafka_auto_offset_reset | いいえ | データソースとして使用される ApsaraMQ for Kafka インスタンス内のデータの消費を開始するオフセット。有効な値:
説明 このパラメーターは、バージョン 21.8 の ApsaraDB for ClickHouse クラスターではサポートされていません。 |
パラメーターの詳細については、Kafka をご参照ください。
サンプル文:
CREATE TABLE default.kafka_src_table ON CLUSTER `default`
(-- テーブルスキーマを定義するフィールド
id Int32,
name String
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-****1-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-****-3-vpc.alikafka.aliyuncs.com:9092',
kafka_topic_list = 'testforCK',
kafka_group_name = 'GroupForTestCK',
kafka_format = 'CSV';ステップ 4: 格納用の同期先テーブルを作成する
テーブル作成文は、クラスターのエディションによって異なります。
Enterprise Edition クラスターの場合は、ローカルテーブルのみを作成する必要があります。Community 互換 Edition クラスターの場合は、ビジネス要件に基づいて分散テーブルを作成できます。テーブル作成文の詳細については、「CREATE TABLE」をご参照ください。サンプル文:
Enterprise Edition
CREATE TABLE default.kafka_table_local ON CLUSTER default (
id Int32,
name String
) ENGINE = MergeTree()
ORDER BY (id);上記の文を実行したときに ON CLUSTER is not allowed for Replicated database というエラーメッセージが表示された場合は、クラスターのマイナーエンジンバージョンを更新することでこの問題を解決できます。詳細については、「マイナーエンジンバージョンを更新する」をご参照ください。
Community 互換 Edition
Single レプリカ Edition クラスターと Double レプリカ Edition クラスターで使用されるテーブルエンジンは異なります。使用しているレプリカタイプに基づいてテーブルエンジンを選択してください。
Double レプリカ Edition クラスターでテーブルを作成する場合は、テーブルで MergeTree ファミリーの Replicated テーブルエンジンを使用していることを確認してください。テーブルで非 Replicated テーブルエンジンを使用すると、テーブル上のデータはレプリカ間で複製されません。これにより、データの不整合が発生する可能性があります。
Single レプリカ Edition
ローカルテーブルを作成します。
CREATE TABLE default.kafka_table_local ON CLUSTER default ( id Int32, name String ) ENGINE = MergeTree() ORDER BY (id);オプション。分散テーブルを作成します。
ローカルテーブルにのみデータをインポートする場合は、この手順をスキップします。
マルチノードクラスターを使用する場合は、分散テーブルを作成することをお勧めします。
CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local ENGINE = Distributed(default, default, kafka_table_local, id);
Double レプリカ Edition
ローカルテーブルを作成します。
CREATE TABLE default.kafka_table_local ON CLUSTER default ( id Int32, name String ) ENGINE = ReplicatedMergeTree() ORDER BY (id);オプション。分散テーブルを作成します。
ローカルテーブルにのみデータをインポートする場合は、この手順をスキップします。
マルチノードクラスターを使用する場合は、分散テーブルを作成することをお勧めします。
CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local ENGINE = Distributed(default, default, kafka_table_local, id);
ステップ 5:マテリアライズドビューを作成する
同期中、ApsaraDB for ClickHouse は、マテリアライズドビューを使用して Kafka 外部テーブルからソースデータを読み取り、ApsaraDB for ClickHouse ローカルテーブルにデータを挿入します。
マテリアライズドビューを作成するためのステートメントの例:
SELECT 句のフィールドがデスティネーションテーブルスキーマと一致することを確認してください。または、変換関数を使用してデータ形式を変換し、スキーマの一貫性を確保することもできます。
CREATE MATERIALIZED VIEW <view_name> ON CLUSTER default TO <dest_table> AS SELECT * FROM <src_table>;次の表にパラメーターを示します。
パラメーター名 | 必須 | 説明 | 例 |
view_name | はい | ビューの名前。 | consumer |
dest_table | はい | Kafka データを格納するために使用されるデスティネーションテーブル。
|
|
src_table | はい | Kafka 外部テーブル。 | kafka_src_table |
ステートメントの例:
Enterprise Edition
CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_local AS SELECT * FROM kafka_src_table;Community 互換 Edition
次のステートメント例を実行して、ソースデータを kafka_table_distributed 分散テーブルに格納できます。
CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_distributed AS SELECT * FROM kafka_src_table;ステップ 6: データが同期されているかを確認する
ApsaraMQ for Kafka インスタンスの Topic にメッセージを送信します。
ApsaraMQ for Kafka コンソール にログインします。
[インスタンス] ページで、管理するインスタンスの名前をクリックします。
[トピック] ページで、必要なトピックを見つけ、[アクション] 列で を選択します。
[メッセージの作成と消費への送信] パネルで、送信するメッセージの内容を指定します。
この例では、
1,aと2,bというメッセージが送信されます。[OK] をクリックします。
ApsaraDB for ClickHouse データベースにログインし、分散テーブルをクエリして、データが同期されているかどうかを確認します。
ApsaraDB for ClickHouse データベースへのログイン方法の詳細については、「DMS を使用して ClickHouse に接続する」をご参照ください。
データ検証用のサンプル文:
Enterprise Edition
SELECT * FROM kafka_table_local;Community 互換 Edition
次の文は、分散テーブルをクエリする方法を示しています。
デスティネーションテーブルがローカルテーブルの場合、文中の分散テーブル名をローカルテーブル名に置き換える必要があります。
複数のノードを持つ Community 互換 Edition クラスタを使用する場合は、分散テーブルからデータをクエリすることをお勧めします。そうしないと、クラスタ内の 1 つのノードからのみデータがクエリされ、結果が不完全になります。
SELECT * FROM kafka_table_distributed;クエリを実行して成功の結果を受け取った場合、ApsaraMQ for Kafka インスタンスから ApsaraDB for ClickHouse クラスタにデータが同期されていることを示します。
サンプル結果:
┌─id─┬─name─┐ │ 1 │ a │ │ 2 │ b │ └────┴──────┘クエリ結果が期待値と異なる場合は、「(オプション) ステップ 7: Kafka 外部テーブルの消費ステータスを表示する」に記載されている手順に従って、問題のトラブルシューティングを行うことができます。
(オプション) ステップ 7: Kafka 外部テーブルの消費ステータスを表示する
同期されたデータが ApsaraMQ for Kafka インスタンスのデータと一致しない場合は、システムテーブルを使用して Kafka 外部テーブルの消費ステータスを確認し、メッセージ消費の問題をトラブルシューティングできます。
Community 互換 Edition および Enterprise Edition V23.8 以降
system.kafka_consumers システムテーブルをクエリすることで、Kafka 外部テーブルの消費ステータスを表示します。サンプル文:
select * from system.kafka_consumers;次の表は、system.kafka_consumers テーブルのフィールドについて説明しています。
フィールド名 | 説明 |
database | Kafka 外部テーブルが配置されているデータベース。 |
table | Kafka 外部テーブルの名前。 |
consumer_id | Kafka コンシューマー ID。 1 つのテーブルを複数のコンシューマーが消費できます。コンシューマーは、Kafka 外部テーブルの作成時に kafka_num_consumers パラメーターで指定されます。 |
assignments.topic | Kafka トピック。 |
assignments.partition_id | Kafka パーティション ID。 パーティションは 1 つのコンシューマーにのみ割り当てることができます。 |
assignments.current_offset | 現在のオフセット。 |
exceptions.time | 最新の 10 件の例外のタイムスタンプ。 |
exceptions.text | 最新の 10 件の例外のテキスト。 |
last_poll_time | 最新のポーリングのタイムスタンプ。 |
num_messages_read | コンシューマーが読み取ったメッセージの数。 |
last_commit_time | 最新のコミットのタイムスタンプ。 |
num_commits | コンシューマーのコミットの総数。 |
last_rebalance_time | 最新の Kafka リバランスのタイムスタンプ。 |
num_rebalance_revocations | コンシューマーのパーティションが取り消された回数。 |
num_rebalance_assignments | コンシューマーが Kafka クラスタに割り当てられた回数。 |
is_currently_used | コンシューマーが使用中かどうかを示します。 |
last_used | コンシューマーが最後に使用された時刻。単位: マイクロ秒。 |
rdkafka_stat | データベースの内部統計情報。詳細については、librdkafka をご参照ください。 デフォルト値は 3000 で、3 秒ごとに統計情報が生成されることを示します。 説明 ApsaraDB for ClickHouse が statistics_interval_ms=0 に設定されている場合、Kafka 外部テーブルの統計情報収集を無効にすることができます。 |
Community 互換 Edition V23.8 より前
system.kafka システムテーブルをクエリすることで、Kafka 外部テーブルの消費ステータスを表示します。サンプル文:
SELECT * FROM system.kafka;次の表は、system.kafka テーブルのフィールドについて説明しています。
フィールド名 | 説明 |
database | Kafka 外部テーブルが配置されているデータベースの名前。 |
table | Kafka 外部テーブルの名前。 |
topic | Kafka 外部テーブルが消費するトピックの名前。 |
consumer_group | Kafka 外部テーブルが消費するグループの名前。 |
last_read_message_count | Kafka 外部テーブルがプルしたメッセージの数。 |
status | Kafka 外部テーブルの消費ステータス。有効な値:
|
exception | エラーの詳細。 説明 status が error に設定されている場合、エラーの詳細が返されます。 |