`
bartholomew4
  • 浏览: 11707 次
社区版块
存档分类
最新评论

ActiveMQ(五)

阅读更多

今天本篇为ActiveMQQueue基础使用

 

 

    在我看来ActvieMQQueue是其常用的消息发送模式,其应用性比topic远要来的广(大牛勿喷,公司业务、公司行业决定了topic方式在我接触到的项目中使用并不广泛)。

 

 

1.Topicqueue的技术特点对比

 

Topic

Queue

中文全称

发布订阅消息

点对点

有无状态

topic是无状态的并且数据默认不落地。

queue数据默认会在服务器上以文件形式保存,比如Active MQ默认储存在$AMQ_HOME\data\kr-store\data下,亦可配置成DB存储。

完整性保障

不保证发布者发布的每条数据,订阅者都能接受到。

保证每条数据都能被接收者接收。

消息是否会丢失

一般来说发布者发布消息到某一个订阅消息时,只有正在监听该topic地址的订阅者能够接收到消息;如果没有订阅者在监听,该topic就丢失了。

消息发起人发送消息到目标Queue,接收者可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有接收者来取,也不会丢失,直到消息被接收。

消息发布接收策略

一对多的消息发布接收策略,监听同一个topic地址的多个订阅者都能收到发布者发送的消息。订阅者接收完通知服务器

点对点的消息发布接收策略,一个消息发起人发送的消息,只能有一个接受者接收。接收者接收完后,通知服务器已接收,服务器对queue里的消息采取删除或其他操作。

 Topicqueue的最大区别在于topic是以广播的形式,通知所有在线监听的客户端有新的消息,没有监听的客户端将收不到消息;而queue则是以点对点的形式通知多个处于监听状态的客户端中的一个。

从应用场景上来说topic更适合与电商中的广告推送,广撒传单不必关心是否有人拿到。而Queue更适合用来处理严谨事务,如客户邮件,重要消息发布等需要确认消息抵达的场景。

 

2.(转)效率对比(为之前个人学习抄录并未对来源摘入)

    通过增加监听客户端的并发数来验证,topic的消息推送,是否会因为监听客户端的并发上升而出现明显的下降,测试环境的服务器为ci环境的ActiveMQ,客户端为我的本机。

        从实测的结果来看,topic方式发送的消息,发送和接收的效率,在一个订阅者和100个订阅者的前提下没有明显差异,但在500个订阅者(线程)并发的前提下,效率差异很明显(由于500线程并发的情况下,我本机的cpu占用率已高达70-90%,所以无法确认是我本机测试造成的性能瓶颈还是topic消息发送方式存在性能瓶颈,造成效率下降如此明显)。

        Topic方式发送的消息与queue方式发送的消息,发送和接收的效率,在一个订阅者和100个订阅者的前提下没有明显差异,但在500个订阅者并发的前提下,topic方式的效率明显低于queue

        Queue方式发送的消息,在一个订阅者、100个订阅者和500个订阅者的前提下,发送和接收的效率没有明显变化。

Topic实测数据: 

 

发送者发送的消息总数

所有订阅者接收到消息的总数

消息发送和接收平均耗时

单订阅者

100

100

101ms

100订阅者

100

10000

103ms

500订阅者

100

50000

14162ms

 

Queue实测数据: 

 

发送者发送的消息总数

所有订阅者接收到消息的总数

消息发送和接收平均耗时

单订阅者

100

100

96ms

100订阅者

100

100

96ms

500订阅者

100

100

100ms

 

PS:这份仅供参考吧,在个人看来这份数据并没有太大的说服力,首先对queue来说无论有多少个消息接收者,MQ的消息发送总条数都是以消息发起人发起的条数为准,而topic不同的MQ发送总条数是发布者发布的条数与订阅者个数的乘积。

 

下面就是代码了:

首先是消息发起人(Sender):

public class Sender {
	private static ConnectionFactory connectionFactory;
	private static Connection connection;
	private static Session session;
	private static int total=100;

	public Sender() throws JMSException {
		connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		connection = connectionFactory.createConnection();
		connection.start();
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	}

	public void close() throws JMSException {
		if (connection != null) {
			connection.close();
		}
	}

	public MessageProducer getMessageProducer(String stock, int dMode) throws JMSException {
		MessageProducer producer = session.createProducer(session.createQueue("myQueue"));
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		return producer;
	}

	
	public TextMessage setMessageByText(String text) throws JMSException {
		TextMessage message = session.createTextMessage();
		message.setText(text);
		return message;
	}
	public TextMessage setMessageByMap(Map<String, Object> map) throws JMSException {
		TextMessage message = session.createTextMessage();
		for (String key : map.keySet()) {
			Object o=map.get(key);
			if(o instanceof Integer){
				message.setIntProperty(key, (Integer)o);
			}else if(o instanceof Boolean){
				message.setBooleanProperty(key, (Boolean)o);
			}else if(o instanceof Long){
				message.setLongProperty(key, (Long)o);
			}else if(o instanceof String){
				message.setStringProperty(key, (String)o);
			}else if(o instanceof Double){
				message.setDoubleProperty(key, (Double)o);
			}else if(o instanceof Short){
				message.setShortProperty(key, (Short)o);
			}else if(o instanceof Short){
				message.setShortProperty(key, (Short)o);
			}else if(o instanceof Float){
				message.setFloatProperty(key, (Float)o);
			}
		}
		return message;
	}

	public void sendMessage(MessageProducer producer,TextMessage message) throws JMSException{
		producer.send(message);
	}
	public static void main(String[] args) throws JMSException {
		Sender sender = new Sender();
		MessageProducer producer = sender.getMessageProducer("test", DeliveryMode.NON_PERSISTENT);
		int count=0;
		while (true) {
			TextMessage message;
			if(count%2==0){
				Map<String,Object> map=new HashMap<String,Object>();
				map.put("name", "My message");
				map.put("writer", "Bartholomew");
				map.put("content", "this is ActiveMQ!"+count);
				message= sender.setMessageByMap(map);
				sender.sendMessage(producer,message);
				System.out.println("发送第"+ (++count)+"条信息: " + message.toString());
			}else{
				message=sender.setMessageByText("hello world!"+count);
				sender.sendMessage(producer,message);
				System.out.println("发送第"+ (++count)+"条信息: " + message.getText());
			}
			if(total<=count){
				break;
			}
			
		}
	}
}

 接着是消息接收人的监听类:监听类是根据消息的发送情况来写的,请大家自己修改

public class MyMessageListener implements MessageListener{

	@Override
	public void onMessage(Message message) {
		TextMessage tm = (TextMessage) message;
		String content;
		try {
			content = tm.getText();
			if(content!=null){
				System.out.println("Received message: " + content);
			}else{
				Enumeration<String> pnames =tm.getPropertyNames();
				while(pnames.hasMoreElements()){
					String o = (String)pnames.nextElement();
//					message.getObjectProperty(o);
					System.out.print(o+":"+message.getObjectProperty(o)+",");
				}
				System.out.println();
			}
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

 最后是消息接收人

public class Receiver {
	public static void main(String[] args) throws JMSException {
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		Connection connection = connectionFactory.createConnection();
		connection.start();
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Destination destination = session.createQueue("myQueue");
		MessageConsumer consumer = session.createConsumer(destination);
		consumer.setMessageListener(new MyMessageListener());
	}
}

 

关于运行,对queue来说并没有谁先跑谁后跑的规定。而且一条消息仅有一个接收人,大家可以随意决定执行顺序。

分享到:
评论

相关推荐

    ActiveMQ客户端

    4. **消息类型**:JMS定义了五种消息类型:TextMessage(文本消息)、ObjectMessage(Java对象)、BytesMessage(字节流)、MapMessage(键值对)和StreamMessage(连续的数据流)。 5. **持久化**:ActiveMQ支持...

    ActiveMQ高并发处理方案

    #### 五、总结 通过上述优化措施,可以显著提升ActiveMQ在高并发场景下的表现。无论是通过连接池管理资源、调整预取策略来实现消息的均衡分配,还是通过垂直扩展和水平扩展来提高系统的整体吞吐量,都是确保...

    activemq

    #### 五、开发前准备 - **安装 ActiveMQ-CPP 及 Winkeemq-cpp**:在开始开发之前,需要先安装这两个库。具体的安装步骤可以参考相关的文档。 - **资源链接**:官方文档和其他资源可以参考 ...

    CentOS安装Activemq图文教程

    五、测试Activemq 输入`http://localhost:8161/admin/`,以测试Activemq是否安装成功。 安装和配置Activemq需要以下步骤: 1. 下载Activemq安装包 2. 解压缩安装包 3. 配置Activemq 4. 设置开机启动 5. 配置端口 ...

    ActiveMQ入门

    #### 五、ActiveMQ 的使用场景 **场景示例**: 1. **订单处理**:电商平台接收到订单后,通过 ActiveMQ 将订单信息发送给库存管理、物流配送等多个下游系统。 2. **日志聚合**:不同服务器生成的日志文件通过 ...

    ActiveMQ实践入门指南_ActiveMQ实践入门指南_源码

    五、使用ActiveMQ实践 1. 创建连接工厂:通过JNDI查找或编程方式创建ConnectionFactory。 2. 创建生产者:使用ConnectionFactory创建Connection,再创建Session和Producer。 3. 发送消息:在Producer上调用send方法...

    ActiveMQ快速上手 PDF

    #### 五、ActiveMQ的Transport - **多种传输协议**:ActiveMQ 支持多种传输协议,如 TCP、SSL、NIO、UDP 等,每种协议都有其适用场景。 - **配置和使用**:通过配置文件 `activemq.xml` 中的 `&lt;transportConnectors&gt;...

    activeMq in action 使用activeMq开发JMS的简单讲述

    五、总结 ActiveMQ作为一款强大的消息中间件,提供了丰富的特性和良好的社区支持,是开发JMS应用的理想选择。理解其工作原理和使用方式,有助于构建稳定、高效的分布式系统。通过实际项目中的应用,开发者可以进一步...

    activemq5.5.1 Spring模板

    五、最佳实践与优化 1. 消息确认策略:根据应用场景选择合适的确认模式,例如自动确认、客户端确认或DUPS_OK确认。 2. 消息监听器:使用`MessageListener`接口,可以实现异步接收消息,提高系统性能。 3. 事务支持:...

    ActiveMQ使用入门.pdf

    - **Message**:消息对象,包括消息头(必填)、属性(可选)和消息体(可选),有五种类型:TextMessage、MapMessage、ByteMessage、StreamMessage和ObjectMessage。 3. ActiveMQ的安装与运行 ActiveMQ的安装非常...

    ActiveMq总结.docx

    ### 五、总结 ActiveMQ作为一种成熟稳定的消息中间件,在现代软件架构中扮演着重要的角色。通过合理利用ActiveMQ的功能,可以极大地提高系统的灵活性和可扩展性。特别是在高并发、分布式环境中,ActiveMQ能够帮助...

    ActiveMQ消息总线介绍

    #### 五、ActiveMQ与Spring框架的集成 Spring框架提供了对JMS的支持,使得开发者可以通过简单的配置就能够使用ActiveMQ进行消息通信。具体来说,可以利用Spring的`JmsTemplate`类来简化消息的发送和接收过程。此外...

    activemq新手大全

    **五、activemq整合spring** 整合activemq和spring可以让应用更方便地使用消息队列。Spring提供了一套完整的JMS抽象层,包括ConnectionFactory、Destination和MessageListener等接口,可以简化配置和代码编写。通过...

    消息队列activemq.zip

    五、总结 ActiveMQ作为强大的消息中间件,其灵活性和稳定性使其在复杂的企业级应用中扮演着重要角色。无论是Windows还是Linux环境,都可以轻松部署和管理。理解并熟练运用ActiveMQ,将有助于构建更加健壮、高效的...

    apache-activemq5.10

    **五、应用场景** 1. **解耦应用**:在分布式系统中,ActiveMQ可以作为消息中间件,使得不同的服务之间通过异步通信实现解耦。 2. **流量削峰**:当系统面临高并发请求时,ActiveMQ可以临时存储消息,避免后端服务...

    activemq 入门示例代码

    ### 五、运行与测试 1. 编译并运行 `ActiveMQProducer`,将消息发送到指定的队列或主题。 2. 同时编译并运行 `ActiveMQConsumer`,查看是否能成功接收到消息。 至此,你已经完成了 ActiveMQ 入门示例的构建和运行...

    ActiveMQ使用手册(中文版)

    #### 五、ActiveMQ Broker 的 Transport **5.1 TCP Transport:** - **定义:** 基于TCP协议的传输。 - **配置:** 配置简单,适用于大多数场景。 **5.2 Failover Transport:** - **定义:** 提供故障转移机制的传输...

    apache-activemq-5.5.1

    **五、高级特性** 1. **虚拟主题(Virtual Topics)**:允许广播消息到多个订阅者,每个订阅者看到的是一个独立的逻辑视图。 2. **消息优先级**:消息可以根据优先级进行排序和处理,优先级高的消息先被处理。 3. **...

    activemq的安装包

    五、访问Web控制台 ActiveMQ内置了一个Web控制台,可以通过浏览器访问。默认情况下,它监听在`localhost`的8161端口上,URL为`http://localhost:8161/admin/`。首次登录,默认用户名和密码都是`admin`。 六、配置和...

Global site tag (gtag.js) - Google Analytics