`
MouseLearnJava
  • 浏览: 468146 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Java并发编程: 使用Exchanger实现线程间的数据交换

阅读更多

本文介绍Exchanger工具类, 然后采用Exchanger给出一个两个线程交换数值的简单实例。

1. Exchanger介绍
/**
 * A synchronization point at which two threads can exchange objects.
 * Each thread presents some object on entry to the {@link #exchange
 * exchange} method, and receives the object presented by the other
 * thread on return.
*/


从上面的注释中可以看出:Exchanger提供了一个同步点在这个同步点,两个线程可以交换数据。每个线程通过exchange()方法的入口提供数据给另外的线程,并接收其它线程提供的数据,并返回。

Exchanger通过Lock和Condition来完成功能,Exchanger的一个重要的public方法是exchange方法,用于线程的数据交换, 相关的类图以及详细的Exchanger类内容如下:



package java.util.concurrent;
import java.util.concurrent.locks.*;

/**
 * A synchronization point at which two threads can exchange objects.
 * Each thread presents some object on entry to the {@link #exchange
 * exchange} method, and receives the object presented by the other
 * thread on return.
 *
 * <p><b>Sample Usage:</b>
 * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
 * swap buffers between threads so that the thread filling the
 * buffer gets a freshly
 * emptied one when it needs it, handing off the filled one to
 * the thread emptying the buffer.
 * <pre>
 * class FillAndEmpty {
 *   Exchanger<DataBuffer> exchanger = new Exchanger();
 *   DataBuffer initialEmptyBuffer = ... a made-up type
 *   DataBuffer initialFullBuffer = ...
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.full())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.empty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ...}
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }
 * </pre>
 *
 * @since 1.5
 * @author Doug Lea
 * @param <V> The type of objects that may be exchanged
 */
public class Exchanger<V> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition taken = lock.newCondition();

    /** Holder for the item being exchanged */
    private V item;
    
    /**
     * Arrival count transitions from 0 to 1 to 2 then back to 0
     * during an exchange.
     */
    private int arrivalCount;

    /**
     * Main exchange function, handling the different policy variants.
     */
    private V doExchange(V x, boolean timed, long nanos) throws InterruptedException, TimeoutException {
        lock.lock();
        try {
            V other;

            // If arrival count already at two, we must wait for
            // a previous pair to finish and reset the count;
            while (arrivalCount == 2) {
                if (!timed)
                    taken.await();
                else if (nanos > 0) 
                    nanos = taken.awaitNanos(nanos);
                else 
                    throw new TimeoutException();
            }

            int count = ++arrivalCount;

            // If item is already waiting, replace it and signal other thread
            if (count == 2) { 
                other = item;
                item = x;
                taken.signal();
                return other;
            }

            // Otherwise, set item and wait for another thread to
            // replace it and signal us.

            item = x;
            InterruptedException interrupted = null;
            try { 
                while (arrivalCount != 2) {
                    if (!timed)
                        taken.await();
                    else if (nanos > 0) 
                        nanos = taken.awaitNanos(nanos);
                    else 
                        break; // timed out
                }
            } catch (InterruptedException ie) {
                interrupted = ie;
            }

            // Get and reset item and count after the wait.
            // (We need to do this even if wait was aborted.)
            other = item;
            item = null;
            count = arrivalCount;
            arrivalCount = 0; 
            taken.signal();
            
            // If the other thread replaced item, then we must
            // continue even if cancelled.
            if (count == 2) {
                if (interrupted != null)
                    Thread.currentThread().interrupt();
                return other;
            }

            // If no one is waiting for us, we can back out
            if (interrupted != null) 
                throw interrupted;
            else  // must be timeout
                throw new TimeoutException();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Create a new Exchanger.
     **/
    public Exchanger() {
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     * <p>If no other thread is already waiting at the exchange then the 
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * [list]
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * [/list]
     * <p>If the current thread:
     * [list]
     * <li>has its interrupted status set on entry to this method; or 
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange, 
     * [/list]
     * then {@link InterruptedException} is thrown and the current thread's 
     * interrupted status is cleared. 
     *
     * @param x the object to exchange
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted 
     * while waiting
     **/
    public V exchange(V x) throws InterruptedException {
        try {
            return doExchange(x, false, 0);
        } catch (TimeoutException cannotHappen) { 
            throw new Error(cannotHappen);
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}, or the specified waiting
     * time elapses),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the 
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * [list]
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * [/list]
     * <p>If the current thread:
     * [list]
     * <li>has its interrupted status set on entry to this method; or 
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange, 
     * [/list]
     * then {@link InterruptedException} is thrown and the current thread's 
     * interrupted status is cleared. 
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown.
     * If the time is 
     * less than or equal to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the <tt>timeout</tt> argument.
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     * @throws TimeoutException if the specified waiting time elapses before
     * another thread enters the exchange.
     **/
    public V exchange(V x, long timeout, TimeUnit unit) 
        throws InterruptedException, TimeoutException {
        return doExchange(x, true, unit.toNanos(timeout));
    }

}


2. Exchanger工具类的使用案例
本文给出一个简单的例子,实现两个线程之间交换数据,用Exchanger来做非常简单。

package my.concurrent.exchanger;

import java.util.concurrent.Exchanger;
import java.util.concurrent.atomic.AtomicReference;

public class ThreadA implements Runnable {

	private final Exchanger<Integer> exchanger;

	private final AtomicReference<Integer> last = new AtomicReference<Integer>(
			5);

	public ThreadA(Exchanger<Integer> exchanger) {
		this.exchanger = exchanger;
	}

	public void run() {
		try {
			while (true) {
				last.set(exchanger.exchange(last.get()));
				System.out.println(" After calling exchange. Thread A has value: " + last.get());
				Thread.sleep(2000);
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}


package my.concurrent.exchanger;

import java.util.concurrent.Exchanger;
import java.util.concurrent.atomic.AtomicReference;

public class ThreadB implements Runnable {

	private Exchanger<Integer> exchanger;

	private final AtomicReference<Integer> last = new AtomicReference<Integer>(
			10);

	public ThreadB(Exchanger<Integer> exchanger) {
		this.exchanger = exchanger;
	}

	public void run() {
		try {
			while (true) {
				last.set(exchanger.exchange(last.get()));
				System.out.println(" After calling exchange. Thread B has value: " + last.get());
				Thread.sleep(2000);
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}


package my.concurrent.exchanger;

import java.util.concurrent.Exchanger;

public class ExchangerTest {

	public static void main(String[] args) {
		Exchanger<Integer> exchanger = new Exchanger<Integer>();
		new Thread(new ThreadA(exchanger)).start();
		new Thread(new ThreadB(exchanger)).start();
	}

}


运行一段时间之后的输出结果如下:
After calling exchange. Thread B has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 5
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 5
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 5
After calling exchange. Thread A has value: 5
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 5
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread B has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread B has value: 10
After calling exchange. Thread B has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread B has value: 5
After calling exchange. Thread A has value: 10



可以看出:两个线程的数据一直都在相互交换。
  • 大小: 19.3 KB
1
1
分享到:
评论

相关推荐

    java并发编程:设计原则与模式.rar

    《Java并发编程:设计原则与模式》是一本深入探讨Java多线程编程的书籍,它涵盖了并发编程中的关键概念、原则和模式。在Java中,并发处理是优化应用程序性能、提高资源利用率的重要手段,尤其在现代多核处理器的环境...

    JAVA并发编程实践

    根据给定文件的信息“JAVA并发编程实践”以及其描述为“Java并发学习资料”,我们可以从中提炼出关于Java并发编程的一些核心知识点。Java并发编程是Java高级特性之一,它允许开发者编写能够同时执行多个任务的程序,...

    java线程与并发编程实践

    线程安全的数据结构,如ConcurrentHashMap、ConcurrentLinkedQueue等,是Java并发编程的重要组成部分。它们内部实现了线程安全的更新策略,能够在高并发环境下保证数据一致性。 异常处理在多线程编程中同样重要。...

    Java并发编程全景图.pdf

    Java并发编程是Java语言中最为复杂且重要的部分之一,它涉及了多线程编程、内存模型、同步机制等多个领域。为了深入理解Java并发编程,有必要了解其核心技术点和相关实现原理,以下将详细介绍文件中提及的关键知识点...

    Java 并发编程实战 中英文+代码示例

    6. **线程池**:Executor框架是Java并发编程的重要组成部分,讲解ThreadPoolExecutor的使用,包括线程池的参数配置、工作队列的选择以及线程池的生命周期管理。 7. **死锁与活锁**:分析可能导致线程死锁的原因,...

    Java并发编程实践

    - 可视化阻塞数据结构:如`Exchanger`,用于线程间数据交换。 5. **第五章:线程池** - `Executor`框架:线程池的创建、管理和关闭。 - `ThreadPoolExecutor`的参数配置:核心线程数、最大线程数、工作队列等。 ...

    《Java并发编程实战》PDF版本下载.txt

    - **Exchanger**:允许两个线程交换对象的工具类。 ### Java并发编程实战案例 #### 1. 生产者消费者模式 生产者消费者模式是一种经典的多线程设计模式,用于解决多线程之间如何共享数据的问题。在Java中可以通过`...

    Java多线程编程之使用Exchanger数据交换实例

    Java多线程编程中的Exchanger是一个非常有用的工具类,它位于`java.util.concurrent`包下,主要用于线程间的数据交换。Exchanger的核心功能是让两个线程在一个同步点相遇,进行数据交换。当一个线程调用`exchange`...

    JAVA并发编程实践(中文)含源码

    《JAVA并发编程实践》是一本深入探讨Java多线程与并发控制的权威书籍,中文版本的出现使得更多国内开发者能够无障碍地学习这方面的知识。这本书不仅涵盖了理论基础,还提供了丰富的实战示例,包括源码,使读者能够...

    Java并发编程实战

    根据提供的文件信息,“Java并发编程实战”这本书主要聚焦于Java并发编程的核心概念和技术。下面将对并发编程的一些关键知识点进行详细解析。 ### 并发与并行 在深入讨论Java并发编程之前,我们首先需要理解并发...

    java并发编程技术

    Java并发编程技术是Java开发中的重要领域,它涉及到如何在多线程环境下高效地执行程序。并发编程可以充分利用多核处理器资源,提高系统的响应速度和处理能力。以下是一些核心的知识点: 1. **并行程序**:并行程序...

    29 一手交钱,一手交货—Exchanger详解.pdf

    在Java并发编程中,Exchanger是一个非常有用的工具类,它允许两个线程间进行数据交换。这个类在处理需要同步的交互式任务时特别有用,比如在多阶段处理或者需要线程间协作的情况。Exchanger的工作原理就像一个中介,...

    Java并发编程(学习笔记).xmind

    Java并发编程 背景介绍 并发历史 必要性 进程 资源分配的最小单位 线程 CPU调度的最小单位 线程的优势 (1)如果设计正确,多线程程序可以通过提高处理器资源的利用率来提升系统吞吐率 ...

    java并发工具类(CountDownLatch+Semaphore+Exchanger)

    Java并发工具类是Java并发编程中的重要组成部分,其中包括了多种实用的工具,如CountDownLatch、Semaphore和Exchanger,这些工具类极大地简化了多线程环境下的同步和协调问题。 1. **CountDownLatch**: ...

    java 并发编程

    - **Exchanger**:用于线程间的数据交换。 #### 五、代理模式简介 除了线程的基础知识外,给定的内容还提到了**代理模式**,这是一种设计模式,用于在不改变原有接口的前提下,为对象添加新的责任。代理模式通常...

    Java并发编程实践-03章-使用JDK并发包构建程序1

    `java.util.concurrent`包是Java标准库中专门用于并发编程的模块,它包含了各种线程安全的数据结构、同步机制和执行模型。这个包的引入极大地简化了并发编程的复杂性,提供了一套高效且易用的并发工具。 **3.2 ...

Global site tag (gtag.js) - Google Analytics