すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:OSSコネクタ

最終更新日:Jan 07, 2025

このトピックでは、オブジェクトストレージサービス(OSS)コネクタの使用方法について説明します。

Alibaba Cloud OSS は、99.9999999999%(12個の9)のデータ耐久性と 99.995% のデータ可用性を提供する、安全で費用対効果の高いオブジェクトストレージサービスです。OSS は、ストレージコストを管理および削減するための複数のストレージクラスを提供します。次の表に、OSSコネクタでサポートされている機能を示します。

項目

説明

テーブルタイプ

ソーステーブルと結果テーブル。

実行モード

バッチモードとストリーミングモード。

データ形式

ORC、PARQUET、AVRO、CSV、JSON、および RAW。

説明

Ververica Runtime(VVR)6.0.7 以降を使用する Apache Flink 用 Realtime Compute のみ、Parquet 形式のデータを読み取ることができます。

メトリック

なし。

APIタイプ

DataStream API と SQL API。

結果テーブルでのデータの更新または削除

結果テーブルのデータは更新または削除できません。データは結果テーブルにのみ挿入できます。

制限事項

  • 一般的な制限事項

    • VVR 4.0.14 以降を使用する Apache Flink 用 Realtime Compute のみ、OSS からデータを読み取ったり、OSS にデータを書き込んだりできます。

    • VVR 8.0.6 以前を使用する Apache Flink 用 Realtime Compute のみ、Alibaba Cloud アカウント内の OSS バケットからのみデータを読み取ったり、書き込んだりできます。

      説明

      他の Alibaba Cloud アカウント内の OSS バケットからデータを読み書きする場合は、VVR 8.0.6 以降を使用する Apache Flink 用 Realtime Compute を使用し、OSS バケットの認証情報を構成します。詳細については、このトピックのOSS バケットの認証情報の構成セクションをご参照ください。

  • 結果テーブルのみの制限事項

    • AVRO、CSV、JSON、RAW などの行指向ストレージ形式のデータは、OSS に書き込むことができません。詳細については、FLINK-30635 をご参照ください。

    • VVR 6.0.6 以降を使用する Apache Flink 用 Realtime Compute のみ、OSS-HDFS にデータを書き込むことができます。詳細については、このトピックのOSS-HDFS へのデータの書き込みセクションをご参照ください。

構文

CREATE TABLE OssTable (
  column_name1 INT,
  column_name2 STRING,
  ...
  datetime STRING,
  `hour` STRING
) PARTITIONED BY (datetime, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = '...'
);

メタデータ列

OSS ソーステーブルにメタデータ列を指定して、OSS の関連メタデータを取得できます。たとえば、OSS ソーステーブルに file.path という名前のメタデータ列を指定すると、列の値は各データ行が格納されているファイルのパスになります。サンプルコード:

CREATE TABLE MyUserTableWithFilepath (
  column_name1 INT,
  column_name2 STRING,
  `file.path` STRING NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'json'
)

次の表に、OSS ソーステーブルでサポートされているメタデータ列を示します。

キー

データ型

説明

file.path

STRING NOT NULL

各データ行が格納されているファイルのパス。

file.name

STRING NOT NULL

各データ行が格納されているファイルの名前。ファイル名は、ファイルのルートパスから最も遠い要素です。

file.size

BIGINT NOT NULL

各データ行が格納されているファイルのバイト数。

file.modification-time

TIMESTAMP_LTZ(3) NOT NULL

各データ行が格納されているファイルが変更された時刻。

