本文介紹如何建立RabbitMQ Connector,將雲訊息佇列 RabbitMQ 版的資料同步至ApsaraMQ for Kafka。
前提條件
開通Object Storage Service服務並建立儲存空間(Bucket)。更多資訊,請參見控制台建立儲存空間。
開通Serverless應用引擎服務。更多資訊,請參見準備工作。
購買並部署ApsaraMQ for Kafka執行個體。更多資訊,請參見購買和部署執行個體。
步驟一:建立RabbitMQ資源
登入雲訊息佇列 RabbitMQ 版控制台,建立RabbitMQ執行個體。操作步驟,請參見建立執行個體。
單擊已建立的執行個體,在執行個體詳情頁面建立以下資源。
在左側導覽列,單擊靜態使用者名稱密碼,然後單擊建立使用者名稱密碼。更多資訊,請參見建立使用者名稱密碼。
建立完成後儲存使用者名稱和密碼。

在左側導覽列,單擊Vhost列表,然後單擊建立Vhost。更多資訊,請參見建立Exchange。
在左側導覽列,單擊Queue列表,在當前 Vhost右側的切換下拉式清單中,選擇已建立的Vhost,然後單擊建立Queue。更多資訊,請參見建立Queue。
步驟二:建立Connector
下載RabbitMQ Connector檔案,上傳至提前建立好的OSS Bucket。更多資訊,請參見控制台上傳檔案。
登入雲訊息佇列 Kafka 版控制台,在概览頁面的资源分布地區,選擇地區。
在左側導覽列,選擇。
在任务列表頁面,單擊创建任务。
在创建任务面板。設定任務名稱,配置以下配置項。
任務建立
在Source(源)設定精靈,選擇数据提供方為Apache Kafka Connect,單擊下一步。
在连接器配置設定精靈,設定以下配置項,然後單擊下一步。
配置項
參數
說明
Kafka Connect外掛程式
Bucket儲存桶
選擇OSS Bucket。
檔案
選擇上傳的.ZIP檔案。
Kafka資源資訊
Kafka 參數配置
選擇Source Connect。
Kafka執行個體
選擇前提條件中建立的執行個體。
Virtual Private Cloud
預設選擇部署Kafka執行個體時選擇的VPC ID且不可更改。
交換器
預設選擇部署Kafka執行個體時選擇的vSwitch ID且不可更改。
安全性群組
選擇安全性群組。
Kafka Connect配置資訊
解析當前ZIP包下的properties檔案
選擇建立properties檔案。選擇.ZIP檔案中包含的SourceConnector對應的.properties檔案。路徑為/etc/source-xxx.properties。在輸入框中更新下欄欄位的取值。
connector.class:啟動並執行Connector的包名稱,無需修改。
tasks.max:Task的最大數量。
rabbitmq.host:填寫RabbitMQ執行個體VPC存取點地址。可在RabbitMQ執行個體詳情頁面的存取點資訊地區查看。
rabbitmq.username:填寫步驟一:建立RabbitMQ資源中建立的RabbitMQ執行個體靜態使用者名稱。
rabbitmq.password:填寫步驟一:建立RabbitMQ資源中建立的RabbitMQ執行個體靜態使用者名稱密碼。
rabbitmq.virtual.host:填寫步驟一:建立RabbitMQ資源中建立的Vhost。
kafka.topic:目標Kafka Topic,請在投遞資料前,提前建立好目標Topic。
rabbitmq.queue:填寫步驟一:建立RabbitMQ資源中建立的Queue。
範例程式碼如下:
connector.class=com.ibm.eventstreams.connect.rabbitmqsource.RabbitMQSourceConnector name=rabbitmq-source-connector # RabbitMQ執行個體VPC存取點資訊。 rabbitmq.host=xxx # RabbitMQ執行個體靜態使用者名稱密碼。 rabbitmq.password=xxx # RabbitMQ執行個體靜態使用者名稱。 rabbitmq.username=xxx # RabbitMQ執行個體Vhost。 rabbitmq.virtual.host=xxx # 目標Kafka Topic。 kafka.topic=xxx # RabbitMQ執行個體隊列。 rabbitmq.queue=xxx tasks.max=4在实例配置設定精靈,設定以下參數,然後單擊下一步。
配置項
參數
說明
Worker規格
Worker規格
選擇合適的Worker規格。
最小Worker數
設定最小Worker數量。
最大Worker數
設定最大Worker數量。此數值不得超過Task的最大數量。
橫向擴縮容閾值 %
當利用率大於或小於設定的CPU和Memory數值時,觸發自動擴容或縮容。僅當最小Worker數和最大Worker數值不相等時,需要配置此參數。
Kafka Connect Worker 配置
自動建立Kafka Connect Worker依賴資源
建議勾選此項,此時會在選擇的Kafka執行個體中自動建立Kafka Connect運行所需的一些Internal Topic以及ConsumerGroup,並將這些必填配置自動填入配置框中,包括以下配置項:
Offset Topic:用於儲存來源資料位移量,命名規則為
connect-eb-offset-<任務名稱>。Config Topic:用於儲存Connectors以及Tasks的配置資訊,命名規則為
connect-eb-config-<任務名稱>。Status Topic:用於儲存Connectors以及Tasks狀態資訊,命名規則為
connect-eb-status-<任務名稱>。Kafka Connect Consumer Group:Kafka Connect Worker用於消費Internal Topics的消費組,命名規則為
connect-eb-cluster-<任務名稱>。Kafka Source Connector Consumer Group:只針對Sink Connector有效,用於消費源Topic中的資料,命名規則為
connector-eb-cluster-<任務名稱>-<connector名稱>。
在回合組態地區,將日誌投遞方式設定為投遞至SLS或者投遞至Kafka,在角色授權卡片設定Connect依賴的角色配置,然後單擊儲存。
重要建議配置的角色包含AliyunSAEFullAccess許可權,否則可能會導致任務運行失敗。
任務屬性
設定此任務的重試策略及無效信件佇列。更多資訊,請參見重試和死信。
等待任務狀態變為運行中,此時Connector已經在正常工作中。
步驟三:測試Connector
登入雲訊息佇列 RabbitMQ 版控制台,然後在左側導覽列選擇实例列表。
在实例列表頁面的頂部功能表列選擇地區,然後在執行個體列表中,單擊目標執行個體名稱。
在左側導覽列,單擊Queue列表,然後單擊目標Queue右側操作列的詳情。
在Queue詳情頁面,單擊被綁定資訊頁簽的添加被綁定。
在添加被綁定面板,選擇源Exchange為amq.direct,單擊確定。
在被綁定資訊頁簽,單擊amq.direct Exchange右側操作列的發送訊息,向Kafka的目標Topic發送訊息。更多資訊,請參見發送訊息。

