This topic provides the DDL syntax that is used to create a Hologres dimension table, describes the parameters in the WITH clause and cache parameters, and provides sample code.

Note Realtime Compute for Apache Flink cannot write result data to Hologres external tables by using the Hologres connector. For more information about Hologres external tables, see Manage a foreign table.

What is Hologres?

Hologres is a real-time interactive analytics service that is developed by Alibaba Cloud. Hologres is compatible with the PostgreSQL protocol and is seamlessly integrated with the big data ecosystem. Hologres allows you to analyze and process up to petabytes of data with high concurrency and low latency. Hologres allows you to use existing business intelligence (BI) tools to perform multidimensional analysis and explore your business in an efficient manner.

Prerequisites

A Hologres table is created. For more information, see Manage an internal table.

Limits

  • Only Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the Hologres connector.
  • We recommend that you use row-oriented storage to create a Hologres dimension table. Column-oriented storage consumes a large number of performance overheads for point queries.
    When you use row-oriented storage to create a Hologres dimension table, you must set the primary key to clustering key. The following statements show an example:
    begin;
    create table test(a int primary key, b text, c text, d float8, e int8);
    call set_table_property('test', 'orientation', 'row');
    call set_table_property('test', 'clustering_key', 'a');
    commit;
  • When you join a Hologres dimension table with another table, you must specify all the fields in the primary keys of the dimension table in the ON clause.
  • The Hologres connector of Flink that uses VVR 4.0 or later supports join operations that are not based on the primary key fields of dimension tables. The Hologres connector of Flink that uses VVR earlier than 4.0 supports only join operations that are based on the primary key fields of dimension tables.

DDL syntax

CREATE TABLE hologres_dim(
  id INT,
  len INT,
  content VARCHAR
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourUsername>',
  'password'='<yourPassword>',
  'endpoint'='<yourEndpoint>'
);

Parameters in the WITH clause

Note Only Realtime Compute for Apache Flink whose engine version is Flink 1.13-VVR 4.0.11 or later supports parameters that start with jdbc.
Parameter Description Required Remarks
connector The type of the dimension table. Yes Set the value to hologres.
endPoint The endpoint of Hologres. Yes For more information, see Endpoints for connecting to Hologres.
tablename The name of the table.
Note If the public schema is not used, you must set tableName to schema.tableName.
Yes N/A.
dbname The name of the database. Yes N/A.
username The username that is used to access the database. You must enter the AccessKey ID of your Alibaba Cloud account. Yes N/A.
password The password that is used to access the database. You must enter the AccessKey secret of your Alibaba Cloud account. No N/A.
useRpcMode Specifies whether to connect to the Hologres connector by using remote procedure call (RPC). No Valid values:
  • true: Connect to the Hologres connector by using RPC.

    If RPC is used, the number of SQL connections is reduced.

  • false: Connect to the Hologres connector by using a Java Database Connectivity (JDBC) driver. This is the default value.

    JDBC drivers require SQL connections. This increases the number of JDBC connections.

connectionSize The maximum number of connections that can be created in a JDBC connection pool for a Flink job.

If the job has poor performance, we recommend that you increase the number of connections in the connection pool. The size of the JDBC connection pool is proportional to the data throughput.

No Default value: 3.
connectionPoolName The name of the connection pool. In the same TaskManager, tables for which the same connection pool is configured can share the connection pool. No By default, each table uses its own connection pool. If the same connection pool is configured for multiple tables, the value of the connectorSize parameter for the tables that use the same connection pool must be the same.
Note Only Realtime Compute for Apache Flink whose engine version is Flink 1.13-VVR 4.0.12 or later supports this parameter.
jdbcReadBatchSize The maximum number of data records that can be processed at the same time for a point query in a Hologres dimension table. No Default value: 128.
jdbcReadBatchQueueSize The maximum number of queued requests allowed in a thread to perform a point query in a Hologres dimension table. No Default value: 256.
jdbcScanFetchSize The number of data records that can be processed at the same time by calling the scan operation when you perform a one-to-many table join. In a one-to-many table join, no complete primary key is used. No Default value: 256.
jdbcScanTimeoutSeconds The maximum amount of time to wait for a scan operation to complete. No Default value: 60. Unit: seconds.
jdbcRetryCount The maximum number of retries allowed to read and write data if a connection failure occurs. No Default value: 10.
jdbcRetrySleepInitMs The fixed waiting duration for each retry.

