All Products
Search
Document Center

ApsaraMQ for Kafka:Gunakan SDK untuk Python untuk mengirim dan menerima pesan

Last Updated:Jul 06, 2025

Topik ini menjelaskan cara menggunakan SDK untuk Python untuk terhubung ke ApsaraMQ for Kafka guna mengirim dan menerima pesan di Server Linux.

Sebelum memulai

Instal pustaka dependensi Python

Jalankan perintah berikut untuk menginstal pustaka dependensi Python:

pip install confluent-kafka==1.9.2
Penting

Kami merekomendasikan Anda menginstal confluent-kafka 1.9.2 atau versi lebih lama. Jika tidak, kesalahan SSL_HANDSHAKE akan dikembalikan saat Anda mengirim pesan melalui Internet.

Siapkan file konfigurasi

Unduh proyek demo, modifikasi konfigurasi sesuai titik akhir yang Anda gunakan, lalu unggah proyek demo ke server Linux.

  1. Buka halaman aliware-kafka-demos. Klik ikon image dan pilih Download ZIP untuk mengunduh proyek demo. Kemudian, ekstrak paket proyek demo tersebut.

    Catatan

    Paket proyek demo yang diunduh mencakup sertifikat root SSL. Jika Anda ingin menggunakan sertifikat root SSL secara terpisah, klik Unduh sertifikat root SSL.

  2. Dalam proyek demo yang telah diekstrak, temukan folder kafka-confluent-python-demo dan modifikasi file konfigurasi setting.py berdasarkan titik akhir yang Anda gunakan.

    Titik akhir default

    Di direktori vpc, modifikasi file konfigurasi setting.py.

    kafka_setting = {
        'bootstrap_servers': 'XXX:xxx,XXX:xxx',
        'topic_name': 'XXX',
        'group_name': 'XXX'
    }
    

    Parameter

    Deskripsi

    bootstrap_servers

    Titik akhir default dari instance ApsaraMQ for Kafka. Anda dapat memperoleh titik akhir di bagian Endpoint Information pada halaman Instance Details di Konsol ApsaraMQ for Kafka.

    topic_name

    Nama topik. Anda dapat memperoleh nama topik di halaman Topics di Konsol ApsaraMQ for Kafka.

    group_name

    Nama grup. Anda dapat memperoleh nama grup di halaman Groups di Konsol ApsaraMQ for Kafka.

    Titik akhir SSL

    Di direktori vpc-ssl, modifikasi file konfigurasi setting.py.

    kafka_setting = {
        'sasl_plain_username': 'XXX',
        'sasl_plain_password': 'XXX',
        'ca_location': '/XXX/mix-4096-ca-cert',
        'bootstrap_servers': 'XXX:xxx,XXX:xxx',
        'topic_name': 'XXX',
        'group_name': 'XXX'
    }
    

    Parameter

    Deskripsi

    sasl_plain_username

    Nama pengguna Simple Authentication and Security Layer (SASL).

    Catatan
    • Jika fitur ACL tidak diaktifkan untuk instance ApsaraMQ for Kafka, Anda dapat memperoleh nama pengguna dan kata sandi pengguna SASL dari parameter Username dan Password di bagian Configuration Information pada halaman Instance Details di Konsol ApsaraMQ for Kafka.

    • Jika fitur ACL diaktifkan untuk instance ApsaraMQ for Kafka, pastikan bahwa pengguna SASL memiliki izin untuk mengirim dan menerima pesan menggunakan instance tersebut. Untuk informasi lebih lanjut, lihat Berikan izin kepada pengguna SASL.

    sasl_plain_password

    Kata sandi pengguna SASL.

    ca_location

    Lokasi penyimpanan sertifikat root SSL. Ganti XXX dalam kode contoh dengan jalur lokal. Contoh: /home/kafka-confluent-python-demo/vpc-ssl/mix-4096-ca-cert.

    bootstrap_servers

    Titik akhir SSL dari instance ApsaraMQ for Kafka. Anda dapat memperoleh titik akhir di bagian Endpoint Information pada halaman Instance Details di Konsol ApsaraMQ for Kafka.

    topic_name

    Nama topik. Anda dapat memperoleh nama topik di halaman Topics di Konsol ApsaraMQ for Kafka.

    group_name

    Nama grup. Anda dapat memperoleh nama grup di halaman Groups di Konsol ApsaraMQ for Kafka.

  3. Unggah folder kafka-confluent-python-demo ke direktori /home di server Linux.

Kirim pesan

Kirim pesan berdasarkan titik akhir yang Anda gunakan.

