- 浏览: 3422515 次
- 性别:
- 来自: 珠海
文章分类
- 全部博客 (1633)
- Java (250)
- Android&HTML5 (111)
- Struts (10)
- Spring (236)
- Hibernate&MyBatis (115)
- SSH (49)
- jQuery插件收集 (55)
- Javascript (145)
- PHP (77)
- REST&WebService (18)
- BIRT (27)
- .NET (7)
- Database (105)
- 设计模式 (16)
- 自动化和测试 (19)
- Maven&Ant (43)
- 工作流 (36)
- 开源应用 (156)
- 其他 (16)
- 前台&美工 (119)
- 工作积累 (0)
- OS&Docker (83)
- Python&爬虫 (28)
- 工具软件 (157)
- 问题收集 (61)
- OFbiz (6)
- noSQL (12)
最新评论
-
HEZR曾嶸:
你好博主,这个不是很理解,能解释一下嘛//左边+1,上边+1, ...
java 两字符串相似度计算算法 -
天使建站:
写得不错,可以看这里,和这里的这篇文章一起看,有 ...
jquery 遍历对象、数组、集合 -
xue88ming:
很有用,谢谢
@PathVariable映射出现错误: Name for argument type -
jnjeC:
厉害,困扰了我很久
MyBatis排序时使用order by 动态参数时需要注意,用$而不是# -
TopLongMan:
非常好,很实用啊。。
PostgreSQL递归查询实现树状结构查询
http://my.oschina.net/mkh/blog/493885
这里主要使用Java自带邮件类实现Mapreduce任务的监控,如果Mapreduce任务报错则发送报错邮件。Mapreduce的报错信息通过hdfs中的日志获取,里面的报错日志是json格式,这里讲json转换成xml格式发送到邮件。具体代码如下
这里主要使用Java自带邮件类实现Mapreduce任务的监控,如果Mapreduce任务报错则发送报错邮件。Mapreduce的报错信息通过hdfs中的日志获取,里面的报错日志是json格式,这里讲json转换成xml格式发送到邮件。具体代码如下
import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintStream; import java.net.URI; import java.util.Properties; import java.util.StringTokenizer; import javax.mail.Authenticator; import javax.mail.Message; import javax.mail.MessagingException; import javax.mail.PasswordAuthentication; import javax.mail.Session; import javax.mail.Transport; import javax.mail.internet.InternetAddress; import javax.mail.internet.MimeMessage; import net.sf.json.JSONArray; import net.sf.json.JSONObject; import net.sf.json.xml.XMLSerializer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; public class Email { private static final String USERNAME = "123456@qq.com";//发送邮件的用户名 private static final String PASSWORD = "123456789";//发送邮件的用户名对应的密码 private static final String EMAIL_HOST = "smtp.qq.com";//邮件服务器host public static void main(String args[]) { try { sendEmail("测试邮件", "测试邮件内容!", "test@qq.com"); System.out.println("email ok !"); } catch (MessagingException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * @category 发送邮件方法,该方法实现发送Mapreduce任务报错信息,具体的报错信息通过hdfs的报错日志获取 * @param to 目标邮箱(可以多个邮箱,用,号隔开) * @param job 通过mapreduce的job获取jobID * @param time 通过时间戳访问错误日志路径 * @throws Exception */ public static void sendErrMail(String to, Job job, String time) throws Exception { String subject = job.getJobName(); String message = getErr(job, time); LoginMail lm = new LoginMail(USERNAME, PASSWORD); // 创建session Properties props = new Properties(); props.put("mail.smtp.auth", "true"); props.put("mail.smtp.host", EMAIL_HOST); Session session = Session.getDefaultInstance(props, lm); // 创建 message Message msg = new MimeMessage(session); // 设置发送源地址 msg.setFrom(new InternetAddress(USERNAME)); // 多用户分解 StringTokenizer st = new StringTokenizer(to, ","); String[] recipients = new String[st.countTokens()]; int rc = 0; while (st.hasMoreTokens()) recipients[rc++] = st.nextToken(); InternetAddress[] addressTo = new InternetAddress[recipients.length]; for (int i = 0; i < recipients.length; i++) { addressTo[i] = new InternetAddress(recipients[i]); } msg.setRecipients(Message.RecipientType.TO, addressTo); // 设置邮件主题并发送邮件 msg.setSubject(subject); msg.setContent(message, "text/html;charset=utf-8"); Transport.send(msg); } /** * @category 自定义主题内容发送,这里的邮件内容不一定是Mapreduce的,可以任意填写 * @param subject 主题 * @param body 内容 * @param to 目标邮箱 * @throws MessagingException */ public static void sendEmail(String subject, String body, String to) throws MessagingException { LoginMail lm = new LoginMail(USERNAME, PASSWORD); // 创建session Properties props = new Properties(); props.put("mail.smtp.auth", "true"); props.put("mail.smtp.host", EMAIL_HOST); Session session = Session.getDefaultInstance(props, lm); // 创建 message Message msg = new MimeMessage(session); // 设置发送源地址 msg.setFrom(new InternetAddress(USERNAME)); // 多用户分解 StringTokenizer st = new StringTokenizer(to, ","); String[] recipients = new String[st.countTokens()]; int rc = 0; while (st.hasMoreTokens()) recipients[rc++] = st.nextToken(); InternetAddress[] addressTo = new InternetAddress[recipients.length]; for (int i = 0; i < recipients.length; i++) { addressTo[i] = new InternetAddress(recipients[i]); } msg.setRecipients(Message.RecipientType.TO, addressTo); // 设置邮件主题并发送邮件 msg.setSubject(subject); msg.setContent(body, "text/html;charset=utf-8"); Transport.send(msg); } /** * @category 获取日志文件 * @param job * @param time * @return FSDataInputStream * @throws IOException */ public static FSDataInputStream getFile(Job job, String time) throws IOException { String year = time.substring(0, 4); String month = time.substring(4, 6); String day = time.substring(6, 8); String dst = "hdfs://192.168.1.100:9000/tmp/hadoop-yarn/staging/history/done/" + year + "/" + month + "/" + day + "/000000"; FileSystem fs = FileSystem.get(URI.create(dst), new Configuration()); FileStatus[] status = fs.listStatus(new Path(dst)); FSDataInputStream in = null; for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName() .contains(job.getJobID().toString()) && status[i].getPath().getName().endsWith("jhist")) { in = new FSDataInputStream(fs.open(status[i].getPath())); } } return in; } /** * @category 解析文件类容为xml * @param job * @param time * @return xml * @throws IOException * @throws InterruptedException */ public static String getErr(Job job, String time) throws IOException, InterruptedException { FSDataInputStream in = getFile(job, time); Thread t1 = new Thread(); while (in == null) { t1.sleep(20000);//由于hdfs每个job的日志不是实时生成,所以需要每隔20秒检查一次hdfs该job日志是否已生成 t1.join(); in = getFile(job, time); } BufferedReader br = new BufferedReader(new InputStreamReader(in)); String line = ""; JSONObject jo; JSONArray jsa = new JSONArray(); String xml = ""; XMLSerializer xmlSerializer = new XMLSerializer(); while ((line = br.readLine()) != null) { if (line.toUpperCase().indexOf("error".toUpperCase()) > -1) { jo = JSONObject.fromObject(line); jsa.add(jo); } } xml = xmlSerializer.write(jsa); in.close(); br.close(); return xml; } /** * @category 获取try-catch中的异常内容 * @param e Exception * @return 异常内容 */ public static String getException(Exception e) { ByteArrayOutputStream out = new ByteArrayOutputStream(); PrintStream pout = new PrintStream(out); e.printStackTrace(pout); String ret = new String(out.toByteArray()); pout.close(); try { out.close(); } catch (Exception ex) { } return ret; } } class LoginMail extends Authenticator { private String username; private String password; public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } @Override protected PasswordAuthentication getPasswordAuthentication() { return new PasswordAuthentication(username, password); } public LoginMail(String username, String password) { this.username = username; this.password = password; } }
发表评论
-
Java Comparable和Comparator
2016-06-26 08:52 694http://my.oschina.net/android52 ... -
Java集合框架之fastutil & koloboke
2016-06-23 14:04 2471Java集合框架之fastutil http://rensan ... -
ehcache 分布式支持
2016-06-05 22:26 1098原文 http://my.oschina.net/glenxu ... -
Intellij IDEA插件开发入门
2016-05-26 11:42 2882原文: http://blog.csdn.net/dc_726 ... -
阿里巴巴Druid数据源的配置与使用
2016-05-24 17:42 1544http://my.oschina.net/wjme/blog ... -
mysql中间件研究(Atlas,cobar,TDDL), 分库分表插件
2016-05-09 14:15 3444http://www.guokr.com/blog/47576 ... -
Java集合: Queue和Deque
2016-05-09 09:49 1862Queue http://my.oschina.net/kev ... -
使用gzip优化web应用(filter实现)
2016-05-07 01:45 1031使用gzip优化web应用(filter实现) http:// ... -
Byteman 3.0.5 发布,Java 字节码注入工具
2016-04-23 10:29 1769Byteman 3.0.5 发布,Java 字 ... -
RandomStringUtils的说明和生成随机汉字
2016-04-20 15:21 1389更多参考: http://my.oschina.net/wil ... -
通过IP地址获取地理位置
2016-04-20 15:19 894http://my.oschina.net/githubhty ... -
Java编程中使用正则表达式过滤非数字字符串
2016-04-14 13:51 1716/** * * @param str ... -
非对称加密DH算法,DH代码实现
2016-04-13 11:33 1356RSA算法原理(一)http:// ... -
企业支付宝账号开发接口教程
2016-03-31 14:52 1236企业支付宝账号开发接口教程--JAVA-UTF-8(实际操作- ... -
java double类型数据操作工具类
2016-03-28 17:36 1239http://my.oschina.net/yxwblog/b ... -
double转换到BigDecimal
2016-03-28 17:11 1545BigDecimal b = new BigDecimal(d ... -
Java 生成好看的验证码
2016-03-23 10:52 3357http://www.oschina.net/code/sni ... -
Linux环境安装配置Swftools
2016-03-22 21:01 1105http://tetop.blog.51cto.com/188 ... -
java压缩与解压缩文件
2016-03-20 22:03 1469http://www.oschina.net/code/sni ... -
java图像压缩
2016-03-19 23:20 951http://my.oschina.net/686991/bl ...
相关推荐
MapReduce通过将任务分解为Map和Reduce两个阶段来简化并行处理过程。Map阶段负责将输入数据切分成小块并进行处理,Reduce阶段负责汇总处理结果。 #### 三、Hadoop环境搭建 Hadoop的环境搭建可以分为三种模式:单机...
在Hadoop中,MapReduce作业可以通过JobTracker(Hadoop 1.x)或YARN(Hadoop 2.x及以后版本)进行调度和监控,确保分布式计算的正确性和效率。 综上所述,这个项目可能是设计一个利用Hadoop MapReduce处理大规模...
- **邮件提醒**:在任务成功或失败时发送邮件通知。 ##### 1.6 Azkaban的架构 Azkaban的核心组件包括: 1. **AzkabanWebServer**:作为主要的管理者,负责用户认证、项目管理、定时执行工作流以及跟踪执行进度等...
仿真实验结果表明,使用DAG图结合的MapReduce任务分配方法能够有效地提升云计算中计算节点的工作效率,并且通过实验结果验证了该算法的正确性。这一研究成果对于云计算系统的优化和资源分配具有重要的指导意义,它...
- **MapReduce作业**:任务执行状态,作业延迟,资源利用率。 - **YARN资源**:Container分配,内存和CPU使用,队列容量。 6. **报警与通知** Zabbix提供了灵活的报警机制,可根据预设阈值配置邮件通知。例如,...
4. 任务监控:提供实时的任务监控界面,可查看任务状态、日志、执行历史等信息。 5. 任务告警:支持任务执行失败时的通知机制,通过邮件、短信等方式提醒管理员。 6. API接口:提供了丰富的API接口,方便集成到各类...
在大数据处理场景中,通常会涉及到多个独立或相互依赖的任务单元,如Shell脚本、Java程序、MapReduce任务和Hive脚本等。这些任务单元之间存在时间先后顺序和依赖关系,例如在数据清洗和分析过程中,原始数据首先需要...
每个应用都由一个ApplicationMaster负责管理,它负责与ResourceManager协调资源,并监控应用各个任务的执行。 这本书籍系列是Addison-Wesley Data and Analytics Series的一部分,该系列致力于提供读者在数据分析...
提供RESTful API接口,方便与其他系统集成,实现自动化部署和监控。 8. **Web界面**: Web UI界面直观易用,提供任务创建、修改、查看、启动、停止等操作,并有实时监控和日志查看功能。 9. **扩展性**: ...
大数据系统通常涉及多个任务单元,如Shell脚本、Java程序、MapReduce任务和Hive脚本,这些任务之间存在时间先后和依赖关系。Azkaban通过其工作流调度能力,自动管理这些复杂流程,确保任务按照正确的顺序执行,减少...
3. **作业(Jobs)**:作业是工作流中的基本单元,可以是单一的命令、脚本或者Hadoop MapReduce任务等。 4. **调度器(Scheduler)**:调度器负责根据预设的时间表或触发条件启动工作流。 5. **执行器(Executor)**...
其次,可以通过跟踪代码执行流程,模拟数据读写或MapReduce任务执行,加深对系统运行机制的理解。最后,可以参考Hadoop社区的邮件列表和JIRA问题追踪系统,了解开发者如何解决实际问题。 总的来说,Hadoop 2.7源...
告警管理能实时监控系统和任务状态,通过邮件、短信等方式发送告警通知。调度平台支持多种任务类型,并提供灵活的调度策略。事件生成和引用功能使得任务间的交互更加智能化。异常处理功能如重跑、补数据、停止、暂停...
- JobTracker:在Hadoop 1.0中负责管理MapReduce任务的执行,调度任务到TaskTracker,并监控它们的执行状态。 - TaskTracker:运行MapReduce任务,接受JobTracker的调度,并把任务执行状态报告给JobTracker。 - ...
为了方便开发者在Eclipse IDE中编写、调试和部署Hadoop MapReduce任务,专门开发了一款名为"Hadoop Plugin"的插件。这个插件极大地简化了Hadoop开发过程,提高了开发效率。 1. **Hadoop Plugin概述** Hadoop ...
HDFS的NameNode负责监控和调度数据存储,而MapReduce的JobTracker负责监控和调度数据的并行处理。从节点负责数据的存储和计算任务,每个从节点既是数据节点,也负责与主节点通信的守护进程。 Client机器负责将数据...
2. **工作流定义**:Azkaban使用JSON格式定义工作流,每个工作流由一系列任务(或称为作业)组成,这些任务可以是命令行脚本、Hadoop MapReduce作业、Spark作业等。任务之间可以通过依赖关系设定执行顺序,例如A任务...