1.1 什么是消息中间件
消息中间件通也被称为消息队列服务器,是当今分布式应用架构中经常采用的技术。
简单来说就是程序与程序之间进行异步通信的一种方式,消息的发送者不用一直等待消息的处理完毕,而是将消息发送给消息中间件就返回了。消息的指定消费者会订阅消息并处理他们。
这有点类似与 RPC(远程过程调用),当然消息中间件的实现与 RPC 规范是息息相关的,因为毕竟他们都是为了解决计算机世界中两个程序之间通信的技术。
消息队列 已经逐渐成为企业应用系统 内部通信 的核心手段。它具有 低耦合、可靠投递、广播、流量控制、最终一致性 等一系列功能。
当前使用较多的消息队列有 RabbitMQ
、RocketMQ
、ActiveMQ
、Kafka
、ZeroMQ
、MetaMQ
等,而部分数据库,如 Redis
、MySQL
以及 phxsql
也可实现消息队列的功能。在当今世界的互联网应用中消息队列中间件基本上成为标配。
1.2 消息中间件的产生背景
A 系统产生一些特有的数据,并需要将这些数据传递给下面其他系统,让其他系统来继续处理这段数据,最终才能称之为完整的处理一条数据。
由于系统与系统之间是通过业务逻辑来拆分的,所以各自系统都”对外”暴露自己可以服务的接口,别人想用自己的服务就必须调用自己设计的接口(将数据作为参数传进接口里进行接口调用),当然被调用的接口也是可以返回给调用者数据处理之后的结果数据。
所以在一个单体服务应用里,可以到处找到一个服务实现里,调用了其他服务接口,并以此进行数据传输通信。但这里有个现实问题:
如果下面的 B 系统不提供某种特殊服务了,那么 A 系统在知道的情况下,就需要去掉调用 B 系统服务接口的代码,修改一处还好说,如果有大量的数据服务都对外关闭了,那么 A 系统中代码改动就会很大。
A 系统调用 B 系统的时候,要是不知道 B 系统停用了接口情况下,A 系统如何知道是通信不正常导致的,还是确实是要调用的接口不存在了,还是调用接口服务超时了。
又来了新的 E 系统,其接口也需要用到 A 系统的数据,那么 A 系统又得增加新的代码,调用 E 系统的接口。如果,E 系统就是尝试运营一段时间,发现业务这样拆分不是很好,又要砍掉,那么 A 系统的代码又需要改一次。
因此这样的系统架构对于那种后期业务需求不需要发生系统性变化的场景,已经是最精炼省成本的方案。但是,遇到业务需求频繁变化,系统拆分频繁的情况下,A 系统和其他相关系统接口之间的代码耦合度太高,以至于后期的代码维护成本十分巨大,也不利于架构的伸缩。
总体用图来表述,就是下面这张看着烦心的变动需求:
从上图可以看出:
只要有别人需要 A 系统的数据,那么 A 系统业务代码中就要增加调用这些系统的接口,当需求方不需要这些数据的时候,又要去掉该代码。因此代码维护成本很高。
1.3 消息中间件的使用场景
1.3.1 解耦
在 A 系统和其他系统之间增加一个处理消息的中间件,这个中间件专门用于接收数据,同时也提供其他订阅了该数据消费的系统来此消费该数据,也是 JMS 规范中的 Pub/Sub(发布/订阅)模型。
A 系统产生了有效数据,随即发给 MQ,即可继续干自己的事情,至于谁来处理这条数据,A 系统无需关心,它只需要一门心思生产有效数据即可。
其他系统想要使用并处理 A 系统的数据,也变得十分简单,只需要去 MQ 中找自己要消费的数据进行消费即可,当不需要A系统数据时,就不去 MQ 消费即可。
这种消息通信方式使得系统之间得到完全解偶,绝对是应对频繁拆分系统问题的优质解决方案。
面试技巧
消息队列的常见使用场景?
此时需要去考虑一下自己负责的系统中是否有类似的场景,就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给他异步化解耦,也是可以的,你就需要去考虑在你的项目里,是不是可以运用这个 MQ 去进行系统的解耦。在简历中体现出来这块东西,用 MQ 达到解耦。
1.3.2 异步
在同步调用的系统中,A 系统分别调用了下面三个系统,每个系统都会对数据库进行数据操作,那么整个一次数据的完整流程走下来就是很慢长的数据处理,这是用户无法容忍的响应速度。
当使用 MQ 进行异步化:A 系统产生数据之后,只要将数据成功发送给 MQ,就返回响应给用户,至于数据在 MQ 之后的操作,由要消费的系统自己去完成。
1.3.3 削峰
对于互联网应用,流量是右一定的周期性波动,对于单一架构的系统,在流量高峰的时候,由于系统资源瓶颈问题,会导致系统崩溃。
MQ 就像一个大水库一样,当水库水满的时候,会采取开几个闸门放水,达到缓慢放水的目的,如果所有闸门全部放水,那么水库下游的村民全部遭殃。
1.4 MQ的优缺点
1.4.1 优点
解耦、异步、削峰
1.4.2 缺点
系统可用性降低
系统引入的外部依赖越多越容易挂掉,本来 A 系统调用 B、C、D 三个系统的接口就好了,其中一个调用失败了就返回失败响应,这没啥问题。但当加了 MQ 之后,万一 MQ 挂了咋整?MQ 挂了,整套系统崩溃了,A 系统就真的什么也不能用了。
上图中,由于则增加了 MQ 导致系统存在的可能问题会更多。
系统复杂性提高
硬生生加个 MQ 进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?头大头大,问题一大堆,痛苦不已。
一致性问题
A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 B、C、D 三个系统那里,B、D 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。
所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,MQ 的引用会使得原有的系统复杂度提升了一个数量级,也许是复杂了 10 倍,但是关键时刻,还是得用。
1.5 常见MQ性能对比
对于中小型公司,技术实力较为一般,技术挑战不是特别高,用RabbitMQ是不错的选择;大型公司,基础架构研发实力较强,用RocketMQ是很好的选择。
如果是大数据领域的实时计算、日志采集等场景,用Kafka是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
单机吞吐量 | 万级,比 RocketMQ、Kafka 低一个数量级 | 同 ActiveMQ | 10 万级,支撑高吞吐 | 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景 |
topic 数量对吞吐量的影响 | topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic | topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源 | ||
时效性 | ms 级 | 微秒级,这是 RabbitMQ 的一大特点,延迟最低 | ms 级 | 延迟在 ms 级以内 |
可用性 | 高,基于主从架构实现高可用 | 同 ActiveMQ | 非常高,分布式架构 | 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 | 有较低的概率丢失数据 | 基本不丢 | 经过参数优化配置,可以做到 0 丢失 | 同 RocketMQ |
功能支持 | MQ 领域的功能极其完备 | 基于 erlang 开发,并发能力很强,性能极好,延时很低 | MQ 功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用 |
1.6 如何保证MQ高可用
1.6.1 RabbitMQ的高可用性
RabbitMQ 是比较有代表性的,因为是基于主从做高可用性的,我们就以他为例子讲解第一种MQ的高可用性怎么实现。
rRabbitmq有三种模式:单机模式,普通集群模式,镜像集群模式
单机模式
demo级别,一般就是本地学习开发使用,没人生产用单机模式。
普通集群模式
意思就是在多台机器上启动多个RabbitMQ实例,每个机器里含着一个实例。在创建queue的时候,这个消息队列只会放在集群中的某一个RabbtiMQ实例上,但是每个实例都同步queue的元数据。
在消费的时候,实际上如果消费者连接到了集群中不含实际数据的节点上,那么这个节点的实例会从queue所在实际数据的实例上拉取数据过来。
这种方式没做到所谓的分布式,就是个普通集群。因为集群模式的限制:消费者要么每次随机连接一个实例,然后这个实例会找到真实存放数据的实例去拉取数据再消费,这会引发集群节点之间存在大量的数据传输动作,增加数据拉取的开销;要么固定连接那个queue所在实例消费数据,导致单实例性能瓶颈。
另外当存放实际queue的实例宕机了,会导致接下来其他实例就无法从那个实例拉取。解决方案就是开启消息持久化,让RabbitMQ落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个queue拉取数据。
从上面的分析可知,这种集群模式没有什么所谓的高可用性可言,这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个queue的读写操作。
镜像集群模式
这种模式,才是所谓的RabbitMQ的高可用模式,跟普通集群模式不一样的是,创建queue时候,无论元数据还是queue里的消息都会存在于多个实例上,然后每次你写消息到queue的时候,都会自动把消息到多个实例的queue里进行消息同步。
这种模式的优势:任何一个机器宕机了,别的机器都可以用。
弊端:第一,性能开销很大,消息同步所有机器,导致网络带宽压力和消耗很重;第二,没有扩展性可言了,如果某个queue负载很重,新增节点无济于事,因为新增的机器也包含了这个queue的所有数据,并没有办法线性扩展。
如何开启镜像集群模式呢?
RabbitMQ有很好的管理控制台,在后台管理里新增一个镜像集群模式的策略,指定的时候可以要求数据同步到所有节点的,也可以要求就同步到指定数量的节点,然后你再次创建queue的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。
1.6.2 Kafka的高可用性
架构思路
Kafka 一个最基本的架构认识:由多个 broker 组成,每个 broker 是一个节点;当创建一个 topic 的时候,这个 topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每个 partition 只放一部分数据。
这就是天然的分布式消息队列,简单概述就是一份 topic 数据,是分散放在多个机器上的,每个机器就放一部分数据。
严格来说,RabbitMQ 不管使用什么模式,都不是分布式消息队列,它只是传统的消息队列,只不过提供了一些集群、HA 的机制而已,因为无论怎么玩,RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个 queue 的完整数据。
Kafka 0.8 以前,是没有HA机制的,因此任何一个 broker 宕机了,那么这个 broker 上的 partition 就废了,没法写也没法读,也就没有什么高可用性可言。
Kafka 0.8 以后,提供了 HA 机制,就是 replica 副本机制。每个 partition 的数据都会同步到其他机器上,形成自己的多个 replica 副本。然后所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。写的时候,leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader 上数据即可。
为什么只能读写leader?
因为假设可以随意读写每个 follower,那么在数据读写的时候就需要考虑数据一致性的问题,因此这样对 Kafka 系统架构设计的复杂度太高,也很容易出问题。因此为了架构简单,成本、风险小,Kafka 会均匀的将一个 partition的所有 replica 分布在不同的机器上,这样才可以提高容错性。
高可用性
在上述架构的前提下,才为 Kafka 提供了高可用的可能性:如果某个 broker 宕机了,这个 broker 上面的 partition 在其他机器上还有副本,如果这个 broker 是某个 partition 的 leader,那么此时会在所有 follower 中重新选举一个新的 leader 出来,大家继续读写那个新的 leader 即可。这就有所谓的高可用性了。
读写过程浅析
写数据的时候,生产者就写 leader,然后 leader 将数据落地写本地磁盘,接着其他 follower 自己主动从 leader 来 pull 数据。一旦所有 follower 同步好数据了,就会发送 ack 给 leader,leader 收到所有 follower 的 ack 之后,就会返回写成功的消息给生产者。(当然,这只是其中一种模式,还可以适当调整这个行为)
消费的时候,只会从 leader 去读,但是只有一个消息已经被所有 follower 都同步成功返回 ack 的时候,这个消息才会被消费者读到。
1.7 消息幂等性
如何保证消息不被重复消费啊(如何保证消息消费时的幂等性)?这种问题是面试时必问问题,因为这是实际生产上时刻会面临的系统设计问题。
1.7.1 重复消费问题
遇到这种问题的时候,首先需要知道哪些消息中间件可能会这种问题,产生问题的原因是什么?
RabbitMQ、Rocketmq、Kafka,都有可能会出现消费重复消费的问题,因为这些 MQ 可以保证消息不丢,单保证不了消费重发,所以会出现消息重复消费的问题。
以 Kafka 为例,Kafka 实际上有个 offset 的概念,就是每个消息写进去,都有一个 offset,代表这条消息进入 Kafka 的序号,消费者在成功消费数据之后,会定时定期把刚才消费过的消息的 offset 提交向 zookeeper 注册中心提交,表示消费者已经成功消费消息。当消费者重启的之后,会重新要求 Kafka 继续从上次消费到的 offset 来继续消费。
因此上述的消息消费机制,关键点在于消费者重启的时机,如果消息者消费成功之后,还没来得及提交 offset 的时候,机器重启了,那么当机器重启之后,重新去Kafka消息队列拉取数据的时候,是从上一次成功消费的地方开始,因此会出现消费重复消费的问题。
1.7.2 保证幂等性
其实重复消费不可怕,可怕的是没考虑到重复消费之后,怎么保证幂等性。
举个例子。假设有个系统,消费一条往数据库里插入一条,要是一个消息重复两次,那么不就对同一条数据做了两次插入操作吗?因此可以在消费到第二次的时候,自己判断一下已经这条消息是不是已经消费过了,如果是已经消费过的数据,那么就直接不消费这条消息,因此保证了数据操作的幂等性。
如何保证MQ的消费是幂等性的,需要结合具体的业务来操作,以下是思路:
比如一条数据要进行写库操作,首先根据主键查一下,如果这数据已存在,就不要执行插入操作,而是执行更新操作。
比如如果数据库是redis,那不存在这种问题,反正每次都是set,天然幂等性
比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的标识(类似订单id之类的标识),然后所有消费者来自己这里消费的时候,先根据这个唯一标识去比如redis里查一下,之前是否有消费过?如果没有查到,就表示这条消费没有被处理,消费成功之后就将这个唯一标识写入redis。如果查到了,就标识消费过了,因此就不处理了。
还有比如基于数据库的唯一键来保证重复数据不会重复插入多条,因为有数据库的唯一键约束保证,所以数据重复插入只会报错,不会导致数据库中出现脏数据。
1.8 消息可靠性
如何保证消息的可靠性传输(如何处理消息丢失的问题)?
这个问题也是实际生产中会面临的问题,对于非常核心的消息传递,比如计费系统、扣费系统等,因为业务重,所以需要使用MQ来进行消息异步化。
广告计费系统会统计每次用户点击广告的次数,安装次数进行计费,那么丢失了数据,积少成多,是绝对不允许出现丢数据的问题。
使用MQ时丢数据一般分两种情况:要么是MQ自己丢数据,要么是消费的时候丢数据。分别对RabbitMQ和Kafka进行分析:
1.8.1 RabbitMQ
(1)生产者弄丢了数据
生产者将数据发送到RabbitMQ的时候,因为网络等问题导致数据在传输路上丢失。
此时可以选择用RabbitMQ提供的事务功能:
生产者发送数据之前开启RabbitMQ事务(channel.txSelect),然后发送消息,如果消息没有成功被RabbitMQ接收到,那么生产者会收到异常报错,此时就可以回滚事务(channel.txRollback),然后重试发送消息;
如果收到了消息,那么可以提交事务(channel.txCommit)。但是问题是,RabbitMQ事务机制一搞,基本上吞吐量会下来,因为太耗性能。
所以一般来说,如果要确保说写RabbitMQ的消息不丢失,可以开启confirm模式:
在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的id,如果成功写入了RabbitMQ中,RabbitMQ会给你回传一个ack消息,告诉你说这个消息已成功传输到了MQ中。如果RabbitMQ没能接收这个消息,则会回调消费者的nack接口,告诉消费这个消息接收失败,此时消费者可以重试发送。而且消费者可以结合这个机制自己在内存里维护每个消息id的状态,如果超过一定时间还没接收到这个消息的回调,那么可以重发。
事务机制和cnofirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是confirm机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息RabbitMQ接收了之后会异步回调你一个接口通知你这个消息接收到了。
所以一般在生产者这块避免数据丢失,常用confirm机制。
(2)RabbitMQ弄丢了数据
为了防止RabbitMQ自己弄丢了数据,所以必须开启RabbitMQ的持久化功能:消息写入之后会持久化到磁盘,哪怕是RabbitMQ自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ还没持久化,自己就挂了,可能导致少量数据会丢失的,但是这个概率较小。
设置持久化有两个步骤:
第一个是创建queue的时候将其设置为持久化,这样就可以保证RabbitMQ持久化queue的元数据,但是不会持久化queue里的数据;
第二个是发送消息的时候将消息的deliveryMode设置为2,就是将消息设置为持久化的,此时RabbitMQ就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,RabbitMQ哪怕是挂了,再次重启,也会从磁盘上重启恢复queue,恢复这个queue里的数据。
而且持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,RabbitMQ挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。
此外,RabbitMQ开启了持久化机制也并非完全不丢失数据:也存在一种消息刚写到RabbitMQ中,但还没来得及持久化到磁盘上,此时RabbitMQ挂了的情况,这会导致内存里的少量数据丢失。
(3)消费端弄丢了数据
消费端丢失数据主要原因是在消费的时候,刚消费到一半,还没来得及处理完毕,结果当前的消费端进程挂了,比如重启了,那么RabbitMQ认为你都消费了,因此数据就丢了。
这个时候得用RabbitMQ提供的ack机制,简单来说就是关闭RabbitMQ自动ack,可以通过一个api来调用就行:在消费端处理完数据的时候,在程序里手动ack。
因此,消费端如果还没处理完数据,此时没有ack回MQ,那么RabbitMQ就认为这条消息还没处理完,会把这个消费分配给别的consumer去处理,这样就保证的消息不丢失。
1.8.2 Kafka
(1)消费端弄丢了数据
唯一可能导致消费者弄丢数据的情况:当消费者正在消费的时候,消费者这边就自动提交了offset,导致kafka以为消费者已经确认消费完毕这条消息,然而实际上消费者正准备处理这个消息,还没来得及处理消息,系统就挂了,消息自然就丢失了。
众所周知Kafka会自动提交offset,那么只需要关闭自动提交offset,消费者在处理消息完毕之后自己手动提交offset,就可以保证数据不会丢。
注意:不论关闭不关闭自动提交offset,当消费者正在消费的时候,没有做幂等性机制处理,会导致重复消费问题,因此消费者须要自己解决幂等性问题。
这种消费端丢失数据的问题在生产环境中还是很可能发生:kafka消费者消费到了数据之后是先写到入内存的queue里缓冲一下,结果有的时候,消费者刚把消息写入内存queue,然后消费者会自动提交offset,此时消费者重启了系统,就会导致内存queue里还没来得及处理的数据就丢失了。
(2)kafka弄丢了数据
Kafka本身丢失数据是一个比较常见的场景:众所周知,当Kafka某个broker宕机之后,Kafka需要重新选举partiton的leader。因此,如果在其他的follower刚好还有些数据没有同步完全的时候,结果此时leader挂了,然后选举某个follower成新的leader之后,这个当前新的leader自然就丢失了一些数据。
为了保证Kafka不丢失数据,一般是要求起码设置以下4个参数:
topic设置replication.factor参数:这个值必须大于1,要求每个partition必须有至少2个副本。
在kafka服务端设置min.insync.replicas参数:这个值必须大于1,这个是要求一个leader至少感知到有至少一个follower还跟自己保持联系,没掉队,这样才能确保leader挂了还有一个follower。
在producer端设置acks=all:这个是要求每条数据,必须是写入所有replica之后,才能认为是写成功了。
在producer端设置retries=MAX(很大的一个值,无限次重试的意思):这个参数是要求一旦写入失败,就无限重试,卡在这里了
按照上述要求配置,至少在Kafka的broker端可以保证在leader所在broker发生故障,并进行leader切换时,数据不会丢失。
(3)生产者会不会弄丢数据?
如果按照上述的思路设置了ack=all,一定不会丢,因为参数设置的保证:当Kafka的leader接收到消息,所有的follower都同步完毕消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。
1.9 消息有序性
如何保证消息的顺序性?这个问题也是生产系统中常见的问题。
1.9.1 消息乱序问题分析
举个例子,在mysql binlog同步的系统中,日同步数据要达到上亿。业务操作在mysql里增删改一条数据,对应出来了增删改3条binlog,接着这三条binlog发送到MQ里面,到消费出来依次执行,起码得保证日志数据是按照顺序,不然本来是:增加、修改、删除;你楞是换了顺序给执行成删除、修改、增加。
这种数据同步不一致的问题十分危险:本来这个数据同步过来,应该最后这个数据被删除了;结果你搞错了这个顺序,最后这个数据保留下来了,数据未能同步。
顺序会错乱的俩场景:
- RabbitMQ:一个queue,多个consumer,导致顺序乱了
- Kafka:一个topic,一个partition,一个consumer,内部多线程,导致顺序乱了
1.9.2 解决方案
RabbitMQ:
拆分多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦点;或者就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理。
Kafka:
一个topic,一个partition,一个consumer,内部单线程消费,写N个内存queue,然后N个线程分别消费一个内存queue即可。
1.10 消息积压
如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,如何解决?
这种问题也是实际生产中可能会发生的问题:消息队列集群的磁盘都快写满了,一直没人消费,这个时候怎么办?或者是整个这就积压了几个小时,这个时候怎么办?或者是消息积压的时间太长,以至于积压时长都超过了RabbitMQ设置了消息过期时间,此时该怎么办?
这种问题本质在于消费端出了问题,消费者不消费了或者消费的极其极其慢。这种问题一般不会出现,但是一旦出现了,就是巨大的系统性问题,一般常见于:消费端每次消费之后需要操作mysql,结果mysql挂了导致消费端hang在原地不动了。或者是消费端出了个什么叉子,导致消费速度极其慢。
(1)大量消息在MQ里积压了几个小时了还没解决
从下午4点多积压到了晚上11点多,几千万条数据在MQ里积压了七八个小时,这时第一反映就是修复consumer,让消费者恢复正常消费速度,一个消费者一秒是1000条,一秒3个消费者是3000条,一分钟是18万条,1000多万条,所以如果MQ积压了几百万到上千万的数据,即使消费者恢复了,也需要大概一小时的时间才能恢复过来。这种只修复现有consumer 是不能及时解决积压问题。
这个时候,只能操作临时紧急扩容了,具体操作步骤和思路如下:
先修复consumer的问题,确保其恢复消费速度,然后将现有cnosumer都停掉
新建一个topic,partition是原来的10倍,临时建立好原先10倍或者20倍的queue数量
然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍或者20倍数量的queue
接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据
这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据
等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的consumer机器来消费消息
(2)这里我们假设再来第二个坑
如果用的是RabbitMQ,RabbitMQ是可以设置过期时间的,就是TTL,如果消息在queue中积压超过一定的时间就会被RabbitMQ给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在MQ里,而是大量的数据会直接搞丢了。
这种情况下,不是增加consumer消费解决积压消息问题,因为实际上没啥积压,而是丢了大量的消息,需要找回丢失的数据。我们可以采取一个方案,就是批量重导:
等到用户流量峰值变小的时候,一般为凌晨之后,写个程序将丢失的那批数据,这个临时程序目的就是把丢失的数据一点一点查出来,然后重新灌入MQ里面去,把白天丢的数据给他补回来。
假设1万个订单积压在MQ里面,没有处理,其中1000个订单都丢了,你只能手动写程序把那1000个订单给查出来,手动发到mq里去再补一次。
(3)然后我们再来假设第三个坑
由于消息长时间积压在MQ里,消费者消费太慢,导致MQ都快写满了,怎么办?
只能通过写临时程序,接入数据以消费数据,消费一个就丢弃一个,快速消费掉所有的消息。然后再将丢失的数据重新补回MQ。
1.11 设计MQ的思路
当自己写一个消息队列,该如何进行架构设计啊?简述思路。
其实这种问题,一般在面试中是考察两块:
你有没有对某一个消息队列做过较为深入的原理的了解,或者从整体了解把握住一个MQ的架构原理
看看你的设计能力,给你一个常见的系统,就是消息队列系统,看看你能不能从全局把握一下整体架构设计,给出一些关键点出来。
说实话,我一般面类似问题的时候,大部分人基本都会蒙,因为平时从来没有思考过类似的问题,大多数人就是平时埋头用,从来不去思考背后的一些东西。
类似的问题还有:如果让你来设计一个spring框架你会怎么做?如果让你来设计一个dubbo框架你会怎么做?如果让你来设计一个mybatis框架你会怎么做?
这类问题不算是刁钻的问题,因为只是简述思路,而不是要求分析技术源码,因此面试者起码大概知道这个技术的基本原理、核心组成部分、基本架构构成,然后参照一些开源的技术把一个系统设计出来的思路说一下就好。
比如说这个消息队列系统,我们来从以下几个角度来考虑一下:
首先这个MQ得支持可伸缩性吧,就是需要的时候快速扩容,就可以增加吞吐量和容量,那怎么搞?设计个分布式的系统呗,参照一下Kafka的设计理念,broker -> topic -> partition,每个partition放一个机器,就存一部分数据。如果现在资源不够了,简单啊,给topic增加partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了吗。
其次你得考虑一下这个MQ的数据要不要落地磁盘吧?那肯定要了,落磁盘才能保证别进程挂了数据就丢了。那落磁盘的时候怎么落啊?顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的,这就是Kafka的思路。
其次你考虑一下你的MQ的可用性啊?这个事儿,具体参考我们之前可用性那个环节讲解的Kafka的高可用保障机制。多副本 -> leader & follower -> broker挂了重新选举leader即可对外服务。
能不能支持数据0丢失啊?可以的,参考我们之前说的那个Kafka数据零丢失方案
其实一个MQ肯定是很复杂的,面试官问到这个问题,其实是个开放题,他就是看看面试者有没有从架构角度整体构思和设计的思维以及能力。确实这个问题可以刷掉一大批人,因为大部分人平时不思考这些东西。
参考博文
互联网Java工程师面试突击训练第1季 | 微信公众号:石杉的架构笔记