Spark的内存淘汰策略是LRU还是FIFO?

Spark的内存淘汰策略(eviction strategy)是在Executor存储内存空间不足时置换已经被占用空间的策略。一些资料中提到Spark使用的是LRU,即当存储内存不足时,会用最近最少使用策略对内存中存储的block进行淘汰。(相关文章可以参考Spark MisconceptionsSPARK-14289

但是在阅读源码后发现,相关的两个类MemoryStoreBlockManager中并没有LRU的逻辑:

1.首先可以看到,在MemoryStore中使用一个LinkedHashMap类型的entries变量来记录所有存储的block的信息

// Note: all changes to memory allocations, notably putting blocks, evicting blocks, and
// acquiring or releasing unroll memory, must be synchronized on `memoryManager`!
    
private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)

2.在通过MemoryStore获取一个block的时候,如在getValue这个方法中,并没有将被访问的block放入LinkedHashMap的队首。但是我认为LRU最简单的实现方式应该是将被访问过的元素移动到队首。

def getValues(blockId: BlockId): Option[Iterator[_]] = {
    val entry = entries.synchronized { entries.get(blockId) }
    entry match {
        case null => None
        case e: SerializedMemoryEntry[_] =>
        throw new IllegalArgumentException("should only call getValues on deserialized blocks")
        case DeserializedMemoryEntry(values, _, _) =>
        val x = Some(values)
        x.map(_.iterator)
    }
}

3.在淘汰内存中的block去释放空间时,我们可以看到这里其实就是去entries中顺序遍历block进行释放,并没有选择最近最少使用的block,所以这里应该可以认为是先进先出策略(FIFO)

private[spark] def evictBlocksToFreeSpace(
     blockId: Option[BlockId],
     space: Long,
     memoryMode: MemoryMode): Long = {
   assert(space > 0)
   memoryManager.synchronized {
     var freedMemory = 0L
     val rddToAdd = blockId.flatMap(getRddId)
     val selectedBlocks = new ArrayBuffer[BlockId]
     def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
       entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
     }
     // This is synchronized to ensure that the set of entries is not changed
     // (because of getValue or getBytes) while traversing the iterator, as that
     // can lead to exceptions.
     entries.synchronized {
       val iterator = entries.entrySet().iterator()
       while (freedMemory < space && iterator.hasNext) {
         val pair = iterator.next()
         val blockId = pair.getKey
         val entry = pair.getValue
         if (blockIsEvictable(blockId, entry)) {
           // We don't want to evict blocks which are currently being read, so we need to obtain
           // an exclusive write lock on blocks which are candidates for eviction. We perform a
           // non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
           if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
             selectedBlocks += blockId
             freedMemory += pair.getValue.size
           }
         }
       }
     }
     ...
     if (freedMemory >= space) {
       logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
         s"(${Utils.bytesToString(freedMemory)} bytes)")
       for (blockId <- selectedBlocks) {
         val entry = entries.synchronized { entries.get(blockId) }
         // This should never be null as only one task should be dropping
         // blocks and removing entries. However the check is still here for
         // future safety.
         if (entry != null) {
           dropBlock(blockId, entry)
         }
       }
      ...
     }
   }
 }

那么,难道官方的解释有问题?Spark的内存淘汰策略到底是LRU还是FIFO?
另外,我在stackoverflow上也提了这个问题,有兴趣者也可以去上面回答
What's the the current eviction strategy of Spark? FIFO or LRU?

阅读 3.8k
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题