This topic provides the DDL syntax that is used to create a Hologres result table, describes the parameters in the WITH clause, and provides streaming semantics and data type mappings.

Note The Hologres connector can be used to store data of a result table for streaming deployments and batch deployments.

Background information

  • Introduction to Hologres

    Hologres is a real-time interactive analytics service that is developed by Alibaba Cloud. Hologres is compatible with the PostgreSQL protocol and is seamlessly integrated with the big data ecosystem. Hologres allows you to analyze and process up to petabytes of data with high concurrency and low latency. Hologres allows you to use existing business intelligence (BI) tools to perform multidimensional analysis and explore your business in an efficient manner.

  • Features
    The following features are supported by the Hologres connector:
    • Streaming semantics: allows the system to write changelogs to a Hologres result table.
    • Merge data into a wide table: updates only data in specific fields instead of data in an entire row.
    • Create Hologres tables as result tables in the CREATE TABLE AS and CREATE DATABASE AS statements: synchronizes data from a database or a single table in real time and synchronizes schema changes of each table to the related Hologres table in real time during database synchronization or table synchronization.

Prerequisites

A Hologres table is created. For more information, see Manage an internal table.

Limits

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the Hologres connector.
  • You cannot use the Hologres connector to write result data to Hologres external tables. For more information about Hologres external tables, see Manage a foreign table.

DDL syntax

CREATE TABLE hologres_sink(
  name varchar,
  age BIGINT,
  birthday BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourUsername>',
  'password'='<yourPassword>',
  'endpoint'='<yourEndpoint>',
  'field_delimiter'='|' -- This parameter is optional. 
);

Parameters in the WITH clause

Note Only Realtime Compute for Apache Flink whose engine version is vvr-4.0.11-flink-1.13 or later supports all parameters that start with jdbc.
Parameter Description Required Remarks
connector The type of the result table. Yes Set the value to hologres.
dbname The name of the database. Yes N/A.
tablename The name of the table.
Note If the public schema is not used, you must set tablename to schema.tableName.
Yes N/A.
username The username that is used to access the database. You must enter the AccessKey ID of your Alibaba Cloud account. Yes N/A.
password The password that is used to access the database. You must enter the AccessKey secret of your Alibaba Cloud account. Yes N/A.
endpoint The endpoint of the Hologres instance. Yes Format: <ip>:<port>.
sdkMode The SDK mode. No Valid values:
  • jdbc: Data is written by using a Java Database Connectivity (JDBC) driver. This is the default value.
  • jdbc_copy: Data is written in fixed copy mode. Fixed copy is a new feature that is supported in Hologres 1.3. If a JDBC driver is used, data is written by using INSERT statements. In fixed copy mode, data is written in streaming mode instead of in batches. Therefore, data writing in fixed copy mode provides higher throughput, lower data latency, and less client memory consumption than data writing by using a JDBC driver.
  • rpc: Data is written in remote procedure call (RPC) mode. This value has the same effect as setting the useRpcMode parameter to true.
  • jdbc_fixed: Data is written in fixed JDBC mode. This feature is in public preview. Only Hologres 1.3 or later supports this feature. Compared with data writing by using a JDBC driver, data writing in fixed JDBC mode does not occupy connections.
Note Only Realtime Compute for Apache Flink that uses VVR 6.X or later supports this parameter.
field_delimiter The field delimiter. The Hologres streaming sink can split a string into arrays based on the value of this parameter and import the arrays to Hologres. No Default value: "\u0002".
mutatetype The data writing mode. For more information, see Streaming semantics. No Default value: insertorignore.
partitionrouter Specifies whether to write data to a partitioned table. No Default value: false.
ignoredelete Specifies whether to ignore retraction messages. No Default value: true.
Note This parameter takes effect only when the streaming semantics is used.
createparttable Specifies whether to automatically create a partitioned table to which data is written based on partition values.
Note If the partition values contain hyphens (-), partitioned tables cannot be automatically created.
No
  • false: Partitioned tables are not automatically created. This is the default value.
  • true: Partitioned tables are automatically created.
Note
  • Only VVR whose version is later than 2.1 supports this parameter.
  • Make sure that partition values do not contain dirty data. If dirty data exists, a failover occurs because an invalid partitioned table is created. Use this parameter with caution.
useRpcMode Specifies whether the Hologres connector connects to Hologres by using RPC. No Valid values:
  • true: RPC is used to connect to Hologres.

    If RPC is used, the number of SQL connections is reduced.

  • false: A JDBC driver is used to connect to Hologres. This is the default value.

    JDBC drivers require SQL connections, which increases the number of JDBC connections.

connectionSize The maximum number of connections that can be created in the JDBC connection pool for a Flink deployment.

If the deployment has poor performance, we recommend that you increase the size of the connection pool. The size of the JDBC connection pool is proportional to data throughput.

No Default value: 3.
jdbcWriteBatchSize The maximum number of rows of data that can be processed by a Hologres streaming sink node at the same time when a JDBC driver is used. No Default value: 256.
jdbcWriteFlushInterval The maximum period of time that is required to wait for a Hologres streaming sink node to write data from multiple rows to Hologres at the same time when a JDBC driver is used. No Default value: 10000. Unit: milliseconds.
ignoreNullWhenUpdate Specifies whether to ignore the null value that is written in the data if the mutatetype='insertOrUpdate' setting is used. No Valid values:
  • false: Write the null value to a Hologres result table. This is the default value.
  • true: Ignore the null value that is written in the data.
Note Only Realtime Compute for Apache Flink that uses VVR 4.0 or later supports this parameter.
connectionPoolName The name of the connection pool. In the same TaskManager, tables for which the same connection pool is configured can share the connection pool. No By default, each table uses its own connection pool. If the same connection pool is configured for multiple tables, the value of the connectionSize parameter for the tables that use the same connection pool must be the same.
Note Only Realtime Compute for Apache Flink that uses VVR 4.0.12 or later supports this parameter.
jdbcWriteBatchByteSize The maximum number of bytes that can be processed by a Hologres streaming sink node at the same time. No Default value: 2097152 (2 × 1024 × 1024). The default value indicates 2 MB.
Note If you configure the jdbcWriteBatchSize, jdbcWriteBatchByteSize, and jdbcWriteFlushInterval parameters, the system writes data to a result table when the data processed by a Hologres streaming sink node meets the condition that is specified by any one of these parameters.
jdbcEnableDefaultForNotNullColumn Specifies whether a null value can be written to a non-null column for which the default value is not configured in the Hologres table. No Valid values:
  • true: Null values can be written. This is the default value.
    If you set this parameter to true, the system converts the null value into a default value based on the data type of a column in the following scenarios when the system writes the null value to the non-null column for which the default value is not configured in the Hologres table:
    • If the column is of the STRING type, the column is left empty.
    • If the column is of the NUMBER type, the null value is converted into 0.
    • If the column is of the DATE, TIMESTAMP, or TIMESTAMPTZ type, the null value is converted into 1970-01-01 00:00:00.
  • false: Null values cannot be written.
jdbcRetryCount The maximum number of retries allowed to write and query data if a connection failure occurs. No Default value: 10.
jdbcRetrySleepInitMs The fixed waiting duration for each retry.

The total amount of time consumed by the retries for a request is calculated by using the following formula: Value of the jdbcRetrySleepInitMs parameter + Number of retries × Value of the jdbcRetrySleepStepMs parameter.

No Default value: 1000. Unit: milliseconds.
jdbcRetrySleepStepMs The accumulated waiting duration for each retry.

The total amount of time consumed by the retries for a request is calculated by using the following formula: Value of the jdbcRetrySleepInitMs parameter + Number of retries × Value of the jdbcRetrySleepStepMs parameter.

No Default value: 5000. Unit: milliseconds.
jdbcConnectionMaxIdleMs The maximum duration for which the JDBC connection is idle.

If a JDBC connection stays idle for a period of time that exceeds the value of this parameter, the connection is closed and released.

No Default value: 60000. Unit: milliseconds.
jdbcMetaCacheTTL The maximum time for storing the TableSchema information in the cache. No Default value: 60000. Unit: milliseconds.
jdbcMetaAutoRefreshFactor If the remaining time for storing data in the cache is less than the time for triggering an automatic refresh of the cache, the system automatically refreshes the cache. The time for triggering an automatic refresh of the cache is calculated by using the following formula: Value of the jdbcMetaCacheTTL parameter/Value of the jdbcMetaAutoRefreshFactor parameter. No Default value: 4.

Remaining time for storing data in the cache = Value of the jdbcMetaCacheTTL parameter - Time for which data is stored in the cache.

After the cache is automatically refreshed, the time for which data is stored in the cache is recalculated from 0.
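
The following DDL is a minimal sketch that shows how several of the optional parameters from the preceding table can be combined in the WITH clause of a result table. The placeholder values, including the tuning values for the jdbc-related parameters, are illustrative assumptions only and should be adjusted to your workload.

CREATE TABLE hologres_sink_tuned (
  name VARCHAR,
  age BIGINT,
  birthday BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourTablename>',
  'username' = '<yourAccessID>',
  'password' = '<yourAccessSecret>',
  'endpoint' = '<yourEndpoint>',
  -- The following tuning values are illustrative, not recommendations.
  'connectionSize' = '5',             -- enlarge the JDBC connection pool for higher throughput
  'jdbcWriteBatchSize' = '512',       -- flush after at most 512 buffered rows
  'jdbcWriteFlushInterval' = '5000',  -- or flush every 5 seconds, whichever condition is met first
  'jdbcRetryCount' = '10'             -- maximum number of retries if a connection failure occurs
);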

Streaming semantics

Stream processing, which is also known as streaming data or streaming event processing, refers to the continuous processing of a series of unbounded data or events. In most cases, the system that processes streaming data or streaming events allows you to specify a reliability pattern or processing semantics to ensure data accuracy, because network or device failures may cause data loss.

Semantics can be classified into the following types based on the configurations of the Hologres streaming sink that you use and the attributes of the Hologres table:
  • Exactly-once: The system processes data or events only once even if multiple faults occur.
  • At-least-once: If streaming data or streaming events that you want to process are lost, the system transfers the data again from the transmission source. Therefore, the system may process streaming data or streaming events multiple times. If the first retry is successful, no subsequent retries are required.
When you use streaming semantics in a Hologres result table, take note of the following points:
  • If no primary key is configured in the Hologres physical table, the Hologres streaming sink uses the at-least-once semantics.
  • If the primary key is configured in the Hologres physical table, the Hologres streaming sink uses the exactly-once semantics based on the primary keys. If multiple records with the same primary key are written to the table, you must configure the mutatetype parameter to determine how the result table is updated. The mutatetype parameter has the following valid values:
    • insertorignore: retains the first record and discards the records that arrive later. This is the default value.
    • insertorreplace: replaces the entire row of the existing record with the record that arrives later.
    • insertorupdate: updates specific columns of the existing record based on the record that arrives later. For example, a table has fields a, b, c, and d. The a field is the primary key. Only data in the a and b fields is written to Hologres. If duplicate primary keys exist, the system updates only data in the b field. Data in the c and d fields remains unchanged.
    Note
    • If the mutatetype parameter is set to insertorupdate or insertorreplace, the system updates data based on the primary key.
    • The number of columns in the result table that is defined by Flink can be different from the number of columns in the Hologres physical table. Make sure that the null value can be used to pad the missing columns. Otherwise, an error is returned.
  • By default, the Hologres streaming sink node can import data to only one non-partitioned table. If the sink node imports data to the parent table of a partitioned table, data queries fail even if the data import is successful. To enable data to be automatically written to a partitioned table, you can set the partitionrouter parameter to true, as shown in the sketch after this list. Take note of the following points:
    • You must set tablename to the name of the parent table.
    • If no partitioned table is created, you must configure createparttable=true in the WITH clause to enable the automatic creation of a partitioned table. Otherwise, data import fails.
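
The following DDL is a minimal sketch of a sink that writes to the parent table of a partitioned Hologres table. It assumes a parent table that is partitioned by a dt column; the table names, column names, and credential values are placeholders.

CREATE TABLE hologres_partitioned_sink (
  name VARCHAR,
  age BIGINT,
  dt VARCHAR,                              -- assumed partition column of the parent table
  PRIMARY KEY (name, dt) NOT ENFORCED      -- if the parent table defines a primary key, Hologres requires the partition column to be part of it
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourParentTablename>',   -- set tablename to the parent table
  'username' = '<yourAccessID>',
  'password' = '<yourAccessSecret>',
  'endpoint' = '<yourEndpoint>',
  'partitionrouter' = 'true',              -- route each row to the partition that matches its partition value
  'createparttable' = 'true'               -- create the partitioned table automatically if it does not exist
);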

Merge data into a wide table

If you want to write data from multiple streaming deployments to one Hologres wide table, you can merge the data into the wide table. For example, one Flink data stream contains fields A, B, and C, and the other contains fields A, D, and E. The Hologres wide table WIDE_TABLE contains fields A, B, C, D, and E, among which field A is the primary key. You can perform the following operations (a sketch of these steps is provided after the limits below):

  1. Execute Flink SQL statements to create two Hologres result tables. One table is used to declare fields A, B, and C, and the other table is used to declare fields A, D, and E. Map the two result tables to the Hologres wide table named WIDE_TABLE.
  2. Parameter settings of the two Hologres result tables:
    • Set mutatetype to insertorupdate. This indicates that data is updated based on the primary key.
    • Set ignoredelete to true. This prevents retraction messages from generating Delete requests.
  3. Insert data from the two Flink data streams into the two result tables.
Note Take note of the following limits:
  • The wide table must have a primary key.
  • The data of each stream must contain all the fields in the primary key.
  • If the wide table is a column-oriented table and the requests per second (RPS) value is large, the CPU utilization increases. We recommend that you disable dictionary encoding for the fields in the table.
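
The following statements sketch steps 1 through 3 of the preceding example. The source tables source_abc and source_ade, the column data types, and the credential values are assumptions for illustration; the WIDE_TABLE mapping and the mutatetype and ignoredelete settings come from the steps above.

-- Sink for the stream that carries fields A, B, and C.
CREATE TEMPORARY TABLE wide_table_sink_abc (
  a BIGINT,
  b VARCHAR,
  c VARCHAR,
  PRIMARY KEY (a) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = 'WIDE_TABLE',
  'username' = '<yourAccessID>',
  'password' = '<yourAccessSecret>',
  'endpoint' = '<yourEndpoint>',
  'mutatetype' = 'insertorupdate',   -- update only the declared columns based on the primary key
  'ignoredelete' = 'true'            -- prevent retraction messages from generating Delete requests
);

-- Sink for the stream that carries fields A, D, and E; it maps to the same Hologres table.
CREATE TEMPORARY TABLE wide_table_sink_ade (
  a BIGINT,
  d VARCHAR,
  e VARCHAR,
  PRIMARY KEY (a) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = 'WIDE_TABLE',
  'username' = '<yourAccessID>',
  'password' = '<yourAccessSecret>',
  'endpoint' = '<yourEndpoint>',
  'mutatetype' = 'insertorupdate',
  'ignoredelete' = 'true'
);

INSERT INTO wide_table_sink_abc SELECT a, b, c FROM source_abc;
INSERT INTO wide_table_sink_ade SELECT a, d, e FROM source_ade;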

Create Hologres tables as result tables in the CREATE TABLE AS and CREATE DATABASE AS statements

You can execute the CREATE TABLE AS or CREATE DATABASE AS statements to synchronize data from a single table or an entire database in real time. The changes to the schema of the source table can also be synchronized to the related Hologres table in real time during database or table synchronization.

If the schema of the source table is changed, Flink synchronizes the changes to the table schema to the related Hologres table and then writes data to the Hologres table. Flink automatically completes this process. For more information, see CREATE TABLE AS statement, CREATE DATABASE AS statement, and Ingest data into data warehouses in real time.
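
The following statements are a hedged sketch of the two synchronization modes. They assume that a source catalog named mysql and a Hologres catalog named holo have already been registered and that the database and table names are placeholders; for the exact catalog setup and statement options, see the documents referenced above.

-- Single-table synchronization: create the Hologres table and keep both data
-- and schema changes synchronized with the source table in real time.
CREATE TABLE IF NOT EXISTS holo.mydb.orders
AS TABLE mysql.mydb.orders;

-- Whole-database synchronization: synchronize every table in the source database
-- to the Hologres database in real time.
CREATE DATABASE IF NOT EXISTS holo.mydb
AS DATABASE mysql.mydb INCLUDING ALL TABLES;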

Data type mappings

Data type of Hologres Data type of Flink
INT INT
INT[] ARRAY<INT>
BIGINT BIGINT
BIGINT[] ARRAY<BIGINT>
REAL FLOAT
REAL[] ARRAY<FLOAT>
DOUBLE PRECISION DOUBLE
DOUBLE PRECISION[] ARRAY<DOUBLE>
BOOLEAN BOOLEAN
BOOLEAN[] ARRAY<BOOLEAN>
TEXT VARCHAR
TEXT[] ARRAY<VARCHAR>
NUMERIC DECIMAL
DATE DATE
TIMESTAMPTZ TIMESTAMP
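
As an illustration of these mappings, the following sketch declares a Flink result table whose columns correspond to a Hologres table that uses INT, BIGINT[], TEXT, NUMERIC, and TIMESTAMPTZ columns. The column names and the DECIMAL precision and scale are assumptions.

CREATE TEMPORARY TABLE hologres_typed_sink (
  id INT,                  -- Hologres INT
  scores ARRAY<BIGINT>,    -- Hologres BIGINT[]
  name VARCHAR,            -- Hologres TEXT
  amount DECIMAL(38, 2),   -- Hologres NUMERIC; declare an explicit precision and scale in Flink
  updated_at TIMESTAMP     -- Hologres TIMESTAMPTZ
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourTablename>',
  'username' = '<yourAccessID>',
  'password' = '<yourAccessSecret>',
  'endpoint' = '<yourEndpoint>'
);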

Sample code

CREATE TEMPORARY TABLE datagen_source(
  name varchar,
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='datagen'
);

CREATE TEMPORARY TABLE hologres_sink (
  name varchar,
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>'
);

INSERT INTO hologres_sink SELECT * FROM datagen_source;