- 浏览: 347365 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
lvyuan1234:
你好,你那个sample.txt文件可以分享给我吗
hive insert overwrite into -
107x:
不错,谢谢!
hive 表的一些默认值 -
on_way_:
赞
Hadoop相关书籍 -
bupt04406:
dengkanghua 写道出来这个问题该怎么解决?hbase ...
Unexpected state导致HMaster abort -
dengkanghua:
出来这个问题该怎么解决?hbase master启动不起来。
Unexpected state导致HMaster abort
分析基于hadoop-0.19.2
MapTask和ReduceTask的入口是
org.apache.hadoop.mapred.Child.main(String[] args){ }
传入的args举例如下:
//args = [127.0.0.1, 57354, attempt_201107272049_0001_m_000003_0, 497563501]
//args = [127.0.0.1, 41819, attempt_201107280338_0001_r_000000_1, -2017927773]
String host = args[0]; // ip
int port = Integer.parseInt(args[1]); //端口号
InetSocketAddress address = new InetSocketAddress(host, port); //用于子进程跟TaskTracker通信用,接口是TaskUmbilicalProtocol。使用如下:
TaskUmbilicalProtocol umbilical =
(TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
TaskUmbilicalProtocol.versionID,
address,
defaultConf);
attempt_201107272049_0001_m_000003_0
// 201107272049_0001用于确定JobId,JobId=jobId = job_201107272049_0001
// m 说明是Map,r说明是Reduce。
// 其他id 还不清楚。
org.apache.hadoop.mapred.Child.main 被org.apache.hadoop.mapred.TaskRunner调用、启动,具体见TaskRunner的run方法。
在TaskRunner.run方法里:
String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
String [] javaOptsSplit = javaOpts.split(" ");
for (int i = 0; i < javaOptsSplit.length; i++) {
vargs.add(javaOptsSplit[i]);
}
默认运行map和reduce的子进程是200m。
mapred.child.java.opts这个属性是只有运行map和reduce的子进程用,所以我们可以传入一下其他参数,例如:
在hadoop-site.xml里加入:
<property>
<name>mapred.child.java.opts</name>
<value>-Xmx200m -Xdebug -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=y</value>
</property>
这样子进程JVM运行的时候就会在8000端口监听,可以通过Eclipse远程debug进去,跟踪MapTask和ReduceTask的运行。
还需要改下其他参数比较好:
<property>
<name>mapred.tasktracker.map.tasks.maximum</name>
<value>1</value>
<description>The maximum number of map tasks that will be run
simultaneously by a task tracker.
</description>
</property>
<property>
<name>mapred.tasktracker.reduce.tasks.maximum</name>
<value>1</value>
<description>The maximum number of reduce tasks that will be run
simultaneously by a task tracker.
</description>
</property>
<property>
<name>mapred.task.timeout</name>
<value>600000</value>
<description>The number of milliseconds before a task will be
terminated if it neither reads an input, writes an output, nor
updates its status string.
</description>
</property>
因为我是在自己的虚拟机上启动的一个hadoop集群,所有的进程都运行在一个机器上,任何时候都只能有一个进程在8000端口监听。所以设置一下同时最后只有一个 map或者一个reduce调度运行,mapred.task.timeout默认是10分钟超时,如果运行map或者reduce的子进程没有向TaskTracker上报信息或者心跳,就会被kill,debug过程需要的时间比较长,可以一个线程一个线程debug.
运行ReduceTask:
org.apache.hadoop.mapred.Child.main(String[] args){
Task task = null;
JvmTask myTask = umbilical.getTask(jvmId); // 子进程通过TaskUmbilicalProtocol从父进程TaskTracker获得一个Task的信息。
task = myTask.getTask();
task.run(job, umbilical); // run the task
}
Task task 这个Task运行map时是MapTask,在运行reduce时是ReduceTask.
ReduceTask.run(JobConf job, final TaskUmbilicalProtocol umbilical){
// start thread that will handle communication with parent
startCommunicationThread(umbilical);
if (!isLocal) {
reduceCopier = new ReduceCopier(umbilical, job);
if (!reduceCopier.fetchOutputs()) {
}
}
copyPhase.complete(); // copy is already complete
RawKeyValueIterator rIter = isLocal
? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
!conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
new Path(getTaskID().toString()), job.getOutputKeyComparator(),
reporter)
: reduceCopier.createKVIterator(job, rfs, reporter);
sortPhase.complete(); // sort is complete
// make output collector
String finalName = getOutputName(getPartition());
FileSystem fs = FileSystem.get(job);
final RecordWriter out =
job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
OutputCollector collector = new OutputCollector() {
public void collect(Object key, Object value)
throws IOException {
out.write(key, value);
reduceOutputCounter.increment(1);
// indicate that progress update needs to be sent
reporter.progress();
}
};
// collector 直接往HDFS写。
// apply reduce function
Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
// 这里的Reducer就是实现的Reducer类,如用hive提交的job就是org.apache.hadoop.hive.ql.exec.ExecReducer
while (values.more()) { //最主要的循环处理
reduceInputKeyCounter.increment(1);
reducer.reduce(values.getKey(), values, collector, reporter);//对数据进行reduce
if(incrProcCount) {
reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS, 1);
}
values.nextKey();
values.informReduceProgress();
}
done(umbilical); //reduce处理完了通过umbilical.done(getTaskID());通知TaskTracker。
}
new的线程:按时间顺序
(1)在Child.main 71行中创建线程Thread t = new Thread() 用于同步日志:
//every so often wake up and syncLogs so that we can track
//logs of the currently running task
(2)在ReduceTask.run方法开始运行时调用startCommunicationThread,见Task的startCommunicationThread(final TaskUmbilicalProtocol umbilical)方法,new了一个pingProgressThread线程,该线程检查ReduceTask是否有状态如进度有变化,如有就向TaskTracker上报,如没有向TaskTracker发送心跳,保活。
(3)copy阶段有多种多个线程:
主要是在reduceCopier.fetchOutputs()中调用。
(3.1)MapOutputCopier,copying threads默认5个
(3.2)LocalFSMerger,on-disk-merge thread线程,1个,merge磁盘上的文件
(3.3)InMemFSMergeThread,in memory merger thread,1个,merge内存的文件
一些线程的角色和分工是:
主线程从TaskTracker获得MapTask完成的事件信息,构造获得Map输出结果的URL放入队列mapLocations。TaskTracker是从JobTracker获得MapTask已完成的信息。
主线程遍历mapLocations把这些任务移到队列scheduledCopies中。
每个MapOutputCopier互斥的从队列scheduledCopies获取任务,如果没有就等待在队列上,如果获得一个通过http请求从TaskTracker获得那部分map output。
LocalFSMerger把MapOutputCopier获取过来放到disk上的map output进行归并。
InMemFSMergeThread合并内存上的map output。
Map:
(1)Read (reading map inputs)
(2)Map (map function processing)
(3)Collect (serializing to buffer and partitioning)
(4)Spill (sorting, combining, compressing, and writing map outputs to local disk)
(5)Merge (merging sorted spill files)
Reduce:
(1)Shuffle (transferring map outputs to reduce tasks, with decompression if needed)
(2)Merge (merging sorted map outputs)
(3)Reduce (reduce function processing)
(4)Write(writing reduce outputs to the distributed file-system)
MapTask和ReduceTask的入口是
org.apache.hadoop.mapred.Child.main(String[] args){ }
传入的args举例如下:
//args = [127.0.0.1, 57354, attempt_201107272049_0001_m_000003_0, 497563501]
//args = [127.0.0.1, 41819, attempt_201107280338_0001_r_000000_1, -2017927773]
String host = args[0]; // ip
int port = Integer.parseInt(args[1]); //端口号
InetSocketAddress address = new InetSocketAddress(host, port); //用于子进程跟TaskTracker通信用,接口是TaskUmbilicalProtocol。使用如下:
TaskUmbilicalProtocol umbilical =
(TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
TaskUmbilicalProtocol.versionID,
address,
defaultConf);
attempt_201107272049_0001_m_000003_0
// 201107272049_0001用于确定JobId,JobId=jobId = job_201107272049_0001
// m 说明是Map,r说明是Reduce。
// 其他id 还不清楚。
org.apache.hadoop.mapred.Child.main 被org.apache.hadoop.mapred.TaskRunner调用、启动,具体见TaskRunner的run方法。
在TaskRunner.run方法里:
String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
String [] javaOptsSplit = javaOpts.split(" ");
for (int i = 0; i < javaOptsSplit.length; i++) {
vargs.add(javaOptsSplit[i]);
}
默认运行map和reduce的子进程是200m。
mapred.child.java.opts这个属性是只有运行map和reduce的子进程用,所以我们可以传入一下其他参数,例如:
在hadoop-site.xml里加入:
<property>
<name>mapred.child.java.opts</name>
<value>-Xmx200m -Xdebug -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=y</value>
</property>
这样子进程JVM运行的时候就会在8000端口监听,可以通过Eclipse远程debug进去,跟踪MapTask和ReduceTask的运行。
还需要改下其他参数比较好:
<property>
<name>mapred.tasktracker.map.tasks.maximum</name>
<value>1</value>
<description>The maximum number of map tasks that will be run
simultaneously by a task tracker.
</description>
</property>
<property>
<name>mapred.tasktracker.reduce.tasks.maximum</name>
<value>1</value>
<description>The maximum number of reduce tasks that will be run
simultaneously by a task tracker.
</description>
</property>
<property>
<name>mapred.task.timeout</name>
<value>600000</value>
<description>The number of milliseconds before a task will be
terminated if it neither reads an input, writes an output, nor
updates its status string.
</description>
</property>
因为我是在自己的虚拟机上启动的一个hadoop集群,所有的进程都运行在一个机器上,任何时候都只能有一个进程在8000端口监听。所以设置一下同时最后只有一个 map或者一个reduce调度运行,mapred.task.timeout默认是10分钟超时,如果运行map或者reduce的子进程没有向TaskTracker上报信息或者心跳,就会被kill,debug过程需要的时间比较长,可以一个线程一个线程debug.
运行ReduceTask:
org.apache.hadoop.mapred.Child.main(String[] args){
Task task = null;
JvmTask myTask = umbilical.getTask(jvmId); // 子进程通过TaskUmbilicalProtocol从父进程TaskTracker获得一个Task的信息。
task = myTask.getTask();
task.run(job, umbilical); // run the task
}
Task task 这个Task运行map时是MapTask,在运行reduce时是ReduceTask.
ReduceTask.run(JobConf job, final TaskUmbilicalProtocol umbilical){
// start thread that will handle communication with parent
startCommunicationThread(umbilical);
if (!isLocal) {
reduceCopier = new ReduceCopier(umbilical, job);
if (!reduceCopier.fetchOutputs()) {
}
}
copyPhase.complete(); // copy is already complete
RawKeyValueIterator rIter = isLocal
? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
!conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
new Path(getTaskID().toString()), job.getOutputKeyComparator(),
reporter)
: reduceCopier.createKVIterator(job, rfs, reporter);
sortPhase.complete(); // sort is complete
// make output collector
String finalName = getOutputName(getPartition());
FileSystem fs = FileSystem.get(job);
final RecordWriter out =
job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
OutputCollector collector = new OutputCollector() {
public void collect(Object key, Object value)
throws IOException {
out.write(key, value);
reduceOutputCounter.increment(1);
// indicate that progress update needs to be sent
reporter.progress();
}
};
// collector 直接往HDFS写。
// apply reduce function
Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
// 这里的Reducer就是实现的Reducer类,如用hive提交的job就是org.apache.hadoop.hive.ql.exec.ExecReducer
while (values.more()) { //最主要的循环处理
reduceInputKeyCounter.increment(1);
reducer.reduce(values.getKey(), values, collector, reporter);//对数据进行reduce
if(incrProcCount) {
reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS, 1);
}
values.nextKey();
values.informReduceProgress();
}
done(umbilical); //reduce处理完了通过umbilical.done(getTaskID());通知TaskTracker。
}
new的线程:按时间顺序
(1)在Child.main 71行中创建线程Thread t = new Thread() 用于同步日志:
//every so often wake up and syncLogs so that we can track
//logs of the currently running task
(2)在ReduceTask.run方法开始运行时调用startCommunicationThread,见Task的startCommunicationThread(final TaskUmbilicalProtocol umbilical)方法,new了一个pingProgressThread线程,该线程检查ReduceTask是否有状态如进度有变化,如有就向TaskTracker上报,如没有向TaskTracker发送心跳,保活。
(3)copy阶段有多种多个线程:
主要是在reduceCopier.fetchOutputs()中调用。
(3.1)MapOutputCopier,copying threads默认5个
(3.2)LocalFSMerger,on-disk-merge thread线程,1个,merge磁盘上的文件
(3.3)InMemFSMergeThread,in memory merger thread,1个,merge内存的文件
一些线程的角色和分工是:
主线程从TaskTracker获得MapTask完成的事件信息,构造获得Map输出结果的URL放入队列mapLocations。TaskTracker是从JobTracker获得MapTask已完成的信息。
主线程遍历mapLocations把这些任务移到队列scheduledCopies中。
每个MapOutputCopier互斥的从队列scheduledCopies获取任务,如果没有就等待在队列上,如果获得一个通过http请求从TaskTracker获得那部分map output。
LocalFSMerger把MapOutputCopier获取过来放到disk上的map output进行归并。
InMemFSMergeThread合并内存上的map output。
Map:
(1)Read (reading map inputs)
(2)Map (map function processing)
(3)Collect (serializing to buffer and partitioning)
(4)Spill (sorting, combining, compressing, and writing map outputs to local disk)
(5)Merge (merging sorted spill files)
Reduce:
(1)Shuffle (transferring map outputs to reduce tasks, with decompression if needed)
(2)Merge (merging sorted map outputs)
(3)Reduce (reduce function processing)
(4)Write(writing reduce outputs to the distributed file-system)
发表评论
-
hadoop
2017-08-01 13:42 0audit log配置 http://hack ... -
hbase jmx
2013-12-11 20:42 2938conf/hbase-env.sh 里面配了 JMX后就可 ... -
Too many fetch failures
2013-10-29 10:42 1426http://lucene.472066.n3.na ... -
cdh3集群 distcp 数据到 cdh4集群
2013-09-26 21:54 1101从cdh3集群 distcp 数据到 cdh4集群上面 ... -
cdh4 vs cdh3 client处理DataNode异常的不同
2013-09-13 21:13 2203cdh4在处理pipeline中的错误时,逻辑上与原先不一 ... -
hdfs 升级,cdh3 升级 cdh4
2013-08-05 18:09 2190Step 1: 做下saveNamespace操作,停掉集 ... -
HDFS HBase NIO相关知识
2012-09-26 18:29 2649HDFS的NIO有一些相关的知识偶尔需要注意下: (1) 使 ... -
java.net.SocketTimeoutException: 480000 millis timeout hdfs
2012-08-13 16:45 8175hdfs集群出现SocketTimeoutException, ... -
HBase如何从Hadoop读取数据,DFSInputStream
2012-08-08 15:41 3336HDFS Client的读取流是从DFSInputStream ... -
DFSClient Packet dfs.write.packet.size
2012-07-30 20:01 1616HBase 里面调用DFSOutputStream的方法常用的 ... -
hbase、hadoop checksum相关
2012-07-25 21:16 1958support checksums in HBase bloc ... -
DFSClient 写一个Block的过程
2012-07-12 21:39 1235DFSClient 写一个Block的过程 ... -
cdh3u0的jetty导致Error Reading IndexFile
2012-04-13 20:21 2303在36个机器上面跑一个大作业,8千多个map,2w多个r ... -
Hive 的 OutputCommitter
2012-01-30 19:44 1812Hive 的 OutputCommitter publi ... -
Hadoop MapOutputBuffer
2011-09-13 23:50 0http://blog.sina.com.cn/s/blog_ ... -
Hadoop如何组织中间数据的存储和传输(源码级分析)
2011-09-13 19:48 0http://blog.sina.com.cn/s/blog_ ... -
hadoop spill
2011-09-02 17:07 0bin/hadoop jar hadoop-*-example ... -
hadoop JobClient 提交作业的目录
2011-09-01 15:44 0mapred.system.dir 这个目录所有用户都有权限写 ... -
hadoop terasort
2011-08-29 22:29 0Hadoop TeraSort 基准测试实验 http://h ... -
hadoop WordCount
2011-08-26 14:42 0import java.io.IOException; imp ...
相关推荐
通过对源代码的详细分析和实际运行测试,用户可以更深入地理解Hadoop Map-Reduce的工作原理,掌握更高级的数据处理技巧。 总之,Hadoop Map-Reduce框架不仅为大数据处理提供了强大的工具,同时也为用户提供了丰富的...
例如,`org.apache.hadoop.mapred.MapTask`和`org.apache.hadoop.mapreduce.ReduceTask`分别对应Map和Reduce任务的实现,开发者可以通过阅读这些源码了解任务执行的详细流程。 7. **工具集成**:有许多开源工具可以...
### Hadoop Map-Reduce 教程 #### 一、Hadoop Map-Reduce 概述 Hadoop Map-Reduce 是一种编程模型,用于处理大规模数据集(通常为TB级或以上)。这种模型支持分布式计算,可以在成百上千台计算机上运行。Map-...
基于Hadoop Hive健身馆可视化分析平台项目源码+数据库文件.zip启动方式 环境启动 hadoop hive2元数据库 sql导入 导入hivesql脚本,修改application.yml 启动主程序 HadoopApplication 基于Hadoop Hive健身馆可视化...
总结来说,【基于Hadoop的电影影评数据分析】项目是大数据技术在生活娱乐领域的应用实例,它涵盖了Hadoop环境的搭建、MapReduce编程模型的运用以及大数据分析的实践。通过这个项目,学生可以深入理解大数据处理流程...
基于Hadoop网站流量日志数据分析系统 1、典型的离线流数据分析系统 2、技术分析 - Hadoop - nginx - flume - hive - mysql - springboot + mybatisplus+vcharts nginx + lua 日志文件埋点的 基于Hadoop网站流量...
本文将深入探讨“Hadoop之外卖订单数据分析系统”,并介绍如何利用Hadoop进行大规模数据处理,以及如何将分析结果通过可视化手段进行展示。 首先,我们需要理解Hadoop的核心组件:HDFS(Hadoop Distributed File ...
总的来说,设计一个基于Hadoop的数据分析系统涉及到多个环节,从需求分析到系统设计,再到具体的部署和优化,每个步骤都需要细致考虑和精心实施。通过这样的系统,企业能够高效地处理和分析海量数据,从而获取有价值...
【基于Hadoop豆瓣电影数据分析实验报告】 在大数据时代,对海量信息进行高效处理和分析是企业决策的关键。Hadoop作为一款强大的分布式计算框架,自2006年诞生以来,已经在多个领域展现了其卓越的数据处理能力。本...
在大数据处理领域,Hadoop和MapReduce是两个...然而,为了充分发挥其潜力,理解Hadoop的分布式原理以及如何优化Join策略至关重要。在实际应用中,需要根据数据规模、数据分布以及特定业务需求来选择最合适的Join方法。
在编码实现部分,本文介绍了使用MapReduce实现成绩分析的过程,包括初始数据的处理、计算每门课程的平均成绩、最高成绩和最低成绩,以及计算每门课程学生的平均成绩等。同时,也介绍了如何计算每门课程当中出现了...
Hadoop 的源代码分析可以分为三个部分:HDFS、MapReduce 和其他组件。 HDFS HDFS 是 Hadoop 的核心组件之一,是一个分布式文件系统。HDFS 的主要功能是提供一个高可靠、高可扩展的文件系统,可以存储大量的数据。...
使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop...
Hadoop豆瓣电影数据分析(Hadoop)操作源码
标题中的“基于Hadoop的豆瓣电影影评...综上所述,这个项目涵盖了Hadoop的基本原理、大数据处理流程、文本分析技术、情感分析以及用户行为挖掘等多个方面的知识,对于理解和实践大数据在实际生活中的应用具有重要意义。
基于Hadoop框架的豆瓣大数据分析与搜索系统源码.zip基于Hadoop框架的豆瓣大数据分析与搜索系统源码.zip基于Hadoop框架的豆瓣大数据分析与搜索系统源码.zip基于Hadoop框架的豆瓣大数据分析与搜索系统源码.zip基于...
基于Hadoop网站流量日志数据分析系统项目源码+教程.zip网站流量日志数据分析系统 典型的离线流数据分析系统 技术分析 hadoop nginx flume hive sqoop mysql springboot+mybatisplus+vcharts 基于Hadoop网站流量日志...