当前位置:首页 > 科技  > 软件

一文详解Spark内存模型原理,面试轻松搞定

来源: 责编: 时间:2024-03-26 09:37:03 142观看
导读1.引言 Spark 是一个基于内存处理的计算引擎,其中任务执行的所有计算都发生在内存中。因此,了解 Spark 内存管理非常重要。这将有助于我们开发 Spark 应用程序并执行性能调优。我们在使用spark-submit去提交spark任务的

1.引言 

Spark 是一个基于内存处理的计算引擎,其中任务执行的所有计算都发生在内存中。因此,了解 Spark 内存管理非常重要。这将有助于我们开发 Spark 应用程序并执行性能调优。我们在使用spark-submit去提交spark任务的时候可以使用--executor-memory和--driver-memory这两个参数去指定任务提交时的内存分配,如果提交时内存分配过大,会占用资源。如果内存分配太小,则很容易出现内存溢出和满GC问题。ecI28资讯网——每日最新资讯28at.com

Efficient memory use is critical for good performance, but the reverse is also true: inefficient memory use leads to bad performance.

Spark 的整体架构图如下:ecI28资讯网——每日最新资讯28at.com

图片图片ecI28资讯网——每日最新资讯28at.com

Spark 应用程序包括两个 JVM 进程:driver进程和executor进程。其中:ecI28资讯网——每日最新资讯28at.com

  • driver进程是主控制进程,负责创建 SparkSession/SparkContext、提交作业、将作业转换为任务以及协调执行器之间的任务执行。
  • executor进程主要负责执行特定的计算任务并将结果返回给驱动程序。driver的进程的内存管理相对简单,Spark并没有对此制定具体内存管理计划。

因此在这篇文章中,我们将会详细深入分析executor的内存管理。ecI28资讯网——每日最新资讯28at.com

2.Excutor内存模型

executor充当在工作节点上启动的 JVM 进程。因此,了解 JVM 内存管理非常重要。我们知道JVM 内存管理分为两种类型: ecI28资讯网——每日最新资讯28at.com

  • 堆内存管理(In-Heap Memory):对象在 JVM 堆上分配并由 GC 绑定。
  • 堆外内存管理(外部内存):对象通过序列化在JVM外部的内存中分配,由应用程序管理,不受GC约束。 

整体的JVM结构如下所示:ecI28资讯网——每日最新资讯28at.com

图片图片ecI28资讯网——每日最新资讯28at.com

通常,对象的读写速度为:on-heap > off-heap > diskecI28资讯网——每日最新资讯28at.com

2.1 内存管理 

Spark 内存管理分为两种类型:静态内存管理器(Static Memory Management,SMM),以及统一内存管理器(Unified Memory Management,UMM)。ecI28资讯网——每日最新资讯28at.com

图片图片ecI28资讯网——每日最新资讯28at.com

在Spark1.6.0之前只有一种内存管理方案,即Static Memory Management,但是从 Spark 1.6.0 开始,引入Unified Memory Manager 内存管理方案,并被设置为 Spark 的默认内存管理器,从代码中开始发现(以下代码是基于spark 2.4.8)。ecI28资讯网——每日最新资讯28at.com

// Determine whether to use the old memory management modeval useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)val memoryManager: MemoryManager =if (useLegacyMemoryManager) {// The old version uses static memory managementnew StaticMemoryManager(conf, numUsableCores)} else {// The new version uses unified memory managementUnifiedMemoryManager(conf, numUsableCores)}

而在最新的Spark 3.x开始, Static Memory Management由于缺乏灵活性而已弃用,在源码中已经看到关于Static Memory Management的所有代码,自然也就看不到控制内存管理方案选择的spark.memory.useLegacyMode这个参数。ecI28资讯网——每日最新资讯28at.com

2.2 静态内存管理器(SMM)

虽然在spark 3.x版本开始SMM已经被淘汰了,但是目前很多企业使用的spark的版本还有很多是3.x之前的,因此我觉得为了整个学习的连贯性,还是有必要说一下的静态内存管理器 (SMM) 是用于内存管理的传统模型和简单方案,该方案实现上简单粗暴,将整个内存区间分成了:存储内存(storage memory,)、执行内存(execution memory)和其他内存(other memory)的大小在应用程序处理过程中是固定的,但用户可以在应用程序启动之前进行配置。这三部分内存的作用及占比如下:storage memory:主要用于缓存数据块以提高性能,同时也用于连续不断地广播或发送大的任务结果。通过spark.storage.memoryFraction进行配置,默认为0.6。ecI28资讯网——每日最新资讯28at.com

/** * Return the total amount of memory available for the storage region, in bytes. */private def getMaxStorageMemory(conf: SparkConf): Long = {  val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)  val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)  val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)  (systemMaxMemory * memoryFraction * safetyFraction).toLong}

