このトピックでは、Object Storage Service (OSS) コネクタの使用方法について説明します。
Alibaba Cloud Object Storage Service (OSS) は、大量のデータを保存するための、安全でコスト効率が高く、信頼性の高いクラウドストレージサービスです。99.9999999999% (トゥエルブナイン) のデータ耐久性と 99.995% のデータ可用性を提供します。OSS は、ストレージコストの最適化に役立つ複数のストレージクラスを提供します。
カテゴリ | 詳細 |
サポートされるタイプ | ソーステーブルと結果テーブル |
実行モード | バッチモードとストリームモード |
データ形式 | Orc、Parquet、Avro、Csv、JSON、Raw |
特定のモニタリングメトリック | なし |
API タイプ | DataStream と SQL |
結果テーブルでのデータの更新または削除のサポート | データの挿入のみが可能です。結果テーブルのデータを更新または削除することはできません。 |
制限事項
全般
Ververica Runtime (VVR) 11 以降のバージョンのみが、GZIP、BZIP2、XZ、DEFLATE などの圧縮ファイルの OSS からの読み取りをサポートします。VVR 8 は圧縮ファイルを正しく処理できません。
8.0.6 より前の VVR バージョンでは、同じアカウントに属する OSS バケットとの間でのデータの読み書きのみがサポートされています。アカウント間でデータにアクセスするには、VVR 8.0.6 以降のバージョンを使用し、バケット認証を設定する必要があります。詳細については、「バケット認証の設定」をご参照ください。
新しいパーティションの増分読み取りはサポートされていません。
結果テーブルのみ
OSS にデータを書き込む際、Avro、CSV、JSON、Raw などの行ストアフォーマットはサポートされていません。詳細については、「FLINK-30635」をご参照ください。
構文
CREATE TABLE OssTable (
column_name1 INT,
column_name2 STRING,
...
datetime STRING,
`hour` STRING
) PARTITIONED BY (datetime, `hour`) WITH (
'connector' = 'filesystem',
'path' = 'oss://<bucket>/path',
'format' = '...'
);メタデータ列
ソーステーブルのメタデータ列を読み取って、OSS データに関するメタデータを取得できます。たとえば、OSS ソーステーブルでメタデータ列 file.path を定義した場合、この列の値は行を含むファイルのパスになります。次の例は、メタデータ列の使用方法を示しています。
CREATE TABLE MyUserTableWithFilepath (
column_name1 INT,
column_name2 STRING,
`file.path` STRING NOT NULL METADATA
) WITH (
'connector' = 'filesystem',
'path' = 'oss://<bucket>/path',
'format' = 'json'
)次の表に、OSS ソーステーブルでサポートされているメタデータ列を示します。
キー | データ型 | 説明 |
file.path | STRING NOT NULL | 行を含むファイルのパス。 |
file.name | STRING NOT NULL | 行を含むファイルの名前。これはファイルパスの最後の要素です。 |
file.size | BIGINT NOT NULL | 行を含むファイルのサイズ (バイト単位)。 |
file.modification-time | TIMESTAMP_LTZ(3) NOT NULL | 行を含むファイルの変更時刻。 |
WITH パラメーター
全般
パラメーター
説明
データ型
必須
デフォルト値
注
connector
テーブルのタイプ。
String
はい
なし
値は
filesystemである必要があります。path
ファイルシステムパス。
String
はい
なし
パスは
oss://my_bucket/my_pathのような URI フォーマットである必要があります。説明VVR 8.0.6 以降では、このパラメーターを設定した後、指定されたファイルシステムパスとの間でデータを読み書きするためにバケット認証も設定する必要があります。詳細については、「バケット認証の設定」をご参照ください。
format
ファイル形式。
String
はい
なし
有効な値:
csv
json
avro
parquet
orc
raw
ソーステーブル固有
パラメーター
説明
データ型
必須
デフォルト値
注
source.monitor-interval
新しいファイルを監視する間隔。値は 0 より大きい必要があります。
Duration
いいえ
なし
このパラメーターを設定しない場合、指定されたパスは一度だけスキャンされ、ソースは有界になります。
各ファイルはそのパスによって識別され、一度だけ処理されます。
処理されたファイルは、ソースのライフサイクル中、状態に保存されます。状態はチェックポイントとセーブポイント中に保存されます。間隔を短くすると、新しいファイルをより速く検出できますが、ファイルシステムまたはオブジェクトストアのスキャン頻度が高くなります。
結果テーブル固有
パラメーター
説明
データ型
必須
デフォルト値
注
partition.default-name
パーティションフィールドが NULL または空文字列の場合のパーティション名。
String
いいえ
_DEFAULT_PARTITION__
なし。
sink.rolling-policy.file-size
ローリングされる前のファイルの最大サイズ。
MemorySize
いいえ
128 MB
ディレクトリに書き込まれたデータは、パートファイルに分割されます。パーティションのデータを受信するシンクの各サブタスクは、そのパーティションに対して少なくとも 1 つのパートファイルを作成します。ローリングポリシーに基づいて、現在進行中のパートファイルが閉じられ、新しいファイルが作成されます。ポリシーは、パートファイルのサイズと開いたままにできる最大時間に基づいてパートファイルをローリングします。
説明列ストアフォーマットの場合:
ファイルは、ローリングポリシーの基準を満たしていなくても、チェックポイント中に常にローリングされます。
ファイルは、ポリシー基準を満たしたとき、またはチェックポイントが発生したときにローリングされます。
行ストアフォーマットの場合、ファイルはローリングポリシーの基準を満たしたときにのみローリングされます。
sink.rolling-policy.rollover-interval
パートファイルがローリングされる前に開いたままにできる最大時間。
Duration
いいえ
30min
チェック頻度は sink.rolling-policy.check-interval プロパティによって制御されます。
sink.rolling-policy.check-interval
時間ベースのローリングポリシーをチェックする間隔。
Duration
いいえ
1min
このプロパティは、sink.rolling-policy.rollover-interval プロパティに基づいてファイルをローリングすべきかどうかをチェックする頻度を制御します。
auto-compaction
ストリーミング結果テーブルの自動コンパクションを有効にするかどうか。データはまず一時ファイルに書き込まれます。チェックポイントが完了すると、そのチェックポイントからの一時ファイルがマージされます。一時ファイルはマージされるまで表示されません。
Boolean
いいえ
false
ファイルコンパクションを有効にすると、小さいファイルはターゲットファイルサイズに基づいて大きいファイルにマージされます。本番環境でファイルコンパクションを使用する場合は、次の点に注意してください:
チェックポイント内のファイルのみがマージされます。各チェックポイントに対して少なくとも 1 つのファイルが生成されます。
ファイルはコンパクション前には表示されません。データ可視性遅延は
チェックポイント間隔 + コンパクション時間です。コンパクション時間が長いと、バックプレッシャーが発生し、チェックポイントに必要な時間が長くなる可能性があります。
compaction.file-size
コンパクションされたファイルのターゲットサイズ。
MemorySize
いいえ
128 MB
デフォルト値は、sink.rolling-policy.file-size で指定されたローリングファイルサイズと同じです。
sink.partition-commit.trigger
パーティションをコミットするトリガーのタイプ。
String
いいえ
process-time
パーティションテーブルへの書き込みには、Flink は 2 種類のパーティションコミットトリガーを提供します:
process-time:パーティションコミットトリガーは、パーティション作成時間と現在のシステム時間に基づきます。パーティション時間エクストラクターやウォーターマークジェネレーターは必要ありません。現在のシステム時間が、パーティション作成システム時間と sink.partition-commit.delay の値の合計を超えると、パーティションはすぐにコミットされます。このトリガーはより一般的ですが、精度は低くなります。たとえば、データ遅延や障害により、パーティションが早期にコミットされる可能性があります。
partition-time:このトリガーは、抽出されたパーティション時間に基づいており、ウォーターマークの生成が必要です。ジョブはウォーターマーク生成をサポートする必要があり、パーティションは時間 (時間単位や日単位など) に基づいて作成されます。ウォーターマークが、パーティション作成システム時間と sink.partition-commit.delay の値の合計を超えると、パーティションはすぐにコミットされます。
sink.partition-commit.delay
パーティションがコミットされるまでの最大遅延。これは、この遅延が経過するまでパーティションがコミットされないことを意味します。
Duration
いいえ
0s
パーティションが日単位で作成される場合は、これを
1 dに設定できます。パーティションが時間単位で作成される場合は、これを
1 hに設定します。
sink.partition-commit.watermark-time-zone
LONG 型のウォーターマークを TIMESTAMP に解析するために使用されるタイムゾーン。結果の TIMESTAMP はパーティション時間と比較され、パーティションをコミットすべきかどうかを判断します。
String
いいえ
UTC
このパラメーターは、sink.partition-commit.trigger が `partition-time` に設定されている場合にのみ有効です。
これが正しく設定されていない場合、たとえば、ソースの行時間が TIMESTAMP_LTZ 列で定義されていて、このプロパティが設定されていない場合、パーティションのコミットが数時間後になることがあります。デフォルト値は UTC であり、これはウォーターマークが TIMESTAMP 列で定義されているか、ウォーターマークが定義されていないことを意味します。
ウォーターマークが TIMESTAMP_LTZ 列で定義されている場合、ウォーターマークのタイムゾーンはセッションタイムゾーンである必要があります。このプロパティの有効な値は、完全なタイムゾーン名 (例: 'America/Los_Angeles') またはカスタムタイムゾーン (例: 'GMT-08:00') のいずれかです。
partition.time-extractor.kind
パーティションフィールドから時間を抽出する時間エクストラクター。
String
いいえ
default
有効な値:
default:デフォルトでは、タイムスタンプパターンまたはフォーマッタを設定できます。
custom:エクストラクタクラスを指定する必要があります。
partition.time-extractor.class
PartitionTimeExtractor インターフェイスを実装するエクストラクタクラス。
String
いいえ
なし
なし。
partition.time-extractor.timestamp-pattern
パーティションフィールドを使用して有効なタイムスタンプパターンを取得できるデフォルトの構築メソッド。
String
いいえ
なし
デフォルトでは、最初のフィールドは
yyyy-MM-dd hh:mm:ssパターンを使用して抽出されます。パーティションフィールド 'dt' からタイムスタンプを抽出するには、`$dt` と設定できます。
複数のパーティションフィールド (年、月、日、時など) からタイムスタンプを抽出するには、
$year-$month-$day $hour:00:00と設定できます。2 つのパーティションフィールド dt と hour からタイムスタンプを抽出するには、
$dt $hour:00:00と設定できます。
partition.time-extractor.timestamp-formatter
パーティションのタイムスタンプ文字列値をタイムスタンプに変換するフォーマッタ。パーティションのタイムスタンプ文字列値は、partition.time-extractor.timestamp-pattern プロパティで表現されます。
String
いいえ
yyyy-MM-dd HH:mm:ss
たとえば、パーティションのタイムスタンプが複数のパーティションフィールド (年、月、日など) から抽出される場合、partition.time-extractor.timestamp-pattern プロパティを
$year$month$dayに、partition.time-extractor.timestamp-formatter プロパティを `yyyyMMdd` に設定できます。デフォルトのフォーマッターはyyyy-MM-dd HH:mm:ssです。このタイムスタンプフォーマッターは、Java の DateTimeFormatter と互換性があります。sink.partition-commit.policy.kind
パーティションコミットポリシーのタイプ。
String
いいえ
なし
パーティションコミットポリシーは、パーティションの書き込みが完了し、読み取り準備ができたことをダウンストリームコンシューマーに通知します。有効な値:
success-file:ディレクトリに `_success` ファイルを追加します。
custom:指定されたクラスを使用してコミットポリシーを作成します。複数のコミットポリシーを同時に指定できます。
sink.partition-commit.policy.class
PartitionCommitPolicy インターフェイスを実装するパーティションコミットポリシークラス。
String
いいえ
なし
このクラスは `custom` コミットポリシーでのみ使用できます。
sink.partition-commit.success-file.name
`success-file` パーティションコミットポリシーで使用するファイルの名前。
String
いいえ
_SUCCESS
なし。
sink.parallelism
外部ファイルシステムにファイルを書き込むための並列度。
Integer
いいえ
なし
デフォルトでは、シンクの並列度はアップストリームのチェインされたオペレーターの並列度と同じです。異なる並列度を設定した場合、ファイル書き込みオペレーターは指定されたシンクの並列度を使用します。ファイルコンパクションが有効な場合、コンパクションオペレーターも指定されたシンクの並列度を使用します。
説明この値は 0 より大きくなければなりません。そうでない場合、例外がスローされます。
バケット認証の設定
VVR 8.0.6 以降のバージョンのみがバケット認証の設定をサポートします。
ファイルシステムパスを指定した後、指定したパスとの間でデータを読み書きするには、バケット認証も設定する必要があります。バケット認証を設定するには、リアルタイムコンピューティング開発コンソールの[デプロイメント詳細]ページの[パラメーター]タブの[追加設定]セクションに、次のコードを追加します。
fs.oss.bucket.<bucketName>.accessKeyId: xxxx
fs.oss.bucket.<bucketName>.accessKeySecret: xxxx次の表にパラメーターを示します。
設定項目 | 説明 |
fs.oss.bucket.<bucketName>.accessKeyId | パラメーターの説明:
|
fs.oss.bucket.<bucketName>.accessKeySecret |
OSS-HDFS への書き込み
まず、リアルタイムコンピューティング開発コンソールの[デプロイメント詳細] ページの[パラメーター] タブの[追加設定] セクションに、次の構成を追加します。
fs.oss.jindo.buckets: xxx
fs.oss.jindo.accessKeyId: xxx
fs.oss.jindo.accessKeySecret: xxx次の表にパラメーターを示します。
設定項目 | 説明 |
fs.oss.jindo.buckets | 書き込み先の OSS-HDFS サービス内のバケット名。複数のバケットをセミコロンで区切って設定できます。Flink が OSS パスに書き込む際、対応するバケットが `fs.oss.jindo.buckets` に含まれている場合、データは OSS-HDFS サービスに書き込まれます。 |
fs.oss.jindo.accessKeyId | 既存の AccessKey を使用するか、新しい AccessKey を作成します。詳細については、「AccessKey の作成」をご参照ください。 説明 AccessKey Secret の漏洩リスクを軽減するため、AccessKey Secret は作成時に一度しか表示されず、後で確認することはできません。安全に保管してください。 |
fs.oss.jindo.accessKeySecret |
OSS-HDFS エンドポイントも設定する必要があります。OSS-HDFS エンドポイントは 2 つの方法で設定できます:
パラメーター設定
[リアルタイムコンピューティング開発コンソール] の [デプロイメント詳細] ページの [パラメーター] タブの [追加設定] セクションに、以下の構成を追加できます。
fs.oss.jindo.endpoint: xxxパス設定
OSS パスで OSS-HDFS エンドポイントを設定できます。
oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>user-defined-oss-hdfs-bucket:バケットの名前。oss-hdfs-endpoint:OSS-HDFS エンドポイント。fs.oss.jindo.buckets設定項目には <user-defined-oss-hdfs-bucket.oss-hdfs-endpoint> を含める必要があります。
たとえば、バケット名が `jindo-test` でエンドポイントが `cn-beijing.oss-dls.aliyuncs.com` の場合、OSS パスは oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir> である必要があり、fs.oss.jindo.buckets 設定項目には jindo-test.cn-beijing.oss-dls.aliyuncs.com を含める必要があります。
# OSS パス
oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>
# 追加設定
fs.oss.jindo.buckets: jindo-test,jindo-test.cn-beijing.oss-dls.aliyuncs.com外部 HDFS ファイルに書き込む際、ファイルパスが hdfs://** の場合、次の設定を追加してアクセスユーザー名を指定または切り替えることもできます。
リアルタイムコンピューティング開発コンソールの[デプロイメント詳細]ページにある[パラメーター]タブの[追加設定]セクションに、以下の構成を追加できます。
containerized.taskmanager.env.HADOOP_USER_NAME: hdfs
containerized.master.env.HADOOP_USER_NAME: hdfs例
ソーステーブル
CREATE TEMPORARY TABLE fs_table_source ( `id` INT, `name` VARCHAR ) WITH ( 'connector'='filesystem', 'path'='oss://<bucket>/path', 'format'='parquet' ); CREATE TEMPORARY TABLE blackhole_sink( `id` INT, `name` VARCHAR ) with ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT * FROM fs_table_source ;結果テーブル
パーティションテーブルへの書き込み
CREATE TABLE datagen_source ( user_id STRING, order_amount DOUBLE, ts BIGINT, -- ミリ秒単位の時間 ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3), WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- TIMESTAMP_LTZ 列にウォーターマークを定義 ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE fs_table_sink ( user_id STRING, order_amount DOUBLE, dt STRING, `hour` STRING ) PARTITIONED BY (dt, `hour`) WITH ( 'connector'='filesystem', 'path'='oss://<bucket>/path', 'format'='parquet', 'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00', 'sink.partition-commit.delay'='1 h', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- ユーザー設定のタイムゾーンが 'Asia/Shanghai' であると仮定 'sink.partition-commit.policy.kind'='success-file' ); -- ファイルシステムテーブルに挿入するストリーミング SQL INSERT INTO fs_table_sink SELECT user_id, order_amount, DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'), DATE_FORMAT(ts_ltz, 'HH') FROM datagen_source;非パーティションテーブルへの書き込み
CREATE TABLE datagen_source ( user_id STRING, order_amount DOUBLE ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE fs_table_sink ( user_id STRING, order_amount DOUBLE ) WITH ( 'connector'='filesystem', 'path'='oss://<bucket>/path', 'format'='parquet' ); INSERT INTO fs_table_sink SELECT * FROM datagen_source;
DataStream API
DataStream API を使用してデータを読み書きするには、対応する DataStream コネクタを使用して Flink に接続する必要があります。DataStream コネクタの設定の詳細については、「DataStream コネクタの使用」をご参照ください。
次のコードは、DataStream API を使用して OSS および OSS-HDFS に書き込む方法の例を示しています。
String outputPath = "oss://<bucket>/path"
final StreamingFileSink<Row> sink =
StreamingFileSink.forRowFormat(
new Path(outputPath),
(Encoder<Row>)
(element, stream) -> {
out.println(element.toString());
})
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build();
outputStream.addSink(sink);OSS-HDFS に書き込むには、[リアルタイムコンピューティング開発コンソール] の [デプロイメント詳細] ページの [パラメーター] タブにある [追加設定] セクションで、関連する OSS-HDFS パラメーターも設定する必要があります。 詳細については、「OSS-HDFS への書き込み」をご参照ください。
関連ドキュメント
Flink がサポートするコネクタの詳細については、「サポートされるコネクタ」をご参照ください。
Tablestore (OTS) コネクタの使用方法の詳細については、「Tablestore (OTS)」をご参照ください。
ストリーミングデータレイクハウス用の Paimon コネクタの使用方法の詳細については、「ストリーミングデータレイクハウスのための Paimon」をご参照ください。