全部產品
Search
文件中心

ApsaraMQ for Kafka:建立Elasticsearch Sink Connector

更新時間:Dec 27, 2024

當您需要將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至Elasticsearch的索引,您可以通過Elasticsearch Sink Connector實現。本文介紹如何建立Elasticsearch Sink Connector。

前提條件

在匯出資料前,請確保您已完成以下操作:
  • 雲訊息佇列 Kafka 版
    • 雲訊息佇列 Kafka 版執行個體開啟Connector。更多資訊,請參見開啟Connector
    • 雲訊息佇列 Kafka 版執行個體建立資料來源Topic。更多資訊,請參見步驟一:建立Topic
  • Function Compute
  • Elasticsearch
    說明
    • Function Compute使用的Elasticsearch用戶端版本為7.7.0,為保持相容,您需建立7.0或以上版本的Elasticsearch執行個體。
    • 配置白名單時,您可以設定網段為0.0.0.0/0,代表整個VPC可訪問,訪問成功後根據需要修改為對應的網段。

注意事項

  • 僅支援在同地區內,將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至Function Compute,再由Function Compute匯出至Elasticsearch。關於Connector的限制說明,請參見使用限制
  • 該功能基於Function Compute服務提供。Function Compute為您提供了一定的免費額度,超額部分將產生費用,請以Function Compute的計費規則為準。計費詳情,請參見計費概述
  • Function Compute的函數調用支援日誌查詢,以便您迅速排查問題。具體操作步驟,請參見配置日誌
  • 訊息轉儲時,雲訊息佇列 Kafka 版中訊息用UTF-8 String序列化,暫不支援二進位的資料格式。
  • 如果Elasticsearch Sink Connector存取點是私網存取點,Function Compute運行環境預設無法訪問,為確保網路暢通,需在Function Compute控制台為函數服務配置與Elasticsearch一致的VPC和vSwitch資訊。更多資訊,請參見更新服務

