- 浏览: 1898894 次
- 性别:
- 来自: 北京
文章分类
最新评论
-
July01:
最近了解到一款StratoIO打印控件,功能如下:1、Html ...
jquery打印指定的div -
GentlemanQc:
...
quartz系列(二)spring3.2.5与quartz2.1.7集群版集成简要说明 -
静夜独窗:
你好,能说一下server.xml增加的配置是怎么影响性能的吗 ...
tomcat7.0性能优化-挑战极限精简版 -
beyondfengyu:
beyondfengyu 写道如果每个客户进程的时间不同步,时 ...
java并发(二十二)分布式锁 -
beyondfengyu:
如果每个客户进程的时间不同步,时间超前的进程是不是更容易得到锁 ...
java并发(二十二)分布式锁
本文主要介绍多线程的结果组装。其中可以忽略2处代码创建线程池的区别,请关注其他的业务逻辑代码。全部代码已经在附件中上传。如有疑问,请跟帖留言,笔者会予以答复。信号量相比自旋锁的优点很多,性能、代码简单。自旋锁不停得sleep并唤醒,而信号量的底层采用了wait进行编程,只唤醒一次即可。因此性能优越许多。
【自旋锁】
通常,我们会使用自旋锁进行多线程结果组装。这样的性能非常差。比如,大数据库表的多线程查询,无状态服务器的任务分发后的汇总等。
示例代码如下
【信号量】
代码相比线程轮询更简单,思路清晰,而且可以共用一个线程池来进行全局的控制。当所有线程完成任务的时候,唤醒组装线程进行结果的反馈。
【CompletionService】
CompletionService将Executor和BlockingQueue的功能融合在一起。你可以将Callable任务提交给它来执行,然后使用类似于队列操作的take和poll等方法来获得已经完成的结果,而这些结果会在完成时被封装为Future。ExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor。
【更新历史】
2014-04-26
上传1.1版本,增加CompletionService的接口用法。
2014-05-04
上传1.2版本,重构service,使代码结构更清晰易读。
【自旋锁】
通常,我们会使用自旋锁进行多线程结果组装。这样的性能非常差。比如,大数据库表的多线程查询,无状态服务器的任务分发后的汇总等。
示例代码如下
package com.chinaso.search.spinlocks.service; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor.AbortPolicy; import java.util.concurrent.TimeUnit; import com.chinaso.search.ServerSemaphore; import com.chinaso.search.dao.DataDao; /** * piaohailin 2014-3-21 */ public class DataService { private final DataDao dataDao = new DataDao(); /** * 多线程查询数据库 * @param userId * @return * @throws Exception */ public List<String> getAllData(final String userId) throws Exception { /** * 其中 第一个参数为初始空闲 * 第二个参数为最大线程 * 第三个参数为超时时间 * 第四个参数是超时时间的单位 * 第五个参数是当超过最大线程数以后,可以放在队列中的线程 * 第六个参数 * 第七个参数是线程池塞满时候的策略 */ int corePoolSize = 2; int maximumPoolSize = 3; long keepAliveTime = 0; TimeUnit unit = TimeUnit.NANOSECONDS; BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(5); ThreadFactory threadFactory = Executors.defaultThreadFactory(); /** * AbortPolicy 如果总线成熟超过maximumPoolSize + workQueue * ,则跑异常java.util.concurrent.RejectedExecutionException */ RejectedExecutionHandler handler = new AbortPolicy(); // ExecutorService 为线程池的接口 ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); List<Future<List<String>>> futures = new ArrayList<Future<List<String>>>(); final int count = dataDao.getCount(userId); // 总记录数 System.out.println("count=" + count); // 如果总记录数小于设置的阈值,就直接单线程查询 int threadCount = ServerSemaphore.threadCount; if (count < ServerSemaphore.hold) { threadCount = 1; } int section = count / threadCount; // 区间大小 // 创建线程 for (int i = 0; i < threadCount; i++) { final int begin = i * section; final int end; // 最后一个区间判断 if ((i + 1) == threadCount) { end = count; } else { end = (i + 1) * section; } System.out.print("begin=" + begin); System.out.print(",end=" + end); System.out.println(",size=" + (end - begin)); // 根据总记录数count和线程数Server.threadCount进行分页任务分发 Future<List<String>> future = executor.submit(new Callable<List<String>>() { @Override public List<String> call() throws Exception { List<String> data = new ArrayList<String>(); try { data = dataDao.find(userId, begin, end); } catch (Throwable t) { t.printStackTrace(); } return data; } }); futures.add(future); } this.waitForDone(futures); // 结果的组装 List<String> reuslt = new ArrayList<String>(); for (Future<List<String>> future : futures) { List<String> tmp = future.get(); reuslt.addAll(tmp); } return reuslt; } private void waitForDone(List<Future<List<String>>> futures) { boolean done = false; while (!done) { done = true; for (Future<List<String>> future : futures) { future.isDone(); if (!future.isDone()) { done = false; try { Thread.sleep(50); } catch (Exception e) { e.printStackTrace(); } break; } } } } }
【信号量】
代码相比线程轮询更简单,思路清晰,而且可以共用一个线程池来进行全局的控制。当所有线程完成任务的时候,唤醒组装线程进行结果的反馈。
package com.chinaso.search.semaphore.service; import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import com.chinaso.search.ServerSemaphore; import com.chinaso.search.dao.DataDao; import com.chinaso.search.semaphore.concurrent.DataSemaphore; /** * piaohailin * 2014-3-22 */ public class DataService { private Executor executor = Executors.newFixedThreadPool(4); // 共用线程池,是为了从全局角度,叫多线程可控 private final DataDao dataDao = new DataDao(); /** * 多线程查询数据库 * @param userId * @return * @throws Exception */ public List<String> getAllData(final String userId) throws Exception { final DataSemaphore semaphore = new DataSemaphore(0); final int count = dataDao.getCount(userId); // 总记录数 System.out.println("count=" + count); // 如果总记录数小于设置的阈值,就直接单线程查询 int threadCount = ServerSemaphore.threadCount; if (count < ServerSemaphore.hold) { threadCount = 1; } int section = count / threadCount; // 区间大小 // 创建线程 for (int i = 0; i < threadCount; i++) { final int begin = i * section; final int end; // 最后一个区间判断 if ((i + 1) == threadCount) { end = count; } else { end = (i + 1) * section; } System.out.print("begin=" + begin); System.out.print(",end=" + end); System.out.println(",size=" + (end - begin)); // 根据总记录数count和线程数Server.threadCount进行分页任务分发 executor.execute(new Runnable() { @Override public void run() { try { List<String> data = dataDao.find(userId, begin, end); semaphore.fillData(data); } catch (Throwable t) { t.printStackTrace(); } finally { // 执行成功后,发放授权 semaphore.release(); } } }); } semaphore.acquire(threadCount); // 等待授权数量满足条件,放行 return semaphore.getResult(); } }
【CompletionService】
CompletionService将Executor和BlockingQueue的功能融合在一起。你可以将Callable任务提交给它来执行,然后使用类似于队列操作的take和poll等方法来获得已经完成的结果,而这些结果会在完成时被封装为Future。ExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor。
package com.chinaso.search.completionservice.service; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Executors; import com.chinaso.search.ServerSemaphore; import com.chinaso.search.dao.DataDao; import com.chinaso.search.semaphore.concurrent.DataSemaphore; /** * piaohailin * 2014-3-22 */ public class DataService { private Executor executor = Executors.newCachedThreadPool(); // 共用线程池,是为了从全局角度,叫多线程可控 private final DataDao dataDao = new DataDao(); /** * 多线程查询数据库 * * @param userId * @return * @throws Exception */ public List<String> getAllData(final String userId) throws Exception { final int count = dataDao.getCount(userId); // 总记录数 System.out.println("count=" + count); // 如果总记录数小于设置的阈值,就直接单线程查询 int threadCount = ServerSemaphore.threadCount; if (count < ServerSemaphore.hold) { threadCount = 1; } int section = count / threadCount; // 区间大小 // 创建线程 CompletionService<List<String>> completionService = new ExecutorCompletionService<List<String>>(executor);// 多线程任务管理 for (int i = 0; i < threadCount; i++) { final int begin = i * section; final int end; // 最后一个区间判断 if ((i + 1) == threadCount) { end = count; } else { end = (i + 1) * section; } System.out.print("begin=" + begin); System.out.print(",end=" + end); System.out.println(",size=" + (end - begin)); // 根据总记录数count和线程数Server.threadCount进行分页任务分发 completionService.submit(new Callable<List<String>>() { @Override public List<String> call() throws Exception { List<String> data = null; try { data = dataDao.find(userId, begin, end); } catch (Throwable t) { t.printStackTrace(); } finally { if (data == null) { data = new ArrayList<String>(); } } return data; } }); } List<String> result = new ArrayList<String>(); for (int i = 0; i < threadCount; i++) { result.addAll(completionService.take().get()); // 取得结果,如果没有返回,则阻塞 } return result; } }
【更新历史】
2014-04-26
上传1.1版本,增加CompletionService的接口用法。
2014-05-04
上传1.2版本,重构service,使代码结构更清晰易读。
- multi_thread_dao.zip (8.7 KB)
- 描述: 1.0
- 下载次数: 29
- multi_thread_dao.zip (11.1 KB)
- 描述: 1.1
- 下载次数: 24
- multi_thread_dao.zip (11.2 KB)
- 下载次数: 46
发表评论
-
java并发(三十四)协程kilim
2015-10-02 11:29 5650概述 对协程的技术已经觊觎很久,他有高性能的优点,但目前工具对 ... -
java并发(三十三)栅栏CyclicBarrier
2015-09-12 16:09 3381如果说CountDownLatch(闭锁)是一次性的, ... -
java并发(三十二)非阻塞算法
2014-06-25 14:08 1338如果在某算法中,一个线程的失败或挂起不会导致其他线程也 ... -
java并发(三十一)Amdahl定律
2014-05-19 16:15 2225阿姆达尔定律 阿姆达尔(Amdahl)定律是计算机系统设计的重 ... -
java并发(三十)闭锁CountDownLatch
2014-05-07 23:33 908CountDownLatch,一个同步辅助类,在完成一组正在其 ... -
java并发(二十九)构建高效且可伸缩的结果缓存
2014-04-23 11:52 1092概述 几乎所有应用程序,都会使用某种形式的缓存。重用之 ... -
java并发(二十八)并发随机数,原子变量,并发集合
2014-04-13 12:04 4091原子变量 java.util.concurrent.a ... -
java并发(二十七) 并发性标注
2014-04-07 10:49 8992一介绍 <dependency> < ... -
java并发(二十六)正确使用Volatile变量
2014-03-30 19:41 1400概述 您只能在有限的一 ... -
java并发(二十五)java7之fork-join框架
2014-03-26 14:12 11518如果让我个人理解什么 ... -
java并发(二十三)阻塞、非阻塞、同步、异步
2014-03-14 17:01 1449因为中文语意的问题, ... -
java并发(二十二)分布式锁
2014-03-12 16:24 18738Redis有一系列的命令, ... -
java并发(二十一)剖析同步器
2014-03-11 18:03 1205虽然许多同步器(如锁,信号量,阻塞队列等)功能上各不相同,但它 ... -
java并发(二十)线程池
2014-03-10 15:02 3935基本介绍 线程池(Thread Pool)对于限制应用程序中同 ... -
java并发(十九)阻塞队列
2014-03-10 14:46 1353阻塞队列与普通队列的 ... -
java并发(十八)信号量
2014-03-10 14:03 1281Semaphore(信号量) 是一个线程同步结构,用于在线程间 ... -
java并发(十七)重入锁死
2014-03-10 11:33 1138重入锁死与死锁和嵌套管程锁死非常相似。当一个线程重新获取锁,读 ... -
java并发(十六)Java中的读/写锁
2014-03-10 10:17 1230相比Java中的锁(Locks in Ja ... -
java并发(十五)Java中的锁
2014-03-09 12:07 958锁像synchronized同步块一样,是一种线程同步机制,但 ... -
java并发(十四)Slipped Conditions
2014-03-09 11:28 1079所谓Slipped conditions,就 ...
相关推荐
本文将详细探讨如何利用Java的多线程技术和线程池来实现并发查询数据库,以及相关的文件`BatchDataUtil.java`和`BatchDataRunnable.java`可能涉及的关键知识点。 ### 1. 多线程并发查询 多线程并发查询允许我们将一...
"13-Java并发编程学习宝典.zip" 包含了一系列关于Java并发编程的学习资源,旨在帮助开发者掌握多线程编程的核心技术和最佳实践。以下是这些资源所涵盖的关键知识点: 1. **多线程基础** - "03 多线程开发如此简单—...
设计采用多线程技术,开启多个线程同时处理多个用户的下载请求,提高系统的并发处理能力和吞吐量。同时采用线程安全技术,通过锁的使用避免多个线程同时修改同一资源,保证数据的一致性和准确性。 具体实现中,设计了...
此外,JAVA的多线程技术也是实现飞鸽传书并发处理的关键。通过Thread或Runnable接口,可以创建多个执行线程,分别处理不同的通信请求,提高系统的并发处理能力。 总的来说,通过分析这个JAVA实现的飞鸽传书源代码,...
**描述**:当多线程环境下尝试修改正在遍历的数据结构时会抛出此异常。 **解决方案**: 1. **使用迭代器的remove方法**:使用Iterator的remove方法安全地删除元素。 2. **使用并发集合类**:如ConcurrentHashMap等...
#### 多线程并发与并行 - **并发**:指一个系统内多个进程或线程交替执行的过程,每个任务都可能被执行一小段时间,然后让位给其他任务,使得多个任务看起来同时运行。 - **并行**:指多个任务在同一时刻执行,通常...
1. **组装应用程序**:将多个组件组合成一个完整的应用程序,包括但不限于EJB Beans、客户端、Applets、Servlets等。 2. **定义部署策略**:确定应用程序的部署方式,比如部署到何种服务器环境、如何处理并发请求等...
在Java中,多线程是实现并发处理的关键。聊天系统通常需要同时处理用户的输入、发送消息、接收消息等多个任务,因此会使用多线程来保证系统的响应速度和效率。 4. **Socket编程**: Java的Socket类提供了在网络中...
此外,为了优化性能,可以考虑使用多线程并发传输数据块,或者使用Java NIO的非阻塞I/O,以提高文件传输的效率。同时,合理设置数据块的大小也很关键,过大可能导致内存压力,过小则可能增加网络开销。 总结来说,...
本篇文章将详细探讨Java的核心知识,包括JVM(Java虚拟机)、多线程、泛型以及Spring框架。 首先,我们来深入理解JVM,它是Java程序运行的基石。JVM负责将字节码解释为机器码执行,提供了一个与硬件无关的运行环境...
9. **多线程处理**:为了提高并发处理能力,Java实现通常会使用多线程技术,分别处理连接、发送、接收和状态报告等任务。 10. **日志记录**:良好的日志记录是调试和排查问题的关键,源代码会包含日志模块,记录与...
总之,"JAVA文件传输(论文+源代码)"提供的资料可以帮助学习者全面理解Java环境下的文件传输技术,包括基础的网络编程、多线程并发、数据流管理和高级特性,对于提升Java网络编程技能和毕业设计能力非常有帮助。...
6. **多线程与并发控制**:银行系统可能面临大量并发请求,因此需要使用Java的线程机制,如`Thread`类、`ExecutorService`和`synchronized`关键字,来处理多线程环境下的任务执行和同步问题。 7. **Spring框架**:...
7. **并发与多线程**:在处理多个预定请求时,Java的并发API(如ExecutorService、Semaphore等)可以保证系统的高效运行。 8. **异常处理**:通过try-catch-finally语句和自定义异常,提高程序的健壮性。 【Spring...
`CompletableFuture`是Java 8引入的一个用于异步编程的重要工具,它的设计目的是解决多步骤计算任务中的依赖管理问题,以提高并发性能。传统的并发编程通常涉及到线程池和同步机制,但`CompletableFuture`提供了一种...
在模拟Tomcat的过程中,了解并实现这些步骤可以帮助我们更好地理解Web服务器的工作流程,特别是Java的多线程模型在其中的应用。同时,标签"java tomcat"提示我们需要关注的是与Java和Tomcat相关的技术,例如Servlet...