到目前為止,你學(xué)到的都是順序編程,順序編程的概念就是某一時(shí)刻只有一個(gè)任務(wù)在執(zhí)行,順序編程固然能夠解決很多問(wèn)題,但是對(duì)于某種任務(wù),如果能夠并發(fā)的執(zhí)行程序中重要的部分就顯得尤為重要,同時(shí)也可以極大提高程序運(yùn)行效率,享受并發(fā)為你帶來(lái)的便利。但是,熟練掌握并發(fā)編程理論和技術(shù),對(duì)于只會(huì)CRUD的你來(lái)說(shuō)是一種和你剛學(xué)面向?qū)ο笠粯拥囊环N飛躍。
正如你所看到的,當(dāng)并行的任務(wù)彼此干涉時(shí),實(shí)際的并發(fā)問(wèn)題就會(huì)接踵而至。而且并發(fā)問(wèn)題不是很難復(fù)現(xiàn),在你實(shí)際的測(cè)試過(guò)程中往往會(huì)忽略它們,因?yàn)楣收鲜桥紶柊l(fā)生的,這也是我們研究它們的必要條件:如果你對(duì)并發(fā)問(wèn)題置之不理,那么你最終會(huì)承受它給你帶來(lái)的損害。
并發(fā)的多面性
更快的執(zhí)行
速度問(wèn)題聽(tīng)起來(lái)很簡(jiǎn)單,如果你想讓一個(gè)程序運(yùn)行的更快一些,那么可以將其切成多個(gè)分片,在單獨(dú)的處理器上運(yùn)行各自的分片:前提是這些任務(wù)彼此之間沒(méi)有聯(lián)系。
注意:速度的提高是以多核處理器而不是芯片的形式出現(xiàn)的。
如果你有一臺(tái)多處理器的機(jī)器,那么你就可以在這些處理器之間分布多個(gè)任務(wù),從而極大的提高吞吐量。但是,并發(fā)通常是提高在單處理器上的程序的性能。
在單處理上的性能開(kāi)銷(xiāo)要比多處理器上的性能開(kāi)銷(xiāo)大很多,因?yàn)檫@其中增加了線程切換
(從一個(gè)線程切換到另外一個(gè)線程)的重要依據(jù)。表面上看,將程序的所有部分當(dāng)作單個(gè)的任務(wù)運(yùn)行好像是開(kāi)銷(xiāo)更小一點(diǎn),節(jié)省了線程切換的時(shí)間。
改進(jìn)代碼的設(shè)計(jì)
在單CPU機(jī)器上使用多任務(wù)的程序在任意時(shí)刻仍舊只在執(zhí)行一項(xiàng)工作,你肉眼觀察到控制臺(tái)的輸出好像是這些線程在同時(shí)工作,這不過(guò)是CPU的障眼法罷了,CPU為每個(gè)任務(wù)都提供了不固定的時(shí)間切片。Java
的線程機(jī)制是搶占式的,也就是說(shuō),你必須編寫(xiě)某種讓步語(yǔ)句才會(huì)讓線程進(jìn)行切換,切換給其他線程。
基本的線程機(jī)制
并發(fā)編程使我們可以將程序劃分成多個(gè)分離的,獨(dú)立運(yùn)行的任務(wù)。通過(guò)使用多線程機(jī)制,這些獨(dú)立任務(wù)中的每一項(xiàng)任務(wù)都由執(zhí)行線程
來(lái)驅(qū)動(dòng)。一個(gè)線程就是進(jìn)程中的一個(gè)單一的順序控制流。因此,單個(gè)進(jìn)程可以擁有多個(gè)并發(fā)執(zhí)行的任務(wù),但是你的程序看起來(lái)每個(gè)任務(wù)都有自己的CPU一樣。其底層是切分CPU時(shí)間,通常你不需要考慮它。
定義任務(wù)
線程可以驅(qū)動(dòng)任務(wù),因此你需要一種描述任務(wù)的方式,這可以由 Runnable 接口來(lái)提供,要想定義任務(wù),只需要實(shí)現(xiàn) Runnable 接口,并在run
方法中實(shí)現(xiàn)你的邏輯即可。
public class TestThread implements Runnable{ public static int i = 0;
@Override public void run() { System.out.println("start thread..." + i); i++;
System.out.println("end thread ..." + i); } public static void main(String[]
args) { for(int i = 0;i < 5;i++){ TestThread testThread = new TestThread();
testThread.run(); } } }
任務(wù) run 方法會(huì)有某種形式的循環(huán),使得任務(wù)一直運(yùn)行下去直到不再需要,所以要設(shè)定 run 方法的跳出條件(有一種選擇是從 run 中直接返回,下面會(huì)說(shuō)到。)
在 run 中使用靜態(tài)方法 Thread.yield()
可以使用線程調(diào)度,它的意思是建議線程機(jī)制進(jìn)行切換:你已經(jīng)執(zhí)行完重要的部分了,剩下的交給其他線程跑一跑吧。注意是建議執(zhí)行,而不是強(qiáng)制執(zhí)行。在下面添加
Thread.yield() 你會(huì)看到有意思的輸出
public void run() { System.out.println("start thread..." + i); i++;
Thread.yield(); System.out.println("end thread ..." + i); }
Thread 類(lèi)
將 Runnable 轉(zhuǎn)變工作方式的傳統(tǒng)方式是使用 Thread 類(lèi)托管他,下面展示了使用 Thread 類(lèi)來(lái)實(shí)現(xiàn)一個(gè)線程。
public static void main(String[] args) { for(int i = 0;i < 5;i++){ Thread t =
new Thread(new TestThread()); t.start(); } System.out.println("Waiting thread
..."); }
Thread 構(gòu)造器只需要一個(gè) Runnable 對(duì)象,調(diào)用 Thread 對(duì)象的 start() 方法為該線程執(zhí)行必須的初始化操作,然后調(diào)用
Runnable 的 run 方法,以便在這個(gè)線程中啟動(dòng)任務(wù)??梢钥吹?,在 run 方法還沒(méi)有結(jié)束前,run 就被返回了。也就是說(shuō),程序不會(huì)等到 run
方法執(zhí)行完畢就會(huì)執(zhí)行下面的指令。
在 run 方法中打印出每個(gè)線程的名字,就更能看到不同的線程的切換和調(diào)度
@Override public void run() { System.out.println(Thread.currentThread() +
"start thread..." + i); i++; System.out.println(Thread.currentThread() + "end
thread ..." + i); }
這種線程切換和調(diào)度是交由 線程調(diào)度器
來(lái)自動(dòng)控制的,如果你的機(jī)器上有多個(gè)處理器,線程調(diào)度器會(huì)在這些處理器之間默默的分發(fā)線程。每一次的運(yùn)行結(jié)果都不盡相同,因?yàn)榫€程調(diào)度機(jī)制是未確定的。
使用 Executor
CachedThreadPool
JDK1.5 的java.util.concurrent 包中的執(zhí)行器 Executor 將為你管理 Thread
對(duì)象,從而簡(jiǎn)化了并發(fā)編程。Executor 在客戶端和任務(wù)之間提供了一個(gè)間接層;與客戶端直接執(zhí)行任務(wù)不同,這個(gè)中介對(duì)象將執(zhí)行任務(wù)。Executor
允許你管理異步任務(wù)的執(zhí)行,而無(wú)須顯示地管理線程的生命周期。
public static void main(String[] args) { ExecutorService service =
Executors.newCachedThreadPool(); for(int i = 0;i < 5;i++){ service.execute(new
TestThread()); } service.shutdown(); }
我們使用 Executor 來(lái)替代上述顯示創(chuàng)建 Thread 對(duì)象。CachedThreadPool
為每個(gè)任務(wù)都創(chuàng)建一個(gè)線程。注意:ExecutorService 對(duì)象是使用靜態(tài)的Executors 創(chuàng)建的,這個(gè)方法可以確定 Executor 類(lèi)型。對(duì)
shutDown 的調(diào)用可以防止新任務(wù)提交給 ExecutorService ,這個(gè)線程在 Executor 中所有任務(wù)完成后退出。
FixedThreadPool
FixedThreadPool 使你可以使用有限的線程集來(lái)啟動(dòng)多線程
public static void main(String[] args) { ExecutorService service =
Executors.newFixedThreadPool(5); for(int i = 0;i < 5;i++){ service.execute(new
TestThread()); } service.shutdown(); }
有了 FixedThreadPool
使你可以一次性的預(yù)先執(zhí)行高昂的線程分配,因此也就可以限制線程的數(shù)量。這可以節(jié)省時(shí)間,因?yàn)槟悴槐貫槊總€(gè)任務(wù)都固定的付出創(chuàng)建線程的開(kāi)銷(xiāo)。
SingleThreadExecutor
SingleThreadExecutor 就是線程數(shù)量為 1 的 FixedThreadPool,如果向 SingleThreadPool
一次性提交了多個(gè)任務(wù),那么這些任務(wù)將會(huì)排隊(duì),每個(gè)任務(wù)都會(huì)在下一個(gè)任務(wù)開(kāi)始前結(jié)束,所有的任務(wù)都將使用相同的線程。SingleThreadPool
會(huì)序列化所有提交給他的任務(wù),并會(huì)維護(hù)它自己(隱藏)的懸掛隊(duì)列。
public static void main(String[] args) { ExecutorService service =
Executors.newSingleThreadExecutor(); for(int i = 0;i < 5;i++){
service.execute(new TestThread()); } service.shutdown(); }
從輸出的結(jié)果就可以看到,任務(wù)都是挨著執(zhí)行的。我為任務(wù)分配了五個(gè)線程,但是這五個(gè)線程不像是我們之前看到的有換進(jìn)換出的效果,它每次都會(huì)先執(zhí)行完自己的那個(gè)線程,然后余下的線程繼續(xù)“走完”這條線程的執(zhí)行路徑。你可以用
SingleThreadExecutor 來(lái)確保任意時(shí)刻都只有唯一一個(gè)任務(wù)在運(yùn)行。
從任務(wù)中產(chǎn)生返回值
Runnable 是執(zhí)行工作的獨(dú)立任務(wù),但它不返回任何值。如果你希望任務(wù)在完成時(shí)能夠返回一個(gè)值 ,這個(gè)時(shí)候你就需要考慮使用 Callable 接口,它是
JDK1.5 之后引入的,通過(guò)調(diào)用它的submit 方法,可以把它的返回值放在一個(gè) Future 對(duì)象中,然后根據(jù)相應(yīng)的 get() 方法取得提交之后的返回值。
public class TaskWithResult implements Callable<String> { private int id;
public TaskWithResult(int id){ this.id = id; } @Override public String call()
throws Exception { return "result of TaskWithResult " + id; } } public class
CallableDemo { public static void main(String[] args) throws
ExecutionException, InterruptedException { ExecutorService executors =
Executors.newCachedThreadPool(); ArrayList<Future<String>> future = new
ArrayList<>(); for(int i = 0;i < 10;i++){ // 返回的是調(diào)用 call 方法的結(jié)果
future.add(executors.submit(new TaskWithResult(i))); } for(Future<String> fs :
future){ System.out.println(fs.get()); } } }
submit() 方法會(huì)返回 Future 對(duì)象,F(xiàn)uture 對(duì)象存儲(chǔ)的也就是你返回的結(jié)果。你也可以使用 isDone 來(lái)查詢 Future 是否已經(jīng)完成。
休眠
影響任務(wù)行為的一種簡(jiǎn)單方式就是使線程 休眠,選定給定的休眠時(shí)間,調(diào)用它的 sleep() 方法, 一般使用的TimeUnit 這個(gè)時(shí)間類(lèi)替換
Thread.sleep() 方法,示例如下:
public class SuperclassThread extends TestThread{ @Override public void run()
{ System.out.println(Thread.currentThread() + "starting ..." ); try { for(int i
= 0;i < 5;i++){ if(i == 3){ System.out.println(Thread.currentThread() +
"sleeping ..."); TimeUnit.MILLISECONDS.sleep(1000); } } } catch
(InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread() + "wakeup and end ..."); } public
static void main(String[] args) { ExecutorService executors =
Executors.newCachedThreadPool(); for(int i = 0;i < 5;i++){
executors.execute(new SuperclassThread()); } executors.shutdown(); } }
關(guān)于 TimeUnit 中的 sleep() 方法和 Thread.sleep() 方法的比較,請(qǐng)參考下面這篇博客
(https://www.cnblogs.com/xiadongqing/p/9925567.html)
優(yōu)先級(jí)
上面提到線程調(diào)度器對(duì)每個(gè)線程的執(zhí)行都是不可預(yù)知的,隨機(jī)執(zhí)行的,那么有沒(méi)有辦法告訴線程調(diào)度器哪個(gè)任務(wù)想要優(yōu)先被執(zhí)行呢?你可以通過(guò)設(shè)置線程的優(yōu)先級(jí)狀態(tài),告訴線程調(diào)度器哪個(gè)線程的執(zhí)行優(yōu)先級(jí)比較高,"請(qǐng)給這個(gè)騎手馬上派單",線程調(diào)度器傾向于讓優(yōu)先級(jí)較高的線程優(yōu)先執(zhí)行,然而,這并不意味著優(yōu)先級(jí)低的線程得不到執(zhí)行,也就是說(shuō),優(yōu)先級(jí)不會(huì)導(dǎo)致死鎖的問(wèn)題。優(yōu)先級(jí)較低的線程只是執(zhí)行頻率較低。
public class SimplePriorities implements Runnable{ private int priority;
public SimplePriorities(int priority) { this.priority = priority; } @Override
public void run() { Thread.currentThread().setPriority(priority); for(int i =
0;i < 100;i++){ System.out.println(this); if(i % 10 == 0){ Thread.yield(); } }
} @Override public String toString() { return Thread.currentThread() + " " +
priority; } public static void main(String[] args) { ExecutorService service =
Executors.newCachedThreadPool(); for(int i = 0;i < 5;i++){ service.execute(new
SimplePriorities(Thread.MAX_PRIORITY)); } service.execute(new
SimplePriorities(Thread.MIN_PRIORITY)); } }
toString() 方法被覆蓋,以便通過(guò)使用 Thread.toString() 方法來(lái)打印線程的名稱。你可以改寫(xiě)線程的默認(rèn)輸出,這里采用了
Thread[pool-1-thread-1,10,main] 這種形式的輸出。
通過(guò)輸出,你可以看到,最后一個(gè)線程的優(yōu)先級(jí)最低,其余的線程優(yōu)先級(jí)最高。注意,優(yōu)先級(jí)是在 run
開(kāi)頭設(shè)置的,在構(gòu)造器中設(shè)置它們不會(huì)有任何好處,因?yàn)檫@個(gè)時(shí)候線程還沒(méi)有執(zhí)行任務(wù)。
盡管JDK有10個(gè)優(yōu)先級(jí),但是一般只有MAX_PRIORITY,NORM_PRIORITY,MIN_PRIORITY 三種級(jí)別。
作出讓步
我們上面提過(guò),如果知道一個(gè)線程已經(jīng)在 run()
方法中運(yùn)行的差不多了,那么它就可以給線程調(diào)度器一個(gè)提示:我已經(jīng)完成了任務(wù)中最重要的部分,可以讓給別的線程使用CPU了。這個(gè)暗示將通過(guò) yield() 方法作出。
有一個(gè)很重要的點(diǎn)就是,Thread.yield() 是建議執(zhí)行切換CPU,而不是強(qiáng)制執(zhí)行CPU切換。
對(duì)于任何重要的控制或者在調(diào)用應(yīng)用時(shí),都不能依賴于 yield()方法,實(shí)際上, yield() 方法經(jīng)常被濫用。
后臺(tái)線程
后臺(tái)(daemon) 線程,是指運(yùn)行時(shí)在后臺(tái)提供的一種服務(wù)線程,這種線程不是屬于必須的。當(dāng)所有非后臺(tái)線程結(jié)束時(shí),程序也就停止了,同時(shí)會(huì)終止所有的后臺(tái)線程。
反過(guò)來(lái)說(shuō),只要有任何非后臺(tái)線程還在運(yùn)行,程序就不會(huì)終止。
public class SimpleDaemons implements Runnable{ @Override public void run() {
while (true){ try { TimeUnit.MILLISECONDS.sleep(100);
System.out.println(Thread.currentThread() + " " + this); } catch
(InterruptedException e) { System.out.println("sleep() interrupted"); } } }
public static void main(String[] args) throws InterruptedException { for(int i
= 0;i < 10;i++){ Thread daemon = new Thread(new SimpleDaemons());
daemon.setDaemon(true); daemon.start(); } System.out.println("All Daemons
started"); TimeUnit.MILLISECONDS.sleep(175); } }
在每次的循環(huán)中會(huì)創(chuàng)建10個(gè)線程,并把每個(gè)線程設(shè)置為后臺(tái)線程,然后開(kāi)始運(yùn)行,for循環(huán)會(huì)進(jìn)行十次,然后輸出信息,隨后主線程睡眠一段時(shí)間后停止運(yùn)行。在每次run
循環(huán)中,都會(huì)打印當(dāng)前線程的信息,主線程運(yùn)行完畢,程序就執(zhí)行完畢了。因?yàn)閐aemon 是后臺(tái)線程,無(wú)法影響主線程的執(zhí)行。
但是當(dāng)你把 daemon.setDaemon(true) 去掉時(shí),while(true)
會(huì)進(jìn)行無(wú)限循環(huán),那么主線程一直在執(zhí)行最重要的任務(wù),所以會(huì)一直循環(huán)下去無(wú)法停止。
ThreadFactory
按需要?jiǎng)?chuàng)建線程的對(duì)象。使用線程工廠替換了 Thread 或者 Runnable 接口的硬連接,使程序能夠使用特殊的線程子類(lèi),優(yōu)先級(jí)等。一般的創(chuàng)建方式為
class SimpleThreadFactory implements ThreadFactory { public Thread
newThread(Runnable r) { return new Thread(r); } }
Executors.defaultThreadFactory 方法提供了一個(gè)更有用的簡(jiǎn)單實(shí)現(xiàn),它在返回之前將創(chuàng)建的線程上下文設(shè)置為已知值
ThreadFactory 是一個(gè)接口,它只有一個(gè)方法就是創(chuàng)建線程的方法
public interface ThreadFactory { // 構(gòu)建一個(gè)新的線程。實(shí)現(xiàn)類(lèi)可能初始化優(yōu)先級(jí),名稱,后臺(tái)線程狀態(tài)和 線程組等
Thread newThread(Runnable r); }
下面來(lái)看一個(gè) ThreadFactory 的例子
public class DaemonThreadFactory implements ThreadFactory { @Override public
Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true);
return t; } } public class DaemonFromFactory implements Runnable{ @Override
public void run() { while (true){ try { TimeUnit.MILLISECONDS.sleep(100);
System.out.println(Thread.currentThread() + " " + this); } catch
(InterruptedException e) { System.out.println("Interrupted"); } } } public
static void main(String[] args) throws InterruptedException { ExecutorService
service = Executors.newCachedThreadPool(new DaemonThreadFactory()); for(int i =
0;i < 10;i++){ service.execute(new DaemonFromFactory()); }
System.out.println("All daemons started"); TimeUnit.MILLISECONDS.sleep(500); } }
Executors.newCachedThreadPool
可以接受一個(gè)線程池對(duì)象,創(chuàng)建一個(gè)根據(jù)需要?jiǎng)?chuàng)建新線程的線程池,但會(huì)在它們可用時(shí)重用先前構(gòu)造的線程,并在需要時(shí)使用提供的ThreadFactory創(chuàng)建新線程。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
{ return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), threadFactory); }
加入一個(gè)線程
一個(gè)線程可以在其他線程上調(diào)用 join() 方法,其效果是等待一段時(shí)間直到第二個(gè)線程結(jié)束才正常執(zhí)行。如果某個(gè)線程在另一個(gè)線程 t 上調(diào)用 t.join()
方法,此線程將被掛起,直到目標(biāo)線程 t 結(jié)束才回復(fù)(可以用 t.isAlive() 返回為真假判斷)。
也可以在調(diào)用 join 時(shí)帶上一個(gè)超時(shí)參數(shù),來(lái)設(shè)置到期時(shí)間,時(shí)間到期,join方法自動(dòng)返回。
對(duì) join 的調(diào)用也可以被中斷,做法是在線程上調(diào)用 interrupted 方法,這時(shí)需要用到 try...catch 子句
public class TestJoinMethod extends Thread{ @Override public void run() {
for(int i = 0;i < 5;i++){ try { TimeUnit.MILLISECONDS.sleep(1000); } catch
(InterruptedException e) { System.out.println("Interrupted sleep"); }
System.out.println(Thread.currentThread() + " " + i); } } public static void
main(String[] args) throws InterruptedException { TestJoinMethod join1 = new
TestJoinMethod(); TestJoinMethod join2 = new TestJoinMethod(); TestJoinMethod
join3 = new TestJoinMethod(); join1.start(); // join1.join(); join2.start();
join3.start(); } }
join() 方法等待線程死亡。 換句話說(shuō),它會(huì)導(dǎo)致當(dāng)前運(yùn)行的線程停止執(zhí)行,直到它加入的線程完成其任務(wù)。
線程異常捕獲
由于線程的本質(zhì),使你不能捕獲從線程中逃逸的異常,一旦異常逃出任務(wù)的run 方法,它就會(huì)向外傳播到控制臺(tái),除非你采取特殊的步驟捕獲這種錯(cuò)誤的異常,在
Java5 之前,你可以通過(guò)線程組來(lái)捕獲,但是在 Java5 之后,就需要用 Executor 來(lái)解決問(wèn)題,因?yàn)榫€程組不是一次好的嘗試。
下面的任務(wù)會(huì)在 run 方法的執(zhí)行期間拋出一個(gè)異常,并且這個(gè)異常會(huì)拋到 run 方法的外面,而且 main 方法無(wú)法對(duì)它進(jìn)行捕獲
public class ExceptionThread implements Runnable{ @Override public void run()
{ throw new RuntimeException(); } public static void main(String[] args) { try
{ ExecutorService service = Executors.newCachedThreadPool();
service.execute(new ExceptionThread()); }catch (Exception e){
System.out.println("eeeee"); } } }
為了解決這個(gè)問(wèn)題,我們需要修改 Executor 產(chǎn)生線程的方式,Java5 提供了一個(gè)新的接口
Thread.UncaughtExceptionHandler ,它允許你在每個(gè) Thread 上都附著一個(gè)異常處理器。
Thread.UncaughtExceptionHandler.uncaughtException() 會(huì)在線程因未捕獲臨近死亡時(shí)被調(diào)用。
public class ExceptionThread2 implements Runnable{ @Override public void run()
{ Thread t = Thread.currentThread(); System.out.println("run() by " + t);
System.out.println("eh = " + t.getUncaughtExceptionHandler()); // 手動(dòng)拋出異常 throw
new RuntimeException(); } } // 實(shí)現(xiàn)Thread.UncaughtExceptionHandler 接口,創(chuàng)建異常處理器
public class MyUncaughtExceptionHandler implements
Thread.UncaughtExceptionHandler{ @Override public void uncaughtException(Thread
t, Throwable e) { System.out.println("caught " + e); } } public class
HandlerThreadFactory implements ThreadFactory { @Override public Thread
newThread(Runnable r) { System.out.println(this + " creating new Thread");
Thread t = new Thread(r); System.out.println("created " + t);
t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
System.out.println("ex = " + t.getUncaughtExceptionHandler()); return t; } }
public class CaptureUncaughtException { public static void main(String[] args)
{ ExecutorService service = Executors.newCachedThreadPool(new
HandlerThreadFactory()); service.execute(new ExceptionThread2()); } }
在程序中添加了額外的追蹤機(jī)制,用來(lái)驗(yàn)證工廠創(chuàng)建的線程會(huì)傳遞給UncaughtExceptionHandler,你可以看到,未捕獲的異常是通過(guò)
uncaughtException 來(lái)捕獲的。
文章來(lái)源:
《Java編程思想》
https://www.javatpoint.com/join()-method
熱門(mén)工具 換一換
