Java8中的 Stream 那么強大那你知道它的原理是什么嗎_第1頁
Java8中的 Stream 那么強大那你知道它的原理是什么嗎_第2頁
Java8中的 Stream 那么強大那你知道它的原理是什么嗎_第3頁
Java8中的 Stream 那么強大那你知道它的原理是什么嗎_第4頁
Java8中的 Stream 那么強大那你知道它的原理是什么嗎_第5頁
已閱讀5頁,還剩10頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

Java8中的Stream那么強大,那你知道它的原理是什么嗎?Java8API添加了一個新的抽象稱為流Stream,可以讓你以一種聲明的方式處理數據。Stream使用一種類似用SQL語句從數據庫查詢數據的直觀方式來提供一種對Java集合運算和表達的高階抽象。StreamAPI可以極大提高Java程序員的生產力,讓程序員寫出高效率、干凈、簡潔的代碼。本文會對Stream的實現(xiàn)原理進行剖析。Stream的組成與特點Stream(流)是一個來自數據源的元素隊列并支持聚合操作:元素是特定類型的對象,形成一個隊列。

Java中的Stream并_不會_向集合那樣存儲和管理元素,而是按需計算數據源流的可以是集合Collection、數組Array、I/Ochannel,產生器generator

等聚合操作類似SQL語句一樣的操作,比如filter,

map,

reduce,

find,

match,

sorted等和以前的Collection操作不同,Stream操作還有兩個基礎的特征:Pipelining:中間操作都會返回流對象本身。這樣多個操作可以串聯(lián)成一個管道,如同流式風格(fluentstyle)。這樣做可以對操作進行優(yōu)化,比如延遲執(zhí)行(lazinessevaluation)和短路(

short-circuiting)內部迭代:以前對集合遍歷都是通過Iterator或者For-Each的方式,顯式的在集合外部進行迭代,這叫做外部迭代。

Stream提供了內部迭代的方式,通過訪問者模式(Visitor)實現(xiàn)。和迭代器又不同的是,Stream

可以并行化操作,迭代器只能命令式地、串行化操作。顧名思義,當使用串行方式去遍歷時,每個

item

讀完后再讀下一個item。而使用并行去遍歷時,數據會被分成多個段,其中每一個都在不同的線程中處理,然后將結果一起輸出。Stream

的并行操作依賴于

Java7

中引入的

Fork/Join

框架(JSR166y)來拆分任務和加速處理過程。Java

的并行API演變歷程基本如下:1.0-1.4中的java.lang.Thread5.0中的java.util.concurrent6.0中的Phasers等7.0中的Fork/Join框架8.0中的LambdaStream具有平行處理能力,處理的過程會分而治之,也就是將一個大任務切分成多個小任務,這表示每個任務都是一個操作:List

numbers

=

Arrays.asList(1,

2,

3,

4,

5,

6,

7,

8,

9);

numbers.parallelStream()

.forEach(out::println);

可以看到一行簡單的代碼就幫我們實現(xiàn)了并行輸出集合中元素的功能,但是由于并行執(zhí)行的順序是不可控的所以每次執(zhí)行的結果不一定相同。如果非得相同可以使用forEachOrdered方法執(zhí)行終止操作:List

numbers

=

Arrays.asList(1,

2,

3,

4,

5,

6,

7,

8,

9);

numbers.parallelStream()

.forEachOrdered(out::println);

這里有一個疑問,如果結果需要有序,是否和我們的并行執(zhí)行的初衷相悖?是的,這個場景下明顯無需使用并行流,直接用串行流執(zhí)行即可,否則性能可能更差,因為最后又強行將所有并行結果進行了排序。OK,下面我們先介紹一下Stream接口的相關知識。BaseStream接口Stream的父接口是BaseStream,后者是所有流實現(xiàn)的頂層接口,定義如下:public

interface

BaseStream<T,

S

extends

BaseStream<T,

