ApsaraMQ for Kafka: Use the SDK for PHP to send and receive messages

Last Updated: Mar 15, 2024

This topic describes how to use the SDK for PHP to connect to ApsaraMQ for Kafka to send and receive messages.

Environment requirements

Install the C++ library

  1. Run the following command to switch to the /etc/yum.repos.d/ yum repository directory:

    cd /etc/yum.repos.d/
  2. In that directory, create a yum repository configuration file named confluent.repo with the following content:

    [Confluent.dist]
    name=Confluent repository (dist)
    baseurl=https://packages.confluent.io/rpm/5.1/7
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
    enabled=1
    
    [Confluent]
    name=Confluent repository
    baseurl=https://packages.confluent.io/rpm/5.1
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
    enabled=1
  3. Run the following command to install the C++ library:

    yum install librdkafka-devel

Install the PHP library

  1. Run the following command to install the PHP library:

    pecl install rdkafka
  2. In the PHP initialization file php.ini, add the following line to enable the rdkafka extension:

    extension=rdkafka.so
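
To confirm that PHP picks up the extension after php.ini is edited, a quick check such as the following can help. This is a minimal sketch: the function name rdkafkaStatus is made up for illustration, and it relies only on the built-in extension_loaded, phpversion, and php_ini_loaded_file functions.

```php
<?php

/**
 * Describe whether the rdkafka extension is visible to this PHP binary.
 * rdkafkaStatus is a hypothetical helper name used only in this sketch.
 */
function rdkafkaStatus(): string
{
    if (extension_loaded('rdkafka')) {
        // phpversion() with an extension name returns that extension's version.
        return 'rdkafka extension loaded, version ' . phpversion('rdkafka');
    }

    // php_ini_loaded_file() returns the php.ini path in use, or false if none was loaded.
    $ini = php_ini_loaded_file();

    return 'rdkafka extension not loaded; php.ini in use: '
        . ($ini === false ? '(none)' : $ini);
}

echo rdkafkaStatus() . PHP_EOL;
```

If the script reports that the extension is not loaded, the php.ini path it prints shows which file the command-line PHP binary actually reads. That file can differ from the one used by a web server.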

Prepare configuration files

  1. (Optional) Download the Secure Sockets Layer (SSL) root certificate. If you use the SSL endpoint to connect to your ApsaraMQ for Kafka instance, you must install the certificate.

  2. Go to the aliware-kafka-demos page, click download to download the demo project to your on-premises machine, and then decompress the package of the demo project.

  3. In the decompressed package, go to the kafka-php-demo folder. Then, open the corresponding folder based on the endpoint that you want to use, and configure the setting.php file in the folder.

    <?php
    
    return [
        'sasl_plain_username' => 'xxx',
        'sasl_plain_password' => 'xxx',
        'bootstrap_servers' => "xxx:xx,xxx:xx",
        'topic_name' => 'xxx',
        'consumer_id' => 'xxx'
    ];

    Parameters:

    sasl_plain_username
        The username of the Simple Authentication and Security Layer (SASL) user. If you use the default endpoint to connect to the ApsaraMQ for Kafka instance, this parameter is not required.

        Note
        • If the ACL feature is not enabled for the ApsaraMQ for Kafka instance, you can obtain the username and password of the SASL user from the Username and Password parameters in the Configuration Information section of the Instance Details page in the ApsaraMQ for Kafka console.
        • If the ACL feature is enabled for the ApsaraMQ for Kafka instance, make sure that the SASL user is authorized to send and receive messages by using the instance. For more information, see Grant permissions to SASL users.

    sasl_plain_password
        The password of the SASL user. If you use the default endpoint to connect to the ApsaraMQ for Kafka instance, this parameter is not required.

    bootstrap_servers
        The endpoint of the ApsaraMQ for Kafka instance that you want to use (the SSL endpoint in this example), as a comma-separated list of host:port pairs. You can obtain the endpoint in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console.

    topic_name
        The topic name. You can obtain the topic name on the Topics page in the ApsaraMQ for Kafka console.

    consumer_id
        The group ID. You can obtain the group ID on the Groups page in the ApsaraMQ for Kafka console.

  4. After the required parameters are configured, upload all files in the folder in which the configuration file is located to the PHP installation directory on your server. The folder that corresponds to the SSL endpoint contains the SSL root certificate file.
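
Before uploading, it can be worth checking that no placeholder values are left in setting.php. The following is a minimal sketch of such a check, not part of the demo project: the function name validateSetting and the $requireSasl flag are hypothetical, and the host:port pattern assumes host names or IPv4 addresses only.

```php
<?php

/**
 * Return a list of problems found in a settings array (an empty list means it looks fine).
 * $requireSasl is a hypothetical flag: true for the SSL endpoint, false for the default endpoint.
 */
function validateSetting(array $setting, bool $requireSasl = true): array
{
    $problems = [];

    $required = ['bootstrap_servers', 'topic_name', 'consumer_id'];
    if ($requireSasl) {
        $required[] = 'sasl_plain_username';
        $required[] = 'sasl_plain_password';
    }

    // Every required key must be present and hold a non-empty string.
    foreach ($required as $key) {
        $value = $setting[$key] ?? '';
        if (!is_string($value) || trim($value) === '') {
            $problems[] = "missing or empty: $key";
        }
    }

    // Every bootstrap server entry must look like host:port with a numeric port.
    $servers = $setting['bootstrap_servers'] ?? '';
    if (is_string($servers) && trim($servers) !== '') {
        foreach (explode(',', $servers) as $entry) {
            $entry = trim($entry);
            if (!preg_match('/^[^\s:]+:\d{1,5}$/', $entry)) {
                $problems[] = "bad host:port entry: $entry";
            }
        }
    }

    return $problems;
}
```

Calling validateSetting(require __DIR__ . '/setting.php') on the unedited template reports the xxx:xx placeholders as bad host:port entries, because the port is not numeric.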

Send messages

Run the following command to run kafka-producer.php to send messages:

php kafka-producer.php

The following sample code provides an example of kafka-producer.php:

Note

In the sample code, the SSL endpoint is used. If you use the default endpoint, the SASL and SSL settings are not required: delete the lines that contain sasl. or ssl., and the line that sets security.protocol, from the sample code.

<?php

// Load the connection settings prepared in setting.php.
$setting = require __DIR__ . '/setting.php';

$conf = new RdKafka\Conf();
// SASL/PLAIN authentication over SSL, as required by the SSL endpoint.
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('api.version.request', 'true');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');
// Do not verify the broker host name against its certificate.
$conf->set('ssl.endpoint.identification.algorithm', 'none');
// Retry a failed send up to five times. Configuration values are passed as strings.
$conf->set('message.send.max.retries', '5');

$rk = new RdKafka\Producer($conf);
// To debug, set the log level to LOG_DEBUG.
$rk->setLogLevel(LOG_INFO);
$rk->addBrokers($setting['bootstrap_servers']);

$topic = $rk->newTopic($setting['topic_name']);
// RD_KAFKA_PARTITION_UA leaves the partition unassigned so that the partitioner chooses it.
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message hello kafka");

// Serve delivery events until the outbound queue is empty.
$rk->poll(0);
while ($rk->getOutQLen() > 0) {
    $rk->poll(50);
}
echo "Message sent" . PHP_EOL;

For more information about the sample code, see php-rdkafka.
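
The note above says that the default endpoint needs none of the SASL or SSL settings. Instead of deleting lines by hand, the properties could be collected in one place and switched by endpoint type. The following is a minimal sketch under that assumption: the function name buildClientProperties and the $useSsl and $caFile parameters are hypothetical, while the property names and values are the ones used in the sample code.

```php
<?php

/**
 * Build the connection properties used by the sample scripts.
 * $useSsl is a hypothetical switch: true for the SSL endpoint, false for the default endpoint.
 * $caFile is the path to the SSL root certificate (only used when $useSsl is true).
 */
function buildClientProperties(array $setting, bool $useSsl, string $caFile = 'ca-cert.pem'): array
{
    // Properties needed for every endpoint type.
    $props = [
        'api.version.request' => 'true',
        'metadata.broker.list' => $setting['bootstrap_servers'],
    ];

    if ($useSsl) {
        // SASL/PLAIN over SSL, mirroring the sample code.
        $props += [
            'security.protocol' => 'SASL_SSL',
            'sasl.mechanisms' => 'PLAIN',
            'sasl.username' => $setting['sasl_plain_username'],
            'sasl.password' => $setting['sasl_plain_password'],
            'ssl.ca.location' => $caFile,
            'ssl.endpoint.identification.algorithm' => 'none',
        ];
    }

    return $props;
}
```

Each returned pair can then be applied to an RdKafka\Conf object with $conf->set($name, $value) in a foreach loop. Because metadata.broker.list is part of the result, a separate addBrokers call should not be needed in that case.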

Subscribe to messages

Run the following command to run kafka-consumer.php to subscribe to messages:

php kafka-consumer.php

The following sample code provides an example of kafka-consumer.php:

Note

In the sample code, the SSL endpoint is used. If you use the default endpoint, the SASL and SSL settings are not required: delete the lines that contain sasl. or ssl., and the line that sets security.protocol, from the sample code.

<?php

// Load the connection settings prepared in setting.php.
$setting = require __DIR__ . '/setting.php';

$conf = new RdKafka\Conf();
// SASL/PLAIN authentication over SSL, as required by the SSL endpoint.
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('api.version.request', 'true');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');
// Do not verify the broker host name against its certificate.
$conf->set('ssl.endpoint.identification.algorithm', 'none');

// Configuration values are passed as strings.
$conf->set('session.timeout.ms', '10000');
$conf->set('request.timeout.ms', '305000');
// The consumer group that this consumer joins.
$conf->set('group.id', $setting['consumer_id']);
$conf->set('metadata.broker.list', $setting['bootstrap_servers']);

$topicConf = new RdKafka\TopicConf();
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe([$setting['topic_name']]);

echo "Waiting for partition assignment... (may take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";

while (true) {
    // Wait up to 30 seconds for the next message or event.
    $message = $consumer->consume(30 * 1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            // Any other error code is treated as fatal.
            throw new \Exception($message->errstr(), $message->err);
    }
}

For more information about the sample code, see php-rdkafka.