WITH 句のパラメータ

  • 共通パラメータ

    パラメータ

    説明

    データ型

    必須

    デフォルト値

    備考

    connector

    テーブルのタイプ。

    String

    はい

    デフォルト値なし

    値を filesystem に設定します。

    path

    ファイルシステムのパス。

    String

    はい

    デフォルト値なし

    パスは Uniform Resource Identifier(URI)形式です。例:oss://my_bucket/my_path

    説明

    VVR 8.0.6 以降を使用する Apache Flink 用 Realtime Compute でこのパラメータを構成した後、ファイルシステムの指定されたパスからデータを読み取ったり、書き込んだりする OSS バケットの認証情報を構成する必要があります。詳細については、このトピックのOSS バケットの認証情報の構成セクションをご参照ください。

    format

    ファイルの形式。

    String

    はい

    デフォルト値なし

    有効な値:

    • csv

    • json

    • avro

    • parquet

    • orc

    • raw

  • ソーステーブル専用のパラメータ

    パラメータ

    説明

    データ型

    必須

    デフォルト値

    備考

    source.monitor-interval

    ソーステーブルが新しいファイルの生成を監視する間隔。このパラメータの値は 0 より大きくする必要があります。

    Duration

    いいえ

    デフォルト値なし

    このパラメータを構成しない場合、指定されたパスは 1 回だけスキャンされます。この場合、データソースは有界です。

    各ファイルは、ファイルのパスによって一意に識別されます。新しいファイルが検出されるたびに、ファイルが処理されます。

    処理されたファイルは、データソースのライフサイクル中に状態に格納されます。したがって、データソースの状態は、チェックポイントとセーブポイントが生成されたときに保存されます。このパラメータを小さい値に設定すると、新しいファイルをすばやく検出できますが、ファイルシステムまたは OSS が頻繁にトラバースされます。

  • 結果テーブル専用のパラメータ

    パラメータ

    説明

    データ型

    必須

    デフォルト値

    備考

    partition.default-name

    パーティションフィールドの値が null または空の文字列の場合のパーティションの名前。

    String

    いいえ

    _DEFAULT_PARTITION__

    該当なし

    sink.rolling-policy.file-size

    ファイルがロールされる前のファイルの最大サイズ。

    MemorySize

    いいえ

    128 MB

    指定されたディレクトリに書き込まれるデータは分割され、PART ファイルに保存されます。パーティションのシンクオペレータがパーティションのデータを受信する各サブタスクは、少なくとも 1 つの PART ファイルを生成します。ファイルは、構成されているローリングポリシーに基づいてロールされます。処理中の現在の PART ファイルを閉じる必要がある場合は、新しい PART ファイルが生成されます。PART ファイルは、ファイルのサイズと、指定されたファイルを開いたままにできる最大期間に基づいてロールされます。

    説明

    ファイルに列指向ストレージが使用されている場合、

    ファイルがローリングポリシーの要件を満たしていなくても、チェックポイントの実行時にファイルはロールされます。

    この場合、ファイルがローリングポリシーの要件を満たしているか、チェックポイントが実行されている場合、ファイルは常にロールされます。

    ファイルに行指向ストレージが使用されている場合、ファイルがローリングポリシーの要件を満たしている場合にのみ、ファイルはロールされます。

    sink.rolling-policy.rollover-interval

    PART ファイルがロールされる前に開いたままにできる最大期間。

    Duration

    いいえ

    30 分

    チェック頻度は、sink.rolling-policy.check-interval パラメータで指定されます。

    sink.rolling-policy.check-interval

    Apache Flink 用 Realtime Compute が、ローリングポリシーに基づいてファイルをロールする必要があるかどうかをチェックする間隔。

    Duration

    いいえ

    1 分

    このパラメータは、sink.rolling-policy.rollover-interval パラメータの値に基づいて、指定されたファイルをロールする必要があるかどうかを指定します。

    auto-compaction

    ストリーミング結果テーブルの自動ファイルマージ機能を有効にするかどうかを指定します。この機能が有効になっている場合、データは最初に一時ファイルに書き込まれます。チェックポイント操作が完了すると、チェックポイント中に生成された一時ファイルがマージされます。一時ファイルは、マージされるまで表示されません。

    Boolean

    いいえ

    false

    自動ファイルマージ機能が有効になっている場合、複数の小さなファイルが、指定された宛先ファイルのサイズに基づいて大きなファイルにマージされます。本番環境でこの機能を使用する場合は、次の点に注意してください。

    • チェックポイント中に生成されたファイルのみをマージできます。生成されるファイルの数は、チェックポイントの数以上になります。

    • 一時ファイルは、マージされるまで表示されません。チェックポイント間隔とマージ期間が経過すると、ファイルが表示されます。

    • マージ期間が過度に長い場合、バックプレッシャーが発生する可能性があります。これにより、チェックポイントに必要な時間が長くなります。

    compaction.file-size

    一時ファイルがマージされる宛先ファイルのサイズ。

    MemorySize

    いいえ

    128 MB

    デフォルト値は、sink.rolling-policy.file-size パラメータの値と同じです。

    sink.partition-commit.trigger

    パーティションコミットトリガーのタイプ。

    String

    いいえ

    process-time

    Apache Flink 用 Realtime Compute は、パーティションテーブルにデータを書き込むために使用できる次のタイプのパーティションコミットトリガーを提供します。

    • process-time:このパーティションコミットトリガーは、パーティションの作成時刻と現在のシステム時刻に基づいて使用され、パーティション時間エクストラクタまたはウォーターマークジェネレータを必要としません。現在のシステム時刻がパーティションの作成時刻と sink.partition-commit.delay パラメータで指定された時間の合計を超えると、process-time トリガーはすぐにパーティションをコミットします。このトリガーは一般的な目的に適していますが、正確性は保証されません。たとえば、データの遅延または障害が発生した場合、このトリガーは予想よりも早くパーティションをコミットする可能性があります。

    • partition-time:このパーティションコミットトリガーは、抽出されたパーティション作成時刻に基づいて使用され、ウォーターマークジェネレータが必要です。デプロイメントにこのトリガーを使用する場合は、デプロイメントがウォーターマーク生成をサポートしており、データが時間(時または日など)でパーティション分割されていることを確認してください。ウォーターマークで指定された時刻がパーティションの作成時刻と sink.partition-commit.delay パラメータで指定された時間の合計を超えると、partition-time トリガーはすぐにパーティションをコミットします。

    sink.partition-commit.delay

    パーティションをコミットできるようになる前に許容される最大遅延。遅延時間に達するまで、パーティションはコミットされません。

    Duration

    いいえ

    0 秒

    • データが日でパーティション分割されている場合、このパラメータを 1 d に設定する必要があります。

    • データが時間でパーティション分割されている場合、このパラメータを 1 h に設定する必要があります。

    sink.partition-commit.watermark-time-zone

    LONG データ型のウォーターマークを TIMESTAMP データ型のウォーターマークに変換するときに使用されるタイムゾーン。変換後、Apache Flink 用 Realtime Compute は、TIMESTAMP データ型のウォーターマークをパーティション作成時刻と比較して、パーティションをコミットする必要があるかどうかを判断します。

    String

    いいえ

    UTC

    このパラメータは、sink.partition-commit.trigger パラメータが partition-time に設定されている場合にのみ有効になります。

    • sink.partition-commit.watermark-time-zone パラメータの構成は、パーティションのコミットメントに影響します。たとえば、ソーステーブルの TIMESTAMP_LTZ 列に ROWTIME を指定し、このパラメータを構成しない場合、パーティションは予想時刻よりも数時間遅れてコミットされる可能性があります。デフォルト値の UTC は、ウォーターマークが TIMESTAMP 列で定義されているか、ウォーターマークが定義されていないことを示します。

    • ウォーターマークが TIMESTAMP_LTZ 列で定義されている場合、ウォーターマークのタイムゾーンはセッションタイムゾーンである必要があります。このパラメータの有効な値は、「America/Los_Angeles」などのタイムゾーンのフルネーム、または「GMT-08:00」などのカスタムタイムゾーンです。

    partition.time-extractor.kind

    パーティションフィールドから時間を抽出する時間エクストラクタ。

    String

    いいえ

    default

    有効な値:

    • default:デフォルトでは、タイムスタンプパターンまたはフォーマッタを構成できます。これはデフォルト値です。

    • custom:エクストラクタクラスを指定する必要があります。

    partition.time-extractor.class

    PartitionTimeExtractor インターフェースを実装するエクストラクタクラス。

    String

    いいえ

    デフォルト値なし

    該当なし

    partition.time-extractor.timestamp-pattern

    パーティションフィールドを使用して有効なタイムスタンプパターンを取得できるデフォルトの構築メソッド。

    String

    いいえ

    デフォルト値なし

    デフォルトでは、最初のフィールドは yyyy-MM-dd hh:mm:ss 形式で抽出されます。

    • dt パーティションフィールドからパーティションのタイムスタンプを抽出する場合は、このパラメータを $dt に設定できます。

    • 年、月、日、時間など、複数のパーティションフィールドからパーティションのタイムスタンプを抽出する場合は、このパラメータを $year-$month-$day $hour:00:00 に設定できます。

    • dt および hour パーティションフィールドからパーティションのタイムスタンプを抽出する場合は、このパラメータを $dt $hour:00:00 に設定できます。

    partition.time-extractor.timestamp-formatter

    パーティションタイムスタンプ文字列値をタイムスタンプに変換するために使用されるフォーマッタ。パーティションタイムスタンプ文字列値は、partition.time-extractor.timestamp-pattern パラメータで指定されます。

    String

    いいえ

    yyyy-MM-dd HH:mm:ss

    たとえば、年、月、日など、複数のパーティションフィールドからパーティションのタイムスタンプを抽出する場合は、partition.time-extractor.timestamp-pattern パラメータを $year$month$day に、partition.time-extractor.timestamp-formatter パラメータを yyyyMMdd に設定できます。このパラメータのデフォルト値は yyyy-MM-dd HH:mm:ss です。このパラメータで指定されたタイムスタンプフォーマッタは、Java の DateTimeFormatter と互換性があります。

    sink.partition-commit.policy.kind

    パーティションコミットポリシーのタイプ。

    String

    いいえ

    デフォルト値なし

    パーティションコミットポリシーにより、Apache Flink 用 Realtime Compute は、パーティションへのデータ書き込みが完了し、パーティションからデータを読み取ることができることをダウンストリームアプリケーションに通知できます。有効な値:

    • success-file:_success ファイルが指定されたディレクトリに追加されます。

    • custom:指定されたクラスに基づいてコミットポリシーが作成されます。複数のコミットポリシーを同時に指定できます。

    sink.partition-commit.policy.class

    PartitionCommitPolicy インターフェースを実装するパーティションコミットポリシークラス。

    String

    いいえ

    デフォルト値なし

    このクラスは、sink.partition-commit.policy.kind パラメータが custom に設定されている場合にのみ使用できます。

    sink.partition-commit.success-file.name

    sink.partition-commit.policy.kind パラメータが success-file に設定されている場合に使用されるファイルの名前。

    String

    いいえ

    _SUCCESS

    該当なし

    sink.parallelism

    ファイルシステムへのファイル書き込みの並列度。

    Integer

    いいえ

    デフォルト値なし

    デフォルトでは、sink.parallelism パラメータの値は、アップストリームチェーンオペレータの並列度と同じです。sink.parallelism パラメータの値がアップストリームチェーンオペレータの並列度と異なる場合、ファイルを書くオペレータはこのパラメータの値を使用します。ファイルマージが有効になっている場合、ファイルをマージするオペレータもこのパラメータの値を使用します。

    説明

    値は 0 より大きくする必要があります。そうでない場合、エラーが発生します。

