If you know Hadoop, you've undoubtedly seen WordCount before: it serves as the "hello world" for Hadoop applications. This simple program provides a great test case for parallel processing:
- It requires a minimal amount of code.
- It demonstrates use of both symbolic and numeric values.
- It shows a dependency graph of tuples as an abstraction.
- It is not many steps away from useful search indexing.
When a distributed computing framework can run WordCount in parallel at scale, it can handle much larger and more interesting algorithms as well. Along the way, we'll show how to use a few more Cascading operations, plus how to generate a flow diagram as a visualization. The code is shown below:
```java
/*
 * Copyright (c) 2007-2013 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package impatient;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexSplitGenerator;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class Main {
    public static void main(String[] args) {
        String docPath = args[0];
        String wcPath = args[1];

        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, Main.class);
        HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties);

        // create source and sink taps
        Tap docTap = new Hfs(new TextDelimited(true, "\t"), docPath);
        Tap wcTap = new Hfs(new TextDelimited(true, "\t"), wcPath);

        // specify a regex operation to split the "document" text lines into a token stream
        Fields token = new Fields("token");
        Fields text = new Fields("text");
        RegexSplitGenerator splitter = new RegexSplitGenerator(token, "[ \\[\\]\\(\\),.]");
        // only returns "token"
        Pipe docPipe = new Each("token", text, splitter, Fields.RESULTS);

        // determine the word counts
        Pipe wcPipe = new Pipe("wc", docPipe);
        wcPipe = new GroupBy(wcPipe, token);
        wcPipe = new Every(wcPipe, Fields.ALL, new Count(), Fields.ALL);

        // connect the taps, pipes, etc., into a flow
        FlowDef flowDef = FlowDef.flowDef()
            .setName("wc")
            .addSource(docPipe, docTap)
            .addTailSink(wcPipe, wcTap);

        // write a DOT file and run the flow
        Flow wcFlow = flowConnector.connect(flowDef);
        wcFlow.writeDOT("dot/wc.dot");
        wcFlow.complete();
    }
}
```
Let's go through the source code line by line.
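Note that Main expects two command-line arguments: the input document path (args[0]) and the output path for the word counts (args[1]). Assuming the app has been packaged into a jar (the jar name and data paths here are hypothetical), it could be launched with something like `hadoop jar impatient.jar data/rain.txt output/wc`.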
- Define docTap as the incoming (source) tap and wcTap as the outgoing (sink) tap; see the snippet below.
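The corresponding lines from the listing create both taps as Hfs taps with a tab-delimited text scheme (the `true` flag tells TextDelimited to expect or write a header line):

```java
Tap docTap = new Hfs(new TextDelimited(true, "\t"), docPath);
Tap wcTap = new Hfs(new TextDelimited(true, "\t"), wcPath);
```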
- Configure a HadoopFlowConnector, which will be used to connect the pipes between the source tap and the sink tap; we will talk about pipes later. The relevant lines are shown below.
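From the listing, the connector is configured through a Properties object, which also registers the application jar class so Hadoop can ship the code to the cluster:

```java
Properties properties = new Properties();
AppProps.setApplicationJarClass(properties, Main.class);
HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties);
```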
- Use a generator inside an Each object to split the document text into a token stream. The generator uses a regex pattern to split the input text on word boundaries: the blank space, [, ], (, ), the comma, and the period:
```java
RegexSplitGenerator splitter = new RegexSplitGenerator(token, "[ \\[\\]\\(\\),.]");
Pipe docPipe = new Each("token", text, splitter, Fields.RESULTS);
```
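For example, an input line such as `nine inch nails` would be emitted as three separate token tuples: `nine`, `inch`, and `nails`.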
- Out of that pipe, we get a tuple stream of token values. One benefit of using a regex is that it's simple to change. We can handle more complex cases of splitting tokens without having to rewrite the generator.
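For instance, a hypothetical variant of the pattern could also split on semicolons, colons, and quotation marks, with no other code changes:

```java
// Hypothetical broader delimiter set; the rest of the pipe assembly stays the same
RegexSplitGenerator splitter = new RegexSplitGenerator(token, "[ \\[\\]\\(\\),.;:'\"]");
```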
- Next, we use a GroupBy to count the occurrences of each token:
```java
Pipe wcPipe = new Pipe("wc", docPipe);
wcPipe = new GroupBy(wcPipe, token);
wcPipe = new Every(wcPipe, Fields.ALL, new Count(), Fields.ALL);
```
Note that we have used Each and Every to perform operations within the pipe assembly. The difference between the two is that an Each operates on individual tuples, so it takes Function operations, while an Every operates on groups of tuples, so it takes Aggregator or Buffer operations; in this case, the GroupBy performed an aggregation. These different ways of inserting operations serve to categorize the different built-in operations in Cascading.
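As a minimal sketch of the contrast, reusing docPipe and token from the listing (the Insert function is the built-in cascading.operation.Insert; the "source" field name and its value are purely illustrative):

```java
// Each + Function: applied once per tuple, e.g. inserting a constant field
// (Insert is cascading.operation.Insert; "source"/"wc-demo" are illustrative)
Pipe tagged = new Each(docPipe, Fields.ALL, new Insert(new Fields("source"), "wc-demo"), Fields.ALL);

// Every + Aggregator: applied once per grouping, so it must follow a GroupBy (or CoGroup)
Pipe grouped = new GroupBy(docPipe, token);
Pipe counted = new Every(grouped, Fields.ALL, new Count(), Fields.ALL);
```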
- From that wcPipe we get a resulting tuple stream of token and count for the output. Again, we connect the plumbing with a FlowDef:
```java
FlowDef flowDef = FlowDef.flowDef()
    .setName("wc")
    .addSource(docPipe, docTap)
    .addTailSink(wcPipe, wcTap);
Flow wcFlow = flowConnector.connect(flowDef);
```
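Because wcTap uses TextDelimited(true, "\t"), the sink file begins with a header line naming the fields (token and count, the latter being the default field declared by the Count aggregator), followed by one tab-separated row per distinct token.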
- Finally, we generate a DOT file to depict the Cascading flow graphically; these diagrams are really helpful for troubleshooting workflows in Cascading:
```java
// generate a DOT file to depict the flow, then run it
wcFlow.writeDOT("dot/wc.dot");
wcFlow.complete();
```
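The resulting file can be opened in a diagramming tool, or rendered directly with Graphviz, e.g. `dot -Tpng dot/wc.dot -o wc.png`.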
Below is what the diagram looks like in OmniGraffle.