该帖已经被评为良好帖
|
|
---|---|
作者 | 正文 |
发表时间:2010-07-14
最后修改:2010-07-14
其实性能还可以再提高的,Cyclicbarrier运算完毕后都要await是很讨厌的。
应该使用CompletionService,然后这样 long count = 0L; for(int i=0;i<segCount;i++){ count += cs.take().get(); } |
|
返回顶楼 | |
发表时间:2010-07-14
package com.wl.test.concurrent.semaphore; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; /** * * 有一个很大的整数list,需要求这个list中所有整数的和,写一个可以充分利用多核CPU的代码,来计算结果。 * * * 乍一看到题目“充分利用多核CPU”, * 以为会根据CPU的核心数,计算出比较合理的任务线程的数目。 * 就题目的计算目标来说,实际上讲的主线程等待任务线程完成, * 任务线程之间没有必要互相等待。 * 是不是可以考虑用信号来...... * 每次来看帖,都发现大家图画的很拉风…… * * 我也帖个代码,虽然来晚了,大家不一定能看到…… 加锁计数,原来放在任务线程中的,现在那到外面来了 * * @author HardPass * */ public class BigListSumWithSemaphore { private List<Integer> list = new ArrayList<Integer>(); private AtomicLong sum = new AtomicLong(0L); // 结果 private int threadCounts = 2 * Runtime.getRuntime().availableProcessors() + 1; // 任务线程数 private List<Runnable> tasks = new ArrayList<Runnable>(); // 任务线程 SemaphoreLock lock = new SemaphoreLock(threadCounts);// 初始化锁计数 改变的地方 public static void main(String[] args) { //测试100次 for (int i = 0; i < 100; i++) { long l = new BigListSumWithSemaphore().test(); if (l!=500000500000L) System.out.println("___________________________________________________________________________________________________________________________________"); } } public long test() { init(); System.out.println("---" + threadCounts); ExecutorService exec = Executors.newFixedThreadPool(threadCounts); //线程池 for(Runnable task : tasks){ exec.execute(task); } lock.lockHere(); // 此处尝试wait exec.shutdown(); System.out.println("List中所有整数的和为:" + sum); return sum.get(); } private void init() { for (int i = 1; i <= 1000000; i++) { list.add(i); } int len = list.size() / threadCounts; int i = 0; for (; i < threadCounts - 1; i++) { tasks.add(new Task(list.subList(i * len, (i + 1) * len))); } tasks.add(new Task(list.subList(i * len, list.size()))); } private class Task implements Runnable { private List<Integer> subList; public Task(List<Integer> subList) { this.subList = subList; } @Override public void run() { //lock.lockThere(); // 增加锁的计数 long subSum = 0L; for (Integer i : subList) { subSum += i; } sum.addAndGet(subSum); System.out.println("分配给线程:" + Thread.currentThread().getName() + "那一部分List的整数和为:\tSubSum:" + subSum); lock.release(); // 释放一个锁的计数 } } } /** * * 信号量锁 * * 此Semaphore非java.util.concurrent.Semaphore * * @author HardPass * */ class SemaphoreLock { private int count = 0; // 信号量 SemaphoreLock(int count){ this.count = count; } /** * 信号量大于0的时候 wait * 这是不是传说中的可重入? */ public synchronized void lockHere() { while (count > 0) { try { wait(); } catch (InterruptedException e) { } } } public synchronized void lockThere() { count++; } public synchronized void release() { count--; if(count==0){ notify(); } } } |
|
返回顶楼 | |
发表时间:2010-07-14
最后修改:2010-07-14
skzr.org 写道 使用Gedit编辑的,难免有语法错误,呵呵
我的一个解法: public class MyWorkThread extends Thread { private static BigDecimal sum; private List<Integer> list; private int start, end; private long value; public static BigDecimal getSum() { return sum; } public static synchronized void addSum(long v) { if (sum == null) { sum = new BigDecimal(v); } else { sum.add(BigDecimal.valueOf(v)); } } public MyWorkThread(List<Integer> list, Integer start, Integer end) { this.list = list; this.start = start; this.end = end; } private void add(int v) { if (Long.MAX_VALUE - v > value) { value += v; } else { addSum(value); value = v; } } public void run() { for(int i = start; i < end; i++) add(list.get(i)); } public static void main(String[] args) throws InterruptedException { List<Integer> list = new ArrayList<Integer>(); int cpuCoreSize = 2; int len = list.size() / cpuCoreSize; int start = 0, end = len; for (;;) { end = start + len; if (end > list.size()) end = list.size(); new MyWorkThread(list, start, end).start(); start = end; if (start == list.size()) break; } [color=red]Thread.currentThread().join();[/color] System.out.println("和为:" + MyWorkThread.getSum()); } } 52行Thread.currentThread().join();朋友有个地方不懂,这里Thread.currentThread()是主线程吧,那join()方法就是是主线程等待主线程执行完成,这不是抛出InterruptedException吗 |
|
返回顶楼 | |
发表时间:2010-07-14
kakaluyi 写道 skzr.org 写道 使用Gedit编辑的,难免有语法错误,呵呵
我的一个解法: public class MyWorkThread extends Thread { private static BigDecimal sum; private List<Integer> list; private int start, end; private long value; public static BigDecimal getSum() { return sum; } public static synchronized void addSum(long v) { if (sum == null) { sum = new BigDecimal(v); } else { sum.add(BigDecimal.valueOf(v)); } } public MyWorkThread(List<Integer> list, Integer start, Integer end) { this.list = list; this.start = start; this.end = end; } private void add(int v) { if (Long.MAX_VALUE - v > value) { value += v; } else { addSum(value); value = v; } } public void run() { for(int i = start; i < end; i++) add(list.get(i)); } public static void main(String[] args) throws InterruptedException { List<Integer> list = new ArrayList<Integer>(); int cpuCoreSize = 2; int len = list.size() / cpuCoreSize; int start = 0, end = len; for (;;) { end = start + len; if (end > list.size()) end = list.size(); new MyWorkThread(list, start, end).start(); start = end; if (start == list.size()) break; } [color=red]Thread.currentThread().join();[/color] System.out.println("和为:" + MyWorkThread.getSum()); } } 52行Thread.currentThread().join();朋友有个地方不懂,这里Thread.currentThread()是主线程吧,那join()方法就是是主线程等待主线程执行完成,这不是抛出InterruptedException吗 不会,只是会活锁。你可以在main方法里面试试。 |
|
返回顶楼 | |
发表时间:2010-07-15
mercyblitz 写道 kakaluyi 写道 skzr.org 写道 使用Gedit编辑的,难免有语法错误,呵呵
我的一个解法: public class MyWorkThread extends Thread { private static BigDecimal sum; private List<Integer> list; private int start, end; private long value; public static BigDecimal getSum() { return sum; } public static synchronized void addSum(long v) { if (sum == null) { sum = new BigDecimal(v); } else { sum.add(BigDecimal.valueOf(v)); } } public MyWorkThread(List<Integer> list, Integer start, Integer end) { this.list = list; this.start = start; this.end = end; } private void add(int v) { if (Long.MAX_VALUE - v > value) { value += v; } else { addSum(value); value = v; } } public void run() { for(int i = start; i < end; i++) add(list.get(i)); } public static void main(String[] args) throws InterruptedException { List<Integer> list = new ArrayList<Integer>(); int cpuCoreSize = 2; int len = list.size() / cpuCoreSize; int start = 0, end = len; for (;;) { end = start + len; if (end > list.size()) end = list.size(); new MyWorkThread(list, start, end).start(); start = end; if (start == list.size()) break; } [color=red]Thread.currentThread().join();[/color] System.out.println("和为:" + MyWorkThread.getSum()); } } 52行Thread.currentThread().join();朋友有个地方不懂,这里Thread.currentThread()是主线程吧,那join()方法就是是主线程等待主线程执行完成,这不是抛出InterruptedException吗 不会,只是会活锁。你可以在main方法里面试试。 经过验证,不是活锁,证明了我的担心,是个死锁 public static void main(String args[]) { try { Thread.currentThread().join(); System.out.println("ok"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } ok永远打印不出来,主线程等待所有子线程结束的方法是不是酱紫地,前面一个同学说了一个方法 atominit方法才是正解 |
|
返回顶楼 | |
发表时间:2010-07-15
昨天刚刚看了concurrent包的ConcurrentHashMap的分析,也是使用分段来进行并发,读操作允许并发,修改操作加锁
|
|
返回顶楼 | |
发表时间:2010-07-15
这种题目用java做真是太多代码了,用clojure只要几行:
(defn mysum [coll n] (let [sub-colls (partition n n [0] coll) result-coll (map #(future (reduce + 0 %)) sub-colls)] (reduce #(+ %1 @%2) 0 result-coll))) |
|
返回顶楼 | |
发表时间:2010-07-15
dennis_zane 写道
这种题目用java做真是太多代码了,用clojure只要几行:
(defn mysum [coll n] (let [sub-colls (partition n n [0] coll) result-coll (map #(future (reduce + 0 %)) sub-colls)] (reduce #(+ %1 @%2) 0 result-coll)))
真够简单的,用erlang代码也很少 |
|
返回顶楼 | |
发表时间:2010-07-15
kakaluyi 写道 mercyblitz 写道 kakaluyi 写道 skzr.org 写道 使用Gedit编辑的,难免有语法错误,呵呵
我的一个解法: public class MyWorkThread extends Thread { private static BigDecimal sum; private List<Integer> list; private int start, end; private long value; public static BigDecimal getSum() { return sum; } public static synchronized void addSum(long v) { if (sum == null) { sum = new BigDecimal(v); } else { sum.add(BigDecimal.valueOf(v)); } } public MyWorkThread(List<Integer> list, Integer start, Integer end) { this.list = list; this.start = start; this.end = end; } private void add(int v) { if (Long.MAX_VALUE - v > value) { value += v; } else { addSum(value); value = v; } } public void run() { for(int i = start; i < end; i++) add(list.get(i)); } public static void main(String[] args) throws InterruptedException { List<Integer> list = new ArrayList<Integer>(); int cpuCoreSize = 2; int len = list.size() / cpuCoreSize; int start = 0, end = len; for (;;) { end = start + len; if (end > list.size()) end = list.size(); new MyWorkThread(list, start, end).start(); start = end; if (start == list.size()) break; } [color=red]Thread.currentThread().join();[/color] System.out.println("和为:" + MyWorkThread.getSum()); } } 52行Thread.currentThread().join();朋友有个地方不懂,这里Thread.currentThread()是主线程吧,那join()方法就是是主线程等待主线程执行完成,这不是抛出InterruptedException吗 不会,只是会活锁。你可以在main方法里面试试。 经过验证,不是活锁,证明了我的担心,是个死锁 public static void main(String args[]) { try { Thread.currentThread().join(); System.out.println("ok"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } ok永远打印不出来,主线程等待所有子线程结束的方法是不是酱紫地,前面一个同学说了一个方法 atominit方法才是正解 Thread.currentThread().join(); 莫非是传说中的永久sleep,必须等待别人来打断? |
|
返回顶楼 | |
发表时间:2010-07-15
dennis_zane 写道 这种题目用java做真是太多代码了,用clojure只要几行:
(defn mysum [coll n] (let [sub-colls (partition n n [0] coll) result-coll (map #(future (reduce + 0 %)) sub-colls)] (reduce #(+ %1 @%2) 0 result-coll))) 传说中的map-redurce |
|
返回顶楼 | |