summaryrefslogtreecommitdiff
path: root/backend/app/services/data_service.py
diff options
context:
space:
mode:
authorTyler Hoang <tyler@tylerhoang.xyz>2026-05-18 22:45:59 -0700
committerTyler Hoang <tyler@tylerhoang.xyz>2026-05-18 22:45:59 -0700
commit66cfb26ebd8fa44b24e37b4ffc796ab29dcbd704 (patch)
tree4d98b268502c6aa7c8988957d6e41dffd319534d /backend/app/services/data_service.py
parent7fc2f0177518d70114aa75b7874a0ef59bdaec61 (diff)
parent52635efd7d435b091b4f13897511ca8e2c48f0b9 (diff)
Merge branch 'feat/key-ratios-tab'
Diffstat (limited to 'backend/app/services/data_service.py')
-rw-r--r--backend/app/services/data_service.py307
1 files changed, 307 insertions, 0 deletions
diff --git a/backend/app/services/data_service.py b/backend/app/services/data_service.py
index 9fe7f67..f913ec5 100644
--- a/backend/app/services/data_service.py
+++ b/backend/app/services/data_service.py
@@ -3,6 +3,7 @@ from __future__ import annotations
import math
import os
+import statistics
from typing import Any
import httpx
@@ -28,6 +29,9 @@ BETA_CACHE = TTLCache(maxsize=256, ttl=3600)
SHORT_CACHE = TTLCache(maxsize=256, ttl=3600)
FINANCIALS_CACHE = TTLCache(maxsize=128, ttl=3600)
VALUATION_CACHE = TTLCache(maxsize=128, ttl=3600)
# Caches introduced with the key-ratios tab. Each one backs a single @cached
# function in this module: compute_historical_ratios, get_ratios and
# compute_sector_ratio_benchmarks respectively.
# NOTE(review): ttl=3600 is presumably seconds (one hour) — confirm against the
# TTLCache implementation imported by this module.
+HIST_RATIOS_CACHE: TTLCache = TTLCache(maxsize=128, ttl=3600)
+RATIOS_ENDPOINT_CACHE: TTLCache = TTLCache(maxsize=128, ttl=3600)
+SECTOR_BENCHMARK_CACHE: TTLCache = TTLCache(maxsize=128, ttl=3600)
# Period codes, and the string each code is translated to.
# NOTE(review): the values look like yfinance period strings — confirm at the call site.
PERIODS = {"1m", "3m", "6m", "1y", "2y", "5y"}
YF_PERIOD_MAP = {"1m": "1mo", "3m": "3mo", "6m": "6mo", "1y": "1y", "2y": "2y", "5y": "5y"}
@@ -614,6 +618,309 @@ def _latest_share_count(balance_sheet: pd.DataFrame) -> float | None:
return shares if shares is not None and shares > 0 else None
def _find_price_at_date(price_history: list[dict], target: "pd.Timestamp") -> float | None:
    """Return the usable close from ``price_history`` nearest to ``target``.

    Args:
        price_history: Price points; each is read as a mapping with a ``"date"``
            entry that ``pd.Timestamp`` can parse and a ``"close"`` entry.
        target: The date to look up.

    Returns:
        The close of the nearest point that has a usable close, or ``None``
        when the history is empty or that point is more than 45 days away
        from ``target``.
    """
    if not price_history:
        return None
    best_price: float | None = None
    best_delta = float("inf")
    for pt in price_history:
        try:
            close = _safe_float(pt.get("close"))
            if close is None:
                # A point without a usable close must not shadow a valid
                # neighbour that is only slightly further from the target.
                continue
            delta = abs((pd.Timestamp(pt["date"]) - target).days)
        except Exception:
            # Deliberately best-effort: a malformed point (missing/unparseable
            # date, incompatible timezone, non-mapping entry) is skipped.
            continue
        if delta < best_delta:
            best_delta = delta
            best_price = close
    return best_price if best_delta <= 45 else None
