package com.bjsxt.height.design016; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; public class Main { public static void main(String[] args) throws Exception { //内存缓冲区 BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10); //生产者 Provider p1 = new Provider(queue); Provider p2 = new Provider(queue); Provider p3 = new Provider(queue); //消费者 Consumer c1 = new Consumer(queue); Consumer c2 = new Consumer(queue); Consumer c3 = new Consumer(queue); //创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程,没有任务的时候不创建线程。空闲线程存活时间为60s(默认值) ExecutorService cachePool = Executors.newCachedThreadPool(); cachePool.execute(p1); cachePool.execute(p2); cachePool.execute(p3); cachePool.execute(c1); cachePool.execute(c2); cachePool.execute(c3); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } p1.stop(); p2.stop(); p3.stop(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } // cachePool.shutdown(); // cachePool.shutdownNow(); } }
package com.bjsxt.height.design016; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class Consumer implements Runnable{ private BlockingQueue<Data> queue; public Consumer(BlockingQueue queue){ this.queue = queue; } //随机对象 private static Random r = new Random(); @Override public void run() { while(true){ try { //获取数据 Data data = this.queue.take(); //进行数据处理。休眠0 - 1000毫秒模拟耗时 Thread.sleep(r.nextInt(1000)); System.out.println("当前消费线程:" + Thread.currentThread().getName() + ", 消费成功,消费数据为id: " + data.getId()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
package com.bjsxt.height.design016; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class Provider implements Runnable{ //共享缓存区 private BlockingQueue<Data> queue; //多线程间是否启动变量,有强制从主内存中刷新的功能。即时返回线程的状态 private volatile boolean isRunning = true; //id生成器 private static AtomicInteger count = new AtomicInteger(); //随机对象 private static Random r = new Random(); public Provider(BlockingQueue queue){ this.queue = queue; } @Override public void run() { while(isRunning){ try { //随机休眠0 - 1000 毫秒 表示获取数据(产生数据的耗时) Thread.sleep(r.nextInt(1000)); //获取的数据进行累计... int id = count.incrementAndGet(); //比如通过一个getData方法获取了 Data data = new Data(Integer.toString(id), "数据" + id); System.out.println("当前线程:" + Thread.currentThread().getName() + ", 获取了数据,id为:" + id + ", 进行装载到公共缓冲区中..."); if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){ System.out.println("提交缓冲区数据失败...."); //do something... 比如重新提交 } } catch (InterruptedException e) { e.printStackTrace(); } } } public void stop(){ this.isRunning = false; } }
package com.bjsxt.height.design016; public final class Data { private String id; private String name; public Data(String id, String name){ this.id = id; this.name = name; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString(){ return "{id: " + id + ", name: " + name + "}"; } }
相关推荐
说明:本人在给Java游戏开发特训班讲解多线程时,需要说明多线程同步的问题,其中讲解了使用“生产者-消费者”模型来解决同步问题。可是当时特训班的学生不是很明白,特别是不明白这个模型怎么使用。这可能是由于...
"MQGhost"可能是一个具体的MQ测试工具,或者代表了一种模拟MQ服务的行为,用于测试客户端连接和交互的能力。如果MQGhost是一个工具,它可能提供模拟不同MQ服务器行为的功能,帮助开发者在无需实际运行完整MQ服务的...
这个文件名可能是指一个Java示例程序,用于模拟消息在IBM MQ中的往返过程,即生产者发送消息,MQ接收,然后消费者从MQ获取并处理消息。这个程序可能包含创建线程、发送和接收消息的关键代码。 6. **多线程测试**:...
1. **消息发送与接收**:允许用户模拟消息的生产和消费,以验证MQ的正确性。 2. **配置管理**:如描述中提到的`config.xml`文件,用于配置MQ的相关参数,包括连接信息、主题或队列设置等。 3. **监控与调试**:可能...
MQ测试工具的主要作用在于模拟大量并发的消息生产与消费,检查消息队列系统的性能、稳定性和兼容性。这些工具能够帮助开发者和运维人员理解MQ服务在不同负载下的表现,确保其在实际运行环境中能够正常工作。MQ_TEST....
5. **故障模拟**:为了确保系统的健壮性,可以模拟网络中断、消费者宕机等故障情况,检查消息是否会妥善处理,如被重新投递或存储。 6. **性能测试**:评估在高并发场景下,RabbitMQ的吞吐量和消息的延迟,这有助于...
通过JMS,我们可以创建生产者(发送消息)和消费者(接收消息),并可以选择同步或异步模式。在这个场景下,“监听模式”指的是JMS消费者使用的一种异步接收消息的方式。 在监听模式中,消费者会创建一个JMS消息...
在提供的文件列表中,"MQsender"可能是一个用于模拟客户端(生产者)的工具或代码示例,用于发送监控信息到MQ服务器。而"MQreceive"可能是消费者端的实现,用于接收和处理从MQ服务器接收到的消息,用于展示如何实现...
MQ提供异步处理能力,解耦生产者和消费者,使得系统能够更好地扩展和处理高并发。例如,当一个Servlet接收到请求后,可以将处理任务封装成消息发送到MQ,然后由其他服务消费,这样可以提高系统的响应速度和可维护性...
- **消息中间件**:ActiveMQ作为消息中间件,它的主要任务是接收、存储和转发消息,使得生产者和消费者可以解耦,提高系统的可扩展性和可靠性。 - **消息模型**:ActiveMQ支持多种消息模型,如点对点(Queue)和...
6. 与MQ的交互:`FtpUtil`可能需要集成到MQ的生产者或消费者中,监听消息队列,接收上传或下载任务,完成FTP操作后,向MQ发送确认或回执。 7. 安全性:如果涉及敏感数据传输,可能会使用SSL/TLS加密连接,确保数据...
STM32系列由意法半导体(STMicroelectronics)生产,其强大的性能、低功耗和丰富的外设接口使其在物联网、自动化和消费电子等领域有着广泛应用。STM32通常采用Keil uVision或IAR Embedded Workbench等开发环境进行...
在MQ中,生产者将消息发送到消息队列,消费者则在需要时从队列中取出并处理这些消息。这种模式可以降低系统的直接依赖性,提高系统的扩展性和容错性。常见的MQ产品有RabbitMQ、Kafka、RocketMQ等。 结合标签“源码...
在点对点模型中,消息从一个生产者发送到一个队列,然后由一个消费者接收;而在发布/订阅模型中,消息从发布者发送到主题,多个订阅者可以接收这些消息。 在实现过程中,我们通常会使用JMS的API创建...
MQ服务作为中间人,负责存储和转发消息,确保它们能够正确地从生产者到达消费者,即使在生产者和消费者之间存在网络问题或者消费者暂时无法处理消息时。 消息队列有多种模式,如点对点(Point-to-Point,P2P)和...
通过JMS,我们可以创建生产者(发送消息)和消费者(接收消息)对象,实现消息的发送和接收。在LoadRunner的Java脚本中,我们可以通过导入相关的JMS库,创建MQ连接工厂,然后创建连接、会话、目的地(队列或主题)等...
6. **测试**:为了确保消息队列的正确性,项目通常会包含单元测试和集成测试,模拟不同的生产和消费场景,验证消息的正确传输和处理。 7. **文档**:开源项目通常会有详细的README文件,介绍如何搭建、配置和使用该...
- 集成测试:模拟真实环境进行集成测试,确保各个组件协同工作。 - 部署:将应用程序部署到生产环境,并监控其运行状态。 #### 六、最佳实践 1. **性能优化** - 调整消息大小:合理设置消息的大小,避免过大的...
6. 编写测试用例,模拟生产消息和消费消息的场景。 通过这样的实战练习,开发者可以深入理解RabbitMQ如何与SpringBoot结合使用,以及如何利用RabbitMQ实现异步处理、解耦系统组件等功能。实际项目中,还可以根据...
测试mq:测试生产者,模拟消费者接收消息,并验证发出的消息的正确性测试消费者,模拟生产者去发送消息,然后验证被测应该(exchange)能收到性能测试使用perf-test模拟发布者和一些消费者个数docker run -it --rm -...