ApsaraMQ for Kafka: Use the SDK for PHP to send and receive messages

Last Updated: Mar 11, 2026

Connect a PHP application to ApsaraMQ for Kafka to send and receive messages. The examples in this guide use the php-rdkafka extension, a PHP wrapper around the librdkafka C library.

Prerequisites

Before you begin, make sure the following software is installed on your Linux server:

  • GNU Compiler Collection (GCC): Required to compile native extensions. For installation instructions, see Installing GCC.

  • PHP: For download and installation instructions, see the PHP downloads page.

  • PHP Extension Community Library (PECL): Used to install PHP extensions. For details, see Downloading PECL extensions.

You also need:

  • An ApsaraMQ for Kafka instance with a topic and consumer group created. Get the endpoint, topic name, and group ID from the ApsaraMQ for Kafka console.

  • If you connect through the SSL endpoint: the Simple Authentication and Security Layer (SASL) username and password for your instance.

Step 1: Install librdkafka

The php-rdkafka extension depends on the librdkafka C library. Install it from the Confluent package repository. The steps below use yum, so they assume a RHEL-compatible distribution such as CentOS.

  1. Switch to the yum repository directory:

       cd /etc/yum.repos.d/
  2. Create a file named confluent.repo with the following content:

       [Confluent.dist]
       name=Confluent repository (dist)
       baseurl=https://packages.confluent.io/rpm/5.1/7
       gpgcheck=1
       gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
       enabled=1
    
       [Confluent]
       name=Confluent repository
       baseurl=https://packages.confluent.io/rpm/5.1
       gpgcheck=1
       gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
       enabled=1
  3. Install the librdkafka development package:

       yum install librdkafka-devel

Step 2: Install the php-rdkafka extension

  1. Install the extension through PECL:

       pecl install rdkafka
  2. Add the following line to your php.ini file to enable the extension:

       extension=rdkafka.so
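Before moving on, it can help to confirm that PHP actually picked up the extension. The following is a minimal sketch, not part of the demo project: `rdkafkaStatus()` is a hypothetical helper name, and the script relies only on built-in PHP functions. Running `php -m` and looking for rdkafka in the output is an equivalent check.

```php
<?php

// Report whether the rdkafka extension is loaded and, if so, which version.
// rdkafkaStatus() is a hypothetical helper, not part of php-rdkafka.
function rdkafkaStatus(): string
{
    if (!extension_loaded('rdkafka')) {
        return 'rdkafka extension is NOT loaded; check the extension= line in php.ini';
    }
    // phpversion() returns the extension's version string, or false if unavailable.
    $version = phpversion('rdkafka') ?: 'unknown';
    return "rdkafka extension loaded (version {$version})";
}

echo rdkafkaStatus() . PHP_EOL;

// Show which php.ini the CLI is reading, in case the edit went to a different file.
echo 'Loaded php.ini: ' . (php_ini_loaded_file() ?: 'none') . PHP_EOL;
```

If the extension is reported as not loaded, the php.ini path printed on the second line is the file that needs the extension=rdkafka.so entry.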

Step 3: Configure connection parameters

  1. (Optional) If you connect through the SSL endpoint, download the SSL root certificate.

  2. Download the demo project from aliware-kafka-demos and extract the archive.

  3. In the extracted package, navigate to the kafka-php-demo folder. Open the subfolder that matches your endpoint type (SSL or default), and edit the setting.php file:

       <?php
    
       return [
           'sasl_plain_username' => '<YOUR_SASL_USERNAME>',
           'sasl_plain_password' => '<YOUR_SASL_PASSWORD>',
           'bootstrap_servers'   => '<HOST1>:<PORT1>,<HOST2>:<PORT2>',
           'topic_name'          => '<YOUR_TOPIC_NAME>',
           'consumer_id'         => '<YOUR_CONSUMER_GROUP_ID>',
       ];

    Replace the placeholders with your actual values:

    Placeholder | Description | Where to find it
    <YOUR_SASL_USERNAME> | SASL username. Not required for the default endpoint. | Instance Details > Configuration Information > Username in the ApsaraMQ for Kafka console. If the ACL feature is enabled, make sure the SASL user is authorized to send and receive messages. For more information, see Grant permissions to SASL users.
    <YOUR_SASL_PASSWORD> | SASL password. Not required for the default endpoint. | Same location as the username.
    <HOST1>:<PORT1>,<HOST2>:<PORT2> | Bootstrap servers (the endpoint of your ApsaraMQ for Kafka instance). | Instance Details > Endpoint Information in the ApsaraMQ for Kafka console.
    <YOUR_TOPIC_NAME> | Topic name. | Topics page in the ApsaraMQ for Kafka console.
    <YOUR_CONSUMER_GROUP_ID> | Consumer group ID. | Groups page in the ApsaraMQ for Kafka console.
  4. Upload all files in the folder to the PHP installation directory on your server. If you use the SSL endpoint, make sure the SSL root certificate file (ca-cert.pem) is included.
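A common mistake at this point is leaving a template placeholder in setting.php. The sketch below shows one way to catch that before running the demo scripts. `findSettingProblems()` is a hypothetical helper, and the broker, topic, and group values in the example array are made up for illustration; in practice you would pass the array returned by setting.php.

```php
<?php

/**
 * Return a list of problems found in the settings array (hypothetical helper).
 * Pass $requireSasl = true for the SSL endpoint, false for the default endpoint.
 */
function findSettingProblems(array $setting, bool $requireSasl): array
{
    $required = ['bootstrap_servers', 'topic_name', 'consumer_id'];
    if ($requireSasl) {
        $required[] = 'sasl_plain_username';
        $required[] = 'sasl_plain_password';
    }

    $problems = [];
    foreach ($required as $key) {
        $value = (string) ($setting[$key] ?? '');
        if ($value === '') {
            $problems[] = "{$key} is empty or missing";
        } elseif (preg_match('/<[A-Z0-9_]+>/', $value)) {
            // Values such as <YOUR_TOPIC_NAME> are untouched template placeholders.
            $problems[] = "{$key} still contains a placeholder";
        }
    }
    return $problems;
}

// Illustrative values only; the SASL fields are deliberately left as placeholders.
$example = [
    'sasl_plain_username' => '<YOUR_SASL_USERNAME>',
    'sasl_plain_password' => '<YOUR_SASL_PASSWORD>',
    'bootstrap_servers'   => 'broker1.example.com:9092',
    'topic_name'          => 'demo-topic',
    'consumer_id'         => 'demo-group',
];

print_r(findSettingProblems($example, true));   // two problems: both SASL fields
print_r(findSettingProblems($example, false));  // empty array: SASL not required
```

The second call returns no problems because the SASL username and password are not required for the default endpoint.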

Step 4: Send messages

Run kafka-producer.php to send a message:

php kafka-producer.php

SSL endpoint

Use this code when connecting through the SSL endpoint with SASL authentication:

<?php

$setting = require __DIR__ . '/setting.php';

$conf = new RdKafka\Conf();

// SASL authentication
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);

// SSL/TLS encryption
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');
$conf->set('ssl.endpoint.identification.algorithm', 'none');

// Producer settings
$conf->set('api.version.request', 'true');
$conf->set('message.send.max.retries', '5');  // Conf::set() expects string values

$producer = new RdKafka\Producer($conf);
$producer->setLogLevel(LOG_INFO);  // Set to LOG_DEBUG for troubleshooting
$producer->addBrokers($setting['bootstrap_servers']);

$topic = $producer->newTopic($setting['topic_name']);

// Send a message to an automatically assigned partition
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message hello kafka");
$producer->poll(0);

// Wait for all outstanding messages to be delivered
while ($producer->getOutQLen() > 0) {
    $producer->poll(50);
}

echo "send succ" . PHP_EOL;

Default endpoint

Use this code when connecting through the default endpoint without SASL authentication:

<?php

$setting = require __DIR__ . '/setting.php';

$conf = new RdKafka\Conf();

// Producer settings
$conf->set('api.version.request', 'true');
$conf->set('message.send.max.retries', '5');  // Conf::set() expects string values

$producer = new RdKafka\Producer($conf);
$producer->setLogLevel(LOG_INFO);  // Set to LOG_DEBUG for troubleshooting
$producer->addBrokers($setting['bootstrap_servers']);

$topic = $producer->newTopic($setting['topic_name']);

// Send a message to an automatically assigned partition
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message hello kafka");
$producer->poll(0);

// Wait for all outstanding messages to be delivered
while ($producer->getOutQLen() > 0) {
    $producer->poll(50);
}

echo "send succ" . PHP_EOL;

For more information about the php-rdkafka producer API, see php-rdkafka.
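The producer examples print a success message once the outbound queue is empty, which does not by itself show whether the broker accepted each message. One possible refinement is a delivery report callback. The sketch below is an illustration rather than the demo project's code: `describeDelivery()` is a hypothetical helper, and it assumes a php-rdkafka version that provides `Producer::flush()`. The broker calls run only when the extension and setting.php are present, so the helper can be tried on its own.

```php
<?php

// Turn a delivery report into a log line. Reads only the public
// err, partition, and offset properties of the reported message.
function describeDelivery(object $message): string
{
    if ($message->err !== 0) {
        // On a real RdKafka\Message, errstr() returns the readable reason.
        return "Delivery failed with error code {$message->err}";
    }
    return "Delivered to partition {$message->partition} at offset {$message->offset}";
}

$settingFile = __DIR__ . '/setting.php';

// Only talk to a broker when the extension and the demo settings are present.
if (extension_loaded('rdkafka') && is_file($settingFile)) {
    $setting = require $settingFile;

    $conf = new RdKafka\Conf();
    $conf->set('metadata.broker.list', $setting['bootstrap_servers']);
    // For the SSL endpoint, add the SASL and SSL settings shown above.

    // librdkafka invokes this callback from poll() or flush(), once per message.
    $conf->setDrMsgCb(function ($kafka, $message) {
        echo describeDelivery($message) . PHP_EOL;
    });

    $producer = new RdKafka\Producer($conf);
    $topic = $producer->newTopic($setting['topic_name']);
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'Message hello kafka');

    // flush() serves callbacks until the queue drains or 10 seconds pass.
    if ($producer->flush(10000) !== RD_KAFKA_RESP_ERR_NO_ERROR) {
        echo 'Some messages were still undelivered after 10 seconds' . PHP_EOL;
    }
}
```

Compared with looping on getOutQLen(), a bounded flush() returns even when the broker is unreachable, so the script does not wait indefinitely.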

Step 5: Subscribe to messages

Run kafka-consumer.php to start consuming messages:

php kafka-consumer.php

SSL endpoint

Use this code when connecting through the SSL endpoint with SASL authentication:

<?php

$setting = require __DIR__ . '/setting.php';

$conf = new RdKafka\Conf();

// SASL authentication
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);

// SSL/TLS encryption
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');
$conf->set('ssl.endpoint.identification.algorithm', 'none');

// Consumer settings
$conf->set('api.version.request', 'true');
$conf->set('group.id', $setting['consumer_id']);
$conf->set('session.timeout.ms', '10000');   // Conf::set() expects string values
$conf->set('request.timeout.ms', '305000');
$conf->set('metadata.broker.list', $setting['bootstrap_servers']);

$topicConf = new RdKafka\TopicConf();
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe([$setting['topic_name']]);

echo "Waiting for partition assignment... (may take some time when" . PHP_EOL;
echo "quickly re-joining the group after leaving it.)" . PHP_EOL;

while (true) {
    $message = $consumer->consume(30 * 1000);  // 30-second poll timeout
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            // Successfully consumed a message
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more" . PHP_EOL;
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out" . PHP_EOL;
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}


Default endpoint

Use this code when connecting through the default endpoint without SASL authentication:

<?php

$setting = require __DIR__ . '/setting.php';

$conf = new RdKafka\Conf();

// Consumer settings
$conf->set('api.version.request', 'true');
$conf->set('group.id', $setting['consumer_id']);
$conf->set('session.timeout.ms', '10000');   // Conf::set() expects string values
$conf->set('request.timeout.ms', '305000');
$conf->set('metadata.broker.list', $setting['bootstrap_servers']);

$topicConf = new RdKafka\TopicConf();
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe([$setting['topic_name']]);

echo "Waiting for partition assignment... (may take some time when" . PHP_EOL;
echo "quickly re-joining the group after leaving it.)" . PHP_EOL;

while (true) {
    $message = $consumer->consume(30 * 1000);  // 30-second poll timeout
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            // Successfully consumed a message
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more" . PHP_EOL;
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out" . PHP_EOL;
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}


For more information about the php-rdkafka consumer API, see php-rdkafka.
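The consumer examples dump the whole message object with var_dump(), which is verbose for day-to-day use. As a sketch of an alternative, the helper below builds a one-line summary from the public properties that RdKafka\Message exposes (topic_name, partition, offset, key, payload). `summarizeMessage()` is a hypothetical name, and the sample object is a stand-in so the helper can be tried without a broker.

```php
<?php

/**
 * Build a one-line summary of a consumed message (hypothetical helper).
 * Works with any object that has the same public properties as RdKafka\Message.
 */
function summarizeMessage(object $message): string
{
    // Messages produced without a key carry a null key.
    $key = $message->key ?? '(no key)';
    return sprintf(
        '%s[%d]@%d key=%s payload=%s',
        $message->topic_name,
        $message->partition,
        $message->offset,
        $key,
        $message->payload
    );
}

// Stand-in object with made-up values, used here in place of a real message.
$sample = (object) [
    'topic_name' => 'demo-topic',
    'partition'  => 0,
    'offset'     => 15,
    'key'        => null,
    'payload'    => 'Message hello kafka',
];

echo summarizeMessage($sample) . PHP_EOL;
// prints: demo-topic[0]@15 key=(no key) payload=Message hello kafka
```

In the consume loop, the RD_KAFKA_RESP_ERR_NO_ERROR branch could call `echo summarizeMessage($message) . PHP_EOL;` in place of var_dump($message).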
