RabbitMQ--高级特性

朴灿烈づ我的快乐病毒、 2023-07-11 12:50 90阅读 0赞

生产端可靠性投递

  • 保证消息成功发出
  • 保障MQ节点的成功接收
  • 发送端收到MQ节点(Broker)确认应答
  • 完善消息进行补偿机制

大厂处理方案

  • 消息落库,对消息状态打标
  • 消息延迟投递,做二次确认,回调检查

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM1NDE4NTE4_size_16_color_FFFFFF_t_70

冥等性概念

就是类似原子性,高并发下,不出现重复消费。

方案:乐观锁、唯一id+指纹锁机制,利用数据库去重、redis原子性

一般在消费端处理

Confirm 确认消息

  • 消息投递后,如果Broker收到消息,则会给我们一个生产者应答。
  • channel.confirmSelect(); 开启确认消息
  • channel.addConfirmListener(new ConfirmListener() {}) 添加消息接收对象

(producer)

  1. public static void main(String[] args) throws Exception {
  2. //1 创建ConnectionFactory
  3. ConnectionFactory connectionFactory = new ConnectionFactory();
  4. connectionFactory.setHost("192.168.11.76");
  5. connectionFactory.setPort(5672);
  6. connectionFactory.setVirtualHost("/");
  7. //2 获取C onnection
  8. Connection connection = connectionFactory.newConnection();
  9. //3 通过Connection创建一个新的Channel
  10. Channel channel = connection.createChannel();
  11. //4 指定我们的消息投递模式: 消息的确认模式
  12. channel.confirmSelect();
  13. String exchangeName = "test_confirm_exchange";
  14. String routingKey = "confirm.save";
  15. //5 发送一条消息
  16. String msg = "Hello RabbitMQ Send confirm message!";
  17. channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
  18. //6 添加一个确认监听
  19. channel.addConfirmListener(new ConfirmListener() {
  20. @Override
  21. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  22. System.err.println("-------no ack!--返回失败---------");
  23. }
  24. @Override
  25. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  26. System.err.println("-------ack!---成功返回--------");
  27. }
  28. });
  29. }

Return 消息机制

  • Return Listener 用于处理一下不可路由的消息
  • 如果发送的消息指定的 Exchange或者路由Key找不到,这种不可达的消息,需要Return Listener
  • Mandatory :如果设置成True才会接收不可达消息,false的话,那么broker会自动删除该消息!

(producer)

  1. public static void main(String[] args) throws Exception {
  2. ConnectionFactory connectionFactory = new ConnectionFactory();
  3. connectionFactory.setHost("192.168.11.76");
  4. connectionFactory.setPort(5672);
  5. connectionFactory.setVirtualHost("/");
  6. Connection connection = connectionFactory.newConnection();
  7. Channel channel = connection.createChannel();
  8. String exchange = "test_return_exchange";
  9. String routingKey = "return.save";
  10. String msg = "Hello RabbitMQ Return Message";
  11. channel.addReturnListener(new ReturnListener() {
  12. @Override
  13. public void handleReturn(int replyCode, String replyText, String exchange,
  14. String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
  15. System.err.println("---------handle return----------");
  16. System.err.println("replyCode: " + replyCode);
  17. System.err.println("replyText: " + replyText);
  18. System.err.println("exchange: " + exchange);
  19. System.err.println("routingKey: " + routingKey);
  20. System.err.println("properties: " + properties);
  21. System.err.println("body: " + new String(body));
  22. }
  23. });
  24. //这里的 true 就是 Mandatory
  25. channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
  26. }

自定义消费监听

消息接收类

  1. public class MyConsumer extends DefaultConsumer {
  2. public MyConsumer(Channel channel) {
  3. super(channel);
  4. }
  5. @Override
  6. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  7. System.err.println("-----------consume message----------");
  8. System.err.println("consumerTag: " + consumerTag);
  9. System.err.println("envelope: " + envelope);
  10. System.err.println("properties: " + properties);
  11. System.err.println("body: " + new String(body));
  12. }
  13. }