其中又可以分成两部分:预留区域:这部分主要是为了防止OOM,大概占了存储区域中的10%,由参数spark.storage.safetyFraction控制;可用的存储区域:该区域主要是为了缓存RDD的数据和Broadcast数据,大概占了存储区域的90%。另外该区域中并不是所有的内存都用于以上作用,还单独拎出来一部分区域用于缓存iterator形式的block数据,我们称之为Unroll区域,由参数spark.storage.unrollFraction控制,大概占了可用的存储区域的20%,如下:ecI28资讯网——每日最新资讯28at.com

private val maxUnrollMemory: Long = {  (maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong}

execution memory:在执行shuffle、join、sort和aggregation时,用于缓存中间数据。通过spark.shuffle.memoryFraction进行配置,默认为0.2。ecI28资讯网——每日最新资讯28at.com

private def getMaxExecutionMemory(conf: SparkConf): Long = {  val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)  if (systemMaxMemory < MIN_MEMORY_BYTES) {    throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +      s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +      s"option or spark.driver.memory in Spark configuration.")  }  if (conf.contains("spark.executor.memory")) {    val executorMemory = conf.getSizeAsBytes("spark.executor.memory")    if (executorMemory < MIN_MEMORY_BYTES) {      throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +        s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +        s"--executor-memory option or spark.executor.memory in Spark configuration.")    }  }  val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)  val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)  (systemMaxMemory * memoryFraction * safetyFraction).toLong}

从代码中我们可以看到,可执行内存也分成了两个部分:预留部分和可用部分,类似存储内存学习,这里不在赘述。other memory:除了以上两部分的内存,剩下的就是用于其他用作的内存,默认为0.2。这部分内存用于存储运行Spark系统本身需要加载的代码与元数据。因此,关于SMM的整体分配图如下ecI28资讯网——每日最新资讯28at.com

图片图片ecI28资讯网——每日最新资讯28at.com

基于此就会产生不可逾越的缺点:即使存储内存有可用空间,我们也无法使用它,并且由于执行程序内存已满,因此存在磁盘溢出。(反之亦然)。另外一个最大的问题就是:SMM只支持堆内内存(On-Heap),不支持对外内存(Off-Heap)ecI28资讯网——每日最新资讯28at.com

补充知识1:在Spark的存储体系中,数据的读写是以块(Block)为单位,也就是说Block是Spark存储的基本单位,这里的Block和Hdfs的Block是不一样的,HDFS中是对大文件进行分Block进行存储,Block大小是由dfs.blocksize决定的;而Spark中的Block是用户的操作单位,一个Block对应一块有组织的内存,一个完整的文件或文件的区间端,并没有固定每个Block大小的做法。每个块都有唯一的标识,Spark把这个标识抽象为BlockId。BlockId本质上是一个字符串,但是在Spark中将它保证为"一组"case类,这些类的不同本质是BlockID这个命名字符串的不同,从而可以通过BlockID这个字符串来区别BlockIdecI28资讯网——每日最新资讯28at.com

补充知识2:内存池是Spark内存的抽象,它记录了总内存大小,已使用内存大小,剩余内存大小,提供给MemoryManager进行分配/回收内存。它包括两个实现类:ExecutionMemoryPool和StorageMemoryPool,分别对应execution memory和storage memory。当需要新的内存时,spark通过memoryPool来判断内存是否充足。需要注意的是memoryPool以及子类方法只是用来标记内存使用情况,而不实际分配/回收内存。 ecI28资讯网——每日最新资讯28at.com

2.3 统一内存管理器(UMM)

从 Spark 1.6.0 开始,采用了新的内存管理器来取代静态内存管理器,并为 Spark 提供动态内存分配。它将内存区域分配为由存储和执行共享的统一内存容器。当未使用执行内存时,存储内存可以获取所有可用内存,反之亦然。如果任何存储或执行内存需要更多空间,则会调用acquireMemory方法将扩展其中一个内存池并收缩另一个内存池。因此,UMM相比SMM的内存管理优势明显:存储内存和执行内存之间的边界不是静态的,在内存压力的情况下,边界会移动,即一个区域会通过从另一个区域借用空间来增长。当应用程序没有缓存并且正在进行时,执行会使用所有内存以避免不必要的磁盘溢出。当应用程序有缓存时,它将保留最小存储内存,以便数据块不受影响。此内存管理可为各种工作负载提供合理的开箱即用性能,而无需用户了解内存内部划分方式的专业知识。ecI28资讯网——每日最新资讯28at.com

2.3.1 堆内存

默认情况下,Spark 仅使用堆内存。Spark 应用程序启动时,堆内存的大小由 --executor-memory 或 spark.executor.memory 参数配置。在UMM下,spark的堆内存结构图如下:ecI28资讯网——每日最新资讯28at.com

