- 浏览: 568715 次
- 性别:
- 来自: 济南
-
文章分类
- 全部博客 (270)
- Ask chenwq (10)
- JSF (2)
- ExtJS (5)
- Life (19)
- jQuery (5)
- ASP (7)
- JavaScript (5)
- SQL Server (1)
- MySQL (4)
- En (1)
- development tools (14)
- Data mining related (35)
- Hadoop (33)
- Oracle (13)
- To Do (2)
- SSO (2)
- work/study diary (10)
- SOA (6)
- Ubuntu (7)
- J2SE (18)
- NetWorks (1)
- Struts2 (2)
- algorithm (9)
- funny (1)
- BMP (1)
- Paper Reading (2)
- MapReduce (23)
- Weka (3)
- web design (1)
- Data visualisation&R (1)
- Mahout (7)
- Social Recommendation (1)
- statistical methods (1)
- Git&GitHub (1)
- Python (1)
- Linux (1)
最新评论
-
brandNewUser:
楼主你好,问个问题,为什么我写的如下的:JobConf pha ...
Hadoop ChainMap -
Molisa:
Molisa 写道mapred.min.split.size指 ...
Hadoop MapReduce Job性能调优——修改Map和Reduce个数 -
Molisa:
mapred.min.split.size指的是block数, ...
Hadoop MapReduce Job性能调优——修改Map和Reduce个数 -
heyongcs:
请问导入之后,那些错误怎么解决?
Eclipse导入Mahout -
a420144030:
看了你的文章深受启发,想请教你几个问题我的数据都放到hbase ...
Mahout clustering Canopy+K-means 源码分析
0、测试集样例
ball, 3.5, 12.7, 9.0 car, 15, 23.76, 42.23 device, 0.0, 12.4, -67.1
1、测试Point3D InputFormat
import java.io.IOException; import java.net.URI; import javax.xml.soap.Text; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * desc:Custom Data Types <code>TestPoint3DInputFormat</code> * * @author chenwq */ public class TestPoint3DInputFormat { /** * @param args * @throws IOException * @throws ClassNotFoundException * @throws InterruptedException */ public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // TODO Auto-generated method stub System.out.println("hello,chenwq!"); Job job=new Job(); Configuration conf=new Configuration(); FileSystem fs=FileSystem.get(URI.create(args[1]), conf); fs.delete(new Path(args[1])); job.setJobName("测试MyInputFormat程序。。。。。"); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setInputFormatClass(Point3DinputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Point3D.class); job.setMapperClass(Point3DMapper.class); job.setNumReduceTasks(0); job.waitForCompletion(false); } }
2、自定义类型Point3D必须实现WritableComparable接口,才能在Hadoop环境中传输
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; /** * desc:Custom Data Types <code>Point</code> * * @author chenwq */ public class Point3D implements WritableComparable { public float x; public float y; public float z; public Point3D(float x, float y, float z) { this.x = x; this.y = y; this.z = z; } public Point3D() { this(0.0f, 0.0f, 0.0f); } public void set(float x, float y, float z) { this.x = x; this.y = y; this.z = z; } public void write(DataOutput out) throws IOException { out.writeFloat(x); out.writeFloat(y); out.writeFloat(z); } public void readFields(DataInput in) throws IOException { x = in.readFloat(); y = in.readFloat(); z = in.readFloat(); } public String toString() { return Float.toString(x) + ", " + Float.toString(y) + ", " + Float.toString(z); } public float distanceFromOrigin() { return (float) Math.sqrt(x * x + y * y + z * z); } public int compareTo(Object other) { float myDistance = this.distanceFromOrigin(); float otherDistance = ((Point3D) other).distanceFromOrigin(); return Float.compare(myDistance, otherDistance); } public boolean equals(Object o) { Point3D other = (Point3D) o; if (!(other instanceof Point3D)) { return false; } return this.x == other.x && this.y == other.y && this.z == other.z; } public int hashCode() { return Float.floatToIntBits(x) ^ Float.floatToIntBits(y) ^ Float.floatToIntBits(z); } }
3、自定义Point3DInputFormat类型,供MapReduce编程模型使用
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.util.LineReader; public class Point3DinputFormat extends FileInputFormat<Text, Point3D> { @Override protected boolean isSplitable(JobContext context, Path filename) { // TODO Auto-generated method stub return false; } @Override public RecordReader<Text, Point3D> createRecordReader(InputSplit inputsplit, TaskAttemptContext context) throws IOException, InterruptedException { // TODO Auto-generated method stub return new objPosRecordReader(); } public static class objPosRecordReader extends RecordReader<Text,Point3D>{ public LineReader in; public Text lineKey; public Point3D lineValue; public StringTokenizer token=null; public Text line; @Override public void close() throws IOException { // TODO Auto-generated method stub } @Override public Text getCurrentKey() throws IOException, InterruptedException { //lineKey.set(token.nextToken()); return lineKey; } @Override public Point3D getCurrentValue() throws IOException, InterruptedException { // TODO Auto-generated method stub return lineValue; } @Override public float getProgress() throws IOException, InterruptedException { // TODO Auto-generated method stub return 0; } @Override public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException { // TODO Auto-generated method stub FileSplit split=(FileSplit)input; Configuration job=context.getConfiguration(); Path file=split.getPath(); FileSystem fs=file.getFileSystem(job); FSDataInputStream filein=fs.open(file); in=new LineReader(filein,job); line=new Text(); lineKey=new Text(); lineValue=new Point3D(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { // TODO Auto-generated method stub int linesize=in.readLine(line); if(linesize==0) return false; String[] pieces = line.toString().split(","); if(pieces.length != 4){ throw new IOException("Invalid record received"); } // try to parse floating point components of value float fx, fy, fz; try{ fx = Float.parseFloat(pieces[1].trim()); fy = Float.parseFloat(pieces[2].trim()); fz = Float.parseFloat(pieces[3].trim()); }catch(NumberFormatException nfe){ throw new IOException("Error parsing floating poing value in record"); } lineKey.set(pieces[0]); lineValue.set(fx, fy, fz); return true; } } }
4、编写Mapper类,这里仅仅测试自定义类型Point3D的InputFormat,不需要Reducer
import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class Point3DMapper extends Mapper<Text, Point3D, Text, Point3D>{ protected void map(Text key, Point3D value, Context context) throws IOException, InterruptedException{ context.write(key, value); } }
发表评论
-
Parallel K-Means Clustering Based on MapReduce
2012-08-04 20:28 1431K-means is a pleasingly paral ... -
Pagerank在Hadoop上的实现原理
2012-07-19 16:04 1491转自:pagerank 在 hadoop 上的实现原理 ... -
Including external jars in a Hadoop job
2012-06-25 20:24 1232办法1: 把所有的第三方jar和自己的class打成一个大 ... -
[转]BSP模型与实例分析(一)
2012-06-15 22:26 0一、BSP模型概念 BSP(Bulk Synchr ... -
Hadoop中两表JOIN的处理方法
2012-05-29 10:35 9761. 概述 在传统数据库(如:MYSQL)中,JOIN ... -
Hadoop DistributedCache
2012-05-27 23:45 1137Hadoop的DistributedCache,可以把 ... -
MapReduce,组合式,迭代式,链式
2012-05-27 23:27 24011.迭代式mapreduce 一些复杂的任务难以用一 ... -
Hadoop ChainMap
2012-05-27 23:09 2001单一MapReduce对一些非常简单的问题提供了很好的支持。 ... -
广度优先BFS的MapReduce实现
2012-05-25 21:47 4322社交网络中的图模型经常需要构造一棵树型结构:从一个特定的节点出 ... -
HADOOP程序日志
2012-05-23 19:53 1034*.log日志文件和*.out日志文件 进入Hadoo ... -
TFIDF based on MapReduce
2012-05-23 11:58 964Job1: Map: input: ( ... -
个人Hadoop 错误列表
2012-05-23 11:31 1505错误1:Too many fetch-failure ... -
Hadoop Map&Reduce个数优化设置以及JVM重用
2012-05-22 11:29 2452Hadoop与JVM重用对应的参数是map ... -
有空读下
2012-05-20 23:59 0MapReduce: JT默认task scheduli ... -
Hadoop MapReduce Job性能调优——修改Map和Reduce个数
2012-05-20 23:46 26800map task的数量即mapred ... -
Hadoop用于和Map Reduce作业交互的命令
2012-05-20 16:02 1237用法:hadoop job [GENERIC_OPTION ... -
Eclipse:Run on Hadoop 没有反应
2012-05-20 11:46 1294原因: hadoop-0.20.2下自带的eclise ... -
Hadoop0.20+ custom MultipleOutputFormat
2012-05-20 11:46 1556Hadoop0.20.2中无法使用MultipleOutput ... -
Custom KeyValueTextInputFormat
2012-05-19 16:23 1729在看老版的API时,发现旧的KeyValueTextInpu ... -
Hadoop SequenceFile Writer And Reader
2012-05-19 15:22 2079package cn.edu.xmu.dm.mpdemo ...
相关推荐
5. **配置和使用自定义InputFormat**:在你的MapReduce作业中,通过设置`job.setInputFormatClass()`方法指定你的自定义InputFormat类。同时,如果需要,你还可以在JobConf中添加额外的配置参数来指导InputFormat和...
本压缩包“Hadoop高级编程——构建与实现大数据解决方案”将深入探讨如何利用Hadoop进行高效的数据操作,构建实际的大数据解决方案。 一、Hadoop概述 Hadoop是由Apache基金会开发的开源项目,主要由Hadoop ...
本文将深入探讨“Hadoop高级编程——构建与实现大数据解决方案”这一主题,旨在帮助读者掌握如何利用Hadoop构建实际的大数据项目。 首先,我们要理解Hadoop的基础架构。Hadoop由两个主要组件构成:Hadoop ...
大数据实验 实验五:MapReduce 初级编程实践
实验项目“MapReduce 编程”旨在让学生深入理解并熟练运用MapReduce编程模型,这是大数据处理领域中的核心技术之一。实验内容涵盖了从启动全分布模式的Hadoop集群到编写、运行和分析MapReduce应用程序的全过程。 ...
【MapReduce初级编程实践】是大数据处理中的一项基础任务,主要应用于大规模数据集的并行计算。在这个实验中,我们关注的是如何利用MapReduce来实现文件的合并与去重操作。MapReduce是一种分布式计算模型,由Google...
在这个特定的案例中,我们不仅实现了基本的WordCount功能,还扩展了MapReduce的能力,通过自定义分区和自定义排序来优化数据处理流程。 首先,基础的`WordCount`实现,通常包含以下四个步骤: 1. **Map阶段**:...
### 大数据实验四-MapReduce编程实践 #### 一、实验内容与目的 ##### 实验内容概述 本次实验的主要内容是使用MapReduce框架来实现WordCount词频统计功能,即统计HDFS(Hadoop Distributed File System)系统中多个...
在MapReduce编程模型中,InputFormat是至关重要的组件,它负责将存储在HDFS(Hadoop Distributed File System)上的数据转化为可以被MapTask处理的键值对。本文将深入讲解MapReduce的InputFormat,特别是默认的...
自定义数据类型在MapReduce中扮演着重要角色,尤其在处理不同类型数据和复杂数据结构时显得尤为重要。 首先,了解Java和Hadoop之间的基本字段映射关系是理解自定义数据类型的基础。例如,在Java中常用的String类型...
3. **MapReduce编程**:理解MapReduce的工作原理,编写Map函数和Reduce函数,处理键值对数据,以及如何定义InputFormat和OutputFormat。 4. **YARN资源调度**:YARN作为新一代的资源管理器,负责任务调度和集群资源...
在分布式计算领域,MapReduce是一种广泛使用的编程模型,主要用于处理和生成大规模数据集。本篇文章将深入探讨MapReduce中的本地聚集(Local Aggregation)和Combinner的概念,这两个特性是优化MapReduce性能的关键...
MapReduce编程的高级特性,如自定义分区、排序和分组,以及计数器的使用,也是提高数据处理能力的关键。在开发中,开发者往往需要根据具体的业务需求,灵活地应用这些高级特性。比如,自定义分区可以控制Map输出键值...
MapReduce是Apache Hadoop框架下的并行计算模型,用于处理和生成大数据集。在这个实例中,我们将详细探讨如何使用MapReduce实现倒排索引。 首先,我们来看`Mapper`类。`InvertedIndexMapper`是Map阶段的核心,它...
**大数据技术原理及应用——MapReduce初级编程实践** MapReduce是一种分布式计算模型,由Google提出,主要用于处理和生成大规模数据集。在这个实验中,我们将学习如何利用MapReduce编程解决实际问题,包括数据去重...
MapReduce是分布式计算的一种编程模型,常用于处理大规模数据集。在这个实例中,我们看到的是一个基于MapReduce的数据去重操作,这个操作在大数据处理中非常常见,尤其是当处理的数据源包含重复记录时。下面将详细...
一个自己写的Hadoop MapReduce实例源码,网上看到不少网友在学习MapReduce编程,但是除了wordcount范例外实例比较少,故上传自己的一个。包含完整实例源码,编译配置文件,测试数据,可执行jar文件,执行脚本及操作...
"云计算技术之————MapReduce" 云计算技术中的 MapReduce 是一种分布式编程模型,用于大规模群组中的海量数据处理。MapReduce 由 Google 公司的 Jeffrey Dean 和 Sanjay Ghemawat 开发,旨在解决大规模数据处理...