`

Storm实时读取Kafka

阅读更多

利用Storm的实时处理功能,从Kafka中读取消息,将消息合并后并打印(依次输入hello world .)

Storm版本:1.1.1

Kafka版本:2.12-0.11.0.0

Zookeeper版本:3.4.9

1、Kafka配置

server.properties文件修改

#发布外网ip

advertised.listeners=PLAINTEXT://*.*.*.*:9092

#删除topic

delete.topic.enable=true

 

kafka相关脚本

//启动
./bin/kafka-server-start.sh ./config/server.properties
//创建topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic words_topic
//发送消息
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic words_topic

 2、maven工程

新建maven项目,添加pom.xml配置

 

<properties>
	<kafka.version>0.11.0.0</kafka.version>
	<storm.version>1.1.1</storm.version>
</properties>

<dependencies>
	<dependency>
		<groupId>org.apache.kafka</groupId>
		<artifactId>kafka_2.12</artifactId>
		<version>${kafka.version}</version>
		<exclusions>
			<exclusion>
				<groupId>javax.jms</groupId>
				<artifactId>jms</artifactId>
			</exclusion>
			<exclusion>
				<groupId>com.sun.jdmk</groupId>
				<artifactId>jmxtools</artifactId>
			</exclusion>
			<exclusion>
				<groupId>com.sun.jmx</groupId>
				<artifactId>jmxri</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
	<dependency>
		<groupId>org.apache.storm</groupId>
		<artifactId>storm-kafka</artifactId>
		<version>${storm.version}</version>
	</dependency>

	<dependency>
		<groupId>org.apache.storm</groupId>
		<artifactId>storm-core</artifactId>
		<version>${storm.version}</version>
		<exclusions>
			<exclusion>
				<groupId>org.slf4j</groupId>
				<artifactId>log4j-over-slf4j</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
	<dependency>
		<groupId>commons-collections</groupId>
		<artifactId>commons-collections</artifactId>
		<version>3.2.1</version>
	</dependency>
		<dependency>
		<groupId>com.google.guava</groupId>
		<artifactId>guava</artifactId>
		<version>15.0</version>
	</dependency>
</dependencies>
<build>
	<plugins>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-compiler-plugin</artifactId>
			<version>3.5.1</version>
			<configuration>
				<source>1.8</source>
				<target>1.8</target>
			</configuration>
		</plugin>
		<plugin>
			<groupId>org.codehaus.mojo</groupId>
			<artifactId>exec-maven-plugin</artifactId>
			<version>1.2.1</version>
			<executions>
				<execution>
					<goals>
						<goal>exec</goal>
					</goals>
				</execution>
			</executions>
			<configuration>
				<executable>java</executable>
				<includeProjectDependencies>true</includeProjectDependencies>
				<includePluginDependencies>false</includePluginDependencies>
				<classpathScope>compile</classpathScope>
				<mainClass>com.learningstorm.kafka.KafkaTopology</mainClass>
			</configuration>
		</plugin>
		<plugin>
			<artifactId>maven-assembly-plugin</artifactId>
			<configuration>
				<descriptorRefs>
					<descriptorRef>jar-with-dependencies</descriptorRef>
				</descriptorRefs>
				<archive>
					<manifest>
						<mainClass></mainClass>
					</manifest>
				</archive>
			</configuration>
			<executions>
				<execution>
					<id>make-assembly</id>
					<phase>package</phase>
					<goals>
					<goal>single</goal>
					</goals>
				</execution>
			</executions>
		</plugin>
	</plugins>
</build>

 创建SentenceBolt

public class SentenceBolt extends BaseRichBolt {

	/**
	 * 
	 */
	private static final long serialVersionUID = 9063211371729556973L;

	private List<String> words = new ArrayList<String>();

	OutputCollector _collector;

	@Override
	public void execute(Tuple input) {
		// 获取输入单词
		String word = input.getString(0);
		if (StringUtils.isBlank(word)) {
			return;
		}
		System.out.println("Received Word:" + word);
		// add word to current list of words
		words.add(word);
		if (word.endsWith(".")) { //结束符,打印整条语句
			_collector.emit(ImmutableList.of((Object) StringUtils.join(words, ' ')));
			words.clear();
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("sentence"));
	}

	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		_collector = collector;
	}

}

 创建PrinterBolt

public class PrinterBolt extends BaseRichBolt {

	/**
	 * 
	 */
	private static final long serialVersionUID = 9063211371729556973L;

	@Override
	public void execute(Tuple input) {
		String sentence = input.getString(0);
		System.out.println("Received Sentence: " + sentence);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
	}

	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
	}

}

 创建KafkaTopology

public class KafkaTopology {
	public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
		// zookeeper hosts for the Kafka cluster
		ZkHosts zkHosts = new ZkHosts("ip:2181");
		
		SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "words_topic", "", "id7");
		
		kafkaConfig.scheme = new SchemeAsMultiScheme((Scheme) new StringScheme());
		
		TopologyBuilder builder = new TopologyBuilder();
		
		builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
		
		builder.setBolt("SentenceBolt", new SentenceBolt(), 1).globalGrouping("KafkaSpout");
		builder.setBolt("PrinterBolt", new PrinterBolt(), 1).globalGrouping("SentenceBolt");
		//本地模式
		LocalCluster cluster = new LocalCluster();
		Config conf = new Config();
		// Submit topology for execution
		cluster.submitTopology("KafkaToplogy", conf, builder.createTopology());
		try {
			System.out.println("Waiting to consume from kafka");
			Thread.sleep(30000);
		} catch (Exception exception) {
			System.out.println("Thread interrupted exception : " + exception);
		}
		// kill the KafkaTopology
		cluster.killTopology("KafkaToplogy");
		// shut down the storm test cluster
		cluster.shutdown();
	}
}

 运行结果

//控制台发送消息
>hello
>world
>.

//结果显示
Waiting to consume from kafka
Received Word:hello
Received Word:world
Received Word:.
Received Sentence: hello world.

 

 

分享到:
评论

相关推荐

    StormStorm集成Kafka 从Kafka中读取数据

    在大数据实时处理领域,Apache Storm与Apache Kafka经常被结合使用,形成高效的数据流处理系统。本文将深入探讨如何实现Storm与Kafka的集成,重点在于如何从Kafka中读取数据。 **一、整合说明** Apache Storm是一...

    Storm综合案例二Storm集群向Kafka集群读取数据并写入MySQL远程模式

    今天接上文,来实现一个Storm数据流处理综合案例的第二部分,Storm集群向Kafka集群源源不断读取数据,通过MyBatis写入到MySQL数据库,并部署为远程模式 准备工作 参考上文准备工作 代码编写 思路:Storm集群从...

    使用Storm实时处理交通大数据(数据源:kafka,集群管理:zookeeper).zip

    在交通大数据场景中,spout可能是一个连接到Kafka的消费者,负责读取并分发交通事件;而bolts则执行各种操作,如数据清洗、聚合、过滤或计算速度、流量等交通指标。 Zookeeper在集群管理中的角色至关重要。它是...

    StormStorm集成Kafka 写数据到Kafka

    三、从Kafka中读取数据 整合说明 Storm 官方对 Kafka 的整合分为两个版本,官方说明文档分别如下: Storm Kafka Integration : 主要是针对 0.8.x 版本的 Kafka 提供整合支持; Storm Kafka Integration (0.10.x+) ...

    storm-kafka整合代码

    Kafka Spout 是一个预定义的 spout 类,可以从 Kafka 主题中读取数据,并将数据作为 tuple 发送到 Storm 的数据流中。以下是一般整合过程: 1. **配置 Kafka Spout**:首先,你需要配置 Kafka Spout,指定 Kafka 的...

    storm集成kafka插demo.zip

    6. **实时数据处理**:这个示例可能展示了如何实现实时地从Kafka读取数据,然后进行处理并输出结果,这对于实时监控、实时告警等应用场景非常重要。 7. **开发环境**:为了运行这个示例,开发者可能需要安装JDK、...

    storm之集成kafka操作示例代码.zip

    3. **Storm-Kafka Integration**:为了将Storm与Kafka结合,我们需要使用Storm的Kafka Spout,这是一个特殊的spout,可以从Kafka主题中读取数据。Spout会连接到Kafka集群,订阅特定的主题,并持续地将新产生的消息...

    storm-kafka实时趋势分析

    本文将深入探讨"storm-kafka实时趋势分析"这一主题,讲解这两个技术如何协同工作,以及如何实现通用性强、适应多场景的实时趋势分析。 Storm是由Twitter开源的分布式实时计算系统,它可以持续地处理数据流,提供低...

    storm+kafka源码示例

    Storm是一个实时处理系统,而Kafka则是一个高吞吐量的消息队列。本示例将深入探讨如何结合这两个技术来构建一个实时数据流处理应用。 首先,让我们来了解Apache Storm。Storm是一个开源的分布式实时计算系统,它...

    storm链接kafka时需要的jar包

    `storm-kafka`是Storm的一个扩展,它提供了一个预定义的Spout,允许Storm拓扑从Kafka主题中读取数据。这个JAR包包含了`storm-kafka-client`或`storm-kafka-bolt`等,具体取决于你使用的Storm版本。它实现了Kafka...

    storm+kafka+jdbc整合实例

    在大数据处理领域,Storm、Kafka以及JDBC的整合是一个常见的需求,用于实现实时数据流处理、消息队列和数据库交互。以下是对这个整合实例的详细解释。 首先,让我们来了解一下这三个组件: 1. **Apache Storm**:...

    Storm流计算项目:1号店电商实时数据分析系统-08.storm-kafka 详解和实战案例.pptx

    《Storm流计算项目:1号店电商实时数据分析系统——storm-kafka详解与实战案例》 在大数据处理领域,实时计算已经成为不可或缺的一部分,特别是在电商行业中,实时数据分析能够帮助企业快速响应市场变化,提高运营...

    Storm整合Kafka

    在Storm中,KafkaSpout是连接到Kafka的特殊Spout,负责从Kafka主题读取数据并分发给Bolts进行处理。它提供了容错机制,确保即使在故障情况下也能正确处理数据。 6. **容错性和可扩展性**: 整合后的系统具备了...

    flume+kafka+storm最完整讲解

    这个实验组合展示了如何构建一个实时数据处理架构,从数据源(通过 Flume)到中间存储(Kafka)再到实时处理(Storm),这对于大数据实时分析和监控场景非常有用。理解并掌握这一流程对于 IT 专业人士来说至关重要,...

    flume,kafka,storm整合

    Flume从源收集数据,将其发送到Kafka的`topic1`,Storm消费者从`topic1`读取数据进行实时处理,然后将处理结果写入`topic2`。这个系统适用于大规模日志收集、实时分析等场景,为大数据处理提供了灵活高效的解决方案...

    kafkastorm:kafka整合storm案例

    Kafka是一种高吞吐量、分布式的发布订阅消息系统,而Storm则是一个实时计算系统,能够处理流数据并进行连续计算。将Kafka与Storm整合,可以构建出强大的实时数据处理管道,这在大数据实时分析、监控、报警等领域有着...

    Kafka+FlumeNG+Storm+HBase构架设计

    通过结合Kafka、FlumeNG、Storm与HBase等关键技术,本架构设计实现了对大数据集的实时处理和历史数据的高效查询。不仅可以支持实时数据处理,还能够有效管理海量数据,为用户提供快速、准确的服务。

    03、storm项目实战课程-Kafka0.8Storm0.9.1Optr.rar

    3. **Storm与Kafka集成**:学习如何配置和建立Storm topology,以便从Kafka读取数据并进行处理,同时将结果写回到Kafka或其他持久化存储。 4. **实时数据处理**:探讨实时数据分析的挑战和解决方案,如延迟、数据...

    storm实时单词计数

    在这个例子中,spout 可能是从某种消息队列(如Kafka)或者文件系统中读取实时文本数据。 2. **Splitter Bolt**: 这个bolt接收到spout发送的文本数据,然后将文本分割成单个单词。这通常涉及到字符串的处理,如使用...

    kafka-storm.7z

    《Kafka与Storm整合:构建大数据实时处理流水线》 在大数据处理领域,Apache Kafka和Apache Storm都是非常重要的组件。Kafka作为一个高吞吐量、分布式的消息发布订阅系统,常用于实时数据管道,而Storm则是一个...

Global site tag (gtag.js) - Google Analytics