This topic describes how to use the Realtime Compute for Apache Flink console to quickly build a data synchronization job that ingests real-time log data from Kafka into Hologres.
Prerequisites
- Your RAM user or RAM role has the permissions required to access the Realtime Compute for Apache Flink console. For more information, see Permissions.
- You have created a Flink workspace. For more information, see Create a workspace.
- You have prepared the upstream and downstream storage:
  - You have created an ApsaraMQ for Kafka instance. For more information, see Step 2: Purchase and deploy an instance.
  - You have created a Hologres instance. For more information, see Purchase a Hologres instance.
  Note: Your ApsaraMQ for Kafka and Hologres instances must be in the same region and VPC as your Realtime Compute for Apache Flink workspace. Otherwise, you must establish network connectivity. For more information, see Access services across VPCs or Access the internet.
Step 1: Configure an IP whitelist
To allow Flink to access your Kafka and Hologres instances, add the CIDR block of your Flink workspace to the IP whitelists of both instances.
- Obtain the VPC CIDR block of your Flink workspace.
  - Log on to the Realtime Compute for Apache Flink console.
  - In the Actions column of the target workspace, open the Workspace Details dialog box.
  - In the Workspace Details dialog box, find the CIDR block of the vSwitch.
    The dialog box displays basic workspace information and a list of vSwitches. The CIDR block column shows the CIDR block for each availability zone. Record these CIDR blocks for the next steps.
- Add the CIDR block of the Flink workspace to the IP whitelist of your Kafka instance.
  Configure the IP whitelist for the VPC endpoint. For detailed steps, see Configure an IP whitelist. In the whitelist editing dialog box, click Add Whitelist IP and add the CIDR block.
- Add the CIDR block of the Flink workspace to the IP whitelist of your Hologres instance.
  Log on to your Hologres instance and configure its IP whitelist. For detailed steps, see IP whitelist. On the whitelist configuration page of the HoloWeb Security Center, enter the CIDR block in the IP Address field of the Edit IP Whitelist dialog box and click OK.
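The whitelist steps come down to one check: every vSwitch CIDR block that you recorded must fall inside an entry of each whitelist. The following Python sketch performs that check with the standard `ipaddress` module. The CIDR values and the helper name `is_covered` are hypothetical, and the sketch only detects coverage by a single whitelist entry, not coverage spread across several smaller entries.

```python
from __future__ import annotations

import ipaddress


def is_covered(cidr: str, whitelist: list[str]) -> bool:
    """Return True if a single whitelist entry contains the entire `cidr` block.

    Simplification: coverage split across several smaller entries is not detected.
    """
    network = ipaddress.ip_network(cidr, strict=False)
    for entry in whitelist:
        allowed = ipaddress.ip_network(entry, strict=False)
        # subnet_of() raises TypeError when mixing IPv4 and IPv6, so compare versions first.
        if allowed.version == network.version and network.subnet_of(allowed):
            return True
    return False


# Hypothetical values: vSwitch CIDR blocks from Workspace Details and a Kafka whitelist.
vswitch_cidrs = ["192.168.0.0/24", "192.168.1.0/24"]
kafka_whitelist = ["192.168.0.0/24"]

missing = [cidr for cidr in vswitch_cidrs if not is_covered(cidr, kafka_whitelist)]
print(missing)  # ['192.168.1.0/24']
```

Run the same check against the Hologres whitelist; any CIDR block left in `missing` still needs to be added.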
Step 2: Prepare Kafka test data
Use the Faker connector of Realtime Compute for Apache Flink to generate test data and write it to ApsaraMQ for Kafka.
- In the ApsaraMQ for Kafka console, create a topic named users.
  For more information, see Create a topic.
- Create a job that writes data to ApsaraMQ for Kafka.
  - Log on to the Realtime Compute for Apache Flink console.
  - In the Actions column of the target workspace, click Console to open the development console.
  - In the left-side navigation pane, choose .
  - Click the icon, and then click New Stream Draft. Enter a File Name and select an Engine Version.
    Realtime Compute for Apache Flink also provides code templates and data synchronization templates. Each template describes a specific use case and includes code samples and instructions. Click a template to quickly learn the features and syntax of Realtime Compute for Apache Flink and implement your business logic. For more information, see Code templates and Data synchronization templates.
    | Parameter | Description | Example |
    | --- | --- | --- |
    | File Name | The name of the job. Note: The job name must be unique within the current workspace. | flink-test |
    | Engine Version | The Flink engine version for the current job. Select a version labeled Recommended or Stable for higher reliability and performance. For more information about engine versions, see Release notes and Engine versions. | vvr-8.0.8-flink-1.17 |
  - Click Create.
- Write the SQL statements for the job.
  Copy the following code to the editor and modify the parameters for your environment.
  ```sql
  CREATE TEMPORARY TABLE source (
    id INT,
    first_name STRING,
    last_name STRING,
    `address` ROW<`country` STRING, `state` STRING, `city` STRING>,
    event_time TIMESTAMP
  ) WITH (
    'connector' = 'faker',
    'number-of-rows' = '100',
    'rows-per-second' = '10',
    'fields.id.expression' = '#{number.numberBetween ''0'',''1000''}',
    'fields.first_name.expression' = '#{name.firstName}',
    'fields.last_name.expression' = '#{name.lastName}',
    'fields.address.country.expression' = '#{Address.country}',
    'fields.address.state.expression' = '#{Address.state}',
    'fields.address.city.expression' = '#{Address.city}',
    'fields.event_time.expression' = '#{date.past ''15'',''SECONDS''}'
  );

  CREATE TEMPORARY TABLE sink (
    id INT,
    first_name STRING,
    last_name STRING,
    `address` ROW<`country` STRING, `state` STRING, `city` STRING>,
    `timestamp` TIMESTAMP METADATA
  ) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092',
    'topic' = 'users',
    'format' = 'json',
    'properties.enable.idempotence' = 'false'
  );

  INSERT INTO sink SELECT * FROM source;
  ```
  The following table describes the parameters that you need to modify.
  | Parameter | Example value | Description |
  | --- | --- | --- |
  | properties.bootstrap.servers | alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092 | The ApsaraMQ for Kafka broker endpoints, as a comma-separated list in host:port format. You can find the VPC Domain Name endpoint in the Endpoint Information section on the Instance Details page. |
  | topic | users | The ApsaraMQ for Kafka topic name. |
- Start the job.
  - On the draft editing page, click Deploy.
  - In the Deploy draft dialog box, click Confirm.
  - Configure resources for the job. For more information, see Configure resources for a job.
  - On the Deployments page, find the target deployment and click Start in the Actions column. For more information about the startup configuration, see Start a deployment.
  - On the Deployments page, monitor the runtime information and status of the deployment.
    Because the Faker source generates a bounded stream ('number-of-rows' = '100'), the deployment status changes to FINISHED about one minute after the deployment starts. When the deployment is finished, the data has been written to the users topic. The following sample shows the JSON-formatted data.
    ```json
    {
      "id": 765,
      "first_name": "Barry",
      "last_name": "Pollich",
      "address": {
        "country": "United Arab Emirates",
        "state": "Nevada",
        "city": "Powlowskifurt"
      }
    }
    ```
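The properties.bootstrap.servers value must be a comma-separated list of host:port entries. As an illustration only, the following Python sketch splits and validates such a string before you paste it into the WITH clause. The helper `parse_bootstrap_servers` is hypothetical and is not part of any Flink or Kafka API; it does not handle bracketed IPv6 addresses.

```python
from __future__ import annotations


def parse_bootstrap_servers(value: str) -> list[tuple[str, int]]:
    """Split a comma-separated 'host:port' list into (host, port) pairs.

    Raises ValueError for an empty entry, a missing host or port, or a port outside 1-65535.
    """
    servers = []
    for raw_entry in value.split(","):
        entry = raw_entry.strip()
        # rpartition keeps everything before the last ':' as the host.
        host, sep, port_text = entry.rpartition(":")
        if not entry or not sep or not host or not port_text.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        port = int(port_text)
        if not 1 <= port <= 65535:
            raise ValueError(f"port out of range in {entry!r}")
        servers.append((host, port))
    return servers


servers = parse_bootstrap_servers(
    "alikafka-host1.aliyuncs.com:9092,"
    "alikafka-host2.aliyuncs.com:9092,"
    "alikafka-host3.aliyuncs.com:9092"
)
print(len(servers), servers[0])  # 3 ('alikafka-host1.aliyuncs.com', 9092)
```

A value such as "alikafka-host1.aliyuncs.com" without a port raises ValueError instead of failing later at job startup.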
Step 3: Create and start a data sync job
Flink CDC
- Log on to the Realtime Compute for Apache Flink development console and create a data synchronization job.
  - Log on to the Realtime Compute for Apache Flink console.
  - In the Actions column for the target workspace, click Console.
  - In the left-side navigation pane, choose .
  - Click the icon and then click New ETL Draft. Enter a name and select an engine version.
    | Parameter | Description | Example |
    | --- | --- | --- |
    | Name | The name of the job. Note: The job name must be unique within the project. | flink-test |
    | Engine Version | The Flink engine version for the job. Select a version labeled Recommended or Stable. These versions provide higher reliability and performance. For more information about engine versions, see Release notes and Engine versions. | vvr-8.0.8-flink-1.17 |
  - Click Create.
- Write a Flink CDC job. Copy the following code to the editor and update the parameters for your environment.
  The following job synchronizes JSON-formatted data from the users topic in Kafka to the users table in the test_schema schema of the flink_test_db database in Hologres.
  ```yaml
  source:
    type: kafka
    name: Kafka Source
    properties.bootstrap.servers: alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092
    topic: users
    scan.startup.mode: earliest-offset
    value.format: json
    json.infer-schema.flatten-nested-columns.enable: true

  sink:
    type: hologres
    name: Hologres Sink
    endpoint: hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80
    dbname: flink_test_db
    # Quote the masked placeholders: an unquoted value that starts with * is a YAML alias.
    username: '******'
    password: '******'
    sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT

  transform:
    - source-table: \.*.\.*
      projection: \*
      primary-keys: id

  route:
    - source-table: users
      sink-table: test_schema.users
  ```
  The following table describes the parameters to modify.
  | Parameter | Example | Description |
  | --- | --- | --- |
  | properties.bootstrap.servers | alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092 | The addresses of the Kafka brokers, as a comma-separated list of host:port entries. You can obtain the Domain Name Endpoint for the VPC network type from the Network Information section on the instance details page. |
  | topic | users | The name of the Kafka topic. |
  | endpoint | hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80 | The endpoint of the Hologres instance, in the <host>:<port> format. You can obtain the VPC endpoint from the Network Information section on the instance details page in the Hologres console. |
  | username, password | ****** | The username and password for the Hologres database. Enter the AccessKey ID and AccessKey Secret of your Alibaba Cloud account. Important: To prevent your AccessKey pair from being leaked, use variable management to specify your AccessKey ID and AccessKey Secret. For more information, see Manage variables. |
  | dbname | flink_test_db | The name of the Hologres database. |
  | source-table | users | The source table. By default, the topic name is used as the table name. |
  | sink-table | test_schema.users | The destination table, in the schema.table_name format. |
- Click Save.
- On the draft editing page, click Deploy.
- On the Deployments page, find the target deployment and click Start in the Actions column. For more information about job startup configurations, see Start a job.
  After the job starts, you can view its runtime information and status on the Deployments page. This page lists deployments with metrics such as Status, health score, CPU, and memory, and provides actions such as Start and Stop.
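The sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT setting is why Step 4 shows id as BIGINT and the other columns as TEXT. The following Python sketch models that mapping under an assumption inferred from the strategy name and the Step 4 schema, not taken from the connector source: integer types widen to BIGINT and every other type is written as TEXT. Check the Hologres connector documentation for the exact rules. The helper names and the sample schema are hypothetical.

```python
from __future__ import annotations

# Assumption for illustration: integer types widen to BIGINT, all other types become TEXT.
INTEGER_TYPES = {"TINYINT", "SMALLINT", "INT", "INTEGER", "BIGINT"}


def normalize_type(flink_type: str) -> str:
    """Map a type string such as 'INT NOT NULL' or 'DECIMAL(10, 2)' to BIGINT or TEXT."""
    # Drop precision arguments and trailing constraints, keeping only the base type name.
    base = flink_type.strip().upper().split("(")[0].split()[0]
    return "BIGINT" if base in INTEGER_TYPES else "TEXT"


def normalize_schema(schema: dict[str, str]) -> dict[str, str]:
    """Apply normalize_type to every column of a {column_name: type} mapping."""
    return {column: normalize_type(flink_type) for column, flink_type in schema.items()}


# Hypothetical inferred schema for the users topic after nested columns are flattened.
inferred = {
    "id": "INT NOT NULL",
    "first_name": "STRING",
    "address.country": "STRING",
}
print(normalize_schema(inferred))
# {'id': 'BIGINT', 'first_name': 'TEXT', 'address.country': 'TEXT'}
```

Normalizing to a small set of wide types is what lets new columns and type changes in the JSON stream land in Hologres without manual DDL.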
SQL
- Log on to the Realtime Compute for Apache Flink development console and create a data synchronization job.
  - Log on to the Realtime Compute for Apache Flink console.
  - In the Actions column for the target workspace, click Console.
  - In the left-side navigation pane, choose , and then click New.
  - Click the icon and then click New Stream Draft. Enter a name and select an engine version.
    | Parameter | Description | Example |
    | --- | --- | --- |
    | Name | The name of the job. Note: The job name must be unique within the project. | flink-test |
    | Engine Version | The Flink engine version for the job. Select a version labeled Recommended or Stable. These versions provide higher reliability and performance. For more information about engine versions, see Release notes and Engine versions. | vvr-8.0.8-flink-1.17 |
  - Click Create.
- Write the SQL job. Copy the following code to the SQL editor and update the parameters for your environment.
  The following job uses an INSERT INTO statement to synchronize data from the users topic in Kafka to the users table in the flink_test_db database of Hologres.
  Because Hologres provides special optimizations for the JSON and JSONB data types, you can use an INSERT INTO statement to write nested JSON data to Hologres.
  With this method, you must create the users table in Hologres before you run the following SQL statements.
  ```sql
  CREATE TEMPORARY TABLE kafka_users (
    `id` INT NOT NULL,
    `address` STRING, -- The data in this column is nested JSON.
    `offset` BIGINT NOT NULL METADATA,
    `partition` BIGINT NOT NULL METADATA,
    `timestamp` TIMESTAMP METADATA,
    `date` AS CAST(`timestamp` AS DATE),
    `country` AS JSON_VALUE(`address`, '$.country')
  ) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092',
    'topic' = 'users',
    'format' = 'json',
    'json.infer-schema.flatten-nested-columns.enable' = 'true', -- Automatically expand nested columns.
    'scan.startup.mode' = 'earliest-offset'
  );

  CREATE TEMPORARY TABLE holo (
    `id` INT NOT NULL,
    `address` STRING,
    `offset` BIGINT,
    `partition` BIGINT,
    `timestamp` TIMESTAMP,
    `date` DATE,
    `country` STRING
  ) WITH (
    'connector' = 'hologres',
    'endpoint' = 'hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80',
    'username' = '******',
    'password' = '******',
    'dbname' = 'flink_test_db',
    'tablename' = 'users'
  );

  INSERT INTO holo SELECT * FROM kafka_users;
  ```
  The following table describes the parameters to modify.
  | Parameter | Example | Description |
  | --- | --- | --- |
  | properties.bootstrap.servers | alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092 | The addresses of the Kafka brokers, as a comma-separated list of host:port entries. You can obtain the Domain Name Endpoint for the VPC network type from the Network Information section on the instance details page. |
  | topic | users | The name of the Kafka topic. |
  | endpoint | hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80 | The endpoint of the Hologres instance, in the <host>:<port> format. You can obtain the VPC endpoint from the Network Information section on the instance details page in the Hologres console. |
  | username, password | ****** | The username and password for the Hologres database. Enter the AccessKey ID and AccessKey Secret of your Alibaba Cloud account. Important: To prevent your AccessKey pair from being leaked, use variable management to specify your AccessKey ID and AccessKey Secret. For more information, see Manage variables. |
  | dbname | flink_test_db | The name of the Hologres database. |
  | tablename | users | The name of the Hologres table. |

  Note:
  - If you use an INSERT INTO statement to synchronize data, you must create the users table and its columns in the destination database in advance.
  - If the table is not in the public schema, specify the tablename parameter in the schema.table_name format.
- Click Save.
- On the draft editing page, click Deploy.
- On the Deployments page, find the target deployment and click Start in the Actions column. For more information about job startup configurations, see Start a job.
  After the job starts, you can view its runtime information and status on the Deployments page. This page lists deployments with metrics such as Status, health score, CPU, and memory, and provides actions such as Start and Stop.
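To make the column mapping of the SQL job concrete, the following Python sketch approximates the row that `INSERT INTO holo SELECT * FROM kafka_users` writes for one Kafka record. It mirrors the computed columns (`CAST(timestamp AS DATE)` and `JSON_VALUE(address, '$.country')`) and keeps the address object as a compact JSON string. It is an approximation, not the connector's code; `to_holo_row` is a hypothetical helper, and the partition, offset, and timestamp values are made up. Note that first_name and last_name are not declared in kafka_users, so this job does not write them.

```python
from __future__ import annotations

import json
from datetime import datetime


def to_holo_row(value: bytes, partition: int, offset: int, timestamp: datetime) -> dict:
    """Approximate the Hologres row produced for one Kafka record.

    Only the columns declared in kafka_users are kept.
    """
    record = json.loads(value)
    address = record.get("address")
    return {
        "id": record["id"],
        # `address` STRING: the nested object is kept as one compact JSON string.
        "address": None if address is None else json.dumps(address, separators=(",", ":")),
        "offset": offset,          # Kafka metadata column
        "partition": partition,    # Kafka metadata column
        "timestamp": timestamp,    # Kafka metadata column
        "date": timestamp.date(),  # CAST(`timestamp` AS DATE)
        # JSON_VALUE(`address`, '$.country'); None when the path is absent.
        "country": address.get("country") if isinstance(address, dict) else None,
    }


# Sample value from Step 2; partition, offset, and timestamp are hypothetical.
sample = (
    b'{"id": 765, "first_name": "Barry", "last_name": "Pollich", '
    b'"address": {"country": "United Arab Emirates", "state": "Nevada", "city": "Powlowskifurt"}}'
)
row = to_holo_row(sample, partition=0, offset=12, timestamp=datetime(2024, 1, 1, 8, 30))
print(row["country"], row["date"])  # United Arab Emirates 2024-01-01
```

The users table that you create in Hologres in advance needs a column for each key of this row.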
Step 4: View the full synchronization result
- Log on to the Hologres console.
- On the Instances page, click the name of your target instance.
- In the upper-right corner of the page, click Connect to Instance.
- On the Metadata Management tab, view the schema and data of the synchronized users table in the flink_test_db database.
  In the left-side navigation tree, expand your instance name > flink_test_db > test_schema > Tables. The synchronized users table appears.
  The schema and data of the synchronized table are as follows.
  - Table schema
    Double-click the users table name to view the table schema.
    The users table contains the following fields: id (BIGINT, primary key), first_name (TEXT), last_name (TEXT), address.country (TEXT), address.state (TEXT), and address.city (TEXT).
    Note: During full data synchronization, we recommend that you use the Kafka metadata fields partition and offset as the primary key of the Hologres table. This prevents duplicate data if the job fails over and data is retransmitted.
  - Table data
    In the upper-right corner of the users table page, click Query table. Enter the following statement and click Run.
    ```sql
    SELECT * FROM test_schema.users;
    ```
    The query returns multiple rows, which confirms that the records were synchronized to the users table. Each row contains complete data for the id, first_name, last_name, address.country, address.state, and address.city columns.
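The row count that you see depends on the primary key. The Faker source emits 100 rows whose id comes from '#{number.numberBetween ''0'',''1000''}', and the Flink CDC job uses id as the primary key, so rows that share an id collapse into one row. Keying on partition and offset, as the note recommends, identifies each Kafka message uniquely, so a message replayed after a failover collides only with itself. The following Python sketch simulates a primary-key table in memory. Assumptions: ids are uniform over 1,000 values (an exclusive upper bound), the records are hypothetical, and the simulation keeps the last write, whereas Hologres may keep the first or the last row depending on its configured write mode.

```python
from __future__ import annotations

from collections.abc import Callable, Hashable, Iterable


def upsert_all(records: Iterable[dict], key: Callable[[dict], Hashable]) -> dict:
    """Simulate a primary-key table: a later record with the same key overwrites the earlier one."""
    table: dict = {}
    for record in records:
        table[key(record)] = record
    return table


def expected_distinct(n_draws: int, n_values: int) -> float:
    """Expected number of distinct keys after n_draws uniform draws (with replacement)
    from n_values possible keys."""
    return n_values * (1 - (1 - 1 / n_values) ** n_draws)


# Hypothetical Kafka records: (partition, offset) is unique per message, id is not.
batch = [
    {"partition": 0, "offset": 0, "id": 7},
    {"partition": 0, "offset": 1, "id": 7},
    {"partition": 1, "offset": 0, "id": 42},
]
replayed = batch + batch[:2]  # a failover re-delivers the first two messages

by_position = upsert_all(replayed, key=lambda r: (r["partition"], r["offset"]))
by_id = upsert_all(replayed, key=lambda r: r["id"])

print(len(by_position))  # 3: replayed messages overwrite only themselves
print(len(by_id))        # 2: different messages that share an id collapse into one row
print(round(expected_distinct(100, 1000), 1))  # 95.2
```

Under these assumptions, a table keyed on id holds about 95 rows rather than 100 after the full synchronization.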
Step 5: Observe automatic schema synchronization
- In the ApsaraMQ for Kafka console, manually send a message that contains a new column.
  - Log on to the ApsaraMQ for Kafka console.
  - On the Instances page, click the name of the target instance.
  - On the Topics page, click the name of the target topic (users).
  - Click Send Message.
  - Configure the message.
    In the Start to Send and Consume Message dialog box, configure the parameters as follows.
    | Parameter | Example |
    | --- | --- |
    | Method of Sending | Select Console. |
    | Message Key | Enter flinktest. |
    | Message Content | Copy and paste the following JSON content into the Message Content field. |
    | Send to Specified Partition | Select Yes. |
    | Partition ID | Enter 0. |

    Message Content:
    ```json
    {
      "id": 100001,
      "first_name": "Dennise",
      "last_name": "Schuppe",
      "address": {
        "country": "Isle of Man",
        "state": "Montana",
        "city": "East Coleburgh"
      },
      "house-points": {
        "house": "Pukwudgie",
        "points": 76
      }
    }
    ```
    Note: In this example, house-points is a new nested column.
  - Click OK.
- In the Hologres console, view the schema and data changes in the users table.
  - Log on to the Hologres console.
  - On the Instances page, click the name of the target instance.
  - In the upper-right corner of the page, click Connect to Instance.
  - On the Metadata Management tab, double-click the users table name.
  - Click Query table, enter the following statement, and then click Run.
    ```sql
    SELECT * FROM test_schema.users;
    ```
  - View the query results.
    The results show that the record with ID 100001 is written to Hologres and that two new columns, house-points.house and house-points.points, are added to the Hologres table.
    Note: The message sent to ApsaraMQ for Kafka contains only one new nested column: house-points. However, because json.infer-schema.flatten-nested-columns.enable is set to true in the source configuration of the job, Realtime Compute for Apache Flink automatically flattens this column and uses the access paths of the nested fields as the new column names.
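The note describes flattening in terms of nested field access paths. The following Python sketch reproduces that naming rule on the Step 5 message and lists the columns that would be added to the table. It approximates the behavior described above rather than the connector's implementation: arrays and name collisions are not modeled, and `flatten` and `new_columns` are hypothetical helpers.

```python
from __future__ import annotations

import json
from typing import Any


def flatten(record: dict[str, Any], prefix: str = "") -> dict[str, Any]:
    """Flatten nested JSON objects into dotted column names such as 'address.city'."""
    flat: dict[str, Any] = {}
    for key, value in record.items():
        path = f"{prefix}{key}"
        if isinstance(value, dict):
            # Recurse into nested objects, extending the access path.
            flat.update(flatten(value, prefix=f"{path}."))
        else:
            flat[path] = value
    return flat


def new_columns(existing: set[str], message: str) -> list[str]:
    """Return the flattened columns in `message` that the table does not have yet."""
    return sorted(set(flatten(json.loads(message))) - existing)


# Columns of test_schema.users after Step 4.
existing = {"id", "first_name", "last_name", "address.country", "address.state", "address.city"}
message = (
    '{"id": 100001, "first_name": "Dennise", "last_name": "Schuppe", '
    '"address": {"country": "Isle of Man", "state": "Montana", "city": "East Coleburgh"}, '
    '"house-points": {"house": "Pukwudgie", "points": 76}}'
)
print(new_columns(existing, message))
# ['house-points.house', 'house-points.points']
```

The two names printed match the columns that the query results show after the message is synchronized.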
References
- For information about the CREATE TABLE AS (CTAS) statement, see CREATE TABLE AS (CTAS) statement.
- For information about how to use Message Queue for Apache Kafka as a source table or result table, see Message Queue for Apache Kafka.
- For information about how to improve job performance by adjusting node parallelism and resources, see Configure job deployment.