时间:2021-05-19
前言
本文介绍了DefaultMQPushConsumerImpl消费者,客户端负载均衡相关知识点。本文从DefaultMQPushConsumerImpl启动过程到实现负载均衡,从源代码一步一步分析,共分为6个部分进行介绍,其中第6个部分 rebalanceByTopic 为负载均衡的核心逻辑模块,具体过程运用了图文进行阐述。
介绍之前首先抛出几个问题:
1. 要做负载均衡,首先要解决的一个问题是什么?
2. 负载均衡是Client端处理还是Broker端处理?
个人理解:
1. 要做负载均衡,首先要做的就是信号收集。
所谓信号收集,就是得知道每一个consumerGroup有哪些consumer,对应的topic是谁。信号收集分为Client端信号收集与Broker端信号收集两个部分。
2. 负载均衡放在Client端处理。
具体做法是:消费者客户端在启动时完善rebalanceImpl实例,同时拷贝订阅信息存放rebalanceImpl实例对象中,另外也是很重要的一个步骤 -- 通过心跳消息,不停的上报自己到所有Broker,注册RegisterConsumer,等待上述过程准备好之后在Client端不断执行的负载均衡服务线程从Broker端获取一份全局信息(该consumerGroup下所有的消费Client),然后分配这些全局信息,获取当前客户端分配到的消费队列。
本文具体的内容:
I. copySubscription
Client端信号收集,拷贝订阅信息。
在DefaultMQPushConsumerImpl.start()时,会将消费者的topic订阅关系设置到rebalanceImpl的SubscriptionInner的map中用于负载:
FilterAPI.buildSubscriptionData接口将订阅关系转换为SubscriptionData 数据,其中subString包含订阅tag等信息。另外,如果该消费者的消费模式为集群消费,则会将retry的topic一并放到。
II. 完善rebalanceImpl实例
Client继续收集信息:
本文以DefaultMQPushConsumerImpl为例,因此this对象类型为DefaultMQPushConsumerImp。
III. this.rebalanceService.start()
开启负载均衡服务。this.rebalanceService是一个RebalanceService实例对象,它继承与ServiceThread,是一个线程类。 this.rebalanceService.start()执行时,也即执行RebalanceService线程体:
IV. this.mqClientFactory.doRebalance
客户端遍历消费组table,对该客户端上所有消费者独立进行负载均衡,分发消费队列:
V. MQConsumerInner.doRebalance
由于本文以DefaultMQPushConsumerImpl消费过程为例,即DefaultMQPushConsumerImpl.doRebalance:
步骤II 中完善了rebalanceImpl实例,为调用rebalanceImpl.doRebalance()提供了初始数据。
rebalanceImpl.doRebalance()过程如下:
public void doRebalance() { // 前文copySubscription中初始化了SubscriptionInnerMap<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();try {this.rebalanceByTopic(topic);} catch (Exception e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("rebalanceByTopic Exception", e);}}}}this.truncateMessageQueueNotMyTopic();}VI. rebalanceByTopic -- 核心步骤之一
rebalanceByTopic方法中根据消费者的消费类型为BROADCASTING或CLUSTERING做不同的逻辑处理。CLUSTERING逻辑包括BROADCASTING逻辑,本部分只介绍集群消费负载均衡的逻辑。
集群消费负载均衡逻辑主要代码如下(省略了log等代码):
//1.从topicSubscribeInfoTable列表中获取与该topic相关的所有消息队列Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);//2. 从broker端获取消费该消费组的所有客户端clientIdList<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);f (null == mqSet) { ... }if (null == cidAll) { ... }if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll); // 3.创建DefaultMQPushConsumer对象时默认设置为AllocateMessageQueueAveragelyAllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try { // 4.调用AllocateMessageQueueAveragely.allocate方法,获取当前client分配消费队列allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll,cidAll);} catch (Throwable e) {return;} // 5. 将分配得到的allocateResult 中的队列放入allocateResultSet 集合Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}、 //6. 更新updateProcessQueueboolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet);if (changed) {this.messageQueueChanged(topic, mqSet, allocateResultSet);}}注:BROADCASTING逻辑只包含上述的1、6。
集群消费负载均衡逻辑中的1、2、4这三个点相关知识为其核心过程,各个点相关知识如下:
第1点:从topicSubscribeInfoTable列表中获取与该topic相关的所有消息队列
第2点: 从broker端获取消费该消费组的所有客户端clientId
首先,消费者对象不断地向所有broker发送心跳包,上报自己,注册并更新订阅关系以及客户端ChannelInfoTable;之后,客户端在做消费负载均衡时获取那些消费客户端,对这些客户端进行负载均衡,分发消费的队列。具体过程如下图所示:
第4点:调用AllocateMessageQueueAveragely.allocate方法,获取当前client分配消费队列
注:上图中cId1、cId2、...、cIdN通过 getConsumerIdListByGroup 获取,它们在这个ConsumerGroup下所有在线客户端列表中。
当前消费对进行负载均衡策略后获取对应的消息消费队列。具体的算法很简单,可以看源码。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
声明:本页内容来源网络,仅供用户参考;我单位不保证亦不表示资料全面及准确无误,也不保证亦不表示这些资料为最新信息,如因任何原因,本网内容或者用户因倚赖本网内容造成任何损失或损害,我单位将不会负任何法律责任。如涉及版权问题,请提交至online#300.cn邮箱联系删除。
Nginx代理与负载均衡详解nginx除了可以做网站的虚拟主机之外,还可以做代理器,并且,nginx在代理器的基础上可以做到负载均衡。一、代理器:所谓代理器,即
详解负载均衡实现一个域名对应多个IP地址使用负载均衡实现,传统和常规做法,其他方式需要特殊处理。(dns轮询,或者自己做解析)1、一个域名设定多个dns服务或者
Ribbon简介分布式系统中,各个微服务会部署多个实例,如何将服务消费者均匀分摊到多个服务提供者实例上,就要使用到负载均衡器Ribbon是负载均衡器,它提供了很
  负载均衡是一项很实用是网络技术,如果想要实现负载均衡,可以自行调试,也可以使用负载均衡器,这时候大家需要了解什么是负载均衡器以及负载均衡
详解Linux系统配置nginx的负载均衡负载均衡的几种方式:1.轮询:默认按照时间顺序对所有服务器一个一个的访问,如果有服务器宕机,会自动剔除;2.weigh