RocketMQ5.0.0路由中心NameServer

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

一、NameServer概览

NameServer是RocketMQ的注册中心是消息存储Broker、生产者、消费者沟通的桥梁。NameServer集群之间是相互独立的Broker启动时向所有NameServer注册中心注册。通过DLedger构建NameServer集群实现如主从切换等功能。

启动NameServer启动注册中心维护路由信息、周期检测Broker发送的心跳包

Broker注册Broker启动时向所有NameServer发送心跳包、长连接NameServer

Broker删除NameServer启动定时任务检测Broker是否发送心跳包

生产者发送消息创建Topic时向NameServer获取Broker路由信息

发送消息时直接向Broker发送消息并消息ACK确认

消费者消费消息根据PUSH/PULL模式消费消息消费ACK确认

二、启动NameServer

NameServer启动入口是org.apache.rocketmq.namesrv.NamesrvStartup#main该方法调用链如下图。核心方法是org.apache.rocketmq.namesrv.NamesrvController#initialize。

// 初始化NameServer控制器
public boolean initialize() {
    // kvConfig配置加载
    loadConfig();
    // 初始化Netty的server、client
    initiateNetworkComponents();
    // 初始化defaultExecutor、clientRequestExecutor线程池
    initiateThreadExecutors();
    // 路由注册仅支持临时路由
    registerProcessor();
    // 启动定时任务5s扫描brokerLiveTable10min打印日志
    startScheduleService();
    // 初始化SSL上下文
    initiateSslContext();
    // 注册RPC钩子
    initiateRpcHooks();
    return true;
}

三、路由元数据

NameServer管理路由实现类org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager该类中主要属性如下代码所示。

// 主题的路由列表信息消息发送时根据这个列表进行负载均衡
private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
// Broker的基础信息
private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
// Broker所在的集群信息
private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// Broker的状态信息心跳检查
private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// Broker的FilterServer列表
private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
// 主题的每个broker的队列映射信息
private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;

// 定时批量移除过期Broker服务线程
private final BatchUnregistrationService unRegisterService;

其中brokerLiveTable是Broker向NameServer注册的心跳包信息缓存表如下UML图所示。NameServer收到Broker发送的心跳包则更新BrokerLiveInfo下的lastUpdateTimestamp。

四、Broker注册

1.Broker发送心跳包

Broker启动时向所有NameServer注册并启动定时任务30s周期发送心跳包。下图是org.apache.rocketmq.broker.BrokerStartup#main的调用链。

org.apache.rocketmq.broker.BrokerController#start方法中向所有NameServer注册并启动定时任务30s周期发送心跳包代码如下。

if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
    changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
    this.registerBrokerAll(true, false, true);
}

// broker启动定时任务每个30s向所有NameServer发送心跳包
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
    @Override
    public void run2() {
        try {
            if (System.currentTimeMillis() < shouldStartTime) {
                BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
                return;
            }
            if (isIsolated) {
                BrokerController.LOG.info("Skip register for broker is isolated");
                return;
            }
            // broker注册
            BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            BrokerController.LOG.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));

if (this.brokerConfig.isEnableSlaveActingMaster()) {
    scheduleSendHeartbeat();

    scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
        @Override
        public void run2() {
            try {
                BrokerController.this.syncBrokerMemberGroup();
            } catch (Throwable e) {
                BrokerController.LOG.error("sync BrokerMemberGroup error. ", e);
            }
        }
    }, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));
}

2.NameServer接收到心跳包

接受请求的总入口org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest注册Broker请求码为RequestCode.REGISTER_BROKERNameServer注册Broker的核心方法是org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker如下代码所示。

/*
    注册broker处理心跳包
    NameServer与broker保持长连接NameServer每次收到心跳包将更新关于broker相关信息
        topicQueueTable 、 brokerAddrTable 、 brokerLiveTable 、 filterServerTable
    注意
        a. 注册时加上写锁防止并发修改主题路由表
        b. 维护BrokerData先从brokerAddrTable获取不存在则新增
        c. 是否第一次注册是否主从主从是否切换
        d. 创建或更新topic路由元数据填充topicConfigTable即为默认主题自动注册路由信息
        e. 更新broker存活信息BrokerLiveInfo执行路由删除的重要依据
 */