OSSバケットの認証情報を構成する

説明

VVR 8.0.6 以降を使用する Apache Flink 用 Realtime Compute のみ、OSS バケットの認証情報を構成できます。

ファイルシステムのパスを指定した後、OSS バケットの認証情報を構成する必要があります。これにより、ファイルシステムの指定されたパスからデータを読み取ったり、書き込んだりできます。OSS バケットの認証情報を構成するには、次の手順を実行します。 Apache Flink 用 Realtime Compute の開発コンソールにログオンします。[構成] タブの [デプロイメント] ページで、[パラメータ] セクションの右上隅にある [編集] をクリックし、[その他の構成] フィールドに次の構成を追加します。

fs.oss.bucket.<bucketName>.accessKeyId: xxxx
fs.oss.bucket.<bucketName>.accessKeySecret: xxxx

次の表に、上記の構成のパラメータを示します。

パラメータ

説明

fs.oss.bucket.<bucketName>.accessKeyId

パラメータ:

  • <bucketName>:このパラメータを、宛先ファイルシステムの URI で指定した OSS バケットの名前に置き換えます。

  • accessKeyId:OSS バケットにアクセスするために使用される Alibaba Cloud アカウントの AccessKey ID を入力します。Alibaba Cloud アカウントの AccessKey ID の取得方法の詳細については、RAM ユーザーの AccessKey ペアに関する情報を表示するをご参照ください。

  • accessKeySecret:OSS バケットにアクセスするために使用される Alibaba Cloud アカウントの AccessKey シークレットを入力します。Alibaba Cloud アカウントの AccessKey シークレットの取得方法の詳細については、RAM ユーザーの AccessKey ペアに関する情報を表示するをご参照ください。

