主要是数据从flume进去kafka,然后交给sparkstreaming处理的流程
本文依旧以单词计数例子为例
首先,flume使用1.6版本,如果是1.6以下的话,没带有官方的kafkasink,需要自己实现,自己实现也不难实现,写一个自定义的sink,在里面方法调用kafka生产者代码,把数据发送到指定的kafka的broker的topic即可。
此处使用1.6版本,直接使用kafkaSink即可
agent4.channels.ch1.type = memory
agent4.sources.avro-source1.channels = ch1
agent4.sources.avro-source1.type = avro
agent4.sources.avro-source1.bind = 0.0.0.0
agent4.sources.avro-source1.port = 41414
agent4.sinks.log-sink1.channel = ch1
agent4.sinks.log-sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent4.sinks.log-sink1.topic = test
agent4.sinks.log-sink1.brokerList = localhost:9092
agent4.sinks.log-sink1.requiredAcks = 1
agent4.sinks.log-sink1.batchSize = 20
agent4.channels = ch1
agent4.sources = avro-source1
agent4.sinks = log-sink1
然后启动flume即可
bin/flume-ng agent --conf ./conf/ -f conf/agent4 -Dflume.root.logger=DEBUG,console -n agent4
开始进行kafka的操作,这里都是单机部署,包括flume和kafka
首先启动zookeeper服务
bin/zookeeper-server-start.sh config/zookeeper.properties
然后启动Kafka
bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
然后创建一个"test"的topic,一个分区一个副本
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
可以查看一下所有主题,验证一下
bin/kafka-topics.sh --list --zookeeper localhost:2181
然后开始写spark部分的代码
首先加入kafka的maven依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.2.1</version>
</dependency>
代码如下:
package cn.han
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.flume._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka._
object MySparkkafka {
def main(args: Array[String]): Unit = {
System.setProperty("hadoop.home.dir","E:\\hadoop-2.6.0")
val sc = new SparkContext("local[2]", "Spark Streaming kafka Integration")
//创建StreamingContext,3秒一个批次
val ssc = new StreamingContext(sc, Seconds(4))
val kafkaParams: Map[String, String] = Map("group.id" -> "terran")
val readParallelism = 5
val topics = Map("test" -> 1)
val sl=StorageLevel.MEMORY_ONLY
val kafkaStream=KafkaUtils.createStream(ssc,"192.168.5.220:2181","terran", topics, sl)
val rss2=kafkaStream.flatMap(x =>{
val by=x._2
val sb=by.split(" ")
sb
})
val rdd3=rss2.map(x=>(x,1))
val rdd4=rdd3.reduceByKey(_+_)
rdd4.print()
//开始运行
ssc.start()
//计算完毕退出
ssc.awaitTermination()
sc.stop()
}
}
还需要一个简单的log4j程序,向flume写入测试数据,上一篇博客已经引入,这里就不再赘述了。最终,执行spark代码即可。
分享到:
相关推荐
在构建实时大数据处理系统时,基于Flume、Kafka、Spark Streaming和HBase的组合是一个常见的选择。这个设计和实现的项目着重展示了如何利用这些工具搭建一个完整的流处理平台,适用于实时数据采集、存储和分析。以下...
第1章 课程介绍课程介绍 第2章 初识实时流处理 第3章 分布式日志收集框架Flume 第4章 分布式发布订阅消息系统Kafka ...第11章 Spark Streaming整合Flume&Kafka;打造通用流处理基础 第12章 Spark Streaming项目实战
第1章 课程介绍 第2章 初识实时流处理 第3章 分布式日志收集框架Flume 第4章 分布式发布订阅消息系统Kafka 第5章 实战环境搭建 第6章 Spark Streaming入门 第7章 Spark Streaming核心概念与编程 第8章 Spark ...
包括SparkCore、SparkSQL、SparkStreaming、DataFrame,以及介绍Scala、SparkAPI、SparkSQL、SparkStreaming、DataFrame原理和CDH版本环境下实战操作,其中Flume和Kafka属于Apache*开源项目也放在本篇讲解。...
- **架构**:Nginx + Tomcat + Hadoop + Flume + Zookeeper + Kafka + Spark + HBase - **开发环境**:未详细说明 - **项目描述**:构建了一套用于收集和处理手机App日志的系统,支持实时数据分析与应用。 - **...
总结,这个压缩包提供的资源涵盖了大数据生态中的关键组件,包括Spark集群的搭建、开发环境的配置,以及与其他工具如Hadoop、Hive、Kafka和Flume的整合。通过学习和实践这些内容,可以深入理解大数据处理的流程和...
3. **实时项目**:利用SparkStreaming进行实时数据处理,处理用户行为数据和业务数据。SparkStreaming是Apache Spark的一个扩展,用于处理连续的数据流,支持微批处理。 4. **集群规模与硬件配置**:项目使用了12台...
在这个阶段,我们可能需要用到如Flume、Kafka等工具来收集来自各个业务端的数据,Hadoop或Spark Streaming用于实时处理这些数据,而Elasticsearch、Kibana等则用于数据的存储和可视化,以便快速洞察业务动态。...
3. **流处理与日志框架**:掌握Storm流处理,熟悉Flume和Kafka,能整合这些工具进行实时数据处理。这对于处理不断生成的大量实时数据非常有用。 4. **数据仓库与数据库**:熟练运用Hive数据仓库进行数据查询和统计...
5. **日志处理框架**:熟悉Flume和Kafka,能够整合这些工具与Storm和Spark,实现数据的实时采集和处理。 6. **Hive数据仓库**:熟练使用Hive进行日志数据的查询和统计,具备一定的数据优化经验,能够提升数据分析...
- **Spark Streaming**:实时流处理,与Flume和Kafka的集成。 4. **Storm**: - **流处理简介**:介绍Storm作为实时处理平台的角色。 - **Storm环境搭建**:单机和集群部署步骤。 - **编程模型**:Tuples、...
我们可能需要构建ETL(提取、转换、加载)流程,使用工具如Kafka、Flume或Spark Streaming来实时处理和传输这些数据。同时,日志服务如Logstash或 beats 用于收集服务器、应用和用户行为的日志信息。 **2. 用户行为...
本课题旨在设计并实现一个基于Spark的电商用户分析系统,该系统将整合CDH大数据集群、Flume监控日志、Kafka数据传输、HDFS存储、Spark Streaming实时处理等多种技术,旨在对电商用户的访问行为、购物行为、点击行为...
- **实时分析**:基于Spark Streaming,实现数据的实时批量处理和分析。 - **离线分析**:利用Spark SQL进行定时数据分析。 9. **ArcGIS二次开发**:利用ArcGIS提供的开发工具和技术,实现地理信息系统(GIS)的...
总体设计可能涵盖了云基础设施的选择,如AWS、Azure或Google Cloud,以及数据采集工具的部署,如Flume、Kafka等。核心技术及功能将详细介绍如何实现这些需求,例如使用Spark Streaming进行实时数据处理,HDFS用于...
6. **实时计算系统**:面试者应熟悉实时处理技术,如Apache Flink或Spark Streaming,这些工具用于处理高速流入的数据流,满足低延迟需求。 7. **离线计算系统**:Hadoop MapReduce或Spark批处理框架用于处理大量...
- 配合Zookeeper进行集群管理,Kafka用于数据流处理,Flume收集日志,MongoDB和Redis存储非结构化和半结构化数据。 3. **数据处理与分析**: - 数据预处理阶段,清洗、整合爬取的原始数据,去除无效或重复信息,...