springboot与ActiveMQ整合

ゞ 浴缸里的玫瑰 2022-03-18 08:58 258阅读 0赞

前言

  很多项目, 都不是一个系统就做完了. 而是好多个系统, 相互协作来完成功能. 那, 系统与系统之间, 不可能完全独立吧?

  如: 在学校所用的管理系统中, 有学生系统, 资产系统, 宿舍系统等等. 当学期结束之后, 是否需要对已经结束的期次进行归档操作. 假如归档功能在学生系统中, 那点击归档之后, 学生是不是还要关心宿舍那边是否已结束, 学生所领资产是否全都归还?

  显然, 这并不是一个好的方式, 系统之间的耦合性做的太强了, 很不利于系统扩展, 而且, 一步操作, 可能要等很久很久, 才能完成. 用户可愿意等?

  既然同步归档不可能了, 那是否有办法实现异步归档? 异步归档怎么实现呢?

  我们其实可以通过消息队列来实现异步归档. 学生这边点击归档后, 发个消息到队列中, 其他系统自行去读取, 然后完成各自系统应该完成的工作.

ActiveMQ下载安装

  下载地址: http://activemq.apache.org/download.html

  安装过程比较简单, 在centos中, 解压出来, 就算是安装好了

  运行方法:

640690-20180224123108276-650806528.png

  运行起来后, 可以通过 ip:8161 来查看是否成功.

  640690-20180224125247546-2097228680.png

点击红框中的链接, 会出现登录弹框, 账号密码默认都是admin.

640690-20180224125403078-1969812703.png

springboot整合activemq

一. 目录结构

640690-20180224123327661-1955389331.png

producer : 消息生产者

consumer-a : 消息消费者

consumer-b : 消息消费者

pom文件:

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-activemq</artifactId>
  4. </dependency>

如果使用pool的话, 就需要在pom中加入以下依赖:

  1. <dependency>
  2. <groupId>org.apache.activemq</groupId>
  3. <artifactId>activemq-pool</artifactId>
  4. <version>5.14.5</version>
  5. </dependency>

二. producer

  1. 目录结构

640690-20180224123759634-41515991.png

  1. yml文件:
  1. server:
  2. port: 8080
  3. context-path: /pro
  4. spring:
  5. activemq:
  6. user: admin
  7. password: admin
  8. broker-url: tcp://192.168.153.129:61616
  9. pool:
  10. enabled: true
  11. max-connections: 10
  12. queueName: publish.queue
  13. topicName: publish.topic

这里我开启了连接池, 默认是不开的.

这里要注意端口, 不是之前的8161.

  1. 配置文件 ActiveMQConfig

    @Configuration
    public class ActiveMQConfig {

    1. @Value("${queueName}")
    2. private String queueName;
    3. @Value("${topicName}")
    4. private String topicName;
    5. @Value("${spring.activemq.user}")
    6. private String usrName;
    7. @Value("${spring.activemq.password}")
    8. private String password;
    9. @Value("${spring.activemq.broker-url}")
    10. private String brokerUrl;
    11. @Bean
    12. public Queue queue(){
    13. return new ActiveMQQueue(queueName);
    14. }
    15. @Bean
    16. public Topic topic(){
    17. return new ActiveMQTopic(topicName);
    18. }
    19. @Bean
    20. public ActiveMQConnectionFactory connectionFactory() {
    21. return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
    22. }
    23. @Bean
    24. public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory){
    25. DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
    26. bean.setConnectionFactory(connectionFactory);
    27. return bean;
    28. }
    29. @Bean
    30. public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory){
    31. DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
    32. //设置为发布订阅方式, 默认情况下使用的生产消费者方式
    33. bean.setPubSubDomain(true);
    34. bean.setConnectionFactory(connectionFactory);
    35. return bean;
    36. }

    }

这里本来不需要配置这么多的, 但是在consumer中也会用到, 所以就暂时弄一份一样的, 拷贝一下完事.

  1. PublishController

    @RestController
    @RequestMapping(“/publish”)
    public class PublishController {

    1. @Autowired
    2. private JmsMessagingTemplate jms;
    3. @Autowired
    4. private Queue queue;
    5. @Autowired
    6. private Topic topic;
    7. @RequestMapping("/queue")
    8. public String queue(){
    9. for (int i = 0; i < 10 ; i++){
    10. jms.convertAndSend(queue, "queue"+i);
    11. }
    12. return "queue 发送成功";
    13. }
    14. @JmsListener(destination = "out.queue")
    15. public void consumerMsg(String msg){
    16. System.out.println(msg);
    17. }
    18. @RequestMapping("/topic")
    19. public String topic(){
    20. for (int i = 0; i < 10 ; i++){
    21. jms.convertAndSend(topic, "topic"+i);
    22. }
    23. return "topic 发送成功";
    24. }

    }

三. consumer

  1. 目录结构

640690-20180224124510167-142513057.png

a,b是一样的, 只是显示的信息不同.

  1. 配置文件

yml配置文件是一样的, 只是修改了端口和context-path.

ActiveMQConfig文件内容是一样的.

  1. listener

    @Component
    public class QueueListener {

    1. @JmsListener(destination = "publish.queue", containerFactory = "jmsListenerContainerQueue")
    2. @SendTo("out.queue")
    3. public String receive(String text){
    4. System.out.println("QueueListener: consumer-a 收到一条信息: " + text);
    5. return "consumer-a received : " + text;
    6. }

    }

SendTo 会将此方法返回的数据, 写入到 queue : out.queue 中去.

  1. @Component
  2. public class TopicListener {
  3. @JmsListener(destination = "publish.topic", containerFactory = "jmsListenerContainerTopic")
  4. public void receive(String text){
  5. System.out.println("TopicListener: consumer-a 收到一条信息: " + text);
  6. }
  7. }

这里通过传入不同的factory, 来实现发送不同类型的信息.

四. 测试

  1. queue测试

浏览器中访问: http://localhost:8080/pro/publish/queue

640690-20180224125809579-1903850360.png

然后看一下, 控制台, 那些用户接收到了信息.

640690-20180224125935799-308871732.png

640690-20180224125946663-693631297.png

从上两幅图看的出来, a, b并不能同时接收数据. 这是queue的方式, 点对点.

640690-20180224130044871-1970557652.png

那我想点对面, 怎么办?

  1. topic测试

浏览器访问页面: http://localhost:8080/pro/publish/topic

640690-20180224130206283-783010989.png

640690-20180224130228958-2090908948.png

a用户完全接收到信息了. 再看看b用户

640690-20180224130251053-1183544804.png

没毛病, 也都接收到数据了.

topic默认情况下, 是不会保存数据的, 也就是说, consumer是接收不到之前未接收到的信息.

而queue却是可以的.

但是, topic并不是不能实现那个功能, 只要配置一下, 还是可以的.

推荐链接

消息中间件企业级应用

发表评论

表情:
评论列表 (有 0 条评论,258人围观)

还没有评论,来说两句吧...

相关阅读