SAP HANA Connector について、その構文構造、WITH パラメーター、および Dataphin 内での使用例を含めて、SAP HANA Connectorを詳しく見ていきましょう。
背景情報
SAP HANA は、インメモリ ストレージを活用してデータ処理速度を向上させるマルチモデル データベースであり、高度な分析と高速トランザクションの両方を統合プラットフォーム上で実現します。
サポートされているバージョン
Flink バージョン | SAP HANA バージョン | SAP HANA ドライバー バージョン |
1.15.3 | 無制限 | 2.19.16 |
SAP HANA コネクタは、特定の JDBC ドライバー属性に依存しません。ドライバーが SAP HANA データベースへの接続を確立できる限り、一般的に互換性があります。
制限事項
同時読み取りは一度に 1 つに制限されています。大規模なデータセットの場合、完全な読み取りでパフォーマンスのボトルネックが発生する可能性があります。オフライン処理などの代替ソリューションをお勧めします。
コネクタは単一テーブルからのデータ収集のみをサポートしています。複数テーブルおよびデータベース全体の収集はサポートされていません。
システムは、
クエリを事前に設定された頻度で実行します。手動で構成するスケジュールされたリクエスト時間 を手動で構成する必要があります。構成間隔が長いとリアルタイムの応答性が低下する可能性があり、間隔が短いほどデータベースのクエリ負荷が増加する可能性があります。システムは
INSERTタイプのデータを収集します。CDCに類似したデータ変更をキャプチャするには、プライマリキーGROUP BY句に基づいてCDCデータを生成するなど、代替の技術ソリューションが必要です。ソーステーブルには、更新時間を記録するための
update_timeフィールドが含まれている必要があり、各レコードの更新でこのフィールドの値を変更する必要があります(システムはこのフィールドを使用して、レコードが更新されたかどうかを判断します)。システムは、
update_timeオフセットに基づいて更新されたデータを取得します。update_timeが実際のデータベース書き込み時間よりも早い場合、システムはこのデータを見逃す可能性があります。(システムにはデフォルトで 5 秒の待機時間が含まれています)。データ内の update_time は連続している必要があります。データ A の
update_timeがデータ B のupdate_timeよりも早い場合、データの損失を防ぐために、データ B の前にデータ A がクエリに表示される必要があります。データ量が多いテーブル(10,000 行を超える)では、パフォーマンスが大幅に低下する可能性があります。このようなテーブルの場合、
update_time列にインデックスを作成することをお勧めします。デフォルトでは、Flink SQL タスクによって読み取られるデータは
INSERTタイプです。ChangeLogデータに変換するには、変換を自分で処理する必要があります(たとえば、プライマリキーで集計するか、更新時間でソートして最新のデータを取得するなど)。プライマリキーが更新されると、データに不整合が生じる可能性があるため、注意して進めてください。コネクタは、
DELETE操作のデータ収集をサポートしていません。ダウンストリームの削除と収集が必要な場合は、論理削除の削除フィールドを構成することを検討してください(データを物理的に削除せずに削除済みとしてマークします)。
パフォーマンス リファレンス
テスト環境では、1CU の Yarn リソース(1 CPU、4GB メモリ、単一同時実行)を使用して、3000RPS の消費率が達成されました。
実際のタスク消費率は、特定のタスクロジック、入出力ネットワーク帯域幅、フィールドサイズ、その他の要因によって異なる場合があります。このパラメーター値は参考値です。
コネクタ関連の説明
コネクタは次のカテゴリをサポートしています。
カテゴリ | 詳細 |
サポートされているタイプ | ソーステーブル |
ネイティブ DDL アクセス | サポートされています |
データソースコーデックアクセス | サポートされていません |
メタテーブル | サポートされています |
実行モード | ストリームモード |
データ形式 | 該当なし |
一意の監視メトリック | なし |
API タイプ | SQL |
構文構造
CREATE TABLE input_table (
id BIGINT
name STRING,
update_time TIMESTAMP(3)
) WITH (
'connector' = 'scan-jdbc',
'driver' = 'com.sap.db.jdbc.Driver’,
'url' = '<JDBC URL>',
'table-name' = '<yourTableName>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'scan-period.seconds' = '<PeriodSeconds>',
'scan.start.time' = '<StartTime>',
'scan.table.time.column' = '<UpdateTime>'
);WITH パラメーター
全般
```html
パラメーター
説明
データ型
必須
デフォルト値
備考
connector
テーブルタイプを指定します。
String
はい
なし
静的フィールドは
scan-jdbcです。driver
JDBC ドライバーを指定します。
String
はい
なし
静的フィールドは
com.sap.db.jdbc.Driverです。url
データベース接続の JDBC URL。
String
はい
なし
形式は
jdbc:sap://host:port/databaseです。table-name
読み取るテーブルの名前。
String
はい
なし
データを読み取るテーブルを指定します。
username
データベースアクセス用のユーザー名。
String
はい
なし
SAP HANA アクセス用のユーザー名を入力します。
password
データベースアクセス用のパスワード。
String
はい
なし
SAP HANA アクセス用のパスワードを入力します。
ソーステーブル
パラメーター
説明
データ型
必須
デフォルト値
備考
scan.period.seconds
スキャン期間を定義します。
Integer
いいえ
10
単位:秒。
scan.start.time
更新の読み取りを開始する時刻を決定します。
Date
いいえ
現在時刻
形式は
yyyy-MM-dd hh-mm-ssです。または、タイムゾーン付きでyyyy-MM-dd hh-mm-ss Zです。説明完全読み取りの後に増分読み取りを行う場合は、「earliest」に設定できます。
パラメーターの例:
earliest2024-03-01 23:55:552024-03-01 23:55:55 +0800
scan.table.time.column
行の更新時刻を決定するために使用される列を指定します。
Timestamp
はい
なし
行の変更を追跡するために更新時刻を反映する列を示します。
timestampタイプである必要があります。scan.delay.seconds
データベースにクエリを実行する前の遅延を指定します。
Integer
いいえ
5
データベースにクエリを実行する前の遅延を秒単位で指定します。たとえば、
update_time<'2024-03-08 12:00:00'のクエリを設定した場合、時間のずれを補正し、データ遅延を削減するために、2024-03-08 12:00:05に実行されます。
型のマッピング
SAP HANA タイプ | Flink SQL タイプ |
DATE | DATE |
TIME | TIME |
SECONDDTE | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
SMALLDECIMAL | サポートされていません |
DECIMAL | DECIMAL |
REAL | FLOAT |
DOUBLE | DOUBLE |
FLOAT(n) | DOUBLE に対応 |
BOOLEAN | BOOLEAN |
VARCHAR | STRING |
NVARCHAR | STRING |
ALPHANUM | STRING |
短いテキスト | String |
VARBINARY | VARBINARY |
BLOB | サポートされていません |
CLOB | サポートされていません |
NCLOB | サポートされていません |
テキスト | String |
ARRAY | サポートされていません |
ST_GEOMETRY | サポートされていません |
ST_POINT | サポートされていません |
使用例
ソーステーブル
CREATE TABLE input_table ( id BIGINT name STRING, update_time TIMESTAMP(3) ) WITH ( 'connector' = 'scan-jdbc', 'driver' = 'com.sap.db.jdbc.Driver’, 'url' = jdbc:sap://127.***.***.1:39041?databaseName=HXE', 'table-name' = 'table_id_name_time', 'username' = 'root', 'password' = 'root', 'scan-period.seconds' = '20', 'scan.start.time' = '2024-02-20 18:30:00', 'scan.table.time.column' = 'update_time' );
タイムゾーン設定
SAP HANA データベースのタイムゾーンが Flink タスクのタイムゾーンと異なる場合は、タイムゾーン設定を適宜調整する必要があります。そうしないと、この不一致により、変更データの読み取り失敗やデータの重複読み取りなどの問題が発生する可能性があります。 SAP HANA JDBC ドライバーはタイムゾーンパラメーターの設定をサポートしていないため、JVM のデフォルトのタイムゾーン(user.timezone)が使用されます。これをオーバーライドするには、Flink SQL タスクの Flink 構成に次のパラメーターを追加します。
JVM タイムゾーンの設定は、Flink タスクにおけるすべての時間関連処理に影響することに注意してください。そのため、この変更を行う際には注意が必要です。
env.java.opts: -Duser.timezone=xxx
-- UTC 時間を設定する場合:UTC は大文字にする必要があることに注意してください
env.java. opts: -Duser.timezone=UTC
-- 北京時間を設定する場合:
env.java.opts: -Duser.timezone=GMT+8