すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:Python 用 SDK を使用してメッセージを送受信する

最終更新日:Mar 29, 2025

このトピックでは、Python 用 SDK を使用して ApsaraMQ for Kafka に接続し、Linux サーバーでメッセージを送受信する方法について説明します。

始める前に

Python 依存関係ライブラリをインストールする

Python 依存関係ライブラリをインストールするには、次のコマンドを実行します。

pip install confluent-kafka==1.9.2
重要

confluent-kafka 1.9.2 以前をインストールすることをお勧めします。そうでない場合、インターネット経由でメッセージを送信するときに SSL_HANDSHAKE エラーが返されます。

構成ファイルを準備する

デモ プロジェクトをダウンロードし、使用するエンドポイントに基づいて対応する構成を変更してから、デモ プロジェクトを Linux サーバーにアップロードします。

  1. aliware-kafka-demos ページに移動します。image アイコンをクリックし、[download ZIP] を選択してデモ プロジェクトをダウンロードします。次に、デモ プロジェクトのパッケージを解凍します。

    説明

    ダウンロードしたデモ プロジェクトのパッケージには、SSL ルート証明書が含まれています。 SSL ルート証明書を個別に使用する場合、SSL ルート証明書をダウンロードする をクリックします。

  2. 解凍したデモ プロジェクトで、kafka-confluent-python-demo フォルダを見つけ、使用するエンドポイントに基づいて setting.py 構成ファイルを変更します。

    デフォルト エンドポイント

    vpc ディレクトリで、setting.py 構成ファイルを変更します。

    kafka_setting = {
        'bootstrap_servers': 'XXX:xxx,XXX:xxx',
        'topic_name': 'XXX',
        'group_name': 'XXX'
    }
    

    パラメーター

    説明

    bootstrap_servers

    ApsaraMQ for Kafka インスタンスのデフォルト エンドポイント。アクセスポイント情報インスタンスの詳細ApsaraMQ for Kafka コンソール の ページの セクションでエンドポイントを取得できます。

    topic_name

    トピック名。トピック管理ApsaraMQ for Kafka コンソール の [トピック] ページでトピック名を取得できます。

    group_name

    グループ 名。Group の管理ApsaraMQ for Kafka コンソール の [グループ] ページでグループ名を取得できます。

    SSL エンドポイント

    vpc-ssl ディレクトリで、setting.py 構成ファイルを変更します。

    kafka_setting = {
        'sasl_plain_username': 'XXX',
        'sasl_plain_password': 'XXX',
        'ca_location': '/XXX/mix-4096-ca-cert',
        'bootstrap_servers': 'XXX:xxx,XXX:xxx',
        'topic_name': 'XXX',
        'group_name': 'XXX'
    }
    

    パラメーター

    説明

    sasl_plain_username

    簡易認証およびセキュリティ層 ( SASL ) ユーザーのユーザー名。

    説明
    • ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっていない場合、ユーザー名パスワード設定情報インスタンスの詳細ApsaraMQ for Kafka コンソール の ページの セクションにある パラメーターと パラメーターから、SASL ユーザーのユーザー名とパスワードを取得できます。

    • ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっている場合、インスタンスを使用して SASL ユーザーがメッセージを送受信する権限を持っていることを確認してください。詳細については、「SASL ユーザーに権限を付与する」をご参照ください。

    sasl_plain_password

    SASL ユーザーのパスワード。

    ca_location

    SSL ルート証明書が保存されているパス。サンプル コードの XXX をローカル パスに置き換えます。例:/home/kafka-confluent-python-demo/vpc-ssl/mix-4096-ca-cert

    bootstrap_servers

    ApsaraMQ for Kafka インスタンスの SSL エンドポイント。アクセスポイント情報インスタンスの詳細ApsaraMQ for Kafka コンソール の ページの セクションでエンドポイントを取得できます。

    topic_name

    トピック名。トピック管理ApsaraMQ for Kafka コンソール の [トピック] ページでトピック名を取得できます。

    group_name

    グループ 名。Group の管理ApsaraMQ for Kafka コンソール の [グループ] ページでグループ名を取得できます。

  3. kafka-confluent-python-demo フォルダを Linux サーバーの /home ディレクトリにアップロードします。

メッセージを送信する

使用するエンドポイントに基づいてメッセージを送信します。

デフォルト エンドポイント

  1. /home/kafka-confluent-python-demo/vpc サブディレクトリにアクセスするには、次のコマンドを実行します。

    cd /home/kafka-confluent-python-demo/vpc
  2. メッセージを送信するには、次のコマンドを実行します。

    python kafka_producer.py

