`
somkens
  • 浏览: 7435 次
  • 性别: Icon_minigender_1
  • 来自: 上海
最近访客 更多访客>>
社区版块
存档分类
最新评论

Hadoop-MapReduce气象站分析示例

阅读更多

ttt.txt文件

内容:

0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999

使用MapReduce把年份和温度数据提取出来

编写:TemperatureMapper.java


package org.somken;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
   
/**
 * 提取年份,温度数据
 
 
 */ 
public class TemperatureMapper extends MapReduceBase implements 
        Mapper<LongWritable, Text, Text, IntWritable> { 
   
    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, 
                    Reporter reporter) throws IOException { 
        String line = value.toString(); 
        // 提取年份 
        String year = line.substring(1519); 
        // 提取温度  
        String temp = line.substring(8792); 
        if (!missing(temp)) { 
            int airTemperature = Integer.parseInt(temp); 
            output.collect(new Text(year), new IntWritable(airTemperature)); 
        
    
   
    /**
     * 如果提取出来的温度达到9999,认为是提取不到数据
     
     * @param temp
     * @return 是否能正确提取温度数据
     */ 
    private boolean missing(String temp) { 
        return temp.equals("+9999"); 
    
   
}

编写:TemperatureReducer.java

package org.somken;
 
import java.io.IOException; 
import java.util.Iterator; 
   
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.MapReduceBase; 
import org.apache.hadoop.mapred.OutputCollector; 
import org.apache.hadoop.mapred.Reducer; 
import org.apache.hadoop.mapred.Reporter; 
   
/**
 * 输出当年最高温度
 
 */ 
public class  TemperatureReducer extends MapReduceBase implements 
        Reducer<Text, IntWritable, Text, IntWritable> { 
    public void reduce(Text key, Iterator<IntWritable> values, 
                       OutputCollector<Text, IntWritable> output, Reporter reporter) 
            throws IOException { 
        int maxValue = Integer.MIN_VALUE; 
        while (values.hasNext()) { 
            maxValue = Math.max(maxValue, values.next().get()); 
        
        output.collect(key, new IntWritable(maxValue)); 
    
}

编写:TemperatureDriver.java

package org.somken;
 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.FileInputFormat; 
import org.apache.hadoop.mapred.FileOutputFormat; 
import org.apache.hadoop.mapred.JobClient; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 
   
/**
程序入口
*/
public class TemperatureDriver extends Configured implements Tool { 
    @Override 
    public int run(String[] args) throws Exception { 
        if (args.length != 2) { 
            System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass() 
                    .getSimpleName()); 
            ToolRunner.printGenericCommandUsage(System.err); 
            return -1
        
        JobConf conf = new JobConf(getConf(), getClass()); 
        conf.setJobName("最高温度"); 
        FileInputFormat.addInputPath(conf, new Path(args[0])); 
        FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
        conf.setOutputKeyClass(Text.class); 
        conf.setOutputValueClass(IntWritable.class); 
        conf.setMapperClass(TemperatureMapper.class); 
        conf.setCombinerClass(TemperatureReducer.class); 
        conf.setReducerClass(TemperatureReducer.class); 
        JobClient.runJob(conf); 
        return 0
    
   
    public static void main(String[] args) throws Exception { 
        int exitCode = ToolRunner.run(new TemperatureDriver(), args); 
        System.exit(exitCode); 
    

运行前:

    将以上三个类编译后打包成 temperature.jar

    拷贝到 Hadoop的安装目录

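    运行命令:hadoop jar temperature.jar org.somken.TemperatureDriver ttt.txt /user/output

    注意:ttt.txt 是相对路径,会按默认文件系统(通常是 HDFS 上的用户目录)解析,因此需先执行 hadoop fs -put ttt.txt ttt.txt 上传;输出目录 /user/output 在运行前必须不存在,否则作业会因输出目录已存在而报错。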

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics