"""Finnhub news service — company news with sentiment.""" import os import time import requests import streamlit as st from dotenv import load_dotenv load_dotenv() BASE_URL = "https://finnhub.io/api/v1" def _api_key() -> str: return os.getenv("FINNHUB_API_KEY", "") def _get(endpoint: str, params: dict = None) -> dict | list | None: params = params or {} params["token"] = _api_key() try: resp = requests.get(f"{BASE_URL}{endpoint}", params=params, timeout=10) resp.raise_for_status() return resp.json() except Exception: return None @st.cache_data(ttl=600) def get_company_news(ticker: str, days_back: int = 7) -> list[dict]: """Return recent news articles with sentiment for a ticker.""" end = int(time.time()) start = end - days_back * 86400 from datetime import datetime date_from = datetime.utcfromtimestamp(start).strftime("%Y-%m-%d") date_to = datetime.utcfromtimestamp(end).strftime("%Y-%m-%d") data = _get("/company-news", params={ "symbol": ticker.upper(), "from": date_from, "to": date_to, }) if not data or not isinstance(data, list): return [] return data[:30] @st.cache_data(ttl=600) def get_news_sentiment(ticker: str) -> dict: """Return sentiment scores for a ticker.""" data = _get("/news-sentiment", params={"symbol": ticker.upper()}) if data and isinstance(data, dict): return data return {}