《大數(shù)據(jù)的Python基礎(chǔ)》課件-第12章 并發(fā)編程_第1頁
《大數(shù)據(jù)的Python基礎(chǔ)》課件-第12章 并發(fā)編程_第2頁
《大數(shù)據(jù)的Python基礎(chǔ)》課件-第12章 并發(fā)編程_第3頁
《大數(shù)據(jù)的Python基礎(chǔ)》課件-第12章 并發(fā)編程_第4頁
《大數(shù)據(jù)的Python基礎(chǔ)》課件-第12章 并發(fā)編程_第5頁
已閱讀5頁,還剩66頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

第12章并發(fā)編程學(xué)習(xí)目標

掌握進程的執(zhí)行、同步與數(shù)據(jù)交換掌握線程的創(chuàng)建與同步方法深入了解協(xié)程的概念和使用方法22025/3/612.1進程

進程是操作系統(tǒng)中正在執(zhí)行程序的實例,具有獨立的地址空間,保存了執(zhí)行指令、變量、動態(tài)分配的內(nèi)存以及堆棧數(shù)據(jù)空間等。32025/3/612.1.1進程的執(zhí)行

傳統(tǒng)的批處理系統(tǒng)采用串行的方式執(zhí)行程序,一個任務(wù)執(zhí)行完了再啟動下一個任務(wù)。并發(fā)執(zhí)行的操作系統(tǒng)允許多個任務(wù)同時執(zhí)行,如果涉及CPU占用等受限資源的使用,會使用調(diào)度算法實現(xiàn)多任務(wù)之間的共享。

在Python程序啟動后,會默認產(chǎn)生一個進程作為主進程,若在程序中啟動一個進程,事實是啟動了當前進程的子進程。子進程采用start()方法啟動執(zhí)行,而其執(zhí)行是否結(jié)束可以由join()方法來確定,子進程join()方法一旦被執(zhí)行,會確保該進程一定執(zhí)行完成后才能執(zhí)行后續(xù)的指令。42025/3/6【例12-1】子進程的啟動和執(zhí)行importmultiprocessingasmpimportosimporttimex=100deftask(msg): print('intask,子進程runing') print('intask,modulename:',__name__) print('intask',msg) globalx x=10 time.sleep(2)print('intask,parentprocess:',os.getppid()) #查看父進程IDprint('intask,processid:',os.getpid()) #查看當前進程IDprint('intask,x=',x)52025/3/6if__name__=='__main__': p=mp.Process(target=task,args=('MySubProcess',)) p.start()

p.join() #等待子進程執(zhí)行完畢

print('x=',x) print('parentprocess:',os.getppid())#查看父進程ID print('processid:',os.getpid()) #查看當前進程ID【例12-1】子進程的啟動和執(zhí)行

對于進程的執(zhí)行不能在IDLE環(huán)境中進行,否則無法準確觀察結(jié)果。以上的程序需要在命令行中執(zhí)行,如圖所示。

在一個程序中,若同時存在多個任務(wù),可以利用進程池并發(fā)執(zhí)行各個任務(wù)。62025/3/6【例12-2】利用進程池執(zhí)行多個序列元素的乘積importmultiprocessingasmpimportosfromfunctoolsimportreducedeff(lst): print('name={}ppid={}pid={}'.format(mp.current_process().name, os.getppid(),os.getpid())) return(reduce(lambdax,y:x*y,lst))if__name__=='__main__': x=[list(range(x,x+9))forxinrange(1,100,10)] #list(range(1,9)),list(range(10,19))… withmp.Pool(os.cpu_count())asp: print(p.map(f,x))72025/3/6【例12-2】利用進程池執(zhí)行多個序列元素的乘積

程序執(zhí)行結(jié)果中的計算結(jié)果唯一,但結(jié)果中參與計算的進程信息每次運行會出現(xiàn)不同的信息,由于x中總共有10個元素,因此其打印的數(shù)量為10個記錄。

根據(jù)進程號可以看出,當前有三個子進程4152、4896、6600,一個父進程9496,最后10組列表各自元素的累積則放在一個列表中打印出來:82025/3/6【例12-2】利用進程池執(zhí)行多個序列元素的乘積92025/3/6name=SpawnPoolWorker-1ppid=9496pid=4152name=SpawnPoolWorker-1ppid=9496pid=4152name=SpawnPoolWorker-1ppid=9496pid=4152name=SpawnPoolWorker-1ppid=9496pid=4152name=SpawnPoolWorker-1ppid=9496pid=4152name=SpawnPoolWorker-1ppid=9496pid=4152name=SpawnPoolWorker-1ppid=9496pid=4152name=SpawnPoolWorker-2ppid=9496pid=4896name=SpawnPoolWorker-1ppid=9496pid=4152name=SpawnPoolWorker-3ppid=9496pid=6600[362880,33522128640,3634245014400,76899763100160,745520860465920,4559830787191680,20565162535357440,74684882115043200,230656425830328960,628156509555294720]

