安装 Kafka 客户端库 sarama(github.com/IBM/sarama)
下面直接附上我的 go.mod 依赖:
require (
github.com/IBM/sarama v1.45.0
github.com/xdg/scram v1.0.5
go.uber.org/zap v1.27.0
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/xdg/stringprep v1.0.3 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/text v0.21.0 // indirect
)
go
import (
"crypto/sha256"
"crypto/sha512"
"github.com/xdg/scram"
)
// SHA256 is the scram.HashGeneratorFcn backed by crypto/sha256.
var SHA256 scram.HashGeneratorFcn = sha256.New

// SHA512 is the scram.HashGeneratorFcn backed by crypto/sha512.
var SHA512 scram.HashGeneratorFcn = sha512.New
// XDGSCRAMClient bundles a scram client, its current conversation and the
// hash generator used to create them. Connect hands instances of it to sarama
// through SCRAMClientGeneratorFunc.
type XDGSCRAMClient struct {
// Client is populated by Begin.
*scram.Client
// ClientConversation is populated by Begin and driven by Step and Done.
*scram.ClientConversation
// HashGeneratorFcn is the hash Begin uses to create the client; it must be
// set before Begin is called.
scram.HashGeneratorFcn
}
// Begin creates a scram client for the given credentials with the configured
// hash generator and opens a fresh conversation on it. The error from client
// creation is returned unchanged.
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
	created, err := x.HashGeneratorFcn.NewClient(userName, password, authzID)
	// Store the result before checking the error so the field always reflects
	// the most recent NewClient call.
	x.Client = created
	if err != nil {
		return err
	}
	x.ClientConversation = created.NewConversation()
	return nil
}
// Step feeds the server challenge into the current conversation and returns
// the conversation's response and error as-is.
func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
	return x.ClientConversation.Step(challenge)
}
// Done reports whether the current conversation has finished, as reported by
// the underlying scram conversation.
func (x *XDGSCRAMClient) Done() bool {
	finished := x.ClientConversation.Done()
	return finished
}
package main
import (
"context"
"log"
"os"
"os/signal"
"sync"
"syscall"
"github.com/IBM/sarama"
)
// Kafka holds the settings and state of a sarama consumer-group consumer. It
// also serves as the group handler passed to Consume (see Setup, Cleanup and
// ConsumeClaim).
type Kafka struct {
// brokers is the broker address list; NewKafka fills it from the
// package-level brokers variable.
brokers []string
// topics is the list of topics handed to Consume.
topics []string
// startOffset is not set by NewKafka and no use of it is visible in this file.
startOffset int64
// version is not set by NewKafka and no use of it is visible in this file.
version string
// ready is closed by Setup to signal that a session has started; Connect
// waits on it and replaces it after each Consume return.
ready chan bool
// group is the consumer group ID.
group string
// channelBufferSize is copied into sarama's Config.ChannelBufferSize.
channelBufferSize int
// assignor is the partition-assignor name; NewKafka fills it from the
// package-level assignor variable.
assignor string
}
// Package-level defaults read by NewKafka and Connect. Replace the
// placeholder values with the settings of your own cluster.
var (
	// assignor names the partition assignment strategy.
	assignor = "range"
	// topics lists the topics to subscribe to.
	topics = []string{"your_topic"}
	// group is the consumer group to join.
	group = "consumer_group"
	// user is the SASL user name.
	user = "user"
	// password is the SASL password.
	password = "pwd"
	// brokers lists the Kafka broker addresses.
	brokers = []string{"127.0.0.1:6279"}
)
// NewKafka returns a Kafka populated from the package-level defaults, with a
// channel buffer size of 1000 and a fresh ready channel. startOffset and
// version are left at their zero values.
func NewKafka() *Kafka {
	k := new(Kafka)
	k.brokers = brokers
	k.topics = topics
	k.group = group
	k.assignor = assignor
	k.channelBufferSize = 1000
	k.ready = make(chan bool)
	return k
}
// Connect creates a SASL/SCRAM-SHA-256 authenticated sarama client for
// k.brokers, joins consumer group k.group and starts a background goroutine
// that consumes k.topics with k as the group handler.
//
// It blocks until the first session has been set up (Setup closes the ready
// channel) or until the consume loop stops before that happens, so a failing
// first Consume call cannot hang the caller. Errors while creating the client
// or the consumer group terminate the process via log.Fatal; an unknown
// assignor name panics.
//
// The returned function cancels consumption, waits for the goroutine to exit
// and then closes the consumer group and the underlying client.
func (k *Kafka) Connect() func() {
	log.Println("kafka init...")

	config := sarama.NewConfig()
	// The version has to be kept in sync with the cluster, otherwise errors
	// are reported.
	config.Version = sarama.V2_4_1_0

	// Partition assignment strategy, selected by the receiver's assignor
	// field. Nothing below overrides the choice made here.
	switch k.assignor {
	case "sticky":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
	case "roundrobin":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	case "range":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	default:
		log.Panicf("Unrecognized consumer group partition assignor: %s", k.assignor)
	}

	// Start from the oldest available offset when no offset was committed.
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.ChannelBufferSize = k.channelBufferSize // channel length

	// SASL/SCRAM-SHA-256 authentication.
	config.Net.SASL.Enable = true
	config.Net.SASL.User = user
	config.Net.SASL.Password = password
	config.Net.SASL.Handshake = true
	config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
		return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
	}
	config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256

	// Create the client.
	log.Println("brokers: ", k.brokers)
	client, err := sarama.NewClient(k.brokers, config)
	if err != nil {
		log.Fatalf("Error creating client: %v", err)
	}

	// List every topic known to the cluster.
	available, err := client.Topics()
	if err != nil {
		log.Fatal(err)
	}
	log.Println("topics: ", available)

	// Build the consumer group on top of the client.
	consumerGroup, err := sarama.NewConsumerGroupFromClient(k.group, client)
	if err != nil {
		log.Fatalf("Error creating consumer group client: %v", err)
	}

	ctx, cancel := context.WithCancel(context.Background())

	// Capture the channel of the first session before the goroutine starts:
	// the loop below replaces k.ready after every Consume return.
	ready := k.ready
	// stopped is closed when the consume loop exits, whatever the reason.
	stopped := make(chan struct{})

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(stopped)
		for {
			if err := consumerGroup.Consume(ctx, k.topics, k); err != nil {
				// A failing Setup surfaces its error here as well.
				log.Printf("Error from consumer: %v", err)
				return
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				log.Println(ctx.Err())
				return
			}
			k.ready = make(chan bool)
		}
	}()

	// Wait for the first session, but do not block forever when the loop
	// already gave up before Setup was ever called.
	select {
	case <-ready:
		log.Println("Sarama consumer up and running!...")
	case <-stopped:
		log.Println("Sarama consumer stopped before it became ready")
	}

	// Intended to let buffered messages be consumed before the process exits.
	return func() {
		log.Println("kafka close")
		cancel()
		wg.Wait()
		if err := consumerGroup.Close(); err != nil {
			log.Printf("Error closing client: %v", err)
		}
		// A consumer group built with NewConsumerGroupFromClient does not
		// close the client it was given, so close it explicitly.
		if err := client.Close(); err != nil {
			log.Printf("Error closing underlying client: %v", err)
		}
	}
}
// Setup is run at the beginning of a new session, before ConsumeClaim. It
// logs the session's claims and closes k.ready to signal that the consumer is
// up.
func (k *Kafka) Setup(session sarama.ConsumerGroupSession) error {
	log.Println("setup")

	// NOTE(review): topic "lvmi", partition 0 and offset 13 are hard-coded and
	// do not match the configured topics; this looks like leftover debugging.
	// Confirm whether it should stay.
	session.ResetOffset("lvmi", 0, 13, "")

	claims := session.Claims()
	log.Println(claims)

	// Mark the consumer as ready.
	close(k.ready)
	return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines
// have exited. It only logs that it was called.
func (k *Kafka) Cleanup(sarama.ConsumerGroupSession) error {
	// log.Print appends the trailing newline itself, so the emitted line is
	// the same as with log.Println.
	log.Print("cleanup")
	return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Each message is logged and then marked as consumed on the session. The loop
// ends, returning nil, when the messages channel is closed or when the
// session context is done.
func (k *Kafka) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/IBM/sarama/blob/main/consumer_group.go
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				// The claim was released and its channel closed.
				return nil
			}
			log.Printf("[topic:%s] [partiton:%d] [offset:%d] [value:%s] [time:%v]",
				message.Topic, message.Partition, message.Offset, string(message.Value), message.Timestamp)
			// Record the message as consumed so its offset gets committed.
			session.MarkMessage(message, "")
		case <-session.Context().Done():
			// Return promptly when the session ends. Otherwise a rebalance
			// can fail with ErrRebalanceInProgress or an i/o timeout, see
			// https://github.com/IBM/sarama/issues/1192
			return nil
		}
	}
}
// main starts the consumer, blocks until SIGINT or SIGTERM arrives and then
// runs the shutdown function returned by Connect.
func main() {
	k := NewKafka()
	shutdown := k.Connect()

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)

	// A plain receive is enough; there is only one channel to wait on.
	<-sigterm
	log.Println("terminating: via signal")

	shutdown()
}
本文作者:伞菌
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!