zk中FinalRequestProcessor的作用是什么
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,这篇文章给大家介绍zk中FinalRequestProcessor的作用是什么,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。是处理器最后一个环节FinalRequestProce
千家信息网最后更新 2025年12月01日zk中FinalRequestProcessor的作用是什么
这篇文章给大家介绍zk中FinalRequestProcessor的作用是什么,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。
FinalRequestProcessor 是请求处理器链的最后一个环节。
FinalRequestProcessor implements RequestProcessor
它位于处理器链的末端,是事务请求和非事务请求处理的最后一个环节。
构造器
/**
 * Builds the processor around the given server instance.
 *
 * <p>Keeps a reference to the server and caches the request-path metrics collector
 * obtained from it.
 *
 * @param zks the ZooKeeper server this processor works against
 */
public FinalRequestProcessor(ZooKeeperServer zks) {
    this.requestPathMetricsCollector = zks.getRequestPathMetricsCollector();
    this.zks = zks;
}
处理命令信息
public void processRequest(Request request) { LOG.debug("Processing request:: {}", request); // request.addRQRec(">final"); long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK; if (request.type == OpCode.ping) { traceMask = ZooTrace.SERVER_PING_TRACE_MASK; } if (LOG.isTraceEnabled()) { ZooTrace.logRequest(LOG, traceMask, 'E', request, ""); } ProcessTxnResult rc = zks.processTxn(request); // ZOOKEEPER-558: // In some cases the server does not close the connection (e.g., closeconn buffer // was not being queued - ZOOKEEPER-558) properly. This happens, for example, // when the client closes the connection. The server should still close the session, though. // Calling closeSession() after losing the cnxn, results in the client close session response being dropped. if (request.type == OpCode.closeSession && connClosedByClient(request)) { // We need to check if we can close the session id. // Sometimes the corresponding ServerCnxnFactory could be null because // we are just playing diffs from the leader. if (closeSession(zks.serverCnxnFactory, request.sessionId) || closeSession(zks.secureServerCnxnFactory, request.sessionId)) { return; } } if (request.getHdr() != null) { /* * Request header is created only by the leader, so this must be * a quorum request. Since we're comparing timestamps across hosts, * this metric may be incorrect. However, it's still a very useful * metric to track in the happy case. If there is clock drift, * the latency can go negative. Note: headers use wall time, not * CLOCK_MONOTONIC. 
*/ long propagationLatency = Time.currentWallTime() - request.getHdr().getTime(); if (propagationLatency >= 0) { ServerMetrics.getMetrics().PROPAGATION_LATENCY.add(propagationLatency); } } if (request.cnxn == null) { return; } ServerCnxn cnxn = request.cnxn; long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid(); String lastOp = "NA"; // Notify ZooKeeperServer that the request has finished so that it can // update any request accounting/throttling limits zks.decInProcess(); zks.requestFinished(request); Code err = Code.OK; Record rsp = null; String path = null; try { if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) { /* * When local session upgrading is disabled, leader will * reject the ephemeral node creation due to session expire. * However, if this is the follower that issue the request, * it will have the correct error code, so we should use that * and report to user */ if (request.getException() != null) { throw request.getException(); } else { throw KeeperException.create(KeeperException.Code.get(((ErrorTxn) request.getTxn()).getErr())); } } KeeperException ke = request.getException(); if (ke instanceof SessionMovedException) { throw ke; } if (ke != null && request.type != OpCode.multi) { throw ke; } LOG.debug("{}", request); if (request.isStale()) { ServerMetrics.getMetrics().STALE_REPLIES.add(1); } switch (request.type) { case OpCode.ping: { lastOp = "PING"; updateStats(request, lastOp, lastZxid); cnxn.sendResponse(new ReplyHeader(-2, lastZxid, 0), null, "response"); return; } case OpCode.createSession: { lastOp = "SESS"; updateStats(request, lastOp, lastZxid); zks.finishSessionInit(request.cnxn, true); return; } case OpCode.multi: { lastOp = "MULT"; rsp = new MultiResponse(); for (ProcessTxnResult subTxnResult : rc.multiResult) { OpResult subResult; switch (subTxnResult.type) { case OpCode.check: subResult = new CheckResult(); break; case OpCode.create: subResult = new CreateResult(subTxnResult.path); break; case 
OpCode.create2: case OpCode.createTTL: case OpCode.createContainer: subResult = new CreateResult(subTxnResult.path, subTxnResult.stat); break; case OpCode.delete: case OpCode.deleteContainer: subResult = new DeleteResult(); break; case OpCode.setData: subResult = new SetDataResult(subTxnResult.stat); break; case OpCode.error: subResult = new ErrorResult(subTxnResult.err); if (subTxnResult.err == Code.SESSIONMOVED.intValue()) { throw new SessionMovedException(); } break; default: throw new IOException("Invalid type of op"); } ((MultiResponse) rsp).add(subResult); } break; } case OpCode.multiRead: { lastOp = "MLTR"; MultiOperationRecord multiReadRecord = new MultiOperationRecord(); ByteBufferInputStream.byteBuffer2Record(request.request, multiReadRecord); rsp = new MultiResponse(); OpResult subResult; for (Op readOp : multiReadRecord) { try { Record rec; switch (readOp.getType()) { case OpCode.getChildren: rec = handleGetChildrenRequest(readOp.toRequestRecord(), cnxn, request.authInfo); subResult = new GetChildrenResult(((GetChildrenResponse) rec).getChildren()); break; case OpCode.getData: rec = handleGetDataRequest(readOp.toRequestRecord(), cnxn, request.authInfo); GetDataResponse gdr = (GetDataResponse) rec; subResult = new GetDataResult(gdr.getData(), gdr.getStat()); break; default: throw new IOException("Invalid type of readOp"); } } catch (KeeperException e) { subResult = new ErrorResult(e.code().intValue()); } ((MultiResponse) rsp).add(subResult); } break; } case OpCode.create: { lastOp = "CREA"; rsp = new CreateResponse(rc.path); err = Code.get(rc.err); requestPathMetricsCollector.registerRequest(request.type, rc.path); break; } case OpCode.create2: case OpCode.createTTL: case OpCode.createContainer: { lastOp = "CREA"; rsp = new Create2Response(rc.path, rc.stat); err = Code.get(rc.err); requestPathMetricsCollector.registerRequest(request.type, rc.path); break; } case OpCode.delete: case OpCode.deleteContainer: { lastOp = "DELE"; err = Code.get(rc.err); 
requestPathMetricsCollector.registerRequest(request.type, rc.path); break; } case OpCode.setData: { lastOp = "SETD"; rsp = new SetDataResponse(rc.stat); err = Code.get(rc.err); requestPathMetricsCollector.registerRequest(request.type, rc.path); break; } case OpCode.reconfig: { lastOp = "RECO"; rsp = new GetDataResponse( ((QuorumZooKeeperServer) zks).self.getQuorumVerifier().toString().getBytes(), rc.stat); err = Code.get(rc.err); break; } case OpCode.setACL: { lastOp = "SETA"; rsp = new SetACLResponse(rc.stat); err = Code.get(rc.err); requestPathMetricsCollector.registerRequest(request.type, rc.path); break; } case OpCode.closeSession: { lastOp = "CLOS"; err = Code.get(rc.err); break; } case OpCode.sync: { lastOp = "SYNC"; SyncRequest syncRequest = new SyncRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, syncRequest); rsp = new SyncResponse(syncRequest.getPath()); requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath()); break; } case OpCode.check: { lastOp = "CHEC"; rsp = new SetDataResponse(rc.stat); err = Code.get(rc.err); break; } case OpCode.exists: { lastOp = "EXIS"; // TODO we need to figure out the security requirement for this! ExistsRequest existsRequest = new ExistsRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest); path = existsRequest.getPath(); if (path.indexOf('\0') != -1) { throw new KeeperException.BadArgumentsException(); } Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? 
cnxn : null); rsp = new ExistsResponse(stat); requestPathMetricsCollector.registerRequest(request.type, path); break; } case OpCode.getData: { lastOp = "GETD"; GetDataRequest getDataRequest = new GetDataRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest); path = getDataRequest.getPath(); rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo); requestPathMetricsCollector.registerRequest(request.type, path); break; } case OpCode.setWatches: { lastOp = "SETW"; SetWatches setWatches = new SetWatches(); // TODO We really should NOT need this!!!! request.request.rewind(); ByteBufferInputStream.byteBuffer2Record(request.request, setWatches); long relativeZxid = setWatches.getRelativeZxid(); zks.getZKDatabase() .setWatches( relativeZxid, setWatches.getDataWatches(), setWatches.getExistWatches(), setWatches.getChildWatches(), cnxn); break; } case OpCode.getACL: { lastOp = "GETA"; GetACLRequest getACLRequest = new GetACLRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getACLRequest); path = getACLRequest.getPath(); DataNode n = zks.getZKDatabase().getNode(path); if (n == null) { throw new KeeperException.NoNodeException(); } zks.checkACL( request.cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ | ZooDefs.Perms.ADMIN, request.authInfo, path, null); Stat stat = new Stat(); List acl = zks.getZKDatabase().getACL(path, stat); requestPathMetricsCollector.registerRequest(request.type, getACLRequest.getPath()); try { zks.checkACL( request.cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.ADMIN, request.authInfo, path, null); rsp = new GetACLResponse(acl, stat); } catch (KeeperException.NoAuthException e) { List acl1 = new ArrayList(acl.size()); for (ACL a : acl) { if ("digest".equals(a.getId().getScheme())) { Id id = a.getId(); Id id1 = new Id(id.getScheme(), id.getId().replaceAll(":.*", ":x")); acl1.add(new ACL(a.getPerms(), id1)); } else { acl1.add(a); } } rsp = new GetACLResponse(acl1, stat); } break; 
} case OpCode.getChildren: { lastOp = "GETC"; GetChildrenRequest getChildrenRequest = new GetChildrenRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getChildrenRequest); path = getChildrenRequest.getPath(); rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo); requestPathMetricsCollector.registerRequest(request.type, path); break; } case OpCode.getAllChildrenNumber: { lastOp = "GETACN"; GetAllChildrenNumberRequest getAllChildrenNumberRequest = new GetAllChildrenNumberRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getAllChildrenNumberRequest); path = getAllChildrenNumberRequest.getPath(); DataNode n = zks.getZKDatabase().getNode(path); if (n == null) { throw new KeeperException.NoNodeException(); } zks.checkACL( request.cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, request.authInfo, path, null); int number = zks.getZKDatabase().getAllChildrenNumber(path); rsp = new GetAllChildrenNumberResponse(number); break; } case OpCode.getChildren2: { lastOp = "GETC"; GetChildren2Request getChildren2Request = new GetChildren2Request(); ByteBufferInputStream.byteBuffer2Record(request.request, getChildren2Request); Stat stat = new Stat(); path = getChildren2Request.getPath(); DataNode n = zks.getZKDatabase().getNode(path); if (n == null) { throw new KeeperException.NoNodeException(); } zks.checkACL( request.cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, request.authInfo, path, null); List children = zks.getZKDatabase() .getChildren(path, stat, getChildren2Request.getWatch() ? 
cnxn : null); rsp = new GetChildren2Response(children, stat); requestPathMetricsCollector.registerRequest(request.type, path); break; } case OpCode.checkWatches: { lastOp = "CHKW"; CheckWatchesRequest checkWatches = new CheckWatchesRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, checkWatches); WatcherType type = WatcherType.fromInt(checkWatches.getType()); path = checkWatches.getPath(); boolean containsWatcher = zks.getZKDatabase().containsWatcher(path, type, cnxn); if (!containsWatcher) { String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type); throw new KeeperException.NoWatcherException(msg); } requestPathMetricsCollector.registerRequest(request.type, checkWatches.getPath()); break; } case OpCode.removeWatches: { lastOp = "REMW"; RemoveWatchesRequest removeWatches = new RemoveWatchesRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, removeWatches); WatcherType type = WatcherType.fromInt(removeWatches.getType()); path = removeWatches.getPath(); boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn); if (!removed) { String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type); throw new KeeperException.NoWatcherException(msg); } requestPathMetricsCollector.registerRequest(request.type, removeWatches.getPath()); break; } case OpCode.getEphemerals: { lastOp = "GETE"; GetEphemeralsRequest getEphemerals = new GetEphemeralsRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getEphemerals); String prefixPath = getEphemerals.getPrefixPath(); Set allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId); List ephemerals = new ArrayList<>(); if (StringUtils.isBlank(prefixPath) || "/".equals(prefixPath.trim())) { ephemerals.addAll(allEphems); } else { for (String p : allEphems) { if (p.startsWith(prefixPath)) { ephemerals.add(p); } } } rsp = new GetEphemeralsResponse(ephemerals); break; } } } catch (SessionMovedException e) { // session moved is a connection 
level error, we need to tear // down the connection otw ZOOKEEPER-710 might happen // ie client on slow follower starts to renew session, fails // before this completes, then tries the fast follower (leader) // and is successful, however the initial renew is then // successfully fwd/processed by the leader and as a result // the client and leader disagree on where the client is most // recently attached (and therefore invalid SESSION MOVED generated) cnxn.sendCloseSession(); return; } catch (KeeperException e) { err = e.code(); } catch (Exception e) { // log at error level as we are returning a marshalling // error to the user LOG.error("Failed to process " + request, e); StringBuilder sb = new StringBuilder(); ByteBuffer bb = request.request; bb.rewind(); while (bb.hasRemaining()) { sb.append(Integer.toHexString(bb.get() & 0xff)); } LOG.error("Dumping request buffer: 0x" + sb.toString()); err = Code.MARSHALLINGERROR; } ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue()); updateStats(request, lastOp, lastZxid); try { if (request.type == OpCode.getData && path != null && rsp != null) { // Serialized read responses could be cached by the connection object. // Cache entries are identified by their path and last modified zxid, // so these values are passed along with the response. GetDataResponse getDataResponse = (GetDataResponse) rsp; Stat stat = null; if (getDataResponse.getStat() != null) { stat = getDataResponse.getStat(); } cnxn.sendResponse(hdr, rsp, "response", path, stat); } else { cnxn.sendResponse(hdr, rsp, "response"); } if (request.type == OpCode.closeSession) { cnxn.sendCloseSession(); } } catch (IOException e) { LOG.error("FIXMSG", e); }} 关于zk中FinalRequestProcessor的作用是什么就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
处理
环节
作用
事务
内容
处理器
更多
帮助
不错
信息
兴趣
命令
小伙
小伙伴
文章
知识
篇文章
构造器
参考
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
指数软件开发
网络安全审查报表
软件开发工程师穿搭微胖
佛山市天迹网络技术有限公司
100人的服务器
教育局抓实网络安全宣传
数据库应用的使用错误
拼多多服务器图片
乐清软件开发招聘
达内网络安全工程师
海信聚好看软件开发咋样
服务器板载显卡黑屏
管家婆没有数据库怎么办
成人网络安全教育课件
闵行区创新数据库服务商报价行情
北京世外桃源网络技术
软件开发怎么包装简历
广州鲜京网络技术
交通违章查询软件开发
计算机网络技术学生努力的方向
结构化分析方法软件开发方法
兰州安卓软件开发报价多少
长宁区技术软件开发大概费用
上海手机软件开发定做
为什么互联网需要服务器
校园网络安全事例前言
网络安全方案排版海报
魂师对决服务器人多进不去了咋办
数据库实验教程2014
治理网络安全的法律