線程池的研究及實現(xiàn)_第1頁
線程池的研究及實現(xiàn)_第2頁
線程池的研究及實現(xiàn)_第3頁
線程池的研究及實現(xiàn)_第4頁
線程池的研究及實現(xiàn)_第5頁
已閱讀5頁,還剩12頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

1、什么是線程池?諸如web服務器、數(shù)據(jù)庫服務器、文件服務器和郵件服務器等許多服務器應用都面向處理來自某些遠程來源的大量短小的任務。構建 服務器應用程序的一個過于簡單的模型是:每當一個請求到達就創(chuàng)建一個新的服務對彖,然后在新的服務對彖屮為請求服務。但當有 大量請求并發(fā)訪問時,服務器不斷的創(chuàng)建和銷毀對彖的開銷很大。所以提高服務器效率的一個手段就是盡可能減少創(chuàng)建和銷毀對彖的 次數(shù),特別是一些很耗資源的對彖創(chuàng)建和銷毀,這樣就引入了“池”的概念,“池”的概念使得人們可以定制一定量的資源,然后對這些資 源進行復用,而不是頻繁的創(chuàng)建和銷毀。線程池是預先創(chuàng)建線程的一種技術。線程池在還沒有任務到來z前,創(chuàng)建一定數(shù)

2、量的線程,放入空閑隊列屮。這些線程都是處于睡眠 狀態(tài),即均為啟動,不消耗cpu,而只是占用較小的內存空間。當請求到來z后,緩沖池給這次請求分配一個空閑線程,把請求傳入 此線程屮運行,進行處理。當預先創(chuàng)建的線程都處于運行狀態(tài),即預制線程不夠,線程池可以自由創(chuàng)建一定數(shù)量的新線程,用于處理 更多的請求。當系統(tǒng)比較閑的時候,也可以通過移除一部分一直處于停用狀態(tài)的線程。線程池的注意事項雖然線程池是構建多線程應用程序的強大機制,但使用它并不是沒有風險的。在使用線程池時需注意線程池大小與性能的關系,注意 并發(fā)風險、死鎖、資源不足和線程泄漏等問題。(1) 線程池大小。多線程應用并非線程越多越好,需要根據(jù)系統(tǒng)運

3、行的軟硬件環(huán)境以及應用木身的特點決定線程池的大小。一般來說, 如果代碼結構合理的話,線程數(shù)口與cpu數(shù)量相適合即可。如果線程運行吋可能出現(xiàn)阻塞現(xiàn)象,可相應增加池的大?。蝗缬斜匾?采用自適應算法來動態(tài)調整線程池的大小,以提高cpu的右效利用率和系統(tǒng)的整體性能。(2) 并發(fā)錯誤。多線程應用要特別注意并發(fā)錯誤,要從邏輯上保證程序的正確性,注意避免死鎖現(xiàn)象的發(fā)生。(3) 線程泄漏。這是線程池應用屮一個嚴重的問題,當任務執(zhí)行完畢而線程沒能返冋池屮就會發(fā)生線程泄漏現(xiàn)象。簡單線程池的設計一個典型的線程池,應該包括如下幾個部分:1、線程池管理器(threadpool),用于啟動、停用,管理線程池2、工作線程

4、(workthread),線程池中的線程3、請求接口(workrequest),創(chuàng)建請求對象,以供工作線程調度任務的執(zhí)行4、請求隊列(requestqueue),用于存放和提取請求5、結杲隊列(resultqueue),用于存儲請求執(zhí)行后返回的結杲 線程池管理器,通過添加請求的方法(putrequest)向請求隊列(requestqueue)添加請求,這些請求事先需要實現(xiàn)請求接口,即 傳遞工作函數(shù)、參數(shù)、結果處理函數(shù)、以及異常處理函數(shù)。z后初始化一定數(shù)量的工作線程,這些線程通過輪詢的方式不斷查看請求 隊列(requestqueue),只要有請求存在,則會提取出請求,進行執(zhí)行。然后,線程池管理器

5、調用方法(poll)查看結果隊列(resultqueue) 是否有值,如果有值,則取出,調用結果處理函數(shù)執(zhí)行。通過以上講述,不難發(fā)現(xiàn),這個系統(tǒng)的核心資源在于請求隊列和結果隊列, 工作線程通過輪詢requestqueue獲得人物,主線程通過查看結果隊列,獲得執(zhí)行結果。因此,對這個隊列的設計,要實現(xiàn)線程同步, 以及一定阻塞和超時機制的設計,以防止因為不斷輪詢而導致的過多cpu開銷。在本文屮,將會用python語言實現(xiàn),python的queue, 就是很好的實現(xiàn)了對線程同步機制。使用python實現(xiàn):#-*-encoding:utf-8-*-,),created on 2012-3-9©s

6、ummary:線程池contact: mai1to:zhanglixinscugmai1. comauthor: zhanglixin,),import sysimport threadingimport queueimport traccback#定義一些exception,用于口定義異常處理class norcsultspcnding(exccption):"""all works requests have been processedpass class noworkersavailablc(exccption):worket threads availa

7、ble to process rcmaining requests.pass def _handle_thread_exception(request, exc_info):默認的異常處理函數(shù),只是簡單的打印"traccback. print cxccption(*cxc info)ttclasscsclass workcrthrcad(threading. thread):""后臺線程,真正的工作線程,從請求隊列(rcqucstqucuc)屮獲取work, 并將執(zhí)行后的結果添加到結果隊列(rcsultqueue)/,/,/,def init (self,rcqu

