summaryrefslogtreecommitdiff
path: root/backend/app/services/data_service.py
diff options
context:
space:
mode:
authorTyler Hoang <tyler@tylerhoang.xyz>2026-05-17 12:46:13 -0700
committerTyler Hoang <tyler@tylerhoang.xyz>2026-05-17 12:46:13 -0700
commit1482422f2f5b236cdcdff4429ae06bb55dca4083 (patch)
tree4653cb4986a8a138f84dbec934effb0d011751d3 /backend/app/services/data_service.py
Add stack start and stop scripts
Diffstat (limited to 'backend/app/services/data_service.py')
-rw-r--r--backend/app/services/data_service.py605
1 files changed, 605 insertions, 0 deletions
diff --git a/backend/app/services/data_service.py b/backend/app/services/data_service.py
new file mode 100644
index 0000000..ae078cd
--- /dev/null
+++ b/backend/app/services/data_service.py
@@ -0,0 +1,605 @@
+"""yfinance wrapper for Prism v2 Overview data."""
+from __future__ import annotations
+
+import math
+import os
+from typing import Any
+
+import httpx
+import pandas as pd
+import yfinance as yf
+from cachetools import TTLCache, cached
+
# One TTL cache per cached accessor below. ``ttl`` is expressed in the units
# of TTLCache's timer (seconds with the default timer): search and intraday
# results expire after 60, everything else after 300.
SEARCH_CACHE = TTLCache(maxsize=128, ttl=60)
INFO_CACHE = TTLCache(maxsize=256, ttl=300)
FAST_INFO_CACHE = TTLCache(maxsize=256, ttl=300)
PROFILE_ENRICH_CACHE = TTLCache(maxsize=256, ttl=300)
PRICE_CACHE = TTLCache(maxsize=256, ttl=300)
HISTORY_CACHE = TTLCache(maxsize=256, ttl=300)
INTRADAY_CACHE = TTLCache(maxsize=128, ttl=60)
MARKET_CACHE = TTLCache(maxsize=8, ttl=300)

# Period labels accepted by get_price_history, mapped to the period strings
# handed to yfinance's ``history(period=...)``.
YF_PERIOD_MAP = {
    "1m": "1mo",
    "3m": "3mo",
    "6m": "6mo",
    "1y": "1y",
    "5y": "5y",
}
PERIODS = set(YF_PERIOD_MAP)

# Exchange codes rewritten to display names when building a profile; codes
# not listed here are passed through unchanged.
_XMAP = {
    "NYQ": "NYSE",
    "NMS": "NASDAQ",
    "NGM": "NASDAQ",
    "NCM": "NASDAQ",
    "ASE": "AMEX",
}
+
+
def normalize_symbol(symbol: str) -> str:
    """Return *symbol* as a whitespace-trimmed, upper-cased string.

    Any falsy input (``None``, ``""``, ``0``) yields the empty string.
    """
    text = str(symbol) if symbol else ""
    return text.strip().upper()
+
+
+def _safe_float(value: Any) -> float | None:
+ try:
+ n = float(value)
+ except (TypeError, ValueError):
+ return None
+ if math.isnan(n) or math.isinf(n):
+ return None
+ return n
+
+
def _safe_int(value: Any) -> int | None:
    """Convert *value* to the nearest integer via ``_safe_float``.

    Returns ``None`` whenever ``_safe_float`` rejects the value.
    """
    number = _safe_float(value)
    if number is None:
        return None
    return int(round(number))
+
+
+def _json_value(value: Any) -> Any:
+ if value is None:
+ return None
+ if isinstance(value, pd.Timestamp):
+ return value.isoformat()
+ if pd.isna(value):
+ return None
+ if hasattr(value, "item"):
+ return _json_value(value.item())
+ return value
+
+
def _pick_search_match(symbol: str) -> dict[str, Any]:
    """Return the search hit whose symbol equals *symbol*, or ``{}``.

    The normalized symbol is used as the search query and hits are compared
    on their normalized symbol; the first exact match wins.
    """
    target = normalize_symbol(symbol)
    return next(
        (
            hit
            for hit in search_tickers(target)
            if normalize_symbol(hit.get("symbol")) == target
        ),
        {},
    )
