"""Statistics routes: all-time diary stats and the per-year "year in review"."""

from calendar import month_name
from collections import Counter, defaultdict
from datetime import date, timedelta

from fastapi import APIRouter, Depends, Request
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session

from database import get_db
from models import Film
from services.countries import (
    ISO_NUMERIC_TO_COUNTRY_NAME,
    country_name_to_iso_numeric,
    split_country_names,
)
from services.film_people import split_credit_names

# Star ratings that get their own bucket; anything else is counted as 0.
_STAR_VALUES = (0, 1, 2, 3)

# Marker appended to a truncated notes excerpt.
_EXCERPT_SUFFIX = "..."


def split_genre_names(genre_str: str | None) -> list[str]:
    """Split a comma-separated genre string into trimmed, non-empty names.

    Returns an empty list for ``None`` or an empty string.
    """
    if not genre_str:
        return []
    return [name.strip() for name in genre_str.split(",") if name.strip()]


router = APIRouter(tags=["stats"])
templates = Jinja2Templates(directory="templates")


def _ranked_counts(counter: Counter, label: str) -> list[dict]:
    """Return ``[{label: key, "count": n}, ...]`` ordered by count desc, then key."""
    ranked = sorted(counter.items(), key=lambda item: (-item[1], item[0]))
    return [{label: key, "count": count} for key, count in ranked]


def _decade_counts(decades: Counter) -> list[dict]:
    """Return per-decade counts ordered by decade label."""
    # Keys are unique, so ordering by (label, -count) is simply ordering by label.
    ordered = sorted(decades.items(), key=lambda item: (item[0], -item[1]))
    return [{"decade": decade, "count": count} for decade, count in ordered]


def _star_bucket(film: Film) -> int:
    """Map a film's rating to one of ``_STAR_VALUES``; anything else becomes 0."""
    return film.stars if film.stars in _STAR_VALUES else 0


def _tally_companions(watched_with: Counter, film: Film) -> None:
    """Count each companion of *film*, or ``"solo"`` when none are listed."""
    companions = split_credit_names(film.watched_with)
    if companions:
        watched_with.update(companions)
    else:
        watched_with["solo"] += 1


def _is_rewatch(film: Film) -> bool:
    """Return whether *film* is flagged as a rewatch or has a positive rewatch count."""
    # `or 0` keeps a missing (None) rewatch_count from raising on comparison.
    return bool(film.rewatch or (film.rewatch_count or 0) > 0)


def _rewatch_patterns(films: list[Film]) -> list[dict]:
    """Summarise titles that appear more than once in *films*.

    Entries are grouped by exact title. Each summary carries the number of
    watches, the ratings in watch order, the days between the first and last
    dated watch (``None`` if either date is missing), and whether more than one
    distinct positive rating was given.
    """
    by_title = defaultdict(list)
    for film in films:
        by_title[film.title].append(film)

    patterns = []
    for title, entries in by_title.items():
        if len(entries) < 2:
            continue
        ordered = sorted(entries, key=lambda f: f.date_watched or date.min)
        ratings = [entry.stars for entry in ordered]
        first_date = ordered[0].date_watched
        last_date = ordered[-1].date_watched
        days_between = (
            (last_date - first_date).days if first_date and last_date else None
        )
        # Unrated entries (None or 0) are ignored when checking for a change;
        # the truthiness test also prevents `None > 0` from raising.
        distinct_ratings = {r for r in ratings if r and r > 0}
        patterns.append(
            {
                "title": title,
                "watches": len(ordered),
                "ratings": ratings,
                "days_between": days_between,
                "rating_changed": len(distinct_ratings) > 1,
            }
        )
    patterns.sort(key=lambda x: (-x["watches"], x["title"]))
    return patterns


def _trailing_daily_counts(days: Counter, window: int = 365) -> list[dict]:
    """Return one ``{"date", "count"}`` entry per day for the last *window* days.

    The range ends today (inclusive); days with no entry in *days* count as 0.
    """
    start_day = date.today() - timedelta(days=window - 1)
    iso_days = [
        (start_day + timedelta(days=offset)).isoformat() for offset in range(window)
    ]
    # Counter lookups return 0 for missing keys without inserting them.
    return [{"date": iso_day, "count": days[iso_day]} for iso_day in iso_days]


