This topic describes how to write data from open source Apache Flink 1.10 to Hologres in real time.
Prerequisites
A Hologres instance is purchased and connected to a development tool. In this topic, the PostgreSQL client is used to connect to Hologres. For more information, see Use the PostgreSQL client to connect to Hologres.
A Flink cluster is created. You can download a binary file from the Flink official website and create a Flink cluster that is deployed in Standalone mode. For more information about how to create a Flink cluster, see Local Installation of Flink. In this topic, a Flink 1.10 cluster is created.
A connection for importing data from Flink is created based on your business requirements.
Procedure
Create a Hologres result table.
After the Hologres instance is connected to the development tool, you must create a result table. The result table stores data that is written in real time. Sample statements:
begin; create table order_details(user_id bigint, user_name text, item_id bigint, item_name text, price numeric(38, 2), province text, city text, ip text, longitude text, latitude text, sale_timestamp timestamptz not null, primary key(user_id, item_id)); call set_table_property ('order_details', 'segment_key', 'sale_timestamp'); commit;
Download and compile the Java ARchive (JAR) file of Flink.
Download and install the JAR file named hologres-flink-connector-1.10-jar-with-dependencies.jar on which the Hologres connector depends. Sample statement:
mvn install:install-file -Dfile=hologres-flink-connector-1.10-jar-with-dependencies.jar -DgroupId=org.apache.flink -DartifactId=hologres-flink-connector -Dversion=1.10 -Dpackaging=jar -DgeneratePom=true
Visit the official sample library of Hologres. Then, download and compile the JAR file. Sample statements:
git clone https://github.com/hologres/hologres-flink-examples.git cd hologres-flink-examples git checkout -b example mvn package -DskipTests
Submit a Flink deployment.
After you compile the JAR file, you must configure and submit a Flink deployment. Sample statement:
NoteIn the sample statement, a command is used to submit the Flink deployment. You can also submit the Flink deployment on the Flink web page.
flink run -c io.hologres.flink.example.HologresSinkExample ../hologres-flink-example/target/hologres-flink-examples-1.0.0-jar-with-dependencies.jar --endpoint $ENDPOINT --username $USERNAME --password $PASSWORD --database $DATABASE --tablename order_details
The following table describes the parameters in the syntax.
Parameter
Description
Example
endpoint
The endpoint of the Hologres instance.
You can view the endpoint of the Hologres instance in the Network Information section of the Instance Details page in the Hologres console.
NoteIf your Flink cluster is an on-premises Flink cluster, you must use the public endpoint to access the Hologres instance. If your Flink cluster is deployed in a virtual private cloud (VPC), you must use the VPC endpoint to access the Hologres instance.
ssseeee-cn-hangzhou.hologres.aliyuncs.com:80
username
The AccessKey ID of the Alibaba Cloud account.
You can obtain the AccessKey ID from the Security Management page.
None
password
The AccessKey secret of the Alibaba Cloud account.
You can obtain the AccessKey secret from the Security Management page.
None
database
The name of the Hologres database to be connected to.
hologres_demo
tablename
The name of the Hologres table to which the data is imported.
order_details
Query the synchronized data in Hologres.
After you finish the preceding operations, you can query the data that is written to Hologres in real time. Sample statement:
select count(1) from order_details; select item_id, sum(price) as total from order_details group by item_id limit 10;