This topic describes how to create a Hologres result table. It also describes the parameters in the WITH clause, streaming semantics, and data type mapping involved when you create a Hologres result table.
- This topic applies only to Blink 3.6.0 and later. If your Blink version is earlier than 3.6.0, use one of the following methods:
  - Update your Blink version to 3.6.0 or later. For more information, see Manage Blink versions of a Realtime Compute for Apache Flink cluster deployed in exclusive mode.
  - Submit a ticket to obtain the JAR files that are required for the update.
- We recommend that you use Hologres 0.7 or later.
- Hologres writes data asynchronously. Therefore, you must add blink.checkpoint.fail_on_checkpoint_error=true to the job parameters so that a failover is triggered when a job exception occurs.
Introduction to Hologres
Hologres is compatible with the PostgreSQL protocol and integrates seamlessly with the big data ecosystem. Hologres supports real-time analysis and the processing of petabytes of data with high concurrency and low latency. This allows you to use existing Business Intelligence (BI) tools to easily perform multidimensional analysis and business exploration.
Syntax
create table Hologres_sink(
name varchar,
age BIGINT,
birthday BIGINT
) with (
type='hologres',
dbname='<yourDbname>',
tablename='<yourTablename>',
username='<yourUsername>',
password='<yourPassword>',
endpoint='<yourEndpoint>',
field_delimiter='|' -- This parameter is optional.
);
Parameters in the WITH clause
Parameter | Description | Required | Remarks |
---|---|---|---|
type | The type of the result table. | Yes | Set the value to hologres. |
dbname | The name of the database. | Yes | N/A. |
tablename | The name of the table. Note: If the table is not in the public schema, set tablename to schema.tableName. | Yes | N/A. |
username | The username that is used to access the database. | Yes | N/A. |
password | The password that is used to access the database. | Yes | N/A. |
endpoint | The virtual private cloud (VPC) endpoint of Hologres. | Yes | For more information, see Endpoints for connecting to Hologres. |
field_delimiter | The delimiter that is used between rows when data is exported. Notice: Make sure that the data does not contain the delimiter. This parameter takes effect only when the bulkload semantics is used. | No | Default value: "\u0002". |
mutateType | The streaming write semantics. For more information, see Streaming data semantics. | No | Default value: insertorignore. |
partitionrouter | Specifies whether to write data to a partitioned table. | No | Default value: false. Note: In Blink 3.6.x, the Hologres streaming sink cannot automatically create partitioned tables. Before Realtime Compute for Apache Flink writes data to a partitioned table, you must manually create the child table in Hologres. |
ignoredelete | Specifies whether to ignore retraction messages. | No | Default value: false. Note: This parameter takes effect only when the streaming data semantics is used. |
createPartTable | Specifies whether to automatically create a partitioned table that does not exist yet, based on the partition values of the data that is written. Note: If the partition values contain hyphens (-), partitioned tables cannot be automatically created. | No | Note: Only Blink versions later than 3.7 support this parameter. |
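The following minimal sketch shows how the parameters above fit together when the target table is not in the public schema. The schema name my_schema, the table name user_profile, the sink name user_profile_sink, and the column names are hypothetical placeholders; replace them and the <your...> values with your own settings.

```sql
-- Hypothetical example: write to the user_profile table in the my_schema schema.
CREATE TABLE user_profile_sink (
  user_id BIGINT,
  name    VARCHAR,
  city    VARCHAR
) WITH (
  type='hologres',
  dbname='<yourDbname>',
  tablename='my_schema.user_profile',  -- schema.tableName because the schema is not public
  username='<yourUsername>',
  password='<yourPassword>',
  endpoint='<yourEndpoint>',           -- the VPC endpoint of the Hologres instance
  mutateType='insertorignore'          -- the default value, set explicitly for clarity
);
```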
Streaming data semantics
Stream processing, also known as streaming data processing or event processing, is the continuous processing of unbounded data or events. Network or device failures can cause data loss. Therefore, a system that processes streaming data or events usually allows you to specify a reliability pattern, also called processing semantics, to ensure data accuracy. Common processing semantics include:
- Exactly-once: The system processes each data record or event only once, even if multiple faults occur.
- At-least-once: If data or events to be processed are lost, the system replays them from the source. As a result, the system may process the same data or events more than once. If the first retry succeeds, no further retries are required.
The semantics that the Hologres streaming sink uses depend on whether the Hologres physical table has a primary key:
- If no primary key is configured in the Hologres physical table, the Hologres streaming sink uses the at-least-once semantics.
- If a primary key is configured in the Hologres physical table, the Hologres streaming sink uses the exactly-once semantics based on the primary key. If multiple records with the same primary key are written to the table, set the mutateType parameter to specify how the result table is updated. Valid values:
  - insertorignore (default value): Hologres keeps the first record and discards the subsequent records.
  - insertorreplace: Hologres replaces the entire existing record with the record that arrives later.
  - insertorupdate: Hologres updates only the columns of the existing record that are written by the record that arrives later.
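To make the difference between the three mutateType values concrete, the following sketch uses a hypothetical Hologres table user_profile(user_id BIGINT PRIMARY KEY, name TEXT, city TEXT) and a Blink result table that declares only user_id and city. The expected rows in the comments are derived from the descriptions above and assume that a column the sink does not declare is filled with null when the whole record is replaced; they are an illustration, not captured output.

```sql
-- Hypothetical Hologres table: user_profile(user_id BIGINT PRIMARY KEY, name TEXT, city TEXT)
-- The Blink result table declares only user_id and city.
CREATE TABLE user_city_sink (
  user_id BIGINT,
  city    VARCHAR
) WITH (
  type='hologres',
  dbname='<yourDbname>',
  tablename='user_profile',
  username='<yourUsername>',
  password='<yourPassword>',
  endpoint='<yourEndpoint>',
  mutateType='insertorupdate'  -- change to insertorignore or insertorreplace to compare
);

-- Suppose the existing row is (1, 'Alice', 'Hangzhou') and the sink then writes (1, 'Beijing').
-- Expected row after the write, according to the semantics described above:
--   insertorignore : (1, 'Alice', 'Hangzhou')  the later record is discarded
--   insertorreplace: (1, NULL,    'Beijing')   the whole record is replaced; name is not declared
--   insertorupdate : (1, 'Alice', 'Beijing')   only the declared column city is updated
```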
Note:
- If mutateType is set to insertorupdate or insertorreplace, the system updates data based on the primary key.
- The result table that is declared in Blink can contain fewer columns than the Hologres physical table. Make sure that the missing columns accept null values. Otherwise, an error is returned.
- By default, the Hologres streaming sink can write data only to a non-partitioned table. If the sink writes data to the parent table of a partitioned table, the write can succeed but queries on the data fail. To write data to a partitioned table automatically, set the partitionrouter parameter to true. Take note of the following points:
  - You must set tablename to the name of the parent table.
  - Unless you use the createPartTable parameter, which is supported only by Blink versions later than 3.7, Blink connectors do not automatically create child tables. Create the child tables before you import data. Otherwise, the import fails.
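The following sketch shows the two sides of writing to a partitioned table. It assumes a hypothetical Hologres table named orders that is LIST-partitioned by a TEXT column ds, with one child table created in advance for the partition value '20200601'. The partition value deliberately contains no hyphens, because values with hyphens cannot be used for automatic creation through createPartTable. All table and column names are placeholders.

```sql
-- Step 1 (run in Hologres): create the parent table and a child table for one partition value.
CREATE TABLE orders (
  order_id BIGINT NOT NULL,
  amount   NUMERIC(18, 2),
  ds       TEXT NOT NULL,
  PRIMARY KEY (order_id, ds)  -- the partition key is included in the primary key
) PARTITION BY LIST (ds);

CREATE TABLE orders_20200601 PARTITION OF orders FOR VALUES IN ('20200601');

-- Step 2 (run in Blink): point the result table at the parent table.
CREATE TABLE orders_sink (
  order_id BIGINT,
  amount   DECIMAL(18, 2),
  ds       VARCHAR
) WITH (
  type='hologres',
  dbname='<yourDbname>',
  tablename='orders',         -- the parent table, not a child table
  username='<yourUsername>',
  password='<yourPassword>',
  endpoint='<yourEndpoint>',
  partitionrouter='true'      -- route each row to the child table that matches its ds value
);
```

Rows whose ds value has no matching child table fail to be imported unless the child table is created first.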
Merge data into a wide table
If you need to write data from multiple streaming jobs to one Hologres wide table, you can merge the data into the wide table based on the primary key.
For example, one Flink data stream contains fields A, B, and C, and the other contains fields A, D, and E. The Hologres wide table WIDE_TABLE contains fields A, B, C, D, and E, and field A is its primary key. Perform the following operations:
- Execute Flink SQL statements to create two Hologres result tables. One table declares fields A, B, and C, and the other declares fields A, D, and E. Map both result tables to the Hologres wide table WIDE_TABLE.
- Configure the following parameters for both Hologres result tables:
  - Set mutateType to insertorupdate so that data is updated based on the primary key.
  - Set ignoredelete to true so that retraction messages do not generate Delete requests.
- Insert data from the two Flink data streams into the two result tables.
Take note of the following points:
- The wide table must have a primary key.
- The data of each stream must contain all the fields in the primary key.
- If the wide table is column-oriented and the requests per second (RPS) value is high, CPU utilization can increase significantly. In this case, we recommend that you disable dictionary encoding for the fields in the table.
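The following sketch puts the wide-table steps together. It assumes lowercase column names a to e (unquoted identifiers such as WIDE_TABLE are folded to lowercase in Hologres), TEXT columns for b to e, and two hypothetical source tables, stream_abc and stream_ade, that are already declared elsewhere in the job. The sink names sink_abc and sink_ade are also placeholders.

```sql
-- Run in Hologres: the wide table. Column a is the primary key.
CREATE TABLE wide_table (
  a BIGINT NOT NULL,
  b TEXT,
  c TEXT,
  d TEXT,
  e TEXT,
  PRIMARY KEY (a)
);

-- Run in Blink: two result tables that map to the same wide table.
CREATE TABLE sink_abc (
  a BIGINT,
  b VARCHAR,
  c VARCHAR
) WITH (
  type='hologres',
  dbname='<yourDbname>',
  tablename='wide_table',
  username='<yourUsername>',
  password='<yourPassword>',
  endpoint='<yourEndpoint>',
  mutateType='insertorupdate',  -- update only the declared columns of the row with the same a
  ignoredelete='true'           -- do not turn retraction messages into Delete requests
);

CREATE TABLE sink_ade (
  a BIGINT,
  d VARCHAR,
  e VARCHAR
) WITH (
  type='hologres',
  dbname='<yourDbname>',
  tablename='wide_table',
  username='<yourUsername>',
  password='<yourPassword>',
  endpoint='<yourEndpoint>',
  mutateType='insertorupdate',
  ignoredelete='true'
);

-- stream_abc and stream_ade are hypothetical source tables; each carries the primary key a.
INSERT INTO sink_abc SELECT a, b, c FROM stream_abc;
INSERT INTO sink_ade SELECT a, d, e FROM stream_ade;
```

Because both sinks use insertorupdate, a row written through sink_abc keeps its d and e values when sink_ade later writes the same key, and vice versa.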
Data type mapping
Hologres data type | Blink data type |
---|---|
INT | INT |
INT[] | ARRAY<INT> |
BIGINT | BIGINT |
BIGINT[] | ARRAY<BIGINT> |
REAL | FLOAT |
REAL[] | ARRAY<FLOAT> |
DOUBLE PRECISION | DOUBLE |
DOUBLE PRECISION[] | ARRAY<DOUBLE> |
BOOLEAN | BOOLEAN |
BOOLEAN[] | ARRAY<BOOLEAN> |
TEXT | VARCHAR |
TEXT[] | ARRAY<VARCHAR> |
NUMERIC | DECIMAL |
DATE | DATE |
TIMESTAMP WITH TIME ZONE | TIMESTAMP |
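The following sketch applies the mapping above to a hypothetical table named type_demo: the first statement creates the table in Hologres, and the second declares the matching result table in Blink. The column names are placeholders, and the mapping table does not specify NUMERIC precision, so using the same precision and scale, DECIMAL(18, 2), on both sides is an assumption.

```sql
-- Run in Hologres: a table that uses several of the types listed above.
CREATE TABLE type_demo (
  id         BIGINT NOT NULL,
  score      REAL,
  ratio      DOUBLE PRECISION,
  is_active  BOOLEAN,
  tags       TEXT[],
  price      NUMERIC(18, 2),
  birthday   DATE,
  updated_at TIMESTAMP WITH TIME ZONE,
  PRIMARY KEY (id)
);

-- Run in Blink: the matching result table, one Blink type per Hologres type.
CREATE TABLE type_demo_sink (
  id         BIGINT,
  score      FLOAT,           -- REAL
  ratio      DOUBLE,          -- DOUBLE PRECISION
  is_active  BOOLEAN,
  tags       ARRAY<VARCHAR>,  -- TEXT[]
  price      DECIMAL(18, 2),  -- NUMERIC
  birthday   DATE,
  updated_at TIMESTAMP        -- TIMESTAMP WITH TIME ZONE
) WITH (
  type='hologres',
  dbname='<yourDbname>',
  tablename='type_demo',
  username='<yourUsername>',
  password='<yourPassword>',
  endpoint='<yourEndpoint>'
);
```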