運行結(jié)果:12.1.2進程同步102025/3/6

盡管多個進程可以并發(fā)式執(zhí)行,對于IO設(shè)備等一些共享資源,經(jīng)常需要某個進程的獨占式訪問,這樣就需要進程同步機制?;コ怄i是一種常見的共享資源保護措施,能夠確保只有單一進程訪問資源,其它進程訪問已經(jīng)加鎖的資源則必須等待互斥鎖釋放以后才能訪問該資源?!纠?2-3】利用互斥鎖進行文件讀寫112025/3/6importmultiprocessingasmpimportsysimporttimedefworker1(lock,f): withlock: fs=open(f,'a+') foriinrange(3): fs.write("worker1lockd\n") time.sleep(0.5) fs.close()defworker2(lock,f): lock.acquire() try:fs=open(f,'a+') foriinrange(3): fs.write("worker2acquired\n") time.sleep(0.5) fs.close() finally: lock.release()if__name__=="__main__": lock=mp.Lock() f="locked.txt" p1=mp.Process(target=worker1,args=(lock,f)) p2=mp.Process(target=worker2,args=(lock,f)) p1.start() p2.start() print("end")【例12-3】利用互斥鎖進行文件讀寫122025/3/6在命令行中執(zhí)行以上程序以后,可查看locked.txt中具有以下內(nèi)容:worker1lockdworker1lockdworker1lockdworker2acquiredworker2acquiredworker2acquired12.1.2進程同步132025/3/6事件(Event)機制可以用于進程或線程之間的通信。事件(Event)通信的相關(guān)方法

方法說明is_set()返回Event的內(nèi)部標志是否為Trueset()把內(nèi)部標志設(shè)置為True,并喚醒所有處于等待狀態(tài)的進程或線程clear()將Event的內(nèi)部標志設(shè)置為False,通常接下來會調(diào)用wait()阻塞wait(timeout=None)阻塞當前的進程或線程,直到內(nèi)部標志為True【例12-4】采用事件機制通信的進程142025/3/6importmultiprocessingasmpimporttimedeftask1(e): print("task1starting,flag=",str(e.is_set())) e.wait() print("task1,flag=",str(e.is_set()))deftask2(e,t): print("task2starting,flag=",str(e.is_set())) e.wait(t) print("task2,flag=",str(e.is_set()))if__name__=="__main__": e=mp.Event() mp.Process(name="block",target=task1,args=(e,)).start() mp.Process(name="non-block",target=task2,args=(e,2)).start() time.sleep(3) e.set() print("main:eventisset")在命令行中調(diào)用python語句執(zhí)行以上程序可以得到以下結(jié)果:task1starting,flag=Falsetask2starting,flag=Falsetask2,flag=Falsemain:eventissettask1,flag=True12.1.3進程間的數(shù)據(jù)交換152025/3/6

進程擁有獨立的地址空間,意味著兩個子進程之間要想進行數(shù)據(jù)交換,需要一些特殊的設(shè)計。隊列、管道和共享內(nèi)存和進程管理器是一些常見的進程間數(shù)據(jù)交換的方式。

隊列

Queue是多進程安全的隊列,可以使用Queue實現(xiàn)多進程之間的數(shù)據(jù)傳遞。put方法用以插入數(shù)據(jù)到隊列中,get方法可以從隊列讀取并且刪除一個元素。【例12-5】生產(chǎn)者與消費者進程162025/3/6importrandomimportmultiprocessingasmpimporttimedefproduter(q,name,food):foriinrange(1,4):res='%s%s'%(food,i)time.sleep(random.randint(1,3))q.put(res)print('%s生產(chǎn)%s'%(name,res))q.put(None) #結(jié)束時放置一個None到隊列defconsumer(q,name):whileTrue:res=q.get()ifres==None:break

#若接收為None則終止循環(huán)time.sleep(random.randint(1,3))print('%s消費%s'%(name,res))if__name__=='__main__':q=mp.Queue()mp.Process(target=produter,args=(q,'農(nóng)民','大米')).start()mp.Process(target=consumer,args=(q,'居民')).start()在命令行中調(diào)用python語句執(zhí)行以上程序可以得到以下結(jié)果:農(nóng)民生產(chǎn)大米1居民消費大米1農(nóng)民生產(chǎn)大米2居民消費大米2農(nóng)民生產(chǎn)大米3居民消費大米312.1.3進程間的數(shù)據(jù)交換172025/3/6

管道

管道(Pipe)是常用的數(shù)據(jù)傳遞方法,一般包括兩個連接端(conn1,conn2)。

默認情況下,管道會建立在全雙工模式(duplex=True)下,兩個連接端均可收發(fā)數(shù)據(jù),即均能夠調(diào)用send()和recv()方法進行消息的發(fā)送和接收。

