This topic describes how to use the SDK for PHP to connect to an endpoint of a Message Queue for Apache Kafka instance and send and subscribe to messages.

Environment requirements

  • GNU Compiler Collection (GCC) is installed. For more information, see Installing GCC.
  • PHP is installed. For more information, see Downloads.
  • PHP Extension Community Library (PECL) is installed. For more information, see Downloading PECL extensions.

Install the C++ dependency library

  1. Run the following command to go to the /etc/yum.repos.d/ directory in which the yum repository is installed:
    cd /etc/yum.repos.d/
  2. Create a yum repository configuration file named confluent.repo.
    [Confluent.dist]
    name=Confluent repository (dist)
    baseurl=https://packages.confluent.io/rpm/5.1/7
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
    enabled=1
    
    [Confluent]
    name=Confluent repository
    baseurl=https://packages.confluent.io/rpm/5.1
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
    enabled=1
  3. Run the following command to install the C++ dependency library:
    yum install librdkafka-devel

Install the PHP dependency library

  1. Run the following command to install the PHP dependency library:
    pecl install rdkafka
  2. In the PHP initialization file php.ini, add the following code to enable Kafka extensions:
    extension=rdkafka.so

Prepare a configuration file

  1. Optional:Download the SSL root certificate. If you use the SSL endpoint to connect to your Message Queue for Apache Kafka instance, you must download this certificate.
  2. In the decompressed package, go to the kafka-php-demo folder. Then, open the corresponding folder based on the endpoint that you want to use, and configure the setting.php file in the folder.
    <?php
    
    return [
        'sasl_plain_username' => 'xxx',
        'sasl_plain_password' => 'xxx',
        'bootstrap_servers' => "xxx:xx,xxx:xx",
        'topic_name' => 'xxx',
        'consumer_id' => 'xxx'
    ];
    Parameter Description
    sasl_plain_username The username of the SASL user. If you use the default endpoint to connect to the Message Queue for Apache Kafka instance, this parameter is excluded.
    Note
    • If the ACL feature is not enabled for your Message Queue for Apache Kafka instance, you can obtain the username and password of the SASL user from the Username and Password parameters in the Configuration Information section of the Instance Details page in the Message Queue for Apache Kafka console.
    • If the ACL feature is enabled for your Message Queue for Apache Kafka instance, make sure that the SASL user is authorized to send and consume messages by using the instance. For more information, see Grant permissions to SASL users.
    sasl_plain_password The password of the SASL user. If you use the default endpoint to connect to the Message Queue for Apache Kafka instance, this parameter is excluded.
    bootstrap_servers The SSL endpoint of the Message Queue for Apache Kafka instance. You can obtain the SSL endpoint in the Endpoint Information section of the Instance Details page in the Message Queue for Apache Kafka console.
    topic_name The name of the topic. You can obtain the name of the topic on the Topics page in the Message Queue for Apache Kafka console.
    consumer_id The ID of the consumer group. You can obtain the ID of the consumer group on the Groups page in the Message Queue for Apache Kafka console.
  3. After the required parameters are configured, upload all files in the folder in which the configuration file is located to the PHP installation directory on your server. The folder that corresponds to the SSL endpoint contains the SSL root certificate file.

Send messages

Run the following command to run kafka-producer.php to send messages:

php kafka-producer.php
The following sample code provides an example of kafka-producer.php:
Note In the sample code, the SSL endpoint is used. If you use the default endpoint, SASL-related code is not required. Delete the lines that contain sasl. or ssl. from the sample code.
<?php

$setting = require __DIR__ . '/setting.php';

$conf = new RdKafka\Conf();
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('api.version.request', 'true');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');
$conf->set('message.send.max.retries', 5);
$rk = new RdKafka\Producer($conf);
# if want to debug, set log level to LOG_DEBUG
$rk->setLogLevel(LOG_INFO);
$rk->addBrokers($setting['bootstrap_servers']);
$topic = $rk->newTopic($setting['topic_name']);
$a = $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message hello kafka");
$rk->poll(0);
while ($rk->getOutQLen() > 0) {
    $rk->poll(50);
}
echo "send succ" . PHP_EOL;

For more information about the sample code, see php-rdkafka.

Subscribe to messages

Run the following command to run kafka-consumer.php to subscribe to messages:

php kafka-consumer.php
The following sample code provides an example of kafka-consumer.php:
Note In the sample code, the SSL endpoint is used. If you use the default endpoint, SASL-related code is not required. Delete the lines that contain sasl. or ssl. from the sample code.
<?php
$setting = require __DIR__ . '/setting.php';
$conf = new RdKafka\Conf();
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('api.version.request', 'true');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');

$conf->set('group.id', $setting['consumer_id']);

$conf->set('metadata.broker.list', $setting['bootstrap_servers']);

$topicConf = new RdKafka\TopicConf();

$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

$consumer->subscribe([$setting['topic_name']]);

echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";

while (true) {
    $message = $consumer->consume(30 * 1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

?>

For more information about the sample code, see php-rdkafka.