程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

緩存淘汰算法與 python 中 lru_cache 裝飾器的實現

編輯:Python

1. 引言

此前的文章中,我們介紹過常見兩種緩存架構 — 穿透型緩存與旁路型緩存。 常見緩存架構 — 穿透型緩存與旁路型緩存

穿透型緩存與旁路型緩存架構的主要區別在於當緩存中不存在被訪問數據時的處理方式。 但是,緩存的容量是有限的,隨著時間的發展,無論是使用穿透型緩存還是旁路型緩存的架構,最終都會面臨緩存被占滿的情況,那麼此時,為了緩存數據的實時性,我們需要淘汰一些舊的、訪問率低的數據來增加空閒空間以便緩存的繼續使用。 應該如何選擇被淘汰的數據呢?這就是緩存淘汰算法的目標。

2. 緩存淘汰算法

最常見的緩存淘汰算法有以下這些:

2.1. 先進先出 — FIFO

FIFO 即 First In First Out 的縮寫,這可以說是實現起來最簡單的一種算法了。 緩存維護一個隊列,總是在隊首插入數據,當緩存區滿時,則刪除隊尾數據。

這個算法的優點就在於實現簡單,但缺點也是顯而易見的:

  1. 順序寫入與讀出往往不符合我們的業務場景,雖然我們可以通過疊加其他數據結構讓隊列可以支持隨機訪問來規避這個缺點
  2. 先寫入的數據並不一定比後寫入的數據重要性低,這是該算法的主要問題

2.2. 最不經常使用算法 — LFU

LFU 是 least frequently used 的縮寫。 算法思想是淘汰那些被訪問次數最少的數據。 其實現方式也非常簡單,只需要維護一個隊列,存儲數據與被訪問次數的對應關系。 每次數據被訪問時,增加其對應的訪問次數值,並將該節點在鏈表中向隊首移動,直到整個隊列從對少到隊尾仍然保持按訪問次數遞減存儲。 當需要執行淘汰算法時,只要淘汰隊尾的部分數據即可。

這個算法相比於先進先出算法,他兼顧了數據的訪問次數,從而避免熱數據先於冷數據被淘汰。 但是,這個算法仍然存在一定的問題,那就是一旦某個數據在短時間被大量訪問,此後即便很長時間沒有任何訪問,該數據仍然憑借其巨大的訪問次數數值而不被淘汰。

2.3. 最近最少使用算法 — LRU

LRU 是 Least Recently Used 的縮寫。 這是目前實踐中最為廣泛使用的一種緩存淘汰算法,他的算法思想與 LFU 非常相似,但他去除了訪問次數數值,在隊列中采用訪問則上升的策略,從而規避了上面提到的訪問次數數值過大造成的影響。 由於該算法的廣泛使用性,我們下文將以 python 中十分常用的方法執行參數與結果的緩存 — functools.lru_cache,來詳細介紹一下該算法。

2.4. 最近最常使用算法 — MRU

與 LRU 相對,MRU 是 Most recently used 的縮寫。 這個算法與 LRU 恰好相反,MRU 算法優先移除最近被使用過的數據,它用來處理越久沒有被使用的數據越容易被訪問到的情況。

3. LRU 的實現 — python 標准庫 functools.lru_cache 裝飾器的實現

python 標准庫中的 functools.lru_cache 裝飾器實現了一個 LRU 算法的緩存,用來緩存方法所有參數與返回值的對應關系,用來提升一個方法頻繁用相同參數調用場景下的性能。 關於 python 的閉包與裝飾器,參考此前的文章: python 的閉包特性

python 中的裝飾器及其原理

3.1. 簡化後源碼

下面是抽取簡化後的 python 標准庫 functools.lru_cache 源碼:

