
Data ETL tools for the Hadoop ecosystem: Morphlines

 
When I use the Flume MorphlineSolrSink, the following error occurs:

java.lang.NoClassDefFoundError: org/kitesdk/morphline/api/MorphlineCompilationException
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:190)
	at org.apache.flume.sink.solr.morphline.MorphlineSink.start(MorphlineSink.java:93)
	at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
	at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
	at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
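
The NoClassDefFoundError indicates that the Kite Morphlines classes are missing from Flume's classpath. A quick way to confirm which jar (if any) under the Flume lib dir provides the missing class (assumption: the Flume install dir is /root/aflume, the same path used later in this post):

# scan every jar in Flume's lib dir for the class the JVM could not find
for j in /root/aflume/lib/*.jar; do
  unzip -l "$j" | grep -q 'org/kitesdk/morphline/api/MorphlineCompilationException.class' && echo "$j"
done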

 

Solution:

* Clone Flume from git
* cd flume
* Edit flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml and make the following changes:

** Make the cdk-morphlines-all dependency required by commenting out this line: <optional>true</optional>
** Add the following Maven plugin snippet to the <build> element so that the dependency jars are copied into the target/lib dir:

      <plugin> 
        <groupId>org.apache.maven.plugins</groupId> 
        <artifactId>maven-dependency-plugin</artifactId> 
        <executions> 
          <execution> 
            <phase>package</phase> 
            <goals> 
              <goal>copy-dependencies</goal> 
            </goals> 
            <configuration> 
              <outputDirectory>${project.build.directory}/lib</outputDirectory> 
              <includeScope>runtime</includeScope> <!-- excludes test jars; see http://jira.codehaus.org/browse/MDEP-128 --> 
              <excludeScope>provided</excludeScope> 
            </configuration> 
          </execution> 
        </executions> 
      </plugin> 

* mvn -Dhadoop.profile=2 clean package -pl flume-ng-sinks/flume-ng-morphline-solr-sink 

* find flume-ng-sinks/flume-ng-morphline-solr-sink/target -name '*.jar' 

* Copy the jars printed by the above find command into the Flume lib dir (see the shell sketch below)
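
Putting the steps together, a minimal shell sketch of the build-and-copy sequence (assumptions: the Flume source is taken from the GitHub mirror, and the Flume install dir is /root/aflume, the same lib dir used later in this post):

git clone https://github.com/apache/flume.git
cd flume
# edit flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml as described above
mvn -Dhadoop.profile=2 clean package -pl flume-ng-sinks/flume-ng-morphline-solr-sink
# copy the sink jar plus its dependency jars (target/lib) into the Flume lib dir
find flume-ng-sinks/flume-ng-morphline-solr-sink/target -name '*.jar' \
  -exec cp {} /root/aflume/lib/ \;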

 

 

See: https://groups.google.com/a/cloudera.org/forum/#!msg/cdk-dev/7T4pTebdWN4/sBHGkoS70LkJ

 

---------

The Solr server's version is 4.10.1, while the Solr and Lucene jars under the Flume lib dir are older (4.3.0, with HttpClient 4.2.1), and this version mismatch produces an error at runtime (the original screenshot is not preserved).

Solution:

List the jars in Solr's lib dir:

ls /home/tomcat/solr/WEB-INF/lib/
antlr-runtime-3.5.jar                hadoop-hdfs-2.2.0.jar                  lexicon                               lucene-suggest-4.10.1.jar
asm-4.1.jar                          hppc-0.5.2.jar                         log4j-1.2.16.jar                      mahout-collections-1.0.jar
asm-commons-4.1.jar                  httpclient-4.3.1.jar                   lucene-analyzers-common-4.10.1.jar    mahout-math-0.6.jar
attributes-binder-1.2.2.jar          httpcore-4.3.jar                       lucene-analyzers-kuromoji-4.10.1.jar  noggit-0.5.jar
carrot2-core-3.10.0-SNAPSHOT.jar     httpmime-4.3.1.jar                     lucene-analyzers-phonetic-4.10.1.jar  org.restlet-2.1.1.jar
commons-cli-1.2.jar                  inok-solr-dataimportscheduler-1.1.jar  lucene-analyzers-smartcn-4.10.1.jar   org.restlet.ext.servlet-2.1.1.jar
commons-codec-1.9.jar                jackson-core-asl-1.9.13.jar            lucene-codecs-4.10.1.jar              protobuf-java-2.5.0.jar
commons-configuration-1.6.jar        jackson-mapper-asl-1.9.13.jar          lucene-core-4.10.1.jar                simple-xml-2.7.jar
commons-fileupload-1.2.1.jar         jcl-over-slf4j-1.6.6.jar               lucene-expressions-4.10.1.jar         slf4j-api-1.6.6.jar
commons-io-2.3.jar                   jcseg-analyzer-1.9.5.jar               lucene-grouping-4.10.1.jar            slf4j-log4j12-1.6.6.jar
commons-lang-2.6.jar                 jcseg-core-1.9.5.jar                   lucene-highlighter-4.10.1.jar         solr-core-4.10.1.jar
concurrentlinkedhashmap-lru-1.2.jar  jcseg-core-1.9.5.jar.old               lucene-join-4.10.1.jar                solr-solrj-4.10.1.jar
dom4j-1.6.1.jar                      jcseg.properties                       lucene-memory-4.10.1.jar              spatial4j-0.4.1.jar
guava-14.0.1.jar                     jcseg-solr-1.9.5.jar                   lucene-misc-4.10.1.jar                wstx-asl-3.2.7.jar
hadoop-annotations-2.2.0.jar         jcseg-solr-1.9.5.jar.old               lucene-queries-4.10.1.jar             zookeeper-3.4.6.jar
hadoop-auth-2.2.0.jar                joda-time-2.2.jar                      lucene-queryparser-4.10.1.jar
hadoop-common-2.2.0.jar              jul-to-slf4j-1.6.6.jar                 lucene-spatial-4.10.1.jar

 

#cp  /home/tomcat/solr/WEB-INF/lib/lucene-*-4.10.1.jar   /root/aflume/lib/

#cp /home/tomcat/solr/WEB-INF/lib/solr-*-4.10.1.jar    /root/aflume/lib/

#cp /home/tomcat/solr/WEB-INF/lib/http*-4.3.1.jar   /root/aflume/lib/

 

Delete the old related jars in /root/aflume/lib/, such as lucene-*-4.3.0.jar, solr-*-4.3.0.jar, and http*-4.2.1.jar (see the sketch below).
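
A sketch of that cleanup, assuming the old jars carry exactly the versions listed above:

# remove the outdated jars so they do not shadow the 4.10.1 / 4.3.1 versions copied in above
rm /root/aflume/lib/lucene-*-4.3.0.jar
rm /root/aflume/lib/solr-*-4.3.0.jar
rm /root/aflume/lib/http*-4.2.1.jar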

 

-------

When running:

#flume-ng agent -c ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n a1

 

flume/conf/flume.conf

# Define a memory channel called ch1 on agent1
a1.channels.ch1.type = memory

# Define a custom MySQL binlog source called mysqlbinlog on agent a1
# and connect it to channel ch1.
a1.sources.mysqlbinlog.channels = ch1
a1.sources.mysqlbinlog.type = com.inoknok.mysqlbinlog.source.InokMysqlBinlogSource
a1.sources.mysqlbinlog.user=zhaohj
a1.sources.mysqlbinlog.password=zhaohj111
a1.sources.mysqlbinlog.host=192.168.0.135
a1.sources.mysqlbinlog.port=3306
a1.sources.mysqlbinlog.serverId=2
a1.sources.mysqlbinlog.autoReconnect=false


# Define a MorphlineSolrSink that loads events into Solr via a morphline
# and connect it to the other end of the same channel.
a1.sinks.k1.channel = ch1
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.morphlineFile = /root/aflume/conf/morphline.conf

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
a1.channels = ch1
a1.sources = mysqlbinlog
a1.sinks = k1

 

flume/conf/morphline.conf

morphlines : [
    {
        id : morphline1
        importCommands : ["org.kitesdk.**", "com.cloudera.**", "org.apache.solr.**"]

        commands : [
          {
             readClob {
              charset : UTF-8
             }
          }

          {

             split {
                inputField : message
                outputFields : ["", "", "",user_id, username, age]
                separator : "\u0001"
                isRegex : false
                addEmptyStrings : false
                trim : true
             }
          }

          {
              sanitizeUnknownSolrFields {
                 solrLocator : {
                        solrUrl : "192.168.10.204:8983"
                        solrHomeDir : "/home/tomcat/solrHome/test.inok_user"
                        #collection : "@${tablename}"
                        #zkHost: "192.168.10.204:8983"
                 }
              }
          }
          {
             logDebug {
                format : "xxxxxxxxx   xxxxxx My output record: {}"
                args : ["@{}"]
             }
          }


            # load the record into a Solr server or MapReduce Reducer
          {
             loadSolr {
                solrLocator : {
                        solrUrl : "192.168.10.204:8983"
                        solrHomeDir : "/home/tomcat/solrHome/test.inok_user"
                   #collection : "@${tablename}"       # Name of solr collection
                   #zkHost : "192.168.10.204:8983" # ZooKeeper ensemble
                }
             }
          }

        ]
    }
]

 

 

An error occurs (the original screenshot is not preserved).



 

Solution: use the full HTTP URL of the Solr collection in solrUrl:

        solrUrl : "http://192.168.10.204:8983/test.inok_user"
        solrHomeDir : "/home/tomcat/solrHome/test.inok_user"
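
To verify the URL form before restarting the agent, you can query the collection directly (a hedged check, assuming the core really is served at /test.inok_user as above):

curl 'http://192.168.10.204:8983/test.inok_user/select?q=*:*&rows=0&wt=json'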

 

----------

Morphline - Choose collection for loadSolr at run time

 https://groups.google.com/a/cloudera.org/forum/#!topic/search-user/z9A_Xe5FviM

 http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html#Implementing_your_own_Custom_Command

Use case: I want to set the solrLocator dynamically at run time, but that failed. The workaround is to use the tryRules command, although the solrLocator in each rule must be hard-coded.

 

morphlines : [
    {
        id : morphline1
        importCommands : ["org.kitesdk.**", "com.cloudera.**", "org.apache.solr.**"]

        commands : [
          {
             readClob {
              charset : UTF-8
             }
          }


         {
              tryRules {
              catchExceptions : false
              throwExceptionIfAllRulesFailed : true
            rules : [
            {
              //test.inok_user
              commands : [
                {contains {tablename : [test.inok_user]} }
                {logDebug { format : "YYYYYYYYY 1111 My output record: {}, talbename={}",args : ["@{}","@{tablename}"]} }
                {split {inputField : message, outputFields : [updatetime,"","",user_id, username, age],separator : "\u0001",
                        isRegex : false,addEmptyStrings : false, trim : true} }
                {convertTimestamp {
                        field : updatetime
                        inputFormats : ["unixTimeInMillis"]
                        inputTimezone : UTC
                        outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
                        outputTimezone : PRC
                        }
                }
                {sanitizeUnknownSolrFields {
                            solrLocator : {
                                solrUrl :"http://192.168.10.204:8983/test.inok_user/"
                                solrHomeDir : "/home/tomcat/solrHome/test.inok_user"
                             }
                         }}
                {logDebug { format : "xxxxxxxxx 1111 My output record: {}, talbename={}",args : ["@{}","@{tablename}"]}  }
                 # load the record into a Solr server or MapReduce Reducer
                {loadSolr { solrLocator : {
                                    solrUrl : "http://192.168.10.204:8983/test.inok_user"
                                    solrHomeDir : "/home/tomcat/solrHome/test.inok_user"
                                    batchSize : 1
                             }  }  }
                ]
            }//end rule1
            #test.inoktest
            {
              //test.inoktest
              commands : [
                {contains {tablename : [test.inoktest]} }
                {logDebug {format : "YYYYYYYYY 2222 My output record: {}, talbename={}",args : ["@{}","@{tablename}"] } }
                {split {inputField : message, outputFields : [updatetime,"","",id, content],separator : "\u0001",
                        isRegex : false,addEmptyStrings : false, trim : true} }
                {convertTimestamp {
                        field : updatetime
                        inputFormats : ["unixTimeInMillis"]
                        inputTimezone : UTC
                        outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
                        outputTimezone : PRC
                        }
                }
                {sanitizeUnknownSolrFields {solrLocator : {
                                              solrUrl :"http://192.168.10.204:8983/test.inoktest/"
                                              solrHomeDir : "/home/tomcat/solrHome/test.inoktest"
                                              }
                                           }}
                {logDebug { format : "xxxxxxxxx 2222 My output record: {}, talbename={}",args : ["@{}","@{tablename}"]} }
                 # load the record into a Solr server or MapReduce Reducer
                {loadSolr { solrLocator : {
                                    solrUrl : "http://192.168.10.204:8983/test.inoktest"
                                    solrHomeDir : "/home/tomcat/solrHome/test.inoktest"
                                    batchSize : 1
                             }
                           } }
             ]
            }//end rule2

        ]
       }//end rules

     }

  ]
 }
]

 

--------------------

When I configure an Avro sink in the agent on host a and an Avro source in the agent on host b (two different hosts):

flume.conf on host a

# Define a memory channel called ch1 on agent1
a1.channels.ch1.type = memory

# Define a custom MySQL binlog source called mysqlbinlog on agent a1
# and connect it to channel ch1.
a1.sources.mysqlbinlog.channels = ch1
a1.sources.mysqlbinlog.type = com.inoknok.mysqlbinlog.source.InokMysqlBinlogSource
a1.sources.mysqlbinlog.user=zhaohj
a1.sources.mysqlbinlog.password=zhaohj111
a1.sources.mysqlbinlog.host=192.168.0.135
a1.sources.mysqlbinlog.port=3306
a1.sources.mysqlbinlog.serverId=2
a1.sources.mysqlbinlog.autoReconnect=false


#sink to solr
a1.sinks.k1.channel = ch1
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.morphlineFile = /root/aflume/conf/morphline.conf


#sink to avro
a1.sinks.k3.type = avro
a1.sinks.k3.channel = ch1
a1.sinks.k3.hostname = 192.168.0.135
a1.sinks.k3.port = 4545


# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
a1.channels = ch1
a1.sources = mysqlbinlog
a1.sinks =  k3

 

 

flume.conf on host b

# Define a memory channel called ch1 on agent1
a1.channels.ch1.type = memory

#source avro
a1.sources.s1.channels = ch1
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 4545

#sink to hdfs
a1.sinks.k1.channel = ch1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/flume/mysqlbinlog/%{tablename}
a1.sinks.k1.hdfs.filePrefix = %{tablename}-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true


# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
a1.channels = ch1
a1.sources = s1
a1.sinks = k1

 

There is an error (the original screenshot is not preserved).
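
Since the screenshot is missing, a hedged first check for this kind of two-agent setup (a sanity check, not the original fix) is whether the Avro source on host b is listening and reachable before agent a starts:

# on host b: confirm the Avro source is bound to port 4545
netstat -tlnp | grep 4545
# from host a: confirm the port is reachable over the network (host b's IP taken from the k3 sink config)
nc -zv 192.168.0.135 4545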



 

 

 

--------------------

1. Install Flume on the same machine as Solr.

2. Add zookeeper-3.4.6.jar to the flume/lib dir.

3. Put the plugin into the plugins.d dir:

plugins.d/
└── mysqlbinlog
    ├── lib
    │   └── mysqlbinlog-sources.jar
    └── libext
        └── open-replicator-1.0.7.jar

 

4. Set up MySQL master/slave replication and start the master and slave MySQL daemons.

5. Start up Solr.

6. Test with the command:

flume-ng agent -c ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n a1

 

 

 

 

 

 

 

 

 

References

http://flume.apache.org/FlumeUserGuide.html#morphlinesolrsink

http://cloudera.github.io/cdk/docs/current/cdk-morphlines/index.html

http://cloudera.github.io/cdk/docs/current/cdk-morphlines/morphlinesReferenceGuide.html

http://www.cloudera.com/content/cloudera/en/documentation/core/latest/topics/search_flume_morphline_solr_sink_config_options.html

http://www.slideshare.net/cloudera/using-morphlines-for-onthefly-etl

http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html#Implementing_your_own_Custom_Command

http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html
