ApsaraMQ for Kafka: Use the SDK for Python to send and receive messages

Last Updated: Mar 28, 2025

This topic describes how to use the SDK for Python to connect to ApsaraMQ for Kafka to send and receive messages on a Linux server.

Before you start

Install the Python dependency library

Run the following command to install the Python dependency library:

pip install confluent-kafka==1.9.2
Important

We recommend that you install confluent-kafka 1.9.2 or earlier. Otherwise, the SSL_HANDSHAKE error is returned when you send messages over the Internet.
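To confirm which version is installed before you continue, a check along the following lines may help. This is a minimal sketch: the helper names version_tuple and is_recommended are hypothetical and are not part of the demo project, and the comparison only looks at the numeric parts of the version string.

```python
from importlib import metadata


def version_tuple(version):
    """Turn '1.9.2' into (1, 9, 2); any non-numeric suffix in a part is ignored."""
    parts = []
    for part in version.split("."):
        digits = ""
        for ch in part:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits) if digits else 0)
    return tuple(parts)


def is_recommended(version, maximum="1.9.2"):
    """Return True if version is at or below the recommended maximum."""
    return version_tuple(version) <= version_tuple(maximum)


if __name__ == "__main__":
    try:
        installed = metadata.version("confluent-kafka")
    except metadata.PackageNotFoundError:
        print("confluent-kafka is not installed")
    else:
        status = "OK" if is_recommended(installed) else "newer than recommended"
        print("confluent-kafka {}: {}".format(installed, status))
```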

Prepare a configuration file

Download the demo project, modify the corresponding configurations based on the endpoint that you use, and then upload the demo project to the Linux server.

  1. Go to the aliware-kafka-demos page. Click the image icon and select Download ZIP to download the demo project. Then, decompress the package of the demo project.

    Note

    The downloaded package of the demo project includes the SSL root certificate. If you want to separately use the SSL root certificate, click Download the SSL root certificate.

  2. In the decompressed demo project, find the kafka-confluent-python-demo folder and modify the setting.py configuration file based on the endpoint that you use.

    Default endpoint

    In the vpc directory, modify the setting.py configuration file.

    kafka_setting = {
        'bootstrap_servers': 'XXX:xxx,XXX:xxx',
        'topic_name': 'XXX',
        'group_name': 'XXX'
    }
    

    Parameter            Description
    bootstrap_servers    The default endpoint of the ApsaraMQ for Kafka instance. You can obtain the endpoint in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console.
    topic_name           The topic name. You can obtain the topic name on the Topics page in the ApsaraMQ for Kafka console.
    group_name           The group name. You can obtain the group name on the Groups page in the ApsaraMQ for Kafka console.

    SSL endpoint

    In the vpc-ssl directory, modify the setting.py configuration file.

    kafka_setting = {
        'sasl_plain_username': 'XXX',
        'sasl_plain_password': 'XXX',
        'ca_location': '/XXX/mix-4096-ca-cert',
        'bootstrap_servers': 'XXX:xxx,XXX:xxx',
        'topic_name': 'XXX',
        'group_name': 'XXX'
    }
    

    Parameter              Description
    sasl_plain_username    The username of the Simple Authentication and Security Layer (SASL) user.
                           Note
                           • If the ACL feature is not enabled for the ApsaraMQ for Kafka instance, you can obtain the username and password of the SASL user from the Username and Password parameters in the Configuration Information section of the Instance Details page in the ApsaraMQ for Kafka console.
                           • If the ACL feature is enabled for the ApsaraMQ for Kafka instance, make sure that the SASL user is authorized to send and receive messages by using the instance. For more information, see Grant permissions to SASL users.
    sasl_plain_password    The password of the SASL user.
    ca_location            The path to which the SSL root certificate is saved. Replace XXX in the sample code with the local path. Example: /home/kafka-confluent-python-demo/vpc-ssl/mix-4096-ca-cert.
    bootstrap_servers      The SSL endpoint of the ApsaraMQ for Kafka instance. You can obtain the endpoint in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console.
    topic_name             The topic name. You can obtain the topic name on the Topics page in the ApsaraMQ for Kafka console.
    group_name             The group name. You can obtain the group name on the Groups page in the ApsaraMQ for Kafka console.

  3. Upload the kafka-confluent-python-demo folder to the /home directory on the Linux server.
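
Before you run the demo, it can be useful to check that no XXX placeholder is left in setting.py. The following is a minimal sketch, not part of the demo project: the function find_setting_problems and its parameters are hypothetical, and it assumes that each bootstrap_servers entry has the form host:port.

```python
import os

REQUIRED_KEYS = ("bootstrap_servers", "topic_name", "group_name")
SSL_KEYS = ("sasl_plain_username", "sasl_plain_password", "ca_location")


def find_setting_problems(kafka_setting, ssl=False, check_ca_file=True):
    """Return a list of problem descriptions; an empty list means none were found."""
    problems = []
    for key in REQUIRED_KEYS + (SSL_KEYS if ssl else ()):
        value = kafka_setting.get(key)
        if not value:
            problems.append("{} is missing".format(key))
        elif "XXX" in str(value):
            problems.append("{} still contains the XXX placeholder".format(key))

    # Check the host:port form only after the placeholder has been replaced.
    servers = str(kafka_setting.get("bootstrap_servers") or "")
    if "XXX" not in servers:
        for entry in filter(None, (s.strip() for s in servers.split(","))):
            host, sep, port = entry.rpartition(":")
            if not (sep and host and port.isdigit()):
                problems.append(
                    "bootstrap_servers entry {!r} is not host:port".format(entry))

    # For the SSL endpoint, the root certificate must exist at ca_location.
    ca_location = kafka_setting.get("ca_location")
    if ssl and check_ca_file and ca_location and "XXX" not in ca_location:
        if not os.path.isfile(ca_location):
            problems.append("ca_location {!r} is not a file".format(ca_location))
    return problems
```

For example, in the vpc-ssl directory you could run find_setting_problems(setting.kafka_setting, ssl=True) after import setting and print each returned entry.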

Send messages

Send messages based on the endpoint that you use.

Default endpoint

  1. Run the following command to access the /home/kafka-confluent-python-demo/vpc subdirectory:

    cd /home/kafka-confluent-python-demo/vpc
  2. Run the following command to send messages:

    python kafka_producer.py

The following sample code provides an example of kafka_producer.py:

from confluent_kafka import Producer
import setting

conf = setting.kafka_setting
# Initialize a producer. 
p = Producer({'bootstrap.servers': conf['bootstrap_servers']})

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

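# Queue the message for asynchronous delivery. produce() returns immediately.
p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
# Serve delivery callbacks for previously produced messages.
p.poll(0)

# Before the program exits, call flush() to wait until all queued messages are delivered.
p.flush()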
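
The sample above sends a single message. To send several messages in a loop, a pattern such as the following may be useful. It is a sketch under stated assumptions: send_all is a hypothetical helper that is not in the demo project, and it expects a producer object that offers produce(), poll(), and flush() as used above. produce() raises BufferError when the local queue is full, so the helper serves callbacks and retries.

```python
def send_all(producer, topic, payloads, on_delivery=None):
    """Produce every payload to topic and return what flush() reports."""
    for payload in payloads:
        data = payload.encode("utf-8")
        while True:
            try:
                producer.produce(topic, data, callback=on_delivery)
                break
            except BufferError:
                # The local queue is full: serve callbacks for up to
                # 1 second to free space, then retry the same payload.
                producer.poll(1)
        # Serve delivery callbacks without blocking.
        producer.poll(0)
    # Wait for outstanding messages before returning.
    return producer.flush()
```

In kafka_producer.py, this could be called as send_all(p, conf['topic_name'], ['Hello', 'World'], delivery_report).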

SSL endpoint

  1. Run the following command to access the /home/kafka-confluent-python-demo/vpc-ssl subdirectory:

    cd /home/kafka-confluent-python-demo/vpc-ssl
  2. Run the following command to send messages:

    python kafka_producer.py

The following sample code provides an example of kafka_producer.py:

from confluent_kafka import Producer
import setting

conf = setting.kafka_setting

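# Initialize a producer that connects to the SSL endpoint by using SASL/PLAIN authentication.
p = Producer({
    'bootstrap.servers': conf['bootstrap_servers'],
    # Hostname verification of the broker certificate is disabled.
    'ssl.endpoint.identification.algorithm': 'none',
    'sasl.mechanisms': 'PLAIN',
    'ssl.ca.location': conf['ca_location'],
    'security.protocol': 'SASL_SSL',
    'sasl.username': conf['sasl_plain_username'],
    'sasl.password': conf['sasl_plain_password']
})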


def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

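# Queue the message for asynchronous delivery.
p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
# Serve delivery callbacks for previously produced messages.
p.poll(0)

# Before the program exits, call flush() to wait until all queued messages are delivered.
p.flush()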
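
The SSL endpoint samples for the producer and the consumer use the same connection properties. One way to keep them in sync is to build the shared part in a single place, as in the following sketch. The helper build_ssl_config is hypothetical and is not part of the demo project. It only maps the keys of the vpc-ssl setting.py file to the client properties shown in the sample above.

```python
def build_ssl_config(kafka_setting):
    """Map the vpc-ssl setting.py keys to the client connection properties."""
    return {
        'bootstrap.servers': kafka_setting['bootstrap_servers'],
        # Hostname verification is disabled, as in the sample above.
        'ssl.endpoint.identification.algorithm': 'none',
        'sasl.mechanisms': 'PLAIN',
        'ssl.ca.location': kafka_setting['ca_location'],
        'security.protocol': 'SASL_SSL',
        'sasl.username': kafka_setting['sasl_plain_username'],
        'sasl.password': kafka_setting['sasl_plain_password'],
    }
```

A producer could then be created with Producer(build_ssl_config(conf)). For a consumer, add the consumer-specific properties such as group.id to the returned dictionary before you pass it to Consumer.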

Subscribe to messages

Subscribe to messages based on the endpoint that you use.

Default endpoint

  1. Run the following command to access the /home/kafka-confluent-python-demo/vpc subdirectory:

    cd /home/kafka-confluent-python-demo/vpc
  2. Run the following command to subscribe to messages:

    python kafka_consumer.py

The following sample code provides an example of kafka_consumer.py:

from confluent_kafka import Consumer, KafkaError

import setting

conf = setting.kafka_setting

c = Consumer({
    'bootstrap.servers': conf['bootstrap_servers'],
    'group.id': conf['group_name'],
    'auto.offset.reset': 'latest'
})

c.subscribe([conf['topic_name']])

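try:
    while True:
        # Wait up to 1 second for a message.
        msg = c.poll(1.0)

        if msg is None:
            continue
        if msg.error():
            # Reaching the end of a partition is not a failure. Keep polling.
            if msg.error().code() != KafkaError._PARTITION_EOF:
                print("Consumer error: {}".format(msg.error()))
            continue

        print('Received message: {}'.format(msg.value().decode('utf-8')))
except KeyboardInterrupt:
    # Press Ctrl+C to stop consuming.
    pass
finally:
    # Close the consumer so that it leaves the group cleanly.
    c.close()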
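
The sample above polls until the process is stopped. If you want to read a bounded number of messages instead, for example in a smoke test, a helper along these lines may be enough. This is a sketch: consume_batch and its parameters are hypothetical names, and it assumes a consumer whose poll() returns either None or a message object with error() and value(), as in the sample above.

```python
def consume_batch(consumer, max_messages, timeout=1.0, max_idle_polls=5):
    """Collect up to max_messages decoded values.

    Stops early after max_idle_polls consecutive polls that yield no
    message value (timeouts or errors).
    """
    values = []
    idle_polls = 0
    while len(values) < max_messages and idle_polls < max_idle_polls:
        msg = consumer.poll(timeout)
        if msg is None:
            idle_polls += 1
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            idle_polls += 1
            continue
        idle_polls = 0
        values.append(msg.value().decode("utf-8"))
    return values
```

In kafka_consumer.py, this could replace the polling loop, for example consume_batch(c, 10), followed by c.close().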

SSL endpoint

  1. Run the following command to access the /home/kafka-confluent-python-demo/vpc-ssl subdirectory:

    cd /home/kafka-confluent-python-demo/vpc-ssl
  2. Run the following command to subscribe to messages:

    python kafka_consumer.py

The following sample code provides an example of kafka_consumer.py:

from confluent_kafka import Consumer, KafkaError

import setting

conf = setting.kafka_setting

c = Consumer({
    'bootstrap.servers': conf['bootstrap_servers'],
    # Hostname verification of the broker certificate is disabled.
    'ssl.endpoint.identification.algorithm': 'none',
    'sasl.mechanisms': 'PLAIN',
    'ssl.ca.location': conf['ca_location'],
    'security.protocol': 'SASL_SSL',
    'sasl.username': conf['sasl_plain_username'],
    'sasl.password': conf['sasl_plain_password'],
    'group.id': conf['group_name'],
    'auto.offset.reset': 'latest',
    # 512 KiB, passed as an integer.
    'fetch.message.max.bytes': 1024 * 512
})

c.subscribe([conf['topic_name']])

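try:
    while True:
        # Wait up to 1 second for a message.
        msg = c.poll(1.0)

        if msg is None:
            continue
        if msg.error():
            # Reaching the end of a partition is not a failure. Keep polling.
            if msg.error().code() != KafkaError._PARTITION_EOF:
                print("Consumer error: {}".format(msg.error()))
            continue

        print('Received message: {}'.format(msg.value().decode('utf-8')))
except KeyboardInterrupt:
    # Press Ctrl+C to stop consuming.
    pass
finally:
    # Close the consumer so that it leaves the group cleanly.
    c.close()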