什么是消息队列?
消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
如何实现?
之前使用Java的BlockingQueue
和线程池实现了简单的消息队列,但这样的实现有两个问题:
- 内存消耗:若消息很多,则JVM的内存可能被快速耗尽,导致内存不足
- 非持久性存储:JVM的内存不是持久性的,消息队列中的消息可能发生丢失
所以这里采用Redis来实现,可以很好解决上述问题(为什么不用其他中间件?因为Redis成本更低)
Redis提供了三种不同的方式来实现消息队列:
- list结构:基于List结构模拟消息队列
- PubSub:基本的点对点消息模型
- Stream:比较完善的消息队列模型
基于List结构模拟消息队列
实现方式
使用LPUSH
实现消息进入队列,BRPOP
实现消息有阻塞的取出队列
该实现方法的优缺点
优点
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点
- 无法避免消息丢失
- 只支持单消费者
基于PubSub的消息队列
PubSub(发布订阅) 是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
实现方式
一般通过以下三个指令实现
SUBSCRIBE channel [channel]
:订阅一个或多个频道PUBLISH channel msg
:向一个频道发送消息PSUBSCRIBE pattern [pattern]
:订阅与pattern格式匹配的所有频道
该实现方法的优缺点
优点
- 采用发布订阅模型,支持多生产,多消费
缺点
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
基于Stream的消息队列
单消费者模式
Stream是Redis5.0引入的一种新的数据类型,可以实现一个功能非常完善的消息队列。
实现方法
发送消息的命令
1
XADD key ID field string [field string ...]
ID
:指定为*
时,是使用自增长的ID,格式是时间戳-递增数字
。
读取消息的命令
1
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
count
:每次读取消息的最大数量milliseconds
:当没有消息时,阻塞的时长。为0的时候永久等待,不给这个参数是不等待key
:消息队列的名字ID
:起始ID,只返回大于该ID的值,0
代表从第一个消息开始,$
代表从最新的消息开始
Stream的特点
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有消息漏读的风险(使用
$
进行读取时,若在处理当前消息的时候,新增一条以上的消息,则下一次只能读取最新一条,漏掉其他消息)
消费者组模式
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。
具有以下特点:
消息分流
队列中的消息会分流给组内的不同消费者,加快消息处理速度消息标示
消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,也会从标示之后开始读取,保证每一个消息都会被消费消息确认
消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK
来确认消息,标记消息为已处理,才会从pending-list中移除
实现方法
创建消费者组
1
XGROUP CREATE key groupname id|$ [MKSTREAM]
key
:队列名称groupname
:消费者组名称id
,$
:起始id标示,$
代表列中最后一个消息,0
则代表第一个消息MKSTREAM
:队列不存在时自动创建队列
从消费者组读取消息
1
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
ID
:起始id- 使用
>
,表示从下一个未消费的消息开始,防止漏读。 - 其他情况下,根据id从pending-list中获取已消费但未确认的消息。例如
0
,是从pending-list中的第一个消息开始
- 使用
确认消息
1
XACK key group ID [ID ...]
特点
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读风险
- 有消息确认机制,保证消息至少被消费一次
基于Stream实现异步秒杀
创建Stream类型的消息队列
1 | xgroup create stream.orders g1 0 mkstream |
修改Lua脚本
修改之前的Lua脚本,实现在认定有抢购资格后,直接向stream.orders中添加消息,包含voucherId
,userId
,orderId
修改后的脚本如下:
1 | --- |
在Service中的实现如下:
1 | /** |
读取消息队列并下单
在项目启动时开启一个线程,用来一直读取消息队列并尝试进行消费,若消费时出现异常则将其加入pending-list
,并尝试把pending-list
中的消息全部ACK。代码实现如下:
1 | private class orderHandlerWithMq implements Runnable { |
这样的实现,有两个很大的问题
- 若处理消息时的异常是一定会出现的,则程序会一直在处理pending-list的地方循环,无法跳出,导致死循环,无法进行消息队列的处理
- 若消息队列不为空就发生了服务器宕机,则再启动服务时会报错,因为对
proxyService
的赋值是在下单时完成的,所以刚开始是默认值null
,这里会抛出空指针异常
以上就是使用Redis stream实现消息队列,并进行秒杀优化的方法,有时间的话应该会解决上面两个问题,并使用Jmeter进行压测,看看性能提升了多少。