本来打算在项目中部署使用rabbitmq,发现对于app应用,rabbitmq还是有点重,考虑到使用和维护难易度,redis在5.0版本以后提供了消息队列功能,也就是stream的相关命令,这里对使用这个组件的一些笔记,生产环境中运行稳定良好。
1. 读取未消费的消息包
至上一次停机后,消息队列中未确认的消息包。(已读取但没有确认处理完毕,重新读取)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21// type XAutoClaimArgs struct {
// Stream string
// Group string
// MinIdle time.Duration
// Start string
// Count int64
// Consumer string
// }
args0 := &redis.XAutoClaimArgs{
Stream: this.stream,
Group: this.group,
Start: "0",
Consumer: this.consumer,
}
// playloads 就是这个stream下这个group的这个consumer还没有处理的消息
playloads, _, err := redisInst.XAutoClaim(this.ctx, args0).Result()
if err != nil {
log.Errorf("MysqlServices::logicWork XAutoClaim error:%v", err)
break
}
id := this.last_id
2. 读取自上一次后的未读取的消息包
1 | // type XReadGroupArgs struct { |
3. 处理完毕以后的消息确认(从队列中标记删除)
1 | // successful_ids是所有成功的单号 |
4. 结语
- 上面的XAutoClaimArgs命令要redis 6.2以上版本才支持。
- 发布订阅模式不支持消息队列的持久化
- 基于List的 LPUSH+BRPOP 的实现,没有ACK机制,不能重复消费,不支持多播(多个消费者)和分组。
因此在使用消息队列组件时,redis的stream是一个很好的替代,有持久性以后和ACK机制,可以保证消息不丢失。有分组和多播模式,那么开几组服务器不同的消费者来处理消息保证易于扩展。redis的stream参考