jdk源码解析三之LinkedBlockingQueue 红太狼 2023-02-20 12:07 2阅读 0赞 ### 文章目录 ### * * LinkedBlockingQueue * * put * offer * 阻塞时间的offer * take * poll * peek * remove * 迭代器 * 总结 ## LinkedBlockingQueue ## 一个基于链表的阻塞队列。此队列按 FIFO(先进先出)排序元素 public LinkedBlockingQueue() { //默认最大容量 this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; //维护双端队列 last = head = new Node<E>(null); } ### put ### //put将指定元素插入此队列尾部,将等待可用的空间 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; //创建新节点 Node<E> node = new Node<E>(e); //获取put锁 final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //如果当前线程未被中断,则获取锁。 putLock.lockInterruptibly(); try { //达到上限容量,则一直等待 while (count.get() == capacity) { notFull.await(); } //设置值 enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) //列是否有可用空间,如果有则唤醒一个等待线程 notFull.signal(); } finally { //释放锁 putLock.unlock(); } // 如果队列中有一条数据,唤醒消费线程进行消费 if (c == 0) signalNotEmpty(); } ### offer ### public boolean offer(E e) { if (e == null) throw new NullPointerException(); //等于最大容量,则返回,而不阻塞 final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; //因为不阻塞,所以直接获取锁 putLock.lock(); try { //再次检查容量大小,然后直接添加,随后唤醒一个等待线程 if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } // 如果队列中有一条数据,唤醒消费线程进行消费 if (c == 0) signalNotEmpty(); return c >= 0; } ### 阻塞时间的offer ### public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //获取中断锁 putLock.lockInterruptibly(); try { //等于最大容量,则一直循环 while (count.get() == capacity) { //超过超时时间则返回 if (nanos <= 0) return false; //当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。 nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); //通知信号 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } // 如果队列中有一条数据,唤醒消费线程进行消费 if (c == 0) signalNotEmpty(); return true; } ### take ### //获取并移除此队列的头部,在元素变得可用之前一直等待 public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; //中断点 takeLock.lockInterruptibly(); try { //队列为空,阻塞等待 while (count.get() == 0) { notEmpty.await(); } //获取值 x = dequeue(); c = count.getAndDecrement(); // 队列中还有元素,唤醒下一个消费线程进行消费 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } // 之前队列是满的,则唤醒生产线程进行添加元素 if (c == capacity) signalNotFull(); return x; } private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; //head默认root的value是null Node<E> h = head; Node<E> first = h.next; // head节点原来指向的节点的next指向自己,等待下次gc回收 h.next = h; // help GC // head节点指向下一个节点 head = first; //获取新的head的value E x = first.item; //新的head设置null first.item = null; return x; } ### poll ### public E poll() { final AtomicInteger count = this.count; //容量为0,直接返回 if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } ### peek ### public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } } ### remove ### public boolean remove(Object o) { //为null,直接返回 if (o == null) return false; //put和take锁,就暂时不能新增或修改 fullyLock(); try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { //匹配到值,则删除 if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { //释放2个锁 fullyUnlock(); } } void unlink(Node<E> p, Node<E> trail) { // assert isFullyLocked(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. p.item = null; //在迭代的时候,如果p.next为null,则会造成异常.所以这里没设置null trail.next = p.next; if (last == p) last = trail; // 如果删除之前元素是满的,删除之后就有空间了,唤醒生产线程放入元素 if (count.getAndDecrement() == capacity) notFull.signal(); } ### 迭代器 ### 当执行迭代器的nextNode的时候,如果同时发现有执行take操作,因为当前head.next指向了自己, ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L051YW5fRmVuZw_size_16_color_FFFFFF_t_70] private Node<E> nextNode(Node<E> p) { for (;;) { Node<E> s = p.next; //take时,head.next=head,则直接返回当前head的下一个节点 if (s == p) return head.next; if (s == null || s.item != null) return s; p = s; } } ### 总结 ### 底层阻塞队列FIFO.内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。 默认容量无界,且底层链表,所以执行插入和删除效率比较高.且2把锁维护新增删除,所以并发有所提高. [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L051YW5fRmVuZw_size_16_color_FFFFFF_t_70]: https://img-blog.csdnimg.cn/20200526151439409.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L051YW5fRmVuZw==,size_16,color_FFFFFF,t_70
相关 JUC集合类 LinkedBlockingQueue源码解析 JDK8 文章目录 前言 成员 构造器 入队 add offer put 超时offer 你的名字/ 2023年03月01日 13:40/ 0 赞/ 102 阅读
相关 jdk源码解析八之NIO 文章目录 Buffer ByteBuffer MappedByteBuffer DirectByteBu 素颜马尾好姑娘i/ 2023年02月22日 05:12/ 0 赞/ 161 阅读
相关 jdk源码解析三之ThreadLocal 文章目录 ThreadLocal set 初始化ThreadLocalMap set赋值 柔情只为你懂/ 2023年02月20日 12:08/ 0 赞/ 38 阅读
相关 jdk源码解析三之ArrayBlockingQueue 文章目录 ArrayBlockingQueue put offer take 我会带着你远行/ 2023年02月20日 12:08/ 0 赞/ 21 阅读
相关 jdk源码解析三之LinkedBlockingQueue 文章目录 LinkedBlockingQueue put offer 阻塞时间的offer 红太狼/ 2023年02月20日 12:07/ 0 赞/ 3 阅读
相关 jdk源码解析三之CopyOnWriteArrayList 文章目录 CopyOnWriteArrayList add remove get 末蓝、/ 2023年02月20日 12:07/ 0 赞/ 64 阅读
相关 jdk源码解析三之ConcurrentHashMap 文章目录 ConcurrentHashMap put 初始化 扩容 ge た 入场券/ 2023年02月20日 12:07/ 0 赞/ 76 阅读
相关 jdk源码解析三之JUC并发容器 文章目录 本篇文章主要是对JUC包下,一些并发类的源码分析,如果想了解具体实例,请点击 并发容器 ConcurrentHashMap 淩亂°似流年/ 2023年02月12日 09:26/ 0 赞/ 44 阅读
相关 jdk11源码--LinkedBlockingQueue源码分析 > 更多java源码分析请见:jdk11源码分析系列文章专栏:[Java11源码分析][Java11] 文章目录 欢迎关注本人公众号 概述 构造方法 向右看齐/ 2021年12月19日 00:29/ 0 赞/ 485 阅读
还没有评论,来说两句吧...