You can use the Hologres DataStream connector to read data from or write data to Hologres in DataStream API programs. This topic describes how to use the Hologres DataStream connector in the console of fully managed Flink.
Background information
The connection configuration of the Hologres DataStream connector is the same as the connection configuration of the related Hologres SQL connector. For more information about the connection configuration of the Hologres SQL connector, see the "Parameters in the WITH clause" section in Create a Hologres source table, Create a Hologres result table, or Create a Hologres dimension table.
The Ververica Runtime (VVR) connectors are stored in the Maven central repository for you to use when you develop a draft. You can use the Hologres DataStream connector in one of the following ways:
(Recommended) Package the connector as a project dependency into the JAR file of your draft
Add the following configurations to the POM file of the Maven project to reference SNAPSHOT repositories:
```xml
<repositories>
    <repository>
        <id>oss.sonatype.org-snapshot</id>
        <name>OSS Sonatype Snapshot Repository</name>
        <url>http://oss.sonatype.org/content/repositories/snapshots</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>
```
Check whether your settings.xml configuration file contains a `<mirrorOf>*</mirrorOf>` configuration. If it does, the mirror matches all repositories, so Maven routes every download request through the mirror and never contacts the two SNAPSHOT repositories that you configured. As a result, the Maven project cannot download SNAPSHOT dependencies from these repositories. To avoid this issue, modify the configuration based on your scenario:
- If the configuration file contains `<mirrorOf>*</mirrorOf>`, change it to `<mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>`.
- If the configuration file contains `<mirrorOf>external:*</mirrorOf>`, change it to `<mirrorOf>external:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>`.
- If the configuration file contains `<mirrorOf>external:http:*</mirrorOf>`, change it to `<mirrorOf>external:http:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>`.
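As a concrete illustration, a mirror entry with the excluded SNAPSHOT repositories might look like the following sketch. The mirror id, name, and URL below are hypothetical placeholders; only the `<mirrorOf>` value reflects the change described above.

```xml
<!-- Hypothetical example of a settings.xml mirror entry. The id, name, and URL
     are placeholders; the mirrorOf value excludes the two SNAPSHOT repositories
     so that Maven resolves them directly instead of through the mirror. -->
<mirrors>
    <mirror>
        <id>corp-mirror</id>
        <name>Corporate Maven Mirror</name>
        <url>https://mirror.example.com/maven2</url>
        <mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>
    </mirror>
</mirrors>
```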
Add the connector that you want to use to the Maven POM file as a project dependency.
```xml
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-hologres</artifactId>
    <version>${connector.version}</version>
</dependency>
```
Different connector versions may correspond to different connector types. We recommend that you use the latest version for the type of the connector that you use. For more information about the dependencies, see the POM file in the Hologres sample code.
Important: You must search for the connector versions that contain the SNAPSHOT keyword in the SNAPSHOT repository oss.sonatype.org. These versions are not available in the Maven central repository search.maven.org.
If you use multiple connectors, you must merge the files in the META-INF directory. To merge the files, add the following code to the POM file:
```xml
<transformers>
    <!-- The ServicesResourceTransformer is needed to merge META-INF/services files. -->
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
        <projectName>Apache Flink</projectName>
        <encoding>UTF-8</encoding>
    </transformer>
</transformers>
```
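For context, the `<transformers>` element belongs inside the maven-shade-plugin configuration of your POM file. The following is a minimal sketch of that placement; the plugin version shown is illustrative, so use the version that your project standardizes on.

```xml
<!-- Sketch of where the transformers go in the maven-shade-plugin configuration.
     The version number is illustrative, not a recommendation. -->
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <!-- Merges META-INF/services files from all connector JARs. -->
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```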
Modify the Hologres connection configuration and table schema information.
| Item | Description |
| --- | --- |
| Hologres connection configuration | Replace the connection configuration with the connection configuration of your Hologres instance. Make sure that the connection configuration of the Hologres DataStream connector is the same as the connection configuration of the related Hologres SQL connector. For more information, see the "Parameters in the WITH clause" section in Create a Hologres source table, Create a Hologres result table, or Create a Hologres dimension table. |
| Table schema | The schema of the table. The data type mappings of the table must be the same as the data type mappings of the Hologres table that you define in an SQL statement. For more information, see the "Data type mappings" section in Create a Hologres source table, Create a Hologres result table, or Create a Hologres dimension table. |
Build an implementation class to read data from a Hologres source table
VVR provides HologresBulkreadInputFormat, an implementation class of RichInputFormat, to read data from a Hologres source table. The following sample code shows how to build HologresBulkreadInputFormat to read data from a Hologres source table.
```java
// Define the table schema of the source table. You can define fields for every column
// or only for specific columns of the Hologres table, based on your business requirements.
TableSchema schema = TableSchema.builder()
    .field("a", DataTypes.INT())
    .build();

// Configure Hologres-related parameters. For more information about the parameters, see the SQL documentation.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");

// Build JDBC options.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
String query = JDBCUtils.getSimpleSelectFromStatement(
    jdbcOptions.getTable(), schema.getFieldNames());

// Build HologresBulkreadInputFormat to read data from the Hologres source table.
HologresBulkreadInputFormat inputFormat =
    new HologresBulkreadInputFormat(jdbcOptions, schema, query);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo))
    .returns(typeInfo)
    .print();
env.execute();
```
Build an implementation class to read data from a Hologres binlog source table
VVR provides the implementation class HologresBinlogSource of Source to read data from a Hologres binlog source table. The following sample code shows how to build the implementation class HologresBinlogSource to read data from a Hologres binlog source table.
```java
// Define the table schema of the binlog source table. You can define fields for every column
// or only for specific columns of the Hologres table, based on your business requirements.
TableSchema schema = TableSchema.builder()
    .field("a", DataTypes.INT())
    .build();

// Configure Hologres-related parameters. For more information about the parameters, see the SQL documentation.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);

// Build JDBC options.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
jdbcOptions.setHolohubEndpoint(JDBCUtils.getHolohubEndpoint(jdbcOptions));
RowDataRecordConverter recordConverter = buildRecordConverter(schema, config, jdbcOptions);

// Build HologresBinlogSource to read data from the Hologres binlog source table.
long startTimeMs = 0;
HologresBinlogSource<RowData> source = new HologresBinlogSource<>(
    schema, config, jdbcOptions, recordConverter, startTimeMs);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
```
Note: The buildRecordConverter method is not included in the dependency of the VVR connector. This method is provided in Sample code.
For more information about Hologres binary log considerations and implementation principles, see Consume Hologres data in real time.
Build an implementation class to write data to a Hologres result table
VVR provides HologresSinkFunction, an implementation class of OutputFormatSinkFunction, to write data to a Hologres result table. The following sample code shows how to build HologresSinkFunction to write data to a Hologres result table.
```java
// Initialize the schema of the result table to which data is written.
TableSchema schema = TableSchema.builder()
    .field("a", DataTypes.INT())
    .field("b", DataTypes.STRING())
    .build();

// Configure Hologres-related parameters. For more information about the parameters, see the SQL documentation.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresConfigs.USE_RPC_MODE, true);
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);

// Build a Hologres writer to write data in the data structure of the RowData class.
AbstractHologresWriter<RowData> hologresWriter =
    buildHologresWriter(schema, config, hologresConnectionParam);

// Build HologresSinkFunction to write data to the Hologres result table.
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements(
        (RowData) GenericRowData.of(2 + offset, StringData.fromString("2")),
        GenericRowData.of(3 + offset, StringData.fromString("3")))
    .returns(typeInfo)
    .addSink(sinkFunction);
env.execute();
```
Note: The buildHologresWriter method is not included in the dependency of the VVR connector. This method is provided in Sample code.
Upload the JAR package of the Hologres DataStream connector to the console of fully managed Flink
Log on to the Realtime Compute for Apache Flink console.
On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
In the left-side navigation pane, click Artifacts.
Click Upload Artifact and select the JAR package that you want to upload.
You can upload the JAR package of your self-managed connector or the JAR package of a connector provided by fully managed Flink. For the download links of the official JAR packages that are provided by fully managed Flink, see Connectors.
In the Additional Dependencies section of the Draft Editor page, select the JAR package that you want to use.