`

实时分析数据分析平台——storm

 
阅读更多

 storm集群和hadoop集群类似,在hadoop上运行mapreduce任务,而在storm上称为topologies任务,两种任务之间有区别,典型的一点是mapreduce任务最后会结束,而topologies不会结束。

 

一个storm集群包含两种节点,master节点和worker节点,其中master节点运行一个daemon进程("Nimbus"),在hadoop中称为JobTracker,Nimbus负责向集群分发代码,分配任务,处理错误。

每个worker节点运行一个daemon进程("Supervisor"),它负责监听分配给自己的任务并开启工作进程处理。nimbus和supervisors之间通过zookeeper协调。

 

stream是Storm里抽象的核心概念,stream是一组没有边界的数组(tuples)

spout是stream的来源(可能有多个spout源头),比如spout从Kestrel队列中读取数据发送成一个流;bolt负责处理输入流,发送新的数据流到别的bolt,Bolts能通过运行函数做很多事情,过滤tuples,聚集和拼装数据流,和操作数据库等。spout和bolt的网络结构也会被打包到"topology"中,数据流会按照网络结构流下去。_一个topology会永远运行知道我们kill它,storm会重分配失败的任务,此外它保证不因为机器挂掉丢失正在处理的数据



下面的代码统计各个数字0~100之间的计数结果

package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.FeederSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/**
 * This is a basic example of a Storm topology.
 */
public class CounterTopology {
    
    public static class CounterBolt extends BaseRichBolt {
		OutputCollector _collector;
		HashMap<Integer,Integer> hm;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
            hm = new HashMap<Integer,Integer>();
        }

        @Override
        public void execute(Tuple tuple) {
        	
        	int id = tuple.getInteger(0);
        	int count = tuple.getInteger(1);
        	
        	Object oldcount = hm.get(id);
        	if(oldcount != null){
        		count += (Integer)oldcount;
        	}
        	hm.put(id, count);
            _collector.emit(tuple, new Values(id,count));
            _collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id","count"));
        }
    }
    
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        
        FeederSpout cs = new FeederSpout(new Fields("id","count"));
        builder.setSpout("countspout", cs, 10);        
        builder.setBolt("countbolt1", new CounterBolt(), 3)
                .shuffleGrouping("countspout");
//        builder.setBolt("countbolt2", new CounterBolt(), 2)
//                .shuffleGrouping("countbolt1");
                
        Config conf = new Config();
        conf.setDebug(true);
        
        if(args!=null && args.length > 0) {
            conf.setNumWorkers(3);
            
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
        
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("counter", conf, builder.createTopology());
//
//            Utils.sleep(10000);
//            cluster.killTopology("counter");
//            cluster.shutdown();    
        }
        
        
        while(true){
        	Utils.sleep(100);
        	
        	Random rand = new Random();
        	int id = rand.nextInt(100);
        	int count = rand.nextInt(100);
        
        	cs.feed(new Values(id,count));
        }
    }
}

 运行方式如下:

javac -cp storm-0.8.2.jar storm/starter/CounterTopology.java
jar cvf CT.jar storm
bin/storm jar CT.jar storm.starter.CounterTopology

 

生产环境使用storm

1,定义一个topology任务(java语言使用TopologyBuilder)

2,使用StormSubmitter提交任务到集群,三个参数分别是(任务名称,配置项,任务)

Config conf = new Config();
conf.setNumWorkers(20);
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology("mytopology", conf, topology);

3,创建代码jar文件以及提交到集群

storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3

针对topology的常用配置如下:

Config.TOPOLOGY_WORKERS:集群中处理任务的进程数,如果配置为25,并发数设置为150,则每个进程会开启6个线程处理任务
Config.TOPOLOGY_ACKERS:用来检测任务是否正确处理的进程数目
Config.TOPOLOGY_MAX_SPOUT_PENDING:配置spout上最多可以保存为未处理和未失败的任务数,建议设置以免queue爆满
Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS:默认30s
Config.TOPOLOGY_SERIALIZATIONS:注册自定义serilaizer

终止topology: storm kill {stormname}

更新运行中的topology:暂时只能杀掉重启,计划设计storm swap命令实现此功能

监控topology:使用storm ui

 

bolt的格式如下:

public class DoubleAndTripleBolt extends BaseRichBolt {
    private OutputCollectorBase _collector;

    @Override
	//prepare函数里面也可以通过collector发送数据流
    public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
        _collector = collector;
    }

    @Override
	//处理数据,发送数据流,记着ack保证数据不丢失
    public void execute(Tuple input) {
        int val = input.getInteger(0);        
        _collector.emit(input, new Values(val*2, val*3));
        _collector.ack(input);
    }

    @Override
	//需要声明输出数据流的格式
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("double", "triple"));
    }    
}

参照网页

https://github.com/nathanmarz/storm/wiki/Tutorial

https://github.com/nathanmarz/storm-starter

http://xumingming.sinaapp.com/category/storm/

http://www.oschina.net/p/twitter-storm

Trident用来做实时分析不错

https://github.com/nathanmarz/storm/wiki/Trident-tutorial

http://www.ibm.com/developerworks/cn/opensource/os-twitterstorm/index.html

  • 大小: 22.6 KB
分享到:
评论

相关推荐

    使用Storm实现实时大数据分析!

    简单和明了,Storm让大数据分析变得轻松加愉快。当今世界,公司的日常运营经常会生成TB级别的数据。数据来源囊括了互联网装置可以捕获的任何类型数据,网站、社交媒体、交易型商业数据以及其它商业环境中创建的数据...

    《数据分析基础——基于Excel和SPSS》习题答案.pdf

    《数据分析基础——基于Excel和SPSS》这本教材的习题答案涵盖了数据分析的基本概念和实践操作,主要针对使用Excel和SPSS这两种常见的数据分析工具。在学习这个知识点时,我们需要理解以下几个核心要点: 1. **数据...

    Storm流计算项目:1号店电商实时数据分析系统-08.storm-kafka 详解和实战案例.pptx

    《Storm流计算项目:1号店电商实时数据分析系统——storm-kafka详解与实战案例》 在大数据处理领域,实时计算已经成为不可或缺的一部分,特别是在电商行业中,实时数据分析能够帮助企业快速响应市场变化,提高运营...

    Storm流计算项目:1号店电商实时数据分析系统-11.基于HBase的Dao基类和实现类开发一.pptx

    《Storm流计算项目:1号店电商实时数据分析系统——基于HBase的Dao开发》 在大数据处理领域,Storm作为实时流计算的利器,被广泛应用于实时数据处理与分析。本项目以1号店电商实时数据分析系统为例,深入探讨了如何...

    实时数据分析平台1

    【腾讯看点实时数据分析平台实践】 腾讯看点作为一家大型互联网公司的数据驱动决策系统,其每日处理的数据量高达万亿条。为了满足这种大规模实时数据分析的需求,腾讯看点选择了基于Flink和ClickHouse的实时数据...

    基于Storm的实时大数据处理.pdf

    结合HBase等分布式存储系统,实时计算能够应用于各种业务场景,从实时监控到智能推荐,提升了数据分析的速度和准确性。随着技术的不断发展,实时大数据处理将继续发挥重要作用,推动信息技术的进步。

    Storm流计算项目:1号店电商实时数据分析系统-16.项目1-地区销售额-优化Bolt支持重启及结果数据核查.pptx

    《Storm流计算项目:1号店电商实时数据分析系统——优化Bolt支持重启及结果数据核查》 在大数据处理领域,Storm作为一个实时计算框架,被广泛应用于实时数据分析和处理任务。在1号店的电商实时数据分析系统中,项目...

    大数据技术——数据处理和分析.pdf

    Hive是一种基于Hadoop的数据仓库工具,它提供了类SQL的查询语言,使得非程序员也能相对容易地进行数据分析。Hive将SQL查询转化为MapReduce任务执行,降低了开发成本。开发者还可以用Java编写用户定义函数(UDF)来...

    flume及kafka及storm搭建.rar

    Storm可以用来实时处理大规模数据流,例如实时分析、在线机器学习、持续计算和大型互联网服务的后端处理。其核心概念包括:Topology(拓扑结构),定义了数据流的处理逻辑;Spout(数据源),负责生成数据流;Bolt...

    STORM-User-guide-V3.2

    为此,开发了一款开放且灵活的多处理器调度仿真与评估平台——STORM(Simulation Tool for Realtime Multiprocessor scheduling)。该平台旨在通过仿真手段来评估和比较不同实时多处理器调度策略的性能。 STORM的...

    数据实时分析平台Heron.zip

    实时流系统是在大规模数据分析的基础上实现系统性的分析。另外,它还需要:每分钟处理数十亿事件的能力、有秒级延迟,和行为可预见;在故障时保证数据的准确性,在达到流量峰值时是弹性的,并且易于调试和在共享的...

    行业文档-设计装置-一种基于准实时数据平台的数据访问方法.zip

    Apache Storm和Druid等工具可以用于实时数据分析。 8. **可视化与仪表盘**:实时数据的结果需要以直观的方式呈现,以便决策者能够快速理解并采取行动。例如,使用Tableau或Kibana创建实时数据仪表盘。 通过综合...

    开源力量——数据挖掘原理与实战

    要点 数据分析流程、方法论(PEST、5W2H、逻辑树)、基础数据分析方法、数据分析师能力层级、数据的度量、探索、抽样、原理及实际操作,结合SPSS工具使用 第2周 数据挖掘基础 要点(数据挖掘概念、流程、重要环节、...

    storm集成kafka插demo.zip

    1. **Apache Storm**:Storm是一个分布式、容错的实时计算系统,它可以持续处理无限的数据流,适用于实时分析、在线机器学习、连续查询、日志聚合等场景。 2. **Apache Kafka**:Kafka是一个高吞吐量、低延迟的...

    Storm定时匹配插入mysql,源数据录入hdfs

    这可能是基于时间窗口、滑动窗口或会话窗口的实时数据分析。 1. **Storm的实时处理**:Storm的核心概念包括Spout(数据源)和Bolt(数据处理)。Spout负责从外部源接收数据,而Bolt则执行各种处理任务,如过滤、...

    storm demo

    【storm demo】是一个基于Apache Storm的实践项目,旨在教授如何运用Storm的核心组件——Spout、Bolt和Topology来构建实时数据处理系统。Apache Storm是一个开源的分布式实时计算系统,能够处理大量的流数据,并保证...

    大数据分析技术在安全领域的运用.pdf

    他们搭建了基于Hadoop、HDFS、Pig、Hive、Mahout和MLlib的大数据分析平台,通过用户行为数据构建分析模型,如异常行为分类预测模型、统计预测分析模型和社交网络分析模型,精准挖掘出违规电话号码。这种用户行为分析...

    大数据分析技术在安全领域的运用.docx

    他们建立了一个基于Hadoop、HDFS、Pig、Hive、Mahout、MLlib的大数据分析平台,通过用户行为数据构建分析模型,如异常行为分类预测模型、统计预测分析模型和社交网络分析模型,以识别垃圾短信和骚扰电话。...

Global site tag (gtag.js) - Google Analytics