すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Object Storage Service (OSS)

最終更新日:Dec 20, 2025

このトピックでは、Object Storage Service (OSS) コネクタの使用方法について説明します。

Alibaba Cloud の Object Storage Service (OSS) は、高い信頼性、安全性、コスト効率を誇るクラウドストレージサービスで、大容量のストレージを提供します。99.9999999999% (トゥエルブナイン) のデータ耐久性と 99.995% のデータ可用性を実現するように設計されています。複数のストレージクラスから選択して、ストレージコストを最適化できます。

カテゴリ

詳細

サポートされるタイプ

ソーステーブルと結果テーブル

実行モード

バッチモードとストリーミングモード

データ形式

Orc、Parquet、Avro、Csv、JSON、Raw

特定のモニタリングメトリック

なし

API タイプ

Datastream と SQL

結果テーブルのデータの更新または削除

結果テーブルのデータの更新および削除はサポートされていません。データの挿入のみ可能です。

制限事項

  • 全般

    • Ververica Runtime (VVR) 11 以降のバージョンのみが、OSS からの圧縮ファイル (GZIP、BZIP2、XZ、DEFLATE) の読み取りをサポートします。VVR 8 は圧縮ファイルを正しく処理できません。

    • VVR 8.0.6 より前の Flink コンピュートエンジンバージョンでは、同じアカウント内でのみ OSS への読み書きが可能です。他のアカウントの OSS への読み書きを行うには、VVR 8.0.6 以降の Flink コンピュートエンジンを使用し、バケット認証情報を設定する必要があります。詳細については、「バケット認証情報の設定」をご参照ください。

  • 結果テーブルのみ

    • OSS への書き込み時、Avro、CSV、JSON、Raw などの行指向フォーマットはサポートされていません。詳細については、「FLINK-30635」をご参照ください。

構文

CREATE TABLE OssTable (
  column_name1 INT,
  column_name2 STRING,
  ...
  datetime STRING,
  `hour` STRING
) PARTITIONED BY (datetime, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = '...'
);

メタデータ列

ソーステーブルのメタデータ列を読み取ることで、OSS データに関するメタデータを取得できます。たとえば、OSS ソーステーブルでメタデータ列 file.path を定義した場合、この列の値は行データを含むファイルのパスになります。次の例は、メタデータ列の使用方法を示しています。

CREATE TABLE MyUserTableWithFilepath (
  column_name1 INT,
  column_name2 STRING,
  `file.path` STRING NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'json'
)

次の表に、OSS ソーステーブルがサポートするメタデータ列を示します。

キー

データ型

説明

file.path

STRING NOT NULL

行データが格納されているファイルのパス。

file.name

STRING NOT NULL

行データが格納されているファイルの名前。これは、ファイルのルートパスから最も遠い要素です。

file.size

BIGINT NOT NULL

行データが格納されているファイルのサイズ (バイト単位)。

file.modification-time

TIMESTAMP_LTZ(3) NOT NULL

行データが格納されているファイルの変更時刻。

