"""Routes for importing films from CSV uploads and for bulk data maintenance.

The module exposes an ``APIRouter`` with:

* ``GET  /import``                 - render the import page.
* ``POST /import/letterboxd``      - import a CSV, taking the shelf from each row.
* ``POST /import/watchlist``       - import a CSV, forcing every row onto "queue".
* ``POST /data/clear``             - delete every stored film.
* ``POST /data/clear-duplicates``  - delete films that duplicate an earlier one.
* ``POST /data/enrich-posters``    - fill in missing poster / TMDB data.
"""

import csv
import io
from collections.abc import Iterator
from datetime import date, datetime

import httpx
from fastapi import APIRouter, Depends, File, Request, UploadFile
from fastapi.responses import RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import or_, text
from sqlalchemy.orm import Session

from database import get_db
from models import Film
from services.tmdb import TMDBNotConfiguredError, apply_metadata_to_film, find_movie

router = APIRouter(tags=["imports"])
templates = Jinja2Templates(directory="templates")

# Words (lower-cased) that count as "true" in a boolean CSV column.
_TRUE_WORDS = frozenset({"1", "true", "yes", "y", "on", "rewatched"})
# Shelf names accepted from a CSV; anything else falls back to "diary".
_KNOWN_SHELVES = frozenset({"diary", "queue", "abandoned"})
# Explicit star values accepted from the "Stars" column.
_VALID_STARS = frozenset({0, 1, 2, 3})
# Date layouts tried in order; the first one that parses wins, so an
# ambiguous value such as 01/02/2024 is read as month/day/year.
_DATE_FORMATS = ("%Y-%m-%d", "%m/%d/%Y", "%d/%m/%Y")


# --------------------------------------------------------------------------
# Cell parsing helpers
# --------------------------------------------------------------------------


def _value(row: dict, *names: str) -> str:
    """Return the first non-blank cell among the columns *names*, stripped.

    Columns are tried in the order given. Cells that are missing, ``None``,
    empty, or whitespace-only are skipped so that a later alias can still
    supply the value. Returns ``""`` when no column has content.
    """
    for name in names:
        cell = row.get(name)
        if not isinstance(cell, str):
            continue
        stripped = cell.strip()
        if stripped:
            return stripped
    return ""


def _parse_int(value: str) -> int | None:
    """Parse *value* as an integer, returning ``None`` if blank or invalid."""
    value = value.strip()
    if not value:
        return None
    try:
        return int(value)
    except ValueError:
        return None


def _parse_bool(value: str) -> bool:
    """Return ``True`` when *value* is one of the recognised truthy words."""
    return value.strip().lower() in _TRUE_WORDS


def _parse_explicit_stars(value: str) -> int | None:
    """Parse an explicit star count; only 0, 1, 2 and 3 are accepted.

    Returns ``None`` for blank, non-integer, or out-of-range input.
    """
    stars = _parse_int(value)
    if stars is None:
        return None
    return stars if stars in _VALID_STARS else None


def _parse_rating_to_stars(value: str) -> int:
    """Convert a 0-5 rating into a 0-3 star count.

    ``5`` maps to 3 stars, ``>= 4.5`` to 2, ``>= 3.5`` to 1, and everything
    else (including blank, unparsable, or out-of-range input) to 0.
    """
    value = value.strip()
    if not value:
        return 0
    try:
        rating = float(value)
    except ValueError:
        return 0
    if rating < 0 or rating > 5:
        return 0
    if rating >= 5:
        return 3
    if rating >= 4.5:
        return 2
    if rating >= 3.5:
        return 1
    return 0


def _stars_from_row(row: dict) -> int:
    """Return the star count for *row*.

    A valid "Stars" cell wins; otherwise the "Rating" cell is converted.
    """
    explicit_stars = _parse_explicit_stars(_value(row, "Stars"))
    if explicit_stars is not None:
        return explicit_stars
    return _parse_rating_to_stars(_value(row, "Rating"))


def _parse_shelf(value: str) -> str:
    """Normalise a shelf name, defaulting to ``"diary"`` when unrecognised."""
    shelf = value.strip().lower()
    return shelf if shelf in _KNOWN_SHELVES else "diary"