Titik akhir default

  1. Jalankan perintah berikut untuk mengakses subdirektori /home/kafka-confluent-python-demo/vpc:

    cd /home/kafka-confluent-python-demo/vpc
  2. Jalankan perintah berikut untuk mengirim pesan:

    python kafka_producer.py

Kode contoh berikut memberikan contoh kafka_producer.py:

kafka_producer.py

from confluent_kafka import Producer
import setting

conf = setting.kafka_setting
# Inisialisasi produser. 
p = Producer({'bootstrap.servers': conf['bootstrap_servers']})

def delivery_report(err, msg):
    """ Dipanggil satu kali untuk setiap pesan yang diproduksi untuk menunjukkan hasil pengiriman.
        Dipicu oleh poll() atau flush(). """
    if err is not None:
        print('Pengiriman pesan gagal: {}'.format(err))
    else:
        print('Pesan dikirim ke {} [{}]'.format(msg.topic(), msg.partition()))

# Kirim pesan dalam mode transmisi asinkron. 
p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
p.poll(0)

# Saat program diakhiri, panggil metode flush(). 
p.flush()

Titik akhir SSL

  1. Jalankan perintah berikut untuk mengakses subdirektori /home/kafka-confluent-python-demo/vpc-ssl:

    cd /home/kafka-confluent-python-demo/vpc-ssl
  2. Jalankan perintah berikut untuk mengirim pesan:

    python kafka_producer.py

Kode contoh berikut memberikan contoh kafka_producer.py:

kafka_producer.py

from confluent_kafka import Producer
import setting

conf = setting.kafka_setting

p = Producer({'bootstrap.servers':conf['bootstrap_servers'],
   'ssl.endpoint.identification.algorithm': 'none',
   'sasl.mechanisms':'PLAIN',
   'ssl.ca.location':conf['ca_location'],
   'security.protocol':'SASL_SSL',
   'sasl.username':conf['sasl_plain_username'],
   'sasl.password':conf['sasl_plain_password']})


def delivery_report(err, msg):
    if err is not None:
        print('Pengiriman pesan gagal: {}'.format(err))
    else:
        print('Pesan dikirim ke {} [{}]'.format(msg.topic(), msg.partition()))

p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
p.poll(0)

p.flush()

Berlangganan pesan

Berlangganan pesan berdasarkan titik akhir yang Anda gunakan.

Titik akhir default

  1. Jalankan perintah berikut untuk mengakses subdirektori /home/kafka-confluent-python-demo/vpc:

    cd /home/kafka-confluent-python-demo/vpc
  2. Jalankan perintah berikut untuk berlangganan pesan:

    python kafka_consumer.py

Kode contoh berikut memberikan contoh kafka_consumer.py:

kafka_consumer.py

from confluent_kafka import Consumer, KafkaError

import setting

conf = setting.kafka_setting

c = Consumer({
    'bootstrap.servers': conf['bootstrap_servers'],
    'group.id': conf['group_name'],
    'auto.offset.reset': 'latest'
})

c.subscribe([conf['topic_name']])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print("Kesalahan konsumen: {}".format(msg.error()))
            continue

    print('Pesan diterima: {}'.format(msg.value().decode('utf-8')))

c.close()

Titik akhir SSL

  1. Jalankan perintah berikut untuk mengakses subdirektori /home/kafka-confluent-python-demo/vpc-ssl:

    cd /home/kafka-confluent-python-demo/vpc-ssl
  2. Jalankan perintah berikut untuk berlangganan pesan:

    python kafka_consumer.py

Kode contoh berikut memberikan contoh kafka_consumer.py:

kafka_consumer.py

from confluent_kafka import Consumer, KafkaError

import setting

conf = setting.kafka_setting

c = Consumer({
    'bootstrap.servers': conf['bootstrap_servers'],
    'ssl.endpoint.identification.algorithm': 'none',
    'sasl.mechanisms':'PLAIN',
    'ssl.ca.location':conf['ca_location'],
    'security.protocol':'SASL_SSL',
    'sasl.username':conf['sasl_plain_username'],
    'sasl.password':conf['sasl_plain_password'],
    'group.id': conf['group_name'],
    'auto.offset.reset': 'latest',
    'fetch.message.max.bytes':'1024*512'
})

c.subscribe([conf['topic_name']])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
       if msg.error().code() == KafkaError._PARTITION_EOF:
          continue
       else:
           print("Kesalahan konsumen: {}".format(msg.error()))
           continue

    print('Pesan diterima: {}'.format(msg.value().decode('utf-8')))

c.close()