WITH パラメーター

  • 全般

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    備考

    connector

    テーブルのタイプ。

    String

    はい

    なし

    値は filesystem である必要があります。

    path

    ファイルシステムパス。

    String

    はい

    なし

    パスは URI 形式である必要があります。例:oss://my_bucket/my_path

    説明

    VVR 8.0.6 以降では、このパラメーターを設定した後、指定されたファイルシステムパスのデータを読み書きするためにバケット認証情報を設定する必要もあります。詳細については、「バケット認証情報の設定」をご参照ください。

    format

    ファイル形式。

    String

    はい

    なし

    有効な値:

    • csv

    • json

    • avro

    • parquet

    • orc

    • raw

  • ソーステーブルのみ

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    備考

    source.monitor-interval

    新しいファイルをモニタリングする間隔。0 より大きい値を設定する必要があります。

    Duration

    いいえ

    なし

    この設定項目が設定されていない場合、指定されたパスは一度だけスキャンされ、ソースは有界になります。

    各ファイルはパスによって一意に識別されます。新しいファイルが検出されると、一度処理されます。

    処理されたファイルは、ソースのライフサイクル全体を通じてステートに保存されます。したがって、ソースのステートはチェックポイントとセーブポイント中に保存されます。間隔が短いほど、新しいファイルをより速く検出できますが、ファイルシステムまたは OSS の走査頻度も高くなります。

  • 結果テーブルのみ

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    備考

    partition.default-name

    パーティションフィールドの値が NULL または空文字列の場合のパーティション名。

    String

    いいえ

    _DEFAULT_PARTITION__

    なし。

    sink.rolling-policy.file-size

    ローリングされる前のファイルの最大サイズ。

    MemorySize

    いいえ

    128 MB

    ディレクトリに書き込まれたデータは、パートファイルに分割されます。パーティションのデータを受信するシンクの各サブタスクは、そのパーティションに対して少なくとも 1 つのパートファイルを生成します。設定可能なローリングポリシーに基づいて、現在処理中のパートファイルが閉じられ、新しいファイルが作成されます。このポリシーは、サイズとファイルが開いたままでいられる最大タイムアウト期間に基づいてパートファイルをローリングします。

    説明

    列指向フォーマットの場合:

    ファイルが設定されたローリングポリシーを満たしていなくても、チェックポイントが実行されると常にローリングされます。

    したがって、ファイルが設定されたローリングポリシーを満たすか、チェックポイントが実行されると、ファイルは常にローリングされます。

    行指向フォーマットの場合、ファイルは設定されたローリングポリシーを満たした場合にのみローリングされます。

    sink.rolling-policy.rollover-interval

    パートファイルがローリングされる前に開いたままでいられる最大期間。

    Duration

    いいえ

    30min

    チェック頻度は sink.rolling-policy.check-interval プロパティによって制御されます。

    sink.rolling-policy.check-interval

    時間ベースのローリングポリシーのチェック間隔。

    Duration

    いいえ

    1min

    このプロパティは、sink.rolling-policy.rollover-interval プロパティに基づいてファイルをローリングすべきかどうかを判断するためのチェックを制御します。

    auto-compaction

    ストリーミング結果テーブルで自動コンパクション機能を有効にするかどうかを指定します。データはまず一時ファイルに書き込まれます。チェックポイントが完了すると、そのチェックポイントで生成された一時ファイルがマージされます。一時ファイルはマージされるまで表示されません。

    Boolean

    いいえ

    false

    ファイルコンパクションを有効にすると、複数の small ファイルがターゲットファイルサイズに基づいて large ファイルにマージされます。本番環境でファイルコンパクションを使用する場合は、次の点に注意してください:

    • チェックポイント内のファイルのみがマージされます。少なくともチェックポイントと同じ数のファイルが生成されます。

    • ファイルはマージされるまで表示されません。ファイルの可視化時間は チェックポイント間隔 + マージ期間 です。

    • マージ時間が長いと、バックプレッシャーが発生し、チェックポイントに必要な時間が長くなる可能性があります。

    compaction.file-size

    コンパクションのターゲットファイルサイズ。

    MemorySize

    いいえ

    128 MB

    デフォルト値は、ローリングファイルサイズ sink.rolling-policy.file-size と同じです。

    sink.partition-commit.trigger

    パーティションコミットトリガーのタイプ。

    String

    いいえ

    process-time

    パーティションテーブルへの書き込みには、Flink は 2 種類のパーティションコミットトリガーを提供します:

    • process-time:このトリガーは、パーティションの作成時刻と現在のシステム時刻に基づきます。パーティションタイムエクストラクタもウォーターマークジェネレーターも必要ありません。現在のシステム時刻が、パーティション作成システム時刻と sink.partition-commit.delay の合計を超えると、パーティションはすぐにコミットされます。このトリガーはより一般的ですが、精度は低くなります。たとえば、データ遅延やエラーにより、パーティションが早すぎるタイミングでコミットされる可能性があります。

    • partition-time:このトリガーは、抽出されたパーティション時刻に基づいており、ウォーターマークの生成が必要です。ジョブはウォーターマーク生成をサポートする必要があり、パーティションは時間 (時間単位や日単位など) で分割されます。ウォーターマークがパーティション作成システム時刻と sink.partition-commit.delay の合計を超えると、パーティションはすぐにコミットされます。

    sink.partition-commit.delay

    パーティションがコミットされるまでの最大遅延。これは、この遅延が経過するまでパーティションがコミットされないことを示します。

    Duration

    いいえ

    0s

    • パーティションが日単位で作成される場合は、これを 1 d に設定します。

    • パーティションが時間単位で作成される場合は、これを 1 h に設定します。

    sink.partition-commit.watermark-time-zone

    LONG 型のウォーターマークを TIMESTAMP 型に解析するために使用されるタイムゾーン。ウォーターマークの結果の TIMESTAMP は、パーティション時刻と比較され、パーティションをコミットすべきかどうかを判断します。

    String

    いいえ

    UTC

    このパラメーターは、sink.partition-commit.trigger が partition-time に設定されている場合にのみ有効です。

    • これが正しく設定されていない場合、たとえば、ソースの rowtime が TIMESTAMP_LTZ 列で定義されているのにこのプロパティが設定されていない場合、パーティションが数時間コミットされないことがあります。デフォルト値の UTC は、ウォーターマークが TIMESTAMP 列で定義されているか、ウォーターマークが定義されていないことを意味します。

    • ウォーターマークが TIMESTAMP_LTZ 列で定義されている場合、ウォーターマークのタイムゾーンはセッションのタイムゾーンでなければなりません。このプロパティの値は、完全なタイムゾーン名 (例:'America/Los_Angeles') またはカスタムタイムゾーン (例:'GMT-08:00') のいずれかです。

    partition.time-extractor.kind

    パーティションフィールドから時間を抽出するためのタイムエクストラクタ。

    String

    いいえ

    default

    有効な値:

    • default:デフォルトでは、タイムスタンプパターンまたはフォーマッタを設定できます。

    • custom:エクストラクタクラスを指定する必要があります。

    partition.time-extractor.class

    PartitionTimeExtractor インターフェイスを実装するエクストラクタクラス。

    String

    いいえ

    なし

    なし。

    partition.time-extractor.timestamp-pattern

    パーティションフィールドを使用して有効なタイムスタンプパターンを取得できるデフォルトの構築メソッド。

    String

    いいえ

    なし

    デフォルトでは、最初のフィールドが yyyy-MM-dd hh:mm:ss パターンに基づいて抽出されます。

    • パーティションフィールド 'dt' からタイムスタンプを抽出するには、$dt を設定します。

    • 複数のパーティションフィールド (year、month、day、hour など) からタイムスタンプを抽出するには、$year-$month-$day $hour:00:00 を設定します。

    • 2 つのパーティションフィールド dt と hour からタイムスタンプを抽出するには、$dt $hour:00:00 を設定します。

    partition.time-extractor.timestamp-formatter

    パーティションのタイムスタンプ文字列値をタイムスタンプに変換するフォーマッタ。パーティションのタイムスタンプ文字列値は、partition.time-extractor.timestamp-pattern プロパティで表現されます。

    String

    いいえ

    yyyy-MM-dd HH:mm:ss

    たとえば、パーティションのタイムスタンプが複数のパーティションフィールド (year、month、day など) から抽出される場合、partition.time-extractor.timestamp-pattern プロパティを $year$month$day に、partition.time-extractor.timestamp-formatter プロパティを yyyyMMdd に設定できます。デフォルトのフォーマッタは yyyy-MM-dd HH:mm:ss です。このタイムスタンプフォーマッタは、Java の DateTimeFormatter と互換性があります。

    sink.partition-commit.policy.kind

    パーティションコミットポリシーのタイプ。

    String

    いいえ

    なし

    パーティションコミットポリシーは、パーティションが完全に書き込まれ、読み取り準備ができたことを下流のコンシューマーに通知します。有効な値:

    • success-file:ディレクトリに _success ファイルを追加します。

    • custom:指定されたクラスを使用してコミットポリシーを作成します。複数のコミットポリシーを同時に指定できます。

    sink.partition-commit.policy.class

    PartitionCommitPolicy インターフェイスを実装するパーティションコミットポリシークラス。

    String

    いいえ

    なし

    このクラスは、カスタムコミットポリシーでのみ使用できます。

    sink.partition-commit.success-file.name

    success-file パーティションコミットポリシーを使用する場合のファイル名。

    String

    いいえ

    _SUCCESS

    なし。

    sink.parallelism

    外部ファイルシステムにファイルを書き込むための並列度。

    Integer

    いいえ

    なし

    デフォルトでは、シンクの並列度は上流の連鎖オペレーターの並列度と同じです。上流の連鎖オペレーターとは異なる並列度を設定した場合、ファイルを書き込むオペレーターは指定されたシンクの並列度を使用します。ファイルコンパクションが有効になっている場合、コンパクションオペレーターも指定されたシンクの並列度を使用します。

    説明

    この値は 0 より大きくなければなりません。そうでない場合、例外がスローされます。

