`
cloudeagle_bupt
  • 浏览: 579074 次
文章分类
社区版块
存档分类
最新评论

Executor并发性能对比

 
阅读更多
<span style="font-family: Arial, Helvetica, sans-serif;">import java.util.ArrayList;</span>
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.io.IntWritable;

public class ArrayTest {

	public static void main(String[] args) {
		long start = System.currentTimeMillis() ;
		ArrayTest blt = new ArrayTest() ;
		List<IntWritable> messages = new ArrayList<IntWritable>();
		
		for(int i =0 ;i<10000; i++) {
			 messages.add(new IntWritable(i)) ;
		}
//		RejectedExecutionHandler retryHandler = new RetryRejectedExecutionHandler();		
//	    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
//	            .newCachedThreadPool();
//	    executor.setMaximumPoolSize(64);
//	    executor.setRejectedExecutionHandler(retryHandler);
//	    
//	     for (IntWritable i : messages) {
//	        executor.execute(blt.new Worker(i));
//	      }
//
//	      executor.shutdown();
//	      try {
//	        executor.awaitTermination(60, TimeUnit.SECONDS);
//	      } catch (Exception e) {
//	        e.printStackTrace() ;
//	      }
		
		ExecutorService pool = Executors.newCachedThreadPool(); // 并发测试
		CompletionService<Boolean> exchangeResult = new ExecutorCompletionService<Boolean>(
				pool);
		int destSize = 0;
		for(IntWritable i : messages) {
			exchangeResult.submit(blt.new Worker(i));
			destSize++;
		}
		
		int count = 0;
		while (count < destSize) {
			Future<Boolean> f = exchangeResult.poll();
			if (f == null)
				continue;
			count++ ;
		}
	    try {
			pool.awaitTermination(60, TimeUnit.SECONDS);
		} catch (Throwable e) {
			e.printStackTrace();
		}
		System.out.println("Last " + (System.currentTimeMillis() - start) + " ms");
	}

	  class Worker implements  Callable<Boolean> {
			IntWritable msg;

			public Worker(IntWritable msg) {
				this.msg = msg;
			}
			
			@Override
			public Boolean call() throws Exception {
				try {
					 int sum = msg.get() ;
				} catch (Exception e) {
					e.printStackTrace() ;
				}
				return true;
			}
		}
	
//	  class Worker implements  Runnable {
//			IntWritable msg;
//
//			public Worker(IntWritable msg) {
//				this.msg = msg;
//			}
//
//			@Override
//			public void run() {
//				try {
//					 int sum = msg.get() ;
//				} catch (Exception e) {
//					e.printStackTrace() ;
//				}
//			}
//		}
	  
}

class RetryRejectedExecutionHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace() ;
      }
      executor.execute(r);
    }
}



第一种:Last 60186 ms


第二种:


package concurrencyTest;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.io.IntWritable;

public class ArrayTest {

	public static void main(String[] args) {
		long start = System.currentTimeMillis() ;
		ArrayTest blt = new ArrayTest() ;
		List<IntWritable> messages = new ArrayList<IntWritable>();
		
		for(int i =0 ;i<10000; i++) {
			 messages.add(new IntWritable(i)) ;
		}
		RejectedExecutionHandler retryHandler = new RetryRejectedExecutionHandler();		
	    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
	            .newCachedThreadPool();
	    executor.setMaximumPoolSize(64);
	    executor.setRejectedExecutionHandler(retryHandler);
	    
	     for (IntWritable i : messages) {
	        executor.execute(blt.new Worker(i));
	      }

	      executor.shutdown();
	      try {
	        executor.awaitTermination(60, TimeUnit.SECONDS);
	      } catch (Exception e) {
	        e.printStackTrace() ;
	      }
		
//		ExecutorService pool = Executors.newCachedThreadPool(); // 并发测试
//		CompletionService<Boolean> exchangeResult = new ExecutorCompletionService<Boolean>(
//				pool);
//		int destSize = 0;
//		for(IntWritable i : messages) {
//			exchangeResult.submit(blt.new Worker(i));
//			destSize++;
//		}
//		
//		int count = 0;
//		while (count < destSize) {
//			Future<Boolean> f = exchangeResult.poll();
//			if (f == null)
//				continue;
//			count++ ;
//		}
//	    try {
//			pool.awaitTermination(60, TimeUnit.SECONDS);
//		} catch (Throwable e) {
//			e.printStackTrace();
//		}
		System.out.println("Last " + (System.currentTimeMillis() - start) + " ms");
	}

//	  class Worker implements  Callable<Boolean> {
//			IntWritable msg;
//
//			public Worker(IntWritable msg) {
//				this.msg = msg;
//			}
//			
//			@Override
//			public Boolean call() throws Exception {
//				try {
//					 int sum = msg.get() ;
//				} catch (Exception e) {
//					e.printStackTrace() ;
//				}
//				return true;
//			}
//		}
	
	  class Worker implements  Runnable {
			IntWritable msg;

			public Worker(IntWritable msg) {
				this.msg = msg;
			}

			@Override
			public void run() {
				try {
					 int sum = msg.get() ;
				} catch (Exception e) {
					e.printStackTrace() ;
				}
			}
		}
	  
}

class RetryRejectedExecutionHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace() ;
      }
      executor.execute(r);
    }
}


Last 271 ms

可能是前一种写法有问题,导致性能很差,但是奇怪没有报错!

第一种写法,即使用Callable<Boolean>在线程数较少时(<100),没有问题,但是线程数较大,如100,000,则会由于资源不足而阻塞,性能急剧降低。
第二种写法则不存在此问题。


分享到:
评论

相关推荐

    Executor,Executors,ExecutorService比较.docx

    在Java并发编程中,`Executor`、`Executors`和`ExecutorService`是核心组件,它们帮助开发者高效管理线程资源,提高程序的并发性能。理解这三个概念的区别和用途是编写高性能并发程序的关键。 1. **Executor** `...

    并发编程实践,全面介绍基础知识、JVM同步原语、线程安全、低级并发工具、线程安全容器、高级线程协作工具、Executor部分等

    Java并发编程是现代软件开发中的重要领域,尤其在多核处理器普及后,高效地利用并发已成为提高系统性能的关键。本文将全面介绍Java并发编程的基础知识、JVM同步原语、线程安全、低级并发工具、线程安全容器、高级...

    汪文君高并发编程实战视频资源下载.txt

     高并发编程第三阶段12讲 sun.misc.Unsafe介绍以及几种Counter方案性能对比.mp4  高并发编程第三阶段13讲 一个JNI程序的编写,通过Java去调用C,C++程序.mp4  高并发编程第三阶段14讲 Unsafe中的方法使用,一半是...

    Java 并发编程实战 中英文+代码示例

    5. **原子变量类**:AtomicInteger、AtomicLong等,它们提供了无锁编程的能力,通过CAS(Compare and Swap,比较并交换)操作实现原子性更新,提高了并发性能。 6. **线程池**:Executor框架是Java并发编程的重要...

    java自带并发框架

    - **并发集合(Concurrent Collections)**:如`ConcurrentHashMap`、`ConcurrentLinkedQueue`等,提供了线程安全的数据结构,减少了对锁的依赖,提高了并发性能。 ### 并发策略 在设计并发程序时,需要考虑以下...

    java并发源码分析之实战编程

    并发容器是另一种提升并发性能的方式,如ArrayList和Vector的对比,ArrayList是非线程安全的,而Vector是线程安全但效率较低。ConcurrentHashMap作为线程安全的哈希表,其采用分段锁策略,提供了高效并发访问。另外...

    Java并发编程全景图.pdf

    在多处理器环境中,非统一内存访问(NUMA)架构、硬件共享内存、内存栅栏和CPU缓存等都是影响并发性能的因素。 13. Java并发编程展望 摩尔定律是衡量硬件发展速度的重要指标,它预言了处理器性能的增长趋势。Java...

    汪文君高并发编程实战视频资源全集

     高并发编程第三阶段12讲 sun.misc.Unsafe介绍以及几种Counter方案性能对比.mp4  高并发编程第三阶段13讲 一个JNI程序的编写,通过Java去调用C,C++程序.mp4  高并发编程第三阶段14讲 Unsafe中的方法使用,一半是...

    Java并发编程实战

    如何识别可并行执行的任务,如何提高单线程子系统的响应性,如何确保并发程序执行预期任务,如何提高并发代码的性能和可伸缩性等内容,最后介绍了一些高级主题,如显式锁、原子变量、非阻塞算法以及如何开发自定义的...

    多线程与高并发-电子.pdf

    多线程与高并发是计算机科学中非常重要的两个概念,它们在提高软件程序的性能、响应速度和资源利用率方面起着至关重要的作用。在当今的互联网时代,特别是在产业互联网和5G技术的推动下,多线程和高并发的应用变得...

    《java并发编程艺术》

    《Java并发编程艺术》这本书是Java开发者深入理解多线程...通过阅读《Java并发编程艺术》,开发者可以深入了解Java并发编程的细节,掌握在实际开发中高效、安全地利用多线程的技能,从而提升应用程序的性能和稳定性。

    高并发编程实战1,2,3阶段

    - **线程池管理**:Executor框架、ThreadPoolExecutor类详解。 #### 第二阶段:高级技术与最佳实践 ##### 1. 锁优化技术 - **公平锁与非公平锁**:锁获取策略对比及应用场景分析。 - **自旋锁**:原理、优缺点及...

    Java并发编程与高并发解决方案之并发容器(J.U.C).docx

    它通过将数据分成多个段(segment),每个段有自己的锁,这样可以同时对不同的段进行操作而不会发生冲突,从而提高并发性能。在Java 8及以后版本中,`ConcurrentHashMap`进一步优化了设计,采用了一种更加灵活的分段锁...

    并发编程之一 日常学习笔记

    CAS的优势在于它避免了锁的开销,减少了上下文切换,提高了并发性能。然而,CAS也有其局限性,比如ABA问题。如果一个值从A变为B,然后再变回A,CAS可能会认为没有发生任何变化,从而导致潜在的问题。为了解决这个...

    并发编程 75 道面试题及答案.docx

    Executor 框架可以解决每次执行任务创建线程 new Thread() 比较消耗性能的问题,提供了线程池来管理线程,避免了野线程和线程之间的竞争。 8. Executor、Executors 和 ExecutorService 的区别 Executors 工具类的...

    Tomcat7性能优化.pdf

    - 对比优化前后的性能指标,分析性能提升的具体数值。 #### 五、总结 通过对Tomcat服务器的配置调整,尤其是运行模式的选择和线程池的合理配置,可以有效提升系统的并发处理能力和响应速度。同时,合理规划服务器...

    java并发编程艺术源码-ArtConcurrentBook:《Java并发编程的艺术》

    源码中会包含这些并发集合的使用场景和性能比较。 4. **Executor框架**:Java 5引入了`ExecutorService`、`ThreadPoolExecutor`和`ScheduledExecutorService`,它们简化了线程池的创建和管理。通过源码,我们可以...

    通过多线程编程在Java中发现的并发模式和特性——线程、锁、原子等.zip

    - **读写锁(ReadWriteLock)**:允许多个线程同时读取,但写入时互斥,提高并发性能。 3. **原子操作**: - **java.util.concurrent.atomic包**:包含一系列原子类,如AtomicInteger、AtomicLong等,它们提供了...

Global site tag (gtag.js) - Google Analytics