全部產品
Search
文件中心

ApsaraMQ for Kafka:Python SDK收發訊息

更新時間:Mar 04, 2025

本文介紹如何在Linux伺服器中使用Python SDK通過存取點接入雲訊息佇列 Kafka 版並收發訊息。

環境準備

添加Python依賴庫

執行以下命令安裝依賴庫。

pip install confluent-kafka==1.9.2
重要

建議您安裝confluent-kafka 1.9.2及以下版本的依賴庫,否則使用公網發送訊息會報SSL_HANDSHAKE錯誤。

準備配置

下載Demo工程,根據實際存取點修改相應配置,然後將Demo工程上傳至Linux伺服器。

  1. 訪問aliware-kafka-demos,單擊image表徵圖,然後在下拉框選擇Download ZIP,下載Demo工程並解壓。

    說明

    下載的Demo工程中已包含SSL根憑證,如需單獨使用,請下載SSL根憑證

  2. 在解壓的Demo工程中,找到kafka-confluent-python-demo檔案夾,根據實際的存取點修改設定檔setting.py。

    預設存取點

    vpc目錄中,修改設定檔setting.py。

    kafka_setting = {
        'bootstrap_servers': 'XXX:xxx,XXX:xxx',
        'topic_name': 'XXX',
        'group_name': 'XXX'
    }
    

    參數

    描述

    bootstrap_servers

    預設存取點。您可在雲訊息佇列 Kafka 版控制台实例详情頁面的接入点信息地區擷取。

    topic_name

    Topic名稱。您可在雲訊息佇列 Kafka 版控制台Topic 管理頁面擷取。

    group_name

    Group名稱。您可在雲訊息佇列 Kafka 版控制台Group 管理頁面擷取。

    SSL存取點

    vpc-ssl目錄中,修改設定檔setting.py。

    kafka_setting = {
        'sasl_plain_username': 'XXX',
        'sasl_plain_password': 'XXX',
        'ca_location': '/XXX/mix-4096-ca-cert',
        'bootstrap_servers': 'XXX:xxx,XXX:xxx',
        'topic_name': 'XXX',
        'group_name': 'XXX'
    }
    

    參數

    描述

    sasl_plain_username

    SASL使用者名稱。

    說明
    • 如果執行個體未開啟ACL,您可以在雲訊息佇列 Kafka 版控制台实例详情頁面的配置信息地區擷取預設的用户名密码
    • 如果執行個體已開啟ACL,請確保要使用的SASL使用者已被授予向雲訊息佇列 Kafka 版執行個體收發訊息的許可權。具體操作,請參見SASL使用者授權

    sasl_plain_password

    SASL使用者名稱密碼。

    ca_location

    SSL根憑證的路徑。用本地路徑替換樣本中的XXX。例如:/home/kafka-confluent-python-demo/vpc-ssl/mix-4096-ca-cert

    bootstrap_servers

    SSL存取點。您可在雲訊息佇列 Kafka 版控制台实例详情頁面的接入点信息地區擷取。

    topic_name

    Topic名稱。您可在雲訊息佇列 Kafka 版控制台Topic 管理頁面擷取。

    group_name

    Group名稱。您可在雲訊息佇列 Kafka 版控制台Group 管理頁面擷取。

  3. 將kafka-confluent-python-demo檔案夾上傳到Linux伺服器的/home路徑下。

發送訊息

根據實際的存取點,按照以下方式發送訊息。

預設存取點

  1. 執行以下命令,進入到/home/kafka-confluent-python-demo/vpc路徑。

    cd /home/kafka-confluent-python-demo/vpc
  2. 執行以下命令,發送訊息。

    python kafka_producer.py

訊息程式kafka_producer.py範例程式碼如下:

kafka_producer.py

from confluent_kafka import Producer
import setting

conf = setting.kafka_setting
# 初始化一個Producer對象。
p = Producer({'bootstrap.servers': conf['bootstrap_servers']})

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

# 非同步發送訊息。
p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
p.poll(0)

# 在程式結束時,調用flush。
p.flush()

SSL存取點

  1. 執行以下命令,進入到/home/kafka-confluent-python-demo/vpc-ssl路徑。

    cd /home/kafka-confluent-python-demo/vpc-ssl
  2. 執行以下命令,發送訊息。

    python kafka_producer.py

訊息程式kafka_producer.py範例程式碼如下:

kafka_producer.py

from confluent_kafka import Producer
import setting

conf = setting.kafka_setting

p = Producer({'bootstrap.servers':conf['bootstrap_servers'],
   'ssl.endpoint.identification.algorithm': 'none',
   'sasl.mechanisms':'PLAIN',
   'ssl.ca.location':conf['ca_location'],
   'security.protocol':'SASL_SSL',
   'sasl.username':conf['sasl_plain_username'],
   'sasl.password':conf['sasl_plain_password']})


def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
p.poll(0)

p.flush()

訂閱訊息

根據實際的存取點,按照以下方式訂閱訊息。

預設存取點

  1. 執行以下命令,進入到/home/kafka-confluent-python-demo/vpc路徑。

    cd /home/kafka-confluent-python-demo/vpc
  2. 執行以下命令,訂閱訊息。

    python kafka_consumer.py

訊息程式kafka_consumer.py範例程式碼如下:

kafka_consumer.py

from confluent_kafka import Consumer, KafkaError

import setting

conf = setting.kafka_setting

c = Consumer({
    'bootstrap.servers': conf['bootstrap_servers'],
    'group.id': conf['group_name'],
    'auto.offset.reset': 'latest'
})

c.subscribe([conf['topic_name']])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print("Consumer error: {}".format(msg.error()))
            continue

    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()

SSL存取點

  1. 執行以下命令,進入到/home/kafka-confluent-python-demo/vpc-ssl路徑。

    cd /home/kafka-confluent-python-demo/vpc-ssl
  2. 執行以下命令,訂閱訊息。

    python kafka_consumer.py

訊息程式kafka_consumer.py範例程式碼如下:

kafka_consumer.py

from confluent_kafka import Consumer, KafkaError

import setting

conf = setting.kafka_setting

c = Consumer({
    'bootstrap.servers': conf['bootstrap_servers'],
    'ssl.endpoint.identification.algorithm': 'none',
    'sasl.mechanisms':'PLAIN',
    'ssl.ca.location':conf['ca_location'],
    'security.protocol':'SASL_SSL',
    'sasl.username':conf['sasl_plain_username'],
    'sasl.password':conf['sasl_plain_password'],
    'group.id': conf['group_name'],
    'auto.offset.reset': 'latest',
    'fetch.message.max.bytes':'1024*512'
})

c.subscribe([conf['topic_name']])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
       if msg.error().code() == KafkaError._PARTITION_EOF:
          continue
       else:
           print("Consumer error: {}".format(msg.error()))
           continue

    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()