千家信息网

dubbo中ExecutionDispatcher的作用是什么

发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,本篇文章为大家展示了dubbo中ExecutionDispatcher的作用是什么,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。ExecutionDispat
千家信息网最后更新 2025年12月01日dubbo中ExecutionDispatcher的作用是什么

本篇文章为大家展示了dubbo中ExecutionDispatcher的作用是什么,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

ExecutionDispatcher

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionDispatcher.java

public class ExecutionDispatcher implements Dispatcher {    public static final String NAME = "execution";    @Override    public ChannelHandler dispatch(ChannelHandler handler, URL url) {        return new ExecutionChannelHandler(handler, url);    }}
  • ExecutionDispatcher实现了Dispatcher接口,其dispatch方法返回的是ExecutionChannelHandler

ExecutionChannelHandler

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java

public class ExecutionChannelHandler extends WrappedChannelHandler {    public ExecutionChannelHandler(ChannelHandler handler, URL url) {        super(handler, url);    }    @Override    public void received(Channel channel, Object message) throws RemotingException {        ExecutorService executor = getExecutorService();        if (message instanceof Request) {            try {                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));            } catch (Throwable t) {                // FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,                // therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent                // this scenario from happening, but a better solution should be considered later.                if (t instanceof RejectedExecutionException) {                    Request request = (Request) message;                    if (request.isTwoWay()) {                        String msg = "Server side(" + url.getIp() + "," + url.getPort()                                + ") thread pool is exhausted, detail msg:" + t.getMessage();                        Response response = new Response(request.getId(), request.getVersion());                        response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);                        response.setErrorMessage(msg);                        channel.send(response);                        return;                    }                }                throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);            }        } else {            handler.received(channel, message);        }    }}
  • ExecutionChannelHandler继承了WrappedChannelHandler,其received方法判断message是否是Request类型,如果是则创建ChannelEventRunnable放到线程池里头执行,如果不是则直接执行handler.received

PerformanceServerTest

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceServerTest.java

public class PerformanceServerTest  {    private static final Logger logger = LoggerFactory.getLogger(PerformanceServerTest.class);    private static ExchangeServer server = null;    private static void restartServer(int times, int alive, int sleep) throws Exception {        if (server != null && !server.isClosed()) {            server.close();            Thread.sleep(100);        }        for (int i = 0; i < times; i++) {            logger.info("restart times:" + i);            server = statServer();            if (alive > 0) Thread.sleep(alive);            server.close();            if (sleep > 0) Thread.sleep(sleep);        }        server = statServer();    }    private static ExchangeServer statServer() throws Exception {        final int port = PerformanceUtils.getIntProperty("port", 9911);        final String transporter = PerformanceUtils.getProperty(Constants.TRANSPORTER_KEY, Constants.DEFAULT_TRANSPORTER);        final String serialization = PerformanceUtils.getProperty(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION);        final String threadpool = PerformanceUtils.getProperty(THREADPOOL_KEY, DEFAULT_THREADPOOL);        final int threads = PerformanceUtils.getIntProperty(THREADS_KEY, DEFAULT_THREADS);        final int iothreads = PerformanceUtils.getIntProperty(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS);        final int buffer = PerformanceUtils.getIntProperty(BUFFER_KEY, DEFAULT_BUFFER_SIZE);        final String channelHandler = PerformanceUtils.getProperty(Constants.DISPATCHER_KEY, ExecutionDispatcher.NAME);        // Start server        ExchangeServer server = Exchangers.bind("exchange://0.0.0.0:" + port + "?transporter="                + transporter + "&serialization="                + serialization + "&threadpool=" + threadpool                + "&threads=" + threads + "&iothreads=" + iothreads + "&buffer=" + buffer + "&channel.handler=" + channelHandler, new ExchangeHandlerAdapter() {            public String telnet(Channel channel, String message) throws RemotingException {                return "echo: " + message + "\r\ntelnet> ";            }            public CompletableFuture reply(ExchangeChannel channel, Object request) throws RemotingException {                if ("environment".equals(request)) {                    return CompletableFuture.completedFuture(PerformanceUtils.getEnvironment());                }                if ("scene".equals(request)) {                    List scene = new ArrayList();                    scene.add("Transporter: " + transporter);                    scene.add("Service Threads: " + threads);                    return CompletableFuture.completedFuture(scene);                }                return CompletableFuture.completedFuture(request);            }        });        return server;    }    private static ExchangeServer statTelnetServer(int port) throws Exception {        // Start server        ExchangeServer telnetserver = Exchangers.bind("exchange://0.0.0.0:" + port, new ExchangeHandlerAdapter() {            public String telnet(Channel channel, String message) throws RemotingException {                if (message.equals("help")) {                    return "support cmd: \r\n\tstart \r\n\tstop \r\n\tshutdown \r\n\trestart times [alive] [sleep] \r\ntelnet>";                } else if (message.equals("stop")) {                    logger.info("server closed:" + server);                    server.close();                    return "stop server\r\ntelnet>";                } else if (message.startsWith("start")) {                    try {                        restartServer(0, 0, 0);                    } catch (Exception e) {                        e.printStackTrace();                    }                    return "start server\r\ntelnet>";                } else if (message.startsWith("shutdown")) {                    System.exit(0);                    return "start server\r\ntelnet>";                } else if (message.startsWith("channels")) {                    return "server.getExchangeChannels():" + server.getExchangeChannels().size() + "\r\ntelnet>";                } else if (message.startsWith("restart ")) { //r times [sleep] r 10 or r 10 100                    String[] args = message.split(" ");                    int times = Integer.parseInt(args[1]);                    int alive = args.length > 2 ? Integer.parseInt(args[2]) : 0;                    int sleep = args.length > 3 ? Integer.parseInt(args[3]) : 100;                    try {                        restartServer(times, alive, sleep);                    } catch (Exception e) {                        e.printStackTrace();                    }                    return "restart server,times:" + times + " stop alive time: " + alive + ",sleep time: " + sleep + " usage:r times [alive] [sleep] \r\ntelnet>";                } else {                    return "echo: " + message + "\r\ntelnet> ";                }            }        });        return telnetserver;    }    @Test    public void testServer() throws Exception {        // Read port from property        if (PerformanceUtils.getProperty("port", null) == null) {            logger.warn("Please set -Dport=9911");            return;        }        final int port = PerformanceUtils.getIntProperty("port", 9911);        final boolean telnet = PerformanceUtils.getBooleanProperty("telnet", true);        if (telnet) statTelnetServer(port + 1);        server = statServer();        synchronized (PerformanceServerTest.class) {            while (true) {                try {                    PerformanceServerTest.class.wait();                } catch (InterruptedException e) {                }            }        }    }}
  • PerformanceServerTest的statServer方法使用PerformanceUtils.getProperty(Constants.DISPATCHER_KEY, ExecutionDispatcher.NAME)获取channelHandler,找不到则使用ExecutionDispatcher.NAME

上述内容就是dubbo中ExecutionDispatcher的作用是什么,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注行业资讯频道。

方法 作用 内容 技能 知识 简明 简明扼要 就是 接口 文章 更多 篇文章 类型 线程 行业 资讯 资讯频道 频道 池里 一亮 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 打开本地数据库服务器 创业网络技术培训费用多少 公安网络安全教育宣传活动 软件开发所需的设备有哪些 网络安全为人民的文案 镇网络安全属于哪个部门 软件开发技术人员评级 南通奥鹏软件开发有限公司 软件不用数据库怎么打开 杭州刀塔网络技术有限公司 达梦数据库在麒麟系统启动 网络安全攻防演习是什么 上海市贸易学校计算机网络技术 遵义民宿软件开发 mac客户端软件开发 石家庄软件开发服务价格 重庆应用软件开发机构 软件开发成本如何资本化 数据库表最多能包含多少条记录 ebsco数据库的使用 商丘注册网络安全工程师证 菏泽网络安全公司 网络安全就业率多少 派出所 网络安全讲座 数据库中模块的英文是 下列属于云数据库产品的是 刺激战场外服如何更改服务器 增强网络安全简讯 415网络安全宣传活动总结 华为服务器安装欧拉系统教程
0