Java并發(fā):juc Executor框架詳解_第1頁(yè)
Java并發(fā):juc Executor框架詳解_第2頁(yè)
Java并發(fā):juc Executor框架詳解_第3頁(yè)
Java并發(fā):juc Executor框架詳解_第4頁(yè)
Java并發(fā):juc Executor框架詳解_第5頁(yè)
已閱讀5頁(yè),還剩4頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

Executor框架是juc里提供的線程池的實(shí)現(xiàn)。前兩天看了下Executor框架的一些源碼,做個(gè)簡(jiǎn)單的總結(jié)。線程池大概的思路是維護(hù)一個(gè)的線程池用于執(zhí)行提交的任務(wù)。我理解池的技術(shù)的主要意義有兩個(gè):資源的控制,如并發(fā)量限制。像連接池這種是對(duì)數(shù)據(jù)庫(kù)資源的保護(hù)。資源的有效利用,如線程復(fù)用,避免頻繁創(chuàng)建線程和線程上下文切換。那么想象中設(shè)計(jì)一個(gè)線程池就需要有線程池大小、線程生命周期管理、等待隊(duì)列等等功能,下面結(jié)合代碼看看原理。Excutor整體結(jié)構(gòu)如下:Executor接口定義了最基本的execute方法,用于接收用戶提交任務(wù)。ExecutorService定義了線程池終止和創(chuàng)建及提交futureTask任務(wù)支持的方法。AbstractExecutorService是抽象類,主要實(shí)現(xiàn)了ExecutorService和futureTask相關(guān)的一些任務(wù)創(chuàng)建和提交的方法。ThreadPoolExecutor是最核心的一個(gè)類,是線程池的內(nèi)部實(shí)現(xiàn)。線程池的功能都在這里實(shí)現(xiàn)了,平時(shí)用的最多的基本就是這個(gè)了。其源碼很精練,遠(yuǎn)沒當(dāng)時(shí)想象的多。ScheduledThreadPoolExecutor在ThreadPoolExecutor的基礎(chǔ)上提供了支持定時(shí)調(diào)度的功能。線程任務(wù)可以在一定延時(shí)時(shí)間后才被觸發(fā)執(zhí)行。一_ThreadPoolExecutor原理1.1ThreadPoolExecutor內(nèi)部的幾個(gè)重要屬性線程池本身的狀態(tài)Java代碼_volatileintrunState;staticfinalintRUNNING=0;staticfinalintSHUTDOWN=1;staticfinalintSTOP=2;staticfinalintTERMINATED=3;等待任務(wù)隊(duì)列和工作集Java代碼 privatefinalBlockingQueue<Runnable>workQueue;//等待被執(zhí)行的Runnable任務(wù)privatefinalHashSet<Worker>workers=newHashSet<Worker>();//正在被執(zhí)行的Worker任務(wù)集線程池的主要狀態(tài)鎖。線程池內(nèi)部的狀態(tài)變化(如線程大小)都需要基于此鎖。Java代碼—1.privatefinalReentrantLockmainLock=newReentrantLock();線程的存活時(shí)間和大小Java代碼_privatevolatilelongkeepAliveTime;//線程存活時(shí)間privatevolatilebooleanallowCoreThreadTimeOut;//是否允許核心線程存活privatevolatileintcorePoolSize;//核心池大小privatevolatileintmaximumPoolSize;//最大池大小privatevolatileintpoolSize;//當(dāng)前池大小privateintlargestPoolSize;//最大池大小,區(qū)別于maximumPoolSize,是用于記錄線程池曾經(jīng)達(dá)到過的最大并發(fā),理論上小于等于maximumPoolSize。線程工廠和拒絕策略Java代碼 privatevolatileRejectedExecutionHandlerhandler;//拒絕策略,用于當(dāng)線程池?zé)o法承載新線程是的處理策略。privatevolatileThreadFactorythreadFactory;//線程工廠,用于在線程池需要新創(chuàng)建線程的時(shí)候創(chuàng)建線程線程池完成任務(wù)數(shù)Java代碼_"LprivatelongcompletedTaskCount;//線程池運(yùn)行到U當(dāng)前完成的任務(wù)數(shù)總和1.2ThreadPoolExecutor的內(nèi)部工作原理有了以上定義好的數(shù)據(jù),下面來看看內(nèi)部是如何實(shí)現(xiàn)的。DougLea的整個(gè)思路總結(jié)起來就是5句話:如果當(dāng)前池大小poolSize小于corePoolSize,則創(chuàng)建新線程執(zhí)行任務(wù)。如果當(dāng)前池大小poolSize大于corePoolSize,且等待隊(duì)列未滿,則進(jìn)入等待隊(duì)列如果當(dāng)前池大小poolSize大于corePoolSize且小于maximumPoolSize且等待隊(duì)列已滿,則創(chuàng)建新線程執(zhí)行任務(wù)。如果當(dāng)前池大小poolSize大于corePoolSize且大于maximumPoolSize且等待隊(duì)列已滿,則調(diào)用拒絕策略來處理該任務(wù)。