The interval at which retries are performed. The amount of time consumed by the retries for a request is calculated by using the following formula: Value of the retrySleepInitMs parameter + Number of retries × Value of the retrySleepStepMs parameter.

No Default value: 1000. Unit: milliseconds.
jdbcRetrySleepStepMs The accumulated waiting duration for each retry.

The interval at which retries are performed. The amount of time consumed by the retries for a request is calculated by using the following formula: Value of the retrySleepInitMs parameter + Number of retries × Value of the retrySleepStepMs parameter.

No Default value: 5000. Unit: milliseconds.
jdbcConnectionMaxIdleMs The maximum duration for which the JDBC connection is idle.

If a JDBC connection stays idle for a period of time that exceeds the value of this parameter, the connection is disconnected and released.

No Default value: 60000. Unit: milliseconds.
jdbcMetaCacheTTL The maximum time required for storing the TableSchema information in the cache. No Default value: 60000. Unit: milliseconds.
jdbcMetaAutoRefreshFactor The time required for the system to automatically refresh the cache. If the remaining time for storing data in the cache is less than the value of the jdbcMetaCacheTTL parameter or the jdbcMetaAutoRefreshFactor parameter, the system automatically refreshes the cache. No Default value: 4.

Remaining time for storing data in the cache = Maximum time required for storing data in the cache - Time for which data is stored in the cache

After the cache is automatically refreshed, the time for which data is stored in the cache is recalculated from 0.

Cache parameters

Parameter Description Required Remarks
cache The cache policy. No Valid values:
  • LRU: indicates that only the specified data in the dimension table is cached. Each time the system receives a data record, the system searches the cache. If the system does not find the record in the cache, the system searches for the data record in the physical dimension table.

    If this cache policy is used, you must configure the cacheSize and cacheTTLMs parameters.

  • None: indicates that data is not cached.
Note The default value of the cache parameter varies based on the VVR version:
  • VVR 4.x and later: None.
  • VVR versions earlier than 4.x: LRU.
cacheSize The maximum number of rows of data that can be cached. No This parameter is available only if you set the cache parameter to LRU. Default value: 10000.
cacheTTLMs The interval at which the system refreshes the cache. No The cacheTTLMs parameter applies configurations based on the value of the cache parameter.
  • If you set the cache parameter to LRU, the cacheTTLMs parameter specifies the cache timeout period. By default, cache entries do not expire.
  • If you set the cache parameter to None, the cacheTTLMs parameter can be left empty. This indicates that cache entries do not expire.
async Specifies whether to enable asynchronous data synchronization. No Valid values:
  • true: Asynchronous data synchronization is enabled.
  • false: Asynchronous data synchronization is disabled. This is the default value.
Note By default, data that is synchronized in asynchronous mode is unordered.

Data type mappings

Data type of Hologres Data type of Flink
INT INT
INT[] ARRAY<INT>
BIGINT BIGINT
BIGINT[] ARRAY<BIGINT>
REAL FLOAT
REAL[] ARRAY<REAL>
DOUBLE PRECISION DOUBLE
DOUBLE PRECISION[] ARRAY<DOUBLE PRECISION>
BOOLEAN BOOLEAN
BOOLEAN[] ARRAY<BOOLEAN>
TEXT VARCHAR
TEXT[] ARRAY<TEXT>
NUMERIC DECIMAL
DATE DATE
TIMESTAMP WITH TIMEZONE TIMESTAMP
Note The Hologres connector supports only the conversion of the Hologres data types that are listed in the table.

Sample code

CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE hologres_dim (
  a INT, 
  b VARCHAR, 
  c VARCHAR
) WITH (
  'connector' = 'hologres',
   ...
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

insert into blackhole_sink select T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;