All Products
Search
Document Center

DataHub plug-in for Canal

Last Updated: Aug 06, 2021

Canal is used to subscribe to and consume incremental data by parsing incremental logs of MySQL databases. In the early days, Alibaba Group needed to synchronize data between the data centers in Hangzhou and the United States. The implementation method was to obtain incremental changes based on business triggers. Since 2010, Alibaba Group began to obtain incremental changes by parsing database logs. Such transformation boosts the subscription to and consumption of the incremental data in databases. Canal supports source databases whose engines are MySQL 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.

1. Background information

Note

Canal is used to subscribe to and consume incremental data by parsing incremental logs of MySQL databases. In the early days, Alibaba Group needed to synchronize data between the data centers in Hangzhou and the United States. The implementation method was to obtain incremental changes based on business triggers. Since 2010, Alibaba Group began to obtain incremental changes by parsing database logs. Such transformation boosts the subscription to and consumption of the incremental data in databases. Canal supports source databases whose engines are MySQL 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.

Canal allows you to write data to Kafka, and DataHub is compatible with the Kafka protocol. Therefore, you can use Canal to write incremental data from MySQL to DataHub. To ensure that Canal can write data to DataHub like Kakfa, the following necessary changes have been made to the open source Canal framework:

  • Kafka TopicName corresponds to ProjectName.TopicName in DataHub. Therefore, the logic used to replace the period (.) in Kafka TopicNamewith the underscore (_) is removed from the open source Canal framework. This change ensures that Kafka TopicName can be mapped to the correct DataHub topic.

  • DataHub uses PLAIN Simple Authentication and Security Layer (SASL) for authentication. Therefore, the environment variable -Djava.security.auth.login.config=$kafka_jaas_conf is added to the startup script.

2. Instructions

This topic provides a basic example on how to use Canal to write data to DataHub like Kafka. For more information about parameters and parameter descriptions, see canal.

1. Download the canal.deployer package

Download the canal.deployer-1.1.5-SNAPSHOT.tar.gz package. Canal that has not been modified for DataHub may not be able to write data to DataHub.

2. Copy the canal.deployer package to a fixed directory and decompress the package.

mkdir -p /usr/local/canal
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal

3.Modify parameters

3.1 Modify the instance configuration file conf/example/instance.properties

# Modify the database information as needed.
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# The username and password of the database.
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=test_project.test_topic
# Specify a dynamic topic based on the database name or table name.
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
# Database name.Table name: the unique primary key. Separate multiple table names with commas (,).
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################

The MySQL database whose IP address is specified must be initialized and configured. For more information, see QuickStart. For more information about the dynamic topic name based on the database name and the settings of the primary hash key, see mq-related parameters.

3.2 Modify the Canal configuration file conf/canal.properties

# ...
canal.serverMode = kafka
# ...
kafka.bootstrap.servers = dh-cn-hangzhou.aliyuncs.com:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.security.protocol = SASL_SSL
kafka.sasl.mechanism = PLAIN

You must set the canal.serverMode, kafka.bootstrap.servers, kafka.security.protocol, and kafka.sasl.mechanism parameters. You can also modify other parameters as required. The kafka.bootstrap.servers parameter specifies the endpoint of Kafka in the region where the destination topic resides. For more information about available endpoints of Kafka, see Compatibility with Kafka.

3.3 Modify the jass configuration file conf/kafka_client_producer_jaas.conf

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="accessId"
  password="accessKey";
};

4. Start and stop Canal

Before you start Canal, make sure that a DataHub topic is created. For more information about the requirements for the created topic, see Compatibility with Kafka.

4.1 Start Canal

cd /usr/local/canal/
sh bin/startup.sh

4.2 View logs

Run the vi logs/canal/canal.log command to view the canal.log file in the logs/canal/ directory.

2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

Run the vi logs/example/example.log command to view the logs of an instance.

2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

Run the vi logs/example/meta.log command to view metadata logs.

A record is generated for each insertion, deletion, and modification of the database in the meta.log file. You can view the meta.log file to check whether Canal has collected data.

tail -f example/meta.log
2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/127.0.0.1:3306]
2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:24:50.547 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[/127.0.0.1:3306]
2020-07-29 09:26:45.547 - clientId:1001 cursor:[log.000001,30143,1595986005000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:04.546 - clientId:1001 cursor:[log.000001,30467,1595986204000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:16.546 - clientId:1001 cursor:[log.000001,30734,1595986215000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:36.547 - clientId:1001 cursor:[log.000001,31001,1595986236000,1,] address[localhost/127.0.0.1:3306]

4.3 Stop Canal

cd /usr/local/canal/
sh bin/stop.sh

3. Examples

DataHub topic

The destination DataHub topic is of the TUPLE type and has the following schema:

+-------+------+----------+-------------+
| Index | name |   type   |  allow NULL |
+-------+------+----------+-------------+
|   0   |  key |  STRING  |     true    |
|   1   |  val |  STRING  |     true    |
+-------+------+----------+-------------+

MySQL

Schema of the source MySQL table

mysql> desc orders;
+-------+---------+------+-----+---------+-------+
| Field | Type    | Null | Key | Default | Extra |
+-------+---------+------+-----+---------+-------+
| oid   | int(11) | YES  |     | NULL    |       |
| pid   | int(11) | YES  |     | NULL    |       |
| num   | int(11) | YES  |     | NULL    |       |
+-------+---------+------+-----+---------+-------+
3 rows in set (0.00 sec)

Data

After the data is written to DataHub, the key field is null, and the value of the val field is a JSON string.

mysql> insert into orders values(1,2,3);

{
    "data":[
        {
            "oid":"1",
            "pid":"2",
            "num":"3"
        }
    ],
    "database":"ggtt",
    "es":1591092305000,
    "id":2,
    "isDdl":false,
    "mysqlType":{
        "oid":"int(11)",
        "pid":"int(11)",
        "num":"int(11)"
    },
    "old":null,
    "pkNames":null,
    "sql":"",
    "sqlType":{
        "oid":4,
        "pid":4,
        "num":4
    },
    "table":"orders",
    "ts":1591092305813,
    "type":"INSERT"
}