`
yangzc106
  • 浏览: 156383 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Hadoop远程开发

阅读更多

 

哎,最近公司又让我做flex的东西。也没什么时间搞Hadoop了。这里先把这段时间的东西做一个简单的总结。

 

思路:把本地的类编译打包->上传到Hadoop集群环境->运行。

上述属于废话,网上的相关内容很多。

 

Hadoop配置方面的就不多说了,baidu一下你就知道。

为了达到远程开发的目的,之前在配置过程中犯了很多错误。主要是masters里的问题,还有我的ubuntu的hosts文件总出问题。

 

另外,又简单看了看hadoop源码,唯一的感觉就是到处都是配置项。

 

这里还是主要是贴代码吧。

 

实现:还以WordCount为例。

 

 

代码如下:

    WordCount.java

package org.yangzc.hadoop.demo;

import java.io.File;
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.MapFile.Reader;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.yangzc.hadoop.tools.FileTools;
import org.yangzc.hadoop.tools.JarTools;
import org.yangzc.hadoop.tools.JobTools;

/**
 * 测试用例
 * @author yangzc
 *
 */
public class WordCount {

	public static class TokenizerMapper extends
			Mapper<Object, Text, Text, IntWritable> {

		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			StringTokenizer itr = new StringTokenizer(value.toString());
			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				context.write(word, one);
			}
		}
	}

	public static class IntSumReducer extends
			Reducer<Text, IntWritable, Text, IntWritable> {
		private IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		FileTools.copyToRemote(conf, "C:/Documents and Settings/Administrator/桌面/test.txt", "hdfs://ubuntu:9000/usr/devsoft/tmp/ss1.txt");
//		FileTools.copyToLocal(conf, "C:/Documents and Settings/Administrator/桌面/indexhaha.txt", "hdfs://ubuntu:9000/usr/devsoft/tmp/e8e035dcd123404fa8cf8f132fa37e4a");
		
		JobTools.addClassPath("D:/workspace/Hadoop/conf");
		Thread.currentThread().setContextClassLoader(JobTools.getClassLoader());
		
		File jarpath = JarTools.makeJar("bin");
		conf.set("mapred.jar", jarpath.toString());
		Job job = new Job(conf, "word count");
		
		job.setMapperClass(TokenizerMapper.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.addInputPath(job, new Path(
				"hdfs://ubuntu:9000/usr/devsoft/tmp/ss1.txt"));
		
		Path destpath = new Path(
				"hdfs://ubuntu:9000/usr/devsoft/tmp/" + UUID.randomUUID().toString().replace("-", ""));
		FileOutputFormat.setOutputPath(job, destpath);
		
		Path rtnpath = FileOutputFormat.getOutputPath(job);
		
		System.out.println(rtnpath);
		job.waitForCompletion(false);
		
//		FileTools.copyDirToLocal(conf, "C:/Documents and Settings/Administrator/桌面", destpath.toString());
		
		Reader reader = new Reader(FileSystem.get(conf),destpath.toString()+"/part-r-00000", conf);
		Writable wa = reader.get(new Text("hao"), new IntWritable(1));
		System.out.println(wa.toString());
	}
}

 

帮助类:

FileTools.java

package org.yangzc.hadoop.tools;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileTools {
	
	public static List<File> getFileList(String parent){
		List<File> filelst = new ArrayList<File>();
		//取得文件列表
		getList(filelst, parent);
		return filelst;
	}
	
	private static void getList(List<File> filelst, String parent){
		File p = new File(parent);
		if(!p.exists())return;
		if(p.isDirectory()){
			File clst[] = p.listFiles();
			if(clst == null || clst.length == 0)return;
			for(File f: clst){
				getList(filelst, f.getAbsolutePath());
			}
		}else{
			filelst.add(p);
		}
	}
	
	public static void copyToRemote(Configuration conf, String local, String dest) throws IOException{
		FileSystem fs = FileSystem.get(conf);
		fs.copyFromLocalFile(new Path(local), new Path(dest));
		fs.close();
	}
	
	public static void copyToLocal(Configuration conf, String local, String dest) throws IOException{
		FileSystem fs = FileSystem.get(conf);
		FSDataInputStream fis = fs.open(new Path(dest));
		FileOutputStream fos = new FileOutputStream(local);
		byte buf[] = new byte[1024];
		int len = -1;
		while((len = fis.read(buf, 0, 1024)) != -1){
			fos.write(buf, 0, len);
		}
		fos.flush();
		fos.close();
		fis.close();
		fs.close();
	}
	
	public static void copyDirToLocal(Configuration conf, String localdir, String destdir) throws IOException{
		if(!new File(localdir).exists())new File(localdir).mkdirs();
		FileSystem fs = FileSystem.get(conf);
		FileStatus filestatus[] = fs.listStatus(new Path(destdir));
		for(FileStatus f: filestatus){
			Path path = f.getPath();
			String local = localdir + "/" + path.getName();
			try{
				copyToLocal(conf, local, path.toString());
			}catch(Exception e){
				System.out.println("read remote file error!!!");
			}
		}
	}
}

 

   JarTools.java

package org.yangzc.hadoop.tools;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

public class JarTools {
	
	public static File makeJar(String classPath) throws Exception{
		if(classPath == null || "".equals(classPath))throw new Exception("classPath不能为空!!!");
		JarOutputStream jos = null;
		try {
			Manifest manifest = new Manifest();
			manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
			
			//创建临时文件
			final File jarfile = File.createTempFile("tmp_", ".jar", new File(System.getProperty("java.io.tmpdir")));
			
			//注册关闭钩子
			Runtime.getRuntime().addShutdownHook(new Thread(){
				@Override
				public void run() {
					jarfile.deleteOnExit();
				}
			});
			
			FileOutputStream fis = new FileOutputStream(jarfile);
			jos = new JarOutputStream(fis, manifest);
			File file = new File(classPath);
			if(!file.isDirectory())return null;
			List<File> files = FileTools.getFileList(classPath);
			for(File f: files){
				String filepath = f.getAbsolutePath().replace(new File(classPath).getAbsolutePath(), "");
				if(filepath.startsWith("\\")){
					filepath = filepath.substring(1);
				}
				JarEntry entry = new JarEntry(filepath.replace("\\", "/"));
				jos.putNextEntry(entry);
				
				FileInputStream is = new FileInputStream(f);
				byte buf[] = new byte[1024];
				int len = -1;
				while((len = is.read(buf, 0, 1024)) != -1){
					jos.write(buf, 0 , len);
				}
				is.close();
			}
			jos.flush();
			return jarfile;
		} catch (IOException e) {
			e.printStackTrace();
		} finally{
			if(jos != null)jos.close();
		}
		return null;
	}
	
	public static void main(String[] args) {
		try {
			JarTools.makeJar("D:/workspace/JarMaker/bin/");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

 

   JobTools.java

package org.yangzc.hadoop.tools;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.mapreduce.Job;

public class JobTools {
	
	private static List<URL> classPaths = new ArrayList<URL>();
	
	public static void addClassPath(String classPath){
		if(classPath != null && !"".equals(classPath)){
			File f = new File(classPath);
			if(!f.exists())return;
			try {
				classPaths.add(f.getCanonicalFile().toURI().toURL());
			} catch (MalformedURLException e) {
				e.printStackTrace();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	
	public static void addJarDir(String lib){
		File file = new File(lib);
		if(!file.exists())return;
		if(!file.isDirectory())return;
		File fs[] = file.listFiles();
		if(fs == null || fs.length == 0)return;
		for(File f: fs){
			addClassPath(f.getAbsolutePath());
		}
	}
	
	public static URLClassLoader getClassLoader(){
		ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
		if(classLoader == null)classLoader = JobTools.getClassLoader();
		if(classLoader == null)classLoader = ClassLoader.getSystemClassLoader();
		return new URLClassLoader(classPaths.toArray(new URL[0]), classLoader);
	}
	
	public static boolean runJob(Job job){
		try {
			
			return job.waitForCompletion(true);
		} catch (IOException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		}
		return false;
	}
	
	public static void main(String[] args) {
		
	}
}

 

分享到:
评论
3 楼 蕾恩love 2014-07-24  
Exception in thread "main" java.lang.NullPointerException
at com.pip.data.hadoop.util.tools.FileTools.copyDirToLocal(FileTools.java:75)
at com.pip.data.main.demo.log.WordCount2.main(WordCount2.java:158)
Exception in thread "Thread-8" java.lang.ExceptionInInitializerError
at java.io.File.deleteOnExit(File.java:939)
at com.pip.data.hadoop.util.tools.JarTools$1.run(JarTools.java:35)
Caused by: java.lang.IllegalStateException: Shutdown in progress
at java.lang.Shutdown.add(Shutdown.java:62)
at java.lang.System$2.registerShutdownHook(System.java:1146)
at java.io.DeleteOnExitHook.<clinit>(DeleteOnExitHook.java:20)
... 2 more
2 楼 yangzc106 2013-05-21  
很久了,顽疾了
1 楼 yaogege 2013-05-02  
你用的hadoop是哪个版本?

相关推荐

    hadoop远程访问资料

    标题 "hadoop远程访问资料" 涉及到的核心知识点主要集中在如何在Java环境中通过Eclipse等开发工具,实现对Hadoop集群的远程访问和管理,尤其是与NameNode和DataNode节点的交互。Hadoop是一个分布式文件系统,...

    远程调用执行Hadoop Map/Reduce

    本篇文章将深入探讨“远程调用执行Hadoop Map/Reduce”的概念、原理及其实现过程,同时结合标签“源码”和“工具”,我们将涉及到如何通过编程接口与Hadoop集群进行交互。 Hadoop MapReduce是一种编程模型,用于大...

    Hadoop大数据开发案例教程与项目实战 数据云盘+源代码+文档说明

    基于Hadoop开发出该系统。系统分为前台和后台两部分,前台采用 JSP 和Js编写界面和前端逻辑校验,后台开发主要采用Java语言,使用spring、bootmetro-master等Java EE开发框架,同时使用 MySQL、HDFS Java API等实现...

    【大数据入门笔记系列】第五节 SpringBoot集成hadoop开发环境(复杂版的WordCount)

    SpringBoot集成hadoop开发环境(复杂版的WordCount)前言环境清单创建SpringBoot项目创建包创建yml添加集群主机名映射hadoop配置文件环境变量HADOOP_HOME编写代码添加hadoop依赖jar包编译项目造数据IDEA远程提交...

    hadoop本地windows开发环境配置

    为了本地运行Hadoop任务或Spark任务,并将这些任务提交给远程Hadoop服务或Spark服务,需要提前打包所需的jar包。 1. **配置项目设置** 在IDEA中打开项目设置,指定jar包的名称、存储位置,并勾选`Build on Make`...

    Hadoop伪分布式部署文档(包括本地开发环境,eclipse远程连接Hadoop服务器)

    Hadoop伪分布式部署文档是指在单台机器上模拟分布式Hadoop集群的部署文档,包括服务器伪分布式部署、本地Hadoop开发环境部署、Eclipse远程连接Hadoop服务器等内容。下面是该文档的详细解释: 首先,需要安装JDK,...

    myEclipse10.0与hadoop集群远程连接

    将myEclipse与Hadoop集群远程连接起来,可以方便地在开发环境中编写、测试和部署Hadoop应用程序,从而实现高效的数据分析和处理。 首先,我们需要了解myEclipse中的远程系统视图(Remote Systems View)。这是...

    eclipse连接远程hadoop集群开发时权限不足问题解决方案 (2).pdf

    eclipse连接远程hadoop集群开发时权限不足问题解决方案 (2).pdfeclipse连接远程hadoop集群开发时权限不足问题解决方案 (2).pdf

    myeclipse开发hadoop插件

    开发者需要在本地环境中配置Hadoop,然后编写MapReduce程序,最后将这些程序提交到远程Hadoop集群执行。 为了在MyEclipse中开发Hadoop项目,我们需要安装Hadoop-eclipse-plugin。这个插件允许开发者直接在Eclipse或...

    Cygwin+Eclipse搭建Hadoop单机开发环境-3

    在本教程中,我们将深入探讨如何使用Cygwin和Eclipse搭建Hadoop的单机开发环境,以便在Windows操作系统上进行高效的数据处理和分析。Cygwin是一个提供Linux-like环境的开源工具集,使得Windows用户可以运行原本为...

    Cygwin+Eclipse搭建Hadoop单机开发环境离线包-cygwin-setup

    - 在搜索框中输入“ssh”,找到并勾选`openssh`相关的包,这将包含ssh服务器和客户端,用于Hadoop集群间的远程登录。 - 搜索其他必要的工具,如`tar`、`gzip`、`wget`等,它们在处理Hadoop相关的压缩文件和网络...

    hadoop windows上使用所需插件

    此外,Windows上的Hadoop环境可能还需要配置SSH服务,以便进行远程进程管理。可以使用plink或PuTTY等工具来实现。同时,为了模拟真实的集群环境,可以考虑使用Vagrant或Docker在Windows上创建多个虚拟机,每个虚拟机...

    修改hadoop中的io写的,远程调用对象的东西。

    在标题中提到的“修改Hadoop中的io写的,远程调用对象的东西”可能指的是对Hadoop的分布式文件系统(HDFS)进行定制化开发,以便更高效地处理数据或实现特定功能。下面我们将深入探讨这个主题。 1. **Hadoop I/O...

    cygwin+eclipse搭建hadoop开发环境,运行wordcount

    你需要在Eclipse中安装这些插件,然后设置Hadoop的本地或远程运行配置。这涉及到配置HADOOP_HOME环境变量,以及指定Hadoop的namenode和datanode的位置。 "Hadoop环境搭建及wordcount实例运行"文档将带你了解Hadoop...

    Eclipse开发Hadoop相关项目本地配置插件

    5. **连接Hadoop集群**:在Eclipse中配置Hadoop连接,可以让你在本地IDE中调试运行在远程集群上的作业。通过"Big Data Tools",可以在"Window" -&gt; "Preferences" -&gt; "Big Data" -&gt; "Hadoop Cluster"中添加和管理...

    大学hadoop大数据开发基础的环境搭建

    大学 Hadoop 大数据开发基础环境搭建 本文主要介绍了大学 Hadoop 大数据开发基础环境搭建的步骤,包括安装 VMware、CentOS、Hadoop 等相关组件,并配置 IP 设置和 Xshell 连接。 一、 VMware 安装与 LINUX 虚拟机...

    Hadoop集群搭建部署与MapReduce程序关键点个性化开发.doc

    本文将详细阐述如何搭建Hadoop集群以及进行MapReduce程序的关键点个性化开发。 首先,我们来看任务1——Hadoop集群的部署。这一步至关重要,因为它为整个大数据处理系统提供了基础架构。在虚拟机中安装Ubuntu Kylin...

    Windows下Eclispe远程开发Mapreduce程序

    总的来说,Windows下使用Eclipse远程开发MapReduce程序涉及到了Hadoop环境的配置、Eclipse的Hadoop插件、Maven的安装和配置等多个环节,每个步骤都需要细心操作以确保环境的正常运行。这是一个相对复杂的流程,但是...

    hadoop2.6(x64)Win7上远程调试hadoop 集群

    - 在开发环境中创建一个Hadoop项目,编写并编译源代码。 - 将编译后的JAR文件上传到HDFS,然后使用`hadoop jar`命令提交作业。 - 使用IDE(如Eclipse或IntelliJ IDEA)的远程调试功能,设置与`HADOOP_OPTS`中指定...

    eclipse开发hadoop2.5.2所用到都jar

    同时,为了测试和运行程序,你还需要配置本地或远程的Hadoop集群环境。 总结来说,"eclipse开发hadoop2.5.2所用到的jar"指的是在Eclipse中开发Hadoop应用时,需要导入的一系列JAR文件,包括Hadoop的核心组件、依赖...

Global site tag (gtag.js) - Google Analytics