`
knight_black_bob
  • 浏览: 850396 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

kafka-storm 详细

阅读更多

  

 

 

 

 

kafka storm 安装:

http://knight-black-bob.iteye.com/blog/2343192

 

15.安装kafka
cd /usr/local/
wget http://mirror.bit.edu.cn/apache/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz
tar xf kafka_2.10-0.10.0.0.tgz
ln -s /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
chown -R hdfs:hadoop /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
chown -R root:root /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka

/usr/local/zookeeper/bin/zkCli.sh
create /kafka ''

vim /usr/local/kafka/config/server.properties
broker.id=0
zookeeper.connect=dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka

scp -r /usr/local/kafka_2.10-0.10.0.0.tgz root@dev05.aoiplus.openpf:/usr/local/
scp -r /usr/local/kafka_2.10-0.10.0.0.tgz root@dev06.aoiplus.openpf:/usr/local/

scp -r /usr/local/kafka/config/server.properties root@dev05.aoiplus.openpf:/usr/local/kafka/config/server.properties
scp -r /usr/local/kafka/config/server.properties root@dev06.aoiplus.openpf:/usr/local/kafka/config/server.properties

master slave 启动
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
创建topic
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --replication-factor 3 --partitions 5 --topic baoy-topic
/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --topic baoy-topic


/usr/local/kafka/bin/kafka-console-producer.sh --broker-list dev10.aoiplus.openpf:9092,dev05.aoiplus.openpf:9092,dev06.aoiplus.openpf:9092 --topic baoy-topic

/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper dev10.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181/kafka --from-beginning --topic baoy-topic

  

 

16. storm 安装
cd /usr/local/
wget http://mirrors.cnnic.cn/apache/storm/apache-storm-0.10.0/apache-storm-0.10.0.tar.gz
tar xf apache-storm-0.10.0.tar.gz
ln -s /usr/local/apache-storm-0.10.0 /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.10.0 /usr/local/storm
chown -R root:root /usr/local/apache-storm-0.10.0 /usr/local/storm

mkdir -p /tmp/storm/data/

cd storm
vim conf/storm.yaml
	storm.zookeeper.servers:
	     - "dev10.aoiplus.openpf"
	     - "dev05.aoiplus.openpf"
	     - "dev06.aoiplus.openpf"
	storm.zookeeper.port: 2181

	nimbus.host: "dev10.aoiplus.openpf"

	supervisor.slots.ports:
	    - 6700
	    - 6701
	    - 6702
	    - 6703

	storm.local.dir: "/tmp/storm/data"

scp -r /usr/local/storm/conf/storm.yaml root@dev06.aoiplus.openpf:/usr/local/storm/conf/
scp -r /usr/local/storm/conf/storm.yaml root@dev05.aoiplus.openpf:/usr/local/storm/conf/

启动
master
/usr/local/storm/bin/storm nimbus >/dev/null 2>&1 & 
/usr/local/storm/bin/storm ui >/dev/null 2>&1 & 
slaves
/usr/local/storm/bin/storm supervisor >/dev/null 2>&1 &
查看
http://dev10.aoiplus.openpf/index.html



cp /usr/local/kafka/libs/kafka_2.10-0.10.0.0.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/scala-library-2.10.6.jar /usr/local/storm/lib/ 
cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/snappy-java-1.1.2.4.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/zkclient-0.8.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/log4j-1.2.17.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/slf4j-api-1.7.21.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/




/usr/local/storm/bin/storm jar /home/baoy/soft/storm/KafkaStormJavaDemo_main_start.jar com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology  "terminalInfosAnalysisTopology" 

mkdir -p /home/baoy/soft/storm/logs
chmod -R 777 /home/baoy/soft/storm/logs

cd /usr/local/storm/log4j2/
vim cluster.xml
<property name="logpath">/home/baoy/soft/storm</property>   

关闭 storm
/usr/local/storm/bin/storm kill terminalInfosAnalysisTopology

 



 

 

 

 

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.curiousby.baoy.cn</groupId>
  <artifactId>KafkaStormJavaDemo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>SpringKafkaStormDemo</name>
  <url>http://maven.apache.org</url>



	<!-- properties constant -->
	<properties>
		<spring.version>4.2.5.RELEASE</spring.version>
		  <java.version>1.7</java.version>
	</properties>

	<repositories>
		<repository>
			<id>clojars.org</id>
			<url>http://clojars.org/repo</url>
		</repository>
	</repositories>
 

		
		
	<dependencies> 
		<dependency>
			<groupId>org.json</groupId>
			<artifactId>json</artifactId>
			<version>20160810</version>
		</dependency>
		
		<!-- <dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.6.4</version>
			<type>jar</type>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.6.4</version>
			<type>jar</type>
		</dependency> -->
	 <dependency>  
         <groupId>org.apache.storm</groupId>  
         <artifactId>storm-core</artifactId>  
        <version>0.9.6</version>  
         <scope>provided</scope>  
     </dependency>  
     <dependency>  
        <groupId>org.apache.storm</groupId>  
         <artifactId>storm-kafka</artifactId>  
         <version>0.9.6</version>  
     </dependency>  
     <dependency>  
         <groupId>org.apache.kafka</groupId>  
         <artifactId>kafka_2.10</artifactId>  
         <version>0.9.0.1</version>  
             <exclusions>  
                 <exclusion>  
                     <groupId>org.apache.zookeeper</groupId>  
                     <artifactId>zookeeper</artifactId>  
                </exclusion>  
                 <exclusion>  
                     <groupId>log4j</groupId>  
                     <artifactId>log4j</artifactId>  
                 </exclusion>  
                <exclusion>  
                     <groupId>org.slf4j</groupId>  
                    <artifactId>slf4j-log4j12</artifactId>  
                </exclusion>  
             </exclusions>  
     </dependency>  
	 
 


	</dependencies>
	<build>
		<finalName>SpringKafkaStormDemo</finalName>
		 
		<plugins> 
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.3</version>
				<dependencies>
					<dependency>
						<groupId>org.codehaus.plexus</groupId>
						<artifactId>plexus-compiler-javac</artifactId>
						<version>2.5</version>
					</dependency>
				</dependencies>
				<configuration>
					<source>1.7</source>
					<target>1.7</target>
					<encoding>UTF-8</encoding>
					<compilerArguments>
						<verbose />
						<bootclasspath>${java.home}/lib/rt.jar:${java.home}/lib/jce.jar</bootclasspath>
					</compilerArguments>
				</configuration>
			</plugin>
		 
			<plugin>
			    <groupId>org.apache.maven.plugins</groupId>
			    <artifactId>maven-shade-plugin</artifactId>
			    <version>1.4</version>
			    <configuration>
			        <appendAssemblyId>false</appendAssemblyId> 
				   <finalName>${project.artifactId}_TerminalInfosAnalysisTopology_main_start</finalName>
			        <createDependencyReducedPom>true</createDependencyReducedPom>
			    </configuration>
			    <executions>
			        <execution>
			            <phase>package</phase>
			            <goals>
			                <goal>shade</goal>
			            </goals>
			            <configuration>
			                <transformers>
			                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
			                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
			                        <mainClass>com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology</mainClass>
			                    </transformer>
			                </transformers>
			            </configuration>
			        </execution>
			    </executions>
			</plugin>
			<plugin>  
              <artifactId>maven-assembly-plugin</artifactId>  
              <configuration>  
                   <appendAssemblyId>false</appendAssemblyId> 
				   <finalName>${project.artifactId}_main_start</finalName>
                  <descriptorRefs>    
                      <descriptorRef>jar-with-dependencies</descriptorRef>  
                  </descriptorRefs>  
                  <archive>  
                     <manifest>  
                       <mainClass>com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology</mainClass>  
                     </manifest>  
                   </archive>  
               </configuration>  
            </plugin> 
			
		</plugins>
	</build>
</project>
 

 

 

 

package com.curiousby.baoyou.cn.storm;

import java.util.UUID;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

/**
 * @see com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology
 * @Type TerminalInfosAnalysisTopology.java
 * @Desc 
 * @author cmcc-B100036
 * @date 2016年12月15日 下午4:54:50
 * @version 
 */
public class TerminalInfosAnalysisTopology {

    private static String topicName = "baoy-topic";
    private static String zkRoot = "" ;

    public static void main(String[] args) {
        BrokerHosts hosts = new ZkHosts(
                "172.23.27.120:2181,172.23.27.115:2181,172.23.27.116:2181/kafka");
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, zkRoot, UUID.randomUUID().toString());
        spoutConfig.forceFromStart= false;
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", kafkaSpout);
        builder.setBolt("terminalInfosAnalysisRedisBolt", new TerminalInfosAnalysisRedisBolt(),2).shuffleGrouping("kafkaSpout");  
        builder.setBolt("terminalInfosAnalysisElasticsearchBolt", new TerminalInfosAnalysisElasticsearchBolt(), 2).shuffleGrouping("kafkaSpout");  
        
         
        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(2);
            try {
                StormSubmitter.submitTopologyWithProgressBar(args[0], conf,  builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else { 
            conf.setMaxSpoutPending(2);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("terminalInfosAnalysisTopology", conf, builder.createTopology());

        }

    }
}

 

 

public class TerminalInfosAnalysisRedisBolt extends BaseRichBolt {

    private Logger logger =LoggerFactory.getLogger(TerminalInfosAnalysisRedisBolt.class);
    private OutputCollector collector;
    
    @Override
    public void execute(Tuple tuple) { 
        JSONObject formate = TerminalInfos.formate(tuple.getString(0)); 
        TerminalInfos entity = new TerminalInfos();
        entity.formate(formate);  
        if (entity != null) {
            System.out.println(entity);
            logger.info("===========================================================");
            logger.info("========================="+entity+"=========================");
            logger.info("===========================================================");
        } 
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        
        
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

}

 

 

使用 kafka 客户端 定时发送 

 

spring-kafka 客户端 服务器端 下载 http://knight-black-bob.iteye.com/blog/2344424



 

kafka-storm 及时处理 



 

 

这里面 我使用的是本地模式

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

捐助开发者

在兴趣的驱动下,写一个免费的东西,有欣喜,也还有汗水,希望你喜欢我的作品,同时也能支持一下。 当然,有钱捧个钱场(右上角的爱心标志,支持支付宝和PayPal捐助),没钱捧个人场,谢谢各位。



 
 谢谢您的赞助,我会做的更好!

 

 

 

 

 

 

  • 大小: 29.1 KB
  • 大小: 7 KB
  • 大小: 29.2 KB
  • 大小: 5.4 KB
  • 大小: 6.2 KB
  • 大小: 118.6 KB
1
4
分享到:
评论

相关推荐

    kafka-storm-starter-develop

    "kafka-storm-starter-develop" 项目则为开发者提供了一个入门级的框架,帮助他们快速上手这两个工具的集成开发。 ### Kafka 简介 Kafka 是由 LinkedIn 开发并开源的一个分布式消息系统,后来成为 Apache 的顶级...

    kafka-storm.7z

    《Kafka与Storm整合:构建大数据实时处理流水线》 在大数据处理领域,Apache Kafka和Apache Storm都是非常重要的组件。Kafka作为一个高吞吐量、分布式的消息发布订阅系统,常用于实时数据管道,而Storm则是一个...

    flume-kafka-storm源程序

    在大数据处理领域,Flume、Kafka和Storm是三个非常重要的工具,它们分别负责数据采集、消息中间件和实时流处理。"flume-kafka-storm源程序"这个压缩包很可能是包含这三个组件的集成示例或者源代码,用于帮助开发者...

    zk-kafka-redis-storm安装

    在IT行业中,流处理组件是大数据处理的关键技术之一,它涉及到多个开源项目,如Zookeeper(zk)、Kafka、Redis和Storm。这些组件各有其独特的功能,并常常协同工作以实现高效的数据管理和处理。 Zookeeper(动物园...

    storm+kafka

    storm+kafka jar包 ,curator-client-2.8.0、curator-framework-2.8.0、curator-recipes-2.8.0、guava-18.0、kafka_2.9.2-0.8.2.2、metrics-core-2.2.0、scala-library-2.10.4、storm-kafka-0.9.2-incubating、...

    Flume+kafka+Storm整合

    ### Flume+kafka+Storm整合知识点详解 #### 一、Flume、Kafka与Storm概述 在大数据领域,数据采集、传输与实时处理是至关重要的环节。本篇内容重点介绍了如何利用Flume、Kafka与Storm这三个开源工具实现一套完整的...

    log-analysis-kafka-storm-docker:在Docker中运行的Kafka,Storm,Zookeeper和Openfire

    ##下载源码git clonekafka-log-appender:将日志内容写到kafka程序log-kafka-storm:docker-compose脚本和storm程序##准备docker环境###启动dockerdocker-compose环境搭建过程请查看我的进入log-kafka-storm目录,...

    storm-kafka-0.9.4.jar

    storm-kafka-0.9.4.jar

    storm-kafka-0.8-plus-test:Storm-kafka-0.8-plus的简单测试项目

    风暴卡夫卡0.8加测试根据以下信息提供的storm-kafka-0.8-plus测试项目: [ ]( ) [ ]( ) [ ]( ) [ ]( ) 还包含尝试基于[Hazelcast]的三叉戟状态的示例实现( ) ##使用环境设置如果您使用的是Mac,请按照的...

    kafka-storm-hdfs

    kafka-storm-hdfs这个项目有3个小项目分别是kafka 到 storm ; storm 到 hdfs ; kafka ~ storm ~ hdfsstorm 到hdfs 需要手动添加这几个类到storm的 lib 下commons-cli-1.2.jarcommons-collections-3.2.1....

    apache-storm-2.4.0.tar.gz

    标题 "apache-storm-2.4.0.tar.gz" 指的是 Apache Storm 的特定版本,即 2.4.0 版本的源码或二进制包,通常以 tar.gz 格式打包,这是一种常见的在 Linux 和类 Unix 系统上使用的归档和压缩格式。这个压缩包包含了...

    flume-kafka-storm-drools-example

    《构建实时大数据处理流水线:flume-kafka-storm-drools-example详解》 在大数据领域,实时数据处理是至关重要的。"flume-kafka-storm-drools-example"项目提供了一个综合示例,展示了如何利用Apache Flume、Apache...

    flume,kafka,storm整合

    本文将详细介绍如何整合Flume、Kafka和Storm,以实现大数据的高效收集、传输和处理。在大数据运维解决方案中,这三个组件扮演着关键角色。Flume用于数据采集,Kafka作为中间件提供消息队列服务,而Storm则用于实时...

    KafKa+Storm资料加源码安装包.7z

    获取到文件名称 : apache-storm-0.9.2-incubating.tar.gz 获取到文件名称 : KafKa+Storm资料加源码安装包.7z 获取到文件名称 : kafka-manager-1.0-SNAPSHOT.zip 获取到文件名称 : kafka_2.10-0.9.0.1.tgz.gz ...

    kafka-storm-metrics:向graphitegrafana报告风暴拓扑内置指标

    kafka-storm-metrics 这是一个 kafka-storm 项目,用于消费来自 kafka 生产者的数据并收集风暴方面的指标/数据。 该项目的目的是将您的 Storm 应用程序与我们的监控系统集成,并开始从我们的拓扑跟踪应用程序级指标...

    simple-kafka-storm-java

    简单的 kafka-storm-java 这是一个使用 Kafka、Storm 和 Storm Trident 的简单示例。 注意:您的本地 Maven 存储库中应该有最新版本的 Storm。 您可以通过在终端中执行来做到这一点: git clone ...

    Kafka+FlumeNG+Storm+HBase

    Kafka+FlumeNG+Storm+HBase实时处理系统介绍

    kafka0.8 storm0.9 pom.xml

    kafka-storm 0.9版本 pom.xml

    kafka-storm:通过storm打印kafka消息的测试环境和代码

    Storm-code 目录包含一个用于部署和测试的简单风暴拓扑。 有关该项目的更多详细信息... 用法 vagrant up 等待 ... 验证 worker 和 nimbus 已启动 vagrant ssh kafka cd 到 kafka 目录 bin/kafka-topics.sh

Global site tag (gtag.js) - Google Analytics