+
+
@cached(HIST_RATIOS_CACHE)
def compute_historical_ratios(symbol: str) -> dict[str, list[float | None]]:
    """Compute per-fiscal-year ratios from the annual statements.

    The first four columns of the annual income statement are processed and
    every series is reversed before returning, so the output is oldest-first
    provided the statement columns are newest-first (assumed — TODO confirm
    against ``get_income_statement``).

    Valuation ratios use the closing price nearest to each fiscal period date
    and a share count resolved in this order: the balance sheet for that
    period, the nearest entry (within 180 days) of the full share-count
    history, and finally the current shares outstanding.

    Args:
        symbol: Ticker symbol; normalized before use.

    Returns:
        A mapping of ratio name to a list with one entry per processed fiscal
        period (``None`` where a ratio cannot be computed), or ``{}`` when no
        annual income statement is available.
    """
    sym = normalize_symbol(symbol)
    inc_a = get_income_statement(sym, quarterly=False)
    bal_a = get_balance_sheet(sym, quarterly=False)

    if inc_a is None or inc_a.empty:
        return {}

    has_balance = bal_a is not None and not bal_a.empty
    years = list(inc_a.columns[:4])
    price_history = get_price_history(sym, period="5y")
    current_shares = get_shares_outstanding(sym)

    try:
        shares_history_raw = yf.Ticker(sym).get_shares_full(start="2000-01-01")
        if isinstance(shares_history_raw, pd.Series):
            cleaned = pd.to_numeric(shares_history_raw, errors="coerce").dropna()
            # Repeated timestamps would make ``.loc[label]`` return a Series
            # instead of a scalar; keep the last report for each timestamp.
            cleaned = cleaned[~cleaned.index.duplicated(keep="last")]
            shares_history = cleaned.sort_index()
        else:
            shares_history = pd.Series(dtype=float)
    except Exception:
        # Share history is optional; fall back to the other share sources.
        shares_history = pd.Series(dtype=float)

    result: dict[str, list[float | None]] = {k: [] for k in [
        "gross_margin", "operating_margin", "net_margin", "ebitda_margin",
        "roe", "roa", "debt_to_equity", "current_ratio",
        "trailing_pe", "ev_to_ebitda", "price_to_book", "price_to_sales",
    ]}

    def _inc(label: str, col: Any) -> float | None:
        """Read one income-statement cell, or None when it is absent."""
        if label not in inc_a.index:
            return None
        return _safe_float(inc_a.loc[label, col]) if col in inc_a.columns else None

    def _bal(label: str, col: Any) -> float | None:
        """Read one balance-sheet cell, or None when it is absent."""
        if not has_balance or label not in bal_a.index:
            return None
        return _safe_float(bal_a.loc[label, col]) if col in bal_a.columns else None

    def _balance_shares(period_date: pd.Timestamp) -> float | None:
        """Return the first positive share count reported for the period."""
        if not has_balance or period_date not in bal_a.columns:
            return None
        for label in _SHARE_LABELS:
            if label not in bal_a.index:
                continue
            shares_value = _safe_float(bal_a.loc[label, period_date])
            if shares_value is not None and shares_value > 0:
                return shares_value
        return None

    def _historical_shares_for_date(period_date: pd.Timestamp) -> float | None:
        """Resolve the share count to use for a fiscal period."""
        direct_balance_shares = _balance_shares(period_date)
        if direct_balance_shares is not None:
            return direct_balance_shares
        if not shares_history.empty:
            target = pd.Timestamp(period_date)
            index = shares_history.index
            # Align timezone awareness so the subtraction below is valid.
            if getattr(index, "tz", None) is not None and target.tzinfo is None:
                target = target.tz_localize(index.tz)
            elif getattr(index, "tz", None) is None and target.tzinfo is not None:
                target = target.tz_localize(None)

            deltas = pd.Series(index - target, index=index).abs()
            if not deltas.empty:
                nearest_idx = deltas.idxmin()
                if abs(pd.Timestamp(nearest_idx) - target) <= pd.Timedelta(days=180):
                    shares_value = _safe_float(shares_history.loc[nearest_idx])
                    if shares_value is not None and shares_value > 0:
                        return shares_value
        return current_shares

    for col in years:
        col_dt = pd.Timestamp(col)

        revenue = _inc("Total Revenue", col)
        gross_profit = _inc("Gross Profit", col)
        operating_income = _inc("Operating Income", col)
        net_income = _inc("Net Income", col)
        # ``or`` fallbacks also replace a reported 0.0 with the alternative label.
        ebitda = _inc("EBITDA", col) or _inc("Normalized EBITDA", col)
        equity = _bal("Stockholders Equity", col) or _bal("Common Stock Equity", col)
        total_assets = _bal("Total Assets", col)
        total_debt = _bal("Total Debt", col) or _bal("Long Term Debt And Capital Lease Obligation", col)
        current_assets = _bal("Current Assets", col)
        current_liabilities = _bal("Current Liabilities", col)
        cash = (
            _bal("Cash And Cash Equivalents", col)
            or _bal("Cash Cash Equivalents And Short Term Investments", col)
            or 0.0
        )
        period_shares = _historical_shares_for_date(col_dt)

        # Margins are only meaningful against positive revenue.
        rev = revenue if revenue and revenue > 0 else None
        result["gross_margin"].append(
            _cap_ratio(gross_profit / rev, -5, 5) if rev and gross_profit is not None else None
        )
        result["operating_margin"].append(
            _cap_ratio(operating_income / rev, -5, 5) if rev and operating_income is not None else None
        )
        result["net_margin"].append(
            _cap_ratio(net_income / rev, -5, 5) if rev and net_income is not None else None
        )
        result["ebitda_margin"].append(
            _cap_ratio(ebitda / rev, -5, 5) if rev and ebitda is not None else None
        )
        result["roe"].append(
            _cap_ratio(net_income / equity, -10, 10)
            if equity and equity > 0 and net_income is not None
            else None
        )
        result["roa"].append(
            _cap_ratio(net_income / total_assets, -10, 10)
            if total_assets and total_assets > 0 and net_income is not None
            else None
        )
        result["debt_to_equity"].append(
            _cap_ratio(total_debt / equity, -1, 100)
            if equity and equity > 0 and total_debt is not None
            else None
        )
        result["current_ratio"].append(
            current_assets / current_liabilities
            if current_liabilities and current_liabilities > 0 and current_assets is not None
            else None
        )

        price = _find_price_at_date(price_history, col_dt)
        market_cap = price * period_shares if price and period_shares else None
        ev = market_cap + (total_debt or 0.0) - cash if market_cap else None

        result["trailing_pe"].append(
            _cap_ratio(market_cap / net_income, 0, 500)
            if market_cap and net_income and net_income > 0
            else None
        )
        # EBITDA must exceed 1e6 so a near-zero denominator cannot blow up the multiple.
        result["ev_to_ebitda"].append(
            _cap_ratio(ev / ebitda, 0, 500) if ev and ebitda and ebitda > 1e6 else None
        )
        result["price_to_book"].append(
            _cap_ratio(market_cap / equity, 0, 100) if market_cap and equity and equity > 0 else None
        )
        result["price_to_sales"].append(
            _cap_ratio(market_cap / revenue, 0, 100) if market_cap and revenue and revenue > 0 else None
        )

    return {k: list(reversed(v)) for k, v in result.items()}
