diff options
| author | Tyler Hoang <tyler@tylerhoang.xyz> | 2026-05-06 12:21:26 -0700 |
|---|---|---|
| committer | Tyler Hoang <tyler@tylerhoang.xyz> | 2026-05-06 12:21:26 -0700 |
| commit | e708bec6cd76c2686de4158dde4d04f72a3c300d (patch) | |
| tree | 04b0bc4738e090dd7834d47478c7e652da010f92 /database.py | |
init: lumiere film diary
Diffstat (limited to 'database.py')
| -rw-r--r-- | database.py | 90 |
1 files changed, 90 insertions, 0 deletions
diff --git a/database.py b/database.py new file mode 100644 index 0000000..addcd30 --- /dev/null +++ b/database.py @@ -0,0 +1,90 @@ +import os +from collections.abc import Generator + +from dotenv import load_dotenv +from sqlalchemy import create_engine, inspect +from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker + +load_dotenv() + +DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./lumiere.db") + +connect_args = {"check_same_thread": False} if DATABASE_URL.startswith("sqlite") else {} +engine = create_engine(DATABASE_URL, connect_args=connect_args) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + + +class Base(DeclarativeBase): + pass + + +def get_db() -> Generator[Session, None, None]: + db = SessionLocal() + try: + yield db + finally: + db.close() + + +def init_db() -> None: + import models + + _rebuild_films_table_if_needed(models.Film) + Base.metadata.create_all(bind=engine) + + +def _rebuild_films_table_if_needed(film_model) -> None: + if not DATABASE_URL.startswith("sqlite"): + return + + inspector = inspect(engine) + if "films" not in inspector.get_table_names(): + return + + existing_columns = {column["name"] for column in inspector.get_columns("films")} + expected_columns = {column.name for column in film_model.__table__.columns} + if existing_columns == expected_columns: + return + + legacy_indexes = [index["name"] for index in inspector.get_indexes("films")] + legacy_table = "films_legacy_schema" + with engine.begin() as connection: + connection.exec_driver_sql(f"DROP TABLE IF EXISTS {legacy_table}") + connection.exec_driver_sql(f"ALTER TABLE films RENAME TO {legacy_table}") + for index_name in legacy_indexes: + connection.exec_driver_sql(f"DROP INDEX IF EXISTS {index_name}") + film_model.__table__.create(bind=connection) + + target_columns = [] + select_expressions = [] + for column in film_model.__table__.columns: + name = column.name + if name in existing_columns: + target_columns.append(name) + select_expressions.append(name) + elif name == "context" and "mood_tags" in existing_columns: + target_columns.append(name) + select_expressions.append("mood_tags") + elif name == "rewatch": + target_columns.append(name) + select_expressions.append("0") + elif name in {"rewatch_count", "stars"}: + target_columns.append(name) + select_expressions.append("0") + elif name == "shelf": + target_columns.append(name) + select_expressions.append("'diary'") + elif name in {"created_at", "updated_at"}: + target_columns.append(name) + select_expressions.append("CURRENT_TIMESTAMP") + + if target_columns: + connection.exec_driver_sql( + f""" + INSERT INTO films ({", ".join(target_columns)}) + SELECT {", ".join(select_expressions)} + FROM {legacy_table} + """ + ) + + connection.exec_driver_sql(f"DROP TABLE {legacy_table}") |
