すべてのプロダクト
Search
ドキュメントセンター

DataHub:Canal 用 DataHub プラグイン

最終更新日:Jan 12, 2025

Canal は、MySQL データベースの増分ログを解析することにより、増分データのサブスクライブと消費に使用されます。初期の頃、Alibaba グループは、杭州と米国のデータセンター間でデータを同期する必要がありました。実装方法は、ビジネストリガーに基づいて増分変更を取得することでした。 2010 年以降、Alibaba グループはデータベースログを解析することにより増分変更を取得し始めました。このような変換により、データベースにおける増分データのサブスクライブと消費が促進されます。 Canal は、エンジンが MySQL 5.1.x、5.5.x、5.6.x、5.7.x、および 8.0.x であるソースデータベースをサポートしています。

背景情報

説明

Canal は、MySQL データベースの増分ログを解析することにより、増分データのサブスクライブと消費に使用されます。初期の頃、Alibaba グループは、杭州と米国のデータセンター間でデータを同期する必要がありました。実装方法は、ビジネストリガーに基づいて増分変更を取得することでした。 2010 年以降、Alibaba グループはデータベースログを解析することにより増分変更を取得し始めました。このような変換により、データベースにおける増分データのサブスクライブと消費が促進されます。 Canal は、エンジンが MySQL 5.1.x、5.5.x、5.6.x、5.7.x、および 8.0.x であるソースデータベースをサポートしています。

Canal を使用すると Kafka にデータを書き込むことができ、DataHub は Kafka プロトコルと互換性があります。したがって、Canal を使用して MySQL から DataHub に増分データを書き込むことができます。 Canal が Kafka にデータを書き込むことができるように Canal が DataHub にデータを書き込むことができるようにするために、オープンソースの Canal フレームワークに次の必要な変更が加えられました。

  • Kafka の TopicName は、DataHub の ProjectName.TopicName に対応しています。したがって、Kafka の TopicName のピリオド(.)をアンダースコア(_)に置き換えるロジックは、オープンソースの Canal フレームワークから削除されました。この変更により、Kafka の TopicName を正しい DataHub トピックにマッピングできます。

  • DataHub は、認証に PLAIN Simple Authentication and Security Layer(SASL)を使用します。したがって、環境変数 -Djava.security.auth.login.config=$kafka_jaas_conf がスタートアップスクリプトに追加されます。

手順

このトピックでは、Kafka のように Canal を使用して DataHub にデータを書き込む方法の基本的な例を示します。パラメーターとパラメーターの説明の詳細については、「canal」をご参照ください。

1. canal.deployer パッケージをダウンロードする

canal.deployer-1.1.5-SNAPSHOT.tar.gz パッケージをダウンロードします。 DataHub 用に変更されていない Canal は、DataHub にデータを書き込めない可能性があります。

2. canal.deployer パッケージを固定ディレクトリにコピーし、パッケージを解凍する

mkdir -p /usr/local/canal
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal

3. パラメーターを変更する

3.1 インスタンス構成ファイルの変更conf/example/instance.properties

# 必要に応じてデータベース情報を変更します。
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password: データベースのユーザー名とパスワード。
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=test_project.test_topic
# データベース名またはテーブル名に基づいて動的トピックを指定します。
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# ハッシュパーティション設定
#canal.mq.partitionsNum=3
# データベース名.テーブル名: 一意のプライマリキー。複数のテーブルはカンマ(,)で区切ります。
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################

IP アドレスが指定されている MySQL データベースを初期化および構成する必要があります。詳細については、「QuickStart」をご参照ください。データベース名に基づく動的トピック名とプライマリハッシュキーの設定の詳細については、「MQ 関連パラメーター」をご参照ください。

3.2 Canal構成ファイルの変更 conf/canal.properties

# ...
canal.serverMode = kafka
# ...
kafka.bootstrap.servers = dh-cn-hangzhou.aliyuncs.com:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.security.protocol = SASL_SSL
kafka.sasl.mechanism = PLAIN

