Realtime Compute for Apache Flink: MaxCompute connector

Last Updated: Dec 22, 2023

This topic describes how to use the MaxCompute connector.

Background information

MaxCompute is a fast, fully managed computing platform for large-scale data warehousing. MaxCompute can process exabytes of data. It provides solutions for storing and computing massive amounts of structured data in data warehouses and provides analytics and modeling services. For more information about MaxCompute, see What is MaxCompute?

The following table describes the capabilities supported by the MaxCompute connector.

  • Table type: source table, dimension table, and result table.

  • Running mode: streaming mode and batch mode.

  • Data format: N/A.

  • Metrics:

    • Metrics for source tables:

      • numRecordsIn: the total number of data records that are read by using the MaxCompute connector.

      • numRecordsInPerSecond: the number of data records that are read by using the MaxCompute connector per second.

      • numBytesIn: the total number of bytes of data that is read by using the MaxCompute connector after data is decompressed.

      • numBytesInPerSecond: the number of bytes of data that is read by using the MaxCompute connector per second after data is decompressed.

    • Metrics for result tables:

      • numRecordsOut: the total number of data records that are written by using the MaxCompute connector.

      • numRecordsOutPerSecond: the number of data records that are written by using the MaxCompute connector per second.

      • numBytesOut: the total number of bytes of data that is written by using the MaxCompute connector before data is compressed.

      • numBytesOutPerSecond: the number of bytes of data that is written by using the MaxCompute connector per second before data is compressed.

    • Metrics for dimension tables:

      • dim.odps.cacheSize: the number of data records that are cached in a dimension table.

    Note

    For more information about the metrics and how to view them, see Report metrics of fully managed Flink to other platforms.

  • API type: DataStream API and SQL API.

  • Data update or deletion in a result table: if MaxCompute Batch Tunnel or MaxCompute Streaming Tunnel is used, data can only be inserted into a result table. If MaxCompute Upsert Tunnel is used, data in a result table can be inserted, updated, or deleted.

Prerequisites

A MaxCompute table is created. For more information about how to create a MaxCompute table, see Create tables.

Limits

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the MaxCompute connector.

  • The MaxCompute connector supports only at-least-once semantics.

    Note

    At-least-once semantics prevents data loss, but duplicate data may be written to MaxCompute in specific cases. Whether duplicates can be generated depends on the tunnel that you use. For more information about MaxCompute Tunnel, see FAQ about upstream and downstream storage.

  • By default, a full MaxCompute source table is used. The source table reads data only from the partition that is specified by the partition parameter. After the source table reads all data from that partition, it stops, its status changes to FINISHED, and it does not monitor whether new partitions are generated.

    If you want the source table to continuously monitor new partitions, configure the startPartition parameter in the WITH clause to use an incremental source table.

    Note
    • Each time a dimension table is updated, the dimension table checks for the latest partition.

    • After the source table starts to run, it does not read data that is newly added to a partition. Therefore, we recommend that you start a deployment only after the partition data is complete.

Syntax

CREATE TABLE odps_source(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps', 
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=2018****'
);

Parameters in the WITH clause

  • Common parameters

    • connector: the type of the table. STRING. Required. No default value. Set the value to odps.

    • endpoint: the endpoint of MaxCompute. STRING. Required. No default value. For more information, see Endpoints.

    • tunnelEndpoint: the endpoint of MaxCompute Tunnel. STRING. Optional. No default value. For more information, see Endpoints.

      Note
      • This parameter is required if MaxCompute is deployed in a virtual private cloud (VPC).

      • If this parameter is not configured, MaxCompute allocates the tunnel connections based on the Server Load Balancer (SLB) service.

    • project: the name of the MaxCompute project. STRING. Required. No default value.

    • tableName: the name of the MaxCompute table. STRING. Required. No default value.

    • accessId: the AccessKey ID that is used to access MaxCompute. STRING. Required. No default value. For more information, see How do I view information about the AccessKey ID and AccessKey secret of the account?

      Important

      To protect your AccessKey pair, we recommend that you specify the AccessKey ID by using the key management method. For more information, see Manage keys.

    • accessKey: the AccessKey secret that is used to access MaxCompute. STRING. Required. No default value. For more information, see How do I view information about the AccessKey ID and AccessKey secret of the account?

      Important

      To protect your AccessKey pair, we recommend that you specify the AccessKey secret by using the key management method. For more information, see Manage keys.

    • partition: the name of the partition in a MaxCompute partitioned table. STRING. Optional. No default value. You do not need to configure this parameter for a non-partitioned MaxCompute table or an incremental MaxCompute source table.

      Note

      For more information about how to configure the partition parameter for a partitioned MaxCompute table, see FAQ about upstream and downstream storage.

    • compressAlgorithm: the compression algorithm that is used by MaxCompute Tunnel. STRING. Optional. Default value: ZLIB in VVR 4.0.13 and later, SNAPPY in VVR 6.0.1 and later. Valid values:

      • RAW (no compression)

      • ZLIB

      • SNAPPY

        Compared with ZLIB, SNAPPY can significantly improve the throughput. In test scenarios, the throughput increased by about 50%.

      Note

      Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter.

    • quotaName: the name of the quota for the exclusive Tunnel resource groups of MaxCompute. STRING. Optional. No default value. You can configure this parameter to use the exclusive Tunnel resource groups of MaxCompute.

      Important
      • Only Realtime Compute for Apache Flink that uses VVR 8.0.3 or later supports this parameter.

      • If you configure this parameter, you must delete the tunnelEndpoint parameter. Otherwise, the tunnel that is specified by the tunnelEndpoint parameter is used.
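
    As a sketch of how quotaName is used, the following WITH clause routes traffic through an exclusive Tunnel resource group. All names are placeholders, this assumes VVR 8.0.3 or later, and tunnelEndpoint is deliberately omitted as described above.

    CREATE TEMPORARY TABLE odps_sink_with_quota (
      id INT,
      content VARCHAR
    ) WITH (
      'connector' = 'odps',
      'endpoint' = '<yourEndpoint>',
      'project' = '<yourProjectName>',
      'tableName' = '<yourTableName>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'partition' = 'ds=20231201',
      'quotaName' = '<yourTunnelQuotaName>'
      -- tunnelEndpoint is omitted on purpose: if both are configured,
      -- the tunnel specified by tunnelEndpoint is used instead of the quota.
    );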

  • Parameters only for source tables

    • maxPartitionCount: the maximum number of partitions from which data can be read. INTEGER. Optional. Default value: 100. If the number of partitions from which data is read exceeds the value of this parameter, the error message "The number of matched partitions exceeds the default limit" appears.

      Important

      If data is read from a large number of partitions of a partitioned MaxCompute table, the workload on the MaxCompute service is high and the startup of the deployment slows down. Check whether data really needs to be read from a large number of partitions and configure the partition parameter based on your business requirements. If your business requires data from a large number of partitions, manually increase the value of the maxPartitionCount parameter.
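
    For example, a full source table that reads one month of daily partitions through a wildcard might raise the limit as follows. This is a sketch with placeholder names, and the ds=yyyymmdd partition layout is assumed for illustration.

    CREATE TEMPORARY TABLE odps_source_many_partitions (
      id INT,
      content VARCHAR
    ) WITH (
      'connector' = 'odps',
      'endpoint' = '<yourEndpoint>',
      'project' = '<yourProjectName>',
      'tableName' = '<yourTableName>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'partition' = 'ds=202312*',   -- matches up to 31 daily partitions
      'maxPartitionCount' = '200'   -- raise the default limit of 100
    );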

  • Parameters only for incremental MaxCompute source tables

    The incremental source table monitors new partitions by periodically polling the MaxCompute server for the full partition list. Before the source table reads data from a new partition, data writing to that partition must be complete. For more information, see What do I do if an incremental MaxCompute source table detects a new partition and some data is not written to the partition?

    You can configure the startPartition parameter to specify the start partition from which data is read. Only partitions whose names are greater than or equal to the value of the startPartition parameter in alphabetical order are read. Note that the comparison is alphabetical, not numerical: for example, the partition year=2023,month=10 sorts before the partition year=2023,month=9. To keep the alphabetical order consistent with the chronological order, pad the partition values that you declare with leading zeros, for example, year=2023,month=09 instead of year=2023,month=9.

    • startPartition: the start partition from which incremental data is read. STRING. Required. No default value.

      • If you configure this parameter, an incremental source table is used and the partition parameter is ignored.

      • If the source table is a multi-level partitioned table, you must configure the value of each partition column in descending order based on partition levels.

      Note

      For more information about how to configure the startPartition parameter, see FAQ about upstream and downstream storage.

    • subscribeIntervalInSec: the interval at which MaxCompute is polled to obtain the information about partitions. INTEGER. Optional. Default value: 30. Unit: seconds.

    • modifiedTableOperation: the operation that is performed when data in a partition is modified while the partition is being read. Enum (NONE, SKIP). Optional. Default value: NONE.

      Download sessions are saved in checkpoints, and each time a deployment is resumed from a checkpoint, Flink attempts to resume the reading progress from the saved session. If data in the partition has been modified, the session becomes unavailable and the deployment restarts repeatedly. You can configure this parameter to resolve this issue. Valid values:

      • NONE: you must change the value of the startPartition parameter so that it is greater in alphabetical order than the unavailable partition, and then start the deployment without states.

      • SKIP: if you do not want to start the deployment without states, set this parameter to SKIP. Flink then skips the unavailable partition when it resumes the session from the checkpoint.

      Important

      With either value, the data that was already read from the modified partition is retained, and the data that was not yet read is ignored.
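
    The following sketch combines these options. The names are placeholders, the values are illustrative, and the two-level partition layout is an assumption.

    CREATE TEMPORARY TABLE odps_incr_source (
      id INT,
      content VARCHAR
    ) WITH (
      'connector' = 'odps',
      'endpoint' = '<yourEndpoint>',
      'project' = '<yourProjectName>',
      'tableName' = '<yourTableName>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      -- Two-level partitions; zero-padded values keep the alphabetical
      -- order consistent with the chronological order.
      'startPartition' = 'year=2023,month=09',
      'subscribeIntervalInSec' = '60',    -- poll for new partitions every 60 seconds
      'modifiedTableOperation' = 'SKIP'   -- skip partitions whose sessions became unavailable
    );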

  • Parameters only for result tables

    • useStreamTunnel: specifies whether to use MaxCompute Streaming Tunnel to upload data. BOOLEAN. Optional. Default value: false. Valid values:

      • true: MaxCompute Streaming Tunnel is used to upload data.

      • false: MaxCompute Batch Tunnel is used to upload data.

    • flushIntervalMs: the interval at which the flush operation is performed in the buffer of a writer in MaxCompute Tunnel. LONG. Optional. Default value: 30000 (30 seconds). Unit: milliseconds.

      The MaxCompute sink inserts data into the buffer and writes the buffered data to the destination MaxCompute table at the interval that is specified by the flushIntervalMs parameter. The sink also writes the buffered data to the destination table when the size of the buffer exceeds the specified threshold.

      If you use Streaming Tunnel, the flushed data is immediately written to the destination MaxCompute table. If you use Batch Tunnel, the flushed data is written to the destination MaxCompute table only after the checkpointing operation is complete; in this case, we recommend that you set this parameter to 0 to disable scheduled flushing.

      Note

      This parameter can be used together with the batchSize parameter. The flush operation is triggered when the condition that is specified by either parameter is met.

    • batchSize: the buffer size at which a flush operation is triggered in the buffer of a writer in MaxCompute Tunnel. LONG. Optional. Default value: 67108864 (64 MB). Unit: bytes.

      The MaxCompute sink inserts data into the buffer and writes the buffered data to the destination MaxCompute table when the size of the buffer exceeds the value that is specified by the batchSize parameter.

      Note
      • Only Realtime Compute for Apache Flink that uses VVR 4.0.14 or later supports this parameter.

      • This parameter can be used together with the flushIntervalMs parameter. The flush operation is triggered when the condition that is specified by either parameter is met.

    • numFlushThreads: the number of threads that are used to flush data in the buffer of a writer in MaxCompute Tunnel. INTEGER. Optional. Default value: 1. Each MaxCompute sink creates the specified number of threads to flush data. If the value of this parameter is greater than 1, data in different partitions can be flushed at the same time, which improves the efficiency of the flush operation.

      Note

      Only Realtime Compute for Apache Flink that uses VVR 4.0.14 or later supports this parameter.

    • dynamicPartitionLimit: the maximum number of dynamic partitions to which data can be written. INTEGER. Optional. Default value: 100. If the number of dynamic partitions to which the result table writes data between two checkpointing operations exceeds this value, the error message "Too many dynamic partitions" appears.

      Important

      If data is written to a large number of partitions of a partitioned MaxCompute table, the workload on the MaxCompute service is high, and the checkpointing operations, during which data in the result table is flushed, slow down. Check whether data really needs to be written to a large number of partitions. If your business requires data to be written to a large number of partitions, manually increase the value of the dynamicPartitionLimit parameter.

    • retryTimes: the maximum number of retries that can be performed for a request to the MaxCompute server. INTEGER. Optional. Default value: 3. The MaxCompute service may be unavailable for a short period of time when a session is created or committed or when data in the result table is flushed. If this happens, the request is retried up to the number of times specified by this parameter.

    • sleepMillis: the retry interval. INTEGER. Optional. Default value: 1000. Unit: milliseconds.

    • enableUpsert: specifies whether to use MaxCompute Upsert Tunnel to upload data. BOOLEAN. Optional. Default value: false. Valid values:

      • true: MaxCompute Upsert Tunnel is used to process INSERT, UPDATE_AFTER, and DELETE data in Flink.

      • false: MaxCompute Batch Tunnel or MaxCompute Streaming Tunnel, as specified by the useStreamTunnel parameter, is used to process INSERT and UPDATE_AFTER data in Flink.

      Important

      If issues such as errors, deployment failures, or long processing times occur when a MaxCompute sink commits a session in upsert mode, we recommend that you set the Parallelism parameter of the sink operators to a value that is less than or equal to 10.

    • upsertAsyncCommit: specifies whether to use the asynchronous mode when a MaxCompute sink commits a session in upsert mode. BOOLEAN. Optional. Default value: false. Valid values:

      • true: the asynchronous mode is used. This reduces the time that is consumed to commit the session, but the data that is written cannot be read immediately after the session is committed.

      • false: the synchronous mode is used. When the MaxCompute sink commits the session, the system waits until the server processes the session.

    • upsertCommitTimeoutMs: the timeout period for which a MaxCompute sink commits a session in upsert mode. INTEGER. Optional. Default value: 120000 (120 seconds). Unit: milliseconds.
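
    To make the tunnel choices concrete, the following sketches show a Streaming Tunnel sink with flush tuning and an upsert sink. All names are placeholders; declaring a primary key on the upsert sink is an assumption made here so that Flink can key UPDATE and DELETE records.

    -- Sketch 1: a Streaming Tunnel sink with flush tuning (assumes VVR 4.0.14
    -- or later for the batchSize parameter).
    CREATE TEMPORARY TABLE odps_stream_sink (
      id INT,
      content VARCHAR
    ) WITH (
      'connector' = 'odps',
      'endpoint' = '<yourEndpoint>',
      'project' = '<yourProjectName>',
      'tableName' = '<yourTableName>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'partition' = 'ds=20231201',
      'useStreamTunnel' = 'true',    -- flushed data becomes visible immediately
      'flushIntervalMs' = '10000',   -- flush every 10 seconds ...
      'batchSize' = '33554432'       -- ... or when the buffer reaches 32 MB
    );

    -- Sketch 2: an upsert sink that processes INSERT, UPDATE_AFTER, and DELETE data.
    CREATE TEMPORARY TABLE odps_upsert_sink (
      id INT,
      content VARCHAR,
      PRIMARY KEY (id) NOT ENFORCED   -- assumption: a key for UPDATE and DELETE records
    ) WITH (
      'connector' = 'odps',
      'endpoint' = '<yourEndpoint>',
      'project' = '<yourProjectName>',
      'tableName' = '<yourTableName>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'partition' = 'ds=20231201',
      'enableUpsert' = 'true'
    );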

  • Parameters only for dimension tables

    When a deployment starts, a MaxCompute dimension table pulls full data from the partition that is specified by the partition parameter. After the cache entries expire, the cache is reloaded and the partition that is specified by the partition parameter is re-evaluated and pulled again. If you set the partition parameter to max_pt(), the latest partition is pulled each time. If you set the partition parameter to max_two_pt(), the dimension table can pull data from the two latest partitions. Otherwise, data of only one partition can be pulled.

    • cache: the cache policy. STRING. Required. No default value.

      You must set the cache parameter to ALL for a MaxCompute dimension table and explicitly declare the setting in the DDL statement. If the amount of data in the remote table is small and a large number of missing keys exist, we recommend that you set this parameter to ALL. The source table and dimension table cannot be associated based on the ON clause.

      ALL: indicates that all data in the dimension table is cached. Before the system runs a deployment, it loads all data in the dimension table to the cache, and all subsequent queries against the dimension table are served from the cache. If a key does not exist, the data record is not found in the cache. The system reloads all data in the cache after the cache entries expire.

      Note
      • If the cache parameter is set to ALL, you must increase the memory of the join node because the system asynchronously loads data of the dimension table. We recommend that you increase the memory to at least four times the amount of data in the remote table. The exact size depends on the MaxCompute storage compression algorithm.

      • If a dimension table contains a large amount of data, you can use the SHUFFLE_HASH hint to evenly distribute the data of the dimension table to each subtask. For more information, see FAQ about upstream and downstream storage.

      • If you use an ultra-large MaxCompute dimension table, frequent Java virtual machine (JVM) garbage collections may cause deployment exceptions. To resolve this issue, increase the memory of the node on which the dimension table is joined with another table. If the issue persists, we recommend that you convert the dimension table to a key-value dimension table that supports the least recently used (LRU) cache policy, such as an ApsaraDB for HBase dimension table.

    • cacheSize: the maximum number of rows of data that can be cached. LONG. Optional. Default value: 100000. If the number of data records in the dimension table exceeds this value, the error message "Row count of table <table-name> partition <partition-name> exceeds maxRowCount limit" appears.

      Important

      If a dimension table contains a large number of data records, a large amount of JVM heap memory is consumed, and both the startup of the deployment and the updates of the dimension table slow down. Check whether a large number of data records really need to be cached. If your business requires it, manually increase the value of this parameter.

    • cacheTTLMs: the cache timeout period. LONG. Optional. Default value: Long.MAX_VALUE. Unit: milliseconds.

    • cacheReloadTimeBlackList: the time periods during which the cache is not refreshed. STRING. Optional. No default value. This parameter is suitable for large-scale online promotional events, such as peak hours of activities, during which refreshing the cache could make deployments unstable. For more information about how to configure this parameter, see FAQ about upstream and downstream storage.

    • maxLoadRetries: the maximum number of retries that can be performed to refresh the cache, including the initial data pull when the deployment starts. INTEGER. Optional. Default value: 10. If the number of retries exceeds this value, the deployment fails to run.
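
    The following sketch shows a dimension table that always serves the latest partition, with cache limits tuned upward. The names are placeholders and the values are illustrative.

    CREATE TEMPORARY TABLE odps_latest_dim (
      k INT,
      v VARCHAR,
      PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
      'connector' = 'odps',
      'endpoint' = '<yourEndpoint>',
      'project' = '<yourProjectName>',
      'tableName' = '<yourTableName>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'partition' = 'max_pt()',   -- re-resolved to the latest partition on each cache reload
      'cache' = 'ALL',            -- required for MaxCompute dimension tables
      'cacheSize' = '500000',     -- allow up to 500,000 cached rows
      'cacheTTLMs' = '3600000'    -- reload the cache every hour
    );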

Data type mappings

For more information about the data types that are supported by MaxCompute, see MaxCompute V2.0 data type edition.

Data type of MaxCompute | Data type of Flink
BOOLEAN | BOOLEAN
TINYINT | TINYINT
SMALLINT | SMALLINT
INT | INTEGER
BIGINT | BIGINT
FLOAT | FLOAT
DOUBLE | DOUBLE
DECIMAL(precision, scale) | DECIMAL(precision, scale)
CHAR(n) | CHAR(n)
VARCHAR(n) | VARCHAR(n)
STRING | STRING
BINARY | BYTES
DATE | DATE
DATETIME | TIMESTAMP(3)
TIMESTAMP | TIMESTAMP(9)
ARRAY | ARRAY
MAP | MAP
STRUCT | ROW
JSON | STRING

Important

If a MaxCompute physical table contains fields of nested composite data types of Flink, such as ARRAY, MAP, or STRUCT, or fields of the JSON type, you must configure tblproperties('columnar.nested.type'='true') when you create the MaxCompute physical table so that Flink can read data from and write data to the table.
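
A sketch of such a DDL, executed on the MaxCompute side rather than in Flink, might look as follows; the table and column names are placeholders.

-- MaxCompute DDL (run in MaxCompute, not in Flink); placeholder names.
CREATE TABLE nested_demo (
  id BIGINT,
  tags ARRAY<STRING>,           -- nested composite type
  attrs MAP<STRING, STRING>     -- nested composite type
)
PARTITIONED BY (ds STRING)
tblproperties ('columnar.nested.type' = 'true');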

Sample code

SQL deployment

  • Sample code for a source table

    • Read data from a full MaxCompute source table

      CREATE TEMPORARY TABLE odps_source (
        cid VARCHAR,
        rt DOUBLE
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpointName>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=201809*'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        cid VARCHAR,
        invoke_count BIGINT
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT
         cid,
         COUNT(*) AS invoke_count
      FROM odps_source GROUP BY cid;
    • Read data from an incremental MaxCompute source table

      CREATE TEMPORARY TABLE odps_source (
        cid VARCHAR,
        rt DOUBLE
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpointName>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'startPartition' = 'yyyy=2018,MM=09,dd=05' -- Incremental data is read starting from the partition 20180905.
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        cid VARCHAR,
        invoke_count BIGINT
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT cid, COUNT(*) AS invoke_count
      FROM odps_source GROUP BY cid;
  • Sample code for a result table

    • Write data to static partitions

      CREATE TEMPORARY TABLE datagen_source (
        id INT,
        len INT,
        content VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_sink (
        id INT,
        len INT,
        content VARCHAR
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20180905' -- Data is written to the static partition 20180905. 
      );
      
      INSERT INTO odps_sink
      SELECT
        id, len, content
      FROM datagen_source;
    • Write data to dynamic partitions

      CREATE TEMPORARY TABLE datagen_source (
        id INT,
        len INT,
        content VARCHAR,
        c TIMESTAMP
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_sink (
        id  INT,
        len INT,
        content VARCHAR,
        ds VARCHAR -- The partition key column that you use to create dynamic partitions must be explicitly specified. 
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds' -- The partition value is not provided. This means that data is written to a partition specified by the ds field. 
      );
      
      INSERT INTO odps_sink
      SELECT
         id,
         len,
         content,
         DATE_FORMAT(c, 'yyMMdd') as ds
      FROM datagen_source;
  • Sample code for a dimension table

    • Join a dimension table and another table

      CREATE TEMPORARY TABLE datagen_source (
        k INT,
        v VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_dim (
        k INT,
        v VARCHAR,
        PRIMARY KEY (k) NOT ENFORCED  -- You must declare a primary key when you join a dimension table and another table. 
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20180905',
        'cache' = 'ALL'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        k INT,
        v1 VARCHAR,
        v2 VARCHAR
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT s.k, s.v, d.v
      FROM datagen_source AS s
      INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
    • Join a dimension table and multiple tables

      CREATE TEMPORARY TABLE datagen_source (
        k INT,
        v VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_dim (
        k INT,
        v VARCHAR
        -- You do not need to declare a primary key when you join a dimension table and multiple tables. 
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20180905',
        'cache' = 'ALL'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        k INT,
        v1 VARCHAR,
        v2 VARCHAR
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT s.k, s.v, d.v
      FROM datagen_source AS s
      INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;

DataStream APIs

Important
  • If you want to call a DataStream API to read or write data, you must use a DataStream connector of the related type to connect to fully managed Flink. For more information about how to configure a DataStream connector, see Usage of DataStream connectors. The MaxCompute DataStream connectors of different versions are stored in the Maven central repository.

  • To protect intellectual property, you can perform local debugging on a deployment that uses the MaxCompute DataStream connector for a maximum of 30 minutes in Realtime Compute for Apache Flink that uses VVR 6.0.6 or later. If local debugging takes more than 30 minutes, an error is returned and the deployment exits. For more information about how to run or debug a Flink deployment that includes the MaxCompute connector in an on-premises environment, see Run or debug a Flink deployment that includes a connector in an on-premises environment.

  • When you run or debug a deployment that includes the MaxCompute connector of Alibaba Cloud Realtime Compute for Apache Flink in an integrated development environment (IDE), an issue may occur in which specific connector-related classes cannot be found. In this case, download the JAR file whose name ends with uber.jar from the Maven repository and add it as an additional dependency of the deployment. For more information, see Run or debug a Flink deployment that includes a connector in an on-premises environment. For example, if the version of the ververica-connector-odps dependency of MaxCompute is 1.15-vvr-6.0.6, you can find the ververica-connector-odps-1.15-vvr-6.0.6-uber.jar package in the corresponding directory of the Maven repository and download it to your on-premises directory.

The Maven dependencies of the MaxCompute connector contain the classes required to create full MaxCompute source tables, incremental MaxCompute source tables, MaxCompute result tables, and MaxCompute dimension tables.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-odps</artifactId>
    <version>${connector.version}</version>
</dependency>

We recommend that you declare a MaxCompute table by using SQL statements when you use the MaxCompute DataStream connector. You can call Table API operations to access MaxCompute tables or call DataStream API operations to access data streams.

  • Access a source table by using the DataStream connector

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    tEnv.executeSql(String.join(
        "\n",
        "CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
        "  cid VARCHAR,",
        "  rt DOUBLE",
        ") WITH (",
        "  'connector' = 'odps',",
        "  'endpoint' = '<yourEndpointName>',",
        "  'project' = '<yourProjectName>',",
        "  'tableName' = '<yourTableName>',",
        "  'accessId' = '<yourAccessId>',",
        "  'accessKey' = '<yourAccessPassword>',",
        "  'partition' = 'ds=201809*'",
        ")");
    DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
    source.print();
    env.execute("odps source"); 
  • Access a result table by using the DataStream connector

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    tEnv.executeSql(String.join(
        "\n",
        "CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
        "  cid VARCHAR,",
        "  rt DOUBLE",
        ") WITH (",
        "  'connector' = 'odps',",
        "  'endpoint' = '<yourEndpointName>',",
        "  'project' = '<yourProjectName>',",
        "  'tableName' = '<yourTableName>',",
        "  'accessId' = '<yourAccessId>',",
        "  'accessKey' = '<yourAccessPassword>',",
        "  'partition' = 'ds=20180905'",
        ")");
    DataStream<Row> data = env.fromElements(
        Row.of("id0", 3.),
        Row.of("id1", 4.));
    tEnv.fromDataStream(data).insertInto("odps_sink").execute();
