rocketmq源码-consumer负载均衡逻辑

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

前言

这篇笔记主要记录consumer在启动过程中负载均衡的逻辑多个消费者组成一个消费者组对于集群模式同一个消费者组中的多个消费者共同消费一个topic下的所有消息所以每个consumer可能会处理N个messageQueue至于哪个consumer消费哪个messageQueue是由负载均衡策略决定的

源码

在消费者启动的时候会通过负载均衡策略来决定当前消费者处理哪几个messageQueue入口是

this.rebalanceService.start();

在run()方法中会通过while循环每20S去进行一次负载均衡计算
在这里插入图片描述

在这里插入图片描述
无论是pull模式还是push模式都会调用到

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance

这个方法是按照topic维度进行负载均衡
在这里插入图片描述

广播模式

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic

在这个方法中会先判断当前消费者的模式是集群模式还是广播模式
对于广播模式无需进行负载均衡因为广播模式每个消费者都会消费所有的消息。也就是说topic中的所有messageQueue都是需要消费者去处理的
在这里插入图片描述

集群模式

对于集群模式最重要的就是这里根据负载均衡策略进行计算的逻辑
在这里插入图片描述

这里是根据负载均衡之后得到的结果然后更新一些信息
在这里插入图片描述

这里更新的逻辑很重要对于push模式每个messageQueue会对应一个pullRequest请求然后把pullRequest请求放到队列之后线程会不停的从queue中拉取pullRequest然后请求broker
updateProcessQueueTableInRebalance在这个方法中就会去根据messageQueue构建pullRequest请求然后放到queue中

对于pull模式是需要启动异步的pullTaskImpl任务在这个任务中会不停的去broker拉取消息然后放到消费者主动拉取的队列中
messageQueueChanged() 这个方法就会根据messageQueue启动pullTaskImpl

所以对于consumer我们会发现对于广播模式无需进行负载均衡每个消费者都会处理messageQueue中的消息对于集群模式同一个consumeGroup中的消费者会分摊一个topic中所有的messageQueue

负载均衡策略

在consumer进行负载均衡时默认提供了多个负载均衡策略但是还没有仔细研究这几个负载均衡策略的细节先列举出来

AllocateMachineRoomNearby

就近机房

AllocateMessageQueueAveragely

平均分配算法

AllocateMessageQueueAveragelyByCircle

平均轮询分配

AllocateMessageQueueByConfig

自定义配置

AllocateMessageQueueByMachineRoom

指定机房

AllocateMessageQueueConsistentHash

一致性hash算法

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6