This topic describes how to use the Hologres connector to synchronize data in a YAML deployment for data ingestion.
Background information
Hologres is an end-to-end real-time data warehouse service that allows you to write, update, and analyze large amounts of data in real time. Hologres is compatible with the PostgreSQL protocol and supports standard SQL syntax. Hologres supports online analytical processing (OLAP) and ad hoc queries on petabytes of data, and provides high-concurrency, low-latency online data services. Hologres is seamlessly integrated with MaxCompute, Realtime Compute for Apache Flink, and DataWorks, and provides full-stack online and offline data warehouse solutions. The following table describes the capabilities of the Hologres connector that is used in YAML deployments.
Item | Description |
Table type | Data ingestion sink |
Running mode | Streaming mode and batch mode |
Data format | N/A |
Metric | Note For more information about the metrics, see Metrics. |
API type | YAML API |
Data update or deletion in a sink table | Supported |
Features
Feature | Description |
Synchronization of full and incremental data | Synchronizes full data and incremental data from all tables in a database or multiple tables in a database to each related sink table. |
Synchronization of table schema changes | Synchronizes schema changes in each source table, such as an added, deleted, or renamed column, to the related sink table in real time during database synchronization. |
Merging and synchronization of multiple tables in a sharded database | Allows you to use regular expressions to specify database names to match the source tables in multiple database shards of the data source. After the data of the source tables is merged, the data is synchronized to a downstream sink table with a name that corresponds to each source table. |
Writing data to a partitioned table | Allows you to write data from an upstream table to a Hologres partitioned table. |
Multiple data type mapping policies | Uses multiple mapping policies to map upstream data types to Hologres data types, which are wider than Flink Change Data Capture (CDC) data types. |
Syntax
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
Parameters
Parameter | Description | Data type | Required | Default value | Remarks |
type | The sink type. | String | Yes | No default value | Set the value to hologres. |
name | The sink name. | String | No | No default value | N/A. |
dbname | The database name. | String | Yes | No default value | N/A. |
username | The username that is used to access the database. Enter the AccessKey ID of your Alibaba Cloud account. | String | Yes | No default value | For more information about how to obtain the AccessKey ID of your Alibaba Cloud account, see the "How do I view information about the AccessKey ID and AccessKey secret of the account?" section of the Console operations topic. Important To protect your AccessKey pair, we recommend that you configure the AccessKey ID by using the key management method. For more information, see Manage variables and keys. |
password | The password that is used to access the database. Enter the AccessKey secret of your Alibaba Cloud account. | String | Yes | No default value | For more information, see Console operations. Important To protect your AccessKey pair, we recommend that you configure the AccessKey secret by using the key management method. For more information, see Manage variables and keys. |
endpoint | The endpoint of Hologres. | String | Yes | No default value | For more information, see Endpoints for connecting to Hologres. |
jdbcRetryCount | The maximum number of retries allowed to read and write data if a connection failure occurs. | Integer | No | 10 | N/A. |
jdbcRetrySleepInitMs | The initial wait time for a retry. | Long | No | 1000 | Unit: milliseconds. The actual wait time for a retry is calculated by using the following formula: Retry wait time = Value of the jdbcRetrySleepInitMs parameter + Number of retries × Value of the jdbcRetrySleepStepMs parameter. |
jdbcRetrySleepStepMs | The incremental wait time that is added for each retry. | Long | No | 5000 | Unit: milliseconds. The actual wait time for a retry is calculated by using the formula that is described for the jdbcRetrySleepInitMs parameter. |
jdbcConnectionMaxIdleMs | The maximum duration for which a Java Database Connectivity (JDBC) connection can stay idle. | Long | No | 60000 | Unit: milliseconds. If a JDBC connection stays idle for a period of time that exceeds the value of this parameter, the connection is closed and released. |
jdbcMetaCacheTTL | The maximum duration for which TableSchema information is stored in the cache. | Long | No | 60000 | Unit: milliseconds. |
jdbcMetaAutoRefreshFactor | The factor for triggering automatic cache refresh. If the remaining time for storing data in the cache is less than the time for triggering an automatic refresh of the cache, Realtime Compute for Apache Flink automatically refreshes the cache. | Integer | No | 4 | The remaining time for storing data in the cache is calculated by using the following formula: Remaining time for storing data in the cache = Maximum time that is allowed to store data in the cache - Duration for which data is cached. After the cache is automatically refreshed, the duration for which data is cached is recalculated from 0. The time for triggering automatic cache refresh is calculated by using the following formula: Time for triggering an automatic refresh of the cache = Value of the jdbcMetaCacheTTL parameter/Value of the jdbcMetaAutoRefreshFactor parameter. |
mutatetype | The data writing mode. | String | No | INSERT_OR_UPDATE | If primary keys are configured in the Hologres physical table, the Hologres streaming sink uses the exactly-once semantics based on the primary keys. If multiple records that have the same primary key are written to the table, the mutatetype parameter determines how the sink table is updated. Valid values: INSERT_OR_IGNORE: retains the first record and discards subsequent records that have the same primary key. INSERT_OR_UPDATE: updates the columns of the existing record with the columns that are written. INSERT_OR_REPLACE: replaces the entire existing record. |
createparttable | Specifies whether to automatically create a partitioned table to which data is written based on partition values. | Boolean | No | false | N/A. |
sink.delete-strategy | The operation that is performed when a retraction message is received. | String | No | No default value | N/A. |
jdbcWriteBatchSize | The maximum number of data rows that can be processed by a Hologres streaming sink node at the same time when a JDBC driver is used. | Integer | No | 256 | Unit: row. Note You can specify only one of the following parameters: jdbcWriteBatchSize, jdbcWriteBatchByteSize, and jdbcWriteFlushInterval. If you specify all preceding parameters, the system writes data to a Hologres sink table when one of the related conditions is met. |
jdbcWriteBatchByteSize | The maximum number of bytes of data that can be processed by a Hologres streaming sink node at the same time when a JDBC driver is used. | Long | No | 2097152 (2 × 1024 × 1024 bytes = 2 MB) | Note You can specify only one of the following parameters: jdbcWriteBatchSize, jdbcWriteBatchByteSize, and jdbcWriteFlushInterval. If you specify all the preceding parameters, the system writes data to a Hologres sink table when one of the related conditions is met. |
jdbcWriteFlushInterval | The maximum period of time required to wait for a Hologres streaming sink node to write data from multiple rows to Hologres at the same time when a JDBC driver is used. | Long | No | 10000 | Unit: millisecond. Note You can specify only one of the following parameters: jdbcWriteBatchSize, jdbcWriteBatchByteSize, and jdbcWriteFlushInterval. If you specify all the preceding parameters, the system writes data to a Hologres sink table when one of the related conditions is met. |
ignoreNullWhenUpdate | Specifies whether to ignore null values that are written in the data if mutatetype='insertOrUpdate' is specified. | Boolean | No | false | Valid values: true: Null values are ignored and do not overwrite the existing data. false: Null values overwrite the existing data. |
jdbcEnableDefaultForNotNullColumn | Specifies whether to allow the Hologres connector to fill a default value if a null value is written to a non-null column for which no default value is configured in the Hologres table. | Boolean | No | true | Valid values: true: The connector fills a default value when a null value is written to such a column. false: An error is returned when a null value is written to such a column. |
remove-u0000-in-text.enabled | Specifies whether to allow the Hologres connector to remove the invalid character \u0000 from data of the STRING type before the data is written to the sink table. | Boolean | No | false | Valid values: true: The connector removes the \u0000 characters before the data is written. false: The connector does not remove the characters. In this case, an error may occur when data that contains \u0000 is written. |
deduplication.enabled | Specifies whether to perform deduplication when data is written in batches in jdbc or jdbc_fixed mode. | Boolean | No | true | Valid values: true: If multiple records in the same batch have the same primary key, only the last record is retained. false: Deduplication is not performed within a batch. |
sink.type-normalize-strategy | The data type mapping policy that is used to convert upstream data types into Hologres data types in Hologres sink tables. | String | No | STANDARD | Valid values: STANDARD, BROADEN, and ONLY_BIGINT_OR_TEXT. For more information, see Data type mappings. |
table_property.* | The properties of a Hologres physical table. | String | No | No default value | When you create a Hologres table, you can configure physical table properties in the WITH clause. You can configure table property settings based on your business requirements to sort and query data in an efficient manner. Warning By default, the value of table_property.distribution_key is the primary key value. If you set this parameter to another value, the written data may be invalid. |
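The retry and cache-refresh timing described in the preceding table can be sketched as follows. This is a minimal Python illustration; the function names are hypothetical, and the actual calculation happens inside the connector:

```python
# Sketch of the documented timing formulas (hypothetical helper names;
# the real calculation is performed internally by the connector).

def retry_wait_ms(retry_count, init_ms=1000, step_ms=5000):
    """Wait time before a retry:
    jdbcRetrySleepInitMs + number of retries x jdbcRetrySleepStepMs."""
    return init_ms + retry_count * step_ms

def refresh_threshold_ms(ttl_ms=60000, factor=4):
    """An automatic cache refresh is triggered when the remaining cache
    time drops below jdbcMetaCacheTTL / jdbcMetaAutoRefreshFactor."""
    return ttl_ms / factor

# With the default values, the third retry waits 1000 + 3 x 5000 = 16000 ms,
# and the schema cache is refreshed once less than 15000 ms remain.
print(retry_wait_ms(3))        # 16000
print(refresh_threshold_ms())  # 15000.0
```

With the default parameter values, retries therefore back off linearly rather than exponentially.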
Data type mappings
You can use the sink.type-normalize-strategy parameter to specify a policy for converting upstream data types into Hologres data types.
We recommend that you specify sink.type-normalize-strategy the first time you start a YAML deployment. If you specify sink.type-normalize-strategy after the deployment is started, you must delete the downstream tables and restart the deployment without states for the policy to take effect.
Only the following array data types are supported: INTEGER, BIGINT, FLOAT, DOUBLE, BOOLEAN, CHAR, and VARCHAR.
The NUMERIC data type cannot be used as the data type of a primary key in Hologres. If the data type of a primary key is mapped to the NUMERIC data type, the data type of the primary key is converted into the VARCHAR data type.
STANDARD
The following table describes the data type mappings if sink.type-normalize-strategy is set to STANDARD.
Flink CDC data type | Hologres data type |
CHAR | bpchar |
STRING | text |
VARCHAR | text (when the data length is greater than 10,485,760 bytes) |
varchar (when the data length is less than or equal to 10,485,760 bytes) | |
BOOLEAN | bool |
BINARY | bytea |
VARBINARY | |
DECIMAL | numeric |
TINYINT | int2 |
SMALLINT | |
INTEGER | int4 |
BIGINT | int8 |
FLOAT | float4 |
DOUBLE | float8 |
DATE | date |
TIME_WITHOUT_TIME_ZONE | time |
TIMESTAMP_WITHOUT_TIME_ZONE | timestamp |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | timestamptz |
ARRAY | Supported array data types |
MAP | Not supported |
ROW | Not supported |
BROADEN
If you set sink.type-normalize-strategy to BROADEN, each Flink CDC data type is converted into a Hologres data type that is wider than the source data type. The following table describes the data type mappings.
Flink CDC data type | Hologres data type |
CHAR | text |
STRING | |
VARCHAR | |
BOOLEAN | bool |
BINARY | bytea |
VARBINARY | |
DECIMAL | numeric |
TINYINT | int8 |
SMALLINT | |
INTEGER | |
BIGINT | |
FLOAT | float8 |
DOUBLE | |
DATE | date |
TIME_WITHOUT_TIME_ZONE | time |
TIMESTAMP_WITHOUT_TIME_ZONE | timestamp |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | timestamptz |
ARRAY | Supported array data types |
MAP | Not supported |
ROW | Not supported |
ONLY_BIGINT_OR_TEXT
If you set sink.type-normalize-strategy to ONLY_BIGINT_OR_TEXT, all Flink CDC data types are converted into the Hologres int8 or text data type. The following table describes the data type mappings.
Flink CDC data type | Hologres data type |
TINYINT | int8 |
SMALLINT | |
INTEGER | |
BIGINT | |
BOOLEAN | text |
BINARY | |
VARBINARY | |
DECIMAL | |
FLOAT | |
DOUBLE | |
DATE | |
TIME_WITHOUT_TIME_ZONE | |
TIMESTAMP_WITHOUT_TIME_ZONE | |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | |
ARRAY | Supported array data types |
MAP | Not supported |
ROW | Not supported |
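For a quick way to reason about the preceding tables, the BROADEN and ONLY_BIGINT_OR_TEXT mappings can be expressed as a small lookup. This is a hypothetical Python sketch that mirrors the tables above; it is not part of the connector:

```python
# Illustrative lookup of the BROADEN mapping table (scalar types only).
BROADEN = {
    "CHAR": "text", "STRING": "text", "VARCHAR": "text",
    "BOOLEAN": "bool",
    "BINARY": "bytea", "VARBINARY": "bytea",
    "DECIMAL": "numeric",
    "TINYINT": "int8", "SMALLINT": "int8", "INTEGER": "int8", "BIGINT": "int8",
    "FLOAT": "float8", "DOUBLE": "float8",
    "DATE": "date",
    "TIME_WITHOUT_TIME_ZONE": "time",
    "TIMESTAMP_WITHOUT_TIME_ZONE": "timestamp",
    "TIMESTAMP_WITH_LOCAL_TIME_ZONE": "timestamptz",
}

def map_type(flink_type, strategy="BROADEN"):
    """Map a Flink CDC scalar type to a Hologres type per the tables above."""
    if strategy == "ONLY_BIGINT_OR_TEXT":
        # Integral types become int8; every other type becomes text.
        ints = {"TINYINT", "SMALLINT", "INTEGER", "BIGINT"}
        return "int8" if flink_type in ints else "text"
    return BROADEN[flink_type]

print(map_type("SMALLINT"))                       # int8
print(map_type("DOUBLE", "ONLY_BIGINT_OR_TEXT"))  # text
```

Note how BROADEN collapses all integral types to int8 and both floating-point types to float8, which is what makes the mapping robust to upstream type widening.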
Writing data to a partitioned table
Hologres sinks allow you to write data to a Hologres partitioned table. You can use a sink together with the transform module to write upstream data to a Hologres partitioned table. Take note of the following points:
The partition key must be included in the primary key. If one of the upstream non-primary keys is used as the partition key, the upstream and downstream primary keys may be inconsistent. This may cause data inconsistency between the upstream and downstream tables during data synchronization.
Columns of the TEXT, VARCHAR, and INT data types can be used as partition keys. In Hologres V1.3.22 and later, the columns of the DATE type can also be used as partition keys.
You must set createparttable to true to allow the system to automatically create child partitioned tables. Otherwise, you must manually create child partitioned tables.
To view an example on how to write data to a partitioned table, see Writing data to a partitioned table.
Synchronization of table schema changes
You can use the schema.change.behavior parameter in a CDC pipeline deployment in the YAML format to specify the policy for synchronizing table schema changes. Valid values of schema.change.behavior: IGNORE, LENIENT, TRY_EVOLVE, EVOLVE, and EXCEPTION. Hologres sinks do not support the TRY_EVOLVE policy. If you use the LENIENT or EVOLVE policy to synchronize table schema changes, the schema of the sink table may change. The following sections describe how the LENIENT and EVOLVE policies handle different schema change events.
LENIENT (default value)
If you set schema.change.behavior to LENIENT, table schema changes are synchronized based on the description of the following operations:
Add a nullable column: The connector automatically adds the column to the end of the sink table schema and synchronizes data to the added column.
Delete a nullable column: The connector automatically fills null values in the column of the sink table instead of deleting the column from the table.
Add a non-nullable column: The connector automatically adds the column to the end of the sink table schema as a nullable column and synchronizes the data of the new column. Rows that were written before the column was added are filled with null values.
Rename a column: Renaming is handled as adding a column and deleting a column. After a column is renamed in the source table, a column that uses the new name is added to the end of the sink table, and the column that uses the original name is filled with null values. For example, if the col_a column in the source table is renamed col_b, the col_b column is added to the end of the sink table and the col_a column is filled with null values.
Change the data type of a column: not supported. Hologres does not support changing the data type of a column. To widen data types in advance, specify the sink.type-normalize-strategy parameter.
The following schema changes are not supported:
Change of constraints, such as the primary key or index
Deletion of a non-nullable column
Change from not null to nullable
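The rename behavior under LENIENT can be pictured with a toy simulation. The helper below is hypothetical and only mirrors the col_a/col_b example above; the connector performs this mapping internally:

```python
# Toy simulation of how LENIENT treats a column rename: the new name is
# appended to the sink schema, and the old column is kept but filled
# with NULLs from then on. (Illustrative only; not connector code.)

def apply_lenient_rename(sink_columns, old_name, new_name):
    columns = list(sink_columns)
    if new_name not in columns:
        columns.append(new_name)  # renamed column is added at the end
    # old_name stays in the schema; its values are written as NULL
    return columns

print(apply_lenient_rename(["id", "col_a"], "col_a", "col_b"))
# ['id', 'col_a', 'col_b']
```

This is why, under LENIENT, a sink table can accumulate columns over time even though the source table keeps a fixed number of columns.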
EVOLVE
If you set schema.change.behavior to EVOLVE, table schema changes are synchronized based on the description of the following operations:
Add a nullable column: supported.
Delete a nullable column: not supported.
Add a non-nullable column: A nullable column is added to the sink table.
Rename a column: supported. The column is renamed in the sink table.
Change the data type of a column: not supported. Hologres does not support changing the data type of a column. To widen data types in advance, specify the sink.type-normalize-strategy parameter.
The following schema changes are not supported:
Change of constraints, such as the primary key or index
Deletion of a non-nullable column
Change from not null to nullable
If a deployment that uses the EVOLVE policy is restarted without states but its sink tables are not deleted, the deployment may fail due to schema inconsistency between the upstream tables and the sink tables. In this case, you must manually change the schema of the sink tables.
For more information about how to enable the EVOLVE mode, see Enabling the EVOLVE mode.
Sample code
Conversion of Flink CDC data types into Hologres data types
You can use the sink.type-normalize-strategy parameter to convert Flink CDC data types into wider Hologres data types. Sample code:
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
Writing data to a partitioned table
You can convert the upstream create_time field of the TIMESTAMP data type into a date-formatted string and use the new field as the partition key of the Hologres table. Sample code:
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
createparttable: true
transform:
- source-table: test_db.test_source_table
projection: \*, DATE_FORMAT(CAST(create_time AS TIMESTAMP), 'yyyy-MM-dd') as partition_key
primary-keys: id, create_time, partition_key
partition-keys: partition_key
description: add partition key
pipeline:
name: MySQL to Hologres Pipeline
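In the transform above, DATE_FORMAT(CAST(create_time AS TIMESTAMP), 'yyyy-MM-dd') derives a day-level partition value from the timestamp. The equivalent conversion, sketched in Python for illustration only:

```python
from datetime import datetime

# Equivalent of DATE_FORMAT(CAST(create_time AS TIMESTAMP), 'yyyy-MM-dd'):
# derive a day-level partition value from an upstream timestamp.
def partition_key(create_time: datetime) -> str:
    return create_time.strftime("%Y-%m-%d")

print(partition_key(datetime(2024, 5, 17, 13, 45, 0)))  # 2024-05-17
```

Because every row in one day maps to the same string, the sink can route all of that day's data into one child partitioned table.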
Enabling the EVOLVE mode
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
createparttable: true
pipeline:
name: MySQL to Hologres Pipeline
schema.change.behavior: evolve
Synchronization of data from a single table in a database
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
Synchronization of data from all tables in a database
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
Merging and synchronization of multiple tables in a sharded database
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.user\.*
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
route:
- source-table: test_db.user\.*
sink-table: test_db.user
pipeline:
name: MySQL to Hologres Pipeline
Synchronization of the schema of a specific table
A schema in Hologres corresponds to a database in MySQL. You can use a route to synchronize source tables to sink tables in a specific Hologres schema. Sample code:
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.user\.*
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
route:
- source-table: test_db.user\.*
sink-table: test_db2.user\.*r
pipeline:
name: MySQL to Hologres Pipeline
Synchronization of a new table without restarting a deployment
If you want to synchronize a new table without restarting a deployment, set scan.binlog.newly-added-table.enabled to true. Sample code:
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
server-id: 5401-5499
scan.binlog.newly-added-table.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
Synchronization of an existing table after a deployment is restarted
If you want to synchronize a table that already exists in the source database after a deployment is started, set scan.newly-added-table.enabled to true and restart the deployment.
If a deployment already uses the scan.binlog.newly-added-table.enabled = true configuration to synchronize new tables, do not synchronize an existing table by setting scan.newly-added-table.enabled = true and restarting the deployment. Otherwise, duplicate data may be sent.
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
server-id: 5401-5499
scan.newly-added-table.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
Excluding specific tables during synchronization of data from all tables in a database
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
tables.exclude: test_db.table1
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
References
For more information about sources, sinks, transforms, and routes for data ingestion, see Data ingestion development references.
For more information about how to develop a YAML draft for data ingestion, see Develop a YAML draft for data ingestion (public preview).