@Public
@Evolving
A RecordReader that reads records from a SQL table. Emits LongWritables containing the record number as key and DBWritables as value.
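To make that key/value contract concrete, below is a minimal sketch of a value class implementing DBWritable. The class name UserRecord and the columns id and name are illustrative assumptions, not part of the Hadoop source; in practice such a value class usually also implements Writable so it can be shuffled between map and reduce tasks.

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class UserRecord implements DBWritable {
  private long id;
  private String name;

  // Called by DBRecordReader.nextKeyValue() to populate this value
  // object from the current row of the JDBC ResultSet.
  @Override
  public void readFields(ResultSet resultSet) throws SQLException {
    this.id = resultSet.getLong("id");
    this.name = resultSet.getString("name");
  }

  // Used on the write path (e.g. by DBOutputFormat); mirrors readFields.
  @Override
  public void write(PreparedStatement statement) throws SQLException {
    statement.setLong(1, id);
    statement.setString(2, name);
  }
}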
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DBRecordReader<T extends DBWritable>
    extends RecordReader<LongWritable, T> {

  private static final Log LOG = LogFactory.getLog(DBRecordReader.class);

  private ResultSet results = null;
  private Class<T> inputClass;
  private Configuration conf;
  private DBInputFormat.DBInputSplit split;
  private long pos = 0;
  private LongWritable key = null;
  private T value = null;
  private Connection connection;
  protected PreparedStatement statement;
  private DBConfiguration dbConf;
  private String conditions;
  private String[] fieldNames;
  private String tableName;

  /**
   * @param split The InputSplit to read data for
   * @throws SQLException
   */
  public DBRecordReader(DBInputFormat.DBInputSplit split,
      Class<T> inputClass, Configuration conf, Connection conn,
      DBConfiguration dbConfig, String cond, String[] fields, String table)
      throws SQLException {
    this.inputClass = inputClass;
    this.split = split;
    this.conf = conf;
    this.connection = conn;
    this.dbConf = dbConfig;
    this.conditions = cond;
    this.fieldNames = fields;
    this.tableName = table;
  }

  protected ResultSet executeQuery(String query) throws SQLException {
    this.statement = connection.prepareStatement(query,
        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
    return statement.executeQuery();
  }

  /** Returns the query for selecting the records,
   * subclasses can override this for custom behaviour. */
  protected String getSelectQuery() {
    StringBuilder query = new StringBuilder();

    // Default codepath for MySQL, HSQLDB, etc. Relies on LIMIT/OFFSET for splits.
    if (dbConf.getInputQuery() == null) {
      query.append("SELECT ");
      for (int i = 0; i < fieldNames.length; i++) {
        query.append(fieldNames[i]);
        if (i != fieldNames.length - 1) {
          query.append(", ");
        }
      }
      query.append(" FROM ").append(tableName);
      query.append(" AS ").append(tableName); // in hsqldb this is necessary
      if (conditions != null && conditions.length() > 0) {
        query.append(" WHERE (").append(conditions).append(")");
      }
      String orderBy = dbConf.getInputOrderBy();
      if (orderBy != null && orderBy.length() > 0) {
        query.append(" ORDER BY ").append(orderBy);
      }
    } else {
      // PREBUILT QUERY
      query.append(dbConf.getInputQuery());
    }

    try {
      query.append(" LIMIT ").append(split.getLength());
      query.append(" OFFSET ").append(split.getStart());
    } catch (IOException ex) {
      // Ignore, will not throw.
    }
    return query.toString();
  }

  /** {@inheritDoc} */
  public void close() throws IOException {
    try {
      if (null != results) {
        results.close();
      }
      if (null != statement) {
        statement.close();
      }
      if (null != connection) {
        connection.commit();
        connection.close();
      }
    } catch (SQLException e) {
      throw new IOException(e.getMessage());
    }
  }

  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    // do nothing
  }

  /** {@inheritDoc} */
  public LongWritable getCurrentKey() {
    return key;
  }

  /** {@inheritDoc} */
  public T getCurrentValue() {
    return value;
  }

  /**
   * @deprecated
   */
  @Deprecated
  public long getPos() throws IOException {
    return pos;
  }

  /** {@inheritDoc} */
  public float getProgress() throws IOException {
    return pos / (float) split.getLength();
  }

  /** {@inheritDoc} */
  public boolean nextKeyValue() throws IOException {
    try {
      if (key == null) {
        key = new LongWritable();
      }
      if (value == null) {
        value = createValue();
      }
      if (null == this.results) {
        // First time into this method, run the query.
        this.results = executeQuery(getSelectQuery());
      }
      if (!results.next()) {
        return false;
      }

      // Set the key field value as the output key value
      key.set(pos + split.getStart());
      value.readFields(results);
      pos++;
    } catch (SQLException e) {
      throw new IOException("SQLException in nextKeyValue", e);
    }
    return true;
  }

  // ... ...
}
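For reference, here is a hedged sketch of how this reader is typically wired into a job through DBInputFormat; the table users, the columns id and name, and the MySQL connection details are illustrative assumptions, not from the Hadoop source. The table name, conditions, ORDER BY column, and field list passed to DBInputFormat.setInput() feed directly into getSelectQuery() above: for a split of length 1000 starting at row 2000 it would produce roughly SELECT id, name FROM users AS users ORDER BY id LIMIT 1000 OFFSET 2000.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;

public class DBReadDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Register the JDBC driver and connection string; DBInputFormat
    // uses these to open the Connection handed to DBRecordReader.
    DBConfiguration.configureDB(conf,
        "com.mysql.jdbc.Driver",               // driver class (assumed MySQL)
        "jdbc:mysql://localhost:3306/testdb",  // connection URL (assumed)
        "user", "password");                   // credentials (assumed)

    Job job = Job.getInstance(conf, "db-read-example");
    job.setJarByClass(DBReadDriver.class);
    job.setInputFormatClass(DBInputFormat.class);

    // These arguments become tableName, conditions, orderBy, and
    // fieldNames inside DBRecordReader.getSelectQuery().
    DBInputFormat.setInput(job, UserRecord.class,
        "users",        // tableName
        null,           // conditions (none)
        "id",           // orderBy: keeps LIMIT/OFFSET splits stable
        "id", "name");  // fieldNames

    // ... set mapper and output format, then job.waitForCompletion(true)
  }
}

An explicit ORDER BY matters here: without a stable ordering, the LIMIT/OFFSET windows that back each split are not guaranteed to partition the table without overlaps or gaps.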