+
+
@cached(SEARCH_CACHE)
def search_tickers(query: str) -> list[dict[str, Any]]:
    """Look up tickers matching a company name or symbol.

    Queries shorter than two characters (after trimming) return ``[]``
    without calling yfinance. Each hit is a dict with ``symbol``
    (normalized), ``name`` and ``exchange``; quotes without a symbol are
    skipped. Any error during the lookup yields ``[]``.
    """
    text = str(query or "").strip()
    if len(text) < 2:
        return []
    try:
        quotes = yf.Search(text, max_results=8).quotes
        return [
            {
                "symbol": normalize_symbol(ticker),
                "name": quote.get("longname") or quote.get("shortname") or ticker,
                "exchange": quote.get("exchange") or quote.get("exchDisp") or None,
            }
            for quote in quotes
            if (ticker := quote.get("symbol", ""))
        ]
    except Exception:
        # Best-effort lookup: callers treat an empty list as "no matches".
        return []
+
+
@cached(INFO_CACHE)
def get_company_info(symbol: str) -> dict[str, Any]:
    """Fetch yfinance ``info`` for *symbol* as a JSON-safe dict.

    Keys are stringified and values passed through ``_json_value``. An empty
    dict is returned when ``info`` is not a dict or when anything raises.
    """
    ticker_symbol = normalize_symbol(symbol)
    try:
        raw = yf.Ticker(ticker_symbol).info or {}
        if isinstance(raw, dict):
            return {str(key): _json_value(val) for key, val in raw.items()}
    except Exception:
        # Best-effort: fall through to the empty result below.
        pass
    return {}
+
+
@cached(FAST_INFO_CACHE)
def get_fast_info(symbol: str) -> dict[str, Any]:
    """Return a JSON-safe subset of yfinance ``fast_info`` for *symbol*.

    Each field is read independently, so a field that raises is reported as
    ``None`` instead of discarding every other field. An empty dict is
    returned when ``fast_info`` itself cannot be obtained or when no field
    produced a value.
    """
    sym = normalize_symbol(symbol)
    keys = (
        "currency",
        "dayHigh",
        "dayLow",
        "exchange",
        "fiftyDayAverage",
        "lastPrice",
        "lastVolume",
        "marketCap",
        "open",
        "previousClose",
        "regularMarketPreviousClose",
        "shares",
        "tenDayAverageVolume",
        "threeMonthAverageVolume",
        "timezone",
        "twoHundredDayAverage",
        "yearChange",
        "yearHigh",
        "yearLow",
    )
    try:
        fast_info = yf.Ticker(sym).fast_info
    except Exception:
        return {}

    out: dict[str, Any] = {}
    for key in keys:
        try:
            out[key] = _json_value(fast_info.get(key))
        except Exception:
            # One unavailable field must not wipe out the ones that loaded.
            out[key] = None
    if all(value is None for value in out.values()):
        return {}
    return out
+
+
@cached(PRICE_CACHE)
def get_latest_price(symbol: str) -> float | None:
    """Return the latest close price, falling back to quote fields in info.

    The most recent non-missing close of the last five days of history is
    preferred. If the history lookup raises, is empty, or yields no finite
    close, the first finite value among ``currentPrice``,
    ``regularMarketPrice`` and ``previousClose`` from ``get_company_info``
    is used. Returns ``None`` when neither source has a price.
    """
    sym = normalize_symbol(symbol)
    try:
        hist = yf.Ticker(sym).history(period="5d")
        if hist is not None and not hist.empty and "Close" in hist.columns:
            close = pd.to_numeric(hist["Close"], errors="coerce").dropna()
            if not close.empty:
                latest = _safe_float(close.iloc[-1])
                if latest is not None:
                    return latest
    except Exception:
        # A failed history lookup must not prevent the info fallback below.
        pass

    info = get_company_info(sym)
    for key in ("currentPrice", "regularMarketPrice", "previousClose"):
        price = _safe_float(info.get(key))
        if price is not None:
            return price
    return None
