`
usezhou
  • 浏览: 12881 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

mapred.map.tasks 如何影响map的个数

 
阅读更多

且具体到底产生多少个分片(split) 因为多少个map 是有关系。(此处是根据新的API来分析,因为新的API 终究要调用到就得API来做具体的动作)

可能会说这个值 是系统根据文件大小 和根据文件分片大小 算出来的,那具体是如何算出来的呢,我们根据源码 一步一步来分析

首先Job.submit()

public void submit() throws IOException, InterruptedException,
ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();

// Connect to the JobTracker and submit the job
connect();
info = jobClient.submitJobInternal(conf); //此处用到JobClient的submitJobInternal 方法 看下面源码2

super.setJobID(info.getID());
state = JobState.RUNNING;
}

源码2 JobClient.submitJobInternal()

我们看 此方法中的 下面一段

// Create the splits for the job
FileSystem fs = submitJobDir.getFileSystem(jobCopy);
LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
int maps = writeSplits(context, submitJobDir);//在此处计算 具体有多少个map紧接着看下面源码3
jobCopy.setNumMapTasks(maps);

源码3 writeSplits()

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
int maps;
if (jConf.getUseNewMapper()) {//新API
maps = writeNewSplits(job, jobSubmitDir); //见下面源码5
} else {//旧API
maps = writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}

源码5 writeNewSplits()

int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

List<InputSplit> splits = input.getSplits(job); //我们需要关注的是这一行 调用input 中的getSplits 方法 我们会用FileInputFormat的getSplits方法来做实例 看源码6
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}

源码6 FileInputFormat.getSplits(JobConf job, int numSplits)

FileStatus[] files = listStatus(job);

// Save the number of input files in the job-conf
job.setLong(NUM_INPUT_FILES, files.length);
long totalSize = 0; // compute total size
for (FileStatus file: files) { // check we have valid files
if (file.isDir()) {
throw new IOException("Not a file: "+ file.getPath());
}
totalSize += file.getLen();
}

long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
minSplitSize);

// generate splits
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();
for (FileStatus file: files) {
Path path = file.getPath();
FileSystem fs = path.getFileSystem(job);
long length = file.getLen();
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
if ((length != 0) && isSplitable(fs, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(goalSize, minSize, blockSize);//此处计算 split size 从这可以看出来 是根据goal,minSize,blockSize 三个参数来计算的,那需要仔细看看上面三个参数的由来,再继续看源码 7

long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
String[] splitHosts = getSplitHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
splitHosts));
bytesRemaining -= splitSize;
}

if (bytesRemaining != 0) {
splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkLocations.length-1].getHosts()));
}
} else if (length != 0) {
String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
splits.add(new FileSplit(path, 0, length, splitHosts));
} else {
//Create empty hosts array for zero length files
splits.add(new FileSplit(path, 0, length, new String[0]));
}
}
LOG.debug("Total # of splits: " + splits.size());
return splits.toArray(new FileSplit[splits.size()]);

源码7 computeSplitSize

protected long computeSplitSize(long goalSize, long minSize,
long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize)); 具体计算公式如下,可以看出
}

所以从上面源码的解读,我们可以看出来,这个参数 mapred.map.tasks 的设置对具体的mapreduce 对输入进行分片产生一定的作用,因为具体产生多少分片,多少个map

是根据三个参数来决定的 一个是dfs.block.size 另外一个是mapred.map.tasks 还有一个 就是 mapred.min.split.size 但一般情况下如果不设置 mapred.map.tasks 的情况下 则会根据其它两个参数来决定,但一般情况下 mapred.min.split.size 参数我们也不设置,所以 dfs.block.size 自然就是我们默认的分片大小,如果mapred.min.split.size 大于dfs.block.size 则系统分片就会大于文件系统 块的大小,从而map的个数也会相应的减少。

分享到:
评论

相关推荐

    mapred.zip_hadoop_hadoop mapreduce_mapReduce

    这个"mapred.zip"文件显然包含了与Hadoop MapReduce相关的测试样例、文档和源码,这对于理解MapReduce的工作原理以及进行实际开发是非常宝贵的资源。 MapReduce的核心理念是将大规模数据处理任务分解为两个主要阶段...

    hadoop 2.9.0 mapred-default.xml 属性集

    10. mapreduce.map.sort.spill.percent 配置Map任务缓冲区使用量达到该百分比时将溢写到磁盘。 11. mapreduce.jobtracker.address 指定作业追踪器的主机名和端口号。 12. mapreduce.cluster.temp.dir 指定Hadoop...

    avro-mapred-1.7.8-SNAPSHOT-hadoop2

    在MapReduce任务中读取Avro文件,会使用到avro-mapred.jar。 然而目前的avro-mapred.jar是基于较老的版本的,使用时会报错: org.apache.hadoop.mapred.YarnChild: Error running child : java.lang....

    mapred.github.io

    标题 "mapred.github.io" 暗示我们讨论的主题与Hadoop MapReduce有关,这是一个分布式计算框架,广泛用于处理和分析大数据集。该主题通常涉及并行计算、数据分片、任务调度以及与GitHub的关联,后者是一个代码托管...

    hive工作调优小结

    但是需要注意的是,直接修改**Mapred.map.tasks**是无效的。 **优化建议**: - 如果作业执行速度较慢,可以考虑增加Map任务的数量,以提高并行度。 - 如果作业执行速度较快,增加Map任务数量可能不会带来明显加速...

    hadoop 配置项的调优

    2. **mapred.map.tasks.speculative.execution** 和 **mapred.reduce.tasks.speculative.execution**:这两个参数控制是否启用推测执行。当某些任务由于硬件或负载问题执行较慢时,推测执行会启动新的任务副本,加快...

    hadoop作业调优参数整理及原理

    - **mapred.map.tasks**和**mapred.reduce.tasks**:这两个参数分别用于设置Mapper和Reducer的数量。适当调整可以平衡集群资源的使用,提高作业并行度。 5. **Split大小** - **mapred.max.split.size**和**mapred...

    hive性能优化

    与Map不同,Reduce任务的个数可以通过直接设置`mapred.reduce.tasks`来调整。然而,这种方式并不利于集群资源的自动扩展。通常,Hive会根据输入数据量动态计算Reduce任务数量,公式为`num_reduce_tasks = min(${hive...

    hadoop0.23.9离线api

    org.apache.hadoop.mapreduce.lib.map org.apache.hadoop.mapreduce.lib.output org.apache.hadoop.mapreduce.lib.partition org.apache.hadoop.mapreduce.lib.reduce org.apache.hadoop.mapreduce.security ...

    hadoop1.0 Failed to set permissions of path 解决方案

    ERROR org.apache.hadoop.mapred.TaskTracker: Can not start task tracker because java.io.IOException: Failed to set permissions of path: \tmp\hadoop-admin \mapred\local\ttprivate to 0700 at org.apache...

    hadoop集群各种配置文件

    3. `mapred.map.child.java.opts`和`mapred.reduce.child.java.opts`分别设置了Map任务和Reduce任务子进程的Java堆大小,分别为512MB和1024MB。 最后,`hadoop-env.sh`是Hadoop环境变量的配置,它定义了运行Hadoop...

    hive调优策略

    - 通过设置`mapred.reduce.tasks`参数来手动增加Map任务的数量。 - 使用`distribute by`子句创建一个分布表,将原始数据按随机原则重新分布到多个文件中,从而增加Map任务数量。 #### 四、深入理解大小表Join优化...

    Hadoop源码 包含mapred及mapreduce

    Hadoop源码 包含mapred

    企业级IT架构分享 云计算架构师成长之路 Hadoop公平调度器指南 共8页.pdf

    - **调度器类**:`mapred.jobtracker.taskScheduler`用于指定使用的调度器类,对于公平调度器而言,应设置为`org.apache.hadoop.mapred.FairScheduler`。 - **默认资源池**:`mapred.fairscheduler.default-pool`...

    Hadoop源代码分析

    3. Map任务:在`org.apache.hadoop.mapred.MapTask`类中,源代码解释了如何将输入数据分片、执行映射函数并生成中间键值对。 4. Reduce任务:`org.apache.hadoop.mapred.ReduceTask`类负责从各个Map任务获取中间...

    深入浅出数据仓库中SQL性能优化

    \[ \text{num_Map_tasks} = \max[\text{Mapred.min.split.size}, \min(\text{dfs.block.size}, \text{Mapred.max.split.size})] \] - **Mapred.min.split.size**:数据的最小分割单元大小,默认值为1B。 - **Mapred...

    hive优化.docx

    * 设置Reduce个数:set mapred.reduce.tasks=800; * 设置Reduce处理的数据量:set hive.exec.reducers.bytes.per.reducer = 100000000; 4. 小表与大表关联 小表与大表关联也容易导致数据倾斜问题。解决方法是: *...

    Map-Reduce体系架构

    - 可以通过调整`mapred.tasktracker.map.tasks.maximum`和`mapred.tasktracker.reduce.tasks.maximum`等配置参数,来优化每个节点上运行的任务数量。 #### 五、案例分析 虽然给定内容中并未详细列出具体的案例...

    Hadoop performance models

    | pNumMappers | mapred.map.tasks | - | Map 任务总数 | 以上列表仅列举了部分 Hadoop 参数,实际应用中可能还需要考虑更多参数的影响。例如,Hadoop 的配置文件中还包括关于任务调度、数据块存储位置等参数。 ##...

    hive优化经典.pdf

    通过设置mapred.reduce.tasks参数,可以控制拆分后的文件数量,从而确定Map任务的数量。 在实际操作中,应该根据具体的业务需求和数据特点来决定Map数量的控制策略。关键是要遵循两个原则:一是保证大数据量能够...

Global site tag (gtag.js) - Google Analytics