- 浏览: 254573 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
leibnitz:
有几点要请教下;a.在二阶段里有这样一句:引用例如如果一个 p ...
zookeeper源码学习 -
nettm:
不错,我也遇到了第一个问题
mongodb客户端错误集合 -
lingqi1818:
xiaoych 写道很好,研究了一年多了吧,哈哈 难得你上 ...
80x86系统启动原理 -
xiaoych:
很好,研究了一年多了吧,哈哈
80x86系统启动原理 -
pengpeng:
pengpeng 写道很强大。我觉得mas-slave那块可以 ...
分布式计算需求场景以及解决方案
最近有个需求要对mysql的全量数据迁移到hbase,虽然hbase的设计非常利于高效的读取,但是它的compaction实现对海量数据写入造成非常大的影响,数据到一定量之后,就开始抽风。
分析hbase的实现,不管其运行的机制,其最终存储结构为分布式文件系统中的hfile格式。
刚好hbase的源代码中提供一个HFileOutputFormat类,分析其源代码可以看到:
可以看到,它的工作流程就是首先根据你的配置文件初始化,然后写成hfile的格式。
这里我做了个偷懒的demo:
执行然之后,会在hdfs的/tmp目录下生成一份文件。注意批量写数据的时候一定要保证key的有序性
这个时候,hbase自己提供的一个基于jruby的loadtable.rb脚本就可以发挥作用了。
它的格式是loadtable.rb 你希望的表明 hdfs路径:
hbase org.jruby.Main loadtable.rb offer hdfs://user/root/importoffer/_temporary/_attempt__0000_r_000000_0/
执行完之后:
运行./hbase shell
>list
就会显示刚才导入的offer表了。
分析hbase的实现,不管其运行的机制,其最终存储结构为分布式文件系统中的hfile格式。
刚好hbase的源代码中提供一个HFileOutputFormat类,分析其源代码可以看到:
/** * Copyright 2009 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; import java.util.Map; import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.mortbay.log.Log; /** * Writes HFiles. Passed KeyValues must arrive in order. * Currently, can only write files to a single column family at a * time. Multiple column families requires coordinating keys cross family. * Writes current time as the sequence id for the file. Sets the major compacted * attribute on created hfiles. * @see KeyValueSortReducer */ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> { public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { // Get the path of the temporary output file final Path outputPath = FileOutputFormat.getOutputPath(context); final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath(); Configuration conf = context.getConfiguration(); final FileSystem fs = outputdir.getFileSystem(conf); // These configs. are from hbase-*.xml final long maxsize = conf.getLong("hbase.hregion.max.filesize", 268435456); final int blocksize = conf.getInt("hfile.min.blocksize.size", 65536); // Invented config. Add to hbase-*.xml if other than default compression. final String compression = conf.get("hfile.compression", Compression.Algorithm.NONE.getName()); return new RecordWriter<ImmutableBytesWritable, KeyValue>() { // Map of families to writers and how much has been output on the writer. private final Map<byte [], WriterLength> writers = new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR); private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY; private final byte [] now = Bytes.toBytes(System.currentTimeMillis()); public void write(ImmutableBytesWritable row, KeyValue kv) throws IOException { long length = kv.getLength(); byte [] family = kv.getFamily(); WriterLength wl = this.writers.get(family); if (wl == null || ((length + wl.written) >= maxsize) && Bytes.compareTo(this.previousRow, 0, this.previousRow.length, kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()) != 0) { // Get a new writer. Path basedir = new Path(outputdir, Bytes.toString(family)); if (wl == null) { wl = new WriterLength(); this.writers.put(family, wl); if (this.writers.size() > 1) throw new IOException("One family only"); // If wl == null, first file in family. Ensure family dir exits. if (!fs.exists(basedir)) fs.mkdirs(basedir); } wl.writer = getNewWriter(wl.writer, basedir); Log.info("Writer=" + wl.writer.getPath() + ((wl.written == 0)? "": ", wrote=" + wl.written)); wl.written = 0; } kv.updateLatestStamp(this.now); wl.writer.append(kv); wl.written += length; // Copy the row so we know when a row transition. this.previousRow = kv.getRow(); } /* Create a new HFile.Writer. Close current if there is one. * @param writer * @param familydir * @return A new HFile.Writer. * @throws IOException */ private HFile.Writer getNewWriter(final HFile.Writer writer, final Path familydir) throws IOException { close(writer); return new HFile.Writer(fs, StoreFile.getUniqueFile(fs, familydir), blocksize, compression, KeyValue.KEY_COMPARATOR); } private void close(final HFile.Writer w) throws IOException { if (w != null) { StoreFile.appendMetadata(w, System.currentTimeMillis(), true); w.close(); } } public void close(TaskAttemptContext c) throws IOException, InterruptedException { for (Map.Entry<byte [], WriterLength> e: this.writers.entrySet()) { close(e.getValue().writer); } } }; } /* * Data structure to hold a Writer and amount of data written on it. */ static class WriterLength { long written = 0; HFile.Writer writer = null; } }
可以看到,它的工作流程就是首先根据你的配置文件初始化,然后写成hfile的格式。
这里我做了个偷懒的demo:
HFileOutputFormat hf = new HFileOutputFormat(); HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("/home/performance/softs/hadoop/conf/core-site.xml")); conf.set("mapred.output.dir", "/tmp"); conf.set("hfile.compression", Compression.Algorithm.LZO.getName()); TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID()); RecordWriter writer = hf.getRecordWriter(context); KeyValue kv = new KeyValue(Bytes.toBytes("1111111111111"), Bytes.toBytes("offer:action"), System.currentTimeMillis(), Bytes.toBytes("test")); KeyValue kv1 = new KeyValue(Bytes.toBytes("1111111111111"), Bytes.toBytes("offer:id"), System.currentTimeMillis(), Bytes.toBytes("123")); KeyValue kv3 = new KeyValue(Bytes.toBytes("1111111111112"), Bytes.toBytes("offer:action"), System.currentTimeMillis(), Bytes.toBytes("test")); KeyValue kv4 = new KeyValue(Bytes.toBytes("1111111111112"), Bytes.toBytes("offer:id"), System.currentTimeMillis(), Bytes.toBytes("123")); writer.write(null, kv); writer.write(null, kv1); writer.write(null, kv3); writer.write(null, kv4); writer.close(context);
执行然之后,会在hdfs的/tmp目录下生成一份文件。注意批量写数据的时候一定要保证key的有序性
这个时候,hbase自己提供的一个基于jruby的loadtable.rb脚本就可以发挥作用了。
它的格式是loadtable.rb 你希望的表明 hdfs路径:
hbase org.jruby.Main loadtable.rb offer hdfs://user/root/importoffer/_temporary/_attempt__0000_r_000000_0/
执行完之后:
运行./hbase shell
>list
就会显示刚才导入的offer表了。
评论
3 楼
Angel_Night
2012-03-26
解决了我的难题啊
十分感谢
十分感谢
2 楼
依然仰望天空
2011-11-04
很不错,学习
1 楼
pengpeng
2011-02-18
牛 ~~ 学习了
发表评论
-
spring mvc介绍
2014-06-29 18:18 865项目中用到了spring mvc,整理个文档给新手入门使用,欢 ... -
【转】GCC内嵌汇编
2012-07-26 15:37 1017http://wenku.baidu.com/view/58f ... -
汇编和C相互调用
2012-06-06 11:28 1037这里有几个原则: 1.调用者需要在调用前声明被调用者。 c的做 ... -
commons-io引起的ygc问题
2012-05-18 16:49 1352今天接到任务,图片上传服务器的性能有问题,高峰期间YGC频率在 ... -
mongodb客户端错误集合
2011-12-20 10:38 13567错误一: 调用代码: String map = "f ... -
jmeter java请求参数配置
2011-12-20 10:36 2181<JavaSampler guiclass=&quo ... -
openfire简介
2011-09-23 15:07 26952详细文章请下载附件。。。。。。 Openfire简介 ... -
计算机缓存漫谈
2011-06-27 16:36 1031见附件 见附件 见附件 -
va_list和vsnprintf
2011-06-22 15:40 1251http://blog.sina.com.cn/s/blog_ ... -
memcached源代码分析
2011-06-17 11:12 4945目录 一. 概述... 3 二 ... -
[转]关于SASL的介绍文档
2011-05-20 11:11 2http://docs. ... -
jetty服务器性能调整过程分析
2011-05-13 10:27 2659见附件 见附件 见附件 -
【转】“INT 21H”指令说明及使用方法
2011-04-07 14:29 2397很多初学汇编语言的同学可能会对INT 21H这条指令感到困 ... -
hbase-0.20.6数据写入服务端代码性能瓶颈分析
2011-03-29 16:33 1783目前我的实际配置是4台8核CPU,装4个regionServe ... -
再见c3p0
2011-03-28 16:24 1190c3p0已经很久不维护了,以后java数据库连接池的代码打算都 ... -
无侵入,系统性能监测程序,配置简单,欢迎下载
2011-03-21 09:51 2966本外挂主要目的是对系 ... -
keepalive的来龙去脉
2011-03-02 09:35 4791今天有同事反应在性能测试环境cpu load很高有500多,我 ... -
深入浅出IO程序设计—序
2011-02-24 10:31 1345作为一个程序员,除了 ... -
hbase&hadoop初探
2011-02-17 10:44 965见附件。。。 -
服务端到手机端的推送方式
2011-02-11 11:13 13151.无连接的方式 前提条件服务端知道客户端的IP地址,并且客户 ...
相关推荐
本文当是一个基于HBase的海量数据的实实时查询系统的原理分析。详细的介绍了大数据查询的原理。
从HBase的集群搭建、HBaseshell操作、java编程、架构、原理、涉及的数据结构,并且结合陌陌海量消息存储案例来讲解实战HBase 课程亮点 1,知识体系完备,从小白到大神各阶段读者均能学有所获。 2,生动形象,化繁为...
elasticsearch+hbase海量数据查询,支持千万数据秒回查询 一、ElasticSearch和Hbase ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch...
海量图片的存储是通过HBase实现的,HBase是一种面向列的NoSQL数据库,特别适合存储海量数据。 一、直接上传本地栅格数据将导致的问题 栅格数据的特点是每层的图片个数都为上层数量的四倍。在第20层时,仅仅第20层...
Hive 数据导入 HBase 的方法 Hive 是一个基于 Hadoop 的数据仓库工具,而 HBase 是一个基于 Hadoop 的 NoSQL 数据库。它们都是大数据处理的重要组件。在数据处理过程中,经常需要将数据从 Hive 导入到 HBase 中。...
本项目是一个基于Apache HBase的工具,旨在从CSV文件中批量导入数据到HBase数据库,并提供基本的数据操作示例。HBase是一个分布式、可扩展的大数据存储系统,适用于处理海量数据。本项目利用HBase的强大功能,实现了...
总之,Java在Hive和HBase的数据交互中起到桥梁作用,通过精心设计的数据处理流程和合理的利用HBase的Bulk Load特性,可以高效地将Hive中的大量数据导入到HBase,满足实时查询的需求。在大数据场景下,这种方案具有很...
在大数据领域,HBase是一个基于Hadoop的分布式数据库,它为海量结构化和半结构化数据提供了高可靠性、高性能的存储方案。HBase备份和数据恢复是系统运维中至关重要的一环,确保了业务连续性和数据安全性。同时,...
7. **监控和验证**:导入完成后,通过HBase的监控工具或自定义脚本检查导入结果,确保数据正确无误地导入到HBase。 在提供的"ImpDataToHbase"源代码中,我们可以看到这些步骤的具体实现。源代码可能包括了数据...
《HBase数据可视化系统构建详解》 在大数据领域,HBase作为一款分布式列式数据库,因其高并发、低...在实际应用中,可以根据需求进一步扩展功能,例如支持更复杂的查询条件、数据导出导入等,以满足不同场景的需求。
基于MapReduce和HBase的海量网络数据处理 大数据时代,网络数据的处理和分析变得越来越重要。传统的网络数据处理模式已经无法满足大数据量的需求,需要寻求更高效的网络数据计算模式和数据存储模式。基于MapReduce...
MySQL数据库同redis以及hbase高速全量,增量同步工具我的巴士实现MySQL数据库到Redis,以及HBASE的全量,以及增量同步支持通过正则表达式指定需要导出的db以及表总线程序无状态,每一行都有自己的位置点,位置点信息...
在大数据处理领域,基于Hadoop和HBase的大规模海量数据去重是一个常见的需求。Hadoop是Apache开源项目,提供了一个分布式文件系统(HDFS)和MapReduce计算框架,旨在处理和存储海量数据。HBase是建立在Hadoop之上的...
本文提出并实现了一套基于HBase的海量数据分布式序列存储方案,旨在优化存储性能,并提高系统的扩展性。 要理解本文提出的优化策略,首先需要了解HBase的基本概念和数据模型。HBase的表可以看作一个多维映射表,...
接下来,创建一个 DataFrame 代表 MySQL 表结构,然后使用 `saveAsTable` 方法将数据写入: ```scala import org.apache.spark.sql.jdbc.JdbcDialects$ val jdbcUrl = s"$url?useSSL=false&serverTimezone=UTC" ...
文章通过在HBase集群环境下使用真实GIS数据验证了上述方法,实验结果显示,提出的系统在海量数据存储和检索方面性能卓越,能够实现地理信息数据的高效存储和实时高速检索。 文章中的关键词包括大数据、HBase、栅格...
本篇文章将详细介绍如何将关系型数据库的数据导入到Hbase中,包括离线和实时两种方式。 1. 离线数据导入: 离线数据导入通常在系统低峰期进行,适用于大量数据迁移。常见的工具包括Apache Nifi、Sqoop和Hadoop ...