package inok.storm.kafka.sample; import java.io.FileInputStream; import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import storm.kafka.BrokerHosts; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.StringScheme; import storm.kafka.ZkHosts; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class MyKafkaTopology { public static class KafkaWordSplitter extends BaseRichBolt { private static final Log LOG = LogFactory .getLog(KafkaWordSplitter.class); private static final long serialVersionUID = 886149197481637894L; private OutputCollector collector; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { String line = input.getString(0); LOG.info("RECV[kafka -> splitter] " + line); String[] words = line.split("\\s+"); for (String word : words) { LOG.info("EMIT[splitter -> counter] " + word); collector.emit(input, new Values(word, 1)); } collector.ack(input); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } public static class WordCounter extends BaseRichBolt { private static final Log LOG = LogFactory.getLog(WordCounter.class); private static final long serialVersionUID = 886149197481637894L; private OutputCollector collector; private Map<String, AtomicInteger> counterMap; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; this.counterMap = new HashMap<String, AtomicInteger>(); } public void execute(Tuple input) { String word = input.getString(0); int count = input.getInteger(1); LOG.info("RECV[splitter -> counter] " + word + " : " + count); AtomicInteger ai = this.counterMap.get(word); if (ai == null) { ai = new AtomicInteger(); this.counterMap.put(word, ai); } ai.addAndGet(count); collector.ack(input); LOG.info("CHECK statistics map: " + this.counterMap); } @Override public void cleanup() { LOG.info("The final result:"); Iterator<Entry<String, AtomicInteger>> iter = this.counterMap .entrySet().iterator(); while (iter.hasNext()) { Entry<String, AtomicInteger> entry = iter.next(); LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get()); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException, IOException { String path = System.getProperty("user.dir") + "/kafkaSpout.properties"; FileInputStream is = new FileInputStream(path); Properties props = new Properties(); props.load(is); String zks = props.getProperty("zks"); String topic = props.getProperty("topic"); String zkRoot = props.getProperty("zkRoot"); // String zks = "localhost:2181"; // String topic = "inoktext"; // String zkRoot = "/storm"; String id = "word"; BrokerHosts brokerHosts = new ZkHosts(zks); SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id); spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); spoutConf.forceFromStart = true; spoutConf.zkServers = Arrays.asList(new String[] { "localhost" }); spoutConf.zkPort = 2181; TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // Kafka我们创建了一个5分区的Topic,这里并行度设置为5 builder.setBolt("word-splitter", new KafkaWordSplitter(), 2) .shuffleGrouping("kafka-reader"); builder.setBolt("word-counter", new WordCounter()).fieldsGrouping( "word-splitter", new Fields("word")); Config conf = new Config(); String name = MyKafkaTopology.class.getSimpleName(); if (args != null && args.length > 0) { // Nimbus host name passed from command line conf.put(Config.NIMBUS_HOST, args[0]); conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(name, conf, builder.createTopology()); Thread.sleep(60000); cluster.shutdown(); } } }
References
http://www.michael-noll.com/blog/2014/05/27/kafka-storm-integration-example-tutorial/
https://github.com/apache/storm/tree/master/external/storm-kafka
https://storm.apache.org/2014/06/25/storm092-released.html
http://www.tuicool.com/articles/NzyqAn
https://github.com/stealthly/dropwizard-kafka-http
相关推荐
【标题】"03、storm项目实战课程-Kafka0.8Storm0.9.1Optr.rar" 提供了一个关于实时大数据处理的实战教程,主要聚焦于Apache Storm和Apache Kafka的整合应用。Apache Storm是一个开源的分布式实时计算系统,而Kafka则...
3. **Storm与Kafka集成**:如何在Storm拓扑中使用Kafka Spout来从Kafka读取数据,以及如何配置和优化此过程。 4. **实时数据处理**:探讨实时数据处理的需求、挑战和解决方案,以及Storm在其中的角色。 5. **容错...
将 Storm 和 Kafka 结合,主要涉及 Storm 的 Kafka Spout。Kafka Spout 是一个预定义的 spout 类,可以从 Kafka 主题中读取数据,并将数据作为 tuple 发送到 Storm 的数据流中。以下是一般整合过程: 1. **配置 ...
`KafkaSpout`是一个特殊的Spout,它负责从Kafka获取数据并将其作为流传递到Storm拓扑的其余部分。以下步骤概述了这一过程: 1. 添加依赖:在项目中引入Storm和Kafka相关的库,如storm-kafka或storm-kafka-client。 ...
Storm的Spout test,订阅Kafka的producer,数据在bolt中处理完成之后再次发送到Kafka中 1.启动zookeeper ./bin/zkServer.sh start 2.启动Kafka nohup ./bin/kafka-server-start.sh ./config/server.properties & 这...
集成storm-kafka时,我们需要理解其核心概念,包括Spout和Bolt。Spout作为数据的源头,负责从Kafka读取消息;Bolt则处理这些数据,执行各种业务逻辑。在实际项目中,如1号店的电商实时数据分析系统,Spout会作为...
使用KafkaSpout作为Spout,它可以从Kafka的特定主题中读取消息,然后传递给下游的Bolt。Bolt可以自定义实现,比如可以包含计数器、滑动窗口或复杂的计算逻辑,用于实时分析数据并输出结果。 为了使这个系统具有通用...
Apache Storm 是一个分布式实时计算系统,它被设计用于处理大规模数据流,提供高吞吐量、低延迟的数据处理能力。在大数据领域,Storm 被广泛应用于实时分析、在线机器学习、持续计算、数据集成以及任何需要实时处理...
在代码层面,你需要创建一个Storm拓扑,并在其中配置一个使用`KafkaSpout`的Bolt。`KafkaSpout`会从Kafka主题中拉取数据,然后传递给下游的Bolts进行处理。你需要指定Kafka服务器的地址、主题名、消费者组ID等参数...
通常,这样的项目会有一个主类(如`StormKafkaDemo.java`),其中定义了Storm拓扑,并使用KafkaSpout来从Kafka消费者主题读取数据,然后通过Bolts进行处理,最后可能会将结果写回到Kafka的另一个主题,或者存储到...
这些JAR文件通常包含必要的API,使开发者能够在Storm拓扑中创建和配置KafkaSpout(用于从Kafka读取数据)和KafkaBolt(用于写入Kafka)。 在Java开发中,JAR文件是将多个类打包在一起的容器,这样就可以方便地分发...
在这个"storm-kafka-hdfs-starter"项目中,KafkaSpout是Storm的一个组件,它扮演了从Kafka消费数据的角色。KafkaSpout可以从Kafka主题中读取消息,然后将其作为数据流发送到Storm拓扑中。而HdfsBolt则是Storm的一个 ...
该Storm拓扑使用Kafka Spout读取来自Kafka的消息,并使用Bolt将从Kafka读取的传入消息解析为JSON消息。 然后将已解析的JSON消息加载到Elastic搜索中以使用Kibana进行仪表板和分析 该项目的前提条件:Zookeeper安装...
在这个示例中,Storm从Kafka获取日志消息,通过自定义的Spout组件消费这些消息。然后,Storm将数据传递给Bolt组件,进行进一步的业务逻辑处理,比如使用Drools进行规则引擎处理。 4. Drools:Drools是一个强大的...
在大数据处理领域,Flume、Kafka和Storm是三个至关重要的工具,它们分别在数据采集、数据分发和实时处理方面发挥着核心作用。这里我们将深入探讨这三个组件以及如何搭建它们。 1. Flume:Flume是Apache软件基金会的...
1. **Storm 吞吐量因素**:影响 Storm 吞吐量的主要因素包括 spout 并发数、worker 数量(与 slot 挂钩)、Kafka 的 partition 数量。Spout 的并发数与 Kafka 的 partition 数量紧密相关。 2. **worker 数量限制**:...
3. **配置**:在Storm拓扑中,需要配置KafkaSpout的消费者组、主题和Zookeeper地址等信息。 4. **容错机制**:storm-kafka提供了幂等性和 Exactly-once 语义,确保数据在处理过程中不丢失或重复。 综上所述,Kafka...
在Storm中,KafkaSpout是连接到Kafka的特殊Spout,负责从Kafka主题读取数据并分发给Bolts进行处理。它提供了容错机制,确保即使在故障情况下也能正确处理数据。 6. **容错性和可扩展性**: 整合后的系统具备了...
Storm spout 实现从 kafka 主题读取消息,并将这些消息作为单字段元组发送到风暴拓扑中。 可以在上找到文档。 发展 该实现由荷兰法医研究所创建,仍在开发中。 欢迎投稿,请阅读。 执照 这项工作是根据 Apache 许可...
我们将学习如何在Storm拓扑中使用KafkaSpout,接收Kafka中的实时流数据。 8. **测试与部署**:如何编写单元测试验证Kafka producer和consumer的正确性,以及如何在生产环境中部署和管理Kafka和Storm集群。 在这个...