"""yfinance wrapper for Prism v2 Overview data.""" from __future__ import annotations import math import os from typing import Any import httpx import pandas as pd import yfinance as yf from cachetools import TTLCache, cached SEARCH_CACHE = TTLCache(maxsize=128, ttl=60) INFO_CACHE = TTLCache(maxsize=256, ttl=300) FAST_INFO_CACHE = TTLCache(maxsize=256, ttl=300) PROFILE_ENRICH_CACHE = TTLCache(maxsize=256, ttl=300) PRICE_CACHE = TTLCache(maxsize=256, ttl=300) HISTORY_CACHE = TTLCache(maxsize=256, ttl=300) INTRADAY_CACHE = TTLCache(maxsize=128, ttl=60) MARKET_CACHE = TTLCache(maxsize=8, ttl=300) PERIODS = {"1m", "3m", "6m", "1y", "5y"} YF_PERIOD_MAP = {"1m": "1mo", "3m": "3mo", "6m": "6mo", "1y": "1y", "5y": "5y"} _XMAP = {"NYQ": "NYSE", "NMS": "NASDAQ", "NGM": "NASDAQ", "NCM": "NASDAQ", "ASE": "AMEX"} def normalize_symbol(symbol: str) -> str: return str(symbol or "").strip().upper() def _safe_float(value: Any) -> float | None: try: n = float(value) except (TypeError, ValueError): return None if math.isnan(n) or math.isinf(n): return None return n def _safe_int(value: Any) -> int | None: n = _safe_float(value) return int(round(n)) if n is not None else None def _json_value(value: Any) -> Any: if value is None: return None if isinstance(value, pd.Timestamp): return value.isoformat() if pd.isna(value): return None if hasattr(value, "item"): return _json_value(value.item()) return value def _pick_search_match(symbol: str) -> dict[str, Any]: sym = normalize_symbol(symbol) results = search_tickers(sym) for row in results: if normalize_symbol(row.get("symbol")) == sym: return row return {} @cached(SEARCH_CACHE) def search_tickers(query: str) -> list[dict[str, Any]]: """Search for tickers by company name or symbol.""" q = str(query or "").strip() if len(q) < 2: return [] try: results = yf.Search(q, max_results=8).quotes out: list[dict[str, Any]] = [] for row in results: symbol = row.get("symbol", "") if not symbol: continue out.append( { "symbol": normalize_symbol(symbol), "name": row.get("longname") or row.get("shortname") or symbol, "exchange": row.get("exchange") or row.get("exchDisp") or None, } ) return out except Exception: return [] @cached(INFO_CACHE) def get_company_info(symbol: str) -> dict[str, Any]: """Return a JSON-safe company info dict from yfinance.""" sym = normalize_symbol(symbol) try: info = yf.Ticker(sym).info or {} if not isinstance(info, dict): return {} cleaned = {str(k): _json_value(v) for k, v in info.items()} return cleaned except Exception: return {} @cached(FAST_INFO_CACHE) def get_fast_info(symbol: str) -> dict[str, Any]: """Return a JSON-safe subset of yfinance fast_info.""" sym = normalize_symbol(symbol) try: fast_info = yf.Ticker(sym).fast_info keys = [ "currency", "dayHigh", "dayLow", "exchange", "fiftyDayAverage", "lastPrice", "lastVolume", "marketCap", "open", "previousClose", "regularMarketPreviousClose", "shares", "tenDayAverageVolume", "threeMonthAverageVolume", "timezone", "twoHundredDayAverage", "yearChange", "yearHigh", "yearLow", ] return {key: _json_value(fast_info.get(key)) for key in keys} except Exception: return {} @cached(PRICE_CACHE) def get_latest_price(symbol: str) -> float | None: """Return latest close price, falling back to quote fields in info.""" sym = normalize_symbol(symbol) try: hist = yf.Ticker(sym).history(period="5d") if hist is not None and not hist.empty and "Close" in hist.columns: close = pd.to_numeric(hist["Close"], errors="coerce").dropna() if not close.empty: return _safe_float(close.iloc[-1]) info = get_company_info(sym) for key in ("currentPrice", "regularMarketPrice", "previousClose"): price = _safe_float(info.get(key)) if price is not None: return price return None except Exception: return None @cached(HISTORY_CACHE) def get_price_history(symbol: str, period: str = "1y") -> list[dict[str, Any]]: """Return JSON-safe OHLCV history.""" if period not in PERIODS: period = "1y" try: df = yf.Ticker(normalize_symbol(symbol)).history(period=YF_PERIOD_MAP[period]) if df is None or df.empty: return [] df.index = pd.to_datetime(df.index) return _history_rows(df, include_time=False) except Exception: return [] @cached(INTRADAY_CACHE) def get_intraday_history(symbol: str, period: str, interval: str) -> list[dict[str, Any]]: """Return intraday JSON-safe OHLCV history.""" try: df = yf.Ticker(normalize_symbol(symbol)).history(period=period, interval=interval) if df is None or df.empty: return [] df.index = pd.to_datetime(df.index) try: df = df.between_time("09:30", "16:00") except Exception: pass return _history_rows(df, include_time=True) except Exception: return [] def _history_rows(df: pd.DataFrame, include_time: bool) -> list[dict[str, Any]]: rows: list[dict[str, Any]] = [] for idx, row in df.iterrows(): dt = pd.Timestamp(idx) date = dt.strftime("%Y-%m-%dT%H:%M:%S") if include_time else dt.strftime("%Y-%m-%d") rows.append( { "date": date, "open": _safe_float(row.get("Open")), "high": _safe_float(row.get("High")), "low": _safe_float(row.get("Low")), "close": _safe_float(row.get("Close")), "volume": _safe_float(row.get("Volume")), } ) return rows @cached(MARKET_CACHE) def get_market_indices() -> list[dict[str, Any]]: """Return latest price and day change percent for major indices.""" symbols = { "S&P 500": "^GSPC", "NASDAQ": "^IXIC", "DOW": "^DJI", "VIX": "^VIX", } result: list[dict[str, Any]] = [] for name, sym in symbols.items(): price: float | None = None pct_change: float | None = None try: hist = yf.Ticker(sym).history(period="2d") if len(hist) >= 2: prev_close = _safe_float(hist["Close"].iloc[-2]) last = _safe_float(hist["Close"].iloc[-1]) if prev_close and last is not None: price = last pct_change = (last - prev_close) / prev_close elif len(hist) == 1: price = _safe_float(hist["Close"].iloc[-1]) pct_change = 0.0 except Exception: pass result.append({"name": name, "price": price, "change_pct": pct_change}) return result def build_quote(info: dict[str, Any], symbol: str) -> dict[str, Any]: price = _safe_float(info.get("currentPrice") or info.get("regularMarketPrice")) or get_latest_price(symbol) prev_close = _safe_float(info.get("regularMarketPreviousClose") or info.get("previousClose")) change = None change_pct = None if price is not None and prev_close and prev_close > 0: change = price - prev_close change_pct = change / prev_close return {"price": price, "prev_close": prev_close, "change": change, "change_pct": change_pct} def build_signals(info: dict[str, Any]) -> list[dict[str, str]]: signals: list[dict[str, str]] = [] pe = _safe_float(info.get("trailingPE")) if pe is not None and pe > 0: if pe < 15: signals.append({"key": "Valuation", "state": "pos", "value": f"P/E {pe:.1f}x", "description": "Attractive multiple"}) elif pe < 30: signals.append({"key": "Valuation", "state": "warn", "value": f"P/E {pe:.1f}x", "description": "Middle of range"}) else: signals.append({"key": "Valuation", "state": "neg", "value": f"P/E {pe:.1f}x", "description": "Premium multiple"}) else: signals.append({"key": "Valuation", "state": "neu", "value": "P/E unavailable", "description": "No trailing earnings"}) _ratio_signal(signals, "Growth", info.get("revenueGrowth"), 0.10, 0.0, "Strong top-line growth", "Low but positive growth", "Contracting revenue") _ratio_signal(signals, "Profit", info.get("profitMargins"), 0.15, 0.05, "High net margin", "Moderate net margin", "Thin or negative margin") debt_to_equity = _safe_float(info.get("debtToEquity")) if debt_to_equity is not None: de_x = debt_to_equity / 100.0 if de_x < 0.5: state, desc = "pos", "Low leverage" elif de_x < 2.0: state, desc = "warn", "Moderate leverage" else: state, desc = "neg", "High leverage" signals.append({"key": "Leverage", "state": state, "value": f"D/E {de_x:.2f}x", "description": desc}) return signals def _ratio_signal( signals: list[dict[str, str]], key: str, value: Any, positive_threshold: float, warn_threshold: float, positive_desc: str, warn_desc: str, negative_desc: str, ) -> None: ratio = _safe_float(value) if ratio is None: return if ratio > positive_threshold: state, desc = "pos", positive_desc elif ratio >= warn_threshold: state, desc = "warn", warn_desc else: state, desc = "neg", negative_desc signals.append({"key": key, "state": state, "value": f"{ratio * 100:+.0f}%" if key == "Growth" else f"{ratio * 100:.0f}%", "description": desc}) def _field(source_map: dict[str, dict[str, Any]], field_sources: dict[str, str], name: str, *candidates: tuple[str, str]) -> Any: for source_name, key in candidates: source = source_map.get(source_name) or {} value = source.get(key) if value is None: continue if isinstance(value, str) and not value.strip(): continue field_sources[name] = source_name return value return None def _history_snapshot(history: list[dict[str, Any]]) -> dict[str, Any]: if not history: return {} closes = [_safe_float(row.get("close")) for row in history] closes = [value for value in closes if value is not None] volumes = [_safe_float(row.get("volume")) for row in history] volumes = [value for value in volumes if value is not None] latest = history[-1] previous = history[-2] if len(history) > 1 else None return { "lastPrice": _safe_float(latest.get("close")), "previousClose": _safe_float(previous.get("close")) if previous else None, "lastVolume": _safe_float(latest.get("volume")), "yearHigh": max(closes) if closes else None, "yearLow": min(closes) if closes else None, "averageVolume": (sum(volumes) / len(volumes)) if volumes else None, } @cached(PROFILE_ENRICH_CACHE) def get_profile_enrichment(symbol: str) -> dict[str, Any]: sym = normalize_symbol(symbol) fmp_key = os.getenv("FMP_API_KEY") if fmp_key: try: with httpx.Client(timeout=3.0) as client: res = client.get( "https://financialmodelingprep.com/api/v3/profile/" + sym, params={"apikey": fmp_key}, ) rows = res.json() if isinstance(rows, list) and rows: row = rows[0] or {} return { "sector": row.get("sector"), "industry": row.get("industry"), "website": row.get("website"), "summary": row.get("description"), } except Exception: pass finnhub_key = os.getenv("FINNHUB_API_KEY") if finnhub_key: try: with httpx.Client(timeout=3.0) as client: res = client.get( "https://finnhub.io/api/v1/stock/profile2", params={"symbol": sym, "token": finnhub_key}, ) row = res.json() if isinstance(row, dict) and row: return { "industry": row.get("finnhubIndustry"), "website": row.get("weburl"), "name": row.get("name"), "exchange": row.get("exchange"), } except Exception: pass return {} def _build_profile(sym: str, info: dict[str, Any], fast_info: dict[str, Any], search_match: dict[str, Any], field_sources: dict[str, str]) -> dict[str, Any]: enrichment = get_profile_enrichment(sym) source_map = { "info": info, "fast_info": fast_info, "search": search_match, "enrichment": enrichment, } name = _field( source_map, field_sources, "profile.name", ("info", "longName"), ("info", "shortName"), ("enrichment", "name"), ("search", "name"), ) exchange = _field( source_map, field_sources, "profile.exchange", ("info", "exchange"), ("enrichment", "exchange"), ("fast_info", "exchange"), ("search", "exchange"), ) if exchange is not None: exchange = _XMAP.get(str(exchange), exchange) return { "symbol": sym, "name": str(name or sym), "sector": _field(source_map, field_sources, "profile.sector", ("info", "sector"), ("enrichment", "sector")), "industry": _field(source_map, field_sources, "profile.industry", ("info", "industry"), ("enrichment", "industry")), "exchange": exchange, "website": _field(source_map, field_sources, "profile.website", ("info", "website"), ("enrichment", "website")), "summary": _field(source_map, field_sources, "profile.summary", ("info", "longBusinessSummary"), ("enrichment", "summary")), } def _build_quote_and_stats( info: dict[str, Any], fast_info: dict[str, Any], month_history: list[dict[str, Any]], year_history: list[dict[str, Any]], field_sources: dict[str, str], ) -> tuple[dict[str, Any], dict[str, Any], dict[str, Any]]: month_snapshot = _history_snapshot(month_history) year_snapshot = _history_snapshot(year_history) source_map = { "info": info, "fast_info": fast_info, "history_recent": month_snapshot, "history_year": year_snapshot, } price = _safe_float( _field( source_map, field_sources, "quote.price", ("info", "currentPrice"), ("info", "regularMarketPrice"), ("fast_info", "lastPrice"), ("history_recent", "lastPrice"), ) ) prev_close = _safe_float( _field( source_map, field_sources, "quote.prev_close", ("info", "regularMarketPreviousClose"), ("info", "previousClose"), ("fast_info", "regularMarketPreviousClose"), ("fast_info", "previousClose"), ("history_recent", "previousClose"), ) ) change = None change_pct = None if price is not None and prev_close is not None and prev_close > 0: change = price - prev_close change_pct = change / prev_close volume = _safe_float( _field( source_map, field_sources, "stats.volume", ("info", "volume"), ("fast_info", "lastVolume"), ("history_recent", "lastVolume"), ) ) average_volume = _safe_float( _field( source_map, field_sources, "stats.average_volume", ("info", "averageVolume"), ("fast_info", "threeMonthAverageVolume"), ("fast_info", "tenDayAverageVolume"), ("history_recent", "averageVolume"), ) ) market_cap = _safe_float(_field(source_map, field_sources, "stats.market_cap", ("info", "marketCap"), ("fast_info", "marketCap"))) trailing_pe = _safe_float(_field(source_map, field_sources, "stats.trailing_pe", ("info", "trailingPE"))) trailing_eps = _safe_float(_field(source_map, field_sources, "stats.trailing_eps", ("info", "trailingEps"))) beta = _safe_float(_field(source_map, field_sources, "stats.beta", ("info", "beta"))) range_low = _safe_float( _field( source_map, field_sources, "range_52w.low", ("info", "fiftyTwoWeekLow"), ("fast_info", "yearLow"), ("history_year", "yearLow"), ) ) range_high = _safe_float( _field( source_map, field_sources, "range_52w.high", ("info", "fiftyTwoWeekHigh"), ("fast_info", "yearHigh"), ("history_year", "yearHigh"), ) ) return ( {"price": price, "prev_close": prev_close, "change": change, "change_pct": change_pct}, { "market_cap": market_cap, "trailing_pe": trailing_pe, "trailing_eps": trailing_eps, "volume": volume, "average_volume": average_volume, "beta": beta, }, { "low": range_low, "high": range_high, "price": price, }, ) def _has_any_overview_data( profile: dict[str, Any], quote: dict[str, Any], stats: dict[str, Any], range_52w: dict[str, Any], short_interest: dict[str, Any], field_sources: dict[str, str], ) -> bool: for bucket in (profile, quote, stats, range_52w, short_interest): for key, value in bucket.items(): if key == "symbol": continue if key == "name" and bucket is profile and "profile.name" not in field_sources: continue if isinstance(value, str) and value.strip(): return True if value is not None and not isinstance(value, str): return True return False def get_ticker_overview(symbol: str) -> dict[str, Any] | None: sym = normalize_symbol(symbol) info = get_company_info(sym) search_match = _pick_search_match(sym) fast_info = get_fast_info(sym) month_history = get_price_history(sym, period="1m") year_history = get_price_history(sym, period="1y") field_sources: dict[str, str] = {} profile = _build_profile(sym, info, fast_info, search_match, field_sources) quote, stats, range_52w = _build_quote_and_stats(info, fast_info, month_history, year_history, field_sources) short = _safe_int(info.get("sharesShort")) short_prior = _safe_int(info.get("sharesShortPriorMonth")) short_delta = None if short is not None and short_prior and short_prior > 0: short_delta = (short - short_prior) / short_prior short_interest = { "short_percent_of_float": _safe_float(info.get("shortPercentOfFloat")), "short_ratio": _safe_float(info.get("shortRatio")), "shares_short": short, "shares_short_prior_month": short_prior, "shares_short_delta_pct": short_delta, } if not _has_any_overview_data(profile, quote, stats, range_52w, short_interest, field_sources): return None field_availability = { "profile.name": bool(profile.get("name")), "profile.exchange": profile.get("exchange") is not None, "profile.sector": profile.get("sector") is not None, "profile.industry": profile.get("industry") is not None, "profile.website": profile.get("website") is not None, "profile.summary": profile.get("summary") is not None, "quote.price": quote.get("price") is not None, "quote.prev_close": quote.get("prev_close") is not None, "stats.market_cap": stats.get("market_cap") is not None, "stats.trailing_pe": stats.get("trailing_pe") is not None, "stats.trailing_eps": stats.get("trailing_eps") is not None, "stats.volume": stats.get("volume") is not None, "stats.average_volume": stats.get("average_volume") is not None, "stats.beta": stats.get("beta") is not None, "range_52w.low": range_52w.get("low") is not None, "range_52w.high": range_52w.get("high") is not None, } is_partial = not all(field_availability.values()) return { "profile": profile, "quote": quote, "signals": build_signals(info), "stats": stats, "range_52w": range_52w, "short_interest": short_interest, "meta": { "status": "partial" if is_partial else "complete", "is_partial": is_partial, "field_availability": field_availability, "sources": field_sources, }, }