summaryrefslogtreecommitdiff
path: root/services
diff options
context:
space:
mode:
Diffstat (limited to 'services')
-rw-r--r--services/__init__.py1
-rw-r--r--services/countries.py67
-rw-r--r--services/film_people.py17
-rw-r--r--services/tmdb.py226
4 files changed, 311 insertions, 0 deletions
diff --git a/services/__init__.py b/services/__init__.py
new file mode 100644
index 0000000..1ac9669
--- /dev/null
+++ b/services/__init__.py
@@ -0,0 +1 @@
+"""Service helpers for Lumiere."""
diff --git a/services/countries.py b/services/countries.py
new file mode 100644
index 0000000..8681e81
--- /dev/null
+++ b/services/countries.py
@@ -0,0 +1,67 @@
+COUNTRY_NAME_TO_ISO_NUMERIC = {
+ "argentina": 32,
+ "australia": 36,
+ "belgium": 56,
+ "brazil": 76,
+ "bulgaria": 100,
+ "canada": 124,
+ "china": 156,
+ "czech republic": 203,
+ "denmark": 208,
+ "finland": 246,
+ "france": 250,
+ "germany": 276,
+ "hong kong": 344,
+ "hungary": 348,
+ "india": 356,
+ "iran": 364,
+ "ireland": 372,
+ "italy": 380,
+ "japan": 392,
+ "luxembourg": 442,
+ "netherlands": 528,
+ "new zealand": 554,
+ "norway": 578,
+ "northern ireland": 826,
+ "russia": 643,
+ "soviet union": 643,
+ "spain": 724,
+ "south africa": 710,
+ "south korea": 410,
+ "korea, south": 410,
+ "republic of korea": 410,
+ "sweden": 752,
+ "taiwan": 158,
+ "thailand": 764,
+ "united arab emirates": 784,
+ "united kingdom": 826,
+ "uk": 826,
+ "great britain": 826,
+ "england": 826,
+ "scotland": 826,
+ "united states": 840,
+ "united states of america": 840,
+ "usa": 840,
+}
+
+ISO_NUMERIC_TO_COUNTRY_NAME = {
+ value: key.title() for key, value in COUNTRY_NAME_TO_ISO_NUMERIC.items()
+}
+ISO_NUMERIC_TO_COUNTRY_NAME[410] = "South Korea"
+ISO_NUMERIC_TO_COUNTRY_NAME[643] = "Russia"
+ISO_NUMERIC_TO_COUNTRY_NAME[826] = "United Kingdom"
+ISO_NUMERIC_TO_COUNTRY_NAME[840] = "United States of America"
+
+
+def split_country_names(value: str | None) -> list[str]:
+ if not value:
+ return []
+
+ normalized = value.replace(";", ",")
+ return [item.strip() for item in normalized.split(",") if item.strip()]
+
+
+def country_name_to_iso_numeric(value: str | None) -> int | None:
+ if not value:
+ return None
+ return COUNTRY_NAME_TO_ISO_NUMERIC.get(value.casefold().strip())
diff --git a/services/film_people.py b/services/film_people.py
new file mode 100644
index 0000000..ef0e2ff
--- /dev/null
+++ b/services/film_people.py
@@ -0,0 +1,17 @@
+from urllib.parse import quote
+
+
+def split_credit_names(value: str | None) -> list[str]:
+ if not value:
+ return []
+
+ normalized = value.replace(";", ",")
+ return [item.strip() for item in normalized.split(",") if item.strip()]
+
+
+def normalize_name(value: str) -> str:
+ return value.casefold().strip()
+
+
+def director_href(name: str) -> str:
+ return f"/director/{quote(name, safe='')}"
diff --git a/services/tmdb.py b/services/tmdb.py
new file mode 100644
index 0000000..b3adf17
--- /dev/null
+++ b/services/tmdb.py
@@ -0,0 +1,226 @@
+import os
+
+import httpx
+
+TMDB_SEARCH_URL = "https://api.themoviedb.org/3/search/movie"
+TMDB_MOVIE_URL = "https://api.themoviedb.org/3/movie/{movie_id}"
+TMDB_POSTER_BASE = "https://image.tmdb.org/t/p/w500"
+
+
+class TMDBNotConfiguredError(RuntimeError):
+ pass
+
+
+def api_key() -> str:
+ key = os.getenv("TMDB_API_KEY", "").strip()
+ if not key:
+ raise TMDBNotConfiguredError("TMDB_API_KEY is not configured.")
+ return key
+
+
+def poster_url(path: str | None) -> str | None:
+ if not path:
+ return None
+ return f"{TMDB_POSTER_BASE}{path}"
+
+
+def year_from_release_date(release_date: str | None) -> int | None:
+ if not release_date:
+ return None
+ try:
+ return int(release_date[:4])
+ except ValueError:
+ return None
+
+
+def directors(movie: dict) -> str | None:
+ crew = movie.get("credits", {}).get("crew", [])
+ names = [person.get("name") for person in crew if person.get("job") == "Director"]
+ names = [name for name in names if name]
+ return ", ".join(names) if names else None
+
+
+def countries(movie: dict) -> str | None:
+ names = [country.get("name") for country in movie.get("production_countries", [])]
+ names = [name for name in names if name]
+ return ", ".join(names) if names else None
+
+
+def languages(movie: dict) -> str | None:
+ names = [
+ language.get("english_name") or language.get("name")
+ for language in movie.get("spoken_languages", [])
+ ]
+ names = [name for name in names if name]
+ return ", ".join(names) if names else None
+
+
+def cast_members(movie: dict, limit: int = 5) -> list[str]:
+ cast = movie.get("credits", {}).get("cast", [])
+ names = [person.get("name") for person in cast if person.get("name")]
+ return names[:limit]
+
+
+def detail_context(movie: dict, cast_limit: int = 5) -> dict:
+ tagline = (movie.get("tagline") or "").strip() or None
+ overview = (movie.get("overview") or "").strip() or None
+ return {
+ "tagline": tagline,
+ "overview": overview,
+ "cast": cast_members(movie, cast_limit),
+ }
+
+
+def normalize_title(value: str | None) -> str:
+ return (value or "").casefold().strip()
+
+
+async def movie_detail(client: httpx.AsyncClient, movie_id: int, key: str | None = None) -> dict:
+ response = await client.get(
+ TMDB_MOVIE_URL.format(movie_id=movie_id),
+ params={"api_key": key or api_key(), "append_to_response": "credits"},
+ )
+ response.raise_for_status()
+ return response.json()
+
+
+def movie_payload(movie: dict, fallback: dict | None = None) -> dict:
+ fallback = fallback or {}
+ release_date = movie.get("release_date") or fallback.get("release_date")
+ return {
+ "tmdb_id": movie.get("id") or fallback.get("id"),
+ "title": movie.get("title") or fallback.get("title"),
+ "original_title": movie.get("original_title") or fallback.get("original_title"),
+ "year": year_from_release_date(release_date),
+ "release_date": release_date,
+ "director": directors(movie),
+ "country": countries(movie),
+ "language": languages(movie),
+ "runtime": movie.get("runtime"),
+ "overview": movie.get("overview") or fallback.get("overview"),
+ "poster_url": poster_url(movie.get("poster_path") or fallback.get("poster_path")),
+ }
+
+
+def best_search_result(results: list[dict], title: str, year: int | None = None) -> dict | None:
+ if not results:
+ return None
+
+ wanted_title = normalize_title(title)
+ exact_title = [
+ result
+ for result in results
+ if wanted_title
+ and wanted_title
+ in {normalize_title(result.get("title")), normalize_title(result.get("original_title"))}
+ ]
+ candidates = exact_title or results
+
+ if year is not None:
+ same_year = [
+ result
+ for result in candidates
+ if year_from_release_date(result.get("release_date")) == year
+ ]
+ if same_year:
+ return same_year[0]
+
+ return candidates[0]
+
+
+async def search_movies(
+ query: str,
+ *,
+ year: int | None = None,
+ limit: int = 8,
+ include_details: bool = True,
+ client: httpx.AsyncClient | None = None,
+) -> list[dict]:
+ key = api_key()
+ owns_client = client is None
+ active_client = client or httpx.AsyncClient(timeout=10.0)
+ try:
+ params = {
+ "api_key": key,
+ "query": query,
+ "include_adult": "false",
+ }
+ if year:
+ params["year"] = str(year)
+
+ response = await active_client.get(TMDB_SEARCH_URL, params=params)
+ response.raise_for_status()
+ search_results = response.json().get("results", [])[:limit]
+
+ results = []
+ for item in search_results:
+ movie_id = item.get("id")
+ if not movie_id:
+ continue
+ detail = item
+ if include_details:
+ try:
+ detail = await movie_detail(active_client, movie_id, key)
+ except httpx.HTTPError:
+ detail = item
+ results.append(movie_payload(detail, item))
+ return results
+ finally:
+ if owns_client:
+ await active_client.aclose()
+
+
+async def find_movie(
+ title: str,
+ *,
+ year: int | None = None,
+ client: httpx.AsyncClient | None = None,
+) -> dict | None:
+ if not title.strip():
+ return None
+
+ key = api_key()
+ owns_client = client is None
+ active_client = client or httpx.AsyncClient(timeout=10.0)
+ try:
+ params = {
+ "api_key": key,
+ "query": title,
+ "include_adult": "false",
+ }
+ if year:
+ params["year"] = str(year)
+
+ response = await active_client.get(TMDB_SEARCH_URL, params=params)
+ response.raise_for_status()
+ search_results = response.json().get("results", [])
+ match = best_search_result(search_results, title, year)
+ if not match:
+ return None
+
+ detail = await movie_detail(active_client, match["id"], key)
+ return movie_payload(detail, match)
+ finally:
+ if owns_client:
+ await active_client.aclose()
+
+
+def apply_metadata_to_film(film, metadata: dict) -> bool:
+ changed = False
+ fill_if_empty = {
+ "tmdb_id": metadata.get("tmdb_id"),
+ "poster_url": metadata.get("poster_url"),
+ "original_title": metadata.get("original_title"),
+ "director": metadata.get("director"),
+ "year": metadata.get("year"),
+ "country": metadata.get("country"),
+ "language": metadata.get("language"),
+ "runtime": metadata.get("runtime"),
+ }
+
+ for field, value in fill_if_empty.items():
+ if value is not None and not getattr(film, field):
+ setattr(film, field, value)
+ changed = True
+
+ return changed