package
com.xxx.analyse.job;
import
java.io.IOException;
import
java.sql.Connection;
import
java.sql.SQLException;
import
java.util.Iterator;
import
java.util.Map;
import
java.util.concurrent.ConcurrentHashMap;
import
org.apache.commons.lang3.StringUtils;
import
org.apache.hadoop.conf.Configuration;
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.Mapper;
import
org.apache.hadoop.mapreduce.Reducer;
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import
com.xxx.analyse.jdbc.DBConnPool;
import
com.xxx.analyse.nginx.SysNodeCache;
import
com.xxx.analyse.nginx.TrackNodeBatchInsertDB;
import
com.xxx.analyse.parser.NginxLogParser;
import
com.xxx.analyse.po.NginxLog;
import
com.xxx.analyse.po.TrackNode;
import
com.xxx.analyse.util.Converter;
import
com.xxx.analyse.util.DateUtil;
import
com.xxx.analyse.util.HdfsFileOuter;
import
com.xxx.analyse.util.HdfsFileUtil;
import
com.xxx.analyse.util.RedisUtil;
/**
* @Title: 用户轨迹流量统计PV、UV
* @Description: Mapper & Reducer
* @Team: 技术1部Java开发小组
* @Author Andy-ZhichengYuan
* @Date 2015年11月25日
* @Version V1.0 */
public
class
TrackNodeJob {
/** 存放Redis缓存的Key:t_bi_nodev_dlog表数据,TrackNode对象 */
public
static
final
String RedisKey =
"bi_nodev_dlog"
;
/** 当前时间的配置Key */
private
static
final
String DayLogDate =
"TrackNodeJobDate"
;
private
static
final
String RedisNodeID =
"NodeID_"
;
/** 用户轨迹流量统计的Mapper */
public
static
class
TrackNodeMapper
extends
Mapper<Object, Text, Text, IntWritable> {
private
Text PreNodePvKey =
new
Text();
private
Text PreNodeUvKey =
new
Text();
private
Text NodePvKey =
new
Text();
private
Text NodeUvKey =
new
Text();
private
final
static
IntWritable PreNodePv =
new
IntWritable(
1
);
private
final
static
IntWritable PreNodeUv =
new
IntWritable(
1
);
private
final
static
IntWritable NodePv =
new
IntWritable(
1
);
private
final
static
IntWritable NodeUv =
new
IntWritable(
1
);
@Override
protected
void
map(Object key, Text value, Context context)
throws
IOException, InterruptedException {
String line = value.toString();
NginxLog log = NginxLogParser.parseNginxLogAll(line);
if
(log !=
null
&& log.isValid()) {
Long iPreNode = SysNodeCache.getNodeIdByURL(log.getRequest().getsReferUrl());
Long iNode = SysNodeCache.getNodeIdByURL(log.getRequest().getsPageUrl());
Long staySecond = log.getRequest().getStaySeconds();
if
(staySecond !=
null
&& staySecond.longValue() >
0
) {
Map<String, String> map = RedisUtil.getMapRedisCacheInfo(RedisKey);
if
(map ==
null
) {
map =
new
ConcurrentHashMap<String, String>();
map.put(
""
+iNode,
""
+staySecond);
RedisUtil.setMapRedisCacheInfo(RedisKey, map, RedisUtil.DAY);
}
else
{
if
( map.containsKey(
""
+iNode) ) {
staySecond += Converter.parseLong( map.get(
""
+iNode) );
map.put(
""
+iNode,
""
+staySecond);
}
else
{
map.put(
""
+iNode,
""
+staySecond);
}
RedisUtil.setMapRedisCacheInfo(RedisKey, map, RedisUtil.DAY);
}
}
PreNodePvKey.set( iPreNode +
"|"
+ iNode );
context.write(PreNodePvKey, PreNodePv);
NodePvKey.set(
""
+ iNode );
context.write(NodePvKey, NodePv);
PreNodeUvKey.set( iPreNode +
"|"
+ iNode +
":"
+ log.getsGuid() );
context.write(PreNodeUvKey, PreNodeUv);
NodeUvKey.set( iNode +
":"
+ log.getsGuid() );
context.write(NodeUvKey, NodeUv);
}
line =
null
;
log =
null
;
}
}
/** 组合器Combiner:在Map输出于Reduce之前做合并计算,其实质是Reduce的处理逻辑 */
public
static
class
TrackNodeCombiner
extends
Reducer<Text, IntWritable, Text, IntWritable> {
private
IntWritable result =
new
IntWritable();
public
void
reduce(Text key, Iterable<IntWritable> values, Context context)
throws
IOException, InterruptedException {
int
sum =
0
;
for
(IntWritable val : values) {
sum += val.get();
}
String mapKey = key.toString();
if
( StringUtils.contains(mapKey,
":"
) ) {
mapKey = (StringUtils.substringBeforeLast(mapKey,
":"
) +
":"
);
result.set( sum );
context.write(
new
Text(mapKey), result);
}
else
{
result.set( sum );
context.write(key, result);
}
}
}
/** 用户轨迹流量统计的Reduce类 */
public
static
class
TrackNodeReducer
extends
Reducer<Text, IntWritable, TrackNode, Text> {
private
Long iDate =
null
;
public
void
reduce(Text key, Iterable<IntWritable> values, Context context)
throws
IOException, InterruptedException {
long
sum =
0
;
String mapKey = key.toString();
int
iType =
this
.getKeyType(mapKey);
String iNode =
this
.getNodeID(iType, mapKey);
if
(iType ==
1
|| iType ==
3
) {
Iterator<IntWritable> itr = values.iterator();
while
( itr.hasNext() ) {
itr.next();
sum ++;
}
}
else
{
for
(IntWritable val : values) {
sum += val.get();
}
}
if
(iDate ==
null
|| iDate.longValue() ==
0
) {
Configuration conf = context.getConfiguration();
String yesterday = DateUtil.getAgoBackDate(-
1
);
Long iDateDef = DateUtil.getDateSec( yesterday );
iDate = conf.getLong(DayLogDate, iDateDef);
}
TrackNode tNode = RedisUtil.getJson2ObjectCacheInfo(RedisNodeID+iNode, TrackNode.
class
);
if
(tNode !=
null
) {
switch
(iType) {
case
1
:
if
(tNode.getPreNode() ==
0
) {
tNode.setPreNode(Converter.parseLong(
this
.getPreNodeID(mapKey) ));
}
Long iPreUv = tNode.getPreUV();
tNode.setPreUV(iPreUv + sum);
break
;
case
2
:
if
(tNode.getPreNode() ==
0
) {
tNode.setPreNode(Converter.parseLong(
this
.getPreNodeID(mapKey) ));
}
Long iPrePv = tNode.getPrePV();
tNode.setPrePV(iPrePv + sum);
break
;
case
3
:
Long iUv = tNode.getUv();
tNode.setUv(iUv + sum);
break
;
default
:
Long iPv = tNode.getPv();
tNode.setPv(iPv + sum);
break
;
}
RedisUtil.setObject2JsonCacheInfo(RedisNodeID+iNode, tNode, RedisUtil.DAY);
}
else
{
tNode =
new
TrackNode();
tNode.setNode( Converter.parseLong(iNode) );
tNode.setCreateTime( iDate );
switch
(iType) {
case
1
:
tNode.setPreNode(Converter.parseLong(
this
.getPreNodeID(mapKey) ));
tNode.setPreUV( sum );
break
;
case
2
:
tNode.setPreNode(Converter.parseLong(
this
.getPreNodeID(mapKey) ));
tNode.setPrePV( sum );
break
;
case
3
:
tNode.setUv( sum );
break
;
default
:
tNode.setPv( sum );
break
;
}
RedisUtil.setObject2JsonCacheInfo(RedisNodeID+iNode, tNode, RedisUtil.DAY);
}
}
/** 根据Key判断业务类型:
*<p>1 = UV Key:iPreNode +"|"+ iNode +":"+ GUID </p>
*<p>2 = PV Key:iPreNode +"|"+ iNode </p>
*<p>3 = UV Key:iNode +":"+ GUID </p>
* 0 = PV Key:iNode (default) */
public
int
getKeyType(String mapKey) {
int
iType =
0
;
if
(StringUtils.contains(mapKey,
"|"
) && StringUtils.contains(mapKey,
":"
)) {
iType =
1
;
}
else
{
if
(StringUtils.contains(mapKey,
"|"
)) {
iType =
2
;
}
else
{
if
(StringUtils.contains(mapKey,
":"
)) {
iType =
3
;
}
else
{
iType =
0
;
}
}
}
return
iType;
}
/** 根据业务类型来获取当前节点的ID */
public
String getNodeID(
int
iType, String mapKey) {
switch
(iType) {
case
1
:
return
StringUtils.substringBetween(mapKey,
"|"
,
":"
);
case
2
:
return
StringUtils.substringAfterLast(mapKey,
"|"
);
case
3
:
return
StringUtils.substringBefore(mapKey,
":"
);
default
:
return
mapKey;
}
}
/** 仅当存在上游节点时,才根据MapKey来获取上游节点的ID;否则返回null */
public
String getPreNodeID(String mapKey) {
if
( StringUtils.contains(mapKey,
"|"
) ) {
return
StringUtils.substringBefore(mapKey,
"|"
);
}
return
null
;
}
}
/** 运行TrackNodeJob用户轨迹流量统计的MapReduce作业 */
public
static
int
runTrackNode2File(String input, String output, Long iDate) {
int
exit =
0
;
try
{
Configuration conf =
new
Configuration();
conf.setLong(DayLogDate, iDate);
Job job = Job.getInstance(conf,
"t_bi_nodev_dlog"
);
job.setJarByClass(TrackNodeJob.
class
);
job.setMapperClass(TrackNodeMapper.
class
);
job.setCombinerClass(TrackNodeCombiner.
class
);
job.setReducerClass(TrackNodeReducer.
class
);
job.setOutputKeyClass(Text.
class
);
job.setOutputValueClass(IntWritable.
class
);
FileInputFormat.setInputPaths(job,
new
Path(input));
HdfsFileUtil.delete(conf, output,
true
);
FileOutputFormat.setOutputPath(job,
new
Path(output));
exit = job.waitForCompletion(
true
) ?
0
:
1
;
}
catch
(IOException e) {
e.printStackTrace();
}
catch
(ClassNotFoundException e) {
e.printStackTrace();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
return
exit;
}
/** 将TrackNodeJob用户轨迹流量统计的PV、UV结果保存到DB数据库中 */
public
static
void
storeTrackNode2DB(String output) {
Connection conn =
null
;
HdfsFileOuter outer =
null
;
try
{
Map<String, String> map = RedisUtil.getMapRedisCacheInfo(RedisKey);
if
(map !=
null
&& !map.isEmpty()) {
conn = DBConnPool.getInstance().getConn();
TrackNodeBatchInsertDB inserter =
new
TrackNodeBatchInsertDB(conn,
10
);
outer =
new
HdfsFileOuter(output,
true
,
100
);
outer.open();
String iNode =
null
;
Long iAvgSec =
null
;
TrackNode node =
null
;
Iterator<String> itr = map.keySet().iterator();
while
(itr.hasNext()) {
iNode = itr.next();
iAvgSec = Converter.parseLong( map.get(iNode) );
node = RedisUtil.getJson2ObjectCacheInfo(RedisNodeID+iNode, TrackNode.
class
);
if
(node ==
null
) {
System.out.println(
"未缓存的节点:"
+ iNode);
continue
;
}
node.setAvgSec( iAvgSec/node.getPv() );
outer.write( node.toString() );
outer.flush(
false
);
inserter.addBatch(node);
inserter.commit(
false
);
RedisUtil.delRedisCacheInfo(RedisNodeID+iNode);
}
inserter.commit(
true
);
}
}
catch
(Exception e) {
e.printStackTrace();
}
finally
{
if
(outer !=
null
) {
outer.flush(
true
);
outer.close();
}
if
(conn !=
null
) {
try
{
conn.close();
}
catch
(SQLException e) {
e.printStackTrace();
}
}
}
}
/** 内存数据清理 */
public
static
void
clear() {
RedisUtil.delRedisCacheInfo(RedisKey);
}
}
相关推荐
它主要由HDFS(Hadoop Distributed File System,分布式文件系统)和MapReduce编程模型组成。Hadoop的设计思想是通过简单的编程模型,来实现存储大规模数据集,以及对这些数据集进行高效处理。Hadoop具有可扩展、...
Hadoop是一个高性能的海量数据处理和分析平台,它由Nutch项目起步,为了解决在分布式计算环境下管理计算作业的困难。Hadoop的分布式计算部分从Nutch项目中分离出来,并命名为Hadoop。 2. Hadoop的发展历程 Hadoop...
Java编程起步是初学者踏入Java世界的关键步骤,这个压缩包文件"Java编程起步.pdf"很可能包含了一本关于Java基础知识的教程或指南。在这里,我们将深入探讨Java编程的一些核心概念和重要知识点,帮助你构建坚实的基础...
本课程的实践教学内容和方法涉及Java、Linux、Hadoop集群平台搭建、HDFS文件系统、MapReduce编程模型等多个方面。 首先,大数据技术与应用专业的核心课程是培养学生在大数据应用系统开发方面的核心技能,包括数据...
它通过MapReduce编程模型,能够将数据处理任务分解到多台计算机上并行执行,显著提升了数据处理速度。 #### 二、Hadoop处理电力系统大规模数据的关键技术 1. **数据加载模块**:Hadoop首先需要将来自电力系统的...
相比之下,国内的云存储虽然起步稍晚,但已经在技术和应用层面实现了追赶,如百度网盘提供了全面的云存储解决方案,包括文件存储、音乐、通讯录等服务,并通过信息对比技术提升了上传速度。网易云服务则结合有道云...
"源码"表示提供的是程序的原始代码,"软件"表明这是一个完整的应用,"hadoop"和"后端"明确了项目与大数据处理和后台服务有关,"java"则表明编程语言是Java,Java是SpringBoot和Hadoop生态的主要语言。 【压缩包子...
##### 4.3 Hadoop MapReduce编程模型 MapReduce是一种分布式计算模型,主要用于大规模数据集的并行处理。它将复杂的任务分解成两部分:Map阶段和Reduce阶段。Map阶段负责将输入数据分割成小块进行处理,Reduce阶段...
- **国外研究现状**:国外在基于Hadoop的大数据服务平台方面起步较早,已经有一些成熟的产品和服务应用于工业领域。例如,IBM、Oracle等公司提供的大数据分析工具在制造业中得到了广泛应用。 - **国内研究现状**:...
在国内,虽然这一领域的研究起步较晚,但近年来也有不少高校和研究机构开始关注并投入到相关技术的研发中。 #### 三、Hadoop技术概述 ##### 3.1 Hadoop基本概念 Hadoop是一个开源框架,用于分布式存储和处理大...
Hadoop的核心部分为HDFS分布式文件系统与MapReduce编程模型,通过设计基于Hadoop的并行化聚类集成方法,一方面提高了聚类结果的稳定性与准确率,另一方面提升了聚类集成的效率。 知识点5: 国内外聚类分析研究现状 ...
HDFS为大规模数据集提供了高容错性和高吞吐量的数据访问,MapReduce则是一种编程模型,用于大规模数据集的并行计算。在分布式网盘系统中,Hadoop作为底层存储平台,能够处理PB级别的文件,保证了系统的存储容量和...
完全从零起步,让大家可以一站式精通Spark企业级大数据开发,提升自己的职场竞争力,实现更好的升职或者跳槽,或者从j2ee等传统软件开发工程师哦转型为Spark大数据开发工程师,或是对于正在从事hadoop大数据开发的...
理解Hadoop的分发存储和分布式计算原理,以及如何使用Java API进行编程,是大数据开发的基础。 2. Spark:Spark是一个用于大规模数据处理的开源框架,其核心API是用Java编写的。学习Spark Streaming、Spark SQL和...
首先,云计算基础实验的起步阶段是环境的搭建,这包括操作系统的选择与安装。本指导书推荐用户使用Ubuntu 14.04操作系统,该系统由于其稳定性和开源特性,在云计算领域应用广泛。安装完毕后,用户需要创建一个新的...
- **行业发展现状**:虽然目前大数据在国内的应用还处于起步阶段,但在激烈的市场竞争环境中,各行各业都在积极探索和应用大数据技术。 #### 二、课程目标与教学要求 本课程旨在帮助学生理解和掌握大数据技术的...
当前,虽然大数据技术在国内的应用仍处于起步阶段,但各行业对于大数据分析的需求日益增长,预计未来几年内将会有更多的数据分析应用场景出现。 #### 二、课程教学目标 1. **理解大数据概念及特征**: - 掌握...
总的来说,HBase 2.2的安装文件提供了在各种场景下部署和使用HBase的基础,无论是简单的实验环境还是复杂的生产系统,都能通过这份安装指南顺利起步。对于数据驱动的业务来说,理解和掌握HBase的使用是提升数据处理...
1. Volume(大量):大数据的规模巨大,通常从TB级别起步,甚至扩展到PB、EB乃至YB级别。这种庞大的数据量使得传统数据处理技术难以应对,因此需要新的处理模式。 2. Velocity(高速):大数据的增长速度极快,要求...