すべてのプロダクト
Search
ドキュメントセンター

:Lindorm CDCデータソースのデータへのアクセス

最終更新日:Jan 14, 2025

Lindormは、Lindorm Distributed Processing System(LDPS)という計算エンジンサービスを提供しています。 Lindormインスタンスに対してLDPSがアクティブ化されると、Lindorm Change Data Capture(CDC)データソースがLindormインスタンスに割り当てられます。 Lindormインスタンスに対してアクティブ化されている他のエンジンサービスに格納されているデータの変更は、CDCデータソースに同期されます。 Spark SQLを使用して、CDCデータソースからこれらのデータ変更をクエリできます。

前提条件

  • Lindorm Tunnel Service(LTS)がLindormインスタンスに対してアクティブ化されています。 詳細については、LTSサービスを購入してLTS Web UIにログオンするをご参照ください。
  • LindormTableのサブスクリプションチャネルが作成されています。 詳細については、データサブスクリプション用のプルチャネルを作成するをご参照ください。
    説明 サブスクリプションチャネルを作成する際は、以下の点にご注意ください。
    • [メッセージの列名にファミリープレフィックスを無視する] を選択しないでください。
    • [json][シリアル化タイプ] パラメーターに選択します。
    • 1つのトピック名は、1つのLindormテーブル名にのみ対応します。
  • HBaseテーブルのLINDORM_HBASE_CATALOG属性を設定します。 詳細については、LindormTableのデータにアクセスするをご参照ください。
    説明 LINDORM_HBASE_CATALOG属性は、Spark SQLスキーマとHBaseテーブルのスキーマ間のマッピングを指定します。 Lindorm CDCデータソースは、この属性の値に基づいてHBaseテーブルのスキーマを抽出します。

制限事項

  • HBaseテーブルのみがサポートされています。 HBaseテーブルとは、HBaseクライアントを使用してLindormTableにデータが書き込まれるテーブルです。
  • リアルタイム変更追跡機能では、[JSON] 形式のファイルのみを使用できます。

ジョブの送信方法

Lindorm CDCデータソースのSparkジョブを記述して送信するには、次のいずれかの方法を使用できます。
説明 Lindorm CDCデータソースとのデータの読み書きに使用される構文については、Lindorm CDCデータソースを設定するをご参照ください。

Lindorm CDCデータソースを設定する

Lindorm CDCデータソースのテーブルスキーマとデータベーススキーマ

  • LDPSによって提供されるLindorm CDCデータソースの名前はlindorm_cdcです。
  • Lindorm CDCデータソースのネームスペースは管理できません。 Lindorm CDCデータソースのテーブルのみを管理できます。 Lindorm CDCデータソースのテーブルは、データサブスクリプションチャネルの作成時に指定した [トピック] と同じ名前を使用します。

Lindorm CDCデータソースのスキーマ

Lindorm CDCデータソースは、LINDORM_HBASE_CATALOG属性に基づいてHBaseテーブルのスキーマを抽出し、抽出されたスキーマをLindorm CDCデータソースのスキーマとして使用します。 Lindorm CDCデータソースはKafkaからデータを読み取ります。 各操作レコードが保存されます。 次の表に、Lindorm CDCデータソースのスキーマでサポートされているメタフィールドを示します。
フィールドカテゴリ説明構成
_cdc_timestamp_kafkalong操作レコードがKafkaに書き込まれたときのタイムスタンプ。 単位:ミリ秒。構成は不要です。 スキーマに含まれているデフォルトの構成値が使用されます。
_cdc_operation_typestring操作レコードの変更タイプ。
  • C:データを追加します。
  • U:データを更新します。
  • D:データを削除します。
構成は不要です。 スキーマに含まれているデフォルトの構成値が使用されます。
_cdc_timestamp_lindormlongLDPS以外のLindormエンジンサービスによって操作レコードが処理されたときのタイムスタンプ。 単位:ミリ秒。spark.sql.catalog.lindorm_cdc.lindormTsEnabled
_cdc_timestamp_ltslongLTSによって操作レコードが処理されたときのタイムスタンプ。 単位:ミリ秒。spark.sql.catalog.lindorm_cdc.ltsTsEnabled

Lindorm CDCデータソースの構成項目

次の表に、Lindorm CDCデータソースの構成項目を示します。
構成項目必須説明
spark.sql.catalog.lindorm_cdc.username
  • JARジョブまたはPythonジョブを送信する場合は、このパラメーターが必要です。
  • SQLジョブを送信する場合は、このパラメーターはオプションです。 この場合、システムはこのパラメーターに自動的に値を割り当てます。
LindormTableへの接続に使用するユーザー名。root(デフォルトのユーザー名)
spark.sql.catalog.lindorm_cdc.password
  • JARジョブまたはPythonジョブを送信する場合は、このパラメーターが必要です。
  • SQLジョブを送信する場合は、このパラメーターはオプションです。 この場合、システムはこのパラメーターに自動的に値を割り当てます。
LindormTableへの接続に使用するパスワード。root(デフォルトのパスワード)
spark.sql.catalog.lindorm_cdc.lindormTsEnabledいいえLindormが操作レコードを処理したときのタイムスタンプをスキーマに含めるかどうかを指定します。 デフォルト値はfalseです。 このパラメーターをtrueに設定すると、_cdc_timestamp_lindormフィールドがLindorm CDCデータソースのスキーマに追加されます。true
spark.sql.catalog.lindorm_cdc.ltsTsEnabledいいえLTSが操作レコードを処理したときのタイムスタンプをスキーマに含めるかどうかを指定します。 デフォルト値はfalseです。 このパラメーターをtrueに設定すると、_cdc_timestamp_ltsフィールドがLindorm CDCデータソースのスキーマに追加されます。true

Lindorm CDCデータソースでサポートされているステートメント

次の表に、Lindorm CDCデータソースで実行できるステートメントを示します。
ステートメント説明
USE table_name指定されたテーブルを使用します。USE test
SHOW TABLESすべてのテーブルを表示します。SHOW TABLES
DESCRIBE table_name指定されたテーブルの詳細を表示します。DESC test または DESCRIBE test
SELECTSELECTステートメントの詳細については、Spark SQL をご参照ください。
説明 SELECTステートメントを実行する場合は、次の項目に注意してください。
  • _cdc_timestamp_kafka > $startTimestamp and _cdc_timestamp_kafka < $endTimestamp を使用して、読み取るデータの範囲を指定する必要があります。
  • _cdc_operation_type フィールドの値がDに設定されている場合、row key として指定されているフィールドの値のみが表示されます。 他のフィールドには空の文字列が表示されます。