All Products
Search
Document Center

Simple Log Service:Use Sarama-Kafka-Go to consume data

Last Updated:Aug 22, 2023

This topic describes how to use Sarama-Kafka-Go to consume data.

Parameters

Parameter

Description

Example

brokers

The address of the cluster to which you want to establish the initial connection. You can specify a value in the Project.Endpoint:Port format. The endpoint varies based on the region of your Simple Log Service project. For more information, see Endpoints.

  • Virtual private network (VPC): Set the port number to 10011.

  • Internet: Set the port number to 10012.

project.cn-hangzhou.log.aliyuncs.com:10012

version

The version of Kafka.

2.1.0

groupId

The ID of the consumer group.

kafka-test

topics

The name of the Simple Log Service Logstore.

Logstore

conf.Net.SASL.User

The name of the Simple Log Service project.

Project

conf.Net.SASL.Password

The AccessKey pair of your Alibaba Cloud account. You can specify a value in the {access-key-id}#{access-key-secret} format. We recommend that you use the AccessKey pair of a Resource Access Management (RAM) user that is granted the read permissions on the Simple Log Service project. For more information about how to grant a RAM user the read permissions on a project, see Use custom policies to grant permissions to a RAM user. For more information about how to obtain an AccessKey pair, see AccessKey pair.

None

conf.Net.SASL.Mechanism

Set the value to PLAIN.

PLAIN

conf.Consumer.Fetch.Min

The minimum number of bytes that can be pulled from the cluster in a request. Default value: 1.

1

conf.Consumer.Fetch.Default

The maximum number of bytes that can be pulled from the cluster in a request.

1024 * 1024

conf.Consumer.Retry.Backoff

The time to wait for a retry after a read request on a partition fails. Default value: 2 s.

2 s

conf.Consumer.MaxWaitTime

The maximum time that the broker waits before the bytes specified by the conf.Consumer.Fetch.Min parameter become available. Default value: 250 ms. Recommended value range: [100,500]. Unit: milliseconds.

250 ms

conf.Consumer.MaxProcessingTime

The maximum time period for which a consumer expects a message to be processed.

200 ms

conf.Consumer.Offsets.AutoCommit.Enable

Specifies whether to automatically commit updated offsets. Default value: true.

true

conf.Consumer.Offsets.AutoCommit.Interval

The interval at which updated offsets are committed. If the conf.Consumer.Offsets.AutoCommit.Enable parameter is set to false, this parameter is invalid. Default value: 1 s.

1 s

conf.Consumer.Offsets.Initial

The offset from which a consumer starts to consume data. Common values: OffsetNewest and OffsetOldest. Default value: OffsetNewest.

  • OffsetNewest: The latest offset is used. A consumer starts to read data from the latest message. If an offset is committed, a consumer starts to read data from the committed offset. If no offset is committed, a consumer starts to read data from the latest message.

  • OffsetOldest: The earliest offset is used. A consumer starts to read data from the earliest message. If an offset is committed, a consumer starts to read data from the committed offset. If no offset is committed, a consumer starts to read data from the beginning.

OffsetNewest

conf.Consumer.Offsets.Retry.Max

The maximum number of retry attempts that are allowed when a request fails. Default value: 3.

3

Sample code

package main

// SIGUSR1 toggle the pause/resume consumption
import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/Shopify/sarama"
)

