- 浏览: 87055 次
- 性别:
- 来自: 郑州
文章分类
- 全部博客 (69)
- java (28)
- linux (6)
- redis (4)
- C# (3)
- 架构 (10)
- java ee (1)
- web (1)
- 操作系统 (7)
- sqlserver (1)
- android (2)
- Hadoop (12)
- 大数据 (21)
- 分布式 事务 消息 (10)
- linux mysql (1)
- 数据库 (3)
- 关于hadoop之bootshell使用 (2)
- 关于hbase---HTableInterfaceFactory (1)
- Spring (3)
- Hbase (5)
- jstorm (10)
- nginx (1)
- 分布式 (1)
- 区块链 (3)
- dubbo (1)
- nacos (1)
- 阿里 (1)
- go (3)
- 缓存 (1)
- memcached (1)
- ssdb (1)
- 源码 (1)
最新评论
-
想个可以用的名字:
楼主,能不能给发一份源代码,1300246542@qqq.co ...
spring+websocket的使用 -
wahahachuang5:
web实时推送技术使用越来越广泛,但是自己开发又太麻烦了,我觉 ...
websocket -
dalan_123:
前提是你用的是spring mvc 才需要加的1、在web.x ...
spring+websocket的使用 -
string2020:
CharacterEncodingFilter这个filter ...
spring+websocket的使用
一、测试
public class MrBatchApp {
// Log
private static final Log log = LogFactory.getLog(MrBatchApp.class);
//
public static void main(String[] args) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
System.out.println("TEST");
// 加载对应的xml配置文件
AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:/META-INF/spring/*-context.xml");
log.info("Batch Tweet Hashtag MR Job Running");
// 关闭"钩子" 为了方便在适当的时候关闭 spring ioc
// (在非web环境下,关闭spring ioc需要手动完成)
context.registerShutdownHook();
// job 发射器
// JobLaucher是一个简化的job的控制接口;基于运行时不同的标识
// 该接口并不能确保执行job是同步还是异步
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
// job
Job job = context.getBean(Job.class);
// 运行job
jobLauncher.run(job, new JobParameters());
}
}
二、xml配置文件
(1)、common 配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
// job 仓库
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"/>
// 事务
<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>
// job launcher
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher" p:jobRepository-ref="jobRepository"/>
</beans>
(2)、特殊配置
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:hdp="http://www.springframework.org/schema/hadoop"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">
// 引入common 配置
<import resource="batch-common-context.xml"/>
// hdfs uri/分析目录/统计目录/分析源文件
<context:property-placeholder location="hadoop.properties"/>
<context:component-scan base-package="org.springframework.samples.hadoop.mapreduce" />
// 指定hdfs
<hdp:configuration>
fs.defaultFS=${hd.fs}
</hdp:configuration>
// 设定hdp 脚本 用于创建localSourceFile、inputDir、outputDir
<hdp:script id="setupScript" location="file-prep.groovy" run-at-startup="true">
<hdp:property name="localSourceFile" value="${localSourceFile}"/>
<hdp:property name="inputDir" value="${tweets.input.path}"/>
<hdp:property name="outputDir" value="${tweets.output.path}"/>
</hdp:script>
// 指定mapreduce step to step执行
<!-- required since Hadoop Job is a class not an interface and we need to use a Job with step scope to access #{jobParameters['...']} -->
<bean class="org.springframework.batch.core.scope.StepScope">
<property name="proxyTargetClass" value="true"/>
</bean>
// 设置job steps
<job id="job" xmlns="http://www.springframework.org/schema/batch">
<step id="hashtagcount" next="result-step">
<tasklet ref="hashtagcount-tasklet" />
</step>
<step id="result-step">
<tasklet ref="results"/>
</step>
</job>
<hdp:job-tasklet id="hashtagcount-tasklet" job-ref="hashtagcountJob" scope="step"/>
<hdp:job id="hashtagcountJob"
input-path="${tweets.input.path}"
output-path="${tweets.output.path}"
mapper="org.springframework.samples.hadoop.mapreduce.HashtagCount$TokenizerMapper"
reducer="org.springframework.samples.hadoop.mapreduce.HashtagCount$LongSumReducer"
scope="step" />
// 指定统计结果 输出
<hdp:script-tasklet id="results" scope="step">
<hdp:script location="classpath:results.groovy">
<hdp:property name="outputDir" value="${tweets.output.path}"/>
</hdp:script>
</hdp:script-tasklet>
</beans>
三、groovy脚本
// 判断分析源文件所在的目录是否存在 不存在创建 并将源文件复制到指定目录下
// 同时修改该文件夹的权限
if (!fsh.test(inputDir)) {
fsh.mkdir(inputDir);
fsh.copyFromLocal(localSourceFile, inputDir);
fsh.chmod(700, inputDir)
}
// 判断统计结果目录是否存在 存在则删除
if (fsh.test(outputDir)) {
fsh.rmr(outputDir)
}
-----------------------------------------------------------------------
// 输出分析统计结果的内容
println "RESULTS from " + outputDir
old = new File('results.txt')
if( old.exists() ) {
old.delete()
}
fsh.get(outputDir + '/part-r-*', 'results.txt');
String fileContents = new File('results.txt').text
println fileContents
以上即可完全通过xml完成mapreduce的batch处理
public class MrBatchApp {
// Log
private static final Log log = LogFactory.getLog(MrBatchApp.class);
//
public static void main(String[] args) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
System.out.println("TEST");
// 加载对应的xml配置文件
AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:/META-INF/spring/*-context.xml");
log.info("Batch Tweet Hashtag MR Job Running");
// 关闭"钩子" 为了方便在适当的时候关闭 spring ioc
// (在非web环境下,关闭spring ioc需要手动完成)
context.registerShutdownHook();
// job 发射器
// JobLaucher是一个简化的job的控制接口;基于运行时不同的标识
// 该接口并不能确保执行job是同步还是异步
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
// job
Job job = context.getBean(Job.class);
// 运行job
jobLauncher.run(job, new JobParameters());
}
}
二、xml配置文件
(1)、common 配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
// job 仓库
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"/>
// 事务
<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>
// job launcher
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher" p:jobRepository-ref="jobRepository"/>
</beans>
(2)、特殊配置
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:hdp="http://www.springframework.org/schema/hadoop"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">
// 引入common 配置
<import resource="batch-common-context.xml"/>
// hdfs uri/分析目录/统计目录/分析源文件
<context:property-placeholder location="hadoop.properties"/>
<context:component-scan base-package="org.springframework.samples.hadoop.mapreduce" />
// 指定hdfs
<hdp:configuration>
fs.defaultFS=${hd.fs}
</hdp:configuration>
// 设定hdp 脚本 用于创建localSourceFile、inputDir、outputDir
<hdp:script id="setupScript" location="file-prep.groovy" run-at-startup="true">
<hdp:property name="localSourceFile" value="${localSourceFile}"/>
<hdp:property name="inputDir" value="${tweets.input.path}"/>
<hdp:property name="outputDir" value="${tweets.output.path}"/>
</hdp:script>
// 指定mapreduce step to step执行
<!-- required since Hadoop Job is a class not an interface and we need to use a Job with step scope to access #{jobParameters['...']} -->
<bean class="org.springframework.batch.core.scope.StepScope">
<property name="proxyTargetClass" value="true"/>
</bean>
// 设置job steps
<job id="job" xmlns="http://www.springframework.org/schema/batch">
<step id="hashtagcount" next="result-step">
<tasklet ref="hashtagcount-tasklet" />
</step>
<step id="result-step">
<tasklet ref="results"/>
</step>
</job>
<hdp:job-tasklet id="hashtagcount-tasklet" job-ref="hashtagcountJob" scope="step"/>
<hdp:job id="hashtagcountJob"
input-path="${tweets.input.path}"
output-path="${tweets.output.path}"
mapper="org.springframework.samples.hadoop.mapreduce.HashtagCount$TokenizerMapper"
reducer="org.springframework.samples.hadoop.mapreduce.HashtagCount$LongSumReducer"
scope="step" />
// 指定统计结果 输出
<hdp:script-tasklet id="results" scope="step">
<hdp:script location="classpath:results.groovy">
<hdp:property name="outputDir" value="${tweets.output.path}"/>
</hdp:script>
</hdp:script-tasklet>
</beans>
三、groovy脚本
// 判断分析源文件所在的目录是否存在 不存在创建 并将源文件复制到指定目录下
// 同时修改该文件夹的权限
if (!fsh.test(inputDir)) {
fsh.mkdir(inputDir);
fsh.copyFromLocal(localSourceFile, inputDir);
fsh.chmod(700, inputDir)
}
// 判断统计结果目录是否存在 存在则删除
if (fsh.test(outputDir)) {
fsh.rmr(outputDir)
}
-----------------------------------------------------------------------
// 输出分析统计结果的内容
println "RESULTS from " + outputDir
old = new File('results.txt')
if( old.exists() ) {
old.delete()
}
fsh.get(outputDir + '/part-r-*', 'results.txt');
String fileContents = new File('results.txt').text
println fileContents
以上即可完全通过xml完成mapreduce的batch处理
发表评论
-
nacos单机源码调试
2018-12-17 11:35 1220首先从github上获取对应的源码Nacos源码git cl ... -
jstorm源码之TransactionalState
2016-03-21 19:31 888一、作用 主要是通过结合zookeeper,在zookee ... -
jstorm源码之RotatingTransactionalState
2016-03-21 19:29 581一、作用 构建一个Rotationg transacti ... -
jstorm源码之PartitionedTridentSpoutExecutor
2016-03-21 19:28 887一、作用 Partition Spout对应的exec ... -
jstorm源码之 RichSpoutBatchExecutor
2016-03-21 19:28 0一、作用 RichSpoutBatchExecutor是IRi ... -
jstorm源码之RotatingMap
2016-03-21 19:27 876一、作用 基于LinkedList + HashM ... -
jstorm源码之 RichSpoutBatchExecutor
2016-03-21 19:24 615一、作用 RichSpoutBatchExecutor是IRi ... -
jstorm源码之TridentTopology
2016-03-16 18:12 2363在jstorm中对应TridentTopology的源码如下, ... -
jstorm操作命令
2016-03-15 18:04 2737启动ZOOPKEEPER zkServer.sh start ... -
JStorm之Supervisor简介
2016-03-15 18:02 1248一、简介Supervisor是JStorm中的工作节点,类似 ... -
JStorm介绍
2016-03-15 17:56 918一、简介Storm是开源的 ... -
mycat的使用---sqlserver和mysql
2016-01-11 14:33 8617数据库中间件mycat的使 ... -
jstorm安装
2015-12-03 19:43 1747关于jstorm单机安装可以 ... -
HBase系列一
2015-11-30 16:17 711关于hbase 一、客户端类 HTable 和 HTabl ... -
spring hadoop系列(六)---HbaseSystemException
2015-11-30 09:13 497一、源码 /** * HBase Data Access e ... -
spring hadoop系列(五)---spring hadoop hbase之HbaseSynchronizationManager
2015-11-27 18:16 873一、源码如下 /** * Synchronization m ... -
spring hadoop 系列(三)--spring hadoop hbase HbaseConfigurationFactoryBean
2015-11-27 16:28 1540一、源码分析 /** * 设定Hbase指定Configu ... -
spring hadoop 系列(二)
2015-11-27 15:26 597一、源码分析 /** * * HbaseAccesso ... -
spring hadoop之mapreduce batch
2015-11-24 15:51 635一、测试 // 定义hadoop configuration ... -
spring hadoop
2015-11-24 10:19 595一、源码 // 如下代码实现 读取指定hdfs路径下tmp文件 ...
相关推荐
Spring Data Hadoop 是一个扩展框架,它为 Spring 框架、Spring Batch 和 Spring Integration 提供了扩展支持,以便构建可管理且健壮的数据处理管道。此框架旨在简化在 Hadoop 生态系统中的开发工作,提供了一种更加...
Hadoop主要用于处理PB级别的大数据集,而Spring Batch则在GB到TB级别的数据处理上表现更为出色。 - **架构设计**:Spring Batch的设计更加轻量级,易于嵌入现有的应用程序中。而Hadoop则是一套完整的分布式计算框架...
4. 高度集成:Spring Hadoop与其他Spring模块(如Spring Batch、Spring Data等)无缝集成,可实现更复杂的分布式数据处理方案。 总结,Spring for Apache Hadoop 是Java开发者在Hadoop世界中的一把利器,它通过...
Spring Batch 能很好地与 Spring 框架的其他模块(如 Spring Data、Spring Integration)以及外部系统(如 JDBC、JMS、Hadoop)集成,实现数据处理的无缝对接。 **七、最佳实践** 在使用 Spring Batch 时,应遵循...
对于需要处理大量数据的任务,Spring Batch 提供了一种可靠的批量处理解决方案。 #### 13. NoSQL and Big Data 随着大数据时代的到来,NoSQL 数据库和 Hadoop 等大数据处理技术变得越来越重要。Spring 也提供了...
由Spring Batch支持的Hadoop工作流引擎。 特征 Pig / Hive / Spark脚本或通用Shell命令的工作流程。 参数管理。 顺序或并行脚本执行。 轻松设置Java / Python UDF。 要求 要构建项目,您将需要安装以下软件: ...
Part I. Background 1. The Spring Data Project ...13. Creating Big Data Pipelines with Spring Batch and Spring Integration Part VI. Data Grids 14. GemFire: A Distributed Data Grid Bibliography Index
Spring Data项目就是一种简化Java应用构建的数据访问技术,它可以帮助开发人员高效地使用最新的数据处理和管理工具,同时还能够以最新的方式使用传统的数据库。 《Spring Data实战》从Spring Data背景知识、关系型...
此外,书中还介绍了一些高级特性,比如使用Hadoop和MongoDB来处理大数据和云环境下的数据存储。 在Web开发方面,本书主要讨论了Spring MVC、动态脚本语言的集成、与Grails框架和Groovy语言的整合以及RESTful Web...
7. **集成能力**:Spring XD与Spring生态系统紧密集成,可以轻松地与Spring Boot、Spring Batch、Spring Cloud Data Flow等其他Spring项目配合使用。此外,它还支持与Apache Hadoop、Cassandra、MongoDB等大数据技术...
"batch2"这个标题可能是指一个批量处理任务的第二部分,或者是一个项目或程序的第二个批次。批量处理通常涉及到处理大量数据或执行一系列自动化任务,这在大数据分析、系统维护或后台服务中非常常见。在Java世界里,...
4. **Spring框架**:Spring不仅用于后端服务的开发,还可以与大数据平台结合,例如Spring Batch用于批处理,Spring Cloud Data Flow用于大数据流处理。Spring Boot简化了微服务的开发,Spring Cloud则为服务发现、...
- POI常与Spring Batch、Quartz等任务调度框架结合,用于定时生成或更新Office文档。 - 在大数据领域,POI可以与Hadoop结合,处理存储在HDFS上的Office文件。 在实际开发中,Apache POI广泛应用于报表生成、数据...
- Hadoop大数据处理平台的基本命令。 4. **分布式系统** - 分布式系统的幂等性设计。 - 微服务架构下的数据管理策略。 - 分布式Session管理的最佳实践。 5. **高级主题** - JVM的内部结构与性能调优。 - ...
Python是一种多用途的编程语言,广泛应用于Web开发、数据分析、机器学习等多个领域,其简洁的语法和强大的库支持使其成为开发者首选的语言之一。 在数据库领域,OpenTSDB是一个分布式的、基于时间序列的数据库,...