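1. To read from Kafka in Go, use the following library:
import "github.com/Shopify/sarama"
However, this library does not support the balanced consumption mode (automatic consumer-group rebalancing). Its package documentation reads: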
package sarama
import "github.com/Shopify/sarama"
Package sarama is a pure Go client library for dealing with Apache Kafka (versions 0.8 and later). It includes a high-level API for easily producing and consuming messages, and a low-level API for controlling bytes on the wire when the high-level API is insufficient. Usage examples for the high-level APIs are provided inline with their full documentation.
To produce messages, use either the AsyncProducer or the SyncProducer. The AsyncProducer accepts messages on a channel and produces them asynchronously in the background as efficiently as possible; it is preferred in most cases. The SyncProducer provides a method which will block until Kafka acknowledges the message as produced. This can be useful but comes with two caveats: it will generally be less efficient, and the actual durability guarantees depend on the configured value of Producer.RequiredAcks. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
To consume messages, use the Consumer. Note that Sarama’s Consumer implementation does not currently support automatic consumer-group rebalancing and offset tracking. For Zookeeper-based tracking (Kafka 0.8.2 and earlier), the https://github.com/wvanbergen/kafka library builds on Sarama to add this support. For Kafka-based tracking (Kafka 0.9 and later), the https://github.com/bsm/sarama-cluster library builds on Sarama to add this support.
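To make "balanced" consumption concrete, here is a minimal sketch of one way partitions can be divided among the members of a consumer group: each member gets a contiguous range, and the split is recomputed whenever a member joins or leaves. The function name assignRange and the member names are hypothetical, and this is an illustration only; it is not claimed to be the scheme used by wvanbergen/kafka or sarama-cluster.

```go
package main

import (
	"fmt"
	"sort"
)

// assignRange is a hypothetical helper: it splits partitions
// 0..numPartitions-1 into contiguous ranges, one range per member.
// Members are sorted first so every member computes the same result.
func assignRange(members []string, numPartitions int) map[string][]int32 {
	sorted := append([]string(nil), members...)
	sort.Strings(sorted)

	out := make(map[string][]int32, len(sorted))
	if len(sorted) == 0 {
		return out
	}

	base := numPartitions / len(sorted)  // every member gets at least this many
	extra := numPartitions % len(sorted) // the first `extra` members get one more
	next := 0
	for i, member := range sorted {
		n := base
		if i < extra {
			n++
		}
		for j := 0; j < n; j++ {
			out[member] = append(out[member], int32(next))
			next++
		}
	}
	return out
}

func main() {
	// Two members share five partitions.
	fmt.Println(assignRange([]string{"consumer-b", "consumer-a"}, 5))

	// A third member joins: the split is recomputed ("rebalanced").
	fmt.Println(assignRange([]string{"consumer-a", "consumer-b", "consumer-c"}, 5))
}
```

With five partitions and two members, the first member receives three partitions and the second receives two; after a third member joins, the split becomes two, two, and one.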
package main

import (
	"flag"
	"log"
	"os"
	"os/signal"
	"strings"
	"time"

	"github.com/Shopify/sarama"
	"github.com/wvanbergen/kafka/consumergroup"
	"github.com/wvanbergen/kazoo-go"
)

const (
	DefaultKafkaTopics   = "test_topic"
	DefaultConsumerGroup = "consumer_example.go"
)

var (
	consumerGroup  = flag.String("group", DefaultConsumerGroup, "The name of the consumer group, used for coordination and load balancing")
	kafkaTopicsCSV = flag.String("topics", DefaultKafkaTopics, "The comma-separated list of topics to consume")
	zookeeper      = flag.String("zookeeper", "", "A comma-separated Zookeeper connection string (e.g. `zookeeper1.local:2181,zookeeper2.local:2181,zookeeper3.local:2181`)")

	zookeeperNodes []string
)

func init() {
	sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
}

func main() {
	flag.Parse()

	// A Zookeeper connection string is required.
	if *zookeeper == "" {
		flag.PrintDefaults()
		os.Exit(1)
	}

	config := consumergroup.NewConfig()
	config.Offsets.Initial = sarama.OffsetNewest
	config.Offsets.ProcessingTimeout = 10 * time.Second

	// Split the connection string into the node list and the optional chroot.
	zookeeperNodes, config.Zookeeper.Chroot = kazoo.ParseConnectionString(*zookeeper)

	kafkaTopics := strings.Split(*kafkaTopicsCSV, ",")

	consumer, consumerErr := consumergroup.JoinConsumerGroup(*consumerGroup, kafkaTopics, zookeeperNodes, config)
	if consumerErr != nil {
		log.Fatalln(consumerErr)
	}

	// Close the consumer on interrupt (Ctrl-C).
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	go func() {
		<-c
		if err := consumer.Close(); err != nil {
			sarama.Logger.Println("Error closing the consumer", err)
		}
	}()

	// Log consumer errors in the background.
	go func() {
		for err := range consumer.Errors() {
			log.Println(err)
		}
	}()

	eventCount := 0
	// offsets holds the last processed offset per topic and partition.
	offsets := make(map[string]map[int32]int64)

	for message := range consumer.Messages() {
		if offsets[message.Topic] == nil {
			offsets[message.Topic] = make(map[int32]int64)
		}

		eventCount++

		// Warn when the offset does not directly follow the previous one.
		// diff is the distance from the expected offset (last + 1).
		last := offsets[message.Topic][message.Partition]
		if last != 0 && last != message.Offset-1 {
			log.Printf("Unexpected offset on %s:%d. Expected %d, found %d, diff %d.\n", message.Topic, message.Partition, last+1, message.Offset, message.Offset-last-1)
		}

		// Simulate processing time
		time.Sleep(10 * time.Millisecond)

		offsets[message.Topic][message.Partition] = message.Offset
		consumer.CommitUpto(message)
	}

	log.Printf("Processed %d events.", eventCount)
	log.Printf("%+v", offsets)
}
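
The offset check inside the message loop can be isolated into a small helper that needs no Kafka connection, which makes it easy to try out. The sketch below is an assumption-laden rewrite, not part of either library: the names gapTracker, newGapTracker and observe are hypothetical. It reports the number of skipped offsets as offset - (previous + 1), and it uses a "seen" flag instead of comparing against 0, so a partition whose last offset was 0 is still checked.

```go
package main

import "fmt"

// offsetKey identifies one partition of one topic.
type offsetKey struct {
	Topic     string
	Partition int32
}

// gapTracker (hypothetical name) remembers the last offset seen
// for each topic/partition pair.
type gapTracker struct {
	last map[offsetKey]int64
}

func newGapTracker() *gapTracker {
	return &gapTracker{last: make(map[offsetKey]int64)}
}

// observe records offset and returns how many offsets were skipped since
// the previous message on the same partition. It returns 0 for the first
// message on a partition and for contiguous offsets; a negative value
// means an offset was seen again or out of order.
func (g *gapTracker) observe(topic string, partition int32, offset int64) int64 {
	key := offsetKey{Topic: topic, Partition: partition}
	prev, seen := g.last[key]
	g.last[key] = offset
	if !seen {
		return 0
	}
	return offset - (prev + 1)
}

func main() {
	g := newGapTracker()
	// Offsets 3 and 4 are missing from this made-up sequence.
	for _, off := range []int64{0, 1, 2, 5, 6} {
		if skipped := g.observe("test_topic", 0, off); skipped != 0 {
			fmt.Printf("gap before offset %d: %d skipped\n", off, skipped)
		}
	}
}
```

In the consumer loop, a call such as g.observe(message.Topic, message.Partition, message.Offset) would stand in for the nested-map bookkeeping.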
2. To use the balanced consumption mode, import:
import "github.com/wvanbergen/kafka"
For the difference between simple_consumer and balanced_consumer, see the previous post.