- 浏览: 74306 次
林林总总玩了Spark快一个月了, 打算试一下kafka的消息系统加上Spark Streaming 进行实时推送数据的处理。
简单的写了一个类作为kafka的producer, 然后SparkStreaming的类作为consumer
Producer 的run方法产生数据:
SparkStreaming的consumer:
zookeeper和Kafka的config都是默认配置, 由于资源不够, 目前都是单机环境, 就改了一下zookeeper的server port, 和kafka这边zookeeper的host+port
结果运行的时候就报错:
16/06/28 16:55:33 WARN ClientUtils$: Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [BrokerEndPoint(0,localhost,9092)] failed
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
16/06/28 16:55:33 INFO SyncProducer: Disconnecting from localhost:9092
16/06/28 16:55:33 WARN ConsumerFetcherManager$LeaderFinderThread: [test_CNCSHUM4L3C-1467104130749-71111338-leader-finder-thread], Failed to find leader for Set([test,0])
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,localhost,9092))] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
... 3 more
可以看到kafka的broker是去找localhost:9092, 由于eclipse环境在本地, kafka和zookeeper是在vm上面, 基本锁定是这个原因。
关键是, localhost是在哪里改。。。。
尝试跟踪源码, 发现KafkaUtils.createStream 有一个方法是可以传一个Map进去, Map里面存的configuration。 尝试传入"metadata.broker.list", 结果从spark的日志中看到这个属性不能在这里设置, 直接被ingore了。
折腾了两天, 基本上把本机可以动的地方都动了, 没用。
后来想到会不会是kafka启动的时候用的server.properties里面有设置, 打开一看, 果然, 有一个属性:
#advertised.listeners=PLAINTEXT://your.host.name:9092
默认被注释掉了, 看说明如果被注释掉了后就直接设置成localhost了, 果断修改成Kafka的IP:Port,重启Kafka, 启动producer, 运行Consumer, 错误消失, 分析结果出来了
简单的写了一个类作为kafka的producer, 然后SparkStreaming的类作为consumer
Producer 的run方法产生数据:
public void run() { KafkaProducer<Integer, String> producer = getProducer(); int messageNum = 0; Random rd = new Random(); while(true){ String page = "Page_" + rd.nextInt(15) + ".html"; Integer click = rd.nextInt(10); float stayTime = rd.nextFloat(); Integer likeOrNot = rd.nextInt(3); String messageStr = page + "\t" + click + "\t" + stayTime + "\t" + likeOrNot; long startTime = System.currentTimeMillis(); System.out.println("sending message: " + messageStr); producer.send(new ProducerRecord<>(topic, 999, messageStr)); messageNum++; try { Thread.sleep(200); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
SparkStreaming的consumer:
val ssc = new StreamingContext("local[2]", "appName", Seconds(5),System.getenv("SPARK_HOME")) ssc.checkpoint("./") val kafkaStream = KafkaUtils.createStream(ssc, "10.32.190.165:2181", "test", Map("test"->1)) // val kafkaStream = KafkaUtils.createStream(ssc, Map("group.id"->"test","zookeeper.connect"->"10.32.190.165:2181", "zookeeper.connection.timeout.ms"->"10000"), Map("test"->1),StorageLevel.MEMORY_AND_DISK_SER_2) // kafkaStream.ytt val msgRDD = kafkaStream.map(_._2) val newRdd = msgRDD.map { x => (x.split("\t")(0), getValueOfPage(x.split("\t"))) }.reduceByKey((a,b) => a + b) val resultRdd = newRdd.transform(x =>x.sortByKey(false)) var updateFunc = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => { iterator.flatMap(x=> { var page = x._1 var nowValue = x._2.sum var oldValue : Double = x._3.getOrElse(0) Some(nowValue + oldValue) }.map { y => (x._1, y) }) } val initRDD = ssc.sparkContext.parallelize(List(("page_0.html", 0.0))) val stateRDD = newRdd.updateStateByKey[Double](updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true,initRDD); // val sortRDD = stateRDD.map(x => (x._2, x._1)) // newRdd. stateRDD.foreachRDD(r =>{ val sortedRDD = r.map(x => (x._2, x._1)).sortByKey(false) val topK = sortedRDD.take(3) topK.foreach(y => println(y)) }) // resultRdd.print() ssc.start() ssc.awaitTermination()
zookeeper和Kafka的config都是默认配置, 由于资源不够, 目前都是单机环境, 就改了一下zookeeper的server port, 和kafka这边zookeeper的host+port
结果运行的时候就报错:
16/06/28 16:55:33 WARN ClientUtils$: Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [BrokerEndPoint(0,localhost,9092)] failed
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
16/06/28 16:55:33 INFO SyncProducer: Disconnecting from localhost:9092
16/06/28 16:55:33 WARN ConsumerFetcherManager$LeaderFinderThread: [test_CNCSHUM4L3C-1467104130749-71111338-leader-finder-thread], Failed to find leader for Set([test,0])
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,localhost,9092))] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
... 3 more
可以看到kafka的broker是去找localhost:9092, 由于eclipse环境在本地, kafka和zookeeper是在vm上面, 基本锁定是这个原因。
关键是, localhost是在哪里改。。。。
尝试跟踪源码, 发现KafkaUtils.createStream 有一个方法是可以传一个Map进去, Map里面存的configuration。 尝试传入"metadata.broker.list", 结果从spark的日志中看到这个属性不能在这里设置, 直接被ingore了。
折腾了两天, 基本上把本机可以动的地方都动了, 没用。
后来想到会不会是kafka启动的时候用的server.properties里面有设置, 打开一看, 果然, 有一个属性:
#advertised.listeners=PLAINTEXT://your.host.name:9092
默认被注释掉了, 看说明如果被注释掉了后就直接设置成localhost了, 果断修改成Kafka的IP:Port,重启Kafka, 启动producer, 运行Consumer, 错误消失, 分析结果出来了
发表评论
-
kafka + flume + hdfs + zookeeper + spark 测试环境搭建
2017-07-20 11:28 1115最近由于项目需要, 搭建了一个类似线上环境的处理流数据的环境 ... -
源码跟踪executor如何写数据到blockmanager, 以及如何从blockmanager读数据
2016-08-10 19:41 1438之前看了Job怎么submit 以 ... -
Spark中Blockmanager相关代码解析
2016-08-04 19:47 1852前一段时间看了如何划分stage以及如何提交Job, 最后把结 ... -
Spark在submitStage后如何通过clustermanager调度执行task到Driver接收计算结果的代码解析
2016-08-01 14:08 1486前文: http://humingminghz.iteye.c ... -
Spark中saveAsTextFile至stage划分和job提交的源代码分析
2016-07-29 14:20 3381之前看了Spark Streaming和Spark SQL, ... -
SparkSQL DF.agg 执行过程解析
2016-07-19 10:21 4133在上一篇文章前, 我一直没看懂为什么下面的代码就能得到max或 ... -
SparkSQL SQL语句解析过程源代码浅析
2016-07-15 19:34 6670前两天一直在忙本职工 ... -
SparkSQL SQL语句解析过程浅析
2016-07-15 19:06 0前两天一直在忙本职工 ... -
SparkStreaming从启动Receiver到收取数据生成RDD的代码浅析
2016-07-08 17:54 2243前面一片文章介绍了SocketTextStream 是如何从b ... -
Sparkstreaming是如何获取数据组成Dstream的源码浅析
2016-07-08 11:23 1487前面一篇文章介绍了SparkStreaming是如何不停的循环 ... -
SparkSQL 使用SQLContext读取csv文件 分析数据 (含部分数据)
2016-07-06 11:24 10171前两天开始研究SparkSQL, 其主要分为HiveConte ... -
SparkStreaming是如何完成不停的循环处理的代码浅析
2016-07-02 12:26 4667一直很好奇Sparkstreaming的ssc.start是怎 ... -
SparkStreaming 对Window的reduce的方法解析
2016-06-30 11:57 4738在sparkstreaming中对窗口 ... -
Sparkstreaming reduceByKeyAndWindow(_+_, _-_, Duration, Duration) 的源码/原理解析
2016-06-29 19:50 8807最近在玩spark streaming, 感觉到了他的强大。 ...
相关推荐
在大数据领域,构建一个完整的生态系统是至关重要的,其中包括多个组件,如Hadoop、Spark、Hive、HBase、Oozie、Kafka、Flume、Flink、Elasticsearch和Redash。这些组件协同工作,提供了数据存储、处理、调度、流...
根据提供的标题、描述、标签及部分内容链接,我们可以推断出这是一个关于大数据技术栈的培训课程,涉及的技术包括Hadoop、HBase、Zookeeper、Spark、Kafka、Scala以及Ambari。下面将针对这些技术进行详细的介绍和...
flume+Logstash+Kafka+Spark Streaming进行实时日志处理分析【大数据】
本科毕业设计项目,基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 本科毕业设计项目,基于spark streaming+flume+kafka+hbase的...
基于Flink+ClickHouse构建的分析平台,涉及 Flink1.9.0 、ClickHouse、Hadoop、Hbase、Kafka、Hive、Jmeter、Docker 、HDFS、MapReduce 、Zookeeper 等技术
Linux 环境下 Hive 的安装部署 CLI 和 Beeline 命令行的基本使用 常用 DDL 操作 分区表和分桶表 视图和索引 常用 DML 操作 数据查询详解 三、Spark Spark Core Spark SQL Spark Streaming 五、Flink 核心概念综述 ...
通过VirtualBox安装多台虚拟机,实现集群环境搭建。 优势:一台电脑即可。 应用场景:测试,学习。...内附百度网盘下载地址,有hadoop+zookeeper+spark+kafka等等·····需要的安装包和配置文件
基于flume+kafka+HBase+spark+ElasticSearch的用户轨迹查询大数据开发项目源代码+文档说明。项目架构: 主要是基于Flume+Kafka+Sparkstreaming +HBase+ES来实现实时的用户信息存储轨迹查询任务。 含有代码注释,满分...
基于flume+kafka+HBase+spark+ElasticSearch的用户轨迹查询大数据开发项目 项目名称:实时的用户轨迹查询项目 项目介绍: 利用企业建设的WIFI基站,实时采集用户的信息,可以基于这些信息做用户画像处理,网络安全...
这里提到的"基于Flume+kafka+spark大型电商网站日志分析系统(离线+实时)"就是一个这样的解决方案,结合了三个关键组件:Apache Flume、Apache Kafka和Apache Spark。下面将详细介绍这三个技术及其在系统中的作用。...
【资源说明】 1、该资源包括项目的全部源码,下载可以直接使用! 2、本项目适合作为计算机、数学、电子信息等专业的课程设计、期末大作业和毕设...基于spark streaming+kafka+hbase的日志统计分析系统源码+项目说明.zip
基于flume+kafka+HBase+spark+ElasticSearch的用户统计查询大数据开发项目无线上网基于flume+kafka+HBase+spark+ElasticSearch的用户统计查询大数据开发项目项目名称实时用户账单查询项目项目介绍利用企业建设的WIFI...
Spark-Streaming 提供了丰富的操作符,如窗口操作、DStream 之间的连接和转换,方便开发复杂的实时分析逻辑。 5. **异常检测**:在日志分析中,异常检测是关键部分。通过Spark-Streaming,我们可以利用机器学习算法...
在“Hadoop+Spark+Kafka+jar包”的场景下,jar包通常包含以下内容: 1. Hadoop的相关库,用于Spark与HDFS之间的交互,包括Hadoop客户端、配置文件等。 2. Spark的库,包括Spark Core、Spark SQL、Spark Streaming等...
基于flume+kafka+HBase+spark+ElasticSearch的用户轨迹查询大数据开发项目 项目名称:实时的用户轨迹查询项目 项目介绍: 利用企业建设的WIFI基站,实时采集用户的信息,可以基于这些信息做用户画像处理,网络...
大数据 hadoop spark hbase ambari全套视频教程(购买的付费视频)
大数据实习hdfs+flume+kafka+spark+hbase+hive项目大发展大数据实习hdfs+flume+kafka+spark+hbase+hive项目
本项目"基于spark streaming+flume+kafka+hbase的实时日志处理分析系统"结合了多个关键技术,构建了一个高效、实时的数据处理流水线。下面将详细阐述这些技术及其在系统中的作用。 1. Spark Streaming: Spark ...
本资源中的源码都是经过本地编译过可运行的,下载后按照文档配置好环境就可以运行。资源项目的难度比较适中,内容都是经过助教老师审定过的,应该能够满足学习、使用需求,如果有需要的话可以放心下载使用。...