線程池里的每個(gè)線程執(zhí)行完任務(wù)后不會(huì)立刻退出,而是會(huì)去檢查下等待隊(duì)列里是否還有線程任務(wù)需要執(zhí)行,如果在keepAliveTime里等不到新的任務(wù)了,那么線程就會(huì)退出。下面看看代碼實(shí)現(xiàn):線程池最重要的方法是由Executor接口定義的execute方法,是任務(wù)提交的入口。我們看看ThreadPoolExecutor.execute(Runnablecmd)的實(shí)現(xiàn):Java代碼 publicvoidexecute(Runnablecommand){if(command==null)thrownewNullPointerException();if(poolSize>=corePoolSize||!addIfUnderCorePoolSize(command)){if(runState==RUNNING&&workQueue.offer(command)){if(runState!=RUNNING||poolSize==0)ensureQueuedTaskHandled(command);TOC\o"1-5"\h\z}elseif(!addIfUnderMaximumPoolSize(command))reject(command);//isshutdownorsaturated\o"CurrentDocument"}\o"CurrentDocument"}解釋如下:當(dāng)提交一個(gè)新的Runnable任務(wù):分支1: 如果當(dāng)前池大小小于corePoolSize,執(zhí)行addIfUnderCorePoolSize(command),如果線程池處于運(yùn)行狀態(tài)且poolSize<corePoolSizeaddIfUnderCorePoolSize(command)會(huì)做如下事情,將Runnable任務(wù)封裝成Worker任務(wù),創(chuàng)建新的Thread,執(zhí)行Worker任務(wù)。如果不滿足條件,則返回false。代碼如下:Java代碼—privatebooleanaddIfUnderCorePoolSize(RunnablefirstTask){Threadt=null;finalReentrantLockmainLock=this.mainLock;mainLock.lock();try{if(poolSize<corePoolSize&&runState==RUNNING)

t=addThread(firstTask);}finally{mainLock.unlock();}if(t==null)returnfalse;t.start();returntrue;}分支2:如果大于corePoolSize或1失敗失敗,則:如果等待隊(duì)列未滿,把Runnable任務(wù)加入到workQueue等待隊(duì)列workQueue.offer(command)?如多等待隊(duì)列已經(jīng)滿了,調(diào)用addlfUnderMaximumPoolSize(command),和addIfUnderCorePoolSize基本類似,只不過判斷條件是poolSize<maximumPoolSize。如果大于maximumPoolSize,則ij把Runnable任務(wù)交由RejectedExecutionHandler來處理。問題:如何實(shí)現(xiàn)線程的復(fù)用?DougLea的實(shí)現(xiàn)思路是線程池里的每個(gè)線程執(zhí)行完任務(wù)后不立刻退出,而是去檢查下等待隊(duì)列里是否還有線程任務(wù)需要執(zhí)行,如果在keepAliveTime里等不到新的任務(wù)了,那么線程就會(huì)退出。這個(gè)功能的實(shí)現(xiàn)關(guān)鍵在于Worker。線程池在執(zhí)行Runnable任務(wù)的時(shí)候,并不單純把Runnable任務(wù)交給創(chuàng)建一個(gè)Thread。而是會(huì)把Runnable任務(wù)封裝成Worker任務(wù)。下面看看Worker的實(shí)現(xiàn):代碼很簡(jiǎn)單,可以看出,worker里面包裝了firstTask屬性,在構(gòu)造worker的時(shí)候傳進(jìn)來的那個(gè)Runnable任務(wù)就是firstTask。同時(shí)也實(shí)現(xiàn)了Runnable接口,所以是個(gè)代理模式,看看代理增加了哪些功能。關(guān)鍵看woker的run方法:Java代碼 Lpublicvoidrun(){try{Runnabletask=firstTask;firstTask=null;while(task!=null||(task=getTask())!=null){runTask(task);task=null;}}finally{workerDone(this);11.12.可以看出worker的run方法是一個(gè)循環(huán),第一次循環(huán)運(yùn)行的必然是firstTask,在運(yùn)行完firstTask之后,并不會(huì)立刻結(jié)束,而是會(huì)調(diào)用getTask獲取新的任務(wù)(getTask會(huì)從等待隊(duì)列里獲取等待中的任務(wù)),如果keepAliveTime時(shí)間內(nèi)得到新任務(wù)則繼續(xù)執(zhí)行,得不到新任務(wù)則那么線程才會(huì)退出。這樣就保證了多個(gè)任務(wù)可以復(fù)用一個(gè)線程,而不是每次都創(chuàng)建新任務(wù)。keepAliveTime的邏輯在哪里實(shí)現(xiàn)的呢?主要是利用了BlockingQueue的poll方法支持等待??煽磄etTask的代碼段:Java代碼_「「if(state==SHUTDOWN)//Helpdrainqueuer=workQueue.poll();elseif(poolSize>corePoolSize||allowCoreThreadTimeOut)r=workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS);elser=workQueue.take();ThreadFactory和RejectedExecutionHandlerThreadFactory和RejectedExecutionHandler是ThreadPoolExecutor的兩個(gè)屬性,也可以認(rèn)為是兩個(gè)簡(jiǎn)單的擴(kuò)展點(diǎn)?ThreadFactory是創(chuàng)建線程的工廠。默認(rèn)的線程工廠會(huì)創(chuàng)建一個(gè)帶有“pool-poolNumber-thread-threadNumber”為名字的線程,如果我們有特別的需要,如線程組命名、優(yōu)先級(jí)等,可以定制自己的ThreadFactory。RejectedExecutionHandler是拒絕的策略。常見有以下幾種:AbortPolicy:不執(zhí)行,會(huì)拋出RejectedExecutionException異常。CallerRunsPolicy:由調(diào)用者(調(diào)用線程池的主線程)執(zhí)行。DiscardOldestPolicy:拋棄等待隊(duì)列中最老的。DiscardPolicy:不做任何處理,即拋棄當(dāng)前任務(wù)。3.ScheduledThreadPoolExecutorScheduleThreadPoolExecutor是對(duì)ThreadPoolExecutor的集成。增加了定時(shí)觸發(fā)線程任務(wù)的功能。需要注意

從內(nèi)部實(shí)現(xiàn)看,ScheduleThreadPoolExecutor使用的是corePoolSize線程和一個(gè)無界隊(duì)列的固定大小的池,所以調(diào)整maximumPoolSize沒有效果。無界隊(duì)列是一個(gè)內(nèi)部自定義的DelayedWorkQueue。ScheduleThreadPoolExecutor線程池接收定時(shí)任務(wù)的方法是schedule,看看內(nèi)部實(shí)現(xiàn):Java代碼—publicScheduledFuture<?>schedule(Runnablecommand,longdelay,TimeUnitunit){if(command==null||unit==null)thrownewNullPointerException();RunnableScheduledFuture<?>t=decorateTask(command,newScheduledFutureTask<Void>(command,null,triggerTime(delay,unit)));9.delayedExecute(t);returnt;}以上代碼會(huì)初始化一個(gè)RunnableScheduledFuture類型的任務(wù)t,并交給delayedExecute方法。delayedExecute(t)方法實(shí)現(xiàn)如下:Java代碼Java代碼..9.}privatevoiddelayedExecute(Runnablecommand){if(isShutdown()){reject(command);return;}if(getPoolSize()<getCorePoolSize())prestartCoreThread();super.getQueue().add(command);如果當(dāng)前線程池大小poolSize小于CorePoolSize,則創(chuàng)建一個(gè)新的線程,注意這里創(chuàng)建的線程是空的,不會(huì)把任務(wù)直接交給線程來做,而是把線程任務(wù)放到隊(duì)列里。因?yàn)槿蝿?wù)是要定時(shí)觸發(fā)的,所以不能直接交給線程去執(zhí)行。問題:那如何做到定時(shí)觸發(fā)呢?

