This topic describes how to use the SDK for Python to connect to the SSL endpoint of a Message Queue for Apache Kafka instance and use the PLAIN mechanism to send and consume messages over the Internet.

Prerequisites

  • Python is installed. For more information, see Download Python.
    Note Python 2.7, 3.4, 3.5, 3.6, and 3.7 are supported.
  • pip is installed. For more information, see Installation.

Install the Python library

Run the following command to install the Python library:
pip install kafka-python

Preparations

  1. Download an SSL root certificate.
  2. Create a Message Queue for Apache Kafka configuration file named setting.py.
    kafka_setting = {
        'sasl_plain_username': 'XXX',
        'sasl_plain_password': 'XXX',
        'bootstrap_servers': ["XXX", "XXX", "XXX"],
        'topic_name': 'XXX',
        'consumer_id': 'XXX'
    }
    Parameter Description
    sasl_plain_username The username of the Simple Authentication and Security Layer (SASL) user.
    Note
    • If the ACL feature is not enabled for your Message Queue for Apache Kafka instance, you can obtain the username and password of the SASL user from the Username and Password parameters in the Configuration Information section of the Instance Details page in the Message Queue for Apache Kafka console.
    • If the ACL feature is enabled for your Message Queue for Apache Kafka instance, make sure that the SASL user is authorized to send and consume messages by using the instance. For more information, see Grant permissions to SASL users.
    sasl_plain_password The password of the SASL user.
    bootstrap_servers The SSL endpoint of the Message Queue for Apache Kafka instance. You can obtain the SSL endpoint in the Endpoint Information section of the Instance Details page in the Message Queue for Apache Kafka console.
    topic_name The name of the topic. You can obtain the name of the topic on the Topics page in the Message Queue for Apache Kafka console.
    consumer_id The name of the consumer group. Group You can obtain the name of the consumer group on the Groups page in the Message Queue for Apache Kafka console.

Send messages

  1. Create a producer program named aliyun_kafka_producer.py.
    #!/usr/bin/env python
    # encoding: utf-8
    
    import ssl
    import socket
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    import setting
    
    conf = setting.kafka_setting
    
    print conf
    
    context = ssl.create_default_context()
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    # context.check_hostname = True
    context.load_verify_locations("ca-cert")
    
    producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'],
                            sasl_mechanism="PLAIN",
                            ssl_context=context,
                            security_protocol='SASL_SSL',
                            api_version = (0,10),
                            retries=5,
                            sasl_plain_username=conf['sasl_plain_username'],
                            sasl_plain_password=conf['sasl_plain_password'])
    
    partitions = producer.partitions_for(conf['topic_name'])
    print 'Partitions of the topic: %s' % partitions
    
    try:
        future = producer.send(conf['topic_name'], 'hello aliyun-kafka!')
        future.get()
        print 'send message succeed.'
    except KafkaError, e:
        print 'send message failed.'
        print e
  2. Run the following command to run aliyun_kafka_producer.py to send messages:
    python aliyun_kafka_producer.py

Consume messages

  1. Create a consumer program named aliyun_kafka_consumer.py.
    #!/usr/bin/env python
    # encoding: utf-8
    
    import ssl
    import socket
    from kafka import KafkaConsumer
    from kafka.errors import KafkaError
    import setting
    
    conf = setting.kafka_setting
    
    
    context = ssl.create_default_context()
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    # context.check_hostname = True
    context.load_verify_locations("ca-cert")
    
    consumer = KafkaConsumer(bootstrap_servers=conf['bootstrap_servers'],
                            group_id=conf['consumer_id'],
                            sasl_mechanism="PLAIN",
                            ssl_context=context,
                            security_protocol='SASL_SSL',
                            api_version = (0,10),
                            sasl_plain_username=conf['sasl_plain_username'],
                            sasl_plain_password=conf['sasl_plain_password'])
    
    print 'consumer start to consuming...'
    consumer.subscribe((conf['topic_name'], ))
    for message in consumer:
        print message.topic, message.offset, message.key, message.value, message.value, message.partition
  2. Run the following command to run aliyun_kafka_consumer.py to consume messages:
    python aliyun_kafka_consumer.py