1 wordcount
2 倒排序
3 自定义分区(不同规则输出到不同的文件)
4 自定义文件输出
5 统计文件流
1 自定义输出类
package com.wzt.mapreduce.custom;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Custom output format that routes each (word, line-offset) record to one of
 * two fixed HDFS files based on the parity of the offset value.
 *
 * <p>BUG FIX: the original was declared {@code class MyCustomOutputFormat<Text, LongWritable>},
 * which introduces two unbounded type VARIABLES named "Text" and "LongWritable"
 * that shadow the Hadoop classes of the same name. The class now binds the real
 * {@link Text} / {@link LongWritable} types.
 */
public class MyCustomOutputFormat extends FileOutputFormat<Text, LongWritable> {

    /**
     * Opens the two destination files on HDFS and returns a writer that splits
     * records between them.
     *
     * @param job task context supplying the Hadoop configuration
     * @return record writer backed by /wc/output1/file1.log and /wc/output2/file2.log
     * @throws IOException if the files cannot be created on HDFS
     */
    @Override
    public RecordWriter<Text, LongWritable> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        FileSystem hdfs = FileSystem.get(conf);
        // Two fixed destination files; the writer decides per record which one receives it.
        FSDataOutputStream os1 = hdfs.create(new Path("/wc/output1/file1.log"));
        FSDataOutputStream os2 = hdfs.create(new Path("/wc/output2/file2.log"));
        return new MyRecordWriter(os1, os2);
    }

    /** Writes even-offset records to os1 and odd-offset records to os2, one record per line. */
    public static class MyRecordWriter extends RecordWriter<Text, LongWritable> {

        private final FSDataOutputStream os1;
        private final FSDataOutputStream os2;

        public MyRecordWriter(FSDataOutputStream os1, FSDataOutputStream os2) {
            this.os1 = os1;
            this.os2 = os2;
        }

        /**
         * Routes one record by the parity of its value (the byte offset the
         * mapper forwarded as the value).
         */
        @Override
        public void write(Text key, LongWritable value) throws IOException, InterruptedException {
            // Direct accessor replaces the original Long.parseLong(value.toString()) round-trip.
            long offset = value.get();
            FSDataOutputStream target = (offset % 2 == 0) ? os1 : os2;
            // BUG FIX: writeBytes() keeps only the low byte of each char, corrupting
            // non-ASCII (e.g. Chinese) words; write UTF-8 bytes instead, and terminate
            // each record with a newline so consecutive words are not concatenated.
            target.write(key.toString().getBytes(StandardCharsets.UTF_8));
            target.write('\n');
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // Close the second stream even if closing the first one fails.
            try {
                if (os1 != null) {
                    os1.close();
                }
            } finally {
                if (os2 != null) {
                    os2.close();
                }
            }
        }
    }
}
2 Mapper 数据整理类
package com.wzt.mapreduce.custom;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper that tokenizes each input line on single spaces and emits one
 * (word, byte-offset-of-line) pair per token. The offset is forwarded
 * unchanged as the value; downstream it is used to decide which output
 * file a record lands in.
 */
public class CIOMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // StringUtils.split drops empty tokens, so runs of spaces are tolerated.
        for (String token : StringUtils.split(value.toString(), " ")) {
            // key is the line's byte offset supplied by the input format.
            context.write(new Text(token), key);
        }
    }
}
3 运行的主类（Map 中的数据直接输出，因此没有使用 Reducer）
package com.wzt.mapreduce.custom;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the custom-output-format job. The job is map-only: the mapper
 * emits (word, line-offset) pairs and MyCustomOutputFormat splits them across
 * two HDFS files by offset parity; no reducer is involved.
 */
public class CIORunner {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(CIORunner.class);

        job.setMapperClass(CIOMapper.class);
        // BUG FIX: this is intended as a map-only job, but the original left the
        // default of one (identity) reducer, forcing a needless shuffle/sort pass.
        // Zero reduce tasks sends map output straight to the output format.
        job.setNumReduceTasks(0);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        job.setOutputFormatClass(MyCustomOutputFormat.class);

        FileInputFormat.setInputPaths(job, "/wc/input/xiyou.txt");
        // Still required although the custom format opens its own files: the
        // committer uses this directory for job bookkeeping (e.g. _SUCCESS).
        FileOutputFormat.setOutputPath(job, new Path("/wc/outputcount"));
        // Local-filesystem alternative for desktop testing:
        // FileInputFormat.setInputPaths(job, "D:\\wordcount\\wordcount.txt");
        // FileOutputFormat.setOutputPath(job, new Path("D:\\wordcount\\output"));

        // BUG FIX: propagate success/failure through the process exit code
        // instead of discarding waitForCompletion's return value.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
相关推荐
2. **实际编程经验积累**:通过编写MapReduce程序,积累了实际编程经验,熟悉了Hadoop和MapReduce的API。 3. **分布式计算的认识**:认识到分布式计算的局限性与优势,在实际应用中需要权衡数据规模和计算需求。 4. ...
在IT行业中,Hadoop是一个广泛使用的开源框架,用于处理和存储大规模数据。...通过遵循`README.md`中的指导,结合对这些技术的理解,我们可以成功构建出适配Hadoop 2.x的Eclipse插件,从而提升MapReduce开发效率。
可以从Github上下载`hadoop2x-eclipse-plugin`,解压缩后,将jar文件复制到Eclipse的plugins目录,并运行Eclipse的清理命令以启用插件。这一步骤只需在首次安装后执行一次。 配置Hadoop-Eclipse-Plugin是实验的关键...
总之,Spark-x.x.x-bin-hadoop包为本地调试提供了便利,涵盖了Spark的各种功能和与Hadoop的集成,使开发者能够在本地环境中高效地测试和优化大数据处理任务。通过理解Spark的组件和其与Hadoop的交互方式,你将能更好...
2. **JobBuilder**:用于构建MapReduce作业,支持添加Mapper、Reducer类,并能设置输入输出格式、分区器和排序器等。 3. **Job提交与监控**:插件集成了Hadoop作业提交的功能,开发者可以直接在Eclipse中启动作业,...