S>>

extends

AutoCloseable

{

Iterator

iterator();

Spliterator

spliterator();

boolean

isParallel();

S

sequential();

S

parallel();

S

unordered();

S

onClose(Runnable

closeHandler);

void

close();

}其中,T為流中元素的類型,S為一個BaseStream的實現(xiàn)類,它里面的元素也是T并且S同樣是自己:SextendsBaseStream是不是有點暈?其實很好理解,我們看一下接口中對S的使用就知道了:如sequential()、parallel()這兩個方法,它們都返回了S實例,也就是說它們分別支持對當前流進行串行或者并行的操作,并返回「改變」后的流對象。如果是并行一定涉及到對當前流的拆分,即將一個流拆分成多個子流,子流肯定和父流的類型是一致的。子流可以繼續(xù)拆分子流,一直拆分下去…也就是說這里的S是BaseStream的一個實現(xiàn)類,它同樣是一個流,比如Stream、IntStream、LongStream等。Stream接口再來看一下Stream的接口聲明:public

interface

Stream

extends

BaseStream>

參考上面的解釋這里不難理解:即Stream可以繼續(xù)拆分為Stream,我們可以通過它的一些方法來證實:在公眾號編輯技術圈后臺回復“Java”,獲取一份驚喜禮包。Stream

filter(Predicatesuper

T>

predicate);

Stream

map(Functionsuper

T,

?

extends

R>

mapper);

Stream

flatMap(Functionsuper

T,

?

extends

Stream>

mapper);

Stream

sorted();

Stream

peek(Consumersuper

T>

action);

Stream

limit(long

maxSize);

Stream

skip(long

n);

...這些都是操作流的中間操作,它們的返回結果必須是流對象本身。關閉流操作BaseStream實現(xiàn)了

AutoCloseable

接口,也就是

close()

方法會在流關閉時被調用。同時,BaseStream

中還給我們提供了onClose()方法:/**

*

Returns

an

equivalent

stream

with

an

additional

close

handler.

Close

*

handlers

are

run

when

the

{@link

#close()}

method

*

is

called

on

the

stream,

and

are

executed

in

the

order

they

were

*

added.

All

close

handlers

are

run,

even

if

earlier

close

handlers

throw

*

exceptions.

If

any

close

handler

throws

an

exception,

the

first

*

exception

thrown

will

be

relayed

to

the

caller

of

{@code

close()},

with

*

any

remaining

exceptions

added

to

that

exception

as

suppressed

exceptions

*

(unless

one

of

the

remaining

exceptions

is

the

same

exception

as

the

*

first

exception,

since

an

exception

cannot

suppress

itself.)

May

*

return

itself.

*

*

This

is

an

intermediate

*

operation.

*

*

@param

closeHandler

A

task

to

execute

when

the

stream

is

closed

*

@return

a

stream

with

a

handler

that

is

run

if

the

stream

is

closed

*/

S

onClose(Runnable

closeHandler);當AutoCloseable的close()接口被調用的時候會觸發(fā)調用流對象的onClose()方法,但有幾點需要注意:onClose()

方法會返回流對象本身,也就是說可以對改對象進行多次調用如果調用了多個onClose()

方法,它會按照調用的順序觸發(fā),但是如果某個方法有異常則只會向上拋出第一個異常前一個

onClose()

方法拋出了異常不會影響后續(xù)

onClose()

方法的使用如果多個

onClose()

方法都拋出異常,只展示第一個異常的堆棧,而其他異常會被壓縮,只展示部分信息并行流和串行流BaseStream接口中分別提供了并行流和串行流兩個方法,這兩個方法可以任意調用若干次,也可以混合調用,但最終只會以最后一次方法調用的返回結果為準。參考parallel()方法的說明:Returnsanequivalentstreamthatisparallel.Mayreturnitself,eitherbecausethestreamwasalreadyparallel,orbecausetheunderlyingstreamstatewasmodifiedtobeparallel.所以多次調用同樣的方法并不會生成新的流,而是直接復用當前的流對象。下面的例子里以最后一次調用parallel()為準,最終是并行地計算sum:stream.parallel()

