`
distantlight1
  • 浏览: 44410 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

spark枚举类作为Key时跨进程问题

阅读更多

最近在集群上跑spark时发现有些reduceByKey操作结果不符合预期,大致伪代码如下(公司统一用java,就没写成scala,用了scala的简写节省字数)。就是类似WordCount的简单计算,DimType是一个枚举类

        JavaPairRDD<DimType, Long> rawRdd=...;
        JavaPairRDD<DimType, Long> reducedRdd = entryPairRDD
                .reduceByKey(_+_);

        List<Tuple2<DimType, Long>> results = reducedRdd.collect();

        for (Tuple2<DimType, Long> tuple2 : results) {
            logger.info("Result: " + tuple2);
            ...;
        }

 脚本在单节点运行正常,但是设置多个Executor(如spark.executor.instances=2)结果就发生重复项,输出大致如下这样:

Result: (A,1)
Result: (A,2)
Result: (B,3)
Result: (C,3)
Result: (B,2)
Result: (C,4)

 所有枚举项都出现了两次(正好等于executor的实例数),就好像各个Executor之间没有进行reduce一样

 

出现这个情况的原因比较tricky,因为spark的Shuffle过程会根据key的hashCode来判定相等,而恰恰Enum类的hashCode比较特殊,系统写死了就等于内存地址

public final int hashCode() {
     return super.hashCode();
}

 这就导致在同不同进程里的枚举项被当成了不同的key,于是没有聚合起来

本来重写hashCode就可以解决问题,但坑爹的是Enum.hashCode()还被定义成final方法,无法被子类覆盖。所以只能自己在外面再封装一层对象,然后重新hashCode(),例如用Enum.name().hashCode()。或者干脆就不要用枚举类来做RDD的Key,以免发生类似问题

 

另外如果用其他自定义类做key的时候,一定要记得重写hashCode和equals,否则跨进程的时候也会发生类似问题

分享到:
评论

相关推荐

    Spark中文分词+文本分类.rar

    在Spark中,我们可以调用`LogisticRegression`类,设置相应的参数,如正则化强度、最大迭代次数等,然后训练模型。在训练过程中,数据通常会被划分为训练集和验证集,以便评估模型性能。 在完成模型训练后,我们...

    Spark跨集群bulk load(6-2)

    在大数据处理领域,Spark作为一个高效的分布式计算框架,广泛应用于数据处理、分析和机器学习任务。在实际的企业级应用中,经常需要将数据从一个Spark集群批量加载到另一个集群,这就是所谓的"Spark跨集群bulk load...

    跨进程通信实现原理分析

    8. **分布式对象和框架**:例如,通过使用Hadoop、Spark等分布式计算框架,可以实现大规模的跨进程通信,处理大数据问题。 在实际开发中,选择哪种跨进程通信方式取决于具体需求,如数据量、实时性、安全性等因素。...

    spark升级后无logging类

    在本例中,"spark升级后无logging类"的问题是由于从Spark 1.x升级到Spark 2.0时遇到的一个典型挑战。在Spark 1.x版本中,`org.apache.spark.Logging`接口被广泛用于实现日志记录功能,但在Spark 2.0中,这个接口已经...

    Spark实验:Standalone模式安装部署(带答案)1

    7. **验证安装部署**:通过jps命令检查各节点的进程,确保Spark Master和Worker正常运行,同时可访问Web UI进行监控。 **五、注意事项** 1. 在配置过程中,务必确保环境变量设置正确,避免因路径错误导致的问题。 ...

    spark期末复习题总结

    Scala中可以在类中定义object,调用函数时指定参数顺序可以跟函数定义时的顺序不同。 3. Spark的计算逻辑会被解析成DAG(有向无环图),这个解析操作由Driver完成。DAG是一种有向无环图,每个节点表示一个RDD操作,...

    spark的样例代码

    Spark是Apache软件基金会下的一个开源大数据处理框架,以其高效、灵活和易用的特性而闻名。这个"spark的样例代码"压缩包很可能是为了帮助初学者或开发者快速理解和掌握Spark的基本操作和编程模型。接下来,我们将...

    springboot与spark整合开发, 练习spark api

    Spark作为一个快速、通用且可扩展的大数据处理框架,而Spring Boot则为构建微服务提供了简洁、高效的解决方案。本篇文章将深入探讨如何将Spring Boot与Spark进行整合,以及通过Spark API实现数据处理。 首先,我们...

    spark安装包+spark实验安装软件

    在Hadoop集群上运行Spark,通常会使用YARN作为资源管理器。这时,需要配置Spark的`yarn-client`或`yarn-cluster`模式,并设置相关的Hadoop配置。 **8. Spark的性能调优** 性能调优是Spark应用的关键环节,包括调整...

    【大数据学习资料】Spark单value,key-value类型21个算子(图解与源码).zip

    Spark是Apache Hadoop生态系统中的一个分布式计算框架,它在处理大规模数据时表现出高效性和灵活性。在Spark中,数据被组织成两种主要的数据结构:RDD(弹性分布式数据集)和DataFrame/Dataset。本资料主要聚焦于RDD...

    spark全套视频教程

    《Spark全套视频教程》是一份全面讲解Apache Spark的教育资源,旨在帮助学习者深入理解和掌握这一强大的大数据处理框架。...通过深入学习和实践,你将能够运用Spark解决复杂的数据问题,实现数据驱动的决策和洞察。

    spark笔记整理文档

    Spark,作为大数据处理领域的重要框架,以其高效、易用和弹性伸缩等特性,被广泛应用于大规模数据处理、实时计算、机器学习和图形处理等多个场景。本篇笔记将深入探讨Spark的核心概念、架构设计以及实际应用,旨在...

    java 连接spark工具类

    资源是java连接spark的源码,里面有支持连接hive,spark的方法,内部有两个方法,一个是getMaps,获取一个List对象,用于直接使用,一个是getJson,将获取到的数据转换成json,方便好用,不想下载的可以去我的博客去...

    Spark入门(完整版)

    Executor是Spark运行时的进程,它们在集群中运行,负责执行计算任务和缓存数据。 五、Spark Streaming Spark Streaming支持微批处理,将实时数据流分割成小批量处理,保持了Spark的高性能和低延迟。它可以与多种...

    SparkDemo.rar

    Spark,作为大数据处理领域的重要框架,以其高效、易用的特点受到了广泛的关注。在"SparkDemo.rar"这个压缩包中,包含了三个关键领域的示例:Spark Count、Spark SQL以及Spark Streaming。接下来,我们将深入探讨...

    Spark SQL操作JSON字段的小技巧

    Spark SQL是一款强大的大数据处理工具,它提供了对JSON数据的内置支持,使得在处理JSON格式的数据时更加便捷。本文将详细介绍Spark SQL操作JSON字段的几个关键函数:get_json_object、from_json 和 to_json,以及...

    cloudera-spark 官方文档

    根据提供的文件信息,我们可以从Cloudera的Spark官方文档中提炼出以下关键知识点...Spark作为一种强大的大数据处理工具,在数据分析、机器学习等领域有着广泛的应用前景。希望本文能够帮助大家更好地理解和使用Spark。

    Spark简单测试案例

    ### Spark简单测试案例 #### 一、测试环境 在本案例中,我们将使用特定的环境配置来进行测试。这些配置包括: - **集群环境**:基于Hadoop 1.2 和 Spark 1.0 构建的集群环境。 - **操作系统**:所有节点均采用 ...

    Spark生态圈介绍

    4. 随处运行:Spark 具有很强的适应性,能够读取 HDFS、Cassandra、HBase、S3 和 Techyon 等持久层读写原生数据,能够以 Mesos、YARN 和自带的 Standalone 作为资源管理器调度 job,来完成 Spark 应用程序的计算。...

    spark jdbc 读取并发优化

    在处理大数据时,Spark作为一个强大的分布式数据处理框架,能够通过其弹性分布式数据集(RDD)和DataFrame等数据结构,实现数据的并行处理。然而,在使用Spark与数据库交互时,尤其是使用JDBC(Java Database ...

Global site tag (gtag.js) - Google Analytics