您好,登錄后才能下訂單哦!
1、任何的高并發(fā),請求總是會有一個順序的
2、java的隊(duì)列的數(shù)據(jù)結(jié)構(gòu)是先進(jìn)先出的取值順序
3、BlockingQueue類(線程安全)(使用方法可以百度)
一般使用LinkedBlockingQueue
利用以上幾點(diǎn),我們可以把高并發(fā)時候的請求放入一個隊(duì)列,隊(duì)列的大小可以自己定義,比如隊(duì)列容量為1000個數(shù)據(jù),那么可以利用過濾器或者攔截器把當(dāng)前的請求放入隊(duì)列,如果隊(duì)列的容量滿了,其余的請求可以丟掉或者作出相應(yīng)回復(fù)
具體實(shí)施:
利用生產(chǎn)者、消費(fèi)者模型:
將隊(duì)列的請求一一處理完。
上代碼:
/** * @author fuguangli * @description 前沿消費(fèi)者類 * @Create date: 2017/3/7 * @using EXAMPLE */ public class Customer implements Runnable{ /** * 拋出異常 特殊值 阻塞 超時 插入 add(e) offer(e) put(e) offer(e, time, unit) 移除 remove() poll() take() poll(time, unit) 檢查 element() peek() 不可用 不可用 */ private BlockingQueue blockingQueue; private AtomicInteger count = new AtomicInteger(); public Customer(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; } /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p/> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see Thread#run() */ @Override public void run() { System.out.println("消費(fèi)者線程啟動..."); LockFlag.setCustomerRunningFlag(true); try { while (LockFlag.getProducerRunningFlag()){ System.out.println(Thread.currentThread().getId()+"I'm Customer.Queue current size="+blockingQueue.size()); String data = (String) blockingQueue.poll(10, TimeUnit.SECONDS); if(data!=null){ System.out.println(Thread.currentThread().getId()+"*************正在消費(fèi)數(shù)據(jù) data="+data); }else{ //表示超過取值時間,視為生產(chǎn)者不再生產(chǎn)數(shù)據(jù) System.out.println(Thread.currentThread().getId()+"隊(duì)列為空無數(shù)據(jù),請檢查生產(chǎn)者是否阻塞"); } Thread.sleep(50); } System.err.println("消費(fèi)者程序執(zhí)行完畢"); } catch (InterruptedException e) { e.printStackTrace(); System.err.println("消費(fèi)者程序退出"); LockFlag.setCustomerRunningFlag(false);//異常退出線程 Thread.currentThread().interrupt(); } } }
package com.qysxy.framework.queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @author fuguangli * @description 隊(duì)列生產(chǎn)者類 * @Create date: 2017/3/7 * @using EXAMPLE */ public class Producer implements Runnable{ /** * 拋出異常 特殊值 阻塞 超時 插入 add(e) offer(e) put(e) offer(e, time, unit) 移除 remove() poll() take() poll(time, unit) 檢查 element() peek() 不可用 不可用 */ private BlockingQueue blockingQueue; private AtomicInteger count = new AtomicInteger(); public Producer(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; } /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p/> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see Thread#run() */ @Override public void run() { System.out.println("生產(chǎn)者線程啟動..."); LockFlag.setProducerRunningFlag(true); try { while (LockFlag.getProducerRunningFlag()){ String data = "data:"+count.incrementAndGet(); if(blockingQueue.offer(data,10, TimeUnit.SECONDS)){ //返回true表示生產(chǎn)數(shù)據(jù)正確 System.out.println("^^^^^^^^^^^^^^正在生產(chǎn)數(shù)據(jù) data="+data); }else { //表示阻塞時間內(nèi)還沒有生產(chǎn)者生產(chǎn)數(shù)據(jù) System.out.println("生產(chǎn)者異常,無法生產(chǎn)數(shù)據(jù)"); } Thread.sleep(50); } } catch (InterruptedException e) { e.printStackTrace(); System.err.println("生產(chǎn)者程序退出"); LockFlag.setProducerRunningFlag(false);//異常退出線程 Thread.currentThread().interrupt(); } } }
package com.qysxy.framework.queue; /** * @author fuguangli * @description 前沿生產(chǎn)者消費(fèi)者模型的鎖類 * @Create date: 2017/3/7 */ public class LockFlag { /** * 生產(chǎn)者互斥鎖 */ private static Boolean producerRunningFlag = false; /** * 消費(fèi)者互斥鎖 */ private static Boolean customerRunningFlag = false; public static Boolean getProducerRunningFlag() { return producerRunningFlag; } public static void setProducerRunningFlag(Boolean producerRunningFlag) { LockFlag.producerRunningFlag = producerRunningFlag; } public static Boolean getCustomerRunningFlag() { return customerRunningFlag; } public static void setCustomerRunningFlag(Boolean customerRunningFlag) { LockFlag.customerRunningFlag = customerRunningFlag; } }
package com.qysxy.framework.queue; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.util.Queue; import java.util.concurrent.*; /** * @author fuguangli * @description 前沿隊(duì)列實(shí)用類,用于大量并發(fā)用戶 * @Create date: 2017/3/7 */ public class BlockingQueueHelper { private static final Integer maxQueueSize = 1000; private static BlockingQueue blockingQueue = new LinkedBlockingQueue(maxQueueSize); private static ExecutorService threadPool = Executors.newCachedThreadPool(); public static BlockingQueue getBlockingQueue() { if (blockingQueue == null) { blockingQueue = new LinkedBlockingQueue(maxQueueSize); } return blockingQueue; } /** * @param o 隊(duì)列處理對象(包含request,response,data) */ public static void requestQueue(Object o) { //檢測當(dāng)前的隊(duì)列大小 if (blockingQueue != null && blockingQueue.size() < maxQueueSize) { //可以正常進(jìn)入隊(duì)列 if (blockingQueue.offer(o)) { //添加成功,檢測數(shù)據(jù)處理線程是否正常 if (LockFlag.getCustomerRunningFlag()) { //說明處理線程類正常運(yùn)行 } else { //說明處理線程類停止,此時,應(yīng)重新啟動線程進(jìn)行數(shù)據(jù)處理 LockFlag.setCustomerRunningFlag(true); //example:run Customer customer = new Customer(blockingQueue); threadPool.execute(customer); } } else { //進(jìn)入隊(duì)列失敗,做出相應(yīng)的處理,或者嘗試重新進(jìn)入隊(duì)列 } } else { //隊(duì)列不正常,或隊(duì)列大小已達(dá)上限,做出相應(yīng)處理 } } }
好了,這時候,利用過濾器或者攔截器將每個請求封裝成隊(duì)列元素進(jìn)行處理就行。
當(dāng)然了,對于多應(yīng)用服務(wù)器的部署架構(gòu)來說,數(shù)據(jù)庫也需要加鎖,數(shù)據(jù)庫隔離級別下篇再說。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持億速云。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。