MaxCompute provides a new version of the Flink connector plug-in, which writes data from Flink to MaxCompute standard tables and Transaction Table 2.0 tables. This topic describes how to use the new version of the Flink connector to write data from Flink to MaxCompute.
Background information
Write modes supported by the new version of the Flink connector
The new version of the Flink connector allows you to execute the UPSERT or INSERT statement to write data from Flink to MaxCompute. If the UPSERT statement is executed to write data, data can be grouped by one of the following items:
Primary key
Partition field
Note: If the table to which data is written contains a large number of partitions, you can specify partition fields to group data. However, this may result in data skew.
When you configure parameters for writing data from Flink to MaxCompute, you can configure parameters of the Flink connector to specify the write mode. For more information about the parameters of the Flink connector, see Appendix: Parameters of the Flink connector of the new version.
We recommend that you set the checkpoint interval to more than 3 minutes for a deployment that executes the UPSERT statement to write data from Flink to MaxCompute. If the interval is set to an excessively small value, the write efficiency may not meet business requirements and a large number of small files may be generated.
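The recommendation above can be checked before a deployment is submitted. The following is a minimal sketch in plain Java. The class and method names are hypothetical and are not part of the Flink connector; only the 3-minute threshold comes from this topic.

```java
import java.time.Duration;

/** Illustrative helper; the class and method names are assumptions, not connector APIs. */
final class CheckpointIntervalCheck {

    // Recommendation from this topic: more than 3 minutes for UPSERT deployments.
    static final Duration RECOMMENDED_MINIMUM = Duration.ofMinutes(3);

    /** Returns true only if the interval is strictly longer than the recommended minimum. */
    static boolean meetsRecommendation(Duration checkpointInterval) {
        return checkpointInterval.compareTo(RECOMMENDED_MINIMUM) > 0;
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofMinutes(5);
        System.out.println(meetsRecommendation(interval)); // true
        // The millisecond value is what StreamExecutionEnvironment#enableCheckpointing(long) expects.
        System.out.println(interval.toMillis()); // 300000
    }
}
```

An interval of exactly 3 minutes is treated as not meeting the recommendation because the topic asks for more than 3 minutes.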
Mappings between field data types of MaxCompute and Realtime Compute for Apache Flink
| Data type of Realtime Compute for Apache Flink | Data type of MaxCompute |
| --- | --- |
| CHAR(p) | CHAR(p) |
| VARCHAR(p) | VARCHAR(p) |
| STRING | STRING |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | LONG |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL(p, s) | DECIMAL(p, s) |
| DATE | DATE |
| TIMESTAMP(9) WITHOUT TIME ZONE and TIMESTAMP_LTZ(9) | TIMESTAMP |
| TIMESTAMP(3) WITHOUT TIME ZONE and TIMESTAMP_LTZ(3) | DATETIME |
| BYTES | BINARY |
| ARRAY<T> | LIST<T> |
| MAP<K, V> | MAP<K, V> |
| ROW | STRUCT |
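The mappings above can be expressed as a small lookup, which may help when you check whether a Flink schema can be written to an existing MaxCompute table. This is a sketch in plain Java that only encodes the rows listed in this topic. The class name is hypothetical, and types or timestamp precisions that are not listed return an empty result instead of a guess.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

/** Illustrative lookup of the documented type mappings; not part of the connector. */
final class TypeMapping {

    private static final Map<String, String> BASE_TYPES = new HashMap<>();

    static {
        BASE_TYPES.put("CHAR", "CHAR");
        BASE_TYPES.put("VARCHAR", "VARCHAR");
        BASE_TYPES.put("STRING", "STRING");
        BASE_TYPES.put("BOOLEAN", "BOOLEAN");
        BASE_TYPES.put("TINYINT", "TINYINT");
        BASE_TYPES.put("SMALLINT", "SMALLINT");
        BASE_TYPES.put("INT", "INT");
        BASE_TYPES.put("BIGINT", "LONG");
        BASE_TYPES.put("FLOAT", "FLOAT");
        BASE_TYPES.put("DOUBLE", "DOUBLE");
        BASE_TYPES.put("DECIMAL", "DECIMAL");
        BASE_TYPES.put("DATE", "DATE");
        BASE_TYPES.put("BYTES", "BINARY");
        BASE_TYPES.put("ARRAY", "LIST");
        BASE_TYPES.put("MAP", "MAP");
        BASE_TYPES.put("ROW", "STRUCT");
    }

    /** Returns the MaxCompute type name for a Flink type string, or empty if it is not listed. */
    static Optional<String> toMaxCompute(String flinkType) {
        String normalized = flinkType.trim().toUpperCase(Locale.ROOT);
        if (normalized.startsWith("TIMESTAMP")) {
            // Only precisions 9 and 3 appear in the mapping table.
            if (normalized.contains("(9)")) {
                return Optional.of("TIMESTAMP");
            }
            if (normalized.contains("(3)")) {
                return Optional.of("DATETIME");
            }
            return Optional.empty();
        }
        // Drop parameters such as (p), (p, s), <T>, or <K, V> before the lookup.
        String base = normalized.replaceAll("[(<].*$", "").trim();
        return Optional.ofNullable(BASE_TYPES.get(base));
    }

    public static void main(String[] args) {
        System.out.println(toMaxCompute("BIGINT").orElse("not listed"));
        System.out.println(toMaxCompute("TIMESTAMP_LTZ(3)").orElse("not listed"));
        System.out.println(toMaxCompute("TIMESTAMP(6)").orElse("not listed"));
    }
}
```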
Write data from self-managed open source Flink to MaxCompute
Create a MaxCompute table.
You must create a MaxCompute table to which you want to write Flink data. The following sample code creates a Transaction Table 2.0 non-partitioned table and a Transaction Table 2.0 partitioned table in MaxCompute. For more information about the configuration of table properties, see Parameters for Transaction Table 2.0 tables.
Note: Transaction Table 2.0 is in invitational preview. By default, you cannot directly use Transaction Table 2.0. To use this feature, click Free Trial on the page for the application of trial use of new features to apply for enabling this feature. For more information about Transaction Table 2.0, see Overview of Transaction Table 2.0.
-- Create a Transaction Table 2.0 non-partitioned table.
CREATE TABLE mf_flink_tt (
  id BIGINT not null,
  name STRING,
  age INT,
  status BOOLEAN,
  primary key (id)
)
tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12");

-- Create a Transaction Table 2.0 partitioned table.
CREATE TABLE mf_flink_tt_part (
  id BIGINT not null,
  name STRING,
  age INT,
  status BOOLEAN,
  primary key (id)
)
partitioned by (dd string, hh string)
tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12");
Build an open source Flink cluster. Open source Flink 1.13, 1.15, 1.16, and 1.17 are supported. You can select a version based on your business requirements.
In this example, Flink 1.13 is used. You can click the download link of Flink 1.13 to download the package of Flink 1.13 to your on-premises environment and then decompress the package.
Download the package of the Flink connector and add the package of the Flink connector to the Flink cluster package.
Click the download link of the package of the Flink connector to download the JAR package of the Flink connector to your on-premises environment.
Add the JAR package of the Flink connector to the lib directory of the Flink installation package that is decompressed.
mv flink-connector-odps-1.13-shaded.jar $FLINK_HOME/lib/flink-connector-odps-1.13-shaded.jar
Start the Flink service.
cd $FLINK_HOME/bin
./start-cluster.sh
Start the Flink SQL client.
cd $FLINK_HOME/bin
./sql-client.sh
Create Flink tables and configure the parameters of the Flink connector.
You can directly use the Flink SQL client to create Flink tables and configure parameters. You can also use the DataStream API of Flink to perform related operations. The following sample code provides examples of the operations.
Use the Flink SQL client
Go to the code editor of the Flink SQL client and execute the following statements to create tables and configure parameters.
-- Create a non-partitioned table that corresponds to the created MaxCompute non-partitioned table on the Flink SQL client.
CREATE TABLE mf_flink (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'maxcompute',
  'table.name' = 'mf_flink_tt',
  'sink.operation' = 'upsert',
  'odps.access.id'='LTAI5tRzd4W8cTyLZKT****',
  'odps.access.key'='gJwKaF3hK9MDAQgbO0zs****',
  'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
  'odps.project.name'='mf_mc_bj'
);

-- Create a partitioned table that corresponds to the created MaxCompute partitioned table on the Flink SQL client.
CREATE TABLE mf_flink_part (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  dd STRING,
  hh STRING,
  PRIMARY KEY(id) NOT ENFORCED
) PARTITIONED BY (`dd`,`hh`)
WITH (
  'connector' = 'maxcompute',
  'table.name' = 'mf_flink_tt_part',
  'sink.operation' = 'upsert',
  'odps.access.id'='LTAI5tRzd4W8cTyLZKT****',
  'odps.access.key'='gJwKaF3hK9MDAQgbO0zs*******',
  'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
  'odps.project.name'='mf_mc_bj'
);
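If you generate such DDL statements from code, the WITH clause can be rendered from a map so that the AccessKey pair is read from the environment instead of being hard-coded. The following is a sketch in plain Java. The class name and the environment variable names ODPS_ACCESS_ID and ODPS_ACCESS_KEY are assumptions made for illustration; the option keys and sample values are taken from the statements above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Illustrative helper (not part of the connector) that renders a Flink SQL WITH clause. */
final class WithClauseBuilder {

    /** Renders the options as WITH ('key' = 'value', ...) and escapes single quotes in values. */
    static String render(Map<String, String> options) {
        return options.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue().replace("'", "''") + "'")
                .collect(Collectors.joining(",\n", "WITH (\n", "\n)"));
    }

    public static void main(String[] args) {
        // ODPS_ACCESS_ID and ODPS_ACCESS_KEY are hypothetical environment variable names.
        Map<String, String> env = System.getenv();
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "maxcompute");
        options.put("table.name", "mf_flink_tt");
        options.put("sink.operation", "upsert");
        options.put("odps.access.id", env.getOrDefault("ODPS_ACCESS_ID", "<your-access-id>"));
        options.put("odps.access.key", env.getOrDefault("ODPS_ACCESS_KEY", "<your-access-key>"));
        options.put("odps.end.point", "http://service.cn-beijing.maxcompute.aliyun.com/api");
        options.put("odps.project.name", "mf_mc_bj");
        System.out.println(render(options));
    }
}
```

A LinkedHashMap keeps the options in insertion order, so the rendered clause is stable across runs.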
Write data to the Flink tables and query data in the MaxCompute tables to check whether the Flink data is written to MaxCompute.
-- Insert data into the non-partitioned Flink table mf_flink on the Flink SQL client.
Insert into mf_flink values (1,'Danny',27, false);

-- Query data in the MaxCompute table and view the returned results.
select * from mf_flink_tt;
+------------+-------+------+--------+
| id         | name  | age  | status |
+------------+-------+------+--------+
| 1          | Danny | 27   | false  |
+------------+-------+------+--------+

-- Insert a row with the same primary key into mf_flink. In upsert mode, the existing row is updated.
Insert into mf_flink values (1,'Danny',28, false);

-- Query data in the MaxCompute table and view the returned results.
select * from mf_flink_tt;
+------------+-------+------+--------+
| id         | name  | age  | status |
+------------+-------+------+--------+
| 1          | Danny | 28   | false  |
+------------+-------+------+--------+

-- Insert data into the partitioned Flink table mf_flink_part on the Flink SQL client.
Insert into mf_flink_part values (1,'Danny',27, false, '01','01');

-- Query data in the MaxCompute table and view the returned results.
-- The partition columns dd and hh are of the STRING type, so the filter values are quoted.
select * from mf_flink_tt_part where dd='01' and hh='01';
+------------+-------+------+--------+----+----+
| id         | name  | age  | status | dd | hh |
+------------+-------+------+--------+----+----+
| 1          | Danny | 27   | false  | 01 | 01 |
+------------+-------+------+--------+----+----+

-- Insert a row with the same primary key into the same partition of mf_flink_part.
Insert into mf_flink_part values (1,'Danny',30, false, '01','01');

-- Query data in the MaxCompute table and view the returned results.
select * from mf_flink_tt_part where dd='01' and hh='01';
+------------+-------+------+--------+----+----+
| id         | name  | age  | status | dd | hh |
+------------+-------+------+--------+----+----+
| 1          | Danny | 30   | false  | 01 | 01 |
+------------+-------+------+--------+----+----+
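The query results above show the effect of the upsert write mode: a second row with the same primary key replaces the first row instead of being appended. The following plain-Java sketch models only that observable behavior with an in-memory map. It is not how the connector or MaxCompute stores data, and all class names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** In-memory model of upsert-by-primary-key semantics; for illustration only. */
final class UpsertSimulation {

    /** One row of the sample table: id is the primary key. */
    static final class Row {
        final long id;
        final String name;
        final int age;
        final boolean status;

        Row(long id, String name, int age, boolean status) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.status = status;
        }
    }

    private final Map<Long, Row> rowsByKey = new LinkedHashMap<>();

    /** Inserts the row, or replaces the existing row that has the same primary key. */
    void upsert(Row row) {
        rowsByKey.put(row.id, row);
    }

    int size() {
        return rowsByKey.size();
    }

    Row get(long id) {
        return rowsByKey.get(id);
    }

    public static void main(String[] args) {
        UpsertSimulation table = new UpsertSimulation();
        table.upsert(new Row(1, "Danny", 27, false));
        table.upsert(new Row(1, "Danny", 28, false));
        // One row remains, and it carries the latest age.
        System.out.println(table.size() + " row(s), age = " + table.get(1).age);
    }
}
```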
Use the DataStream API
Before you use the DataStream API, add the following dependency.
<dependency>
  <groupId>com.aliyun.odps</groupId>
  <artifactId>flink-connector-maxcompute</artifactId>
  <version>xxx</version>
  <scope>system</scope>
  <systemPath>${mvn_project.basedir}/lib/flink-connector-maxcompute-xxx-shaded.jar</systemPath>
</dependency>
Note: Replace xxx in the preceding code with the version of the JAR package.
Write code to create Flink tables and configure parameters. The following sample code provides an example.
package com.aliyun.odps.flink.examples;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.odps.table.OdpsOptions;
import org.apache.flink.odps.util.OdpsConf;
import org.apache.flink.odps.util.OdpsPipeline;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;

public class Examples {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint interval in milliseconds (2 minutes here).
        // For UPSERT deployments, this topic recommends an interval of more than 3 minutes.
        env.enableCheckpointing(120 * 1000);

        // Read the input from a table named source_table that is already registered in the table environment.
        StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
        Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table");
        DataStream<RowData> input = streamTableEnvironment.toAppendStream(source, RowData.class);

        // Connector parameters: write in upsert mode.
        Configuration config = new Configuration();
        config.set(OdpsOptions.SINK_OPERATION, "upsert");
        config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8);
        config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100);

        // Replace the placeholders with your AccessKey pair, endpoint, project, and Tunnel endpoint.
        OdpsConf odpsConfig = new OdpsConf("accessid",
                "accesskey",
                "endpoint",
                "project",
                "tunnel endpoint");

        // Build the sink for the target MaxCompute table and attach it to the input stream.
        OdpsPipeline.Builder builder = OdpsPipeline.builder();
        builder.projectName("sql2_isolation_2a")
                .tableName("user_ledger_portfolio")
                .partition("")
                .configuration(config)
                .odpsConf(odpsConfig)
                .sink(input, false);

        env.execute();
    }
}
Write data from fully managed Flink to MaxCompute
Create a MaxCompute table.
You must create a MaxCompute table to which you want to write Flink data. The following sample code provides an example on how to create a Transaction Table 2.0 table.
set odps.sql.type.system.odps2=true;

-- Drop the table only if it already exists so that the script can be rerun.
drop table if exists mf_flink_upsert;

create table mf_flink_upsert (
  c1 int not null,
  c2 string,
  gt timestamp,
  primary key (c1)
)
partitioned by (ds string)
tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12");
Log on to the Realtime Compute for Apache Flink console and add the Flink connector. You must upload the package of the open source Flink connector. The following sample code provides the key parameter. For more information, see Manage custom connectors.
'connector' = 'maxcompute',
Use a Flink SQL draft to create a Flink table and construct Flink real-time data. After the draft is developed, deploy the draft.
On the SQL Editor page in the console of fully managed Flink, create and edit a Flink SQL draft. In the following example, a Flink source table and a temporary Flink result table are created, the real-time data generation logic is automatically constructed to write data to the source table, and then the computing logic is used to write data from the source table to the temporary result table. For more information about how to develop an SQL draft, see Develop an SQL draft.
-- Create a Flink source table.
CREATE TEMPORARY TABLE fake_src_table (
  c1 int,
  c2 VARCHAR,
  gt as CURRENT_TIMESTAMP
) with (
  'connector' = 'faker',
  'fields.c2.expression' = '#{superhero.name}',
  'rows-per-second' = '100',
  'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}'
);

-- Create a temporary Flink result table.
CREATE TEMPORARY TABLE test_c_d_g (
  c1 int,
  c2 VARCHAR,
  gt TIMESTAMP,
  ds varchar,
  PRIMARY KEY(c1) NOT ENFORCED
) PARTITIONED BY(ds)
WITH (
  'connector' = 'maxcompute',
  'table.name' = 'mf_flink_upsert',
  'sink.operation' = 'upsert',
  'odps.access.id'='LTAI5tRzd4W8cTyL****',
  'odps.access.key'='gJwKaF3hK9MDAQgb**********',
  'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
  'odps.project.name'='mf_mc_bj',
  'upsert.write.bucket.num'='64'
);

-- Execute the Flink computing logic.
insert into test_c_d_g
select
  c1 as c1,
  c2 as c2,
  gt as gt,
  date_format(gt, 'yyyyMMddHH') as ds
from fake_src_table;
Parameters in the WITH clause:
odps.end.point: Use the endpoint of the classic network of the region.
upsert.write.bucket.num: Use a value the same as the value of the write.bucket.num parameter that is configured for the Transaction Table 2.0 table created in MaxCompute.
Query data in the MaxCompute table and check whether Flink data is written to MaxCompute.
-- The partition column ds is of the STRING type, so the filter value is quoted.
select * from mf_flink_upsert where ds='2023061517';

-- View the returned results. The actual returned results in MaxCompute may be different from the data in the following example because Flink data is randomly generated.
+------+--------------------+-------------------------+------------+
| c1   | c2                 | gt                      | ds         |
+------+--------------------+-------------------------+------------+
| 0    | Skaar              | 2023-06-16 01:59:41.116 | 2023061517 |
| 21   | Supah Century      | 2023-06-16 01:59:59.117 | 2023061517 |
| 104  | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 |
| 126  | Leader             | 2023-06-16 01:59:39.116 | 2023061517 |
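In the draft, the partition value ds is derived from gt with the yyyyMMddHH pattern. The following plain-Java sketch reproduces that derivation with java.time, assuming that the pattern letters behave like Java date patterns. It also shows that the same instant yields a different hour when it is formatted in another time zone. This may explain why the sample gt values are eight hours ahead of the ds value, but that explanation is an inference and is not stated in this topic. The class name is hypothetical.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Illustrative derivation of an hourly partition value; not part of the connector. */
final class PartitionValue {

    // Same pattern as date_format(gt, 'yyyyMMddHH') in the SQL draft.
    private static final DateTimeFormatter HOURLY = DateTimeFormatter.ofPattern("yyyyMMddHH");

    /** Formats the instant as an hourly partition value in the given time zone. */
    static String hourlyPartition(Instant instant, ZoneId zone) {
        return HOURLY.withZone(zone).format(instant);
    }

    public static void main(String[] args) {
        Instant instant = Instant.parse("2023-06-15T17:59:41Z");
        System.out.println(hourlyPartition(instant, ZoneOffset.UTC));             // 2023061517
        System.out.println(hourlyPartition(instant, ZoneId.of("Asia/Shanghai"))); // 2023061601
    }
}
```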
Appendix: Parameters of the Flink connector of the new version
Basic parameters
| Parameter | Required | Default value | Description |
| --- | --- | --- | --- |
| connector | Yes | No default value | The type of the connector. Set the value to maxcompute. |
| odps.project.name | Yes | No default value | The name of the MaxCompute project. |
| odps.access.id | Yes | No default value | The AccessKey ID of your Alibaba Cloud account. You can go to the AccessKey Pair page to view the AccessKey pair. |
| odps.access.key | Yes | No default value | The AccessKey secret of your Alibaba Cloud account. You can go to the AccessKey Pair page to view the AccessKey pair. |
| odps.end.point | Yes | No default value | The endpoint of MaxCompute. For more information about the MaxCompute endpoints of each region, see Endpoints. |
| odps.tunnel.end.point | No | | The public endpoint of MaxCompute Tunnel. If you do not configure this parameter, traffic is automatically routed to the Tunnel endpoint that corresponds to the network in which MaxCompute resides. If you configure this parameter, traffic is routed to the specified endpoint and automatic routing is not performed. For more information about the Tunnel endpoints of different network types in each region, see Endpoints. |
| odps.tunnel.quota.name | No | No default value | The name of the Tunnel quota that is used to access MaxCompute. |
| table.name | Yes | No default value | The name of the MaxCompute table. The table name is in the [project.][schema.]table format. |
| odps.namespace.schema | No | false | Specifies whether to use the three-tier model. For more information about the three-tier model, see Schema-related operations. |
| sink.operation | Yes | insert | The write mode. Valid values: insert and upsert. Note: Only Transaction Table 2.0 supports data writing by using the UPSERT statement. |
| sink.parallelism | No | No default value | The degree of data writing parallelism. If you do not configure this parameter, the upstream data parallelism is used by default. |
| sink.meta.cache.time | No | 400 | The size of the metadata that is written to the cache. |
| sink.meta.cache.expire.time | No | 1200 | The cache timeout period for the metadata. Unit: seconds. |
| sink.coordinator.enable | No | Yes | Specifies whether to enable the coordinator mode. |
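Before you submit a job, you can check that every parameter marked as required in the basic parameters is present. The following is a minimal sketch in plain Java. The class name is hypothetical, and the list of required keys is copied from this topic, including sink.operation, which is listed as required even though a default value is given.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative pre-flight check for the required basic parameters; not part of the connector. */
final class RequiredOptions {

    // Parameters marked "Required: Yes" in the basic parameters of this topic.
    static final List<String> REQUIRED = Arrays.asList(
            "connector",
            "odps.project.name",
            "odps.access.id",
            "odps.access.key",
            "odps.end.point",
            "table.name",
            "sink.operation");

    /** Returns the required keys that are absent or blank, in the order listed above. */
    static List<String> missing(Map<String, String> options) {
        List<String> result = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = options.get(key);
            if (value == null || value.trim().isEmpty()) {
                result.add(key);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("connector", "maxcompute");
        options.put("table.name", "mf_flink_tt");
        System.out.println("Missing: " + missing(options));
    }
}
```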
Partition parameters
| Parameter | Required | Default value | Description |
| --- | --- | --- | --- |
| sink.partition | No | No default value | The name of the partition. If you use dynamic partitioning, the value of this parameter is the name of the parent partition of a dynamic partition. |
| sink.partition.default-value | No | __DEFAULT_PARTITION__ | The name of the default partition when dynamic partitioning is used. |
| sink.dynamic-partition.limit | No | 100 | The maximum number of partitions to which data can be imported in a single checkpoint when data is written to dynamic partitions. |
| sink.group-partition.enable | No | false | Specifies whether to group data by partition when data is written to dynamic partitions. |
| sink.partition.assigner.class | No | No default value | The PartitionAssigner implementation class. |
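To make the partition parameters more concrete, the following plain-Java sketch groups records by partition value and stops when the number of distinct partitions exceeds a limit. It is a conceptual model only: the class is hypothetical, the connector's actual behavior when the limit is exceeded is not described in this topic, and using the default partition name for a missing partition value is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Conceptual model of grouping records by dynamic partition; not the connector's implementation. */
final class DynamicPartitionGrouping {

    static final String DEFAULT_PARTITION = "__DEFAULT_PARTITION__"; // sink.partition.default-value
    static final int DEFAULT_LIMIT = 100;                            // sink.dynamic-partition.limit

    /**
     * Groups records by partition value. Each record is {partitionValue, payload}.
     * Assumption: a null or empty partition value falls back to the default partition name.
     */
    static Map<String, List<String>> groupByPartition(List<String[]> records, int limit) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String[] record : records) {
            String partition = (record[0] == null || record[0].isEmpty()) ? DEFAULT_PARTITION : record[0];
            groups.computeIfAbsent(partition, k -> new ArrayList<>()).add(record[1]);
            if (groups.size() > limit) {
                throw new IllegalStateException("More than " + limit + " partitions in one checkpoint");
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String[]> records = Arrays.asList(
                new String[] {"2023061517", "a"},
                new String[] {"2023061518", "b"},
                new String[] {"2023061517", "c"},
                new String[] {"", "d"});
        groupByPartition(records, DEFAULT_LIMIT)
                .forEach((partition, rows) -> System.out.println(partition + " -> " + rows));
    }
}
```

If most records fall into a few partitions, the groups are uneven, which is the data skew risk mentioned for grouping by partition field.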
Parameters for data writing in file cache mode
If a large number of dynamic partitions exist, you can enable the file cache mode. The following table describes the parameters that need to be configured for data writing in file cache mode.
| Parameter | Required | Default value | Description |
| --- | --- | --- | --- |
| sink.file-cached.enable | No | false | Specifies whether to enable the file cache mode for data writing. |
| sink.file-cached.tmp.dirs | No | ./local | The default directory for storing cached files in file cache mode. |
| sink.file-cached.writer.num | No | 16 | The number of threads that are used to upload data in parallel in file cache mode. |
| sink.bucket.check-interval | No | 60000 | The default interval at which the file size is checked in file cache mode. Unit: milliseconds. |
| sink.file-cached.rolling.max-size | No | 16 MB | The maximum size of a single cached file in file cache mode. If the file size exceeds the value of this parameter, the file data is uploaded to the server. |
| sink.file-cached.memory | No | 64 MB | The maximum size of off-heap memory used to write data to files in file cache mode. |
| sink.file-cached.memory.segment-size | No | 128 KB | The size of the buffer used to write data to files in file cache mode. |
| sink.file-cached.flush.always | No | true | Specifies whether the cache is used for writing data to files in file cache mode. |
| sink.file-cached.write.max-retries | No | 3 | The number of retries for uploading data in file cache mode. |
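The default sizes for file cache mode relate to each other as simple arithmetic. The following plain-Java sketch works through two of them: how many buffers of the default segment size fit into the default off-heap memory, and when a cached file would be rolled. It assumes that the memory is divided into equally sized segments and that a file is rolled only when its size is strictly greater than the maximum. Both points are assumptions for illustration, and the class name is hypothetical.

```java
/** Arithmetic on the documented file cache defaults; the interpretation is an assumption. */
final class FileCacheDefaults {

    static final long MEMORY_BYTES = 64L * 1024 * 1024;      // sink.file-cached.memory: 64 MB
    static final long SEGMENT_BYTES = 128L * 1024;           // sink.file-cached.memory.segment-size: 128 KB
    static final long ROLLING_MAX_BYTES = 16L * 1024 * 1024; // sink.file-cached.rolling.max-size: 16 MB

    /** Number of whole segments that fit into the given memory budget. */
    static long segmentCount(long memoryBytes, long segmentBytes) {
        return memoryBytes / segmentBytes;
    }

    /** A file is rolled (uploaded) once its size exceeds the maximum size. */
    static boolean shouldRoll(long fileBytes, long maxBytes) {
        return fileBytes > maxBytes;
    }

    public static void main(String[] args) {
        System.out.println(segmentCount(MEMORY_BYTES, SEGMENT_BYTES));          // 512
        System.out.println(shouldRoll(ROLLING_MAX_BYTES, ROLLING_MAX_BYTES));   // false
        System.out.println(shouldRoll(ROLLING_MAX_BYTES + 1, ROLLING_MAX_BYTES)); // true
    }
}
```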
Parameters for data writing by using the INSERT or UPSERT statement
Parameters for data writing by using the UPSERT statement
| Parameter | Required | Default value | Description |
| --- | --- | --- | --- |
| upsert.writer.max-retries | No | 3 | The maximum number of writer flush retries allowed when data is written by using the UPSERT statement. |
| upsert.writer.buffer-size | No | 64 MB | The buffer size of a single writer when data is written by using the UPSERT statement. |
| upsert.writer.bucket.buffer-size | No | 1 MB | The buffer size of a single bucket when data is written by using the UPSERT statement. |
| upsert.write.bucket.num | Yes | N/A | The number of buckets for the table to which data is written. The value of this parameter must be the same as the value of the write.bucket.num parameter that is configured for the table to which data is written. |
| upsert.write.slot-num | No | 1 | The number of Tunnel slots that are used for a single session when data is written by using the UPSERT statement. |
| upsert.commit.max-retries | No | 3 | The maximum number of commit session retries allowed when data is written by using the UPSERT statement. |
| upsert.commit.thread-num | No | 16 | The degree of parallelism of commit sessions when data is written by using the UPSERT statement. |
| upsert.major-compact.min-commits | No | 100 | The minimum number of commits that initiate major compaction when data is written by using the UPSERT statement. |
| upsert.commit.timeout | No | 600 | The timeout period for a commit session when data is written by using the UPSERT statement. Unit: seconds. |
| upsert.major-compact.enable | No | false | Specifies whether to enable major compaction when data is written by using the UPSERT statement. |
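The requirement that upsert.write.bucket.num must equal the write.bucket.num property of the table can be illustrated with a toy bucket assignment. The following plain-Java sketch assigns a primary key to a bucket with a simple hash modulo. The hash function is a stand-in chosen for illustration and is not the function that MaxCompute uses; the class and method names are hypothetical. The point is only that the same key lands in a different bucket when the two bucket counts differ.

```java
/** Toy bucket assignment that illustrates why both bucket counts must match. */
final class BucketAssignment {

    /** Assumption: hash of the key modulo the bucket count. MaxCompute's real hash is not documented here. */
    static int bucketFor(long primaryKey, int bucketNum) {
        if (bucketNum <= 0) {
            throw new IllegalArgumentException("bucketNum must be positive");
        }
        return Math.floorMod(Long.hashCode(primaryKey), bucketNum);
    }

    /** Fails fast when the connector setting differs from the table property. */
    static void requireSameBucketNum(int upsertWriteBucketNum, int tableWriteBucketNum) {
        if (upsertWriteBucketNum != tableWriteBucketNum) {
            throw new IllegalArgumentException("upsert.write.bucket.num (" + upsertWriteBucketNum
                    + ") must equal write.bucket.num (" + tableWriteBucketNum + ")");
        }
    }

    public static void main(String[] args) {
        System.out.println(bucketFor(100L, 64)); // 36
        System.out.println(bucketFor(100L, 32)); // 4
        requireSameBucketNum(64, 64);            // passes silently
    }
}
```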
Parameters for data writing by using the INSERT statement
| Parameter | Required | Default value | Description |
| --- | --- | --- | --- |
| insert.commit.thread-num | No | 16 | The degree of parallelism of commit sessions when data is written by using the INSERT statement. |
| insert.arrow-writer.enable | No | false | Specifies whether to use the Arrow format when data is written by using the INSERT statement. |
| insert.arrow-writer.batch-size | No | 512 | The maximum number of rows in a batch of Arrow-formatted data when data is written by using the INSERT statement. |
| insert.arrow-writer.flush-interval | No | 100000 | The interval at which a writer flushes data when data is written by using the INSERT statement. Unit: milliseconds. |
| insert.writer.buffer-size | No | 64 MB | The cache size for the buffered writer when data is written by using the INSERT statement. |
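The batch size and flush interval of the INSERT parameters suggest a size-or-time flush policy. The following plain-Java sketch shows one way such a policy can be expressed: flush when the batch is full, or when the interval has elapsed and at least one row is buffered. This is a conceptual model under that assumption, not the connector's implementation, and the class name is hypothetical.

```java
/** Conceptual size-or-time flush policy built on the documented INSERT defaults. */
final class ArrowFlushPolicy {

    static final int DEFAULT_BATCH_SIZE = 512;            // insert.arrow-writer.batch-size
    static final long DEFAULT_FLUSH_INTERVAL_MS = 100000; // insert.arrow-writer.flush-interval

    private final int batchSize;
    private final long flushIntervalMs;

    ArrowFlushPolicy(int batchSize, long flushIntervalMs) {
        this.batchSize = batchSize;
        this.flushIntervalMs = flushIntervalMs;
    }

    /** Flush when the batch is full, or when the interval has elapsed and rows are buffered. */
    boolean shouldFlush(int bufferedRows, long msSinceLastFlush) {
        boolean batchFull = bufferedRows >= batchSize;
        boolean intervalElapsed = bufferedRows > 0 && msSinceLastFlush >= flushIntervalMs;
        return batchFull || intervalElapsed;
    }

    public static void main(String[] args) {
        ArrowFlushPolicy policy = new ArrowFlushPolicy(DEFAULT_BATCH_SIZE, DEFAULT_FLUSH_INTERVAL_MS);
        System.out.println(policy.shouldFlush(512, 0));      // true: batch is full
        System.out.println(policy.shouldFlush(10, 100000));  // true: interval elapsed
        System.out.println(policy.shouldFlush(10, 99999));   // false: neither condition met
    }
}
```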