`
cfyme
  • 浏览: 275131 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

linkedBlokingQueue源码学习

 
阅读更多

ReentrantLock是一个较为常用的锁对象 可重入的互斥锁定 与synchronized 相同的一些基本行为和语义,但功能更强大

 

Condition类与lock绑定,用newCondition()方法创建,提供了线程之间通信的方式(类似信号量)。

其使用基本与object类的wait,notify,notifyAll相同。

 

1,用condition.await()替换Object,wait(),调用时该线程阻塞,释放该线程的锁。

2,用condition.signal()替换Object.notify(),用condition.signalAll()替换Object.notifyAll(),唤醒该condition await方法所阻塞的线程

 

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { 
      //链表节点node类结构   
      static class Node<E> {  
          volatile E item;//volatile,保证了数据的可见性   
          Node<E> next;  
          Node(E x) { item = x; }  
      }  
      //容量
      private final int capacity;  
      //用原子变量,当前元素个数  
      private final AtomicInteger count = new AtomicInteger(0);  
      //头节点
      private transient Node<E> head;  
      //表尾节点 
      private transient Node<E> last;  
      //获取元素或删除元素时,要加的takeLock锁  
      private final ReentrantLock takeLock = new ReentrantLock();  
      //获取元素时若队列为空,线程阻塞,直至notEmpty条件满足(被通知) 
      private final Condition notEmpty = takeLock.newCondition();  
      //插入元素时 要加putLock锁  
      private final ReentrantLock putLock = new ReentrantLock();  
      //插入时,若队列已满,线程阻塞,直至notFull条件满足(被通知)
      private final Condition notFull = putLock.newCondition();  
      // 唤醒等待的take操作,插入数据时若插入前链表中无数据,则调用,表示链表不再为空
      private void signalNotEmpty() {  
          final ReentrantLock takeLock = this.takeLock;  
          takeLock.lock();  
          try {  
              notEmpty.signal();  
          } finally {  
              takeLock.unlock();  
          }  
      }  
      //唤醒等待插入操作,移除数据时若链表原先已满则调用,表示链表不再满 
      private void signalNotFull() {  
          final ReentrantLock putLock = this.putLock;  
          putLock.lock();  
          try {  
              notFull.signal();  
          } finally {  
              putLock.unlock();  
          }  
      }  
      // 插入到链表尾部 
      private void insert(E x) {  
          last = last.next = new Node<E>(x);  
      }  
      //获取并移除头元素 
      private E extract() {  
          Node<E> first = head.next;  
          head = first;  
          E x = first.item;  
          first.item = null;  
          return x;  
      }  
      //锁住两把锁,在remove,clear等方法中调用   
      private void fullyLock() {  
          putLock.lock();  
          takeLock.lock();  
      }  
      //和fullyLock成对使用 
      private void fullyUnlock() {  
          takeLock.unlock();  
          putLock.unlock();  
      }  
      //默认构造,容量为 Integer.MAX_VALUE  
 
      public LinkedBlockingQueue() {  
          this(Integer.MAX_VALUE);  
      }  
      //指定容量的构造   
      public LinkedBlockingQueue(int capacity) {  
          if (capacity <= 0) throw new IllegalArgumentException();  
          this.capacity = capacity;  
          last = head = new Node<E>(null);  
      }  
      //指定初始化集合的构造   
      public LinkedBlockingQueue(Collection<? extends E> c) {  
          this(Integer.MAX_VALUE);  
          for (E e : c)  
              add(e);  
      }  
      //获得大小 
        
      public int size() {  
          return count.get();  
      }  
      //剩余容量  
      public int remainingCapacity() {  
          return capacity - count.get();  
      }  
      // 将指定元素插入到此队列的尾部,如已满,阻塞至队列中有元素被移除 
      public void put(E e) throws InterruptedException {  
          if (e == null) throw new NullPointerException();  
          int c = -1;  
          final ReentrantLock putLock = this.putLock;  
          final AtomicInteger count = this.count;
   //加put锁,多个线程不能同时进入  
          putLock.lockInterruptibly();  
          try {  
              try {  
    //容量已满,则一直阻塞
                  while (count.get() == capacity)  
                      notFull.await();  
              } catch (InterruptedException ie) {  
                  notFull.signal(); // propagate to a non-interrupted thread  
                  throw ie;  
              }  
//插入
              insert(e);  
              c = count.getAndIncrement();
//通知链表未满  
              if (c + 1 < capacity)  
                  notFull.signal();  
          } finally {  
//解锁,注意必须在finally里调用,反正各种异常导致没有unlock使线程死锁
              putLock.unlock();  
          }  
    //通知链表非空
          if (c == 0)  
              signalNotEmpty();  
      }  
      // 将指定元素插入到此队列的尾部,如有必要,则等待一定时间以使空间变得可用。 
       
      public boolean offer(E e, long timeout, TimeUnit unit)  
          throws InterruptedException {  
          if (e == null) throw new NullPointerException();  
          long nanos = unit.toNanos(timeout);  
          int c = -1;  
          final ReentrantLock putLock = this.putLock;  
          final AtomicInteger count = this.count;  
   //加锁
          putLock.lockInterruptibly();  
          try {  
              for (;;) {
                  //未满可插入  
                  if (count.get() < capacity) {  
                      insert(e);  
                      c = count.getAndIncrement();
    //通知未满  
                      if (c + 1 < capacity)  
                          notFull.signal();
    //跳出循环  
                      break;  
                  }  
   //队列已满,未能插入,等待时间是负的,直接返回
                  if (nanos <= 0)  
                      return false;  
                  try {  
    //等待一定时间后再次尝试
                      nanos = notFull.awaitNanos(nanos);  
                  } catch (InterruptedException ie) {  
                      notFull.signal(); // propagate to a non-interrupted thread  
                      throw ie;  
                  }  
              }  
          } finally {  
//解锁
              putLock.unlock();  
          }  
//通知已插入数据,链表非空
          if (c == 0)  
              signalNotEmpty();  
          return true;  
      }  
      //将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量), 
       在成功时返回 true,如果此队列已满,则返回 false。 
        
      public boolean offer(E e) {  
          if (e == null) throw new NullPointerException();  
          final AtomicInteger count = this.count;  
          if (count.get() == capacity)  
              return false;  
          int c = -1;  
          final ReentrantLock putLock = this.putLock;  
          putLock.lock();  
          try {  
//由于可能在lock被阻塞时其他线程进行了插入操作,需再次判断count
              if (count.get() < capacity) {  
                  insert(e);  
                  c = count.getAndIncrement();
    //通知未满  
                  if (c + 1 < capacity)  
                      notFull.signal();  
              }  
          } finally {  
              putLock.unlock();  
          }  
  //通知非空
          if (c == 0)  
              signalNotEmpty();
          // >0表示已成功插入  
          return c >= 0;  
      }  
      //获取并移除此队列的头部,若队列为空,则阻塞。  
      public E take() throws InterruptedException {  
          E x;  
          int c = -1;  
          final AtomicInteger count = this.count;  
          final ReentrantLock takeLock = this.takeLock;
   //加锁 
          takeLock.lockInterruptibly();  
          try {  
              try {
   //队列为空时阻塞 
                  while (count.get() == 0)  
                      notEmpty.await();  
              } catch (InterruptedException ie) {  
                  notEmpty.signal(); // propagate to a non-interrupted thread  
                  throw ie;  
              }  
//获取数据
              x = extract();  
              c = count.getAndDecrement();
//通知非空  
              if (c > 1)  
                  notEmpty.signal();  
          } finally {  
              takeLock.unlock();  
          }  
    //通知未满
          if (c == capacity)  
              signalNotFull();  
          return x;  
      }  
        
      //与offer方法结构基本一致,若队列为空,则阻塞一段时间,一段时间后仍为空,则返回null
      public E poll(long timeout, TimeUnit unit) throws InterruptedException {  
          E x = null;  
          int c = -1;  
          long nanos = unit.toNanos(timeout);  
          final AtomicInteger count = this.count;  
          final ReentrantLock takeLock = this.takeLock;  
          takeLock.lockInterruptibly();  
          try {  
              for (;;) {  
                  if (count.get() > 0) {  
                      x = extract();  
                      c = count.getAndDecrement();  
                      if (c > 1)  
                          notEmpty.signal();  
                      break;  
                  }  
                  if (nanos <= 0)  
                      return null;  
                  try {  
                      nanos = notEmpty.awaitNanos(nanos);  
                  } catch (InterruptedException ie) {  
                      notEmpty.signal(); // propagate to a non-interrupted thread  
                      throw ie;  
                  }  
              }  
          } finally {  
              takeLock.unlock();  
          }  
          if (c == capacity)  
              signalNotFull();  
          return x;  
      }  
        
      ////与offer方法结构基本一致 队列为空,不阻塞,直接返回null
      public E poll() {  
          final AtomicInteger count = this.count;  
          if (count.get() == 0)  
              return null;  
          E x = null;  
          int c = -1;  
          final ReentrantLock takeLock = this.takeLock;  
          takeLock.lock();  
          try {  
              if (count.get() > 0) {  
                  x = extract();  
                  c = count.getAndDecrement();  
                  if (c > 1)  
                      notEmpty.signal();  
              }  
          } finally {  
              takeLock.unlock();  
          }  
          if (c == capacity)  
              signalNotFull();  
          return x;  
      }  
      //获取但不移除此队列的头;如果此队列为空,则返回 null。  
      public E peek() {  
          if (count.get() == 0)  
              return null;  
          final ReentrantLock takeLock = this.takeLock;  
          takeLock.lock();  
          try {  
              Node<E> first = head.next;  
              if (first == null)  
                  return null;  
              else  
                  return first.item;  
          } finally {  
              takeLock.unlock();  
          }  
      }  
      /** 
       * 从此队列移除指定元素的单个实例(如果存在)。 
       */  
      public boolean remove(Object o) {  
          if (o == null) return false;  
          boolean removed = false;
   //同时加锁,此时其他线程不能插入,不能移除
          fullyLock();  
          try {  
              Node<E> trail = head;  
              Node<E> p = head.next;
//遍历,获取到该元素  
              while (p != null) {  
                  if (o.equals(p.item)) {  
                      removed = true;  
                      break;  
                  }  
                  trail = p;  
                  p = p.next;  
              }  
//删除该元素
              if (removed) {  
                  p.item = null;  
                  trail.next = p.next;  
                  if (last == p)  
                      last = trail;  
                  if (count.getAndDecrement() == capacity)  
                      notFull.signalAll();  
              }  
          } finally {  
              fullyUnlock();  
          }  
          return removed;  
      }  
      ……  
  }

 

分享到:
评论

相关推荐

    知攻善防-应急响应靶机-web2.z18

    知攻善防-应急响应靶机-web2.z18

    知攻善防-应急响应靶机-web2.z09

    知攻善防-应急响应靶机-web2.z09

    白色简洁风格的影视众筹平台整站网站源码下载.zip

    白色简洁风格的影视众筹平台整站网站源码下载.zip

    HTTP请求流程深入解析与性能优化技术指南

    内容概要:本文详细解析了HTTP请求的整个流程,包括用户请求发起、请求报文构建、服务器处理请求、响应报文生成、网络传输响应和浏览器接收响应六个阶段。每个阶段的内容均涵盖了关键步骤和技术细节,如DNS解析、TCP连接、缓存策略、HTTP/2性能提升、HTTPS加密等。通过这些内容,读者可以全面理解HTTP请求的完整流程。 适合人群:具备一定网络基础知识的前端、后端开发人员及IT运维人员。 使用场景及目标:适用于希望深入了解HTTP协议及其优化技术的技术人员,有助于提升系统的性能和安全性,优化用户体验。 阅读建议:本文内容详尽且涉及多个关键技术点,建议读者结合实际案例进行学习,逐步理解和掌握各个阶段的技术细节和优化方法。

    白色简洁风格的电话通讯公司模板下载.zip

    白色简洁风格的电话通讯公司模板下载.zip

    白色简洁风格的日历当日事件提醒整站网站源码下载.zip

    白色简洁风格的日历当日事件提醒整站网站源码下载.zip

    RX8 专业消人声 乐器 软件

    一键制作 歌曲伴奏! 可以消人声 吉他 鼓 等 多轨道声音。相当好用。

    知攻善防-应急响应靶机-web2.z04

    知攻善防-应急响应靶机-web2.z04

    NSDocumentError如何解决.md

    NSDocumentError如何解决.md

    白色宽屏风格的大气冲浪运动整站网站模板.rar

    白色宽屏风格的大气冲浪运动整站网站模板.rar

    白色简洁风格的婴儿用品商城网站模板.zip

    白色简洁风格的婴儿用品商城网站模板.zip

    罗兰贝格2023未来营养趋势报告21页

    罗兰贝格2023未来营养趋势报告21页

    html+css 圣诞树代码html

    预览地址:https://blog.csdn.net/qq_42431718/article/details/144749829 html+css 圣诞树代码html

    小学生出题软件v6.3.3.zip

    1-100加减乘除出题生成器

    白色简洁风格的网络实验室CSS模板.zip

    白色简洁风格的网络实验室CSS模板.zip

    白色简洁风格的企业产品展示整站网站源码下载.zip

    白色简洁风格的企业产品展示整站网站源码下载.zip

    etcd服务器性能指标与状态监控数据

    内容概要:《etcd-metrics-latest.txt》文档记录了 etcd(一个分布式键值存储系统)的多个指标数据,包括但不限于集群版本、认证修订版、后端磁盘操作延时分布、租赁管理、键值操作统计、快照保存、网络通信、Go 运行时指标、gRPC 请求处理、操作系统资源使用以及进程资源使用等。这些指标提供了详细的性能监测数据,帮助运维人员和开发人员理解和优化 etcd 集群的运行状态。 适合人群:具有基础计算机科学知识的运维人员或开发人员,尤其是负责维护或开发基于 etcd 技术系统的专业人员。 使用场景及目标:主要用于监控 etcd 集群的健康状况,评估性能瓶颈,辅助故障排查,支持集群的持续优化和技术决策。 其他说明:文档中大量使用了指标和术语,建议读者对 etcd、Go 语言、gRPC 和操作系统基础知识有一定的了解,以便更好地解读文档中的数据。对于不熟悉这些技术的读者来说,可能需要额外查阅相关资料来辅助理解。

    (1866400)java编的计算器程序

    Java编写的计算器程序是一种基于Java编程语言实现的计算工具,常用于教学或个人项目中,以帮助用户执行基本的数学运算。在这个简单的计算器程序中,我们可能会遇到以下几个关键的Java知识点: 1. **基础语法与控制结构**:Java的基础语法包括变量声明、数据类型(如int、double等)、条件语句(if-else)和循环语句(for, while)。在计算器程序中,这些元素用于读取用户输入、判断操作类型以及重复执行某些计算过程。 2. **面向对象编程**:Java是一种面向对象的语言,因此计算器程序可能包含多个类,如Calculator类、Button类(模拟图形界面的按钮)和Display类(显示计算结果)。类之间可能存在继承关系,例如Button类可能继承自一个抽象的UIComponent类。 3. **输入/输出处理**:在命令行计算器中,Java的Scanner类用于获取用户输入,如数字和运算符。在图形用户界面(GUI)计算器中,可能使用事件监听器处理用户的点击事件,获取按钮上的文字信息。 4. **异常处理**:为了确保程序的健壮性,计算器可能包含异常处理代码,比如当

    SystemExit.md

    SystemExit.md

    NavigationGuardError解决办法.md

    NavigationGuardError解决办法.md

Global site tag (gtag.js) - Google Analytics