This repository has been archived on 2026-03-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
Timmy-time-dashboard/src/infrastructure/db_pool.py

85 lines
2.7 KiB
Python

"""Thread-local SQLite connection pool.
Provides a ConnectionPool class that manages SQLite connections per thread,
with support for context managers and automatic cleanup.
"""
import sqlite3
import threading
from collections.abc import Generator
from contextlib import contextmanager
from pathlib import Path
class ConnectionPool:
"""Thread-local SQLite connection pool.
Each thread gets its own connection, which is reused for subsequent
requests from the same thread. Connections are automatically cleaned
up when close_connection() is called or the context manager exits.
"""
def __init__(self, db_path: Path | str) -> None:
"""Initialize the connection pool.
Args:
db_path: Path to the SQLite database file.
"""
self._db_path = Path(db_path)
self._local = threading.local()
def _ensure_db_exists(self) -> None:
"""Ensure the database directory exists."""
self._db_path.parent.mkdir(parents=True, exist_ok=True)
def get_connection(self) -> sqlite3.Connection:
"""Get a connection for the current thread.
Creates a new connection if one doesn't exist for this thread,
otherwise returns the existing connection.
Returns:
A sqlite3 Connection object.
"""
if not hasattr(self._local, "conn") or self._local.conn is None:
self._ensure_db_exists()
self._local.conn = sqlite3.connect(str(self._db_path), check_same_thread=False)
self._local.conn.row_factory = sqlite3.Row
return self._local.conn
def close_connection(self) -> None:
"""Close the connection for the current thread.
Cleans up the thread-local storage. Safe to call even if
no connection exists for this thread.
"""
if hasattr(self._local, "conn") and self._local.conn is not None:
self._local.conn.close()
self._local.conn = None
@contextmanager
def connection(self) -> Generator[sqlite3.Connection, None, None]:
"""Context manager for getting and automatically closing a connection.
Yields:
A sqlite3 Connection object.
Example:
with pool.connection() as conn:
cursor = conn.execute("SELECT 1")
result = cursor.fetchone()
"""
conn = self.get_connection()
try:
yield conn
finally:
self.close_connection()
def close_all(self) -> None:
"""Close all connections (useful for testing).
Note: This only closes the connection for the current thread.
In a multi-threaded environment, each thread must close its own.
"""
self.close_connection()