http://wiki.apache.org/hama/BSPModel
Overview
In Apache Hama, you can implement your own BSP method by extending from org.apache.hama.bsp.BSP class. Apache Hama provides in this class a user-defined function bsp() that can be used to write your own BSP program.
The bsp() function handles whole parallel part of the program. (So it just gets called once, not all over again)
There are also setup() and cleanup() which will be called at the beginning of your computation, respectively at the end of the computation.
cleanup() is guranteed to run after the computation or in case of failure. (In 0.4.0 it is actually not, we expect this to be fixed in 0.5.0).
You can simply override the functions you need from BSP class.
Basically, a BSP program consists of a sequence of supersteps. Each superstep consists of the three phases:
- Local computation
- Process communication
- Barrier synchronization
NOTE that these phases should be always sequential order.
In Apache Hama, the communication between tasks (or peers) is done within the barrier synchronization.
BSP Function
The "bsp()" function is a user-defined function that handles the whole parallel part of the program. It only takes one argument "BSPPeer", which contains an communication, counters, and IO interfaces.
Input and Output
General Information
Since Hama 0.4.0 we provide a input and output system for BSP Jobs.
We choose the key/value model from Hadoop, since we want to provide a conherent API to widely used products like Hadoop MapReduce (SequenceFiles) and HBase (Column-storage).
Input
Configuring Input
When setting up a BSPJob, you can provide a InputFormat and a Path where to find the input.
BSPJob job = new BSPJob(); // detail stuff omitted job.setInputPath(new Path("/tmp/test.seq"); job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class);
Another way to add input paths is following:
SequenceFileInputFormat.addInputPath(job, new Path("/tmp/test.seq"));
You can also add multiple paths by using this method:
SequenceFileInputFormat.addInputPaths(job, "/tmp/test.seq,/tmp/test2.seq,/tmp/test3.seq");
Note that these paths must be separated by a comma.
In case of a SequenceFileInputFormat the key and value pair are parsed from the header.
When you use want to read a basic textfile with TextInputFormat the key is always LongWritable which contains how much bytes have been read and Text which contains a line of your input.
Using Input
You can now read the input from each of the functions in BSP class which has BSPPeer as parameter. (e.G. setup / bsp / cleanup)
In this case we read a normal text file:
@Override public final void bsp( BSPPeer<LongWritable, Text, KEYOUT, VALUEOUT, MESSAGE_TYPE> peer) throws IOException, InterruptedException, SyncException { // this method reads the next key value record from file KeyValuePair<LongWritable, Text> pair = peer.readNext(); // the following lines do the same: LongWritable key = new LongWritable(); Text value = new Text(); peer.readNext(key, value); }
Consult the docs for more detail on events like end of file.
There is also a function which allows you to re-read the input from the beginning.
This snippet reads the input five times:
for(int i = 0; i < 5; i++){ LongWritable key = new LongWritable(); Text value = new Text(); while (peer.readNext(key, value)) { // read everything } // reopens the input peer.reopenInput() }
You must not consume the whole input to reopen it.
Custom Inputformat
You can implement your own inputformat. It is similar to Hadoop MapReduce's input formats, so you can use existing literature to get into it.
Output
Configuring Output
Like the input, you can configure the output while setting up your BSPJob.
job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class); job.setOutputFormat(TextOutputFormat.class); FileOutputFormat.setOutputPath(job, TMP_OUTPUT);
As you can see there are 3 major sections.
The first section is about setting the classes for output key and output value.
The second section is about setting the format of your output. In this case this is TextOutputFormat, it outputs key separated by tabstops ('\t') from the value. Each record (key+value) is separated by a newline ('\n').
The third and last section is about setting the path where your output should go. You can use the static method in your choosen Outputformat as well as the convenience method in BSPJob:
job.setOutputPath(new Path("/tmp/out"));
If you don't provide output, no output folder or collector will be allocated.
Using Output
From your BSP, you can output like this:
@Override public void bsp( BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer) throws IOException, SyncException, InterruptedException { peer.write(new Text("Estimated value of PI is"), new DoubleWritable(3.14)); }
Note that you can always output, even from Setup or Cleanup methods!
Custom Outputformat
You can implement your own outputformat. It is similar to Hadoop MapReduce's output formats, so you can use existing literature to get into it.
Communication Model
Within the bsp() function, you can use the powerful communication functions for many purposes using BSPPeer. We tried to follow the standard library of BSP world as much as possible. The following table describes all the functions you can use:
Function |
Description |
send(String peerName, BSPMessage msg) |
Send a message to another peer. |
getCurrentMessage() |
Get a received message from the queue. |
getNumCurrentMessages() |
Get the number of messages currently in the queue. |
sync() |
Starts the barrier synchronization. |
getPeerName() |
Get the peer name of this task. |
getPeerName(int index) |
Gets the n-th peer name. |
getNumPeers() |
Get the number of peers. |
getAllPeerNames() |
Get all peer names (including "this" task). (Hint: These are always sorted in ascending order) |
The send() and all the other functions are very flexible. Here is an example that sends a message to all peers:
@Override public void bsp( BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, LongMessage> peer) throws IOException, SyncException, InterruptedException { for (String peerName : peer.getAllPeerNames()) { peer.send(peerName, new LongMessage("Hello from " + peer.getPeerName(), System.currentTimeMillis())); } peer.sync(); }
Synchronization
When all the processes have entered the barrier via the sync() function, the Hama proceeds to the next superstep. In the previous example, the BSP job will be finished by one synchronization after sending a message "Hello from ..." to all peers.
But, keep in mind that the sync() function is not the end of the BSP job. As was previously mentioned, all the communication functions are very flexible. For example, the sync() function also can be called in a for loop so that you can use to program the iterative methods sequentially:
@Override public void bsp( BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, Writable> peer) throws IOException, SyncException, InterruptedException { for (int i = 0; i < 100; i++) { // send some messages peer.sync(); } }
The BSP job will be finished only when all processes have no more local and outgoing queues entries and all processes done or is killed by the user.
Counters
Just like in Hadoop MapReduce you can use Counters.
Counters are basically enums that you can only increment. You can use them to track meaningful metrics in your code, e.G. how often a loop has been executed.
From your BSP code you can use counters like this:
// enum definition enum LoopCounter{ LOOPS } @Override public void bsp( BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer) throws IOException, SyncException, InterruptedException { for (int i = 0; i < iterations; i++) { // details ommitted peer.getCounter(LoopCounter.LOOPS).increment(1L); } // rest ommitted }
Setup and Cleanup
Since 0.4.0 you can use Setup and Cleanup methods in your BSP code. They can be inherited from BSP class like this:
public class MyEstimator extends BSP<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> { @Override public void setup( BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer) throws IOException { //Setup: Choose one as a master this.masterTask = peer.getPeerName(peer.getNumPeers() / 2); } @Override public void cleanup( BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer) throws IOException { // your cleanup here } @Override public void bsp( BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer) throws IOException, SyncException, InterruptedException { // your computation here } }
Setup is called before bsp method, and cleanup is executed at the end after bsp. You can do everything in setup and cleanup: sync, send, increment counters, write output or even read from the input.
Combiners
Combiners are used for performing message aggregation to reduce communication overhead in cases when messages can be summarized arithmetically e.g., min, max, sum, and average at the sender side. Suppose that you want to send the integer messages to a specific processor from 0 to 1000 and sum all received the integer messages from all processors.
public void bsp(BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, IntegerMessage> peer) throws IOException, SyncException, InterruptedException { for (int i = 0; i < 1000; i++) { peer.send(masterTask, new IntegerMessage(peer.getPeerName(), i)); } peer.sync(); if (peer.getPeerName().equals(masterTask)) { IntegerMessage received; while ((received = peer.getCurrentMessage()) != null) { sum += received.getData(); } } }
If you follow the previous example, Each bsp processor will send a bundle of thousand Integer messages to a masterTask. Instead, you could use a Combiners in your BSP program to perform a sum Integer messages and to write more concise and maintainable as below, that is why you use Combiners.
public static class SumCombiner extends Combiner { @Override public BSPMessageBundle combine(Iterable<BSPMessage> messages) { BSPMessageBundle bundle = new BSPMessageBundle(); int sum = 0; Iterator<BSPMessage> it = messages.iterator(); while (it.hasNext()) { sum += ((IntegerMessage) it.next()).getData(); } bundle.addMessage(new IntegerMessage("Sum", sum)); return bundle; } }
Implementation notes
Internal implementation details
BSPJobClient
- Create the splits for the job
- writeNewSplits()
- job.set("bsp.job.split.file", submitSplitFile.toString());
- Sets the number of peers to split.lenth
- Receives splitFile
-
Add split argument to TaskInProgress constructor
Task
- Gets his split from Groom
- Initializes everything in BSPPeerImpl
相关推荐
2. **BSP Job API**:Hama提供了一套API来编写BSP应用,用户可以通过这些接口定义计算任务,包括初始化、计算和同步等步骤。 3. **图计算**:Hama尤其擅长处理图数据,支持诸如PageRank、单源最短路径等算法,是...
一旦环境准备就绪,用户可以通过Hama提供的API编写BSP程序,或者利用已有的工具如Pregel+进行图计算。在源码层面,用户可以查看和修改Hama的源代码,以优化性能或添加新的功能。 总之,Hama-0.6.0是一个强大的工具...
分布式模式的Hama安装笔记,内容如下: 1.参考“hadoop安装.txt”,完成hadoop的安装。节点信息如下: 192.168.1.160 hadoop-1 192.168.1.161 hadoop-2 192.168.1.162 hadoop-3 2.添加环境变量 在/etc/profile...
### 汉密顿焦虑量表(HAMA)详解 #### 一、概述 汉密顿焦虑量表(Hamilton Anxiety Scale,简称HAMA)是由M. Hamilton于1959年编制而成的精神医学评估工具,旨在量化评价个体的焦虑水平。作为一种广泛应用于临床实践...
汉密顿焦虑量表(HAMA)是精神科领域广泛应用的一种心理评估工具,由Max Hamilton于1959年制定。该量表主要用于评估患者焦虑症状的严重程度,它包含14个项目,每个项目采用0至4分的评分法,对应不同的症状严重程度。...
基于Hama并行计算框架的多层级作业调度算法的研究及实现 胡月胜
BSP ( Bu lk Synch ronou s Paral2 lel) 是一种独立于体系结构且具有可预测性的并行 模型, 它既是算法理论分析模型, 也是一种并行程序 设计的规范 . 由于BSP 模型对并行程序设计没有过 多的限制, 程序员在编程方法 ...
Hama是一个基于Bulk Synchronous Parallel (BSP)模型的分布式并行计算框架,主要用于大规模科学计算。Hama弥补了Hadoop平台的局限性,特别是在图计算领域展现出明显优势。尽管Hama的应用实践相对较短,仍有很大的...
汉密尔顿焦虑量表(HAMA)是一种用于评估焦虑症状严重程度的心理测量工具,广泛应用于临床心理学和精神科领域。该量表由14个条目组成,每个条目针对一个特定的焦虑症状,通过评分来判断患者的情况。下面我们将详细...
汉密尔顿焦虑量表(HAMA)自推出以来,在心理评估领域发挥了不可替代的作用,它是一种精准的心理测量工具,用于评估个体的焦虑症状严重程度,尤其在临床环境中对患者的心理健康状况进行量化。HAMA作为医学心理学和...
HAMA抑郁量表.pdf
Hama图计算模型 Pi计算编译文件
"汉密尔顿焦虑量表HAMA项打印版.pdf" 汉密尔顿焦虑量表(HAMA)是一种常用的评估工具,用于评估个体的焦虑水平。该量表由十四个项目组成,涵盖了焦虑的多个方面,包括情绪、认知、躯体性症状、生殖泌尿神经系统症状...
2. HAMA焦虑量表:HAMA(Hamilton Anxiety Rating Scale)是评估焦虑程度的临床量表,通过一系列问题或观察,来确定患者焦虑症状的严重程度。在本研究中,使用HAMA量表来评估护理干预前后患者焦虑水平的变化。 3. ...
哈马珠,又称Perler Beads或Hama Beads,是一种流行的创意手工活动材料,尤其是对于儿童和手工爱好者。它们是一系列彩色的小珠子,通过在铁板上排列出特定图案,然后用熨斗热熔成形,可以创造出各种精美的像素艺术...
尽管本文提供的信息有限,无法得知BC-BSP系统具体实现的细节和更多性能评估,但通过对比实验可以看出,BC-BSP平台在性能上已经超越了其他类似系统如Hama和Giraph,这表明其在大数据并行处理领域的领先地位。...
汉密尔顿焦虑量表HAMA(Hamilton Anxiety Rating Scale) 汉密尔顿焦虑量表HAMA是一种常用的评估工具,用于评估焦虑症状的严重性和变化。该量表由 Max Hamilton 在1959年开发,包含14个项目,评定员通过对被评定者...