`
ghost_face
  • 浏览: 54310 次
社区版块
存档分类
最新评论

通过HadoopAPI获取task日志内容

 
阅读更多

 在集群中查看Task日志的方法,一般有两个:

1,通过Hadoop提供的WebConsole,直接在页面中追踪查看;

2,到集群中运行该task的节点上,查看日志文件。每个tasktracker子进程都会用log4j产生三个日志文件,分别是syslog,stdout,stderr。这些日志文件存放到%HADOOP_LOG_DIR%目录下的userlogs的子目录中。但是通过该方法,需要追踪到哪个节点运行了该task。

 

下面,通过使用JobClient,以及JobClient的几个私有方法(displayTaskLogs()、getTaskLogs()、getTaskLogURL(),方法参数省略,具体见代码),来获取日志信息。代码如下:

package myTest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*;

import java.io.*;


public class test {

    static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
        return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
    }
//JobClient中的该方法,没有Writer参数,这是为了得到输出流加的
    private static void displayTaskLogs(TaskAttemptID taskId, String baseUrl, Writer sw)
            throws IOException {
        // The tasktracker for a 'failed/killed' job might not be around...
        if (baseUrl != null) {
            // Construct the url for the tasklogs
            String taskLogUrl = getTaskLogURL(taskId, baseUrl);

            // Copy task's stderr to stderr of the JobClient
           getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stderr"), sw);
        }
    }

//JobClient中的该方法,参数不是Writer类型,而是OutputStream类型,直接打印到控制台。
    private static void getTaskLogs(TaskAttemptID taskId, URL taskLogUrl,
                                   Writer out) {
        try {
            URLConnection connection = taskLogUrl.openConnection();
            connection.setReadTimeout(1000000);
            connection.setConnectTimeout(1000000);
            BufferedReader input =
                    new BufferedReader(new InputStreamReader(connection.getInputStream()));
            BufferedWriter output =
                    new BufferedWriter(out);
            try {
                String logData = null;
                while ((logData = input.readLine()) != null) {
                    if (logData.length() > 0) {
                        output.write(taskId + ": " + logData + "\n");
                        output.flush();
                    }
                }
            } finally {
                input.close();
            }
        }catch(IOException ioe){
            System.out.println("Error reading task output" + ioe.getMessage());
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {

        Configuration conf = new Configuration();
        conf.addResource(new Path("conf/mapred-site.xml"));
        conf.addResource(new Path("conf/core-site.xml"));
        conf.addResource(new Path("conf/hdfs-site.xml"));

//输出配置文件的所有属性
//        for (Map.Entry<String, String> entry : conf) {
//            System.out.println(entry.getKey() + "\t=\t" + entry.getValue());
//        }

        JobConf job = new JobConf(conf);
        JobClient jc = new JobClient(job);
       jc.init(job);

        JobID jobIdNew = new JobID("201304151829", 6316);
        RunningJob runJob = jc.getJob(jobIdNew);

        StringWriter sw = new StringWriter();
        TaskCompletionEvent[] events = runJob.getTaskCompletionEvents(0);
        for(TaskCompletionEvent event : events){
            displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp(), sw);
        }
        System.out.println(sw.toString());

//        /**
//         * mapProgress()/reduceProgress()
//         * result:1.0
//         */
//        System.out.println(runJob.mapProgress());
//        System.out.println(runJob.reduceProgress());
//
//
//        /**getTrackingURL()
//         * result:
//         * http://baby6:35030/jobdetails.jsp?jobid=job_201304151829_5768
//         */
//        System.out.println(runJob.getTrackingURL());
//
//
//        /**displayTasks()
//         * result:
//         * attempt_201304151829_5768_m_000000_0
//         */
//        jc.displayTasks(jobIdNew, "map", "completed");
//

//        /**
//         * 获取集群中taskTracker个数
//         */
//        System.out.println(jc.getClusterStatus().getTaskTrackers());

/**
 *获取集群中活着的节点名称
 */
//        Collection<String> c = jc.getClusterStatus(true).getActiveTrackerNames();
//        Iterator it = c.iterator();
//        while (it.hasNext()) {
//            System.out.println(it.next());
//        }
//        JobStatus[] jobs = jc.getAllJobs();
//        System.out.println(jobs.length);


    }
}

 注:该方法只能获取的到非历史Job的日志信息,如果该job已经变成History job时,获取为空。

一般一个job经过24小时会变成history job,这个可以在集群中设置。

0
5
分享到:
评论

相关推荐

    hadoop提交作业分析.doc

    2. **HADOOP_OPTS**: 这个变量包含了一组特定于Hadoop的Java选项,可能包括安全、日志、调试等相关的配置。在实际运行时,这些选项会被合并到JVM的启动参数中。 3. **CLASSPATH**: 这是Java应用程序寻找类文件的...

    Hive任务提交流程.pdf

    Hive作为大数据处理领域的一个重要工具,它允许用户通过SQL语句来查询和管理存储在Hadoop集群上的大规模数据。在Hive中,复杂的SQL查询会被解析并转换为一系列的任务(Task),这些任务通常是MapReduce作业。本篇将...

    SSM框架实现爬虫

    在爬虫项目中,虽然可能没有传统的用户界面,但可以通过RESTful API来启动爬虫任务或者获取爬取结果。例如,可以创建一个CrawlerController,接收HTTP请求,调用SpiderService的相关方法,并返回爬取的页面数据或...

    大数据面试100题.pdf

    开发者可以通过API编程,将数据从源头推送到Kafka。 Flume管道内存,flume宕机了数据丢失怎么解决: 在Flume中,内存中的数据丢失是一个常见问题,因为数据首先会缓存在内存中。可以通过设置合适的批处理大小和块...

    大数据高级编程最佳实践

    - **日志查看:**通过查看MapReduce的日志文件,可以获取关于任务执行的详细信息。 - **控制台访问:**JobTracker提供了Web界面,用户可以通过浏览器访问该界面来查看作业的状态和进度。 #### 九、MapReduce的未来...

    spark-2.2.1.tar.gz 源码

    - **Spark Core**:负责任务调度,使用DAG(有向无环图)来表示任务,通过TaskScheduler接口实现任务调度。内存管理采用Tachyon或HDFS作为持久化存储,容错机制基于检查点和事件日志。 - **Spark SQL**:提供了...

    storm所需组件资源的资源

    - 官方文档:提供详细的安装指南、API参考、最佳实践等内容,是学习和使用Storm的重要参考资料。 - 社区论坛:用户交流平台,可获取解决方案和经验分享。 总之,Apache Storm作为一个强大的实时计算框架,通过其...

    lyu 大数据期末考试资料

    以上内容概述了大数据领域的核心概念和技术细节,包括大数据的基本定义、特征、采集、存储、计算、处理流程、商业和社会价值等方面,并深入探讨了大数据与人工智能的关系、集群与虚拟化技术、Hadoop框架及其核心组件...

    《大数据平台搭建与配置管理》期末考试卷及答案.docx

    - **知识点**: 客户端维护与Zookeeper服务器之间的连接,通过该连接发送请求、获取响应以及发送心跳等操作。 31. **ZooKeeper的起源** - **知识点**: ZooKeeper框架最初是在雅虎公司内部开发的,后来成为Apache的...

    jStorm-开源

    与Hadoop MapReduce这样的批量处理框架不同,jStorm专注于实时流处理,允许数据在处理过程中持续流动,极大地降低了延迟,提升了数据处理的效率。该工具由阿里巴巴集团开发,并在社区中广泛推广,为大数据处理提供了...

Global site tag (gtag.js) - Google Analytics