All Products
Search
Document Center

Hologres:Write data from open source Apache Flink 1.10 to Hologres in real time

Last Updated:Mar 05, 2024

This topic describes how to write data from open source Apache Flink 1.10 to Hologres in real time.

Prerequisites

  • A Hologres instance is purchased and connected to a development tool. In this topic, the PostgreSQL client is used to connect to Hologres. For more information, see Use the PostgreSQL client to connect to Hologres.

  • A Flink cluster is created. You can download a binary file from the Flink official website and create a Flink cluster that is deployed in Standalone mode. For more information about how to create a Flink cluster, see Local Installation of Flink. In this topic, a Flink 1.10 cluster is created.

  • A connection for importing data from Flink is created based on your business requirements.

Procedure

  1. Create a Hologres result table.

    After the Hologres instance is connected to the development tool, you must create a result table. The result table stores data that is written in real time. Sample statements:

    begin;
    create table order_details(user_id bigint, user_name text, item_id bigint, item_name text, price numeric(38, 2), province text, city text, ip text, longitude text, latitude text, sale_timestamp timestamptz not null, primary key(user_id, item_id));
    call set_table_property ('order_details', 'segment_key', 'sale_timestamp');
    commit;
  2. Download and compile the Java ARchive (JAR) file of Flink.

    1. Download and install the JAR file named hologres-flink-connector-1.10-jar-with-dependencies.jar on which the Hologres connector depends. Sample statement:

      mvn install:install-file -Dfile=hologres-flink-connector-1.10-jar-with-dependencies.jar -DgroupId=org.apache.flink -DartifactId=hologres-flink-connector -Dversion=1.10 -Dpackaging=jar -DgeneratePom=true
    2. Visit the official sample library of Hologres. Then, download and compile the JAR file. Sample statements:

      git clone https://github.com/hologres/hologres-flink-examples.git
      cd hologres-flink-examples
      git checkout -b example
      mvn package -DskipTests
  3. Submit a Flink deployment.

    After you compile the JAR file, you must configure and submit a Flink deployment. Sample statement:

    Note

    In the sample statement, a command is used to submit the Flink deployment. You can also submit the Flink deployment on the Flink web page.

    flink run -c io.hologres.flink.example.HologresSinkExample ../hologres-flink-example/target/hologres-flink-examples-1.0.0-jar-with-dependencies.jar --endpoint $ENDPOINT --username $USERNAME --password $PASSWORD --database $DATABASE --tablename order_details

    The following table describes the parameters in the syntax.

    Parameter

    Description

    Example

    endpoint

    The endpoint of the Hologres instance.

    You can view the endpoint of the Hologres instance in the Network Information section of the Instance Details page in the Hologres console.

    Note

    If your Flink cluster is an on-premises Flink cluster, you must use the public endpoint to access the Hologres instance. If your Flink cluster is deployed in a virtual private cloud (VPC), you must use the VPC endpoint to access the Hologres instance.

    ssseeee-cn-hangzhou.hologres.aliyuncs.com:80

    username

    The AccessKey ID of the Alibaba Cloud account.

    You can obtain the AccessKey ID from the Security Management page.

    None

    password

    The AccessKey secret of the Alibaba Cloud account.

    You can obtain the AccessKey secret from the Security Management page.

    None

    database

    The name of the Hologres database to be connected to.

    hologres_demo

    tablename

    The name of the Hologres table to which the data is imported.

    order_details

  4. Query the synchronized data in Hologres.

    After you finish the preceding operations, you can query the data that is written to Hologres in real time. Sample statement:

    select count(1) from order_details;
    select item_id, sum(price) as total from order_details group by item_id limit 10;