添加绑定(consumer)

  1. public static void main(String[] args) throws Exception {
  2. ConnectionFactory connectionFactory = new ConnectionFactory();
  3. connectionFactory.setHost("192.168.11.76");
  4. connectionFactory.setPort(5672);
  5. connectionFactory.setVirtualHost("/");
  6. Connection connection = connectionFactory.newConnection();
  7. Channel channel = connection.createChannel();
  8. String exchangeName = "test_consumer_exchange";
  9. String routingKey = "consumer.#";
  10. String queueName = "test_consumer_queue";
  11. channel.exchangeDeclare(exchangeName, "topic", true, false, null);
  12. channel.queueDeclare(queueName, true, false, false, null);
  13. channel.queueBind(queueName, exchangeName, routingKey);
  14. channel.basicConsume(queueName, true, new MyConsumer(channel));
  15. }

消费限流

  • prefetchSize:0
  • prefetchCount : 消费者接收消息上限,一旦有N个消息还没ack(处理完成),该consumer将block(阻塞)掉,直到有ack
  • global : 是否将上面设置应用于channel。就是上面限制是channel级别还是consumer级别
  • prefetchSize 和 global这两个,rabbitmq没实现。prefetchCount 在 no_ask=false情况下生效,自动应答的情况下,这两个值不生效的。

    public class MyConsumer extends DefaultConsumer {

    1. private Channel channel ;
    2. public MyConsumer(Channel channel) {
    3. super(channel);
    4. this.channel = channel;
    5. }
    6. @Override
    7. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    8. System.err.println("-----------consume message----------");
    9. System.err.println("consumerTag: " + consumerTag);
    10. System.err.println("envelope: " + envelope);
    11. System.err.println("properties: " + properties);
    12. System.err.println("body: " + new String(body));
    13. //手动ack ,false不支持多条签收
    14. channel.basicAck(envelope.getDeliveryTag(), false);
    15. }

    }

添加绑定(consumer)

  1. public static void main(String[] args) throws Exception {
  2. ConnectionFactory connectionFactory = new ConnectionFactory();
  3. connectionFactory.setHost("192.168.11.76");
  4. connectionFactory.setPort(5672);
  5. connectionFactory.setVirtualHost("/");
  6. Connection connection = connectionFactory.newConnection();
  7. Channel channel = connection.createChannel();
  8. String exchangeName = "test_qos_exchange";
  9. String queueName = "test_qos_queue";
  10. String routingKey = "qos.#";
  11. channel.exchangeDeclare(exchangeName, "topic", true, false, null);
  12. channel.queueDeclare(queueName, true, false, false, null);
  13. channel.queueBind(queueName, exchangeName, routingKey);
  14. //1 限流方式 第一件事就是 autoAck设置为 false
  15. //一次只接受1条消息
  16. channel.basicQos(0, 1, false);
  17. //关闭自动签收
  18. channel.basicConsume(queueName, false, new MyConsumer(channel));
  19. }

消费端ACK与重回队列

  • 消费端重回队列是为了对没有处理成功的消息,把消息传递给Broker!
  • 实际应用中,一般都会关闭重回队列。设置成false

    添加绑定(consumer)

    // 手工签收 必须要关闭 autoAck = false

    1. channel.basicConsume(queueName, false, new MyConsumer(channel));

    public class MyConsumer extends DefaultConsumer {

    1. private Channel channel ;
    2. public MyConsumer(Channel channel) {
    3. super(channel);
    4. this.channel = channel;
    5. }
    6. @Override
    7. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    8. System.err.println("-----------consume message----------");
    9. System.err.println("body: " + new String(body));
    10. try {
    11. Thread.sleep(2000);
    12. } catch (InterruptedException e) {
    13. e.printStackTrace();
    14. }
    15. if((Integer)properties.getHeaders().get("num") == 0) {
    16. //第一个false 是否支持多条签收,第二个true是否重回队列
    17. channel.basicNack(envelope.getDeliveryTag(), false, true);
    18. } else {
    19. channel.basicAck(envelope.getDeliveryTag(), false);
    20. }
    21. }

    }

