编辑
2025-01-15
技术
00

目录

前置准备
XDGSCRAM 解密
消费者接入:

前置准备

安装 sarama 库(github.com/IBM/sarama)

直接附上我的 go.mod 依赖:

go
require ( github.com/IBM/sarama v1.45.0 github.com/xdg/scram v1.0.5 go.uber.org/zap v1.27.0 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/eapache/go-resiliency v1.7.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/klauspost/compress v1.17.11 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/xdg/stringprep v1.0.3 // indirect go.uber.org/multierr v1.10.0 // indirect golang.org/x/crypto v0.32.0 // indirect golang.org/x/net v0.34.0 // indirect golang.org/x/text v0.21.0 // indirect )

XDGSCRAM 解密

go
import ( "crypto/sha256" "crypto/sha512" "github.com/xdg/scram" ) var ( // SHA256 ... SHA256 scram.HashGeneratorFcn = sha256.New // SHA512 ... SHA512 scram.HashGeneratorFcn = sha512.New ) // XDGSCRAMClient ... type XDGSCRAMClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn } // Begin ... func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) { x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) if err != nil { return err } x.ClientConversation = x.Client.NewConversation() return nil } // Step ... func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) { response, err = x.ClientConversation.Step(challenge) return } // Done ... func (x *XDGSCRAMClient) Done() bool { return x.ClientConversation.Done() }

消费者接入:

go
package main import ( "context" "log" "os" "os/signal" "sync" "syscall" "github.com/IBM/sarama" ) type Kafka struct { brokers []string topics []string startOffset int64 version string ready chan bool group string channelBufferSize int assignor string } var assignor = "range" var topics = []string{"your_topic"} // topic var group = "consumer_group" // 订阅消费组 var user = "user" // sarma用户 var password = "pwd" // sarama 密码 var brokers = []string{"127.0.0.1:6279"} // 你的kafka host func NewKafka() *Kafka { return &Kafka{ brokers: brokers, topics: topics, group: group, channelBufferSize: 1000, ready: make(chan bool), assignor: assignor, } } func (k *Kafka) Connect() func() { log.Println("kafka init...") config := sarama.NewConfig() config.Version = sarama.V2_4_1_0 // 版本号必须注意要同步,否则会报错 // 分区分配策略 switch assignor { case "sticky": config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky case "roundrobin": config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin case "range": config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange default: log.Panicf("Unrecognized consumer group partition assignor: %s", assignor) } config.Consumer.Offsets.Initial = sarama.OffsetNewest config.ChannelBufferSize = k.channelBufferSize // channel长度 config.Consumer.Offsets.Initial = sarama.OffsetOldest config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin config.Net.SASL.Enable = true config.Net.SASL.User = user config.Net.SASL.Password = password config.Net.SASL.Handshake = true config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} } config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256 // 创建client log.Println("brokers: ", brokers) newClient, err := sarama.NewClient(brokers, config) if err != nil { log.Fatalf("Error creating client: %v", err) } // 获取所有的topic topics, err := newClient.Topics() if err != nil { log.Fatal(err) } log.Println("topics: ", topics) // 
根据client创建consumerGroup client, err := sarama.NewConsumerGroupFromClient(k.group, newClient) if err != nil { log.Fatalf("Error creating consumer group client: %v", err) } ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() for { if err := client.Consume(ctx, k.topics, k); err != nil { // 当setup失败的时候,error会返回到这里 log.Println("Error from consumer: %v", err) return } // check if context was cancelled, signaling that the consumer should stop if ctx.Err() != nil { log.Println(ctx.Err()) return } k.ready = make(chan bool) } }() <-k.ready log.Println("Sarama consumer up and running!...") // 保证在系统退出时,通道里面的消息被消费 return func() { log.Println("kafka close") cancel() wg.Wait() if err = client.Close(); err != nil { log.Println("Error closing client: %v", err) } } } // Setup is run at the beginning of a new session, before ConsumeClaim func (k *Kafka) Setup(session sarama.ConsumerGroupSession) error { log.Println("setup") session.ResetOffset("lvmi", 0, 13, "") log.Println(session.Claims()) // Mark the consumer as ready close(k.ready) return nil } // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited func (k *Kafka) Cleanup(sarama.ConsumerGroupSession) error { log.Println("cleanup") return nil } // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). func (k *Kafka) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // NOTE: // Do not move the code below to a goroutine. 
// The `ConsumeClaim` itself is called within a goroutine, see: // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29 // 具体消费消息 for message := range claim.Messages() { log.Printf("[topic:%s] [partiton:%d] [offset:%d] [value:%s] [time:%v]", message.Topic, message.Partition, message.Offset, string(message.Value), message.Timestamp) // 更新位移 session.MarkMessage(message, "") } return nil } func main() { k := NewKafka() c := k.Connect() sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) select { case <-sigterm: log.Println("terminating: via signal") } c() }

本文作者:伞菌

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!