论坛首页 编程语言技术论坛

像Erlang一样写D程序

浏览 13641 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2007-09-15  
琢磨了好久,一直没时间来做它。在讨论这个问题的时候就已经有这想法了,后来发现tango里已经有Fiber的实现,昨天终于抽了点时间做了个简单的小玩意,离真实应用还差得很远。

测试代码:
import light_process;
import tango.io.Stdout;

void test() {
	Stdout("test").newline;
	receive(
		(Process pid, float f) {
			Stdout("test: receive float: ", f).newline;
			pid.send(f);
		},
		(Process pid, int i) {
			Stdout("test: receive int: ", i).newline;
			pid.send(i * 2);
		},
		(NotMatched) {
			Stdout("test: not matched").newline;
		}
	).timeout(3000, {
		Stdout("test: timeout").newline;
	}).end;

	test();
}

void test1(Process pid){
	Stdout("test1").newline;
	//sleep(1);
	pid.send(self(), 7.7f);
	pid.send(self(), cast(int)3);

	test1_loop();
}

void test1_loop() {
	receive(
		(float f) {
			Stdout("test1: received result: ", f).newline;
		},
		(int value) {
			Stdout("test1: received result: ", value).newline;
		}
	).timeout(3000, {
		Stdout("test1: timeout").newline;
	}).end;

	test1_loop();
}

void sleep(size_t usecs) {
	receive(

	).timeout(usecs, {
	
	}).end;
}

void de_start() {
	auto pid = spawn(&test);
	spawn(&test1, pid);
	//sleep(3);
}

void main(){
	de_start();
	Process.run();
}


实现:
module light_process;

import tango.core.Thread;
import tango.core.Traits;

class NotMatched{
}

class Message {
	TypeInfo typeInfo__;
	public this(TypeInfo typeInfo__) {
		this.typeInfo__ = typeInfo__;
	}
}

class MessageT(Types...) : public Message{
	Types values;

	public this(Types values_){
		super(typeid(typeof(Types)));
		foreach(i, e; values_) {
			values[i] = values_[i];
		}
	}
}

class Delegate {
	abstract void call(Message msg);

	abstract bool matchType(TypeInfo typeInfo__);
}

class DelegateT(Types...) : public Delegate{
	void delegate(Types) dg;
	void function(Types) func;

	this(void delegate(Types) dg){
		this.dg = dg;
	}

	this(void function(Types) func){
		this.func = func;
	}

	override void call(Message msg) {
		auto message = cast(MessageT!(Types))msg;
		if (dg)
			dg(message.values);
		else
			func(message.values);
	}

	override bool matchType(TypeInfo typeInfo__) {
		if (typeid(Types) is typeid(NotMatched) || typeInfo__ is typeid(Types))
			return true;
		return false;
	}
}

class Process : public Fiber{
	// static members
	public static Process getThis() {
		return cast(Process)Fiber.getThis();
	}

	private static Process[] processes;

	// instance members
	private Message[] messages;

	private Delegate[] handlers;

	public this() {
		super(&exec);
	}

	public void clearMessageHandlers() {
		handlers.length = 0;
	}

	public void addMessageHandler(Delegate dg){
		handlers ~= dg;
	}

	public Process timeout(int timeout, void delegate() dg) {
		return this;
	}

	public void end() {
		Fiber.yield();
		domsg();
	}

	public Process send(Args ...)(Args args) {
		messages ~= new MessageT!(Args)(args);
		return this;
	}

	public void domsg() {
		while (!messages.length) {
			Fiber.yield();
		}
		Message msg = messages[0];
		foreach(dg; handlers) {
			if (dg.matchType(msg.typeInfo__)) {
				dg.call(msg);
				messages = messages[1..$];
				break;
			}
		}
	}

	public static void run() {
		while(true) {
			foreach(process; Process.processes) {
				if (process.state != Fiber.State.TERM) {
					process.call();
				}
			}
		}
	}

	protected abstract void exec();
}

class ProcessT(Ret, Args...) : public Process {
	Ret delegate(Args) dg;
	Ret function(Args) func;
	Args args;

	public this(Ret delegate(Args) dg, Args args) {
		this.dg = dg;
		foreach(i, e; args) {
			this.args[i] = e;
		}
	}

	public this(Ret function(Args) func, Args args) {
		this.func = func;
		foreach(i, e; args) {
			this.args[i] = e;
		}
	}

	public void exec() {
		if (dg)
			dg(args);
		else
			func(args);
	}
}

Process spawn(Ret, Args...)(Ret delegate(Args) dg, Args args) {
	Process p = new ProcessT!(Ret, Args)(dg, args);
	Process.processes ~= p;
	return p;
}

Process spawn(Ret, Args...)(Ret function(Args) func, Args args) {
	Process p =  new ProcessT!(Ret, Args)(func, args);
	Process.processes ~= p;
	return p;
}

