This topic describes how to run a Flink DataStream job to read data from Alibaba Cloud Message Queue for Apache Kafka.
Prerequisites
- Java Development Kit (JDK) 8 is installed on your machine.
- Maven 3.X is installed on your machine.
- An integrated development environment (IDE) for Java or Scala is installed on your machine, and the JDK and Maven are configured in it. We recommend that you use IntelliJ IDEA.
- A Message Queue for Apache Kafka instance is created in the same VPC as your Realtime Compute for Apache Flink cluster in exclusive mode. A topic and a consumer group are created on the instance.
Background information
- The DataStream API of Realtime Compute for Apache Flink is compatible with open source Apache Flink 1.5.2. Alibaba Cloud Message Queue for Apache Kafka is compatible with open source Apache Kafka. Therefore, you can directly use the open source Kafka connector from the Maven repository to access Alibaba Cloud Message Queue for Apache Kafka.
- Realtime Compute for Apache Flink in exclusive mode accesses Message Queue for Apache Kafka over a VPC, so Simple Authentication and Security Layer (SASL) authentication is not required. If you access Alibaba Cloud Message Queue for Apache Kafka over the Internet from your IDE, SASL authentication is required. For more information about how to configure Message Queue for Apache Kafka clients, see kafka-java-demo.
Notice: Only Blink 3.X supports this demo.
Develop a job
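The connection settings described in the background information can be sketched as follows. This is a minimal illustration, not the code of the original demo: the class name KafkaConsumerProps and its build method are hypothetical, the endpoint and consumer group values are placeholders that you supply, and the SASL values shown are assumptions based on standard Kafka client configuration keys. A real SASL setup over the Internet typically also needs credentials and a truststore, which are omitted here; see kafka-java-demo for the authoritative settings.

```java
import java.util.Properties;

/** Hypothetical helper (not part of the original demo) that assembles Kafka consumer settings. */
final class KafkaConsumerProps {

    private KafkaConsumerProps() {
    }

    /**
     * Builds the consumer properties.
     *
     * @param bootstrapServers endpoint of the Kafka instance (placeholder supplied by the caller)
     * @param groupId          consumer group created for the instance
     * @param overInternet     true when connecting from an IDE over the Internet
     */
    static Properties build(String bootstrapServers, String groupId, boolean overInternet) {
        Properties props = new Properties();
        // Standard Kafka client keys for the endpoint and the consumer group.
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        if (overInternet) {
            // Assumed values: SASL is required only for Internet access.
            // Credentials and truststore settings are intentionally left out.
            props.setProperty("security.protocol", "SASL_SSL");
            props.setProperty("sasl.mechanism", "PLAIN");
        }
        // Over a VPC, no SASL keys are set.
        return props;
    }
}
```

In a job, the returned Properties object would be passed, together with the topic name and a deserialization schema, to the consumer class of the Kafka connector that you depend on (for example FlinkKafkaConsumer010, if that is the connector version in your pom.xml), and the consumer would then be registered as a source on the stream execution environment.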
Publish a job
For more information about how to publish a job, see Publish a job.
The following example shows the job content:
Note: Modify the values of blink.main.class, blink.job.name, and blink.main.jar as required.
-- The complete main class name, for example, com.alibaba.realtimecompute.DatastreamExample. This field is required.
blink.main.class=com.alibaba.blink.datastreaming.AliKafkaConsumerDemo
-- The job name.
blink.job.name=alikafkaconsumerdemo
-- The resource name of the JAR package that contains the main class, for example, blink_datastream.jar.
blink.main.jar=blink-datastreaming-1.0-snapshot.jar
-- The default state backend configuration. These settings take effect only when the state backend is not explicitly configured in the job code.
state.backend.type=niagara
state.backend.niagara.ttl.ms=129600000
-- The default checkpoint configuration. This setting takes effect only when checkpointing is not explicitly configured in the job code.
blink.checkpoint.interval.ms=180000