由于爬虫抓取等原因,会导致单一ID的日志条数过多。在spark中,同一ID的日志会被shuffle到单一的节点上进行处理,导致系统运行缓慢!
因为这些用户的访问本来就是无效的,所以可以直接过滤掉这部分用户。
话不多说,scala的DataFrame版输出和代码如下(参考链接见代码注释):
引用
spark version: 1.6.1
Original DataFrame (with fake users):
+---------+------+
| id| movie|
+---------+------+
| u1|WhoAmI|
| u2|Zoppia|
| u2| Lost|
|FakeUserA|Zoppia|
|FakeUserA| Lost|
|FakeUserA|Zoppia|
|FakeUserA| Lost|
|FakeUserA|Zoppia|
|FakeUserA| Lost|
|FakeUserB| Lost|
|FakeUserB| Lost|
|FakeUserB| Lost|
|FakeUserB| Lost|
+---------+------+
Fake Users with count (threshold=2):
+---------+-----+
| id|count|
+---------+-----+
|FakeUserA| 6|
|FakeUserB| 4|
+---------+-----+
Fake Users:
Set(FakeUserA, FakeUserB)
Valid users after filter:
+---+------+
| id| movie|
+---+------+
| u1|WhoAmI|
| u2|Zoppia|
| u2| Lost|
+---+------+
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
/**
* Created by colinliang on 2017/8/14.
*/
case class IDMovie(id: String, movie: String)
object BroadcastTest {
def main(args: Array[String]): Unit = {
Logger.getRootLogger().setLevel(Level.FATAL) //http://stackoverflow.com/questions/27781187/how-to-stop-messages-displaying-on-spark-console
val conf = new SparkConf().setAppName("word count").setMaster("local[1]")
val sc = new SparkContext(conf)
println("spark version: " + sc.version)
sc.setLogLevel("WARN") //http://stackoverflow.com/questions/27781187/how-to-stop-messages-displaying-on-spark-console
val spark = new SQLContext(sc)
val idvids = List(
IDMovie("u1", "WhoAmI")
, IDMovie("u2", "Zoppia")
, IDMovie("u2", "Lost")
, IDMovie("FakeUserA", "Zoppia")
, IDMovie("FakeUserA", "Lost")
, IDMovie("FakeUserA", "Zoppia")
, IDMovie("FakeUserA", "Lost")
, IDMovie("FakeUserA", "Zoppia")
, IDMovie("FakeUserA", "Lost")
, IDMovie("FakeUserB", "Lost")
, IDMovie("FakeUserB", "Lost")
, IDMovie("FakeUserB", "Lost")
, IDMovie("FakeUserB", "Lost")
);
val df = spark
.createDataFrame(idvids)
.repartition(col("id"))
println("Original DataFrame (with fake users): ")
df.show()
// val df_fakeUsers_with_count=df.sample(false,0.1).groupBy(col("id")).count().filter(col("count")>2).limit(10000)//实际中可以根据需要仅采样一部分数据
val df_fakeUsers_with_count=df.groupBy(col("id")).count().filter(col("count")>2)
/**DataFrame 中的groupby 为aggregation形式的,不涉及shuffle,速度很快。参见:https://forums.databricks.com/questions/956/how-do-i-group-my-dataset-by-a-key-or-combination.html
更多聚合函数参见:https://spark.apache.org/docs/1.6.1/api/scala/index.html#org.apache.spark.sql.functions$
此外,还可以通过agg()函数对groupBy后的数据的多列进行聚合
*/
println("Fake Users with count (threshold=2):")
df_fakeUsers_with_count.show()
val set_fakeUsers=df_fakeUsers_with_count.select("id").collect().map(_(0)).toList.map(_.toString).toArray[String].toSet
println("Fake Users:")
println(set_fakeUsers)
val set_fakeUsers_broadcast=sc.broadcast(set_fakeUsers)
/** broadcast教程:https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-broadcast.html
* 官方文档: http://spark.apache.org/docs/latest/rdd-programming-guide.html#broadcast-variables
*/
val udf_isValidUser = udf((id: String) => !set_fakeUsers_broadcast.value.contains(id)) //直接用set_highCountUsers.contains(id) 也行,但效率低,因为反序列化的次数可能比较多,参见http://spark.apache.org/docs/latest/rdd-programming-guide.html#broadcast-variables
val df_filtered=df.filter(udf_isValidUser(col("id")) ) //过滤掉这部分用户
/** 如果是要保留部分用户,而不是过滤掉这部分用户,且用户量很小,无需定义UDF:
* https://stackoverflow.com/questions/39234360/filter-spark-scala-dataframe-if-column-is-present-in-set
* val validValues = Set("A", "B", "C")
* data.filter($"myColumn".isin(validValues.toSeq: _*))
*/
/** 如果是要保留部分用户,且用户量比较大,可以用broadcast 的DataFrame:
* https://stackoverflow.com/questions/33824933/spark-dataframe-filtering-retain-element-belonging-to-a-list
* import org.apache.spark.sql.functions.broadcast
* initialDataFrame.join(broadcast(usersToKeep), $"userID" === $"userID_")
*/
println("\nValid users after filter:")
df_filtered.show()
}
}
分享到:
相关推荐
3. 数据倾斜:识别并处理数据分布不均问题,通过分区策略优化数据分布。 4. 缓存策略:利用RDD持久化,将常用数据缓存到内存,减少重复计算。 5. SQL优化:使用DataFrame或Dataset API,避免冗余转换,使用合适的...
9. **倾斜键处理**:Spark SQL提供了处理数据倾斜的策略,如采样重分布和广播JOIN,以解决大规模数据集处理中的性能瓶颈。 10. **Spark SQL与Spark Streaming集成**:Spark SQL可以与Spark Streaming结合,对实时流...
-并行度和分区操作:合理控制并行度,避免数据倾斜。 6. Spark的生态系统 Spark生态包含众多项目,除了核心的Spark之外,还包括了用于实时数据处理的Spark Streaming、提供高级SQL和数据框操作的Spark SQL、用于...
- Spark性能调优策略:包括内存管理、持久化级别选择、广播变量使用、并行度设置、数据倾斜处理等。 - Spark容错机制:如RDD的不变性和分区概念,使得在节点失败时可以通过重新计算恢复数据。 此外,还可能涉及到...
9. **Spark性能优化**:提供Spark性能调优的实践指导,包括配置参数调整、数据倾斜处理、内存管理策略等。 10. **Spark实际应用案例**:通过实际项目或案例分析,展示Spark在不同领域的应用,如推荐系统、日志分析...
这可能涉及到配置调整、数据倾斜处理、宽依赖优化、Shuffle减少等多个方面。 10. **Spark与大数据生态集成**:Spark可以与Hadoop、Cassandra、HBase等大数据组件集成,实现数据的读取、写入和处理,了解这些集成...
4. 数据倾斜:处理数据分布不均的问题,避免“热点”分区。 七、Spark部署模式 Spark可以运行在本地模式、独立集群模式、YARN、Mesos或Kubernetes等资源管理系统上。选择合适的部署模式取决于具体需求,如资源管理...
4. 数据倾斜:识别和处理数据分布不均的问题,避免单个task过载。 5. 缓存策略:智能缓存热点数据,减少重复计算,提升整体性能。 6. 代码优化:避免Shuffle操作,减少网络传输;使用宽依赖替代窄依赖,提高执行效率...
此外,可能还会涉及数据缓存策略、任务并行度、数据倾斜问题的解决等。 8. **案例研究**:书中可能会包含多个实际应用案例,展示如何在不同领域,如金融、社交媒体、电商等,利用Spark进行大数据分析和处理。 9. *...
6. 性能优化:为了最大化Spark的性能,需要关注数据倾斜、宽依赖优化、缓存策略、网络传输效率等因素。例如,通过分区调整避免数据不平衡,利用广播变量减少网络传输,合理设置executor数量和内存大小等。 7. 总结...
11. **性能调优**:包括配置参数调整(如executor内存、并行度设置)、数据倾斜处理、宽依赖优化等,以提高Spark应用的性能。 12. **案例研究**:笔记可能包含使用Spark处理实际问题的案例,如日志分析、推荐系统...
读者可能会从中了解到如何配置和优化Spark集群,以及如何处理数据倾斜等问题,这些对于在企业环境中部署和运行Spark至关重要。 《深入理解Spark 核心思想与源码分析》则可能针对希望深入了解Spark内部机制的读者。...
* 诊断和确认数据倾斜 * 增加 shuffle 分区数 * 使用随机前缀和扩展键 * 广播小表 * 分桶(Bucketing) * 自定义分区器 Spark 的 DataFrame API 与 RDD 相比的优点和局限性 DataFrame 和 RDD 是 Spark 中两种不同...
此外,业务层面的问题包括平台无法提供足够的指导来优化Executor配置和内存参数,以及发现并解决数据倾斜、HDFS提交阻塞等问题。 为了解决这些问题,平台需要建立自动化分析和故障诊断系统,比如“华佗”系统。它...
3. 数据倾斜:识别和处理数据不均匀分布问题,如使用自定义分区器。 4. SQL性能调优:优化查询计划,使用广播变量和Join优化等方法。 七、实战应用 通过案例分析,了解如何在实际项目中运用Spark解决大数据问题,如...
7. **性能优化**:Spark的性能优化是开发者关注的重点,可能涉及到调度策略、并行度设置、数据倾斜处理、网络通信优化等多个方面。 8. **Spark与Hadoop的集成**:Spark可以在Hadoop生态系统中无缝运行,资料可能...
12. **Spark性能调优**:涵盖参数调整、数据倾斜处理、任务并行度优化、网络传输优化等方面,以最大化系统性能。 13. **Spark的故障恢复与容错机制**:包括检查点和事件时间窗口,以及如何在失败后恢复作业。 通过...
1. **数据分区策略**:合理设置数据分区可以帮助优化计算性能,避免数据倾斜问题。 2. **内存管理**:理解Spark的存储层次和缓存策略,可以有效利用内存资源,减少磁盘I/O。 3. **错误恢复**:Spark提供容错机制,如...
在大数据处理中,数据倾斜可能造成部分节点处理的数据量过大,影响整体性能。解决这个问题的方法包括使用哈希分桶、自定义分区函数,或者使用Stable Hash Join来平衡负载。 以上就是关于SparkSQL命令提交接口的基本...
自适应执行引擎突破了这一限制,能够在运行时根据数据的分布和执行情况优化查询执行,例如自动选择合适的分区数、处理数据倾斜问题以及优化连接(Joins)操作。 分区是Spark中的一个重要概念,Spark SQL通过分区...