All Products
Search
Document Center

ApsaraMQ for Kafka:Use the SDK for Go to send and receive messages

Last Updated:Mar 15, 2024

This topic describes how to use the SDK for Go to connect to ApsaraMQ for Kafka to send and receive messages.

Environment requirements

Go is installed. For more information, visit the download page of Go.

Note

You cannot run kafka-confluent-go-demo on Windows.

Create configuration files

  1. (Optional) Download the Secure Sockets Layer (SSL) root certificate. If you use the SSL endpoint to connect to your ApsaraMQ for Kafka instance, you must install the certificate.

  2. Go to the aliware-kafka-demos page. Click the code icon and select Download ZIP to download the demo project. Then, decompress the package of the demo project.

  3. In the decompressed package, find the kafka-confluent-go-demo folder and upload the folder to the /home directory in your Linux system.

  4. Log on to the Linux system, go to the /home/kafka-confluent-go-demo directory, and then modify the conf/kafka.json configuration file.

    {
      "topic": "XXX",
      "group.id": "XXX",
      "bootstrap.servers" : "XXX:XX,XXX:XX,XXX:XX",
      "security.protocol" : "plaintext",
      "sasl.mechanism" : "XXX",
      "sasl.username" : "XXX",
      "sasl.password" : "XXX"
    }

    Parameter

    Description

    topic

    The name of the topic that you created on the ApsaraMQ for Kafka instance. You can obtain the topic name on the Topics page in the ApsaraMQ for Kafka console.

    group.id

    The ID of the group that you created on the ApsaraMQ for Kafka instance. You can obtain the group ID on the Groups page in the ApsaraMQ for Kafka console.

    Note

    If the client runs producer.go to send messages, this parameter is optional. If the client runs consumer.go to receive messages, this parameter is required.

    bootstrap.servers

    The IP address and port of the endpoint that you use to connect to the ApsaraMQ for Kafka instance. The endpoint can be the default, SSL, or SASL endpoint, and must match the value of the security.protocol parameter. You can obtain the endpoint in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console.

    security.protocol

    The protocol that you want to use to authenticate Simple Authentication and Security Layer (SASL) users. Default value: plaintext. Specify a value based on the type of the endpoint that you use to connect to the ApsaraMQ for Kafka instance:

    • If you use the default endpoint to connect to the ApsaraMQ for Kafka instance, set this parameter to plaintext.

    • If you use the SSL endpoint to connect to the ApsaraMQ for Kafka instance, set this parameter to sasl_ssl.

    • If you use the SASL endpoint to connect to the ApsaraMQ for Kafka instance, set this parameter to sasl_plaintext.

    sasl.mechanism

    The security mechanism that you want to use to send and receive messages. Specify a value based on the type of the endpoint that you use to connect to the ApsaraMQ for Kafka instance:

    • If you use the default endpoint to connect to the ApsaraMQ for Kafka instance, you do not need to configure this parameter.

    • If you use the SSL endpoint to connect to the ApsaraMQ for Kafka instance, set this parameter to PLAIN.

    • If you use the SASL endpoint to connect to the ApsaraMQ for Kafka instance, set this parameter to PLAIN or SCRAM-SHA-256.

    sasl.username

    The username of the SASL user. This parameter is required if you use the SSL or SASL endpoint to connect to the ApsaraMQ for Kafka instance.

    Note
    • If the ACL feature is not enabled for the ApsaraMQ for Kafka instance, you can obtain the username and password of the SASL user from the Username and Password parameters in the Configuration Information section of the Instance Details page in the ApsaraMQ for Kafka console.

    • If the ACL feature is enabled for the ApsaraMQ for Kafka instance, make sure that the SASL user is authorized to send and receive messages by using the instance. For more information, see Grant permissions to SASL users.

    sasl.password

    The password of the SASL user. This parameter is required if you use the SSL or SASL endpoint to connect to the ApsaraMQ for Kafka instance.

Send messages

Run the following command to run producer.go to send messages:

go run -mod=vendor producer/producer.go

The following sample code provides an example of producer.go:

package main

import (
    "encoding/json"
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "os"
    "path/filepath"
)

// KafkaConfig mirrors conf/kafka.json. The JSON tags use the same key
// names as the librdkafka configuration properties they feed into.
type KafkaConfig struct {
    Topic            string `json:"topic"`             // topic to send messages to
    GroupId          string `json:"group.id"`          // consumer group ID (unused by the producer)
    BootstrapServers string `json:"bootstrap.servers"` // broker endpoints, comma-separated host:port pairs
    SecurityProtocol string `json:"security.protocol"` // plaintext | sasl_ssl | sasl_plaintext
    SslCaLocation    string `json:"ssl.ca.location"`   // path to the SSL CA certificate (sasl_ssl only)
    SaslMechanism    string `json:"sasl.mechanism"`    // PLAIN or SCRAM-SHA-256
    SaslUsername     string `json:"sasl.username"`     // SASL user name
    SaslPassword     string `json:"sasl.password"`     // SASL password
}

