千家信息网

nacos ServiceManager的updateInstance有什么作用

发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,本篇内容介绍了"nacos ServiceManager的updateInstance有什么作用"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这
千家信息网最后更新 2025年12月02日nacos ServiceManager的updateInstance有什么作用

本篇内容介绍了"nacos ServiceManager的updateInstance有什么作用"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

本文主要研究一下nacos ServiceManager的updateInstance

ServiceManager

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component@DependsOn("nacosApplicationContext")public class ServiceManager implements RecordListener {    /**     * Map>     */    private Map> serviceMap = new ConcurrentHashMap<>();    private LinkedBlockingDeque toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);    private Synchronizer synchronizer = new ServiceStatusSynchronizer();    private final Lock lock = new ReentrantLock();    @Resource(name = "consistencyDelegate")    private ConsistencyService consistencyService;    @Autowired    private SwitchDomain switchDomain;    @Autowired    private DistroMapper distroMapper;    @Autowired    private ServerListManager serverListManager;    @Autowired    private PushService pushService;    private final Object putServiceLock = new Object();    //......    public void updateInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {        Service service = getService(namespaceId, serviceName);        if (service == null) {            throw new NacosException(NacosException.INVALID_PARAM,                "service not found, namespace: " + namespaceId + ", service: " + serviceName);        }        if (!service.allIPs().contains(instance)) {            throw new NacosException(NacosException.INVALID_PARAM, "instance not exist: " + instance);        }        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);    }    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);        Service service = getService(namespaceId, serviceName);        List instanceList = addIpAddresses(service, ephemeral, ips);        Instances instances = new Instances();        instances.setInstanceList(instanceList);        consistencyService.put(key, instances);    }    public List addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {        return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);    }    public List updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {        Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));        Map oldInstanceMap = new HashMap<>(16);        List currentIPs = service.allIPs(ephemeral);        Map map = new ConcurrentHashMap<>(currentIPs.size());        for (Instance instance : currentIPs) {            map.put(instance.toIPAddr(), instance);        }        if (datum != null) {            oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(), map);        }        // use HashMap for deep copy:        HashMap instanceMap = new HashMap<>(oldInstanceMap.size());        instanceMap.putAll(oldInstanceMap);        for (Instance instance : ips) {            if (!service.getClusterMap().containsKey(instance.getClusterName())) {                Cluster cluster = new Cluster(instance.getClusterName(), service);                cluster.init();                service.getClusterMap().put(instance.getClusterName(), cluster);                Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",                    instance.getClusterName(), instance.toJSON());            }            if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {                instanceMap.remove(instance.getDatumKey());            } else {                instanceMap.put(instance.getDatumKey(), instance);            }        }        if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {            throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: "                + JSON.toJSONString(instanceMap.values()));        }        return new ArrayList<>(instanceMap.values());    }    //...... }
  • updateInstance会通过service.allIPs().contains(instance)校验要更新的instance是否存在,不存在则抛出NacosException,存在则执行addInstance方法

  • addInstance方法它会获取service,然后执行addIpAddresses,最后执行consistencyService.put;addIpAddresses调用的是updateIpAddresses方法,其action参数为UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD

  • updateIpAddresses方法首先从consistencyService获取datum,然后通过service.allIPs方法获取currentIPs,之后根据datum设置oldInstanceMap,对于UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE类型执行删除,其余的action则将instance方法到instanceMap中

DistroConsistencyServiceImpl.put

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java

