`
wbj0110
  • 浏览: 1639710 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

storm任务示例

阅读更多

 LogProcess.java

package mytest;

 

import java.io.BufferedReader;

import java.io.BufferedWriter;

import java.io.File;

import java.io.FileNotFoundException;

import java.io.FileOutputStream;

import java.io.FileReader;

import java.io.FileWriter;

import java.io.IOException;

import java.io.OutputStreamWriter;

import java.io.PrintWriter;

import java.util.Map;

 

import mytest.ThroughputTest.GenSpout;

 

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.topology.base.BaseRichBolt;

import backtype.storm.topology.base.BaseRichSpout;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

 

public class LogProcess {

         public static class FileSpout extends BaseRichSpout {

 

                   /**

                    */

                   private static final long serialVersionUID = 1L;

                   private SpoutOutputCollector _collector;

                   private BufferedReader br;

                   private String dataFile;

                  

                   //定义spout文件

                   FileSpout(String dataFile){

                            this.dataFile = dataFile;

                   }

 

                   //定义如何读取spout文件

                   @Override

                   public void open(Map conf, TopologyContext context,

                                     SpoutOutputCollector collector) {

                            // TODO Auto-generated method stub

                            _collector = collector;

                            File csv = new File(dataFile); // log file

                            try {

                                     br = new BufferedReader(new FileReader(csv));

                            } catch (FileNotFoundException e) {

                                     // TODO Auto-generated catch block

                                     e.printStackTrace();

                            }

                   }

 

                   //获取下一个tuple的方法

                   @Override

                   public void nextTuple() {

                            // TODO Auto-generated method stub

                            try {

                                    

                                     String line = null;

                                     while ((line = br.readLine()) != null) {

                                               _collector.emit(new Values(line));

                                     }

                            } catch (FileNotFoundException e) {

                                     // TODO Auto-generated catch block

                                     e.printStackTrace();

                            } catch (IOException e) {

                                     // TODO Auto-generated catch block

                                     e.printStackTrace();

                            }

                   }

 

 

                   @Override

                   public void declareOutputFields(OutputFieldsDeclarer declarer) {

                            // TODO Auto-generated method stub

                            declarer.declare(new Fields("line"));

                   }

                  

         }

        

 

         public static class Process extends BaseRichBolt{

 

                   private String _seperator;

                   private String _outFile;

                   PrintWriter pw;

                   private OutputCollector _collector;

                   private BufferedWriter bw;

                  

                   public Process(String seperator,String outFile) {

                            this._seperator = seperator;

                            this._outFile   = outFile;

                           

                   }

                  

                   //把输出结果保存到外部文件里面。

                   @Override

                   public void prepare(Map stormConf, TopologyContext context,

                                     OutputCollector collector) {

                            // TODO Auto-generated method stub

                            this._collector = collector;

                            File out = new File(_outFile);

                            try {

//                                  br = new BufferedWriter(new FileWriter(out));

                                     bw = new BufferedWriter(new OutputStreamWriter( 

                             new FileOutputStream(out, true))); 

                            } catch (IOException e1) {

                                     // TODO Auto-generated catch block

                                     e1.printStackTrace();

                            }                

                   }

                  

                   //blot计算单元,把tuple中的数据添加一个bkeep和回车。然后保存到outfile指定的文件中。

                   @Override

                   public void execute(Tuple input) {

                            // TODO Auto-generated method stub

                            String line = input.getString(0);

//                         System.out.println(line);

                            String[] str = line.split(_seperator);

                            System.out.println(str[2]);

                            try {

                                     bw.write(str[2]+",bkeep"+"\n");

                                     bw.flush();

                            } catch (IOException e) {

                                     // TODO Auto-generated catch block

                                     e.printStackTrace();

                            }

                           

                            _collector.emit(new Values(line));

                   }

 

                   @Override

                   public void declareOutputFields(OutputFieldsDeclarer declarer) {

                            // TODO Auto-generated method stub

                            declarer.declare(new Fields("line"));

                   }

                  

         }

        

         public static void main(String[] argv) throws AlreadyAliveException, InvalidTopologyException{

                   String dataFile = argv[0]; //输入文件

                   String seperator = argv[1];      //分隔符

                   String outFile   = argv[2]; //输出文件

                   boolean distribute = Boolean.valueOf(argv[3]);       //本地模式还是集群模式

                   TopologyBuilder builder = new TopologyBuilder();  //build一个topology

        builder.setSpout("spout", new FileSpout(dataFile), 1);   //指定spout

        builder.setBolt("bolt", new Process(seperator,outFile),1).shuffleGrouping("spout");  //指定bolt,包括boltprocessgrouping

        Config conf = new Config();

        if(distribute){

            StormSubmitter.submitTopology("LogProcess", conf, builder.createTopology());

        }else{

                 LocalCluster cluster = new LocalCluster();

                 cluster.submitTopology("LogProcess", conf, builder.createTopology());

        }

         }       

}

 

 

运行

[admin@vkvm161064 guandao]$ pwd

/home/admin/guandao

[admin@vkvm161064 guandao]$ ls

out.txt  storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar  tmp.txt

 

输入文件:

[admin@vkvm161064 guandao]$ cat tmp.txt

a,b,c,d

1,2,3,4

A,B,C,D

xx,ff,ff,ss

xx,ff,alibaba,ss

xx,ff,taobao,ss

xx,xx,xx,xx

xx,xx,ll,xx

xx,xx,hero,xx

 

输出文件:

[admin@vkvm161064 guandao]$ cat out.txt

c,bkeep

3,bkeep

C,bkeep

ff,bkeep

alibaba,bkeep

taobao,bkeep

xx,bkeep

ll,bkeep

hero,bkeep

 

提交topology

[admin@vkvm161064 guandao]$ storm jar ./storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jarSHOT-jar-with-dependencies.jar mytest.LogProcess /home/admin/guandao/tmp.txt , /home/admin/guandao/out.txt fase

 

语法:storm  jar 自己开发的topology   topology_name  inputfile 分隔符   outputfile  true/false(true代表集群运行)

分享到:
评论

相关推荐

    Storm 简单示例

    在"Storm简单示例"中,Bolt可能会实现诸如计数、词频统计等简单的数据处理任务。 4. **Stream Grouping**:这是控制数据如何在Bolt之间流动的方式。有多种分组策略,如shuffle grouping(随机分组)、fields ...

    storm开发jar包以及storm例子源码

    标题中的"storm开发jar包以及storm例子源码"表明了我们即将探讨的是关于Apache Storm的开发环境设置和示例代码。Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无限的数据流,常用于大数据实时处理...

    Storm-EPL-Example-2.0.19(sp2)

    通过这个示例,开发者可以学习如何在Storm中创建和配置Topology,理解EPL语句的编写,以及如何在Eclipse中调试和运行实时流处理任务。此外,还能了解到如何实现计算任务的隔离,提高系统的并发处理能力和性能。对于...

    storm+kafka源码示例

    Storm在大数据领域的应用广泛,尤其适合实时分析和在线机器学习任务。它的核心概念包括拓扑(Topology)、节点(Bolt)和源(Spout)。 在我们的"storm+kafka源码示例"中,拓扑(Topology)是Storm的基本工作单元,...

    storm之WordCount示例Java代码.zip

    在分布式计算领域,Apache Storm是一个实时计算系统,它被广泛用于处理无边界数据流,确保数据...通过学习这个示例,开发者能够掌握如何构建一个简单的Storm应用,从而为进一步探索复杂的实时数据处理任务打下基础。

    storm+mq整合完整示例

    【标题】"storm+mq整合完整示例"中涉及的知识点主要集中在分布式计算框架Apache Storm与消息队列MQ(如RabbitMQ、Kafka等)的集成应用上,旨在实现数据流的实时处理。以下是对这些知识点的详细阐述: 1. **Apache ...

    storm_jars.zip

    Storm采用Master-Slave架构,Nimbus作为Master负责任务调度,Supervisor作为Slave执行具体的计算任务。数据流(Stream)由Spout产生,经过一系列Bolt进行处理,形成新的数据流。Bolt可以进行过滤、聚合、映射等操作...

    storm之drpc操作demo示例.zip

    总结来说,"storm之drpc操作demo示例"是一个很好的学习资源,它涵盖了Storm DRPC的核心概念和实践操作,对于想要在实时计算项目中运用Storm DRPC功能的开发者来说,极具参考价值。通过实际操作这个示例,你将能够...

    storm集群的搭建-java示例代码.zip

    本资料包提供了一个基于Java的示例代码,将帮助我们了解如何在Java环境中配置和运行Storm集群。 首先,我们要理解Storm的基本概念。Storm由多个组件构成,包括Nimbus(主控节点)、Supervisor(工作节点)、Worker...

    storm之集成kafka操作示例代码.zip

    8. **监控与故障恢复**:由于Storm的容错机制,如果某个worker节点失败,其任务会被重新分配到其他节点,确保数据处理的连续性。同时,Kafka的消息持久化特性也保证了数据不会丢失。 9. **优化与性能**:在实际部署...

    【Storm入门级JAVA示例演示】

    【Storm入门级JAVA示例演示】是一篇关于Apache Storm的初级教程,主要针对Java开发者。Apache Storm是一个开源的分布式实时计算系统,它允许用户连续处理数据流,类似于Hadoop处理批处理任务。在这个示例中,我们将...

    storm的jar包

    6. **Examples**: 压缩包可能还包括示例代码,帮助初学者理解如何构建和部署Storm拓扑。 7. **Libraries**: 额外的库文件,可能包含Storm与外部系统(如Hadoop、Cassandra等)集成所需的依赖。 8. **Docs**: 文档...

    storm-starter-master

    标题中的"storm-starter-master"指的是Apache Storm的一个入门示例项目,它是一个开源的分布式实时计算系统。Apache Storm被广泛应用于大数据的实时处理,能够处理无界的数据流,并且保证消息的精确一次处理...

    apache-storm-2.4.0.tar.gz

    5. `examples` 目录:示例项目,用于演示如何构建和运行 Storm 作业。 6. `jars` 或 `extlib` 目录:用户可以放置自定义的库文件,以便在 Storm 中使用。 7. `LICENSE` 和 `NOTICE` 文件:Apache Storm 的许可和版权...

    storm入门.pdf

    书中的附录部分则提供了Storm客户端的安装指南、常用命令、Storm集群的安装与部署方法,以及如何运行书中的示例程序。 本书的翻译者提到,通过翻译这本书,他不仅希望能够帮助读者了解Storm,也希望自身的技术能力...

    Storm 上手 demo 例子 演示

    在“Storm demo”中,我们通常会看到如何设置开发环境,创建拓扑结构,以及运行基本的流处理任务。这些示例会涵盖以下关键知识点: 1. **安装与配置**:首先,你需要在本地或者集群环境中安装Storm。这包括下载源码...

    storm demo 单机版 maven

    在Storm项目中,Maven可以方便地添加对Storm库和其他依赖的引用,执行编译、测试和打包等任务。 3. **Storm拓扑**:Storm中的核心概念是拓扑,它定义了数据流的处理逻辑。一个拓扑由多个 bolts(处理数据的组件) ...

    storm DRPC简单例程

    "storm DRPC简单例程" 指的是使用Apache Storm分布式实时计算系统中的DRPC(Distributed Remote Procedure Calls)功能的一个基础示例。DRPC允许用户在Storm集群上执行分布式远程过程调用,使得在分布式环境中进行高...

    storm的测试源码

    在"storm-starter-master"这个压缩包中,我们可以找到Storm的一些基础示例代码,这是学习Storm的绝佳起点。这些示例展示了如何构建和部署简单的拓扑结构,以及如何定义和使用Spout和Bolt。 1. **基本概念理解**: ...

    storm-wordcount例子

    "storm-wordcount"是Storm中的一个经典示例,用于演示如何处理实时数据流并进行简单的统计计算,类似于Hadoop的WordCount程序。这个例子的核心目标是统计输入文本中每个单词出现的次数。 在Storm中,数据流被抽象为...

Global site tag (gtag.js) - Google Analytics