基础
Storm
是一个分布式,可靠的,容错好的,用于处理数据流的系统。整个处理工作有不同的可靠组件来完成,每个组件负责一个简单的具体任务。Storm
集群的输入数据流由一个成为spout
的组件处理。spout
再将数据传给bolt
组件,其中会对数据做某种变换。bolt
或者持久化数据到某个存储介质中,或者将数据发送到其他的bolt
。你可以将Storm
集群想像成一个由bolt
组成的链条,每个bolt
都对spout
产生的数据进行某种变换。
这里给出一个例子用于说明这个概念。昨晚,当播音员开始谈论政治家及他们对不同事情的看法时,我正在看新闻。他们重复着不同的名字,我就知道是否每个名字都被提到了相同的次数,或者在提到的名字中是否有偏见存在。
将播音员说话时的字幕想像成一个输入数据流。你就拥有一个从文件(socket, HTTP, 或者其他方式)读取输入的’spout’。每当文本行到达,spout
将它们丢给一个将文本行切割成单词的bolt
。单词流接着传给另一bolt
,这里会将每一个单词与预先设置好的政治家的名字进行比较。当名字匹配时,第二个bolt
就会将数据库的那个名字相应的计数器的值加一。无论何时你想看这些结果,只需要查询数据库,该数据库是随着数据的到来而实时更新的。所有的组件(spout
和bolt
)和它们之间的联系方式成为Topology
(见图 1-1)。
现在考虑定义每个bolt
和spout
在整个集群的并行度,这样一来就可以扩展topology
。很神奇,对不对?虽然这是一个简单的例子,但是对于Storm
的牛叉可见一斑。
Storm
主要用于哪些场景?
- 流处理正如在前面的例子中提到的,有别于其他的处理系统,
Storm
不需要中间队列 - 连续计算连续发送数据到客户端,这样就能实时更新和展示数据,例如网站统计
- DRP调用方便并行化CPU密集计算
Storm的组件
在Storm
集群中,节点都组织到一个不断运行主节点。
Storm
集群中有两种节点:master节点和worker节点。Master运行一个成为Nimbus的进程,用于在集群中发布代码,分配任务到到每个工作节点,监视任务的失败。工作节点运行一个称为Supervisor的进程,执行拓扑的一部分。Storm中的拓扑运行在不同机器上的多个工作节点上。
由于Storm将集群状态保存在ZooKeeper或者本地磁盘上,守护进程是无状态的,可以挂掉或者重启,而不会影响系统的正确性(见图 1-2)
底层,Storm使用了zeromq,一个高级的,可嵌入的网络库,它提供了一些使得Storm成为可能的特性。下面是一些zeromq的特性
- 作为并行框架的socket库
- 在集群产品和超级计算中比TCP快
- 进程间携带消息,IPC,TCP和多播
- 用于可扩展的多核消息传输应用的异步I/O
- 以fanout,pubsub,pipeline,reque-replay的方式完成N对N的链接
Storm的属性
除了上面所有的概念和考虑,有一些很nice的属性使得Storm变得很特殊。
易于编程
如果你曾经做个实时处理相关的,你就知道它是多么的操蛋。使用Storm,你会发现复杂性指数级下降
支持多种编程语言
虽然可以很容易的使用基于JVM的语言进行开发,当时Storm支持任何语言,只要实现一个小的中间库
容错性
Storm集群会知道工作进程挂掉,有必要的时候会重新分配任务。
扩展性
对于扩展所要做的就是增加机器到集群中。当新增的机器可用时,Storm就会分配任务给它们。
可靠性
所有的消息都保证被处理至少一遍。如果发生错误,消息有可能处理多遍,但是可以保证不会丢失任何消息。
快
速度是Storm设计中的考虑的一个关键因素。
事务
提供了精确的一次消息语义用于计算。
开始
本章,我们会创建一个storm工程和第一个strom topology。
接下来我们假设你安装的JRE的版本最低为1.6。我们推荐使用Oracle提供的JRE,可以从这里下载。
工作模式
在开始之前,有必要明白strom的工作模式。storm有两种工作方式。
本地模式
在 本地模式中,storm拓扑本地机器上的单个JVM中。该模式用于开发,测试和调试,因为这是查看所有拓扑组件工作的最简单的方式。在这个模式中,可以通 过调整参数来查看拓扑在不同的strom配置环境中是如何运行的。为了在本地模式中运行拓扑,需要下载storm的开发依赖,即开发和测试拓扑所需要的所 有东东。不久在我们创建第一个storm工程的时候,就会知道如何来做。
在本地模式中运行拓扑类似于在storm集群中运行。除了需要保证所有的组件是线程安全的,因为当拓扑部署到远程模式中,可能运行在不同的没有直接通信或者共享内存的物理主机上的不同JVM上。
本章中的所有例子中,我们都运行在本地模式中。
远程模式
在远程模式中,拓扑被提交到storm集群中,给集群由运行在不同机器上的许多进程组成。远程模式不显示调试信息,这就是为什么它成为生产环境。然而,在单台开发机上是可以建立一个storm集群的,而且在部署到生产环境前这样做是一个很好的做法,这样可以保证在生产环境中运行拓扑不会出问题。
在第六章我们会介绍更多关于远程模式,并在附录B中会展示如何建立集群。
HELLO WORLD STORM
在 这个工程中,我们将会创建一个简单的拓扑用于数单词。可以将其视为storm版的hello world。然而,这是一个灰常niubility的拓扑,因为可以扩展到任意大小,并且稍作修改后甚至可以用来建立一个统计系统。例如,我们可以修改这 个工程来寻找Twitter上主题的变化趋势。
创建一个这样的拓扑,需要一个用来读取单词的spout,第一个bolt用来归一化单词,第二个bolt用来数单词,具体如图2-1所示。
可以从https://github.com/storm-book/examples-ch02-getting_started/zipball/master将源代码以ZIP文件的形式下载下来。
如果你使用git(一个分布式的版本控制和源码管理工具),你可以在打算保存源码的目录下运行命令
git clone git@github.com:storm-book/examples-ch02-getting_started.git
。
检查JAVA的安装
建立环境的第一步就是检查当前运行的java的版本。打开终端并运行命令java -version
。我们可以看到类似下面的内容:
java -version
java version "1.6.0_26"
Java(TM) SE Runtime Environment (build 1.6.0_26-b03)
Java HotSpot(TM) Server VM (build 20.1-b02, mixed mode)
如果不是,请检查java的安装。(具体可以见< http://www.java.com/download/>)
创建工程
在建立工程前,先建立一个存放工程的文件夹(就像建立一个java应用那样)。这个文件夹用来存放工程的源码。
接下来,我们需要下载storm的依赖:一系列的需要放到classpath的jar包。可以用下面的两种方式之一完成:
- 下载依赖,解压,添加到classpath
- 使用apache maven
在定义工程结构之前,我们需要建立一个pom.xml(project object model)文件,该文件描述了依赖,打包,源码等信息。我们将使用nathanmarz (https://github.com/nathanmarz/)提供的依赖和maven仓库。依赖可以从这里找到https://github.com/nathanmarz/storm/wiki/Maven
使用依赖,我们需要在pom.xml文件中指明运行拓扑所需的必要组件。
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>storm.book</groupId>
<artifactId>Getting-Started</artifactId>
<version>0.0.1-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
<compilerVersion>1.6</compilerVersion>
</configuration>
</plugin>
</plugins>
</build>
<repositories>
<!-- Repository where we can found the storm dependencies -->
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>
<dependencies>
<!-- Storm Dependency -->
<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.6.0</version>
</dependency>
</dependencies>
</project>
开始的几行指定工程名和版本。然后增加了一个编译插件,用于高速Maven源码需要使用Java 1.6编译。接着定义了仓库(对于同一个工程Maven支持多个仓库)。clojars就是storm依赖所在的仓库。Maven会自动下载在本地模式中运行storm所需的子依赖。
应用将会有下面的结构,典型的Maven java工程:
our-application-folder/
├── pom.xml
└── src
└── main
└── java
| ├── spouts
| └── bolts
└── resources
java目录下的文件夹中是我们的源码,需要处理的单词文件在resou文件夹中。
创建第一个拓扑
要建立我们的第一个拓扑,我们需要实现运行数单词需要的类。有可能本例的部分在目前看来不太清楚,我们在后续的章节会进一步解释。
Spout
WordReader spout
实现了接口IRichSpout
。更多细节在第四章给出。WordReader负责读取文件,并提供每一个行给bolt。
例2-1给出了该类的完整代码(我们会在后续的例子中分析这段代码的每一部分)。
例2-1 src/main/java/spouts/WordReader.java
package spouts;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class WordReader implements IRichSpout {
private SpoutOutputCollector collector;
private FileReader fileReader;
private boolean completed = false;
private TopologyContext context;
public boolean isDistributed() {return false;}
public void ack(Object msgId) {
System.out.println("OK:"+msgId);
}
public void close() {}
public void fail(Object msgId) {
System.out.println("FAIL:"+msgId);
}
/**
* The only thing that the methods will do It is emit each
* file line
*/
public void nextTuple() {
/**
* The nextuple it is called forever, so if we have been readed the file
* we will wait and then return
*/
if(completed){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//Do nothing
}
return;
}
String str;
//Open the reader
BufferedReader reader = new BufferedReader(fileReader);
try{
//Read all lines
while((str = reader.readLine()) != null){
/**
* By each line emmit a new value with the line as a their
*/
this.collector.emit(new Values(str),str);
}
}catch(Exception e){
throw new RuntimeException("Error reading tuple",e);
}finally{
completed = true;
}
}
/**
* We will create the file and get the collector object
*/
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
try {
this.context = context;
this.fileReader = new FileReader(conf.get("wordsFile").toString());
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file
["+conf.get("wordFile")+"]");
}
this.collector = collector;
}
/**
* Declare the output field "word"
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
}
在任何一个spout中首先被调用的方法是public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
。接收的参数分别是TopologyContext
,包含了所有拓扑相关的数据;conf
对象,在定义拓扑的时候指定;SpoutOutputCollector
发射需要处理的数据到bolts。下面的代码块就是open方法的实现
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
try {
this.context = context;
this.fileReader = new FileReader(conf.get("wordsFile").toString());
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
}
this.collector = collector;
}
在该方法中,我们创建了一个reader,用来读取文件。接下来需要实现方法public void nextTuple()
,在该方法中我们会发射需要bolts处理的数据。在我们的例子中,该方法会读取文件,每读一行发射一个值。
public void nextTuple() {
if(completed){
try {
Thread.sleep(1);
} catch (InterruptedException e) {
//Do nothing
}
return;
}
String str;
BufferedReader reader = new BufferedReader(fileReader);
try{
while((str = reader.readLine()) != null){
this.collector.emit(new Values(str));
}
}catch(Exception e){
throw new RuntimeException("Error reading tuple",e);
}finally{
completed = true;
}
}
方法nextTuple()
跟方法ack()
和fail()
会在同一个循环中周期性的被调用。在没有任务可做的时候需要放弃对线程的控制权,这样其他的方法才有机会被调用。所以nextTuple
的第一行检查处理是否完成。如果完成,它就会沉睡至少1ms来降低处理的负载。如果有任务需要处理,文件的每一行都会被读到一个value中,然后发射出去。
Bolts
现在,我们已经拥有了一个读取文件,并且每一行发射一个tuple的spout。还要设计两个bolts来处理刚才发射的tuple(见图2-1)。这些bolts实现了接口backtype.storm.topology.IRichBolt
。
bolt中最重要的方法是void execute(Tuple input)
,每接收到一个tuple就会调用该方法。对于每个接收的tuple,bolt可以发射出多个后续的tuple。
一个bolt或者spout可以发射出足够多的所需的tuple。当方法
nextTuple
或者execute
被调用时,它们可能会发射出0,1或者许多个tuple。我们会在第五章做更多说明。
第一个bolt,wordNormalizer
,负责接收每一行,并归一化。它将每一行文本切割成单词,转成小写并去除两边的空格。首先,需要声明bolt的输出参数:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
这里指明bolt会发射出一个名为word的域。
接下来,我们实现方法public void execute(Tuple tuple)
,用出来输入的tuples。
public void execute(Tuple input) {
String sentence = input.getString(0);
String[] words = sentence.split(" ");
for(String word : words){
word = word.trim();
if(!word.isEmpty()){
word = word.toLowerCase();
//Emit the word
collector.emit(new Values(word));
}
}
// Acknowledge the tuple
collector.ack(input);
}
第一行从tuple中读取值。值可以根据位置或者名字来获取。值在处理结束后,使用collector对象发射出去。在每一个tuple都被处理后,collector对象的ack()
方法会被调用一表明整个处理过程成功结束。如果tuple没有被处理,那么应该调用fail()
方法。
例2-2 给出该类的完整代码
例2-2 src/main/java/bolts/WordNormalizer.java
package bolts;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class WordNormalizer implements IRichBolt {
private OutputCollector collector;
public void cleanup() {}
/**
* The bolt will receive the line from the
* words file and process it to Normalize this line
*
* The normalize will be put the words in lower case
* and split the line to get all words in this
*/
public void execute(Tuple input) {
String sentence = input.getString(0);
String[] words = sentence.split(" ");
for(String word : words){
word = word.trim();
if(!word.isEmpty()){
word = word.toLowerCase();
//Emit the word
List a = new ArrayList();
a.add(input);
collector.emit(a,new Values(word));
}
}
// Acknowledge the tuple
collector.ack(input);
}
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
/**
* The bolt will only emit the field "word"
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
该类中,我们给出了一个在单次
execute
方法调用中发射多个tuple的例子。如果该方法接收了一行句子This is the Storm book
,那么在单次execute
方法调用中,它将会发射5个新tuple。
第二个bolt,WordCounter
,负责对单词计数。当拓扑运行结束后(当cleanup()
方法被调用),会给出每个单词的计数值。
这是一个不发射任何东西的bolt的示例。在这里,数据被添加到一个map中,但是在实际中bolt可以把数据存到数据库。
package bolts;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class WordCounter implements IRichBolt {
Integer id;
String name;
Map<String, Integer> counters;
private OutputCollector collector;
/**
* At the end of the spout (when the cluster is shutdown
* We will show the word counters
*/
@Override
public void cleanup() {
System.out.println("-- Word Counter ["+name+"-"+id+"] --");
for(Map.Entry<String, Integer> entry : counters.entrySet()){
System.out.println(entry.getKey()+": "+entry.getValue());
}
}
/**
* On each word We will count
*/
@Override
public void execute(Tuple input) {
String str = input.getString(0);
/**
* If the word dosn't exist in the map we will create
* this, if not We will add 1
*/
if(!counters.containsKey(str)){
counters.put(str, 1);
}else{
Integer c = counters.get(str) + 1;
counters.put(str, c);
}
//Set the tuple as Acknowledge
collector.ack(input);
}
/**
* On create
*/
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.counters = new HashMap<String, Integer>();
this.collector = collector;
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}
execute
方法使用一个map
来收集单词并计数。当拓扑终止的时候,调用cleanup()
方法并打印出计数器map。(这里只是一个示例,实际中应该使用cleanup
方法在拓扑终止的时候来关闭连接以及其他的资源。)
main方法
在main方法中,需要创建一个LocalCluster
对象,这可以使得我们在本地测试拓扑。结合Config
对象,LocalCluster
允许设置不同的集群配置。例如,如果使用了一个全局变量或者类变量,那么在不同数目的工作节点中测试拓扑就发现错误。(在第三章会详细说明)
拓扑所有的节点都应该独立的运行,进程之间没有共享数据(例如,没有全局变量或者类变量),因为当一个拓扑被部署到集群后,这些进程有可能运行在不同的物理主机上。
使用TopologyBuilder
来创建拓扑,它指明了节点是如何安排的以及数据是如何交换的。
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader",new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");
spout和bolts使用shuffleGroupings
连接。这种分组类型指明消息从源节点以随机的方式发送到目标节点。
接下来,创建一个包含拓扑配置的Config
对象,在拓扑运行的时候这些配置会和集群的配置合并,并通过prepare
方法发送到所有的节点。
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(true);
设置属性wordsFile
为spout读取的文件名,因为在开发环境中,所以设置debug
属性为true
。当debug为true
的时候,Storm会打印出节点间交换的所有信息,以及其他一些有用的调试信息,这些有助于理解拓扑是如何运行的。
正如之前所提到的,我们会使用LocalCluster
来运行拓扑。在生产环境中,拓扑会一直运行下去,但是在这个例子中,我们只会让它运行几秒钟,这样我们很快就会看到结果了。
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
Thread.sleep(2000);
cluster.shutdown();
使用createTopology
和submitTopology
来创建并运行拓扑,沉睡2秒钟(拓扑运行在不同的线程中),然后通过关闭集群来停止拓扑。
完整的代码见例2-3
例2-3 src/main/java/TopologyMain.java
import spouts.WordReader;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import bolts.WordCounter;
import bolts.WordNormalizer;
public class TopologyMain {
public static void main(String[] args) throws InterruptedException {
//Topology definition
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader",new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer())
.shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter(),2)
.fieldsGrouping("word-normalizer", new Fields("word"));
//Configuration
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(false);
//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie", conf,
builder.createTopology());
Thread.sleep(1000);
cluster.shutdown();
}
}
运行结果
已经准备运行我们的第一个拓扑了!如果文件src/main/resources/words.txt
中每行一个单词,那么用下面的命令运行拓扑:
mvn exec:java -Dexec.mainClass="TopologyMain" -Dexec.args="src/main/resources/words.txt"
例如,如果words.txt
的内容如下:
Storm
test
are
great
is
an
Storm
simple
application
but
very
powerful
really
Storm
is
great
在日志中,我们会看到下面的输出:
is: 2
application: 1
but: 1
great: 1
test: 1
simple: 1
Storm: 3
really: 1
are: 1
great: 1
an: 1
powerful: 1
very: 1
在这个例子中,每个节点只用了一个实例。但是如果日志文件灰常大,会怎么样呢?可以简单调整节点数来并行化处理。在这个例子中,可以创建两个WordCounter
实例。
builder.setBolt("word-counter", new WordCounter(),2).shuffleGrouping("word-normalizer");
再次运行程序,可以看到:
-- Word Counter [word-counter-2] --
application: 1
is: 1
great: 1
are: 1
powerful: 1
Storm: 3
-- Word Counter [word-counter-3] --
really: 1
is: 1
but: 1
great: 1
test: 1
simple: 1
an: 1
very: 1
Niubility!改变并行数是如此简单(当然了,在实际中,每个实例都会运行在各自的机器上。)但是还是有个问题:单词is
和great
在每个WordCounter
实例中被计算了一次。为什么?当使用shuffleGrouping
时,就等于告诉storm以一种随机的方式发射消息到bolt的实例。在这个例子中,理想的做法是同一个单词发送到同一个wordCounter
。要这么做,可以将shuffleGrouping("word-normalizer")
改成fieldsGrouping("word-normalizer",new Fields("word"))
。试着改一下,再次运行并确认结果。
相关推荐
TutorialsPoint Apache Storm教程.epub
除了书籍《Getting Started with Storm》之外,还有一些社区和网站提供了Storm的使用文档和教程,这些都是学习Storm技术的宝贵资源。在实践中,开发者应重视对Storm各种术语的准确理解和运用,例如spout、bolt、...
Storm入门到精通 Storm 是一个分布式实时计算系统,主要用于处理大规模数据流。它的核心组件包括Spout和Bolt,分别负责数据的输入和处理。下面是对 Storm 的一个概述,从基础知识到实践应用。 Storm 组件 Storm ...
该资料为工作中用到的技术难点及大数据处理流程图,用最好的技术去挑战工作中的难点
在这个教程里面我们将学习如何创建Topologies,并且把topologies部署到storm的集群里面去。Java将是我们主要的示范语言, 个别例子会使用python以演示storm的多语言特性。这个教程使用storm-starter项目里面的例子。...
在构建大数据实时处理系统时,`Flume`、`Kafka` 和 `Storm` 是三个重要的组件,它们分别用于数据采集、数据传输和实时数据处理。以下是对这些组件的详细说明: **Flume** 是一个分布式、可靠且可用于有效收集、聚合...
Storm入门教程 之Storm原理和概念详解,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群、Zookeeper集群等,涉及项目:网站PV、UV案例实战、其他案例; Storm视频教程亮点: 1、Storm...
storm学习文档
10. **性能优化**: 教程可能还会讨论如何优化 Storm Topology 的性能,包括减少延迟、提高吞吐量以及调整资源分配等。 通过深入学习并实践 `storm-tutorial` 中的示例,你不仅可以掌握 Apache Storm 的基本原理,还...
Storm视频教程通过含3个Storm完整项目,均为企业实际项目,其中一个是完全由Storm Trident开发。本课程每个技术均采用最新稳定版本,学完后可以从Kafka到Storm项目开发及HighCharts图表开发一个人搞定!涨工资?身价...
风暴教程这是一个针对 Apache Storm 的教程项目,旨在帮助刚接触 Storm、Streaming 和 Kafka 的人更快地上手。 这里的目标是创建一个简单的 Kafka Producer 应用程序(在 Python 中),然后触发 Storm 流来处理它。 ...
storm 视频教程 从入门到精通 提供 项目实战 分析 及实现
01-storm简介 02-storm部署-1 03-storm部署-2 04-storm部署概念 05-streamgrouping 06-storm组件生命周期 07-storm可靠性1 08-storm可靠性2
学习Storm从入门到精通,大数据流式处理必备,配合kafka.
Apache Storm(apache-storm-2.3.0.tar.gz) 是一个免费的开源分布式实时计算系统。Apache Storm 可以轻松可靠地处理无限制的数据流,实时处理就像 Hadoop 进行批处理一样。...在教程中阅读更多内容。
【Storm实战培训教程】 Storm是一个开源的分布式实时计算系统,起源于Twitter的内部项目,后来在2011年开源,极大地推动了实时流处理的发展。它的主要特点是编程模型简单、可扩展、高可靠性和高容错性。Storm在实时...
大数据全套视频教程。linux,hadoop,spark,storm,hive,flume,oozie,,hbase,zookeeper,mysql,mongodb,redis,多个项目实践等等,应有尽有。
storm+kafka jar包 ,curator-client-2.8.0、curator-framework-2.8.0、curator-recipes-2.8.0、guava-18.0、kafka_2.9.2-0.8.2.2、metrics-core-2.2.0、scala-library-2.10.4、storm-kafka-0.9.2-incubating、...