// loadJsonConfig reads conf/kafka.json (relative to the current working
// directory) and decodes it into a KafkaConfig.
//
// It panics on any failure (unknown working directory, missing file,
// malformed JSON), which is the fail-fast behavior this demo wants.
func loadJsonConfig() *KafkaConfig {
    workPath, err := os.Getwd()
    if err != nil {
        panic(err)
    }
    fullPath := filepath.Join(workPath, "conf", "kafka.json")
    file, err := os.Open(fullPath)
    if err != nil {
        panic(fmt.Sprintf("Can not load config at %s. Error: %v", fullPath, err))
    }
    defer file.Close()

    config := &KafkaConfig{}
    // Decode straight from the file; the original also re-marshaled the
    // config and discarded the result, which was dead code and is removed.
    if err := json.NewDecoder(file).Decode(config); err != nil {
        panic(fmt.Sprintf("Decode json fail for config file at %s. Error: %v", fullPath, err))
    }
    return config
}

// doInitProducer builds and returns a kafka.Producer configured from cfg.
// It panics on an unknown security protocol or a producer-creation error.
func doInitProducer(cfg *KafkaConfig) *kafka.Producer {
    fmt.Print("init kafka producer, it may take a few seconds to init the connection\n")
    // Arguments common to every security protocol.
    kafkaconf := &kafka.ConfigMap{
        "api.version.request": "true",
        "message.max.bytes":   1000000,
        "linger.ms":           10,
        "retries":             30,
        "retry.backoff.ms":    1000,
        "acks":                "1",
    }
    kafkaconf.SetKey("bootstrap.servers", cfg.BootstrapServers)

    switch cfg.SecurityProtocol {
    case "plaintext":
        kafkaconf.SetKey("security.protocol", "plaintext")
    case "sasl_ssl":
        kafkaconf.SetKey("security.protocol", "sasl_ssl")
        kafkaconf.SetKey("ssl.ca.location", "conf/ca-cert.pem")
        kafkaconf.SetKey("sasl.username", cfg.SaslUsername)
        kafkaconf.SetKey("sasl.password", cfg.SaslPassword)
        kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism)
        // The demo CA certificate does not match the broker hostname, so
        // certificate and endpoint verification are disabled here.
        kafkaconf.SetKey("enable.ssl.certificate.verification", "false")
        kafkaconf.SetKey("ssl.endpoint.identification.algorithm", "None")
    case "sasl_plaintext":
        kafkaconf.SetKey("security.protocol", "sasl_plaintext")
        kafkaconf.SetKey("sasl.username", cfg.SaslUsername)
        kafkaconf.SetKey("sasl.password", cfg.SaslPassword)
        // Original set "sasl.mechanism" to "PLAIN" and then immediately
        // overwrote it with cfg.SaslMechanism; only the latter is kept.
        kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism)
    default:
        panic(kafka.NewError(kafka.ErrUnknownProtocol, "unknown protocol", true))
    }

    producer, err := kafka.NewProducer(kafkaconf)
    if err != nil {
        panic(err)
    }
    fmt.Print("init kafka producer success\n")
    return producer
}

// main loads the JSON config, initializes a producer, and asynchronously
// sends a fixed list of words to the configured topic.
func main() {
    // Choose the correct protocol
    // 9092 for PLAINTEXT
    // 9093 for SASL_SSL, need to provide sasl.username and sasl.password
    // 9094 for SASL_PLAINTEXT, need to provide sasl.username and sasl.password
    cfg := loadJsonConfig()
    producer := doInitProducer(cfg)

    defer producer.Close()

    // Delivery report handler for produced messages
    go func() {
        for e := range producer.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
                } else {
                    fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
                }
            }
        }
    }()

    // Produce messages to topic (asynchronously)
    topic := cfg.Topic
    for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
        // Produce only enqueues; delivery results arrive on Events().
        // The enqueue itself can still fail (e.g. full local queue), so
        // report that error instead of silently dropping the message.
        err := producer.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte(word),
        }, nil)
        if err != nil {
            fmt.Printf("Enqueue failed: %v\n", err)
        }
    }

    // Wait up to 15 seconds for message deliveries before shutting down.
    producer.Flush(15 * 1000)
}
                        

Receive messages

Run the following command to run consumer.go to receive messages:

go run -mod=vendor consumer/consumer.go

The following sample code provides an example of consumer.go:

package main

