- 浏览: 1589118 次
- 性别:
- 来自: 上海
文章分类
- 全部博客 (289)
- java 语法基础 (51)
- spring (8)
- mvc struct /Ant --build.xml (8)
- SOA (0)
- oracle 9i/10g (23)
- sql server 2000-2005 (3)
- 数据库基础知识 (6)
- 设计模式与软件架构 (10)
- Hibernate 持久化 (9)
- J2SE/J2EE/J2ME/AJAX 技术 (8)
- JSF 技术 (3)
- JAVA 图形化 (0)
- JMS (40)
- Eclipse 3.2 IDE 开发技巧 (13)
- 项目处理方法集合 (2)
- html/jsp/javascript (2)
- Unix/Linux (9)
- j2me/ARM/windriver/嵌入式 (4)
- 电信科学 (8)
- jsp (1)
- c/c++ (1)
- LZW压缩算法(java) (2)
- Android (77)
- 版本管理git/svn (2)
最新评论
-
huihai:
有demo吗?
NamingStrategy实现动态表名映射 -
cangbaotu:
推荐给大家一些有用的爬虫源码:https://github.c ...
网络爬虫(源代码参考) -
tuspark:
除了.classpath文件以外,.project文件也应该了 ...
Eclipse .classpath文件浅谈 -
tuspark:
造成eclipse自动关闭的原因有很多,这里有很多介绍:ecl ...
eclipse 自动关闭 解决方案 -
DEMONU:
网上都是这些,这种文章。。。
ActiveMQ中的消息持久性
lib:
jms1.1.jar
activemq-all-5.0.jar
首先启动 activemq.bat或者执行以下代码启动一个broker
消费端
producter:
下面是一个Broker实现,不属于同一个例子!
jms1.1.jar
activemq-all-5.0.jar
首先启动 activemq.bat或者执行以下代码启动一个broker
import org.apache.activemq.broker.BrokerService; /** * This example demonstrates how to run an embedded broker inside your Java code * * @version $Revision: 565003 $ */ public final class EmbeddedBroker { private EmbeddedBroker() { } public static void main(String[] args) throws Exception { BrokerService broker = new BrokerService(); broker.setUseJmx(true); broker.addConnector("tcp://localhost:61616"); broker.start(); // now lets wait forever to avoid the JVM terminating immediately Object lock = new Object(); synchronized (lock) { lock.wait(); } } }
消费端
package com.jms; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class ConsumerTool implements MessageListener { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = ActiveMQConnection.DEFAULT_BROKER_URL; private String subject = "TOOL.DEFAULT"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageConsumer consumer = null; //初始化 private void initialize() throws JMSException, Exception{ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(subject); consumer = session.createConsumer(destination); } //消费消息 public void consumeMessage() throws JMSException, Exception{ initialize(); connection.start(); System.out.println("Consumer:->Begin listening..."); //开始监听 consumer.setMessageListener(this); //Message message = consumer.receive(); } //关闭连接 public void close() throws JMSException{ System.out.println("Consumer:->Closing connection"); if (consumer != null) consumer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } public void onMessage(Message message) { try{ if (message instanceof TextMessage){ TextMessage txtMsg = (TextMessage) message; String msg = txtMsg.getText(); System.out.println("Consumer:->Received: " + msg); } else{ System.out.println("Consumer:->Received: " + message); } } catch (JMSException e){ e.printStackTrace(); } } }
producter:
package com.jms; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class ProducerTool { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = ActiveMQConnection.DEFAULT_BROKER_URL; private String subject = "TOOL.DEFAULT"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageProducer producer = null; //初始化 private void initialize() throws JMSException, Exception{ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(subject); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } //发送消息 public void produceMessage(String message) throws JMSException, Exception{ initialize(); TextMessage msg = session.createTextMessage(message); connection.start(); System.out.println("Producer:->Sending message: " + message); producer.send(msg); System.out.println("Producer:->Message sent complete!"); } //关闭连接 public void close() throws JMSException{ System.out.println("Producer:->Closing connection"); if (producer != null) producer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } }
下面是一个Broker实现,不属于同一个例子!
package com.jms; import java.io.IOException; import java.util.Hashtable; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import javax.servlet.ServletConfig; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; public class MessageBroker extends HttpServlet { private ConnectionFactory confactory=null; private Connection jmsCon=null; private Destination dest=null; protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { this.doPost(req, resp); } protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { Session session; try { String deptId=null; String deptName=null; deptId=req.getParameter("deptId"); deptName=req.getParameter("deptName"); session = this.jmsCon.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageProducer msgProducer=session.createProducer(dest); Message textMsg=session.createTextMessage(); textMsg.setStringProperty("deptId", deptId); textMsg.setStringProperty("deptName", deptName); msgProducer.send(textMsg); session.close(); resp.sendRedirect("http://localhost/JmsTestWeb2/"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * */ public void init(ServletConfig config) throws ServletException { super.init(config); try { this.confactory=this.getConnectionFactoryFromLdap(); this.dest=this.getDestinationFromLdap(); this.jmsCon=this.confactory.createConnection(); /** 开启一个会话来创建一个消息消费者,异步监听到来的消息。*/ Session session=jmsCon.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageConsumer msgConsumer=session.createConsumer(this.dest); MessageListenerForOrgMsg msgListener=new MessageListenerForOrgMsg(); msgConsumer.setMessageListener(msgListener); /** 开启另一个会话来创建另外一个消息消费者,异步监听到来的消息。*/ Session session2=jmsCon.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageConsumer msgConsumer2=session2.createConsumer(this.dest); MessageListenerForOrgMsg2 msgListener2=new MessageListenerForOrgMsg2(); msgConsumer2.setMessageListener(msgListener2); this.jmsCon.start(); } catch (Exception e) { e.printStackTrace(); } } /** * * @return * @throws NamingException */ private ConnectionFactory getConnectionFactoryFromLdap() throws NamingException { String account="uid=admin,ou=administrators,ou=topologymanagement,o=netscaperoot";//操作LDAP的帐户。默认就是Admin。 String password="111111" ;//帐户Admin的密码。 String root="ou=jmsstore,dc=xindongfang,dc=com"; //所操作的WLS域。也就是LDAP的根节点的DC Hashtable env = new Hashtable(); env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");//必须这样写,无论用什么LDAP服务器。 env.put(Context.PROVIDER_URL, "ldap://192.168.0.15:2922/" + root);//LDAP服务器的地址:端口。对WLS端口就是7001 env.put(Context.SECURITY_AUTHENTICATION, "simple");//授权界别,可以有三种授权级别,但是如果设为另外两种都无法登录,我也不知道为啥,但是只能设成这个值"none"。 env.put(Context.SECURITY_PRINCIPAL, account );//载入登陆帐户和登录密码 env.put(Context.SECURITY_CREDENTIALS, password); InitialContext ctx=null; ConnectionFactory conFacotry=null; try { ctx = new InitialContext(env);//初始化上下文 conFacotry=(ConnectionFactory)ctx.lookup("cn=topicconfac"); System.out.println("get Connection factory success"); return conFacotry; } finally { if (ctx!=null) ctx.close(); } } /** * * @return * @throws NamingException */ private Destination getDestinationFromLdap() throws NamingException { String account="uid=admin,ou=administrators,ou=topologymanagement,o=netscaperoot";//操作LDAP的帐户。默认就是Admin。 String password="111111" ;//帐户Admin的密码。 String root="ou=jmsstore,dc=xindongfang,dc=com"; //所操作的WLS域。也就是LDAP的根节点的DC Hashtable env = new Hashtable(); env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");//必须这样写,无论用什么LDAP服务器。 env.put(Context.PROVIDER_URL, "ldap://192.168.0.15:2922/" + root);//LDAP服务器的地址:端口。对WLS端口就是7001 env.put(Context.SECURITY_AUTHENTICATION, "simple");//授权类别,可以有三种授权级别,但是如果设为另外两种都无法登录,我也不知道为啥,但是只能设成这个值"none"。 env.put(Context.SECURITY_PRINCIPAL, account );//载入登陆帐户和登录密码 env.put(Context.SECURITY_CREDENTIALS, password); InitialContext ctx=null; Destination dst=null; try { ctx = new InitialContext(env);//初始化上下文 dst=(Destination)ctx.lookup("cn=orgmsg"); System.out.println("get destination success"); return dst; } finally { if (ctx!=null) ctx.close(); } } public void destroy() { if (this.jmsCon!=null) { try { this.jmsCon.close(); } catch (JMSException e) { e.printStackTrace(); } } super.destroy(); } }
发表评论
-
基于jms使用ActiveMQ实现异步日志功能.消息持久到oracle 10g 数据库
2008-09-18 15:05 9933package askyaya.entity;import j ... -
activemq 重新连接的机制
2008-09-18 14:39 23220最近一个项目要用到ActiveMq,并且需要最大程度的保证消息 ... -
ActiveMQ stop inactivity read check
2008-09-17 16:01 10648You can do the following to fix ... -
WSAD环境下JMS异步通信全攻略 (3)
2008-09-11 14:18 32233.5 消息驱动的Bean 在前文讨论JMS消息接收处理逻 ... -
WSAD环境下JMS异步通信全攻略 (2)
2008-09-11 13:58 3771三、JMS P2P编程 在JMS P2P通信方式中,发送程 ... -
WSAD环境下JMS异步通信全攻略 (1)
2008-09-11 13:57 4711一、JMS基本概念 1.1 P2P通信 1.2 Pub ... -
topicpublisher (jms)
2008-09-10 18:55 2919目的地类型JNDI名字连接工厂类型Topic/Queuejav ... -
activemq例子代码 发送BytesMessage消息
2008-09-10 13:55 18380import javax.jms.Connection; im ... -
ActiveMQ in Action(7)
2008-09-10 13:44 3948ActiveMQ in Action(7) 关键字 ... -
ActiveMQ in Action(6)
2008-09-10 13:43 3731ActiveMQ in Action(6) 关键字: acti ... -
ActiveMQ in Action(5)
2008-09-10 13:42 3924ActiveMQ in Action(5) 关键字: acti ... -
ActiveMQ in Action(4)
2008-09-10 13:40 3475ActiveMQ in Action(4) 关键字: acti ... -
ActiveMQ in Action(3)
2008-09-10 13:39 3676ActiveMQ in Action(3) 关键字: acti ... -
ActiveMQ in Action(2)
2008-09-10 13:38 4207ActiveMQ in Action(2) 关键字: acti ... -
ActiveMQ in Action(1)
2008-09-10 13:37 5825ActiveMQ in Action(1) 关键字: acti ... -
为ActiveMQ服务器添加简单验证机制
2008-09-09 23:48 4192为ActiveMQ服务器添加简单验证机制 关键字: Java, ... -
Sender/receiver 消息
2008-09-09 20:28 2123Sender:import java.io.BufferedR ... -
activema.xml 配置
2008-09-09 17:44 3811/***作者:andyao,email:andyaoy@gma ... -
JMX 与系统管理
2008-09-08 10:52 1861Java SE 6 新特性: JMX 与系统管理 ... -
ActiveMQ中的消息持久性
2008-09-05 12:24 3389ActiveMQ中的消息持久性 ActiveMQ很 ...
相关推荐
**AIX平台下Message Broker安装指南** 在IBM的Service-Oriented Architecture (SOA)解决方案中,WebSphere Message Broker(WMB)起着至关重要的作用,它作为一个中间件,负责消息传输、转换和路由。本指南将详细...
在非对称环境下,Message Broker集群的负载均衡是一种策略,旨在优化资源分配,确保在硬件配置不均等的情况下,消息处理的负载能够均匀分布到各个节点上,提高系统的稳定性和性能。WebSphere Message Broker (WMB) ...
Message Broker 连接数据库 Message Broker 是一种消息中间件,能够连接各种数据源,包括关系型数据库、消息队列、文件系统等。在这里,我们将讨论如何使用 Message Broker 连接数据库,包括配置 ODBC 连接、设置 ...
1.2. IBM WEBSPHERE MESSAGE BROKER 技术方案 1 1.2.1. WebSphere Message Broker的特性亮点 2 1.2.2. WebSphere Message Broker的价值 3 1.3. 选择IBM的理由 3 1.3.1. WebSphere Message Broker解决方案的优势 3 ...
在IT行业中,消息中间件(Message Broker)是用于在分布式系统中传递消息的关键组件,它允许应用程序异步通信,解耦各个服务,提高系统的可扩展性和可靠性。本示例主要探讨的是在Message Broker环境中,如何使用...
WebSphere Message Broker 开发和部署最佳实践 WebSphere Message Broker 是一个企业服务总线(ESB),提供了用于各种协议的通用连接以及为使用结构化和非结构化数据的应用程序提供数据转换功能。为了提高消息处理...
《WebSphere Message Broker基础知识》是IBM为WebSphere Message Broker V6提供的一份详尽的指导文档,由Saida Davies、Laura Cowen、Cerys Giddings和Hannah Parker共同编写,于2005年12月发布。这份文档旨在介绍...
### Websphere Message Broker配置总结 #### 一、概述 Websphere Message Broker(以下简称WMB)是一款由IBM开发的企业级消息中间件产品,用于构建高效、可靠的应用集成解决方案。本文将详细介绍WMB 6.1版本中的...
精通WebSphere Message Broker,清晰版本
Oracle Message Broker Administration Guide Release 2.0.1 oracle8 本文档是 Oracle Message Broker Administration Guide Release 2.0.1 的详细指南,适用于 SPARC Solaris 和 Windows NT 平台。该指南主要面向...
### IBM WebSphere Message Broker 命令详解 IBM WebSphere Message Broker(以下简称“Message Broker”)是IBM提供的一款用于企业级消息处理的软件,它能够高效地管理和传递大量的消息数据,支持复杂的消息处理...
【WebSphere Message Broker HttpInput 节点】是IBM企业服务总线(Enterprise Service Bus,ESB)解决方案的一部分,主要用于处理HTTP和HTTPS协议的输入消息。在WebSphere Message Broker(WMB,以前称为WebSphere ...
【IBM WebSphere Message Broker Toolkit 7.0 教程(一)】 IBM WebSphere Message Broker (WMB) 是一个强大的企业服务总线(ESB)解决方案,它为企业提供了一个集成平台,用于处理、路由和转换不同系统之间的消息...
Websphere Message Broker实践,WebSphere MQ Java编程,Message Broker 计时器节点编程模式,MessageBroker TCPIP通信协议,wmb关于ws服务的引用,WMB连接oracle数据库实践,全部组件
Oracle Message Broker 安装指南 Release 2.0.1.0 for Windows NT Oracle Message Broker 是一种消息中间件产品,由 Oracle 公司开发和维护。它提供了一个统一的消息队列管理系统,允许不同的应用程序之间进行消息...
《精通WebSphere Message Broker》-陈宇翔-源代码及相关工具-4482这个压缩包文件主要聚焦于IBM的WebSphere Message Broker(WMB)技术,这是一款强大的企业级消息中间件产品。通过深入理解和掌握WMB,开发者能够构建...
精通Websphere Message Broker,最主要的功能是消息选路、消息传送、消息扩展和发布/订阅。显然,这些功能使WebSphere Message Broker成为实现ESB业务集成的首选