一、Storm大数据位置
解决方案 |
开发商 |
类型 |
描述 |
Storm |
|
流式处理 |
Twitter 的新流式大数据分析解决方案 |
S4 |
Yahoo! |
流式处理 |
来自 Yahoo! 的分布式流计算平台 |
Hadoop |
Apache |
批处理 |
MapReduce 范式的第一个开源实现 |
Spark |
UC Berkeley AMPLab |
批处理/流处理 |
支持内存中数据集和恢复能力的最新分析平台 |
Disco |
Nokia |
批处理 |
Nokia 的分布式 MapReduce 框架 |
HPCC |
LexisNexis |
批处理 |
HPC 大数据集群 |
二、Storm概念及组件
在Storm拓扑构建前我们先复习一下Storm概念及组件:
Nimbus:负责资源分配和任务调度。
Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程。
Worker:运行具体处理组件逻辑的进程。
Task:worker中每一个spout/bolt的线程称为一个task. 在storm0.8之后,task不再与物理线程对应,同一个
spout/bolt的task可能会共享一个物理线程,该线程称为executor。
Topology:storm中运行的一个实时应用程序,因为各个组件间的消息流动形成逻辑上的一个拓扑结构。
Spout:在一个topology中产生源数据流的组件。通常情况下spout会从外部数据源中读取数据,然后转换为
topology内部的源数据。Spout是一个主动的角色,其接口中有个nextTuple()函数,storm框架会不停地调用
此函数,用户只要在其中生成源数据即可。
Bolt:在一个topology中接受数据然后执行处理的组件。Bolt可以执行过滤、函数操作、合并、写数据库等
任何操作。Bolt是一个被动的角色,其接口中有个execute(Tuple input)函数,在接受到消息后会调用此函
数,用户可以在其中执行自己想要的操作。
Tuple:Storm Spout、Bolt组件消息传递的基本单元(数据模型),Tuple是包含名称的列表,Storm支持所
有原生类型,字节数组为Tuple字段传递,如果要传递自定义对象,需要实现接口serializer。
Stream:源源不断传递的tuple就组成了stream。
图(一)
三、创建逻缉组件
以下图二所示,创建Storm组件Spout、Bolt及Storm拓扑构建并结合拓扑描述并发情况(Worker、
Executor、Task关系)
图(二)
SPOUT:自定义类实现IRichSpout接口或继承BaseRichSpout即创建Spout组件。Spout从外部数据源读取数据
(队列、DRPC等)为Storm拓扑提供数据,Storm可以实现可靠或不可靠,可靠性表现在Storm可以对Storm
toplogy处理失败的tuple进行重发,反之不处理。
Spout可以发送多个流,通过在调用SpoutOutputCollector类的emit方法的同时使用declareStream方法申
明并指定多个流发送。
Spout中主要方法nextTuple,能够发送新流到toplopy及无数据流发送时直接返回。最重要的是不要阻塞
nextTuple方法,因为Spout在同一个线程执行。Spout主要将读取到的数据组织成Tuple发送到Bolt组件处
理。
Spout另一个重要的方法时ack和fail,Storm监控到tuple从Spout发送到toplogy成功完成或失败时调用ack
和fail(数据可靠性请参考其它文档)
示例Spout发tuple到默认流:
package com.sunshine.spout; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; /** * 绿色Spout,参考图二 */ public class GreenSpout extends BaseRichSpout{ private static final long serialVersionUID = -1215556162813479167L; private SpoutOutputCollector collector; /** * Storm自动初始化 */ @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } /** * Storm不断调用此方法,传递单词到Bolt组件处理 */ @Override public void nextTuple() { String[] words = {"green", "yellow", "blue"}; for(String word : words){ /** * Values是List子类,发送数据需要封装在Values * 本次存储一个单词发送与declareOutputFields() * 方法申明输出一个字段对应 */ collector.emit(new Values(word), word); // 1 } } /** * 申明发送tuple字段名称 * 在Bolt组件可以根所名称或索引获取Spout传递的Tuple * tuple.getValue(0) * 或tuple.getValueByField("word"); */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); // 1 } @Override public void ack(Object msgId) { System.out.println("success-->" + msgId); } @Override public void fail(Object msgId) { System.out.println("fail-->" + msgId); } }
BOLT:自定义类实现IRichBolt接口或者继承BaseRichBolt类即Bolt组件。Toplogy里所有处理都在Bolt完成,
Bolt可以做任何事,如过滤、聚合、连接、操作数据库等,也可以将数据传递一下Bolt组件处理。
Bolt可以做简单流转化或发送多个流(参考Spout发送多个流的方式),Bolt成功将消息处理后通知ACK组
件(可靠)。
package com.sunshine.bolt; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; /** *黄色Spout,参考图二 */ public class YellowBolt extends BaseRichBolt{ private static final long serialVersionUID = 7593355203928566992L; private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple input) { String word = (String)input.getValueByField("word"); if(word != null){ collector.emit(new Values(word, word)); } collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("yword1","yword2")); } }
package com.sunshine.bolt; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; /** * 蓝色Spout,参考图二 */ public class BlueBolt extends BaseRichBolt{ private static final long serialVersionUID = 4342676753918989102L; private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple input) { String yword1 = (String)input.getValueByField("yword1"); String yword2 = (String)input.getValueByField("yword2"); System.out.println("yword1:" + yword1 +",yword2:" + yword2); collector.ack(input); } /** * 结束 */ @Override public void cleanup() { super.cleanup(); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // 不再传递下一个Bolt组件处理 } }
Toplogy(拓扑):参考图二及上述代码,构建Storm拓扑代码如下;在WIN7上使用本地运行模式,存在
Zookeeper连接问题:
“java.net.SocketException: Address family not supported by protocol family: connect”
原因:Win7启动IPV6
解决:增加配置属性System.setProperty("java.net.preferIPv4Stack", "true");
package com.sunshine; import backtype.storm.Config; import backtype.storm.topology.TopologyBuilder; import com.sunshine.bolt.BlueBolt; import com.sunshine.bolt.YellowBolt; import com.sunshine.spout.GreenSpout; import com.sunshine.tools.StormRunner; /** * STROM启动类 * @author OY * @version 0.1 */ public class SimpleTopolog { public static void main(String[] args) throws Exception { // 解决ZOOKEEPER客户端连接服务端问题(IPV6) System.setProperty("java.net.preferIPv4Stack", "true"); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("green-spout", new GreenSpout(), 2) // 2 executor(线程) .setNumTasks(4); // 4 task 对应2个executor builder.setBolt("yellow-bolt", new YellowBolt(),6) // 6 executor(线程) .shuffleGrouping("green-spout"); //tuple随机分发到bolt处理 builder.setBolt("blue-bolt", new BlueBolt(), 2) // 2 executor(线程) .shuffleGrouping("yellow-bolt"); Config conf = new Config(); /*设置工作进程数*/ conf.setNumWorkers(2); conf.setDebug(true); /*本地运行模式*/ StormRunner.runTopologyLocally(builder.createTopology(), "simpleTopology", conf, 0); } }
Storm运行模式工具类
package com.sunshine.tools; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.generated.StormTopology; public final class StormRunner { private static final int MILLIS_IN_SEC = 1000; private StormRunner() { } public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException { LocalCluster cluster = new LocalCluster(); cluster.submitTopology(topologyName, conf, topology); Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC); cluster.killTopology(topologyName); cluster.shutdown(); } public static void runTopologyRemotely(StormTopology topology, String topologyName, Config conf) throws AlreadyAliveException, InvalidTopologyException { StormSubmitter.submitTopology(topologyName, conf, topology); } }