diff options
| author | Tyler <tyler@tylerhoang.xyz> | 2026-05-16 00:02:32 -0700 |
|---|---|---|
| committer | Tyler <tyler@tylerhoang.xyz> | 2026-05-16 00:02:32 -0700 |
| commit | 0d888203cbc4dc596d0c05cedfeabe8785b263fc (patch) | |
| tree | 7aa04a8b6b669fc8258e7e95905c07656c6f93f9 /services | |
| parent | 870f8e6c8b88d61d0f7183b938b9a496c193b141 (diff) | |
Fix valuation and data robustness bugs
Diffstat (limited to 'services')
| -rw-r--r-- | services/data_service.py | 217 |
1 files changed, 167 insertions, 50 deletions
diff --git a/services/data_service.py b/services/data_service.py index bfd1290..9c82e14 100644 --- a/services/data_service.py +++ b/services/data_service.py @@ -26,9 +26,12 @@ def search_tickers(query: str) -> list[dict]: @st.cache_data(ttl=300) def get_company_info(ticker: str) -> dict: """Return company info dict from yfinance.""" - t = yf.Ticker(ticker.upper()) - info = t.info or {} - return info + try: + t = yf.Ticker(ticker.upper()) + info = t.info or {} + return info if isinstance(info, dict) else {} + except Exception: + return {} @st.cache_data(ttl=300) @@ -57,7 +60,7 @@ def get_shares_outstanding(ticker: str) -> float | None: try: t = yf.Ticker(ticker.upper()) info = t.info or {} - for key in ("sharesOutstanding", "impliedSharesOutstanding", "floatShares"): + for key in ("sharesOutstanding", "impliedSharesOutstanding"): val = info.get(key) if val is not None: return float(val) @@ -88,31 +91,45 @@ def get_market_cap_computed(ticker: str) -> float | None: @st.cache_data(ttl=300) def get_price_history(ticker: str, period: str = "1y") -> pd.DataFrame: """Return OHLCV price history.""" - t = yf.Ticker(ticker.upper()) - df = t.history(period=period) - df.index = pd.to_datetime(df.index) - return df + try: + t = yf.Ticker(ticker.upper()) + df = t.history(period=period) + if df is None or df.empty: + return pd.DataFrame() + df.index = pd.to_datetime(df.index) + return df + except Exception: + return pd.DataFrame() @st.cache_data(ttl=3600) def get_income_statement(ticker: str, quarterly: bool = False) -> pd.DataFrame: - t = yf.Ticker(ticker.upper()) - df = t.quarterly_income_stmt if quarterly else t.income_stmt - return df if df is not None else pd.DataFrame() + try: + t = yf.Ticker(ticker.upper()) + df = t.quarterly_income_stmt if quarterly else t.income_stmt + return df if df is not None else pd.DataFrame() + except Exception: + return pd.DataFrame() @st.cache_data(ttl=3600) def get_balance_sheet(ticker: str, quarterly: bool = False) -> pd.DataFrame: - t = yf.Ticker(ticker.upper()) - df = t.quarterly_balance_sheet if quarterly else t.balance_sheet - return df if df is not None else pd.DataFrame() + try: + t = yf.Ticker(ticker.upper()) + df = t.quarterly_balance_sheet if quarterly else t.balance_sheet + return df if df is not None else pd.DataFrame() + except Exception: + return pd.DataFrame() @st.cache_data(ttl=3600) def get_cash_flow(ticker: str, quarterly: bool = False) -> pd.DataFrame: - t = yf.Ticker(ticker.upper()) - df = t.quarterly_cashflow if quarterly else t.cashflow - return df if df is not None else pd.DataFrame() + try: + t = yf.Ticker(ticker.upper()) + df = t.quarterly_cashflow if quarterly else t.cashflow + return df if df is not None else pd.DataFrame() + except Exception: + return pd.DataFrame() @st.cache_data(ttl=300) @@ -188,12 +205,51 @@ def get_next_earnings_date(ticker: str) -> str | None: """Return the next expected earnings date as a string, or None. Uses t.calendar (no lxml dependency). """ + def _collect_dates(value) -> list: + if value is None: + return [] + if isinstance(value, dict): + for key in ("Earnings Date", "earningsDate", "earnings_date"): + if key in value: + return _collect_dates(value.get(key)) + out = [] + for nested in value.values(): + out.extend(_collect_dates(nested)) + return out + if isinstance(value, pd.DataFrame): + out = [] + for col in value.columns: + out.extend(_collect_dates(value[col])) + return out + if isinstance(value, pd.Series): + if "Earnings Date" in value.index: + return _collect_dates(value.get("Earnings Date")) + return _collect_dates(value.tolist()) + if isinstance(value, pd.Index): + return _collect_dates(value.tolist()) + if isinstance(value, (list, tuple, set)): + out = [] + for item in value: + out.extend(_collect_dates(item)) + return out + return [value] + try: t = yf.Ticker(ticker.upper()) cal = t.calendar - dates = cal.get("Earnings Date", []) - if dates: - return str(dates[0]) + raw_dates = _collect_dates(cal) + parsed_dates = [] + for value in raw_dates: + dt = pd.to_datetime(value, errors="coerce") + if pd.notna(dt): + ts = pd.Timestamp(dt) + parsed_dates.append(ts.tz_localize(None) if ts.tzinfo else ts) + + if parsed_dates: + today = pd.Timestamp.today().normalize() + future_dates = sorted({dt.normalize() for dt in parsed_dates if dt.normalize() >= today}) + chosen = future_dates[0] if future_dates else sorted({dt.normalize() for dt in parsed_dates})[0] + return chosen.strftime("%b %d, %Y") return None except Exception: return None @@ -481,14 +537,67 @@ def get_historical_ratios_yfinance(ticker: str) -> list[dict]: t = yf.Ticker(ticker.upper()) income = t.income_stmt # rows=metrics, cols=fiscal-year dates balance = t.balance_sheet - info = t.info or {} if income is None or income.empty: return [] + try: + shares_history = t.get_shares_full(start="2000-01-01") + if isinstance(shares_history, pd.Series): + shares_history = shares_history.dropna().sort_index() + else: + shares_history = pd.Series(dtype=float) + except Exception: + shares_history = pd.Series(dtype=float) + + def _balance_shares(period_date) -> float | None: + if balance is None or balance.empty or period_date not in balance.columns: + return None + for label in ( + "Ordinary Shares Number", + "Share Issued", + "Common Stock Shares Outstanding", + ): + if label in balance.index: + value = balance.loc[label, period_date] + if pd.notna(value): + try: + shares_value = float(value) + except (TypeError, ValueError): + continue + if shares_value > 0: + return shares_value + return None + + def _historical_shares_for_date(period_date) -> float | None: + direct_balance_shares = _balance_shares(period_date) + if direct_balance_shares: + return direct_balance_shares + if shares_history.empty: + return None + + target = pd.Timestamp(period_date) + index = shares_history.index + if getattr(index, "tz", None) is not None and target.tzinfo is None: + target = target.tz_localize(index.tz) + elif getattr(index, "tz", None) is None and target.tzinfo is not None: + target = target.tz_localize(None) + + deltas = pd.Series(index - target, index=index).abs() + if deltas.empty: + return None + nearest_idx = deltas.idxmin() + if abs(pd.Timestamp(nearest_idx) - target) > pd.Timedelta(days=180): + return None + + try: + shares_value = float(shares_history.loc[nearest_idx]) + except (TypeError, ValueError): + return None + return shares_value if shares_value > 0 else None + # One year of monthly price history per fiscal year going back 10 years hist = t.history(period="10y", interval="1mo") - shares = get_shares_outstanding(ticker) rows: list[dict] = [] for date in income.columns: @@ -549,8 +658,10 @@ def get_historical_ratios_yfinance(ticker: str) -> list[dict]: if abs(roa) < 10: row["returnOnAssets"] = roa - # Price-based ratios — average closing price in ±45-day window around year-end - if shares and not hist.empty: + period_shares = _historical_shares_for_date(date) + + # Price-based ratios use period-appropriate shares when available. + if period_shares and not hist.empty: try: date_ts = pd.Timestamp(date) # Normalize timezones: yfinance history index may be tz-aware @@ -564,7 +675,7 @@ def get_historical_ratios_yfinance(ticker: str) -> list[dict]: window = hist.loc[mask, "Close"] if not window.empty: price = float(window.mean()) - market_cap = price * shares + market_cap = price * period_shares if net_income and net_income > 0: row["peRatio"] = market_cap / net_income @@ -671,40 +782,46 @@ def get_balance_sheet_bridge_items(ticker: str) -> dict: @st.cache_data(ttl=3600) def get_free_cash_flow_series(ticker: str) -> pd.Series: """Return annual Free Cash Flow series (most recent first).""" - t = yf.Ticker(ticker.upper()) - cf = t.cashflow - if cf is None or cf.empty: - return pd.Series(dtype=float) - if "Free Cash Flow" in cf.index: - return cf.loc["Free Cash Flow"].dropna() - # Compute from operating CF - capex try: - op = cf.loc["Operating Cash Flow"] - capex = cf.loc["Capital Expenditure"] - return (op + capex).dropna() - except KeyError: + t = yf.Ticker(ticker.upper()) + cf = t.cashflow + if cf is None or cf.empty: + return pd.Series(dtype=float) + if "Free Cash Flow" in cf.index: + return cf.loc["Free Cash Flow"].dropna() + # Compute from operating CF - capex + try: + op = cf.loc["Operating Cash Flow"] + capex = cf.loc["Capital Expenditure"] + return (op + capex).dropna() + except KeyError: + return pd.Series(dtype=float) + except Exception: return pd.Series(dtype=float) @st.cache_data(ttl=3600) def get_free_cash_flow_ttm(ticker: str) -> float | None: """Return trailing-twelve-month free cash flow from quarterly cash flow statements.""" - t = yf.Ticker(ticker.upper()) - cf_q = t.quarterly_cashflow - if cf_q is None or cf_q.empty: - return None + try: + t = yf.Ticker(ticker.upper()) + cf_q = t.quarterly_cashflow + if cf_q is None or cf_q.empty: + return None - if "Free Cash Flow" in cf_q.index: - vals = cf_q.loc["Free Cash Flow"].iloc[:4].dropna() - if len(vals) == 4: - return float(vals.sum()) + if "Free Cash Flow" in cf_q.index: + vals = cf_q.loc["Free Cash Flow"].iloc[:4].dropna() + if len(vals) == 4: + return float(vals.sum()) - try: - op = cf_q.loc["Operating Cash Flow"].iloc[:4].dropna() - capex = cf_q.loc["Capital Expenditure"].iloc[:4].dropna() - if len(op) == 4 and len(capex) == 4: - return float((op + capex).sum()) - except KeyError: + try: + op = cf_q.loc["Operating Cash Flow"].iloc[:4].dropna() + capex = cf_q.loc["Capital Expenditure"].iloc[:4].dropna() + if len(op) == 4 and len(capex) == 4: + return float((op + capex).sum()) + except KeyError: + return None + except Exception: return None return None |
