`
yehao0716
  • 浏览: 22717 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

使用MapReduce对数据文件进行切分

阅读更多

 

有一个格式化的数据文件,用\t分割列,第2列为产品名称。现在需求把数据文件根据产品名切分为多个文件,使用MapReduce程序要如何实现?

原始文件:

[root@localhost opt]# cat aprData

1       a1      a111

2       a2      a211

3       a1      a112

4       a1      a112

5       a1      a112

6       a1      a112

7       a2      a112

8       a2      a112

9       a2      a112

10      a3      a113

 

思路:

1.用一个mapreduce程序找出所有产品名称:

1.1map<k2,v2>为<产品名称,null>

1.2reduce<k3,v3>为<产品名称,null>

   实现:AprProduces类

[root@localhost opt]# hadoop jar apr-produces.jar /aprData /aprProduce-output

Warning: $HADOOP_HOME is deprecated.

 

16/05/01 15:00:12 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.

16/05/01 15:00:12 INFO input.FileInputFormat: Total input paths to process : 1

16/05/01 15:00:12 INFO util.NativeCodeLoader: Loaded the native-hadoop library

16/05/01 15:00:12 WARN snappy.LoadSnappy: Snappy native library not loaded

16/05/01 15:00:13 INFO mapred.JobClient: Running job: job_201605010048_0020

16/05/01 15:00:14 INFO mapred.JobClient:  map 0% reduce 0%

16/05/01 15:00:33 INFO mapred.JobClient:  map 100% reduce 0%

16/05/01 15:00:45 INFO mapred.JobClient:  map 100% reduce 100%

16/05/01 15:00:50 INFO mapred.JobClient: Job complete: job_201605010048_0020

16/05/01 15:00:50 INFO mapred.JobClient: Counters: 29

16/05/01 15:00:50 INFO mapred.JobClient:   Map-Reduce Framework

16/05/01 15:00:50 INFO mapred.JobClient:     Spilled Records=20

16/05/01 15:00:50 INFO mapred.JobClient:     Map output materialized bytes=56

16/05/01 15:00:50 INFO mapred.JobClient:     Reduce input records=10

16/05/01 15:00:50 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=3868389376

16/05/01 15:00:50 INFO mapred.JobClient:     Map input records=10

16/05/01 15:00:50 INFO mapred.JobClient:     SPLIT_RAW_BYTES=89

16/05/01 15:00:50 INFO mapred.JobClient:     Map output bytes=30

16/05/01 15:00:50 INFO mapred.JobClient:     Reduce shuffle bytes=56

16/05/01 15:00:50 INFO mapred.JobClient:     Physical memory (bytes) snapshot=240697344

16/05/01 15:00:50 INFO mapred.JobClient:     Reduce input groups=3

16/05/01 15:00:50 INFO mapred.JobClient:     Combine output records=0

16/05/01 15:00:50 INFO mapred.JobClient:     Reduce output records=3

16/05/01 15:00:50 INFO mapred.JobClient:     Map output records=10

16/05/01 15:00:50 INFO mapred.JobClient:     Combine input records=0

16/05/01 15:00:50 INFO mapred.JobClient:     CPU time spent (ms)=1490

16/05/01 15:00:50 INFO mapred.JobClient:     Total committed heap usage (bytes)=177016832

16/05/01 15:00:50 INFO mapred.JobClient:   File Input Format Counters

16/05/01 15:00:50 INFO mapred.JobClient:     Bytes Read=101

16/05/01 15:00:50 INFO mapred.JobClient:   FileSystemCounters

16/05/01 15:00:50 INFO mapred.JobClient:     HDFS_BYTES_READ=190

16/05/01 15:00:50 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=43049

16/05/01 15:00:50 INFO mapred.JobClient:     FILE_BYTES_READ=56

16/05/01 15:00:50 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=9

16/05/01 15:00:50 INFO mapred.JobClient:   Job Counters

16/05/01 15:00:50 INFO mapred.JobClient:     Launched map tasks=1

16/05/01 15:00:50 INFO mapred.JobClient:     Launched reduce tasks=1

16/05/01 15:00:50 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=11002

16/05/01 15:00:50 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0

16/05/01 15:00:50 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=13561

16/05/01 15:00:50 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0

16/05/01 15:00:50 INFO mapred.JobClient:     Data-local map tasks=1

16/05/01 15:00:50 INFO mapred.JobClient:   File Output Format Counters

16/05/01 15:00:50 INFO mapred.JobClient:     Bytes Written=9

[root@localhost opt]# hadoop fs -cat /aprProduce-output/part-r-00000

Warning: $HADOOP_HOME is deprecated.

 

a1

a2

a3

 

   

2.再用一个mapreduce程序对文件进行切分:

2.1map<k2,v2>为<产品名称,line>

2.2reduce<k3,v3>为<line,null>

2.3自定义分区partition,读取第一个mapreduce程序的输出文件,组装成一个map<产品名称,index>,在partition中判断产品名称并返回下标,没有找到放在0下标中。

2.4设置taskNum(reduce的个数),taskNum应该和partition的个数一致.

3.5使用MultipleOutPuts类进行重命名输出文件,输出文件为 xxx-00001 等

实现:AprClassify类

 

[root@localhost opt]# hadoop jar apr-classify.jar /aprData /apr-output

Warning: $HADOOP_HOME is deprecated.

 

16/05/01 14:09:11 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.

16/05/01 14:09:11 INFO input.FileInputFormat: Total input paths to process : 1

16/05/01 14:09:11 INFO util.NativeCodeLoader: Loaded the native-hadoop library

16/05/01 14:09:11 WARN snappy.LoadSnappy: Snappy native library not loaded

16/05/01 14:09:11 INFO mapred.JobClient: Running job: job_201605010048_0017

16/05/01 14:09:13 INFO mapred.JobClient:  map 0% reduce 0%

16/05/01 14:09:29 INFO mapred.JobClient:  map 100% reduce 0%

16/05/01 14:09:41 INFO mapred.JobClient:  map 100% reduce 33%

16/05/01 14:09:44 INFO mapred.JobClient:  map 100% reduce 66%

16/05/01 14:09:56 INFO mapred.JobClient:  map 100% reduce 100%

16/05/01 14:10:01 INFO mapred.JobClient: Job complete: job_201605010048_0017

16/05/01 14:10:01 INFO mapred.JobClient: Counters: 29

16/05/01 14:10:01 INFO mapred.JobClient:   Map-Reduce Framework

16/05/01 14:10:01 INFO mapred.JobClient:     Spilled Records=20

16/05/01 14:10:01 INFO mapred.JobClient:     Map output materialized bytes=169

16/05/01 14:10:01 INFO mapred.JobClient:     Reduce input records=10

16/05/01 14:10:01 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=7754653696

16/05/01 14:10:01 INFO mapred.JobClient:     Map input records=10

16/05/01 14:10:01 INFO mapred.JobClient:     SPLIT_RAW_BYTES=89

16/05/01 14:10:01 INFO mapred.JobClient:     Map output bytes=131

16/05/01 14:10:01 INFO mapred.JobClient:     Reduce shuffle bytes=169

16/05/01 14:10:01 INFO mapred.JobClient:     Physical memory (bytes) snapshot=387825664

16/05/01 14:10:01 INFO mapred.JobClient:     Reduce input groups=3

16/05/01 14:10:01 INFO mapred.JobClient:     Combine output records=0

16/05/01 14:10:01 INFO mapred.JobClient:     Reduce output records=0

16/05/01 14:10:01 INFO mapred.JobClient:     Map output records=10

16/05/01 14:10:01 INFO mapred.JobClient:     Combine input records=0

16/05/01 14:10:01 INFO mapred.JobClient:     CPU time spent (ms)=3950

16/05/01 14:10:01 INFO mapred.JobClient:     Total committed heap usage (bytes)=209522688

16/05/01 14:10:01 INFO mapred.JobClient:   File Input Format Counters

16/05/01 14:10:01 INFO mapred.JobClient:     Bytes Read=101

16/05/01 14:10:01 INFO mapred.JobClient:   FileSystemCounters

16/05/01 14:10:01 INFO mapred.JobClient:     HDFS_BYTES_READ=199

16/05/01 14:10:01 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=86609

16/05/01 14:10:01 INFO mapred.JobClient:     FILE_BYTES_READ=169

16/05/01 14:10:01 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=104

16/05/01 14:10:01 INFO mapred.JobClient:   Job Counters

16/05/01 14:10:01 INFO mapred.JobClient:     Launched map tasks=1

16/05/01 14:10:01 INFO mapred.JobClient:     Launched reduce tasks=3

16/05/01 14:10:01 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=35295

16/05/01 14:10:01 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0

16/05/01 14:10:01 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=13681

16/05/01 14:10:01 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0

16/05/01 14:10:01 INFO mapred.JobClient:     Data-local map tasks=1

16/05/01 14:10:01 INFO mapred.JobClient:   File Output Format Counters

16/05/01 14:10:01 INFO mapred.JobClient:     Bytes Written=0

[root@localhost opt]# hadoop fs -ls /apr-output/

Warning: $HADOOP_HOME is deprecated.

 

Found 8 items

-rw-r--r--   1 root supergroup          0 2016-05-01 14:09 /apr-output/_SUCCESS

drwxr-xr-x   - root supergroup          0 2016-05-01 14:09 /apr-output/_logs

-rw-r--r--   1 root supergroup         51 2016-05-01 14:09 /apr-output/a1-r-00000

-rw-r--r--   1 root supergroup         41 2016-05-01 14:09 /apr-output/a2-r-00001

-rw-r--r--   1 root supergroup         12 2016-05-01 14:09 /apr-output/a3-r-00002

-rw-r--r--   1 root supergroup          0 2016-05-01 14:09 /apr-output/part-r-00000

-rw-r--r--   1 root supergroup          0 2016-05-01 14:09 /apr-output/part-r-00001

-rw-r--r--   1 root supergroup          0 2016-05-01 14:09 /apr-output/part-r-00002

[root@localhost opt]# hadoop fs -cat /apr-output/a1-r-00000

Warning: $HADOOP_HOME is deprecated.

 

1       a1      a111

3       a1      a112

4       a1      a112

5       a1      a112

6       a1      a112

 

[root@localhost opt]# hadoop fs -cat /apr-output/a2-r-00000

Warning: $HADOOP_HOME is deprecated.

 

cat: File does not exist: /apr-output/a2-r-00000

[root@localhost opt]# hadoop fs -cat /apr-output/a2-r-00001

Warning: $HADOOP_HOME is deprecated.

 

2       a2      a211

7       a2      a112

8       a2      a112

9       a2      a112

 

[root@localhost opt]# hadoop fs -cat /apr-output/a3-r-00002

Warning: $HADOOP_HOME is deprecated.

 

10      a3      a113

 

 

 

3.用hdfs对文件进行批量复制,重命名并转移产品数据文件到指定目录

实现:RenameApr类

 

[root@localhost opt]# hadoop fs -ls /aprProduces

Warning: $HADOOP_HOME is deprecated.

 

Found 3 items

-rw-r--r--   3 yehao supergroup         51 2016-05-01 14:37 /aprProduces/a1

-rw-r--r--   3 yehao supergroup         41 2016-05-01 14:37 /aprProduces/a2

-rw-r--r--   3 yehao supergroup         12 2016-05-01 14:37 /aprProduces/a3

[root@localhost opt]# hadoop fs -cat /aprProduces/a1

Warning: $HADOOP_HOME is deprecated.

 

1       a1      a111

3       a1      a112

4       a1      a112

5       a1      a112

6       a1      a112

 

[root@localhost opt]# hadoop fs -cat /aprProduces/a2

Warning: $HADOOP_HOME is deprecated.

 

2       a2      a211

7       a2      a112

8       a2      a112

9       a2      a112

 

[root@localhost opt]# hadoop fs -cat /aprProduces/a3

Warning: $HADOOP_HOME is deprecated.

 

10      a3      a113

 

代码部分:

1.com.huawei.AprClassify

 

package com;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;


public class AprClassify {
	private static int taskNum = HdfsUtils.getMapSize();
	
	public static void main(String[] args)  throws Exception {
		Job job = new Job(new Configuration(), AprClassify.class.getSimpleName());
		job.setJarByClass(AprClassify.class);
		
		job.setMapperClass(AprClassifyMap.class);
		job.setReducerClass(AprClassifyReducer.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		job.setPartitionerClass(AprClassifyPartitioner.class);
		job.setNumReduceTasks(taskNum+1);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		job.waitForCompletion(true);
	}
}


class AprClassifyReducer extends Reducer<Text, Text, Text, NullWritable>{
	private MultipleOutputs<Text, NullWritable> outputs; 
	
	protected void setup(Context context) throws IOException, InterruptedException {  
		outputs = new MultipleOutputs<Text, NullWritable>(context);  
	}
	
	@Override
	protected void reduce(Text k2, Iterable<Text> v2s,
			Reducer<Text, Text, Text, NullWritable>.Context context)
			throws IOException, InterruptedException {
		String st = "";
		for (Text text : v2s) {
			st += text.toString() +"\n";
		}

		Text k3 = new Text(st);
		outputs.write(k3, NullWritable.get(), k2.toString());
	}
	
	protected void cleanup(Context context) throws IOException,  
	    InterruptedException {  
		outputs.close();  
	}
}

class AprClassifyMap extends Mapper<LongWritable, Text, Text, Text>{
	Text k2 = new Text();
	
	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		
		String line = value.toString();
		String[] splited = line.split("\t");
		k2.set(splited[1]);
		System.out.println(context);
		System.out.println(k2);
		System.out.println(value);
		context.write(k2, value);
	}
}

class AprClassifyPartitioner extends Partitioner<Text, Text> {

	private static Map<String, Integer> map = HdfsUtils.getMap();
	@Override
	public int getPartition(Text key, Text value, int numPartitions) {
		if(map.get(key.toString()) == null){
			return 0;
		}
		return map.get(key.toString());
	}
}

 

 

2.com.huawei.HdfsUtils

package com.huawei;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsUtils {
	
	private static FileSystem fileSystem;
	private static Map<String, Integer> map;
	
	private static FileSystem getFileSystem() throws URISyntaxException, IOException {
		if(fileSystem == null){
			Configuration conf = new Configuration();
			URI uri = new URI("hdfs://192.168.1.190:9000");
			fileSystem = FileSystem.get(uri, conf);
		}
		
		return fileSystem;
	}
	
	public static int getMapSize(){
		return getMap().size();
	}
	
	public static Map<String, Integer> getMap(){
		if(map == null){
			map = new HashMap<String, Integer>();
			FSDataInputStream in;
			BufferedReader reader = null;
			try{
				fileSystem = getFileSystem();
				in = fileSystem.open(new Path("hdfs://192.168.1.190:9000/aprProduce"));
				reader = new BufferedReader(new InputStreamReader(in));
				String line = null;
				int i = 1;
				while((line = reader.readLine()) != null) {
					map.put(line, i++);
				}
			}catch(Exception e){
				 e.printStackTrace();
			}finally{
				try {
				      if(reader != null) reader.close();
				 } catch (IOException e) {
				      e.printStackTrace();
				 }
			}
		}
		
		return map;
	}
	
	public static void copyProduces(String inputPath, String outPutDir)  throws Exception{
		FileStatus[] listStatus = getFileSystem().listStatus(new Path(inputPath));
		for (FileStatus fileStatus : listStatus) {
			String name = fileStatus.getPath().getName();
			if(!fileStatus.isDir() && !StringUtils.equals(name, "_SUCCESS") && !StringUtils.startsWith(name, "part-r-")){
				FSDataInputStream openStream = fileSystem.open(fileStatus.getPath());
				IOUtils.copyBytes(openStream, fileSystem.create(new Path("/"+outPutDir+"/"+name.split("-")[0])), 1024, false);
				IOUtils.closeStream(openStream);
			}
		}
	}
}

 

3.com.huawei.AprProduces

package com.huawei;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 分析文件,获得所有产品名
 * args[0] 原始文件
 * args[1] 输出文件:所有产品名
 *
 */
public class AprProduces {

	public static void main(String[] args) throws Exception {
		Job job = new Job(new Configuration(), AprProduces.class.getSimpleName());
		job.setJarByClass(AprProduces.class);
		
		job.setMapperClass(AprProducesMap.class);
		job.setReducerClass(AprProducesReducer.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		job.waitForCompletion(true);
	}

}


class AprProducesMap extends Mapper<LongWritable, Text, Text, NullWritable>{
	Text k2 = new Text();
	
	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, Text, NullWritable>.Context context)
			throws IOException, InterruptedException {
		
		String line = value.toString();
		String[] splited = line.split("\t");
		k2.set(splited[1]);//四个文件的 文件名的下标不一样,需要修改
		context.write(k2, NullWritable.get());
	}
}

class AprProducesReducer extends Reducer<Text, Text, Text, NullWritable>{
	@Override
	protected void reduce(Text k2, Iterable<Text> v2s,
			Reducer<Text, Text, Text, NullWritable>.Context context)
			throws IOException, InterruptedException {
		context.write(k2, NullWritable.get());
	}
}

 

4.com.huawei.RenameApr

package com.huawei;

public class RenameApr {
	public static void main(String[] args) throws Exception{
		//文件重命名
		HdfsUtils.copyProduces("/apr-output/", "aprProduce");
	}
}

 

0
1
分享到:
评论

相关推荐

    MapReduce详解包括配置文件

    2. **Reduce(归并)阶段**:这一阶段的任务是对Map阶段产生的键值对进行聚合处理,将相同键的值进行组合,并对这些值执行进一步的计算操作,最后生成最终结果。 #### 二、MapReduce的工作机制 1. **文件切片**:...

    MapReduce分析Youtube数据内含源码以及说明书可以自己运行复现.zip

    2. 数据切分:将大文件分割成多个键值对,键可能是视频ID或时间戳,值可能包含用户行为的具体信息。 3. 数据映射:对每个键值对执行自定义逻辑,例如提取关键词、计算用户活动等。 Reduce阶段则会聚合Map阶段产生的...

    使用Eclipse编译运行MapReduce程序.doc

    Map阶段负责切分输入数据并生成键值对,Reduce阶段则对相同键的键值对进行聚合。 ### 查看HDFS文件系统数据的三种方法 1. 使用Hadoop提供的命令行工具`hdfs dfs`,如`hdfs dfs -ls /`来查看根目录下的文件和目录。 ...

    Mapreduce原理

    - **Reduce阶段**:Reduce任务接收到Shuffle阶段传递过来的键值对,并对这些键值对进行进一步处理。Reduce函数的作用是对每个键的所有值进行聚合操作,最终产生结果。例如,可以统计具有相同键的值的总数。 **2. ...

    Google MapReduce(二)

    总的来说,Google MapReduce通过上述的优化手段,如分区函数保证数据的正确聚合,合并函数减少资源消耗,以及合理的输入文件切分策略,实现了高效、可扩展的大规模数据处理。这种编程模型已被广泛应用于各种场景,...

    基于mapreduce计算框架的数据分析.zip

    Map阶段负责对原始数据进行切分和处理,而Reduce阶段则将Map阶段的结果进行聚合,得出最终结果。 在Hadoop中,MapReduce与Hadoop分布式文件系统(HDFS)紧密结合,形成了一套高效的数据处理解决方案。HDFS为大数据...

    Google-MapReduce中文版_1.0.pdf

    ### MapReduce技术详解 ...通过对输入数据进行映射和归约操作,MapReduce不仅能够有效地处理PB级别的数据,还提供了强大的容错能力和高度的可扩展性。随着大数据时代的到来,MapReduce的应用前景将更加广阔。

    Hadoop MapReduce.pdf

    - Map函数对输入键值对进行处理,通常是对输入数据进行简单的转换操作。 - Map函数输出键值对,这些键值对会被按照键进行排序和分组。 3. **Shuffle阶段**: - 在Map任务完成后,其输出会根据键进行分区和排序,...

    mapreduce原理

    2. InputFormat 模块负责做 Map 前的预处理,主要包括验证输入的格式是否符合 JobConfig 的输入定义、将 input 的文件切分为逻辑上的输入 InputSplit。 3. 将 RecordReader 处理后的结果作为 Map 的输入,然后 Map ...

    18_尚硅谷大数据之MapReduce_Hadoop数据压缩1

    3. Bzip2(BZip2Codec):提供最高压缩率,但压缩和解压速度较慢,支持数据切分,适用于对压缩率有高要求但对速度不敏感的场景。 4. LZO(LzopCodec):压缩速度较快,解压速度更快,但需要预先建立索引,并且不支持...

    Hadoop MapReduce实现tfidf源码

    在大数据处理领域,Hadoop MapReduce是一种广泛应用的分布式计算框架,它使得在大规模数据集上进行并行计算成为可能。本篇文章将详细讲解如何利用Hadoop MapReduce实现TF-IDF(Term Frequency-Inverse Document ...

    MapReduce架构

    - **输入数据的切分**:MapReduce框架首先将输入数据分成若干个块,这些块可以分布在多台计算机上进行处理。 - **Map阶段**:每台计算机上的Map函数处理分配给它的数据块,并产生一系列中间键值对。 - **Shuffle阶段...

    Hadoop-MapReduce实践示例

    MapReduce的实践示例往往涉及到数据的分布式存储、任务的切分、并行处理以及最终的数据汇总。该实践涉及的主要知识点包括: 1. MapReduce的设计与工作原理:MapReduce的设计思想源自于函数式编程中的map和reduce...

    MapReduce算法

    WordCount是最经典的MapReduce应用案例之一,其目标是对大量文本文件中的单词进行计数。 1. **Map函数**:对于每一份文档,Map函数将文档中的每一个单词映射为键值对(单词,1),表示每一个单词出现一次。 2. **...

    MapReduce原理.docx

    2. **数据切片**:数据文件会被切分成多个块,每个块被分配给一个MapTask进行处理。 3. **Map阶段**:MapTask对分配给它的数据块进行处理,产生一系列键值对输出。 4. **Reduce阶段**:ReduceTask接收来自所有Map...

    实验五 MapReduce实验.docx

    在 Java 中,MapReduce 程序可以使用 Hadoop 提供的 API 进行开发,主要包括 InputFormat、OutputFormat、Mapper/Reducer 和 Writable 等。InputFormat 用于描述输入数据的格式,常用的为 TextInputFormat,提供数据...

    mapreduce secondarysort

    - 在Reduce函数执行前,MapReduce框架会根据`IntPair`的排序规则对数据进行排序。 4. **排序和分组**: - 为了实现二次排序,需要通过JobConf对象设置排序和分组比较器。 - **设置排序比较器**:通过`job....

    Hadoop_MapReduce教程

    - **输入切分**:首先,Hadoop MapReduce 将输入数据集切分为若干个数据块,每个数据块由一个 map 任务处理。 - **Map 阶段**:每个 map 任务处理一个数据块,产生中间键值对。 - **Shuffle 排序**:中间键值对...

Global site tag (gtag.js) - Google Analytics