`

hadoop 求平均时间

 
阅读更多

   sqoop 求订单完成是平均时间精确到0.1天

package hdfs.demo3;


import hdfs.constants.DemoLineInputFormat;
import hdfs.constants.Utils;

import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.Date;
import java.util.Vector;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class Demo3 {  
 
	//解析 hdfs中的数据
	//DAYTIME 在下标为 51位置的值
	//RCV_DATE 在下标为 47位置的值
	//DISPOSEID 在下标为 24位置的值
	  public static class MapHDFS extends Mapper<LongWritable,Text, LongWritable, Text>{
		  	LongWritable	rkey	= new LongWritable();
		  	Text	rval	= new Text();
			long time; //一次交易花费的时间
			String[] strs=new String[1];
			public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
				strs=value.toString().split("\1");
				  
				Date startTime= Utils.getDate(strs[51]);
				Date stopTime= Utils.getDate(strs[47]);
				if(startTime==null||stopTime==null){
					return ;
				}
				time=stopTime.getTime()/1000-startTime.getTime()/1000;
				if(time<0){ //错误数据
					return ;
				}
				rkey.set(time%1000);  //毫秒 变 秒
				rval.set(time/3600+"") ; // 精确到小时 
				context.write(rkey, rval); // 输出到Combiner中 并组合
			}
	  }
	  
	  static class Combin extends Reducer<LongWritable,Text, LongWritable, Text>{
		  LongWritable	wkey	= new LongWritable();  
		  Text	val	= new Text();
		  public  void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
			  int sum =0;
			  int count=0;
			  for(Text v:values){
				  count++;
				  sum+=Long.valueOf(v.toString());
			  }
			  val.set("#"+count+"#"+sum);
			  context.write(wkey,val);
		  }
	  }

	public static class Reduce extends Reducer<LongWritable, Text, Text, Text> {
		Text writKey = new Text();
		Text writValue = new Text();
		public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
			long count=0;
			long sum = 0;
			String [] strs=new String [2];
			for (Text element : values) {
				strs=element.toString().split("#");
				count+=Long.valueOf(strs[1]); // key
				sum+=Long.valueOf(strs[2]); // value
			}
			Timestamp tt=new Timestamp(sum/count*3600*1000);
			writKey.set(count+"");
			writValue.set(getDate(tt.getTime())+"\t"+getDayDate(tt.getTime()));
			context.write(writKey, writValue);
		}

	}
	
	/**
	 *  取天 精确到0.1天
	 * @param tim 时间戳 毫秒级别
	 * @return
	 */
	private static String getDayDate(Long tim){
		String str =tim+":时间:\t";
		str+=new BigDecimal(tim/(1000*60*60*24.0)).setScale(2, BigDecimal.ROUND_HALF_UP)+"天 ";
		return str;
	}
	/**
	 * @param args
	 * @throws IOException
	 */
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		
		/**
		 * ./sqoop import --connect jdbc:oracle:thin:@10.10.35.65:1521/crm_standby  
		 *  --username CRMDEV --password crmdev --table ORDR_MAIN 
		 * --where 'rownum<1000' --fields-terminated-by '\0x001' --lines-terminated-by '\0x002'
		 */
		String outputPath="hdfs://172.19.121.125:9000/user/admin/ORDR_MAIN";
		new Demo3().avgTime(outputPath,conf);
	}
	 
	
	private void avgTime(String path,Configuration conf){
			Path outPath=new Path(path+"_avg");
			try {
				FileSystem fs=outPath.getFileSystem(conf);
				fs.deleteOnExit(outPath);
			} catch (IOException e1) {
//				e1.printStackTrace();
			}
		try {
			Job job = new Job(conf, "tAvgTime");
			
			// 设置map
			job.setMapperClass(MapHDFS.class);
			job.setCombinerClass(Combin.class);
			job.setReducerClass(Reduce.class);
			
			// 设置输出类型 (付款时间和结账时间的  时间戳值)
			job.setOutputKeyClass(LongWritable.class);
			job.setOutputValueClass(Text.class);
			
			job.setInputFormatClass(DemoLineInputFormat.class);
			job.setOutputFormatClass(TextOutputFormat.class);
			
			FileInputFormat.addInputPath(job, new Path(path));
			// 设置输入输出路径
			FileOutputFormat.setOutputPath(job, new Path(path+"_avg"));
			
			job.waitForCompletion(true);
			} catch (Exception e) {
				e.printStackTrace();
			}
					
	}
	
	
}

 

main 方法中贴出的sqoop 语句 由于指定了字段分隔符和行分隔符所以需要自定义LineReader分割文件;

       参考读取text文件的方法

复制 org.apache.hadoop.mapreduce.lib.input.LineInputFormat      为  DemoLineInputFormat

复制 org.apache.hadoop.mapreduce.lib.input.LineRecordReader  为  DemoLineRecordReader

复制 org.apache.hadoop.util.LineReader              为  DemoLineReader

 

         job 设置读取文件的 InputFormat

job.setInputFormatClass(DemoLineInputFormat.class);

          修改他们的引用关系:

 DemoLineInputFormat     中的  LineRecordReader 修改为DemoLineRecordReader

 DemoLineRecordReader 中的  LineReader  修改为DemoLineReader

 

           修改 DemoLineReader 的常量LF  为sqoop导出语句中的行分隔符 '\2'

private static final byte LF = '\2'; //  默认为'\n'   //HDFS文件的换行符

 

 

分享到:
评论

相关推荐

    Hadoop 分析统计学生考试成绩1

    * 准备工作:在 Intellij IDEA 或者是 (My)Eclipse 中导入该 Maven 项目,会根据 pom.xml 自动下载依赖包,时间可能有些长。如果遇到网络状况不佳,自行百度一下怎么在这些开发环境中更换 maven 源为国内的源,等待...

    基于Hadoop豆瓣电影数据分析实验报告

    2. **数据导入**:将数据文件(如data.txt)复制到Hadoop环境,创建Hive数据库和表结构,字段包括电影ID、名称、投票人数、类型、产地、上映时间、时长、年代、评分和首映地点。使用LOAD DATA命令将数据加载到Hive表...

    基于Hadoop的电影影评数据分析

    在这个过程中,我们可以编写Java程序,利用Hadoop API来实现数据处理逻辑,例如计算平均分、统计评分分布和提取高频词语。 Hadoop的优势在于其高容错性和可扩展性,能够处理PB级别的数据。通过分布式计算,即使面对...

    Hadoop之外卖订单数据分析系统

    例如,时间序列图显示订单量随时间的变化,柱状图或饼图展示各菜品销量,热力图揭示不同地区的订单分布。这些可视化结果可以帮助管理层快速把握业务状况,制定决策。 在这个系统中,可能还需要考虑数据流的实时性。...

    hadoop性能测试报告

    - **读测试**:读取同样大小的10个文件,平均读取速率高达44.81MB/s,测试执行时间缩短至67.595秒,表明Hadoop在数据读取方面表现优秀,且速度较快。 - **清理测试**:使用TestDFSIO的clean选项清除了测试数据,...

    IBM基于Hadoop工作的简介

    其次,FlexScheduler引入了多种调度指标,通过动态调整资源分配策略,实现对不同指标的优化,例如最小化平均完成时间。这种灵活的调度策略,使得Hadoop平台能够更好地适应各种应用场景,提高整体性能。 总结: IBM...

    hadoop流量统计程序

    3. 数据分组与统计:在Reduce阶段,根据特定的键(如IP地址、时间窗口)对数据进行分组,并计算每个分组的总流量、平均流量、峰值流量等指标。 4. 异常检测:通过设定阈值或者使用机器学习算法,识别异常流量模式,...

    Hadoop权威指南天气数据

    天气数据的分析可能涉及MapReduce,例如,统计每一年的平均气温、最高温度和最低温度等。 3. **压缩与Gzip**:1901.gz和1902.gz是使用Gzip压缩算法压缩的文件。Gzip是一种广泛使用的数据压缩工具,它可以减少网络...

    Hadoop分析气象数据完整版代码

    例如,可能需要计算某个地区的平均气温、最高最低温度、降雨量等气象指标。 接着,我们提到了SSM框架,这是一个由Spring Boot、Spring MVC和MyBatis组成的Java开发框架,常用于构建Web应用。在这里,SSM框架可能...

    基于hadoop的电信客服数据分析+文档

    接下来,我们可能会对通话记录进行一系列统计分析,例如计算每个用户的平均通话时长、通话频率、最常联系的号码等。这些分析可以帮助我们理解用户的通信行为模式,进一步可以用于客户细分、预测用户需求或识别异常...

    Hadoop+平台下改进的+LATE+调度算法

    LATE(Least Average Time to End)调度算法是一种试图最小化平均完成时间的算法。然而,在面对Hadoop集群的异构性(即节点间硬件配置差异)和不同的工作负载时,LATE调度算法的表现并不尽如人意。具体来说,LATE...

    avg-time hadoop程序

    "avg-time hadoop程序"的标题和描述暗示了我们正在讨论一个与计算平均时间相关的Hadoop程序。这个程序可能设计用于分析数据集中的时间戳信息,以计算各种操作或事件的平均耗时。在Hadoop环境中,这种类型的程序通常...

    基于Hadoop的美团外卖数据分析.zip

    1. **订单数据**:包含订单编号、用户ID、商户ID、菜品信息、下单时间、送达时间等。通过对订单数据的分析,可以了解订单量的时空分布,高峰期和低谷期,以及用户喜好。 2. **用户行为数据**:如浏览记录、搜索...

    Hadoop编程思想

    - **Reduce**阶段示例:对每个年份的温度求平均值,输出年份及其平均温度。 #### 大数据案例 本节将通过一个具体的案例——处理天气数据,进一步理解MapReduce的工作原理。 **原始数据**:每行一条记录,每行...

    Hadoop_HDFS安装和管理

    | ost4 (Datanode) | eth0:192.168.188.204 | 容量:80G 接口标准:IDE 转速:7200rpm 缓存容量:2M 平均寻道时间:9ms 传输标准:ATA133 | - **角色分配**: - **Namenode**: ost2和ost3 - **...

    基于Hadoop、Spark及Flink大规模数据分析的性能评价

    实验结果分析表明,对于非排序的基准测试程序,使用Spark或Flink替代Hadoop,分别带来平均77%和70%执行时间的降低。整体上,Spark的性能结果最好;而Flink通过使用的显式迭代程序,极大提高了迭代算法的性能。

    Hadoop电影数据集,包含字段说明

    4. **时间序列分析**:分析电影评分随时间的变化,了解用户口味的变迁。 5. **推荐系统**:基于用户历史评分,构建协同过滤算法来推荐相似口味的电影。 6. **情感分析**:对用户评论进行情感分析,了解公众对电影的...

    《hadoop权威指南》第二章的气象数据文件

    例如,我们可以编写一个Map函数,将每条气象记录按照日期和地点进行分区,然后Reduce函数对每个分区内的数据进行汇总,计算特定地点在一段时间内的平均温度。 此外,Hadoop生态中的其他工具,如Pig、Hive和Spark,...

    大数据云计算技术 Hadoop运维笔记(共21页).pptx

    - 蓝汛使用了6000台设备,300个集群,每个集群平均由30台服务器组成,整体使用率为40%。 - 每日处理的数据量相当庞大,包括6TB的扫描数据,1.5TB的输出数据,以及3TB的原始LZO压缩数据。 2. **Cloudera和其产品**...

    基于Hadoop的豆瓣电影影评数据分析(word文档)

    如果数据包含评分,还可以绘制评分分布图,探索电影的平均评分、评分标准差等统计特性,帮助理解用户的满意度。 标签中的“生活娱乐”和“数据分析”表明这个项目不仅关注技术实现,还关注如何将分析结果应用于实际...

Global site tag (gtag.js) - Google Analytics