Java多线程 BlockingQueue实现生产者消费者模型详解

时间:2021-05-20

BlockingQueue

BlockingQueue、解决了多线程中,如何高效安全“传输”数据的问题。程序员无需关心什么时候阻塞线程,什么时候唤醒线程,该唤醒哪个线程。

方法介绍

BlockingQueue是Queue的子类

void put(E e)

插入指定元素,当BlockingQueue为满,则线程阻塞,进入Waiting状态,直到BlockingQueue有空闲空间再继续。
这里以ArrayBlockingQueue为例进行分析

void take()

队首出队,当BlockingQueue为空,则线程阻塞,进入Waiting状态,直到BlockingQueue不为空再继续。

int drainTo(Collection<? super E> c)

从队列中批量取出数据,并放入到另一个集合中,返回转移数据的数量,只需一次加锁和解锁。

BlockingQueue的实现类

ArrayBlockingQueue

/* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;

基于数组实现的BlockingQueue,需要指定队列容量,可以指定是否为公平锁;只有一个ReentrantLock,生产者和消费者不能异步执行。

LinkedBlockingQueue

/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();

基于链表实现的BlockingQueue,可以指定队列容量,不指定队列容量默认为Integer.MAX_VALUE;有两个ReentrantLock,生产者和消费者可以异步执行。

BlockingQueue实现生产者消费者模型

缓冲区可以存放大量数据

生产者和消费者速度各不相同

public class MyThread42 { public static void main(String[] args) { final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(10); Runnable producerRunnable = new Runnable() { int i = 0; public void run() { while (true) { try { System.out.println("我生产了一个" + i++); bq.put(i + ""); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Runnable customerRunnable = new Runnable() { public void run() { while (true) { try { System.out.println("我消费了一个" + bq.take()); Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Thread producerThread = new Thread(producerRunnable); Thread customerThread = new Thread(customerRunnable); producerThread.start(); customerThread.start(); }}

输出结果如下

我生产了一个0我消费了一个1我生产了一个1我生产了一个2我消费了一个2我生产了一个3我生产了一个4我生产了一个5我消费了一个3我生产了一个6我生产了一个7我生产了一个8我消费了一个4我生产了一个9我生产了一个10我生产了一个11我消费了一个5我生产了一个12我生产了一个13我生产了一个14我消费了一个6我生产了一个15我生产了一个16我消费了一个7我生产了一个17我消费了一个8我生产了一个18我消费了一个9我生产了一个19我消费了一个10我生产了一个20我消费了一个11我生产了一个21我消费了一个12我生产了一个22我消费了一个13我生产了一个23我消费了一个14我生产了一个24······

生产者没有生产到BlockingQueue的容量(极限是10)之前,生产3个,消费1个,再生产到BlockingQueue的容量之后,生产一个消费一个,因为不能超过BlockingQueue的容量。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

声明:本页内容来源网络,仅供用户参考;我单位不保证亦不表示资料全面及准确无误,也不保证亦不表示这些资料为最新信息,如因任何原因,本网内容或者用户因倚赖本网内容造成任何损失或损害,我单位将不会负任何法律责任。如涉及版权问题,请提交至online#300.cn邮箱联系删除。

相关文章