canal.serverMode、kafka.bootstrap.servers、kafka.security.protocol、および kafka.sasl.mechanism パラメーターを構成する必要があります。必要に応じて他のパラメーターも変更できます。 kafka.bootstrap.servers パラメーターは、宛先トピックが存在するリージョンの Kafka のエンドポイントを指定します。 Kafka の使用可能なエンドポイントの詳細については、「Kafka との互換性」をご参照ください。

3.3 JASS構成ファイルの変更 conf/kafka_client_producer_jaas.conf

kafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="accessId"
  password="accessKey";
};

4. Canal を有効化および無効化する

Canal を有効にする前に、DataHub トピックが作成されていることを確認してください。作成されたトピックの要件の詳細については、「Kafka との互換性」をご参照ください。

4.1 Canal を有効にする

cd /usr/local/canal/
sh bin/startup.sh

4.2 ログを表示する

vi logs/canal/canal.log コマンドを実行して、logs/canal/ ディレクトリにある canal.log ファイルを表示します。

2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## Canal サーバーを起動します。
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## Canal サーバー [10.1.29.120:11111] を起動します。
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## Canal サーバーが実行中です ......

vi logs/example/example.log コマンドを実行して、インスタンスのログを表示します。

2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - クラスパスリソース [canal.properties] からプロパティファイルをロードしています。
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - クラスパスリソース [example/instance.properties] からプロパティファイルをロードしています。
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - 1-example の CannalInstance を起動します。
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - 起動に成功しました....

vi logs/example/meta.log コマンドを実行して、メタデータログを表示します。

meta.log ファイルには、データベースの挿入、削除、変更ごとにレコードが生成されます。 meta.log ファイルを表示して、Canal がデータを収集したかどうかを確認できます。

tail -f example/meta.log
2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/127.0.0.1:3306]
2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:24:50.547 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[/127.0.0.1:3306]
2020-07-29 09:26:45.547 - clientId:1001 cursor:[log.000001,30143,1595986005000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:04.546 - clientId:1001 cursor:[log.000001,30467,1595986204000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:16.546 - clientId:1001 cursor:[log.000001,30734,1595986215000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:36.547 - clientId:1001 cursor:[log.000001,31001,1595986236000,1,] address[localhost/127.0.0.1:3306]

4.3 Canal を無効にする

cd /usr/local/canal/
sh bin/stop.sh

DataHub トピック

宛先 DataHub トピックは TUPLE タイプで、次のスキーマを持ちます。

+-------+------+----------+-------------+
| Index | name |   type   |  allow NULL |
+-------+------+----------+-------------+
|   0   |  key |  STRING  |     true    |
|   1   |  val |  STRING  |     true    |
+-------+------+----------+-------------+

MySQL

ソース MySQL テーブルのスキーマ

mysql> desc orders;
+-------+---------+------+-----+---------+-------+
| Field | Type    | Null | Key | Default | Extra |
+-------+---------+------+-----+---------+-------+
| oid   | int(11) | YES  |     | NULL    |       |
| pid   | int(11) | YES  |     | NULL    |       |
| num   | int(11) | YES  |     | NULL    |       |
+-------+---------+------+-----+---------+-------+
3 rows in set (0.00 sec)

データ

データが DataHub に書き込まれた後、key フィールドは null になり、val フィールドの値は JSON 文字列になります。

mysql> insert into orders values(1,2,3);

{
    "data":[
        {
            "oid":"1",
            "pid":"2",
            "num":"3"
        }
    ],
    "database":"ggtt",
    "es":1591092305000,
    "id":2,
    "isDdl":false,
    "mysqlType":{
        "oid":"int(11)",
        "pid":"int(11)",
        "num":"int(11)"
    },
    "old":null,
    "pkNames":null,
    "sql":"",
    "sqlType":{
        "oid":4,
        "pid":4,
        "num":4
    },
    "table":"orders",
    "ts":1591092305813,
    "type":"INSERT"
}