Files
crypto_trader/backend/api/market_data.py

281 lines
10 KiB
Python
Raw Normal View History

"""Market Data API endpoints."""
from typing import List, Optional, Dict, Any
from datetime import datetime, timedelta
from fastapi import APIRouter, HTTPException, Query, Body
from pydantic import BaseModel
import pandas as pd
from src.core.database import MarketData, get_database
from src.data.pricing_service import get_pricing_service
from src.core.config import get_config
router = APIRouter()
@router.get("/ohlcv/{symbol:path}")
async def get_ohlcv(
symbol: str,
timeframe: str = "1h",
limit: int = 100,
exchange: str = "coinbase" # Default exchange
):
"""Get OHLCV data for a symbol."""
from sqlalchemy import select
try:
# Try database first
try:
db = get_database()
async with db.get_session() as session:
# Use select() for async compatibility
stmt = select(MarketData).filter_by(
symbol=symbol,
timeframe=timeframe,
exchange=exchange
).order_by(MarketData.timestamp.desc()).limit(limit)
result = await session.execute(stmt)
data = result.scalars().all()
if data:
return [
{
"time": int(d.timestamp.timestamp()),
"open": float(d.open),
"high": float(d.high),
"low": float(d.low),
"close": float(d.close),
"volume": float(d.volume)
}
for d in reversed(data)
]
except Exception as db_error:
import sys
print(f"Database query failed, falling back to live data: {db_error}", file=sys.stderr)
# If no data in DB or DB error, fetch live from pricing service
try:
pricing_service = get_pricing_service()
# pricing_service.get_ohlcv is currently sync in its implementation but we call it from our async handler
ohlcv_data = pricing_service.get_ohlcv(
symbol=symbol,
timeframe=timeframe,
limit=limit
)
if ohlcv_data:
# Convert to frontend format: [timestamp, open, high, low, close, volume] -> {time, open, high, low, close, volume}
return [
{
"time": int(candle[0] / 1000), # Convert ms to seconds
"open": float(candle[1]),
"high": float(candle[2]),
"low": float(candle[3]),
"close": float(candle[4]),
"volume": float(candle[5])
}
for candle in ohlcv_data
]
except Exception as fetch_error:
import sys
print(f"Failed to fetch live data: {fetch_error}", file=sys.stderr)
# If all else fails, return empty list
return []
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/ticker/{symbol:path}")
async def get_ticker(symbol: str):
"""Get current ticker data for a symbol.
Returns ticker data with provider information.
"""
try:
pricing_service = get_pricing_service()
ticker_data = pricing_service.get_ticker(symbol)
if not ticker_data:
raise HTTPException(status_code=404, detail=f"Ticker data not available for {symbol}")
active_provider = pricing_service.get_active_provider()
return {
"symbol": symbol,
"bid": float(ticker_data.get('bid', 0)),
"ask": float(ticker_data.get('ask', 0)),
"last": float(ticker_data.get('last', 0)),
"high": float(ticker_data.get('high', 0)),
"low": float(ticker_data.get('low', 0)),
"volume": float(ticker_data.get('volume', 0)),
"timestamp": ticker_data.get('timestamp'),
"provider": active_provider,
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/providers/health")
async def get_provider_health(provider: Optional[str] = Query(None, description="Specific provider name")):
"""Get health status for pricing providers.
Args:
provider: Optional provider name to get health for specific provider
"""
try:
pricing_service = get_pricing_service()
health_data = pricing_service.get_provider_health(provider)
return {
"active_provider": pricing_service.get_active_provider(),
"health": health_data,
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/providers/status")
async def get_provider_status():
"""Get detailed status for all pricing providers."""
try:
pricing_service = get_pricing_service()
health_data = pricing_service.get_provider_health()
cache_stats = pricing_service.get_cache_stats()
return {
"active_provider": pricing_service.get_active_provider(),
"providers": health_data,
"cache": cache_stats,
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/providers/config")
async def get_provider_config():
"""Get provider configuration."""
try:
config = get_config()
provider_config = config.get("data_providers", {})
return provider_config
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
class ProviderConfigUpdate(BaseModel):
"""Provider configuration update model."""
primary: Optional[List[Dict[str, Any]]] = None
fallback: Optional[Dict[str, Any]] = None
caching: Optional[Dict[str, Any]] = None
websocket: Optional[Dict[str, Any]] = None
@router.put("/providers/config")
async def update_provider_config(config_update: ProviderConfigUpdate = Body(...)):
"""Update provider configuration."""
try:
config = get_config()
current_config = config.get("data_providers", {})
# Update configuration
if config_update.primary is not None:
current_config["primary"] = config_update.primary
if config_update.fallback is not None:
current_config["fallback"] = {**current_config.get("fallback", {}), **config_update.fallback}
if config_update.caching is not None:
current_config["caching"] = {**current_config.get("caching", {}), **config_update.caching}
if config_update.websocket is not None:
current_config["websocket"] = {**current_config.get("websocket", {}), **config_update.websocket}
# Save configuration
config.set("data_providers", current_config)
config.save()
return {"message": "Configuration updated successfully", "config": current_config}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/spread")
async def get_spread_data(
primary_symbol: str = Query(..., description="Primary symbol (e.g., SOL/USD)"),
secondary_symbol: str = Query(..., description="Secondary symbol (e.g., AVAX/USD)"),
timeframe: str = Query("1h", description="Timeframe"),
lookback: int = Query(50, description="Number of candles to fetch"),
):
"""Get spread and Z-Score data for pairs trading visualization.
Returns spread ratio and Z-Score time series for the given symbol pair.
"""
try:
pricing_service = get_pricing_service()
# Fetch OHLCV for both symbols
ohlcv_a = pricing_service.get_ohlcv(
symbol=primary_symbol,
timeframe=timeframe,
limit=lookback
)
ohlcv_b = pricing_service.get_ohlcv(
symbol=secondary_symbol,
timeframe=timeframe,
limit=lookback
)
if not ohlcv_a or not ohlcv_b:
raise HTTPException(status_code=404, detail="Could not fetch data for one or both symbols")
# Convert to DataFrames
df_a = pd.DataFrame(ohlcv_a, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df_b = pd.DataFrame(ohlcv_b, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
# Align by length
min_len = min(len(df_a), len(df_b))
df_a = df_a.tail(min_len).reset_index(drop=True)
df_b = df_b.tail(min_len).reset_index(drop=True)
# Calculate spread (ratio)
closes_a = df_a['close'].astype(float)
closes_b = df_b['close'].astype(float)
spread = closes_a / closes_b
# Calculate Z-Score with rolling window
lookback_window = min(20, min_len - 1)
rolling_mean = spread.rolling(window=lookback_window).mean()
rolling_std = spread.rolling(window=lookback_window).std()
z_score = (spread - rolling_mean) / rolling_std
# Build response
result = []
for i in range(min_len):
result.append({
"timestamp": int(df_a['timestamp'].iloc[i]),
"spread": float(spread.iloc[i]) if not pd.isna(spread.iloc[i]) else None,
"zScore": float(z_score.iloc[i]) if not pd.isna(z_score.iloc[i]) else None,
"priceA": float(closes_a.iloc[i]),
"priceB": float(closes_b.iloc[i]),
})
# Filter out entries with null Z-Score (during warmup period)
result = [r for r in result if r["zScore"] is not None]
return {
"primarySymbol": primary_symbol,
"secondarySymbol": secondary_symbol,
"timeframe": timeframe,
"lookbackWindow": lookback_window,
"data": result,
"currentSpread": result[-1]["spread"] if result else None,
"currentZScore": result[-1]["zScore"] if result else None,
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))