+
+
@cached(HISTORY_CACHE)
def get_price_history(symbol: str, period: str = "1y") -> list[dict[str, Any]]:
    """Return daily OHLCV rows for *symbol* as JSON-safe dicts.

    *period* must be one of ``PERIODS``; any other value is treated as
    ``"1y"``. Rows carry a date-only ``date`` string. An empty list is
    returned when yfinance has no data or raises.
    """
    yf_period = YF_PERIOD_MAP[period if period in PERIODS else "1y"]
    try:
        frame = yf.Ticker(normalize_symbol(symbol)).history(period=yf_period)
        if frame is None or frame.empty:
            return []
        frame.index = pd.to_datetime(frame.index)
        return _history_rows(frame, include_time=False)
    except Exception:
        # Best-effort: callers treat an empty list as "no history".
        return []
+
+
@cached(INTRADAY_CACHE)
def get_intraday_history(symbol: str, period: str, interval: str) -> list[dict[str, Any]]:
    """Return intraday OHLCV rows for *symbol* as JSON-safe dicts.

    *period* and *interval* are forwarded to yfinance unchanged. Rows are
    restricted to the 09:30-16:00 window of the index's own clock when that
    filter can be applied; otherwise all rows are kept. Rows carry a
    timestamped ``date`` string. Returns ``[]`` on no data or any error.
    """
    try:
        frame = yf.Ticker(normalize_symbol(symbol)).history(period=period, interval=interval)
        if frame is None or frame.empty:
            return []
        frame.index = pd.to_datetime(frame.index)
        try:
            session = frame.between_time("09:30", "16:00")
        except Exception:
            # Filtering is optional; keep the unfiltered frame if it fails.
            session = frame
        return _history_rows(session, include_time=True)
    except Exception:
        return []
+
+
def _history_rows(df: pd.DataFrame, include_time: bool) -> list[dict[str, Any]]:
    """Convert an OHLCV frame into a list of JSON-safe row dicts.

    Each row has ``date`` (formatted from the index, with a time component
    only when *include_time* is true) followed by ``open``, ``high``,
    ``low``, ``close`` and ``volume`` as floats or ``None``.
    """
    stamp_format = "%Y-%m-%dT%H:%M:%S" if include_time else "%Y-%m-%d"
    columns = (
        ("open", "Open"),
        ("high", "High"),
        ("low", "Low"),
        ("close", "Close"),
        ("volume", "Volume"),
    )
    return [
        {
            "date": pd.Timestamp(stamp).strftime(stamp_format),
            **{out_key: _safe_float(record.get(column)) for out_key, column in columns},
        }
        for stamp, record in df.iterrows()
    ]
+
+
@cached(MARKET_CACHE)
def get_market_indices() -> list[dict[str, Any]]:
    """Return latest price and day change for major indices.

    One dict per index with ``name``, ``price`` and ``change_pct``.
    ``change_pct`` is a fraction (``(last - prev) / prev``), not a
    percentage. Missing closes are dropped before the last two values are
    read, so ``price`` is reported whenever any valid close exists;
    ``change_pct`` stays ``None`` when the previous close is unknown or
    zero, and is ``0.0`` when the history contains a single row. An index
    whose lookup fails is still listed, with both values ``None``.
    """
    symbols = {
        "S&P 500": "^GSPC",
        "NASDAQ": "^IXIC",
        "DOW": "^DJI",
        "VIX": "^VIX",
    }
    result: list[dict[str, Any]] = []
    for name, sym in symbols.items():
        price: float | None = None
        pct_change: float | None = None
        try:
            hist = yf.Ticker(sym).history(period="2d")
            closes = pd.to_numeric(hist["Close"], errors="coerce").dropna()
            if not closes.empty:
                price = _safe_float(closes.iloc[-1])
            if price is not None:
                if len(closes) >= 2:
                    prev_close = _safe_float(closes.iloc[-2])
                    if prev_close:
                        pct_change = (price - prev_close) / prev_close
                elif len(hist) == 1:
                    # Only one session available: report no movement.
                    pct_change = 0.0
        except Exception:
            # Best-effort per index: keep the row with empty values.
            pass
        result.append({"name": name, "price": price, "change_pct": pct_change})
    return result
