すべてのプロダクト
Search
ドキュメントセンター

Dataphin:SAP HANA

最終更新日:Feb 06, 2025

SAP HANA Connector について、その構文構造、WITH パラメーター、および Dataphin 内での使用例を含めて、SAP HANA Connectorを詳しく見ていきましょう。

背景情報

SAP HANA は、インメモリ ストレージを活用してデータ処理速度を向上させるマルチモデル データベースであり、高度な分析と高速トランザクションの両方を統合プラットフォーム上で実現します。

サポートされているバージョン

Flink バージョン

SAP HANA バージョン

SAP HANA ドライバー バージョン

1.15.3

無制限

2.19.16

説明

SAP HANA コネクタは、特定の JDBC ドライバー属性に依存しません。ドライバーが SAP HANA データベースへの接続を確立できる限り、一般的に互換性があります。

制限事項

  • 同時読み取りは一度に 1 つに制限されています。大規模なデータセットの場合、完全な読み取りでパフォーマンスのボトルネックが発生する可能性があります。オフライン処理などの代替ソリューションをお勧めします。

  • コネクタは単一テーブルからのデータ収集のみをサポートしています。複数テーブルおよびデータベース全体の収集はサポートされていません。

  • システムは、クエリを事前に設定された頻度で実行します。手動で構成するスケジュールされたリクエスト時間 を手動で構成する必要があります。構成間隔が長いとリアルタイムの応答性が低下する可能性があり、間隔が短いほどデータベースのクエリ負荷が増加する可能性があります。

  • システムは INSERT タイプのデータを収集します。CDC に類似したデータ変更をキャプチャするには、プライマリキー GROUP BY 句に基づいて CDC データを生成するなど、代替の技術ソリューションが必要です。

  • ソーステーブルには、更新時間を記録するための update_time フィールドが含まれている必要があり、各レコードの更新でこのフィールドの値を変更する必要があります(システムはこのフィールドを使用して、レコードが更新されたかどうかを判断します)。

  • システムは、update_time オフセットに基づいて更新されたデータを取得します。update_time が実際のデータベース書き込み時間よりも早い場合、システムはこのデータを見逃す可能性があります。(システムにはデフォルトで 5 秒の待機時間が含まれています)。

  • データ内の update_time は連続している必要があります。データ A の update_time がデータ B の update_time よりも早い場合、データの損失を防ぐために、データ B の前にデータ A がクエリに表示される必要があります。

  • データ量が多いテーブル(10,000 行を超える)では、パフォーマンスが大幅に低下する可能性があります。このようなテーブルの場合、update_time 列にインデックスを作成することをお勧めします。

  • デフォルトでは、Flink SQL タスクによって読み取られるデータは INSERT タイプです。ChangeLog データに変換するには、変換を自分で処理する必要があります(たとえば、プライマリキーで集計するか、更新時間でソートして最新のデータを取得するなど)。プライマリキーが更新されると、データに不整合が生じる可能性があるため、注意して進めてください

  • コネクタは、DELETE 操作のデータ収集をサポートしていません。ダウンストリームの削除と収集が必要な場合は、論理削除の削除フィールドを構成することを検討してください(データを物理的に削除せずに削除済みとしてマークします)。

パフォーマンス リファレンス

テスト環境では、1CU の Yarn リソース(1 CPU、4GB メモリ、単一同時実行)を使用して、3000RPS の消費率が達成されました。

重要

実際のタスク消費率は、特定のタスクロジック、入出力ネットワーク帯域幅、フィールドサイズ、その他の要因によって異なる場合があります。このパラメーター値は参考値です。

コネクタ関連の説明

コネクタは次のカテゴリをサポートしています。

カテゴリ

詳細

サポートされているタイプ

ソーステーブル

ネイティブ DDL アクセス

サポートされています

データソースコーデックアクセス

サポートされていません

メタテーブル

サポートされています

実行モード

ストリームモード

データ形式

該当なし

一意の監視メトリック

なし

API タイプ

SQL

構文構造

CREATE TABLE input_table (
  id BIGINT
  name STRING,
  update_time TIMESTAMP(3)
) WITH (
    'connector' = 'scan-jdbc',
    'driver' = 'com.sap.db.jdbc.Driver’,
    'url' = '<JDBC URL>',
    'table-name' = '<yourTableName>',
    'username' = '<yourUsername>',
    'password' = '<yourPassword>',
    'scan-period.seconds' = '<PeriodSeconds>',
    'scan.start.time' = '<StartTime>',
    'scan.table.time.column' = '<UpdateTime>'
);

