时间:2021-05-20
有界的阻塞队列,内部是一个数组,有边界的意思是:容量是有限的,必须进行初始化,指定它的容量大小,以先进先出的方式存储数据,最新插入的在对尾,最先移除的对象在头部。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable { /** 队列元素 */ final Object[] items; /** 下一次读取操作的位置, poll, peek or remove */ int takeIndex; /** 下一次写入操作的位置, offer, or add */ int putIndex; /** 元素数量 */ int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. * 它采用一个 ReentrantLock 和相应的两个 Condition 来实现。 */ /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; /** 指定大小 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } /** * 指定容量大小与指定访问策略 * @param fair 指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁; */ public ArrayBlockingQueue(int capacity, boolean fair) {} /** * 指定容量大小、指定访问策略与最初包含给定集合中的元素 * @param c 将此集合中的元素在构造方法期间就先添加到队列中 */ public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {}}这里由于在操作共享变量前加了锁,所以不存在内存不可见问题,加锁后获取的共享变量都是从主内存中获取的,而不是在CPU缓存或者寄存器里面的值,释放锁后修改的共享变量值会刷新到主内存。
另外这个队列使用循环数组实现,所以在计算下一个元素存放下标时候有些特殊。另外 insert 后调用 notEmpty.signal() ;是为了激活调用 notEmpty.await(); 阻塞后放入 notEmpty 条件队列的线程。
Take 操作和 Put 操作很类似
//该类的迭代器,所有的迭代器共享数据,队列改变会影响所有的迭代器transient Itrs itrs = null; //其存放了目前所创建的所有迭代器。/*** 迭代器和它们的队列之间的共享数据,允许队列元素被删除时更新迭代器的修改。*/class Itrs { void elementDequeued() { // assert lock.getHoldCount() == 1; if (count == 0) //队列中数量为0的时候,队列就是空的,会将所有迭代器进行清理并移除 queueIsEmpty(); //takeIndex的下标是0,意味着队列从尾中取完了,又回到头部获取 else if (takeIndex == 0) takeIndexWrapped(); } /** * 当队列为空的时候做的事情 * 1. 通知所有迭代器队列已经为空 * 2. 清空所有的弱引用,并且将迭代器置空 */ void queueIsEmpty() {} /** * 将takeIndex包装成0 * 并且通知所有的迭代器,并且删除已经过期的任何对象(个人理解是置空对象) * 也直接的说就是在Blocking队列进行出队的时候,进行迭代器中的数据同步,保持队列中的元素和迭代器的元素是一致的。 */ void takeIndexWrapped() {}}代码演示
package com.rumenz.task;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * @className: BlockingQuqueExample * @description: TODO 类描述 * @author: mac * @date: 2021/1/20 **/public class BlockingQueueExample { private static volatile Boolean flag=false; public static void main(String[] args) { BlockingQueue blockingQueue=new ArrayBlockingQueue(1024); ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.execute(()->{ try{ blockingQueue.put(1); Thread.sleep(2000); blockingQueue.put(3); flag=true; }catch (Exception e){ e.printStackTrace(); } }); executorService.execute(()->{ try { while (!flag){ Integer i = (Integer) blockingQueue.take(); System.out.println(i); } }catch (Exception e){ e.printStackTrace(); } }); executorService.shutdown(); }}LinkedBlockingQueue
基于链表的阻塞队列,通 ArrayBlockingQueue 类似,其内部也维护这一个数据缓冲队列(该队列由一个链表构成),当生产者往队列放入一个数据时,队列会从生产者手上获取数据,并缓存在队列的内部,而生产者立即返回,只有当队列缓冲区到达最大值容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞队列,直到消费者从队列中消费掉一份数据,生产者会被唤醒,反之对于消费者这端的处理也基于同样的原理。
LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行的操作队列中的数据,以调高整个队列的并发能力。
如果构造一个 LinkedBlockingQueue 对象,而没有指定容量大小, LinkedBlockingQueue 会默认一个类似无限大小的容量 Integer.MAX_VALUE ,这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已经被消耗殆尽了。
LinkedBlockingQueue 是一个使用链表完成队列操作的阻塞队列。链表是单向链表,而不是双向链表。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable { //队列的容量,指定大小或为默认值Integer.MAX_VALUE private final int capacity; //元素的数量 private final AtomicInteger count = new AtomicInteger(); //队列头节点,始终满足head.item==null transient Node<E> head; //队列的尾节点,始终满足last.next==null private transient Node<E> last; /** Lock held by take, poll, etc */ //出队的锁:take, poll, peek 等读操作的方法需要获取到这个锁 private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ //当队列为空时,保存执行出队的线程:如果读操作的时候队列是空的,那么等待 notEmpty 条件 private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ //入队的锁:put, offer 等写操作的方法需要获取到这个锁 private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ //当队列满时,保存执行入队的线程:如果写操作的时候队列是满的,那么等待 notFull 条件 private final Condition notFull = putLock.newCondition(); //传说中的无界队列 public LinkedBlockingQueue() {} //传说中的有界队列 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } //传说中的无界队列 public LinkedBlockingQueue(Collection<? extends E> c){} /** * 链表节点类 */ static class Node<E> { E item; /** * One of: * - 真正的继任者节点 * - 这个节点,意味着继任者是head.next * - 空,意味着没有后继者(这是最后一个节点) */ Node<E> next; Node(E x) { item = x; } }}通过其构造函数,得知其可以当做无界队列也可以当做有界队列来使用。
这里用了两把锁分别是 takeLock 和 putLock ,而 Condition 分别是 notEmpty 和 notFull ,它们是这样搭配的。
takeLock
putLock
从上面的构造函数中可以看到,这里会初始化一个空的头结点,那么第一个元素入队的时候,队列中就会有两个元素。读取元素时,也是获取头结点后面的一个元素。count的计数值不包含这个头结点。
ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。
LinkedBlockingQueue 实现一个线程添加文件对象,四个线程读取文件对象
package concurrent;import java.io.File;import java.io.FileFilter;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.atomic.AtomicInteger;public class TestBlockingQueue { static long randomTime() { return (long) (Math.random() * 1000); } public static void main(String[] args) { // 能容纳100个文件 final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100); // 线程池 final ExecutorService exec = Executors.newFixedThreadPool(5); final File root = new File("F:\\JavaLib"); // 完成标志 final File exitFile = new File(""); // 读个数 final AtomicInteger rc = new AtomicInteger(); // 写个数 final AtomicInteger wc = new AtomicInteger(); // 读线程 Runnable read = new Runnable() { public void run() { scanFile(root); scanFile(exitFile); } public void scanFile(File file) { if (file.isDirectory()) { File[] files = file.listFiles(new FileFilter() { public boolean accept(File pathname) { return pathname.isDirectory() || pathname.getPath().endsWith(".java"); } }); for (File one : files) scanFile(one); } else { try { int index = rc.incrementAndGet(); System.out.println("Read0: " + index + " " + file.getPath()); queue.put(file); } catch (InterruptedException e) { } } } }; exec.submit(read); // 四个写线程 for (int index = 0; index < 4; index++) { // write thread final int NO = index; Runnable write = new Runnable() { String threadName = "Write" + NO; public void run() { while (true) { try { Thread.sleep(randomTime()); int index = wc.incrementAndGet(); File file = queue.take(); // 队列已经无对象 if (file == exitFile) { // 再次添加"标志",以让其他线程正常退出 queue.put(exitFile); break; } System.out.println(threadName + ": " + index + " " + file.getPath()); } catch (InterruptedException e) { } } } }; exec.submit(write); } exec.shutdown(); }}到此这篇关于Java高并发BlockingQueue重要实现类的文章就介绍到这了,更多相关Java高并发BlockingQueue实现类内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
声明:本页内容来源网络,仅供用户参考;我单位不保证亦不表示资料全面及准确无误,也不保证亦不表示这些资料为最新信息,如因任何原因,本网内容或者用户因倚赖本网内容造成任何损失或损害,我单位将不会负任何法律责任。如涉及版权问题,请提交至online#300.cn邮箱联系删除。
在学习Java多线程并发开发过程中,了解到DelayQueue类的主要作用:是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中
java高并发中volatile的实现原理摘要:在多线程并发编程中synchronized和Volatile都扮演着重要的角色,Volatile是轻量级的syn
Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解在java1.5中,提供了一些非常有用的辅助类来帮助我们
在[高并发Java一]前言中已经提到了无锁的概念,由于在jdk源码中有大量的无锁应用,所以在这里介绍下无锁。1无锁类的原理详解1.1CASCAS算法的过程是这样
前言ConcurrentHashMap是Java5中支持高并发、高吞吐量的线程安全HashMap实现。我们知道,ConcurrentHashmap(1.8)这个