This topic provides the DDL syntax that is used to create a Hologres result table, describes the parameters in the WITH clause, and explains the streaming semantics and data type mappings.

Note You can use a Hologres connector to write result table data in both streaming jobs and batch jobs.

What is Hologres?

Hologres is a real-time interactive analytics service that is developed by Alibaba Cloud. It is compatible with the PostgreSQL protocol and seamlessly integrated with the big data ecosystem. Hologres allows you to analyze and process petabytes of data with high concurrency and low latency. Hologres provides an easy method for you to use the existing business intelligence (BI) tools to perform multidimensional analysis and explore your business.

Prerequisites

A Hologres table is created. For more information, see Manage an internal table.

Limits

  • Only Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports Hologres connectors.
  • You cannot use Hologres connectors to write result data to Hologres external tables (foreign tables). For more information about Hologres external tables, see Manage a foreign table.

DDL syntax

CREATE TABLE hologres_sink (
  name VARCHAR,
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourUsername>',
  'password'='<yourPassword>',
  'endpoint'='<yourEndpoint>',
  'field_delimiter'='|' -- This parameter is optional.
);
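
The following sketch shows one way to feed rows into the hologres_sink table declared above. The datagen_source table and its rows-per-second setting are assumptions made for illustration only; in a real job, replace it with your own source table.

```sql
-- Hypothetical source table (not part of the original topic):
-- the built-in datagen connector generates random rows for testing.
CREATE TABLE datagen_source (
  name VARCHAR,
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Write the generated rows to the Hologres result table.
-- The column order must match the columns declared in hologres_sink.
INSERT INTO hologres_sink
SELECT name, age, birthday
FROM datagen_source;
```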

Parameters in the WITH clause

| Parameter | Description | Required | Remarks |
|---|---|---|---|
| connector | The type of the result table. | Yes | Set the value to hologres. |
| dbname | The name of the database. | Yes | N/A. |
| tablename | The name of the table. Note: If the table is not in the public schema, you must set tablename to schema.tableName. | Yes | N/A. |
| username | The username that is used to access the database. Enter the AccessKey ID of your Alibaba Cloud account. | Yes | N/A. |
| password | The password that is used to access the database. Enter the AccessKey secret of your Alibaba Cloud account. | Yes | N/A. |
| endpoint | The endpoint of the Hologres instance. | Yes | Format: <ip>:<port>. |
| field_delimiter | The field delimiter. The Hologres streaming sink can split a string into an array based on the value of this parameter and import the array into Hologres. | No | Default value: "\u0002". |
| mutatetype | The data writing mode. For more information, see Streaming semantics. | No | Default value: insertorignore. |
| partitionrouter | Specifies whether to write data to a partitioned table. | No | Default value: false. |
| ignoredelete | Specifies whether to ignore retraction messages. | No | Default value: true. Note: This parameter takes effect only when streaming semantics are used. |
| createparttable | Specifies whether to automatically create the partitioned table to which data is written, based on partition values. Note: If the partition values contain hyphens (-), partitioned tables cannot be automatically created. | No | Valid values: false (default): partitioned tables are not automatically created. true: partitioned tables are automatically created. Note: Only VVR that is later than version 2.1 supports this parameter. Make sure that partition values do not contain dirty data. If dirty data exists, a failover occurs because an invalid partitioned table is created. Use this parameter with caution. |
| useRpcMode | Specifies whether to connect to Hologres by using remote procedure call (RPC). VVP 2.4.0 and later connect to Hologres by using a Java Database Connectivity (JDBC) driver. VVP that is earlier than version 2.4.0 connects to Hologres by using RPC. | No | JDBC drivers require SQL connections, which increases the number of JDBC connections. If you want to reduce the number of SQL connections, set this parameter to true. Notice: Only VVP 2.4.0 and later support this parameter. |
| connectionSize | The size of the JDBC connection pool created in a Flink job. If the job has poor performance, we recommend that you increase the size of the JDBC connection pool. The size of the JDBC connection pool is proportional to the data throughput. | No | Default value: 3. Notice: Only VVP 2.4.0 and later support this parameter. |
| jdbcWriteBatchSize | The maximum number of rows that a Hologres streaming sink node can write to Hologres at a time when a JDBC driver is used. | No | Default value: 256. Notice: Only VVP 2.4.0 and later support this parameter. |
| jdbcWriteFlushInterval | The maximum period of time that a Hologres streaming sink node waits to collect multiple rows before it writes them to Hologres at a time when a JDBC driver is used. | No | Default value: 10000. Unit: milliseconds. Notice: Only VVP 2.4.0 and later support this parameter. |
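
As a minimal sketch of how the optional parameters fit into the WITH clause, the following DDL declares a result table for a Hologres table outside the public schema and adjusts the JDBC write settings. The table name hologres_sink_tuned, the placeholder <yourSchema>, and the chosen values (5, 512, 5000) are assumptions for illustration, not recommended settings.

```sql
CREATE TABLE hologres_sink_tuned (
  name VARCHAR,
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  -- Schema-qualified name, because the table is not in the public schema.
  'tablename' = '<yourSchema>.<yourTablename>',
  'username' = '<yourUsername>',   -- AccessKey ID
  'password' = '<yourPassword>',   -- AccessKey secret
  'endpoint' = '<yourEndpoint>',   -- Format: <ip>:<port>
  -- Illustrative values only; the defaults are 3, 256, and 10000.
  'connectionSize' = '5',
  'jdbcWriteBatchSize' = '512',
  'jdbcWriteFlushInterval' = '5000'
);
```

A larger batch size combined with a shorter flush interval is only one possible trade-off between throughput and latency; test any change against your own workload.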

Streaming semantics

Stream processing, also known as streaming data processing or event processing, refers to the continuous processing of an unbounded series of data or events. Network or device failures may cause data loss. To ensure data accuracy in such cases, most stream processing systems allow you to specify a reliability pattern, also called processing semantics.

Semantics can be classified into the following types based on the configurations of the Hologres streaming sink that you use and the attributes of the Hologres table:
  • Exactly-once: The system processes data or events only once even if multiple faults occur.
  • At-least-once: If streaming data or events to be processed are lost, the system transfers the data again from the transmission source. Therefore, the system may process streaming data or events multiple times. If the first retry succeeds, no further retries are required.
When you use streaming semantics in a Hologres result table, take note of the following points:
  • If no primary keys are configured in the Hologres physical table, the Hologres streaming sink uses the at-least-once semantics.
  • If primary keys are configured in the Hologres physical table, the Hologres streaming sink uses the exactly-once semantics based on the primary keys. If multiple records with the same primary key are written to the table, the mutatetype parameter determines how the result table is updated. This parameter has the following valid values:
    • insertorignore: retains the first record and discards the subsequent records. This is the default value.
    • insertorreplace: replaces existing data records in an entire row.
    • insertorupdate: updates existing data in specified fields. For example, a table has fields a, b, c, and d. The a field is the primary key. Only data in the a and b fields is written to Hologres. If duplicate primary keys exist, the system updates data only in the b field. Data in the c and d fields remains unchanged.
    Note
    • If the mutatetype parameter is set to insertorupdate or insertorreplace, the system updates data based on the primary key.
    • The number of columns in the result table defined by Flink can be different from the number of columns in the Hologres physical table. Make sure that the columns missing from the Flink result table are nullable in Hologres so that they can be filled with NULL. Otherwise, an error is returned.
  • By default, the Hologres streaming sink can import data into only one non-partitioned table. If the sink imports data into the parent table of a partitioned table, data queries fail even if data import succeeds. To enable data to be automatically written to a partitioned table, you can set the partitionRouter parameter to true. Take note of the following points:
    • You must set tablename to the name of the parent table.
    • If the partitioned table that corresponds to a partition value has not been created, you must set createparttable to true in the WITH clause so that the partitioned table is automatically created. Otherwise, data import fails.
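
The points above can be sketched as two result table declarations. The table names, the columns id, name, and ds, and the placeholder <yourParentTablename> are hypothetical. The first table assumes that the Hologres physical table has a primary key on id. The partition parameter is written as partitionrouter, following the parameter table; this topic also spells it partitionRouter, so verify the spelling that your VVR version accepts.

```sql
-- Sketch 1: update only the declared columns when a primary key repeats.
-- Assumes the Hologres table has a primary key on id.
CREATE TABLE hologres_upsert_sink (
  id BIGINT,
  name VARCHAR
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourTablename>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'mutatetype' = 'insertorupdate'
);

-- Sketch 2: route rows to partitioned tables through the parent table.
-- ds is assumed to be the partition column of the Hologres parent table.
CREATE TABLE hologres_partitioned_sink (
  id BIGINT,
  name VARCHAR,
  ds VARCHAR
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourParentTablename>',  -- name of the parent table
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'partitionrouter' = 'true',
  -- Create missing partitioned tables automatically. Partition values
  -- must not contain hyphens (-) or dirty data.
  'createparttable' = 'true'
);
```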

Merge data into a wide table

If you need to write data from multiple streaming jobs to one Hologres wide table, you can merge the data into the wide table. For example, one Flink data stream contains fields A, B, and C, and another contains fields A, D, and E. The Hologres wide table WIDE_TABLE contains fields A, B, C, D, and E, among which field A is the primary key. You can perform the following operations:

  1. Execute Flink SQL statements to create two Hologres result tables. One table is used to declare fields A, B, and C, and the other is used to declare fields A, D, and E. Map the two result tables to the Hologres wide table WIDE_TABLE.
  2. Configure the following parameters for both Hologres result tables:
    • Set mutatetype to insertorupdate. This indicates that data is updated based on the primary key.
    • Set ignoredelete to true. This prevents retraction messages from generating Delete requests.
  3. Insert data from the two Flink data streams into the two result tables.
Note Limits:
  • The wide table must have a primary key.
  • The data of each stream must contain all the fields in the primary key.
  • If the wide table is a column-oriented table and the requests per second (RPS) value is large, the CPU utilization increases. We recommend that you disable dictionary encoding for the fields in the table.
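
The three steps above can be sketched in Flink SQL as follows. The source tables stream_abc and stream_ade, the result table names, and all column types are assumptions made for illustration; the datagen sources only stand in for your real data streams. Because the data may come from separate streaming jobs, each INSERT statement can also be run in its own job.

```sql
-- Hypothetical sources that stand in for the two Flink data streams.
CREATE TABLE stream_abc (A BIGINT, B VARCHAR, C VARCHAR)
WITH ('connector' = 'datagen', 'rows-per-second' = '1');

CREATE TABLE stream_ade (A BIGINT, D VARCHAR, E VARCHAR)
WITH ('connector' = 'datagen', 'rows-per-second' = '1');

-- Step 1 and 2: two result tables mapped to the same wide table.
CREATE TABLE wide_table_sink_abc (
  A BIGINT,
  B VARCHAR,
  C VARCHAR
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = 'WIDE_TABLE',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'mutatetype' = 'insertorupdate',  -- update based on primary key A
  'ignoredelete' = 'true'           -- retractions do not generate Delete requests
);

CREATE TABLE wide_table_sink_ade (
  A BIGINT,
  D VARCHAR,
  E VARCHAR
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = 'WIDE_TABLE',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'mutatetype' = 'insertorupdate',
  'ignoredelete' = 'true'
);

-- Step 3: each stream writes only its own columns plus the primary key.
INSERT INTO wide_table_sink_abc SELECT A, B, C FROM stream_abc;

INSERT INTO wide_table_sink_ade SELECT A, D, E FROM stream_ade;
```

With insertorupdate, a row written by one stream leaves the columns owned by the other stream unchanged, which is what allows the two partial rows to merge under the same value of A.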

Data type mapping

| Data type of Hologres | Data type of Flink |
|---|---|
| INT | INT |
| INT[] | ARRAY<INT> |
| BIGINT | BIGINT |
| BIGINT[] | ARRAY<BIGINT> |
| REAL | FLOAT |
| REAL[] | ARRAY<FLOAT> |
| DOUBLE PRECISION | DOUBLE |
| DOUBLE PRECISION[] | ARRAY<DOUBLE> |
| BOOLEAN | BOOLEAN |
| BOOLEAN[] | ARRAY<BOOLEAN> |
| TEXT | VARCHAR |
| TEXT[] | ARRAY<VARCHAR> |
| NUMERIC | DECIMAL |
| DATE | DATE |
| TIMESTAMP WITH TIME ZONE | TIMESTAMP |
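
To illustrate how the mappings are applied, the following sketch declares one Flink column for several of the Hologres types listed above. The table name, the column names, and the DECIMAL precision and scale are hypothetical; use the precision and scale of your own NUMERIC column.

```sql
CREATE TABLE hologres_typed_sink (
  col_int INT,                  -- Hologres INT
  col_int_arr ARRAY<INT>,       -- Hologres INT[]
  col_bigint BIGINT,            -- Hologres BIGINT
  col_real FLOAT,               -- Hologres REAL
  col_double DOUBLE,            -- Hologres DOUBLE PRECISION
  col_bool BOOLEAN,             -- Hologres BOOLEAN
  col_text VARCHAR,             -- Hologres TEXT
  col_numeric DECIMAL(38, 10),  -- Hologres NUMERIC (precision and scale assumed)
  col_date DATE,                -- Hologres DATE
  col_ts TIMESTAMP              -- Hologres TIMESTAMP WITH TIME ZONE
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourTablename>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>'
);
```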