工作需要,自定义实现hadoop的一个inputformat,使用v1的接口(org.apache.hadoop.mapred),此inputformat的功能为读取mysql数据库的数据,将这些数据分成几块作为多个InputSplit,
package com.demo7;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
import com.google.common.base.Joiner;
public class MysqlInputformat implements InputFormat<Text,Text>{
private static Logger logger = Logger.getLogger(MysqlInputformat.class);
private String beginTradeDay = Config.getConfig().getProperty("dealday.begin.mysqlinputformat");
private String endTradeDay = Config.getConfig().getProperty("dealday.end.mysqlinputformat");
private int oneTaskStocks = Integer.valueOf(Config.getConfig().getProperty("stocknumber_permap.mysqlinputformat"));
@Override
public RecordReader<Text, Text> getRecordReader(InputSplit split,
JobConf conf, Reporter reporter) throws IOException {
MysqlInputSplit mysqlSplit = (MysqlInputSplit)split;
logger.info("---------------------ln------------------------");
logger.info(mysqlSplit.tradeDay);
logger.info(mysqlSplit.stockcodes);
return new MysqlRecordReader(mysqlSplit.tradeDay, mysqlSplit.stockcodes);
}
@Override
public InputSplit[] getSplits(JobConf arg0, int arg1) throws IOException {
ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
logger.info(String.format("begin generate map task, from %s to %s", beginTradeDay, endTradeDay));
HashMap<String,ArrayList<String>> dayStocks = new HashMap<String,ArrayList<String>>(); //key为交易日,value为股票列表
Connection conn = null;
try {
conn = DriverManager.getConnection(MysqlProxy.getProxoolUrl(), MysqlProxy.getProxoolConf());
// 创建一个Statement对象
Statement stmt = conn.createStatement(); // 创建Statement对象
String sql = String.format(
"select date,stock_code from gushitong.s_kline_day_complexbeforeright_ln " +
" where date>='%s' and date<='%s'",
beginTradeDay, endTradeDay);
ResultSet rs = stmt.executeQuery(sql);// 创建数据对象
String date = null;
String stockcode = null;
while (rs.next()) {
date = rs.getString("date");
stockcode = rs.getString("stock_code");
if(dayStocks.containsKey(date) == false){
dayStocks.put(date, new ArrayList<String>(3300));
}
dayStocks.get(date).add(stockcode);
}
rs.close();
stmt.close();
conn.close();
} catch (Exception e) {
logger.error(e);
}
Joiner joiner = Joiner.on(":").useForNull("");
SimpleDateFormat sdf_1 = new SimpleDateFormat("yyyyMMdd");
SimpleDateFormat sdf_2 = new SimpleDateFormat("yyyy-MM-dd");
for(Map.Entry<String, ArrayList<String>> dayStockEntry : dayStocks.entrySet()){
String tradeDay = dayStockEntry.getKey();
for(int i=0; i<dayStockEntry.getValue().size();){
int endindex;
if(i+oneTaskStocks<=dayStockEntry.getValue().size()){
endindex = i+oneTaskStocks;
}else{
endindex = dayStockEntry.getValue().size();
}
String stocks = joiner.join(dayStockEntry.getValue().subList(i, endindex));
i = endindex;
try {
MysqlInputSplit split = new MysqlInputSplit();
split.tradeDay = sdf_2.format(sdf_1.parse(tradeDay));
split.stockcodes = stocks;
splits.add(split);
} catch (ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
InputSplit[] rtn = splits.toArray(new InputSplit[splits.size()]);
return rtn;
}
public static class MysqlInputSplit implements InputSplit{
public String getTradeDay() {
return tradeDay;
}
public void setTradeDay(String tradeDay) {
this.tradeDay = tradeDay;
}
public String getStockcodes() {
return stockcodes;
}
public void setStockcodes(String stockcodes) {
this.stockcodes = stockcodes;
}
private String tradeDay = null;
private String stockcodes = null;
@Override
public void readFields(DataInput in) throws IOException {
this.tradeDay = Text.readString(in);
this.stockcodes = Text.readString(in);
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, tradeDay);
Text.writeString(out, stockcodes);
}
@Override
public long getLength() throws IOException {
// TODO Auto-generated method stub
return 0;
}
@Override
public String[] getLocations() throws IOException {
String[] arr = {"aa"}; //必须有,因为不管有没有用,框架都要用。
return arr;
}
}
public static class MysqlRecordReader implements RecordReader<Text, Text>{
public String tradeDay = null;
public String stockcodes = null;
private boolean isDeal = false;
private long begintimeLong = new Date().getTime();
public MysqlRecordReader(String tradeDay, String stockcodes){
this.tradeDay = tradeDay;
this.stockcodes = stockcodes;
}
@Override
public void close() throws IOException {
// TODO Auto-generated method stub
}
@Override
public Text createKey() {
return new Text();
}
@Override
public Text createValue() {
return new Text();
}
@Override
public long getPos() throws IOException {
// TODO Auto-generated method stub
return 0;
}
/**
* 预计15小时为100%
*/
@Override
public float getProgress() throws IOException {
logger.info(String.format("get process is %f", ((float)(new Date().getTime() - this.begintimeLong))/(float)(15*3600*1000)));
return Math.min(0.9f, ((float)(new Date().getTime() - this.begintimeLong))/(float)(15*3600*1000));
}
@Override
public synchronized boolean next(Text key, Text value) throws IOException {
if(this.isDeal == true){
return false;
}else{
this.isDeal = true;
}
key.set(this.tradeDay);
value.set(this.stockcodes);
return true;
}
}
}
分享到:
相关推荐
本篇文章将详细讲解如何利用Hadoop MapReduce实现TF-IDF(Term Frequency-Inverse Document Frequency)算法,这是一种在信息检索和文本挖掘中用于评估一个词在文档中的重要性的统计方法。 首先,我们要理解TF-IDF...
在Hadoop MapReduce框架中,InputFormat是处理输入数据的核心组件。它负责将原始数据分割成逻辑上的键值对(key-value pairs),然后为每个分区分配一个或多个这些键值对给Mapper。默认情况下,Hadoop支持如...
【标题】Hadoop MapReduce 实现 WordCount MapReduce 是 Apache Hadoop 的核心组件之一,它为大数据处理提供了一个分布式计算框架。WordCount 是 MapReduce 框架中经典的入门示例,它统计文本文件中每个单词出现的...
3. **数据输入与输出**:探讨InputFormat和OutputFormat接口,理解如何自定义输入输出格式以适应不同类型的数据源。 4. **错误处理与容错机制**:讲解Hadoop的检查点、重试和故障恢复策略,以确保任务的可靠性。 5...
MapReduce是一种分布式计算模型,由Google提出,Hadoop对其进行了实现。在MapReduce中,数据处理分为两个主要阶段:Map阶段和Reduce阶段。Map阶段将原始数据分解成小块,然后对每个小块进行并行处理;Reduce阶段则...
此外,可能会介绍与MapReduce相关的高级主题,如MapReduce与Spark、Tez等新型计算框架的对比,以及如何在Hadoop上实现迭代计算。 总之,《Hadoop MapReduce实战手册》全面覆盖了MapReduce的基本概念、工作流程、...
基于Hadoop Mapreduce 实现酒店评价文本情感分析(python源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析(python源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析(python...
现在我们来深入探讨如何使用 Hadoop MapReduce 实现 KMeans 算法。 首先,我们需要理解 KMeans 算法的基本原理。KMeans 算法的核心思想是通过迭代找到最优的簇中心,使得每个数据点到所属簇中心的距离最小。算法...
YARN(Yet Another Resource Negotiator)作为Hadoop V2的核心组成部分,实现了资源管理与计算框架的分离。其主要组件包括: - **ResourceManager (RM)**:集群级别的资源管理器,负责接收来自ApplicationMaster的...
基于Hadoop Mapreduce 实现酒店评价文本情感分析(python开发源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析(python开发源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析...
本文将深入探讨如何使用Python来编写Hadoop MapReduce程序,以实现微博关注者之间的相似用户分析。这个任务的关键在于理解并应用分布式计算原理,以及熟悉Python编程语言在大数据环境下的应用。 首先,Hadoop ...
在实际开发中,我们还需要处理数据输入和输出格式,例如使用`TextInputFormat`和`TextOutputFormat`,以及自定义的`InputFormat`和`OutputFormat`。此外,可以通过配置Job对象来调整MapReduce作业的行为,如设置并行...
AQI空气质量分析-基于Hadoop MapReduce实现源代码+分析实验报告(高分完整项目),含有代码注释,新手也可看懂,个人手打98分项目,导师非常认可的高分项目,毕业设计、期末大作业和课程设计高分必看,下载下来,...
在大数据处理领域,Apriori算法与Hadoop MapReduce的结合是实现大规模数据挖掘的关键技术之一。Apriori算法是一种经典的关联规则学习算法,用于发现数据集中频繁出现的项集,进而挖掘出有趣的关联规则。而Hadoop ...
这可能是指某本书的第10章第4节,该章节详细介绍了如何在 Hadoop MapReduce 中实现 MR_DesicionTreeBuilder。书中可能会涵盖以下内容: - 数据模型和输入输出格式 - Map 和 Reduce 函数的实现细节 - 特征选择...
Hadoop mapreduce 实现InvertedIndexer倒排索引,能用。
本章介绍了 Hadoop MapReduce,同时发现它有以下缺点: 1、程序设计模式不容易使用,而且 Hadoop 的 Map Reduce API 太过低级,很难提高开发者的效率。 2、有运行效率问题,MapReduce 需要将中间产生的数据保存到...
Hadoop MapReduce是Apache Hadoop框架的核心组件之一,它设计用于分布式处理大规模数据集,而v2的引入主要是为了解决v1在资源管理和效率上的局限性。 MapReduce的工作原理分为两个主要阶段:Map阶段和Reduce阶段。...
Hadoop MapReduce v2 Cookbook (第二版), Packt Publishing