summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--backend/app/services/data_service.py57
-rw-r--r--backend/tests/test_api.py57
2 files changed, 114 insertions, 0 deletions
diff --git a/backend/app/services/data_service.py b/backend/app/services/data_service.py
index f7188f7..9a88005 100644
--- a/backend/app/services/data_service.py
+++ b/backend/app/services/data_service.py
@@ -27,6 +27,7 @@ RATIO_CACHE = TTLCache(maxsize=256, ttl=3600)
BETA_CACHE = TTLCache(maxsize=256, ttl=3600)
SHORT_CACHE = TTLCache(maxsize=256, ttl=3600)
FINANCIALS_CACHE = TTLCache(maxsize=128, ttl=3600)
+VALUATION_CACHE = TTLCache(maxsize=128, ttl=3600)
PERIODS = {"1m", "3m", "6m", "1y", "2y", "5y"}
YF_PERIOD_MAP = {"1m": "1mo", "3m": "3mo", "6m": "6mo", "1y": "1y", "2y": "2y", "5y": "5y"}
@@ -265,6 +266,62 @@ def _build_cash_flow(cf: pd.DataFrame, cf_q: pd.DataFrame, inc: pd.DataFrame, in
}
+_GROWTH_FLOOR = -0.50
+_GROWTH_CAP = 0.50
+_GROWTH_MIN_BASE = 1e-9
+
+
+def _cap_growth(value: float) -> float:
+ return max(_GROWTH_FLOOR, min(_GROWTH_CAP, float(value)))
+
+
+def _dcf_capped_growth_rate(fcf_series: "pd.Series") -> float | None:
+ historical = fcf_series.sort_index().dropna().astype(float).values
+ if len(historical) < 2:
+ return None
+ rates = []
+ for i in range(1, len(historical)):
+ prev, curr = float(historical[i - 1]), float(historical[i])
+ if abs(prev) < _GROWTH_MIN_BASE:
+ continue
+ if prev <= 0 or curr <= 0:
+ continue
+ rates.append((curr - prev) / prev)
+ if not rates:
+ return None
+ raw = float(pd.Series(rates).median())
+ return _cap_growth(raw)
+
+
+def _build_fcf_series(cf_annual: "pd.DataFrame") -> "pd.Series | None":
+ if cf_annual is None or cf_annual.empty:
+ return None
+ op_labels = ("Operating Cash Flow", "Cash Flow From Continuing Operating Activities")
+ op_row = None
+ for label in op_labels:
+ if label in cf_annual.index:
+ op_row = pd.to_numeric(cf_annual.loc[label], errors="coerce")
+ break
+ if op_row is None or "Capital Expenditure" not in cf_annual.index:
+ return None
+ capex_row = pd.to_numeric(cf_annual.loc["Capital Expenditure"], errors="coerce")
+ fcf = (op_row + capex_row).dropna().sort_index()
+ return fcf if not fcf.empty else None
+
+
+def _build_multiple_result(raw: dict) -> dict:
+ if not raw:
+ return {"available": False}
+ return {
+ "available": True,
+ "implied_price_per_share": raw.get("implied_price_per_share"),
+ "implied_ev": raw.get("implied_ev"),
+ "equity_value": raw.get("equity_value"),
+ "net_debt": raw.get("net_debt"),
+ "multiple_used": raw.get("target_multiple_used"),
+ }
+
+
@cached(FINANCIALS_CACHE)
def get_financials(symbol: str, period: str = "annual") -> dict:
sym = normalize_symbol(symbol)
diff --git a/backend/tests/test_api.py b/backend/tests/test_api.py
index 395bc12..a5c4eb5 100644
--- a/backend/tests/test_api.py
+++ b/backend/tests/test_api.py
@@ -18,6 +18,7 @@ def clear_service_caches() -> None:
data_service.SHARES_CACHE.clear()
data_service.RATIO_CACHE.clear()
data_service.FINANCIALS_CACHE.clear()
+ data_service.VALUATION_CACHE.clear()
def quarterly_frame(rows: dict[str, list[float]]) -> pd.DataFrame:
@@ -627,3 +628,59 @@ def test_valuation_schema_structure() -> None:
assert resp.dcf.intrinsic_value_per_share == 182.0
assert resp.ev_ebitda.multiple_used == 20.0
assert resp.ev_revenue.available is False
+
+
+def test_build_fcf_series_happy_path() -> None:
+ cf = annual_frame({
+ "Operating Cash Flow": [100.0, 90.0, 80.0, 70.0],
+ "Capital Expenditure": [-10.0, -9.0, -8.0, -7.0],
+ })
+ result = data_service._build_fcf_series(cf)
+ assert result is not None
+ assert len(result) == 4
+ # most recent year FCF = 100 + (-10) = 90
+ assert result.iloc[-1] == 90.0
+
+
+def test_build_fcf_series_empty_df() -> None:
+ result = data_service._build_fcf_series(pd.DataFrame())
+ assert result is None
+
+
+def test_build_fcf_series_missing_capex() -> None:
+ cf = annual_frame({"Operating Cash Flow": [100.0, 90.0, 80.0, 70.0]})
+ result = data_service._build_fcf_series(cf)
+ assert result is None
+
+
+def test_build_multiple_result_empty() -> None:
+ result = data_service._build_multiple_result({})
+ assert result == {"available": False}
+
+
+def test_build_multiple_result_valid() -> None:
+ raw = {
+ "implied_price_per_share": 178.0,
+ "implied_ev": 1_000.0,
+ "equity_value": 900.0,
+ "net_debt": 100.0,
+ "target_multiple_used": 20.0,
+ }
+ result = data_service._build_multiple_result(raw)
+ assert result["available"] is True
+ assert result["implied_price_per_share"] == 178.0
+ assert result["multiple_used"] == 20.0
+
+
+def test_dcf_capped_growth_rate_caps_extremes() -> None:
+ # growth of 200% should be capped at 50%
+ series = pd.Series([10.0, 30.0], index=pd.to_datetime(["2022", "2023"]))
+ result = data_service._dcf_capped_growth_rate(series)
+ assert result == 0.50
+
+
+def test_dcf_capped_growth_rate_skips_sign_flip() -> None:
+ # negative to positive is a sign flip — should skip and return None (no usable periods)
+ series = pd.Series([-10.0, 20.0], index=pd.to_datetime(["2022", "2023"]))
+ result = data_service._dcf_capped_growth_rate(series)
+ assert result is None