`
cfyme
  • 浏览: 275072 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

linkedBlokingQueue源码学习

 
阅读更多

ReentrantLock是一个较为常用的锁对象 可重入的互斥锁定 与synchronized 相同的一些基本行为和语义,但功能更强大

 

Condition类与lock绑定,用newCondition()方法创建,提供了线程之间通信的方式(类似信号量)。

其使用基本与object类的wait,notify,notifyAll相同。

 

1,用condition.await()替换Object,wait(),调用时该线程阻塞,释放该线程的锁。

2,用condition.signal()替换Object.notify(),用condition.signalAll()替换Object.notifyAll(),唤醒该condition await方法所阻塞的线程

 

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { 
      //链表节点node类结构   
      static class Node<E> {  
          volatile E item;//volatile,保证了数据的可见性   
          Node<E> next;  
          Node(E x) { item = x; }  
      }  
      //容量
      private final int capacity;  
      //用原子变量,当前元素个数  
      private final AtomicInteger count = new AtomicInteger(0);  
      //头节点
      private transient Node<E> head;  
      //表尾节点 
      private transient Node<E> last;  
      //获取元素或删除元素时,要加的takeLock锁  
      private final ReentrantLock takeLock = new ReentrantLock();  
      //获取元素时若队列为空,线程阻塞,直至notEmpty条件满足(被通知) 
      private final Condition notEmpty = takeLock.newCondition();  
      //插入元素时 要加putLock锁  
      private final ReentrantLock putLock = new ReentrantLock();  
      //插入时,若队列已满,线程阻塞,直至notFull条件满足(被通知)
      private final Condition notFull = putLock.newCondition();  
      // 唤醒等待的take操作,插入数据时若插入前链表中无数据,则调用,表示链表不再为空
      private void signalNotEmpty() {  
          final ReentrantLock takeLock = this.takeLock;  
          takeLock.lock();  
          try {  
              notEmpty.signal();  
          } finally {  
              takeLock.unlock();  
          }  
      }  
      //唤醒等待插入操作,移除数据时若链表原先已满则调用,表示链表不再满 
      private void signalNotFull() {  
          final ReentrantLock putLock = this.putLock;  
          putLock.lock();  
          try {  
              notFull.signal();  
          } finally {  
              putLock.unlock();  
          }  
      }  
      // 插入到链表尾部 
      private void insert(E x) {  
          last = last.next = new Node<E>(x);  
      }  
      //获取并移除头元素 
      private E extract() {  
          Node<E> first = head.next;  
          head = first;  
          E x = first.item;  
          first.item = null;  
          return x;  
      }  
      //锁住两把锁,在remove,clear等方法中调用   
      private void fullyLock() {  
          putLock.lock();  
          takeLock.lock();  
      }  
      //和fullyLock成对使用 
      private void fullyUnlock() {  
          takeLock.unlock();  
          putLock.unlock();  
      }  
      //默认构造,容量为 Integer.MAX_VALUE  
 
      public LinkedBlockingQueue() {  
          this(Integer.MAX_VALUE);  
      }  
      //指定容量的构造   
      public LinkedBlockingQueue(int capacity) {  
          if (capacity <= 0) throw new IllegalArgumentException();  
          this.capacity = capacity;  
          last = head = new Node<E>(null);  
      }  
      //指定初始化集合的构造   
      public LinkedBlockingQueue(Collection<? extends E> c) {  
          this(Integer.MAX_VALUE);  
          for (E e : c)  
              add(e);  
      }  
      //获得大小 
        
      public int size() {  
          return count.get();  
      }  
      //剩余容量  
      public int remainingCapacity() {  
          return capacity - count.get();  
      }  
      // 将指定元素插入到此队列的尾部,如已满,阻塞至队列中有元素被移除 
      public void put(E e) throws InterruptedException {  
          if (e == null) throw new NullPointerException();  
          int c = -1;  
          final ReentrantLock putLock = this.putLock;  
          final AtomicInteger count = this.count;
   //加put锁,多个线程不能同时进入  
          putLock.lockInterruptibly();  
          try {  
              try {  
    //容量已满,则一直阻塞
                  while (count.get() == capacity)  
                      notFull.await();  
              } catch (InterruptedException ie) {  
                  notFull.signal(); // propagate to a non-interrupted thread  
                  throw ie;  
              }  
//插入
              insert(e);  
              c = count.getAndIncrement();
//通知链表未满  
              if (c + 1 < capacity)  
                  notFull.signal();  
          } finally {  
//解锁,注意必须在finally里调用,反正各种异常导致没有unlock使线程死锁
              putLock.unlock();  
          }  
    //通知链表非空
          if (c == 0)  
              signalNotEmpty();  
      }  
      // 将指定元素插入到此队列的尾部,如有必要,则等待一定时间以使空间变得可用。 
       
      public boolean offer(E e, long timeout, TimeUnit unit)  
          throws InterruptedException {  
          if (e == null) throw new NullPointerException();  
          long nanos = unit.toNanos(timeout);  
          int c = -1;  
          final ReentrantLock putLock = this.putLock;  
          final AtomicInteger count = this.count;  
   //加锁
          putLock.lockInterruptibly();  
          try {  
              for (;;) {
                  //未满可插入  
                  if (count.get() < capacity) {  
                      insert(e);  
                      c = count.getAndIncrement();
    //通知未满  
                      if (c + 1 < capacity)  
                          notFull.signal();
    //跳出循环  
                      break;  
                  }  
   //队列已满,未能插入,等待时间是负的,直接返回
                  if (nanos <= 0)  
                      return false;  
                  try {  
    //等待一定时间后再次尝试
                      nanos = notFull.awaitNanos(nanos);  
                  } catch (InterruptedException ie) {  
                      notFull.signal(); // propagate to a non-interrupted thread  
                      throw ie;  
                  }  
              }  
          } finally {  
//解锁
              putLock.unlock();  
          }  
//通知已插入数据,链表非空
          if (c == 0)  
              signalNotEmpty();  
          return true;  
      }  
      //将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量), 
       在成功时返回 true,如果此队列已满,则返回 false。 
        
      public boolean offer(E e) {  
          if (e == null) throw new NullPointerException();  
          final AtomicInteger count = this.count;  
          if (count.get() == capacity)  
              return false;  
          int c = -1;  
          final ReentrantLock putLock = this.putLock;  
          putLock.lock();  
          try {  
//由于可能在lock被阻塞时其他线程进行了插入操作,需再次判断count
              if (count.get() < capacity) {  
                  insert(e);  
                  c = count.getAndIncrement();
    //通知未满  
                  if (c + 1 < capacity)  
                      notFull.signal();  
              }  
          } finally {  
              putLock.unlock();  
          }  
  //通知非空
          if (c == 0)  
              signalNotEmpty();
          // >0表示已成功插入  
          return c >= 0;  
      }  
      //获取并移除此队列的头部,若队列为空,则阻塞。  
      public E take() throws InterruptedException {  
          E x;  
          int c = -1;  
          final AtomicInteger count = this.count;  
          final ReentrantLock takeLock = this.takeLock;
   //加锁 
          takeLock.lockInterruptibly();  
          try {  
              try {
   //队列为空时阻塞 
                  while (count.get() == 0)  
                      notEmpty.await();  
              } catch (InterruptedException ie) {  
                  notEmpty.signal(); // propagate to a non-interrupted thread  
                  throw ie;  
              }  
//获取数据
              x = extract();  
              c = count.getAndDecrement();
//通知非空  
              if (c > 1)  
                  notEmpty.signal();  
          } finally {  
              takeLock.unlock();  
          }  
    //通知未满
          if (c == capacity)  
              signalNotFull();  
          return x;  
      }  
        
      //与offer方法结构基本一致,若队列为空,则阻塞一段时间,一段时间后仍为空,则返回null
      public E poll(long timeout, TimeUnit unit) throws InterruptedException {  
          E x = null;  
          int c = -1;  
          long nanos = unit.toNanos(timeout);  
          final AtomicInteger count = this.count;  
          final ReentrantLock takeLock = this.takeLock;  
          takeLock.lockInterruptibly();  
          try {  
              for (;;) {  
                  if (count.get() > 0) {  
                      x = extract();  
                      c = count.getAndDecrement();  
                      if (c > 1)  
                          notEmpty.signal();  
                      break;  
                  }  
                  if (nanos <= 0)  
                      return null;  
                  try {  
                      nanos = notEmpty.awaitNanos(nanos);  
                  } catch (InterruptedException ie) {  
                      notEmpty.signal(); // propagate to a non-interrupted thread  
                      throw ie;  
                  }  
              }  
          } finally {  
              takeLock.unlock();  
          }  
          if (c == capacity)  
              signalNotFull();  
          return x;  
      }  
        
      ////与offer方法结构基本一致 队列为空,不阻塞,直接返回null
      public E poll() {  
          final AtomicInteger count = this.count;  
          if (count.get() == 0)  
              return null;  
          E x = null;  
          int c = -1;  
          final ReentrantLock takeLock = this.takeLock;  
          takeLock.lock();  
          try {  
              if (count.get() > 0) {  
                  x = extract();  
                  c = count.getAndDecrement();  
                  if (c > 1)  
                      notEmpty.signal();  
              }  
          } finally {  
              takeLock.unlock();  
          }  
          if (c == capacity)  
              signalNotFull();  
          return x;  
      }  
      //获取但不移除此队列的头;如果此队列为空,则返回 null。  
      public E peek() {  
          if (count.get() == 0)  
              return null;  
          final ReentrantLock takeLock = this.takeLock;  
          takeLock.lock();  
          try {  
              Node<E> first = head.next;  
              if (first == null)  
                  return null;  
              else  
                  return first.item;  
          } finally {  
              takeLock.unlock();  
          }  
      }  
      /** 
       * 从此队列移除指定元素的单个实例(如果存在)。 
       */  
      public boolean remove(Object o) {  
          if (o == null) return false;  
          boolean removed = false;
   //同时加锁,此时其他线程不能插入,不能移除
          fullyLock();  
          try {  
              Node<E> trail = head;  
              Node<E> p = head.next;
//遍历,获取到该元素  
              while (p != null) {  
                  if (o.equals(p.item)) {  
                      removed = true;  
                      break;  
                  }  
                  trail = p;  
                  p = p.next;  
              }  
//删除该元素
              if (removed) {  
                  p.item = null;  
                  trail.next = p.next;  
                  if (last == p)  
                      last = trail;  
                  if (count.getAndDecrement() == capacity)  
                      notFull.signalAll();  
              }  
          } finally {  
              fullyUnlock();  
          }  
          return removed;  
      }  
      ……  
  }

 

分享到:
评论

相关推荐

    白色卡通风格响应式游戏应用商店企业网站模板.zip

    白色卡通风格响应式游戏应用商店企业网站模板.zip

    48页-智慧工地监管平台解决方案.pdf

    智慧工地,作为现代建筑施工管理的创新模式,以“智慧工地云平台”为核心,整合施工现场的“人机料法环”关键要素,实现了业务系统的协同共享,为施工企业提供了标准化、精益化的工程管理方案,同时也为政府监管提供了数据分析及决策支持。这一解决方案依托云网一体化产品及物联网资源,通过集成公司业务优势,面向政府监管部门和建筑施工企业,自主研发并整合加载了多种工地行业应用。这些应用不仅全面连接了施工现场的人员、机械、车辆和物料,实现了数据的智能采集、定位、监测、控制、分析及管理,还打造了物联网终端、网络层、平台层、应用层等全方位的安全能力,确保了整个系统的可靠、可用、可控和保密。 在整体解决方案中,智慧工地提供了政府监管级、建筑企业级和施工现场级三类解决方案。政府监管级解决方案以一体化监管平台为核心,通过GIS地图展示辖区内工程项目、人员、设备信息,实现了施工现场安全状况和参建各方行为的实时监控和事前预防。建筑企业级解决方案则通过综合管理平台,提供项目管理、进度管控、劳务实名制等一站式服务,帮助企业实现工程管理的标准化和精益化。施工现场级解决方案则以可视化平台为基础,集成多个业务应用子系统,借助物联网应用终端,实现了施工信息化、管理智能化、监测自动化和决策可视化。这些解决方案的应用,不仅提高了施工效率和工程质量,还降低了安全风险,为建筑行业的可持续发展提供了有力支持。 值得一提的是,智慧工地的应用系统还围绕着工地“人、机、材、环”四个重要因素,提供了各类信息化应用系统。这些系统通过配置同步用户的组织结构、智能权限,结合各类子系统应用,实现了信息的有效触达、问题的及时跟进和工地的有序管理。此外,智慧工地还结合了虚拟现实(VR)和建筑信息模型(BIM)等先进技术,为施工人员提供了更为直观、生动的培训和管理工具。这些创新技术的应用,不仅提升了施工人员的技能水平和安全意识,还为建筑行业的数字化转型和智能化升级注入了新的活力。总的来说,智慧工地解决方案以其创新性、实用性和高效性,正在逐步改变建筑施工行业的传统管理模式,引领着建筑行业向更加智能化、高效化和可持续化的方向发展。

    基于卷积神经网络的AV1视频编码环路滤波技术

    内容概要:本文提出了一种基于卷积神经网络(CNN)的AV1视频编码环路滤波方法。该方法利用深度可变的简单网络结构SimNet,针对不同量化参数(QP)调整网络深度,从而提高编码效率和视觉质量。同时,作者提出了一种适用于INTER编码的跳过增强策略,以避免重复增强导致的图像质量下降。实验结果表明,该方法在INTRA和INTER编码模式下分别实现了平均7.27%和5.57%的BD-rate降低,且在编码时间上优于AV1基准。 适合人群:视频编码研究人员、AI开发者、多媒体技术专家。 使用场景及目标:适用于提升视频压缩编码的效率和视觉质量,特别是对于AV1视频编码标准的应用。 其他说明:该方法不仅提高了编码效率和视觉质量,还降低了计算复杂度。

    白色简洁风格的商业投资组合网站HTML5模板.zip

    白色简洁风格的商业投资组合网站HTML5模板.zip

    在线式缠绕膜机自动覆膜缠绕机sw16全套技术资料100%好用.zip

    在线式缠绕膜机自动覆膜缠绕机sw16全套技术资料100%好用.zip

    (176109030)基于ESO的永磁同步电机无感FOC1.采用线性扩张状态观测器(LESO)估计电机反电势,利用锁相环从反电势中提取位置和转速信息

    基于ESO的永磁同步电机无感FOC 1.采用线性扩张状态观测器(LESO)估计电机反电势,利用锁相环从反电势中提取位置和转速信息,从而实现无位置传感器控制; 2.提供算法对应的参考文献和仿真模型。 购买赠送PMSM控制相关电子资料。 仿真模型纯手工搭建,不是从网络上复制得到。 仿真模型仅供学习参考。内容来源于网络分享,如有侵权请联系我删除。另外如果没有积分的同学需要下载,请私信我。

    白色简洁风格的酒店展示及预订企业网站源码下载.zip

    白色简洁风格的酒店展示及预订企业网站源码下载.zip

    白色简洁风格的享受旅行导航指南整站网站源码下载.zip

    白色简洁风格的享受旅行导航指南整站网站源码下载.zip

    白色简洁风格的照片浏览切换源码下载.rar

    白色简洁风格的照片浏览切换源码下载.rar

    白色简洁风格的女装商城整站网站源码下载.zip

    白色简洁风格的女装商城整站网站源码下载.zip

    EnvironmentError.md

    EnvironmentError.md

    白色简洁风格的举重锻炼健身企业网站模板.zip

    白色简洁风格的举重锻炼健身企业网站模板.zip

    html+css+js网页设计 美食 家美食1个页面

    预览地址:https://blog.csdn.net/qq_42431718/article/details/144758683 html+css+js网页设计 美食 家美食1个页面

    白色简洁风格的室内家具展示企业网站源码下载.zip

    白色简洁风格的室内家具展示企业网站源码下载.zip

    白色简洁风格的婚礼活动展示信息源码下载.zip

    白色简洁风格的婚礼活动展示信息源码下载.zip

    Python绘制圣诞树:文本和图形实现

    内容概要:本文详细介绍了使用Python绘制圣诞树的方法,包括简单的文本圣诞树、使用Turtle库绘制复杂图形圣诞树,以及添加装饰和动态效果的高级应用。文章通过代码示例和详细的解释,逐步展示了如何实现不同风格的圣诞树。 适合人群:Python初学者,特别是对图形编程感兴趣的读者。 使用场景及目标:① 学习如何使用Python的字符串操作和循环结构绘制文本圣诞树;② 掌握Turtle库的基本用法,绘制复杂的图形圣诞树;③ 添加装饰和动态效果,提升节日氛围。 阅读建议:建议读者跟随代码示例进行实践,结合注释和解释,更好地理解和掌握每个步骤的实现方法。

    白色简洁风格的宴席精致餐饮整站网站源码下载.zip

    白色简洁风格的宴席精致餐饮整站网站源码下载.zip

    基于STM32f103的红外测温仪程序(测温模块MLX90614,芯片GY906)

    采用STM32F103系列作为主控,利用MLX90614采集温度,配以OLED显示以及flash存储数据

    html渲染器,粘贴html代码到这个渲染器即可渲染出对应的效果

    html渲染器,粘贴html代码到这个渲染器即可渲染出对应的效果

    在线教育平台:课程管理与学习分析

    随着学业负担的日益加重,越来越多的学生选择通过家教、自学或参加补习班来加强课外学习。然而,家教费用高昂,自学效率低下且难以及时解决疑难问题,而补习班则受限于时间和地点,灵活性不足。此外,国家政策也不鼓励校外补习。鉴于网络技术的成熟和各类在线平台的兴起,开发一个专业的在线辅助学习网站对于辅助学生的课外学习显得尤为重要。 本在线教育系统基于Vue.js构建,采用B/S架构设计,后端语言为Java,数据库使用MySQL。通过整合Vue.js技术,系统界面更加丰富和友好。系统主要面向课程购买用户,涉及的角色包括管理员、学生和教师。学生可以注册登录后浏览课程视频、收藏课程、留言并购买课程,同时实现订单管理。管理员负责管理学生信息、课程信息、发布班级和管理章节等。教师则可以管理课程订单、课程内容和章节。该系统允许学生利用碎片时间自主学习,具有很高的灵活性,对于难以理解的课程可以反复学习并在线提问,极大地促进了学生的学习。

Global site tag (gtag.js) - Google Analytics