《Rabbit MQ 实战》读书笔记 (一:认识AMQP模型)
最近在读RabbitMQ实战这本书,开个帖子。记录一下读书笔记吧。
一、AMQP模型
Advanced Message Queuing Protocol-高级消息队列协议是一个进程间传递异步消息的网络协议。
AMQP模型描述了一套模块化的组件以及这些组件之间进行连接的标准规则。
在服务器中,三个主要功能模块连接成一个处理链完成预期的功能:
- “exchange”接收发布应用程序发送的消息,并根据一定的规则将这些消息路由到“消息队列”。
- “message queue”存储消息,直到这些消息被消费者安全处理完为止。
- “binding”定义了exchange和message queue之间的关联,提供路由规则
二、工作过程
发布者(Publisher)发布消息(Message),经由交换机(Exchange)。
交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。
最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。
从消息可靠性角度考虑,网络是不可靠的,又或是消费者在处理消息的过程中意外挂掉,这样没有处理成功的消息就会丢失。基于此原因,AMQP 模块包含了一个消息确认(Message Acknowledgements)机制:当一个消息从队列中投递给消费者后,不会立即从队列中删除,直到它收到来自消费者的确认回执(Acknowledgement)后,才完全从队列中删除。
一个消息无法被成功路由时(无法从交换机分发到队列),消息或许会被返回给发布者并被丢弃。或者消息会被放入一个所谓的死信队列中。此时,消息发布者可以选择某些参数来处理这些特殊情况。
三、Exchange交换机
交换机是用来发送消息的 AMQP 实体。
交换机拿到一个消息之后将它路由给一个或零个队列。
它使用哪种路由算法是由交换机类型和绑定(Bindings)规则所决定的。
AMQP 0-9-1 的代理提供了四种交换机:
交换机可以有两个状态:持久(durable)、暂存(transient)。
持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在代理再次上线后重新被声明)。
AMQP协议提供一个默认交换机:name为空字符串(“”),类型为Direct直连交换机。每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。
交换机在与消息队列的匹配上,规则如下:
- direct模式,与消息队列绑定的BingingKey和消息发送方的RoutingKey必须完全匹配
- fanout模式,不需要发送方提供RoutingKey。该模式的交换机与队列的绑定也不需要BingingKey
- topic模式,RoutingKey与BingingKey模糊匹配的规则
- headers模式,几乎没有使用场景
四、Queue队列
AMQP 中的队列(queue)跟其他消息队列或任务队列中的队列是很相似的:它们存储着即将被应用消费掉的消息。
队列跟交换机共享某些属性,但是队列也有一些另外的属性。
- name 队列名称
- durable(消息代理重启后,队列依旧存在)
- exclusive(只被一个连接(connection)使用,而且当连接关闭后队列即被删除)
- Auto-delete(当最后一个消费者退订后即被删除)
- Arguments(一些消息代理用他来完成类似与 TTL 的某些额外功能)
队列在声明(declare)后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为 406 的通道级异常就会被抛出。
持久化队列(Durable queues)会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。
持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被重新声明,无论怎样,只有经过持久化的消息才能被重新恢复。
五、消息机制
1.消息确认机制
笔者认为,AMQP的消息确认机制是非常亮眼的一个功能。
在对于处理消息丢失,保证消息的可靠性这个问题上,AMQP 0-9-1 规范给我们两种建议:
- 1)自动确认模式:当消息代理(broker)将消息发送给应用后立即删除。(使用 AMQP 方法:basic.deliver 或 basic.get-ok))
- 2)显式确认模式:待应用(application)发送一个确认回执(acknowledgement)后再删除消息。(使用 AMQP 方法:basic.ack)
如果一个消费者在尚未发送确认回执的情况下挂掉了,那 AMQP 代理会将消息重新投递给另一个消费者。如果当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,然后再次尝试投递。
2.拒绝消息
当一个消费者接收到某条消息后,处理过程有可能成功,有可能失败。应用可以向消息代理表明,本条消息由于 “拒绝消息(Rejecting Messages)” 的原因处理失败了(或者未能在此时完成)。
当拒绝某条消息时,应用可以告诉消息代理如何处理这条消息——销毁它或者重新放入队列。
当此队列只有一个消费者时,请确认不要由于拒绝消息并且选择了重新放入队列的行为而引起消息在同一个消费者身上无限循环的情况发生。
在 AMQP 中,basic.reject 方法用来执行拒绝消息的操作。但 basic.reject 有个限制:你不能使用它拒绝多个带有确认回执(acknowledgements)的消息。但在RabbitMQ,那么你可以使用被称作 negative acknowledgements(也叫 nacks)的 AMQP 0-9-1 扩展来解决这个问题。
3.预取消息
在多个消费者共享一个队列的案例中,可以明确指定在收到下一个确认回执前每个消费者一次可以接受多少条消息来起到简单的负载均衡和提高消息吞吐量的作用。
RabbitMQ 只支持通道级的预取计数,而不是连接级的或者基于大小的预取。
4.消息持久化
消息能够以持久化的方式发布,AMQP 代理会将此消息存储在磁盘上。如果服务器重启,系统会确认收到的持久化消息未丢失。
简单地将消息发送给一个持久化的交换机或者路由给一个持久化的队列,并不会使得此消息具有持久化性质:它完全取决与消息本身的持久模式(persistence mode)。将消息以持久化方式发布时,会对性能造成一定的影响(就像数据库操作一样,健壮性的存在必定造成一些性能牺牲)。
六、TCP链接
AMQP的客户端(生产者 or 消费者)链接到服务器所谓的信道链接其实是基于真实 TCP链接内的虚拟链接。每条信道都会指派一个唯一id。通过信道而直接使用TCP链接是因为OS建立和销毁TCP会话非常耗时。如果多个线程都需要链接RabbitMQ,那么多个TCP链接消耗非常巨大。因此,每个线程对RabbitMQ的链接都会基于现有的链接来创建一条信道,这样就获得了链接到RabbitMQ上的私密通信路径,而不会对OS的TCP栈造成负担。
常见MQ的差异对比图如下:
还没有评论,来说两句吧...