当使用SparkContext的saveAsObjectFile方法将对象序列化到文件,以及通过objectFile方法将对象从文件反序列出来的时候,Spark默认使用Java的序列化以及反序列化机制,通常情况下,这种序列化机制是很低效的,Spark支持使用Kyro作为对象的序列化和反序列化机制,序列化的速度比java更快,但是使用Kyro时要注意,Kyro目前还是有些bug。
- Spark默认是使用Java的ObjectOutputStream框架,它支持所有的继承于java.io.Serializable序列化,如果想要进行调优的话,可以通过继承java.io.Externalizable。这种格式比较大,而且速度慢。
- Spark还支持这种方式Kryo serialization,它的速度快,而且压缩比高于Java的序列化,但是它不支持所有的Serializable格式,并且需要在程序里面注册。它需要在实例化SparkContext之前进行注册
When Spark is transferring data over the network or spilling data to disk, it needs to serialize objects into a binary format. This comes into play during shuffle operations, where potentially large amounts of data are transferred. By default Spark will use Java’s built-in serializer. Spark also supports the use of Kryo, a third-party serialization library that improves on Java’s serialization by offering both faster serialization times and a more compact binary representation, but cannot serialize all types of objects “out of the box.” Almost all applications will benefit from shifting to Kryo for serialization.
代码示例:
package spark.examples.kryo import com.esotericsoftware.kryo.Kryo import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.serializer.KryoRegistrator //两个成员变量name和age,同时必须实现java.io.Serializable接口 class MyClass1(val name: String, val age: Int) extends java.io.Serializable { } //两个成员变量name和age,同时必须实现java.io.Serializable接口 class MyClass2(val name: String, val age: Int) extends java.io.Serializable { } //注册使用Kryo序列化的类,要求MyClass1和MyClass2必须实现java.io.Serializable class MyKryoRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo): Unit = { kryo.register(classOf[MyClass1]); kryo.register(classOf[MyClass2]); } } object SparkKryo { def main(args: Array[String]) { //设置序列化器为KryoSerializer,也可以在配置文件中进行配置 System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") System.setProperty("spark.kryo.registrator", "spark.examples.kryo.MyKryoRegistrator") val conf = new SparkConf() conf.setAppName("SparkKryo") conf.setMaster("local[3]") val sc = new SparkContext(conf) val rdd = sc.parallelize(List(new MyClass1("Tom", 31), new MyClass1("Jack", 23), new MyClass1("Mary", 19))) val fileDir = "file:///d:/wordcount" + System.currentTimeMillis() //将rdd中的对象通过kyro进行序列化,保存到fileDir目录中 rdd.saveAsObjectFile(fileDir) //读取part-00000文件中的数据,对它进行反序列化,,得到对象集合rdd1 val rdd1 = sc.objectFile[MyClass1](fileDir + "/" + "part-00000") rdd1.foreachPartition(iter => { while (iter.hasNext) { val objOfMyClass1 = iter.next(); println(objOfMyClass1.name) } }) sc.stop } }
查看保存到文件中的内容,是个二进制数据:
SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable 蓑_xi??蛔?z汲 i e ur [Lspark.examples.kryo.MyClass1;? 独#v? xp sr spark.examples.kryo.MyClass1z 峌# xp
问题:
对于普通字符,数字,字符串写入到object文件,是否也是序列化的过程?明确指定使用kvro序列化Int之后,保存的文件确实是二进制的。去掉对Int的注册之后,结果还是一样,序列化的结果完全一样,结果都是:
SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable F脗?庻籡陭姯&? ' # ur [IM篳&v瓴? xp
import com.esotericsoftware.kryo.Kryo import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.serializer.KryoRegistrator //注册使用Kryo序列化的类,对Int进行序列化 class MyKryoRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo): Unit = { kryo.register(classOf[Int]); } } object SparkKryoPrimitiveType { def main(args: Array[String]) { //设置序列化器为KryoSerializer,也可以在配置文件中进行配置 System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") System.setProperty("spark.kryo.registrator", "spark.examples.kryo.MyKryoRegistrator") val conf = new SparkConf() conf.setAppName("SparkKryoPrimitiveType") conf.setMaster("local[3]") val sc = new SparkContext(conf) val rdd = sc.parallelize(List(1, 3, 7, 9, 11, 22)) val fileDir = "file:///d:/wordcount" + System.currentTimeMillis() //将rdd中的对象通过kyro进行序列化,保存到fileDir目录中 rdd.saveAsObjectFile(fileDir) //读取part-00000文件中的数据,对它进行反序列化,,得到对象集合rdd1 val rdd1 = sc.objectFile[Int](fileDir + "/" + "part-00000") rdd1.foreachPartition(iter => { while (iter.hasNext) { println(iter.next()) } }) sc.stop } }
其它:
指定使用Kyro序列化,以及注册Kyro序列化类,可可以使用如下方式
val conf = new SparkConf() //这句是多于的,调用conf的registerKryoClasses时,已经设置了序列化方法 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Be strict about class registration ///如果一个要序列化的类没有进行Kryo注册,则强制Spark报错 conf.set("spark.kryo.registrationRequired", "true") conf.registerKryoClasses(Array(classOf[MyClass], classOf[MyOtherClass]))
/** * Use Kryo serialization and register the given set of classes with Kryo. * If called multiple times, this will append the classes from all calls together. */ def registerKryoClasses(classes: Array[Class[_]]): SparkConf = { val allClassNames = new LinkedHashSet[String]() allClassNames ++= get("spark.kryo.classesToRegister", "").split(',').filter(!_.isEmpty) allClassNames ++= classes.map(_.getName) set("spark.kryo.classesToRegister", allClassNames.mkString(",")) set("spark.serializer", classOf[KryoSerializer].getName) this }
相关推荐
对于java原生序列化,我们使用了ObjectOutputStream和ObjectInputStream来实现序列化和反序列化,而对于Kryo序列化,我们使用了Kryo序列化器来实现序列化和反序列化。 6. 序列化的应用:序列化有很多实际应用,例如...
Kryo 是一个快速高效的Java对象图形序列化框架,主要特点是性能、高效和易用。该项目用来序列化对象到文件、数据库或者网络。示例代码:Kryo kryo = new Kryo(); // ... Output output = new Output(new...
7.所使用的知识点是spark core,spark SQL,spark streaming等三个技术框架。 8.主要是数据倾斜,线上故障,性能调优,troubleshooting等经验。 9.使用模拟数据,希望达到的效果。 10.需求分析,方案设计,数据设计,...
# 基于Netty+Kyro+Zookeeper的RPC框架 [中文](./README.md)|English ## 前言 通过这个简易的轮子,你可以学到 RPC 的底层原理及原理以及各种 Java 编码实践的运用。 ## 介绍 由于 Guide哥自身精力和能力...
guide-rpc-framework 是一款基于 Netty+Kyro+Zookeeper 实现的 RPC 框架。代码注释详细,结构清晰,并且集成了 Check Style 规范代码结构,非常适合阅读和学习。
第四章 Jenkins集成Gitlab应用管理->创建应用->配置重定向url配置前备份config.xml文件,如果jenkins前面有代理需要改一下upst
第六章 前端发布流水线(NodeJs)1. 项目设置项目配置部分主要是将网站源代码上传到github,然后搭建用户访问的web服务器。再经过Jenkins配置发
Kyro Linux Tools允许Kyro 1/2图形卡的所有者进行配置和调整,以充分利用它
guide-rpc-framework 最近被一些不友好的人“喷”了,简单写了一篇记录一下: 中文| 本着开源精神,本项目README已经同步了英文版本。另外,项目的源代码的注释大部分也修改为了英文。 ...如果要提交问题或pr的话,请在...
事务事件通过gRPC报告给协调器,事务负载由Kyro序列化/反序列化。 低入侵。 您需要做的就是添加2-3个注释和相应的补偿方法。 易于部署。 所有组件都可以通过docker引导。 支持正向(重试)和反向(补偿)恢复。 ...
rpc框架前言学习javaGuide,自己动手造个轮子,通过这个简易的轮子,可以学到RPC的扭曲原理和原理以及各种Java编码实践... kryo:序列化/反序列化工具。 zookeeper:做注册中心。项目骨骼使用下载运行zookeeper。服务端
在分布式环境中,序列化技术(如XML、JSON、Hessian、Avro、Protobuf、Kyro等)用于数据在网络间传输,选择合适的序列化方案对于性能和数据大小都有直接影响。 综上所述,IT行业的多线程编程、并发集合、分布式存储...
基于kyro实现高效通用序列化协议; 基于zookeeper实现服务注册; 基于代理与反射使得接口简单易用; 客户端实现随机、一致性哈希、roundbin三种负载均衡算法; 客户端基于hystrix做限流和及时熔断; 对每一次客户端...
MRPC mrpc是一种简洁易用的分布式服务化治理框架。特性调用透明,像调用本地方法一样使用RPC服务高效支持分布式,基于Zookeeper的服务注册和发现完美集成Spring / SpringBoot项目HTTP传输协议多种负载均衡策略进行下...
guide-rpc-framework 最近被一些不友好的人"喷"了,简单写了一篇记录一下: 中文| 本着开源精神,本项目README已经同步了英文版本。另外,项目的源代码的注释大部分也修改为了英文。 如访问速度不佳,可放在 Gitee ...
- **高性能**:事务事件通过 gRPC 报告,同时利用 Kyro 进行序列化和反序列化,提高性能。 - **低侵入**:只需要少量的注解和编写补偿方法即可实现分布式事务功能。 - **部署简单**:支持 Docker 快速部署。 - **...
它根据GitHub仓库的模板文件生成一个新的项目结构,简化了项目的初始化过程。 `akka-http-scalajs.g8`模板是专为快速启动使用Akka HTTP和Scala.js的项目而设计的。当你使用这个模板时,你将得到一个已经配置好的...
Dubbox now means Dubbo eXtensions, and it adds features like RESTful remoting, Kyro/FST serialization, etc to the Dubbo service framework
从dubbox-2.8.4开始,所有依赖库的使用方式将和dubbo原来的一样:即如果要使用REST、Kyro、FST、Jackson等功能,需要用户自行手工添加相关的依赖。用户名密码都默认为:root
Dubbox adds features like RESTful remoting, Kyro/FST serialization, etc to the popular . It's been used by several projects of , which is one of the major e-commerce companies in China. 主要贡献者 沉...