消息队列 缺乏、安全感 2023-10-06 17:00 69阅读 0赞 ### 一、什么是消息队列 ### 以下为虚构的小故事: * 有一天,产品跑来跟小王说:“我们要做一个用户实名的功能,需要在用户实名成功后给用户发一条短信。” 小王(攻城狮level1):“好的,这个需求简单。” ,小王直接调用了XX实名接口,实名成功后,同时调起触达服务给该用户发短信,待短信发送成功后,页面就会响应用户操作成功。没一会功夫,代码写完并测试成功,就发布上线了。 线上正常运行了一段时间,产品又匆匆地跑来说:“你做的功能不行啊,客服反馈实名操作响应太慢,已经有好多用户流失了。” 小王听得一身冷汗,赶紧跑回去改。他发现,原先的以单线程同步阻塞的方式进行短信发送,确实存在问题。这次,他利用了 JAVA 多线程的特性,另起线程进行短信发送,主线程直接返回保存结果。测试通过后,赶紧发布上线。 没过多久,产品又跑来了,他说:“现在,实名操作响应是快多了。但是又有新的问题了,有用户反应,短信收不到。我们能否感知到发送失败的短信消息,进行补发。” 小王一听,哎,又得熬夜加班了,急忙找大伟寻求帮助, 大伟说,“咱们公司提供了一个类似邮局信箱的东西,你往这信箱里写上你要发送的消息,以及约定的地址,之后你就不用再操心了,触达服务自然能从邮件信箱取到消息进行短信发送” 后来,小王才知道,这就是外界广为流传的消息队列。你不用知道具体的服务在哪,如何调用。你要做的只是将该发送的消息,向你们约定好的地址进行发送,你的任务就完成了。对应的服务自然能监听到你发送的消息, 进行后续的操作。这就是消息队列最大的特点,将同步操作转为异步处理,将多服务共同操作转为职责单一的单服务操作,做到了服务间的解耦。 上面小故事分析的“邮局信箱“是在消息传输过程中的保存消息的容器,也就是本文要介绍的消息队列,借助消息队列我们可以实现应用耦合,异步消息,流量削锋等问题。目前使用较多的消息队列有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ。 ## 二、消息队列特性&优点 ## * `异步性:体现的便是处理消息堆积的能力,把非必须的,时效性要求不高业务可放在消息队列中做异步处理。好处是:异步化处理以后,减少了同步等待的时间` * 实名认证过程,将发送短信异步化处理,减少了用户页面响应时间 * `解耦性:消息队列减少了服务之间的耦合性,不同的服务可以通过消息队列进行通信,而不用关心彼此的实现细节,只要定义好消息的格式就行。好处是:防止引入过多的API给系统的稳定性带来风险;调用方使用不当会给被调用方系统造成压力,被调用方处理不当会降低调用方系统的响应能力;` * 调用实名认证的系统和触达系统之间不直接交互,实名结果也不受触达系统处理短息的结果所影响 * `削峰处理能力:在一些高并发的业务场景,短时间内流量太大,应用系统配置承载不了这股瞬间流量,可能导致系统直接挂掉,消息队列具有很好的削峰处理能力,即通过异步处理,将短时间高并发产生的事务消息存储在消息队列中,从而削平高峰期的并发事务。` * 如果有钱花上线抢红包等活动,大量实名认证请求进来,借助消息队列可`削平高峰期的并发事务` * `可靠性:消息队列一般会把接收到的消息存储到本地硬盘上(当消息被处理完之后,存储信息根据不同的消息队列实现,有可能将其删除),这样即使应用挂掉或者消息队列本身挂掉,消息也能够重新加载。` * `借助消息队列将接收到的消息存放在本地磁盘中,这样即使触达系统挂掉、或者触达系统消费消息的时候消息队列挂掉,各服务恢复后依然可以继续消费消息` * `分布式:通过对消费者的横向扩展,降低了消息队列阻塞的风险,以及单个消费者产生单点故障的可能性(当然消息队列本身也可以做成分布式集群)。` ## 三、消息队列通信模型 ## * PTP(Point to Point)点对点模式 PTP模式包含三个角色,消息队列(Queue),发送者(Sender),接受者(Receiver),每个消息都被发送至特定的队列,接收者从消息队列中拉取消息,消息被消费以后,该消息在queue中被删除 点对点模式特点: 1、一个消息只会被一个消费者消费,一旦被消费,消息就不在消息队列中; 2、生产者和消费者在时间上是没有依赖性的,也就是说当生产者发送消息之后,不管消费者有没有正在运行,都不影响消息被发送到队列中; 3、消费者主动拉取消息的频率可以自己控制,但消费者需要启额外线程监听消息队列是有代消费的消息 点对点模式有什么缺点? * Pub/Sub(Publish/Subscribe)发布/订阅模式 Pub/Sub模式包含三个角色:主题(Topic),发布者(Publish),订阅者(Subscribe),多个发布者将消息发送到Topic,系统可以将这种消息传送到多个订阅者 Pub/Sub模式特点 1、每个消息可以有多个消费者来消费 2、发布者和订阅者之间有时间上的依赖性,针对某个主题(Topic)的订阅者,它必须创建一个订阅之后(类似微信公众号),才能消费发布者的消息,除非建立可持久化的订阅 3、这种模式订阅者被动接受消息队列推送的消息,订阅者必须保持运行状态 发布订阅模式有什么缺点? ## 四、消息队列之kafka ## ### 1、kafka简介 ### Kafka 最早是由 LinkedIn 公司开发一种分布式的基于发布/订阅的消息系统,之后成为 Apache 的顶级项目。主要特点如下:同时为发布和订阅提供高吞吐量 * Kafka 的设计目标是以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对TB 级以上数据也能保证常数时间的访问性能。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条消息的传输。消息持久化将消息持久化到磁盘,因此可用于批量消费,例如 ETL 以及实时应用程序。通过将数据持久化到硬盘以及 replication 防止数据丢失。 * 分布式 支持 Server 间的消息分区及分布式消费,同时保证每个 partition 内的消息顺序传输。这样易于向外扩展,所有的producer、broker 和 consumer 都会有多个,均为分布式的。无需停机即可扩展机器。 * 消费消息采用 pull 模式 消息被处理的状态是在 consumer 端维护,而不是由 server 端维护,broker 无状态,consumer 自己保存 offset。 * 支持 online 和 offline 的场景 * 同时支持离线数据处理和实时数据处理。 ### 2、kafka集群模型图 ### ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0xpWmhlbjMxNA_size_16_color_FFFFFF_t_70][] 一个典型的Kafka集群中包含若干 producer(可以是 web 前端,或者是系统服务等),若干 broker(服务器),若干 consumer,以及一个 Zookeeper 集群 #### 2.1 名词解释 #### ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0xpWmhlbjMxNA_size_16_color_FFFFFF_t_70 1][] * 主题(topic) 1)每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为主题 2)用户只需制定消息的主题,即可生产或消费数据,而不必关心数据存放在何处 * 代理(broker) Kafka 集群中的一台或多台服务器统称为 Broker,Kafka中使用Broker来接受Producer和Consumer的请求,并把Message持久化到本地磁盘,Kafka 支持水平扩展,一般代理服务器数量越多,集群吞吐率越高。 * 分区(partition) 分区是物理上的概念,每个主题被划分为一个或多个分区 ,一个分区就是一个提交日志,消息以顺序追加的方式写入分区 ,分区中的每条消息都会被分配一个有序的 id(offset),kafka分区的目 的本质是为了提高并发能力的,因为kafka以Partition为单位写的。分区的优势是什么呢? * 领导者(leader) 每个分区都有一台代理服务器作为领导者,领导者处理一切对此分区的读写请求。 * 跟随者(follower) 每个分区都有零台或者多台代理服务器作为跟随者,跟随者需要同步领导者上的数据。 * 副本(replication) Kafka 允许主题的分区有很多副本,当集群中的服务器出现故障时,能自动进行故障转移,保证数据的可用性。 * 控制器(controller) 每个集群都有一个代理同时充当了控制器的角色,负责检测代理级别的故障,负责更改故障代理中的分区的领导者 * 生产者(producer) 生产者,负责使用 push 模式发布消息到 Kafka 的主题中,生产者负责将消息分配到主题的哪一个分区中 * 消费者(consumer) 消费者,从 Kafka 中读取消息的客户端,消费者使用 pull 模式从 Kafka 订阅并消费消息 * 消费者组(consumer group) 每个消费者属于一个特定的消费者组,发布到主题中的每一条消息,被分配给订阅消费者组中的一个消费者 ### 3、kafka工作流程分析 ### ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0xpWmhlbjMxNA_size_16_color_FFFFFF_t_70 2][] #### 3.1 消息的发送及存储 #### 3.1.1 消息的发送 如上图,ProducerA直接发送消息到broker上的leader partition,不需要经过任何中介或其他路由转发,为了实现这个特性,kafka集群中的每个broker都可以响应producer的请求, 并返回topic的一些元信息,这些元信息包括哪些机器是存活的,topic 的leader partition都在哪,现阶段哪些leader partition是可以直接被访问的。ProducerA可以自己控制着消息被推送到哪些partition,实现的方式可以是随机分配、实现一类随机负载均衡算法,或者指定一些分区算法 。如果不指定分区,kafka会轮训出 一个partition的leader返回给ProducerA,假设当前ProducerA向TopicA-partition0 push一个消息,TopicA-partition0 接收到这份消息后会将消息存入到磁盘里,通常producer在发送完消息之后会得到一个future响应,包括offset值或者发送过程中遇到 的错误。这其中有个非常重要的参数“acks”,这个参数决定了producer要求leader partition 收到确认的副本个数,1)如果acks设置数量为0,表示producer不会等待broker的响应 2)若acks设置为1,表示producer会在leader partition收到消息时 得到 broker的一个确认 3)若设置为 - 1,producer会在所有备份的partition收到消息时得到broker的确认,这种情况下当收到消息的partition leader存储完成后,当前partition的所有Follower都会主动将这份pull过来存储,存储成功会向 partition0- leader 应答ok,当partition0收到所有Follower的应答便会向ProducerA应答ok,至此完成了消息的发送和存储。 3.1.2 消息的存储 对于每一个 topic,Kafka 集群都会维持一个分区日志,如下所示: ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0xpWmhlbjMxNA_size_16_color_FFFFFF_t_70 3][] 每个分区都是有序且顺序不可变的记录集,并且记录不断地追加到结构化的 commit log 文件。 分区中的每一个记录都会分配一个 id 号来表示顺序,我们称之为偏移量(offset),偏移量用来唯一的标识分区中每一条记录。生产者发送到特定 topic 分区上的消息,将按照发送的顺序处理。也就是说,如果 消息 M1 和消息 M2 由相同的生产者发送,并先发送 M1 消息,那么 M1 的偏移量 比 M2 小,并在日志中较早出现。每个分区相当于一个巨型文件,被平均分配到多个大小相当的日志段文件(log segment)中, 但每个段文件的消息数量不一定相等。这种特性方便老旧的消息被快速删除。 每个分区只需要支持顺序读写,日志段文件的生命周期由服务端配置参数决定。 ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0xpWmhlbjMxNA_size_16_color_FFFFFF_t_70 4][] #### 3.2 消息的读取 #### kafka提供了两种Consumer API ,Low Level Consumer,Hign Level Consumer。 Low Level Consumer API---提供更大灵活控制,但是以复杂性为代价的 在kafka中,当前读到哪条消息的offset值是由consumer来维护的,因此, consumer可以自己决定如何读取kafka中的数据。比如,consumer可以通过重设offset值来重复读取消息,跳读,严格只读取一次,缺点是Broker失败转移自己处理,增加Consumer、Partition、Broker需要自己做负载均衡。 Hign Level Consumer API High level api 封装了对集群中一系列broker的访问,可以透明的消费一个topic,它屏蔽了每个topic的每个partition的offset管理、Broker失败转移以及增减Partition、Consumer时的负载均衡。High level api 会启动另外一个线程去每隔一段时间, offset自动同步到zookeeper上。但是,如果使用了High level api, 每个message只能被读一次,一旦读了这条message之后,无论我consumer的处理是否ok。High level api的另外一个线程会自动的把offset+1同步到 zookeeper上。如果 consumer读取数据出了问题,offset也会在zookeeper上同步。因此, 如果consumer处理失败了,会继续执行下一条。这往往是不对的行为,因此,Best Practice是一旦consumer处理失败,直接让整个conusmer group抛Exception终止,但是最后读的这一条数据是丢失了,因为在zookeeper里面的offset已经+1了。等再次启动 conusmer group的时候,已经从下一条开始读取处理了。 3.1.2 消息的读取过程 消费者将会按照日志中的顺序查看消息,在每一个消费者中唯一保存的元数据是偏移量,即消费在分区日志中的位置,移量由消费者所控制:通常在读取记录后,消费者会以线性的方式增加偏移量,但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录。这些细节说明 Kafka 的消费者是非常廉价的,消费者的增加和减少,对集群或者其他消费者没有多大的影响 ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0xpWmhlbjMxNA_size_16_color_FFFFFF_t_70 5][] #### 3.3 kafka的优点 #### * topic的容量不受单台服务器容量大小的限制 一个主题可以拥有多个分区,每个分区位于一个服务器(broker)上,这样存储的消息容量可以超越一个服务器的限制。 * 数据更加安全:1)从生产端看,当一个消息被发送后,producer会等待broker成功接收到消息后反馈,如果在途中丢失或者是broker挂掉,producer会重新发送,一旦消息被提交,只要有一个broker上面的分区副本写入来了该消息,并且保存存活状态,该消息就不会丢失。2)从消费端看,消费端记录了一个offset值,当 consumer收到了某个分区的消息,但却在处理过程中挂掉,此时会有其他consumer来消费这个分区,并可以通过这个offset值重新找到这条消息再进行处理。3)从备份机制上看,有了备份机制后,Kafka允许集群中的节点挂掉后而不影响整个集群工作。一 个备份数量为n的集群允许n-1个节点失败 4)消息的持久化,意味着消息可以重复消费 * 每个服务器(broker)作为一些分区的leader,同时也作为另一些分区的follow,从而可平衡个kafka服务器的负载。 * 延迟小,读写速率不随着数据量的增加而增加,每个topic的Partition的是一个大文件夹,里面有无数个小文件夹segment,但 partition是一个队列,队列中的元素是segment,消费的时候先从第0个segment开始消 费,新来message存在最后一个消息队列中。对于segment也是对队列,队列元素是 message,有对应的offset标识是哪个message。消费的时候先从这个segment的一个message开始消费,新来的message存在segment的最后,消息系统的持久化队列可以构建在对一个文件的读和追加上,就像一般情况下的日志解决方案。它有一个优点,所有的操作都是常数时间,并且读写之间不会相互阻塞。这种设计具有极大的性能优势:最终系统性能和数据大小完全无关,服务器可以充分利用廉价的硬盘来提供高效的消息服务。 更多学习资料:[http://kafka.apache.org/082/documentation.html][http_kafka.apache.org_082_documentation.html] [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0xpWmhlbjMxNA_size_16_color_FFFFFF_t_70]: https://img-blog.csdnimg.cn/20210325160617532.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0xpWmhlbjMxNA==,size_16,color_FFFFFF,t_70 [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0xpWmhlbjMxNA_size_16_color_FFFFFF_t_70 1]: https://img-blog.csdnimg.cn/20210325160704902.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0xpWmhlbjMxNA==,size_16,color_FFFFFF,t_70 [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0xpWmhlbjMxNA_size_16_color_FFFFFF_t_70 2]: https://img-blog.csdnimg.cn/2021032516082186.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0xpWmhlbjMxNA==,size_16,color_FFFFFF,t_70 [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0xpWmhlbjMxNA_size_16_color_FFFFFF_t_70 3]: https://img-blog.csdnimg.cn/2021032516091634.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0xpWmhlbjMxNA==,size_16,color_FFFFFF,t_70 [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0xpWmhlbjMxNA_size_16_color_FFFFFF_t_70 4]: https://img-blog.csdnimg.cn/20210325160943174.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0xpWmhlbjMxNA==,size_16,color_FFFFFF,t_70 [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0xpWmhlbjMxNA_size_16_color_FFFFFF_t_70 5]: https://img-blog.csdnimg.cn/20210325161019184.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0xpWmhlbjMxNA==,size_16,color_FFFFFF,t_70 [http_kafka.apache.org_082_documentation.html]: http://kafka.apache.org/082/documentation.html
相关 消息队列 :场景 ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0... 蔚落/ 2024年04月18日 23:53/ 0 赞/ 97 阅读
相关 消息队列 一、消息队列MQ(Message Queue): 1)消息队列是一种先进先出的数据结构; 2)消息队列使用的“协议”不是具体的通讯协议,而是更高层次通讯模型。它定义 「爱情、让人受尽委屈。」/ 2023年10月11日 11:11/ 0 赞/ 74 阅读
相关 消息队列? 对于 MQ 来说,其实不管是 RocketMQ、Kafka 还是其他消息队列,它们的本质都是:一发一存一消费。下面我们以这个本质作为根,一起由浅入深地聊聊 MQ。 01 从 Bertha 。/ 2023年10月10日 12:49/ 0 赞/ 52 阅读
相关 消息队列 一、什么是消息队列 以下为虚构的小故事: 有一天,产品跑来跟小王说:“我们要做一个用户实名的功能,需要在用户实名成功后给用户发一条短信。” 小王(攻城狮leve 缺乏、安全感/ 2023年10月06日 17:00/ 0 赞/ 70 阅读
相关 消息队列 1. 消息队列在项目中的使用 背景:在分布式系统中是如何处理高并发的。 由于在高并发的环境下,来不及同步处理用户发送的请求,则会导致请求发生阻塞。比如说,大量的ins ﹏ヽ暗。殇╰゛Y/ 2022年12月15日 03:23/ 0 赞/ 292 阅读
相关 消息队列 https://www.cnblogs.com/457248499-qq-com/p/7392678.html 来源 消息队列:在消息的传输过程中保存消息的容器。 迈不过友情╰/ 2022年05月30日 07:42/ 0 赞/ 306 阅读
相关 消息队列 消息队列是啥?我觉得大家都心知肚明,已经众所周知到不用解释的程度。不过,但凡学习、解释一样东西,都应该遵循 “它是什么?”、 “做什么用?”、 “为啥要用它” ╰半橙微兮°/ 2022年04月23日 08:42/ 0 赞/ 380 阅读
相关 消息队列 消息队列介绍 维基百科上的描述:在计算机科学中,消息队列(Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通 红太狼/ 2022年01月26日 10:51/ 0 赞/ 365 阅读
相关 消息队列 为什么写这篇文章? 博主有两位朋友分别是小A和小B: 1. 小A,工作于传统软件行业(某社保局的软件外包公司),每天工作内容就是和产品聊聊需求,改改业务逻辑。再不然就 小咪咪/ 2021年12月20日 06:51/ 0 赞/ 460 阅读
还没有评论,来说两句吧...