Mapreduce概念:一种分布式编程模型,在hadoop中用于负责mapreduce任务的是JobTracker,TaskTracker,
Hadoop中只有一台JobTracker。
1.JobTracker:用于任务的管理和调度工作。
2.TaskTracker:用于执行工作。
Mapreduce实现概述:
Hadoop中,maprecude的形式是个Job,每个Job又分为Map阶段和Reduce阶段。
Map阶段:接收到<K,V>形式的输入,进行处理后,输出格式为<K,V>给Reduce。
Recude阶段:将接受回来的<K,list of v>,处理后,输出到HDFS,格式为<K,V>
Map阶段:
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.mapreduce; import java.io.IOException; import java.util.List; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; /** * <code>InputFormat</code> describes the input-specification for a * Map-Reduce job. * * <p>The Map-Reduce framework relies on the <code>InputFormat</code> of the * job to:<p> * <ol> * <li> * Validate the input-specification of the job. * <li> * Split-up the input file(s) into logical {@link InputSplit}s, each of * which is then assigned to an individual {@link Mapper}. * </li> * <li> * Provide the {@link RecordReader} implementation to be used to glean * input records from the logical <code>InputSplit</code> for processing by * the {@link Mapper}. * </li> * </ol> * * <p>The default behavior of file-based {@link InputFormat}s, typically * sub-classes of {@link FileInputFormat}, is to split the * input into <i>logical</i> {@link InputSplit}s based on the total size, in * bytes, of the input files. However, the {@link FileSystem} blocksize of * the input files is treated as an upper bound for input splits. A lower bound * on the split size can be set via * <a href="{@docRoot}/../mapred-default.html#mapred.min.split.size"> * mapred.min.split.size</a>.</p> * * <p>Clearly, logical splits based on input-size is insufficient for many * applications since record boundaries are to respected. In such cases, the * application has to also implement a {@link RecordReader} on whom lies the * responsibility to respect record-boundaries and present a record-oriented * view of the logical <code>InputSplit</code> to the individual task. * * @see InputSplit * @see RecordReader * @see FileInputFormat */ public abstract class InputFormat<K, V> { /** * Logically split the set of input files for the job. * * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper} * for processing.</p> * * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the * input files are not physically split into chunks. For e.g. a split could * be <i><input-file-path, start, offset></i> tuple. The InputFormat * also creates the {@link RecordReader} to read the {@link InputSplit}. * * @param context job configuration. * @return an array of {@link InputSplit}s for the job. */ public abstract List<InputSplit> getSplits(JobContext context ) throws IOException, InterruptedException; /** * Create a record reader for a given split. The framework will call * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before * the split is used. * @param split the split to be read * @param context the information about the task * @return a new record reader * @throws IOException * @throws InterruptedException */ public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException; }
@SuppressWarnings("unchecked") private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = job.getConfiguration(); InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf); List<InputSplit> splits = input.getSplits(job); T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]); // sort the splits into order based on size, so that the biggest // go first Arrays.sort(array, new SplitComparator()); JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array); return array.length; }
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public class Context
extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
public Context(Configuration conf, TaskAttemptID taskid,
RecordReader<KEYIN,VALUEIN> reader,
RecordWriter<KEYOUT,VALUEOUT> writer,
OutputCommitter committer,
StatusReporter reporter,
InputSplit split) throws IOException, InterruptedException {
super(conf, taskid, reader, writer, committer, reporter, split);
}
}
/**
* Called once at the beginning of the task.
*/
protected void setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Called once for each key/value pair in the input split. Most applications
* should override this, but the default is the identity function.
*/
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
/**
* Called once at the end of the task.
*/
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Expert users can override this method for more complete control over the
* execution of the Mapper.
* @param context
* @throws IOException
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}
}
/** * The record reader breaks the data into key/value pairs for input to the * {@link Mapper}. * @param <KEYIN> * @param <VALUEIN> */ public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable { /** * Called once at initialization. * @param split the split that defines the range of records to read * @param context the information about the task * @throws IOException * @throws InterruptedException */ public abstract void initialize(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException; /** * Read the next key, value pair. * @return true if a key/value pair was read * @throws IOException * @throws InterruptedException */ public abstract boolean nextKeyValue() throws IOException, InterruptedException; /** * Get the current key * @return the current key or null if there is no current key * @throws IOException * @throws InterruptedException */ public abstract KEYIN getCurrentKey() throws IOException, InterruptedException; /** * Get the current value. * @return the object that was read * @throws IOException * @throws InterruptedException */ public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException; /** * The current progress of the record reader through its data. * @return a number between 0.0 and 1.0 that is the fraction of the data read * @throws IOException * @throws InterruptedException */ public abstract float getProgress() throws IOException, InterruptedException; /** * Close the record reader. */ public abstract void close() throws IOException; }
InputFormat 类 定义了两个抽象方法:getSplits和createRecordReader,前者根据作业(job)的配置,将输入文件切片并返回一个 InputSplit类型的数组;后者为InputSplit实例对象生成一个RecordReader对象,在InputSplit对象使用 前,MapReduce框架会先进行RecordReader的实例化操作。
InputSplit类定义了两个抽象方法:getLength和getLocations,getLength方法获取分片的大小,因此能根据输入分片的大小进行排序;getLocations方法获取输入数据所在节点的名称。
当数据传送给Map时,Map会生成InputSplit集合(切片),传给InputFormat,然后InputFormat再通过RecordReader的getCurrentKey()和getCurrentKey(),生成<K,V>集合。
相关推荐
Hadoop-Eclipse-Plugin 2.8.0的出现,反映了Hadoop生态系统从Hadoop 1到Hadoop 2的重大转变,尤其是在资源管理和任务调度方面的改进。同时,这也意味着对于那些已经习惯了Eclipse或MyEclipse的开发者来说,他们无需...
hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...
赠送jar包:hbase-hadoop2-compat-1.2.12.jar; 赠送原API文档:hbase-hadoop2-compat-1.2.12-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.2.12-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...
标题中的"apache-hadoop-3.1.0-winutils-master.zip"是一个针对Windows用户的Hadoop工具包,它包含了运行Hadoop所需的特定于Windows的工具和配置。`winutils.exe`是这个工具包的关键组件,它是Hadoop在Windows上的一...
赠送jar包:hadoop-yarn-client-2.6.5.jar; 赠送原API文档:hadoop-yarn-client-2.6.5-javadoc.jar; 赠送源代码:hadoop-yarn-client-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-yarn-client-2.6.5.pom;...
Hadoop-Eclipse-Plugin-3.1.1是一款专为Eclipse集成开发环境设计的插件,用于方便地在Hadoop分布式文件系统(HDFS)上进行开发和调试MapReduce程序。这款插件是Hadoop生态系统的组成部分,它使得Java开发者能够更加...
赠送jar包:hadoop-mapreduce-client-jobclient-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-jobclient-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-jobclient-2.6.5-sources.jar; 赠送...
1. `hadoop.dll`:这是一个动态链接库文件,用于在Windows环境中提供Hadoop的相关功能。 2. `winutils.exe`:如前所述,这是Windows上的一个关键工具,用于执行Hadoop相关的系统任务,如设置HDFS的权限和管理本地...
赠送jar包:hadoop-auth-2.5.1.jar; 赠送原API文档:hadoop-auth-2.5.1-javadoc.jar; 赠送源代码:hadoop-auth-2.5.1-sources.jar; 赠送Maven依赖信息文件:hadoop-auth-2.5.1.pom; 包含翻译后的API文档:hadoop...
1. `hadoop`: Hadoop命令行工具,用于执行各种Hadoop相关的操作,如启动、停止服务、管理文件系统等。 2. `hdfs`: 与Hadoop分布式文件系统(HDFS)交互的命令行工具,支持文件的创建、删除、复制等操作。 3. `yarn`:...
在这个特定的兼容包中,我们可以看到两个文件:flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar(实际的兼容库)和._flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar(可能是Mac OS的元数据文件,通常...
hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包
1. 解压`hadoop-common-2.6.0-bin-master.zip`到你选择的目录,例如`C:\hadoop\hadoop-2.6.0`。 2. 打开系统属性,进入“高级”选项卡,点击“环境变量”按钮。 3. 在系统变量部分,找到名为`Path`的变量,点击...
1. **hadoop-winutils**: 在Windows环境下,由于系统内核差异,Hadoop的一些功能需要特定的工具集来支持,这就是winutils。它提供了一系列的命令行工具,如设置Hadoop环境变量、启动HDFS服务、管理HDFS文件等。在...
hadoop-eclipse-plugin-2.7.4.jar和hadoop-eclipse-plugin-2.7.3.jar还有hadoop-eclipse-plugin-2.6.0.jar的插件都在这打包了,都可以用。
Eclipse集成Hadoop2.10.0的插件,使用`ant`对hadoop的jar包进行打包并适应Eclipse加载,所以参数里有hadoop和eclipse的目录. 必须注意对于不同的hadoop版本,` HADDOP_INSTALL_PATH/share/hadoop/common/lib`下的jar包...
赠送jar包:hbase-hadoop2-compat-1.1.3.jar; 赠送原API文档:hbase-hadoop2-compat-1.1.3-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.1.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...
1. **Apache Flink**:Flink的核心理念是提供低延迟、高吞吐量的数据处理能力,支持实时流数据和批量数据的处理。它提供了丰富的数据连接器和API,使得开发者能够方便地实现复杂的数据处理任务。Flink的流处理模型...
Ubuntu虚拟机HADOOP集群搭建eclipse环境 hadoop-eclipse-plugin-3.3.1.jar
赠送jar包:hadoop-yarn-common-2.6.5.jar 赠送原API文档:hadoop-yarn-common-2.6.5-javadoc.jar 赠送源代码:hadoop-yarn-common-2.6.5-sources.jar 包含翻译后的API文档:hadoop-yarn-common-2.6.5-javadoc-...