图片图片ecI28资讯网——每日最新资讯28at.com

我们发现大体上和SMM没有太大的区别,包括每个区域的功能,只是UMM在Storage和Execution可以弹性的变化(这一点也是spark rdd中“弹性”的体现之一)。ecI28资讯网——每日最新资讯28at.com

备注:在 Spark 1.6 中,spark.memory.fraction 值为 0.75,spark.memory.storageFraction 值为 0.5。从spark 2.x开始spark.memory.fraction 值为 0.6。ecI28资讯网——每日最新资讯28at.com

2.3.1.1 System Reserved:系统预留

预留内存是为系统预留的内存,用于存储Spark的内部对象。从 Spark 1.6 开始,该值为 300MB。这意味着 300MB 的 RAM 不参与 Spark 内存区域大小计算。预留内存的大小是硬编码的,如果不重新编译 Spark 或设置 spark.testing.reservedMemory,则无法以任何方式更改其大小,一般在实际的生产环境中不建议修改此值。ecI28资讯网——每日最新资讯28at.com

private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024val reservedMemory = conf.getLong("spark.testing.reservedMemory",  if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)val minSystemMemory = (reservedMemory * 1.5).ceil.toLongif (systemMemory < minSystemMemory) {  throw new IllegalArgumentException(s"System memory $systemMemory must " +    s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +    s"option or spark.driver.memory in Spark configuration.")}

从源码中我们可以看出,如果执行程序内存小于保留内存的 1.5 倍(1.5 * 保留内存 = 450MB),则 Spark 作业将失败,并显示以下异常消息:ecI28资讯网——每日最新资讯28at.com

24/03/20 13:55:51 ERROR repl.Main: Failed to initialize Spark session.java.lang.IllegalArgumentException: Executor memory 314572800 must be at least 471859200. Please increase executor memory using the --executor-memory option or spark.executor.memory in Spark configuration.        at org.apache.spark.memory.UnifiedMemoryManager$.getMaxMemory(UnifiedMemoryManager.scala:225)        at org.apache.spark.memory.UnifiedMemoryManager$.apply(UnifiedMemoryManager.scala:199)

2.3.1.2 其他内存(或称用户内存)

其他内存是用于存储用户定义的数据结构、Spark 内部元数据、用户创建的任何 UDF 以及 RDD 转换操作所需的数据(如 RDD 依赖信息等)的内存。例如,我们可以通过使用 mapPartitions 转换来重写 Spark 聚合,以维护一个哈希表以运行此聚合,这将消耗所谓的其他内存。此内存段不受 Spark 管理,计算公式为:(Java Heap - Reserved Memory) * (1.0 - spark.memory.fraction)。ecI28资讯网——每日最新资讯28at.com

2.3.1.3 Spark内存(或称统一内存)

Spark Memory 是由 Apache Spark 管理的内存池。Spark Memory 负责在执行任务(如联接)或存储广播变量时存储中间状态。计算公式为:(Java Heap - Reserved Memory) * spark.memory.fraction。ecI28资讯网——每日最新资讯28at.com

Spark 任务在两个主要内存区域中运行:ecI28资讯网——每日最新资讯28at.com

  • Executor Memory:用于随机播放、联接、排序和聚合。
  • Storage Memory:用于缓存数据分区。

它们之间的边界由 spark.memory.storageFraction 参数设置,默认为 0.5 或 50%。ecI28资讯网——每日最新资讯28at.com

1)StorageMemory: 存储内存ecI28资讯网——每日最新资讯28at.com

存储内存用于存储所有缓存数据、广播变量、unroll数据等,“unroll”本质上是反序列化序列化数据的过程。任何包含内存的持久性选项都会将该数据存储在此段中。Spark 通过删除基于最近最少使用 (LRU) 机制的旧缓存对象来为新缓存请求清除空间。缓存的数据从存储中取出后,将写入磁盘或根据配置重新计算。广播变量存储在缓存中,具有MEMORY_AND_DISK持久性级别。这就是我们存储缓存数据的地方,这些数据是长期存在的。ecI28资讯网——每日最新资讯28at.com

计算公式:ecI28资讯网——每日最新资讯28at.com

(Java Heap - Reserved Memory) * spark.memory.fraction * spark.memory.storageFraction

2)Execution Memory:执行内存ecI28资讯网——每日最新资讯28at.com

执行内存用于存储 Spark 任务执行过程中所需的对象。例如,它用于将映射端的shuffle中间缓冲区存储在内存中。此外,它还用于存储hash聚合步骤的hash table。如果没有足够的可用内存,执行内存池还支持溢出磁盘,但是其他线程(任务)无法强制逐出此池中的block。执行内存往往比存储内存寿命更短。每次操作后都会立即将其逐出,为下一次操作腾出空间。ecI28资讯网——每日最新资讯28at.com

