`
m635674608
  • 浏览: 5027710 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

java actor模型和消息传递实现分析

    博客分类:
  • java
 
阅读更多

Actor模型是一种基于协程的消息传递模型,在并行计算和并发的消息传递中有很好的性能表现。一般的actor模块框架提供了超轻量级的线程和工具,可以在这些线程之间进行快速、安全、零复制的消息传递。在elang,ruby,lua等语言中都是直接在VM级别支持协程,VM帮你做context的保存和恢复。而在java中,却没有内置actor模型实现,但是有几个开源框架也模拟了actor模型的实现。

 

基于 actor 的系统 通过实现一种消息传递 模式,使并行处理更容易编码。在此模式中,系统中的每个 actor 都可接收消息;执行该消息所表示的操作;然后将消息发送给其他 actor(包括它们自己)以执行复杂的操作序列。actor 之间的所有消息是异步的,这意味着发送者会在收到任何回复之前继续进行处理。因此,一个 actor 可能终生都陷入接收和处理消息的无限循环中。

当使用多个 actor 时,独立的活动可轻松分配到多个可并行执行消息的线程上(进而分配在多个处理器上)。一般而言,每个 actor 都在一个独立线程上处理消息。一些 actor 系统静态地向 actor 分配线程;而其他系统(比如本文中介绍的系统)则会动态地分配它们。

 

 

下面我们会分析下java中的一个actor模型框架的实现:

 

我们先看下elang中的actor模型的实现:

 

以Erlang为例子,介绍一下简单的Actor模型

 1.首先建立一个Actor,在erlang中,起一个进程(这个是erlang虚拟机进程,跟os进程不同),这个进程就是actor了,可以用来接收和发送各种消息了

 Pid = spawn(Mod,func,Args) %起一个进程

 2.处理收到的消息

func()->

    receive

{From,Msg}-> %收到一个消息

%%do something

func();

 3.要对这个actor发送消息,也非常简单

  Pid ! {From,Msg}

 

 

ujavaactor框架:

 

下面摘自ibm developer的一段介绍

 

μJavaActors 是 actor 系统的一个简单的 Java 实现。只有 1,200 行代码,μJavaActors 虽然很小,但很强大。在下面的练习中,您将学习如何使用 μJavaActors 动态地创建和管理 actor,将消息传送给它们。

μJavaActors 围绕 3 个核心界面而构建:

  • 消息 是在 actor 之间发送的消息。Message 是 3 个(可选的)值和一些行为的容器:
    • source 是发送 actor。
    • subject 是定义消息含义的字符串(也称为命令)。
    • data 是消息的任何参数数据;通常是一个映射、列表或数组。参数可以是要处理和/或其他 actor 要与之交互的数据。
    • subjectMatches() 检查消息主题是否与字符串或正则表达式匹配。
    μJavaActors 包的默认消息类是 DefaultMessage
  • ActorManager 是一个 actor 管理器。它负责向 actor 分配线程(进而分配处理器)来处理消息。ActorManager 拥有以下关键行为或特征:
    • createActor() 创建一个 actor 并将它与此管理器相关联。
    • startActor() 启动一个 actor。
    • detachActor() 停止一个 actor 并将它与此管理器断开。
    • send()/broadcast() 将一条消息发送给一个 actor、一组 actor、一个类别中的任何 actor 或所有 actor。
    在大部分程序中,只有一个 ActorManager,但如果您希望管理多个线程和/或 actor 池,也可以有多个 ActorManager。此接口的默认实现是 DefaultActorManager
  • Actor 是一个执行单元,一次处理一条消息。Actor 具有以下关键行为或特征:
    • 每个 actor 有一个 name,该名称在每个 ActorManager 中必须是惟一的。
    • 每个 actor 属于一个 category;类别是一种向一组 actor 中的一个成员发送消息的方式。一个 actor 一次只能属于一个类别。
    • 只要 ActorManager 可以提供一个执行 actor 的线程,系统就会调用 receive()。为了保持最高效率,actor 应该迅速处理消息,而不要进入漫长的等待状态(比如等待人为输入)。
    • willReceive() 允许 actor 过滤潜在的消息主题。
    • peek() 允许该 actor 和其他 actor 查看是否存在挂起的消息(或许是为了选择主题)。
    • remove() 允许该 actor 和其他 actor 删除或取消任何尚未处理的消息。
    • getMessageCount() 允许该 actor 和其他 actor 获取挂起的消息数量。
    • getMaxMessageCount() 允许 actor 限制支持的挂起消息数量;此方法可用于预防不受控制地发送。
    大部分程序都有许多 actor,这些 actor 常常具有不同的类型。actor 可在程序启动时创建或在程序执行时创建(和销毁)。本文中的 actor 包 包含一个名为 AbstractActor 的抽象类,actor 实现基于该类。

 

 

 

Java代码  收藏代码
  1. public abstract class AbstractActor extends Utils implements Actor {  
  2.     public static final int DEFAULT_MAX_MESSAGES = 100;  
  3.     protected DefaultActorManager manager;  
  4.   
  5.     public ActorManager getManager() {  
  6.         return manager;  
  7.     }  
  8.   
  9.     public void setManager(DefaultActorManager manager) {  
  10.         if (this.manager != null && manager != null) {  
  11.             throw new IllegalStateException(  
  12.                     "cannot change manager of attached actor");  
  13.         }  
  14.         this.manager = manager;  
  15.     }  
  16.   
  17.     protected String name;  
  18.   
  19.     @Override  
  20.     public String getName() {  
  21.         return name;  
  22.     }  
  23.   
  24.     @Override  
  25.     public void setName(String name) {  
  26.         if (manager != null) {  
  27.             throw new IllegalStateException("cannot change name if manager set");  
  28.         }  
  29.         this.name = name;  
  30.     }  
  31.   
  32.     protected String category = DEFAULT_CATEGORY;  
  33.   
  34.     @Override  
  35.     public String getCategory() {  
  36.         return category;  
  37.     }  
  38.   
  39.     @Override  
  40.     public void setCategory(String category) {  
  41.         this.category = category;  
  42.     }  
  43.   
  44.     /** 
  45.      * Process a message conditionally. If testMessage() returns null no message 
  46.      * will be consumed. 
  47.      *  
  48.      * @see AbstractActor#testMessage() 
  49.      */  
  50.     @Override  
  51.     public boolean receive() {  
  52.         Message m = testMessage();  
  53.         boolean res = m != null;  
  54.         if (res) {  
  55.             boolean f = remove(m);  
  56.             if (!f) {  
  57.                 logger.warning("receive message not removed: %s", m);  
  58.             }  
  59.             DefaultMessage dm = (DefaultMessage) m;  
  60.             try {  
  61.                 dm.fireMessageListeners(new MessageEvent(this, dm, MessageEvent.MessageStatus.DELIVERED));  
  62.                 //logger.trace("receive %s processing %s", this.getName(), m);  
  63.                 loopBody(m);  
  64.                 dm.fireMessageListeners(new MessageEvent(this, dm, MessageEvent.MessageStatus.COMPLETED));  
  65.             } catch (Exception e) {  
  66.                 dm.fireMessageListeners(new MessageEvent(this, dm, MessageEvent.MessageStatus.FAILED));  
  67.                 logger.error("loop exception", e);  
  68.             }  
  69.         }  
  70.         manager.awaitMessage(this);  
  71.         return res;  
  72.     }  
  73.   
  74.     /** 
  75.      * Test to see if a message should be processed. Subclasses should override 
  76.      */  
  77.     @Override  
  78.     public boolean willReceive(String subject) {  
  79.         return !isEmpty(subject); // default receive all subjects  
  80.     }  
  81.   
  82.     /** Test the current message. Default action is to accept all. */  
  83.     protected Message testMessage() {  
  84.         return getMatch(nullfalse);  
  85.     }  
  86.   
  87.     /** Process the accepted subject. */  
  88.     abstract protected void loopBody(Message m);  
  89.   
  90.     /** Test a message against a defined subject pattern. */  
  91.     protected DefaultMessage getMatch(String subject, boolean isRegExpr) {  
  92.         DefaultMessage res = null;  
  93.         synchronized (messages) {  
  94.             res = (DefaultMessage) peekNext(subject, isRegExpr);  
  95.         }  
  96.         return res;  
  97.     }  
  98.   
  99.     protected List<DefaultMessage> messages = new LinkedList<DefaultMessage>();  
  100.   
  101.     public DefaultMessage[] getMessages() {  
  102.         return messages.toArray(new DefaultMessage[messages.size()]);  
  103.     }  
  104.   
  105.     @Override  
  106.     public int getMessageCount() {  
  107.         synchronized (messages) {  
  108.             return messages.size();  
  109.         }  
  110.     }  
  111.   
  112.     /** 
  113.      * Limit the number of messages that can be received.  Subclasses should override. 
  114.      */  
  115.     @Override  
  116.     public int getMaxMessageCount() {  
  117.         return DEFAULT_MAX_MESSAGES;  
  118.     }  
  119.   
  120.     /** Queue a messaged to be processed later. */  
  121.     public void addMessage(DefaultMessage message) {  
  122.         if (message != null) {  
  123.             synchronized (messages) {  
  124.                 if (messages.size() < getMaxMessageCount()) {  
  125.                     messages.add(message);  
  126.                     // messages.notifyAll();  
  127.                 } else {  
  128.                     throw new IllegalStateException("too many messages, cannot add");  
  129.                 }  
  130.             }  
  131.         }   
  132.     }  
  133.   
  134.     @Override  
  135.     public Message peekNext() {  
  136.         return peekNext(null);  
  137.     }  
  138.   
  139.     @Override  
  140.     public Message peekNext(String subject) {  
  141.         return peekNext(subject, false);  
  142.     }  
  143.   
  144.     /**  
  145.      * See if a message exists that meets the selection criteria.  
  146.      **/  
  147.     @Override  
  148.     public Message peekNext(String subject, boolean isRegExpr) {  
  149.         Message res = null;  
  150.         if (isActive) {  
  151.             Pattern p = subject != null ? (isRegExpr ? Pattern.compile(subject)  
  152.                     : null) : null;  
  153.             long now = new Date().getTime();  
  154.             synchronized (messages) {  
  155.                 for (DefaultMessage m : messages) {  
  156.                     if (m.getDelayUntil() <= now) {  
  157.                         boolean match = subject == null  
  158.                                 || (isRegExpr ? m.subjectMatches(p) : m  
  159.                                         .subjectMatches(subject));  
  160.                         if (match) {  
  161.                             res = m;  
  162.                             break;  
  163.                         }  
  164.                     }  
  165.                 }  
  166.             }  
  167.         }  
  168.         // logger.trace("peekNext %s, %b: %s", subject, isRegExpr, res);  
  169.         return res;  
  170.     }  
  171.   
  172.     @Override  
  173.     public boolean remove(Message message) {  
  174.         synchronized (messages) {  
  175.             return messages.remove(message);  
  176.         }  
  177.     }  
  178.   
  179.     protected boolean isActive;  
  180.   
  181.     public boolean isActive() {  
  182.         return isActive;  
  183.     }  
  184.   
  185.     @Override  
  186.     public void activate() {  
  187.         isActive = true;  
  188.     }  
  189.   
  190.     @Override  
  191.     public void deactivate() {  
  192.         isActive = false;  
  193.     }  
  194.   
  195.     /** Do startup processing. */  
  196.     protected void runBody() {  
  197.         DefaultMessage m = new DefaultMessage("init");  
  198.         getManager().send(m, nullthis);  
  199.     }  
  200.   
  201.     @Override  
  202.     public void run() {  
  203.         runBody();  
  204.         ((DefaultActorManager) getManager()).awaitMessage(this);  
  205.     }  
  206.   
  207.     protected boolean hasThread;  
  208.   
  209.     public boolean getHasThread() {  
  210.         return hasThread;  
  211.     }  
  212.   
  213.     protected void setHasThread(boolean hasThread) {  
  214.         this.hasThread = hasThread;  
  215.     }  
  216.   
  217.     @Override  
  218.     public String toString() {  
  219.         return getClass().getSimpleName() + "[" + bodyString() + "]";  
  220.     }  
  221.   
  222.     protected String bodyString() {  
  223.         return "name=" + name + ", category=" + category + ", messages="  
  224.                 + messages.size();  
  225.     }  
  226.   
  227.     volatile protected boolean shutdown;  
  228.   
  229.     @Override  
  230.     public boolean isShutdown() {  
  231.         return shutdown;  
  232.     }  
  233.   
  234.     @Override  
  235.     public void shutdown() {  
  236.         shutdown = true;  
  237.     }  
  238.   
  239.     volatile protected boolean suspended;  
  240.   
  241.     @Override  
  242.     public void setSuspended(boolean f) {  
  243.         suspended = f;  
  244.     }  
  245.   
  246.     @Override  
  247.     public boolean isSuspended() {  
  248.         return suspended;  
  249.     }  
  250. }  

 

 

我们需要继承AbstractActor抽象类,实现loopBody方法来处理消息内容。

其中receive方法是接收一条消息并进行处理,首先取出一条消息,从消息列表中删除,然后出发消息接收前的MessageEvent.MessageStatus.DELIVERED事件,我们可以实现MessageListener接口来监听消息处理的时间,然后通过抽象方法loopBody来处理消息,最后触发消息接收后的MessageEvent.MessageStatus.COMPLETED事件。

peekNext只是取出消息。

其中isActive表示是否actor已经激活,hasThread表示是否已经分配线程在执行,shutdown是否已经关闭,suspended是否已经挂起。

 

我们看下defaultMessage消息体:

 

Java代码  收藏代码
  1. public class DefaultMessage extends Utils implements Message {  
  2.   
  3. ......  
  4.   
  5.     protected Actor source;  
  6.   
  7.   
  8.     protected String subject;  
  9.   
  10.   
  11.     protected Object data;  
  12.   
  13.   
  14.     protected List<MessageListener> listeners = new LinkedList<MessageListener>();  
  15.   
  16.     public void addMessageListener(MessageListener l) {  
  17.         if (!listeners.contains(l)) {  
  18.             listeners.add(l);  
  19.         }  
  20.     }  
  21.   
  22.     public void removeMessageListener(MessageListener l) {  
  23.         listeners.remove(l);  
  24.     }  
  25.       
  26.     public void fireMessageListeners(MessageEvent e) {  
  27.         for(MessageListener l : listeners) {  
  28.             l.onMessage(e);  
  29.         }  
  30.     }  
  31.     ......  
  32. }  

 

 可以给每个消息增加一些消息监听事件。

 

然后重点再看下DefaultActorManager的实现:

 

初始化:

初始化会创建ActorManager.properties读取配置的启动线程数参数,然后创建一个线程组,创建getThreadCount()个ActorRunnable的线程(用来执行runnables保存的待执行actor队列中的任务),

最后还启动一个Counter的线程,用于计算每个调度时间(1秒)的执行actor数。

 

 

Java代码  收藏代码
  1. /** 
  2.      * Get the default instance. Uses ActorManager.properties for configuration. 
  3.      *  
  4.      * @return shared instance 
  5.      */  
  6.     public static DefaultActorManager getDefaultInstance() {  
  7.         if (instance == null) {  
  8.             instance = new DefaultActorManager();  
  9.             Map<String, Object> options = null;  
  10.             // ConfigUtils configUtils = new ConfigUtils();  
  11.             // Properties p = configUtils  
  12.             // .loadProperties("ActorManager.properties");  
  13.             Properties p = new Properties();  
  14.             try {  
  15.                 p.load(new FileInputStream("ActorManager.properties"));  
  16.             } catch (IOException e) {  
  17.                 try {  
  18.                     p.load(new FileInputStream("/resource/ActorManager.properties"));  
  19.                 } catch (IOException e1) {  
  20.                     logger.warning("DefaultActorManager: no configutration: " + e);  
  21.                 }  
  22.             }  
  23.             if (!isEmpty(p)) {  
  24.                 options = new HashMap<String, Object>();  
  25.                 for (Object key : p.keySet()) {  
  26.                     String skey = (String) key;  
  27.                     options.put(skey, p.getProperty(skey));  
  28.                 }  
  29.             }  
  30.             instance.initialize(options);  
  31.         }  
  32.         return instance;  
  33.     }  
  34.   
  35. protected ThreadGroup threadGroup;  protected static int groupCount;  
  36.   
  37.     protected List<Thread> threads = new LinkedList<Thread>(); //保存线程池的线程列表  
  38.     /** 
  39.      * Initialize this manager. Call only once. 
  40.      *  
  41.      * @param options 
  42.      *            map of options 
  43.      */  
  44.     @Override  
  45.     public void initialize(Map<String, Object> options) {  
  46.         if (!initialized) {  
  47.             initialized = true;  
  48.             int count = getThreadCount(options);  
  49.             ThreadGroup tg = new ThreadGroup("ActorManager" + groupCount++);  
  50.             threadGroup = tg;  
  51.             for (int i = 0; i < count; i++) {  
  52.                 createThread(i);  
  53.             }  
  54.             running = true;  
  55.             for (Thread t : threads) {  
  56.                 // logger.trace("procesNextActor starting %s", t);  
  57.                 t.start();  
  58.             }  
  59.   
  60.             Thread Counter = new Thread(new Runnable() {  
  61.                 @Override  
  62.                 public void run() {  
  63.                     while (running) {  
  64.                         try {  
  65.                             trendValue = sendCount - dispatchCount;  
  66.                             // logger.trace("Counter thread: sc=%d, dc=%d, t=%d",  
  67.                             // sendCount, dispatchCount, trendValue);  
  68.                             lastSendCount = sendCount;  
  69.                             sendCount = 0;  
  70.                             updateLastDispatchCount();  
  71.                             Thread.sleep(1000);  
  72.                         } catch (InterruptedException e) {  
  73.                             break;  
  74.                         }  
  75.                     }  
  76.                     sendCount = lastSendCount = 0;  
  77.                     clearDispatchCount();  
  78.                 }  
  79.             });  
  80.             Counter.setDaemon(true);  
  81.             lastDispatchTime = lastSendTime = new Date().getTime();  
  82.             Counter.start();  
  83.         }  
  84.     }  
  85.     protected void createThread(int i) {  
  86.         addThread("actor" + i);  
  87.     }  
  88. /** 
  89.      * Add a dynamic thread.  
  90.      *  
  91.      * @param name 
  92.      * @return 
  93.      */  
  94.     public Thread addThread(String name) {  
  95.         Thread t = null;  
  96.         synchronized (actors) {  
  97.             if (trunnables.containsKey(name)) {  
  98.                 throw new IllegalStateException("already exists: " + name);  
  99.             }  
  100.             ActorRunnable r = new ActorRunnable();  
  101.             trunnables.put(name, r);  
  102.             t = new Thread(threadGroup, r, name);  
  103.             threads.add(t);  
  104.             //System.out.printf("addThread: %s", name);  
  105.         }  
  106.         t.setDaemon(true);  
  107.         t.setPriority(getThreadPriority());  
  108.         return t;  
  109.     }  

 

 

 

 

 

Java代码  收藏代码
  1. /** Configuration key for thread count. */  
  2. public static final String ACTOR_THREAD_COUNT = "threadCount";  
  3.   
  4. protected Map<String, AbstractActor> actors = new LinkedHashMap<String, AbstractActor>();  
  5.   
  6. protected Map<String, AbstractActor> runnables = new LinkedHashMap<String, AbstractActor>();  
  7.   
  8. protected Map<String, AbstractActor> waiters = new LinkedHashMap<String, AbstractActor>();  

 

 

actors存放所有的actor对象,runnables存放待发送消息(发送方)的actor, waiters存放等待(接收方接收消息)的actors。

 

 

创建一个actor实例:

 

Java代码  收藏代码
  1. /** 
  2.  * Create an actor and associate it with this manager. 
  3.  *  
  4.  * @param clazz 
  5.  *            the actor class 
  6.  * @param the 
  7.  *            actor name; must be unique 
  8.  * @param options 
  9.  *            actor options 
  10.  */  
  11. @Override  
  12. public Actor createActor(Class<? extends Actor> clazz, String name, Map<String, Object> options) {  
  13.     AbstractActor a = null;  
  14.     synchronized (actors) {  
  15.         if (!actors.containsKey(name)) {  
  16.             try {  
  17.                 a = (AbstractActor) clazz.newInstance();  
  18.                 a.setName(name);  
  19.                 a.setManager(this);  
  20.             } catch (Exception e) {  
  21.                 throw e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException(  
  22.                         "mapped exception: " + e, e);  
  23.             }  
  24.         } else {  
  25.             throw new IllegalArgumentException("name already in use: " + name);  
  26.         }  
  27.     }  
  28.     return a;  
  29. }  
  30.   
  31. /** 
  32.  * Start an actor. Must have been created by this manager. 
  33.  *  
  34.  * @param actor 
  35.  *            the actor 
  36.  */  
  37. @Override  
  38. public void startActor(Actor actor) {  
  39.     if (((AbstractActor) actor).getManager() != this) {  
  40.         throw new IllegalStateException("actor not owned by this manager");  
  41.     }  
  42.     String name = actor.getName();  
  43.     synchronized (actors) {  
  44.         if (actors.containsKey(name)) {  
  45.             throw new IllegalStateException("already started");  
  46.         }  
  47.         ((AbstractActor) actor).shutdown = false;  
  48.         actors.put(name, (AbstractActor) actor);  
  49.         runnables.put(name, (AbstractActor) actor);  
  50.     }  
  51.     actor.activate();  
  52. }  

  启动一个actor实例会把actor放入actors队列和runnables等待执行队列(发送方发送消息)中,然后调用actor的activate()方法做初始化。

 

 

我们再看看ActorRunnable线程池中的线程执行actor的过程:

 

actorRunnable.java是DefaultActorManager的内部类,可以用于访问DefaultActorManager的actor实例列表

 

procesNextActor 从runnables的待执行actor列表中取出第一个(fifo)actor,如果不为空,则执行该actor线程的run方法,发送消息,actor接口是继承了runable接口的,如果runnables列表为空,则从waiters列表中取出一个actor执行,接收消息,接收到消息,响应的增加dispatchCount的值,表示已经消费的消息数。如果procesNextActor 执行失败,则主线程等待100ms,否则循环执行。

 

 

Java代码  收藏代码
  1. /** public intended only for "friend" access. */  
  2.     public class ActorRunnable implements Runnable {  
  3.         public boolean hasThread;  
  4.         public AbstractActor actor;  
  5.   
  6.         public void run() {  
  7.             // logger.trace("procesNextActor starting");  
  8.             int delay = 1;  
  9.             while (running) {  
  10.                 try {  
  11.                     if (!procesNextActor()) {  
  12.                         // logger.trace("procesNextActor waiting on actor");  
  13.                         // sleep(delay * 1000);  
  14.                         synchronized (actors) {  
  15.                             // TOOD: adjust this delay; possible parameter  
  16.                             // we want to minizmize overhead (make bigger);  
  17.                             // but it has a big impact on message processing  
  18.                             // rate (makesmaller)  
  19.                             // actors.wait(delay * 1000);  
  20.                             actors.wait(100);  
  21.                         }  
  22.                         delay = Math.max(5, delay + 1);  
  23.                     } else {  
  24.                         delay = 1;  
  25.                     }  
  26.                 } catch (InterruptedException e) {  
  27.                 } catch (Exception e) {  
  28.                     logger.error("procesNextActor exception", e);  
  29.                 }  
  30.             }  
  31.             // logger.trace("procesNextActor ended");  
  32.         }  
  33.   
  34.         protected boolean procesNextActor() {  
  35.             boolean run = false, wait = false, res = false;  
  36.             actor = null;  
  37.             synchronized (actors) {  
  38.                 for (String key : runnables.keySet()) {  
  39.                     actor = runnables.remove(key);  
  40.                     break;  
  41.                 }  
  42.             }  
  43.             if (actor != null) {  
  44.                 // first run never started  
  45.                 run = true;  
  46.                 actor.setHasThread(true);  
  47.                 hasThread = true;  
  48.                 try {  
  49.                     actor.run();  
  50.                 } finally {  
  51.                     actor.setHasThread(false);  
  52.                     hasThread = false;  
  53.                 }  
  54.             } else {  
  55.                 synchronized (actors) {  
  56.                     for (String key : waiters.keySet()) {  
  57.                         actor = waiters.remove(key);  
  58.                         break;  
  59.                     }  
  60.                 }  
  61.                 if (actor != null) {  
  62.                     // then waiting for responses  
  63.                     wait = true;  
  64.                     actor.setHasThread(true);  
  65.                     hasThread = true;  
  66.                     try {  
  67.                         res = actor.receive();  
  68.                         if (res) {  
  69.                             incDispatchCount();  
  70.                         }  
  71.                     } finally {  
  72.                         actor.setHasThread(false);  
  73.                         hasThread = false;  
  74.                     }  
  75.                 }  
  76.             }  
  77.             // if (!(!run && wait && !res) && a != null) {  
  78.             // logger.trace("procesNextActor %b/%b/%b: %s", run, wait, res, a);  
  79.             // }  
  80.             return run || res;  
  81.         }  
  82.     }  

 

 

接下来就是调用发送消息的api了,send将消息message从from发送到to actor中,sentMessages保存了每个actor的消息列表,还有批量发送到多个actor中,send(Message message, Actor from, String category)是将消息发送到分类为category的actor中。broadcast是将消息发送到所有的actor中。

 

 

Java代码  收藏代码
  1. /** 
  2.      * Send a message. 
  3.      *  
  4.      * @param message 
  5.      *            message to 
  6.      * @param from 
  7.      *            source actor 
  8.      * @param to 
  9.      *            target actor 
  10.      * @return number of receiving actors 
  11.      */  
  12.     @Override  
  13.     public int send(Message message, Actor from, Actor to) {  
  14.         int count = 0;  
  15.         if (message != null) {  
  16.             AbstractActor aa = (AbstractActor) to;  
  17.             if (aa != null) {  
  18.                 if (!aa.isShutdown() && !aa.isSuspended() && aa.willReceive(message.getSubject())) {  
  19.                     DefaultMessage xmessage = (DefaultMessage) ((DefaultMessage) message).assignSender(from);  
  20.                     // logger.trace(" %s to %s", xmessage, to);  
  21.                     aa.addMessage(xmessage);  
  22.                     xmessage.fireMessageListeners(new MessageEvent(aa, xmessage, MessageEvent.MessageStatus.SENT));  
  23.                     sendCount++;  
  24.                     lastSendTime = new Date().getTime();  
  25.                     if (recordSentMessages) {  
  26.                         synchronized (sentMessages) {  
  27.                             String aname = aa.getName();  
  28.                             List<Message> l = sentMessages.get(aname);  
  29.                             if (l == null) {  
  30.                                 l = new LinkedList<Message>();  
  31.                                 sentMessages.put(aname, l);  
  32.                             }  
  33.                             // keep from getting too big  
  34.                             if (l.size() < 100) {  
  35.                                 l.add(xmessage);  
  36.                             }  
  37.                         }  
  38.                     }  
  39.                     count++;  
  40.                     synchronized (actors) {  
  41.                         actors.notifyAll();  
  42.                     }  
  43.                 }  
  44.             }  
  45.         }  
  46.         return count;  
  47.     }  
  48.   
  49.     /** 
  50.      * Send a message. 
  51.      *  
  52.      * @param message 
  53.      *            message to 
  54.      * @param from 
  55.      *            source actor 
  56.      * @param to 
  57.      *            target actors 
  58.      * @return number of receiving actors 
  59.      */  
  60.     @Override  
  61.     public int send(Message message, Actor from, Actor[] to) {  
  62.         int count = 0;  
  63.         for (Actor a : to) {  
  64.             count += send(message, from, a);  
  65.         }  
  66.         return count;  
  67.     }  
  68.   
  69.     /** 
  70.      * Send a message. 
  71.      *  
  72.      * @param message 
  73.      *            message to 
  74.      * @param from 
  75.      *            source actor 
  76.      * @param to 
  77.      *            target actors 
  78.      * @return number of receiving actors 
  79.      */  
  80.     @Override  
  81.     public int send(Message message, Actor from, Collection<Actor> to) {  
  82.         int count = 0;  
  83.         for (Actor a : to) {  
  84.             count += send(message, from, a);  
  85.         }  
  86.         return count;  
  87.     }  
  88.   
  89.     /** 
  90.      * Send a message. 
  91.      *  
  92.      * @param message 
  93.      *            message to 
  94.      * @param from 
  95.      *            source actor 
  96.      * @param category 
  97.      *            target actor category 
  98.      * @return number of receiving actors 
  99.      */  
  100.     @Override  
  101.     public int send(Message message, Actor from, String category) {  
  102.         int count = 0;  
  103.         Map<String, Actor> xactors = cloneActors();  
  104.         List<Actor> catMembers = new LinkedList<Actor>();  
  105.         for (String key : xactors.keySet()) {  
  106.             Actor to = xactors.get(key);  
  107.             if (category.equals(to.getCategory()) && (to.getMessageCount() < to.getMaxMessageCount())) {  
  108.                 catMembers.add(to);  
  109.             }  
  110.         }  
  111.         // find an actor with lowest message count  
  112.         int min = Integer.MAX_VALUE;  
  113.         Actor amin = null;  
  114.         for (Actor a : catMembers) {  
  115.             int mcount = a.getMessageCount();  
  116.             if (mcount < min) {  
  117.                 min = mcount;  
  118.                 amin = a;  
  119.             }  
  120.         }  
  121.         if (amin != null) {  
  122.             count += send(message, from, amin);  
  123.             // } else {  
  124.             // throw new  
  125.             // IllegalStateException("no capable actors for category: " +  
  126.             // category);  
  127.         }  
  128.         return count;  
  129.     }  
  130.   
  131.     /** 
  132.      * Send a message to all actors. 
  133.      *  
  134.      * @param message 
  135.      *            message to 
  136.      * @param from 
  137.      *            source actor 
  138.      * @return number of receiving actors 
  139.      */  
  140.     @Override  
  141.     public int broadcast(Message message, Actor from) {  
  142.         int count = 0;  
  143.         Map<String, Actor> xactors = cloneActors();  
  144.         for (String key : xactors.keySet()) {  
  145.             Actor to = xactors.get(key);  
  146.             count += send(message, from, to);  
  147.         }  
  148.         return count;  
  149.     }  

 

这里每次获取所有的actor列表都是采用clone方式复制当前的actor列表,如果长时间加锁,则会降低并发能力。

 

 

Java代码  收藏代码
  1. protected Map<String, Actor> cloneActors() {  
  2.     Map<String, Actor> xactors;  
  3.     synchronized (actors) {  
  4.         xactors = new HashMap<String, Actor>(actors);  
  5.     }  
  6.     return xactors;  
  7. }  

 

当然这里还有termina所有的线程

 

 

Java代码  收藏代码
  1. /** 
  2.      * Terminate processing and wait for all threads to stop. 
  3.      */  
  4.     @Override  
  5.     public void terminateAndWait() {  
  6.         logger.trace("terminateAndWait waiting on termination of %d threads", threads.size());  
  7.         terminate();  
  8.         waitForThreads();  
  9.     }  
  10.   
  11.     /** 
  12.      * Wait for all threads to stop. Must have issued terminate. 
  13.      */  
  14.     public void waitForThreads() {  
  15.         if (!terminated) {  
  16.             throw new IllegalStateException("not terminated");  
  17.         }  
  18.         for (Thread t : threads) {  
  19.             try {  
  20.                 // logger.info("terminateAndWait waiting for %s...", t);  
  21.                 t.join();  
  22.             } catch (InterruptedException e) {  
  23.                 // logger.info("terminateAndWait interrupt");  
  24.             }  
  25.         }  
  26.     }  
  27.   
  28.     boolean running, terminated;  
  29.   
  30.     /** 
  31.      * Terminate processing. 
  32.      */  
  33.     @Override  
  34.     public void terminate() {  
  35.         terminated = true;  
  36.         running = false;  
  37.         for (Thread t : threads) {  
  38.             t.interrupt();  
  39.         }  
  40.         synchronized (actors) {  
  41.             for (String key : actors.keySet()) {  
  42.                 actors.get(key).deactivate();  
  43.             }  
  44.         }  
  45.         sentMessages.clear();  
  46.         sendCount = lastSendCount = 0;  
  47.         clearDispatchCount();  
  48.     }  

 

也可以挂起某个actor,直到有消息达到,这里就是讲actor加入waiters列表,注意上面send发送消息之后,会调用acoters.nofityAll唤醒等待线程

 

 

Java代码  收藏代码
  1. /** 
  2.  * Suspend an actor until it has a read message. 
  3.  *  
  4.  * @param actor 
  5.  *            receiving actor 
  6.  */  
  7. public void awaitMessage(AbstractActor actor) {  
  8.     synchronized (actors) {  
  9.         waiters.put(actor.getName(), actor);  
  10.         // actors.notifyAll();  
  11.         // logger.trace("awaitMessage waiters=%d: %s",waiters.size(), a);  
  12.     }  
  13. }  

 

startActor时才将actor加入runnables队列。

 

示例代码:

 

 

Java代码  收藏代码
  1. import java.util.Date;  
  2. import java.util.HashMap;  
  3. import java.util.LinkedList;  
  4. import java.util.List;  
  5. import java.util.Map;  
  6. import java.util.Random;  
  7. import java.util.concurrent.ConcurrentHashMap;  
  8.   
  9. import javax.swing.event.ChangeEvent;  
  10. import javax.swing.event.ChangeListener;  
  11.   
  12. import com.ibm.actor.Actor;  
  13. import com.ibm.actor.DefaultActorManager;  
  14. import com.ibm.actor.DefaultMessage;  
  15. import com.ibm.actor.logging.DefaultLogger;  
  16. import com.ibm.actor.utils.Utils;  
  17.   
  18. /**  
  19.  * A set of runtime services for testing actors and a test case driver.  
  20.  *  
  21.  * @author BFEIGENB 
  22.  * 
  23.  */  
  24. public class DefaultActorTest extends Utils {  
  25.   
  26.     public static final int MAX_IDLE_SECONDS = 10;  
  27.   
  28.     // public static final int STEP_COUNT = 3 * 60;  
  29.     public static final int TEST_VALUE_COUNT = 1000// TODO: make bigger  
  30.   
  31.     public DefaultActorTest() {  
  32.         super();  
  33.     }  
  34.   
  35.     private Map<String, Actor> testActors = new ConcurrentHashMap<String, Actor>();  
  36.   
  37.     static Random rand = new Random();  
  38.   
  39.     public static int nextInt(int limit) {  
  40.         return rand.nextInt(limit);  
  41.     }  
  42.   
  43.     protected DefaultActorManager getManager() {  
  44.         DefaultActorManager am = actorManager != null ? actorManager : new DefaultActorManager();  
  45.         return am;  
  46.     }  
  47.   
  48.     protected int stepCount = 120;  
  49.   
  50.     public void setStepCount(int stepCount) {  
  51.         this.stepCount = stepCount;  
  52.     }  
  53.   
  54.     public int getStepCount() {  
  55.         return stepCount;  
  56.     }  
  57.   
  58.     protected int threadCount = 10;  
  59.   
  60.     public int getThreadCount() {  
  61.         return threadCount;  
  62.     }  
  63.   
  64.     public void setThreadCount(int threadCount) {  
  65.         this.threadCount = threadCount;  
  66.     }  
  67.   
  68.     public void setTestActors(Map<String, Actor> testActors) {  
  69.         this.testActors = testActors;  
  70.     }  
  71.   
  72.     public Map<String, Actor> getTestActors() {  
  73.         return testActors;  
  74.     }  
  75.   
  76.     public static final int COMMON_ACTOR_COUNT = 10;  
  77.     public static final int TEST_ACTOR_COUNT = 25;  
  78.     public static final int PRODUCER_ACTOR_COUNT = 25;  
  79.   
  80.     public static void sleeper(int seconds) {  
  81.         int millis = seconds * 1000 + -50 + nextInt(100); // a little  
  82.                                                             // variation  
  83.         // logger.trace("sleep: %dms", millis);  
  84.         sleep(millis);  
  85.     }  
  86.   
  87.     public static void dumpMessages(List<DefaultMessage> messages) {  
  88.         synchronized (messages) {  
  89.             if (messages.size() > 0) {  
  90.                 for (DefaultMessage m : messages) {  
  91.                     logger.info("%s", m);  
  92.                 }  
  93.             }  
  94.         }  
  95.     }  
  96.   
  97.     protected List<ChangeListener> listeners = new LinkedList<ChangeListener>();  
  98.   
  99.     public void addChangeListener(ChangeListener l) {  
  100.         if (!listeners.contains(l)) {  
  101.             listeners.add(l);  
  102.         }  
  103.     }  
  104.   
  105.     public void removeChangeListener(ChangeListener l) {  
  106.         listeners.remove(l);  
  107.     }  
  108.   
  109.     protected void fireChangeListeners(ChangeEvent e) {  
  110.         for (ChangeListener l : listeners) {  
  111.             l.stateChanged(e);  
  112.         }  
  113.     }  
  114.   
  115.     protected static String[] types = new String[] { "widget""framit""frizzle""gothca""splat" };  
  116.   
  117.     public static String[] getItemTypes() {  
  118.         return types;  
  119.     }  
  120.   
  121.     public static void main(String[] args) {  
  122.         DefaultActorTest at = new DefaultActorTest();  
  123.         at.run(args);  
  124.         logger.trace("Done");  
  125.     }  
  126.   
  127.     protected String title;  
  128.   
  129.     public String getTitle() {  
  130.         return title;  
  131.     }  
  132.   
  133.       
  134.   
  135.     volatile protected boolean done;  
  136.   
  137.     public void terminateRun() {  
  138.         done = true;  
  139.     }  
  140.   
  141.     public static String[] getTestNames() {  
  142.         return new String[] { "Countdown""Producer Consumer"/* "Quicksort", */"MapReduce""Virus Scan""All" };  
  143.     }  
  144.   
  145.     DefaultActorManager actorManager;  
  146.   
  147.     public DefaultActorManager getActorManager() {  
  148.         return actorManager;  
  149.     }  
  150.   
  151.     public void setActorManager(DefaultActorManager actorManager) {  
  152.         this.actorManager = actorManager;  
  153.     }  
  154.   
  155.     public void run(String[] args) {  
  156.         done = false;  
  157.         // DefaultLogger.getDefaultInstance().setIncludeDate(false);  
  158.         DefaultLogger.getDefaultInstance().setIncludeContext(false);  
  159.         DefaultLogger.getDefaultInstance().setIncludeCaller(false);  
  160.         // DefaultLogger.getDefaultInstance().setIncludeThread(false);  
  161.         DefaultLogger.getDefaultInstance().setLogToFile(false);  
  162.         DefaultLogger.getDefaultInstance().setThreadFieldWidth(10);  
  163.   
  164.         int sc = stepCount;  
  165.         int tc = threadCount;  
  166.         boolean doTest = false;  
  167.         title = "";  
  168.           
  169.         if (!doTest ) {  
  170.             doTest = true;  
  171.         }  
  172.         if (doTest) {  
  173.             if (title.length() > 0) {  
  174.                 title += " ";  
  175.             }  
  176.             title += "(Countdown Test)";  
  177.         }  
  178.           
  179.         DefaultActorManager am = getManager();  
  180.         try {  
  181.             Map<String, Object> options = new HashMap<String, Object>();  
  182.             options.put(DefaultActorManager.ACTOR_THREAD_COUNT, tc);  
  183.             am.initialize(options);  
  184.             if (doTest) {  
  185.                 for (int i = 0; i < COMMON_ACTOR_COUNT; i++) {  
  186.                     Actor a = am.createActor(TestActor.class, String.format("common%02d", i));  
  187.                     if (a instanceof TestableActor) {  
  188.                         TestableActor ta = (TestableActor) a;  
  189.                         ta.setActorTest(this);  
  190.                     }  
  191.                     a.setCategory(TestActor.class.getSimpleName());  
  192.                     getTestActors().put(a.getName(), a);  
  193.                     // logger.trace("created: %s", a);  
  194.                 }  
  195.                 for (int i = 0; i < TEST_ACTOR_COUNT; i++) {  
  196.                     Actor a = am.createActor(TestActor.class, String.format("actor%02d", i));  
  197.                     if (a instanceof TestableActor) {  
  198.                         TestableActor ta = (TestableActor) a;  
  199.                         ta.setActorTest(this);  
  200.                     }  
  201.                     getTestActors().put(a.getName(), a);  
  202.                     // logger.trace("created: %s", a);  
  203.                 }  
  204.             }  
  205.   
  206.             for (String key : getTestActors().keySet()) {  
  207.                 am.startActor(getTestActors().get(key));  
  208.             }  
  209.   
  210.             for (int i = sc; i > 0; i--) {  
  211.                 if (done) {  
  212.                     break;  
  213.                 }  
  214.                 // see if idle a while  
  215.                 long now = new Date().getTime();  
  216.                 if (am.getActiveRunnableCount() == 0) {  
  217.                     if (now - am.getLastDispatchTime() > MAX_IDLE_SECONDS * 1000  
  218.                             && now - am.getLastSendTime() > MAX_IDLE_SECONDS * 1000) {  
  219.                         break;  
  220.                     }  
  221.                 }  
  222.                 setStepCount(i);  
  223.                 fireChangeListeners(new ChangeEvent(this));  
  224.                 if (i < 10 || i % 10 == 0) {  
  225.                     logger.trace("main waiting: %d...", i);  
  226.                 }  
  227.                 sleeper(1);  
  228.             }  
  229.             setStepCount(0);  
  230.             fireChangeListeners(new ChangeEvent(this));  
  231.   
  232.             // logger.trace("main terminating");  
  233.             am.terminateAndWait();  
  234.         } catch (Exception e) {  
  235.             e.printStackTrace();  
  236.         }  
  237.     }  
  238. }  

 

Java代码  收藏代码
  1. /** 
  2.  * An actor that sends messages while counting down a send count.  
  3.  *  
  4.  * @author BFEIGENB 
  5.  * 
  6.  */  
  7. public class TestActor extends TestableActor {  
  8.   
  9.     @Override  
  10.     public void activate() {  
  11.         logger.trace("TestActor activate: %s"this);  
  12.         super.activate();  
  13.     }  
  14.   
  15.     @Override  
  16.     public void deactivate() {  
  17.         logger.trace("TestActor deactivate: %s"this);  
  18.         super.deactivate();  
  19.     }  
  20.   
  21.     @Override  
  22.     protected void runBody() {  
  23.         // logger.trace("TestActor:%s runBody: %s", getName(), this);  
  24.         DefaultActorTest.sleeper(1);  
  25.         DefaultMessage m = new DefaultMessage("init"8);  
  26.         getManager().send(m, nullthis);  
  27.     }  
  28.   
  29.     @Override  
  30.     protected void loopBody(Message m) {  
  31.         // logger.trace("TestActor:%s loopBody %s: %s", getName(), m, this);  
  32.         DefaultActorTest.sleeper(1);  
  33.         String subject = m.getSubject();  
  34.         if ("repeat".equals(subject)) {  
  35.             int count = (Integer) m.getData();  
  36.             logger.trace("TestActor:%s repeat(%d) %s: %s", getName(), count, m,  
  37.                     this);  
  38.             if (count > 0) {  
  39.                 m = new DefaultMessage("repeat", count - 1);  
  40.                 // logger.trace("TestActor loopBody send %s: %s", m, this);  
  41.                 String toName = "actor"  
  42.                         + DefaultActorTest  
  43.                                 .nextInt(DefaultActorTest.TEST_ACTOR_COUNT);  
  44.                 Actor to = actorTest.getTestActors().get(toName);  
  45.                 if (to != null) {  
  46.                     getManager().send(m, this, to);  
  47.                 } else {  
  48.                     logger.warning("repeat:%s to is null: %s", getName(),  
  49.                             toName);  
  50.                 }  
  51.             }  
  52.         } else if ("init".equals(subject)) {  
  53.             int count = (Integer) m.getData();  
  54.             count = DefaultActorTest.nextInt(count) + 1;  
  55.             logger.trace("TestActor:%s init(%d): %s", getName(), count, this);  
  56.             for (int i = 0; i < count; i++) {  
  57.                 DefaultActorTest.sleeper(1);  
  58.                 m = new DefaultMessage("repeat", count);  
  59.                 // logger.trace("TestActor runBody send %s: %s", m, this);  
  60.                 String toName = "actor"  
  61.                         + DefaultActorTest  
  62.                                 .nextInt(DefaultActorTest.TEST_ACTOR_COUNT);  
  63.                 Actor to = actorTest.getTestActors().get(toName);  
  64.                 if (to != null) {  
  65.                     getManager().send(m, this, to);  
  66.                 } else {  
  67.                     logger.warning("init:%s to is null: %s", getName(), toName);  
  68.                 }  
  69.                 DefaultMessage dm = new DefaultMessage("repeat", count);  
  70.                 dm.setDelayUntil(new Date().getTime()  
  71.                         + (DefaultActorTest.nextInt(5) + 1) * 1000);  
  72.                 getManager().send(dm, thisthis.getClass().getSimpleName());  
  73.             }  
  74.         } else {  
  75.             logger.warning("TestActor:%s loopBody unknown subject: %s",  
  76.                     getName(), subject);  
  77.         }  
  78.     }  
  79.   
  80. }  

 

 

输出:

 

 

Java代码  收藏代码
  1. 12:59:38.883 T [main      ] - TestActor activate: TestActor[name=common06, category=TestActor, messages=0]  
  2. 12:59:38.886 T [main      ] - TestActor activate: TestActor[name=common05, category=TestActor, messages=0]  
  3. 12:59:38.887 T [main      ] - TestActor activate: TestActor[name=common08, category=TestActor, messages=0]  
  4. 12:59:38.889 T [main      ] - TestActor activate: TestActor[name=common07, category=TestActor, messages=0]  
  5. 12:59:38.890 T [main      ] - TestActor activate: TestActor[name=common09, category=TestActor, messages=0]  
  6. 12:59:38.891 T [main      ] - TestActor activate: TestActor[name=common00, category=TestActor, messages=0]  
  7. 12:59:38.892 T [main      ] - TestActor activate: TestActor[name=common01, category=TestActor, messages=0]  
  8. 12:59:38.893 T [main      ] - TestActor activate: TestActor[name=common02, category=TestActor, messages=0]  
  9. 12:59:38.895 T [main      ] - TestActor activate: TestActor[name=common03, category=TestActor, messages=0]  
  10. 12:59:38.896 T [main      ] - TestActor activate: TestActor[name=common04, category=TestActor, messages=0]  
  11. 12:59:38.897 T [main      ] - TestActor activate: TestActor[name=actor24, category=default, messages=0]  
  12. 12:59:38.899 T [main      ] - TestActor activate: TestActor[name=actor11, category=default, messages=0]  
  13. 12:59:38.904 T [main      ] - TestActor activate: TestActor[name=actor23, category=default, messages=0]  
  14. 12:59:38.905 T [main      ] - TestActor activate: TestActor[name=actor10, category=default, messages=0]  
  15. 12:59:38.906 T [main      ] - TestActor activate: TestActor[name=actor22, category=default, messages=0]  
  16. 12:59:38.907 T [main      ] - TestActor activate: TestActor[name=actor13, category=default, messages=0]  
  17. 12:59:38.908 T [main      ] - TestActor activate: TestActor[name=actor21, category=default, messages=0]  
  18. 12:59:38.909 T [main      ] - TestActor activate: TestActor[name=actor12, category=default, messages=0]  
  19. 12:59:38.910 T [main      ] - TestActor activate: TestActor[name=actor20, category=default, messages=0]  
  20. 12:59:38.911 T [main      ] - TestActor activate: TestActor[name=actor02, category=default, messages=0]  
  21. 12:59:38.912 T [main      ] - TestActor activate: TestActor[name=actor01, category=default, messages=0]  
  22. 12:59:38.914 T [main      ] - TestActor activate: TestActor[name=actor00, category=default, messages=0]  
  23. 12:59:38.915 T [main      ] - TestActor activate: TestActor[name=actor19, category=default, messages=0]  
  24. 12:59:38.916 T [main      ] - TestActor activate: TestActor[name=actor06, category=default, messages=0]  
  25. 12:59:38.917 T [main      ] - TestActor activate: TestActor[name=actor18, category=default, messages=0]  
  26. 12:59:38.917 T [main      ] - TestActor activate: TestActor[name=actor05, category=default, messages=0]  
  27. 12:59:38.918 T [main      ] - TestActor activate: TestActor[name=actor04, category=default, messages=0]  
  28. 12:59:38.919 T [main      ] - TestActor activate: TestActor[name=actor03, category=default, messages=0]  
  29. 12:59:38.920 T [main      ] - TestActor activate: TestActor[name=actor15, category=default, messages=0]  
  30. 12:59:38.921 T [main      ] - TestActor activate: TestActor[name=actor14, category=default, messages=0]  
  31. 12:59:38.922 T [main      ] - TestActor activate: TestActor[name=actor09, category=default, messages=0]  
  32. 12:59:38.923 T [main      ] - TestActor activate: TestActor[name=actor17, category=default, messages=0]  
  33. 12:59:38.923 T [main      ] - TestActor activate: TestActor[name=actor08, category=default, messages=0]  
  34. 12:59:38.924 T [main      ] - TestActor activate: TestActor[name=actor16, category=default, messages=0]  
  35. 12:59:38.925 T [main      ] - TestActor activate: TestActor[name=actor07, category=default, messages=0]  
  36. 12:59:38.926 T [main      ] - main waiting: 120...  
  37. 12:59:42.970 T [actor3    ] - TestActor:common06 init(4): TestActor[name=common06, category=TestActor, messages=0]  
  38. 12:59:43.028 T [actor2    ] - TestActor:common03 init(8): TestActor[name=common03, category=TestActor, messages=0]  
  39. 12:59:43.048 T [actor1    ] - TestActor:common01 init(6): TestActor[name=common01, category=TestActor, messages=0]  
  40. 12:59:43.054 T [actor8    ] - TestActor:common05 init(4): TestActor[name=common05, category=TestActor, messages=0]  
  41. 12:59:43.064 T [actor6    ] - TestActor:common09 init(8): TestActor[name=common09, category=TestActor, messages=0]  
  42. 12:59:43.873 T [actor0    ] - TestActor:common02 init(7): TestActor[name=common02, category=TestActor, messages=0]  
  43. 12:59:43.898 T [actor9    ] - TestActor:common08 init(1): TestActor[name=common08, category=TestActor, messages=0]  
  44. 12:59:43.993 T [actor7    ] - TestActor:common04 init(2): TestActor[name=common04, category=TestActor, messages=1]  
  45. 12:59:43.994 T [actor5    ] - TestActor:common00 init(1): TestActor[name=common00, category=TestActor, messages=0]  
  46. 12:59:43.996 W [actor2    ] - init:common03 to is null: actor5  
  47. 12:59:44.039 W [actor8    ] - init:common05 to is null: actor9  
  48. 12:59:44.052 W [actor1    ] - init:common01 to is null: actor9  
  49. 12:59:44.060 T [actor4    ] - TestActor:common07 init(5): TestActor[name=common07, category=TestActor, messages=0]  
  50. 12:59:44.912 W [actor0    ] - init:common02 to is null: actor6  
  51. 12:59:44.968 W [actor5    ] - init:common00 to is null: actor9  
  52. 12:59:44.986 W [actor3    ] - init:common06 to is null: actor1  
  53. 12:59:44.986 W [actor7    ] - init:common04 to is null: actor9  
  54. 12:59:45.108 W [actor4    ] - init:common07 to is null: actor8  
  55. 12:59:45.947 T [actor9    ] - TestActor:actor13 init(7): TestActor[name=actor13, category=default, messages=0]  
  56. 12:59:45.951 T [actor5    ] - TestActor:actor22 init(1): TestActor[name=actor22, category=default, messages=1]  
  57. 12:59:45.976 W [actor3    ] - init:common06 to is null: actor1  
  58. 12:59:46.016 W [actor2    ] - init:common03 to is null: actor6  
  59. 12:59:46.052 W [actor1    ] - init:common01 to is null: actor6  
  60. 12:59:46.115 W [actor4    ] - init:common07 to is null: actor9  
  61. 12:59:46.932 W [actor3    ] - init:common06 to is null: actor5  
  62. 12:59:46.940 W [actor5    ] - init:actor22 to is null: actor3  
  63. 12:59:47.039 T [actor7    ] - TestActor:actor24 init(8): TestActor[name=actor24, category=default, messages=1]  
  64. 12:59:47.042 W [actor1    ] - init:common01 to is null: actor6  
  65. 12:59:47.059 W [actor2    ] - init:common03 to is null: actor0  
  66. 12:59:47.904 T [actor8    ] - TestActor:actor21 init(7): TestActor[name=actor21, category=default, messages=1]  
  67. 12:59:47.964 T [actor3    ] - TestActor:actor10 init(4): TestActor[name=actor10, category=default, messages=0]  
  68. 12:59:47.988 T [actor5    ] - TestActor:actor23 init(7): TestActor[name=actor23, category=default, messages=2]  
  69. 12:59:48.003 W [actor6    ] - init:common09 to is null: actor3  
  70. 12:59:48.042 W [actor1    ] - init:common01 to is null: actor6  
  71. 12:59:48.072 W [actor2    ] - init:common03 to is null: actor8  
  72. 12:59:48.101 W [actor4    ] - init:common07 to is null: actor6  
  73. 12:59:48.880 W [actor8    ] - init:actor21 to is null: actor8  
  74. 12:59:48.903 W [actor9    ] - init:actor13 to is null: actor3  
  75. 12:59:48.952 T [main      ] - main waiting: 110...  
  76. 12:59:49.024 W [actor2    ] - init:common03 to is null: actor3  
  77. 12:59:49.037 W [actor6    ] - init:common09 to is null: actor9  
  78. 12:59:49.085 W [actor4    ] - init:common07 to is null: actor6  

 http://zhwj184.iteye.com/blog/1613351 

 

http://www.ibm.com/developerworks/cn/java/j-javaactors/

分享到:
评论

相关推荐

    actor模型java实现源码

    Actor模型的核心思想是将并发处理中的实体——也就是执行单元——抽象为“Actor”,每个Actor都有自己的状态,并且通过异步消息传递与其他Actor进行通信。这种模型特别适合于多核处理器和分布式系统,因为它能够有效...

    akka actor模型开发库 v2.6.14 pc端

    该软件是为了解决分布式编程中一系列的编程问题而设计,是非常实用的Java和Scala的Actor模型应用,支持多种运行系统进行使用,且安全性高,操作简单,用起来也是非常的方便的! 1、Actor之间完全独立; 2、Actor由...

    akka java实现tcp远程调用

    1. Akka Actor模型:理解Actor的异步消息传递和并发特性。 2. Akka ActorSystem:创建和配置ActorSystem以支持远程调用。 3. 配置文件:编写`application.conf`以启用远程部署和指定TCP设置。 4. 服务端和客户端...

    响应式架构小模式Actor实现与scala

    它利用Actor模型提供并发控制,通过消息传递来构建非阻塞、轻量级的处理单元。 3. **Akka与Scala的集成**:Scala和Akka的结合,使得开发响应式系统更加自然和高效。由于Akka和Scala都是基于JVM的,它们能够利用JVM...

    actors-in-java:玩Java中的actor框架

    这个"actors-in-java"项目看起来是一个实验性的框架,用于在Java中实现Actor模型,尽管它可能并不适合实际生产环境,而是用于学习和探索Actor模型的工作原理。 Actor模型的核心思想是将计算过程分解为独立的、并发...

    并行计算 多线程 actor 协程 process

    多线程、Actor模型、协程和进程是实现并行计算的四种常见方式。 一、多线程(Multi-Threading) 多线程是并发编程的基础,允许一个应用程序同时执行多个任务。在Java、C#等语言中,多线程通过创建和管理线程来实现...

    AkkaJava PDF Doc

    在Actor模型中,Actor是基本的计算实体,它通过发送和接收消息与其他Actor通信。每个Actor是独立的,拥有自己的行为和消息队列,且无法直接访问其他Actor的状态。 **什么是Akka?** Akka是一个建立在Actor模型之上...

    响应式架构 消息模式Actor实现与Scala.Akka应用集成

    在Akka框架中,Actor模型得到了广泛的应用,Akka是一个开源的Java和Scala库,它通过Actor模型提供了一种简单的方法来构建并发和分布式系统。 使用Akka框架时,开发者可以通过定义消息类型来与Actor进行通信。消息是...

    Akka Actor Tutorial代码

    它基于Actor模型,该模型将并发问题简化为独立的实体,即Actors,它们通过消息传递进行通信。这种设计允许系统在不共享状态的情况下工作,减少了竞态条件和死锁的风险。 **Scala与Akka Actor** Scala是Akka的主要...

    AkkaJava.pdf

    Actor模型是一种并发模型,其中actor是封装了状态和行为的轻量级实体,通过消息传递相互通信。Akka特别适合于在多核处理器上构建高性能、可扩展、容错的应用程序。 在本篇文档中,将会介绍Akka的基本概念和使用场景...

    基于ECS(Entity component System)构建的分布式游戏服务端框架,同时提供Actor模型

    - 可能是用于构建上述游戏服务端的开源框架,提供了ECS和Actor模型的实现,帮助开发者快速搭建游戏后端。 - 框架可能包含核心组件库、系统管理模块、网络通信库、状态管理工具等,方便开发者进行定制化开发。 5. ...

    用Scala写的akka actor简单demo

    Akka是轻量级、基于actor模型的框架,它用于构建高度并发、分布式和容错的应用程序。Scala是多范式编程语言,支持函数式和面向对象编程,与Akka的集成非常紧密。 Akka Actor是Akka的核心组件,它是一种并发原语,...

    Actor Framework Hands-On Instructions_ActorFramework_labview_Han

    2. **Actor模型**:在LabVIEW中实现Actor模型,我们需要创建一系列独立运行的VI(Virtual Instruments),每个VI代表一个actor,有自己的工作空间和状态。它们通过事件结构进行通信,相当于消息队列,一个VI发送消息...

    akka-typed-actor-1.0-RC2.jar.zip

    在传统的Actor模型中,消息传递是通过继承自`AnyRef`的任意对象进行的,这可能导致类型混淆和运行时错误。Typed Actor通过引入强类型的消息协议,使得Actor之间的交互更为清晰和安全。 2. 创建和配置Typed Actor ...

    ProtoActorSample.zip

    - Actor模型是一种并发计算模型,其中每个Actor都是一个独立的实体,有自己的状态和行为,通过消息传递进行通信。ProtoActor实现了这种模型,使得开发者可以轻松地处理高并发和分布式场景下的复杂问题。 2. **...

    Java的Akka学习入门文档

    Akka,由Lightbend公司开发,是一个强大的、基于actor模型的框架,它为构建高度可伸缩、容错和反应式的Java及Scala应用程序提供了基础。在本文档中,我们将深入探讨Akka的核心概念和主要功能。 1. **Actor模型**:...

Global site tag (gtag.js) - Google Analytics