fs.oss.bucket.<bucketName>.accessKeySecret

OSS-HDFS へのデータの書き込み

Apache Flink 用 Realtime Compute の開発コンソールにログオンします。[構成] タブの [デプロイメント] ページで、[パラメータ] セクションの右上隅にある [編集] をクリックし、[その他の構成] フィールドに次の構成を追加します。

fs.oss.jindo.buckets: xxx
fs.oss.jindo.accessKeyId: xxx
fs.oss.jindo.accessKeySecret: xxx

次の表に、上記の構成のパラメータを示します。

パラメータ

説明

fs.oss.jindo.buckets

データが書き込まれる OSS-HDFS サービスのバケットの名前。複数のバケット名を指定できます。バケット名はセミコロン(;)で区切ります。Flink が OSS パスにデータを書き込むとき、関連するバケット名が fs.oss.jindo.buckets パラメータの値に含まれている場合、データは OSS-HDFS サービスに書き込まれます。

fs.oss.jindo.accessKeyId

Alibaba Cloud アカウントの AccessKey ID。Alibaba Cloud アカウントの AccessKey シークレットの取得方法の詳細については、RAM ユーザーの AccessKey ペアに関する情報を表示するをご参照ください。

fs.oss.jindo.accessKeySecret

Alibaba Cloud アカウントの AccessKey シークレット。Alibaba Cloud アカウントの AccessKey シークレットの取得方法の詳細については、RAM ユーザーの AccessKey ペアに関する情報を表示するをご参照ください。

