import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.RandomStringUtils;
/**
* @author zhaoqilong
* @version 创建时间:2012-6-7 上午9:16:56
*
*/
public class Test {
private static LinkedBlockingQueue<String> queue =new LinkedBlockingQueue<String>();
// 线程控制开关
private final CountDownLatch latch = new CountDownLatch(1);
//的线程池
private final ExecutorService pool;
//AtomicLong 计数 生产数量
private final AtomicLong output = new AtomicLong(0);
//AtomicLong 计数 销售数量
private final AtomicLong sales = new AtomicLong(0);
//是否停止线程
private final boolean clear;
public Test(boolean clear){
this.pool = Executors.newCachedThreadPool();
this.clear=clear;
}
public void service() throws InterruptedException{
Saler a=new Saler(queue, sales, latch, clear);
pool.submit(a);
Worker w=new Worker(queue, output, latch);
pool.submit(w);
latch.countDown();
}
public static void main(String[] args) {
Test t=new Test(false);
try {
t.service();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
class Saler implements Runnable{
private final LinkedBlockingQueue<String> queue;
private final AtomicLong sales;
private final CountDownLatch latch;
private final boolean clear;
public Saler(LinkedBlockingQueue<String> queue, AtomicLong sales, CountDownLatch latch, boolean clear){
this.queue = queue;
this.sales = sales;
this.latch = latch;
this.clear = clear;
}
public void run() {
try {
latch.await(); // 放闸之前老实的等待着
for (;;) {
sale();
Thread.sleep(500);
}
}catch (InterruptedException e) {
if(clear) { // 响应中断请求后,如果有要求则销售完队列的产品后再终止线程
cleanWarehouse();
} else {
System.out.println("Seller Thread will be interrupted...");
}
}
}
public void sale(){
System.out.println("==取take=");
try {
String item = queue.poll(50, TimeUnit.MILLISECONDS);
System.out.println(item);
if(item!=null){
sales.incrementAndGet(); // 可以声明long型的参数获得返回值,作为日志的参数
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 销售完队列剩余的产品
*/
private void cleanWarehouse() {
try {
while (queue.size() > 0) {
sale();
}
} catch (Exception ex) {
System.out.println("Seller Thread will be interrupted...");
}
}
}
/**
* 生产者
* @author Administrator
*
*/
class Worker implements Runnable{
private LinkedBlockingQueue<String> queue;
private CountDownLatch latch;
private AtomicLong output;
public Worker(){
}
public Worker(LinkedBlockingQueue<String> queue, AtomicLong output,CountDownLatch latch){
this.queue=queue;
this.latch=latch;
this.output=output;
}
public void run() {
try {
latch.await(); // 线程等待
for (;;) {
work();
Thread.sleep(100);
}
}catch (InterruptedException e) {
System.out.println("Worker thread will be interrupted...");
}
}
/**
* 工作
*/
public void work(){
try {
String product=RandomStringUtils.randomAscii(3);
boolean success=queue.offer(product, 100, TimeUnit.MILLISECONDS);
if(success){
output.incrementAndGet();// 可以声明long型的参数获得返回值,作为日志的参数
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
分享到:
相关推荐
下面通过一个具体的例子来说明如何使用 `LinkedBlockingQueue`: ```java import java.util.concurrent.*; public class BlockingQueueTest2 { // 定义一个篮子类 public class Basket { // 初始化篮子的容量...
Java的`BlockingQueue`接口提供了线程安全的数据结构,可以方便地实现这种模式,如`ArrayBlockingQueue`、`LinkedBlockingQueue`等。 再者,反射是Java提供的一种强大功能,允许程序在运行时动态访问和修改类的信息...
描述中的"WorkQueue"很可能就是指的这个阻塞队列,通常使用的是LinkedBlockingQueue。这是一种无界队列,如果线程池中的线程都处于活动状态,新任务会被加入到队列中等待,直到有线程完成任务并返回到线程池中。这种...
`TestBlockingQueue.java`可能包含如何使用`ArrayBlockingQueue`、`LinkedBlockingQueue`等实现线程间的通信和数据交换的示例。 3. **CompletionService**:`CompletionService`提供了一种获取异步任务结果的机制,...
下面是一个基本的创建ThreadPoolExecutor的例子: ```java int corePoolSize = 5; // 核心线程数 int maximumPoolSize = 10; // 最大线程数 long keepAliveTime = 60L; // 空闲线程存活时间 TimeUnit unit = ...
在这个例子中,我们将深入探讨这个概念以及如何在实际编程中实现。 生产者与消费者的模型通常包括两个主要角色:生产者(Producer)和消费者(Consumer)。生产者负责生成数据,而消费者则消耗这些数据。他们共同...
`java.util.concurrent`包下的`BlockingQueue`接口及其实现,如`ArrayBlockingQueue`和`LinkedBlockingQueue`,提供了线程安全的队列操作。这些队列在多线程环境中用于存储待处理的任务,线程从队列中取出任务并执行...
在这个例子中,10个线程不断向队列添加和移除元素,而主线程则持续对队列进行`stream`遍历,寻找第一个空元素。正常情况下,这个过程应该能交替输出"beginscan, I'm still alive"和"finishscan, I'm still alive"。...
8. **线程间的通信**:Java的`BlockingQueue`接口和其实现(如`ArrayBlockingQueue`、`LinkedBlockingQueue`)可以方便地实现线程间的同步和通信,它们是生产者-消费者模型的典型应用。 综上所述,本示例可能通过...
在这个例子中,`QueueService`类维护了一个`LinkedBlockingQueue`,用于存储`ClientThread`对象。`enqueue`方法将新的客户线程加入队列,`dequeue`方法取出并处理队首的客户。`ClientThread`实现了`Runnable`接口,...
Java中实现此模式的常用工具是BlockingQueue,如ArrayBlockingQueue或LinkedBlockingQueue。这些队列具有阻塞特性,当队列满时,生产者会被阻塞,直到有空位;当队列为空时,消费者会被阻塞,直到有新产品。这种设计...
- `workQueue`:用于存储等待执行的任务的队列,不同的队列有不同的策略,如无界队列(LinkedBlockingQueue)、有界队列(ArrayBlockingQueue)和优先级队列(PriorityBlockingQueue)。 - `threadFactory`:创建...
在这个例子中,我们创建了一个容量为10的`LinkedBlockingQueue`作为共享资源。生产者线程不断生产数据并放入队列,消费者线程则持续从队列中取出并消费数据。当队列满时,生产者会被阻塞;当队列空时,消费者会被...
在这个例子中,`LinkedBlockingQueue`是`BlockingQueue`的一个实现,它使用链表结构,并且具有无界容量。生产者线程不断生产数字并将其放入队列,而消费者线程则从队列中取出数字并打印出来。当队列满时,`put`方法...
在这个例子中,`produce()`方法将数据放入队列,而`consume()`方法异步地从队列中取出数据进行处理。`put()`和`take()`方法会自动处理线程间的同步,保证了数据的一致性和安全性。 **应用场景** Spring Boot结合...
在这个例子中,我们使用了`LinkedBlockingQueue`,这是一个无界但有限的队列。`put`方法用于生产者插入元素,如果队列已满,生产者线程会被阻塞直到有空间可用。同样,消费者使用`take`方法取出元素,如果队列为空,...
在Java中,`java.util.concurrent`包提供了多种阻塞队列实现,包括`LinkedBlockingQueue`、`ArrayBlockingQueue`、`PriorityBlockingQueue`和`DelayQueue`。 `ArrayBlockingQueue`是一个有界的并发队列,它在创建时...
Java 5引入了BlockingQueue阻塞队列,提供了一种更安全的线程间通信方式,如ArrayBlockingQueue、LinkedBlockingQueue等。 在"线程池.rar"和"线程实例"这两个文件中,你可以找到关于这些概念的具体示例代码,通过...
在这个例子中,我们创建了一个`LinkedBlockingQueue`作为共享资源,生产者线程不断向队列中添加数据,消费者线程则持续从队列中取出并消费数据。`put()`方法用于生产者向队列添加元素,如果队列已满,生产者会被阻塞...
在这个例子中,我们创建了一个`LinkedBlockingQueue`作为共享资源,然后定义了生产者和消费者的线程。生产者不断生成数字并将其放入队列,消费者则从队列中取出数字并打印。通过`put`和`take`方法,生产者和消费者...