
Use Flink to write data to MaxCompute

Last Updated: Sep 17, 2025

MaxCompute provides a new Flink connector plugin. You can use this plugin to write data from Flink to standard and Delta tables in MaxCompute. This topic describes the capabilities of the new Flink connector and the main procedures for writing data to MaxCompute.

Background information

  • Supported write modes

    The new Flink connector supports writing data to MaxCompute in upsert or insert mode. In upsert mode, data can be written in one of the following two ways:

    • Group by primary key

    • Group by partition field

      If the table has many partitions, you can group data by the partition field. Note that this method may cause data skew. A configuration sketch is provided at the end of this section.

  • For more information about the data writing process and recommended parameter configurations for the Flink connector in upsert mode, see Real-time data ingestion into a data warehouse.

  • When you configure the Flink job to write data to MaxCompute, you can set Flink connector parameters to specify a write mode. For a complete list of connector parameters, see Appendix: All parameters of the new Flink connector.

  • Set the checkpoint interval for Flink upsert jobs to more than 3 minutes. Otherwise, write efficiency may be low and many small files may be generated. See the sketch at the end of this section.

  • The following table maps the field data types between MaxCompute and Realtime Compute for Apache Flink.

    Flink data type                                      MaxCompute data type
    CHAR(p)                                              CHAR(p)
    VARCHAR(p)                                           VARCHAR(p)
    STRING                                               STRING
    BOOLEAN                                              BOOLEAN
    TINYINT                                              TINYINT
    SMALLINT                                             SMALLINT
    INT                                                  INT
    BIGINT                                               BIGINT
    FLOAT                                                FLOAT
    DOUBLE                                               DOUBLE
    DECIMAL(p, s)                                        DECIMAL(p, s)
    DATE                                                 DATE
    TIMESTAMP(9) WITHOUT TIME ZONE, TIMESTAMP_LTZ(9)     TIMESTAMP
    TIMESTAMP(3) WITHOUT TIME ZONE, TIMESTAMP_LTZ(3)     DATETIME
    BYTES                                                BINARY
    ARRAY<T>                                             ARRAY<T>
    MAP<K, V>                                            MAP<K, V>
    ROW                                                  STRUCT

    Note

    The Flink TIMESTAMP data type does not carry a time zone, while the MaxCompute TIMESTAMP data type does. This difference can cause an 8-hour time discrepancy in UTC+8 regions. Use TIMESTAMP_LTZ(9) on the Flink side to align the timestamps, as shown in the following example.

    -- Flink SQL
    CREATE TEMPORARY TABLE odps_source (
      id BIGINT NOT NULL COMMENT 'id',
      created_time TIMESTAMP NOT NULL COMMENT 'Creation time',
      updated_time TIMESTAMP_LTZ(9) NOT NULL COMMENT 'Update time',
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'maxcompute',
      ...
    );
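
    The following minimal sketch, run from the Flink SQL client, applies the recommendations above: it sets a checkpoint interval longer than 3 minutes and enables partition grouping for an upsert sink. The table name and credentials are placeholders, and sink.group-partition.enable is one of the connector parameters listed in the appendix of this topic.

    -- Use a checkpoint interval of more than 3 minutes for upsert jobs.
    SET 'execution.checkpointing.interval' = '5min';

    -- Upsert sink that groups data by partition when writing to many partitions.
    CREATE TEMPORARY TABLE odps_upsert_sink (
      id BIGINT,
      name STRING,
      dd STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) PARTITIONED BY (`dd`)
    WITH (
      'connector' = 'maxcompute',
      'table.name' = 'your_delta_table',
      'sink.operation' = 'upsert',
      'sink.group-partition.enable' = 'true',
      'odps.access.id' = 'LTAI****************',
      'odps.access.key' = '********************',
      'odps.end.point' = 'http://service.cn-beijing.maxcompute.aliyun.com/api',
      'odps.project.name' = 'your_project'
    );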

Write data from a self-managed open source Flink cluster to MaxCompute

  1. Preparations: Create a MaxCompute table.

    First, create a MaxCompute table to which you can write Flink data. The following example shows how to create a non-partitioned Delta table and a partitioned Delta table to demonstrate the main process for writing Flink data to MaxCompute. For information about table properties, see Delta table parameters.

    -- Create a non-partitioned Delta table.
    CREATE TABLE mf_flink_tt (
      id BIGINT NOT NULL,
      name STRING,
      age INT,
      status BOOLEAN,
      PRIMARY KEY (id)
    )
    TBLPROPERTIES ("transactional"="true",
                   "write.bucket.num"="64",
                   "acid.data.retain.hours"="12");

    -- Create a partitioned Delta table.
    CREATE TABLE mf_flink_tt_part (
      id BIGINT NOT NULL,
      name STRING,
      age INT,
      status BOOLEAN,
      PRIMARY KEY (id)
    )
    PARTITIONED BY (dd STRING, hh STRING)
    TBLPROPERTIES ("transactional"="true",
                   "write.bucket.num"="64",
                   "acid.data.retain.hours"="12");
    
  2. Build an open source Flink cluster. Flink versions 1.13, 1.15, 1.16, and 1.17 are supported. Select the Flink connector that corresponds to your Flink version.

    Note
    • The Flink connector for version 1.16 can also be used with Flink 1.17.

    • This topic uses Flink 1.13 and its corresponding connector as an example. Download the Flink installation package to your local environment and decompress it.

  3. Download the Flink connector and add it to the Flink cluster package.

    1. Download the Flink connector JAR package to your local environment.

    2. Add the Flink connector JAR package to the lib directory of the decompressed Flink installation package.

      mv flink-connector-odps-1.13-shaded.jar $FLINK_HOME/lib/flink-connector-odps-1.13-shaded.jar
  4. Start the Flink service.

    cd $FLINK_HOME/bin
    ./start-cluster.sh
  5. Start the Flink SQL client.

    cd $FLINK_HOME/bin
    ./sql-client.sh
  6. Create a Flink table and configure the Flink connector parameters.

    You can create a Flink table and configure parameters using either Flink SQL or the Flink DataStream API. The following sections provide core examples for both methods.

    Use Flink SQL

    1. Go to the Flink SQL editor and run the following commands to create tables and configure parameters.

      -- Register a corresponding non-partitioned table in Flink SQL.
      CREATE TABLE mf_flink (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        PRIMARY KEY(id) NOT ENFORCED
      ) WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_tt',
        'sink.operation' = 'upsert',
        'odps.access.id'='LTAI****************',
        'odps.access.key'='********************',
        'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
        'odps.project.name'='mf_mc_bj'
      );
      
      -- Register a corresponding partitioned table in Flink SQL.
      CREATE TABLE mf_flink_part (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        dd STRING,
        hh STRING,
        PRIMARY KEY(id) NOT ENFORCED
      ) PARTITIONED BY (`dd`,`hh`)
      WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_tt_part',
        'sink.operation' = 'upsert',
        'odps.access.id'='LTAI****************',
        'odps.access.key'='********************',
        'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
        'odps.project.name'='mf_mc_bj'
      );
    2. Write data to the Flink tables and query the MaxCompute tables to verify that the data was written successfully.

      -- Insert data into the non-partitioned table in the Flink SQL client.
      INSERT INTO mf_flink VALUES (1,'Danny',27, false);
      
      -- Query the data in MaxCompute and check the result.
      SELECT * FROM mf_flink_tt;
      +------------+-------+------+--------+
      | id         | name  | age  | status |
      +------------+-------+------+--------+
      | 1          | Danny | 27   | false  |
      +------------+-------+------+--------+
      
      -- Insert a row with the same primary key into the non-partitioned table. In upsert mode, the existing row is updated.
      INSERT INTO mf_flink VALUES (1,'Danny',28, false);
      -- Query the data in MaxCompute and check the result.
      SELECT * FROM mf_flink_tt;
      +------------+-------+------+--------+
      | id         | name  | age  | status |
      +------------+-------+------+--------+
      | 1          | Danny | 28   | false  |
      +------------+-------+------+--------+
      
      -- Insert data into the partitioned table in the Flink SQL client.
      INSERT INTO mf_flink_part VALUES (1,'Danny',27, false, '01','01');
      -- Query the data in MaxCompute and check the result.
      SELECT * FROM mf_flink_tt_part WHERE dd='01' AND hh='01';
      +------------+-------+------+--------+----+----+
      | id         | name  | age  | status | dd | hh |
      +------------+-------+------+--------+----+----+
      | 1          | Danny | 27   | false  | 01 | 01 |
      +------------+-------+------+--------+----+----+
      
      -- Insert a row with the same primary key into the partitioned table. In upsert mode, the existing row is updated.
      INSERT INTO mf_flink_part VALUES (1,'Danny',30, false, '01','01');
      -- Query the data in MaxCompute and check the result.
      SELECT * FROM mf_flink_tt_part WHERE dd='01' AND hh='01';
      +------------+-------+------+--------+----+----+
      | id         | name  | age  | status | dd | hh |
      +------------+-------+------+--------+----+----+
      | 1          | Danny | 30   | false  | 01 | 01 |
      +------------+-------+------+--------+----+----+
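
      The INSERT statements above write single rows. For a continuous test stream, a sketch like the following can be used, assuming the built-in datagen connector; the source table name and generator settings are arbitrary examples.

      -- Generate a bounded test stream and write it to the non-partitioned Flink table.
      CREATE TEMPORARY TABLE datagen_source (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN
      ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '10',
        'fields.id.kind' = 'sequence',
        'fields.id.start' = '1',
        'fields.id.end' = '1000',
        'fields.age.min' = '18',
        'fields.age.max' = '60'
      );

      INSERT INTO mf_flink SELECT id, name, age, status FROM datagen_source;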

    Use the DataStream API

    1. When you use the DataStream API, first add the following dependency.

      <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>flink-connector-maxcompute</artifactId>
        <version>xxx</version>
        <scope>system</scope>
        <systemPath>${mvn_project.basedir}/lib/flink-connector-maxcompute-xxx-shaded.jar</systemPath>
      </dependency>
      Note

      Replace xxx with the actual version number.

    2. The following sample code shows how to create a table and configure parameters.

      package com.aliyun.odps.flink.examples;
      
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.odps.table.OdpsOptions;
      import org.apache.flink.odps.util.OdpsConf;
      import org.apache.flink.odps.util.OdpsPipeline;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      import org.apache.flink.table.data.RowData;
      
      public class Examples {
      
          public static void main(String[] args) throws Exception {
              // Create the streaming environment and enable checkpointing every 120 seconds.
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.enableCheckpointing(120 * 1000);
      
              StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
      
              // Read the upstream data as a DataStream of RowData records.
              Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table");
              DataStream<RowData> input = streamTableEnvironment.toAppendStream(source, RowData.class);
      
              // Configure the connector: write in upsert mode with 8 commit threads, and
              // trigger a major compaction after at least 100 commits.
              Configuration config = new Configuration();
              config.set(OdpsOptions.SINK_OPERATION, "upsert");
              config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8);
              config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100);
      
              // MaxCompute connection information: AccessKey pair, endpoint, project, and Tunnel endpoint.
              OdpsConf odpsConfig = new OdpsConf("accessid",
                      "accesskey",
                      "endpoint",
                      "project",
                      "tunnel endpoint");
      
              // Build the sink pipeline for the target MaxCompute table and attach the input stream.
              OdpsPipeline.Builder builder = OdpsPipeline.builder();
              builder.projectName("sql2_isolation_2a")
                      .tableName("user_ledger_portfolio")
                      .partition("")
                      .configuration(config)
                      .odpsConf(odpsConfig)
                      .sink(input, false);
              env.execute();
          }
      }

Write data from fully managed Flink on Alibaba Cloud to MaxCompute

  1. Preparations: Create a MaxCompute table.

    First, create a MaxCompute table to which you can write Flink data. The following example shows how to create a Delta table.

    SET odps.sql.type.system.odps2=true;
    DROP TABLE IF EXISTS mf_flink_upsert;
    CREATE TABLE mf_flink_upsert (
      c1 INT NOT NULL,
      c2 STRING,
      gt TIMESTAMP,
      PRIMARY KEY (c1)
    )
    PARTITIONED BY (ds STRING)
    TBLPROPERTIES ("transactional"="true",
                   "write.bucket.num"="64",
                   "acid.data.retain.hours"="12");
  2. Log on to the Realtime Compute for Apache Flink console and view the Flink connector information. The Flink connector is pre-loaded on the Ververica Platform (VVP) for fully managed Flink on Alibaba Cloud.

  3. Use a Flink SQL job to create a Flink table and generate real-time Flink data. After you develop the job, you can deploy it.

    On the Flink job development page, create and edit a Flink SQL job. The following example creates a Flink source table and a temporary Flink sink table. The source table uses the faker connector to generate real-time test data, and the job logic writes the data from the source table to the temporary sink table. For more information about SQL job development, see Job development map.

    -- Create a Flink source table.
    CREATE TEMPORARY TABLE fake_src_table
    (
        c1 int,
        c2 VARCHAR,
        gt AS CURRENT_TIMESTAMP
    ) WITH (
      'connector' = 'faker',
      'fields.c2.expression' = '#{superhero.name}',
      'rows-per-second' = '100',
      'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}'
    );
    
    -- Create a temporary Flink sink table.
    CREATE TEMPORARY TABLE test_c_d_g
    (
        c1 int,
        c2 VARCHAR,
        gt TIMESTAMP,
        ds varchar,
        PRIMARY KEY(c1) NOT ENFORCED
    ) PARTITIONED BY(ds)
    WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_upsert',
        'sink.operation' = 'upsert',
        'odps.access.id'='LTAI****************',
        'odps.access.key'='********************',
        'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
        'odps.project.name'='mf_mc_bj',
        'upsert.write.bucket.num'='64'
    );
    
    -- Flink computing logic
    INSERT INTO test_c_d_g
    SELECT  c1 AS c1,
            c2 AS c2,
            gt AS gt,
            date_format(gt, 'yyyyMMddHH') AS ds
    FROM    fake_src_table;

    Where:

    • odps.end.point: Use the internal network endpoint for the corresponding region.

    • upsert.write.bucket.num: The value of this parameter must be the same as the value of the write.bucket.num property of the Delta table in MaxCompute.

  4. Query the data in MaxCompute to verify that the Flink data was written successfully.

    SELECT * FROM mf_flink_upsert WHERE ds='2023061517';
    
    -- Result: Because the Flink data is randomly generated, your query result in MaxCompute may differ from the example.
    +------+--------------------+-------------------------+------------+
    | c1   | c2                 | gt                      | ds         |
    +------+--------------------+-------------------------+------------+
    | 0    | Skaar              | 2023-06-16 01:59:41.116 | 2023061517 |
    | 21   | Supah Century      | 2023-06-16 01:59:59.117 | 2023061517 |
    | 104  | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 |
    | 126  | Leader             | 2023-06-16 01:59:39.116 | 2023061517 |
    +------+--------------------+-------------------------+------------+

Appendix: All parameters of the new Flink connector

  • Basic parameters

    • connector (Required: Yes. Default value: None.)
      The connector type. Set this to maxcompute.

    • odps.project.name (Required: Yes. Default value: None.)
      The name of the MaxCompute project.

    • odps.access.id (Required: Yes. Default value: None.)
      The AccessKey ID of your Alibaba Cloud account. You can view this information on the AccessKey Pair page.

    • odps.access.key (Required: Yes. Default value: None.)
      The AccessKey secret of your Alibaba Cloud account. You can view this information on the AccessKey Pair page.

    • odps.end.point (Required: Yes. Default value: None.)
      The endpoint of MaxCompute. For the endpoints of MaxCompute in different regions, see Endpoints.

    • odps.tunnel.end.point (Required: No. Default value: None.)
      The public endpoint of the Tunnel service. If you do not configure a Tunnel endpoint, traffic is automatically routed to the Tunnel endpoint that corresponds to the network where the MaxCompute service is located. If you configure a Tunnel endpoint, traffic is routed to the specified endpoint and automatic routing is disabled. For the Tunnel endpoints for different regions and network types, see Endpoints.

    • odps.tunnel.quota.name (Required: No. Default value: None.)
      The name of the Tunnel quota used to access MaxCompute.

    • table.name (Required: Yes. Default value: None.)
      The name of the MaxCompute table, in the format [project.][schema.]table.

    • odps.namespace.schema (Required: No. Default value: false.)
      Specifies whether to use the three-layer model. For more information about the three-layer model, see Schema operations.

    • sink.operation (Required: Yes. Default value: insert.)
      The write mode. Valid values: insert and upsert.
      Note: Only MaxCompute Delta tables support upsert writes.

    • sink.parallelism (Required: No. Default value: None.)
      The degree of parallelism for writes. If this parameter is not set, the parallelism of the input data is used by default.
      Note: Make sure that the value of the write.bucket.num table property is an integer multiple of this parameter's value. This ensures optimal write performance and saves the most memory on the sink node.

    • sink.meta.cache.time (Required: No. Default value: 400.)
      The size of the metadata cache.

    • sink.meta.cache.expire.time (Required: No. Default value: 1200.)
      The timeout period for the metadata cache. Unit: seconds (s).

    • sink.coordinator.enable (Required: No. Default value: true.)
      Specifies whether to enable coordinator mode.
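
    As a reference, the following sketch combines several of the optional parameters above in one sink definition. The project, schema, table, and quota names are placeholders, and the credentials are masked as in the earlier examples.

    -- Sketch: a sink that uses the three-layer model and a dedicated Tunnel quota.
    CREATE TEMPORARY TABLE odps_sink_with_schema (
      id BIGINT,
      name STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'maxcompute',
      'table.name' = 'my_project.my_schema.my_table',
      'odps.namespace.schema' = 'true',
      'odps.tunnel.quota.name' = 'my_tunnel_quota',
      'sink.operation' = 'upsert',
      'odps.access.id' = 'LTAI****************',
      'odps.access.key' = '********************',
      'odps.end.point' = 'http://service.cn-beijing.maxcompute.aliyun.com/api',
      'odps.project.name' = 'my_project'
    );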

  • Partition parameters

    • sink.partition (Required: No. Default value: None.)
      The name of the partition to write to. If you use dynamic partitioning, this is the name of the parent partition of the dynamic partition.

    • sink.partition.default-value (Required: No. Default value: __DEFAULT_PARTITION__.)
      The default partition name to use for dynamic partitioning.

    • sink.dynamic-partition.limit (Required: No. Default value: 100.)
      The maximum number of partitions that can be imported at the same time in a single checkpoint when writing to dynamic partitions.
      Note: Do not significantly increase this value. Writing to too many partitions at the same time can cause an out-of-memory (OOM) error on the sink node. If the number of concurrent partitions for writing exceeds the threshold, the write job fails.

    • sink.group-partition.enable (Required: No. Default value: false.)
      Specifies whether to group data by partition when writing to dynamic partitions.

    • sink.partition.assigner.class (Required: No. Default value: None.)
      The PartitionAssigner implementation class.
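
    For example, a sink that writes to dynamic partitions, groups data by partition, and keeps the default limit of 100 concurrent partitions per checkpoint might look like the following sketch. The table name and credentials are placeholders.

    CREATE TEMPORARY TABLE odps_dynamic_partition_sink (
      id BIGINT,
      name STRING,
      ds STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) PARTITIONED BY (`ds`)
    WITH (
      'connector' = 'maxcompute',
      'table.name' = 'my_partitioned_delta_table',
      'sink.operation' = 'upsert',
      -- Must match the write.bucket.num property of the Delta table.
      'upsert.write.bucket.num' = '64',
      'sink.group-partition.enable' = 'true',
      'sink.dynamic-partition.limit' = '100',
      'odps.access.id' = 'LTAI****************',
      'odps.access.key' = '********************',
      'odps.end.point' = 'http://service.cn-beijing.maxcompute.aliyun.com/api',
      'odps.project.name' = 'my_project'
    );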

  • Parameters for writing in FileCached mode

    If you have many dynamic partitions, you can use the file cache mode. You can use the following parameters to configure the cache file information for data writing.

    • sink.file-cached.enable (Required: No. Default value: false.)
      Specifies whether to enable writing in FileCached mode. Valid values:
      • false: Disables the mode.
      • true: Enables the mode.
      Note: Use the file cache mode when there are many dynamic partitions.

    • sink.file-cached.tmp.dirs (Required: No. Default value: ./local.)
      The default directory for cached files in file cache mode.

    • sink.file-cached.writer.num (Required: No. Default value: 16.)
      The number of concurrent threads for a single task to upload data in file cache mode.
      Note: Do not significantly increase this value. Writing to too many partitions at the same time can cause an OOM error.

    • sink.bucket.check-interval (Required: No. Default value: 60000.)
      The interval at which the file size is checked in file cache mode. Unit: milliseconds (ms).

    • sink.file-cached.rolling.max-size (Required: No. Default value: 16 MB.)
      The maximum size of a single cache file in file cache mode. If a file exceeds this size, its data is uploaded to the server.

    • sink.file-cached.memory (Required: No. Default value: 64 MB.)
      The maximum size of off-heap memory used for writing files in file cache mode.

    • sink.file-cached.memory.segment-size (Required: No. Default value: 128 KB.)
      The size of the buffer used for writing files in file cache mode.

    • sink.file-cached.flush.always (Required: No. Default value: true.)
      Specifies whether to use a cache when writing files in file cache mode.

    • sink.file-cached.write.max-retries (Required: No. Default value: 3.)
      The number of retries for uploading data in file cache mode.
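
    The sketch below shows how the file cache mode might be enabled for a job that writes to many dynamic partitions. The table name and temporary directory are placeholders, and the elided connection parameters follow the earlier examples.

    CREATE TEMPORARY TABLE odps_file_cached_sink (
      id BIGINT,
      name STRING,
      ds STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) PARTITIONED BY (`ds`)
    WITH (
      'connector' = 'maxcompute',
      'table.name' = 'my_partitioned_delta_table',
      'sink.operation' = 'upsert',
      'upsert.write.bucket.num' = '64',
      -- File cache settings; the temporary directory is a placeholder.
      'sink.file-cached.enable' = 'true',
      'sink.file-cached.tmp.dirs' = '/tmp/flink-odps-cache',
      'sink.file-cached.writer.num' = '16',
      ...
    );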

  • Parameters for insert or upsert writes

    Upsert write parameters

    • upsert.writer.max-retries (Required: No. Default value: 3.)
      The number of retries if an Upsert Writer fails to write to a bucket.

    • upsert.writer.buffer-size (Required: No. Default value: 64 MB.)
      The cache size for a single Upsert Writer in Flink.
      Note:
      • When the total buffer size of all buckets reaches the threshold, the system automatically flushes the data to the server.
      • A single Upsert Writer can write to multiple buckets at the same time. Increase this value to improve write efficiency.
      • If you write to many partitions, there is a risk of OOM errors. In this case, consider decreasing this value.

    • upsert.writer.bucket.buffer-size (Required: No. Default value: 1 MB.)
      The cache size for a single bucket in Flink. If the Flink server is low on memory resources, you can decrease this value.

    • upsert.write.bucket.num (Required: Yes. Default value: None.)
      The number of buckets for the sink table. This value must be the same as the value of the write.bucket.num property of the sink table.

    • upsert.write.slot-num (Required: No. Default value: 1.)
      The number of Tunnel slots used by a single session.

    • upsert.commit.max-retries (Required: No. Default value: 3.)
      The number of retries for an upsert session commit.

    • upsert.commit.thread-num (Required: No. Default value: 16.)
      The degree of parallelism for upsert session commits. Do not set this parameter to a large value. A high number of concurrent commits increases resource consumption, which may cause performance issues or excessive resource usage.

    • upsert.major-compact.min-commits (Required: No. Default value: 100.)
      The minimum number of commits required to trigger a major compaction.

    • upsert.commit.timeout (Required: No. Default value: 600.)
      The timeout period for an upsert session commit. Unit: seconds (s).

    • upsert.major-compact.enable (Required: No. Default value: false.)
      Specifies whether to enable major compaction.

    • upsert.flush.concurrent (Required: No. Default value: 2.)
      The maximum number of buckets that can be written to in a single partition at the same time.
      Note: Each time data in a bucket is flushed, a Tunnel slot is occupied.

    Note
    For more information about recommended parameter configurations for upsert writes, see Recommended parameter configurations for upsert writes.
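
    For orientation, the sketch below shows how these options might appear in an upsert sink for a Delta table created with write.bucket.num = 64. The table name is a placeholder, the thread and compaction values are illustrative starting points rather than tuned recommendations, and the elided connection parameters follow the earlier examples.

    CREATE TEMPORARY TABLE odps_upsert_tuned_sink (
      id BIGINT,
      name STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'maxcompute',
      'table.name' = 'my_delta_table',
      'sink.operation' = 'upsert',
      -- Must equal the write.bucket.num property of the MaxCompute Delta table.
      'upsert.write.bucket.num' = '64',
      'upsert.commit.thread-num' = '16',
      'upsert.major-compact.enable' = 'true',
      'upsert.major-compact.min-commits' = '100',
      ...
    );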

    Insert write parameters

    • insert.commit.thread-num (Required: No. Default value: 16.)
      The degree of parallelism for commit sessions.

    • insert.arrow-writer.enable (Required: No. Default value: false.)
      Specifies whether to use the Arrow format.

    • insert.arrow-writer.batch-size (Required: No. Default value: 512.)
      The maximum number of rows in an Arrow batch.

    • insert.arrow-writer.flush-interval (Required: No. Default value: 100000.)
      The interval at which the writer flushes data. Unit: milliseconds (ms).

    • insert.writer.buffer-size (Required: No. Default value: 64 MB.)
      The cache size for the buffered writer.
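
    Finally, an insert-mode sink that enables the Arrow writer might be configured as in the sketch below. The table name is a placeholder, whether Arrow improves throughput depends on the workload, and the elided connection parameters follow the earlier examples.

    CREATE TEMPORARY TABLE odps_insert_sink (
      id BIGINT,
      name STRING
    ) WITH (
      'connector' = 'maxcompute',
      'table.name' = 'my_table',
      'sink.operation' = 'insert',
      'insert.arrow-writer.enable' = 'true',
      'insert.arrow-writer.batch-size' = '512',
      ...
    );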