`
limaofeng
  • 浏览: 878 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

自己实现的java lock

    博客分类:
  • java
阅读更多
    在做一个ajax web推送功能的时候碰到一个问题。 如果多个用户同时访问List,或者Map中的同一条数据的时候,如果对数据进行插入或者排序,就会出现并发问题。为了避免这个问题查阅了 java.util.concurrent 里面的一些类。发现没有将锁对象公开的实现类。 所以自己就尝试写一个!

实现代码

Lock  接口实现
package com.fantasy.framework.util.concurrent;

import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.fantasy.framework.util.common.DateUtil;

public class ClassLock implements Lock {

	private static final Log logger = LogFactory.getLog(ClassLock.class);

	private ConditionObject condition = new ConditionObject();
	transient Thread owner = null;

	/**
	 * 获取锁定。 <br/>
	 * 如果锁定不可用,出于线程调度目的,将禁用当前线程,并且在获得锁定之前,该线程将一直处于休眠状态。
	 */
	public void lock() {
		if (owner == null || owner == Thread.currentThread()) {
			owner = Thread.currentThread();
		} else {
			try {
				condition.await();
			} catch (InterruptedException e) {
				logger.debug(e);
			}
			owner = Thread.currentThread();
		}
	}

	/**
	 * 如果当前线程未被中断,则获取锁定。 <br/>
	 * 如果锁定可用,则获取锁定,并立即返回。 <br/>
	 */
	public void lockInterruptibly() throws InterruptedException {
		if (!Thread.currentThread().isInterrupted()) {
			this.lock();
		}
	}
	
	/**
	 * 和接口描述不一样<br/>
	 * 
	 */
	public Condition newCondition() {
		return this.condition;
	}

	/**
	 * 仅在调用时锁定为空闲状态才获取该锁定。 <br/>
	 * 如果锁定可用,则获取锁定,并立即返回值 true。 <br/>
	 * 如果锁定不可用,则此方法将立即返回值 false。
	 */
	public boolean tryLock() {
		if (owner == null || owner == Thread.currentThread()) {
			owner = Thread.currentThread();
			return true;
		}
		return false;
	}

	/**
	 * 如果锁定在给定的等待时间内空闲,并且当前线程未被中断,则获取锁定
	 */
	public boolean tryLock(long time, TimeUnit unit)throws InterruptedException {
		if (owner == null || owner == Thread.currentThread()) {
			owner = Thread.currentThread();
			return true;
		} else {
			if (condition.await(time, unit)) {
				owner = Thread.currentThread();
				return true;
			} else {
				return false;
			}
		}
	}

	/**
	 * 释放锁定
	 */
	public void unlock() {
		if (this.owner == Thread.currentThread()){
			this.owner = null;
			this.condition.signal();
		}
	}

	public class ConditionObject implements Condition {
		private BlockingQueue<Thread> threadQueues = new LinkedBlockingQueue<Thread>();

		/**
		 * 造成当前线程在接到信号或被中断之前一直处于等待状态。
		 */
		public void await() throws InterruptedException {
			if(!this.threadQueues.contains(Thread.currentThread()) && owner != Thread.currentThread())
				this.threadQueues.offer(Thread.currentThread());
			try {
				while (true)
					awaitNanos(TimeUnit.SECONDS.toNanos(60));
			} catch (InterruptedException e) {
				logger.error(e);
			}
		}

		/**
		 * 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
		 */
		public boolean await(long time, TimeUnit unit)throws InterruptedException {
			return awaitNanos(unit.toNanos(time)) > 0;
		}
		
		public long awaitNanos(long nanosTimeout) throws InterruptedException {
			Thread current = Thread.currentThread();
			if (Thread.interrupted())
				throw new InterruptedException();
			if(!this.threadQueues.contains(current) && owner != Thread.currentThread())
				this.threadQueues.offer(current);
			long start = System.currentTimeMillis();
			try {
				TimeUnit.NANOSECONDS.sleep(nanosTimeout);
			} catch (InterruptedException e) {
				logger.error(e);
			}
			long end = System.currentTimeMillis();
			return TimeUnit.MILLISECONDS.toNanos(TimeUnit.NANOSECONDS.toMillis(nanosTimeout) - (end - start));
		}

		public boolean awaitUntil(Date deadline) throws InterruptedException {
			return awaitNanos(DateUtil.interval(deadline, new Date(),Calendar.MILLISECOND)) <= 0 ? false : true;
		}

		/**
		 * 造成当前线程在接到信号之前一直处于等待状态。
		 */
		public void awaitUninterruptibly() {
			try {
				while (true)
					awaitNanos(TimeUnit.SECONDS.toNanos(60));
			} catch (InterruptedException e) {
				logger.debug(e);
			}
		}

		/**
		 * 唤醒一个等待线程。
		 */
		public void signal() {
			if(owner != null){
				owner.interrupt();
			}else{
				Thread thread = threadQueues.poll();
				if (thread != null) {
					thread.interrupt();
				}
			}
		}

		/**
		 * 唤醒所有等待线程。
		 */
		public void signalAll() {
			throw new RuntimeException("signalAll 方法未实现");
		}

	}

}



队列实现
package com.fantasy.framework.util.concurrent;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class LinkedQueue<E> implements BlockingQueue<E> {

	private static final Log logger = LogFactory.getLog(LinkedQueue.class);
	private LinkedList<E> items = new LinkedList<E>();//普通队列
	protected final ClassLock takeLock = new ClassLock();//取出锁
	protected final ClassLock putLock = new ClassLock();//存入锁

	public void fullyLock() {
		putLock.lock();
		takeLock.lock();
	}

	public void fullyUnlock() {
		takeLock.unlock();
		putLock.unlock();
	}

	public E element() {
		return items.element();
	}

	public boolean offer(E o) {
		try {
			return items.offer(o);
		} finally {
			if (this.takeLock.owner != null) {
				this.takeLock.newCondition().signal();
			}
		}
	}
	
	public E peek() {
		return items.peek();
	}

	public E poll() {
		return items.poll();
	}
	
	public E remove() {
		return items.remove();
	}

	public boolean add(E o) {
		return items.add(o);
	}

	public boolean addAll(Collection<? extends E> c) {
		return items.addAll(c);
	}
	
	public void clear() {
		items.clear();
	}

	public boolean contains(Object o) {
		return items.contains(o);
	}

	public boolean containsAll(Collection<?> c) {
		return items.containsAll(c);
	}

	public boolean isEmpty() {
		return items.isEmpty();
	}

	public Iterator<E> iterator() {
		return items.iterator();
	}

	public boolean remove(Object o) {
		return items.remove(o);
	}

	public boolean removeAll(Collection<?> c) {
		return items.removeAll(c);
	}

	public boolean retainAll(Collection<?> c) {
		return items.retainAll(c);
	}

	public int size() {
		return items.size();
	}

	public Object[] toArray() {
		return items.toArray();
	}

	public <T> T[] toArray(T[] a) {
		return items.toArray(a);
	}

	public int drainTo(Collection<? super E> c) {
		return 0;
	}

	public int drainTo(Collection<? super E> c, int maxElements) {
		return 0;
	}

	public boolean offer(E o, long timeout, TimeUnit unit)
			throws InterruptedException {
		try {
			if (this.putLock.tryLock(timeout, unit)) {
				this.offer(o);
				return true;
			}
			return false;
		} finally {
			this.putLock.unlock();
		}
	}

	public E poll(long time, TimeUnit unit) throws InterruptedException {
		try {
			long start = System.currentTimeMillis();
			long timeout = unit.toMillis(time);
			if (this.takeLock.tryLock(time,unit)) {
				long end = System.currentTimeMillis();
				E e = this.poll();
				if(e == null){
					this.takeLock.newCondition().await((timeout - (end - start)), TimeUnit.MILLISECONDS);
					return this.poll();
				}
				return e;
			}
			return null;
		} finally {
			this.takeLock.unlock();
		}
	}

	public static void main(String[] args) throws Exception{
		final LinkedQueue<String> queue = new LinkedQueue<String>();
		(new Thread(new Runnable() {

			public void run() {
				final Thread thread = Thread.currentThread();
				try {					
					queue.takeLock.lock();
					System.out.println("获取takeLock");
					Thread.currentThread().sleep(5000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}				
				queue.takeLock.unlock();
				queue.add("123123");
				System.out.println("释放takeLock");
			}

		})).start();
		long start = System.currentTimeMillis();
		queue.poll(10000, TimeUnit.MILLISECONDS);
		long end = System.currentTimeMillis();
		System.out.println("运行时间>"+(end - start));
	}
	
	public void put(E o){
		try {
			this.putLock.lock();
			this.offer(o);
		} finally {
			this.putLock.unlock();
		}
	}

	public int remainingCapacity() {
		return Integer.MAX_VALUE;
	}

	public E take(){
		try {
			this.takeLock.lock();
			if (this.size() == 0) {
				this.takeLock.newCondition().awaitUninterruptibly();
			}
			return this.poll();
		} finally {
			this.takeLock.unlock();
		}
	}

	public List<E> toList() {
		return this.items;
	}

}



主要在使用 LinkedQueue 就可以使用锁了
LinkedQueue<Message> queue = new LinkedQueue<Message>();
		try{
			queue.fullyLock();
			//对队列排序或者插入排序的时候 锁定 takeLock 和   putLock
		}finally{
			queue.fullyUnlock();
		}



基本思路:如果一个线程获得锁,其他线程再获取该锁时,会挂起该线程,并将线程放入一个等待队列。待线程锁释放的时候再去检查等待队列,出队。并激活线程
分享到:
评论

相关推荐

    java的Lock锁原理详解.docx

    在Java中,有两种主要的锁机制:synchronized和Lock。它们都是用来实现线程同步,防止数据竞争,确保并发环境下的数据一致性。 首先,synchronized是Java的关键字,由JVM直接支持,其底层实现依赖于操作系统原语,...

    JavaLock与Condition的理解Reentran

    本文将深入探讨JavaLock中的ReentrantLock(可重入锁)以及与其紧密相关的Condition接口,帮助你理解它们的工作原理和应用场景。 **一、ReentrantLock可重入锁** ReentrantLock是Java.util.concurrent.locks包下的...

    java的lock和synchronized的区别.docx

    Java 中的 Lock 和 Synchronized 的区别 Java 语言中有很多相似关键字或相似意义的字,但 lock 和 synchronized 是两个最容易混淆的关键字。它们都是锁的意思,都是为了线程安全性、应用合理性和运行效率的。下面...

    Java lock同步锁使用实例解析

    Lock是一个接口,而synchronized是Java中的关键字,synchronized是在JVM层面上实现的,不但可以通过一些监控工具监控synchronized的锁定,而且在代码执行时出现异常,JVM会自动释放锁定,但是使用Lock则不行,lock是...

    Java Lock锁多线程中实现流水线任务

    "Java Lock锁多线程中实现流水线任务" Java Lock锁多线程中实现流水线任务是Java多线程编程中非常重要的一个主题。在多线程编程中,如何保证线程安全和避免死锁是两个主要的问题。Java提供了多种方式来解决这些问题...

    java Lock接口详解及实例代码

    Java Lock接口是Java并发编程中一个重要的组成部分,它提供了一种更为灵活的锁机制,相比传统的`synchronized`关键字,Lock接口允许...理解并熟练使用Lock接口及其实现,可以有效地提高Java并发程序的性能和可维护性。

    Java中的Lock接口及其实现:深度解析与代码示例

    Lock接口及其实现类ReentrantLock是Java并发编程中的重要工具,它们提供了比synchronized关键字更灵活的线程同步控制。通过使用Lock接口,我们可以更精细地控制线程的同步行为,从而编写出更高效、更可靠的并发程序...

    Java synchronized关键字和Lock接口实现原理

    Java synchronized关键字和Lock接口实现原理 Java 中的 synchronized 关键字和 Lock 接口是两种常用的线程同步机制,它们都可以用来解决并发问题。下面我们将详细介绍 synchronized 关键字和 Lock 接口的实现原理。...

    铁路订票系统 JAVA实现

    JAVA提供了synchronized关键字和Lock接口来实现线程安全的操作,防止多用户同时选座导致的数据冲突。 支付订单通常会用到第三方支付API的集成,如支付宝或微信支付。这涉及到网络通信和JSON数据交换,JAVA的...

    基于令牌桶算法的Java限流实现

    基于令牌桶算法的Java限流实现 在软件系统中,限流机制是一个重要的环节,它可以防止系统资源被过度使用,避免系统崩溃或性能下降。常见的限流算法有多种,如漏桶算法、令牌桶算法、滑动窗口算法等。在Java中,我们...

    生产者消费者Java—LOCK机制

    为了确保线程安全和有效率的数据交换,Java提供了多种同步机制,其中包括Lock接口及其实现,如ReentrantLock。本项目通过Lock机制实现了生产者-消费者的解决方案。 Lock接口是Java并发库(java.util.concurrent....

    Java锁机制Lock用法示例

    在Java中,Lock机制主要有三种实现方式: 1. 同步代码块 synchronized 隐式锁:使用synchronized关键字修饰的代码块,会自动加锁和释放锁。 2. 同步方法 synchronized 隐式锁:使用synchronized关键字修饰的方法,...

    PersistentIdealHashTree-Java实现

    Java提供了synchronized关键字和Lock接口,可以用来实现线程安全的访问。例如,每个节点可以是一个线程安全的对象,或者整个树可以使用读写锁来控制并发。 在PersistentIdealHashTreeTest.java测试类中,常见的测试...

    基于java聊天系统的设计与实现

    在这个项目中,我们将深入探讨如何利用Java技术实现一个功能完备、高效的聊天系统。 首先,我们从基础开始,Java是一种面向对象的编程语言,它的跨平台特性使得开发聊天系统变得更为便捷。Java的基础类库提供了丰富...

    Java并发机制的底层实现原理.pdf

    Java并发机制的底层实现原理涉及到多个方面,包括了本地内存与线程安全的问题、volatile关键字的使用、synchronized关键字的原理以及Java并发在处理器层面是如何实现的。通过这些机制,Java能够有效地管理多线程环境...

    java实现多线程文件传输

    - **同步机制**:为了避免数据冲突,可以使用`synchronized`关键字或者`Lock`接口的实现(如`ReentrantLock`)进行同步控制。 - **进度更新**:每个线程在传输过程中可以更新进度,主线程通过监听这些更新来显示...

    实现Java高并发隔离 模拟

    在Java编程中,高并发是系统设计中的一...总之,实现Java高并发隔离是提升系统性能和稳定性的重要手段,它涵盖了多种并发控制策略和技术。通过深入理解和实践这些概念,开发者可以设计出更高效、更健壮的并发应用程序。

Global site tag (gtag.js) - Google Analytics