本文介绍如何使用Python SDK通过接入点接入云消息队列 Kafka 版并收发消息。
环境准备
添加Python依赖库
执行以下命令安装依赖库。
pip install confluent-kafka==1.9.2
重要
建议您安装confluent-kafka 1.9.2及以下版本的依赖库,否则使用公网发送消息会报SSL_HANDSHAKE
错误。
准备配置
可选:下载SSL根证书。如果是SSL接入点,需下载该证书。
- 访问aliware-kafka-demos,单击图标,然后在下拉框选择Download ZIP,下载Demo包并解压。
在解压的Demo工程中,找到kafka-confluent-python-demo文件夹,将此文件夹上传在Linux系统。
登录Linux系统,进入文件目录,修改配置文件setting.py。
kafka_setting = { 'sasl_plain_username': 'XXX', #如果是默认接入点实例,请删除该配置。 'sasl_plain_password': 'XXX', #如果是默认接入点实例,请删除该配置。 'bootstrap_servers': '[xxx,xxx,xxx]', 'topic_name': 'XXX', 'group_name': 'XXX' }
参数
描述
sasl_plain_username
SASL用户名。
说明- 如果实例未开启ACL,您可以在云消息队列 Kafka 版控制台的实例详情页面的配置信息区域获取默认的用户名和密码。
- 如果实例已开启ACL,请确保要使用的SASL用户已被授予向云消息队列 Kafka 版实例收发消息的权限。具体操作,请参见SASL用户授权。
sasl_plain_password
SASL用户名密码。
bootstrap_servers
SSL接入点。您可在云消息队列 Kafka 版控制台的实例详情页面的接入点信息区域获取。
topic_name
Topic名称。您可在云消息队列 Kafka 版控制台的Topic 管理页面获取。
group_name
Group名称。您可在云消息队列 Kafka 版控制台的Group 管理页面获取。
发送消息
执行以下命令发送消息(当前示例的Python版本为3.9)。
python kafka_producer.py
消息程序kafka_producer.py示例代码如下:
默认接入点
from confluent_kafka import Producer import setting conf = setting.kafka_setting # 初始化一个Producer对象。 p = Producer({'bootstrap.servers': conf['bootstrap_servers']}) def delivery_report(err, msg): """ Called once for each message produced to indicate delivery result. Triggered by poll() or flush(). """ if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) # 异步发送消息。 p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report) p.poll(0) # 在程序结束时,调用flush。 p.flush()
SSL接入点
from confluent_kafka import Producer import setting conf = setting.kafka_setting p = Producer({'bootstrap.servers':conf['bootstrap_servers'], 'ssl.endpoint.identification.algorithm': 'none', 'sasl.mechanisms':'PLAIN', 'ssl.ca.location':conf['ca_location'], 'security.protocol':'SASL_SSL', 'ssl.endpoint.identification.algorithm':'none', 'sasl.username':conf['sasl_plain_username'], 'sasl.password':conf['sasl_plain_password']}) def delivery_report(err, msg): if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report) p.poll(0) p.flush()
订阅消息
执行以下命令订阅消息(当前示例的Python版本为3.9)。
python kafka_consumer.py
消息程序kafka_consumer.py示例代码如下:
默认接入点
from confluent_kafka import Consumer, KafkaError import setting conf = setting.kafka_setting c = Consumer({ 'bootstrap.servers': conf['bootstrap_servers'], 'group.id': conf['group_name'], 'auto.offset.reset': 'latest' }) c.subscribe([conf['topic_name']]) while True: msg = c.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: continue else: print("Consumer error: {}".format(msg.error())) continue print('Received message: {}'.format(msg.value().decode('utf-8'))) c.close()
SSL接入点
from confluent_kafka import Consumer, KafkaError import setting conf = setting.kafka_setting c = Consumer({ 'bootstrap.servers': conf['bootstrap_servers'], 'ssl.endpoint.identification.algorithm': 'none', 'sasl.mechanisms':'PLAIN', 'ssl.ca.location':conf['ca_location'], 'security.protocol':'SASL_SSL', 'sasl.username':conf['sasl_plain_username'], 'sasl.password':conf['sasl_plain_password'], 'ssl.endpoint.identification.algorithm':'none', 'group.id': conf['group_name'], 'auto.offset.reset': 'latest', 'fetch.message.max.bytes':'1024*512' }) c.subscribe([conf['topic_name']]) while True: msg = c.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: continue else: print("Consumer error: {}".format(msg.error())) continue print('Received message: {}'.format(msg.value().decode('utf-8'))) c.close()