"""Market Data API endpoints.""" from typing import List, Optional, Dict, Any from datetime import datetime, timedelta from fastapi import APIRouter, HTTPException, Query, Body from pydantic import BaseModel import pandas as pd from src.core.database import MarketData, get_database from src.data.pricing_service import get_pricing_service from src.data.indicators import get_indicators from src.core.config import get_config router = APIRouter() @router.get("/ohlcv/{symbol:path}") async def get_ohlcv( symbol: str, timeframe: str = "1h", limit: int = 100, exchange: str = "coinbase" # Default exchange ): """Get OHLCV data for a symbol.""" from sqlalchemy import select try: # Try database first try: db = get_database() async with db.get_session() as session: # Use select() for async compatibility stmt = select(MarketData).filter_by( symbol=symbol, timeframe=timeframe, exchange=exchange ).order_by(MarketData.timestamp.desc()).limit(limit) result = await session.execute(stmt) data = result.scalars().all() if data: return [ { "time": int(d.timestamp.timestamp()), "open": float(d.open), "high": float(d.high), "low": float(d.low), "close": float(d.close), "volume": float(d.volume) } for d in reversed(data) ] except Exception as db_error: import sys print(f"Database query failed, falling back to live data: {db_error}", file=sys.stderr) # If no data in DB or DB error, fetch live from pricing service try: pricing_service = get_pricing_service() # pricing_service.get_ohlcv is currently sync in its implementation but we call it from our async handler ohlcv_data = pricing_service.get_ohlcv( symbol=symbol, timeframe=timeframe, limit=limit ) if ohlcv_data: # Convert to frontend format: [timestamp, open, high, low, close, volume] -> {time, open, high, low, close, volume} return [ { "time": int(candle[0] / 1000), # Convert ms to seconds "open": float(candle[1]), "high": float(candle[2]), "low": float(candle[3]), "close": float(candle[4]), "volume": float(candle[5]) } for candle in ohlcv_data ] except Exception as fetch_error: import sys print(f"Failed to fetch live data: {fetch_error}", file=sys.stderr) # If all else fails, return empty list return [] except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/ticker/{symbol:path}") async def get_ticker(symbol: str): """Get current ticker data for a symbol. Returns ticker data with provider information. """ try: pricing_service = get_pricing_service() ticker_data = pricing_service.get_ticker(symbol) if not ticker_data: raise HTTPException(status_code=404, detail=f"Ticker data not available for {symbol}") active_provider = pricing_service.get_active_provider() return { "symbol": symbol, "bid": float(ticker_data.get('bid', 0)), "ask": float(ticker_data.get('ask', 0)), "last": float(ticker_data.get('last', 0)), "high": float(ticker_data.get('high', 0)), "low": float(ticker_data.get('low', 0)), "volume": float(ticker_data.get('volume', 0)), "timestamp": ticker_data.get('timestamp'), "provider": active_provider, } except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/providers/health") async def get_provider_health(provider: Optional[str] = Query(None, description="Specific provider name")): """Get health status for pricing providers. Args: provider: Optional provider name to get health for specific provider """ try: pricing_service = get_pricing_service() health_data = pricing_service.get_provider_health(provider) return { "active_provider": pricing_service.get_active_provider(), "health": health_data, } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/providers/status") async def get_provider_status(): """Get detailed status for all pricing providers.""" try: pricing_service = get_pricing_service() health_data = pricing_service.get_provider_health() cache_stats = pricing_service.get_cache_stats() return { "active_provider": pricing_service.get_active_provider(), "providers": health_data, "cache": cache_stats, } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/providers/config") async def get_provider_config(): """Get provider configuration.""" try: config = get_config() provider_config = config.get("data_providers", {}) return provider_config except Exception as e: raise HTTPException(status_code=500, detail=str(e)) class ProviderConfigUpdate(BaseModel): """Provider configuration update model.""" primary: Optional[List[Dict[str, Any]]] = None fallback: Optional[Dict[str, Any]] = None caching: Optional[Dict[str, Any]] = None websocket: Optional[Dict[str, Any]] = None @router.put("/providers/config") async def update_provider_config(config_update: ProviderConfigUpdate = Body(...)): """Update provider configuration.""" try: config = get_config() current_config = config.get("data_providers", {}) # Update configuration if config_update.primary is not None: current_config["primary"] = config_update.primary if config_update.fallback is not None: current_config["fallback"] = {**current_config.get("fallback", {}), **config_update.fallback} if config_update.caching is not None: current_config["caching"] = {**current_config.get("caching", {}), **config_update.caching} if config_update.websocket is not None: current_config["websocket"] = {**current_config.get("websocket", {}), **config_update.websocket} # Save configuration config.set("data_providers", current_config) config.save() return {"message": "Configuration updated successfully", "config": current_config} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/spread") async def get_spread_data( primary_symbol: str = Query(..., description="Primary symbol (e.g., SOL/USD)"), secondary_symbol: str = Query(..., description="Secondary symbol (e.g., AVAX/USD)"), timeframe: str = Query("1h", description="Timeframe"), lookback: int = Query(50, description="Number of candles to fetch"), ): """Get spread and Z-Score data for pairs trading visualization. Returns spread ratio and Z-Score time series for the given symbol pair. """ try: pricing_service = get_pricing_service() # Fetch OHLCV for both symbols ohlcv_a = pricing_service.get_ohlcv( symbol=primary_symbol, timeframe=timeframe, limit=lookback ) ohlcv_b = pricing_service.get_ohlcv( symbol=secondary_symbol, timeframe=timeframe, limit=lookback ) if not ohlcv_a or not ohlcv_b: raise HTTPException(status_code=404, detail="Could not fetch data for one or both symbols") # Convert to DataFrames df_a = pd.DataFrame(ohlcv_a, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) df_b = pd.DataFrame(ohlcv_b, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) # Align by length min_len = min(len(df_a), len(df_b)) df_a = df_a.tail(min_len).reset_index(drop=True) df_b = df_b.tail(min_len).reset_index(drop=True) # Calculate spread (ratio) closes_a = df_a['close'].astype(float) closes_b = df_b['close'].astype(float) spread = closes_a / closes_b # Calculate Z-Score with rolling window lookback_window = min(20, min_len - 1) rolling_mean = spread.rolling(window=lookback_window).mean() rolling_std = spread.rolling(window=lookback_window).std() z_score = (spread - rolling_mean) / rolling_std # Build response result = [] for i in range(min_len): result.append({ "timestamp": int(df_a['timestamp'].iloc[i]), "spread": float(spread.iloc[i]) if not pd.isna(spread.iloc[i]) else None, "zScore": float(z_score.iloc[i]) if not pd.isna(z_score.iloc[i]) else None, "priceA": float(closes_a.iloc[i]), "priceB": float(closes_b.iloc[i]), }) # Filter out entries with null Z-Score (during warmup period) result = [r for r in result if r["zScore"] is not None] return { "primarySymbol": primary_symbol, "secondarySymbol": secondary_symbol, "timeframe": timeframe, "lookbackWindow": lookback_window, "data": result, "currentSpread": result[-1]["spread"] if result else None, "currentZScore": result[-1]["zScore"] if result else None, } except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/indicators/{symbol:path}") async def get_indicators_data( symbol: str, timeframe: str = "1h", limit: int = 100, exchange: str = "coinbase", indicators: str = Query("", description="Comma-separated list of indicators (e.g., 'sma_20,ema_20,rsi,macd,bollinger_bands')") ): """Get OHLCV data with technical indicators for a symbol. Supported indicators: - sma_: Simple Moving Average (e.g., sma_20, sma_50) - ema_: Exponential Moving Average (e.g., ema_20, ema_50) - rsi: Relative Strength Index - macd: MACD (returns macd, signal, histogram) - bollinger_bands: Bollinger Bands (returns upper, middle, lower) - atr: Average True Range - obv: On Balance Volume - adx: Average Directional Index """ from sqlalchemy import select try: # Fetch OHLCV data first (reuse existing logic) ohlcv_data = [] try: db = get_database() async with db.get_session() as session: stmt = select(MarketData).filter_by( symbol=symbol, timeframe=timeframe, exchange=exchange ).order_by(MarketData.timestamp.desc()).limit(limit) result = await session.execute(stmt) data = result.scalars().all() if data: ohlcv_data = [ { "time": int(d.timestamp.timestamp()), "open": float(d.open), "high": float(d.high), "low": float(d.low), "close": float(d.close), "volume": float(d.volume) } for d in reversed(data) ] except Exception: pass # If no DB data, fetch from pricing service if not ohlcv_data: try: pricing_service = get_pricing_service() ohlcv_raw = pricing_service.get_ohlcv( symbol=symbol, timeframe=timeframe, limit=limit ) if ohlcv_raw: ohlcv_data = [ { "time": int(candle[0] / 1000), "open": float(candle[1]), "high": float(candle[2]), "low": float(candle[3]), "close": float(candle[4]), "volume": float(candle[5]) } for candle in ohlcv_raw ] except Exception: pass if not ohlcv_data: return {"ohlcv": [], "indicators": {}} # Convert to DataFrame for indicator calculation df = pd.DataFrame(ohlcv_data) df.set_index('time', inplace=True) # Prepare DataFrame for indicators (needs columns: open, high, low, close, volume) df_ind = pd.DataFrame({ 'open': df['open'], 'high': df['high'], 'low': df['low'], 'close': df['close'], 'volume': df['volume'] }) # Parse indicator list indicator_list = [ind.strip() for ind in indicators.split(',') if ind.strip()] if indicators else [] # Calculate indicators indicators_calc = get_indicators() if indicator_list: df_with_indicators = indicators_calc.calculate_all(df_ind, indicators=indicator_list) else: # Default indicators if none specified df_with_indicators = indicators_calc.calculate_all(df_ind) # Build response result_ohlcv = df.reset_index().to_dict('records') # Extract indicator data indicator_data = {} for col in df_with_indicators.columns: if col not in ['open', 'high', 'low', 'close', 'volume']: # Convert NaN to None for JSON serialization values = df_with_indicators[col].replace({pd.NA: None, pd.NaT: None}).tolist() indicator_data[col] = [ float(v) if v is not None and not pd.isna(v) else None for v in values ] return { "ohlcv": result_ohlcv, "indicators": indicator_data, "times": [int(t) for t in df.index.tolist()] } except Exception as e: raise HTTPException(status_code=500, detail=str(e))