// main configures a Sarama consumer group against a Simple Log Service
// Kafka-protocol endpoint, starts the consume loop in a goroutine, and
// blocks until SIGINT/SIGTERM (SIGUSR1 toggles pause/resume).
func main() {
	// Connection parameters — replace with the values of your own project.
	endpoint := "cn-beijing.log.aliyuncs.com" // endpoint of the region where the project resides
	port := "10012"                           // 10012 for Internet access, 10011 for VPC access
	version := "2.1.0"                        // Kafka protocol version
	project := "test-project"                 // Simple Log Service project; also used as the SASL user
	topics := "your sls logstore"             // comma-separated list of Logstore names
	// The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations in Log Service is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. To create a RAM user, log on to the RAM console.
	// In this example, the AccessKey ID and AccessKey secret are configured as environment variables. You can also save your AccessKey ID and AccessKey secret to a configuration file.
	// We recommend that you do not directly specify the AccessKey ID and AccessKey secret in code to prevent AccessKey pair leaks.
	accessId := os.Getenv("SLS_ACCESS_KEY_ID")
	accessKey := os.Getenv("SLS_ACCESS_KEY_SECRET")
	group := "test-groupId" // consumer group ID

	keepRunning := true
	log.Println("Starting a new Sarama consumer")

	// BUG FIX: the original wrote `version, err := sarama.ParseKafkaVersion(version)`,
	// which tries to reassign the string variable with a sarama.KafkaVersion
	// value and does not compile. Parse into a new, correctly typed variable.
	kafkaVersion, err := sarama.ParseKafkaVersion(version)
	if err != nil {
		log.Panicf("Error parsing Kafka version: %v", err)
	}

	/**
	 * Construct a new Sarama configuration.
	 * Before you initialize a consumer or a producer, you must define the version of your Kafka cluster.
	 */
	brokers := []string{fmt.Sprintf("%s.%s:%s", project, endpoint, port)}

	conf := sarama.NewConfig()
	conf.Version = kafkaVersion

	// Simple Log Service requires TLS plus SASL/PLAIN authentication:
	// user = project name, password = "<accessKeyId>#<accessKeySecret>".
	conf.Net.TLS.Enable = true
	conf.Net.SASL.Enable = true
	conf.Net.SASL.User = project
	conf.Net.SASL.Password = fmt.Sprintf("%s#%s", accessId, accessKey)
	conf.Net.SASL.Mechanism = "PLAIN"

	conf.Consumer.Fetch.Min = 1
	conf.Consumer.Fetch.Default = 1024 * 1024
	conf.Consumer.Retry.Backoff = 2 * time.Second
	conf.Consumer.MaxWaitTime = 250 * time.Millisecond
	// Aligned with the documented example value (200 ms); the original code used 100 ms.
	conf.Consumer.MaxProcessingTime = 200 * time.Millisecond
	conf.Consumer.Return.Errors = false
	conf.Consumer.Offsets.AutoCommit.Enable = true
	conf.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
	conf.Consumer.Offsets.Initial = sarama.OffsetOldest
	conf.Consumer.Offsets.Retry.Max = 3

	/**
	 * Configure a new Sarama consumer group.
	 */
	consumer := Consumer{
		ready: make(chan bool),
	}

	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(brokers, group, conf)
	if err != nil {
		log.Panicf("Error creating consumer group client: %v", err)
	}

	consumptionIsPaused := false
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// Consume must be called in a loop: when a rebalance happens the
			// session ends and Consume returns, and it must be called again
			// to re-join the consumer group.
			if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
				// BUG FIX: on shutdown Consume returns because the context
				// was cancelled; exit cleanly instead of panicking.
				if ctx.Err() != nil {
					return
				}
				log.Panicf("Error from consumer: %v", err)
			}
			// Context cancelled: stop the consume loop.
			if ctx.Err() != nil {
				return
			}
			consumer.ready = make(chan bool)
		}
	}()

	// Wait until the first session has been set up (Setup closes `ready`)
	// before reporting readiness.
	<-consumer.ready
	log.Println("Sarama consumer up and running!...")

	sigusr1 := make(chan os.Signal, 1)
	signal.Notify(sigusr1, syscall.SIGUSR1)

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)

	for keepRunning {
		select {
		case <-ctx.Done():
			log.Println("terminating: context cancelled")
			keepRunning = false
		case <-sigterm:
			log.Println("terminating: via signal")
			keepRunning = false
		case <-sigusr1:
			toggleConsumptionFlow(client, &consumptionIsPaused)
		}
	}
	cancel()
	wg.Wait()
	if err = client.Close(); err != nil {
		log.Panicf("Error closing client: %v", err)
	}
}

// toggleConsumptionFlow pauses consumption on all partitions when it is
// currently running, resumes it when it is currently paused, and flips the
// caller-owned pause flag accordingly.
func toggleConsumptionFlow(client sarama.ConsumerGroup, isPaused *bool) {
	if !*isPaused {
		// Currently consuming: stop fetching from every claimed partition.
		client.PauseAll()
		log.Println("Pausing consumption")
	} else {
		// Currently paused: restart fetching on every claimed partition.
		client.ResumeAll()
		log.Println("Resuming consumption")
	}

	*isPaused = !*isPaused
}

 
// Consumer is the consumer-group handler passed to ConsumerGroup.Consume;
// its Setup/Cleanup/ConsumeClaim methods are invoked by Sarama over the
// lifetime of each group session.
type Consumer struct {
	// ready is closed in Setup to signal that the session has been set up.
	ready chan bool
}

 
// Setup is run by Sarama at the beginning of a new session, before
// ConsumeClaim is called for any partition.
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// Signal that the consumer has been set up by closing the ready channel.
	close(consumer.ready)
	return nil
}

 
// Cleanup is run by Sarama at the end of a session, once all ConsumeClaim
// goroutines have exited. There is no per-session state to release here.
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

 
// ConsumeClaim processes messages for a single partition claim. It must keep
// reading from claim.Messages() until that channel is closed or the session
// context is cancelled; returning promptly lets a rebalance proceed.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case message, ok := <-claim.Messages():
			// BUG FIX: when the session ends, Sarama closes the Messages
			// channel; a plain receive then yields a nil message and the
			// original code panicked on message.Timestamp. Detect the close
			// and return so the session can finish cleanly.
			if !ok {
				log.Printf("message channel was closed")
				return nil
			}

			// NOTE(review): this heuristic assumes a timestamp whose Unix()
			// value is implausibly small was encoded in a finer unit and
			// rescales it (UnixMicro()/1000 == milliseconds) — confirm
			// against the broker's timestamp encoding.
			realUnixTimeSeconds := message.Timestamp.Unix()
			if realUnixTimeSeconds < 2000000 {
				realUnixTimeSeconds = message.Timestamp.UnixMicro() / 1000
			}

			log.Printf("Message claimed: value = %s, timestamp = %d, topic = %s", string(message.Value), realUnixTimeSeconds, message.Topic)
			session.MarkMessage(message, "")

		// Return when the session context is done so the rebalance is not
		// blocked by this claim.
		case <-session.Context().Done():
			return nil
		}
	}
}