`
xinklabi
  • 浏览: 1588011 次
  • 性别: Icon_minigender_1
  • 来自: 吉林
文章分类
社区版块
存档分类
最新评论

Strom介绍以及示例

 
阅读更多
转自:http://www.csdn.net/article/2012-12-24/2813117-storm-realtime-big-data-analysis

简单和明了,Storm让大数据分析变得轻松加愉快。

当今世界,公司的日常运营经常会生成TB级别的数据。数据来源囊括了互联网装置可以捕获的任何类型数据,网站、社交媒体、交易型商业数据以及其它商业环境中创建的数据。考虑到数据的生成量,实时处理成为了许多机构需要面对的首要挑战。我们经常用的一个非常有效的开源实时计算工具就是Storm —— Twitter开发,通常被比作“实时的Hadoop”。然而Storm远比Hadoop来的简单,因为用它处理大数据不会带来新老技术的交替。

Shruthi Kumar、Siddharth Patankar共同效力于Infosys,分别从事技术分析和研发工作。本文详述了Storm的使用方法,例子中的项目名称为“超速报警系统(Speeding Alert System)”。我们想实现的功能是:实时分析过往车辆的数据,一旦车辆数据超过预设的临界值 —— 便触发一个trigger并把相关的数据存入数据库。

Storm

对比Hadoop的批处理,Storm是个实时的、分布式以及具备高容错的计算系统。同Hadoop一样Storm也可以处理大批量的数据,然而Storm在保证高可靠性的前提下还可以让处理进行的更加实时;也就是说,所有的信息都会被处理。Storm同样还具备容错和分布计算这些特性,这就让Storm可以扩展到不同的机器上进行大批量的数据处理。他同样还有以下的这些特性:

  • 易于扩展。对于扩展,你只需要添加机器和改变对应的topology(拓扑)设置。Storm使用Hadoop Zookeeper进行集群协调,这样可以充分的保证大型集群的良好运行。
  • 每条信息的处理都可以得到保证。
  • Storm集群管理简易。
  • Storm的容错机能:一旦topology递交,Storm会一直运行它直到topology被废除或者被关闭。而在执行中出现错误时,也会由Storm重新分配任务。
  • 尽管通常使用Java,Storm中的topology可以用任何语言设计。

当然为了更好的理解文章,你首先需要安装和设置Storm。需要通过以下几个简单的步骤:

  • 从Storm官方下载Storm安装文件
  • 将bin/directory解压到你的PATH上,并保证bin/storm脚本是可执行的。

Storm组件

Storm集群主要由一个主节点和一群工作节点(worker node)组成,通过 Zookeeper进行协调。

主节点:

主节点通常运行一个后台程序 —— Nimbus,用于响应分布在集群中的节点,分配任务和监测故障。这个很类似于Hadoop中的Job Tracker。

工作节点:

工作节点同样会运行一个后台程序 —— Supervisor,用于收听工作指派并基于要求运行工作进程。每个工作节点都是topology中一个子集的实现。而Nimbus和Supervisor之间的协调则通过Zookeeper系统或者集群。

Zookeeper

Zookeeper是完成Supervisor和Nimbus之间协调的服务。而应用程序实现实时的逻辑则被封装进Storm中的“topology”。topology则是一组由Spouts(数据源)和Bolts(数据操作)通过Stream Groupings进行连接的图。下面对出现的术语进行更深刻的解析。

Spout:

简而言之,Spout从来源处读取数据并放入topology。Spout分成可靠和不可靠两种;当Storm接收失败时,可靠的Spout会对tuple(元组,数据项组成的列表)进行重发;而不可靠的Spout不会考虑接收成功与否只发射一次。而Spout中最主要的方法就是nextTuple(),该方法会发射一个新的tuple到topology,如果没有新tuple发射则会简单的返回。

Bolt:

Topology中所有的处理都由Bolt完成。Bolt可以完成任何事,比如:连接的过滤、聚合、访问文件/数据库、等等。Bolt从Spout中接收数据并进行处理,如果遇到复杂流的处理也可能将tuple发送给另一个Bolt进行处理。而Bolt中最重要的方法是execute(),以新的tuple作为参数接收。不管是Spout还是Bolt,如果将tuple发射成多个流,这些流都可以通过declareStream()来声明。

Stream Groupings:

Stream Grouping定义了一个流在Bolt任务间该如何被切分。这里有Storm提供的6个Stream Grouping类型:

1. 随机分组(Shuffle grouping):随机分发tuple到Bolt的任务,保证每个任务获得相等数量的tuple。

2. 字段分组(Fields grouping):根据指定字段分割数据流,并分组。例如,根据“user-id”字段,相同“user-id”的元组总是分发到同一个任务,不同“user-id”的元组可能分发到不同的任务。

3. 全部分组(All grouping):tuple被复制到bolt的所有任务。这种类型需要谨慎使用。

4. 全局分组(Global grouping):全部流都分配到bolt的同一个任务。明确地说,是分配给ID最小的那个task。

5. 无分组(None grouping):你不需要关心流是如何分组。目前,无分组等效于随机分组。但最终,Storm将把无分组的Bolts放到Bolts或Spouts订阅它们的同一线程去执行(如果可能)。

6. 直接分组(Direct grouping):这是一个特别的分组类型。元组生产者决定tuple由哪个元组处理者任务接收。

当然还可以实现CustomStreamGroupimg接口来定制自己需要的分组。

项目实施

当下情况我们需要给Spout和Bolt设计一种能够处理大量数据(日志文件)的topology,当一个特定数据值超过预设的临界值时促发警报。使用Storm的topology,逐行读入日志文件并且监视输入数据。在Storm组件方面,Spout负责读入输入数据。它不仅从现有的文件中读入数据,同时还监视着新文件。文件一旦被修改Spout会读入新的版本并且覆盖之前的tuple(可以被Bolt读入的格式),将tuple发射给Bolt进行临界分析,这样就可以发现所有可能超临界的记录。

下一节将对用例进行详细介绍。

临界分析

这一节,将主要聚焦于临界值的两种分析类型:瞬间临界(instant thershold)和时间序列临界(time series threshold)。

  • 瞬间临界值监测:一个字段的值在那个瞬间超过了预设的临界值,如果条件符合的话则触发一个trigger。举个例子当车辆超越80公里每小时,则触发trigger。
  • 时间序列临界监测:字段的值在一个给定的时间段内超过了预设的临界值,如果条件符合则触发一个触发器。比如:在5分钟类,时速超过80KM两次及以上的车辆。

Listing One显示了我们将使用的一个类型日志,其中包含的车辆数据信息有:车牌号、车辆行驶的速度以及数据获取的位置。

AB 123 60 North city
BC 123 70 South city
CD 234 40 South city
DE 123 40 East  city
EF 123 90 South city
GH 123 50 West  city

这里将创建一个对应的XML文件,这将包含引入数据的模式。这个XML将用于日志文件的解析。XML的设计模式和对应的说明请见下表。

XML文件和日志文件都存放在Spout可以随时监测的目录下,用以关注文件的实时更新。而这个用例中的topology请见下图。

Figure 1:Storm中建立的topology,用以实现数据实时处理

如图所示:FilelistenerSpout接收输入日志并进行逐行的读入,接着将数据发射给ThresoldCalculatorBolt进行更深一步的临界值处理。一旦处理完成,被计算行的数据将发送给DBWriterBolt,然后由DBWriterBolt存入给数据库。下面将对这个过程的实现进行详细的解析。

Spout的实现

Spout以日志文件和XML描述文件作为接收对象。XML文件包含了与日志一致的设计模式。不妨设想一下一个示例日志文件,包含了车辆的车牌号、行驶速度、以及数据的捕获位置。(看下图)

Figure2:数据从日志文件到Spout的流程图

Listing Two显示了tuple对应的XML,其中指定了字段、将日志文件切割成字段的定界符以及字段的类型。XML文件以及数据都被保存到Spout指定的路径。

Listing Two:用以描述日志文件的XML文件。

  1. <TUPLEINFO> 
  2. <FIELDLIST> 
  3. <FIELD> 
  4. <COLUMNNAME>vehicle_number</COLUMNNAME> 
  5. <COLUMNTYPE>string</COLUMNTYPE> 
  6. </FIELD> 
  7.  
  8. <FIELD>
  9. <COLUMNNAME>speed</COLUMNNAME> 
  10. <COLUMNTYPE>int</COLUMNTYPE> 
  11. </FIELD> 
  12.  
  13. <FIELD> 
  14. <COLUMNNAME>location</COLUMNNAME> 
  15. <COLUMNTYPE>string</COLUMNTYPE> 
  16. </FIELD> 
  17. </FIELDLIST> 
  18. <DELIMITER>,</DELIMITER> 
  19. </TUPLEINFO>   

通过构造函数及它的参数Directory、PathSpout和TupleInfo对象创建Spout对象。TupleInfo储存了日志文件的字段、定界符、字段的类型这些很必要的信息。这个对象通过XSTream序列化XML时建立。

Spout的实现步骤:

  • 对文件的改变进行分开的监听,并监视目录下有无新日志文件添加。
  • 在数据得到了字段的说明后,将其转换成tuple。
  • 声明Spout和Bolt之间的分组,并决定tuple发送给Bolt的途径。

Spout的具体编码在Listing Three中显示。

Listing Three:Spout中open、nextTuple和delcareOutputFields方法的逻辑。

  1. public void open( Map conf, TopologyContext context,SpoutOutputCollector collector )   
  2. {   
  3.            _collector = collector;   
  4.          try   
  5.          {   
  6.          fileReader  =  new BufferedReader(new FileReader(new File(file)));  
  7.          }  
  8.          catch (FileNotFoundException e)  
  9.          {  
  10.          System.exit(1);   
  11.          }  
  12. }                                                          
  13.  
  14. public void nextTuple()  
  15. {  
  16.          protected void ListenFile(File file)  
  17.          {  
  18.          Utils.sleep(2000);  
  19.          RandomAccessFile access = null;  
  20.          String line = null;   
  21.             try   
  22.             {  
  23.                 while ((line = access.readLine()) != null)  
  24.                 {  
  25.                     if (line !=null)  
  26.                     {   
  27.                          String[] fields=null;  
  28.                           if (tupleInfo.getDelimiter().equals("|"))  fields = line.split("\\"+tupleInfo.getDelimiter());   
  29.                           else   
  30.                           fields = line.split  (tupleInfo.getDelimiter());   
  31.                           if (tupleInfo.getFieldList().size() == fields.length)  _collector.emit(new Values(fields));  
  32.                     }  
  33.                }  
  34.             }  
  35.             catch (IOException ex){ }  
  36.             }  
  37. }  
  38.  
  39. public void declareOutputFields(OutputFieldsDeclarer declarer)  
  40. {  
  41.       String[] fieldsArr = new String [tupleInfo.getFieldList().size()];  
  42.       for(int i=0; i<tupleInfo.getFieldList().size(); i++)  
  43.       {  
  44.               fieldsArr[i] = tupleInfo.getFieldList().get(i).getColumnName();  
  45.       }  
  46. declarer.declare(new Fields(fieldsArr));  
  47. }      

declareOutputFileds()决定了tuple发射的格式,这样的话Bolt就可以用类似的方法将tuple译码。Spout持续对日志文件的数据的变更进行监听,一旦有添加Spout就会进行读入并且发送给Bolt进行处理。

Bolt的实现

Spout的输出结果将给予Bolt进行更深一步的处理。经过对用例的思考,我们的topology中需要如Figure 3中的两个Bolt。

Figure 3:Spout到Bolt的数据流程。

ThresholdCalculatorBolt

Spout将tuple发出,由ThresholdCalculatorBolt接收并进行临界值处理。在这里,它将接收好几项输入进行检查;分别是:

临界值检查

  • 临界值栏数检查(拆分成字段的数目)
  • 临界值数据类型(拆分后字段的类型)
  • 临界值出现的频数
  • 临界值时间段检查

Listing Four中的类,定义用来保存这些值。

Listing Four:ThresholdInfo类

  1. public class ThresholdInfo implementsSerializable  
  2.  
  3. {    
  4.         private String action;   
  5.         private String rule;   
  6.         private Object thresholdValue;  
  7.         private int thresholdColNumber;   
  8.         private Integer timeWindow;   
  9.         private int frequencyOfOccurence;   
  10. }   

基于字段中提供的值,临界值检查将被Listing Five中的execute()方法执行。代码大部分的功能是解析和接收值的检测。

Listing Five:临界值检测代码段

  1. public void execute(Tuple tuple, BasicOutputCollector collector)   
  2. {  
  3.     if(tuple!=null)   
  4.     {  
  5.         List<Object> inputTupleList = (List<Object>) tuple.getValues();  
  6.         int thresholdColNum = thresholdInfo.getThresholdColNumber();   
  7.         Object thresholdValue = thresholdInfo.getThresholdValue();   
  8.         String thresholdDataType = tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType();   
  9.         Integer timeWindow = thresholdInfo.getTimeWindow();  
  10.          int frequency = thresholdInfo.getFrequencyOfOccurence();  
  11.          if(thresholdDataType.equalsIgnoreCase("string"))  
  12.          {  
  13.              String valueToCheck = inputTupleList.get(thresholdColNum-1).toString();  
  14.              String frequencyChkOp = thresholdInfo.getAction();  
  15.              if(timeWindow!=null)  
  16.              {  
  17.                  long curTime = System.currentTimeMillis();  
  18.                  long diffInMinutes = (curTime-startTime)/(1000);  
  19.                  if(diffInMinutes>=timeWindow)  
  20.                  {  
  21.                      if(frequencyChkOp.equals("=="))  
  22.                      {  
  23.                           if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
  24.                           {  
  25.                               count.incrementAndGet();  
  26.                               if(count.get() > frequency)  
  27.                                   splitAndEmit(inputTupleList,collector);  
  28.                           }  
  29.                      }  
  30.                      else if(frequencyChkOp.equals("!="))  
  31.                      {  
  32.                          if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
  33.                          {  
  34.                               count.incrementAndGet();  
  35.                               if(count.get() > frequency)  
  36.                                   splitAndEmit(inputTupleList,collector);  
  37.                           }  
  38.                       }  
  39.                       else                         System.out.println("Operator not supported");   
  40.                   }  
  41.               }  
  42.               else 
  43.               {  
  44.                   if(frequencyChkOp.equals("=="))  
  45.                   {  
  46.                       if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
  47.                       {  
  48.                           count.incrementAndGet();  
  49.                           if(count.get() > frequency)  
  50.                               splitAndEmit(inputTupleList,collector);  
  51.                           }  
  52.                   }  
  53.                   else if(frequencyChkOp.equals("!="))  
  54.                   {  
  55.                        if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
  56.                        {  
  57.                            count.incrementAndGet();  
  58.                            if(count.get() > frequency)  
  59.                                splitAndEmit(inputTupleList,collector);  
  60.                           }  
  61.                    }  
  62.                }  
  63.             }  
  64.             else if(thresholdDataType.equalsIgnoreCase("int") ||                     thresholdDataType.equalsIgnoreCase("double") ||                     thresholdDataType.equalsIgnoreCase("float") ||                     thresholdDataType.equalsIgnoreCase("long") ||                     thresholdDataType.equalsIgnoreCase("short"))  
  65.             {  
  66.                 String frequencyChkOp = thresholdInfo.getAction();  
  67.                 if(timeWindow!=null)  
  68.                 {  
  69.                      long valueToCheck =                          Long.parseLong(inputTupleList.get(thresholdColNum-1).toString());  
  70.                      long curTime = System.currentTimeMillis();  
  71.                      long diffInMinutes = (curTime-startTime)/(1000);  
  72.                      System.out.println("Difference in minutes="+diffInMinutes);  
  73.                      if(diffInMinutes>=timeWindow)  
  74.                      {  
  75.                           if(frequencyChkOp.equals("<"))  
  76.                           {  
  77.                               if(valueToCheck < Double.parseDouble(thresholdValue.toString()))  
  78.                               {  
  79.                                    count.incrementAndGet();  
  80.                                    if(count.get() > frequency)  
  81.                                        splitAndEmit(inputTupleList,collector);  
  82.                               }  
  83.                           }  
  84.                           else if(frequencyChkOp.equals(">"))  
  85.                           {  
  86.                                if(valueToCheck > Double.parseDouble(thresholdValue.toString()))  
  87.                                 {  
  88.                                    count.incrementAndGet();  
  89.                                    if(count.get() > frequency)  
  90.                                        splitAndEmit(inputTupleList,collector);  
  91.                                }  
  92.                            }  
  93.                            else if(frequencyChkOp.equals("=="))  
  94.                            {  
  95.                               if(valueToCheck == Double.parseDouble(thresholdValue.toString()))  
  96.                               {  
  97.                                   count.incrementAndGet();  
  98.                                   if(count.get() > frequency)  
  99.                                       splitAndEmit(inputTupleList,collector);  
  100.                                }  
  101.                            }  
  102.                            else if(frequencyChkOp.equals("!="))  
  103.                            {  
  104.     . . .  
  105.                             }  
  106.                        }  
  107.              }  
  108.       else 
  109.           splitAndEmit(null,collector);  
  110.       }  
  111.       else 
  112.      {  
  113.            System.err.println("Emitting null in bolt");  
  114.            splitAndEmit(null,collector);  
  115.     }  

经由Bolt发送的的tuple将会传递到下一个对应的Bolt,在我们的用例中是DBWriterBolt。

DBWriterBolt

经过处理的tuple必须被持久化以便于触发tigger或者更深层次的使用。DBWiterBolt做了这个持久化的工作并把tuple存入了数据库。表的建立由prepare()函数完成,这也将是topology调用的第一个方法。方法的编码如Listing Six所示。

Listing Six:建表编码。

  1. public void prepare( Map StormConf, TopologyContext context )   
  2. {         
  3.     try   
  4.     {  
  5.         Class.forName(dbClass);  
  6.     }   
  7.     catch (ClassNotFoundException e)   
  8.     {  
  9.         System.out.println("Driver not found");  
  10.         e.printStackTrace();  
  11.     }  
  12.    
  13.     try   
  14.     {  
  15.        connection driverManager.getConnection(   
  16.            "jdbc:mysql://"+databaseIP+":"+databasePort+"/"+databaseName, userName, pwd);  
  17.        connection.prepareStatement("DROP TABLE IF EXISTS "+tableName).execute();  
  18.    
  19.        StringBuilder createQuery = new StringBuilder(  
  20.            "CREATE TABLE IF NOT EXISTS "+tableName+"(");  
  21.        for(Field fields : tupleInfo.getFieldList())  
  22.        {  
  23.            if(fields.getColumnType().equalsIgnoreCase("String"))  
  24.                createQuery.append(fields.getColumnName()+" VARCHAR(500),");  
  25.            else 
  26.                createQuery.append(fields.getColumnName()+" "+fields.getColumnType()+",");  
  27.        }  
  28.        createQuery.append("thresholdTimeStamp timestamp)");  
  29.        connection.prepareStatement(createQuery.toString()).execute();  
  30.    
  31.        // Insert Query  
  32.        StringBuilder insertQuery = new StringBuilder("INSERT INTO "+tableName+"(");  
  33.        String tempCreateQuery = new String();  
  34.        for(Field fields : tupleInfo.getFieldList())  
  35.        {  
  36.             insertQuery.append(fields.getColumnName()+",");  
  37.        }  
  38.        insertQuery.append("thresholdTimeStamp").append(") values (");  
  39.        for(Field fields : tupleInfo.getFieldList())  
  40.        {  
  41.            insertQuery.append("?,");  
  42.        }  
  43.    
  44.        insertQuery.append("?)");  
  45.        prepStatement = connection.prepareStatement(insertQuery.toString());  
  46.     }  
  47.     catch (SQLException e)   
  48.     {         
  49.         e.printStackTrace();  
  50.     }         
  51. }  

数据分批次的插入数据库。插入的逻辑由Listting Seven中的execute()方法提供。大部分的编码都是用来实现可能存在不同类型输入的解析。

Listing Seven:数据插入的代码部分。

  1. public void execute(Tuple tuple, BasicOutputCollector collector)   
  2. {  
  3.     batchExecuted=false;  
  4.     if(tuple!=null)  
  5.     {  
  6.        List&#60;Object&#62; inputTupleList = (List&#60;Object&#62;) tuple.getValues();  
  7.        int dbIndex=0;  
  8.        for(int i=0;i&#60;tupleInfo.getFieldList().size();i++)  
  9.        {  
  10.            Field field = tupleInfo.getFieldList().get(i);  
  11.            try {  
  12.                dbIndex = i+1;  
  13.                if(field.getColumnType().equalsIgnoreCase("String"))               
  14.                    prepStatement.setString(dbIndex, inputTupleList.get(i).toString());  
  15.                else if(field.getColumnType().equalsIgnoreCase("int"))  
  16.                    prepStatement.setInt(dbIndex,  
  17.                        Integer.parseInt(inputTupleList.get(i).toString()));  
  18.                else if(field.getColumnType().equalsIgnoreCase("long"))  
  19.                    prepStatement.setLong(dbIndex,   
  20.                        Long.parseLong(inputTupleList.get(i).toString()));  
  21.                else if(field.getColumnType().equalsIgnoreCase("float"))  
  22.                    prepStatement.setFloat(dbIndex,   
  23.                        Float.parseFloat(inputTupleList.get(i).toString()));  
  24.                else if(field.getColumnType().equalsIgnoreCase("double"))  
  25.                    prepStatement.setDouble(dbIndex,   
  26.                        Double.parseDouble(inputTupleList.get(i).toString()));  
  27.                else if(field.getColumnType().equalsIgnoreCase("short"))  
  28.                    prepStatement.setShort(dbIndex,   
  29.                        Short.parseShort(inputTupleList.get(i).toString()));  
  30.                else if(field.getColumnType().equalsIgnoreCase("boolean"))  
  31.                    prepStatement.setBoolean(dbIndex,   
  32.                        Boolean.parseBoolean(inputTupleList.get(i).toString()));  
  33.                else if(field.getColumnType().equalsIgnoreCase("byte"))  
  34.                    prepStatement.setByte(dbIndex,   
  35.                        Byte.parseByte(inputTupleList.get(i).toString()));  
  36.                else if(field.getColumnType().equalsIgnoreCase("Date"))  
  37.                {  
  38.                   Date dateToAdd=null;  
  39.                   if (!(inputTupleList.get(i) instanceof Date))    
  40.                   {    
  41.                        DateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");  
  42.                        try   
  43.                        {  
  44.                            dateToAdd = df.parse(inputTupleList.get(i).toString());  
  45.                        }  
  46.                        catch (ParseException e)   
  47.                        {  
  48.                            System.err.println("Data type not valid");  
  49.                        }  
  50.                    }    
  51.                    else 
  52.                    {  
  53.             dateToAdd = (Date)inputTupleList.get(i);  
  54.             java.sql.Date sqlDate = new java.sql.Date(dateToAdd.getTime());  
  55.             prepStatement.setDate(dbIndex, sqlDate);  
  56.             }     
  57.             }   
  58.         catch (SQLException e)   
  59.         {  
  60.              e.printStackTrace();  
  61.         }  
  62.     }  
  63.     Date now = new Date();            
  64.     try 
  65.     {  
  66.         prepStatement.setTimestamp(dbIndex+1new java.sql.Timestamp(now.getTime()));  
  67.         prepStatement.addBatch();  
  68.         counter.incrementAndGet();  
  69.         if (counter.get()== batchSize)   
  70.         executeBatch();  
  71.     }   
  72.     catch (SQLException e1)   
  73.     {  
  74.         e1.printStackTrace();  
  75.     }             
  76.    }  
  77.    else 
  78.    {  
  79.         long curTime = System.currentTimeMillis();  
  80.        long diffInSeconds = (curTime-startTime)/(60*1000);  
  81.        if(counter.get()&#60;batchSize && diffInSeconds&#62;batchTimeWindowInSeconds)  
  82.        {  
  83.             try {  
  84.                 executeBatch();  
  85.                 startTime = System.currentTimeMillis();  
  86.             }  
  87.             catch (SQLException e) {  
  88.                  e.printStackTrace();  
  89.             }  
  90.        }  
  91.    }  
  92. }  
  93.    
  94. public void executeBatch() throws SQLException  
  95. {  
  96.     batchExecuted=true;  
  97.     prepStatement.executeBatch();  
  98.     counter = new AtomicInteger(0);  

一旦Spout和Bolt准备就绪(等待被执行),topology生成器将会建立topology并准备执行。下面就来看一下执行步骤。

在本地集群上运行和测试topology

  • 通过TopologyBuilder建立topology。
  • 使用Storm Submitter,将topology递交给集群。以topology的名字、配置和topology的对象作为参数。
  • 提交topology。

Listing Eight:建立和执行topology。

  1. public class StormMain  
  2. {  
  3.      public static void main(String[] args) throws AlreadyAliveException,   
  4.                                                    InvalidTopologyException,   
  5.                                                    InterruptedException   
  6.      {  
  7.           ParallelFileSpout parallelFileSpout = new ParallelFileSpout();  
  8.           ThresholdBolt thresholdBolt = new ThresholdBolt();  
  9.           DBWriterBolt dbWriterBolt = new DBWriterBolt();  
  10.           TopologyBuilder builder = new TopologyBuilder();  
  11.           builder.setSpout("spout", parallelFileSpout, 1);  
  12.           builder.setBolt("thresholdBolt", thresholdBolt,1).shuffleGrouping("spout");  
  13.           builder.setBolt("dbWriterBolt",dbWriterBolt,1).shuffleGrouping("thresholdBolt");  
  14.           if(this.argsMain!=null && this.argsMain.length &#620)   
  15.           {  
  16.               conf.setNumWorkers(1);  
  17.               StormSubmitter.submitTopology(   
  18.                    this.argsMain[0], conf, builder.createTopology());  
  19.           }  
  20.           else 
  21.           {      
  22.               Config conf = new Config();  
  23.               conf.setDebug(true);  
  24.               conf.setMaxTaskParallelism(3);  
  25.               LocalCluster cluster = new LocalCluster();  
  26.               cluster.submitTopology(  
  27.               "Threshold_Test", conf, builder.createTopology());  
  28.           }  
  29.      }  

topology被建立后将被提交到本地集群。一旦topology被提交,除非被取缔或者集群关闭,它将一直保持运行不需要做任何的修改。这也是Storm的另一大特色之一。

这个简单的例子体现了当你掌握了topology、spout和bolt的概念,将可以轻松的使用Storm进行实时处理。如果你既想处理大数据又不想遍历Hadoop的话,不难发现使用Storm将是个很好的选择

分享到:
评论

相关推荐

    Strom优化

    在本篇文章中,我们将深入探讨Strom的核心概念、优化策略以及如何利用它来提升数据处理效率。 1. **Strom核心概念**: - **Spout**:Spout是Storm的数据输入源,负责生成数据流。它可以是从数据库、消息队列或其他...

    超级简单入门的strom的java代码demo

    本示例项目适用于Java开发者,特别是初学者,帮助他们快速上手Storm。 【描述】中的信息表明,此压缩包包含了一个可以直接在Eclipse集成开发环境中运行的Storm项目。这意味着项目已经配置好了所有必要的依赖和设置...

    strom的安装

    要测试集群,可以运行 Storm 提供的示例 Topology,例如 WordCount,通过 `storm jar` 命令提交 Topology,然后使用 `storm kill` 命令可以终止运行的 Topology。 总的来说,Strom 的强大在于其灵活的实时处理能力...

    【Storm入门级JAVA示例演示】

    在这个示例中,我们将探讨如何使用Java编写Storm拓扑结构,以及如何在本地或集群环境中运行它们。 首先,理解Storm的基本概念至关重要。一个Storm拓扑是数据流处理任务的逻辑表示,由Spouts(数据源)和Bolts(数据...

    storm 的安装使用

    通常,这样的示例会包括配置文件、拓扑结构定义(如`.java`或`.jars`)以及运行和测试示例的说明。解压并研究这个stormdemo,可以帮助理解Storm的工作原理和如何编写实时数据处理的Topologies。 总结来说,Apache ...

    storm开发jar包以及storm例子源码

    在压缩包中的"strom开发"文件可能包含了上述过程的详细步骤,源码示例,以及可能的配置文件和文档。通过学习和实践这些材料,开发者可以深入理解Storm的工作原理,掌握实时数据处理的基本技能。

    07_Nexus的介绍和安装

    对于安装步骤,文档中详细介绍了在Windows系统上安装Nexus的过程,包括下载安装包、解压缩、配置环境变量以及修改配置文件等步骤。首先,需要从官方网站下载Nexus的安装包。Nexus为不同的操作系统提供了不同格式的...

    workshop-tinkerforge-strom:通过JavaFX可视化电流和电压的研讨会

    【压缩包子文件的文件名称列表】:"workshop-tinkerforge-strom-master"可能包含了该研讨会的源代码、教程文档、示例项目和其他相关资源。这个目录名暗示着它是项目的主要分支或原始版本,通常在GitHub等版本控制...

    php 输入输出流详解及示例代码

    本文将深入探讨PHP的输入输出流,特别是如何在PHP中接收和发送数据,以及如何处理POST请求和不同类型的表单编码。 首先,了解HTTP协议的基本原理是必要的。HTTP协议定义了客户端(通常是浏览器)与服务器之间交换...

    ePHASORsim_STM32F103_

    1. **STM32F103微控制器**:介绍其特性,如Cortex-M3内核、内存配置、外设接口,以及如何在开发环境中设置和编程。 2. **ePHASORsim软件**:解释该软件的工作原理,如何创建和配置电力系统模型,以及如何进行动态...

    85精美英文简历模板.docx

    以下是一份基于模板的示例简历: 个人信息: - 姓名:Adam Miller - 电子邮件:adam.miller@mail.com - 地址:3099 Julia Street, Rome - Italy - 电话:+1325 7894 5612 - 社交媒体链接:Facebook、Twitter、...

    精品课程推荐 大数据与云计算教程课件 优质大数据课程 24.使用Zookeeper构建应用(共34页).pptx

    Hadoop是大数据处理的核心框架,包括HDFS(分布式文件系统)、MapReduce(分布式计算模型)以及YARN(资源调度器)等方面,课程详细介绍了这些组件的安装入门、开发环境配置、实战应用以及深入原理。 【Zookeeper】...

    大数据课程体系

    - **使用Strom开发一个WordCount例子**:通过一个简单的WordCount示例学习Storm的使用。 - **Storm程序本地模式debug、Storm程序远程debug**:调试Storm程序的技巧。 - **Storm事物处理**:实现事务性的数据处理流程...

    bake:如何制作一本书

    我非常感谢 Chris Strom 和 Tom Stuart,他们回答了我最初的很多问题,也感谢的作者,我从中抄录了一些工具链。 我需要什么? 您需要这些程序,如果您使用的是 Ubuntu,则可以使用apt-get install 。 asciidoc ...

    trident-kafka-without-mongodb-state-impl:三叉戟-kafka-without-mongodb-state-impl

    这是 strom 三叉戟拓扑的示例。 我正在阅读 kafka 源代码并在三叉戟风暴中进行处理。 三叉戟-kafka-without-mongodb-state-impl 运行命令: ./storm jar /home/ec2-user/software/trident-kafka-without-state-...

    twitter4j-2.2.6.zip

    2. **readme.txt**:通常包含项目的基本信息、安装指南、快速入门示例等,是开发者开始使用Twitter4j的第一步。 3. **pom.xml**:这是一个Maven项目的配置文件,包含了项目的依赖、构建指令等信息,方便用户在Maven...

    python集合用法实例分析

    通过以上示例,我们可以看到Python集合的强大功能以及其在实际应用中的灵活性。掌握集合的基本操作和高级特性对于进行高效的数据处理和算法实现非常重要。希望本文所述内容能够帮助读者更好地理解和运用Python中的...

    skyfree-storm-hbase

    - "skyfree-storm-hbase-master" 可能包含源代码、配置文件、测试用例以及文档,帮助用户理解和运行这个集成示例。 综上所述,"skyfree-storm-hbase"项目展示了如何利用Apache Storm进行实时数据处理,并将处理...

Global site tag (gtag.js) - Google Analytics