def _build_stats_payload(films: list[Film]) -> dict:
    """Aggregate all-time statistics over *films* into a JSON-ready dict.

    Produces totals, ranked breakdowns (country, ISO numeric country code,
    director, genre, companions), per-decade / per-month / per-day counts,
    a trailing 365-day daily series, the star distribution and rewatch patterns.
    """
    countries = Counter()
    country_codes = Counter()
    decades = Counter()
    directors = Counter()
    genres = Counter()
    star_counts = Counter(dict.fromkeys(_STAR_VALUES, 0))
    months = Counter()
    days = Counter()
    watched_with = Counter()

    for film in films:
        country_names = split_country_names(film.country)
        countries.update(country_names)
        for country in country_names:
            iso_numeric = country_name_to_iso_numeric(country)
            if iso_numeric is not None:
                country_codes[iso_numeric] += 1

        if film.year:
            decade = (film.year // 10) * 10
            decades[f"{decade}s"] += 1

        genres.update(split_genre_names(film.genre))
        directors.update(split_credit_names(film.director))
        star_counts[_star_bucket(film)] += 1

        if film.date_watched:
            months[film.date_watched.strftime("%Y-%m")] += 1
            days[film.date_watched.isoformat()] += 1

        _tally_companions(watched_with, film)

    return {
        "scope": {
            "shelf": "diary",
            "requires_date_watched": True,
        },
        "total_watched": len(films),
        "total_runtime_minutes": sum(film.runtime for film in films if film.runtime),
        "films_per_country": _ranked_counts(countries, "country"),
        "films_per_country_codes": _ranked_counts(country_codes, "code"),
        "country_labels_by_code": {
            str(code): ISO_NUMERIC_TO_COUNTRY_NAME.get(code, str(code))
            for code in country_codes
        },
        "most_watched_directors": _ranked_counts(directors, "director"),
        "films_per_decade": _decade_counts(decades),
        "films_per_genre": _ranked_counts(genres, "genre"),
        "star_distribution": [
            {"stars": stars, "count": star_counts[stars]} for stars in _STAR_VALUES
        ],
        "films_per_month": [
            {"month": month, "count": count} for month, count in sorted(months.items())
        ],
        "films_per_day": [
            {"date": watched_date, "count": count}
            for watched_date, count in sorted(days.items())
        ],
        "films_per_day_365": _trailing_daily_counts(days),
        "rewatch_patterns": _rewatch_patterns(films),
        "watched_with_breakdown": _ranked_counts(watched_with, "watched_with"),
    }


def _diary_films(db: Session) -> list[Film]:
    """Load every film on the ``"diary"`` shelf that has a watch date."""
    return (
        db.query(Film)
        .filter(Film.shelf == "diary", Film.date_watched.is_not(None))
        .all()
    )


def _available_years(films: list[Film]) -> list[int]:
    """Return the distinct watch years found in *films*, newest first."""
    years = {film.date_watched.year for film in films if film.date_watched}
    return sorted(years, reverse=True)


def _format_film_excerpt(notes: str | None, limit: int = 140) -> str | None:
    """Collapse whitespace in *notes* and truncate to at most *limit* characters.

    Returns ``None`` when there are no notes. Text that fits is returned
    unchanged (after whitespace normalisation); longer text is cut and suffixed
    with ``"..."``. For a *limit* smaller than the suffix, the suffix alone is
    returned.
    """
    if not notes:
        return None
    excerpt = " ".join(notes.split())
    if len(excerpt) <= limit:
        return excerpt
    # Reserve room for the whole suffix so the result never exceeds `limit`
    # (previously only one character was reserved for a three-character suffix).
    keep = max(limit - len(_EXCERPT_SUFFIX), 0)
    return excerpt[:keep].rstrip() + _EXCERPT_SUFFIX


def _film_highlight_payload(film: Film) -> dict:
    """Serialise the fields of *film* shown in a highlight card."""
    return {
        "id": film.id,
        "title": film.title,
        "poster_url": film.poster_url,
        "director": film.director,
        "year": film.year,
        "date_watched": film.date_watched.isoformat() if film.date_watched else None,
        "stars": film.stars,
        "notes_excerpt": _format_film_excerpt(film.notes),
        "runtime": film.runtime,
        "country": film.country,
        "language": film.language,
    }


def _select_year(selected_year: int | None, available_years: list[int]) -> int | None:
    """Pick the year to review.

    An explicitly requested year wins (even if it has no films); otherwise the
    first entry of *available_years* is used, or ``None`` if there is none.
    """
    if selected_year is not None:
        return selected_year
    if available_years:
        return available_years[0]
    return None


def _films_for_year(films: list[Film], year: int) -> list[Film]:
    """Return the films whose watch date falls in *year*."""
    return [
        film
        for film in films
        if film.date_watched and film.date_watched.year == year
    ]


def _year_review_payload(db: Session, year: int | None) -> dict:
    """Build the "year in review" payload for *year*.

    When *year* is ``None`` the most recent year with diary entries is used.
    If no year can be selected at all, a zeroed payload with the same keys is
    returned.
    """
    diary_films = _diary_films(db)
    available_years = _available_years(diary_films)
    selected_year = _select_year(year, available_years)

    if selected_year is None:
        return {
            "selected_year": date.today().year if year is None else year,
            "available_years": [],
            "total_watched": 0,
            "average_stars": 0,
            "most_watched_directors": [],
            "films_per_decade": [],
            "films_per_genre": [],
            "star_distribution": [
                {"stars": stars, "count": 0} for stars in _STAR_VALUES
            ],
            "films_per_month": [
                {"month": month_name[index], "count": 0} for index in range(1, 13)
            ],
            "rewatch_rate": {"rewatched": 0, "total_watched": 0, "rate": 0},
            "watched_with_breakdown": [],
            "top_director": None,
            "top_month": None,
            "highlight_films": {
                "highest_rated": [],
                "first_watch": None,
                "last_watch": None,
                "most_rewatched": None,
            },
        }

    year_films = _films_for_year(diary_films, selected_year)

    decades = Counter()
    directors = Counter()
    genres = Counter()
    star_counts = Counter(dict.fromkeys(_STAR_VALUES, 0))
    # Pre-seed every month so months without films still report a zero count.
    months = Counter(dict.fromkeys(range(1, 13), 0))
    watched_with = Counter()

    for film in year_films:
        if film.year:
            decade = (film.year // 10) * 10
            decades[f"{decade}s"] += 1
        genres.update(split_genre_names(film.genre))
        directors.update(split_credit_names(film.director))
        star_counts[_star_bucket(film)] += 1
        if film.date_watched:
            months[film.date_watched.month] += 1
        _tally_companions(watched_with, film)

    total_watched = len(year_films)
    rewatched_candidates = [film for film in year_films if _is_rewatch(film)]
    rewatched = len(rewatched_candidates)

    # Unrated films (stars is None) count as 0, matching the sort key and the
    # star distribution, instead of raising TypeError inside sum().
    average_stars = (
        round(sum(film.stars or 0 for film in year_films) / total_watched, 1)
        if total_watched
        else 0
    )

    top_director = None
    if directors:
        top_director_name, top_director_count = min(
            directors.items(), key=lambda item: (-item[1], item[0])
        )
        top_director = {"director": top_director_name, "count": top_director_count}

    top_month = None
    if total_watched:
        top_month_index, top_month_count = min(
            months.items(), key=lambda item: (-item[1], item[0])
        )
        top_month = {
            "month": month_name[top_month_index],
            "count": top_month_count,
        }

    # Best rated first; ties broken by most recent watch, then title, then id.
    year_films_sorted = sorted(
        year_films,
        key=lambda film: (
            -(film.stars or 0),
            -(film.date_watched.toordinal()) if film.date_watched else 0,
            film.title.casefold(),
            film.id,
        ),
    )
    highest_rated = [_film_highlight_payload(film) for film in year_films_sorted[:4]]

    first_watch = None
    last_watch = None
    if year_films:
        first_watch_film = min(
            year_films,
            key=lambda film: (film.date_watched or date.max, film.id),
        )
        last_watch_film = max(
            year_films,
            key=lambda film: (film.date_watched or date.min, film.id),
        )
        first_watch = _film_highlight_payload(first_watch_film)
        last_watch = _film_highlight_payload(last_watch_film)

    most_rewatched = None
    if rewatched_candidates:
        # Highest rewatch count first; ties broken like the rating sort above.
        most_rewatched_film = min(
            rewatched_candidates,
            key=lambda film: (
                -(film.rewatch_count or 0),
                -(film.date_watched.toordinal()) if film.date_watched else 0,
                film.title.casefold(),
                film.id,
            ),
        )
        most_rewatched = _film_highlight_payload(most_rewatched_film)

    return {
        "selected_year": selected_year,
        "available_years": available_years,
        "total_watched": total_watched,
        "average_stars": average_stars,
        "films_per_month": [
            {"month": month_name[index], "count": months[index]}
            for index in range(1, 13)
        ],
        "films_per_decade": _decade_counts(decades),
        "films_per_genre": _ranked_counts(genres, "genre"),
        "star_distribution": [
            {"stars": stars, "count": star_counts[stars]} for stars in _STAR_VALUES
        ],
        "most_watched_directors": _ranked_counts(directors, "director"),
        "watched_with_breakdown": _ranked_counts(watched_with, "watched_with"),
        "rewatch_rate": {
            "rewatched": rewatched,
            "total_watched": total_watched,
            "rate": round(rewatched / total_watched, 4) if total_watched else 0,
        },
        "top_director": top_director,
        "top_month": top_month,
        "highlight_films": {
            "highest_rated": highest_rated,
            "first_watch": first_watch,
            "last_watch": last_watch,
            "most_rewatched": most_rewatched,
        },
    }


@router.get("/stats")
def stats_page(request: Request):
    """Render the statistics page shell (``stats.html``)."""
    return templates.TemplateResponse(
        request=request,
        name="stats.html",
        context={"request": request, "active_page": "stats"},
    )


@router.get("/stats/data")
def stats_data(db: Session = Depends(get_db)):
    """Return the all-time diary statistics payload."""
    return _build_stats_payload(_diary_films(db))


@router.get("/stats/year-in-review")
def year_in_review_page(
    request: Request,
    year: int | None = None,
    db: Session = Depends(get_db),
):
    """Render ``year_review.html`` with the review payload for *year*."""
    review = _year_review_payload(db, year)
    return templates.TemplateResponse(
        request=request,
        name="year_review.html",
        context={"request": request, "active_page": "stats", **review},
    )


@router.get("/stats/year-in-review/data")
def year_in_review_data(
    year: int | None = None,
    db: Session = Depends(get_db),
):
    """Return the year-in-review payload for *year*."""
    return _year_review_payload(db, year)