すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Object Storage Service (OSS)

最終更新日:Dec 27, 2025

このトピックでは、Object Storage Service (OSS) コネクタの使用方法について説明します。

Alibaba Cloud Object Storage Service (OSS) は、大量のデータを保存するための、安全でコスト効率が高く、信頼性の高いクラウドストレージサービスです。99.9999999999% (トゥエルブナイン) のデータ耐久性と 99.995% のデータ可用性を提供します。OSS は、ストレージコストの最適化に役立つ複数のストレージクラスを提供します。

カテゴリ

詳細

サポートされるタイプ

ソーステーブルと結果テーブル

実行モード

バッチモードとストリームモード

データ形式

Orc、Parquet、Avro、Csv、JSON、Raw

特定のモニタリングメトリック

なし

API タイプ

DataStream と SQL

結果テーブルでのデータの更新または削除のサポート

データの挿入のみが可能です。結果テーブルのデータを更新または削除することはできません。

制限事項

  • 全般

    • Ververica Runtime (VVR) 11 以降のバージョンのみが、GZIP、BZIP2、XZ、DEFLATE などの圧縮ファイルの OSS からの読み取りをサポートします。VVR 8 は圧縮ファイルを正しく処理できません。

    • 8.0.6 より前の VVR バージョンでは、同じアカウントに属する OSS バケットとの間でのデータの読み書きのみがサポートされています。アカウント間でデータにアクセスするには、VVR 8.0.6 以降のバージョンを使用し、バケット認証を設定する必要があります。詳細については、「バケット認証の設定」をご参照ください。

    • 新しいパーティションの増分読み取りはサポートされていません。

  • 結果テーブルのみ

    OSS にデータを書き込む際、Avro、CSV、JSON、Raw などの行ストアフォーマットはサポートされていません。詳細については、「FLINK-30635」をご参照ください。

構文

CREATE TABLE OssTable (
  column_name1 INT,
  column_name2 STRING,
  ...
  datetime STRING,
  `hour` STRING
) PARTITIONED BY (datetime, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = '...'
);

メタデータ列

ソーステーブルのメタデータ列を読み取って、OSS データに関するメタデータを取得できます。たとえば、OSS ソーステーブルでメタデータ列 file.path を定義した場合、この列の値は行を含むファイルのパスになります。次の例は、メタデータ列の使用方法を示しています。

CREATE TABLE MyUserTableWithFilepath (
  column_name1 INT,
  column_name2 STRING,
  `file.path` STRING NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'json'
)

次の表に、OSS ソーステーブルでサポートされているメタデータ列を示します。

キー

データ型

説明

file.path

STRING NOT NULL

行を含むファイルのパス。

file.name

STRING NOT NULL

行を含むファイルの名前。これはファイルパスの最後の要素です。

file.size

BIGINT NOT NULL

行を含むファイルのサイズ (バイト単位)。

file.modification-time

TIMESTAMP_LTZ(3) NOT NULL

行を含むファイルの変更時刻。

