Activiti 版本 5.10
使用activiti 有一段时间了,目前使用activiti 的大部分公司都是用来做类似于OA 等以用户任务为主的流程,
这我没什么好说的,因为我们的流程是以ServiceTask + UserTask 结合来处理定时调度等数据处理任务。
ServiceTask 以主,采用class 和 Spring bean 的方式。废话补多少,切入正题:
Activiti 5.10 设计器是支持 delegateExpression 的注入参数的,但activiti 引擎的解析器却未能将参数注入从Spring 得到的bean,该问题在5.11 的版本上是得到解决了的,但 5.11版本目前还没有发布,有兴趣的同学可以去下载正在开发中的代码进行研究,或者修改源码
然后我们来谈谈Activiti 对于并发的处理以及其中的问题(以ServiceTask 为例):
当我们将serviceTask 设置 async = "true" (关于 isExclusive 后续会提到) 的时候,流程引擎采用JobExecutor 来异步执行,执行顺序为引擎首先会将该任务实例化一条job记录,插act_ru_job表,然后JobExecutor 扫描该表并加锁执行该job,这里就涉及到定义ServiceTask 的另外一个属性isExclusive,这个属性默认为true,即同一流程中当前存在于act_ru_job 且 is_exclusive 为true的会一起取出来,放入一个AcquireJobsCmd,然后放入一个线程执行,这样做用来保证该批任务时串行执行的,使用相同的context,这样做没有什么问题,
但不能达到真正并行的目的。
附上我们的流程图:
目标,处于parallelGateWay后面的任务并行执行,即任务完成的时间为单个任务完成的最大时间。
每个ServiceTask 的 acitiviti:async = "true" activiti:exclusive="false"
运行,这时你会遇到一个 ActivitiOptimisticLockingException 异常
(toString(updatedObject)+" was updated by another transaction concurrently");
为什么会这样呢?因为每个ServiceTask 对应一条act_ru_execution 表的记录,当该任务完后后,会去跟新其
parent_id 对应的execution 将其版本 +1 ,
update ${prefix}ACT_RU_EXECUTION set
REV_ = #{revisionNext, jdbcType=INTEGER},
PROC_DEF_ID_ = #{processDefinitionId, jdbcType=VARCHAR},
ACT_ID_ = #{activityId, jdbcType=VARCHAR},
IS_ACTIVE_ = #{isActive, jdbcType=BOOLEAN},
IS_CONCURRENT_ = #{isConcurrent, jdbcType=BOOLEAN},
IS_SCOPE_ = #{isScope, jdbcType=BOOLEAN},
IS_EVENT_SCOPE_ = #{isEventScope, jdbcType=BOOLEAN},
PARENT_ID_ = #{parentId, jdbcType=VARCHAR},
SUPER_EXEC_ = #{superExecutionId, jdbcType=VARCHAR},
SUSPENSION_STATE_ = #{suspensionState, jdbcType=INTEGER},
CACHED_ENT_STATE_ = #{cachedEntityState, jdbcType=INTEGER}
where ID_ = #{id, jdbcType=VARCHAR}
and REV_ = #{revision, jdbcType=INTEGER}
跟踪该异常的抛出原因是 在serviceTask完成后,更新的ExecutionEntity 是同一条记录,而每一个serviceTask 此时处于
两个不同的线程和事务当中,两个事务彼此不可见,任务开始时获取的ExecutionEntity完成相同,当一个事务成功更新后,
另一个事务就会失败。这样保证了流程的准确执行,当该任务失败后,会在下一个JobExecutor 扫描时重新执行。此时获取的
execution 的版本已经加1,此时任务正常结束。Activiti 引擎如此做有一定的道理,但这不是我要的。为什么这样说呢?
假设我两个 serviceTask,每个执行都需要30分钟,仅仅因为这样,我就需要花费1个小时的时间才能完成,天啊,饶了我吧!
有木有办法,我想是有的,只要你够大胆。跟踪代码,更新父execution的时候,唯一变了的就是version,其他值都没变化,
那么我们是否可以将update语句更改一下,如下:
update ${prefix}ACT_RU_EXECUTION set
REV_ = REV_ + 1,
PROC_DEF_ID_ = #{processDefinitionId, jdbcType=VARCHAR},
ACT_ID_ = #{activityId, jdbcType=VARCHAR},
IS_ACTIVE_ = #{isActive, jdbcType=BOOLEAN},
IS_CONCURRENT_ = #{isConcurrent, jdbcType=BOOLEAN},
IS_SCOPE_ = #{isScope, jdbcType=BOOLEAN},
IS_EVENT_SCOPE_ = #{isEventScope, jdbcType=BOOLEAN},
PARENT_ID_ = #{parentId, jdbcType=VARCHAR},
SUPER_EXEC_ = #{superExecutionId, jdbcType=VARCHAR},
SUSPENSION_STATE_ = #{suspensionState, jdbcType=INTEGER},
CACHED_ENT_STATE_ = #{cachedEntityState, jdbcType=INTEGER}
where ID_ = #{id, jdbcType=VARCHAR}
注意到这里的变化,where 条件只剩id去掉了version ,然后REV_使用表里面的数据直接+1经测试完成可行,有兴趣的朋友可以自己试试。这里的关口过了,但后面仍然存在危险,所以修改源码是有风险的(%>_<%)。问题出在哪儿?有时候你会发现 两个serviceTask 运行完了,Execution表的记录也更新了,流程停滞不前了,这是神马原因???
最后终于让我问题出现的地方,ParallelGatewayActivityBehavior,我们前面说过,当每个ServiceTask执行完成之后,事务并没有到结束的地方,根据ServiceTask 的流程指向来到了第二个ParallelGateway,ParallelGatewayActivityBehavior 的作用就是判断前面的任务是否完成,是否继续执行,当每个ServiceTask所在的事务到达此处时,他们都只能看见自己完成的部分,而不能看见与他并行的事务里面的状态。所以当到达是否执行下一步的判断条件时
if (nbrOfExecutionsJoined==nbrOfExecutionsToJoin) {
// Fork
log.fine("parallel gateway '"+activity.getId()+"' activates: "+nbrOfExecutionsJoined+" of "+nbrOfExecutionsToJoin+" joined");
execution.takeAll(outgoingTransitions, joinedExecutions);
} else if (log.isLoggable(Level.FINE)){
log.fine("parallel gateway '"+activity.getId()+"' does not activate: "+nbrOfExecutionsJoined+" of "+nbrOfExecutionsToJoin+" joined");
}
都会告诉ParallelGatewayActivityBehavior 我已经完成了,其他的还没有完成。
有人可能会说,为什么我的程序没有遇到这种情况呢?
第一 如果你不是ServiceTask 任务很难遇到这种情况
第二 如果你的ServiceTask 没有设置为 async = "true" 和 exclusive="false" 也就不是真正的并发,当然也不会遇到
还有其他原因我就不赘述了。
这种问题也是可以解决的,因为针对同一个流程,每一个事务都是通过同一个ParallelGatewayActivityBehavior实例来进行判断的,
我们只要记录每一个通过该GateWay的事务的完成情况,然后汇总起来就OK 了,另外在完成的那一步需要将executio 的parent 的
executions 对应的更新,否则execution 会有记录不能删除,但流程是可以完整的执行完成,给出我的完整处理方式:
public class ParallelGatewayActivityBehavior extends GatewayActivityBehavior {
private static Logger log = Logger.getLogger(ParallelGatewayActivityBehavior.class.getName());
private Map<String,ActivityExecution> activityJoinedExecutions = new ConcurrentHashMap<String,ActivityExecution>();
public void execute(ActivityExecution execution) throws Exception {
// Join
PvmActivity activity = execution.getActivity();
List<PvmTransition> outgoingTransitions = execution.getActivity().getOutgoingTransitions();
execution.inactivate();
lockConcurrentRoot(execution);
List<ActivityExecution> joinedExecutions = execution.findInactiveConcurrentExecutions(activity);
int nbrOfExecutionsToJoin = execution.getActivity().getIncomingTransitions().size();
int nbrOfExecutionsJoined = joinedExecutions.size();
if(nbrOfExecutionsToJoin!=nbrOfExecutionsJoined){
for(ActivityExecution e:joinedExecutions){
activityJoinedExecutions.put(e.getId(), e);
}
nbrOfExecutionsJoined = activityJoinedExecutions.size();
if(nbrOfExecutionsJoined == nbrOfExecutionsToJoin && execution.getParentId()!=null
&& execution instanceof ExecutionEntity){
ExecutionEntity et = (ExecutionEntity)execution;
while(joinedExecutions.size()!=nbrOfExecutionsToJoin ){
Thread.sleep(10000);
for(int i = 0 ; i < et.getParent().getExecutions().size(); i++){
ExecutionEntity ct = et.getParent().getExecutions().get(i);
if(activityJoinedExecutions.containsKey(ct.getId()) ){
et.getParent().getExecutions().set(i, (ExecutionEntity)activityJoinedExecutions.get(ct.getId()));
}
}
joinedExecutions = execution.findInactiveConcurrentExecutions(activity);
}
}
}
if (nbrOfExecutionsJoined==nbrOfExecutionsToJoin) {
activityJoinedExecutions.clear();
// Fork
log.fine("parallel gateway '"+activity.getId()+"' activates: "+nbrOfExecutionsJoined+" of "+nbrOfExecutionsToJoin+" joined");
execution.takeAll(outgoingTransitions, joinedExecutions);
} else if (log.isLoggable(Level.FINE)){
log.fine("parallel gateway '"+activity.getId()+"' does not activate: "+nbrOfExecutionsJoined+" of "+nbrOfExecutionsToJoin+" joined");
}
}
}
- 大小: 11.7 KB
分享到:
相关推荐
工作流 activiti 5.22 退回代码实现。 activiti在设计的时候没有回退相关的操作,回退是中国特有的特色。这里写一个比较简单的回退。不支持回退到并行网关前面节点,虽然回退到前面节点不会报错 但会导致任务无法...
3. **表达式和脚本语言**:Activiti 支持使用 Expression Language (EL) 和 Scripting Language 进行条件判断和业务逻辑处理。源码中可以找到 EL 的解析和执行,以及不同脚本语言(如 JavaScript)的集成。 4. **...
- **流程执行**:高效执行流程实例,处理并发、分支、循环等复杂逻辑。 - **任务管理**:支持任务分配、委托、完成等操作。 - **表单支持**:可以与动态表单集成,收集和展示流程数据。 - **API与服务**:提供了...
Activiti Modeler是一款基于Java开发的流程建模工具,它主要与Activiti BPMN(业务流程管理Notation)引擎紧密配合,为用户提供了一个直观、易用的界面来设计、模拟和部署业务流程。Activiti是Alfresco公司推出的一...
9. **并发与事务管理**:作为企业级服务,Activiti需要处理并发和事务一致性问题。源码中包含了事务边界设定和并发控制的实现,对于理解分布式系统的挑战非常有帮助。 10. **测试与调试**:源码中包含了大量的单元...
Activiti 是一个开源的工作流程(Workflow)和业务自动化(Business Automation)引擎,它基于Java平台,主要用于企业级应用中处理业务流程。这个压缩包“activiti资料.zip”包含的资源很可能是关于Activiti的源代码...
BPMN 2.0还提供了对流程变量、事件驱动、并发处理和流程实例的生命周期管理等功能。 3. **流程定义插件**: 描述中提到的“流程定义插件”可能是指用于图形化设计和编辑流程的工具,如Activiti Modeler。这种工具...
10. **表单与表单引擎**:Activiti允许通过表单来交互处理流程任务,可以是内置的简单表单,也可以通过集成第三方表单引擎如Alfresco Form Engine来实现复杂表单需求。 在解压 "activiti-5.22.0" 压缩包后,你将...
在实际应用中,Activiti Designer常被用于企业级应用的流程自动化,比如审批流程、订单处理流程等。通过将业务流程可视化,不仅方便了开发,也利于非技术背景的业务人员理解和调整流程。 总结来说,Activiti ...
在 Activiti 5.22 版本中,我们可能需要在某些情况下撤销已经启动的流程实例,比如当发现某个任务处理有误时。实现撤回操作需要对 Activiti 的内部机制有深入了解,特别是关于任务和执行对象(ExecutionEntity)的...
在Ruoyi中,你可以开发对应的业务服务来处理Activiti的工作流任务。例如,对于一个用户任务,你可以编写一个Service接口和实现类,处理任务的分配、审批逻辑等。同时,通过Ruoyi的前端界面,用户可以查看、接受和...
- 改进了性能和并发处理能力,以应对大规模的企业级应用场景。 - 支持多租户模式,便于在一个 Activiti 实例中管理多个独立的业务单元。 总结来说,Activiti 6.0 提供的这三个 WAR 包为企业构建和管理业务流程提供...
部署的流程模型会被转换为可执行的流程实例,这些实例在 Activiti 引擎中运行,处理业务逻辑。 3. **任务管理**:Activiti 支持任务分配和管理工作。任务可以自动分配给指定的角色或用户,也可以通过用户界面手动...
4. **流程实例管理**:讨论如何启动和控制流程实例,包括开始、暂停、恢复、结束流程实例的操作,以及对并发和多实例的支持。 5. **任务管理**:介绍任务的概念,如何分配、领取、完成任务,以及任务监听器的使用,...
在错误处理和调试方面,手册会讲解如何处理异常,以及使用 Activiti 的日志和监控功能来跟踪和诊断问题。这对于优化流程性能和解决运行时问题至关重要。 最后,手册可能会涵盖一些高级主题,如表单处理、多租户支持...
Activiti 适配达梦数据库教程 activiti 是一个开源的 Workflow 和 BPM system,它提供了强大的工作流引擎和业务流程管理功能。然而,activiti 默认支持的数据库只有 MySQL、 PostgreSQL、Oracle 等少数几种,而达梦...
在实际使用中,开发者可以通过这些资源来了解如何设计流程图,如何启动和结束流程,如何处理任务分配,如何进行并发控制,以及如何集成其他系统如CRM或ERP。同时,Activiti还支持通过REST API或服务调用来与外部应用...
activiti 5.22 explorer的war包
logging.properties 是 Activiti 的日志处理配置文件,用于配置 Activiti 的日志处理参数。 准备环境 准备环境是指为 Activiti 的开发和测试环境。准备环境包括了安装 Activiti 软件、相关资源下载、安装流程设计...
Activiti 是一个开源的工作流引擎,它被...同时,还可以了解到 Activiti 如何处理并发控制、异常处理以及与其他系统的集成。这个实例对初学者来说是一份宝贵的资源,可以帮助深入理解 Activiti 的工作原理和实际应用。