diff options
Diffstat (limited to 'backend/app')
| -rw-r--r-- | backend/app/__init__.py | 1 | ||||
| -rw-r--r-- | backend/app/api/__init__.py | 1 | ||||
| -rw-r--r-- | backend/app/db/__init__.py | 1 | ||||
| -rw-r--r-- | backend/app/db/watchlist.py | 104 | ||||
| -rw-r--r-- | backend/app/main.py | 94 | ||||
| -rw-r--r-- | backend/app/schemas.py | 104 | ||||
| -rw-r--r-- | backend/app/services/__init__.py | 1 | ||||
| -rw-r--r-- | backend/app/services/data_service.py | 605 |
8 files changed, 911 insertions, 0 deletions
diff --git a/backend/app/__init__.py b/backend/app/__init__.py new file mode 100644 index 0000000..a9425fc --- /dev/null +++ b/backend/app/__init__.py @@ -0,0 +1 @@ +"""Prism v2 backend package.""" diff --git a/backend/app/api/__init__.py b/backend/app/api/__init__.py new file mode 100644 index 0000000..509980f --- /dev/null +++ b/backend/app/api/__init__.py @@ -0,0 +1 @@ +"""API route package.""" diff --git a/backend/app/db/__init__.py b/backend/app/db/__init__.py new file mode 100644 index 0000000..a6e0ed0 --- /dev/null +++ b/backend/app/db/__init__.py @@ -0,0 +1 @@ +"""SQLite persistence package.""" diff --git a/backend/app/db/watchlist.py b/backend/app/db/watchlist.py new file mode 100644 index 0000000..238cf35 --- /dev/null +++ b/backend/app/db/watchlist.py @@ -0,0 +1,104 @@ +"""SQLite watchlist persistence for one local default profile.""" +from __future__ import annotations + +import sqlite3 +from datetime import datetime, timezone +from pathlib import Path + +DEFAULT_DB_PATH = Path(__file__).resolve().parents[2] / "data" / "prism.db" +WATCHLIST_LIMIT = 10 + + +class WatchlistFullError(ValueError): + """Raised when the local watchlist already has the maximum number of symbols.""" + + +def normalize_symbol(symbol: str) -> str: + cleaned = str(symbol or "").strip().upper() + if not cleaned: + raise ValueError("symbol is required") + if len(cleaned) > 16: + raise ValueError("symbol is too long") + return cleaned + + +def connect(db_path: Path | str = DEFAULT_DB_PATH) -> sqlite3.Connection: + path = Path(db_path) + path.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(path) + conn.row_factory = sqlite3.Row + return conn + + +def init_db(db_path: Path | str = DEFAULT_DB_PATH) -> None: + with connect(db_path) as conn: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS profiles ( + id INTEGER PRIMARY KEY, + name TEXT UNIQUE NOT NULL + ) + """ + ) + conn.execute( + """ + CREATE TABLE IF NOT EXISTS watchlist ( + profile_id INTEGER NOT NULL, + symbol TEXT NOT NULL, + created_at TEXT NOT NULL, + UNIQUE(profile_id, symbol), + FOREIGN KEY(profile_id) REFERENCES profiles(id) + ) + """ + ) + conn.execute("INSERT OR IGNORE INTO profiles(name) VALUES (?)", ("default",)) + + +def get_default_profile_id(conn: sqlite3.Connection) -> int: + row = conn.execute("SELECT id FROM profiles WHERE name = ?", ("default",)).fetchone() + if row is None: + conn.execute("INSERT INTO profiles(name) VALUES (?)", ("default",)) + row = conn.execute("SELECT id FROM profiles WHERE name = ?", ("default",)).fetchone() + return int(row["id"]) + + +def list_symbols(db_path: Path | str = DEFAULT_DB_PATH) -> list[dict[str, str]]: + init_db(db_path) + with connect(db_path) as conn: + profile_id = get_default_profile_id(conn) + rows = conn.execute( + "SELECT symbol, created_at FROM watchlist WHERE profile_id = ? ORDER BY created_at ASC", + (profile_id,), + ).fetchall() + return [{"symbol": row["symbol"], "created_at": row["created_at"]} for row in rows] + + +def add_symbol(symbol: str, db_path: Path | str = DEFAULT_DB_PATH) -> dict[str, str]: + sym = normalize_symbol(symbol) + init_db(db_path) + with connect(db_path) as conn: + profile_id = get_default_profile_id(conn) + existing = conn.execute( + "SELECT symbol, created_at FROM watchlist WHERE profile_id = ? AND symbol = ?", + (profile_id, sym), + ).fetchone() + if existing: + return {"symbol": existing["symbol"], "created_at": existing["created_at"]} + count = conn.execute("SELECT COUNT(*) AS c FROM watchlist WHERE profile_id = ?", (profile_id,)).fetchone()["c"] + if int(count) >= WATCHLIST_LIMIT: + raise WatchlistFullError("watchlist limit reached") + created_at = datetime.now(timezone.utc).isoformat() + conn.execute( + "INSERT INTO watchlist(profile_id, symbol, created_at) VALUES (?, ?, ?)", + (profile_id, sym, created_at), + ) + return {"symbol": sym, "created_at": created_at} + + +def remove_symbol(symbol: str, db_path: Path | str = DEFAULT_DB_PATH) -> bool: + sym = normalize_symbol(symbol) + init_db(db_path) + with connect(db_path) as conn: + profile_id = get_default_profile_id(conn) + cur = conn.execute("DELETE FROM watchlist WHERE profile_id = ? AND symbol = ?", (profile_id, sym)) + return cur.rowcount > 0 diff --git a/backend/app/main.py b/backend/app/main.py new file mode 100644 index 0000000..fc98a5e --- /dev/null +++ b/backend/app/main.py @@ -0,0 +1,94 @@ +"""FastAPI entrypoint for Prism v2.""" +from __future__ import annotations + +from contextlib import asynccontextmanager +from pathlib import Path + +from dotenv import load_dotenv +from fastapi import FastAPI, HTTPException, Query, status +from fastapi.middleware.cors import CORSMiddleware + +from app.db import watchlist +from app.schemas import HistoryPoint, MarketIndex, SearchResult, TickerOverview, WatchlistResponse +from app.services import data_service + +load_dotenv() + +@asynccontextmanager +async def lifespan(_: FastAPI): + watchlist.init_db(DB_PATH) + yield + + +app = FastAPI(title="Prism v2 API", version="0.1.0", lifespan=lifespan) +app.add_middleware( + CORSMiddleware, + allow_origins=[ + "http://localhost:3000", + "http://127.0.0.1:3000", + "http://localhost:3001", + "http://127.0.0.1:3001", + ], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +DB_PATH = Path(__file__).resolve().parents[1] / "data" / "prism.db" + + +@app.get("/health") +def health() -> dict[str, str]: + return {"status": "ok"} + + +@app.get("/api/search", response_model=list[SearchResult]) +def search(q: str = Query(default="", min_length=0)) -> list[dict]: + return data_service.search_tickers(q) + + +@app.get("/api/market/indices", response_model=list[MarketIndex]) +def market_indices() -> list[dict]: + return data_service.get_market_indices() + + +@app.get("/api/tickers/{symbol}/overview", response_model=TickerOverview) +def ticker_overview(symbol: str) -> dict: + overview = data_service.get_ticker_overview(symbol) + if overview is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="ticker data unavailable") + return overview + + +@app.get("/api/tickers/{symbol}/history", response_model=list[HistoryPoint]) +def ticker_history(symbol: str, period: str = Query(default="1y", pattern="^(1m|3m|6m|1y|5y)$")) -> list[dict]: + return data_service.get_price_history(symbol, period=period) + + +@app.get("/api/watchlist", response_model=WatchlistResponse) +def get_watchlist() -> dict: + items = [] + for row in watchlist.list_symbols(DB_PATH): + info = data_service.get_company_info(row["symbol"]) + items.append({**row, "quote": data_service.build_quote(info, row["symbol"])}) + return {"items": items, "limit": watchlist.WATCHLIST_LIMIT} + + +@app.post("/api/watchlist/{symbol}", response_model=WatchlistResponse, status_code=status.HTTP_201_CREATED) +def add_watchlist_symbol(symbol: str) -> dict: + try: + watchlist.add_symbol(symbol, DB_PATH) + except watchlist.WatchlistFullError as exc: + raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(exc)) from exc + except ValueError as exc: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc + return get_watchlist() + + +@app.delete("/api/watchlist/{symbol}", response_model=WatchlistResponse) +def delete_watchlist_symbol(symbol: str) -> dict: + try: + watchlist.remove_symbol(symbol, DB_PATH) + except ValueError as exc: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc + return get_watchlist() diff --git a/backend/app/schemas.py b/backend/app/schemas.py new file mode 100644 index 0000000..db0076d --- /dev/null +++ b/backend/app/schemas.py @@ -0,0 +1,104 @@ +"""Pydantic response schemas for the Prism v2 Overview API.""" +from typing import Literal + +from pydantic import BaseModel, Field + + +class SearchResult(BaseModel): + symbol: str + name: str + exchange: str | None = None + + +class MarketIndex(BaseModel): + name: str + price: float | None = None + change_pct: float | None = None + + +class Quote(BaseModel): + price: float | None = None + prev_close: float | None = None + change: float | None = None + change_pct: float | None = None + + +class Signal(BaseModel): + key: str + state: Literal["pos", "warn", "neg", "neu"] + value: str + description: str + + +class OverviewStats(BaseModel): + market_cap: float | None = None + trailing_pe: float | None = None + trailing_eps: float | None = None + volume: float | None = None + average_volume: float | None = None + beta: float | None = None + + +class Range52Week(BaseModel): + low: float | None = None + high: float | None = None + price: float | None = None + + +class ShortInterest(BaseModel): + short_percent_of_float: float | None = None + short_ratio: float | None = None + shares_short: int | None = None + shares_short_prior_month: int | None = None + shares_short_delta_pct: float | None = None + + +class CompanyProfile(BaseModel): + symbol: str + name: str + sector: str | None = None + industry: str | None = None + exchange: str | None = None + website: str | None = None + summary: str | None = None + + +class OverviewMeta(BaseModel): + status: Literal["complete", "partial"] + is_partial: bool = False + field_availability: dict[str, bool] = Field(default_factory=dict) + sources: dict[str, str] = Field(default_factory=dict) + + +class TickerOverview(BaseModel): + profile: CompanyProfile + quote: Quote + signals: list[Signal] = Field(default_factory=list) + stats: OverviewStats + range_52w: Range52Week + short_interest: ShortInterest + meta: OverviewMeta + + +class HistoryPoint(BaseModel): + date: str + open: float | None = None + high: float | None = None + low: float | None = None + close: float | None = None + volume: float | None = None + + +class WatchlistItem(BaseModel): + symbol: str + created_at: str + quote: Quote | None = None + + +class WatchlistResponse(BaseModel): + items: list[WatchlistItem] + limit: int = 10 + + +class ErrorResponse(BaseModel): + detail: str diff --git a/backend/app/services/__init__.py b/backend/app/services/__init__.py new file mode 100644 index 0000000..fbe69f6 --- /dev/null +++ b/backend/app/services/__init__.py @@ -0,0 +1 @@ +"""Finance service package.""" diff --git a/backend/app/services/data_service.py b/backend/app/services/data_service.py new file mode 100644 index 0000000..ae078cd --- /dev/null +++ b/backend/app/services/data_service.py @@ -0,0 +1,605 @@ +"""yfinance wrapper for Prism v2 Overview data.""" +from __future__ import annotations + +import math +import os +from typing import Any + +import httpx +import pandas as pd +import yfinance as yf +from cachetools import TTLCache, cached + +SEARCH_CACHE = TTLCache(maxsize=128, ttl=60) +INFO_CACHE = TTLCache(maxsize=256, ttl=300) +FAST_INFO_CACHE = TTLCache(maxsize=256, ttl=300) +PROFILE_ENRICH_CACHE = TTLCache(maxsize=256, ttl=300) +PRICE_CACHE = TTLCache(maxsize=256, ttl=300) +HISTORY_CACHE = TTLCache(maxsize=256, ttl=300) +INTRADAY_CACHE = TTLCache(maxsize=128, ttl=60) +MARKET_CACHE = TTLCache(maxsize=8, ttl=300) + +PERIODS = {"1m", "3m", "6m", "1y", "5y"} +YF_PERIOD_MAP = {"1m": "1mo", "3m": "3mo", "6m": "6mo", "1y": "1y", "5y": "5y"} +_XMAP = {"NYQ": "NYSE", "NMS": "NASDAQ", "NGM": "NASDAQ", "NCM": "NASDAQ", "ASE": "AMEX"} + + +def normalize_symbol(symbol: str) -> str: + return str(symbol or "").strip().upper() + + +def _safe_float(value: Any) -> float | None: + try: + n = float(value) + except (TypeError, ValueError): + return None + if math.isnan(n) or math.isinf(n): + return None + return n + + +def _safe_int(value: Any) -> int | None: + n = _safe_float(value) + return int(round(n)) if n is not None else None + + +def _json_value(value: Any) -> Any: + if value is None: + return None + if isinstance(value, pd.Timestamp): + return value.isoformat() + if pd.isna(value): + return None + if hasattr(value, "item"): + return _json_value(value.item()) + return value + + +def _pick_search_match(symbol: str) -> dict[str, Any]: + sym = normalize_symbol(symbol) + results = search_tickers(sym) + for row in results: + if normalize_symbol(row.get("symbol")) == sym: + return row + return {} + + +@cached(SEARCH_CACHE) +def search_tickers(query: str) -> list[dict[str, Any]]: + """Search for tickers by company name or symbol.""" + q = str(query or "").strip() + if len(q) < 2: + return [] + try: + results = yf.Search(q, max_results=8).quotes + out: list[dict[str, Any]] = [] + for row in results: + symbol = row.get("symbol", "") + if not symbol: + continue + out.append( + { + "symbol": normalize_symbol(symbol), + "name": row.get("longname") or row.get("shortname") or symbol, + "exchange": row.get("exchange") or row.get("exchDisp") or None, + } + ) + return out + except Exception: + return [] + + +@cached(INFO_CACHE) +def get_company_info(symbol: str) -> dict[str, Any]: + """Return a JSON-safe company info dict from yfinance.""" + sym = normalize_symbol(symbol) + try: + info = yf.Ticker(sym).info or {} + if not isinstance(info, dict): + return {} + cleaned = {str(k): _json_value(v) for k, v in info.items()} + return cleaned + except Exception: + return {} + + +@cached(FAST_INFO_CACHE) +def get_fast_info(symbol: str) -> dict[str, Any]: + """Return a JSON-safe subset of yfinance fast_info.""" + sym = normalize_symbol(symbol) + try: + fast_info = yf.Ticker(sym).fast_info + keys = [ + "currency", + "dayHigh", + "dayLow", + "exchange", + "fiftyDayAverage", + "lastPrice", + "lastVolume", + "marketCap", + "open", + "previousClose", + "regularMarketPreviousClose", + "shares", + "tenDayAverageVolume", + "threeMonthAverageVolume", + "timezone", + "twoHundredDayAverage", + "yearChange", + "yearHigh", + "yearLow", + ] + return {key: _json_value(fast_info.get(key)) for key in keys} + except Exception: + return {} + + +@cached(PRICE_CACHE) +def get_latest_price(symbol: str) -> float | None: + """Return latest close price, falling back to quote fields in info.""" + sym = normalize_symbol(symbol) + try: + hist = yf.Ticker(sym).history(period="5d") + if hist is not None and not hist.empty and "Close" in hist.columns: + close = pd.to_numeric(hist["Close"], errors="coerce").dropna() + if not close.empty: + return _safe_float(close.iloc[-1]) + info = get_company_info(sym) + for key in ("currentPrice", "regularMarketPrice", "previousClose"): + price = _safe_float(info.get(key)) + if price is not None: + return price + return None + except Exception: + return None + + +@cached(HISTORY_CACHE) +def get_price_history(symbol: str, period: str = "1y") -> list[dict[str, Any]]: + """Return JSON-safe OHLCV history.""" + if period not in PERIODS: + period = "1y" + try: + df = yf.Ticker(normalize_symbol(symbol)).history(period=YF_PERIOD_MAP[period]) + if df is None or df.empty: + return [] + df.index = pd.to_datetime(df.index) + return _history_rows(df, include_time=False) + except Exception: + return [] + + +@cached(INTRADAY_CACHE) +def get_intraday_history(symbol: str, period: str, interval: str) -> list[dict[str, Any]]: + """Return intraday JSON-safe OHLCV history.""" + try: + df = yf.Ticker(normalize_symbol(symbol)).history(period=period, interval=interval) + if df is None or df.empty: + return [] + df.index = pd.to_datetime(df.index) + try: + df = df.between_time("09:30", "16:00") + except Exception: + pass + return _history_rows(df, include_time=True) + except Exception: + return [] + + +def _history_rows(df: pd.DataFrame, include_time: bool) -> list[dict[str, Any]]: + rows: list[dict[str, Any]] = [] + for idx, row in df.iterrows(): + dt = pd.Timestamp(idx) + date = dt.strftime("%Y-%m-%dT%H:%M:%S") if include_time else dt.strftime("%Y-%m-%d") + rows.append( + { + "date": date, + "open": _safe_float(row.get("Open")), + "high": _safe_float(row.get("High")), + "low": _safe_float(row.get("Low")), + "close": _safe_float(row.get("Close")), + "volume": _safe_float(row.get("Volume")), + } + ) + return rows + + +@cached(MARKET_CACHE) +def get_market_indices() -> list[dict[str, Any]]: + """Return latest price and day change percent for major indices.""" + symbols = { + "S&P 500": "^GSPC", + "NASDAQ": "^IXIC", + "DOW": "^DJI", + "VIX": "^VIX", + } + result: list[dict[str, Any]] = [] + for name, sym in symbols.items(): + price: float | None = None + pct_change: float | None = None + try: + hist = yf.Ticker(sym).history(period="2d") + if len(hist) >= 2: + prev_close = _safe_float(hist["Close"].iloc[-2]) + last = _safe_float(hist["Close"].iloc[-1]) + if prev_close and last is not None: + price = last + pct_change = (last - prev_close) / prev_close + elif len(hist) == 1: + price = _safe_float(hist["Close"].iloc[-1]) + pct_change = 0.0 + except Exception: + pass + result.append({"name": name, "price": price, "change_pct": pct_change}) + return result + + +def build_quote(info: dict[str, Any], symbol: str) -> dict[str, Any]: + price = _safe_float(info.get("currentPrice") or info.get("regularMarketPrice")) or get_latest_price(symbol) + prev_close = _safe_float(info.get("regularMarketPreviousClose") or info.get("previousClose")) + change = None + change_pct = None + if price is not None and prev_close and prev_close > 0: + change = price - prev_close + change_pct = change / prev_close + return {"price": price, "prev_close": prev_close, "change": change, "change_pct": change_pct} + + +def build_signals(info: dict[str, Any]) -> list[dict[str, str]]: + signals: list[dict[str, str]] = [] + pe = _safe_float(info.get("trailingPE")) + if pe is not None and pe > 0: + if pe < 15: + signals.append({"key": "Valuation", "state": "pos", "value": f"P/E {pe:.1f}x", "description": "Attractive multiple"}) + elif pe < 30: + signals.append({"key": "Valuation", "state": "warn", "value": f"P/E {pe:.1f}x", "description": "Middle of range"}) + else: + signals.append({"key": "Valuation", "state": "neg", "value": f"P/E {pe:.1f}x", "description": "Premium multiple"}) + else: + signals.append({"key": "Valuation", "state": "neu", "value": "P/E unavailable", "description": "No trailing earnings"}) + + _ratio_signal(signals, "Growth", info.get("revenueGrowth"), 0.10, 0.0, "Strong top-line growth", "Low but positive growth", "Contracting revenue") + _ratio_signal(signals, "Profit", info.get("profitMargins"), 0.15, 0.05, "High net margin", "Moderate net margin", "Thin or negative margin") + + debt_to_equity = _safe_float(info.get("debtToEquity")) + if debt_to_equity is not None: + de_x = debt_to_equity / 100.0 + if de_x < 0.5: + state, desc = "pos", "Low leverage" + elif de_x < 2.0: + state, desc = "warn", "Moderate leverage" + else: + state, desc = "neg", "High leverage" + signals.append({"key": "Leverage", "state": state, "value": f"D/E {de_x:.2f}x", "description": desc}) + + return signals + + +def _ratio_signal( + signals: list[dict[str, str]], + key: str, + value: Any, + positive_threshold: float, + warn_threshold: float, + positive_desc: str, + warn_desc: str, + negative_desc: str, +) -> None: + ratio = _safe_float(value) + if ratio is None: + return + if ratio > positive_threshold: + state, desc = "pos", positive_desc + elif ratio >= warn_threshold: + state, desc = "warn", warn_desc + else: + state, desc = "neg", negative_desc + signals.append({"key": key, "state": state, "value": f"{ratio * 100:+.0f}%" if key == "Growth" else f"{ratio * 100:.0f}%", "description": desc}) + + +def _field(source_map: dict[str, dict[str, Any]], field_sources: dict[str, str], name: str, *candidates: tuple[str, str]) -> Any: + for source_name, key in candidates: + source = source_map.get(source_name) or {} + value = source.get(key) + if value is None: + continue + if isinstance(value, str) and not value.strip(): + continue + field_sources[name] = source_name + return value + return None + + +def _history_snapshot(history: list[dict[str, Any]]) -> dict[str, Any]: + if not history: + return {} + closes = [_safe_float(row.get("close")) for row in history] + closes = [value for value in closes if value is not None] + volumes = [_safe_float(row.get("volume")) for row in history] + volumes = [value for value in volumes if value is not None] + latest = history[-1] + previous = history[-2] if len(history) > 1 else None + return { + "lastPrice": _safe_float(latest.get("close")), + "previousClose": _safe_float(previous.get("close")) if previous else None, + "lastVolume": _safe_float(latest.get("volume")), + "yearHigh": max(closes) if closes else None, + "yearLow": min(closes) if closes else None, + "averageVolume": (sum(volumes) / len(volumes)) if volumes else None, + } + + +@cached(PROFILE_ENRICH_CACHE) +def get_profile_enrichment(symbol: str) -> dict[str, Any]: + sym = normalize_symbol(symbol) + fmp_key = os.getenv("FMP_API_KEY") + if fmp_key: + try: + with httpx.Client(timeout=3.0) as client: + res = client.get( + "https://financialmodelingprep.com/api/v3/profile/" + sym, + params={"apikey": fmp_key}, + ) + rows = res.json() + if isinstance(rows, list) and rows: + row = rows[0] or {} + return { + "sector": row.get("sector"), + "industry": row.get("industry"), + "website": row.get("website"), + "summary": row.get("description"), + } + except Exception: + pass + finnhub_key = os.getenv("FINNHUB_API_KEY") + if finnhub_key: + try: + with httpx.Client(timeout=3.0) as client: + res = client.get( + "https://finnhub.io/api/v1/stock/profile2", + params={"symbol": sym, "token": finnhub_key}, + ) + row = res.json() + if isinstance(row, dict) and row: + return { + "industry": row.get("finnhubIndustry"), + "website": row.get("weburl"), + "name": row.get("name"), + "exchange": row.get("exchange"), + } + except Exception: + pass + return {} + + +def _build_profile(sym: str, info: dict[str, Any], fast_info: dict[str, Any], search_match: dict[str, Any], field_sources: dict[str, str]) -> dict[str, Any]: + enrichment = get_profile_enrichment(sym) + source_map = { + "info": info, + "fast_info": fast_info, + "search": search_match, + "enrichment": enrichment, + } + name = _field( + source_map, + field_sources, + "profile.name", + ("info", "longName"), + ("info", "shortName"), + ("enrichment", "name"), + ("search", "name"), + ) + exchange = _field( + source_map, + field_sources, + "profile.exchange", + ("info", "exchange"), + ("enrichment", "exchange"), + ("fast_info", "exchange"), + ("search", "exchange"), + ) + if exchange is not None: + exchange = _XMAP.get(str(exchange), exchange) + return { + "symbol": sym, + "name": str(name or sym), + "sector": _field(source_map, field_sources, "profile.sector", ("info", "sector"), ("enrichment", "sector")), + "industry": _field(source_map, field_sources, "profile.industry", ("info", "industry"), ("enrichment", "industry")), + "exchange": exchange, + "website": _field(source_map, field_sources, "profile.website", ("info", "website"), ("enrichment", "website")), + "summary": _field(source_map, field_sources, "profile.summary", ("info", "longBusinessSummary"), ("enrichment", "summary")), + } + + +def _build_quote_and_stats( + info: dict[str, Any], + fast_info: dict[str, Any], + month_history: list[dict[str, Any]], + year_history: list[dict[str, Any]], + field_sources: dict[str, str], +) -> tuple[dict[str, Any], dict[str, Any], dict[str, Any]]: + month_snapshot = _history_snapshot(month_history) + year_snapshot = _history_snapshot(year_history) + source_map = { + "info": info, + "fast_info": fast_info, + "history_recent": month_snapshot, + "history_year": year_snapshot, + } + + price = _safe_float( + _field( + source_map, + field_sources, + "quote.price", + ("info", "currentPrice"), + ("info", "regularMarketPrice"), + ("fast_info", "lastPrice"), + ("history_recent", "lastPrice"), + ) + ) + prev_close = _safe_float( + _field( + source_map, + field_sources, + "quote.prev_close", + ("info", "regularMarketPreviousClose"), + ("info", "previousClose"), + ("fast_info", "regularMarketPreviousClose"), + ("fast_info", "previousClose"), + ("history_recent", "previousClose"), + ) + ) + change = None + change_pct = None + if price is not None and prev_close is not None and prev_close > 0: + change = price - prev_close + change_pct = change / prev_close + + volume = _safe_float( + _field( + source_map, + field_sources, + "stats.volume", + ("info", "volume"), + ("fast_info", "lastVolume"), + ("history_recent", "lastVolume"), + ) + ) + average_volume = _safe_float( + _field( + source_map, + field_sources, + "stats.average_volume", + ("info", "averageVolume"), + ("fast_info", "threeMonthAverageVolume"), + ("fast_info", "tenDayAverageVolume"), + ("history_recent", "averageVolume"), + ) + ) + market_cap = _safe_float(_field(source_map, field_sources, "stats.market_cap", ("info", "marketCap"), ("fast_info", "marketCap"))) + trailing_pe = _safe_float(_field(source_map, field_sources, "stats.trailing_pe", ("info", "trailingPE"))) + trailing_eps = _safe_float(_field(source_map, field_sources, "stats.trailing_eps", ("info", "trailingEps"))) + beta = _safe_float(_field(source_map, field_sources, "stats.beta", ("info", "beta"))) + range_low = _safe_float( + _field( + source_map, + field_sources, + "range_52w.low", + ("info", "fiftyTwoWeekLow"), + ("fast_info", "yearLow"), + ("history_year", "yearLow"), + ) + ) + range_high = _safe_float( + _field( + source_map, + field_sources, + "range_52w.high", + ("info", "fiftyTwoWeekHigh"), + ("fast_info", "yearHigh"), + ("history_year", "yearHigh"), + ) + ) + + return ( + {"price": price, "prev_close": prev_close, "change": change, "change_pct": change_pct}, + { + "market_cap": market_cap, + "trailing_pe": trailing_pe, + "trailing_eps": trailing_eps, + "volume": volume, + "average_volume": average_volume, + "beta": beta, + }, + { + "low": range_low, + "high": range_high, + "price": price, + }, + ) + + +def _has_any_overview_data( + profile: dict[str, Any], + quote: dict[str, Any], + stats: dict[str, Any], + range_52w: dict[str, Any], + short_interest: dict[str, Any], + field_sources: dict[str, str], +) -> bool: + for bucket in (profile, quote, stats, range_52w, short_interest): + for key, value in bucket.items(): + if key == "symbol": + continue + if key == "name" and bucket is profile and "profile.name" not in field_sources: + continue + if isinstance(value, str) and value.strip(): + return True + if value is not None and not isinstance(value, str): + return True + return False + + +def get_ticker_overview(symbol: str) -> dict[str, Any] | None: + sym = normalize_symbol(symbol) + info = get_company_info(sym) + search_match = _pick_search_match(sym) + fast_info = get_fast_info(sym) + month_history = get_price_history(sym, period="1m") + year_history = get_price_history(sym, period="1y") + field_sources: dict[str, str] = {} + + profile = _build_profile(sym, info, fast_info, search_match, field_sources) + quote, stats, range_52w = _build_quote_and_stats(info, fast_info, month_history, year_history, field_sources) + short = _safe_int(info.get("sharesShort")) + short_prior = _safe_int(info.get("sharesShortPriorMonth")) + short_delta = None + if short is not None and short_prior and short_prior > 0: + short_delta = (short - short_prior) / short_prior + short_interest = { + "short_percent_of_float": _safe_float(info.get("shortPercentOfFloat")), + "short_ratio": _safe_float(info.get("shortRatio")), + "shares_short": short, + "shares_short_prior_month": short_prior, + "shares_short_delta_pct": short_delta, + } + + if not _has_any_overview_data(profile, quote, stats, range_52w, short_interest, field_sources): + return None + + field_availability = { + "profile.name": bool(profile.get("name")), + "profile.exchange": profile.get("exchange") is not None, + "profile.sector": profile.get("sector") is not None, + "profile.industry": profile.get("industry") is not None, + "profile.website": profile.get("website") is not None, + "profile.summary": profile.get("summary") is not None, + "quote.price": quote.get("price") is not None, + "quote.prev_close": quote.get("prev_close") is not None, + "stats.market_cap": stats.get("market_cap") is not None, + "stats.trailing_pe": stats.get("trailing_pe") is not None, + "stats.trailing_eps": stats.get("trailing_eps") is not None, + "stats.volume": stats.get("volume") is not None, + "stats.average_volume": stats.get("average_volume") is not None, + "stats.beta": stats.get("beta") is not None, + "range_52w.low": range_52w.get("low") is not None, + "range_52w.high": range_52w.get("high") is not None, + } + is_partial = not all(field_availability.values()) + + return { + "profile": profile, + "quote": quote, + "signals": build_signals(info), + "stats": stats, + "range_52w": range_52w, + "short_interest": short_interest, + "meta": { + "status": "partial" if is_partial else "complete", + "is_partial": is_partial, + "field_availability": field_availability, + "sources": field_sources, + }, + } |
