`
snake1987
  • 浏览: 72738 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

java并发学习之六:JCSP(Java Communicating Sequential Processes)实践

    博客分类:
  • java
阅读更多
首先得描述下什么是JCSP:CSP是Communicating Sequential Processes的缩写

在ibm的developerworks 中国上学习java的并发编程时无意发现的,因为在之前的多线程编程实践中,调试一个多线程的bug都花了大量的时间,所以一直想找点工具或者技巧来学习一下

然后就发现了这个玩意儿,这是JCSP在ibm的地址:
并发专题
JCSP第一部分
JCSP第二部分
JCSP第三部分
JCSP的SVN地址

文中有这样一段话:

“确实想进行多线程编程的开发人员通常准备好了以下一个或两个解决方案(至少是一部分):
长时间艰苦地测试代码,找出所有出现的并发性问题,诚心地希望到应用程序真正运行地时候已经发现并修复了所有这类问题。

大量运行设计模式和为多线程编程建立的指导原则。但是,这类指导原则只在整个系统都按照它们的规范设计的时候才有效,没有设计规则能够覆盖所有类型的系统。
虽然知道的人不多,但是对于编写(然后验证)正确的多线程应用程序这一问题,还有第三个选项。使用称为通信顺序进程(Communicating Sequential Processes,CSP)的精确的线程同步的数学理论,可以在设计时最好地处理死锁和活动锁之类的问题。CSP 由 C.A.R. Hoare 与 20 世纪 70 年代后期设计,CSP 提供了有效的方法,证明用它的构造和工具构建的系统可以免除并发的常见问题。”

就因为这一段话,勾起来我对它的兴趣

详细看下去,大概明白了它的一个原理:将所有的任务抽象为CSProcess,这些任务之间通信用一种既定的协议:通道,任务之间除了通道之外无任何联系,然后除了通道传递的参数,其他的变量均为任务私有,所以完全地避免了往常多线程编码中共享变量的访问问题

先不考虑效率,先设想现在并发编程存在的问题:现在的并发开发,锁,happen-before规则,volatile,基本完全颠覆了我们往常的思维习惯:AB操作明明是连续发生的,但却要考虑中间经过了n多操作。我们需要考虑:A操作后面发生了什么操作,B操作必须发生在A操作之前,怎么保证,等等。这些都是很痛苦的事情,尤其是设计一个很大的并发的框架,像在《Concurrency in practice》里面说的,你在使用(修改,增强)一个并发的设计之前,你必须弄清楚它的锁策略。当一个并发的设计膨胀到一个很大的程度的时候,怎么办?

所以在某种程度上,需要考虑牺牲它的效率了(个人认为),JCSP很优雅地对并发的过程进行了抽象。将一个并发的设计分隔为一个个独立的而且明确的任务,并且能够简单有效地组合起来。当一个并发的设计到达一定程度的时候,可以将子模块用绝对效率的方式来优化,在大模块上用JCSP来关联,在追求效率的同时,也具有了稳健性,也不失为一种选择(尤其对于小企业,与其让一堆对并发只是模棱两可的人来瞎折腾,不如降低一点效率,让应用更稳健)

怀着这样的想法,就拿着JCSP写了点代码来感受一下,也是一个线程池,不过就换用JCSP来实现了

画了个大概的流程图


很简单的一个步骤
代码如下:(测试代码见线程池(三)的测试代码)
public class ThreadPoolTest3WithJCSP implements Executor {
	/**
	 * 用于发送任务
	 */
	private ChannelOutput startCommandOut;
	
	public ThreadPoolTest3WithJCSP(final int threadNum) {
		//最繁琐的是创建通道,然后让通道的两边分别连到不同的CSProcess上
		ArrayList<CSProcess> processList = new ArrayList<CSProcess>();
		One2OneChannel channelUsedForController = StandardChannelFactory.getDefaultInstance().createOne2One();
		startCommandOut = channelUsedForController.out();
		One2OneChannel[] channelUsedForExecuter = StandardChannelFactory.getDefaultInstance().createOne2One(threadNum);
		ChannelOutput[] channeloutputUsedForExecuter = new ChannelOutput[threadNum];
		for(int i = 0;i<threadNum;i++)
		{
			channeloutputUsedForExecuter[i] = channelUsedForExecuter[i].out();
		}
		StandardChannelIntFactory intFactory = new StandardChannelIntFactory();
		One2OneChannelInt channelUsedForAfterExecuted = intFactory.createOne2One();
		One2OneChannelInt[] channelUsedForNotifyTask = intFactory.createOne2One(threadNum);
		AltingChannelInputInt[] channelInputUsedForNotifyTask = new AltingChannelInputInt[threadNum];
		for(int i = 0;i<threadNum;i++)
		{
			channelInputUsedForNotifyTask[i] = channelUsedForNotifyTask[i].in();
		}
		ThreadExecuteController controller = new ThreadExecuteController(threadNum, channelUsedForController.in(),channelUsedForAfterExecuted.in(), channeloutputUsedForExecuter);
		processList.add(controller);
		ExecuteProcess[] exeProcesses = new ExecuteProcess[threadNum];
		for(int i = 0;i<threadNum;i++)
		{
			exeProcesses[i] = new ExecuteProcess(channelUsedForExecuter[i].in(), channelUsedForNotifyTask[i].out(), i);
			processList.add(exeProcesses[i]);
		}
		
		AfterThreadExecuteController atec = new AfterThreadExecuteController(channelInputUsedForNotifyTask, channelUsedForAfterExecuted.out());
		processList.add(atec);
		final CSProcess[] pArray = new CSProcess[processList.size()];
		for(int i = 0;i<pArray.length;i++)
			pArray[i] = processList.get(i);
		new Thread(new Runnable() {
			@Override
			public void run() {
				new Parallel(pArray).run();
			}
		}).start();
		
	}

	@Override
	public void execute(Runnable command) {
		startCommandOut.write(command);		
	}
	
	//任务执行者
	static private class ExecuteProcess implements CSProcess
	{
		final ChannelInput in;
		final ChannelOutputInt out;
		final int index;
		
		public ExecuteProcess(ChannelInput i,ChannelOutputInt o,final int index) {
			in = i;
			out = o;
			this.index = index;
		}
		@Override
		public void run() {	
			while(true)
			{
				Runnable r = (Runnable)in.read();
				r.run();
				System.out.println("complete");
				out.write(index);				
			}
		}
	}
	//任务控制者
	static private class ThreadExecuteController implements CSProcess
	{
		private final int MAXTHREADNUM;
		final ChannelOutput[] executeCS;
		private Queue<ChannelOutput> waitingThreadQueue = new LinkedList<ChannelOutput>();
		private Queue<Runnable> waitingTask = new LinkedList<Runnable>();
		Alternative executeAlt;
		final int COMMANDIN = 0;
		final int COMPLETECSINDEX = 1;
		AltingChannelInput commandIn;
		AltingChannelInputInt completeCSIndex;
		
		public ThreadExecuteController(int max,AltingChannelInput commandIn,AltingChannelInputInt completeCSIndex
				,ChannelOutput[] executeCS) {
			MAXTHREADNUM = max;
			this.executeCS = executeCS;
			executeAlt = new Alternative(new Guard[]{commandIn,completeCSIndex});
			this.commandIn = commandIn;
			this.completeCSIndex = completeCSIndex;
			for(ChannelOutput co:executeCS)
			{
				waitingThreadQueue.offer(co);
			}
		}
		
		public void run() {
			while(true)
			{
				switch(executeAlt.select())
				{
					//如果是有新任务运行
					case COMMANDIN:
					{
						ChannelOutput o = waitingThreadQueue.poll();
						if(o == null)
						{
							waitingTask.offer((Runnable)commandIn.read());
						}
						else
						{
							o.write(commandIn.read());
						}
						break;
					}
					//如果是有任务结束
					case COMPLETECSINDEX:
					{
						//本应该先看看是否有正好需要运行的任务
						//暂时不知道怎么让select立刻返回,所以只能直接从等待队列取
						Runnable r = waitingTask.poll();
						if(r == null)
						{
							int i = completeCSIndex.read();
							waitingThreadQueue.offer(executeCS[i]);							
						}
						else
						{
							int i = completeCSIndex.read();
							executeCS[i].write(r);
						}
						break;
					}
				}
			}
		}
	}
	//任务结束管理者(暂时没做啥事,就把结束的任务索引丢回给管理者)
	static private class AfterThreadExecuteController implements CSProcess
	{
		final ChannelOutputInt out;
		Alternative executedAlt;
		
		public AfterThreadExecuteController(AltingChannelInputInt[] indexes,ChannelOutputInt o) {
			out = o;
			executedAlt = new Alternative(indexes);
		}
		
		public void run() {
			while(true)
			{
				//maybe do something else
				out.write(executedAlt.select());
			}
		}
	}
}


跑了一下,发现一下就死锁了~

管理者,阻塞在:executeCS[i].write(r);

执行者,阻塞在:o.write(commandIn.read());

貌似还是有死锁的问题啊,不过由于全是一个个锁围绕着,找问题简单多了,倒是文中所说的验证方法不知道是什么
大概调试了一下,基本上是没加缓冲区导致的,但效率差得惊人,大概是任务太简单了,然后都是线程切换的消耗,真正执行任务的计算反而少了,毕竟不是应用于这个场景的东西,也正常了。

本来打算看下源码的(因为之前以为里面有比较多的奥妙),调试之后发现,每个CSProcess都是用一个守护线程,然后通道都是阻塞的,基本上也没太多奇妙之处,文章中也说基本没使用1.5以后的同步相关的特性,不看也罢。只是这个抽象还是挺好的,很简单就能整理清楚自己的实现思路。



  • 大小: 23.1 KB
  • 大小: 13.4 KB
  • 大小: 13.5 KB
0
1
分享到:
评论

相关推荐

    JCSP.rar_jc

    【标题】"JCSP.rar_jc" 是一个与Java并发编程相关的压缩包,其中包含的资源是英国肯特大学开发的JCSP(Java Concurrent Sequential Processes)库。JCSP是一个实现进程代数概念的库,它使得Java程序员在设计和实现多...

    Formal Analysis of Concurrent Java Systems

    - **贡献**:本文提供了一种简单且形式化的CSP(Communicating Sequential Processes)模型来弥补这一缺陷,并在形式化环境中进行推理以提高置信度。 #### 标签:并发Java - **并发Java**:指利用Java提供的多线程...

    兼职小程序源码java-jcsp:https://xircles.codehaus.org/projects/jcsp/repos/prima

    【标题】"兼职小程序源码java-jcsp:...通过研究这个开源项目,开发者不仅可以提升Java编程技能,还能了解如何设计和实现一个具有并发特性的Web应用,并参与到开源社区中,学习最佳实践和协作模式。

    UCaPE:有效地使用并发性和并行性,可从bookboon.com免费获得Jon Kerridge的i和ii部分,其中显示了许多使用jcsp和groovyJCSP库的示例

    并将它们与以下库关联JCSP从 groovyJCSP从 build.gradle文件假定您已将所需的库二进制文件下载到Maven本地存储库中您还需要在类路径中包含junit.jar 您将需要确保库的Java和groovy版本与您在IDE中使用的版本匹配。...

    我爱世界杯网PHP源码

    对于学习PHP的开发者来说,这是一个很好的实践项目,可以深入了解PHP在实际网站开发中的运用。同时,对于想要搭建类似世界杯主题网站的个人或企业,这个源码也是一个快速启动的起点。 在使用源码时,需要注意以下几...

    关注程序员健康(2020.12.14).rar

    标签“NOIP C++ CSP-JCSP-S”虽然主要涉及编程竞赛和编程语言,但这里可能是提醒我们,即使在高强度的编程学习和竞赛中,也要注意个人健康。 这个压缩包内的文件包括: 1. "电脑前超过4小时必须要做的一些事情":这...

    NOI Linux学习资料集(PDF)-2020-10-27.rar

    【NOI Linux学习资料集】是一份专门为信息学竞赛(NOI)参与者准备的Linux学习资源,涵盖了从基础到进阶的各种知识点。这份资料集旨在帮助参赛者掌握Linux操作系统,以便在比赛中更好地应对编程和系统操作任务。以下...

    AjErl:用 C# 编写的类似 Erlang 的语言解释器

    学习 Erlang:编码教程 有人可以解释一下 Erlang 中 Pid 的结构吗? 并发编程 错误和过程 列表 Erlang 的重大变化 流程 “绿色线程”和 Erlang 的进程有什么区别? 特别是关于 Erlang/OTP 和多核性能 通信顺序过程...

    第1章 哈希和哈希表-2020.08.19.pdf

    3. **调整负载因子**:负载因子是指哈希表中的元素数量与表的容量之比。过高的负载因子会导致冲突增加,降低性能;过低的负载因子会浪费空间资源。 ### 三、字符串哈希 #### 3.1 概念 字符串哈希是针对字符串设计...

    NOIp复赛注意事项(2016).pdf

    ### NOIp复赛注意事项解析 #### 一、赛前准备 **准考证与身份证明**:参加NOIp复赛的选手需携带准考证以及有效身份证件(如身份证或其他带照片的有效证件),这是入场的重要凭证。 **文具准备**:虽然考试提供...

    noip普及组(初赛)试题精选及讲解(好).pdf

    本资源摘要信息涵盖了NOIP普及组(初赛)试题精选及讲解的所有领域,涉及到计算机系统、计算机存储容量、进制转换、计算机网络和数据库等多个领域,为NOIP普及组(初赛)考生提供了一个全面的学习资源。

    第4章 第2节 图的遍历-2019-01-17.pdf

    图的遍历是图论算法中的基础操作,用于系统地访问图中所有顶点,确保每个顶点只被访问一次。这一概念在计算机科学中,特别是在数据结构和算法设计中,有着广泛的应用。本节主要介绍了两种遍历方法:深度优先遍历...

    NOIP2006-2018初赛+2019CSP-J CSP-S初赛(2020.09.27).rar

    NOIP2006-2018初赛+2019CSP-J CSP-S初赛(2020.09.27) NOIP2006-2018初赛+2019CSP-J CSP-S初赛(2020.09.27) NOIP2006-2018初赛+2019CSP-J CSP-S初赛(2020.09.27) NOIP2006-2018初赛+2019CSP-J CSP-S初赛(2020....

    CSP-J CSP-S模拟题(2020.09.20).rar

    CSP-J CSP-S模拟题(2020.09.20).rar

Global site tag (gtag.js) - Google Analytics