This topic demonstrates how to write data from open source Flink to Hologres in real time.
Prerequisites
-
Activate a Hologres instance and connect a development tool. This example uses a psql client to connect to Hologres. For more information, see PSQL Client.
-
Create a Flink cluster. Go to the official Flink website to download the binary file and start a standalone cluster. For more information, see Cluster Setup. This example uses a Flink 1.10 cluster.
-
Prepare an input data source for Flink as needed.
Procedure
-
Create a sink table in Hologres.
After connecting a development tool to the Hologres instance, create a sink table to accept real-time data. The following is a sample statement.
begin;
create table order_details(
    user_id bigint,
    user_name text,
    item_id bigint,
    item_name text,
    price numeric(38, 2),
    province text,
    city text,
    ip text,
    longitude text,
    latitude text,
    sale_timestamp timestamptz not null,
    primary key(user_id, item_id)
);
call set_table_property('order_details', 'segment_key', 'sale_timestamp');
commit;
-
Download and compile the Flink JAR file.
-
Download the dependency JAR file for the Hologres connector, hologres-flink-connector-1.10-jar-with-dependencies.jar, and install it into your local Maven repository. The following is a sample command.
mvn install:install-file \
    -Dfile=hologres-flink-connector-1.10-jar-with-dependencies.jar \
    -DgroupId=org.apache.flink \
    -DartifactId=hologres-flink-connector \
    -Dversion=1.10 \
    -Dpackaging=jar \
    -DgeneratePom=true
-
Go to the official Hologres Git sample repository, then download the source and compile the JAR file. The following are sample commands.
git clone https://github.com/hologres/hologres-flink-examples.git
cd hologres-flink-examples
git checkout -b example
mvn package -DskipTests
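If the package step fails because the connector dependency cannot be resolved, one thing worth checking is whether the earlier install:install-file command actually placed the JAR in the local Maven repository. The following is a minimal sketch, assuming Maven's default local repository location (~/.m2/repository) and the coordinates from the sample command above. The function name maven_artifact_path is a hypothetical helper, not part of Maven or the Hologres examples.

```shell
#!/bin/sh
# Print the path at which Maven's default local repository stores an
# artifact with packaging "jar". Assumption: the repository lives in
# ~/.m2/repository; a custom settings.xml may point somewhere else.
maven_artifact_path() {
    group_id=$1
    artifact_id=$2
    version=$3
    # Maven maps each dot in the groupId to a directory separator.
    group_dir=$(printf '%s' "$group_id" | tr '.' '/')
    printf '%s/.m2/repository/%s/%s/%s/%s-%s.jar\n' \
        "$HOME" "$group_dir" "$artifact_id" "$version" "$artifact_id" "$version"
}

# Coordinates taken from the sample install command in this topic.
jar_path=$(maven_artifact_path org.apache.flink hologres-flink-connector 1.10)
if [ -f "$jar_path" ]; then
    echo "connector installed: $jar_path"
else
    echo "connector not found at: $jar_path"
fi
```

If the script reports that the connector was not found, rerunning the install command from the directory that contains the downloaded JAR is a reasonable first step.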
-
Submit the Flink job.
After compiling the JAR file, configure the job parameters and submit the Flink job. The following is a sample command.
Note: This example submits the Flink job from the command line. You can also use the Flink web UI.
flink run -c io.hologres.flink.example.HologresSinkExample \
    ../hologres-flink-example/target/hologres-flink-examples-1.0.0-jar-with-dependencies.jar \
    --endpoint $ENDPOINT \
    --username $USERNAME \
    --password $PASSWORD \
    --database $DATABASE \
    --tablename order_details
The following table describes the parameters.
| Parameter | Description | Example |
| --- | --- | --- |
| endpoint | The Endpoint of the Hologres instance. Go to the instance product page in the Hologres console and obtain the Endpoint from the Network Information section. Note: If you use a local Flink cluster, use the public Endpoint of Hologres. If you use an Alibaba Cloud Virtual Private Cloud (VPC), use the VPC Endpoint of Hologres. | ssseeee-cn-hangzhou.hologres.aliyuncs.com:80 |
| username | The AccessKey ID of your Alibaba Cloud account. Click AccessKey Management to obtain the AccessKey ID. | None |
| password | The AccessKey secret of your Alibaba Cloud account. Click AccessKey Management to obtain the AccessKey secret. | None |
| database | The name of the Hologres database to connect to. | hologres_demo |
| tablename | The name of the Hologres table that accepts the data. | order_details |
-
Query data in Hologres.
After the job starts, query the written data in Hologres in real time. The following are sample statements.
select count(1) from order_details;
select item_id, sum(price) as total from order_details group by item_id limit 10;
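To run these queries from a shell instead of an interactive psql session, the endpoint used for the Flink job can be split into a host and a port. The following is a minimal sketch, assuming the endpoint has the form host:port as in the example above, and assuming the same AccessKey pair used for the Flink job is also accepted for psql connections. The function names are hypothetical helpers. The script only prints the command and does not run it, and the password is left out on purpose because psql can read it from the PGPASSWORD environment variable.

```shell
#!/bin/sh
# Split an endpoint of the form host:port into its two parts.
endpoint_host() { printf '%s\n' "${1%:*}"; }
endpoint_port() { printf '%s\n' "${1##*:}"; }

# Print (do not run) a psql command line that executes one SQL statement.
# -h, -p, -d, -U and -c are standard psql options.
psql_command() {
    endpoint=$1
    database=$2
    user=$3
    sql=$4
    printf 'psql -h %s -p %s -d %s -U %s -c "%s"\n' \
        "$(endpoint_host "$endpoint")" "$(endpoint_port "$endpoint")" \
        "$database" "$user" "$sql"
}

# Example values from this topic; my_access_key_id is a placeholder.
psql_command "ssseeee-cn-hangzhou.hologres.aliyuncs.com:80" \
    hologres_demo my_access_key_id "select count(1) from order_details;"
```

Running the printed command several times while the Flink job is active should show the row count increasing if data is arriving.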