`
frank-liu
  • 浏览: 1685260 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

CyclicBarrier的应用

阅读更多

简介

    我们在多线程应用的一些设计中会碰到一些问题。比如说利用多个线程去分别计算某个问题的部分结果,然后再将结果存储在某个地方。等所有这些线程都结束之后,我们再将这些线程产生的结果合并起来并得到问题的解。这种方法基于这么一个前提,就是所有这些线程可以并行的执行,他们之间不会有互相的干扰。另外,线程产生的结果不会有冲突。比如说,我们可以给线程一个编号限制,某个线程产生的结果放到某个编号的元素里,这样只要他们的编号不重复,就不会有保存结果的冲突。这样,整个执行的过程就可以拆分成多个线程执行,提高了并行度,使得解决问题的效率更高。

 

应用场景分析

    前面我们提到的这种场景确实挺好。而且通过分别的编号也可以解决数据访问冲突的问题。但是,这边还有一个问题就是。我怎么知道所有并发执行的线程都执行结束了呢?因为我需要知道他们所有的都执行结束了我才可以去他们产生的结果里统计最终的运算结果。这个时候,我们就需要用到一个类似于栅栏的机制了。在Java里有这么一个实现,就是CyclicBarrier。该怎么理解这个东西呢?我们可以用一个田径比赛来做类比。

    假设我们有一个跑步的田径比赛。每个线程就相当于是一个参赛选手。这些参赛选手每个人都占据一个跑道,他们只能沿着自己的跑道往前跑,不能窜到别人的跑道上去。在比赛开始后,如果我们想得到所有比赛选手的成绩,很显然,我们需要比赛结束了。怎么才能让比赛结束了呢?最起码是要保证跑在最后的一个选手也到达了终点,只有这个时候我们才能得到所有选手的成绩。这里,CyclicBarrier就好像是那个记分员,每到达一个选手,他会记录一下成绩。但是只有在最后一个选手到达后,他才能把所有的成绩送去做总的统计。

    ok,有了前面这部分分析,我们可以发现。CyclicBarrier就相当于这么一个阻断机制,在前面达到这个点的线程会等在那里,一直到最后一个线程到达后,他才会在这个点让那些线程继续做自己的事情。好像在这个点的时候,所有的线程又站在了同一个起跑线。

 

示例

    现在,根据前面的讨论。我们来举一个实际的例子。假设我们有一个矩阵M*N的矩阵。那里放了若干了数字。假定给定一个数字,我们要统计里面所有等于这个数字的元素个数。那么,除了传统的那个顺序走过每一个元素的办法以外,我们还可以考虑到一个多线程的办法。

    既然线程是M*N的,我们可以将线程按照行分成若干块。比如说有5块,那么从0到M/5 - 1这一行这部分我们分配一个线程来统计。M/5到2*M/5 - 1这部分分配第二个线程来统计,依次类推。我们可以分配5个线程,每个统计其中的一部分。

    统计完了之后呢?我们需要保存结果。既然不希望结果产生冲突。我们可以将结果写到一个数组里。比如说第0行的统计结果就写到数组的索引0,第1行的统计就写到索引1。等所有的线程都跑完之后,我们再启动一个线程来将结果统计出来。

    现在我们就一步步按照这边讨论的来,首先我们来定义一个矩阵,并通过随机的方式生成里面的数字:

import java.util.Random;

public class MatrixMock {
	private int[][] data;
	
	public MatrixMock(int size, int length, int number) {
		int counter = 0;
		data = new int[size][length];
		Random random = new Random();
		for(int i = 0; i < size; i++) {
			for(int j = 0; j < length; j++) {
				data[i][j] = random.nextInt(10);
				if(data[i][j] == number) {
					counter++;
				}
			}
		}
		System.out.printf("Mock: There are %d occurrences of number %d " +
				"in generated data.\n", counter, number);
	}
	
	public int[] getRow(int row) {
		if(row >= 0 && row < data.length) {
			return data[row];
		} else {
			return null;
		}
	}
}

    这里,我们在MatrixMock的构造函数里随机生成0到10之间的数字(不包括10)填充到矩阵中。

    然后,我们再定义一个存放统计结果的类:

public class Results {
	private int[] data;
	
	public Results(int size) {
		data = new int[size];
	}
	
	public void setData(int position, int value) {
		data[position] = value;
	}
	
	public int[] getData() {
		return data;
	}
}

    每个执行的线程通过调用setData可以将自己统计的结果写到数组里面。

    现在,我们再定义一个执行的线程。它根据我们提供的参数作为构造函数来扫描指定的行范围进行统计:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Searcher implements Runnable {
	private int firstRow;
	private int lastRow;
	private MatrixMock mock;
	private Results results;
	private int number;
	private final CyclicBarrier barrier;
	
	public Searcher(int firstRow, int lastRow, MatrixMock mock, 
			Results results, int number, CyclicBarrier barrier) {
		this.firstRow = firstRow;
		this.lastRow = lastRow;
		this.mock = mock;
		this.results = results;
		this.number = number;
		this.barrier = barrier;
	}
	
	@Override
	public void run() {
		int counter;
		System.out.printf("%s: Processing lines from %d to %d.\n",
				Thread.currentThread().getName(), firstRow, lastRow);
		for(int i = firstRow; i < lastRow; i++) {
			int[] row = mock.getRow(i);
			counter = 0;
			for(int j = 0; j < row.length; j++) {
				if(row[j] == number) {
					counter++;
				}
			}
			results.setData(i, counter);
		}
		System.out.printf("%s: Lines processed.\n",
				Thread.currentThread().getName());
		
		try {
			barrier.await();
		} catch(InterruptedException e) {
			e.printStackTrace();
		} catch(BrokenBarrierException e) {
			e.printStackTrace();
		}
	}
}

    Searcher的代码看起来比较长,实际上并不复杂。在run方法中,它根据构造函数指定的行起始和终止范围去扫描矩阵。得到和期望结果相等的元素则将统计数字加一。再将对应的这一行的统计结果写到results中。

    这里有一个比较重要的地方就是执行完了我们前面提的这几个步骤之后,这里调用了一个barrier.await()。这个await()方法的作用就是让调用这个方法的线程进入等待状态,直到所有线程都调用了这个方法。所以,这个问题最核心的地方就在这里,有了barrier.await()这个收费站,只能等到所有线程到齐才能交够保护费了:)

    前面这些处理完之后,我们就该来统计结果了。我们可以通过启动一个线程的方式来做。这个统计线程的代码就相对很简单:

public class Grouper implements Runnable {
	private Results results;
	
	public Grouper(Results results) {
		this.results = results;
	}
	
	@Override
	public void run() {
		int finalResult = 0;
		System.out.printf("Grouper: Processing results...\n");
		int[] data = results.getData();
		for(int number : data) {
			finalResult += number;
		}
		System.out.printf("Grouper: Total result: %d.\n", finalResult);
	}
}

    就是一个遍历所有结果数组,将结果相加。

    现在,所有的东西都已经具备了,就差把他们都整合起来的东风:

import java.util.concurrent.CyclicBarrier;

public class Main {

	public static void main(String[] args) {
		final int ROWS = 10000;
		final int NUMBERS = 1000;
		final int SEARCH = 5;
		final int PARTICIPANTS = 5;
		final int LINE_PARTICIPANT = 2000;
		
		MatrixMock mock = new MatrixMock(ROWS, NUMBERS, SEARCH);
		Results results = new Results(ROWS);
		Grouper grouper = new Grouper(results);
		
		CyclicBarrier barrier = new CyclicBarrier(PARTICIPANTS, grouper);
		
		Searcher[] searchers = new Searcher[PARTICIPANTS];
		for(int i = 0; i < PARTICIPANTS; i++) {
			searchers[i] = new Searcher(i * LINE_PARTICIPANT, (i * LINE_PARTICIPANT) + LINE_PARTICIPANT,
					mock, results, 5, barrier);
			Thread thread = new Thread(searchers[i]);
			thread.start();
		}
		System.out.printf("Main: The main thread has finished.\n");
	}
}

     这里定义了一个10000 × 1000的矩阵,并将它划分成5个区域。在for循环的地方启动了5个线程。每个在一个指定的区域开始工作。有一个需要特别注意的地方是创建CyclicBarrier对象的构造函数:CyclicBarrier barrier = new CyclicBarrier(PARTICIPANTS, grouper);

    我们将统计线程作为一个参数传入它的构造函数。在所有等待线程都跑到终点之后,这个grouper线程就会被启动起来。

    如果运行这个程序,我们会发现有类似如下的显示结果:

Mock: There are 999916 occurrences of number 5 in generated data.
Thread-0: Processing lines from 0 to 2000.
Thread-4: Processing lines from 8000 to 10000.
Main: The main thread has finished.
Thread-3: Processing lines from 6000 to 8000.
Thread-2: Processing lines from 4000 to 6000.
Thread-1: Processing lines from 2000 to 4000.
Thread-4: Lines processed.
Thread-3: Lines processed.
Thread-0: Lines processed.
Thread-2: Lines processed.
Thread-1: Lines processed.
Grouper: Processing results...
Grouper: Total result: 999916.

 

总结

    CyclicBarrier是一个比较有意思的线程阻断机制。它可以让指定的一组线程都停在某个点上。对于一些线程并行执行和结果统计的方式,它是一个可选项。当然,后面结合一些高级的手法,还有更好的方式。    

参考材料

Java 7 concurrency cookbook

1
3
分享到:
评论

相关推荐

    CyclicBarrier的用法

    在实际应用中,`CyclicBarrier`常用于分布式计算、多步骤处理等场景,例如在多节点并行计算后需要聚合结果,或者在进行多轮迭代时确保所有工作都在下一轮开始前完成。 总结来说,`CyclicBarrier`是Java并发库中一种...

    Java中的CountDownLatch与CyclicBarrier:深入理解与应用实践

    本文将详细介绍CountDownLatch和CyclicBarrier的工作原理、使用场景以及如何在实际项目中应用它们。 CountDownLatch和CyclicBarrier是Java并发编程中两个非常有用的同步工具,它们在不同的场景下有着各自的优势。...

    CountDownLatch和CyclicBarrier用法实例大全

    在实际应用中,CountDownLatch常用于启动多线程后等待所有线程完成后再进行下一步,而CyclicBarrier则适用于多线程协同工作,每个阶段完成后所有线程再次聚合,比如赛车游戏的起点等待、分布式计算中的数据分区处理...

    Java并发系列之CyclicBarrier源码分析

    CyclicBarrier可以应用于各种需要多个线程相互等待的场景,例如: * 等待所有人到齐后开始吃饭 * 等待所有人到齐后开始旅游 * 等待所有运动员到齐后开始比赛 CyclicBarrier是一个非常有用的同步工具类,能够帮助...

    JAVA CyclicBarrier类详解.docx

    《JAVA CyclicBarrier类详解》 ...在实际应用中,CyclicBarrier可以广泛应用于多线程计算、分布式计算等领域,如矩阵运算、图形渲染等,通过合理利用它可以有效地提高多线程程序的性能和协同效率。

    Java中的CyclicBarrier类最全讲义

    4. **线程安全**:`CyclicBarrier`本身是线程安全的,但在实际应用中,还需要注意线程间共享数据的一致性和安全性。 总之,`CyclicBarrier`是一个非常强大的并发工具,能够有效地帮助实现多线程间的同步和协作。...

    java并发编程中CountDownLatch和CyclicBarrier的使用借鉴.pdf

    CyclicBarrier的应用场景: CyclicBarrier是另一个非常有用的线程控制工具,它可以使多个线程等待彼此达到某一目标后进行自己的下一步工作。在上面的例子中,CyclicBarrier被用来模拟四个游戏玩家玩游戏的场景,...

    Java并发编程(CyclicBarrier)实例详解

    在实际应用中,我们可以使用CyclicBarrier来控制多个线程之间的协作,例如,在游戏中,我们可以使用CyclicBarrier来控制四个玩家之间的协作,每个玩家需要到达某个关卡时,其他玩家也需要到达该关卡时,所有玩家才能...

    详解java CountDownLatch和CyclicBarrier在内部实现和场景上的区别

    在Java并发编程中,CountDownLatch和CyclicBarrier都是用于协调多线程间同步的重要工具,它们可以帮助开发者在特定条件满足时启动或者结束线程的执行。本文将详细探讨这两个类的内部实现机制以及它们在实际应用场景...

    28 人齐了,一起行动—CyclicBarrier详解.pdf

    CyclicBarrier 是 Java 并发库中的一种同步工具类...在设计多线程应用时,根据具体需求选择合适的同步工具,如 CyclicBarrier、CountDownLatch 或 Semaphores,可以有效地避免竞态条件和死锁,提高系统的并行处理能力。

    java并发编程中CountDownLatch和CyclicBarrier的使用.pdf

    在Java并发编程中,CountDownLatch和CyclicBarrier是两种非常重要的同步工具,用于协调多个线程之间的交互。它们都属于java.util.concurrent包下的类,为多线程编程提供了强大的支持。 **CountDownLatch** 是一个...

    CountDownLatch 和 CyclicBarrier 的运用(含AQS详解)

    ### CountDownLatch 和 CyclicBarrier 的运用(含AQS详解) #### CountDownLatch **定义与特点:** CountDownLatch 是 Java 并发包中的一个重要组件,它主要用于解决“一个或多个线程等待其他线程完成任务”的问题。...

    Java中CyclicBarrier的用法分析

    `CyclicBarrier`在多线程编程中有着广泛的应用,例如在分布式计算中,可以用来确保所有参与者完成初始化步骤后才开始执行下一步。或者在游戏开发中,可能需要所有玩家加载完资源后再开始游戏。总之,`CyclicBarrier`...

    java线程并发cyclicbarrier类使用示例

    Java中的`CyclicBarrier`是用于多线程同步的一个强大工具,主要应用于一组线程需要在继续执行之前等待其他线程到达某个特定点的情况。它允许开发者定义一个屏障点,只有当所有参与的线程都到达这个屏障点时,它们才...

    Java并发编程相关技术使用案例

    1、本资源包含并发编程基础知识的使用案例,包括:线程创建、Synchronized和Reentrantlock锁的使用、线程安全问题演示、Condition的应用、CountDownLatch的应用、Cyclicbarrier的应用、Semaphore的应用、线程池的...

    CountDownLatch、CyclicBarrier、Semaphore.md

    java 高并发应用场景

    Java并发编程之栅栏(CyclicBarrier)实例介绍

    Java并发编程中的栅栏(CyclicBarrier)是一个同步辅助类,它允许一组线程等待彼此到达某个特定点,然后一起继续执行。这个特定点被称为屏障点。与闭锁(CountDownLatch)不同,闭锁通常是一次性的,而CyclicBarrier...

    Android应用程序中多线程应用的方法研究.pdf

    Android提供了java.util.concurrent包下的并发工具类,如Semaphore(信号量)、CountDownLatch(倒计数器)、CyclicBarrier(回环栅栏)等,这些工具可以帮助开发者实现复杂的线程同步和协作。 总结,Android应用...

    分布式Java应用.基础与实践

    线程池、锁机制、并发容器(如ConcurrentHashMap)以及并发工具类(如CountDownLatch、CyclicBarrier)的应用都是这一领域的重点。 六、NoSQL与大数据存储 在分布式环境中,传统的关系型数据库可能无法满足高并发和...

Global site tag (gtag.js) - Google Analytics