Realtime Compute for Apache Flink can use the filesystem connector to read data from and write data to Object Storage Service (OSS) and OSS-HDFS, a Hadoop Distributed File System (HDFS) service deployed on OSS. After you configure the connector properties, Flink reads the specified path as an input stream and writes computation results in the specified format to the specified output path.
Prerequisites
Before you begin, ensure that you have:
Activated fully managed Flink. For more information, see Activate Realtime Compute for Apache Flink. After activation, the workspace appears on the Fully Managed Flink tab within 5 to 10 minutes.
Created an SQL job using Ververica Runtime (VVR) 8.0.1 or later as the Flink compute engine. For more information, see Create a job.
Limitations
OSS and OSS-HDFS services must be in the same Alibaba Cloud account as the Flink workspace.
Writing row-store formats (Avro, CSV, JSON, and raw) to OSS is not supported; use a bulk format such as Parquet instead, as shown in the sketch below. For more information, see FLINK-30635.
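For example, a sink that would otherwise use a row-store format such as CSV can be declared with a bulk format instead. This is a minimal sketch; the bucket name and path are placeholders:
CREATE TEMPORARY TABLE oss_sink (
  `name` STRING,
  `path` STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://yourbucket/output/', -- placeholder: oss://<bucket>/<path>/
  'format' = 'parquet' -- bulk format; row-store formats ('avro', 'csv', 'json', 'raw') are not supported for OSS sinks
);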
Step 1: Open the SQL editor
Log on to the Realtime Compute for Apache Flink management console.
Find the target workspace and click Console in the Actions column.
In the left navigation menu, choose Development > ETL.
Step 2: Write the DDL and DML code
In the SQL editor, define your source and sink tables using the filesystem connector, then write a DML statement to move data between them.
The following example reads data from the dir path in the srcbucket bucket and writes it to the test path in the destbucket bucket. Both source and sink tables use Parquet format.
To use this code with OSS-HDFS, make sure that the OSS-HDFS service is enabled for both srcbucket and destbucket.
CREATE TEMPORARY TABLE source_table (
`file.name` STRING NOT NULL METADATA, -- metadata column: file name
`file.path` STRING NOT NULL METADATA -- metadata column: full file path
) WITH (
'connector' = 'filesystem', -- required: use the FileSystem connector
'path' = 'oss://srcbucket/dir/', -- required: oss://<bucket>/<path>/
'format' = 'parquet' -- required: file format
);
CREATE TEMPORARY TABLE target_table (
`name` STRING,
`path` STRING
) WITH (
'connector' = 'filesystem', -- required: use the FileSystem connector
'path' = 'oss://destbucket/test/', -- required: oss://<bucket>/<path>/
'format' = 'parquet' -- required: file format
);
INSERT INTO target_table SELECT * FROM source_table;

The source table supports metadata columns that expose file-level attributes, such as file.path and file.name. For the full list of metadata columns and WITH parameters, see OSS connector.
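In addition to file.path and file.name, the open-source Flink filesystem connector documents file.size and file.modification-time metadata columns. The following sketch assumes the VVR connector exposes the same set (confirm against the OSS connector reference); content_id is a hypothetical physical column stored in the files:
CREATE TEMPORARY TABLE source_with_metadata (
  `content_id` STRING, -- hypothetical physical column read from the Parquet files
  `file.path` STRING NOT NULL METADATA, -- full file path
  `file.name` STRING NOT NULL METADATA, -- file name without its directory
  `file.size` BIGINT NOT NULL METADATA, -- file size in bytes
  `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA -- last modification time of the file
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://srcbucket/dir/',
  'format' = 'parquet'
);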
Step 3: Validate and deploy the job
Click Save.
Click Advanced Check. This check examines SQL semantics, network connectivity, and the metadata of tables used by the job. In the results area, click SQL Optimization to view potential SQL issues and optimization suggestions.
Click Deploy to deploy the job to the production environment.
Step 4: (OSS-HDFS only) Configure the OSS-HDFS connection
Skip this step if you are connecting to standard OSS.
Click the job name to open the job details.
On the Deployment Details tab, go to the Running Parameter Configuration section.
Enter the following configuration, then click Save.
fs.oss.jindo.buckets: srcbucket;destbucket
fs.oss.jindo.accessKeyId: LTAI****************
fs.oss.jindo.accessKeySecret: yourAccessKeySecret
fs.oss.jindo.endpoint: cn-hangzhou.oss-dls.aliyuncs.com

All four parameters are required when connecting to OSS-HDFS.
| Parameter | Required | Description |
|---|---|---|
| fs.oss.jindo.buckets | Yes | Names of the source and sink buckets, separated by a semicolon (;). Example: srcbucket;destbucket. |
| fs.oss.jindo.accessKeyId | Yes | AccessKey ID of your Alibaba Cloud account or a Resource Access Management (RAM) user. For more information, see View the AccessKey information of a RAM user. |
| fs.oss.jindo.accessKeySecret | Yes | AccessKey secret. The secret is shown only when you create the AccessKey pair and cannot be retrieved later. For more information, see Create an AccessKey pair. |
| fs.oss.jindo.endpoint | Yes | OSS-HDFS service endpoint. Example: cn-hangzhou.oss-dls.aliyuncs.com. |
Step 5: Start the job and verify the output
On the Job O&M page, click Start and wait for the job to enter the Running state.
Verify the output in the OSS console:
For OSS: view data on the OSS tab of the file list.
For OSS-HDFS: view data on the HDFS tab of the file list.
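You can also spot-check the written files from the SQL editor by querying the output path with the same connector settings as the sink table. This is a sketch that reuses the schema and path from Step 2; adjust both to match your job:
CREATE TEMPORARY TABLE verify_output (
  `name` STRING,
  `path` STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://destbucket/test/', -- the sink path from Step 2
  'format' = 'parquet'
);
-- Run as a debug/preview query in the SQL editor to inspect a few rows.
SELECT * FROM verify_output LIMIT 10;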
What's next
To learn about all available connector parameters and supported formats, see OSS connector.