存在Hadoop集群上的文件,大部分都会经过压缩,如果是压缩后的文件,我们直接在应用程序中如何读取里面的数据?答案是肯定的,但是比普通的文本读取要稍微复杂一点,需要使用到Hadoop的压缩工具类支持,比如处理gz,snappy,lzo,bz压缩的,前提是首先我们的Hadoop集群得支持上面提到的各种压缩文件。
本次就给出一个读取gz压缩文件的例子核心代码:
def readHdfsWriteKafkaByDate(fs:FileSystem,date:String,conf:Configuration,topic:String,finishTimeStamp:Long):Unit={
//访问hdfs文件,只读取gz结尾的压缩文件,如果是.tmp结尾的不会读取
val path=new Path("/collect_data/userlog/"+date+"/log*.gz")
//实例化压缩工厂编码类
val factory = new CompressionCodecFactory(conf)
//读取通配路径
val items=fs.globStatus(path)
var count=0
//遍历每一个路径文件
items.foreach(f=>{
//打印全路径
println(f.getPath)
//通过全路径获取其编码
val codec = factory.getCodec(f.getPath())//获取编码
//读取成数据流
var stream:InputStream = null;
if(codec!=null){
//如果编码识别直接从编码创建输入流
stream = codec.createInputStream(fs.open(f.getPath()))
}else{
//如果不识别则直接打开
stream = fs.open(f.getPath())
}
val writer=new StringWriter()
//将字节流转成字符串流
IOUtils.copy(stream,writer,"UTF-8")
//得到字符串内容
val raw=writer.toString
//根据字符串内容split出所有的行数据,至此解压数据完毕
val raw_array=raw.split("\n")
//遍历数据
raw_array.foreach(line=>{
val array = line.split("--",2) //拆分数组
val map = JSON.parseObject(array(1)).asScala
val userId = map.get("userId").getOrElse("").asInstanceOf[String] //为空为非法数据
val time = map.get("time").getOrElse("") //为空为非法数据
if(StringUtils.isNotEmpty(userId)&&(time+"").toLong<=finishTimeStamp){//只有数据
pushToKafka(topic,userId,line)
count=count+1
}
})
})
}
压缩和解压模块用的工具包是apache-commons下面的类:
import org.apache.commons.io.IOUtils
import org.apache.commons.lang.StringUtils
如果想在Windows上调试,可以直接设置HDFS的地址即可
- val conf = new Configuration()//获取hadoop的conf
// conf.set("fs.defaultFS","hdfs://192.168.10.14:8020/")//windows上调试用
至此数据已经解压并读取完毕,其实并不是很复杂,用java代码和上面的代码也差不多类似,如果直接用原生的api读取会稍微复杂,但如果我们使用Hive,Spark框架的时候,框架内部会自动帮我们完成压缩文件的读取或者写入,对用户透明,当然底层也是封装了不同压缩格式的读取和写入代码,这样以来使用者将会方便许多。
参考文章:
https://blog.matthewrathbone.com/2013/12/28/reading-data-from-hdfs-even-if-it-is-compressed
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
技术债不能欠,健康债更不能欠, 求道之路,与君同行。
分享到:
相关推荐
Hadoop的设计目标是在由廉价商用硬件组成的大型集群上提供可靠的数据处理服务。HDFS提供了一种高效、可靠的方式来存储和管理海量数据,而MapReduce则是一种用于处理和生成大数据集的编程模型。 **MapReduce的运行...
本文详细介绍了如何在 Ubuntu 系统上配置 Hadoop 和 Apache Spark 环境。首先需要确保系统满足硬件和软件需求,然后按照顺序安装 Java、Hadoop 和 Spark,并正确配置它们的环境变量和属性文件。最后,通过启动 Spark...
3. **bin目录**:在解压后的"apache-zookeeper-3.8.0-bin.tar.gz"中,bin目录包含可执行文件,如`zkServer.sh`(启动ZooKeeper服务器)和`zkCli.sh`(ZooKeeper客户端命令行工具),方便用户管理和操作ZooKeeper集群...
2. **配置Hadoop**:需要在每台服务器上配置`core-site.xml`、`hdfs-site.xml`、`yarn-site.xml`等关键配置文件,以便能够正确地读取Hadoop配置。 #### 搭建Spark集群步骤 接下来,我们将详细介绍如何使用三台...
- **创建RDD**:在Spark中创建一个RDD(弹性分布式数据集),用于读取HDFS上的文本文件。 ```scala val lines = sc.textFile("hdfs://localhost:9000/network.txt") ``` - **数据处理**: - 使用`flatMap`函数...
- 使用 `textFile()` 方法读取文本文件或压缩文件。 - 支持通配符读取多个文件,如 `textFile("file:///dfs/data/*.txt")`。 #### 四、Spark 的数据处理流程 - **创建 RDD**: 通过并行化集合或读取 Hadoop 数据...
- 内存缓存:Tachyon3-3.0.0将数据存储在内存中,允许程序快速读取,减少了磁盘I/O操作,极大地提高了数据处理速度。 - 跨平台支持:该库可无缝集成到Hadoop、Spark等大数据处理框架中,同时支持多种操作系统,...
- 确保Spark配置正确,并能够连接到Hadoop集群或独立运行。 #### 三、基本操作 - **HDFS基本操作**: - 启动HDFS服务: ```bash $HADOOP_HOME/sbin/start-dfs.sh ``` - 创建目录: ```bash hdfs dfs -...
支持多种文件格式和压缩类型,如`.gz`、`.txt`等。 ##### 6. 数据读取策略 - **单个文件**: 直接读取特定路径下的文件。 - **文件夹**: 可以读取整个文件夹下的所有文件。 - **通配符**: 支持使用通配符读取多个...