`

Storm实现实时feed信息处理

阅读更多
这篇文章通过用一个如何处理feed数据的应用场景来说明为什么会出现Storm实时计算框架, 对我们自己的技术选型和系统架构设计非常有参考价值.

原文在这里

Storm 成为最近开源社区的一个热门, 其作者Nathan Marz 所在的公司Backtype现在已经被Twitter收购. 该项目的wiki 非常完善. 从这里对Storm做一个全面了解.

目前的场景是这样的, 通过解析xml feed来生成索引数据. 已经通过搭建hadoop来批量生成全量索引. 但是如果需要实时更新数据增量生成索引该如何处理呢?

1.最简单单机方案
用一台机器持续抓取feed数据, 但是这个会受到单机的处理和网络带宽限制, 系统不具备可伸缩性.

2.可伸缩架构方案
通过多台机器搭建集群, 并根据feedId hash值取模将整个feed数据分散到多台机器. 这种方式缺点非常明显: 无法做到系统自动伸缩, 每加入一台机器, 需要重新调整整个集群的feed处理. 另外也无法实现自动容灾(failover), 一台机器挂了,必须手动启动备机.

3.Queue/Worker方案
通过在master上维护一个消息队列(queue), 然后分发到slave机器上的worker进行处理. 该方案能解决上一种方案中的其中一台机器挂掉造成的局部处理失败的问题.

另外这种方案也是类似场景中的常用架构.

不过这种架构也有一些自己的复杂性: 比如消息队列的选择, 消息队列的运维问题.

4.Hadoop方案
采用Hadoop来更新feed, 这里需要编写Map/Reduce Job, 先将更新的feed导入Map中, 然后将所有数据汇总到Reduce, 这里每个Reduce处理其中一部分feed, 需要直到所有Reduce执行完成才能得到最终的索引结果. 因此实时性会大打折扣.
注:我觉得Hadoop更侧重于批量并行处理海量的数据, 而不是来一条更新数据处理一条.

5.Multi Queue/Worker方案
在原来的基础上再加一层Queue, 第一层N个worker从Work Queue获取feed数据, 然后添加到DB Queue, 最后M个(M<N)DB worker从DB Queue中获取数据更新到数据库或者其他地方.

注:老实说我也没看出来为什么需要搞两层Queue. 但是Strom就是这么干的.

6.Storm方案
Storm对Multi Queue/Worker系统的复杂性进行了抽象. 用户只需要编写Topology而无需关系系统的扩展, 容灾以及进程间的通讯.

Storm集群包括一个master节点(Nimbus)和N个slave节点(Supervisor), 节点之间通过ZooKeeper来协调, 进程间通讯采用ZeroMQ. 而Topology不仅可以使用Java实现, 还支持其他动态语言.

这里我们使用FeedSpout来获取feed url并将其发射(emit)给FetcherBolt, 然后FetcherBolt去下载并解析数据.

下面是FeedSpout代码:
public class FeedSpout extends SimpleSpout {

  private static final long serialVersionUID = 1L;
  Queue<String> feedQueue = new LinkedList<String>();
  String[] feeds;

  public FeedSpout(String[] feeds) {
    this.feeds = feeds;
  }

  @Override
  public void nextTuple() {
    String nextFeed = feedQueue.poll();
    if(nextFeed != null) {
      collector.emit(new Values(nextFeed), nextFeed);
    }
  }

  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    super.open(conf, context, collector);
    for(String feed: feeds) {
      feedQueue.add(feed);
    }
  }

  @Override
  public void ack(Object feedId) {
    feedQueue.add((String)feedId);
  }

  @Override
  public void fail(Object feedId) {
    feedQueue.add((String)feedId);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("feed"));
  }
}


这里对例子进行了简化, 在FeedSpout构造器中直接传入feed数据, 然后在open方法中将其添加的内存queue中. 而Storm内部会负责控制nextTuple方法的调用. 这样来保证queue中的数据持续不断的被处理. 如果消息处理成功会调用ack方法, 失败会调用fail方法

下面是FetcherBolt方法:

public class FetcherBolt implements IRichBolt {

  private static final long serialVersionUID = 1L;
  private OutputCollector collector;

  @Override
  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
  }

  @Override
  public void execute(Tuple input) {
    FeedFetcher feedFetcher = new HttpURLFeedFetcher();
    String feedUrl = input.getStringByField("feed");
    try {
      SyndFeed feed = feedFetcher.retrieveFeed(new URL(feedUrl));
      for(Object obj : feed.getEntries()) {
        SyndEntry syndEntry = (SyndEntry) obj;
        Date entryDate = getDate(syndEntry, feed);
        collector.emit(new Values(syndEntry.getLink(), entryDate.getTime(), syndEntry.getDescription().getValue()));
      }
      collector.ack(input);
    } catch(Throwable t) {
      t.printStackTrace();
      collector.fail(input);
    }
  }

  private Date getDate(SyndEntry syndEntry, SyndFeed feed) {
    return syndEntry.getUpdatedDate() == null ? (syndEntry.getPublishedDate() == null ? feed.getPublishedDate() : syndEntry.getPublishedDate()) : syndEntry.getUpdatedDate();
  }

  @Override
  public void cleanup() {
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("link", "date", "description"));
  }
}


这里对Bolt也做了简化, 在execute方法中根据feed url下载并解析feed数据, 最后将数据发送出去(这里只发送了"link", "date", "description"字段), 这些数据将被下一个Bolt处理(如果有的话).

在Bolt中如果处理数据成功或失败, 将会调用ack或者fail方法. 在Storm内部会用Acker来跟踪Tuple的执行流程. Strom将发射Tuple(会带一个id)的操作称之为Anchoring, 如果一个Tuple在中间某个Bolt处理失败或者超时(内部默认是30s), Storm会向Spout发送FAIL消息, 这样我们就可以采用相应的策略来保证数据可靠.

最后是编写FeedTopology来封装Spout和Bolt:
package datasalt.storm.feeds;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;

/**
* This class builds the topology that needs to be submitted to Storm. It puts {@link FeedSpout}, {@link FetcherBolt}
* and {@link ListingBolt} all together.
*
* @author pere
*
*/
public class FeedTopology {

public static StormTopology buildTopology(String[] feeds) {
TopologyBuilder builder = new TopologyBuilder();

// One single feed spout feeding data
builder.setSpout("feedSpout", new FeedSpout(feeds), 1);

// Various (2) fetcher bolts -> shuffle grouping from feed spout
builder.setBolt("fetcherBolt", new FetcherBolt(), 2).shuffleGrouping("feedSpout");
// One single listing bolt calculating statistics
builder.setBolt("listingBolt", new ListingBolt(), 1).globalGrouping("fetcherBolt");

return builder.createTopology();
}

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
conf.setMaxSpoutPending(1);
StormSubmitter.submitTopology("feedTopology", conf, buildTopology(Constants.FEEDS));
}
}


更详细的例子可以从这里下载源代码

总结
通过上面的例子来说明Strom是什么, 以及如何使用Storm. 这里我们可以知道, 如果没有Storm, 要实现多层Queue/Worker架构将会是多么复杂. 而有了Storm, 我们只需要在Toplogy中实现我们的业务逻辑然后部署到Storm集群中即可. 而要对系统进行水平扩展, 只需要向Storm集群中添加机器即可.

另外Storm主要专注于实时计算, 而Hadoop则侧重于批量计算.

更多的例子常见这里
分享到:
评论
1 楼 qiuboboy 2012-02-08  
引用

collector.emit(new Values(syndEntry.getLink(), entryDate.getTime(), syndEntry.getDescription().getValue()));

是不是应该改为
collector.emit(input,new Values(syndEntry.getLink(), entryDate.getTime(), syndEntry.getDescription().getValue()));

相关推荐

    Storm实时数据处理

    Storm实时数据处理

    基于Storm技术的实时数据处理平台研究与实现.pdf

    本文将重点探讨基于Storm技术的实时数据处理平台的研究与实现。 首先,随着移动互联网和大数据技术的发展,实时数据分析的需求越来越明显,尤其是当传统的大数据分析技术,如基于Hadoop的离线分析或Spark Streaming...

    基于Storm构建实时热力分布项目实战-地址.txt

    - **开发Bolt与Spout**:编写Storm Topology中的Spout(数据源)和Bolt(数据处理单元),实现数据的实时处理逻辑。 - **热力图算法设计**:根据业务需求设计热力图生成算法,确保热力图能够准确反映数据分布特点。 ...

    大数据-Storm实时数据处理

    系统讲解Storm的基础知识和实时数据处理的最佳实践方法,内容涵盖Storm本地开发环境搭建、日志流数据处理、Trident、分布式远程过程调用、Topology在不同编程语言中的实现方法、Storm与Hadoop的集成方法、实时机器...

    storm实时数据处理

    Storm可以与Hadoop、Cassandra、Kafka等大数据处理框架集成,实现数据的实时摄入、处理和存储,构建完整的实时数据管道。 七、监控与调试 Storm提供了丰富的监控接口和工具,如Zabbix、Ganglia等,帮助开发者实时...

    基于Storm构建实时热力分布项目实战.txt

    Apache Storm作为一款开源的分布式实时计算系统,在处理大规模数据流方面表现出色,被广泛应用于实时分析、在线机器学习、持续计算等领域。本实战项目旨在通过构建一个实时热力分布系统,实现对大量实时数据的高效...

    ( Storm实时数据处理.zip )PDF 高清版

    **Storm实时数据处理** Apache Storm是一个开源的分布式实时计算系统,它被设计用来处理无界数据流,确保每个事件都能得到正确的处理,即使在高并发和大规模数据输入的情况下也能保持低延迟。本资料《Storm实时数据...

    Storm实时数据处理.pdf

    根据提供的文件信息,“Storm实时数据处理.pdf”,我们可以深入探讨与Apache Storm相关的实时数据处理技术。 ### Apache Storm简介 Apache Storm是一种分布式实时计算系统,能够处理无界数据流,即连续不断的数据...

    Storm API实现词频统计

    在大数据处理领域,Apache Storm是一个实时计算系统,它能够持续处理数据流,实现低延迟、高吞吐量的数据分析。在这个“Storm API实现词频统计”的案例中,我们将深入探讨如何利用Java编程语言和Storm API来构建一个...

    Storm实时数据处理-超清文字版.pdf

    《Storm实时数据处理》这本书是大数据处理领域的重要参考资料,它主要聚焦于Apache Storm这一开源分布式实时计算系统。Storm被广泛应用于实时分析、持续计算、分布式RPC、机器学习等多个场景,其核心理念是允许用户...

    使用Storm实现实时大数据分析.doc

    通过Spout获取数据,Bolt进行处理,Stream Groupings定制数据流向,配合Nimbus和Supervisor的集群管理,以及Zookeeper的协调,Storm能够实现复杂实时分析任务的高效执行。在应对现代大数据挑战时,Storm是一个强大的...

    使用Storm实现实时大数据分析!

    考虑到数据的生成量,实时处理成为了许多机构需要面对的首要挑战。我们经常用的一个非常有效的开源实时计算工具就是Storm ——Twitter开发,通常被比作“实时的Hadoop”。然而Storm远比Hadoop来的简单,因为用它处理...

    《Storm实时数据处理》PDF.zip

    系统讲解Storm的基础知识和实时数据处理的最佳实践方法,内容涵盖Storm本地开发环境搭建、日志流数据处理、Trident、分布式远程过程调用、Topology在不同编程语言中的实现方法、Storm与Hadoop的集成方法、实时机器...

    基于Storm的实时大数据处理.pdf

    【基于Storm的实时大数据处理】 在当今信息化时代,大数据的快速增长和实时处理需求催生了诸如云计算和大数据技术的发展。Storm,作为一个实时流数据处理系统,是应对这些挑战的关键工具之一。它由Twitter开源,...

    基于Storm的实时大数据处理.docx

    【基于Storm的实时大数据处理】 随着互联网的飞速发展,数据量呈指数级增长,对实时数据处理的需求日益增强。传统的批处理系统如Hadoop的MapReduce在应对实时响应和低延迟需求时显得力不从心。这时,Twitter推出的...

    Storm分布式实时计算在物联网系统中的应用.pdf

    在物联网系统中,Storm的应用背景主要是处理设备生成的海量实时数据,例如传感器数据、设备状态信息等。通过实时分析这些数据,可以迅速做出响应,例如预警、控制决策等。安装Storm涉及设置Nimbus主控节点、...

    《Storm实时数据处理》PDF

    《Storm实时数据处理》

    基于Storm的分布式实时信号处理系统.pdf

    基于Storm的分布式实时信号处理系统是一种利用了Storm这一流式云计算系统进行实时数据处理的架构。Storm系统由Twitter公司开发,它是一个开源的分布式实时计算系统,提供了实时处理数据流的高效平台。Storm系统的...

    Storm实战构建大数据实时计算

    Storm官方网站有段简介 Storm是一个免费并开源的分布式实时计算系统。利用Storm可以很容易做到可靠地处理无限的数据流,像Hadoop批量处理大数据一样,Storm可以实时处理数据。Storm简单,可以使用任何编程语言。

Global site tag (gtag.js) - Google Analytics