如何理解zk-client通信层ClientCnxnSocket
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这篇文章将为大家详细讲解有关如何理解zk-client通信层ClientCnxnSocket,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。Client
千家信息网最后更新 2025年12月02日如何理解zk-client通信层ClientCnxnSocket
这篇文章将为大家详细讲解有关如何理解zk-client通信层ClientCnxnSocket,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
ClientCnxnSocket抽象类结构 定义了底层Socket通信接口,默认实现是ClientCnxnSocketNIO
readConnectResult 读取server的connnect的response
readLength 方法 读取buffer长度并给incomingBuffer分配对应大小空间
ClientCnxnSocketNIO 实现
findSendablePacket函数 从outgoingQueue中读取发送的packet
doIO函数 处理读写
doTransport函数
如果连接就绪,调用sendThread连接操作
若读写就绪,调用doIO函数
ClientCnxnSocket
属性
ClientCnxnSocketNIO子类
属性
//nio中的selectorprivate final Selector selector = Selector.open();/** * nio中的selectionKey */private SelectionKey sockKey;private SocketAddress localSocketAddress;private SocketAddress remoteSocketAddress;方法 @Overridevoid connect(InetSocketAddress addr) throws IOException { SocketChannel sock = createSock(); try { registerAndConnect(sock, addr); } catch (IOException e) { LOG.error("Unable to open socket to " + addr); sock.close(); throw e; }//已经连接,但是没有收到resposne initialized = false; /* * Reset incomingBuffer */ lenBuffer.clear(); incomingBuffer = lenBuffer;}client和server主要交互函数@Overridevoid doTransport( int waitTimeOut, Queue pendingQueue, ClientCnxn cnxn) throws IOException, InterruptedException { selector.select(waitTimeOut); Set selected; synchronized (this) { selected = selector.selectedKeys(); } // Everything below and until we get back to the select is // non blocking, so time is effectively a constant. That is // Why we just have to do this once, here updateNow(); for (SelectionKey k : selected) { SocketChannel sc = ((SocketChannel) k.channel()); if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) { if (sc.finishConnect()) { updateLastSendAndHeard(); updateSocketAddresses(); sendThread.primeConnection(); } } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { doIO(pendingQueue, cnxn); } } if (sendThread.getZkState().isConnected()) { if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) { enableWrite(); } } selected.clear();} 主要分为读写两种读:没有初始化完成初始化读取len再改incomingbuffer分配对应空间读取对应response写: 找到可以发送的packet如果packet的bytebuffer没有创建,那就进行属性添加bytebuffer写入socketChannel把Packet从outgingQueue中取出,放入pendingQueue中/** * @throws InterruptedException * @throws IOException */void doIO(QueuependingQueue, ClientCnxn cnxn) throws InterruptedException, IOException { SocketChannel sock = (SocketChannel) sockKey.channel(); if (sock == null) { throw new IOException("Socket is null!"); } if (sockKey.isReadable()) { int rc = sock.read(incomingBuffer); if (rc < 0) { throw new EndOfStreamException("Unable to read additional data from server sessionid 0x" + Long.toHexString(sessionId) + ", likely server has closed socket"); } if (!incomingBuffer.hasRemaining()) { incomingBuffer.flip(); if (incomingBuffer == lenBuffer) { recvCount.getAndIncrement(); readLength(); } else if (!initialized) { readConnectResult(); enableRead(); if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) { // Since SASL authentication has completed (if client is configured to do so), // outgoing packets waiting in the outgoingQueue can now be sent. enableWrite(); } lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); initialized = true; } else { sendThread.readResponse(incomingBuffer); lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); } } } if (sockKey.isWritable()) { Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()); if (p != null) { updateLastSend(); // If we already started writing p, p.bb will already exist if (p.bb == null) { if ((p.requestHeader != null) && (p.requestHeader.getType() != OpCode.ping) && (p.requestHeader.getType() != OpCode.auth)) { p.requestHeader.setXid(cnxn.getXid()); } p.createBB(); } sock.write(p.bb); if (!p.bb.hasRemaining()) { sentCount.getAndIncrement(); outgoingQueue.removeFirstOccurrence(p); if (p.requestHeader != null && p.requestHeader.getType() != OpCode.ping && p.requestHeader.getType() != OpCode.auth) { synchronized (pendingQueue) { pendingQueue.add(p); } } } } if (outgoingQueue.isEmpty()) { // No more packets to send: turn off write interest flag. // Will be turned on later by a later call to enableWrite(), // from within ZooKeeperSaslClient (if client is configured // to attempt SASL authentication), or in either doIO() or // in doTransport() if not. disableWrite(); } else if (!initialized && p != null && !p.bb.hasRemaining()) { // On initial connection, write the complete connect request // packet, but then disable further writes until after // receiving a successful connection response. If the // session is expired, then the server sends the expiration // response and immediately closes its end of the socket. If // the client is simultaneously writing on its end, then the // TCP stack may choose to abort with RST, in which case the // client would never receive the session expired event. See // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html disableWrite(); } else { // Just in case enableWrite(); } }}查询待发送队列中可以发送的packetprivate Packet findSendablePacket(LinkedBlockingDeque outgoingQueue, boolean tunneledAuthInProgres) { //没有要发送的,返回null if (outgoingQueue.isEmpty()) { return null; } // If we've already starting sending the first packet, we better finish if (outgoingQueue.getFirst().bb != null || !tunneledAuthInProgres) { //取队列第一个进行发送 return outgoingQueue.getFirst(); } // Since client's authentication with server is in progress, // send only the null-header packet queued by primeConnection(). // This packet must be sent so that the SASL authentication process // can proceed, but all other packets should wait until // SASL authentication completes. //有正在认证处理,发送空请求头包给服务端 Iterator iter = outgoingQueue.iterator(); while (iter.hasNext()) { Packet p = iter.next(); if (p.requestHeader == null) { // We've found the priming-packet. Move it to the beginning of the queue. iter.remove(); outgoingQueue.addFirst(p); return p; } else { // Non-priming packet: defer it until later, leaving it in the queue // until authentication completes. LOG.debug("deferring non-priming packet {} until SASL authentation completes.", p); } } return null;}
发送接收packet图
关于如何理解zk-client通信层ClientCnxnSocket就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
函数
通信
属性
内容
文章
方法
更多
知识
空间
篇文章
队列
分配
处理
不错
大小
子类
底层
接口
正在
结构
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
广东网络安全信息化
软件工程在软件开发中的应用
公司软件开发困难吗
mysql增加数据库
宣传网络安全知识作文
python编写简单网页服务器
衡阳法院网络安全普法
优酷网络技术北京公司
无锡企业网络安全准入控制
网络资产信息技术数据库设计
图像和音乐分布在服务器的哪里
武汉有软件开发与应用培训班吗
甘肃书法教学软件开发
我的世界1.9.4服务器
关系数据库就是一张数据表
网络安全公司月薪
中国联通网络技术大会王睿
陕交职院计算机网络技术专业
江苏大容量服务器厂家
网络安全内容手抄报花边
ps5重建数据库没反应
战地1创建服务器免费吗
数据库导出版本不同
宁波艾盛网络技术有...
怎么将数据库表中数据更改
网络安全二十四条
查看数据库是否打开的命令
计算机网络技术教材课本
江苏直销服务器货源充足
如何给自己的数据库加密码