Kafka自訂連接器可以從Kafka叢集中提取資料或將Kafka資料推送到其他系統,本文介紹Kafka自訂連接器中的基本概念和串連配置。
基本概念
Connectors
Connectors位於邏輯抽象層,用於指定資料來源及資料目標。負責將資料從源服務中複製到Kafka的Topic或將資料從Topic複製到目標服務。
Tasks
Tasks是無狀態的邏輯執行單元,每個Connector執行個體會協同管理多個Tasks用於進行資料轉送。
Workers
Workers是用於承載Connectors執行個體和Tasks線程啟動並執行進程,支援兩種運行模式:單機模式(Standalone mode)和分布式模式(Distributed mode),一個Worker進程中可以運行多個Connector執行個體線程和Task線程。
- Standalone mode:只有一個獨立的Worker,所有工作都在此Worker中進行,不具備容錯性。
- Distributed mode:具備可擴充性和自動容錯能力,可以啟動多個Worker,這些Worker採用同一個group.id,構成Worker叢集,通過自動協調(類似Kafka的消費組中的協調均衡機制),在多個Worker之間調度執行Connectors和Tasks。如果增加Worker、關閉Worker或某個Worker進程意外失敗,其他Workers將會檢測到這種變化進行自動協調(Rebalance),重新分配Connectors以及Tasks。當Workers處於Distributed mode模式時,需要配置如下配置項:
- plugin.path:Kafka Connect架構層的配置項,主要在Kafka Connect啟動時用於定址Connectors可執行內容,這種可執行內容有如下兩種形式。可配置多個路徑,不同路徑之間用逗號分隔。例如:
/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors其中,前一個目錄包含Plugin及其第三方依賴所有類檔案的uber JAR。後一個目錄包含Plugin所需的所有JAR以及第三方依賴。
- group.id(預設connect-cluster):Connect Cluster Group使用的唯一名稱,不能和Consumer Group ID(消費者組)衝突。
- config.storage.topic:用於儲存Connector和任務配置的Topic,這是一個含有單個分區、多個副本的Topic,您需要手動建立此Topic,以確保是單個分區(自動建立可能會有多個分區)。
- offset.storage.topic(預設connect-offsets):用於儲存offsets的Topic,此Topic應該配置多個分區和副本。
- status.storage.topic(預設connect-status):用於儲存狀態的Topic,此Topic可以有多個分區和副本。
- plugin.path:Kafka Connect架構層的配置項,主要在Kafka Connect啟動時用於定址Connectors可執行內容,這種可執行內容有如下兩種形式。可配置多個路徑,不同路徑之間用逗號分隔。例如:
Converters
資料格式轉換組件,用於在Kafka與外部服務之間進行訊息資料的序列化以及還原序列化操作,使得資料格式以及結構滿足業務需求,Workers以及Connectors均可配置Converters,Connectors的Converter配置項可覆蓋Workers的Converter配置項,一般包含Avro、Protobuf、String、JSON、JSON Schema以及ByteArray等轉換格式。
串連配置
重要 控制台和ZIP檔案同時指定串連配置時,控制台設定檔將覆蓋ZIP設定檔內容。
| 參數(必填) | 說明 | 樣本 |
| name | Connector的名稱。一般命名為不包含ISO控制符的字串。 | mongo-sink |
| connector.class | Connector類的名稱或者別名。必須是org.apache.kafka.connect.connector.Connector的子類。 | com.mongodb.kafka.connect.MongoSinkConnector |
| task.max | 最大任務數量。取值範圍為[1,Kafka中Topic的最大分區數]。 | 1 |
| topics | 當Kafka參數配置參數為Sink Connect時,該參數指定資料來源Topic,不同Topic之間以半形逗號(,)進行分隔。 | sourceA,sourceB |
其他選填參數,請參見Kafka Connect Configs。
以JdbcSinkConnector為例,配置如下:
name=testConnector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=2
topics=connect-test