.filter(...)

.sequential()

.map(...)

.parallel()

.sum();ParallelStream背后的男人:ForkJoinPoolForkJoin框架是從JDK7中新特性,它同ThreadPoolExecutor一樣,也實現(xiàn)了Executor和ExecutorService接口。它使用了一個「無限隊列」來保存需要執(zhí)行的任務,而線程的數量則是通過構造函數傳入,

如果沒有向構造函數中傳入希望的線程數量,那么當前計算機可用的CPU數量會被設置為線程數量作為默認值。ForkJoinPool主要用來使用分治法(Divide-and-ConquerAlgorithm)來解決問題,典型的應用比如_快速排序算法_。這里的要點在于,F(xiàn)orkJoinPool需要使用相對少的線程來處理大量的任務。比如要對1000萬個數據進行排序,那么會將這個任務分割成兩個500萬的排序任務和一個針對這兩組500萬數據的合并任務。以此類推,對于500萬的數據也會做出同樣的分割處理,到最后會設置一個閾值來規(guī)定當數據規(guī)模到多少時,停止這樣的分割處理。比如,當元素的數量小于10時,會停止分割,轉而使用插入排序對它們進行排序。那么到最后,所有的任務加起來會有大概2000000+個。問題的關鍵在于,對于一個任務而言,只有當它所有的子任務完成之后,它才能夠被執(zhí)行,想象一下歸并排序的過程。所以當使用ThreadPoolExecutor時,使用分治法會存在問題,因為ThreadPoolExecutor中的線程無法向任務隊列中再添加一個任務并且在等待該任務完成之后再繼續(xù)執(zhí)行。而使用ForkJoinPool時,就能夠讓其中的線程創(chuàng)建新的任務,并掛起當前的任務,此時線程就能夠從隊列中選擇子任務執(zhí)行。那么使用ThreadPoolExecutor或者ForkJoinPool,會有什么性能的差異呢?首先,使用ForkJoinPool能夠使用數量有限的線程來完成非常多的具有「父子關系」的任務,比如使用4個線程來完成超過200萬個任務。使用ThreadPoolExecutor時,是不可能完成的,因為ThreadPoolExecutor中的Thread無法選擇優(yōu)先執(zhí)行子任務,需要完成200萬個具有父子關系的任務時,也需要200萬個線程,顯然這是不可行的。WorkStealing原理:每個工作線程都有自己的工作隊列WorkQueue;這是一個雙端隊列dequeue,它是線程私有的;ForkJoinTask中fork的子任務,將放入運行該任務的工作線程的隊頭,工作線程將以LIFO的順序來處理工作隊列中的任務,即堆棧的方式;為了最大化地利用CPU,空閑的線程將從其它線程的隊列中「竊取」任務來執(zhí)行但是是從工作隊列的尾部竊取任務,以減少和隊列所屬線程之間的競爭;雙端隊列的操作:push()/pop()僅在其所有者工作線程中調用,poll()是由其它線程竊取任務時調用的;當只剩下最后一個任務時,還是會存在競爭,是通過CAS來實現(xiàn)的;用ForkJoinPool的眼光來看ParallelStreamJava8為ForkJoinPool添加了一個通用線程池,這個線程池用來處理那些沒有被顯式提交到任何線程池的任務。它是ForkJoinPool類型上的一個靜態(tài)元素,它擁有的默認線程數量等于運行計算機上的CPU數量。當調用Arrays類上添加的新方法時,自動并行化就會發(fā)生。比如用來排序一個數組的并行快速排序,用來對一個數組中的元素進行并行遍歷。自動并行化也被運用在Java8新添加的StreamAPI中。比如下面的代碼用來遍歷列表中的元素并執(zhí)行需要的操作:List

