This topic provides the DDL syntax that is used to create a Hologres source table, describes the parameters in the WITH clause, and provides data type mappings and sample code.

Note Hologres connectors can be used as source tables for both streaming jobs and batch jobs.

What is Hologres?

Hologres is a real-time interactive analytics service that is developed by Alibaba Cloud. It is compatible with the PostgreSQL protocol and closely connected to the big data ecosystem. Hologres allows you to analyze and process petabytes of data with high parallelism and low latency. Hologres provides an easy method for you to use the existing business intelligence (BI) tools to perform multidimensional analysis and explore your business.

Prerequisites

A Hologres table is created. For more information, see Manage an internal table.

Limits

  • Only Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports Hologres connectors.
  • Hologres connectors cannot access Hologres external tables. For more information about Hologres external tables, see Manage a foreign table.
  • By default, Flink performs a one-time full table scan to read all data from a Hologres source table. Data consumption ends when the scan is complete, and Flink does not read data that is appended to the source table afterward. Flink that uses VVR 3.0.0 or later can consume Hologres data in real time. For more information, see Consume Hologres data in real time.

DDL syntax

create table hologres_source(
  name varchar,
  age BIGINT,
  birthday BIGINT
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'field_delimiter'='|' -- This parameter is optional. 
);
Note Flink does not allow you to define computed columns in source tables.

Parameters in the WITH clause

| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| connector | The type of the source table. | Yes | Set the value to hologres. |
| dbname | The name of the database. | Yes | N/A |
| tablename | The name of the table. Note: If the public schema is not used, you must set this parameter in the schema.tableName format. | Yes | N/A |
| username | The username that is used to log on to the database. Enter the AccessKey ID of your Alibaba Cloud account. | Yes | N/A |
| password | The password that is used to log on to the database. Enter the AccessKey secret of your Alibaba Cloud account. | Yes | N/A |
| endpoint | The endpoint of Hologres. | Yes | For more information, see Endpoints for connecting to Hologres. |
| field_delimiter | The delimiter that is used between rows when data is exported. Note: This parameter takes effect only when bulkread is set to true. | No | Default value: "\u0002". |
| binlog | Specifies whether to consume binary log data. | No | Valid values: true (binary log data is consumed) and false (default, binary log data is not consumed). Only VVP 2.4.0 and later support this parameter. |
| binlogMaxRetryTimes | The number of retries after Flink fails to read binary log data. | No | Default value: 60. Only VVP 2.4.0 and later support this parameter. |
| binlogRetryIntervalMs | The interval between retries after Flink fails to read binary log data. | No | Default value: 2000. Unit: milliseconds. Only VVP 2.4.0 and later support this parameter. |
| binlogBatchReadSize | The number of rows of binary log data that are read at a time. | No | Default value: 16. Only VVP 2.4.0 and later support this parameter. |
| cdcMode | Specifies whether to read binary log data in Change Data Capture (CDC) mode. | No | Valid values: true (CDC mode) and false (default, non-CDC mode). Only VVP 2.4.0 and later support this parameter. |
| startTime | The start time from which Hologres data is consumed. If this parameter is not specified and the job is not resumed from a state, Flink consumes Hologres data from the earliest generated binary log. | No | Format: yyyy-MM-dd hh:mm:ss. |
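The following sketch shows how the binary log parameters and startTime can be combined so that consumption starts from a specific point in time. The column names and the timestamp value are examples for illustration only; replace the placeholders with your own values.

CREATE TEMPORARY TABLE hologres_binlog_source (
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourTablename>',
  'username' = '<yourAccessID>',
  'password' = '<yourAccessSecret>',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'startTime' = '2021-04-01 00:00:00' -- Consume binary log data that is generated after this point in time. The value is an example.
);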

Sample code

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'field_delimiter'='|' -- This parameter is optional. 
);

CREATE TEMPORARY TABLE blackhole_sink(
  name varchar,
  age BIGINT,
  birthday BIGINT 
) with (
  'connector'='blackhole'
);

INSERT INTO blackhole_sink
SELECT 
   name, age, birthday
from hologres_source;

Data type mapping

The following table describes the data type mappings between Hologres and Flink fields. We recommend that you declare the mappings in a DDL statement.
| Data type of Hologres | Data type of Flink |
| --- | --- |
| INT | INT |
| INT[] | ARRAY<INT> |
| BIGINT | BIGINT |
| BIGINT[] | ARRAY<BIGINT> |
| REAL | FLOAT |
| REAL[] | ARRAY<FLOAT> |
| DOUBLE PRECISION | DOUBLE |
| DOUBLE PRECISION[] | ARRAY<DOUBLE> |
| BOOLEAN | BOOLEAN |
| BOOLEAN[] | ARRAY<BOOLEAN> |
| TEXT | VARCHAR |
| TEXT[] | ARRAY<VARCHAR> |
| NUMERIC | DECIMAL |
| DATE | DATE |
| TIMESTAMP WITH TIME ZONE (alias: TIMESTAMPTZ) | TIMESTAMP |
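The following sketch shows how the mapped Flink data types can be declared in a source table DDL statement for a hypothetical Hologres table that contains array, double-precision, text-array, numeric, and timestamptz columns. The column names and the DECIMAL precision and scale are assumptions for illustration only.

CREATE TEMPORARY TABLE hologres_typed_source (
  id INT,                 -- Hologres INT
  scores ARRAY<BIGINT>,   -- Hologres BIGINT[]
  ratio DOUBLE,           -- Hologres DOUBLE PRECISION
  tags ARRAY<VARCHAR>,    -- Hologres TEXT[]
  amount DECIMAL(38, 2),  -- Hologres NUMERIC
  created_at TIMESTAMP    -- Hologres TIMESTAMPTZ
) with (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourTablename>',
  'username' = '<yourAccessID>',
  'password' = '<yourAccessSecret>',
  'endpoint' = '<yourEndpoint>'
);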

Consume Hologres data in real time

Flink that uses Ververica Platform (VVP) 2.4.0 or later can consume the binary log data of Hologres in real time. This section describes the details.
  • Limits
    • You are not allowed to change the property of an existing table to enable the binary logging feature. You must create another table.
    • Partitioned tables do not support the binary logging feature. If you want to enable this feature, you must use a non-partitioned table.
    • Flink cannot consume data of the TIMESTAMP type in real time. Therefore, when you create a Hologres source table, make sure that the date and time data in the table is of the TIMESTAMPTZ type.
    • After you enable the binary logging feature, column-oriented tables have larger overheads than row-oriented tables in theory. Therefore, we recommend that you enable the binary logging feature for row-oriented tables if data is frequently updated.
  • Enable binary logging
    By default, the binary logging feature is disabled. Therefore, when you write DDL statements to create a table, you must configure the binlog.level and binlog.ttl parameters. The following code shows an example.
    begin;
    create table test_message_src(
     id int primary key, 
     title text not null, 
     body text
    );
    call set_table_property('test_message_src', 'orientation', 'row');
    call set_table_property('test_message_src', 'clustering_key', 'id');
    call set_table_property('test_message_src', 'binlog.level', 'replica'); 
    call set_table_property('test_message_src', 'binlog.ttl', '86400'); 
    commit;
    Setting binlog.level to replica enables binary logging. binlog.ttl specifies the time-to-live (TTL) of the binary log data, in seconds.
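    To confirm the settings, you can list the properties of the table. The following statement is a sketch that assumes the table name test_message_src from the preceding example and uses the hologres.hg_table_properties system view, which is also used later in this topic to query the shard count:
    select property_key, property_value
    from hologres.hg_table_properties
    where table_name = 'test_message_src';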
  • Usage notes
    • An UPDATE statement generates two binary log records: the record before the update and the record after the update. Both records are consumed. The binary logging feature ensures that the two records are consecutive and in the correct order: the record generated before the update is placed before the record generated after the update.
    • We recommend that the parallelism of Flink jobs be the same as the number of shards in your Hologres table.
      You can log on to the Hologres console and execute the following statement to query the number of shards in your table. tablename indicates the name of your table.
      select tg.property_value
      from hologres.hg_table_properties tb
      join hologres.hg_table_group_properties tg
        on tb.property_value = tg.tablegroup_name
      where tb.property_key = 'table_group'
        and tg.property_key = 'shard_count'
        and table_name = '<tablename>';
  • Consumption mode
    • Non-CDC mode
      In this mode, the binary log data consumed by a source table is transmitted to a descendant node as regular Flink data, and the change type of all the data is INSERT. You can then process records of a specific change type based on the value of hg_binlog_event_type, as shown in the query that follows the DDL statement. The following DDL statement creates a Hologres source table in non-CDC mode:
      create table test_message_src_binlog_table(
        hg_binlog_lsn BIGINT,
        hg_binlog_event_type BIGINT,
        hg_binlog_timestamp_us BIGINT,
        id INTEGER,
        title VARCHAR,
        body VARCHAR
      ) with (
        'connector'='hologres',
        'dbname'='<yourDbname>',
        'tablename'='<yourTablename>',
        'username'='<yourAccessID>',
        'password'='<yourAccessSecret>',
        'endpoint'='<yourEndpoint>',
        'binlog' = 'true',
        'binlogMaxRetryTimes' = '10',
        'binlogRetryIntervalMs' = '500',
        'binlogBatchReadSize' = '100'
      );
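      For example, a downstream query can filter or branch on hg_binlog_event_type. The following sketch skips the records that are generated before an update (event type 3, as described in the Principles section) and writes the remaining changes to a blackhole sink; the sink name and columns are assumptions used for illustration only.
      CREATE TEMPORARY TABLE message_blackhole_sink (
        id INTEGER,
        title VARCHAR,
        body VARCHAR
      ) with (
        'connector' = 'blackhole'
      );

      INSERT INTO message_blackhole_sink
      SELECT id, title, body
      FROM test_message_src_binlog_table
      WHERE hg_binlog_event_type <> 3; -- 3 indicates the record that is saved before an update.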
    • CDC mode
      In this mode, each row of the binary log data consumed by a source table is automatically assigned an accurate Flink RowKind type, such as INSERT, DELETE, UPDATE_BEFORE, or UPDATE_AFTER, based on hg_binlog_event_type. This way, the binary log data maps to changes of the source table, which is similar to the CDC features of MySQL and PostgreSQL. The following DDL statement creates a Hologres source table in CDC mode:
      create table test_message_src_binlog_table(
        id INTEGER,
        title VARCHAR,
        body VARCHAR
      ) with (
        'connector'='hologres',
        'dbname'='<yourDbname>',
        'tablename'='<yourTablename>',
        'username'='<yourAccessID>',
        'password'='<yourAccessSecret>',
        'endpoint'='<yourEndpoint>',
        'binlog' = 'true',
        'cdcMode' = 'true',
        'binlogMaxRetryTimes' = '10',
        'binlogRetryIntervalMs' = '500',
        'binlogBatchReadSize' = '100'
      );
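      Because each row already carries a Flink RowKind, the CDC source can feed stateful queries directly. The following sketch maintains a per-title record count; UPDATE_BEFORE, UPDATE_AFTER, and DELETE rows retract or correct the counts automatically. The sink name and schema are assumptions used for illustration only.
      CREATE TEMPORARY TABLE title_count_sink (
        title VARCHAR,
        cnt BIGINT
      ) with (
        'connector' = 'blackhole'
      );

      INSERT INTO title_count_sink
      SELECT title, COUNT(*) AS cnt
      FROM test_message_src_binlog_table
      GROUP BY title;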
  • Principles
    A binary log consists of binary log system fields and the fields of the user table. The following table describes these fields.
    | Field | Data type | Description |
    | --- | --- | --- |
    | hg_binlog_lsn | BIGINT | A binary log system field that indicates the ordinal number of a binary log record. Ordinal numbers within a shard are generated in ascending order but may not be consecutive. Ordinal numbers in different shards can be the same and are not ordered across shards. |
    | hg_binlog_event_type | BIGINT | A binary log system field that indicates the change type of a record. Valid values: INSERT=5 (a record is inserted), DELETE=2 (a record is deleted), BEFORE_UPDATE=3 (the state of a record before it is updated), AFTER_UPDATE=7 (the state of a record after it is updated). |
    | hg_binlog_timestamp_us | BIGINT | A binary log system field that indicates the system timestamp. Unit: microseconds. |
    | user_table_column_1 | Custom | A field in the user table. |
    | ... | ... | A field in the user table. |
    | user_table_column_n | Custom | A field in the user table. |