`
bit1129
  • 浏览: 1068064 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark103】Task not serializable

 
阅读更多

Task not serializable是Spark开发过程最令人头疼的问题之一,这里记录下出现这个问题的两个实例,一个是自己遇到的,另一个是stackoverflow上看到。等有时间了再仔细探究出现Task not serialiazable的各种原因以及出现问题后如何快速定位问题的所在,至少目前阶段碰到此类问题,没有什么章法

1.

 

package spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming._

import scala.collection.mutable

object NetCatStreamingWordCount3 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("NetCatWordCount")
    conf.setMaster("local[3]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.foreachRDD(rdd => {
      rdd.foreachPartition(partitionIterable=> {
        val map = mutable.Map[String, String]()
        while(partitionIterable.hasNext) {
          val v = partitionIterable.next()
            map += v ->v
        }

        map.foreach(entry => {
          if (entry._1.equals("abc")) {
            return; //return语句导致Task无法序列化,两个字:诡异,三个字:太诡异
          }
        })

      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

 

异常信息:

org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:1622)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:805)
	at spark.examples.streaming.NetCatStreamingWordCount3$$anonfun$main$1.apply(NetCatStreamingWordCount3.scala:15)
	at spark.examples.streaming.NetCatStreamingWordCount3$$anonfun$main$1.apply(NetCatStreamingWordCount3.scala:14)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
	at scala.util.Try$.apply(Try.scala:161)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:744)
Caused by: java.io.NotSerializableException: java.lang.Object
Serialization stack:
	- object not serializable (class: java.lang.Object, value: java.lang.Object@143d53c)
	- field (class: spark.examples.streaming.NetCatStreamingWordCount3$$anonfun$main$1, name: nonLocalReturnKey1$1, type: class java.lang.Object)
	- object (class spark.examples.streaming.NetCatStreamingWordCount3$$anonfun$main$1, <function1>)
	- field (class: spark.examples.streaming.NetCatStreamingWordCount3$$anonfun$main$1$$anonfun$apply$1, name: $outer, type: class spark.examples.streaming.NetCatStreamingWordCount3$$anonfun$main$1)
	- object (class spark.examples.streaming.NetCatStreamingWordCount3$$anonfun$main$1$$anonfun$apply$1, <function1>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
	... 20 more
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:1622)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:805)
	at spark.examples.streaming.NetCatStreamingWordCount3$$anonfun$main$1.apply(NetCatStreamingWordCount3.scala:15)
	at spark.examples.streaming.NetCatStreamingWordCount3$$anonfun$main$1.apply(NetCatStreamingWordCount3.scala:14)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
	at scala.util.Try$.apply(Try.scala:161)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:744)
Caused by: java.io.NotSerializableException: java.lang.Object
Serialization stack:
	- object not serializable (class: java.lang.Object, value: java.lang.Object@143d53c)
	- field (class: spark.examples.streaming.NetCatStreamingWordCount3$$anonfun$main$1, name: nonLocalReturnKey1$1, type: class java.lang.Object)
	- object (class spark.examples.streaming.NetCatStreamingWordCount3$$anonfun$main$1, <function1>)
	- field (class: spark.examples.streaming.NetCatStreamingWordCount3$$anonfun$main$1$$anonfun$apply$1, name: $outer, type: class spark.examples.streaming.NetCatStreamingWordCount3$$anonfun$main$1)
	- object (class spark.examples.streaming.NetCatStreamingWordCount3$$anonfun$main$1$$anonfun$apply$1, <function1>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
	... 20 more

 

 

 

 

 

2.

 

package spark.examples.rdd

import org.apache.spark.{SparkConf, SparkContext}

object TaskNotSerializationTest {
  def main(args: Array[String]) {
    new Testing().runJob
  }
}

class Testing {
  val conf = new SparkConf().setMaster("local").setAppName("TaskNotSerializationTest")
  val sc = new SparkContext(conf)
  val rdd = sc.parallelize(List(1, 2, 3))

  def runJob = {
    rdd.map(someFunc).collect().foreach(println)
  }

  def someFunc(a: Int) = a + 1
}

 异常信息:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:1622)
	at org.apache.spark.rdd.RDD.map(RDD.scala:286)
	at spark.examples.rdd.Testing.runJob(TaskNotSerializationTest.scala:20)
	at spark.examples.rdd.TaskNotSerializationTest$.main(TaskNotSerializationTest.scala:10)
	at spark.examples.rdd.TaskNotSerializationTest.main(TaskNotSerializationTest.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: java.io.NotSerializableException: spark.examples.rdd.Testing
Serialization stack:
	- object not serializable (class: spark.examples.rdd.Testing, value: spark.examples.rdd.Testing@b8972)
	- field (class: spark.examples.rdd.Testing$$anonfun$runJob$1, name: $outer, type: class spark.examples.rdd.Testing)
	- object (class spark.examples.rdd.Testing$$anonfun$runJob$1, <function1>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
	... 11 more

 

 第二个问题:stackoverflow上有比较详细的讨论:

http://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou

 

 

分享到:
评论

相关推荐

    org.apache.spark.SparkException: Task not serializable

    org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) at org.apache.spark.util.ClosureCleaner$.org$apache$spark...

    Databricks Spark 知识库

    Job aborted due to stage failure: Task not serializable 缺失依赖 执行 start-all.sh 错误 - Connection refused Spark 组件之间的网络连接问题 性能 & 优化 一个 RDD 有多少个分区 数据本地性 Spark Streaming ...

    Python json 错误xx is not JSON serializable解决办法

    Python json 错误xx is not JSON serializable解决办法 在使用json的时候经常会遇到xxx is not JSON serializable,也就是无法序列化某些对象。经常使用django的同学知道django里面有个自带的Encoder来序列化时间等...

    Intent传递对象Serializable

    本教程将详细讲解如何通过Serializable接口来实现Intent对象的传递。 首先,了解Serializable接口。在Java中,Serializable是用于序列化和反序列化的接口。当一个类实现了这个接口,它的实例就可以被转化为一串字节...

    轉Serializable至Stream

    在Java编程中,`Serializable`接口是用于对象序列化的重要工具。对象序列化是指将一个对象的状态转换为字节流的过程,以便存储或通过网络进行传输。另一方面,`Stream`通常指的是I/O流,它是Java处理输入/输出数据的...

    Serializable的增删改查

    Serializable的增删改查操作,已经经过验证,可以直接运行。

    java->serializable深入了解

    java-&gt;serializable深入了解 java-&gt;serializable深入了解 java-&gt;serializable深入了解

    (存入json文件出错)TypeError: Object of type int64 is not JSON serializable

    问题描述 因为numpy的int类型无法被json化,所以需要将numpy的int转为原生类型。 解决方案 # pandas返回的 sex_cnt = marks['sex'].value_counts() type(sex_cnt['男']) # numpy.int64 # 3种转化方法 ...

    序列化 serializable demo

    例如,`MySerializable.java`和`Product.java`两个文件可能分别代表实现了`Serializable`接口的类。`MySerializable`可能是自定义的一个示例类,而`Product`可能是表示产品的类,它们都包含了可序列化的属性。 在`...

    Laravel开发-serializable-values

    在Laravel框架中,"serializable-values"是一个关键概念,它涉及到对象的序列化与反序列化,这对于数据存储和传输至关重要。在这个话题下,我们将深入探讨Laravel如何处理可序列化的值,以及如何利用Luminark提供的...

    Serializable在C#中的作用.NET 中的对象序列化

    ### C#中Serializable的作用与对象序列化详解 #### 一、引言 在现代软件开发中,特别是基于.NET框架的应用程序开发中,对象序列化是一项非常重要的技术。它允许将对象的状态转换为一种持久的形式(如文件或网络传输...

    Serializable java序列号

    在Java中,如果一个类需要支持序列化,那么该类需要实现`java.io.Serializable`接口,虽然这个接口没有定义任何方法,但是它的存在作为一个标记,表明该类的对象可以被序列化。 序列化的优点主要有以下几点: 1. **...

    Android序列化——Serializable与Parcelable

    本文将深入探讨两种主要的序列化方式:Serializable和Parcelable,并比较它们的优缺点以及适用场景。 首先,我们来了解什么是序列化。序列化是将对象的状态转换为可存储或可传输的形式的过程。在Android中,这个...

    bundle传递基本数据,Parcelable类型数据,Serializable类型数据

    本篇将详细介绍如何通过`Bundle` 传递基本数据类型、Parcelable类型数据以及Serializable类型数据。 ### 一、基本数据类型的传递 在Android中,基本数据类型包括int、boolean、float、double、char等。通过`put()`...

    可序列化接口Serializable

    在Java编程语言中,`Serializable`接口是一个非常重要的概念,它是实现对象持久化的关键。本文将深入探讨`Serializable`接口的细节,以及与其相关的高级知识。 `Serializable`接口是Java中的一个标记接口,没有包含...

    java.io.Serializable序列化问题

    ### Java.io.Serializable 序列化问题详解 #### 一、序列化的概念与作用 在 Java 编程语言中,序列化是一种将对象的状态(即成员变量的值)转换为可以存储或传输的形式的过程。通常,这种形式是字节流,但也可以是...

    spark 2 笔记

    public interface Row extends scala.Serializable ``` 通过以上介绍,我们可以看到 Spark 为大数据处理提供了强大的支持。无论是从数据读取、转换还是结果输出,Spark 均提供了一系列高度优化的功能,使得开发者...

    Java_Serializable(序列化)的理解和总结

    ### Java Serializable(序列化)的理解和总结 #### 一、序列化的定义与目的 序列化是一种将对象的状态转换为可以存储或传输的形式的过程。在Java中,如果一个类实现了`Serializable`接口,那么该类的对象就可以被...

    java对象序列化Demo------------Serializable

    java 序列化 对象 Serializable 写着玩的Demo 简单 实用

Global site tag (gtag.js) - Google Analytics