Source code for lfu_cache.lfu_cache

from typing import Dict, Any, Optional, cast, Union
from .dllist import DLList, Element


class _NodeItem(object):
    """Object kept in frequency list node list"""

    def __init__(self, key: Any, value: Any, freqitem: '_FrequencyItem'):
        #: key set by by user for cache item
        self.key: Any = key
        #: value for key set by user for cache item
        self.value: Any = value
        #: FrequencyItem to which this node belongs
        self.freqitem: '_FrequencyItem' = freqitem


class _FrequencyItem(object):
    """Object kept as value inside LFUCache's frequency list. This is
    the type that is stored in each DLList Element"""

    def __init__(self, frequency: int):
        #: Frequency of this item
        self.frequency: int = frequency
        #: Linked list of nodelist. Items for this list will be NodeItem
        self.nodelist: DLList = DLList()
        #: Parent element to which this item belongs
        #: we use this to get the next frequency item
        self.element: Optional[Element] = None


[docs]class LFUCache(object): """Object implents an O(1) algorithm for implementing the LFU cache eviction scheme. This is based on paper at: http://dhruvbird.com/lfu.pdf See: https://en.wikipedia.org/wiki/Least_frequently_used""" def __init__(self, limit: int): #: Limit of cache before an element is evicted self.limit: int = limit #: We use a dict to store the data. Value is element from the #: corresponding frequency list item self.cache: Dict[Any, Element] = {} #: Frequency list self.freqlist: DLList = DLList() if self.limit < 0: self.limit = 0 def _movetonextfrequency(self, element: Element) -> Element: """Element is the linked list wrapper of Node item element.value is the actual NodeItem whose value has been updated to latest. Now we need it to move to next frequency item.""" nodeitem = cast(_NodeItem, element.value) assert nodeitem.freqitem.nodelist == element.dllist # Get the next element in the frequency list # nodeitem has reference to the frequency item it belongs to # from there we get its list element and get the next frequency currentfrequency = nodeitem.freqitem.frequency assert nodeitem.freqitem.element nodefreqitemelement = nodeitem.freqitem.element nextfrequencyitem = nodefreqitemelement.nextelement() # next item has to be present and its frequency is more then current # to be used. otherwise we just add an item after that. # Cases: # 1 -> 3 # 4 -> None # in the first case 3 is present after 1 but we need it to be 2 if ( nextfrequencyitem and cast( _FrequencyItem, nextfrequencyitem.value).frequency == currentfrequency + 1 ): nextfrequency = cast(_FrequencyItem, nextfrequencyitem.value) else: nextfrequency = _FrequencyItem(currentfrequency + 1) newfrequencyitemelement = self.freqlist.insertafter( nextfrequency, nodefreqitemelement) nextfrequency.element = newfrequencyitemelement nodeitem.freqitem.nodelist.remove(element) # we also remove this freqitem if its empty if not len(nodeitem.freqitem.nodelist): self.freqlist.remove(nodeitem.freqitem.element) newelement = nextfrequency.nodelist.append( _NodeItem(nodeitem.key, nodeitem.value, nextfrequency)) return newelement def _evictlfu(self): """Removes the first element from frequency list and cache""" first = self.freqlist.front() if first: frequencyitem = cast(_FrequencyItem, first.value) element = frequencyitem.nodelist.front() nodeitem = cast(_NodeItem, element.value) frequencyitem.nodelist.remove(element) del self.cache[nodeitem.key]
[docs] def put(self, key: Any, value: Any): """ Puts an object to the cache """ # We dont even attempt zero limit cache if not self.limit: return nodeitemelement = self.cache.get(key) firstfreq = self.freqlist.front() # Evict if key was not found and we have reached element # otherwise we will remove existing one which we dont want even if we # have reached the length if nodeitemelement is None and len(self.cache) == self.limit: self._evictlfu() if nodeitemelement: # we update the value before it can be moved to next element cast(_NodeItem, nodeitemelement.value).value = value newelement = self._movetonextfrequency(nodeitemelement) elif firstfreq and cast(_FrequencyItem, firstfreq.value).frequency == 0: # If the first node is 0 f = cast(_FrequencyItem, firstfreq.value) newnodeitem = _NodeItem(key, value, f) newelement = f.nodelist.append(newnodeitem) else: # We need a freqitem with 0 and add the element to it zerofreqitem = _FrequencyItem(0) freqitemelement = self.freqlist.appendleft(zerofreqitem) zerofreqitem.element = freqitemelement newnodeitem = _NodeItem(key, value, zerofreqitem) newelement = zerofreqitem.nodelist.append(newnodeitem) self.cache[key] = newelement
[docs] def get(self, key) -> Union[KeyError, Any]: """Gets value by key. Raises KeyError, if not found""" element = self.cache[key] # this will just return KeyError if not ther nodeitem = cast(_NodeItem, element.value) value = nodeitem.value newelement = self._movetonextfrequency(element) self.cache[key] = newelement return value