public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final String zoneName,
    final Long timeoutMillis,
    final Boolean enableActingMaster,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        // 当前线程获取写锁除非当前线程中断
        this.lock.writeLock().lockInterruptibly();

        //init or update the cluster info
        // 没有则创建有则添加
        Set<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());
        brokerNames.add(brokerName);

        boolean registerFirst = false;

        BrokerData brokerData = this.brokerAddrTable.get(brokerName);
        if (null == brokerData) {
            // true时第一次注册
            registerFirst = true;
            brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
            this.brokerAddrTable.put(brokerName, brokerData);
        }

        boolean isOldVersionBroker = enableActingMaster == null;
        brokerData.setEnableActingMaster(!isOldVersionBroker && enableActingMaster);
        brokerData.setZoneName(zoneName);

        Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();

        boolean isMinBrokerIdChanged = false;
        long prevMinBrokerId = 0;
        if (!brokerAddrsMap.isEmpty()) {
            prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());
        }

        if (brokerId < prevMinBrokerId) {
            isMinBrokerIdChanged = true;
        }

        //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
        //The same IP:PORT must only have one record in brokerAddrTable
        brokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey());

        //If Local brokerId stateVersion bigger than the registering one,
        String oldBrokerAddr = brokerAddrsMap.get(brokerId);
        if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) {
            BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new BrokerAddrInfo(clusterName, oldBrokerAddr));

            if (null != oldBrokerInfo) {
                long oldStateVersion = oldBrokerInfo.getDataVersion().getStateVersion();
                long newStateVersion = topicConfigWrapper.getDataVersion().getStateVersion();
                if (oldStateVersion > newStateVersion) {
                    log.warn("Registered Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +
                            "Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",
                        clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);
                    //Remove the rejected brokerAddr from brokerLiveTable.
                    brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr));
                    return result;
                }
            }
        }

        if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {
            log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",
                topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);
            return null;
        }

        String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
        registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));

        // 主broker
        boolean isMaster = MixAll.MASTER_ID == brokerId;
        // 首要的从broker
        boolean isPrimeSlave = !isOldVersionBroker && !isMaster
            && brokerId == Collections.min(brokerAddrsMap.keySet());

        if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {
            /*
                创建或更新Topic路由元数据填充topicConfigTable即为主题自动注册路由信息
             */

            // topic路由元数据
            ConcurrentMap<String, TopicConfig> tcTable =
                topicConfigWrapper.getTopicConfigTable();
            if (tcTable != null) {
                for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                    if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,
                        topicConfigWrapper.getDataVersion(), brokerName,
                        entry.getValue().getTopicName())) {
                        final TopicConfig topicConfig = entry.getValue();
                        if (isPrimeSlave) {
                            // Wipe write perm for prime slave
                            topicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));
                        }
                        // 创建或更新topic自动注册路由信息
                        this.createAndUpdateQueueData(brokerName, topicConfig);
                    }
                }
            }

            if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
                TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
                Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();
                //the topicQueueMappingInfoMap should never be null, but can be empty
                for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
                    if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
                        topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());
                    }
                    //Note asset brokerName equal entry.getValue().getBname()
                    //here use the mappingDetail.bname
                    topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
                }
            }
        }

        /*
            更新broker存活信息表默认120s后执行删除路由信息的重要依据心跳检测
         */
        BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
        BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,
            new BrokerLiveInfo(
                System.currentTimeMillis(),
                timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
                topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),
                channel,
                haServerAddr));
        if (null == prevBrokerLiveInfo) {
            log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);
        }

        // 注册broker的FilterServer列表一个broker会关联多个FilterServer消息过滤服务器
        if (filterServerList != null) {
            if (filterServerList.isEmpty()) {
                this.filterServerTable.remove(brokerAddrInfo);
            } else {
                this.filterServerTable.put(brokerAddrInfo, filterServerList);
            }
        }

        // 该broker为从则更新主broker地址
        if (MixAll.MASTER_ID != brokerId) {
            String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
            if (masterAddr != null) {
                BrokerAddrInfo masterAddrInfo = new BrokerAddrInfo(clusterName, masterAddr);
                BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddrInfo);
                if (masterLiveInfo != null) {
                    result.setHaServerAddr(masterLiveInfo.getHaServerAddr());
                    result.setMasterAddr(masterAddr);
                }
            }
        }

        if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {
            notifyMinBrokerIdChanged(brokerAddrsMap, null,
                this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    } finally {
        this.lock.writeLock().unlock();
    }

    return result;
}