また、OSS-HDFS サービスのエンドポイントも構成する必要があります。次のいずれかの方法を使用して、OSS-HDFS サービスのエンドポイントを構成できます。

  • [構成] タブの Apache Flink 用 Realtime Compute の開発コンソールRealtime Compute for Apache Flink の開発コンソールで、[パラメータ] セクションの右上隅にある [編集] をクリックし、[その他の構成] フィールドに次の構成を追加します。

    fs.oss.jindo.endpoint: xxx
  • OSS パスで OSS-HDFS サービスのエンドポイントを構成します。

    サンプルコード:

    oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>

    OSSパスでは、user-defined-oss-hdfs-bucket は関連バケットの名前を指定し、oss-hdfs-endpoint は OSS-HDFS サービスのエンドポイントを指定します。 fs.oss.jindo.buckets パラメーターの値には、<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint> が含まれている必要があります。

    たとえば、バケット名が jindo-test で、OSS-HDFS サービスのエンドポイントが

    たとえば、バケット名が jindo-test で、OSS-HDFS サービスのエンドポイントが cn-beijing.oss-dls.aliyuncs.com の場合、OSS パスは oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir> である必要があり、fs.oss.jindo.buckets パラメータの値には jindo-test.cn-beijing.oss-dls.aliyuncs.com が含まれている必要があります。

