千家信息网

如何用源码分析canal的deployer模块

发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这期内容当中小编将会给大家带来有关如何用源码分析canal的deployer模块,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。CanalLauncher是启动入口类
千家信息网最后更新 2025年12月02日如何用源码分析canal的deployer模块

这期内容当中小编将会给大家带来有关如何用源码分析canal的deployer模块,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

  • CanalLauncher是启动入口类

  1. 获取canal.properties配置文件

  2. 如果canal.properties配置文件中属性root.admin.manager有值,那么构造PlainCanalConfigClient,调用PlainCanalConfigClient的findServer获取PlainCanal,调用PlainCanal的getProperties方法获取properties

  3. 通过properties构造 CanalStarter并调用其start方法

CanalStarter是启动类

public synchronized void start() throws Throwable {        //首先根据canal.serverMode构造CanalMQProducer,如果是kafka,构造的是CanalKafkaProducer        String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);        if (serverMode.equalsIgnoreCase("kafka")) {            canalMQProducer = new CanalKafkaProducer();        } else if (serverMode.equalsIgnoreCase("rocketmq")) {            canalMQProducer = new CanalRocketMQProducer();        }        if (canalMQProducer != null) {            // disable netty            System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");            // 设置为raw避免ByteString->Entry的二次解析            System.setProperty("canal.instance.memory.rawEntry", "false");        }        //接下来构造CanalController并调用其start方法        logger.info("## start the canal server.");        controller = new CanalController(properties);        controller.start();        logger.info("## the canal server is running now ......");        ...        //构造CanalMQStarter并调用其start方法,同时设置为CanalController的属性        if (canalMQProducer != null) {            canalMQStarter = new CanalMQStarter(canalMQProducer);            MQProperties mqProperties = buildMQProperties(properties);            String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);            canalMQStarter.start(mqProperties, destinations);            controller.setCanalMQStarter(canalMQStarter);        }        ...        running = true;    }
  • CanalController是实例调度控制器

public CanalController(final Properties properties){        // 初始化managerClients用于请求admin        managerClients = MigrateMap.makeComputingMap(new Function() {            public PlainCanalConfigClient apply(String managerAddress) {                return getManagerClient(managerAddress);            }        });        // 初始化全局参数设置,包含了全局mode、lazy、managerAddress、springXml,初始化instanceGenerator用于创建instance,其根据InstanceConfig的mode值使用PlainCanalInstanceGenerator或者SpringCanalInstanceGenerator创建CanalInstance        globalInstanceConfig = initGlobalConfig(properties);        instanceConfigs = new MapMaker().makeMap();        // 初始化instance config,包含了实例mode、lazy、managerAddress、springXml        initInstanceConfig(properties);        ...        // 初始化CanalServerWithEmbedded,将instanceGenerator设置为CanalServerWithEmbedded的属性        embededCanalServer = CanalServerWithEmbedded.instance();        embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator        int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112"));        embededCanalServer.setMetricsPort(metricsPort);        this.adminUser = getProperty(properties, CanalConstants.CANAL_ADMIN_USER);        this.adminPasswd = getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);        embededCanalServer.setUser(getProperty(properties, CanalConstants.CANAL_USER));        embededCanalServer.setPasswd(getProperty(properties, CanalConstants.CANAL_PASSWD));        ...        final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);        //初始化ZkClientx用于canal集群部署,创建/otteradmin/canal/destinations节点和/otteradmin/canal/cluster节点        if (StringUtils.isNotEmpty(zkServers)) {            zkclientx = ZkClientx.getZkClient(zkServers);            // 初始化系统目录            zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);            zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);        }        // 初始化ServerRunningMonitors的ServerRunningMonitor,用于启动、关闭实例        final ServerRunningData serverData = new ServerRunningData(registerIp + ":" + port);        ServerRunningMonitors.setServerData(serverData);        ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function() {            ...        }));        // 初始化InstanceAction,用于启动和关闭实例        autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));        if (autoScan) {            defaultAction = new InstanceAction() {                ...            };            // 初始化instanceConfigMonitors,用于获取所有instanceConfig并启动所有instance            instanceConfigMonitors = MigrateMap.makeComputingMap(new Function() {                public InstanceConfigMonitor apply(InstanceMode mode) {                    ...                }            });        }    }
  • ManagerInstanceConfigMonitor是实例扫描器

public void start() {        super.start();        //启动定时任务,定时扫描所有instance        executor.scheduleWithFixedDelay(new Runnable() {            public void run() {                try {                    scan();                    if (isFirst) {                        isFirst = false;                    }                } catch (Throwable e) {                    logger.error("scan failed", e);                }            }        }, 0, scanIntervalInSecond, TimeUnit.SECONDS);    }private void scan() {        //缓存了所有instance的配置,如果发现有新的instance则启动或者修改了instance则重启        String instances = configClient.findInstances(null);        final List is = Lists.newArrayList(StringUtils.split(instances, ','));        List start = Lists.newArrayList();        List stop = Lists.newArrayList();        List restart = Lists.newArrayList();        for (String instance : is) {            if (!configs.containsKey(instance)) {                PlainCanal newPlainCanal = configClient.findInstance(instance, null);                if (newPlainCanal != null) {                    configs.put(instance, newPlainCanal);                    start.add(instance);                }            } else {                PlainCanal plainCanal = configs.get(instance);                PlainCanal newPlainCanal = configClient.findInstance(instance, plainCanal.getMd5());                if (newPlainCanal != null) {                    // 配置有变化                    restart.add(instance);                    configs.put(instance, newPlainCanal);                }            }        }        configs.forEach((instance, plainCanal) -> {            if (!is.contains(instance)) {                stop.add(instance);            }        });        stop.forEach(instance -> {            notifyStop(instance);        });        restart.forEach(instance -> {            notifyReload(instance);        });        start.forEach(instance -> {            notifyStart(instance);        });    }private void notifyStart(String destination) {        try {            //启动instance调用InstanceAction启动实例,最后是调用ServerRunningMonitor启动实例            defaultAction.start(destination);            actions.put(destination, defaultAction);            // 启动成功后记录配置文件信息        } catch (Throwable e) {            logger.error(String.format("scan add found[%s] but start failed", destination), e);        }    }
  • ServerRunningMonitor是针对server的running实例控制

public ServerRunningMonitor(){        // 创建父节点        dataListener = new IZkDataListener() {            public void handleDataChange(String dataPath, Object data) throws Exception {                MDC.put("destination", destination);                ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);                if (!isMine(runningData.getAddress())) {                    mutex.set(false);                }                if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active                    releaseRunning();// 彻底释放mainstem                }                activeData = (ServerRunningData) runningData;            }            public void handleDataDeleted(String dataPath) throws Exception {                MDC.put("destination", destination);                mutex.set(false);                if (!release && activeData != null && isMine(activeData.getAddress())) {                    // 如果上一次active的状态就是本机,则即时触发一下active抢占                    initRunning();                } else {                    // 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作                    delayExector.schedule(new Runnable() {                        public void run() {                            initRunning();                        }                    }, delayTime, TimeUnit.SECONDS);                }            }        };    }public synchronized void start() {        super.start();        try {            //首先调用listener的processStart方法            processStart();            if (zkClient != null) {                // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start                // 监视/otteradmin/canal/destinations/{0}/running节点变化                String path = ZookeeperPathUtils.getDestinationServerRunning(destination);                zkClient.subscribeDataChanges(path, dataListener);                                initRunning();            } else {                processActiveEnter();// 没有zk,直接启动            }        } catch (Exception e) {            logger.error("start failed", e);            // 没有正常启动,重置一下状态,避免干扰下一次start            stop();        }    }private void processStart() {        if (listener != null) {            try {                //processStart方法中创建/otteradmin/canal/destinations/{0}/cluster/{1}节点,0是实例名称,1是当前节点ip:port                listener.processStart();            } catch (Exception e) {                logger.error("processStart failed", e);            }        }    }private void initRunning() {        if (!isStart()) {            return;        }        String path = ZookeeperPathUtils.getDestinationServerRunning(destination);        // 序列化        byte[] bytes = JsonUtils.marshalToByte(serverData);        try {            mutex.set(false);            //尝试创建/otteradmin/canal/destinations/{0}/running节点            zkClient.create(path, bytes, CreateMode.EPHEMERAL);            activeData = serverData;            //如果成功则调用listener的processEnter方法,processEnter方法中调用CanalServerWithEmbedded的start方法启动实例和CanalMQStarter的start方法启动实例            processActiveEnter();// 触发一下事件            mutex.set(true);            release = false;        } catch (ZkNodeExistsException e) {            bytes = zkClient.readData(path, true);            if (bytes == null) {// 如果不存在节点,立即尝试一次                initRunning();            } else {                activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);            }        } catch (ZkNoNodeException e) {            zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点            initRunning();        }    }
  • canal.properties配置

canal.register.ip =canal.admin.manager = 127.0.0.1:8089canal.admin.port = 11110canal.admin.user = admincanal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441canal.admin.register.auto = truecanal.admin.register.cluster =

上述就是小编为大家分享的如何用源码分析canal的deployer模块了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。

实例 方法 节点 配置 分析 就是 属性 文件 尝试 模块 源码 成功 全局 内容 机器 状态 本机 变化 控制 频繁 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 流媒体服务器管理服务 网络安全为什么会重点议题 百色计算机网络技术培训 软件开发公司相关新闻 LOSER音译软件开发 游戏mysql数据库修改方法 北京郭毅网络技术 提高软件开发人员的执行力 奇妙林森网络技术有限公司 软件开发初级设计 武汉小程序软件开发服务费 赵玉灿软件开发 gitlab 服务器 榆树有名的网络技术哪家好 table显示数据库所有信息 车联网部标平台软件开发 我的世界地球服务器怎么进入 中国生物文献数据库在哪里注册 法信是常用的法律法规数据库 北京咨询云控软件开发商 北京聚汇融盛互联网科技有限公司 服务器如何挂载光驱 软件开发的活好接么 新乡市博易网络技术有限公司 代号探戈第四关服务器核心 软件开发的大师都有谁 数据库字段是否为空的依据 网络安全创始人 花生壳弄游戏服务器需要买流量不 应届生面试软件开发工程师
0