如果沒有消息可接收,recv()方法會一直阻塞。如果管道已經(jīng)被關(guān)閉,那么recv()方法會拋出EOFError異常?!纠?2-6】采用管道進行數(shù)據(jù)的收發(fā)182025/3/6管道的兩端也可以自定義內(nèi)部協(xié)議,雙方以消息的方式相互通知關(guān)閉管道通信,如例12-6中以'end'消息作為通信的結(jié)束語。importmultiprocessingasmpdeff(conn): conn.send('hello') #向管道中發(fā)送數(shù)據(jù)

print(conn.recv()) conn.send(100) print(conn.recv()) conn.send('end') conn.close()【例12-6】采用管道進行數(shù)據(jù)的收發(fā)192025/3/6if__name__=='__main__': conn1,conn2=mp.Pipe() #創(chuàng)建管道對象 p=mp.Process(target=f,args=(conn2,)) #將管道的一方作為參數(shù)傳遞給子進程 p.start() whileTrue: m=conn1.recv() print(type(m),m) #通過管道的另一方獲取數(shù)據(jù) ifm=='end':break conn1.send('received') p.join()在命令行中調(diào)用python語句執(zhí)行以上程序可以得到以下結(jié)果:<class'str'>helloreceived<class'int'>100received<class'str'>end12.1.3進程間的數(shù)據(jù)交換202025/3/6

內(nèi)存共享

共享內(nèi)存是一種高效的進程間通信方式,進程可以直接共享并讀寫一段內(nèi)存,而不需要任何數(shù)據(jù)的拷貝,因此可以快速在進程間傳遞數(shù)據(jù)。multiprocessing模塊中的Value和Array提供了共享內(nèi)存通信的能力,其中Value提供單值的存放,而Array提供同類型的多個數(shù)據(jù)的存放。Value與Array的用法如下:

Value(typecode_or_type,*args,lock=True)

Array(typecode_or_type,size_or_initializer,*,lock=True)

其中的typecode_or_type表示類型代碼或C語言的類型,如表所示。12.1.3進程間的數(shù)據(jù)交換212025/3/6typecode_or_type表示類型代碼'H'unsignedshortint2'i'signedintint2'I'unsignedintlong2'l'signedlongint4'L'unsignedlonglong4'q'signedlonglongint8'Q'unsignedlonglongint8'f'floatfloat4'd'doublefloat8類型代碼C語言類型Python類型字節(jié)數(shù)'b'signedcharint1'B'unsignedcharint1'u'Py_UNICODEUnicode>1'h'signedshortint2【例12-7】共享內(nèi)存通信示例222025/3/6importmultiprocessingasmpdeftask(n,a): n.value=10 foriinrange(len(a)): a[i]=a[i]*int(n.value) print('taskcomplete')if__name__=='__main__': num=mp.Value('d',0.0) #實型 arr=mp.Array('i',range(10))

#整型數(shù)組 print(num.value)

print(arr[:]) p=mp.Process(target=task,args=(num,arr)) p.start() p.join() print(num.value) print(arr[:])在命令行中調(diào)用python語句執(zhí)行以上程序可以得到以下結(jié)果:0.0[0,1,2,3,4,5,6,7,8,9]taskcomplete10.0[0,10,20,30,40,50,60,70,80,90]12.1.3進程間的數(shù)據(jù)交換232025/3/6

進程管理器(Manager)控制一個擁有l(wèi)ist、dict、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue、Value、Array、Namespace等對象的服務(wù)端進程,并且允許其他進程訪問這些對象,可以提供更為強大的數(shù)據(jù)共享功能。Manager類是數(shù)據(jù)不安全的,因此在使用過程中若需要確保多進程的數(shù)據(jù)共享可以采取互斥鎖的方式確保數(shù)據(jù)訪問的唯一性?!纠?2-8】進程管理器的應(yīng)用示例242025/3/6importmultiprocessingasmpdefworker(d,l,v,lock): withlock: foriinrange(1,6): key="key{0}".format(i) val="val{0}".format(i)d[key]=val l+=range(11,16) v.value=10if__name__=="__main__": lock=mp.Lock() withmp.Manager()asm:

d,l,v=m.dict(),m.list(),m.Value('i',0) p=mp.Process(target=worker,args=(d,l,v,lock)) p.start() p.join() print('{}\n{}\n{}'.format(d,l,v.value))在命令行中執(zhí)行以上程序可以得到以下結(jié)果:{'key1':'val1','key2':'val2','key3':'val3','key4':'val4','key5':'val5'}[11,12,13,14,15]1012.2

線程252025/3/6

線程是進程中的實體,是被操作系統(tǒng)獨立調(diào)度和分派處理器運行的基本單位。進程中可以并發(fā)多個線程,每條線程并行執(zhí)行不同的任務(wù)。

threading模塊提供了Thread、Lock、RLock、Condition、Event、Timer和Semaphore等大量類來支持多線程編程,可以通過Thread類創(chuàng)建線程并控制線程的運行12.2.1

創(chuàng)建線程262025/3/6

