`
jiajw0426
  • 浏览: 24863 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

利用fork-join新建Token来实现JBPM流程并发

阅读更多
以前看过一篇文章介绍JBPM流程并发的,也是通过fork节点,但是没有join节点直接拉到End节点,逻辑上可以实现,但是看起来让人感觉有点乱.当然了通过一个节点多个task也可以实现,可是流程图看起来总觉得别扭,那也就不是什么流程.言归正传,Token贯穿整个流程,一个Token可以有多个子Token,在fork和join中有多少走向,就会有多少子Token.那么我们可以不可以给Token添加子Token呢?答案是可以的,我们可以通过T构造函数Token(Token parent,String name)来创建一个子Token.问题又来了,什么时候创建token呢,通过试验在Fork节点是可以创建的,但是新建的Token会一直停留在fork节点上,需要手动Single下去.而在fork节点single的话会有异常,那就需要在fork和join之间拉一条transition,在这条transition来控制,在transition上的action创建Token,赋值对应的变量然后Single下去.下面介绍一下如何实现.

流程图

流程定义
<?xml version="1.0" encoding="UTF-8"?>
<process-definition  xmlns=""  name="test">
	<start-state name="开始节点">
		<transition to="fork1"></transition>
	</start-state>
	<fork name="fork1">
		<transition to="并发">
			<action></action>
		</transition>
		<transition to="join1" name="control">
			<action class="test.Controler"></action>
		</transition>
	</fork>
	<task-node name="并发">
		<task name="并发任务"></task>
		<transition to="join1"></transition>
	</task-node>
	<join name="join1">
		<transition to="结束"></transition>
	</join>
	<end-state name="结束"></end-state>
</process-definition>

流程的目的是,在流程开始时给流程变量departments赋值,不同部门用','隔开,可每个部门都要走'并发'那条线,有多少部门就并发几条,每一条对应一个Token,每个Token中有同名变量dept,当然对不同的部门这个值不同,这里通过ContextInstance的createVariable(name,value,token)方法来实现,处理task的时候也是通过对应的方法得到对应的值.下面是action 的handler类 test.Controler的代码,
package test;
import org.jbpm.context.exe.ContextInstance;
import org.jbpm.graph.def.ActionHandler;
import org.jbpm.graph.exe.ExecutionContext;
import org.jbpm.graph.exe.ProcessInstance;
import org.jbpm.graph.exe.Token;
public class Controler implements ActionHandler {
	public void execute(ExecutionContext arg0) throws Exception {
		ProcessInstance pi=arg0.getProcessInstance();
		ContextInstance ci=pi.getContextInstance();
		String deparments=(String) ci.getVariable("departments");
		String[] deptArry=deparments.split(",");
		Token root=pi.getRootToken();		
		//默认的token
		Token degaultToken=root.getChild("1");
		ci.createVariable("dept", deptArry[0], degaultToken);
		//其他部门分别创建对应的token,并分别对变量dept赋值
		for(int i=1;i<deptArry.length;i++){
			Token newToken=new Token(root,deptArry[i]);
			ci.createVariable("dept", deptArry[i], newToken);
			newToken.signal();
		}
      	}
}

测试结果代码为
package test;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import org.jbpm.JbpmConfiguration;
import org.jbpm.JbpmContext;
import org.jbpm.context.exe.ContextInstance;
import org.jbpm.graph.def.ProcessDefinition;
import org.jbpm.graph.exe.ProcessInstance;
import org.jbpm.graph.exe.Token;
import org.jbpm.taskmgmt.exe.TaskInstance;
import org.jbpm.taskmgmt.exe.TaskMgmtInstance;
public class Test {
	public static void main(String[] args) {

		JbpmContext jc = JbpmConfiguration.getInstance().createJbpmContext();
		ProcessDefinition pd = jc.getGraphSession()
				.findLatestProcessDefinition("test");
		ProcessInstance pi = pd.createProcessInstance();
		System.out.println("新流程开始成功!!");
		Token root = pi.getRootToken();
		ContextInstance ci = pi.getContextInstance();
		//把部门参数加入流程当中
		ci.createVariable("departments", "dept1,dept2,dept3");
		root.signal();
		TaskMgmtInstance tmi = pi.getTaskMgmtInstance();
		Collection c = tmi.getTaskInstances();
		Iterator iter = c.iterator();
		System.out.println("rootToken:" + root.getNode());
		priintTokenChhilds(root);
		while (iter.hasNext()) {
			TaskInstance ti = (TaskInstance) iter.next();
			Token token = ti.getToken();
			String dept = (String) ci.getVariable("dept", token);
			ti.end();
			System.out.println("部门" + dept + "的task'" + ti.getName() + "'结束");
			System.out.println("rootToken:" + root.getNode());
			priintTokenChhilds(root);
		}
		jc.close();
	}

	public static void priintTokenChhilds(Token token) {
		Map childs = token.getChildren();
		Iterator it = childs.keySet().iterator();
		while (it.hasNext()) {
			Token child = (Token) childs.get(it.next());
			System.out.println(child.getFullName() + ":" + child.getNode());
		}
	}
}

测试结果
新流程开始成功!!
rootToken:Fork(fork1)
/dept2:TaskNode(并发)
/dept3:TaskNode(并发)
/control:Join(join1)
/1:TaskNode(并发)
部门dept1的task'并发任务'结束
rootToken:Fork(fork1)
/dept2:TaskNode(并发)
/dept3:TaskNode(并发)
/control:Join(join1)
/1:Join(join1)
部门dept2的task'并发任务'结束
rootToken:Fork(fork1)
/dept2:Join(join1)
/dept3:TaskNode(并发)
/control:Join(join1)
/1:Join(join1)
部门dept3的task'并发任务'结束
rootToken:EndState(结束)
/dept2:Join(join1)
/dept3:Join(join1)
/control:Join(join1)
/1:Join(join1)

/1节点是流程默认的子Token,代表dept1那条线.
可见所有的Token结束之后,rootToken就到达了结束节点,这是对于在并发之前已经知道并发流程个数的实现方法,对于在流程中不知道并发的条数实现方式类似,只是control那那条控制线改成task节点来实现并发控制就可以.这是这个节点要手动结束
分享到:
评论
8 楼 zengqun89 2009-07-22  
package test.joinAndFork;

import java.util.Collection;  
import java.util.Iterator;  
import java.util.Map;  
import org.jbpm.JbpmConfiguration;  
import org.jbpm.JbpmContext;  
import org.jbpm.context.exe.ContextInstance;  
import org.jbpm.graph.def.ProcessDefinition;  
import org.jbpm.graph.exe.ProcessInstance;  
import org.jbpm.graph.exe.Token;  
import org.jbpm.taskmgmt.exe.TaskInstance;  
import org.jbpm.taskmgmt.exe.TaskMgmtInstance;  

public class TestJoinAndFork {  

    public static void main(String[] args) {  
 
        JbpmContext jc = JbpmConfiguration.getInstance().createJbpmContext();  
        ProcessDefinition pd = ProcessDefinition.parseXmlString("<?xml version='1.0' encoding='UTF-8'?> " +
        " <process-definition  xmlns='urn:jbpm.org:jpdl-3.2'  name='test'>   "+
        "    <start-state name='开始节点'>     "+
        "        <transition to='fork1'></transition>     "+
        "   </start-state>    "+
        "    <fork name='fork1'>   "+
        "        <transition to='并发'>   "+
        "            <action></action>   "+
        "        </transition>    "+
        "        <transition to='join1' name='control'>   "+
        "            <action class='test.joinAndFork.Controler'></action>   "+
        "        </transition>     "+
        "    </fork>     "+
        "    <task-node name='并发'>   "+
        "        <task name='并发任务'></task>   "+
        "        <transition to='join1'></transition>     "+
        "    </task-node>    "+
        "    <join name='join1'>    "+
        "        <transition to='结束'></transition>       "+
        "    </join>                                "+
        "    <end-state name='结束'></end-state>        "+
        " </process-definition>  ");
        jc.getGraphSession().deployProcessDefinition(pd);
       
        ProcessDefinition newpd2=jc.getGraphSession().loadProcessDefinition(pd.getId());
       
        ProcessInstance pi =new ProcessInstance(newpd2);
        System.out.println("新流程开始成功!!");  
        Token root = pi.getRootToken(); 
       
        ContextInstance ci = pi.getContextInstance();  
        //把部门参数加入流程当中  
        ci.createVariable("departments", "dept1,dept2,dept3");
        pi.signal();
        //root.signal();  
       // TaskMgmtInstance tmi = pi.getTaskMgmtInstance();  
      // Collection c = tmi.getTaskInstances(); 
        Collection c=pi.getTaskMgmtInstance().getTaskInstances();
        Iterator iter = c.iterator();  
        System.out.println("rootToken:" + root.getNode());  
        priintTokenChhilds(root);  
        while (iter.hasNext()) {  
            TaskInstance ti = (TaskInstance) iter.next();  
            Token token = ti.getToken();  
            String dept = (String) ci.getVariable("dept", token);  
            ti.end();  
            System.out.println("部门" + dept + "的task'" + ti.getName() + "'结束");  
            System.out.println("rootToken:" + root.getNode());  
            priintTokenChhilds(root);  
        }  
        jc.close();  
    }  
 
    public static void priintTokenChhilds(Token token) {  
        Map childs = token.getChildren();  
        Iterator it = childs.keySet().iterator();  
        while (it.hasNext()) {  
            Token child = (Token) childs.get(it.next());  
            System.out.println(child.getFullName() + ":" + child.getNode());  
        }  
    }  


=====================在运行时报这样的错误..请帮助解决一下儿.
Exception in thread "main" java.lang.NoSuchFieldError: FORCE
at org.jbpm.graph.node.Join.execute(Join.java:105)
at org.jbpm.graph.def.Node.enter(Node.java:318)
at org.jbpm.graph.def.Transition.take(Transition.java:151)
at org.jbpm.graph.def.Node.leave(Node.java:393)
at org.jbpm.graph.def.Node.leave(Node.java:368)
at org.jbpm.graph.node.Fork.execute(Fork.java:140)
at org.jbpm.graph.def.Node.enter(Node.java:318)
at org.jbpm.graph.def.Transition.take(Transition.java:151)
at org.jbpm.graph.def.Node.leave(Node.java:393)
at org.jbpm.graph.node.StartState.leave(StartState.java:70)
at org.jbpm.graph.exe.Token.signal(Token.java:194)
at org.jbpm.graph.exe.Token.signal(Token.java:139)
at org.jbpm.graph.exe.ProcessInstance.signal(ProcessInstance.java:251)
at test.joinAndFork.TestJoinAndFork.main(TestJoinAndFork.java:53)

7 楼 zengqun89 2009-07-22  
package test.joinAndFork;



import org.jbpm.JbpmConfiguration;
import org.jbpm.JbpmContext;
import org.jbpm.context.exe.ContextInstance;  
import org.jbpm.graph.def.ActionHandler;  
import org.jbpm.graph.exe.ExecutionContext;  
import org.jbpm.graph.exe.ProcessInstance;  
import org.jbpm.graph.exe.Token;  
public class Controler implements ActionHandler {  
    public void execute(ExecutionContext arg0) throws Exception { 
    JbpmContext jc = JbpmConfiguration.getInstance().createJbpmContext();  
        ProcessInstance pi=arg0.getProcessInstance();  
        ContextInstance ci=pi.getContextInstance();  
        String deparments=(String) ci.getVariable("departments");  
        String[] deptArry=deparments.split(",");  
        Token root=pi.getRootToken(); 
        System.out.println("=============>root="+root.getNode().getName());
        //默认的token  
        Token degaultToken=root.getChild("1");
        System.out.println("==============>degaultToken="+degaultToken.getNode().getName());
        ci.createVariable("dept", deptArry[0], degaultToken);  
        //其他部门分别创建对应的token,并分别对变量dept赋值  
        for(int i=1;i<deptArry.length;i++){  
            Token newToken=new Token(root,deptArry[i]);  
            ci.createVariable("dept", deptArry[i], newToken);  
            System.out.println("=====================>newToken="+newToken.getNode().getName());
            newToken.signal();  
            //jc.save(pi);
          }  
        }  

=====================在运行时报这样的错误..请帮助解决一下儿.
Exception in thread "main" java.lang.NoSuchFieldError: FORCE
at org.jbpm.graph.node.Join.execute(Join.java:105)
at org.jbpm.graph.def.Node.enter(Node.java:318)
at org.jbpm.graph.def.Transition.take(Transition.java:151)
at org.jbpm.graph.def.Node.leave(Node.java:393)
at org.jbpm.graph.def.Node.leave(Node.java:368)
at org.jbpm.graph.node.Fork.execute(Fork.java:140)
at org.jbpm.graph.def.Node.enter(Node.java:318)
at org.jbpm.graph.def.Transition.take(Transition.java:151)
at org.jbpm.graph.def.Node.leave(Node.java:393)
at org.jbpm.graph.node.StartState.leave(StartState.java:70)
at org.jbpm.graph.exe.Token.signal(Token.java:194)
at org.jbpm.graph.exe.Token.signal(Token.java:139)
at org.jbpm.graph.exe.ProcessInstance.signal(ProcessInstance.java:251)
at test.joinAndFork.TestJoinAndFork.main(TestJoinAndFork.java:53)
6 楼 ansendly 2009-05-08  
楼主,如果派发的部门流程不一样呢?数量也不能确定,你能处理吗?
5 楼 okhaoba 2008-12-16  
请问,流程并发是什么意思?
4 楼 wolfwood 2008-11-26  
不错,测试通过了,这样的解决方法很简洁
3 楼 jacky6024 2008-11-25  
很不错的想法。
2 楼 jiajw0426 2008-11-12  
这种singal()是走默认的路径,为了确保无误的,你可以给每条分支都定义名字,然后调用singal(String )方法就可以了
1 楼 debugself 2008-11-12  
这样实现确实不错!
不过有点疑问
Token newToken=new Token(root,deptArry[i]);
ci.createVariable("dept", deptArry[i], newToken);
newToken.signal();
在signal后,就一定是走到“并发”节点吗?就不会到control吗?
或者说newToken怎么知道它要走那个分支呢,如果fork下还有另外一个分支,又会怎么样呢?因为new的时候只知道它的root和name。

相关推荐

    Java并发Fork-Join框架原理

    Java并发Fork-Join框架原理 Java并发Fork-Join框架原理是Java7中提供的一种并行执行任务的框架,旨在提高程序的执行效率和性能。该框架的核心思想是将大任务分割成若干个小任务,并将其分配给不同的线程执行,以...

    Go-Golang版的fork-join

    Go-Golang版的Fork-Join框架是一种在Golang中实现的并发编程模型,灵感来源于Java的ForkJoin框架。这个框架的核心理念是通过将大任务分解为小任务,然后并行执行这些小任务,最后合并结果,以提高计算效率。在Golang...

    eclipse-collections-forkjoin-7.1.2-API文档-中文版.zip

    赠送jar包:eclipse-collections-forkjoin-7.1.2.jar; 赠送原API文档:eclipse-collections-forkjoin-7.1.2-javadoc.jar; 赠送源代码:eclipse-collections-forkjoin-7.1.2-sources.jar; 赠送Maven依赖信息文件:...

    java fork-join框架介绍

    fork/join框架是ExecutorService接口的一个实现,可以帮助开发人员充分利用多核处理器的优势,编写出并行执行的程序,提高应用程序的性能;设计的目的是为了处理那些可以被递归拆分的任务。

    Fork-join Queues Response-Time Tails的有效逼近:Fork-join队列中响应时间尾的有效逼近方法-matlab开发

    Harrison 在 IFIP Performance 2015 接受的论文“Beyond the Mean in Fork-Join Queues: Efficient Approximation for Response-Time Tails”中提出的近似方法。 文件 main_example.m 中提供了如何使用脚本的示例,...

    eclipse-collections-forkjoin-7.1.2-API文档-中英对照版.zip

    赠送jar包:eclipse-collections-forkjoin-7.1.2.jar; 赠送原API文档:eclipse-collections-forkjoin-7.1.2-javadoc.jar; 赠送源代码:eclipse-collections-forkjoin-7.1.2-sources.jar; 赠送Maven依赖信息文件:...

    35 拆分你的任务—学习使用Fork-Join框架.pdf

    在Java并发编程中,Fork/Join框架是一个强大的工具,尤其在处理大量数据时能显著提升性能。这个框架从Java 7开始引入,是ExecutorService的一个实现,它基于分而治之的策略,将大任务分解成多个小任务,然后并行地...

    jbpm4.4 会签 测试( fork & join)

    在jbpm中,"fork & join"机制是实现并行处理和流程控制的关键概念,尤其在处理会签(多个审批人同时参与审批)等场景时显得尤为重要。 "fork"在jbpm中指的是流程分支,当流程到达一个fork节点时,流程会分裂为多个...

    Java并发Fork and join

    Fork/Join框架是Java并发库中的一部分,自Java 7开始引入,它为开发者提供了一种高效的处理大规模计算任务的方法。这个框架基于分治策略,将大任务分解成若干小任务,然后并行执行这些小任务,最后再将结果合并。...

    Fork-Join实时任务图模型的可行性:硬度和算法

    Fork-Join实时任务图模型(FJRT模型)是该领域内的一项重要研究,它通过结合分支代码建模和任务内并行结构建模,扩展了基于有向图的任务模型,引入了任务分叉(forking)和合并(joining)的概念。 在研究实时系统...

    工作流jbpm中join与fork用法

    在实际操作中,开发者可以通过JBPM的图形界面或XML配置文件来实现`Fork`和`Join`的布局,以满足业务需求。 为了更好地理解和应用这些概念,可以参考提供的博文链接(https://dolphin-ygj.iteye.com/blog/67860),...

    simple-fork-join:ForkJoin的简单示例

    `simple-fork-join`项目提供的示例代码可以帮助初学者理解如何在Java中使用ForkJoin框架来编写高效的并行计算程序。通过分析和实践这个项目,开发者可以深入掌握并行编程的核心概念,提升软件性能。

    forkjoin:窃取Rust的fork-join并行性库的工作

    叉连接一个窃取fork-join并行性库的工作。 受博客文章的启发,并通过硕士论文。 托管在存储库托管的图书馆文档开发该库的目的是为了满足三种类型的算法的需求,这些算法都非常适合fork-join并行性。减少风格减少样式...

    fork-rest-dataware-master

    fork-rest-dataware-master

    Fork/Join例子

    标题“Fork/Join例子”暗示我们将探讨一个具体的示例,展示如何使用这个框架来解决问题。通常,这样的例子会涵盖创建自定义的`RecursiveTask`或`RecursiveAction`类,以及如何使用`ForkJoinPool`来执行它们。 描述...

    基于JDK的ForkJoin构建一个简单易用的并发组件1

    在Java编程中,线程池和并发任务执行是常见的方式,但在某些场景下,我们可以利用JDK的ForkJoin框架来构建更加高效和易用的并发组件。ForkJoin框架自Java 7引入,它为处理大型任务提供了一种分解成多个子任务并行...

    ForkJoin并发框架入门示例

    ForkJoin并发框架是Java 7引入的一种高效并行计算框架,它基于分而治之(Divide and Conquer)的策略,适用于处理大量可分割的任务。这个框架的核心类是`ForkJoinPool`和`ForkJoinTask`,它们为开发者提供了创建和...

    jBPM2-流程图与JBPM API.ppt

    jBPM2 版本中,流程图和JBPM API 是核心组成部分,帮助开发者实现复杂的业务流程自动化。 ### 1. jBPM-jBDL 相关概念 jBPM-jBDL(jBPM Business Definition Language)是一种用来定义业务流程的语言,基于有向图...

Global site tag (gtag.js) - Google Analytics