"""News tab rendered as a client-side HTML surface.""" from datetime import date as _date from datetime import datetime as _dt from datetime import timezone as _tz from html import escape as _esc import streamlit.components.v1 as components from services.data_service import get_company_info, get_latest_price from services.fmp_service import get_company_news as get_fmp_news from services.news_service import get_company_news, get_news_sentiment from utils.security import validate_outbound_url _XMAP = {"NYQ": "NYSE", "NMS": "NASDAQ", "NGM": "NASDAQ", "NCM": "NASDAQ", "ASE": "AMEX"} def _safe_float(value): try: return float(value) except (TypeError, ValueError): return None def _classify_sentiment(article: dict) -> str: positive = [ "beats", "surges", "rises", "gains", "profit", "record", "upgrade", "buy", "outperform", "growth", "strong", "higher", "rally", "raises", "expands", "upside", ] negative = [ "misses", "falls", "drops", "loss", "cut", "downgrade", "sell", "underperform", "weak", "lower", "decline", "warning", "layoff", "lawsuit", "probe", "cuts", ] headline = (article.get("headline") or article.get("title") or "").lower() summary = (article.get("summary") or article.get("text") or "").lower() text = headline + " " + summary pos = sum(1 for w in positive if w in text) neg = sum(1 for w in negative if w in text) if pos > neg: return "bullish" if neg > pos: return "bearish" return "neutral" def _normalize_dt(raw): if raw is None: return None if isinstance(raw, _dt): return raw if isinstance(raw, _date): return _dt(raw.year, raw.month, raw.day) if isinstance(raw, (int, float)): try: return _dt.fromtimestamp(float(raw), tz=_tz.utc).astimezone().replace(tzinfo=None) except Exception: return None text = str(raw).strip() if not text: return None norm = text.replace("Z", "+00:00") try: dt = _dt.fromisoformat(norm) if dt.tzinfo is not None: return dt.astimezone().replace(tzinfo=None) return dt.replace(tzinfo=_tz.utc).astimezone().replace(tzinfo=None) except Exception: pass for fmt in ( "%Y-%m-%d", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%d %H:%M", "%m/%d/%Y", ): try: naive = _dt.strptime(text[: len(fmt)], fmt) return naive.replace(tzinfo=_tz.utc).astimezone().replace(tzinfo=None) except Exception: pass if len(text) >= 10: try: naive = _dt.strptime(text[:10], "%Y-%m-%d") return naive.replace(tzinfo=_tz.utc).astimezone().replace(tzinfo=None) except Exception: pass return None def render_news(ticker: str): import json as _json info = get_company_info(ticker) or {} price = get_latest_price(ticker) sentiment_data = get_news_sentiment(ticker) or {} # Finnhub first, FMP fallback. raw_articles = get_company_news(ticker) or [] provider = "finnhub" if not raw_articles: raw_articles = get_fmp_news(ticker) or [] provider = "fmp" rows = [] for i, article in enumerate(raw_articles): article = article or {} title = str(article.get("headline") or article.get("title") or "").strip() if not title: title = "Untitled item" source = str(article.get("source") or article.get("site") or "").strip() or "Unknown source" summary = str(article.get("summary") or article.get("text") or "").strip() dt_obj = _normalize_dt(article.get("datetime") or article.get("publishedDate")) ts = int(dt_obj.timestamp()) if dt_obj else None date_iso = dt_obj.strftime("%Y-%m-%d") if dt_obj else None date_pretty = dt_obj.strftime("%b %d, %Y") if dt_obj else "—" time_pretty = dt_obj.strftime("%H:%M") if dt_obj else "—" url = validate_outbound_url(article.get("url") or article.get("newsURL")) sentiment = _classify_sentiment(article) rows.append( { "id": i, "title": title, "summary": summary, "source": source, "url": url, "sentiment": sentiment, "date": date_iso, "date_pretty": date_pretty, "time_pretty": time_pretty, "ts": ts, } ) rows.sort(key=lambda r: (r.get("ts") is not None, r.get("ts") or 0), reverse=True) buzz = sentiment_data.get("buzz") or {} score = sentiment_data.get("sentiment") or {} bull_pct = score.get("bullishPercent") bear_pct = score.get("bearishPercent") weekly_articles = buzz.get("articlesInLastWeek") bull_str = f"{float(bull_pct) * 100:.1f}%" if _safe_float(bull_pct) is not None else "—" bear_str = f"{float(bear_pct) * 100:.1f}%" if _safe_float(bear_pct) is not None else "—" cur_num = _safe_float(info.get("currentPrice") or info.get("regularMarketPrice") or price) prev_num = _safe_float(info.get("previousClose")) if cur_num is not None and prev_num is not None and prev_num > 0: chg_pct = (cur_num - prev_num) / prev_num * 100.0 chg_sign = "+" if chg_pct >= 0 else "" chg_arrow = "▲" if chg_pct >= 0 else "▼" chg_str = chg_arrow + " " + chg_sign + "{:.2f}%".format(chg_pct) chg_cls = "chg-pos" if chg_pct >= 0 else "chg-neg" else: chg_str = "—" chg_cls = "" exchange_raw = str(info.get("exchange") or "") exchange = _XMAP.get(exchange_raw, exchange_raw) or "—" co_name = _esc(info.get("longName") or info.get("shortName") or ticker.upper()) price_str = "${:,.2f}".format(cur_num) if cur_num is not None else "—" rows_js = "const NEWS_ROWS=" + _json.dumps(rows).replace("", "<\\/") + ";" n_rows = max(len(rows), 18) height = 1240 + n_rows * 90 _ROOT = ( "" ) fonts_link = ( "" "" ) _NEWS_CSS = """""" ctx_html = ( '
Headline flow for ' + _esc(ticker.upper()) + '. Read source mix, timing, and sentiment tags in one stream. Sentiment chips are rule-based heuristics from headline and summary language.
' + "