This topic describes how to run a Flink DataStream job to read data from Alibaba Cloud Message Queue for Apache Kafka.

Prerequisites

  • Java Development Kit (JDK) 8 is installed on your machine.
  • Maven 3.X is installed on your machine.
  • An integrated development environment (IDE) for Java or Scala is installed on your machine. We recommend that you use IntelliJ IDEA. The JDK and Maven are configured.
  • A Message Queue for Apache Kafka instance is created in the same VPC as your Realtime Compute for Apache Flink cluster in exclusive mode. A topic and a consumer group are created on the instance.

Background information

  • DataStream of Realtime Compute for Apache Flink is developed based on open source Apache Flink 1.5.2. Alibaba Cloud Message Queue for Apache Kafka is compatible with open source Apache Kafka. Therefore, you can directly use the Kafka connector in the Maven repository to access Alibaba Cloud Message Queue for Apache Kafka.
  • Realtime Compute for Apache Flink in exclusive mode accesses Message Queue for Apache Kafka over a VPC. Simple Authentication and Security Layer (SASL) authentication is not required. If you access Alibaba Cloud Message Queue for Apache Kafka over the Internet in your IDE, SASL authentication is required. For more information about configurations of Message Queue for Apache Kafka, see kafka-java-demo.
Notice: This demo is supported only by Blink 3.X.
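As a hedged illustration of the access modes described above, the following sketch builds the client settings for Internet access with SASL. The property keys are standard Apache Kafka client configuration names; the endpoint placeholders, mechanism, and JAAS module shown here are illustrative assumptions, not values confirmed by this topic — obtain the actual values from the Message Queue for Apache Kafka console and the kafka-java-demo.

```java
import java.util.Properties;

public class AccessPropsSketch {
    // Builds consumer settings for VPC access (no SASL), as used by
    // Realtime Compute for Apache Flink in exclusive mode.
    public static Properties vpcProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip1:port,ip2:port,ip3:port"); // VPC endpoints
        props.put("group.id", "your_groupid");
        return props;
    }

    // Adds the SASL settings needed only when you access the instance
    // over the Internet, for example from your IDE. The mechanism value
    // below is a placeholder assumption.
    public static Properties internetProps() {
        Properties props = vpcProps();
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(vpcProps().containsKey("security.protocol"));      // VPC: no SASL
        System.out.println(internetProps().getProperty("security.protocol")); // Internet: SASL
    }
}
```

The point of the two factory methods is the difference stated above: over a VPC no SASL settings are present, while Internet access layers the security properties on top of the same endpoint configuration.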

Develop a job

  1. Download and decompress alikafka-demo-master to your machine.
  2. In IntelliJ IDEA, choose File > Open to open the decompressed alikafka-demo-master.
  3. Double-click kafka.properties in the \alikafka-demo-master\src\main\resources directory to open the file. Then, set the bootstrap.servers, topic, and group.id parameters to the values of your Message Queue for Apache Kafka instance.
    ## Endpoints, which are obtained from the Message Queue for Apache Kafka console.
    
    ## You can enter either public or VPC endpoints for the bootstrap.servers parameter. However, if you use Realtime Compute for Apache Flink in exclusive mode, you must enter VPC endpoints.
    bootstrap.servers=ip1:port,ip2:port,ip3:port
    
    ## The topic, which is created in the Message Queue for Apache Kafka console.
    topic=your_topic
    
    ## The consumer group, which is created in the Message Queue for Apache Kafka console.
    group.id=your_groupid
  4. Go to the directory where the pom.xml file is saved and run the following command to package the project:
    mvn clean package

    Based on the artifactId that you set in the pom.xml file of your project, a JAR package named blink-datastreaming-1.0-SNAPSHOT.jar is generated in the target directory. This indicates that the job is developed.
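At startup, a job built this way would read the three settings configured in step 3. The following is a minimal self-contained sketch of that loading step; the class name is hypothetical and the file content is inlined here only so the sketch runs on its own, whereas the actual demo reads kafka.properties from its classpath.

```java
import java.io.StringReader;
import java.util.Properties;

public class KafkaPropertiesSketch {
    // Parses kafka.properties content into a Properties object.
    // In the real project, use getClass().getResourceAsStream("/kafka.properties")
    // instead of an inlined string.
    public static Properties load(String content) throws Exception {
        Properties props = new Properties();
        props.load(new StringReader(content));
        return props;
    }

    public static void main(String[] args) throws Exception {
        String content = "bootstrap.servers=ip1:port,ip2:port,ip3:port\n"
                + "topic=your_topic\n"
                + "group.id=your_groupid\n";
        Properties props = load(content);
        // The job passes bootstrap.servers and group.id to the Kafka
        // consumer and subscribes to the configured topic.
        System.out.println(props.getProperty("topic"));
        System.out.println(props.getProperty("group.id"));
    }
}
```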

Publish a job

For more information about how to publish a job, see Publish a job.
Note: Modify the blink.main.class, blink.job.name, and blink.main.jar configurations as required.
The following example shows the job content:
-- The complete main class name, for example, com.alibaba.realtimecompute.DatastreamExample. This field is required.
blink.main.class=com.alibaba.blink.datastreaming.AliKafkaConsumerDemo

-- The job name.
blink.job.name=alikafkaconsumerdemo

-- The resource name of the JAR package that contains the complete main class name, for example, blink_datastream.jar.
blink.main.jar=blink-datastreaming-1.0-SNAPSHOT.jar

-- The default state backend configuration. These settings take effect only if the job code does not explicitly configure a state backend.
state.backend.type=niagara
state.backend.niagara.ttl.ms=129600000

-- The default checkpoint configuration. This setting takes effect only if the job code does not explicitly configure checkpointing.
blink.checkpoint.interval.ms=180000

Verify the data reading result

  1. Send messages to Realtime Compute for Apache Flink in the Message Queue for Apache Kafka console.
  2. On the Job Administration page, view the output in the taskmanager.out file of the sink node. In this example, the sink node is of the print type.
    If information similar to that in the following figure appears, Realtime Compute for Apache Flink has read data from Alibaba Cloud Message Queue for Apache Kafka. The displayed information depends on the messages that you sent from the Message Queue for Apache Kafka console.
    Figure: Data reading succeeded