`

《Java并发编程》之三:基础构建模块

    博客分类:
  • Java
阅读更多

将线程安全性委托给现有的线程安全类,委托时创建线程安全类一个最有效的策略:只需让现有的线程安全类管理所有的状态即可。

 

5.1  同步容器类

早起JDK的同步容器类包括Vector和Hashtable等,这些同步封装器类是由Collections.synchronizedXxx等工厂方法创建的。

同步容器类的问题是,但有复合操作:迭代、跳转、条件运算的时候,如果有其他线程并发地修改容器的时候,它们可能会出现意料之外的行为。

如果加锁的话,就会牺牲性能和伸缩性。

当容器作为另一个容器的元素或者键值时,就会出现调用hashCode equals等方法,间接调用迭代操作,而可能会产生ConcurrentModificationException异常。同样containsAll、removeAll、retainAll等方法以及把容器作为参数的构造函数都会对容器进行迭代。

 

5.2  并发容器

JDK5提供了多种并发容器来改进同步容器的性能。上面讲的Vector和Hashtable等是同步容器,它们对所有容器状态访问进行串行化,代价是性能严重低下。

而并发容器是针对多个线程并发访问设计的,在JDK5中,ConcurrentHashMap代替同步且基于散列的Map,CopyOnWriteArrayList用于在遍历操作为主要操作情况下代替同步的List。

注:通过并发容器来代替同步容器,可以极大提高伸缩性和性能并且降低风险

 

ConcurrentHashMap

CopyOnWriteArrayList

Queue

BlockingQueue:构建生产者-消费者模式的阻塞队列

LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,二者分别与LinkedList和ArrayList类似,但比同步的List拥有更好的并发性。

PriorityBlockingQueue是一个按优先级排序队列

SynchronizedQueue,实际上它不是一个真正的队列,因为它不会为队列中元素维护存储空间。它维护一组线程,这些线程在等待着把元素加入或者移出队列。  这种区别相当于将文件直接交给同事(SynchronizedQueue),还是将文件放入她的邮箱并希望她能尽快拿到文件。

 

 将桌面搜索问题分解为:文件遍历和建立索引两个独立操作,使用阻塞队列可简化编程模式:

public class ProducerConsumer {
    static class FileCrawler implements Runnable {
        private final BlockingQueue<File> fileQueue;
        private final FileFilter fileFilter;
        private final File root;

        public FileCrawler(BlockingQueue<File> fileQueue,
                           final FileFilter fileFilter,
                           File root) {
            this.fileQueue = fileQueue;
            this.root = root;
            this.fileFilter = new FileFilter() {
                public boolean accept(File f) {
                    return f.isDirectory() || fileFilter.accept(f);
                }
            };
        }

        private boolean alreadyIndexed(File f) {
            return false;
        }

        public void run() {
            try {
                crawl(root);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        private void crawl(File root) throws InterruptedException {
            File[] entries = root.listFiles(fileFilter);
            if (entries != null) {
                for (File entry : entries)
                    if (entry.isDirectory())
                        crawl(entry);
                    else if (!alreadyIndexed(entry))
                        fileQueue.put(entry);
            }
        }
    }

    static class Indexer implements Runnable {
        private final BlockingQueue<File> queue;

        public Indexer(BlockingQueue<File> queue) {
            this.queue = queue;
        }

        public void run() {
            try {
                while (true)
                    indexFile(queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        public void indexFile(File file) {
            // Index the file...
        }
    }

    private static final int BOUND = 10;
    private static final int N_CONSUMERS = Runtime.getRuntime().availableProcessors();

    public static void startIndexing(File[] roots) {
        BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND);
        FileFilter filter = new FileFilter() {
            public boolean accept(File file) {
                return true;
            }
        };

        for (File root : roots)
            new Thread(new FileCrawler(queue, filter, root)).start();

        for (int i = 0; i < N_CONSUMERS; i++)
            new Thread(new Indexer(queue)).start();
    }
}

 爬虫程序负责往队列中添加File,属于生产者,而索引线程负责从队列中拿出File,然后建立索引,属于消费者。所有同步管理完全交由容器,相当的爽啊。

 

对象池利用了串行线程封闭,将对象借给一个请求线程,只要对象池包含足够的内部同步来安全发布池中对象,并且客户端不会发布池中对象或者使用完后返回池中对象,就可以安全地在线程间传递所有权了。

 

Latch的用法:门闩

public class TestHarness {
    public long timeTasks(int nThreads, final Runnable task)
            throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {
                public void run() {
                    try {
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            endGate.countDown();
                        }
                    } catch (InterruptedException ignored) {
                    }
                }
            };
            t.start();
        }

        long start = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long end = System.nanoTime();
        return end - start;
    }
}

5.5.3  信号量:

计数信号量 counting semaphore用来控制同时访问某个特定资源的操作数量,或同时执行某个指定操作的数量。还可以用来实现某种资源池,或者对容器施加边界。

public class BoundedHashSet<T> {
    private final Set<T> set;
    private final Semaphore sem;

    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<T>());
        sem = new Semaphore(bound);
    }

    public boolean add(T o) throws InterruptedException {
        sem.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        } finally {
            if (!wasAdded)
                sem.release();
        }
    }

    public boolean remove(Object o) {
        boolean wasRemoved = set.remove(o);
        if (wasRemoved)
            sem.release();
        return wasRemoved;
    }
}

 

5.5.4  栅栏 Barrier

上面演示的Latch是一次性对象,一旦进入终止状态,就不能被重置。 Barrier跟Latch类似,它能阻塞一组线程直到某个事件发生。Barrier与Latch的区别在于所有线程必须同时到达Barrier位置,才能继续执行。

Latch用来等待事件,而Barrier用来等待其他线程。

Barrier用来实现一些协议,例如几个家庭决定在某个地方集合: 所有人6:00在地铁洛溪站碰头,到了后要等其他人,之后再讨论下一步怎么做。

 

CyclicBarrier在并行迭代算法中非常有用,当线程到达barrier位置时候调用await方法,这个方法阻塞直到所有线程都达到barrier位置。如果所有线程都到达barrier的位置,那么barrier will open and release all threads,and then reset state for the next usage。如果对await调用超时或者await阻塞的线程被中断,那么barrier就被认为打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException。

细胞自动衍生模拟:

public class CellularAutomata {
    private final Board mainBoard;
    private final CyclicBarrier barrier;
    private final Worker[] workers;

    public CellularAutomata(Board board) {
        this.mainBoard = board;
        int count = Runtime.getRuntime().availableProcessors();
        this.barrier = new CyclicBarrier(count,
                new Runnable() {
                    public void run() {
                        mainBoard.commitNewValues();
                    }
                });
        this.workers = new Worker[count];
        for (int i = 0; i < count; i++)
            workers[i] = new Worker(mainBoard.getSubBoard(count, i));
    }

    private class Worker implements Runnable {
        private final Board board;

        public Worker(Board board) {
            this.board = board;
        }

        public void run() {
            while (!board.hasConverged()) {
                for (int x = 0; x < board.getMaxX(); x++)
                    for (int y = 0; y < board.getMaxY(); y++)
                        board.setNewValue(x, y, computeValue(x, y));
                try {
                    barrier.await();
                } catch (InterruptedException ex) {
                    return;
                } catch (BrokenBarrierException ex) {
                    return;
                }
            }
        }

        private int computeValue(int x, int y) {
            // Compute the new value that goes in (x,y)
            return 0;
        }
    }

    public void start() {
        for (int i = 0; i < workers.length; i++)
            new Thread(workers[i]).start();
        mainBoard.waitForConvergence();
    }

    interface Board {
        int getMaxX();

        int getMaxY();

        int getValue(int x, int y);

        int setNewValue(int x, int y, int value);

        void commitNewValues();

        boolean hasConverged();

        void waitForConvergence();

        Board getSubBoard(int numPartitions, int index);
    }
}

 

5.6  构建高效可伸缩性的结果缓存:

public class Memoizer<A, V> implements Computable<A, V> {
    private final ConcurrentMap<A, Future<V>> cache
            = new ConcurrentHashMap<A, Future<V>>();
    private final Computable<A, V> c;

    public Memoizer(Computable<A, V> c) {
        this.c = c;
    }

    public V compute(final A arg) throws InterruptedException {
        while (true) {
            Future<V> f = cache.get(arg);
            if (f == null) {
                Callable<V> eval = new Callable<V>() {
                    public V call() throws InterruptedException {
                        return c.compute(arg);
                    }
                };
                FutureTask<V> ft = new FutureTask<V>(eval);
                f = cache.putIfAbsent(arg, ft);
                if (f == null) {
                    f = ft;
                    ft.run();
                }
            }
            try {
                return f.get();
            } catch (CancellationException e) {
                cache.remove(arg, f);
            } catch (ExecutionException e) {
                throw LaunderThrowable.launderThrowable(e.getCause());
            }
        }
    }
}

 有了这个之后,可以在因式分解中使用这个Memoizer来缓存之前的计算结果,不仅高效而且可扩展性更强:

@ThreadSafe
public class Factorizer extends GenericServlet implements Servlet {
    private final Computable<BigInteger, BigInteger[]> c =
            new Computable<BigInteger, BigInteger[]>() {
                public BigInteger[] compute(BigInteger arg) {
                    return factor(arg);
                }
            };
    private final Computable<BigInteger, BigInteger[]> cache
            = new Memoizer<BigInteger, BigInteger[]>(c);

    public void service(ServletRequest req,
                        ServletResponse resp) {
        try {
            BigInteger i = extractFromRequest(req);
            encodeIntoResponse(resp, cache.compute(i));
        } catch (InterruptedException e) {
            encodeError(resp, "factorization interrupted");
        }
    }

    void encodeIntoResponse(ServletResponse resp, BigInteger[] factors) {
    }

    void encodeError(ServletResponse resp, String errorString) {
    }

    BigInteger extractFromRequest(ServletRequest req) {
        return new BigInteger("7");
    }

    BigInteger[] factor(BigInteger i) {
        // Doesn't really factor
        return new BigInteger[]{i};
    }
}

 

本人博客已搬家,新地址为:http://yidao620c.github.io/

分享到:
评论

相关推荐

    Java并发编程-线程安全与基础构建模块

    本文将深入探讨"Java并发编程-线程安全与基础构建模块"这一主题,旨在帮助开发者理解如何有效地处理并发问题,提高程序性能和稳定性。 首先,线程安全是并发编程中的核心概念,指的是多个线程访问同一资源时,无论...

    Java并发编程实战

    第5章 基础构建模块 5.1 同步容器类 5.1.1 同步容器类的问题 5.1.2 迭代器与Concurrent-ModificationException 5.1.3 隐藏迭代器 5.2 并发容器 5.2.1 ConcurrentHashMap 5.2.2 额外的原子Map操作 5.2.3 ...

    《java并发编程实战》读书笔记-第5章-基础构建模块

    《java并发编程实战》读书笔记-第3章-对象的共享,脑图形式,使用xmind8制作 包括同步容器类、并发容器类、阻塞队列和生产者消费者模式、阻塞和中断方法、同步工具类。最后是构建高效且可伸缩的结果缓存

    Java 并发编程实战

    第5章 基础构建模块 5.1 同步容器类 5.1.1 同步容器类的问题 5.1.2 迭代器与Concurrent-ModificationException 5.1.3 隐藏迭代器 5.2 并发容器 5.2.1 ConcurrentHashMap 5.2.2 额外的原子Map操作 5.2.3 ...

    Java编程模式与范例:基础开发技巧

    Java编程模式与范例:基础开发技巧是针对Java开发者的一本深入学习指南,旨在提升开发者在实际编程中的技能和效率。这本书涵盖了多种重要的编程模式和最佳实践,为初学者和有经验的Java开发者提供了丰富的知识资源。...

    Java并发编程实战2019.zip

    Java并发编程实战,第1章 简介,第2章 线程安全性 第3章 对象的共享 第4章 对象的组合 第5章 基础构建模块 第6章 任务执行 第7章 取消与关闭 第8章 线程池的使用 第9章 图形用户界面应用程序 第10章 避免...

    Java并发编程实战 英文版 Java Concurr

    Java并发编程实战是针对Java并发编程的详细指南,随着多核处理器的普及,多线程并发编程成为了提升应用程序性能的关键技术之一。从Java 5和Java 6版本开始,Java在并发编程方面取得了显著的进步,不仅改进了Java...

    Java并发编程实战-读书笔记

    《Java并发编程实战》个人读书笔记,非常详细: 1 简介 2 线程安全性 3 对象的共享 4 对象的组合 5 基础构建模块 6 任务执行 7 取消与关闭 8 线程池的使用 9 图形用户界面应用程序 10 避免活跃性危险 11 性能与可...

    Java完美编程(第三版)

    通过学习《Java完美编程(第三版)》的这些章节,读者将能够构建坚实的基础,掌握从基础到高级的Java编程技能,并有能力解决实际开发中的各种问题。这本书对于初学者和有经验的开发者都是宝贵的资源,帮助他们在Java...

    JAVA 网络编程之QQ 项目编程实训

    在“JAVA 网络编程之QQ 项目编程实训”中,我们主要关注的是使用Java语言进行网络编程,特别是构建类似QQ这样的实时通讯系统。这个实训项目是学习网络编程的一个优秀起点,它涵盖了多种关键技术和概念,对于初学者...

    完整版core java的课程体系 Java基础入门课件 java编程教程 共353页.ppt

    学习这门课程将帮助你构建坚实的Java编程基础,掌握核心概念和技术,为将来进一步深入学习Java技术和框架,乃至整个软件开发领域打下坚实的基础。同时,了解Java的发展历程也有助于理解其设计理念和在技术领域的影响...

    java 实效编程百例

    16. **并发编程**:Java并发库中的并发工具类,如CountDownLatch、Semaphore、CyclicBarrier等。 17. **Lambda表达式**:Java 8引入的新特性,简化函数式编程。 18. **Stream API**:处理集合的新方式,提供链式...

    Java编程宝典:十年典藏版6

    《Java编程宝典:十年典藏版6》是一本深入探讨Java编程技术的权威著作,旨在为读者提供全面、深入...这本典藏版不仅包含了传统的Java编程知识,还反映了近年来Java技术的发展趋势,是每个Java开发者书架上的必备之作。

    Java核心编程技术

    11. **模块化系统**:学习Java 9引入的模块系统,理解模块的定义、依赖关系和模块化项目的构建。 12. **Lambda表达式和函数式编程**:熟悉Java 8引入的Lambda表达式的语法,以及如何使用Stream API进行函数式编程。...

    Java五年工作经验深刻推荐的书籍,有设计模式,Java并发编程,Spring源码解析,大型网站..等

    《JAVA并发编程实践》这本书深入浅出地讲解了Java并发工具类、线程池、锁机制以及并发容器等内容,帮助开发者规避并发陷阱,提升系统性能。 Spring框架是Java企业级应用的事实标准,其源码解析有助于开发者更深入地...

    Java开发典型模块大全(仅含程序源码)-20个Java项目

    1. **基础模块**:这些项目通常涉及Java基础语法、面向对象编程(OOP)概念,如类、对象、封装、继承和多态。学习者可以在此基础上巩固Java的基础知识。 2. **数据结构与算法**:可能包含链表、栈、队列、树、图等...

    《Java核心技术 卷1 基础知识 原书第8版》 PDF+高清+影印+全书签.rar

    《Java核心技术 卷1 基础知识 原书第8版》是Java开发者必读的经典之作,全面深入地讲解了Java编程的基础概念和技术。这本书涵盖了从Java语言的语法到面向对象编程的核心原理,再到类库的使用,是学习Java开发的权威...

Global site tag (gtag.js) - Google Analytics