Hologres is highly compatible with fully managed Flink. In most cases, you can use Flink SQL to declare Hologres source tables, dimension tables, and sink tables, and then use SQL to define the data processing logic. However, in special business scenarios where Flink SQL cannot meet your computing needs, you must use DataStream to read and write data. This topic uses Ververica Runtime (VVR) 8.0.8 for Flink 1.17 as an example to demonstrate how to debug and develop a DataStream job that uses the Hologres connector.
Prerequisites
-
You have purchased a Hologres instance and created a database. For more information, see Create a database.
-
You have installed a code development platform, such as IntelliJ IDEA, for local code debugging.
Step 1: Download connector dependencies
To read and write Hologres data using DataStream, download the Hologres connector for fully managed Flink. For a list of released connector versions, see Hologres DataStream Connector.
-
Download the following two dependency JAR files:
-
ververica-connector-hologres-1.17-vvr-8.0.8.jar: For local debugging.
-
ververica-connector-hologres-1.17-vvr-8.0.8-uber.jar: For local debugging and online deployment.
Note: Starting from VVR 6.0 for Flink 1.15, you must use the corresponding uber JAR when you debug a commercial connector locally. For more information, see Run and debug jobs that contain connectors locally.
-
-
After the download is complete, run the following command to install the ververica-connector-hologres-1.17-vvr-8.0.8.jar file to your local Maven repository:
mvn install:install-file \
  -Dfile=$path/ververica-connector-hologres-1.17-vvr-8.0.8.jar \
  -DgroupId=com.alibaba.ververica \
  -DartifactId=ververica-connector-hologres \
  -Dversion=1.17-vvr-8.0.8 \
  -Dpackaging=jar
In this command, $path is the absolute path of the local directory that contains the ververica-connector-hologres-1.17-vvr-8.0.8.jar file.
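Maven places the installed artifact in the standard local-repository layout, so you can verify the install by checking for the JAR under ~/.m2/repository. The following is a minimal sketch; the `repo_path` helper is hypothetical, and the default local-repository location is assumed:

```shell
# Hypothetical helper: compute the path (relative to the local Maven repository)
# where install:install-file places an artifact. Dots in the groupId become
# directories, followed by <artifactId>/<version>/<artifactId>-<version>.jar.
repo_path() {
  local group="$1" artifact="$2" version="$3"
  echo "${group//.//}/$artifact/$version/$artifact-$version.jar"
}

# Where the connector JAR should land, assuming the default local repository:
echo "$HOME/.m2/repository/$(repo_path com.alibaba.ververica ververica-connector-hologres 1.17-vvr-8.0.8)"
```

If no file exists at that location, re-run the install command and check its output for errors.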
Step 2: Develop and debug locally
You must develop and debug your project locally before you can deploy and run it on the fully managed Flink console. The following example shows the project code and the pom.xml file for a job that reads a Hologres binlog source table:
-
Write the code locally:
-
DataStream API demo code:
import com.alibaba.ververica.connectors.hologres.binlog.HologresBinlogConfigs;
import com.alibaba.ververica.connectors.hologres.binlog.StartupMode;
import com.alibaba.ververica.connectors.hologres.binlog.source.HologresBinlogSource;
import com.alibaba.ververica.connectors.hologres.config.HologresConfigs;
import com.alibaba.ververica.connectors.hologres.config.HologresConnectionParam;
import com.alibaba.ververica.connectors.hologres.config.JDBCOptions;
import com.alibaba.ververica.connectors.hologres.utils.JDBCUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

import java.util.Collections;

public class HologresBinlogSourceDemo {
    public static void main(String[] args) throws Exception {
        Configuration envConf = new Configuration();
        // When debugging locally, specify the absolute path of the uber JAR.
        // Comment this line out before packaging for upload.
        envConf.setString("pipeline.classpaths", "file://" + "<path_to_uber_jar>");
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(envConf);

        // Initialize the schema of the table to read. The schema must match the
        // fields of the Hologres table; you can declare only a subset of them.
        TableSchema schema = TableSchema.builder()
                .field("<id>", DataTypes.INT().notNull())
                .primaryKey("<id>")
                .build();

        // Hologres connection parameters.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "<yourEndpoint>");
        config.setString(HologresConfigs.USERNAME, "<yourUserName>");
        config.setString(HologresConfigs.PASSWORD, "<yourPassword>");
        config.setString(HologresConfigs.DATABASE, "<yourDatabaseName>");
        config.setString(HologresConfigs.TABLE, "<yourTableName>");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);

        // Build JDBCOptions.
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);

        // Build the HologresBinlogSource.
        long startTimeMs = 0;
        HologresBinlogSource source =
                new HologresBinlogSource(
                        new HologresConnectionParam(config),
                        schema,
                        config,
                        jdbcOptions,
                        startTimeMs,
                        StartupMode.INITIAL,
                        "",
                        "",
                        -1,
                        Collections.emptySet());

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}
Parameter descriptions:
- path_to_uber_jar: The absolute path of the local uber JAR. On Windows, you must include the drive letter, for example, file:///D:/path/to/a-uber.jar.
- id: A field of the table to be read. The schema must match the fields of the Hologres table. You can define only a subset of the fields.
- yourEndpoint: The network domain name of the Hologres instance. Go to the instance details page in the Hologres console and obtain the domain name from the Network Information section.
- yourUserName: The AccessKey ID of your Alibaba Cloud account. Go to the AccessKey Management page to obtain the AccessKey ID.
- yourPassword: The AccessKey secret that corresponds to your Alibaba Cloud account.
- yourDatabaseName: The name of the Hologres database.
- yourTableName: The name of the Hologres table to read.
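The path_to_uber_jar value must be a valid file:// URL, and the Windows form differs from the Linux/macOS form as noted above. The following is a minimal sketch of the conversion; the `to_url` helper is hypothetical and only illustrates the two URL shapes:

```shell
# Hypothetical helper: turn a local JAR path into the file:// URL form used for
# pipeline.classpaths. Windows paths keep their drive letter after file:///.
to_url() {
  local p="${1//\\//}"                 # normalize backslashes to forward slashes
  case "$p" in
    [A-Za-z]:/*) echo "file:///$p" ;;  # Windows path with a drive letter
    *)           echo "file://$p" ;;   # absolute Linux/macOS path
  esac
}

to_url 'D:\path\to\a-uber.jar'    # → file:///D:/path/to/a-uber.jar
to_url '/home/user/a-uber.jar'    # → file:///home/user/a-uber.jar
```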
-
pom.xml file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.alibaba.hologres</groupId>
    <artifactId>hologres-flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.2</flink.version>
        <vvr.version>1.17-vvr-8.0.8</vvr.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>1.7.21</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-hologres</artifactId>
            <version>${vvr.version}</version>
        </dependency>
        <!-- SLF4J logging API and the Log4j 1.2 binding -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <shadedClassifierName>jar-with-dependencies</shadedClassifierName>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
-
-
Debug and run locally.
-
Configure the required ClassLoader JAR file, which is ververica-classloader-1.15-vvr-6.0-SNAPSHOT.jar. For more information, see Step 2: Configure the required ClassLoader JAR file.
-
(Optional) If an error occurs because common Flink classes such as org.apache.flink.configuration.Configuration are missing, select Add dependencies with "provided" scope to classpath in "Modify options".
After you complete the configuration, debug and run the project locally to ensure it runs successfully.
-
For more information about local debugging, see Run and debug jobs that contain connectors locally.
Step 3: Package and run
After you successfully debug the project locally, package the project and upload it to the Flink console along with the uber JAR.
-
Before packaging, comment out the following code:
envConf.setString("pipeline.classpaths", "file://" + "<path_to_uber_jar>");
-
Compile and package.
Use Maven to compile and package the application and its dependencies. The command is as follows:
mvn clean package -DskipTests
After the packaging is complete, a file named hologres-flink-demo-1.0-SNAPSHOT-jar-with-dependencies.jar is generated in the target directory of your project.
-
Upload the JAR files.
On the Resource Management page of the Flink console, upload the packaged application JAR file and the ververica-connector-hologres-1.17-vvr-8.0.8-uber.jar file. For more information, see Step 2: Upload the test JAR file and data file.
-
Deploy the JAR job.
On the Job O&M page of the Flink console, you can deploy the JAR job. For more information and parameter details, see Step 3: Deploy the JAR job.
-
Start the job and view the Flink computing results.
Note: If you update the JAR file, you must re-upload it, deploy the job again, and then start the job.
-
On the Job O&M page of the Flink console, find the target job and click Start in the Actions column.
-
Configure resource information and basic settings.
For more information about job startup parameters, see Start a job.
-
Click Start.
After you click Start, the job's Status changes to Running, which indicates that the job is running correctly.
-
FAQ
-
Question 1: When you run and debug a Flink job in IntelliJ IDEA that includes a dependency on a commercial connector from Realtime Compute for Apache Flink, you may encounter a runtime error indicating that a connector-related class cannot be found. For example:
Caused by: java.lang.ClassNotFoundException: com.alibaba.ververica.connectors.hologres.binlog.source.reader.HologresBinlogRecordEmitter
-
Cause: This type of exception usually occurs because the uber JAR was not used correctly during local debugging.
-
Solution: See this topic or Run and debug jobs that contain connectors locally for instructions on how to correctly use the uber JAR for debugging.
-
-
Question 2: An error message indicates that the program cannot be executed because some common Flink classes are missing. For example:
Caused by: java.lang.ClassNotFoundException: org.apache.flink.configuration.Configuration
-
Cause: This may be because a dependency is missing or was not loaded correctly.
-
Solution:
-
The required dependency, which is often flink-connector-base, might not be declared in the pom.xml file. You can search for the package path in the exception message to identify which Flink dependency provides the missing class, and then add it.
-
The `provided` dependency might not have been loaded at runtime. In IntelliJ IDEA, go to "Modify options" and check Add dependencies with provided scope to classpath.
-
-
-
Question 3: An Incompatible magic value error is reported at runtime.
-
Cause:
-
Cause 1: The version of the uber JAR may be inconsistent with the connector version.
-
Cause 2: The ClassLoader settings may be incorrect.
-
-
Solution:
-
For Cause 1: See this topic to select the correct versions of the connector and the uber JAR.
-
For Cause 2: See Configure the required ClassLoader JAR file to reconfigure the settings.
-
-
-
Question 4: An Unable to load flink-decryption library java.io.FileNotFoundException: Decryption library native/windows/x86/flink-decryption.dll not found exception is thrown at runtime.
-
Cause: The uber JAR does not currently support 32-bit Java on Windows systems.
-
Solution: Install 64-bit Java. You can run the java -version command to check your Java installation. If the output does not contain 64-Bit, you are using 32-bit Java.
-
-
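The 64-bit check described above can be scripted. The following is a minimal sketch; the `is_64bit` helper and the sample banner text are illustrative, not real output from your machine:

```shell
# Hypothetical helper: report whether a `java -version` banner is from a 64-bit JVM.
# Note that `java -version` prints to stderr, so capture it with 2>&1.
is_64bit() { echo "$1" | grep -q '64-Bit'; }

# Sample banner line from a 64-bit HotSpot JVM (illustrative only):
banner='Java HotSpot(TM) 64-Bit Server VM (build 25.392-b08, mixed mode)'
if is_64bit "$banner"; then
  echo "64-bit Java detected"
else
  echo "32-bit Java detected: install a 64-bit JDK"
fi
```

In practice, capture the real banner with banner="$(java -version 2>&1)" before calling the helper.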
Question 5: A Caused by: java.lang.ClassFormatError exception is thrown at runtime.
-
Cause: This may be caused by the JDK version configured in IntelliJ IDEA.
-
Solution: Use a recent update of JDK 8, or use JDK 11.
-