
Connection error when fetching a Kafka topic with Spark + Kafka from an Eclipse development environment

I've been playing around with Spark for almost a month now, and decided to try Kafka's messaging system together with Spark Streaming to process data pushed in real time.

I wrote a simple class as the Kafka producer, and a Spark Streaming class as the consumer.

The producer's run method generates the data:

public void run() {
	KafkaProducer<Integer, String> producer = getProducer();
	Random rd = new Random();

	while (true) {
		// Build a fake page-view record: page name, click count, stay time, like flag
		String page = "Page_" + rd.nextInt(15) + ".html";
		Integer click = rd.nextInt(10);
		float stayTime = rd.nextFloat();
		Integer likeOrNot = rd.nextInt(3);

		String messageStr = page + "\t" + click + "\t" + stayTime + "\t" + likeOrNot;

		System.out.println("sending message: " + messageStr);
		// All records share the fixed key 999, so they land in the same partition
		producer.send(new ProducerRecord<>(topic, 999, messageStr));

		try {
			Thread.sleep(200); // roughly five messages per second
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}
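
getProducer() isn't shown above; it just builds the KafkaProducer. A hypothetical reconstruction (written in Scala to match the consumer code below; the bootstrap.servers address is my assumption, not from the original post):

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

// Hypothetical getProducer(): serializers match the <Integer, String> record type;
// the broker address is assumed to be the VM that appears later in this post
def getProducer(): KafkaProducer[Integer, String] = {
  val props = new Properties()
  props.put("bootstrap.servers", "10.32.190.165:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  new KafkaProducer[Integer, String](props)
}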


The Spark Streaming consumer:

val ssc = new StreamingContext("local[2]", "appName", Seconds(5), System.getenv("SPARK_HOME"))
ssc.checkpoint("./")

// Receiver-based stream: ZooKeeper quorum, consumer group "test", topic "test" with one receiver thread
val kafkaStream = KafkaUtils.createStream(ssc, "10.32.190.165:2181", "test", Map("test" -> 1))
// Overload that takes a kafkaParams map instead:
// val kafkaStream = KafkaUtils.createStream(ssc, Map("group.id" -> "test", "zookeeper.connect" -> "10.32.190.165:2181", "zookeeper.connection.timeout.ms" -> "10000"), Map("test" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)

// Each message is a (key, value) pair; keep the value and aggregate a score per page
val msgRDD = kafkaStream.map(_._2)
val newRdd = msgRDD.map { x => (x.split("\t")(0), getValueOfPage(x.split("\t"))) }.reduceByKey(_ + _)

val resultRdd = newRdd.transform(x => x.sortByKey(false))
    
// State update: for each page, add this batch's values to the running total
val updateFunc = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => {
  iterator.map { case (page, newValues, oldValue) =>
    (page, newValues.sum + oldValue.getOrElse(0.0))
  }
}

// Seed the state stream with one page as the initial RDD
val initRDD = ssc.sparkContext.parallelize(List(("page_0.html", 0.0)))

val stateRDD = newRdd.updateStateByKey[Double](updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initRDD)
    
stateRDD.foreachRDD { r =>
  // Swap key and value so we can sort by accumulated score, then print the top 3 pages
  val sortedRDD = r.map(x => (x._2, x._1)).sortByKey(false)
  val topK = sortedRDD.take(3)
  topK.foreach(println)
}

ssc.start()
ssc.awaitTermination()
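
getValueOfPage isn't shown in the post either; it presumably folds the click count, stay time, and like flag into a single score per page. A purely hypothetical version (the weights are invented for illustration):

// Hypothetical scoring of a tab-split record: Page, click, stayTime, likeOrNot.
// The weights are made up; the original post does not show this function.
def getValueOfPage(fields: Array[String]): Double = {
  val click = fields(1).toDouble
  val stayTime = fields(2).toDouble
  val likeOrNot = fields(3).toDouble
  0.8 * click + 0.8 * stayTime + 1.0 * likeOrNot
}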



ZooKeeper and Kafka were both left on their default configs. With limited resources, everything runs on a single machine for now; the only edits were ZooKeeper's server port and the ZooKeeper host + port on the Kafka side.
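
For reference, those two edits amount to something like this (the key names are the standard ones; the values here are inferred from the ZooKeeper address used in the consumer code):

# zookeeper.properties
clientPort=2181

# Kafka server.properties: point the broker at the VM's ZooKeeper
zookeeper.connect=10.32.190.165:2181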

Then, at runtime, this error came up:
16/06/28 16:55:33 WARN ClientUtils$: Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [BrokerEndPoint(0,localhost,9092)] failed
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
16/06/28 16:55:33 INFO SyncProducer: Disconnecting from localhost:9092
16/06/28 16:55:33 WARN ConsumerFetcherManager$LeaderFinderThread: [test_CNCSHUM4L3C-1467104130749-71111338-leader-finder-thread], Failed to find leader for Set([test,0])
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,localhost,9092))] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
... 3 more

You can see the Kafka broker lookup is going to localhost:9092. Since Eclipse runs on my local machine while Kafka and ZooKeeper sit on a VM, that was almost certainly the cause.

The key question was: where do you change that localhost...

I tried tracing the source and found that KafkaUtils.createStream has an overload that accepts a Map of configuration. I tried passing "metadata.broker.list" in there, but Spark's log showed the property can't be set at that point; it was simply ignored.
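
Roughly what that attempt looked like (a reconstruction, not the exact code; this overload takes explicit decoder type parameters):

import kafka.serializer.StringDecoder

val kafkaParams = Map(
  "group.id" -> "test",
  "zookeeper.connect" -> "10.32.190.165:2181",
  "metadata.broker.list" -> "10.32.190.165:9092"  // Spark logs this one as ignored
)
val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("test" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)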

After two days of fiddling, I had tried changing basically everything changeable on my machine; nothing worked.

Then it occurred to me that the server.properties Kafka starts with might contain the setting. Opened it up, and sure enough, there's a property:

#advertised.listeners=PLAINTEXT://your.host.name:9092
It's commented out by default, and according to the notes there, when it's left commented out the advertised address effectively becomes localhost. I changed it to Kafka's IP:port without further ado, restarted Kafka, started the producer, ran the consumer: the error was gone and the analysis results showed up.
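
Concretely, the change in server.properties looks like this (assuming the broker lives on the same VM as ZooKeeper, i.e. 10.32.190.165):

# Before: commented out, so consumers are told to connect to localhost
#advertised.listeners=PLAINTEXT://your.host.name:9092

# After: advertise the address that is actually reachable from the Eclipse machine
advertised.listeners=PLAINTEXT://10.32.190.165:9092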
