`

Mapreduce任务实现邮件监控

    博客分类:
  • Java
 
阅读更多
http://my.oschina.net/mkh/blog/493885
这里主要使用Java自带邮件类实现Mapreduce任务的监控,如果Mapreduce任务报错则发送报错邮件。Mapreduce的报错信息通过hdfs中的日志获取,里面的报错日志是json格式,这里讲json转换成xml格式发送到邮件。具体代码如下


import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.URI;
import java.util.Properties;
import java.util.StringTokenizer;
 
import javax.mail.Authenticator;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.PasswordAuthentication;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
 
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import net.sf.json.xml.XMLSerializer;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
 
public class Email {
 
    private static final String USERNAME = "123456@qq.com";//发送邮件的用户名
    private static final String PASSWORD = "123456789";//发送邮件的用户名对应的密码
    private static final String EMAIL_HOST = "smtp.qq.com";//邮件服务器host
 
    public static void main(String args[]) {
        try {
            sendEmail("测试邮件", "测试邮件内容!", "test@qq.com");
            System.out.println("email ok !");
        } catch (MessagingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
 
    /**
     * @category 发送邮件方法,该方法实现发送Mapreduce任务报错信息,具体的报错信息通过hdfs的报错日志获取
     * @param to 目标邮箱(可以多个邮箱,用,号隔开)
     * @param job 通过mapreduce的job获取jobID
     * @param time 通过时间戳访问错误日志路径
     * @throws Exception
     */
    public static void sendErrMail(String to, Job job, String time)
            throws Exception {
        String subject = job.getJobName();
        String message = getErr(job, time);
        LoginMail lm = new LoginMail(USERNAME, PASSWORD);
        // 创建session
        Properties props = new Properties();
        props.put("mail.smtp.auth", "true");
        props.put("mail.smtp.host", EMAIL_HOST);
        Session session = Session.getDefaultInstance(props, lm);
 
        // 创建 message
        Message msg = new MimeMessage(session);
 
        // 设置发送源地址
        msg.setFrom(new InternetAddress(USERNAME));
 
        // 多用户分解
        StringTokenizer st = new StringTokenizer(to, ",");
        String[] recipients = new String[st.countTokens()];
        int rc = 0;
        while (st.hasMoreTokens())
            recipients[rc++] = st.nextToken();
        InternetAddress[] addressTo = new InternetAddress[recipients.length];
        for (int i = 0; i < recipients.length; i++) {
            addressTo[i] = new InternetAddress(recipients[i]);
        }
        msg.setRecipients(Message.RecipientType.TO, addressTo);
 
        // 设置邮件主题并发送邮件
        msg.setSubject(subject);
        msg.setContent(message, "text/html;charset=utf-8");
        Transport.send(msg);
    }
 
    /**
     * @category 自定义主题内容发送,这里的邮件内容不一定是Mapreduce的,可以任意填写
     * @param subject 主题
     * @param body 内容
     * @param to 目标邮箱
     * @throws MessagingException
     */
    public static void sendEmail(String subject, String body, String to)
            throws MessagingException {
        LoginMail lm = new LoginMail(USERNAME, PASSWORD);
        // 创建session
        Properties props = new Properties();
        props.put("mail.smtp.auth", "true");
        props.put("mail.smtp.host", EMAIL_HOST);
        Session session = Session.getDefaultInstance(props, lm);
 
        // 创建 message
        Message msg = new MimeMessage(session);
 
        // 设置发送源地址
        msg.setFrom(new InternetAddress(USERNAME));
 
        // 多用户分解
        StringTokenizer st = new StringTokenizer(to, ",");
        String[] recipients = new String[st.countTokens()];
        int rc = 0;
        while (st.hasMoreTokens())
            recipients[rc++] = st.nextToken();
        InternetAddress[] addressTo = new InternetAddress[recipients.length];
        for (int i = 0; i < recipients.length; i++) {
            addressTo[i] = new InternetAddress(recipients[i]);
        }
        msg.setRecipients(Message.RecipientType.TO, addressTo);
 
        // 设置邮件主题并发送邮件
        msg.setSubject(subject);
        msg.setContent(body, "text/html;charset=utf-8");
        Transport.send(msg);
 
    }
 
    /**
     * @category 获取日志文件
     * @param job
     * @param time
     * @return FSDataInputStream
     * @throws IOException
     */
    public static FSDataInputStream getFile(Job job, String time)
            throws IOException {
        String year = time.substring(0, 4);
        String month = time.substring(4, 6);
        String day = time.substring(6, 8);
        String dst = "hdfs://192.168.1.100:9000/tmp/hadoop-yarn/staging/history/done/"
                + year + "/" + month + "/" + day + "/000000";
        FileSystem fs = FileSystem.get(URI.create(dst), new Configuration());
        FileStatus[] status = fs.listStatus(new Path(dst));
        FSDataInputStream in = null;
        for (int i = 0; i < status.length; i++) {
            if (status[i].getPath().getName()
                    .contains(job.getJobID().toString())
                    && status[i].getPath().getName().endsWith("jhist")) {
                in = new FSDataInputStream(fs.open(status[i].getPath()));
            }
        }
        return in;
    }
 
    /**
     * @category 解析文件类容为xml
     * @param job
     * @param time
     * @return xml
     * @throws IOException
     * @throws InterruptedException
     */
    public static String getErr(Job job, String time) throws IOException,
            InterruptedException {
        FSDataInputStream in = getFile(job, time);
        Thread t1 = new Thread();
        while (in == null) {
            t1.sleep(20000);//由于hdfs每个job的日志不是实时生成,所以需要每隔20秒检查一次hdfs该job日志是否已生成
            t1.join();
            in = getFile(job, time);
        }
        BufferedReader br = new BufferedReader(new InputStreamReader(in));
 
        String line = "";
        JSONObject jo;
        JSONArray jsa = new JSONArray();
        String xml = "";
        XMLSerializer xmlSerializer = new XMLSerializer();
        while ((line = br.readLine()) != null) {
            if (line.toUpperCase().indexOf("error".toUpperCase()) > -1) {
                jo = JSONObject.fromObject(line);
                jsa.add(jo);
            }
        }
        xml = xmlSerializer.write(jsa);
        in.close();
        br.close();
        return xml;
 
    }
 
    /**
     * @category 获取try-catch中的异常内容
     * @param e Exception
     * @return 异常内容
     */
    public static String getException(Exception e) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        PrintStream pout = new PrintStream(out);
        e.printStackTrace(pout);
        String ret = new String(out.toByteArray());
        pout.close();
        try {
            out.close();
        } catch (Exception ex) {
        }
        return ret;
    }
}
 
class LoginMail extends Authenticator {
 
    private String username;
    private String password;
 
    public String getUsername() {
        return username;
    }
 
    public void setUsername(String username) {
        this.username = username;
    }
 
    public String getPassword() {
        return password;
    }
 
    public void setPassword(String password) {
        this.password = password;
    }
 
    @Override
    protected PasswordAuthentication getPasswordAuthentication() {
        return new PasswordAuthentication(username, password);
    }
 
    public LoginMail(String username, String password) {
        this.username = username;
        this.password = password;
    }
}
分享到:
评论

相关推荐

    hadoop文档, hdfs mapreduce,环境搭建,例子介绍等

    MapReduce通过将任务分解为Map和Reduce两个阶段来简化并行处理过程。Map阶段负责将输入数据切分成小块并进行处理,Reduce阶段负责汇总处理结果。 #### 三、Hadoop环境搭建 Hadoop的环境搭建可以分为三种模式:单机...

    基于MapReduce的贝叶斯分类.zip

    在Hadoop中,MapReduce作业可以通过JobTracker(Hadoop 1.x)或YARN(Hadoop 2.x及以后版本)进行调度和监控,确保分布式计算的正确性和效率。 综上所述,这个项目可能是设计一个利用Hadoop MapReduce处理大规模...

    云计算中任务分配研究.pdf

    仿真实验结果表明,使用DAG图结合的MapReduce任务分配方法能够有效地提升云计算中计算节点的工作效率,并且通过实验结果验证了该算法的正确性。这一研究成果对于云计算系统的优化和资源分配具有重要的指导意义,它...

    Azkaban任务调度安装配置和使用

    - **邮件提醒**:在任务成功或失败时发送邮件通知。 ##### 1.6 Azkaban的架构 Azkaban的核心组件包括: 1. **AzkabanWebServer**:作为主要的管理者,负责用户认证、项目管理、定时执行工作流以及跟踪执行进度等...

    大数据项目之电商数仓(6集群监控Zabbix)V4.2.doc

    - **MapReduce作业**:任务执行状态,作业延迟,资源利用率。 - **YARN资源**:Container分配,内存和CPU使用,队列容量。 6. **报警与通知** Zabbix提供了灵活的报警机制,可根据预设阈值配置邮件通知。例如,...

    xxljob定时任务管理平台.zip

    4. 任务监控:提供实时的任务监控界面,可查看任务状态、日志、执行历史等信息。 5. 任务告警:支持任务执行失败时的通知机制,通过邮件、短信等方式提醒管理员。 6. API接口:提供了丰富的API接口,方便集成到各类...

    Azkaban 大数据任务调度器

    在大数据处理场景中,通常会涉及到多个独立或相互依赖的任务单元,如Shell脚本、Java程序、MapReduce任务和Hive脚本等。这些任务单元之间存在时间先后顺序和依赖关系,例如在数据清洗和分析过程中,原始数据首先需要...

    Apache Hadoop YARN:Moving beyond MapReduce and Batch Processing with Hadoop 2

    每个应用都由一个ApplicationMaster负责管理,它负责与ResourceManager协调资源,并监控应用各个任务的执行。 这本书籍系列是Addison-Wesley Data and Analytics Series的一部分,该系列致力于提供读者在数据分析...

    DolphinScheduler任务调度系统 v1.3.5-源码.zip

    提供RESTful API接口,方便与其他系统集成,实现自动化部署和监控。 8. **Web界面**: Web UI界面直观易用,提供任务创建、修改、查看、启动、停止等操作,并有实时监控和日志查看功能。 9. **扩展性**: ...

    azkaban.docx

    大数据系统通常涉及多个任务单元,如Shell脚本、Java程序、MapReduce任务和Hive脚本,这些任务之间存在时间先后和依赖关系。Azkaban通过其工作流调度能力,自动管理这些复杂流程,确保任务按照正确的顺序执行,减少...

    azkaban大数据调度任务.zip

    3. **作业(Jobs)**:作业是工作流中的基本单元,可以是单一的命令、脚本或者Hadoop MapReduce任务等。 4. **调度器(Scheduler)**:调度器负责根据预设的时间表或触发条件启动工作流。 5. **执行器(Executor)**...

    hadoop2.7 source code

    其次,可以通过跟踪代码执行流程,模拟数据读写或MapReduce任务执行,加深对系统运行机制的理解。最后,可以参考Hadoop社区的邮件列表和JIRA问题追踪系统,了解开发者如何解决实际问题。 总的来说,Hadoop 2.7源...

    大数据相关技术资料.docx

    告警管理能实时监控系统和任务状态,通过邮件、短信等方式发送告警通知。调度平台支持多种任务类型,并提供灵活的调度策略。事件生成和引用功能使得任务间的交互更加智能化。异常处理功能如重跑、补数据、停止、暂停...

    理解hadoop集群

    - JobTracker:在Hadoop 1.0中负责管理MapReduce任务的执行,调度任务到TaskTracker,并监控它们的执行状态。 - TaskTracker:运行MapReduce任务,接受JobTracker的调度,并把任务执行状态报告给JobTracker。 - ...

    hadoop-plugin

    为了方便开发者在Eclipse IDE中编写、调试和部署Hadoop MapReduce任务,专门开发了一款名为"Hadoop Plugin"的插件。这个插件极大地简化了Hadoop开发过程,提高了开发效率。 1. **Hadoop Plugin概述** Hadoop ...

    azkaban学习文档

    2. **工作流定义**:Azkaban使用JSON格式定义工作流,每个工作流由一系列任务(或称为作业)组成,这些任务可以是命令行脚本、Hadoop MapReduce作业、Spark作业等。任务之间可以通过依赖关系设定执行顺序,例如A任务...

Global site tag (gtag.js) - Google Analytics