This topic describes common scenarios and best practices for using YAML deployments for data ingestion, to help you quickly create data synchronization tasks.
Synchronize all data in a MySQL database to a Hologres database
You can synchronize data to Hologres to build a real-time data warehouse by using a YAML deployment for data ingestion. This helps implement efficient and scalable real-time data processing and analysis based on the powerful real-time data processing capabilities of Realtime Compute for Apache Flink and the capabilities of Hologres, such as binary logging, hybrid row-column storage, and strong resource isolation.
The following sample code shows how to use a YAML deployment for data ingestion to synchronize all data in a MySQL database to a Hologres database:
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN
Use more data type mappings
The Hologres connector cannot change the data type of an existing column, but it supports wider data type mappings from MySQL to Hologres. By mapping several MySQL types to one broader Hologres type, the connector can absorb type changes in the data source and skip unnecessary column type changes, so deployments keep running as expected. You can specify the sink.type-normalize-strategy parameter to control the mapping. The default value of this parameter is STANDARD. For more information, see Use the Hologres connector in a YAML deployment for data ingestion (public preview).
If you set this parameter to ONLY_BIGINT_OR_TEXT, all MySQL data types are mapped to the INT8 or TEXT type in Hologres. For example, both the MySQL INT and BIGINT types are mapped to the Hologres INT8 type. If the data type of a MySQL column is changed from INT to BIGINT, the deployment can process the change without reporting errors.
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
Write data to a partitioned table
If you use the Hologres connector in a YAML deployment for data ingestion, you can write data to Hologres partitioned tables. For more information, see the "Writing data to a partitioned table" section of the Use the Hologres connector in a YAML deployment for data ingestion (public preview) topic.
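The following sample code is a minimal sketch of a configuration that writes to a Hologres partitioned table. The table name holo_test.orders and the sink.create-partition-table option are illustrative assumptions: confirm the exact option name and its prerequisites, such as the partition key being part of the primary key, in the referenced topic.
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  # Illustrative source table whose destination table in Hologres is partitioned.
  tables: holo_test.orders
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  # Assumed option name: automatically create partition child tables before
  # data is written. See the referenced topic for the exact parameter.
  sink.create-partition-table: true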
Synchronize all data in a MySQL database to Kafka
You can use an SQL deployment to synchronize data from all tables in a MySQL database to Kafka. For more information, see Synchronize data from all tables in a MySQL database to Kafka. You can also use a YAML deployment for data ingestion to synchronize data.
For example, a MySQL database named kafka_test contains two tables: customers and products. The following sample code shows how to separately synchronize data from the two tables to the customers and products topics.
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: kafka_test.\.*
  server-id: 8601-8604

sink:
  type: upsert-kafka
  name: Upsert Kafka Sink
  properties.bootstrap.servers: ${upsert.kafka.bootstraps.server}
  aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak}
  aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk}
  aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid}
  aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint}
  aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid}

route:
  - source-table: kafka_test.customers
    sink-table: customers
  - source-table: kafka_test.products
    sink-table: products
If you do not use the route module, topics are created in Kafka with names in the database.table format. For example, the MySQL table kafka_test.customers corresponds to a Kafka topic named kafka_test.customers.
If you use Message Queue for Apache Kafka, you must configure the following parameters: aliyun.kafka.accessKeyId, aliyun.kafka.accessKeySecret, aliyun.kafka.instanceId, aliyun.kafka.endpoint, and aliyun.kafka.regionId. By default, Alibaba Cloud Message Queue for Apache Kafka does not automatically create topics. For more information, see FAQ about automatic topic creation. You must create the topics before synchronization. For more information about how to create a topic, see Step 3: Create resources.
The data written to the destination topics by the YAML deployment for data ingestion is not the raw binary logs. You can read the data from these topics by using the Upsert Kafka connector in a Flink SQL deployment.
Synchronize the binary logs from MySQL to Kafka
When you synchronize all data in a MySQL database to Kafka, the Kafka topics store snapshots of the MySQL tables rather than the change history. In some scenarios, you need to store the raw binary logs to facilitate data auditing and data replay.
You can use a YAML deployment for data ingestion to synchronize the raw binary logs from MySQL to Kafka. This allows the binary log data to be read in a distributed manner and helps alleviate data hotspot issues.
For example, a MySQL database named kafka_test contains two tables: customers and products. The following sample code shows how to separately synchronize data from the two tables to the customers and products topics.
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: kafka_test.\.*
  server-id: 8601-8604
  metadata-column.include-list: op_ts

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  properties.enable.idempotence: false

route:
  - source-table: kafka_test.customers
    sink-table: customers
  - source-table: kafka_test.products
    sink-table: products
The following sample code shows the message body of a Kafka message generated by an UPDATE statement in the customers table:
// debezium-json
{
  "before": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "2222",
    "age": 12
  },
  "after": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "1234",
    "age": 12
  },
  "op": "u",
  "source": {
    "db": null,
    "table": "customers",
    "ts_ms": 1728528674000
  }
}

// canal-json
{
  "old": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "2222",
      "age": 12
    }
  ],
  "data": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "1234",
      "age": 12
    }
  ],
  "type": "UPDATE",
  "database": null,
  "table": "customers",
  "pkNames": [
    "id"
  ],
  "ts": 1728528674000
}
If the route module is configured, the database name field (db in debezium-json, database in canal-json) is null in the JSON data that is written to Kafka.
You can write binary logs to a Kafka topic in the canal-json or debezium-json format. The default format is debezium-json. For more information, see Kafka connector.
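For example, the following sink configuration is a sketch based on the preceding example that writes change records in the canal-json format instead of the default debezium-json, assuming the value.format option described in the Kafka connector topic:
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  properties.enable.idempotence: false
  # Write change records in canal-json instead of the default debezium-json.
  value.format: canal-json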
By default, all data is written to Partition 0 of a topic. You can set the partition.strategy parameter to adjust the Kafka partitioning strategy. For more information, see Kafka connector. The following sample code shows how to write data to partitions based on the hash values of primary keys, so that data with the same primary key is written to the same partition and stays in order:
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: kafka_test.\.*
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  properties.enable.idempotence: false
  partition.strategy: hash-by-key
Message Queue for Apache Kafka does not support idempotent or transactional write operations. When you use a YAML deployment for data ingestion to synchronize data to Message Queue for Apache Kafka, you must set the properties.enable.idempotence parameter to false to disable the idempotent write feature for the sink.
If you want to write the data of all tables to a single Kafka topic, you can set the topic parameter to specify the topic to which data is written. In this case, you do not need to configure a route module. The following sample code shows how to write all data to the kafka_test topic:
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: kafka_test.\.*
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  properties.enable.idempotence: false
  topic: kafka_test
Synchronize Kafka data to Hologres
After you synchronize all data in a MySQL database to Kafka or synchronize the MySQL binary logs to Kafka, the MySQL data is stored in Kafka. You can use a YAML deployment for data ingestion to synchronize the Kafka data to Hologres to build a real-time data warehouse.
For example, the inventory topic in Kafka contains the data of two tables, customers and products, in the debezium-json format. The following sample code shows how to synchronize data from the two tables to the corresponding tables in Hologres:
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT

transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
The Kafka data source can read data in the canal-json or debezium-json (default) format.
If the data format is debezium-json, you must manually specify a primary key for the table by using a rule in the transform module. Sample code:
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: key1, key2
If the data of a single table is distributed across multiple partitions, or you need to merge the data of a table from different partitions, set debezium-json.distributed-tables or canal-json.distributed-tables to true.
The Kafka data source supports multiple schema inference policies. You can specify schema inference policies by using schema.inference.strategy. For more information about schema inference and change synchronization policies, see Kafka connector.
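As a sketch, the policy can be set directly on the Kafka source together with the options shown above. The value STATIC is only a placeholder for one of the supported strategies; see the Kafka connector topic for the values that are actually supported:
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  # Placeholder value; see the Kafka connector topic for supported strategies.
  schema.inference.strategy: STATIC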
Schema change
You can use a YAML deployment for data ingestion to automatically synchronize the schema changes of a data source to a sink. To prevent high-risk operations, such as table deletion and table clearing, from affecting the sink, you can set the schema.change.behavior parameter in the pipeline module to specify how schema changes are processed. The default value of the schema.change.behavior parameter is LENIENT, which indicates that deleting or clearing downstream tables is not allowed and specific schema changes are supported to ensure data integrity. For more information, see the "Schema change processing methods" section of the Data ingestion development references topic.
The following sample code shows how to set the value of the schema.change.behavior parameter to EVOLVE:
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres yaml job
  schema.change.behavior: EVOLVE
If you set the value of the schema.change.behavior parameter to EVOLVE, both the DROP TABLE and TRUNCATE TABLE operations are directly synchronized to the sink.
To flexibly manage schema changes, you can set the include.schema.changes and exclude.schema.changes parameters in the sink module to apply or exclude specific schema changes. For more information, see the "Management of schema changes received by the sink" section of the Data ingestion development references topic.
The following sample code shows how to skip the operations of deleting tables, deleting columns, and clearing tables when you synchronize data:
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN
  exclude.schema.changes: [drop, truncate.table]

pipeline:
  name: MySQL to Hologres yaml job
  schema.change.behavior: EVOLVE
Table adding
In a YAML deployment for data ingestion, you can add tables in the following scenarios:
The new table is an empty table that contains no historical data, and data is written to the table only after it is created.
The table already exists before you run the deployment, and you need to synchronize its historical data.
The new table is an empty table with no historical data
In this scenario, an empty table that matches the table configuration of the deployment is created in the source database while the deployment is running, and no historical data needs to be synchronized. YAML deployments for data ingestion can capture such tables without a restart. To enable this, set the scan.binlog.newly-added-table.enabled parameter to true for the MySQL data source.
For example, a MySQL database contains a table named customers. After you run a YAML deployment for data ingestion, a table named products is created in the MySQL database. The following sample code shows how to synchronize the products table without restarting the deployment:
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604
  scan.binlog.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN
After the YAML deployment is started, tables that are newly created in the holo_test database are automatically created in the sink, and their new data is synchronized.
The table already exists before you run the deployment and historical data synchronization is required
For example, a MySQL database contains the customers and products tables. The following sample code shows how to synchronize only the customers table when you start the deployment:
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.customers
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN
After the deployment has been running for a period of time, you need to synchronize all tables in the database, including their historical data. To do this, perform the following steps:
Create a savepoint and stop the deployment.
Add the tables that you want to match to the MySQL source configuration and set the scan.newly-added-table.enabled parameter to true.
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604
  scan.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN
Restart the deployment from the created savepoint.
You cannot set both the scan.binlog.newly-added-table.enabled and scan.newly-added-table.enabled parameters to true at the same time.
Add metadata columns
You can use the transform module to add metadata columns to the data that is written to the sink. In the following sample code, data is synchronized to Hologres, and the change type of each data change record is written to the sink as an additional column. For more information about metadata columns, see Metadata columns.
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.customers
  server-id: 8601-8604

transform:
  - source-table: holo_test.customers
    projection: \*, __data_event_type__ as op
    description: add op

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
Start a deployment at a specific timestamp
When you start a stateless YAML deployment for data ingestion, you can specify the start time of the data source. This allows you to read data at a specific position in binary logs.
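As a sketch, the start position can also be expressed in the source configuration, assuming the MySQL data source supports the scan.startup.mode and scan.startup.timestamp-millis options of the Flink CDC MySQL connector. If you start the deployment from the console, you can specify the start time there instead.
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604
  # Assumed options: start reading the binary log from a specific timestamp
  # (in milliseconds) instead of taking a full snapshot first.
  scan.startup.mode: timestamp
  scan.startup.timestamp-millis: 1728528674000

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}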