计算公式:ecI28资讯网——每日最新资讯28at.com

(Java Heap - Reserved Memory) * spark.memory.fraction * (1.0 -  spark.memory.storageFraction)

由于执行内存的性质,无法从此池中强制逐出块;否则,执行将中断,因为找不到它引用的块。但是,当涉及到存储内存时,可以根据需要从内存中逐出block并写入磁盘或重新计算(如果持久性级别为MEMORY_ONLY)。ecI28资讯网——每日最新资讯28at.com

存储和执行池借用规则:ecI28资讯网——每日最新资讯28at.com

  • 只有当执行内存中有未使用的块时,存储内存才能从执行内存中借用空间。
  • 如果块未在存储内存中使用,则执行内存也可以从存储内存中借用空间。
  • 如果存储内存使用执行内存中的块,并且执行需要更多内存,则可以强制逐出存储内存占用的多余块
  • 如果存储内存中的块被执行内存使用,而存储需要更多的内存,则无法强行逐出执行内存占用的多余块;它将具有更少的内存区域。它将等到 Spark 释放存储在执行内存中的多余块,然后占用它们。

案例:计算 5 GB 执行程序内存的内存ecI28资讯网——每日最新资讯28at.com

为了计算预留内存、用户内存、spark内存、存储内存和执行内存,我们将使用以下参数:ecI28资讯网——每日最新资讯28at.com

spark.executor.memory=5gspark.memory.fractinotallow=0.6spark.memory.storageFractinotallow=0.5

那么会得到如下结论:ecI28资讯网——每日最新资讯28at.com

Java Heap Memory = 5 GB                               = 5 * 1024 MB                               = 5120 MBReserved Memory      = 300 MBUsable Memory        = (Java Heap Memory - Reserved Memory)                                 = 5120 MB - 300 MB                                 = 4820 MBUser Memory            = Usable Memory * (1.0 * spark.memory.fraction)                                  = 4820 MB * (1.0 - 0.6)                                  = 4820 MB * 0.4                                  = 1928 MBSpark Memory           = Usable Memory * spark.memory.fraction                                  = 4820 MB * 0.6                                   = 2892 MBSpark Storage Memory   = Spark Memory * Spark.memory.storageFraction                                       = 2892 MB * 0.5                                        = 1446 MBSpark Execution Memory = Spark Memory * (1.0 - spark.memory.storageFraction)                       = 2892 MB * ( 1 - 0.5)                        = 2892 MB * 0.5                        = 1446 MB

2.3.2 堆外内存

堆外内存是指将内存对象(序列化为字节数组)分配给 JVM堆之外的内存,该堆由操作系统(而不是JVM)直接管理,但存储在进程堆之外的本机内存中(因此,它们不会被垃圾回收器处理)。这样做的结果是保留较小的堆,以减少垃圾回收对应用程序的影响。访问此数据比访问堆存储稍慢,但仍比从磁盘读取/写入快。缺点是用户必须手动处理管理分配的内存。此模型不适用于 JVM 内存,而是将 malloc()  中不安全相关语言(如 C)的 Java API 直接调用操作系统以获取内存。由于此方法不是对 JVM 内存进行管理,因此请避免频繁 GC。此应用程序的缺点是内存必须写入自己的逻辑和内存应用程序版本。Spark 1.6+ 开始引入堆外内存,可以选择使用堆外内存来分配 Unified Memory Manager。ecI28资讯网——每日最新资讯28at.com

默认情况下,堆外内存是禁用的,但我们可以通过 spark.memory.offHeap.enabled(默认为 false)参数启用它,并通过 spark.memory.offHeap.size(默认为 0)参数设置内存大小。如:ecI28资讯网——每日最新资讯28at.com

spark-shell /    --conf spark.memory.offHeap.enabled=true /    --conf spark.memory.offHeap.size=5g

堆外内存支持OFF_HEAP持久性级别。与堆上内存相比,堆外内存的模型相对简单,仅包括存储内存和执行内存。ecI28资讯网——每日最新资讯28at.com

如果启用了堆外内存,Executor 中的 Execution Memory 是堆内的 Execution 内存和堆外的 Execution 内存之和。存储内存也是如此。ecI28资讯网——每日最新资讯28at.com

总之,Spark内存管理的核心目标是在有限的内存资源下,实现数据缓存的最大化利用和执行计算的高效进行,同时尽量减少由于内存不足导致的数据重算或内存溢出等问题,是整个spark允许可以稳定运行的基础保障。ecI28资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-79305-0.html一文详解Spark内存模型原理,面试轻松搞定

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: 记一次 .NET某半导体CIM系统崩溃分析

下一篇: 一文带你完整了解Go语言IO基础库

标签:
  • 热门焦点
Top