- 浏览: 602554 次
- 性别:
- 来自: 厦门
文章分类
- 全部博客 (669)
- oracle (36)
- java (98)
- spring (48)
- UML (2)
- hibernate (10)
- tomcat (7)
- 高性能 (11)
- mysql (25)
- sql (19)
- web (42)
- 数据库设计 (4)
- Nio (6)
- Netty (8)
- Excel (3)
- File (4)
- AOP (1)
- Jetty (1)
- Log4J (4)
- 链表 (1)
- Spring Junit4 (3)
- Autowired Resource (0)
- Jackson (1)
- Javascript (58)
- Spring Cache (2)
- Spring - CXF (2)
- Spring Inject (2)
- 汉字拼音 (3)
- 代理模式 (3)
- Spring事务 (4)
- ActiveMQ (6)
- XML (3)
- Cglib (2)
- Activiti (15)
- 附件问题 (1)
- javaMail (1)
- Thread (19)
- 算法 (6)
- 正则表达式 (3)
- 国际化 (2)
- Json (3)
- EJB (3)
- Struts2 (1)
- Maven (7)
- Mybatis (7)
- Redis (8)
- DWR (1)
- Lucene (2)
- Linux (73)
- 杂谈 (2)
- CSS (13)
- Linux服务篇 (3)
- Kettle (9)
- android (81)
- protocol (2)
- EasyUI (6)
- nginx (2)
- zookeeper (6)
- Hadoop (41)
- cache (7)
- shiro (3)
- HBase (12)
- Hive (8)
- Spark (15)
- Scala (16)
- YARN (3)
- Kafka (5)
- Sqoop (2)
- Pig (3)
- Vue (6)
- sprint boot (19)
- dubbo (2)
- mongodb (2)
最新评论
多个线程实现累加
例子 2
3.使用线程处理项目中的需求
需求:三个线程并行处理三个SQL语句查询
线程类
XXXDAO代码片段
线程池实现类(实现并发执行的关键)
多线程调用多线程需要注意的是子线程的线程池需要调大到大于8否则会导致不同步
FutureTask不会阻塞主线程
package com.thread; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * @author:xxx * @TODO:使用callable进行自增 */ public class AddCallable { public static void main(String[] args) { long begin = System.currentTimeMillis(); ExecutorService threadPool = Executors.newCachedThreadPool(); AddTask task01 = new AddTask("Task-01", 1, 50); AddTask task02 = new AddTask("Task-02", 51, 100); //得到的结果集 Future<Long> resultSet01 = threadPool.submit(task01); Future<Long> resultSet02 = threadPool.submit(task02); try { System.out.println("最后的结果是:"+(resultSet01.get()+resultSet02.get())); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } finally{ threadPool.shutdown(); } System.out.println(System.currentTimeMillis() - begin); } }; /** * @author:xxx * @TODO:泛型中的Long表示返回的类型 */ class AddTask implements Callable<Long> { private String name; private long begin; private long end; public AddTask(String name, long begin, long end) { this.name = name; this.begin = begin; this.end = end; } @Override public Long call() throws Exception { System.out.println(Thread.currentThread().getName()); System.out.println(name + " 执行中......."); long sum = 0; for (long i = begin; i <= end; i++) { sum += i; } return sum; } };
例子 2
/** * */ package com.comtop.test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @ProjectName:Skeleton * @PackageName:com.comtop.test * @Verson :0.1 * @CreateUser :lanweixing * @CreateDate :2014-9-3上午9:41:16 * @UseFor : */ public class DoThread { public static void main(String[] args) { List<Num> list = new ArrayList<Num>(); List<Message> result1 = new ArrayList<Message>(); List<Message> result = new ArrayList<Message>(); // ExecutorService pool = Executors.newCachedThreadPool() ; //使用线程池处理 ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 4, 3000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2)); for (int i = 0; i < 1000000; i++) { list.add(new Num(i)); } long begin = System.currentTimeMillis(); try { //1000000条数据5个线程跑 每个线程跑集合中的一部分 for (int i = 0; i < 5; i++) { result1 = pool.submit( new Main4Thread(list, i * 200000, (i + 1) * 200000)) .get(); result.addAll(result1); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } pool.shutdown(); System.out.println("所用时间" + (System.currentTimeMillis() - begin)); System.out.println(result.size()); } }; class Main4Thread implements Callable<List<Message>> { private List<Num> num; private int begin; private int end; /** * @param num */ Main4Thread(List<Num> num, int begin, int end) { super(); this.num = num; this.begin = begin; this.end = end; } @Override public List<Message> call() throws Exception { List<Message> msgList = new ArrayList<Message>(); for (int i = begin; i < end; i++) { if (num.get(i).getAge() >= 500000) { //线程逻辑处理 Message msg = new Message(); msg.setMsg(num.get(i).getAge() + " Too old"); msgList.add(msg); } } System.out.println(Thread.currentThread().getName()); return msgList; } }; class Message { private String msg; public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } }; class Num { private int age; /** * @param age */ Num(int age) { super(); this.age = age; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } };
3.使用线程处理项目中的需求
需求:三个线程并行处理三个SQL语句查询
线程类
@Scope("prototype") //线程类需要注解到spring中 不然无法使用具体DAO操作 @Service public class MainExcutor implements Callable<List<XXXVO>> { // 日志记录 private final static Logger logger = LoggerFactory .getLogger(MainExcutor.class); @Resource private XXXDAO xxxDAO; private XXXVO xxxVO; public XXXVO getXXXVO() { return xxxVO; } public void setXXXVO(XXXVO xxxVO) { this.xxxVO = xxxVO; } MainExcutor() { super(); } @Override public List<XXXVO> call() throws Exception { logger.info(" 线程查询数据中。。。"); List<XXXVO> resultList = new ArrayList<XXXVO>(); resultList = this.xxxDAO .queryXXXData(xxxVO); logger.info(" 线程查询数据结束。。。"); return resultList; } };
XXXDAO代码片段
//三个线程实例 @Resource private MainExcutor mainExcutor1; @Resource private MainExcutor mainExcutor2; @Resource private MainExcutor mainExcutor3; Map<String, Callable<List<XXXVO>>> taskMap = new HashMap<String, Callable<List<XXXVO>>>(); // 三个参数对象各不相同 BeanCopier beanCopier = BeanCopier.create(XXXVO.class, XXXVO.class, false); XXXVO xxxVO02 = new XXXVO(); XXXVO xxxVO03 = new ElectricTopicVO(); beanCopier.copy(xxxVO, xxxVO02, null); beanCopier.copy(xxxVO, xxxVO03 , null); // 1 2 3 xxxVO.setType("1"); this.mainExcutor1.setXXXVO(xxxVO); taskMap.put("Task1", this.mainExcutor1); xxxVO02.setType("2"); this.mainExcutor2.setXXXVO(xxxVO02); taskMap.put("Task2", this.mainExcutor2); xxxVO03 .setType("3"); this.mainExcutor3.setXXXVO(xxxVO03); taskMap.put("Task3", this.mainExcutor3); MessageThreadPoolExecutor executor = new MessageThreadPoolExecutor(3, 4, 3000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(4)); try { Map<String, List<XXXVO>> resultMap = executor .performTasks(taskMap); for (String key : resultMap.keySet()) { tempList = resultMap.get(key); resultList.addAll(tempList); } } catch (InterruptedException e1) { e1.printStackTrace(); }
线程池实现类(实现并发执行的关键)
class MessageThreadPoolExecutor extends ThreadPoolExecutor { // 把父类的构造函数全弄出来算了。。。 public MessageThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } public MessageThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } public MessageThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public MessageThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } // 线程池主要执行方法 public <T> Map<String, T> performTasks(Map<String, Callable<T>> taskMap) throws InterruptedException { // 无任务集合 if (taskMap == null || taskMap.isEmpty()) { throw new NullPointerException(); } Map<String, Future<T>> futureMap = new HashMap<String, Future<T>>(); Map<String, T> messageMap = new HashMap<String, T>(); boolean done = false; try { for (String key : taskMap.keySet()) { futureMap.put(key, submit(taskMap.get(key))); } for (String key : futureMap.keySet()) { Future<T> f = futureMap.get(key); try { T result = f.get(); messageMap.put(key, result); } catch (ExecutionException e) { System.out.println(e.getMessage()); } } done = true; return messageMap; } finally { if (!done) { for (String key : futureMap.keySet()) { futureMap.get(key).cancel(true); } } this.shutdown(); } } };
多线程调用多线程需要注意的是子线程的线程池需要调大到大于8否则会导致不同步
import java.util.HashMap; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; public class Test implements Callable<String> { private static final MainPoolExecutor executor = new MainPoolExecutor(3, 4, 3000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>( 10)); private String name; public String getName() { return name; } public void setName(String name) { this.name = name; } public Test() { super(); } public Test(String name) { this.name = name; } @Override public String call() throws Exception { System.out.println(name + " 开始!"); Thread.sleep(1000); System.out.println(name + " 结束!"); return null; } public static void main(String[] args) { SuperTest[] testArr = new SuperTest[] { new SuperTest("父线程1"), new SuperTest("父线程2"), new SuperTest("父线程3") }; Map<String, Callable<String>> taskMap = new HashMap<String, Callable<String>>(); try { for (SuperTest test : testArr) { taskMap.put(String.valueOf(Math.random()), test); } executor.performTasks(taskMap); } catch (InterruptedException e) { e.printStackTrace(); } finally { executor.shutdown(); } } }; class SuperTest implements Callable<String> { // 一共有9个线程所以 corePoolSize 和 maximumPoolSize得设置成大于等于9的数字 否则不同步 private static final MainPoolExecutor executor = new MainPoolExecutor(9, 9, 3000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>( 10)); private String name; public String getName() { return name; } public SuperTest() { super(); } public SuperTest(String name) { this.name = name; } @Override public String call() { Test[] testArr = new Test[] { new Test(name + " 子线程1"), new Test(name + " 子线程2"), new Test(name + " 子线程3") }; Map<String, Callable<String>> taskMap = new HashMap<String, Callable<String>>(); for (Test test : testArr) { taskMap.put(String.valueOf(Math.random()), test); } try { executor.performTasks(taskMap); } catch (InterruptedException e) { e.printStackTrace(); } finally { executor.shutdown(); } return null; } }
FutureTask不会阻塞主线程
import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * FutureTask线程执行不会阻塞主线程 * @author Test * */ public class NoInterruptTest implements Callable<String> { private long longTime; // 锁对象 需要放外部 否则 private final ReadWriteLock lock =new ReentrantReadWriteLock(); public long getLongTime() { return longTime; } public void setLongTime(long longTime) { this.longTime = longTime; } public NoInterruptTest() { super(); } public NoInterruptTest(long longTime) { this.longTime = longTime; } @Override public String call() throws Exception { // synchronized 用于控制同一个对象的多个线程访问 lock.writeLock().lock(); System.out.println(Thread.currentThread().getName()+" 开始!"); Thread.sleep(longTime); System.out.println(Thread.currentThread().getName()+" 结束!"); lock.writeLock().unlock() ; return "lanweixing"; } public static void main(String[] args) throws Exception{ long n1 = System.currentTimeMillis() ; NoInterruptTest callable1 = new NoInterruptTest(3000); NoInterruptTest callable2 = new NoInterruptTest(3000); ExecutorService executorService = Executors.newCachedThreadPool(); FutureTask<String> task1 = new FutureTask<String>(callable1); FutureTask<String> task2 = new FutureTask<String>(callable1); executorService.execute(task1); executorService.execute(task2); while(true) { if(task1.isDone() && task2.isDone()) { // 获得线程执行结果 System.out.println(task1.get()); System.out.println("任务已处理。"); break ; } } System.out.println((System.currentTimeMillis()-n1)/1000); executorService.shutdown(); } };
发表评论
文章已被作者锁定,不允许评论。
-
java WeakHashMap学习(key是弱引用)
2018-06-21 09:31 1237在Java集合中有一种特殊的Map类型:WeakHashMap ... -
java HashMap TreeMap(key顺序) LinkedHashMap(插入顺序)学习
2018-06-07 10:27 956java为数据结构中的映射定义了一个接口java.util.M ... -
java RESTful 详解
2018-04-27 11:35 646(1)每一个URI代表一种资源,独一无二; (2)客户端 ... -
java 通过HttpsUrlConnection访问接口数据
2018-04-19 11:25 998server: ssl: key-stor ... -
java 使用多线程的场景总结
2018-04-10 14:35 1708在一个高并发的网站中,多线程是必不可少的。下面先说一下多线程在 ... -
java Enum枚举设置
2018-04-10 10:55 482/** * 数据状态:0:无效,1:有效 **/ ... -
java RestTemplate访问restful服务
2018-03-01 15:02 1625REST的基础知识 当谈论REST时,有一种常见的错误就是将其 ... -
java FYOpenApi实现短信发送
2018-01-02 17:10 11811.配置文件 sms.OpenUrl = http://s ... -
java JSONObject序列化包含Date类型数据的Java对象
2017-12-26 16:31 1622如果Date.class无法进行转换则使用Timestamp. ... -
java 用HttpsURLConnection进行传递中文时错误总结
2017-12-07 16:42 656传递中文时需要用Writer而不是OutputStream ... -
java 内存泄漏
2017-11-27 13:51 4981.内存溢出 out of memory ... -
ActiveMQ 三种发送消息方式(同步,异步,单向)
2017-11-17 10:25 2463MQ 发送普通消息有三种实现方式:可靠同步发送、可靠异步发送、 ... -
java Guava ListenableFuture实现线程回调功能
2017-11-14 10:17 1777java Future具有局限性。在实际应用中,当需要下 ... -
java Curator实现分布式锁
2017-09-05 14:39 1092Curator实现分布式锁主要依赖于zookeeper ... -
java Guava工具集学习(强大)
2017-09-05 10:28 436import java.util.Iterator ... -
java CyclicBarrier进行并发编程
2017-08-25 15:44 676CyclicBarrier允许一组线程相互等待达到一个公共的障 ... -
java 几种性能优化的总结
2017-08-23 14:08 3281、使用StringBuilder 一般 ... -
java 使用kyro进行高性能序列化对象和集合
2017-08-23 14:05 2160import java.io.ByteArrayInp ... -
java 对重复电话号码进行排除的优化(排序和前后对比)
2017-08-22 14:14 7951.先对10万数据排序; 2.对比前后两条数据 ; 3.筛 ... -
ActiveMQ 结合Spring进行数据同步
2017-07-19 15:27 585注意事项hibernate配置文件必须设置自动提交否则不能插入 ...
相关推荐
Java使用Callable和Future创建线程操作示例主要介绍了Java使用Callable和Future创建线程操作,结合实例形式分析了java使用Callable接口和Future类创建线程的相关操作技巧与注意事项。 首先,Java 5开始,Java提供了...
这篇教程将深入探讨如何在Java中使用Callable。 Callable接口位于`java.util.concurrent`包下,它的主要方法是`call()`,这个方法相当于Runnable的`run()`,但`call()`可以有返回值和抛出异常。当我们需要在线程中...
基于Java Callable接口实现线程代码实例 Java Callable接口简介 Java Callable接口是Java中的一种特殊接口,用于实现线程的返回值。该接口提供了一个call()方法,该方法可以返回一个值,用于获取线程执行的结果。 ...
在这个例子中,我们定义了一个`MyCallableClass`类实现`Callable`接口,并根据`flag`的值返回不同的结果或抛出异常。主函数中,我们创建了一个固定大小的线程池,然后提交了三个`Callable`任务,并通过`Future`对象...
在Java中,有三种常见的创建线程的方式:继承Thread类、实现Runnable接口以及使用Callable接口。下面将对这三种方式的使用、优缺点进行详细对比。 1. 继承Thread类 这种方式是最直接的创建线程的方法,通过创建...
Java是一种广泛使用的面向对象的编程语言,以其跨平台、高性能和强大的库支持而闻名。"100个Java经典例子后端- Java"这个资源显然旨在帮助开发者通过实践加深对Java的理解,尤其在后端开发领域。下面我们将深入探讨...
Callable 和 Future 是 Java 多线程编程中的两个关键接口,它们在 Java 5 及以上版本引入,以增强并发处理的能力。Callable 接口类似 Runnable,但提供了更丰富的功能,而 Future 对象则用于管理和控制异步计算的...
当你有一个Callable任务并希望将其作为线程执行时,可以创建一个FutureTask实例,然后将这个实例传递给Executor。 下面简要说明它们之间的关系和用法: - Runnable通常用于简单的任务,不关心结果的返回。 - ...
FutureTask扮演了适配器的角色,它组合了一个Callable实例,并通过自己的run()方法调用了Callable的call()方法,实现了Runnable接口的要求。这种设计使得不兼容的接口之间能够协同工作,提高了代码的复用性和灵活性...
在这个例子中,没有显示地使用同步机制,可能需要根据实际情况考虑使用Collections.synchronizedList()或者并发集合如ConcurrentHashMap。 8. **时间单位**:`future.get(1, TimeUnit.SECONDS)` 方法设置超时时间,...
例如,`TestCyclicBarrier`和`TestCyclicBarrier2`可能包含了如何创建和使用`CyclicBarrier`来同步多个任务的实例。 2. **BlockingQueue**:阻塞队列是一种线程安全的数据结构,它支持插入和移除操作,并且在队列满...
Java提供了wait()、notify()和notifyAll()方法进行线程间的通信,但这需要在同步控制块(synchronized)中使用。Java 5引入了BlockingQueue阻塞队列,提供了一种更安全的线程间通信方式,如ArrayBlockingQueue、...
本文将深入探讨如何在Java中使用线程池来查询大量数据,以及这样做的好处和实现方法。 首先,理解线程池的概念至关重要。线程池是一种多线程处理形式,预先创建了若干个线程,当有任务需要执行时,会从线程池中取出...
Java并发编程Callable与Future的应用实例代码 在Java并发编程中,Callable与Future是两个非常重要的概念,它们通常结合使用来实现异步编程。在本文中,我们将详细介绍Callable与Future的应用实例代码,并探究它们在...
在JAVA编程领域,掌握经典实例是提升技能的关键步骤。这些实例涵盖了广泛的编程概念和技术,能够帮助初学者和有经验的开发者巩固基础,提高问题解决能力。"JAVA经典的40个实例"提供了40个精心挑选的编程案例,每个...
Java多线程之Callable接口的实现 Java中的多线程编程是指在一个程序中同时运行多个线程,以提高程序的执行效率和响应速度。在Java中,有两种方式可以实现多线程编程,即继承Thread类和实现Runnable接口。然而,在...
Java多线程是Java编程中的重要概念,它允许程序同时执行多个任务,提高了程序的效率和响应性。在Java中,实现多线程有两种主要方式:通过继承`Thread`类和实现`Runnable`接口。 1. 继承Thread类: 当你需要创建一...
在上述代码中,executorService是ExecutorService的一个实例,它可以管理和控制线程的执行,而Future对象则用来获取Callable任务的执行结果。 了解这些技术对于处理大型数据处理、并发编程和优化代码质量至关重要。...
实例可能包含TCP和UDP通信的例子。 10. **GUI编程**:Java的Swing和JavaFX库可用于创建图形用户界面。实例会指导你创建窗口、按钮、文本框等组件,并响应用户事件。 11. **数据库连接**:Java JDBC(Java Database...
大家都知道Runnable和Callable接口都可以作为其他线程执行的任务,...这个例子并不摘自JDK中Callable接口源码,只是提供一种可行方案,仅供参考!该Demo解压、导入到开发环境即可,测试入口为test包下的MainTest.java