Thread對象成員成員說明start()自動調(diào)用run()方法,啟動線程run()用來實現(xiàn)線程的功能和業(yè)務(wù)邏輯,可以在子類中重寫該方法來自定義線程的行為__inti__(self,group=None,target=None,name=None,kwargs=None,verbose=None)構(gòu)造函數(shù)name用來讀取或設(shè)置線程的名字ident線程標識,非0數(shù)字或None(線程未被啟動)is_alive(),isAlive()測試線程是否處于活躍狀態(tài)daemon布爾值,標識線程是否為守護線程join(timeout=None)等待線程結(jié)束或超時返回【例12-9】線程的執(zhí)行272025/3/6importthreadingdeffunc1(x,y):foriinrange(x,y):print(i)t1=threading.Thread(target=func1,args=(1,3))t1.start()print('t1-',t1.isAlive())t1.join(5)t2=threading.Thread(target=func1, args=(13,15))print('t2-',t2.isAlive())t2.join()print('t1:',t1.isAlive())print('t2:',t2.isAlive())

以上程序可以在命令行窗口觀察到以下結(jié)果,可以看到線程的運行使得程序print()語句的輸出順序發(fā)生了變化。1t1-True21314t2-Truet1:Falset2:False12.2.1

創(chuàng)建線程282025/3/6

不同于子進程的執(zhí)行無法在IDLE中觀測,線程的執(zhí)行可以在IDLE中觀察到print()的結(jié)果,但是其輸出的順序會與命令行執(zhí)行方式有所區(qū)別,可掃碼二維碼進一步查看。

一般來說,線程start()以后會首先啟動run()方法中的內(nèi)容,然后再啟動后續(xù)的語句,但由于線程的執(zhí)行是不受限的,因此后續(xù)的代碼事實是并發(fā)進行的。例12-10是一個用類的方式實現(xiàn)的線程,可以進一步觀察其并發(fā)執(zhí)行情況?!纠?2-10】線程的執(zhí)行292025/3/6fromthreadingimportThreadimporttimex=1classMyThread(Thread): def__init__(self,name): Thread.__init__(self) =name defrun(self): globalx print(f'inrun{}x={x}') time.sleep(1) x=100 print('runend,x=',x) deftest(self):

print('intest',)在命令行窗口可觀察到以下結(jié)果:inrunThread-1x=1intestThread-1runend,x=1complete,x=100t=MyThread('Thread-1')t.start()t.test()time.sleep(2)print('complete,x=',x)

12.2.2

線程同步302025/3/6為充分利用CPU等各類硬件資源來提高任務(wù)處理的速度和效率,可以將任務(wù)拆分成互相協(xié)作的多個線程同時運行,多個線程之間往往會有一定交互和同步以協(xié)作完成任務(wù)?;コ怄i

線程也可以具有互斥鎖方法來實現(xiàn)多線程的同步運行。threading模塊的Lock和RLock對象都是用于線程同步的互斥鎖,可被同一個線程acquire()多次。線程可以通過Lock或RLock對象的acquire()/release()調(diào)用實現(xiàn)對共享資源的訪問,能夠確保共享資源在同一時間能被線程進行獨占式訪問?!纠?2-11】通過互斥鎖訪問共享資源312025/3/6fromthreadingimportThread,RLockimporttimeclassMyThread(Thread):defrun(self):globalxlock.acquire()foriinrange(3):x=x+itime.sleep(1)print(x,end='')lock.release()lock=RLock()tl=[]foriinrange(5):t=MyThread()tl.append(t)x=0foriintl:i.start()在命令行窗口可觀察到輸出的結(jié)果都是3的倍數(shù),說明線程的互斥鎖確保了多線程對于全局變量訪問的唯一性:

369121512.2.2

線程同步322025/3/6信號量

互斥鎖同時只允許一個線程訪問共享數(shù)據(jù),而信號量是同時允許一定數(shù)量的線程訪問共享數(shù)據(jù),比如銀行柜臺有5個窗口,則允許同時有5個人辦理業(yè)務(wù),后面的人只能等待前面有了辦完業(yè)務(wù)后才可以進入柜臺辦理?!纠?2-12】銀行業(yè)務(wù)辦理過程332025/3/6importthreadingimporttimedefbankTask(name):#銀行業(yè)務(wù)辦理

semaphore.acquire()time.sleep(3)print(f'{name}辦理業(yè)務(wù)')semaphore.release()semaphore=threading.BoundedSemaphore(5) #5個銀行窗口同時工作thread_list=[]foriinrange(10): #10個顧客t=threading.Thread(target=bankTask,args=(i,))thread_list.append(t)thread.join()因采用了線程同步機制,采用IDLE和命令行窗口方式執(zhí)行可得到相同的結(jié)果:4辦理業(yè)務(wù)0辦理業(yè)務(wù)3辦理業(yè)務(wù)1辦理業(yè)務(wù)2辦理業(yè)務(wù)5辦理業(yè)務(wù)6辦理業(yè)務(wù)7辦理業(yè)務(wù)8辦理業(yè)務(wù)9辦理業(yè)務(wù)12.2.2

線程同步342025/3/6條件變量

