RabbitMQ:基于分布式消息队列 RabbitMQ 实现延迟队列

不念不忘少年蓝@ 2022-05-18 10:23 463阅读 0赞

RabbitMQ:基于分布式消息队列 RabbitMQ 实现延迟队列

【参考文献】:基于 rabbitmq 实现延迟队列

流程图:

20170209171946867

【延迟队列】:

  1. package com.caox.rabbitmq.demo._16_rabbitmq_distributed_delay_retry_queue_ttl;
  2. import com.google.common.base.Preconditions;
  3. import com.google.common.collect.Maps;
  4. import com.rabbitmq.client.AMQP;
  5. import com.rabbitmq.client.Channel;
  6. import com.rabbitmq.client.Connection;
  7. import com.rabbitmq.client.Consumer;
  8. import lombok.extern.slf4j.Slf4j;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import java.io.IOException;
  12. import java.util.HashMap;
  13. import java.util.concurrent.TimeUnit;
  14. /**
  15. * Created by nazi on 2018/8/2.
  16. * 基于 RabbitMQ 实现的分布式延迟重试队列
  17. */
  18. @Slf4j
  19. public class RabbitMQDelayQueue {
  20. private static Logger LOGGER = LoggerFactory.getLogger(RabbitMQDelayQueue.class);
  21. private static final String POSTFIX_TASK = "_task";
  22. // direct类型 交换器
  23. public static final String EXCHANGE_TYPE_DIRECT = "direct";
  24. private Connection connection;
  25. //注册消费者
  26. private ConsumerRegister consumerRegister;
  27. //任务队列配置
  28. private String taskExchangeName;
  29. private String taskQueueName;
  30. private String taskRoutingKeyName;
  31. //延迟队列配置
  32. private String delayExchangeName;
  33. private String delayQueueName;
  34. private String delayRoutingKeyName;
  35. //延迟队列中的消息ttl
  36. private long perDelayQueueMessageTTL;
  37. public RabbitMQDelayQueue(Connection connection, ConsumerRegister consumerRegister, String delayExchangeName,
  38. String delayQueueName, String delayRoutingKeyName, long perDelayQueueMessageTTL) throws Exception {
  39. this.connection = connection;
  40. this.consumerRegister = consumerRegister;
  41. this.delayExchangeName = delayExchangeName;
  42. this.delayQueueName = delayQueueName;
  43. this.delayRoutingKeyName = delayRoutingKeyName;
  44. this.perDelayQueueMessageTTL = perDelayQueueMessageTTL;
  45. this.taskExchangeName = delayExchangeName + POSTFIX_TASK;
  46. this.taskQueueName = delayQueueName + POSTFIX_TASK;
  47. this.taskRoutingKeyName = delayRoutingKeyName + POSTFIX_TASK;
  48. init();
  49. registerConsumer();
  50. }
  51. /**
  52. * @Description 注册消费者
  53. */
  54. public interface ConsumerRegister {
  55. public Consumer register(Channel channel) throws IOException;
  56. }
  57. /**
  58. * 注册带有ttl的queue和对应的任务队列
  59. * @throws IOException
  60. */
  61. private void init() throws Exception {
  62. Channel channel = connection.createChannel();
  63. channel.exchangeDeclare(taskExchangeName, EXCHANGE_TYPE_DIRECT, true);
  64. channel.exchangeDeclare(delayExchangeName, EXCHANGE_TYPE_DIRECT, true);
  65. // 任务队列 B
  66. HashMap<String, Object> argumentsTask = Maps.newHashMap();
  67. argumentsTask.put("x-dead-letter-exchange", delayExchangeName);
  68. argumentsTask.put("x-dead-letter-routing-key", delayRoutingKeyName);
  69. channel.queueDeclare(taskQueueName, true, false, false, argumentsTask);
  70. channel.queueBind(taskQueueName, taskExchangeName, taskRoutingKeyName);
  71. // 延迟队列 A
  72. HashMap<String, Object> argumentsDelay = Maps.newHashMap();
  73. argumentsDelay.put("x-dead-letter-exchange", taskExchangeName);
  74. argumentsDelay.put("x-dead-letter-routing-key", taskRoutingKeyName);
  75. argumentsDelay.put("x-message-ttl", perDelayQueueMessageTTL);
  76. channel.queueDeclare(delayQueueName, true, false, false, argumentsDelay);
  77. channel.queueBind(delayQueueName, delayExchangeName, delayRoutingKeyName);
  78. channel.close();
  79. }
  80. /**
  81. * 注册消费者
  82. * @throws IOException
  83. * @author roc
  84. */
  85. private void registerConsumer() throws IOException {
  86. LOGGER.info("register consumer ->{}", this);
  87. Channel channel = connection.createChannel();
  88. Consumer consumer = consumerRegister.register(channel);
  89. channel.basicConsume(taskQueueName, false, consumer);
  90. LOGGER.info("register consumer ->{} success", this);
  91. }
  92. /**
  93. * 消息入队
  94. *
  95. * @param body 消息内容
  96. * @param timeout 超时时间
  97. * @param unit 超时时间单位
  98. * @throws IOException
  99. * @author roc
  100. */
  101. public void put(byte[] body, long timeout, TimeUnit unit) throws Exception {
  102. Preconditions.checkNotNull(body);
  103. Preconditions.checkArgument(timeout >= 0);
  104. Preconditions.checkNotNull(unit);
  105. LOGGER.info("put element to delay queue ->{}", body.hashCode());
  106. Channel channel = null;
  107. try {
  108. channel = connection.createChannel();
  109. // deliveryMode=2 标识任务的持久性
  110. long millis = unit.toMillis(timeout);
  111. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(String.valueOf(millis)).deliveryMode(2).build();
  112. channel.basicPublish(delayExchangeName, delayRoutingKeyName, properties, body);
  113. LOGGER.info("put element to delay queue success");
  114. } finally {
  115. if (null != channel){
  116. channel.close();
  117. }
  118. }
  119. }
  120. public static class Builder {
  121. private Connection connection;
  122. private ConsumerRegister consumerRegister;
  123. private String delayExchangeName;
  124. private String delayQueueName;
  125. private String delayRoutingKeyName;
  126. private long perDelayQueueMessageTTL;
  127. public Builder setConnection(Connection connection) {
  128. this.connection = connection;
  129. return this;
  130. }
  131. public Builder setDelayExchangeName(String delayExchangeName) {
  132. this.delayExchangeName = delayExchangeName;
  133. return this;
  134. }
  135. public Builder setDelayQueueName(String delayQueueName) {
  136. this.delayQueueName = delayQueueName;
  137. return this;
  138. }
  139. public Builder setDelayRoutingKeyName(String delayRoutingKeyName) {
  140. this.delayRoutingKeyName = delayRoutingKeyName;
  141. return this;
  142. }
  143. public Builder setConsumerRegister(ConsumerRegister consumerRegister) {
  144. this.consumerRegister = consumerRegister;
  145. return this;
  146. }
  147. public Builder setPerDelayQueueMessageTTL(long timeout, TimeUnit unit) {
  148. this.perDelayQueueMessageTTL = unit.toMillis(timeout);;
  149. return this;
  150. }
  151. public RabbitMQDelayQueue build() throws Exception {
  152. Preconditions.checkNotNull(connection);
  153. Preconditions.checkNotNull(delayExchangeName);
  154. Preconditions.checkNotNull(delayQueueName);
  155. Preconditions.checkNotNull(delayRoutingKeyName);
  156. Preconditions.checkNotNull(consumerRegister);
  157. return new RabbitMQDelayQueue(connection, consumerRegister, delayExchangeName,
  158. delayQueueName, delayRoutingKeyName, perDelayQueueMessageTTL);
  159. }
  160. }
  161. }

