`
字母哥
  • 浏览: 70306 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

flume+kafka+sparkstreaming搭建整合

阅读更多
主要是数据从flume进去kafka,然后交给sparkstreaming处理的流程
本文依旧以单词计数例子为例
首先,flume使用1.6版本,如果是1.6以下的话,没带有官方的kafkasink,需要自己实现,自己实现也不难实现,写一个自定义的sink,在里面方法调用kafka生产者代码,把数据发送到指定的kafka的broker的topic即可。
此处使用1.6版本,直接使用kafkaSink即可
agent4.channels.ch1.type = memory

agent4.sources.avro-source1.channels = ch1
agent4.sources.avro-source1.type = avro
agent4.sources.avro-source1.bind = 0.0.0.0
agent4.sources.avro-source1.port = 41414
 
agent4.sinks.log-sink1.channel = ch1
agent4.sinks.log-sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent4.sinks.log-sink1.topic = test
agent4.sinks.log-sink1.brokerList = localhost:9092
agent4.sinks.log-sink1.requiredAcks = 1
agent4.sinks.log-sink1.batchSize = 20
 
agent4.channels = ch1
agent4.sources = avro-source1
agent4.sinks = log-sink1

然后启动flume即可
 bin/flume-ng agent --conf ./conf/ -f conf/agent4 -Dflume.root.logger=DEBUG,console -n agent4

开始进行kafka的操作,这里都是单机部署,包括flume和kafka
首先启动zookeeper服务
bin/zookeeper-server-start.sh config/zookeeper.properties

然后启动Kafka
bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

然后创建一个"test"的topic,一个分区一个副本
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

可以查看一下所有主题,验证一下
bin/kafka-topics.sh --list --zookeeper localhost:2181


然后开始写spark部分的代码
首先加入kafka的maven依赖
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming-kafka_2.10</artifactId>
			<version>1.2.1</version>
		</dependency>

代码如下:
package cn.han

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.flume._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka._

object MySparkkafka {
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir","E:\\hadoop-2.6.0")
    val sc = new SparkContext("local[2]", "Spark Streaming kafka Integration")  
    //创建StreamingContext,3秒一个批次  
    val ssc = new StreamingContext(sc, Seconds(4))  
    val kafkaParams: Map[String, String] = Map("group.id" -> "terran")
    val readParallelism = 5
    val topics = Map("test" -> 1)
    val sl=StorageLevel.MEMORY_ONLY
    val kafkaStream=KafkaUtils.createStream(ssc,"192.168.5.220:2181","terran", topics, sl)
   
    
    val rss2=kafkaStream.flatMap(x =>{
      val by=x._2
      val sb=by.split(" ")
      sb
    })
    
    val rdd3=rss2.map(x=>(x,1))
    val rdd4=rdd3.reduceByKey(_+_)
    rdd4.print()
    //开始运行  
    ssc.start()  
    //计算完毕退出  
    ssc.awaitTermination()  
    sc.stop()  
  }
}


还需要一个简单的log4j程序,向flume写入测试数据,上一篇博客已经引入,这里就不再赘述了。最终,执行spark代码即可。
0
2
分享到:
评论

相关推荐

    基于flume+kafka_spark streaming+hbase的流式处理系统设计与实现.zip

    在构建实时大数据处理系统时,基于Flume、Kafka、Spark Streaming和HBase的组合是一个常见的选择。这个设计和实现的项目着重展示了如何利用这些工具搭建一个完整的流处理平台,适用于实时数据采集、存储和分析。以下...

    实时处理.rar

    第1章 课程介绍课程介绍 第2章 初识实时流处理 第3章 分布式日志收集框架Flume 第4章 分布式发布订阅消息系统Kafka ...第11章 Spark Streaming整合Flume&Kafka;打造通用流处理基础 第12章 Spark Streaming项目实战

    Spark Streaming实时流处理项目实战视频网盘下载

    第1章 课程介绍 第2章 初识实时流处理 第3章 分布式日志收集框架Flume 第4章 分布式发布订阅消息系统Kafka 第5章 实战环境搭建 第6章 Spark Streaming入门 第7章 Spark Streaming核心概念与编程 第8章 Spark ...

    Hadoop+Spark生态系统操作与实战指南.epub

    包括SparkCore、SparkSQL、SparkStreaming、DataFrame,以及介绍Scala、SparkAPI、SparkSQL、SparkStreaming、DataFrame原理和CDH版本环境下实战操作,其中Flume和Kafka属于Apache*开源项目也放在本篇讲解。...

    个人使用大数据开发-计算机专业简历.doc

    - **架构**:Nginx + Tomcat + Hadoop + Flume + Zookeeper + Kafka + Spark + HBase - **开发环境**:未详细说明 - **项目描述**:构建了一套用于收集和处理手机App日志的系统,支持实时数据分析与应用。 - **...

    vm安装高可以spark集群.rar

    总结,这个压缩包提供的资源涵盖了大数据生态中的关键组件,包括Spark集群的搭建、开发环境的配置,以及与其他工具如Hadoop、Hive、Kafka和Flume的整合。通过学习和实践这些内容,可以深入理解大数据处理的流程和...

    项目介绍5.docx大数据项目+项目介绍+面试辅导

    3. **实时项目**:利用SparkStreaming进行实时数据处理,处理用户行为数据和业务数据。SparkStreaming是Apache Spark的一个扩展,用于处理连续的数据流,支持微批处理。 4. **集群规模与硬件配置**:项目使用了12台...

    O2O行业数据平台实战从监控到诊断的数据产品搭建共33页

    在这个阶段,我们可能需要用到如Flume、Kafka等工具来收集来自各个业务端的数据,Hadoop或Spark Streaming用于实时处理这些数据,而Elasticsearch、Kibana等则用于数据的存储和可视化,以便快速洞察业务动态。...

    大数据工程师简历3份.pdf

    3. **流处理与日志框架**:掌握Storm流处理,熟悉Flume和Kafka,能整合这些工具进行实时数据处理。这对于处理不断生成的大量实时数据非常有用。 4. **数据仓库与数据库**:熟练运用Hive数据仓库进行数据查询和统计...

    大数据工程师简历3份.docx

    5. **日志处理框架**:熟悉Flume和Kafka,能够整合这些工具与Storm和Spark,实现数据的实时采集和处理。 6. **Hive数据仓库**:熟练使用Hive进行日志数据的查询和统计,具备一定的数据优化经验,能够提升数据分析...

    大数据入门指南v1.0

    - **Spark Streaming**:实时流处理,与Flume和Kafka的集成。 4. **Storm**: - **流处理简介**:介绍Storm作为实时处理平台的角色。 - **Storm环境搭建**:单机和集群部署步骤。 - **编程模型**:Tuples、...

    实现了一整套电商数仓的搭建,包括数据采集平台的搭建,将用户的行为数据分为四层分别分层搭建,并实现业务数据库的分层搭建

    我们可能需要构建ETL(提取、转换、加载)流程,使用工具如Kafka、Flume或Spark Streaming来实时处理和传输这些数据。同时,日志服务如Logstash或 beats 用于收集服务器、应用和用户行为的日志信息。 **2. 用户行为...

    基于Spark的电商用户分析系统-开题报告.pdf

    本课题旨在设计并实现一个基于Spark的电商用户分析系统,该系统将整合CDH大数据集群、Flume监控日志、Kafka数据传输、HDFS存储、Spark Streaming实时处理等多种技术,旨在对电商用户的访问行为、购物行为、点击行为...

    广西大数据应用专题开发技术方案-标包3.docx

    - **实时分析**:基于Spark Streaming,实现数据的实时批量处理和分析。 - **离线分析**:利用Spark SQL进行定时数据分析。 9. **ArcGIS二次开发**:利用ArcGIS提供的开发工具和技术,实现地理信息系统(GIS)的...

    数据采集处理项目-技术方案(DOC59页).doc

    总体设计可能涵盖了云基础设施的选择,如AWS、Azure或Google Cloud,以及数据采集工具的部署,如Flume、Kafka等。核心技术及功能将详细介绍如何实现这些需求,例如使用Spark Streaming进行实时数据处理,HDFS用于...

    2023年史上最全的大数据面试背诵草稿(适用于大数据开发,大数据运维,云计算,数据治理,大数据架构师)

    6. **实时计算系统**:面试者应熟悉实时处理技术,如Apache Flink或Spark Streaming,这些工具用于处理高速流入的数据流,满足低延迟需求。 7. **离线计算系统**:Hadoop MapReduce或Spark批处理框架用于处理大量...

    案例正文_基于招聘网站的离线统计及实时分析系统1

    - 配合Zookeeper进行集群管理,Kafka用于数据流处理,Flume收集日志,MongoDB和Redis存储非结构化和半结构化数据。 3. **数据处理与分析**: - 数据预处理阶段,清洗、整合爬取的原始数据,去除无效或重复信息,...

Global site tag (gtag.js) - Google Analytics