有時候線程的同步需要滿足一定條件才能繼續(xù),此時單純的互斥鎖難以滿足需要,而條件變量(Condition)除了能提供互斥鎖的功能外,還提供了wait()、notify()、notifyAll()等方法。wait([timeout])方法可以將線程掛起,直到收到notify通知或者超時。

【例12-13】采用線程方式實現(xiàn)一個生產(chǎn)者-消費者過程。fromthreadingimportThread,ConditionclassProducer(Thread):defrun(self):globalxcon.acquire()ifx==20:con.wait()else:print('Producer:',end='')foriinrange(20):print(x,end='')x=x+112.2.2

線程同步352025/3/6classConsumer(Thread):defrun(self):globalxcon.acquire()ifx==0:con.wait()else:print('Consumer:',end='')foriinrange(20):print(x,end='')x=x-1print(x)con.notify()con.release()print('complete',x)con=Condition()x=0p=Producer()c=Consumer()p.start()c.start()p.join()c.join()以上程序執(zhí)行結(jié)果如下:Producer:01234567891011121314151617181920Consumer:20191817161514131211109876543210complete012.2.2

線程同步362025/3/6隊列

線程使用隊列需要引入queue模塊,一般用于生產(chǎn)者-消費者類型的業(yè)務(wù)模型?!纠?2-14】隊列實現(xiàn)的線程級生產(chǎn)者-消費者過程。fromthreadingimportThreadfromqueueimportQueueimporttimeclassWorker(Thread):def__init__(self,queue):Thread.__init__(self)self.queue=queueself.thread_stop=Falsedefrun(self):whileTrue:try:task=q.get(block=True,timeout=20)time.sleep(3)q.task_done()#完成一個任務(wù)

res=q.qsize() #消息隊列大小print("taskrecv:%s,taskNo:%d,resttasks:%d"%(task[0],task[1],res))

exceptExceptionase:break12.2.2

線程同步372025/3/6if__name__=="__main__":q=Queue(3)worker=Worker(q)worker.start()q.put(["makeacup!",1],block=True,timeout=None)q.put(["makeadesk!",2],block=True,timeout=None)q.put(["makeanapple!",3],block=True,timeout=None)q.put(["makeabanana!",4],block=True,timeout=None)q.put(["makeabag!",5],block=True,timeout=None)q.join()print("complete")以上程序執(zhí)行結(jié)果如下:taskrecv:makeacup!,taskNo:1,resttasks:3taskrecv:makeadesk!,taskNo:2,resttasks:3taskrecv:makeanapple!,taskNo:3,resttasks:2taskrecv:makeabanana!,taskNo:4,resttasks:1taskrecv:makeabag!,taskNo:5,resttasks:0complete12.2.2

線程同步382025/3/6事件

threading模塊提供了事件(Event)的實現(xiàn)。set()方法可以設(shè)置Event對象內(nèi)部的信號標志為真,clear()方法則將其設(shè)置為假;isSet()方法用來判斷其內(nèi)部信號標志的狀態(tài),wait()方法在其內(nèi)部信號狀態(tài)為真時返回,否則將一直等待至超時或內(nèi)部信號狀態(tài)為真?!纠?2-15】多線程的事件機制392025/3/6fromthreadingimportThread,EventclassMyThread(Thread):def__init__(self,thname):Thread.__init__(self,name=thname)defrun(self):globalevtifevt.isSet():evt.clear()evt.wait()print(self.getName(),end='')else:print(self.getName(),end='')evt.set()evt=Event()evt.set()tl=[]foriinrange(4):t=MyThread(str(i))tl.append(t)foriintl:i.start()以上程序執(zhí)行結(jié)果如下:103212.3協(xié)程

概念的引入

生成器協(xié)程

異步處理協(xié)程402025/3/612.3.1

概念的引入412025/3/6

多進程的執(zhí)行中數(shù)據(jù)共享復(fù)雜,往往需要進程間通信(Inter-ProcessCommunication,IPC)。對于計算密集型的任務(wù),由于線程并發(fā)時需要進行執(zhí)行現(xiàn)場的切換和狀態(tài)保存(如開關(guān)線程、保存寄存器和堆棧等),會耗費大量系統(tǒng)的資源,因此多線程的使用有時在性能方面可能會低于單線程?!纠?2-16】IO密集型任務(wù)與計算密集型任務(wù)的性能對比422025/3/6frommultiprocessingimportProcessfromthreadingimportThreadimporttimedefioIntenseWork():time.sleep(0.5)defcomputeIntenseWork():res=0foriinrange(10000000):res*=1defgetRunTime(entity,work):l=[]start=time.time()foriinrange(100):

p=entity(target=work)

