- 浏览: 88140 次
- 性别:
- 来自: 郑州
-
文章分类
- 全部博客 (69)
- java (28)
- linux (6)
- redis (4)
- C# (3)
- 架构 (10)
- java ee (1)
- web (1)
- 操作系统 (7)
- sqlserver (1)
- android (2)
- Hadoop (12)
- 大数据 (21)
- 分布式 事务 消息 (10)
- linux mysql (1)
- 数据库 (3)
- 关于hadoop之bootshell使用 (2)
- 关于hbase---HTableInterfaceFactory (1)
- Spring (3)
- Hbase (5)
- jstorm (10)
- nginx (1)
- 分布式 (1)
- 区块链 (3)
- dubbo (1)
- nacos (1)
- 阿里 (1)
- go (3)
- 缓存 (1)
- memcached (1)
- ssdb (1)
- 源码 (1)
最新评论
-
想个可以用的名字:
楼主,能不能给发一份源代码,1300246542@qqq.co ...
spring+websocket的使用 -
wahahachuang5:
web实时推送技术使用越来越广泛,但是自己开发又太麻烦了,我觉 ...
websocket -
dalan_123:
前提是你用的是spring mvc 才需要加的1、在web.x ...
spring+websocket的使用 -
string2020:
CharacterEncodingFilter这个filter ...
spring+websocket的使用
一、测试
// 定义hadoop configuration
Configuration conf = new Configuration();
// 指定hdfs上获取分析文件目录和输出分析结果目录
// 格式:hdfs://10.33.96.241:8020/user/tweets/input
// hdfs://10.33.96.241:8020/user/tweets/output
// 最好使用当前hdfs系统用户目录;比如linux系统用户为tweets
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: HaashtagCount <in> <out>");
System.exit(2);
}
// 定义job 指定hadoop configuration和名称
Job job = new Job(conf, "hashtag count");
// 设定class所在的jar文件
job.setJarByClass(HashtagCount.class);
// 设定mapper
job.setMapperClass(TokenizerMapper.class);
// 设定job的合并类 一般为reduce实现对应的类
job.setCombinerClass(LongSumReducer.class);
// 设定reduce
job.setReducerClass(LongSumReducer.class);
// 由于mapreduce使用的key-value的格式
// 设定分析结果输出内容key的类型
job.setOutputKeyClass(Text.class);
// 设定分析结果输出内容value的类型
job.setOutputValueClass(LongWritable.class);
// 设定分析文件所在的路径
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
// 设定分析结果输出的路径
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
// 等待job完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
二、map对应实现类
// 首先凡是map都必须继承Mapper 实现map方法
// 类Mapper四个参数:前两个执行mapper的key和value 最后两个为mapper执行
// 后输出的key和value
public static class TokenizerMapper extends
Mapper<Object, Text, Text, LongWritable> {
// 指定正则表达式
final static Pattern TAG_PATTERN = Pattern.compile("\"hashTags\":\\[([^\\]]*)");
// 指定执行mapper分解之后的输出结果key与value的类型
private final static LongWritable ONE = new LongWritable(1L);
private Text word = new Text();
// 必须实现的方面 执行mapper的操作均在该方法中
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// 获取符合要求的内容 循环进行分割内容
Matcher matcher = TAG_PATTERN.matcher(value.toString());
while (matcher.find()) {
String found = matcher.group();
String cleanedString = found.replaceFirst("\"hashTags\":\\[\\{\"text\":\"", "");
String superPolished = cleanedString.split("\",\"")[0];
String useMe = superPolished;
if (superPolished.startsWith("\\u")) {
useMe = StringEscapeUtils.unescapeJava(superPolished);
}
useMe = useMe.split("\"")[0];
// 将符合要求的内容和统计结果输出到对应分析结果中
// 注:以上代码主要是对分析内容进行拆分 因为对应的统计结果ONE均为1
word.set(useMe.toLowerCase());
context.write(word, ONE);
}
}
}
三、reduce实现类
// 凡是实现reduce的必须继承Reducer类;前两个参数为mapper分析之后的结果
// key与value 最后两个参数为统计结果的key与value
// 同时每个类都必须实现reduce方法
public static class LongSumReducer extends
Reducer<Text, LongWritable, Text, LongWritable> {
private LongWritable result = new LongWritable();
// 实现reduce方法
// 这个里面根据实际的业务需要实现对应的业务内容统计
public void reduce(Text key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (LongWritable val : values) {
sum += val.get();
}
// 由于该处针对的是同一个内容统计,因而只需要处理统计结果的value即可
result.set(sum);
// 将统计结果提交到task的上下文环境中 输出
context.write(key, result);
}
}
以上即为简单的mapreduce框架的实现过程,基于spring-hadoop api实现
// 定义hadoop configuration
Configuration conf = new Configuration();
// 指定hdfs上获取分析文件目录和输出分析结果目录
// 格式:hdfs://10.33.96.241:8020/user/tweets/input
// hdfs://10.33.96.241:8020/user/tweets/output
// 最好使用当前hdfs系统用户目录;比如linux系统用户为tweets
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: HaashtagCount <in> <out>");
System.exit(2);
}
// 定义job 指定hadoop configuration和名称
Job job = new Job(conf, "hashtag count");
// 设定class所在的jar文件
job.setJarByClass(HashtagCount.class);
// 设定mapper
job.setMapperClass(TokenizerMapper.class);
// 设定job的合并类 一般为reduce实现对应的类
job.setCombinerClass(LongSumReducer.class);
// 设定reduce
job.setReducerClass(LongSumReducer.class);
// 由于mapreduce使用的key-value的格式
// 设定分析结果输出内容key的类型
job.setOutputKeyClass(Text.class);
// 设定分析结果输出内容value的类型
job.setOutputValueClass(LongWritable.class);
// 设定分析文件所在的路径
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
// 设定分析结果输出的路径
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
// 等待job完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
二、map对应实现类
// 首先凡是map都必须继承Mapper 实现map方法
// 类Mapper四个参数:前两个执行mapper的key和value 最后两个为mapper执行
// 后输出的key和value
public static class TokenizerMapper extends
Mapper<Object, Text, Text, LongWritable> {
// 指定正则表达式
final static Pattern TAG_PATTERN = Pattern.compile("\"hashTags\":\\[([^\\]]*)");
// 指定执行mapper分解之后的输出结果key与value的类型
private final static LongWritable ONE = new LongWritable(1L);
private Text word = new Text();
// 必须实现的方面 执行mapper的操作均在该方法中
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// 获取符合要求的内容 循环进行分割内容
Matcher matcher = TAG_PATTERN.matcher(value.toString());
while (matcher.find()) {
String found = matcher.group();
String cleanedString = found.replaceFirst("\"hashTags\":\\[\\{\"text\":\"", "");
String superPolished = cleanedString.split("\",\"")[0];
String useMe = superPolished;
if (superPolished.startsWith("\\u")) {
useMe = StringEscapeUtils.unescapeJava(superPolished);
}
useMe = useMe.split("\"")[0];
// 将符合要求的内容和统计结果输出到对应分析结果中
// 注:以上代码主要是对分析内容进行拆分 因为对应的统计结果ONE均为1
word.set(useMe.toLowerCase());
context.write(word, ONE);
}
}
}
三、reduce实现类
// 凡是实现reduce的必须继承Reducer类;前两个参数为mapper分析之后的结果
// key与value 最后两个参数为统计结果的key与value
// 同时每个类都必须实现reduce方法
public static class LongSumReducer extends
Reducer<Text, LongWritable, Text, LongWritable> {
private LongWritable result = new LongWritable();
// 实现reduce方法
// 这个里面根据实际的业务需要实现对应的业务内容统计
public void reduce(Text key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (LongWritable val : values) {
sum += val.get();
}
// 由于该处针对的是同一个内容统计,因而只需要处理统计结果的value即可
result.set(sum);
// 将统计结果提交到task的上下文环境中 输出
context.write(key, result);
}
}
以上即为简单的mapreduce框架的实现过程,基于spring-hadoop api实现
发表评论
-
nacos单机源码调试
2018-12-17 11:35 1241首先从github上获取对应的源码Nacos源码git cl ... -
jstorm源码之TransactionalState
2016-03-21 19:31 903一、作用 主要是通过结合zookeeper,在zookee ... -
jstorm源码之RotatingTransactionalState
2016-03-21 19:29 595一、作用 构建一个Rotationg transacti ... -
jstorm源码之PartitionedTridentSpoutExecutor
2016-03-21 19:28 899一、作用 Partition Spout对应的exec ... -
jstorm源码之 RichSpoutBatchExecutor
2016-03-21 19:28 0一、作用 RichSpoutBatchExecutor是IRi ... -
jstorm源码之RotatingMap
2016-03-21 19:27 889一、作用 基于LinkedList + HashM ... -
jstorm源码之 RichSpoutBatchExecutor
2016-03-21 19:24 633一、作用 RichSpoutBatchExecutor是IRi ... -
jstorm源码之TridentTopology
2016-03-16 18:12 2368在jstorm中对应TridentTopology的源码如下, ... -
jstorm操作命令
2016-03-15 18:04 2749启动ZOOPKEEPER zkServer.sh start ... -
JStorm之Supervisor简介
2016-03-15 18:02 1270一、简介Supervisor是JStorm中的工作节点,类似 ... -
JStorm介绍
2016-03-15 17:56 926一、简介Storm是开源的 ... -
mycat的使用---sqlserver和mysql
2016-01-11 14:33 8636数据库中间件mycat的使 ... -
jstorm安装
2015-12-03 19:43 1765关于jstorm单机安装可以 ... -
HBase系列一
2015-11-30 16:17 721关于hbase 一、客户端类 HTable 和 HTabl ... -
spring hadoop系列(六)---HbaseSystemException
2015-11-30 09:13 524一、源码 /** * HBase Data Access e ... -
spring hadoop系列(五)---spring hadoop hbase之HbaseSynchronizationManager
2015-11-27 18:16 878一、源码如下 /** * Synchronization m ... -
spring hadoop 系列(三)--spring hadoop hbase HbaseConfigurationFactoryBean
2015-11-27 16:28 1546一、源码分析 /** * 设定Hbase指定Configu ... -
spring hadoop 系列(二)
2015-11-27 15:26 605一、源码分析 /** * * HbaseAccesso ... -
spring hadoop之batch处理(二)
2015-11-24 18:10 1530一、测试 public class MrBatchApp { ... -
centos6.7 64位 伪分布 安装 cdh5.4.8 + jdk 8
2015-11-09 00:37 2334一、安装JAVA # 创建JAVA的目录 mkdir -p / ...
相关推荐
Spring Hadoop 提供了与之集成的功能。 #### 九、Spring Hadoop 示例应用程序 文档中还包含了一些示例应用程序,这些示例涵盖了从简单的 WordCount 到更复杂的批处理作业,可以帮助开发者快速上手并理解如何使用 ...
3. MapReduce支持:Spring Hadoop提供了MapReduce作业的抽象,包括Mapper、Reducer、Combiner等,允许开发者用面向对象的方式编写MapReduce程序。 二、Spring Hadoop的主要模块 1. Spring Hadoop Core:核心模块,...
- **使用场景**:两者可以互补使用,例如可以在Spring Batch中定期将日志推送到Hadoop的HDFS系统中进行存储和进一步的分析处理。 #### 分层架构 Spring Batch采用了清晰的分层架构设计,主要包括以下几个层面: - *...
4. **Spring框架**:Spring不仅用于后端服务的开发,还可以与大数据平台结合,例如Spring Batch用于批处理,Spring Cloud Data Flow用于大数据流处理。Spring Boot简化了微服务的开发,Spring Cloud则为服务发现、...
当然,处理分析这些海量数据目前可以借鉴的方案有很多:首先,在分布式计算方面有Hadoop里面的MapReduce并行计算框架,它主要针对的是离线的数据挖掘分析。此外还有针对实时在线流式数据处理方面的,同样也是分布式...
1. **批处理框架**:Java世界中有许多批处理框架,如Spring Batch和Apache Batchee,它们提供了完整的批处理生命周期管理,包括读取、处理和写入数据,以及错误处理和事务管理。 2. **数据分块**:为了处理大数据,...
对于复杂的数据转换和处理,Hadoop MapReduce、Apache Spark等大数据处理框架提供了分布式处理能力,适合处理PB级别的数据。 8. **监控和日志**: 对于批处理作业,监控其运行状态和日志记录非常重要。Java应用...