TTL队列/消息

  • 就是队列/消息 的生命周期
  • 设置最大数量和过期时间

Arguments 设置queue

  • x-max-length 3000 (最大长度3000)
  • x-message-ttl 1000 (过期时间10秒)

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM1NDE4NTE4_size_16_color_FFFFFF_t_70 1

(producer)

  1. public static void main(String[] args) throws Exception {
  2. ConnectionFactory connectionFactory = new ConnectionFactory();
  3. connectionFactory.setHost("192.168.11.76");
  4. connectionFactory.setPort(5672);
  5. connectionFactory.setVirtualHost("/");
  6. Connection connection = connectionFactory.newConnection();
  7. Channel channel = connection.createChannel();
  8. String exchange = "test_dlx_exchange";
  9. String routingKey = "dlx.save";
  10. String msg = "Hello RabbitMQ DLX Message";
  11. for(int i =0; i<1; i ++){
  12. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  13. .deliveryMode(2)
  14. .contentEncoding("UTF-8")
  15. .expiration("10000")//过期时间
  16. .build();
  17. channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
  18. }
  19. }

死信队列

  • DLX,Dead-Letter-Exchange
  • 当消息在一个队列中变成死信之后,他会被重新publish到另一个Exchange,这个Exchange就是DLX
  • 变成死信队列的几种情况

    • 消息被拒绝(basic.reject/basic.nack) 并且requeue=false (手动签收)
    • TTL过期
    • 队列达到最大长度
  • 设置死信队列

    • 就是绑定一个死信队列
    • Exchange: dlx.exchange
    • Queue: dlx.queue
    • RoutingKey: #
    • arguments.put(“x-dead-letter-exchange”,”dlx.exchange”) 其实上面名称都可以随便取,只要这里绑定对应交换机就行。

(consumer)

  1. public static void main(String[] args) throws Exception {
  2. ConnectionFactory connectionFactory = new ConnectionFactory();
  3. connectionFactory.setHost("192.168.11.76");
  4. connectionFactory.setPort(5672);
  5. connectionFactory.setVirtualHost("/");
  6. Connection connection = connectionFactory.newConnection();
  7. Channel channel = connection.createChannel();
  8. // 这就是一个普通的交换机 和 队列 以及路由
  9. String exchangeName = "test_dlx_exchange";
  10. String routingKey = "dlx.#";
  11. String queueName = "test_dlx_queue";
  12. channel.exchangeDeclare(exchangeName, "topic", true, false, null);
  13. Map<String, Object> agruments = new HashMap<String, Object>();
  14. agruments.put("x-dead-letter-exchange", "dlx.exchange");
  15. //这个agruments属性,要设置到声明队列上
  16. channel.queueDeclare(queueName, true, false, false, agruments);
  17. channel.queueBind(queueName, exchangeName, routingKey);
  18. //要进行死信队列的声明:
  19. channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
  20. channel.queueDeclare("dlx.queue", true, false, false, null);
  21. channel.queueBind("dlx.queue", "dlx.exchange", "#");
  22. channel.basicConsume(queueName, true, new MyConsumer(channel));
  23. }

发表评论

表情:
评论列表 (有 0 条评论,90人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RabbitMQ高级特性

    目录 RabbitMQ高级特性 1、消息的可靠投递 2、Consumer Ack 3、消费端限流 4、TTL(存活时间/过期时间) 5、死信队列 6、延迟队列

    相关 RabbitMQ高级特性之消息可靠性

    消息可靠性 如何确保消息的可靠性? 支付场景中,我们在超时后买商品之后,扫码枪将二维码扫描之后,会给我们推送一条消息,说购买成功,但此时银行还未进行扣款,过来一段时间

    相关 RabbitMQ高级特性

    目录 RabbitMQ高级特性 消息如何保障100%的投递成功? 幂等性概念详解 在海量订单产生的业务高峰期,如何避免消息的重复消费问题? Confirm确认消