`

ProducerTool /MessageBroker /getConnectionFactoryF

    博客分类:
  • JMS
阅读更多
lib:
jms1.1.jar
activemq-all-5.0.jar

首先启动 activemq.bat或者执行以下代码启动一个broker
import org.apache.activemq.broker.BrokerService;

/**
 * This example demonstrates how to run an embedded broker inside your Java code
 * 
 * @version $Revision: 565003 $
 */
public final class EmbeddedBroker {

    private EmbeddedBroker() {
    }

    public static void main(String[] args) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setUseJmx(true);
        broker.addConnector("tcp://localhost:61616");
        broker.start();

        // now lets wait forever to avoid the JVM terminating immediately
        Object lock = new Object();
        synchronized (lock) {
            lock.wait();
        }
    }
}



消费端
package com.jms;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ConsumerTool implements MessageListener {
	
	 private String user = ActiveMQConnection.DEFAULT_USER;

	 private String password = ActiveMQConnection.DEFAULT_PASSWORD;

	 private String url = ActiveMQConnection.DEFAULT_BROKER_URL;

	 private String subject = "TOOL.DEFAULT";

	 private Destination destination = null;

	 private Connection connection = null;

	 private Session session = null;

	 private MessageConsumer consumer = null;

	 //初始化
	 private void initialize() throws JMSException, Exception{
		 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
		 connection = connectionFactory.createConnection();
		 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		 destination = session.createQueue(subject);
		 consumer = session.createConsumer(destination);
	 }

	 //消费消息
	 public void consumeMessage() throws JMSException, Exception{
		 initialize();
		 connection.start();

		 System.out.println("Consumer:->Begin listening...");
		 //开始监听
		 consumer.setMessageListener(this);
		 //Message message = consumer.receive();
	 }

	 //关闭连接
	 public void close() throws JMSException{
		 System.out.println("Consumer:->Closing connection");
		 if (consumer != null)	 consumer.close();
		 if (session != null)	 session.close();
		 if (connection != null)	 connection.close();
	 }	

	public void onMessage(Message message) {
		try{
			if (message instanceof TextMessage){
				TextMessage txtMsg = (TextMessage) message;
				String msg = txtMsg.getText();
				System.out.println("Consumer:->Received: " + msg);
			} else{
				System.out.println("Consumer:->Received: " + message);
			}
		} catch (JMSException e){
			e.printStackTrace();
		}
	}

}



producter:
package com.jms;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ProducerTool {
	
	 private String user = ActiveMQConnection.DEFAULT_USER;

	 private String password = ActiveMQConnection.DEFAULT_PASSWORD;

	 private String url = ActiveMQConnection.DEFAULT_BROKER_URL;

	 private String subject = "TOOL.DEFAULT";

	 private Destination destination = null;

	 private Connection connection = null;

	 private Session session = null;

	 private MessageProducer producer = null;	

	 //初始化
	 private void initialize() throws JMSException, Exception{
		 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
		 connection = connectionFactory.createConnection();
		 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		 destination = session.createQueue(subject);
		 producer = session.createProducer(destination);
		 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
	 }
	 
	 //发送消息
	 public void produceMessage(String message) throws JMSException, Exception{
		 initialize();
		 TextMessage msg = session.createTextMessage(message);
		 connection.start();
		 System.out.println("Producer:->Sending message: " + message);
		 producer.send(msg);
		 System.out.println("Producer:->Message sent complete!");
	 }
	 
	 //关闭连接
	 public void close() throws JMSException{
		 System.out.println("Producer:->Closing connection");
		 if (producer != null) producer.close();
		 if (session != null) session.close();
		 if (connection != null) connection.close();
	 }	 

}



下面是一个Broker实现,不属于同一个例子!
package com.jms;

import java.io.IOException;
import java.util.Hashtable;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class MessageBroker extends HttpServlet
{
	private ConnectionFactory confactory=null;
	private Connection jmsCon=null;
    private Destination dest=null;
	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException 
	{
		this.doPost(req, resp);
	}

	protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException 
	{
		Session session;
		try 
		{
			String deptId=null;
			String deptName=null;
			deptId=req.getParameter("deptId");
			deptName=req.getParameter("deptName");
			session = this.jmsCon.createSession(false, Session.CLIENT_ACKNOWLEDGE);
			MessageProducer msgProducer=session.createProducer(dest);
			Message textMsg=session.createTextMessage();
			textMsg.setStringProperty("deptId", deptId);
			textMsg.setStringProperty("deptName", deptName);
			msgProducer.send(textMsg);
			session.close();
			resp.sendRedirect("http://localhost/JmsTestWeb2/");
		}
		catch (Exception e)
        {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
	}
    /**
     * 
     */
	public void init(ServletConfig config) throws ServletException 
	{
		super.init(config);
		try 
		{
			this.confactory=this.getConnectionFactoryFromLdap();
			this.dest=this.getDestinationFromLdap();
			this.jmsCon=this.confactory.createConnection();
			/** 开启一个会话来创建一个消息消费者,异步监听到来的消息。*/
			Session session=jmsCon.createSession(false, Session.CLIENT_ACKNOWLEDGE);
			MessageConsumer msgConsumer=session.createConsumer(this.dest);
			MessageListenerForOrgMsg msgListener=new MessageListenerForOrgMsg();
			msgConsumer.setMessageListener(msgListener);
			/** 开启另一个会话来创建另外一个消息消费者,异步监听到来的消息。*/			 
			Session session2=jmsCon.createSession(false, Session.CLIENT_ACKNOWLEDGE);
			MessageConsumer msgConsumer2=session2.createConsumer(this.dest);
			MessageListenerForOrgMsg2 msgListener2=new MessageListenerForOrgMsg2();
			msgConsumer2.setMessageListener(msgListener2);
			this.jmsCon.start();	         
		} 
		catch (Exception e) 
		{
			e.printStackTrace();
		}
	}
	/**
	 * 
	 * @return
	 * @throws NamingException
	 */
	private ConnectionFactory getConnectionFactoryFromLdap() throws NamingException
	{
		   String account="uid=admin,ou=administrators,ou=topologymanagement,o=netscaperoot";//操作LDAP的帐户。默认就是Admin。
	       String password="111111" ;//帐户Admin的密码。
	       String root="ou=jmsstore,dc=xindongfang,dc=com"; //所操作的WLS域。也就是LDAP的根节点的DC
	       Hashtable env = new Hashtable();
	       env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");//必须这样写,无论用什么LDAP服务器。
	       env.put(Context.PROVIDER_URL, "ldap://192.168.0.15:2922/" + root);//LDAP服务器的地址:端口。对WLS端口就是7001
	       env.put(Context.SECURITY_AUTHENTICATION, "simple");//授权界别,可以有三种授权级别,但是如果设为另外两种都无法登录,我也不知道为啥,但是只能设成这个值"none"。
	       env.put(Context.SECURITY_PRINCIPAL, account );//载入登陆帐户和登录密码
	       env.put(Context.SECURITY_CREDENTIALS, password);
	       InitialContext ctx=null;
	       ConnectionFactory conFacotry=null;
	       try
	       {
	    	   ctx = new InitialContext(env);//初始化上下文
	    	   conFacotry=(ConnectionFactory)ctx.lookup("cn=topicconfac");
	    	   System.out.println("get Connection factory success");
	    	   return conFacotry;
	       }
	      
	       finally
	       {
	    	   if (ctx!=null) ctx.close();   
	       }
	  }
	 /**
	  * 
	  * @return
	  * @throws NamingException
	  */
	  private Destination getDestinationFromLdap() throws NamingException
	  {
		   String account="uid=admin,ou=administrators,ou=topologymanagement,o=netscaperoot";//操作LDAP的帐户。默认就是Admin。
	       String password="111111" ;//帐户Admin的密码。
	       String root="ou=jmsstore,dc=xindongfang,dc=com"; //所操作的WLS域。也就是LDAP的根节点的DC
	       Hashtable env = new Hashtable();
	       env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");//必须这样写,无论用什么LDAP服务器。
	       env.put(Context.PROVIDER_URL, "ldap://192.168.0.15:2922/" + root);//LDAP服务器的地址:端口。对WLS端口就是7001
	       env.put(Context.SECURITY_AUTHENTICATION, "simple");//授权类别,可以有三种授权级别,但是如果设为另外两种都无法登录,我也不知道为啥,但是只能设成这个值"none"。
	       env.put(Context.SECURITY_PRINCIPAL, account );//载入登陆帐户和登录密码
	       env.put(Context.SECURITY_CREDENTIALS, password);
	       InitialContext ctx=null;
	       Destination dst=null;
	       try
	       {
	    	   ctx = new InitialContext(env);//初始化上下文
	    	   dst=(Destination)ctx.lookup("cn=orgmsg");
	    	   System.out.println("get destination success");
	    	   return dst;   
	       }
	       finally
	       {
	    	   if (ctx!=null) ctx.close();
	       }
	   }

	public void destroy() 
	{
		if (this.jmsCon!=null)
		{
			try 
			{
			   this.jmsCon.close();
			} 
			catch (JMSException e) 
			{
   			   e.printStackTrace();
			}
		}
		super.destroy();
	}
}

分享到:
评论

相关推荐

    AIX平台下Message Broker安装指南

    **AIX平台下Message Broker安装指南** 在IBM的Service-Oriented Architecture (SOA)解决方案中,WebSphere Message Broker(WMB)起着至关重要的作用,它作为一个中间件,负责消息传输、转换和路由。本指南将详细...

    非对称环境下Message Broker集群负载均衡

    在非对称环境下,Message Broker集群的负载均衡是一种策略,旨在优化资源分配,确保在硬件配置不均等的情况下,消息处理的负载能够均匀分布到各个节点上,提高系统的稳定性和性能。WebSphere Message Broker (WMB) ...

    Message Broker连接数据库

    Message Broker 连接数据库 Message Broker 是一种消息中间件,能够连接各种数据源,包括关系型数据库、消息队列、文件系统等。在这里,我们将讨论如何使用 Message Broker 连接数据库,包括配置 ODBC 连接、设置 ...

    WebSphere Message Broker QPP0.doc

    1.2. IBM WEBSPHERE MESSAGE BROKER 技术方案 1 1.2.1. WebSphere Message Broker的特性亮点 2 1.2.2. WebSphere Message Broker的价值 3 1.3. 选择IBM的理由 3 1.3.1. WebSphere Message Broker解决方案的优势 3 ...

    Message Broker 中JavaCompute Node 的一个示例

    在IT行业中,消息中间件(Message Broker)是用于在分布式系统中传递消息的关键组件,它允许应用程序异步通信,解耦各个服务,提高系统的可扩展性和可靠性。本示例主要探讨的是在Message Broker环境中,如何使用...

    WebSphere Message Broker 开发和部署最佳实践

    WebSphere Message Broker 开发和部署最佳实践 WebSphere Message Broker 是一个企业服务总线(ESB),提供了用于各种协议的通用连接以及为使用结构化和非结构化数据的应用程序提供数据转换功能。为了提高消息处理...

    WebSphere Message Broker Basics_v6.pdf

    《WebSphere Message Broker基础知识》是IBM为WebSphere Message Broker V6提供的一份详尽的指导文档,由Saida Davies、Laura Cowen、Cerys Giddings和Hannah Parker共同编写,于2005年12月发布。这份文档旨在介绍...

    IBM WebSphere message broker 命令详解

    ### IBM WebSphere Message Broker 命令详解 IBM WebSphere Message Broker(以下简称“Message Broker”)是IBM提供的一款用于企业级消息处理的软件,它能够高效地管理和传递大量的消息数据,支持复杂的消息处理...

    Websphere Message Broker配置总结

    ### Websphere Message Broker配置总结 #### 一、概述 Websphere Message Broker(以下简称WMB)是一款由IBM开发的企业级消息中间件产品,用于构建高效、可靠的应用集成解决方案。本文将详细介绍WMB 6.1版本中的...

    精通WebSphere Message Broker

    精通WebSphere Message Broker,清晰版本

    Oracle Message Broker Administration Guide Release 2.0.1 oracle8

    Oracle Message Broker Administration Guide Release 2.0.1 oracle8 本文档是 Oracle Message Broker Administration Guide Release 2.0.1 的详细指南,适用于 SPARC Solaris 和 Windows NT 平台。该指南主要面向...

    WebSphere Message Broker HttpInput 节点

    【WebSphere Message Broker HttpInput 节点】是IBM企业服务总线(Enterprise Service Bus,ESB)解决方案的一部分,主要用于处理HTTP和HTTPS协议的输入消息。在WebSphere Message Broker(WMB,以前称为WebSphere ...

    IBM WebSphere Message Broker Toolkit 7.0 教程(一)

    【IBM WebSphere Message Broker Toolkit 7.0 教程(一)】 IBM WebSphere Message Broker (WMB) 是一个强大的企业服务总线(ESB)解决方案,它为企业提供了一个集成平台,用于处理、路由和转换不同系统之间的消息...

    Websphere Message Broker实践,WebSphere MQ Java编程

    Websphere Message Broker实践,WebSphere MQ Java编程,Message Broker 计时器节点编程模式,MessageBroker TCPIP通信协议,wmb关于ws服务的引用,WMB连接oracle数据库实践,全部组件

    Oracle Message Broker Installation Guide Release 2.0.1.0 for Win

    Oracle Message Broker 安装指南 Release 2.0.1.0 for Windows NT Oracle Message Broker 是一种消息中间件产品,由 Oracle 公司开发和维护。它提供了一个统一的消息队列管理系统,允许不同的应用程序之间进行消息...

    精通WebSphere Message Broker》-陈宇翔-源代码

    《精通WebSphere Message Broker》-陈宇翔-源代码及相关工具-4482这个压缩包文件主要聚焦于IBM的WebSphere Message Broker(WMB)技术,这是一款强大的企业级消息中间件产品。通过深入理解和掌握WMB,开发者能够构建...

    Websphere Message Broker实例教程

    精通Websphere Message Broker,最主要的功能是消息选路、消息传送、消息扩展和发布/订阅。显然,这些功能使WebSphere Message Broker成为实现ESB业务集成的首选

Global site tag (gtag.js) - Google Analytics