import os import httpx TMDB_SEARCH_URL = "https://api.themoviedb.org/3/search/movie" TMDB_MOVIE_URL = "https://api.themoviedb.org/3/movie/{movie_id}" TMDB_POSTER_BASE = "https://image.tmdb.org/t/p/w500" class TMDBNotConfiguredError(RuntimeError): pass def api_key() -> str: key = os.getenv("TMDB_API_KEY", "").strip() if not key: raise TMDBNotConfiguredError("TMDB_API_KEY is not configured.") return key def poster_url(path: str | None) -> str | None: if not path: return None return f"{TMDB_POSTER_BASE}{path}" def year_from_release_date(release_date: str | None) -> int | None: if not release_date: return None try: return int(release_date[:4]) except ValueError: return None def directors(movie: dict) -> str | None: crew = movie.get("credits", {}).get("crew", []) names = [person.get("name") for person in crew if person.get("job") == "Director"] names = [name for name in names if name] return ", ".join(names) if names else None def countries(movie: dict) -> str | None: names = [country.get("name") for country in movie.get("production_countries", [])] names = [name for name in names if name] return ", ".join(names) if names else None def languages(movie: dict) -> str | None: names = [ language.get("english_name") or language.get("name") for language in movie.get("spoken_languages", []) ] names = [name for name in names if name] return ", ".join(names) if names else None def cast_members(movie: dict, limit: int = 5) -> list[str]: cast = movie.get("credits", {}).get("cast", []) names = [person.get("name") for person in cast if person.get("name")] return names[:limit] def detail_context(movie: dict, cast_limit: int = 5) -> dict: tagline = (movie.get("tagline") or "").strip() or None overview = (movie.get("overview") or "").strip() or None return { "tagline": tagline, "overview": overview, "cast": cast_members(movie, cast_limit), } def normalize_title(value: str | None) -> str: return (value or "").casefold().strip() async def movie_detail(client: httpx.AsyncClient, movie_id: int, key: str | None = None) -> dict: response = await client.get( TMDB_MOVIE_URL.format(movie_id=movie_id), params={"api_key": key or api_key(), "append_to_response": "credits"}, ) response.raise_for_status() return response.json() async def movie_images(client: httpx.AsyncClient, movie_id: int) -> list[str]: """Return list of poster URLs for a TMDB movie ID.""" try: url = f"https://api.themoviedb.org/3/movie/{movie_id}/images" response = await client.get( url, params={"api_key": api_key(), "include_image_language": "en,null"}, ) response.raise_for_status() data = response.json() except httpx.HTTPError: return [] return [ poster_url(p["file_path"]) for p in data.get("posters", []) if p.get("file_path") ] def movie_payload(movie: dict, fallback: dict | None = None) -> dict: fallback = fallback or {} release_date = movie.get("release_date") or fallback.get("release_date") return { "tmdb_id": movie.get("id") or fallback.get("id"), "title": movie.get("title") or fallback.get("title"), "original_title": movie.get("original_title") or fallback.get("original_title"), "year": year_from_release_date(release_date), "release_date": release_date, "director": directors(movie), "country": countries(movie), "language": languages(movie), "runtime": movie.get("runtime"), "overview": movie.get("overview") or fallback.get("overview"), "poster_url": poster_url(movie.get("poster_path") or fallback.get("poster_path")), } def best_search_result(results: list[dict], title: str, year: int | None = None) -> dict | None: if not results: return None wanted_title = normalize_title(title) exact_title = [ result for result in results if wanted_title and wanted_title in {normalize_title(result.get("title")), normalize_title(result.get("original_title"))} ] candidates = exact_title or results if year is not None: same_year = [ result for result in candidates if year_from_release_date(result.get("release_date")) == year ] if same_year: return same_year[0] return candidates[0] async def search_movies( query: str, *, year: int | None = None, limit: int = 8, include_details: bool = True, client: httpx.AsyncClient | None = None, ) -> list[dict]: key = api_key() owns_client = client is None active_client = client or httpx.AsyncClient(timeout=10.0) try: params = { "api_key": key, "query": query, "include_adult": "false", } if year: params["year"] = str(year) response = await active_client.get(TMDB_SEARCH_URL, params=params) response.raise_for_status() search_results = response.json().get("results", [])[:limit] results = [] for item in search_results: movie_id = item.get("id") if not movie_id: continue detail = item if include_details: try: detail = await movie_detail(active_client, movie_id, key) except httpx.HTTPError: detail = item results.append(movie_payload(detail, item)) return results finally: if owns_client: await active_client.aclose() async def find_movie( title: str, *, year: int | None = None, client: httpx.AsyncClient | None = None, ) -> dict | None: if not title.strip(): return None key = api_key() owns_client = client is None active_client = client or httpx.AsyncClient(timeout=10.0) try: params = { "api_key": key, "query": title, "include_adult": "false", } if year: params["year"] = str(year) response = await active_client.get(TMDB_SEARCH_URL, params=params) response.raise_for_status() search_results = response.json().get("results", []) match = best_search_result(search_results, title, year) if not match: return None detail = await movie_detail(active_client, match["id"], key) return movie_payload(detail, match) finally: if owns_client: await active_client.aclose() async def director_info(director_name: str, key: str | None = None, client: httpx.AsyncClient | None = None) -> dict: """Search for a director by name and return their image and biography.""" try: active_client = client owns_client = False if not active_client: active_client = httpx.AsyncClient() owns_client = True response = await active_client.get( "https://api.themoviedb.org/3/search/person", params={"api_key": key or api_key(), "query": director_name}, ) response.raise_for_status() results = response.json().get("results", []) if not results: return {"image": None, "biography": None} person_id = results[0].get("id") image = poster_url(results[0].get("profile_path")) # Fetch full person details to get biography biography = None if person_id: detail_response = await active_client.get( f"https://api.themoviedb.org/3/person/{person_id}", params={"api_key": key or api_key()}, ) if detail_response.status_code == 200: person_detail = detail_response.json() biography = (person_detail.get("biography") or "").strip() or None return {"image": image, "biography": biography} except Exception: return {"image": None, "biography": None} finally: if owns_client: await active_client.aclose() def apply_metadata_to_film(film, metadata: dict) -> bool: changed = False fill_if_empty = { "tmdb_id": metadata.get("tmdb_id"), "poster_url": metadata.get("poster_url"), "original_title": metadata.get("original_title"), "director": metadata.get("director"), "year": metadata.get("year"), "country": metadata.get("country"), "language": metadata.get("language"), "runtime": metadata.get("runtime"), } for field, value in fill_if_empty.items(): if value is not None and not getattr(film, field): setattr(film, field, value) changed = True return changed