def _parse_date(value: str) -> date | None:
    """Parse *value* using the supported layouts; ``None`` if none match."""
    value = value.strip()
    if not value:
        return None
    for date_format in _DATE_FORMATS:
        try:
            return datetime.strptime(value, date_format).date()
        except ValueError:
            continue
    return None


# --------------------------------------------------------------------------
# Duplicate detection
# --------------------------------------------------------------------------


def _normalize_title(value: str | None) -> str:
    """Case-fold and strip a title so titles compare case-insensitively."""
    return (value or "").casefold().strip()


def _date_key(value: date | None) -> str:
    """Return the ISO form of *value*, or ``""`` when there is no date."""
    return value.isoformat() if value else ""


def _duplicate_keys_for_film(film: Film) -> set[tuple]:
    """Build the identity keys used to decide whether two films are duplicates.

    Up to two keys are produced, each including the watched date:

    * ``("tmdb", tmdb_id, watched_date)`` when the film has a TMDB id.
    * ``("title", normalised_title, year, watched_date)`` when it has a title.

    Two films are treated as duplicates when their key sets intersect.
    """
    keys: set[tuple] = set()
    watched_date = _date_key(film.date_watched)
    if film.tmdb_id:
        keys.add(("tmdb", str(film.tmdb_id), watched_date))
    title = _normalize_title(film.title)
    if title:
        keys.add(("title", title, str(film.year or ""), watched_date))
    return keys


def _existing_duplicate_keys(db: Session) -> set[tuple]:
    """Collect the duplicate keys of every film currently stored."""
    keys: set[tuple] = set()
    for film in db.query(Film).all():
        keys.update(_duplicate_keys_for_film(film))
    return keys


# --------------------------------------------------------------------------
# CSV reading
# --------------------------------------------------------------------------


def _decode_csv(content: bytes) -> str:
    """Decode uploaded bytes as UTF-8 (BOM tolerated), else as Latin-1.

    ``utf-8-sig`` also decodes BOM-less UTF-8, and Latin-1 can decode any
    byte sequence, so this always returns a string.
    """
    try:
        return content.decode("utf-8-sig")
    except UnicodeDecodeError:
        return content.decode("latin-1")


def _row_cells(row: dict) -> Iterator[str]:
    """Yield every string cell in *row*, including overflow cells.

    ``csv.DictReader`` stores cells beyond the header width as a list under
    its ``restkey`` (``None`` by default) and fills short rows with ``None``.
    """
    for cell in row.values():
        if isinstance(cell, str):
            yield cell
        elif isinstance(cell, list):
            yield from (item for item in cell if isinstance(item, str))


def _is_blank_row(row: dict) -> bool:
    """Return ``True`` when no cell in *row* has non-whitespace content."""
    return not any(cell.strip() for cell in _row_cells(row))


def _film_from_row(row: dict, shelf: str) -> Film | None:
    """Build an unsaved ``Film`` from a CSV row, or ``None`` if it has no title.

    Watch-specific fields (watched date, rewatch flag/count, stars) are only
    read for the "diary" shelf; other shelves get empty/zero values.
    """
    title = _value(row, "Name", "Title")
    if not title:
        return None
    is_diary = shelf == "diary"
    return Film(
        title=title,
        original_title=_value(row, "Original Title") or None,
        director=_value(row, "Director") or None,
        year=_parse_int(_value(row, "Year")),
        country=_value(row, "Country") or None,
        language=_value(row, "Language") or None,
        runtime=_parse_int(_value(row, "Runtime", "Runtime (mins)")),
        date_watched=_parse_date(_value(row, "Watched Date", "Date")) if is_diary else None,
        rewatch=_parse_bool(_value(row, "Rewatch")) if is_diary else False,
        rewatch_count=(_parse_int(_value(row, "Rewatch Count")) or 0) if is_diary else 0,
        stars=_stars_from_row(row) if is_diary else 0,
        watched_with=_value(row, "Watched With", "Watched_with") or None,
        shelf=shelf,
        how_found=_value(row, "How Found", "How_found") or None,
        context=_value(row, "Context", "Tags") or None,
        notes=_value(row, "Review", "Notes") or None,
        poster_url=_value(row, "Poster URL", "Poster") or None,
        tmdb_id=_parse_int(_value(row, "TMDB ID", "tmdb_id")),
    )


