すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:Go SDK を使用してメッセージを送受信する

最終更新日:Mar 19, 2025

このトピックでは、Go SDK を使用して ApsaraMQ for Kafka に接続し、メッセージを送受信する方法について説明します。

環境要件

Go がインストールされていること。詳細については、すべてのリリース をご参照ください。

説明

Windows システムでは kafka-confluent-go-demo を実行できません。

構成ファイルを作成する

  1. (オプション) Secure Sockets Layer(SSL)ルート証明書をダウンロードします。SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、証明書をインストールする必要があります。

  2. aliware-kafka-demos ページに移動します。code アイコンをクリックし、[ZIP をダウンロード] を選択してデモプロジェクトをダウンロードします。次に、デモプロジェクトのパッケージを解凍します。

  3. 解凍したパッケージで、kafka-confluent-go-demo フォルダーを見つけ、Linux システムの /home ディレクトリにアップロードします。

  4. Linux システムにログオンし、/home/kafka-confluent-go-demo ディレクトリに移動して、conf/kafka.json 構成ファイルを変更します。

    {
      "topic": "XXX", // トピック名
      "group.id": "XXX", // グループID
      "bootstrap.servers" : "XXX:XX,XXX:XX,XXX:XX", // ブートストラップサーバー
      "security.protocol" : "plaintext", // セキュリティプロトコル
      "sasl.mechanism" : "XXX", // SASLメカニズム
      "sasl.username" : "XXX", // SASLユーザー名
      "sasl.password" : "XXX" // SASLパスワード
    }

    パラメーター

    説明

    topic

    Kafkaインスタンスで作成したトピックの名前です。 トピック管理 ページの ApsaraMQ for Kafka コンソール で、トピック名を取得できます。

    group.id

    ApsaraMQ for Kafka インスタンスで作成した グループ の ID。Group の管理ApsaraMQ for Kafka コンソール の [グループ] ページでグループ ID を取得できます。

    説明

    クライアントが producer.go を実行してメッセージを送信する場合、このパラメーターはオプションです。クライアントが consumer.go を実行してメッセージを受信する場合、このパラメーターは必須です。

    bootstrap.servers

    ApsaraMQ for Kafka インスタンスの SSL エンドポイントの IP アドレスとポート。アクセスポイント情報インスタンスの詳細ApsaraMQ for Kafka コンソール の ページの セクションでエンドポイントを取得できます。

    security.protocol

    Simple Authentication and Security Layer(SASL)ユーザーを認証するために使用するプロトコル。デフォルト値:plaintext。ApsaraMQ for Kafka インスタンスへの接続に使用するエンドポイントのタイプに基づいて値を指定します。

    • デフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、このパラメーターを plaintext に設定します。

    • SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、このパラメーターを sasl_ssl に設定します。

    • SASL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、このパラメーターを sasl_plaintext に設定します。

    sasl.mechanism

    メッセージの送受信に使用するセキュリティメカニズム。ApsaraMQ for Kafka インスタンスへの接続に使用するエンドポイントのタイプに基づいて値を指定します。

    • デフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、このパラメーターを構成する必要はありません。

    • SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、このパラメーターを PLAIN に設定します。

    • SASL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、このパラメーターを PLAIN または SCRAM-SHA-256 に設定します。

    sasl.username

    SASL ユーザーのユーザー名。SSL または SASL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、このパラメーターが必須です。

    説明
    • ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっていない場合は、ユーザー名パスワード設定情報インスタンスの詳細ApsaraMQ for Kafka コンソール の ページの セクションにある パラメーターと パラメーターから、SASL ユーザーのユーザー名とパスワードを取得できます。

    • ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっている場合は、インスタンスを使用してメッセージを送受信する権限が SASL ユーザーに付与されていることを確認してください。詳細については、SASL ユーザーに権限を付与するをご参照ください。

    sasl.password

    SASL ユーザーのパスワード。SSL または SASL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、このパラメーターが必須です。

メッセージを送信する

producer.go を実行してメッセージを送信するには、次のコマンドを実行します。

go run -mod=vendor producer/producer.go

次のサンプルコードは、producer.go の例を示しています。

package main

import (
    "encoding/json"
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "os"
    "path/filepath"
)

type KafkaConfig struct {
    Topic      string `json:"topic"`
    GroupId    string `json:"group.id"`
    BootstrapServers    string `json:"bootstrap.servers"`
    SecurityProtocol string `json:"security.protocol"`
    SslCaLocation string `json:"ssl.ca.location"`
    SaslMechanism string `json:"sasl.mechanism"`
    SaslUsername string `json:"sasl.username"`
    SaslPassword string `json:"sasl.password"`
}

// config は構造体へのポインターである必要があります。そうでない場合はパニックが発生します。
func loadJsonConfig() *KafkaConfig {
    workPath, err := os.Getwd()
    if err != nil {
        panic(err)
    }
    configPath := filepath.Join(workPath, "conf")
    fullPath := filepath.Join(configPath, "kafka.json")
    file, err := os.Open(fullPath);
    if (err != nil) {
        msg := fmt.Sprintf("Can not load config at %s. Error: %v", fullPath, err)
        panic(msg)
    }

    defer file.Close()

    decoder := json.NewDecoder(file)
    var config = &KafkaConfig{}
    err = decoder.Decode(config);
    if (err != nil) {
        msg := fmt.Sprintf("Decode json fail for config file at %s. Error: %v", fullPath, err)
        panic(msg)
    }
    json.Marshal(config)
    return  config
}

func doInitProducer(cfg *KafkaConfig) *kafka.Producer {
    fmt.Print("init kafka producer, it may take a few seconds to init the connection\n")
    // 共通引数
    var kafkaconf = &kafka.ConfigMap{
        "api.version.request": "true",
        "message.max.bytes": 1000000,
        "linger.ms": 10,
        "retries": 30,
        "retry.backoff.ms": 1000,
        "acks": "1"}
    kafkaconf.SetKey("bootstrap.servers", cfg.BootstrapServers)

    switch cfg.SecurityProtocol {
        case "plaintext" :
            kafkaconf.SetKey("security.protocol", "plaintext");
        case "sasl_ssl":
            kafkaconf.SetKey("security.protocol", "sasl_ssl");
            kafkaconf.SetKey("ssl.ca.location", "conf/ca-cert.pem");
            kafkaconf.SetKey("sasl.username", cfg.SaslUsername);
            kafkaconf.SetKey("sasl.password", cfg.SaslPassword);
            kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism);
            kafkaconf.SetKey("enable.ssl.certificate.verification", "false");
            kafkaconf.SetKey("ssl.endpoint.identification.algorithm", "None")
    case "sasl_plaintext":
            kafkaconf.SetKey("sasl.mechanism", "PLAIN")
            kafkaconf.SetKey("security.protocol", "sasl_plaintext");
            kafkaconf.SetKey("sasl.username", cfg.SaslUsername);
            kafkaconf.SetKey("sasl.password", cfg.SaslPassword);
            kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism)
    default:
            panic(kafka.NewError(kafka.ErrUnknownProtocol, "unknown protocol", true))
    }

    producer, err := kafka.NewProducer(kafkaconf)
    if err != nil {
        panic(err)
    }
    fmt.Print("init kafka producer success\n")
    return producer;
}

func main() {
    // 正しいプロトコルを選択してください
    // 9092 は PLAINTEXT 用
    // 9093 は SASL_SSL 用、sasl.username と sasl.password を提供する必要があります
    // 9094 は SASL_PLAINTEXT 用、sasl.username と sasl.password を提供する必要があります
    cfg := loadJsonConfig();
    producer := doInitProducer(cfg)

    defer producer.Close()

    // 生成されたメッセージの配信レポートハンドラー
    go func() {
        for e := range producer.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
                } else {
                    fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
                }
            }
        }
    }()

    // トピックにメッセージを非同期で生成します
    topic := cfg.Topic
    for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
        producer.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte(word),
        }, nil)
    }

    // シャットダウン前にメッセージ配信を待ちます
    producer.Flush(15 * 1000)
}
                        

メッセージを受信する

consumer.go を実行してメッセージを受信するには、次のコマンドを実行します。

