
Spark Thrift Server modifications: auto-registering UDFs

The goal is to have the Thrift Server read a udf.config file at startup and register the UDFs and UDAFs listed there, so every JDBC session can call them without registering functions itself. Two classes are involved: a new helper, UdfLoadUtils, and a patched SparkSQLOperationManager.

 
org.apache.spark.sql.hive.thriftserver.server.UdfLoadUtils

package org.apache.spark.sql.hive.thriftserver.server

import org.apache.spark.SparkFiles
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types.{DataType, DataTypes}

import scala.collection.mutable.ArrayBuffer
import scala.io.Source


object UdfLoadUtils {
  val configArray: Array[String] = getConfigArray

  def udfRegister(spark: SparkSession): Unit = {
    // Each line of udf.config is either "name,className" (UDAF)
    // or "name,className,returnType" (Java UDF).
    configArray.foreach { record =>
      val registerInfoArray = record.split(",")
      println(s"register udf info : $record")
      if (registerInfoArray.size == 2) {
        // UDAF: instantiate the class and register the instance.
        val Array(udfName, className) = registerInfoArray
        val instance = getUDAFInstanceByClass(className)
        spark.sqlContext.udf.register(udfName, instance)
      } else if (registerInfoArray.size == 3) {
        // Java UDF: map the declared return type name to a Spark DataType.
        val Array(udfName, className, returnType) = registerInfoArray
        val returnDataType: DataType = returnType match {
          // Numeric types
          case "ByteType" => DataTypes.ByteType
          case "ShortType" => DataTypes.ShortType
          case "IntegerType" => DataTypes.IntegerType
          case "LongType" => DataTypes.LongType
          case "FloatType" => DataTypes.FloatType
          case "DoubleType" => DataTypes.DoubleType
          // String type
          case "StringType" => DataTypes.StringType
          // Binary type
          case "BinaryType" => DataTypes.BinaryType
          // Boolean type
          case "BooleanType" => DataTypes.BooleanType
          // Datetime types
          case "TimestampType" => DataTypes.TimestampType
          case "DateType" => DataTypes.DateType
          // Complex types (ArrayType, MapType, StructType) need element/field
          // information and are not covered by this simple name mapping.
          case _ => null // unknown type name falls through as null, preserving the original behavior
        }
        spark.sqlContext.udf.registerJava(udfName, className, returnDataType)
      }
    }
  }

  def getUDAFInstanceByClass(className: String): UserDefinedAggregateFunction = {
    var instance: UserDefinedAggregateFunction = null
    try {
      instance = Class.forName(className).newInstance.asInstanceOf[UserDefinedAggregateFunction]
    } catch {
      case ex: Throwable =>
        println(s"instantiating $className failed, cause: ${ex.getCause}")
        ex.printStackTrace()
    }
    instance
  }

  // Look for udf.config in three locations, in order: the path resolved by
  // SparkFiles.get (files shipped via --files), the working directory, and
  // the SparkFiles root directory. The first file found wins.
  def getConfigArray: Array[String] = {
    val configArray = new ArrayBuffer[String]()

    try {
      val path = SparkFiles.get("udf.config")
      println(s"SparkFiles udf.config, path: $path")
      configArray ++= Source.fromFile(path).getLines().toArray
      println(s"SparkFiles udf.config loaded from $path")
    } catch {
      case _: Throwable => // not found here; fall through to the next location
    }

    try {
      println("local udf.config, path: ./udf.config")
      val localFiles = Source.fromFile("./udf.config").getLines().toArray
      if (configArray.isEmpty) configArray ++= localFiles
      println("local udf.config loaded from ./udf.config")
    } catch {
      case _: Throwable =>
    }

    try {
      val path = SparkFiles.getRootDirectory() + "/udf.config"
      println(s"SparkFiles root udf.config, path: $path")
      val sparkFilesRoot = Source.fromFile(path).getLines().toArray
      if (configArray.isEmpty) configArray ++= sparkFilesRoot
      println(s"SparkFiles root udf.config loaded from $path")
    } catch {
      case _: Throwable =>
    }

    configArray.toArray
  }
}
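
For reference, a udf.config that drives this loader might look like the following, one registration per line; the class names here are hypothetical placeholders for your own implementations:

my_sum,com.example.udaf.MySum
my_upper,com.example.udf.MyUpper,StringType

Ship the file and the jar containing the classes when starting the Thrift Server, so that SparkFiles.get("udf.config") can resolve it; roughly:

sbin/start-thriftserver.sh \
  --files /path/to/udf.config \
  --jars /path/to/my-udfs.jar

start-thriftserver.sh forwards these options to spark-submit; the paths above are placeholders.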



org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager


/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.thriftserver.server

import java.util.{Map => JMap}
import java.util.concurrent.ConcurrentHashMap

import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager}
import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation}

/**
* Executes queries using Spark SQL, and maintains a list of handles to active queries.
*/
private[thriftserver] class SparkSQLOperationManager()
  extends OperationManager with Logging {

  val handleToOperation = ReflectionUtils
    .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")

  val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
  val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()
  // Patch: tracks whether the UDFs from udf.config still need registering.
  var udfNotInited = true


  override def newExecuteStatementOperation(
      parentSession: HiveSession,
      statement: String,
      confOverlay: JMap[String, String],
      async: Boolean): ExecuteStatementOperation = synchronized {
    val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
    require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
      s" initialized or had already closed.")

    // Patch: register the configured UDFs once, on the first statement received.
    if (udfNotInited) {
      UdfLoadUtils.udfRegister(sqlContext.sparkSession)
      udfNotInited = false
    }

    val conf = sqlContext.sessionState.conf
    val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
    val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
      runInBackground)(sqlContext, sessionToActivePool)
    handleToOperation.put(operation.getHandle, operation)
    logDebug(s"Created Operation for $statement with session=$parentSession, " +
      s"runInBackground=$runInBackground")
    operation
  }
}
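
To exercise the whole flow you need a UDAF on the classpath. Here is a minimal sketch of one, assuming the hypothetical name and package from the udf.config example above ("my_sum,com.example.udaf.MySum"); it extends the same UserDefinedAggregateFunction API that getUDAFInstanceByClass instantiates:

package com.example.udaf

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// A minimal UDAF that sums a long column.
class MySum extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(StructField("value", LongType) :: Nil)
  override def bufferSchema: StructType = StructType(StructField("sum", LongType) :: Nil)
  override def dataType: DataType = LongType
  override def deterministic: Boolean = true

  // Start each aggregation buffer at zero.
  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0L

  // Add one input row to the running sum, skipping nulls.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) buffer(0) = buffer.getLong(0) + input.getLong(0)

  // Combine partial sums from two buffers.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)

  override def evaluate(buffer: Row): Any = buffer.getLong(0)
}

After the first statement of the first session triggers registration, any JDBC client (beeline, for example) can call the function directly: SELECT my_sum(id) FROM some_table.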