`
qiezi
  • 浏览: 497906 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

像Erlang一样写D程序

    博客分类:
  • D
阅读更多
琢磨了好久,一直没时间来做它。在讨论这个问题的时候就已经有这想法了,后来发现tango里已经有Fiber的实现,昨天终于抽了点时间做了个简单的小玩意,离真实应用还差得很远。

测试代码:
import light_process;
import tango.io.Stdout;

void test() {
	Stdout("test").newline;
	receive(
		(Process pid, float f) {
			Stdout("test: receive float: ", f).newline;
			pid.send(f);
		},
		(Process pid, int i) {
			Stdout("test: receive int: ", i).newline;
			pid.send(i * 2);
		},
		(NotMatched) {
			Stdout("test: not matched").newline;
		}
	).timeout(3000, {
		Stdout("test: timeout").newline;
	}).end;

	test();
}

void test1(Process pid){
	Stdout("test1").newline;
	//sleep(1);
	pid.send(self(), 7.7f);
	pid.send(self(), cast(int)3);

	test1_loop();
}

void test1_loop() {
	receive(
		(float f) {
			Stdout("test1: received result: ", f).newline;
		},
		(int value) {
			Stdout("test1: received result: ", value).newline;
		}
	).timeout(3000, {
		Stdout("test1: timeout").newline;
	}).end;

	test1_loop();
}

void sleep(size_t usecs) {
	receive(

	).timeout(usecs, {
	
	}).end;
}

void de_start() {
	auto pid = spawn(&test);
	spawn(&test1, pid);
	//sleep(3);
}

void main(){
	de_start();
	Process.run();
}


实现:
module light_process;

import tango.core.Thread;
import tango.core.Traits;

class NotMatched{
}

class Message {
	TypeInfo typeInfo__;
	public this(TypeInfo typeInfo__) {
		this.typeInfo__ = typeInfo__;
	}
}

class MessageT(Types...) : public Message{
	Types values;

	public this(Types values_){
		super(typeid(typeof(Types)));
		foreach(i, e; values_) {
			values[i] = values_[i];
		}
	}
}

class Delegate {
	abstract void call(Message msg);

	abstract bool matchType(TypeInfo typeInfo__);
}

class DelegateT(Types...) : public Delegate{
	void delegate(Types) dg;
	void function(Types) func;

	this(void delegate(Types) dg){
		this.dg = dg;
	}

	this(void function(Types) func){
		this.func = func;
	}

	override void call(Message msg) {
		auto message = cast(MessageT!(Types))msg;
		if (dg)
			dg(message.values);
		else
			func(message.values);
	}

	override bool matchType(TypeInfo typeInfo__) {
		if (typeid(Types) is typeid(NotMatched) || typeInfo__ is typeid(Types))
			return true;
		return false;
	}
}

class Process : public Fiber{
	// static members
	public static Process getThis() {
		return cast(Process)Fiber.getThis();
	}

	private static Process[] processes;

	// instance members
	private Message[] messages;

	private Delegate[] handlers;

	public this() {
		super(&exec);
	}

	public void clearMessageHandlers() {
		handlers.length = 0;
	}

	public void addMessageHandler(Delegate dg){
		handlers ~= dg;
	}

	public Process timeout(int timeout, void delegate() dg) {
		return this;
	}

	public void end() {
		Fiber.yield();
		domsg();
	}

	public Process send(Args ...)(Args args) {
		messages ~= new MessageT!(Args)(args);
		return this;
	}

	public void domsg() {
		while (!messages.length) {
			Fiber.yield();
		}
		Message msg = messages[0];
		foreach(dg; handlers) {
			if (dg.matchType(msg.typeInfo__)) {
				dg.call(msg);
				messages = messages[1..$];
				break;
			}
		}
	}

	public static void run() {
		while(true) {
			foreach(process; Process.processes) {
				if (process.state != Fiber.State.TERM) {
					process.call();
				}
			}
		}
	}

	protected abstract void exec();
}

class ProcessT(Ret, Args...) : public Process {
	Ret delegate(Args) dg;
	Ret function(Args) func;
	Args args;

	public this(Ret delegate(Args) dg, Args args) {
		this.dg = dg;
		foreach(i, e; args) {
			this.args[i] = e;
		}
	}

	public this(Ret function(Args) func, Args args) {
		this.func = func;
		foreach(i, e; args) {
			this.args[i] = e;
		}
	}

	public void exec() {
		if (dg)
			dg(args);
		else
			func(args);
	}
}

Process spawn(Ret, Args...)(Ret delegate(Args) dg, Args args) {
	Process p = new ProcessT!(Ret, Args)(dg, args);
	Process.processes ~= p;
	return p;
}

Process spawn(Ret, Args...)(Ret function(Args) func, Args args) {
	Process p =  new ProcessT!(Ret, Args)(func, args);
	Process.processes ~= p;
	return p;
}

Process receive(Dgs ...)(Dgs dgs) {
	Process self = Process.getThis();
	self.clearMessageHandlers();
	foreach(i, dg; dgs) {
		Delegate handler = new DelegateT!(ParameterTupleOf!(typeof(dgs[i])))
			(dgs[i]);
		self.addMessageHandler(handler);
	}
	return self;
}

Process self() {
	return Process.getThis();
}


依赖tango。

目前只实现了基本的消息发送接收,为了简单目前使用内置数组来保存数据。整个驱动过程是对所有进程顺序进行调度,没有过优化是否有需要调度,只是快速做了个原型。timeout功能也没有实现,这个有时间可以重写调度部分来完成,因为这个部分就是随意写了一个,测试是否有可能实现类似erlang的高度过程。所以这个驱动过程实际上是个死循环,测试几秒就强行结束程序吧,CPU被烧本人概不负责。

递归是比较麻烦的,不知道D有没有优化尾递归,我想应该还是有的吧,不然上面那个程序应该早就溢出了。receive的各个委托参数里是没办法优化成尾递归的,解决办法当然有,就是麻烦点,谁叫咱不是FP语言呢。。(这年头,不是FP你都感觉抬不起头。。。)
分享到:
评论
18 楼 qiezi 2007-09-18  
可气的是python也这么快。。。
17 楼 qiezi 2007-09-18  
优化到9秒了,感觉没办法进一步提高了,从12秒到9秒的一点提高就是Delegate对象改成scope了,直接在栈上分配。

erlang的这个应用实际上所有进程都没有退出,直到最后执行exit前没有进程退出,所以可能还没到清扫那一步,但它的内存占用也比较低,还是可以进一步分析一下。
16 楼 tomqyp 2007-09-18  
为什么动态语言在这方面都好强,那个mozart-oz看它的语言特性介绍好像也很酷
15 楼 Trustno1 2007-09-18  
erlang的私有堆清扫非常方便,process退出,整个私有堆就撤掉.完全不用管其他的process里面的东西.而D/python这样的共享堆需要清扫的根集非常之大.python关掉gc disable能够跑的快,是因为python最早的gc是基于reference counting实现的,关闭gc相当于启用reference counting,对于cheap-concurrency这种应用,reference counting比mark-sweep GC省力很多,只要你不出reference cycles.而D的gc disable只是在性能攸关但是内存充足的情况下允许程序员关闭GC完全不清扫垃圾,等到性能瓶颈运算过去之后,再由程序员手工打开gc清扫垃圾,是一个纯粹空间换时间的策略.
14 楼 qiezi 2007-09-18  
看样子没办法让它提速500倍,优化调度过程,把LinkSeq换成D的数组以后,提升到12秒,D的线程版本只用8秒,erlang才1秒多,所以还相差很多。erlang是在windows上测试的,如果是linux下,HiPE应该还可以更快。
13 楼 qiezi 2007-09-18  
这个测试目前erlang是排第二的,他的轻量级线程切换开销比fiber/ucontext更小,实际上python版本也排在c++/D前面.

erlang版本在我的机器上运行是1秒多,我的D版本运行是304秒,如果真能加速500倍的话倒是可以超过它.我测试了关闭GC,结果内存用到640M,用到了交换文件,成绩反而下降了.
12 楼 redsea 2007-09-18  
他这个是比较公平的,各种语言都可以针对测试做特别的优化,那个gcc的版本就有用ucontext的,调度时甚至是按消息发送的顺序进行调度,但就是跑不过erlang。。所以我这个版本估计也不会好。

现在没做任何优化,每次都要调度500个“进程”,实际上每次只有一个进程有消息,所以最理想情况下能提速500倍,我计算过似乎还是比不上erlang版本。


erlang 这么强, 比专门写的代码都强, 原因何在呢 ?

哦, 对了 python 的代码是一来就关闭 gc的, 你做了这个没有.
11 楼 qiezi 2007-09-18  
他这个是比较公平的,各种语言都可以针对测试做特别的优化,那个gcc的版本就有用ucontext的,调度时甚至是按消息发送的顺序进行调度,但就是跑不过erlang。。所以我这个版本估计也不会好。

现在没做任何优化,每次都要调度500个“进程”,实际上每次只有一个进程有消息,所以最理想情况下能提速500倍,我计算过似乎还是比不上erlang版本。
10 楼 redsea 2007-09-18  
to: DavidL
tango 的fibre据说是assertfalse.com mikolalysenko的coroutine。language shootout的coroutine测试我不太懂其他语言的实现,然而感觉是对D不公平的,D用的是系统的线程。其他的可能根本就没有启动系统线程


shootout 的代码都是用户自己提供的, 算不上不公平, 如果qiezi 优化好了他的代码, 一样可以提供上去, 只要符合规定, 结果正确, 可以取代原来的代码作为 D 的性能基准的.
9 楼 qiezi 2007-09-18  
做了个可工作的版本,完成cheap_concurrency的功能,结果是正确的,不过效率那是相当地低,主要是调度部分并没有优化。优化的策略就是每次只调度处于receive状态并且有message的进程,有空再进行,可能要这周末了。先发出来,有兴趣的不妨改改看。纯属好玩,不一定有什么价值。

cheap_concurrency测试代码是从erlang版本改过来的,不过由于递归会导致堆栈溢出,所以这里改成了循环。

1.cheap_concurrency.d
import light_process;
import tango.io.Stdout;
import tango.text.convert.Integer;

void de_main(int N) {
	Process Last = start(500, self());
	Stdout(sendtimes(N, Last, 0)).newline;
}

Process start(int X, Process LastPID) {
	if (X == 0) return LastPID;
	return start(X-1, spawn(&loop, LastPID));
}

void loop(Process LastPID) {
	while(true) {
		receive(
			(int N) {
				LastPID.send(N+1);
			}
		).end;
	}
}

int sendtimes(int N, Process Last, int X) {
	for(int i=0; i<N; i++) {
		Last.send(0);
		int y;
		receive(
			(int Y) {
				y = Y;
			}
		).end;
		X += y;
	}
	return X;
}


void main(char[][] args){
	if (args.length != 2){
		Stdout("USAGE: " ~ args[0] ~ " N").newline;
		return;
	}
	int n = toInt(args[1]);
	spawn(&de_main, n);
	Process.run();
}


2.light_process.d
module light_process;

import tango.io.Stdout;
import tango.core.Thread;
import tango.core.Traits;
import tango.util.collection.LinkSeq;

class NotMatched{
}

class Message {
	TypeInfo typeInfo__;
	public this(TypeInfo typeInfo__) {
		this.typeInfo__ = typeInfo__;
	}
}

class MessageT(Types...) : public Message{
	Types values;

	public this(Types values_){
		super(typeid(typeof(Types)));
		foreach(i, e; values_) {
			values[i] = values_[i];
		}
	}
}

class Delegate {
	abstract void call(Message msg);

	abstract bool matchType(TypeInfo typeInfo__);
}

class DelegateT(Types...) : public Delegate{
	void delegate(Types) dg;
	void function(Types) func;

	this(void delegate(Types) dg){
		this.dg = dg;
	}

	this(void function(Types) func){
		this.func = func;
	}

	override void call(Message msg) {
		auto message = cast(MessageT!(Types))msg;
		if (dg)
			dg(message.values);
		else
			func(message.values);
	}

	override bool matchType(TypeInfo typeInfo__) {
		if (typeid(Types) is typeid(NotMatched) || typeInfo__ is typeid(Types))
			return true;
		return false;
	}
}

class Process : public Fiber{
	// static members
	public static Process getThis() {
		return cast(Process)Fiber.getThis();
	}

	// receiving processes
	private static bool[Process] receivingProcesses;

	// instance members
	private LinkSeq!(Message) messages;

	private LinkSeq!(Delegate) handlers;

	public this() {
		super(&exec);
		messages = new LinkSeq!(Message);
		handlers = new LinkSeq!(Delegate);
	}

	public void startReceiving() {
		receivingProcesses[this] = true;
	}

	public void stopReceiving() {
		receivingProcesses.remove(this);
	}

	public void clearMessageHandlers() {
		handlers.clear();
	}

	public void addMessageHandler(Delegate dg){
		handlers.append(dg);
	}

	public Process timeout(int timeout, void delegate() dg) {
		return this;
	}

	public void end() {
		Fiber.yield();
		domsg();
	}

	public Process send(Args ...)(Args args) {
		messages.append(new MessageT!(Args)(args));
		return this;
	}

	public void domsg() {
		while(messages.size() == 0) {
			Fiber.yield();
		}

		Message msg = messages.head();
		foreach(dg; handlers) {
			if (dg.matchType(msg.typeInfo__)) {
				dg.call(msg);
				messages.removeHead();
				break;
			}
		}
	}

	public static void run() {
		while(true) {
			foreach(process, dummy; Process.receivingProcesses) {

				if (process.state != Fiber.State.TERM) {
					process.call();
				}
			}
		}
	}

	protected abstract void exec();
}

class ProcessT(Ret, Args...) : public Process {
	Ret delegate(Args) dg;
	Ret function(Args) func;
	Args args;

	public this(Ret delegate(Args) dg, Args args) {
		this.dg = dg;
		foreach(i, e; args) {
			this.args[i] = e;
		}
	}

	public this(Ret function(Args) func, Args args) {
		this.func = func;
		foreach(i, e; args) {
			this.args[i] = e;
		}
	}

	public void exec() {
		if (dg)
			dg(args);
		else
			func(args);
	}
}

Process spawn(Ret, Args...)(Ret delegate(Args) dg, Args args) {
	Process p = new ProcessT!(Ret, Args)(dg, args);
	p.call();
	return p;
}

Process spawn(Ret, Args...)(Ret function(Args) func, Args args) {
	Process p =  new ProcessT!(Ret, Args)(func, args);
	p.call();
	return p;
}

Process receive(Dgs ...)(Dgs dgs) {
	Process pid = self();
	pid.clearMessageHandlers();
	foreach(i, dg; dgs) {
		Delegate handler = new DelegateT!(ParameterTupleOf!(typeof(dgs[i])))(dgs[i]);
		pid.addMessageHandler(handler);
	}

	pid.startReceiving();
	return pid;
}

Process self() {
	return Process.getThis();
}
8 楼 qiezi 2007-09-17  
看样子要跑过erlang很难啊,那里有一个gcc的测试,使用ucontext的,竟然比python还差,更不用说erlang了。。
7 楼 DavidL 2007-09-16  
tango的fibre据说是assertfalse.com mikolalysenko的coroutine。language shootout的coroutine测试我不太懂其他语言的实现,然而感觉是对D不公平的,D用的是系统的线程。其他的可能根本就没有启动系统线程
6 楼 qiezi 2007-09-15  
redsea 写道
language shootout 里面那个测试基本没有实质计算, 每个 cheap thread 将自己收到的 token 传递给下一个 cheap thread 而已. 测试当中可以关闭 gc.

这个测试对 er-lang 来说, 简直是量身定做



原来如此,这个测试D/C++成绩也太差了,线程开销还是太大了些,应该很容易用轻量级线程来提高吧。目前首先要做的是做个调度器,先让它跑起来再说。
5 楼 redsea 2007-09-15  
language shootout 里面那个测试基本没有实质计算, 每个 cheap thread 将自己收到的 token 传递给下一个 cheap thread 而已. 测试当中可以关闭 gc.

这个测试对 er-lang 来说, 简直是量身定做


4 楼 qiezi 2007-09-15  
redsea 写道
language shootout 那里有个 cheap concurrent 测试, 就是测试类似能力的.
er-lang 的表现不错.

D 的程序记得没有用 fiber, 被人用 thread 去实现的, qiezi 不妨去提交一个 fiber 的版本, 提高D的分数.

BTW: C++ 的版本也是用 thread 的, 性能比D差.

fiber不一定会比thread性能高的,特别是大量计算的只会降低效率。只是更容易生成大量的“线程”,提高并行能力。如果实现erlang这种架构,就可以让程序写起来更方便,用同步的写法就完成了异步的调用,对于库的编写要求会比较高。

上面这个测试程序只是简单测试一下可行性,看有没有人有兴趣一起研究一下,这种架构最主要的难点在于调度,不过这是个不断优化的过程,关键是看有没有简化编程。D的垃圾收集对于委托的影响还没测试到呢,它和fiber的配合程度怎样也不清楚,毕竟fiber的堆栈是要不断切换的,而D的GC在扫描堆栈时只是扫描线程栈。
3 楼 qiezi 2007-09-15  
Colorful 写道
听说FP并行有优势,但是俺命令式陷入太深了!

是更容易做成并行,但这也依赖于实现。
2 楼 redsea 2007-09-15  
language shootout 那里有个 cheap concurrent 测试, 就是测试类似能力的.
er-lang 的表现不错.

D 的程序记得没有用 fiber, 被人用 thread 去实现的, qiezi 不妨去提交一个 fiber 的版本, 提高D的分数.

BTW: C++ 的版本也是用 thread 的, 性能比D差.
1 楼 Colorful 2007-09-15  
听说FP并行有优势,但是俺命令式陷入太深了!

相关推荐

    erlang程序设计相关例子程序

    通过对这些文件名的分析,我们可以看出这些Erlang程序涵盖了并发处理、进程通信、异常处理、服务器设计、数据处理和测试等多个方面,体现了Erlang在构建分布式系统中的强大功能。学习并理解这些示例,对于深入掌握...

    Erlang程序设计(第2版)1

    【Erlang程序设计(第2版)】是由Erlang之父Joe Armstrong撰写的一本经典著作,专注于介绍Erlang编程语言在并发、分布式和容错系统中的应用。本书适用于初学者和有一定经验的Erlang程序员。作者在书中讨论了如何利用...

    erlang程序设计与入门

    **Erlang程序设计与入门** Erlang是一种并发、函数式编程语言,主要用于构建分布式、高可用性、容错性强的系统。它的设计灵感来源于电信行业的需求,由瑞典爱立信公司于1986年开发。Erlang以其独特的并发模型、轻量...

    Erlang程序设计(第二版)及源码

    书中兼顾了顺序编程、并发编程和分布式编程,重点介绍如何编写并发和分布式的Erlang程序以及如何在多核CPU上自动加速程序,并深入地讨论了开发Erlang应用中至关重要的文件和网络编程、OTP、ETS和DETS等主题。...

    用Erlang写了个解八数码的小程序

    标题中的“用Erlang写了个解八数码的小程序”指的是使用Erlang编程语言实现的一个解决8数码问题(也称为滑动拼图)的算法。8数码问题是一个经典的计算机科学问题,它涉及到在一个3x3的网格上,通过空格与其他数字...

    Erlang程序设计中文版

    **Erlang程序设计中文版**是一本深入探讨Erlang编程语言的书籍,旨在帮助开发者理解和掌握这种在并发处理和分布式系统中广泛使用的语言。Erlang以其强大的错误恢复能力、热代码替换以及对大规模并发的支持而闻名,是...

    erlang程序设计2

    erlang发明者写的书。erlang/otp一种高可靠性的平台。

    Erlang并发编程,Erlang程序设计,Erlang中文手册

    Erlang并发编程,Erlang程序设计,Erlang中文手册。 学习erlang的好资料。  Erlang是一个结构化,动态类型编程语言,内建并行计算支持。最初是由爱立信专门为通信应用设计的,比如控制交换机或者变换协议等,因此...

    Erlang程序设计及源码

    本资源包含了一本Erlang程序设计的入门经典书籍及其配套源码,适合初学者深入理解Erlang编程。 1. **Erlang简介** Erlang以其强大的并发处理能力而闻名,它采用了轻量级进程模型,使得在单个系统中可以同时运行...

    Erlang程序设计 第2版

    Erlang程序设计 第2版 Erlang程序设计 第2版Erlang程序设计 第2版

    Erlang程序设计,包含完整目录和全套源码

    这个压缩包包含了Erlang程序设计的完整目录和源码,是学习和理解Erlang编程的重要资源。 Erlang的并发特性源于其轻量级进程模型,每个进程都有自己的内存空间,进程间通信通过消息传递实现,这种设计降低了并发执行...

    erlang -c语言程序接口.pdf

    ### Erlang与C语言程序接口详解 #### 一、引言 在软件开发领域,不同编程语言之间的交互是一项重要的技术。Erlang作为一种专为构建高并发、容错性强的应用程序而设计的语言,在与其他语言(如C语言)的集成方面具有...

    [Erlang程序设计]源代码

    **Erlang程序设计源代码详解** Erlang是一种面向并发、函数式编程语言,尤其在分布式系统和高可用性领域表现出色。本资源包含了《Erlang程序设计》一书的所有实例代码,旨在帮助读者深入理解Erlang语言的核心特性和...

    erlang 程序设计 源码

    在深入理解Erlang程序设计的源码之前,我们需要了解Erlang的一些核心概念和特性。 1. **并发性**:Erlang的并发模型基于轻量级进程(Lightweight Processes,LWP),它们类似于操作系统中的线程,但资源消耗小得多...

    erlang程序设计中文版

    尽管如此,Erlang的活跃度和影响力在一段时间里似乎并不像它的同辈语言那样广泛。然而,由于其语言设计上的一些特性,如轻量级进程、消息传递机制和容错能力,Erlang在多核和分布式计算的背景下找到了自己的位置,...

    erlang程序设计

    在深入探讨Erlang程序设计之前,我们先来了解一下Erlang的基础概念。 1. 函数式编程:Erlang是一种纯函数式编程语言,这意味着函数不具有副作用,它们仅根据输入产生输出,不改变外部状态。这种特性使得代码更容易...

    Erlang程序设计].源代码

    在"Erlang程序设计"].源代码这个压缩包中,我们可以找到学习和实践Erlang编程的相关材料。这些源代码可能是书中实例的实现,或者是针对Erlang编程技巧和概念的示例。通过阅读和分析这些源代码,你可以深入理解Erlang...

Global site tag (gtag.js) - Google Analytics