userInfoList

=

DaoContainers.getUserInfoDAO().queryAllByList(new

UserInfoModel());

userInfoList.parallelStream().forEach(RedisUserApi::setUserIdUserInfo);對于列表中的元素的操作都會以并行的方式執(zhí)行。forEach方法會為每個元素的計算操作創(chuàng)建一個任務,該任務會被前文中提到的ForkJoinPool中的commonPool處理。以上的并行計算邏輯當然也可以使用ThreadPoolExecutor完成,但是就代碼的可讀性和代碼量而言,使用ForkJoinPool明顯更勝一籌。對于ForkJoinPool通用線程池的線程數量,通常使用默認值就可以了,即運行時計算機的處理器數量。也可以通過設置系統(tǒng)屬性:-Djava.util.concurrent.ForkJoinPmon.parallelism=N

(N為線程數量),來調整ForkJoinPool的線程數量。值得注意的是,當前執(zhí)行的線程也會被用來執(zhí)行任務,所以最終的線程個數為N+1,1就是當前的主線程。這里就有一個問題,如果你在并行流的執(zhí)行計算使用了_阻塞操作_,如I/O,那么很可能會導致一些問題:在公眾號后端架構師后臺回復“架構整潔”,獲取一份驚喜禮包。public

static

String

query(String

question)

{

List

engines

=

new

ArrayList();

engines.add("/?q=");

engines.add("/?q=");

engines.add("/search?q=");

//

get

element

as

soon

as

it

is

available

Optional

result

=

engines.stream().parallel().map((base)

-

{

String

url

=

base

+

question;

//

open

connection

and

fetch

the

result

return

WS.url(url).get();

}).findAny();

return

result.get();

}這個例子很典型,讓我們來分析一下:這個并行流計算操作將由主線程和JVM默認的ForkJoinPmonPool()來共同執(zhí)行。map中是一個阻塞方法,需要通過訪問HTTP接口并得到它的response,所以任何一個worker線程在執(zhí)行到這里的時候都會阻塞并等待結果。所以當此時再其他地方通過并行流方式調用計算方法的時候,將會受到此處阻塞等待的方法的影響。目前的ForkJoinPool的實現(xiàn)并未考慮補償等待那些阻塞在等待新生成的線程的工作worker線程,所以最終ForkJoinPmonPool()中的線程將備用光并且阻塞等待。正如我們上面那個列子的情況分析得知,lambda的執(zhí)行并不是瞬間完成的,所有使用parallelstreams的程序都有可能成為阻塞程序的源頭,并且在執(zhí)行過程中程序中的其他部分將無法訪問這些workers,這意味著任何依賴parallelstreams的程序在什么別的東西占用著commonForkJoinPool時將會變得不可預知并且暗藏危機。小結:當需要處理遞歸分治算法時,考慮使用ForkJoinPool。仔細設置不再進行任務劃分的閾值,這個閾值對性能有影響。Java8中的一些特性會使用到ForkJoinPool中的通用線程池。在某些場合下,需要調整該線程池的默認的線程數量lambda應該盡量避免副作用,也就是說,避免突變基于堆的狀態(tài)以及任何IOlambda應該互不干擾,也就是說避免修改數據源(因為這可能帶來線程安全的問題)避免訪問在流操作生命周期內可能會改變的狀態(tài)并行流的性能并行流框架的性能受以下因素影響:數據大小:數據夠大,每個管道處理時間夠長,并行才有意義;源數據結構:每個管道操作都是基于初始數據源,通常是集合,將不同的集合數據源分割會有一定消耗;裝箱:處理基本類型比裝箱類型要快;核的數量:默認情況下,核數量越多,底層fork/join線程池啟動線程就越多;單元處理開銷:花在流中每個元素身上的時間越長,并行操作帶來的性能提升越明顯;源數據結構分為以下3組:性能好:ArrayList、數組或IntStream.range(數據支持隨機讀取,能輕易地被任意分割)性能一般:HashSet、TreeSet(數據不易公平地分解,大部分也是可以的)性能差:LinkedList(需要遍歷鏈表,難以對半分解)、Stream.iterate和BufferedReader.lines(長度未知,難以分解)注意:下面幾個部分節(jié)選自:Streams的幕后原理,順便感謝一下_BrianGoetz_,寫的太通透了。NQ模型要確定并行性是否會帶來提速,需要考慮的最后兩個因素是:可用的數據量和針對每個數據元素執(zhí)行的計算量。在我們最初的并行分解描述中,我們采用的概念是拆分,直到分段足夠小,以致解決該分段上的問題的順序方法更高效。分段大小必須依賴于所解決的問題,確切的講,取決于每個元素完成的工作量。例如,計算一個字符串的長度涉及的工作比計算字符串的

