Realtime Compute for Apache Flink を使用すると、Object Storage Service (OSS) および OSS にデプロイされた Hadoop 分散ファイルシステム (HDFS) (OSS-HDFS) との間でデータを読み書きできます。 OSS または OSS-HDFS コネクタのプロパティを設定すると、Realtime Compute for Apache Flink は指定されたパスからデータを自動的に読み取り、入力ストリームとして使用します。 その後、Realtime Compute for Apache Flink は、計算結果を指定されたフォーマットで OSS または OSS-HDFS の指定されたパスに書き込みます。
前提条件
フルマネージド Flink が有効化されている必要があります。 詳細については、「Realtime Compute for Apache Flink の有効化」をご参照ください。
フルマネージド Flink を有効化すると、作成された ワークスペース が 5〜10 分以内に [フルマネージド Flink] タブに表示されます。
SQL ジョブが作成されていること。
SQL ジョブを作成する際に、Flink コンピュートエンジンとして Ververica Runtime (VVR) 8.0.1 以降を選択します。 詳細については、「ジョブの作成」をご参照ください。
制限事項
同一の Alibaba Cloud アカウント内の OSS または OSS-HDFS サービスとの間でのみ、データの読み書きが可能です。
OSS にデータを書き込む場合、Avro、CSV、JSON、Raw などのローストアフォーマットではデータを書き込むことはできません。 詳細については、「FLINK-30635」をご参照ください。
操作手順
SQL ドラフト作成ページに移動します。
対象のワークスペースを見つけ、[操作] 列の [コンソール] をクリックします。
開発コンソールが表示されます。
左側のナビゲーションメニューで、 を選択します。
SQL エディターで、データ定義言語 (DDL) とデータ操作言語 (DML) のコードを記述します。
この例では、`srcbucket` バケットの `dir` パスにあるソーステーブルから、`destbucket` バケットの `test` パスにある結果テーブルにデータを書き込みます。
説明次のコードを使用して OSS-HDFS からデータを読み取る場合は、srcbucket と destbucket のバケットで OSS-HDFS サービスが有効になっていることを確認してください。
CREATE TEMPORARY TABLE source_table ( `file.name` STRING NOT NULL, `file.path` STRING NOT NULL METADATA ) WITH ( 'connector'='filesystem', 'path'='oss://srcbucket/dir/', 'format'='parquet' ); CREATE TEMPORARY TABLE target_table( `name` STRING, `path` STRING ) with ( 'connector'='filesystem', 'path'='oss://destbucket/test/', 'format'='parquet' ); INSERT INTO target_table SELECT * FROM source_table ;file.path や file.name などのソーステーブルでサポートされているメタデータ列、および WITH パラメーターの使用方法の詳細については、「Object Storage Service (OSS) コネクタ」をご参照ください。
[保存] をクリックします。
[詳細チェック] をクリックします。
高度なチェック機能は、ジョブの SQL セマンティクス、ネットワーク接続、およびジョブで使用されるテーブルのメタデータを検査します。 また、結果エリアで [SQL 最適化] をクリックすると、潜在的な SQL の脅威とそれに対応する最適化の提案を表示できます。
[デプロイ] をクリックします。
ジョブを開発し、高度なチェックを完了した後、ジョブを本番環境にデプロイできます。
(任意) このステップは、OSS-HDFS サービスからデータを読み取る場合にのみ必要です。
ジョブをクリックします。 [デプロイメント詳細] タブの [実行パラメーター] [設定] セクションで、以下のように OSS-HDFS サービスの AccessKey ペア、エンドポイント、およびその他の情報を設定します。 その後、[保存] をクリックします。
fs.oss.jindo.buckets: srcbucket;destbucket fs.oss.jindo.accessKeyId: LTAI**************** fs.oss.jindo.accessKeySecret: yourAccessKeySecret fs.oss.jindo.endpoint: cn-hangzhou.oss-dls.aliyuncs.com次の表に設定項目を示します。
設定項目
説明
fs.oss.jindo.buckets
ソーステーブルのデータが格納されているバケットの名前と、結果テーブルのデータが書き込まれるバケットの名前です。 バケット名はセミコロン (;) で区切ります。 例:
srcbucket;destbucket。fs.oss.jindo.accessKeyId
ご利用の Alibaba Cloud アカウントまたは Resource Access Management (RAM) ユーザーの AccessKey ID です。 AccessKey ID の取得方法については、「RAM ユーザーの AccessKey 情報を表示」をご参照ください。
fs.oss.jindo.accessKeySecret
既存の AccessKey を使用するか、新しい AccessKey を作成します。 詳細については、「AccessKey ペアの作成」をご参照ください。 注意:AccessKey Secret の漏洩リスクを低減するため、AccessKey Secret は作成時にのみ表示され、後で表示することはできません。 安全に保管してください。
fs.oss.jindo.endpoint
OSS-HDFS サービスのエンドポイントです。 例:cn-hangzhou.oss-dls.aliyuncs.com。
[ジョブ O&M] ページで [開始] をクリックし、ジョブが [実行中] 状態になるまで待ちます。
OSS または OSS-HDFS の結果テーブルの指定されたストレージパスに書き込まれたデータを表示します。
データが OSS に書き込まれた場合は、OSS コンソールのファイルリストの [OSS] タブで表示できます。 データが OSS-HDFS に書き込まれた場合は、OSS コンソールのファイルリストの [HDFS] タブで表示できます。