def make_key(args, kwds):
"""
通過方法參數獲取緩存的 key
:param args: 方法原始參數
:param kwds: 方法鍵值對參數
:return: 計算後 key 的 hash 值
"""
key = args
if kwds:
key += object()
for item in kwds.items():
key += item
elif len(key) == 1 and type(key[0]) in {int, str, frozenset, type(None)}:
return key[0]
return hash(key)
def lru_cache(maxsize=128):
def decorating_function(user_function):
PREV, NEXT, KEY, RESULT = 0, 1, 2, 3
cache = {} # 緩存結構,存儲 key 與緩存數據的映射
hits = misses = 0 # 緩存命中、丟失統計
full = False # 緩沖區是否已滿標記
cache_len = cache.__len__ # 緩沖區大小
root = []
root[:] = [root, root, None, None] # 隊列頭結點為空
def lru_cache_wrapper(*args, **kwds):
nonlocal root, hits, misses, full
key = make_key(args, kwds)
with RLock(): # 線程安全鎖
link = cache.get(key)
""" 緩存命中,移動命中節點,直接返回預設結果 """
if link is not None:
""" 從鏈表中移除命中節點 """
link_prev, link_next, _key, result = link
link_prev[NEXT] = link_next
link_next[PREV] = link_prev
""" 將命中節點移動到隊尾 """
last = root[PREV]
last[NEXT] = root[PREV] = link
link[PREV] = last
link[NEXT] = root
hits += 1
return result
""" 緩存未命中,調用方法獲取返回,創建節點,淘汰算法 """
result = user_function(*args, **kwds)
with RLock():
if key in cache:
pass
elif full:
""" 緩沖區已滿,淘汰隊首元素,刪除緩沖區對應元素 """
oldroot = root
oldroot[KEY] = key
oldroot[RESULT] = result
root = oldroot[NEXT]
oldkey = root[KEY]
root[KEY] = root[RESULT] = None
del cache[oldkey]
cache[key] = oldroot
else:
""" 緩沖區未滿,直接創建節點,插入數據 """
last = root[PREV]
link = [last, root, key, result]
last[NEXT] = root[PREV] = cache[key] = link
full = (cache_len() >= maxsize)
misses += 1
return result
return lru_cache_wrapper
return decorating_function

3.2. 算法流程

上述代碼的實現邏輯非常清晰,實現方式也非常巧妙。 他利用字典實現了一個緩沖區,同時創建了一個環形雙向鏈表,而鏈表中每個節點都是一個 list,list 中的四個元素分別代表前驅的引用、後繼引用、key、函數返回值。 通過緩沖區與環形雙向鏈表的同步操作完成 LRU 算法的實現。

  1. 【初始狀態】 初始狀態下,cache 字典為空,環形雙向鏈表中只有 key、result 均為 None 的 root 節點
  2. 【緩存命中】 當插入元素命中緩存,則在鏈表中移除該節點,並將該節點插入 root 之前,實現最近使用數據在鏈表中位置的提升
  3. 【緩存未命中且隊列未滿】 當插入元素未命中緩存,則創建該元素的節點,並直接在環形雙線鏈表的 root 之前插入節點,cache[key] 賦值為插入節點
  4. 【緩存未命中且隊列已滿】 此時需要觸發 LRU 緩存淘汰算法,此時將 root 的 key 與 result 分別賦值為待插入節點對應的值,向後移動 root,將 root 的 key、result 分別賦值為 None,從而實現 root 後相鄰節點的清除,cache[key] 賦值為插入節點,刪除 cache 中被移除節點

下圖展示了緩沖命中與緩存淘汰兩種場景下的算法執行過程:

4. 利用 lru_cache 優化方法執行

此前我們曾經提到,由於 python 沒有尾遞歸優化,遞歸執行算法效率是很低的。 在此前的文章中,針對這一情況,我們自行實現了簡易的尾遞歸優化。 經典動態規劃問題 — 青蛙上台階與 python 的遞歸優化

4.1. 斐波那契數列的遞歸生成

讓我們加上此前文章中的 clock 裝飾器,再次看看遞歸生成斐波那契數列的程序。

def clock(func):
@functools.wraps(func)
def clocked(*args, **kwargs):
t0 = time.time()
result = func(*args, **kwargs)
elapsed = time.time() - t0
name = func.__name__
arg_lst = []
if args:
arg_lst.append(', '.join(repr(arg) for arg in args))
if kwargs:
pairs = ['%s=%r' % (k, w) for k, w in sorted(kwargs.items())]
arg_lst.append(', '.join(pairs))
arg_str = ', '.join(arg_lst)
print('[%0.8fs] %s(%s) -> %r ' % (elapsed, name, arg_str, result))
return result
return clocked
@lru_cache()
@clock
def fibonacci(n):
if n < 2:
return n
return fibonacci(n - 2) + fibonacci(n - 1)
if __name__ == '__main__':
print(fibonacci(6))

執行程序,打印出了:

