`

将hdfs文件保存到hbase中

 
阅读更多

 0.0  wlan.dat文件内容:

 

1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157995052 	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4			4	0	264	0	200
1363157991076 	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99			2	4	132	1512	200
1363154400022 	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4			4	0	240	0	200
1363157993044 	18211575961	94-71-AC-CD-E6-18:CMCC-EASY	120.196.100.99	iface.qiyi.com	视频网站	15	12	1527	2106	200
1363157995074 	84138413	5C-0E-8B-8C-E8-20:7DaysInn	120.197.40.4	122.72.52.12		20	16	4116	1432	200
1363157993055 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200
1363157995033 	15920133257	5C-0E-8B-C7-BA-20:CMCC	120.197.40.4	sug.so.360.cn	信息安全	20	20	3156	2936	200
1363157983019 	13719199419	68-A1-B7-03-07-B1:CMCC-EASY	120.196.100.82			4	0	240	0	200
1363157984041 	13660577991	5C-0E-8B-92-5C-20:CMCC-EASY	120.197.40.4	s19.cnzz.com	站点统计	24	9	6960	690	200
1363157973098 	15013685858	5C-0E-8B-C7-F7-90:CMCC	120.197.40.4	rank.ie.sogou.com	搜索引擎	28	27	3659	3538	200
1363157986029 	15989002119	E8-99-C4-4E-93-E0:CMCC-EASY	120.196.100.99	www.umeng.com	站点统计	3	3	1938	180	200
1363157992093 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			15	9	918	4938	200
1363157986041 	13480253104	5C-0E-8B-C7-FC-80:CMCC-EASY	120.197.40.4			3	3	180	180	200
1363157984040 	13602846565	5C-0E-8B-8B-B6-00:CMCC	120.197.40.4	2052.flash2-http.qq.com	综合门户	15	12	1938	2910	200
1363157995093 	13922314466	00-FD-07-A2-EC-BA:CMCC	120.196.100.82	img.qfc.cn		12	12	3008	3720	200
1363157982040 	13502468823	5C-0A-5B-6A-0B-D4:CMCC-EASY	120.196.100.99	y0.ifengimg.com	综合门户	57	102	7335	110349	200
1363157986072 	18320173382	84-25-DB-4F-10-1A:CMCC-EASY	120.196.100.99	input.shouji.sogou.com	搜索引擎	21	18	9531	2412	200
1363157990043 	13925057413	00-1F-64-E1-E6-9A:CMCC	120.196.100.55	t3.baidu.com	搜索引擎	69	63	11058	48243	200
1363157988072 	13760778710	00-FD-07-A4-7B-08:CMCC	120.196.100.82			2	2	120	120	200
1363157985079 	13823070001	20-7C-8F-70-68-1F:CMCC	120.196.100.99			6	3	360	180	200
1363157985069 	13600217502	00-1F-64-E2-E8-B1:CMCC	120.196.100.55			18	138	1080	186852	200

 

 

 

0 将上网日志数据传到hdfs中

    hdfs dfs -put /opt/wlan.dat  /zmdata/

 

1.在HBase中创建表wlan_log
   create 'wlan' ,'cf'

 

2.确定行键是什么
  手机号码:时间戳      

如果仅仅用手机号作为行健,那么同一手机号作为行健下,hbase的数据会被覆盖掉, 上述文件中,手机号

13560439658出现了两次重复,这里组成主键的时间戳 应该用 yyyyMMddHHmmssSSSS 最好带上毫秒,

否则依旧会出现最后hbase输出主键重复下只输出21条的结果,测试过,出现过这种情况(hbase主键相同下 数据会被覆盖)

这是使用SSSS后的主键:

13560439658:201602280032110142

13560439658:201602280032110143

 

3.代码

 

注意: 

1 mapper, reducer类都需要 static ,否则执行时候会报找不到对应类错误

2 FileInputFormat,TextInputFormat要引用org.apache.hadoop.mapreduce.lib.input下

3 因为写出数据到hbase 因此fileoutputformat不需要了

 

package hbase;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HbaseMRImport {
	
	
	// Mapper<后的泛型中,前两个参数表示k1 v1类型,后两个泛型参数表示k2,v2类型
	static class HbaseMRMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
		Text v2 = new Text();
		@Override
		protected void map(LongWritable k1, Text v1,  Context context) throws IOException, InterruptedException {
			String v1str = v1.toString();
			String[] v1arr = v1str.split("\t");
			String phone = v1arr[1]; // 得到手机号
			DateFormat df = new SimpleDateFormat("yyyyMMddHHmmssSSSS");
			String dateStr = df.format(new Date());
			v2.set(phone+":"+dateStr + "\t" + v1.toString()); // 将手机号和当前时间戳作为第一个字段,然后当前行其余字段作为剩余字段,重新写出到reduce阶段
			context.write(k1, v2);
		}
	}
	
	static class HbaseMRReduce extends TableReducer<LongWritable, Text, NullWritable> {
		String family = "cf";
		@Override
		protected void reduce(LongWritable k2, Iterable<Text> v2s, Context context) throws IOException, InterruptedException {
			 for(Text v2 : v2s) {
				 String v2Str = v2.toString();
				 String[] splited = v2Str.split("\t");
				 String rowkey = splited[0];
				 
				Put put = new Put(rowkey.getBytes());
				put.add(family.getBytes(), "raw".getBytes(), v2.toString().getBytes()); // 将正行都保存起来,下面是将每个字段单独保存 方便灵活获取不同需求下数据
				put.add(family.getBytes(), "rePortTime".getBytes(), splited[1].getBytes());
				put.add(family.getBytes(), "msisdn".getBytes(), splited[2].getBytes());
				put.add(family.getBytes(), "apmac".getBytes(), splited[3].getBytes());
				put.add(family.getBytes(), "acmac".getBytes(), splited[4].getBytes());
				put.add(family.getBytes(), "host".getBytes(), splited[5].getBytes());
				put.add(family.getBytes(), "siteType".getBytes(), splited[6].getBytes());
				put.add(family.getBytes(), "upPackNum".getBytes(), splited[7].getBytes());
				put.add(family.getBytes(), "downPackNum".getBytes(), splited[8].getBytes());
				put.add(family.getBytes(), "upPayLoad".getBytes(), splited[9].getBytes());
				put.add(family.getBytes(), "downPayLoad".getBytes(), splited[10].getBytes());
				put.add(family.getBytes(), "httpStatus".getBytes(), splited[11].getBytes());
				
				context.write(NullWritable.get(), put);
			 }
		}
	}

	/**
	 * @throws Exception 
	 * 
	 */
	public static void main(String[] args) throws Exception {
		
		// 0 初始化conf文件
		Configuration conf = new Configuration();
		conf.set("hbase.zookeeper.quorum", "hadoop3"); // 这里hbase是单节点,没有使用外部zk 这行和下面这行不加入测试依旧可以
		conf.set("hbase.rootdir", "hdfs://hadoop3:9000/hbase"); 
		conf.set(TableOutputFormat.OUTPUT_TABLE, "wlan");
		// 0.1 定义任务
		Job job = new Job(conf,HbaseMRImport.class.getSimpleName());
		TableMapReduceUtil.addDependencyJars(job);
		job.setJarByClass(HbaseMRImport.class);
		
		// 1 设置自定义的mapper  reducer类的处理逻辑
		job.setMapperClass(HbaseMRMapper.class);
	    job.setReducerClass(HbaseMRReduce.class);
	    
	    // 2 设置 mapper类的 k2 v2输出类型
	    job.setMapOutputKeyClass(LongWritable.class);
	    job.setMapOutputValueClass(Text.class);
	    
	    // 设置reduce最后输出的 k3 v3类型 这里是对表输出 没有设置
	    /*job.setOutputKeyClass(theClass);
	    job.setOutputValueClass(theClass);*/
	    
	    // 3 设置读取文件 format 和输出文件 format
	    job.setInputFormatClass(TextInputFormat.class);
	    job.setOutputFormatClass(TableOutputFormat.class);
		
	    // 4 指定Job的输入源 和 输出目标
	    FileInputFormat.setInputPaths(job, "hdfs://hadoop3:9000/zmdata/wlan.dat");
	    // FileOutputFormat.setOutputPath(job, outputDir); 如果是写出到hdfs 那么需要目标hdfs文件位置
	    
	    // 5 提交Job
	    job.waitForCompletion(true);
	}

	
	
}

 

 

 

 

 

 

4.使用ant发到远程linux服务器运行,把HBase的相关jar包放到HADOOP_CLASSPATH中
  find / -name hadoop.env.sh  查找文件
  在hadoop-env.sh文件中增加如下代码:
  export HADOOP_CLASSPATH=/usr/local/hbase-0.98.8-hadoop2/lib/*

 或者将hbase的lib都拷贝到hadoop节点某一个文件夹下 然后在引入

 可以参看 hbase java操作代码简介和NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguratio 

 

 

 

 ant脚本:

 

 

<?xml version="1.0" encoding="UTF-8"?>

<project name="项目名称" basedir="." default="sshexec">
	<description>本配置文件供ANT编译项目、自动进行单元测试、打包并部署之用。</description>
	<description>默认操作(输入命令:ant)为编译源程序并发布运行。</description>

	<!--属性设置-->
	<property environment="env" />
	<property file="build.properties" />
	<property name="src.dir" value="${basedir}/src" />
	<property name="java.lib.dir" value="${env.JAVA_HOME}/lib" />
	<property name="classes.dir" value="${basedir}/classes" />
	<property name="dist.dir" value="${basedir}/dist" />
	<property name="third.lib.dir" value="${basedir}/lib" />
	<property name="localpath.dir" value="${basedir}" />
	<property name="remote.host" value="hadoop3"/>
	<property name="remote.username" value="root"/>
	<property name="remote.password" value="123456"/>
	<property name="remote.home" value="~"/>
	<!--每次需要知道的main类,写到这里-->
	<property name="main.class" value="hbase.MyHbaseAPI"/>

	<!-- 基本编译路径设置 -->
	<path id="compile.classpath">
		<fileset dir="${java.lib.dir}">
			<include name="tools.jar" />
		</fileset>
		<fileset dir="${third.lib.dir}">
			<include name="*.jar"/>
		</fileset>		
	</path>
	
	<!-- 运行路径设置 -->
	<path id="run.classpath">
		<path refid="compile.classpath" />
		<pathelement location="${classes.dir}" />
	</path>
	<!-- 清理,删除临时目录 -->
	<target name="clean" description="清理,删除临时目录">
		<!--delete dir="${build.dir}" /-->
		<delete dir="${dist.dir}" />
		<delete dir="${classes.dir}" />
		<echo level="info">清理完毕</echo>
	</target>
	<!-- 初始化,建立目录,复制文件 -->
	<target name="init" depends="clean" description="初始化,建立目录,复制文件">
		<mkdir dir="${classes.dir}" />
		<mkdir dir="${dist.dir}" />
	</target>
	<!-- 编译源文件-->
	<target name="compile" depends="init" description="编译源文件">
		<javac srcdir="${src.dir}" destdir="${classes.dir}" source="1.7" target="1.7"  includeAntRuntime="false" debug="false" verbose="false">
		    <compilerarg line="-encoding UTF-8 "/> 
			<classpath refid="compile.classpath" />
		</javac>
	</target>
	<!-- 打包类文件 -->
	<target name="jar" depends="compile" description="打包类文件">
		<jar jarfile="${dist.dir}/jar.jar">
			<fileset dir="${classes.dir}" includes="**/*.*" />
		</jar>
	</target>
	
	<!--上传到服务器
	**需要把lib目录下的jsch-0.1.51拷贝到$ANT_HOME/lib下,如果是Eclipse下的Ant环境必须在Window->Preferences->Ant->Runtime->Classpath中加入jsch-0.1.51。
	-->
	<target name="ssh" depends="jar">
		<scp file="${dist.dir}/jar.jar" todir="${remote.username}@${remote.host}:${remote.home}" password="${remote.password}" trust="true"/>
	</target>
	
	<target name="sshexec" depends="ssh">
	      <sshexec host="${remote.host}" username="${remote.username}"  password="${remote.password}" trust="true" command="source /etc/profile;hadoop jar ${remote.home}/jar.jar ${main.class}"/>
	</target>
</project>

 

 

5   手机上网日志字段介绍:

 

  • 大小: 18.3 KB
分享到:
评论

相关推荐

    java操作Hbase之从Hbase中读取数据写入hdfs中源码

    通过结合上述两段代码,你可以实现从HBase中读取数据并写入HDFS的功能。这只是一个基本的实现,实际应用中可能需要处理更复杂的情况,例如批量读取、错误处理、数据转换等。同时,为了提高性能,你还可以考虑使用...

    HDFS_HBaseShell的常用命令

    1. 将磁盘上的文件放到HDFS上:Hadoop fs -put /local /hdfs 2. 将HDFS上的文件放到磁盘上:Hadoop dfs -get /hdfs /local 3. 列出HDFS上的目录内容:Hadoop dfs –ls /hdfsDir 4. 删除HDFS下的文档:Hadoop dfs -...

    14、HDFS 透明加密KMS

    【HDFS 透明加密KMS】是Hadoop分布式文件系统(HDFS)提供的一种安全特性,用于保护存储在HDFS中的数据,确保数据在传输和存储时的安全性。HDFS透明加密通过端到端的方式实现了数据的加密和解密,无需修改用户的应用...

    4.抽取mysql数据到hbase表中.docx

    在这个案例中,我们需要将MySQL中的数据抽取出来并加载到HBase中。 #### 二、HBase集群搭建与使用 1. **HBase集群搭建**: - 需要安装好Hadoop环境,并且确保Hadoop集群可以正常运行。 - 下载并配置HBase,包括...

    HbaseTemplate 操作hbase

    HBase是建立在Hadoop文件系统(HDFS)之上,为处理大规模数据提供了一个高效的数据存储解决方案。而Spring Data Hadoop是Spring框架的一部分,它提供了与Hadoop生态系统集成的工具,包括对HBase的操作支持。本篇文章...

    hbase_libJar包

    本文将深入探讨`hbase_libJar包`的组成、作用以及如何在Linux系统中正确配置环境变量`HBASE_HOME`,以确保HBase服务的稳定运行。 首先,我们来了解`hbase_libJar包`的构成。这个压缩包通常包含了大量的JAR文件,...

    Hbase 官方中文文档

    此外,文档还提供了将内容保存为PDF或本地文件的方式,方便用户在离线状态下阅读。 最后,文档中提到了一些关于Hadoop生态系统中的其他组件,比如Hadoop、MapReduce、HDFS和Zookeeper。了解这些组件的工作原理对于...

    【HBase企业应用开发】工作中自己总结的Hbase文档,非常全面!

    HBase是一种分布式的、面向列的数据库管理系统,它利用Hadoop HDFS作为其文件存储系统,使用Hadoop MapReduce来处理HBase中的海量数据,并且使用Zookeeper作为其协同服务。HBase以表的形式存储数据,表由行和列组成...

    hbase安装包

    3. 行键有序:HBase中的行是根据行键(Row Key)排序的,这有利于范围查询和扫描操作。 4. 实时读写:HBase提供了毫秒级的读写速度,对于大数据实时处理非常有用。 5. 高可用性:通过HMaster和RegionServer的架构...

    HBase实现批量存取

    此外,HBase还提供了批量操作的工具,如HBase的`BulkLoad`功能,它可以将预先格式化的数据文件直接加载到HFile中,进一步提升写入速度。这个过程通常包括数据预处理、生成SequenceFile、上传到HDFS以及执行`...

    hadoop hbase从入门到精通

    HDFS是分布式文件系统,它将大文件分割成块并存储在集群的不同节点上,确保高可用性和容错性。MapReduce则是一种编程模型,用于大规模数据集的并行处理,它将复杂任务分解为“映射”和“归约”阶段,使得数据处理...

    HBase详细讲解

    它采用了Google的Bigtable论文中提到的数据模型,并且与Hadoop的HDFS文件系统紧密集成,适合存储大量非结构化数据。 HBase的历史可以追溯到2006年,当时作为一个项目发起,后于2010年升级成为Apache的顶级项目。...

    在Ubuntu安装配置hbase

    将下载的HBase 2.2.2压缩包复制到虚拟机中,通常使用SSH或共享文件夹功能。解压缩文件,将其放在一个合适的目录下,比如 `/usr/local`。记住这个路径,因为后续配置会用到。 3. **设置环境变量**: 打开终端,...

    05_将数据导入HDFS.docx

    Sqoop使用JDBC(Java数据库连接)连接到RDBMS,并自动生成一个Java类将数据导入HDFS。Sqoop功能包括导入单个表或数据库中的所有表、指定导入哪些行、列、提供任意的SELECT语句等。 Hadoop提供的REST接口 Hadoop...

    安装HBase,并启动运行

    例如,你可以通过FTP或SCP等方式将`hbase-1.2.6-bin.tar.gz`文件上传到`hadoop0`节点。 一旦安装包上传成功,下一步是解压缩。使用`tar`命令来完成这个任务,如`tar -zxvf hbase-1.2.6-bin.tar.gz`,这会将安装包解...

    hadoop-weather-analysis:该项目将下载世界上大多数国家的天气历史数据,并将数据存储到HDFS中。 将数据放入HDFS后,映射器和化简器作业将针对该数据运行,并将分析结果保存到HBase。 该代码是使用Java和Hbase作为NoSQL数据库在Hadoop 2.8上开发和执行的

    将数据放入HDFS后,映射器和化简器作业将针对该数据运行,并将分析结果保存到HBase。 该代码是使用Java和Hbase作为NoSQL数据库在Hadoop 2.8上开发和执行的。 这是运行该应用程序的步骤 ###### 1。 运行shell脚本和...

    Hbase分布式数据库 v1.7.2.zip

    HBase,全称为Apache HBase,是构建在Hadoop文件系统(HDFS)之上的开源、非关系型、分布式数据库。它属于NoSQL数据库的一种,特别适合于处理海量数据,尤其适用于实时读写操作。HBase v1.7.2作为该系统的最新稳定...

    eclipse链接hbase所需jar包,hbase版本1.2.6,Hadoop版本2.7.1

    在开发HBase应用程序时,Eclipse作为Java IDE是常见的选择,因为它提供了强大的代码编辑、调试和项目管理...通过正确配置和使用这些jar包,你将能够在Eclipse中顺利地与HBase进行交互,进行数据读写、表管理等操作。

    HBase大数据.zip

    HBase,作为Apache软件基金会的一个顶级项目,是构建在Hadoop文件系统(HDFS)之上的非关系型数据库,专门设计用于处理大规模数据集。它的主要特点是列式存储、分布式、可扩展性以及实时读写能力,使其成为大数据...

    Hbase MOB 样例

    当用户尝试写入一个小文件到 HBase 表时,如果该文件大小小于 MOB 阈值,HBase 会将其转换为一个 MOB 值,并存储在 HBase 的特殊区域中,同时在原表中保存一个指向 MOB 值的引用。** **在“archive-master”这个...

Global site tag (gtag.js) - Google Analytics