`
lc_koven
  • 浏览: 353529 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

在不同版本hdfs集群之间转移数据

阅读更多
本文仅供记录一下程序心得:
    很多人会有这样一个需求:将一个hdfs集群上的数据写入另一个hdfs集群所在的hbase数据库。通常情况下两个hdfs集群的版本差距并不大,这样的程序会很容易写。但有时会跨大版本。比如作者所在的厂子,数据都在基于hadoop0.19.2版本修改的hdfs集群上,要将这样的数据导入版本为0.20.2+的hdfs集群,就不能使用同一个hadoop jar包来完成了。如何实现呢?
    最简单的办法就是把src集群的数据导到本地,然后起另一个进程将本地数据传到des集群上去。
    不过这有几个问题:
  • 效率降低
  • 占用本地磁盘空间
  • 不能应付实时导数据需求
  • 两个进程需要协调,复杂度增加

    更好的办法是在同一个进程内一边读src数据,一边写des集群。不过这相当于在同一个进程空间内加载两个版本的hadoop jar包,这就需要在程序中使用两个classloader来实现。
    以下代码可以实现classloader加载自定义的jar包,并生成需要的Configuration对象:
URL[] jarUrls = new URL[1];
jarUrls[0]=new File(des_jar_path).toURI().toURL();
ClassLoader jarloader = new URLClassLoader(jarUrls, null);
Class Proxy = Class.forName("yourclass", true, jarloader);
Configuration conf = (Configuration)Proxy.newInstance();


    但是由于在生成HTable对象时,需要使用这个conf对象,而加载这个conf对象的代码本身是由默认的classloader加载的,也就是0.19.2的jar包。所以在以上代码最后一行所强制转换的Configuration对象仍然是0.19.2版本的。那怎么办呢?
    琢磨了一会,发现如果要实现以上功能,必须将生成HTable对象,以及以后的所有hbase操作都使用这个新的classloader,因此这个新的classloader必须加载除了0.19.2的jar包外所有需要用到的jar包,然后把所有操作都封装进去。在外面用反射来调用。
    这样的话,通常构造函数都不为空了,因此需要用到Constructor来构造一个自定义的构造函数
    代码段如下:
main.java
void init(){
	ClassLoader jarloader = generateJarLoader();
	Class Proxy = Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);
	Constructor con = Proxy.getConstructor(new Class[]{String.class, String.class, boolean.class});
	Boolean autoflush = param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);
	proxy = con.newInstance(new Object[]{path, tablename, autoflush});
}
void put(){
...
	while((line = getLine()) != null) {
		proxy.getClass().getMethod("generatePut",String.class).invoke(proxy, line.getField(rowkey));
		Method addPut = proxy.getClass().getMethod("addPut",
				new Class[]{String.class, String.class, String.class});
		addPut.invoke(proxy, new Object[]{field, column, encode});
		proxy.getClass().getMethod("putLine").invoke(proxy);
	}
}

ClassLoader generateJarLoader() throws IOException {
      String libPath = System.getProperty("java.ext.dirs");
      FileFilter filter = new FileFilter() {
      @Override
      public boolean accept(File pathname) {
      	if(pathname.getName().startsWith("hadoop-0.19.2"))
          return false;
      	else
      		return pathname.getName().endsWith(".jar");
      }
      };
      File[] jars = new File(libPath).listFiles(filter);
      URL[] jarUrls = new URL[jars.length+1];
		
      int k = 0;
      for (int i = 0; i < jars.length; i++) {
        jarUrls[k++] = jars[i].toURI().toURL();
      }
      jarUrls[k] = new File("hadoop-0.20.205.jar")
      ClassLoader jarloader = new URLClassLoader(jarUrls, null);
      return jarloader;
}


HBaseProxy.java
public HBaseProxy(String hbase_conf, String tableName, boolean autoflush)
     throws IOException{
		Configuration conf = new Configuration();
		conf.addResource(new Path(hbase_conf));
		config = new Configuration(conf);
		htable = new HTable(config, tableName);
		admin = new HBaseAdmin(config);
		htable.setAutoFlush(autoflush);
	}
public void addPut(String field, String column, String encode) throws IOException {
    try {
			p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),
					field.getBytes(encode));
		} catch (UnsupportedEncodingException e) {
			p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),
					field.getBytes());
		}
		
	}
    public void generatePut(String rowkey){
		p = new Put(rowkey.getBytes());
	}
	
    public void putLine() throws IOException{
		htable.put(p);
	}

    总之,在同一个进程中加载多个classloader时一定要注意,classloader A所加载的对象是不能转换成classloader B的对象的,当然也不能使用。两个空间的相互调用只能用java的基本类型或是反射。
分享到:
评论
3 楼 brandom520 2014-07-24  
请问lz,我从hbase0.94版本上的数据导入到0.96.1.1-cdh5.0.3版本里。按照你提供的方法测试报错。帮看看什么原因
Caused by: java.lang.RuntimeException: hbase-default.xml file seems to be for and old version of HBase (0.94.13), this version is 0.96.1.1-cdh5.0.3
        at org.apache.hadoop.hbase.HBaseConfiguration.checkDefaultsVersion(HBaseConfiguration.java:70)
        at org.apache.hadoop.hbase.HBaseConfiguration.addHbaseResources(HBaseConfiguration.java:102)
        at org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:113)
        at org.apache.hadoop.hbase.client.HConnectionManager.<clinit>(HConnectionManager.java:212)
        ... 9 more
2 楼 lc_koven 2011-10-27  
fakechris 写道
hadoop distcp hftp://from hdfs://to 也可以

恩,很多情况下这也是一个选择,我忘写上去了:)。不过这个方法有三个弱点:1 对于自己修改的版本通常不能使用(比如cloudrea版本hdfs无法读入ugi密码等问题)。这是让我们放弃这个方法的主要原因 2 hftp安全性问题 3 不灵活,必须拷贝完成后再做一次数据导入
1 楼 fakechris 2011-10-27  
hadoop distcp hftp://from hdfs://to 也可以

相关推荐

    星环大数据平台HDFS

    HDFS的设计目标是运行在大量廉价商用机器组成的集群之上,这些机器的硬件错误被认为是常态,因此系统需要提供容错机制。HDFS采用了简单的一致性模型,支持文件的一次写入和多次读取,允许追加写入但不支持文件的随机...

    hdfs的高可用搭建

    HDFS (Hadoop Distributed File System) 是Hadoop生态系统中的分布式文件系统组件,主要用于存储海量数据。随着业务的发展,对数据处理的需求越来越高,单点故障的风险也日益突出。因此,确保HDFS的高可用性变得至关...

    大数据HDFS文档

    - 为了确保数据的可靠性,HDFS默认将每个数据块复制三份,并存储在不同的DataNode上。 - 当一个DataNode失效时,NameNode会自动检测并恢复丢失的数据块。 #### HDFS读写过程 - **读过程**: - 客户端向NameNode...

    HDFS Design

    为了保持集群中各个DataNode的负载均衡,HDFS提供了一种称为“均衡器”的工具,它可以将数据块从负载较高的DataNode转移到负载较低的DataNode上。 **8.3 数据完整性** HDFS提供了一种机制来检测数据块是否损坏,并...

    巴豆大数据团队讲师课件HDFS.pdf

    在故障转移发生时,会通过心跳机制保证Standby NameNode能够快速获取到Active状态,并保证在此过程中只允许一个NameNode处于Active状态,以避免数据丢失的风险。 此外,Hadoop 2.0还引入了NameNode联邦(Federation...

    Oracle与HDFS的桥梁_Sqoop

    Sqoop 则提供了一种方便的方式来在 Hadoop 生态系统和传统 RDBMS 之间转移数据,支持多种数据库,如 Oracle,以及与 Hive、HBase 等数据存储系统的交互。 Sqoop 提供了多个命令工具,以满足不同的需求: 1. `...

    【HDFS篇11】HA高可用1

    在Hadoop 2.0之前,HDFS(Hadoop Distributed File System)中的NameNode是一个明显的单点故障(SPOF,Single Point of Failure),一旦NameNode出现问题,整个HDFS集群都将无法正常工作,这极大地限制了系统的可用...

    Hadoop 2.6.5在CentOS6.8版本下的集群部署

    Apache ZooKeeper是一个分布式协调服务,对于Hadoop集群中的高可用性和故障转移至关重要,这里使用的是3.4.6版本。最后,我们要安装的就是Hadoop 2.6.5,这是一个稳定的版本,支持大规模的数据处理。 在用户规划...

    Hadoop数据迁移--从Oracle向Hadoop

    Hadoop数据迁移是指将存储在传统数据库系统(如Oracle)中的数据转移到Hadoop文件系统(HDFS)的过程。在这个过程中,MapReduce作为一种编程模型,用于处理和生成大数据集,被用来连接Hadoop与Oracle数据库,使得...

    14_尚硅谷大数据之HDFS HA高可用1

    然而,Hadoop 2.0之前的版本中,NameNode作为HDFS的关键角色,存在单点故障(Single Point of Failure, SPOF)的问题,一旦NameNode出现问题,整个HDFS集群将无法正常工作。为了提高系统的高可用性(High ...

    HDFS的概念-HDFS的高可用性.pdf

    HDFS的高可用性通过活动-备用NameNode的架构、共享日志和故障转移控制器等技术手段,显著降低了NameNode单点故障的风险,提高了整个Hadoop集群的稳定性和可靠性。这一特性使得HDFS能够支持大规模、关键业务的分布式...

    HDFS高可用配置手册.docx

    ### HDFS高可用配置手册 #### 一、HDFS高可用 ##### 1、基础描述 HDFS(Hadoop Distributed File System)是Apache ...此外,通过Zookeeper集群的辅助,实现了自动化故障转移,大大提高了HDFS集群的整体稳定性。

    Hadoop集群配置文件备份

    在Hadoop集群中,配置文件扮演着至关重要的角色,它们定义了集群的行为、性能优化参数以及故障转移策略等。本文将深入探讨“Hadoop集群配置文件备份”的重要性、步骤和最佳实践。 **1. Hadoop配置文件概述** Hadoop...

    HDFS High Availability(HA)高可用配置.doc

    在Hadoop 2.0.0之前,NameNode的单点故障问题显著限制了HDFS集群的可用性。一旦NameNode所在的机器或进程出现问题,整个集群将无法正常工作,直至NameNode恢复或在其他机器上重启。为了解决这个问题,Hadoop引入了HA...

    大数据课程-Hadoop集群程序设计与开发-10.Sqoop数据迁移_lk_edit.pptx

    Sqoop是一款由Apache开发的开源软件,专门用于高效地在HDFS或Hive与MySQL、Oracle等传统RDBMS之间转移数据。 ### 10.1 Sqoop概述 Sqoop是一个用于Hadoop和关系数据库间数据传输的工具。它允许用户将数据从RDBMS...

    PyPI 官网下载 | dbnd-hdfs-0.34.0.tar.gz

    今天我们将聚焦于其中的一个组件——dbnd-hdfs-0.34.0,这是一个专门针对HDFS(Hadoop Distributed File System)操作的Python库,它与Zookeeper协同工作,为大数据处理提供强大支持。 首先,我们来看标题"PyPI ...

    Hadoop2.2.0集群安装

    在该版本中,HDFS支持了一个名为“High Availability”(HA)的功能,该功能使得Hadoop集群即使在某个关键组件发生故障的情况下也能继续正常运行。 #### 二、HDFSHA架构 在Hadoop2.2.0中,HDFSHA架构实现了NameNode...

    代码-Windows环境下对Linux环境下的HDFS进行基本操作.zip

    在IT行业中,尤其是在大数据处理领域,Hadoop分布式文件系统(HDFS)是核心组件之一,它为大规模数据存储提供了高可靠性和高可...通过熟练掌握这些内容,IT专业人员可以在不同操作系统之间灵活地管理和操作Hadoop集群。

    完整版大数据云计算课程 Hadoop数据分析平台系列课程 Hadoop 12 集群的应用 共39页.pptx

    2. **数据集成与传递**:课程将教授如何在Hadoop和传统的关系型数据库之间高效地转移数据。这对于企业从现有系统向大数据平台迁移至关重要。 3. **Map-Reduce编程**:Map-Reduce是Hadoop处理数据的核心算法。学习者...

    hadoop2.x集群搭建.txt(hdfs和yarn貌似正常,但mapreduce 提交job执行失败,请看我的另一个资源,另一个搭建是成功的)

    - 配置Zookeeper集群来管理NameNode之间的切换,确保故障转移的平滑进行。 #### 六、MapReduce Job提交失败问题排查 从提供的信息来看,HDFS和YARN部分似乎运行正常,但是遇到了MapReduce提交Job执行失败的问题。...

Global site tag (gtag.js) - Google Analytics