This topic describes how to create a Hologres result table. It also describes the parameters in the WITH clause, streaming semantics, and data type mapping involved when you create a Hologres result table.

Notice
  • This topic applies only to Blink 3.6.0 and later. If your Blink version is earlier than 3.6.0, you can
  • We recommend that you use Hologres 0.7 or later.
  • Hologres writes data asynchronously. Therefore, you must add blink.checkpoint.fail_on_checkpoint_error=true to the code so that a failover is triggered only when a job exception occurs.

Introduction to Hologres

Hologres is compatible with the PostgreSQL protocol and integrates seamlessly with the big data ecosystem. Hologres supports real-time analysis and the processing of petabytes of data with high concurrency and low latency. This allows you to use existing Business Intelligence (BI) tools to easily perform multidimensional analysis and business exploration.

Syntax

In Realtime Compute for Apache Flink, you can use Hologres to store output data. The following code shows an example:
create table Hologres_sink(
  name varchar,
  age BIGINT,
  birthday BIGINT
) with (
  type='hologres',
  dbname='<yourDbname>',
  tablename='<yourTablename>',
  username='<yourUsername>',
  password='<yourPassword>',
  endpoint='<yourEndpoint>',
  field_delimiter='|' -- This parameter is optional. 
);
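You write data to the result table with an INSERT INTO statement. The following is a minimal sketch that assumes a test source table named random_source based on the random connector; the source table and its fields are illustrative and not part of the original example:

create table random_source (
  name varchar,
  age BIGINT,
  birthday BIGINT
) with (
  type='random' -- Test source assumed for illustration; replace it with your actual source table.
);

INSERT INTO Hologres_sink
SELECT name, age, birthday
FROM random_source;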

Parameters in the WITH clause

  • type: The type of the result table. Required. Set the value to hologres.
  • dbname: The name of the database. Required.
  • tablename: The name of the table. Required.
    Note If the public schema is not used, you must set tablename to schema.tableName.
  • username: The username that is used to access the database. Required.
  • password: The password that is used to access the database. Required.
  • endpoint: The virtual private cloud (VPC) endpoint of Hologres. Required. For more information, see Endpoints for connecting to Hologres.
  • field_delimiter: The delimiter that is used between rows when data is exported. Optional. Default value: "\u0002".
    Notice Do not insert the delimiter into the data. This parameter takes effect only when the bulkload semantics is used.
  • mutatetype: The streaming write semantics. Optional. Default value: insertorignore. For more information, see Streaming data semantics.
  • partitionrouter: Specifies whether to write data to a partitioned table. Optional. Default value: false.
    Note In Blink 3.6.X, the Hologres streaming sink cannot automatically create partitioned tables. Before Realtime Compute for Apache Flink writes data to a partitioned table, you must manually create a sub-table in Hologres.
  • ignoredelete: Specifies whether to ignore retraction messages. Optional. Default value: false.
    Note This parameter takes effect only when the streaming data semantics is used.
  • createPartTable: Specifies whether to automatically create a non-existent partitioned table to which data is written based on partition values. Optional. Valid values:
    • false (default): Partitioned tables are not automatically created.
    • true: Partitioned tables are automatically created.
    Note If the partition values contain hyphens (-), partitioned tables cannot be automatically created. Only Blink versions later than 3.7 support this parameter.
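For reference, the following is a minimal sketch of a result table declaration that uses some of the optional parameters described above. The connection values are placeholders, and the combination of mutatetype and ignoredelete shown here is only one possible configuration:

create table Hologres_sink_with_options(
  name varchar,
  age BIGINT,
  birthday BIGINT
) with (
  type='hologres',
  dbname='<yourDbname>',
  tablename='<yourTablename>',
  username='<yourUsername>',
  password='<yourPassword>',
  endpoint='<yourEndpoint>',
  mutatetype='insertorupdate', -- Update records that share a primary key.
  ignoredelete='true'          -- Ignore retraction messages instead of generating Delete requests.
);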

Streaming data semantics

Stream processing, also known as streaming data processing or event processing, refers to the continuous processing of unbounded data or events. In most cases, the system that processes streaming data or events allows you to specify a reliability pattern or processing semantics to ensure data accuracy, because network or device failures may cause data loss.

Semantics can be classified into the following types based on the configurations of the Hologres streaming sink that you use and the attributes of the Hologres table:
  • Exactly-once: The system processes data or events only once, even if multiple faults occur.
  • At-least-once: If streaming data or events to be processed are lost, the system transfers the data again from the transmission source. Therefore, the system may process streaming data or events multiple times. If the first retry succeeds, no further retries are required.
When you use streaming semantics in a Hologres result table, take note of the following points:
  • If no primary key is configured in the Hologres physical table, the Hologres streaming sink uses the at-least-once semantics.
  • If primary keys are configured in the Hologres physical table, the Hologres streaming sink uses the exactly-once semantics based on the primary keys. If multiple records with the same primary key are written to the table, you must set the mutatetype parameter to determine how the result table is updated. This parameter has the following valid values:
    • insertorignore (default value): Hologres keeps the first record and discards the subsequent records.
    • insertorreplace: Hologres completely replaces the existing record with the one that arrives later.
    • insertorupdate: Hologres partially replaces the existing record with the one that arrives later.
    Note
    • If the mutatetype parameter is set to insertorupdate or insertorreplace, the system updates data based on the primary key.
    • The number of columns in the result table defined in Blink can differ from the number of columns in the Hologres physical table. Make sure that the columns that are missing from the result table can be filled with null values. Otherwise, an error is returned.
  • By default, the Hologres streaming sink can import data to only one non-partitioned table. If the sink imports data into the parent table of a partitioned table, data queries fail even if the data import succeeds. To enable data to be automatically written to a partitioned table, set the partitionrouter parameter to true (see the example after this list). Take note of the following points:
    • You must set tablename to the name of the parent table.
    • Blink connectors do not automatically create partitioned tables. We recommend that you create a partitioned table before you import data. Otherwise, the data fails to be imported.
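Because Hologres is compatible with the PostgreSQL protocol, you can create the parent table and a child table ahead of time with statements similar to the following sketch. The table name, the partition column ds, and the partition value are illustrative, and the primary key must include the partition column:

create table parent_sink (
  name text,
  age bigint,
  ds text,
  primary key (name, ds) -- The primary key must include the partition column.
) partition by list (ds);

-- Create the child table for a partition value before Blink writes data to it.
create table parent_sink_20230101 partition of parent_sink for values in ('20230101');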

Merge data into a wide table

If you need to write data from multiple streaming jobs to one Hologres wide table, you can merge the data into a wide table.

For example, one Flink data stream contains fields A, B, and C, and the other contains fields A, D, and E. The Hologres wide table WIDE_TABLE contains fields A, B, C, D, and E, of which field A is the primary key. You can perform the following operations:

  1. Execute Flink SQL statements to create two Hologres result tables. One table is used to declare fields A, B, and C, and the other is used to declare fields A, D, and E. Map the two result tables to the Hologres wide table WIDE_TABLE.
  2. Parameter settings of the two Hologres result tables:
    • Set mutatetype to insertorupdate. This indicates that data is updated based on the primary key.
    • Set ignoredelete to true. This prevents retraction messages from generating Delete requests.
  3. Insert data from the two Flink data streams into the two result tables (see the sketch after the following limits).
Note Limits:
  • The wide table must have a primary key.
  • The data of each stream must contain all the fields in the primary key.
  • If the wide table is a column-oriented table and the requests per second (RPS) is high, CPU utilization increases. We recommend that you disable dictionary encoding for the fields in the table.
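The following is a hedged sketch of the preceding steps. The source tables stream_abc and stream_ade, the field names, and the result table names are illustrative; both result tables map to the Hologres wide table WIDE_TABLE:

-- Result table that declares fields A, B, and C of WIDE_TABLE.
create table holo_sink_abc (
  a varchar,
  b varchar,
  c varchar
) with (
  type='hologres',
  dbname='<yourDbname>',
  tablename='WIDE_TABLE',
  username='<yourUsername>',
  password='<yourPassword>',
  endpoint='<yourEndpoint>',
  mutatetype='insertorupdate', -- Update data based on the primary key A.
  ignoredelete='true'          -- Prevent retraction messages from generating Delete requests.
);

-- Result table that declares fields A, D, and E of WIDE_TABLE.
create table holo_sink_ade (
  a varchar,
  d varchar,
  e varchar
) with (
  type='hologres',
  dbname='<yourDbname>',
  tablename='WIDE_TABLE',
  username='<yourUsername>',
  password='<yourPassword>',
  endpoint='<yourEndpoint>',
  mutatetype='insertorupdate',
  ignoredelete='true'
);

-- Write each stream to its result table; Hologres merges the rows by primary key A.
INSERT INTO holo_sink_abc SELECT a, b, c FROM stream_abc;
INSERT INTO holo_sink_ade SELECT a, d, e FROM stream_ade;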

Data type mapping

Hologres                 | BLINK
INT                      | INT
INT[]                    | ARRAY<INT>
BIGINT                   | BIGINT
BIGINT[]                 | ARRAY<BIGINT>
REAL                     | FLOAT
REAL[]                   | ARRAY<FLOAT>
DOUBLE PRECISION         | DOUBLE
DOUBLE PRECISION[]       | ARRAY<DOUBLE>
BOOLEAN                  | BOOLEAN
BOOLEAN[]                | ARRAY<BOOLEAN>
TEXT                     | VARCHAR
TEXT[]                   | ARRAY<VARCHAR>
NUMERIC                  | DECIMAL
DATE                     | DATE
TIMESTAMP WITH TIME ZONE | TIMESTAMP
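To illustrate the mapping, the following sketch pairs a hypothetical Hologres table with the matching Blink result table declaration; the table and column names are placeholders:

-- Hologres (PostgreSQL-compatible) table, for illustration only.
create table type_demo (
  id bigint primary key,
  score double precision,
  tags text[],
  amount numeric(18, 2),
  created timestamp with time zone
);

-- Matching Blink result table declaration.
create table type_demo_sink (
  id BIGINT,
  score DOUBLE,
  tags ARRAY<VARCHAR>,
  amount DECIMAL,
  created TIMESTAMP
) with (
  type='hologres',
  dbname='<yourDbname>',
  tablename='type_demo',
  username='<yourUsername>',
  password='<yourPassword>',
  endpoint='<yourEndpoint>'
);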