This topic describes how to run a Flink DataStream job to read data from Log Service.
Prerequisites
- Java Development Kit (JDK) 8 is installed on your on-premises machine.
- Maven 3.x is installed on your on-premises machine.
- An integrated development environment (IDE) for Java or Scala, such as IntelliJ IDEA (recommended), is installed on your on-premises machine, and the JDK and Maven are configured in the IDE.
- A Logstore is created in Log Service, and test data exists in the Logstore.
Develop a job
- Download and decompress the SLS_Demo-master.zip package to your on-premises machine.
- In IntelliJ IDEA, open the decompressed SLS_Demo-master folder.
- In the \SLS_Demo-master\src\main\java\com\aliyun\openservices\log\flink directory, double-click the ConsumerSample.java file to open it. Then, configure the Log Service parameters in the file.
private static final String SLS_ENDPOINT = "VPC endpoint"; // Use the classic network endpoint or a virtual private cloud (VPC) endpoint in the production environment.
// private static final String SLS_ENDPOINT = "public endpoint"; // Use the public endpoint in the test environment.
private static final String ACCESS_KEY_ID = "yourAK";
private static final String ACCESS_KEY_SECRET = "yourAS";
private static final String SLS_PROJECT = "yourProject";
private static final String SLS_LOGSTORE = "yourlogstore";
// 1. Specify the start offset, which indicates the timestamp to start reading data from Log Service. The timestamp is measured in seconds.
// 2. To read both full and incremental data from Log Service, set the StartInMs parameter to Consts.LOG_BEGIN_CURSOR.
// 3. To read only incremental data from Log Service, set the StartInMs parameter to Consts.LOG_END_CURSOR.
private static final String StartInMs = Consts.LOG_END_CURSOR;
Note: You must comment out <scope>provided</scope> in the pom.xml file when you perform local debugging in your IDE.
- Go to the directory that contains the pom.xml file and run the following command to package the project:
mvn clean package
A Java Archive (JAR) package named flink-log-connector-0.1.21-SNAPSHOT.jar is generated in the target directory. The file name is derived from the artifactId and version values in the pom.xml file of your project. When the JAR package appears, job development is complete.
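The comments in ConsumerSample.java allow three kinds of values for StartInMs: the begin cursor, the end cursor, or a Unix timestamp in seconds. The following minimal sketch needs only the JDK. It shows how you might compute a seconds-based timestamp, and it flags a value that looks like milliseconds, which is easy to pass by accident given the "Ms" suffix in the name. The literals "begin_cursor" and "end_cursor" are assumptions that stand in for Consts.LOG_BEGIN_CURSOR and Consts.LOG_END_CURSOR, so check the Consts class in your connector version. The class and method names are hypothetical.

```java
import java.time.Instant;

/**
 * Hypothetical helper for choosing the StartInMs value in ConsumerSample.java.
 * ASSUMPTION: "begin_cursor" / "end_cursor" stand in for Consts.LOG_BEGIN_CURSOR /
 * Consts.LOG_END_CURSOR; any other value is read as a Unix timestamp in seconds.
 */
public class StartPositionSketch {
    // Hypothetical stand-ins for the connector's cursor constants.
    static final String BEGIN_CURSOR = "begin_cursor";
    static final String END_CURSOR = "end_cursor";

    /** Converts a point in time to a seconds-based start offset string. */
    static String fromInstant(Instant start) {
        // getEpochSecond() returns seconds, not milliseconds.
        return String.valueOf(start.getEpochSecond());
    }

    /** Explains what a start position means; rejects malformed or millisecond values. */
    static String describe(String startPosition) {
        if (BEGIN_CURSOR.equals(startPosition)) {
            return "full and incremental data";
        }
        if (END_CURSOR.equals(startPosition)) {
            return "incremental data only";
        }
        long seconds;
        try {
            seconds = Long.parseLong(startPosition);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Not a cursor or timestamp: " + startPosition);
        }
        // Values this large are far in the future as seconds, so they are most likely milliseconds.
        if (seconds > 100_000_000_000L) {
            throw new IllegalArgumentException("Looks like milliseconds, expected seconds: " + startPosition);
        }
        return "data from " + Instant.ofEpochSecond(seconds);
    }

    public static void main(String[] args) {
        String start = fromInstant(Instant.parse("2024-01-01T00:00:00Z"));
        System.out.println(start);                 // 1704067200
        System.out.println(describe(start));       // data from 2024-01-01T00:00:00Z
        System.out.println(describe(END_CURSOR));  // incremental data only
    }
}
```

Under the assumption that your connector version accepts a numeric string for the start position, setting StartInMs to "1704067200" would start reading at 2024-01-01 00:00:00 UTC.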
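Before you publish the job, you may want to confirm that the packaged JAR actually contains the class that you will name in blink.main.class. The sketch below uses only java.util.jar from the JDK. The class name JarCheckSketch is hypothetical, and the default path assumes the artifactId and version used in this example.

```java
import java.io.IOException;
import java.util.jar.JarFile;

/**
 * Hypothetical pre-upload check: does the JAR contain the main class?
 * ASSUMPTION: default path target/flink-log-connector-0.1.21-SNAPSHOT.jar.
 */
public class JarCheckSketch {
    /** Maps a fully qualified class name to its entry name inside a JAR. */
    static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns true if the JAR at jarPath contains an entry for the given class. */
    static boolean containsClass(String jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        String jarPath = args.length > 0 ? args[0]
                : "target/flink-log-connector-0.1.21-SNAPSHOT.jar";
        String mainClass = "com.aliyun.openservices.log.flink.ConsumerSample";
        System.out.println(mainClass + " found: " + containsClass(jarPath, mainClass));
    }
}
```

You could compile and run it from the project directory with `javac JarCheckSketch.java` followed by `java JarCheckSketch target/flink-log-connector-0.1.21-SNAPSHOT.jar`.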
Publish a job
Configure the following job parameters:
-- Required. The full name of the main class.
blink.main.class=com.aliyun.openservices.log.flink.ConsumerSample
-- The name of the job.
blink.job.name=sls
-- The resource name of the JAR package that contains the main class. This parameter is required if multiple JAR packages exist.
blink.main.jar=flink-log-connector-0.1.21-SNAPSHOT.jar
-- The default state backend configuration. It takes effect only if the state backend is not explicitly configured in the job code.
-- The TTL below is 129600000 ms, which is 36 hours.
state.backend.type=niagara
state.backend.niagara.ttl.ms=129600000
-- The default checkpoint configuration. It takes effect only if checkpointing is not explicitly configured in the job code.
-- The interval below is 180000 ms, which is 3 minutes.
blink.checkpoint.interval.ms=180000
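The job parameters for this example are plain key=value lines with comments that start with "--". The following hypothetical sketch parses text in that form, checks for the required blink.main.class key, and converts the millisecond values into readable durations. It mirrors only the format shown in this topic and is not the parser that the platform itself uses.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical parser for job parameters written as key=value lines.
 * ASSUMPTION: lines starting with "--" are comments, as in this topic's example.
 */
public class JobParamsSketch {
    static Map<String, String> parse(String text) {
        Map<String, String> params = new LinkedHashMap<>();
        for (String raw : text.split("\\R")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("--")) {
                continue; // skip blank lines and comments
            }
            int eq = line.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected key=value: " + line);
            }
            params.put(line.substring(0, eq).trim(), line.substring(eq + 1).trim());
        }
        if (!params.containsKey("blink.main.class")) {
            throw new IllegalArgumentException("blink.main.class is required");
        }
        return params;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "-- Required. The full name of the main class.",
                "blink.main.class=com.aliyun.openservices.log.flink.ConsumerSample",
                "blink.job.name=sls",
                "blink.main.jar=flink-log-connector-0.1.21-SNAPSHOT.jar",
                "state.backend.type=niagara",
                "state.backend.niagara.ttl.ms=129600000",
                "blink.checkpoint.interval.ms=180000");
        Map<String, String> p = parse(text);
        Duration ttl = Duration.ofMillis(Long.parseLong(p.get("state.backend.niagara.ttl.ms")));
        Duration interval = Duration.ofMillis(Long.parseLong(p.get("blink.checkpoint.interval.ms")));
        System.out.println("State TTL: " + ttl.toHours() + " hours");                   // State TTL: 36 hours
        System.out.println("Checkpoint interval: " + interval.toMinutes() + " minutes"); // Checkpoint interval: 3 minutes
    }
}
```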
Verify the test results
On the Container Log tab of the Job Administration page, view the taskmanager.out file of the sink node. In this example, the sink node is of the print type, so the data read from the Logstore is written to the taskmanager.out file.
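If you copy the contents of taskmanager.out to a local file, a small JDK-only sketch like the one below can give a quick count of printed records. It assumes that the print sink behaves like the one in open source Flink: each record is written on its own line, prefixed with "N> " (the 1-based subtask number) when the sink parallelism is greater than 1. Verify this against your own output. The class name and the default file name are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical checker for print-sink output saved from taskmanager.out.
 * ASSUMPTION: records are prefixed with "N> " when sink parallelism > 1.
 */
public class PrintOutputSketch {
    private static final Pattern PREFIX = Pattern.compile("^(\\d+)> (.*)$");

    /** Counts non-empty lines per subtask; lines without a prefix are counted under key 0. */
    static Map<Integer, Integer> countPerSubtask(Iterable<String> lines) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            if (line.isEmpty()) {
                continue;
            }
            Matcher m = PREFIX.matcher(line);
            int subtask = m.matches() ? Integer.parseInt(m.group(1)) : 0;
            counts.merge(subtask, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        List<String> lines = Files.readAllLines(Paths.get(args.length > 0 ? args[0] : "taskmanager.out"));
        System.out.println(countPerSubtask(lines));
    }
}
```

Lines counted under key 0 may be records from a sink with parallelism 1 or unrelated standard output, so treat that bucket with care.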