【消息队列开发】 实现消费者订阅消息

拼搏现实的明天。 2024-04-17 10:55 147阅读 0赞

文章目录

  • ?前言
  • ?关于订阅消息方法参数解析
  • ?如何实现将消息推送给消费者
  • ?消费者类
  • ?消费消息的流程
  • ?如何实现消息确认呢?
  • ⭕总结

?前言

本次开发任务

  • 实现消费者订阅消息

?关于订阅消息方法参数解析

我们关于订阅消息的方法如下:

在这里插入图片描述

  • consumerTag: 消费者的身份标识
  • autoAck: 消息被消费完成后, 应答的方式. 为 true 自动应答. 为 false 手动应答.
  • consumer: 是一个回调函数,此处类型设定成函数式接口. 这样后续调用 basicConsume 并且传实参的时候, 就可以写作 lambda 样子了
  • queueName:需要订阅消息的队列

在这里插入图片描述

回调函数实现代码如下:

  1. /*
  2. * 只是一个单纯的函数式接口(回调函数). 收到消息之后要处理消息时调用的方法.
  3. */
  4. @FunctionalInterface
  5. public interface Consumer {
  6. // Delivery 的意思是 "投递", 这个方法预期是在每次服务器收到消息之后, 来调用.
  7. // 通过这个方法把消息推送给对应的消费者.
  8. void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException;
  9. }

?如何实现将消息推送给消费者

消费者订阅消息后,如何将消息推送给消费者呢?

首先我们需要知道一个队列都有那些消费者进行消费,且一个队列的消费者肯定不止一个。

这里博主的做法是,给最初的队列类,加一个集合类的属性,用于储存当前都有那些消费者进行消费

这么多的消费者,那他们应该怎样消费这些消息呢?

这里博主采用轮询的方式进行消费,轮着进行消费就好

以此我们再创建一个属性为 consumerSeq 记录当前取到了第几个消费者. 方便实现轮询策略.

在这里插入图片描述

?消费者类

对于消费者,我们创建一个类,并给出基础属性,实现如下:

  1. /*
  2. * 表示一个消费者(完整的执行环境)
  3. */
  4. public class ConsumerEnv {
  5. private String consumerTag;
  6. private String queueName;
  7. private boolean autoAck;
  8. // 通过这个回调来处理收到的消息.
  9. private Consumer consumer;
  10. public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
  11. this.consumerTag = consumerTag;
  12. this.queueName = queueName;
  13. this.autoAck = autoAck;
  14. this.consumer = consumer;
  15. }
  16. public String getConsumerTag() {
  17. return consumerTag;
  18. }
  19. public void setConsumerTag(String consumerTag) {
  20. this.consumerTag = consumerTag;
  21. }
  22. public String getQueueName() {
  23. return queueName;
  24. }
  25. public void setQueueName(String queueName) {
  26. this.queueName = queueName;
  27. }
  28. public boolean isAutoAck() {
  29. return autoAck;
  30. }
  31. public void setAutoAck(boolean autoAck) {
  32. this.autoAck = autoAck;
  33. }
  34. public Consumer getConsumer() {
  35. return consumer;
  36. }
  37. public void setConsumer(Consumer consumer) {
  38. this.consumer = consumer;
  39. }
  40. }

?消费消息的流程

如果有消息需要进行消费,我们就将该队列放入 一个阻塞队列中,并用一个扫描线程对给该队列进行扫描。若有需要消费的消息队列,就将该队列取出来,交给线程池进行执行信息的回调函数。
在这里插入图片描述
该流程,我们创建一个类ConsumerManager进行实现这些操作:

在这里插入图片描述
该部分代码后面博主再进行书写

?如何实现消息确认呢?

我们在执行回调函数之前,先将需要消费的该消息放入待确认的集合中,若执行回调函数没有发生异常与错误,我们就认为消息消费成功。实现了消息确认。

⭕总结

关于《【消息队列开发】 实现消费者订阅消息》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下

发表评论

表情:
评论列表 (有 0 条评论,147人围观)

还没有评论,来说两句吧...

相关阅读