l.append(p)p.start()forpinl:p.join()returntime.time()-startif__name__=='__main__':print('IO密集型任務(wù)’)print('runtimeforthreadsis:',getRunTime(Thread,ioIntenseWork))

print('runtimeforprocessesis:',getRunTime(Process,ioIntenseWork))print('計算密集型任務(wù)')print('runtimeforthreadsis:’,getRunTime(Thread,computeIntenseWork))print('runtimeforprocessesis:',getRunTime(Process,computeIntenseWork))12.3.1

概念的引入432025/3/6

在命令行窗口執(zhí)行以上程序,得到結(jié)果如下,可以看出與之前的分析相吻合:IO密集型任務(wù)runtimeforthreadsis:0.5621793270111084runtimeforprocessesis:28.937815189361572計算密集型任務(wù)runtimeforthreadsis:78.20331883430481runtimeforprocessesis:45.74988174438476612.3.1

概念的引入442025/3/6

線程的管理和調(diào)度是由內(nèi)核來完成的,對于程序設(shè)計人員無法準確控制多個線程的執(zhí)行順序。

另外一種線程,其調(diào)度是由程序員編寫的程序來自行管理的,而無需的內(nèi)核控制和線程切換,這樣的線程叫做用戶空間線程,又稱為協(xié)程。12.3.2

生成器協(xié)程452025/3/6

Python代碼的執(zhí)行由Python虛擬機控制,這種Python解釋器的主循環(huán)之中要求同時只有一個線程在運行。因此Python虛擬機引入了一種全局解釋鎖(GlobalInterpreterLock,GIL)的線程同步機制。

協(xié)程(Coroutine),又稱為微線程,屬于一種用戶空間的線程,只有一個線程執(zhí)行,但是卻能夠?qū)崿F(xiàn)以往多線程才能夠?qū)崿F(xiàn)的協(xié)同式函數(shù)調(diào)用的能力,即在執(zhí)行一個函數(shù)A時,可以隨時中斷其執(zhí)行而轉(zhuǎn)為執(zhí)行函數(shù)B,且這種函數(shù)執(zhí)行的切換可以自由進行。對于計算密集型的任務(wù),適用于多進程與協(xié)程相結(jié)合的方式。12.3.2

生成器協(xié)程462025/3/6yield表達式的運用

生成器不需要保存序列中的元素,而是通過生成器函數(shù)來實現(xiàn)用于產(chǎn)生每個元素的算法,再經(jīng)過迭代逐一生成序列中的所有元素。生成器函數(shù)不是通過return語句來返回數(shù)值,而是通過yield語句來迭代返回序列的元素。yield語句每次返回一個值,然后由生成器函數(shù)來保存當前函數(shù)的執(zhí)行狀態(tài),等待下一次調(diào)用?!纠?2-17】一個簡單的生成器協(xié)程472025/3/6defgen_range(start,end):print("starting...")whilestart<end:start=start+1yieldstart #標志所在函數(shù)為生成器函數(shù)getRunTime(Thread,computeIntenseWork))print('runtimeforprocessesis:',getRunTime(Process,computeIntenseWork))if__name__=='__main__': forningen_range(0,10):#迭代器 print(n,end='') print()執(zhí)行以上程序,得到結(jié)果如下:starting...12345678910【例子12-18】構(gòu)成協(xié)程的兩個生成器482025/3/6importtimedefA():foriinrange(5):print('A',end='')yieldtime.sleep(0.5)defB(c):whileTrue:print('B',end='')next(c)time.sleep(0.5)if__name__=='__main__':a=A()B(a)得到結(jié)果如下,其中出現(xiàn)了6個B,執(zhí)行到調(diào)用A函數(shù)時引發(fā)了StopIteration異常:BABABABABABTraceback(mostrecentcalllast):B(a)next(c)StopIteration12.3.2

生成器協(xié)程492025/3/6

如果在執(zhí)行期間同一程序的兩段子程序之間形成了協(xié)作式互動,則這種子程序間的互動就構(gòu)成了協(xié)程,類似于通過線程而實現(xiàn)的兩段子程序的并發(fā)執(zhí)行。利用生成器實現(xiàn)協(xié)程時,往往會利用send()方法實現(xiàn)子程序之間的數(shù)據(jù)傳遞。對于生成器函數(shù)而言,其執(zhí)行方式如下,這也是協(xié)程的工作方式:為生成器函數(shù)A建立一個實例a以后,并不能引發(fā)A的執(zhí)行,而是處于一種等待執(zhí)行的GEN_CREATED狀態(tài),此時生成器函數(shù)A也就成為了協(xié)程A;(2)調(diào)用了next(A)或a.send(None)時會激活協(xié)程A,使其執(zhí)行完成第一個yield表達式并暫停,此時執(zhí)行狀態(tài)為GEN_SUSPENED;12.3.2

生成器協(xié)程502025/3/6再次調(diào)用next()或send()才能夠執(zhí)行第一個yield表達式,并執(zhí)行完成下一個yield表達式并暫停,此后不斷重復(fù)此過程。關(guān)閉生成器以后協(xié)程執(zhí)行結(jié)束,進入GEN_CLOSED狀態(tài),而協(xié)程工作時的狀態(tài)為GEN_RUNNING。綜合而言,協(xié)程可以處于GEN_CREATED、GEN_RUNNING、GEN_SUSPENED、GEN_CLOSED等四個狀態(tài)中的一個,當前狀態(tài)可以使用inspect.getgeneratorstate()函數(shù)確定。例12-19展示了一個簡單協(xié)程的工作狀態(tài)?!纠?2-19】一個簡單協(xié)程的工作狀態(tài)512025/3/6importinspectdefcoroutine(a):print(‘start’)#(1)b=yield#(2)

