`

[Hadoop] 分布式Join : Replicated Join

 
阅读更多

上一篇文章说的ReduceSide Join的一个缺点就是,在map方法之中,只对数据加了tag、提取了groupkey,没有做任何的数据过滤,这样在map-reduce之中的shuffle过程会造成大量的 磁盘IO使得效率降低。

 

这次使用的是Replicated Join,完成的任务跟上次一样.

它有一个前提:需要关联在一起的两个文件,其中一个文件比较小,至少能放到内存之中。

 

其中一个关键的地方就是,在运行job之前,先将本地的小文件(简称为smallIn)上传到Hadoop集群的每一个服务器之中。在每个集群之中,大文件的split跟smallIn进行jion的操作。这样效率会比较高。

上传的代码:

 

Path smallIn = new Path("...");
DistributedCache.addCacheFile(smallIn.toUri(), conf);

 除了这种方式,可以在命令行调用的时候自动上传:

 

bin/hadoop jar -files smallIn DataJoinDC.jar big_in.txt output

 

我们对比两个文件的原始大小:

u.user: 23KB

u.data: 1933KB

所以,显然我们应该选择u.user作为DistributedCache文件。

 

在Map类之中,如果使用新的API,则在setup之中进行,如果是old api 则在configure()方法之中进行分解动作。

获取Cache之中的文件的代码如下:

URI[] cachesFiles = DistributedCache.getCacheFiles(conf);

 这跟<Hadoop in Action> 上的代码不太一样,书上是调用 

Path[] cachesFiles = DistributedCache.getLocalCacheFiles(conf);

 一般这是在Local模式下进行的。

 

 

Note:

以上一部分是猜测!因为这个程序在我的环境Eclipse + Ubunut (Pseodu-Distributed) 模式没有成功,无法正确的获取到Cache文件!

 

最后,贴上我的代码做个纪念,暂时保留这个问题:

 

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Hashtable;
import java.util.Iterator;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Main {

	private static final String U_DATA_SEPARATOR = "\t";
	private static final String U_USER_SEPARATOR = "[|]";
	
	public static class Map extends Mapper<Text, Text, Text, Text> {

		private Hashtable<String, String> table = new Hashtable<String, String>();
		
		@Override
		protected void map(Text key, Text value,
				Mapper<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String dataInfo = value.toString();
			if(dataInfo.trim().length() == 0) return;
			if(table.containsKey(key.toString())) {
				String userInfo = table.get(key.toString());
				String[] userTokens = userInfo.split(U_USER_SEPARATOR);
				String userData = "age=" + userTokens[0];
				
				String[] dataTokens = dataInfo.split(U_DATA_SEPARATOR);
				String ratingData = "ratings=" + dataTokens[1];
				
				context.write(key, new Text(userData + "|" + ratingData));
			}
		}

		@Override
		protected void setup(Mapper<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			
			try {
				Configuration conf = context.getConfiguration();
//				Path[] cachesFiles = DistributedCache.getLocalCacheFiles(conf);
				URI[] cachesFiles = DistributedCache.getCacheFiles(conf);
				
				if(cachesFiles != null && cachesFiles.length > 0) {
					Iterator<String> it = FileUtils.lineIterator(new File(cachesFiles[0].toString()));
					while(it.hasNext()) {
						String line = it.next();
						System.out.println("~~~~~~~ line=" + line);
						if(line.trim().length() == 0) continue;
						String[] tokens = line.split(U_USER_SEPARATOR, 2);
						table.put(tokens[0], tokens[1]);
					}
				} else {
					System.out.println("!!!!!!!!!!!!!!!! empty cache files");
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
			
		}
		
		
		
	}
	
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "word count");
		job.setJobName("Replicated Join");
		job.setJarByClass(Main.class);
		job.setMapperClass(Map.class);
		job.setNumReduceTasks(0);
		job.setInputFormatClass(KeyValueTextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
//		job.setOutputKeyClass(Text.class);
//		job.setOutputValueClass(TextOutputFormat.class);

		FileSystem fs = FileSystem.get(conf);
		
//		Path localSmallIn = new Path("/home/hadoop/DataSet/movielens/u.user");
		Path hdfsSmallIn = new Path("/data/u.user");
//		fs.copyFromLocalFile(true, localSmallIn , hdfsSmallIn);
//		DistributedCache.addCacheFile(
//				hdfsSmallIn.toUri(), conf);
		
		Path bigIn = new Path("/home/hadoop/DataSet/movielens/u.data");
		Path out = new Path("/home/hadoop/DataSet/movielens-Replicated-output");
		
//		Path bigIn = new Path("/data/u.data");
//		Path out = new Path("/data/movielens-Replicated-output");
		
		if(fs.exists(out)) {
			System.out.println("输出目录已经存在,将其删除~");
			fs.delete(out, true);
		}
		FileInputFormat.setInputPaths(job, bigIn);
		FileOutputFormat.setOutputPath(job, out);
		conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", U_DATA_SEPARATOR);
//		job.set("key.value.separator.in.input.line", U_DATA_SEPARATOR);		// for u.data
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
		
	}
}

 

 

分享到:
评论

相关推荐

    Hadoop分布式系统:系统设计与架构

    ### Hadoop分布式系统:系统设计与架构 #### Hadoop简介 Hadoop是一个开源软件框架,用于分布式存储和处理大规模数据集。它最初由Apache Software Foundation开发,并已成为大数据处理领域的核心工具之一。Hadoop...

    深入解析Hadoop分布式存储:架构与实现

    Hadoop是一个开源的分布式计算平台,以其高可靠性、高扩展性和高效性处理大规模数据集而闻名。Hadoop的核心组件HDFS(Hadoop Distributed File System)是实现数据分布式存储的关键。本文将详细探讨Hadoop如何通过...

    Hadoop分布式文件系统:架构和设计.doc

    Hadoop分布式文件系统架构和设计 Hadoop分布式文件系统是Hadoop生态系统的核心组件之一,负责存储和管理大规模数据集。下面将对Hadoop分布式文件系统的架构和设计进行详细介绍。 一、前提和设计目标 Hadoop分布式...

    基于Hadoop分布式爬虫设计综述.docx

    3. Hadoop分布式爬虫设计:Hadoop分布式爬虫设计是基于Hadoop分布式文件系统HDFS及其分布式计算框架MapReduce的基础上开发的分布式搜索引擎的爬虫设计相关技术、原理、流程图。 二、分布式爬虫技术原理 1. ...

    DFS命令行工具操作Hadoop分布式集群初体验

    1. DFS命令行工具操作Hadoop分布式集群:DFS代表的是分布式文件系统,它是Hadoop用来存储大规模数据集的关键组件。DFS命令行工具是操作Hadoop集群文件系统的接口,允许用户通过命令行与HDFS进行交互,实现数据的上传...

    Hadoop分布式文件系统——翻译

    ### Hadoop分布式文件系统(HDFS):关键技术与实践 #### 摘要 Hadoop分布式文件系统(HDFS)是Hadoop项目的核心组件之一,旨在为大规模数据集提供高效可靠的存储解决方案。HDFS的设计原则强调了数据的分布式存储与...

    Hadoop分布式文件系统:结构与设计

    Hadoop 分布式文件系统 (HDFS)是一个设计为用在普通硬件设备上的分布式文件系统。它与现有的分布式文件系统有很多近似的地方,但又和这些文件系统有很明显的不同。HDFS是高容错的,设计为部署在廉价硬件上的。HDFS对...

    Hadoop分布式云盘系统

    基于SpringMVC+Spring+HBase+Maven搭建的Hadoop分布式云盘系统。使用Hadoop HDFS作为文件存储系统、HBase作为数据存储仓库,采用SpringMVC+Spring框架实现,包括用户注册与登录、我的网盘、关注用户、我的分享、我...

    hadoop 分布式缓存源码

    Hadoop分布式缓存是Hadoop生态系统中的一个重要组成部分,它允许应用程序在执行MapReduce任务时共享和重用数据,从而提高整体性能。这份源码提供了深入理解Hadoop如何管理和利用分布式缓存的机会,对于想要优化...

    hadoop分布式文件系统搭建

    ### hadoop分布式文件系统搭建 #### 一、配置hadoop分布式文件系统环境搭建 ##### 1. 准备 在开始搭建Hadoop分布式文件系统之前,首先需要确保环境准备妥当。具体步骤包括: - **检查端口占用情况**:通过`...

    高可用性的HDFS-Hadoop分布式文件系统深度实践.part1.rar

    《高可用性的HDFS——Hadoop分布式文件系统深度实践》专注于Hadoop分布式文件系统(hdfs)的主流ha解决方案,内容包括:hdfs元数据解析、hadoop元数据备份方案、hadoop backup node方案、avatarnode解决方案以及最新...

    Hadoop分布式集群配置指南

    Hadoop分布式集群配置指南 Hadoop分布式集群配置是大数据处理的关键步骤之一,本指南将指导读者成功配置一个由5台计算机构成的Hadoop集群,并成功运行wordcount处理大型数据(大于50G)。 一、Hadoop集群架构简介 ...

    Hadoop分布式搭建笔记

    Hadoop分布式搭建环境: 系统:centos 6.5 64位 软件:Hadoop 2.2.0 64位 jdk 1.7 64位 用户: hadoop 运行环境:虚拟机vm 10 64位

    大数据之hadoop分布式集群初次启动 (2).docx

    大数据之 Hadoop 分布式集群初次启动 本文将指导读者如何从头开始启动 Hadoop 分布式集群,并对相关的知识点进行详细的解释。 1. SSH 免密登录 在 Hadoop 分布式集群中,需要配置集群中各个节点间的 SSH 免密登录...

    Hadoop分布式配置文件hdfs-site.xml

    Hadoop分布式配置文件hdfs-site.xml,用于在搭建Hadoop分布式集群时,设置集群规划所用,集群中虚拟机都需要修改该配置文件,除此之外,还需要修改其他配置文件,包括core-site.xml、mapred-site.xml和yarn-site.xml...

    Hadoop分布式配置文件mapred-site.xml

    Hadoop分布式配置文件mapred-site.xml,用于在搭建Hadoop分布式集群时,设置集群规划所用,集群中虚拟机都需要修改该配置文件,除此之外,还需要修改其他配置文件,包括core-site.xml、hdfs-site.xml和yarn-site.xml...

    第四章(Hadoop大数据处理实战)Hadoop分布式文件系统.pdf

    第四章(Hadoop大数据处理实战)Hadoop分布式文件系统.pdf第四章(Hadoop大数据处理实战)Hadoop分布式文件系统.pdf第四章(Hadoop大数据处理实战)Hadoop分布式文件系统.pdf第四章(Hadoop大数据处理实战)Hadoop分布式文件...

    基于Hadoop分布式交通大数据存储分析平台设计.pdf

    本文将讨论如何基于Hadoop分布式存储与分析平台解决这一问题。 Hadoop是一个开源的分布式存储和处理大数据的框架,它能有效地存储和处理PB级别的数据。Hadoop的核心是HDFS(Hadoop Distributed File System),它...

    hadoop 分布式集群搭建

    Hadoop分布式集群搭建的知识点包括以下几个主要方面: 1. 环境准备与组件安装: - 首先,需要准备一个网络中各个节点之间能够通信的环境,确保集群中的每台计算机都能够通过SSH无密码登录,这对于集群中的各个服务...

Global site tag (gtag.js) - Google Analytics