バケット認証情報の設定

説明

バケット認証情報は、リアルタイムコンピューティングエンジン VVR 8.0.6 以降でのみ設定できます。

指定したファイルシステムパスでデータを読み書きするには、[開発コンソール][デプロイメント詳細] タブにある [その他の設定] セクションに、次のコードを追加して、バケット認証情報も設定する必要があります。

fs.oss.bucket.<bucketName>.accessKeyId: xxxx
fs.oss.bucket.<bucketName>.accessKeySecret: xxxx

次の表にパラメーターを示します。

設定項目

説明

fs.oss.bucket.<bucketName>.accessKeyId

パラメーターの説明:

  • <bucketName>:ファイルシステムパスパラメーターで指定したバケット名に置き換えます。

  • 既存の AccessKey を使用するか、新しい AccessKey を作成します。詳細については、「AccessKey の作成」をご参照ください。

    説明

    AccessKey Secret の漏洩リスクを低減するため、AccessKey Secret は作成時にのみ表示され、後で表示することはできません。安全に保管してください。

fs.oss.bucket.<bucketName>.accessKeySecret

OSS-HDFS への書き込み

まず、[開発コンソール][デプロイメント詳細] タブの [その他の構成] セクションに、次の構成を追加します:

fs.oss.jindo.buckets: xxx
fs.oss.jindo.accessKeyId: xxx
fs.oss.jindo.accessKeySecret: xxx

