RabbitMQ

冷不防 2022-05-31 07:39 86阅读 0赞

RabbitMQ

消息中间件














































名称 图标 简介 适用场景 优点 缺点
RabbitMQ 是AMQP协议领先的一个实现,他实现了代理架构,意味着在发送到客户端之前可以在中央节点上排队,此特性使得RabbitMQ易于使用和部署 路由、负载均衡、消息持久化 几行代码就可以搞定 扩展性差,速度较慢—中央节点增加了延迟,消息封装后也比较大
ZeroMQ 是一个非常轻量级的消息系统,专门为高吞吐量/低延迟的场景开发 金融界 与RabbitMQ相比,ZeroMQ支持许多高级的场景但是必须实现ZeroMQ的框架中的各个快(比如Socket和Device等);非常灵活 手册有80多页,如果手写一个分布式系统,一定得阅读它
ActiveMQ 居于两者之间 类似于RabbitMQ,可以部署于代理模式和p2p模式;类似于ZeroMQ,易于实现高级场景,而且只需付出低消耗 被视为消息中间件的“瑞士军刀” 因为吸收两者的优点,暂时略缺点
Apollo 下一代的ActiveMQ 越来越多领域喽 越来越好喽 这个暂时略

RabbitMQ简单学习—从京淘(电商网)项目入手

  • 引入

    • 当前京淘的架构性能提升点

      1. NGINX高并发
      2. Redis内存缓存数据库(非结构数据)
      3. amoeba提升数据最后关卡的性能
    • 超负荷的请求,以上三个技术无法处理

      • 当请求来到时,如果并发量太大,就让请求排成队列
    • 基于erlang语言
  • 消息队列分类

    • simple简单队列【先后顺序】
    • work工作模式【资源竞争】—红包
    • publish/subscribe发布订阅【共享资源】:引入交换机—邮件的群发、群聊天、广播
    • 路由模式:消息的生产者发送给交换机,通过路由判断key值发送到相应的队列—error通知
    • topic主题模式(路由模式的一种):通过表达式进行判断—*代表多个单词,#号代表一个单词
  • 注意:别名

    • publish:fanout
    • routing:direct
    • topic:topic

使用

  • 依赖

    #

    1. <dependency>
    2. <groupId>com.rabbitmq</groupId>
    3. <artifactId>amqp-client</artifactId>
    4. <version>3.5.1</version>
    5. </dependency>
  • 流程

    1. 创建连接工厂
    2. 从连接工厂获取connection
    3. 从连接获取channel
    4. 从channel获取绑定的queue
    5. 生产者生产消息放入队列
    6. 释放资源

