大量业务使用消息中间件进行系统间的解耦、异步化、削峰填谷设计实现。公司内部前期基于RabbitMQ实现了一套高可用的消息中间件平台。随着业务的持续增长,消息体量随之增大,对消息中间件平台提出了更高的要求,此外在运维过程中也遇到了高可用难以保障,功能特性不足等诸多问题。基于遇到的这些问题,决定引入RocketMQ进行替换。本文将介绍基于RocketMQ建设消息中间件平台并实现在线业务无感知的平滑迁移。
vivo互联网中间件团队于2016年开始基于开源RabbitMQ向业务提供高可用消息中间件平台服务。
为解决好业务流量快速增长的问题,我们通过合理的业务集群拆分和动态调整,较好的交付了业务对消息中间件平台的平台能力需求。
但是随着业务长周期的迅猛发展,消息体量也越来越大,在高并发、大流量场景下RabbitMQ的系统架构设计存在着一定的限制,主要有以下问题:
架构设计存在脑裂风险,并且默认脑裂后无法自动恢复,人工介入恢复存在数据丢失的风险。
为解决脑裂问题,可以选择将网络异常后的处理调整为pause_minority模式,但是也带来了可能微小的网络抖动也会导致集群故障无法恢复的问题。
业务消息发送后通过exchange路由到对应的queue中,每一个queue由集群中的某个节点实际承载流量,高流量下集群中的某个节点可能会成为瓶颈。
queue由某个节点承载流量后无法快速迁移,强制迁移到其它低负载节点可能会导致queue不可用,这也导致了向集群中添加节点并无法快速提升集群的流量承载能力。
集群性能较低,经测试使用三台机器组成集群,可承载大概数万tps左右,并且由于queue是由集群中某个节点实际承载的,也无法继续提升某个queue的性能,这样就无法支撑大流量业务。
消息堆积到千万或更多后会导致集群性能下降,甚至海量堆积后如果消费请求tps特别高,可能会因为磁盘的性能损耗导致发送性能下降,并且在消息堆积太多时恢复时间长甚至无法恢复。
RabbitMQ 默认情况下消费异常会执行立即重新投递,少量的异常消息也可能导致业务无法消费后续消息。
功能特性上未支持事务消息、顺序消息功能。
虽可自行实现消息轨迹逻辑,但是会对集群产生非常大的性能损耗,在正式环境中实际无法基于RabbitMQ原生的能力实现消息轨迹功能。
基于以上问题,中间件团队于2020年Q4开始进行了下一代消息中间件平台方案的调研,为保证下一代消息中间件平台符合业务新的需求,我们首先明确了消息中间件平台的建设目标,主要包含两部分:
高性能:可支撑极高的tps,并且支持水平扩展,可快速满足业务的流量增长需求,消息中间件不应成为业务请求链路性能提升的瓶颈点。
高可用:极高的平台可用性(>99.99%),极高的数据可靠性(>99.99999999%)。
丰富的功能特性:支持集群、广播消费;支持事务消息、顺序消息、延时消息、死信消息;支持消息轨迹。
基于当前RabbitMQ平台的问题和对下一代消息中间件平台的项目需求,我们开展了针对当前较流行的两款消息中间件:RocketMQ、Pulsar的调研。
调研过程中主要针对以下两方面进行对比:
3.1.1 高可用架构与负载均衡能力对比
Pulsar部署架构(来源:[Pulsar社区] )
RocketMQ部署架构(来源:[RocketMQ社区] )
Pulsar:
采用计算与存储分离架构设计,可以实现海量数据存储,并且支持冷热数据分离存储。
基于ZK和Manager节点控制Broker的故障切换以实现高可用。
Zookeeper采用分层分片存储设计,天然支持负载均衡。
RocketMQ:
3.1.2 扩缩容与故障恢复对比
Pulsar
Broker与BooKeeper独立扩缩容,并且扩缩容后会完成自动负载均衡。
Broker节点无状态,故障后承载Topic会自动转移到其它Broker节点,完成故障秒级恢复。
BooKeeper由自动恢复服务进行ledger数据对齐,并恢复到设置的QW份。
故障期间已ack消息不会丢失,未ack消息需要客户端重发。
RocketMQ
Broker扩缩容后需要人工介入完成Topic流量均衡,可开发自动负载均衡组件结合Topic的读写权限控制自动化完成扩缩容后的负载均衡。
基于主从切换实现高可用,由于客户端定期30秒从NameSrv更新路由,因此故障恢复时间在30~60秒,可以结合客户端降级策略让客户端主动剔除异常Broker节点,实现更快故障恢复。
采用同步复制异步刷盘部署架构,在极端情况下会造成少量消息丢失,采用同步复制同步刷盘,已写入消息不会丢失。
3.1.3 性能对比
Pulsar
RocketMQ
从高可用架构分析,Pulsar基于Bookeeper组件实现了架构的计算与存储分离,可以实现故障的快速恢复;RocketMQ采用了主从复制的架构,故障恢复依赖主从切换。
从功能特性分析,Pulsar支持了丰富的过期策略,支持了消息去重,可以支持实时计算中消息只消费一次的语义;RocketMQ在事务消息、消息轨迹、消费模式等特性对在线业务有更好的支持。
从这两方面对比,最终选择了RocketMQ构建我们下一代的消息中间件平台。
通过技术调研,确定了基于RocketMQ建设下一代消息中间件平台。
为了实现业务从RabbitMQ平滑迁移到RocketMQ,就需要建设消息网关实现消息从AMQP协议转换到RocketMQ;RabbitMQ与RocketMQ的元数据语义与存储存在差异,需要实现元数据语义的映射与元数据的独立存储。
主要有以下四个事项需要完成:
RabbitMQ采用推模式进行消息消费,虽然RocketMQ也支持消息推送消费,但是因为AMQP协议中通过prefetch参数限制了客户端缓存消息数量以保证不会因缓存太多消息导致客户端内存异常,因此在消息网关实现消息推送时也需要满足AMQP协议的语义。
同时每个消息网关都需要数千甚至数万的queue的消息推送,每个queue消息消费速率存在差异,并且每个队列可能随时有消息需要推送到客户端进行消费,要保证不同queue之间的推送互不干扰且及时。
为了实现高效的、互不干扰的消息推送,有以下策略:
最终选择了第2种方案,数据流转图如下图所示:
一个消息消费过程:客户端在启动连接到消息网关后,在消息网关中会构建RocketMQ推送消费客户端实例,并且注入自定义的ConsumeMessageService实例,同时使用一个信号量保存客户端允许推送的消息数量。
当消息从集群侧推送到消息网关时,将消息按照推送的批次封装为一个任务保存在ConsumeMessageService实例的BlockingQueue中,同时推送线程会轮询所有的ConsumeMessageService实例,如果发现本地缓存有待消费的消息并且有可消费消息的业务客户端,将任务提交到线程池中完成消息的推送。
为了保证不会因为少量消费速率特别高的queue导致其它queue的消息推送时效性降低,会限制每一个ConsumeMessageService只允许推送一定数量的消息即转到推送其它queue的消息,以此即可保证所有queue的消息推送的互不干扰和时效性。
在客户端消费ack/uack后再次通过信号量通知下一次推送,这样也保证了使用少量的线程资源即可完成海量消息的推送需求。
基于消息网关,可以在消息推送逻辑中增加消费启停和消费限流逻辑。
消费启停可以帮助业务快速实现消费的暂停或是部分异常节点停止消息消费。
消费限流可以帮助业务控制消息消费速率,避免对底层依赖产生太大压力。
5.1.1 更高、更稳定的消息发送性能
原生RabbitMQ集群业务压测性能
使用消息网关后业务压测性能
5.1.2 更丰富的功能特性
5.1.3 业务使用特性变化
业务从RabbitMQ迁移到RocketMQ后,可支撑业务流量从万TPS级别提升到十万TPS级别,可支撑业务容量从数亿提升至百亿级别。耗用机器资源下降50%以上,运维难度和成本均大大降低,同时可以基于消息网关实现更加丰富的功能特性。
未来,中间件团队计划在三个方面对消息中间件进行迭代演进:
END
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8