`
cloudeagle_bupt
  • 浏览: 579094 次
文章分类
社区版块
存档分类
最新评论

Hama共享内存通信问题

 
阅读更多

1. Java的共享内存通信


总体性能未见提高,且由于避免了sync()中的通信,使得数据传输时间较短,导致如下问题:

16/09/12 18:16:27 INFO graph.GraphJobRunner: Start process msg: 1473675387360
16/09/12 18:16:27 INFO message.MesssagShareManager: messageManager memoryRead finish: 1473675387864

可见当读完数据时,循环已经开始读取并计算。因此反而会影响真正的计算过程,虽然一轮超步中的sync时间缩短了,但是looping计算的时间却增加了,所以总体性能未见提高。杯具!

附上解决代码:

package org.apache.hama.bsp.message;

import io.netty.util.internal.ConcurrentSet;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.FileChannel.MapMode;
import java.util.Iterator;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hama.graph.GraphJobMessage;
import org.apache.hama.util.WritableUtils;

public class MessageShareHandler<M extends Writable> implements Callable<Boolean>{
	public MappedByteBuffer mbb;
	public static String  MEMORY_SHARE_FILE = "/opt/hama-0.7.1/share/" ;
	RandomAccessFile raf; // 共享内存对应文件
	public static int bufferSize = 30 * 1024 * 1024;  //内存
	ConcurrentSet<GraphJobMessage> localMessages = new ConcurrentSet<GraphJobMessage>() ;  //消息容器,消息进来时已经经过combine,因此不会重复Id
	
	protected static final Log LOG = LogFactory
		      .getLog(MessageShareHandler.class);
	
	public MessageShareHandler(String shareFile, Configuration conf) {
		try {
			raf = new RandomAccessFile(shareFile , "rw");
		} catch (IOException e) {
			e.printStackTrace();
		} 
	}
	
	public void addLocalMessage(M msg) {
		localMessages.add((GraphJobMessage)msg) ;
	}
	
	@Override
	public Boolean call() throws Exception {
		try {
			FileChannel fc = raf.getChannel();
			FileLock flock = fc.tryLock();
			
			if(flock==null) {
				Thread.sleep(10) ;
				LOG.info("MessageShareHandler sleep 10 ms") ;
			} else {
				mbb = fc.map(MapMode.READ_WRITE, 0, bufferSize);  //因为写之前不知道需要映射多大共享内存,暂定30M
				mbb.position(4) ; //预留一个int长度(4个字节)作为文件长度
				int totalLength  = 4 ; //最大2G
				
				//模拟发送,最好像v0.6.4版本中,为每个任务的数据设置一个目录,所有发送到该任务的共享内存数据全部映射到这里!
			    Iterator<GraphJobMessage> it = localMessages.iterator();
		        while (it.hasNext()) {
		          GraphJobMessage e = it.next();
		          it.remove();
		          LOG.info("Msg : " + e.toString() + " is shared! ") ;
		          byte[] message = WritableUtils.serialize(e) ;
		          int msgLen = message.length ;
		          mbb.putInt(msgLen) ;                   //这样快还是写入流读取,比如写到流里, DataOutput output output.toByteArray()
		          mbb.put(message);
//		          System.out.println("Position : " + mbb.position());
		          totalLength = totalLength + msgLen +4 ;
		        }
		        mbb.putInt(0,mbb.position()) ; //补写长度
		        flock.release() ;
				LOG.info("IsLoaded: " + mbb.isLoaded() + " Length: "+ totalLength +" Position: " + mbb.position());
			}
			close() ;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return true;
	}

  /**
   * 每轮同步结束时需要清空下	
   */
	public void close() {
//		try {
			mbb.clear() ;
//			raf.close() ;
			localMessages.clear() ;
//		} catch (IOException e) {
//			e.printStackTrace();
//		}
	}
}


package org.apache.hama.bsp.message;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.FileChannel.MapMode;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hama.Constants;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.graph.GraphJobMessage;
import org.apache.hama.util.ReflectionUtils;
import org.apache.hama.util.WritableUtils;

public class MessageShareReader<M extends Writable> implements Callable<Boolean> {
	public MappedByteBuffer mbb;
	RandomAccessFile raf; // 共享内存对应文件
//	ConcurrentSet<GraphJobMessage> localMessages = new ConcurrentSet<GraphJobMessage>() ;  //接收共享目录下的各任务消息
	private Combiner<Writable> combiner ; 
	MessageManager mManager ; //消息接收 

	protected static final Log LOG = LogFactory
		      .getLog(MessageShareHandler.class);
	
	public MessageShareReader(String shareFile,  MessageManager messageManager ,Configuration conf) {
		try {
//		    final String combinerName = "org.apache.hama.examples.PageRank$PagerankCombiner"; 
		    final String combinerName = conf.get(Constants.COMBINER_CLASS);
		    if (combinerName != null) {
				combiner = (Combiner<Writable>) ReflectionUtils
			            .newInstance(combinerName);
		    }
		    
			mManager = messageManager ;
			raf = new RandomAccessFile(shareFile, "rw");
			FileChannel fc = raf.getChannel();
			mbb = fc.map(MapMode.READ_ONLY, 0, fc.size()); // 映射的共享内存
			mbb.load() ; // 预加载进内存
		} catch (IOException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		} catch (Exception e) {
			e.printStackTrace();
		} 
	}
	
	public void close() {
		try {
			mbb.clear() ; //清空mbb
			raf.close() ;
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	@Override
	public Boolean call() throws Exception {
		try {
			FileChannel fc = raf.getChannel();
			FileLock flock = fc.tryLock();
			while(flock==null) {           //轮询等待读取消息
				Thread.sleep(10) ;
			} 
			LOG.info("IsLoaded: " + mbb.isLoaded() +" position:"+ mbb.position());
			int fileLength = mbb.getInt() ;
//			buffer = new byte[fileLength] ;
//			mbb.get(buffer) ; //本地消息缓存, 是否一次性读出?
			
			BSPMessageBundle<GraphJobMessage> bundle = new BSPMessageBundle<GraphJobMessage>() ;
			while(mbb.position() < fileLength ) {
				int msgLength = mbb.getInt() ;
				if(msgLength>0) {
					byte[] message = new byte[msgLength] ;
					mbb.get(message) ;
					GraphJobMessage gjm = new GraphJobMessage() ;
					WritableUtils.deserialize(message, gjm) ;
					bundle.addMessage(gjm); //先不考虑同目录下消息合并
				}
			}
			fileLength = 0 ;
			mbb.putInt(0, fileLength) ; //清空文件标志位
			flock.release() ;
			mManager.loopBackBundle(bundle) ;
			close() ;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return true;
	}
}

package org.apache.hama.bsp.message;

public class MesssagShareManager implements Runnable {
	AbstractMessageManager messageManager ;
	RpcSyncClient rpcc ;
	public static boolean ifShare = false;
	
	public MesssagShareManager(AbstractMessageManager aMessageManager, RpcSyncClient rpcc) {
		messageManager = aMessageManager ;
		this.rpcc = rpcc; 
	}
	
	@Override
	public void run() {
		messageManager.transferLocalMessages();
		
		if(ifShare==false) {
			try {
				rpcc.enterBarrier() ;
				messageManager.startMemoryShare() ;
				rpcc.leaveBarrier() ;
				messageManager.startMemoryRead() ;  //首轮超步需同步一次以避免丢失数据
				ifShare = true ;
			} catch (Exception e) {
				e.printStackTrace();
			}
		} else {
			messageManager.startMemoryShare() ;
			messageManager.startMemoryRead() ;
		}
	}
}








2. 解决Hama中出现的java.lang.NullPointerException
at org.apache.hama.util.UnsafeByteArrayInputStream.<init>(UnsafeByteArrayInputStream.java:63)
at org.apache.hama.util.WritableUtils.unsafeDeserialize(WritableUtils.java:63)
at org.apache.hama.graph.MapVerticesInfo.get(MapVerticesInfo.java:101)
at org.apache.hama.graph.GraphJobRunner$ComputeRunnable.<init>(GraphJobRunner.java:376)


在增加顶点打印或者消息打印后,有时候这个问题反而没有了,猜测可能是由于其容器ConcurrenHashMap的并发读取问题,
所以换成另外一种捕获异常的方法,在捕获异常时打印对应的顶点ID或者消息ID.


3.当ERROR精度设定极小时(如10-6),值精度每次计算略有不同(小数点10位以后),不影响结果的正确性。


4. 启动新线程的代价往往是极小的,如:

	    new Thread(new LocalMessageTransfer(((AbstractMessageManager)messenger),getSuperstepCount())).start() ; 
	    LOG.info("Start LocalMessageTransfer last: " + (System.currentTimeMillis() - start) + " ms") ;


16/09/12 22:05:27 INFO bsp.BSPPeerImpl: Start LocalMessageTransfer last: 0 ms


5. 关于IncomingVertexMessageManager中的消息容器

这里使用 ConcurrentHashMap 作为消息容器,没有加锁,但是由于 ConcurrentHashMap特点,只在同一个ID互斥操作时加锁,确实不会出现冲突。即使部分外部代码可能导致的没有加锁的乱序操作,也是无碍,是因为该操作满足结合律,谁先加后加,没有区别,不会出现顺序颠倒导致结果不对的情况,和银行账户操作不同。

分享到:
评论

相关推荐

    hama-core-0.7.1.zip

    例如,通过Refined库,Hama Core可能对输入数据或计算结果定义了特定的类型约束,以确保数据的正确性,并减少由于类型不匹配导致的潜在问题。 【标签】"开源项目" 指出Hama Core是一个开放源代码的项目,这意味着它...

    Hama-0.6.0

    在实际应用中,Hama-0.6.0常用于解决图算法问题,如PageRank、单源最短路径和社区检测等。这些算法在社交网络分析、推荐系统、网络流量分析等领域有着广泛的应用。Hama的BSP实现使得这些算法能够在分布式环境中快速...

    Hama 安装笔记

    分布式模式的Hama安装笔记,内容如下: 1.参考“hadoop安装.txt”,完成hadoop的安装。节点信息如下: 192.168.1.160 hadoop-1 192.168.1.161 hadoop-2 192.168.1.162 hadoop-3 2.添加环境变量 在/etc/profile...

    汉密顿焦虑量表(HAMA)(Hamilton Anxiety Scale.doc

    ### 汉密顿焦虑量表(HAMA)详解 #### 一、概述 汉密顿焦虑量表(Hamilton Anxiety Scale,简称HAMA)是由M. Hamilton于1959年编制而成的精神医学评估工具,旨在量化评价个体的焦虑水平。作为一种广泛应用于临床实践...

    汉密顿焦虑量表(HAMA).doc

    5. 记忆或注意障碍:检查患者是否出现注意力难以集中和记忆力下降的问题。 6. 抑郁心境:考察患者是否伴有抑郁情绪,如失去兴趣和早醒等。 7. 肌肉系统症状:关注患者是否有肌肉不适或不灵活等。 8. 感觉系统症状:...

    基于Hama并行计算框架的多层级作业调度算法的研究及实现

    基于Hama并行计算框架的多层级作业调度算法的研究及实现 胡月胜

    2021年HAMA焦虑量表.docx

    HAMA量表由一系列问题构成,这些问题覆盖了两个主要因子:躯体性焦虑和精神性焦虑。躯体性焦虑因子包含了肌肉紧张、感觉不适、心跳加速、呼吸困难、胃肠道问题、尿频尿急等躯体性症状。而精神性焦虑因子则涉及焦虑...

    HAMA抑郁量表.pdf

    HAMA抑郁量表.pdf

    基于Hama并行计算框架的多层级作业调度算法的研究及实现.pdf

    【基于Hama并行计算框架的多层级作业调度算法研究及实现】 Hama是一个基于Bulk Synchronous Parallel (BSP)模型的分布式并行计算框架,主要用于大规模科学计算。Hama弥补了Hadoop平台的局限性,特别是在图计算领域...

    HAMA焦虑量表.doc

    汉密尔顿焦虑量表(HAMA)是一种用于评估焦虑症状严重程度的心理测量工具,广泛应用于临床心理学和精神科领域。该量表由14个条目组成,每个条目针对一个特定的焦虑症状,通过评分来判断患者的情况。下面我们将详细...

    Hama图计算模型 Pi计算编译文件

    Hama图计算模型 Pi计算编译文件

    综合护理干预对无肝素血液透析患者HAMA凝血程度及不良反应率的影响分析

    2. HAMA焦虑量表:HAMA(Hamilton Anxiety Rating Scale)是评估焦虑程度的临床量表,通过一系列问题或观察,来确定患者焦虑症状的严重程度。在本研究中,使用HAMA量表来评估护理干预前后患者焦虑水平的变化。 3. ...

    汉密尔顿焦虑量表HAMA项打印版.pdf

    "汉密尔顿焦虑量表HAMA项打印版.pdf" 汉密尔顿焦虑量表(HAMA)是一种常用的评估工具,用于评估个体的焦虑水平。该量表由十四个项目组成,涵盖了焦虑的多个方面,包括情绪、认知、躯体性症状、生殖泌尿神经系统症状...

    汉密尔顿焦虑量表HAMA(14项打印版)-2页.pdf

    汉密尔顿焦虑量表HAMA(Hamilton Anxiety Rating Scale) 汉密尔顿焦虑量表HAMA是一种常用的评估工具,用于评估焦虑症状的严重性和变化。该量表由 Max Hamilton 在1959年开发,包含14个项目,评定员通过对被评定者...

    通过顶点分类在基于BSP的图形处理中实现零通信延迟

    批量同步并行(BSP)模型将绘图算法分为多个超步,在分布式图形处理系统中已变得... 通过在内部群集上运行一组图算法,我们的评估表明,我们的系统可以最大程度地消除通信延迟的情况下,可以达到Hama的平均2倍加速。

    论文研究-基于Hadoop分布式计算平台的磁流体动力学模型仿真研究.pdf

    将Spark和Hama两种分布式并行计算模型整合到Hadoop生态系统中,分别用于支持内存计算和整体同步并行计算。将Hadoop集群构建成master-slave对等结构,解决全局同步和局部同步问题。最后,在Hadoop集群上,利用有限...

    hamabeads:Hama Beads 编辑器

    哈马珠,又称Perler Beads或Hama Beads,是一种流行的创意手工活动材料,尤其是对于儿童和手工爱好者。它们是一系列彩色的小珠子,通过在铁板上排列出特定图案,然后用熨斗热熔成形,可以创造出各种精美的像素艺术...

Global site tag (gtag.js) - Google Analytics