- 浏览: 753216 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
lgh1992314:
a offset: 26b offset: 24c offse ...
java jvm字节占用空间分析 -
ls0609:
语音实现在线听书http://blog.csdn.net/ls ...
Android 语音输入API使用 -
wangli61289:
http://viralpatel-net-tutorials ...
Android 语音输入API使用 -
zxjlwt:
学习了素人派http://surenpi.com
velocity宏加载顺序 -
tt5753:
谢啦........
Lucene的IndexWriter初始化时的LockObtainFailedException的解决方法
java actor模型和消息传递实现分析
Actor模型是一种基于协程的消息传递模型,在并行计算和并发的消息传递中有很好的性能表现。一般的actor模块框架提供了超轻量级的线程和工具,可以在这些线程之间进行快速、安全、零复制的消息传递。在elang,ruby,lua等语言中都是直接在VM级别支持协程,VM帮你做context的保存和恢复。而在java中,却没有内置actor模型实现,但是有几个开源框架也模拟了actor模型的实现。
基于 actor 的系统通过实现一种消息传递模式,使并行处理更容易编码。在此模式中,系统中的每个 actor 都可接收消息;执行该消息所表示的操作;然后将消息发送给其他 actor(包括它们自己)以执行复杂的操作序列。actor 之间的所有消息是异步的,这意味着发送者会在收到任何回复之前继续进行处理。因此,一个 actor 可能终生都陷入接收和处理消息的无限循环中。
当使用多个 actor 时,独立的活动可轻松分配到多个可并行执行消息的线程上(进而分配在多个处理器上)。一般而言,每个 actor 都在一个独立线程上处理消息。一些 actor 系统静态地向 actor 分配线程;而其他系统(比如本文中介绍的系统)则会动态地分配它们。
下面我们会分析下java中的一个actor模型框架的实现:
我们先看下elang中的actor模型的实现:
以Erlang为例子,介绍一下简单的Actor模型
1.首先建立一个Actor,在erlang中,起一个进程(这个是erlang虚拟机进程,跟os进程不同),这个进程就是actor了,可以用来接收和发送各种消息了
Pid = spawn(Mod,func,Args) %起一个进程
2.处理收到的消息
func()->
receive
{From,Msg}-> %收到一个消息
%%do something
func();
3.要对这个actor发送消息,也非常简单
Pid ! {From,Msg}
ujavaactor框架:
下面摘自ibm developer的一段介绍
μJavaActors 是 actor 系统的一个简单的 Java 实现。只有 1,200 行代码,μJavaActors 虽然很小,但很强大。在下面的练习中,您将学习如何使用 μJavaActors 动态地创建和管理 actor,将消息传送给它们。
μJavaActors 围绕 3 个核心界面而构建:
-
消息是在 actor 之间发送的消息。
Message
是 3 个(可选的)值和一些行为的容器:-
source
是发送 actor。 -
subject
是定义消息含义的字符串(也称为命令)。 -
data
是消息的任何参数数据;通常是一个映射、列表或数组。参数可以是要处理和/或其他 actor 要与之交互的数据。 -
subjectMatches()
检查消息主题是否与字符串或正则表达式匹配。
DefaultMessage
。 -
-
ActorManager是一个 actor 管理器。它负责向 actor 分配线程(进而分配处理器)来处理消息。
ActorManager
拥有以下关键行为或特征:-
createActor()
创建一个 actor 并将它与此管理器相关联。 -
startActor()
启动一个 actor。 -
detachActor()
停止一个 actor 并将它与此管理器断开。 -
send()/broadcast()
将一条消息发送给一个 actor、一组 actor、一个类别中的任何 actor 或所有 actor。
ActorManager
,但如果您希望管理多个线程和/或 actor 池,也可以有多个ActorManager
。此接口的默认实现是DefaultActorManager
。 -
-
Actor是一个执行单元,一次处理一条消息。
Actor
具有以下关键行为或特征:-
每个 actor 有一个
name
,该名称在每个ActorManager
中必须是惟一的。 -
每个 actor 属于一个
category
;类别是一种向一组 actor 中的一个成员发送消息的方式。一个 actor 一次只能属于一个类别。 -
只要
ActorManager
可以提供一个执行 actor 的线程,系统就会调用receive()
。为了保持最高效率,actor 应该迅速处理消息,而不要进入漫长的等待状态(比如等待人为输入)。 -
willReceive()
允许 actor 过滤潜在的消息主题。 -
peek()
允许该 actor 和其他 actor 查看是否存在挂起的消息(或许是为了选择主题)。 -
remove()
允许该 actor 和其他 actor 删除或取消任何尚未处理的消息。 -
getMessageCount()
允许该 actor 和其他 actor 获取挂起的消息数量。 -
getMaxMessageCount()
允许 actor 限制支持的挂起消息数量;此方法可用于预防不受控制地发送。
AbstractActor
的抽象类,actor 实现基于该类。 -
每个 actor 有一个
public abstract class AbstractActor extends Utils implements Actor { public static final int DEFAULT_MAX_MESSAGES = 100; protected DefaultActorManager manager; public ActorManager getManager() { return manager; } public void setManager(DefaultActorManager manager) { if (this.manager != null && manager != null) { throw new IllegalStateException( "cannot change manager of attached actor"); } this.manager = manager; } protected String name; @Override public String getName() { return name; } @Override public void setName(String name) { if (manager != null) { throw new IllegalStateException("cannot change name if manager set"); } this.name = name; } protected String category = DEFAULT_CATEGORY; @Override public String getCategory() { return category; } @Override public void setCategory(String category) { this.category = category; } /** * Process a message conditionally. If testMessage() returns null no message * will be consumed. * * @see AbstractActor#testMessage() */ @Override public boolean receive() { Message m = testMessage(); boolean res = m != null; if (res) { boolean f = remove(m); if (!f) { logger.warning("receive message not removed: %s", m); } DefaultMessage dm = (DefaultMessage) m; try { dm.fireMessageListeners(new MessageEvent(this, dm, MessageEvent.MessageStatus.DELIVERED)); //logger.trace("receive %s processing %s", this.getName(), m); loopBody(m); dm.fireMessageListeners(new MessageEvent(this, dm, MessageEvent.MessageStatus.COMPLETED)); } catch (Exception e) { dm.fireMessageListeners(new MessageEvent(this, dm, MessageEvent.MessageStatus.FAILED)); logger.error("loop exception", e); } } manager.awaitMessage(this); return res; } /** * Test to see if a message should be processed. Subclasses should override */ @Override public boolean willReceive(String subject) { return !isEmpty(subject); // default receive all subjects } /** Test the current message. Default action is to accept all. */ protected Message testMessage() { return getMatch(null, false); } /** Process the accepted subject. */ abstract protected void loopBody(Message m); /** Test a message against a defined subject pattern. */ protected DefaultMessage getMatch(String subject, boolean isRegExpr) { DefaultMessage res = null; synchronized (messages) { res = (DefaultMessage) peekNext(subject, isRegExpr); } return res; } protected List<DefaultMessage> messages = new LinkedList<DefaultMessage>(); public DefaultMessage[] getMessages() { return messages.toArray(new DefaultMessage[messages.size()]); } @Override public int getMessageCount() { synchronized (messages) { return messages.size(); } } /** * Limit the number of messages that can be received. Subclasses should override. */ @Override public int getMaxMessageCount() { return DEFAULT_MAX_MESSAGES; } /** Queue a messaged to be processed later. */ public void addMessage(DefaultMessage message) { if (message != null) { synchronized (messages) { if (messages.size() < getMaxMessageCount()) { messages.add(message); // messages.notifyAll(); } else { throw new IllegalStateException("too many messages, cannot add"); } } } } @Override public Message peekNext() { return peekNext(null); } @Override public Message peekNext(String subject) { return peekNext(subject, false); } /** * See if a message exists that meets the selection criteria. **/ @Override public Message peekNext(String subject, boolean isRegExpr) { Message res = null; if (isActive) { Pattern p = subject != null ? (isRegExpr ? Pattern.compile(subject) : null) : null; long now = new Date().getTime(); synchronized (messages) { for (DefaultMessage m : messages) { if (m.getDelayUntil() <= now) { boolean match = subject == null || (isRegExpr ? m.subjectMatches(p) : m .subjectMatches(subject)); if (match) { res = m; break; } } } } } // logger.trace("peekNext %s, %b: %s", subject, isRegExpr, res); return res; } @Override public boolean remove(Message message) { synchronized (messages) { return messages.remove(message); } } protected boolean isActive; public boolean isActive() { return isActive; } @Override public void activate() { isActive = true; } @Override public void deactivate() { isActive = false; } /** Do startup processing. */ protected void runBody() { DefaultMessage m = new DefaultMessage("init"); getManager().send(m, null, this); } @Override public void run() { runBody(); ((DefaultActorManager) getManager()).awaitMessage(this); } protected boolean hasThread; public boolean getHasThread() { return hasThread; } protected void setHasThread(boolean hasThread) { this.hasThread = hasThread; } @Override public String toString() { return getClass().getSimpleName() + "[" + bodyString() + "]"; } protected String bodyString() { return "name=" + name + ", category=" + category + ", messages=" + messages.size(); } volatile protected boolean shutdown; @Override public boolean isShutdown() { return shutdown; } @Override public void shutdown() { shutdown = true; } volatile protected boolean suspended; @Override public void setSuspended(boolean f) { suspended = f; } @Override public boolean isSuspended() { return suspended; } }
我们需要继承AbstractActor抽象类,实现loopBody方法来处理消息内容。
其中receive方法是接收一条消息并进行处理,首先取出一条消息,从消息列表中删除,然后出发消息接收前的MessageEvent.MessageStatus.DELIVERED事件,我们可以实现MessageListener接口来监听消息处理的时间,然后通过抽象方法loopBody来处理消息,最后触发消息接收后的MessageEvent.MessageStatus.COMPLETED事件。
peekNext只是取出消息。
其中isActive表示是否actor已经激活,hasThread表示是否已经分配线程在执行,shutdown是否已经关闭,suspended是否已经挂起。
我们看下defaultMessage消息体:
public class DefaultMessage extends Utils implements Message { ...... protected Actor source; protected String subject; protected Object data; protected List<MessageListener> listeners = new LinkedList<MessageListener>(); public void addMessageListener(MessageListener l) { if (!listeners.contains(l)) { listeners.add(l); } } public void removeMessageListener(MessageListener l) { listeners.remove(l); } public void fireMessageListeners(MessageEvent e) { for(MessageListener l : listeners) { l.onMessage(e); } } ...... }
/** * Get the default instance. Uses ActorManager.properties for configuration. * * @return shared instance */ public static DefaultActorManager getDefaultInstance() { if (instance == null) { instance = new DefaultActorManager(); Map<String, Object> options = null; // ConfigUtils configUtils = new ConfigUtils(); // Properties p = configUtils // .loadProperties("ActorManager.properties"); Properties p = new Properties(); try { p.load(new FileInputStream("ActorManager.properties")); } catch (IOException e) { try { p.load(new FileInputStream("/resource/ActorManager.properties")); } catch (IOException e1) { logger.warning("DefaultActorManager: no configutration: " + e); } } if (!isEmpty(p)) { options = new HashMap<String, Object>(); for (Object key : p.keySet()) { String skey = (String) key; options.put(skey, p.getProperty(skey)); } } instance.initialize(options); } return instance; } protected ThreadGroup threadGroup; protected static int groupCount; protected List<Thread> threads = new LinkedList<Thread>(); //保存线程池的线程列表 /** * Initialize this manager. Call only once. * * @param options * map of options */ @Override public void initialize(Map<String, Object> options) { if (!initialized) { initialized = true; int count = getThreadCount(options); ThreadGroup tg = new ThreadGroup("ActorManager" + groupCount++); threadGroup = tg; for (int i = 0; i < count; i++) { createThread(i); } running = true; for (Thread t : threads) { // logger.trace("procesNextActor starting %s", t); t.start(); } Thread Counter = new Thread(new Runnable() { @Override public void run() { while (running) { try { trendValue = sendCount - dispatchCount; // logger.trace("Counter thread: sc=%d, dc=%d, t=%d", // sendCount, dispatchCount, trendValue); lastSendCount = sendCount; sendCount = 0; updateLastDispatchCount(); Thread.sleep(1000); } catch (InterruptedException e) { break; } } sendCount = lastSendCount = 0; clearDispatchCount(); } }); Counter.setDaemon(true); lastDispatchTime = lastSendTime = new Date().getTime(); Counter.start(); } } protected void createThread(int i) { addThread("actor" + i); } /** * Add a dynamic thread. * * @param name * @return */ public Thread addThread(String name) { Thread t = null; synchronized (actors) { if (trunnables.containsKey(name)) { throw new IllegalStateException("already exists: " + name); } ActorRunnable r = new ActorRunnable(); trunnables.put(name, r); t = new Thread(threadGroup, r, name); threads.add(t); //System.out.printf("addThread: %s", name); } t.setDaemon(true); t.setPriority(getThreadPriority()); return t; }
/** Configuration key for thread count. */ public static final String ACTOR_THREAD_COUNT = "threadCount"; protected Map<String, AbstractActor> actors = new LinkedHashMap<String, AbstractActor>(); protected Map<String, AbstractActor> runnables = new LinkedHashMap<String, AbstractActor>(); protected Map<String, AbstractActor> waiters = new LinkedHashMap<String, AbstractActor>();
actors存放所有的actor对象,runnables存放待发送消息(发送方)的actor,waiters存放等待(接收方接收消息)的actors。
创建一个actor实例:
/** * Create an actor and associate it with this manager. * * @param clazz * the actor class * @param the * actor name; must be unique * @param options * actor options */ @Override public Actor createActor(Class<? extends Actor> clazz, String name, Map<String, Object> options) { AbstractActor a = null; synchronized (actors) { if (!actors.containsKey(name)) { try { a = (AbstractActor) clazz.newInstance(); a.setName(name); a.setManager(this); } catch (Exception e) { throw e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException( "mapped exception: " + e, e); } } else { throw new IllegalArgumentException("name already in use: " + name); } } return a; } /** * Start an actor. Must have been created by this manager. * * @param actor * the actor */ @Override public void startActor(Actor actor) { if (((AbstractActor) actor).getManager() != this) { throw new IllegalStateException("actor not owned by this manager"); } String name = actor.getName(); synchronized (actors) { if (actors.containsKey(name)) { throw new IllegalStateException("already started"); } ((AbstractActor) actor).shutdown = false; actors.put(name, (AbstractActor) actor); runnables.put(name, (AbstractActor) actor); } actor.activate(); }
启动一个actor实例会把actor放入actors队列和runnables等待执行队列(发送方发送消息)中,然后调用actor的activate()方法做初始化。
我们再看看ActorRunnable线程池中的线程执行actor的过程:
actorRunnable.java是DefaultActorManager的内部类,可以用于访问DefaultActorManager的actor实例列表
procesNextActor 从runnables的待执行actor列表中取出第一个(fifo)actor,如果不为空,则执行该actor线程的run方法,发送消息,actor接口是继承了runable接口的,如果runnables列表为空,则从waiters列表中取出一个actor执行,接收消息,接收到消息,响应的增加dispatchCount的值,表示已经消费的消息数。如果procesNextActor执行失败,则主线程等待100ms,否则循环执行。
/** public intended only for "friend" access. */ public class ActorRunnable implements Runnable { public boolean hasThread; public AbstractActor actor; public void run() { // logger.trace("procesNextActor starting"); int delay = 1; while (running) { try { if (!procesNextActor()) { // logger.trace("procesNextActor waiting on actor"); // sleep(delay * 1000); synchronized (actors) { // TOOD: adjust this delay; possible parameter // we want to minizmize overhead (make bigger); // but it has a big impact on message processing // rate (makesmaller) // actors.wait(delay * 1000); actors.wait(100); } delay = Math.max(5, delay + 1); } else { delay = 1; } } catch (InterruptedException e) { } catch (Exception e) { logger.error("procesNextActor exception", e); } } // logger.trace("procesNextActor ended"); } protected boolean procesNextActor() { boolean run = false, wait = false, res = false; actor = null; synchronized (actors) { for (String key : runnables.keySet()) { actor = runnables.remove(key); break; } } if (actor != null) { // first run never started run = true; actor.setHasThread(true); hasThread = true; try { actor.run(); } finally { actor.setHasThread(false); hasThread = false; } } else { synchronized (actors) { for (String key : waiters.keySet()) { actor = waiters.remove(key); break; } } if (actor != null) { // then waiting for responses wait = true; actor.setHasThread(true); hasThread = true; try { res = actor.receive(); if (res) { incDispatchCount(); } } finally { actor.setHasThread(false); hasThread = false; } } } // if (!(!run && wait && !res) && a != null) { // logger.trace("procesNextActor %b/%b/%b: %s", run, wait, res, a); // } return run || res; } }
接下来就是调用发送消息的api了,send将消息message从from发送到to actor中,sentMessages保存了每个actor的消息列表,还有批量发送到多个actor中,send(Message message, Actor from, String category)是将消息发送到分类为category的actor中。broadcast是将消息发送到所有的actor中。
/** * Send a message. * * @param message * message to * @param from * source actor * @param to * target actor * @return number of receiving actors */ @Override public int send(Message message, Actor from, Actor to) { int count = 0; if (message != null) { AbstractActor aa = (AbstractActor) to; if (aa != null) { if (!aa.isShutdown() && !aa.isSuspended() && aa.willReceive(message.getSubject())) { DefaultMessage xmessage = (DefaultMessage) ((DefaultMessage) message).assignSender(from); // logger.trace(" %s to %s", xmessage, to); aa.addMessage(xmessage); xmessage.fireMessageListeners(new MessageEvent(aa, xmessage, MessageEvent.MessageStatus.SENT)); sendCount++; lastSendTime = new Date().getTime(); if (recordSentMessages) { synchronized (sentMessages) { String aname = aa.getName(); List<Message> l = sentMessages.get(aname); if (l == null) { l = new LinkedList<Message>(); sentMessages.put(aname, l); } // keep from getting too big if (l.size() < 100) { l.add(xmessage); } } } count++; synchronized (actors) { actors.notifyAll(); } } } } return count; } /** * Send a message. * * @param message * message to * @param from * source actor * @param to * target actors * @return number of receiving actors */ @Override public int send(Message message, Actor from, Actor[] to) { int count = 0; for (Actor a : to) { count += send(message, from, a); } return count; } /** * Send a message. * * @param message * message to * @param from * source actor * @param to * target actors * @return number of receiving actors */ @Override public int send(Message message, Actor from, Collection<Actor> to) { int count = 0; for (Actor a : to) { count += send(message, from, a); } return count; } /** * Send a message. * * @param message * message to * @param from * source actor * @param category * target actor category * @return number of receiving actors */ @Override public int send(Message message, Actor from, String category) { int count = 0; Map<String, Actor> xactors = cloneActors(); List<Actor> catMembers = new LinkedList<Actor>(); for (String key : xactors.keySet()) { Actor to = xactors.get(key); if (category.equals(to.getCategory()) && (to.getMessageCount() < to.getMaxMessageCount())) { catMembers.add(to); } } // find an actor with lowest message count int min = Integer.MAX_VALUE; Actor amin = null; for (Actor a : catMembers) { int mcount = a.getMessageCount(); if (mcount < min) { min = mcount; amin = a; } } if (amin != null) { count += send(message, from, amin); // } else { // throw new // IllegalStateException("no capable actors for category: " + // category); } return count; } /** * Send a message to all actors. * * @param message * message to * @param from * source actor * @return number of receiving actors */ @Override public int broadcast(Message message, Actor from) { int count = 0; Map<String, Actor> xactors = cloneActors(); for (String key : xactors.keySet()) { Actor to = xactors.get(key); count += send(message, from, to); } return count; }
这里每次获取所有的actor列表都是采用clone方式复制当前的actor列表,如果长时间加锁,则会降低并发能力。
protected Map<String, Actor> cloneActors() { Map<String, Actor> xactors; synchronized (actors) { xactors = new HashMap<String, Actor>(actors); } return xactors; }
当然这里还有termina所有的线程
/** * Terminate processing and wait for all threads to stop. */ @Override public void terminateAndWait() { logger.trace("terminateAndWait waiting on termination of %d threads", threads.size()); terminate(); waitForThreads(); } /** * Wait for all threads to stop. Must have issued terminate. */ public void waitForThreads() { if (!terminated) { throw new IllegalStateException("not terminated"); } for (Thread t : threads) { try { // logger.info("terminateAndWait waiting for %s...", t); t.join(); } catch (InterruptedException e) { // logger.info("terminateAndWait interrupt"); } } } boolean running, terminated; /** * Terminate processing. */ @Override public void terminate() { terminated = true; running = false; for (Thread t : threads) { t.interrupt(); } synchronized (actors) { for (String key : actors.keySet()) { actors.get(key).deactivate(); } } sentMessages.clear(); sendCount = lastSendCount = 0; clearDispatchCount(); }
也可以挂起某个actor,直到有消息达到,这里就是讲actor加入waiters列表,注意上面send发送消息之后,会调用acoters.nofityAll唤醒等待线程
/** * Suspend an actor until it has a read message. * * @param actor * receiving actor */ public void awaitMessage(AbstractActor actor) { synchronized (actors) { waiters.put(actor.getName(), actor); // actors.notifyAll(); // logger.trace("awaitMessage waiters=%d: %s",waiters.size(), a); } }
startActor时才将actor加入runnables队列。
示例代码:
import java.util.Date; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import javax.swing.event.ChangeEvent; import javax.swing.event.ChangeListener; import com.ibm.actor.Actor; import com.ibm.actor.DefaultActorManager; import com.ibm.actor.DefaultMessage; import com.ibm.actor.logging.DefaultLogger; import com.ibm.actor.utils.Utils; /** * A set of runtime services for testing actors and a test case driver. * * @author BFEIGENB * */ public class DefaultActorTest extends Utils { public static final int MAX_IDLE_SECONDS = 10; // public static final int STEP_COUNT = 3 * 60; public static final int TEST_VALUE_COUNT = 1000; // TODO: make bigger public DefaultActorTest() { super(); } private Map<String, Actor> testActors = new ConcurrentHashMap<String, Actor>(); static Random rand = new Random(); public static int nextInt(int limit) { return rand.nextInt(limit); } protected DefaultActorManager getManager() { DefaultActorManager am = actorManager != null ? actorManager : new DefaultActorManager(); return am; } protected int stepCount = 120; public void setStepCount(int stepCount) { this.stepCount = stepCount; } public int getStepCount() { return stepCount; } protected int threadCount = 10; public int getThreadCount() { return threadCount; } public void setThreadCount(int threadCount) { this.threadCount = threadCount; } public void setTestActors(Map<String, Actor> testActors) { this.testActors = testActors; } public Map<String, Actor> getTestActors() { return testActors; } public static final int COMMON_ACTOR_COUNT = 10; public static final int TEST_ACTOR_COUNT = 25; public static final int PRODUCER_ACTOR_COUNT = 25; public static void sleeper(int seconds) { int millis = seconds * 1000 + -50 + nextInt(100); // a little // variation // logger.trace("sleep: %dms", millis); sleep(millis); } public static void dumpMessages(List<DefaultMessage> messages) { synchronized (messages) { if (messages.size() > 0) { for (DefaultMessage m : messages) { logger.info("%s", m); } } } } protected List<ChangeListener> listeners = new LinkedList<ChangeListener>(); public void addChangeListener(ChangeListener l) { if (!listeners.contains(l)) { listeners.add(l); } } public void removeChangeListener(ChangeListener l) { listeners.remove(l); } protected void fireChangeListeners(ChangeEvent e) { for (ChangeListener l : listeners) { l.stateChanged(e); } } protected static String[] types = new String[] { "widget", "framit", "frizzle", "gothca", "splat" }; public static String[] getItemTypes() { return types; } public static void main(String[] args) { DefaultActorTest at = new DefaultActorTest(); at.run(args); logger.trace("Done"); } protected String title; public String getTitle() { return title; } volatile protected boolean done; public void terminateRun() { done = true; } public static String[] getTestNames() { return new String[] { "Countdown", "Producer Consumer", /* "Quicksort", */"MapReduce", "Virus Scan", "All" }; } DefaultActorManager actorManager; public DefaultActorManager getActorManager() { return actorManager; } public void setActorManager(DefaultActorManager actorManager) { this.actorManager = actorManager; } public void run(String[] args) { done = false; // DefaultLogger.getDefaultInstance().setIncludeDate(false); DefaultLogger.getDefaultInstance().setIncludeContext(false); DefaultLogger.getDefaultInstance().setIncludeCaller(false); // DefaultLogger.getDefaultInstance().setIncludeThread(false); DefaultLogger.getDefaultInstance().setLogToFile(false); DefaultLogger.getDefaultInstance().setThreadFieldWidth(10); int sc = stepCount; int tc = threadCount; boolean doTest = false; title = ""; if (!doTest ) { doTest = true; } if (doTest) { if (title.length() > 0) { title += " "; } title += "(Countdown Test)"; } DefaultActorManager am = getManager(); try { Map<String, Object> options = new HashMap<String, Object>(); options.put(DefaultActorManager.ACTOR_THREAD_COUNT, tc); am.initialize(options); if (doTest) { for (int i = 0; i < COMMON_ACTOR_COUNT; i++) { Actor a = am.createActor(TestActor.class, String.format("common%02d", i)); if (a instanceof TestableActor) { TestableActor ta = (TestableActor) a; ta.setActorTest(this); } a.setCategory(TestActor.class.getSimpleName()); getTestActors().put(a.getName(), a); // logger.trace("created: %s", a); } for (int i = 0; i < TEST_ACTOR_COUNT; i++) { Actor a = am.createActor(TestActor.class, String.format("actor%02d", i)); if (a instanceof TestableActor) { TestableActor ta = (TestableActor) a; ta.setActorTest(this); } getTestActors().put(a.getName(), a); // logger.trace("created: %s", a); } } for (String key : getTestActors().keySet()) { am.startActor(getTestActors().get(key)); } for (int i = sc; i > 0; i--) { if (done) { break; } // see if idle a while long now = new Date().getTime(); if (am.getActiveRunnableCount() == 0) { if (now - am.getLastDispatchTime() > MAX_IDLE_SECONDS * 1000 && now - am.getLastSendTime() > MAX_IDLE_SECONDS * 1000) { break; } } setStepCount(i); fireChangeListeners(new ChangeEvent(this)); if (i < 10 || i % 10 == 0) { logger.trace("main waiting: %d...", i); } sleeper(1); } setStepCount(0); fireChangeListeners(new ChangeEvent(this)); // logger.trace("main terminating"); am.terminateAndWait(); } catch (Exception e) { e.printStackTrace(); } } }
/** * An actor that sends messages while counting down a send count. * * @author BFEIGENB * */ public class TestActor extends TestableActor { @Override public void activate() { logger.trace("TestActor activate: %s", this); super.activate(); } @Override public void deactivate() { logger.trace("TestActor deactivate: %s", this); super.deactivate(); } @Override protected void runBody() { // logger.trace("TestActor:%s runBody: %s", getName(), this); DefaultActorTest.sleeper(1); DefaultMessage m = new DefaultMessage("init", 8); getManager().send(m, null, this); } @Override protected void loopBody(Message m) { // logger.trace("TestActor:%s loopBody %s: %s", getName(), m, this); DefaultActorTest.sleeper(1); String subject = m.getSubject(); if ("repeat".equals(subject)) { int count = (Integer) m.getData(); logger.trace("TestActor:%s repeat(%d) %s: %s", getName(), count, m, this); if (count > 0) { m = new DefaultMessage("repeat", count - 1); // logger.trace("TestActor loopBody send %s: %s", m, this); String toName = "actor" + DefaultActorTest .nextInt(DefaultActorTest.TEST_ACTOR_COUNT); Actor to = actorTest.getTestActors().get(toName); if (to != null) { getManager().send(m, this, to); } else { logger.warning("repeat:%s to is null: %s", getName(), toName); } } } else if ("init".equals(subject)) { int count = (Integer) m.getData(); count = DefaultActorTest.nextInt(count) + 1; logger.trace("TestActor:%s init(%d): %s", getName(), count, this); for (int i = 0; i < count; i++) { DefaultActorTest.sleeper(1); m = new DefaultMessage("repeat", count); // logger.trace("TestActor runBody send %s: %s", m, this); String toName = "actor" + DefaultActorTest .nextInt(DefaultActorTest.TEST_ACTOR_COUNT); Actor to = actorTest.getTestActors().get(toName); if (to != null) { getManager().send(m, this, to); } else { logger.warning("init:%s to is null: %s", getName(), toName); } DefaultMessage dm = new DefaultMessage("repeat", count); dm.setDelayUntil(new Date().getTime() + (DefaultActorTest.nextInt(5) + 1) * 1000); getManager().send(dm, this, this.getClass().getSimpleName()); } } else { logger.warning("TestActor:%s loopBody unknown subject: %s", getName(), subject); } } }
输出:
12:59:38.883 T [main ] - TestActor activate: TestActor[name=common06, category=TestActor, messages=0] 12:59:38.886 T [main ] - TestActor activate: TestActor[name=common05, category=TestActor, messages=0] 12:59:38.887 T [main ] - TestActor activate: TestActor[name=common08, category=TestActor, messages=0] 12:59:38.889 T [main ] - TestActor activate: TestActor[name=common07, category=TestActor, messages=0] 12:59:38.890 T [main ] - TestActor activate: TestActor[name=common09, category=TestActor, messages=0] 12:59:38.891 T [main ] - TestActor activate: TestActor[name=common00, category=TestActor, messages=0] 12:59:38.892 T [main ] - TestActor activate: TestActor[name=common01, category=TestActor, messages=0] 12:59:38.893 T [main ] - TestActor activate: TestActor[name=common02, category=TestActor, messages=0] 12:59:38.895 T [main ] - TestActor activate: TestActor[name=common03, category=TestActor, messages=0] 12:59:38.896 T [main ] - TestActor activate: TestActor[name=common04, category=TestActor, messages=0] 12:59:38.897 T [main ] - TestActor activate: TestActor[name=actor24, category=default, messages=0] 12:59:38.899 T [main ] - TestActor activate: TestActor[name=actor11, category=default, messages=0] 12:59:38.904 T [main ] - TestActor activate: TestActor[name=actor23, category=default, messages=0] 12:59:38.905 T [main ] - TestActor activate: TestActor[name=actor10, category=default, messages=0] 12:59:38.906 T [main ] - TestActor activate: TestActor[name=actor22, category=default, messages=0] 12:59:38.907 T [main ] - TestActor activate: TestActor[name=actor13, category=default, messages=0] 12:59:38.908 T [main ] - TestActor activate: TestActor[name=actor21, category=default, messages=0] 12:59:38.909 T [main ] - TestActor activate: TestActor[name=actor12, category=default, messages=0] 12:59:38.910 T [main ] - TestActor activate: TestActor[name=actor20, category=default, messages=0] 12:59:38.911 T [main ] - TestActor activate: TestActor[name=actor02, category=default, messages=0] 12:59:38.912 T [main ] - TestActor activate: TestActor[name=actor01, category=default, messages=0] 12:59:38.914 T [main ] - TestActor activate: TestActor[name=actor00, category=default, messages=0] 12:59:38.915 T [main ] - TestActor activate: TestActor[name=actor19, category=default, messages=0] 12:59:38.916 T [main ] - TestActor activate: TestActor[name=actor06, category=default, messages=0] 12:59:38.917 T [main ] - TestActor activate: TestActor[name=actor18, category=default, messages=0] 12:59:38.917 T [main ] - TestActor activate: TestActor[name=actor05, category=default, messages=0] 12:59:38.918 T [main ] - TestActor activate: TestActor[name=actor04, category=default, messages=0] 12:59:38.919 T [main ] - TestActor activate: TestActor[name=actor03, category=default, messages=0] 12:59:38.920 T [main ] - TestActor activate: TestActor[name=actor15, category=default, messages=0] 12:59:38.921 T [main ] - TestActor activate: TestActor[name=actor14, category=default, messages=0] 12:59:38.922 T [main ] - TestActor activate: TestActor[name=actor09, category=default, messages=0] 12:59:38.923 T [main ] - TestActor activate: TestActor[name=actor17, category=default, messages=0] 12:59:38.923 T [main ] - TestActor activate: TestActor[name=actor08, category=default, messages=0] 12:59:38.924 T [main ] - TestActor activate: TestActor[name=actor16, category=default, messages=0] 12:59:38.925 T [main ] - TestActor activate: TestActor[name=actor07, category=default, messages=0] 12:59:38.926 T [main ] - main waiting: 120... 12:59:42.970 T [actor3 ] - TestActor:common06 init(4): TestActor[name=common06, category=TestActor, messages=0] 12:59:43.028 T [actor2 ] - TestActor:common03 init(8): TestActor[name=common03, category=TestActor, messages=0] 12:59:43.048 T [actor1 ] - TestActor:common01 init(6): TestActor[name=common01, category=TestActor, messages=0] 12:59:43.054 T [actor8 ] - TestActor:common05 init(4): TestActor[name=common05, category=TestActor, messages=0] 12:59:43.064 T [actor6 ] - TestActor:common09 init(8): TestActor[name=common09, category=TestActor, messages=0] 12:59:43.873 T [actor0 ] - TestActor:common02 init(7): TestActor[name=common02, category=TestActor, messages=0] 12:59:43.898 T [actor9 ] - TestActor:common08 init(1): TestActor[name=common08, category=TestActor, messages=0] 12:59:43.993 T [actor7 ] - TestActor:common04 init(2): TestActor[name=common04, category=TestActor, messages=1] 12:59:43.994 T [actor5 ] - TestActor:common00 init(1): TestActor[name=common00, category=TestActor, messages=0] 12:59:43.996 W [actor2 ] - init:common03 to is null: actor5 12:59:44.039 W [actor8 ] - init:common05 to is null: actor9 12:59:44.052 W [actor1 ] - init:common01 to is null: actor9 12:59:44.060 T [actor4 ] - TestActor:common07 init(5): TestActor[name=common07, category=TestActor, messages=0] 12:59:44.912 W [actor0 ] - init:common02 to is null: actor6 12:59:44.968 W [actor5 ] - init:common00 to is null: actor9 12:59:44.986 W [actor3 ] - init:common06 to is null: actor1 12:59:44.986 W [actor7 ] - init:common04 to is null: actor9 12:59:45.108 W [actor4 ] - init:common07 to is null: actor8 12:59:45.947 T [actor9 ] - TestActor:actor13 init(7): TestActor[name=actor13, category=default, messages=0] 12:59:45.951 T [actor5 ] - TestActor:actor22 init(1): TestActor[name=actor22, category=default, messages=1] 12:59:45.976 W [actor3 ] - init:common06 to is null: actor1 12:59:46.016 W [actor2 ] - init:common03 to is null: actor6 12:59:46.052 W [actor1 ] - init:common01 to is null: actor6 12:59:46.115 W [actor4 ] - init:common07 to is null: actor9 12:59:46.932 W [actor3 ] - init:common06 to is null: actor5 12:59:46.940 W [actor5 ] - init:actor22 to is null: actor3 12:59:47.039 T [actor7 ] - TestActor:actor24 init(8): TestActor[name=actor24, category=default, messages=1] 12:59:47.042 W [actor1 ] - init:common01 to is null: actor6 12:59:47.059 W [actor2 ] - init:common03 to is null: actor0 12:59:47.904 T [actor8 ] - TestActor:actor21 init(7): TestActor[name=actor21, category=default, messages=1] 12:59:47.964 T [actor3 ] - TestActor:actor10 init(4): TestActor[name=actor10, category=default, messages=0] 12:59:47.988 T [actor5 ] - TestActor:actor23 init(7): TestActor[name=actor23, category=default, messages=2] 12:59:48.003 W [actor6 ] - init:common09 to is null: actor3 12:59:48.042 W [actor1 ] - init:common01 to is null: actor6 12:59:48.072 W [actor2 ] - init:common03 to is null: actor8 12:59:48.101 W [actor4 ] - init:common07 to is null: actor6 12:59:48.880 W [actor8 ] - init:actor21 to is null: actor8 12:59:48.903 W [actor9 ] - init:actor13 to is null: actor3 12:59:48.952 T [main ] - main waiting: 110... 12:59:49.024 W [actor2 ] - init:common03 to is null: actor3 12:59:49.037 W [actor6 ] - init:common09 to is null: actor9 12:59:49.085 W [actor4 ] - init:common07 to is null: actor6
源代码打包在附件中见http://zhwj184.iteye.com/admin/blogs/1613351
相关推荐
Actor模型的核心思想是将并发处理中的实体——也就是执行单元——抽象为“Actor”,每个Actor都有自己的状态,并且通过异步消息传递与其他Actor进行通信。这种模型特别适合于多核处理器和分布式系统,因为它能够有效...
该软件是为了解决分布式编程中一系列的编程问题而设计,是非常实用的Java和Scala的Actor模型应用,支持多种运行系统进行使用,且安全性高,操作简单,用起来也是非常的方便的! 1、Actor之间完全独立; 2、Actor由...
1. Akka Actor模型:理解Actor的异步消息传递和并发特性。 2. Akka ActorSystem:创建和配置ActorSystem以支持远程调用。 3. 配置文件:编写`application.conf`以启用远程部署和指定TCP设置。 4. 服务端和客户端...
它利用Actor模型提供并发控制,通过消息传递来构建非阻塞、轻量级的处理单元。 3. **Akka与Scala的集成**:Scala和Akka的结合,使得开发响应式系统更加自然和高效。由于Akka和Scala都是基于JVM的,它们能够利用JVM...
这个"actors-in-java"项目看起来是一个实验性的框架,用于在Java中实现Actor模型,尽管它可能并不适合实际生产环境,而是用于学习和探索Actor模型的工作原理。 Actor模型的核心思想是将计算过程分解为独立的、并发...
多线程、Actor模型、协程和进程是实现并行计算的四种常见方式。 一、多线程(Multi-Threading) 多线程是并发编程的基础,允许一个应用程序同时执行多个任务。在Java、C#等语言中,多线程通过创建和管理线程来实现...
在Actor模型中,Actor是基本的计算实体,它通过发送和接收消息与其他Actor通信。每个Actor是独立的,拥有自己的行为和消息队列,且无法直接访问其他Actor的状态。 **什么是Akka?** Akka是一个建立在Actor模型之上...
在Akka框架中,Actor模型得到了广泛的应用,Akka是一个开源的Java和Scala库,它通过Actor模型提供了一种简单的方法来构建并发和分布式系统。 使用Akka框架时,开发者可以通过定义消息类型来与Actor进行通信。消息是...
它基于Actor模型,该模型将并发问题简化为独立的实体,即Actors,它们通过消息传递进行通信。这种设计允许系统在不共享状态的情况下工作,减少了竞态条件和死锁的风险。 **Scala与Akka Actor** Scala是Akka的主要...
Actor模型是一种并发模型,其中actor是封装了状态和行为的轻量级实体,通过消息传递相互通信。Akka特别适合于在多核处理器上构建高性能、可扩展、容错的应用程序。 在本篇文档中,将会介绍Akka的基本概念和使用场景...
- 可能是用于构建上述游戏服务端的开源框架,提供了ECS和Actor模型的实现,帮助开发者快速搭建游戏后端。 - 框架可能包含核心组件库、系统管理模块、网络通信库、状态管理工具等,方便开发者进行定制化开发。 5. ...
Akka是轻量级、基于actor模型的框架,它用于构建高度并发、分布式和容错的应用程序。Scala是多范式编程语言,支持函数式和面向对象编程,与Akka的集成非常紧密。 Akka Actor是Akka的核心组件,它是一种并发原语,...
2. **Actor模型**:在LabVIEW中实现Actor模型,我们需要创建一系列独立运行的VI(Virtual Instruments),每个VI代表一个actor,有自己的工作空间和状态。它们通过事件结构进行通信,相当于消息队列,一个VI发送消息...
在传统的Actor模型中,消息传递是通过继承自`AnyRef`的任意对象进行的,这可能导致类型混淆和运行时错误。Typed Actor通过引入强类型的消息协议,使得Actor之间的交互更为清晰和安全。 2. 创建和配置Typed Actor ...
- Actor模型是一种并发计算模型,其中每个Actor都是一个独立的实体,有自己的状态和行为,通过消息传递进行通信。ProtoActor实现了这种模型,使得开发者可以轻松地处理高并发和分布式场景下的复杂问题。 2. **...
Akka,由Lightbend公司开发,是一个强大的、基于actor模型的框架,它为构建高度可伸缩、容错和反应式的Java及Scala应用程序提供了基础。在本文档中,我们将深入探讨Akka的核心概念和主要功能。 1. **Actor模型**:...