溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

網(wǎng)頁(yè)主動(dòng)探測(cè)中的NIO優(yōu)化是怎樣的

發(fā)布時(shí)間:2021-12-27 17:09:17 來源:億速云 閱讀:85 作者:柒染 欄目:編程語言

本篇文章給大家分享的是有關(guān)網(wǎng)頁(yè)主動(dòng)探測(cè)中的NIO優(yōu)化是怎樣的,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

建表語句:
CREATE SEQUENCE seq_probe_id INCREMENT BY 1 START WITH 1 NOMAXvalue NOCYCLE CACHE 2000;
create table probe( 
    host varchar(40) not null, 
    state int not null,
    type varchar(10) not null,
) ;

使用NIO優(yōu)化這個(gè)程序,進(jìn)一步壓榨資源使用率,已經(jīng)想了好長(zhǎng)時(shí)間了
無奈NIO+多線程,網(wǎng)上例子都不是很靠譜.自己學(xué)的也非常頭疼,一拖就是一年多.

新的程序,采用三段過程
首先 使用一個(gè)線程池不斷的發(fā)送連接請(qǐng)求,但是不處理接收.僅僅注冊(cè)一個(gè)SelectionKey.OP_READ的鍵
另外的一個(gè)單線程 程序,不斷select符合條件的通道,然后分配給另外一個(gè)線程池,用于接收數(shù)據(jù),解析數(shù)據(jù).(接收和解析的過程合并了)
最后,使用一個(gè)單線程的程序,不斷的把結(jié)果通過批量的方式刷入數(shù)據(jù)庫(kù).這塊也算一個(gè)優(yōu)化.由單條Insert改為批量入庫(kù).這塊至少節(jié)約了一個(gè)CPU核的處理能力.

持久化過程和解析過程 基本復(fù)用了原來的代碼