WITH パラメーター

  • 全般

    ```html

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    備考

    connector

    テーブルタイプを指定します。

    String

    はい

    なし

    静的フィールドは scan-jdbc です。

    driver

    JDBC ドライバーを指定します。

    String

    はい

    なし

    静的フィールドは com.sap.db.jdbc.Driver です。

    url

    データベース接続の JDBC URL。

    String

    はい

    なし

    形式は jdbc:sap://host:port/database です。

    table-name

    読み取るテーブルの名前。

    String

    はい

    なし

    データを読み取るテーブルを指定します。

    username

    データベースアクセス用のユーザー名。

    String

    はい

    なし

    SAP HANA アクセス用のユーザー名を入力します。

    password

    データベースアクセス用のパスワード。

    String

    はい

    なし

    SAP HANA アクセス用のパスワードを入力します。

  • ソーステーブル

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    備考

    scan.period.seconds

    スキャン期間を定義します。

    Integer

    いいえ

    10

    単位:秒。

    scan.start.time

    更新の読み取りを開始する時刻を決定します。

    Date

    いいえ

    現在時刻

    形式は yyyy-MM-dd hh-mm-ss です。または、タイムゾーン付きで yyyy-MM-dd hh-mm-ss Z です。

    説明

    完全読み取りの後に増分読み取りを行う場合は、「earliest」に設定できます。

    パラメーターの例:

    • earliest

    • 2024-03-01 23:55:55

    • 2024-03-01 23:55:55 +0800

    scan.table.time.column

    行の更新時刻を決定するために使用される列を指定します。

    Timestamp

    はい

    なし

    行の変更を追跡するために更新時刻を反映する列を示します。timestamp タイプである必要があります。

    scan.delay.seconds

    データベースにクエリを実行する前の遅延を指定します。

    Integer

    いいえ

    5

    データベースにクエリを実行する前の遅延を秒単位で指定します。たとえば、update_time<'2024-03-08 12:00:00' のクエリを設定した場合、時間のずれを補正し、データ遅延を削減するために、2024-03-08 12:00:05 に実行されます。

型のマッピング

SAP HANA タイプ

Flink SQL タイプ

DATE

DATE

TIME

TIME

SECONDDTE

TIMESTAMP

TIMESTAMP

TIMESTAMP

TINYINT

TINYINT

SMALLINT

SMALLINT

INTEGER

INTEGER

BIGINT

BIGINT

SMALLDECIMAL

サポートされていません

DECIMAL

DECIMAL

REAL

FLOAT

DOUBLE

DOUBLE

FLOAT(n)

DOUBLE に対応

BOOLEAN

BOOLEAN

VARCHAR

STRING

NVARCHAR

STRING

ALPHANUM

STRING

短いテキスト

String

VARBINARY

VARBINARY

BLOB

サポートされていません

CLOB

サポートされていません

NCLOB

サポートされていません

テキスト

String

ARRAY

サポートされていません

ST_GEOMETRY

サポートされていません

ST_POINT

サポートされていません

使用例

  • ソーステーブル

    CREATE TABLE input_table (
      id BIGINT
      name STRING,
      update_time TIMESTAMP(3)
    ) WITH (
        'connector' = 'scan-jdbc',
        'driver' = 'com.sap.db.jdbc.Driver’,
        'url' = jdbc:sap://127.***.***.1:39041?databaseName=HXE',
        'table-name' = 'table_id_name_time',
        'username' = 'root',
        'password' = 'root',
        'scan-period.seconds' = '20',
        'scan.start.time' = '2024-02-20 18:30:00',
        'scan.table.time.column' = 'update_time'
    );

タイムゾーン設定

SAP HANA データベースのタイムゾーンが Flink タスクのタイムゾーンと異なる場合は、タイムゾーン設定を適宜調整する必要があります。そうしないと、この不一致により、変更データの読み取り失敗やデータの重複読み取りなどの問題が発生する可能性があります。 SAP HANA JDBC ドライバーはタイムゾーンパラメーターの設定をサポートしていないため、JVM のデフォルトのタイムゾーン(user.timezone)が使用されます。これをオーバーライドするには、Flink SQL タスクの Flink 構成に次のパラメーターを追加します。

重要

JVM タイムゾーンの設定は、Flink タスクにおけるすべての時間関連処理に影響することに注意してください。そのため、この変更を行う際には注意が必要です。

env.java.opts: -Duser.timezone=xxx

-- UTC 時間を設定する場合:UTC は大文字にする必要があることに注意してください
env.java. opts: -Duser.timezone=UTC

-- 北京時間を設定する場合:
env.java.opts: -Duser.timezone=GMT+8