kafka
问题描述:kafka 重复消费的问题,每次重启服务,都从broker拉取offset,拉到旧的消息
解决:消费者每次从broker拉取数据,消费之后都会自动commit,自动commit有一个定时机制,但是并不保证一定提交成功,例如在commit的时候,程序崩溃(例如重启的情况
在重启后,消费者会拉取最新的offset作为消费的起点,sarama默认的initial offset 为OffsetNewest,这个参数表示,每次从broker拉取最新的消息消费,而OffsetOldest则会拉取最旧的消息进行消费,而我们的业务代码中配置的是拉取最旧的消息,并且没有做幂等处理,所以导致重复消息消费。
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
sarama kafka producer and comsumer lib