全部產品
Search
文件中心

ApsaraMQ for Kafka:建立MaxCompute Sink Connector

更新時間:Mar 28, 2025

本文介紹如何建立使用MaxCompute Sink Connector,您可以通過MaxCompute Sink Connector將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至MaxCompute的表中。

前提條件

詳細步驟,請參見建立前提

注意事項

如需使用MaxCompute資料分割函數,建立表時需額外建立一個分區列,列名為time,類型為STRING。

步驟一:建立目標資源

通過MaxCompute用戶端建立表。更多資訊,請參見建立表

本文以名稱為kafka_to_maxcompute的表為例。表中有3列資料,並使用資料分割函數。該表的建表語句如下:

CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING,valueName STRING,valueAge BIGINT) PARTITIONED by (time STRING);

如不使用資料分割函數,語句如下:

CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING,valueName STRING,valueAge BIGINT);

執行成功後,如下圖所示:執行成果

表管理頁面,查看建立的表資訊。表

步驟二:建立MaxCompute Sink Connector並啟動

  1. 登入雲訊息佇列 Kafka 版控制台,在概览頁面的资源分布地區,選擇地區。

  2. 在左側導覽列,選擇Connector生態整合 > 工作清單

  3. 工作清單頁面,單擊建立任務

  4. 创建任务面板,設定任务名称描述,配置以下參數。

    • 任務建立

      1. Source(源)設定精靈,選擇数据提供方訊息佇列 Kafka 版,設定以下參數,然後單擊下一步

        參數

        說明

        樣本

        地域

        源Kafka執行個體所在的地區。

        華東1(杭州)

        kafka執行個體

        資料來源所在的Kafka執行個體ID。

        alikafka_post-cn-9hdsbdhd****

        Topic

        資料來源所在的Kafka執行個體Topic。

        guide-sink-topic

        Group ID

        資料來源所在的Kafka執行個體中的Group ID。

        • 快速建立:自動建立以GID_EVENTBRIDGE_xxx命名的Group ID。

        • 使用已有:選擇已建立的Group,請選擇獨立的Group ID,不要和已有的業務混用,以免影響已有的訊息收發。

        使用已有

        消费位点

        • 最新位點:從最新位點開始消費。

        • 最早位點:從最初位點開始消費。

        最新位點

        網路設定

        有跨境傳輸資料需求時選擇自建公網,其他情況可選擇基礎網路

        基礎網路

        資料格式

        資料格式是針對支援二進位傳遞的資料來源端推出的指定內容格式的編碼能力。支援多種資料格式編碼,如無特殊編碼訴求可將格式設定為Json。

        • Json(預設Json格式編碼,位元據按照utf-8 編碼為Json格式放入Payload。)

        • Text(文字格式設定編碼,位元據按照utf-8編碼為字串放入Payload。)

        • Binary(二進位格式編碼,位元據按照Base64編碼為字串放入Payload。)

        Json

        批量推送条数

        調用函數發送的最大批量訊息條數,當積壓的訊息數量到達設定值時才會發送請求,取值範圍為 [1,10000]。

        2000

        批量推送间隔(单位:秒)

        調用函數的間隔時間,系統每到間隔時間點會將訊息彙總後發給Function Compute,取值範圍為[0,15],單位為秒。0秒錶示無等待時間,直接投遞。

        3

      2. Filtering(过滤)設定精靈,定義資料模式過濾發送的請求。更多資訊,請參見事件模式

      3. Transform(转换)設定精靈,設定資料清洗,實現分割、映射、富化及動態路由等繁雜資料加工能力。更多資訊,請參見使用Function Compute實現訊息資料清洗

      4. Sink(目标)設定精靈,選擇服务类型MaxCompute acs.maxcompute,配置以下參數。

        參數

        說明

        樣本

        帳號AccessKey ID

        阿里雲帳號的AccessKey ID,用於訪問MaxCompute服務。

        yourAccessKeyID

        帳號AccessKey Secret

        阿里雲帳號的AccessKey Secret。

        yourAccessKeySecret

        MaxCompute專案名稱

        選擇已建立的MaxCompute專案。

        test_compute

        MaxCompute表名稱

        選擇已建立的MaxCompute表。

        kafka_to_maxcompute

        MaxCompute表入參

        選擇MaxCompute表後,此處會展示表的列名和類型資訊,配置資料擷取規則即可。下面是一條訊息樣本,本樣本中Topic列對應的值從Topic欄位提取,則定義資料擷取規則$.topic

        {
          'data': {
            'topic': 't_test',
            'partition': 2,
            'offset': 1,
            'timestamp': 1717048990499,
            'headers': {
              'headers': [],
              'isReadOnly': False
            },
            'key': 'MaxCompute-K1',
            'value': 'MaxCompute-V1'
          },
          'id': '9b05fc19-9838-4990-bb49-ddb942307d3f-2-1',
          'source': 'acs:alikafka',
          'specversion': '1.0',
          'type': 'alikafka:Topic:Message',
          'datacontenttype': 'application/json; charset=utf-8',
          'time': '2024-05-30T06:03:10.499Z',
          'aliyunaccountid': '1413397765616316'
        }

        topic:$.data.topic

        valuename:$.data.value

        valueage:$.data.offset

        分區配置

        • 關閉:不開啟分區能力。

        • 開啟:開啟分區能力。

          開啟分區後需配置分區值等參數:

          • 分取值支援{yyyy}、{MM}、{dd}、{HH}、{mm}時間變數參數,分別對應年、月、日、時、分。時間變數大小寫敏感。

          • 分取值支援填寫常量。

        開啟

        {yyyy}-{MM}-{dd}.{HH}:{mm}.suffix

        網路設定

        • 專用網路:通過Virtual Private Cloud將Kafka訊息投遞到MaxCompute。

        • 公網:通過公網將Kafka訊息投遞到MaxCompute。

        公網

        VPC

        選擇VPC ID。僅當網路設定專用網路時需要配置此參數。

        vpc-bp17fapfdj0dwzjkd****

        交換器

        選擇vSwitch ID。僅當網路設定專用網路時需要配置此參數。

        vsw-bp1gbjhj53hdjdkg****

        安全性群組

        選擇安全性群組。僅當網路設定專用網路時需要配置此參數。

        test_group

    • 任務屬性

      配置事件推送失敗時的重試策略及錯誤發生時的處理方式。更多資訊,請參見重試和死信

  5. 完成上述配置後,單擊儲存。在工作清單頁面,找到剛建立的MaxCompute Sink Connector任務,此時狀態欄為啟動中,當狀態變為運行中時,Connector建立成功。

步驟三:測試MaxCompute Sink Connector

  1. 工作清單頁面,在MaxCompute Sink Connector任務的事件來源列單擊源Topic。

  2. 在Topic詳情頁面,單擊體驗發送訊息

  3. 快速體驗訊息收發面板,按照下圖配置訊息內容,然後單擊確定

    發送訊息

  4. 進入MaxCompute控制台,執行以下SQL語句查看分區資訊。

    show PARTITIONS kafka_to_maxcompute;

    查詢結果如下所示:分區

  5. 根據分區資訊,執行以下語句,查看分區內資料。

    SELECT * FROM kafka_to_maxcompute WHERE time="2024-05-31.16:37.suffix";

    查詢結果如下所示:分區內資料