關(guān)鍵在于DelayedWorkQueue,它代理了DelayQueue??梢哉J(rèn)為DelayQueue是這樣一個(gè)隊(duì)列(具體可以去看下源碼,不詳細(xì)分析):隊(duì)列里的元素按照任務(wù)的delay時(shí)間長(zhǎng)短升序排序,delay時(shí)間短的在隊(duì)頭,delay時(shí)間長(zhǎng)的在隊(duì)尾。從DelayQueue里FIFO的獲取一個(gè)元素的時(shí)候,不會(huì)直接返回head。可能會(huì)阻塞,等到head節(jié)點(diǎn)到達(dá)delay時(shí)間后才能被獲取??梢钥聪翫elayQueue的take方法實(shí)現(xiàn):Java代碼_publicEtake()throwsInterruptedException{final ReentrantLock lock=this.lock;lock.lockInterruptibly();try{for(;;){E first= q.peek();if(first ==null){available.await();}else{longdelay=first.getDelay(TimeUnit.NANOSECONDS);if(delay>0){longtl=available.awaitNanos(delay);//等待delay時(shí)間}else{Ex=q.poll();assertx!=null;if(q.size()!=0)available.signalAll();//wakeupothertakersreturnx;TOC\o"1-5"\h\z}}}}finally{lock.unlock();}}線程池使用策略

通過以上的詳解基本上能夠定制出自己需要的策略了,下面簡(jiǎn)單介紹下Executors里面提供的一些常見線程池策略:I.FixedThreadPoolJava代碼—publicstaticExecutorServicenewFixedThreadPool(intnThreads){returnnewThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue<Runnable>());}實(shí)際上就是個(gè)不支持keepalivetime,且corePoolSize和maximumPoolSize相等的線程池。2.SingleThreadExecutorJava代碼 publicstaticExecutorServicenewSingleThreadExecutor(){returnnewFinalizableDelegatedExecutorService(newThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue<Runnable>()));}實(shí)際上就是個(gè)不支持keepalivetime,且corePoolSize和maximumPoolSize都等1的線程池。3.CachedThreadPoolJava代碼 publicstaticExecutorServicenewCachedThreadPool(){returnnewThreadP

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論