在做一个ajax web推送功能的时候碰到一个问题。 如果多个用户同时访问List,或者Map中的同一条数据的时候,如果对数据进行插入或者排序,就会出现并发问题。为了避免这个问题查阅了 java.util.concurrent 里面的一些类。发现没有将锁对象公开的实现类。 所以自己就尝试写一个!
实现代码
Lock 接口实现
队列实现
主要在使用 LinkedQueue 就可以使用锁了
基本思路:如果一个线程获得锁,其他线程再获取该锁时,会挂起该线程,并将线程放入一个等待队列。待线程锁释放的时候再去检查等待队列,出队。并激活线程
实现代码
Lock 接口实现
package com.fantasy.framework.util.concurrent; import java.util.Calendar; import java.util.Date; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.fantasy.framework.util.common.DateUtil; public class ClassLock implements Lock { private static final Log logger = LogFactory.getLog(ClassLock.class); private ConditionObject condition = new ConditionObject(); transient Thread owner = null; /** * 获取锁定。 <br/> * 如果锁定不可用,出于线程调度目的,将禁用当前线程,并且在获得锁定之前,该线程将一直处于休眠状态。 */ public void lock() { if (owner == null || owner == Thread.currentThread()) { owner = Thread.currentThread(); } else { try { condition.await(); } catch (InterruptedException e) { logger.debug(e); } owner = Thread.currentThread(); } } /** * 如果当前线程未被中断,则获取锁定。 <br/> * 如果锁定可用,则获取锁定,并立即返回。 <br/> */ public void lockInterruptibly() throws InterruptedException { if (!Thread.currentThread().isInterrupted()) { this.lock(); } } /** * 和接口描述不一样<br/> * */ public Condition newCondition() { return this.condition; } /** * 仅在调用时锁定为空闲状态才获取该锁定。 <br/> * 如果锁定可用,则获取锁定,并立即返回值 true。 <br/> * 如果锁定不可用,则此方法将立即返回值 false。 */ public boolean tryLock() { if (owner == null || owner == Thread.currentThread()) { owner = Thread.currentThread(); return true; } return false; } /** * 如果锁定在给定的等待时间内空闲,并且当前线程未被中断,则获取锁定 */ public boolean tryLock(long time, TimeUnit unit)throws InterruptedException { if (owner == null || owner == Thread.currentThread()) { owner = Thread.currentThread(); return true; } else { if (condition.await(time, unit)) { owner = Thread.currentThread(); return true; } else { return false; } } } /** * 释放锁定 */ public void unlock() { if (this.owner == Thread.currentThread()){ this.owner = null; this.condition.signal(); } } public class ConditionObject implements Condition { private BlockingQueue<Thread> threadQueues = new LinkedBlockingQueue<Thread>(); /** * 造成当前线程在接到信号或被中断之前一直处于等待状态。 */ public void await() throws InterruptedException { if(!this.threadQueues.contains(Thread.currentThread()) && owner != Thread.currentThread()) this.threadQueues.offer(Thread.currentThread()); try { while (true) awaitNanos(TimeUnit.SECONDS.toNanos(60)); } catch (InterruptedException e) { logger.error(e); } } /** * 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。 */ public boolean await(long time, TimeUnit unit)throws InterruptedException { return awaitNanos(unit.toNanos(time)) > 0; } public long awaitNanos(long nanosTimeout) throws InterruptedException { Thread current = Thread.currentThread(); if (Thread.interrupted()) throw new InterruptedException(); if(!this.threadQueues.contains(current) && owner != Thread.currentThread()) this.threadQueues.offer(current); long start = System.currentTimeMillis(); try { TimeUnit.NANOSECONDS.sleep(nanosTimeout); } catch (InterruptedException e) { logger.error(e); } long end = System.currentTimeMillis(); return TimeUnit.MILLISECONDS.toNanos(TimeUnit.NANOSECONDS.toMillis(nanosTimeout) - (end - start)); } public boolean awaitUntil(Date deadline) throws InterruptedException { return awaitNanos(DateUtil.interval(deadline, new Date(),Calendar.MILLISECOND)) <= 0 ? false : true; } /** * 造成当前线程在接到信号之前一直处于等待状态。 */ public void awaitUninterruptibly() { try { while (true) awaitNanos(TimeUnit.SECONDS.toNanos(60)); } catch (InterruptedException e) { logger.debug(e); } } /** * 唤醒一个等待线程。 */ public void signal() { if(owner != null){ owner.interrupt(); }else{ Thread thread = threadQueues.poll(); if (thread != null) { thread.interrupt(); } } } /** * 唤醒所有等待线程。 */ public void signalAll() { throw new RuntimeException("signalAll 方法未实现"); } } }
队列实现
package com.fantasy.framework.util.concurrent; import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; public class LinkedQueue<E> implements BlockingQueue<E> { private static final Log logger = LogFactory.getLog(LinkedQueue.class); private LinkedList<E> items = new LinkedList<E>();//普通队列 protected final ClassLock takeLock = new ClassLock();//取出锁 protected final ClassLock putLock = new ClassLock();//存入锁 public void fullyLock() { putLock.lock(); takeLock.lock(); } public void fullyUnlock() { takeLock.unlock(); putLock.unlock(); } public E element() { return items.element(); } public boolean offer(E o) { try { return items.offer(o); } finally { if (this.takeLock.owner != null) { this.takeLock.newCondition().signal(); } } } public E peek() { return items.peek(); } public E poll() { return items.poll(); } public E remove() { return items.remove(); } public boolean add(E o) { return items.add(o); } public boolean addAll(Collection<? extends E> c) { return items.addAll(c); } public void clear() { items.clear(); } public boolean contains(Object o) { return items.contains(o); } public boolean containsAll(Collection<?> c) { return items.containsAll(c); } public boolean isEmpty() { return items.isEmpty(); } public Iterator<E> iterator() { return items.iterator(); } public boolean remove(Object o) { return items.remove(o); } public boolean removeAll(Collection<?> c) { return items.removeAll(c); } public boolean retainAll(Collection<?> c) { return items.retainAll(c); } public int size() { return items.size(); } public Object[] toArray() { return items.toArray(); } public <T> T[] toArray(T[] a) { return items.toArray(a); } public int drainTo(Collection<? super E> c) { return 0; } public int drainTo(Collection<? super E> c, int maxElements) { return 0; } public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException { try { if (this.putLock.tryLock(timeout, unit)) { this.offer(o); return true; } return false; } finally { this.putLock.unlock(); } } public E poll(long time, TimeUnit unit) throws InterruptedException { try { long start = System.currentTimeMillis(); long timeout = unit.toMillis(time); if (this.takeLock.tryLock(time,unit)) { long end = System.currentTimeMillis(); E e = this.poll(); if(e == null){ this.takeLock.newCondition().await((timeout - (end - start)), TimeUnit.MILLISECONDS); return this.poll(); } return e; } return null; } finally { this.takeLock.unlock(); } } public static void main(String[] args) throws Exception{ final LinkedQueue<String> queue = new LinkedQueue<String>(); (new Thread(new Runnable() { public void run() { final Thread thread = Thread.currentThread(); try { queue.takeLock.lock(); System.out.println("获取takeLock"); Thread.currentThread().sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } queue.takeLock.unlock(); queue.add("123123"); System.out.println("释放takeLock"); } })).start(); long start = System.currentTimeMillis(); queue.poll(10000, TimeUnit.MILLISECONDS); long end = System.currentTimeMillis(); System.out.println("运行时间>"+(end - start)); } public void put(E o){ try { this.putLock.lock(); this.offer(o); } finally { this.putLock.unlock(); } } public int remainingCapacity() { return Integer.MAX_VALUE; } public E take(){ try { this.takeLock.lock(); if (this.size() == 0) { this.takeLock.newCondition().awaitUninterruptibly(); } return this.poll(); } finally { this.takeLock.unlock(); } } public List<E> toList() { return this.items; } }
主要在使用 LinkedQueue 就可以使用锁了
LinkedQueue<Message> queue = new LinkedQueue<Message>(); try{ queue.fullyLock(); //对队列排序或者插入排序的时候 锁定 takeLock 和 putLock }finally{ queue.fullyUnlock(); }
基本思路:如果一个线程获得锁,其他线程再获取该锁时,会挂起该线程,并将线程放入一个等待队列。待线程锁释放的时候再去检查等待队列,出队。并激活线程
相关推荐
在Java中,有两种主要的锁机制:synchronized和Lock。它们都是用来实现线程同步,防止数据竞争,确保并发环境下的数据一致性。 首先,synchronized是Java的关键字,由JVM直接支持,其底层实现依赖于操作系统原语,...
本文将深入探讨JavaLock中的ReentrantLock(可重入锁)以及与其紧密相关的Condition接口,帮助你理解它们的工作原理和应用场景。 **一、ReentrantLock可重入锁** ReentrantLock是Java.util.concurrent.locks包下的...
Java 中的 Lock 和 Synchronized 的区别 Java 语言中有很多相似关键字或相似意义的字,但 lock 和 synchronized 是两个最容易混淆的关键字。它们都是锁的意思,都是为了线程安全性、应用合理性和运行效率的。下面...
Lock是一个接口,而synchronized是Java中的关键字,synchronized是在JVM层面上实现的,不但可以通过一些监控工具监控synchronized的锁定,而且在代码执行时出现异常,JVM会自动释放锁定,但是使用Lock则不行,lock是...
"Java Lock锁多线程中实现流水线任务" Java Lock锁多线程中实现流水线任务是Java多线程编程中非常重要的一个主题。在多线程编程中,如何保证线程安全和避免死锁是两个主要的问题。Java提供了多种方式来解决这些问题...
Java Lock接口是Java并发编程中一个重要的组成部分,它提供了一种更为灵活的锁机制,相比传统的`synchronized`关键字,Lock接口允许...理解并熟练使用Lock接口及其实现,可以有效地提高Java并发程序的性能和可维护性。
Lock接口及其实现类ReentrantLock是Java并发编程中的重要工具,它们提供了比synchronized关键字更灵活的线程同步控制。通过使用Lock接口,我们可以更精细地控制线程的同步行为,从而编写出更高效、更可靠的并发程序...
Java synchronized关键字和Lock接口实现原理 Java 中的 synchronized 关键字和 Lock 接口是两种常用的线程同步机制,它们都可以用来解决并发问题。下面我们将详细介绍 synchronized 关键字和 Lock 接口的实现原理。...
JAVA提供了synchronized关键字和Lock接口来实现线程安全的操作,防止多用户同时选座导致的数据冲突。 支付订单通常会用到第三方支付API的集成,如支付宝或微信支付。这涉及到网络通信和JSON数据交换,JAVA的...
基于令牌桶算法的Java限流实现 在软件系统中,限流机制是一个重要的环节,它可以防止系统资源被过度使用,避免系统崩溃或性能下降。常见的限流算法有多种,如漏桶算法、令牌桶算法、滑动窗口算法等。在Java中,我们...
为了确保线程安全和有效率的数据交换,Java提供了多种同步机制,其中包括Lock接口及其实现,如ReentrantLock。本项目通过Lock机制实现了生产者-消费者的解决方案。 Lock接口是Java并发库(java.util.concurrent....
在Java中,Lock机制主要有三种实现方式: 1. 同步代码块 synchronized 隐式锁:使用synchronized关键字修饰的代码块,会自动加锁和释放锁。 2. 同步方法 synchronized 隐式锁:使用synchronized关键字修饰的方法,...
Java提供了synchronized关键字和Lock接口,可以用来实现线程安全的访问。例如,每个节点可以是一个线程安全的对象,或者整个树可以使用读写锁来控制并发。 在PersistentIdealHashTreeTest.java测试类中,常见的测试...
在这个项目中,我们将深入探讨如何利用Java技术实现一个功能完备、高效的聊天系统。 首先,我们从基础开始,Java是一种面向对象的编程语言,它的跨平台特性使得开发聊天系统变得更为便捷。Java的基础类库提供了丰富...
Java并发机制的底层实现原理涉及到多个方面,包括了本地内存与线程安全的问题、volatile关键字的使用、synchronized关键字的原理以及Java并发在处理器层面是如何实现的。通过这些机制,Java能够有效地管理多线程环境...
- **同步机制**:为了避免数据冲突,可以使用`synchronized`关键字或者`Lock`接口的实现(如`ReentrantLock`)进行同步控制。 - **进度更新**:每个线程在传输过程中可以更新进度,主线程通过监听这些更新来显示...
在Java编程中,高并发是系统设计中的一...总之,实现Java高并发隔离是提升系统性能和稳定性的重要手段,它涵盖了多种并发控制策略和技术。通过深入理解和实践这些概念,开发者可以设计出更高效、更健壮的并发应用程序。