@org.springframework.stereotype.Service("distroConsistencyService")public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {    private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {        @Override        public Thread newThread(Runnable r) {            Thread t = new Thread(r);            t.setDaemon(true);            t.setName("com.alibaba.nacos.naming.distro.notifier");            return t;        }    });    @Autowired    private DistroMapper distroMapper;    @Autowired    private DataStore dataStore;    @Autowired    private TaskDispatcher taskDispatcher;    @Autowired    private DataSyncer dataSyncer;    @Autowired    private Serializer serializer;    @Autowired    private ServerListManager serverListManager;    @Autowired    private SwitchDomain switchDomain;    @Autowired    private GlobalConfig globalConfig;    private boolean initialized = false;    public volatile Notifier notifier = new Notifier();    private Map> listeners = new ConcurrentHashMap<>();    private Map syncChecksumTasks = new ConcurrentHashMap<>(16);    //......    public void put(String key, Record value) throws NacosException {        onPut(key, value);        taskDispatcher.addTask(key);    }    public void onPut(String key, Record value) {        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {            Datum datum = new Datum<>();            datum.value = (Instances) value;            datum.key = key;            datum.timestamp.incrementAndGet();            dataStore.put(key, datum);        }        if (!listeners.containsKey(key)) {            return;        }        notifier.addTask(key, ApplyAction.CHANGE);    }    //......}
  • DistroConsistencyServiceImpl的put方法会先执行onPut,然后执行taskDispatcher.addTask(key);onPut在判断key是ephemeralInstanceListKey时会创建一个Datum,递增其timestamp,然后放到dataStore中,最后调用notifier.addTask(key, ApplyAction.CHANGE)

Notifier.addTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java

    public class Notifier implements Runnable {        private ConcurrentHashMap services = new ConcurrentHashMap<>(10 * 1024);        private BlockingQueue tasks = new LinkedBlockingQueue(1024 * 1024);        public void addTask(String datumKey, ApplyAction action) {            if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {                return;            }            if (action == ApplyAction.CHANGE) {                services.put(datumKey, StringUtils.EMPTY);            }            tasks.add(Pair.with(datumKey, action));        }        public int getTaskSize() {            return tasks.size();        }        @Override        public void run() {            Loggers.DISTRO.info("distro notifier started");            while (true) {                try {                    Pair pair = tasks.take();                    if (pair == null) {                        continue;                    }                    String datumKey = (String) pair.getValue0();                    ApplyAction action = (ApplyAction) pair.getValue1();                    services.remove(datumKey);                    int count = 0;                    if (!listeners.containsKey(datumKey)) {                        continue;                    }                    for (RecordListener listener : listeners.get(datumKey)) {                        count++;                        try {                            if (action == ApplyAction.CHANGE) {                                listener.onChange(datumKey, dataStore.get(datumKey).value);                                continue;                            }                            if (action == ApplyAction.DELETE) {                                listener.onDelete(datumKey);                                continue;                            }                        } catch (Throwable e) {                            Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);                        }                    }                    if (Loggers.DISTRO.isDebugEnabled()) {                        Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",                            datumKey, count, action.name());                    }                } catch (Throwable e) {                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);                }            }        }    }
  • Notifier的addTask方法对于action为ApplyAction.CHANGE的且不在services当中的会放入到services当中,最后添加到tasks;run方法会不断从tasks取出数据,执行相应的回调

TaskDispatcher.addTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/TaskDispatcher.java

@Componentpublic class TaskDispatcher {    @Autowired    private GlobalConfig partitionConfig;    @Autowired    private DataSyncer dataSyncer;    private List taskSchedulerList = new ArrayList<>();    private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();    @PostConstruct    public void init() {        for (int i = 0; i < cpuCoreCount; i++) {            TaskScheduler taskScheduler = new TaskScheduler(i);            taskSchedulerList.add(taskScheduler);            GlobalExecutor.submitTaskDispatch(taskScheduler);        }    }    public void addTask(String key) {        taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);    }    public class TaskScheduler implements Runnable {        private int index;        private int dataSize = 0;        private long lastDispatchTime = 0L;        private BlockingQueue queue = new LinkedBlockingQueue<>(128 * 1024);        public TaskScheduler(int index) {            this.index = index;        }        public void addTask(String key) {            queue.offer(key);        }        public int getIndex() {            return index;        }        @Override        public void run() {            List keys = new ArrayList<>();            while (true) {                try {                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),                        TimeUnit.MILLISECONDS);                    if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {                        Loggers.DISTRO.debug("got key: {}", key);                    }                    if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {                        continue;                    }                    if (StringUtils.isBlank(key)) {                        continue;                    }                    if (dataSize == 0) {                        keys = new ArrayList<>();                    }                    keys.add(key);                    dataSize++;                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {                        for (Server member : dataSyncer.getServers()) {                            if (NetUtils.localServer().equals(member.getKey())) {                                continue;                            }                            SyncTask syncTask = new SyncTask();                            syncTask.setKeys(keys);                            syncTask.setTargetServer(member.getKey());                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {                                Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));                            }                            dataSyncer.submit(syncTask, 0);                        }                        lastDispatchTime = System.currentTimeMillis();                        dataSize = 0;                    }                } catch (Exception e) {                    Loggers.DISTRO.error("dispatch sync task failed.", e);                }            }        }    }}
  • TaskDispatcher的addTask方法会从taskSchedulerList获取指定的TaskScheduler,然后执行其addTask方法;TaskScheduler的addTask方法会往queue中添加数据,而run方法则不断从queue取数据,然后通过dataSyncer执行syncTask

SyncTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/SyncTask.java

public class SyncTask {    private List keys;    private int retryCount;    private long lastExecuteTime;    private String targetServer;    public List getKeys() {        return keys;    }    public void setKeys(List keys) {        this.keys = keys;    }    public int getRetryCount() {        return retryCount;    }    public void setRetryCount(int retryCount) {        this.retryCount = retryCount;    }    public long getLastExecuteTime() {        return lastExecuteTime;    }    public void setLastExecuteTime(long lastExecuteTime) {        this.lastExecuteTime = lastExecuteTime;    }    public String getTargetServer() {        return targetServer;    }    public void setTargetServer(String targetServer) {        this.targetServer = targetServer;    }}
  • SyncTask包含了keys、targetServer属性,其中targetServer用于告诉DataSyncer该往哪个server执行sync操作

小结

  • updateInstance会通过service.allIPs().contains(instance)校验要更新的instance是否存在,不存在则抛出NacosException,存在则执行addInstance方法

  • addInstance方法它会获取service,然后执行addIpAddresses,最后执行consistencyService.put;addIpAddresses调用的是updateIpAddresses方法,其action参数为UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD

  • updateIpAddresses方法首先从consistencyService获取datum,然后通过service.allIPs方法获取currentIPs,之后根据datum设置oldInstanceMap,对于UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE类型执行删除,其余的action则将instance方法到instanceMap中

"nacos ServiceManager的updateInstance有什么作用"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

方法 数据 作用 不断 内容 参数 更多 知识 类型 更新 实用 学有所成 接下来 会创 困境 实际 小结 属性 情况 文章 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 电脑网络安全证书出现问题 深渊数据库五间竞速 深圳市财信网络技术有限公司 数字化转型传统数据库问题 吃鸡游戏用什么软件开发 北京班信息网络技术有限公司 崇明区品质数据库服务商服务电话 网络安全与国际贸易 金山网络技术有限公司 人力资源网络技术专业怎么样 泰国的服务器租用 城管局网络安全工作总结 七日杀服务器后台管理 小学生网络安全知识判断题 我的世界末日服务器手机版推荐 网吧怎么制作电影服务器 祥云杯网络安全比赛题 ovid数据库的检索技术 花雨落服务器ip 数据库自动化巡检脚本 c 如何监听数据库变化 数据库数据论文 数据库原理岭南师范学院答案 嘉定区新能源软件开发销售厂家 网络安全与国际贸易 某网站的数据库泄露 武汉时捷软件开发有限公司 mc网易版能玩速建的服务器 职工信息数据库 行星边际2服务器地址
0