很多时候我们的程序需要访问数据源取数据,处理完毕后,又要将数据写入数据源,为了解决数据存取问题,一般的做法就是添加数据持久层代码,直接利用DAO对数据进行持久化,或是保存到数据库或是保存到文件或者其他方式。
这是比较流行也比较成熟的做法了,但是有时候我们会发现,刚完成一个项目开发,但是需求发生改变,不得不对代码进行修改。请看如下两个场景:
1. 原来数据是从串口中接过来,程序处理完毕后将结果保存到了数据库里,新需求要求从socket中读取数据,处理结果保存到ActiveMQ(一种消息中间件产品,功能类似数据库,提供数据存储和分发服务)中,这时,不得不对原来的某些代码进行修改。
2. 开发某种工件,这种工件具有一定的通用性,可以随意移植到其他地方,或者与其他系统进行整合,或者是作为其他系统的一个插件功能使用,工件的功能很明确,就是从指定数据源读取数据然后处理,处理结果写回到数据源。这样一来我们在开发这个工件时就必须考虑一个问题,如何获取数据和保存数据?未来应用的场合可能很不同,如果固定了数据读取和写入的方式则势必影响工件的通用性,而且会增大以后的维护成本。
上面的这两个场景所面临的困难其实有很多解决办法,我提供这样一种解决办法:读数据访问层进行抽象,形成独立的接口,程序中定义这些数据访问接口,凡事遇到数据访问问题都调用接口的方法去处理。具体应用时,编写特定环境下的接口实现完成真正的数据访问。
下面是抽象出来的数据访问接口:
/**
*
*/
package test.dataconnector;
/**
* @author
* 数据收发层的数据连接器,负责向某个数据源写入或者读取数据
*/
public interface DataConnector {
/**
* 发送数据
* 如果无法发送数据时抛出异常
* @param data 要发送的数据
* @throws Exception
*/
public void send(byte[] data) throws Exception;
/**
* 发送数据
* 如果无法发送数据时抛出异常
* @param data 要发送的数据
* @param param 附加参数
* @throws Exception
*/
public void send(byte[] data, Object param) throws Exception;
/**
* 接收数据
* 如果没有数据可供读取时则抛出异常
* @return 返回一条消息或者一个传送单位的消息,某些底层实现方式可能要求多次调用此方法才能接收完所有可用消息
* @throws Exception
*/
public byte[] receive() throws Exception;
/**
* 接收数据
* 如果没有数据可供读取时则抛出异常
* @param param 附加参数
* @return 返回一条消息或者一个传送单位的消息,某些底层实现方式可能要求多次调用此方法才能接收完所有可用消息
* @throws Exception
*/
public byte[] receive(Object param) throws Exception;
/**
* 关闭连接器,释放资源
*/
public void close();
/**
* 向连接器注册感兴趣的事件通知
* 注册通知后,一旦感兴趣的事件发生,连接器便主动调用通知处理器的handle方法
* @param type 感兴趣的事件类型
* @param handler 通知处理器
*/
public void registerNotification(EventType type, NotificationHandler handler);
/**
* 通知处理器
*
* @author
*
*/
public interface NotificationHandler{
/**
* 一旦感兴趣的事件发生,连接器便主动调用此方法达到通知的目的
*/
public void handle();
}
/**
* 事件类型
* @author
*
*/
public enum EventType{
/**
* 可读,管道当前有数据可供读取
*/
readable,
/**
* 可写,管道当前可供写入数据
*/
writable
}
/**
* 数据连接器不可用,数据源未准备就绪
* @author
*
*/
public class DataConnectorNotAvailableException extends Exception{
/* (non-Javadoc)
* @see java.lang.Throwable#getMessage()
*/
@Override
public String getMessage() {
return "数据连接器不可用,数据源未准备就绪";
}
}
/**
* 无法从数据源中读取数据
* @author
*
*/
public class CannotReadException extends Exception{
/* (non-Javadoc)
* @see java.lang.Throwable#getMessage()
*/
@Override
public String getMessage() {
return "无法从数据源中读取数据";
}
}
/**
* 无法向数据源中写入数据
* @author
*
*/
public class CannotWriteException extends Exception{
/* (non-Javadoc)
* @see java.lang.Throwable#getMessage()
*/
@Override
public String getMessage() {
return "无法向数据源中写入数据";
}
}
}
一个ActiveMQ的实现:
/**
*
*/
package test.dataconnector.mq;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import test.dataconnector.DataConnector;
/**
* @author
* 数据连接器的MQ实现方式
*/
public class MQTopicDataConnector implements DataConnector,MessageListener {
private static Logger logger = LoggerFactory
.getLogger(MQTopicDataConnector.class);//slf4j logging
public MQTopicDataConnector(String url, String defaultTopic){
try {
this.init(url,defaultTopic);
} catch (JMSException e) {
logger.error("无法与MQ建立连接:{}",e.getMessage());
e.printStackTrace();
}
}
/* (non-Javadoc)
* @see test.dataconnector.DataConnector#receive()
*/
public byte[] receive() throws JMSException, CannotReadException {
byte[] rt = null;
Message message = null;
if (!this.unReadMessage.isEmpty()) {
message = this.unReadMessage.poll();
} else {
throw new CannotReadException();
}
if (message instanceof ObjectMessage){
ObjectMessage obj = (ObjectMessage)message;
rt = (byte[])obj.getObject();
} else {
throw new IllegalStateException("接收到的数据无法转换成字节数组");
}
return rt;
}
/* (non-Javadoc)
* @see test.dataconnector.DataConnector#registerNotification(test.dataconnector.DataConnector.NotificationType, test.dataconnector.DataConnector.NotificationHandler)
* 都注册到默认topic上
*/
public void registerNotification(EventType type,
NotificationHandler handler) {
switch(type){
case readable:{
this.readableHandler.add(handler);
try {
MessageConsumer consumer = this.getConsumer(this.defaultTopic);
if (this != consumer.getMessageListener()){
consumer.setMessageListener(this);
}
} catch (JMSException e) {
e.printStackTrace();
}
break;
}
case writable:{
throw new UnsupportedOperationException("不支持readable事件类型的通知");
}
default:{
throw new UnsupportedOperationException("不支持此事件类型的通知");
}
}
}
/* (non-Javadoc)
* @see test.dataconnector.DataConnector#send(byte[])
*/
public void send(byte[] data) throws Exception {
MessageProducer producer = this.getProducer(this.defaultTopic);
ObjectMessage message;
message = session.createObjectMessage(data);
producer.send(message);
}
/* (non-Javadoc)
* @see test.dataconnector.DataConnector#receive(java.lang.Object)
*/
public byte[] receive(Object param) throws Exception {
throw new UnsupportedOperationException("暂不支持此方法");
}
/* (non-Javadoc)
* @see test.dataconnector.DataConnector#send(byte[], java.lang.Object)
*/
public void send(byte[] data, Object param) throws Exception {
if (param instanceof String){
MessageProducer producer = this.getProducer((String)param);
ObjectMessage message;
message = session.createObjectMessage(data);
producer.send(message);
} else {
throw new IllegalArgumentException("错误的参数类型,param请指定一个topic名称");
}
}
/* (non-Javadoc)
* @see test.dataconnector.DataConnector#close()
*/
public void close() {
try {
connection.close();
logger.info("close connection");
} catch (JMSException e) {
logger.error(e.getMessage());
}
}
/* (non-Javadoc)
* @see javax.jms.MessageListener#onMessage(javax.jms.Message)
*/
public void onMessage(Message message) {
try {
this.unReadMessage.put(message);
} catch (InterruptedException e) {
logger.error("缓存从数据源接收到的消息过程中出现异常:{}",e.getMessage());
e.printStackTrace();
}
//通知所有注册了可读事件的通知处理器
for (NotificationHandler handler : this.readableHandler){
handler.handle();
}
}
/**
* 初始化相关参数
* @throws JMSException
*/
private void init(String url, String defaultTopic) throws JMSException{
this.url = url;
this.defaultTopic = defaultTopic;
ConnectionFactory factory = new ActiveMQConnectionFactory(this.url);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
logger.info("connected");
}
private Destination getDestination(String addName) throws JMSException{
Destination dest = this.addrsMap.get(addName);
if (dest == null){
dest = this.session.createTopic(addName);
this.addrsMap.put(addName, dest);
}
return dest;
}
private MessageProducer getProducer(String addName) throws JMSException{
Destination dest = this.getDestination(addName);
MessageProducer producer = this.producerMap.get(dest);
if (producer == null){
producer = this.session.createProducer(dest);
this.producerMap.put(dest, producer);
}
return producer;
}
private MessageConsumer getConsumer(String addName) throws JMSException{
Destination dest = this.getDestination(addName);
MessageConsumer consumer = this.consumerMap.get(dest);
if (consumer == null){
consumer = this.session.createConsumer(dest);
this.consumerMap.put(dest, consumer);
}
return consumer;
}
/**
* 名称与destination的映射,便于按名称需找目标
*/
private Map<String, Destination> addrsMap = new HashMap<String, Destination>();
private Map<Destination, MessageProducer> producerMap = new HashMap<Destination, MessageProducer>();
private Map<Destination, MessageConsumer> consumerMap = new HashMap<Destination, MessageConsumer>();
private String defaultTopic = "defaultTopic";
/**
* 缓存未被读取的消息
*/
private BlockingQueue<Message> unReadMessage = new LinkedBlockingQueue<Message>();
//jms所需变量
private Session session;
private Connection connection;
//activeMQ所需变量
private String url;
private Set<NotificationHandler> readableHandler = new HashSet<NotificationHandler>();
private Set<NotificationHandler> writableHandler = new HashSet<NotificationHandler>();
}
使用方法:
/**
*
*/
package test.dataconnector.mq;
import test.dataconnector.DataConnector;
import test.dataconnector.DataConnector.EventType;
import test.dataconnector.DataConnector.NotificationHandler;
/**
* @author
*
*/
public class MQTopicDataConnectorTest implements NotificationHandler{
DataConnector connector = new MQTopicDataConnector("tcp://localhost:61616", "defaulTopic");
/**
* @param args
*/
public static void main(String[] args) {
MQTopicDataConnectorTest test = new MQTopicDataConnectorTest();
test.connector.registerNotification(EventType.readable, test);
try {
test.connector.send("hi,this is a test app".getBytes());
} catch (Exception e) {
e.printStackTrace();
}
try {
System.out.println(new String(test.connector.receive()));
} catch (Exception e) {
e.printStackTrace();
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/* (non-Javadoc)
* @see test.dataconnector.DataConnector.NotificationHandler#handle()
*/
public void handle() {
try {
byte[] data = this.connector.receive();
System.out.println("收到数据:" + new String(data));
} catch (Exception e) {
e.printStackTrace();
}
}
}
为了便于测试,编写一个向MQ中发送消息的方法:
/**
*
*/
package test.dataconnector.mq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class SendMessage {
private static final String url = "tcp://localhost:61616";
private static final String QUEUE_NAME = "choice.queue";
protected String expectedBody = "<hello>world!</hello>";
Connection connection = null;
public void sendMessage() throws JMSException {
try {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
url);
connection = (Connection) connectionFactory.createConnection();
connection.start();
Session session = (Session) connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("defaulTopic");
MessageProducer producer = session.createProducer(destination);
// TextMessage message = session.createTextMessage(expectedBody);
// message.setStringProperty("headname", "remoteB");
byte[] msg = "HELLO,AMOS".getBytes();
// ObjectMessage ms = session.createObjectMessage();
Message message = session.createObjectMessage(msg);
producer.send(message);
} catch (Exception e) {
e.printStackTrace();
}
}
public void close(){
if (this.connection != null){
try {
this.connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void main(String[] args) {
SendMessage sndMsg = new SendMessage();
int i = 10;
while(i-- > 0){
try {
sndMsg.sendMessage();
System.out.println("send ok");
} catch (Exception ex) {
System.out.println(ex.toString());
}
}
sndMsg.close();
}
}
以上示例依赖activemq-all-5.5.1.jar包
分享到:
相关推荐
Java通用数据集数据访问框架的设计
在ASP.NET开发中,创建一个通用数据访问类是提高代码复用性和降低维护成本的重要实践。这个类通常封装了数据库连接、SQL语句执行、事务处理等基础操作,使得业务逻辑层无需直接与数据库交互,从而实现解耦。以下是...
访问透明性是指业务应用获取数据时,可以屏蔽分布式调控系统的数据分离性,使得数据访问成为一个整体。传统的电网数据交互模式,如“四遥”通信方式,已不能适应未来电网一体化调度模式的需求。在这种模式下,电网被...
通用数据访问层及Ajax服务端框架源码 这个演示网站的特点: 1. 数据访问全使用了我的通用数据访问层 + 存储过程的实现。 2. 页面使用了大量的Ajax技术,没有任何的直接提交或回传(Submit Form or postback),分页,...
本通用数据访问类库的目的就是抽象出这些细节,提供一个简洁且易于使用的接口,从而减少开发人员编写重复代码的时间。 标题“C#写的通用数据访问类库”表明这个类库是用C#语言编写的,专为简化数据访问任务设计。这...
它的设计目标是实现访问控制策略和机制的分离,提供一个一致且抽象的访问控制接口,支持多种访问控制策略,包括强制访问控制(MAC)和基于角色的访问控制(RBAC)等。 框架的引入允许用户自定义访问控制策略模块,...
**WPF 通用权限开发框架** 是一个专为Windows Presentation Foundation (WPF) 应用程序设计的权限管理解决方案。WPF是微软.NET Framework的一部分,它提供了丰富的用户界面(UI)设计工具和功能,用于构建桌面应用...
【标题】"通用数据访问层及Ajax服务端框架源码" 涉及的主要知识点包括数据访问技术、Ajax服务端框架以及源码分析。 【数据访问技术】:数据访问层(Data Access Layer, DAL)是应用程序中负责与数据库交互的部分,...
标签“三层 框架 数据库开发”揭示了这个框架可能采用典型的三层架构设计,即业务逻辑层(Business Logic Layer)、数据访问层(Data Access Layer)和表示层(Presentation Layer)。这样的架构有助于分离关注点,...
在本示例中,"FishLib-Lab-Demo_20110503"可能是一个包含具体实现的项目,它可能演示了如何结合使用通用数据访问层和Ajax服务端框架。在这样的框架中,服务端可能使用ASP.NET MVC或Web API等技术,为前端提供RESTful...
总的来说,"通用后台管理框架"是一个基于成熟技术栈的解决方案,它为开发者提供了一个快速开发企业级后台系统的起点。通过熟练掌握Spring、SpringMVC和Mybatis,以及MySQL的使用,开发者可以构建出稳定、高效的后台...
井数据格式解编子系统设计一个通用的测井数据格式解编框架。数据访问是测 井数据处理与综合解释软件重要的基础组成部分。确定测井数据的保存格式和 设计数据访问的应用程序接口(API)是开发测井软件面临的首要问题...
"通用数据访问代码"则是指设计用于处理各种数据源的代码库,它可以包括连接管理、查询构建、事务处理、数据序列化等通用功能。这样的代码库有助于提高开发效率,减少重复工作,并确保在不同的项目中保持一致性。 在...
在“可移植数据访问层”这个文件中,可能包含了实现这种通用数据访问层的C#代码或者设计文档。这些资源可以帮助开发者快速搭建一个能够适应多种数据库环境的数据访问层,从而降低项目的开发成本和维护难度。 总结来...
本示例“winform通用权限开发框架”旨在提供一个基础的、可扩展的权限管理解决方案,适用于各种企业级应用。 权限管理是任何复杂系统的核心组成部分,尤其对于多用户、多角色的应用程序而言。通用权限开发框架通常...
在本案例中,我们可以设计一个通用的自定义数据库访问类,封装数据库连接、SQL语句执行等细节,提供统一的接口供上层调用。 在C#中,我们可以利用ADO.NET库来实现数据访问。ADO.NET提供了DataSet、DataTable、...
总的来说,".net 通用权限管理框架"为.NET开发者提供了一个强大的工具,通过深入理解并应用这个框架,可以极大地提高开发效率,减少安全隐患,并使项目更加健壮和可维护。无论是初学者还是经验丰富的开发者,都应该...
简单的理解就是它将我们在数据访问层实现的C#逻辑代码,变为通过 关系数据库与对象的映射,将SQL逻辑放到外部的XML配置文件中,以方便以后的维护。 这个框架有两个主要的组成部分,一个是SQL Maps,另一个是 Data ...