在上一篇文章中,我们用线程池实现了异步,但是这么做会有两个问题
- 阻塞队列在内存当中,如果服务器重启会导致任务丢失
- 只能在单体服务器上运行。若是分布式服务器,则需要进行同步工作,造成额外的开销和风险
所以,现在要采用分布式消息队列来实现异步
什么是消息队列?
MQ(全称Message Queue)是一种进程间通信或同一进程的不同线程间的通信方式,队列就是一个消息容器
现实使用中,我们将消息队列称之为中间件,从它的名字就可以看出,消息队列不存储消息内容的本身,它只是消息的搬运工
消息队列的基本模型
- 消息 Message :一个系统想要向另一个系统传递的数据
- 生产者 Producer :产生消息 Message 的系统
- 消费者 Consumer :处理生产者 Producer 产生的消息
- 队列 Queue :暂时用来存放消息
分布式消息队列的优势是什么?
消息队列的优势
- 异步处理,生产者只需要发送消息就可以了,无需关系消费者何时处理消息
- 削峰填谷,当消费者的处理能力有限时,而用户的请求量又很大。我们可以先把用户的请求存储在消息队列中,然后消费者或实际执行应用可以按照自身的处理能力逐步从队列中取出请求
分布式的优势
- 数据持久化,消息队列把消息存储在硬盘里,因此重启服务器后消息不会丢失
- 可拓展性,这意味着可以通过增加机器来提升服务器的消息处理能力
- 应用解耦,这是只有在分布式场景下才能实现的功能,它允许各个使用不同语言框架开发的系统之间进行灵活的数据传输与读取
消息队列的实现
这里我们选择 RabbitMq ,因为他比较容易上手,其他各方面表现也不错
初始化交换机(exchange),队列(queue)和 channel
因为只有一个队列、一个生产者和一个消费者,所以这里我们路由方式选择 direct ,确认方式选择手动(MANUAL)
先成名一个常量类,防止后续出现硬编码
1 2 3 4 5 6 7
| public interface BiMqConstant { String BI_EXCHANGE = "bi_exchange";
String BI_QUEUE_NAME = "bi_queue";
String BI_ROUTING_KEY = "bi_routingKey"; }
|
然后再一个 main
函数中实现初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class BiMqInit { public static void main(String[] args) { try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(BiMqConstant.BI_EXCHANGE, "direct");
channel.queueDeclare(BiMqConstant.BI_QUEUE_NAME, true, false, false, null); channel.queueBind(BiMqConstant.BI_QUEUE_NAME, BiMqConstant.BI_EXCHANGE, BiMqConstant.BI_ROUTING_KEY); } catch (IOException | TimeoutException e) { throw new RuntimeException(e); } } }
|
生产者实现
1 2 3 4 5 6 7 8 9
| @Component public class BiMessageProducer { @Resource private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, String message) { rabbitTemplate.convertAndSend(exchange, routingKey, message); } }
|
其实都不需要自己去做封装,直接调用都可以
消费者实现
Spring 提供了 RabbitMq 的封装,所以只需要按照下面这样写就好了
优点是使用方便
缺点是不够灵活
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Component @Slf4j public class BiMessageConsumer { private void rejectMessage(Channel channel, long deliveryTag) { try { channel.basicNack(deliveryTag, false, false); } catch (IOException e) { throw new BusinessException(ErrorCode.SYSTEM_ERROR, "消息拒绝失败"); } } @RabbitListener(queues = {BiMqConstant.BI_QUEUE_NAME}, ackMode = "MANUAL") public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { if (StringUtils.isBlank(message)) { rejectMessage(channel, deliveryTag); } log.info("receivedMessage message = {}", message); try { } catch (Exception e) { rejectMessage(channel, deliveryTag); } try { channel.basicAck(deliveryTag, false); } catch (IOException e) { throw new BusinessException(ErrorCode.SYSTEM_ERROR, "消息确认失败"); } } }
|