You can use MaxCompute DataStream connectors to enable DataStream API programs to read data from or write data to MaxCompute. This topic describes how to use MaxCompute DataStream connectors in the console of fully managed Flink.

Background information

MaxCompute DataStream connectors are developed based on the MaxCompute SQL connector and accept the same configurations as the WITH options of the MaxCompute SQL connector. For more information about the parameters in the WITH clause of the MaxCompute SQL connector, see Create a full MaxCompute source table, Create an incremental MaxCompute source table, and Create a MaxCompute result table.

The Ververica Runtime (VVR) connectors are placed in the Maven central repository for you to use when you develop a job. You can use MaxCompute DataStream connectors in one of the following ways:

(Recommended) Package the connector as a project dependency into the JAR file of your job

  1. Add the following configurations to the POM file of the Maven project to reference SNAPSHOT repositories:
    <repositories>
      <repository>
        <id>oss.sonatype.org-snapshot</id>
        <name>OSS Sonatype Snapshot Repository</name>
        <url>https://oss.sonatype.org/content/repositories/snapshots</url>
        <releases>
          <enabled>false</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
      <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
          <enabled>false</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
    </repositories>
  2. Check whether the <mirrorOf>*</mirrorOf> configuration is contained in your settings.xml configuration file.

    If the <mirrorOf>*</mirrorOf> configuration is contained in the configuration file, change the configuration to <mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.

    This change prevents requests for the two SNAPSHOT repositories that you configured in Step 1 from being redirected to the mirror. If the mirrorOf element contains only an asterisk (*), Maven routes all repository requests, including requests for the SNAPSHOT repositories, to the mirror. A placement sketch follows.
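
    The following is a minimal sketch of the relevant part of settings.xml. The mirror id and URL are placeholders for your own mirror, not values from this topic:
    <mirrors>
      <mirror>
        <!-- Hypothetical mirror entry. Replace the id and url with those of your own mirror. -->
        <id>my-mirror</id>
        <mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>
        <url>https://my-mirror.example.com/repository/maven-public/</url>
      </mirror>
    </mirrors>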

  3. Add the connector that you want to use to the Maven POM file as a project dependency.
    • The following code shows the dependency for a MaxCompute source table and a MaxCompute result table.
      <dependency>
          <groupId>com.alibaba.ververica</groupId>
          <artifactId>ververica-connector-odps</artifactId>
          <version>${connector.version}</version>
      </dependency>
    • The following code shows the dependency for an incremental MaxCompute source table.
      <dependency>
          <groupId>com.alibaba.ververica</groupId>
          <artifactId>ververica-connector-continuous-odps</artifactId>
          <version>${connector.version}</version>
      </dependency>
    Different connector versions may correspond to different connector types. We recommend that you use the latest version for the type of the connector that you use. For more information about the mappings among connector versions, VVR or Flink versions, and connector types, see DataStream connectors. For more information about the dependencies, see the POM file in the MaxCompute sample code.
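    For example, you can define the connector.version property in the properties section of the POM file. The following version string is only a hypothetical SNAPSHOT version; replace it with the version that matches your VVR version:
    <properties>
      <!-- Hypothetical version string. Use the SNAPSHOT version that matches your VVR version. -->
      <connector.version>1.13-vvr-4.0.7-SNAPSHOT</connector.version>
    </properties>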
    Notice
    • You must search for connector versions that contain the SNAPSHOT keyword in the SNAPSHOT repository oss.sonatype.org. These versions are not available in the Maven central repository search.maven.org.
    • If you use multiple connectors, you must merge the files in the META-INF directory. To merge the files, add the following code to the maven-shade-plugin configuration in the POM file (a placement sketch follows the snippet):
      <transformers>
          <!-- The service transformer is needed to merge META-INF/services files -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
              <projectName>Apache Flink</projectName>
              <encoding>UTF-8</encoding>
          </transformer>
      </transformers>
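      The transformers element belongs inside the configuration of the maven-shade-plugin. The following is a minimal placement sketch; the plugin version is a placeholder:
      <build>
          <plugins>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-shade-plugin</artifactId>
                  <!-- Hypothetical version. Use the maven-shade-plugin version that your project already declares. -->
                  <version>3.2.4</version>
                  <executions>
                      <execution>
                          <phase>package</phase>
                          <goals>
                              <goal>shade</goal>
                          </goals>
                          <configuration>
                              <transformers>
                                  <!-- Place the transformers that are shown above here. -->
                              </transformers>
                          </configuration>
                      </execution>
                  </executions>
              </plugin>
          </plugins>
      </build>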
  4. Modify the MaxCompute connection configurations and the table schema for the connectors.
    • MaxCompute connection configurations: The configurations that are used to connect to MaxCompute. MaxCompute DataStream connectors accept the same configurations as the WITH options of the MaxCompute SQL connector. For more information, see the topics that are listed in Background information.
    • Table schema: The schema of the table that you want to read data from or write data to.
    • Connection to a MaxCompute source table
      VVR provides the ODPSStreamSource class that implements the SourceFunction interface. You can use the class to read data from a MaxCompute source table. The following sample code shows how to read data from a table partition in a MaxCompute source table.
      // Specify the connection configurations. 
      Configuration conf = new Configuration();
      conf.setString("endpoint", "yourEndpoint");
      conf.setString("tunnelEndpoint","YourTunnelEndpoint");
      conf.setString("project", "yourProjectName");
      conf.setString("tablename", "yourTableName");
      conf.setString("accessid", "yourAccessKeyID");
      conf.setString("accesskey", "yourAccessKeySecret");
      conf.setString("partition", "ds=yourPartitionName");
      
      // Configure the table schema. 
      TableSchema schema = org.apache.flink.table.api.TableSchema.builder()
              .field("a", DataTypes.STRING())
              .field("b", DataTypes.STRING())
              .build();
      
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
      // Create a MaxCompute source function. 
      ODPSStreamSource odpsSource =
          new OdpsSourceBuilder(schema, conf).buildSourceFunction();
      DataStreamSource<RowData> source = env.addSource(odpsSource);
      source.addSink(new PrintSinkFunction<>());
      env.execute("odps source");
      Note
      • VVR does not provide the OdpsSourceBuilder class as a connector dependency. The MaxCompute sample code provides the OdpsSourceBuilder class to help you create ODPSStreamSource objects.
      • A MaxCompute source table reads data in the data structure of the GenericRowData class, which implements the RowData interface. The sketch after this note shows one way to access the fields of the emitted records.
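      Because the source emits RowData records, you access fields by position and type. The following is a minimal sketch that assumes the two STRING fields of the schema above; it is an illustration, not part of the connector API:
      // A minimal sketch: read the two STRING fields of each RowData record by position.
      // The field positions follow the TableSchema that is defined above.
      source.map(row -> row.getString(0) + "," + row.getString(1))
            .returns(org.apache.flink.api.common.typeinfo.Types.STRING)
            .print();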
    • Connection to an incremental MaxCompute source table
      VVR provides the ContinuousODPSStreamSource class that implements the SourceFunction interface. You can use the class to read data from an incremental MaxCompute source table. The following sample code shows how to read data from a table partition in an incremental MaxCompute source table.
      // Specify the connection configurations. 
      Configuration conf = new Configuration();
      conf.setString("endpoint", "yourEndpoint");
      conf.setString("tunnelEndpoint","YourTunnelEndpoint");
      conf.setString("project", "yourProjectName");
      conf.setString("tablename", "yourTableName");
      conf.setString("accessid", "yourAccessKeyID");
      conf.setString("accesskey", "yourAccessKeySecret");
      conf.setString("startPartition", "ds=yourPartitionName");
      
      // Configure the table schema. 
      TableSchema schema = org.apache.flink.table.api.TableSchema.builder()
              .field("a", DataTypes.STRING())
              .field("b", DataTypes.STRING())
              .build();
      
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
      // Create a continuous MaxCompute source function. 
      ContinuousODPSStreamSource continuousOdpsSource =
              new OdpsSourceBuilder(schema, conf).buildContinuousOdpsSource();
      DataStreamSource<RowData> source = env.addSource(continuousOdpsSource);
      source.addSink(new PrintSinkFunction<>());
      env.execute("continuous odps source");
      Note
      • VVR does not provide the OdpsSourceBuilder class as a connector dependency. The MaxCompute sample code provides the OdpsSourceBuilder class to help you create ContinuousODPSStreamSource objects.
      • An incremental MaxCompute source table reads data in the data structure of the GenericRowData class, which implements the RowData interface.
    • Connection to a MaxCompute result table
      VVR provides the OdpsOutputFormat class that implements the OutputFormat interface. This class calls the MaxCompute Tunnel service to upload data to the destination table or partition. The following sample code shows how to create a MaxCompute sink that uses an OdpsOutputFormat object to upload data to the destination partition.
      // Specify the connection configurations. 
      DescriptorProperties properties = new DescriptorProperties();
      properties.putString("endpoint", "yourEndpoint");
      properties.putString("tunnelEndpoint","YourTunnelEndpoint");
      properties.putString("project", "yourProjectName");
      properties.putString("tablename", "yourTableName");
      properties.putString("accessid", "yourAccessKeyID");
      properties.putString("accesskey", "yourAccessKeySecret");
      properties.putString("partition", "ds=yourPartitionName");
      
      // Configure the table schema. 
      TableSchema schema = TableSchema.builder()
              .field("a", DataTypes.STRING())
              .field("b", DataTypes.STRING())
              .build();
      
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
      // Enable checkpointing so that the MaxCompute sink commits data to MaxCompute on each checkpoint. This helps ensure exactly-once delivery.
      // The first parameter is the checkpoint interval in milliseconds. Use a value that suits your production workload.
      env.enableCheckpointing(10, CheckpointingMode.EXACTLY_ONCE);
      
      // Create a MaxCompute sink function. 
      TupleOutputFormatSinkFunction<Row> odpsSink = new TupleOutputFormatSinkFunction<>(
              new OdpsOutputFormat(schema, properties));
      
      env.fromCollection(
              Arrays.asList(Row.of("aa", "111"), Row.of("bb", "222"))
      ).map(new MapFunction<Row, Tuple2<Boolean, Row>>() {
          @Override
          public Tuple2<Boolean, Row> map(Row row) throws Exception {
              return Tuple2.of(true, row);
          }
      }).addSink(odpsSink);
      env.execute("odps sink");
      Note
      • A MaxCompute sink submits data to MaxCompute during checkpointing or when the job finishes. You can query data from MaxCompute only after the data is submitted. If you need the data that is written to a sink to become queryable in near real time, call the enableCheckpointing method to enable the checkpointing feature. The first parameter of the enableCheckpointing method specifies the checkpoint interval in milliseconds. For more information, see Principles.
      • Data that is sent to a MaxCompute sink must be in the Tuple2<Boolean, Row> data structure. Tuple2 represents a tuple that has two fields. The sample code shows how to use the MapFunction class to convert data into this data structure.

Upload the JAR package of the MaxCompute DataStream connector to the console of fully managed Flink

  1. Log on to the Realtime Compute for Apache Flink console.
  2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
  3. In the left-side navigation pane, click Artifacts.
  4. Click Upload Artifact and select the JAR package that you want to upload.

    You can upload the JAR package of your self-managed connector or the JAR package of a connector that is provided by fully managed Flink. For the download links of the official JAR packages that are provided by fully managed Flink, see Connectors.

  5. In the Additional Dependencies section of the Draft Editor page, select the JAR package that you want to use.