1. 各种同步控制工具的使用
1.1 ReentrantLock
package test; import java.util.concurrent.locks.ReentrantLock; public class Test implements Runnable { public static ReentrantLock lock = new ReentrantLock(); public static int i = 0; @Override public void run() { for (int j = 0; j < 10000000; j++) { lock.lock(); try { i++; } finally { lock.unlock(); } } } public static void main(String[] args) throws InterruptedException { Test test = new Test(); Thread t1 = new Thread(test); Thread t2 = new Thread(test); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(i); } }
有两个线程都对i进行++操作,为了保证线程安全,使用了 ReentrantLock,从用法上可以看出, synchronized相比,ReentrantLock就稍微复杂一点。因为必须在finally中进行解锁操作,如果不 finally解锁,有可能代码出现异常锁没被释放,而synchronized是由JVM来释放锁。
1.1.1 可重入
lock.lock(); lock.lock(); try { i++; } finally { lock.unlock(); lock.unlock(); }
由于ReentrantLock是重入锁,所以可以反复得到相同的一把锁,它有一个与锁相关的获取计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,然后锁需要被释放两次才能获得真正释放(重入锁)。这模仿了 synchronized 的语义;如果线程进入由线程已经拥有的监控器保护的 synchronized 块,就允许线程继续进行,当线程退出第二个(或者后续) synchronized 块的时候,不释放锁,只有线程退出它进入的监控器保护的第一个synchronized 块时,才释放锁。
public class Child extends Father implements Runnable{ final static Child child = new Child();//为了保证锁唯一 public static void main(String[] args) { for (int i = 0; i < 50; i++) { new Thread(child).start(); } } public synchronized void doSomething() { System.out.println("1child.doSomething()"); doAnotherThing(); // 调用自己类中其他的synchronized方法 } private synchronized void doAnotherThing() { super.doSomething(); // 调用父类的synchronized方法 System.out.println("3child.doAnotherThing()"); } @Override public void run() { child.doSomething(); } } class Father { public synchronized void doSomething() { System.out.println("2father.doSomething()"); } }
我们可以看到一个线程进入不同的 synchronized方法,是不会释放之前得到的锁的。所以输出还是顺序输出。所以synchronized也是重入锁
1child.doSomething() 2father.doSomething() 3child.doAnotherThing() 1child.doSomething() 2father.doSomething() 3child.doAnotherThing() 1child.doSomething() 2father.doSomething() 3child.doAnotherThing() ...
package test; import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; import java.util.concurrent.locks.ReentrantLock; public class Test implements Runnable { public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; public Test(int lock) { this.lock = lock; } @Override public void run() { try { if (lock == 1) { lock1.lockInterruptibly(); try { Thread.sleep(500); } catch (Exception e) { // TODO: handle exception } lock2.lockInterruptibly(); } else { lock2.lockInterruptibly(); try { Thread.sleep(500); } catch (Exception e) { // TODO: handle exception } lock1.lockInterruptibly(); } } catch (Exception e) { // TODO: handle exception } finally { if (lock1.isHeldByCurrentThread()) { lock1.unlock(); } if (lock2.isHeldByCurrentThread()) { lock2.unlock(); } System.out.println(Thread.currentThread().getId() + ":线程退出"); } } public static void main(String[] args) throws InterruptedException { Test t1 = new Test(1); Test t2 = new Test(2); Thread thread1 = new Thread(t1); Thread thread2 = new Thread(t2); thread1.start(); thread2.start(); Thread.sleep(1000); //DeadlockChecker.check(); } static class DeadlockChecker { private final static ThreadMXBean mbean = ManagementFactory .getThreadMXBean(); final static Runnable deadlockChecker = new Runnable() { @Override public void run() { // TODO Auto-generated method stub while (true) { long[] deadlockedThreadIds = mbean.findDeadlockedThreads(); if (deadlockedThreadIds != null) { ThreadInfo[] threadInfos = mbean.getThreadInfo(deadlockedThreadIds); for (Thread t : Thread.getAllStackTraces().keySet()) { for (int i = 0; i < threadInfos.length; i++) { if(t.getId() == threadInfos[i].getThreadId()) { t.interrupt(); } } } } try { Thread.sleep(5000); } catch (Exception e) { // TODO: handle exception } } } }; public static void check() { Thread t = new Thread(deadlockChecker); t.setDaemon(true); t.start(); } } }
超时不能获得锁,就返回false,不会永久等待构成死锁使用lock.tryLock(long timeout, TimeUnit unit)来实现可限时锁,参数为时间和单位。
package test; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; public class Test implements Runnable { public static ReentrantLock lock = new ReentrantLock(); @Override public void run() { try { if (lock.tryLock(5, TimeUnit.SECONDS)) { Thread.sleep(6000); } else { System.out.println("get lock failed"); } } catch (Exception e) { } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } public static void main(String[] args) { Test t = new Test(); Thread t1 = new Thread(t); Thread t2 = new Thread(t); t1.start(); t2.start(); } }
get lock failed
public ReentrantLock(boolean fair) public static ReentrantLock fairLock = new ReentrantLock(true);
1.2 Condition
Condition与ReentrantLock的关系就类似于synchronized与Object.wait()/signal() await()方法会使当前线程等待,同时释放当前锁,当其他线程中使用signal()时或者signalAll()方法时,线 程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。这和Object.wait()方法很相似。awaitUninterruptibly()方法与await()方法基本相同,但是它并不会再等待过程中响应中断。 singal()方法用于唤醒一个在等待中的线程。相对的singalAll()方法会唤醒所有在等待中的线程。这和Obejct.notify()方法很类似。
package test; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class Test implements Runnable { public static ReentrantLock lock = new ReentrantLock(); public static Condition condition = lock.newCondition(); @Override public void run() { try { lock.lock(); condition.await(); System.out.println("Thread is going on"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { Test t = new Test(); Thread thread = new Thread(t); thread.start(); Thread.sleep(2000); lock.lock(); condition.signal(); lock.unlock(); } }
package test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class Test implements Runnable { final Semaphore semaphore = new Semaphore(5); @Override public void run() { try { semaphore.acquire(); Thread.sleep(2000); System.out.println(Thread.currentThread().getId() + " done"); } catch (Exception e) { e.printStackTrace(); }finally { semaphore.release(); } } public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(20); final Test t = new Test(); for (int i = 0; i < 20; i++) { executorService.submit(t); } } }
有一个20个线程的线程池,每个线程都去 Semaphore的许可,Semaphore的许可只有5个,运行后可以看到,5个一批,一批一批地输出。
public void acquire(int permits) throws InterruptedException
1.4 ReadWriteLock
private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock(); private static Lock readLock = readWriteLock.readLock(); private static Lock writeLock = readWriteLock.writeLock();
1.5 CountDownLatch
倒数计时器一种典型的场景就是火箭发射。在火箭发射前,为了保证万无一失,往往还要进行各项设备、仪器的检查。 只有等所有检查完毕后,引擎才能点火。这种场景就非常适合使用CountDownLatch。它可以使得点火线程,等待所有检查线程全部完工后,再执行
static final CountDownLatch end = new CountDownLatch(10); end.countDown(); end.await();
package test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Test implements Runnable { static final CountDownLatch countDownLatch = new CountDownLatch(10); static final Test t = new Test(); @Override public void run() { try { Thread.sleep(2000); System.out.println("complete"); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { executorService.execute(t); } countDownLatch.await(); System.out.println("end"); executorService.shutdown(); } }
1.6 CyclicBarrier
和CountDownLatch相似,也是等待某些线程都做完以后再执行。与CountDownLatch区别在于这个计数器可以反复使用。比如,假设我们将计数器设置为10。那么凑齐第一批1 0个线程后,计数器就会归零,然后接着凑齐下一批10个线程
public CyclicBarrier(int parties, Runnable barrierAction) barrierAction就是当计数器一次计数完成后,系统会执行的动作 await()
package test; import java.util.concurrent.CyclicBarrier; public class Test implements Runnable { private String soldier; private final CyclicBarrier cyclic; public Test(String soldier, CyclicBarrier cyclic) { this.soldier = soldier; this.cyclic = cyclic; } @Override public void run() { try { //等待所有士兵到齐 cyclic.await(); dowork(); //等待所有士兵完成工作 cyclic.await(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void dowork() { // TODO Auto-generated method stub try { Thread.sleep(3000); } catch (Exception e) { // TODO: handle exception } System.out.println(soldier + ": done"); } public static class BarrierRun implements Runnable { boolean flag; int n; public BarrierRun(boolean flag, int n) { super(); this.flag = flag; this.n = n; } @Override public void run() { if (flag) { System.out.println(n + "个任务完成"); } else { System.out.println(n + "个集合完成"); flag = true; } } } public static void main(String[] args) { final int n = 10; Thread[] threads = new Thread[n]; boolean flag = false; CyclicBarrier barrier = new CyclicBarrier(n, new BarrierRun(flag, n)); System.out.println("集合"); for (int i = 0; i < n; i++) { System.out.println(i + "报道"); threads[i] = new Thread(new Test("士兵" + i, barrier)); threads[i].start(); } } }
集合 0报道 1报道 2报道 3报道 4报道 5报道 6报道 7报道 8报道 9报道 10个集合完成 士兵5: done 士兵7: done 士兵8: done 士兵3: done 士兵4: done 士兵1: done 士兵6: done 士兵2: done 士兵0: done 士兵9: done 10个任务完成
1.7 LockSupport
LockSupport.park(); LockSupport.unpark(t1);
与suspend相比 不容易引起线程冻结LockSupport的思想呢,和 Semaphore有点相似,内部有一个许可,park的时候拿掉这个许可,unpark的时候申请这个许可。所以如果unpark在park之前,是不会发生线程冻结的。
下面的代码是[高并发Java 二] 多线程基础中suspend示例代码,在使用suspend时会发生死锁。
package test; import java.util.concurrent.locks.LockSupport; public class Test { static Object u = new Object(); static TestSuspendThread t1 = new TestSuspendThread("t1"); static TestSuspendThread t2 = new TestSuspendThread("t2"); public static class TestSuspendThread extends Thread { public TestSuspendThread(String name) { setName(name); } @Override public void run() { synchronized (u) { System.out.println("in " + getName()); //Thread.currentThread().suspend(); LockSupport.park(); } } } public static void main(String[] args) throws InterruptedException { t1.start(); Thread.sleep(100); t2.start(); // t1.resume(); // t2.resume(); LockSupport.unpark(t1); LockSupport.unpark(t2); t1.join(); t2.join(); } }
而使用 LockSupport则不会发生死锁。
public static void park() { unsafe.park(false, 0L); }
1.8 ReentrantLock 的实现
- CAS状态
- 等待队列
- park()
/** * The synchronization state. */ private volatile int state;
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
2. 并发容器及典型源码分析
2.1 ConcurrentHashMap
public static Map m=Collections.synchronizedMap(new HashMap());
private final Map<K,V> m; // Backing Map final Object mutex; // Object on which to synchronize SynchronizedMap(Map<K,V> m) { if (m==null) throw new NullPointerException(); this.m = m; mutex = this; } SynchronizedMap(Map<K,V> m, Object mutex) { this.m = m; this.mutex = mutex; } public int size() { synchronized (mutex) {return m.size();} } public boolean isEmpty() { synchronized (mutex) {return m.isEmpty();} } public boolean containsKey(Object key) { synchronized (mutex) {return m.containsKey(key);} } public boolean containsValue(Object value) { synchronized (mutex) {return m.containsValue(value);} } public V get(Object key) { synchronized (mutex) {return m.get(key);} } public V put(K key, V value) { synchronized (mutex) {return m.put(key, value);} } public V remove(Object key) { synchronized (mutex) {return m.remove(key);} } public void putAll(Map<? extends K, ? extends V> map) { synchronized (mutex) {m.putAll(map);} } public void clear() { synchronized (mutex) {m.clear();} }
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); }
在 ConcurrentHashMap内部有一个Segment段,它将大的HashMap切分成若干个段(小的HashMap),然后让数据在每一段上Hash,这样多个线程在不同段上的Hash操作一定是线程安全的,所以只需要同步同一个段上的线程就可以了,这样实现了锁的分离,大大增加了并发量。在使用ConcurrentHashMap.size时会比较麻烦,因为它要统计每个段的数据和,在这个时候,要把每一个段都加上锁,然后再做数据统计。这个就是把锁分离后的小小弊端,但是size方法应该是不会被高频率调用的方法。在实现上,不使用synchronized和lock.lock而是尽量使用trylock,同时在HashMap的实现上,也做了一点优化。这里就不提了。
2.2 BlockingQueue
