すべてのプロダクト
Search
ドキュメントセンター

Object Storage Service:Realtime Compute for Apache Flink を使用した OSS または OSS-HDFS の読み書き

最終更新日:Dec 21, 2025

Realtime Compute for Apache Flink を使用すると、Object Storage Service (OSS) および OSS にデプロイされた Hadoop 分散ファイルシステム (HDFS) (OSS-HDFS) との間でデータを読み書きできます。 OSS または OSS-HDFS コネクタのプロパティを設定すると、Realtime Compute for Apache Flink は指定されたパスからデータを自動的に読み取り、入力ストリームとして使用します。 その後、Realtime Compute for Apache Flink は、計算結果を指定されたフォーマットで OSS または OSS-HDFS の指定されたパスに書き込みます。

前提条件

  • フルマネージド Flink が有効化されている必要があります。 詳細については、「Realtime Compute for Apache Flink の有効化」をご参照ください。

    フルマネージド Flink を有効化すると、作成された ワークスペース が 5〜10 分以内に [フルマネージド Flink] タブに表示されます。

  • SQL ジョブが作成されていること。

    SQL ジョブを作成する際に、Flink コンピュートエンジンとして Ververica Runtime (VVR) 8.0.1 以降を選択します。 詳細については、「ジョブの作成」をご参照ください。

制限事項

  • 同一の Alibaba Cloud アカウント内の OSS または OSS-HDFS サービスとの間でのみ、データの読み書きが可能です。

  • OSS にデータを書き込む場合、Avro、CSV、JSON、Raw などのローストアフォーマットではデータを書き込むことはできません。 詳細については、「FLINK-30635」をご参照ください。

操作手順

  1. SQL ドラフト作成ページに移動します。

    1. Realtime Compute for Apache Flink コンソールにログインします。

    2. 対象のワークスペースを見つけ、[操作] 列の [コンソール] をクリックします。

      開発コンソールが表示されます。

    3. 左側のナビゲーションメニューで、[開発] > [ETL] を選択します。

  2. SQL エディターで、データ定義言語 (DDL) とデータ操作言語 (DML) のコードを記述します。

    この例では、`srcbucket` バケットの `dir` パスにあるソーステーブルから、`destbucket` バケットの `test` パスにある結果テーブルにデータを書き込みます。

    説明

    次のコードを使用して OSS-HDFS からデータを読み取る場合は、srcbucket と destbucket のバケットで OSS-HDFS サービスが有効になっていることを確認してください。

    CREATE TEMPORARY TABLE source_table (
     `file.name` STRING NOT NULL,
     `file.path` STRING NOT NULL METADATA
    ) WITH (
      'connector'='filesystem',
      'path'='oss://srcbucket/dir/',
      'format'='parquet'
    );
    
    CREATE TEMPORARY TABLE target_table(
     `name` STRING,
     `path` STRING 
    ) with (
      'connector'='filesystem',
      'path'='oss://destbucket/test/',
      'format'='parquet'
    );
    
    INSERT INTO target_table SELECT * FROM source_table ;

    file.path や file.name などのソーステーブルでサポートされているメタデータ列、および WITH パラメーターの使用方法の詳細については、「Object Storage Service (OSS) コネクタ」をご参照ください。

  3. [保存] をクリックします。

  4. [詳細チェック] をクリックします。

    高度なチェック機能は、ジョブの SQL セマンティクス、ネットワーク接続、およびジョブで使用されるテーブルのメタデータを検査します。 また、結果エリアで [SQL 最適化] をクリックすると、潜在的な SQL の脅威とそれに対応する最適化の提案を表示できます。

  5. [デプロイ] をクリックします。

    ジョブを開発し、高度なチェックを完了した後、ジョブを本番環境にデプロイできます。

  6. (任意) このステップは、OSS-HDFS サービスからデータを読み取る場合にのみ必要です。

    ジョブをクリックします。 [デプロイメント詳細] タブの [実行パラメーター] [設定] セクションで、以下のように OSS-HDFS サービスの AccessKey ペア、エンドポイント、およびその他の情報を設定します。 その後、[保存] をクリックします。

    fs.oss.jindo.buckets: srcbucket;destbucket
    fs.oss.jindo.accessKeyId: LTAI**************** 
    fs.oss.jindo.accessKeySecret: yourAccessKeySecret
    fs.oss.jindo.endpoint: cn-hangzhou.oss-dls.aliyuncs.com

    次の表に設定項目を示します。

    設定項目

    説明

    fs.oss.jindo.buckets

    ソーステーブルのデータが格納されているバケットの名前と、結果テーブルのデータが書き込まれるバケットの名前です。 バケット名はセミコロン (;) で区切ります。 例:srcbucket;destbucket

    fs.oss.jindo.accessKeyId

    ご利用の Alibaba Cloud アカウントまたは Resource Access Management (RAM) ユーザーの AccessKey ID です。 AccessKey ID の取得方法については、「RAM ユーザーの AccessKey 情報を表示」をご参照ください。

    fs.oss.jindo.accessKeySecret

    既存の AccessKey を使用するか、新しい AccessKey を作成します。 詳細については、「AccessKey ペアの作成」をご参照ください。 注意:AccessKey Secret の漏洩リスクを低減するため、AccessKey Secret は作成時にのみ表示され、後で表示することはできません。 安全に保管してください。

    fs.oss.jindo.endpoint

    OSS-HDFS サービスのエンドポイントです。 例:cn-hangzhou.oss-dls.aliyuncs.com。

  7. [ジョブ O&M] ページで [開始] をクリックし、ジョブが [実行中] 状態になるまで待ちます。

  8. OSS または OSS-HDFS の結果テーブルの指定されたストレージパスに書き込まれたデータを表示します。

    データが OSS に書き込まれた場合は、OSS コンソールのファイルリストの [OSS] タブで表示できます。 データが OSS-HDFS に書き込まれた場合は、OSS コンソールのファイルリストの [HDFS] タブで表示できます。