锁定老帖子 主题:像Erlang一样写D程序
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2007-09-15
这个问题的时候就已经有这想法了,后来发现tango里已经有Fiber的实现,昨天终于抽了点时间做了个简单的小玩意,离真实应用还差得很远。
琢磨了好久,一直没时间来做它。在讨论测试代码: import light_process; import tango.io.Stdout; void test() { Stdout("test").newline; receive( (Process pid, float f) { Stdout("test: receive float: ", f).newline; pid.send(f); }, (Process pid, int i) { Stdout("test: receive int: ", i).newline; pid.send(i * 2); }, (NotMatched) { Stdout("test: not matched").newline; } ).timeout(3000, { Stdout("test: timeout").newline; }).end; test(); } void test1(Process pid){ Stdout("test1").newline; //sleep(1); pid.send(self(), 7.7f); pid.send(self(), cast(int)3); test1_loop(); } void test1_loop() { receive( (float f) { Stdout("test1: received result: ", f).newline; }, (int value) { Stdout("test1: received result: ", value).newline; } ).timeout(3000, { Stdout("test1: timeout").newline; }).end; test1_loop(); } void sleep(size_t usecs) { receive( ).timeout(usecs, { }).end; } void de_start() { auto pid = spawn(&test); spawn(&test1, pid); //sleep(3); } void main(){ de_start(); Process.run(); } 实现: module light_process; import tango.core.Thread; import tango.core.Traits; class NotMatched{ } class Message { TypeInfo typeInfo__; public this(TypeInfo typeInfo__) { this.typeInfo__ = typeInfo__; } } class MessageT(Types...) : public Message{ Types values; public this(Types values_){ super(typeid(typeof(Types))); foreach(i, e; values_) { values[i] = values_[i]; } } } class Delegate { abstract void call(Message msg); abstract bool matchType(TypeInfo typeInfo__); } class DelegateT(Types...) : public Delegate{ void delegate(Types) dg; void function(Types) func; this(void delegate(Types) dg){ this.dg = dg; } this(void function(Types) func){ this.func = func; } override void call(Message msg) { auto message = cast(MessageT!(Types))msg; if (dg) dg(message.values); else func(message.values); } override bool matchType(TypeInfo typeInfo__) { if (typeid(Types) is typeid(NotMatched) || typeInfo__ is typeid(Types)) return true; return false; } } class Process : public Fiber{ // static members public static Process getThis() { return cast(Process)Fiber.getThis(); } private static Process[] processes; // instance members private Message[] messages; private Delegate[] handlers; public this() { super(&exec); } public void clearMessageHandlers() { handlers.length = 0; } public void addMessageHandler(Delegate dg){ handlers ~= dg; } public Process timeout(int timeout, void delegate() dg) { return this; } public void end() { Fiber.yield(); domsg(); } public Process send(Args ...)(Args args) { messages ~= new MessageT!(Args)(args); return this; } public void domsg() { while (!messages.length) { Fiber.yield(); } Message msg = messages[0]; foreach(dg; handlers) { if (dg.matchType(msg.typeInfo__)) { dg.call(msg); messages = messages[1..$]; break; } } } public static void run() { while(true) { foreach(process; Process.processes) { if (process.state != Fiber.State.TERM) { process.call(); } } } } protected abstract void exec(); } class ProcessT(Ret, Args...) : public Process { Ret delegate(Args) dg; Ret function(Args) func; Args args; public this(Ret delegate(Args) dg, Args args) { this.dg = dg; foreach(i, e; args) { this.args[i] = e; } } public this(Ret function(Args) func, Args args) { this.func = func; foreach(i, e; args) { this.args[i] = e; } } public void exec() { if (dg) dg(args); else func(args); } } Process spawn(Ret, Args...)(Ret delegate(Args) dg, Args args) { Process p = new ProcessT!(Ret, Args)(dg, args); Process.processes ~= p; return p; } Process spawn(Ret, Args...)(Ret function(Args) func, Args args) { Process p = new ProcessT!(Ret, Args)(func, args); Process.processes ~= p; return p; } Process receive(Dgs ...)(Dgs dgs) { Process self = Process.getThis(); self.clearMessageHandlers(); foreach(i, dg; dgs) { Delegate handler = new DelegateT!(ParameterTupleOf!(typeof(dgs[i]))) (dgs[i]); self.addMessageHandler(handler); } return self; } Process self() { return Process.getThis(); } 依赖tango。 目前只实现了基本的消息发送接收,为了简单目前使用内置数组来保存数据。整个驱动过程是对所有进程顺序进行调度,没有过优化是否有需要调度,只是快速做了个原型。timeout功能也没有实现,这个有时间可以重写调度部分来完成,因为这个部分就是随意写了一个,测试是否有可能实现类似erlang的高度过程。所以这个驱动过程实际上是个死循环,测试几秒就强行结束程序吧,CPU被烧本人概不负责。 递归是比较麻烦的,不知道D有没有优化尾递归,我想应该还是有的吧,不然上面那个程序应该早就溢出了。receive的各个委托参数里是没办法优化成尾递归的,解决办法当然有,就是麻烦点,谁叫咱不是FP语言呢。。(这年头,不是FP你都感觉抬不起头。。。) 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2007-09-15
听说FP并行有优势,但是俺命令式陷入太深了!
|
|
返回顶楼 | |
发表时间:2007-09-15
language shootout 那里有个 cheap concurrent 测试, 就是测试类似能力的.
er-lang 的表现不错. D 的程序记得没有用 fiber, 被人用 thread 去实现的, qiezi 不妨去提交一个 fiber 的版本, 提高D的分数. BTW: C++ 的版本也是用 thread 的, 性能比D差. |
|
返回顶楼 | |
发表时间:2007-09-15
Colorful 写道 听说FP并行有优势,但是俺命令式陷入太深了!
是更容易做成并行,但这也依赖于实现。 |
|
返回顶楼 | |
发表时间:2007-09-15
redsea 写道 language shootout 那里有个 cheap concurrent 测试, 就是测试类似能力的.
er-lang 的表现不错. D 的程序记得没有用 fiber, 被人用 thread 去实现的, qiezi 不妨去提交一个 fiber 的版本, 提高D的分数. BTW: C++ 的版本也是用 thread 的, 性能比D差. fiber不一定会比thread性能高的,特别是大量计算的只会降低效率。只是更容易生成大量的“线程”,提高并行能力。如果实现erlang这种架构,就可以让程序写起来更方便,用同步的写法就完成了异步的调用,对于库的编写要求会比较高。 上面这个测试程序只是简单测试一下可行性,看有没有人有兴趣一起研究一下,这种架构最主要的难点在于调度,不过这是个不断优化的过程,关键是看有没有简化编程。D的垃圾收集对于委托的影响还没测试到呢,它和fiber的配合程度怎样也不清楚,毕竟fiber的堆栈是要不断切换的,而D的GC在扫描堆栈时只是扫描线程栈。 |
|
返回顶楼 | |
发表时间:2007-09-15
language shootout 里面那个测试基本没有实质计算, 每个 cheap thread 将自己收到的 token 传递给下一个 cheap thread 而已. 测试当中可以关闭 gc.
这个测试对 er-lang 来说, 简直是量身定做 |
|
返回顶楼 | |
发表时间:2007-09-15
redsea 写道 language shootout 里面那个测试基本没有实质计算, 每个 cheap thread 将自己收到的 token 传递给下一个 cheap thread 而已. 测试当中可以关闭 gc.
这个测试对 er-lang 来说, 简直是量身定做 原来如此,这个测试D/C++成绩也太差了,线程开销还是太大了些,应该很容易用轻量级线程来提高吧。目前首先要做的是做个调度器,先让它跑起来再说。 |
|
返回顶楼 | |
发表时间:2007-09-16
tango的fibre据说是assertfalse.com mikolalysenko的coroutine。language shootout的coroutine测试我不太懂其他语言的实现,然而感觉是对D不公平的,D用的是系统的线程。其他的可能根本就没有启动系统线程
|
|
返回顶楼 | |
发表时间:2007-09-17
看样子要跑过erlang很难啊,那里有一个gcc的测试,使用ucontext的,竟然比python还差,更不用说erlang了。。
|
|
返回顶楼 | |
发表时间:2007-09-18
做了个可工作的版本,完成cheap_concurrency的功能,结果是正确的,不过效率那是相当地低,主要是调度部分并没有优化。优化的策略就是每次只调度处于receive状态并且有message的进程,有空再进行,可能要这周末了。先发出来,有兴趣的不妨改改看。纯属好玩,不一定有什么价值。
cheap_concurrency测试代码是从erlang版本改过来的,不过由于递归会导致堆栈溢出,所以这里改成了循环。 1.cheap_concurrency.d import light_process; import tango.io.Stdout; import tango.text.convert.Integer; void de_main(int N) { Process Last = start(500, self()); Stdout(sendtimes(N, Last, 0)).newline; } Process start(int X, Process LastPID) { if (X == 0) return LastPID; return start(X-1, spawn(&loop, LastPID)); } void loop(Process LastPID) { while(true) { receive( (int N) { LastPID.send(N+1); } ).end; } } int sendtimes(int N, Process Last, int X) { for(int i=0; i<N; i++) { Last.send(0); int y; receive( (int Y) { y = Y; } ).end; X += y; } return X; } void main(char[][] args){ if (args.length != 2){ Stdout("USAGE: " ~ args[0] ~ " N").newline; return; } int n = toInt(args[1]); spawn(&de_main, n); Process.run(); } 2.light_process.d module light_process; import tango.io.Stdout; import tango.core.Thread; import tango.core.Traits; import tango.util.collection.LinkSeq; class NotMatched{ } class Message { TypeInfo typeInfo__; public this(TypeInfo typeInfo__) { this.typeInfo__ = typeInfo__; } } class MessageT(Types...) : public Message{ Types values; public this(Types values_){ super(typeid(typeof(Types))); foreach(i, e; values_) { values[i] = values_[i]; } } } class Delegate { abstract void call(Message msg); abstract bool matchType(TypeInfo typeInfo__); } class DelegateT(Types...) : public Delegate{ void delegate(Types) dg; void function(Types) func; this(void delegate(Types) dg){ this.dg = dg; } this(void function(Types) func){ this.func = func; } override void call(Message msg) { auto message = cast(MessageT!(Types))msg; if (dg) dg(message.values); else func(message.values); } override bool matchType(TypeInfo typeInfo__) { if (typeid(Types) is typeid(NotMatched) || typeInfo__ is typeid(Types)) return true; return false; } } class Process : public Fiber{ // static members public static Process getThis() { return cast(Process)Fiber.getThis(); } // receiving processes private static bool[Process] receivingProcesses; // instance members private LinkSeq!(Message) messages; private LinkSeq!(Delegate) handlers; public this() { super(&exec); messages = new LinkSeq!(Message); handlers = new LinkSeq!(Delegate); } public void startReceiving() { receivingProcesses[this] = true; } public void stopReceiving() { receivingProcesses.remove(this); } public void clearMessageHandlers() { handlers.clear(); } public void addMessageHandler(Delegate dg){ handlers.append(dg); } public Process timeout(int timeout, void delegate() dg) { return this; } public void end() { Fiber.yield(); domsg(); } public Process send(Args ...)(Args args) { messages.append(new MessageT!(Args)(args)); return this; } public void domsg() { while(messages.size() == 0) { Fiber.yield(); } Message msg = messages.head(); foreach(dg; handlers) { if (dg.matchType(msg.typeInfo__)) { dg.call(msg); messages.removeHead(); break; } } } public static void run() { while(true) { foreach(process, dummy; Process.receivingProcesses) { if (process.state != Fiber.State.TERM) { process.call(); } } } } protected abstract void exec(); } class ProcessT(Ret, Args...) : public Process { Ret delegate(Args) dg; Ret function(Args) func; Args args; public this(Ret delegate(Args) dg, Args args) { this.dg = dg; foreach(i, e; args) { this.args[i] = e; } } public this(Ret function(Args) func, Args args) { this.func = func; foreach(i, e; args) { this.args[i] = e; } } public void exec() { if (dg) dg(args); else func(args); } } Process spawn(Ret, Args...)(Ret delegate(Args) dg, Args args) { Process p = new ProcessT!(Ret, Args)(dg, args); p.call(); return p; } Process spawn(Ret, Args...)(Ret function(Args) func, Args args) { Process p = new ProcessT!(Ret, Args)(func, args); p.call(); return p; } Process receive(Dgs ...)(Dgs dgs) { Process pid = self(); pid.clearMessageHandlers(); foreach(i, dg; dgs) { Delegate handler = new DelegateT!(ParameterTupleOf!(typeof(dgs[i])))(dgs[i]); pid.addMessageHandler(handler); } pid.startReceiving(); return pid; } Process self() { return Process.getThis(); } |
|
返回顶楼 | |