HDFS是Hadoop生态系统的根基,也是Hadoop生态系统中的重要一员,大部分时候,我们都会使用Linux shell命令来管理HDFS,包括一些文件的创建,删除,修改,上传等等,因为使用shell命令操作HDFS的方式,相对比较简单,方便,但是有时候,我们也需要通过编程的方式来实现对文件系统的管理。
比如有如下的一个小需求,要求我们实现读取HDFS某个文件夹下所有日志,经过加工处理后在写入到HDFS上,或者存进Hbase里,或者存进其他一些存储系统。这时候使用shell的方式就有点麻烦了,所以这时候我们就可以使用编程的方式来完成这件事了 。
下面给出代码,以供参考:
001 |
package com.java.api.hdfs;
|
002 |
003 |
import java.io.BufferedReader;
|
004 |
import java.io.IOException;
|
005 |
import java.io.InputStream;
|
006 |
import java.io.InputStreamReader;
|
007 |
008 |
import org.apache.hadoop.conf.Configuration;
|
009 |
import org.apache.hadoop.fs.FileStatus;
|
010 |
import org.apache.hadoop.fs.FileSystem;
|
011 |
import org.apache.hadoop.fs.Path;
|
012 |
013 |
014 |
/** |
015 |
* @author 三劫散仙
|
016 |
* Java API操作HDFS
|
017 |
* 工具类
|
018 |
*
|
019 |
* **/
|
020 |
public class OperaHDFS {
|
021 |
|
022 |
|
023 |
public static void main(String[] args) throws Exception {
|
024 |
|
025 |
//System.out.println("aaa");
|
026 |
// uploadFile();
|
027 |
//createFileOnHDFS();
|
028 |
//deleteFileOnHDFS();
|
029 |
//createDirectoryOnHDFS();
|
030 |
//deleteDirectoryOnHDFS();
|
031 |
// renameFileOrDirectoryOnHDFS();
|
032 |
//downloadFileorDirectoryOnHDFS();
|
033 |
readHDFSListAll();
|
034 |
}
|
035 |
|
036 |
|
037 |
|
038 |
|
039 |
/***
|
040 |
* 加载配置文件
|
041 |
* **/
|
042 |
static Configuration conf= new Configuration();
|
043 |
|
044 |
|
045 |
|
046 |
/**
|
047 |
* 重名名一个文件夹或者文件
|
048 |
*
|
049 |
* **/
|
050 |
public static void renameFileOrDirectoryOnHDFS() throws Exception{
|
051 |
|
052 |
FileSystem fs=FileSystem.get(conf);
|
053 |
Path p1 = new Path( "hdfs://10.2.143.5:9090/root/myfile/my.txt" );
|
054 |
Path p2 = new Path( "hdfs://10.2.143.5:9090/root/myfile/my2.txt" );
|
055 |
fs.rename(p1, p2);
|
056 |
|
057 |
fs.close(); //释放资源
|
058 |
System.out.println( "重命名文件夹或文件成功....." );
|
059 |
|
060 |
}
|
061 |
|
062 |
|
063 |
/***
|
064 |
*
|
065 |
* 读取HDFS某个文件夹的所有
|
066 |
* 文件,并打印
|
067 |
*
|
068 |
* **/
|
069 |
public static void readHDFSListAll() throws Exception{
|
070 |
|
071 |
//流读入和写入
|
072 |
InputStream in= null ;
|
073 |
//获取HDFS的conf
|
074 |
//读取HDFS上的文件系统
|
075 |
FileSystem hdfs=FileSystem.get(conf);
|
076 |
//使用缓冲流,进行按行读取的功能
|
077 |
BufferedReader buff= null ;
|
078 |
//获取日志文件的根目录
|
079 |
Path listf = new Path( "hdfs://10.2.143.5:9090/root/myfile/" );
|
080 |
//获取根目录下的所有2级子文件目录
|
081 |
FileStatus stats[]=hdfs.listStatus(listf);
|
082 |
//自定义j,方便查看插入信息
|
083 |
int j= 0 ;
|
084 |
for ( int i = 0 ; i < stats.length; i++){
|
085 |
//获取子目录下的文件路径
|
086 |
FileStatus temp[]=hdfs.listStatus( new Path(stats[i].getPath().toString()));
|
087 |
for ( int k = 0 ; k < temp.length;k++){
|
088 |
System.out.println( "文件路径名:" +temp[k].getPath().toString());
|
089 |
//获取Path
|
090 |
Path p= new Path(temp[k].getPath().toString());
|
091 |
//打开文件流
|
092 |
in=hdfs.open(p);
|
093 |
//BufferedReader包装一个流
|
094 |
buff= new BufferedReader( new InputStreamReader(in));
|
095 |
String str= null ;
|
096 |
while ((str=buff.readLine())!= null ){
|
097 |
|
098 |
System.out.println(str);
|
099 |
}
|
100 |
buff.close();
|
101 |
in.close();
|
102 |
|
103 |
|
104 |
}
|
105 |
|
106 |
|
107 |
|
108 |
|
109 |
}
|
110 |
|
111 |
hdfs.close();
|
112 |
|
113 |
114 |
}
|
115 |
/**
|
116 |
* 从HDFS上下载文件或文件夹到本地
|
117 |
*
|
118 |
* **/
|
119 |
public static void downloadFileorDirectoryOnHDFS() throws Exception{
|
120 |
|
121 |
FileSystem fs=FileSystem.get(conf);
|
122 |
Path p1 = new Path( "hdfs://10.2.143.5:9090/root/myfile//my2.txt" );
|
123 |
Path p2 = new Path( "D://7" );
|
124 |
fs.copyToLocalFile(p1, p2);
|
125 |
fs.close(); //释放资源
|
126 |
System.out.println( "下载文件夹或文件成功....." );
|
127 |
|
128 |
}
|
129 |
/**
|
130 |
* 在HDFS上创建一个文件夹
|
131 |
*
|
132 |
* **/
|
133 |
public static void createDirectoryOnHDFS() throws Exception{
|
134 |
|
135 |
FileSystem fs=FileSystem.get(conf);
|
136 |
Path p = new Path( "hdfs://10.2.143.5:9090/root/myfile" );
|
137 |
fs.mkdirs(p);
|
138 |
fs.close(); //释放资源
|
139 |
System.out.println( "创建文件夹成功....." );
|
140 |
|
141 |
}
|
142 |
|
143 |
/**
|
144 |
* 在HDFS上删除一个文件夹
|
145 |
*
|
146 |
* **/
|
147 |
public static void deleteDirectoryOnHDFS() throws Exception{
|
148 |
|
149 |
FileSystem fs=FileSystem.get(conf);
|
150 |
Path p = new Path( "hdfs://10.2.143.5:9090/root/myfile" );
|
151 |
fs.deleteOnExit(p);
|
152 |
fs.close(); //释放资源
|
153 |
System.out.println( "删除文件夹成功....." );
|
154 |
|
155 |
}
|
156 |
/**
|
157 |
* 在HDFS上创建一个文件
|
158 |
*
|
159 |
* **/
|
160 |
public static void createFileOnHDFS() throws Exception{
|
161 |
|
162 |
FileSystem fs=FileSystem.get(conf);
|
163 |
Path p = new Path( "hdfs://10.2.143.5:9090/root/abc.txt" );
|
164 |
fs.createNewFile(p);
|
165 |
//fs.create(p);
|
166 |
fs.close(); //释放资源
|
167 |
System.out.println( "创建文件成功....." );
|
168 |
|
169 |
}
|
170 |
|
171 |
/**
|
172 |
* 在HDFS上删除一个文件
|
173 |
*
|
174 |
* **/
|
175 |
public static void deleteFileOnHDFS() throws Exception{
|
176 |
|
177 |
FileSystem fs=FileSystem.get(conf);
|
178 |
Path p = new Path( "hdfs://10.2.143.5:9090/root/abc.txt" );
|
179 |
fs.deleteOnExit(p);
|
180 |
fs.close(); //释放资源
|
181 |
System.out.println( "删除成功....." );
|
182 |
|
183 |
}
|
184 |
|
185 |
|
186 |
/***
|
187 |
* 上传本地文件到
|
188 |
* HDFS上
|
189 |
*
|
190 |
* **/
|
191 |
public static void uploadFile() throws Exception{
|
192 |
//加载默认配置
|
193 |
FileSystem fs=FileSystem.get(conf);
|
194 |
//本地文件
|
195 |
Path src = new Path( "D:\\6" );
|
196 |
//HDFS为止
|
197 |
Path dst = new Path( "hdfs://10.2.143.5:9090/root/" );
|
198 |
try {
|
199 |
fs.copyFromLocalFile(src, dst);
|
200 |
} catch (IOException e) {
|
201 |
// TODO Auto-generated catch block
|
202 |
e.printStackTrace();
|
203 |
}
|
204 |
System.out.println( "上传成功........" );
|
205 |
|
206 |
fs.close(); //释放资源
|
207 |
|
208 |
|
209 |
}
|
210 |
211 |
} |
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://h6:9000");
FileSystem fileSystem = FileSystem.get(conf);
1.创建文件夹:
判断是否存在
不存在再创建
if (!fileSystem.exists(new Path("/weir01"))) {
fileSystem.mkdirs(new Path("/weir01"));
}
2.创建文件:
in - InputStream to read from 原文件路径
out - OutputStream to write to hdfs 目录
the size of the buffer 缓冲大小
close - whether or not close the InputStream and OutputStream at the end. The streams are closed in the finally clause. 是否关闭流
FSDataOutputStream out =fileSystem.create(new Path("/d1"));
FileInputStream in = new FileInputStream("f:/hadoop.zip");
IOUtils.copyBytes(in, out, 1024, true);
3上传本地文件
delSrc - whether to delete the src是否删除源文件
overwrite - whether to overwrite an existing file是否覆盖已存在的文件
srcs - array of paths which are source 可以上传多个文件数组方式
dst – path 目标路径
fileSystem.copyFromLocalFile(src, dst);
fileSystem.copyFromLocalFile(delSrc, src, dst);
fileSystem.copyFromLocalFile(delSrc, overwrite, src, dst);
fileSystem.copyFromLocalFile(delSrc, overwrite, srcs, dst);
4 重命名HDFS文件
fileSystem.rename(src, dst);
5.删除文件
True 表示递归删除
fileSystem.delete(new Path("/d1"), true);
6.查看目录及文件信息
FileStatus[] fs = fileSystem.listStatus(new Path("/"));
for (FileStatus f : fs) {
String dir = f.isDirectory() ? "目录":"文件";
String name = f.getPath().getName();
String path = f.getPath().toString();
System.out.println(dir+"----"+name+" path:"+path);
System.out.println(f.getAccessTime());
System.out.println(f.getBlockSize());
System.out.println(f.getGroup());
System.out.println(f.getLen());
System.out.println(f.getModificationTime());
System.out.println(f.getOwner());
System.out.println(f.getPermission());
System.out.println(f.getReplication());
System.out.println(f.getSymlink());
}
7.查找某个文件在HDFS集群的位置
FileStatus fs = fileSystem.getFileStatus(new Path("/data"));
BlockLocation[] bls=fileSystem.getFileBlockLocations(fs, 0, fs.getLen());
for (int i = 0,h=bls.length; i < h; i++) {
String[] hosts= bls[i].getHosts();
System.out.println("block_"+i+"_location: "+hosts[0]);
}
8.获取HDFS集群上所有节点名称信息
DistributedFileSystem hdfs = (DistributedFileSystem) fileSystem;
DatanodeInfo[] dns=hdfs.getDataNodeStats();
for (int i = 0,h=dns.length; i < h; i++) {
System.out.println("datanode_"+i+"_name: "+dns[i].getHostName());
}
http://www.xuehuile.com/blog/db693eb859b64c39b945d7ae333a1343.html
http://weir2009.iteye.com/blog/2082445
相关推荐
在Java程序中操作HDFS文件主要依赖于`org.apache.hadoop.fs.FileSystem`类,该类提供了许多方法用于执行文件系统操作,如创建文件、删除文件、读写文件等。 ##### 1. 创建文件系统实例 ```java Configuration conf ...
本文将详细讲解如何使用Java API来操作HDFS,特别是创建目录的功能。我们将探讨Hadoop的环境配置、HDFS API的使用以及具体创建目录的步骤。 首先,理解Hadoop的环境配置至关重要。在进行Java编程之前,你需要确保...
HDFS 提供了一个 API,允许开发者使用 Java 语言编写程序来操作 HDFS 文件系统。该 API 包括了 open、read、write、close 等方法,用于读写 HDFS 文件。 使用 HDFS API 可以实现以下操作: 1. 上传本地文件:使用 ...
在Java API操作HDFS(Hadoop Distributed File System)的过程中,主要涉及了以下几个核心知识点: 1. **依赖管理**: 在进行HDFS操作时,首先需要在项目中引入Apache Hadoop的相关依赖。在Maven项目中,这通常...
通过上述Java API,开发者可以构建出复杂的HDFS操作逻辑,实现数据的高效读写、管理和处理。在实际应用中,还需要考虑性能优化、容错机制以及与其他Hadoop生态组件的集成。例如,结合MapReduce进行大规模数据处理,...
在Java编程环境中,Hadoop分布式文件系统(HDFS)提供了丰富的Java API,使得开发者能够方便地与HDFS进行交互,包括文件的上传、下载、读写等操作。本篇文章将详细探讨如何使用HDFS Java API来实现文件上传的功能。 ...
在Hadoop生态系统中,Java API是开发者常用的一种与HDFS(Hadoop Distributed File System)进行...通过这些操作,开发者可以在Java程序中实现对HDFS的读写、管理和操作,从而利用Hadoop的强大功能处理大规模的数据。
而HDFS Java API则是开发人员与HDFS进行交互的主要接口,使得Java应用程序能够便捷地读写HDFS上的文件。本文将深入探讨HDFS Java API的原理、使用方法及其在实际应用中的最佳实践。 一、HDFS基础 1.1 HDFS架构:...
Java作为广泛使用的编程语言,提供了丰富的API来操作HDFS,使得开发者能够方便地进行文件的读取、写入、复制、移动等操作。本文将详细讲解如何使用Java对HDFS进行文件操作,并介绍两个相关的项目示例。 首先,Java...
Java API是与HDFS交互的主要方式,它提供了一系列的类和方法,使得开发者能够轻松地完成文件的读写操作。主要涉及的类有`FileSystem`和`DFSClient`。 三、HDFS文件上传 1. **配置HDFS环境**:首先需要配置Hadoop的...
实验二:“熟悉常用的HDFS操作”旨在帮助学习者深入理解Hadoop分布式文件系统(HDFS)在大数据处理中的核心地位,以及如何通过Shell命令和Java API进行高效操作。HDFS在Hadoop架构中扮演着存储大数据的核心角色,为...
Java-HDFS 客户端是 HDFS 的 Java 实现,提供了一个 Java API 来访问 HDFS。Java-HDFS 客户端可以用来开发基于 HDFS 的应用程序。 * RPCClient:RPCClient 是 Java-HDFS 客户端的主要组件,负责与 NameNode 和 ...
HDFS Java API是开发人员与HDFS进行通信的主要接口,它提供了丰富的类和方法,使得在Java程序中读写HDFS文件变得简单易行。 首先,我们需要导入Hadoop的相关库。在Java项目中,确保你的`pom.xml`或`build.gradle`...
开发时,你需要导入这些库,并通过Hadoop的API来实现对HDFS的操作,例如打开一个`FileSystem`实例,创建或打开一个`FSDataInputStream`或`FSDataOutputStream`,以及执行文件的读写操作。 总的来说,这些依赖包构建...
本项目“Java-Operate-HDFS.zip_hdfs”提供了使用Java API操作HDFS的示例,帮助开发者理解如何在Java应用程序中与HDFS进行交互。下面将详细介绍HDFS的基本概念以及如何通过Java进行操作。 1. HDFS简介: HDFS是...
通过上述API,开发者可以实现对HDFS的全面控制,进行文件的读写、目录管理,以及利用HDFS的高级特性进行大数据处理。不过,需要注意的是,所有的操作都应该确保在finally块中关闭流和文件系统,以防止资源泄漏。 ...
标题 "Hadoop+HBase+Java API" 涉及到三个主要的开源技术:Hadoop、HBase以及Java API,这些都是大数据处理和存储领域的关键组件。以下是对这些技术及其结合使用的详细介绍: **Hadoop** 是一个分布式计算框架,由...
在Java编程环境中,操作HBase并将其数据写入HDFS(Hadoop Distributed File System)是一项常见的任务,特别是在大数据处理和分析的场景下。本篇将详细介绍如何使用Java API实现这一功能,以及涉及到的关键技术和...
HDFS Java API允许开发者通过编程方式对HDFS进行高级操作,如创建、读取、修改和删除文件等。 五、HDFS Java API编程 通过Java API,学生可以实现上传本地文件到HDFS的功能。例如,使用FSDataInputStream和...