千家信息网

nacos ServiceManager中UpdatedServiceProcessor的原理和作用是什么

发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,本篇内容主要讲解"nacos ServiceManager中UpdatedServiceProcessor的原理和作用是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小
千家信息网最后更新 2025年12月03日nacos ServiceManager中UpdatedServiceProcessor的原理和作用是什么

本篇内容主要讲解"nacos ServiceManager中UpdatedServiceProcessor的原理和作用是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"nacos ServiceManager中UpdatedServiceProcessor的原理和作用是什么"吧!

本文主要研究一下nacos ServiceManager的UpdatedServiceProcessor

ServiceManager.init

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component@DependsOn("nacosApplicationContext")public class ServiceManager implements RecordListener {    /**     * Map>     */    private Map> serviceMap = new ConcurrentHashMap<>();    private LinkedBlockingDeque toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);    private Synchronizer synchronizer = new ServiceStatusSynchronizer();    private final Lock lock = new ReentrantLock();    @Resource(name = "consistencyDelegate")    private ConsistencyService consistencyService;    @Autowired    private SwitchDomain switchDomain;    @Autowired    private DistroMapper distroMapper;    @Autowired    private ServerListManager serverListManager;    @Autowired    private PushService pushService;    private final Object putServiceLock = new Object();    @PostConstruct    public void init() {        UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);        UtilsAndCommons.SERVICE_UPDATE_EXECUTOR.submit(new UpdatedServiceProcessor());        try {            Loggers.SRV_LOG.info("listen for service meta change");            consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);        } catch (NacosException e) {            Loggers.SRV_LOG.error("listen for service meta change failed!");        }    }    //......}
  • ServiceManager的init方法往UtilsAndCommons.SERVICE_UPDATE_EXECUTOR提交了UpdatedServiceProcessor任务

UpdatedServiceProcessor

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

    private class UpdatedServiceProcessor implements Runnable {        //get changed service from other server asynchronously        @Override        public void run() {            ServiceKey serviceKey = null;            try {                while (true) {                    try {                        serviceKey = toBeUpdatedServicesQueue.take();                    } catch (Exception e) {                        Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");                    }                    if (serviceKey == null) {                        continue;                    }                    GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey));                }            } catch (Exception e) {                Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e);            }        }    }
  • UpdatedServiceProcessor实现了Runnable方法,其run方法会不断循环从toBeUpdatedServicesQueue获取元素,然后使用GlobalExecutor.submitServiceUpdate提交ServiceUpdater

ServiceUpdater

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

    private class ServiceUpdater implements Runnable {        String namespaceId;        String serviceName;        String serverIP;        public ServiceUpdater(ServiceKey serviceKey) {            this.namespaceId = serviceKey.getNamespaceId();            this.serviceName = serviceKey.getServiceName();            this.serverIP = serviceKey.getServerIP();        }        @Override        public void run() {            try {                updatedHealthStatus(namespaceId, serviceName, serverIP);            } catch (Exception e) {                Loggers.SRV_LOG.warn("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}",                    serviceName, serverIP, e);            }        }    }
  • ServiceUpdater实现了Runnable接口,其run方法执行的是updatedHealthStatus

ServiceManager.updatedHealthStatus

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component@DependsOn("nacosApplicationContext")public class ServiceManager implements RecordListener {    /**     * Map>     */    private Map> serviceMap = new ConcurrentHashMap<>();    private LinkedBlockingDeque toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);    private Synchronizer synchronizer = new ServiceStatusSynchronizer();    private final Lock lock = new ReentrantLock();    @Resource(name = "consistencyDelegate")    private ConsistencyService consistencyService;    @Autowired    private SwitchDomain switchDomain;    @Autowired    private DistroMapper distroMapper;    @Autowired    private ServerListManager serverListManager;    @Autowired    private PushService pushService;    private final Object putServiceLock = new Object();    //......    public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) {        Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));        JSONObject serviceJson = JSON.parseObject(msg.getData());        JSONArray ipList = serviceJson.getJSONArray("ips");        Map ipsMap = new HashMap<>(ipList.size());        for (int i = 0; i < ipList.size(); i++) {            String ip = ipList.getString(i);            String[] strings = ip.split("_");            ipsMap.put(strings[0], strings[1]);        }        Service service = getService(namespaceId, serviceName);        if (service == null) {            return;        }        boolean changed = false;        List instances = service.allIPs();        for (Instance instance : instances) {            boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIPAddr()));            if (valid != instance.isHealthy()) {                changed = true;                instance.setHealthy(valid);                Loggers.EVT_LOG.info("{} {SYNC} IP-{} : {}@{}{}",                    serviceName, (instance.isHealthy() ? "ENABLED" : "DISABLED"),                    instance.getIp(), instance.getPort(), instance.getClusterName());            }        }        if (changed) {            pushService.serviceChanged(service);        }        StringBuilder stringBuilder = new StringBuilder();        List allIps = service.allIPs();        for (Instance instance : allIps) {            stringBuilder.append(instance.toIPAddr()).append("_").append(instance.isHealthy()).append(",");        }        if (changed && Loggers.EVT_LOG.isDebugEnabled()) {            Loggers.EVT_LOG.debug("[HEALTH-STATUS-UPDATED] namespace: {}, service: {}, ips: {}",                service.getNamespaceId(), service.getName(), stringBuilder.toString());        }    }    //......}
  • updatedHealthStatus方法会从synchronizer获取msg,组装ipsMap,之后通过service.allIPs()获取instances信息,然后遍历instances从ipsMap获取实例的valid状态,如果与instance的isHealthy()对不上则标记为changed,更新instance的healthy;对于changed的则通过pushService.serviceChanged(service)发布事件,最后打印日志

小结

  • ServiceManager的init方法往UtilsAndCommons.SERVICE_UPDATE_EXECUTOR提交了UpdatedServiceProcessor任务

  • UpdatedServiceProcessor实现了Runnable方法,其run方法会不断循环从toBeUpdatedServicesQueue获取元素,然后使用GlobalExecutor.submitServiceUpdate提交ServiceUpdater

  • ServiceUpdater实现了Runnable接口,其run方法执行的是updatedHealthStatus

到此,相信大家对"nacos ServiceManager中UpdatedServiceProcessor的原理和作用是什么"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0