def _is_csv_upload(file: UploadFile) -> bool:
    """Return ``True`` when the upload's filename ends in ``.csv``.

    The filename may be absent, in which case the upload is rejected rather
    than raising.
    """
    return (file.filename or "").lower().endswith(".csv")


def _csv_required_response(request: Request):
    """Render the import page with a 400 "Upload a CSV file." error."""
    return templates.TemplateResponse(
        request=request,
        name="import.html",
        context={"request": request, "error": "Upload a CSV file.", "active_page": "import"},
        status_code=400,
    )


async def _films_from_upload(file: UploadFile, shelf: str | None) -> list[Film]:
    """Read an uploaded CSV and turn its usable rows into unsaved films.

    Args:
        file: The uploaded CSV.
        shelf: Shelf to force on every row, or ``None`` to take it from each
            row's "Shelf" column.

    Blank rows and rows without a title are skipped.
    """
    content = await file.read()
    # newline="" lets the csv module see the original line endings, so files
    # that use bare "\r" as the row separator still split into rows.
    reader = csv.DictReader(io.StringIO(_decode_csv(content), newline=""))
    films: list[Film] = []
    for row in reader:
        if _is_blank_row(row):
            continue
        row_shelf = shelf if shelf is not None else _parse_shelf(_value(row, "Shelf"))
        film = _film_from_row(row, row_shelf)
        if film is not None:
            films.append(film)
    return films


# --------------------------------------------------------------------------
# Enrichment and persistence
# --------------------------------------------------------------------------


async def _dedupe_enrich_and_save(
    films: list[Film],
    db: Session,
) -> tuple[list[Film], int, int]:
    """De-duplicate *films*, enrich them from TMDB, and persist the survivors.

    De-duplication runs twice: once before enrichment (against stored films
    and earlier rows of this batch) to avoid needless lookups, and once after
    enrichment, because applying metadata can change a film's keys.

    Returns:
        ``(saved_films, skipped_count, enriched_count)``.
    """
    skipped = 0
    existing_keys = _existing_duplicate_keys(db)

    pending_keys: set[tuple] = set()
    deduped_before_enrichment: list[Film] = []
    for film in films:
        film_keys = _duplicate_keys_for_film(film)
        # Test each set separately instead of building their union per film.
        if not film_keys.isdisjoint(existing_keys) or not film_keys.isdisjoint(pending_keys):
            skipped += 1
            continue
        deduped_before_enrichment.append(film)
        pending_keys.update(film_keys)

    enriched = await _enrich_films_from_tmdb(deduped_before_enrichment)

    deduped_after_enrichment: list[Film] = []
    final_keys = set(existing_keys)
    for film in deduped_before_enrichment:
        film_keys = _duplicate_keys_for_film(film)
        if not film_keys.isdisjoint(final_keys):
            skipped += 1
            continue
        deduped_after_enrichment.append(film)
        final_keys.update(film_keys)

    if deduped_after_enrichment:
        db.add_all(deduped_after_enrichment)
        db.commit()
    return deduped_after_enrichment, skipped, enriched


async def _enrich_films_from_tmdb(films: list[Film]) -> int:
    """Look up each film that lacks a poster or TMDB id and apply the metadata.

    Films are mutated in place; nothing is committed here. Returns the number
    of films for which ``apply_metadata_to_film`` reported a change.
    """
    enriched = 0
    if not films:
        return enriched
    async with httpx.AsyncClient(timeout=10.0) as client:
        for film in films:
            if film.poster_url and film.tmdb_id:
                continue
            try:
                metadata = await find_movie(film.title, year=film.year, client=client)
            except TMDBNotConfiguredError:
                # No point trying the remaining films without configuration.
                return enriched
            except httpx.HTTPError:
                # Best effort: a failed lookup just leaves this film as-is.
                continue
            if metadata and apply_metadata_to_film(film, metadata):
                enriched += 1
    return enriched


