このトピックでは、Object Storage Service (OSS) コネクタの使用方法について説明します。
Alibaba Cloud の Object Storage Service (OSS) は、高い信頼性、安全性、コスト効率を誇るクラウドストレージサービスで、大容量のストレージを提供します。99.9999999999% (トゥエルブナイン) のデータ耐久性と 99.995% のデータ可用性を実現するように設計されています。複数のストレージクラスから選択して、ストレージコストを最適化できます。
カテゴリ | 詳細 |
サポートされるタイプ | ソーステーブルと結果テーブル |
実行モード | バッチモードとストリーミングモード |
データ形式 | Orc、Parquet、Avro、Csv、JSON、Raw |
特定のモニタリングメトリック | なし |
API タイプ | Datastream と SQL |
結果テーブルのデータの更新または削除 | 結果テーブルのデータの更新および削除はサポートされていません。データの挿入のみ可能です。 |
制限事項
全般
Ververica Runtime (VVR) 11 以降のバージョンのみが、OSS からの圧縮ファイル (GZIP、BZIP2、XZ、DEFLATE) の読み取りをサポートします。VVR 8 は圧縮ファイルを正しく処理できません。
VVR 8.0.6 より前の Flink コンピュートエンジンバージョンでは、同じアカウント内でのみ OSS への読み書きが可能です。他のアカウントの OSS への読み書きを行うには、VVR 8.0.6 以降の Flink コンピュートエンジンを使用し、バケット認証情報を設定する必要があります。詳細については、「バケット認証情報の設定」をご参照ください。
結果テーブルのみ
OSS への書き込み時、Avro、CSV、JSON、Raw などの行指向フォーマットはサポートされていません。詳細については、「FLINK-30635」をご参照ください。
構文
CREATE TABLE OssTable (
column_name1 INT,
column_name2 STRING,
...
datetime STRING,
`hour` STRING
) PARTITIONED BY (datetime, `hour`) WITH (
'connector' = 'filesystem',
'path' = 'oss://<bucket>/path',
'format' = '...'
);メタデータ列
ソーステーブルのメタデータ列を読み取ることで、OSS データに関するメタデータを取得できます。たとえば、OSS ソーステーブルでメタデータ列 file.path を定義した場合、この列の値は行データを含むファイルのパスになります。次の例は、メタデータ列の使用方法を示しています。
CREATE TABLE MyUserTableWithFilepath (
column_name1 INT,
column_name2 STRING,
`file.path` STRING NOT NULL METADATA
) WITH (
'connector' = 'filesystem',
'path' = 'oss://<bucket>/path',
'format' = 'json'
)次の表に、OSS ソーステーブルがサポートするメタデータ列を示します。
キー | データ型 | 説明 |
file.path | STRING NOT NULL | 行データが格納されているファイルのパス。 |
file.name | STRING NOT NULL | 行データが格納されているファイルの名前。これは、ファイルのルートパスから最も遠い要素です。 |
file.size | BIGINT NOT NULL | 行データが格納されているファイルのサイズ (バイト単位)。 |
file.modification-time | TIMESTAMP_LTZ(3) NOT NULL | 行データが格納されているファイルの変更時刻。 |
WITH パラメーター
全般
パラメーター
説明
データ型
必須
デフォルト値
備考
connector
テーブルのタイプ。
String
はい
なし
値は
filesystemである必要があります。path
ファイルシステムパス。
String
はい
なし
パスは URI 形式である必要があります。例:
oss://my_bucket/my_path。説明VVR 8.0.6 以降では、このパラメーターを設定した後、指定されたファイルシステムパスのデータを読み書きするためにバケット認証情報を設定する必要もあります。詳細については、「バケット認証情報の設定」をご参照ください。
format
ファイル形式。
String
はい
なし
有効な値:
csv
json
avro
parquet
orc
raw
ソーステーブルのみ
パラメーター
説明
データ型
必須
デフォルト値
備考
source.monitor-interval
新しいファイルをモニタリングする間隔。0 より大きい値を設定する必要があります。
Duration
いいえ
なし
この設定項目が設定されていない場合、指定されたパスは一度だけスキャンされ、ソースは有界になります。
各ファイルはパスによって一意に識別されます。新しいファイルが検出されると、一度処理されます。
処理されたファイルは、ソースのライフサイクル全体を通じてステートに保存されます。したがって、ソースのステートはチェックポイントとセーブポイント中に保存されます。間隔が短いほど、新しいファイルをより速く検出できますが、ファイルシステムまたは OSS の走査頻度も高くなります。
結果テーブルのみ
パラメーター
説明
データ型
必須
デフォルト値
備考
partition.default-name
パーティションフィールドの値が NULL または空文字列の場合のパーティション名。
String
いいえ
_DEFAULT_PARTITION__
なし。
sink.rolling-policy.file-size
ローリングされる前のファイルの最大サイズ。
MemorySize
いいえ
128 MB
ディレクトリに書き込まれたデータは、パートファイルに分割されます。パーティションのデータを受信するシンクの各サブタスクは、そのパーティションに対して少なくとも 1 つのパートファイルを生成します。設定可能なローリングポリシーに基づいて、現在処理中のパートファイルが閉じられ、新しいファイルが作成されます。このポリシーは、サイズとファイルが開いたままでいられる最大タイムアウト期間に基づいてパートファイルをローリングします。
説明列指向フォーマットの場合:
ファイルが設定されたローリングポリシーを満たしていなくても、チェックポイントが実行されると常にローリングされます。
したがって、ファイルが設定されたローリングポリシーを満たすか、チェックポイントが実行されると、ファイルは常にローリングされます。
行指向フォーマットの場合、ファイルは設定されたローリングポリシーを満たした場合にのみローリングされます。
sink.rolling-policy.rollover-interval
パートファイルがローリングされる前に開いたままでいられる最大期間。
Duration
いいえ
30min
チェック頻度は sink.rolling-policy.check-interval プロパティによって制御されます。
sink.rolling-policy.check-interval
時間ベースのローリングポリシーのチェック間隔。
Duration
いいえ
1min
このプロパティは、sink.rolling-policy.rollover-interval プロパティに基づいてファイルをローリングすべきかどうかを判断するためのチェックを制御します。
auto-compaction
ストリーミング結果テーブルで自動コンパクション機能を有効にするかどうかを指定します。データはまず一時ファイルに書き込まれます。チェックポイントが完了すると、そのチェックポイントで生成された一時ファイルがマージされます。一時ファイルはマージされるまで表示されません。
Boolean
いいえ
false
ファイルコンパクションを有効にすると、複数の small ファイルがターゲットファイルサイズに基づいて large ファイルにマージされます。本番環境でファイルコンパクションを使用する場合は、次の点に注意してください:
チェックポイント内のファイルのみがマージされます。少なくともチェックポイントと同じ数のファイルが生成されます。
ファイルはマージされるまで表示されません。ファイルの可視化時間は
チェックポイント間隔 + マージ期間です。マージ時間が長いと、バックプレッシャーが発生し、チェックポイントに必要な時間が長くなる可能性があります。
compaction.file-size
コンパクションのターゲットファイルサイズ。
MemorySize
いいえ
128 MB
デフォルト値は、ローリングファイルサイズ sink.rolling-policy.file-size と同じです。
sink.partition-commit.trigger
パーティションコミットトリガーのタイプ。
String
いいえ
process-time
パーティションテーブルへの書き込みには、Flink は 2 種類のパーティションコミットトリガーを提供します:
process-time:このトリガーは、パーティションの作成時刻と現在のシステム時刻に基づきます。パーティションタイムエクストラクタもウォーターマークジェネレーターも必要ありません。現在のシステム時刻が、パーティション作成システム時刻と sink.partition-commit.delay の合計を超えると、パーティションはすぐにコミットされます。このトリガーはより一般的ですが、精度は低くなります。たとえば、データ遅延やエラーにより、パーティションが早すぎるタイミングでコミットされる可能性があります。
partition-time:このトリガーは、抽出されたパーティション時刻に基づいており、ウォーターマークの生成が必要です。ジョブはウォーターマーク生成をサポートする必要があり、パーティションは時間 (時間単位や日単位など) で分割されます。ウォーターマークがパーティション作成システム時刻と sink.partition-commit.delay の合計を超えると、パーティションはすぐにコミットされます。
sink.partition-commit.delay
パーティションがコミットされるまでの最大遅延。これは、この遅延が経過するまでパーティションがコミットされないことを示します。
Duration
いいえ
0s
パーティションが日単位で作成される場合は、これを
1 dに設定します。パーティションが時間単位で作成される場合は、これを
1 hに設定します。
sink.partition-commit.watermark-time-zone
LONG 型のウォーターマークを TIMESTAMP 型に解析するために使用されるタイムゾーン。ウォーターマークの結果の TIMESTAMP は、パーティション時刻と比較され、パーティションをコミットすべきかどうかを判断します。
String
いいえ
UTC
このパラメーターは、sink.partition-commit.trigger が partition-time に設定されている場合にのみ有効です。
これが正しく設定されていない場合、たとえば、ソースの rowtime が TIMESTAMP_LTZ 列で定義されているのにこのプロパティが設定されていない場合、パーティションが数時間コミットされないことがあります。デフォルト値の UTC は、ウォーターマークが TIMESTAMP 列で定義されているか、ウォーターマークが定義されていないことを意味します。
ウォーターマークが TIMESTAMP_LTZ 列で定義されている場合、ウォーターマークのタイムゾーンはセッションのタイムゾーンでなければなりません。このプロパティの値は、完全なタイムゾーン名 (例:'America/Los_Angeles') またはカスタムタイムゾーン (例:'GMT-08:00') のいずれかです。
partition.time-extractor.kind
パーティションフィールドから時間を抽出するためのタイムエクストラクタ。
String
いいえ
default
有効な値:
default:デフォルトでは、タイムスタンプパターンまたはフォーマッタを設定できます。
custom:エクストラクタクラスを指定する必要があります。
partition.time-extractor.class
PartitionTimeExtractor インターフェイスを実装するエクストラクタクラス。
String
いいえ
なし
なし。
partition.time-extractor.timestamp-pattern
パーティションフィールドを使用して有効なタイムスタンプパターンを取得できるデフォルトの構築メソッド。
String
いいえ
なし
デフォルトでは、最初のフィールドが
yyyy-MM-dd hh:mm:ssパターンに基づいて抽出されます。パーティションフィールド 'dt' からタイムスタンプを抽出するには、$dt を設定します。
複数のパーティションフィールド (year、month、day、hour など) からタイムスタンプを抽出するには、
$year-$month-$day $hour:00:00を設定します。2 つのパーティションフィールド dt と hour からタイムスタンプを抽出するには、
$dt $hour:00:00を設定します。
partition.time-extractor.timestamp-formatter
パーティションのタイムスタンプ文字列値をタイムスタンプに変換するフォーマッタ。パーティションのタイムスタンプ文字列値は、partition.time-extractor.timestamp-pattern プロパティで表現されます。
String
いいえ
yyyy-MM-dd HH:mm:ss
たとえば、パーティションのタイムスタンプが複数のパーティションフィールド (year、month、day など) から抽出される場合、partition.time-extractor.timestamp-pattern プロパティを
$year$month$dayに、partition.time-extractor.timestamp-formatter プロパティを yyyyMMdd に設定できます。デフォルトのフォーマッタはyyyy-MM-dd HH:mm:ssです。このタイムスタンプフォーマッタは、Java の DateTimeFormatter と互換性があります。sink.partition-commit.policy.kind
パーティションコミットポリシーのタイプ。
String
いいえ
なし
パーティションコミットポリシーは、パーティションが完全に書き込まれ、読み取り準備ができたことを下流のコンシューマーに通知します。有効な値:
success-file:ディレクトリに _success ファイルを追加します。
custom:指定されたクラスを使用してコミットポリシーを作成します。複数のコミットポリシーを同時に指定できます。
sink.partition-commit.policy.class
PartitionCommitPolicy インターフェイスを実装するパーティションコミットポリシークラス。
String
いいえ
なし
このクラスは、カスタムコミットポリシーでのみ使用できます。
sink.partition-commit.success-file.name
success-file パーティションコミットポリシーを使用する場合のファイル名。
String
いいえ
_SUCCESS
なし。
sink.parallelism
外部ファイルシステムにファイルを書き込むための並列度。
Integer
いいえ
なし
デフォルトでは、シンクの並列度は上流の連鎖オペレーターの並列度と同じです。上流の連鎖オペレーターとは異なる並列度を設定した場合、ファイルを書き込むオペレーターは指定されたシンクの並列度を使用します。ファイルコンパクションが有効になっている場合、コンパクションオペレーターも指定されたシンクの並列度を使用します。
説明この値は 0 より大きくなければなりません。そうでない場合、例外がスローされます。
バケット認証情報の設定
バケット認証情報は、リアルタイムコンピューティングエンジン VVR 8.0.6 以降でのみ設定できます。
指定したファイルシステムパスでデータを読み書きするには、[開発コンソール] の [デプロイメント詳細] タブにある [その他の設定] セクションに、次のコードを追加して、バケット認証情報も設定する必要があります。
fs.oss.bucket.<bucketName>.accessKeyId: xxxx
fs.oss.bucket.<bucketName>.accessKeySecret: xxxx次の表にパラメーターを示します。
設定項目 | 説明 |
fs.oss.bucket.<bucketName>.accessKeyId | パラメーターの説明:
|
fs.oss.bucket.<bucketName>.accessKeySecret |
OSS-HDFS への書き込み
まず、[開発コンソール] の [デプロイメント詳細] タブの [その他の構成] セクションに、次の構成を追加します:
fs.oss.jindo.buckets: xxx
fs.oss.jindo.accessKeyId: xxx
fs.oss.jindo.accessKeySecret: xxx次の表にパラメーターを示します。
設定項目 | 説明 |
fs.oss.jindo.buckets | データが書き込まれる OSS-HDFS サービスのバケット名。複数のバケット名をセミコロンで区切って設定できます。Flink が OSS パスに書き込む際、対応するバケットが fs.oss.jindo.buckets に含まれている場合、データは OSS-HDFS サービスに書き込まれます。 |
fs.oss.jindo.accessKeyId | 既存の AccessKey を使用するか、新しい AccessKey を作成します。詳細については、「AccessKey の作成」をご参照ください。 説明 AccessKey Secret の漏洩リスクを低減するため、AccessKey Secret は作成時にのみ表示され、後で表示することはできません。安全に保管してください。 |
fs.oss.jindo.accessKeySecret |
さらに、OSS-HDFS のエンドポイントを設定する必要があります。OSS-HDFS エンドポイントは、次の 2 つの方法のいずれかで設定できます:
パラメーター設定
[Real-time Compute for Apache Flink 開発コンソール] の [デプロイメント詳細] タブで、[ランタイムパラメーター設定] エリアの [追加構成] セクションに、次の構成を追加します:
fs.oss.jindo.endpoint: xxxパス設定
OSS パスで OSS-HDFS エンドポイントを設定します。
oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>user-defined-oss-hdfs-bucket:バケットの名前。oss-hdfs-endpoint:OSS-HDFS サービスのエンドポイント。fs.oss.jindo.buckets設定項目には <user-defined-oss-hdfs-bucket.oss-hdfs-endpoint> を含める必要があります。
たとえば、バケット名が jindo-test で、エンドポイントが cn-beijing.oss-dls.aliyuncs.com の場合、OSS パスは oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir> となり、fs.oss.jindo.buckets 設定項目には jindo-test.cn-beijing.oss-dls.aliyuncs.com を含める必要があります。
# OSS パス
oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>
# その他の設定
fs.oss.jindo.buckets: jindo-test,jindo-test.cn-beijing.oss-dls.aliyuncs.comファイルパスが hdfs://** である外部 HDFS ファイルに書き込む場合、次の設定を追加してアクセス用のユーザー名を指定または変更することもできます。
[開発コンソール] の [デプロイメント詳細] タブの [その他の構成] セクションに、以下の構成を追加します:
containerized.taskmanager.env.HADOOP_USER_NAME: hdfs
containerized.master.env.HADOOP_USER_NAME: hdfs使用例
ソーステーブル
CREATE TEMPORARY TABLE fs_table_source ( `id` INT, `name` VARCHAR ) WITH ( 'connector'='filesystem', 'path'='oss://<bucket>/path', 'format'='parquet' ); CREATE TEMPORARY TABLE blackhole_sink( `id` INT, `name` VARCHAR ) with ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT * FROM fs_table_source ;結果テーブル
パーティションテーブルへの書き込み
CREATE TABLE datagen_source ( user_id STRING, order_amount DOUBLE, ts BIGINT, -- ミリ秒単位の時間 ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3), WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- TIMESTAMP_LTZ 列にウォーターマークを定義 ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE fs_table_sink ( user_id STRING, order_amount DOUBLE, dt STRING, `hour` STRING ) PARTITIONED BY (dt, `hour`) WITH ( 'connector'='filesystem', 'path'='oss://<bucket>/path', 'format'='parquet', 'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00', 'sink.partition-commit.delay'='1 h', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- ユーザーが設定したタイムゾーンが 'Asia/Shanghai' であると仮定 'sink.partition-commit.policy.kind'='success-file' ); -- ファイルシステムテーブルに挿入するストリーミング SQL INSERT INTO fs_table_sink SELECT user_id, order_amount, DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'), DATE_FORMAT(ts_ltz, 'HH') FROM datagen_source;非パーティションテーブルへの書き込み
CREATE TABLE datagen_source ( user_id STRING, order_amount DOUBLE ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE fs_table_sink ( user_id STRING, order_amount DOUBLE ) WITH ( 'connector'='filesystem', 'path'='oss://<bucket>/path', 'format'='parquet' ); INSERT INTO fs_table_sink SELECT * FROM datagen_source;
DataStream API
DataStream API を使用してデータを読み書きするには、対応する DataStream コネクタを使用して Flink に接続する必要があります。詳細については、「DataStream コネクタの使用方法」をご参照ください。
次の例は、DataStream API を使用して OSS および OSS-HDFS に書き込む方法を示しています:
String outputPath = "oss://<bucket>/path"
final StreamingFileSink<Row> sink =
StreamingFileSink.forRowFormat(
new Path(outputPath),
(Encoder<Row>)
(element, stream) -> {
out.println(element.toString());
})
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build();
outputStream.addSink(sink);OSS-HDFS に書き込むには、[Real-time Compute for Apache Flink 開発コンソール] の [デプロイメント詳細] タブにある [実行パラメーター設定] エリアの [その他の設定] フィールドで、関連する OSS-HDFS パラメーターも設定する必要があります。 詳細については、「OSS-HDFS への書き込み」をご参照ください。
関連ドキュメント
Flink がサポートするコネクタの詳細については、「サポートされるコネクタ」をご参照ください。
Tablestore (OTS) コネクタの詳細については、「Tablestore (OTS) コネクタ」をご参照ください。
ストリーミングデータレイク用 Paimon コネクタの詳細については、「ストリーミングデータレイク用 Paimon コネクタ」をご参照ください。