[0.00000000s] fibonacci(0) -> 0 [0.00000000s] fibonacci(1) -> 1 [0.00000000s] fibonacci(2) -> 1 [0.00000000s] fibonacci(1) -> 1 [0.00000000s] fibonacci(0) -> 0 [0.00000000s] fibonacci(1) -> 1 [0.00000000s] fibonacci(2) -> 1 [0.00000000s] fibonacci(3) -> 2 [0.00050116s] fibonacci(4) -> 3 [0.00000000s] fibonacci(1) -> 1 [0.00000000s] fibonacci(0) -> 0 [0.00000000s] fibonacci(1) -> 1 [0.00000000s] fibonacci(2) -> 1 [0.00000000s] fibonacci(3) -> 2 [0.00000000s] fibonacci(0) -> 0 [0.00000000s] fibonacci(1) -> 1 [0.00000000s] fibonacci(2) -> 1 [0.00000000s] fibonacci(1) -> 1 [0.00000000s] fibonacci(0) -> 0 [0.00000000s] fibonacci(1) -> 1 [0.00000000s] fibonacci(2) -> 1 [0.00000000s] fibonacci(3) -> 2 [0.00000000s] fibonacci(4) -> 3 [0.00000000s] fibonacci(5) -> 5 [0.00050116s] fibonacci(6) -> 8 8

可以看到,有很多重復參數的調用在消耗著程序性能,隨著數列規模的增長,重復調用的次數將成倍增長。 一個有效的優化條件就是將這些重復調用的結果緩存起來,再次調用時直接返回即可,這正是 lru_cache 的用途。

4.2. 使用 lru_cache 優化斐波那契數列的生成

import functools
import time
from _thread import RLock
def make_key(args, kwds):
"""
通過方法參數獲取緩存的 key
:param args: 方法原始參數
:param kwds: 方法鍵值對參數
:return: 計算後 key 的 hash 值
"""
key = args
if kwds:
key += object()
for item in kwds.items():
key += item
elif len(key) == 1 and type(key[0]) in {int, str, frozenset, type(None)}:
return key[0]
return hash(key)
def lru_cache(maxsize=128):
def decorating_function(user_function):
PREV, NEXT, KEY, RESULT = 0, 1, 2, 3
cache = {} # 緩存結構,存儲 key 與緩存數據的映射
hits = misses = 0 # 緩存命中、丟失統計
full = False # 緩沖區是否已滿標記
cache_len = cache.__len__ # 緩沖區大小
root = []
root[:] = [root, root, None, None] # 隊列頭結點為空
def lru_cache_wrapper(*args, **kwds):
nonlocal root, hits, misses, full
key = make_key(args, kwds)
with RLock(): # 線程安全鎖
link = cache.get(key)
""" 緩存命中,移動命中節點,直接返回預設結果 """
if link is not None:
""" 從鏈表中移除命中節點 """
link_prev, link_next, _key, result = link
link_prev[NEXT] = link_next
link_next[PREV] = link_prev
""" 將命中節點移動到隊尾 """
last = root[PREV]
last[NEXT] = root[PREV] = link
link[PREV] = last
link[NEXT] = root
hits += 1
return result
""" 緩存未命中,調用方法獲取返回,創建節點,淘汰算法 """
result = user_function(*args, **kwds)
with RLock():
if key in cache:
pass
elif full:
""" 緩沖區已滿,淘汰隊首元素,刪除緩沖區對應元素 """
oldroot = root
oldroot[KEY] = key
oldroot[RESULT] = result
root = oldroot[NEXT]
oldkey = root[KEY]
root[KEY] = root[RESULT] = None
del cache[oldkey]
cache[key] = oldroot
else:
""" 緩沖區未滿,直接創建節點,插入數據 """
last = root[PREV]
link = [last, root, key, result]
last[NEXT] = root[PREV] = cache[key] = link
full = (cache_len() >= maxsize)
misses += 1
return result
return lru_cache_wrapper
return decorating_function
def clock(func):
@functools.wraps(func)
def clocked(*args, **kwargs):
t0 = time.time()
result = func(*args, **kwargs)
elapsed = time.time() - t0
name = func.__name__
arg_lst = []
if args:
arg_lst.append(', '.join(repr(arg) for arg in args))
if kwargs:
pairs = ['%s=%r' % (k, w) for k, w in sorted(kwargs.items())]
arg_lst.append(', '.join(pairs))
arg_str = ', '.join(arg_lst)
print('[%0.8fs] %s(%s) -> %r ' % (elapsed, name, arg_str, result))
return result
return clocked
@lru_cache()
@clock
def fibonacci(n):
if n < 2:
return n
return fibonacci(n - 2) + fibonacci(n - 1)
if __name__ == '__main__':
print(fibonacci(6))

打印出了:

[0.00000000s] fibonacci(0) -> 0 [0.00000000s] fibonacci(1) -> 1 [0.00000000s] fibonacci(2) -> 1 [0.00000000s] fibonacci(3) -> 2 [0.00000000s] fibonacci(4) -> 3 [0.00000000s] fibonacci(5) -> 5 [0.00000000s] fibonacci(6) -> 8 8

我們看到,方法從原來的 25 次遞歸調用變成了只有 7 次遞歸調用,執行時間上的優化效果也是相當明顯。


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved