sqoop 求订单完成是平均时间精确到0.1天
package hdfs.demo3; import hdfs.constants.DemoLineInputFormat; import hdfs.constants.Utils; import java.io.IOException; import java.math.BigDecimal; import java.sql.Timestamp; import java.util.Date; import java.util.Vector; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class Demo3 { //解析 hdfs中的数据 //DAYTIME 在下标为 51位置的值 //RCV_DATE 在下标为 47位置的值 //DISPOSEID 在下标为 24位置的值 public static class MapHDFS extends Mapper<LongWritable,Text, LongWritable, Text>{ LongWritable rkey = new LongWritable(); Text rval = new Text(); long time; //一次交易花费的时间 String[] strs=new String[1]; public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { strs=value.toString().split("\1"); Date startTime= Utils.getDate(strs[51]); Date stopTime= Utils.getDate(strs[47]); if(startTime==null||stopTime==null){ return ; } time=stopTime.getTime()/1000-startTime.getTime()/1000; if(time<0){ //错误数据 return ; } rkey.set(time%1000); //毫秒 变 秒 rval.set(time/3600+"") ; // 精确到小时 context.write(rkey, rval); // 输出到Combiner中 并组合 } } static class Combin extends Reducer<LongWritable,Text, LongWritable, Text>{ LongWritable wkey = new LongWritable(); Text val = new Text(); public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { int sum =0; int count=0; for(Text v:values){ count++; sum+=Long.valueOf(v.toString()); } val.set("#"+count+"#"+sum); context.write(wkey,val); } } public static class Reduce extends Reducer<LongWritable, Text, Text, Text> { Text writKey = new Text(); Text writValue = new Text(); public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { long count=0; long sum = 0; String [] strs=new String [2]; for (Text element : values) { strs=element.toString().split("#"); count+=Long.valueOf(strs[1]); // key sum+=Long.valueOf(strs[2]); // value } Timestamp tt=new Timestamp(sum/count*3600*1000); writKey.set(count+""); writValue.set(getDate(tt.getTime())+"\t"+getDayDate(tt.getTime())); context.write(writKey, writValue); } } /** * 取天 精确到0.1天 * @param tim 时间戳 毫秒级别 * @return */ private static String getDayDate(Long tim){ String str =tim+":时间:\t"; str+=new BigDecimal(tim/(1000*60*60*24.0)).setScale(2, BigDecimal.ROUND_HALF_UP)+"天 "; return str; } /** * @param args * @throws IOException */ public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); /** * ./sqoop import --connect jdbc:oracle:thin:@10.10.35.65:1521/crm_standby * --username CRMDEV --password crmdev --table ORDR_MAIN * --where 'rownum<1000' --fields-terminated-by '\0x001' --lines-terminated-by '\0x002' */ String outputPath="hdfs://172.19.121.125:9000/user/admin/ORDR_MAIN"; new Demo3().avgTime(outputPath,conf); } private void avgTime(String path,Configuration conf){ Path outPath=new Path(path+"_avg"); try { FileSystem fs=outPath.getFileSystem(conf); fs.deleteOnExit(outPath); } catch (IOException e1) { // e1.printStackTrace(); } try { Job job = new Job(conf, "tAvgTime"); // 设置map job.setMapperClass(MapHDFS.class); job.setCombinerClass(Combin.class); job.setReducerClass(Reduce.class); // 设置输出类型 (付款时间和结账时间的 时间戳值) job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(DemoLineInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path(path)); // 设置输入输出路径 FileOutputFormat.setOutputPath(job, new Path(path+"_avg")); job.waitForCompletion(true); } catch (Exception e) { e.printStackTrace(); } } }
main 方法中贴出的sqoop 语句 由于指定了字段分隔符和行分隔符所以需要自定义LineReader分割文件;
参考读取text文件的方法
复制 org.apache.hadoop.mapreduce.lib.input.LineInputFormat 为 DemoLineInputFormat
复制 org.apache.hadoop.mapreduce.lib.input.LineRecordReader 为 DemoLineRecordReader
复制 org.apache.hadoop.util.LineReader 为 DemoLineReader
job 设置读取文件的 InputFormat
job.setInputFormatClass(DemoLineInputFormat.class);
修改他们的引用关系:
DemoLineInputFormat 中的 LineRecordReader 修改为DemoLineRecordReader
DemoLineRecordReader 中的 LineReader 修改为DemoLineReader
修改 DemoLineReader 的常量LF 为sqoop导出语句中的行分隔符 '\2'
private static final byte LF = '\2'; // 默认为'\n' //HDFS文件的换行符
相关推荐
* 准备工作:在 Intellij IDEA 或者是 (My)Eclipse 中导入该 Maven 项目,会根据 pom.xml 自动下载依赖包,时间可能有些长。如果遇到网络状况不佳,自行百度一下怎么在这些开发环境中更换 maven 源为国内的源,等待...
2. **数据导入**:将数据文件(如data.txt)复制到Hadoop环境,创建Hive数据库和表结构,字段包括电影ID、名称、投票人数、类型、产地、上映时间、时长、年代、评分和首映地点。使用LOAD DATA命令将数据加载到Hive表...
在这个过程中,我们可以编写Java程序,利用Hadoop API来实现数据处理逻辑,例如计算平均分、统计评分分布和提取高频词语。 Hadoop的优势在于其高容错性和可扩展性,能够处理PB级别的数据。通过分布式计算,即使面对...
例如,时间序列图显示订单量随时间的变化,柱状图或饼图展示各菜品销量,热力图揭示不同地区的订单分布。这些可视化结果可以帮助管理层快速把握业务状况,制定决策。 在这个系统中,可能还需要考虑数据流的实时性。...
- **读测试**:读取同样大小的10个文件,平均读取速率高达44.81MB/s,测试执行时间缩短至67.595秒,表明Hadoop在数据读取方面表现优秀,且速度较快。 - **清理测试**:使用TestDFSIO的clean选项清除了测试数据,...
其次,FlexScheduler引入了多种调度指标,通过动态调整资源分配策略,实现对不同指标的优化,例如最小化平均完成时间。这种灵活的调度策略,使得Hadoop平台能够更好地适应各种应用场景,提高整体性能。 总结: IBM...
3. 数据分组与统计:在Reduce阶段,根据特定的键(如IP地址、时间窗口)对数据进行分组,并计算每个分组的总流量、平均流量、峰值流量等指标。 4. 异常检测:通过设定阈值或者使用机器学习算法,识别异常流量模式,...
天气数据的分析可能涉及MapReduce,例如,统计每一年的平均气温、最高温度和最低温度等。 3. **压缩与Gzip**:1901.gz和1902.gz是使用Gzip压缩算法压缩的文件。Gzip是一种广泛使用的数据压缩工具,它可以减少网络...
例如,可能需要计算某个地区的平均气温、最高最低温度、降雨量等气象指标。 接着,我们提到了SSM框架,这是一个由Spring Boot、Spring MVC和MyBatis组成的Java开发框架,常用于构建Web应用。在这里,SSM框架可能...
接下来,我们可能会对通话记录进行一系列统计分析,例如计算每个用户的平均通话时长、通话频率、最常联系的号码等。这些分析可以帮助我们理解用户的通信行为模式,进一步可以用于客户细分、预测用户需求或识别异常...
LATE(Least Average Time to End)调度算法是一种试图最小化平均完成时间的算法。然而,在面对Hadoop集群的异构性(即节点间硬件配置差异)和不同的工作负载时,LATE调度算法的表现并不尽如人意。具体来说,LATE...
"avg-time hadoop程序"的标题和描述暗示了我们正在讨论一个与计算平均时间相关的Hadoop程序。这个程序可能设计用于分析数据集中的时间戳信息,以计算各种操作或事件的平均耗时。在Hadoop环境中,这种类型的程序通常...
1. **订单数据**:包含订单编号、用户ID、商户ID、菜品信息、下单时间、送达时间等。通过对订单数据的分析,可以了解订单量的时空分布,高峰期和低谷期,以及用户喜好。 2. **用户行为数据**:如浏览记录、搜索...
- **Reduce**阶段示例:对每个年份的温度求平均值,输出年份及其平均温度。 #### 大数据案例 本节将通过一个具体的案例——处理天气数据,进一步理解MapReduce的工作原理。 **原始数据**:每行一条记录,每行...
| ost4 (Datanode) | eth0:192.168.188.204 | 容量:80G 接口标准:IDE 转速:7200rpm 缓存容量:2M 平均寻道时间:9ms 传输标准:ATA133 | - **角色分配**: - **Namenode**: ost2和ost3 - **...
实验结果分析表明,对于非排序的基准测试程序,使用Spark或Flink替代Hadoop,分别带来平均77%和70%执行时间的降低。整体上,Spark的性能结果最好;而Flink通过使用的显式迭代程序,极大提高了迭代算法的性能。
4. **时间序列分析**:分析电影评分随时间的变化,了解用户口味的变迁。 5. **推荐系统**:基于用户历史评分,构建协同过滤算法来推荐相似口味的电影。 6. **情感分析**:对用户评论进行情感分析,了解公众对电影的...
例如,我们可以编写一个Map函数,将每条气象记录按照日期和地点进行分区,然后Reduce函数对每个分区内的数据进行汇总,计算特定地点在一段时间内的平均温度。 此外,Hadoop生态中的其他工具,如Pig、Hive和Spark,...
- 蓝汛使用了6000台设备,300个集群,每个集群平均由30台服务器组成,整体使用率为40%。 - 每日处理的数据量相当庞大,包括6TB的扫描数据,1.5TB的输出数据,以及3TB的原始LZO压缩数据。 2. **Cloudera和其产品**...
如果数据包含评分,还可以绘制评分分布图,探索电影的平均评分、评分标准差等统计特性,帮助理解用户的满意度。 标签中的“生活娱乐”和“数据分析”表明这个项目不仅关注技术实现,还关注如何将分析结果应用于实际...