0%

golang笔记-redis的stream试用

本来打算在项目中部署使用rabbitmq,发现对于app应用,rabbitmq还是有点重,考虑到使用和维护难易度,redis在5.0版本以后提供了消息队列功能,也就是stream的相关命令,这里对使用这个组件的一些笔记,生产环境中运行稳定良好。

1. 读取未消费的消息包

至上一次停机后,消息队列中未确认的消息包。(已读取但没有确认处理完毕,重新读取)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// type XAutoClaimArgs struct {
// Stream string
// Group string
// MinIdle time.Duration
// Start string
// Count int64
// Consumer string
// }
args0 := &redis.XAutoClaimArgs{
Stream: this.stream,
Group: this.group,
Start: "0",
Consumer: this.consumer,
}
// playloads 就是这个stream下这个group的这个consumer还没有处理的消息
playloads, _, err := redisInst.XAutoClaim(this.ctx, args0).Result()
if err != nil {
log.Errorf("MysqlServices::logicWork XAutoClaim error:%v", err)
break
}
id := this.last_id

2. 读取自上一次后的未读取的消息包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// type XReadGroupArgs struct {
// Group string
// Consumer string
// Streams []string // list of streams and ids, e.g. stream1 stream2 id1 id2
// Count int64
// Block time.Duration
// NoAck bool
// }
args := &redis.XReadGroupArgs{
Group: this.group,
Consumer: this.consumer,
Streams: []string{this.stream, id},
Count: 100,
Block: -1,
}
// type XStream struct {
// Stream string
// Messages []XMessage
// }

// type XMessage struct {
// ID string
// Values map[string]interface{}
// }
// 读取自上一次读取的消息id后的所有消息包,进行处理
streams, err := redisInst.XReadGroup(this.ctx, args).Result()
if err == redis.Nil {
continue
}
if err != nil {
log.Errorf("MysqlServices::logicWork XReadGroup error:%v", err)
continue
}

3. 处理完毕以后的消息确认(从队列中标记删除)

1
2
3
4
5
6
7
// successful_ids是所有成功的单号
if len(successful_ids) != 0 {
_, err = redisInst.XAck(this.ctx, this.stream, this.group, successful_ids...).Result()
if err != nil {
log.Errorf("MysqlServices::logicWork XAck error:%v", err)
}
}

4. 结语

  • 上面的XAutoClaimArgs命令要redis 6.2以上版本才支持。
  • 发布订阅模式不支持消息队列的持久化
  • 基于List的 LPUSH+BRPOP 的实现,没有ACK机制,不能重复消费,不支持多播(多个消费者)和分组。
    因此在使用消息队列组件时,redis的stream是一个很好的替代,有持久性以后和ACK机制,可以保证消息不丢失。有分组和多播模式,那么开几组服务器不同的消费者来处理消息保证易于扩展。redis的stream参考