+
+
def build_quote(info: dict[str, Any], symbol: str) -> dict[str, Any]:
    """Build a price / previous-close / change summary from *info*.

    The price comes from ``currentPrice`` or ``regularMarketPrice``; when
    that is missing or zero, ``get_latest_price(symbol)`` is used instead.
    ``change`` and ``change_pct`` (a fraction) are filled only when both a
    price and a positive previous close are available.
    """
    quoted = _safe_float(info.get("currentPrice") or info.get("regularMarketPrice"))
    price = quoted if quoted else get_latest_price(symbol)
    prev_close = _safe_float(info.get("regularMarketPreviousClose") or info.get("previousClose"))

    quote: dict[str, Any] = {
        "price": price,
        "prev_close": prev_close,
        "change": None,
        "change_pct": None,
    }
    if price is None or not prev_close or prev_close <= 0:
        return quote
    quote["change"] = price - prev_close
    quote["change_pct"] = quote["change"] / prev_close
    return quote
+
+
def build_signals(info: dict[str, Any]) -> list[dict[str, str]]:
    """Derive headline signals (valuation, growth, profit, leverage).

    Each signal is a dict with ``key``, ``state`` (``pos`` / ``warn`` /
    ``neg`` / ``neu``), ``value`` and ``description``. Valuation is always
    present; the other signals are emitted only when their input exists.
    """
    signals: list[dict[str, str]] = []

    pe = _safe_float(info.get("trailingPE"))
    if pe is None or pe <= 0:
        signals.append({"key": "Valuation", "state": "neu", "value": "P/E unavailable", "description": "No trailing earnings"})
    else:
        if pe < 15:
            pe_state, pe_desc = "pos", "Attractive multiple"
        elif pe < 30:
            pe_state, pe_desc = "warn", "Middle of range"
        else:
            pe_state, pe_desc = "neg", "Premium multiple"
        signals.append({"key": "Valuation", "state": pe_state, "value": f"P/E {pe:.1f}x", "description": pe_desc})

    _ratio_signal(signals, "Growth", info.get("revenueGrowth"), 0.10, 0.0, "Strong top-line growth", "Low but positive growth", "Contracting revenue")
    _ratio_signal(signals, "Profit", info.get("profitMargins"), 0.15, 0.05, "High net margin", "Moderate net margin", "Thin or negative margin")

    raw_leverage = _safe_float(info.get("debtToEquity"))
    if raw_leverage is None:
        return signals

    # The reported figure is divided by 100 to obtain a multiple.
    multiple = raw_leverage / 100.0
    if multiple < 0.5:
        lev_state, lev_desc = "pos", "Low leverage"
    elif multiple < 2.0:
        lev_state, lev_desc = "warn", "Moderate leverage"
    else:
        lev_state, lev_desc = "neg", "High leverage"
    signals.append({"key": "Leverage", "state": lev_state, "value": f"D/E {multiple:.2f}x", "description": lev_desc})
    return signals
+
+
def _ratio_signal(
    signals: list[dict[str, str]],
    key: str,
    value: Any,
    positive_threshold: float,
    warn_threshold: float,
    positive_desc: str,
    warn_desc: str,
    negative_desc: str,
) -> None:
    """Append a threshold-based signal for a fractional ratio to *signals*.

    Nothing is appended when *value* is not a finite number. A ratio above
    *positive_threshold* is ``pos``, at or above *warn_threshold* is
    ``warn``, otherwise ``neg``. The value is shown as a whole percentage,
    with an explicit sign only for the ``"Growth"`` key.
    """
    ratio = _safe_float(value)
    if ratio is None:
        return

    if ratio > positive_threshold:
        verdict = ("pos", positive_desc)
    elif ratio >= warn_threshold:
        verdict = ("warn", warn_desc)
    else:
        verdict = ("neg", negative_desc)

    percent = ratio * 100
    shown = f"{percent:+.0f}%" if key == "Growth" else f"{percent:.0f}%"
    signals.append({"key": key, "state": verdict[0], "value": shown, "description": verdict[1]})