次の表にパラメーターを示します。

設定項目

説明

fs.oss.jindo.buckets

データが書き込まれる OSS-HDFS サービスのバケット名。複数のバケット名をセミコロンで区切って設定できます。Flink が OSS パスに書き込む際、対応するバケットが fs.oss.jindo.buckets に含まれている場合、データは OSS-HDFS サービスに書き込まれます。

fs.oss.jindo.accessKeyId

既存の AccessKey を使用するか、新しい AccessKey を作成します。詳細については、「AccessKey の作成」をご参照ください。

説明

AccessKey Secret の漏洩リスクを低減するため、AccessKey Secret は作成時にのみ表示され、後で表示することはできません。安全に保管してください。

fs.oss.jindo.accessKeySecret

さらに、OSS-HDFS のエンドポイントを設定する必要があります。OSS-HDFS エンドポイントは、次の 2 つの方法のいずれかで設定できます:

パラメーター設定

[Real-time Compute for Apache Flink 開発コンソール][デプロイメント詳細] タブで、[ランタイムパラメーター設定] エリアの [追加構成] セクションに、次の構成を追加します:

fs.oss.jindo.endpoint: xxx

パス設定

OSS パスで OSS-HDFS エンドポイントを設定します。

oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>
  • user-defined-oss-hdfs-bucket:バケットの名前。

  • oss-hdfs-endpoint:OSS-HDFS サービスのエンドポイント。

  • fs.oss.jindo.buckets 設定項目には <user-defined-oss-hdfs-bucket.oss-hdfs-endpoint> を含める必要があります。

