summaryrefslogtreecommitdiff
path: root/database.py
diff options
context:
space:
mode:
authorTyler Hoang <tyler@tylerhoang.xyz>2026-05-06 12:21:26 -0700
committerTyler Hoang <tyler@tylerhoang.xyz>2026-05-06 12:21:26 -0700
commite708bec6cd76c2686de4158dde4d04f72a3c300d (patch)
tree04b0bc4738e090dd7834d47478c7e652da010f92 /database.py
init: lumiere film diary
Diffstat (limited to 'database.py')
-rw-r--r--database.py90
1 files changed, 90 insertions, 0 deletions
diff --git a/database.py b/database.py
new file mode 100644
index 0000000..addcd30
--- /dev/null
+++ b/database.py
@@ -0,0 +1,90 @@
+import os
+from collections.abc import Generator
+
+from dotenv import load_dotenv
+from sqlalchemy import create_engine, inspect
+from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker
+
+load_dotenv()
+
+DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./lumiere.db")
+
+connect_args = {"check_same_thread": False} if DATABASE_URL.startswith("sqlite") else {}
+engine = create_engine(DATABASE_URL, connect_args=connect_args)
+SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
+
+
+class Base(DeclarativeBase):
+ pass
+
+
+def get_db() -> Generator[Session, None, None]:
+ db = SessionLocal()
+ try:
+ yield db
+ finally:
+ db.close()
+
+
+def init_db() -> None:
+ import models
+
+ _rebuild_films_table_if_needed(models.Film)
+ Base.metadata.create_all(bind=engine)
+
+
+def _rebuild_films_table_if_needed(film_model) -> None:
+ if not DATABASE_URL.startswith("sqlite"):
+ return
+
+ inspector = inspect(engine)
+ if "films" not in inspector.get_table_names():
+ return
+
+ existing_columns = {column["name"] for column in inspector.get_columns("films")}
+ expected_columns = {column.name for column in film_model.__table__.columns}
+ if existing_columns == expected_columns:
+ return
+
+ legacy_indexes = [index["name"] for index in inspector.get_indexes("films")]
+ legacy_table = "films_legacy_schema"
+ with engine.begin() as connection:
+ connection.exec_driver_sql(f"DROP TABLE IF EXISTS {legacy_table}")
+ connection.exec_driver_sql(f"ALTER TABLE films RENAME TO {legacy_table}")
+ for index_name in legacy_indexes:
+ connection.exec_driver_sql(f"DROP INDEX IF EXISTS {index_name}")
+ film_model.__table__.create(bind=connection)
+
+ target_columns = []
+ select_expressions = []
+ for column in film_model.__table__.columns:
+ name = column.name
+ if name in existing_columns:
+ target_columns.append(name)
+ select_expressions.append(name)
+ elif name == "context" and "mood_tags" in existing_columns:
+ target_columns.append(name)
+ select_expressions.append("mood_tags")
+ elif name == "rewatch":
+ target_columns.append(name)
+ select_expressions.append("0")
+ elif name in {"rewatch_count", "stars"}:
+ target_columns.append(name)
+ select_expressions.append("0")
+ elif name == "shelf":
+ target_columns.append(name)
+ select_expressions.append("'diary'")
+ elif name in {"created_at", "updated_at"}:
+ target_columns.append(name)
+ select_expressions.append("CURRENT_TIMESTAMP")
+
+ if target_columns:
+ connection.exec_driver_sql(
+ f"""
+ INSERT INTO films ({", ".join(target_columns)})
+ SELECT {", ".join(select_expressions)}
+ FROM {legacy_table}
+ """
+ )
+
+ connection.exec_driver_sql(f"DROP TABLE {legacy_table}")