Reading Kafka from Go

1. To read from Kafka in Go, use the following library:

import "github.com/Shopify/sarama"

However, this library's built-in Consumer does not support the balanced (consumer-group) consumption mode on its own.

From the sarama package documentation:

Package sarama is a pure Go client library for dealing with Apache Kafka (versions 0.8 and later). It includes a high-level API for easily producing and consuming messages, and a low-level API for controlling bytes on the wire when the high-level API is insufficient. Usage examples for the high-level APIs are provided inline with their full documentation.

To produce messages, use either the AsyncProducer or the SyncProducer. The AsyncProducer accepts messages on a channel and produces them asynchronously in the background as efficiently as possible; it is preferred in most cases. The SyncProducer provides a method which will block until Kafka acknowledges the message as produced. This can be useful but comes with two caveats: it will generally be less efficient, and the actual durability guarantees depend on the configured value of Producer.RequiredAcks. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
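To make the SyncProducer behavior concrete, here is a minimal sketch; the broker address localhost:9092, the topic name, and the message value are placeholders for illustration, not from the original text:

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll // wait for all in-sync replicas to ack
    config.Producer.Return.Successes = true          // the SyncProducer requires this

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        log.Fatalln(err)
    }
    defer producer.Close()

    // SendMessage blocks until Kafka acknowledges the message.
    partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
        Topic: "test_topic",
        Value: sarama.StringEncoder("hello world"),
    })
    if err != nil {
        log.Fatalln(err)
    }
    log.Printf("stored at partition %d, offset %d", partition, offset)
}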

To consume messages, use the Consumer. Note that Sarama’s Consumer implementation does not currently support automatic consumer-group rebalancing and offset tracking. For Zookeeper-based tracking (Kafka 0.8.2 and earlier), the https://github.com/wvanbergen/kafka library builds on Sarama to add this support. For Kafka-based tracking (Kafka 0.9 and later), the https://github.com/bsm/sarama-cluster library builds on Sarama to add this support.
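For contrast, Sarama's built-in (simple) Consumer reads a single, explicitly chosen partition with no group coordination. A minimal sketch, again with placeholder broker and topic names:

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    // A nil config uses sarama's defaults.
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
    if err != nil {
        log.Fatalln(err)
    }
    defer consumer.Close()

    // One partition, chosen by hand: no rebalancing, no offset persistence.
    pc, err := consumer.ConsumePartition("test_topic", 0, sarama.OffsetNewest)
    if err != nil {
        log.Fatalln(err)
    }
    defer pc.Close()

    for msg := range pc.Messages() {
        log.Printf("partition %d, offset %d: %s", msg.Partition, msg.Offset, msg.Value)
    }
}

The full example below instead uses the consumergroup package from github.com/wvanbergen/kafka to get balanced, Zookeeper-coordinated consumption: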

package main

import (
    "flag"
    "log"
    "os"
    "os/signal"
    "strings"
    "time"

    "github.com/Shopify/sarama"
    "github.com/wvanbergen/kafka/consumergroup"
    "github.com/wvanbergen/kazoo-go"
)

const (
    DefaultKafkaTopics   = "test_topic"
    DefaultConsumerGroup = "consumer_example.go"
)

var (
    consumerGroup  = flag.String("group", DefaultConsumerGroup, "The name of the consumer group, used for coordination and load balancing")
    kafkaTopicsCSV = flag.String("topics", DefaultKafkaTopics, "The comma-separated list of topics to consume")
    zookeeper      = flag.String("zookeeper", "", "A comma-separated Zookeeper connection string (e.g. `zookeeper1.local:2181,zookeeper2.local:2181,zookeeper3.local:2181`)")

    zookeeperNodes []string
)

func init() {
    sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
}

func main() {
    flag.Parse()

    if *zookeeper == "" {
        flag.PrintDefaults()
        os.Exit(1)
    }

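    // Consumer-group configuration: new groups start at the newest offset,
    // and in-flight messages get up to 10s to finish processing.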
    config := consumergroup.NewConfig()
    config.Offsets.Initial = sarama.OffsetNewest
    config.Offsets.ProcessingTimeout = 10 * time.Second

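    // kazoo splits a connection string such as "zk1:2181,zk2:2181/chroot"
    // into the node list and an optional chroot path.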
    zookeeperNodes, config.Zookeeper.Chroot = kazoo.ParseConnectionString(*zookeeper)

    kafkaTopics := strings.Split(*kafkaTopicsCSV, ",")

    consumer, consumerErr := consumergroup.JoinConsumerGroup(*consumerGroup, kafkaTopics, zookeeperNodes, config)
    if consumerErr != nil {
        log.Fatalln(consumerErr)
    }

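    // Close the consumer on Ctrl-C so the group shuts down cleanly.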
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)
    go func() {
        <-c
        if err := consumer.Close(); err != nil {
            sarama.Logger.Println("Error closing the consumer", err)
        }
    }()

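    // Log consumer errors in the background.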
    go func() {
        for err := range consumer.Errors() {
            log.Println(err)
        }
    }()

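    // Track the last offset seen per topic/partition to detect gaps.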
    eventCount := 0
    offsets := make(map[string]map[int32]int64)

    for message := range consumer.Messages() {
        if offsets[message.Topic] == nil {
            offsets[message.Topic] = make(map[int32]int64)
        }

        eventCount++
        if offsets[message.Topic][message.Partition] != 0 && offsets[message.Topic][message.Partition] != message.Offset-1 {
            // diff = found offset minus the expected (previous offset + 1)
            log.Printf("Unexpected offset on %s:%d. Expected %d, found %d, diff %d.\n", message.Topic, message.Partition, offsets[message.Topic][message.Partition]+1, message.Offset, message.Offset-offsets[message.Topic][message.Partition]-1)
        }

        // Simulate processing time
        time.Sleep(10 * time.Millisecond)

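        // Record the offset and mark the message as processed;
        // consumergroup periodically commits these offsets to Zookeeper.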
        offsets[message.Topic][message.Partition] = message.Offset
        consumer.CommitUpto(message)
    }

    log.Printf("Processed %d events.", eventCount)
    log.Printf("%+v", offsets)
}

2. To use the balanced consumption mode, use the following library instead:

import "github.com/wvanbergen/kafka"

For the difference between the simple consumer and the balanced consumer, see the previous article.
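For Kafka-based offset tracking (Kafka 0.9 and later), the sarama documentation quoted above points to github.com/bsm/sarama-cluster. A minimal sketch under the same placeholder assumptions (broker, group, and topic names are illustrative):

package main

import (
    "log"

    cluster "github.com/bsm/sarama-cluster"
)

func main() {
    config := cluster.NewConfig()
    config.Consumer.Return.Errors = true

    consumer, err := cluster.NewConsumer(
        []string{"localhost:9092"}, "example-group", []string{"test_topic"}, config)
    if err != nil {
        log.Fatalln(err)
    }
    defer consumer.Close()

    // Errors are delivered on a channel when Return.Errors is enabled.
    go func() {
        for err := range consumer.Errors() {
            log.Println(err)
        }
    }()

    for msg := range consumer.Messages() {
        log.Printf("%s/%d/%d: %s", msg.Topic, msg.Partition, msg.Offset, msg.Value)
        consumer.MarkOffset(msg, "") // mark processed; offsets are committed to Kafka
    }
}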