netty中pipeline如何添加handler
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,这篇文章将为大家详细讲解有关netty中pipeline如何添加handler,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。pipeline无疑是 netty中非常
千家信息网最后更新 2025年12月01日netty中pipeline如何添加handler
这篇文章将为大家详细讲解有关netty中pipeline如何添加handler,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
pipeline无疑是 netty中非常重要的一环,在io处理中netty通过已存放在pipeline中的handler有序处理socket中的信息,而pipeline中包含的都是一个个的handler,它是怎么添加进去的呢
创建代码如下:其中有handler和childhandler
public void start() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap sb = new ServerBootstrap(); sb.option(ChannelOption.SO_BACKLOG, 1024); // 绑定线程池 sb.group(group, bossGroup) // 指定使用的channel .channel(NioServerSocketChannel.class) // 绑定监听端口 .localAddress(this.port) .handler(new LoggingHandler(LogLevel.INFO)) // 绑定客户端连接时候触发操作 .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast(new LengthFieldPrepender(4)); //字符串解码 pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); //字符串编码 pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new IdleStateHandler(0, 0, 120, TimeUnit.SECONDS)); } }); // 服务器异步创建绑定 ChannelFuture cf = sb.bind().sync(); log.info(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress()); // 关闭服务器通道 cf.channel().closeFuture().sync(); } finally { // 释放线程池资源 group.shutdownGracefully().sync(); bossGroup.shutdownGracefully().sync(); } } 顺着服务端bind方法点进去会看到如下:创建完channel实例后 会调用一个init方法
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }init方法为抽象方法 服务端实现如下
void init(Channel channel) throws Exception {//省略若干 ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry, Object>[] currentChildOptions; final Entry, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } //看这里 添加了一个ChannelInitializer实例 实例中实现了initChannel 获取配置的handler添加到pipeline p.addLast(new ChannelInitializer() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); } 而pipeline的addlast方法如下
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); //这方法很简单 添加到链表中 但是此时添加的是上文的ChannelInitializer 而不是我们定义的handler addLast0(newCtx); // If the registered is false it means that the channel was not registered on an eventLoop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { callHandlerAddedInEventLoop(newCtx, executor); return this; } } //这个方法就是关键 将ChannelInitializer中的handler添加到pipeline中 以及删除ChannelInitializer //但请注意 首次注册不会走到这里 在前面的判断就会返回 会在注册的逻辑中触发 最终调用的一致 callHandlerAdded0(newCtx); return this; }查看ChannelInitializer中的关键代码如下 OK 通过如下代码我们就看到 调用了ChannelInitializer的initChannel方法 就是在最开始我们定义的那个,然后里面就用继续调用pipeline add我们真正的handler
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { // This should always be true with our current DefaultChannelPipeline implementation. // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers // will be added in the expected order. if (initChannel(ctx)) { // We are done with init the Channel, removing the initializer now. removeState(ctx); } } } private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { // Guard against re-entrance. try { initChannel((C) ctx.channel()); } catch (Throwable cause) { // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...). // We do so to prevent multiple calls to initChannel(...). exceptionCaught(ctx, cause); } finally { //看这里删除pipeline中的 ChannelInitializer ChannelInitializer 其实就是一个handler 只是它重写了handlerAdded方法 ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { pipeline.remove(this); } } return true; } return false; }关于"netty中pipeline如何添加handler"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
方法
服务
代码
实例
就是
篇文章
关键
字符
字符串
更多
服务器
线程
处理
监听
不错
实用
有序
重要
一致
上文
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
如何减少软件开发的成本
专升本计算机导论数据库
西门子医疗软件开发
什么是软件开发费用
登入游戏服务器超时是什么意思
网络安全知识连环画
车停到服务器超6个小时
服务器网口灯橙色闪烁
粘土服务器举报指令
网络安全法主题宣传课班会
数据库查询中新增字段
食品安全标准数据库英文版
湖南管理软件开发费用
网络安全金融业务
国家网络安全快板
普陀区常见网络技术问答知识
昆明适合软件开发
怎么租国外的服务器
长宁区企业网络技术服务信息推荐
互联网金融科技本质
滁州机架式服务器哪家好
汉中士亢网络技术有限公司
怎样做好一个数据库
我的世界酷跑服务器
软件开发结算计费
网络安全运维工程师防火墙
广州网络安全运维有哪些
自建数据库 扫描枪
上海揆安 网络安全检查
网络安全工作有哪些内容