写入
package shareMemory;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.channels.FileLock;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.graph.GraphJobMessage;
import org.apache.hama.util.ReflectionUtils;
import org.apache.hama.util.WritableUtils;
/**
* 模拟将同节点内部消息写在本地共享内存中以加速通信
*
* @author qiangliu
*
*/
public class ShareMemory {
public MappedByteBuffer mbb;
public static int bufferSize = 50 * 1024 * 1024; //内存
RandomAccessFile raf; // 共享内存对应文件
// public static String shareFile = "/opt/hama-0.7.1/shareMemory/shareMemory.txt";
public static String shareFile = "F:\\test\\shareMemory\\sharememory.txt";
ConcurrentHashMap<IntWritable, VertexMessage> vertexMessages = new ConcurrentHashMap<IntWritable, VertexMessage>(); //消息容器
private Combiner<Writable> combiner;
ShareMemory() {
try {
raf = new RandomAccessFile(shareFile, "rw");
final String combinerName = "org.apache.hama.examples.PageRank$PagerankCombiner";
// Class com = Class.forName(combinerName) ;
combiner = (Combiner<Writable>) ReflectionUtils
.newInstance(combinerName);
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
/**
* mock 10000 Pagerank消息,消息在写入共享内存时,发送消息时先保存在链表中,写入时用WritableUtils序列化后按字节写入
*/
public void generateMessages() {
//method 1 : 使用WritableUtils,后面建议改成在最后写入时再序列化,前面直接combine! 但是会存在当到同一个id的值很多且不能combine时就不好处理了!
for(int j = 1; j<3 ; j++) { //模拟3个super-step
for(int i = 0 ;i<1000; i++) {
DoubleWritable value = new DoubleWritable(i/j) ;
VertexMessage vMessage = new VertexMessage(new IntWritable(i), value) ;
if (combiner != null && vertexMessages.get(vMessage.getVertexId())!=null ) {
DoubleWritable combined = (DoubleWritable) combiner.combine(getIterableMessages(value)) ;
vMessage.setVertexVertexValue(combined) ; //更改合并后的值
vertexMessages.put(vMessage.getVertexId(), vMessage);
}
vertexMessages.put(vMessage.getVertexId(), vMessage) ;
}
}
}
public void close() {
try {
mbb.clear() ;
raf.close() ;
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ShareMemory shm = new ShareMemory() ;
long startTime = System.currentTimeMillis() ;
shm.generateMessages() ;
System.out.println("Generate Messages last :" + (System.currentTimeMillis() - startTime));
startTime = System.currentTimeMillis() ;
ExecutorService pool = Executors.newCachedThreadPool() ;
CompletionService<Boolean> exchangeResult = new ExecutorCompletionService<Boolean>(pool);
int destSize = 0 ;
destSize++ ;
exchangeResult.submit(shm.new MesssageShareSender()) ;
int count = 0 ;
while(count < destSize) {
Future<Boolean> f = exchangeResult.poll();
if(f == null) continue ;
count++;
}
System.out.println("Send Messages last :" + (System.currentTimeMillis() - startTime));
pool.shutdown() ;
shm.close() ;
}
public static Iterable<Writable> getIterableMessages(final Writable vertexValue) {
return new Iterable<Writable>() {
Writable value ;
@Override
public Iterator<Writable> iterator() {
value = vertexValue ;
return new Iterator<Writable>() {
int index = 1 ;
@Override
public boolean hasNext() {
return (index == 1) ? true : false;
}
@Override
public Writable next() {
index--;
return vertexValue;
}
@Override
public void remove() {
}
};
}
};
}
/**
* 同节点消息产生后,多线程发送给共享内存.两个问题: 1.为什么要用GraphJobMessage? 2. 直接写入快还是用流快?
* @author Administrator
*/
class MesssageShareSender implements Callable<Boolean>{
@Override
public Boolean call() throws Exception {
try {
FileChannel fc = raf.getChannel();
FileLock flock = fc.tryLock();
if(flock==null) {
Thread.sleep(10) ;
} else {
mbb = fc.map(MapMode.READ_WRITE, 0, ShareMemory.bufferSize); //因为写之前不知道需要映射多大共享内存,暂定50M
mbb.position(4) ; //预留一个int长度(4个字节)作为文件长度
int totalLength = 4 ; //最大2G
//模拟发送,最好像v0.6.4版本中,为每个任务的数据设置一个目录,所有发送到该任务的共享内存数据全部映射到这里!
Iterator<Entry<IntWritable, VertexMessage>> it = vertexMessages.entrySet()
.iterator();
while (it.hasNext()) {
Entry<IntWritable, VertexMessage> e = it.next();
it.remove();
byte[] serialized = WritableUtils.serialize(e.getValue().getVertexVertexValue()) ;
GraphJobMessage gjm = new GraphJobMessage(e.getValue().getVertexId(), serialized) ;
gjm.setVertexId(e.getKey());
gjm.setFlag(GraphJobMessage.VERTEX_FLAG);
byte[] message = WritableUtils.serialize(gjm) ;
int msgLen = message.length ;
mbb.putInt(msgLen) ; //这样快还是写入流读取,比如写到流里, DataOutput output output.toByteArray()
// System.out.println("Position : " + mbb.position());
mbb.put(message);
// System.out.println("Position : " + mbb.position());
totalLength = totalLength + msgLen +4 ;
}
mbb.putInt(0,mbb.position()) ; //补写长度
// System.out.println(" IsLoaded: " + mbb.isLoaded() + " Length: "+ totalLength +" Position: " + mbb.position());
}
} catch (Exception e) {
e.printStackTrace();
}
return true;
}
}
}
读取
package shareMemory;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.FileChannel.MapMode;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hama.graph.GraphJobMessage;
import org.apache.hama.graph.GraphJobRunner;
import org.apache.hama.util.WritableUtils;
public class ReadingProcess {
byte[] buffer ; //缓存
public MappedByteBuffer mbb;
RandomAccessFile raf; // 共享内存对应文件
ConcurrentHashMap<IntWritable, VertexMessage> vertexMessages = new ConcurrentHashMap<IntWritable, VertexMessage>(); // 消息容器
public ReadingProcess() {
try {
raf = new RandomAccessFile(ShareMemory.shareFile, "rw");
FileChannel fc = raf.getChannel();
mbb = fc.map(MapMode.READ_ONLY, 0, fc.size()); // 映射的共享内存
mbb.load() ; // 预加载进内存
GraphJobRunner.VERTEX_ID_CLASS = IntWritable.class ;
GraphJobRunner.VERTEX_VALUE_CLASS = DoubleWritable.class ;
} catch (IOException e) {
e.printStackTrace();
}
}
public void close() {
try {
mbb.clear() ; //清空mbb
raf.close() ;
} catch (IOException e) {
e.printStackTrace();
}
}
public void readData() {
try {
FileChannel fc = raf.getChannel();
FileLock flock = fc.tryLock();
while(flock==null) { //轮询等待读取消息
Thread.sleep(10) ;
}
System.out.println("IsLoaded: " + mbb.isLoaded() +" position:"+ mbb.position());
int fileLength = mbb.getInt() ;
// buffer = new byte[fileLength] ;
// mbb.get(buffer) ; //本地消息缓存, 是否一次性读出?
while(mbb.position() < fileLength ) {
int msgLength = mbb.getInt() ;
if(msgLength>0) {
// System.out.println("Position : " + mbb.position());
byte[] message = new byte[msgLength] ;
mbb.get(message) ;
// System.out.println("Position : " + mbb.position());
GraphJobMessage gjm = new GraphJobMessage() ;
WritableUtils.deserialize(message, gjm) ;
if (!vertexMessages.containsKey(gjm.getVertexId())) {
DoubleWritable vertexValue = new DoubleWritable() ;
WritableUtils.deserialize(gjm.getValuesBytes(), vertexValue) ;
IntWritable vertexId = (IntWritable) gjm.getVertexId() ;
vertexMessages.put(vertexId, new VertexMessage(vertexId, vertexValue)) ;
} else {
System.out.println("Combine Error!");
}
}
}
// System.out.println("test ");
close() ;
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ReadingProcess rp = new ReadingProcess();
rp.readData() ;
}
}
分享到:
相关推荐
标题“IPC.rar_IPC_java i_java ipc_java共享内存_共享内存”以及描述“IPC共享内存,文件映射编程,实现原理详解”都指向了一个核心主题:Java中的进程间通信(IPC)以及如何利用共享内存进行数据交换。在这个话题中...
在Java中,可以通过`java.nio.MappedByteBuffer`类来实现共享内存功能,这被称为内存映射文件(Memory-Mapped File,MMF)。 `MappedByteBuffer`是NIO中的一种特殊缓冲区,它将文件的一部分映射到内存中,使得文件...
在Windows系统中,内存映射文件可以是独立于物理文件的共享内存区域,通过命名来实现不同进程之间的数据共享。而在Java NIO中,MappedByteBuffer总是与特定的物理文件相关联,这意味着创建MappedByteBuffer时必须...
Java不直接支持共享内存,但可以通过JNI调用POSIX或System V的共享内存接口。 6. **内存映射(Memory Mapping)**: 内存映射文件使得进程可以共享同一文件的不同部分,Java的`java.nio.MappedByteBuffer`类提供了...
MappedByteBuffer out = new RandomAccessFile("test.dat", "rw").getChannel() .map(FileChannel.MapMode.READ_WRITE, 0, length); // 写入数据 for (int i = 0; i ; i++) { out.put((byte) 'x'); } ...
以下是一个简单的示例,展示了如何使用`MappedByteBuffer`创建共享内存: ```java RandomAccessFile raFile = new RandomAccessFile(filename, "rw"); FileChannel fc = raFile.getChannel(); int size = (int) fc....
最后,如果多个进程共享同一个内存文件系统,需要考虑并发控制和数据一致性问题。 总结来说,Java中的内存文件系统是一个高效、灵活的数据管理工具,适合需要高速读写和动态文件操作的场景。通过熟练掌握NIO相关API...
在Java中,内存映射文件是通过`java.nio.MappedByteBuffer`类来实现的,它允许程序将文件的一部分或全部映射到虚拟内存空间,从而使得多个进程可以直接共享数据,而无需传统的读写文件操作。 **内存映射文件的工作...
在Java中,线程可以通过共享内存区域来交换信息。如果两个或多个线程访问同一变量,就需要同步机制,如`synchronized`关键字或`java.util.concurrent`包中的工具类,以防止数据竞争。在JavaFX中,如果UI线程(也...
共享内存允许进程直接访问同一块内存区域,Java本身不提供直接支持,但可以通过JNI与操作系统交互来实现。 6. **消息队列**: 消息队列允许进程发送和接收消息,Java没有内置的消息队列支持,但可以通过JNI或者...
mmfinvoker通过内存映射文件的方式实现了共享内存的IPC,这种方式不需要额外的数据结构,只需操作内存即可,减少了数据复制和上下文切换的开销。 **请求-响应模式** 请求-响应模式是一种客户端-服务器架构中的典型...
多线程是指在一个应用程序中同时运行多个线程,每个线程都有自己的执行路径,它们可以并发执行,共享同一内存空间。在下载场景中,多线程技术可以理解为同时开启多个下载通道,每个通道代表一个线程,通过这种方式,...
在Java中,可以使用java.nio.MappedByteBuffer类来创建内存映射文件,提高读写效率。 2. "07_soa.rar" - 这个文件的扩展名是RAR,通常用于压缩和归档文件。"07_soa"可能代表第7部分的面向服务架构(SOA)相关材料,SOA...
使用`java.nio.MappedByteBuffer`,可以直接将文件映射到内存中,这样可以减少磁盘I/O,提高性能。但需要注意,映射的文件大小不应超过可用的物理内存,否则可能导致`OutOfMemoryError`。 在Android 6.0(API级别23...
`mmap`(Memory Mapped File)是Java的`MappedByteBuffer`的基础,它允许文件内容直接映射到内存中,用户空间和内核空间可以共享同一块内存。这样,文件到网络传输时,数据只需从内核缓冲区直接复制到Socket缓冲区...
Java中线程通信相对简单,因为线程之间共享进程内的内存空间。Java提供了volatile关键字、等待/通知机制、join方法、InheritableThreadLocal、MappedByteBuffer等多种线程通信机制。 #### Java并发机制 在Java中,...
例如,可以使用MappedByteBuffer实现文件的内存映射,从而提高文件读写的速度。 5. **多路复用器(Multiplexers)**:Java的Selector实现了一个多路复用器,它能够监视多个通道的事件,当某个通道准备就绪时,选择...
通过MappedByteBuffer,可以直接将文件映射到内存,提高文件读写的效率。 6. **网络I/O**:NIO为TCP和UDP提供了SocketChannel和DatagramChannel,用于处理网络连接。SocketChannel用于TCP连接,而DatagramChannel则...
14. **内存映射文件**:MappedByteBuffer类允许将文件直接映射到内存,提供高效的文件操作方式。 IBM-ETP的Java培训课程可能通过实例演示和练习来深化这些概念的理解,帮助学员掌握在实际项目中运用Java IO解决各种...