
Realtime Compute for Apache Flink:Use the Hologres connector in a YAML deployment for data ingestion (public preview)

Last Updated:Dec 12, 2024

This topic describes how to use the Hologres connector to synchronize data in a YAML deployment for data ingestion.

Background information

Hologres is an end-to-end real-time data warehouse service that allows you to write, update, and analyze large amounts of data in real time. Hologres is compatible with the PostgreSQL protocol and supports standard SQL syntax. Hologres supports online analytical processing (OLAP) and ad hoc queries on petabytes of data, and provides high-concurrency, low-latency online data services. Hologres is seamlessly integrated with MaxCompute, Realtime Compute for Apache Flink, and DataWorks, and provides full-stack online and offline data warehouse solutions. The following table describes the capabilities of the Hologres connector that is used in YAML deployments.

Table type: Data ingestion sink

Running mode: Streaming mode and batch mode

Data format: N/A

Metrics: numRecordsOut and numRecordsOutPerSecond. For more information about the metrics, see Metrics.

API type: YAML API

Data update or deletion in a sink table: Supported

Features

Synchronization of data from all tables in a database: Synchronizes full data and incremental data from all tables or multiple tables in a database to the related sink tables.

Synchronization of table schema changes: Synchronizes schema changes in each source table, such as an added, deleted, or renamed column, to the related sink table in real time during database synchronization.

Merging and synchronization of multiple tables in a sharded database: Allows you to use regular expressions to specify database names to match the source tables in multiple database shards of the data source. After the data of the source tables is merged, the data is synchronized to a sink table whose name corresponds to each source table.

Writing data to a partitioned table: Allows you to write data from an upstream table to a Hologres partitioned table.

Data type mappings: Uses multiple mapping policies to map upstream data types to Hologres data types, which are wider than Flink Change Data Capture (CDC) data types.

Syntax

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}

Parameters

Parameter

Description

Data type

Required

Default value

Remarks

type

The sink type.

String

Yes

No default value

Set the value to hologres.

name

The sink name.

String

No

No default value

N/A.

dbname

The database name.

String

Yes

No default value

N/A.

username

The username that is used to access the database. Enter the AccessKey ID of your Alibaba Cloud account.

String

Yes

No default value

For more information about how to obtain the AccessKey ID of your Alibaba Cloud account, see the "How do I view information about the AccessKey ID and AccessKey secret of the account?" section of the Console operations topic.

Important

To protect your AccessKey pair, we recommend that you configure the AccessKey ID by using the key management method. For more information, see Manage variables and keys.

password

The password that is used to access the database. Enter the AccessKey secret of your Alibaba Cloud account.

String

Yes

No default value

For more information, see Console operations.

Important

To protect your AccessKey pair, we recommend that you configure the AccessKey secret by using the key management method. For more information, see Manage variables and keys.

endpoint

The endpoint of Hologres.

String

Yes

No default value

For more information, see Endpoints for connecting to Hologres.

jdbcRetryCount

The maximum number of retries allowed to read and write data if a connection failure occurs.

Integer

No

10

N/A.

jdbcRetrySleepInitMs

The fixed waiting period for each retry.

Long

No

1000

Unit: milliseconds. The actual wait time before a retry is calculated by using the following formula: Value of the jdbcRetrySleepInitMs parameter + Number of retries up to the current retry × Value of the jdbcRetrySleepStepMs parameter.

jdbcRetrySleepStepMs

The incremental wait time that is added for each additional retry.

Long

No

5000

Unit: milliseconds. The actual wait time before a retry is calculated by using the following formula: Value of the jdbcRetrySleepInitMs parameter + Number of retries up to the current retry × Value of the jdbcRetrySleepStepMs parameter.
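For example, with the default values, the wait time before each retry grows linearly. The following sink fragment is a minimal sketch; the endpoint and credential values are placeholders, and the comments apply the formula above to the defaults:

```yaml
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # With these defaults, the wait before retry n is 1000 + n × 5000 ms:
  # retry 1 -> 6000 ms, retry 2 -> 11000 ms, retry 3 -> 16000 ms.
  jdbcRetryCount: 10
  jdbcRetrySleepInitMs: 1000
  jdbcRetrySleepStepMs: 5000
```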

jdbcConnectionMaxIdleMs

The maximum duration for which the Java Database Connectivity (JDBC) connection is idle.

Long

No

60000

Unit: millisecond. If a JDBC connection stays idle for a period of time that exceeds the value of this parameter, the connection is closed and released.

jdbcMetaCacheTTL

The maximum time for storing the TableSchema information in the cache.

Long

No

60000

Unit: millisecond.

jdbcMetaAutoRefreshFactor

The factor for triggering automatic cache refresh. If the remaining time for storing data in the cache is less than the time for triggering an automatic refresh of the cache, Realtime Compute for Apache Flink automatically refreshes the cache.

Integer

No

4

The remaining time for storing data in the cache is calculated by using the following formula: Remaining time for storing data in the cache = Maximum time that is allowed to store data in the cache - Duration for which data is cached. After the cache is automatically refreshed, the duration for which data is cached is recalculated from 0.

The time for triggering automatic cache refresh is calculated by using the following formula: Time for triggering an automatic refresh of the cache = Value of the jdbcMetaCacheTTL parameter/Value of the jdbcMetaAutoRefreshFactor parameter.
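For example, with the default values, the formulas above give a fixed refresh threshold. The following sketch shows the two cache parameters with the arithmetic worked out in comments (endpoint and database values are placeholders):

```yaml
sink:
  type: hologres
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  # Refresh threshold = jdbcMetaCacheTTL / jdbcMetaAutoRefreshFactor
  # = 60000 / 4 = 15000 ms. A cached TableSchema entry is refreshed
  # when less than 15000 ms of its 60000 ms TTL remains.
  jdbcMetaCacheTTL: 60000
  jdbcMetaAutoRefreshFactor: 4
```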

mutatetype

The data writing mode.

String

No

INSERT_OR_UPDATE

If primary keys are configured in the Hologres physical table, the Hologres streaming sink uses the exactly-once semantics based on the primary keys. If multiple records with the same primary key are written to the table, you must specify the mutatetype parameter to determine how the sink table is updated. Valid values of the mutatetype parameter:

  • INSERT_OR_IGNORE: retains the first record and ignores subsequent records that have the same primary key.

  • INSERT_OR_REPLACE: replaces the entire existing row with the record that arrives later.

  • INSERT_OR_UPDATE: updates only the columns that are written. For example, a table contains fields a, b, c, and d. The a field is the primary key and only data in the a and b fields is written to Hologres. If duplicate primary keys exist, the system updates only data in the b field. Data in the c and d fields remains unchanged.
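For example, to keep only the first record for each primary key, set the mutatetype parameter in the sink. The following fragment is a sketch; the endpoint and credential values are placeholders:

```yaml
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Retain the first record for each primary key and ignore later duplicates.
  mutatetype: INSERT_OR_IGNORE
```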

createparttable

Specifies whether to automatically create a partitioned table to which data is written based on partition values.

Boolean

No

false

N/A.

sink.delete-strategy

The operation that is performed when a retraction message is received.

String

No

No default value

Valid values:

  • IGNORE_DELETE: ignores UPDATE BEFORE and DELETE messages. This operation applies to scenarios in which data is inserted or updated and no data deletion is required.

  • CHANGELOG_STANDARD: The framework of Realtime Compute for Apache Flink runs based on the principles of Flink SQL changelogs. Delete operations are not ignored. If an update operation is performed, the original data is deleted before new data is inserted. This ensures data accuracy. This operation applies to scenarios in which partial updates are not involved.
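For example, in an append-style scenario where deletes from the source must not remove rows from the warehouse, you can set the parameter as follows (a sketch; endpoint and database values are placeholders):

```yaml
sink:
  type: hologres
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  # Ignore UPDATE BEFORE and DELETE messages so that rows are never
  # deleted from the Hologres sink table.
  sink.delete-strategy: IGNORE_DELETE
```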

jdbcWriteBatchSize

The maximum number of data rows that can be processed by a Hologres streaming sink node at the same time when a JDBC driver is used.

Integer

No

256

Unit: row.

Note

You need to specify only one of the following parameters: jdbcWriteBatchSize, jdbcWriteBatchByteSize, and jdbcWriteFlushInterval. If you specify more than one of these parameters, the system writes data to the Hologres sink table when any of the related conditions is met.

jdbcWriteBatchByteSize

The maximum number of bytes of data that can be processed by a Hologres streaming sink node at the same time when a JDBC driver is used.

Long

No

2097152 (2 × 1024 × 1024 bytes = 2 MB)

Note

You need to specify only one of the following parameters: jdbcWriteBatchSize, jdbcWriteBatchByteSize, and jdbcWriteFlushInterval. If you specify more than one of these parameters, the system writes data to the Hologres sink table when any of the related conditions is met.

jdbcWriteFlushInterval

The maximum time to wait before a Hologres streaming sink node flushes buffered rows to Hologres when a JDBC driver is used.

Long

No

10000

Unit: millisecond.

Note

You need to specify only one of the following parameters: jdbcWriteBatchSize, jdbcWriteBatchByteSize, and jdbcWriteFlushInterval. If you specify more than one of these parameters, the system writes data to the Hologres sink table when any of the related conditions is met.
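The three thresholds can be tuned together; a batch is flushed as soon as any one of them is reached. The following fragment is a sketch with illustrative values (endpoint and database values are placeholders):

```yaml
sink:
  type: hologres
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  # Flush a batch when any condition is met: 512 rows buffered,
  # 2 MB buffered, or 5 seconds elapsed since the last flush.
  jdbcWriteBatchSize: 512
  jdbcWriteBatchByteSize: 2097152
  jdbcWriteFlushInterval: 5000
```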

ignoreNullWhenUpdate

Specifies whether to ignore the null value that is written in the data if mutatetype='insertOrUpdate' is specified.

Boolean

No

false

Valid values:

  • false: The null value is written to a Hologres sink table. This is the default value.

  • true: The null value that is written in the data is ignored.

jdbcEnableDefaultForNotNullColumn

Specifies whether to allow the Hologres connector to fill a default value if a null value is written to a non-null column for which the default value is not configured in the Hologres table.

Boolean

No

true

Valid values:

  • true: The Hologres connector fills a default value. This is the default value. If you set this parameter to true, the system converts a null value into a default value based on the following rules:

    • If the column is of the STRING type, the null value is converted into an empty string.

    • If the column is of the NUMBER data type, the null value is converted into 0.

    • If the column is of the DATE, TIMESTAMP, or TIMESTAMPTZ type, the null value is converted into 1970-01-01 00:00:00.

  • false: The Hologres connector does not fill a default value. If a null value is written to a non-null column for which the default value is not configured in the Hologres table, an error is returned.
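If you prefer deployments to fail fast on unexpected null values rather than silently substituting defaults, you can disable this behavior. A sketch (endpoint and database values are placeholders):

```yaml
sink:
  type: hologres
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  # Return an error instead of converting null values into defaults
  # when writing to non-null columns that have no configured default.
  jdbcEnableDefaultForNotNullColumn: false
```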

remove-u0000-in-text.enabled

Specifies whether to allow the Hologres connector to remove the invalid characters \u0000 from data of the STRING type that is written to the sink table if the data contains the invalid characters \u0000.

Boolean

No

false

Valid values:

  • false: The Hologres connector does not process data. If the data that is written to the sink table contains dirty data, the "ERROR: invalid byte sequence for encoding "UTF8": 0x00" error message may be reported. This is the default value.

    In this case, you need to remove the dirty data from the source table in advance or define the dirty data processing logic in the SQL statement.

  • true: The Hologres connector removes the invalid characters \u0000 from the data of the STRING type to prevent the error message from being reported.

deduplication.enabled

Specifies whether to perform deduplication when data is written in batches in jdbc or jdbc_fixed mode.

Boolean

No

true

Valid values:

  • true: If the data that is written in batches contains data that has the same primary key, deduplication is performed. Only the last data record is retained. This is the default value. In this example, two fields are used and the first field is the primary key.

    • If the INSERT (1,'a') and INSERT (1,'b') records are written to the sink table in chronological order, only the record (1,'b') that arrives later is retained and written to the Hologres sink table after deduplication is performed.

    • The (1,'a') record already exists in the Hologres sink table. In this case, the DELETE (1,'a') and INSERT (1,'b') records are written to the sink table in chronological order. Only the (1,'b') record that arrives later is retained and written to the Hologres sink table. In this case, data is directly updated rather than deleted and then inserted into the table.

  • false: No deduplication is performed when data is written in batches. If a record to be inserted has the same primary key as a record in the current batch, the current batch is flushed first, and then the new record is written.
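If every intermediate change must reach Hologres, for example when downstream consumers rely on the full change history, deduplication can be turned off. A sketch (endpoint and database values are placeholders):

```yaml
sink:
  type: hologres
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  # Write every record in arrival order instead of keeping only the
  # last record for each primary key within a batch.
  deduplication.enabled: false
```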

sink.type-normalize-strategy

The data mapping policy.

String

No

STANDARD

The mapping policy that is used to convert the upstream data type into a Hologres data type in Hologres sink tables. Valid values:

  • STANDARD: converts data of the Flink CDC type into data of the Hologres type based on the standard conversion rules.

  • BROADEN: converts data of the Flink CDC type into data of the Hologres type. Hologres data types are wider than Flink CDC data types.

  • ONLY_BIGINT_OR_TEXT: converts all data of the Flink CDC data type into data of the BIGINT or STRING type in Hologres.

table_property.*

The properties of a Hologres physical table.

String

No

No default value

When you create a Hologres table, you can configure physical table properties in the WITH clause. You can configure table property settings based on your business requirements to sort and query data in an efficient manner.

Warning

By default, the value of table_property.distribution_key is the primary key. If you set this property to another value, data may be written incorrectly.
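For example, the following sketch sets properties on the physical tables that the connector creates. The property names orientation and time_to_live_in_seconds are standard Hologres table properties and are used here only as an illustration; endpoint and database values are placeholders:

```yaml
sink:
  type: hologres
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  # Create physical tables with column-oriented storage and a 7-day TTL
  # (604800 seconds). Adjust to your workload before use.
  table_property.orientation: column
  table_property.time_to_live_in_seconds: "604800"
```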

Data type mappings

You can use the sink.type-normalize-strategy parameter to specify a policy for converting the upstream data type into a Hologres data type.

Note
  • The first time you start a YAML deployment, we recommend that you specify sink.type-normalize-strategy. If you specify sink.type-normalize-strategy after the deployment is started, you must delete the downstream tables and restart the deployment without states to allow the policy to take effect.

  • Only the following array data types are supported: INTEGER, BIGINT, FLOAT, DOUBLE, BOOLEAN, CHAR, and VARCHAR.

  • The NUMERIC data type cannot be used as the data type of a primary key in Hologres. If the data type of a primary key is mapped to the NUMERIC data type, the data type of the primary key is converted into the VARCHAR data type.

STANDARD

The following table describes the data type mappings if sink.type-normalize-strategy is set to STANDARD.

Flink CDC data type → Hologres data type

CHAR → bpchar
STRING → text
VARCHAR → varchar; text when the data length is greater than 10,485,760 bytes
BOOLEAN → bool
BINARY → bytea
VARBINARY → bytea
DECIMAL → numeric
TINYINT → int2
SMALLINT → int2
INTEGER → int4
BIGINT → int8
FLOAT → float4
DOUBLE → float8
DATE → date
TIME_WITHOUT_TIME_ZONE → time
TIMESTAMP_WITHOUT_TIME_ZONE → timestamp
TIMESTAMP_WITH_LOCAL_TIME_ZONE → timestamptz
ARRAY → supported array data types
MAP → not supported
ROW → not supported

BROADEN

If you set sink.type-normalize-strategy to BROADEN, data of the Flink CDC type is converted into data of a Hologres data type. Hologres data types are wider than Flink CDC data types. The following table describes the data type mappings.

Flink CDC data type → Hologres data type

CHAR → text
STRING → text
VARCHAR → text
BOOLEAN → bool
BINARY → bytea
VARBINARY → bytea
DECIMAL → numeric
TINYINT → int8
SMALLINT → int8
INTEGER → int8
BIGINT → int8
FLOAT → float8
DOUBLE → float8
DATE → date
TIME_WITHOUT_TIME_ZONE → time
TIMESTAMP_WITHOUT_TIME_ZONE → timestamp
TIMESTAMP_WITH_LOCAL_TIME_ZONE → timestamptz
ARRAY → supported array data types
MAP → not supported
ROW → not supported

ONLY_BIGINT_OR_TEXT

If you set sink.type-normalize-strategy to ONLY_BIGINT_OR_TEXT, all data of the Flink CDC data type is converted into data of the BIGINT or STRING type in Hologres. The following table describes the data type mappings.

Flink CDC data type → Hologres data type

TINYINT → int8
SMALLINT → int8
INTEGER → int8
BIGINT → int8
BOOLEAN → text
BINARY → text
VARBINARY → text
DECIMAL → text
FLOAT → text
DOUBLE → text
DATE → text
TIME_WITHOUT_TIME_ZONE → text
TIMESTAMP_WITHOUT_TIME_ZONE → text
TIMESTAMP_WITH_LOCAL_TIME_ZONE → text
ARRAY → supported array data types
MAP → not supported
ROW → not supported

Writing data to a partitioned table

Hologres sinks allow you to write data to a Hologres partitioned table. You can use the sinks with the transform module to write upstream data to a Hologres partitioned table. Take note of the following points:

  • The partition key must be included in the primary key. If one of the upstream non-primary keys is used as the partition key, the upstream and downstream primary keys may be inconsistent. This may cause data inconsistency between the upstream and downstream tables during data synchronization.

  • Columns of the TEXT, VARCHAR, and INT data types can be used as partition keys. In Hologres V1.3.22 and later, the columns of the DATE type can also be used as partition keys.

  • You must set createparttable to true to allow the system to automatically create child partitioned tables. Otherwise, you must manually create child partitioned tables.

To view an example on how to write data to a partitioned table, see Writing data to a partitioned table.

Synchronization of table schema changes

You can use the schema.change.behavior parameter in a CDC pipeline deployment in YAML format to specify the policy for synchronizing table schema changes. Valid values of schema.change.behavior: IGNORE, LENIENT, TRY_EVOLVE, EVOLVE, and EXCEPTION. Hologres sinks do not support the TRY_EVOLVE policy. If you use the LENIENT or EVOLVE policy to synchronize table schema changes, the schema of the sink table may change. The following sections provide information about how to use the LENIENT or EVOLVE policy to handle different schema change events.

LENIENT (default value)

If you set schema.change.behavior to LENIENT, table schema changes are synchronized based on the description of the following operations:

  • Add a nullable column: The system automatically adds the column to the end of the schema of the sink table and synchronizes data to the new column.

  • Delete a nullable column: The system automatically fills null values in the nullable column of the sink table instead of deleting the column from the table.

  • Add a non-nullable column: The system automatically adds the column to the end of the schema of the sink table as a nullable column and synchronizes the data of the new column. Rows that were written before the column was added contain null values in the new column.

  • Rename a column: Renaming a column is handled as adding a column and deleting a column. After a column is renamed in the source table, the column that uses the new name is added to the end of the sink table, and the column that uses the original name is filled with null values. For example, if the name of the col_a column in the source table is changed to col_b, the col_b column is added to the end of the sink table and the col_a column is automatically filled with null values.

  • Change the data type of a column: not supported. Hologres does not support changing the data type of a column. If upstream data types may change, specify the sink.type-normalize-strategy parameter to map them to wider Hologres types in advance.

  • The following schema changes are not supported:

    • Change of constraints, such as the primary key or index

    • Deletion of a non-nullable column

    • Change from not null to nullable

EVOLVE

If you set schema.change.behavior to EVOLVE, table schema changes are synchronized based on the description of the following operations:

  • Add a nullable column: supported.

  • Delete a nullable column: not supported.

  • Add a non-nullable column: A nullable column is added to the sink table.

  • Rename a column: supported. The column is renamed in the sink table.

  • Change the data type of a column: not supported. Hologres does not support column data type changes. You must use Hologres with sink.type-normalize-strategy.

  • The following schema changes are not supported:

    • Change of constraints, such as the primary key or index

    • Deletion of a non-nullable column

    • Change from not null to nullable

Warning

If the sink table of a deployment in which the EVOLVE value is used is not deleted and the deployment is restarted without states, the deployment may fail due to schema inconsistency between upstream tables and sink tables. In this case, you must manually change the schema of the sink tables.

For more information about how to enable the EVOLVE mode, see Enabling the EVOLVE mode.

Sample code

Conversion of Flink CDC data types into Hologres data types

You can use the sink.type-normalize-strategy parameter to convert Flink CDC data types into Hologres data types. Hologres data types are wider than the Flink CDC data types. Sample code:

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

Writing data to a partitioned table

You can convert the upstream field create_time of the timestamp data type into a field of the date data type, and use the new field as the partition key of the Hologres table. Sample code:

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  createparttable: true
 
transform:
  - source-table: test_db.test_source_table
    projection: \*, DATE_FORMAT(CAST(create_time AS TIMESTAMP), 'yyyy-MM-dd') as partition_key
    primary-keys: id, create_time, partition_key
    partition-keys: partition_key
    description: add partition key 

pipeline:
  name: MySQL to Hologres Pipeline

Enabling the EVOLVE mode

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  createparttable: true

pipeline:
  name: MySQL to Hologres Pipeline
  schema.change.behavior: evolve

Synchronization of data from a single table in a database

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

Synchronization of data from all tables in a database

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

Merging and synchronization of multiple tables in a sharded database

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
  
route:
  - source-table: test_db.user\.*
    sink-table: test_db.user

pipeline:
  name: MySQL to Hologres Pipeline

Synchronization of the schema of a specific table

A schema in Hologres corresponds to a database in MySQL. You can use a route rule to synchronize source tables to a specific schema in the destination Hologres database. Sample code:

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
  
route:
  - source-table: test_db.user\.*
    sink-table: test_db2.user\.*r

pipeline:
  name: MySQL to Hologres Pipeline

Synchronization of a new table without restarting a deployment

If you want to synchronize new tables in real time without restarting a deployment, set scan.binlog.newly-added-table.enabled to true. Sample code:

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
  scan.binlog.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

Synchronization of an existing table after a deployment is restarted

If you want to synchronize an existing table in a deployment, set scan.newly-added-table.enabled to true and restart the deployment.

Warning

If a deployment uses the scan.binlog.newly-added-table.enabled = true configuration to synchronize new tables, do not also synchronize an existing table by setting scan.newly-added-table.enabled = true and restarting the deployment. Otherwise, duplicate data may be synchronized.

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
  scan.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

Excluding specific tables during synchronization of data from all tables in a database

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  tables.exclude: test_db.table1
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

References