`

Hadoop HDFS API编程

 
阅读更多
HDFS API编程

修改hadoop-env.sh
export HADOOP_CLASSPATH=/home/anker/hadoop-1.1.2/myclass

设置环境变量
修改.bash_profile,当用户一登陆,就会执行此文件
PATH=$PATH:$HOME/bin:/usr/jdk1.7.0_51/bin
JAVA_HOME=/usr/jdk1.7.0_51/

export JAVA_HOME
export PATH

设置好后,可以通过env来检查变量。

//编译时要指定classpath javac -classpath ../hadoop-core-1.1.2.jar URLCat.java

//若是编译的类在jar包中,需要指定具体的jar包。
//保险的办法是加载$HADOOP_HOME目录以及lib子目录下所有的jar包

//执行 ../bin/hadoop URLCat hdfs://anker.centos1:9000/user/anker/in/test2.txt

//1.通过URL来读取HDFS 中的文件内容
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;
import java.io.InputStream;
import java.net.URL;

//从Hadoop URL中读取数据
public class  URLCat
{
	static{//JVM只能调用一次此方法,所以选择此静态方法。这个限制意味着玉如其他程序的其他组件,已经声明了一个URLStreamHandlerFactory,将无法再使用上述方法从Hadoop中读取数据。
		URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());//通过FsurlStreamHandlerFactory示例调用setURLStreamHandlerFactory实现java能够实现Hadoop的hdfs url
	}
	public static void main(String[] args) 
	{
		InputStream in = null;
		try{
			in = new URL(args[0]).openStream();
			IOUtils.copyBytes(in,System.out,4096,false);//IOUtils类可以实现输入流和输出流(system.out)之间复制数据,4096为缓存区大小,false为复制结束后是否关闭数据流
		}finally{
			IOUtils.closeStream(in);
		}
	}
}


//2. ant程序示例

可以将此放入到.bash_profile文件中
export HADOOP_HOME=/home/anker/hadoop-1.1.2

/* This sample demostrate use of HDFS Java API
 * This sample is loosely based on the 
 * http://wiki.apache.org/hadoop/HadoopDfsReadWriteExample
*/
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class HDFSJavaAPIDemo {

        public static void main(String[] args) throws IOException {
                Configuration conf = new Configuration(); //Provides access to configuration parameters. A New Configuration
                conf.addResource(new Path(
                                "/u/hadoop-1.0.2/conf/core-site.xml"));//Add a configuration resource
                conf.addResource(new Path(
                                "/u/hadoop-1.0.2/conf/hdfs-site.xml"));//Add a configuration resource Add a configuration resourcefile -
									//file-path of resource to be added, the local filesystem is examined directly to find the resource, 
									//without referring to the classpath

                FileSystem fileSystem = FileSystem.get(conf);//Returns the configured filesystem implementation. 
                System.out.println(fileSystem.getUri());//Returns a URI whose scheme and authority identify this FileSystem

                Path file = new Path("demo.txt");
                if (fileSystem.exists(file)) {
                        System.out.println("File exists.");
                } else {
                        // Writing to file
                        FSDataOutputStream outStream = fileSystem.create(file);//Utility that wraps a OutputStream in a DataOutputStream, buffers output through a BufferedOutputStream and creates a checksum file.
                        outStream.writeUTF("Welcome to HDFS Java API!!!");
                        outStream.close();
                }

                // Reading from file
                FSDataInputStream inStream = fileSystem.open(file);
                String data = inStream.readUTF();
                System.out.println(data);
                inStream.close();

                // deleting the file. Non-recursively.
                // fileSystem.delete(file, false);
                fileSystem.close();
        }
}




如何执行该程序
bin/hadoop jar HDFSJavaAPI.jar HDFSJavaAPIDemo


Hadoop文件系统中通过Hadoop Path对象来代表文件,可以将一条路径视为一个文件。如hdfs://localhost/user/tom/quangle.txt
FileSystem是一个通用的文件系统API,所以第一步是检索我们需要使用的文件系统实例。获取FileSystem实例有几种静态工厂方法:
public static FileSystem get(Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf, String user)
throws IOException
Configuration对象封装了客户端或服务器的配置,通过设置配置文件读取类路径(conf/core-site.xml)。第一个方法返回的是
默认文件系统(在conf/core-site.xml中指定。若没有指定,则使用默认的本地文件系统)
第二个通过给定的URI方案和权限来确定要使用的文件系统,如果给定的URI没有指定方案,则返回默认方案。 
第三种是通过一个指定的用户获取filesystem

在某些例子中,如果想获取一个本地的filesystem实例,可以使用以下方法:
public static LocalFileSystem getLocal(Configuration conf) throws IOException

当获取到Filesystem的实例后,我们调用open()函数来获取文件的输入流。
public FSDataInputStream open(Path f) throws IOException
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException

第一个方法默认的buffer size 为4k

//3.FileSystem API读取数据
import org.apache.hadoop.conf.Configuration;
import java.io.InputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.net.URI;
public class FileSystemCat {
public static void main(String[] args) throws Exception {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);//Returns the FileSystem for this URI's scheme and authority.
InputStream in = null;
try {
in = fs.open(new Path(uri));//Opens an FSDataInputStream at the indicated Path
IOUtils.copyBytes(in, System.out, 4096, false);
} finally {
IOUtils.closeStream(in);
}
}
}
//实际上,FileSystem对象中的open()方法返回的是FSdataINputStream对象,这个类继承了java.io.DataInputStream接口的一个特殊类,
并支持随机访问,因此可以从流的任意位置读取数据。
Seekable接口支持在文件中找到指定位置,并提供一个查询当前位置相对于文件起始位置便宜量(getpos())的方法。
调用seek()来定位大于文件长度的位置将导致IOException异常。

package org.apache.hadoop.fs;
public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable,Closeable {
// implementation elided
}

FSDataInputStream类也实现了PositionedReable接口,从一个指定偏移量处读取文件一部分:


public interface PositionedReadable {
public int read(long position, byte[] buffer, int offset, int length)
throws IOException;
public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException;
public void readFully(long position, byte[] buffer) throws IOException;
}



//4.FSDataInputStream API读取数据

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
public class  FileSystemDoubleCat
{
	public static void main(String[] args) 
	{
		String uri = args[0];
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(uri),conf);
		FSDataInputStream in = null;
		try{
			in = fs.open(new Path(uri));
			IOUtils.copyBytes(in,System.out,4096,false);
			in.seek(0);//go back to the start of the file
			IOUtils.copyBytes(in,System.out,4096,false);	
		}finally{
			IOUtils.closeStream(in);
		}
	}
};




//5.写入数据

FileSystem类有一系列创建文件的方法。最简单的方法时给准备创建的文件制定一个path对象,然后返回一个
用户写入数据的数据流。
public FSDataOutputStream create(Path f) throws IOException

上述方法有多个重载对象,允许我们制定是否需要强制覆盖已有的文件,文件备份数量,写入文件时所用的缓冲区
大小,文件块大小及文件权限。

还有一个重载方法Progressable,用于传递回调接口,如此一来,可以把数据写入数据节点的进度通知到应用:

package org.apache.hadoop.util;
public interface Progressable {
public void progress();
}

另一种新建文件的方法,是使用append()方法在一个已有文件末尾追加数据
public FSDataOutputStream append(Path f) throws IOException

该追加操作允许一个writer打开文件后再访问该文件的最后偏移量追加数据。
每次Hadoop调用progress方法时,也就是每次将64kb数据包写入datanode管线后--
打印一个时间点来显示整个过程。


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import java.net.URI;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.io.InputStream;
import java.io.FileInputStream;
import java.io.BufferedInputStream;
import java.io.OutputStream;
import org.apache.hadoop.util.Progressable

public class FileCopyWithProgress {
public static void main(String[] args) throws Exception {
String localSrc = args[0];
String dst = args[1];
InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(dst), conf);
OutputStream out = fs.create(new Path(dst), new Progressable() { //Create an FSDataOutputStream at the indicated Path with write-progress reporting.
public void progress() {
System.out.print(".");
}
});
IOUtils.copyBytes(in, out, 4096, true);//Copies from one stream to another. true means close the inputstream and outputstream at then end.
}
}


FSDataoutputStream对象

FileSystem 实例的create()方法返回FSDataOutputStream对象,与FSDataInputStream类相似,
它也有一个查询文件当前位置的方法:
package org.apache.hadoop.fs;
public class FSDataOutputStream extends DataOutputStream implements Syncable {
public long getPos() throws IOException {
	// implementation elided
}
// implementation elided
}

但与FSDataInputStream类不同的是,FSDataOutputStream类不允许在文件中定位。这是因为HDFS只允许对一个
已打开的文件顺序写入,或在现有文件的末尾追加数据。换句话说,它不支持在除文件末尾之外的其他位置进行写入,
因此写入时定位就没有任何意义。



目录
FileSystem实例提供了创建目录的方法
public boolean mkdirs(Path f) throws IOException

这个方法可以一次性创建所有必要还没有的父目录,就像java.io.file类的mkdirs()方法。如果目录都已创建成功,
则返回true。

通常,你不需要显式创建一个目录,因为调用create()写入文件时会自动创建父目录。

FileSystem的getFileStatus()方法用于获取文件或目录的FileStatus对象。


public class ShowFileStatusTest {
private MiniDFSCluster cluster; // use an in-process HDFS cluster for testing
private FileSystem fs;
@Before
public void setUp() throws IOException {
Configuration conf = new Configuration();
if (System.getProperty("test.build.data") == null) {
System.setProperty("test.build.data", "/tmp");
}
cluster = new MiniDFSCluster(conf, 1, true, null);
fs = cluster.getFileSystem();

OutputStream out = fs.create(new Path("/dir/file"));
out.write("content".getBytes("UTF-8"));
out.close();
}
@After
public void tearDown() throws IOException {
if (fs != null) { fs.close(); }
if (cluster != null) { cluster.shutdown(); }
}
@Test(expected = FileNotFoundException.class)
public void throwsFileNotFoundForNonExistentFile() throws IOException {
fs.getFileStatus(new Path("no-such-file"));
}
@Test
public void fileStatusForFile() throws IOException {
Path file = new Path("/dir/file");
FileStatus stat = fs.getFileStatus(file);
assertThat(stat.getPath().toUri().getPath(), is("/dir/file"));
assertThat(stat.isDir(), is(false));
assertThat(stat.getLen(), is(7L));
assertThat(stat.getModificationTime(),
is(lessThanOrEqualTo(System.currentTimeMillis())));
assertThat(stat.getReplication(), is((short) 1));
assertThat(stat.getBlockSize(), is(64 * 1024 * 1024L));
assertThat(stat.getOwner(), is("tom"));
assertThat(stat.getGroup(), is("supergroup"));
assertThat(stat.getPermission().toString(), is("rw-r--r--"));
}
@Test
public void fileStatusForDirectory() throws IOException {
Path dir = new Path("/dir");
FileStatus stat = fs.getFileStatus(dir);
assertThat(stat.getPath().toUri().getPath(), is("/dir"));
assertThat(stat.isDir(), is(true));
assertThat(stat.getLen(), is(0L));
assertThat(stat.getModificationTime(),
is(lessThanOrEqualTo(System.currentTimeMillis())));
assertThat(stat.getReplication(), is((short) 0));
assertThat(stat.getBlockSize(), is(0L));
assertThat(stat.getOwner(), is("tom"));
assertThat(stat.getGroup(), is("supergroup"));
assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));
}
}

列出文件

列出一个文件或者目录的信息,通过FileSystem的listStatus()方法。
public FileStatus[] listStatus(Path f) throws IOException
public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException
public FileStatus[] listStatus(Path[] files) throws IOException
public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException

当传入的参数是一个文件时,它会简单转变为以数组形式长度为1的FileStatus对象。当传入的参数
是一个目录时,则返回0或多个FileStatus对象,表示此目录包含的文件和目录。

一个重载的方法时允许使用PathFilter来限制匹配的文件和目录。如果指定一组路径,其执行结果相当
与依次轮流传递每条路径并调用listStatus()方法,再将FileStatus对象数组累积存入同一数组中。
但该方法更为方便,这从文件系统树的不同分支构建输入文件列表时,这是很有用的。

注意FileUtil中stat2Paths()方法的使用,它将一个FileStatus对象数组转换为Path对象数组。

% hadoop ListStatus hdfs://localhost/ hdfs://localhost/user/tom

public class ListStatus {
public static void main(String[] args) throws Exception {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path[] paths = new Path[args.length];
for (int i = 0; i < paths.length; i++) {
paths[i] = new Path(args[i]);
}
FileStatus[] status = fs.listStatus(paths);
Path[] listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths) {
System.out.println(p);
}
}
}


文件模式,在单个操作中处理一批文件,是一个常见要求。在一个表达式中使用通配符来匹配多个文件
是比较方便的,无需列举每个文件和目录来指定输入,该操作称为统配(globbing),Hadoop为执行统配提供了
两个FileSystem方法:
public FileStatus[] globStatus(Path pathPattern) throws IOException
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws
IOException

globStatus()方法返回与路径匹配的所有文件的FileStatus对象数组,并按照路径排序。PathFileter命令
可以作为可选项进一步对匹配进行限制。
Hadoop支持的统配符与Unix bash相同。

通配符及含义
*		匹配0或多个字符
?		匹配单一字符
[ab]	匹配{a,b}集合中的一个字符
[ ab]	匹配非{a,b}集合中的一个字符
[a-b]	匹配一个在{a,b}范围内的字符(包括ab),a在字母顺序要小于或等于b
[ a-b]	匹配一个不在{a,b}范围内的字符(包括ab),a在字母顺序要小于或等于b
{a,b}	匹配包含a或b中的一个表达式
\c		转义字符

PathFilter对象

统配符并不总能精确地描述我们想要访问的文件集。FileSystem中的listStatus()和globStatus()
提供了可选的PathFilter对象,使我们可以通过编程方式控制通配符:
public interface PathFilter
{
	boolean accpet(Path path);//Tests whether or not the specified abstract pathname should be included in a pathname list.
}
public class RegexExcludePathFilter implements PathFilter {
private final String regex;
public RegexExcludePathFilter(String regex) {
this.regex = regex;
}
public boolean accept(Path path) {
return !path.toString().matches(regex);
}
}

删除数据
使用FileSystem的delete()方法可以永久性删除文件或目录
public boolean delete(Path f,boolean recursive) throws IOException
如果f是一个文件或空目录,那么recursive值就会被忽略。只有在recrusive值为true时,一个非空目录
及其内容才会被删除。(否则会抛出IOException异常。)

 

分享到:
评论

相关推荐

    java 从hadoop hdfs读取文件 进行groupby并显示为条形图

    在Java编程环境中,结合Hadoop HDFS(Hadoop Distributed File System)进行大数据处理是一种常见的实践。本项目聚焦于从HDFS读取数据,执行GROUP BY操作,并将统计结果以条形图的形式展示出来,提供了直观的数据...

    python 操作 Hadoop hdfs

    而Python作为一种灵活易用的编程语言,常常被用来与Hadoop HDFS进行交互,实现数据的读取、写入和下载操作。本篇文章将深入探讨如何使用Python来操作Hadoop HDFS。 首先,我们需要一个Python库,如`hdfs`或`pyarrow...

    实验二、HDFS shell操作及HDFS Java API编程

    适合刚接触hadoop的学生或小白,内容包括HDFS shell操作及HDFS Java API编程 有图片有代码

    elcipse java hadoop操作hdfs的api

    在Hadoop生态系统中,Java API是开发者与HDFS(Hadoop Distributed File System)进行交互的主要方式。本资料主要涵盖了如何使用Eclipse环境进行Java开发,利用Hadoop的HDFS API来操作分布式文件系统。以下是对这些...

    基于Java的Hadoop HDFS和MapReduce实践案例设计源码

    内容涵盖HDFS的JAVA API操作,如文件读取、写入、删除、元数据查询和文件列表等,以及MapReduce编程模型的多个应用,包括求平均数、Join操作、TopK算法、二次排序,并涉及自定义InputFormat、OutputFormat和shuflle...

    教你如何查看API及使用hadoop新api编程 高清完整版PDF下载

    ### Hadoop新旧API对比及应用 #### 一、引言 随着Hadoop生态系统的不断发展和完善,其核心组件之一——MapReduce也在不断演进。为了更好地支持分布式计算的需求,Hadoop引入了新的API(Application Programming ...

    java操作Hadoop源码之HDFS Java API操作-上传文件

    在Java编程环境中,Hadoop分布式文件系统(HDFS)提供了丰富的Java API,使得开发者能够方便地与HDFS进行交互,包括文件的上传、下载、读写等操作。本篇文章将详细探讨如何使用HDFS Java API来实现文件上传的功能。 ...

    java操作Hadoop源码之HDFS Java API操作-创建目录

    我们将探讨Hadoop的环境配置、HDFS API的使用以及具体创建目录的步骤。 首先,理解Hadoop的环境配置至关重要。在进行Java编程之前,你需要确保已经在本地或集群上正确安装了Hadoop,并配置好了`hadoop-env.sh`和`...

    Hadoop HDFS应用

    综上所述,Hadoop HDFS应用的知识点涵盖了HDFS的分布式文件系统概念、支持的多种文件系统类型及URI模式、权限及用户体系、命令行操作接口以及JAVA编程接口等。掌握这些知识点对于实现高效的大数据处理和分析具有重要...

    hadoop2API帮助文档

    Hadoop2 API帮助文档是针对Hadoop 2.x版本的核心组件提供的一份详细参考资料,旨在帮助开发者高效地利用Hadoop框架进行大数据处理和分析。Hadoop是一个开源的分布式计算框架,由Apache软件基金会维护,其核心包括...

    大数据技术基础实验报告-调用Java API实现HDFS操作.doc

    总的来说,本实验旨在使学习者熟悉Hadoop环境下的Java编程,理解如何调用HDFS API进行文件操作,这是一项重要的技能,因为在大数据处理中,HDFS是数据存储的核心组件。通过这样的实践,学生将能够更好地理解和应用大...

    hadoop-api中文说明文档

    Hadoop API中文说明文档是针对Apache Hadoop框架的开发者指南,它详细解释了如何使用Hadoop的编程接口来处理大规模数据。Hadoop是开源的分布式计算框架,它允许在廉价硬件集群上存储和处理海量数据。这个文档对于...

    HDFS文件系统基本文件命令、编程读写HDFS

    HDFS 文件系统基本文件命令、编程读写 HDFS HDFS(Hadoop Distributed File System)是一种分布式文件系统,用于存储和管理大规模数据。它是 Hadoop 云计算平台的核心组件之一,提供了高效、可靠、可扩展的数据存储...

    大数据实验二-HDFS编程实践

    3. **熟悉Java API**:除了Shell命令外,Hadoop还提供了丰富的Java API来操作HDFS。熟悉这些API有助于开发者在实际项目中更加灵活地使用HDFS。 #### 实验过程 ##### Shell命令实践 - **创建与查看文件**:使用`...

    hadoop-API.zip_Hadoop学习_hadoop api

    Hadoop中文版API.chm文档将提供详细的函数解释和示例,帮助开发者快速掌握Hadoop的编程模型。 此外,Hadoop生态还包括其他重要组件,如Hive(数据仓库工具)、Pig(数据处理语言)、Spark(快速数据处理引擎)等,...

    hadoop之hdfs中所依赖jar

    这些JAR包包含了Hadoop的相关API和实现,使得用户可以通过编程方式操作HDFS。 标题"hadop之hdfs中所依赖jar"指出的关键点是,为了成功地进行HDFS操作,必须确保正确地引入了必要的JAR包。这里提到的"Hadoop之上传...

    Hadoop-HDFS-实践教程

    Hadoop是一个开源的分布式计算框架,它允许用户通过简单易用的编程模型处理大型数据集,而HDFS(Hadoop Distributed File System)是其核心组件,用于存储和处理大数据。 首先,Hadoop是一个由Apache软件基金会开发...

    大数据课程的期末项目,基于spark、hadoop hdfs、mongodb,使用scala,进行电影推荐.zip

    该项目是一个大数据课程的期末项目,主要利用了Spark、Hadoop HDFS和MongoDB等技术,通过Scala编程语言来实现电影推荐系统。这个系统是基于大数据处理的,因此涉及到的知识点非常广泛,涵盖了分布式计算、数据存储、...

    02--HDFS Java API操作.docx

    HDFS Java API 是 Hadoop 中的一部分,提供了一个 Java 编程接口来访问和操作 HDFS 中的文件和目录。该 API 提供了多种方法来操作文件和目录,包括创建、删除、读取和写入文件,列出目录中的文件和子目录等。 二、...

    hadoop2.6-api.zip

    1. **HDFS API**:包括`org.apache.hadoop.hdfs`包,提供了对HDFS的读写操作。例如,`DFSClient`类是与HDFS通信的主要客户端,`DFSInputStream`和`DFSOutputStream`分别用于文件的读写。此外,`FileSystem`接口提供...

Global site tag (gtag.js) - Google Analytics