千家信息网

网页主动探测中的NIO优化是怎样的

发表于:2025-11-11 作者:千家信息网编辑
千家信息网最后更新 2025年11月11日,本篇文章给大家分享的是有关网页主动探测中的NIO优化是怎样的,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。建表语句:CREATE SEQ
千家信息网最后更新 2025年11月11日网页主动探测中的NIO优化是怎样的

本篇文章给大家分享的是有关网页主动探测中的NIO优化是怎样的,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

建表语句:
CREATE SEQUENCE seq_probe_id INCREMENT BY 1 START WITH 1 NOMAXVALUE NOCYCLE CACHE 2000;
-- One row per crawled page. The column list matches the INSERT used by PersistenceHandler:
-- id (from seq_probe_id), host, path, state, tasktime, type.
create table probe(
id number primary key,
host varchar(40) not null,
path varchar(2000) not null,
state int not null,
tasktime number not null,
-- Extension of the path; nullable because a path may have none (Oracle stores '' as NULL).
type varchar(20)
) ;

用 NIO 优化这个程序、进一步提高资源利用率的想法,已经酝酿了很长时间。
无奈 NIO 与多线程结合的网上示例大多不太靠谱,自己学起来也非常头疼,一拖就是一年多。

新的程序采用三段式流水线:
首先,使用一个线程池不断地发送连接请求,但不处理接收,仅为每个通道注册 SelectionKey.OP_READ 事件;
其次,由另一个单线程程序不断 select 出可读的通道,再分配给另一个线程池,用于接收和解析数据(接收与解析的过程合并在一起);
最后,由一个单线程程序不断地以批量方式把结果刷入数据库。这也算一项优化:由单条 INSERT 改为批量入库,至少节约了一个 CPU 核的处理能力。

持久化和解析过程基本复用了原来的代码。

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.SocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.CharBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.nio.charset.CharsetDecoder;
    import java.nio.charset.CoderResult;
    import java.nio.charset.CodingErrorAction;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    • public class Probe {

    • private static final int REQUESTTHREADCOUNT = 10;

    • private static final BlockingQueue CONNECTLIST = new LinkedBlockingQueue();

    • private static final BlockingQueue PERSISTENCELIST = new LinkedBlockingQueue();

    • private static ExecutorService REQUESTTHREADPOOL;

    • private static ExecutorService RESPONSETHREADPOOL;

    • private static ExecutorService PERSISTENCETHREADPOOL;

    • private static final List DOMAINLIST = new CopyOnWriteArrayList<>();

    • private static Selector SELECTOR;

    • static {

    • REQUESTTHREADPOOL = Executors.newFixedThreadPool(REQUESTTHREADCOUNT);

    • RESPONSETHREADPOOL = Executors.newFixedThreadPool(3);

    • PERSISTENCETHREADPOOL = Executors.newFixedThreadPool(1);

    • DOMAINLIST.add("news.163.com");

    • try {

    • SELECTOR = Selector.open();

    • } catch (IOException e) {

    • e.printStackTrace();

    • }

    • }

    • public static void main(String[] args) throws IOException, InterruptedException {

    • long start = System.currentTimeMillis();

    • CONNECTLIST.put(new Task("news.163.com", 80, "/index.html"));

    • for (int i = 0; i < REQUESTTHREADCOUNT; i++) {

    • REQUESTTHREADPOOL.submit(new RequestHandler(CONNECTLIST, SELECTOR));

    • }

    • RESPONSETHREADPOOL

    • .submit(new ResponseHandler(SELECTOR, CONNECTLIST, PERSISTENCELIST, DOMAINLIST, RESPONSETHREADPOOL));

    • PERSISTENCETHREADPOOL.submit(new PersistenceHandler(PERSISTENCELIST));

    • while (true) {

    • Thread.sleep(1000);

    • long end = System.currentTimeMillis();

    • float interval = ((end - start) / 1000);

    • int connectTotal = ResponseHandler.GETCOUNT();

    • int persistenceTotal = PersistenceHandler.GETCOUNT();

    • int connectps = Math.round(connectTotal / interval);

    • int persistenceps = Math.round(persistenceTotal / interval);

    • System.out.print(

    • "\r连接总数:" + connectTotal + " \t每秒连接:" + connectps + "\t连接队列剩余:" + CONNECTLIST.size() + " \t持久化总数:"

    • + persistenceTotal + " \t每秒持久化:" + persistenceps + "\t持久化队列剩余:" + PERSISTENCELIST.size());

    • }

    • }

    • }

    • class RequestHandler implements Runnable {

    • BlockingQueue connectlist;

    • Selector selector;

    • public RequestHandler(BlockingQueue connectlist, Selector selector) {

    • this.connectlist = connectlist;

    • this.selector = selector;

    • }

    • @Override

    • public void run() {

    • while (true) {

    • try {

    • Task task = (Task) connectlist.take();

    • SocketAddress addr = new InetSocketAddress(task.getHost(), 80);

    • SocketChannel socketChannel = SocketChannel.open(addr);

    • socketChannel.configureBlocking(false);

    • ByteBuffer byteBuffer = ByteBuffer.allocate(2400);

    • byteBuffer.put(("GET " + task.getCurrentPath() + " HTTP/1.0\r\n").getBytes("utf8"));

    • byteBuffer.put(("HOST:" + task.getHost() + "\r\n").getBytes("utf8"));

    • byteBuffer.put(("Accept:*/*\r\n").getBytes("utf8"));

    • byteBuffer.put(("\r\n").getBytes("utf8"));

    • byteBuffer.flip();

    • socketChannel.write(byteBuffer);

    • byteBuffer.clear();

    • socketChannel.register(selector, SelectionKey.OP_READ, task);

    • selector.wakeup();

    • } catch (Exception e) {

    • e.printStackTrace();

    • }

    • }

    • }

    • }

    • class ResponseHandler implements Runnable {

    • Selector selector;

    • BlockingQueue connectlist;

    • BlockingQueue persistencelist;

    • List domainlist;

    • ExecutorService threadPool;

    • Charset charset = Charset.forName("utf8");

    • Charset gbkcharset = Charset.forName("gbk");

    • public static int GETCOUNT() {

    • return COUNT.get();

    • }

    • private static final AtomicInteger COUNT = new AtomicInteger();

    • public ResponseHandler(Selector selector, BlockingQueue connectlist, BlockingQueue persistencelist, List domainlist,

    • ExecutorService threadpool) {

    • this.selector = selector;

    • this.connectlist = connectlist;

    • this.persistencelist = persistencelist;

    • this.domainlist = domainlist;

    • this.threadPool = threadpool;

    • }

    • @Override

    • public void run() {

    • while (true) {

    • try {

    • int n = selector.selectNow();

    • if (n == 0)

    • continue;

    • Iterator it = selector.selectedKeys().iterator();

    • while (it.hasNext()) {

    • SelectionKey key = (SelectionKey) it.next();

    • if (key.isReadable() && key.isValid()) {

    • key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));

    • Runnable r = new Runnable() {

    • @Override

    • public void run() {

    • try {

    • Task task = (Task) key.attachment();

    • ByteBuffer byteBuffer = ByteBuffer.allocate(2400);

    • SocketChannel channel = (SocketChannel) key.channel();

    • int length;

    • while ((length = channel.read(byteBuffer)) > 0) {

    • byteBuffer.flip();

    • task.appendContent(charset.decode(charset.encode(gbkcharset.decode(byteBuffer)))

    • .toString());

    • byteBuffer.compact();

    • }

    • if (length == -1) {

    • channel.close();

    • COUNT.incrementAndGet();

    • new ParseHandler(task, connectlist, persistencelist, domainlist).handler();

    • } else {

    • channel.register(selector, SelectionKey.OP_READ, task);

    • }

    • key.selector().wakeup();

    • } catch (Exception e) {

    • try {

    • key.cancel();

    • key.channel().close();

    • } catch (IOException e1) {

    • e1.printStackTrace();

    • }

    • e.printStackTrace();

    • }

    • }

    • };

    • threadPool.submit(r);

    • }

    • it.remove();

    • }

    • } catch (Exception e) {

    • e.printStackTrace();

    • }

    • }

    • }

    • }

    • class ParseHandler {

    • private static final Set SET = new HashSet();

    • private BlockingQueue connectlist;

    • private BlockingQueue persistencelist;

    • List domainlist;

    • Task task;

    • private interface Filter {

    • void doFilter(Task fatherTask, Task newTask, String path, Filter chain);

    • }

    • private class FilterChain implements Filter {

    • private List list = new ArrayList();

    • {

    • addFilter(new TwoLevel());

    • addFilter(new OneLevel());

    • addFilter(new FullPath());

    • addFilter(new Root());

    • addFilter(new Default());

    • }

    • private void addFilter(Filter filter) {

    • list.add(filter);

    • }

    • private Iterator it = list.iterator();

    • @Override

    • public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {

    • if (it.hasNext()) {

    • ((Filter) it.next()).doFilter(fatherTask, newTask, path, chain);

    • }

    • }

    • }

    • private class TwoLevel implements Filter {

    • @Override

    • public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {

    • if (path.startsWith("../../")) {

    • String prefix = getPrefix(fatherTask.getCurrentPath(), 3);

    • newTask.init(fatherTask.getHost(), fatherTask.getPort(), path.replace("../../", prefix));

    • } else {

    • chain.doFilter(fatherTask, newTask, path, chain);

    • }

    • }

    • }

    • private class OneLevel implements Filter {

    • @Override

    • public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {

    • if (path.startsWith("../")) {

    • String prefix = getPrefix(fatherTask.getCurrentPath(), 2);

    • newTask.init(fatherTask.getHost(), fatherTask.getPort(), path.replace("../", prefix));

    • } else {

    • chain.doFilter(fatherTask, newTask, path, chain);

    • }

    • }

    • }

    • private class FullPath implements Filter {

    • @Override

    • public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {

    • if (path.startsWith("http://")) {

    • Iterator it = domainlist.iterator();

    • boolean flag = false;

    • while (it.hasNext()) {

    • String domain = (String) it.next();

    • if (path.startsWith("http://" + domain + "/")) {

    • newTask.init(domain, fatherTask.getPort(), path.replace("http://" + domain + "/", "/"));

    • flag = true;

    • break;

    • }

    • }

    • if (!flag) {

    • newTask.setValid(false);

    • }

    • } else {

    • chain.doFilter(fatherTask, newTask, path, chain);

    • }

    • }

    • }

    • private class Root implements Filter {

    • @Override

    • public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {

    • if (path.startsWith("/")) {

    • newTask.init(fatherTask.getHost(), fatherTask.getPort(), path);

    • } else {

    • chain.doFilter(fatherTask, newTask, path, chain);

    • }

    • }

    • }

    • private class Default implements Filter {

    • @Override

    • public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {

    • if (path.contains(":")) {

    • newTask.setValid(false);

    • return;

    • }

    • String prefix = getPrefix(fatherTask.getCurrentPath(), 1);

    • newTask.init(fatherTask.getHost(), fatherTask.getPort(), prefix + "/" + path);

    • }

    • }

    • public ParseHandler(Task task, BlockingQueue connectlist, BlockingQueue persistencelist, List domainlist) {

    • this.connectlist = connectlist;

    • this.task = task;

    • this.persistencelist = persistencelist;

    • this.domainlist = domainlist;

    • }

    • private Pattern pattern = Pattern.compile("\"[^\"]+\\.htm[^\"]*\"");

    • protected void handler() {

    • try {

    • parseTaskState(task);

    • if (200 == task.getState()) {

    • Matcher matcher = pattern.matcher(task.getContent());

    • while (matcher.find()) {

    • String path = matcher.group();

    • if (!path.contains(" ") && !path.contains("\t") && !path.contains("(") && !path.contains(")")) {

    • path = path.substring(1, path.length() - 1);

    • createNewTask(task, path);

    • }

    • }

    • }

    • task.dropContent();

    • persistencelist.put(task);

    • } catch (Exception e) {

    • e.printStackTrace();

    • }

    • }

    • private void parseTaskState(Task task) {

    • if (task.getContent().startsWith("HTTP/1.1")) {

    • task.setState(Integer.parseInt(task.getContent().substring(9, 12)));

    • } else {

    • task.setState(Integer.parseInt(task.getContent().substring(9, 12)));

    • }

    • }

    • /**

    • * @param fatherTask

    • * @param path

    • * @throws Exception

    • */

    • private void createNewTask(Task fatherTask, String path) throws Exception {

    • Task newTask = new Task();

    • FilterChain filterchain = new FilterChain();

    • filterchain.doFilter(fatherTask, newTask, path, filterchain);

    • if (newTask.isValid()) {

    • synchronized (SET) {

    • if (SET.contains(newTask.getHost() + newTask.getCurrentPath())) {

    • return;

    • }

    • SET.add(newTask.getHost() + newTask.getCurrentPath());

    • }

    • connectlist.put(newTask);

    • }

    • }

    • private String getPrefix(String s, int count) {

    • String prefix = s;

    • while (count > 0) {

    • prefix = prefix.substring(0, prefix.lastIndexOf("/"));

    • count--;

    • }

    • return "".equals(prefix) ? "/" : prefix;

    • }

    • }

    /**
     * One page to fetch (host, port, path), plus what the pipeline learns about it: the response
     * text, the HTTP status, a validity flag and start/end timestamps. {@code type} is derived from
     * the path whenever the path is set.
     *
     * <p>NOTE(review): nothing in this file calls setStarttime/setEndtime, so getTaskTime() is 0
     * unless callers elsewhere set them.
     */
    class Task {

        private String host;

        private int port;

        private String currentPath;

        private long starttime;

        private long endtime;

        /** Extension of the last path segment ("html", "htm", ...), or "" when it has none. */
        private String type;

        /** Response text appended by the reader; null after dropContent(). */
        private StringBuilder content = new StringBuilder(2400);

        /** HTTP status code, set by the parser. */
        private int state;

        private boolean isValid = true;

        public Task() {
        }

        public Task(String host, int port, String path) {
            init(host, port, path);
        }

        /** Sets the target of this task; also recomputes {@link #getType()} from the path. */
        public void init(String host, int port, String path) {
            this.setCurrentPath(path);
            this.host = host;
            this.port = port;
        }

        public long getStarttime() {
            return starttime;
        }

        public void setStarttime(long starttime) {
            this.starttime = starttime;
        }

        public long getEndtime() {
            return endtime;
        }

        public void setEndtime(long endtime) {
            this.endtime = endtime;
        }

        public boolean isValid() {
            return isValid;
        }

        public void setValid(boolean isValid) {
            this.isValid = isValid;
        }

        public int getState() {
            return state;
        }

        public void setState(int state) {
            this.state = state;
        }

        public String getCurrentPath() {
            return currentPath;
        }

        /** Sets the path and derives {@code type} from its extension. */
        public void setCurrentPath(String currentPath) {
            this.currentPath = currentPath;
            this.type = extensionOf(currentPath);
        }

        /**
         * Extension of the last path segment, ignoring any query or fragment; "" if there is none.
         * Only a '.' after the last '/' counts, so dots in directory names or in the query do not.
         */
        private static String extensionOf(String path) {
            int end = path.length();
            int query = path.indexOf('?');
            if (query >= 0) {
                end = query;
            }
            int fragment = path.indexOf('#');
            if (fragment >= 0 && fragment < end) {
                end = fragment;
            }
            String bare = path.substring(0, end);
            int dot = bare.lastIndexOf('.');
            return dot > bare.lastIndexOf('/') ? bare.substring(dot + 1) : "";
        }

        /** Elapsed time between the start and end timestamps. */
        public long getTaskTime() {
            return getEndtime() - getStarttime();
        }

        public String getType() {
            return type;
        }

        public void setType(String type) {
            this.type = type;
        }

        public String getHost() {
            return host;
        }

        public int getPort() {
            return port;
        }

        /** Response text so far; throws NullPointerException after {@link #dropContent()}. */
        public String getContent() {
            return content.toString();
        }

        /** Releases the response text once it has been parsed. */
        public void dropContent() {
            this.content = null;
        }

        public void appendContent(String content) {
            this.content.append(content);
        }
    }

    • class PersistenceHandler implements Runnable {

    • static {

    • try {

    • Class.forName("oracle.jdbc.OracleDriver");

    • } catch (ClassNotFoundException e) {

    • // TODO Auto-generated catch block

    • e.printStackTrace();

    • }

    • }

    • public static int GETCOUNT() {

    • return COUNT.get();

    • }

    • private static final AtomicInteger COUNT = new AtomicInteger();

    • private BlockingQueue persistencelist;

    • public PersistenceHandler(BlockingQueue persistencelist) {

    • this.persistencelist = persistencelist;

    • try {

    • conn = DriverManager.getConnection("jdbc:oracle:thin:127.0.0.1:1521:orcl", "edmond", "edmond");

    • ps = conn.prepareStatement(

    • "insert into probe(id,host,path,state,tasktime,type) values(seq_probe_id.nextval,?,?,?,?,?)");

    • } catch (SQLException e) {

    • // TODO Auto-generated catch block

    • e.printStackTrace();

    • }

    • }

    • private Connection conn;

    • private PreparedStatement ps;

    • @Override

    • public void run() {

    • while (true) {

    • this.handler();

    • COUNT.addAndGet(1);

    • }

    • }

    • private void handler() {

    • try {

    • Task task = (Task) persistencelist.take();

    • ps.setString(1, task.getHost());

    • ps.setString(2, task.getCurrentPath());

    • ps.setInt(3, task.getState());

    • ps.setLong(4, task.getTaskTime());

    • ps.setString(5, task.getType());

    • ps.addBatch();

    • if (GETCOUNT() % 500 == 0) {

    • ps.executeBatch();

    • conn.commit();

    • }

    • } catch (InterruptedException e) {

    • e.printStackTrace();

    • } catch (SQLException e) {

    • e.printStackTrace();

    • }

    • }

    • }


    • 每秒大约可以爬取 170-200 个网页;




    • 这个速度主要受制于公司带宽;




    • CPU 也基本上跑满了。





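    • 这个程序还有优化的空间,主要是以下几行代码之间的阻塞与唤醒关系,我还没有完全搞明白。(补充说明:在 JDK 11 之前的版本中,当另一个线程正阻塞在 select() 里时,register() 会一直阻塞到该 select() 返回,因此在 register() 之后才调用 wakeup() 是来不及的;常见做法是把待注册的通道放入队列,调用 wakeup() 唤醒 select 线程,由它自己完成注册。上面的代码改用不阻塞的 selectNow() 轮询来回避这一问题,代价是空转占用一个 CPU 核,这可能也是 CPU 基本跑满的原因之一。)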


    • socketChannel.register(selector, SelectionKey.OP_READ, task);


    • int n = selector.select();


    • key.selector().wakeup();

    以上就是网页主动探测中的NIO优化是怎样的,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。

0