Nacos 客户端的服务发现与服务订阅机制的纠缠 - 篇七

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

Nacos 客户端的服务发现与服务订阅机制的纠缠 - 篇七

历史篇章

Nacos 客户端服务注册源码分析-篇一

Nacos 客户端服务注册源码分析-篇二

Nacos 客户端服务注册源码分析-篇三

Nacos 服务端服务注册源码分析-篇四

Nacos 服务端健康检查-篇五

Nacos 客户端服务发现源码分析-篇六

Nacos 客户服务发现续接

之前在第六篇的时候我们探究了 Nacos 客户端的服务发现源码的具体实现流程。

image-20211022150934273

最终是调用的 NamingService 的 getAllInstance 方法获取了所有的实例列表而客户端实例列表是封装在一个 List <Instance> 的集合当中的。

//获取所以的实例信息,这里的实例信息就是客户端的信息
List<Instance> list = namingService.getAllInstances("nacos.test.1");

最终是调用 NamingClientProxyDelegate 类下的 subscribe 方法完成订阅并返回实体信息的。

if (null == serviceInfo) {
    //如果本地的缓存不存在服务信息则进行订阅
    //查找到最新的实例信息
    serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}

由于这一部分的内容在之前的第六篇 Nacos 客户端服务发现源码分析-篇六 已经是分析过了的所以这里我就不再进行赘述这一块的内容了感兴趣的可以返回调转到指定的篇章进行浏览即可。

可能有些人好奇哎。标题为什么称作 Nacos 客户端的服务发现与服务订阅机制的纠缠呢

哈哈其实他们两者是有联系的具体是什么联系就在我们接下来要探究 Nacos 客户端的服务订阅当中有其答案。

既然如此我们就今天研究一把 Nacos 客户端服务订阅事件机制的具体实现叭。。。


Nacos 客户端服务订阅机制核心流程

首先先谈谈什么是订阅生活中那些那些方面体现着类似于订阅这样的概念只要真正的理解了订阅这一概念我们才能更好的进行接下来的内容。

订阅其实简单与生活对比来讲其实就是预定。当然预定的这个动作有发出者就必须有动作的承受者举个栗子外出旅游我们可以会定酒店那么酒店的服务者就是动作的承受者订酒店的对象就是动作的发出者再比如我们的常常提到的订阅一个期刊如果这个期刊的周期是一年而该期刊每月都会推送该期的内容那么订阅期刊的对象就是动作发出者发布期刊的对象就是动作承受者。

订阅者订阅承受者在接受到订阅者的指定命令后周期性的完成指定的任务这就是订阅。

所以对于注册中心 Nacos 也是同样提供了这样的服务的。。。

大致的流程就是 客户端 通过一个定时的任务每 6 秒从注册中心获取当前的实例列表当发现实例发生了变化的时候发布变更事件。对于订阅者而言完成业务部分的处理更新实例更新本地缓存。

我们可以通过一个流程图观察其具体的实现。。。 原图点这里

image-20230421231128284

其实从图中已经大致的清楚了客户端的这个订阅的整体流程。

我们从源码的角度进分析一波。

进入我们的 NacosNamingService 类当中

//在 NacosNamingService 中暴露了许多的重载的 subscribe 方法
//这里 NacosNamingService 类下的 subscribe 方法 和 NamingService 下的 getAllInstances 发现获取实例列表的方法重载的过程都是一样的
@Override
public void subscribe(String serviceName, EventListener listener) throws NacosException {
    //创建一个空的集群对象集合
    subscribe(serviceName, new ArrayList<String>(), listener);
}
@Override
public void subscribe(String serviceName, List<String> clusters, EventListener listener) throws NacosException {
   //设置默认的群组 DEFAULT_GROUP 默认群组
   subscribe(serviceName, Constants.DEFAULT_GROUP, clusters, listener);
}
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {
    //如果事件监听器为空 则返回
    if (null == listener) {
        return;
    }
    String clusterString = StringUtils.join(clusters, ",");
    //注册监听器
    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
    //对于订阅的本质就是服务的发现的一种方式,也就是服务在发现的时候执行订阅方法,同时触发定时任务去服务端拉去数据
    clientProxy.subscribe(serviceName, groupName, clusterString);
}

可以看到的是 NacosNamingService 中提供了大量的 subscribe 的重载方法这些重载一些默认的参数。

走到 subscribe 方法的尽头在该方法内可以看到有两个核心的方法 InstanceChangeNotifier 类下的registerListener 注册监听器方法与 NamingClientProxy 类下的 subscribe 订阅方法。我们就探究一下这两个方法具体实现以及这两个方法的功能作用是什么?

changeNotifier.registerListener 注册监听器

/**
 * register listener.
 *
 * @param groupName   group name
 * @param serviceName serviceName
 * @param clusters    clusters, concat by ','. such as 'xxx,yyy'
 * @param listener    custom listener
 */
public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
    String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    if (eventListeners == null) {
        synchronized (lock) {
            eventListeners = listenerMap.get(key);
            if (eventListeners == null) {
                eventListeners = new ConcurrentHashSet<EventListener>();
                listenerMap.put(key, eventListeners);
            }
        }
    }
    eventListeners.add(listener);
}
/**
 * deregister listener.
 *
 * @param groupName   group name
 * @param serviceName serviceName
 * @param clusters    clusters, concat by ','. such as 'xxx,yyy'
 * @param listener    custom listener
 */
