kafka storm 安装:
http://knight-black-bob.iteye.com/blog/2343192
15.安装kafka cd /usr/local/ wget http://mirror.bit.edu.cn/apache/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz tar xf kafka_2.10-0.10.0.0.tgz ln -s /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka chown -R hdfs:hadoop /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka chown -R root:root /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka /usr/local/zookeeper/bin/zkCli.sh create /kafka '' vim /usr/local/kafka/config/server.properties broker.id=0 zookeeper.connect=dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka scp -r /usr/local/kafka_2.10-0.10.0.0.tgz root@dev05.aoiplus.openpf:/usr/local/ scp -r /usr/local/kafka_2.10-0.10.0.0.tgz root@dev06.aoiplus.openpf:/usr/local/ scp -r /usr/local/kafka/config/server.properties root@dev05.aoiplus.openpf:/usr/local/kafka/config/server.properties scp -r /usr/local/kafka/config/server.properties root@dev06.aoiplus.openpf:/usr/local/kafka/config/server.properties master slave 启动 /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties & 创建topic /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --replication-factor 3 --partitions 5 --topic baoy-topic /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --topic baoy-topic /usr/local/kafka/bin/kafka-console-producer.sh --broker-list dev10.aoiplus.openpf:9092,dev05.aoiplus.openpf:9092,dev06.aoiplus.openpf:9092 --topic baoy-topic /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper dev10.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181/kafka --from-beginning --topic baoy-topic
16. storm 安装 cd /usr/local/ wget http://mirrors.cnnic.cn/apache/storm/apache-storm-0.10.0/apache-storm-0.10.0.tar.gz tar xf apache-storm-0.10.0.tar.gz ln -s /usr/local/apache-storm-0.10.0 /usr/local/storm chown -R storm:storm /usr/local/apache-storm-0.10.0 /usr/local/storm chown -R root:root /usr/local/apache-storm-0.10.0 /usr/local/storm mkdir -p /tmp/storm/data/ cd storm vim conf/storm.yaml storm.zookeeper.servers: - "dev10.aoiplus.openpf" - "dev05.aoiplus.openpf" - "dev06.aoiplus.openpf" storm.zookeeper.port: 2181 nimbus.host: "dev10.aoiplus.openpf" supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703 storm.local.dir: "/tmp/storm/data" scp -r /usr/local/storm/conf/storm.yaml root@dev06.aoiplus.openpf:/usr/local/storm/conf/ scp -r /usr/local/storm/conf/storm.yaml root@dev05.aoiplus.openpf:/usr/local/storm/conf/ 启动 master /usr/local/storm/bin/storm nimbus >/dev/null 2>&1 & /usr/local/storm/bin/storm ui >/dev/null 2>&1 & slaves /usr/local/storm/bin/storm supervisor >/dev/null 2>&1 & 查看 http://dev10.aoiplus.openpf/index.html cp /usr/local/kafka/libs/kafka_2.10-0.10.0.0.jar /usr/local/storm/lib/ cp /usr/local/kafka/libs/scala-library-2.10.6.jar /usr/local/storm/lib/ cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/ cp /usr/local/kafka/libs/snappy-java-1.1.2.4.jar /usr/local/storm/lib/ cp /usr/local/kafka/libs/zkclient-0.8.jar /usr/local/storm/lib/ cp /usr/local/kafka/libs/log4j-1.2.17.jar /usr/local/storm/lib/ cp /usr/local/kafka/libs/slf4j-api-1.7.21.jar /usr/local/storm/lib/ cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/ /usr/local/storm/bin/storm jar /home/baoy/soft/storm/KafkaStormJavaDemo_main_start.jar com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology "terminalInfosAnalysisTopology" mkdir -p /home/baoy/soft/storm/logs chmod -R 777 /home/baoy/soft/storm/logs cd /usr/local/storm/log4j2/ vim cluster.xml <property name="logpath">/home/baoy/soft/storm</property> 关闭 storm /usr/local/storm/bin/storm kill terminalInfosAnalysisTopology
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.curiousby.baoy.cn</groupId> <artifactId>KafkaStormJavaDemo</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>SpringKafkaStormDemo</name> <url>http://maven.apache.org</url> <!-- properties constant --> <properties> <spring.version>4.2.5.RELEASE</spring.version> <java.version>1.7</java.version> </properties> <repositories> <repository> <id>clojars.org</id> <url>http://clojars.org/repo</url> </repository> </repositories> <dependencies> <dependency> <groupId>org.json</groupId> <artifactId>json</artifactId> <version>20160810</version> </dependency> <!-- <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.6.4</version> <type>jar</type> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.6.4</version> <type>jar</type> </dependency> --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.6</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>0.9.6</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.9.0.1</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> </dependencies> <build> <finalName>SpringKafkaStormDemo</finalName> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <dependencies> <dependency> <groupId>org.codehaus.plexus</groupId> <artifactId>plexus-compiler-javac</artifactId> <version>2.5</version> </dependency> </dependencies> <configuration> <source>1.7</source> <target>1.7</target> <encoding>UTF-8</encoding> <compilerArguments> <verbose /> <bootclasspath>${java.home}/lib/rt.jar:${java.home}/lib/jce.jar</bootclasspath> </compilerArguments> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>1.4</version> <configuration> <appendAssemblyId>false</appendAssemblyId> <finalName>${project.artifactId}_TerminalInfosAnalysisTopology_main_start</finalName> <createDependencyReducedPom>true</createDependencyReducedPom> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <appendAssemblyId>false</appendAssemblyId> <finalName>${project.artifactId}_main_start</finalName> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology</mainClass> </manifest> </archive> </configuration> </plugin> </plugins> </build> </project>
package com.curiousby.baoyou.cn.storm; import java.util.UUID; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.topology.TopologyBuilder; import storm.kafka.BrokerHosts; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.StringScheme; import storm.kafka.ZkHosts; /** * @see com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology * @Type TerminalInfosAnalysisTopology.java * @Desc * @author cmcc-B100036 * @date 2016年12月15日 下午4:54:50 * @version */ public class TerminalInfosAnalysisTopology { private static String topicName = "baoy-topic"; private static String zkRoot = "" ; public static void main(String[] args) { BrokerHosts hosts = new ZkHosts( "172.23.27.120:2181,172.23.27.115:2181,172.23.27.116:2181/kafka"); SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, zkRoot, UUID.randomUUID().toString()); spoutConfig.forceFromStart= false; spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafkaSpout", kafkaSpout); builder.setBolt("terminalInfosAnalysisRedisBolt", new TerminalInfosAnalysisRedisBolt(),2).shuffleGrouping("kafkaSpout"); builder.setBolt("terminalInfosAnalysisElasticsearchBolt", new TerminalInfosAnalysisElasticsearchBolt(), 2).shuffleGrouping("kafkaSpout"); Config conf = new Config(); conf.setDebug(true); if (args != null && args.length > 0) { conf.setNumWorkers(2); try { StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } } else { conf.setMaxSpoutPending(2); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("terminalInfosAnalysisTopology", conf, builder.createTopology()); } } }
public class TerminalInfosAnalysisRedisBolt extends BaseRichBolt { private Logger logger =LoggerFactory.getLogger(TerminalInfosAnalysisRedisBolt.class); private OutputCollector collector; @Override public void execute(Tuple tuple) { JSONObject formate = TerminalInfos.formate(tuple.getString(0)); TerminalInfos entity = new TerminalInfos(); entity.formate(formate); if (entity != null) { System.out.println(entity); logger.info("==========================================================="); logger.info("========================="+entity+"========================="); logger.info("==========================================================="); } } @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
使用 kafka 客户端 定时发送
spring-kafka 客户端 服务器端 下载 http://knight-black-bob.iteye.com/blog/2344424
kafka-storm 及时处理
这里面 我使用的是本地模式
捐助开发者
在兴趣的驱动下,写一个免费
的东西,有欣喜,也还有汗水,希望你喜欢我的作品,同时也能支持一下。 当然,有钱捧个钱场(右上角的爱心标志,支持支付宝和PayPal捐助),没钱捧个人场,谢谢各位。
谢谢您的赞助,我会做的更好!
相关推荐
"kafka-storm-starter-develop" 项目则为开发者提供了一个入门级的框架,帮助他们快速上手这两个工具的集成开发。 ### Kafka 简介 Kafka 是由 LinkedIn 开发并开源的一个分布式消息系统,后来成为 Apache 的顶级...
《Kafka与Storm整合:构建大数据实时处理流水线》 在大数据处理领域,Apache Kafka和Apache Storm都是非常重要的组件。Kafka作为一个高吞吐量、分布式的消息发布订阅系统,常用于实时数据管道,而Storm则是一个...
在大数据处理领域,Flume、Kafka和Storm是三个非常重要的工具,它们分别负责数据采集、消息中间件和实时流处理。"flume-kafka-storm源程序"这个压缩包很可能是包含这三个组件的集成示例或者源代码,用于帮助开发者...
在IT行业中,流处理组件是大数据处理的关键技术之一,它涉及到多个开源项目,如Zookeeper(zk)、Kafka、Redis和Storm。这些组件各有其独特的功能,并常常协同工作以实现高效的数据管理和处理。 Zookeeper(动物园...
storm+kafka jar包 ,curator-client-2.8.0、curator-framework-2.8.0、curator-recipes-2.8.0、guava-18.0、kafka_2.9.2-0.8.2.2、metrics-core-2.2.0、scala-library-2.10.4、storm-kafka-0.9.2-incubating、...
### Flume+kafka+Storm整合知识点详解 #### 一、Flume、Kafka与Storm概述 在大数据领域,数据采集、传输与实时处理是至关重要的环节。本篇内容重点介绍了如何利用Flume、Kafka与Storm这三个开源工具实现一套完整的...
##下载源码git clonekafka-log-appender:将日志内容写到kafka程序log-kafka-storm:docker-compose脚本和storm程序##准备docker环境###启动dockerdocker-compose环境搭建过程请查看我的进入log-kafka-storm目录,...
storm-kafka-0.9.4.jar
风暴卡夫卡0.8加测试根据以下信息提供的storm-kafka-0.8-plus测试项目: [ ]( ) [ ]( ) [ ]( ) [ ]( ) 还包含尝试基于[Hazelcast]的三叉戟状态的示例实现( ) ##使用环境设置如果您使用的是Mac,请按照的...
kafka-storm-hdfs这个项目有3个小项目分别是kafka 到 storm ; storm 到 hdfs ; kafka ~ storm ~ hdfsstorm 到hdfs 需要手动添加这几个类到storm的 lib 下commons-cli-1.2.jarcommons-collections-3.2.1....
《构建实时大数据处理流水线:flume-kafka-storm-drools-example详解》 在大数据领域,实时数据处理是至关重要的。"flume-kafka-storm-drools-example"项目提供了一个综合示例,展示了如何利用Apache Flume、Apache...
标题 "apache-storm-2.4.0.tar.gz" 指的是 Apache Storm 的特定版本,即 2.4.0 版本的源码或二进制包,通常以 tar.gz 格式打包,这是一种常见的在 Linux 和类 Unix 系统上使用的归档和压缩格式。这个压缩包包含了...
本文将详细介绍如何整合Flume、Kafka和Storm,以实现大数据的高效收集、传输和处理。在大数据运维解决方案中,这三个组件扮演着关键角色。Flume用于数据采集,Kafka作为中间件提供消息队列服务,而Storm则用于实时...
获取到文件名称 : apache-storm-0.9.2-incubating.tar.gz 获取到文件名称 : KafKa+Storm资料加源码安装包.7z 获取到文件名称 : kafka-manager-1.0-SNAPSHOT.zip 获取到文件名称 : kafka_2.10-0.9.0.1.tgz.gz ...
kafka-storm-metrics 这是一个 kafka-storm 项目,用于消费来自 kafka 生产者的数据并收集风暴方面的指标/数据。 该项目的目的是将您的 Storm 应用程序与我们的监控系统集成,并开始从我们的拓扑跟踪应用程序级指标...
简单的 kafka-storm-java 这是一个使用 Kafka、Storm 和 Storm Trident 的简单示例。 注意:您的本地 Maven 存储库中应该有最新版本的 Storm。 您可以通过在终端中执行来做到这一点: git clone ...
Kafka+FlumeNG+Storm+HBase实时处理系统介绍
kafka-storm 0.9版本 pom.xml
Storm-code 目录包含一个用于部署和测试的简单风暴拓扑。 有关该项目的更多详细信息... 用法 vagrant up 等待 ... 验证 worker 和 nimbus 已启动 vagrant ssh kafka cd 到 kafka 目录 bin/kafka-topics.sh