summaryrefslogtreecommitdiff
path: root/services/tmdb.py
diff options
context:
space:
mode:
authorTyler Hoang <tyler@tylerhoang.xyz>2026-05-06 12:21:26 -0700
committerTyler Hoang <tyler@tylerhoang.xyz>2026-05-06 12:21:26 -0700
commite708bec6cd76c2686de4158dde4d04f72a3c300d (patch)
tree04b0bc4738e090dd7834d47478c7e652da010f92 /services/tmdb.py
init: lumiere film diary
Diffstat (limited to 'services/tmdb.py')
-rw-r--r--services/tmdb.py226
1 files changed, 226 insertions, 0 deletions
diff --git a/services/tmdb.py b/services/tmdb.py
new file mode 100644
index 0000000..b3adf17
--- /dev/null
+++ b/services/tmdb.py
@@ -0,0 +1,226 @@
+import os
+
+import httpx
+
+TMDB_SEARCH_URL = "https://api.themoviedb.org/3/search/movie"
+TMDB_MOVIE_URL = "https://api.themoviedb.org/3/movie/{movie_id}"
+TMDB_POSTER_BASE = "https://image.tmdb.org/t/p/w500"
+
+
+class TMDBNotConfiguredError(RuntimeError):
+ pass
+
+
+def api_key() -> str:
+ key = os.getenv("TMDB_API_KEY", "").strip()
+ if not key:
+ raise TMDBNotConfiguredError("TMDB_API_KEY is not configured.")
+ return key
+
+
+def poster_url(path: str | None) -> str | None:
+ if not path:
+ return None
+ return f"{TMDB_POSTER_BASE}{path}"
+
+
+def year_from_release_date(release_date: str | None) -> int | None:
+ if not release_date:
+ return None
+ try:
+ return int(release_date[:4])
+ except ValueError:
+ return None
+
+
+def directors(movie: dict) -> str | None:
+ crew = movie.get("credits", {}).get("crew", [])
+ names = [person.get("name") for person in crew if person.get("job") == "Director"]
+ names = [name for name in names if name]
+ return ", ".join(names) if names else None
+
+
+def countries(movie: dict) -> str | None:
+ names = [country.get("name") for country in movie.get("production_countries", [])]
+ names = [name for name in names if name]
+ return ", ".join(names) if names else None
+
+
+def languages(movie: dict) -> str | None:
+ names = [
+ language.get("english_name") or language.get("name")
+ for language in movie.get("spoken_languages", [])
+ ]
+ names = [name for name in names if name]
+ return ", ".join(names) if names else None
+
+
+def cast_members(movie: dict, limit: int = 5) -> list[str]:
+ cast = movie.get("credits", {}).get("cast", [])
+ names = [person.get("name") for person in cast if person.get("name")]
+ return names[:limit]
+
+
+def detail_context(movie: dict, cast_limit: int = 5) -> dict:
+ tagline = (movie.get("tagline") or "").strip() or None
+ overview = (movie.get("overview") or "").strip() or None
+ return {
+ "tagline": tagline,
+ "overview": overview,
+ "cast": cast_members(movie, cast_limit),
+ }
+
+
+def normalize_title(value: str | None) -> str:
+ return (value or "").casefold().strip()
+
+
+async def movie_detail(client: httpx.AsyncClient, movie_id: int, key: str | None = None) -> dict:
+ response = await client.get(
+ TMDB_MOVIE_URL.format(movie_id=movie_id),
+ params={"api_key": key or api_key(), "append_to_response": "credits"},
+ )
+ response.raise_for_status()
+ return response.json()
+
+
+def movie_payload(movie: dict, fallback: dict | None = None) -> dict:
+ fallback = fallback or {}
+ release_date = movie.get("release_date") or fallback.get("release_date")
+ return {
+ "tmdb_id": movie.get("id") or fallback.get("id"),
+ "title": movie.get("title") or fallback.get("title"),
+ "original_title": movie.get("original_title") or fallback.get("original_title"),
+ "year": year_from_release_date(release_date),
+ "release_date": release_date,
+ "director": directors(movie),
+ "country": countries(movie),
+ "language": languages(movie),
+ "runtime": movie.get("runtime"),
+ "overview": movie.get("overview") or fallback.get("overview"),
+ "poster_url": poster_url(movie.get("poster_path") or fallback.get("poster_path")),
+ }
+
+
+def best_search_result(results: list[dict], title: str, year: int | None = None) -> dict | None:
+ if not results:
+ return None
+
+ wanted_title = normalize_title(title)
+ exact_title = [
+ result
+ for result in results
+ if wanted_title
+ and wanted_title
+ in {normalize_title(result.get("title")), normalize_title(result.get("original_title"))}
+ ]
+ candidates = exact_title or results
+
+ if year is not None:
+ same_year = [
+ result
+ for result in candidates
+ if year_from_release_date(result.get("release_date")) == year
+ ]
+ if same_year:
+ return same_year[0]
+
+ return candidates[0]
+
+
+async def search_movies(
+ query: str,
+ *,
+ year: int | None = None,
+ limit: int = 8,
+ include_details: bool = True,
+ client: httpx.AsyncClient | None = None,
+) -> list[dict]:
+ key = api_key()
+ owns_client = client is None
+ active_client = client or httpx.AsyncClient(timeout=10.0)
+ try:
+ params = {
+ "api_key": key,
+ "query": query,
+ "include_adult": "false",
+ }
+ if year:
+ params["year"] = str(year)
+
+ response = await active_client.get(TMDB_SEARCH_URL, params=params)
+ response.raise_for_status()
+ search_results = response.json().get("results", [])[:limit]
+
+ results = []
+ for item in search_results:
+ movie_id = item.get("id")
+ if not movie_id:
+ continue
+ detail = item
+ if include_details:
+ try:
+ detail = await movie_detail(active_client, movie_id, key)
+ except httpx.HTTPError:
+ detail = item
+ results.append(movie_payload(detail, item))
+ return results
+ finally:
+ if owns_client:
+ await active_client.aclose()
+
+
+async def find_movie(
+ title: str,
+ *,
+ year: int | None = None,
+ client: httpx.AsyncClient | None = None,
+) -> dict | None:
+ if not title.strip():
+ return None
+
+ key = api_key()
+ owns_client = client is None
+ active_client = client or httpx.AsyncClient(timeout=10.0)
+ try:
+ params = {
+ "api_key": key,
+ "query": title,
+ "include_adult": "false",
+ }
+ if year:
+ params["year"] = str(year)
+
+ response = await active_client.get(TMDB_SEARCH_URL, params=params)
+ response.raise_for_status()
+ search_results = response.json().get("results", [])
+ match = best_search_result(search_results, title, year)
+ if not match:
+ return None
+
+ detail = await movie_detail(active_client, match["id"], key)
+ return movie_payload(detail, match)
+ finally:
+ if owns_client:
+ await active_client.aclose()
+
+
+def apply_metadata_to_film(film, metadata: dict) -> bool:
+ changed = False
+ fill_if_empty = {
+ "tmdb_id": metadata.get("tmdb_id"),
+ "poster_url": metadata.get("poster_url"),
+ "original_title": metadata.get("original_title"),
+ "director": metadata.get("director"),
+ "year": metadata.get("year"),
+ "country": metadata.get("country"),
+ "language": metadata.get("language"),
+ "runtime": metadata.get("runtime"),
+ }
+
+ for field, value in fill_if_empty.items():
+ if value is not None and not getattr(film, field):
+ setattr(film, field, value)
+ changed = True
+
+ return changed