RabbitMQ--高级特性
生产端可靠性投递
- 保证消息成功发出
- 保障MQ节点的成功接收
- 发送端收到MQ节点(Broker)确认应答
- 完善消息进行补偿机制
大厂处理方案
- 消息落库,对消息状态打标
- 消息延迟投递,做二次确认,回调检查
冥等性概念
就是类似原子性,高并发下,不出现重复消费。
方案:乐观锁、唯一id+指纹锁机制,利用数据库去重、redis原子性
一般在消费端处理
Confirm 确认消息
- 消息投递后,如果Broker收到消息,则会给我们一个生产者应答。
- channel.confirmSelect(); 开启确认消息
- channel.addConfirmListener(new ConfirmListener() {}) 添加消息接收对象
(producer)
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 获取C onnection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
//4 指定我们的消息投递模式: 消息的确认模式
channel.confirmSelect();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";
//5 发送一条消息
String msg = "Hello RabbitMQ Send confirm message!";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
//6 添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------no ack!--返回失败---------");
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------ack!---成功返回--------");
}
});
}
Return 消息机制
- Return Listener 用于处理一下不可路由的消息
- 如果发送的消息指定的 Exchange或者路由Key找不到,这种不可达的消息,需要Return Listener
- Mandatory :如果设置成True才会接收不可达消息,false的话,那么broker会自动删除该消息!
(producer)
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_return_exchange";
String routingKey = "return.save";
String msg = "Hello RabbitMQ Return Message";
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("---------handle return----------");
System.err.println("replyCode: " + replyCode);
System.err.println("replyText: " + replyText);
System.err.println("exchange: " + exchange);
System.err.println("routingKey: " + routingKey);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
});
//这里的 true 就是 Mandatory
channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
}
自定义消费监听
消息接收类
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}
添加绑定(consumer)
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_consumer_exchange";
String routingKey = "consumer.#";
String queueName = "test_consumer_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
消费限流
- prefetchSize:0
- prefetchCount : 消费者接收消息上限,一旦有N个消息还没ack(处理完成),该consumer将block(阻塞)掉,直到有ack
- global : 是否将上面设置应用于channel。就是上面限制是channel级别还是consumer级别
prefetchSize 和 global这两个,rabbitmq没实现。prefetchCount 在 no_ask=false情况下生效,自动应答的情况下,这两个值不生效的。
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
//手动ack ,false不支持多条签收
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
添加绑定(consumer)
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "qos.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//1 限流方式 第一件事就是 autoAck设置为 false
//一次只接受1条消息
channel.basicQos(0, 1, false);
//关闭自动签收
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
消费端ACK与重回队列
- 消费端重回队列是为了对没有处理成功的消息,把消息传递给Broker!
实际应用中,一般都会关闭重回队列。设置成false
添加绑定(consumer)
// 手工签收 必须要关闭 autoAck = false
channel.basicConsume(queueName, false, new MyConsumer(channel));
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("body: " + new String(body));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if((Integer)properties.getHeaders().get("num") == 0) {
//第一个false 是否支持多条签收,第二个true是否重回队列
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
TTL队列/消息
- 就是队列/消息 的生命周期
- 设置最大数量和过期时间
Arguments 设置queue
- x-max-length 3000 (最大长度3000)
- x-message-ttl 1000 (过期时间10秒)
(producer)
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_dlx_exchange";
String routingKey = "dlx.save";
String msg = "Hello RabbitMQ DLX Message";
for(int i =0; i<1; i ++){
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")//过期时间
.build();
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
死信队列
- DLX,Dead-Letter-Exchange
- 当消息在一个队列中变成死信之后,他会被重新publish到另一个Exchange,这个Exchange就是DLX
变成死信队列的几种情况
- 消息被拒绝(basic.reject/basic.nack) 并且requeue=false (手动签收)
- TTL过期
- 队列达到最大长度
设置死信队列
- 就是绑定一个死信队列
- Exchange: dlx.exchange
- Queue: dlx.queue
- RoutingKey: #
- arguments.put(“x-dead-letter-exchange”,”dlx.exchange”) 其实上面名称都可以随便取,只要这里绑定对应交换机就行。
(consumer)
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 这就是一个普通的交换机 和 队列 以及路由
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.#";
String queueName = "test_dlx_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", "dlx.exchange");
//这个agruments属性,要设置到声明队列上
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey);
//要进行死信队列的声明:
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
还没有评论,来说两句吧...