RabbitMQ的工作原理

  • 单发送,单接收

    • 使用场景:简单的发送与接收,没有设么特别的处理
    • 0QPW6QC.png
    • 示例【生产者】

      #

      1. public class Send {
      2. private final static String QUEUE_NAME = "hello";//队列的名称
      3. public static void main(String[] argv) throws Exception {
      4. // 获取连接工厂
      5. ConnectionFactory factory = new ConnectionFactory();
      6. // 设置主机IP
      7. factory.setHost("localhost");
      8. // 获取连接
      9. Connection connection = factory.newConnection();
      10. // 创建通道
      11. Channel channel = connection.createChannel();
      12. // 通道找到队列
      13. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      14. String message = "Hello World!";
      15. // 发送消息给队列
      16. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
      17. System.out.println(" [x] Sent '" + message + "'");
      18. // 关闭连接
      19. channel.close();
      20. connection.close();
      21. }
      22. }
    • 示例【消费者】

      #

      1. public class Recv {
      2. // 队列名称
      3. private final static String QUEUE_NAME = "hello";
      4. public static void main(String[] argv) throws Exception {
      5. // 获得连接工厂
      6. ConnectionFactory factory = new ConnectionFactory();
      7. // 设置主机IP
      8. factory.setHost("localhost");
      9. // 获得连接
      10. Connection connection = factory.newConnection();
      11. // 创建通道
      12. Channel channel = connection.createChannel();
      13. // 通道连接队列
      14. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      15. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
      16. // 接收队列
      17. QueueingConsumer consumer = new QueueingConsumer(channel);
      18. // 执行
      19. channel.basicConsume(QUEUE_NAME, true, consumer);
      20. // 遍历队列消息
      21. while (true) {
      22. // 传送队列信息
      23. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      24. String message = new String(delivery.getBody());
      25. System.out.println(" [x] Received '" + message + "'");
      26. }
      27. }
      28. }
  • 单发送多接收

    • 使用场景:一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息。
    • vt95qL1.png
    • 示例【生产者】

      #

      1. public class NewTask {
      2. // 队列的名称
      3. private static final String TASK_QUEUE_NAME = "task_queue";
      4. public static void main(String[] argv) throws Exception {
      5. // 连接工厂
      6. ConnectionFactory factory = new ConnectionFactory();
      7. // 设置主机IP
      8. factory.setHost("localhost");
      9. // 获取连接
      10. Connection connection = factory.newConnection();
      11. // 创建通道
      12. Channel channel = connection.createChannel();
      13. channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
      14. String message = getMessage(argv);
      15. // PERSISTENT_TEXT_PLAIN:消息的持久化
      16. channel.basicPublish("", TASK_QUEUE_NAME,
      17. MessageProperties.PERSISTENT_TEXT_PLAIN,
      18. message.getBytes());
      19. System.out.println(" [x] Sent '" + message + "'");
      20. channel.close();
      21. connection.close();
      22. }
      23. private static String getMessage(String[] strings){
      24. if (strings.length < 1)
      25. return "Hello World!";
      26. return joinStrings(strings, " ");
      27. }
      28. private static String joinStrings(String[] strings, String delimiter) {
      29. int length = strings.length;
      30. if (length == 0) return "";
      31. StringBuilder words = new StringBuilder(strings[0]);
      32. for (int i = 1; i < length; i++) {
      33. words.append(delimiter).append(strings[i]);
      34. }
      35. return words.toString();
      36. }
      37. }
    • 示例【消费者】

      #

      1. public class Worker {
      2. private static final String TASK_QUEUE_NAME = "task_queue";
      3. public static void main(String[] argv) throws Exception {
      4. ConnectionFactory factory = new ConnectionFactory();
      5. factory.setHost("localhost");
      6. Connection connection = factory.newConnection();
      7. Channel channel = connection.createChannel();
      8. channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
      9. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
      10. // 使用了channel.basicQos(1)保证在接收端一个消息没有处理完时不会接收另一个消息,即接收端发送了ack后才会接收下一个消息。在这种情况下发送端会尝试把消息发送给下一个not busy的接收端
      11. channel.basicQos(1);
      12. QueueingConsumer consumer = new QueueingConsumer(channel);
      13. channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
      14. while (true) {
      15. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      16. String message = new String(delivery.getBody());
      17. System.out.println(" [x] Received '" + message + "'");
      18. doWork(message);
      19. System.out.println(" [x] Done");
      20. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
      21. }
      22. }
      23. private static void doWork(String task) throws InterruptedException {
      24. for (char ch: task.toCharArray()) {
      25. if (ch == '.') Thread.sleep(1000);
      26. }
      27. }
      28. }
  • Publish/Subscribe

    • 使用场景:发布、订阅模式,发送端发送广播消息,多个接收端接收
    • 1jRyhLo.png
    • 示例【生产者】

      #

      1. public class EmitLog {
      2. private static final String EXCHANGE_NAME = "logs";
      3. public static void main(String[] argv) throws Exception {
      4. ConnectionFactory factory = new ConnectionFactory();
      5. factory.setHost("localhost");
      6. Connection connection = factory.newConnection();
      7. Channel channel = connection.createChannel();
      8. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
      9. String message = getMessage(argv);
      10. channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
      11. System.out.println(" [x] Sent '" + message + "'");
      12. channel.close();
      13. connection.close();
      14. }
      15. private static String getMessage(String[] strings){
      16. if (strings.length < 1)
      17. return "info: Hello World!";
      18. return joinStrings(strings, " ");
      19. }
      20. private static String joinStrings(String[] strings, String delimiter) {
      21. int length = strings.length;
      22. if (length == 0) return "";
      23. StringBuilder words = new StringBuilder(strings[0]);
      24. for (int i = 1; i < length; i++) {
      25. words.append(delimiter).append(strings[i]);
      26. }
      27. return words.toString();
      28. }
      29. }
    • 示例【消费者】

      #

      1. public class ReceiveLogs {
      2. private static final String EXCHANGE_NAME = "logs";
      3. public static void main(String[] argv) throws Exception {
      4. ConnectionFactory factory = new ConnectionFactory();
      5. factory.setHost("localhost");
      6. Connection connection = factory.newConnection();
      7. Channel channel = connection.createChannel();
      8. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
      9. String queueName = channel.queueDeclare().getQueue();
      10. channel.queueBind(queueName, EXCHANGE_NAME, "");
      11. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
      12. QueueingConsumer consumer = new QueueingConsumer(channel);
      13. channel.basicConsume(queueName, true, consumer);
      14. while (true) {
      15. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      16. String message = new String(delivery.getBody());
      17. System.out.println(" [x] Received '" + message + "'");
      18. }
      19. }
      20. }
  • Routing (按路线发送接收)

    • 使用场景:发送端按routing key发送消息,不同的接收端按不同的routing key接收消息
    • fzDXMDh.png
    • 示例【生产者】

      #

      1. public class EmitLogDirect {
      2. private static final String EXCHANGE_NAME = "direct_logs";
      3. public static void main(String[] argv) throws Exception {
      4. ConnectionFactory factory = new ConnectionFactory();
      5. factory.setHost("localhost");
      6. Connection connection = factory.newConnection();
      7. Channel channel = connection.createChannel();
      8. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
      9. String severity = getSeverity(argv);
      10. String message = getMessage(argv);
      11. channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
      12. System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
      13. channel.close();
      14. connection.close();
      15. }
      16. private static String getSeverity(String[] strings){
      17. if (strings.length < 1)
      18. return "info";
      19. return strings[0];
      20. }
      21. private static String getMessage(String[] strings){
      22. if (strings.length < 2)
      23. return "Hello World!";
      24. return joinStrings(strings, " ", 1);
      25. }
      26. private static String joinStrings(String[] strings, String delimiter, int startIndex) {
      27. int length = strings.length;
      28. if (length == 0 ) return "";
      29. if (length < startIndex ) return "";
      30. StringBuilder words = new StringBuilder(strings[startIndex]);
      31. for (int i = startIndex + 1; i < length; i++) {
      32. words.append(delimiter).append(strings[i]);
      33. }
      34. return words.toString();
      35. }
      36. }
    • 示例【消费者】

      #

      1. public class ReceiveLogsDirect {
      2. private static final String EXCHANGE_NAME = "direct_logs";
      3. public static void main(String[] argv) throws Exception {
      4. ConnectionFactory factory = new ConnectionFactory();
      5. factory.setHost("localhost");
      6. Connection connection = factory.newConnection();
      7. Channel channel = connection.createChannel();
      8. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
      9. String queueName = channel.queueDeclare().getQueue();
      10. if (argv.length < 1){
      11. System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
      12. System.exit(1);
      13. }
      14. for(String severity : argv){
      15. channel.queueBind(queueName, EXCHANGE_NAME, severity);
      16. }
      17. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
      18. QueueingConsumer consumer = new QueueingConsumer(channel);
      19. channel.basicConsume(queueName, true, consumer);
      20. while (true) {
      21. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      22. String message = new String(delivery.getBody());
      23. String routingKey = delivery.getEnvelope().getRoutingKey();
      24. System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
      25. }
      26. }
      27. }
  • Topics (按topic发送接收)

    • 使用场景:发送端不只按固定的routing key发送消息,而是按字符串“匹配”发送,接收端同样如此。
    • aUuVgIP.png
    • 示例【生产者】

      #

      1. public class EmitLogTopic {
      2. private static final String EXCHANGE_NAME = "topic_logs";
      3. public static void main(String[] argv) {
      4. Connection connection = null;
      5. Channel channel = null;
      6. try {
      7. ConnectionFactory factory = new ConnectionFactory();
      8. factory.setHost("localhost");
      9. connection = factory.newConnection();
      10. channel = connection.createChannel();
      11. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
      12. String routingKey = getRouting(argv);
      13. String message = getMessage(argv);
      14. channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
      15. System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
      16. }
      17. catch (Exception e) {
      18. e.printStackTrace();
      19. }
      20. finally {
      21. if (connection != null) {
      22. try {
      23. connection.close();
      24. }
      25. catch (Exception ignore) {}
      26. }
      27. }
      28. }
      29. private static String getRouting(String[] strings){
      30. if (strings.length < 1)
      31. return "anonymous.info";
      32. return strings[0];
      33. }
      34. private static String getMessage(String[] strings){
      35. if (strings.length < 2)
      36. return "Hello World!";
      37. return joinStrings(strings, " ", 1);
      38. }
      39. private static String joinStrings(String[] strings, String delimiter, int startIndex) {
      40. int length = strings.length;
      41. if (length == 0 ) return "";
      42. if (length < startIndex ) return "";
      43. StringBuilder words = new StringBuilder(strings[startIndex]);
      44. for (int i = startIndex + 1; i < length; i++) {
      45. words.append(delimiter).append(strings[i]);
      46. }
      47. return words.toString();
      48. }
      49. }
    • 示例【消费者】

      #

      1. public class ReceiveLogsTopic {
      2. private static final String EXCHANGE_NAME = "topic_logs";
      3. public static void main(String[] argv) {
      4. Connection connection = null;
      5. Channel channel = null;
      6. try {
      7. ConnectionFactory factory = new ConnectionFactory();
      8. factory.setHost("localhost");
      9. connection = factory.newConnection();
      10. channel = connection.createChannel();
      11. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
      12. String queueName = channel.queueDeclare().getQueue();
      13. if (argv.length < 1){
      14. System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
      15. System.exit(1);
      16. }
      17. for(String bindingKey : argv){
      18. channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
      19. }
      20. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
      21. QueueingConsumer consumer = new QueueingConsumer(channel);
      22. channel.basicConsume(queueName, true, consumer);
      23. while (true) {
      24. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      25. String message = new String(delivery.getBody());
      26. String routingKey = delivery.getEnvelope().getRoutingKey();
      27. System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
      28. }
      29. }
      30. catch (Exception e) {
      31. e.printStackTrace();
      32. }
      33. finally {
      34. if (connection != null) {
      35. try {
      36. connection.close();
      37. }
      38. catch (Exception ignore) {}
      39. }
      40. }
      41. }
      42. }

秒杀

业务场景分析

  • 并发量很高的时间段—抢商品
  • 队列中的消息可以是什么

    1. 电话号码
    2. username
    3. ticket
    4. ……
  • 做法

    • 调用SSO查询用户信息,把前n个消息获取到,后面的放入rabbitmq的垃圾桶
    • 更高的并发可以考虑分布式的队列
  • 文件位置

    • 生产者:后台
    • 消费者:前台
  • 秒杀设计

    • KoWfMTK.png
  • 未完待续。。。

注:参考文章

  • RabbitMQ的几种典型使用场景:https://www.rabbitmq.com/getstarted.html

发表评论

表情:
评论列表 (有 0 条评论,86人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RabbitMQ(一):初始RabbitMQ

    1. 消息队列的作用 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。 2. Rabb