SHA-1

哈希值要少得多。為每個元素完成的工作越多,“大到足夠利用并行性”的閾值就越低。類似地,擁有的數據越多,拆分的分段就越多,而不會與“太小”閾值發(fā)生沖突。一個簡單但有用的并行性能模型是

NQ

模型,其中

N

是數據元素數量,Q

是為每個元素執(zhí)行的工作量。乘積

N*Q

越大,就越有可能獲得并行提速。對于具有很小的

Q

的問題,比如對數字求和,您通常可能希望看到

N>10,000

以獲得提速;隨著

Q

增加,獲得提速所需的數據大小將會減小。并行化的許多阻礙(比如拆分成本、組合成本或遇到順序敏感性)都可以通過

Q

更高的操作來緩解。盡管拆分某個

LinkedList

特征的結果可能很糟糕,但只要擁有足夠大的

Q,仍然可能獲得并行提速。遇到順序遇到順序指的是分發(fā)元素的順序是否對計算至關重要。一些(比如基于哈希的集合和映射)沒有有意義的遇到順序。流標志

ORDERED

描述了流是否有有意義的遇到順序。JDK集合的

spliterator

會根據集合的規(guī)范來設置此標志;一些中間操作可能注入

ORDERED

(sorted())或清除它(unordered())。如果流沒有遇到順序,大部分流操作都必須遵守該順序。對于順序執(zhí)行,會「自動保留遇到順序」,因為元素會按遇到它們的順序自然地處理。甚至在并行執(zhí)行中,許多操作(無狀態(tài)中間操作和一些終止操作(比如

reduce())),遵守遇到順序不會產生任何實際成本。但對于其他操作(有狀態(tài)中間操作,其語義與遇到順序關聯(lián)的終止操作,比如

findFirst()

forEachOrdered()),在并行執(zhí)行中遵守遇到順序的責任可能很重大。如果流有一個已定義的遇到順序,但該順序對結果沒有意義,那么可以通過使用

unordered()

操作刪除

ORDERED

標志,加速包含順序敏感型操作的管道的順序執(zhí)行。作為對遇到順序敏感的操作的示例,可以考慮

limit(),它會在指定大小處截斷一個流。在順序執(zhí)行中實現(xiàn)

limit()

很簡單:保留一個已看到多少元素的計數器,在這之后丟棄任何元素。但是在并行執(zhí)行中,實現(xiàn)

limit()

要復雜得多;您需要保留前

N

個元素。此要求大大限制了利用并行性的能力;如果輸入劃分為多個部分,您只有在某個部分之前的所有部分都已完成后,才知道該部分的結果是否將包含在最終結果中。因此,該實現(xiàn)一般會錯誤地選擇不使用所有可用的核心,或者緩存整個試驗性結果,直到您達到目標長度。如果流沒有遇到順序,limit()

操作可以自由選擇任何

N

個元素,這讓執(zhí)行效率變得高得多。知道元素后可立即將其發(fā)往下游,無需任何緩存,

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論