《Rabbit MQ 实战》读书笔记 (二:事务与消息确认模式)

傷城~ 2022-01-05 04:47 337阅读 0赞

AMQP协议的一个亮点就是对消息的可靠性投递-事务。
事务在AMQP-0-9-1中正式成为规范的一部分。虽然AMQP事务保证了在信道开启了事务模式后,全部命令的执行成功。但AMQP事务大大降低了Rabbit的吞吐量,性能极低。同时使用AMQP事务使得生产者与应用程序之间产生同步。

为了解决这个可靠性投递和性能的兼容性问题。RabbitMQ提供了发送方确认模式。

在客户端与Rabbit服务端的链接信道中,设置其属性为confirm模式。一旦信道开启了这个模式变不能再取消,只能通过重新创建来取消。

信道在confirm模式中,所有发布的消息都会被指派一个唯一的ID号(从1开始)。一旦消息投递成功,投递到所匹配的队列后,信道会发送一个包含消息唯一id的消息给生产者应用程序。如果消息和队列是可持久化的,那么确认消息会在队列将消息写入磁盘后发出。

与AMQP自身事务相比,发送方确认模式的是异步,在发送了消息等待确认的同时可以继续处理下一条消息。接受到确认消息时,生产者的回调函数处理。

如果Rabbit发生了内部错误导致了消息的丢失,Rabbit会发送一条nack(未确认)消息如同发送确认消息一样,说明消息已经丢失。

分别用java和Go两种编程语言来体验RabbitMQ的消息确认模式:

生产者:

  1. ConnectionFactory factory = new ConnectionFactory();
  2. factory.setUsername("admin");
  3. factory.setPassword("admin");
  4. factory.setVirtualHost("/");
  5. factory.setHost("127.0.0.1");
  6. factory.setPort(32769);
  7. Connection conn = null;
  8. try {
  9. conn = factory.newConnection();
  10. } catch (IOException e) {
  11. e.printStackTrace();
  12. } catch (TimeoutException e) {
  13. e.printStackTrace();
  14. }
  15. try {
  16. Channel channel = conn.createChannel();
  17. channel.exchangeDeclare("exchangeWx", "direct", true);
  18. channel.queueBind("Queuewx", "exchangeWx","Queuewx");
  19. //开启发送方确认模式
  20. AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
  21. channel.addConfirmListener(new ConfirmListener(){
  22. @Override
  23. public void handleAck(long l, boolean b) throws IOException {
  24. //参数2对boolean为是否该信道批量发送的消息
  25. System.out.println("消息成功送达,id:"+l);
  26. }
  27. @Override
  28. public void handleNack(long l, boolean b) throws IOException {
  29. //参数2对boolean为是否该信道批量发送的消息
  30. System.out.println("消息丢失,id:"+l);
  31. }
  32. });
  33. //参数三指定了deliveryMode 为2,即对消息持久化
  34. channel.basicPublish("exchangeWx", "Queuewx", MessageProperties.PERSISTENT_TEXT_PLAIN, "messageBodyBytes".getBytes());
  35. } catch (IOException e) {
  36. e.printStackTrace();
  37. }

消费者:

  1. ConnectionFactory factory = new ConnectionFactory();
  2. factory.setUsername("admin");
  3. factory.setPassword("admin");
  4. factory.setVirtualHost("/");
  5. factory.setHost("127.0.0.1");
  6. factory.setPort(32769);
  7. Connection conn = null;
  8. try {
  9. conn = factory.newConnection();
  10. } catch (IOException e) {
  11. e.printStackTrace();
  12. } catch (TimeoutException e) {
  13. e.printStackTrace();
  14. }
  15. try {
  16. Channel channel = conn.createChannel();
  17. channel.exchangeDeclare("exchangeWx", "direct", true);
  18. channel.queueBind("Queuewx", "exchangeWx","Queuewx");
  19. //参数2 boolean 为autoAck,是否自动确认消息已送达
  20. channel.basicConsume("Queuewx", false, "myConsumerTag", new DefaultConsumer(channel){
  21. @Override
  22. public void handleDelivery(String consumerTag,
  23. Envelope envelope,
  24. AMQP.BasicProperties properties,
  25. byte[] body)
  26. throws IOException
  27. {
  28. String routingKey = envelope.getRoutingKey();
  29. String contentType = properties.getContentType();
  30. long messageId = envelope.getDeliveryTag();
  31. // (process the message components here ...)
  32. System.out.println("收到routingKey:"+routingKey+"的消息"+new String(body,"UTF-8"));
  33. //basicConsume 的 autoAck 自动回复参数为false,故必须手动确认消息
  34. channel.basicAck(messageId, false);
  35. }
  36. });
  37. } catch (IOException e) {
  38. e.printStackTrace();
  39. }

如果消费者没有手动处理消息已接收,则消息会一直保存在队列中。

发表评论

表情:
评论列表 (有 0 条评论,337人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Rabbit MQ篇:Rabbit MQ 介绍

    `RabbitMQ`作为目前应用相当广泛的消息中间件,在企业级应用、微服务应用中充当着重要的角色。特别是在一些典型的应用场景以及业务模块中具有重要的作用,比如业务服务模块解耦、