サンプルコード

  • ソーステーブルのサンプルコード

    CREATE TEMPORARY TABLE fs_table_source (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector'='filesystem',
      'path'='oss://<bucket>/path',
      'format'='parquet'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `id` INT,
      `name` VARCHAR
    ) with (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT * FROM fs_table_source ;
  • 結果テーブルのサンプルコード

    • パーティションテーブルへのデータの書き込み

      CREATE TABLE datagen_source (
        user_id STRING,
        order_amount DOUBLE,
        ts BIGINT, -- ミリ秒単位の時間を使用します。
        ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
        WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- TIMESTAMP_LTZ 列にウォーターマークを指定します。
      ) WITH (
        'connector' = 'datagen'
      );
      
      
      CREATE TEMPORARY TABLE fs_table_sink (
        user_id STRING,
        order_amount DOUBLE,
        dt STRING,
        `hour` STRING
      ) PARTITIONED BY (dt, `hour`) WITH (
        'connector'='filesystem',
        'path'='oss://<bucket>/path',
        'format'='parquet',
        'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
        'sink.partition-commit.delay'='1 h',
        'sink.partition-commit.trigger'='partition-time',
        'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- タイムゾーンは「Asia/Shanghai」です。
        'sink.partition-commit.policy.kind'='success-file'
      );
      
      
      -- 次のストリーミング SQL ステートメントを実行して、ファイルシステムテーブルにデータを挿入します。
      INSERT INTO fs_table_sink 
      SELECT 
        user_id, 
        order_amount, 
        DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
        DATE_FORMAT(ts_ltz, 'HH') 
      FROM datagen_source;
    • 非パーティションテーブルへのデータの書き込み

      CREATE TABLE datagen_source (
        user_id STRING,
        order_amount DOUBLE
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE fs_table_sink (
        user_id STRING,
        order_amount DOUBLE
      ) WITH (
        'connector'='filesystem',
        'path'='oss://<bucket>/path',
        'format'='parquet'
      );
      
      INSERT INTO fs_table_sink SELECT * FROM datagen_source;

DataStream API

重要

DataStream API を呼び出してデータを読み書きする場合は、関連タイプの DataStream コネクタを使用して Apache Flink 用 Realtime Compute に接続する必要があります。DataStream コネクタの構成方法の詳細については、DataStream コネクタの設定をご参照ください。

次のサンプルコードは、DataStream API を使用して OSS または OSS-HDFS にデータを書き込む方法を示しています。

String outputPath = "oss://<bucket>/path"
final StreamingFileSink<Row> sink =
                StreamingFileSink.forRowFormat(
                                new Path(outputPath),
                                (Encoder<Row>)
                                        (element, stream) -> {
                                            out.println(element.toString());
                                        })
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .build();

outputStream.addSink(sink);

OSS-HDFS にデータを書き込む場合は、上記のコードを実行し、次の手順を実行します。 Apache Flink 用 Realtime Compute の開発コンソールRealtime Compute for Apache Flink の開発コンソールにログオンします。[構成] タブの [デプロイメント] ページで、[パラメータ] セクションの右上隅にある [編集] をクリックし、OSS-HDFS 関連の構成を [その他の構成] フィールドに追加します。詳細については、このトピックのOSS-HDFS へのデータの書き込みセクションをご参照ください。

参考資料