<span style="font-family: Arial, Helvetica, sans-serif;">import java.util.ArrayList;</span>
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.io.IntWritable;
public class ArrayTest {
public static void main(String[] args) {
long start = System.currentTimeMillis() ;
ArrayTest blt = new ArrayTest() ;
List<IntWritable> messages = new ArrayList<IntWritable>();
for(int i =0 ;i<10000; i++) {
messages.add(new IntWritable(i)) ;
}
// RejectedExecutionHandler retryHandler = new RetryRejectedExecutionHandler();
// ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
// .newCachedThreadPool();
// executor.setMaximumPoolSize(64);
// executor.setRejectedExecutionHandler(retryHandler);
//
// for (IntWritable i : messages) {
// executor.execute(blt.new Worker(i));
// }
//
// executor.shutdown();
// try {
// executor.awaitTermination(60, TimeUnit.SECONDS);
// } catch (Exception e) {
// e.printStackTrace() ;
// }
ExecutorService pool = Executors.newCachedThreadPool(); // 并发测试
CompletionService<Boolean> exchangeResult = new ExecutorCompletionService<Boolean>(
pool);
int destSize = 0;
for(IntWritable i : messages) {
exchangeResult.submit(blt.new Worker(i));
destSize++;
}
int count = 0;
while (count < destSize) {
Future<Boolean> f = exchangeResult.poll();
if (f == null)
continue;
count++ ;
}
try {
pool.awaitTermination(60, TimeUnit.SECONDS);
} catch (Throwable e) {
e.printStackTrace();
}
System.out.println("Last " + (System.currentTimeMillis() - start) + " ms");
}
class Worker implements Callable<Boolean> {
IntWritable msg;
public Worker(IntWritable msg) {
this.msg = msg;
}
@Override
public Boolean call() throws Exception {
try {
int sum = msg.get() ;
} catch (Exception e) {
e.printStackTrace() ;
}
return true;
}
}
// class Worker implements Runnable {
// IntWritable msg;
//
// public Worker(IntWritable msg) {
// this.msg = msg;
// }
//
// @Override
// public void run() {
// try {
// int sum = msg.get() ;
// } catch (Exception e) {
// e.printStackTrace() ;
// }
// }
// }
}
class RetryRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace() ;
}
executor.execute(r);
}
}
第一种:Last 60186 ms
第二种:
package concurrencyTest;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.io.IntWritable;
public class ArrayTest {
public static void main(String[] args) {
long start = System.currentTimeMillis() ;
ArrayTest blt = new ArrayTest() ;
List<IntWritable> messages = new ArrayList<IntWritable>();
for(int i =0 ;i<10000; i++) {
messages.add(new IntWritable(i)) ;
}
RejectedExecutionHandler retryHandler = new RetryRejectedExecutionHandler();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
.newCachedThreadPool();
executor.setMaximumPoolSize(64);
executor.setRejectedExecutionHandler(retryHandler);
for (IntWritable i : messages) {
executor.execute(blt.new Worker(i));
}
executor.shutdown();
try {
executor.awaitTermination(60, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace() ;
}
// ExecutorService pool = Executors.newCachedThreadPool(); // 并发测试
// CompletionService<Boolean> exchangeResult = new ExecutorCompletionService<Boolean>(
// pool);
// int destSize = 0;
// for(IntWritable i : messages) {
// exchangeResult.submit(blt.new Worker(i));
// destSize++;
// }
//
// int count = 0;
// while (count < destSize) {
// Future<Boolean> f = exchangeResult.poll();
// if (f == null)
// continue;
// count++ ;
// }
// try {
// pool.awaitTermination(60, TimeUnit.SECONDS);
// } catch (Throwable e) {
// e.printStackTrace();
// }
System.out.println("Last " + (System.currentTimeMillis() - start) + " ms");
}
// class Worker implements Callable<Boolean> {
// IntWritable msg;
//
// public Worker(IntWritable msg) {
// this.msg = msg;
// }
//
// @Override
// public Boolean call() throws Exception {
// try {
// int sum = msg.get() ;
// } catch (Exception e) {
// e.printStackTrace() ;
// }
// return true;
// }
// }
class Worker implements Runnable {
IntWritable msg;
public Worker(IntWritable msg) {
this.msg = msg;
}
@Override
public void run() {
try {
int sum = msg.get() ;
} catch (Exception e) {
e.printStackTrace() ;
}
}
}
}
class RetryRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace() ;
}
executor.execute(r);
}
}
Last 271 ms
可能是前一种写法有问题,导致性能很差,但是奇怪没有报错!
第一种写法,即使用Callable<Boolean>在线程数较少时(<100),没有问题,但是线程数较大,如100,000,则会由于资源不足而阻塞,性能急剧降低。
第二种写法则不存在此问题。
分享到:
相关推荐
在Java并发编程中,`Executor`、`Executors`和`ExecutorService`是核心组件,它们帮助开发者高效管理线程资源,提高程序的并发性能。理解这三个概念的区别和用途是编写高性能并发程序的关键。 1. **Executor** `...
Java并发编程是现代软件开发中的重要领域,尤其在多核处理器普及后,高效地利用并发已成为提高系统性能的关键。本文将全面介绍Java并发编程的基础知识、JVM同步原语、线程安全、低级并发工具、线程安全容器、高级...
高并发编程第三阶段12讲 sun.misc.Unsafe介绍以及几种Counter方案性能对比.mp4 高并发编程第三阶段13讲 一个JNI程序的编写,通过Java去调用C,C++程序.mp4 高并发编程第三阶段14讲 Unsafe中的方法使用,一半是...
5. **原子变量类**:AtomicInteger、AtomicLong等,它们提供了无锁编程的能力,通过CAS(Compare and Swap,比较并交换)操作实现原子性更新,提高了并发性能。 6. **线程池**:Executor框架是Java并发编程的重要...
- **并发集合(Concurrent Collections)**:如`ConcurrentHashMap`、`ConcurrentLinkedQueue`等,提供了线程安全的数据结构,减少了对锁的依赖,提高了并发性能。 ### 并发策略 在设计并发程序时,需要考虑以下...
并发容器是另一种提升并发性能的方式,如ArrayList和Vector的对比,ArrayList是非线程安全的,而Vector是线程安全但效率较低。ConcurrentHashMap作为线程安全的哈希表,其采用分段锁策略,提供了高效并发访问。另外...
在多处理器环境中,非统一内存访问(NUMA)架构、硬件共享内存、内存栅栏和CPU缓存等都是影响并发性能的因素。 13. Java并发编程展望 摩尔定律是衡量硬件发展速度的重要指标,它预言了处理器性能的增长趋势。Java...
高并发编程第三阶段12讲 sun.misc.Unsafe介绍以及几种Counter方案性能对比.mp4 高并发编程第三阶段13讲 一个JNI程序的编写,通过Java去调用C,C++程序.mp4 高并发编程第三阶段14讲 Unsafe中的方法使用,一半是...
如何识别可并行执行的任务,如何提高单线程子系统的响应性,如何确保并发程序执行预期任务,如何提高并发代码的性能和可伸缩性等内容,最后介绍了一些高级主题,如显式锁、原子变量、非阻塞算法以及如何开发自定义的...
多线程与高并发是计算机科学中非常重要的两个概念,它们在提高软件程序的性能、响应速度和资源利用率方面起着至关重要的作用。在当今的互联网时代,特别是在产业互联网和5G技术的推动下,多线程和高并发的应用变得...
《Java并发编程艺术》这本书是Java开发者深入理解多线程...通过阅读《Java并发编程艺术》,开发者可以深入了解Java并发编程的细节,掌握在实际开发中高效、安全地利用多线程的技能,从而提升应用程序的性能和稳定性。
- **线程池管理**:Executor框架、ThreadPoolExecutor类详解。 #### 第二阶段:高级技术与最佳实践 ##### 1. 锁优化技术 - **公平锁与非公平锁**:锁获取策略对比及应用场景分析。 - **自旋锁**:原理、优缺点及...
它通过将数据分成多个段(segment),每个段有自己的锁,这样可以同时对不同的段进行操作而不会发生冲突,从而提高并发性能。在Java 8及以后版本中,`ConcurrentHashMap`进一步优化了设计,采用了一种更加灵活的分段锁...
CAS的优势在于它避免了锁的开销,减少了上下文切换,提高了并发性能。然而,CAS也有其局限性,比如ABA问题。如果一个值从A变为B,然后再变回A,CAS可能会认为没有发生任何变化,从而导致潜在的问题。为了解决这个...
Executor 框架可以解决每次执行任务创建线程 new Thread() 比较消耗性能的问题,提供了线程池来管理线程,避免了野线程和线程之间的竞争。 8. Executor、Executors 和 ExecutorService 的区别 Executors 工具类的...
- 对比优化前后的性能指标,分析性能提升的具体数值。 #### 五、总结 通过对Tomcat服务器的配置调整,尤其是运行模式的选择和线程池的合理配置,可以有效提升系统的并发处理能力和响应速度。同时,合理规划服务器...
源码中会包含这些并发集合的使用场景和性能比较。 4. **Executor框架**:Java 5引入了`ExecutorService`、`ThreadPoolExecutor`和`ScheduledExecutorService`,它们简化了线程池的创建和管理。通过源码,我们可以...
- **读写锁(ReadWriteLock)**:允许多个线程同时读取,但写入时互斥,提高并发性能。 3. **原子操作**: - **java.util.concurrent.atomic包**:包含一系列原子类,如AtomicInteger、AtomicLong等,它们提供了...