+
+
+def _field(source_map: dict[str, dict[str, Any]], field_sources: dict[str, str], name: str, *candidates: tuple[str, str]) -> Any:
+ for source_name, key in candidates:
+ source = source_map.get(source_name) or {}
+ value = source.get(key)
+ if value is None:
+ continue
+ if isinstance(value, str) and not value.strip():
+ continue
+ field_sources[name] = source_name
+ return value
+ return None
+
+
def _history_snapshot(history: list[dict[str, Any]]) -> dict[str, Any]:
    """Summarize OHLCV rows into quote-like fallback fields.

    Returns ``{}`` for empty history. Otherwise the result holds:

    * ``lastPrice`` / ``lastVolume`` - close and volume of the final row.
    * ``previousClose`` - close of the second-to-last row, if any.
    * ``yearHigh`` / ``yearLow`` - the extreme of the rows' ``high`` /
      ``low`` values over the supplied window, falling back to closes when
      those columns hold no usable numbers.
    * ``averageVolume`` - mean of the usable volumes.
    """
    if not history:
        return {}

    def column(key: str) -> list[float]:
        # Finite values of one column, in row order.
        numbers = (_safe_float(row.get(key)) for row in history)
        return [number for number in numbers if number is not None]

    closes = column("close")
    volumes = column("volume")
    # Range extremes are taken from intraday highs/lows rather than closes,
    # which would understate the range.
    highs = column("high") or closes
    lows = column("low") or closes

    latest = history[-1]
    previous = history[-2] if len(history) > 1 else None
    return {
        "lastPrice": _safe_float(latest.get("close")),
        "previousClose": _safe_float(previous.get("close")) if previous else None,
        "lastVolume": _safe_float(latest.get("volume")),
        "yearHigh": max(highs) if highs else None,
        "yearLow": min(lows) if lows else None,
        "averageVolume": (sum(volumes) / len(volumes)) if volumes else None,
    }
+
+
def _fetch_profile_json(url: str, params: dict[str, str]) -> Any:
    """GET *url* with *params* and return the decoded JSON body.

    Returns ``None`` on any failure (connection error, timeout, non-2xx
    status, invalid JSON) so callers can simply try the next provider.
    """
    try:
        with httpx.Client(timeout=3.0) as client:
            response = client.get(url, params=params)
            response.raise_for_status()
            return response.json()
    except Exception:
        # Enrichment is optional; a failing provider is treated as "no data".
        return None


def _has_usable_field(fields: dict[str, Any]) -> bool:
    """Return True when at least one value is neither None nor a blank string."""
    return any(
        value is not None and (not isinstance(value, str) or bool(value.strip()))
        for value in fields.values()
    )


@cached(PROFILE_ENRICH_CACHE)
def get_profile_enrichment(symbol: str) -> dict[str, Any]:
    """Fetch supplementary profile fields from optional third-party APIs.

    Providers are tried in order and only when their API key is set in the
    environment: Financial Modeling Prep (``FMP_API_KEY``), then Finnhub
    (``FINNHUB_API_KEY``). The first provider that returns at least one
    usable field wins. Returns ``{}`` when the symbol is empty, no key is
    configured, or no provider has data.
    """
    from urllib.parse import quote

    sym = normalize_symbol(symbol)
    if not sym:
        return {}

    fmp_key = os.getenv("FMP_API_KEY")
    if fmp_key:
        # The symbol comes from callers, so escape it before it becomes a
        # URL path segment.
        rows = _fetch_profile_json(
            "https://financialmodelingprep.com/api/v3/profile/" + quote(sym, safe=""),
            {"apikey": fmp_key},
        )
        if isinstance(rows, list) and rows:
            row = rows[0] if isinstance(rows[0], dict) else {}
            fields = {
                "sector": row.get("sector"),
                "industry": row.get("industry"),
                "website": row.get("website"),
                "summary": row.get("description"),
            }
            if _has_usable_field(fields):
                return fields

    finnhub_key = os.getenv("FINNHUB_API_KEY")
    if finnhub_key:
        row = _fetch_profile_json(
            "https://finnhub.io/api/v1/stock/profile2",
            {"symbol": sym, "token": finnhub_key},
        )
        if isinstance(row, dict) and row:
            fields = {
                "industry": row.get("finnhubIndustry"),
                "website": row.get("weburl"),
                "name": row.get("name"),
                "exchange": row.get("exchange"),
            }
            if _has_usable_field(fields):
                return fields
    return {}
