This topic describes how to use the SDK for Python to connect to an endpoint of a ApsaraMQ for Kafka instance and send and subscribe to messages.

Set up the development environment

Install the Python library

Run the following command to install the Python library:
pip install confluent-kafka==1.9.2
Important We recommend that you install confluent-kafka 1.9.2 or earlier. Otherwise, the SSL_HANDSHAKE error is reported if you send messages over the Internet.

Prepare a configuration file

  1. Optional: Download the SSL root certificate. If you use the SSL endpoint to connect to your Message Queue for Apache Kafka instance, you must install this certificate.
  2. Visit the aliware-kafka-demos page. On the page that appears, click the code icon and select Download ZIP to download the demo package. Then, decompress the demo package.
  3. In the decompressed package, find the kafka-confluent-python-demo folder and upload the folder to your Linux system.
  4. Log on to your Linux system, go to the /etc directory, and then modify the kafka.properties configuration file.
    kafka_setting = {
        'sasl_plain_username': 'XXX', # If you use the default endpoint to connect to your Message Queue for Apache Kafka instance, delete this configuration. 
        'sasl_plain_password': 'XXX', # If you use the default endpoint to connect to your Message Queue for Apache Kafka instance, delete this configuration. 
        'bootstrap_servers': ["XXX", "XXX", "XXX"],
        'topic_name': 'XXX',
        'consumer_id': 'XXX'
    }
    ParameterDescription
    sasl_plain_usernameThe username of the Simple Authentication and Security Layer (SASL) user.
    Note
    • If the ACL feature is not enabled for your ApsaraMQ for Kafka instance, you can obtain the username and password of the SASL user from the Username and Password parameters in the Configuration Information section of the Instance Details page in the ApsaraMQ for Kafka console.
    • If the ACL feature is enabled for your ApsaraMQ for Kafka instance, make sure that the SASL user is authorized to send and consume messages by using the instance. For more information, see Grant permissions to SASL users.
    sasl_plain_passwordThe password of the SASL user.
    bootstrap_serversThe SSL endpoint of the Message Queue for Apache Kafka instance. You can obtain the SSL endpoint in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console.
    topic_nameThe name of the topic. You can obtain the name of the topic on the Topics page in the ApsaraMQ for Kafka console.
    consumer_idThe ID of the consumer group. You can obtain the ID of the consumer group on the Groups page in the ApsaraMQ for Kafka console.

Send messages

Run the following command to send a message. Python 3.9 is used in the following sample code.

python kafka_producer.py
The following sample code provides examples of kafka_producer.py:
  • Default endpoint
    from confluent_kafka import Producer
    import setting
    
    conf = setting.kafka_setting
    # Initialize a producer. 
    p = Producer({'bootstrap.servers': conf['bootstrap_servers']})
    
    def delivery_report(err, msg):
        """ Called once for each message produced to indicate delivery result.
            Triggered by poll() or flush(). """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
    
    # Send messages in asynchronous transmission mode. 
    p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
    p.poll(0)
    
    # When the program is ended, call the flush() method. 
    p.flush()
  • SSL endpoint
    from confluent_kafka import Producer
    import setting
    
    conf = setting.kafka_setting
    
    p = Producer({'bootstrap.servers':conf['bootstrap_servers'],
       'ssl.endpoint.identification.algorithm': 'none',
       'sasl.mechanisms':'PLAIN',
       'ssl.ca.location':conf['ca_location'],
       'security.protocol':'SASL_SSL',
       'sasl.username':conf['sasl_plain_username'],
       'sasl.password':conf['sasl_plain_password']})
    
    
    def delivery_report(err, msg):
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
    
    p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
    p.poll(0)
    
    p.flush()

Subscribe to messages

Run the following command to subscribe to messages. Python 3.9 is used in the following sample code.

python kafka_consumer.py
The following sample code provides examples of kafka_consumer.py:
  • Default endpoint
    from confluent_kafka import Consumer, KafkaError
    
    import setting
    
    conf = setting.kafka_setting
    
    c = Consumer({
        'bootstrap.servers': conf['bootstrap_servers'],
        'group.id': conf['group_name'],
        'auto.offset.reset': 'latest'
    })
    
    c.subscribe([conf['topic_name']])
    
    while True:
        msg = c.poll(1.0)
    
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print("Consumer error: {}".format(msg.error()))
                continue
    
        print('Received message: {}'.format(msg.value().decode('utf-8')))
    
    c.close()
  • SSL endpoint
    from confluent_kafka import Consumer, KafkaError
    
    import setting
    
    conf = setting.kafka_setting
    
    c = Consumer({
        'bootstrap.servers': conf['bootstrap_servers'],
        'ssl.endpoint.identification.algorithm': 'none',
        'sasl.mechanisms':'PLAIN',
        'ssl.ca.location':conf['ca_location'],
        'security.protocol':'SASL_SSL',
        'sasl.username':conf['sasl_plain_username'],
        'sasl.password':conf['sasl_plain_password'],
        'group.id': conf['group_name'],
        'auto.offset.reset': 'latest',
        'fetch.message.max.bytes':'1024*512'
    })
    
    c.subscribe([conf['topic_name']])
    
    while True:
        msg = c.poll(1.0)
    
        if msg is None:
            continue
        if msg.error():
           if msg.error().code() == KafkaError._PARTITION_EOF:
              continue
           else:
               print("Consumer error: {}".format(msg.error()))
               continue
    
        print('Received message: {}'.format(msg.value().decode('utf-8')))
    
    c.close()