diff options
| author | Tyler <tyler@tylerhoang.xyz> | 2026-05-16 00:02:32 -0700 |
|---|---|---|
| committer | Tyler <tyler@tylerhoang.xyz> | 2026-05-16 00:02:32 -0700 |
| commit | 0d888203cbc4dc596d0c05cedfeabe8785b263fc (patch) | |
| tree | 7aa04a8b6b669fc8258e7e95905c07656c6f93f9 | |
| parent | 870f8e6c8b88d61d0f7183b938b9a496c193b141 (diff) | |
Fix valuation and data robustness bugs
| -rw-r--r-- | app.py | 19 | ||||
| -rw-r--r-- | components/news.py | 11 | ||||
| -rw-r--r-- | components/valuation.py | 85 | ||||
| -rw-r--r-- | services/data_service.py | 217 | ||||
| -rw-r--r-- | utils/security.py | 33 |
5 files changed, 261 insertions, 104 deletions
@@ -408,6 +408,7 @@ hr { import plotly.graph_objects as go import plotly.io as pio +from utils.security import escape_html, validate_outbound_url # ── Plotly theme ────────────────────────────────────────────────────────────── _prism_layout = go.Layout( @@ -543,6 +544,8 @@ with st.sidebar: co_name = info.get("longName", ticker) price = get_latest_price(ticker) prev_close = info.get("previousClose") or info.get("regularMarketPreviousClose") + ticker_html = escape_html(ticker) + co_name_html = escape_html(co_name) # Ticker + name st.markdown(f""" @@ -553,12 +556,12 @@ with st.sidebar: font-size: 2rem; color: #F2ECDC; line-height: 0.95; letter-spacing: -0.025em; margin-bottom: 4px; - ">{ticker}</div> + ">{ticker_html}</div> <div style=" font-family: 'IBM Plex Sans', sans-serif; font-size: 11px; color: #8E8676; letter-spacing: 0.01em; - ">{co_name}</div> + ">{co_name_html}</div> </div> """, unsafe_allow_html=True) @@ -608,10 +611,10 @@ with st.sidebar: emp_str = f"{employees:,}" if isinstance(employees, int) else "—" rows = [ - ("Exchange", exchange), - ("Sector", sector), - ("Currency", currency), - ("Employees", emp_str), + ("Exchange", escape_html(exchange)), + ("Sector", escape_html(sector)), + ("Currency", escape_html(currency)), + ("Employees", escape_html(emp_str)), ] rows_html = "".join(f""" <div style="display:flex;justify-content:space-between;align-items:baseline;"> @@ -628,11 +631,11 @@ with st.sidebar: ">{rows_html}</div> """, unsafe_allow_html=True) - website = info.get("website", "") + website = validate_outbound_url(info.get("website", "")) if website: st.markdown(f""" <div style="padding:6px 0 0;"> - <a href="{website}" target="_blank" style=" + <a href="{escape_html(website)}" target="_blank" rel="noopener noreferrer" style=" font-family:'IBM Plex Sans',sans-serif; font-size:11px;color:#C2AA7A; text-decoration:none; diff --git a/components/news.py b/components/news.py index 522826c..cb43ea8 100644 --- a/components/news.py +++ b/components/news.py @@ -3,6 +3,7 @@ import streamlit as st from datetime import datetime from services.news_service import get_company_news, get_news_sentiment from services.fmp_service import get_company_news as get_fmp_news +from utils.security import escape_html, validate_outbound_url def _sentiment_badge(sentiment: str) -> str: @@ -69,9 +70,10 @@ def render_news(ticker: str): for article in articles: headline = article.get("headline") or article.get("title", "No title") source = article.get("source") or article.get("site", "") - url = article.get("url") or article.get("newsURL") or article.get("url", "") + url = validate_outbound_url(article.get("url") or article.get("newsURL")) timestamp = article.get("datetime") or article.get("publishedDate", "") summary = article.get("summary") or article.get("text") or "" + headline_html = escape_html(headline) sentiment = _classify_sentiment(article) badge = _sentiment_badge(sentiment) @@ -81,9 +83,12 @@ def render_news(ticker: str): col1, col2 = st.columns([5, 1]) with col1: if url: - st.markdown(f"**[{headline}]({url})**") + st.markdown( + f'<strong><a href="{escape_html(url)}" target="_blank" rel="noopener noreferrer">{headline_html}</a></strong>', + unsafe_allow_html=True, + ) else: - st.markdown(f"**{headline}**") + st.markdown(f"<strong>{headline_html}</strong>", unsafe_allow_html=True) meta = " · ".join(filter(None, [source, time_str])) if meta: st.caption(meta) diff --git a/components/valuation.py b/components/valuation.py index db352b3..9525c69 100644 --- a/components/valuation.py +++ b/components/valuation.py @@ -1,5 +1,4 @@ """Valuation panel — key ratios, models, comparable companies, analyst targets, earnings history.""" -import json import numpy as np import pandas as pd import plotly.graph_objects as go @@ -38,6 +37,7 @@ from services.valuation_service import ( compute_raw_historical_growth_rate, ) from utils.formatters import fmt_ratio, fmt_pct, fmt_large, fmt_currency +from utils.security import escape_html, json_for_script FINANCIAL_SECTORS = {"Financial Services"} @@ -116,6 +116,10 @@ def _escape_markdown_currency(value: str) -> str: return value.replace("$", r"\$") +def _h(value) -> str: + return escape_html(value) + + def render_valuation(ticker: str): tabs = st.tabs([ "Key Ratios", @@ -503,10 +507,10 @@ def _render_ratios(ticker: str): _XMAP = {"NYQ": "NYSE", "NMS": "NASDAQ", "NGM": "NASDAQ", "NCM": "NASDAQ", "ASE": "AMEX"} raw_x = (info.get("exchange", "") if info else "") or "" - exchange = _XMAP.get(raw_x, raw_x) or "—" - co_name = (info.get("longName", ticker) if info else ticker) or ticker - sector = (info.get("sector", "—") if info else "—") or "—" - industry = (info.get("industry", "—") if info else "—") or "—" + exchange = _h(_XMAP.get(raw_x, raw_x) or "—") + co_name = _h((info.get("longName", ticker) if info else ticker) or ticker) + sector = _h((info.get("sector", "—") if info else "—") or "—") + industry = _h((info.get("industry", "—") if info else "—") or "—") n_peers = len(peers) from datetime import date as _date today_str = _date.today().strftime("%b %d, %Y") @@ -1245,7 +1249,7 @@ def _build_dcf_canvas_html( bar_line_colors = ["#1F3B5E"] * len(discounted) + ["#DCC79E"] bar_text = [_fmt_b(v) for v in discounted] + [_fmt_b(tv_pv)] - plotly_data_json = json.dumps([{ + plotly_data_json = json_for_script([{ "type": "bar", "x": bar_x, "y": bar_y, @@ -1256,7 +1260,7 @@ def _build_dcf_canvas_html( "hovertemplate": "%{x}: %{text}<extra></extra>", "cliponaxis": False, }]) - plotly_layout_json = json.dumps({ + plotly_layout_json = json_for_script({ "paper_bgcolor": "#11151C", "plot_bgcolor": "#11151C", "margin": {"l": 48, "r": 8, "t": 28, "b": 36}, @@ -1280,7 +1284,7 @@ def _build_dcf_canvas_html( "uniformtext": {"mode": "hide", "minsize": 8}, }) - data_json = json.dumps({ + data_json = json_for_script({ "baseFcf": result["base_fcf"], "netDebt": result["net_debt"], "otherClaims": ctx["preferred_equity"] + ctx["minority_interest"], @@ -1758,11 +1762,11 @@ def _build_multiples_canvas_html(ctx: dict) -> str: pr = get_ratios_for_tickers(peers[:6]) if pr: import statistics as _stats - eb_vs = [float(r["enterpriseValueMultipleTTM"]) for r in pr.values() + eb_vs = [float(r["enterpriseValueMultipleTTM"]) for r in pr if r and r.get("enterpriseValueMultipleTTM") and 2 < r["enterpriseValueMultipleTTM"] < 100] - rv_vs = [float(r["priceToSalesRatioTTM"]) for r in pr.values() - if r and r.get("priceToSalesRatioTTM") and 0.1 < r["priceToSalesRatioTTM"] < 50] - pb_vs = [float(r["priceToBookRatioTTM"]) for r in pr.values() + rv_vs = [float(r["evToSalesTTM"]) for r in pr + if r and r.get("evToSalesTTM") and 0.1 < r["evToSalesTTM"] < 50] + pb_vs = [float(r["priceToBookRatioTTM"]) for r in pr if r and r.get("priceToBookRatioTTM") and 0.5 < r["priceToBookRatioTTM"] < 200] if eb_vs: eb_sector = _stats.median(eb_vs) @@ -1777,7 +1781,7 @@ def _build_multiples_canvas_html(ctx: dict) -> str: rv_sector = _clamp(rv_sector, 4.0, 20.0) pb_sector = _clamp(pb_sector, 4.0, 60.0) - dcf_iv = st.session_state.get("dcf_intrinsic") + dcf_iv = st.session_state.get(f"dcf_intrinsic_{ctx['ticker']}") dcf_wacc = st.session_state.get(f"dcf_wacc_{ctx['ticker']}", 10.0) dcf_tg = st.session_state.get(f"dcf_tg_{ctx['ticker']}", 2.5) dcf_yrs = st.session_state.get(f"dcf_yrs_{ctx['ticker']}", 5) @@ -1891,9 +1895,9 @@ def _build_multiples_canvas_html(ctx: dict) -> str: dcf_meta_str = "Switch to DCF tab to compute" ticker = ctx["ticker"] - exchange = (ctx.get("info") or {}).get("exchange") or "—" + exchange = _h((ctx.get("info") or {}).get("exchange") or "—") - data_json = json.dumps({ + data_json = json_for_script({ "market": market, "shares": shares, "netDebt": net_debt, "totalDebt": total_debt, "cash": cash, "ebitda": ebitda, "revenue": revenue, "bookPs": book_ps, @@ -2269,7 +2273,7 @@ def _render_dcf_model(ctx: dict): st.warning(result["error"]) return - st.session_state["dcf_intrinsic"] = result["intrinsic_value_per_share"] + st.session_state[f"dcf_intrinsic_{ctx['ticker']}"] = result["intrinsic_value_per_share"] st.session_state[f"dcf_params_{ctx['ticker']}"] = {"wacc": wacc_pct, "tg": tg_pct, "yrs": yrs} # Cross-check: run other models at their current market multiples @@ -2704,8 +2708,9 @@ def _render_models(ticker: str): st.caption(ctx["summary"]) _render_model_availability(ctx) - if "models_view" not in st.session_state: - st.session_state["models_view"] = "dcf" + view_key = f"models_view_{ticker}" + if view_key not in st.session_state: + st.session_state[view_key] = "dcf" st.markdown(_DCF_RAIL_CSS, unsafe_allow_html=True) @@ -2714,24 +2719,24 @@ def _render_models(ticker: str): if st.button( "Discounted Cash Flow", key=f"pick_dcf_{ticker}", - type="primary" if st.session_state["models_view"] == "dcf" else "secondary", + type="primary" if st.session_state[view_key] == "dcf" else "secondary", width="stretch", ): - st.session_state["models_view"] = "dcf" + st.session_state[view_key] = "dcf" st.rerun() with _pc2: if st.button( "Multiples", key=f"pick_mult_{ticker}", - type="primary" if st.session_state["models_view"] == "multiples" else "secondary", + type="primary" if st.session_state[view_key] == "multiples" else "secondary", width="stretch", ): - st.session_state["models_view"] = "multiples" + st.session_state[view_key] = "multiples" st.rerun() st.markdown("---") - view = st.session_state.get("models_view", "dcf") + view = st.session_state.get(view_key, "dcf") if view == "dcf": if ctx["dcf_available"]: _render_dcf_model(ctx) @@ -2826,7 +2831,6 @@ _CC_CSS = """<style> def _render_comps(ticker: str): - import json as _json info = get_company_info(ticker) auto_peers = get_peers(ticker) @@ -2978,7 +2982,7 @@ def _render_comps(ticker: str): }) sym = ticker.upper() - name = (info.get("longName") or info.get("shortName") or sym) if info else sym + name = _h((info.get("longName") or info.get("shortName") or sym) if info else sym) price = get_latest_price(ticker) prev_close = (info.get("previousClose") if info else None) if price and prev_close and prev_close > 0: @@ -2988,11 +2992,11 @@ def _render_comps(ticker: str): else: chg_str, chg_cls = "—", "" raw_x = (info.get("exchange", "") if info else "") or "" - exchange = _XMAP.get(raw_x, raw_x) or "—" + exchange = _h(_XMAP.get(raw_x, raw_x) or "—") price_str = f"${price:.2f}" if price else "—" n_peers = len(peers) - 1 - data_json = _json.dumps({ + data_json = json_for_script({ "subject": sym, "peers": peers, "peerMedian": peer_median_row, @@ -3257,7 +3261,6 @@ _AT_CSS = """<style> # ── Analyst Targets ────────────────────────────────────────────────────────── def _render_analyst_targets(ticker: str): - import json as _json targets = get_analyst_price_targets(ticker) recs = get_recommendations_summary(ticker) @@ -3409,7 +3412,7 @@ def _render_analyst_targets(ticker: str): # Context strip sym = ticker.upper() - name = (info.get("longName") or info.get("shortName") or sym) if info else sym + name = _h((info.get("longName") or info.get("shortName") or sym) if info else sym) price = get_latest_price(ticker) prev_close = (info.get("previousClose") if info else None) if price and prev_close and prev_close > 0: @@ -3420,7 +3423,7 @@ def _render_analyst_targets(ticker: str): chg_str, chg_cls = "—", "" _XMAP = {"NYQ": "NYSE", "NMS": "NASDAQ", "NGM": "NASDAQ", "NCM": "NASDAQ", "ASE": "AMEX"} raw_x = (info.get("exchange", "") if info else "") or "" - exchange = _XMAP.get(raw_x, raw_x) or "—" + exchange = _h(_XMAP.get(raw_x, raw_x) or "—") price_str = f"${price:.2f}" if price else "—" ctx_html = ( @@ -3580,7 +3583,6 @@ _EH_CSS = """<style> def _render_earnings_history(ticker: str): - import json as _json eh = get_earnings_history(ticker) next_date = get_next_earnings_date(ticker) @@ -3812,7 +3814,7 @@ def _render_earnings_history(ticker: str): # Context strip sym = ticker.upper() - name = (info.get("longName") or info.get("shortName") or sym) if info else sym + name = _h((info.get("longName") or info.get("shortName") or sym) if info else sym) price = get_latest_price(ticker) prev_close = (info.get("previousClose") if info else None) if price and prev_close and prev_close > 0: @@ -3823,7 +3825,7 @@ def _render_earnings_history(ticker: str): chg_str, chg_cls = "—", "" _XMAP = {"NYQ": "NYSE", "NMS": "NASDAQ", "NGM": "NASDAQ", "NCM": "NASDAQ", "ASE": "AMEX"} raw_x = (info.get("exchange", "") if info else "") or "" - exchange = _XMAP.get(raw_x, raw_x) or "—" + exchange = _h(_XMAP.get(raw_x, raw_x) or "—") price_str = f"${price:.2f}" if price else "—" ctx_html = ( @@ -3838,7 +3840,7 @@ def _render_earnings_history(ticker: str): '</div></div>' ) - next_date_str = next_date if next_date else "Not scheduled" + next_date_str = _h(next_date if next_date else "Not scheduled") med_surp_str = (f"{med_surprise * 100:+.1f}%" if med_surprise is not None else "—") lede_html = ( @@ -4060,7 +4062,6 @@ _KH_CSS = """<style> def _render_historical_ratios(ticker: str): - import json as _json info = get_company_info(ticker) hist_rows = get_historical_ratios(ticker, limit=10) if not hist_rows: @@ -4110,16 +4111,16 @@ def _render_historical_ratios(ticker: str): else: chg_str, chg_cls = "—", "" sym = ticker.upper() - name = (info.get("longName") or info.get("shortName") or sym) if info else sym + name = _h((info.get("longName") or info.get("shortName") or sym) if info else sym) _XMAP = {"NYQ": "NYSE", "NMS": "NASDAQ", "NGM": "NASDAQ", "NCM": "NASDAQ", "ASE": "AMEX"} raw_x = (info.get("exchange", "") if info else "") or "" - exchange = _XMAP.get(raw_x, raw_x) or "—" + exchange = _h(_XMAP.get(raw_x, raw_x) or "—") price_str = f"${price:.2f}" if price else "—" n_periods = len(periods) n_rows = len(series_data) n_groups = len({s["group"] for s in series_data}) total_height = 48 + 24 + 180 + 24 + 420 + 24 + (68 + n_groups * 50 + n_rows * 42) + 24 + 60 + 60 - data_json = _json.dumps({"periods": periods, "series": series_data}) + data_json = json_for_script({"periods": periods, "series": series_data}) ctx_html = ( f'<div class="val-ctx">' f'<span class="sym">{sym}</span>' @@ -4396,7 +4397,6 @@ table.fe-table td:first-child{text-align:left;color:var(--fg-1);font-weight:500} def _render_forward_estimates(ticker: str): - import json as _json with st.spinner("Loading forward estimates…"): estimates = get_analyst_estimates(ticker) @@ -4562,7 +4562,7 @@ def _render_forward_estimates(ticker: str): # Context strip info = get_company_info(ticker) sym = ticker.upper() - name = (info.get("longName") or info.get("shortName") or sym) if info else sym + name = _h((info.get("longName") or info.get("shortName") or sym) if info else sym) price = get_latest_price(ticker) prev_close = (info.get("previousClose") if info else None) if price and prev_close and prev_close > 0: @@ -4573,7 +4573,7 @@ def _render_forward_estimates(ticker: str): chg_str, chg_cls = "—", "" _XMAP = {"NYQ": "NYSE", "NMS": "NASDAQ", "NGM": "NASDAQ", "NCM": "NASDAQ", "ASE": "AMEX"} raw_x = (info.get("exchange", "") if info else "") or "" - exchange = _XMAP.get(raw_x, raw_x) or "—" + exchange = _h(_XMAP.get(raw_x, raw_x) or "—") price_str = f"${price:.2f}" if price else "—" ctx_html = ( @@ -4702,7 +4702,7 @@ def _render_forward_estimates(ticker: str): ) js = ( - "const D=" + _json.dumps(chart_data) + ";\n" + "const D=" + json_for_script(chart_data) + ";\n" "function showTab(tab,el){" "document.querySelectorAll('.tab-pill').forEach(function(b){" "b.className='tab-pill '+(b===el?'active':'inactive');" @@ -4779,4 +4779,3 @@ def _render_forward_estimates(ticker: str): height = 1320 + len(annual_rows) * 50 components.html(doc, height=height, scrolling=False) - diff --git a/services/data_service.py b/services/data_service.py index bfd1290..9c82e14 100644 --- a/services/data_service.py +++ b/services/data_service.py @@ -26,9 +26,12 @@ def search_tickers(query: str) -> list[dict]: @st.cache_data(ttl=300) def get_company_info(ticker: str) -> dict: """Return company info dict from yfinance.""" - t = yf.Ticker(ticker.upper()) - info = t.info or {} - return info + try: + t = yf.Ticker(ticker.upper()) + info = t.info or {} + return info if isinstance(info, dict) else {} + except Exception: + return {} @st.cache_data(ttl=300) @@ -57,7 +60,7 @@ def get_shares_outstanding(ticker: str) -> float | None: try: t = yf.Ticker(ticker.upper()) info = t.info or {} - for key in ("sharesOutstanding", "impliedSharesOutstanding", "floatShares"): + for key in ("sharesOutstanding", "impliedSharesOutstanding"): val = info.get(key) if val is not None: return float(val) @@ -88,31 +91,45 @@ def get_market_cap_computed(ticker: str) -> float | None: @st.cache_data(ttl=300) def get_price_history(ticker: str, period: str = "1y") -> pd.DataFrame: """Return OHLCV price history.""" - t = yf.Ticker(ticker.upper()) - df = t.history(period=period) - df.index = pd.to_datetime(df.index) - return df + try: + t = yf.Ticker(ticker.upper()) + df = t.history(period=period) + if df is None or df.empty: + return pd.DataFrame() + df.index = pd.to_datetime(df.index) + return df + except Exception: + return pd.DataFrame() @st.cache_data(ttl=3600) def get_income_statement(ticker: str, quarterly: bool = False) -> pd.DataFrame: - t = yf.Ticker(ticker.upper()) - df = t.quarterly_income_stmt if quarterly else t.income_stmt - return df if df is not None else pd.DataFrame() + try: + t = yf.Ticker(ticker.upper()) + df = t.quarterly_income_stmt if quarterly else t.income_stmt + return df if df is not None else pd.DataFrame() + except Exception: + return pd.DataFrame() @st.cache_data(ttl=3600) def get_balance_sheet(ticker: str, quarterly: bool = False) -> pd.DataFrame: - t = yf.Ticker(ticker.upper()) - df = t.quarterly_balance_sheet if quarterly else t.balance_sheet - return df if df is not None else pd.DataFrame() + try: + t = yf.Ticker(ticker.upper()) + df = t.quarterly_balance_sheet if quarterly else t.balance_sheet + return df if df is not None else pd.DataFrame() + except Exception: + return pd.DataFrame() @st.cache_data(ttl=3600) def get_cash_flow(ticker: str, quarterly: bool = False) -> pd.DataFrame: - t = yf.Ticker(ticker.upper()) - df = t.quarterly_cashflow if quarterly else t.cashflow - return df if df is not None else pd.DataFrame() + try: + t = yf.Ticker(ticker.upper()) + df = t.quarterly_cashflow if quarterly else t.cashflow + return df if df is not None else pd.DataFrame() + except Exception: + return pd.DataFrame() @st.cache_data(ttl=300) @@ -188,12 +205,51 @@ def get_next_earnings_date(ticker: str) -> str | None: """Return the next expected earnings date as a string, or None. Uses t.calendar (no lxml dependency). """ + def _collect_dates(value) -> list: + if value is None: + return [] + if isinstance(value, dict): + for key in ("Earnings Date", "earningsDate", "earnings_date"): + if key in value: + return _collect_dates(value.get(key)) + out = [] + for nested in value.values(): + out.extend(_collect_dates(nested)) + return out + if isinstance(value, pd.DataFrame): + out = [] + for col in value.columns: + out.extend(_collect_dates(value[col])) + return out + if isinstance(value, pd.Series): + if "Earnings Date" in value.index: + return _collect_dates(value.get("Earnings Date")) + return _collect_dates(value.tolist()) + if isinstance(value, pd.Index): + return _collect_dates(value.tolist()) + if isinstance(value, (list, tuple, set)): + out = [] + for item in value: + out.extend(_collect_dates(item)) + return out + return [value] + try: t = yf.Ticker(ticker.upper()) cal = t.calendar - dates = cal.get("Earnings Date", []) - if dates: - return str(dates[0]) + raw_dates = _collect_dates(cal) + parsed_dates = [] + for value in raw_dates: + dt = pd.to_datetime(value, errors="coerce") + if pd.notna(dt): + ts = pd.Timestamp(dt) + parsed_dates.append(ts.tz_localize(None) if ts.tzinfo else ts) + + if parsed_dates: + today = pd.Timestamp.today().normalize() + future_dates = sorted({dt.normalize() for dt in parsed_dates if dt.normalize() >= today}) + chosen = future_dates[0] if future_dates else sorted({dt.normalize() for dt in parsed_dates})[0] + return chosen.strftime("%b %d, %Y") return None except Exception: return None @@ -481,14 +537,67 @@ def get_historical_ratios_yfinance(ticker: str) -> list[dict]: t = yf.Ticker(ticker.upper()) income = t.income_stmt # rows=metrics, cols=fiscal-year dates balance = t.balance_sheet - info = t.info or {} if income is None or income.empty: return [] + try: + shares_history = t.get_shares_full(start="2000-01-01") + if isinstance(shares_history, pd.Series): + shares_history = shares_history.dropna().sort_index() + else: + shares_history = pd.Series(dtype=float) + except Exception: + shares_history = pd.Series(dtype=float) + + def _balance_shares(period_date) -> float | None: + if balance is None or balance.empty or period_date not in balance.columns: + return None + for label in ( + "Ordinary Shares Number", + "Share Issued", + "Common Stock Shares Outstanding", + ): + if label in balance.index: + value = balance.loc[label, period_date] + if pd.notna(value): + try: + shares_value = float(value) + except (TypeError, ValueError): + continue + if shares_value > 0: + return shares_value + return None + + def _historical_shares_for_date(period_date) -> float | None: + direct_balance_shares = _balance_shares(period_date) + if direct_balance_shares: + return direct_balance_shares + if shares_history.empty: + return None + + target = pd.Timestamp(period_date) + index = shares_history.index + if getattr(index, "tz", None) is not None and target.tzinfo is None: + target = target.tz_localize(index.tz) + elif getattr(index, "tz", None) is None and target.tzinfo is not None: + target = target.tz_localize(None) + + deltas = pd.Series(index - target, index=index).abs() + if deltas.empty: + return None + nearest_idx = deltas.idxmin() + if abs(pd.Timestamp(nearest_idx) - target) > pd.Timedelta(days=180): + return None + + try: + shares_value = float(shares_history.loc[nearest_idx]) + except (TypeError, ValueError): + return None + return shares_value if shares_value > 0 else None + # One year of monthly price history per fiscal year going back 10 years hist = t.history(period="10y", interval="1mo") - shares = get_shares_outstanding(ticker) rows: list[dict] = [] for date in income.columns: @@ -549,8 +658,10 @@ def get_historical_ratios_yfinance(ticker: str) -> list[dict]: if abs(roa) < 10: row["returnOnAssets"] = roa - # Price-based ratios — average closing price in ±45-day window around year-end - if shares and not hist.empty: + period_shares = _historical_shares_for_date(date) + + # Price-based ratios use period-appropriate shares when available. + if period_shares and not hist.empty: try: date_ts = pd.Timestamp(date) # Normalize timezones: yfinance history index may be tz-aware @@ -564,7 +675,7 @@ def get_historical_ratios_yfinance(ticker: str) -> list[dict]: window = hist.loc[mask, "Close"] if not window.empty: price = float(window.mean()) - market_cap = price * shares + market_cap = price * period_shares if net_income and net_income > 0: row["peRatio"] = market_cap / net_income @@ -671,40 +782,46 @@ def get_balance_sheet_bridge_items(ticker: str) -> dict: @st.cache_data(ttl=3600) def get_free_cash_flow_series(ticker: str) -> pd.Series: """Return annual Free Cash Flow series (most recent first).""" - t = yf.Ticker(ticker.upper()) - cf = t.cashflow - if cf is None or cf.empty: - return pd.Series(dtype=float) - if "Free Cash Flow" in cf.index: - return cf.loc["Free Cash Flow"].dropna() - # Compute from operating CF - capex try: - op = cf.loc["Operating Cash Flow"] - capex = cf.loc["Capital Expenditure"] - return (op + capex).dropna() - except KeyError: + t = yf.Ticker(ticker.upper()) + cf = t.cashflow + if cf is None or cf.empty: + return pd.Series(dtype=float) + if "Free Cash Flow" in cf.index: + return cf.loc["Free Cash Flow"].dropna() + # Compute from operating CF - capex + try: + op = cf.loc["Operating Cash Flow"] + capex = cf.loc["Capital Expenditure"] + return (op + capex).dropna() + except KeyError: + return pd.Series(dtype=float) + except Exception: return pd.Series(dtype=float) @st.cache_data(ttl=3600) def get_free_cash_flow_ttm(ticker: str) -> float | None: """Return trailing-twelve-month free cash flow from quarterly cash flow statements.""" - t = yf.Ticker(ticker.upper()) - cf_q = t.quarterly_cashflow - if cf_q is None or cf_q.empty: - return None + try: + t = yf.Ticker(ticker.upper()) + cf_q = t.quarterly_cashflow + if cf_q is None or cf_q.empty: + return None - if "Free Cash Flow" in cf_q.index: - vals = cf_q.loc["Free Cash Flow"].iloc[:4].dropna() - if len(vals) == 4: - return float(vals.sum()) + if "Free Cash Flow" in cf_q.index: + vals = cf_q.loc["Free Cash Flow"].iloc[:4].dropna() + if len(vals) == 4: + return float(vals.sum()) - try: - op = cf_q.loc["Operating Cash Flow"].iloc[:4].dropna() - capex = cf_q.loc["Capital Expenditure"].iloc[:4].dropna() - if len(op) == 4 and len(capex) == 4: - return float((op + capex).sum()) - except KeyError: + try: + op = cf_q.loc["Operating Cash Flow"].iloc[:4].dropna() + capex = cf_q.loc["Capital Expenditure"].iloc[:4].dropna() + if len(op) == 4 and len(capex) == 4: + return float((op + capex).sum()) + except KeyError: + return None + except Exception: return None return None diff --git a/utils/security.py b/utils/security.py new file mode 100644 index 0000000..962422b --- /dev/null +++ b/utils/security.py @@ -0,0 +1,33 @@ +"""Minimal helpers for safely rendering external text and URLs.""" +from html import escape +from urllib.parse import urlparse + + +def escape_html(value) -> str: + """Escape a value for HTML text or attribute contexts.""" + if value is None: + return "" + return escape(str(value), quote=True) + + +def validate_outbound_url(url: str | None) -> str | None: + """Allow only absolute http/https outbound URLs.""" + if not url: + return None + + candidate = str(url).strip() + if not candidate: + return None + + parsed = urlparse(candidate) + if parsed.scheme not in {"http", "https"} or not parsed.netloc: + return None + + return parsed.geturl() + + +def json_for_script(value) -> str: + """Serialize JSON for safe embedding inside inline script tags.""" + import json + + return json.dumps(value).replace("</", "<\\/") |
