`

Spark Thrift Server 修改

 
阅读更多
新增类:org.apache.spark.sql.hive.thriftserver.server.UdfLoadUtils

package org.apache.spark.sql.hive.thriftserver.server

import org.apache.spark.SparkFiles
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types.{DataType, DataTypes}

import scala.collection.mutable.ArrayBuffer
import scala.io.Source


/**
 * Loads UDF / UDAF definitions from a `udf.config` file and registers them on a
 * [[SparkSession]].
 *
 * Every config line is a comma-separated record in one of two shapes:
 *   - `name,className`            : `className` is instantiated reflectively as a
 *                                   [[UserDefinedAggregateFunction]] and registered under `name`.
 *   - `name,className,returnType` : `className` is registered through `registerJava`;
 *                                   `returnType` is one of the names in `returnTypesByName`.
 *
 * Loading and registration are best-effort: problems are printed and the offending
 * source / record is skipped instead of failing the caller.
 */
object UdfLoadUtils {
  import scala.util.control.NonFatal

  /** Raw config records, read once when this object is first initialised. */
  var configArray: Array[String] = getConfigArray()

  /** Return-type names accepted as the third field of a config record. */
  private val returnTypesByName: Map[String, DataType] = Map(
    // Numeric types
    "ByteType" -> DataTypes.ByteType,
    "ShortType" -> DataTypes.ShortType,
    "IntegerType" -> DataTypes.IntegerType,
    "LongType" -> DataTypes.LongType,
    "FloatType" -> DataTypes.FloatType,
    "DoubleType" -> DataTypes.DoubleType,
    // String type
    "StringType" -> DataTypes.StringType,
    // Binary type
    "BinaryType" -> DataTypes.BinaryType,
    // Boolean type
    "BooleanType" -> DataTypes.BooleanType,
    // Datetime types
    "TimestampType" -> DataTypes.TimestampType,
    "DateType" -> DataTypes.DateType
    // DecimalType / ArrayType / MapType / StructType need parameters and are not supported here.
  )

  /**
   * Registers every record of `configArray` on the given session.
   *
   * Fields are trimmed before use. A record whose class cannot be instantiated, whose
   * registration throws, or that does not have 2 or 3 fields is reported and skipped so
   * that the remaining records are still processed.
   *
   * @param spark session whose function registry receives the UDFs / UDAFs
   */
  def udfRegister(spark: SparkSession): Unit = {
    configArray.foreach { record =>
      println(s"register udf info : $record")
      try {
        record.split(",").map(_.trim) match {
          case Array(udfName, className) =>
            // getUDAFInstanceByClass returns null on failure; never register a null UDAF,
            // it would only blow up later when the function is used in a query.
            Option(getUDAFInstanceByClass(className)) match {
              case Some(instance) =>
                spark.sqlContext.udf.register(udfName, instance)
              case None =>
                println(s"skip udaf $udfName : class $className could not be instantiated")
            }

          case Array(udfName, className, returnType) =>
            val returnDataType: DataType = returnTypesByName.get(returnType).orNull
            if (returnDataType == null) {
              // Same as before: an unrecognised name is passed on as a null return type.
              // NOTE(review): confirm how registerJava treats null on the Spark version in use.
              println(s"unknown return type $returnType for udf $udfName , passing null")
            }
            spark.sqlContext.udf.registerJava(udfName, className, returnDataType)

          case _ =>
            // Blank lines are ignored quietly; anything else is reported.
            if (record.trim.nonEmpty) {
              println(s"ignore malformed udf config record : $record")
            }
        }
      } catch {
        case NonFatal(ex) =>
          println(s"register udf failed , record : $record , error info : $ex")
          ex.printStackTrace()
      }
    }
  }

  /**
   * Instantiates `className` through its no-argument constructor and casts it to
   * [[UserDefinedAggregateFunction]].
   *
   * @param className fully-qualified class name of the UDAF
   * @return the new instance, or `null` when the class cannot be loaded, instantiated
   *         or cast (the failure is printed)
   */
  def getUDAFInstanceByClass(className: String): UserDefinedAggregateFunction = {
    try {
      Class.forName(className).newInstance.asInstanceOf[UserDefinedAggregateFunction]
    } catch {
      // LinkageError (e.g. NoClassDefFoundError) is not matched by NonFatal but is an
      // expected failure mode of reflective loading, so it is handled here as well.
      case ex @ (NonFatal(_) | _: LinkageError) =>
        println(s" instance $className  error ,error info : ${ex.getCause} ...................... ")
        ex.printStackTrace()
        null
    }
  }

  /**
   * Reads the UDF config records from the first location that yields at least one line.
   *
   * Locations are tried in this order:
   *   1. `SparkFiles.get("udf.config")`
   *   2. `./udf.config`
   *   3. `SparkFiles.getRootDirectory() + "/udf.config"`
   *
   * @return the lines of the first non-empty source, or an empty array when none is readable
   */
  def getConfigArray(): Array[String] = {
    // Paths are resolved lazily because the SparkFiles calls themselves may throw.
    val candidates: Seq[(String, () => String)] = Seq(
      ("SparkFiles", () => SparkFiles.get("udf.config")),
      ("local", () => "./udf.config"),
      ("SparkFilesroot", () => SparkFiles.getRootDirectory() + "/udf.config")
    )

    candidates.iterator
      .map { case (label, resolvePath) => readConfigLines(label, resolvePath) }
      .find(_.nonEmpty)
      .getOrElse(Array.empty[String])
  }

  /**
   * Reads all lines of one candidate config file, always closing the file handle.
   *
   * @param label       short name of the candidate, used in log output only
   * @param resolvePath computes the file path; may throw
   * @return the file's lines, or an empty array when the path cannot be resolved or read
   */
  private def readConfigLines(label: String, resolvePath: () => String): Array[String] = {
    try {
      val path = resolvePath()
      println(s"$label udf.config , path : $path")
      val source = Source.fromFile(path)
      try {
        val lines = source.getLines().toArray
        println(s"$label udf.config , path : $path done!")
        lines
      } finally {
        source.close()
      }
    } catch {
      case NonFatal(ex) =>
        println(s"$label udf.config not loaded : $ex")
        Array.empty[String]
    }
  }
}



修改类:org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager


/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.thriftserver.server

import java.util.{Map => JMap}
import java.util.concurrent.ConcurrentHashMap

import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager}
import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation}

/**
* Executes queries using Spark SQL, and maintains a list of handles to active queries.
*/
private[thriftserver] class SparkSQLOperationManager()
  extends OperationManager with Logging {

  // The superclass' private handle map, obtained reflectively so that operations created
  // here can be stored in it.
  val handleToOperation = ReflectionUtils
    .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")

  val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
  val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()

  // Kept for backward compatibility: true until UDFs have been registered at least once.
  var udfNotInited = true

  // SQLContexts on which the configured UDFs have already been registered. In multi-session
  // mode every connection owns its own temporary function registry, so registration has to
  // happen once per context rather than once per server. Keys are weak so that a context
  // can be collected after its session is closed. Only accessed inside `synchronized`.
  private val udfInitedContexts: java.util.Set[SQLContext] =
    java.util.Collections.newSetFromMap(
      new java.util.WeakHashMap[SQLContext, java.lang.Boolean]())

  /**
   * Creates a [[SparkExecuteStatementOperation]] for `statement` and records it in
   * `handleToOperation`.
   *
   * Before the first statement of each SQLContext, the UDFs described by
   * [[UdfLoadUtils]] are registered on that context's SparkSession.
   *
   * @param parentSession Hive session issuing the statement; its handle must be present
   *                      in `sessionToContexts`
   * @param statement     SQL text to execute
   * @param confOverlay   per-statement configuration overrides
   * @param async         whether the client asked for asynchronous execution
   * @return the newly created operation
   * @throws IllegalArgumentException if no SQLContext is registered for the session
   */
  override def newExecuteStatementOperation(
      parentSession: HiveSession,
      statement: String,
      confOverlay: JMap[String, String],
      async: Boolean): ExecuteStatementOperation = synchronized {
    val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
    require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
      s" initialized or had already closed.")

    if (!udfInitedContexts.contains(sqlContext)) {
      UdfLoadUtils.udfRegister(sqlContext.sparkSession)
      // Marked only after registration returned, so a failure is retried on the next statement.
      udfInitedContexts.add(sqlContext)
      udfNotInited = false
    }

    val conf = sqlContext.sessionState.conf
    // Run in the background only when both the client and the server configuration allow it.
    val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
    val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
      runInBackground)(sqlContext, sessionToActivePool)
    handleToOperation.put(operation.getHandle, operation)
    logDebug(s"Created Operation for $statement with session=$parentSession, " +
      s"runInBackground=$runInBackground")
    operation
  }
}
分享到:
评论

相关推荐

    goshine:spark-thriftserver 的 go 客户端库

    `goshine` 是一个专为 Apache Spark Thrift Server 设计的 Go 语言客户端库,它使得在 Go 应用程序中与 Spark 进行交互变得更加便捷。Spark Thrift Server 提供了一个 JDBC/ODBC 接口,允许各种编程语言通过标准的 ...

    hive3.x编译spark3.x包

    例如,`mvn clean package -Phadoop-3.2 -Pyarn -Pspark-3.0 -Phive -Phive-thriftserver -Dhadoop.version=3.2.0 -Dhive.version=3.1.x`。 7. **验证和测试**:编译完成后,将生成的二进制包部署到Hadoop集群上,...

    清华大学精品大数据实战课程(Hadoop、Hbase、Hive、Spark)PPT课件含习题(29页) 第6章 Spark SQL.pptx

    - **Thrift JDBC/ODBC Server**:Spark SQL可以通过Thrift服务器提供JDBC和ODBC接口,使得其他支持这些协议的工具(如Excel、Tableau)能够连接并查询Spark数据。配置Thrift服务器包括启动服务、检查服务状态以及...

    基于CDH7平台的Spark3.5.0定制版设计源码

    本项目为基于CDH7平台的Spark3.5.0定制...项目主要针对Spark3.5.0版本进行定制化编译,但Thrift Server部分的修改尚未完成。该版本适用于需要集成Spark处理框架的应用场景,尤其适用于对Spark进行深度定制的开发需求。

    2-8+Apache+Kyuubi+(Incubating)+在网易的深度实践.pdf

    Kyuubi相比于传统的Hive Server2和Spark ThriftServer,具有显著的优势。首先,Kyuubi提供了多租户支持,这意味着多个用户或部门可以在同一平台上安全地共享资源,而无需担心数据隔离问题。其次,Kyuubi支持Hive ...

    大数据平台notebook工具-Zeppelin

    - 若使用 Spark,还需要启动 Spark Thrift Server,以便于通过 SQL 进行查询。 3. **常见问题及解决方案**: - 问题 1 涉及到 Hadoop 版本不兼容的问题。当遇到 `NoSuchMethodError` 错误时,通常是因为类路径中...

    03开源NewSql数据库TiDB-Deep Dive into TiDB

    去年十月,TiDB 1.0 版本发布,在接下来的六个月中,开发团队一方面...ThriftServer/JDBC 支持 Spark-SQL 交互支持 PySpark Shell 支持 SparkR 支持 相关链接 TiDB 的详细介绍:点击查看 TiDB 的下载地址:点击下载

    apache-hive-1.2.1-src.zip

    5. **Hive Server2**: Hive Server2 提供了一个安全的、多用户的接口来执行 Hive 查询。`hiveserver2` 目录包含了服务器端的实现,包括 Thrift 接口和 JDBC/ODBC 支持。 6. **Hcatalog**: Hive HCatalog 是一个元...

    HIVE 0.12安装配置(HADOOP2.2)

    <description>Thrift URI for the Hive metastore server ``` **7. 启动Hive服务** 启动Hive Metastore 服务:`hive --service metastore`。接着启动Hive Server2:`hive --service hiveserver2`。 **8. 使用...

Global site tag (gtag.js) - Google Analytics