LogProcess.java
package mytest;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.Map;
import mytest.ThroughputTest.GenSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class LogProcess {
public static class FileSpout extends BaseRichSpout {
/**
*/
private static final long serialVersionUID = 1L;
private SpoutOutputCollector _collector;
private BufferedReader br;
private String dataFile;
//定义spout文件
FileSpout(String dataFile){
this.dataFile = dataFile;
}
//定义如何读取spout文件
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
// TODO Auto-generated method stub
_collector = collector;
File csv = new File(dataFile); // log file
try {
br = new BufferedReader(new FileReader(csv));
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//获取下一个tuple的方法
@Override
public void nextTuple() {
// TODO Auto-generated method stub
try {
String line = null;
while ((line = br.readLine()) != null) {
_collector.emit(new Values(line));
}
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
declarer.declare(new Fields("line"));
}
}
public static class Process extends BaseRichBolt{
private String _seperator;
private String _outFile;
PrintWriter pw;
private OutputCollector _collector;
private BufferedWriter bw;
public Process(String seperator,String outFile) {
this._seperator = seperator;
this._outFile = outFile;
}
//把输出结果保存到外部文件里面。
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
// TODO Auto-generated method stub
this._collector = collector;
File out = new File(_outFile);
try {
// br = new BufferedWriter(new FileWriter(out));
bw = new BufferedWriter(new OutputStreamWriter(
new FileOutputStream(out, true)));
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
//blot计算单元,把tuple中的数据添加一个bkeep和回车。然后保存到outfile指定的文件中。
@Override
public void execute(Tuple input) {
// TODO Auto-generated method stub
String line = input.getString(0);
// System.out.println(line);
String[] str = line.split(_seperator);
System.out.println(str[2]);
try {
bw.write(str[2]+",bkeep"+"\n");
bw.flush();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
_collector.emit(new Values(line));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
declarer.declare(new Fields("line"));
}
}
public static void main(String[] argv) throws AlreadyAliveException, InvalidTopologyException{
String dataFile = argv[0]; //输入文件
String seperator = argv[1]; //分隔符
String outFile = argv[2]; //输出文件
boolean distribute = Boolean.valueOf(argv[3]); //本地模式还是集群模式
TopologyBuilder builder = new TopologyBuilder(); //build一个topology
builder.setSpout("spout", new FileSpout(dataFile), 1); //指定spout
builder.setBolt("bolt", new Process(seperator,outFile),1).shuffleGrouping("spout"); //指定bolt,包括bolt、process和grouping
Config conf = new Config();
if(distribute){
StormSubmitter.submitTopology("LogProcess", conf, builder.createTopology());
}else{
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LogProcess", conf, builder.createTopology());
}
}
}
运行
[admin@vkvm161064 guandao]$ pwd
/home/admin/guandao
[admin@vkvm161064 guandao]$ ls
out.txt storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar tmp.txt
输入文件:
[admin@vkvm161064 guandao]$ cat tmp.txt
a,b,c,d
1,2,3,4
A,B,C,D
xx,ff,ff,ss
xx,ff,alibaba,ss
xx,ff,taobao,ss
xx,xx,xx,xx
xx,xx,ll,xx
xx,xx,hero,xx
输出文件:
[admin@vkvm161064 guandao]$ cat out.txt
c,bkeep
3,bkeep
C,bkeep
ff,bkeep
alibaba,bkeep
taobao,bkeep
xx,bkeep
ll,bkeep
hero,bkeep
提交topology
[admin@vkvm161064 guandao]$ storm jar ./storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jarSHOT-jar-with-dependencies.jar mytest.LogProcess /home/admin/guandao/tmp.txt , /home/admin/guandao/out.txt fase
语法:storm jar 自己开发的topology topology_name inputfile 分隔符 outputfile true/false(true代表集群运行)
相关推荐
在"Storm简单示例"中,Bolt可能会实现诸如计数、词频统计等简单的数据处理任务。 4. **Stream Grouping**:这是控制数据如何在Bolt之间流动的方式。有多种分组策略,如shuffle grouping(随机分组)、fields ...
标题中的"storm开发jar包以及storm例子源码"表明了我们即将探讨的是关于Apache Storm的开发环境设置和示例代码。Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无限的数据流,常用于大数据实时处理...
通过这个示例,开发者可以学习如何在Storm中创建和配置Topology,理解EPL语句的编写,以及如何在Eclipse中调试和运行实时流处理任务。此外,还能了解到如何实现计算任务的隔离,提高系统的并发处理能力和性能。对于...
Storm在大数据领域的应用广泛,尤其适合实时分析和在线机器学习任务。它的核心概念包括拓扑(Topology)、节点(Bolt)和源(Spout)。 在我们的"storm+kafka源码示例"中,拓扑(Topology)是Storm的基本工作单元,...
在分布式计算领域,Apache Storm是一个实时计算系统,它被广泛用于处理无边界数据流,确保数据...通过学习这个示例,开发者能够掌握如何构建一个简单的Storm应用,从而为进一步探索复杂的实时数据处理任务打下基础。
【标题】"storm+mq整合完整示例"中涉及的知识点主要集中在分布式计算框架Apache Storm与消息队列MQ(如RabbitMQ、Kafka等)的集成应用上,旨在实现数据流的实时处理。以下是对这些知识点的详细阐述: 1. **Apache ...
Storm采用Master-Slave架构,Nimbus作为Master负责任务调度,Supervisor作为Slave执行具体的计算任务。数据流(Stream)由Spout产生,经过一系列Bolt进行处理,形成新的数据流。Bolt可以进行过滤、聚合、映射等操作...
总结来说,"storm之drpc操作demo示例"是一个很好的学习资源,它涵盖了Storm DRPC的核心概念和实践操作,对于想要在实时计算项目中运用Storm DRPC功能的开发者来说,极具参考价值。通过实际操作这个示例,你将能够...
本资料包提供了一个基于Java的示例代码,将帮助我们了解如何在Java环境中配置和运行Storm集群。 首先,我们要理解Storm的基本概念。Storm由多个组件构成,包括Nimbus(主控节点)、Supervisor(工作节点)、Worker...
8. **监控与故障恢复**:由于Storm的容错机制,如果某个worker节点失败,其任务会被重新分配到其他节点,确保数据处理的连续性。同时,Kafka的消息持久化特性也保证了数据不会丢失。 9. **优化与性能**:在实际部署...
【Storm入门级JAVA示例演示】是一篇关于Apache Storm的初级教程,主要针对Java开发者。Apache Storm是一个开源的分布式实时计算系统,它允许用户连续处理数据流,类似于Hadoop处理批处理任务。在这个示例中,我们将...
6. **Examples**: 压缩包可能还包括示例代码,帮助初学者理解如何构建和部署Storm拓扑。 7. **Libraries**: 额外的库文件,可能包含Storm与外部系统(如Hadoop、Cassandra等)集成所需的依赖。 8. **Docs**: 文档...
标题中的"storm-starter-master"指的是Apache Storm的一个入门示例项目,它是一个开源的分布式实时计算系统。Apache Storm被广泛应用于大数据的实时处理,能够处理无界的数据流,并且保证消息的精确一次处理...
5. `examples` 目录:示例项目,用于演示如何构建和运行 Storm 作业。 6. `jars` 或 `extlib` 目录:用户可以放置自定义的库文件,以便在 Storm 中使用。 7. `LICENSE` 和 `NOTICE` 文件:Apache Storm 的许可和版权...
书中的附录部分则提供了Storm客户端的安装指南、常用命令、Storm集群的安装与部署方法,以及如何运行书中的示例程序。 本书的翻译者提到,通过翻译这本书,他不仅希望能够帮助读者了解Storm,也希望自身的技术能力...
在“Storm demo”中,我们通常会看到如何设置开发环境,创建拓扑结构,以及运行基本的流处理任务。这些示例会涵盖以下关键知识点: 1. **安装与配置**:首先,你需要在本地或者集群环境中安装Storm。这包括下载源码...
在Storm项目中,Maven可以方便地添加对Storm库和其他依赖的引用,执行编译、测试和打包等任务。 3. **Storm拓扑**:Storm中的核心概念是拓扑,它定义了数据流的处理逻辑。一个拓扑由多个 bolts(处理数据的组件) ...
"storm DRPC简单例程" 指的是使用Apache Storm分布式实时计算系统中的DRPC(Distributed Remote Procedure Calls)功能的一个基础示例。DRPC允许用户在Storm集群上执行分布式远程过程调用,使得在分布式环境中进行高...
在"storm-starter-master"这个压缩包中,我们可以找到Storm的一些基础示例代码,这是学习Storm的绝佳起点。这些示例展示了如何构建和部署简单的拓扑结构,以及如何定义和使用Spout和Bolt。 1. **基本概念理解**: ...
"storm-wordcount"是Storm中的一个经典示例,用于演示如何处理实时数据流并进行简单的统计计算,类似于Hadoop的WordCount程序。这个例子的核心目标是统计输入文本中每个单词出现的次数。 在Storm中,数据流被抽象为...