go run -mod=vendor consumer/consumer.go

次のサンプルコードは、consumer.go の例を示しています。

package main

import (
    "encoding/json"
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "os"
    "path/filepath"
)
type KafkaConfig struct {
    Topic      string `json:"topic"`
    GroupId    string `json:"group.id"`
    BootstrapServers    string `json:"bootstrap.servers"`
    SecurityProtocol string `json:"security.protocol"`
    SaslMechanism string `json:"sasl.mechanism"`
    SaslUsername string `json:"sasl.username"`
    SaslPassword string `json:"sasl.password"`
}

// config は構造体へのポインターである必要があります。そうでない場合はパニックが発生します。
func loadJsonConfig() *KafkaConfig {
    workPath, err := os.Getwd()
    if err != nil {
        panic(err)
    }
    configPath := filepath.Join(workPath, "conf")
    fullPath := filepath.Join(configPath, "kafka.json")
    file, err := os.Open(fullPath);
    if (err != nil) {
        msg := fmt.Sprintf("Can not load config at %s. Error: %v", fullPath, err)
        panic(msg)
    }

    defer file.Close()

    decoder := json.NewDecoder(file)
    var config = &KafkaConfig{}
    err = decoder.Decode(config);
    if (err != nil) {
        msg := fmt.Sprintf("Decode json fail for config file at %s. Error: %v", fullPath, err)
        panic(msg)
    }
    json.Marshal(config)
    return  config
}


func doInitConsumer(cfg *KafkaConfig) *kafka.Consumer {
    fmt.Print("init kafka consumer, it may take a few seconds to init the connection\n")
    // 共通引数
    var kafkaconf = &kafka.ConfigMap{
        "api.version.request": "true",
        "auto.offset.reset": "latest",
        "heartbeat.interval.ms": 3000,
        "session.timeout.ms": 30000,
        "max.poll.interval.ms": 120000,
        "fetch.max.bytes": 1024000,
        "max.partition.fetch.bytes": 256000}
    kafkaconf.SetKey("bootstrap.servers", cfg.BootstrapServers);
    kafkaconf.SetKey("group.id", cfg.GroupId)

    switch cfg.SecurityProtocol {
    case "plaintext" :
        kafkaconf.SetKey("security.protocol", "plaintext");
    case "sasl_ssl":
        kafkaconf.SetKey("security.protocol", "sasl_ssl");
        kafkaconf.SetKey("ssl.ca.location", "./conf/ca-cert.pem");
        kafkaconf.SetKey("sasl.username", cfg.SaslUsername);
        kafkaconf.SetKey("sasl.password", cfg.SaslPassword);
        kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism);
        kafkaconf.SetKey("ssl.endpoint.identification.algorithm", "None");
        kafkaconf.SetKey("enable.ssl.certificate.verification", "false")
    case "sasl_plaintext":
        kafkaconf.SetKey("security.protocol", "sasl_plaintext");
        kafkaconf.SetKey("sasl.username", cfg.SaslUsername);
        kafkaconf.SetKey("sasl.password", cfg.SaslPassword);
        kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism)

    default:
        panic(kafka.NewError(kafka.ErrUnknownProtocol, "unknown protocol", true))
    }

    consumer, err := kafka.NewConsumer(kafkaconf)
    if err != nil {
        panic(err)
    }
    fmt.Print("init kafka consumer success\n")
    return consumer;
}

func main() {

    // 正しいプロトコルを選択してください
    // 9092 は PLAINTEXT 用
    // 9093 は SASL_SSL 用、sasl.username と sasl.password を提供する必要があります
    // 9094 は SASL_PLAINTEXT 用、sasl.username と sasl.password を提供する必要があります
    cfg := loadJsonConfig();
    consumer := doInitConsumer(cfg)

    consumer.SubscribeTopics([]string{cfg.Topic}, nil)

    for {
        msg, err := consumer.ReadMessage(-1)
        if err == nil {
            fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
        } else {
            // クライアントはすべてのエラーから自動的に回復しようとします。
            fmt.Printf("Consumer error: %v (%v)\n", err, msg)
        }
    }

    consumer.Close()
}