+
+
@cached(RATIOS_ENDPOINT_CACHE)
def get_ratios(symbol: str) -> dict:
    """Assemble the RatiosResponse payload served by the /ratios endpoint.

    Every entry of the returned mapping has the shape
    ``{"value": ..., "spark": [...], "vs_sector": ...}``: the current value,
    the historical series for the sparkline, and the sector benchmark.
    """
    sym = normalize_symbol(symbol)
    ttm_ratios = compute_ttm_ratios(sym)
    history = compute_historical_ratios(sym)
    profile = get_company_info(sym)
    benchmarks = compute_sector_ratio_benchmarks(sym)

    income_q = get_income_statement(sym, quarterly=True)
    balance_q = get_balance_sheet(sym, quarterly=True)
    cash_flow_q = get_cash_flow(sym, quarterly=True)

    ebitda = _statement_ttm(income_q, "EBITDA", "Normalized EBITDA")
    revenue = _statement_ttm(income_q, "Total Revenue")
    current_assets = _balance_value(balance_q, "Current Assets")
    current_liabilities = _balance_value(balance_q, "Current Liabilities")
    inventory = _balance_value(balance_q, "Inventory")
    ebit = _statement_ttm(income_q, "EBIT")
    interest_expense = _statement_ttm(income_q, "Interest Expense")
    operating_cash_flow = _statement_ttm(cash_flow_q, "Operating Cash Flow", "Cash From Operations")
    capex_raw = _statement_ttm(cash_flow_q, "Capital Expenditure")
    market_cap = ttm_ratios.get("market_cap")

    # Free cash flow: operating cash flow less the magnitude of capex.
    free_cash_flow = None
    if operating_cash_flow is not None and capex_raw is not None:
        free_cash_flow = operating_cash_flow - abs(capex_raw)

    quick_ratio: float | None = (
        (current_assets - (inventory or 0.0)) / current_liabilities
        if current_liabilities and current_liabilities > 0 and current_assets is not None
        else None
    )

    interest_coverage: float | None = None
    if interest_expense and ebit is not None and abs(interest_expense) > 0 and ebit > 0:
        interest_coverage = _cap_ratio(ebit / abs(interest_expense), 0, 1000)

    ebitda_margin = None
    fcf_margin = None
    if revenue and revenue > 0:
        if ebitda is not None:
            ebitda_margin = _cap_ratio(ebitda / revenue, -5, 5)
        if free_cash_flow is not None:
            fcf_margin = _cap_ratio(free_cash_flow / revenue, -5, 5)

    p_fcf = None
    if market_cap and free_cash_flow and free_cash_flow > 0:
        p_fcf = _cap_ratio(market_cap / free_cash_flow, 0, 1000)

    # Only keep a forward P/E that lies strictly between 0 and 500.
    forward_pe = None
    if profile:
        fwd_candidate = _safe_float(profile.get("forwardPE"))
        if fwd_candidate and 0 < fwd_candidate < 500:
            forward_pe = fwd_candidate

    def _entry(
        ttm_key: str | None,
        hist_key: str | None,
        override: float | None = None,
        sector_key: str | None = None,
    ) -> dict:
        """Build one response entry; ``override`` beats the TTM lookup."""
        if override is not None:
            value = override
        elif ttm_key:
            value = ttm_ratios.get(ttm_key)
        else:
            value = None
        series = history.get(hist_key, []) if hist_key else []
        bench_key = ttm_key if sector_key is None else sector_key
        benchmark = benchmarks.get(bench_key) if bench_key else None
        return {"value": value, "spark": series, "vs_sector": benchmark}

    # (response key, arguments for _entry), in the order the response lists them.
    layout = (
        ("pe_ttm", ("trailing_pe", "trailing_pe")),
        ("ev_ebitda", ("ev_to_ebitda", "ev_to_ebitda")),
        ("gross_margin", ("gross_margin_ttm", "gross_margin")),
        ("net_margin", ("net_margin_ttm", "net_margin")),
        ("price_to_book", ("price_to_book", "price_to_book")),
        ("price_to_sales", ("price_to_sales", "price_to_sales")),
        ("ev_to_sales", ("ev_to_sales", None)),
        ("p_fcf", (None, None, p_fcf)),
        ("forward_pe", (None, None, forward_pe, "trailing_pe")),
        ("operating_margin", ("operating_margin_ttm", "operating_margin")),
        ("ebitda_margin", (None, "ebitda_margin", ebitda_margin, "operating_margin_ttm")),
        ("fcf_margin", (None, None, fcf_margin)),
        ("roe", ("roe_ttm", "roe")),
        ("roa", ("roa_ttm", "roa")),
        ("roic", ("roic_ttm", None)),
        ("debt_to_equity", ("debt_to_equity", "debt_to_equity")),
        ("current_ratio", ("current_ratio", "current_ratio")),
        ("quick_ratio", (None, None, quick_ratio, "current_ratio")),
        ("interest_coverage", (None, None, interest_coverage)),
        ("dividend_yield", ("dividend_yield_ttm", None)),
        ("dividend_payout", ("dividend_payout_ratio_ttm", None)),
    )
    return {name: _entry(*args) for name, args in layout}
