`
y806839048
  • 浏览: 1130548 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

activeMq集成

 
阅读更多

activeMq集成:

mq类似redis安装好了软件之后,就在java端集成,用工厂工具类调用,mq用于消息推送(结合页面的轮询做聊天工具),结合redis缓解高并发压力---来不及处理的放在队列中,

多系统的事务回滚通知

 

pom.xml

<!-- activemq相关依赖 -->

<dependency>

<groupId>javax.jms</groupId>

<artifactId>jms</artifactId>

<version>1.1</version>

</dependency>

<!-- <dependency> <groupId>com.sun.jmx</groupId> <artifactId>jmxri</artifactId> 

<version>1.2.1</version> </dependency> <dependency> <groupId>com.sun.jdmk</groupId> 

<artifactId>jmxtools</artifactId> <version>1.2.1</version> </dependency> -->

 

<dependency>

<groupId>org.apache.activemq</groupId>

<artifactId>activemq-core</artifactId>

<version>5.1.0</version>

</dependency>

<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-jms</artifactId>

<version>3.2.16.RELEASE</version>

</dependency>

<dependency>

<groupId>net.sf.json-lib</groupId>

<artifactId>json-lib</artifactId>

<classifier>jdk15</classifier>

<version>2.4</version>

</dependency>

<!-- activemq相关依赖 Over -->

 

application.xml

 <!-- 引入activemq -->

    <import resource="activemq.xml" />

 

activemq.xml;

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"

xmlns:context="http://www.springframework.org/schema/context"

xmlns:jms="http://www.springframework.org/schema/jms" xmlns:amq="http://activemq.apache.org/schema/core"

xmlns:aop="http://www.springframework.org/schema/aop"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd">

 

<bean id="topicSendConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">

<!-- <property name="brokerURL" value="udp://localhost:8123" /> -->

<!-- UDP传输方式 -->

<property name="brokerURL" value="tcp://10.0.1.126:61616" />

<!-- TCP传输方式 -->

<property name="useAsyncSend" value="true" />

</bean>

 

<bean id="topicListenConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">

<!-- <property name="brokerURL" value="udp://localhost:8123" /> -->

<!-- UDP传输方式需要在activemq上面做配置 -->

<property name="brokerURL" value="tcp://10.0.1.126:61616" />

<!-- TCP传输方式 -->

</bean>

<!-- 定义主题 -->

<bean id="myTopic" class="org.apache.activemq.command.ActiveMQTopic">

<constructor-arg value="esteelMessage-mq" />

</bean>

 

<bean id="messageConvertForSys" class="com.esteel.message.mq.MessageConvertForSys" />

 

<!-- TOPIC send jms模板 -->

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">

<property name="connectionFactory" ref="topicSendConnectionFactory" />

<property name="defaultDestination" ref="myTopic" />

<property name="messageConverter" ref="messageConvertForSys" />

<!-- 发送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久 -->

<property name="deliveryMode" value="1" />

<property name="pubSubDomain" value="true" />

<!-- 开启订阅模式 -->

</bean>

<!-- 消息发送方 -->

<bean id="topicSender" class="com.esteel.message.mq.MessageSender">

<property name="jmsTemplate" ref="jmsTemplate" />

</bean>

 

<!-- <bean id="springContextUtil" class="com.esteel.common.SpringContextUtil" /> -->

    

 

<!-- 消息接收方 -->

<bean id="topicReceiver" class="com.esteel.message.mq.MessageReceiver" />

<!-- 主题消息监听容器,一经注册,自动监听 -->

<bean id="listenerContainer"

class="org.springframework.jms.listener.DefaultMessageListenerContainer">

<property name="connectionFactory" ref="topicListenConnectionFactory" />

<property name="pubSubDomain" value="true" />

<!-- true 订阅模式 -->

<property name="destination" ref="myTopic" />

<!-- 目的地 myTopic -->

<property name="subscriptionDurable" value="true" />

<!-- -这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,直到被这个ID的客户端消费掉 -->

<property name="clientId" value="clientId_esteelMessage_1" />

<property name="messageListener" ref="topicReceiver" />

</bean>

<!-- Servlet -->

<!-- <bean id="ControlServlet1" class="com.esteel.servlet.ControlServlet1"> 

<property name="topicSender" ref="topicSender" /> </bean> -->

</beans>  

 

 

自定义发送,接收,转化类:

 

转化:

package com.esteel.message.mq;

 

import org.springframework.jms.core.JmsTemplate;

import org.springframework.stereotype.Component;

 

public class MessageSender {

 

private JmsTemplate jmsTemplate;

 

public void sendMessage(String msg) {

jmsTemplate.convertAndSend(msg);

}

 

public JmsTemplate getJmsTemplate() {

return jmsTemplate;

}

 

public void setJmsTemplate(JmsTemplate jmsTemplate) {

this.jmsTemplate = jmsTemplate;

}

 

}

 

 

 

接收:

 

package com.esteel.message.mq;

 

import java.text.SimpleDateFormat;

import java.util.Calendar;

 

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageListener;

import javax.jms.ObjectMessage;

 

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Controller;

 

import com.esteel.exception.EsteelException;

import com.esteel.message.bean.TbConObj;

import com.esteel.message.bean.TbConOrd;

import com.esteel.message.bean.TbConOrdPrice;

import com.esteel.message.beanVo.TbConOrdVo;

import com.esteel.message.redis.service.RedisService;

import com.esteel.message.service.TbConObjService;

import com.esteel.message.service.TbConOrdPriceService;

import com.esteel.message.service.TbConOrdService;

 

import net.sf.json.JSONObject;

 

@Controller

public class MessageReceiver implements MessageListener {

 

@Autowired

TbConOrdService tbConOrdService;

@Autowired

TbConOrdPriceService tbConOrdPriceService;

@Autowired

TbConObjService tbConObjService;

@Autowired

RedisService redisService;

 

public void onMessage(Message m) {

ObjectMessage om = (ObjectMessage) m;

try {

String key_esteelMessage = om.getStringProperty("key_esteelMessage");

JSONObject object1 = JSONObject.fromObject(key_esteelMessage);

String objectName = (String)object1.get("objectName");

if(objectName.equals("TbConOrdVo")){

JSONObject object2 = (JSONObject) object1.get("object");

TbConOrdVo tbConOrdVo=(TbConOrdVo)JSONObject.toBean(object2, TbConOrdVo.class);

TbConOrd tbConOrd = new TbConOrd();

/*获取ipAddress*/

String ipAddress = (String)object1.get("ipAddress");

/* 从tbConOrdVo中提取tbConOrd */

tbConOrd = copyTbConOrd(tbConOrdVo, tbConOrd, ipAddress);

/* 写入tbConOrd */

tbConOrd = tbConOrdService.insertTbConOrd(tbConOrd);

TbConOrdPrice tbConOrdPrice = new TbConOrdPrice();

tbConOrdPrice = copyTbConOrdPrice(tbConOrd, tbConOrdVo, tbConOrdPrice);

/* 写入聊天文字到 tbConOrdPrice*/

String msgText = tbConOrdPrice.getMsgText();

if (msgText.equals("请录入您的议价留言,最大为300个字符!按Ctrl+Enter提交!")) {

tbConOrdPrice.setMsgText("");

}

//tbConOrd = tbConOrdService.getTbConOrdByObj(tbConOrd);

//tbConOrdPrice.setOrdKey(tbConOrd.getOrdKey());

tbConOrdPrice=tbConOrdPriceService.insertTbConOrdPrice(tbConOrdPrice);

/* 还得写入tbConObj的数据 */

TbConObj tbConObj=tbConObjService.getTbConObjById(tbConOrd.getConobjKey());

if(tbConObj.getContradeType().equals("A")){

/*销售单*/

/*设置最低价位*/

if(tbConObj.getHighPrice()==null){

tbConObj.setHighPrice(tbConObj.getOrderPrice());

}

if(tbConOrdPrice.getOrdpriceMan()==null){

/*当前对象是客人*/

tbConObj.setLowPrice(tbConOrd.getOrderPrice());

}else if(tbConOrdPrice.getOrdpriceMan().equals("A")){

/*当前对象是客人*/

tbConObj.setLowPrice(tbConOrd.getOrderPrice());

}else{

/*当前对象是主人*/

tbConObj.setHighPrice(tbConOrd.getOrderPrice());

}

}else{

/*采购单*/

/*设置最低价位*/

if(tbConObj.getLowPrice()==null){

tbConObj.setLowPrice(tbConObj.getOrderPrice());

}

if(tbConOrdPrice.getOrdpriceMan()==null){

/*当前对象是客人*/

tbConObj.setHighPrice(tbConOrd.getOrderPrice());

}else if(tbConOrdPrice.getOrdpriceMan().equals("A")){

/*当前对象是客人*/

tbConObj.setHighPrice(tbConOrd.getOrderPrice());

}else{

/*当前对象是主人*/

tbConObj.setLowPrice(tbConOrd.getOrderPrice());

}

}

tbConObj = tbConObjService.updateTbConObj(tbConObj);

tbConObjService.listClearCache(tbConObj.getConobjKey());

tbConOrdService.listClearCache();

tbConOrdService.listClearCache(tbConObj.getConobjKey(),tbConOrdVo.getTradeCustomerKey());

}

System.out.println("==============MQ Write to Database success============");

} catch (JMSException e) {

e.printStackTrace();

} catch (EsteelException e) {

e.printStackTrace();

}

 

}

 

private TbConOrd getExistTbConOrd(TbConOrd tbConOrd) throws EsteelException{

TbConOrd tbConOrdNew = new TbConOrd();

tbConOrdNew.setConobjKey(tbConOrd.getConobjKey());

tbConOrdNew.setCustomerKey(tbConOrd.getCustomerKey());

TbConOrd tbConOrdNew2 = new TbConOrd();

tbConOrdNew2 = tbConOrdService.getTbConOrdByObj(tbConOrdNew);

if(null==tbConOrdNew2){

return tbConOrdNew2;

}

return tbConOrdNew2;

}

 

private TbConOrd copyTbConOrd(TbConOrdVo tbConOrdVo, TbConOrd tbConOrd, String ipAddress) {

tbConOrd.setConobjKey(tbConOrdVo.getConobjKey());

tbConOrd.setContradeType(tbConOrdVo.getContradeType());

tbConOrd.setCanTradeTypes(tbConOrdVo.getCanTradeTypes());

tbConOrd.setBailBankCode(tbConOrdVo.getBailBankCode());

tbConOrd.setOrderPrice(tbConOrdVo.getOrderPrice());

tbConOrd.setTradeComm(tbConOrdVo.getTradeComm());

tbConOrd.setDealBail(tbConOrdVo.getDealBail());

tbConOrd.setDisTime(Calendar.getInstance().getTime());

tbConOrd.setDisIp(tbConOrdVo.getDisIp());

tbConOrd.setStartTranDate(tbConOrdVo.getStartTranDate());

tbConOrd.setSubsidyInterest(tbConOrdVo.getSubsidyInterest());

tbConOrd.setBackCause(tbConOrdVo.getBackCause());

tbConOrd.setPickType(tbConOrdVo.getPickType());

tbConOrd.setUponRange(tbConOrdVo.getUponRange());

tbConOrd.setAddsubMark(tbConOrdVo.getAddsubMark());

/* 上面的是从Vo中继承过来的 */

/* 下面的是手动添加的 */

tbConOrd.setTradeDate(new SimpleDateFormat("yyyy-MM-dd").format(Calendar.getInstance().getTime()));

/*用tradeCustomerKey来替换customerKey*/

tbConOrd.setCustomerKey(tbConOrdVo.getTradeCustomerKey());

tbConOrd.setOrderNum(tbConOrdVo.getNewNum());

tbConOrd.setOrderPrice(tbConOrdVo.getNewPrice());

tbConOrd.setCntPrice(tbConOrdVo.getNewPrice());

tbConOrd.setOrderStatus("A");

tbConOrd.setTradeComm("0");

tbConOrd.setDealBail("0");

tbConOrd.setOrderTime(Calendar.getInstance().getTime());

tbConOrd.setOrderIp(ipAddress);

tbConOrd.setDisTime(tbConOrd.getOrderTime());

tbConOrd.setOrderIp(tbConOrd.getOrderIp());

tbConOrd.setSubsidyInterest("0");

tbConOrd.setMarkchkStatus("A");

tbConOrd.setShouldPayBail("0");

tbConOrd.setShouldPayMoney("0");

tbConOrd.setIsBailPayGoods("Y");

tbConOrd.setPickType("A");

return tbConOrd;

}

 

private TbConOrdPrice copyTbConOrdPrice(TbConOrd tbConOrd, TbConOrdVo tbConOrdVo, TbConOrdPrice tbConOrdPrice) throws EsteelException {

tbConOrdPrice.setOrdKey(tbConOrd.getOrdKey());

tbConOrdPrice.setCustomerKey(tbConOrd.getCustomerKey());

tbConOrdPrice.setKfCustomerKey(tbConOrd.getKfCustomerKey());

tbConOrdPrice.setNewNum(tbConOrd.getOrderNum());

tbConOrdPrice.setNewPrice(tbConOrd.getOrderPrice());

tbConOrdPrice.setIsNewprice("Y");

tbConOrdPrice.setTradeComm(tbConOrd.getTradeComm());

tbConOrdPrice.setDealBail(tbConOrd.getDealBail());

tbConOrdPrice.setMsgText(tbConOrdVo.getMsgText());

tbConOrdPrice.setIrdTime(tbConOrd.getOrderTime());

tbConOrdPrice.setOrdpriceType("A");

tbConOrdPrice.setIsLook("N");

tbConOrdPrice.setCdListKeys(tbConOrd.getCdListKeys());

tbConOrdPrice.setStartTranDate(tbConOrd.getStartTranDate());

tbConOrdPrice.setSubsidyInterest(tbConOrd.getSubsidyInterest());

tbConOrdPrice.setUponRange(tbConOrd.getUponRange());

tbConOrdPrice.setAddsubMark(tbConOrd.getAddsubMark());

tbConOrdPrice.setUponRange2(tbConOrd.getUponRange2());

tbConOrdPrice.setAddsubMark2(tbConOrd.getAddsubMark2());

tbConOrdPrice.setLookTimes("0");

tbConOrdPrice.setOrdpriceMan(tbConOrdVo.getOrdpriceMan());

/* 为了取当前的序号,需要获取当前的最新的值,然后+1*/

/* 最后用时间代替了这个number */

tbConOrdPrice.setOrdpriceNo(String.valueOf(System.currentTimeMillis()));

return tbConOrdPrice;

}

 

}

 

 

发送:

package com.esteel.message.mq;

 

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.ObjectMessage;

import javax.jms.Session;

 

import org.springframework.jms.support.converter.MessageConversionException;

import org.springframework.jms.support.converter.MessageConverter;

import org.springframework.stereotype.Component;

 

public class MessageConvertForSys implements MessageConverter {

 

public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {

System.out.println("sendMessage:" + object.toString());

ObjectMessage objectMessage = session.createObjectMessage();

objectMessage.setStringProperty("key_esteelMessage", object.toString());

return objectMessage;

}

 

public Object fromMessage(Message message) throws JMSException, MessageConversionException {

ObjectMessage objectMessage = (ObjectMessage) message;

return objectMessage.getObjectProperty("key_esteelMessage");

}

 

}

 

 

分享到:
评论

相关推荐

    Spring boot 和内置ActiveMQ集成例子.zip

    将Spring Boot与ActiveMQ集成可以提供强大的消息处理能力,使应用能够解耦组件,提高可扩展性和容错性。 本文将详细讲解如何使用Spring Boot与内置的ActiveMQ进行集成,以及如何通过代码实现这一过程。 首先,我们...

    activemq集成tomcat

    在集成Apache ActiveMQ到Tomcat应用服务器的过程中,我们需要配置多个组件来确保消息传递系统的正常运行。ActiveMQ是一个开源的消息代理,它遵循Java消息服务(JMS)标准,为分布式应用程序提供可靠的异步通信。 ...

    Springboot ActiveMQ 集成.rar

    当我们需要在Spring Boot应用中集成ActiveMQ时,我们可以实现高效、异步的通信机制,提高系统的可扩展性和解耦性。 首先,让我们详细探讨一下Spring Boot与ActiveMQ的集成过程: 1. **配置ActiveMQ** - 在`...

    spring+activeMQ集成

    spring集成activeMQ框架 配置方式(内含三种常见的消息接受监听方式的配置)JMS 配置测试等等

    Spring MVC + JPA + MQ + redis +activemq 集成项目实例

    在本项目实例中,我们探讨的是一个基于Spring MVC、JPA、消息队列MQ以及缓存技术redis和ActiveMQ的集成应用。这个实例涵盖了多种关键的技术栈,旨在提供一个全面的解决方案,帮助开发者构建高效、可扩展的后端系统。...

    activeMQ集成SpringMVC,三种方式监听

    在Spring MVC框架中集成ActiveMQ,可以提升系统的解耦性和可扩展性。以下是集成ActiveMQ到Spring MVC应用的三种常见方式,以及相关知识点的详细说明: 1. **基于XML配置的集成** 在Spring MVC项目中,我们可以通过...

    activemq springMVC集成jar包

    将ActiveMQ集成到Spring MVC应用中,可以提升系统的可扩展性和解耦性。 集成ActiveMQ到Spring MVC主要涉及以下几个关键知识点: 1. **JMS(Java Message Service)**:这是Java平台定义的一个标准接口,用于规范...

    Spring与ActiveMQ整合完整案例

    在IT行业中,Spring框架是Java领域最常用的轻量级应用框架之一,而ActiveMQ则是Apache组织提供的一个开源消息中间件,常用于实现应用程序之间的异步通信。本案例将详细讲解如何将Spring与ActiveMQ整合,以提升系统的...

    ActiveMQ与Tomcat整合教程

    【ActiveMQ与Tomcat整合教程】是关于如何在Apache Tomcat服务器中集成开源消息中间件ActiveMQ的详细步骤。这个教程适用于Tomcat 6.0.14版本,但请注意不同版本可能存在配置上的差异。 首先,为了使Tomcat能够识别和...

    activemq-demo

    【标题】"activemq-demo" 是一个基于Spring框架与Apache ActiveMQ集成的示例项目,旨在展示如何在实际应用中构建一个高效、可靠的消息传递系统。ActiveMQ是Apache软件基金会开发的一个开源消息代理,它遵循多种消息...

    ActiveMQ路由配置方式

    ActiveMQ集成了Camel,启动时同时会启动Camel。通过Camel Web Console可以进行Routing配置。 使用Camel Choice进行配置的方式是通过在Route中使用Choice元素,可以根据不同的条件进行路由选择。例如,以下是一个...

    ActiveMQ客户端

    6. **协议支持**:除了JMS,ActiveMQ还支持STOMP、AMQP、OpenWire等多种协议,使得非Java语言的应用也能方便地与ActiveMQ集成。 7. **网络拓扑**:ActiveMQ可以通过网络连接形成集群,实现高可用性和负载均衡。在...

    ActiveMQ与spring集成实例之使用消息转换器

    **ActiveMQ与Spring集成实例——使用消息转换器** 在企业级应用开发中,消息队列(Message Queue,MQ)作为一种解耦和异步处理的重要工具,被广泛应用。Apache ActiveMQ 是一个开源的消息中间件,它支持多种消息...

    ActiveMQ与spring集成实例

    接下来,我们将讨论Spring与ActiveMQ集成的关键步骤: 1. **引入依赖**:在Spring项目中,首先需要添加ActiveMQ的依赖库。这通常通过Maven或Gradle的配置文件来完成,例如在Maven的pom.xml中添加如下依赖: ```xml...

    spring集成activemq演示queue和topic 持久化

    在本示例中,我们将深入探讨如何将Spring框架与ActiveMQ集成,以便实现消息队列(Queue)和主题(Topic)的功能,并确保消息的持久化。ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它支持多种消息协议,如...

    activemq实战项目,同ssh框架整合(生产者+消费者)

    - **代码注释**:项目的代码注释对于新手学习非常重要,它们解释了关键步骤和配置,帮助理解如何将ActiveMQ集成进SSH项目。 - **应用场景**:此项目可以作为学习示例,模拟实际业务场景,如订单处理、异步任务调度...

    Spring-ActiveMQ.rar_Spring Activemq_activemq_activemq spring

    而Spring框架,作为一个Java平台的全功能模块化解决方案,提供了与ActiveMQ集成的能力,让开发者能够轻松地在Spring应用中使用消息队列。本篇将深入探讨Spring与ActiveMQ的集成及其配置过程。 首先,理解Spring与...

    分享一些ActiveMQ的资料

    这份文档详细解释了如何将ActiveMQ集成到Tomcat服务器中,这对于构建基于Web的应用程序尤其有用,因为它可以利用消息队列提高系统的可伸缩性和可靠性。 4. **ActiveMQ相关资料.doc**: 这个文档可能包含了更广泛...

    spring-boot-activemq-demo

    在本示例项目"spring-boot-activemq-demo"中,我们关注的是如何将Spring Boot与Apache ActiveMQ集成,以实现高效的消息传递功能。ActiveMQ是Apache软件基金会的一个开源项目,它是Java消息服务(JMS)的实现,提供了...

Global site tag (gtag.js) - Google Analytics