Process receive(Dgs ...)(Dgs dgs) {
	Process self = Process.getThis();
	self.clearMessageHandlers();
	foreach(i, dg; dgs) {
		Delegate handler = new DelegateT!(ParameterTupleOf!(typeof(dgs[i])))
			(dgs[i]);
		self.addMessageHandler(handler);
	}
	return self;
}

Process self() {
	return Process.getThis();
}


依赖tango。

目前只实现了基本的消息发送接收,为了简单目前使用内置数组来保存数据。整个驱动过程是对所有进程顺序进行调度,没有过优化是否有需要调度,只是快速做了个原型。timeout功能也没有实现,这个有时间可以重写调度部分来完成,因为这个部分就是随意写了一个,测试是否有可能实现类似erlang的高度过程。所以这个驱动过程实际上是个死循环,测试几秒就强行结束程序吧,CPU被烧本人概不负责。

递归是比较麻烦的,不知道D有没有优化尾递归,我想应该还是有的吧,不然上面那个程序应该早就溢出了。receive的各个委托参数里是没办法优化成尾递归的,解决办法当然有,就是麻烦点,谁叫咱不是FP语言呢。。(这年头,不是FP你都感觉抬不起头。。。)
   发表时间:2007-09-15  
听说FP并行有优势,但是俺命令式陷入太深了!
0 请登录后投票
   发表时间:2007-09-15  
language shootout 那里有个 cheap concurrent 测试, 就是测试类似能力的.
er-lang 的表现不错.

D 的程序记得没有用 fiber, 被人用 thread 去实现的, qiezi 不妨去提交一个 fiber 的版本, 提高D的分数.

BTW: C++ 的版本也是用 thread 的, 性能比D差.
0 请登录后投票
   发表时间:2007-09-15  
Colorful 写道
听说FP并行有优势,但是俺命令式陷入太深了!

是更容易做成并行,但这也依赖于实现。
0 请登录后投票
   发表时间:2007-09-15  
redsea 写道
language shootout 那里有个 cheap concurrent 测试, 就是测试类似能力的.
er-lang 的表现不错.

D 的程序记得没有用 fiber, 被人用 thread 去实现的, qiezi 不妨去提交一个 fiber 的版本, 提高D的分数.

BTW: C++ 的版本也是用 thread 的, 性能比D差.

fiber不一定会比thread性能高的,特别是大量计算的只会降低效率。只是更容易生成大量的“线程”,提高并行能力。如果实现erlang这种架构,就可以让程序写起来更方便,用同步的写法就完成了异步的调用,对于库的编写要求会比较高。

上面这个测试程序只是简单测试一下可行性,看有没有人有兴趣一起研究一下,这种架构最主要的难点在于调度,不过这是个不断优化的过程,关键是看有没有简化编程。D的垃圾收集对于委托的影响还没测试到呢,它和fiber的配合程度怎样也不清楚,毕竟fiber的堆栈是要不断切换的,而D的GC在扫描堆栈时只是扫描线程栈。
0 请登录后投票
   发表时间:2007-09-15  
language shootout 里面那个测试基本没有实质计算, 每个 cheap thread 将自己收到的 token 传递给下一个 cheap thread 而已. 测试当中可以关闭 gc.

这个测试对 er-lang 来说, 简直是量身定做


0 请登录后投票
   发表时间:2007-09-15  
redsea 写道
language shootout 里面那个测试基本没有实质计算, 每个 cheap thread 将自己收到的 token 传递给下一个 cheap thread 而已. 测试当中可以关闭 gc.

这个测试对 er-lang 来说, 简直是量身定做



原来如此,这个测试D/C++成绩也太差了,线程开销还是太大了些,应该很容易用轻量级线程来提高吧。目前首先要做的是做个调度器,先让它跑起来再说。
0 请登录后投票
   发表时间:2007-09-16  
tango的fibre据说是assertfalse.com mikolalysenko的coroutine。language shootout的coroutine测试我不太懂其他语言的实现,然而感觉是对D不公平的,D用的是系统的线程。其他的可能根本就没有启动系统线程
0 请登录后投票
   发表时间:2007-09-17  
看样子要跑过erlang很难啊,那里有一个gcc的测试,使用ucontext的,竟然比python还差,更不用说erlang了。。
0 请登录后投票
   发表时间:2007-09-18  
做了个可工作的版本,完成cheap_concurrency的功能,结果是正确的,不过效率那是相当地低,主要是调度部分并没有优化。优化的策略就是每次只调度处于receive状态并且有message的进程,有空再进行,可能要这周末了。先发出来,有兴趣的不妨改改看。纯属好玩,不一定有什么价值。

cheap_concurrency测试代码是从erlang版本改过来的,不过由于递归会导致堆栈溢出,所以这里改成了循环。

1.cheap_concurrency.d
import light_process;
import tango.io.Stdout;
import tango.text.convert.Integer;

void de_main(int N) {
	Process Last = start(500, self());
	Stdout(sendtimes(N, Last, 0)).newline;
}