public void deregisterListener(String groupName, String serviceName, String clusters, EventListener listener) {
    String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    if (eventListeners == null) {
        return;
    }
    eventListeners.remove(listener);
    if (CollectionUtils.isEmpty(eventListeners)) {
        listenerMap.remove(key);
    }
}

可以看到在 InstancesChangeNotifier 类下有两个关于监听器的方法注册监听与取消监听。

注册监听其实就是在监听集合对象 ConcurrentHashSet<EventListener> 中添加一个监听事件而对于取消监听是通过 key 将需要移除的监听事件从集合当中移除。

那么关于这个监听事件添加都监听集合当中后这个监听事件是如何触发又如何调用执行的呢这个。。。哈哈留一个坑其实这一块我自己还没有研究的特别清楚。。。

接下来我们看看另一个重要的方法 clientProxy.subscribe() 服务订阅

clientProxy.subscribe 服务订阅

其实玩到这里呢也就与我们的标题 Nacos 客户的服务发现与客户端服务订阅机制的纠缠就关联了起来为什么这么说呢那让我们看看 clientProxy.subscribe 方法内部的具体实现咯。。。

//其实走到这里就可以看到,该方法与之前的服务发现调用的是同一个方法,这里其实在做的是服务列表的查询
//查询与订阅都调用了同样的而方法
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
    String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
    //开启定时任务调度 UpdateTask
    serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
    //获取缓存中的 ServiceInfo
    ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
    if (null == result) {
        //如果缓存中没有数据,则进行订阅逻辑处理,基于 gRPC 协议
        result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
    }
    //serviceInfo 本地缓存处理
    serviceInfoHolder.processServiceInfo(result);
    return result;
}

哈哈 看到这一块的代码是不是有一种似曾相识的感觉呢

对咯没错在第六篇 Nacos 客户端服务发现源码分析 当中的发现获取实例列表的时候在 NacosNamingService 中的 getAllInstances 方法多次重载之后调用的 clientProxy.sunscribe 调用的是同一个方法。

所以其实到这里是可以得到一个结论的就是 在 Nacos 客户端的查询与订阅服务都是调用了同样的方法的

这就解释了为什么标题 Nacos 客户端的服务发现与服务订阅机制是冥冥之中有种联系在一起的呢。

我们还记得流程图中有一个关于 UpdateTask 定时任务调度吗

让我们接下来看看这个里面到底在做什么呢

定时任务执行内容

//开启定时任务调度 UpdateTask
serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
/**
 * Schedule update if absent.
 *
 * @param serviceName service name
 * @param groupName   group name
 * @param clusters    clusters
 */
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
    String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clu
    if (futureMap.get(serviceKey) != null) {
        return;
    }
    //双重检测锁
    synchronized (futureMap) {
        if (futureMap.get(serviceKey) != null) {
            return;
        }
        //构建一个定时处理的任务,最终这里的 future 就是构建的定时任务,该任务用于在 run 中执行
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
        futureMap.put(serviceKey, future);
    }
}
public UpdateTask(String serviceName, String groupName, String clusters) {
    this.serviceName = serviceName;
    this.groupName = groupName;
    this.clusters = clusters;
    this.groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    this.serviceKey = ServiceInfo.getKey(groupedServiceName, clusters);
}
private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
    //执行延时函数,延时时间为 1000L * MICRO_SCALE = 1S
    return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

可以看到在第二片代码中有这样一个方法addTask () 对的没错就是将通过 serviceName、groupName、cliusters 构建一个 UbdateTask 的更新任务对象然后将其对象构建成一个未来执行的定时任务添加到执行的集合当中最终是由 ServiceInfoUpdateService 中的 run 方法去执行。

定时任务 run() 方法的执行

@Override
public void run() {
    long delayTime = DEFAULT_DELAY;
    
    try {
        //判断更改通知对象 serviceName 是否订阅
        if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKe
            NAMING_LOGGER
                    .info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters);
            return;
        }
        //获取缓存中的信息
        ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
        //缓存为空
        if (serviceObj == null) {
            //生成一个服务实例对象
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false)
            //处理更新或添加到本地的缓存当中
            serviceInfoHolder.processServiceInfo(serviceObj);
            //更新最后一次的时间
            lastRefTime = serviceObj.getLastRefTime();
            return;
        }
        //过期服务如果说服务的更新时间是小于等于缓存刷新的时间的
        //那就说明本地的缓存不是最新的,而当前的服务实例信息也不是客户端最新的,
        //这个时候就需要从 注册中心 中重新的进行一次查询获取最的服务实例信息并更新本地缓存
        if (serviceObj.getLastRefTime() <= lastRefTime) {
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false)
            //更新处理本地的缓存
            serviceInfoHolder.processServiceInfo(serviceObj);
        }
        //刷新更新的当前时间
        lastRefTime = serviceObj.getLastRefTime();
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
            incFailCount();
            return;
        }
        //下次的更新缓存时间设置为缓存中的默认基数 (cacheMillis = 1000) * 6
        // TODO multiple time can be configured.
        delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
        // 重置失败数量为 0
        // 可能会出现一些异常,比如调用 queryInstancesOfService 方法的时候
        // 没有 ServiceInfo 连接不到则会出现异常
        resetFailCount();
    } catch (Throwable e) {
        incFailCount();
        NAMING_LOGGER.warn("[NA] failed to update serviceName: " + groupedServiceName, e);
    } finally {
        // 下次调度刷新时间下次执行的时间与failCount 失败的次数有关failCount=0则下次调度时间为6秒最长为1分钟
        // 当无异常的情况下 failCount 始终都是 0 则默认的时间一直都 6 s
        executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
    }
}

未完待续。。。

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6