Realtime Compute for Apache Flink: OSS connector

Last Updated: Feb 02, 2024

This topic describes how to use the Object Storage Service (OSS) connector.

Alibaba Cloud OSS is a secure and cost-effective object storage service that offers a data durability of 99.9999999999% (twelve 9's) and a data availability of 99.995%. OSS provides multiple storage classes to help you manage and reduce storage costs. The following table describes the capabilities supported by the OSS connector.

| Item | Description |
| --- | --- |
| Table type | Source table and result table |
| Running mode | Batch mode and streaming mode |
| Data format | ORC, PARQUET, AVRO, CSV, JSON, and RAW. Note: Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.7 or later can read data in the Parquet format. |
| Metric | None |
| API type | DataStream API and SQL API |
| Data update or deletion in a result table | Data in a result table cannot be updated or deleted. Data can only be inserted into a result table. |

Limits

  • General limits

    • Only Realtime Compute for Apache Flink that uses VVR 4.0.14 or later can read data from or write data to OSS.

    • You can use Realtime Compute for Apache Flink to read data from or write data to only OSS buckets within your Alibaba Cloud account.

  • Limits only on result tables

    • Data in row-oriented storage formats, such as AVRO, CSV, JSON, and RAW, cannot be written to OSS. For more information, see FLINK-30635.

    • Only Realtime Compute for Apache Flink that uses VVR 6.0.6 or later can write data to OSS-HDFS. For more information, see Write data to OSS-HDFS.

Syntax

CREATE TABLE OssTable (
  column_name1 INT,
  column_name2 STRING,
  ...
  datetime STRING,
  `hour` STRING
) PARTITIONED BY (datetime, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = '...'
);

Metadata columns

You can specify a metadata column in an OSS source table to obtain the related metadata of OSS. For example, if you specify a metadata column named file.path in an OSS source table, the value of the column is the path of the file in which each row of data is stored. Sample code:

CREATE TABLE MyUserTableWithFilepath (
  column_name1 INT,
  column_name2 STRING,
  `file.path` STRING NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'json'
);

The following table describes the metadata columns supported by OSS source tables.

| Key | Data type | Description |
| --- | --- | --- |
| file.path | STRING NOT NULL | The path of the file in which each row of data is stored. |
| file.name | STRING NOT NULL | The name of the file in which each row of data is stored, that is, the last element of the file path. |
| file.size | BIGINT NOT NULL | The size, in bytes, of the file in which each row of data is stored. |
| file.modification-time | TIMESTAMP_LTZ(3) NOT NULL | The time when the file in which each row of data is stored was last modified. |
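
As a rough sketch of how several metadata columns can be combined in one source table, the following statements declare all four keys listed above. Two of them are renamed by using the METADATA FROM clause of Flink SQL. The table name oss_source_with_metadata, the data columns, and the alias columns src_path and src_mtime are hypothetical, and the path is a placeholder.

```sql
-- Hypothetical source table. Replace oss://<bucket>/path with your own location.
CREATE TEMPORARY TABLE oss_source_with_metadata (
  id INT,
  name STRING,
  -- The column name equals the metadata key, so no FROM clause is needed.
  `file.name` STRING NOT NULL METADATA,
  `file.size` BIGINT NOT NULL METADATA,
  -- Alias columns: METADATA FROM maps a custom column name to a metadata key.
  src_path STRING NOT NULL METADATA FROM 'file.path',
  src_mtime TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'file.modification-time'
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'json'
);

-- Show which file each row was read from, for example to trace bad records.
SELECT id, name, src_path, `file.size`, src_mtime
FROM oss_source_with_metadata;
```

Aliasing avoids backtick-quoted column names in downstream queries. Whether to alias is a matter of preference.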

Parameters in the WITH clause

  • Common parameters

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    connector

    The type of the table.

    STRING

    Yes

    No default value

    Set the value to filesystem.

    path

    The Uniform Resource Identifier (URI) of the file system.

    STRING

    Yes

    No default value

    Example: oss://my_bucket/my_path.

    format

    The format of the file.

    STRING

    Yes

    No default value

    Valid values:

    • csv

    • json

    • avro

    • parquet

    • orc

    • raw

  • Parameters only for source tables

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    source.monitor-interval

    The interval at which the source table monitors the generation of new files. The value of this parameter must be greater than 0.

    DURATION

    No

    No default value

    If you do not configure this parameter, the specified path is scanned only once. In this case, the data source is bounded.

    Each file is uniquely identified by the path of the file. Each time a new file is detected, the file is processed.

    The processed files are stored in the state during the lifecycle of the data source. Therefore, the state of the data source is saved when checkpoints and savepoints are generated. If you set this parameter to a small value, new files can be quickly detected but the file system or OSS is frequently traversed.

  • Parameters only for result tables

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    partition.default-name

    The name of the partition when the value of the partition field is null or an empty string.

    STRING

    No

    __DEFAULT_PARTITION__

    N/A.

    sink.rolling-policy.file-size

    The maximum size of a file before the file is rolled.

    MemorySize

    No

    128 MB

    The data that is written to the specified directory is split into PART files. For each partition, every sink subtask that receives data of the partition generates at least one PART file. PART files are rolled based on the configured rolling policy: when the in-progress PART file is closed, a new PART file is created. A PART file is rolled when it reaches the maximum file size or when it has been open for the maximum period of time.

    sink.rolling-policy.rollover-interval

    The maximum period of time for which a PART file is open before the file is rolled.

    DURATION

    No

    30 minutes

    The check frequency is specified by the sink.rolling-policy.check-interval parameter.

    sink.rolling-policy.check-interval

    The interval at which Flink checks whether a file needs to be rolled based on the rolling policy.

    DURATION

    No

    1 minute

    This parameter controls how often Flink checks whether an open file has exceeded the period of time specified by the sink.rolling-policy.rollover-interval parameter.

    compaction.file-size

    The size of the destination file into which temporary files are merged.

    MemorySize

    No

    128 MB

    The default value is the same as the value of the sink.rolling-policy.file-size parameter.

    sink.partition-commit.trigger

    The type of the partition commit trigger.

    STRING

    No

    process-time

    Flink provides the following types of partition commit triggers that can be used to write data to a partitioned table:

    • process-time: This trigger is based on the partition creation time and the current system time. It requires neither a partition time extractor nor a watermark generator. A partition is committed as soon as the current system time passes the partition creation time plus the delay specified by the sink.partition-commit.delay parameter. This trigger is suitable for general purposes but is not precise. For example, if data is delayed or a failure occurs, a partition may be committed earlier than expected.

    • partition-time: This trigger is based on the time that is extracted from the partition values and requires a watermark generator. Use this trigger only if the deployment generates watermarks and the data is partitioned by time, such as by hour or by day. A partition is committed as soon as the watermark passes the extracted partition time plus the delay specified by the sink.partition-commit.delay parameter.

    sink.partition-commit.delay

    The maximum delay that is allowed before a partition can be committed. The partition is not committed until the delay time is reached.

    DURATION

    No

    0s

    • If data is partitioned by day, you need to set this parameter to 1 d.

    • If data is partitioned by hour, you need to set this parameter to 1 h.

    sink.partition-commit.watermark-time-zone

    The time zone that is used when a watermark of the LONG data type is converted into a watermark of the TIMESTAMP data type. After the conversion, Flink compares the watermark of the TIMESTAMP data type with the partition creation time to determine whether the partition needs to be committed.

    STRING

    No

    UTC

    This parameter takes effect only when the sink.partition-commit.trigger parameter is set to partition-time.

    • The value of the sink.partition-commit.watermark-time-zone parameter affects when partitions are committed. For example, if the watermark of the source table is defined on a TIMESTAMP_LTZ column and you do not configure this parameter, a partition may be committed several hours later than expected. The default value UTC is appropriate if the watermark is defined on a TIMESTAMP column or no watermark is defined.

    • If the watermark is defined on a TIMESTAMP_LTZ column, set this parameter to the session time zone. The value can be the full name of a time zone, such as 'America/Los_Angeles', or a custom time zone, such as 'GMT-08:00'.

    partition.time-extractor.kind

    The time extractor that extracts time from a partition field.

    STRING

    No

    default

    Valid values:

    • default: The built-in extractor is used. You can configure the timestamp pattern and the timestamp formatter. This is the default value.

    • custom: The extractor class must be specified.

    partition.time-extractor.class

    The extractor class that implements the PartitionTimeExtractor interface.

    STRING

    No

    No default value

    N/A.

    partition.time-extractor.timestamp-pattern

    The pattern that the default extractor uses to build a timestamp string from the values of partition fields.

    STRING

    No

    No default value

    By default, the timestamp is extracted from the first partition field in the yyyy-MM-dd HH:mm:ss format.

    • If you want to extract the timestamp of a partition from the dt partition field, you can set this parameter to $dt.

    • If you want to extract the timestamp of a partition from multiple partition fields, such as year, month, day, and hour, you can set this parameter to $year-$month-$day $hour:00:00.

    • If you want to extract the timestamp of a partition from the dt and hour partition fields, you can set this parameter to $dt $hour:00:00.

    partition.time-extractor.timestamp-formatter

    The formatter that is used to convert a partition timestamp string value into a timestamp. The partition timestamp string value is specified by the partition.time-extractor.timestamp-pattern parameter.

    STRING

    No

    yyyy-MM-dd HH:mm:ss

    For example, if you want to extract the timestamp of a partition from multiple partition fields, such as year, month, and day, you can set the partition.time-extractor.timestamp-pattern parameter to $year$month$day and the partition.time-extractor.timestamp-formatter parameter to yyyyMMdd. The default value of this parameter is yyyy-MM-dd HH:mm:ss. The timestamp formatter specified by this parameter is compatible with DateTimeFormatter of Java.

    sink.partition-commit.policy.kind

    The type of the partition commit policy.

    STRING

    No

    No default value

    The partition commit policy allows Flink to notify a downstream application that data writing to a partition is complete and data can be read from the partition. Valid values:

    • success-file: A _SUCCESS file is added to the partition directory. You can change the file name by using the sink.partition-commit.success-file.name parameter.

    • custom: A commit policy is created based on the specified class. You can specify multiple commit policies at the same time.

    sink.partition-commit.policy.class

    The partition commit policy class that implements the PartitionCommitPolicy interface.

    STRING

    No

    No default value

    This parameter takes effect only when the sink.partition-commit.policy.kind parameter is set to custom.

    sink.partition-commit.success-file.name

    The name of the file that is used if the sink.partition-commit.policy.kind parameter is set to success-file.

    STRING

    No

    _SUCCESS

    N/A.

    sink.parallelism

    The degree of parallelism for writing files to the file system.

    INTEGER

    No

    No default value

    By default, the value of the sink.parallelism parameter is the same as the degree of parallelism of the upstream chained operators. If the value of the sink.parallelism parameter is different from the degree of parallelism of the upstream chained operators, the operator that writes files uses the value of this parameter. If file merging is enabled, the operator that merges files also uses the value of this parameter.

    Note

    The value must be greater than 0. Otherwise, an error occurs.
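
To illustrate how the source and result table parameters above fit together, the following sketch continuously reads new files from one OSS path and writes them to another path with an explicit rolling policy. The table names, the input-path and output-path segments, and all parameter values are hypothetical choices for illustration, not recommended settings.

```sql
-- Unbounded source: the path is rescanned every minute for new files.
-- Without 'source.monitor-interval', the path is scanned once and the source is bounded.
CREATE TEMPORARY TABLE oss_stream_source (
  user_id STRING,
  order_amount DOUBLE
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/input-path',
  'format' = 'parquet',
  'source.monitor-interval' = '1 min'
);

-- Result table with an explicit rolling policy and sink parallelism.
CREATE TEMPORARY TABLE oss_rolling_sink (
  user_id STRING,
  order_amount DOUBLE
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/output-path',
  'format' = 'parquet',
  -- Roll a PART file when it reaches 256 MB ...
  'sink.rolling-policy.file-size' = '256MB',
  -- ... or when it has been open for 15 minutes, whichever comes first.
  'sink.rolling-policy.rollover-interval' = '15 min',
  -- Check the rollover interval once per minute.
  'sink.rolling-policy.check-interval' = '1 min',
  -- Must be greater than 0.
  'sink.parallelism' = '2'
);

INSERT INTO oss_rolling_sink
SELECT user_id, order_amount
FROM oss_stream_source;
```

A shorter monitor interval detects new files sooner but traverses OSS more often, so the value is a trade-off between latency and listing cost.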

Write data to OSS-HDFS

Add the following configurations to the Additional Configuration section of the Advanced tab in the console of fully managed Flink:

fs.oss.jindo.buckets: xxx
fs.oss.jindo.accessKeyId: xxx
fs.oss.jindo.accessKeySecret: xxx

The following table describes the parameters.

| Parameter | Description |
| --- | --- |
| fs.oss.jindo.buckets | The name of the bucket of the OSS-HDFS service to which data is written. You can specify multiple bucket names. Separate the bucket names with semicolons (;). When Flink writes data to an OSS path, the data is written to the OSS-HDFS service if the related bucket name is contained in the value of the fs.oss.jindo.buckets parameter. |
| fs.oss.jindo.accessKeyId | The AccessKey ID of your Alibaba Cloud account. For more information about how to obtain the AccessKey ID of an Alibaba Cloud account, see View the information about AccessKey pairs of a RAM user. |
| fs.oss.jindo.accessKeySecret | The AccessKey secret of your Alibaba Cloud account. For more information about how to obtain the AccessKey secret, see View the information about AccessKey pairs of a RAM user. |

You must also configure the endpoint of the OSS-HDFS service. You can use one of the following methods to configure the endpoint of the OSS-HDFS service:

  • Add the following configuration to the Additional Configuration section of the Advanced tab in the console of fully managed Flink:

    fs.oss.jindo.endpoint: xxx
  • Configure the endpoint of the OSS-HDFS service in the OSS path.

    oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>

    In the OSS path, user-defined-oss-hdfs-bucket specifies the name of the related bucket and oss-hdfs-endpoint specifies the endpoint of the OSS-HDFS service. The value of the fs.oss.jindo.buckets parameter must contain <user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>.

    For example, if the bucket name is jindo-test and the endpoint of the OSS-HDFS service is cn-beijing.oss-dls.aliyuncs.com, the OSS path must be oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>, and the value of the fs.oss.jindo.buckets parameter must contain jindo-test.cn-beijing.oss-dls.aliyuncs.com.
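
As a minimal sketch of the second method, a result table that writes to the jindo-test bucket from the preceding example could be declared as follows. The table name oss_hdfs_sink and its columns are hypothetical. The sketch assumes that fs.oss.jindo.buckets already contains jindo-test.cn-beijing.oss-dls.aliyuncs.com and that the AccessKey pair is configured as described above.

```sql
-- Hypothetical result table. The endpoint of the OSS-HDFS service is embedded
-- in the path, so fs.oss.jindo.endpoint does not need to be configured separately.
CREATE TEMPORARY TABLE oss_hdfs_sink (
  user_id STRING,
  order_amount DOUBLE
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>',
  'format' = 'parquet'
);
```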

Sample code

  • Sample code for a source table

    CREATE TEMPORARY TABLE fs_table_source (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector'='filesystem',
      'path'='oss://<bucket>/path',
      'format'='parquet'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT * FROM fs_table_source ;
  • Sample code for a result table

    • Write data to a partitioned table

      CREATE TABLE datagen_source (
        user_id STRING,
        order_amount DOUBLE,
        ts BIGINT, -- Epoch time in milliseconds.
        ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
        WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define a watermark on a TIMESTAMP_LTZ column.
      ) WITH (
        'connector' = 'datagen'
      );
      
      
      CREATE TEMPORARY TABLE fs_table_sink (
        user_id STRING,
        order_amount DOUBLE,
        dt STRING,
        `hour` STRING
      ) PARTITIONED BY (dt, `hour`) WITH (
        'connector'='filesystem',
        'path'='oss://<bucket>/path',
        'format'='parquet',
        'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
        'sink.partition-commit.delay'='1 h',
        'sink.partition-commit.trigger'='partition-time',
        'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- The time zone is 'Asia/Shanghai'.
        'sink.partition-commit.policy.kind'='success-file'
      );
      
      
      -- Execute the following streaming SQL statement to insert data into a file system table.
      INSERT INTO fs_table_sink 
      SELECT 
        user_id, 
        order_amount, 
        DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
        DATE_FORMAT(ts_ltz, 'HH') 
      FROM datagen_source;
    • Write data to a non-partitioned table

      CREATE TABLE datagen_source (
        user_id STRING,
        order_amount DOUBLE
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE fs_table_sink (
        user_id STRING,
        order_amount DOUBLE
      ) WITH (
        'connector'='filesystem',
        'path'='oss://<bucket>/path',
        'format'='parquet'
      );
      
      INSERT INTO fs_table_sink SELECT * FROM datagen_source;
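
The timestamp pattern and formatter described in the parameter table can also be combined for partition fields that do not follow the default format. The following sketch partitions data by year, month, and day and parses the concatenated partition values with the yyyyMMdd formatter. The table names datagen_source_daily and fs_table_sink_daily are hypothetical. The sketch assumes a VVR version that supports the partition.time-extractor.timestamp-formatter parameter.

```sql
CREATE TEMPORARY TABLE datagen_source_daily (
  user_id STRING,
  order_amount DOUBLE,
  ts BIGINT, -- Epoch time in milliseconds.
  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE fs_table_sink_daily (
  user_id STRING,
  order_amount DOUBLE,
  `year` STRING,
  `month` STRING,
  `day` STRING
) PARTITIONED BY (`year`, `month`, `day`) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'parquet',
  -- Concatenate the partition values, for example 20240202 ...
  'partition.time-extractor.timestamp-pattern' = '$year$month$day',
  -- ... and parse the result with a matching formatter.
  'partition.time-extractor.timestamp-formatter' = 'yyyyMMdd',
  'sink.partition-commit.trigger' = 'partition-time',
  -- Data is partitioned by day, so the commit delay is one day.
  'sink.partition-commit.delay' = '1 d',
  -- The watermark is defined on a TIMESTAMP_LTZ column, so use the session time zone.
  'sink.partition-commit.watermark-time-zone' = 'Asia/Shanghai',
  'sink.partition-commit.policy.kind' = 'success-file'
);

INSERT INTO fs_table_sink_daily
SELECT
  user_id,
  order_amount,
  DATE_FORMAT(ts_ltz, 'yyyy'),
  DATE_FORMAT(ts_ltz, 'MM'),
  DATE_FORMAT(ts_ltz, 'dd')
FROM datagen_source_daily;
```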