`

hadoop mr实现单表列转行--mr system.out数据位置

 
阅读更多

 

 

 

 

1 代码和业务:

 

package mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



/**
a	b	1 
a	b   2 
a	b	3 
c	d	4 
c	d	5 
c	d	6

期待变成
a	b	1,2,3 
c	d	4,5,6 

 * @author zm
 * 
 */
public class ConcatWSMapReduce {

	public static class ConcatWSMapper extends Mapper<LongWritable, Text, ConcatWS, Text>{
		/**
		 * 每一行执行一次map函数
		 * @param key 表示字节在源文件中偏移量
		 * @param value 行文本内容
		 */
		
		protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException {
			final String[] splited = value.toString().split("\t");
			System.out.println("splited length is:  " + splited.length);
			String col1 = splited[0];
			String col2 = splited[1];
			String col3 = splited[2];
			System.out.println("col1: " + col1  +  "col2: " + col2 + "col3: " + col3);
			context.write(new ConcatWS(col1,col2), new Text(col3));
		};
	}
	
	
	//分组:<hello,{1,1}><me,{1}><you,{1}>【把相同key的value放到一起】    reduce方法是每一组调用一次 左侧结果 为3组 则调用3次reduce方法
	public static class ConcatWSReducer extends Reducer<ConcatWS, Text, Text, Text>{
		/**
		 * 每个组调用一次reduce函数
		 * @param word 表示单词
		 * @param times 表示相同key的value的迭代器
		 */
		protected void reduce(ConcatWS ws, Iterable<Text> v2s, org.apache.hadoop.mapreduce.Reducer<ConcatWS,Text,Text,Text>.Context context) throws java.io.IOException ,InterruptedException {
			StringBuilder  sb = new StringBuilder("");
			for (Text col3 : v2s) {
				sb.append(col3.toString()).append(",");
			}
			System.out.println("reduce key content:  " + ws.toString());
			System.out.println("reduce val content:  " + sb.toString());
			context.write(new Text(ws.toString()), new Text(sb.toString()));
		};
	}
	
	
	public static void main(String[] args) throws Exception {
		// 设置Job对象
		final Configuration conf = new Configuration();
		final Job job = new Job(conf);
		job.setJobName(ConcatWSMapReduce.class.getSimpleName());
		job.setJarByClass(ConcatWSMapReduce.class);
		// 给Job对象设置自定义 mapper  reducer
		job.setMapperClass(ConcatWSMapper.class);
		job.setReducerClass(ConcatWSReducer.class);
		// 设置map reduce输出参数类型
		job.setMapOutputKeyClass(ConcatWS.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		// 设置Job任务要处理的数据源和输出数据目的地
		FileInputFormat.addInputPaths(job, "/zmdata/zm.txt"); // 注意是addInputPaths 用的是复数的方法
		Path outputpath = new Path("/zmdata/zmout");
		FileSystem fs = FileSystem.get(new URI("/"), conf);
		if(fs.exists(outputpath)){
			fs.delete(outputpath, true);
		}
		FileOutputFormat.setOutputPath(job, outputpath);
		// 执行Job
		job.waitForCompletion(true);
	}

	// 注意写成内部public 类 否则执行mr时 会报不识别ConcatWS.source
public static class ConcatWS implements WritableComparable<ConcatWS>{

			private String col1 = "";
			private String col2 = "";
			
			public ConcatWS(){}
			
			public ConcatWS(String col1, String col2){
				this.col1 = col1;
				this.col2 = col2;
			}
			
			@Override
			public void write(DataOutput out) throws IOException {
				out.writeUTF(col1);
				out.writeUTF(col2);
			}

			@Override
			public void readFields(DataInput in) throws IOException {
				this.col1 = in.readUTF();
				this.col2 = in.readUTF();
			}

			@Override
			public int compareTo(ConcatWS ws) {
				int result = 0;
				result = this.col1.compareTo(ws.col1);
				if(result == 0){
					result = this.col2.compareTo(ws.col2);
				}
				return result;
			}   
			
			 @Override  
		    public String toString() {   
		        return col1 + "\t" + col2 ;   
		    }  
			 
			
		}
	 
	 
}















 

 

 

 

2 本机使用ant脚本提交后执行,ant脚本如下:

 

<?xml version="1.0" encoding="UTF-8"?>

<!-- 该文件与src文件夹、lib文件夹同一级  -->
<project name="hadoop2测试项目" basedir="." default="sshexec">

	<!--属性设置-->
	<property environment="env" />
	<property file="build.properties" />
	<property name="src.dir" value="${basedir}/src" />
	<property name="java.lib.dir" value="${env.JAVA_HOME}/lib" />
	<property name="classes.dir" value="${basedir}/classes" />
	<property name="dist.dir" value="${basedir}/dist" />
	<property name="project.lib.dir" value="${basedir}/lib" />
	<property name="localpath.dir" value="${basedir}" />
	<property name="remote.home" value="~"/>
	<!--可以修改:hadoop集群的hostname或者ip-->
	<property name="remote.hostname" value="hadoop3"/>
	<!--可以修改:登录hadoop集群所在linux的用户名-->
	<property name="remote.username" value="root"/>
	<!--可以修改:登录hadoop集群所在liniux的密码-->
	<property name="remote.password" value="123456"/>
	<!--可以修改:每次需要运行的main类,写到这里。运行时拼接为hadoop jar xxx.jar MainClass -->
	<property name="main.class" value="mapreduce.ConcatWSMapReduce"/>
	<!--可以修改:hadoop集群在linux的部署路径-->
	<property name="hadoop.path" value="/opt/hadoop-2.5.2"/>
	
	<!-- 基本编译路径设置 -->
	<path id="compile.classpath">
		<fileset dir="${java.lib.dir}">
			<include name="tools.jar" />
		</fileset>
		<fileset dir="${project.lib.dir}">
			<include name="*.jar" />
		</fileset>
	</path>

	<!-- 运行路径设置 -->
	<path id="run.classpath">
		<path refid="compile.classpath" />
		<pathelement location="${classes.dir}" />
	</path>
	<!-- 清理,删除临时目录 -->
	<target name="clean" description="清理,删除临时目录">
		<!--delete dir="${build.dir}" /-->
		<delete dir="${dist.dir}" />
		<delete dir="${classes.dir}" />
		<echo level="info">清理完毕</echo>
	</target>
	<!-- 初始化,建立目录,复制文件 -->
	<target name="init" depends="clean" description="初始化,建立目录,复制文件">
		<mkdir dir="${classes.dir}" />
		<mkdir dir="${dist.dir}" />
	</target>
	<!-- 编译源文件-->
	<target name="compile" depends="init" description="编译源文件">
		<javac srcdir="${src.dir}" destdir="${classes.dir}" source="1.7" target="1.7"  includeAntRuntime="false">
			<classpath refid="compile.classpath" />
			<compilerarg line="-encoding UTF-8 "/>  
		</javac>
	</target>

	<!-- 打包类文件 -->
	<target name="jar" depends="compile" description="打包类文件">
		<jar jarfile="${dist.dir}/jar.jar">
			<fileset dir="${classes.dir}" includes="**/*.*" />
		</jar>
	</target>
	
	<!--上传到服务器
	**需要把lib目录下的jsch-0.1.51拷贝到$ANT_HOME/lib下,如果是Eclipse下的Ant环境必须在Window->Preferences->Ant->Runtime->Classpath中加入jsch-0.1.51。
	-->
	<target name="ssh" depends="jar">
		<scp file="${dist.dir}/jar.jar" todir="${remote.username}@${remote.hostname}:${remote.home}" password="${remote.password}" trust="true"/>
	</target>
	
	<target name="sshexec" depends="ssh">
	      <sshexec host="${remote.hostname}" username="${remote.username}"  password="${remote.password}" trust="true" command="${hadoop.path}/bin/hadoop jar ${remote.home}/jar.jar ${main.class}"/>
	</target>
	
</project>

 

 

 

3  执行的mr代码中写上了 sysotem.out 那么这些数据在哪里呢?  比如上面的输出:

 



 

上图所示,找到你执行mr任务的编号,进去后看如下图:

 



 

 

 

  • 大小: 8.6 KB
  • 大小: 30.7 KB
分享到:
评论

相关推荐

    flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar.tar.gz

    本文将针对"flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar.tar.gz"这一特定压缩包,探讨Flink 1.14.0如何与Hadoop 3.x实现兼容,并深入解析其背后的原理。 Flink 1.14.0是一个强大的流处理引擎,它提供了...

    flink-shaded-hadoop-2-uber-2.7.5-10.0.jar.zip

    Apache Flink 是一个流行的开源大数据处理框架,而 `flink-shaded-hadoop-2-uber-2.7.5-10.0.jar.zip` 文件是针对 Flink 优化的一个特殊版本的 Hadoop 库。这个压缩包中的 `flink-shaded-hadoop-2-uber-2.7.5-10.0....

    hadoop-common-2.6.0-bin-master.zip

    Hadoop是大数据处理领域的一个关键框架,它由Apache软件基金会维护,主要负责分布式存储和计算。`hadoop-common-2.6.0-bin-master.zip` 是一个针对Hadoop 2.6.0版本的压缩包,特别适用于在Windows环境下进行本地开发...

    flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

    flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

    eclipse运行mr插件hadoop-eclipse-plugin-2.6.0.jar

    在IT行业中,大数据处理是一个至关重要的领域,而Hadoop作为开源的大数据处理框架,为开发者提供了强大的工具。本文将深入探讨如何使用Eclipse IDE结合hadoop-eclipse-plugin-2.6.0.jar插件,实现在Windows环境下...

    hadoop-2.7.4-with-centos-6.7.tar.gz

    Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它的出现为大数据处理提供了高效、可靠的解决方案。Hadoop 2.7.4是Hadoop发展过程中的一个重要版本,它在Hadoop 2.x系列中引入了许多增强功能和修复了大量...

    hadoop最新版本3.1.1全量jar包

    hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...

    hadoop插件apache-hadoop-3.1.0-winutils-master.zip

    例如,`hadoop fs -ls /`可以列出根目录下的所有文件和目录。 7. **MapReduce编程**:如果你打算在Windows上进行MapReduce编程,还需要配置开发环境,包括设置Hadoop的类路径,以及使用IDE(如IntelliJ IDEA或...

    hadoop-mapreduce-examples-2.7.1.jar

    hadoop-mapreduce-examples-2.7.1.jar

    hadoop-eclipse-plugin-3.1.1.tar.gz

    Hadoop-Eclipse-Plugin-3.1.1是一款专为Eclipse集成开发环境设计的插件,用于方便地在Hadoop分布式文件系统(HDFS)上进行开发和调试MapReduce程序。这款插件是Hadoop生态系统的组成部分,它使得Java开发者能够更加...

    hadoop-2.7.4-with-windows.tar.gz

    Hadoop是Apache软件基金会的一个开源项目,主要由HDFS(Hadoop Distributed File System)和YARN(Yet Another Resource Negotiator)两大部分组成,用于构建分布式计算系统。在Hadoop 2.7.4中,HDFS提供了高容错性...

    hudi-hadoop-mr-bundle-0.11.0.jar

    hudi-hadoop-mr-bundle-0.11.0.jar 配合文档

    flink-shaded-hadoop-2-uber-2.6.5-10.0.zip

    《Flink与Hadoop的深度整合:flink-shaded-hadoop-2-uber-2.6.5-10.0.zip详解》 在大数据处理领域,Apache Flink 和 Apache Hadoop 是两个不可或缺的重要组件。Flink作为一个实时流处理框架,以其高效的事件驱动和...

    hadoop-3.1.1.3.1.4.0-315.tar.gz

    ambari-2.7.5 编译过程中四个大包下载很慢,所以需要提前下载,包含:hbase-2.0.2.3.1.4.0-315-bin.tar.gz ,hadoop-3.1.1.3.1.4.0-315.tar.gz , grafana-6.4.2.linux-amd64.tar.gz ,phoenix-5.0.0.3.1.4.0-315....

    flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar

    Flink-1.11.2与Hadoop3集成JAR包,放到flink安装包的lib目录下,可以避免Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the ...这个报错,实现Flink与Hadoop的集成

    hadoop-common-2.7.3-bin-master包含hadoop.dll、winutils.exe

    Hadoop是大数据处理领域中的一个核心框架,由Apache软件基金会维护。它主要设计用于分布式存储和计算,使得大规模数据处理变得更加高效和便捷。Hadoop 2.7.3是Hadoop的一个版本,其中包含了`hadoop-common-2.7.3-bin...

    flink-shaded-hadoop-2-uber-2.7.2-10.0.jar

    Flink1.10.1编译hadoop2.7.2 编译flink-shaded-hadoop-2-uber

    hadoop-eclipse-plugin-3.3.1.jar

    Ubuntu虚拟机HADOOP集群搭建eclipse环境 hadoop-eclipse-plugin-3.3.1.jar

Global site tag (gtag.js) - Google Analytics