使用Redis实现自己的消息队列

6.2k 词

什么是消息队列?

消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

如何实现?

之前使用Java的BlockingQueue和线程池实现了简单的消息队列,但这样的实现有两个问题:

  • 内存消耗:若消息很多,则JVM的内存可能被快速耗尽,导致内存不足
  • 非持久性存储:JVM的内存不是持久性的,消息队列中的消息可能发生丢失

所以这里采用Redis来实现,可以很好解决上述问题(为什么不用其他中间件?因为Redis成本更低

Redis提供了三种不同的方式来实现消息队列:

  • list结构:基于List结构模拟消息队列
  • PubSub:基本的点对点消息模型
  • Stream:比较完善的消息队列模型

基于List结构模拟消息队列

实现方式

使用LPUSH实现消息进入队列,BRPOP实现消息有阻塞的取出队列

该实现方法的优缺点

优点

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点

  • 无法避免消息丢失
  • 只支持单消费者

基于PubSub的消息队列

PubSub(发布订阅) 是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

实现方式

一般通过以下三个指令实现

  • SUBSCRIBE channel [channel]:订阅一个或多个频道
  • PUBLISH channel msg:向一个频道发送消息
  • PSUBSCRIBE pattern [pattern]:订阅与pattern格式匹配的所有频道

该实现方法的优缺点

优点

  • 采用发布订阅模型,支持多生产,多消费

缺点

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

基于Stream的消息队列

  • 单消费者模式

    Stream是Redis5.0引入的一种新的数据类型,可以实现一个功能非常完善的消息队列。

    实现方法

    发送消息的命令

    1
    XADD key ID field string [field string ...]
    • ID:指定为*时,是使用自增长的ID,格式是时间戳-递增数字

    读取消息的命令

    1
    XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
    • count:每次读取消息的最大数量
    • milliseconds:当没有消息时,阻塞的时长。为0的时候永久等待,不给这个参数是不等待
    • key:消息队列的名字
    • ID:起始ID,只返回大于该ID的值,0代表从第一个消息开始,$代表从最新的消息开始

    Stream的特点

    • 消息可回溯
    • 一个消息可以被多个消费者读取
    • 可以阻塞读取
    • 有消息漏读的风险(使用$进行读取时,若在处理当前消息的时候,新增一条以上的消息,则下一次只能读取最新一条,漏掉其他消息)
  • 消费者组模式

    消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。

    具有以下特点:

    • 消息分流

      队列中的消息会分流给组内的不同消费者,加快消息处理速度
    • 消息标示

      消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,也会从标示之后开始读取,保证每一个消息都会被消费
    • 消息确认

      消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list中移除

    实现方法

    创建消费者组

    1
    XGROUP CREATE key groupname id|$ [MKSTREAM]
    • key:队列名称
    • groupname:消费者组名称
    • id$:起始id标示,$代表列中最后一个消息,0则代表第一个消息
    • MKSTREAM:队列不存在时自动创建队列

    从消费者组读取消息

    1
    XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
    • ID:起始id
      • 使用>,表示从下一个未消费的消息开始,防止漏读。
      • 其他情况下,根据id从pending-list中获取已消费但未确认的消息。例如0,是从pending-list中的第一个消息开始

    确认消息

    1
    XACK key group ID [ID ...]

    特点

    • 消息可回溯
    • 可以多消费者争抢消息,加快消费速度
    • 可以阻塞读取
    • 没有消息漏读风险
    • 有消息确认机制,保证消息至少被消费一次

基于Stream实现异步秒杀

创建Stream类型的消息队列

1
xgroup create stream.orders g1 0 mkstream

修改Lua脚本

修改之前的Lua脚本,实现在认定有抢购资格后,直接向stream.orders中添加消息,包含voucherIduserIdorderId

修改后的脚本如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
---
--- Created by Strelitzia.
--- DateTime: 2024/5/12 20:33
---
-- 参数列表
local voucherId = ARGV[1] -- 优惠券id
local userId = ARGV[2] -- 用户id
local orderId = ARGV[3] -- 订单id

-- 数据key
local stockKey = 'seckill:stock:' .. voucherId -- 库存key
local orderKey = 'seckill:order:' .. voucherId -- 订单key

-- 脚本业务
-- 判断库存是否充足 get stockKey
if (tonumber(redis.call('get', stockKey)) <= 0) then
-- 库存不足,返回1
return 1
end
-- 判断用户是否下单 sismember orderKey userId
if (redis.call('sismember', orderKey, userId) == 1) then
-- 存在,说明是重复下单
return 2
end
-- 扣减库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 下单 sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 发送消息到队列 xadd stream.orders * k1 v1 k2 v2 k3 v3
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

在Service中的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 基于消息队列实现异步秒杀
* @param voucherId
* @return
*/
@Override
public Result seckillVoucherAsyncWithMq(Long voucherId) {
Long userId = UserHolder.getUser().getId();
Long orderId = redisIdWorker.nextId(RedisConstants.ORDER_ID_PREFIX);
// 使用Lua脚本实现在Redis中的查询
// 执行Lua脚本
Long result = redisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString(),
orderId.toString()
);
// 判断结果
if (result == null) throw new LuaScriptException();
switch (result.intValue()) {
case 1 -> {
log.warn("库存不足");
return Result.fail(MessageConstant.STOCK_NOT_ENOUGH);
}
case 2 -> {
log.warn("重复购买");
return Result.fail(MessageConstant.REPEAT_PURCHASE);
}
}
proxyService = (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(orderId);
}

读取消息队列并下单

在项目启动时开启一个线程,用来一直读取消息队列并尝试进行消费,若消费时出现异常则将其加入pending-list,并尝试把pending-list中的消息全部ACK。代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
private class orderHandlerWithMq implements Runnable {

private final String streamName = "stream.orders";
private final String groupName = "g1", consumerName = "c1";
@Override
public void run() {
while (true) {
try {
// 获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders
List<MapRecord<String, Object, Object>> list = redisTemplate.opsForStream().read(
Consumer.from(groupName, consumerName),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(streamName, ReadOffset.lastConsumed())
);
// 判断消息是否获取成功
if (list == null || list.isEmpty()) {
continue;
}
// 解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
// 写入订单
proxyService.createOrderAsync(voucherOrder);
// ACK确认 SACK stream.orders g1 id
redisTemplate.opsForStream().acknowledge(streamName, groupName, record.getId());
} catch (Exception e) {
log.error("订单处理异常:", e);
handlePendingList();
}
}
}

private void handlePendingList() {
while (true) {
try {
// 获取消息队列中的订单信息 XPENDING stream.orders g1 - + 1
PendingMessages pendingMessages = redisTemplate.opsForStream().pending(
streamName,
groupName,
Range.unbounded(),
1
);
// 判断消息是否获取成功
if (pendingMessages.isEmpty()) {
break;
}
// 解析消息中的订单信息
// 将当前消息重新分配给消费者
List<MapRecord<String, Object, Object>> list = redisTemplate.opsForStream().claim(
streamName,
groupName,
consumerName,
Duration.ofMillis(1),
pendingMessages.get(0).getId()
);
Map<Object, Object> values = list.get(0).getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
// 写入订单
proxyService.createOrderAsync(voucherOrder);
// ACK确认 SACK stream.orders g1 id
redisTemplate.opsForStream().acknowledge(streamName, groupName, list.get(0).getId());
} catch (Exception e) {
log.error("订单处理异常:", e);
// 若处理不成功,则等一下再尝试
try {
Thread.sleep(20);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
}

这样的实现,有两个很大的问题

  • 若处理消息时的异常是一定会出现的,则程序会一直在处理pending-list的地方循环,无法跳出,导致死循环,无法进行消息队列的处理
  • 若消息队列不为空就发生了服务器宕机,则再启动服务时会报错,因为对proxyService的赋值是在下单时完成的,所以刚开始是默认值null,这里会抛出空指针异常

以上就是使用Redis stream实现消息队列,并进行秒杀优化的方法,有时间的话应该会解决上面两个问题,并使用Jmeter进行压测,看看性能提升了多少。