`
ghost1392
  • 浏览: 2859 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Hadoop编程起步

阅读更多

一、与Eclipse集成

1、Hadoop开发包下载

     A、官网:http://hadoop.apache.org/#Download+Hadoop

           2016-03-25  更新版本为2.6.0,配置方法一样。

     B、我的共享目录:\\${共享盘}\share\yzc\Hadoop-dev


      C、Eclipse插件下载(hadoop-eclipse-plugin-x.x.x.jar)

2、Eclipse集成Hadoop开发配置

      A、解压安装与插件:

            Hadoop、Eclipse的zip包,统一归整好目录,将hadoop-eclipse-plugin-x.x.x.jar插件Jar包复制到Eclipse插件目录D:\eclipse\plugins\下。如下图所示:

        注意:解压Hadoop后,将D:\hadoop-2.5.1\bin\hadoop.dll文件复制到C:\Windows\System32\目录下。

       B、配置Hadoop环境变量:

             桌面 »» 右击【我的电脑】»» 选择【属性】 »» 左边导航【高级系统设置】 »» 【高级】选项卡的【环境变量(N)...】 »»  设置【系统变量(S)】。添加或编辑以下变量:

             HADOOP_HOME        »          D:\hadoop-2.5.1                                     ----Hadoop的安装解压目录

             Path                           »           %HADOOP_HOME%\bin;......                ----编辑Path配置,在最前面增加Hadoop bin的配置,以“;”分隔作为该项结束


                系统的环境变量配置说明:

                      (A)、没有权限配置的,可联系IT部的人(xxx)使用管理员权限,或通过idesk平台设置 ;

                      (B)、配置完环境变量后,需要重启机器才能生效。

     C、启动Eclipse,配置Hadoop HDFS:

            Eclipse菜单栏,【Window】 »» 【Preferences】 »» 【Hadoop Map/Reduce】 »» 【Browse...】 »» 浏览到Hadoop的解压安装目录。如下图所示:


              配置Hadoop HDFS:打开【Map/Reduce Locations】视图,在空白处右击菜单,选择【New Hadoop location...】打开配置窗口,配置Name Node的Host和相关的端口号,点击【Finish】保存。如下图所示:


           前提是了解Hadoop的集群环境,找到Name Node的配置文件。下面以我们开发环境为例,在Hadoop安装配置目录/etc/hadoop/conf/下有以下配置文件:

                  core-site.xml                              8020                            nameNode.fs.defaultFs.RPC                         namenode RPC交互端口,HDFS文件操作

                  hdfs-site.xml                              50020                          dfs.dataNode.ipc.address                              datanode的IPC服务器地址和端口

                  mapred-site.xml

           配置中的端口均采用如图默认配置值即可,点击【Finish】,切换到【Map/Reduce】窗口模式,就会在左边的【Project Explorer】区域查看到DFS Locations菜单下有刚在配置的Name Node的HDFS文件目录结构,以及右键菜单提供相应的文件操作。如下图所示:



右键菜单提供文件操作:   

 

二、Map / Reduce程序开发

1、构建Maven项目,加入依赖

pom.xml依赖Jar Collapse source
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.5.2</version>
  </dependency>
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.5.2</version>
  </dependency>
<dependency>
      <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>0.12.0</version>
  </dependency>
  <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-jdbc</artifactId>
        <version>0.12.0</version>
  </dependency>
  <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-metastore</artifactId>
        <version>0.12.0</version>
  </dependency>
  <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-service</artifactId>
    <version>0.12.0</version>
  </dependency>
  <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>libfb303</artifactId>
        <version>0.9.0</version>
  </dependency>

2、编写Map / Reduce的Job程序

Map/Reduce Job代码示例 Collapse source
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
package com.xxx.analyse.job;
 
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
import com.xxx.analyse.jdbc.DBConnPool;
import com.xxx.analyse.nginx.SysNodeCache;
import com.xxx.analyse.nginx.TrackNodeBatchInsertDB;
import com.xxx.analyse.parser.NginxLogParser;
import com.xxx.analyse.po.NginxLog;
import com.xxx.analyse.po.TrackNode;
import com.xxx.analyse.util.Converter;
import com.xxx.analyse.util.DateUtil;
import com.xxx.analyse.util.HdfsFileOuter;
import com.xxx.analyse.util.HdfsFileUtil;
import com.xxx.analyse.util.RedisUtil;
 
/**  
 * @Title: 用户轨迹流量统计PV、UV
 * @Description: Mapper & Reducer
 * @Team: 技术1部Java开发小组
 * @Author Andy-ZhichengYuan
 * @Date 2015年11月25日
 * @Version V1.0   */
public class TrackNodeJob {
     
    /** 存放Redis缓存的Key:t_bi_nodev_dlog表数据,TrackNode对象  */
    public static final String RedisKey = "bi_nodev_dlog";
    /** 当前时间的配置Key */
    private static final String DayLogDate = "TrackNodeJobDate";
    private static final String RedisNodeID = "NodeID_";
     
     
    /** 用户轨迹流量统计的Mapper  */
    public static class TrackNodeMapper extends Mapper<Object, Text, Text, IntWritable> {
        private Text PreNodePvKey = new Text();
        private Text PreNodeUvKey = new Text();
        private Text NodePvKey = new Text();
        private Text NodeUvKey = new Text();
        private final static IntWritable PreNodePv = new IntWritable(1);
        private final static IntWritable PreNodeUv = new IntWritable(1);
        private final static IntWritable NodePv = new IntWritable(1);
        private final static IntWritable NodeUv = new IntWritable(1);
         
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            NginxLog log = NginxLogParser.parseNginxLogAll(line);
            if (log != null && log.isValid()) {
                //iAutoID,iPreNode,iPrePV,iPreUV,iNode,iPV,iUV,iAvgSec,iCreateTime
                Long iPreNode = SysNodeCache.getNodeIdByURL(log.getRequest().getsReferUrl());
                Long iNode = SysNodeCache.getNodeIdByURL(log.getRequest().getsPageUrl());
                //TODO 处理前后节点相同的情况
                 
                //当前节点的平均停留秒钟数:此处亦缓存了,在最后节点的流量统计数据入库时用作节点缓存的获取
                Long staySecond = log.getRequest().getStaySeconds();
                if(staySecond != null && staySecond.longValue() > 0) {
                    Map<String, String> map = RedisUtil.getMapRedisCacheInfo(RedisKey);
                    if(map == null) {
                        map = new ConcurrentHashMap<String, String>();
                        map.put(""+iNode, ""+staySecond);
                        RedisUtil.setMapRedisCacheInfo(RedisKey, map, RedisUtil.DAY);
                    else {
                        if( map.containsKey(""+iNode) ) {
                            staySecond += Converter.parseLong( map.get(""+iNode) );
                            map.put(""+iNode, ""+staySecond);
                        else {
                            map.put(""+iNode, ""+staySecond);
                        }
                        RedisUtil.setMapRedisCacheInfo(RedisKey, map, RedisUtil.DAY);
                    }
                }
                 
                //PV值
                PreNodePvKey.set( iPreNode +"|"+ iNode );
                context.write(PreNodePvKey, PreNodePv);
                 
                NodePvKey.set( ""+ iNode );
                context.write(NodePvKey, NodePv);
                 
                //UV值
                PreNodeUvKey.set( iPreNode +"|"+ iNode +":"+ log.getsGuid() );
                context.write(PreNodeUvKey, PreNodeUv);
                 
                NodeUvKey.set( iNode +":"+ log.getsGuid() );
                context.write(NodeUvKey, NodeUv);
                 
            }//非法日志
            line = null;
            log = null;
        }
    }
     
////////////////////////////////////////////////////////////////////////////////////
     
    /** 组合器Combiner:在Map输出于Reduce之前做合并计算,其实质是Reduce的处理逻辑  */
    public static class TrackNodeCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        //实现reduce函数
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
             
            String mapKey = key.toString();
            if( StringUtils.contains(mapKey, ":") ) {//UV,GUID处理
                mapKey = (StringUtils.substringBeforeLast(mapKey, ":") + ":");
                result.set( sum );
                context.write(new Text(mapKey), result);
            else {
                result.set( sum );
                context.write(key, result);
            }
        }
    }
     
///////////////////////////////////////////////////////////////////////////////////
     
    /** 用户轨迹流量统计的Reduce类  */
    public static class TrackNodeReducer extends Reducer<Text, IntWritable, TrackNode, Text> {
        private Long iDate = null;
         
        // 实现reduce函数
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            String mapKey = key.toString();
            int iType = this.getKeyType(mapKey);//业务类型
            String iNode = this.getNodeID(iType, mapKey);
            if(iType == 1 || iType == 3) {//UV只求Values的个数
                Iterator<IntWritable> itr = values.iterator();
                while( itr.hasNext() ) {
                    itr.next();
                    sum ++;
                }
            else {//PV
                for (IntWritable val : values) {
                    sum += val.get();
                }
            }
             
            if(iDate == null || iDate.longValue() == 0) {
                Configuration conf = context.getConfiguration();
                String yesterday = DateUtil.getAgoBackDate(-1);
                Long iDateDef = DateUtil.getDateSec( yesterday );//秒钟数
                iDate = conf.getLong(DayLogDate, iDateDef);
            }
             
            //缓存当天所有节点的ID,做为入库时获取缓存的Key:在Map时处理当前节点的平均停留秒钟数已做了缓存
//            if( RedisUtil.exists(RedisKey) ) {
//                Set<String> keySet = RedisUtil.getSetRedisCacheInfo(RedisKey);
//                keySet.add( iNode );
//                RedisUtil.setSetRedisCacheInfo(RedisKey, keySet, RedisUtil.HOUR);
//            } else {
//                Set<String> keySet = new HashSet<String>();
//                keySet.add(iNode);
//                RedisUtil.setSetRedisCacheInfo(RedisKey, keySet, RedisUtil.HOUR);
//            }
             
            TrackNode tNode = RedisUtil.getJson2ObjectCacheInfo(RedisNodeID+iNode, TrackNode.class);
            if(tNode != null) {
                //iAutoID,iPreNode,iPrePV,iPreUV,iNode,iPV,iUV,iAvgSec,iCreateTime
//                tNode.setiNode( Converter.parseLong(iNode) );
//                tNode.setiCreateTime( iDate );//时间戳
                switch (iType) {
                case 1://iPreNode UV
                    if(tNode.getPreNode() == 0) {
                        tNode.setPreNode(Converter.parseLong( this.getPreNodeID(mapKey) ));
                    }
                    Long iPreUv = tNode.getPreUV();
                    tNode.setPreUV(iPreUv + sum);
                    break;
                case 2://iPreNode PV
                    if(tNode.getPreNode() == 0) {
                        tNode.setPreNode(Converter.parseLong( this.getPreNodeID(mapKey) ));
                    }
                    Long iPrePv = tNode.getPrePV();
                    tNode.setPrePV(iPrePv + sum);
                    break;
                case 3://iNode UV
                    Long iUv = tNode.getUv();
                    tNode.setUv(iUv + sum);
                    break;
                default://0 iNode PV
                    Long iPv = tNode.getPv();
                    tNode.setPv(iPv + sum);
                    break;
                }
                RedisUtil.setObject2JsonCacheInfo(RedisNodeID+iNode, tNode, RedisUtil.DAY);
            else {//不存在
                tNode = new TrackNode();
                //iAutoID,iPreNode,iPrePV,iPreUV,iNode,iPV,iUV,iAvgSec,iCreateTime
                tNode.setNode( Converter.parseLong(iNode) );
                tNode.setCreateTime( iDate );//时间戳
                switch (iType) {
                case 1://iPreNode UV
                    tNode.setPreNode(Converter.parseLong( this.getPreNodeID(mapKey) ));
                    tNode.setPreUV( sum );
                    break;
                case 2://iPreNode PV
                    tNode.setPreNode(Converter.parseLong( this.getPreNodeID(mapKey) ));
                    tNode.setPrePV( sum );
                    break;
                case 3://iNode UV
                    tNode.setUv( sum );
                    break;
                default://0 iNode PV
                    tNode.setPv( sum );
                    break;
                }
                RedisUtil.setObject2JsonCacheInfo(RedisNodeID+iNode, tNode, RedisUtil.DAY);
            }
        }
         
         
        /** 根据Key判断业务类型:
         *<p>1 = UV Key:iPreNode +"|"+ iNode +":"+ GUID </p>
         *<p>2 = PV Key:iPreNode +"|"+ iNode  </p>
         *<p>3 = UV Key:iNode +":"+ GUID  </p>
         * 0 = PV Key:iNode (default)   */
        public int getKeyType(String mapKey) {
            int iType = 0;//业务类型
            if(StringUtils.contains(mapKey, "|") && StringUtils.contains(mapKey, ":")) {//iPreNode UV
                iType = 1;
            else {
                if(StringUtils.contains(mapKey, "|")) {//iPreNode PV
                    iType = 2;
                else {
                    if(StringUtils.contains(mapKey, ":")) {//iNode UV
                        iType = 3;
                    else {//iNode PV
                        iType = 0;
                    }
                }
            }
            return iType;
        }
         
        /** 根据业务类型来获取当前节点的ID  */
        public String getNodeID(int iType, String mapKey) {
            switch (iType) {
            case 1://iPreNode UV
                return StringUtils.substringBetween(mapKey, "|"":");
            case 2://iPreNode PV
                return StringUtils.substringAfterLast(mapKey, "|");
            case 3://iNode UV
                return StringUtils.substringBefore(mapKey, ":");
            default://0 iNode PV
                return mapKey;
            }
        }
         
        /** 仅当存在上游节点时,才根据MapKey来获取上游节点的ID;否则返回null  */
        public String getPreNodeID(String mapKey) {
            if( StringUtils.contains(mapKey, "|") ) {
                return StringUtils.substringBefore(mapKey, "|");
            }
            return null;
        }
         
    }
     
     
//////////////////////////////////////////////////////////////////////////////////////////////
     
    /** 运行TrackNodeJob用户轨迹流量统计的MapReduce作业  */
    public static int runTrackNode2File(String input, String output, Long iDate) {
        int exit = 0;
        try {
            Configuration conf = new Configuration();
            conf.setLong(DayLogDate, iDate);//设置Job的常量,日志时间
             
            Job job = Job.getInstance(conf, "t_bi_nodev_dlog");
            job.setJarByClass(TrackNodeJob.class);
            job.setMapperClass(TrackNodeMapper.class);
            job.setCombinerClass(TrackNodeCombiner.class);
            job.setReducerClass(TrackNodeReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
             
//            FileInputFormat.addInputPath(job, new Path(input));
            FileInputFormat.setInputPaths(job, new Path(input));
             
            //输出:写入内存
            HdfsFileUtil.delete(conf, output, true);
            FileOutputFormat.setOutputPath(job, new Path(output));
            exit = job.waitForCompletion(true) ? 0 1;
             
        catch (IOException e) {
            e.printStackTrace();
        catch (ClassNotFoundException e) {
            e.printStackTrace();
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        return exit;
    }
     
     
    /** 将TrackNodeJob用户轨迹流量统计的PV、UV结果保存到DB数据库中    */
    public static void storeTrackNode2DB(String output) {
        Connection conn = null;
        HdfsFileOuter outer = null;
        try {
//            Set<String> keySet = RedisUtil.getSetRedisCacheInfo(RedisKey);
//            if(keySet != null && !keySet.isEmpty()) {
            Map<String, String> map = RedisUtil.getMapRedisCacheInfo(RedisKey);
            if(map != null && !map.isEmpty()) {
                conn = DBConnPool.getInstance().getConn();
                TrackNodeBatchInsertDB inserter = new TrackNodeBatchInsertDB(conn, 10);
                outer = new HdfsFileOuter(output, true100);
                outer.open();
                 
                String iNode = null;
                Long iAvgSec = null;
                TrackNode node = null;
                Iterator<String> itr = map.keySet().iterator();
                while (itr.hasNext()) {
                    iNode = itr.next();
                    iAvgSec = Converter.parseLong( map.get(iNode) );
                    node = RedisUtil.getJson2ObjectCacheInfo(RedisNodeID+iNode, TrackNode.class);
                    if(node == null) {
                        System.out.println("未缓存的节点:"+ iNode);
                        continue;
                    }
                    node.setAvgSec( iAvgSec/node.getPv() );//平均停留秒钟数
                     
                    outer.write( node.toString() );
                    outer.flush(false);
                     
                    inserter.addBatch(node);
                    inserter.commit(false);
                     
                    RedisUtil.delRedisCacheInfo(RedisNodeID+iNode);//删除Redis
                }
                inserter.commit(true);
            }
        }catch (Exception e) {
            e.printStackTrace();
        finally {
            if(outer != null) {
                outer.flush(true);
                outer.close();
            }
            if(conn != null) {
                try {
                    conn.close();
                catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }
     
     
    /** 内存数据清理 */
    public static void clear() {
        RedisUtil.delRedisCacheInfo(RedisKey);//删除Redis
    }
     
     
}

 

 

另外在这里分享一下我的UV计算去重的方法:

 

首先要理解MR很重要,这里先不说内部原理,你目前最关心的应该是处理过程的问题。

MR是一行一行数据处理的:

M,如果你读取的是HDFS上的文件,M每次处理就会获取一行数据,在map()方法里面处理,至于如何处理看你喜好,基本都是字符串处理。

R,也是一行一行处理的,M的每一行来自于文件上的读取数据,R的每一行来自于M的输出。

 

所以在M的map()处理完后,想丢给R的就要通过context.write(key, value)给R处理。

注意:write时的key、value,所有相同的key会被组合成一个数据给R处理。

假设M输出write时,有:(Key的值是解析nginx access log后业务加工的一个访问节点ID:GUID)

Key                                                               Value

100:3aefa00eaa07def13a15a5be3a79a39e    1

100:ef5a4c4a838f4c46c0c3a83642ddd297    1

100:837e6a6a77a3db3304ffab75835f4177    1

100:6a36a270c57af55f768d14d6bcc34dc7    1

 

101:3aefa00eaa07def13a15a5be3a79a39e    1

101:837e6a6a77a3db3304ffab75835f4177    1

101:837e6a6a77a3db3304ffab75835f4177    1

101:837e6a6a77a3db3304ffab75835f4177    1

101:8cca950662f1d052fd1e05c886d841db    1

101:8cca950662f1d052fd1e05c886d841db    1

101:8cca950662f1d052fd1e05c886d841db    1

101:9628e18b945f4c3c8d2355583fb2495b    1

101:9628e18b945f4c3c8d2355583fb2495b    1

 

 

R会处理两次,第1次为Combiner输入是:

Key                                                              Vaules            迭代求和

100:3aefa00eaa07def13a15a5be3a79a39e    1                            1

100:ef5a4c4a838f4c46c0c3a83642ddd297    1                            1

100:837e6a6a77a3db3304ffab75835f4177    1                            1

100:6a36a270c57af55f768d14d6bcc34dc7    1                            1

 

101:3aefa00eaa07def13a15a5be3a79a39e    1                         1

101:837e6a6a77a3db3304ffab75835f4177    1,1,1                    3

101:8cca950662f1d052fd1e05c886d841db    1,1,1                    3

101:9628e18b945f4c3c8d2355583fb2495b    1,1                        2

而做完迭代求和后,我在这一步context.write(key, value)操作时,巧妙的将含有GUID的Key做了截段聚合处理,保证输出到下1次Reduce时的输入为以下格式的内容:

Key        Vaules

100:    1,1,1,1

101:    1,3,3,2

第2次Reduce时处理就简单了,处理一下访问节点的转换,迭代计数Values即可得到UV。

UV的输出结果是:

Key        UV(迭代计数)

100        4

101        4

而PV的输出结果是:

Key        PV(取值累加)

100        4

101        9

总结:在R里面,巧妙利用2次R运算的Key聚合,通过迭代values获取value里面的值,将值求和作累加得到的是PV;而只做values的迭代计数,相当于得到value的个数即为UV。

下面给大家看一下MR的运算图解就清楚了:



 

 

 
  • 大小: 85.2 KB
  • 大小: 174.4 KB
  • 大小: 157.6 KB
  • 大小: 155.4 KB
  • 大小: 134 KB
  • 大小: 449.2 KB
0
0
分享到:
评论
1 楼 ghost1392 2017-05-29  
再分享一下,我们在大数据当中遇到UV去重计算的历程:
1、用Hadoop的MR任务计算,通过本地Set或Map缓存记录GUID来判断GUID的重复性;(UV比实际偏高)
2、用Hadoop的MR任务计算,通过redis缓存记录GUID来判断GUID的重复性;(应该可行)
3、用Hadoop的MR任务计算,通过对MR计算原理和处理过程的理解,取迭代计数=UV;(没毛病)
4、先将解析后的nginx access log日志,输出为结构化的数据文本,再通过sqoop2导入到hive表中,然后通过impala用SQL去重函数查询UV。(一次解析,灵活分析统计)

相关推荐

    从零起步搭建Hadoop单机和伪分布式开发环境图文教程.

    它主要由HDFS(Hadoop Distributed File System,分布式文件系统)和MapReduce编程模型组成。Hadoop的设计思想是通过简单的编程模型,来实现存储大规模数据集,以及对这些数据集进行高效处理。Hadoop具有可扩展、...

    hadoop 大数据的存储与分析

    Hadoop是一个高性能的海量数据处理和分析平台,它由Nutch项目起步,为了解决在分布式计算环境下管理计算作业的困难。Hadoop的分布式计算部分从Nutch项目中分离出来,并命名为Hadoop。 2. Hadoop的发展历程 Hadoop...

    Java 编程起步

    Java编程起步是初学者踏入Java世界的关键步骤,这个压缩包文件"Java编程起步.pdf"很可能包含了一本关于Java基础知识的教程或指南。在这里,我们将深入探讨Java编程的一些核心概念和重要知识点,帮助你构建坚实的基础...

    大数据专业Hadoop开发技术课程实践教学探索.pdf

    本课程的实践教学内容和方法涉及Java、Linux、Hadoop集群平台搭建、HDFS文件系统、MapReduce编程模型等多个方面。 首先,大数据技术与应用专业的核心课程是培养学生在大数据应用系统开发方面的核心技能,包括数据...

    基于Hadoop的数据挖掘

    它通过MapReduce编程模型,能够将数据处理任务分解到多台计算机上并行执行,显著提升了数据处理速度。 #### 二、Hadoop处理电力系统大规模数据的关键技术 1. **数据加载模块**:Hadoop首先需要将来自电力系统的...

    基于Hadoop的在线文件管理系统-开题报告.pdf

    相比之下,国内的云存储虽然起步稍晚,但已经在技术和应用层面实现了追赶,如百度网盘提供了全面的云存储解决方案,包括文件存储、音乐、通讯录等服务,并通过信息对比技术提升了上传速度。网易云服务则结合有道云...

    基于SpringBoot的Hadoop-Yarn资源监控系统源码.zip

    "源码"表示提供的是程序的原始代码,"软件"表明这是一个完整的应用,"hadoop"和"后端"明确了项目与大数据处理和后台服务有关,"java"则表明编程语言是Java,Java是SpringBoot和Hadoop生态的主要语言。 【压缩包子...

    基于Hadoop的交通视频异常事件检测系统的设计与实现.docx

    ##### 4.3 Hadoop MapReduce编程模型 MapReduce是一种分布式计算模型,主要用于大规模数据集的并行处理。它将复杂的任务分解成两部分:Map阶段和Reduce阶段。Map阶段负责将输入数据分割成小块进行处理,Reduce阶段...

    基于Hadoop的线缆生产的大数据服务平台的设计与实现.docx

    - **国外研究现状**:国外在基于Hadoop的大数据服务平台方面起步较早,已经有一些成熟的产品和服务应用于工业领域。例如,IBM、Oracle等公司提供的大数据分析工具在制造业中得到了广泛应用。 - **国内研究现状**:...

    基于Hadoop生态的高校学生行为预警平台设计与实现.docx

    在国内,虽然这一领域的研究起步较晚,但近年来也有不少高校和研究机构开始关注并投入到相关技术的研发中。 #### 三、Hadoop技术概述 ##### 3.1 Hadoop基本概念 Hadoop是一个开源框架,用于分布式存储和处理大...

    计算机研究 -基于Hadoop的聚类集成方法研究.pdf

    Hadoop的核心部分为HDFS分布式文件系统与MapReduce编程模型,通过设计基于Hadoop的并行化聚类集成方法,一方面提高了聚类结果的稳定性与准确率,另一方面提升了聚类集成的效率。 知识点5: 国内外聚类分析研究现状 ...

    基于hadoop+hbase+springboot实现分布式网盘系统.zip

    HDFS为大规模数据集提供了高容错性和高吞吐量的数据访问,MapReduce则是一种编程模型,用于大规模数据集的并行计算。在分布式网盘系统中,Hadoop作为底层存储平台,能够处理PB级别的文件,保证了系统的存储容量和...

    Spark全面精讲

    完全从零起步,让大家可以一站式精通Spark企业级大数据开发,提升自己的职场竞争力,实现更好的升职或者跳槽,或者从j2ee等传统软件开发工程师哦转型为Spark大数据开发工程师,或是对于正在从事hadoop大数据开发的...

    Java基础可导入试题文件新.zip

    理解Hadoop的分发存储和分布式计算原理,以及如何使用Java API进行编程,是大数据开发的基础。 2. Spark:Spark是一个用于大规模数据处理的开源框架,其核心API是用Java编写的。学习Spark Streaming、Spark SQL和...

    大数据技术与应用基础-教学大纲.doc

    - **行业发展现状**:虽然目前大数据在国内的应用还处于起步阶段,但在激烈的市场竞争环境中,各行各业都在积极探索和应用大数据技术。 #### 二、课程目标与教学要求 本课程旨在帮助学生理解和掌握大数据技术的...

    《大数据处理技术》课程规划.pdf

    然而,国内在这方面仍处于起步阶段,课程建设亟待完善。 课程主要面向计算机科学与技术方向的高年级本科生和研究生,这一阶段的学生具备一定的基础知识,能更好地理解和掌握大数据处理的复杂概念。教学内容包括...

    大数据技术与应用基础_教学大纲.doc

    当前,虽然大数据技术在国内的应用仍处于起步阶段,但各行业对于大数据分析的需求日益增长,预计未来几年内将会有更多的数据分析应用场景出现。 #### 二、课程教学目标 1. **理解大数据概念及特征**: - 掌握...

    hbase2.2安装文件

    总的来说,HBase 2.2的安装文件提供了在各种场景下部署和使用HBase的基础,无论是简单的实验环境还是复杂的生产系统,都能通过这份安装指南顺利起步。对于数据驱动的业务来说,理解和掌握HBase的使用是提升数据处理...

    大数据技术22.pptx

    1. Volume(大量):大数据的规模巨大,通常从TB级别起步,甚至扩展到PB、EB乃至YB级别。这种庞大的数据量使得传统数据处理技术难以应对,因此需要新的处理模式。 2. Velocity(高速):大数据的增长速度极快,要求...

Global site tag (gtag.js) - Google Analytics