`
dalan_123
  • 浏览: 88140 次
  • 性别: Icon_minigender_1
  • 来自: 郑州
社区版块
存档分类
最新评论

spring hadoop之mapreduce batch

阅读更多
一、测试
// 定义hadoop configuration
Configuration conf = new Configuration();
// 指定hdfs上获取分析文件目录和输出分析结果目录
// 格式:hdfs://10.33.96.241:8020/user/tweets/input
//       hdfs://10.33.96.241:8020/user/tweets/output
// 最好使用当前hdfs系统用户目录;比如linux系统用户为tweets
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: HaashtagCount <in> <out>");
System.exit(2);
}
// 定义job 指定hadoop configuration和名称
Job job = new Job(conf, "hashtag count");
// 设定class所在的jar文件
job.setJarByClass(HashtagCount.class);
// 设定mapper
job.setMapperClass(TokenizerMapper.class);
// 设定job的合并类 一般为reduce实现对应的类
job.setCombinerClass(LongSumReducer.class);
// 设定reduce
job.setReducerClass(LongSumReducer.class);
// 由于mapreduce使用的key-value的格式
// 设定分析结果输出内容key的类型
job.setOutputKeyClass(Text.class);
// 设定分析结果输出内容value的类型
job.setOutputValueClass(LongWritable.class);
// 设定分析文件所在的路径
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
// 设定分析结果输出的路径
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
// 等待job完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
二、map对应实现类
// 首先凡是map都必须继承Mapper 实现map方法
// 类Mapper四个参数:前两个执行mapper的key和value 最后两个为mapper执行
// 后输出的key和value
public static class TokenizerMapper extends
Mapper<Object, Text, Text, LongWritable> {
// 指定正则表达式
final static Pattern TAG_PATTERN = Pattern.compile("\"hashTags\":\\[([^\\]]*)");
// 指定执行mapper分解之后的输出结果key与value的类型
private final static LongWritable ONE = new LongWritable(1L);
private Text word = new Text();
// 必须实现的方面 执行mapper的操作均在该方法中
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
  // 获取符合要求的内容 循环进行分割内容
  Matcher matcher = TAG_PATTERN.matcher(value.toString());
  while (matcher.find()) {
String found = matcher.group();
String cleanedString = found.replaceFirst("\"hashTags\":\\[\\{\"text\":\"", "");
String superPolished = cleanedString.split("\",\"")[0];

        String useMe = superPolished;
if (superPolished.startsWith("\\u")) {
useMe = StringEscapeUtils.unescapeJava(superPolished);
}
useMe = useMe.split("\"")[0];
        // 将符合要求的内容和统计结果输出到对应分析结果中
       // 注:以上代码主要是对分析内容进行拆分 因为对应的统计结果ONE均为1
word.set(useMe.toLowerCase());
context.write(word, ONE);
      }
   }

}
三、reduce实现类
// 凡是实现reduce的必须继承Reducer类;前两个参数为mapper分析之后的结果
// key与value 最后两个参数为统计结果的key与value
// 同时每个类都必须实现reduce方法
public static class LongSumReducer extends
Reducer<Text, LongWritable, Text, LongWritable> {
  private LongWritable result = new LongWritable();

  // 实现reduce方法
  // 这个里面根据实际的业务需要实现对应的业务内容统计
  public void reduce(Text key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (LongWritable val : values) {
    sum += val.get();
}
     // 由于该处针对的是同一个内容统计,因而只需要处理统计结果的value即可
result.set(sum);
    // 将统计结果提交到task的上下文环境中 输出
context.write(key, result);
    }
}
以上即为简单的mapreduce框架的实现过程,基于spring-hadoop api实现
分享到:
评论

相关推荐

    spring data hadoop reference

    Spring Hadoop 提供了与之集成的功能。 #### 九、Spring Hadoop 示例应用程序 文档中还包含了一些示例应用程序,这些示例涵盖了从简单的 WordCount 到更复杂的批处理作业,可以帮助开发者快速上手并理解如何使用 ...

    spring-hadoop-getting-started:Spring for Apache Hadoop 入门示例

    3. MapReduce支持:Spring Hadoop提供了MapReduce作业的抽象,包括Mapper、Reducer、Combiner等,允许开发者用面向对象的方式编写MapReduce程序。 二、Spring Hadoop的主要模块 1. Spring Hadoop Core:核心模块,...

    基于Spring Batch的大数据量并行处理

    - **使用场景**:两者可以互补使用,例如可以在Spring Batch中定期将日志推送到Hadoop的HDFS系统中进行存储和进一步的分析处理。 #### 分层架构 Spring Batch采用了清晰的分层架构设计,主要包括以下几个层面: - *...

    携程大数据开发平台实践

    4. **Spring框架**:Spring不仅用于后端服务的开发,还可以与大数据平台结合,例如Spring Batch用于批处理,Spring Cloud Data Flow用于大数据流处理。Spring Boot简化了微服务的开发,Spring Cloud则为服务发现、...

    异步并行批处理框架设计的一些思考

    当然,处理分析这些海量数据目前可以借鉴的方案有很多:首先,在分布式计算方面有Hadoop里面的MapReduce并行计算框架,它主要针对的是离线的数据挖掘分析。此外还有针对实时在线流式数据处理方面的,同样也是分布式...

    java批处理

    1. **批处理框架**:Java世界中有许多批处理框架,如Spring Batch和Apache Batchee,它们提供了完整的批处理生命周期管理,包括读取、处理和写入数据,以及错误处理和事务管理。 2. **数据分块**:为了处理大数据,...

    批处理

    对于复杂的数据转换和处理,Hadoop MapReduce、Apache Spark等大数据处理框架提供了分布式处理能力,适合处理PB级别的数据。 8. **监控和日志**: 对于批处理作业,监控其运行状态和日志记录非常重要。Java应用...

Global site tag (gtag.js) - Google Analytics