千家信息网

nacos中DistroConsistencyServiceImpl的原理和作用是什么

发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,本篇内容介绍了"nacos中DistroConsistencyServiceImpl的原理和作用是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何
千家信息网最后更新 2025年12月03日nacos中DistroConsistencyServiceImpl的原理和作用是什么

本篇内容介绍了"nacos中DistroConsistencyServiceImpl的原理和作用是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

本文主要研究一下nacos的DistroConsistencyServiceImpl

ConsistencyService

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ConsistencyService.java

public interface ConsistencyService {    /**     * Put a data related to a key to Nacos cluster     *     * @param key   key of data, this key should be globally unique     * @param value value of data     * @throws NacosException     * @see     */    void put(String key, Record value) throws NacosException;    /**     * Remove a data from Nacos cluster     *     * @param key key of data     * @throws NacosException     */    void remove(String key) throws NacosException;    /**     * Get a data from Nacos cluster     *     * @param key key of data     * @return data related to the key     * @throws NacosException     */    Datum get(String key) throws NacosException;    /**     * Listen for changes of a data     *     * @param key      key of data     * @param listener callback of data change     * @throws NacosException     */    void listen(String key, RecordListener listener) throws NacosException;    /**     * Cancel listening of a data     *     * @param key      key of data     * @param listener callback of data change     * @throws NacosException     */    void unlisten(String key, RecordListener listener) throws NacosException;    /**     * Tell the status of this consistency service     *     * @return true if available     */    boolean isAvailable();}
  • ConsistencyService定义了put、remove、get、listen、unlisten、isAvailable方法

EphemeralConsistencyService

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/EphemeralConsistencyService.java

public interface EphemeralConsistencyService extends ConsistencyService {}
  • EphemeralConsistencyService接口继承了ConsistencyService接口

DistroConsistencyServiceImpl

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java

@org.springframework.stereotype.Service("distroConsistencyService")public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {    private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {        @Override        public Thread newThread(Runnable r) {            Thread t = new Thread(r);            t.setDaemon(true);            t.setName("com.alibaba.nacos.naming.distro.notifier");            return t;        }    });    @Autowired    private DistroMapper distroMapper;    @Autowired    private DataStore dataStore;    @Autowired    private TaskDispatcher taskDispatcher;    @Autowired    private DataSyncer dataSyncer;    @Autowired    private Serializer serializer;    @Autowired    private ServerListManager serverListManager;    @Autowired    private SwitchDomain switchDomain;    @Autowired    private GlobalConfig globalConfig;    private boolean initialized = false;    public volatile Notifier notifier = new Notifier();    private Map> listeners = new ConcurrentHashMap<>();    private Map syncChecksumTasks = new ConcurrentHashMap<>(16);    @PostConstruct    public void init() {        GlobalExecutor.submit(new Runnable() {            @Override            public void run() {                try {                    load();                } catch (Exception e) {                    Loggers.DISTRO.error("load data failed.", e);                }            }        });        executor.submit(notifier);    }    public void load() throws Exception {        if (SystemUtils.STANDALONE_MODE) {            initialized = true;            return;        }        // size = 1 means only myself in the list, we need at least one another server alive:        while (serverListManager.getHealthyServers().size() <= 1) {            Thread.sleep(1000L);            Loggers.DISTRO.info("waiting server list init...");        }        for (Server server : serverListManager.getHealthyServers()) {            if (NetUtils.localServer().equals(server.getKey())) {                continue;            }            if (Loggers.DISTRO.isDebugEnabled()) {                Loggers.DISTRO.debug("sync from " + server);            }            // try sync data from remote server:            if (syncAllDataFromRemote(server)) {                initialized = true;                return;            }        }    }    //......    public boolean syncAllDataFromRemote(Server server) {        try {            byte[] data = NamingProxy.getAllData(server.getKey());            processData(data);            return true;        } catch (Exception e) {            Loggers.DISTRO.error("sync full data from " + server + " failed!", e);            return false;        }    }    public void processData(byte[] data) throws Exception {        if (data.length > 0) {            Map> datumMap =                serializer.deserializeMap(data, Instances.class);            for (Map.Entry> entry : datumMap.entrySet()) {                dataStore.put(entry.getKey(), entry.getValue());                if (!listeners.containsKey(entry.getKey())) {                    // pretty sure the service not exist:                    if (switchDomain.isDefaultInstanceEphemeral()) {                        // create empty service                        Loggers.DISTRO.info("creating service {}", entry.getKey());                        Service service = new Service();                        String serviceName = KeyBuilder.getServiceName(entry.getKey());                        String namespaceId = KeyBuilder.getNamespace(entry.getKey());                        service.setName(serviceName);                        service.setNamespaceId(namespaceId);                        service.setGroupName(Constants.DEFAULT_GROUP);                        // now validate the service. if failed, exception will be thrown                        service.setLastModifiedMillis(System.currentTimeMillis());                        service.recalculateChecksum();                        listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0)                            .onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);                    }                }            }            for (Map.Entry> entry : datumMap.entrySet()) {                if (!listeners.containsKey(entry.getKey())) {                    // Should not happen:                    Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());                    continue;                }                try {                    for (RecordListener listener : listeners.get(entry.getKey())) {                        listener.onChange(entry.getKey(), entry.getValue().value);                    }                } catch (Exception e) {                    Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);                    continue;                }                // Update data store if listener executed successfully:                dataStore.put(entry.getKey(), entry.getValue());            }        }    }    //......    @Override    public void put(String key, Record value) throws NacosException {        onPut(key, value);        taskDispatcher.addTask(key);    }    @Override    public void remove(String key) throws NacosException {        onRemove(key);        listeners.remove(key);    }    @Override    public Datum get(String key) throws NacosException {        return dataStore.get(key);    }    //......    @Override    public void listen(String key, RecordListener listener) throws NacosException {        if (!listeners.containsKey(key)) {            listeners.put(key, new CopyOnWriteArrayList<>());        }        if (listeners.get(key).contains(listener)) {            return;        }        listeners.get(key).add(listener);    }    @Override    public void unlisten(String key, RecordListener listener) throws NacosException {        if (!listeners.containsKey(key)) {            return;        }        for (RecordListener recordListener : listeners.get(key)) {            if (recordListener.equals(listener)) {                listeners.get(key).remove(listener);                break;            }        }    }    @Override    public boolean isAvailable() {        return isInitialized() || ServerStatus.UP.name().equals(switchDomain.getOverriddenServerStatus());    }    //......}
  • DistroConsistencyServiceImpl实现了EphemeralConsistencyService接口

  • 其init方法会异步执行load方法,该方法会执行syncAllDataFromRemote进行初始化,该方法会通过NamingProxy.getAllData获取data,然后执行processData,它主要是执行回调然后往dataStore添加数据;init方法最后会异步执行Notifier

  • 其put方法会执行onPut方法及taskDispatcher.addTask(key);其remove方法会执行onRemove方法即listeners.remove(key);其get方法直接从dataStore读取;其listen会添加RecordListener;其unlisten则会移除RecordListener;其isAvailable会通过isInitialized及ServerStatus.UP状态来判断

Notifier

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java

    public class Notifier implements Runnable {        private ConcurrentHashMap services = new ConcurrentHashMap<>(10 * 1024);        private BlockingQueue tasks = new LinkedBlockingQueue(1024 * 1024);        public void addTask(String datumKey, ApplyAction action) {            if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {                return;            }            if (action == ApplyAction.CHANGE) {                services.put(datumKey, StringUtils.EMPTY);            }            tasks.add(Pair.with(datumKey, action));        }        public int getTaskSize() {            return tasks.size();        }        @Override        public void run() {            Loggers.DISTRO.info("distro notifier started");            while (true) {                try {                    Pair pair = tasks.take();                    if (pair == null) {                        continue;                    }                    String datumKey = (String) pair.getValue0();                    ApplyAction action = (ApplyAction) pair.getValue1();                    services.remove(datumKey);                    int count = 0;                    if (!listeners.containsKey(datumKey)) {                        continue;                    }                    for (RecordListener listener : listeners.get(datumKey)) {                        count++;                        try {                            if (action == ApplyAction.CHANGE) {                                listener.onChange(datumKey, dataStore.get(datumKey).value);                                continue;                            }                            if (action == ApplyAction.DELETE) {                                listener.onDelete(datumKey);                                continue;                            }                        } catch (Throwable e) {                            Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);                        }                    }                    if (Loggers.DISTRO.isDebugEnabled()) {                        Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",                            datumKey, count, action.name());                    }                } catch (Throwable e) {                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);                }            }        }    }
  • Notifier实现了Runnable接口,其run方法会从LinkedBlockingQueue取task,然后挨个执行listener回调

小结

  • DistroConsistencyServiceImpl实现了EphemeralConsistencyService接口

  • 其init方法会异步执行load方法,该方法会执行syncAllDataFromRemote进行初始化,该方法会通过NamingProxy.getAllData获取data,然后执行processData,它主要是执行回调然后往dataStore添加数据;init方法最后会异步执行Notifier

  • 其put方法会执行onPut方法及taskDispatcher.addTask(key);其remove方法会执行onRemove方法即listeners.remove(key);其get方法直接从dataStore读取;其listen会添加RecordListener;其unlisten则会移除RecordListener;其isAvailable会通过isInitialized及ServerStatus.UP状态来判断

"nacos中DistroConsistencyServiceImpl的原理和作用是什么"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

方法 接口 作用 原理 内容 数据 更多 状态 知识 实用 学有所成 接下来 困境 实际 小结 情况 文章 案例 编带 网站 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 学习网络安全法的目的 网络安全的真实感受 猫和老鼠服务器优化是什么意思 巨杉数据库公司 北京 服务器二手主板家用 杭州塘栖网络软件开发公司 手机软件开发岗位分类 实时数据库包含哪些专业 黑龙江亿农网络技术有限公司 服务器24小时硬盘能用几年 tbase数据库创建函数 吉林服务器机柜供应商 重庆拍拍网络技术有限公司做什么的 义乌世茂微播网络技术有限公司 镇江互联网软件开发 网络赌博棋牌软件开发 广州艾拉网络技术 数据库模型运算法则 我的世界蓝色村民在哪一个服务器 云服务器200m能带多少人 专科软件开发多少钱 如何可以收缩数据库 南京电信dns服务器 长沙纵享网络技术有限公司 人事管理系统需要连接数据库吗 学习网络安全专业哪个学校好 大连东软软件开发工资 常用的电子图书数据库 广元市地质环境数据库 鸿翔网络技术有限公司
0