`
qindongliang1922
  • 浏览: 2183988 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:117536
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:125922
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:59912
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:71301
社区版块
存档分类
最新评论

玩转大数据系列之Apache Pig如何与Apache Solr集成(二)

阅读更多
散仙,在上篇文章中介绍了,如何使用Apache Pig与Lucene集成,还不知道的道友们,可以先看下上篇,熟悉下具体的流程。
在与Lucene集成过程中,我们发现最终还要把生成的Lucene索引,拷贝至本地磁盘,才能提供检索服务,这样以来,比较繁琐,而且有以下几个缺点:

(一)在生成索引以及最终能提供正常的服务之前,索引经过多次落地操作,这无疑会给磁盘和网络IO,带来巨大影响

(二)Lucene的Field的配置与其UDF函数的代码耦合性过强,而且提供的配置也比较简单,不太容易满足,灵活多变的检索需求和服务,如果改动索引配置,则有可能需要重新编译源码。

(三)对Hadoop的分布式存储系统HDFS依赖过强,如果使用与Lucene集成,那么则意味着你提供检索的Web服务器,则必须跟hadoop的存储节点在一个机器上,否则,无法从HDFS上下拉索引,除非你自己写程序,或使用scp再次从目标机传输,这样无疑又增加了,系统的复杂性。


鉴于有以上几个缺点,所以建议大家使用Solr或ElasticSearch这样的封装了Lucene更高级的API框架,那么Solr与ElasticSearch和Lucene相比,又有什么优点呢?

(1)在最终的写入数据时,我们可以直接最终结果写入solr或es,同时也可以在HDFS上保存一份,作为灾备。

(2)使用了solr或es,这时,我们字段的配置完全与UDF函数代码无关,我们的任何字段配置的变动,都不会影响Pig的UDF函数的代码,而在UDF函数里,唯一要做的,就是将最终数据,提供给solr和es服务。

(3)solr和es都提供了restful风格的http操作方式,这时候,我们的检索集群完全可以与Hadoop集群分离,从而让他们各自都专注自己的服务。



下面,散仙就具体说下如何使用Pig和Solr集成?

(1)依旧访问这个地址下载源码压缩包。
(2)提取出自己想要的部分,在eclipse工程中,修改定制适合自己环境的的代码(Solr版本是否兼容?hadoop版本是否兼容?,Pig版本是否兼容?)。
(3)使用ant重新打包成jar
(4)在pig里,注册相关依赖的jar包,并使用索引存储



注意,在github下载的压缩里直接提供了对SolrCloud模式的提供,而没有提供,普通模式的函数,散仙在这里稍作修改后,可以支持普通模式的Solr服务,代码如下:


SolrOutputFormat函数

package com.pig.support.solr;



import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;
/**
 * @author qindongliang
 * 支持SOlr的SolrOutputFormat
 * 如果你想了解,或学习更多这方面的
 * 知识,请加入我们的群:
 * 
 * 搜索技术交流群(2000人):324714439 
 * 大数据技术1号交流群(2000人):376932160  (已满)
 * 大数据技术2号交流群(2000人):415886155 
 * 微信公众号:我是攻城师(woshigcs)
 * 
 * */
public class SolrOutputFormat extends
		FileOutputFormat<Writable, SolrInputDocument> {

	final String address;
	final String collection;

	public SolrOutputFormat(String address, String collection) {
		this.address = address;
		this.collection = collection;
	}

	@Override
	public RecordWriter<Writable, SolrInputDocument> getRecordWriter(
			TaskAttemptContext ctx) throws IOException, InterruptedException {
		return new SolrRecordWriter(ctx, address, collection);
	}

	
	@Override
	public synchronized OutputCommitter getOutputCommitter(
			TaskAttemptContext arg0) throws IOException {
		return new OutputCommitter(){

			@Override
			public void abortTask(TaskAttemptContext ctx) throws IOException {
				
			}

			@Override
			public void commitTask(TaskAttemptContext ctx) throws IOException {
				
			}

			@Override
			public boolean needsTaskCommit(TaskAttemptContext arg0)
					throws IOException {
				return true;
			}

			@Override
			public void setupJob(JobContext ctx) throws IOException {
				
			}

			@Override
			public void setupTask(TaskAttemptContext ctx) throws IOException {
				
			}
			
			
		};
	}


	/**
	 * Write out the LuceneIndex to a local temporary location.<br/>
	 * On commit/close the index is copied to the hdfs output directory.<br/>
	 * 
	 */
	static class SolrRecordWriter extends RecordWriter<Writable, SolrInputDocument> {
		/**Solr的地址*/
		SolrServer server;
		/**批处理提交的数量**/
		int batch = 5000;
		
		TaskAttemptContext ctx;
		
		List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(batch);
		ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
		/**
		 * Opens and forces connect to CloudSolrServer
		 * 
		 * @param address
		 */
		public SolrRecordWriter(final TaskAttemptContext ctx, String address, String collection) {
			try {
				this.ctx = ctx;
				server = new HttpSolrServer(address);
				
				exec.scheduleWithFixedDelay(new Runnable(){
					public void run(){
						ctx.progress();
					}
				}, 1000, 1000, TimeUnit.MILLISECONDS);
			} catch (Exception e) {
				RuntimeException exc = new RuntimeException(e.toString(), e);
				exc.setStackTrace(e.getStackTrace());
				throw exc;
			}
		}

		
		/**
		 * On close we commit
		 */
		@Override
		public void close(final TaskAttemptContext ctx) throws IOException,
				InterruptedException {

			try {
				
				if (docs.size() > 0) {
					server.add(docs);
					docs.clear();
				}

				server.commit();
			} catch (SolrServerException e) {
				RuntimeException exc = new RuntimeException(e.toString(), e);
				exc.setStackTrace(e.getStackTrace());
				throw exc;
			} finally {
				server.shutdown();
				exec.shutdownNow();
			}
			
		}

		/**
		 * We add the indexed documents without commit
		 */
		@Override
		public void write(Writable key, SolrInputDocument doc)
				throws IOException, InterruptedException {
			try {
				docs.add(doc);
				if (docs.size() >= batch) {
					server.add(docs);
					docs.clear();
				}
			} catch (SolrServerException e) {
				RuntimeException exc = new RuntimeException(e.toString(), e);
				exc.setStackTrace(e.getStackTrace());
				throw exc;
			}
		}

	}
}




SolrStore函数

package com.pig.support.solr;



import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFunc;
import org.apache.pig.StoreMetadata;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.solr.common.SolrInputDocument;

/**
 * 
 * Create a lucene index
 * 
 */
public class SolrStore extends StoreFunc implements StoreMetadata {

	private static final String SCHEMA_SIGNATURE = "solr.output.schema";

	ResourceSchema schema;
	String udfSignature;
	RecordWriter<Writable, SolrInputDocument> writer;

	String address;
	String collection;
	
	public SolrStore(String address, String collection) {
		this.address = address;
		this.collection = collection;
	}

	public void storeStatistics(ResourceStatistics stats, String location,
			Job job) throws IOException {
	}

	public void storeSchema(ResourceSchema schema, String location, Job job)
			throws IOException {
	}

	@Override
	public void checkSchema(ResourceSchema s) throws IOException {
		UDFContext udfc = UDFContext.getUDFContext();
		Properties p = udfc.getUDFProperties(this.getClass(),
				new String[] { udfSignature });
		p.setProperty(SCHEMA_SIGNATURE, s.toString());
	}

	public OutputFormat<Writable, SolrInputDocument> getOutputFormat()
			throws IOException {
		// not be used
		return new SolrOutputFormat(address, collection);
	}

	/**
	 * Not used
	 */
	@Override
	public void setStoreLocation(String location, Job job) throws IOException {
		FileOutputFormat.setOutputPath(job, new Path(location));
	}

	@Override
	public void setStoreFuncUDFContextSignature(String signature) {
		this.udfSignature = signature;
	}

	@SuppressWarnings({ "unchecked", "rawtypes" })
	@Override
	public void prepareToWrite(RecordWriter writer) throws IOException {
		this.writer = writer;
		UDFContext udc = UDFContext.getUDFContext();
		String schemaStr = udc.getUDFProperties(this.getClass(),
				new String[] { udfSignature }).getProperty(SCHEMA_SIGNATURE);

		if (schemaStr == null) {
			throw new RuntimeException("Could not find udf signature");
		}

		schema = new ResourceSchema(Utils.getSchemaFromString(schemaStr));

	}

	/**
	 * Shamelessly copied from : https://issues.apache.org/jira/secure/attachment/12484764/NUTCH-1016-2.0.patch
	 * @param input
	 * @return
	 */
	private static String stripNonCharCodepoints(String input) {
		StringBuilder retval = new StringBuilder(input.length());
		char ch;

		for (int i = 0; i < input.length(); i++) {
			ch = input.charAt(i);

			// Strip all non-characters
			// http://unicode.org/cldr/utility/list-unicodeset.jsp?a=[:Noncharacter_Code_Point=True:]
			// and non-printable control characters except tabulator, new line
			// and carriage return
			if (ch % 0x10000 != 0xffff && // 0xffff - 0x10ffff range step
											// 0x10000
					ch % 0x10000 != 0xfffe && // 0xfffe - 0x10fffe range
					(ch <= 0xfdd0 || ch >= 0xfdef) && // 0xfdd0 - 0xfdef
					(ch > 0x1F || ch == 0x9 || ch == 0xa || ch == 0xd)) {

				retval.append(ch);
			}
		}

		return retval.toString();
	}

	@Override
	public void putNext(Tuple t) throws IOException {

		final SolrInputDocument doc = new SolrInputDocument();

		final ResourceFieldSchema[] fields = schema.getFields();
		int docfields = 0;

		for (int i = 0; i < fields.length; i++) {
			final Object value = t.get(i);

			if (value != null) {
				docfields++;
				doc.addField(fields[i].getName().trim(), stripNonCharCodepoints(value.toString()));
			}

		}

		try {
			if (docfields > 0)
				writer.write(null, doc);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			return;
		}

	}

}


Pig脚本如下:
--注册依赖文件的jar包
REGISTER ./dependfiles/tools.jar;

--注册solr相关的jar包
REGISTER  ./solrdependfiles/pigudf.jar; 
REGISTER  ./solrdependfiles/solr-core-4.10.2.jar;
REGISTER  ./solrdependfiles/solr-solrj-4.10.2.jar;
REGISTER  ./solrdependfiles/httpclient-4.3.1.jar
REGISTER  ./solrdependfiles/httpcore-4.3.jar
REGISTER  ./solrdependfiles/httpmime-4.3.1.jar
REGISTER  ./solrdependfiles/noggit-0.5.jar


--加载HDFS数据,并定义scheaml
a = load '/tmp/data' using PigStorage(',') as (sword:chararray,scount:int);

--存储到solr中,并提供solr的ip地址和端口号
store d into '/user/search/solrindextemp'  using com.pig.support.solr.SolrStore('http://localhost:8983/solr/collection1','collection1');
~                                                                                                                                                            
~                                                                      
~                               


配置成功之后,我们就可以运行程序,加载HDFS上数据,经过计算处理之后,并将最终的结果,存储到Solr之中,截图如下:






成功之后,我们就可以很方便的在solr中进行毫秒级别的操作了,例如各种各样的全文查询,过滤,排序统计等等!

同样的方式,我们也可以将索引存储在ElasticSearch中,关于如何使用Pig和ElasticSearch集成,散仙也会在后面的文章中介绍,敬请期待!



想了解更多有关电商互联网公司的搜索技术和大数据技术的使用,请欢迎扫码关注微信公众号:我是攻城师(woshigcs)
本公众号的内容是有关搜索和大数据技术和互联网等方面内容的分享,也是一个温馨的技术互动交流的小家园,有什么问题随时都可以留言,欢迎大家来访!







  • 大小: 213.4 KB
分享到:
评论

相关推荐

    apache-zookeeper-3.6.3版本的压缩包,直接下载到本地,解压后即可使用

    ZooKeeper 顾名思义 动物园管理员,他是拿来管大象 (Hadoop) 、 蜜蜂 (Hive) 、 小猪 (Pig) 的管理员, Apache Hbase和 Apache Solr 以及LinkedIn sensei 等项目中都采用到了 Zookeeper。

    FreeRCH大数据一体化平台开发框架.doc

    本文档介绍了FreeRCH大数据一体化平台开发框架的概述、架构组件、数据源与SQL引擎、数据采集、数据处理、搜索引擎等方面的知识点。 一、什么是大数据一体化开发框架 大数据一体化开发框架是指将大数据平台、物联网...

    PigExtend:Apache Pig+MapReduce给LuceneSolrElasticSearch构建索引

    Apache Pig+MapReduce给Lucene/Solr/ElasticSearch构建索引 ####项目简介 主要是利用了Pig框架简化了自己写Hadoop MapReduce程序来构建大规模并行索引的问题,里面封装了主流的全文检索框架,如Lucene,Solr和...

    精品课程推荐 大数据与云计算教程课件 优质大数据课程 38.Lily(共23页).pptx

    【大数据与云计算教程】\n\n本教程涵盖了大数据与云计算领域的多个重要组件和技术,包括Hadoop、Spark、Neo4j等。这些课程旨在帮助学习者深入理解大数据处理和云计算平台的运作机制。\n\n1. **Hadoop**:作为大数据...

    apache doc

    8. **Apache Lucene和Solr文档**:Lucene是全文搜索引擎库,Solr是基于Lucene的企业级搜索平台。这两个项目的文档覆盖了索引创建、查询语法、集群配置等方面。 9. **Apache Kafka文档**:Kafka是一种分布式流处理...

    精品课程推荐 大数据与云计算教程课件 优质大数据课程 32.Spark入门之Scala(共173页).pptx

    - **Solr**:Solr是Apache的搜索服务器,支持全文搜索、命中高亮和分布式搜索。 - **Lily**:Lily是Hadoop上的数据集成层,提供多种数据源的接入。 - **Titan**:Titan是一个分布式图数据库,用于存储大规模图形...

    集群搭建(zookeeper集群+solr集群)

    **SolrCloud**是Apache Solr提供的分布式搜索解决方案,适用于需要处理大规模数据集、支持高并发搜索请求的场景。当单一服务器上的索引数据量较小、搜索请求量不多时,并不需要使用SolrCloud。但在数据量巨大且并发...

    hue-3.10大数据监控利器

    - **大数据监控**:Hue 3.10在大数据监控方面的功能是其主要应用场景之一。 - **hadoop**:Hue与Hadoop紧密集成,是Hadoop生态系统的重要组成部分。 总结,Hue 3.10作为大数据监控的利器,提供了一整套工具来简化...

    大数据参考学习的流行路线

    7. **5搜索与推荐专栏**:搜索引擎如Elasticsearch和Solr提供了高效的全文检索功能,它们在大数据场景中用于快速查询和分析。推荐系统则涉及机器学习算法,如协同过滤和基于内容的推荐,以及如何利用用户行为数据...

    《Hadoop at 10-the History and Evolution of the Apache Hadoop Ecosystem》

    - 核心Hadoop(HDFS和MapReduce):在原始Hadoop之上,发展出了诸多项目,如Solr、Pig、Hive、ZooKeeper等,扩展了Hadoop的应用范围。 - 新工作负载:随着YARN的引入,Hadoop开始了对新工作负载的适应,允许更灵活...

    精品课程推荐 大数据与云计算教程课件 优质大数据课程 22.Zookeeper(共28页).pptx

    总结起来,本教程全面覆盖了大数据与云计算的基础知识,从Hadoop的核心组件到Zookeeper的分布式协调服务,再到一系列的大数据处理工具,旨在帮助学习者深入理解并掌握大数据处理和云计算的实践技能。

    大数据Hue架构原理.pdf

    Hue组件包括HDFS、Hive、Solr、Impala、Spark、Pig、Oozie、Hbase、MR(MR1/MR2-Yarn)、Sqoop2、Zookeeper等,提供了一个完整的大数据解决方案。 Hue的使用 Hue提供了多种使用方式,包括文件浏览、Job浏览、Beeswax...

    精品课程推荐 大数据与云计算教程课件 优质大数据课程 02.MapReduce(共23页).pptx

    【大数据与云计算教程课件】提供了全面的大数据与云计算相关知识体系,涵盖了Hadoop、MapReduce、YARN、HDFS、Hive、HBase、Pig、Zookeeper、Spark等重要组件的学习内容。以下是其中核心知识点的详细说明: 1. **...

    精品课程推荐 大数据与云计算教程课件 优质大数据课程 23.Zookeeper服务(共47页).pptx

    【大数据与云计算教程】课程涵盖了从基础到高级的大数据处理技术,其中Zookeeper服务是重要的分布式协调服务组件。Zookeeper由Apache开发,提供了一个高可用、高性能的数据模型,用于管理和同步分布式应用程序的数据...

    大数据资料,自己整理的资料.rar

    大数据,这个标签揭示了我们即将探讨的是一个与海量数据处理、分析和挖掘相关的主题。"大数据资料,自己整理的资料.rar" 这个标题表明这是一个包含个人整理的大数据相关学习资源的压缩包。由于没有具体列出压缩包内...

    大数据技术分享 大数据技术深入浅出 共39页.pdf

    根据提供的文档信息,我们可以深入探讨其中提及的大数据技术的关键知识点,包括大数据的发展历史、Hadoop与Spark的基础介绍以及相关的开源技术框架。 ### 大数据技术发展简史 大数据技术的发展经历了多个阶段,从...

    就业提升day03.docx

    Hue作为一个集成框架,集成了众多大数据组件,提供统一的Web界面,使得用户能够方便地进行各种操作,但复杂问题的调试可能仍需借助原生系统。 三、Apache Impala 3.1 Impala概述 Impala是由Cloudera开发的高速SQL...

    大数据图标大全.docx

    大数据图标大全中列举了众多大数据领域中常用的工具和框架,这些工具涵盖了数据的收集、存储、处理、分析、可视化以及安全等多个方面。以下是对这些工具的详细解释: 1. **Falcon**: 一个数据生命周期管理框架,...

    Cloudera大数据行业应用介绍.pptx

    Cloudera是一家全球领先的大数据平台提供商,自2008年在硅谷成立以来,它一直是Hadoop生态系统的积极推动者和主要...通过与Apache开源社区的紧密合作,Cloudera不断推动技术进步,助力企业在大数据时代取得竞争优势。

    Big Data Glossary-大数据术语

    #### 一、大数据基础概念与技术 **1.1 文档导向型数据库(Document-Oriented)** 文档导向型数据库是一种非关系型数据库,它将数据存储为文档形式,通常使用JSON或XML格式。这种类型的数据库非常适合存储结构化和...

Global site tag (gtag.js) - Google Analytics