Kafka and Storm Integration
Original author: 王宇
2016-10-27
Storm Integration Concepts
-
Integration Scenario
A spout is a source of streams. For example, a spout may read tuples off a Kafka topic and emit them as a stream. A bolt consumes input streams, processes them, and possibly emits new streams. Bolts can do anything from running functions, filtering tuples, and performing streaming aggregations and joins, to talking to databases, and more. Each node in a Storm topology executes in parallel. A topology runs indefinitely until you terminate it. Storm will automatically reassign any failed tasks. Additionally, Storm guarantees that there will be no data loss, even if machines go down and messages are dropped.
-
BrokerHosts - ZkHosts & StaticHosts
BrokerHosts is an interface; ZkHosts and StaticHosts are its two main implementations. ZkHosts is used to track the Kafka brokers dynamically by maintaining their details in ZooKeeper, while StaticHosts is used to set the Kafka brokers and their details manually / statically. ZkHosts is the simple and fast way to access the Kafka brokers.
-
KafkaConfig API
This API is used to define configuration settings for the Kafka cluster. The signature of KafkaConfig is defined as follows:
public KafkaConfig(BrokerHosts hosts, String topic)
- hosts − the BrokerHosts can be ZkHosts / StaticHosts.
- topic − the topic name.
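A minimal sketch of both BrokerHosts flavors feeding a KafkaConfig, assuming the storm-kafka 1.0.x API (the connection string, topic name, and partition layout are illustrative):
import org.apache.storm.kafka.Broker;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaConfig;
import org.apache.storm.kafka.StaticHosts;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.trident.GlobalPartitionInformation;
public class BrokerHostsExample {
   public static void main(String[] args) {
      // ZkHosts: discover brokers and partitions dynamically from ZooKeeper
      BrokerHosts zkHosts = new ZkHosts("localhost:2181");
      KafkaConfig zkConfig = new KafkaConfig(zkHosts, "my-first-topic");
      // StaticHosts: list brokers and partitions by hand, with no ZooKeeper lookup
      GlobalPartitionInformation partitions = new GlobalPartitionInformation("my-first-topic");
      partitions.addPartition(0, new Broker("localhost", 9092));
      BrokerHosts staticHosts = new StaticHosts(partitions);
      KafkaConfig staticConfig = new KafkaConfig(staticHosts, "my-first-topic");
   }
}
-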
SpoutConfig API
SpoutConfig is an extension of KafkaConfig that supports additional ZooKeeper information.
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id)
- hosts − the BrokerHosts can be any implementation of the BrokerHosts interface.
- topic − the topic name.
- zkRoot − the ZooKeeper root path.
- id − the spout stores the state of the offsets it has consumed in ZooKeeper. The id should uniquely identify your spout.
-
SchemeAsMultiScheme
SchemeAsMultiScheme determines how the ByteBuffer consumed from Kafka is deserialized into a Storm tuple. It wraps a single Scheme implementation; StringScheme, which decodes each message into one string field, is the usual choice (as in the sample below).
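To customize the deserialization, a Scheme can be implemented and wrapped in SchemeAsMultiScheme. A minimal sketch, assuming the Storm 1.x Scheme interface (the class name KeyValueScheme and the "key:value" message format are illustrative, not part of the original sample):
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
// Hypothetical scheme: splits each Kafka message of the form "key:value"
// into two tuple fields instead of one raw string
public class KeyValueScheme implements Scheme {
   @Override
   public List<Object> deserialize(ByteBuffer ser) {
      String raw = StandardCharsets.UTF_8.decode(ser).toString();
      String[] parts = raw.split(":", 2);
      return new Values(parts[0], parts.length > 1 ? parts[1] : "");
   }
   @Override
   public Fields getOutputFields() {
      return new Fields("key", "value");
   }
}
It would then be plugged in with spoutConfig.scheme = new SchemeAsMultiScheme(new KeyValueScheme());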
-
KafkaSpout API
KafkaSpout is our spout implementation, which will integrate with Storm. It fetches messages from the Kafka topic and emits them into the Storm ecosystem as tuples. KafkaSpout gets its configuration details from SpoutConfig.
// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);
// Creating SpoutConfig object
SpoutConfig spoutConfig = new SpoutConfig(hosts,
   topicName, "/" + topicName, UUID.randomUUID().toString());
// Convert the ByteBuffer to String
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
// Assign SpoutConfig to KafkaSpout
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
Creating Bolts
- For the Bolt interface definition, see 《Storm学习笔记》
- Example bolts that split sentences into words and count word occurrences
- SplitBolt.java
import java.util.Map;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.task.TopologyContext;
// Splits each incoming sentence into lower-case words and emits one tuple per word
public class SplitBolt implements IRichBolt {
   private OutputCollector collector;
   @Override
   public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
      this.collector = collector;
   }
   @Override
   public void execute(Tuple input) {
      String sentence = input.getString(0);
      String[] words = sentence.split(" ");
      for (String word : words) {
         word = word.trim();
         if (!word.isEmpty()) {
            word = word.toLowerCase();
            collector.emit(new Values(word));
         }
      }
      collector.ack(input);
   }
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
   @Override
   public void cleanup() {}
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
- CountBolt.java
import java.util.Map;
import java.util.HashMap;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.task.TopologyContext;
// Keeps an in-memory count per word and prints the totals when the topology shuts down
public class CountBolt implements IRichBolt {
   Map<String, Integer> counters;
   private OutputCollector collector;
   @Override
   public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
      this.counters = new HashMap<String, Integer>();
      this.collector = collector;
   }
   @Override
   public void execute(Tuple input) {
      String str = input.getString(0);
      if (!counters.containsKey(str)) {
         counters.put(str, 1);
      } else {
         Integer c = counters.get(str) + 1;
         counters.put(str, c);
      }
      collector.ack(input);
   }
   @Override
   public void cleanup() {
      for (Map.Entry<String, Integer> entry : counters.entrySet()) {
         System.out.println(entry.getKey() + " : " + entry.getValue());
      }
   }
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
   }
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
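Note that Storm only reliably invokes cleanup() in local mode; on a real cluster the printed totals may never appear. A sketch of an alternative (the class name EmittingCountBolt is illustrative, not part of the original sample) that emits each running count downstream instead, anchored to the input tuple so a downstream failure triggers a replay from the spout:
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
// Hypothetical variant of CountBolt: emits each running count downstream
// instead of printing in cleanup(), so results are visible on a real cluster
public class EmittingCountBolt implements IRichBolt {
   private Map<String, Integer> counters;
   private OutputCollector collector;
   @Override
   public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
      this.counters = new HashMap<String, Integer>();
      this.collector = collector;
   }
   @Override
   public void execute(Tuple input) {
      String word = input.getString(0);
      int count = counters.containsKey(word) ? counters.get(word) + 1 : 1;
      counters.put(word, count);
      // Anchoring to the input tuple extends Storm's at-least-once guarantee downstream
      collector.emit(input, new Values(word, count));
      collector.ack(input);
   }
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word", "count"));
   }
   @Override
   public void cleanup() {}
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}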
Submitting the Topology
- For the Topology concept, see 《Storm学习笔记》
- KafkaStormSample.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.kafka.trident.GlobalPartitionInformation;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.Broker;
import org.apache.storm.kafka.StaticHosts;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.KafkaConfig;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.StringScheme;
public class KafkaStormSample {
   public static void main(String[] args) throws Exception {
      Config config = new Config();
      config.setDebug(true);
      config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
      String zkConnString = "localhost:2181";
      String topic = "my-first-topic";
      BrokerHosts hosts = new ZkHosts(zkConnString);
      SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic,
         "/" + topic, UUID.randomUUID().toString());
      kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
      // kafkaSpoutConfig.forceFromStart = true;
      kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
      builder.setBolt("word-splitter", new SplitBolt()).shuffleGrouping("kafka-spout");
      builder.setBolt("word-counter", new CountBolt()).shuffleGrouping("word-splitter");
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("KafkaStormSample", config, builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}
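LocalCluster runs the topology in-process, which suits this sample. To deploy to a running Storm cluster, the topology would instead be submitted with StormSubmitter; a minimal sketch under the same assumptions as the sample above (the class name KafkaStormClusterSample and the jar name are illustrative):
import java.util.UUID;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
// Hypothetical cluster-mode variant: StormSubmitter deploys the topology to a
// running Storm cluster instead of an in-process LocalCluster
public class KafkaStormClusterSample {
   public static void main(String[] args) throws Exception {
      Config config = new Config();
      config.setNumWorkers(2); // spread executors across two worker JVMs
      BrokerHosts hosts = new ZkHosts("localhost:2181");
      // A fixed id (instead of UUID.randomUUID()) would let offsets survive restarts
      SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-first-topic",
         "/my-first-topic", UUID.randomUUID().toString());
      spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig));
      builder.setBolt("word-splitter", new SplitBolt()).shuffleGrouping("kafka-spout");
      builder.setBolt("word-counter", new CountBolt()).shuffleGrouping("word-splitter");
      // Package the classes into a jar and launch with:
      //   storm jar kafka-storm-sample.jar KafkaStormClusterSample
      StormSubmitter.submitTopology("KafkaStormSample", config, builder.createTopology());
   }
}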
Versions
- Zookeeper: zookeeper-3.5.2-alpha.tar.gz
- Curator: 2.9.1
- SLF4j: slf4j-1.7.21.tar.gz
- Kafka: kafka_2.11-0.10.1.0
- Storm: apache-storm-1.0.2.tar.gz
- JSON: json-simple-1.1.1.jar
- JDK: 1.8.0
Compilation
-
Dependencies
- Curator
Before compiling, the Kafka-Storm integration needs the Curator ZooKeeper client Java library. Add the paths of these jars to the CLASSPATH:
curator-client-2.9.1.jar
curator-framework-2.9.1.jar
- JSON
json-simple-1.1.1.jar
- Storm-Kafka
- Kafka Lib
- Storm Lib
- SLF4J
Once both the Kafka and Storm lib directories are on the CLASSPATH, the SLF4J bindings conflict; removing the SLF4J jar that ships with Kafka from the CLASSPATH resolves it.
-
Compile command
$ javac *.java
Execution
- Start the services: ZooKeeper, Kafka, Storm
$ cd /opt/zookeeper
$ ./bin/zkServer.sh start
$ cd /opt/kafka
$ ./bin/kafka-server-start.sh config/server.properties
$ cd /opt/storm
$ ./bin/storm nimbus
$ ./bin/storm supervisor
- Create the topic "my-first-topic"
$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-first-topic
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
- Enter messages in the Kafka producer CLI
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-first-topic
hello
kafka
storm
spark
test message
another test message
- Run the example
$ java KafkaStormSample
- Output
storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2