WITH パラメーター

  • 全般

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    connector

    テーブルのタイプ。

    String

    はい

    なし

    値は filesystem である必要があります。

    path

    ファイルシステムパス。

    String

    はい

    なし

    パスは oss://my_bucket/my_path のような URI フォーマットである必要があります。

    説明

    VVR 8.0.6 以降では、このパラメーターを設定した後、指定されたファイルシステムパスとの間でデータを読み書きするためにバケット認証も設定する必要があります。詳細については、「バケット認証の設定」をご参照ください。

    format

    ファイル形式。

    String

    はい

    なし

    有効な値:

    • csv

    • json

    • avro

    • parquet

    • orc

    • raw

  • ソーステーブル固有

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    source.monitor-interval

    新しいファイルを監視する間隔。値は 0 より大きい必要があります。

    Duration

    いいえ

    なし

    このパラメーターを設定しない場合、指定されたパスは一度だけスキャンされ、ソースは有界になります。

    各ファイルはそのパスによって識別され、一度だけ処理されます。

    処理されたファイルは、ソースのライフサイクル中、状態に保存されます。状態はチェックポイントとセーブポイント中に保存されます。間隔を短くすると、新しいファイルをより速く検出できますが、ファイルシステムまたはオブジェクトストアのスキャン頻度が高くなります。

  • 結果テーブル固有

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    partition.default-name

    パーティションフィールドが NULL または空文字列の場合のパーティション名。

    String

    いいえ

    _DEFAULT_PARTITION__

    なし。

    sink.rolling-policy.file-size

    ローリングされる前のファイルの最大サイズ。

    MemorySize

    いいえ

    128 MB

    ディレクトリに書き込まれたデータは、パートファイルに分割されます。パーティションのデータを受信するシンクの各サブタスクは、そのパーティションに対して少なくとも 1 つのパートファイルを作成します。ローリングポリシーに基づいて、現在進行中のパートファイルが閉じられ、新しいファイルが作成されます。ポリシーは、パートファイルのサイズと開いたままにできる最大時間に基づいてパートファイルをローリングします。

    説明

    列ストアフォーマットの場合:

    ファイルは、ローリングポリシーの基準を満たしていなくても、チェックポイント中に常にローリングされます。

    ファイルは、ポリシー基準を満たしたとき、またはチェックポイントが発生したときにローリングされます。

    行ストアフォーマットの場合、ファイルはローリングポリシーの基準を満たしたときにのみローリングされます。

    sink.rolling-policy.rollover-interval

    パートファイルがローリングされる前に開いたままにできる最大時間。

    Duration

    いいえ

    30min

    チェック頻度は sink.rolling-policy.check-interval プロパティによって制御されます。

    sink.rolling-policy.check-interval

    時間ベースのローリングポリシーをチェックする間隔。

    Duration

    いいえ

    1min

    このプロパティは、sink.rolling-policy.rollover-interval プロパティに基づいてファイルをローリングすべきかどうかをチェックする頻度を制御します。

    auto-compaction

    ストリーミング結果テーブルの自動コンパクションを有効にするかどうか。データはまず一時ファイルに書き込まれます。チェックポイントが完了すると、そのチェックポイントからの一時ファイルがマージされます。一時ファイルはマージされるまで表示されません。

    Boolean

    いいえ

    false

    ファイルコンパクションを有効にすると、小さいファイルはターゲットファイルサイズに基づいて大きいファイルにマージされます。本番環境でファイルコンパクションを使用する場合は、次の点に注意してください:

    • チェックポイント内のファイルのみがマージされます。各チェックポイントに対して少なくとも 1 つのファイルが生成されます。

    • ファイルはコンパクション前には表示されません。データ可視性遅延は チェックポイント間隔 + コンパクション時間 です。

    • コンパクション時間が長いと、バックプレッシャーが発生し、チェックポイントに必要な時間が長くなる可能性があります。

    compaction.file-size

    コンパクションされたファイルのターゲットサイズ。

    MemorySize

    いいえ

    128 MB

    デフォルト値は、sink.rolling-policy.file-size で指定されたローリングファイルサイズと同じです。

    sink.partition-commit.trigger

    パーティションをコミットするトリガーのタイプ。

    String

    いいえ

    process-time

    パーティションテーブルへの書き込みには、Flink は 2 種類のパーティションコミットトリガーを提供します:

    • process-time:パーティションコミットトリガーは、パーティション作成時間と現在のシステム時間に基づきます。パーティション時間エクストラクターやウォーターマークジェネレーターは必要ありません。現在のシステム時間が、パーティション作成システム時間と sink.partition-commit.delay の値の合計を超えると、パーティションはすぐにコミットされます。このトリガーはより一般的ですが、精度は低くなります。たとえば、データ遅延や障害により、パーティションが早期にコミットされる可能性があります。

    • partition-time:このトリガーは、抽出されたパーティション時間に基づいており、ウォーターマークの生成が必要です。ジョブはウォーターマーク生成をサポートする必要があり、パーティションは時間 (時間単位や日単位など) に基づいて作成されます。ウォーターマークが、パーティション作成システム時間と sink.partition-commit.delay の値の合計を超えると、パーティションはすぐにコミットされます。

    sink.partition-commit.delay

    パーティションがコミットされるまでの最大遅延。これは、この遅延が経過するまでパーティションがコミットされないことを意味します。

    Duration

    いいえ

    0s

    • パーティションが日単位で作成される場合は、これを 1 d に設定できます。

    • パーティションが時間単位で作成される場合は、これを 1 h に設定します。

    sink.partition-commit.watermark-time-zone

    LONG 型のウォーターマークを TIMESTAMP に解析するために使用されるタイムゾーン。結果の TIMESTAMP はパーティション時間と比較され、パーティションをコミットすべきかどうかを判断します。

    String

    いいえ

    UTC

    このパラメーターは、sink.partition-commit.trigger が `partition-time` に設定されている場合にのみ有効です。

    • これが正しく設定されていない場合、たとえば、ソースの行時間が TIMESTAMP_LTZ 列で定義されていて、このプロパティが設定されていない場合、パーティションのコミットが数時間後になることがあります。デフォルト値は UTC であり、これはウォーターマークが TIMESTAMP 列で定義されているか、ウォーターマークが定義されていないことを意味します。

    • ウォーターマークが TIMESTAMP_LTZ 列で定義されている場合、ウォーターマークのタイムゾーンはセッションタイムゾーンである必要があります。このプロパティの有効な値は、完全なタイムゾーン名 (例: 'America/Los_Angeles') またはカスタムタイムゾーン (例: 'GMT-08:00') のいずれかです。

    partition.time-extractor.kind

    パーティションフィールドから時間を抽出する時間エクストラクター。

    String

    いいえ

    default

    有効な値:

    • default:デフォルトでは、タイムスタンプパターンまたはフォーマッタを設定できます。

    • custom:エクストラクタクラスを指定する必要があります。

    partition.time-extractor.class

    PartitionTimeExtractor インターフェイスを実装するエクストラクタクラス。

    String

    いいえ

    なし

    なし。

    partition.time-extractor.timestamp-pattern

    パーティションフィールドを使用して有効なタイムスタンプパターンを取得できるデフォルトの構築メソッド。

    String

    いいえ

    なし

    デフォルトでは、最初のフィールドは yyyy-MM-dd hh:mm:ss パターンを使用して抽出されます。

    • パーティションフィールド 'dt' からタイムスタンプを抽出するには、`$dt` と設定できます。

    • 複数のパーティションフィールド (年、月、日、時など) からタイムスタンプを抽出するには、$year-$month-$day $hour:00:00 と設定できます。

    • 2 つのパーティションフィールド dt と hour からタイムスタンプを抽出するには、$dt $hour:00:00 と設定できます。

    partition.time-extractor.timestamp-formatter

    パーティションのタイムスタンプ文字列値をタイムスタンプに変換するフォーマッタ。パーティションのタイムスタンプ文字列値は、partition.time-extractor.timestamp-pattern プロパティで表現されます。

    String

    いいえ

    yyyy-MM-dd HH:mm:ss

    たとえば、パーティションのタイムスタンプが複数のパーティションフィールド (年、月、日など) から抽出される場合、partition.time-extractor.timestamp-pattern プロパティを $year$month$day に、partition.time-extractor.timestamp-formatter プロパティを `yyyyMMdd` に設定できます。デフォルトのフォーマッターは yyyy-MM-dd HH:mm:ss です。このタイムスタンプフォーマッターは、Java の DateTimeFormatter と互換性があります。

    sink.partition-commit.policy.kind

    パーティションコミットポリシーのタイプ。

    String

    いいえ

    なし

    パーティションコミットポリシーは、パーティションの書き込みが完了し、読み取り準備ができたことをダウンストリームコンシューマーに通知します。有効な値:

    • success-file:ディレクトリに `_success` ファイルを追加します。

    • custom:指定されたクラスを使用してコミットポリシーを作成します。複数のコミットポリシーを同時に指定できます。

    sink.partition-commit.policy.class

    PartitionCommitPolicy インターフェイスを実装するパーティションコミットポリシークラス。

    String

    いいえ

    なし

    このクラスは `custom` コミットポリシーでのみ使用できます。

    sink.partition-commit.success-file.name

    `success-file` パーティションコミットポリシーで使用するファイルの名前。

    String

    いいえ

    _SUCCESS

    なし。

    sink.parallelism

    外部ファイルシステムにファイルを書き込むための並列度。

    Integer

    いいえ

    なし

    デフォルトでは、シンクの並列度はアップストリームのチェインされたオペレーターの並列度と同じです。異なる並列度を設定した場合、ファイル書き込みオペレーターは指定されたシンクの並列度を使用します。ファイルコンパクションが有効な場合、コンパクションオペレーターも指定されたシンクの並列度を使用します。

    説明

    この値は 0 より大きくなければなりません。そうでない場合、例外がスローされます。

