1. Java的共享内存通信
总体性能未见提高,且由于避免了sync()中的通信,使得数据传输时间较短,导致如下问题:
16/09/12 18:16:27 INFO graph.GraphJobRunner: Start process msg: 1473675387360
16/09/12 18:16:27 INFO message.MesssagShareManager: messageManager memoryRead finish: 1473675387864
可见当读完数据时,循环已经开始读取并计算。因此反而会影响真正的计算过程,虽然一轮超步中的sync时间缩短了,但是looping计算的时间却增加了,所以总体性能未见提高。杯具!
附上解决代码:
package org.apache.hama.bsp.message;
import io.netty.util.internal.ConcurrentSet;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.FileChannel.MapMode;
import java.util.Iterator;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hama.graph.GraphJobMessage;
import org.apache.hama.util.WritableUtils;
public class MessageShareHandler<M extends Writable> implements Callable<Boolean>{
public MappedByteBuffer mbb;
public static String MEMORY_SHARE_FILE = "/opt/hama-0.7.1/share/" ;
RandomAccessFile raf; // 共享内存对应文件
public static int bufferSize = 30 * 1024 * 1024; //内存
ConcurrentSet<GraphJobMessage> localMessages = new ConcurrentSet<GraphJobMessage>() ; //消息容器,消息进来时已经经过combine,因此不会重复Id
protected static final Log LOG = LogFactory
.getLog(MessageShareHandler.class);
public MessageShareHandler(String shareFile, Configuration conf) {
try {
raf = new RandomAccessFile(shareFile , "rw");
} catch (IOException e) {
e.printStackTrace();
}
}
public void addLocalMessage(M msg) {
localMessages.add((GraphJobMessage)msg) ;
}
@Override
public Boolean call() throws Exception {
try {
FileChannel fc = raf.getChannel();
FileLock flock = fc.tryLock();
if(flock==null) {
Thread.sleep(10) ;
LOG.info("MessageShareHandler sleep 10 ms") ;
} else {
mbb = fc.map(MapMode.READ_WRITE, 0, bufferSize); //因为写之前不知道需要映射多大共享内存,暂定30M
mbb.position(4) ; //预留一个int长度(4个字节)作为文件长度
int totalLength = 4 ; //最大2G
//模拟发送,最好像v0.6.4版本中,为每个任务的数据设置一个目录,所有发送到该任务的共享内存数据全部映射到这里!
Iterator<GraphJobMessage> it = localMessages.iterator();
while (it.hasNext()) {
GraphJobMessage e = it.next();
it.remove();
LOG.info("Msg : " + e.toString() + " is shared! ") ;
byte[] message = WritableUtils.serialize(e) ;
int msgLen = message.length ;
mbb.putInt(msgLen) ; //这样快还是写入流读取,比如写到流里, DataOutput output output.toByteArray()
mbb.put(message);
// System.out.println("Position : " + mbb.position());
totalLength = totalLength + msgLen +4 ;
}
mbb.putInt(0,mbb.position()) ; //补写长度
flock.release() ;
LOG.info("IsLoaded: " + mbb.isLoaded() + " Length: "+ totalLength +" Position: " + mbb.position());
}
close() ;
} catch (Exception e) {
e.printStackTrace();
}
return true;
}
/**
* 每轮同步结束时需要清空下
*/
public void close() {
// try {
mbb.clear() ;
// raf.close() ;
localMessages.clear() ;
// } catch (IOException e) {
// e.printStackTrace();
// }
}
}
package org.apache.hama.bsp.message;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.FileChannel.MapMode;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hama.Constants;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.graph.GraphJobMessage;
import org.apache.hama.util.ReflectionUtils;
import org.apache.hama.util.WritableUtils;
public class MessageShareReader<M extends Writable> implements Callable<Boolean> {
public MappedByteBuffer mbb;
RandomAccessFile raf; // 共享内存对应文件
// ConcurrentSet<GraphJobMessage> localMessages = new ConcurrentSet<GraphJobMessage>() ; //接收共享目录下的各任务消息
private Combiner<Writable> combiner ;
MessageManager mManager ; //消息接收
protected static final Log LOG = LogFactory
.getLog(MessageShareHandler.class);
public MessageShareReader(String shareFile, MessageManager messageManager ,Configuration conf) {
try {
// final String combinerName = "org.apache.hama.examples.PageRank$PagerankCombiner";
final String combinerName = conf.get(Constants.COMBINER_CLASS);
if (combinerName != null) {
combiner = (Combiner<Writable>) ReflectionUtils
.newInstance(combinerName);
}
mManager = messageManager ;
raf = new RandomAccessFile(shareFile, "rw");
FileChannel fc = raf.getChannel();
mbb = fc.map(MapMode.READ_ONLY, 0, fc.size()); // 映射的共享内存
mbb.load() ; // 预加载进内存
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
public void close() {
try {
mbb.clear() ; //清空mbb
raf.close() ;
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public Boolean call() throws Exception {
try {
FileChannel fc = raf.getChannel();
FileLock flock = fc.tryLock();
while(flock==null) { //轮询等待读取消息
Thread.sleep(10) ;
}
LOG.info("IsLoaded: " + mbb.isLoaded() +" position:"+ mbb.position());
int fileLength = mbb.getInt() ;
// buffer = new byte[fileLength] ;
// mbb.get(buffer) ; //本地消息缓存, 是否一次性读出?
BSPMessageBundle<GraphJobMessage> bundle = new BSPMessageBundle<GraphJobMessage>() ;
while(mbb.position() < fileLength ) {
int msgLength = mbb.getInt() ;
if(msgLength>0) {
byte[] message = new byte[msgLength] ;
mbb.get(message) ;
GraphJobMessage gjm = new GraphJobMessage() ;
WritableUtils.deserialize(message, gjm) ;
bundle.addMessage(gjm); //先不考虑同目录下消息合并
}
}
fileLength = 0 ;
mbb.putInt(0, fileLength) ; //清空文件标志位
flock.release() ;
mManager.loopBackBundle(bundle) ;
close() ;
} catch (Exception e) {
e.printStackTrace();
}
return true;
}
}
package org.apache.hama.bsp.message;
public class MesssagShareManager implements Runnable {
AbstractMessageManager messageManager ;
RpcSyncClient rpcc ;
public static boolean ifShare = false;
public MesssagShareManager(AbstractMessageManager aMessageManager, RpcSyncClient rpcc) {
messageManager = aMessageManager ;
this.rpcc = rpcc;
}
@Override
public void run() {
messageManager.transferLocalMessages();
if(ifShare==false) {
try {
rpcc.enterBarrier() ;
messageManager.startMemoryShare() ;
rpcc.leaveBarrier() ;
messageManager.startMemoryRead() ; //首轮超步需同步一次以避免丢失数据
ifShare = true ;
} catch (Exception e) {
e.printStackTrace();
}
} else {
messageManager.startMemoryShare() ;
messageManager.startMemoryRead() ;
}
}
}
2. 解决Hama中出现的java.lang.NullPointerException
at org.apache.hama.util.UnsafeByteArrayInputStream.<init>(UnsafeByteArrayInputStream.java:63)
at org.apache.hama.util.WritableUtils.unsafeDeserialize(WritableUtils.java:63)
at org.apache.hama.graph.MapVerticesInfo.get(MapVerticesInfo.java:101)
at org.apache.hama.graph.GraphJobRunner$ComputeRunnable.<init>(GraphJobRunner.java:376)
在增加顶点打印或者消息打印后,有时候这个问题反而没有了,猜测可能是由于其容器ConcurrenHashMap的并发读取问题,
所以换成另外一种捕获异常的方法,在捕获异常时打印对应的顶点ID或者消息ID.
3.当ERROR精度设定极小时(如10-6),值精度每次计算略有不同(小数点10位以后),不影响结果的正确性。
4. 启动新线程的代价往往是极小的,如:
new Thread(new LocalMessageTransfer(((AbstractMessageManager)messenger),getSuperstepCount())).start() ;
LOG.info("Start LocalMessageTransfer last: " + (System.currentTimeMillis() - start) + " ms") ;
16/09/12 22:05:27 INFO bsp.BSPPeerImpl: Start LocalMessageTransfer last: 0 ms
5. 关于IncomingVertexMessageManager中的消息容器
这里使用 ConcurrentHashMap 作为消息容器,没有加锁,但是由于 ConcurrentHashMap特点,只在同一个ID互斥操作时加锁,确实不会出现冲突。即使部分外部代码可能导致的没有加锁的乱序操作,也是无碍,是因为该操作满足结合律,谁先加后加,没有区别,不会出现顺序颠倒导致结果不对的情况,和银行账户操作不同。
分享到:
相关推荐
例如,通过Refined库,Hama Core可能对输入数据或计算结果定义了特定的类型约束,以确保数据的正确性,并减少由于类型不匹配导致的潜在问题。 【标签】"开源项目" 指出Hama Core是一个开放源代码的项目,这意味着它...
在实际应用中,Hama-0.6.0常用于解决图算法问题,如PageRank、单源最短路径和社区检测等。这些算法在社交网络分析、推荐系统、网络流量分析等领域有着广泛的应用。Hama的BSP实现使得这些算法能够在分布式环境中快速...
分布式模式的Hama安装笔记,内容如下: 1.参考“hadoop安装.txt”,完成hadoop的安装。节点信息如下: 192.168.1.160 hadoop-1 192.168.1.161 hadoop-2 192.168.1.162 hadoop-3 2.添加环境变量 在/etc/profile...
### 汉密顿焦虑量表(HAMA)详解 #### 一、概述 汉密顿焦虑量表(Hamilton Anxiety Scale,简称HAMA)是由M. Hamilton于1959年编制而成的精神医学评估工具,旨在量化评价个体的焦虑水平。作为一种广泛应用于临床实践...
5. 记忆或注意障碍:检查患者是否出现注意力难以集中和记忆力下降的问题。 6. 抑郁心境:考察患者是否伴有抑郁情绪,如失去兴趣和早醒等。 7. 肌肉系统症状:关注患者是否有肌肉不适或不灵活等。 8. 感觉系统症状:...
基于Hama并行计算框架的多层级作业调度算法的研究及实现 胡月胜
HAMA量表由一系列问题构成,这些问题覆盖了两个主要因子:躯体性焦虑和精神性焦虑。躯体性焦虑因子包含了肌肉紧张、感觉不适、心跳加速、呼吸困难、胃肠道问题、尿频尿急等躯体性症状。而精神性焦虑因子则涉及焦虑...
HAMA抑郁量表.pdf
【基于Hama并行计算框架的多层级作业调度算法研究及实现】 Hama是一个基于Bulk Synchronous Parallel (BSP)模型的分布式并行计算框架,主要用于大规模科学计算。Hama弥补了Hadoop平台的局限性,特别是在图计算领域...
汉密尔顿焦虑量表(HAMA)是一种用于评估焦虑症状严重程度的心理测量工具,广泛应用于临床心理学和精神科领域。该量表由14个条目组成,每个条目针对一个特定的焦虑症状,通过评分来判断患者的情况。下面我们将详细...
Hama图计算模型 Pi计算编译文件
2. HAMA焦虑量表:HAMA(Hamilton Anxiety Rating Scale)是评估焦虑程度的临床量表,通过一系列问题或观察,来确定患者焦虑症状的严重程度。在本研究中,使用HAMA量表来评估护理干预前后患者焦虑水平的变化。 3. ...
"汉密尔顿焦虑量表HAMA项打印版.pdf" 汉密尔顿焦虑量表(HAMA)是一种常用的评估工具,用于评估个体的焦虑水平。该量表由十四个项目组成,涵盖了焦虑的多个方面,包括情绪、认知、躯体性症状、生殖泌尿神经系统症状...
汉密尔顿焦虑量表HAMA(Hamilton Anxiety Rating Scale) 汉密尔顿焦虑量表HAMA是一种常用的评估工具,用于评估焦虑症状的严重性和变化。该量表由 Max Hamilton 在1959年开发,包含14个项目,评定员通过对被评定者...
批量同步并行(BSP)模型将绘图算法分为多个超步,在分布式图形处理系统中已变得... 通过在内部群集上运行一组图算法,我们的评估表明,我们的系统可以最大程度地消除通信延迟的情况下,可以达到Hama的平均2倍加速。
将Spark和Hama两种分布式并行计算模型整合到Hadoop生态系统中,分别用于支持内存计算和整体同步并行计算。将Hadoop集群构建成master-slave对等结构,解决全局同步和局部同步问题。最后,在Hadoop集群上,利用有限...
哈马珠,又称Perler Beads或Hama Beads,是一种流行的创意手工活动材料,尤其是对于儿童和手工爱好者。它们是一系列彩色的小珠子,通过在铁板上排列出特定图案,然后用熨斗热熔成形,可以创造出各种精美的像素艺术...