`

并发编程回顾:队列

 
阅读更多

原先多线程并发编程的学习笔记和代码整理一下贴上来。

---------------------------------

队列

可以使用同步队列来解决任务协作问题,同步队列在任意时刻都只允许一个任务插入或移除元素。

同步队列的实现:

1、java.util.concurrent包中的BlockingQueue接口提供了这个队列,且该接口有大量实现,举例如下:

首先定义一个任务Task类,该任务有3步操作:

class Task{
	public enum Status {FIRST,SECOND,THIRD};
	private Status status = Status.FIRST; //default
	private final int id;
	public Task(int id){
		this.id=id;
	}
	public void doFirst() throws Exception{
		System.out.println("taskId="+id+" doFirst!");
		TimeUnit.MILLISECONDS.sleep(1000);//执行第一步操作需要1s
		status=Status.FIRST;
	}
	public void doSecond() throws Exception{
		System.out.println("taskId="+id+" doSecond!");
		TimeUnit.MILLISECONDS.sleep(200);//执行第二步操作需要0.2s
		status=Status.SECOND;
	}
	public void doThird() throws Exception{
		System.out.println("taskId="+id+" doThird!");
		TimeUnit.MILLISECONDS.sleep(2000);//执行第三步操作需要2s
		status=Status.THIRD;
	}
	public Status getStatus(){
		return status;
	}
	public int getId(){
		return this.id;
	}
}

然后,对这个task进行处理,每一步都定义一个处理器,这里用到了LinkedBlockingQueue。

第一个任务处理器,启动task,并执行第一步操作doFirst,执行完后放入队列。

class HandlerFirst implements Runnable{
	private LinkedBlockingQueue<Task> firstQueue;
	private int count=0;//任务数
	public HandlerFirst(LinkedBlockingQueue<Task> firstQueue){
		this.firstQueue=firstQueue;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				Task task=new Task(count++);
				task.doFirst();
				firstQueue.put(task);
			}
		}catch(InterruptedException e){
			e.printStackTrace();
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}

第二个处理器,从第一步完成的队列中取出任务,然后继续执行第二步操作,完成后再放入第二步完成队列:

class HandlerSecond implements Runnable{
	private LinkedBlockingQueue<Task> firstQueue,secondQueue;
	public HandlerSecond(LinkedBlockingQueue<Task> firstQueue,LinkedBlockingQueue<Task> secondQueue){
		this.firstQueue=firstQueue;
		this.secondQueue=secondQueue;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				Task task = firstQueue.take();
				task.doSecond();
				secondQueue.put(task);
			}
		}catch(InterruptedException e){
			e.printStackTrace();
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}

第三步,从完成第二步的队列中取出任务继续处理,之后放入全部完成的队列:

class HandlerThird implements Runnable{
	private LinkedBlockingQueue<Task> secondQueue,finishQueue;
	public HandlerThird(LinkedBlockingQueue<Task> secondQueue,LinkedBlockingQueue<Task> finishQueue){
		this.secondQueue=secondQueue;
		this.finishQueue=finishQueue;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				Task task = secondQueue.take();
				task.doThird();
				finishQueue.put(task);
			}
		}catch(InterruptedException e){
			e.printStackTrace();
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}

任务所有步骤都完成时的处理器,打印出taskid:

class FinishHandler implements Runnable{
	private LinkedBlockingQueue<Task> finishQueue;
	public FinishHandler(LinkedBlockingQueue<Task> finishQueue){
		this.finishQueue=finishQueue;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				Task task = finishQueue.take();
				System.out.println("******* finish! taskId="+task.getId());
			}
		}catch(InterruptedException e){
			e.printStackTrace();
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}

最后,测试一下这个程序:

LinkedBlockingQueue<Task> firstQueue=new LinkedBlockingQueue<Task>(),
		          secondQueue=new LinkedBlockingQueue<Task>(),
		          finishQueue=new LinkedBlockingQueue<Task>();
ExecutorService es = Executors.newCachedThreadPool();
es.execute(new HandlerFirst(firstQueue));
es.execute(new HandlerSecond(firstQueue,secondQueue));
es.execute(new HandlerThird(secondQueue,finishQueue));
es.execute(new FinishHandler(finishQueue));
es.shutdown();

定义3个阻塞队列,分别存放第一步、第二步、第三步完成后任务的队列。

然后分别启动他们,任务会不断的创建并将各个步骤完成的结果放入不同的队列中。

由于BlockingQueue内部已经进行了同步处理,所以并发访问时不再需要同步代码。

2、管道

主要就是使用管道流实现该功能:

首相,定义一个Sender,该sender不停的把字符写入流中:

class Sender implements Runnable {
  private Random rand = new Random();
  private PipedWriter out = new PipedWriter();
  public PipedWriter getPipedWriter() { return out; }
  public void run() {
    while(true) {
      for(char c = 'A'; c <= 'z'; c++) {
        try {
          out.write(c);
          TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));
        } catch(Exception e) {
          throw new RuntimeException(e);
        }
      }
    }
  }
}
然后定义一个Receiver,不断的从管道中读取数据:
class Receiver implements Runnable {
  private PipedReader in;
  public Receiver(Sender sender) throws IOException {
    in = new PipedReader(sender.getPipedWriter());
  }
  public void run() {
    try {
      while(true) {
        // Blocks until characters are there:
        System.out.println("Read: " + (char)in.read());
      }
    } catch(IOException e) {
      throw new RuntimeException(e);
    }
  }
}
最后,测试一下这个程序,首先启动sender,然后启动receiver:
Sender sender = new Sender();
Receiver receiver = new Receiver(sender);
ExecutorService es = Executors.newCachedThreadPool();
es.execute(sender);
es.execute(receiver);

sender会不停的向管道中写入,receiver不停的从管道中读取,实现了一个队列的功能。

分享到:
评论

相关推荐

    汪文君高并发编程实战视频资源下载.txt

    │ 高并发编程第二阶段41讲、多线程设计模式内容回顾与总结.mp4 │ 高并发编程第二阶段42讲、ClassLoader课程大纲介绍.mp4 │ 高并发编程第二阶段43讲、类加载的过程以及类主动使用的六种情况详细介绍.mp4 │ 高...

    汪文君高并发编程实战视频资源全集

    │ 高并发编程第二阶段41讲、多线程设计模式内容回顾与总结.mp4 │ 高并发编程第二阶段42讲、ClassLoader课程大纲介绍.mp4 │ 高并发编程第二阶段43讲、类加载的过程以及类主动使用的六种情况详细介绍.mp4 │ 高...

    【并发编程】CAS到底是什么.pdf

    在开始探讨CAS之前,我们首先简要回顾一下并发编程的一些基本概念。 - **进程**:是计算机中的一个执行单元,它拥有独立的地址空间,是操作系统进行资源分配和调度的基本单位。 - **线程**:是进程内的一个执行单元...

    回顾:欧洲机架延迟效应

    7. **架构设计**:在分布式系统中,考虑使用消息队列、服务化架构,甚至微服务,以减少组件间的直接通信,从而降低延迟。 总结来说,“欧洲机架延迟效应”在C++编程中的理解和应对,是提高软件性能的关键环节。...

    Java-JUC-多线程 进阶

    Java-JUC-多线程进阶resources是 Java 并发编程的高级课程,涵盖了 Java 中的并发编程概念、线程安全、锁机制、集合类、线程池、函数式接口、Stream流式计算等多个方面。 什么是JUC JUC(Java Utilities for ...

    高并发系统架构(LVS负载均衡、Nginx、共享存储、队列缓存)21.redis实战案例及总回顾 共18页.pptx

    高并发架构实战案例分享-概述 共19页.pptx02.Piranha安装快速搭建LVS负载均衡集群 共16页.pptx03.LVS负载均衡DR模式安装调试介绍 共13页.pptx04.LVS负载均衡深入进阶实战 共13页.pptx05.LVS调度策略及负载均衡原理...

    UNIX环境高级编程第二版(高清电子书)

    - **多线程与并发**:讲解在网络编程中如何有效利用多线程来提高程序的并发性能。 #### 3. 高级主题 - **信号处理**:深度剖析信号在UNIX系统中的作用,包括异步I/O、错误处理、进程终止等场景的应用。 - **资源...

    Visual.C.网络高级编程2

    在深入了解本书之前,我们先回顾一下Visual C++网络编程的基础知识: 1. **Socket基础**:Socket是实现网络通信的一种方式,它为进程间提供了一种端对端的数据传输机制。在Visual C++中,主要通过Winsock API来实现...

    Java多线程与并发库高级应用

    在Java中,`java.util.concurrent`包提供了大量的工具类和接口,使得开发人员可以更方便地进行并发编程。 ##### 3.1 Executor框架 `java.util.concurrent.Executor`框架提供了一种灵活的方式来创建线程池,常见的...

    python高级编程.pdf

    #### 第十三章:AsyncIO并发编程 - **事件循环**:异步编程的核心机制。 - **协程嵌套**:在协程中调用其他协程。 - **`ThreadPoolExecutor`与`asyncio`结合**:将阻塞调用放在单独的线程中执行。 - **`asyncio`...

    实验一_SOCKET编程实验

    **实验一: SOCKET编程实验** 在计算机网络领域,Socket编程是一种基本...在完成实验后,分析并优化代码,例如,处理并发连接、提高数据传输效率等,以提升自己的编程能力。记得记录实验过程和结果,便于回顾和分享。

    unix环境高级编程第二版 完美书签

    2. **进程管理**:包括进程创建、进程控制、进程间通信(管道、消息队列、共享内存、信号量等)、进程调度和同步机制,这些都是编写高效并发程序的基础。 3. **文件I/O**:深入剖析Unix中的文件系统,如打开、关闭...

    Linux IPCs 编程手册(英文)

    - **总结**:回顾了文件锁定的重要性和使用场合。 #### 7. 消息队列 - **消息队列概念**:消息队列是一种更为灵活的进程间通信机制,允许发送和接收消息。 - **创建消息队列**:详细说明了如何创建消息队列以及如何...

    24小时学通QT编程

    2. **基础语法和数据类型**:回顾C++基础知识,为QT编程打下基础。 3. **QT基本组件和布局管理**:学习QWidget、QPushButton、QLabel、QLineEdit等常用控件,以及如何使用QLayout进行布局。 4. **信号与槽机制**:...

    一本关于Linux网络编程的好书,《Linux网络编程》

    - **套接字的历史、功能和类型**:回顾了套接字的发展历程,解释了其在网络编程中的核心作用,包括SOCKET_STREAM、SOCKET_DGRAM和SOCKET_RAW三种类型的特点和适用场景。 - **Linux支配的网络协议**:聚焦于Linux平台...

    Python100经典练习题(1).rar

    15. **并发编程**:线程和进程的概念,以及使用threading和multiprocessing模块进行并发处理。 16. **数据结构与算法**:栈、队列、堆、图等数据结构的实现,以及排序和搜索算法的理解与应用。 通过解答这些练习题...

Global site tag (gtag.js) - Google Analytics