`
knight_black_bob
  • 浏览: 862442 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

storm kafka hdfs 详细

阅读更多

 




 
 

package com.curiousby.baoyou.cn.storm;

import java.util.UUID;

import org.apache.storm.hdfs.bolt.HdfsBolt;
 
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields; 

/**
 * @see com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology
 * @Type TerminalInfosAnalysisTopology.java
 * @Desc 
 * @author cmcc-B100036
 * @date 2016年12月15日 下午4:54:50
 * @version 
 */
public class TerminalInfosAnalysisTopology {

    private static String topicName = "baoy-topic";
    private static String zkRoot = "/kafka" ;

    public static void main(String[] args) {
        BrokerHosts hosts = new ZkHosts(
                "172.23.27.120:2181,172.23.27.115:2181,172.23.27.116:2181/kafka");
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, zkRoot, UUID.randomUUID().toString());
        spoutConfig.forceFromStart= false;
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        //spoutConfig.socketTimeoutMs=60; 
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
 
       RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("\r\n"); 
        SyncPolicy syncPolicy = new CountSyncPolicy(2); 
        FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.HOURS);  
        FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/user/hadoop/storm/").withPrefix("terminalInfo_").withExtension(".log");  
         HdfsBolt hdfsBolt = new HdfsBolt()
                 .withFsUrl("hdfs://172.23.27.120:9000/")
                 .withFileNameFormat(fileNameFormat).withRecordFormat(format)
                 .withRotationPolicy(rotationPolicy).withSyncPolicy(syncPolicy);

         
        
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", kafkaSpout);
        builder.setBolt("terminalInfosAnalysisIsValidBolt", new TerminalInfosAnalysisIsValidBolt(),1).shuffleGrouping("kafkaSpout");  
        builder.setBolt("terminalInfosAnalysisRedisBolt", new TerminalInfosAnalysisRedisBolt(),1).shuffleGrouping("terminalInfosAnalysisIsValidBolt");  
        builder.setBolt("terminalInfosAnalysisHdfsReportBolt", new TerminalInfosAnalysisHdfsReportBolt(),1).shuffleGrouping("terminalInfosAnalysisIsValidBolt");  
        builder.setBolt("terminalInfo", hdfsBolt,1).fieldsGrouping("terminalInfosAnalysisHdfsReportBolt",new Fields("hdfs-terminalinfo"));  
        // builder.setBolt("terminalInfosAnalysisHdfsBolt", new TerminalInfosAnalysisHdfsBolt(),1).shuffleGrouping("terminalInfosAnalysisIsValidBolt");  
        
         
        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(1);
            try {
                StormSubmitter.submitTopologyWithProgressBar(args[0], conf,  builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else { 
            conf.setMaxSpoutPending(1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("terminalInfosAnalysisTopology", conf, builder.createTopology());

        }

    }
}

 

public class TerminalInfosAnalysisIsValidBolt extends BaseRichBolt {

    private Logger logger =LoggerFactory.getLogger(TerminalInfosAnalysisIsValidBolt.class);
    private OutputCollector collector;
    
    @Override
    public void execute(Tuple tuple) {

        System.out.println(tuple.size());
        logger.info("============================TerminalInfosAnalysisIsValidBolt execute===============================");
        for (int i = 0; i < tuple.size(); i++) {
            JSONObject formate = TerminalInfos.formate(tuple.getString(i));
            TerminalInfos entity = new TerminalInfos();
            entity.formate(formate);
            if (entity != null && entity.isValid()) {
                System.out.println(entity);
                collector.emit(tuple, new Values(entity));
                collector.ack(tuple); 
            }
        }
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        
        
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("after_isvalid"));  
    }

}

 

public class TerminalInfosAnalysisRedisBolt extends BaseRichBolt {

    private Logger logger =LoggerFactory.getLogger(TerminalInfosAnalysisRedisBolt.class);
    private OutputCollector collector;
    JedisPool pool; 
    
    
    @Override
    public void execute(Tuple tuple) {  
        Jedis jedis = pool.getResource(); 
        logger.info("============================TerminalInfosAnalysisRedisBolt execute===============================");
        for (int i = 0; i < tuple.size(); i++) {
            TerminalInfos entity = (TerminalInfos) tuple.getValue(i);
            TerminalInfoHeader tih = entity.getTerminalInfoHeader(); 
            String key = tih.getAppId()+"-"+tih.getDeviceToken();
            String value = jedis.get(key);
            if (value == null || "".equals(value)) {
                // 
                jedis.set( key,  JSON.toJSONString(tih));
                // insert es all infos
                
            }else{
                //update  es lastupdatetime 
            }
            
            
            
        }
         
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        logger.info("============================redis prepare===============================");
        this.collector = collector;
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxActive(1000);
        config.setMaxIdle(50);
        config.setMaxWait(1000l);
        config.setTestOnBorrow(false);
        this.pool = new JedisPool(config, "172.23.27.120", 6379); 
        
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

}

 

public class TerminalInfosAnalysisHdfsReportBolt extends BaseRichBolt {

    private Logger logger =LoggerFactory.getLogger(TerminalInfosAnalysisHdfsReportBolt.class);
    private OutputCollector collector;  
    
    
    
    @Override
    public void execute(Tuple tuple) {  
        logger.info("============================TerminalInfosAnalysisHdfsReportBolt execute===============================");
       for (int i = 0; i < tuple.size(); i++) {
           TerminalInfos entity = (TerminalInfos) tuple.getValue(i); 
           TerminalInfoHeader tih = entity.getTerminalInfoHeader();
           StringBuffer sb = new StringBuffer();
           sb.append(tih.getAppId()).append(",");
           sb.append(tih.getDeviceMac()).append(",");
           sb.append(tih.getDeviceId()).append(",");
           sb.append(tih.getDeviceToken()).append(",");
           sb.append(tih.getDeviceImsi()).append(",");
           sb.append(tih.getDeviceModel()).append(",");
           sb.append(tih.getDeviceManufacture()).append(",");
           sb.append(tih.getChannel()).append(",");
           sb.append(tih.getAppKey()).append(",");
           sb.append(tih.getUserId()).append(",");
           sb.append(tih.getAppVersion()).append(",");
           sb.append(tih.getVersionCode()).append(","); 
           sb.append(tih.getSdkType()).append(",");
           sb.append(tih.getOs()).append(",");
           sb.append(tih.getCountry()).append(",");
           sb.append(tih.getLanguage()).append(",");
           sb.append(tih.getTimezone()).append(",");
           sb.append(tih.getResolution()).append(",");
           sb.append(tih.getAccess()).append(",");
           sb.append(tih.getAccessSubtype()).append(",");
           sb.append(tih.getCarrier()).append(",");
           sb.append(tih.getCpu());
           collector.emit(tuple, new Values("hdfs-terminalinfo",sb.toString()));
           collector.ack(tuple); 
           
       }
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
        this.collector = collector; 
      
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("hdfs-terminalinfo", "record"));
    }

}

 

 



 

 

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.curiousby.baoy.cn</groupId>
  <artifactId>KafkaStormJavaDemo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>SpringKafkaStormDemo</name>
  <url>http://maven.apache.org</url>



	<!-- properties constant -->
	<properties>
		<spring.version>4.2.5.RELEASE</spring.version>
		  <java.version>1.7</java.version>
		  <log4j.version>1.2.17</log4j.version>
	</properties>

	<repositories>
		<repository>
			<id>clojars.org</id>
			<url>http://clojars.org/repo</url>
		</repository>
	</repositories>
 

	 
		
		
	<dependencies> 
	 
         <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
             <version>0.9.4</version>
             <scope>provided</scope>
             <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>log4j-over-slf4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>

        </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-kafka</artifactId>
             <version>0.9.4</version>
             <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-hdfs</artifactId>
             <version>0.9.4</version>
              <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
         </dependency>
         


         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.10</artifactId>
             <version>0.8.2.1</version>
             <exclusions>
                 <exclusion>
                     <groupId>org.apache.zookeeper</groupId>
                     <artifactId>zookeeper</artifactId>
                 </exclusion>
                 <exclusion>
                     <groupId>log4j</groupId>
                     <artifactId>log4j</artifactId>
                 </exclusion> 
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion> 
             </exclusions>
         </dependency>
	
	
		<!-- json start -->	
		<dependency>
		  <groupId>com.googlecode.json-simple</groupId>
		  <artifactId>json-simple</artifactId>
		  <version>1.1.1</version>
		</dependency>
		<!-- JSON转化 -->
		<dependency>
			<groupId>org.codehaus.jackson</groupId>
			<artifactId>jackson-mapper-asl</artifactId>
			<version>1.9.13</version>
		</dependency>
		<!-- JSON库 -->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.1.23</version>
		</dependency>
		<dependency>
			<groupId>org.json</groupId>
			<artifactId>json</artifactId>
			<version>20160810</version>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-core</artifactId>
			<version>2.2.3</version>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-annotations</artifactId>
			<version>2.2.3</version>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-databind</artifactId>
			<version>2.2.3</version>
		</dependency>
		<!--  <dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.6.4</version>
			<type>jar</type>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.6.4</version>
			<type>jar</type>
		</dependency>  --> 
		
		
		 <!-- Other Dependency -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    
		<dependency>
			<groupId>redis.clients</groupId>
			<artifactId>jedis</artifactId>
			<version>2.2.0</version>
		</dependency>
		
		<!-- hdfs start -->
 		<!-- 
 		<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>2.2.0</version>
        </dependency>
        -->
		
		
	 <!-- <dependency>  
         <groupId>org.apache.storm</groupId>  
         <artifactId>storm-core</artifactId>  
        <version>0.9.5</version>  
         <scope>provided</scope>  
     </dependency>  
     <dependency>  
        <groupId>org.apache.storm</groupId>  
         <artifactId>storm-kafka</artifactId>  
         <version>0.9.5</version>  
     </dependency>  
     <dependency>  
         <groupId>org.apache.kafka</groupId>  
         <artifactId>kafka_2.10</artifactId>  
         <version>0.9.0.1</version>  
             <exclusions>  
                 <exclusion>  
                     <groupId>org.apache.zookeeper</groupId>  
                     <artifactId>zookeeper</artifactId>  
                </exclusion>  
                 <exclusion>  
                     <groupId>log4j</groupId>  
                     <artifactId>log4j</artifactId>  
                 </exclusion>  
                <exclusion>  
                     <groupId>org.slf4j</groupId>  
                    <artifactId>slf4j-log4j12</artifactId>  
                </exclusion>  
             </exclusions>  
     </dependency>  
     --> 
	 
 		<dependency>
			<groupId>jdk.tools</groupId>
			<artifactId>jdk.tools</artifactId>
			<version>1.7</version>
			<scope>system</scope>
			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
		</dependency>


	</dependencies>
	<build>
		<finalName>SpringKafkaStormDemo</finalName>
		 
		<plugins> 
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.3</version>
				<dependencies>
					<dependency>
						<groupId>org.codehaus.plexus</groupId>
						<artifactId>plexus-compiler-javac</artifactId>
						<version>2.5</version>
					</dependency>
				</dependencies>
				<configuration>
					<source>1.7</source>
					<target>1.7</target>
					<encoding>UTF-8</encoding>
					<compilerArguments>
						<verbose />
						<bootclasspath>${java.home}/lib/rt.jar:${java.home}/lib/jce.jar</bootclasspath>
					</compilerArguments>
				</configuration>
			</plugin>
		 
			<plugin>
			    <groupId>org.apache.maven.plugins</groupId>
			    <artifactId>maven-shade-plugin</artifactId>
			    <version>1.4</version>
			    <configuration>
			        <appendAssemblyId>false</appendAssemblyId> 
				   <finalName>${project.artifactId}_TerminalInfosAnalysisTopology_main_start</finalName>
			        <createDependencyReducedPom>true</createDependencyReducedPom>
			    </configuration>
			    <executions>
			        <execution>
			            <phase>package</phase>
			            <goals>
			                <goal>shade</goal>
			            </goals>
			            <configuration>
			                <transformers>
			                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
			                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
			                        <mainClass>com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology</mainClass>
			                    </transformer>
			                </transformers>
			            </configuration>
			        </execution>
			    </executions>
			</plugin>
			<plugin>  
              <artifactId>maven-assembly-plugin</artifactId>  
              <configuration>  
                   <appendAssemblyId>false</appendAssemblyId> 
				   <finalName>${project.artifactId}_main_start</finalName>
                  <descriptorRefs>    
                      <descriptorRef>jar-with-dependencies</descriptorRef>  
                  </descriptorRefs>  
                  <archive>  
                     <manifest>  
                       <mainClass>com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology</mainClass>  
                     </manifest>  
                   </archive>  
               </configuration>  
            </plugin> 
			
		</plugins>
	</build>
</project>
 

 

 

 

 

 

 

 

 

捐助开发者

在兴趣的驱动下,写一个免费的东西,有欣喜,也还有汗水,希望你喜欢我的作品,同时也能支持一下。 当然,有钱捧个钱场(右上角的爱心标志,支持支付宝和PayPal捐助),没钱捧个人场,谢谢各位。



 
 谢谢您的赞助,我会做的更好!

 

 

 

 

 

  • 大小: 49.2 KB
  • 大小: 55.6 KB
1
0
分享到:
评论

相关推荐

    lamp安装配置及flume+Kafka+Storm+HDFS实时系统搭分享

    在这个过程中,文档《Twitter Storm系列》flume-ng+Kafka+Storm+HDFS 实时系统搭建.docx和《安装扎记.pdf》将提供详细的步骤指导和常见问题解决方案,帮助你顺利完成整个系统的搭建和优化。 总的来说,LNMP与实时大...

    kafka-storm-hdfs

    kafka-storm-hdfs这个项目有3个小项目分别是kafka 到 storm ; storm 到 hdfs ; kafka ~ storm ~ hdfsstorm 到hdfs 需要手动添加这几个类到storm的 lib 下commons-cli-1.2.jarcommons-collections-3.2.1....

    storm-kafka-hdfs-starter

    标题中的"storm-kafka-hdfs-starter"是一个用于整合Apache Storm、Apache Kafka和Apache Hadoop HDFS的项目,主要用于处理实时数据流。这个项目的主要目的是为开发者提供一个起点,帮助他们快速搭建一个从Kafka消费...

    利用Flume将MySQL表数据准实时抽取到HDFS、MySQL、Kafka

    在这个场景中,Flume将数据推送到Kafka,使得数据可以实时地被其他消费者(如Storm、Spark Streaming等)处理。 5. **wlslog.sql**: 这个文件可能是SQL脚本,用于创建或操作MySQL中的表结构,以配合Flume的数据抽取...

    2017零基础学云计算大数据视频教程hadoop storm kafka spark开发

    本套视频教程主要针对2017年时的云计算与大数据技术进行讲解,重点涵盖了Hadoop、Storm、Kafka和Spark等核心组件的开发与应用。对于初学者来说,这是一份非常有价值的资源,旨在帮助他们从零基础快速建立起对大数据...

    大数据架构:flume-ng+Kafka+Storm+HDFS实时系统组合

    直以来都想接触Storm实时计算这块的东西,最近在群里看到上海一哥们罗宝写的Flume+Kafka+Storm的实时日志流系统的搭建文档,自己也跟着整了一遍,之前罗宝的文章中有一些要注意点没提到的,以后一些写错的点,在这边...

    flume及kafka及storm搭建.rar

    Sources负责从各种数据源如Web服务器日志、syslog等获取数据,Channels作为临时存储,确保数据在处理过程中的可靠性,而Sinks则负责将数据发送到目标位置,如HDFS、HBase或Kafka。 2. Kafka:Kafka是由LinkedIn开发...

    2017零基础学云计算大数据视频教程hadoop storm kafka spark开发(重发)

    综上所述,这份视频教程涵盖了云计算和大数据领域的核心技术和工具,包括Hadoop、Storm、Kafka以及Spark等,适合初学者入门学习。通过系统地学习这些技术,可以帮助学习者掌握大数据处理的基本原理和方法,为进一步...

    Kafka+FlumeNG+Storm+HBase构架设计

    本文将详细介绍如何利用Kafka、FlumeNG、Storm与HBase搭建一套高效的数据处理系统。该系统旨在实现以下目标: - 实时处理任意规模的数据集。 - 支持多种类型的处理操作。 - 结合多种技术和工具,构建一个全方位的大...

    flume+kafka+storm搭建

    本文将详细介绍如何利用Flume、Kafka和Storm搭建一个大数据消息平台。 首先,我们来了解Flume。Flume是Cloudera公司提供的一款分布式、可靠且高可用的海量日志采集、聚合和传输的系统。它允许你定制数据发送方,...

    flume-kafka-storm源程序

    在大数据处理领域,Flume、Kafka和Storm是三个非常重要的工具,它们分别负责数据采集、消息中间件和实时流处理。"flume-kafka-storm源程序"这个压缩包很可能是包含这三个组件的集成示例或者源代码,用于帮助开发者...

    使用Storm实时处理交通大数据(数据源:kafka,集群管理:zookeeper).zip

    标题中的“使用Storm实时处理交通大数据(数据源:kafka,集群管理:zookeeper)”表明这个项目涉及了三个核心的技术:Apache Storm、Apache Kafka以及Apache ZooKeeper,这些都是大数据处理和分布式系统中的关键...

    kafka课件.rar

    7. **Kafka与大数据生态集成**:如Hadoop(MapReduce、HDFS)、Flume的数据集成,以及Spark、Storm等实时处理系统的配合使用。 8. **实战案例**:可能包含一些实际场景的应用示例,如日志收集、监控数据流、实时...

    storm深入学习.pdf

    Storm深入学习 ...3. 灵活性:Storm可以与各种数据源集成,例如Kafka、HDFS等。 本文档详细讲解了Storm的安装、配置和使用,涵盖了Storm的基本概念、安装过程、配置文件的解释和Storm的应用场景。

    BigDataLearning:Spark、Hadoop、Flink、Storm、Kafka编程实例学习

    例如,Hadoop HDFS作为数据存储,Spark或Flink进行数据处理,Storm用于实时计算,Kafka则作为数据流的中间件。通过学习这些工具的源码,可以深入了解它们的内部机制和优化策略。在"BigDataLearning-master"这个...

    一、Kafka简介.docx

    Kafka在大数据生态系统中占有重要位置,与其他工具如Flume(数据收集)、HDFS(分布式存储)、HBase(分布式数据库)、Hive(数据分析)和Zookeeper(分布式协调)等一起构建起复杂的数据处理流程。通过这样的集成,...

    关于大数据的面试题,包括hadoop、hbase、hive、spark、storm、zookeeper、kafka、.zip

    本篇文章将深入探讨在面试中可能会遇到的一些核心知识点,主要涵盖Hadoop、HBase、Hive、Spark、Storm、Zookeeper以及Kafka。 1. **Hadoop**: Hadoop是Apache基金会的一个开源项目,是大数据处理的基础框架,其...

    大数据组件Kafka讲解

    - **Hadoop**:Kafka可以与Hadoop生态系统无缝集成,如HDFS和HBase,用于大数据分析。 - **Storm/Typhoon**:结合实时计算框架,实现流式数据处理。 - **Spark**:Spark可以作为Kafka的消费者,进行实时数据处理...

Global site tag (gtag.js) - Google Analytics