引子:在使用spark和hadoop的时候,遇到一些进程退出时的报错。因此顺便研究了一下jvm以及一些开源框架的关闭钩子的机制。这篇文章不涉及底层native实现,仅限Java层面
1.jvm关闭钩子
注册jvm关闭钩子通过Runtime.addShutdownHook(),实际调用ApplicationShutdownHooks.add()。后者维护了一个钩子集合IdentityHashMap<Thread, Thread> hooks
ApplicationShutdownHooks类初始化的时候,会注册一个线程到Shutdown类
static { try { Shutdown.add(1, false, new Runnable() { public void run() { runHooks(); } } ); hooks = new IdentityHashMap<>(); } catch (IllegalStateException e) { hooks = null; } }
Shutdown类里也维护了一个钩子集合
private static final Runnable[] hooks = new Runnable[MAX_SYSTEM_HOOKS];
这个集合是分优先级的(优先级就是下标数值),自定义的钩子优先级默认是1,也就是最先执行。关闭钩子最终触发就是从这个集合进行
应用关闭时,以System.exit()为例,依次调用Runtime.exit()、Shutdow.exit()
Shutdown执行jvm退出逻辑,并维护了若干关闭状态
private static final int RUNNING = 0; // 初始状态,开始关闭 private static final int HOOKS = 1; // 运行钩子 private static final int FINALIZERS = 2; // 运行finalizer private static int state = RUNNING; static void exit(int status) { boolean runMoreFinalizers = false; synchronized (lock) { // 根据退出码status参数做不同处理 if (status != 0) runFinalizersOnExit = false; // 只有正常退出才会运行finalizer switch (state) { case RUNNING: // 执行钩子并修改状态 state = HOOKS; break; case HOOKS: // 执行钩子 break; case FINALIZERS: // 执行finalizer if (status != 0) { halt(status); // 如果是异常退出,直接退出进程。halt()底层是native实现,这时不会执行finalizer } else { // 正常退出则标记是否需要执行finalizer runMoreFinalizers = runFinalizersOnExit; } break; } } if (runMoreFinalizers) { // 如果有需要,就执行finalizer,注意只有state=FINALIZERS会走这个分支 runAllFinalizers(); halt(status); } synchronized (Shutdown.class) { // 这里执行state= HOOKS逻辑,包括执行钩子和finalizer sequence(); halt(status); } } private static void sequence() { synchronized (lock) { if (state != HOOKS) return; } runHooks(); // 执行钩子,这里会依次执行hooks数组里的各线程 boolean rfoe; // finalizer逻辑 synchronized (lock) { state = FINALIZERS; rfoe = runFinalizersOnExit; } if (rfoe) runAllFinalizers(); } private static void runHooks() { for (int i=0; i < MAX_SYSTEM_HOOKS; i++) { try { Runnable hook; synchronized (lock) { currentRunningHook = i; hook = hooks[i]; } // 由于之前注册了ApplicationShutdownHooks的钩子线程,这里又会回调ApplicationShutdownHooks.runHooks if (hook != null) hook.run(); } catch(Throwable t) { if (t instanceof ThreadDeath) { ThreadDeath td = (ThreadDeath)t; throw td; } } } } static void runHooks() { Collection<Thread> threads; synchronized(ApplicationShutdownHooks.class) { threads = hooks.keySet(); hooks = null; } // 注意ApplicationShutdownHooks里的钩子之间是没有优先级的,如果定义了多个钩子,那么这些钩子会并发执行 for (Thread hook : threads) { hook.start(); } for (Thread hook : threads) { try { hook.join(); } catch (InterruptedException x) { } } }
2.Spring关闭钩子
Spring在AbstractApplicationContext里维护了一个shutdownHook属性,用来关闭Spring上下文。但这个钩子不是默认生效的,需要手动调用ApplicationContext.registerShutdownHook()来开启
在自行维护ApplicationContext(而不是托管给tomcat之类的容器时),注意尽量使用ApplicationContext.registerShutdownHook()或者手动调用ApplicationContext.close()来关闭Spring上下文,否则应用退出时可能会残留资源
public void registerShutdownHook() { if (this.shutdownHook == null) { this.shutdownHook = new Thread() { @Override public void run() { // 这里会调用Spring的关闭逻辑,包括资源清理,bean的销毁等 doClose(); } }; // 这里会把spring的钩子注册到jvm关闭钩子 Runtime.getRuntime().addShutdownHook(this.shutdownHook); } }
3.Hadoop关闭钩子
Hadoop客户端初始化时,org.apache.hadoop.util.ShutdownHookManager会向Runtime注册一个钩子线程。ShutdownHookManager是一个单例类,并维护了一个钩子集合
private Set<HookEntry> hooks; static { Runtime.getRuntime().addShutdownHook( new Thread() { @Override public void run() { MGR.shutdownInProgress.set(true); // MGR是本类的单例 for (Runnable hook: MGR.getShutdownHooksInOrder()) { try { hook.run(); } catch (Throwable ex) { LOG.warn("ShutdownHook '" + hook.getClass().getSimpleName() + "' failed, " + ex.toString(), ex); } } } } ); }
这里HookEntry是hadoop封装的钩子类,HookEntry是带优先级的,一个priority属性。MGR.getShutdownHooksInOrder()方法会按priority依次(单线程)执行钩子
默认挂上的钩子就一个:org.apache.hadoop.fs.FileSystem$Cache$ClientFinalizer(priority=10),这个钩子用来清理hadoop FileSystem缓存以及销毁FileSystem实例。这个钩子是在第一次hadoop IO发生时(如FileSystem.get)lazy加载
此外调用FileContext.deleteOnExit()方法也会通过注册钩子
hadoop集群(非客户端)启动时,还会注册钩子清理临时路径
4.SparkContext关闭钩子
Spark也有关闭钩子管理类org.apache.spark.util.ShutdownHookManager,结构与hadoop的ShutdownHookManager基本类似
hadoop 2.x开始,spark的ShutdownHookManager会挂一个SparkShutdownHook钩子到hadoop的ShutdownHookManager(priority=40),用来实现SparkContext的清理逻辑。hadoop 1.x没有ShutdownHookManager,所以SparkShutdownHook直接挂在jvm上
def install(): Unit = { val hookTask = new Runnable() { // 执行钩子的回调进程,根据priority依次执行钩子 override def run(): Unit = runAll() } Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match { case Success(shmClass) => val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get(null).asInstanceOf[Int] val shm = shmClass.getMethod("get").invoke(null) shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int]).invoke(shm, hookTask, Integer.valueOf(fsPriority + 30)) case Failure(_) => // hadoop 1.x Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook")); } }
顺便说一下,hadoop的FileSystem实例底层默认是复用的,所以如果执行了两次fileSystem.close(),第二次会报错FileSystem Already Closed异常(即使表面上是对两个实例执行的)
一个典型的场景是同时使用Spark和Hadoop-Api,Spark会创建FileSystem实例,Hadoop-Api也会创建,由于底层复用,两者其实是同一个。因为关闭钩子的存在,应用退出时会执行两次FileSystem.close(),导致报错。解决这个问题的办法是在hdfs-site.xml增加以下配置,关闭FileSystem实例复用
<property> <name>fs.hdfs.impl.disable.cache</name> <value>true</value> </property>
5.总结
以下为相关调用逻辑整理
紫红色箭头表示钩子注册,蓝色箭头表示钩子触发
蓝色、黄色、红色线框分别表示Spring、Hadoop、Spark相关代码
相关推荐
数据算法:Hadoop/Spark大数据处理技巧
本书《数据算法:Hadoop/Spark算法》深入探讨了这两个平台上的算法实现,对于理解大数据处理的内在机制以及优化数据处理流程具有极大的帮助。 Hadoop,由Apache基金会开发,是基于分布式文件系统HDFS(Hadoop ...
在大数据处理领域,Hadoop和Spark是两个关键的框架,它们提供了高效的数据处理能力。这份“数据算法--HadoopSpark大数据处理技巧”文档显然探讨了如何利用这两个工具进行复杂的数据操作,具体涉及到Scala编程实现的...
基于Hadoop/Spark奥运会奖牌变化大数据分析实现 开发技术:Hadoop + Spark + Hive + Sqoop + Flask + Mysql + Echart 针对采集到的奥运会大数据,本文选取了最新的云计算技术,基于Hadoop对奥运会奖牌数进行分析,...
《Spark 数据算法 Hadoop/Spark大数据处理技巧》是一本深入探讨大数据处理领域的专业书籍,它主要聚焦于如何利用Hadoop和Spark平台进行高效的数据分析和算法应用。这本书以中文的形式详细解析了各种数据算法,旨在...
中文版,一共3卷,第1卷。了解spark技术内幕 了解spark技术内幕
《数据算法:Hadoop/Spark大数据处理技巧》介绍了很多基本设计模式、优化技术和数据挖掘及机器学习解决方案,以解决生物信息学、基因组学、统计和社交网络分析等领域的很多问题。这还概要介绍了MapReduce、Hadoop和...
大数据处理必看必会的书籍,本资源完美高清带书签,非常适合学习
在大数据处理领域,Hadoop和Spark是两个至关重要的框架,它们在处理海量数据时发挥着核心作用。Hadoop是Apache基金会开发的一个开源分布式计算框架,它允许在廉价硬件上存储和处理大规模数据集。而Spark则是在Hadoop...
开发技术:Hadoop + Spark + Hive + Sqoop + Flask + Mysql + Echart 针对采集到的奥运会大数据,本文选取了最新的云计算技术,基于Hadoop对奥运会奖牌数进行分析,在技术上的选取是非常有意义的,因为Hadoop的...
在大数据处理领域,Hadoop和Spark是两个至关重要的框架,它们为海量数据的存储、管理和分析提供了高效且可扩展的解决方案。本资源包含了基于这两个框架的数据算法和源代码,可以帮助我们深入理解并实践大数据处理...
开发技术:Hadoop + Spark + Hive + Sqoop + Flask + Mysql + Echart 针对采集到的奥运会大数据,本文选取了最新的云计算技术,基于Hadoop对奥运会奖牌数进行分析,在技术上的选取是非常有意义的,因为Hadoop的...
"基于Linux平台下的Hadoop和Spark集群搭建研究" 本文主要研究了基于Linux平台下的Hadoop和Spark集群搭建,旨在提高计算速率和数据性能。Hadoop是最流行的处理平台,主要解决了数据存储和分布式计算的问题,而Spark...
hadoop/etc/hadoop/6个文件 core-site.xml hadoop-env.sh hdfs-site.xml mapred-site.xml yarn-env.sh yarn-site.xml
《数据算法:Hadoop+Spark大数据处理技巧》是一本深入探讨大数据处理技术的专业书籍,主要聚焦于两大主流的大数据处理框架——Hadoop和Spark。这本书不仅涵盖了基础理论,还提供了丰富的实践指导,对于想要深入了解...
"藏经阁-A Container-based Sizing Framework for Apache Hadoop/Spark Clusters" 藏经阁是基于容器的大小调整框架,旨在优化 Apache Hadoop/Spark 集群的性能。下面是对该框架的详细解读: 容器化的 Hadoop/Spark...
大数据笔记,包含Hadoop、Spark、Flink、Hive、Kafka、Flume、ZK...... 大数据笔记,包含Hadoop、Spark、Flink、Hive、Kafka、Flume、ZK...... 大数据笔记,包含Hadoop、Spark、Flink、Hive、Kafka、Flume、ZK.......
《数据算法 Hadoop Spark大数据处理技巧》这本书深入探讨了大数据处理的核心技术和工具,主要涵盖了Hadoop和Spark两个关键框架。大数据是当前信息技术领域的重要趋势,它涉及到如何从海量、多源、快速生成的数据中...