我们在对rabbimq进行性能测试时发现在单个队列的情况下,无论怎么压qps都上不去,而此时服务器的cpu、网络、磁盘都没什么压力,但通过web管理界面看到发送通道不断的处于流控状态,显然流控机制被启动是性能不佳的最表象原因,那这背后到底是什么原因使得rabbimq启动流控机制呢?
首先总结下前篇博客讲述的rabbitmq流控机制原理---实质上就是通过监控没各进程的mailbox,当某个进程负载过高来不及接收消息时,这个进程的mailbox就会开始堆积消息,当堆积到一定量时,就会阻塞住上游进程让其不得接收新消息,从而慢慢上游进程的mailbox也会开始积压消息,到了一定的量也会阻塞上游的上游的进程接收消息,最后就会使得负责网络数据包接收的进程阻塞掉,暂停接收数据。这就有点像一个多级的水库,当下游水库压力过大时,上游水库就得关闭闸门,使得自己的压力也越来越大这就需要关闭更上游的水库闸门直到关闭最最上游的闸门。
从这套流控机制可以看出,对于处于整个流控链中的任意进程,只要该进程被阻塞,上游进程必定全部被阻塞,若某个进程是性能瓶颈,必然会导致起上游所有进程被阻塞。所以我们可以利用流控机制的这个特点找出瓶颈所在的进程,即通过trace找出流控链中被阻塞进程中最下游的那个进程,这个进程的下游进程就是瓶颈所在。例如有四个进程A、B、C、D,消息在进程间传递的顺序是A->B->C->D, 若此时观察到 A,B不断的被阻塞,则瓶颈一定在C ,因为若A是瓶颈就不会有很多的消息往后面传,若B是瓶颈,则A一定被阻塞,下游的消息就会变少,B就不会成为瓶颈,若D是瓶颈则 C也会被阻塞。
首先找出消息传递链条的所有进程以及顺序关系,rabbimq的流控机制实现在模块credit_flow中,当处于流控机制的进程往下游发前会调用credit_flow:send/1在接收消息时会调用crediat_flow:ack/1参数都为pid,在send中该pid为下游进程pid,ack为上游进程pid,为此我们可以trace这两个函数找出所有处于流控机制中的进程以及顺序关系,在通过i(Pid)提供的进程信息结合代码分析各进程扮演的角色。
{ok, Log} = file:open("flow_trace", [write, append]),
Tfun = fun(Msg, _) ->
io:format(Log,"~p ~n", [Msg])
end,
dbg:tracer(process, {Tfun, null}),
dbg:tp({credit_flow, send, '_'}, []),
dbg:tp({credit_flow, ack, '_'}, []),
dbg:p(all, c).
最后分析出这些进程的关系如下:
rabbit_reader:负责接收网络数据包,解析数据。
rabbit_channel:负责处理amqp协议的各种方法,进行路由解析等。
rabbit_amqqueue_process:负责实现queue的所有逻辑。
rabbit_msg_store:负责实现消息的持久化。
有了以上信息后,我们就可以开始给rabbitmq加上load,查看哪些进程被阻塞。由于阻塞这个动作是在credit_flow:send/1这个函数内完成的,并不是每次调用都为阻塞绝大多数时候不会,而且阻塞的状态信息保存在进程字典中,所有直接trace阻塞这个动作不太方便。但由于阻塞之后必然有反阻塞,而这个动作是调用credit_flow:unblock/1完成的,所以我们可以trace这个函数来查看哪些进程被阻塞。
{ok, Log} = file:open("flow_trace", [write, append]),
Tfun = fun(Msg, _) ->
io:format(Log,"~p ~n", [Msg])
end,
dbg:tracer(process, {Tfun, null}),
dbg:tpl({credit_flow, unblock, '_'}, []),
dbg:p(all, c).
通过trace结果发现,瓶颈在rabbit_amqueue_process这个进程。为此eprof看下该进程有没有热点可以优化,profile的结果如下(部分截图)
从上可以看出并没有热点。
对于rabbit_amqqueue_process进程是每个队列都会对应一个这样的进程,那多申明几个队列,多几个这样的进程性能是否能提升呢?通过实验发现答案是肯定的,增加队列能够显著提升qps,但并不是越多越好,太多反而性能会下降,在增加队列数量时发现慢慢瓶颈会转移到rabbit_channel进程上,当然channel也可以声明多个解决该问题,但无论怎么设置多少channel多少queue,瓶颈始终都不会出现在msg_store_persistent进程。
- 大小: 7.7 KB
- 大小: 69.6 KB
分享到:
相关推荐
RabbitMQ流量控制机制分析1
RabbitMQ性能测试报告,对rabbitmq容器内的单机模式和集群模式进行了压力测试盒稳定性测试。
首先总结下前篇博客讲述的rabbitmq流控机制原理---实质上就是通过监控没各进程的mailbox,当某个进程负载过高来不及接收消息时,这个进程的mailbo
比RabbitMQ性能更好的消息队列RocketMQ RabbitMQ 由于持久化场景下的吞吐量只有2.6万 经过 RabbitMQ,Kafka 和 RocketMQ( ActiveMQ 性能较差,暂不考虑)的调研和分析后,我们发现 RocketMQ 比较适合
rabbitmq + spring boot 消息确认、持久化、备用交换机、死信交换机等demo代码
第11周1-第05章节-Python3.5-RabbitMQ消息持久化.mp4
RabbitMQTest 用于RabbitMQ性能测试,可提供对单个队列写入,消费以及对多个队列进行同时读写操作的测试. 可配置连接数,通道数
RabbitMQ_Mirror机制分析
主要介绍了RabbitMQ 的消息持久化与 Spring AMQP 的实现剖析详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
腾讯云在开发云消息队列系统(CMQ)时,对RabbitMQ进行了大量的学习和优化,包括瓶颈分析、内存管理、参数调优等。下文结合Erlang和RabbitMQ架构来分析实践中遇到的问题,并探讨相应的优化方案。图1AMQP模型AMQP是一
通过对这些MQ进行比较分析rabbitmq优势:1.从社区活跃度;2.持久化消息比较(ZeroMq 不支持,ActiveMq 和RabbitMq 都支持。持久化消息主要是指我们机器在不可抗力因素等情况下挂掉了,消息不会丢失的机制);3.综合...
rabbitmq代码,包含了消息队列的5中方式,topic 等模式,还有保持消息持久化的解决方法(交换机 队列 消息同时持久化),欢迎下载参与讨论,如果有什么疑问或者问题,请@我。
持久化消息主要是指我们机器在不可抗力因素等情况下挂掉了,消息不会丢失的机制。 3.综合技术实现 可靠性、灵活的路由、集群、事务、高可用的队列、消息排序、问题追踪、可视化管理工具、插件系统等等。 RabbitMq ...
RabbitMQ命令行手动创建队列...Durable: 是否需要持久化, true表示持久化, false为非持久化 AutoDelete: 是否自动删除, true表示自动删除, false为非自动删除 补充: 如需指定vhost的队列(默认是/) 加上 --vhost
rabbitmq spring rabbitmq spring rabbitmq spring rabbitmq spring http://knight-black-bob.iteye.com/blog/2304089
包含通过http方式调用rabbitmq对外接口,如查看队列的有效性、获取交换机等信息 包含通过http方式调用rabbitmq对外接口,如查看队列的有效性、获取交换机等信息 包含通过http方式调用rabbitmq对外接口,如查看队列的...
自动进行rabbitmq的多机调用,实现负载均衡,基于python3.6进行实现。
RabbitMQ资料RabbitMQ资料RabbitMQ资料RabbitMQ资料
rabbitmq配置文件,用于rabbitmq管理
SpringBoot整合RabbitMQ 实现消息发送确认与消息接收确认机制 源码及教材 可以参考博客: https://blog.csdn.net/qq_29914837/article/details/93376741