8、cstqucuc, resultqueue, poll timcout=5, *kwds): threading. thread. _init_(self, *kwds)'''設置為守護進行self, setdacmon(true)self. _requcstqueue 二 requestqueueself. _resultqucue 二 resuitqueueself. _poll_timcout 二 poll_timeout'''設置一個flag信號,用來表示該線程是否還被dismiss,默認為false''' sel

9、f, -dismissed 二 threading. event()self, st art ()def run(self):'''每個線程盡口j能多的執(zhí)行work,所以采用loop, 只要線程叫用,并il requestqueue有work未完成,則一直loop''' whi1e true:if self, dismissed, is set ():breakt ry:,queue. queue隊列設置了線程同步策略,并且可以設置timcouto 一直block,直到rcqucstqucuc有值,或者超時,request = self. rcq

10、ucstqucuc. get(true, self. _poll timeout)except queue. empty:continueelse:'''之所以在這里再次判斷dimissed,是因為之前的timcout時間里,很有可能,該線程被dismiss掉了'' if self.-dismissed. is_set():self. _requestqucuc. put(request)breaktry:'''執(zhí)彳亍callable,講請求和結果以tuple的方式放入rcqucstqucuc,''resuit 二

11、 request.cedlablc(*request. args, *request. kwds)print self, gctname ()self. _resuitqueue, put(request, resuit)except:'''異常處理'''request. exception 二 trueself. rcsultqucuc. put (request, sys. cxc info ()def dismiss(self):'''設置一個標志,表示完成當前work之后,退出self, dismissed, se

12、t ()class workrequest:,),param callable.:,可定制的,執(zhí)行work的函數(shù)param args:列表參數(shù)param kwds:字典參數(shù)param requestid: idparam callback:可定制的,處理rcsultqucuc隊列元素的函數(shù)param exc_callback:可定制的,處理異常的函數(shù),def _init_(self, callable_, args=none,kwds二none, requestld=none, callback=none, cxc_callback=_handle_thrcad_cxccption): if

13、requestid 二二 none:self, requestid 二 id(self)el se:t ry:self.requestid = hash (requesttd)except typcerror:raise typcerror("rcqucstld must be hashable")self, exception 二 falseself, callback = callbackself, exc callback = cxc callbackself, callable = callableself, args = args or self, kwds =

14、kwds or def _str_(self):return z,workrcqucst id=%s args=%r kwargs=%r exception二s % (self, requestid,self, args, self, kwds, self, cxccption)class thrcadpool:,param numworkers:初始化的線程數(shù)量param q_sizc, rcsq_sizc: rcqucstqucuc 和 result 隊列的初始大小param poll_timcout:設置工作線程 workcrthrcad 的 timeout,也就是等待 rcqucstq

15、ucuc 的 timcout ,def _init_(self, num_workers, q_size=0, resq_size=o, poll_timoout=5):self. _requestqueue 二 queue. queue(q_size)self. _resultqueue 二 queue. queue(resq_size)seif. workers 二self, dismissedworkcrs =self. workrcquests = #設置個字典,方便使用self, crcatcworkers(num workers, poll timeout)def crcatcwo

16、rkcrs(self, num workers, poll timcout=5):'''創(chuàng)建 num workers 個 workthread,默認 timeout 為 5'''for i in range(num workers):self, workers, append(workerthread(self. rcqucstqucuc, self. resultqueue, poll timcout二poll timeout)def dismissworkers(self, num_workers, do_join=falsc):'&

17、#39;'停用num_workcrs數(shù)量的線程,并加入dismiss_list,''dismiss_list 二for i in range(min(num_workcrs, len(self, workers):worker = self, workers. pop()worker, dismiss ()dismiss_list. append(worker)if do_join :for worker in dismiss_list:worker. join()else:seif. di smissedworkers. extend(dismiss_list)def

18、 joinalidismissedworkers(seif):"'join所有停用的thread'"#print lcn(sclf dismissedworkers)for worker in self, dismissedworkers:worker. join()self, dismissedworkers =def putrequest(self,request , block=truc, timcout=nonc):asscrt isinstancc(request,workrequest)assert not getattr (request,&#

19、39; exception,, none)'''當queue滿了,也就是容量達到了前面設定的q_sizc,它將一直阻塞,直到有空余位置,或是timeout'' self. _requestqucue. put(request, block, timcout)self. workrequestsrcqucst. requestid二 requestdef poll (self, block = false):while true:if not self. workrequests:raise norcsuitspendingelif block emd n

20、ot self, workers:raise noworkersavailablctry:'''默認只要resultqueue有值,則取出,否則一直block'''request , resuit 二 self. _resuitqueue, get(block二block)if request. exception and request. exc_caliback:request, cxc callback (request, result)callback):if request, callback and not (request, exc

21、eption and request.cxc request.callback (request, result)del self. workrequestsrequest. requestidexcept queue. empty:breakdef wait(self):while true:try:seif poll (true)except norcsuitspending:breakdef workersize(self):return lcn(sclf. workers)def stop(self):'''join所有的thread,確保所有的線程都執(zhí)行完畢&

22、#39;self. dismissworkers(self, workersize(), true)self.joinal 1 di smi ssedworkers()測試代碼:#test a demoif _name_=' _meiin_' :import randomimport timeimport dcitetimedef do_work(data):time, sleep(random. nmdint (1,3)res = str (date ti me. datetime .no w () + "” +st r (data)return resdef pr

23、int_result(request, result):print "resuit from request %s : %r % (request. requestid, result)main = threadpool(3)for i in range(40):req = workrequest(do_work, args=i, kwds二,callback=print_result) main. putrequest(req)print "work request #%s added.,z % req. requesltdprint*20,main, workersize 0 /*20counter = 0while true:t ry:ti

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論