- 浏览: 452339 次
- 性别:
- 来自: 西安
文章分类
最新评论
-
进退取舍:
谢谢,这个用上了!!
Java 一个线程池的示例 -
pb_water:
感谢楼主,打算买楼主的书,支持一下,楼主功德无量
JavaScript内核系列第0版整理稿下载 -
lancezhcj:
有图会直观的多呢,再摸索摸索
有限自动机与建模 -
hsmsyy:
这里应该是原创了吧,楼主我觉得闭包的作用:实现面向对象。有待商 ...
JavaScript内核系列 第7章 闭包 -
wll52:
在应用退出之前,需要释放连接 con.disconnect() ...
使用smack与GTalk通信
前言
一直以来,都对异步事件很感兴趣,比如一个应用在运行一个耗时的过程时,最好的方式是提交这个耗时的过程给一个专门的工作线程,然后立即返回到主线程上,进行其他的任务,而工作线程完成耗时任务后,异步的通知主线程,这个过程本身是很有意思的。传统的事件-监听器模型可以较好的解决这个问题,不过事件和监听器两者的耦合往往略显紧密,所以需要另一种实现,使得这两者的耦合尽量小,那样模块可以比较通用。
总线模式
前几天跟同事讨论了下Swing中的消息机制,同事给我讲了下总线模式的消息机制,感觉很有意思,于是周末就自己实现了下。具体的思路是这样的:
- 系统中存在一个消息服务(Message Service),即总线
- 监听器对象,通过实现一个可被通知的对象的接口,将自己注册在消息服务上
- 可被通知的对象可以向消息总线上post消息,就这个对象而言,它对其他注册在总线上的对象是一无所知的
- 消息服务进行消息的调度和转发,将消息(事件)发送给指定的对象,从而传递这个异步事件
这个思路最大的好处是,事件被抽象成消息(Message),具有统一的格式,便于传递。挂在总线上的监听器互相不知道对方的存在,监听器可以指定自己感兴趣的消息类型,消息可以是广播的形式,也可以是点对点的。(后来参看了下JMS,其中有pub/sub的模式(即订阅模式),不过,对于异步消息的传递来说,这个可以不必实现)
消息服务
消息服务可以将一大堆分布在不同物理机上的应用整合起来,进行通信,可以将一些小的应用整合为一个大的,可用的应用系统。
用一个例子来说吧:
public class Test{ public static void main(String[] args) throws RemoteException{ /* * 创建一个可被通知的对象(监听器), 这个监听器关注这样几个事件 * TIMEOUT, CLOSE, and READY */ Configuration config = new RMIServerConfiguration(null, 0); CommonNotifiableEntry entry1 = new CommonNotifiableEntry(config, "client1", MessageTypes.MESSAGE_TIMEOUT | MessageTypes.MESSAGE_CLOSE | MessageTypes.MESSAGE_READY); /* * 创建另一个监听器, 这个监听器关注这样几个事件 * OPEN, CLOSE, and TIMEOUT. */ CommonNotifiableEntry entry2 = new CommonNotifiableEntry(config, "client2", MessageTypes.MESSAGE_OPEN | MessageTypes.MESSAGE_CLOSE | MessageTypes.MESSAGE_TIMEOUT); // 将监听器挂在BUS上 entry1.register(); entry2.register(); // 创建一个新的消息, MESSAGE_OPEN类型. Message msg = new CommonMessage( entry1.getId(), entry2.getId(), MessageTypes.MESSAGE_OPEN, "busying now"); // 传递给entry2 entry1.post(msg); // 创建一个MESSAGE_CLICKED类型的消息, entry2 // 不关注这个类型的消息,所以此消息不会被传递 Message msgCannotBeReceived = new CommonMessage( entry1.getId(), entry2.getId(), MessageTypes.MESSAGE_CLICKED, "cliked evnet"); entry1.post(msgCannotBeReceived); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } // re use the message object to send another message entry msg.setSource(entry2.getId()); msg.setTarget(entry1.getId()); msg.setType(MessageTypes.MESSAGE_READY); msg.setBody("okay now"); entry2.post(msg); // 卸载这些监听器,当程序退出,或者 // 或者监听器不在关注事件发生的时候 entry1.unregister(); entry2.unregister(); } }
当前,这个系统可以支持远程的消息传递(通过java的RMI机制),不过对于寻址方面还没有做进一步的处理,有时间再来完善吧。
消息服务的实现
下面我把消息服务的主要实现部分贴出来分析一下:
/** * * @author Abruzzi * */ public class MessageBus extends UnicastRemoteObject implements Bus{ private static MessageBus instance; private List<NotifiableEntry> listeners; private List<Message> messages; private Thread daemonThread = null; public static MessageBus getInstance() throws RemoteException{ if(instance == null){ instance = new MessageBus(); } return instance; } private MessageBus() throws RemoteException{ listeners = new LinkedList<NotifiableEntry>(); messages = new LinkedList<Message>(); Daemon daemon = new Daemon(); daemonThread = new Thread(daemon); daemonThread.setPriority(Thread.NORM_PRIORITY + 3); daemonThread.setDaemon(true); daemonThread.start(); while(!daemonThread.isAlive()); } /** * mount notifiable object to listener list */ public void mount(NotifiableEntry entry) throws RemoteException{ synchronized(listeners){ listeners.add(entry); listeners.notifyAll(); } } /** * unmount the special notifiable object from listener */ public void unmount(NotifiableEntry entry) throws RemoteException{ synchronized(listeners){ listeners.remove(entry); listeners.notifyAll(); } } /** * post a new message into the bus * @param message */ public void post(Message message) throws RemoteException{ synchronized(messages){ messages.add(message); messages.notifyAll(); } } /** * * @author Abruzzi * worker thread, dispatch message to appropriate listener * */ private class Daemon implements Runnable{ private boolean loop = true; public void run(){ while(loop){ if(messages.size() == 0){ synchronized(messages){ try {messages.wait();} catch (InterruptedException e) { e.printStackTrace(); } } } processIncomingMessage(); } } } /** * process the incoming message, remove the first message from * queue, and then check all listeners to see whether should * deliver the message to or not. */ private void processIncomingMessage(){ Message msg; synchronized(messages){ msg = messages.remove(0); } String target = null; int type = 0; int mask = 0; try { target = msg.getTarget(); type = msg.getType(); if(target == MessageTypes.SENDTOALL){ for(NotifiableEntry entry : listeners){ mask = entry.getSense(); if((mask & type) == type){entry.update(msg);} } }else{ for(NotifiableEntry entry : listeners){ mask = entry.getSense(); if(entry.getId().equals(target) && (mask & type) == type){ entry.update(msg); } } } } catch (RemoteException e) { e.printStackTrace(); } } }
消息总线是一个RMI对象,其中mount(), unmout(), post()等方法可以被远程调用。MessageBus维护两个列表,一个消息列表,一个监听器列表。当消息被post到总线上后,post会立即返回,然后工作线程启动,取出消息并将其分发到合适的监听器上。
可能,对同步的处理上考虑不够周全,下来再继续修改。
P.S.我将这个项目托管在google code上了,叫BBMS(Bus Based Message Service),感兴趣的可以去看看:http://code.google.com/p/bbms/。
发表评论
-
JavaScript内核系列 第15章 服务器端的JavaScript
2012-02-12 21:39 2325第15章已经在icodeit上发布,这一章分为上/下两篇,请朋 ... -
使用vim开发python及graphviz绘图
2011-12-23 14:49 6457基本需求 使用vim中的autocmd命令可以很容易的将正在 ... -
Java脚本技术应用实例
2011-01-22 11:24 4267前言 一直以来都很喜欢可以自由扩展的软件,这一点应该已经在很 ... -
可编程计算器(phoc)的设计与实现
2011-01-17 11:34 1983前言 借助JavaScript脚本 ... -
函数式编程(javascirpt)
2009-04-18 22:18 1264前言 Javascript,有人称 ... -
C和指针
2009-05-21 23:15 1117前言 指针是C的灵魂,正是指针使得C存在了这么多年,而且将长 ... -
C和指针(续)
2009-05-25 23:41 1360前言 上一篇《C和指针》可能对关于C和指针的有些内容没有说透 ... -
有限自动机与建模
2009-06-06 10:48 1786前言 在学校学程序设计语言的时候,能接触到的所有例子没有一个 ... -
事件和监听器
2009-06-21 22:06 1437前言 事件监听器是经 ... -
基于总线的消息服务(BBMS)的设计与实现
2009-07-25 22:19 1364前言 异步事件的通知机制在比较有规模的软件设计中必然会有涉及 ... -
JavaScript内核系列 第9章 函数式的Javascript
2010-05-13 19:20 3785第九章 函数式的Javascript 要说Ja ... -
JavaScript内核系列 第8章 面向对象的JavaScript(下)
2010-05-06 09:40 3672接上篇:JavaScript内核系列 第8章 面向对象的Jav ... -
JavaScript内核系列 第8章 面向对象的JavaScript(上)
2010-05-06 09:26 2906第八章 面向对象的 Javascript ... -
JavaScript内核系列 第7章 闭包
2010-05-04 08:48 3874第七章 闭包 闭包向来给包括JavaScript程序 ... -
JavaScript内核系列 第6章 正则表达式
2010-04-27 19:44 4050第六章 正则表达式 正则表达式是对字符串的结构 ... -
JavaScript内核系列 第5章 数组
2010-04-24 15:17 4530第五章 数组 JavaScript的数组也是一个比较 ... -
Swing小应用(Todo-List)之三
2010-04-22 20:47 2136前言 去年9月份开发的那个小工具sTodo,只是做到了能用, ... -
JavaScript内核系列 第4章 函数
2010-04-18 17:31 5095第四章 函数 函数,在C语言之类的过程式语言中 ... -
JavaScript内核系列 第3章 对象与JSON
2010-04-12 09:12 6133第三章 对象与JSON JavaScript对象与传 ... -
JavaScript内核系列 第2章 基本概念
2010-04-03 19:44 5688第二章 基本概念 ...
相关推荐
【基于消息中心的对象总线】是一种先进的软件设计模式,它允许不同组件或对象在分布式环境中进行通信和协作,而无需直接引用或了解彼此的具体实现。这种模式的核心是“消息中心”,它扮演着中介的角色,接收来自各个...
在实施基于ActiveMQ的消息总线逻辑与物理架构设计时,需要注意消息中间件的核心功能实现、SOA设计原则的遵循、企业应用系统的解耦与服务共享、前端架构设计、消息流转机制以及多种消息队列技术的选择与对比。...
### 精通SOA:基于服务总线的Struts+EJB+Web Service整合应用开发 #### 第8章 基于IBM SIBus服务总线的SOA架构的网上书店系统实战详解 随着信息技术的发展,面向服务的架构(SOA)已成为构建企业级应用程序的一种...
- 通信模式:Basic CAN是基本的通信模式,适用于简单系统,论文中提到的研究即基于此模式。 2. **电梯系统与CAN总线结合** - 系统架构:电梯系统由多个子系统构成,如驱动控制、门机控制、楼层显示、安全监控等,...
【精通SOA_基于服务总线的Struts+EJB+Web Service整合应用开发08】这一主题涵盖了企业级软件开发中的重要技术栈,包括Service-Oriented Architecture(面向服务架构,SOA)、Struts、EJB(Enterprise JavaBeans)...
企业服务总线(SOA和ESB简介) 企业服务总线(Enterprise Service Bus,ESB)是一种架构模型,它可以...企业服务总线(ESB)是一种基于SOA的架构模型,可以实现服务间智能化集成与管理,提高企业的业务效率和灵活性。
企业服务总线(Enterprise Service Bus,ESB)是一种软件架构模式,旨在提供一个集成的平台,用于集成企业内部的各种应用系统、服务和数据资源。ESB技术设计方案的目的是为了提供一个统一的集成平台,实现企业内部的...
该项目是一款基于Java的高性能、低时延、高可用zbus服务总线设计源码,包含198个文件,涵盖152个Java源文件、...该系统专注于RPC和消息队列服务,适用于构建轻量级服务总线架构,满足高效数据处理和系统间通信的需求。
在基于485总线模式的数据传输协议设计中,数据通信是通过特定的命令包、数据信息头、纯数据块和主机确认信息这四个部分来实现的。这种协议旨在确保在单片机与主机之间的高效、可靠的数据交换。 1. **ComOutData...
3. **消息(Message)**:消息是CAN总线上实际传输的数据帧,包含ID(标识发送节点)、DLC(数据长度)和信号。 4. **环境变量(Environment Variables)**:在CANPL编程和用户界面中,环境变量用于与信号绑定,实现...
Prooph Service Bus能够很好地适应这种模式,因为它允许服务间通过消息进行异步通信,减少了服务之间的直接交互,降低了故障传播的可能性。 在实际使用中,Prooph Service Bus提供了以下主要功能: 1. **命令...
Mule是一个企业服务总线(ESB)消息框架.它的主要特性包括: 1.基于J2EE1.4的企业消息总线(ESB)和消息代理(broker). 2.可插入的连接性:比如Jms,jdbc,tcp,udp,multicast,http,servlet,smtp,pop3, file,xmpp等. 3.支持...
WebSocket是一种基于HTTP/HTTPS的长连接协议,它提供了全双工的通信通道,使得消息总线服务能够快速响应客户端的请求。这种协议特别适合于需要持续通信的应用场景。 **3. 支持Socket协议(TCP长连接/短连接)** 除了...
【基于SOA架构的电力营销系统分布式服务总线】 面向服务架构(Service-Oriented Architecture,简称SOA)是一种设计原则,它强调构建独立、可重用的服务,这些服务可以通过网络进行交互,以提供功能给不同的应用...
本项目旨在设计并研制一个基于CAN总线的混合动力电动汽车动力总成控制系统,以实现高效、智能的能源管理和车辆控制。 CAN总线是一种多主站的串行通信网络,具有高可靠性和实时性,特别适合汽车环境中的恶劣条件。在...
《基于AMBA总线的CRC运算核IP验证报告1》 本报告主要针对基于AMBA总线的CRC(Cyclic Redundancy Check)运算核IP的设计进行验证,以确保其功能的正确性和可靠性。在传统的TestBench仿真方法中,由于受限于手动输入...
电梯监控系统是现代高层建筑中不可或缺的组成部分,随着科技的发展,基于现场总线技术的电梯监控系统已经成为主流。本文以“基于现场总线的电梯监控系统设计”为主题,重点介绍了利用CAN(Controller Area Network)...
它不仅支持USB Host模式,也支持USB Device/Slave模式,可以使得ISA总线上的设备通过USB接口与现代PC进行通信。在本文提到的方案中,CH375芯片将会被用于ISA总线到USB总线的转接,使得ISA总线上的老式设备能够通过...
DeFiBus=RPC+MQ,是基于开源消息中间件打造的安全可控的分布式金融级消息总线。DeFiBus不仅提供了RPC同步调用,还提供了MQ的异步事件通知、事件组播和广播等常用服务调用和消息模式,同时增加了应用多中心多活、服务...