怎样建立连接Zookeeper中的服务端
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,这篇文章给大家介绍Zookeeper之怎样建立连接服务端,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。服务端处理请求的代码有两种NIOServerCnxnFactory和Nett
千家信息网最后更新 2025年12月01日怎样建立连接Zookeeper中的服务端
这篇文章给大家介绍Zookeeper之怎样建立连接服务端,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。
服务端处理请求的代码有两种NIOServerCnxnFactory和NettyServerCnxnFactory,默认是NIOServerCnxnFactory,可以通过指定zookeeper.serverCnxnFactory参数来修改。
这两个类逻辑是一样的,只是一个用的java原生的NIO,一个用的netty,这里我们就分析下NIOServerCnxnFactory。
NIOServerCnxnFactory实现了Runnable接口,看下它的run方法,循环处理请求
//NIOServerCnxnFactory.java//第200行public void run() { while (!ss.socket().isClosed()) { try { selector.select(1000); Set selected; synchronized (this) { selected = selector.selectedKeys(); } ArrayList selectedList = new ArrayList( selected); Collections.shuffle(selectedList); for (SelectionKey k : selectedList) { //如果是连接请求 if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { SocketChannel sc = ((ServerSocketChannel) k .channel()).accept(); InetAddress ia = sc.socket().getInetAddress(); //获取IP地址对应的客户端连接数 int cnxncount = getClientCnxnCount(ia); //如果超出则关闭 if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){ LOG.warn("Too many connections from " + ia + " - max is " + maxClientCnxns ); sc.close(); } else { LOG.info("Accepted socket connection from " + sc.socket().getRemoteSocketAddress()); sc.configureBlocking(false); SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); //每一个连接都是一个NIOServerCnxn NIOServerCnxn cnxn = createConnection(sc, sk); sk.attach(cnxn); addCnxn(cnxn); } } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { //在第二个循环的时候,会进入这里,处理真正的连接请求 NIOServerCnxn c = (NIOServerCnxn) k.attachment(); c.doIO(k); } else { if (LOG.isDebugEnabled()) { LOG.debug("Unexpected ops in select " + k.readyOps()); } } } selected.clear(); } catch (RuntimeException e) { LOG.warn("Ignoring unexpected runtime exception", e); } catch (Exception e) { LOG.warn("Ignoring exception", e); } } closeAll(); LOG.info("NIOServerCnxn factory exited run method");}//NIOServerCnxn.java//第237行void doIO(SelectionKey k) throws InterruptedException { try { if (isSocketOpen() == false) { LOG.warn("trying to do i/o on a null socket for session:0x" + Long.toHexString(sessionId)); return; } if (k.isReadable()) { int rc = sock.read(incomingBuffer); if (rc < 0) { throw new EndOfStreamException( "Unable to read additional data from client sessionid 0x" + Long.toHexString(sessionId) + ", likely client has closed socket"); } if (incomingBuffer.remaining() == 0) { boolean isPayload; if (incomingBuffer == lenBuffer) { // start of next request incomingBuffer.flip(); isPayload = readLength(k); incomingBuffer.clear(); } else { // continuation isPayload = true; } if (isPayload) { // not the case for 4letterword readPayload(); } else { // four letter words take care // need not do anything else return; } } } //省略部分代码 } catch (CancelledKeyException e) { } catch (CloseRequestException e) { } catch (EndOfStreamException e) { } catch (IOException e) { }}//NIOServerCnxn.java//第194行private void readPayload() throws IOException, InterruptedException { if (incomingBuffer.remaining() != 0) { // have we read length bytes? int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok if (rc < 0) { throw new EndOfStreamException( "Unable to read additional data from client sessionid 0x" + Long.toHexString(sessionId) + ", likely client has closed socket"); } } if (incomingBuffer.remaining() == 0) { // have we read length bytes? packetReceived(); incomingBuffer.flip(); if (!initialized) { readConnectRequest(); } else { readRequest(); } lenBuffer.clear(); incomingBuffer = lenBuffer; }}//NIOServerCnxn.java//第434行private void readConnectRequest() throws IOException, InterruptedException { if (!isZKServerRunning()) { throw new IOException("ZooKeeperServer not running"); } zkServer.processConnectRequest(this, incomingBuffer); initialized = true;}//ZookeeperServer.java//第886行public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer)); ConnectRequest connReq = new ConnectRequest(); connReq.deserialize(bia, "connect"); if (LOG.isDebugEnabled()) { LOG.debug("Session establishment request from client " + cnxn.getRemoteSocketAddress() + " client's lastZxid is 0x" + Long.toHexString(connReq.getLastZxidSeen())); } boolean readOnly = false; try { readOnly = bia.readBool("readOnly"); cnxn.isOldClient = false; } catch (IOException e) { // this is ok -- just a packet from an old client which // doesn't contain readOnly field LOG.warn("Connection request from old client " + cnxn.getRemoteSocketAddress() + "; will be dropped if server is in r-o mode"); } //如果客户端没有设置readOnly,但是服务端是只读的,直接抛出异常关闭连接 if (readOnly == false && this instanceof ReadOnlyZooKeeperServer) { String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress(); LOG.info(msg); throw new CloseRequestException(msg); } if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) { String msg = "Refusing session request for client " + cnxn.getRemoteSocketAddress() + " as it has seen zxid 0x" + Long.toHexString(connReq.getLastZxidSeen()) + " our last zxid is 0x" + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid()) + " client must try another server"; LOG.info(msg); throw new CloseRequestException(msg); } //协商session超时时间 int sessionTimeout = connReq.getTimeOut(); byte passwd[] = connReq.getPasswd(); int minSessionTimeout = getMinSessionTimeout(); if (sessionTimeout < minSessionTimeout) { sessionTimeout = minSessionTimeout; } int maxSessionTimeout = getMaxSessionTimeout(); if (sessionTimeout > maxSessionTimeout) { sessionTimeout = maxSessionTimeout; } cnxn.setSessionTimeout(sessionTimeout); // We don't want to receive any packets until we are sure that the // session is setup cnxn.disableRecv(); long sessionId = connReq.getSessionId(); if (sessionId != 0) { //如果sessionId不是0,说明是之前已经连接过的客户端因为掉线等原因重新连接的情况 long clientSessionId = connReq.getSessionId(); LOG.info("Client attempting to renew session 0x" + Long.toHexString(clientSessionId) + " at " + cnxn.getRemoteSocketAddress()); serverCnxnFactory.closeSession(sessionId); cnxn.setSessionId(sessionId); reopenSession(cnxn, sessionId, passwd, sessionTimeout); } else { LOG.info("Client attempting to establish new session at " + cnxn.getRemoteSocketAddress()); createSession(cnxn, passwd, sessionTimeout); }}//ZookeeperServer.java//第617行long createSession(ServerCnxn cnxn, byte passwd[], int timeout) { //创建一个session,zookeeper的session管理比较复杂,具体情况下一章分析 long sessionId = sessionTracker.createSession(timeout); Random r = new Random(sessionId ^ superSecret); r.nextBytes(passwd); ByteBuffer to = ByteBuffer.allocate(4); to.putInt(timeout); cnxn.setSessionId(sessionId); //响应客户端 submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null); return sessionId;}//ZookeeperServer.java//第728行public void submitRequest(Request si) { //省略部分代码 try { //刷新session的超时时间 touch(si.cnxn); boolean validpacket = Request.isValid(si.type); if (validpacket) { //提交给PrepRequestProcessor进一步处理 firstProcessor.processRequest(si); if (si.cnxn != null) { incInProcess(); } } else { LOG.warn("Received packet at server of unknown type " + si.type); new UnimplementedRequestProcessor().processRequest(si); } } catch (MissingSessionException e) { if (LOG.isDebugEnabled()) { LOG.debug("Dropping request: " + e.getMessage()); } } catch (RequestProcessorException e) { LOG.error("Unable to process request:" + e.getMessage(), e); }}//PrepRequestProcessor.java//第294行protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException { request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type); switch (type) { //省略部分代码 case OpCode.createSession: request.request.rewind(); int to = request.request.getInt(); request.txn = new CreateSessionTxn(to); request.request.rewind(); //这里又调用了一次addSession,但是之前的代码其实已经新增过了,不太明白为什么 zks.sessionTracker.addSession(request.sessionId, to); zks.setOwner(request.sessionId, request.getOwner()); break; //省略部分代码 default: LOG.error("Invalid OpCode: {} received by PrepRequestProcessor", type); }} 关于Zookeeper之怎样建立连接服务端就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
代码
部分
处理
服务
客户
客户端
内容
情况
时间
更多
分析
帮助
循环
不错
复杂
两个
兴趣
原因
参数
只是
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
东莞菲凡网络技术
台州市龙讯网络技术有限公司
消息中间件和数据库联合事务
网络安全控制和监测
广东安卓软件开发正规平台
手机软件开发如何入门
网络安全知识答题答案查询
开源有限元的软件开发
大并发服务器
水务行业网络安全价格
易语言 服务器ip
无线网络技术教程考试题
吉利对供应商产网络安全的要求
网页调用sql数据库
浙江磐石网络技术公司
网络技术最新动态、
加强网络安全防护的思考
怎样制作一个家庭服务器
php数据库列表页面
网络安全等级保护测评教程
none sql数据库
打开服务器连接异常
合格数据库
网络技术中级考试试题
我的世界服装模组金币服务器
网络安全 个人事迹
合计数据库
教资支付时内部服务器错误怎么办
职中正雄5g网络技术
重庆网络安全微视频