diff options
| author | Tyler Hoang <tyler@tylerhoang.xyz> | 2026-05-18 22:45:59 -0700 |
|---|---|---|
| committer | Tyler Hoang <tyler@tylerhoang.xyz> | 2026-05-18 22:45:59 -0700 |
| commit | 66cfb26ebd8fa44b24e37b4ffc796ab29dcbd704 (patch) | |
| tree | 4d98b268502c6aa7c8988957d6e41dffd319534d /backend/app | |
| parent | 7fc2f0177518d70114aa75b7874a0ef59bdaec61 (diff) | |
| parent | 52635efd7d435b091b4f13897511ca8e2c48f0b9 (diff) | |
Merge branch 'feat/key-ratios-tab'
Diffstat (limited to 'backend/app')
| -rw-r--r-- | backend/app/main.py | 7 | ||||
| -rw-r--r-- | backend/app/schemas.py | 30 | ||||
| -rw-r--r-- | backend/app/services/data_service.py | 307 |
3 files changed, 343 insertions, 1 deletions
diff --git a/backend/app/main.py b/backend/app/main.py index 1cc127e..7e02cbe 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -9,7 +9,7 @@ from fastapi import FastAPI, HTTPException, Query, status from fastapi.middleware.cors import CORSMiddleware from app.db import watchlist -from app.schemas import FinancialsResponse, HistoryPoint, MarketIndex, SearchResult, TickerOverview, ValuationResponse, WatchlistResponse +from app.schemas import FinancialsResponse, HistoryPoint, MarketIndex, RatiosResponse, SearchResult, TickerOverview, ValuationResponse, WatchlistResponse from app.services import data_service load_dotenv() @@ -75,6 +75,11 @@ def ticker_valuation(symbol: str) -> dict: return data_service.get_valuation(symbol) +@app.get("/api/tickers/{symbol}/ratios", response_model=RatiosResponse) +def ticker_ratios(symbol: str) -> dict: + return data_service.get_ratios(symbol) + + @app.get("/api/watchlist", response_model=WatchlistResponse) def get_watchlist() -> dict: items = [] diff --git a/backend/app/schemas.py b/backend/app/schemas.py index 87acd0d..2c64333 100644 --- a/backend/app/schemas.py +++ b/backend/app/schemas.py @@ -175,5 +175,35 @@ class WatchlistResponse(BaseModel): limit: int = 10 +class RatioPoint(BaseModel): + value: float | None = None + spark: list[float | None] = Field(default_factory=list) + vs_sector: float | None = None + + +class RatiosResponse(BaseModel): + pe_ttm: RatioPoint + ev_ebitda: RatioPoint + gross_margin: RatioPoint + net_margin: RatioPoint + price_to_book: RatioPoint + price_to_sales: RatioPoint + ev_to_sales: RatioPoint + p_fcf: RatioPoint + forward_pe: RatioPoint + operating_margin: RatioPoint + ebitda_margin: RatioPoint + fcf_margin: RatioPoint + roe: RatioPoint + roa: RatioPoint + roic: RatioPoint + debt_to_equity: RatioPoint + current_ratio: RatioPoint + quick_ratio: RatioPoint + interest_coverage: RatioPoint + dividend_yield: RatioPoint + dividend_payout: RatioPoint + + class ErrorResponse(BaseModel): detail: str diff --git a/backend/app/services/data_service.py b/backend/app/services/data_service.py index 9fe7f67..f913ec5 100644 --- a/backend/app/services/data_service.py +++ b/backend/app/services/data_service.py @@ -3,6 +3,7 @@ from __future__ import annotations import math import os +import statistics from typing import Any import httpx @@ -28,6 +29,9 @@ BETA_CACHE = TTLCache(maxsize=256, ttl=3600) SHORT_CACHE = TTLCache(maxsize=256, ttl=3600) FINANCIALS_CACHE = TTLCache(maxsize=128, ttl=3600) VALUATION_CACHE = TTLCache(maxsize=128, ttl=3600) +HIST_RATIOS_CACHE: TTLCache = TTLCache(maxsize=128, ttl=3600) +RATIOS_ENDPOINT_CACHE: TTLCache = TTLCache(maxsize=128, ttl=3600) +SECTOR_BENCHMARK_CACHE: TTLCache = TTLCache(maxsize=128, ttl=3600) PERIODS = {"1m", "3m", "6m", "1y", "2y", "5y"} YF_PERIOD_MAP = {"1m": "1mo", "3m": "3mo", "6m": "6mo", "1y": "1y", "2y": "2y", "5y": "5y"} @@ -614,6 +618,309 @@ def _latest_share_count(balance_sheet: pd.DataFrame) -> float | None: return shares if shares is not None and shares > 0 else None +def _find_price_at_date(price_history: list[dict], target: "pd.Timestamp") -> float | None: + """Return closing price from price_history nearest to target date (within 45 days).""" + if not price_history: + return None + best_price: float | None = None + best_delta = float("inf") + for pt in price_history: + try: + delta = abs((pd.Timestamp(pt["date"]) - target).days) + if delta < best_delta: + best_delta = delta + best_price = _safe_float(pt.get("close")) + except Exception: + continue + return best_price if best_delta <= 45 else None + + +@cached(HIST_RATIOS_CACHE) +def compute_historical_ratios(symbol: str) -> dict[str, list[float | None]]: + """Per-fiscal-year ratios from annual statements, oldest-first (up to 4 points).""" + sym = normalize_symbol(symbol) + inc_a = get_income_statement(sym, quarterly=False) + bal_a = get_balance_sheet(sym, quarterly=False) + cf_a = get_cash_flow(sym, quarterly=False) + + if inc_a is None or inc_a.empty: + return {} + + years = list(inc_a.columns[: min(len(inc_a.columns), 4)]) + price_history = get_price_history(sym, period="5y") + current_shares = get_shares_outstanding(sym) + + try: + shares_history_raw = yf.Ticker(sym).get_shares_full(start="2000-01-01") + if isinstance(shares_history_raw, pd.Series): + shares_history = pd.to_numeric(shares_history_raw, errors="coerce").dropna().sort_index() + else: + shares_history = pd.Series(dtype=float) + except Exception: + shares_history = pd.Series(dtype=float) + + result: dict[str, list[float | None]] = {k: [] for k in [ + "gross_margin", "operating_margin", "net_margin", "ebitda_margin", + "roe", "roa", "debt_to_equity", "current_ratio", + "trailing_pe", "ev_to_ebitda", "price_to_book", "price_to_sales", + ]} + + def _balance_shares(period_date: pd.Timestamp) -> float | None: + if bal_a is None or bal_a.empty or period_date not in bal_a.columns: + return None + for label in _SHARE_LABELS: + if label not in bal_a.index: + continue + shares_value = _safe_float(bal_a.loc[label, period_date]) + if shares_value is not None and shares_value > 0: + return shares_value + return None + + def _historical_shares_for_date(period_date: pd.Timestamp) -> float | None: + direct_balance_shares = _balance_shares(period_date) + if direct_balance_shares is not None: + return direct_balance_shares + if not shares_history.empty: + target = pd.Timestamp(period_date) + index = shares_history.index + if getattr(index, "tz", None) is not None and target.tzinfo is None: + target = target.tz_localize(index.tz) + elif getattr(index, "tz", None) is None and target.tzinfo is not None: + target = target.tz_localize(None) + + deltas = pd.Series(index - target, index=index).abs() + if not deltas.empty: + nearest_idx = deltas.idxmin() + if abs(pd.Timestamp(nearest_idx) - target) <= pd.Timedelta(days=180): + shares_value = _safe_float(shares_history.loc[nearest_idx]) + if shares_value is not None and shares_value > 0: + return shares_value + return current_shares + + for col in years: + col_dt = pd.Timestamp(col) + + def _inc(label: str) -> float | None: + if label not in inc_a.index: + return None + return _safe_float(inc_a.loc[label, col]) if col in inc_a.columns else None + + def _bal(label: str) -> float | None: + if bal_a is None or bal_a.empty or label not in bal_a.index: + return None + return _safe_float(bal_a.loc[label, col]) if col in bal_a.columns else None + + revenue = _inc("Total Revenue") + gross_profit = _inc("Gross Profit") + operating_income = _inc("Operating Income") + net_income = _inc("Net Income") + ebitda = _inc("EBITDA") or _inc("Normalized EBITDA") + equity = _bal("Stockholders Equity") or _bal("Common Stock Equity") + total_assets = _bal("Total Assets") + total_debt = _bal("Total Debt") or _bal("Long Term Debt And Capital Lease Obligation") + current_assets = _bal("Current Assets") + current_liabilities = _bal("Current Liabilities") + cash = _bal("Cash And Cash Equivalents") or _bal("Cash Cash Equivalents And Short Term Investments") or 0.0 + period_shares = _historical_shares_for_date(col_dt) + + rev = revenue if revenue and revenue > 0 else None + result["gross_margin"].append(_cap_ratio(gross_profit / rev, -5, 5) if rev and gross_profit is not None else None) + result["operating_margin"].append(_cap_ratio(operating_income / rev, -5, 5) if rev and operating_income is not None else None) + result["net_margin"].append(_cap_ratio(net_income / rev, -5, 5) if rev and net_income is not None else None) + result["ebitda_margin"].append(_cap_ratio(ebitda / rev, -5, 5) if rev and ebitda is not None else None) + result["roe"].append(_cap_ratio(net_income / equity, -10, 10) if equity and equity > 0 and net_income is not None else None) + result["roa"].append(_cap_ratio(net_income / total_assets, -10, 10) if total_assets and total_assets > 0 and net_income is not None else None) + result["debt_to_equity"].append(_cap_ratio(total_debt / equity, -1, 100) if equity and equity > 0 and total_debt is not None else None) + result["current_ratio"].append(current_assets / current_liabilities if current_liabilities and current_liabilities > 0 and current_assets is not None else None) + + price = _find_price_at_date(price_history, col_dt) + market_cap = price * period_shares if price and period_shares else None + ev = market_cap + (total_debt or 0.0) - cash if market_cap else None + + result["trailing_pe"].append(_cap_ratio(market_cap / net_income, 0, 500) if market_cap and net_income and net_income > 0 else None) + result["ev_to_ebitda"].append(_cap_ratio(ev / ebitda, 0, 500) if ev and ebitda and ebitda > 1e6 else None) + result["price_to_book"].append(_cap_ratio(market_cap / equity, 0, 100) if market_cap and equity and equity > 0 else None) + result["price_to_sales"].append(_cap_ratio(market_cap / revenue, 0, 100) if market_cap and revenue and revenue > 0 else None) + + return {k: list(reversed(v)) for k, v in result.items()} + + +@cached(RATIOS_ENDPOINT_CACHE) +def get_ratios(symbol: str) -> dict: + """Build the full RatiosResponse dict for the /ratios endpoint.""" + sym = normalize_symbol(symbol) + ttm = compute_ttm_ratios(sym) + hist = compute_historical_ratios(sym) + info = get_company_info(sym) + sector_bench = compute_sector_ratio_benchmarks(sym) + + income = get_income_statement(sym, quarterly=True) + balance = get_balance_sheet(sym, quarterly=True) + cf = get_cash_flow(sym, quarterly=True) + + ebitda = _statement_ttm(income, "EBITDA", "Normalized EBITDA") + revenue = _statement_ttm(income, "Total Revenue") + current_assets = _balance_value(balance, "Current Assets") + current_liabilities = _balance_value(balance, "Current Liabilities") + inventory = _balance_value(balance, "Inventory") + ebit = _statement_ttm(income, "EBIT") + interest_expense = _statement_ttm(income, "Interest Expense") + op_cf = _statement_ttm(cf, "Operating Cash Flow", "Cash From Operations") + capex_raw = _statement_ttm(cf, "Capital Expenditure") + capex = abs(capex_raw) if capex_raw is not None else None + fcf = (op_cf - capex) if op_cf is not None and capex is not None else None + market_cap = ttm.get("market_cap") + + quick_ratio: float | None = None + if current_liabilities and current_liabilities > 0 and current_assets is not None: + quick_ratio = (current_assets - (inventory or 0.0)) / current_liabilities + + interest_coverage: float | None = None + if interest_expense and ebit is not None: + ie = abs(interest_expense) + if ie > 0 and ebit > 0: + interest_coverage = _cap_ratio(ebit / ie, 0, 1000) + + ebitda_margin = _cap_ratio(ebitda / revenue, -5, 5) if revenue and revenue > 0 and ebitda is not None else None + fcf_margin = _cap_ratio(fcf / revenue, -5, 5) if revenue and revenue > 0 and fcf is not None else None + p_fcf = _cap_ratio(market_cap / fcf, 0, 1000) if market_cap and fcf and fcf > 0 else None + + fwd_pe = _safe_float(info.get("forwardPE")) if info else None + forward_pe = fwd_pe if fwd_pe and 0 < fwd_pe < 500 else None + + def point( + ttm_key: str | None, + hist_key: str | None, + override: float | None = None, + sector_key: str | None = None, + ) -> dict: + val = override if override is not None else (ttm.get(ttm_key) if ttm_key else None) + spark = hist.get(hist_key, []) if hist_key else [] + skey = sector_key if sector_key is not None else ttm_key + vs_sector = sector_bench.get(skey) if skey else None + return {"value": val, "spark": spark, "vs_sector": vs_sector} + + return { + "pe_ttm": point("trailing_pe", "trailing_pe"), + "ev_ebitda": point("ev_to_ebitda", "ev_to_ebitda"), + "gross_margin": point("gross_margin_ttm", "gross_margin"), + "net_margin": point("net_margin_ttm", "net_margin"), + "price_to_book": point("price_to_book", "price_to_book"), + "price_to_sales": point("price_to_sales", "price_to_sales"), + "ev_to_sales": point("ev_to_sales", None), + "p_fcf": point(None, None, p_fcf), + "forward_pe": point(None, None, forward_pe, "trailing_pe"), + "operating_margin": point("operating_margin_ttm", "operating_margin"), + "ebitda_margin": point(None, "ebitda_margin", ebitda_margin, "operating_margin_ttm"), + "fcf_margin": point(None, None, fcf_margin), + "roe": point("roe_ttm", "roe"), + "roa": point("roa_ttm", "roa"), + "roic": point("roic_ttm", None), + "debt_to_equity": point("debt_to_equity", "debt_to_equity"), + "current_ratio": point("current_ratio", "current_ratio"), + "quick_ratio": point(None, None, quick_ratio, "current_ratio"), + "interest_coverage": point(None, None, interest_coverage), + "dividend_yield": point("dividend_yield_ttm", None), + "dividend_payout": point("dividend_payout_ratio_ttm", None), + } + + +@cached(SECTOR_BENCHMARK_CACHE) +def compute_sector_ratio_benchmarks(symbol: str) -> dict[str, float]: + """Median TTM ratio benchmarks from same-sector peers (FMP-backed when available).""" + sym = normalize_symbol(symbol) + fmp_key = os.getenv("FMP_API_KEY") + + info = get_company_info(sym) + sector_raw = info.get("sector") if isinstance(info, dict) else None + sector = str(sector_raw or "").strip() + if not sector: + enrichment = get_profile_enrichment(sym) + sector = str((enrichment or {}).get("sector") or "").strip() + if not sector: + return {} + + peer_symbols: list[str] = [] + if fmp_key: + try: + with httpx.Client(timeout=3.5) as client: + res = client.get( + "https://financialmodelingprep.com/api/v3/stock-screener", + params={ + "sector": sector, + "isEtf": "false", + "isActivelyTrading": "true", + "limit": 12, + "apikey": fmp_key, + }, + ) + rows = res.json() + if isinstance(rows, list): + for row in rows: + psym = normalize_symbol((row or {}).get("symbol")) + if not psym or psym == sym: + continue + peer_symbols.append(psym) + except Exception: + peer_symbols = [] + + # No-key or FMP failure fallback: search by sector term, then filter by exact sector. + if not peer_symbols: + try: + candidates = search_tickers(sector) + except Exception: + candidates = [] + target_sector = sector.lower() + for row in candidates[:24]: + psym = normalize_symbol((row or {}).get("symbol")) + if not psym or psym == sym: + continue + pinfo = get_company_info(psym) + psector = str((pinfo or {}).get("sector") or "").strip().lower() + if psector and psector == target_sector: + peer_symbols.append(psym) + + if not peer_symbols: + return {} + + keys = [ + "trailing_pe", + "ev_to_ebitda", + "gross_margin_ttm", + "net_margin_ttm", + "price_to_book", + "price_to_sales", + "ev_to_sales", + "operating_margin_ttm", + "roe_ttm", + "roa_ttm", + "roic_ttm", + "debt_to_equity", + "current_ratio", + "dividend_yield_ttm", + "dividend_payout_ratio_ttm", + ] + buckets: dict[str, list[float]] = {k: [] for k in keys} + + for psym in peer_symbols[:6]: + try: + ratios = compute_ttm_ratios(psym) + except Exception: + continue + if not isinstance(ratios, dict): + continue + for key in keys: + val = _safe_float(ratios.get(key)) + if val is not None: + buckets[key].append(val) + + out: dict[str, float] = {} + for key, values in buckets.items(): + if values: + out[key] = float(statistics.median(values)) + return out + + def _pick_search_match(symbol: str) -> dict[str, Any]: sym = normalize_symbol(symbol) results = search_tickers(sym) |