【测试】:

  1. package com.caox.rabbitmq.demo._16_rabbitmq_distributed_delay_retry_queue_ttl;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.nio.charset.Charset;
  5. import java.util.List;
  6. import java.util.Map;
  7. import java.util.concurrent.TimeUnit;
  8. /**
  9. * Created by nazi on 2018/8/2.
  10. */
  11. public class RabbitMQDelayQueueTest {
  12. public static void main(String[] args) throws Exception {
  13. delayQueue();
  14. }
  15. public static void delayQueue() throws Exception {
  16. ConnectionFactory factory = new ConnectionFactory();
  17. factory.setUsername("caoxia");
  18. factory.setPassword("caoxia123456");
  19. Address address = new Address("127.0.0.1", 5672);
  20. Connection connection = factory.newConnection(new Address[] { address });
  21. RabbitMQDelayQueue delayQueue = new RabbitMQDelayQueue.Builder()
  22. .setConnection(connection)
  23. .setPerDelayQueueMessageTTL(15, TimeUnit.SECONDS)
  24. .setDelayExchangeName("delay_exchange_roc")
  25. .setDelayQueueName("delay_queue_roc")
  26. .setDelayRoutingKeyName("delay_routing_key_roc")
  27. .setConsumerRegister
  28. (new RabbitMQDelayQueue.ConsumerRegister() {
  29. // @Override
  30. public Consumer register(Channel channel) throws IOException {
  31. return new DefaultConsumer(channel) {
  32. // @Override
  33. public void handleDelivery(String consumerTag, Envelope envelope,
  34. BasicProperties properties, byte[] body) throws IOException {
  35. long deliveryTag = envelope.getDeliveryTag();
  36. System.out.println(deliveryTag);
  37. String exchange = envelope.getExchange();
  38. String routingKey = envelope.getRoutingKey();
  39. // TODO do something
  40. String content = new String(body, Charset.forName("utf-8"));
  41. System.out.println("receive message --- > " + content);
  42. Map<String, Object> headers = properties.getHeaders();
  43. if (headers != null) {
  44. List<Map<String, Object>> xDeath = (List<Map<String, Object>>) headers.get("x-death");
  45. System.out.println("xDeath--- > " + xDeath);
  46. if (xDeath != null && !xDeath.isEmpty()) {
  47. Map<String, Object> entrys = xDeath.get(0);
  48. }
  49. }
  50. // 消息拒收
  51. // if(do something) 消息重新入队
  52. // getChannel().basicReject(deliveryTag, false);
  53. // else 消息应答
  54. getChannel().basicAck(deliveryTag, false);
  55. }
  56. };
  57. }
  58. }).build();
  59. delayQueue.put("{\"name\" : \"i am roc!!\"}\"".getBytes("UTF-8"), 3, TimeUnit.SECONDS);
  60. }
  61. }

发表评论

表情:
评论列表 (有 0 条评论,463人围观)

还没有评论,来说两句吧...

相关阅读

    相关 分布式消息队列RabbitMQ

    这篇文章简单讲述下分布式消息队列的基础知识,不会太深入,因为类似MQ这样的分布式组件有很多不同的种类,都有各自的特征和其对应的应用场景,需要在实际应用中才能更加深入的理解。