`
qindongliang1922
  • 浏览: 2183987 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:117536
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:125922
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:59912
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:71301
社区版块
存档分类
最新评论

如何使用Spark大规模并行构建索引

阅读更多

使用Spark构建索引非常简单,因为spark提供了更高级的抽象rdd分布式弹性数据集,相比以前的使用Hadoop的MapReduce来构建大规模索引,Spark具有更灵活的api操作,性能更高,语法更简洁等一系列优点。

先看下,整体的拓扑图:





然后,再来看下,使用scala写的spark程序:

package com.easy.build.index

import java.util

import org.apache.solr.client.solrj.beans.Field
import org.apache.solr.client.solrj.impl.HttpSolrClient
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.annotation.meta.field
/**
  * Created by qindongliang on 2016/1/21.
  */

//注册model,时间类型可以为字符串,只要后台索引配置为Long即可,注解映射形式如下
case class Record(
                   @(Field@field)("rowkey")     rowkey:String,
                   @(Field@field)("title")  title:String,
                   @(Field@field)("content") content:String,
                   @(Field@field)("isdel") isdel:String,
                   @(Field@field)("t1") t1:String,
                   @(Field@field)("t2")t2:String,
                   @(Field@field)("t3")t3:String,
                   @(Field@field)("dtime") dtime:String


                 )

/***
  * Spark构建索引==>Solr
  */
object SparkIndex {

  //solr客户端
  val client=new  HttpSolrClient("http://192.168.1.188:8984/solr/monitor");
  //批提交的条数
  val batchCount=10000;

  def main2(args: Array[String]) {

    val d1=new Record("row1","title","content","1","01","57","58","3");
    val d2=new Record("row2","title","content","1","01","57","58","45");
    val d3=new Record("row3","title","content","1","01","57","58",null);
    client.addBean(d1);
    client.addBean(d2)
    client.addBean(d3)
    client.commit();
    println("提交成功!")


  }


  /***
    * 迭代分区数据(一个迭代器集合),然后进行处理
    * @param lines 处理每个分区的数据
    */
  def  indexPartition(lines:scala.Iterator[String] ): Unit ={
          //初始化集合,分区迭代开始前,可以初始化一些内容,如数据库连接等
          val datas = new util.ArrayList[Record]()
          //迭代处理每条数据,符合条件会提交数据
          lines.foreach(line=>indexLineToModel(line,datas))
          //操作分区结束后,可以关闭一些资源,或者做一些操作,最后一次提交数据
          commitSolr(datas,true);
  }

  /***
    *  提交索引数据到solr中
    *
    * @param datas 索引数据
    * @param isEnd 是否为最后一次提交
    */
  def commitSolr(datas:util.ArrayList[Record],isEnd:Boolean): Unit ={
          //仅仅最后一次提交和集合长度等于批处理的数量时才提交
          if ((datas.size()>0&&isEnd)||datas.size()==batchCount) {
            client.addBeans(datas);
            client.commit(); //提交数据
            datas.clear();//清空集合,便于重用
          }
  }


  /***
    * 得到分区的数据具体每一行,并映射
    * 到Model,进行后续索引处理
    *
    * @param line 每行具体数据
    * @param datas 添加数据的集合,用于批量提交索引
    */
  def indexLineToModel(line:String,datas:util.ArrayList[Record]): Unit ={
    //数组数据清洗转换
    val fields=line.split("\1",-1).map(field =>etl_field(field))
    //将清洗完后的数组映射成Tuple类型
    val tuple=buildTuble(fields)
    //将Tuple转换成Bean类型
    val recoder=Record.tupled(tuple)
    //将实体类添加至集合,方便批处理提交
    datas.add(recoder);
    //提交索引到solr
    commitSolr(datas,false);
  }


  /***
    * 将数组映射成Tuple集合,方便与Bean绑定
    * @param array field集合数组
    * @return tuple集合
    */
  def buildTuble(array: Array[String]):(String, String, String, String, String, String, String, String)={
     array match {
       case Array(s1, s2, s3, s4, s5, s6, s7, s8) => (s1, s2, s3, s4, s5, s6, s7,s8)
     }
  }


  /***
    *  对field进行加工处理
    * 空值替换为null,这样索引里面就不会索引这个字段
    * ,正常值就还是原样返回
    *
    * @param field 用来走特定规则的数据
    * @return 映射完的数据
    */
  def etl_field(field:String):String={
    field match {
      case "" => null
      case _ => field
    }
  }

  /***
    * 根据条件清空某一类索引数据
    * @param query 删除的查询条件
    */
  def deleteSolrByQuery(query:String): Unit ={
    client.deleteByQuery(query);
    client.commit()
    println("删除成功!")
  }


  def main(args: Array[String]) {
    //根据条件删除一些数据
    deleteSolrByQuery("t1:03")
    //远程提交时,需要提交打包后的jar
    val jarPath = "target\\spark-build-index-1.0-SNAPSHOT.jar";
    //远程提交时,伪装成相关的hadoop用户,否则,可能没有权限访问hdfs系统
    System.setProperty("user.name", "webmaster");
    //初始化SparkConf
    val conf = new SparkConf().setMaster("spark://192.168.1.187:7077").setAppName("build index ");
    //上传运行时依赖的jar包
    val seq = Seq(jarPath) :+ "D:\\tmp\\lib\\noggit-0.6.jar" :+ "D:\\tmp\\lib\\httpclient-4.3.1.jar" :+ "D:\\tmp\\lib\\httpcore-4.3.jar" :+ "D:\\tmp\\lib\\solr-solrj-5.1.0.jar" :+ "D:\\tmp\\lib\\httpmime-4.3.1.jar"
    conf.setJars(seq)
    //初始化SparkContext上下文
    val sc = new SparkContext(conf);
    //此目录下所有的数据,将会被构建索引,格式一定是约定好的
    val rdd = sc.textFile("hdfs://192.168.1.187:9000/user/monitor/gs/");
    //通过rdd构建索引
    indexRDD(rdd);
    //关闭索引资源
    client.close();
    //关闭SparkContext上下文
    sc.stop();


  }


  /***
    * 处理rdd数据,构建索引
    * @param rdd
    */
  def indexRDD(rdd:RDD[String]): Unit ={
    //遍历分区,构建索引
    rdd.foreachPartition(line=>indexPartition(line));
  }



}









ok,至此,我们的建索引程序就写完了,本例子中用的是远程提交模式,实际上它也可以支持spark on yarn (cluster 或者 client )  模式,不过此时需要注意的是,不需要显式指定setMaster的值,而由提交任务时,通过--master来指定运行模式,另外,依赖的相关jar包,也需要通过--jars参数来提交到集群里面,否则的话,运行时会报异常,最后看下本例子里面的solr是单机模式的,所以使用spark建索引提速并没有达到最大值,真正能发挥最大威力的是,多台search集群正如我画的架构图里面,每台机器是一个shard,这就是solrcloud的模式,或者在elasticsearch里面的集群shard,这样以来,才能真正达到高效批量的索引构建






有什么问题 可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
本公众号的内容是有关搜索和大数据技术和互联网等方面内容的分享,也是一个温馨的技术互动交流的小家园





  • 大小: 41.8 KB
分享到:
评论

相关推荐

    spark-annoy:在Apache Spark上构建Annoy索引 开发技术 - 其它.zip

    通过Spark的并行计算能力,我们可以快速构建和查询大规模数据的Annoy索引,而在C#环境中,尽管需要一些额外的适配工作,但也为开发提供了更多可能性。在实际项目中,理解这两者的结合方式以及如何在C#中操作,对于...

    大数据语义索引并行构建.pptx

    ### 大数据语义索引并行构建的关键知识点 #### 一、语义索引并行构建的必要性 1. **数据爆炸式增长**:随着互联网和物联网技术的飞速发展,数据量呈现指数级增长,特别是非结构化数据(如文本、图像和视频)的增长...

    基于hadoop和spark建立的倒排索引.zip

    总结来说,这个项目结合了Hadoop和Spark的优势,实现了大规模文本数据的倒排索引构建,从而提升了大数据环境下的文本检索性能。这展示了在人工智能领域,如何利用分布式计算技术解决实际问题,为大数据处理提供了一...

    郭小燕-基于Spark的大规模视频内容搜索(最终版)

    在总结中,作者指出,尽管Spark不是专为视频处理设计,但通过合理的设计和一些技术手段,如使用Spark的数据处理能力和与其他视频处理算法库的结合,仍然可以有效地构建大规模视频内容搜索系统。 文档还提到了使用...

    spark实现财经新闻搜索引擎(正文提取、中文分词、倒排索引构建、执行搜索)

    此外,将第二步和第三步的处理过程在大规模数据集上运行,如曙光系统,也是评估性能的重要标准。 通过以上步骤,我们可以构建出一个高效、准确的财经新闻搜索引擎,能够快速响应用户的查询需求,提供相关性强的搜索...

    一种基于Spark的分布式时态索引方法.pdf

    这种方法不仅设计了基于Spark的时态索引构建方法,而且引入了基于Spark RDD(弹性分布式数据集)的并行查询策略。这样的策略允许在大数据环境下对时态数据进行快速且有效的查询。 在实际应用中,时态查询通常涉及对...

    spark

    标题中的"Spark"指的是Apache Spark,一个开源的大数据处理框架,它被设计用于高效地执行大规模数据处理任务。Spark提供了一种快速、通用且可扩展的数据处理方式,支持批处理、交互式查询、流处理和机器学习等多种...

    电信设备-海量数据信息索引系统和索引构建方法.zip

    2. **分布式索引技术**:如HBase的RegionServer、Google的Bigtable、Apache Lucene/Solr的分布式搜索等,以及它们如何处理大规模数据的分布式存储和索引构建。 3. **数据压缩与存储优化**:讨论如何通过数据压缩减少...

    大数据处理平台Spark基础实践研究.pdf

    例如,计算圆周率的程序展示了如何使用Spark对大规模数据进行并行计算。而单词计数则是数据处理领域的一个经典案例,通过map-reduce模式将任务拆分成多个小任务,再进行合并计算。倒排索引在搜索引擎等领域应用广泛...

    大数据技术分享 Spark技术讲座 批量和流中自定义ML流水线的大规模模糊名称匹配 共28页.pdf

    ### 大数据技术分享:Spark技术讲座——批量与流中自定义ML流水线的大规模模糊名称匹配 #### 核心知识点概述 本资料主要聚焦于介绍如何利用Spark平台实现大规模模糊名称匹配(Fuzzy Name Matching),并通过构建...

    Spark for Data Science

    4. Spark的流处理能力:Spark Streaming是Spark的一个扩展模块,它将流处理作为一系列短小的批处理作业来处理,让开发者能够使用Spark API进行大规模的实时数据处理。用户可以将数据流视为一系列的小批量,然后用...

    spark search

    Spark Search正是为了解决这些问题而诞生,它利用Spark的并行处理能力,加速了对大规模数据集的搜索和分析,同时结合Lucene的搜索技术,提供了更强大的查询功能。 2. Spark Search 2.1. 架构 Spark Search的架构...

    Spark GraphX In Action 2016英文原版.pdf

    它为Spark提供了统一的API,用于创建、操作和分析大规模图数据集。GraphX将图数据抽象为一个顶点集合和边集合,每个顶点和边都可以携带丰富的属性。通过这种抽象,GraphX能够支持各种图算法,如PageRank、Shortest...

    mastering apache spark

    除了MLlib,Spark还提供了Spark SQL组件,用于处理大规模结构化数据的批处理和流式查询。Spark SQL通过DataFrame API提供了一个高级API来操作结构化数据,并且可以无缝处理来自Hive、JSON、Parquet和Cassandra等不同...

    Spark-SQL-Inverted-Index-Search-Engine:使用spark sql实现带有倒排索引的搜索引擎

    需要注意的是,Spark SQL的优势在于并行计算能力,因此在处理大规模数据时,性能表现优异。 总的来说,利用Spark SQL实现倒排索引搜索引擎,既展示了Spark处理大数据的能力,也展示了其在数据处理中的灵活性。通过...

    企业大数据处理:Spark、Druid、Flume与Kafka应用实践(超清完整版).pdf

    例如,书中可能会介绍如何利用Spark进行大规模数据的并行处理,如何使用Druid实现实时数据查询和分析,以及如何借助Flume和Kafka搭建可靠高效的数据流管道等。通过这些实战案例的学习,读者不仅可以掌握核心技术,还...

    基于MongoDB+Spark+ElasticSearch的电影推荐系统.zip

    Spark是一个分布式计算框架,擅长快速处理大规模数据;而ElasticSearch则是一个实时的、分布式的搜索和分析引擎,常用于数据检索与分析。 首先,MongoDB在项目中的作用是存储用户信息、电影数据以及用户行为记录等...

    SparkML算法详解(关于DataFrame的API操作)--数据挖掘(Scala与Java版)

    ### Spark MLlib 机器学习库概述 #### 一、Machine Learning **MLlib** 是 Apache Spark 的核心机器...通过使用 Spark MLlib,用户可以在分布式环境下高效地处理大规模数据集,并构建出强大而可靠的机器学习应用程序。

    mastering-apache-spark

    Apache Spark是一个开源分布式计算系统,它提供了一个快速、通用的计算引擎,适用于大规模数据处理。...通过阅读本文档,用户可以深入理解Spark的主要架构和组件,掌握如何使用Spark进行高效的大规模数据处理和分析。

Global site tag (gtag.js) - Google Analytics