- 浏览: 56456 次
- 性别:
- 来自: 北京
文章分类
最新评论
本地模式,是在eclipse等编译器编写strom运行文件,在于模拟storm在集群运行的结果,便于代码的编写和调试。
一、下载开发环境的zip文件,将storm相关jar包导入编辑器。注意是zip文件,不是gz运行linux文件。
http://storm.apache.org/downloads.html
二、创建数据文件
storm有Tail属性,只适合文本源,会对文本文件内容进行监听。
文件格式如下:
www.baidu.com XXYH6YCGFJYERTT834R52FDXV9U34 2014-01-07 12:40:49
www.baidu.com ABYH6Y4V4SCVXTG6DPB4VH9U123 2014-01-07 08:40:51
www.baidu.com ABYH6Y4V4SCVXTG6DPB4VH9U123 2014-01-07 08:40:53
www.baidu.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2014-01-07 11:40:49
www.baidu.com XXYH6YCGFJYERTT834R52FDXV9U34 2014-01-07 11:40:49
www.baidu.com XXYH6YCGFJYERTT834R52FDXV9U34 2014-01-07 12:40:49
www.baidu.com ABYH6Y4V4SCVXTG6DPB4VH9U123 2014-01-07 08:40:53
www.baidu.com CYYH6Y2345GHI899OFG4V9U567 2014-01-07 12:40:49
www.baidu.com ABYH6Y4V4SCVXTG6DPB4VH9U123 2014-01-07 10:40:49
www.baidu.com ABYH6Y4V4SCVXTG6DPB4VH9U123 2014-01-07 08:40:52
www.baidu.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2014-01-07 12:40:49
www.baidu.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2014-01-07 12:40:49
www.baidu.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2014-01-07 10:40:49
二、编写读取数据源类MySpout
Open(配置文件,上下文,发射topology类)是初始化方法
nextTuple()循环发射数据
ack() 成功处理tuple回调方法
Fail()处理失败tuple回调方法
activate和deactivate :spout可以被暂时激活和关闭
close方法在该spout关闭当前执行,但是并不能得到保证其一定被执行。kill -9时不执行,
Storm kill {topoName} 时执行。
superviser、numberis 用Kill -9。Topology一般不用Kill -9。
三、编写数据处理类
prepare方法进行初始化,传入当前执行的上下文,与Spout的open方法一样
execute接受一个tuple进行处理,也可emit数据到下一级组件
cleanup 同ISpout的close方法,在关闭前调用,不保证其一定执行。
四、构建topology结构并运行
六、如果maven项目的pom.xml文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>StormMavenProject</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>StormMavenProject</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>5.0.3</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>minlog</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>reflectasm</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-rename-hack</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ring-cors</groupId>
<artifactId>ring-cors</artifactId>
<version>0.1.5</version>
</dependency>
</dependencies>
<build>
<finalName>StormMavenProject</finalName>
</build>
</project>
七、运行结果
lines :2 session_id:ABYH6Y4V4SCVXTG6DPB4VH9U123
lines :3 session_id:ABYH6Y4V4SCVXTG6DPB4VH9U123
lines :4 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7
lines :5 session_id:XXYH6YCGFJYERTT834R52FDXV9U34
一、下载开发环境的zip文件,将storm相关jar包导入编辑器。注意是zip文件,不是gz运行linux文件。
http://storm.apache.org/downloads.html
二、创建数据文件
storm有Tail属性,只适合文本源,会对文本文件内容进行监听。
public class GetData { /** * 生成数据 * @param args */ public static void main(String[] args) { File logFile = new File("track.log"); Random random = new Random(); String[] hosts = { "www.baidu.com" }; String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" }; String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" }; StringBuffer sbBuffer = new StringBuffer() ; for (int i = 0; i < 50; i++) { sbBuffer.append(hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]+"\n"); } if(! logFile.exists()) { try { logFile.createNewFile(); } catch (IOException e) { System.out.println("Create logFile fail !"); } } byte[] b = (sbBuffer.toString()).getBytes(); FileOutputStream fs; try { fs = new FileOutputStream(logFile); fs.write(b); fs.close(); } catch (Exception e) { e.printStackTrace(); } } }
文件格式如下:
引用
www.baidu.com XXYH6YCGFJYERTT834R52FDXV9U34 2014-01-07 12:40:49
www.baidu.com ABYH6Y4V4SCVXTG6DPB4VH9U123 2014-01-07 08:40:51
www.baidu.com ABYH6Y4V4SCVXTG6DPB4VH9U123 2014-01-07 08:40:53
www.baidu.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2014-01-07 11:40:49
www.baidu.com XXYH6YCGFJYERTT834R52FDXV9U34 2014-01-07 11:40:49
www.baidu.com XXYH6YCGFJYERTT834R52FDXV9U34 2014-01-07 12:40:49
www.baidu.com ABYH6Y4V4SCVXTG6DPB4VH9U123 2014-01-07 08:40:53
www.baidu.com CYYH6Y2345GHI899OFG4V9U567 2014-01-07 12:40:49
www.baidu.com ABYH6Y4V4SCVXTG6DPB4VH9U123 2014-01-07 10:40:49
www.baidu.com ABYH6Y4V4SCVXTG6DPB4VH9U123 2014-01-07 08:40:52
www.baidu.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2014-01-07 12:40:49
www.baidu.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2014-01-07 12:40:49
www.baidu.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2014-01-07 10:40:49
二、编写读取数据源类MySpout
Open(配置文件,上下文,发射topology类)是初始化方法
nextTuple()循环发射数据
ack() 成功处理tuple回调方法
Fail()处理失败tuple回调方法
activate和deactivate :spout可以被暂时激活和关闭
close方法在该spout关闭当前执行,但是并不能得到保证其一定被执行。kill -9时不执行,
Storm kill {topoName} 时执行。
superviser、numberis 用Kill -9。Topology一般不用Kill -9。
public class MySpout implements IRichSpout{ /** * */ private static final long serialVersionUID = 1L; FileInputStream fis; InputStreamReader isr; BufferedReader br; SpoutOutputCollector collector = null; String str = null; public void nextTuple() { try { while ((str = this.br.readLine()) != null) { // 过滤动作 collector.emit(new Values(str)); // Thread.sleep(3000); //to do } } catch (Exception e) { // TODO: handle exception } } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.collector = collector; this.fis = new FileInputStream("track.log"); this.isr = new InputStreamReader(fis, "UTF-8"); this.br = new BufferedReader(isr); } catch (Exception e) { e.printStackTrace(); } // 打开文件 } public void declareOutputFields(OutputFieldsDeclarer declarer) { // 发射数据格式,与bolt接收数据一致 declarer.declare(new Fields("loog")); } public Map<String, Object> getComponentConfiguration() { // 与ope方法中的map对应 return null; } public void ack(Object msgId) { // TODO Auto-generated method stub } public void activate() { // TODO Auto-generated method stub } public void close() { // TODO Auto-generated method stub } public void deactivate() { // TODO Auto-generated method stub } public void fail(Object msgId) { // TODO Auto-generated method stub } }
三、编写数据处理类
prepare方法进行初始化,传入当前执行的上下文,与Spout的open方法一样
execute接受一个tuple进行处理,也可emit数据到下一级组件
cleanup 同ISpout的close方法,在关闭前调用,不保证其一定执行。
public class MyBolt implements IRichBolt { private static final long serialVersionUID = 1L; OutputCollector collector = null; public void cleanup() { } int num = 0; String valueString = null; public void execute(Tuple input) { // input.getValueByField("log"); // input.getValue(0); try { valueString = input.getStringByField("loog") ; if(valueString != null) { num ++ ; System.err.println("lines :"+num +" session_id:"+valueString.split("\t")[1]); } collector.ack(input); Thread.sleep(2000); } catch (Exception e) { collector.fail(input); e.printStackTrace(); } } public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector ; } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("")) ; } public Map<String, Object> getComponentConfiguration() { return null; } }
四、构建topology结构并运行
public class Main { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new MySpout(), 1); builder.setBolt("bolt", new MyBolt(), 1).shuffleGrouping("spout");//与builder中的spout名称对应 Map conf = new HashMap(); conf.put(Config.TOPOLOGY_WORKERS, 4); if (args.length > 0) { try { StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); }catch (AuthorizationException e) { e.printStackTrace(); } }else { LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.createTopology()); } } }
六、如果maven项目的pom.xml文件
引用
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>StormMavenProject</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>StormMavenProject</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>5.0.3</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>minlog</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>reflectasm</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-rename-hack</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ring-cors</groupId>
<artifactId>ring-cors</artifactId>
<version>0.1.5</version>
</dependency>
</dependencies>
<build>
<finalName>StormMavenProject</finalName>
</build>
</project>
七、运行结果
引用
lines :2 session_id:ABYH6Y4V4SCVXTG6DPB4VH9U123
lines :3 session_id:ABYH6Y4V4SCVXTG6DPB4VH9U123
lines :4 session_id:BBYH61456FGHHJ7JL89RG5VV9UYU7
lines :5 session_id:XXYH6YCGFJYERTT834R52FDXV9U34
发表评论
-
ITridentSpout、FirstN(取Top N)实现、 流合并和join
2017-05-25 10:01 1029一、ITridentSpout 基于事务 static int ... -
Trident实战之计算网站PV
2017-05-24 13:24 6461、Trident实战之计算网站PV /** * ... -
Trident API和概念
2017-05-23 10:57 748一、Trident API——Spout ITride ... -
Trident入门
2017-05-22 13:44 513英文原址:https://github.com/nathanm ... -
分布式远程调用drpc实例
2017-05-22 10:53 415一、DRPC定义 分布式dRPC(distributed RP ... -
不透明分区事务IOpaquePartitionedTransactional实例
2017-05-22 10:54 6771、spout public class MyOpaq ... -
分区事务IPartitionedTransactionalSpout实例
2017-05-21 11:02 5801.分区事务spout public class My ... -
普通事务ITransactionalSpout实例之按天统计数据
2017-05-20 16:56 4841、普通事务Spout /** * 普通事务Spou ... -
普通事务ITransactionalSpout实例
2017-05-20 15:45 8191、普通事务Spout /** * 普通事务Spou ... -
Storm事务API
2017-05-19 16:00 611Spout ITransactionalSpout<T& ... -
Storm批处理事务原理详解
2017-05-19 15:54 2095事务:Storm容错机制通 ... -
集群统一启动和停止shell脚本开发
2017-05-17 09:56 4461、cd 2、ls -al 显示隐藏目录 3、rm -rf ... -
storm高并发UV统计
2017-05-14 22:05 1127统计高并发UV可行的方案(类似WordCount的计算去重wo ... -
storm高并发PV统计,利用zookeeper锁输出汇总值
2017-05-14 14:42 893汇总型方案: 1、shuffleGrouping下,pv(单线 ... -
storm高并发PV统计
2017-04-16 17:54 683一、PV统计思考 方案需要考虑分析多线程下,注意线程安全问题。 ... -
Storm高并发运用WordSum
2017-04-16 14:21 10671、创建发射所有字符串统计总个数及去重个数处理类 pub ... -
storm分组策略介绍
2017-04-16 11:46 700一、storm数据来源 Spout的数据源: MQ:直接流数 ... -
Storm高并发介绍
2017-04-16 10:18 586并发度: worker:指的是component (spo ... -
Storm 字符统计Demo
2017-04-14 13:57 5331、数据源读取,字符发射spout类 /** * 字符 ... -
Storm启动配置
2017-03-29 17:40 668一、安装Storm wget ...
相关推荐
**Storm本地模式WordCount亲测可用** 在大数据处理领域,Apache Storm是一个实时计算框架,它被广泛用于处理无界数据流。"Storm本地模式"是Storm提供的一种在单机环境中进行开发和测试的机制,无需分布式环境即可...
在本地模式下,Storm可以在单机上模拟整个集群环境,方便开发者进行调试和测试。在这种模式下,所有的组件都在同一个进程中运行,简化了开发和验证流程。要运行一个Storm拓扑在本地,需要使用`LocalCluster`类并提交...
Storm集群的运行模式包括本地模式和分布式模式。本地模式适用于开发和测试阶段,而分布式模式则适合生产环境。搭建Storm开发环境需要安装Java运行环境和Apache Storm软件包,并配置相应的环境变量。此外,还需要搭建...
5. **本地集群搭建**:在本地机器上,你可以使用`storm local`命令快速启动一个本地模式的Storm集群,以便于调试。首先,确保已下载并安装了Storm,然后在Storm的bin目录下运行: ``` storm local ``` 这将在...
* 最后,配置 Storm 的操作模式,可以是本地模式或远程模式。 Spout Spout 是 Storm 中的数据输入组件,负责从外部数据源读取数据。Spout 的主要方法包括: * nextTuple():读取下一个数据元组。 * ...
4. **本地模式测试**:在提交到集群前,可以使用Storm的本地模式进行快速验证,这使得开发者能够在本地单机环境中调试拓扑。 5. **提交拓扑**:一旦在本地模式下测试成功,你可以将拓扑提交到运行Storm的集群,进行...
大数据_Storm_Storm的集群模式与本地模式 (基于Storm 1.0.1)-附件资源
通常,这可以通过使用Maven的`storm-starter`依赖和Storm的本地模式来实现,这样就可以在本地环境中模拟整个Storm集群的行为。 **标签解析:** 1. **storm** - 指Apache Storm,一个开源的分布式实时处理系统,常...
3. **本地模式**:在开发过程中,可以使用本地模式在单机上模拟全分布式环境,方便快速测试和调试。 4. ** storm.yaml配置**:配置文件storm.yaml用于设置Storm集群的参数,如nimbus服务器地址、supervisor节点配置...
4. **本地模式与生产环境**:学习Storm时,通常会先在本地模式下运行拓扑,进行调试和测试,然后再部署到生产环境中。 5. ** Trident API**:Trident是Storm提供的高级API,它支持精确一次的语义,可以更方便地构建...
启动Storm的本地模式,执行`$STORM_HOME/bin/storm local nimbus`。这将在本地模拟一个Storm集群。若要部署到生产环境,需要配置多节点集群,包括Nimbus(主控节点)、Supervisor(工作节点)和UI(用户界面)。 **...
- **Local模式**:在本地模式下,你可以快速地运行和测试拓扑,无需部署到集群。 - **集群模式**:当拓扑准备好后,你可以将其提交到Zookeeper协调的Storm集群上,进行分布式运行。 4. **容错机制**: - Storm...
- **Cluster 模式**: Storm 可以部署在本地模式进行开发测试,也可以在多机器集群上进行生产部署。 6. **监控与日志** - **UI界面**: Storm 提供了一个 Web UI,展示拓扑状态、性能指标以及错误信息,方便监控和...
1.本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解) 运行在本地机器的单一JVM上,这个模式主要用来开发、调试。 2.远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个...
4. **本地模式**:在本地环境中运行拓扑,进行测试和调试。 5. **提交到集群**:了解如何配置和提交拓扑到真实的Storm集群,以处理大规模的数据流。 6. **容错性**:学习Storm如何确保数据流的可靠性和无丢失处理。 ...
当用户提交一个Topology任务后,Nimbus会将该Topology分配给一个或多个Supervisor节点,Supervisor再在本地启动Worker进程执行Topology。每个Worker进程都会有自己的Spout和Bolt实例,这些实例通过Stream进行交互。 ...
书中还提到了如何创建项目中的主类,并通过“HelloWorldStorm”示例来展示如何在本地模式下运行一个简单的Storm程序。然后,作者指导读者如何创建一个拓扑结构,拓扑结构是Storm中用于定义数据流处理过程的实体。它...
9. **开发与部署**:学习如何编写Storm拓扑,使用本地模式进行测试,以及如何将它们部署到生产环境,是成为Storm开发者的必备技能。 10. **社区与资源**:Apache Storm有一个活跃的开发者社区,提供了丰富的教程、...