+
+
def _build_profile(sym: str, info: dict[str, Any], fast_info: dict[str, Any], search_match: dict[str, Any], field_sources: dict[str, str]) -> dict[str, Any]:
    """Assemble the profile section, preferring ``info`` over other sources.

    Each field is resolved with ``_field`` under the name
    ``profile.<field>``, which records in *field_sources* where the value
    came from. The name falls back to *sym* when no source has one, and
    known exchange codes are rewritten through ``_XMAP``.
    """
    sources = {
        "info": info,
        "fast_info": fast_info,
        "search": search_match,
        "enrichment": get_profile_enrichment(sym),
    }

    def pick(field: str, *candidates: tuple[str, str]) -> Any:
        return _field(sources, field_sources, f"profile.{field}", *candidates)

    name = pick(
        "name",
        ("info", "longName"),
        ("info", "shortName"),
        ("enrichment", "name"),
        ("search", "name"),
    )
    exchange = pick(
        "exchange",
        ("info", "exchange"),
        ("enrichment", "exchange"),
        ("fast_info", "exchange"),
        ("search", "exchange"),
    )
    if exchange is not None:
        exchange = _XMAP.get(str(exchange), exchange)

    sector = pick("sector", ("info", "sector"), ("enrichment", "sector"))
    industry = pick("industry", ("info", "industry"), ("enrichment", "industry"))
    website = pick("website", ("info", "website"), ("enrichment", "website"))
    summary = pick("summary", ("info", "longBusinessSummary"), ("enrichment", "summary"))

    return {
        "symbol": sym,
        "name": str(name or sym),
        "sector": sector,
        "industry": industry,
        "exchange": exchange,
        "website": website,
        "summary": summary,
    }
+
+
def _build_quote_and_stats(
    info: dict[str, Any],
    fast_info: dict[str, Any],
    month_history: list[dict[str, Any]],
    year_history: list[dict[str, Any]],
    field_sources: dict[str, str],
) -> tuple[dict[str, Any], dict[str, Any], dict[str, Any]]:
    """Build the quote, stats and 52-week range sections.

    Every number is resolved through ``_field`` from, in order of
    preference, ``info``, ``fast_info`` and snapshots of the supplied price
    histories, then coerced with ``_safe_float``. The winning source of each
    field is recorded in *field_sources*.

    Returns a ``(quote, stats, range_52w)`` tuple of dicts. ``change`` and
    ``change_pct`` (a fraction) are set only when a price and a positive
    previous close are both known.
    """
    sources = {
        "info": info,
        "fast_info": fast_info,
        "history_recent": _history_snapshot(month_history),
        "history_year": _history_snapshot(year_history),
    }

    def pick(name: str, *candidates: tuple[str, str]) -> float | None:
        return _safe_float(_field(sources, field_sources, name, *candidates))

    price = pick(
        "quote.price",
        ("info", "currentPrice"),
        ("info", "regularMarketPrice"),
        ("fast_info", "lastPrice"),
        ("history_recent", "lastPrice"),
    )
    prev_close = pick(
        "quote.prev_close",
        ("info", "regularMarketPreviousClose"),
        ("info", "previousClose"),
        ("fast_info", "regularMarketPreviousClose"),
        ("fast_info", "previousClose"),
        ("history_recent", "previousClose"),
    )
    if price is None or prev_close is None or prev_close <= 0:
        change = None
        change_pct = None
    else:
        change = price - prev_close
        change_pct = change / prev_close

    volume = pick(
        "stats.volume",
        ("info", "volume"),
        ("fast_info", "lastVolume"),
        ("history_recent", "lastVolume"),
    )
    average_volume = pick(
        "stats.average_volume",
        ("info", "averageVolume"),
        ("fast_info", "threeMonthAverageVolume"),
        ("fast_info", "tenDayAverageVolume"),
        ("history_recent", "averageVolume"),
    )
    market_cap = pick("stats.market_cap", ("info", "marketCap"), ("fast_info", "marketCap"))
    trailing_pe = pick("stats.trailing_pe", ("info", "trailingPE"))
    trailing_eps = pick("stats.trailing_eps", ("info", "trailingEps"))
    beta = pick("stats.beta", ("info", "beta"))
    range_low = pick(
        "range_52w.low",
        ("info", "fiftyTwoWeekLow"),
        ("fast_info", "yearLow"),
        ("history_year", "yearLow"),
    )
    range_high = pick(
        "range_52w.high",
        ("info", "fiftyTwoWeekHigh"),
        ("fast_info", "yearHigh"),
        ("history_year", "yearHigh"),
    )

    quote = {"price": price, "prev_close": prev_close, "change": change, "change_pct": change_pct}
    stats = {
        "market_cap": market_cap,
        "trailing_pe": trailing_pe,
        "trailing_eps": trailing_eps,
        "volume": volume,
        "average_volume": average_volume,
        "beta": beta,
    }
    range_52w = {"low": range_low, "high": range_high, "price": price}
    return quote, stats, range_52w
