- 浏览: 1065162 次
- 性别:
- 来自: 长沙
文章分类
- 全部博客 (639)
- 服务器配置篇 (58)
- hibernate篇 (14)
- spring篇 (33)
- struts篇 (28)
- JS篇 (46)
- 其他技术篇 (46)
- 数据库集群配置 (6)
- JAVA基础相关 (48)
- 分布式框架HadHoop的应用 (2)
- FLEX篇 (8)
- SQLSERVER技术 (32)
- Android学习 (13)
- amchart学习笔记 (1)
- openfire+smark搭建即时通讯 (9)
- Linux学习 (18)
- Oracle数据库 (15)
- 网站优化技术 (12)
- mysql数据库 (2)
- 项目学习总结 (18)
- 工具类(JAVA) (12)
- 工具类(JS) (2)
- 设计模式 (10)
- Lucene学习 (24)
- EJB3学习 (6)
- Sphinx搜索引擎 (3)
- 工作中用到的软件小工具 (5)
- .NET (49)
- JAVA 连接SQLSERVER2008步骤 (1)
- MongoDB (19)
- Android手机开发 (3)
- Maven (6)
- vue (9)
- Shiro (4)
- mybatis (3)
- netty框架 (1)
- SpringCloud (3)
- spring-cloud (7)
- Git (1)
- dubbo (2)
- springboot (13)
- rocketmq (1)
- git学习 (2)
- kafka服务器 (2)
- linux (10)
- WEB系统辅助项目 (1)
- jenkins (2)
- docker (4)
- influxdb (3)
- python (2)
- nginx (1)
最新评论
-
jiangfuofu555:
这样数据量大,效率怎么样?
sqlserver 实现分页的前台代码 以及后台的sqlserver语句 -
w156445045:
博主请问下,如何做到实时的刷新呢,
另外我后台是Java 谢谢 ...
web 版本的汽车仪表盘,非常好看。还有各种图形 -
jackyin5918:
<transportConnector name=&qu ...
ActiveMQ的activemq.xml详细配置讲解 -
握着橄榄枝的人:
你这个不是spring1.x的吧
spring1.x使用AOP实例 -
xiaophai:
全乱套了!
openfire+spark搭建完美的及时通讯
package com.active;
import java.io.IOException;
import java.util.Arrays;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ConsumerTool implements MessageListener, ExceptionListener {
private boolean running;
private Session session;
private Destination destination;
private MessageProducer replyProducer;
private boolean pauseBeforeShutdown;
private boolean verbose = true;
private int maxiumMessages =10;//该参数表示当接受到几条消息以后关闭客户端
private String subject = "topic1";
private boolean topic =true;
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private boolean transacted;
private boolean durable =true;//表示是否持久化消息.这个参数要和setClientID方法同时使用
private String clientId="3161";
private int ackMode = Session.AUTO_ACKNOWLEDGE;
private String consumerName = "James";
private long sleepTime;
private long receiveTimeOut;
/*
ProducerTool [url] broker的地址,默认的是tcp://localhost:61616
[true|flase] 是否使用topic,默认是false
[subject] subject的名字,默认是TOOL.DEFAULT
[durabl] 是否持久化消息,默认是false
[messagecount] 发送消息数量,默认是10
[messagesize] 消息长度,默认是255
[clientID] durable为true的时候,需要配置clientID
[timeToLive] 消息存活时间
[sleepTime] 发送消息中间的休眠时间
[transacte] 是否采用事务
ConsumerTool [url] broker的地址,默认的是tcp://localhost:61616
[true|flase] 是否使用topic,默认是false
[subject] subject的名字,默认是TOOL.DEFAULT
[durabl] 是否持久化消息,默认是false
[maxiumMessages] 接受最大消息数量,0表示不限制
[clientID] durable为true的时候,需要配置clientID
[transacte] 是否采用事务
[sleepTime] 接受消息中间的休眠时间,默认是0,onMeesage方法不休眠
[receiveTimeOut] 接受超时
*/
public static void main(String[] args) {
ConsumerTool consumerTool = new ConsumerTool();
String[] unknown = CommandLineSupport.setOptions(consumerTool, args);
if (unknown.length > 0) {
System.out.println("Unknown options: " + Arrays.toString(unknown));
System.exit(-1);
}
consumerTool.run();
}
public void run() {
try {
running = true;
System.out.println("Connecting to URL: " + url);
System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
System.out.println("Using a " + (durable ? "durable" : "non-durable") + " subscription");
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
Connection connection = connectionFactory.createConnection();
//首先建立一个连接
if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {
connection.setClientID(clientId);
//使用这个方法的时候就是客户端告诉服务端保留我客户端一个ID的引用.该客户端每次都会以该ID去连接服务器
//如果服务器发送某个消息我客户端不在线,那么服务器将客户端未接受的消息保存,然后当有ID号的客户端上线//后,服务器将把客户断没有接受的消息发送给注册了ID号的客户端
}
connection.setExceptionListener(this);
connection.start();
session = connection.createSession(transacted, ackMode);
if (topic) {
destination = session.createTopic(subject);
} else {
destination = session.createQueue(subject);
}
replyProducer = session.createProducer(null);
replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageConsumer consumer = null;
if (durable && topic) {
consumer = session.createDurableSubscriber((Topic)destination, consumerName);
//只有TOPIC的方式才有持久化的做法.
} else {
consumer = session.createConsumer(destination);
}
if (maxiumMessages > 0) {
consumeMessagesAndClose(connection, session, consumer);
} else {
if (receiveTimeOut == 0) {
consumer.setMessageListener(this);
} else {
consumeMessagesAndClose(connection, session, consumer, receiveTimeOut);
}
}
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
public void onMessage(Message message) {
//这是监听的方法.客户端通过注册一个监听器.只要服务器发送了消息.客户端就会调用该方法来得到消息.
try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage)message;
if (verbose) {
String msg = txtMsg.getText();
if (msg.length() > 50) {
msg = msg.substring(0, 50) + "...";
}
System.out.println("Received: " + msg);
}
} else {
if (verbose) {
System.out.println("Received: " + message);
}
}
if (message.getJMSReplyTo() != null) {
replyProducer.send(message.getJMSReplyTo(), session.createTextMessage("Reply: " + message.getJMSMessageID()));
}
if (transacted) {
session.commit();
} else if (ackMode == Session.CLIENT_ACKNOWLEDGE) {
message.acknowledge();
}
} catch (JMSException e) {
System.out.println("Caught: " + e);
e.printStackTrace();
} finally {
if (sleepTime > 0) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
}
}
}
}
public synchronized void onException(JMSException ex) {
System.out.println("JMS Exception occured. Shutting down client.");
running = false;
}
synchronized boolean isRunning() {
return running;
}
protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException {
System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");
for (int i = 0; i < maxiumMessages && isRunning();) {
Message message = consumer.receive(1000);
if (message != null) {
i++;
onMessage(message);
}
}
System.out.println("Closing connection");
consumer.close();
session.close();
connection.close();
if (pauseBeforeShutdown) {
System.out.println("Press return to shut down");
System.in.read();
}
}
protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer, long timeout) throws JMSException, IOException {
System.out.println("We will consume messages while they continue to be delivered within: " + timeout + " ms, and then we will shutdown");
Message message;
while ((message = consumer.receive(timeout)) != null) {
onMessage(message);
}
System.out.println("Closing connection");
consumer.close();
session.close();
connection.close();
if (pauseBeforeShutdown) {
System.out.println("Press return to shut down");
System.in.read();
}
}
public void setAckMode(String ackMode) {
if ("CLIENT_ACKNOWLEDGE".equals(ackMode)) {
this.ackMode = Session.CLIENT_ACKNOWLEDGE;
}
if ("AUTO_ACKNOWLEDGE".equals(ackMode)) {
this.ackMode = Session.AUTO_ACKNOWLEDGE;
}
if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) {
this.ackMode = Session.DUPS_OK_ACKNOWLEDGE;
}
if ("SESSION_TRANSACTED".equals(ackMode)) {
this.ackMode = Session.SESSION_TRANSACTED;
}
}
public void setClientId(String clientID) {
this.clientId = clientID;
}
public void setConsumerName(String consumerName) {
this.consumerName = consumerName;
}
public void setDurable(boolean durable) {
this.durable = durable;
}
public void setMaxiumMessages(int maxiumMessages) {
this.maxiumMessages = maxiumMessages;
}
public void setPauseBeforeShutdown(boolean pauseBeforeShutdown) {
this.pauseBeforeShutdown = pauseBeforeShutdown;
}
public void setPassword(String pwd) {
this.password = pwd;
}
public void setReceiveTimeOut(long receiveTimeOut) {
this.receiveTimeOut = receiveTimeOut;
}
public void setSleepTime(long sleepTime) {
this.sleepTime = sleepTime;
}
public void setSubject(String subject) {
this.subject = subject;
}
public void setTopic(boolean topic) {
this.topic = topic;
}
public void setQueue(boolean queue) {
this.topic = !queue;
}
public void setTransacted(boolean transacted) {
this.transacted = transacted;
}
public void setUrl(String url) {
this.url = url;
}
public void setUser(String user) {
this.user = user;
}
public void setVerbose(boolean verbose) {
this.verbose = verbose;
}
}
发表评论
-
Windows下安装并设置Redis
2017-11-28 14:23 631版权声明:欢迎加入【开源技术交流群】: https:// ... -
由于安全狗导致网站访问出现302重定向问题
2015-08-06 18:14 1707网站开始的时候,碰到这种问题,开始没留意以为是浏览器导致的。后 ... -
windows下使用memcache并修改memcache最大使用内存
2015-02-27 22:42 779将memcache服务器安装包解压到C:\memcached ... -
nginx 日志文件切割
2014-12-13 08:23 835转载:http://www.cnblogs.com/beni ... -
nginx+tomcat7+memcached做集群以及session复制
2014-12-07 22:50 745首先:配置tomcat集群 nginx+ tomcat7 ... -
Memcached入门
2014-11-27 23:17 791Memcached是一个高性能的分布式内存对象缓存系统,用于 ... -
nginx + tomcat 配置,静态资源直接使用nginx
2014-11-26 22:34 978upstream tomcat_server { ... -
Nginx Location配置总结
2014-11-26 22:33 814语法规则: location [=|~|~*|^~] /ur ... -
Memche全站缓存
2014-11-13 17:23 916http://my249645546.iteye.com/bl ... -
煩惱的“java.lang.OutOfMemoryError:PermGen space"異常
2014-11-10 20:48 674最近在做一個jee的項目 ... -
nginx+tomcat实现负载均衡
2014-11-05 22:00 934作者:niumd Blog:http://ari.itey ... -
Tomcat 7优化前及优化后的性能对比
2014-11-05 09:52 1092Tomcat 7在我们日常开发、测试、生产环境都会使用到,但 ... -
resin 加花生壳搭建外网访问服务器
2012-10-08 20:44 1079花生壳发布WEB服务 -
resin 配置session过期
2012-03-23 10:29 1506Session的配置 <session ... -
nginx 解决session共享问题(jvm-route)方式
2012-02-28 13:45 2726前几天看到 http://code.google.com ... -
nginx.conf配置详细说明
2012-02-28 09:15 1316vim conf/nginx.conf#用户 用户组user ... -
nginx 解决session共享问题
2012-02-28 09:19 14481) 不使用session,换作cookie能把sessio ... -
java.lang.NoClassDefFoundError: com/caucho/make/PersistentDependency
2012-02-28 08:09 1773之前一直用resin-pro-3.0.22,这两天由于工作需要 ... -
玩玩负载均衡---在window与linux下配置nginx
2012-02-27 14:29 1421最近有些时间,开始接触负载均衡方面的东西,从硬件F5再到Cit ... -
squid 配置缓存服务器
2011-12-31 13:40 1612Squid for Windows 简介 是大家也许不知道, ...
相关推荐
这个“apache-activemq-5.9.0-bin”压缩包包含了Apache ActiveMQ 5.9.0版本的完整二进制文件,用于在本地或网络环境中安装和运行。 Apache ActiveMQ的核心功能包括: 1. **消息队列**:ActiveMQ支持多种消息模式,...
这个"apache-activemq-5.16.6-bin.zip"文件包含了ActiveMQ的最新稳定版本5.16.6的二进制发行版,主要用于在各种环境中部署和运行。 **Apache ActiveMQ核心概念** 1. **消息队列(Message Queue)**: 消息队列是...
在"apache-activemq-5.15.6"这个版本中,我们可以探讨以下几个关键知识点: 1. **JMS规范**:JMS是Java平台上的标准接口,用于与消息队列交互。它定义了生产者如何发送消息以及消费者如何接收消息的规则。ActiveMQ...
ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。 特点: 1、支持多种语言编写客户端 2、对spring的支持,很容易和spring整合 3、支持多种传输协议:TCP,SSL,NIO,UDP等 4、支持...
5. **主题(Topic)**:支持多播,允许多个消费者同时接收相同的消息。发布/订阅模型,发布者发布消息到主题,多个订阅者可以订阅该主题并接收消息。 6. **消息代理(Message Broker)**:也称为消息中间件,是负责...
2. **核心组件**:ActiveMQ的核心组件包括Broker(消息代理)、Producer(生产者)、Consumer(消费者)、Topic(主题)和Queue(队列)。Broker负责路由和存储消息,生产者发送消息,消费者接收消息。Topic适用于...
- **生产者与消费者**:使用Java或其他语言编写应用程序,通过JMS API创建消息生产和消费逻辑。 - **监控**:通过Web控制台,可以查看消息队列状态,管理消费者,以及监控系统性能。 - **消息策略**:设置消息...
在“apache-activemq-5.12.0-bin”这个压缩包中,包含了运行Apache ActiveMQ所需的所有文件,适用于Windows操作系统。 Apache ActiveMQ作为消息队列的实现,主要功能包括: 1. **消息传输**:ActiveMQ允许应用程序...
4. **客户端连接**:开发者可以通过JMS API或其他支持的协议创建生产者和消费者,与ActiveMQ进行交互。 5. **监控管理**:ActiveMQ内置了一个Web控制台,可以在浏览器中访问`http://localhost:8161/admin`进行管理和...
**接收端** 则需要创建一个消费者来监听队列或主题。对于队列,消费者使用`MessageConsumer`对象接收消息;对于主题,消费者需要先创建一个`MessageListener`,当有新消息到达时,`onMessage()`方法会被调用。 实例...
activemq-web-console的默认使用方式是通过在activemq.xml中导入jetty.xml配置一个jetty server来实现的。其实activemq-web-console完全可以和activemq-broker分开来部署。 activemq-web-console包含3个apps, 1.一...
一个jms activemq Topic 消息实例 关于jms JMS 是接口,相当于jdbc ,要真正使用它需要某些厂商进行实现 ,即jms provider 常见的jms provider 有 ActiveMQ JBoss 社区所研发的 HornetQ (在jboss6 中默认即可以...
这个压缩包“apache-activemq-5.3.1-bin.tar.gz”是针对Linux/Unix系统的二进制发行版,通常包含运行和管理ActiveMQ所需的所有文件。 **1. ActiveMQ简介** Apache ActiveMQ是一个基于标准的消息中间件,它实现了JMS...
这个压缩包"apache-activemq-5.15.1-bin.tar.gz"是针对Linux操作系统的安装包,包含了运行和管理ActiveMQ所需的所有文件。 首先,让我们详细了解ActiveMQ的基本概念和功能: 1. **消息中间件**:ActiveMQ作为一个...
5. **创建和配置目的地** - 在Web控制台中,可以创建Queue或Topic,并配置相关的消费者和生产者。 6. **停止服务** - 当完成测试或配置后,使用`bin/activemq stop`命令关闭服务。 **在应用开发中使用ActiveMQ:**...
在压缩包`apache-activemq-5.2.0`中,你将找到以下内容: - bin目录:包含了启动和停止ActiveMQ服务器的脚本。 - conf目录:存放配置文件,如activemq.xml,用于配置broker和各种设置。 - lib目录:包含运行ActiveMQ...
9. **集成JMS**:在Java应用中,可以通过导入`javax.jms-1.1.jar`,然后配置相应的JMS提供者(如ActiveMQ、RabbitMQ等),就可以利用JMS API进行消息传递。在Spring框架中,还可以利用Spring JMS模块简化JMS的使用。...
- **Java API**:使用JMS API直接与ActiveMQ交互,创建生产者和消费者实例。 - **其他语言支持**:ActiveMQ支持多种编程语言,如Python、Ruby、C#等,通过相应的客户端库进行集成。 - **Spring框架集成**:Spring...
此外,理解JMS规范中的概念,如队列(Queue)、主题(Topic)、生产者(Producer)、消费者(Consumer)以及消息模型(点对点、发布/订阅),对于有效利用ActiveMQ至关重要。 总之,Apache ActiveMQ是企业级消息...
而Topic的实现则采用了发布/订阅模型,多个消费者可以同时订阅同一个Topic,消息会被广播给所有订阅者。 通过深入研究ActiveMQ的javax.jms源码,我们可以更深入地理解JMS的工作原理,了解如何利用ActiveMQ实现高效...