Process start(int X, Process LastPID) {
	if (X == 0) return LastPID;
	return start(X-1, spawn(&loop, LastPID));
}

void loop(Process LastPID) {
	while(true) {
		receive(
			(int N) {
				LastPID.send(N+1);
			}
		).end;
	}
}

int sendtimes(int N, Process Last, int X) {
	for(int i=0; i<N; i++) {
		Last.send(0);
		int y;
		receive(
			(int Y) {
				y = Y;
			}
		).end;
		X += y;
	}
	return X;
}


void main(char[][] args){
	if (args.length != 2){
		Stdout("USAGE: " ~ args[0] ~ " N").newline;
		return;
	}
	int n = toInt(args[1]);
	spawn(&de_main, n);
	Process.run();
}


2.light_process.d
module light_process;

import tango.io.Stdout;
import tango.core.Thread;
import tango.core.Traits;
import tango.util.collection.LinkSeq;

class NotMatched{
}

class Message {
	TypeInfo typeInfo__;
	public this(TypeInfo typeInfo__) {
		this.typeInfo__ = typeInfo__;
	}
}

class MessageT(Types...) : public Message{
	Types values;

	public this(Types values_){
		super(typeid(typeof(Types)));
		foreach(i, e; values_) {
			values[i] = values_[i];
		}
	}
}

class Delegate {
	abstract void call(Message msg);

	abstract bool matchType(TypeInfo typeInfo__);
}

class DelegateT(Types...) : public Delegate{
	void delegate(Types) dg;
	void function(Types) func;

	this(void delegate(Types) dg){
		this.dg = dg;
	}

	this(void function(Types) func){
		this.func = func;
	}

	override void call(Message msg) {
		auto message = cast(MessageT!(Types))msg;
		if (dg)
			dg(message.values);
		else
			func(message.values);
	}

	override bool matchType(TypeInfo typeInfo__) {
		if (typeid(Types) is typeid(NotMatched) || typeInfo__ is typeid(Types))
			return true;
		return false;
	}
}

class Process : public Fiber{
	// static members
	public static Process getThis() {
		return cast(Process)Fiber.getThis();
	}

	// receiving processes
	private static bool[Process] receivingProcesses;

	// instance members
	private LinkSeq!(Message) messages;

	private LinkSeq!(Delegate) handlers;

	public this() {
		super(&exec);
		messages = new LinkSeq!(Message);
		handlers = new LinkSeq!(Delegate);
	}

	public void startReceiving() {
		receivingProcesses[this] = true;
	}

	public void stopReceiving() {
		receivingProcesses.remove(this);
	}

	public void clearMessageHandlers() {
		handlers.clear();
	}

	public void addMessageHandler(Delegate dg){
		handlers.append(dg);
	}

	public Process timeout(int timeout, void delegate() dg) {
		return this;
	}

	public void end() {
		Fiber.yield();
		domsg();
	}

	public Process send(Args ...)(Args args) {
		messages.append(new MessageT!(Args)(args));
		return this;
	}

	public void domsg() {
		while(messages.size() == 0) {
			Fiber.yield();
		}

		Message msg = messages.head();
		foreach(dg; handlers) {
			if (dg.matchType(msg.typeInfo__)) {
				dg.call(msg);
				messages.removeHead();
				break;
			}
		}
	}

	public static void run() {
		while(true) {
			foreach(process, dummy; Process.receivingProcesses) {

				if (process.state != Fiber.State.TERM) {
					process.call();
				}
			}
		}
	}

	protected abstract void exec();
}

class ProcessT(Ret, Args...) : public Process {
	Ret delegate(Args) dg;
	Ret function(Args) func;
	Args args;

	public this(Ret delegate(Args) dg, Args args) {
		this.dg = dg;
		foreach(i, e; args) {
			this.args[i] = e;
		}
	}

	public this(Ret function(Args) func, Args args) {
		this.func = func;
		foreach(i, e; args) {
			this.args[i] = e;
		}
	}

	public void exec() {
		if (dg)
			dg(args);
		else
			func(args);
	}
}

Process spawn(Ret, Args...)(Ret delegate(Args) dg, Args args) {
	Process p = new ProcessT!(Ret, Args)(dg, args);
	p.call();
	return p;
}

Process spawn(Ret, Args...)(Ret function(Args) func, Args args) {
	Process p =  new ProcessT!(Ret, Args)(func, args);
	p.call();
	return p;
}

Process receive(Dgs ...)(Dgs dgs) {
	Process pid = self();
	pid.clearMessageHandlers();
	foreach(i, dg; dgs) {
		Delegate handler = new DelegateT!(ParameterTupleOf!(typeof(dgs[i])))(dgs[i]);
		pid.addMessageHandler(handler);
	}

	pid.startReceiving();
	return pid;
}

Process self() {
	return Process.getThis();
}
0 请登录后投票
论坛首页 编程语言技术版

跳转论坛:
Global site tag (gtag.js) - Google Analytics