All Products
Document Center

Simple Log Service:Use Sarama-Kafka-Go to consume data

Last Updated:Aug 22, 2023

This topic describes how to use Sarama-Kafka-Go to consume data.






The address of the cluster to which you want to establish the initial connection. You can specify a value in the Project.Endpoint:Port format. The endpoint varies based on the region of your Simple Log Service project. For more information, see Endpoints.

  • Virtual private network (VPC): Set the port number to 10011.

  • Internet: Set the port number to 10012.


The version of Kafka.



The ID of the consumer group.



The name of the Simple Log Service Logstore.



The name of the Simple Log Service project.



The AccessKey pair of your Alibaba Cloud account. You can specify a value in the {access-key-id}#{access-key-secret} format. We recommend that you use the AccessKey pair of a Resource Access Management (RAM) user that is granted the write permissions on the Simple Log Service project. For more information about how to grant a RAM user the write permissions on a project, see Use custom policies to grant permissions to a RAM user. For more information about how to obtain an AccessKey pair, see AccessKey pair.



Set the value to PLAIN.



The minimum number of bytes that can be pulled from the cluster in a request. Default value: 1.



The maximum number of bytes that can be pulled from the cluster in a request.

1024 * 1024


The time to wait for a retry after a read request on a partition fails. Default value: 2 s.

2 s


The maximum time that the broker waits before the bytes specified by the conf.Consumer.Fetch.Min parameter become available. Default value: 250 ms. Recommend value range: [100,500]. Unit: milliseconds.

250 ms


The maximum time period for which a consumer expects a message to be processed.

200 ms


Specifies whether to automatically commit updated offsets. Default value: true.



The interval at which updated offsets are committed. If the conf.Consumer.Offsets.AutoCommit.Enable parameter is set to false, this parameter is invalid. Default value: 1 s.

1 s


The offset from which a consumer starts to consume data. Common values: OffsetNewest and OffsetOldest. Default value: OffsetNewest.

  • OffsetNewest: The latest offset is used. A consumer starts to read data from the latest message. If an offset is committed, a consumer starts to read data from the committed offset. If no offset is committed, a consumer starts to read data from the latest message.

  • OffsetOldest: The earliest offset is used. A consumer starts to read data from the earliest message. If an offset is committed, a consumer starts to read data from the committed offset. If no offset is committed, a consumer starts to read data from the beginning.



The maximum number of retry attempts that are allowed when a request fails. Default value: 3.


Sample code

package main

// SIGUSR1 toggle the pause/resume consumption
import (


func main() {

	endpoint  := ""
	port      := "10012"
	version   := "2.1.0"
	project   := "test-project"                 
	topics    := "your sls logstore"                       
	// The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations in Log Service is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. To create a RAM user, log on to the RAM console. 
	// In this example, the AccessKey ID and AccessKey secret are configured as environment variables. You can also save your AccessKey ID and AccessKey secret to a configuration file. 
	// We recommend that you do not directly specify the AccessKey ID and AccessKey secret in code to prevent AccessKey pair leaks.
  accessId  := os.Getenv("SLS_ACCESS_KEY_ID") 
  accessKey := os.Getenv("SLS_ACCESS_KEY_SECRET")     
	group     := "test-groupId"                

	keepRunning := true
	log.Println("Starting a new Sarama consumer")

	version, err := sarama.ParseKafkaVersion(version)
	if err != nil {
		log.Panicf("Error parsing Kafka version: %v", err)

 	 * Construct a new Sarama configuration. 
	 * Before you initialize a consumer or a producer, you must define the version of your Kafka cluster. 
	brokers := []string{fmt.Sprintf("%s.%s:%s", project, endpoint, port)}

	conf := sarama.NewConfig()
	conf.Version = version

	conf.Net.TLS.Enable = true
	conf.Net.SASL.Enable = true
	conf.Net.SASL.User = project
	conf.Net.SASL.Password = fmt.Sprintf("%s#%s", accessId, accessKey)
	conf.Net.SASL.Mechanism = "PLAIN"

	conf.Consumer.Fetch.Min = 1
	conf.Consumer.Fetch.Default = 1024 * 1024
	conf.Consumer.Retry.Backoff = 2 * time.Second
	conf.Consumer.MaxWaitTime = 250 * time.Millisecond
	conf.Consumer.MaxProcessingTime = 100 * time.Millisecond
	conf.Consumer.Return.Errors = false
	conf.Consumer.Offsets.AutoCommit.Enable = true
	conf.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
	conf.Consumer.Offsets.Initial = sarama.OffsetOldest
	conf.Consumer.Offsets.Retry.Max = 3

 	 * Configure a new Sarama consumer group.
	consumer := Consumer{
		ready: make(chan bool),

	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(brokers, group, conf)
	if err != nil {
		log.Panicf("Error creating consumer group client: %v", err)

	consumptionIsPaused := false
	wg := &sync.WaitGroup{}
	go func() {
		defer wg.Done()
		for {
			if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
				log.Panicf("Error from consumer: %v", err)
      if ctx.Err() != nil {
			consumer.ready = make(chan bool)

	log.Println("Sarama consumer up and running!...")

	sigusr1 := make(chan os.Signal, 1)
	signal.Notify(sigusr1, syscall.SIGUSR1)

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)

	for keepRunning {
		select {
		case <-ctx.Done():
			log.Println("terminating: context cancelled")
			keepRunning = false
		case <-sigterm:
			log.Println("terminating: via signal")
			keepRunning = false
		case <-sigusr1:
			toggleConsumptionFlow(client, &consumptionIsPaused)
	if err = client.Close(); err != nil {
		log.Panicf("Error closing client: %v", err)

func toggleConsumptionFlow(client sarama.ConsumerGroup, isPaused *bool) {
	if *isPaused {
		log.Println("Resuming consumption")
	} else {
		log.Println("Pausing consumption")

	*isPaused = !*isPaused

type Consumer struct {
	ready chan bool

func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	return nil

func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case message := <-claim.Messages():
			realUnixTimeSeconds := message.Timestamp.Unix()
			if realUnixTimeSeconds < 2000000 {
				realUnixTimeSeconds = message.Timestamp.UnixMicro() / 1000

			log.Printf("Message claimed: value = %s, timestamp = %d, topic = %s", string(message.Value), realUnixTimeSeconds, message.Topic)
			session.MarkMessage(message, "")

		case <-session.Context().Done():
			return nil