
Public API

High-level cache API

atomic_lru.Cache dataclass

Bases: Storage[bytes]

High-level cache with automatic serialization and deserialization.

This class extends Storage[bytes] to provide a convenient API for caching arbitrary Python objects. Values are automatically serialized to bytes when stored and deserialized when retrieved. By default, pickle is used for serialization, but custom serializers can be provided.

The cache inherits all features from Storage, including:

- Thread-safe operations
- LRU eviction when limits are reached
- Optional TTL expiration
- Size and item count limits

Parameters:

serializer (Serializer, default: _default_serializer)
    Callable that serializes values to bytes. Defaults to pickle-based serialization.

deserializer (Deserializer, default: _default_deserializer)
    Callable that deserializes bytes back to values. Defaults to pickle-based deserialization. Must be the inverse of the serializer.

size_limit_in_bytes (int | None, default: None)
    Optional maximum size limit in bytes. When set, items are evicted (LRU) to stay under this limit. Must be >= 4096 if specified.

max_items (int | None, default: None)
    Optional maximum number of items to store. When exceeded, the least recently used items are evicted.

default_ttl (float | None, default: None)
    Optional default time-to-live in seconds for stored values.

expiration_thread_delay (float, default: 10.0)
    Delay in seconds between expiration check iterations.

expiration_thread_max_checks_per_iteration (int, default: 10000)
    Maximum number of items to check for expiration in each iteration.

expiration_thread_log (bool, default: False)
    Whether to enable debug logging for expiration operations.

expiration_disabled (bool, default: False)
    If True, disables TTL expiration entirely.
Example

```python
from atomic_lru import Cache, CACHE_MISS

cache = Cache(max_items=100, default_ttl=3600)  # 1 hour TTL

# Store any Python object
cache.set("user:123", {"name": "Alice", "age": 30})

# Retrieve it
user = cache.get("user:123")
if user is not CACHE_MISS:
    print(user["name"])  # "Alice"

cache.close()
```
Note

The cache stores serialized bytes internally, so size_limit_in_bytes works correctly with this class. Custom serializers should ensure they produce bytes that can be accurately measured for size calculations.
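The serializer and deserializer hooks make it possible to swap out pickle entirely. As an illustrative sketch (these helper names are not part of atomic_lru), a JSON-based pair could look like this, assuming every cached value is JSON-serializable:

```python
import json
from typing import Any


def json_serializer(value: Any) -> bytes:
    # Compact, UTF-8 encoded output so the byte size is meaningful
    return json.dumps(value, separators=(",", ":")).encode("utf-8")


def json_deserializer(data: bytes) -> Any:
    return json.loads(data.decode("utf-8"))


# The deserializer must be the exact inverse of the serializer
obj = {"name": "Alice", "age": 30}
assert json_deserializer(json_serializer(obj)) == obj
```

Such a pair could then be passed as `Cache(serializer=json_serializer, deserializer=json_deserializer)`; since the output is plain bytes, size accounting continues to work as described in the note above.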

Source code in atomic_lru/_cache.py
@dataclass
class Cache(Storage[bytes]):
    """High-level cache with automatic serialization and deserialization.

    This class extends `Storage[bytes]` to provide a convenient API for caching
    arbitrary Python objects. Values are automatically serialized to bytes when
    stored and deserialized when retrieved. By default, pickle is used for
    serialization, but custom serializers can be provided.

    The cache inherits all features from `Storage`, including:
    - Thread-safe operations
    - LRU eviction when limits are reached
    - Optional TTL expiration
    - Size and item count limits

    Args:
        serializer: Callable that serializes values to bytes. Defaults to
            pickle-based serialization.
        deserializer: Callable that deserializes bytes back to values. Defaults
            to pickle-based deserialization. Must be the inverse of the serializer.
        size_limit_in_bytes: Optional maximum size limit in bytes. When set, items
            are evicted (LRU) to stay under this limit. Must be >= 4096 if specified.
        max_items: Optional maximum number of items to store. When exceeded, the
            least recently used items are evicted.
        default_ttl: Optional default time-to-live in seconds for stored values.
        expiration_thread_delay: Delay in seconds between expiration check iterations.
            Defaults to 10.0 seconds.
        expiration_thread_max_checks_per_iteration: Maximum number of items to check
            for expiration in each iteration. Defaults to 10,000.
        expiration_thread_log: Whether to enable debug logging for expiration
            operations. Defaults to False.
        expiration_disabled: If True, disables TTL expiration entirely. Defaults to False.

    Example:
        ```python
        from atomic_lru import Cache, CACHE_MISS
        cache = Cache(max_items=100, default_ttl=3600)  # 1 hour TTL
        # Store any Python object
        cache.set("user:123", {"name": "Alice", "age": 30})
        # Retrieve it
        user = cache.get("user:123")
        if user is not CACHE_MISS:
            print(user["name"])  # "Alice"
        cache.close()
        ```

    Note:
        The cache stores serialized bytes internally, so `size_limit_in_bytes` works
        correctly with this class. Custom serializers should ensure they produce
        bytes that can be accurately measured for size calculations.
    """

    serializer: Serializer = field(default_factory=lambda: _default_serializer)
    deserializer: Deserializer = field(default_factory=lambda: _default_deserializer)

    def _serialize(self, value: Any) -> bytes:
        """Serialize a value to bytes using the configured serializer.

        Args:
            value: The value to serialize.

        Returns:
            The serialized bytes representation.

        Raises:
            ValueError: If serialization fails.
        """
        try:
            return self.serializer(value)
        except Exception as e:
            raise ValueError(
                f"Failed to serialize value of type: {type(value).__name__}"
            ) from e

    def _deserialize(self, value: bytes) -> Any:
        """Deserialize bytes to a value using the configured deserializer.

        Args:
            value: The bytes to deserialize.

        Returns:
            The deserialized value.

        Raises:
            ValueError: If deserialization fails.
        """
        try:
            return self.deserializer(value)
        except Exception as e:
            raise ValueError("Failed to deserialize value") from e

    def set(
        self, key: str, value: Any, ttl: float | DefaultTTLSentinel | None = DEFAULT_TTL
    ) -> None:
        """Store a value in the cache with automatic serialization.

        Serializes the value to bytes and stores it in the underlying storage.
        The value can be any Python object that can be serialized by the configured
        serializer.

        Args:
            key: The key to store the value under. Must be a string.
            value: The value to store. Can be any Python object serializable by
                the configured serializer.
            ttl: Time-to-live in seconds. Use `DEFAULT_TTL` to use the instance's
                `default_ttl` value, `None` to disable expiration for this item,
                or a float to set a specific TTL.

        Raises:
            RuntimeError: If the cache has been closed.
            ValueError: If serialization fails.

        Note:
            The value is serialized before size checks are performed, so the
            serialized size is what counts toward size limits. If the serialized
            value exceeds half the `size_limit_in_bytes`, it is silently dropped
            without being stored.
        """
        serialized_value = self._serialize(value)
        super().set(key=key, value=serialized_value, ttl=ttl)

    def get(self, key: str) -> Any | CacheMissSentinel:
        """Retrieve a value from the cache with automatic deserialization.

        Retrieves the value associated with the given key, deserializing it from
        bytes back to the original Python object. If the key doesn't exist or the
        value has expired, `CACHE_MISS` is returned.

        Args:
            key: The key to look up.

        Returns:
            The deserialized value if found and not expired, or `CACHE_MISS` if
                the key doesn't exist or the value has expired.

        Raises:
            ValueError: If deserialization fails (e.g., corrupted data).

        Note:
            Expired items are automatically deleted when accessed. This method
            does not raise exceptions for missing keys - use `CACHE_MISS` to
            check for cache misses.
        """
        serialized_value = super().get(key=key)
        if serialized_value is CACHE_MISS:
            return CACHE_MISS
        assert not isinstance(serialized_value, CacheMissSentinel)
        return self._deserialize(serialized_value)

number_of_items property

Get the current number of items stored.

Returns:

int
    The number of key-value pairs currently stored in the cache.

Note

This property can be accessed even after the storage is closed.

size_in_bytes property

Get the approximate current size of the storage in bytes.

Returns:

int
    The approximate size in bytes. This includes the size of stored values and overhead for the storage structure. The calculation is approximate and only accurate when storing bytes values.

Note

This property can be accessed even after the storage is closed.

clear

Clear all items from the cache.

Removes all key-value pairs from the cache and resets the size tracking to the initial empty state. This operation is thread-safe.

Raises:

RuntimeError
    If the storage has been closed.

Source code in atomic_lru/_storage/storage.py
def clear(self) -> None:
    """Clear all items from the cache.

    Removes all key-value pairs from the cache and resets the size tracking
    to the initial empty state. This operation is thread-safe.

    Raises:
        RuntimeError: If the storage has been closed.
    """
    with self.__lock:
        self._assert_not_closed()
        self._data.clear()
        self._size_in_bytes = sys.getsizeof(self._data)

close

Close the storage and stop the expiration thread.

Marks the storage as closed and stops the background expiration thread if it was started. After closing, all operations except size_in_bytes, number_of_items, and get() will raise a RuntimeError.

Parameters:

wait (bool, default: False)
    If True, blocks until the expiration thread has fully stopped. If False, returns immediately after signaling the thread to stop.
Note

This method is idempotent - calling it multiple times or from multiple threads has no effect beyond the first call. It's recommended to call this method when you're done with the storage to ensure proper cleanup of background threads.
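Since close() is the documented cleanup hook and no context-manager protocol is documented on this page, the standard library's contextlib.closing can guarantee cleanup even when an exception occurs. A minimal sketch, using a hypothetical stand-in object in place of a real cache:

```python
from contextlib import closing


class _FakeCache:
    """Hypothetical stand-in exposing the close() contract documented above."""

    def __init__(self) -> None:
        self.closed = False

    def close(self, wait: bool = False) -> None:
        self.closed = True


cache = _FakeCache()
with closing(cache):  # calls cache.close() on exit, even if the body raises
    pass

assert cache.closed
```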

Source code in atomic_lru/_storage/storage.py
def close(self, wait: bool = False) -> None:
    """Close the storage and stop the expiration thread.

    Marks the storage as closed and stops the background expiration thread
    if it was started. After closing, all operations except `size_in_bytes`,
    `number_of_items`, and `get()` will raise a `RuntimeError`.

    Args:
        wait: If True, blocks until the expiration thread has fully stopped.
            If False, returns immediately after signaling the thread to stop.
            Defaults to False.

    Note:
        This method is idempotent - calling it multiple times
        or from multiple threads has no effect beyond the first call.
        It's recommended to call this method when you're done with the storage
        to ensure proper cleanup of background threads.
    """
    with self.__lock:
        if self.__closed:
            return
        self.__closed = True
    # Stop thread outside the lock to avoid potential deadlock
    # (thread's _clean_expired also acquires the lock)
    if not self.expiration_disabled:
        assert self.__expiration_thread is not None
        self.__expiration_thread.stop(wait=wait)

delete

Delete a key-value pair from the cache.

Removes the specified key and its associated value from the cache if it exists. The size tracking is updated accordingly.

Parameters:

key (str, required)
    The key to delete.

Returns:

bool
    True if the key existed and was deleted, False if the key didn't exist.

Raises:

RuntimeError
    If the storage has been closed.

Source code in atomic_lru/_storage/storage.py
def delete(self, key: str) -> bool:
    """Delete a key-value pair from the cache.

    Removes the specified key and its associated value from the cache if it
    exists. The size tracking is updated accordingly.

    Args:
        key: The key to delete.

    Returns:
        True if the key existed and was deleted, False if the key didn't exist.

    Raises:
        RuntimeError: If the storage has been closed.
    """
    with self.__lock:
        self._assert_not_closed()
        if key not in self._data:
            return False
        self.__delete(key)
        return True

get

Retrieve a value from the cache with automatic deserialization.

Retrieves the value associated with the given key, deserializing it from bytes back to the original Python object. If the key doesn't exist or the value has expired, CACHE_MISS is returned.

Parameters:

key (str, required)
    The key to look up.

Returns:

Any | CacheMissSentinel
    The deserialized value if found and not expired, or CACHE_MISS if the key doesn't exist or the value has expired.

Raises:

ValueError
    If deserialization fails (e.g., corrupted data).

Note

Expired items are automatically deleted when accessed. This method does not raise exceptions for missing keys - use CACHE_MISS to check for cache misses.

Source code in atomic_lru/_cache.py
def get(self, key: str) -> Any | CacheMissSentinel:
    """Retrieve a value from the cache with automatic deserialization.

    Retrieves the value associated with the given key, deserializing it from
    bytes back to the original Python object. If the key doesn't exist or the
    value has expired, `CACHE_MISS` is returned.

    Args:
        key: The key to look up.

    Returns:
        The deserialized value if found and not expired, or `CACHE_MISS` if
            the key doesn't exist or the value has expired.

    Raises:
        ValueError: If deserialization fails (e.g., corrupted data).

    Note:
        Expired items are automatically deleted when accessed. This method
        does not raise exceptions for missing keys - use `CACHE_MISS` to
        check for cache misses.
    """
    serialized_value = super().get(key=key)
    if serialized_value is CACHE_MISS:
        return CACHE_MISS
    assert not isinstance(serialized_value, CacheMissSentinel)
    return self._deserialize(serialized_value)

set

Store a value in the cache with automatic serialization.

Serializes the value to bytes and stores it in the underlying storage. The value can be any Python object that can be serialized by the configured serializer.

Parameters:

key (str, required)
    The key to store the value under. Must be a string.

value (Any, required)
    The value to store. Can be any Python object serializable by the configured serializer.

ttl (float | DefaultTTLSentinel | None, default: DEFAULT_TTL)
    Time-to-live in seconds. Use DEFAULT_TTL to use the instance's default_ttl value, None to disable expiration for this item, or a float to set a specific TTL.

Raises:

RuntimeError
    If the cache has been closed.

ValueError
    If serialization fails.

Note

The value is serialized before size checks are performed, so the serialized size is what counts toward size limits. If the serialized value exceeds half the size_limit_in_bytes, it is silently dropped without being stored.
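The half-limit rejection rule can be made concrete with a stand-alone check that mirrors the comparison performed during set; the would_be_rejected helper is illustrative, not part of the library, and 4096 is used because it is the documented minimum size limit:

```python
def would_be_rejected(value: bytes, size_limit: int = 4096) -> bool:
    # Mirrors the documented rule: serialized values strictly larger than
    # half the size limit are silently dropped rather than stored
    return len(value) > size_limit / 2


assert not would_be_rejected(b"x" * 2048)  # exactly half the limit: stored
assert would_be_rejected(b"x" * 2049)      # over half the limit: dropped
```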

Source code in atomic_lru/_cache.py
def set(
    self, key: str, value: Any, ttl: float | DefaultTTLSentinel | None = DEFAULT_TTL
) -> None:
    """Store a value in the cache with automatic serialization.

    Serializes the value to bytes and stores it in the underlying storage.
    The value can be any Python object that can be serialized by the configured
    serializer.

    Args:
        key: The key to store the value under. Must be a string.
        value: The value to store. Can be any Python object serializable by
            the configured serializer.
        ttl: Time-to-live in seconds. Use `DEFAULT_TTL` to use the instance's
            `default_ttl` value, `None` to disable expiration for this item,
            or a float to set a specific TTL.

    Raises:
        RuntimeError: If the cache has been closed.
        ValueError: If serialization fails.

    Note:
        The value is serialized before size checks are performed, so the
        serialized size is what counts toward size limits. If the serialized
        value exceeds half the `size_limit_in_bytes`, it is silently dropped
        without being stored.
    """
    serialized_value = self._serialize(value)
    super().set(key=key, value=serialized_value, ttl=ttl)

Low-level cache API

atomic_lru.Storage dataclass

Bases: Generic[T]

Thread-safe in-memory storage with LRU eviction and optional TTL expiration.

This class provides a thread-safe storage mechanism that automatically evicts the least recently used (LRU) items when size or item count limits are reached. It supports optional time-to-live (TTL) expiration for stored values, with a background thread that periodically cleans expired entries.

The storage uses an OrderedDict internally to maintain insertion order and efficiently move items to the end when accessed (LRU behavior). All operations are protected by a lock to ensure thread safety.
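The move-to-end and pop-from-front mechanics described above can be sketched in isolation with the standard library's OrderedDict:

```python
from collections import OrderedDict

d: OrderedDict[str, bytes] = OrderedDict()
d["a"] = b"1"
d["b"] = b"2"
d["c"] = b"3"

# Accessing a key marks it most recently used by moving it to the end
d.move_to_end("a")

# Evicting the LRU item pops from the front of the order (last=False)
key, _ = d.popitem(last=False)
assert key == "b"             # "b" was the least recently used
assert list(d) == ["c", "a"]  # "a" survived because it was touched
```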

Parameters:

size_limit_in_bytes (int | None, default: None)
    Optional maximum size limit in bytes. When set, items are evicted (LRU) to stay under this limit. Must be >= 4096 if specified. Only works correctly when storing bytes values. If None, no size limit is enforced.

max_items (int | None, default: None)
    Optional maximum number of items to store. When exceeded, the least recently used items are evicted. If None, no item count limit is enforced.

default_ttl (float | None, default: None)
    Optional default time-to-live in seconds for stored values. Can be overridden per item when calling set(). If None, values don't expire by default. Use the DEFAULT_TTL sentinel to use this default when calling set().

expiration_thread_delay (float, default: 10.0)
    Delay in seconds between expiration check iterations in the background thread.

expiration_thread_max_checks_per_iteration (int, default: 10000)
    Maximum number of items to check for expiration in each iteration. This limits the work done per iteration to avoid blocking.

expiration_thread_log (bool, default: False)
    Whether to enable debug logging for expiration operations.

expiration_disabled (bool, default: False)
    If True, disables TTL expiration entirely. No background thread is created, and all TTL values are ignored.
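The ttl parameter's three-way behavior (DEFAULT_TTL sentinel, None, or a float) resolves roughly as in the sketch below; _DefaultTTLSentinel here is a hypothetical stand-in for the library's sentinel type, not the real class:

```python
class _DefaultTTLSentinel:
    """Hypothetical stand-in for atomic_lru's DefaultTTLSentinel."""


DEFAULT_TTL = _DefaultTTLSentinel()


def resolve_ttl(ttl, default_ttl):
    # The sentinel selects the instance-wide default; None disables
    # expiration for this item; any float is used as-is
    return default_ttl if isinstance(ttl, _DefaultTTLSentinel) else ttl


assert resolve_ttl(DEFAULT_TTL, 3600.0) == 3600.0  # use instance default
assert resolve_ttl(None, 3600.0) is None           # never expire this item
assert resolve_ttl(5.0, 3600.0) == 5.0             # explicit per-item TTL
```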
Example

```python
from atomic_lru import Storage, CACHE_MISS

# Create storage with size limit
storage = Storage[bytes](size_limit_in_bytes=1024 * 1024)  # 1MB
storage.set("key1", b"value1")
value = storage.get("key1")
if value is not CACHE_MISS:
    print(value)  # b'value1'
storage.close()
```
Note

When using size_limit_in_bytes, values must be of type bytes. The size calculation is approximate and includes overhead for the storage structure. Items larger than half the size limit are rejected to prevent storage from being dominated by a single item.

Source code in atomic_lru/_storage/storage.py
@dataclass
class Storage(Generic[T]):
    """Thread-safe in-memory storage with LRU eviction and optional TTL expiration.

    This class provides a thread-safe storage mechanism that automatically evicts
    the least recently used (LRU) items when size or item count limits are reached.
    It supports optional time-to-live (TTL) expiration for stored values, with a
    background thread that periodically cleans expired entries.

    The storage uses an OrderedDict internally to maintain insertion order and
    efficiently move items to the end when accessed (LRU behavior). All operations
    are protected by a lock to ensure thread safety.

    Args:
        size_limit_in_bytes: Optional maximum size limit in bytes. When set, items
            are evicted (LRU) to stay under this limit. Must be >= 4096 if specified.
            Only works correctly when storing `bytes` values. If None, no size limit
            is enforced.
        max_items: Optional maximum number of items to store. When exceeded, the
            least recently used items are evicted. If None, no item count limit is
            enforced.
        default_ttl: Optional default time-to-live in seconds for stored values.
            Can be overridden per item when calling `set()`. If None, values don't
            expire by default. Use `DEFAULT_TTL` sentinel to use this default when
            calling `set()`.
        expiration_thread_delay: Delay in seconds between expiration check iterations
            in the background thread. Defaults to 10.0 seconds.
        expiration_thread_max_checks_per_iteration: Maximum number of items to check
            for expiration in each iteration. This limits the work done per iteration
            to avoid blocking. Defaults to 10,000.
        expiration_thread_log: Whether to enable debug logging for expiration
            operations. Defaults to False.
        expiration_disabled: If True, disables TTL expiration entirely. No background
            thread is created, and all TTL values are ignored. Defaults to False.

    Example:
        ```python
        from atomic_lru import Storage, CACHE_MISS
        # Create storage with size limit
        storage = Storage[bytes](size_limit_in_bytes=1024 * 1024)  # 1MB
        storage.set("key1", b"value1")
        value = storage.get("key1")
        if value is not CACHE_MISS:
            print(value)  # b'value1'
        storage.close()
        ```

    Note:
        When using `size_limit_in_bytes`, values must be of type `bytes`. The size
        calculation is approximate and includes overhead for the storage structure.
        Items larger than half the size limit are rejected to prevent storage
        from being dominated by a single item.
    """

    size_limit_in_bytes: int | None = None
    max_items: int | None = None
    default_ttl: float | None = None
    expiration_thread_delay: float = 10.0
    expiration_thread_max_checks_per_iteration: int = 10_000
    expiration_thread_log: bool = False
    expiration_disabled: bool = False

    # Internal tracking of approximate size in bytes
    _size_in_bytes: int = 0

    # Internal OrderedDict storing key-value pairs
    _data: OrderedDict[str, Value[T]] = field(default_factory=OrderedDict)

    # Internal threading lock for thread safety
    __lock: threading.Lock = field(default_factory=threading.Lock)

    # Internal background thread for cleaning expired entries
    __expiration_thread: ExpirationThread | None = None

    # Internal flag indicating whether the storage has been closed
    __closed: bool = False

    def __post_init__(self) -> None:
        # Validate configuration before allocating any resources
        if self.size_limit_in_bytes is not None and self.size_limit_in_bytes < 4096:
            raise ValueError("size_limit_in_bytes must be at least 4096")
        if self.max_items is not None and self.max_items < 1:
            raise ValueError("max_items must be at least 1")
        if self.default_ttl is not None and self.default_ttl < 0:
            raise ValueError("default_ttl cannot be negative")
        if self.expiration_thread_delay <= 0:
            raise ValueError("expiration_thread_delay must be positive")
        if self.expiration_thread_max_checks_per_iteration < 0:
            raise ValueError(
                "expiration_thread_max_checks_per_iteration cannot be negative"
            )

        self._size_in_bytes = sys.getsizeof(self._data)
        if not self.expiration_disabled:
            self.__expiration_thread = ExpirationThread(
                clean_callback=self._clean_expired,
                delay=self.expiration_thread_delay,
                max_checks_per_iteration=self.expiration_thread_max_checks_per_iteration,
                log=self.expiration_thread_log,
            )
            self.__expiration_thread.start()

    def close(self, wait: bool = False) -> None:
        """Close the storage and stop the expiration thread.

        Marks the storage as closed and stops the background expiration thread
        if it was started. After closing, all operations except `size_in_bytes`,
        `number_of_items`, and `get()` will raise a `RuntimeError`.

        Args:
            wait: If True, blocks until the expiration thread has fully stopped.
                If False, returns immediately after signaling the thread to stop.
                Defaults to False.

        Note:
            This method is idempotent - calling it multiple times
            or from multiple threads has no effect beyond the first call.
            It's recommended to call this method when you're done with the storage
            to ensure proper cleanup of background threads.
        """
        with self.__lock:
            if self.__closed:
                return
            self.__closed = True
        # Stop thread outside the lock to avoid potential deadlock
        # (thread's _clean_expired also acquires the lock)
        if not self.expiration_disabled:
            assert self.__expiration_thread is not None
            self.__expiration_thread.stop(wait=wait)

    def _assert_not_closed(self) -> None:
        if self.__closed:
            raise RuntimeError("Storage is closed")

    @property
    def size_in_bytes(self) -> int:
        """Get the approximate current size of the storage in bytes.

        Returns:
            The approximate size in bytes. This includes the size of stored values
                and overhead for the storage structure. The calculation is approximate
                and only accurate when storing `bytes` values.

        Note:
            This property can be accessed even after the storage is closed.
        """
        with self.__lock:
            return self._size_in_bytes

    @property
    def number_of_items(self) -> int:
        """Get the current number of items stored.

        Returns:
            The number of key-value pairs currently stored in the cache.

        Note:
            This property can be accessed even after the storage is closed.
        """
        with self.__lock:
            return len(self._data)

    def __delete(self, key: str) -> None:
        value_obj = self._data[key]
        self._size_in_bytes -= value_obj.size_in_bytes + PER_ITEM_APPROXIMATE_SIZE
        del self._data[key]

    def __delete_least_recently_used_item(self) -> None:
        try:
            _, value_obj = self._data.popitem(last=False)
        except KeyError:
            return
        # popitem already removed the item, so we only need to update size tracking
        self._size_in_bytes -= value_obj.size_in_bytes + PER_ITEM_APPROXIMATE_SIZE

    def __delete_least_recently_used_items(
        self,
        until_size_in_bytes: int | None = None,
        until_number_of_items: int | None = None,
    ) -> None:
        if until_size_in_bytes is not None:
            while self._size_in_bytes > until_size_in_bytes:
                self.__delete_least_recently_used_item()
        if until_number_of_items is not None:
            while len(self._data) > until_number_of_items:
                self.__delete_least_recently_used_item()

    def _calculate_size_needed_for_eviction(
        self, new_value_obj: Value[T], old_value_obj: Value[T] | None
    ) -> int | None:
        """Calculate the target size for eviction before adding a new value.

        Args:
            new_value_obj: The new value object to be stored.
            old_value_obj: The existing value object if overwriting, None if new key.

        Returns:
            The target size in bytes to evict down to, or None if no size limit.
        """
        if self.size_limit_in_bytes is None:
            return None

        if old_value_obj is not None:
            # When overwriting, PER_ITEM_APPROXIMATE_SIZE stays the same,
            # so we only need to account for the difference in value sizes
            net_size_change = new_value_obj.size_in_bytes - old_value_obj.size_in_bytes
            return self.size_limit_in_bytes - net_size_change
        else:
            # When adding a new item, need space for value + PER_ITEM overhead
            return (
                self.size_limit_in_bytes
                - new_value_obj.size_in_bytes
                - PER_ITEM_APPROXIMATE_SIZE
            )

    def set(
        self, key: str, value: T, ttl: float | DefaultTTLSentinel | None = DEFAULT_TTL
    ) -> None:
        """Store a value in the cache.

        Stores a key-value pair in the cache. If size or item limits are set and
        exceeded, the least recently used items are automatically evicted to make
        room. The item is moved to the end of the LRU order (most recently used).

        Args:
            key: The key to store the value under. Must be a string.
            value: The value to store. If `size_limit_in_bytes` is set, must be
                of type `bytes`.
            ttl: Time-to-live in seconds. Use `DEFAULT_TTL` to use the instance's
                `default_ttl` value, `None` to disable expiration for this item,
                or a float to set a specific TTL. If `expiration_disabled` is True,
                this parameter is ignored.

        Raises:
            RuntimeError: If the storage has been closed.
            ValueError: If `size_limit_in_bytes` is set and `value` is not `bytes`.

        Note:
            Items larger than half the `size_limit_in_bytes` are rejected to prevent
            a single large item from dominating the cache. If the value is rejected
            due to size, this method returns silently without storing the value.
        """
        # Validate value type and size before acquiring lock
        if self.size_limit_in_bytes is not None:
            if not isinstance(value, bytes):
                raise ValueError("Value must be bytes if size_limit_in_bytes is set")
            if len(value) > self.size_limit_in_bytes / 2:
                return

        # Resolve TTL before acquiring lock
        if self.expiration_disabled:
            ttl = None
        resolved_ttl: float | None = (
            self.default_ttl if isinstance(ttl, DefaultTTLSentinel) else ttl
        )

        value_obj = Value(value=value, ttl=resolved_ttl)

        with self.__lock:
            self._assert_not_closed()

            # Check if we're overwriting an existing key
            old_value_obj: Value[T] | None = self._data.get(key)
            is_overwriting = old_value_obj is not None

            # Compute target sizes
            until_size_in_bytes = self._calculate_size_needed_for_eviction(
                value_obj, old_value_obj
            )
            until_number_of_items: int | None = None
            if self.max_items is not None and not is_overwriting:
                until_number_of_items = self.max_items - 1

            # Evict items if needed to make room
            self.__delete_least_recently_used_items(
                until_size_in_bytes=until_size_in_bytes,
                until_number_of_items=until_number_of_items,
            )

            # Re-check: the key being overwritten may have been the LRU item and got evicted
            if is_overwriting and key not in self._data:
                is_overwriting = False
                old_value_obj = None

            # Update size tracking: calculate net change instead of subtract then add
            if is_overwriting:
                # Net change: new size - old size (PER_ITEM_APPROXIMATE_SIZE cancels out)
                assert old_value_obj is not None  # Type narrowing for type checker
                size_delta = value_obj.size_in_bytes - old_value_obj.size_in_bytes
                self._size_in_bytes += size_delta
            else:
                # New item: add value size + overhead
                self._size_in_bytes += (
                    value_obj.size_in_bytes + PER_ITEM_APPROXIMATE_SIZE
                )

            # Store the value and ensure it's at the end of the LRU order (most recently used)
            self._data[key] = value_obj
            self._data.move_to_end(key)

    def get(self, key: str) -> T | CacheMissSentinel:
        """Retrieve a value from the cache.

        Retrieves the value associated with the given key. If the key exists and
        the value hasn't expired, it is moved to the end of the LRU order (most
        recently used) and returned. If the key doesn't exist or the value has
        expired, `CACHE_MISS` is returned.

        Args:
            key: The key to look up.

        Returns:
            The stored value if found and not expired, or `CACHE_MISS` if the key
                doesn't exist or the value has expired.

        Note:
            Expired items are automatically deleted when accessed. This method does
            not raise exceptions for missing keys - use `CACHE_MISS` to check for
            cache misses. Unlike mutating operations (`set`, `delete`, `clear`),
            this method can be called even after the storage has been closed.
        """
        with self.__lock:
            value_obj = self._data.get(key)
            if value_obj is None:
                return CACHE_MISS
            if value_obj.is_expired:
                self.__delete(key)
                return CACHE_MISS
            self._data.move_to_end(key)
            return value_obj.value

    def delete(self, key: str) -> bool:
        """Delete a key-value pair from the cache.

        Removes the specified key and its associated value from the cache if it
        exists. The size tracking is updated accordingly.

        Args:
            key: The key to delete.

        Returns:
            True if the key existed and was deleted, False if the key didn't exist.

        Raises:
            RuntimeError: If the storage has been closed.
        """
        with self.__lock:
            self._assert_not_closed()
            if key not in self._data:
                return False
            self.__delete(key)
            return True

    def clear(self) -> None:
        """Clear all items from the cache.

        Removes all key-value pairs from the cache and resets the size tracking
        to the initial empty state. This operation is thread-safe.

        Raises:
            RuntimeError: If the storage has been closed.
        """
        with self.__lock:
            self._assert_not_closed()
            self._data.clear()
            self._size_in_bytes = sys.getsizeof(self._data)

    def _clean_expired(
        self, start: int | None = None, stop: int | None = None
    ) -> tuple[int, int]:
        """Clean expired items from the cache.

        Checks items in the specified range for expiration and removes those that
        have expired. This method is typically called by the background expiration
        thread, but can also be called manually.

        Args:
            start: Optional start index (inclusive) for the range of items to check.
                If None, checks from the beginning.
            stop: Optional stop index (exclusive) for the range of items to check.
                If None, checks until the end.

        Returns:
            A tuple of (tested_count, deleted_count) where:

                - tested_count: Number of items checked for expiration
                - deleted_count: Number of expired items that were deleted

        Raises:
            RuntimeError: If the storage has been closed.

        Note:
            If `expiration_disabled` is True, this method returns (0, 0) without
            performing any checks. The items are checked in insertion order (LRU order).
        """
        if self.expiration_disabled:
            return 0, 0
        tested: int = 0
        deleted: int = 0
        with self.__lock:
            self._assert_not_closed()
            expired_keys: list[str] = []
            for key, value_obj in islice(self._data.items(), start, stop):
                tested += 1
                if value_obj.is_expired:
                    expired_keys.append(key)
            for key in expired_keys:
                deleted += 1
                self.__delete(key)
        return tested, deleted
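The two-phase scan that `_clean_expired` performs — collect expired keys while iterating a slice of the LRU order, then delete after the loop — can be sketched standalone. This is an illustrative reimplementation, not the library's internals; it assumes deadlines are stored as `time.monotonic()` timestamps:

```python
import time
from collections import OrderedDict
from itertools import islice

def clean_expired(data: OrderedDict, start=None, stop=None) -> tuple[int, int]:
    """Scan a slice of the LRU order and delete expired entries.

    ``data`` maps key -> deadline, where a deadline is a time.monotonic()
    timestamp or None ("never expires"). Keys are collected first and
    deleted after the loop, because mutating an OrderedDict while
    iterating over it raises a RuntimeError.
    """
    now = time.monotonic()
    tested = 0
    expired = []
    for key, deadline in islice(data.items(), start, stop):
        tested += 1
        if deadline is not None and deadline <= now:
            expired.append(key)
    for key in expired:
        del data[key]
    return tested, len(expired)

items = OrderedDict(
    a=time.monotonic() - 1,   # already expired
    b=None,                   # never expires
    c=time.monotonic() + 60,  # expires in a minute
)
assert clean_expired(items) == (3, 1)
assert list(items) == ["b", "c"]
```

Because items are checked in LRU (insertion) order, passing `start`/`stop` lets a background thread bound the work done per iteration, as `expiration_thread_max_checks_per_iteration` does.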

number_of_items property

Get the current number of items stored.

Returns:

| Type | Description |
| --- | --- |
| `int` | The number of key-value pairs currently stored in the cache. |

Note

This property can be accessed even after the storage is closed.

size_in_bytes property

Get the approximate current size of the storage in bytes.

Returns:

| Type | Description |
| --- | --- |
| `int` | The approximate size in bytes. This includes the size of stored values and overhead for the storage structure. The calculation is approximate and only accurate when storing `bytes` values. |

Note

This property can be accessed even after the storage is closed.

clear

Clear all items from the cache.

Removes all key-value pairs from the cache and resets the size tracking to the initial empty state. This operation is thread-safe.

Raises:

| Type | Description |
| --- | --- |
| `RuntimeError` | If the storage has been closed. |

Source code in atomic_lru/_storage/storage.py
def clear(self) -> None:
    """Clear all items from the cache.

    Removes all key-value pairs from the cache and resets the size tracking
    to the initial empty state. This operation is thread-safe.

    Raises:
        RuntimeError: If the storage has been closed.
    """
    with self.__lock:
        self._assert_not_closed()
        self._data.clear()
        self._size_in_bytes = sys.getsizeof(self._data)

close

Close the storage and stop the expiration thread.

Marks the storage as closed and stops the background expiration thread if it was started. After closing, all operations except size_in_bytes, number_of_items, and get() will raise a RuntimeError.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `wait` | `bool` | If True, blocks until the expiration thread has fully stopped. If False, returns immediately after signaling the thread to stop. | `False` |
Note

This method is idempotent - calling it multiple times or from multiple threads has no effect beyond the first call. It's recommended to call this method when you're done with the storage to ensure proper cleanup of background threads.

Source code in atomic_lru/_storage/storage.py
def close(self, wait: bool = False) -> None:
    """Close the storage and stop the expiration thread.

    Marks the storage as closed and stops the background expiration thread
    if it was started. After closing, all operations except `size_in_bytes`,
    `number_of_items`, and `get()` will raise a `RuntimeError`.

    Args:
        wait: If True, blocks until the expiration thread has fully stopped.
            If False, returns immediately after signaling the thread to stop.
            Defaults to False.

    Note:
        This method is idempotent - calling it multiple times
        or from multiple threads has no effect beyond the first call.
        It's recommended to call this method when you're done with the storage
        to ensure proper cleanup of background threads.
    """
    with self.__lock:
        if self.__closed:
            return
        self.__closed = True
    # Stop thread outside the lock to avoid potential deadlock
    # (thread's _clean_expired also acquires the lock)
    if not self.expiration_disabled:
        assert self.__expiration_thread is not None
        self.__expiration_thread.stop(wait=wait)
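The locking discipline in `close()` above — flip the closed flag under the lock, then signal the thread outside it — can be reproduced in a minimal standalone sketch. The `Closable` class and its trivial worker below are hypothetical stand-ins, for illustration only:

```python
import threading

class Closable:
    """Illustrative sketch of an idempotent close() (not the library's code)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._closed = False
        self._stop_event = threading.Event()
        # Stand-in for the expiration thread: it simply waits to be told to stop.
        self._worker = threading.Thread(target=self._stop_event.wait)
        self._worker.start()

    def close(self, wait: bool = False) -> None:
        with self._lock:
            if self._closed:
                return          # idempotent: every call after the first is a no-op
            self._closed = True
        # Signal the worker *outside* the lock: a worker that also takes
        # the lock could otherwise deadlock against a blocking join.
        self._stop_event.set()
        if wait:
            self._worker.join()
```

Checking and flipping the flag in one critical section ensures exactly one caller performs the shutdown, even under concurrent `close()` calls.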

delete

Delete a key-value pair from the cache.

Removes the specified key and its associated value from the cache if it exists. The size tracking is updated accordingly.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `key` | `str` | The key to delete. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the key existed and was deleted, False if the key didn't exist. |

Raises:

| Type | Description |
| --- | --- |
| `RuntimeError` | If the storage has been closed. |

Source code in atomic_lru/_storage/storage.py
def delete(self, key: str) -> bool:
    """Delete a key-value pair from the cache.

    Removes the specified key and its associated value from the cache if it
    exists. The size tracking is updated accordingly.

    Args:
        key: The key to delete.

    Returns:
        True if the key existed and was deleted, False if the key didn't exist.

    Raises:
        RuntimeError: If the storage has been closed.
    """
    with self.__lock:
        self._assert_not_closed()
        if key not in self._data:
            return False
        self.__delete(key)
        return True

get

Retrieve a value from the cache.

Retrieves the value associated with the given key. If the key exists and the value hasn't expired, it is moved to the end of the LRU order (most recently used) and returned. If the key doesn't exist or the value has expired, CACHE_MISS is returned.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `key` | `str` | The key to look up. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `T \| CacheMissSentinel` | The stored value if found and not expired, or `CACHE_MISS` if the key doesn't exist or the value has expired. |

Note

Expired items are automatically deleted when accessed. This method does not raise exceptions for missing keys - use CACHE_MISS to check for cache misses. Unlike mutating operations (set, delete, clear), this method can be called even after the storage has been closed.

Source code in atomic_lru/_storage/storage.py
def get(self, key: str) -> T | CacheMissSentinel:
    """Retrieve a value from the cache.

    Retrieves the value associated with the given key. If the key exists and
    the value hasn't expired, it is moved to the end of the LRU order (most
    recently used) and returned. If the key doesn't exist or the value has
    expired, `CACHE_MISS` is returned.

    Args:
        key: The key to look up.

    Returns:
        The stored value if found and not expired, or `CACHE_MISS` if the key
            doesn't exist or the value has expired.

    Note:
        Expired items are automatically deleted when accessed. This method does
        not raise exceptions for missing keys - use `CACHE_MISS` to check for
        cache misses. Unlike mutating operations (`set`, `delete`, `clear`),
        this method can be called even after the storage has been closed.
    """
    with self.__lock:
        value_obj = self._data.get(key)
        if value_obj is None:
            return CACHE_MISS
        if value_obj.is_expired:
            self.__delete(key)
            return CACHE_MISS
        self._data.move_to_end(key)
        return value_obj.value
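The miss-handling pattern above can be sketched without the library: a dedicated sentinel (rather than `None`) distinguishes a missing key from a stored `None`, and a hit refreshes the LRU order. A minimal standalone sketch:

```python
from collections import OrderedDict

class CacheMissSentinel:
    def __repr__(self) -> str:
        return "<CacheMiss>"

CACHE_MISS = CacheMissSentinel()

def lru_get(data: OrderedDict, key):
    # Using the sentinel as the dict.get() default distinguishes
    # "key absent" from "stored value happens to be None".
    value = data.get(key, CACHE_MISS)
    if value is CACHE_MISS:
        return CACHE_MISS
    data.move_to_end(key)  # hit: refresh LRU order (most recently used)
    return value

d = OrderedDict(a=None, b=2)
assert lru_get(d, "a") is None            # a stored None is a hit, not a miss
assert lru_get(d, "missing") is CACHE_MISS
assert list(d) == ["b", "a"]              # the hit moved "a" to the MRU end
```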

set

Store a value in the cache.

Stores a key-value pair in the cache. If size or item limits are set and exceeded, the least recently used items are automatically evicted to make room. The item is moved to the end of the LRU order (most recently used).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `key` | `str` | The key to store the value under. Must be a string. | *required* |
| `value` | `T` | The value to store. If `size_limit_in_bytes` is set, must be of type `bytes`. | *required* |
| `ttl` | `float \| DefaultTTLSentinel \| None` | Time-to-live in seconds. Use `DEFAULT_TTL` to use the instance's `default_ttl` value, `None` to disable expiration for this item, or a float to set a specific TTL. If `expiration_disabled` is True, this parameter is ignored. | `DEFAULT_TTL` |

Raises:

| Type | Description |
| --- | --- |
| `RuntimeError` | If the storage has been closed. |
| `ValueError` | If `size_limit_in_bytes` is set and `value` is not `bytes`. |

Note

Items larger than half the size_limit_in_bytes are rejected to prevent a single large item from dominating the cache. If the value is rejected due to size, this method returns silently without storing the value.

Source code in atomic_lru/_storage/storage.py
def set(
    self, key: str, value: T, ttl: float | DefaultTTLSentinel | None = DEFAULT_TTL
) -> None:
    """Store a value in the cache.

    Stores a key-value pair in the cache. If size or item limits are set and
    exceeded, the least recently used items are automatically evicted to make
    room. The item is moved to the end of the LRU order (most recently used).

    Args:
        key: The key to store the value under. Must be a string.
        value: The value to store. If `size_limit_in_bytes` is set, must be
            of type `bytes`.
        ttl: Time-to-live in seconds. Use `DEFAULT_TTL` to use the instance's
            `default_ttl` value, `None` to disable expiration for this item,
            or a float to set a specific TTL. If `expiration_disabled` is True,
            this parameter is ignored.

    Raises:
        RuntimeError: If the storage has been closed.
        ValueError: If `size_limit_in_bytes` is set and `value` is not `bytes`.

    Note:
        Items larger than half the `size_limit_in_bytes` are rejected to prevent
        a single large item from dominating the cache. If the value is rejected
        due to size, this method returns silently without storing the value.
    """
    # Validate value type and size before acquiring lock
    if self.size_limit_in_bytes is not None:
        if not isinstance(value, bytes):
            raise ValueError("Value must be bytes if size_limit_in_bytes is set")
        if len(value) > self.size_limit_in_bytes / 2:
            return

    # Resolve TTL before acquiring lock
    if self.expiration_disabled:
        ttl = None
    resolved_ttl: float | None = (
        self.default_ttl if isinstance(ttl, DefaultTTLSentinel) else ttl
    )

    value_obj = Value(value=value, ttl=resolved_ttl)

    with self.__lock:
        self._assert_not_closed()

        # Check if we're overwriting an existing key
        old_value_obj: Value[T] | None = self._data.get(key)
        is_overwriting = old_value_obj is not None

        # Compute target sizes
        until_size_in_bytes = self._calculate_size_needed_for_eviction(
            value_obj, old_value_obj
        )
        until_number_of_items: int | None = None
        if self.max_items is not None and not is_overwriting:
            until_number_of_items = self.max_items - 1

        # Evict items if needed to make room
        self.__delete_least_recently_used_items(
            until_size_in_bytes=until_size_in_bytes,
            until_number_of_items=until_number_of_items,
        )

        # Re-check: the key being overwritten may have been the LRU item and got evicted
        if is_overwriting and key not in self._data:
            is_overwriting = False
            old_value_obj = None

        # Update size tracking: calculate net change instead of subtract then add
        if is_overwriting:
            # Net change: new size - old size (PER_ITEM_APPROXIMATE_SIZE cancels out)
            assert old_value_obj is not None  # Type narrowing for type checker
            size_delta = value_obj.size_in_bytes - old_value_obj.size_in_bytes
            self._size_in_bytes += size_delta
        else:
            # New item: add value size + overhead
            self._size_in_bytes += (
                value_obj.size_in_bytes + PER_ITEM_APPROXIMATE_SIZE
            )

        # Store the value and ensure it's at the end of the LRU order (most recently used)
        self._data[key] = value_obj
        self._data.move_to_end(key)
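The eviction and size-accounting logic above can be condensed into a standalone sketch. `TinyLRU` and its `PER_ITEM_OVERHEAD` constant are illustrative stand-ins, not the library's code (the real `PER_ITEM_APPROXIMATE_SIZE` value is internal):

```python
from collections import OrderedDict

PER_ITEM_OVERHEAD = 64  # hypothetical per-entry overhead, for illustration

class TinyLRU:
    def __init__(self, size_limit: int):
        self.size_limit = size_limit
        self.data: OrderedDict[str, bytes] = OrderedDict()
        self.size = 0

    def set(self, key: str, value: bytes) -> None:
        if len(value) > self.size_limit / 2:
            return  # oversized item: rejected silently
        old = self.data.get(key)
        if old is not None:
            # Overwrite: only the net change matters; the per-item
            # overhead of the existing slot cancels out.
            budget = self.size_limit - (len(value) - len(old))
        else:
            budget = self.size_limit - len(value) - PER_ITEM_OVERHEAD
        # Evict from the LRU end until the new item fits.
        while self.size > budget and self.data:
            _, evicted = self.data.popitem(last=False)
            self.size -= len(evicted) + PER_ITEM_OVERHEAD
        # Re-check: the key being overwritten may itself have been evicted.
        old = self.data.get(key)
        if old is not None:
            self.size += len(value) - len(old)
        else:
            self.size += len(value) + PER_ITEM_OVERHEAD
        self.data[key] = value
        self.data.move_to_end(key)  # most recently used
```

Computing a budget first and then applying a single net change keeps the size counter consistent even when eviction removes the very key being overwritten.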

Other objects and types

Sentinel objects

atomic_lru.CACHE_MISS = CacheMissSentinel() module-attribute

Sentinel instance returned when a cache lookup fails.

This constant is returned by `get()` methods when the requested key doesn't exist in the cache or the value has expired. Use `is` or `is not` to check for cache misses:

```python
if cache.get("key") is CACHE_MISS:
    # Handle cache miss
```

atomic_lru.DEFAULT_TTL = DefaultTTLSentinel() module-attribute

Sentinel instance indicating use of the default TTL.

Use this constant when calling set() to indicate that the instance's default_ttl should be used for the item being stored.

Other types

atomic_lru.CacheMissSentinel

Sentinel value returned when a cache lookup fails.

This class is used as a sentinel value to indicate that a requested key was not found in the cache or the value has expired. It's returned by get() methods instead of raising an exception or returning None.

Example

```python
from atomic_lru import Cache, CACHE_MISS
cache = Cache()
result = cache.get("nonexistent_key")
if result is CACHE_MISS:
    print("Key not found")
else:
    print(f"Found: {result}")
```
Source code in atomic_lru/_storage/types.py
class CacheMissSentinel:
    """Sentinel value returned when a cache lookup fails.

    This class is used as a sentinel value to indicate that a requested key
    was not found in the cache or the value has expired. It's returned by
    `get()` methods instead of raising an exception or returning None.

    Example:
        ```python
        from atomic_lru import Cache, CACHE_MISS
        cache = Cache()
        result = cache.get("nonexistent_key")
        if result is CACHE_MISS:
            print("Key not found")
        else:
            print(f"Found: {result}")
        ```
    """

    def __repr__(self) -> str:
        return "<CacheMiss>"

atomic_lru.DefaultTTLSentinel

Sentinel value indicating that the default TTL should be used.

This class is used as a sentinel value to indicate that when setting a cache value, the instance's default_ttl should be used instead of explicitly providing a TTL value or disabling expiration.

Example

```python
from atomic_lru import Cache, DEFAULT_TTL
cache = Cache(default_ttl=3600)  # 1 hour default
# Use default TTL
cache.set("key1", "value1", ttl=DEFAULT_TTL)
# Override with specific TTL
cache.set("key2", "value2", ttl=1800)  # 30 minutes
# Disable expiration for this item
cache.set("key3", "value3", ttl=None)
```
Source code in atomic_lru/_storage/types.py
class DefaultTTLSentinel:
    """Sentinel value indicating that the default TTL should be used.

    This class is used as a sentinel value to indicate that when setting a cache
    value, the instance's `default_ttl` should be used instead of explicitly
    providing a TTL value or disabling expiration.

    Example:
        ```python
        from atomic_lru import Cache, DEFAULT_TTL
        cache = Cache(default_ttl=3600)  # 1 hour default
        # Use default TTL
        cache.set("key1", "value1", ttl=DEFAULT_TTL)
        # Override with specific TTL
        cache.set("key2", "value2", ttl=1800)  # 30 minutes
        # Disable expiration for this item
        cache.set("key3", "value3", ttl=None)
        ```
    """

    def __repr__(self) -> str:
        return "<DefaultTTL>"

atomic_lru.Serializer

Bases: Protocol

Protocol for serializing values to bytes.

A serializer is a callable that takes any value and converts it to bytes for storage in the cache. The default implementation uses pickle.

Example

```python
def my_serializer(value: Any) -> bytes:
    return json.dumps(value).encode('utf-8')
```
Source code in atomic_lru/_cache.py
class Serializer(Protocol):
    """Protocol for serializing values to bytes.

    A serializer is a callable that takes any value and converts it to bytes
    for storage in the cache. The default implementation uses pickle.

    Example:
        ```python
        def my_serializer(value: Any) -> bytes:
            return json.dumps(value).encode('utf-8')
        ```
    """

    def __call__(self, value: Any) -> bytes: ...

atomic_lru.Deserializer

Bases: Protocol

Protocol for deserializing values from bytes.

A deserializer is a callable that takes bytes and converts them back to the original value. It should be the inverse of the serializer.

Example

```python
def my_deserializer(value: bytes) -> Any:
    return json.loads(value.decode('utf-8'))
```
Source code in atomic_lru/_cache.py
class Deserializer(Protocol):
    """Protocol for deserializing values from bytes.

    A deserializer is a callable that takes bytes and converts them back to
    the original value. It should be the inverse of the serializer.

    Example:
        ```python
        def my_deserializer(value: bytes) -> Any:
            return json.loads(value.decode('utf-8'))
        ```
    """

    def __call__(self, value: bytes) -> Any: ...
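As a concrete pair satisfying both protocols, here is a JSON-based alternative to the default pickle pair. It restricts values to JSON-encodable types, but the resulting bytes avoid pickle's arbitrary-code-execution risk if they ever cross a trust boundary:

```python
import json
from typing import Any

def json_serializer(value: Any) -> bytes:
    # Serializer protocol: any value -> bytes
    return json.dumps(value).encode("utf-8")

def json_deserializer(value: bytes) -> Any:
    # Deserializer protocol: must be the exact inverse of the serializer
    return json.loads(value.decode("utf-8"))

obj = {"name": "Alice", "age": 30}
assert json_deserializer(json_serializer(obj)) == obj
```

Given the `Cache` constructor parameters documented above, these would be wired in as `Cache(serializer=json_serializer, deserializer=json_deserializer)`.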