`
ganliang13
  • 浏览: 253322 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

java api 操作hdfs文件

阅读更多
package com.bfd.test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.lang.math.NumberUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseTest  {
 
    public static Configuration configuration;

 /**
  * a.配置host, 例如:bfdbjc2 192.168.11.72 
  * b.参考:hbase-site.xml: 
  * <property> <name>hbase.zookeeper.quorum</name>
  *     <value>bfdbjc2,bfdbjc3,bfdbjc4</value>
  * </property> 
  * <property> <name>hbase.rootdir</name> 
  *      <value>hdfs://bfdbjc1:12000/hbase</value>
  * </property>
  */
     static {  
         configuration = HBaseConfiguration.create(); 
      //String ZOOKEEPER_QUORAM =  "zk-1:2181,zk-2:2181,zk-3:2181,zk-4:2181,zk-5:2181,zk-6:2181";

         configuration.set("hbase.zookeeper.quorum","bfdbjc2:2181,bfdbjc3:2181,bfdbjc4:2181");  
         configuration.set("hbase.rootdir", "hdfs://bfdbjc1:12000/hbase");  
     }  
     
     
     public static void main(String[] args) throws Exception {  
        // createTable("gangliang13");   
         //insertData("gangliang13");
      //deleteRow("gangliang13", "112");
         //queryAllLimit("gangliang13",2); 
      //queryByRowKey("GidCross","0100020a000056c100004e374e267993");
      //queryByColumn("gangliang13","aaa1");
      //testLikeQuery("gangliang13","11");
      //queryByManyColumn("gangliang13");
      //queryAll("gangliang13");
      //System.out.println("input GidCross rawkey:"+args[0]);
      //queryByRowKey("GidCross", "0100020a000056c100004e374e267993");
      testScanByTimeStamp();
     }  
     
     /**
      * 如果存在要创建的表,那么先删除,再创建
      * @param tableName
      */
     public static void createTable(String tableName) {  
         System.out.println("start create table ......");  
         try {  
             HBaseAdmin hBaseAdmin = new HBaseAdmin(configuration);  
             if (hBaseAdmin.tableExists(tableName)) {  
                 hBaseAdmin.disableTable(tableName);  
                 hBaseAdmin.deleteTable(tableName);  
                 System.out.println(tableName + " is exist,detele....");  
             }  
             HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);  
             tableDescriptor.addFamily(new HColumnDescriptor("name"));  
             tableDescriptor.addFamily(new HColumnDescriptor("age"));  
             tableDescriptor.addFamily(new HColumnDescriptor("sex"));  
             hBaseAdmin.createTable(tableDescriptor);  
         } catch (MasterNotRunningException e) {  
             e.printStackTrace();  
         } catch (ZooKeeperConnectionException e) {  
             e.printStackTrace();  
         } catch (IOException e) {  
             e.printStackTrace();  
         }  
         System.out.println("end create table ......");  
     }  
     
     /**
      * 插入数据
      * @param tableName
      */
     public static void insertData(String tableName) {  
         System.out.println("start insert data ......");  
         HTablePool pool = new HTablePool(configuration, 1000);  
         HTableInterface table =  pool.getTable(tableName);  
         Put put = new Put("111".getBytes());// 一个PUT代表一行数据,再NEW一个PUT表示第二行数据,每行一个唯一的ROWKEY,此处rowkey为put构造方法中传入的值  
         put.add("name".getBytes(), null, "aaa1".getBytes());// 本行数据的第一列  
         put.add("age".getBytes(), null, "bbb1".getBytes());// 本行数据的第三列  
         put.add("sex".getBytes(), null, "ccc1".getBytes());// 本行数据的第三列  
         
         Put put2 = new Put("222".getBytes());// 一个PUT代表一行数据,再NEW一个PUT表示第二行数据,每行一个唯一的ROWKEY,此处rowkey为put构造方法中传入的值  
         put2.add("name".getBytes(), null, "aaa2".getBytes());// 本行数据的第一列  
         put2.add("name".getBytes(), "nickname".getBytes(), "aaabbbbbnick".getBytes());
         put2.add("age".getBytes(), null, "bbb2".getBytes());// 本行数据的第三列  
         put2.add("sex".getBytes(), null, "ccc2".getBytes());// 本行数据的第三列  
         try {  
             table.put(put);  
             table.put(put2);
         } catch (IOException e) {  
             e.printStackTrace();  
         }  
         System.out.println("end insert data ......");  
     }  
     
     /**
      * 
      * @param tableName 取前面limit条
      */
     public static void queryAllLimit(String tableName,int limit) {  
         HTablePool pool = new HTablePool(configuration, 1000); 
         HTableInterface table = pool.getTable(tableName);
         try {  
             Scan scan = new Scan();  
             scan.setCaching(1);  
             Filter filter = new PageFilter(limit); 
             scan.setFilter(filter);  
             ResultScanner scanner = table.getScanner(scan);// 执行扫描查找 int num = 0;  
             Iterator<Result> res = scanner.iterator();// 返回查询遍历器  
             while (res.hasNext()) {  
                 Result result = res.next();  
                 table.setWriteBufferSize(1024*1024*1);
                 KeyValue[] kv = result.raw();
                 for (int i = 0; i < kv.length; i++) {
                  System.out.print(new String(kv[i].getRow()) + "  ");
                     System.out.print(new String(kv[i].getFamily()) + ":");
                     System.out.print(new String(kv[i].getQualifier()) + "  ");
                     System.out.print(kv[i].getTimestamp() + "  ");
                     System.out.println(new String(kv[i].getValue()));
                 }
             }  
         } catch (IOException e) {  
             e.printStackTrace();  
         }  
     }  
     
     public static void testScanByTimeStamp() throws IOException{
             //Scan类常用方法说明
             //指定需要的family或column ,如果没有调用任何addFamily或Column,会返回所有的columns; 
             // scan.addFamily(); 
             // scan.addColumn();
             // scan.setMaxVersions(); //指定最大的版本个数。如果不带任何参数调用setMaxVersions,表示取所有的版本。如果不掉用setMaxVersions,只会取到最新的版本.
             // scan.setTimeRange(); //指定最大的时间戳和最小的时间戳,只有在此范围内的cell才能被获取.
             // scan.setTimeStamp(); //指定时间戳
             // scan.setFilter(); //指定Filter来过滤掉不需要的信息
             // scan.setStartRow(); //指定开始的行。如果不调用,则从表头开始;
             // scan.setStopRow(); //指定结束的行(不含此行);
             // scan.setBatch(); //指定最多返回的Cell数目。用于防止一行中有过多的数据,导致OutofMemory错误。
             
             //过滤器
             //1、FilterList代表一个过滤器列表
             //FilterList.Operator.MUST_PASS_ALL -->and
             //FilterList.Operator.MUST_PASS_ONE -->or
             //eg、FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ONE);
             //2、SingleColumnValueFilter
             //3、ColumnPrefixFilter用于指定列名前缀值相等
             //4、MultipleColumnPrefixFilter和ColumnPrefixFilter行为差不多,但可以指定多个前缀。
             //5、QualifierFilter是基于列名的过滤器。
             //6、RowFilter
             //7、RegexStringComparator是支持正则表达式的比较器。
             //8、SubstringComparator用于检测一个子串是否存在于值中,大小写不敏感。
       
          HTablePool pool = new HTablePool(configuration, 1000); 
          HTableInterface table = pool.getTable("ganglia2");
             Scan scan = new Scan(); 

             scan.setTimeStamp(NumberUtils.toLong("1367985291545"));
             //scan.setTimeRange(NumberUtils.toLong("1370336286283"), NumberUtils.toLong("1370336337163"));
             //scan.setStartRow(Bytes.toBytes("quanzhou"));
             //scan.setStopRow(Bytes.toBytes("xiamen"));
             //scan.addFamily(Bytes.toBytes("info")); 
             //scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("id"));
             
             //查询列镞为info,列id值为1的记录
             //方法一(单个查询)
             // Filter filter = new SingleColumnValueFilter(
             //         Bytes.toBytes("info"), Bytes.toBytes("id"), CompareOp.EQUAL, Bytes.toBytes("1")); 
             // scan.setFilter(filter);
             
             //方法二(组合查询)
             //FilterList filterList=new FilterList();
             //Filter filter = new SingleColumnValueFilter(
             //    Bytes.toBytes("info"), Bytes.toBytes("id"), CompareOp.EQUAL, Bytes.toBytes("1"));
             //filterList.addFilter(filter);
             //scan.setFilter(filterList);
             
             ResultScanner rs = table.getScanner(scan);
             
             for (Result r : rs) {
                 for (KeyValue kv : r.raw()) {
                     System.out.println(String.format("row:%s, family:%s, qualifier:%s, qualifiervalue:%s, timestamp:%s.", 
                             Bytes.toString(kv.getRow()), 
                             Bytes.toString(kv.getFamily()), 
                             Bytes.toString(kv.getQualifier()), 
                             Bytes.toString(kv.getValue()),
                             kv.getTimestamp()));
                 }
             }
             
             rs.close();
     }
     
     /**
      * 对行key进行like查询
      * @param table
      * @param likeQuery
      * @throws Exception
      */
     public static void testLikeQuery(String table,String likeQuery) throws Exception {  
         Scan scan = new Scan();  
         RegexStringComparator comp = new RegexStringComparator(likeQuery);  
         RowFilter filter = new RowFilter(CompareOp.EQUAL, comp);  
         scan.setFilter(filter);  
         scan.setCaching(200);  
         scan.setCacheBlocks(false);  
         HTable hTable = new HTable(configuration, table);  
         ResultScanner scanner = hTable.getScanner(scan);  
         for (Result r : scanner) {  
          KeyValue[] kv = r.raw();
                for (int i = 0; i < kv.length; i++) {
                 System.out.print(new String(kv[i].getRow()) + "  ");
                    System.out.print(new String(kv[i].getFamily()) + ":");
                    System.out.print(new String(kv[i].getQualifier()) + "  ");
                    System.out.print(kv[i].getTimestamp() + "  ");
                    System.out.println(new String(kv[i].getValue()));
                } 
            }  
     }  
     
     /**
      * 查询表所有行
      * @param tableName
      */
     public static void queryAll(String tableName) {  
         HTablePool pool = new HTablePool(configuration, 1000); 
         HTableInterface table = pool.getTable(tableName);
         try {  
             ResultScanner rs = table.getScanner(new Scan());  
             for (Result r : rs) {  
              KeyValue[] kv = r.raw();
                 for (int i = 0; i < kv.length; i++) {
                  System.out.print(new String(kv[i].getRow()) + "  ");
                     System.out.print(new String(kv[i].getFamily()) + ":");
                     System.out.print(new String(kv[i].getQualifier()) + "  ");
                     System.out.print(kv[i].getTimestamp() + "  ");
                     System.out.println(new String(kv[i].getValue()));
                 } 
             }  
         } catch (IOException e) {  
             e.printStackTrace();  
         }  
     }  
     
     /**
      * 根据行记录值删除
      * @param tablename
      * @param rowkey
      */
     public static void deleteRow(String tablename, String rowkey)  {  
         try {  
             HTable table = new HTable(configuration, tablename);  
             List<Delete> list = new ArrayList<Delete>();  
             Delete d1 = new Delete(rowkey.getBytes());  
             list.add(d1);  
             table.delete(list);  
             System.out.println("删除行成功!");  
         } catch (IOException e) {  
             e.printStackTrace();  
         }  
     }  
     
     /**
      * 根据行记录索引
      * @param tableName
      * @param row
      * @throws IOException
      */
     public static void queryByRowKey(String tableName,String row) throws IOException {  
      HTable table = new HTable(configuration, tableName); 
      System.err.println(table.getRegionLocation(row.getBytes()));
         try {  
             Get scan = new Get(row.getBytes());// 根据rowkey查询  
             Result r = table.get(scan);  
          KeyValue[] kv = r.raw();
                for (int i = 0; i < kv.length; i++) {
                // System.out.print(new String(kv[i].getRow()) + "  ");
                    //System.out.print(new String(kv[i].getFamily()) + ":");
                    System.out.print(new String(kv[i].getQualifier()) + "  ");
                   // System.out.print(kv[i].getTimestamp() + "  ");
                    System.out.println(new String(kv[i].getValue()));
                }
         } catch (IOException e) {  
             e.printStackTrace();  
         }  
     }  
     
     /** 
      * 按列查询,查询多条记录 
      * @param tableName 
      */  
     public static void queryByColumn(String tableName,String columnValue) {  
         try {  
             HTable table = new HTable(configuration,tableName);  
             Filter filter = new SingleColumnValueFilter(Bytes.toBytes("name"), null, CompareOp.EQUAL, Bytes.toBytes(columnValue)); // 当列column1的值为aaa时进行查询  
             Scan s = new Scan();  
             s.setFilter(filter);  
             ResultScanner rs = table.getScanner(s);  
             for (Result r : rs) {  
              KeyValue[] kv = r.raw();
                 for (int i = 0; i < kv.length; i++) {
                  System.out.print(new String(kv[i].getRow()) + "  ");
                     System.out.print(new String(kv[i].getFamily()) + ":");
                     System.out.print(new String(kv[i].getQualifier()) + "  ");
                     System.out.print(kv[i].getTimestamp() + "  ");
                     System.out.println(new String(kv[i].getValue()));
                 }
             }  
         } catch (Exception e) {  
             e.printStackTrace();  
         }  
     }  
     
     /** 
      * 按多列查询,查询多条记录 
      * @param tableName 
      */  
     public static void queryByManyColumn(String tableName) {  
       try {  
              HTable table =  new HTable(configuration,tableName); 
              List<Filter> filters = new ArrayList<Filter>();  
    
              Filter filter1 = new SingleColumnValueFilter(Bytes  
                      .toBytes("age"), null, CompareOp.EQUAL, Bytes.toBytes("bbb1"));  
              filters.add(filter1);  
    
              Filter filter2 = new SingleColumnValueFilter(Bytes  
                      .toBytes("name"), null, CompareOp.EQUAL, Bytes.toBytes("aaa1")); 
              filters.add(filter2);  
    
              FilterList filterList1 = new FilterList(filters);  
    
              Scan scan = new Scan();  
              scan.setFilter(filterList1);  
              ResultScanner rs = table.getScanner(scan);  
              for (Result r : rs) {  
              KeyValue[] kv = r.raw();
                    for (int i = 0; i < kv.length; i++) {
                     System.out.print(new String(kv[i].getRow()) + "  ");
                        System.out.print(new String(kv[i].getFamily()) + ":");
                        System.out.print(new String(kv[i].getQualifier()) + "  ");
                        System.out.print(kv[i].getTimestamp() + "  ");
                        System.out.println(new String(kv[i].getValue()));
                    }  
              }  
              rs.close();  
    
          } catch (Exception e) {  
              e.printStackTrace();  
          }  
     }  
}

 

 

分享到:
评论

相关推荐

    利用javaAPI访问HDFS的文件

    ### 使用Java API访问HDFS文件的关键知识点 #### 一、HDFS概述 Hadoop Distributed File System(HDFS)是Apache Hadoop项目的核心组件之一,它为海量数据提供了高吞吐量的数据访问,非常适合大规模数据集的应用...

    大数据技术基础实验报告-调用Java API实现HDFS操作.doc

    总的来说,本实验旨在使学习者熟悉Hadoop环境下的Java编程,理解如何调用HDFS API进行文件操作,这是一项重要的技能,因为在大数据处理中,HDFS是数据存储的核心组件。通过这样的实践,学生将能够更好地理解和应用大...

    Java API操作HDFS示例代码.rar

    Windows下配置Hadoop的Java开发环境以及用Java API操作HDFS: https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/119379055

    elcipse java hadoop操作hdfs的api

    2. **FileSystem对象**:`FileSystem`是HDFS API的入口点,通过它你可以执行所有文件操作。首先需要实例化一个`FileSystem`对象,通常使用`FileSystem.get(conf)`方法,其中`conf`是`Configuration`对象,配置了...

    02--HDFS Java API操作.docx

    Java API 是 HDFS 的一个编程接口,允许开发者使用 Java 语言来操作 HDFS 中的文件和目录。本文将介绍如何使用 HDFS Java API 实现文件下载和上传操作。 一、HDFS Java API 概述 HDFS Java API 是 Hadoop 中的一...

    java操作Hadoop源码之HDFS Java API操作-创建目录

    本文将详细讲解如何使用Java API来操作HDFS,特别是创建目录的功能。我们将探讨Hadoop的环境配置、HDFS API的使用以及具体创建目录的步骤。 首先,理解Hadoop的环境配置至关重要。在进行Java编程之前,你需要确保...

    使用Java Api操作HDFS过程详解

    Java 应用程序接口(API)是 Java 语言提供的一组编程接口,用于访问和操作 Hadoop 分布式文件系统(HDFS)。本文将详细介绍使用 Java API 操作 HDFS 的过程,并提供了一个示例代码,展示如何使用 Java API 读取和...

    使用Java API操作HDFS分布式文件系统

    4. **文件操作** - **创建文件**:使用`FileSystem.create(path)`创建新文件,需要指定文件路径。 - **打开文件**:使用`FileSystem.open(path)`打开已存在文件,返回`FSDataInputStream`,可以进行读取操作。 - ...

    hdfs-java-api

    * 高性能:HDFS Java API 提供了高性能的文件操作,能够满足大规模数据处理的需求。 * 高可靠性:HDFS Java API 提供了高可靠性的文件操作,能够确保数据的安全。 * 高灵活性:HDFS Java API 提供了高灵活性的文件...

    java操作Hadoop源码之HDFS Java API操作-上传文件

    这里的`false`参数表示不覆盖已存在的HDFS文件。如果希望覆盖,可以传入`true`。 4. **关闭资源**: 在完成文件操作后,记得关闭`FileSystem`实例,释放系统资源: ```java fs.close(); ``` 5. **异常处理**...

    实验二:熟悉常用的HDFS操作

    实验二:“熟悉常用的HDFS操作”旨在帮助学习者深入理解Hadoop分布式文件系统(HDFS)在大数据处理中的核心地位,以及如何通过Shell命令和Java API进行高效操作。HDFS在Hadoop架构中扮演着存储大数据的核心角色,为...

    Javaapi操作HDFS综合练习.pdf

    总之,Java API为开发者提供了一套全面的工具,可以方便地在HDFS上执行各种文件操作。通过这些API,开发人员能够构建高效的数据处理应用,利用Hadoop的分布式存储能力。在实际项目中,理解并熟练掌握这些操作对于...

    java对大数据HDFS文件操作

    6. **项目示例**:提供的两个项目"java对HDFS文件操作jar包版"和"java对HDFS文件操作maven版.zip.zip"可能包含了上述操作的完整实现。`jar包版`可能是一个已经编译好的可执行程序,可以直接运行。而`maven版`则是一...

    HDFS文件系统基本文件命令、编程读写HDFS

    HDFS 提供了一个 API,允许开发者使用 Java 语言编写程序来操作 HDFS 文件系统。该 API 包括了 open、read、write、close 等方法,用于读写 HDFS 文件。 使用 HDFS API 可以实现以下操作: 1. 上传本地文件:使用 ...

    HDFSJavaAPI.tar.gz_HDFS JAVA API_hdfs

    2. **文件操作**: - **创建文件**:调用`FileSystem`对象的`create()`方法,传入`Path`对象表示的文件路径,以及可选的文件权限、缓冲区大小等参数。 - **读取文件**:使用`FileSystem`的`open()`方法打开文件,...

    java管理hdfs文件和文件夹项目hadoop2.4

    本文将深入探讨使用Java管理HDFS文件和文件夹的关键知识点,特别是基于Hadoop 2.4版本。 首先,理解HDFS的基础概念至关重要。HDFS是一种分布式文件系统,设计用于跨大量廉价硬件节点存储和处理大数据。它遵循主从...

    java 从hadoop hdfs读取文件 进行groupby并显示为条形图

    Java API提供了访问HDFS的接口,例如`org.apache.hadoop.fs.FileSystem`类,可以用于读取、写入和管理文件系统中的文件。 2. **Hadoop MapReduce**:MapReduce是Hadoop用于并行处理和分析大数据的编程模型。在GROUP...

    HDFS判断文件或目录是否存在——Shell命令实现 + Java代码实现

    hdfs dfs -test -e 文件或目录名 第三步,查看检测结果: echo $? 若输出结果为0,则说明文件或目录存在,若为1,则说明文件或目录不存在。 二、Java代码实现 import org.apache.hadoop.conf.Configuration; ...

    实验二、HDFS shell操作及HDFS Java API编程

    适合刚接触hadoop的学生或小白,内容包括HDFS shell操作及HDFS Java API编程 有图片有代码

    大数据-java操作HDFS基本操作

    在大数据领域,Java被广泛用于开发和操作分布式文件系统,如Hadoop的HDFS(Hadoop Distributed File System)。本文将详细讲解如何使用Java API进行HDFS的基本操作,包括增、删、查等常见任务。 首先,理解HDFS是至...

Global site tag (gtag.js) - Google Analytics