建立並部署Elasticsearch Sink Connector

  1. 登入雲訊息佇列 Kafka 版控制台

  2. 概览頁面的资源分布地區,選擇地區。

  3. 在左側導覽列,單擊Connector 任务列表

  4. Connector 任务列表頁面,從选择实例的下拉式清單選擇Connector所屬的執行個體,然後單擊创建 Connector

  5. 创建 Connector設定精靈頁面,完成以下操作。
    1. 配置基本信息頁簽,按需配置以下參數,然後單擊下一步
      重要 雲訊息佇列 Kafka 版會為您自動選中授权创建服务关联角色
      • 如果未建立服務關聯角色,雲訊息佇列 Kafka 版會為您自動建立一個服務關聯角色,以便您使用雲訊息佇列 Kafka 版匯出資料至Elasticsearch的功能。
      • 如果已建立服務關聯角色,雲訊息佇列 Kafka 版不會重複建立。
      關於該服務關聯角色的更多資訊,請參見服務關聯角色
      參數描述樣本值
      名称Connector的名稱。命名規則:
      • 可以包含數字、小寫英文字母和短劃線(-),但不能以短劃線(-)開頭,長度限制為48個字元。
      • 同一個雲訊息佇列 Kafka 版執行個體內保持唯一。

      Connector的資料同步任務必須使用名稱為connect-任務名稱Group。如果您未手動建立該Group,系統將為您自動建立。

      kafka-elasticsearch-sink
      实例預設配置為執行個體的名稱與執行個體ID。demo alikafka_post-cn-st21p8vj****
    2. 配置源服务頁簽,選擇資料來源訊息佇列Kafka版,並配置以下參數,然後單擊下一步
      參數描述樣本值
      数据源 Topic需要同步資料的Topic。elasticsearch-test-input
      消费线程并发数資料來源Topic的消費線程並發數。預設值為6。取值說明如下:
      • 1
      • 2
      • 3
      • 6
      • 12
      6
      消费初始位置開始消費的位置。取值說明如下:
      • 最早位点:從最初位點開始消費。
      • 最近位点:從最新位點開始消費。
      最早位点
      VPC ID資料同步任務所在的VPC。單擊配置运行环境顯示該參數。預設為雲訊息佇列 Kafka 版執行個體所在的VPC,您無需填寫。vpc-bp1xpdnd3l***
      vSwitch ID資料同步任務所在的交換器。單擊配置运行环境顯示該參數。該交換器必須與雲訊息佇列 Kafka 版執行個體處於同一VPC。預設為部署雲訊息佇列 Kafka 版執行個體時填寫的交換器。vsw-bp1d2jgg81***
      失败处理訊息發送失敗後,是否繼續訂閱出現錯誤的Topic的分區。單擊配置运行环境顯示該參數。取值說明如下:
      • 继续订阅:繼續訂閱出現錯誤的Topic的分區,並列印錯誤記錄檔。
      • 停止订阅:停止訂閱出現錯誤的Topic的分區,並列印錯誤記錄檔。
      說明
      继续订阅
      创建资源方式選擇建立Connector所依賴的Topic與Group的方式。單擊配置运行环境顯示該參數。
      • 自动创建
      • 手动创建
      自动创建
      Connector 消费组Connector的資料同步任務使用的Group。單擊配置运行环境顯示該參數。該Group的名稱必須為connect-任務名稱connect-kafka-elasticsearch-sink
      任务位点 Topic用於儲存消費位點的Topic。單擊配置运行环境顯示該參數。
      • Topic:建議以connect-offset開頭。
      • 分區數:Topic的分區數量必須大於1。
      • 儲存引擎:Topic的儲存引擎必須為Local儲存。
        說明 僅專業版執行個體支援在建立Topic時選擇儲存引擎類型為Local儲存,標準版暫不支援。
      • cleanup.policy:Topic的日誌清理策略必須為compact。
      connect-offset-kafka-elasticsearch-sink
      任务配置 Topic用於儲存任務配置的Topic。單擊配置运行环境顯示該參數。
      • Topic:建議以connect-config開頭。
      • 分區數:Topic的分區數量必須為1。
      • 儲存引擎:Topic的儲存引擎必須為Local儲存。
        說明 僅專業版執行個體支援在建立Topic時選擇儲存引擎類型為Local儲存,標準版暫不支援。
      • cleanup.policy:Topic的日誌清理策略必須為compact。
      connect-config-kafka-elasticsearch-sink
      任务状态 Topic用於儲存任務狀態的Topic。單擊配置运行环境顯示該參數。
      • Topic:建議以connect-status開頭。
      • 分區數:Topic的分區數量建議為6。
      • 儲存引擎:Topic的儲存引擎必須為Local儲存。
        說明 僅專業版執行個體支援在建立Topic時選擇儲存引擎類型為Local儲存,標準版暫不支援。
      • cleanup.policy:Topic的日誌清理策略必須為compact。
      connect-status-kafka-elasticsearch-sink
      死信队列 Topic用於儲存Connect架構的異常資料的Topic。單擊配置运行环境顯示該參數。該Topic可以和異常資料Topic為同一個Topic,以節省Topic資源。
      • Topic:建議以connect-error開頭。
      • 分區數:Topic的分區數量建議為6。
      • 儲存引擎:Topic的儲存引擎可以為Local儲存或雲端儲存。
        說明 僅專業版執行個體支援在建立Topic時選擇儲存引擎類型為Local儲存,標準版暫不支援。
      connect-error-kafka-elasticsearch-sink
      异常数据 Topic用於儲存Sink的異常資料的Topic。單擊配置运行环境顯示該參數。該Topic可以和無效信件佇列Topic為同一個Topic,以節省Topic資源。
      • Topic:建議以connect-error開頭。
      • 分區數:Topic的分區數量建議為6。
      • 儲存引擎:Topic的儲存引擎可以為Local儲存或雲端儲存。
        說明 僅專業版執行個體支援在建立Topic時選擇儲存引擎類型為Local儲存,標準版暫不支援。
      connect-error-kafka-elasticsearch-sink
    3. 配置目标服务頁簽,選擇目標服務Elasticsearch,並配置以下參數,然後單擊创建
      參數描述樣本值
      ES 实例 IDElasticsearch執行個體ID。es-cn-oew1o67x0000****
      接入地址Elasticsearch執行個體的公網或私網地址。更多資訊,請參見查看執行個體的基本資料es-cn-oew1o67x0000****.elasticsearch.aliyuncs.com
      接入端口訪問Elasticsearch的公網或私網連接埠,取值如下:
      • 9200:基於HTTP或HTTPS。
      • 9300:基於TCP。

      更多資訊,請參見查看執行個體的基本資料

      9300
      用户名登入Kibana控制台的使用者名稱,預設為elastic。您也可以建立自訂使用者。具體操作,請參見通過Elasticsearch X-Pack角色管理實現使用者權限管控elastic
      用户密码登入Kibana控制台的密碼。elastic使用者的密碼在建立執行個體時設定,如果忘記可重設。具體操作,請參見重設執行個體訪問密碼********
      索引Elasticsearch的索引名稱。elastic_test
      說明
      • 使用者名稱和使用者密碼會被用來初始化Elasticsearch對象,通過bulk投遞訊息,請確認帳號對索引有寫入權限。
      • 使用者名稱和使用者密碼是雲訊息佇列 Kafka 版建立任務時作為環境變數傳遞至Function Compute的函數,任務建立成功後,雲訊息佇列 Kafka 版不儲存相關資訊。
      建立完成後,在Connector 任务列表頁面,查看建立的Connector 。
  6. 建立完成後,在Connector 任务列表頁面,找到建立的Connector ,單擊其操作列的部署