次のサンプル コードは、kafka_producer.py の例を示しています。

kafka_producer.py

from confluent_kafka import Producer
import setting

conf = setting.kafka_setting
# プロデューサーを初期化します。
p = Producer({'bootstrap.servers': conf['bootstrap_servers']})

def delivery_report(err, msg):
    """ 配信結果を示すために、生成されたメッセージごとに 1 回呼び出されます。
        poll() または flush() によってトリガーされます。"""  //日本語コメント
    if err is not None:
        print('メッセージの配信に失敗しました: {}'.format(err)) //日本語出力
    else:
        print('メッセージが {} [{}] に配信されました'.format(msg.topic(), msg.partition())) //日本語出力

# 非同期伝送モードでメッセージを送信します。
p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
p.poll(0)

# プログラムが終了したら、flush() メソッドを呼び出します。
p.flush()

SSL エンドポイント

  1. /home/kafka-confluent-python-demo/vpc-ssl サブディレクトリにアクセスするには、次のコマンドを実行します。

    cd /home/kafka-confluent-python-demo/vpc-ssl
  2. メッセージを送信するには、次のコマンドを実行します。

    python kafka_producer.py

次のサンプル コードは、kafka_producer.py の例を示しています。

kafka_producer.py

from confluent_kafka import Producer
import setting

conf = setting.kafka_setting

p = Producer({'bootstrap.servers':conf['bootstrap_servers'],
   'ssl.endpoint.identification.algorithm': 'none',
   'sasl.mechanisms':'PLAIN',
   'ssl.ca.location':conf['ca_location'],
   'security.protocol':'SASL_SSL',
   'sasl.username':conf['sasl_plain_username'],
   'sasl.password':conf['sasl_plain_password']})


def delivery_report(err, msg):
    if err is not None:
        print('メッセージの配信に失敗しました: {}'.format(err)) //日本語出力
    else:
        print('メッセージが {} [{}] に配信されました'.format(msg.topic(), msg.partition())) //日本語出力

p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
p.poll(0)

p.flush()

メッセージをサブスクライブする

使用するエンドポイントに基づいてメッセージをサブスクライブします。

デフォルト エンドポイント

  1. /home/kafka-confluent-python-demo/vpc サブディレクトリにアクセスするには、次のコマンドを実行します。

    cd /home/kafka-confluent-python-demo/vpc
  2. メッセージをサブスクライブするには、次のコマンドを実行します。

    python kafka_consumer.py

次のサンプル コードは、kafka_consumer.py の例を示しています。

kafka_consumer.py

from confluent_kafka import Consumer, KafkaError

import setting

conf = setting.kafka_setting

c = Consumer({
    'bootstrap.servers': conf['bootstrap_servers'],
    'group.id': conf['group_name'],
    'auto.offset.reset': 'latest'
})

c.subscribe([conf['topic_name']])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print("コンシューマー エラー: {}".format(msg.error())) //日本語出力
            continue

    print('受信したメッセージ: {}'.format(msg.value().decode('utf-8'))) //日本語出力

c.close()

SSL エンドポイント

  1. /home/kafka-confluent-python-demo/vpc-ssl サブディレクトリにアクセスするには、次のコマンドを実行します。

    cd /home/kafka-confluent-python-demo/vpc-ssl
  2. メッセージをサブスクライブするには、次のコマンドを実行します。

    python kafka_consumer.py

次のサンプル コードは、kafka_consumer.py の例を示しています。

kafka_consumer.py

from confluent_kafka import Consumer, KafkaError

import setting

conf = setting.kafka_setting

c = Consumer({
    'bootstrap.servers': conf['bootstrap_servers'],
    'ssl.endpoint.identification.algorithm': 'none',
    'sasl.mechanisms':'PLAIN',
    'ssl.ca.location':conf['ca_location'],
    'security.protocol':'SASL_SSL',
    'sasl.username':conf['sasl_plain_username'],
    'sasl.password':conf['sasl_plain_password'],
    'group.id': conf['group_name'],
    'auto.offset.reset': 'latest',
    'fetch.message.max.bytes':'1024*512'
})

c.subscribe([conf['topic_name']])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
       if msg.error().code() == KafkaError._PARTITION_EOF:
          continue
       else:
           print("コンシューマー エラー: {}".format(msg.error())) //日本語出力
           continue

    print('受信したメッセージ: {}'.format(msg.value().decode('utf-8'))) //日本語出力

c.close()