This topic describes how to use the SDK for PHP to connect to the SSL endpoint of a Message Queue for Apache Kafka instance and use the PLAIN mechanism to send and consume messages over the Internet.

Prerequisites

  • GNU Compiler Collection (GCC) is installed. For more information, see Installing GCC.
  • PHP is installed. For more information, see Downloads.
  • PHP Extension Community Library (PECL) is installed. For more information, see Downloading PECL extensions.

Install the C++ library

  1. Run the following command to switch to the yum repository directory /etc/yum.repos.d/:
    cd /etc/yum.repos.d/
  2. Create a yum repository configuration file named confluent.repo.
    [Confluent.dist]
    name=Confluent repository (dist)
    baseurl=https://packages.confluent.io/rpm/5.1/7
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
    enabled=1
    
    [Confluent]
    name=Confluent repository
    baseurl=https://packages.confluent.io/rpm/5.1
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
    enabled=1
  3. Run the following command to install the C++ library:
    yum install librdkafka-devel

Install the PHP library

  1. Run the following command to install the PHP library:
    pecl install rdkafka
  2. In the PHP initialization file php.ini, add the following line to enable Kafka extensions:
    extension=rdkafka.so

Preparations

  1. Download an SSL root certificate.
  2. Create a Message Queue for Apache Kafka configuration file.
    <?php
    
    return [
        'sasl_plain_username' => 'xxx',
        'sasl_plain_password' => 'xxx',
        'bootstrap_servers' => "xxx:xx,xxx:xx",
        'topic_name' => 'xxx',
        'consumer_id' => 'xxx'
    ];
    Parameter Description
    sasl_plain_username The username of the Simple Authentication and Security Layer (SASL) user.
    Note
    • If the ACL feature is not enabled for your Message Queue for Apache Kafka instance, you can obtain the username and password of the SASL user from the Username and Password parameters in the Configuration Information section of the Instance Details page in the Message Queue for Apache Kafka console.
    • If the ACL feature is enabled for your Message Queue for Apache Kafka instance, make sure that the SASL user is authorized to send and consume messages by using the instance. For more information, see Grant permissions to SASL users.
    sasl_plain_password The password of the SASL user.
    bootstrap_servers The SSL endpoint of the Message Queue for Apache Kafka instance. You can obtain the SSL endpoint in the Endpoint Information section of the Instance Details page in the Message Queue for Apache Kafka console.
    topic_name The name of the topic. You can obtain the name of the topic on the Topics page in the Message Queue for Apache Kafka console.
    consumer_id The name of the consumer group. Group You can obtain the name of the consumer on the Groups page in the Message Queue for Apache Kafka console.

Send messages

For more information about the sample code, see php-rdkafka.

  1. Create a producer program named kafka-producer.php that contains the following code:
    <?php
    
    $setting = require __DIR__ . '/setting.php';
    
    $conf = new RdKafka\Conf();
    $conf->set('sasl.mechanisms', 'PLAIN');
    $conf->set('api.version.request', 'true');
    $conf->set('sasl.username', $setting['sasl_plain_username']);
    $conf->set('sasl.password', $setting['sasl_plain_password']);
    $conf->set('security.protocol', 'SASL_SSL');
    $conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');
    $conf->set('message.send.max.retries', 5);
    $rk = new RdKafka\Producer($conf);
    # if want to debug, set log level to LOG_DEBUG
    $rk->setLogLevel(LOG_INFO);
    $rk->addBrokers($setting['bootstrap_servers']);
    $topic = $rk->newTopic($setting['topic_name']);
    $a = $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message hello kafka");
    $rk->poll(0);
    while ($rk->getOutQLen() > 0) {
        $rk->poll(50);
    }
    echo "send succ" . PHP_EOL;
  2. Run the following command to run kafka-producer.php to send messages:
    php kafka-producer.php

Consume messages

For more information about the sample code, see php-rdkafka.

  1. Create a consumer program named kafka-consumer.php that contains the following code:
    <?php
    $setting = require __DIR__ . '/setting.php';
    $conf = new RdKafka\Conf();
    $conf->set('sasl.mechanisms', 'PLAIN');
    $conf->set('api.version.request', 'true');
    $conf->set('sasl.username', $setting['sasl_plain_username']);
    $conf->set('sasl.password', $setting['sasl_plain_password']);
    $conf->set('security.protocol', 'SASL_SSL');
    $conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');
    
    $conf->set('group.id', $setting['consumer_id']);
    
    $conf->set('metadata.broker.list', $setting['bootstrap_servers']);
    
    $topicConf = new RdKafka\TopicConf();
    
    $conf->setDefaultTopicConf($topicConf);
    
    $consumer = new RdKafka\KafkaConsumer($conf);
    
    $consumer->subscribe([$setting['topic_name']]);
    
    echo "Waiting for partition assignment... (make take some time when\n";
    echo "quickly re-joining the group after leaving it.)\n";
    
    while (true) {
        $message = $consumer->consume(30 * 1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                var_dump($message);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
                break;
        }
    }
    
    ?>
  2. Run the following command to run kafka-consumer.php to consume messages:
    php kafka-consumer.php