SpringBoot整合ActiveMq

今天药忘吃喽~ 2022-05-30 10:25 459阅读 0赞

1、先下载activemq安装

从ActiveMq官方上下载ActiveMq服务
下载地址:http://activemq.apache.org/download.html

我当前下载的是版本是5.15.3 官方备注:当前最新的稳定版本。

下载下来解压后进到window相对应的版本的bin目录下执行activemq.bat

2、创建SpringBoot项目

pom.xml

  1. <parent>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-parent</artifactId>
  4. <version>1.5.8.RELEASE</version>
  5. <relativePath /> <!-- lookup parent from repository -->
  6. </parent>
  7. <properties>
  8. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  9. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  10. <java.version>1.8</java.version>
  11. </properties>
  12. <dependencies>
  13. <dependency>
  14. <groupId>org.springframework.boot</groupId>
  15. <artifactId>spring-boot-starter</artifactId>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.springframework.boot</groupId>
  19. <artifactId>spring-boot-starter-web</artifactId>
  20. </dependency>
  21. <dependency>
  22. <groupId>org.springframework.boot</groupId>
  23. <artifactId>spring-boot-starter-activemq</artifactId>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.apache.activemq</groupId>
  27. <artifactId>activemq-pool</artifactId>
  28. </dependency>
  29. <dependency>
  30. <groupId>org.springframework.boot</groupId>
  31. <artifactId>spring-boot-starter-test</artifactId>
  32. <scope>test</scope>
  33. </dependency>
  34. <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
  35. <dependency>
  36. <groupId>com.alibaba</groupId>
  37. <artifactId>fastjson</artifactId>
  38. <version>1.2.40</version>
  39. </dependency>
  40. <dependency>
  41. <groupId>org.springframework.boot</groupId>
  42. <artifactId>spring-boot-devtools</artifactId>
  43. <optional>true</optional>
  44. <!-- optional=true,依赖不会传递,该项目依赖devtools;之后依赖myboot项目的项目如果想要使用devtools,需要重新引入 -->
  45. </dependency>
  46. </dependencies>
  47. <build>
  48. <plugins>
  49. <plugin>
  50. <groupId>org.springframework.boot</groupId>
  51. <artifactId>spring-boot-maven-plugin</artifactId>
  52. </plugin>
  53. </plugins>
  54. </build>

生产者:

  1. @Service("userProducer")
  2. public class UserProducer {
  3. @Resource
  4. private JmsMessagingTemplate jmsMessagingTemplate;
  5. @Resource
  6. private Queue userQueue;
  7. @Resource
  8. private Queue queue;
  9. @Resource
  10. private Topic topic;
  11. public void sendData(Serializable obj) {
  12. this.jmsMessagingTemplate.convertAndSend(userQueue,obj);
  13. }
  14. public void sendMessage(String message) {
  15. this.jmsMessagingTemplate.convertAndSend(queue,message);
  16. }
  17. public void sendTopicMessage(String message) {
  18. this.jmsMessagingTemplate.convertAndSend(topic,message);
  19. }
  20. }

activemq配置

  1. @Configuration
  2. public class ActiveMqConfig {
  3. @Bean
  4. public Queue userQueue() {
  5. return new ActiveMQQueue("userMqQueue");
  6. }
  7. @Bean
  8. public ActiveMQQueue queue() {
  9. return new ActiveMQQueue("weisg.queue");
  10. }
  11. @Bean
  12. public Topic topic() {
  13. return new ActiveMQTopic("weisg.topic");
  14. }
  15. // topic模式的ListenerContainer
  16. @Bean
  17. public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
  18. DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
  19. bean.setPubSubDomain(true);
  20. bean.setConnectionFactory(activeMQConnectionFactory);
  21. return bean;
  22. }
  23. // queue模式的ListenerContainer
  24. @Bean
  25. public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
  26. DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
  27. bean.setConnectionFactory(activeMQConnectionFactory);
  28. return bean;
  29. }
  30. }

以上的配置为 支持同时发送和接收queue/topic

消费者

  1. @Service
  2. public class UserConsumer {
  3. @JmsListener(destination = "userMqQueue",containerFactory = "jmsListenerContainerQueue")
  4. public void receive(ObjectMessage message) throws JMSException {
  5. System.out.println("----------UserConsumer-----------"+message);
  6. System.out.println("----------message-----------"+message.getObject());
  7. User user = new User();
  8. BeanUtils.copyProperties(message.getObject(), user);
  9. System.out.println("user--------------"+user);
  10. System.out.println("user数据数据完成...");
  11. }
  12. @JmsListener(destination = "weisg.topic",containerFactory = "jmsListenerContainerTopic")
  13. public void receiveTopicMessage(String message) throws JMSException {
  14. System.out.println("receiveTopicMessage--------------"+message);
  15. }
  16. @JmsListener(destination = "weisg.topic",containerFactory = "jmsListenerContainerTopic")
  17. public void receiveTopicMessage2(String message) throws JMSException {
  18. System.out.println("receiveTopicMessage2--------------"+message);
  19. }
  20. }

请求方法

  1. @RequestMapping(value="/api/addUser",method = RequestMethod.POST)
  2. public Map<String, Object> beetl(@RequestParam Map<String, Object> params){
  3. Map<String, Object> retMap = new HashMap<String, Object>();
  4. User user = new User();
  5. user.setUserId((String)params.get("userId"));
  6. user.setUserName((String)params.get("userName"));
  7. user.setMobile((String)params.get("mobile"));
  8. userProducer.sendData((Serializable)user);
  9. userProducer.sendMessage(user.getUserId());
  10. for (int i = 0; i < 10; i++) {
  11. userProducer.sendTopicMessage((String)params.get("userId")+i);
  12. }
  13. retMap.put("code", "200");
  14. retMap.put("msg", "success!");
  15. return retMap;
  16. }

