HDFS API编程 修改hadoop-env.sh export HADOOP_CLASSPATH=/home/anker/hadoop-1.1.2/myclass 设置环境变量 修改.bash_profile,当用户一登陆,就会执行此文件 PATH=$PATH:$HOME/bin:/usr/jdk1.7.0_51/bin JAVA_HOME=/usr/jdk1.7.0_51/ export JAVA_HOME export PATH 设置好后,可以通过env来检查变量。 //编译时要指定classpath javac -classpath ../hadoop-core-1.1.2.jar URLCat.java //若是编译的类在jar包中,需要指定具体的jar包。 //保险的办法是加载$HADOOP_HOME目录以及lib子目录下所有的jar包 //执行 ../bin/hadoop URLCat hdfs://anker.centos1:9000/user/anker/in/test2.txt //1.通过URL来读取HDFS 中的文件内容 import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; import org.apache.hadoop.io.IOUtils; import java.io.InputStream; import java.net.URL; //从Hadoop URL中读取数据 public class URLCat { static{//JVM只能调用一次此方法,所以选择此静态方法。这个限制意味着玉如其他程序的其他组件,已经声明了一个URLStreamHandlerFactory,将无法再使用上述方法从Hadoop中读取数据。 URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());//通过FsurlStreamHandlerFactory示例调用setURLStreamHandlerFactory实现java能够实现Hadoop的hdfs url } public static void main(String[] args) { InputStream in = null; try{ in = new URL(args[0]).openStream(); IOUtils.copyBytes(in,System.out,4096,false);//IOUtils类可以实现输入流和输出流(system.out)之间复制数据,4096为缓存区大小,false为复制结束后是否关闭数据流 }finally{ IOUtils.closeStream(in); } } } //2. ant程序示例 可以将此放入到.bash_profile文件中 export HADOOP_HOME=/home/anker/hadoop-1.1.2 /* This sample demostrate use of HDFS Java API * This sample is loosely based on the * http://wiki.apache.org/hadoop/HadoopDfsReadWriteExample */ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class HDFSJavaAPIDemo { public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); //Provides access to configuration parameters. A New Configuration conf.addResource(new Path( "/u/hadoop-1.0.2/conf/core-site.xml"));//Add a configuration resource conf.addResource(new Path( "/u/hadoop-1.0.2/conf/hdfs-site.xml"));//Add a configuration resource Add a configuration resourcefile - //file-path of resource to be added, the local filesystem is examined directly to find the resource, //without referring to the classpath FileSystem fileSystem = FileSystem.get(conf);//Returns the configured filesystem implementation. System.out.println(fileSystem.getUri());//Returns a URI whose scheme and authority identify this FileSystem Path file = new Path("demo.txt"); if (fileSystem.exists(file)) { System.out.println("File exists."); } else { // Writing to file FSDataOutputStream outStream = fileSystem.create(file);//Utility that wraps a OutputStream in a DataOutputStream, buffers output through a BufferedOutputStream and creates a checksum file. outStream.writeUTF("Welcome to HDFS Java API!!!"); outStream.close(); } // Reading from file FSDataInputStream inStream = fileSystem.open(file); String data = inStream.readUTF(); System.out.println(data); inStream.close(); // deleting the file. Non-recursively. // fileSystem.delete(file, false); fileSystem.close(); } } 如何执行该程序 bin/hadoop jar HDFSJavaAPI.jar HDFSJavaAPIDemo Hadoop文件系统中通过Hadoop Path对象来代表文件,可以将一条路径视为一个文件。如hdfs://localhost/user/tom/quangle.txt FileSystem是一个通用的文件系统API,所以第一步是检索我们需要使用的文件系统实例。获取FileSystem实例有几种静态工厂方法: public static FileSystem get(Configuration conf) throws IOException public static FileSystem get(URI uri, Configuration conf) throws IOException public static FileSystem get(URI uri, Configuration conf, String user) throws IOException Configuration对象封装了客户端或服务器的配置,通过设置配置文件读取类路径(conf/core-site.xml)。第一个方法返回的是 默认文件系统(在conf/core-site.xml中指定。若没有指定,则使用默认的本地文件系统) 第二个通过给定的URI方案和权限来确定要使用的文件系统,如果给定的URI没有指定方案,则返回默认方案。 第三种是通过一个指定的用户获取filesystem 在某些例子中,如果想获取一个本地的filesystem实例,可以使用以下方法: public static LocalFileSystem getLocal(Configuration conf) throws IOException 当获取到Filesystem的实例后,我们调用open()函数来获取文件的输入流。 public FSDataInputStream open(Path f) throws IOException public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException 第一个方法默认的buffer size 为4k //3.FileSystem API读取数据 import org.apache.hadoop.conf.Configuration; import java.io.InputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import java.net.URI; public class FileSystemCat { public static void main(String[] args) throws Exception { String uri = args[0]; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf);//Returns the FileSystem for this URI's scheme and authority. InputStream in = null; try { in = fs.open(new Path(uri));//Opens an FSDataInputStream at the indicated Path IOUtils.copyBytes(in, System.out, 4096, false); } finally { IOUtils.closeStream(in); } } } //实际上,FileSystem对象中的open()方法返回的是FSdataINputStream对象,这个类继承了java.io.DataInputStream接口的一个特殊类, 并支持随机访问,因此可以从流的任意位置读取数据。 Seekable接口支持在文件中找到指定位置,并提供一个查询当前位置相对于文件起始位置便宜量(getpos())的方法。 调用seek()来定位大于文件长度的位置将导致IOException异常。 package org.apache.hadoop.fs; public class FSDataInputStream extends DataInputStream implements Seekable, PositionedReadable,Closeable { // implementation elided } FSDataInputStream类也实现了PositionedReable接口,从一个指定偏移量处读取文件一部分: public interface PositionedReadable { public int read(long position, byte[] buffer, int offset, int length) throws IOException; public void readFully(long position, byte[] buffer, int offset, int length) throws IOException; public void readFully(long position, byte[] buffer) throws IOException; } //4.FSDataInputStream API读取数据 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; public class FileSystemDoubleCat { public static void main(String[] args) { String uri = args[0]; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri),conf); FSDataInputStream in = null; try{ in = fs.open(new Path(uri)); IOUtils.copyBytes(in,System.out,4096,false); in.seek(0);//go back to the start of the file IOUtils.copyBytes(in,System.out,4096,false); }finally{ IOUtils.closeStream(in); } } }; //5.写入数据 FileSystem类有一系列创建文件的方法。最简单的方法时给准备创建的文件制定一个path对象,然后返回一个 用户写入数据的数据流。 public FSDataOutputStream create(Path f) throws IOException 上述方法有多个重载对象,允许我们制定是否需要强制覆盖已有的文件,文件备份数量,写入文件时所用的缓冲区 大小,文件块大小及文件权限。 还有一个重载方法Progressable,用于传递回调接口,如此一来,可以把数据写入数据节点的进度通知到应用: package org.apache.hadoop.util; public interface Progressable { public void progress(); } 另一种新建文件的方法,是使用append()方法在一个已有文件末尾追加数据 public FSDataOutputStream append(Path f) throws IOException 该追加操作允许一个writer打开文件后再访问该文件的最后偏移量追加数据。 每次Hadoop调用progress方法时,也就是每次将64kb数据包写入datanode管线后-- 打印一个时间点来显示整个过程。 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import java.net.URI; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import java.io.InputStream; import java.io.FileInputStream; import java.io.BufferedInputStream; import java.io.OutputStream; import org.apache.hadoop.util.Progressable public class FileCopyWithProgress { public static void main(String[] args) throws Exception { String localSrc = args[0]; String dst = args[1]; InputStream in = new BufferedInputStream(new FileInputStream(localSrc)); Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(dst), conf); OutputStream out = fs.create(new Path(dst), new Progressable() { //Create an FSDataOutputStream at the indicated Path with write-progress reporting. public void progress() { System.out.print("."); } }); IOUtils.copyBytes(in, out, 4096, true);//Copies from one stream to another. true means close the inputstream and outputstream at then end. } } FSDataoutputStream对象 FileSystem 实例的create()方法返回FSDataOutputStream对象,与FSDataInputStream类相似, 它也有一个查询文件当前位置的方法: package org.apache.hadoop.fs; public class FSDataOutputStream extends DataOutputStream implements Syncable { public long getPos() throws IOException { // implementation elided } // implementation elided } 但与FSDataInputStream类不同的是,FSDataOutputStream类不允许在文件中定位。这是因为HDFS只允许对一个 已打开的文件顺序写入,或在现有文件的末尾追加数据。换句话说,它不支持在除文件末尾之外的其他位置进行写入, 因此写入时定位就没有任何意义。 目录 FileSystem实例提供了创建目录的方法 public boolean mkdirs(Path f) throws IOException 这个方法可以一次性创建所有必要还没有的父目录,就像java.io.file类的mkdirs()方法。如果目录都已创建成功, 则返回true。 通常,你不需要显式创建一个目录,因为调用create()写入文件时会自动创建父目录。 FileSystem的getFileStatus()方法用于获取文件或目录的FileStatus对象。 public class ShowFileStatusTest { private MiniDFSCluster cluster; // use an in-process HDFS cluster for testing private FileSystem fs; @Before public void setUp() throws IOException { Configuration conf = new Configuration(); if (System.getProperty("test.build.data") == null) { System.setProperty("test.build.data", "/tmp"); } cluster = new MiniDFSCluster(conf, 1, true, null); fs = cluster.getFileSystem(); OutputStream out = fs.create(new Path("/dir/file")); out.write("content".getBytes("UTF-8")); out.close(); } @After public void tearDown() throws IOException { if (fs != null) { fs.close(); } if (cluster != null) { cluster.shutdown(); } } @Test(expected = FileNotFoundException.class) public void throwsFileNotFoundForNonExistentFile() throws IOException { fs.getFileStatus(new Path("no-such-file")); } @Test public void fileStatusForFile() throws IOException { Path file = new Path("/dir/file"); FileStatus stat = fs.getFileStatus(file); assertThat(stat.getPath().toUri().getPath(), is("/dir/file")); assertThat(stat.isDir(), is(false)); assertThat(stat.getLen(), is(7L)); assertThat(stat.getModificationTime(), is(lessThanOrEqualTo(System.currentTimeMillis()))); assertThat(stat.getReplication(), is((short) 1)); assertThat(stat.getBlockSize(), is(64 * 1024 * 1024L)); assertThat(stat.getOwner(), is("tom")); assertThat(stat.getGroup(), is("supergroup")); assertThat(stat.getPermission().toString(), is("rw-r--r--")); } @Test public void fileStatusForDirectory() throws IOException { Path dir = new Path("/dir"); FileStatus stat = fs.getFileStatus(dir); assertThat(stat.getPath().toUri().getPath(), is("/dir")); assertThat(stat.isDir(), is(true)); assertThat(stat.getLen(), is(0L)); assertThat(stat.getModificationTime(), is(lessThanOrEqualTo(System.currentTimeMillis()))); assertThat(stat.getReplication(), is((short) 0)); assertThat(stat.getBlockSize(), is(0L)); assertThat(stat.getOwner(), is("tom")); assertThat(stat.getGroup(), is("supergroup")); assertThat(stat.getPermission().toString(), is("rwxr-xr-x")); } } 列出文件 列出一个文件或者目录的信息,通过FileSystem的listStatus()方法。 public FileStatus[] listStatus(Path f) throws IOException public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException public FileStatus[] listStatus(Path[] files) throws IOException public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException 当传入的参数是一个文件时,它会简单转变为以数组形式长度为1的FileStatus对象。当传入的参数 是一个目录时,则返回0或多个FileStatus对象,表示此目录包含的文件和目录。 一个重载的方法时允许使用PathFilter来限制匹配的文件和目录。如果指定一组路径,其执行结果相当 与依次轮流传递每条路径并调用listStatus()方法,再将FileStatus对象数组累积存入同一数组中。 但该方法更为方便,这从文件系统树的不同分支构建输入文件列表时,这是很有用的。 注意FileUtil中stat2Paths()方法的使用,它将一个FileStatus对象数组转换为Path对象数组。 % hadoop ListStatus hdfs://localhost/ hdfs://localhost/user/tom public class ListStatus { public static void main(String[] args) throws Exception { String uri = args[0]; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); Path[] paths = new Path[args.length]; for (int i = 0; i < paths.length; i++) { paths[i] = new Path(args[i]); } FileStatus[] status = fs.listStatus(paths); Path[] listedPaths = FileUtil.stat2Paths(status); for (Path p : listedPaths) { System.out.println(p); } } } 文件模式,在单个操作中处理一批文件,是一个常见要求。在一个表达式中使用通配符来匹配多个文件 是比较方便的,无需列举每个文件和目录来指定输入,该操作称为统配(globbing),Hadoop为执行统配提供了 两个FileSystem方法: public FileStatus[] globStatus(Path pathPattern) throws IOException public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException globStatus()方法返回与路径匹配的所有文件的FileStatus对象数组,并按照路径排序。PathFileter命令 可以作为可选项进一步对匹配进行限制。 Hadoop支持的统配符与Unix bash相同。 通配符及含义 * 匹配0或多个字符 ? 匹配单一字符 [ab] 匹配{a,b}集合中的一个字符 [ ab] 匹配非{a,b}集合中的一个字符 [a-b] 匹配一个在{a,b}范围内的字符(包括ab),a在字母顺序要小于或等于b [ a-b] 匹配一个不在{a,b}范围内的字符(包括ab),a在字母顺序要小于或等于b {a,b} 匹配包含a或b中的一个表达式 \c 转义字符 PathFilter对象 统配符并不总能精确地描述我们想要访问的文件集。FileSystem中的listStatus()和globStatus() 提供了可选的PathFilter对象,使我们可以通过编程方式控制通配符: public interface PathFilter { boolean accpet(Path path);//Tests whether or not the specified abstract pathname should be included in a pathname list. } public class RegexExcludePathFilter implements PathFilter { private final String regex; public RegexExcludePathFilter(String regex) { this.regex = regex; } public boolean accept(Path path) { return !path.toString().matches(regex); } } 删除数据 使用FileSystem的delete()方法可以永久性删除文件或目录 public boolean delete(Path f,boolean recursive) throws IOException 如果f是一个文件或空目录,那么recursive值就会被忽略。只有在recrusive值为true时,一个非空目录 及其内容才会被删除。(否则会抛出IOException异常。)
相关推荐
在Java编程环境中,结合Hadoop HDFS(Hadoop Distributed File System)进行大数据处理是一种常见的实践。本项目聚焦于从HDFS读取数据,执行GROUP BY操作,并将统计结果以条形图的形式展示出来,提供了直观的数据...
而Python作为一种灵活易用的编程语言,常常被用来与Hadoop HDFS进行交互,实现数据的读取、写入和下载操作。本篇文章将深入探讨如何使用Python来操作Hadoop HDFS。 首先,我们需要一个Python库,如`hdfs`或`pyarrow...
适合刚接触hadoop的学生或小白,内容包括HDFS shell操作及HDFS Java API编程 有图片有代码
在Hadoop生态系统中,Java API是开发者与HDFS(Hadoop Distributed File System)进行交互的主要方式。本资料主要涵盖了如何使用Eclipse环境进行Java开发,利用Hadoop的HDFS API来操作分布式文件系统。以下是对这些...
内容涵盖HDFS的JAVA API操作,如文件读取、写入、删除、元数据查询和文件列表等,以及MapReduce编程模型的多个应用,包括求平均数、Join操作、TopK算法、二次排序,并涉及自定义InputFormat、OutputFormat和shuflle...
### Hadoop新旧API对比及应用 #### 一、引言 随着Hadoop生态系统的不断发展和完善,其核心组件之一——MapReduce也在不断演进。为了更好地支持分布式计算的需求,Hadoop引入了新的API(Application Programming ...
在Java编程环境中,Hadoop分布式文件系统(HDFS)提供了丰富的Java API,使得开发者能够方便地与HDFS进行交互,包括文件的上传、下载、读写等操作。本篇文章将详细探讨如何使用HDFS Java API来实现文件上传的功能。 ...
我们将探讨Hadoop的环境配置、HDFS API的使用以及具体创建目录的步骤。 首先,理解Hadoop的环境配置至关重要。在进行Java编程之前,你需要确保已经在本地或集群上正确安装了Hadoop,并配置好了`hadoop-env.sh`和`...
综上所述,Hadoop HDFS应用的知识点涵盖了HDFS的分布式文件系统概念、支持的多种文件系统类型及URI模式、权限及用户体系、命令行操作接口以及JAVA编程接口等。掌握这些知识点对于实现高效的大数据处理和分析具有重要...
Hadoop2 API帮助文档是针对Hadoop 2.x版本的核心组件提供的一份详细参考资料,旨在帮助开发者高效地利用Hadoop框架进行大数据处理和分析。Hadoop是一个开源的分布式计算框架,由Apache软件基金会维护,其核心包括...
总的来说,本实验旨在使学习者熟悉Hadoop环境下的Java编程,理解如何调用HDFS API进行文件操作,这是一项重要的技能,因为在大数据处理中,HDFS是数据存储的核心组件。通过这样的实践,学生将能够更好地理解和应用大...
Hadoop API中文说明文档是针对Apache Hadoop框架的开发者指南,它详细解释了如何使用Hadoop的编程接口来处理大规模数据。Hadoop是开源的分布式计算框架,它允许在廉价硬件集群上存储和处理海量数据。这个文档对于...
HDFS 文件系统基本文件命令、编程读写 HDFS HDFS(Hadoop Distributed File System)是一种分布式文件系统,用于存储和管理大规模数据。它是 Hadoop 云计算平台的核心组件之一,提供了高效、可靠、可扩展的数据存储...
3. **熟悉Java API**:除了Shell命令外,Hadoop还提供了丰富的Java API来操作HDFS。熟悉这些API有助于开发者在实际项目中更加灵活地使用HDFS。 #### 实验过程 ##### Shell命令实践 - **创建与查看文件**:使用`...
Hadoop中文版API.chm文档将提供详细的函数解释和示例,帮助开发者快速掌握Hadoop的编程模型。 此外,Hadoop生态还包括其他重要组件,如Hive(数据仓库工具)、Pig(数据处理语言)、Spark(快速数据处理引擎)等,...
这些JAR包包含了Hadoop的相关API和实现,使得用户可以通过编程方式操作HDFS。 标题"hadop之hdfs中所依赖jar"指出的关键点是,为了成功地进行HDFS操作,必须确保正确地引入了必要的JAR包。这里提到的"Hadoop之上传...
Hadoop是一个开源的分布式计算框架,它允许用户通过简单易用的编程模型处理大型数据集,而HDFS(Hadoop Distributed File System)是其核心组件,用于存储和处理大数据。 首先,Hadoop是一个由Apache软件基金会开发...
该项目是一个大数据课程的期末项目,主要利用了Spark、Hadoop HDFS和MongoDB等技术,通过Scala编程语言来实现电影推荐系统。这个系统是基于大数据处理的,因此涉及到的知识点非常广泛,涵盖了分布式计算、数据存储、...
HDFS Java API 是 Hadoop 中的一部分,提供了一个 Java 编程接口来访问和操作 HDFS 中的文件和目录。该 API 提供了多种方法来操作文件和目录,包括创建、删除、读取和写入文件,列出目录中的文件和子目录等。 二、...
1. **HDFS API**:包括`org.apache.hadoop.hdfs`包,提供了对HDFS的读写操作。例如,`DFSClient`类是与HDFS通信的主要客户端,`DFSInputStream`和`DFSOutputStream`分别用于文件的读写。此外,`FileSystem`接口提供...