+
+
+def _has_any_overview_data(
+ profile: dict[str, Any],
+ quote: dict[str, Any],
+ stats: dict[str, Any],
+ range_52w: dict[str, Any],
+ short_interest: dict[str, Any],
+ field_sources: dict[str, str],
+) -> bool:
+ for bucket in (profile, quote, stats, range_52w, short_interest):
+ for key, value in bucket.items():
+ if key == "symbol":
+ continue
+ if key == "name" and bucket is profile and "profile.name" not in field_sources:
+ continue
+ if isinstance(value, str) and value.strip():
+ return True
+ if value is not None and not isinstance(value, str):
+ return True
+ return False
+
+
def get_ticker_overview(symbol: str) -> dict[str, Any] | None:
    """Assemble the full overview payload for *symbol*.

    Combines company info, the matching search hit, fast info and one-month
    / one-year price history into ``profile``, ``quote``, ``signals``,
    ``stats``, ``range_52w`` and ``short_interest`` sections, plus a
    ``meta`` section describing which fields are available and which source
    supplied each one.

    Returns ``None`` when no section contains any real data.
    """
    sym = normalize_symbol(symbol)
    info = get_company_info(sym)
    search_match = _pick_search_match(sym)
    fast_info = get_fast_info(sym)
    month_history = get_price_history(sym, period="1m")
    year_history = get_price_history(sym, period="1y")

    sources: dict[str, str] = {}
    profile = _build_profile(sym, info, fast_info, search_match, sources)
    quote, stats, range_52w = _build_quote_and_stats(info, fast_info, month_history, year_history, sources)

    shares_short = _safe_int(info.get("sharesShort"))
    shares_short_prior = _safe_int(info.get("sharesShortPriorMonth"))
    if shares_short is not None and shares_short_prior and shares_short_prior > 0:
        # Month-over-month change in short interest, as a fraction.
        shares_short_delta = (shares_short - shares_short_prior) / shares_short_prior
    else:
        shares_short_delta = None
    short_interest = {
        "short_percent_of_float": _safe_float(info.get("shortPercentOfFloat")),
        "short_ratio": _safe_float(info.get("shortRatio")),
        "shares_short": shares_short,
        "shares_short_prior_month": shares_short_prior,
        "shares_short_delta_pct": shares_short_delta,
    }

    if not _has_any_overview_data(profile, quote, stats, range_52w, short_interest, sources):
        return None

    # The name is judged by truthiness; every other field by "is not None".
    availability: dict[str, bool] = {"profile.name": bool(profile.get("name"))}
    tracked = (
        ("profile", profile, ("exchange", "sector", "industry", "website", "summary")),
        ("quote", quote, ("price", "prev_close")),
        ("stats", stats, ("market_cap", "trailing_pe", "trailing_eps", "volume", "average_volume", "beta")),
        ("range_52w", range_52w, ("low", "high")),
    )
    for prefix, section, keys in tracked:
        for key in keys:
            availability[f"{prefix}.{key}"] = section.get(key) is not None
    is_partial = not all(availability.values())

    meta = {
        "status": "partial" if is_partial else "complete",
        "is_partial": is_partial,
        "field_availability": availability,
        "sources": sources,
    }
    return {
        "profile": profile,
        "quote": quote,
        "signals": build_signals(info),
        "stats": stats,
        "range_52w": range_52w,
        "short_interest": short_interest,
        "meta": meta,
    }