添加Python依賴庫
執行以下命令安裝依賴庫。
pip install confluent-kafka==1.9.2
重要 建議您安裝confluent-kafka 1.9.2及以下版本的依賴庫,否則使用公網發送訊息會報SSL_HANDSHAKE錯誤。
準備配置
下載Demo工程,根據實際存取點修改相應配置,然後將Demo工程上傳至Linux伺服器。
在解壓的Demo工程中,找到kafka-confluent-python-demo檔案夾,根據實際的存取點修改設定檔setting.py。
預設存取點
在vpc目錄中,修改設定檔setting.py。
kafka_setting = {
'bootstrap_servers': 'XXX:xxx,XXX:xxx',
'topic_name': 'XXX',
'group_name': 'XXX'
}
SSL存取點
在vpc-ssl目錄中,修改設定檔setting.py。
kafka_setting = {
'sasl_plain_username': 'XXX',
'sasl_plain_password': 'XXX',
'ca_location': '/XXX/mix-4096-ca-cert',
'bootstrap_servers': 'XXX:xxx,XXX:xxx',
'topic_name': 'XXX',
'group_name': 'XXX'
}
參數 | 描述 |
sasl_plain_username | SASL使用者名稱。
說明 - 如果執行個體未開啟ACL,您可以在雲訊息佇列 Kafka 版控制台的实例详情頁面的配置信息地區擷取預設的用户名和密码。
- 如果執行個體已開啟ACL,請確保要使用的SASL使用者已被授予向雲訊息佇列 Kafka 版執行個體收發訊息的許可權。具體操作,請參見SASL使用者授權。
|
sasl_plain_password | SASL使用者名稱密碼。 |
ca_location | SSL根憑證的路徑。用本地路徑替換樣本中的XXX。例如:/home/kafka-confluent-python-demo/vpc-ssl/mix-4096-ca-cert。 |
bootstrap_servers | SSL存取點。您可在雲訊息佇列 Kafka 版控制台的实例详情頁面的接入点信息地區擷取。 |
topic_name | Topic名稱。您可在雲訊息佇列 Kafka 版控制台的Topic 管理頁面擷取。 |
group_name | Group名稱。您可在雲訊息佇列 Kafka 版控制台的Group 管理頁面擷取。 |
將kafka-confluent-python-demo檔案夾上傳到Linux伺服器的/home路徑下。
發送訊息
根據實際的存取點,按照以下方式發送訊息。
預設存取點
執行以下命令,進入到/home/kafka-confluent-python-demo/vpc路徑。
cd /home/kafka-confluent-python-demo/vpc
執行以下命令,發送訊息。
python kafka_producer.py
訊息程式kafka_producer.py範例程式碼如下:
kafka_producer.py
from confluent_kafka import Producer
import setting
conf = setting.kafka_setting
# 初始化一個Producer對象。
p = Producer({'bootstrap.servers': conf['bootstrap_servers']})
def delivery_report(err, msg):
""" Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
# 非同步發送訊息。
p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
p.poll(0)
# 在程式結束時,調用flush。
p.flush()
SSL存取點
執行以下命令,進入到/home/kafka-confluent-python-demo/vpc-ssl路徑。
cd /home/kafka-confluent-python-demo/vpc-ssl
執行以下命令,發送訊息。
python kafka_producer.py
訊息程式kafka_producer.py範例程式碼如下:
kafka_producer.py
from confluent_kafka import Producer
import setting
conf = setting.kafka_setting
p = Producer({'bootstrap.servers':conf['bootstrap_servers'],
'ssl.endpoint.identification.algorithm': 'none',
'sasl.mechanisms':'PLAIN',
'ssl.ca.location':conf['ca_location'],
'security.protocol':'SASL_SSL',
'sasl.username':conf['sasl_plain_username'],
'sasl.password':conf['sasl_plain_password']})
def delivery_report(err, msg):
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
p.poll(0)
p.flush()
訂閱訊息
根據實際的存取點,按照以下方式訂閱訊息。
預設存取點
執行以下命令,進入到/home/kafka-confluent-python-demo/vpc路徑。
cd /home/kafka-confluent-python-demo/vpc
執行以下命令,訂閱訊息。
python kafka_consumer.py
訊息程式kafka_consumer.py範例程式碼如下:
kafka_consumer.py
from confluent_kafka import Consumer, KafkaError
import setting
conf = setting.kafka_setting
c = Consumer({
'bootstrap.servers': conf['bootstrap_servers'],
'group.id': conf['group_name'],
'auto.offset.reset': 'latest'
})
c.subscribe([conf['topic_name']])
while True:
msg = c.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print("Consumer error: {}".format(msg.error()))
continue
print('Received message: {}'.format(msg.value().decode('utf-8')))
c.close()
SSL存取點
執行以下命令,進入到/home/kafka-confluent-python-demo/vpc-ssl路徑。
cd /home/kafka-confluent-python-demo/vpc-ssl
執行以下命令,訂閱訊息。
python kafka_consumer.py
訊息程式kafka_consumer.py範例程式碼如下:
kafka_consumer.py
from confluent_kafka import Consumer, KafkaError
import setting
conf = setting.kafka_setting
c = Consumer({
'bootstrap.servers': conf['bootstrap_servers'],
'ssl.endpoint.identification.algorithm': 'none',
'sasl.mechanisms':'PLAIN',
'ssl.ca.location':conf['ca_location'],
'security.protocol':'SASL_SSL',
'sasl.username':conf['sasl_plain_username'],
'sasl.password':conf['sasl_plain_password'],
'group.id': conf['group_name'],
'auto.offset.reset': 'latest',
'fetch.message.max.bytes':'1024*512'
})
c.subscribe([conf['topic_name']])
while True:
msg = c.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print("Consumer error: {}".format(msg.error()))
continue
print('Received message: {}'.format(msg.value().decode('utf-8')))
c.close()