五、路由删除

NameServer每隔5s扫描brokerLiveTable状态表如BrokerLiveInfo的lastUpdateTimestamp时间戳距当前时间超过120s则Broker失效移除该Broker并关闭与Broker连接同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。

RocketMQ有两种情况触发删除Broker路由状态

  • NameServer定时扫描brokerLiveTable检测上次心跳包与当前系统时间的时间差如果差值大于120s则移除该Broker信息

  • Broker正常关闭执行unregisterBroker指令。

NameServer定时扫描的核心实现方法是org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker代码如下。

// 定时扫描brokerLiveTablebroker存活表删除心跳过期的broker
public void scanNotActiveBroker() {
    try {
        log.info("start scanNotActiveBroker");
        for (Entry<BrokerAddrInfo, BrokerLiveInfo> next : this.brokerLiveTable.entrySet()) {
            // 上次心跳检测更新时间
            long last = next.getValue().getLastUpdateTimestamp();
            // 心跳过期超时时间
            long timeoutMillis = next.getValue().getHeartbeatTimeoutMillis();
            // 过期判定
            if ((last + timeoutMillis) < System.currentTimeMillis()) {
                // 关闭当前broker的Channel关闭与broker的长连接
                RemotingUtil.closeChannel(next.getValue().getChannel());
                log.warn("The broker channel expired, {} {}ms", next.getKey(), timeoutMillis);
                // 删除与该broker的路由信息
                this.onChannelDestroy(next.getKey());
            }
        }
    } catch (Exception e) {
        log.error("scanNotActiveBroker exception", e);
    }
}

六、路由发现

1.生产者路由发现

生产者生产消息时根据topic会到NameServer获取Broker路由信息缓存到本地实体类是主题发布信息org.apache.rocketmq.client.impl.producer.TopicPublishInfo其属性如下。

// 是否顺序消息
private boolean orderTopic = false;
// 是否有topic路由信息
private boolean haveTopicRouterInfo = false;
// topic的消息队列
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
// 每选择一次消息队列该值增加1ThreadLocal<Integer>
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
// 主题路由信息
private TopicRouteData topicRouteData;

获取主题发布信息的核心实现方法是org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo代码如下。

/**
 * 获取主题路由发布信息
 * 获取逻辑
 * step1: 本地生产者有缓存该topic路由信息和消息队列则直接返回
 * step2: 本地生产者没有缓存则从NameServer查找主题路由信息
 * step3         没有缓存从NameServer查找不到则isDefault是否采用默认主题路由defaultMQProducer.getCreateTopicKey() —— AUTO_CREATE_TOPIC_KEY_TOPIC
 * @param topic 主题
 * @return 主题发布信息
 */
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // 获取生产者缓存的主题发布信息
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    // 生产者没有主题发布信息或没有消息队列则创建并更新NameServer主题路由信息
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        // 本地生产者创建
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // 更新NameServer主题路由信息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    // 本地有缓存则直接获取
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        // isDefault为true采用默认主题发送消息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

需要注意的是RocketMQ路由发现是非实时的当Topic路由出现变化后NameServer不主动推送给客户端而是由客户端定时拉取主题最新的路由。

2.消费者路由发现

消费者路由发现逻辑实现比较复杂已消费组的模式展开详细见后续章节。

七、参考资料

https://www.cnblogs.com/qdhxhz/p/11094624.html

https://blog.csdn.net/yuanchangliang/article/details/119155557

https://blog.csdn.net/m0_37543627/article/details/128542505

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6