"""Market Data API endpoints.""" from typing import List, Optional, Dict, Any from datetime import datetime, timedelta from fastapi import APIRouter, HTTPException, Query, Body from pydantic import BaseModel import pandas as pd from src.core.database import MarketData, get_database from src.data.pricing_service import get_pricing_service from src.core.config import get_config router = APIRouter() @router.get("/ohlcv/{symbol:path}") async def get_ohlcv( symbol: str, timeframe: str = "1h", limit: int = 100, exchange: str = "coinbase" # Default exchange ): """Get OHLCV data for a symbol.""" from sqlalchemy import select try: # Try database first try: db = get_database() async with db.get_session() as session: # Use select() for async compatibility stmt = select(MarketData).filter_by( symbol=symbol, timeframe=timeframe, exchange=exchange ).order_by(MarketData.timestamp.desc()).limit(limit) result = await session.execute(stmt) data = result.scalars().all() if data: return [ { "time": int(d.timestamp.timestamp()), "open": float(d.open), "high": float(d.high), "low": float(d.low), "close": float(d.close), "volume": float(d.volume) } for d in reversed(data) ] except Exception as db_error: import sys print(f"Database query failed, falling back to live data: {db_error}", file=sys.stderr) # If no data in DB or DB error, fetch live from pricing service try: pricing_service = get_pricing_service() # pricing_service.get_ohlcv is currently sync in its implementation but we call it from our async handler ohlcv_data = pricing_service.get_ohlcv( symbol=symbol, timeframe=timeframe, limit=limit ) if ohlcv_data: # Convert to frontend format: [timestamp, open, high, low, close, volume] -> {time, open, high, low, close, volume} return [ { "time": int(candle[0] / 1000), # Convert ms to seconds "open": float(candle[1]), "high": float(candle[2]), "low": float(candle[3]), "close": float(candle[4]), "volume": float(candle[5]) } for candle in ohlcv_data ] except Exception as fetch_error: import sys print(f"Failed to fetch live data: {fetch_error}", file=sys.stderr) # If all else fails, return empty list return [] except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/ticker/{symbol:path}") async def get_ticker(symbol: str): """Get current ticker data for a symbol. Returns ticker data with provider information. """ try: pricing_service = get_pricing_service() ticker_data = pricing_service.get_ticker(symbol) if not ticker_data: raise HTTPException(status_code=404, detail=f"Ticker data not available for {symbol}") active_provider = pricing_service.get_active_provider() return { "symbol": symbol, "bid": float(ticker_data.get('bid', 0)), "ask": float(ticker_data.get('ask', 0)), "last": float(ticker_data.get('last', 0)), "high": float(ticker_data.get('high', 0)), "low": float(ticker_data.get('low', 0)), "volume": float(ticker_data.get('volume', 0)), "timestamp": ticker_data.get('timestamp'), "provider": active_provider, } except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/providers/health") async def get_provider_health(provider: Optional[str] = Query(None, description="Specific provider name")): """Get health status for pricing providers. Args: provider: Optional provider name to get health for specific provider """ try: pricing_service = get_pricing_service() health_data = pricing_service.get_provider_health(provider) return { "active_provider": pricing_service.get_active_provider(), "health": health_data, } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/providers/status") async def get_provider_status(): """Get detailed status for all pricing providers.""" try: pricing_service = get_pricing_service() health_data = pricing_service.get_provider_health() cache_stats = pricing_service.get_cache_stats() return { "active_provider": pricing_service.get_active_provider(), "providers": health_data, "cache": cache_stats, } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/providers/config") async def get_provider_config(): """Get provider configuration.""" try: config = get_config() provider_config = config.get("data_providers", {}) return provider_config except Exception as e: raise HTTPException(status_code=500, detail=str(e)) class ProviderConfigUpdate(BaseModel): """Provider configuration update model.""" primary: Optional[List[Dict[str, Any]]] = None fallback: Optional[Dict[str, Any]] = None caching: Optional[Dict[str, Any]] = None websocket: Optional[Dict[str, Any]] = None @router.put("/providers/config") async def update_provider_config(config_update: ProviderConfigUpdate = Body(...)): """Update provider configuration.""" try: config = get_config() current_config = config.get("data_providers", {}) # Update configuration if config_update.primary is not None: current_config["primary"] = config_update.primary if config_update.fallback is not None: current_config["fallback"] = {**current_config.get("fallback", {}), **config_update.fallback} if config_update.caching is not None: current_config["caching"] = {**current_config.get("caching", {}), **config_update.caching} if config_update.websocket is not None: current_config["websocket"] = {**current_config.get("websocket", {}), **config_update.websocket} # Save configuration config.set("data_providers", current_config) config.save() return {"message": "Configuration updated successfully", "config": current_config} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/spread") async def get_spread_data( primary_symbol: str = Query(..., description="Primary symbol (e.g., SOL/USD)"), secondary_symbol: str = Query(..., description="Secondary symbol (e.g., AVAX/USD)"), timeframe: str = Query("1h", description="Timeframe"), lookback: int = Query(50, description="Number of candles to fetch"), ): """Get spread and Z-Score data for pairs trading visualization. Returns spread ratio and Z-Score time series for the given symbol pair. """ try: pricing_service = get_pricing_service() # Fetch OHLCV for both symbols ohlcv_a = pricing_service.get_ohlcv( symbol=primary_symbol, timeframe=timeframe, limit=lookback ) ohlcv_b = pricing_service.get_ohlcv( symbol=secondary_symbol, timeframe=timeframe, limit=lookback ) if not ohlcv_a or not ohlcv_b: raise HTTPException(status_code=404, detail="Could not fetch data for one or both symbols") # Convert to DataFrames df_a = pd.DataFrame(ohlcv_a, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) df_b = pd.DataFrame(ohlcv_b, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) # Align by length min_len = min(len(df_a), len(df_b)) df_a = df_a.tail(min_len).reset_index(drop=True) df_b = df_b.tail(min_len).reset_index(drop=True) # Calculate spread (ratio) closes_a = df_a['close'].astype(float) closes_b = df_b['close'].astype(float) spread = closes_a / closes_b # Calculate Z-Score with rolling window lookback_window = min(20, min_len - 1) rolling_mean = spread.rolling(window=lookback_window).mean() rolling_std = spread.rolling(window=lookback_window).std() z_score = (spread - rolling_mean) / rolling_std # Build response result = [] for i in range(min_len): result.append({ "timestamp": int(df_a['timestamp'].iloc[i]), "spread": float(spread.iloc[i]) if not pd.isna(spread.iloc[i]) else None, "zScore": float(z_score.iloc[i]) if not pd.isna(z_score.iloc[i]) else None, "priceA": float(closes_a.iloc[i]), "priceB": float(closes_b.iloc[i]), }) # Filter out entries with null Z-Score (during warmup period) result = [r for r in result if r["zScore"] is not None] return { "primarySymbol": primary_symbol, "secondarySymbol": secondary_symbol, "timeframe": timeframe, "lookbackWindow": lookback_window, "data": result, "currentSpread": result[-1]["spread"] if result else None, "currentZScore": result[-1]["zScore"] if result else None, } except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=str(e))