`
sunwinner
  • 浏览: 203784 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Getting Started with Apache Crunch

 
阅读更多

The Apache Crunch Java library provides a framework for writing, testing, and running MapReduce pipelines. Its goal is to make pipelines that are composed of many user-defined functions simple to write, easy to test, and efficient to run. Running on top of Hadoop MapReduce, the Apache Crunch library is a simple Java API for tasks like joining and data aggregation that are tedious to implement on plain MapReduce. The APIs are especially useful when processing data that does not fit naturally into relational model, such as time series, serialized object formats like protocol buffers or Avro records, and HBase rows and columns. For more information, you can visit Apache Crunch Homepage.

 

In this blog post, I'll show you how to write a word counting programming that you might familiar with if you know Hadoop, it's Hello World in Hadoop world. Firstly, let's look at the basic concepts in Crunch, including its type system and pipelined architecture.

 

  • Data Pipelines:
The crunch pipeline is represented with the Pipeline interface and MRPipeline implementation class, as you can see in below class outline:

 As you can see, the pipeline class contains methods to read and write collections. These collection classes have methods to perform operations on the contents of collections to produce a new result collection. Therefore, a pipeline consists of the definition of one or more input collections, a number of operations on these intermediary collections, and the writing of the collections to data sinks. The execution of all the actual pipeline operations is delayed until the run or done methods are called, at which point Crunch translates the pipeline into one or more MapReduce jobs and starts their execution.
The Pipeline interface defines a readTextFile method that takes in a String and returns a PCollection of Strings. In addition to text files, the library supports reading data from SequenceFiles and Avro container files, via the SequenceFileSource and AvroFileSource classes defined in the org.apache.crunch.io package. Note that each PCollection is a reference to a source of data, no data is actually loaded into a PCollection on the client machine.

 

  • Collections:
In Crunch the collection interface represent a distributed set of elements. A collection can be created in one of two ways: as a result of a read method invocation on the Pipeline class, or as a result of an operation on another collection. There are three types of collections in Crunch, as below figure shown:
 Collection classes contains a number of methods, which operate on the contents of the collections, these operations are executed in either the map or reduce phase. Among them, the PGroupedTable is a special collection that's a result of calling groupByKey method on the PTable, this results in a reduce phase being executed to perform the grouping.

  • Data Functions:
Functions can be applied to the collections that you just saw using parallelDo method in the collection interface. All the parallelDo methods take a DoFn inplementation, which perform the actual operation on the collection in MapReduce, you can see the DoFn class in below figure:

 As you can see, all DoFn instances are required to be java.io.Serializable. This is a key aspect of the library's design: once a particular DoFn is assigned to the Map or Reduce stage of a MapReduce job, all of the state of that DoFn is serialized so that it may be distributed to all of the nodes in the Hadoop cluster that will be running that task. There are two important implications of this for developers: 
  1. All member values of a DoFn must be either serializable or marked as transient. 
  2. All anonymous DoFn instances must be defined in a static method or in a class that is itself serializable. 
Sometimes you will need to work with non-serializable objects inside of a DoFn, every DoFn provides an initialize method that is called before the process method is ever called so that any initialization tasks, such as creating a non-serializable member variable, can be performed before processing begins. Similarly, all DoFn instances have a cleanup method that may be called after processing has finished to perform any required cleanup tasks.
Crunch comes with a bunch of built-in operations (joining, grouping, counting, etc.), which represent MapReduce operations that you commonly perform on you data. Because Crunch already has these operations defined, you don't need to wrestle with MapReduce to write your own. You can also define your own custom operations if you want to. The class hierarchy of built-in operations shown as below figure:

 
  • Types and Serialization

The parallelDo method on the PCollection interface all take either a PType or PTableType argument, depending on whether the result was a PCollection or PTable. These interfaces are used by Crunch to map between the data types used in the Crunch pipeline, and the serialization format used when reading or writing data in HDFS. The class hierarchy of types in Crunch shown as below:

 As you can see, Crunch has serializatio support for both native Hadoop Writable classes as well as Avro types.

 

Now let's look at the WordCount in Crunch:

package crunch;
import org.apache.crunch.*;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.Aggregate;
import org.apache.crunch.types.writable.Writables;

public class WordCount {

    public static void main(String[] args) throws Exception {

        Pipeline pipeline = new MRPipeline(WordCount.class);
        PCollection<String> lines = pipeline.readTextFile(args[0]);

        PCollection<String> words = lines.parallelDo("my splitter", new DoFn<String, String>() {
            public void process(String line, Emitter<String> emitter) {
                for (String word : line.split("\\s+")) {
                    emitter.emit(word);
                }
            }
        }, Writables.strings());

        PTable<String, Long> counts = Aggregate.count(words);

        pipeline.writeTextFile(counts, args[1]);
        pipeline.run();
    }
}
Simple enough, right? Let's go through this example line by line.
Step 1: creating a Pipeline and reference a text file. Note the keyword reference here, no data is actually loaded into a PCollection on the client machine.
 
Step 2: Splitting the lines of text into words. 
  • The first argument to parallelDo is a string that is used to identify this step in the pipeline. When a pipeline is composed into a series of MapReduce jobs, it is often the case that multiple stages will run within the same Mapper or Reducer. Having a string that identifies each processing step is useful for debugging errors that occur in a running pipeline.
  • The second argument to parallelDo is an anonymous subclass of DoFn. Each DoFn subclass must override the process method, which takes in a record from the input PCollection and an Emitter object that may have any number of output values written to it. In this case, our DoFn splits each lines up into words, using a blank space as a separator, and emits the words from the split to the output PCollection.
  • The last argument to parallelDo is an instance of the PType interface, which specifies how the data in the output PCollection is serialized. While the API takes advantage of Java Generics to provide compile-time type safety, the generic type information is not available at runtime. The job planner needs to know how to map the records stored in each PCollection into a Hadoop-supported serialization format in order to read and write data to disk. Two serialization implementations are supported in Crunch via the PTypeFamily interface: a Writable-based system that is defined in the org.apache.crunch.types.writable package, and an Avro-based system that is defined in the org.apache.crunch.types.avro package. Each implementation provides convenience methods for working with the common PTypes (Strings, longs, bytes, etc.) as well as utility methods for creating PTypes from existing Writable classes or Avro schemas.

Step 3: counting the words. This is acomplished by the Aggregate.count(...) method, let's look at it's implementation.

  /**
   * Returns a {@code PTable} that contains the unique elements of this collection mapped to a count
   * of their occurrences.
   */
  public static <S> PTable<S, Long> count(PCollection<S> collect) {
    // get the PTypeFamily that is associated with the PType for the collection.
    PTypeFamily tf = collect.getTypeFamily();
    return collect.parallelDo("Aggregate.count", new MapFn<S, Pair<S, Long>>() {
      public Pair<S, Long> map(S input) {
        return Pair.of(input, 1L);
      }
    }, tf.tableOf(collect.getPType(), tf.longs())).groupByKey()
        .combineValues(Aggregators.SUM_LONGS());
  }
The call to parallelDo converts each record in this PCollection into a Pair of the input record and the number 1 by extending the MapFn convenience subclass of DoFn, and uses the tableOf method of the PTypeFamily to specify that the returned PCollection should be a PTable instance, with the key being the PType of the PCollection and the value being the Long implementation for this PTypeFamily.

 

The groupByKey operation may only be applied to a PTable, and returns an instance of the PGroupedTable interface, which references the grouping of all of the values in the PTable that have the same key. The groupByKey operation is what triggers the reduce phase of a MapReduce.
// In Aggregators.java  
/**
   * Sum up all {@code long} values.
   * @return The newly constructed instance
   */
  public static Aggregator<Long> SUM_LONGS() {
    return new SumLongs();
  }

  private static class SumLongs extends SimpleAggregator<Long> {
    private long sum = 0;

    @Override
    public void reset() {
      sum = 0;
    }

    @Override
    public void update(Long next) {
      sum += next;
    }

    @Override
    public Iterable<Long> results() {
      return ImmutableList.of(sum);
    }
  }
 
The combineValues operator takes a CombineFn as an argument, which is a specialized subclass of DoFn that operates on an implementation of Java's Iterable interface. The use of combineValues (as opposed to parallelDo) signals to the planner that the CombineFn may be used to aggregate values for the same key on the map side of a MapReduce job as well as the reduce side.
 
Step 4: writing the output and running the pipeline. The writeTextFile convenience method indicating that a PCollection should be written to a text file. There are also output targets for SequenceFiles and Avro container files, available in the org.apache.crunch.io package.
 
Step 5: after you are finished constructing a pipeline and specifying the output destinations, call the pipeline's blocking run method in order to compile the pipeline into one or more MapReduce jobs and execute them. You can also call the non-blocking runAsync() method to return PipeLineExecution object to allow clients to control job execution.
  /**
   * Constructs and executes a series of MapReduce jobs in order to write data
   * to the output targets.
   */
  PipelineResult run();

  /**
   * Constructs and starts a series of MapReduce jobs in order ot write data to
   * the output targets, but returns a {@code ListenableFuture} to allow clients to control
   * job execution.
   * @return
   */
  PipelineExecution runAsync();
 The class hierarchy of PipelineExecution shown as below:

 With the returned instance of PipelineExecution, you can control a Crunch pipeline as it runs, this interface is implemented to be thread safe. For example, you can query the job status by calling getStatus(), wait for a specified time interval by calling waitFor(long, TimeUnit),  kill the job by calling kill() and so on, see the class diagram for details.
  • 大小: 42.6 KB
  • 大小: 206.7 KB
  • 大小: 43 KB
  • 大小: 43.5 KB
  • 大小: 34.6 KB
  • 大小: 43.8 KB
分享到:
评论

相关推荐

    [APACHE]Apache Crunch - Getting Started.pdf

    文档《Getting Started》将引导读者通过创建一个简单的Crunch管道来统计文本文件中单词的数量这一分布式计算的"Hello World"过程。在创建这个管道的过程中,会解释Crunch的核心概念以及如何使用这些概念来创建有效且...

    Apache Hadoop---Crunch.docx

    Apache Hadoop 中的 Apache Crunch 是一个专为简化 MapReduce 作业设计的 Java 类库,它构建于 FlumeJava 之上,旨在提供一个高效且灵活的数据处理框架。Crunch 提供了丰富的 API,使得开发人员能够更轻松地创建复杂...

    sprunch:基于 Apache Crunch 的极简 Scala API

    "Sprunch" 是一个基于 Apache Crunch 的实验性 Scala API,它旨在提供一个更为简洁的接口,使得 Scala 开发者在处理大规模数据时能更方便地利用 Apache Crunch 的功能。Apache Crunch 是一个 Java 库,设计用于简化...

    secondary-sort:Apache Crunch中辅助排序的演示

    Apache Crunch是一个基于Google的FlumeJava库构建的Java框架,它简化了Hadoop MapReduce的编程模型,提供了更高级别的抽象,使得开发人员可以编写更简洁、可读性强的代码来处理大规模数据。在本示例中,我们将关注一...

    varcrunch:在 Apache Crunch 上处理 BAMsSAM 以进行变体调用

    VarCrunch 是一种生殖系和体细胞变异调用程序,它使用 Apache (S)Crunch API 在 Hadoop 上运行 变异调用算法本身来自 Guacamole,但 VarCrunch 用作包装器,使用 MapReduce 在 Hadoop 上处理 DNA 测序读取。 用法 ...

    紧缩:Apache紧缩的镜像(正在孵化)

    Apache Crunch 是一个构建在 Apache Hadoop 和 Apache Spark 上的数据处理库,主要针对大规模数据集的批处理和流处理任务。这个“紧缩”的镜像是Apache Crunch项目的一个版本,目前正处于孵化阶段,意味着它是一个...

    crunch-hbase-0.10.0.zip

    Crunch是一个用于Apache Hadoop的Java库,它简化了MapReduce作业的编写和测试,特别是对于数据处理任务。HBase,作为Apache的一个开源项目,是一个分布式、版本化的非关系型数据库,基于Google的Bigtable设计,运行...

    Windows-Crunch-1.1_C51_C++_windows_

    Windows version of linux ganerating dictionary tool - crunch

    crunch-3.6.tgz

    crunch默认安装在kali环境中(05-Password Attacks),Crunch可以按照指定的规则生成密码字典,生成的字典字符序列可以输出到屏幕、文件或重定向到另一个程序中,Crunch可以参数可能的组合和排列,其最新版本为3.6。...

    crunch-example

    紧缩示例这是一个示例项目,演示了Kite Morphline SDK和Apache Crunch 。样本输入CSV 输入,使用 gzip 压缩,结构如下: id, name, age, salary, years_spent, title, department示例 1:查找每个部门的平均工资这是...

    CDH5-Release-Notes.pdf

    ### CDH5 Release Notes 关键知识点 #### 重要通知与版权信息 ...此外,CDH5 Beta 2及Apache Crunch的加入也为开发者提供了更多灵活且高效的工具选择,进一步推动了大数据处理领域的技术创新和发展。

    ETL和特性抽取工具Crunch.zip

    Crunch 是一个用 Go 语言开发的基于 Hadoop 的 ETL 和特性抽取工具,特点是速度快。 标签:Crunch ETL工具框架

    前端项目-crunch.zip

    "前端项目-crunch.zip"提供了一个专为前端设计的高精度整数算术库——Crunch。这个库允许开发者在浏览器环境中进行高效的大数计算,无需依赖后端服务。 Crunch库的核心功能包括: 1. **大素数查找**:在大数范围内...

    Windows-Crunch:代码

    如果要使用Crunch的更新版本,只需将crunch.c放在项目根文件夹中并进行构建。 PS Code :: Blocks可以分别存储构建路径。 如果项目不是在IDE下构建的,只需将根路径添加到搜索目录或手动替换# include &lt; pthread&gt; to...

    Python-Crunch是用于有损PNG图像文件优化的工具

    Python-Crunch是一款高效且易于使用的有损PNG图像文件优化工具,专为开发者和设计师们设计,以减小PNG图像的文件大小而不显著降低图像质量。这个工具由Chris Simpkins开发,它利用了特定的算法来压缩图像数据,从而...

    crunch-csv-to-rcfile:将 CSV 压缩为 RCFile 示例

    这个简单的项目以输入 CSV 数据为例,演示了如何使用 Apache Crunch 写出 RCFile 文件。 使用以下命令运行作业: hadoop jar crunchcsvtorcfile-0.0.1-SNAPSHOT-job.jar [numberofcolumnsinthedata] /your/path/...

    node_crunch:允许在多个节点之间分布计算

    《Node_Crunch:利用Rust实现分布式计算的利器》 在现代的计算机科学中,分布式计算已经成为处理大规模数据和高并发任务的关键技术。Node_Crunch,作为一个以Rust编程语言实现的工具,旨在提供一种高效、可靠的跨...

    Crunch 在mac上创建ios应用图标,非常方便

    Crunch 在mac上创建ios应用图标,非常方便Crunch 在mac上创建ios应用图标,非常方便Crunch 在mac上创建ios应用图标,非常方便Crunch 在mac上创建ios应用图标,非常方便Crunch 在mac上创建ios应用图标,非常方便...

Global site tag (gtag.js) - Google Analytics