<ul id="qxxfc"><fieldset id="qxxfc"><tr id="qxxfc"></tr></fieldset></ul>


      介紹


      阻塞隊列(BlockingQueue)是指當(dāng)隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿;當(dāng)隊列空時,隊列會阻塞獲得元素的線程,直到隊列變非空。阻塞隊列就是生產(chǎn)者用來存放元素、消費者用來獲取元素的容器。

      當(dāng)線程 插入/獲取 動作由于隊列 滿/空 阻塞后,隊列也提供了一些機制去處理,或拋出異常,或返回特殊值,或者線程一直等待...

      方法/處理方式 拋出異常 返回特殊值 一直阻塞 超時退出
      插入方法 add(e) offer(e) put(e) offer(e, timeout, unit)
      移除方法 remove(o) poll() take() poll(timeout, unit)
      檢查方法 element() peek() — 不移除元素 不可用 不可用
      tips: 如果是無界阻塞隊列,則 put 方法永遠(yuǎn)不會被阻塞;offer 方法始終返回 true。

      Java 中的阻塞隊列:



      ArrayBlockingQueue

      ArrayBlockingQueue 是一個用數(shù)組實現(xiàn)的有界阻塞隊列。此隊列按照先進(jìn)先出(FIFO)的原則對元素進(jìn)行排序,默認(rèn)情況下不保證線程公平的訪問。

      通過可重入的獨占鎖 ReentrantLock 來控制并發(fā),Condition 來實現(xiàn)阻塞。
      public class ArrayBlockingQueueTest { /** * 1. 由于是有界阻塞隊列,需要設(shè)置初始大小 * 2.
      默認(rèn)不保證阻塞線程的公平訪問,可設(shè)置公平性 */ private static ArrayBlockingQueue<String> QUEUE = new
      ArrayBlockingQueue<>(2, true); public static void main(String[] args) throws
      InterruptedException { Thread put = new Thread(() -> { // 3. 嘗試插入元素 try {
      QUEUE.put("java"); QUEUE.put("javaScript"); // 4. 元素已滿,會阻塞線程 QUEUE.put("c++");
      } catch (InterruptedException e) { e.printStackTrace(); } }); put.start();
      Thread take = new Thread(() -> { try { // 5. 獲取一個元素
      System.out.println(QUEUE.take()); } catch (InterruptedException e) {
      e.printStackTrace(); } }); take.start(); // 6 javaScript、c++
      System.out.println(QUEUE.take()); System.out.println(QUEUE.take()); } }
      LinkedBlockingQueue

      LinkedBlockingQueue 是一個用單向鏈表實現(xiàn)的有界阻塞隊列。此隊列的默認(rèn)最大長度為
      Integer.MAX_VALUE。此隊列按照先進(jìn)先出的原則對元素進(jìn)行排序,吞吐量通常要高于ArrayBlockingQueue。Executors.newFixedThreadPool()
      就使用了這個隊列。

      和 ArrayBlockingQueue 一樣,采用 ReentrantLock 來控制并發(fā),不同的是它使用了兩個獨占鎖來控制消費和生產(chǎn),通過
      takeLock 和 putLock
      兩個鎖來控制生產(chǎn)和消費,互不干擾,只要隊列未滿,生產(chǎn)線程可以一直生產(chǎn);只要隊列不空,消費線程可以一直消費,不會相互因為獨占鎖而阻塞。

      tips:因為使用了雙鎖,避免并發(fā)計算不準(zhǔn)確,使用了一個 AtomicInteger 變量統(tǒng)計元素總量。

      LinkedBlockingDeque

      LinkedBlockingDeque
      是一個由雙向鏈表結(jié)構(gòu)組成的有界阻塞隊列,可以從隊列的兩端插入和移出元素。它實現(xiàn)了BlockingDeque接口,多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以
      First 單詞結(jié)尾的方法,表示插入、獲取或移除雙端隊列的第一個元素。以 Last 單詞結(jié)尾的方法,表示插入、獲取或移除雙端隊列的最后一個元素。

      LinkedBlockingDeque 的 Node 實現(xiàn)多了指向前一個節(jié)點的變量 prev,以此實現(xiàn)雙向隊列。并發(fā)控制上和
      ArrayBlockingQueue 類似,采用單個 ReentrantLock 來控制并發(fā)。因為雙端隊列頭尾都可以消費和生產(chǎn),所以使用了一個共享鎖。

      雙向阻塞隊列可以運用在“工作竊取”模式中。
      public class LinkedBlockingDequeTest { private static
      LinkedBlockingDeque<String> DEQUE = new LinkedBlockingDeque<>(2); public static
      void main(String[] args) { DEQUE.addFirst("java"); DEQUE.addFirst("c++"); //
      java System.out.println(DEQUE.peekLast()); // java
      System.out.println(DEQUE.pollLast()); DEQUE.addLast("php"); // c++
      System.out.println(DEQUE.pollFirst()); } }
      tips: take() 方法調(diào)用的是 takeFirst(),使用時候需注意。

      PriorityBlockingQueue

      PriorityBlockingQueue
      是一個底層由數(shù)組實現(xiàn)的無界阻塞隊列,并帶有排序功能。由于是無界隊列,所以插入永遠(yuǎn)不會被阻塞。默認(rèn)情況下元素采取自然順序升序排列。也可以自定義類實現(xiàn)
      compareTo()方法來指定元素排序規(guī)則,或者初始化 PriorityBlockingQueue 時,指定構(gòu)造參數(shù) Comparator 來對元素進(jìn)行排序。

      底層同樣采用 ReentrantLock 來控制并發(fā),由于只有獲取會阻塞,所以只采用一個Condition(只通知消費)來實現(xiàn)。
      public class PriorityBlockingQueueTest { private static
      PriorityBlockingQueue<String> QUEUE = new PriorityBlockingQueue<>(); public
      static void main(String[] args) { QUEUE.add("java"); QUEUE.add("javaScript");
      QUEUE.add("c++"); QUEUE.add("python"); QUEUE.add("php"); Iterator<String> it =
      QUEUE.iterator(); while (it.hasNext()) { // c++ javaScript java python php //
      同優(yōu)先級不保證排序順序 System.out.print(it.next() + " "); } } }
      DelayQueue

      DelayQueue 是一個支持延時獲取元素的無界阻塞隊列。隊列使用 PriorityQueue 來實現(xiàn)。隊列中的元素必須實現(xiàn) Delayed
      接口(Delayed 接口的設(shè)計可以參考 ScheduledFutureTask
      類),元素按延遲優(yōu)先級排序,延遲時間短的排在前面,只有在延遲期滿時才能從隊列中提取元素。

      DelayQueue 中的 PriorityQueue 會對隊列中的任務(wù)進(jìn)行排序。排序時,time 小的排在前面(時間早的任務(wù)將被先執(zhí)行)。如果兩個任務(wù)的
      time 相同,就比較 sequenceNumber,sequenceNumber
      小的排在前面(也就是說,如果兩個任務(wù)的執(zhí)行時間相同,那么先提交的任務(wù)將被先執(zhí)行)。

      和 PriorityBlockingQueue 相似,底層也是數(shù)組,采用一個 ReentrantLock 來控制并發(fā)。

      應(yīng)用場景:

      * 緩存系統(tǒng)的設(shè)計:可以用 DelayQueue 保存緩存元素的有效期,使用一個線程循環(huán)查詢 DelayQueue,一旦能從 DelayQueue
      中獲取元素時,表示緩存有效期到了。
      * 定時任務(wù)調(diào)度:使用 DelayQueue 保存當(dāng)天將會執(zhí)行的任務(wù)和執(zhí)行時間,一旦從 DelayQueue 中獲取到任務(wù)就開始執(zhí)行,比如
      TimerQueue 就是使用 DelayQueue 實現(xiàn)的。 public class DelayElement implements Delayed,
      Runnable { private static final AtomicLong SEQUENCER = new AtomicLong(); /** *
      標(biāo)識元素先后順序 */ private final long sequenceNumber; /** * 延遲時間,單位納秒 */ private long
      time; public DelayElement(long time) { this.time = System.nanoTime() + time;
      this.sequenceNumber = SEQUENCER.getAndIncrement(); } @Override public long
      getDelay(TimeUnit unit) { return unit.convert(time - System.nanoTime(),
      NANOSECONDS); } @Override public int compareTo(Delayed other) { // compare zero
      if same object if (other == this) { return 0; } if (other instanceof
      DelayElement) { DelayElement x = (DelayElement) other; long diff = time -
      x.time; if (diff < 0) { return -1; } else if (diff > 0) { return 1; } else if
      (sequenceNumber < x.sequenceNumber) { return -1; } else { return 1; } } long
      diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ?
      -1 : (diff > 0) ? 1 : 0; } @Override public void run() {
      System.out.println("sequenceNumber" + sequenceNumber); } @Override public
      String toString() { return "DelayElement{" + "sequenceNumber=" + sequenceNumber
      + ", time=" + time + '}'; } } public class DelayQueueTest { private static
      DelayQueue<DelayElement> QUEUE = new DelayQueue<>(); public static void
      main(String[] args) { // 1. 添加 10 個參數(shù) for (int i = 1; i < 10; i++) { // 2. 5
      秒內(nèi)隨機延遲 int nextInt = new Random().nextInt(5); long convert =
      TimeUnit.NANOSECONDS.convert(nextInt, TimeUnit.SECONDS); QUEUE.offer(new
      DelayElement(convert)); } // 3. 查詢元素排序 —— 延遲短的排在前面 Iterator<DelayElement>
      iterator = QUEUE.iterator(); while (iterator.hasNext()) {
      System.out.println(iterator.next()); } // 4. 可觀察到元素延遲輸出 while
      (!QUEUE.isEmpty()) { Thread thread = new Thread(QUEUE.poll()); thread.start();
      } } }
      LinkedTransferQueue

      LinkedTransferQueue是一個由鏈表結(jié)構(gòu)組成的無界阻塞TransferQueue隊列。

      并發(fā)控制上采用了大量的 CAS 操作,沒有使用鎖。

      相對于其他阻塞隊列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。

      * transfer : Transfers the element to a consumer, waiting if necessary to do
      so. 存入的元素必須等到有消費者消費才返回。
      * tryTransfer:Transfers the element to a waiting consumer immediately, if
      possible. 如果有消費者正在等待消費元素,則把傳入的元素傳給消費者。否則立即返回 false,不用等到消費。
      SynchronousQueue

      SynchronousQueue 是一個不存儲元素的阻塞隊列。每一個 put 操作必須等待一個 take 操作,否則繼續(xù) put
      操作會被阻塞。Executors.newCachedThreadPool 就使用了這個隊列。

      SynchronousQueue 默認(rèn)情況下線程采用非公平性策略訪問隊列,未使用鎖,全部通過 CAS 操作來實現(xiàn)并發(fā),吞吐量非常高,高于
      LinkedBlockingQueue 和
      ArrayBlockingQueue,非常適合用來處理一些高效的傳遞性場景。Executors.newCachedThreadPool() 就使用了
      SynchronousQueue 進(jìn)行任務(wù)傳遞。
      public class SynchronousQueueTest { private static class
      SynchronousQueueProducer implements Runnable { private BlockingQueue<String>
      blockingQueue; private SynchronousQueueProducer(BlockingQueue<String> queue) {
      this.blockingQueue = queue; } @Override public void run() { while (true) { try
      { String data = UUID.randomUUID().toString();
      System.out.println(Thread.currentThread().getName() + " Put: " + data);
      blockingQueue.put(data); Thread.sleep(1000); } catch (InterruptedException e) {
      e.printStackTrace(); } } } } private static class SynchronousQueueConsumer
      implements Runnable { private BlockingQueue<String> blockingQueue; private
      SynchronousQueueConsumer(BlockingQueue<String> queue) { this.blockingQueue =
      queue; } @Override public void run() { while (true) { try {
      System.out.println(Thread.currentThread().getName() + " take(): " +
      blockingQueue.take()); Thread.sleep(2000); } catch (InterruptedException e) {
      e.printStackTrace(); } } } } public static void main(String[] args) { final
      BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();
      SynchronousQueueProducer queueProducer = new
      SynchronousQueueProducer(synchronousQueue); new Thread(queueProducer, "producer
      - 1").start(); SynchronousQueueConsumer queueConsumer1 = new
      SynchronousQueueConsumer(synchronousQueue); new Thread(queueConsumer1,
      "consumer — 1").start(); SynchronousQueueConsumer queueConsumer2 = new
      SynchronousQueueConsumer(synchronousQueue); new Thread(queueConsumer2,
      "consumer — 2").start(); } }
      ?
      ?

      * 參考書籍:《Java 并發(fā)編程的藝術(shù)》
      * 參考博文:https://www.cnblogs.com/konck/p/9473677.html
      <https://www.cnblogs.com/konck/p/9473677.html>

      友情鏈接
      ioDraw流程圖
      API參考文檔
      OK工具箱
      云服務(wù)器優(yōu)惠
      阿里云優(yōu)惠券
      騰訊云優(yōu)惠券
      京東云優(yōu)惠券
      站點信息
      問題反饋
      郵箱:[email protected]
      QQ群:637538335
      關(guān)注微信

        <ul id="qxxfc"><fieldset id="qxxfc"><tr id="qxxfc"></tr></fieldset></ul>
          成人美女黄网站色大免费的 | 办公室做好硬好紧我要视频 | ass毛茸茸粉嫩小美女ass | 成人无码天堂 | 人人做人人摸 | 久久久久无码精品国产情侣 | 黄片网站免费大全入口 | 久久久人 | 操碰在线观看视频 | 无码国产精品一区二区免费式直播 |