MaxCompute provides a new Flink connector plugin. You can use this plugin to write data from Flink to standard and Delta tables in MaxCompute. This topic describes the capabilities of the new Flink connector and the main procedures for writing data to MaxCompute.
Background information
Supported write modes
The new Flink connector supports writing data to MaxCompute in upsert or insert mode. In upsert mode, data can be written in one of the following two ways:
Group by primary key
Group by partition field
If the table has many partitions, you can group data by the partition field. Note that this method may cause data skew. See the configuration sketch below.
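The write mode and grouping behavior are set through connector parameters in the Flink sink table definition. The following Flink SQL sketch shows an upsert sink that groups data by the partition field. The table name, project name, endpoint, and credential placeholders are illustrative assumptions, not values from this topic.

CREATE TEMPORARY TABLE mc_upsert_sink (
  id BIGINT,
  name STRING,
  ds STRING,
  PRIMARY KEY (id) NOT ENFORCED
) PARTITIONED BY (ds) WITH (
  'connector' = 'maxcompute',
  'table.name' = 'your_delta_table',        -- illustrative Delta table name
  'sink.operation' = 'upsert',              -- write in upsert mode
  'sink.group-partition.enable' = 'true',   -- group data by the partition field
  'odps.project.name' = 'your_project',
  'odps.access.id' = '<accessKeyId>',
  'odps.access.key' = '<accessKeySecret>',
  'odps.end.point' = '<endpoint>'
);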
For more information about the data writing process and recommended parameter configurations for the Flink connector in upsert mode, see Real-time data ingestion into a data warehouse.
When you configure the Flink job to write data to MaxCompute, you can set Flink connector parameters to specify a write mode. For a complete list of connector parameters, see Appendix: All parameters of the new Flink connector.
Set the checkpoint interval for Flink upsert jobs to more than 3 minutes. Otherwise, write efficiency may be low and many small files may be generated.
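For example, you can set the checkpoint interval in the Flink SQL client before you submit the job. The 5-minute value below only illustrates the guideline; depending on your Flink version, the SET statement may or may not require quotation marks.

-- Checkpoint every 5 minutes.
SET 'execution.checkpointing.interval' = '5min';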
The following table maps the field data types between MaxCompute and Realtime Compute for Apache Flink.
Flink data type | MaxCompute data type
CHAR(p) | CHAR(p)
VARCHAR(p) | VARCHAR(p)
STRING | STRING
BOOLEAN | BOOLEAN
TINYINT | TINYINT
SMALLINT | SMALLINT
INT | INT
BIGINT | LONG
FLOAT | FLOAT
DOUBLE | DOUBLE
DECIMAL(p, s) | DECIMAL(p, s)
DATE | DATE
TIMESTAMP(9) WITHOUT TIME ZONE, TIMESTAMP_LTZ(9) | TIMESTAMP
TIMESTAMP(3) WITHOUT TIME ZONE, TIMESTAMP_LTZ(3) | DATETIME
BYTES | BINARY
ARRAY<T> | LIST<T>
MAP<K, V> | MAP<K, V>
ROW | STRUCT
Note: The Flink TIMESTAMP data type does not include a time zone, while the MaxCompute TIMESTAMP data type does. This difference causes an 8-hour time discrepancy. Use TIMESTAMP_LTZ(9) to align the timestamps, as shown in the following example.
-- Flink SQL
CREATE TEMPORARY TABLE odps_source (
  id BIGINT NOT NULL COMMENT 'id',
  created_time TIMESTAMP NOT NULL COMMENT 'Creation time',
  updated_time TIMESTAMP_LTZ(9) NOT NULL COMMENT 'Update time',
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'maxcompute',
  ...
);
Write data from a self-managed open source Flink cluster to MaxCompute
Preparations: Create a MaxCompute table.
First, create a MaxCompute table to which you can write Flink data. The following example shows how to create a non-partitioned Delta table and a partitioned Delta table to demonstrate the main process for writing Flink data to MaxCompute. For information about table properties, see Delta table parameters.
-- Create a non-partitioned Delta table.
CREATE TABLE mf_flink_tt (
  id BIGINT NOT NULL,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id)
) TBLPROPERTIES (
  "transactional" = "true",
  "write.bucket.num" = "64",
  "acid.data.retain.hours" = "12"
);

-- Create a partitioned Delta table.
CREATE TABLE mf_flink_tt_part (
  id BIGINT NOT NULL,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id)
) PARTITIONED BY (dd STRING, hh STRING)
TBLPROPERTIES (
  "transactional" = "true",
  "write.bucket.num" = "64",
  "acid.data.retain.hours" = "12"
);

Build an open source Flink cluster. Flink versions 1.13, 1.15, 1.16, and 1.17 are supported. Select the Flink connector that corresponds to your Flink version:
Note: The Flink connector for version 1.16 can also be used with Flink 1.17.
This topic uses Flink Connector 1.13 as an example. Download the package to your local environment and decompress it.
Download the Flink connector and add it to the Flink cluster package.
Download the Flink connector JAR package to your local environment.
Add the Flink connector JAR package to the lib directory of the decompressed Flink installation package.
mv flink-connector-odps-1.13-shaded.jar $FLINK_HOME/lib/flink-connector-odps-1.13-shaded.jar
Start the Flink service.
cd $FLINK_HOME/bin
./start-cluster.sh

Start the Flink SQL client.
cd $FLINK_HOME/bin
./sql-client.sh

Create a Flink table and configure the Flink connector parameters.
You can create a Flink table and configure parameters using either Flink SQL or the Flink DataStream API. The following sections provide core examples for both methods.
Use Flink SQL
Go to the Flink SQL editor and run the following commands to create tables and configure parameters.
-- Register a corresponding non-partitioned table in Flink SQL.
CREATE TABLE mf_flink (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'maxcompute',
  'table.name' = 'mf_flink_tt',
  'sink.operation' = 'upsert',
  'odps.access.id' = 'LTAI****************',
  'odps.access.key' = '********************',
  'odps.end.point' = 'http://service.cn-beijing.maxcompute.aliyun.com/api',
  'odps.project.name' = 'mf_mc_bj'
);

-- Register a corresponding partitioned table in Flink SQL.
CREATE TABLE mf_flink_part (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  dd STRING,
  hh STRING,
  PRIMARY KEY (id) NOT ENFORCED
) PARTITIONED BY (`dd`, `hh`) WITH (
  'connector' = 'maxcompute',
  'table.name' = 'mf_flink_tt_part',
  'sink.operation' = 'upsert',
  'odps.access.id' = 'LTAI****************',
  'odps.access.key' = '********************',
  'odps.end.point' = 'http://service.cn-beijing.maxcompute.aliyun.com/api',
  'odps.project.name' = 'mf_mc_bj'
);

Write data to the Flink tables and query the MaxCompute tables to verify that the data was written successfully.
-- Insert data into the non-partitioned table in the Flink SQL client.
INSERT INTO mf_flink VALUES (1,'Danny',27, false);

-- Query the data in MaxCompute and check the result.
SELECT * FROM mf_flink_tt;
+------------+------+------+--------+
| id | name | age | status |
+------------+------+------+--------+
| 1 | Danny | 27 | false |
+------------+------+------+--------+

-- Insert data into the non-partitioned table in the Flink SQL client.
INSERT INTO mf_flink VALUES (1,'Danny',28, false);

-- Query the data in MaxCompute and check the result.
SELECT * FROM mf_flink_tt;
+------------+------+------+--------+
| id | name | age | status |
+------------+------+------+--------+
| 1 | Danny | 28 | false |
+------------+------+------+--------+

-- Insert data into the partitioned table in the Flink SQL client.
INSERT INTO mf_flink_part VALUES (1,'Danny',27, false, '01','01');

-- Query the data in MaxCompute and check the result.
SELECT * FROM mf_flink_tt_part WHERE dd='01' AND hh='01';
+------------+------+------+--------+----+----+
| id | name | age | status | dd | hh |
+------------+------+------+--------+----+----+
| 1 | Danny | 27 | false | 01 | 01 |
+------------+------+------+--------+----+----+

-- Insert data into the partitioned table in the Flink SQL client.
INSERT INTO mf_flink_part VALUES (1,'Danny',30, false, '01','01');

-- Query the data in MaxCompute and check the result.
SELECT * FROM mf_flink_tt_part WHERE dd='01' AND hh='01';
+------------+------+------+--------+----+----+
| id | name | age | status | dd | hh |
+------------+------+------+--------+----+----+
| 1 | Danny | 30 | false | 01 | 01 |
+------------+------+------+--------+----+----+
Use the DataStream API
When you use the DataStream API, first add the following dependency.
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>flink-connector-maxcompute</artifactId>
    <version>xxx</version>
    <scope>system</scope>
    <systemPath>${mvn_project.basedir}/lib/flink-connector-maxcompute-xxx-shaded.jar</systemPath>
</dependency>

Note: Replace xxx with the actual version number.
The following sample code shows how to create a table and configure parameters.
package com.aliyun.odps.flink.examples;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.odps.table.OdpsOptions;
import org.apache.flink.odps.util.OdpsConf;
import org.apache.flink.odps.util.OdpsPipeline;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;

public class Examples {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 120 seconds.
        env.enableCheckpointing(120 * 1000);

        StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);

        // Read the upstream data as a DataStream of RowData.
        Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table");
        DataStream<RowData> input = streamTableEnvironment.toAppendStream(source, RowData.class);

        // Configure the MaxCompute sink: upsert mode, commit parallelism, and major compaction threshold.
        Configuration config = new Configuration();
        config.set(OdpsOptions.SINK_OPERATION, "upsert");
        config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8);
        config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100);

        OdpsConf odpsConfig = new OdpsConf("accessid", "accesskey", "endpoint", "project", "tunnel endpoint");

        // Build the pipeline that writes the stream to the MaxCompute table.
        OdpsPipeline.Builder builder = OdpsPipeline.builder();
        builder.projectName("sql2_isolation_2a")
                .tableName("user_ledger_portfolio")
                .partition("")
                .configuration(config)
                .odpsConf(odpsConfig)
                .sink(input, false);
        env.execute();
    }
}
Write data from fully managed Flink on Alibaba Cloud to MaxCompute
Preparations: Create a MaxCompute table.
First, create a MaxCompute table to which you can write Flink data. The following example shows how to create a Delta table.
SET odps.sql.type.system.odps2=true;
DROP TABLE mf_flink_upsert;
CREATE TABLE mf_flink_upsert (
  c1 INT NOT NULL,
  c2 STRING,
  gt TIMESTAMP,
  PRIMARY KEY (c1)
) PARTITIONED BY (ds STRING)
TBLPROPERTIES (
  "transactional" = "true",
  "write.bucket.num" = "64",
  "acid.data.retain.hours" = "12"
);

Log on to the Realtime Compute for Apache Flink console and view the Flink connector information. The Flink connector is pre-loaded on the Ververica Platform (VVP) for fully managed Flink on Alibaba Cloud.
Use a Flink SQL job to create a Flink table and generate real-time Flink data. After you develop the job, you can deploy it.
On the Flink job development page, create and edit a Flink SQL job. The following example creates a Flink source table and a temporary Flink sink table. This example also includes logic to automatically generate real-time data and write it to the source table. The job logic then writes the data from the source table to the temporary sink table. For more information about SQL job development, see Job development map.
-- Create a Flink source table.
CREATE TEMPORARY TABLE fake_src_table (
  c1 INT,
  c2 VARCHAR,
  gt AS CURRENT_TIMESTAMP
) WITH (
  'connector' = 'faker',
  'fields.c2.expression' = '#{superhero.name}',
  'rows-per-second' = '100',
  'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}'
);

-- Create a temporary Flink sink table.
CREATE TEMPORARY TABLE test_c_d_g (
  c1 INT,
  c2 VARCHAR,
  gt TIMESTAMP,
  ds VARCHAR,
  PRIMARY KEY (c1) NOT ENFORCED
) PARTITIONED BY (ds) WITH (
  'connector' = 'maxcompute',
  'table.name' = 'mf_flink_upsert',
  'sink.operation' = 'upsert',
  'odps.access.id' = 'LTAI****************',
  'odps.access.key' = '********************',
  'odps.end.point' = 'http://service.cn-beijing.maxcompute.aliyun.com/api',
  'odps.project.name' = 'mf_mc_bj',
  'upsert.write.bucket.num' = '64'
);

-- Flink computing logic.
INSERT INTO test_c_d_g
SELECT c1 AS c1, c2 AS c2, gt AS gt, date_format(gt, 'yyyyMMddHH') AS ds
FROM fake_src_table;

Where:
odps.end.point: Use the internal network endpoint for the corresponding region.
upsert.write.bucket.num: The value of this parameter must be the same as the value of the write.bucket.num property of the Delta table in MaxCompute.

Query the data in MaxCompute to verify that the Flink data was written successfully.
SELECT * FROM mf_flink_upsert WHERE ds='2023061517';

-- Result: Because the Flink data is randomly generated, your query result in MaxCompute may differ from the example.
+------+--------------------+-------------------------+------------+
| c1   | c2                 | gt                      | ds         |
+------+--------------------+-------------------------+------------+
| 0    | Skaar              | 2023-06-16 01:59:41.116 | 2023061517 |
| 21   | Supah Century      | 2023-06-16 01:59:59.117 | 2023061517 |
| 104  | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 |
| 126  | Leader             | 2023-06-16 01:59:39.116 | 2023061517 |
+------+--------------------+-------------------------+------------+
Appendix: All parameters of the new Flink connector
Basic parameters
Parameter | Required | Default value | Description
connector | Yes | None | The connector type. Set this to maxcompute.
odps.project.name | Yes | None | The name of the MaxCompute project.
odps.access.id | Yes | None | The AccessKey ID of your Alibaba Cloud account. You can view this information on the AccessKey Pair page.
odps.access.key | Yes | None | The AccessKey secret of your Alibaba Cloud account. You can view this information on the AccessKey Pair page.
odps.end.point | Yes | None | The endpoint of MaxCompute. For the endpoints of MaxCompute in different regions, see Endpoints.
odps.tunnel.end.point | No | None | The public endpoint of the Tunnel service. If you do not configure a Tunnel endpoint, traffic is automatically routed to the Tunnel endpoint that corresponds to the network where the MaxCompute service is located. If you configure a Tunnel endpoint, traffic is routed to the specified endpoint and automatic routing is disabled. For the Tunnel endpoints of different regions and network types, see Endpoints.
odps.tunnel.quota.name | No | None | The name of the Tunnel quota used to access MaxCompute.
table.name | Yes | None | The name of the MaxCompute table, in the format [project.][schema.]table.
odps.namespace.schema | No | false | Specifies whether to use the three-layer model. For more information about the three-layer model, see Schema operations.
sink.operation | Yes | insert | The write mode. Valid values: insert and upsert. Note: Only MaxCompute Delta tables support upsert writes.
sink.parallelism | No | None | The degree of parallelism for writes. If this parameter is not set, the parallelism of the input data is used by default. Note: Make sure that the value of the write.bucket.num table property is an integer multiple of this parameter's value. This ensures optimal write performance and minimizes memory usage on the sink node.
sink.meta.cache.time | No | 400 | The size of the metadata cache.
sink.meta.cache.expire.time | No | 1200 | The timeout period of the metadata cache. Unit: seconds (s).
sink.coordinator.enable | No | Yes | Specifies whether to enable coordinator mode.
Partition parameters
Parameter | Required | Default value | Description
sink.partition | No | None | The name of the partition to write to. If you use dynamic partitioning, this is the name of the parent partition of the dynamic partitions.
sink.partition.default-value | No | __DEFAULT_PARTITION__ | The default partition name to use for dynamic partitioning.
sink.dynamic-partition.limit | No | 100 | The maximum number of partitions that can be written to at the same time in a single checkpoint when dynamic partitioning is used. Note: Do not significantly increase this value. Writing to too many partitions at the same time can cause an out-of-memory (OOM) error on the sink node. If the number of partitions written to concurrently exceeds this threshold, the write job fails.
sink.group-partition.enable | No | false | Specifies whether to group data by partition when writing to dynamic partitions.
sink.partition.assigner.class | No | None | The PartitionAssigner implementation class.
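For example, the following Flink SQL sketch uses sink.partition to write all data to one fixed partition. The partition specification format (column=value) is an assumption for illustration, as are the table, project, and credential placeholders.

CREATE TEMPORARY TABLE mc_static_partition_sink (
  id BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'maxcompute',
  'table.name' = 'your_partitioned_table',  -- illustrative table name
  'sink.operation' = 'upsert',
  'sink.partition' = 'ds=20240101',         -- assumed partition spec format: column=value
  'odps.project.name' = 'your_project',
  'odps.access.id' = '<accessKeyId>',
  'odps.access.key' = '<accessKeySecret>',
  'odps.end.point' = '<endpoint>'
);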
Parameters for writing in FileCached mode
If you have many dynamic partitions, you can use the file cache mode. You can use the following parameters to configure how cached files are written; a configuration sketch follows the table.
Parameter | Required | Default value | Description
sink.file-cached.enable | No | false | Specifies whether to enable writing in file cache mode. Valid values: false (disable) and true (enable). Note: Use the file cache mode when there are many dynamic partitions.
sink.file-cached.tmp.dirs | No | ./local | The default directory for cached files in file cache mode.
sink.file-cached.writer.num | No | 16 | The number of concurrent threads that a single task uses to upload data in file cache mode. Note: Do not significantly increase this value. Writing to too many partitions at the same time can cause an OOM error.
sink.bucket.check-interval | No | 60000 | The interval at which file sizes are checked in file cache mode. Unit: milliseconds (ms).
sink.file-cached.rolling.max-size | No | 16 M | The maximum size of a single cache file in file cache mode. If a file exceeds this size, its data is uploaded to the server.
sink.file-cached.memory | No | 64 M | The maximum size of off-heap memory used for writing files in file cache mode.
sink.file-cached.memory.segment-size | No | 128 KB | The size of the buffer used for writing files in file cache mode.
sink.file-cached.flush.always | No | true | Specifies whether to use a cache when writing files in file cache mode.
sink.file-cached.write.max-retries | No | 3 | The number of retries for uploading data in file cache mode.
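The following Flink SQL sketch shows how the file cache mode might be enabled on a sink table that writes many dynamic partitions. The table, project, and credential values are placeholders, and the settings shown are illustrative rather than tuned recommendations.

CREATE TEMPORARY TABLE mc_file_cached_sink (
  id BIGINT,
  name STRING,
  ds STRING,
  PRIMARY KEY (id) NOT ENFORCED
) PARTITIONED BY (ds) WITH (
  'connector' = 'maxcompute',
  'table.name' = 'your_delta_table',        -- illustrative table name
  'sink.operation' = 'upsert',
  'sink.file-cached.enable' = 'true',       -- cache data in local files before uploading
  'sink.file-cached.tmp.dirs' = './local',  -- local cache directory (default value)
  'sink.file-cached.writer.num' = '16',     -- upload threads per task (default value)
  'odps.project.name' = 'your_project',
  'odps.access.id' = '<accessKeyId>',
  'odps.access.key' = '<accessKeySecret>',
  'odps.end.point' = '<endpoint>'
);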
Parameters for insert or upsert writes

Upsert write parameters

Parameter | Required | Default value | Description
upsert.writer.max-retries | No | 3 | The number of retries if an Upsert Writer fails to write to a bucket.
upsert.writer.buffer-size | No | 64 m | The cache size for a single Upsert Writer in Flink. Note: When the total buffer size of all buckets reaches this threshold, the system automatically flushes the data to the server. A single Upsert Writer can write to multiple buckets at the same time, so you can increase this value to improve write efficiency. If you write to many partitions, there is a risk of OOM errors; in this case, consider decreasing this value.
upsert.writer.bucket.buffer-size | No | 1 m | The cache size for a single bucket in Flink. If the Flink server has limited memory resources, you can decrease this value.
upsert.write.bucket.num | Yes | None | The number of buckets of the sink table. This value must be the same as the value of the write.bucket.num property of the sink table.
upsert.write.slot-num | No | 1 | The number of Tunnel slots used by a single session.
upsert.commit.max-retries | No | 3 | The number of retries for an upsert session commit.
upsert.commit.thread-num | No | 16 | The degree of parallelism for upsert session commits. Do not set this parameter to a large value. A high number of concurrent commits increases resource consumption, which may cause performance issues or excessive resource usage.
upsert.major-compact.min-commits | No | 100 | The minimum number of commits required to trigger a major compaction.
upsert.commit.timeout | No | 600 | The timeout period for an upsert session commit. Unit: seconds (s).
upsert.major-compact.enable | No | false | Specifies whether to enable major compaction.
upsert.flush.concurrent | No | 2 | The maximum number of buckets that can be written to at the same time in a single partition. Note: Each time data in a bucket is flushed, a Tunnel slot is occupied.
Note: For more information about recommended parameter configurations for upsert writes, see Recommended parameter configurations for upsert writes.
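The following Flink SQL sketch shows where the key upsert parameters are set. It assumes the mf_flink_tt Delta table created earlier in this topic, whose write.bucket.num is 64; upsert.write.bucket.num must match that value, and the other settings simply restate the defaults for illustration.

CREATE TEMPORARY TABLE mc_upsert_tuned_sink (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'maxcompute',
  'table.name' = 'mf_flink_tt',          -- Delta table created with "write.bucket.num" = "64"
  'sink.operation' = 'upsert',
  'upsert.write.bucket.num' = '64',      -- must equal the table's write.bucket.num property
  'upsert.writer.buffer-size' = '64 m',  -- default writer cache size, restated for illustration
  'upsert.commit.thread-num' = '16',     -- default commit parallelism, restated for illustration
  'odps.project.name' = 'your_project',
  'odps.access.id' = '<accessKeyId>',
  'odps.access.key' = '<accessKeySecret>',
  'odps.end.point' = '<endpoint>'
);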
Insert write parameters
Parameter | Required | Default value | Description
insert.commit.thread-num | No | 16 | The degree of parallelism for commit sessions.
insert.arrow-writer.enable | No | false | Specifies whether to use the Arrow format.
insert.arrow-writer.batch-size | No | 512 | The maximum number of rows in an Arrow batch.
insert.arrow-writer.flush-interval | No | 100000 | The interval at which the writer flushes data. Unit: milliseconds (ms).
insert.writer.buffer-size | No | 64 M | The cache size for the buffered writer.
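For insert writes, the following Flink SQL sketch enables the Arrow writer. The table name, project, and credentials are placeholders, and the batch size simply restates the default.

CREATE TEMPORARY TABLE mc_insert_sink (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'maxcompute',
  'table.name' = 'your_table',              -- illustrative table name
  'sink.operation' = 'insert',              -- append-only writes
  'insert.arrow-writer.enable' = 'true',    -- write data in the Arrow format
  'insert.arrow-writer.batch-size' = '512', -- default maximum rows per Arrow batch
  'odps.project.name' = 'your_project',
  'odps.access.id' = '<accessKeyId>',
  'odps.access.key' = '<accessKeySecret>',
  'odps.end.point' = '<endpoint>'
);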