`

storm: storm-kafka spout

 
阅读更多

 

 

 

package inok.storm.kafka.sample;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MyKafkaTopology {
	public static class KafkaWordSplitter extends BaseRichBolt {

		private static final Log LOG = LogFactory
				.getLog(KafkaWordSplitter.class);
		private static final long serialVersionUID = 886149197481637894L;
		private OutputCollector collector;

		public void prepare(Map stormConf, TopologyContext context,
				OutputCollector collector) {
			this.collector = collector;
		}

		public void execute(Tuple input) {
			String line = input.getString(0);
			LOG.info("RECV[kafka -> splitter] " + line);
			String[] words = line.split("\\s+");
			for (String word : words) {
				LOG.info("EMIT[splitter -> counter] " + word);
				collector.emit(input, new Values(word, 1));
			}
			collector.ack(input);
		}

		public void declareOutputFields(OutputFieldsDeclarer declarer) {
			declarer.declare(new Fields("word", "count"));
		}

	}

	public static class WordCounter extends BaseRichBolt {

		private static final Log LOG = LogFactory.getLog(WordCounter.class);
		private static final long serialVersionUID = 886149197481637894L;
		private OutputCollector collector;
		private Map<String, AtomicInteger> counterMap;

		public void prepare(Map stormConf, TopologyContext context,
				OutputCollector collector) {
			this.collector = collector;
			this.counterMap = new HashMap<String, AtomicInteger>();
		}

		public void execute(Tuple input) {
			String word = input.getString(0);
			int count = input.getInteger(1);
			LOG.info("RECV[splitter -> counter] " + word + " : " + count);
			AtomicInteger ai = this.counterMap.get(word);
			if (ai == null) {
				ai = new AtomicInteger();
				this.counterMap.put(word, ai);
			}
			ai.addAndGet(count);
			collector.ack(input);
			LOG.info("CHECK statistics map: " + this.counterMap);
		}

		@Override
		public void cleanup() {
			LOG.info("The final result:");
			Iterator<Entry<String, AtomicInteger>> iter = this.counterMap
					.entrySet().iterator();
			while (iter.hasNext()) {
				Entry<String, AtomicInteger> entry = iter.next();
				LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());
			}

		}

		public void declareOutputFields(OutputFieldsDeclarer declarer) {
			declarer.declare(new Fields("word", "count"));
		}
	}

	public static void main(String[] args) throws AlreadyAliveException,
			InvalidTopologyException, InterruptedException, IOException {

		String path = System.getProperty("user.dir") + "/kafkaSpout.properties";
		FileInputStream is = new FileInputStream(path);
		Properties props = new Properties();
		props.load(is);

		String zks = props.getProperty("zks");
		String topic = props.getProperty("topic");
		String zkRoot = props.getProperty("zkRoot");

		// String zks = "localhost:2181";
		// String topic = "inoktext";
		// String zkRoot = "/storm";

		String id = "word";

		BrokerHosts brokerHosts = new ZkHosts(zks);
		SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
		spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
		spoutConf.forceFromStart = true;
		spoutConf.zkServers = Arrays.asList(new String[] { "localhost" });
		spoutConf.zkPort = 2181;

		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // Kafka我们创建了一个5分区的Topic,这里并行度设置为5
		builder.setBolt("word-splitter", new KafkaWordSplitter(), 2)
				.shuffleGrouping("kafka-reader");
		builder.setBolt("word-counter", new WordCounter()).fieldsGrouping(
				"word-splitter", new Fields("word"));

		Config conf = new Config();

		String name = MyKafkaTopology.class.getSimpleName();
		if (args != null && args.length > 0) {
			// Nimbus host name passed from command line
			conf.put(Config.NIMBUS_HOST, args[0]);
			conf.setNumWorkers(3);
			StormSubmitter.submitTopologyWithProgressBar(name, conf,
					builder.createTopology());
		} else {
			conf.setMaxTaskParallelism(3);
			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology(name, conf, builder.createTopology());
			Thread.sleep(60000);
			cluster.shutdown();
		}
	}
}

 

 

 

 

 

 

 

 

References

http://www.michael-noll.com/blog/2014/05/27/kafka-storm-integration-example-tutorial/

https://github.com/apache/storm/tree/master/external/storm-kafka

https://storm.apache.org/2014/06/25/storm092-released.html

http://www.tuicool.com/articles/NzyqAn

https://github.com/stealthly/dropwizard-kafka-http

分享到:
评论

相关推荐

    03、storm项目实战课程-Kafka0.8Storm0.9.1Optr.rar

    【标题】"03、storm项目实战课程-Kafka0.8Storm0.9.1Optr.rar" 提供了一个关于实时大数据处理的实战教程,主要聚焦于Apache Storm和Apache Kafka的整合应用。Apache Storm是一个开源的分布式实时计算系统,而Kafka则...

    03、storm项目实战课程-Kafka0.8Optr2.rar

    3. **Storm与Kafka集成**:如何在Storm拓扑中使用Kafka Spout来从Kafka读取数据,以及如何配置和优化此过程。 4. **实时数据处理**:探讨实时数据处理的需求、挑战和解决方案,以及Storm在其中的角色。 5. **容错...

    storm-kafka整合代码

    将 Storm 和 Kafka 结合,主要涉及 Storm 的 Kafka Spout。Kafka Spout 是一个预定义的 spout 类,可以从 Kafka 主题中读取数据,并将数据作为 tuple 发送到 Storm 的数据流中。以下是一般整合过程: 1. **配置 ...

    StormStorm集成Kafka 从Kafka中读取数据

    `KafkaSpout`是一个特殊的Spout,它负责从Kafka获取数据并将其作为流传递到Storm拓扑的其余部分。以下步骤概述了这一过程: 1. 添加依赖:在项目中引入Storm和Kafka相关的库,如storm-kafka或storm-kafka-client。 ...

    storm-kafka:Storm-kafka 项目

    Storm的Spout test,订阅Kafka的producer,数据在bolt中处理完成之后再次发送到Kafka中 1.启动zookeeper ./bin/zkServer.sh start 2.启动Kafka nohup ./bin/kafka-server-start.sh ./config/server.properties & 这...

    Storm流计算项目:1号店电商实时数据分析系统-08.storm-kafka 详解和实战案例.pptx

    集成storm-kafka时,我们需要理解其核心概念,包括Spout和Bolt。Spout作为数据的源头,负责从Kafka读取消息;Bolt则处理这些数据,执行各种业务逻辑。在实际项目中,如1号店的电商实时数据分析系统,Spout会作为...

    storm-kafka实时趋势分析

    使用KafkaSpout作为Spout,它可以从Kafka的特定主题中读取消息,然后传递给下游的Bolt。Bolt可以自定义实现,比如可以包含计数器、滑动窗口或复杂的计算逻辑,用于实时分析数据并输出结果。 为了使这个系统具有通用...

    apache-storm-2.4.0.tar.gz

    Apache Storm 是一个分布式实时计算系统,它被设计用于处理大规模数据流,提供高吞吐量、低延迟的数据处理能力。在大数据领域,Storm 被广泛应用于实时分析、在线机器学习、持续计算、数据集成以及任何需要实时处理...

    storm链接kafka时需要的jar包

    在代码层面,你需要创建一个Storm拓扑,并在其中配置一个使用`KafkaSpout`的Bolt。`KafkaSpout`会从Kafka主题中拉取数据,然后传递给下游的Bolts进行处理。你需要指定Kafka服务器的地址、主题名、消费者组ID等参数...

    storm集成kafka插demo.zip

    通常,这样的项目会有一个主类(如`StormKafkaDemo.java`),其中定义了Storm拓扑,并使用KafkaSpout来从Kafka消费者主题读取数据,然后通过Bolts进行处理,最后可能会将结果写回到Kafka的另一个主题,或者存储到...

    storm-kafak相关jar

    这些JAR文件通常包含必要的API,使开发者能够在Storm拓扑中创建和配置KafkaSpout(用于从Kafka读取数据)和KafkaBolt(用于写入Kafka)。 在Java开发中,JAR文件是将多个类打包在一起的容器,这样就可以方便地分发...

    storm-kafka-hdfs-starter

    在这个"storm-kafka-hdfs-starter"项目中,KafkaSpout是Storm的一个组件,它扮演了从Kafka消费数据的角色。KafkaSpout可以从Kafka主题中读取消息,然后将其作为数据流发送到Storm拓扑中。而HdfsBolt则是Storm的一个 ...

    Storm-Kafka-ES:风暴拓扑将风暴与Kafka和Elasticsearch集成

    该Storm拓扑使用Kafka Spout读取来自Kafka的消息,并使用Bolt将从Kafka读取的传入消息解析为JSON消息。 然后将已解析的JSON消息加载到Elastic搜索中以使用Kibana进行仪表板和分析 该项目的前提条件:Zookeeper安装...

    flume-kafka-storm-drools-example

    在这个示例中,Storm从Kafka获取日志消息,通过自定义的Spout组件消费这些消息。然后,Storm将数据传递给Bolt组件,进行进一步的业务逻辑处理,比如使用Drools进行规则引擎处理。 4. Drools:Drools是一个强大的...

    flume及kafka及storm搭建.rar

    在大数据处理领域,Flume、Kafka和Storm是三个至关重要的工具,它们分别在数据采集、数据分发和实时处理方面发挥着核心作用。这里我们将深入探讨这三个组件以及如何搭建它们。 1. Flume:Flume是Apache软件基金会的...

    storm、kafka、flume性能测试

    1. **Storm 吞吐量因素**:影响 Storm 吞吐量的主要因素包括 spout 并发数、worker 数量(与 slot 挂钩)、Kafka 的 partition 数量。Spout 的并发数与 Kafka 的 partition 数量紧密相关。 2. **worker 数量限制**:...

    kafka使用与安装

    3. **配置**:在Storm拓扑中,需要配置KafkaSpout的消费者组、主题和Zookeeper地址等信息。 4. **容错机制**:storm-kafka提供了幂等性和 Exactly-once 语义,确保数据在处理过程中不丢失或重复。 综上所述,Kafka...

    Storm整合Kafka

    在Storm中,KafkaSpout是连接到Kafka的特殊Spout,负责从Kafka主题读取数据并分发给Bolts进行处理。它提供了容错机制,确保即使在故障情况下也能正确处理数据。 6. **容错性和可扩展性**: 整合后的系统具备了...

    kafka-spout:Kafka 消费者将消息作为风暴元组发送

    Storm spout 实现从 kafka 主题读取消息,并将这些消息作为单字段元组发送到风暴拓扑中。 可以在上找到文档。 发展 该实现由荷兰法医研究所创建,仍在开发中。 欢迎投稿,请阅读。 执照 这项工作是根据 Apache 许可...

    Storm流计算项目:1号店电商实时数据分析系统-07.Kafka Java API 简单开发测试.pptx

    我们将学习如何在Storm拓扑中使用KafkaSpout,接收Kafka中的实时流数据。 8. **测试与部署**:如何编写单元测试验证Kafka producer和consumer的正确性,以及如何在生产环境中部署和管理Kafka和Storm集群。 在这个...

Global site tag (gtag.js) - Google Analytics