`
abruzzi
  • 浏览: 452339 次
  • 性别: Icon_minigender_1
  • 来自: 西安
社区版块
存档分类
最新评论

基于总线模式的消息服务

阅读更多

前言

一直以来,都对异步事件很感兴趣,比如一个应用在运行一个耗时的过程时,最好的方式是提交这个耗时的过程给一个专门的工作线程,然后立即返回到主线程上,进行其他的任务,而工作线程完成耗时任务后,异步的通知主线程,这个过程本身是很有意思的。传统的事件-监听器模型可以较好的解决这个问题,不过事件监听器两者的耦合往往略显紧密,所以需要另一种实现,使得这两者的耦合尽量小,那样模块可以比较通用。

 

总线模式

前几天跟同事讨论了下Swing中的消息机制,同事给我讲了下总线模式的消息机制,感觉很有意思,于是周末就自己实现了下。具体的思路是这样的:

  • 系统中存在一个消息服务(Message Service),即总线
  • 监听器对象,通过实现一个可被通知的对象的接口,将自己注册在消息服务上
  • 可被通知的对象可以向消息总线上post消息,就这个对象而言,它对其他注册在总线上的对象是一无所知的
  • 消息服务进行消息的调度和转发,将消息(事件)发送给指定的对象,从而传递这个异步事件

这个思路最大的好处是,事件被抽象成消息(Message),具有统一的格式,便于传递。挂在总线上的监听器互相不知道对方的存在,监听器可以指定自己感兴趣的消息类型,消息可以是广播的形式,也可以是点对点的。(后来参看了下JMS,其中有pub/sub的模式(即订阅模式),不过,对于异步消息的传递来说,这个可以不必实现)

 

消息服务

消息服务可以将一大堆分布在不同物理机上的应用整合起来,进行通信,可以将一些小的应用整合为一个大的,可用的应用系统

用一个例子来说吧:

public class Test{
	public static void main(String[] args) throws RemoteException{
		/*
		 * 创建一个可被通知的对象(监听器), 这个监听器关注这样几个事件
		 * TIMEOUT, CLOSE, and READY
		 */
		Configuration config = new RMIServerConfiguration(null, 0);
		CommonNotifiableEntry entry1 = 
			new CommonNotifiableEntry(config, "client1", 
				MessageTypes.MESSAGE_TIMEOUT | 
				MessageTypes.MESSAGE_CLOSE | 
				MessageTypes.MESSAGE_READY);
		
		/*
		 * 创建另一个监听器, 这个监听器关注这样几个事件
		 * OPEN, CLOSE, and TIMEOUT.
		 */
		CommonNotifiableEntry entry2 = 
			new CommonNotifiableEntry(config, "client2", 
				MessageTypes.MESSAGE_OPEN | 
				MessageTypes.MESSAGE_CLOSE | 
				MessageTypes.MESSAGE_TIMEOUT);
		
		// 将监听器挂在BUS上
		entry1.register();
		entry2.register();
		
		// 创建一个新的消息, MESSAGE_OPEN类型.
		Message msg = new CommonMessage(
				entry1.getId(),
				entry2.getId(),
				MessageTypes.MESSAGE_OPEN,
				"busying now");
		
		// 传递给entry2
		entry1.post(msg);
		
		// 创建一个MESSAGE_CLICKED类型的消息, entry2
		// 不关注这个类型的消息,所以此消息不会被传递
		Message msgCannotBeReceived = new CommonMessage(
				entry1.getId(),
				entry2.getId(),
				MessageTypes.MESSAGE_CLICKED,
				"cliked evnet");
		entry1.post(msgCannotBeReceived);
		
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		// re use the message object to send another message entry
		msg.setSource(entry2.getId());
		msg.setTarget(entry1.getId());
		msg.setType(MessageTypes.MESSAGE_READY);
		msg.setBody("okay now");
		entry2.post(msg);
		
		// 卸载这些监听器,当程序退出,或者
		// 或者监听器不在关注事件发生的时候
		entry1.unregister();
		entry2.unregister();
	}
}

 

当前,这个系统可以支持远程的消息传递(通过java的RMI机制),不过对于寻址方面还没有做进一步的处理,有时间再来完善吧。

 

消息服务的实现

 下面我把消息服务的主要实现部分贴出来分析一下:

/**
 * 
 * @author Abruzzi
 *
 */
public class MessageBus extends UnicastRemoteObject implements Bus{
	private static MessageBus instance;
	private List<NotifiableEntry> listeners;
	private List<Message> messages;
	private Thread daemonThread = null;
	
	public static MessageBus getInstance() throws RemoteException{
		if(instance == null){
			instance = new MessageBus();
		}
		return instance;
	}
	
	private MessageBus() throws RemoteException{
		listeners = new LinkedList<NotifiableEntry>();
		messages = new LinkedList<Message>();
		Daemon daemon = new Daemon();
		daemonThread = new Thread(daemon);
        daemonThread.setPriority(Thread.NORM_PRIORITY + 3);
        daemonThread.setDaemon(true);
        daemonThread.start();
        
        while(!daemonThread.isAlive());
	}
	
	/**
	 * mount notifiable object to listener list
	 */
	public void mount(NotifiableEntry entry) throws RemoteException{
		synchronized(listeners){
			listeners.add(entry);
			listeners.notifyAll();
		}
	}

	/**
	 * unmount the special notifiable object from listener
	 */
	public void unmount(NotifiableEntry entry) throws RemoteException{
		synchronized(listeners){
			listeners.remove(entry);
			listeners.notifyAll();
		}
	}
	
	/**
	 * post a new message into the bus
	 * @param message
	 */
	public void post(Message message) throws RemoteException{
		synchronized(messages){
			messages.add(message);
			messages.notifyAll();
		}
	}
	
	/**
	 * 
	 * @author Abruzzi
	 * worker thread, dispatch message to appropriate listener
	 *
	 */
	private class Daemon implements Runnable{
		private boolean loop = true;
		public void run(){
			while(loop){
				if(messages.size() == 0){
					synchronized(messages){
						try {messages.wait();} 
						catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
				}
				processIncomingMessage();
			}
		}
	}
	
	/**
	 * process the incoming message, remove the first message from
	 * queue, and then check all listeners to see whether should 
	 * deliver the message to or not.
	 */
	private void processIncomingMessage(){
		Message msg;
		synchronized(messages){
			msg = messages.remove(0);
		}
		String target = null;
		int type = 0;
		int mask = 0;
		try {
			target = msg.getTarget();
			type = msg.getType();
			if(target == MessageTypes.SENDTOALL){
				for(NotifiableEntry entry : listeners){
					mask = entry.getSense();
					if((mask & type) == type){entry.update(msg);}
				}
			}else{
				for(NotifiableEntry entry : listeners){
					mask = entry.getSense();
					if(entry.getId().equals(target) && (mask & type) == type){
						entry.update(msg);
					}
				}
			}
		} catch (RemoteException e) {
			e.printStackTrace();
		}
	}

}

 

消息总线是一个RMI对象,其中mount(), unmout(), post()等方法可以被远程调用。MessageBus维护两个列表,一个消息列表,一个监听器列表。当消息被post到总线上后,post会立即返回,然后工作线程启动,取出消息并将其分发到合适的监听器上。

可能,对同步的处理上考虑不够周全,下来再继续修改。

 

 

P.S.我将这个项目托管在google code上了,叫BBMS(Bus Based Message Service),感兴趣的可以去看看:http://code.google.com/p/bbms/

分享到:
评论
1 楼 smithsun 2009-07-15  
最近也在研究JMS,哈哈。异步的消息机制是很有意思!

相关推荐

    基于消息中心的对象总线

    【基于消息中心的对象总线】是一种先进的软件设计模式,它允许不同组件或对象在分布式环境中进行通信和协作,而无需直接引用或了解彼此的具体实现。这种模式的核心是“消息中心”,它扮演着中介的角色,接收来自各个...

    基于ActiveMQ的消息总线逻辑与物理架构设计详解

    在实施基于ActiveMQ的消息总线逻辑与物理架构设计时,需要注意消息中间件的核心功能实现、SOA设计原则的遵循、企业应用系统的解耦与服务共享、前端架构设计、消息流转机制以及多种消息队列技术的选择与对比。...

    精通SOA_基于服务总线的Struts+EJB+Web Service整合应用开

    ### 精通SOA:基于服务总线的Struts+EJB+Web Service整合应用开发 #### 第8章 基于IBM SIBus服务总线的SOA架构的网上书店系统实战详解 随着信息技术的发展,面向服务的架构(SOA)已成为构建企业级应用程序的一种...

    基于can总线的电梯设计毕业论文

    - 通信模式:Basic CAN是基本的通信模式,适用于简单系统,论文中提到的研究即基于此模式。 2. **电梯系统与CAN总线结合** - 系统架构:电梯系统由多个子系统构成,如驱动控制、门机控制、楼层显示、安全监控等,...

    精通SOA_基于服务总线的Struts+EJB+Web Service整合应用开发08

    【精通SOA_基于服务总线的Struts+EJB+Web Service整合应用开发08】这一主题涵盖了企业级软件开发中的重要技术栈,包括Service-Oriented Architecture(面向服务架构,SOA)、Struts、EJB(Enterprise JavaBeans)...

    企业服务总线(SOA和ESB简介)

    企业服务总线(SOA和ESB简介) 企业服务总线(Enterprise Service Bus,ESB)是一种架构模型,它可以...企业服务总线(ESB)是一种基于SOA的架构模型,可以实现服务间智能化集成与管理,提高企业的业务效率和灵活性。

    企业服务总线ESB技术设计方案.pdf

    企业服务总线(Enterprise Service Bus,ESB)是一种软件架构模式,旨在提供一个集成的平台,用于集成企业内部的各种应用系统、服务和数据资源。ESB技术设计方案的目的是为了提供一个统一的集成平台,实现企业内部的...

    基于Java的高性能、低时延、高可用zbus服务总线设计源码

    该项目是一款基于Java的高性能、低时延、高可用zbus服务总线设计源码,包含198个文件,涵盖152个Java源文件、...该系统专注于RPC和消息队列服务,适用于构建轻量级服务总线架构,满足高效数据处理和系统间通信的需求。

    基于485总线模式的数据传输协议设计

    在基于485总线模式的数据传输协议设计中,数据通信是通过特定的命令包、数据信息头、纯数据块和主机确认信息这四个部分来实现的。这种协议旨在确保在单片机与主机之间的高效、可靠的数据交换。 1. **ComOutData...

    基于CANoe的can总线通信模拟研究.docx

    3. **消息(Message)**:消息是CAN总线上实际传输的数据帧,包含ID(标识发送节点)、DLC(数据长度)和信号。 4. **环境变量(Environment Variables)**:在CANPL编程和用户界面中,环境变量用于与信号绑定,实现...

    ProophServiceBus轻量级消息总线支持CQRS和微服务

    Prooph Service Bus能够很好地适应这种模式,因为它允许服务间通过消息进行异步通信,减少了服务之间的直接交互,降低了故障传播的可能性。 在实际使用中,Prooph Service Bus提供了以下主要功能: 1. **命令...

    Mule是一个企业服务总线(ESB)消息框架

    Mule是一个企业服务总线(ESB)消息框架.它的主要特性包括: 1.基于J2EE1.4的企业消息总线(ESB)和消息代理(broker). 2.可插入的连接性:比如Jms,jdbc,tcp,udp,multicast,http,servlet,smtp,pop3, file,xmpp等. 3.支持...

    消息系统总线需求模板.docx

    WebSocket是一种基于HTTP/HTTPS的长连接协议,它提供了全双工的通信通道,使得消息总线服务能够快速响应客户端的请求。这种协议特别适合于需要持续通信的应用场景。 **3. 支持Socket协议(TCP长连接/短连接)** 除了...

    基于SOA架构的电力营销系统分布式服务总线.pdf

    【基于SOA架构的电力营销系统分布式服务总线】 面向服务架构(Service-Oriented Architecture,简称SOA)是一种设计原则,它强调构建独立、可重用的服务,这些服务可以通过网络进行交互,以提供功能给不同的应用...

    基于CAN总线的混合动力电动汽车动力总成控制系统研制

    本项目旨在设计并研制一个基于CAN总线的混合动力电动汽车动力总成控制系统,以实现高效、智能的能源管理和车辆控制。 CAN总线是一种多主站的串行通信网络,具有高可靠性和实时性,特别适合汽车环境中的恶劣条件。在...

    基于AMBA总线的CRC运算核IP验证报告1

    《基于AMBA总线的CRC运算核IP验证报告1》 本报告主要针对基于AMBA总线的CRC(Cyclic Redundancy Check)运算核IP的设计进行验证,以确保其功能的正确性和可靠性。在传统的TestBench仿真方法中,由于受限于手动输入...

    基于现场总线的电梯监控系统设计

    电梯监控系统是现代高层建筑中不可或缺的组成部分,随着科技的发展,基于现场总线技术的电梯监控系统已经成为主流。本文以“基于现场总线的电梯监控系统设计”为主题,重点介绍了利用CAN(Controller Area Network)...

    基于芯片CH375的USB总线和ISA总线转接的实现.pdf

    它不仅支持USB Host模式,也支持USB Device/Slave模式,可以使得ISA总线上的设备通过USB接口与现代PC进行通信。在本文提到的方案中,CH375芯片将会被用于ISA总线到USB总线的转接,使得ISA总线上的老式设备能够通过...

    DeFiBus=RPC+MQ,是基于开源消息中间件打造的安全可控的分布式金融级消息总线

    DeFiBus=RPC+MQ,是基于开源消息中间件打造的安全可控的分布式金融级消息总线。DeFiBus不仅提供了RPC同步调用,还提供了MQ的异步事件通知、事件组播和广播等常用服务调用和消息模式,同时增加了应用多中心多活、服务...

Global site tag (gtag.js) - Google Analytics