MaxCompute: Use Flink (streaming data transfer in the new version)

Last Updated: Mar 18, 2024

MaxCompute provides a new version of the Flink connector plug-in. You can use it to write data from Flink to MaxCompute standard tables and Transaction Table 2.0 tables. This topic describes how to use the new version of the Flink connector to write data from Flink to MaxCompute.

Background information

  • Write modes supported by the new version of the Flink connector

    The new version of the Flink connector can write data from Flink to MaxCompute in INSERT mode or UPSERT mode. In UPSERT mode, data can be grouped by one of the following items:

    • Primary key

    • Partition field

      Note

      If the destination table contains a large number of partitions, you can group data by partition field. However, grouping by partition field may cause data skew.

    To specify the write mode, configure the sink.operation parameter of the Flink connector. For more information about the parameters of the Flink connector, see Appendix: Parameters of the Flink connector of the new version.

  • For deployments that write data from Flink to MaxCompute in UPSERT mode, we recommend that you set the checkpoint interval to 3 minutes or longer. If the interval is too short, write efficiency may not meet business requirements and a large number of small files may be generated.

  • Mappings between field data types of MaxCompute and Realtime Compute for Apache Flink

    | Data type of Realtime Compute for Apache Flink | Data type of MaxCompute |
    |---|---|
    | CHAR(p) | CHAR(p) |
    | VARCHAR(p) | VARCHAR(p) |
    | STRING | STRING |
    | BOOLEAN | BOOLEAN |
    | TINYINT | TINYINT |
    | SMALLINT | SMALLINT |
    | INT | INT |
    | BIGINT | BIGINT |
    | FLOAT | FLOAT |
    | DOUBLE | DOUBLE |
    | DECIMAL(p, s) | DECIMAL(p, s) |
    | DATE | DATE |
    | TIMESTAMP(9) WITHOUT TIME ZONE and TIMESTAMP_LTZ(9) | TIMESTAMP |
    | TIMESTAMP(3) WITHOUT TIME ZONE and TIMESTAMP_LTZ(3) | DATETIME |
    | BYTES | BINARY |
    | ARRAY<T> | ARRAY<T> |
    | MAP<K, V> | MAP<K, V> |
    | ROW | STRUCT |
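The scalar rows of the type mapping can also be expressed as a lookup, which may help when you check a Flink table definition against the MaxCompute table it writes to. The following Java sketch is a hypothetical helper, not part of the connector: the class and method names are illustrative, composite types (ARRAY, MAP, ROW) are deliberately left out, and Flink BIGINT is mapped to MaxCompute BIGINT, the type used for the id columns in the CREATE TABLE statements in this topic.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper (not part of the Flink connector): maps a scalar Flink SQL
// type to the MaxCompute column type from the mapping table. Composite types
// (ARRAY, MAP, ROW) are intentionally not handled.
final class FlinkToMaxComputeTypes {
    // Types that map by name; parameterized types are handled separately below.
    private static final Map<String, String> SCALARS = Map.ofEntries(
            Map.entry("STRING", "STRING"),
            Map.entry("BOOLEAN", "BOOLEAN"),
            Map.entry("TINYINT", "TINYINT"),
            Map.entry("SMALLINT", "SMALLINT"),
            Map.entry("INT", "INT"),
            Map.entry("BIGINT", "BIGINT"),
            Map.entry("FLOAT", "FLOAT"),
            Map.entry("DOUBLE", "DOUBLE"),
            Map.entry("DATE", "DATE"),
            Map.entry("BYTES", "BINARY"));

    private FlinkToMaxComputeTypes() {}

    static Optional<String> toMaxCompute(String flinkType) {
        // Normalize case and collapse repeated whitespace, e.g. "timestamp(3)  without time zone".
        String t = flinkType.trim().replaceAll("\\s+", " ").toUpperCase(Locale.ROOT);

        // CHAR(p), VARCHAR(p), and DECIMAL(p, s) keep their parameters unchanged.
        if (t.startsWith("CHAR(") || t.startsWith("VARCHAR(") || t.startsWith("DECIMAL(")) {
            return Optional.of(t);
        }
        // Precision 9 timestamps map to TIMESTAMP.
        if (t.equals("TIMESTAMP(9)") || t.equals("TIMESTAMP(9) WITHOUT TIME ZONE")
                || t.equals("TIMESTAMP_LTZ(9)")) {
            return Optional.of("TIMESTAMP");
        }
        // Precision 3 timestamps map to DATETIME.
        if (t.equals("TIMESTAMP(3)") || t.equals("TIMESTAMP(3) WITHOUT TIME ZONE")
                || t.equals("TIMESTAMP_LTZ(3)")) {
            return Optional.of("DATETIME");
        }
        // Anything not listed (for example TIMESTAMP(6)) yields an empty result.
        return Optional.ofNullable(SCALARS.get(t));
    }
}
```

For example, toMaxCompute("VARCHAR(64)") returns Optional[VARCHAR(64)], and toMaxCompute("TIMESTAMP(6)") returns an empty Optional because that precision does not appear in the table.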

Write data from self-managed open source Flink to MaxCompute

  1. Create a MaxCompute table.

    Create the MaxCompute tables to which you want to write Flink data. The following sample code creates a non-partitioned Transaction Table 2.0 table and a partitioned Transaction Table 2.0 table. For more information about the configuration of table properties, see Parameters for Transaction Table 2.0 tables.

    Note

    Transaction Table 2.0 is in invitational preview and is not available by default. To use this feature, click Free Trial on the application page for trial use of new features. For more information about Transaction Table 2.0, see Overview of Transaction Table 2.0.

    -- Create a Transaction Table 2.0 non-partitioned table.
    CREATE TABLE mf_flink_tt (
      id BIGINT not null,
      name STRING,
      age INT,
      status BOOLEAN,
      primary key (id)
    )
    tblproperties ("transactional"="true", 
                   "write.bucket.num" = "64", 
                   "acid.data.retain.hours"="12") ;
    
    -- Create a Transaction Table 2.0 partitioned table.
    CREATE TABLE mf_flink_tt_part (
      id BIGINT not null,
      name STRING,
      age INT,
      status BOOLEAN, 
      primary key (id)
    )
      partitioned by (dd string, hh string) 
      tblproperties ("transactional"="true", 
                     "write.bucket.num" = "64", 
                     "acid.data.retain.hours"="12") ;
    
  2. Build an open source Flink cluster. Open source Flink 1.13, 1.15, 1.16, and 1.17 are supported. You can select a version based on your business requirements.

    In this example, Flink 1.13 is used. Use the download link of Flink 1.13 to download the Flink 1.13 package to your on-premises environment, and then decompress the package.

  3. Download the Flink connector package and add it to the Flink installation.

    1. Use the download link of the Flink connector to download the JAR package of the Flink connector to your on-premises environment.

    2. Move the JAR package of the Flink connector to the lib directory of the decompressed Flink installation package.

      mv flink-connector-odps-1.13-shaded.jar $FLINK_HOME/lib/flink-connector-odps-1.13-shaded.jar
  4. Start the Flink service.

    cd $FLINK_HOME/bin
    ./start-cluster.sh
  5. Start the Flink SQL client.

    cd $FLINK_HOME/bin
    ./sql-client.sh
  6. Create Flink tables and configure the parameters of the Flink connector.

    You can create Flink tables and configure parameters directly on the Flink SQL client, or perform the same operations by using the Flink DataStream API. The following sections provide an example of each method.

    Use the Flink SQL client

    1. On the Flink SQL client, execute the following statements to create the tables and configure the connector parameters.

      -- Create a non-partitioned table that corresponds to the created MaxCompute non-partitioned table on the Flink SQL client.
      CREATE TABLE mf_flink (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        PRIMARY KEY(id) NOT ENFORCED
      ) WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_tt',
        'sink.operation' = 'upsert',
        'odps.access.id'='LTAI5tRzd4W8cTyLZKT****',
        'odps.access.key'='gJwKaF3hK9MDAQgbO0zs****',
        'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
        'odps.project.name'='mf_mc_bj'
      );
      
      -- Create a partitioned table that corresponds to the created MaxCompute partitioned table on the Flink SQL client.
      CREATE TABLE mf_flink_part (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        dd STRING,
        hh STRING,
        PRIMARY KEY(id) NOT ENFORCED
      ) PARTITIONED BY (`dd`,`hh`)
      WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_tt_part',
        'sink.operation' = 'upsert',
        'odps.access.id'='LTAI5tRzd4W8cTyLZKT****',
        'odps.access.key'='gJwKaF3hK9MDAQgbO0zs*******',
        'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
        'odps.project.name'='mf_mc_bj'
      );
    2. Write data to the Flink tables and query data in the MaxCompute tables to check whether the Flink data is written to MaxCompute.

      -- On the Flink SQL client, insert a row into the non-partitioned Flink table mf_flink.
      INSERT INTO mf_flink VALUES (1, 'Danny', 27, false);
      -- In MaxCompute, query the destination table and view the returned results.
      SELECT * FROM mf_flink_tt;
      +------------+-------+------+--------+
      | id         | name  | age  | status |
      +------------+-------+------+--------+
      | 1          | Danny | 27   | false  |
      +------------+-------+------+--------+
      
      -- On the Flink SQL client, insert a row with the same primary key into mf_flink.
      INSERT INTO mf_flink VALUES (1, 'Danny', 28, false);
      -- In MaxCompute, query the table again. In UPSERT mode, the existing row is updated.
      SELECT * FROM mf_flink_tt;
      +------------+-------+------+--------+
      | id         | name  | age  | status |
      +------------+-------+------+--------+
      | 1          | Danny | 28   | false  |
      +------------+-------+------+--------+
      
      -- On the Flink SQL client, insert a row into the partitioned Flink table mf_flink_part.
      INSERT INTO mf_flink_part VALUES (1, 'Danny', 27, false, '01', '01');
      -- In MaxCompute, query the partition. dd and hh are STRING columns, so the values are quoted.
      SELECT * FROM mf_flink_tt_part WHERE dd='01' AND hh='01';
      +------------+-------+------+--------+----+----+
      | id         | name  | age  | status | dd | hh |
      +------------+-------+------+--------+----+----+
      | 1          | Danny | 27   | false  | 01 | 01 |
      +------------+-------+------+--------+----+----+
      
      -- On the Flink SQL client, insert a row with the same primary key into mf_flink_part.
      INSERT INTO mf_flink_part VALUES (1, 'Danny', 30, false, '01', '01');
      -- In MaxCompute, query the partition again and view the updated row.
      SELECT * FROM mf_flink_tt_part WHERE dd='01' AND hh='01';
      +------------+-------+------+--------+----+----+
      | id         | name  | age  | status | dd | hh |
      +------------+-------+------+--------+----+----+
      | 1          | Danny | 30   | false  | 01 | 01 |
      +------------+-------+------+--------+----+----+
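The query results above show UPSERT behavior: inserting a second row with id 1 replaced the existing row instead of adding a new one. The following Java sketch is a hypothetical in-memory model of the two values of sink.operation, written only to make that difference concrete; it is not how the connector or MaxCompute implements writes, and the class WriteModeModel is illustrative.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the two write modes (not the connector's implementation):
// INSERT appends every row, while UPSERT keeps one row per primary key.
final class WriteModeModel {
    // One row of the sample table: id (primary key), name, age, status.
    static final class Row {
        final long id;
        final String name;
        final int age;
        final boolean status;

        Row(long id, String name, int age, boolean status) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.status = status;
        }
    }

    private final List<Row> appended = new ArrayList<>();
    private final Map<Long, Row> latestByKey = new LinkedHashMap<>();

    // 'sink.operation' = 'insert': each incoming row is added as a new row.
    void insert(Row row) {
        appended.add(row);
    }

    // 'sink.operation' = 'upsert': a row whose primary key already exists replaces the old row.
    void upsert(Row row) {
        latestByKey.put(row.id, row);
    }

    List<Row> insertResult() {
        return new ArrayList<>(appended);
    }

    List<Row> upsertResult() {
        return new ArrayList<>(latestByKey.values());
    }
}
```

Feeding the two mf_flink rows from the example into both modes leaves two rows after insert and a single row with age 28 after upsert.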

    Use the DataStream API

    1. Before you use the DataStream API, add the following dependency.

      <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>flink-connector-maxcompute</artifactId>
        <version>xxx</version>
        <scope>system</scope>
        <systemPath>${project.basedir}/lib/flink-connector-maxcompute-xxx-shaded.jar</systemPath>
      </dependency>
      Note

      Replace xxx in the preceding code with the version of the JAR package.

    2. Write code to create Flink tables and configure parameters. The following sample code provides an example.

      package com.aliyun.odps.flink.examples;
      
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.odps.table.OdpsOptions;
      import org.apache.flink.odps.util.OdpsConf;
      import org.apache.flink.odps.util.OdpsPipeline;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      import org.apache.flink.table.data.RowData;
      
      public class Examples {
      
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              // Checkpoint interval in milliseconds. For UPSERT writes, an interval of
              // 3 minutes or longer is recommended to avoid generating many small files.
              env.enableCheckpointing(3 * 60 * 1000);
      
              StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
      
              // Read the input rows. A table named source_table must already be
              // registered in streamTableEnvironment before this query runs.
              Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table");
              DataStream<RowData> input = streamTableEnvironment.toAppendStream(source, RowData.class);
      
              // Connector options: UPSERT write mode (sink.operation), 8 commit threads
              // (upsert.commit.thread-num), and the minimum number of commits that
              // triggers major compaction (upsert.major-compact.min-commits).
              Configuration config = new Configuration();
              config.set(OdpsOptions.SINK_OPERATION, "upsert");
              config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8);
              config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100);
      
              // Replace the placeholders with your AccessKey ID, AccessKey secret,
              // MaxCompute endpoint, project name, and Tunnel endpoint.
              OdpsConf odpsConfig = new OdpsConf("accessid",
                      "accesskey",
                      "endpoint",
                      "project",
                      "tunnel endpoint");
      
              // Write the stream to the destination MaxCompute project and table.
              OdpsPipeline.Builder builder = OdpsPipeline.builder();
              builder.projectName("sql2_isolation_2a")
                      .tableName("user_ledger_portfolio")
                      .partition("")
                      .configuration(config)
                      .odpsConf(odpsConfig)
                      .sink(input, false);
              env.execute();
          }
      }
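The SQL client and DataStream examples embed the AccessKey pair directly in code. As an alternative, the following Java sketch assembles the connector's WITH clause from a map of options, reads the AccessKey pair from environment variables, and checks the options that the appendix lists as required without a default value. Everything here is an assumption for illustration: the class MaxComputeSinkOptions, its methods, and the environment variable names ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are not part of the connector.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper (not part of the connector): builds the WITH clause of a Flink SQL
// CREATE TABLE statement for the MaxCompute connector and rejects incomplete configurations.
final class MaxComputeSinkOptions {
    // Options that the appendix marks as required and that have no default value.
    static final List<String> REQUIRED = List.of(
            "connector", "odps.project.name", "odps.access.id",
            "odps.access.key", "odps.end.point", "table.name");

    private MaxComputeSinkOptions() {}

    // env stands in for System.getenv(); both variable names are assumptions.
    static Map<String, String> upsertOptions(Map<String, String> env, String project,
                                             String endpoint, String table) {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("connector", "maxcompute");
        opts.put("table.name", table);
        opts.put("sink.operation", "upsert");
        putIfPresent(opts, "odps.access.id", env.get("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        putIfPresent(opts, "odps.access.key", env.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        opts.put("odps.end.point", endpoint);
        opts.put("odps.project.name", project);
        return opts;
    }

    private static void putIfPresent(Map<String, String> opts, String key, String value) {
        if (value != null && !value.isBlank()) {
            opts.put(key, value);
        }
    }

    // Returns the required keys that are missing or blank, in the order of REQUIRED.
    static List<String> missingRequired(Map<String, String> opts) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = opts.get(key);
            if (value == null || value.isBlank()) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Renders the options as a Flink SQL WITH clause; single quotes in values are doubled.
    static String withClause(Map<String, String> opts) {
        List<String> missing = missingRequired(opts);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing required options: " + missing);
        }
        return opts.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue().replace("'", "''") + "'")
                .collect(Collectors.joining(",\n", "WITH (\n", "\n)"));
    }
}
```

In a real job you might pass System.getenv() as env and append the returned clause to the column list of a CREATE TABLE statement, then submit it with TableEnvironment.executeSql or paste it into the SQL client.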

Write data from fully managed Flink to MaxCompute

  1. Create a MaxCompute table.

    Create the MaxCompute table to which you want to write Flink data. The following sample code creates a partitioned Transaction Table 2.0 table.

    set odps.sql.type.system.odps2=true;
    -- Drop the table if it already exists so that the script can be rerun.
    drop table if exists mf_flink_upsert;
    create table mf_flink_upsert (
      c1 int not null, 
      c2 string, 
      gt timestamp,
      primary key (c1)
    ) 
      partitioned by (ds string)
      tblproperties ("transactional"="true",
                     "write.bucket.num" = "64", 
                     "acid.data.retain.hours"="12") ;
  2. Log on to the Realtime Compute for Apache Flink console and add the Flink connector as a custom connector by uploading the JAR package of the open source Flink connector. The key connector parameter is shown below. For more information, see Manage custom connectors.

    'connector' = 'maxcompute',
  3. Create a Flink SQL draft that defines the Flink tables and generates real-time data. After you develop the draft, deploy it.

    On the SQL Editor page in the console of fully managed Flink, create and edit a Flink SQL draft. The following example creates a temporary source table that uses the faker connector to generate random data in real time and a temporary result table that uses the MaxCompute connector, and then writes the data from the source table to the result table. For more information about how to develop an SQL draft, see Develop an SQL draft.

    -- Create a Flink source table.
    CREATE TEMPORARY TABLE fake_src_table
    (
        c1 int,
        c2 VARCHAR,
        gt as CURRENT_TIMESTAMP
    ) with (
      'connector' = 'faker',
      'fields.c2.expression' = '#{superhero.name}',
      'rows-per-second' = '100',
      'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}'
    );
    
    -- Create a temporary Flink result table.
    CREATE TEMPORARY TABLE test_c_d_g
    (
        c1 int,
        c2 VARCHAR,
        gt TIMESTAMP,
        ds varchar,
        PRIMARY KEY(c1) NOT ENFORCED
    ) PARTITIONED BY(ds)
    WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_upsert',
        'sink.operation' = 'upsert',
        'odps.access.id'='LTAI5tRzd4W8cTyL****',
        'odps.access.key'='gJwKaF3hK9MDAQgb**********',
        'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
        'odps.project.name'='mf_mc_bj',
        'upsert.write.bucket.num'='64'
    );
    
    -- Execute the Flink computing logic.
    insert into test_c_d_g
    select  c1 as c1,
            c2 as c2,
            gt as gt,
            date_format(gt, 'yyyyMMddHH') as ds
    from    fake_src_table;

    Parameters in the WITH clause:

    • odps.end.point: Use the classic network endpoint of the region.

    • upsert.write.bucket.num: Use the same value as the write.bucket.num property that is configured for the Transaction Table 2.0 table created in MaxCompute (64 in this example).

  4. Query data in the MaxCompute table and check whether Flink data is written to MaxCompute.

    select * from mf_flink_upsert where ds='2023061517';
    
    -- View the returned results. Because the Flink data is randomly generated, the actual results in MaxCompute may differ from the following example.
    +------+--------------------+-------------------------+------------+
    | c1   | c2                 | gt                      | ds         |
    +------+--------------------+-------------------------+------------+
    | 0    | Skaar              | 2023-06-16 01:59:41.116 | 2023061517 |
    | 21   | Supah Century      | 2023-06-16 01:59:59.117 | 2023061517 |
    | 104  | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 |
    | 126  | Leader             | 2023-06-16 01:59:39.116 | 2023061517 |
    +------+--------------------+-------------------------+------------+
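The ds values come from date_format(gt, 'yyyyMMddHH') in the draft. The following Java sketch reproduces that hourly pattern with java.time to show how a timestamp maps to an hourly partition value and how the quoted filter for the STRING column ds is formed. It is illustrative only: the class HourlyPartition is hypothetical, and which time zone Flink applies in your deployment is an assumption you should verify. In the sample output, gt and ds differ by eight hours, the same offset that separates the UTC and Asia/Shanghai results of this sketch.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that mirrors date_format(gt, 'yyyyMMddHH') from the draft:
// it converts an event time into an hourly partition value such as 2023061517.
final class HourlyPartition {
    // Year, month, day, and hour of day (00-23), concatenated without separators.
    private static final DateTimeFormatter HOUR = DateTimeFormatter.ofPattern("yyyyMMddHH");

    private HourlyPartition() {}

    // The hour component, and therefore the partition, depends on the time zone used.
    static String of(Instant eventTime, ZoneId zone) {
        return HOUR.format(eventTime.atZone(zone));
    }

    // ds is a STRING column, so the partition value is written as a quoted string literal.
    // No escaping is done; the value is expected to come from of(...).
    static String filter(String partitionValue) {
        return "ds='" + partitionValue + "'";
    }
}
```

Passing the zone explicitly, rather than relying on the JVM default, keeps partition values reproducible across machines.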
    

Appendix: Parameters of the Flink connector of the new version

  • Basic parameters

    | Parameter | Required | Default value | Description |
    |---|---|---|---|
    | connector | Yes | No default value | The type of the connector. Set the value to maxcompute. |
    | odps.project.name | Yes | No default value | The name of the MaxCompute project. |
    | odps.access.id | Yes | No default value | The AccessKey ID of your Alibaba Cloud account. You can view the AccessKey pair on the AccessKey Pair page. |
    | odps.access.key | Yes | No default value | The AccessKey secret of your Alibaba Cloud account. You can view the AccessKey pair on the AccessKey Pair page. |
    | odps.end.point | Yes | No default value | The endpoint of MaxCompute. For more information about the MaxCompute endpoints of each region, see Endpoints. |
    | odps.tunnel.end.point | No | No default value | The public endpoint of MaxCompute Tunnel. If you do not configure this parameter, traffic is automatically routed to the Tunnel endpoint that corresponds to the network in which MaxCompute resides. If you configure this parameter, traffic is routed to the specified endpoint and automatic routing is not performed. For more information about the Tunnel endpoints of different network types in each region, see Endpoints. |
    | odps.tunnel.quota.name | No | No default value | The name of the Tunnel quota that is used to access MaxCompute. |
    | table.name | Yes | No default value | The name of the MaxCompute table, in the [project.][schema.]table format. |
    | odps.namespace.schema | No | false | Specifies whether to use the three-tier model. For more information about the three-tier model, see Schema-related operations. |
    | sink.operation | Yes | insert | The write mode. Valid values: insert and upsert. Note: Only Transaction Table 2.0 tables support writes in upsert mode. |
    | sink.parallelism | No | No default value | The degree of parallelism for data writing. If you do not configure this parameter, the parallelism of the upstream data is used. |
    | sink.meta.cache.time | No | 400 | The size of the metadata that is written to the cache. |
    | sink.meta.cache.expire.time | No | 1200 | The cache timeout period for the metadata. Unit: seconds. |
    | sink.coordinator.enable | No | true | Specifies whether to enable the coordinator mode. |

  • Partition parameters

    | Parameter | Required | Default value | Description |
    |---|---|---|---|
    | sink.partition | No | No default value | The name of the partition. If you use dynamic partitioning, the value of this parameter is the name of the parent partition of a dynamic partition. |
    | sink.partition.default-value | No | __DEFAULT_PARTITION__ | The name of the default partition when dynamic partitioning is used. |
    | sink.dynamic-partition.limit | No | 100 | The maximum number of partitions to which data can be imported in a single checkpoint when data is written to dynamic partitions. |
    | sink.group-partition.enable | No | false | Specifies whether to group data by partition when data is written to dynamic partitions. |
    | sink.partition.assigner.class | No | No default value | The PartitionAssigner implementation class. |

  • Parameters for data writing in file cache mode

    If a large number of dynamic partitions exist, you can enable the file cache mode. The following table describes the parameters that need to be configured for data writing in file cache mode.

    | Parameter | Required | Default value | Description |
    |---|---|---|---|
    | sink.file-cached.enable | No | false | Specifies whether to enable the file cache mode for data writing. |
    | sink.file-cached.tmp.dirs | No | ./local | The default directory for storing cached files in file cache mode. |
    | sink.file-cached.writer.num | No | 16 | The number of threads that are used to upload data in parallel in file cache mode. |
    | sink.bucket.check-interval | No | 60000 | The interval at which the file size is checked in file cache mode. Unit: milliseconds. |
    | sink.file-cached.rolling.max-size | No | 16 MB | The maximum size of a single cached file in file cache mode. If a file exceeds this size, its data is uploaded to the server. |
    | sink.file-cached.memory | No | 64 MB | The maximum size of off-heap memory used to write data to files in file cache mode. |
    | sink.file-cached.memory.segment-size | No | 128 KB | The size of the buffer used to write data to files in file cache mode. |
    | sink.file-cached.flush.always | No | true | Specifies whether the cache is used for writing data to files in file cache mode. |
    | sink.file-cached.write.max-retries | No | 3 | The number of retries for uploading data in file cache mode. |

  • Parameters for data writing by using the INSERT or UPSERT statement

    Parameters for data writing by using the UPSERT statement

    | Parameter | Required | Default value | Description |
    |---|---|---|---|
    | upsert.writer.max-retries | No | 3 | The maximum number of writer flush retries when data is written by using the UPSERT statement. |
    | upsert.writer.buffer-size | No | 64 MB | The buffer size of a single writer when data is written by using the UPSERT statement. |
    | upsert.writer.bucket.buffer-size | No | 1 MB | The buffer size of a single bucket when data is written by using the UPSERT statement. |
    | upsert.write.bucket.num | Yes | N/A | The number of buckets of the destination table. The value must be the same as the value of the write.bucket.num property of the destination table. |
    | upsert.write.slot-num | No | 1 | The number of Tunnel slots that are used by a single session when data is written by using the UPSERT statement. |
    | upsert.commit.max-retries | No | 3 | The maximum number of commit session retries when data is written by using the UPSERT statement. |
    | upsert.commit.thread-num | No | 16 | The degree of parallelism of commit sessions when data is written by using the UPSERT statement. |
    | upsert.major-compact.min-commits | No | 100 | The minimum number of commits that triggers major compaction when data is written by using the UPSERT statement. |
    | upsert.commit.timeout | No | 600 | The timeout period for a commit session when data is written by using the UPSERT statement. Unit: seconds. |
    | upsert.major-compact.enable | No | false | Specifies whether to enable major compaction when data is written by using the UPSERT statement. |

    Parameters for data writing by using the INSERT statement

    | Parameter | Required | Default value | Description |
    |---|---|---|---|
    | insert.commit.thread-num | No | 16 | The degree of parallelism of commit sessions when data is written by using the INSERT statement. |
    | insert.arrow-writer.enable | No | false | Specifies whether to use the Arrow format when data is written by using the INSERT statement. |
    | insert.arrow-writer.batch-size | No | 512 | The maximum number of rows in a batch of Arrow-formatted data when data is written by using the INSERT statement. |
    | insert.arrow-writer.flush-interval | No | 100000 | The interval at which a writer flushes data when data is written by using the INSERT statement. Unit: milliseconds. |
    | insert.writer.buffer-size | No | 64 MB | The cache size for the buffered writer when data is written by using the INSERT statement. |
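The upsert.write.bucket.num parameter must match the write.bucket.num property of the destination table. The following Java sketch illustrates why under a deliberately simple, hypothetical model in which a primary key is assigned to bucket hash(key) mod N. MaxCompute's actual bucketing function is not described in this topic, so treat the numbers as an illustration only. Under this model, key 100 lands in bucket 36 with 64 buckets but in bucket 4 with 32 buckets, so a writer configured with the wrong count would route rows to buckets the table does not expect.

```java
// Hypothetical model, not MaxCompute's actual bucketing function: a primary key is
// assigned to bucket hash(key) mod bucketNum. It only illustrates why the sink-side
// upsert.write.bucket.num must equal the table's write.bucket.num.
final class BucketRouting {
    private BucketRouting() {}

    // Returns the bucket index in [0, bucketNum) for a BIGINT primary key.
    static int bucketOf(long primaryKey, int bucketNum) {
        if (bucketNum <= 0) {
            throw new IllegalArgumentException("bucketNum must be positive: " + bucketNum);
        }
        // Long.hashCode folds the 64-bit key into an int; floorMod keeps the index non-negative.
        return Math.floorMod(Long.hashCode(primaryKey), bucketNum);
    }

    // Fails fast when the connector option and the table property disagree.
    static void requireSameBucketNum(int tableWriteBucketNum, int upsertWriteBucketNum) {
        if (tableWriteBucketNum != upsertWriteBucketNum) {
            throw new IllegalStateException("upsert.write.bucket.num (" + upsertWriteBucketNum
                    + ") must equal the table's write.bucket.num (" + tableWriteBucketNum + ")");
        }
    }
}
```

A check such as requireSameBucketNum(64, configuredValue) at job startup is one way to surface a mismatch before any data is written.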