このトピックでは、オブジェクトストレージサービス(OSS)コネクタの使用方法について説明します。
Alibaba Cloud OSS は、99.9999999999%(12個の9)のデータ耐久性と 99.995% のデータ可用性を提供する、安全で費用対効果の高いオブジェクトストレージサービスです。OSS は、ストレージコストを管理および削減するための複数のストレージクラスを提供します。次の表に、OSSコネクタでサポートされている機能を示します。
項目 | 説明 |
テーブルタイプ | ソーステーブルと結果テーブル。 |
実行モード | バッチモードとストリーミングモード。 |
データ形式 | ORC、PARQUET、AVRO、CSV、JSON、および RAW。 説明 Ververica Runtime(VVR)6.0.7 以降を使用する Apache Flink 用 Realtime Compute のみ、Parquet 形式のデータを読み取ることができます。 |
メトリック | なし。 |
APIタイプ | DataStream API と SQL API。 |
結果テーブルでのデータの更新または削除 | 結果テーブルのデータは更新または削除できません。データは結果テーブルにのみ挿入できます。 |
制限事項
一般的な制限事項
VVR 4.0.14 以降を使用する Apache Flink 用 Realtime Compute のみ、OSS からデータを読み取ったり、OSS にデータを書き込んだりできます。
VVR 8.0.6 以前を使用する Apache Flink 用 Realtime Compute のみ、Alibaba Cloud アカウント内の OSS バケットからのみデータを読み取ったり、書き込んだりできます。
説明他の Alibaba Cloud アカウント内の OSS バケットからデータを読み書きする場合は、VVR 8.0.6 以降を使用する Apache Flink 用 Realtime Compute を使用し、OSS バケットの認証情報を構成します。詳細については、このトピックのOSS バケットの認証情報の構成セクションをご参照ください。
結果テーブルのみの制限事項
AVRO、CSV、JSON、RAW などの行指向ストレージ形式のデータは、OSS に書き込むことができません。詳細については、FLINK-30635 をご参照ください。
VVR 6.0.6 以降を使用する Apache Flink 用 Realtime Compute のみ、OSS-HDFS にデータを書き込むことができます。詳細については、このトピックのOSS-HDFS へのデータの書き込みセクションをご参照ください。
構文
CREATE TABLE OssTable (
column_name1 INT,
column_name2 STRING,
...
datetime STRING,
`hour` STRING
) PARTITIONED BY (datetime, `hour`) WITH (
'connector' = 'filesystem',
'path' = 'oss://<bucket>/path',
'format' = '...'
);
メタデータ列
OSS ソーステーブルにメタデータ列を指定して、OSS の関連メタデータを取得できます。たとえば、OSS ソーステーブルに file.path
という名前のメタデータ列を指定すると、列の値は各データ行が格納されているファイルのパスになります。サンプルコード:
CREATE TABLE MyUserTableWithFilepath (
column_name1 INT,
column_name2 STRING,
`file.path` STRING NOT NULL METADATA
) WITH (
'connector' = 'filesystem',
'path' = 'oss://<bucket>/path',
'format' = 'json'
)
次の表に、OSS ソーステーブルでサポートされているメタデータ列を示します。
キー | データ型 | 説明 |
file.path | STRING NOT NULL | 各データ行が格納されているファイルのパス。 |
file.name | STRING NOT NULL | 各データ行が格納されているファイルの名前。ファイル名は、ファイルのルートパスから最も遠い要素です。 |
file.size | BIGINT NOT NULL | 各データ行が格納されているファイルのバイト数。 |
file.modification-time | TIMESTAMP_LTZ(3) NOT NULL | 各データ行が格納されているファイルが変更された時刻。 |
WITH 句のパラメータ
共通パラメータ
パラメータ
説明
データ型
必須
デフォルト値
備考
connector
テーブルのタイプ。
String
はい
デフォルト値なし
値を
filesystem
に設定します。path
ファイルシステムのパス。
String
はい
デフォルト値なし
パスは Uniform Resource Identifier(URI)形式です。例:
oss://my_bucket/my_path
。説明VVR 8.0.6 以降を使用する Apache Flink 用 Realtime Compute でこのパラメータを構成した後、ファイルシステムの指定されたパスからデータを読み取ったり、書き込んだりする OSS バケットの認証情報を構成する必要があります。詳細については、このトピックのOSS バケットの認証情報の構成セクションをご参照ください。
format
ファイルの形式。
String
はい
デフォルト値なし
有効な値:
csv
json
avro
parquet
orc
raw
ソーステーブル専用のパラメータ
パラメータ
説明
データ型
必須
デフォルト値
備考
source.monitor-interval
ソーステーブルが新しいファイルの生成を監視する間隔。このパラメータの値は 0 より大きくする必要があります。
Duration
いいえ
デフォルト値なし
このパラメータを構成しない場合、指定されたパスは 1 回だけスキャンされます。この場合、データソースは有界です。
各ファイルは、ファイルのパスによって一意に識別されます。新しいファイルが検出されるたびに、ファイルが処理されます。
処理されたファイルは、データソースのライフサイクル中に状態に格納されます。したがって、データソースの状態は、チェックポイントとセーブポイントが生成されたときに保存されます。このパラメータを小さい値に設定すると、新しいファイルをすばやく検出できますが、ファイルシステムまたは OSS が頻繁にトラバースされます。
結果テーブル専用のパラメータ
パラメータ
説明
データ型
必須
デフォルト値
備考
partition.default-name
パーティションフィールドの値が null または空の文字列の場合のパーティションの名前。
String
いいえ
_DEFAULT_PARTITION__
該当なし
sink.rolling-policy.file-size
ファイルがロールされる前のファイルの最大サイズ。
MemorySize
いいえ
128 MB
指定されたディレクトリに書き込まれるデータは分割され、PART ファイルに保存されます。パーティションのシンクオペレータがパーティションのデータを受信する各サブタスクは、少なくとも 1 つの PART ファイルを生成します。ファイルは、構成されているローリングポリシーに基づいてロールされます。処理中の現在の PART ファイルを閉じる必要がある場合は、新しい PART ファイルが生成されます。PART ファイルは、ファイルのサイズと、指定されたファイルを開いたままにできる最大期間に基づいてロールされます。
説明ファイルに列指向ストレージが使用されている場合、
ファイルがローリングポリシーの要件を満たしていなくても、チェックポイントの実行時にファイルはロールされます。
この場合、ファイルがローリングポリシーの要件を満たしているか、チェックポイントが実行されている場合、ファイルは常にロールされます。
ファイルに行指向ストレージが使用されている場合、ファイルがローリングポリシーの要件を満たしている場合にのみ、ファイルはロールされます。
sink.rolling-policy.rollover-interval
PART ファイルがロールされる前に開いたままにできる最大期間。
Duration
いいえ
30 分
チェック頻度は、sink.rolling-policy.check-interval パラメータで指定されます。
sink.rolling-policy.check-interval
Apache Flink 用 Realtime Compute が、ローリングポリシーに基づいてファイルをロールする必要があるかどうかをチェックする間隔。
Duration
いいえ
1 分
このパラメータは、sink.rolling-policy.rollover-interval パラメータの値に基づいて、指定されたファイルをロールする必要があるかどうかを指定します。
auto-compaction
ストリーミング結果テーブルの自動ファイルマージ機能を有効にするかどうかを指定します。この機能が有効になっている場合、データは最初に一時ファイルに書き込まれます。チェックポイント操作が完了すると、チェックポイント中に生成された一時ファイルがマージされます。一時ファイルは、マージされるまで表示されません。
Boolean
いいえ
false
自動ファイルマージ機能が有効になっている場合、複数の小さなファイルが、指定された宛先ファイルのサイズに基づいて大きなファイルにマージされます。本番環境でこの機能を使用する場合は、次の点に注意してください。
チェックポイント中に生成されたファイルのみをマージできます。生成されるファイルの数は、チェックポイントの数以上になります。
一時ファイルは、マージされるまで表示されません。
チェックポイント間隔とマージ期間
が経過すると、ファイルが表示されます。マージ期間が過度に長い場合、バックプレッシャーが発生する可能性があります。これにより、チェックポイントに必要な時間が長くなります。
compaction.file-size
一時ファイルがマージされる宛先ファイルのサイズ。
MemorySize
いいえ
128 MB
デフォルト値は、sink.rolling-policy.file-size パラメータの値と同じです。
sink.partition-commit.trigger
パーティションコミットトリガーのタイプ。
String
いいえ
process-time
Apache Flink 用 Realtime Compute は、パーティションテーブルにデータを書き込むために使用できる次のタイプのパーティションコミットトリガーを提供します。
process-time:このパーティションコミットトリガーは、パーティションの作成時刻と現在のシステム時刻に基づいて使用され、パーティション時間エクストラクタまたはウォーターマークジェネレータを必要としません。現在のシステム時刻がパーティションの作成時刻と sink.partition-commit.delay パラメータで指定された時間の合計を超えると、process-time トリガーはすぐにパーティションをコミットします。このトリガーは一般的な目的に適していますが、正確性は保証されません。たとえば、データの遅延または障害が発生した場合、このトリガーは予想よりも早くパーティションをコミットする可能性があります。
partition-time:このパーティションコミットトリガーは、抽出されたパーティション作成時刻に基づいて使用され、ウォーターマークジェネレータが必要です。デプロイメントにこのトリガーを使用する場合は、デプロイメントがウォーターマーク生成をサポートしており、データが時間(時または日など)でパーティション分割されていることを確認してください。ウォーターマークで指定された時刻がパーティションの作成時刻と sink.partition-commit.delay パラメータで指定された時間の合計を超えると、partition-time トリガーはすぐにパーティションをコミットします。
sink.partition-commit.delay
パーティションをコミットできるようになる前に許容される最大遅延。遅延時間に達するまで、パーティションはコミットされません。
Duration
いいえ
0 秒
データが日でパーティション分割されている場合、このパラメータを
1 d
に設定する必要があります。データが時間でパーティション分割されている場合、このパラメータを
1 h
に設定する必要があります。
sink.partition-commit.watermark-time-zone
LONG データ型のウォーターマークを TIMESTAMP データ型のウォーターマークに変換するときに使用されるタイムゾーン。変換後、Apache Flink 用 Realtime Compute は、TIMESTAMP データ型のウォーターマークをパーティション作成時刻と比較して、パーティションをコミットする必要があるかどうかを判断します。
String
いいえ
UTC
このパラメータは、sink.partition-commit.trigger パラメータが partition-time に設定されている場合にのみ有効になります。
sink.partition-commit.watermark-time-zone パラメータの構成は、パーティションのコミットメントに影響します。たとえば、ソーステーブルの TIMESTAMP_LTZ 列に ROWTIME を指定し、このパラメータを構成しない場合、パーティションは予想時刻よりも数時間遅れてコミットされる可能性があります。デフォルト値の UTC は、ウォーターマークが TIMESTAMP 列で定義されているか、ウォーターマークが定義されていないことを示します。
ウォーターマークが TIMESTAMP_LTZ 列で定義されている場合、ウォーターマークのタイムゾーンはセッションタイムゾーンである必要があります。このパラメータの有効な値は、「America/Los_Angeles」などのタイムゾーンのフルネーム、または「GMT-08:00」などのカスタムタイムゾーンです。
partition.time-extractor.kind
パーティションフィールドから時間を抽出する時間エクストラクタ。
String
いいえ
default
有効な値:
default:デフォルトでは、タイムスタンプパターンまたはフォーマッタを構成できます。これはデフォルト値です。
custom:エクストラクタクラスを指定する必要があります。
partition.time-extractor.class
PartitionTimeExtractor インターフェースを実装するエクストラクタクラス。
String
いいえ
デフォルト値なし
該当なし
partition.time-extractor.timestamp-pattern
パーティションフィールドを使用して有効なタイムスタンプパターンを取得できるデフォルトの構築メソッド。
String
いいえ
デフォルト値なし
デフォルトでは、最初のフィールドは
yyyy-MM-dd hh:mm:ss
形式で抽出されます。dt パーティションフィールドからパーティションのタイムスタンプを抽出する場合は、このパラメータを $dt に設定できます。
年、月、日、時間など、複数のパーティションフィールドからパーティションのタイムスタンプを抽出する場合は、このパラメータを
$year-$month-$day $hour:00:00
に設定できます。dt および hour パーティションフィールドからパーティションのタイムスタンプを抽出する場合は、このパラメータを
$dt $hour:00:00
に設定できます。
partition.time-extractor.timestamp-formatter
パーティションタイムスタンプ文字列値をタイムスタンプに変換するために使用されるフォーマッタ。パーティションタイムスタンプ文字列値は、partition.time-extractor.timestamp-pattern パラメータで指定されます。
String
いいえ
yyyy-MM-dd HH:mm:ss
たとえば、年、月、日など、複数のパーティションフィールドからパーティションのタイムスタンプを抽出する場合は、partition.time-extractor.timestamp-pattern パラメータを
$year$month$day
に、partition.time-extractor.timestamp-formatter パラメータを yyyyMMdd に設定できます。このパラメータのデフォルト値はyyyy-MM-dd HH:mm:ss
です。このパラメータで指定されたタイムスタンプフォーマッタは、Java の DateTimeFormatter と互換性があります。sink.partition-commit.policy.kind
パーティションコミットポリシーのタイプ。
String
いいえ
デフォルト値なし
パーティションコミットポリシーにより、Apache Flink 用 Realtime Compute は、パーティションへのデータ書き込みが完了し、パーティションからデータを読み取ることができることをダウンストリームアプリケーションに通知できます。有効な値:
success-file:_success ファイルが指定されたディレクトリに追加されます。
custom:指定されたクラスに基づいてコミットポリシーが作成されます。複数のコミットポリシーを同時に指定できます。
sink.partition-commit.policy.class
PartitionCommitPolicy インターフェースを実装するパーティションコミットポリシークラス。
String
いいえ
デフォルト値なし
このクラスは、sink.partition-commit.policy.kind パラメータが custom に設定されている場合にのみ使用できます。
sink.partition-commit.success-file.name
sink.partition-commit.policy.kind パラメータが success-file に設定されている場合に使用されるファイルの名前。
String
いいえ
_SUCCESS
該当なし
sink.parallelism
ファイルシステムへのファイル書き込みの並列度。
Integer
いいえ
デフォルト値なし
デフォルトでは、sink.parallelism パラメータの値は、アップストリームチェーンオペレータの並列度と同じです。sink.parallelism パラメータの値がアップストリームチェーンオペレータの並列度と異なる場合、ファイルを書くオペレータはこのパラメータの値を使用します。ファイルマージが有効になっている場合、ファイルをマージするオペレータもこのパラメータの値を使用します。
説明値は 0 より大きくする必要があります。そうでない場合、エラーが発生します。
OSSバケットの認証情報を構成する
VVR 8.0.6 以降を使用する Apache Flink 用 Realtime Compute のみ、OSS バケットの認証情報を構成できます。
ファイルシステムのパスを指定した後、OSS バケットの認証情報を構成する必要があります。これにより、ファイルシステムの指定されたパスからデータを読み取ったり、書き込んだりできます。OSS バケットの認証情報を構成するには、次の手順を実行します。 Apache Flink 用 Realtime Compute の開発コンソールにログオンします。[構成] タブの [デプロイメント] ページで、[パラメータ] セクションの右上隅にある [編集] をクリックし、[その他の構成] フィールドに次の構成を追加します。
fs.oss.bucket.<bucketName>.accessKeyId: xxxx
fs.oss.bucket.<bucketName>.accessKeySecret: xxxx
次の表に、上記の構成のパラメータを示します。
パラメータ | 説明 |
fs.oss.bucket.<bucketName>.accessKeyId | パラメータ:
|
fs.oss.bucket.<bucketName>.accessKeySecret |
OSS-HDFS へのデータの書き込み
Apache Flink 用 Realtime Compute の開発コンソールにログオンします。[構成] タブの [デプロイメント] ページで、[パラメータ] セクションの右上隅にある [編集] をクリックし、[その他の構成] フィールドに次の構成を追加します。
fs.oss.jindo.buckets: xxx
fs.oss.jindo.accessKeyId: xxx
fs.oss.jindo.accessKeySecret: xxx
次の表に、上記の構成のパラメータを示します。
パラメータ | 説明 |
fs.oss.jindo.buckets | データが書き込まれる OSS-HDFS サービスのバケットの名前。複数のバケット名を指定できます。バケット名はセミコロン(;)で区切ります。Flink が OSS パスにデータを書き込むとき、関連するバケット名が fs.oss.jindo.buckets パラメータの値に含まれている場合、データは OSS-HDFS サービスに書き込まれます。 |
fs.oss.jindo.accessKeyId | Alibaba Cloud アカウントの AccessKey ID。Alibaba Cloud アカウントの AccessKey シークレットの取得方法の詳細については、RAM ユーザーの AccessKey ペアに関する情報を表示するをご参照ください。 |
fs.oss.jindo.accessKeySecret | Alibaba Cloud アカウントの AccessKey シークレット。Alibaba Cloud アカウントの AccessKey シークレットの取得方法の詳細については、RAM ユーザーの AccessKey ペアに関する情報を表示するをご参照ください。 |
また、OSS-HDFS サービスのエンドポイントも構成する必要があります。次のいずれかの方法を使用して、OSS-HDFS サービスのエンドポイントを構成できます。
[構成] タブの Apache Flink 用 Realtime Compute の開発コンソールRealtime Compute for Apache Flink の開発コンソールで、[パラメータ] セクションの右上隅にある [編集] をクリックし、[その他の構成] フィールドに次の構成を追加します。
fs.oss.jindo.endpoint: xxx
OSS パスで OSS-HDFS サービスのエンドポイントを構成します。
サンプルコード:
oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>
OSSパスでは、user-defined-oss-hdfs-bucket は関連バケットの名前を指定し、oss-hdfs-endpoint は OSS-HDFS サービスのエンドポイントを指定します。 fs.oss.jindo.buckets パラメーターの値には、<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint> が含まれている必要があります。
たとえば、バケット名が jindo-test で、OSS-HDFS サービスのエンドポイントが
たとえば、バケット名が jindo-test で、OSS-HDFS サービスのエンドポイントが cn-beijing.oss-dls.aliyuncs.com の場合、OSS パスは oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir> である必要があり、fs.oss.jindo.buckets パラメータの値には jindo-test.cn-beijing.oss-dls.aliyuncs.com が含まれている必要があります。
サンプルコード
ソーステーブルのサンプルコード
CREATE TEMPORARY TABLE fs_table_source ( `id` INT, `name` VARCHAR ) WITH ( 'connector'='filesystem', 'path'='oss://<bucket>/path', 'format'='parquet' ); CREATE TEMPORARY TABLE blackhole_sink( `id` INT, `name` VARCHAR ) with ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT * FROM fs_table_source ;
結果テーブルのサンプルコード
パーティションテーブルへのデータの書き込み
CREATE TABLE datagen_source ( user_id STRING, order_amount DOUBLE, ts BIGINT, -- ミリ秒単位の時間を使用します。 ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3), WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- TIMESTAMP_LTZ 列にウォーターマークを指定します。 ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE fs_table_sink ( user_id STRING, order_amount DOUBLE, dt STRING, `hour` STRING ) PARTITIONED BY (dt, `hour`) WITH ( 'connector'='filesystem', 'path'='oss://<bucket>/path', 'format'='parquet', 'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00', 'sink.partition-commit.delay'='1 h', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- タイムゾーンは「Asia/Shanghai」です。 'sink.partition-commit.policy.kind'='success-file' ); -- 次のストリーミング SQL ステートメントを実行して、ファイルシステムテーブルにデータを挿入します。 INSERT INTO fs_table_sink SELECT user_id, order_amount, DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'), DATE_FORMAT(ts_ltz, 'HH') FROM datagen_source;
非パーティションテーブルへのデータの書き込み
CREATE TABLE datagen_source ( user_id STRING, order_amount DOUBLE ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE fs_table_sink ( user_id STRING, order_amount DOUBLE ) WITH ( 'connector'='filesystem', 'path'='oss://<bucket>/path', 'format'='parquet' ); INSERT INTO fs_table_sink SELECT * FROM datagen_source;
DataStream API
DataStream API を呼び出してデータを読み書きする場合は、関連タイプの DataStream コネクタを使用して Apache Flink 用 Realtime Compute に接続する必要があります。DataStream コネクタの構成方法の詳細については、DataStream コネクタの設定をご参照ください。
次のサンプルコードは、DataStream API を使用して OSS または OSS-HDFS にデータを書き込む方法を示しています。
String outputPath = "oss://<bucket>/path"
final StreamingFileSink<Row> sink =
StreamingFileSink.forRowFormat(
new Path(outputPath),
(Encoder<Row>)
(element, stream) -> {
out.println(element.toString());
})
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build();
outputStream.addSink(sink);
OSS-HDFS にデータを書き込む場合は、上記のコードを実行し、次の手順を実行します。 Apache Flink 用 Realtime Compute の開発コンソールRealtime Compute for Apache Flink の開発コンソールにログオンします。[構成] タブの [デプロイメント] ページで、[パラメータ] セクションの右上隅にある [編集] をクリックし、OSS-HDFS 関連の構成を [その他の構成] フィールドに追加します。詳細については、このトピックのOSS-HDFS へのデータの書き込みセクションをご参照ください。
参考資料
Apache Flink 用 Realtime Compute でサポートされているコネクタの詳細については、サポートされているコネクタをご参照ください。
Tablestore コネクタの使用方法の詳細については、Tablestore コネクタをご参照ください。
Apache Paimon コネクタの使用方法の詳細については、Apache Paimon コネクタをご参照ください。