`
liyonghui160com
  • 浏览: 774753 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Hive读取Flume正在写入的HDFS

阅读更多

 

Hive的表创建为外部分区表,例如:

 

USE mydb;

CREATE EXTERNAL TABLE mytable

  c1 String,

  c2 INT,

  c3 INT,

  create_time String

)

PARTITIONED BY (dt STRING)

ROW FORMAT DELIMITED FIELDS TERMINATED BY '|||';

 

 

然后创建分区,如:

 

ALTER TABLE mytable ADD PARTITION (dt = '2013-09-25') LOCATION '/data/mytable/2013-09-25/';

ALTER TABLE mytable ADD PARTITION (dt = '2013-09-26') LOCATION '/data/mytable/2013-09-26/';

ALTER TABLE mytable ADD PARTITION (dt = '2013-09-27') LOCATION '/data/mytable/2013-09-27/';

 

即Hive的表按天进行分区。指定到相应目录。

 

删除分区

 

alter table mytable drop partition (dt='2012-03-06')

 

注意hdfs的起始路径,从hdfs的根目录开始,不然会加载不到数据。

 

分区可以写成脚本每天自动执行。处理昨天的数据

 

yesterday=$(date -d '-1 day' '+%Y-%m-%d') 

$HIVE_HOME/bin/hive -e "use mydb;ALTER TABLE mytable ADD PARTITION (dt = '$yesterday') LOCATION '/user/hive/warehouse/tail/$yesterday/';"

 

 

而Flume中配置将数据保存到HDFS中,即HDFS sink。计划每天一个文件,进行日切。如2013-09-25对应的文件就保存在:

 

hdfs://<hive.metastore.warehouse.dir>/data/mytable/2013-09-25/FlumeData.xxx

 

这样,只要文件生成,就能直接通过操作Hive的mytable表来对文件进行统计了。

 

业务上要求统计工作是按照小时进行,考虑到按照小时进行分区过于细化,而且会导致过多的文件给NameNode造成内存压力,所以如上Hive层面按天进行划分。

 

统计执行时首先指定天分区,然后根据create_time(mm:hh:ss)指定统计时间段,如:

 

SELECT c1,

            SUM(c2),

            SUM(c3)

FROM mytable

WHERE dt = ’2013-09-25′

      AND create_time BETWEEN ’22:00:00′ AND ’22:59:59′

GROUP BY c1;

 

 

在实践的过程中遇到如下两个问题:

 

1.对于正在写入的文件,通过hadoop fs -ls 命令查看,其大小始终是0,即使通过hadoop fs -cat可以看到实际已经有内容存在!通过hive处理的话也看不到其中的数据。

 

2.Flume正在写入的文件,默认会有.tmp后缀。如果Hive在执行过程中,Flume切换文件,即将xxx.tmp重命名为xxx,这时Hive会报错如file not found xxx.tmp。

 

 

针对问题1

 

首先了解HDFS的特点:

 

HDFS中所有文件都是由块BLOCK组成,默认块大小为64MB。在我们的测试中由于数据量小,始终在写入文件的第一个BLOCK。而HDFS与一般的POSIX要求的文件系统不太一样,其文件数据的可见性是这样的:

 

    如果创建了文件,这个文件可以立即可见;

    写入文件的数据则不被保证可见了,哪怕是执行了刷新操作(flush/sync)。只有数据量大于1个BLOCK时,第一个BLOCK的数据才会被看到,后续的BLOCK也同样的特性。正在写入的BLOCK始终不会被其他用户看到!

    HDFS中的sync()保证数据持久化到了datanode上,然后可以被其他用户看到。

 

针对HDFS的特点,可以解释问题1中的现象,正在写入无法查看。但是使用Hive统计时Flume还在写入那个BLOCK(数据量小的时候),那岂不是统计不到信息?

 

解决方案:

 

每天再按小时切分文件——这样虽然每天文件较多,但是能够保证统计时数据可见!Flume上的配置项为hdfs.rollInterval。

 

如果文件数多,那么还可以考虑对以前的每天的小时文件合并为每天一个文件!

 

 

针对问题2

 

原因比较明显,Hive处理前获取了对应分区下的所有文件信息,其中包含xxx.tmp文件,而传递给MapReduce处理时,由于Flume进行了切换,导致原来的xxx.tmp变成了xxx,新的.tmp名称又变成了yyy.tmp,这样自然找不到xxx.tmp了。

 

解决方案:

 

解决这个问题想法之一是想控制Hive的处理时机,但是显然不是那么好控制。

 

进一步了解到HDFS的Java API读取HDFS文件时,会忽略以”.”和”_”开头的文件!类似于Linux中默认.xx是隐藏的一样,应用程序读取HDFS文件时默认也不读取.xxx和_xxx这样名称的文件!

 

这样就产生了针对问题2的处理方案一)配置Flume,针对正在写入的文件,以.号开头。涉及Flume配置项hdfs.inUsePrefix。

 

 

# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'a
#agent section  
producer.sources = s 
producer.channels = c
producer.sinks = r

#producer.sources.s.type = seq
producer.sources.s.channels = c
producer.sources.s.type = exec
producer.sources.s.command=tail -n 0 -F /usr/local/nginx/nginxlog/access.log
producer.sources.s.deletePolicy=never
#producer.sources.s.type = avro
#producer.sources.s.bind = localhost
#producer.sources.s.port = 10000
# Each sink's type must be defined(给谁了)
#producer.sinks.r.type = avro
#producer.sinks.r.hostname = 10.1.1.100
#producer.sinks.r.port = 20000
producer.sources.source1.interceptors = i1
producer.sources.source1.interceptors.i1.type = timestamp

producer.sinks.r.type = hdfs
producer.sinks.r.hdfs.path = hdfs://localhost:8010/user/hive/warehouse/tail/%Y-%m-%d
producer.sinks.r.hdfs.inUsePrefix = .
producer.sinks.r.hdfs.maxOpenFiles = 5000
producer.sinks.r.hdfs.batchSize= 1000
producer.sinks.r.hdfs.fileType = DataStream
producer.sinks.r.hdfs.writeFormat =Text
producer.sinks.r.hdfs.rollSize = 128000000
producer.sinks.r.hdfs.rollCount = 0
producer.sinks.r.hdfs.rollInterval = 3600
producer.sinks.r.hdfs.useLocalTimeStamp = true
producer.sinks.r.request.required.acks=1
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
#Specify the channel the sink should use
producer.sinks.r.channel = c
# Each channel's type is defined.
producer.channels.c.type = memory  
producer.channels.c.capacity = 1000000
producer.channels.c.transactionCapacity = 1000
#producer.channels.c.type=file
#producer.channels.c.checkpointDir=/usr/local/flumeng/checkpointdir/tcpdir/example_agent
#producer.channels.c.dataDirs=/usr/local/flumeng/datadirs/tddirs/example_agen

 

 

分享到:
评论

相关推荐

    利用Flume将MySQL表数据准实时抽取到HDFS、MySQL、Kafka

    Flume可以将MySQL的数据写入HDFS,以便进行后续的批处理分析或者作为其他Hadoop服务(如Hive、Pig等)的数据源。 4. **Kafka**: Kafka是一个高吞吐量的分布式消息系统,通常用作实时数据管道,将数据从一个位置传输...

    kafka+flume 实时采集oracle数据到hive中.docx

    二、Flume写入到HDFS Flume是一个分布式、可靠、高吞吐量的日志收集系统,能够实时地从Kafka中提取数据,并将其写入到HDFS中。为了实现这一点,需要先安装Flume,版本号为flume-1.9.0-bin.tar.gz。然后,需要配置...

    大数据采集技术-Flume监控日志到HDFS.pptx

    - **Sink配置**:设置HDFS Sink,将数据写入HDFS。需要指定HDFS路径和相关参数。 4. **启动Flume**:使用`bin/flume-ng agent`命令启动Flume客户端,指定配置文件,如`--conf-file job/flume-file-hdfs.conf`。 5...

    最好的大数据项目。用flume-kafaka-flume进行日志的读取,在hive设计数仓.zip

    3. Hive 数仓设计:Hadoop Hive 是一种基于 HDFS 的数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供SQL查询功能,适合做离线批处理分析。在本项目中,Hive 被用于设计和构建数据仓库,对从 Kafka ...

    flume-ng-1.6.0-cdh5.5.0.tar.gz

    例如,一个简单的配置可能包括一个从日志目录读取数据的 FileSource,一个内存通道以快速传输数据,以及一个将数据写入 HDFS 的 HDFS Sink。 在 CDH 5.5.0 中,Flume 可能已经集成了与其它 CDH 组件(如 HBase、...

    05_将数据导入HDFS.docx

    WebHDFS是HDFS的REST接口,提供了对HDFS的基本操作,如读取、写入、删除等。HttpFS是HDFS的另一个REST接口,提供了对HDFS的高级操作,如文件上传、下载、目录管理等。 导入数据的最佳实践 在导入数据时,有一些...

    flume1.8文档中文完整翻译版

    3. 接收器(Sinks):Flume的接收器有多种选择,如HDFSSink用于写入Hadoop分布式文件系统,HBaseSink用于写入HBase数据库,LoggerSink用于将数据记录到日志,还有KafkaSink用于将数据发送到Kafka主题。 三、Flume...

    Flume总结 全.pdf

    配置时,可以使用File Source来读取文件系统的文件,并配置适当的Sink将数据写入HDFS。 Flume 的灵活性和可扩展性使其成为大数据环境中数据流处理的首选工具。通过组合不同的Source、Channel和Sink,以及配置适当的...

    apache-flume-1.5.0-cdh5.3.6-bin.zip

    例如,你可以设置一个 Flume 代理来从一个日志文件源读取数据,并将数据写入 HDFS。 Flume 的核心概念包括: - **Source**:数据流入 Flume 的入口,可以是日志文件、网络套接字、JMS 消息等。 - **Channel**:...

    基于Java的日志服务器 Apache Flume.zip

    此外,Flume 还可以与其他大数据工具如Hive、Spark等结合,构建完整的数据处理流水线。 综上所述,Apache Flume 是一个强大的Java日志服务器,通过其灵活的架构和丰富的组件,为企业提供了一种高效、可靠的日志管理...

    大数据与云计算教程课件 优质大数据课程 08.HDFS文件接口(共41页).pptx

    本课程涵盖了Hadoop的多个关键组件,如MapReduce、YARN、Hive、HBase、Pig、Zookeeper、Sqoop、Flume、Kafka和Storm等,这些都是大数据处理和云计算中的重要工具和技术。通过学习这些课程,你可以全面了解大数据生态...

    大数据采集技术-Flume基本结构.pptx

    3. Sink:Sink是数据的出口,负责将Channel中的数据发送到目标位置,如HDFS Sink将数据写入Hadoop分布式文件系统,Hive Sink可以直接将数据插入到Hive表或分区,Avro Sink则将Flume事件转换成Avro格式并发送到指定的...

    基于大数据的电商数仓数据分析.doc

    - 第二层Flume从Kafka中读取消息,写入HDFS进行持久化存储。 - 在Hive中创建表结构,并导入HDFS上的数据。 4. **数据分析与展示**: - 利用Hive SQL对数据进行复杂查询和统计分析。 - 使用Presto进行结果展示,...

    大数据 HCIA-Big Data H13-711考题.docx

    在 Flume 传输数据的过程中,Sink 取走数据并写入目的地后,会将 events 从 channel 中删除。 六、FusionInsight Manager 界面显示 Hive 服务状态 FusionInsight Manager 界面显示 Hive 服务状态为 Bad 时,可能的...

    基于spark+flume+kafka+hbase的实时日志处理分析系统.zip

    两种东西,其一是IBM微软数据产品为代表的,其二是Hadoop+Hive+Apache Hive数据仓库软件有助于使用SQL读取,写入和管理驻留在分布式存储中的大型数据集。 可以将结构投影到已经存储的数据上。 提供了命令行工具和...

    基于Flume&spark&Flask的分布式实时日志分析与入侵检测系统.zip

    两种东西,其一是IBM微软数据产品为代表的,其二是Hadoop+Hive+Apache Hive数据仓库软件有助于使用SQL读取,写入和管理驻留在分布式存储中的大型数据集。 可以将结构投影到已经存储的数据上。 提供了命令行工具和...

    hadoop集群搭建以及大数据平台搭建

    配置Flume时,你需要定义Source、Channel和Sink,例如,可以创建一个从Web服务器读取日志的Source,将数据暂存在内存或磁盘的Channel,然后将数据写入HDFS的Sink。 Hive是基于Hadoop的数据仓库工具,可以将结构化的...

    大数据框架整理.pdf

    sinks负责数据输出,如写入HDFS或Kafka。Flume支持自定义拦截器和多级连接,实现数据流的定制处理和高可用性。 最后,Apache Storm是一个实时计算框架,用于处理持续的数据流。Spout从外部系统(如Kafka)获取数据...

    华为大数据认证HCIP-Big Data Developer H13-723大数据题库

    FileSystem类用于操作文件系统,比如读取和写入HDFS上的文件。Context类用于执行MapReduce作业的上下文。因此,答案为A.Job。 3. FusionInsightHD系统中Oozie作业提交:Oozie是一个用于管理和调度Hadoop作业的工作...

    基于Kafka Spark的数据处理系统.pptx

    Spark Streaming与Kafka集成,从Kafka中读取数据并写入HDFS。进一步的处理可能涉及到HBase等NoSQL数据库,用于快速查询和存储。例如,Spark Streaming可以将数据分批写入HBase,建立索引以便快速访问。此外,Spark的...

Global site tag (gtag.js) - Google Analytics