print(f‘a(chǎn)={a},b=’)#(3)c=yielda+b#(4)print(f'a={a},b=,c={c}')#(5)c=coroutine(5)#(6)#輸出結(jié)果:

#結(jié)果對應(yīng)的語句print(inspect.getgeneratorstate(c))#(7)#GEN_CREATED#(7)print('next(c)yield',next(c)) #(8)#start#(8)-next(1)(2)-yield#next(c)yieldNone#(8)-printprint(inspect.getgeneratorstate(c))#(9)#GEN_SUSPENDED#(9)print('c.send(6)yield',c.send(6)) #(10)#a=5,b=6#(10)-send(2)-b(3)(4)-yield#c.send(6)yield11#(10)-printprint(inspect.getgeneratorstate(c))#(11)#GEN_SUSPENDED

#(11)c.send(7)#(12)#a=5,b=6,c=7

#(12)(4)-c#StopIteration#(5)print(inspect.getgeneratorstate(c))#(13)#GEN_CLOSED#(13)本例的執(zhí)行順序標注在注釋部分,可以直接查閱到語句執(zhí)行的對應(yīng)情況?!纠?2-20】求已輸入數(shù)據(jù)的當前累計均值522025/3/6defavg_coroutine():#求均值的生成器協(xié)程total=0.0count=0avg=NonewhileTrue:num=yieldavgtotal+=numcount+=1avg=total/countif__name__=='__main__':avg=avg_coroutine()next(avg)#激活協(xié)程lst=[10,20,45,65]foriinlst:

print(avg.send(i),end='') #輸出當前已發(fā)送數(shù)據(jù)的均值print()以上程序執(zhí)行結(jié)果如下:10.015.025.035.012.3.2

生成器協(xié)程532025/3/6yieldfrom表達式的運用

yieldfrom表達式可用于返回可迭代對象中的數(shù)據(jù),也可以返回來自于迭代器甚至生成器中的數(shù)據(jù)。如果數(shù)據(jù)來源于生成器,則yieldfrom所在的函數(shù)相當于委托生成器協(xié)程,它以簡潔的方式可以委托多個子生成器協(xié)程,這樣就可以實現(xiàn)相當復(fù)雜的邏輯體系。當yieldfrom的數(shù)據(jù)來源于子生成器而形成委托生成器協(xié)程時,其邏輯狀態(tài)變化就要考慮委托生成器和子生成器等各個部分。例12-21給出了一個這種生成器狀態(tài)變化的演示?!纠?2-21】含有yieldfrom的協(xié)程542025/3/6importinspectdefsubgen_coroutine(): #子生成器協(xié)程print('subgenstart')a=yieldprint('a=',100)b=yielda+1000print('b=',b)yieldb+1000defproxygen_coroutine(x): #委托生成器協(xié)程print('proxygenstart')yieldfromsubgen_coroutine()print('proceed1')yieldfromrange(x-1,0,-1) #利用yieldfrom拼接可迭代對象print('proceed2')yieldfromrange(x)print('end')【例12-21】含有yieldfrom的協(xié)程552025/3/6if__name__=='__main__':g=proxygen_coroutine(5)print(inspect.getgeneratorstate(g))print(next(g)) #激活協(xié)程,yield無內(nèi)容則返回Noneprint(inspect.getgeneratorstate(g))print('g.send(100)yield',g.send(100))#將100發(fā)送給a,yielda+1000print('g.send(200)yield',g.send(200))#將200發(fā)送給b,yieldb+1000print(list(g))print(inspect.getgeneratorstate(g))以上程序執(zhí)行結(jié)果如下:GEN_CREATEDproxygenstartsubgenstartNoneGEN_SUSPENDEDa=100g.send(100)yield1100b=200g.send(200)yield1200proceed1proceed2end[4,3,2,1,0,1,2,3,4]GEN_CLOSED1.由于yieldfrom表達式的運行會針對一個子生成器協(xié)程,如果委托生成器協(xié)程中涉及多個子生成器協(xié)程,也可以將委托生成器協(xié)程設(shè)計成一個委派器,將各個yieldfrom所獲得的返回值序列進行分組保存。2.在例12-22中,yieldfrom所在的例程負責(zé)將獲得的結(jié)果插入列表實現(xiàn)其分組存放?!纠?2-22】利用委派生成器協(xié)程實現(xiàn)一個累加器562025/3/6defsubgen_coroutine():#子生成器協(xié)程total=0.0whileTrue:num=yieldifnumisNone:returntotaltotal+=numdefgrouper_coroutine(lst): #委派生成器協(xié)程whileTrue:res=yieldfromsubgen_coroutine()lst.append(res)if__name__=='__main__':lst=[]g=grouper_coroutine(lst)print(g)next(g)#使累加生成器準備好接收傳入值【例12-22】利用委派生成器協(xié)程實現(xiàn)一個累加器572025/3/6foriinrange(4):#計算0、1、2、3的累加和g.send(i)g.send(None)#結(jié)束第一次累加foriinrange(5):#計算0、1、2、3、4的累加和

