aboutsummaryrefslogtreecommitdiff
path: root/services
diff options
context:
space:
mode:
Diffstat (limited to 'services')
-rw-r--r--services/data_service.py217
1 files changed, 167 insertions, 50 deletions
diff --git a/services/data_service.py b/services/data_service.py
index bfd1290..9c82e14 100644
--- a/services/data_service.py
+++ b/services/data_service.py
@@ -26,9 +26,12 @@ def search_tickers(query: str) -> list[dict]:
@st.cache_data(ttl=300)
def get_company_info(ticker: str) -> dict:
"""Return company info dict from yfinance."""
- t = yf.Ticker(ticker.upper())
- info = t.info or {}
- return info
+ try:
+ t = yf.Ticker(ticker.upper())
+ info = t.info or {}
+ return info if isinstance(info, dict) else {}
+ except Exception:
+ return {}
@st.cache_data(ttl=300)
@@ -57,7 +60,7 @@ def get_shares_outstanding(ticker: str) -> float | None:
try:
t = yf.Ticker(ticker.upper())
info = t.info or {}
- for key in ("sharesOutstanding", "impliedSharesOutstanding", "floatShares"):
+ for key in ("sharesOutstanding", "impliedSharesOutstanding"):
val = info.get(key)
if val is not None:
return float(val)
@@ -88,31 +91,45 @@ def get_market_cap_computed(ticker: str) -> float | None:
@st.cache_data(ttl=300)
def get_price_history(ticker: str, period: str = "1y") -> pd.DataFrame:
"""Return OHLCV price history."""
- t = yf.Ticker(ticker.upper())
- df = t.history(period=period)
- df.index = pd.to_datetime(df.index)
- return df
+ try:
+ t = yf.Ticker(ticker.upper())
+ df = t.history(period=period)
+ if df is None or df.empty:
+ return pd.DataFrame()
+ df.index = pd.to_datetime(df.index)
+ return df
+ except Exception:
+ return pd.DataFrame()
@st.cache_data(ttl=3600)
def get_income_statement(ticker: str, quarterly: bool = False) -> pd.DataFrame:
- t = yf.Ticker(ticker.upper())
- df = t.quarterly_income_stmt if quarterly else t.income_stmt
- return df if df is not None else pd.DataFrame()
+ try:
+ t = yf.Ticker(ticker.upper())
+ df = t.quarterly_income_stmt if quarterly else t.income_stmt
+ return df if df is not None else pd.DataFrame()
+ except Exception:
+ return pd.DataFrame()
@st.cache_data(ttl=3600)
def get_balance_sheet(ticker: str, quarterly: bool = False) -> pd.DataFrame:
- t = yf.Ticker(ticker.upper())
- df = t.quarterly_balance_sheet if quarterly else t.balance_sheet
- return df if df is not None else pd.DataFrame()
+ try:
+ t = yf.Ticker(ticker.upper())
+ df = t.quarterly_balance_sheet if quarterly else t.balance_sheet
+ return df if df is not None else pd.DataFrame()
+ except Exception:
+ return pd.DataFrame()
@st.cache_data(ttl=3600)
def get_cash_flow(ticker: str, quarterly: bool = False) -> pd.DataFrame:
- t = yf.Ticker(ticker.upper())
- df = t.quarterly_cashflow if quarterly else t.cashflow
- return df if df is not None else pd.DataFrame()
+ try:
+ t = yf.Ticker(ticker.upper())
+ df = t.quarterly_cashflow if quarterly else t.cashflow
+ return df if df is not None else pd.DataFrame()
+ except Exception:
+ return pd.DataFrame()
@st.cache_data(ttl=300)
@@ -188,12 +205,51 @@ def get_next_earnings_date(ticker: str) -> str | None:
"""Return the next expected earnings date as a string, or None.
Uses t.calendar (no lxml dependency).
"""
+ def _collect_dates(value) -> list:
+ if value is None:
+ return []
+ if isinstance(value, dict):
+ for key in ("Earnings Date", "earningsDate", "earnings_date"):
+ if key in value:
+ return _collect_dates(value.get(key))
+ out = []
+ for nested in value.values():
+ out.extend(_collect_dates(nested))
+ return out
+ if isinstance(value, pd.DataFrame):
+ out = []
+ for col in value.columns:
+ out.extend(_collect_dates(value[col]))
+ return out
+ if isinstance(value, pd.Series):
+ if "Earnings Date" in value.index:
+ return _collect_dates(value.get("Earnings Date"))
+ return _collect_dates(value.tolist())
+ if isinstance(value, pd.Index):
+ return _collect_dates(value.tolist())
+ if isinstance(value, (list, tuple, set)):
+ out = []
+ for item in value:
+ out.extend(_collect_dates(item))
+ return out
+ return [value]
+
try:
t = yf.Ticker(ticker.upper())
cal = t.calendar
- dates = cal.get("Earnings Date", [])
- if dates:
- return str(dates[0])
+ raw_dates = _collect_dates(cal)
+ parsed_dates = []
+ for value in raw_dates:
+ dt = pd.to_datetime(value, errors="coerce")
+ if pd.notna(dt):
+ ts = pd.Timestamp(dt)
+ parsed_dates.append(ts.tz_localize(None) if ts.tzinfo else ts)
+
+ if parsed_dates:
+ today = pd.Timestamp.today().normalize()
+ future_dates = sorted({dt.normalize() for dt in parsed_dates if dt.normalize() >= today})
+ chosen = future_dates[0] if future_dates else sorted({dt.normalize() for dt in parsed_dates})[0]
+ return chosen.strftime("%b %d, %Y")
return None
except Exception:
return None
@@ -481,14 +537,67 @@ def get_historical_ratios_yfinance(ticker: str) -> list[dict]:
t = yf.Ticker(ticker.upper())
income = t.income_stmt # rows=metrics, cols=fiscal-year dates
balance = t.balance_sheet
- info = t.info or {}
if income is None or income.empty:
return []
+ try:
+ shares_history = t.get_shares_full(start="2000-01-01")
+ if isinstance(shares_history, pd.Series):
+ shares_history = shares_history.dropna().sort_index()
+ else:
+ shares_history = pd.Series(dtype=float)
+ except Exception:
+ shares_history = pd.Series(dtype=float)
+
+ def _balance_shares(period_date) -> float | None:
+ if balance is None or balance.empty or period_date not in balance.columns:
+ return None
+ for label in (
+ "Ordinary Shares Number",
+ "Share Issued",
+ "Common Stock Shares Outstanding",
+ ):
+ if label in balance.index:
+ value = balance.loc[label, period_date]
+ if pd.notna(value):
+ try:
+ shares_value = float(value)
+ except (TypeError, ValueError):
+ continue
+ if shares_value > 0:
+ return shares_value
+ return None
+
+ def _historical_shares_for_date(period_date) -> float | None:
+ direct_balance_shares = _balance_shares(period_date)
+ if direct_balance_shares:
+ return direct_balance_shares
+ if shares_history.empty:
+ return None
+
+ target = pd.Timestamp(period_date)
+ index = shares_history.index
+ if getattr(index, "tz", None) is not None and target.tzinfo is None:
+ target = target.tz_localize(index.tz)
+ elif getattr(index, "tz", None) is None and target.tzinfo is not None:
+ target = target.tz_localize(None)
+
+ deltas = pd.Series(index - target, index=index).abs()
+ if deltas.empty:
+ return None
+ nearest_idx = deltas.idxmin()
+ if abs(pd.Timestamp(nearest_idx) - target) > pd.Timedelta(days=180):
+ return None
+
+ try:
+ shares_value = float(shares_history.loc[nearest_idx])
+ except (TypeError, ValueError):
+ return None
+ return shares_value if shares_value > 0 else None
+
# One year of monthly price history per fiscal year going back 10 years
hist = t.history(period="10y", interval="1mo")
- shares = get_shares_outstanding(ticker)
rows: list[dict] = []
for date in income.columns:
@@ -549,8 +658,10 @@ def get_historical_ratios_yfinance(ticker: str) -> list[dict]:
if abs(roa) < 10:
row["returnOnAssets"] = roa
- # Price-based ratios — average closing price in ±45-day window around year-end
- if shares and not hist.empty:
+ period_shares = _historical_shares_for_date(date)
+
+ # Price-based ratios use period-appropriate shares when available.
+ if period_shares and not hist.empty:
try:
date_ts = pd.Timestamp(date)
# Normalize timezones: yfinance history index may be tz-aware
@@ -564,7 +675,7 @@ def get_historical_ratios_yfinance(ticker: str) -> list[dict]:
window = hist.loc[mask, "Close"]
if not window.empty:
price = float(window.mean())
- market_cap = price * shares
+ market_cap = price * period_shares
if net_income and net_income > 0:
row["peRatio"] = market_cap / net_income
@@ -671,40 +782,46 @@ def get_balance_sheet_bridge_items(ticker: str) -> dict:
@st.cache_data(ttl=3600)
def get_free_cash_flow_series(ticker: str) -> pd.Series:
"""Return annual Free Cash Flow series (most recent first)."""
- t = yf.Ticker(ticker.upper())
- cf = t.cashflow
- if cf is None or cf.empty:
- return pd.Series(dtype=float)
- if "Free Cash Flow" in cf.index:
- return cf.loc["Free Cash Flow"].dropna()
- # Compute from operating CF - capex
try:
- op = cf.loc["Operating Cash Flow"]
- capex = cf.loc["Capital Expenditure"]
- return (op + capex).dropna()
- except KeyError:
+ t = yf.Ticker(ticker.upper())
+ cf = t.cashflow
+ if cf is None or cf.empty:
+ return pd.Series(dtype=float)
+ if "Free Cash Flow" in cf.index:
+ return cf.loc["Free Cash Flow"].dropna()
+ # Compute from operating CF - capex
+ try:
+ op = cf.loc["Operating Cash Flow"]
+ capex = cf.loc["Capital Expenditure"]
+ return (op + capex).dropna()
+ except KeyError:
+ return pd.Series(dtype=float)
+ except Exception:
return pd.Series(dtype=float)
@st.cache_data(ttl=3600)
def get_free_cash_flow_ttm(ticker: str) -> float | None:
"""Return trailing-twelve-month free cash flow from quarterly cash flow statements."""
- t = yf.Ticker(ticker.upper())
- cf_q = t.quarterly_cashflow
- if cf_q is None or cf_q.empty:
- return None
+ try:
+ t = yf.Ticker(ticker.upper())
+ cf_q = t.quarterly_cashflow
+ if cf_q is None or cf_q.empty:
+ return None
- if "Free Cash Flow" in cf_q.index:
- vals = cf_q.loc["Free Cash Flow"].iloc[:4].dropna()
- if len(vals) == 4:
- return float(vals.sum())
+ if "Free Cash Flow" in cf_q.index:
+ vals = cf_q.loc["Free Cash Flow"].iloc[:4].dropna()
+ if len(vals) == 4:
+ return float(vals.sum())
- try:
- op = cf_q.loc["Operating Cash Flow"].iloc[:4].dropna()
- capex = cf_q.loc["Capital Expenditure"].iloc[:4].dropna()
- if len(op) == 4 and len(capex) == 4:
- return float((op + capex).sum())
- except KeyError:
+ try:
+ op = cf_q.loc["Operating Cash Flow"].iloc[:4].dropna()
+ capex = cf_q.loc["Capital Expenditure"].iloc[:4].dropna()
+ if len(op) == 4 and len(capex) == 4:
+ return float((op + capex).sum())
+ except KeyError:
+ return None
+ except Exception:
return None
return None