`
dwj147258
  • 浏览: 194726 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

项目中实现JMS消息的发送

阅读更多

   一、JMS简介

       JMS消息可以有效的调动程序中的各种动作,例如,当我们完成一个动作后,我们需要一些程序完成他们自己相应的动作,这时候我们只需要发送一个消息出来,当他们接收到这个消息时,就可以完成自己的事情,是一个很方便的技术,现在用到的JMS消息一般都是通过ActiveMQ来完成,ActIveMQ是一个成熟的框架,可以通过tcp发送JMS,还可以在程序内发送JMS,下面来通过一个实例来介绍。

二、环境搭建

      这个实例是在Spring的基础上完成的,所以需要导入spring的所有jar包,当然还需要导入activemq的jar包,下载activemq以后就可以找到一个activemq-all.jar的jar包,导入即可

三、配置文件简介

      首先application.xml配置文件内容如下

 

<?xml version="1.0" encoding="UTF-8"?>
  <beans xmlns="http://www.springframework.org/schema/beans"  
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
          xmlns:context="http://www.springframework.org/schema/context"
          xmlns:jms="http://www.springframework.org/schema/jms"
          xmlns:amq="http://activemq.apache.org/schema/core"
          xmlns:aop="http://www.springframework.org/schema/aop"
        xsi:schemaLocation=" http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd 
        	  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd
              http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd  
              http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
              http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd">  
        <!-- 激活组件扫描功能,在包cn.ysh.studio.spring.aop及其子包下面自动扫描通过注解配置的组件 -->
     <context:annotation-config />  
	<context:component-scan base-package="main.java.com"/>
	<!-- 激活自动代理功能 -->
	<aop:aspectj-autoproxy proxy-target-class="true"/>
	<import resource="jms_client.xml"/>
  </beans>

 这里的spring就是完成了组件扫描以及aop代理的一些配置,然后就是导入配置activemq的配置文件,如下

 

 

<?xml version="1.0" encoding="UTF-8"?>
  <beans xmlns="http://www.springframework.org/schema/beans"  
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
          xmlns:context="http://www.springframework.org/schema/context"
          xmlns:jms="http://www.springframework.org/schema/jms"
          xmlns:amq="http://activemq.apache.org/schema/core"
          xmlns:aop="http://www.springframework.org/schema/aop"
        xsi:schemaLocation=" http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd 
        	  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd
              http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd  
              http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
              http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd">  
        <!-- 激活组件扫描功能,在包cn.ysh.studio.spring.aop及其子包下面自动扫描通过注解配置的组件 -->
	<bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">   
     <property name="config" value="classpath:main/java/conf/ActiveMQConfig.xml" />   
     <property name="start"  value="true" />   
    </bean>    
     <bean id="myamqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">    
         <property name="brokerURL" value="tcp://10.10.11.37:61616"/>    
         <property name="trustedPackages">  
        <list>  
            <value>main.java</value>  
        </list>  
    </property>   
    </bean>  
     <bean id="myconnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">  
        <property name="targetConnectionFactory" ref="myamqConnectionFactory"></property>  
        <property name="sessionCacheSize" value="100" />  
    </bean>
    <bean id="myjmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="myconnectionFactory" />
        <property name="pubSubDomain" value="false" />
    </bean>

    <bean id="myjmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="myconnectionFactory" />
        <property name="pubSubDomain" value="true" />
    </bean>
  </beans>

 这里第一步因为不是完整的activemq,是spring嵌入式activemq,所以需要启动一个broker来启动activemq,这里需要用到的是一个另外的activemq配置,内容如下:

 

 

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    <bean
        class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
    <broker useJmx="false" persistent="false"
        xmlns="http://activemq.apache.org/schema/core">
        <transportConnectors>
            <transportConnector uri="tcp://10.10.11.37:61616"/>
        </transportConnectors>
    </broker>
</beans>

 启动完broker之后就算是完成了acrivemqmq的启动,接下来就开始定义连接工厂了,连接工厂需要制定brokerURL和信任包(不知道有什么卵用),接着就是吧activemq的连接工厂转换成spring的连接工厂,最后两个就不多说了,一个是队列消息templete一个是主题消息template,都是非常简单的,其中如果我们把tcp改为vm则是在程序内发送JMS消息

 

四、一个服务

     在这里我们需要写一个服务用来注册消息监听器,发送消息 , 先把代码贴出来吧

 

package main.java.com.consumer;

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

@Component("msgService")
public class MsgServiceImpl {
	//存放注册了的监听器
	private Map<String ,Object> consumers = new ConcurrentHashMap<String , Object>();
	
	@Autowired
	@Qualifier("myjmsQueueTemplate")
	private JmsTemplate queueTemplate ;
	
	@Autowired
	@Qualifier("myjmsTopicTemplate")
	private JmsTemplate topicTemplate ;
	
	@Autowired
	@Qualifier("myconnectionFactory")
	private ConnectionFactory connectionFactory ;
	
	public void sendMsg(String destation ,final Serializable message){
		if(consumers.containsKey(destation)){
			if(consumers.get(destation) instanceof Queue){
				queueTemplate.send(destation , new MessageCreator() {
					@Override
					public Message createMessage(Session session) throws JMSException {
						return session.createObjectMessage(message);
					}
				});
				return ;
			}
			if(consumers.get(destation) instanceof Topic){
				topicTemplate.send(destation , new MessageCreator() {
					@Override
					public Message createMessage(Session session) throws JMSException {
						return session.createObjectMessage(message);
					}
				});
			}
			return ;
		}
	}
	
	public void registerConsumer(Consumer consumer , JMSConstant type){
		try {
			Connection connection = connectionFactory.createConnection() ;
			connection.start();
			Session session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE) ;
			if(type != null && type.equals(JMSConstant.TYPE_TOPIC)){
				Topic topic = new ActiveMQTopic(consumer.getName()) ;
				MessageConsumer messageConsumer = session.createConsumer(topic) ;
				messageConsumer.setMessageListener(new JmsMessageListener(consumer));
				consumers.put(consumer.getName(), topic) ;
			}else{
				Queue queue = new ActiveMQQueue(consumer.getName()) ;
				MessageConsumer messageConsumer = session.createConsumer(queue) ;
				messageConsumer.setMessageListener(new JmsMessageListener(consumer));
				consumers.put(consumer.getName(), queue) ;
			}
			
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
}

 简单说一下吧,首先把这个类注册到spring中,名字为msgService 然后定义一个map来存放目的地,接着就是开始定义的template和从测测提哦你Factory了这些都不多说,都是开始定义了的东西

第一个方法:

sendMsg

可以看到他的参数有两个,一个为目的地,一个为消息内容  ,如果目的地未注册则返回不发送消息,发送消息我们选择发送Object消息,当然所有的Object实例都需要实现Serializable序列化,然后创建消息发送出去

 

第二个方法
registerConsumer

注册监听器,这里先介绍接口Consumer

package main.java.com.consumer;

import java.io.Serializable;

public interface Consumer {
	public String getName() ;
	
	public void onMessage(Serializable msg);
}

 所有的监听器都需要实现这个接口,

然后自己定义一个JmsMessageListener :

package main.java.com.consumer;

import java.io.Serializable;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

public class JmsMessageListener implements MessageListener{
	private Consumer consumer ;
	
	public JmsMessageListener(Consumer consumer){
		this.consumer = consumer ;
	}
	@Override
	public void onMessage(Message msg) {
		if(msg != null){
			try{
				consumer.onMessage(createObjectMsg(msg)) ;
			}catch(Exception e){
				e.printStackTrace() ;
			}
		}
	}
	
	private Serializable createObjectMsg(Message msg) throws Exception {
		Serializable message = ((ObjectMessage) msg).getObject() ;
		return message ;
	}
	
}

 这个很简单不多说,这是这个服务两个简单必须的方法、

这样我们只需定义一个监听器,

package main.java.com.consumer;

import java.io.Serializable;

public class JmsConsumer1 implements Consumer{

	@Override
	public String getName() {
		return MsgTypeInfo.TEST.name();
	}

	@Override
	public void onMessage(Serializable msg) {
		System.out.println("JmsConsumer1 收到 消息   "+ msg);
	}
	
}

 然后把它注册到服务,msgService.registerConsumer(new JmsConsumer1(), JMSConstant.TYPE_TOPIC) ;

我们发送一条消息:

msgService.sendMsg(MsgTypeInfo.TEST.name(), "132") ;

就可以看到控制台打印到了接收到的消息。

0
0
分享到:
评论

相关推荐

    编码实现MQ连接池实现JMS消息发送连接管理

    这些提供商提供了实现JMS规范的具体库,使得我们可以通过Java API与其交互。 2. **创建ConnectionFactory**:ConnectionFactory是创建JMS连接的工厂类。在连接池实现中,我们需要一个池化的ConnectionFactory,这...

    Spring发送接收JMS消息

    **Spring与JMS消息传递** 在Java世界中,Java Message Service (JMS) 是一个标准接口,用于在分布式环境中...在实际项目中,结合具体的业务需求,可以灵活选择合适的模式和策略来利用Spring和JMS实现高效的消息通信。

    JMS消息发送及订阅

    在这个主题中,我们将深入探讨JMS消息的发送和订阅,以及如何通过Apache Camel这一集成框架来实现。 **JMS核心概念** 1. **消息**: JMS中的基本单元,它包含了要传递的数据。 2. **生产者**: 创建并发送消息的应用...

    weblogic中使用JMS发送和接受消息

    WebLogic Server是一款由Oracle公司提供的企业级应用服务器,它支持Java Message Service (JMS) 规范,允许在分布式环境中可靠地发送和接收消息。JMS是Java平台上的标准接口,用于实现应用程序间的异步通信。本文将...

    Spring+weblogic9.2发送JMS消息

    本话题主要探讨如何在Spring框架下与WebLogic 9.2集成,实现JMS(Java Message Service)消息的发送。 首先,JMS是一种标准的API,用于在分布式环境中传递消息。通过JMS,应用可以在异步和解耦的方式下进行通信,...

    OSB中JMS配置及队列使用说明

    OSB 中的 JMS 配置及队列使用是实现消息队列的重要手段,本文将详细介绍 OSB 中 JMS 配置及队列使用的步骤。 环境准备 在开始配置 JMS 之前,需要安装 Oracle Service Bus (OSB) 10.3.1 和 Weblogic 服务器。同时...

    J2EE中的JMS 消息服务

    **J2EE中的JMS消息服务** Java Message Service(JMS)是Java平台上的一个标准接口,用于在分布式环境中提供异步的消息传递...在实际项目中,配合使用源码和工具,如`MessageDriverBean`,可以更好地实现JMS的功能。

    java实现的基于jms协议的消息队列中间件,源码!

    1. **消息对象**:实现JMS接口,如`MessageProducer`和`MessageConsumer`,以及各种特定类型的消息如`TextMessage`和`ObjectMessage`。 2. **连接管理**:如何建立和管理与消息服务器的连接,包括连接工厂...

    activemq与spring整合发送jms消息入门实例

    在Java世界中,ActiveMQ和Spring的整合是企业级应用中常见的消息中间件解决方案,用于实现JMS(Java Message Service)消息传递。本教程将深入探讨如何将这两个强大的工具结合在一起,以创建一个简单的发送JMS消息的...

    JMS之Spring +activeMQ实现消息队列

    在本篇讨论中,我们将关注如何利用Spring框架和ActiveMQ来实现JMS消息队列。 首先,我们需要理解Spring框架中的JMS支持。Spring提供了一个强大的抽象层,简化了与各种MQ提供商(如ActiveMQ、RabbitMQ等)的集成。它...

    Spring整合Blazeds实现ActiveMQ JMS消息服务

    标题中的“Spring整合Blazeds实现ActiveMQ JMS消息服务”指的是在Java应用程序中使用Spring框架与Blazeds(一个Flex和Java之间的消息传递中间件)集成,通过ActiveMQ(一个流行的开源JMS提供商)来实现消息队列服务...

    activemq消息中间件的使用demo,以及spring集合jms实现消息发送和处理。

    在这个测试中,你可以看到如何使用Spring JMS API来发送和接收消息,验证ActiveMQ配置的正确性,以及消息传递的有效性。 总的来说,这个示例涵盖了以下关键知识点: 1. ActiveMQ的安装和运行。 2. Spring框架中JMS...

    SpringBoot+ActiveMq+MQTT实现消息的发送和接收

    在本文中,我们将深入探讨如何使用SpringBoot、...通过理解这些技术的工作原理和集成方式,你可以更好地实现后台消费者的监听、生产者的发送,以及消息接口和服务的实现,从而在你的项目中构建出强大的消息处理能力。

    spring+jms+jta事务的消息发送和消息消费

    而JMS(Java Message Service)是Java平台中用于创建、发送、接收和读取消息的标准API,它支持异步通信和解耦应用程序。JTA(Java Transaction API)则是处理分布式事务的标准,能够协调多个资源管理器(如数据库和...

    Spring In Action 使用Spring发送和接收JMS消息

    通过Spring框架,我们可以方便地在应用程序中集成JMS,实现消息的异步处理,提高系统的解耦性和容错性。这不仅有助于提升系统性能,还能使代码更加清晰、易于维护。在实际项目中,结合Spring Boot和ActiveMQ等开源...

    利用soapUI3.5测试JMS消息

    1. **创建JMS测试步骤**:在soapUI项目中,右键点击测试套件或测试步骤,选择“Add New Step” -&gt; “JMS Request”。在这里,你需要配置JMS连接工厂、目的地类型(队列或主题)、目的地名称等参数。 2. **配置JMS...

    springboot整合jms进行邮件的发送

    在本文中,我们将深入探讨如何使用Spring Boot与Java消息服务(JMS)来发送电子邮件,包括文本、HTML、图片和附件。Spring Boot以其强大的依赖管理和自动化配置能力,使得集成各种功能变得异常简单,包括集成JMS进行...

    【JAVA、JMS发送端所需要用到的jar包】

    在Java环境中实现JMS发送端时,通常需要依赖一些特定的jar包。下面我们将详细讨论这些jar包以及它们在JMS发送端中的作用。 1. **JMS API**:这是核心的JMS接口,定义了生产者(发送者)、消费者(接收者)以及消息...

    【JMS接收端需要用到的jar包】

    同时,正确配置JMS连接工厂、目的地(如队列或主题)、以及消费者和生产者的实现是成功接收和发送消息的关键。记得在部署环境中添加这些依赖,并在代码中正确引用,以实现高效、可靠的JMS通信。

Global site tag (gtag.js) - Google Analytics