たとえば、バケット名が jindo-test で、エンドポイントが cn-beijing.oss-dls.aliyuncs.com の場合、OSS パスは oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir> となり、fs.oss.jindo.buckets 設定項目には jindo-test.cn-beijing.oss-dls.aliyuncs.com を含める必要があります。

# OSS パス 
oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>
# その他の設定 
fs.oss.jindo.buckets: jindo-test,jindo-test.cn-beijing.oss-dls.aliyuncs.com
説明

ファイルパスが hdfs://** である外部 HDFS ファイルに書き込む場合、次の設定を追加してアクセス用のユーザー名を指定または変更することもできます。

[開発コンソール][デプロイメント詳細] タブの [その他の構成] セクションに、以下の構成を追加します:

containerized.taskmanager.env.HADOOP_USER_NAME: hdfs
containerized.master.env.HADOOP_USER_NAME: hdfs

使用例

  • ソーステーブル

    CREATE TEMPORARY TABLE fs_table_source (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector'='filesystem',
      'path'='oss://<bucket>/path',
      'format'='parquet'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `id` INT,
      `name` VARCHAR
    ) with (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT * FROM fs_table_source ;
  • 結果テーブル

    • パーティションテーブルへの書き込み

      CREATE TABLE datagen_source (
        user_id STRING,
        order_amount DOUBLE,
        ts BIGINT, -- ミリ秒単位の時間
        ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
        WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- TIMESTAMP_LTZ 列にウォーターマークを定義
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE fs_table_sink (
        user_id STRING,
        order_amount DOUBLE,
        dt STRING,
        `hour` STRING
      ) PARTITIONED BY (dt, `hour`) WITH (
        'connector'='filesystem',
        'path'='oss://<bucket>/path',
        'format'='parquet',
        'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
        'sink.partition-commit.delay'='1 h',
        'sink.partition-commit.trigger'='partition-time',
        'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- ユーザーが設定したタイムゾーンが 'Asia/Shanghai' であると仮定
        'sink.partition-commit.policy.kind'='success-file'
      );
      
      -- ファイルシステムテーブルに挿入するストリーミング SQL
      INSERT INTO fs_table_sink 
      SELECT 
        user_id, 
        order_amount, 
        DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
        DATE_FORMAT(ts_ltz, 'HH') 
      FROM datagen_source;
    • 非パーティションテーブルへの書き込み

      CREATE TABLE datagen_source (
        user_id STRING,
        order_amount DOUBLE
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE fs_table_sink (
        user_id STRING,
        order_amount DOUBLE
      ) WITH (
        'connector'='filesystem',
        'path'='oss://<bucket>/path',
        'format'='parquet'
      );
      
      INSERT INTO fs_table_sink SELECT * FROM datagen_source;

DataStream API

重要

DataStream API を使用してデータを読み書きするには、対応する DataStream コネクタを使用して Flink に接続する必要があります。詳細については、「DataStream コネクタの使用方法」をご参照ください。

次の例は、DataStream API を使用して OSS および OSS-HDFS に書き込む方法を示しています:

String outputPath = "oss://<bucket>/path"
final StreamingFileSink<Row> sink =
                StreamingFileSink.forRowFormat(
                                new Path(outputPath),
                                (Encoder<Row>)
                                        (element, stream) -> {
                                            out.println(element.toString());
                                        })
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .build();

outputStream.addSink(sink);

OSS-HDFS に書き込むには、[Real-time Compute for Apache Flink 開発コンソール][デプロイメント詳細] タブにある [実行パラメーター設定] エリアの [その他の設定] フィールドで、関連する OSS-HDFS パラメーターも設定する必要があります。 詳細については、「OSS-HDFS への書き込み」をご参照ください。

関連ドキュメント