1
0
This repository has been archived on 2026-03-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
Timmy-time-dashboard/tests/unit/test_db_pool.py

81 lines
2.2 KiB
Python

"""Tests for the thread-local SQLite ConnectionPool."""
import sqlite3
import threading
from pathlib import Path
import pytest
from infrastructure.db_pool import ConnectionPool
pytestmark = pytest.mark.unit
def test_pool_creates_connection(tmp_path: Path):
"""Test that the pool successfully creates a SQLite connection."""
db_file = tmp_path / "test.db"
pool = ConnectionPool(db_file)
conn = pool.get_connection()
assert isinstance(conn, sqlite3.Connection)
cursor = conn.execute("SELECT 1")
assert cursor.fetchone()[0] == 1
def test_pool_reuses_connection_same_thread(tmp_path: Path):
"""Test that multiple calls in the same thread return the same connection."""
db_file = tmp_path / "test.db"
pool = ConnectionPool(db_file)
conn1 = pool.get_connection()
conn2 = pool.get_connection()
assert conn1 is conn2
def test_pool_different_connections_different_threads(tmp_path: Path):
"""Test that different threads receive distinct connections."""
db_file = tmp_path / "test.db"
pool = ConnectionPool(db_file)
conn1 = pool.get_connection()
conn2_list = []
def _worker():
conn2_list.append(pool.get_connection())
thread = threading.Thread(target=_worker)
thread.start()
thread.join()
assert len(conn2_list) == 1
conn2 = conn2_list[0]
assert conn1 is not conn2
def test_pool_close_connection(tmp_path: Path):
"""Test that connection is closed and cleared from thread local."""
db_file = tmp_path / "test.db"
pool = ConnectionPool(db_file)
conn1 = pool.get_connection()
pool.close_connection()
# Getting a new connection should create a new object
conn2 = pool.get_connection()
assert conn1 is not conn2
def test_pool_context_manager(tmp_path: Path):
"""Test that the context manager yields a connection and closes it after."""
db_file = tmp_path / "test.db"
pool = ConnectionPool(db_file)
with pool.connection() as conn1:
assert isinstance(conn1, sqlite3.Connection)
# After exiting the context manager, the connection should be closed implicitly
# resulting in a new connection object for the next request.
conn2 = pool.get_connection()
assert conn1 is not conn2