blob: b060c54e0fcd19a52b521b7bb8da621c687f7b55 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
"""Finnhub news service — company news with sentiment."""
import os
import time
import requests
import streamlit as st
from dotenv import load_dotenv
load_dotenv()
BASE_URL = "https://finnhub.io/api/v1"
def _api_key() -> str:
    """Look up the Finnhub API token in the process environment.

    Returns:
        The value of the ``FINNHUB_API_KEY`` environment variable, or an
        empty string when the variable is not set.
    """
    return os.environ.get("FINNHUB_API_KEY", "")
def _get(endpoint: str, params: dict | None = None) -> dict | list | None:
    """Issue a GET request against the Finnhub API and decode the JSON reply.

    Args:
        endpoint: Path appended to ``BASE_URL`` (e.g. ``"/company-news"``).
        params: Optional query parameters. The mapping is copied before the
            API token is added, so the caller's dict is never modified.

    Returns:
        The decoded JSON body, or ``None`` when the request fails, the
        server answers with an error status, or the body is not valid JSON.
    """
    # Build a fresh dict instead of writing into ``params``: adding the token
    # in place would leak the credential into the caller's own dict.
    query = {**(params or {}), "token": _api_key()}
    try:
        resp = requests.get(f"{BASE_URL}{endpoint}", params=query, timeout=10)
        resp.raise_for_status()
        return resp.json()
    except (requests.RequestException, ValueError):
        # Best-effort fetch: transport errors, HTTP error statuses and
        # undecodable bodies all collapse to None. Anything else (e.g. a
        # programming error) is allowed to surface.
        return None
@st.cache_data(ttl=600)
def get_company_news(ticker: str, days_back: int = 7) -> list[dict]:
    """Return recent company-news items for a ticker.

    Queries the ``/company-news`` endpoint for the date window that ends on
    the current UTC date and starts ``days_back`` days earlier. Results are
    memoized through ``st.cache_data(ttl=600)``.

    Args:
        ticker: Stock symbol; it is upper-cased before being sent.
        days_back: Size of the look-back window in days.

    Returns:
        The first 30 entries of the response list at most, or an empty list
        when the request fails or the response is empty or not a list.
    """
    # NOTE(review): the previous docstring said "with sentiment"; the
    # response is returned unmodified here, so confirm against the Finnhub
    # API whether the items actually carry sentiment fields.
    from datetime import datetime, timezone

    end_ts = int(time.time())
    start_ts = end_ts - days_back * 86400  # 86400 seconds per day

    # datetime.utcfromtimestamp() is deprecated since Python 3.12; an aware
    # UTC datetime formats to exactly the same calendar date.
    date_from = datetime.fromtimestamp(start_ts, tz=timezone.utc).strftime("%Y-%m-%d")
    date_to = datetime.fromtimestamp(end_ts, tz=timezone.utc).strftime("%Y-%m-%d")

    data = _get(
        "/company-news",
        params={
            "symbol": ticker.upper(),
            "from": date_from,
            "to": date_to,
        },
    )
    if not data or not isinstance(data, list):
        return []
    return data[:30]
@st.cache_data(ttl=600)
def get_news_sentiment(ticker: str) -> dict:
    """Fetch the ``/news-sentiment`` payload for a ticker.

    The symbol is upper-cased before the request is made, and the result is
    memoized through ``st.cache_data(ttl=600)``.

    Returns:
        The response dict as received, or an empty dict when the lookup
        yields nothing or something other than a non-empty dict.
    """
    symbol = ticker.upper()
    payload = _get("/news-sentiment", params={"symbol": symbol})
    # Guard clause: anything that is not a populated dict maps to {}.
    if not isinstance(payload, dict) or not payload:
        return {}
    return payload
|