バケット認証の設定

説明

VVR 8.0.6 以降のバージョンのみがバケット認証の設定をサポートします。

ファイルシステムパスを指定した後、指定したパスとの間でデータを読み書きするには、バケット認証も設定する必要があります。バケット認証を設定するには、リアルタイムコンピューティング開発コンソール[デプロイメント詳細]ページの[パラメーター]タブの[追加設定]セクションに、次のコードを追加します。

fs.oss.bucket.<bucketName>.accessKeyId: xxxx
fs.oss.bucket.<bucketName>.accessKeySecret: xxxx

次の表にパラメーターを示します。

設定項目

説明

fs.oss.bucket.<bucketName>.accessKeyId

パラメーターの説明:

  • <bucketName>:ファイルシステムパスパラメーターに入力したバケット名に置き換えます。

  • 既存の AccessKey を使用するか、新しい AccessKey を作成します。詳細については、「AccessKey の作成」をご参照ください。

    説明

    AccessKey Secret の漏洩リスクを軽減するため、AccessKey Secret は作成時に一度しか表示されず、後で確認することはできません。安全に保管してください。

fs.oss.bucket.<bucketName>.accessKeySecret

OSS-HDFS への書き込み

まず、リアルタイムコンピューティング開発コンソール[デプロイメント詳細] ページの[パラメーター] タブの[追加設定] セクションに、次の構成を追加します。