import (
    "encoding/json"
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "os"
    "path/filepath"
)
// KafkaConfig mirrors conf/kafka.json. The JSON tags use the same key
// names as the librdkafka configuration properties they feed into.
type KafkaConfig struct {
    Topic            string `json:"topic"`             // topic to consume from
    GroupId          string `json:"group.id"`          // consumer group ID (required for consuming)
    BootstrapServers string `json:"bootstrap.servers"` // broker endpoints, comma-separated host:port pairs
    SecurityProtocol string `json:"security.protocol"` // plaintext | sasl_ssl | sasl_plaintext
    SaslMechanism    string `json:"sasl.mechanism"`    // PLAIN or SCRAM-SHA-256
    SaslUsername     string `json:"sasl.username"`     // SASL user name
    SaslPassword     string `json:"sasl.password"`     // SASL password
}

// loadJsonConfig reads conf/kafka.json (relative to the current working
// directory) and decodes it into a KafkaConfig.
//
// It panics on any failure (unknown working directory, missing file,
// malformed JSON), which is the fail-fast behavior this demo wants.
func loadJsonConfig() *KafkaConfig {
    workPath, err := os.Getwd()
    if err != nil {
        panic(err)
    }
    fullPath := filepath.Join(workPath, "conf", "kafka.json")
    file, err := os.Open(fullPath)
    if err != nil {
        panic(fmt.Sprintf("Can not load config at %s. Error: %v", fullPath, err))
    }
    defer file.Close()

    config := &KafkaConfig{}
    // Decode straight from the file; the original also re-marshaled the
    // config and discarded the result, which was dead code and is removed.
    if err := json.NewDecoder(file).Decode(config); err != nil {
        panic(fmt.Sprintf("Decode json fail for config file at %s. Error: %v", fullPath, err))
    }
    return config
}


// doInitConsumer builds and returns a kafka.Consumer configured from cfg.
// It panics on an unknown security protocol or a consumer-creation error.
func doInitConsumer(cfg *KafkaConfig) *kafka.Consumer {
    fmt.Print("init kafka consumer, it may take a few seconds to init the connection\n")
    // Arguments common to every security protocol.
    kafkaconf := &kafka.ConfigMap{
        "api.version.request":       "true",
        "auto.offset.reset":         "latest",
        "heartbeat.interval.ms":     3000,
        "session.timeout.ms":        30000,
        "max.poll.interval.ms":      120000,
        "fetch.max.bytes":           1024000,
        "max.partition.fetch.bytes": 256000,
    }
    kafkaconf.SetKey("bootstrap.servers", cfg.BootstrapServers)
    kafkaconf.SetKey("group.id", cfg.GroupId)

    switch cfg.SecurityProtocol {
    case "plaintext":
        kafkaconf.SetKey("security.protocol", "plaintext")
    case "sasl_ssl":
        kafkaconf.SetKey("security.protocol", "sasl_ssl")
        kafkaconf.SetKey("ssl.ca.location", "./conf/ca-cert.pem")
        kafkaconf.SetKey("sasl.username", cfg.SaslUsername)
        kafkaconf.SetKey("sasl.password", cfg.SaslPassword)
        kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism)
        // The demo CA certificate does not match the broker hostname, so
        // certificate and endpoint verification are disabled here.
        kafkaconf.SetKey("ssl.endpoint.identification.algorithm", "None")
        kafkaconf.SetKey("enable.ssl.certificate.verification", "false")
    case "sasl_plaintext":
        kafkaconf.SetKey("security.protocol", "sasl_plaintext")
        kafkaconf.SetKey("sasl.username", cfg.SaslUsername)
        kafkaconf.SetKey("sasl.password", cfg.SaslPassword)
        kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism)
    default:
        panic(kafka.NewError(kafka.ErrUnknownProtocol, "unknown protocol", true))
    }

    consumer, err := kafka.NewConsumer(kafkaconf)
    if err != nil {
        panic(err)
    }
    fmt.Print("init kafka consumer success\n")
    return consumer
}

// main loads the JSON config, initializes a consumer, subscribes to the
// configured topic, and prints every received message forever.
func main() {

    // Choose the correct protocol
    // 9092 for PLAINTEXT
    // 9093 for SASL_SSL, need to provide sasl.username and sasl.password
    // 9094 for SASL_PLAINTEXT, need to provide sasl.username and sasl.password
    cfg := loadJsonConfig()
    consumer := doInitConsumer(cfg)
    // The original called Close() after the infinite loop, which was
    // unreachable; defer it so it runs if the process ever returns/panics.
    defer consumer.Close()

    // A failed subscription would otherwise make ReadMessage spin on
    // errors forever, so fail fast here.
    if err := consumer.SubscribeTopics([]string{cfg.Topic}, nil); err != nil {
        panic(err)
    }

    for {
        msg, err := consumer.ReadMessage(-1)
        if err == nil {
            fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
        } else {
            // The client will automatically try to recover from all errors.
            fmt.Printf("Consumer error: %v (%v)\n", err, msg)
        }
    }
}