+
+
@cached(SECTOR_BENCHMARK_CACHE)
def compute_sector_ratio_benchmarks(symbol: str) -> dict[str, float]:
    """Return median TTM ratios across same-sector peers of ``symbol``.

    Peers come from the FMP stock screener when ``FMP_API_KEY`` is set.
    Otherwise, or when that request fails or yields nothing, they come from a
    ticker search on the sector name filtered to an exact sector match. At
    most six distinct peers contribute to the medians.

    Args:
        symbol: Ticker symbol; normalized before use.

    Returns:
        A mapping of TTM ratio key to the peer median. Keys with no usable
        peer value are omitted; ``{}`` is returned when the sector is unknown
        or no peers are found.
    """
    max_peers = 6  # only this many peers feed the medians
    sym = normalize_symbol(symbol)
    fmp_key = os.getenv("FMP_API_KEY")

    info = get_company_info(sym)
    sector_raw = info.get("sector") if isinstance(info, dict) else None
    sector = str(sector_raw or "").strip()
    if not sector:
        enrichment = get_profile_enrichment(sym)
        sector = str((enrichment or {}).get("sector") or "").strip()
    if not sector:
        return {}

    peer_symbols: list[str] = []
    if fmp_key:
        try:
            with httpx.Client(timeout=3.5) as client:
                res = client.get(
                    "https://financialmodelingprep.com/api/v3/stock-screener",
                    params={
                        "sector": sector,
                        "isEtf": "false",
                        "isActivelyTrading": "true",
                        "limit": 12,
                        "apikey": fmp_key,
                    },
                )
                rows = res.json()
            if isinstance(rows, list):
                for row in rows:
                    psym = normalize_symbol((row or {}).get("symbol"))
                    # Skip the subject itself and repeats so that no peer is
                    # counted twice in the median.
                    if not psym or psym == sym or psym in peer_symbols:
                        continue
                    peer_symbols.append(psym)
        except Exception:
            # Any screener failure falls through to the search-based fallback.
            peer_symbols = []

    # No-key or FMP failure fallback: search by sector term, then filter by exact sector.
    if not peer_symbols:
        try:
            candidates = search_tickers(sector) or []
        except Exception:
            candidates = []
        target_sector = sector.lower()
        for row in candidates[:24]:
            if len(peer_symbols) >= max_peers:
                # Further lookups could not change the result; stop early to
                # avoid needless per-candidate profile fetches.
                break
            try:
                psym = normalize_symbol((row or {}).get("symbol"))
                if not psym or psym == sym or psym in peer_symbols:
                    continue
                pinfo = get_company_info(psym)
            except Exception:
                # One bad candidate must not abort the whole benchmark.
                continue
            psector_raw = pinfo.get("sector") if isinstance(pinfo, dict) else None
            psector = str(psector_raw or "").strip().lower()
            if psector and psector == target_sector:
                peer_symbols.append(psym)

    if not peer_symbols:
        return {}

    keys = [
        "trailing_pe",
        "ev_to_ebitda",
        "gross_margin_ttm",
        "net_margin_ttm",
        "price_to_book",
        "price_to_sales",
        "ev_to_sales",
        "operating_margin_ttm",
        "roe_ttm",
        "roa_ttm",
        "roic_ttm",
        "debt_to_equity",
        "current_ratio",
        "dividend_yield_ttm",
        "dividend_payout_ratio_ttm",
    ]
    buckets: dict[str, list[float]] = {k: [] for k in keys}

    for psym in peer_symbols[:max_peers]:
        try:
            ratios = compute_ttm_ratios(psym)
        except Exception:
            continue
        if not isinstance(ratios, dict):
            continue
        for key in keys:
            val = _safe_float(ratios.get(key))
            if val is not None:
                buckets[key].append(val)

    return {
        key: float(statistics.median(values))
        for key, values in buckets.items()
        if values
    }
+
+
def _pick_search_match(symbol: str) -> dict[str, Any]:
sym = normalize_symbol(symbol)
results = search_tickers(sym)