當您需要將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至Elasticsearch的索引,您可以通過Elasticsearch Sink Connector實現。本文介紹如何建立Elasticsearch Sink Connector。
前提條件
在匯出資料前,請確保您已完成以下操作:- 雲訊息佇列 Kafka 版
- 為雲訊息佇列 Kafka 版執行個體開啟Connector。更多資訊,請參見開啟Connector。
- 為雲訊息佇列 Kafka 版執行個體建立資料來源Topic。更多資訊,請參見步驟一:建立Topic。
- Function Compute
- Elasticsearch
- 在Elasticsearch管理主控台建立執行個體和索引。更多資訊,請參見快速入門。
- 在Elasticsearch白名單配置中添加了Function Compute服務地址所在的網段。更多資訊,請參見配置執行個體公網或私網訪問白名單。
說明- Function Compute使用的Elasticsearch用戶端版本為7.7.0,為保持相容,您需建立7.0或以上版本的Elasticsearch執行個體。
- 配置白名單時,您可以設定網段為0.0.0.0/0,代表整個VPC可訪問,訪問成功後根據需要修改為對應的網段。
注意事項
- 僅支援在同地區內,將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至Function Compute,再由Function Compute匯出至Elasticsearch。關於Connector的限制說明,請參見使用限制。
- 該功能基於Function Compute服務提供。Function Compute為您提供了一定的免費額度,超額部分將產生費用,請以Function Compute的計費規則為準。計費詳情,請參見計費概述。
- Function Compute的函數調用支援日誌查詢,以便您迅速排查問題。具體操作步驟,請參見配置日誌。
- 訊息轉儲時,雲訊息佇列 Kafka 版中訊息用UTF-8 String序列化,暫不支援二進位的資料格式。
- 如果Elasticsearch Sink Connector存取點是私網存取點,Function Compute運行環境預設無法訪問,為確保網路暢通,需在Function Compute控制台為函數服務配置與Elasticsearch一致的VPC和vSwitch資訊。更多資訊,請參見更新服務。
建立並部署Elasticsearch Sink Connector
在概览頁面的资源分布地區,選擇地區。
在左側導覽列,單擊Connector 任务列表。
在Connector 任务列表頁面,從选择实例的下拉式清單選擇Connector所屬的執行個體,然後單擊创建 Connector。
- 在创建 Connector設定精靈頁面,完成以下操作。
- 在配置基本信息頁簽,按需配置以下參數,然後單擊下一步。重要 雲訊息佇列 Kafka 版會為您自動選中授权创建服务关联角色。
- 如果未建立服務關聯角色,雲訊息佇列 Kafka 版會為您自動建立一個服務關聯角色,以便您使用雲訊息佇列 Kafka 版匯出資料至Elasticsearch的功能。
- 如果已建立服務關聯角色,雲訊息佇列 Kafka 版不會重複建立。
參數 描述 樣本值 名称 Connector的名稱。命名規則: - 可以包含數字、小寫英文字母和短劃線(-),但不能以短劃線(-)開頭,長度限制為48個字元。
- 同一個雲訊息佇列 Kafka 版執行個體內保持唯一。
Connector的資料同步任務必須使用名稱為connect-任務名稱的Group。如果您未手動建立該Group,系統將為您自動建立。
kafka-elasticsearch-sink 实例 預設配置為執行個體的名稱與執行個體ID。 demo alikafka_post-cn-st21p8vj**** - 在配置源服务頁簽,選擇資料來源為訊息佇列Kafka版,並配置以下參數,然後單擊下一步。
參數 描述 樣本值 数据源 Topic 需要同步資料的Topic。 elasticsearch-test-input 消费线程并发数 資料來源Topic的消費線程並發數。預設值為6。取值說明如下: - 1
- 2
- 3
- 6
- 12
6 消费初始位置 開始消費的位置。取值說明如下: - 最早位点:從最初位點開始消費。
- 最近位点:從最新位點開始消費。
最早位点 VPC ID 資料同步任務所在的VPC。單擊配置运行环境顯示該參數。預設為雲訊息佇列 Kafka 版執行個體所在的VPC,您無需填寫。 vpc-bp1xpdnd3l*** vSwitch ID 資料同步任務所在的交換器。單擊配置运行环境顯示該參數。該交換器必須與雲訊息佇列 Kafka 版執行個體處於同一VPC。預設為部署雲訊息佇列 Kafka 版執行個體時填寫的交換器。 vsw-bp1d2jgg81*** 失败处理 訊息發送失敗後,是否繼續訂閱出現錯誤的Topic的分區。單擊配置运行环境顯示該參數。取值說明如下: - 继续订阅:繼續訂閱出現錯誤的Topic的分區,並列印錯誤記錄檔。
- 停止订阅:停止訂閱出現錯誤的Topic的分區,並列印錯誤記錄檔。
說明- 如何查看日誌,請參見Connector相關操作。
- 如何根據錯誤碼尋找解決方案,請參見錯誤碼。
继续订阅 创建资源方式 選擇建立Connector所依賴的Topic與Group的方式。單擊配置运行环境顯示該參數。 - 自动创建
- 手动创建
自动创建 Connector 消费组 Connector的資料同步任務使用的Group。單擊配置运行环境顯示該參數。該Group的名稱必須為connect-任務名稱。 connect-kafka-elasticsearch-sink 任务位点 Topic 用於儲存消費位點的Topic。單擊配置运行环境顯示該參數。 - Topic:建議以connect-offset開頭。
- 分區數:Topic的分區數量必須大於1。
- 儲存引擎:Topic的儲存引擎必須為Local儲存。說明 僅專業版執行個體支援在建立Topic時選擇儲存引擎類型為Local儲存,標準版暫不支援。
- cleanup.policy:Topic的日誌清理策略必須為compact。
connect-offset-kafka-elasticsearch-sink 任务配置 Topic 用於儲存任務配置的Topic。單擊配置运行环境顯示該參數。 - Topic:建議以connect-config開頭。
- 分區數:Topic的分區數量必須為1。
- 儲存引擎:Topic的儲存引擎必須為Local儲存。說明 僅專業版執行個體支援在建立Topic時選擇儲存引擎類型為Local儲存,標準版暫不支援。
- cleanup.policy:Topic的日誌清理策略必須為compact。
connect-config-kafka-elasticsearch-sink 任务状态 Topic 用於儲存任務狀態的Topic。單擊配置运行环境顯示該參數。 - Topic:建議以connect-status開頭。
- 分區數:Topic的分區數量建議為6。
- 儲存引擎:Topic的儲存引擎必須為Local儲存。說明 僅專業版執行個體支援在建立Topic時選擇儲存引擎類型為Local儲存,標準版暫不支援。
- cleanup.policy:Topic的日誌清理策略必須為compact。
connect-status-kafka-elasticsearch-sink 死信队列 Topic 用於儲存Connect架構的異常資料的Topic。單擊配置运行环境顯示該參數。該Topic可以和異常資料Topic為同一個Topic,以節省Topic資源。 - Topic:建議以connect-error開頭。
- 分區數:Topic的分區數量建議為6。
- 儲存引擎:Topic的儲存引擎可以為Local儲存或雲端儲存。說明 僅專業版執行個體支援在建立Topic時選擇儲存引擎類型為Local儲存,標準版暫不支援。
connect-error-kafka-elasticsearch-sink 异常数据 Topic 用於儲存Sink的異常資料的Topic。單擊配置运行环境顯示該參數。該Topic可以和無效信件佇列Topic為同一個Topic,以節省Topic資源。 - Topic:建議以connect-error開頭。
- 分區數:Topic的分區數量建議為6。
- 儲存引擎:Topic的儲存引擎可以為Local儲存或雲端儲存。說明 僅專業版執行個體支援在建立Topic時選擇儲存引擎類型為Local儲存,標準版暫不支援。
connect-error-kafka-elasticsearch-sink - 在配置目标服务頁簽,選擇目標服務為Elasticsearch,並配置以下參數,然後單擊创建。
參數 描述 樣本值 ES 实例 ID Elasticsearch執行個體ID。 es-cn-oew1o67x0000**** 接入地址 Elasticsearch執行個體的公網或私網地址。更多資訊,請參見查看執行個體的基本資料。 es-cn-oew1o67x0000****.elasticsearch.aliyuncs.com 接入端口 訪問Elasticsearch的公網或私網連接埠,取值如下: - 9200:基於HTTP或HTTPS。
- 9300:基於TCP。
更多資訊,請參見查看執行個體的基本資料。
9300 用户名 登入Kibana控制台的使用者名稱,預設為elastic。您也可以建立自訂使用者。具體操作,請參見通過Elasticsearch X-Pack角色管理實現使用者權限管控。 elastic 用户密码 登入Kibana控制台的密碼。elastic使用者的密碼在建立執行個體時設定,如果忘記可重設。具體操作,請參見重設執行個體訪問密碼。 ******** 索引 Elasticsearch的索引名稱。 elastic_test 說明- 使用者名稱和使用者密碼會被用來初始化Elasticsearch對象,通過bulk投遞訊息,請確認帳號對索引有寫入權限。
- 使用者名稱和使用者密碼是雲訊息佇列 Kafka 版建立任務時作為環境變數傳遞至Function Compute的函數,任務建立成功後,雲訊息佇列 Kafka 版不儲存相關資訊。
建立完成後,在Connector 任务列表頁面,查看建立的Connector 。
- 在配置基本信息頁簽,按需配置以下參數,然後單擊下一步。
- 建立完成後,在Connector 任务列表頁面,找到建立的Connector ,單擊其操作列的部署。
配置函數服務
您在雲訊息佇列 Kafka 版控制台成功建立並部署Elasticsearch Sink Connector後,Function Compute會自動為您建立給該Connector使用的函數服務,服務命名格式為kafka-service-<connector_name>-<隨機String>。
- 在Connector 任务列表頁面,找到目標Connector,在其右側操作列,選擇。頁面跳轉至Function Compute控制台。
- 在Function Compute控制台,找到自動建立的函數服務,並配置其VPC和交換器資訊。請確保該資訊和您Elasticsearch相同。配置的具體步驟,請參見更新服務。
發送測試訊息
您可以向雲訊息佇列 Kafka 版的資料來源Topic發送訊息,測試資料能否被匯出至Elasticsearch。
在Connector 任务列表頁面,找到目標Connector,在其右側操作列,單擊测试。
在发送消息面板,發送測試訊息。
发送方式選擇控制台。
在消息 Key文字框中輸入訊息的Key值,例如demo。
在消息内容文字框輸入測試的訊息內容,例如 {"key": "test"}。
設定发送到指定分区,選擇是否指定分區。
單擊是,在分区 ID文字框中輸入分區的ID,例如0。如果您需查詢分區的ID,請參見查看分區狀態。
單擊否,不指定分區。
发送方式選擇Docker,執行运行 Docker 容器生产示例消息地區的Docker命令,發送訊息。
发送方式選擇SDK,根據您的業務需求,選擇需要的語言或者架構的SDK以及接入方式,通過SDK發送訊息。
驗證結果
向雲訊息佇列 Kafka 版的資料來源Topic發送訊息後,登入Kibana控制台,執行GET /<index_name>/_search查看索引,驗證資料匯出結果。
{
"took" : 8,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "product_****",
"_type" : "_doc",
"_id" : "TX3TZHgBfHNEDGoZ****",
"_score" : 1.0,
"_source" : {
"msg_body" : {
"key" : "test",
"offset" : 2,
"overflowFlag" : false,
"partition" : 2,
"timestamp" : 1616599282417,
"topic" : "dv****",
"value" : "test1",
"valueSize" : 8
},
"doc_as_upsert" : true
}
}
]
}
}