登入雲訊息佇列 Kafka 版控制台,在執行個體列表頁面,單擊目標執行個體。
在目標執行個體頁面,單擊目標Topic,然後單擊訊息查詢,查看插入的訊息資料。

常見報錯
情境一:所有Tasks運行失敗
錯誤資訊:
All tasks under connector mongo-source failed, please check the error trace of the task.解決方案:在訊息流程入任務詳情頁面,單擊基礎資訊地區的診斷連結,即可跳轉到Connector監控頁面,可以看到Tasks運行失敗的詳細錯誤資訊。
情境二:Kafka Connect退出
錯誤資訊:
Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.解決方案:由於狀態擷取可能會有延遲,建議您先嘗試重新整理頁面。若重新整理後仍然是失敗狀態,您可以按照以下步驟查看錯誤資訊。
在訊息流程入任務詳情頁面的Worker資訊地區,單擊SAE應用後的執行個體名稱,跳轉到SAE應用詳情頁面。
在基本資料頁面,單擊執行個體部署資訊頁簽。
在執行個體右側操作列,單擊Webshell登入Kafka Connect運行環境。

執行
vi /home/admin/connector-bootstrap.log命令,查看Connector開機記錄,尋找其中是否包含錯誤資訊。執行
vi /opt/kafka/logs/connect.log命令,查看Connector作業記錄,在其中尋找ERROR或者WARN欄位來查看是否有錯誤資訊。
基於錯誤資訊提示進行修複操作後,可以重新啟動對應任務。
情境三:Connector參數校正失敗
錯誤資訊:
Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`解決方案:此時需要根據錯誤資訊,找出具體哪個參數出錯,更新對應參數即可。若基於上述錯誤資訊無法定位具體的出錯參數,可以參考上文情境二中的步驟登入Kafka Connect運行環境,執行以下命令,查詢參數是否校正通過。
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate該指令會返回Connector參數中每個參數是否校正通過,若不通過,則errors屬性非空,如下所示。
"value":{
"name":"snapshot.mode",
"value":null,
"recommended_values":[
"never",
"initial_only",
"when_needed",
"initial",
"schema_only",
"schema_only_recovery"
],
"errors":[
"Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
],
"visible":true
}