本次內(nèi)容主要講Fork-Join、
CountDownLatch、CyclicBarrier以及Callable、Future和FutureTask,最后再手寫一個(gè)自己的FutureTask,絕對(duì)干貨滿滿!
?
1、Fork-Join
1.1 什么是Fork-Join
Java多線程的開(kāi)發(fā)可以我們自己?jiǎn)⒂枚嗑€程,線程池,還可以使用forkjoin。forkjoin可以讓我們不去了解諸如Thread、Runnable等相關(guān)的知識(shí),只要遵循forkjoin的開(kāi)發(fā)模式,就可以寫出很好的多線程并發(fā)程序。
?
forkjoin采用的是分而治之。分而治之思想是:將一個(gè)難以直接解決的大問(wèn)題,分割成一些規(guī)模較小的相同問(wèn)題,以便各個(gè)擊破,分而治之。分而治之的策略是:對(duì)于一個(gè)規(guī)模為n的問(wèn)題,若該問(wèn)題可以容易地解決(比如說(shuō)規(guī)模n較?。﹦t直接解決,否則將其分解為m個(gè)規(guī)模較小的子問(wèn)題,
這些子問(wèn)題互相獨(dú)立且與原問(wèn)題形式相同(子問(wèn)題相互之間有聯(lián)系就會(huì)變?yōu)閯?dòng)態(tài)規(guī)范算法)
,遞歸地解這些子問(wèn)題,然后將各子問(wèn)題的解合并得到原問(wèn)題的解,這種算法設(shè)計(jì)策略叫做分治法。用一張圖來(lái)表示forkjoin原理。
我們可以了解一下計(jì)算機(jī)的十大經(jīng)典算法:快速排序、堆排序、歸并排序 、二分查找、BFPRT(線性查找)、DFS(深度優(yōu)先搜索)、BFS(廣度優(yōu)先搜索)、
Dijkstra、動(dòng)態(tài)規(guī)劃、樸素貝葉斯分類。其中有哪一些用到的是分而治之呢?有3個(gè),分別是快速排序、歸并排序和二分查找。
歸并排序是建立在歸并操作上的一種有效的排序算法。該算法是采用分治法的一個(gè)非常典型的應(yīng)用。將已有序的子序列合并,得到完全有序的序列;即先使每個(gè)子序列有序,再使子序列段間有序。若將兩個(gè)有序表合并成一個(gè)有序表,稱為2路歸并,與之對(duì)應(yīng)的還有多路歸并。對(duì)于給定的一組數(shù)據(jù),利用遞歸與分治技術(shù)將數(shù)據(jù)序列劃分成為越來(lái)越小的半子表,在對(duì)半子表排序后,再用遞歸方法將排好序的半子表合并成為越來(lái)越大的有序序列。為了提升性能,有時(shí)我們?cè)诎胱颖淼膫€(gè)數(shù)小于某個(gè)數(shù)(比如15)的情況下,對(duì)半子表的排序采用其他排序算法,比如插入排序。下面演示一下歸并排序的過(guò)程。
1.2 歸并排序(升序)示例
?先將數(shù)組劃分為左右2個(gè)子表:
?然后繼續(xù)對(duì)左右2個(gè)子表進(jìn)行拆分:
對(duì)拆分好的4個(gè)子表進(jìn)行排序:
?對(duì)有序子表進(jìn)行比較合并:
?對(duì)合并后的子表繼續(xù)比較合并:
?第二次合并后,數(shù)組呈有序排列。
?1.3 Fork-Join工作竊取
工作竊取是指當(dāng)前線程的Task已經(jīng)全被執(zhí)行完畢,則自動(dòng)取到其他線程的Task隊(duì)列中取出Task繼續(xù)執(zhí)行。ForkJoinPool中維護(hù)著多個(gè)線程在不斷地執(zhí)
行Task,每個(gè)線程除了執(zhí)行自己職務(wù)內(nèi)的Task之外,還會(huì)根據(jù)自己工作線程的閑置情況去獲取其他繁忙的工作線程的Task,如此一來(lái)就能能夠減少線程阻塞或是閑置的時(shí)間,提高CPU利用率。用一張圖進(jìn)行說(shuō)明。
1.3 Fork-Join使用
? Fork-Join使用兩個(gè)類來(lái)完成以上兩件事情:ForkJoinTask和ForkJoinPool。
我們要使用ForkJoin框架,必須首先創(chuàng)建一個(gè)ForkJoin任務(wù)。它提供在任務(wù)中執(zhí)行fork和join的操作機(jī)制,通常我們不直接繼承ForkjoinTask類,只需要直接繼承其子類。
(1)RecursiveAction,用于沒(méi)有返回結(jié)果的任務(wù)
(2)RecursiveTask,用于有返回值的任務(wù)
?task要通過(guò)ForkJoinPool來(lái)執(zhí)行,使用submit 或 invoke
提交,兩者的區(qū)別是:invoke是同步執(zhí)行,調(diào)用之后需要等待任務(wù)完成,才能執(zhí)行后面的代碼;submit是異步執(zhí)行。
join()和get方法當(dāng)任務(wù)完成的時(shí)候返回計(jì)算結(jié)果。調(diào)用get/join方法的時(shí)候會(huì)阻塞。還是用一個(gè)圖來(lái)說(shuō)明forkjoin的工作流程。
在我們自己實(shí)現(xiàn)的compute方法里,首先需要判斷任務(wù)是否足夠小,如果足夠小就直接執(zhí)行任務(wù)。如果不足夠小,就必須分割成兩個(gè)子任務(wù),每個(gè)子任務(wù)在調(diào)用invokeAll方法時(shí),又會(huì)進(jìn)入compute方法,看看當(dāng)前子任務(wù)是否需要繼續(xù)分割成孫任務(wù),如果不需要繼續(xù)分割,則執(zhí)行當(dāng)前子任務(wù)并返回結(jié)果。使用join方法會(huì)等待子任務(wù)執(zhí)行完并得到其結(jié)果。
?1.4 Fork-Join VS 單線程
假設(shè)有一個(gè)業(yè)務(wù)場(chǎng)景,數(shù)據(jù)庫(kù)中有編號(hào)為0到1千萬(wàn)的會(huì)員信息,要統(tǒng)計(jì)所有會(huì)員的余額總和。為了對(duì)比結(jié)果的一致性,用戶的余額不用隨機(jī)數(shù)表示,就用編號(hào)代表用戶的余額。現(xiàn)在的做法是每次從數(shù)據(jù)庫(kù)查詢出5000條數(shù)據(jù)進(jìn)行統(tǒng)計(jì),直到所有數(shù)據(jù)統(tǒng)計(jì)完成,進(jìn)行匯總。對(duì)比看看單線程和Fork-Join的差異。
先看單線程場(chǎng)景:
public class SingleThreadSumNumber { /** * 每次查詢5000條進(jìn)行統(tǒng)計(jì) */ private static
final int THRESHOLD = 5000; /** * 最小值 */ private static final int MIN = 0; /**
* 最大值*/ private static final int MAX = 10000000; public void sumNumber() { long
sum = 0; long startTime = System.currentTimeMillis(); int start = MIN; int end
= MIN + THRESHOLD; boolean isFirstTime = true; while (end <= MAX) { sum = sum +
batchSum(start, end);if (isFirstTime) { start = start + THRESHOLD + 1;
isFirstTime= false; } else { start = start + THRESHOLD; } end = end +
THRESHOLD; } System.out.println("The result is " + sum + " spend time:" +
(System.currentTimeMillis() - startTime) + "ms"); } /** * 統(tǒng)計(jì)每次查詢出來(lái)的余額總和 * @param
start *@param end * @return */ public long batchSum(int start, int end) { long
sum = 0; try { Thread.sleep(15);//休眠15毫秒模擬查詢業(yè)務(wù) } catch (InterruptedException e)
{ e.printStackTrace(); }for (int i = start; i <= end; i++) { sum += i; } return
sum; }public static void main(String[] args) { SingleThreadSumNumber thread =
new SingleThreadSumNumber(); thread.sumNumber(); } }
運(yùn)行程序輸出以下結(jié)果:
余額總和為50000005000000,花費(fèi)了30119毫秒,下面使用forkjoin來(lái)進(jìn)行統(tǒng)計(jì):
1 import java.util.concurrent.ForkJoinPool; 2 import
java.util.concurrent.RecursiveTask; 3 4 public class ForkJoinDemo { 5 /** 6
* 門限值,如果任務(wù)門限低于此值,則進(jìn)行計(jì)算 7 */ 8 private static final int THRESHOLD = 5000; 9 10
/** 11 * 最小值 12 */ 13 private static final int MIN = 0; 14 15 /** 16 * 最大值 17
*/ 18 private static final int MAX = 10000000; 19 20 private static class
SumNumberTaskextends RecursiveTask<Long> { 21 private int start; 22 private int
end;23 24 public SumNumberTask(int start, int end) { 25 this.start = start; 26
this.end = end; 27 } 28 29 @Override 30 protected Long compute() { 31 if (end
- start < THRESHOLD) { 32 return sumBatch(start, end); 33 } else { 34 int mid =
(start + end) / 2; 35 SumNumberTask left = new SumNumberTask(start, mid); 36
SumNumberTask right =new SumNumberTask(mid + 1, end); 37 invokeAll(left,
right);38 long leftResult = left.join(); 39 long rightResult = right.join(); 40
return leftResult + rightResult; 41 } 42 } 43 } 44 45 public void
sumNumber() {46 ForkJoinPool pool = new ForkJoinPool(); 47 long start =
System.currentTimeMillis();48 int recordMin = MIN; 49 int recordMax = MAX; 50
SumNumberTask sumTask =new SumNumberTask(recordMin, recordMax); 51
pool.invoke(sumTask);52 System.out.println("Task is Running....."); 53 Long
result = sumTask.join(); 54 System.out.println("The result is " + result + "
spend time:"55 + (System.currentTimeMillis() - start) + "ms"); 56 } 57 58 /**
59 * 統(tǒng)計(jì)每次任務(wù)的總和 60 * @param fromId 61 * @param toId 62 * @return 63 */ 64
public static long sumBatch(int fromId, int toId) { 65 long sum = 0; 66 try { 67
Thread.sleep(15);//休眠15毫秒模擬查詢業(yè)務(wù) 68 } catch (InterruptedException e) { 69
e.printStackTrace();70 } 71 for (int i = fromId; i <= toId; i++) { 72 sum += i;
73 } 74 return sum; 75 } 76 77 public static void main(String[] args) { 78
ForkJoinDemo forkJoinDemo =new ForkJoinDemo(); 79 forkJoinDemo.sumNumber(); 80
}81 }
輸出結(jié)果:
?
余額總和為50000005000000,和使用單線程統(tǒng)計(jì)時(shí)一致,使用forkjoin達(dá)到了同樣的目的,但是只花費(fèi)了4078毫秒,性能提升了7倍多。為了使性能有進(jìn)一步提升,我們可以在第44行指定并發(fā)數(shù)量。不傳參情況下,默認(rèn)并發(fā)量是當(dāng)前服務(wù)器的邏輯CPU個(gè)數(shù)。我們把并發(fā)量調(diào)整成64,即ForkJoinPool
pool = new ForkJoinPool(16 * 4),執(zhí)行程序,輸出結(jié)果為:
?統(tǒng)計(jì)結(jié)果一致,花費(fèi)了567毫秒,比起單線程統(tǒng)計(jì),性能提升了53倍之多,由此可見(jiàn)forkjoin的并發(fā)威力。
2、CountDownLatch
?2.1 什么是CountDownLatch
JDK對(duì)CountDownLatch的解釋是:一種同步輔助器,它允許一個(gè)或多個(gè)線程等待,直到在其他線程中執(zhí)行的一組操作完成為止。舉個(gè)例子來(lái)理解CountDownLatch:隔壁寢室的老王今天要參加學(xué)校運(yùn)動(dòng)會(huì)的400米決賽,跟小王一起爭(zhēng)奪冠軍的還有另外5個(gè)人,不管這6位選手的內(nèi)心多激動(dòng)多澎湃,也要等裁判的發(fā)令槍響了之后才能起跑,裁判不發(fā)出指令,選手就只能在起跑線等待,這就是CountDownLatch的作用。但是實(shí)際場(chǎng)景并不只有一個(gè)發(fā)令裁判,參加過(guò)學(xué)校運(yùn)動(dòng)會(huì)的同學(xué)都知道,還可能需要若干個(gè)裁判進(jìn)行手動(dòng)計(jì)時(shí),要等所有的裁判都就位后,發(fā)令槍一響,運(yùn)動(dòng)員才能起跑。假設(shè)有3個(gè)計(jì)時(shí)裁判,一個(gè)發(fā)令裁判,用一個(gè)圖來(lái)說(shuō)明。
? 在比賽開(kāi)始前,發(fā)令裁判會(huì)用洪荒之力吼一聲,各~就~各~位,此時(shí)發(fā)令裁判會(huì)用炯炯有神的目光和3位計(jì)時(shí)裁判交流,3位裁判分別點(diǎn)頭示意已經(jīng)準(zhǔn)備好了,此時(shí)發(fā)令裁判會(huì)再次大吼一聲,預(yù)備~~~跑?。?!此時(shí)憋了許久的6位運(yùn)動(dòng)員飛奔出去,當(dāng)然老王遙遙領(lǐng)先,畢竟女神給他說(shuō)了跑第一名的話晚上有獎(jiǎng)勵(lì)。發(fā)令裁判的任務(wù)完成,不用繼續(xù)執(zhí)行下去,而3個(gè)計(jì)時(shí)裁判繼續(xù)工作,對(duì)6位選手的成績(jī)進(jìn)行一個(gè)記錄。
?2.1 CountDownLatch實(shí)戰(zhàn)
用一段程序來(lái)模擬老王參加運(yùn)動(dòng)會(huì)400米決賽的場(chǎng)景。
import java.util.concurrent.CountDownLatch; public class CountDownLatchDemo {
/** * 運(yùn)動(dòng)員在計(jì)時(shí)裁判和發(fā)令裁判就位后才能起跑 */ static CountDownLatch sportsManLatch = new
CountDownLatch(4); /** * 發(fā)令裁判在3個(gè)計(jì)時(shí)裁判準(zhǔn)備好之后才能發(fā)令 */ static CountDownLatch
orderRefereeLatch =new CountDownLatch(3); /** * 計(jì)時(shí)裁判 */ static class TimeReferee
implements Runnable { private int no; public TimeReferee(int no) { this.no =
no; } @Overridepublic void run() { System.out.println(no + "號(hào)計(jì)時(shí)裁判就位");
orderRefereeLatch.countDown(); sportsManLatch.countDown(); } }/** * 發(fā)令裁判 */
static class OrderReferee implements Runnable { @Override public void run() {
try { orderRefereeLatch.await(); } catch (InterruptedException e) {
e.printStackTrace(); } System.out.println("發(fā)令裁判發(fā)出指令~~~~~~");
sportsManLatch.countDown(); } }/*** * 運(yùn)動(dòng)員 */ static class SportsMan implements
Runnable {private int no; public SportsMan(int no) { this.no = no; } @Override
public void run() { try { System.out.println(no + "號(hào)運(yùn)動(dòng)員已經(jīng)就位");
sportsManLatch.await(); }catch (InterruptedException e) { e.printStackTrace();
} System.out.println(no+ "號(hào)選手說(shuō),我要跑第一"); } } public static void main(String[]
args)throws InterruptedException { //6個(gè)運(yùn)動(dòng)員就位 for (int i = 0; i < 6; i++) { new
Thread(new SportsMan(i)).start(); } //發(fā)令裁判和計(jì)時(shí)裁判眼神確認(rèn),等計(jì)時(shí)裁判都準(zhǔn)備好之后發(fā)令 new Thread(new
OrderReferee()).start();//3個(gè)計(jì)時(shí)裁判就位 for (int i = 0; i < 3; i++) { new Thread(new
TimeReferee(i)).start(); } } }
程序輸出:
3、CyclicBarrier
?3.1 什么是CyclicBarrier
JDK對(duì)
CyclicBarrier的解釋是:一種同步輔助工具,它允許一組線程全部互相等待以到達(dá)一個(gè)公共的障礙點(diǎn)。我們可以從字面意思理解它,可循環(huán)使用(Cyclic)的屏障(Barrier)。
它要做的事情是,讓一組線程到達(dá)一個(gè)屏障(也可以叫同步點(diǎn))時(shí)被阻塞,直到最后一個(gè)線程到達(dá)屏障時(shí),屏障才會(huì)打開(kāi),所有被屏障攔截的線程才能繼續(xù)運(yùn)行。CyclicBarrier默認(rèn)的構(gòu)造方法是CyclicBarrier(int
parties),parties表示屏障攔截的線程數(shù)量,每個(gè)線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達(dá)了屏障,然后當(dāng)前線程被阻塞。CyclicBarrier還提供一個(gè)更高級(jí)的構(gòu)造函數(shù)CyclicBarrier(int
parties,Runnable
barrierAction),用于在parties個(gè)線程到達(dá)屏障時(shí),優(yōu)先執(zhí)行barrierAction,方便處理更復(fù)雜的業(yè)務(wù)場(chǎng)景。還用一張圖來(lái)說(shuō)明。
?3.2?CyclicBarrier實(shí)戰(zhàn)
CyclicBarrier可以用于多線程計(jì)算數(shù)據(jù),最后合并計(jì)算結(jié)果的場(chǎng)景。我們模擬3個(gè)子線程向一個(gè)map中添加數(shù)據(jù),它們添加數(shù)據(jù)完成后,到一個(gè)屏障點(diǎn)進(jìn)行等待,由統(tǒng)計(jì)線程對(duì)數(shù)據(jù)進(jìn)行打印,統(tǒng)計(jì)線程工作結(jié)束后,3個(gè)子線程再被統(tǒng)一釋放去干其他工作。我們?cè)O(shè)置2個(gè)屏障點(diǎn)來(lái)演示,,體現(xiàn)其可循環(huán)使用的特征。
public class CyclicBarrierDemo { private static CyclicBarrier barrier = new
CyclicBarrier(3,new CollectThread()); /**存放子線程產(chǎn)生數(shù)據(jù)的容器*/ private static
ConcurrentHashMap<String, Long> map =new ConcurrentHashMap<>(); public static
void main(String[] args) throws InterruptedException { for (int i = 0; i < 3;
i++) { Thread thread = new Thread(new WorkThread()); thread.start(); }
Thread.sleep(5); } /** * 負(fù)責(zé)對(duì)子線程的結(jié)果進(jìn)行其他處理 */ private static class CollectThread
implements Runnable { @Override public void run() { StringBuilder result = new
StringBuilder();for (Map.Entry<String, Long> workResult : map.entrySet()) {
result.append("[" + workResult.getValue() + "]"); } System.out.println("the
result = " + result); System.out.println("CollectThread do other things"); try
{ Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("CollectThread end........"); } } /** * 實(shí)際工作的子線程 */ private
static class WorkThread implements Runnable { @Override public void run() { long
id = Thread.currentThread().getId(); map.put(id + "", id); Random r = new
Random();try { Thread.sleep(r.nextInt(1000)); System.out.println("Thread_" + id
+ " first do something "); //第一次到達(dá)屏障 barrier.await(); System.out.println(
"Thread_" + id + " first do other things"); Thread.sleep(r.nextInt(500));
map.put(id* 2 + "", id * 2); System.out.println("Thread_" + id + " second do
something "); //第二次到達(dá)屏障 barrier.await(); System.out.println("Thread_" + id + "
second other things "); } catch (Exception e) { e.printStackTrace(); } } } }
程序輸出:
3.3?CountDownLatch和CyclicBarrier對(duì)比
CountDownLatch的計(jì)數(shù)器只能使用一次,而CyclicBarrier的計(jì)數(shù)器可以反復(fù)使用。CountDownLatch.await()一般阻塞工作線程,所有的進(jìn)行預(yù)備工作的線程執(zhí)行countDown(),而CyclicBarrier通過(guò)工作線程調(diào)用await()從而自行阻塞,直到所有工作線程達(dá)到指定屏障,再大家一起往下走。在控制多個(gè)線程同時(shí)運(yùn)行上,CountDownLatch可以不限線程數(shù)量,而CyclicBarrier是固定線程數(shù)。同時(shí),CyclicBarrier還可以提供一個(gè)barrierAction,合并多線程計(jì)算結(jié)果。
4、Callable、Future和FutureTask
4.1 Runnable、Callable、Future和FutureTask之間的關(guān)系
Runnable是一個(gè)接口,在它里面只聲明了一個(gè)run()方法,由于run()方法返回值為void類型,所以在執(zhí)行完任務(wù)之后無(wú)法返回任何結(jié)果。Callable位于java.util.concurrent包下,它也是一個(gè)接口,在它里面也只聲明了一個(gè)方法,只不過(guò)這個(gè)方法叫做call(),這是一個(gè)泛型接口,call()函數(shù)返回的類型就是傳遞進(jìn)來(lái)的V類型。Future就是對(duì)于具體的Runnable或者Callable任務(wù)的執(zhí)行結(jié)果進(jìn)行取消、查詢是否完成、獲取結(jié)果。要獲取返回結(jié)果時(shí)可以調(diào)用get方,該方法會(huì)阻塞直到任務(wù)返回結(jié)果。因?yàn)镕uture只是一個(gè)接口,所以是無(wú)法直接用來(lái)創(chuàng)建對(duì)象使用的,因此就有了FutureTask。
FutureTask類實(shí)現(xiàn)了RunnableFuture接口,RunnableFuture繼承了Runnable接口和Future接口,所以它既可以作為Runnable被線程執(zhí)行,又可以作為Future得到Callable的返回值。用一個(gè)圖來(lái)說(shuō)明。
因此當(dāng)我們想通過(guò)一個(gè)線程運(yùn)行Callable,但是Thread不支持構(gòu)造方法中傳遞Callable的實(shí)例,我們需要通過(guò)FutureTask把一個(gè)Callable包裝成Runnable,然后再通過(guò)這個(gè)FutureTask拿到Callable運(yùn)行后的返回值。要想new出一個(gè)FutureTask的實(shí)例,有2種方式,直接貼出代碼。
public FutureTask(Callable<V> callable) { if (callable == null) throw new
NullPointerException();this.callable = callable; this.state = NEW; // ensure
visibility of callable } public FutureTask(Runnable runnable, V result) { this
.callable = Executors.callable(runnable, result); this.state = NEW; // ensure
visibility of callable }
4.2?Callable和FutureTask實(shí)戰(zhàn)
這個(gè)例子比較簡(jiǎn)單,在一個(gè)主線程中創(chuàng)建一個(gè)
callable來(lái)對(duì)1到10000進(jìn)行累加,再休眠3秒,然后把這個(gè)callable封裝成一個(gè)futureTask,交給一個(gè)線程去運(yùn)行,最終查看callable的返回結(jié)果和阻塞效果。
import java.util.concurrent.Callable; import
java.util.concurrent.ExecutionException;import java.util.concurrent.FutureTask;
public class FutureTaskDemo { public static void main(String[] args) throws
InterruptedException, ExecutionException { Callable<Long> callable = new
Callable<Long>() { long sum = 0; @Override public Long call() throws Exception {
for (int i = 0; i <= 10000; i++) { sum += i; } Thread.sleep(3000);//
主要是為了演示get()時(shí)候的阻塞效果 return sum; } }; FutureTask<Long> futureTask = new
FutureTask<>(callable); new Thread(futureTask).start(); Thread.sleep(10);
System.out.println("main線程繼續(xù)執(zhí)行"); System.out.println("獲取callable計(jì)算結(jié)果 = " +
futureTask.get()); System.out.println("main線程繼續(xù)執(zhí)行 "); } }
程序輸出:
?可以看到當(dāng)futureTask.get()沒(méi)有獲取到返回結(jié)果時(shí),主線程是處于阻塞狀態(tài)。
4.3 手寫一個(gè)FutureTask
?
要實(shí)現(xiàn)一個(gè)簡(jiǎn)易的FutureTask,通過(guò)上面對(duì)幾個(gè)接口之間關(guān)系的介紹,以及閱讀FutureTask代碼可以看出,只需定義一個(gè)類,實(shí)現(xiàn)Runnable和Future接口,并實(shí)現(xiàn)run()方法和get()方法就可以了,核心思想就是上一篇文章中提到的通知/等待機(jī)制。直接上代碼:
import java.util.concurrent.*; public class MyFutureTask<V> implements
Runnable, Future<V> { private Callable<V> callable; private V result = null;
public MyFutureTask(Callable<V> callable) { this.callable = callable; }
@Overridepublic void run() { V temp = null; try { temp = callable.call(); }
catch (Exception e) { e.printStackTrace(); } synchronized (this) { result =
temp;this.notifyAll(); } } @Override public V get() throws InterruptedException
{if (result != null) { return result; } System.out.println("等待結(jié)果執(zhí)行完成。。。。。");
synchronized (this) { this.wait(); } return result; } @Override public boolean
cancel(boolean mayInterruptIfRunning) { return false; } @Override public boolean
isCancelled() {return false; } @Override public boolean isDone() { return false
; } @Overridepublic V get(long timeout, TimeUnit unit) throws
InterruptedException, ExecutionException, TimeoutException {return null; } }
?為了驗(yàn)證效果,把上一段代碼中的FutureTask改成MyFutureTask,其余代碼統(tǒng)統(tǒng)不變。
import java.util.concurrent.Callable; public class FutureTaskDemo { public
static void main(String[] args) throws InterruptedException { Callable<Long>
callable =new Callable<Long>() { long sum = 0; @Override public Long call()
throws Exception { for (int i = 0; i <= 10000; i++) { sum += i; } Thread.sleep(
3000);//主要是為了演示get()時(shí)候的阻塞效果 return sum; } }; MyFutureTask<Long> futureTask =
new MyFutureTask<>(callable);new Thread(futureTask).start(); Thread.sleep(10);
System.out.println("main線程繼續(xù)執(zhí)行"); System.out.println("獲取callable計(jì)算結(jié)果 = " +
futureTask.get()); System.out.println("main線程繼續(xù)執(zhí)行 "); } }
運(yùn)行程序,可以看到輸出結(jié)果和阻塞現(xiàn)象與使用FutureTask一致:
?
?
這篇隨筆就介紹這么多內(nèi)容,希望大家看了有收獲。原子操作CAS在下一篇文章中介紹,閱讀過(guò)程中如發(fā)現(xiàn)描述有誤,請(qǐng)指出,謝謝。
?
熱門工具 換一換
