Hologres is highly compatible with fully managed Flink. In most cases, you can use Flink SQL to declare Hologres source tables, dimension tables, and result tables to express the data processing logic. In special business scenarios, if Flink SQL cannot meet your business requirements, you can use DataStream connectors to read and write data. This topic describes how to develop and debug a DataStream draft based on the Hologres connector. In this example, vvr-8.0.8-flink-1.17 is used.
Prerequisites
A Hologres instance is created, and a database is created for the Hologres instance. For more information, see Create a database.
A code development platform, such as IntelliJ IDEA, is installed for on-premises code debugging.
Step 1: Download the JAR packages on which the connector depends
When you use a DataStream connector to read data from and write data to Hologres, you must download the Hologres connector to connect to fully managed Flink. For more information about the versions of the Hologres connector, see Hologres DataStream connectors.
You must download the following JAR packages:
ververica-connector-hologres-1.17-vvr-8.0.8.jar: This JAR package is used for local debugging.
ververica-connector-hologres-1.17-vvr-8.0.8-uber.jar: This JAR package is used for local debugging and online deployment.
Note: For Realtime Compute for Apache Flink that uses vvr-6.0-flink-1.15 or later, you must use the uber JAR file of the matching connector version when you debug a deployment that uses a connector in an on-premises environment. For more information, see Run or debug a Flink deployment that includes a connector in an on-premises environment.
After you download the preceding JAR packages, run the following command to install the JAR package ververica-connector-hologres-1.17-vvr-8.0.8.jar in your local Maven repository:
mvn install:install-file -Dfile=$path/ververica-connector-hologres-1.17-vvr-8.0.8.jar -DgroupId=com.alibaba.ververica -DartifactId=ververica-connector-hologres -Dversion=1.17-vvr-8.0.8 -Dpackaging=jar
$path is the absolute path of the directory in which the JAR package ververica-connector-hologres-1.17-vvr-8.0.8.jar is stored.
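To confirm that the install command succeeded, you can check whether the JAR file exists at the location that Maven's standard repository layout derives from the coordinates in the command. The following is a minimal sketch, not part of the connector. The class name LocalRepoCheck is hypothetical, and the sketch assumes that the local repository is at the Maven default location, ~/.m2/repository, which a custom settings.xml file can override.

```java
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper: locates an installed artifact in a Maven local repository. */
final class LocalRepoCheck {

    /** Builds repoRoot/groupId-as-directories/artifactId/version/artifactId-version.jar. */
    static Path artifactPath(Path repoRoot, String groupId, String artifactId, String version) {
        return repoRoot
                .resolve(groupId.replace('.', File.separatorChar))
                .resolve(artifactId)
                .resolve(version)
                .resolve(artifactId + "-" + version + ".jar");
    }

    public static void main(String[] args) {
        // Assumption: the local repository is at the Maven default location.
        Path repoRoot = Paths.get(System.getProperty("user.home"), ".m2", "repository");
        // Coordinates copied from the mvn install:install-file command above.
        Path jar = artifactPath(repoRoot, "com.alibaba.ververica",
                "ververica-connector-hologres", "1.17-vvr-8.0.8");
        System.out.println(jar + (Files.exists(jar) ? " (installed)" : " (not found)"));
    }
}
```

If the output ends with "(not found)", check the value of $path and whether your Maven settings point to a different local repository.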
Step 2: Develop a draft and debug the draft in an on-premises environment
Before you can create and run a deployment in the development console of Realtime Compute for Apache Flink, you must develop a draft in an on-premises environment. The following sample code shows how to build an implementation class that reads data from a Hologres binary log source table.
Write code in an on-premises environment.
Sample code for using DataStream API to read data from a Hologres binary log source table
import com.alibaba.ververica.connectors.hologres.binlog.HologresBinlogConfigs;
import com.alibaba.ververica.connectors.hologres.binlog.StartupMode;
import com.alibaba.ververica.connectors.hologres.binlog.source.HologresBinlogSource;
import com.alibaba.ververica.connectors.hologres.config.HologresConfigs;
import com.alibaba.ververica.connectors.hologres.config.HologresConnectionParam;
import com.alibaba.ververica.connectors.hologres.config.JDBCOptions;
import com.alibaba.ververica.connectors.hologres.utils.JDBCUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

import java.util.Collections;

public class HologresBinlogSourceDemo {

    public static void main(String[] args) throws Exception {
        Configuration envConf = new Configuration();
        // Specify the absolute path of the uber JAR file when you debug the draft in an on-premises environment.
        // Comment out this line when you package and upload the JAR file of the draft.
        envConf.setString("pipeline.classpaths", "file://" + "<path_to_uber_jar>");
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(envConf);

        // Initialize the schema of the source table that you want to read.
        // Make sure that the fields in the table schema match the fields in the Hologres table.
        // You can define only the fields that you need.
        TableSchema schema = TableSchema.builder()
                .field("<id>", DataTypes.INT().notNull())
                .primaryKey("<id>")
                .build();

        // Configure Hologres-related parameters.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "<yourEndpoint>");
        config.setString(HologresConfigs.USERNAME, "<yourUserName>");
        config.setString(HologresConfigs.PASSWORD, "<yourPassword>");
        config.setString(HologresConfigs.DATABASE, "<yourDatabaseName>");
        config.setString(HologresConfigs.TABLE, "<yourTableName>");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);

        // Build Java Database Connectivity (JDBC) options.
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);

        // Build HologresBinlogSource to read data from the Hologres binlog source table.
        long startTimeMs = 0;
        HologresBinlogSource source = new HologresBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.INITIAL,
                "",
                "",
                -1,
                Collections.emptySet());

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}
The following table describes the parameters.
| Parameter | Description |
| --- | --- |
| path_to_uber_jar | The absolute path of the local uber JAR file. On Windows, include the drive letter in the path, such as file:///D:/path/to/a-uber.jar. |
| id | The name of a field in the schema of the source table that you want to read. Make sure that the fields in the table schema match the fields in the Hologres table. You can define only the fields that you need. |
| yourEndpoint | The endpoint of the Hologres instance. You can view the endpoint in the Network Information section of the Instance Details page of the instance in the Hologres console. |
| yourUserName | The AccessKey ID of your Alibaba Cloud account. You can obtain the AccessKey ID from the AccessKey Pair page. |
| yourPassword | The AccessKey secret of your Alibaba Cloud account. |
| yourDatabaseName | The name of the Hologres database. |
| yourTableName | The name of the Hologres table from which you want to read data. |
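The sample code places the connection values from the preceding table directly in the source. One way to keep the AccessKey pair out of the code is to read the values from environment variables and fail fast when one is missing. The following is a minimal sketch under that assumption. The class name HologresEnvSettings and the HOLO_* variable names are hypothetical and are not defined by the connector.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: collects the Hologres connection values from environment variables. */
final class HologresEnvSettings {

    // Hypothetical variable names, one for each connection parameter in the table.
    static final String[] REQUIRED = {
        "HOLO_ENDPOINT", "HOLO_USERNAME", "HOLO_PASSWORD", "HOLO_DATABASE", "HOLO_TABLE"
    };

    /** Returns the trimmed values and fails fast if a variable is missing or blank. */
    static Map<String, String> load(Map<String, String> env) {
        Map<String, String> settings = new LinkedHashMap<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalStateException("Missing environment variable: " + name);
            }
            settings.put(name, value.trim());
        }
        return settings;
    }

    public static void main(String[] args) {
        Map<String, String> settings = load(System.getenv());
        // In the sample code, the values would then replace the placeholders, for example:
        // config.setString(HologresConfigs.ENDPOINT, settings.get("HOLO_ENDPOINT"));
        System.out.println("Loaded settings: " + settings.keySet());
    }
}
```

Only the variable names are printed, so the AccessKey secret does not appear in the console output.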
pom.xml file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.alibaba.hologres</groupId>
    <artifactId>hologres-flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.2</flink.version>
        <vvr.version>1.17-vvr-8.0.8</vvr.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>1.7.21</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-hologres</artifactId>
            <version>${vvr.version}</version>
        </dependency>
        <!-- Logging dependencies: the SLF4J API and its Log4j binding. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <shadedClassifierName>jar-with-dependencies</shadedClassifierName>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Debug and run the draft in an on-premises environment.
You must add the ClassLoader JAR package, such as ververica-classloader-1.15-vvr-6.0-SNAPSHOT.jar, to the draft configuration. For more information, see Step 2: Configure the ClassLoader JAR package that is required to run the deployment.
Optional. If an error message indicates that common Flink classes such as org.apache.flink.configuration.Configuration are missing, select Add dependencies with provided scope to classpath for the Modify options parameter.
After the configuration is complete, you can debug and run the draft in the on-premises environment.
For more information about how to debug a deployment that contains a connector, see Run or debug a Flink deployment that includes a connector in an on-premises environment.
Step 3: Package the program of the draft and run the deployment for the draft
After local debugging is successful, you can package the program of the draft into a JAR file and upload the JAR file to the development console of Realtime Compute for Apache Flink together with the uber JAR file.
Before you package the program of the draft, comment out the following code:
envConf.setString("pipeline.classpaths", "file://" + "<path_to_uber_jar>");
Compile and package the program of the draft.
Use Maven to compile and package the program of the draft and the dependencies. Sample command:
mvn clean package -DskipTests
After the packaging operation is complete, a JAR file named hologres-flink-demo-1.0-SNAPSHOT-jar-with-dependencies.jar is generated on your on-premises machine.
Upload the JAR file.
On the Artifacts page of the development console of Realtime Compute for Apache Flink, upload the generated JAR file and the ververica-connector-hologres-1.17-vvr-8.0.8-uber.jar file. For more information, see Step 2: Upload the test JAR package and data file.
Create a JAR deployment.
Create a JAR deployment on the Deployments page in the development console of Realtime Compute for Apache Flink. For more information about how to create a JAR deployment and configure the parameters, see Step 3: Create a JAR deployment.
Start the deployment and view the computing result.
Note: If you update the JAR file, you must upload the JAR file, create a JAR deployment, and start the deployment again.
On the Deployments page in the development console of Realtime Compute for Apache Flink, find the desired deployment and click Start in the Actions column.
In the Start Job dialog box, configure the parameters.
For more information about the parameters that you must configure when you start your deployment, see Start a deployment.
Click Start.
After you click Start, the deployment status changes to RUNNING. This indicates that the deployment is running as expected.
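Step 3 requires that you comment out the pipeline.classpaths line before you package the draft. As an alternative to editing the source for each build, the path of the uber JAR file can be supplied from outside the code, so that the option is set only during local debugging. The following is a minimal sketch under that assumption. The class name UberJarClasspath and the system property name hologres.uber.jar are hypothetical.

```java
import java.nio.file.Paths;
import java.util.Optional;

/** Hypothetical helper: turns an optional local path into a file URL for pipeline.classpaths. */
final class UberJarClasspath {

    // Hypothetical system property that is set only for local debugging.
    static final String PROPERTY = "hologres.uber.jar";

    /** Returns a file URL for the path, or an empty Optional if no path is given. */
    static Optional<String> classpathUrl(String localPath) {
        if (localPath == null || localPath.trim().isEmpty()) {
            return Optional.empty();
        }
        // Path.toUri() produces the file:/// form, including the drive letter on Windows.
        return Optional.of(Paths.get(localPath.trim()).toAbsolutePath().toUri().toString());
    }

    public static void main(String[] args) {
        Optional<String> url = classpathUrl(System.getProperty(PROPERTY));
        // In the sample code, this would replace the hard-coded line:
        // url.ifPresent(u -> envConf.setString("pipeline.classpaths", u));
        System.out.println(url.map(u -> "pipeline.classpaths = " + u)
                .orElse("pipeline.classpaths is not set"));
    }
}
```

With this approach, you would pass -Dhologres.uber.jar with the path of the uber JAR file as a VM option in the IDE run configuration and omit the option for the packaged deployment.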
FAQ
Issue 1: When I run or debug a deployment that contains a connector of Alibaba Cloud Realtime Compute for Apache Flink in IntelliJ IDEA, a connector-related class cannot be found. For example, the following error occurs: Caused by: java.lang.ClassNotFoundException: com.alibaba.ververica.connectors.hologres.binlog.source.reader.HologresBinlogRecordEmitter. What do I do?
Possible cause: The uber JAR file is not properly used when you debug the deployment in an on-premises environment.
Solution: Use the uber JAR file correctly for local debugging. For more information, see this topic or Run or debug a Flink deployment that includes a connector in an on-premises environment.
Issue 2: An error message indicates that common Flink classes are missing. For example, the message Caused by: java.lang.ClassNotFoundException: org.apache.flink.configuration.Configuration appears. What do I do?
Possible cause: A dependency is missing or is not loaded as expected.
Solution:
If the dependency is not declared in the pom.xml file, identify the missing dependency and add it to the pom.xml file. In most cases, flink-connector-base is missing. You can also search for the package path of the class named in the exception to identify which Flink dependency is missing.
If a dependency with the provided scope is not loaded as expected, select Add dependencies with provided scope to classpath for the Modify options parameter in IntelliJ IDEA.
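To find out which dependency is missing before the draft fails at run time, you can probe the classpath for one representative class from each Flink artifact. The following is a minimal sketch. The class name ClasspathProbe is hypothetical, and the probed classes are examples that were chosen for this sketch.

```java
/** Hypothetical helper: reports whether representative classes can be loaded. */
final class ClasspathProbe {

    /** Returns true if the class can be found by the class loader of this helper. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: locate the class without running its static initializers.
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] probes = {
            // Provided by flink-core, which flink-streaming-java pulls in transitively.
            "org.apache.flink.configuration.Configuration",
            // Provided by flink-streaming-java.
            "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment",
            // Provided by flink-connector-base.
            "org.apache.flink.connector.base.source.reader.SourceReaderBase"
        };
        for (String name : probes) {
            System.out.println((isOnClasspath(name) ? "found    " : "MISSING  ") + name);
        }
    }
}
```

Run the probe with the same run configuration as the draft. If all classes are reported as missing, the provided-scope dependencies are probably not on the classpath, which points to the second solution.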
Issue 3: What do I do if the error message Incompatible magic value appears when I run a deployment?
Possible causes:
Cause 1: The version of the uber JAR file may be different from the version of the connector.
Cause 2: The configuration of the ClassLoader JAR package is invalid.
Solutions:
Solution for Cause 1: Use a connector JAR file and an uber JAR file of the same version. For more information about how to select the version, see this topic.
Solution for Cause 2: Reconfigure the ClassLoader JAR package. For more information, see Step 2: Configure the ClassLoader JAR package that is required to run the deployment.
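For Cause 1, a quick consistency check is to compare the version strings in the file names of the two JAR files. The following is a minimal sketch. The class name ConnectorVersionCheck is hypothetical, and the sketch assumes that the files keep the download names used in this topic, that is, ververica-connector-hologres-<version>.jar and ververica-connector-hologres-<version>-uber.jar.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: compares the versions embedded in connector JAR file names. */
final class ConnectorVersionCheck {

    // Assumes the naming scheme ververica-connector-hologres-<version>[-uber].jar.
    private static final Pattern JAR_NAME =
            Pattern.compile("ververica-connector-hologres-(.+?)(-uber)?\\.jar");

    /** Extracts the version part of the file name, if the name follows the scheme. */
    static Optional<String> versionOf(String fileName) {
        Matcher matcher = JAR_NAME.matcher(fileName);
        return matcher.matches() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    /** Returns true only if both names follow the scheme and carry the same version. */
    static boolean sameVersion(String connectorJar, String uberJar) {
        Optional<String> connectorVersion = versionOf(connectorJar);
        return connectorVersion.isPresent() && connectorVersion.equals(versionOf(uberJar));
    }

    public static void main(String[] args) {
        String connectorJar = "ververica-connector-hologres-1.17-vvr-8.0.8.jar";
        String uberJar = "ververica-connector-hologres-1.17-vvr-8.0.8-uber.jar";
        System.out.println("Versions match: " + sameVersion(connectorJar, uberJar));
    }
}
```

The check covers only the file names. It does not inspect the contents of the JAR files, so a renamed file can still cause the error.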
Issue 4: What do I do if the error message Unable to load flink-decryption library java.io.FileNotFoundException: Decryption library native/windows/x86/flink-decryption.dll not found appears when I run a deployment?
Possible cause: The uber JAR file does not support 32-bit Java in Windows.
Solution: Install 64-bit Java. You can run the java -version command to view the Java installation information. If the installation information does not contain 64-Bit, 32-bit Java is installed.
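The java -version command reports on the Java installation found on the command line, which can differ from the JDK that IntelliJ IDEA uses to run the draft. To check the JVM that actually runs the code, you can read its system properties. The following is a minimal sketch. The class name JvmBitness is hypothetical. The sun.arch.data.model property is specific to HotSpot-based JVMs such as OpenJDK and Oracle JDK, so the sketch falls back to the standard os.arch property when it is absent.

```java
/** Hypothetical helper: reports whether the running JVM is a 64-bit JVM. */
final class JvmBitness {

    /**
     * dataModel is the value of sun.arch.data.model ("32" or "64" on HotSpot-based JVMs).
     * osArch is the value of os.arch, for example amd64, aarch64, or x86.
     */
    static boolean is64Bit(String dataModel, String osArch) {
        if (dataModel != null) {
            String model = dataModel.trim();
            if (model.equals("64")) {
                return true;
            }
            if (model.equals("32")) {
                return false;
            }
        }
        // Fallback heuristic: the names of 64-bit architectures contain "64".
        return osArch != null && osArch.contains("64");
    }

    public static void main(String[] args) {
        String dataModel = System.getProperty("sun.arch.data.model");
        String osArch = System.getProperty("os.arch");
        System.out.println("sun.arch.data.model = " + dataModel + ", os.arch = " + osArch);
        System.out.println(is64Bit(dataModel, osArch) ? "64-bit JVM" : "32-bit JVM");
    }
}
```

Run the class with the same run configuration as the draft so that it reports the JDK that is configured in IntelliJ IDEA.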
Issue 5: What do I do if the error message Caused by: java.lang.ClassFormatError appears when I run a deployment?
Possible cause: The Java Development Kit (JDK) version configured for IntelliJ IDEA is invalid.
Solution: Use JDK 8 or JDK 11.
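To confirm which JDK the draft runs on, you can read the standard java.specification.version system property, which is 1.8 for JDK 8 and 11 for JDK 11. The following is a minimal sketch. The class name JdkVersionCheck is hypothetical.

```java
/** Hypothetical helper: checks whether the running JDK is JDK 8 or JDK 11. */
final class JdkVersionCheck {

    /** Maps a specification version such as "1.8", "11", or "17" to its major version. */
    static int majorVersion(String specVersion) {
        String version = specVersion.trim();
        if (version.startsWith("1.")) {
            // Releases up to JDK 8 use the 1.x numbering scheme.
            version = version.substring(2);
        }
        int dot = version.indexOf('.');
        return Integer.parseInt(dot >= 0 ? version.substring(0, dot) : version);
    }

    /** Returns true for the JDK versions named in the solution above. */
    static boolean isSupported(String specVersion) {
        int major = majorVersion(specVersion);
        return major == 8 || major == 11;
    }

    public static void main(String[] args) {
        String specVersion = System.getProperty("java.specification.version");
        System.out.println("java.specification.version = " + specVersion
                + (isSupported(specVersion) ? " (JDK 8 or JDK 11)" : " (use JDK 8 or JDK 11)"));
    }
}
```

Run the class from IntelliJ IDEA with the project SDK that the draft uses, because that SDK can differ from the JDK on the command line.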