fs.oss.jindo.buckets: xxx
fs.oss.jindo.accessKeyId: xxx
fs.oss.jindo.accessKeySecret: xxx

次の表にパラメーターを示します。

設定項目

説明

fs.oss.jindo.buckets

書き込み先の OSS-HDFS サービス内のバケット名。複数のバケットをセミコロンで区切って設定できます。Flink が OSS パスに書き込む際、対応するバケットが `fs.oss.jindo.buckets` に含まれている場合、データは OSS-HDFS サービスに書き込まれます。

fs.oss.jindo.accessKeyId

既存の AccessKey を使用するか、新しい AccessKey を作成します。詳細については、「AccessKey の作成」をご参照ください。

説明

AccessKey Secret の漏洩リスクを軽減するため、AccessKey Secret は作成時に一度しか表示されず、後で確認することはできません。安全に保管してください。

fs.oss.jindo.accessKeySecret

OSS-HDFS エンドポイントも設定する必要があります。OSS-HDFS エンドポイントは 2 つの方法で設定できます:

パラメーター設定

[リアルタイムコンピューティング開発コンソール][デプロイメント詳細] ページの [パラメーター] タブの [追加設定] セクションに、以下の構成を追加できます。

fs.oss.jindo.endpoint: xxx

パス設定

OSS パスで OSS-HDFS エンドポイントを設定できます。

oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>
  • user-defined-oss-hdfs-bucket:バケットの名前。

  • oss-hdfs-endpoint:OSS-HDFS エンドポイント。

  • fs.oss.jindo.buckets 設定項目には <user-defined-oss-hdfs-bucket.oss-hdfs-endpoint> を含める必要があります。

たとえば、バケット名が `jindo-test` でエンドポイントが `cn-beijing.oss-dls.aliyuncs.com` の場合、OSS パスは oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir> である必要があり、fs.oss.jindo.buckets 設定項目には jindo-test.cn-beijing.oss-dls.aliyuncs.com を含める必要があります。

# OSS パス 
oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>
# 追加設定 
fs.oss.jindo.buckets: jindo-test,jindo-test.cn-beijing.oss-dls.aliyuncs.com
説明

外部 HDFS ファイルに書き込む際、ファイルパスが hdfs://** の場合、次の設定を追加してアクセスユーザー名を指定または切り替えることもできます。

リアルタイムコンピューティング開発コンソール[デプロイメント詳細]ページにある[パラメーター]タブの[追加設定]セクションに、以下の構成を追加できます。

containerized.taskmanager.env.HADOOP_USER_NAME: hdfs
containerized.master.env.HADOOP_USER_NAME: hdfs

  • ソーステーブル

    CREATE TEMPORARY TABLE fs_table_source (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector'='filesystem',
      'path'='oss://<bucket>/path',
      'format'='parquet'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `id` INT,
      `name` VARCHAR
    ) with (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT * FROM fs_table_source ;
  • 結果テーブル

    • パーティションテーブルへの書き込み

      CREATE TABLE datagen_source (
        user_id STRING,
        order_amount DOUBLE,
        ts BIGINT, -- ミリ秒単位の時間
        ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
        WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- TIMESTAMP_LTZ 列にウォーターマークを定義
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE fs_table_sink (
        user_id STRING,
        order_amount DOUBLE,
        dt STRING,
        `hour` STRING
      ) PARTITIONED BY (dt, `hour`) WITH (
        'connector'='filesystem',
        'path'='oss://<bucket>/path',
        'format'='parquet',
        'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
        'sink.partition-commit.delay'='1 h',
        'sink.partition-commit.trigger'='partition-time',
        'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- ユーザー設定のタイムゾーンが 'Asia/Shanghai' であると仮定
        'sink.partition-commit.policy.kind'='success-file'
      );
      
      -- ファイルシステムテーブルに挿入するストリーミング SQL
      INSERT INTO fs_table_sink 
      SELECT 
        user_id, 
        order_amount, 
        DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
        DATE_FORMAT(ts_ltz, 'HH') 
      FROM datagen_source;
    • 非パーティションテーブルへの書き込み

      CREATE TABLE datagen_source (
        user_id STRING,
        order_amount DOUBLE
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE fs_table_sink (
        user_id STRING,
        order_amount DOUBLE
      ) WITH (
        'connector'='filesystem',
        'path'='oss://<bucket>/path',
        'format'='parquet'
      );
      
      INSERT INTO fs_table_sink SELECT * FROM datagen_source;

DataStream API

重要

DataStream API を使用してデータを読み書きするには、対応する DataStream コネクタを使用して Flink に接続する必要があります。DataStream コネクタの設定の詳細については、「DataStream コネクタの使用」をご参照ください。

次のコードは、DataStream API を使用して OSS および OSS-HDFS に書き込む方法の例を示しています。

String outputPath = "oss://<bucket>/path"
final StreamingFileSink<Row> sink =
                StreamingFileSink.forRowFormat(
                                new Path(outputPath),
                                (Encoder<Row>)
                                        (element, stream) -> {
                                            out.println(element.toString());
                                        })
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .build();

outputStream.addSink(sink);

OSS-HDFS に書き込むには、[リアルタイムコンピューティング開発コンソール][デプロイメント詳細] ページの [パラメーター] タブにある [追加設定] セクションで、関連する OSS-HDFS パラメーターも設定する必要があります。 詳細については、「OSS-HDFS への書き込み」をご参照ください。

関連ドキュメント