# --------------------------------------------------------------------------
# Routes
# --------------------------------------------------------------------------


@router.get("/import")
def import_page(request: Request):
    """Render the import page with no error message."""
    return templates.TemplateResponse(
        request=request,
        name="import.html",
        context={"request": request, "error": None, "active_page": "import"},
    )


@router.post("/data/clear")
def clear_data(db: Session = Depends(get_db)):
    """Delete every film and redirect home with the deleted count."""
    deleted = db.query(Film).delete(synchronize_session=False)
    bind = db.get_bind()
    if bind.dialect.name == "sqlite":
        # Reset the id counter so new rows start from 1 again.
        # NOTE(review): sqlite_sequence only exists once some table has been
        # created with AUTOINCREMENT - confirm the films table uses it.
        db.execute(text("DELETE FROM sqlite_sequence WHERE name = 'films'"))
    db.commit()
    return RedirectResponse(f"/?cleared={deleted}", status_code=303)


@router.post("/data/clear-duplicates")
def clear_duplicates(db: Session = Depends(get_db)):
    """Delete films that duplicate an earlier (lower-id) film.

    Films are scanned in ascending id order, so the earliest copy is kept.
    Redirects home with the number of rows deleted.
    """
    seen_keys: set[tuple] = set()
    duplicate_ids = []
    for film in db.query(Film).order_by(Film.id.asc()).all():
        film_keys = _duplicate_keys_for_film(film)
        if not film_keys.isdisjoint(seen_keys):
            duplicate_ids.append(film.id)
            continue
        seen_keys.update(film_keys)

    deleted = 0
    if duplicate_ids:
        deleted = (
            db.query(Film)
            .filter(Film.id.in_(duplicate_ids))
            .delete(synchronize_session=False)
        )
        db.commit()
    return RedirectResponse(f"/?deduped={deleted}", status_code=303)


@router.post("/data/enrich-posters")
async def enrich_missing_posters(db: Session = Depends(get_db)):
    """Enrich stored films missing a poster URL or TMDB id, then redirect home."""
    films = (
        db.query(Film)
        .filter(or_(Film.poster_url.is_(None), Film.poster_url == "", Film.tmdb_id.is_(None)))
        .order_by(Film.year.asc(), Film.title.asc())
        .all()
    )
    enriched = await _enrich_films_from_tmdb(films)
    if enriched:
        db.commit()
    return RedirectResponse(f"/?enriched={enriched}", status_code=303)


@router.post("/import/letterboxd")
async def import_letterboxd(
    request: Request,
    file: UploadFile = File(...),
    db: Session = Depends(get_db),
):
    """Import films from an uploaded CSV, taking the shelf from each row.

    Responds with a 400 import page when the upload is not a ``.csv`` file;
    otherwise redirects home with imported / skipped / enriched counts.
    """
    if not _is_csv_upload(file):
        return _csv_required_response(request)

    films = await _films_from_upload(file, shelf=None)
    films, skipped, enriched = await _dedupe_enrich_and_save(films, db)
    return RedirectResponse(
        f"/?imported={len(films)}&skipped={skipped}&enriched={enriched}",
        status_code=303,
    )


@router.post("/import/watchlist")
async def import_watchlist(
    request: Request,
    file: UploadFile = File(...),
    db: Session = Depends(get_db),
):
    """Import films from an uploaded CSV, placing every row on the "queue" shelf.

    Responds with a 400 import page when the upload is not a ``.csv`` file;
    otherwise redirects to ``/queue`` with imported / skipped / enriched counts.
    """
    if not _is_csv_upload(file):
        return _csv_required_response(request)

    films = await _films_from_upload(file, shelf="queue")
    films, skipped, enriched = await _dedupe_enrich_and_save(films, db)
    return RedirectResponse(
        f"/queue?imported={len(films)}&skipped={skipped}&enriched={enriched}",
        status_code=303,
    )