<ol start="1" class="dp-j"  white-space:normal;"="">

  • import java.io.IOException;  

  • import java.net.InetSocketAddress;  

  • import java.net.SocketAddress;  

  • import java.nio.ByteBuffer;  

  • import java.nio.channels.SelectionKey;  

  • import java.nio.channels.Selector;  

  • import java.nio.channels.SocketChannel;  

  • import java.nio.charset.Charset;  

  • import java.sql.Connection;  

  • import java.sql.DriverManager;  

  • import java.sql.PreparedStatement;  

  • import java.sql.SQLException;  

  • import java.util.ArrayList;  

  • import java.util.HashSet;  

  • import java.util.Iterator;  

  • import java.util.List;  

  • import java.util.Set;  

  • import java.util.concurrent.BlockingQueue;  

  • import java.util.concurrent.CopyOnWriteArrayList;  

  • import java.util.concurrent.ExecutorService;  

  • import java.util.concurrent.Executors;  

  • import java.util.concurrent.LinkedBlockingQueue;  

  • import java.util.concurrent.atomic.AtomicInteger;  

  • import java.util.regex.Matcher;  

  • import java.util.regex.Pattern;  

  •   

  • public class Probe {  

  •     private static final int REQUESTTHREADCOUNT = 10;  

  •     private static final BlockingQueue CONNECTLIST = new LinkedBlockingQueue();  

  •     private static final BlockingQueue PERSISTENCELIST = new LinkedBlockingQueue();  

  •   

  •     private static ExecutorService REQUESTTHREADPOOL;  

  •     private static ExecutorService RESPONSETHREADPOOL;  

  •   

  •     private static ExecutorService PERSISTENCETHREADPOOL;  

  •     private static final List DOMAINLIST = new CopyOnWriteArrayList<>();  

  •     private static Selector SELECTOR;  

  •     static {  

  •         REQUESTTHREADPOOL = Executors.newFixedThreadPool(REQUESTTHREADCOUNT);  

  •         RESPONSETHREADPOOL = Executors.newFixedThreadPool(3);  

  •         PERSISTENCETHREADPOOL = Executors.newFixedThreadPool(1);  

  •         DOMAINLIST.add("news.163.com");  

  •         try {  

  •             SELECTOR = Selector.open();  

  •         } catch (IOException e) {  

  •             e.printStackTrace();  

  •         }  

  •     }  

  •   

  •     public static void main(String[] args) throws IOException, InterruptedException {  

  •         long start = System.currentTimeMillis();  

  •         CONNECTLIST.put(new Task("news.163.com", 80, "/index.html"));  

  •         for (int i = 0; i < REQUESTTHREADCOUNT; i++) {  

  •             REQUESTTHREADPOOL.submit(new RequestHandler(CONNECTLIST, SELECTOR));  

  •         }  

  •         RESPONSETHREADPOOL  

  •                 .submit(new ResponseHandler(SELECTOR, CONNECTLIST, PERSISTENCELIST, DOMAINLIST, RESPONSETHREADPOOL));  

  •         PERSISTENCETHREADPOOL.submit(new PersistenceHandler(PERSISTENCELIST));  

  •   

  •         while (true) {  

  •             Thread.sleep(1000);  

  •             long end = System.currentTimeMillis();  

  •             float interval = ((end - start) / 1000);  

  •             int connectTotal = ResponseHandler.GETCOUNT();  

  •   

  •             int persistenceTotal = PersistenceHandler.GETCOUNT();  

  •   

  •             int connectps = Math.round(connectTotal / interval);  

  •             int persistenceps = Math.round(persistenceTotal / interval);  

  •             System.out.print(  

  •                     "\r連接總數(shù):" + connectTotal + " \t每秒連接:" + connectps + "\t連接隊(duì)列剩余:" + CONNECTLIST.size() + " \t持久化總數(shù):"  

  •                             + persistenceTotal + " \t每秒持久化:" + persistenceps + "\t持久化隊(duì)列剩余:" + PERSISTENCELIST.size());  

  •         }  

  •     }  

  • }  

  •   

  • class RequestHandler implements Runnable {  

  •     BlockingQueue connectlist;  

  •     Selector selector;  

  •   

  •     public RequestHandler(BlockingQueue connectlist, Selector selector) {  

  •         this.connectlist = connectlist;  

  •         this.selector = selector;  

  •     }  

  •   

  •     @Override  

  •     public void run() {  

  •         while (true) {  

  •             try {  

  •                 Task task = (Task) connectlist.take();  

  •                 SocketAddress addr = new InetSocketAddress(task.getHost(), 80);  

  •                 SocketChannel socketChannel = SocketChannel.open(addr);  

  •   

  •                 socketChannel.configureBlocking(false);  

  •   

  •                 ByteBuffer byteBuffer = ByteBuffer.allocate(2400);  

  •                 byteBuffer.put(("GET " + task.getCurrentPath() + " HTTP/1.0\r\n").getBytes("utf8"));  

  •                 byteBuffer.put(("HOST:" + task.getHost() + "\r\n").getBytes("utf8"));  

  •                 byteBuffer.put(("Accept:*/*\r\n").getBytes("utf8"));  

  •                 byteBuffer.put(("\r\n").getBytes("utf8"));  

  •                 byteBuffer.flip();  

  •                 socketChannel.write(byteBuffer);  

  •                 byteBuffer.clear();  

  •   

  •                 socketChannel.register(selector, SelectionKey.OP_READ, task);  

  •                 selector.wakeup();  

  •             } catch (Exception e) {  

  •                 e.printStackTrace();  

  •             }  

  •         }  

  •     }  

  • }  

  •   

  • class ResponseHandler implements Runnable {  

  •     Selector selector;  

  •     BlockingQueue connectlist;  

  •     BlockingQueue persistencelist;  

  •     List domainlist;  

  •     ExecutorService threadPool;  

  •     Charset charset = Charset.forName("utf8");  

  •     Charset gbkcharset = Charset.forName("gbk");  

  •   

  •     public static int GETCOUNT() {  

  •         return COUNT.get();  

  •     }  

  •   

  •     private static final AtomicInteger COUNT = new AtomicInteger();  

  •   

  •     public ResponseHandler(Selector selector, BlockingQueue connectlist, BlockingQueue persistencelist, List domainlist,  

  •             ExecutorService threadpool) {  

  •         this.selector = selector;  

  •         this.connectlist = connectlist;  

  •         this.persistencelist = persistencelist;  

  •         this.domainlist = domainlist;  

  •         this.threadPool = threadpool;  

  •     }  

  •   

  •     @Override  

  •     public void run() {  

  •         while (true) {  

  •             try {  

  •                 int n = selector.selectNow();  

  •                 if (n == 0)  

  •                     continue;  

  •                 Iterator it = selector.selectedKeys().iterator();  

  •                 while (it.hasNext()) {  

  •   

  •                     SelectionKey key = (SelectionKey) it.next();  

  •                     if (key.isReadable() && key.isValid()) {  

  •                         key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));  

  •                         Runnable r = new Runnable() {  

  •   

  •                             @Override  

  •                             public void run() {  

  •                                 try {  

  •                                     Task task = (Task) key.attachment();  

  •   

  •                                     ByteBuffer byteBuffer = ByteBuffer.allocate(2400);  

  •                                     SocketChannel channel = (SocketChannel) key.channel();  

  •   

  •                                     int length;  

  •                                     while ((length = channel.read(byteBuffer)) > 0) {  

  •                                         byteBuffer.flip();  

  •                                         task.appendContent(charset.decode(charset.encode(gbkcharset.decode(byteBuffer)))  

  •                                                 .toString());  

  •   

  •                                         byteBuffer.compact();  

  •                                     }  

  •                                     if (length == -1) {  

  •                                         channel.close();  

  •                                           

  •                                         COUNT.incrementAndGet();  

  •                                         new ParseHandler(task, connectlist, persistencelist, domainlist).handler();  

  •                                     } else {  

  •                                         channel.register(selector, SelectionKey.OP_READ, task);  

  •                                     }  

  •                                     key.selector().wakeup();  

  •                                 } catch (Exception e) {  

  •                                     try {  

  •                                         key.cancel();  

  •                                         key.channel().close();  

  •                                     } catch (IOException e1) {  

  •                                         e1.printStackTrace();  

  •                                     }  

  •                                     e.printStackTrace();  

  •                                 }  

  •   

  •                             }  

  •                         };  

  •                         threadPool.submit(r);  

  •                     }  

  •                     it.remove();  

  •                 }  

  •   

  •             } catch (Exception e) {  

  •                 e.printStackTrace();  

  •             }  

  •         }  

  •   

  •     }  

  • }  

  •   

  • class ParseHandler {  

  •     private static final Set SET = new HashSet();  

  •   

  •     private BlockingQueue connectlist;  

  •   

  •     private BlockingQueue persistencelist;  

  •     List domainlist;  

  •   

  •     Task task;  

  •   

  •     private interface Filter {  

  •         void doFilter(Task fatherTask, Task newTask, String path, Filter chain);  

  •     }  

  •   

  •     private class FilterChain implements Filter {  

  •         private List list = new ArrayList();  

  •   

  •         {  

  •             addFilter(new TwoLevel());  

  •             addFilter(new OneLevel());  

  •             addFilter(new FullPath());  

  •             addFilter(new Root());  

  •             addFilter(new Default());  

  •         }  

  •   

  •         private void addFilter(Filter filter) {  

  •             list.add(filter);  

  •         }  

  •   

  •         private Iterator it = list.iterator();  

  •   

  •         @Override  

  •         public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {  

  •             if (it.hasNext()) {  

  •                 ((Filter) it.next()).doFilter(fatherTask, newTask, path, chain);  

  •             }  

  •         }  

  •   

  •     }  

  •   

  •     private class TwoLevel implements Filter {  

  •   

  •         @Override  

  •         public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {  

  •             if (path.startsWith("../../")) {  

  •                 String prefix = getPrefix(fatherTask.getCurrentPath(), 3);  

  •                 newTask.init(fatherTask.getHost(), fatherTask.getPort(), path.replace("../../", prefix));  

  •             } else {  

  •                 chain.doFilter(fatherTask, newTask, path, chain);  

  •             }  

  •   

  •         }  

  •     }  

  •   

  •     private class OneLevel implements Filter {  

  •   

  •         @Override  

  •         public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {  

  •             if (path.startsWith("../")) {  

  •                 String prefix = getPrefix(fatherTask.getCurrentPath(), 2);  

  •                 newTask.init(fatherTask.getHost(), fatherTask.getPort(), path.replace("../", prefix));  

  •             } else {  

  •                 chain.doFilter(fatherTask, newTask, path, chain);  

  •             }  

  •   

  •         }  

  •   

  •     }  

  •   

  •     private class FullPath implements Filter {  

  •   

  •         @Override  

  •         public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {  

  •             if (path.startsWith("http://")) {  

  •                 Iterator it = domainlist.iterator();  

  •                 boolean flag = false;  

  •                 while (it.hasNext()) {  

  •                     String domain = (String) it.next();  

  •                     if (path.startsWith("http://" + domain + "/")) {  

  •                         newTask.init(domain, fatherTask.getPort(), path.replace("http://" + domain + "/", "/"));  

  •                         flag = true;  

  •                         break;  

  •                     }  

  •                 }  

  •                 if (!flag) {  

  •                     newTask.setValid(false);  

  •                 }  

  •             } else {  

  •                 chain.doFilter(fatherTask, newTask, path, chain);  

  •             }  

  •         }  

  •   

  •     }  

  •   

  •     private class Root implements Filter {  

  •   

  •         @Override  

  •         public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {  

  •             if (path.startsWith("/")) {  

  •                 newTask.init(fatherTask.getHost(), fatherTask.getPort(), path);  

  •             } else {  

  •                 chain.doFilter(fatherTask, newTask, path, chain);  

  •             }  

  •         }  

  •   

  •     }  

  •   

  •     private class Default implements Filter {  

  •   

  •         @Override  

  •         public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {  

  •             if (path.contains(":")) {  

  •                 newTask.setValid(false);  

  •                 return;  

  •             }  

  •             String prefix = getPrefix(fatherTask.getCurrentPath(), 1);  

  •             newTask.init(fatherTask.getHost(), fatherTask.getPort(), prefix + "/" + path);  

  •         }  

  •     }  

  •   

  •     public ParseHandler(Task task, BlockingQueue connectlist, BlockingQueue persistencelist, List domainlist) {  

  •         this.connectlist = connectlist;  

  •         this.task = task;  

  •         this.persistencelist = persistencelist;  

  •         this.domainlist = domainlist;  

  •     }  

  •   

  •     private Pattern pattern = Pattern.compile("\"[^\"]+\\.htm[^\"]*\"");  

  •   

  •     protected void handler() {  

  •         try {  

  •             parseTaskState(task);  

  •             if (200 == task.getState()) {  

  •                 Matcher matcher = pattern.matcher(task.getContent());  

  •                 while (matcher.find()) {  

  •                     String path = matcher.group();  

  •                     if (!path.contains(" ") && !path.contains("\t") && !path.contains("(") && !path.contains(")")) {  

  •                         path = path.substring(1, path.length() - 1);  

  •   

  •                         createNewTask(task, path);  

  •                     }  

  •                 }  

  •             }  

  •             task.dropContent();  

  •             persistencelist.put(task);  

  •         } catch (Exception e) {  

  •             e.printStackTrace();  

  •         }  

  •     }  

  •   

  •     private void parseTaskState(Task task) {  

  •         if (task.getContent().startsWith("HTTP/1.1")) {  

  •             task.setState(Integer.parseInt(task.getContent().substring(9, 12)));  

  •         } else {  

  •             task.setState(Integer.parseInt(task.getContent().substring(9, 12)));  

  •         }  

  •     }  

  •   

  •     /**  

  •      * @param fatherTask  

  •      * @param path  

  •      * @throws Exception  

  •      */  

  •     private void createNewTask(Task fatherTask, String path) throws Exception {  

  •         Task newTask = new Task();  

  •         FilterChain filterchain = new FilterChain();  

  •         filterchain.doFilter(fatherTask, newTask, path, filterchain);  

  •         if (newTask.isValid()) {  

  •             synchronized (SET) {  

  •                 if (SET.contains(newTask.getHost() + newTask.getCurrentPath())) {  

  •                     return;  

  •                 }  

  •                 SET.add(newTask.getHost() + newTask.getCurrentPath());  

  •             }  

  •             connectlist.put(newTask);  

  •         }  

  •     }  

  •   

  •     private String getPrefix(String s, int count) {  

  •         String prefix = s;  

  •         while (count > 0) {  

  •             prefix = prefix.substring(0, prefix.lastIndexOf("/"));  

  •             count--;  

  •         }  

  •         return "".equals(prefix) ? "/" : prefix;  

  •     }  

  • }  

  •   

  • class Task {  

  •     public Task() {  

  •     }  

  •   

  •     public void init(String host, int port, String path) {  

  •         this.setCurrentPath(path);  

  •         this.host = host;  

  •         this.port = port;  

  •     }  

  •   

  •     public Task(String host, int port, String path) {  

  •         init(host, port, path);  

  •     }  

  •   

  •     private String host;  

  •     private int port;  

  •     private String currentPath;  

  •     private long starttime;  

  •     private long endtime;  

  •   

  •     public long getStarttime() {  

  •         return starttime;  

  •     }  

  •   

  •     public void setStarttime(long starttime) {  

  •         this.starttime = starttime;  

  •     }  

  •   

  •     public long getEndtime() {  

  •         return endtime;  

  •     }  

  •   

  •     public void setEndtime(long endtime) {  

  •         this.endtime = endtime;  

  •     }  

  •   

  •     private long taskTime;  

  •     private String type;  

  •     private StringBuilder content = new StringBuilder(2400);  

  •     private int state;  

  •     private boolean isValid = true;  

  •   

  •     public boolean isValid() {  

  •         return isValid;  

  •     }  

  •   

  •     public void setValid(boolean isValid) {  

  •         this.isValid = isValid;  

  •     }  

  •   

  •     public int getState() {  

  •         return state;  

  •     }  

  •   

  •     public void setState(int state) {  

  •         this.state = state;  

  •     }  

  •   

  •     public String getCurrentPath() {  

  •         return currentPath;  

  •     }  

  •   

  •     public void setCurrentPath(String currentPath) {  

  •         this.currentPath = currentPath;  

  •         int i = 0;  

  •         if (currentPath.indexOf("?") != -1) {  

  •             i = currentPath.indexOf("?");  

  •         } else {  

  •             if (currentPath.indexOf("#") != -1) {  

  •                 i = currentPath.indexOf("#");  

  •             } else {  

  •                 i = currentPath.length();  

  •             }  

  •         }  

  •         this.type = currentPath.substring(currentPath.indexOf(".") + 1, i);  

  •     }  

  •   

  •     public long getTaskTime() {  

  •         return getEndtime() - getStarttime();  

  •     }  

  •   

  •     public String getType() {  

  •         return type;  

  •     }  

  •   

  •     public void setType(String type) {  

  •         this.type = type;  

  •     }  

  •   

  •     public String getHost() {  

  •         return host;  

  •     }  

  •   

  •     public int getPort() {  

  •         return port;  

  •     }  

  •   

  •     public String getContent() {  

  •         return content.toString();  

  •     }  

  •   

  •     public void dropContent() {  

  •         this.content = null;  

  •   

  •     }  

  •   

  •     public void appendContent(String content) {  

  •         this.content.append(content);  

  •     }  

  • }  

  •   

  • class PersistenceHandler implements Runnable {  

  •     static {  

  •         try {  

  •             Class.forName("oracle.jdbc.OracleDriver");  

  •         } catch (ClassNotFoundException e) {  

  •             // TODO Auto-generated catch block  

  •             e.printStackTrace();  

  •         }  

  •     }  

  •   

  •     public static int GETCOUNT() {  

  •         return COUNT.get();  

  •     }  

  •   

  •     private static final AtomicInteger COUNT = new AtomicInteger();  

  •     private BlockingQueue persistencelist;  

  •   

  •     public PersistenceHandler(BlockingQueue persistencelist) {  

  •         this.persistencelist = persistencelist;  

  •         try {  

  •             conn = DriverManager.getConnection("jdbc:oracle:thin:127.0.0.1:1521:orcl", "edmond", "edmond");  

  •             ps = conn.prepareStatement(  

  •                     "insert into probe(id,host,path,state,tasktime,type) values(seq_probe_id.nextval,?,?,?,?,?)");  

  •         } catch (SQLException e) {  

  •             // TODO Auto-generated catch block  

  •             e.printStackTrace();  

  •         }  

  •     }  

  •   

  •     private Connection conn;  

  •     private PreparedStatement ps;  

  •   

  •     @Override  

  •     public void run() {  

  •         while (true) {  

  •             this.handler();  

  •             COUNT.addAndGet(1);  

  •         }  

  •     }  

  •   

  •     private void handler() {  

  •         try {  

  •             Task task = (Task) persistencelist.take();  

  •             ps.setString(1, task.getHost());  

  •             ps.setString(2, task.getCurrentPath());  

  •             ps.setInt(3, task.getState());  

  •             ps.setLong(4, task.getTaskTime());  

  •             ps.setString(5, task.getType());  

  •   

  •             ps.addBatch();  

  •             if (GETCOUNT() % 500 == 0) {  

  •                 ps.executeBatch();  

  •                 conn.commit();  

  •             }  

  •         } catch (InterruptedException e) {  

  •             e.printStackTrace();  

  •         } catch (SQLException e) {  

  •             e.printStackTrace();  

  •         }  

  •     }  

  • }  


  • 每秒可以爬170-200左右的網(wǎng)頁(yè)


  • 網(wǎng)頁(yè)主動(dòng)探測(cè)中的NIO優(yōu)化是怎樣的

  • 因?yàn)檫@個(gè)速度受制于公司帶寬.


  • 網(wǎng)頁(yè)主動(dòng)探測(cè)中的NIO優(yōu)化是怎樣的

  • CPU也基本上跑滿了


  • 網(wǎng)頁(yè)主動(dòng)探測(cè)中的NIO優(yōu)化是怎樣的


  • 這個(gè)程序還有優(yōu)化的空間,主要是以下代碼的阻塞和喚醒關(guān)系,還是沒有搞明白.


  • socketChannel.register(selector, SelectionKey.OP_READ, task);


  • int n = selector.select();


  • key.selector().wakeup();

以上就是網(wǎng)頁(yè)主動(dòng)探測(cè)中的NIO優(yōu)化是怎樣的,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI