Morphlines 是一个开源框架,它降低了 Hadoop 平台上开发和维护 ETL 流程的时间和成本,通过 Morphlines 可以抽取、转换并加载数据到 Apache Solr 、 HBase 、HDFS 、企业数据仓库或者在线分析应用。我们可以很方便的通过配置文件对 morphline 的流程进行配置,不需要进行 Java 编码工作。
Morphlines 本身是一个类库,可以嵌入到任何 Java 代码里。一个 morphline 是一个转换命令集合,这些命令对记录进行加载、转换等等。记录是一个内存数据结构,由键值对组成,可能含有 blob 或 POJO 附件。
1. 系统概要
1.1. 背景介绍
在 Hadoop 平台上开发 ETL 应用是一项费时费力的工作,要编写一些复杂的处理程序,如 MapReduce 作业、 Oozie 工作流等等。为了适应多样化的应用需求,缩短应用开发周期,降低开发维护成本, Cloudera 公司开发了 Morphlines 框架。这个框架通过简单的配置文件来定义一个 ETL 流程,不需要任何 Java 编码即可满足大部分应用需求。如果既有的命令无法满足应用需求,程序员也可以写一些命令,这些命令可以方便的整合到 Morphlines 框架中。
1.2. 系统架构
1) 输入源: ETL 数据源, Morphlines 可以从 Flume 、 HBase 和 HDFS 中读取数据。
2 ) Morphline :一个自定义的工作流配置文件。
3 )输出: Morphline 可以输出数据到多个系统,如 HBase 、 HDFS 、 Solr 及企业数据仓库等等。
1.3. UDH Search与Morphline的关系
Morphline 最初是作为 Search 的一部分来开发的,后来为了有更多的开发者和使用者能够了解和扩展 Morphline 的功能, Morphline 形成了一个独立的开源项目。利用 Morphline 可以实现 Search 中的 ETL 阶段工作,也可以实现 Search 之外的一些 ETL 工作。本文以 Morphline 在 Search 中的应用为例,讲述代码开发和部署,以及一些需要注意的问题。下图是 Morphline 在 Search 整体架构中的角色应用。
2. 系统安装与配置
2.1. UDH Search服务安装
在集群上部署 UDH Search 服务,参见“专题 -UDH Search 系统构建及其应用”一文。需要配置 Lily HBase Batch Indexer 和 Lily HBase NRT Indexer ,分别用于索引的批量创建和实时创建,其中索引的实时创建是利用 HBase 的 Replication 机制实现的。
3. Morphline 命令开发
Morhpline 命令需要实现 Java 接口 Command 或它的子类 AbstractCommand ,另外还需要一个 CommandBuilder 接口的实现类,在这个 CommandBuilder 里定义Morhpline 命令的名字。下面以删除某字段里 Html 标签为例,给出代码的具体实现。
3.1. CommandBuilder
在 Builder 里指定命令名称并创建命令实例(工厂方法)。
package com.yonyou.udhsearch.morphlines;
import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.CommandBuilder;
import org.kitesdk.morphline.api.MorphlineContext;
import com.typesafe.config.Config;
public final class DropHtmlTags implements CommandBuilder {
@Override
public Collection<String> getNames() {
return Collections.singletonList(" htmlDropper ");
}
@Override
public Command build(Config config, Command parent, Command child, MorphlineContext context) {
return new HtmlDropper(this, config, parent, child, context);
}
}
3.2. Command
在命令实现类里实现 doProcess(Record record) 方法,在这个方法加入我们需要的处理逻辑,在本例中是删除 Html 标签及回车换行符。
package com.yonyou.udhsearch.morphlines ;
import java.util.Collection;
import java.util.Collections;
import java.util.ListIterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.AbstractCommand;
import com.typesafe.config.Config;
private static final class HtmlDropper extends AbstractCommand {
private static final String regEx_script = "<script[^>]*?>[\\s\\S]*?<\\/script>";
private static final String regEx_style = "<style[^>]*?>[\\s\\S]*?<\\/style>"; //
private static final String regEx_html = "<[^>]+>";
private static final String regEx_space = "\\s*|\t|\r|\n";
private final String fieldName;
public HtmlDropper(CommandBuilder builder, Config config, Command parent, Command child, MorphlineContext context) {
super(builder, config, parent, child, context);
this.fieldName = getConfigs().getString(config, " field ");
validateArguments();
}
@Override
protected boolean doProcess(Record record) {
ListIterator iter = record.get(fieldName).listIterator();
while (iter.hasNext()) {
iter.set(transformFieldValue(iter.next()));
}
// pass record to next command in chain:
return super.doProcess(record);
}
/** Transforms the given input value to some output value */
private Object transformFieldValue(Object value) {
if (value == null) return "";
String htmlStr = value.toString();
try {
Pattern p_script = Pattern.compile(regEx_script, Pattern.CASE_INSENSITIVE);
Matcher m_script = p_script.matcher(htmlStr);
htmlStr = m_script.replaceAll("");
Pattern p_style = Pattern.compile(regEx_style, Pattern.CASE_INSENSITIVE);
Matcher m_style = p_style.matcher(htmlStr);
htmlStr = m_style.replaceAll("");
Pattern p_html = Pattern.compile(regEx_html, Pattern.CASE_INSENSITIVE);
Matcher m_html = p_html.matcher(htmlStr);
htmlStr = m_html.replaceAll("");
Pattern p_space = Pattern.compile(regEx_space, Pattern.CASE_INSENSITIVE);
Matcher m_space = p_space.matcher(htmlStr);
htmlStr = m_space.replaceAll("");
if (htmlStr.indexOf(" ") != -1)
htmlStr = htmlStr.replaceAll(" ", "");
} catch (Exception e) {
htmlStr = value.toString();
}
return htmlStr;
}
@Override
protected void doNotify(Record notification) {
LOG.debug("myNotification: {}", notification);
super.doNotify(notification);
}
}
4. Morphline 配置文件
在 Command 的构造函数里,我们指定了一个属性 ’ field ’ ,也就是说,我们可以通过 field 来指定对哪个字段进行操作。除了 field 属性,还需要在 Morphline 的配置文件里引入新开发的命令类。在这个例子里,对字段 data_f2 进行 Html 标签删除操作。如下所示:
morphlines : [
{
id : morphline1
importCommands : ["org.kitesdk.morphline.**", "com.ngdata.**", " com.yonyou.udhsearch.morphlines.** "]
commands : [
{
extractHBaseCells {
mappings : [
{
inputColumn : "data:f1"
outputField : "data_f1"
type : string
source : value
}
{
inputColumn : "data:f2"
outputField : "data_f2"
type : string
source : value
}
]
}
}
{
htmlDropper {
field : data_f2
}
}
{ logTrace { format : "output record: {}", args : ["@{}"] } }
]
}
]
5. Morphline 部署运行
运行时,需要保证新开发的命令类在 MapReduce 作业或 Lily NRT Indexer 服务的类路径里。
5.1. MapReduce作业
对于 MR 作业,可直接将打好的 jar 包包含在命令行里,如下所示:
HADOOP_CLASSPATH=/usr/lib/hbase/hbase-protocol.jar \
hadoop --config /etc/hadoop/conf jar \
/usr/lib/hbase-solr/tools/hbase-indexer-mr-*-job.jar --conf \
/etc/hbase/conf/hbase-site.xml --libjars /root/tools/MyMorphlineCommands/bin/DropHtmlTags.jar \
-D 'mapred.child.java.opts=-Xmx500m' \
--hbase-indexer-file /root/search/hbase_html_tags_dropper/conf/hbase_html_tags_dropper_mapper.xml --zk-host \
127.0.0.1/solr --collection hbase_html_tags_dropper --go-live --log4j \
/etc/hbase/conf.dist/log4j.properties
5.2. Lily NRT Indexer
将 jar 包放在服务的 lib 目录下,即 /usr/lib/hbase-solr/lib/ ,重启服务。
分享到:
相关推荐
flume morphline处理json数据依赖包,对json数据进行预处理
mumu-morphlines是一个kite_morphlines测试程序,主要通过这个项目来了解和_mumu-morphlines
Morphlines配置文件定义了一系列命令,这些命令按照顺序执行,对输入数据进行清洗、解析、转换等操作,最终将数据格式化为Solr可以理解的文档。在HBase到Solr的索引构建过程中,morphlines配置会定义如何从HBase的列...
数据结构学习资料 数据结构项目实践 数据结构源代码 供参考数据结构学习资料 数据结构项目实践 数据结构源代码 供参考数据结构学习资料 数据结构项目实践 数据结构源代码 供参考数据结构学习资料 数据结构项目实践 ...
标题 "kite-morphlines-elasticsearch" 指的是一个专门针对 Elasticsearch 设计的工具,它是 Kite SDK(一个开源的 Java 开发框架)的一部分。Kite SDK 提供了一组库,帮助开发者处理数据,而 "morphlines" 是 Kite ...
10. **solr-morphlines-core-6.3.0.jar**:Morphlines 是一个用于数据预处理的库,能够处理各种输入数据流,将其转换成 Solr 可以理解的格式。 在使用 SolrJ 时,开发人员需要将这些库添加到项目的类路径中,以便...
在描述中提到"Used to demonstrate best practices for managing back-office systems",我们可以推断这个压缩包可能包含了一套完整的源代码、配置文件以及相关文档,用于展示如何高效地处理后台业务流程,如库存...
9. **solr-morphlines-core-5.3.0.jar**:Morphlines 是一种配置驱动的工具,用于预处理和转换输入数据。它可以在索引前对原始数据进行清洗、解析和转换,以满足 Solr 的需求。 10. **solr-uima-5.3.0.jar**:Solr ...
本文档详细介绍了如何利用HBase和Solr创建二级索引的过程。通过整合HBase与Solr的优势,可以构建高性能的数据存储与检索系统。HBase作为分布式列族数据库,能够处理海量数据,并提供快速随机读写能力。而Solr则是一...
在本文档中,我们将详细介绍如何使用 Solr 实现 HBase 的二级索引。我们将从创建 HBase 表和插入数据开始,然后创建 Solr 的分片,配置 morphline-hbase-mapper.xml 文件,最后注册 Lily HBase Indexer ...
MapReduce 与 Morphlines 介绍 Morphlne-MR 是一些 ETL 的简单 hadoop mapreduce 作业。 它从 Hdfs 读取输入数据并进行 morphline 处理,然后将文件写入 Hdfs。 morphline.conf 包含所有的魔法。 从_attachment_...
阿帕奇·索尔 这里的示例演示了如何使用LocalIndexer.java在本地建立文件...avro-tools-1.7.6-cdh5.11.2.jar kite-data-core-1.0.0-cdh5.11.2.jar kite-morphlines-solr-core-1.0.0-cdh5.11.2.jar httpclient-4.2.5.jar
lastfm-morphlines-etl模块: hdp-sandbox-access模块: mapreduce-morphline模块: 纱线队列测试模块:纱线 tez-dag-jobs模块:Tez一起 纱线监控-R模块: scalding-correlation模块: spark-clustering模块: ...