配置函數服務

您在雲訊息佇列 Kafka 版控制台成功建立並部署Elasticsearch Sink Connector後,Function Compute會自動為您建立給該Connector使用的函數服務,服務命名格式為kafka-service-<connector_name>-<隨機String>

  1. Connector 任务列表頁面,找到目標Connector,在其右側操作列,選擇更多 > 函数配置
    頁面跳轉至Function Compute控制台。
  2. 在Function Compute控制台,找到自動建立的函數服務,並配置其VPC和交換器資訊。請確保該資訊和您Elasticsearch相同。配置的具體步驟,請參見更新服務

發送測試訊息

您可以向雲訊息佇列 Kafka 版的資料來源Topic發送訊息,測試資料能否被匯出至Elasticsearch。

  1. Connector 任务列表頁面,找到目標Connector,在其右側操作列,單擊测试

  2. 发送消息面板,發送測試訊息。

    • 发送方式選擇控制台

      1. 消息 Key文字框中輸入訊息的Key值,例如demo。

      2. 消息内容文字框輸入測試的訊息內容,例如 {"key": "test"}。

      3. 設定发送到指定分区,選擇是否指定分區。

        • 單擊,在分区 ID文字框中輸入分區的ID,例如0。如果您需查詢分區的ID,請參見查看分區狀態

        • 單擊,不指定分區。

    • 发送方式選擇Docker,執行运行 Docker 容器生产示例消息地區的Docker命令,發送訊息。

    • 发送方式選擇SDK,根據您的業務需求,選擇需要的語言或者架構的SDK以及接入方式,通過SDK發送訊息。

驗證結果

雲訊息佇列 Kafka 版的資料來源Topic發送訊息後,登入Kibana控制台,執行GET /<index_name>/_search查看索引,驗證資料匯出結果。

雲訊息佇列 Kafka 版資料匯出至Elasticsearch的格式樣本如下:
{
  "took" : 8,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "product_****",
        "_type" : "_doc",
        "_id" : "TX3TZHgBfHNEDGoZ****",
        "_score" : 1.0,
        "_source" : {
          "msg_body" : {
            "key" : "test",
            "offset" : 2,
            "overflowFlag" : false,
            "partition" : 2,
            "timestamp" : 1616599282417,
            "topic" : "dv****",
            "value" : "test1",
            "valueSize" : 8
          },
          "doc_as_upsert" : true
        }
      }
    ]
  }
}