千家信息网

Thrift 中 socket 的关键作用是什么

发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,本篇文章为大家展示了Thrift中socket 关键的作用是什么,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。TNonblockingServerSocket
千家信息网最后更新 2025年12月03日 · Thrift 中 socket 的关键作用是什么

本篇文章为大家展示了 Thrift 中 socket 的关键作用是什么,内容简明扼要并且容易理解,希望通过这篇文章的详细介绍,你能有所收获。

TNonblockingServerSocket:

  public TNonblockingServerSocket(NonblockingAbstractServerSocketArgs args) throws TTransportException {    clientTimeout_ = args.clientTimeout;    try {      serverSocketChannel = ServerSocketChannel.open();      serverSocketChannel.configureBlocking(false);      // Make server socket      serverSocket_ = serverSocketChannel.socket();      // Prevent 2MSL delay problem on server restarts      serverSocket_.setReuseAddress(true);//是否重用地址      // Bind to listening port      serverSocket_.bind(args.bindAddr, args.backlog);//backlog 可接受连接数量    } catch (IOException ioe) {      serverSocket_ = null;      throw new TTransportException("Could not create ServerSocket on address " + args.bindAddr.toString() + ".", ioe);    }  }  public void listen() throws TTransportException {    // Make sure not to block on accept    if (serverSocket_ != null) {      try {        serverSocket_.setSoTimeout(0);      } catch (SocketException sx) {        LOGGER.error("Socket exception while setting socket timeout", sx);      }    }  }

TNonblockingServer:

  private SelectAcceptThread selectAcceptThread_;  //内部新事件,监听线程  //父类中有相应启动操作 --serve();  public TNonblockingServer(AbstractNonblockingServerArgs args) {    super(args);  }

SelectAcceptThread:

public void run() {      try {        if (eventHandler_ != null) {          eventHandler_.preServe();        }        while (!stopped_) {          select();          processInterestChanges();        }        for (SelectionKey selectionKey : selector.keys()) {          cleanupSelectionKey(selectionKey);        }      } catch (Throwable t) {        LOGGER.error("run() exiting due to uncaught error", t);      } finally {        try {          selector.close();        } catch (IOException e) {          LOGGER.error("Got an IOException while closing selector!", e);        }        stopped_ = true;      }    } private void select() {      try {        // wait for io events.        selector.select();        // process the io events we received        Iterator selectedKeys = selector.selectedKeys().iterator();        while (!stopped_ && selectedKeys.hasNext()) {          SelectionKey key = selectedKeys.next();          selectedKeys.remove();          // skip if not valid          if (!key.isValid()) {            cleanupSelectionKey(key);            continue;          }          // if the key is marked Accept, then it has to be the server          // transport.          if (key.isAcceptable()) {            handleAccept();          } else if (key.isReadable()) {            // deal with reads            handleRead(key);          } else if (key.isWritable()) {            // deal with writes            handleWrite(key);          } else {            LOGGER.warn("Unexpected state in select! " + key.interestOps());          }        }      } catch (IOException e) {        LOGGER.warn("Got an IOException while selecting!", e);      }    }
    /**     * Do the work required to read from a readable client. If the frame is     * fully read, then invoke the method call.     */    protected void handleRead(SelectionKey key) {      FrameBuffer buffer = (FrameBuffer) key.attachment();      if (!buffer.read()) {        cleanupSelectionKey(key);        return;      }      // if the buffer's frame read is complete, invoke the method.      if (buffer.isFrameFullyRead()) {        if (!requestInvoke(buffer)) {//回调,通过线程池,调用 FrameBuffer  中的 invoke () 方法 进行数据读取          cleanupSelectionKey(key);        }      }    }// 创建一个新的线程进行回调  protected boolean requestInvoke(FrameBuffer frameBuffer) {    try {      Runnable invocation = getRunnable(frameBuffer);      invoker.execute(invocation);      return true;    } catch (RejectedExecutionException rx) {      LOGGER.warn("ExecutorService rejected execution!", rx);      return false;    }  }  protected Runnable getRunnable(FrameBuffer frameBuffer){    return new Invocation(frameBuffer);  }//Invocation 中回调方法class Invocation implements Runnable {  private final FrameBuffer frameBuffer;  public Invocation(final FrameBuffer frameBuffer) {    this.frameBuffer = frameBuffer;  }  public void run() {    frameBuffer.invoke();  }}

FrameBuffer:

  public void invoke() {      frameTrans_.reset(buffer_.array());      response_.reset();      try {        if (eventHandler_ != null) {          eventHandler_.processContext(context_, inTrans_, outTrans_);        }        processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_);        responseReady();        return;      } catch (TException te) {        LOGGER.warn("Exception while invoking!", te);      } catch (Throwable t) {        LOGGER.error("Unexpected throwable while invoking!", t);      }      // This will only be reached when there is a throwable.      state_ = FrameBufferState.AWAITING_CLOSE;      requestSelectInterestChange();    }

上述内容就是Thrift中socket 关键的作用是什么,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注行业资讯频道。

0