一、ThreadPoolExecutor 參數(shù)說明
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long
keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory
threadFactory, RejectedExecutionHandler handler)
*
corePoolSize:核心線程池的大小。當(dāng)提交一個(gè)任務(wù)到線程池時(shí),核心線程池會(huì)創(chuàng)建一個(gè)核心線程來執(zhí)行任務(wù),即使其他核心線程能夠執(zhí)行新任務(wù)也會(huì)創(chuàng)建線程,等到需要執(zhí)行的任務(wù)數(shù)大于核心線程池基本大小時(shí)就不再創(chuàng)建。如果調(diào)用了線程池的
prestartAllCoreThreads() 方法,核心線程池會(huì)提前創(chuàng)建并啟動(dòng)所有核心線程。
* workQueue:任務(wù)隊(duì)列。當(dāng)核心線程池中沒有線程時(shí),所提交的任務(wù)會(huì)被暫存在隊(duì)列中。Java 提供了多種阻塞隊(duì)列
<https://www.cnblogs.com/jmcui/p/11442616.html>。
*
maximumPoolSize:線程池允許創(chuàng)建的最大線程數(shù)。如果隊(duì)列也滿了,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會(huì)再創(chuàng)建新的空閑線程執(zhí)行任務(wù)。值得注意的是,如果使用了無界的任務(wù)隊(duì)列則這個(gè)參數(shù)不起作用。
* keepAliveTime:當(dāng)線程池中的線程數(shù)大于 corePoolSize 時(shí),keepAliveTime
為多余的空閑線程等待新任務(wù)的最長時(shí)間,超過這個(gè)時(shí)間后多余的線程將被終止。所以,如果任務(wù)很多,并且每個(gè)任務(wù)執(zhí)行的時(shí)間比較短,可以調(diào)大時(shí)間,提高線程的利用率。值得注意的是,如果使用了無界的任務(wù)隊(duì)列則這個(gè)參數(shù)不起作用。
* TimeUnit:線程活動(dòng)保持時(shí)間的單位。
*
threadFactory:創(chuàng)建線程的工廠??梢酝ㄟ^線程工廠給每個(gè)創(chuàng)建出來的線程設(shè)置符合業(yè)務(wù)的名字。
// 依賴 guava new ThreadFactoryBuilder().setNameFormat("xx-task-%d").build();
*
handler:飽和策略。當(dāng)隊(duì)列和線程池都滿了,說明線程池處于飽和狀態(tài),那么必須采取一種策略處理提交的新任務(wù)。Java 提供了以下4種策略:
* AbortPolicy:默認(rèn)。直接拋出異常。
* CallerRunsPolicy:只用調(diào)用者所在線程來運(yùn)行任務(wù)。
* DiscardOldestPolicy:丟棄隊(duì)列里最近的一個(gè)任務(wù),并執(zhí)行當(dāng)前任務(wù)。
* DiscardPolicy:不處理,丟棄掉。
tips: 一般我們稱核心線程池中的線程為核心線程,這部分線程不會(huì)被回收;超過任務(wù)隊(duì)列后,創(chuàng)建的線程為空閑線程,這部分線程會(huì)被回收(回收時(shí)間即
keepAliveTime)
二、常見的 ThreadPoolExecutor 介紹
Executors 是創(chuàng)建 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 的工廠類。
Java 提供了多種類型的 ThreadPoolExecutor,比較常見的有
FixedThreadPool、SingleThreadExecutor、CachedThreadPool等。
FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new
ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new
LinkedBlockingQueue<Runnable>()); }
FixedThreadPool 被稱為可重用固定線程數(shù)的線程池??梢钥吹?corePoolSize 和 maximumPoolSize 都被設(shè)置成了
nThreads;keepAliveTime設(shè)置為0L,意味著多余的空閑線程會(huì)被立即終止;使用了阻塞隊(duì)列 LinkedBlockingQueue
作為線程的工作隊(duì)列(隊(duì)列的容量為 Integer.MAX_VALUE)。
FixedThreadPool 所存在的問題是,由于隊(duì)列的容量為 Integer.MAX_VALUE,基本可以認(rèn)為是無界的,所以
maximumPoolSize 和 keepAliveTime 參數(shù)都不會(huì)生效,飽和拒絕策略也不會(huì)執(zhí)行,會(huì)造成任務(wù)大量堆積在阻塞隊(duì)列中。
FixedThreadPool 適用于為了滿足資源管理的需求,而需要限制線程數(shù)量的應(yīng)用場景。
SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new
FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
SingleThreadExecutor 是使用單個(gè)線程的線程池??梢钥吹?corePoolSize 和 maximumPoolSize
被設(shè)置為1,其他參數(shù)與 FixedThreadPool 相同,所以所帶來的風(fēng)險(xiǎn)也和 FixedThreadPool 一致,就不贅述了。
SingleThreadExecutor 適用于需要保證順序的執(zhí)行各個(gè)任務(wù)。
CachedThreadPool
public static ExecutorService newCachedThreadPool() { return new
ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new
SynchronousQueue<Runnable>()); }
CachedThreadPool 是一個(gè)會(huì)根據(jù)需要?jiǎng)?chuàng)建新線程的線程池??梢钥吹?corePoolSize 被設(shè)置為
0,所以創(chuàng)建的線程都為空閑線程;maximumPoolSize 被設(shè)置為
Integer.MAX_VALUE(基本可認(rèn)為無界),意味著可以創(chuàng)建無限數(shù)量的空閑線程;keepAliveTime
設(shè)置為60L,意味著空閑線程等待新任務(wù)的最長時(shí)間為60秒;使用沒有容量的 SynchronousQueue 作為線程池的工作隊(duì)列。
CachedThreadPool 所存在的問題是, 如果主線程提交任務(wù)的速度高于maximumPool
中線程處理任務(wù)的速度時(shí),CachedThreadPool
會(huì)不斷創(chuàng)建新線程。極端情況下,CachedThreadPool會(huì)因?yàn)閯?chuàng)建過多線程而耗盡CPU和內(nèi)存資源。
CachedThreadPool 適用于執(zhí)行很多的短期異步任務(wù)的小程序,或者是負(fù)載較輕的服務(wù)器。
三、自建 ThreadPoolExecutor 線程池
鑒于上面提到的風(fēng)險(xiǎn),我們更提倡使用 ThreadPoolExecutor 去創(chuàng)建線程池,而不用 Executors 工廠去創(chuàng)建。
以下是一個(gè) ThreadPoolExecutor 創(chuàng)建線程池的 Demo 實(shí)例:
public class Pool { static ThreadFactory threadFactory = new
ThreadFactoryBuilder().setNameFormat("pool-task-%d").build(); static
ExecutorService executor = new
ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, 200, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), threadFactory, new
ThreadPoolExecutor.AbortPolicy()); public static void main(String[] args)
throws ExecutionException, InterruptedException { // 1. 無返回值的任務(wù)執(zhí)行 -> Runnable
executor.execute(() -> System.out.println("Hello World")); // 2. 有返回值的任務(wù)執(zhí)行 ->
Callable Future<String> future = executor.submit(() -> "Hello World"); // get
方法會(huì)阻塞線程執(zhí)行等待返回結(jié)果 String result = future.get(); System.out.println(result); // 3.
監(jiān)控線程池 monitor(); // 4. 關(guān)閉線程池 shutdownAndAwaitTermination(); monitor(); }
private static void monitor() { ThreadPoolExecutor threadPoolExecutor =
(ThreadPoolExecutor) Pool.executor;
System.out.println("【線程池任務(wù)】線程池中曾經(jīng)創(chuàng)建過的最大線程數(shù):" +
threadPoolExecutor.getLargestPoolSize()); System.out.println("【線程池任務(wù)】線程池中線程數(shù):"
+ threadPoolExecutor.getPoolSize()); System.out.println("【線程池任務(wù)】線程池中活動(dòng)的線程數(shù):" +
threadPoolExecutor.getActiveCount()); System.out.println("【線程池任務(wù)】隊(duì)列中等待執(zhí)行的任務(wù)數(shù):"
+ threadPoolExecutor.getQueue().size());
System.out.println("【線程池任務(wù)】線程池已執(zhí)行完任務(wù)數(shù):" +
threadPoolExecutor.getCompletedTaskCount()); } /** * 關(guān)閉線程池 * 1.
shutdown、shutdownNow 的原理都是遍歷線程池中的工作線程,然后逐個(gè)調(diào)用線程的 interrupt 方法來中斷線程。 * 2.
shutdownNow:將線程池的狀態(tài)設(shè)置成 STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表。 * 3.
shutdown:將線程池的狀態(tài)設(shè)置成 SHUTDOWN 狀態(tài),然后中斷所有沒有正在執(zhí)行任務(wù)的線程。 */ private static void
shutdownAndAwaitTermination() { // 禁止提交新任務(wù) executor.shutdown(); try { //
等待現(xiàn)有任務(wù)終止 if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { // 取消當(dāng)前正在執(zhí)行的任務(wù)
executor.shutdownNow(); // 等待一段時(shí)間讓任務(wù)響應(yīng)被取消 if (!executor.awaitTermination(60,
TimeUnit.SECONDS)) { System.err.println("Pool did not terminate"); } } } catch
(InterruptedException ie) { // 如果當(dāng)前線程也中斷,則取消 executor.shutdownNow(); // 保留中斷狀態(tài)
Thread.currentThread().interrupt(); } } }
創(chuàng)建線程池需要注意以下幾點(diǎn):
* CPU 密集型任務(wù)應(yīng)配置盡可能小的線程,如配置 Ncpu+1 個(gè)線程。
* IO 密集型任務(wù)(數(shù)據(jù)庫讀寫等)應(yīng)配置盡可能多的線程,如配置 Ncpu*2 個(gè)線程。
* 優(yōu)先級(jí)不同的任務(wù)可以使用優(yōu)先級(jí)隊(duì)列 PriorityBlockingQueue 來處理。
* 建議使用有界隊(duì)列??梢员苊鈩?chuàng)建數(shù)量非常多的線程,甚至拖垮系統(tǒng)。有界隊(duì)列能增加系統(tǒng)的穩(wěn)定性和預(yù)警能力,可以根據(jù)需要設(shè)大一點(diǎn)兒,比如幾千。
四、ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor 繼承自 ThreadPoolExecutor。它主要用來在給定的延遲之后運(yùn)行任務(wù),或者定期執(zhí)行任務(wù)。
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory
threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new
DelayedWorkQueue(), threadFactory); }
ScheduledThreadPoolExecutor 的功能與 Timer
<https://www.cnblogs.com/jmcui/p/7519759.html> 類似,但功能更強(qiáng)大、更靈活。Timer
對(duì)應(yīng)的是單個(gè)后臺(tái)線程,而ScheduledThreadPoolExecutor 可以在構(gòu)造函數(shù)中指定多個(gè)對(duì)應(yīng)的后臺(tái)線程數(shù)。
Java 提供了多種類型的 ScheduledThreadPoolExecutor ,可以通過 Executors 創(chuàng)建,比較常見的有
ScheduledThreadPool、SingleThreadScheduledExecutor
等。適用于需要多個(gè)后臺(tái)線程執(zhí)行周期任務(wù),同時(shí)為了滿足資源管理的需求而需要限制后臺(tái)線程數(shù)量的應(yīng)用場景。
public class ScheduleTaskTest { static ThreadFactory threadFactory = new
BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").build(); static
ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(5, threadFactory); public static void
main(String[] args) throws ExecutionException, InterruptedException { // 1. 延遲
3 秒后執(zhí)行 Runnable 方法 scheduledExecutorService.schedule(() ->
System.out.println("Hello World"), 3000, TimeUnit.MILLISECONDS); // 2. 延遲 3
秒后執(zhí)行 Callable 方法 ScheduledFuture<String> scheduledFuture =
scheduledExecutorService.schedule(() -> "Hello ScheduledFuture", 3000,
TimeUnit.MILLISECONDS); System.out.println(scheduledFuture.get()); // 3. 延遲 1
秒后開始每隔 3 秒周期執(zhí)行。 // 如果中間任務(wù)遇到異常,則禁止后續(xù)執(zhí)行。 // 固定的頻率來執(zhí)行某項(xiàng)任務(wù),它不受任務(wù)執(zhí)行時(shí)間的影響。到時(shí)間,就執(zhí)行。
scheduledExecutorService.scheduleAtFixedRate(() -> System.out.println("Hello
ScheduleAtFixedRate"), 1, 3000, TimeUnit.MILLISECONDS); // 4. 延遲 1 秒后,每個(gè)任務(wù)結(jié)束延遲
3 秒后再執(zhí)行下個(gè)任務(wù)。 // 如果中間任務(wù)遇到異常,則禁止后續(xù)執(zhí)行。 // 受任務(wù)執(zhí)行時(shí)間的影響,等待任務(wù)執(zhí)行結(jié)束后才開始計(jì)算延遲。
scheduledExecutorService.scheduleWithFixedDelay(() -> System.out.println("Hello
ScheduleWithFixedDelay"), 1, 3000, TimeUnit.MILLISECONDS); } }
ScheduledThreadPoolExecutor 的執(zhí)行步驟大抵如下:
* 當(dāng)調(diào)用 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate() 方法或者
scheduleWithFixedDelay()方 法時(shí),會(huì)向 DelayedWorkQueue 隊(duì)列添加 ScheduledFutureTask 任務(wù)。
* 線程池中的線程從 DelayedWorkQueue隊(duì)列中獲取執(zhí)行時(shí)間已到達(dá)的 ScheduledFutureTask,然后執(zhí)行任務(wù)。
* 線程修改 ScheduledFutureTask 任務(wù)的執(zhí)行時(shí)間為下次將要被執(zhí)行的時(shí)間。
* 線程把修改后的 ScheduledFutureTask 重新放回隊(duì)列。
熱門工具 換一換