基础
在我看来,java比C++的一个大好处就是提供了对多线程的支持(C++只有多线程的库,语言本身不包含线程的概念)。而其中我最爱用的就是ThreadPoolExecutor这个类,它实现了一个非常棒的thread pool。
thread pool一般被用来解决两个问题:当处理大量的同步task的时候,它能够避免thread不断创建销毁的开销;而另外一个也许更重要的含义是,它其实表示了一个boundary,通过使用thread pool可以限制这些任务所消耗的资源,比如最大线程数,比如最大的消息缓冲池。
需要指出的是,ThreadPoolExecutor不仅仅是简单的多个thread的集合,它还带有一个消息队列。
在Java中,如果只是需要一个简单的thread pool,ExecuteService可能更为合适,这是一个Interface。可以通过调用Executor的静态方法来获得一些简单的threadpool,如:
- ExecuteService pool = Executors.newFixedThreadPool(poolSize);
但如果要用定制的thread pool,则要使用ThreadPoolExecutor类,这是一个高度可定制的线程池类,下面是一些重要的参数和方法:
corePoolSize 和 maxPoolSize
这两个参数其实和threadpool的调度策略密切相关:
如果poolsize小于coresize,那么只要来了一个request,就新创建一个thread来执行;
如果poolsize已经大于或等于coresize,那么来了一个request后,就放进queue中,等来线程执行;
一旦且只有queue满了,才会又创建新的thread来执行;
当然,coresize和maxpoolsize可以在运行时通过set方法来动态的调节;
(queue如果是一个确定size的队列,那么很有可能发生reject request的事情(因为队列满了)。很多人会认为这样的系统不好。但其实,reject request很多时候是个好事,因为当负载大于系统的capacity的时候,如果不reject request,系统会出问题的。)
ThreadFactory
可以通过设置默认的ThreadFactory来改变threadpool如何创建thread
keep-alive time
如果实际的线程数大于coresize,那么这些超额的thread过了keep-alive的时间之后,就会被kill掉。这个时间是可以动态设定的;
queue
任何一个BlockingQueue都可以做为threadpool中的队列,又可以分为三种:
AsynchronousQueue,采用这种queue,任何的task会被直接交到thread手中,queue本身不缓存任何的task,所以如果所有的线程在忙的话,新进入的task是会被拒绝的;
LinkedBlockingQueue,queue的size是无限的,根据前面的调度策略可知,thread的size永远也不会大于coresize;
ArrayBlockingQueue,这其实是需要仔细调整参数的一种方式。因为通过设定maxsize和queuesize,其实就是设定这个threadpool所能使用的resource,然后试图达到一种性能的最优;(Queue sizes and maximum pool sizes may be traded off for each other: Using large queues and small pools minimizes CPU usage, OS resources, and context-switching overhead, but can lead to artificially low throughput. If tasks frequently block (for example if they are I/O bound), a system may be able to schedule time for more threads than you otherwise allow. Use of small queues generally requires larger pool sizes, which keeps CPUs busier but may encounter unacceptable scheduling overhead, which also decreases throughput. )
此外,还有诸如beforeExecute,afterExecute等方法可以被重写。以上的这些内容其实都可以在ThreadPoolExecutor的javadoc中找到。应该说,ThreadPoolExecutor是可以非常灵活的被设置的,只除了一点,你没办法改变它的调度策略。
一个实例
通过分析一个特殊的ThreadPoolExeuctor的源代码,能够更好的理解它的内部机制和灵活性。
Mina中有一个特殊的ThreadPoolExecutor--org.apache.mina.filter.executor.OrderedThreadPoolExecutor。
这个executor是用来处理从网络中来的请求。它的不同之处在于,对于同一个session来的请求,它能够按照请求到达的时间顺序的执行。举个例子,在一个session中,如果先接收到request A,然后再接收到request B,那么,OrderedThreadPoolExecutor能够保证一定处理完A之后再处理B。而一般的thread pool,会将A和B传递给不同的thread处理,很有可能request B会先于request A完成。
先看看它的构造函数:
- public OrderedThreadPoolExecutor(
- int corePoolSize, int maximumPoolSize,
- long keepAliveTime, TimeUnit unit,
- ThreadFactory threadFactory, IoEventQueueHandler eventQueueHandler) {
- // We have to initialize the pool with default values (0 and 1) in order to
- // handle the exception in a better way. We can't add a try {} catch() {}
- // around the super() call.
- super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit,
- new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy());
- if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {
- throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
- }
- if ((maximumPoolSize == 0) || (maximumPoolSize < corePoolSize)) {
- throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
- }
- // Now, we can setup the pool sizes
- super.setCorePoolSize( corePoolSize );
- super.setMaximumPoolSize( maximumPoolSize );
- // The queueHandler might be null.
- if (eventQueueHandler == null) {
- this.eventQueueHandler = IoEventQueueHandler.NOOP;
- } else {
- this.eventQueueHandler = eventQueueHandler;
- }
- }
这里比较意外的是,它竟然用的是SynchronousQueue?! 也就是说,来了一个task,不会被放入Queue中,而是直接送给某个thread。这和一般的threadpoolExecutor是非常不一样的,因为一旦thread全用满了,task就不能再被接受了。后面我们会看到为什么使用SynchronousQueue。
再看看它的execute函数:
- public void execute(Runnable task) {
- if (shutdown) {
- rejectTask(task);
- }
- // Check that it's a IoEvent task
- checkTaskType(task);
- IoEvent event = (IoEvent) task;
- // Get the associated session
- IoSession session = event.getSession();
- // Get the session's queue of events
- SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
- Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
- boolean offerSession;
- boolean offerEvent = eventQueueHandler.accept(this, event);
- if (offerEvent) {
- // Ok, the message has been accepted
- synchronized (tasksQueue) {
- // Inject the event into the executor taskQueue
- tasksQueue.offer(event);
- if (sessionTasksQueue.processingCompleted) {
- sessionTasksQueue.processingCompleted = false;
- offerSession = true;
- } else {
- offerSession = false;
- }
- //.......
- }
- } else {
- offerSession = false;
- }
- if (offerSession) {
- waitingSessions.offer(session);
- }
- addWorkerIfNecessary();
- //..............
这里有几点需要解释的:
首先是getSessionTaskQueue函数。从这个函数可以看出,对于每一个session,都创建了一个queue来存储它的task。也就是说,同一个session的task被放在了同一个queue中。这是非常关键的地方,后面会看到,正是这个queue保证了同一个session的task能够按照顺序来执行;
其次是waitingSessions.offer(session)这条语句。waitingSessions是OrderedThreadPoolExecutor的一个私有成员,它也是一个queue: BlockingQueue<IoSession> waitingSessions ...;
这个queue里面放的是该threadpool所接收到的每个task所对应的Session,并且,如果两个task对应的是同一个session,那么这个session只会被放进waitingSessions中一次。waitingSession.offer(session)这条语句就是要将session放进queue。而offerSession这个变量和前面的十几行代码就是在判断task所对应的session是否要放入到queue中;
最后一行代码addWorkerIfNecessary();字面上很容易理解,就是判断是否添加worker。可是,worker又是什么呢?
看看Worker这个类:
- private class Worker implements Runnable {
- private volatile long completedTaskCount;
- private Thread thread;
- public void run() {
- thread = Thread.currentThread();
- try {
- for (;;) {
- IoSession session = fetchSession();
- //..........
- try {
- if (session != null) {
- runTasks(getSessionTasksQueue(session));
- }
- } finally {
- idleWorkers.incrementAndGet();
- }
- }
- } finally {
- //.......
- }
- }
- private IoSession fetchSession() {
- //........
- for (;;) {
- try {
- try {
- session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS);
- break;
- } finally {
- //..............
- }
- } catch (InterruptedException e) {
- //........
- }
- }
- return session;
- }
- private void runTasks(SessionTasksQueue sessionTasksQueue) {
- for (;;) {
- //......
- runTask(task);
- }
- }
- private void runTask(Runnable task) {
- beforeExecute(thread, task);
- boolean ran = false;
- try {
- task.run();
- ran = true;
- afterExecute(task, null);
- completedTaskCount ++;
- } catch (RuntimeException e) {
- if (!ran) {
- afterExecute(task, e);
- }
- throw e;
- }
- }
- }
在Worker.run()中,一开始就调用fetchSession(),这个函数从WaitingSessions这个queue中拿出一个Session。然后又调用了runTasks,这个函数会将Session中的那个TaskQueue中的每个Task挨个执行一遍。
OK,现在OrderedThreadPoolExecutor的整体设计就清晰了:
从外面看上去,OrderedThreadPoolExecutor只是一个thread pool,但本质上,它是有由两个thread pool拼接而成, 只不过后一个thread pool被隐藏在了类的内部实现中。第一个thread pool中的thread只需要完成很简单的一个任务,即将接收到的task对应的session添加到waitingSessions中(如果需要的话)。正因为如此,所以第一个threadpool的queue被设置成了SynchronousQueue。而后一个thread pool中的那些worker(也是一些thread)才真正的执行task。并且,后一个thread pool所能创建的thread的数量也受到了coreSize和MaxSize的限制。所以,整个OrderedThreadPoolExecutor实际上创建了2 * coreSize的thread。
前面的解释可能有些乱,再重新梳理整个OrderedThreadPoolExecutor的执行流程:
1. 当一个task被接收,前一个thread pool中的某个thread被指定负责处理这个task;
2. thread会找到task所对应的session,将这个task放入该session的TaskQueue中;
3. 如果该session已经被放入了waitingSessions,那么什么都不做,否则,将该session放入waitingSessions中;
4. 后一个threadpool中的某一个worker从waitingSessions中将该Session取出;
5. 找到该Session中的TaskQueue,依次执行queue中的task;
总结
总的来说,Java的TheadPoolExecutor整体架构设计的很具有扩展性,可以通过继承改写来实现不同的各具功能的threadpool,唯一的缺点就是它的调度策略是不能够改变的,但很多时候一个threadpool的调度策略会对系统性能产生很大的影响。所以,如果ThreadPoolExecutor的调度策略不适合你的话,就只能手工再造个“轮子”了。
另外,如果读过SOSP01年的“SEDA: An Architecture for Well-Conditioned, Scalable Internet Services”,那么会发现Java中的ThreadPoolExecutor非常类似于SEDA中的Stage概念。虽然我没有找到总够的证据,但是从时间的顺序看,java1.5版才加入的ThreadPoolExecutor很可能受到了01年这篇论文的启发。
相关推荐
三菱FX3G FX3S与四台E700变频器Modbus RTU通讯控制:正反转、频率设定与读取方案,三菱FX3G FX3S与四台E700变频器通讯:Modbus RTU协议实现正反转、频率设定与控制,快速反馈与教程包含,三菱FX3G FX3S 485协议通讯四台三菱E700变频器程序资料 三菱FX3G FX3S+485bd扩展,采用modbus rtu协议,crc校验,通讯控制四台E700变频器,可以实现正反转,停止,频率的设定,频率,电流等的读取。 反馈快,使用方便,包括教程,plc和触摸屏程序,变频器参数设置和接线,别的变频器支持rtu协议也可以实现。 ,三菱FX系列PLC; 485协议通讯; 变频器E700; 通讯控制; 参数设置; 教程。,三菱PLC控制E700变频器:485协议通讯与程序设置全解
1、文件内容:hyphen-nl-0.20050617-10.el7.rpm以及相关依赖 2、文件形式:tar.gz压缩包 3、安装指令: #Step1、解压 tar -zxvf /mnt/data/output/hyphen-nl-0.20050617-10.el7.tar.gz #Step2、进入解压后的目录,执行安装 sudo rpm -ivh *.rpm 4、更多资源/技术支持:公众号禅静编程坊
西门子S7-1200PLC结构化编程在5轴伺服项目中的应用:模块化设计、触摸屏控制及电气图纸实战解析,西门子S7-1200PLC结构化编程实现多轴联动与多种伺服功能应用:CAD图纸、PLC程序和触摸屏程序协同运作。,西门子S7-1200PLC结构化编程5轴伺服项目 ,包含plc程序、威纶通触摸屏程序、cad电气图纸。 可以实现以下功能,规格有: 1.三轴机械手X轴-Y轴-Z轴联动取放料PTO脉冲定位控制台达B2伺服 2.台达伺服速度模式应用+扭矩模式应用实现收放卷 3.程序为结构化编程,每一功能为模块化设计,功能:自动_手动_单步_暂停后原位置继续运行_轴断电保持_报警功能_气缸运行及报警. 4.每个功能块可以无数次重复调用,可以建成库,用时调出即可 5.上位机采样威纶通触摸屏 6.参考本案例熟悉掌握结构化编程技巧,扩展逻辑思维。 博图14以上都可以打开 ,核心关键词:西门子S7-1200PLC; 结构化编程; 5轴伺服项目; PLC程序; 威纶通触摸屏程序; CAD电气图纸; 三轴机械手; PTO脉冲定位控制; 台达B2伺服; 速度模式应用; 扭矩模式应用; 模块化设计; 轴断电保
情感分析算法在多个领域有着广泛的应用场景和丰富的案例
基于MATLAB仿真的MMC整流站与逆变站柔性互联技术研究:快速工况仿真与环流抑制控制,基于MATLAB仿真的MMC整流站与逆变站运行分析及四端柔性互联工况仿真模拟研究,21电平MMC整流站、MMC逆变站、两端柔性互联的MATLAB仿真模型,4端柔性互联、MMC桥臂平均值模型、MMC聚合模型(四端21电平一分钟即能完成2s的工况仿真) 1-全部能正常运行,图四和图五为仿真波形 2-双闭环控制,逆变站PQ控制,整流站站Udc Q控制 3-最近电平逼近调制+子模块电容充电 4-环流抑制控制 ,1. 21电平MMC整流站; 2. MMC逆变站; 3. MATLAB仿真模型; 4. 两端柔性互联; 5. 桥臂平均值模型; 6. 聚合模型; 7. 双闭环控制; 8. 最近电平逼近调制; 9. 子模块电容充电; 10. 环流抑制控制。,基于柔性互联的MMC系统仿真模型:多电平控制与环流抑制研究
有效应对网络舆情教育培训PPT.pptx
1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。
Matlab领域上传的视频是由对应的完整代码运行得来的,完整代码皆可运行,亲测可用,适合小白; 1、从视频里可见完整代码的内容 主函数:main.m; 调用函数:其他m文件;无需运行 运行结果效果图; 2、代码运行版本 Matlab 2019b;若运行有误,根据提示修改;若不会,私信博主; 3、运行操作步骤 步骤一:将所有文件放到Matlab的当前文件夹中; 步骤二:双击打开main.m文件; 步骤三:点击运行,等程序运行完得到结果; 4、仿真咨询 如需其他服务,可私信博主; 4.1 博客或资源的完整代码提供 4.2 期刊或参考文献复现 4.3 Matlab程序定制 4.4 科研合作
淘宝买的,直接分享给大家了,没有测试环境,也没有办法去测。但我想,他应该是可以用的
1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。
ACM比赛经验分享(基础知识与算法准备等)
运行GUI版本,可二开
1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。
该是指包含恶意网址的数据库或数据集,它通常被用于网络安全研究、恶意软件检测、网络欺诈防范等领域。研究人员和安全专家会利用这个数据集来分析恶意网址的特征、行为模式,进而开发出相应的检测算法和防护措施,以识别和阻止恶意网址对用户设备和网络环境造成的潜在威胁。该数据集包含约 651,191 条经过标记的 URL,涵盖了四种主要类型:良性(Benign)、篡改(Defacement)、钓鱼(Phishing)和恶意软件(Malware)。其中,良性 URL 占据了约 428,103 条,篡改 URL 有 96,457 条,钓鱼 URL 为 94,111 条,而恶意软件 URL 则有 32,520 条。该数据集的显著特点是其多类别分类的全面性,不仅包括常见的恶意 URL 类型,还涵盖了大量良性 URL,使得研究人员能够更全面地理解和区分不同类型的 URL。此外,数据集以原始的 URL 形式提供,研究人员可以根据需要提取和创建特征,而不受预设特征的限制。
字卡v4.3.4 原版 三种UI+关键字卡控制+支持获取用户信息+支持强制关注 集卡模块从一开始的版本到助力版本再到现在的新规则版本。 集卡模块难度主要在于 如何控制各种不同的字卡组合 被粉丝集齐的数量。 如果不控制那么一定会出现超过数量的粉丝集到指定的字卡组合,造成奖品不够的混乱,如果大奖价值高的话,超过数量的粉丝集到大奖后,就造成商家的活动费用超支了。我们冥思苦想如何才能限制集到指定字卡组合的粉丝数,后我们想到了和支付宝一样的选一张关键字卡来进行规则设置的方式来进行限制,根据奖品所需的关键字卡数,设定规则就可以控制每种奖品所需字卡组合被粉丝集到的数量,规则可以在活动进行中根据需要进行修改,活动规则灵活度高。新版的集卡规则,在此次政府发布号的活动中经受了考验,集到指定字卡组合的粉丝没有超出规则限制。有了这个规则限制后,您无需盯着活动,建好活动后就无人值守让活动进行就行了,您只需要时不时来看下蹭蹭上涨的活动数据即可。 被封? 无需担心,模块内置有防封功能,支持隐藏主域名,显示炮灰域名,保护活动安全进行。 活动准备? 只需要您有一个认证服务号即可,支持订阅号借用认证服务号来做活动。如果您
DSP28035的CAN通信升级方案:包括源码、测试固件与C#上位机开发,支持周立功USBCAN-II兼容盒及BootLoader闪烁指示,DSP28035的CAN升级方案及详细配置说明:使用新动力开发板与C#上位机软件实现固件升级,涉及用户代码、BootLoader代码及硬件连接细节,DSP28035的can升级方案 提供源代码,测试用固件。 上位机采用c#开发。 说明 一、介绍 1、测试平台介绍:采用M新动力的DSP28035开发板,CAN口使用GPIO30\31。波特率为500K。 2、28035__APP为测试用的用户代码,ccs10.3.1工程,参考其CMD配置。 3、28035_Bootloader_CAN为bootloader源代码,ccs10.3.1工程; 4、SWJ为上位机,采用VS2013开发,C#语言。 5、测试使用的是周立功的USBCAN-II,can盒,如果用一些国产可以兼容周立功的,则更这里面的ControlCAN.dll即可。 6、升级的app工程需要生成hex去升级,具体参考我给的工程的设置。 7、BootLoader代码,只有D400这一个灯1s闪烁一
基于Matlab的数字验证码识别系统:预处理与不变矩算法的实践应用及GUI界面构建,基于MATLAB不变矩算法的数字验证码识别系统设计与实现,基于matlab不变矩算法实现数字验证码 过程:先对验证图像进行去噪、定位、归一化等预处理,然后计算待识别数字的不变矩,再进行特征匹配,得到识别结果。 以Matlab软件为开发平台来进行设计实现及仿真,并构建相应的GUI界面。 实验结果表明利用不变矩在识别数字验证码方面具有可行性。 ,关键词:Matlab;不变矩算法;数字验证码;预处理;特征匹配;GUI界面;实验验证;可行性。,Matlab实现数字验证码识别:预处理与不变矩算法的GUI仿真
基于STM32F103的磁编码器通讯方案:原理图、PCB设计与源码实现,附多摩川协议手册解析,基于STM32F103的精准多摩川绝对值磁编码器通讯解决方案:原理图、PCB设计与源码实践手册,完整包含多摩川协议解析,基于STM32F103的多摩川绝对值磁编码器通讯方案 包含:原理图,PCB,源码,多摩川协议手册 ,核心关键词:STM32F103;多摩川绝对值磁编码器;通讯方案;原理图;PCB;源码;多摩川协议手册;,基于STM32F103的绝对值磁编码器通讯方案:原理图PCB与源码解析,附多摩川协议手册
1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。
php项目之学生成绩查询系统源码,项目仅供学习参考使用