kafka producer

Posted: 2015-03-31
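This is an introductory Kafka producer example.
It throws the exception below at runtime. Has anyone else run into this,
or could someone familiar with Kafka give me some guidance?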

Environment
ubuntu 14.04
scala 2.10.5
jdk 1.8
kafka 2.10-0.8.2.1


Exception in thread "main" java.lang.NullPointerException
at scala.Predef$.Integer2int(Predef.scala:392)
at kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:103)
at kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:102)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.client.ClientUtils$.parseBrokerList(ClientUtils.scala:102)
at kafka.producer.BrokerPartitionInfo.<init>(BrokerPartitionInfo.scala:32)
at kafka.producer.async.DefaultEventHandler.<init>(DefaultEventHandler.scala:41)
at kafka.producer.Producer.<init>(Producer.scala:60)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
at com.test.producer.TestProducer.main(TestProducer.java:32)



public static void main(String[] args) {
    long events = Long.parseLong(args[0]);
    Random rnd = new Random();

    Properties props = new Properties();
    //props.put("metadata.broker.list",
    //        "broker0:9092,broker1:9093,broker2:9094 ");
    props.put("metadata.broker.list",
            "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 ");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    //props.put("partitioner.class", "example.producer.SimplePartitioner");
    props.put("partitioner.class", "com.test.producer.SimplePartitioner");
    props.put("request.required.acks", "1");

    ProducerConfig config = new ProducerConfig(props);

    Producer<String, String> producer = new Producer<String, String>(config);

    for (long nEvents = 0; nEvents < events; nEvents++) {
        long runtime = new Date().getTime();
        String ip = "192.168.2." + rnd.nextInt(255);
        String msg = runtime + ",www.example.com," + ip;
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(
                "page_visits", ip, msg);
        producer.send(data);
    }
    producer.close();
}
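
A possible lead, not confirmed in this thread: the stack trace shows the NullPointerException being raised while parseBrokerList converts a port to an int, and the metadata.broker.list value above ends with a trailing space after 9094. If the last entry is not recognized as a clean host:port pair, its port could come back as null. The sketch below does not use Kafka at all. It is a hypothetical standalone check (the class name BrokerListCheck, the method findMalformedEntries, and the regex are assumptions made for illustration, not Kafka's actual parsing code) that flags entries with stray whitespace or a non-numeric port.

```java
import java.util.ArrayList;
import java.util.List;

public class BrokerListCheck {

    /**
     * Splits a comma-separated broker list and returns every entry that is
     * not a clean "host:port" pair, for example because of stray whitespace.
     * The pattern is a deliberately strict, hypothetical check and is not
     * taken from Kafka.
     */
    public static List<String> findMalformedEntries(String brokerList) {
        List<String> bad = new ArrayList<String>();
        for (String entry : brokerList.split(",")) {
            // Host: one or more characters that are neither whitespace nor ':'.
            // Port: digits only. No surrounding whitespace is tolerated.
            if (!entry.matches("[^\\s:]+:\\d+")) {
                bad.add(entry);
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        // Same value as in the post, including the trailing space.
        String asPosted = "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 ";

        System.out.println("as posted: " + findMalformedEntries(asPosted));
        System.out.println("trimmed:   " + findMalformedEntries(asPosted.trim()));
    }
}
```

If the check flags the last entry, removing the trailing space from the metadata.broker.list string (or calling trim() on it before props.put) is a cheap thing to try before digging further.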