在集群中查看Task日志的方法,一般有两个:
1,通过Hadoop提供的WebConsole,直接在页面中追踪查看;
2,到集群中运行该task的节点上,查看日志文件。每个tasktracker子进程都会用log4j产生三个日志文件,分别是syslog,stdout,stderr。这些日志文件存放到%HADOOP_LOG_DIR%目录下的userlogs的子目录中。但是通过该方法,需要追踪到哪个节点运行了该task。
下面,通过使用JobClient,以及JobClient的几个私有方法(displayTaskLogs()、getTaskLogs()、getTaskLogURL(),方法参数省略,具体见代码),来获取日志信息。代码如下:
package myTest; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.*; import java.io.*; public class test { static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) { return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); } //JobClient中的该方法,没有Writer参数,这是为了得到输出流加的 private static void displayTaskLogs(TaskAttemptID taskId, String baseUrl, Writer sw) throws IOException { // The tasktracker for a 'failed/killed' job might not be around... if (baseUrl != null) { // Construct the url for the tasklogs String taskLogUrl = getTaskLogURL(taskId, baseUrl); // Copy task's stderr to stderr of the JobClient getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stderr"), sw); } } //JobClient中的该方法,参数不是Writer类型,而是OutputStream类型,直接打印到控制台。 private static void getTaskLogs(TaskAttemptID taskId, URL taskLogUrl, Writer out) { try { URLConnection connection = taskLogUrl.openConnection(); connection.setReadTimeout(1000000); connection.setConnectTimeout(1000000); BufferedReader input = new BufferedReader(new InputStreamReader(connection.getInputStream())); BufferedWriter output = new BufferedWriter(out); try { String logData = null; while ((logData = input.readLine()) != null) { if (logData.length() > 0) { output.write(taskId + ": " + logData + "\n"); output.flush(); } } } finally { input.close(); } }catch(IOException ioe){ System.out.println("Error reading task output" + ioe.getMessage()); } } public static void main(String[] args) throws IOException, InterruptedException { Configuration conf = new Configuration(); conf.addResource(new Path("conf/mapred-site.xml")); conf.addResource(new Path("conf/core-site.xml")); conf.addResource(new Path("conf/hdfs-site.xml")); //输出配置文件的所有属性 // for (Map.Entry<String, String> entry : conf) { // System.out.println(entry.getKey() + "\t=\t" + entry.getValue()); // } JobConf job = new JobConf(conf); JobClient jc = new JobClient(job); jc.init(job); JobID jobIdNew = new JobID("201304151829", 6316); RunningJob runJob = jc.getJob(jobIdNew); StringWriter sw = new StringWriter(); TaskCompletionEvent[] events = runJob.getTaskCompletionEvents(0); for(TaskCompletionEvent event : events){ displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp(), sw); } System.out.println(sw.toString()); // /** // * mapProgress()/reduceProgress() // * result:1.0 // */ // System.out.println(runJob.mapProgress()); // System.out.println(runJob.reduceProgress()); // // // /**getTrackingURL() // * result: // * http://baby6:35030/jobdetails.jsp?jobid=job_201304151829_5768 // */ // System.out.println(runJob.getTrackingURL()); // // // /**displayTasks() // * result: // * attempt_201304151829_5768_m_000000_0 // */ // jc.displayTasks(jobIdNew, "map", "completed"); // // /** // * 获取集群中taskTracker个数 // */ // System.out.println(jc.getClusterStatus().getTaskTrackers()); /** *获取集群中活着的节点名称 */ // Collection<String> c = jc.getClusterStatus(true).getActiveTrackerNames(); // Iterator it = c.iterator(); // while (it.hasNext()) { // System.out.println(it.next()); // } // JobStatus[] jobs = jc.getAllJobs(); // System.out.println(jobs.length); } }
注:该方法只能获取的到非历史Job的日志信息,如果该job已经变成History job时,获取为空。
一般一个job经过24小时会变成history job,这个可以在集群中设置。
相关推荐
2. **HADOOP_OPTS**: 这个变量包含了一组特定于Hadoop的Java选项,可能包括安全、日志、调试等相关的配置。在实际运行时,这些选项会被合并到JVM的启动参数中。 3. **CLASSPATH**: 这是Java应用程序寻找类文件的...
Hive作为大数据处理领域的一个重要工具,它允许用户通过SQL语句来查询和管理存储在Hadoop集群上的大规模数据。在Hive中,复杂的SQL查询会被解析并转换为一系列的任务(Task),这些任务通常是MapReduce作业。本篇将...
在爬虫项目中,虽然可能没有传统的用户界面,但可以通过RESTful API来启动爬虫任务或者获取爬取结果。例如,可以创建一个CrawlerController,接收HTTP请求,调用SpiderService的相关方法,并返回爬取的页面数据或...
开发者可以通过API编程,将数据从源头推送到Kafka。 Flume管道内存,flume宕机了数据丢失怎么解决: 在Flume中,内存中的数据丢失是一个常见问题,因为数据首先会缓存在内存中。可以通过设置合适的批处理大小和块...
- **日志查看:**通过查看MapReduce的日志文件,可以获取关于任务执行的详细信息。 - **控制台访问:**JobTracker提供了Web界面,用户可以通过浏览器访问该界面来查看作业的状态和进度。 #### 九、MapReduce的未来...
- **Spark Core**:负责任务调度,使用DAG(有向无环图)来表示任务,通过TaskScheduler接口实现任务调度。内存管理采用Tachyon或HDFS作为持久化存储,容错机制基于检查点和事件日志。 - **Spark SQL**:提供了...
- 官方文档:提供详细的安装指南、API参考、最佳实践等内容,是学习和使用Storm的重要参考资料。 - 社区论坛:用户交流平台,可获取解决方案和经验分享。 总之,Apache Storm作为一个强大的实时计算框架,通过其...
以上内容概述了大数据领域的核心概念和技术细节,包括大数据的基本定义、特征、采集、存储、计算、处理流程、商业和社会价值等方面,并深入探讨了大数据与人工智能的关系、集群与虚拟化技术、Hadoop框架及其核心组件...
- **知识点**: 客户端维护与Zookeeper服务器之间的连接,通过该连接发送请求、获取响应以及发送心跳等操作。 31. **ZooKeeper的起源** - **知识点**: ZooKeeper框架最初是在雅虎公司内部开发的,后来成为Apache的...
与Hadoop MapReduce这样的批量处理框架不同,jStorm专注于实时流处理,允许数据在处理过程中持续流动,极大地降低了延迟,提升了数据处理的效率。该工具由阿里巴巴集团开发,并在社区中广泛推广,为大数据处理提供了...