配置文件:

  1. spring.activemq.broker-url=tcp://127.0.0.1:61616
  2. #如果此处设置为true,需要加activemq-pool的依赖包,否则会自动配置失败,报JmsMessagingTemplate注入失败
  3. spring.activemq.pool.enabled=true
  4. spring.activemq.user=admin
  5. # 密码
  6. spring.activemq.password=admin
  7. # 在考虑结束之前等待的时间
  8. spring.activemq.close-timeout=150
  9. # 默认代理URL是否应该在内存中。如果指定了显式代理,则忽略此值。
  10. spring.activemq.in-memory=false
  11. # 是否在回滚回滚消息之前停止消息传递。这意味着当启用此命令时,消息顺序不会被保留。
  12. spring.activemq.non-blocking-redelivery=false
  13. # 等待消息发送响应的时间。设置为0等待永远。
  14. spring.activemq.send-timeout=0
  15. #发布模式,为true时是topic模式,为false是queue模式
  16. spring.jms.pub-sub-domain=true
  17. # 是否信任所有包
  18. #注意:对象传输需要开启包白名单,否则会报错
  19. spring.activemq.packages.trust-all=true
  20. # 要信任的特定包的逗号分隔列表(当不信任所有包时)
  21. #spring.activemq.packages.trusted=
  22. # 当连接请求和池满时是否阻塞。设置false会抛“JMSException异常”。
  23. #spring.activemq.pool.block-if-full=true
  24. # 如果池仍然满,则在抛出异常前阻塞时间。
  25. #spring.activemq.pool.block-if-full-timeout=-1ms
  26. # 是否在启动时创建连接。可以在启动时用于加热池。
  27. #spring.activemq.pool.create-connection-on-startup=true
  28. # 是否用Pooledconnectionfactory代替普通的ConnectionFactory。
  29. #spring.activemq.pool.enabled=false
  30. # 连接过期超时。
  31. #spring.activemq.pool.expiry-timeout=0ms
  32. #空闲的连接过期时间,默认为30秒
  33. spring.activemq.pool.idle-timeout=30
  34. # 连接池最大连接数
  35. spring.activemq.pool.max-connections=10
  36. # 每个连接的有效会话的最大数目。
  37. #spring.activemq.pool.maximum-active-session-per-connection=500
  38. # 当有"JMSException"时尝试重新连接
  39. #spring.activemq.pool.reconnect-on-exception=true
  40. # 在空闲连接清除线程之间运行的时间。当为负数时,没有空闲连接驱逐线程运行。
  41. #spring.activemq.pool.time-between-expiration-check=-1ms
  42. # 是否只使用一个MessageProducer
  43. #spring.activemq.pool.use-anonymous-producers=true

结果:如下

  1. 2018-03-10 11:51:25.582 [http-nio-8889-exec-4] INFO org.springframework.web.servlet.DispatcherServlet - FrameworkServlet 'dispatcherServlet': initialization completed in 20 ms
  2. ----------UserConsumer-----------ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:admin-PC-51016-1520653879572-1:4:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:admin-PC-51016-1520653879572-1:4:1:1, destination = queue://userMqQueue, transactionId = null, expiration = 0, timestamp = 1520653885674, arrival = 0, brokerInTime = 1520653885675, brokerOutTime = 1520653885677, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@6df8e57e, marshalledProperties = org.apache.activemq.util.ByteSequence@4306219a, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {timestamp=1520653885637}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}
  3. ----------message-----------User [userId=zhangsan001, userName=张三, userNo=null, mobile=15819295938]
  4. user--------------User [userId=zhangsan001, userName=张三, userNo=null, mobile=15819295938]
  5. user数据数据完成...
  6. receiveTopicMessage--------------zhangsan0010
  7. receiveTopicMessage2--------------zhangsan0010
  8. receiveTopicMessage2--------------zhangsan0011
  9. receiveTopicMessage--------------zhangsan0011
  10. receiveTopicMessage--------------zhangsan0012
  11. receiveTopicMessage2--------------zhangsan0012
  12. receiveTopicMessage--------------zhangsan0013
  13. receiveTopicMessage2--------------zhangsan0013
  14. receiveTopicMessage--------------zhangsan0014
  15. receiveTopicMessage--------------zhangsan0015
  16. receiveTopicMessage2--------------zhangsan0014
  17. receiveTopicMessage2--------------zhangsan0015
  18. receiveTopicMessage--------------zhangsan0016
  19. receiveTopicMessage--------------zhangsan0017
  20. receiveTopicMessage2--------------zhangsan0016
  21. receiveTopicMessage2--------------zhangsan0017
  22. receiveTopicMessage--------------zhangsan0018
  23. receiveTopicMessage2--------------zhangsan0018
  24. receiveTopicMessage--------------zhangsan0019
  25. receiveTopicMessage2--------------zhangsan0019

发表评论

表情:
评论列表 (有 0 条评论,459人围观)

还没有评论,来说两句吧...

相关阅读

    相关 SpringBoot整合ActiveMQ

    前言   ActiveMQ 一个成熟的消息中间件,作用于系统之间的通信,降低模块与模块之间的耦合度。   消息的传递有两种类型: 1. Queue 队列模式:一个生