This topic provides the DDL syntax that is used to create a Hologres source table, describes the parameters in the WITH clause, and provides data type mappings and sample code.

Note The Hologres connector can be used to read data from a source table in both streaming jobs and batch jobs.

What is Hologres?

Hologres is a real-time interactive analytics service that is developed by Alibaba Cloud. Hologres is compatible with the PostgreSQL protocol and is seamlessly integrated with the big data ecosystem. Hologres allows you to analyze and process up to petabytes of data with high concurrency and low latency. Hologres allows you to use existing business intelligence (BI) tools to perform multidimensional analysis and explore your business in an efficient manner.

Prerequisites

A Hologres table is created. For more information, see Manage an internal table.

Limits

  • Only Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the Hologres connector.
  • Realtime Compute for Apache Flink cannot write result data to Hologres external tables by using the Hologres connector. For more information about Hologres external tables, see Manage a foreign table.
  • By default, Flink performs a one-time full table scan of a Hologres source table and reads all of its data at once. Data consumption is complete when the scan ends, and Flink does not read data that is appended to the table afterward. Flink that uses VVR 3.0.0 or later can consume Hologres data in real time. For more information, see Consume Hologres data in real time.
  • You cannot define watermarks for a Hologres source table in Change Data Capture (CDC) mode. If you want to perform window aggregation on a Hologres source table, you can use a different approach to perform time-based aggregation. For more information, see How do I perform window aggregation if watermarks are not supported?.
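If watermarks are unavailable, one possible workaround is to aggregate by a time bucket derived from a column value instead of using a watermark-based window. The following is a hedged sketch, not the linked FAQ's exact method: it assumes the birthday column of the hologres_source table (defined in the DDL syntax section) stores a Unix timestamp in seconds, uses the Flink SQL built-in FROM_UNIXTIME, and produces an updating stream, so the sink must support updates.

```sql
-- Count rows per minute bucket without a watermark-based window.
-- Assumes birthday is a Unix timestamp in seconds (an assumption,
-- not stated in this topic).
SELECT
  FROM_UNIXTIME(birthday, 'yyyy-MM-dd HH:mm') AS time_bucket,
  COUNT(*) AS cnt
FROM hologres_source
GROUP BY FROM_UNIXTIME(birthday, 'yyyy-MM-dd HH:mm');
```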

DDL syntax

create table hologres_source(
  name varchar,
  age BIGINT,
  birthday BIGINT
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'field_delimiter'='|' -- This parameter is optional. 
);
Note Flink does not allow you to define computed columns in source tables.

Parameters in the WITH clause

Parameter Description Required Remarks
connector The type of the source table. Yes Set the value to hologres.
dbname The name of the database. Yes N/A.
tablename The name of the table.
Note If the public schema is not used, you must set the tablename parameter in the schema.tableName format.
Yes N/A.
username The username that is used to access the database. You must enter the AccessKey ID of your Alibaba Cloud account. Yes N/A.
password The password that is used to access the database. You must enter the AccessKey secret of your Alibaba Cloud account. Yes N/A.
endpoint The endpoint of Hologres. Yes For more information, see Endpoints for connecting to Hologres.
field_delimiter The delimiter that is used between rows of exported data.
Note This parameter is valid only when bulkread is set to true.
No Default value: "\u0002".
binlog Specifies whether to consume binary log data. No Valid values:
  • true: Binary log data is consumed.
  • false: Binary log data is not consumed. This is the default value.
Notice Only Ververica Platform (VVP) 2.4.0 and later support this parameter.
binlogMaxRetryTimes The number of retries after Flink fails to read the binary log data. No Default value: 60.
Notice Only VVP 2.4.0 and later support this parameter.
binlogRetryIntervalMs The interval between retries after Flink fails to read the binary log data. No Default value: 2000. Unit: milliseconds.
Notice Only VVP 2.4.0 and later support this parameter.
binlogBatchReadSize The number of rows in which the binary log data is read at a time. No Default value: 16.
Notice Only VVP 2.4.0 and later support this parameter.
cdcMode Specifies whether to read binary log data in CDC mode. No Valid values:
  • true: Binary log data is read in CDC mode.
  • false: Binary log data is read in non-CDC mode. This is the default value.
Notice Only VVP 2.4.0 and later support this parameter.
binlogStartupMode The mode in which binary log data is consumed. No Valid values:
  • initial: The system consumes full data and then reads binary log data to consume incremental data.
  • earliestOffset: The system starts data consumption from the earliest binary log data. This is the default value.
  • timestamp: The system consumes binary log data from the point in time that is specified by the startTime parameter.
Note This parameter is available only when the engine version of Realtime Compute for Apache Flink is VVR 4.0.13 or later and the Hologres version is Hologres V0.10 or later.
startTime The start time when Hologres data is consumed. If this parameter is not specified and jobs are not resumed from the state, Flink starts to consume the Hologres data from the earliest generated binary log. No The format is yyyy-MM-dd hh:mm:ss.
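For example, a WITH clause that starts binary log consumption from a specified point in time could look like the following. This is a sketch based on the parameters above; the table name, column names, and timestamp value are placeholders.

```sql
create table hologres_source_from_time (
  id INTEGER,
  title VARCHAR
) with (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourTablename>',
  'username' = '<yourAccessID>',
  'password' = '<yourAccessSecret>',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',
  'binlogStartupMode' = 'timestamp',  -- consume from a point in time
  'startTime' = '2021-01-01 12:00:00' -- format: yyyy-MM-dd hh:mm:ss
);
```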

Sample code

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'field_delimiter'='|' -- This parameter is optional. 
);

CREATE TEMPORARY TABLE blackhole_sink(
  name varchar,
  age BIGINT,
  birthday BIGINT 
) with (
  'connector'='blackhole'
);

INSERT INTO blackhole_sink
SELECT 
   name, age, birthday
from hologres_source;

Data type mappings

The following table describes the data type mappings between Hologres and Flink fields. We recommend that you declare the mappings in a DDL statement.
Data type of Hologres Data type of Flink
INT INT
INT[] ARRAY<INT>
BIGINT BIGINT
BIGINT[] ARRAY<BIGINT>
REAL FLOAT
REAL[] ARRAY<FLOAT>
DOUBLE PRECISION DOUBLE
DOUBLE PRECISION[] ARRAY<DOUBLE>
BOOLEAN BOOLEAN
BOOLEAN[] ARRAY<BOOLEAN>
TEXT VARCHAR
TEXT[] ARRAY<VARCHAR>
NUMERIC DECIMAL
DATE DATE
TIMESTAMP WITH TIME ZONE (alias: TIMESTAMPTZ) TIMESTAMP
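For example, a Hologres table that uses several of the preceding types could be declared in Flink as follows. This is an illustrative sketch: the table name, column names, and placeholder credentials are hypothetical, not from this topic.

```sql
-- Hologres side (for reference):
-- create table orders (id int, price double precision,
--                      tags text[], paid boolean, ts timestamptz);
-- Matching Flink DDL per the mapping table above:
create table orders (
  id INT,
  price DOUBLE,
  tags ARRAY<VARCHAR>,
  paid BOOLEAN,
  ts TIMESTAMP
) with (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourTablename>',
  'username' = '<yourAccessID>',
  'password' = '<yourAccessSecret>',
  'endpoint' = '<yourEndpoint>'
);
```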

Consume Hologres data in real time

Flink that uses VVP 2.4.0 or later can consume the binary log data of Hologres in real time. This section describes the details.
  • Limits
    • In Hologres V1.1 and later, you can choose to enable or disable binary logging based on your business requirements. You can also specify the time to live (TTL) for retaining binary log data based on your business requirements. For more information, see Subscribe to Hologres binlogs.
    • You are not allowed to change the property of an existing table to enable the binary logging feature. You must create another table.
    • Partitioned tables do not support the binary logging feature. If you want to enable this feature, you must use a non-partitioned table.
    • Flink cannot consume data of the TIMESTAMP type in real time. Therefore, when you create a Hologres source table, make sure that the date and time data in the table is of the TIMESTAMPTZ type.
    • After you enable the binary logging feature, column-oriented tables theoretically incur larger overheads than row-oriented tables. Therefore, we recommend that you enable the binary logging feature for row-oriented tables if data is frequently updated.
    • Binary log data of the ARRAY type cannot be consumed. Only binary log data of the following types can be consumed: INTEGER, BIGINT, TEXT, REAL, DOUBLE PRECISION, BOOLEAN, NUMERIC(38,8), DATE, and TIMESTAMPTZ.
      Note If a field in a table that is used in a job is of the data type that cannot be consumed, such as SMALLINT, the job cannot be published even if the data of the field is not consumed.
  • Enable binary logging
    By default, the binary logging feature is disabled. Therefore, when you write DDL statements to create a table, you must configure the binlog.level and binlog.ttl parameters. The following code shows an example.
    begin;
    create table test_message_src(
     id int primary key, 
     title text not null, 
     body text
    );
    call set_table_property('test_message_src', 'orientation', 'row');
    call set_table_property('test_message_src', 'clustering_key', 'id');
    call set_table_property('test_message_src', 'binlog.level', 'replica'); -- In Hologres V1.1 or later, you can enable binary logging after you create a table. 
    call set_table_property('test_message_src', 'binlog.ttl', '86400'); 
    commit;
    If binlog.level is set to replica, binary logging is enabled. binlog.ttl indicates the TTL of binary logging. The value is indicated in seconds.
  • Precautions
    • An UPDATE statement generates two binary log records: the data record before the update and the data record after the update. Both records are consumed. The binary logging feature ensures that the two records are consecutive and in the correct order, with the before-update record placed before the after-update record.
    • We recommend that the parallelism of Flink jobs be the same as the number of shards in your Hologres table.
      You can log on to the Hologres console and execute the following statement to query the number of shards in your table. tablename indicates the name of your table.
      select tg.property_value
      from hologres.hg_table_properties tb
      join hologres.hg_table_group_properties tg
        on tb.property_value = tg.tablegroup_name
      where tb.property_key = 'table_group'
        and tg.property_key = 'shard_count'
        and table_name = '<tablename>';
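To illustrate the two-record behavior of updates, a single UPDATE on the test_message_src table from the earlier example produces two consecutive binary log records. The row values below are hypothetical, shown only to make the before/after pairing concrete.

```sql
UPDATE test_message_src SET title = 'new title' WHERE id = 1;
-- Resulting binary log records (illustrative values):
-- hg_binlog_event_type = 3 (BEFORE_UPDATE): id = 1, title = 'old title'
-- hg_binlog_event_type = 7 (AFTER_UPDATE):  id = 1, title = 'new title'
```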
  • Consumption mode
    • Non-CDC mode
      In this mode, the binary log data consumed by a source table is transmitted to downstream nodes as regular Flink data, and the change type of all records is INSERT. You can then process records of a specific hg_binlog_event_type based on your business requirements. The following DDL statement creates a Hologres source table in non-CDC mode:
      create table test_message_src_binlog_table(
        hg_binlog_lsn BIGINT,
        hg_binlog_event_type BIGINT,
        hg_binlog_timestamp_us BIGINT,
        id INTEGER,
        title VARCHAR,
        body VARCHAR
      ) with (
        'connector'='hologres',
        'dbname'='<yourDbname>',
        'tablename'='<yourTablename>',
        'username'='<yourAccessID>',
        'password'='<yourAccessSecret>',
        'endpoint'='<yourEndpoint>',
        'binlog' = 'true',
        'binlogMaxRetryTimes' = '10',
        'binlogRetryIntervalMs' = '500',
        'binlogBatchReadSize' = '100'
      );
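Building on the non-CDC source table above, the following sketch keeps only insert and after-update records by filtering on hg_binlog_event_type (values 5 and 7, as described in the Principles section). The sink name is a placeholder.

```sql
INSERT INTO my_sink  -- placeholder sink table
SELECT id, title, body
FROM test_message_src_binlog_table
WHERE hg_binlog_event_type IN (5, 7);  -- INSERT and AFTER_UPDATE
```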
    • CDC mode
      In this mode, each row of the binary log data consumed by a source table is automatically assigned an accurate Flink RowKind type, such as INSERT, DELETE, UPDATE_BEFORE, or UPDATE_AFTER, based on hg_binlog_event_type. This way, the binary log data can be mapped to the source table, similar to the CDC features of MySQL and PostgreSQL. The following DDL statement creates a Hologres source table in CDC mode:
      create table test_message_src_binlog_table(
        id INTEGER,
        title VARCHAR,
        body VARCHAR
      ) with (
        'connector'='hologres',
        'dbname'='<yourDbname>',
        'tablename'='<yourTablename>',
        'username'='<yourAccessID>',
        'password'='<yourAccessSecret>',
        'endpoint'='<yourEndpoint>',
        'binlog' = 'true',
        'cdcMode' = 'true',
        'binlogMaxRetryTimes' = '10',
        'binlogRetryIntervalMs' = '500',
        'binlogBatchReadSize' = '100'
      );
    • Consumption of full and incremental data in a source table
      • Background information
        When you join a dimension table and a source table, not all data of the source table can be used, for reasons such as the TTL that is specified for binary log data: binary logging is not enabled for historical data and covers only data written after the feature is enabled. A previous workaround was to set a large TTL for the binlog table. This workaround can cause the following issues:
        • Historical binary log data is stored for a long period of time, which occupies a large number of storage resources.
        • Binary log data contains data update records. If full binary log data is consumed, data that is not required for your business is also consumed. As a result, a large number of computing resources are occupied and users cannot focus only on the latest data.

        In VVR 4.0.13 and later and Hologres V0.10 and later, Hologres binlog source tables in CDC mode support consumption of full and incremental data. The system reads historical full data from the database and then smoothly switches to reading incremental data from binary logs. This method resolves the preceding issues.

      • Scenario

        This method is suitable for scenarios in which historical data does not contain binary log data and you want to consume full data.

      • Limits
        • You can use this method only if the table that you want to read contains a primary key. If the table does not contain a primary key, we recommend that you use a Hologres CDC source table.
        • In Hologres V1.1 and later, you can enable the binary logging feature for an existing table that contains historical data. In earlier versions, binary logging can be enabled only when a table is created.
      • Sample code
        CREATE TABLE test_message_src_binlog_table(
          hg_binlog_lsn BIGINT,
          hg_binlog_event_type BIGINT,
          hg_binlog_timestamp_us BIGINT,
          id INTEGER,
          title VARCHAR,
          body VARCHAR
        ) WITH (
          'connector'='hologres',
          'dbname'='<yourDbname>',
          'tablename'='<yourTablename>',
          'username'='<yourAccessID>',
          'password'='<yourAccessSecret>',
          'endpoint'='<yourEndpoint>',
          'binlog' = 'true',
          'cdcMode' = 'true',
          'binlogStartupMode' = 'initial', -- Read full historical data and then consume incremental data from binary log data. 
          'binlogMaxRetryTimes' = '10',
          'binlogRetryIntervalMs' = '500',
          'binlogBatchReadSize' = '100'
          );
  • Principles
    A binary log consists of binary log system fields and custom table fields. The following table describes the fields of binary logs.
    Field Data type Description
    hg_binlog_lsn BIGINT A field in the binary log system. This field indicates the ordinal number of a binary log. The ordinal numbers for binary logs in a shard are generated in ascending order but may not be consecutive. The ordinal numbers in different shards can be the same and out of order.
    hg_binlog_event_type BIGINT A field in the binary log system. This field indicates the change type of a record. Valid values:
    • INSERT=5: indicates that a row is inserted.
    • DELETE=2: indicates that a row is deleted.
    • BEFORE_UPDATE=3: indicates the state of a row before the row is updated.
    • AFTER_UPDATE=7: indicates the state of a row after the row is updated.
    hg_binlog_timestamp_us BIGINT A field in the binary log system. This field indicates the timestamp of the system. Unit: microseconds.
    user_table_column_1 Custom A field in a user table.
    ... ... A field in a user table.
    user_table_column_n Custom A field in a user table.
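For example, the microsecond timestamp in hg_binlog_timestamp_us can be converted to a readable time in a query. The sketch below reuses the source table name from the earlier non-CDC example and assumes the Flink SQL built-in FROM_UNIXTIME, which expects seconds, so the value is divided by 1,000,000.

```sql
SELECT
  FROM_UNIXTIME(hg_binlog_timestamp_us / 1000000) AS binlog_time,
  hg_binlog_event_type,
  id
FROM test_message_src_binlog_table;
```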