g.send(i)g.send(None)print(lst)以上程序執(zhí)行結(jié)果如下:

<generatorobjectgrouper_coroutineat0x000001F90DF9BEB8>[6.0,10.0]12.3.3

異步處理協(xié)程582025/3/6

協(xié)程是Python中發(fā)展很快的熱門領(lǐng)域,從Python3.5開始引入了新的語法async和await,可以簡便地實現(xiàn)異步IO,通過異步處理函數(shù)完成協(xié)程的并發(fā)操作。阻塞與調(diào)度

異步IO模塊提供了異步處理協(xié)程的定義方式,適用于磁盤讀寫和網(wǎng)絡(luò)操作等IO操作。

異步處理方式的好處是可以暫停一個函數(shù)的執(zhí)行控制,卻不影響其異步的IO執(zhí)行?!纠?2-23】異步處理協(xié)程的創(chuàng)建與運行592025/3/6importasyncioasyncdeffun1():print("fun1:Start")print("fun1:Stop")asyncdeffun2():print("fun2:Start")print("fun2:Stop")return100if__name__=='__main__':f1=fun1()print(f1)f2=fun2()print(f2)try:f1.send(None)exceptStopIteration:pass程序的執(zhí)行結(jié)果如下:<coroutineobjectfun1at0x000001EB0494AFC0><coroutineobjectfun2at0x000001EB04A1E6D0>fun1:Startfun1:Stopfun2:Startfun2:Stop100try:f2.send(None)exceptStopIterationase:print(e)12.3.3異步處理協(xié)程602025/3/6異步處理協(xié)程非常適用于具有IO操作的環(huán)境,在模擬運行時往往通過sleep()睡眠操作模擬IO的執(zhí)行。將await關(guān)鍵字加在需要等待的操作前面,可以在異步處理消息循環(huán)中加以調(diào)用,用于耗時的IO操作并掛起協(xié)程,讓出其執(zhí)行的控制權(quán)?!纠?2-24】異步處理協(xié)程的創(chuàng)建與運行612025/3/6importasyncioasyncdeffunc(x):print('輸出%d'%x,end='')awaitasyncio.sleep(2)#模擬這里產(chǎn)生了一個異步操作{異步IO}print('ended',end='')if__name__=='__main__':a=func(10)print(a)print('prepareeventloop')loop=asyncio.get_event_loop() #獲取一個事件輪詢對象print('starttorun')loop.run_until_complete(a) #運行事件環(huán),處理異步事件print('\ncomplete')loop.close()#關(guān)閉事件環(huán)【例12-24】異步處理協(xié)程的創(chuàng)建與運行622025/3/6程序的執(zhí)行結(jié)果如下:<coroutineobjectfuncat0x00000219072B6048>prepareeventloopstarttorun輸出10endedcomplete異步處理協(xié)程需要使用await表達式返回協(xié)程,并等待將來的事件循環(huán)調(diào)度。這種由await表達式返回并讓出執(zhí)行控制權(quán)的情景下,其返回的對象又稱可等待對象(awaitable),可以是協(xié)程或具有__await__()方法的對象。12.3.3異步處理協(xié)程632025/3/6一、對于異步執(zhí)行的協(xié)程,可以將其實例直接加入事件循環(huán),如例12-24中的loop.run_until_complete(a),更多的做法是采用gather()或wait()函數(shù),可以實現(xiàn)多個協(xié)程的同時注冊,具體用法如下:awaitableasyncio.gather(*aws,loop=None,return_exceptions=False)awaitasyncio.wait(aws,*,loop=None,timeout=None,return_when=ALL_COMPLETED)

12.3.3異步處理協(xié)程642025/3/6二、一旦將協(xié)程通過run_until_complete()方法注冊到事件循環(huán),其await所發(fā)起的耗時IO操作之后的執(zhí)行就可以在事件循環(huán)中等待被調(diào)度以恢復(fù)其執(zhí)行。例12-25展示了事件循環(huán)的使用及其控制的異步協(xié)程調(diào)用,其中事件循環(huán)進行了兩次調(diào)度:loop.run_until_complete(asyncio.wait([io1(),io2()]))loop.run_until_complete(asyncio.gather(io3(),io4()))這兩次調(diào)度屬于順序執(zhí)行,而每次調(diào)度之中的各個協(xié)程直接采用了異步執(zhí)行的方式?!纠?2-25】異步IO處理協(xié)程運行模擬652025/3/6importasyncio,timeasyncdefio1():

#模擬磁盤異步IOforiinrange(2):pr

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

最新文檔

評論

0/150

提交評論