Files

145 lines
4.3 KiB
Python
Raw Permalink Normal View History

"""Alerts API endpoints."""
from typing import List, Optional
from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel
from datetime import datetime
from sqlalchemy import select
from src.core.database import Alert, get_database
router = APIRouter()
def get_alert_manager():
    """Return the alert manager used by the alert endpoints.

    Delegates to ``get_alert_manager`` from ``src.alerts.manager``. The
    import happens at call time, inside the function, rather than when this
    module is imported (presumably to avoid a circular import -- confirm).
    """
    from src.alerts.manager import get_alert_manager as _resolve_manager

    manager = _resolve_manager()
    return manager
class AlertCreate(BaseModel):
    """Request body accepted when creating an alert.

    All three fields are required; none declares a default.
    """

    # Name given to the alert.
    name: str
    # Kind of alert. The original note lists: price, indicator, risk, system.
    # Declared as a plain ``str``, so this model does not restrict the value.
    alert_type: str
    # Condition payload as a dict; its keys are not described by this model.
    condition: dict
class AlertUpdate(BaseModel):
    """Request body for modifying an existing alert.

    Every field is optional and defaults to ``None``, so a request may carry
    any subset of them.
    """

    # Replacement name, or None when not supplied.
    name: Optional[str] = None
    # Replacement condition dict, or None when not supplied.
    condition: Optional[dict] = None
    # New enabled flag, or None when not supplied.
    enabled: Optional[bool] = None
class AlertResponse(BaseModel):
    """Alert representation returned by the alert endpoints.

    Instances are built with ``AlertResponse.model_validate(obj)``, where
    ``obj`` exposes the fields below as attributes.
    """

    # Pydantic v2 configuration. ``from_attributes`` lets ``model_validate``
    # read field values from object attributes instead of requiring a dict.
    # Declared through ``model_config`` because the nested ``class Config``
    # form is deprecated in Pydantic v2, which this module already relies on
    # (``model_validate``). A plain dict is accepted here, so no extra import
    # is required.
    model_config = {"from_attributes": True}

    # Identifier of the alert.
    id: int
    # Name given to the alert.
    name: str
    # Kind of alert, as a free-form string.
    alert_type: str
    # Condition payload as a dict.
    condition: dict
    # Whether the alert is enabled.
    enabled: bool
    # Whether the alert has triggered.
    triggered: bool
    # Trigger timestamp; optional, defaults to None.
    triggered_at: Optional[datetime] = None
    # Creation and last-update timestamps.
    created_at: datetime
    updated_at: datetime
@router.get("/", response_model=List[AlertResponse])
async def list_alerts(
    enabled_only: bool = False,
    manager=Depends(get_alert_manager),
):
    """List alerts provided by the alert manager.

    Args:
        enabled_only: Forwarded unchanged to ``manager.list_alerts``;
            defaults to ``False``.
        manager: Alert manager injected through ``get_alert_manager``.

    Returns:
        A list with one ``AlertResponse`` per alert the manager returned.

    Raises:
        HTTPException: Re-raised unchanged if one is raised while listing.
            Any other exception becomes a 500 whose detail is the exception
            text.
    """
    try:
        alerts = await manager.list_alerts(enabled_only=enabled_only)
        return [AlertResponse.model_validate(alert) for alert in alerts]
    except HTTPException:
        # Keep deliberate HTTP errors intact instead of masking them as 500,
        # matching the get/update/delete endpoints.
        raise
    except Exception as e:
        # NOTE(review): str(e) is sent to the client as the error detail;
        # confirm that exposing internal messages is acceptable.
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.post("/", response_model=AlertResponse)
async def create_alert(
    alert_data: AlertCreate,
    manager=Depends(get_alert_manager),
):
    """Create a new alert through the alert manager.

    Args:
        alert_data: Request body carrying ``name``, ``alert_type`` and
            ``condition``, each passed to ``manager.create_alert``.
        manager: Alert manager injected through ``get_alert_manager``.

    Returns:
        An ``AlertResponse`` built from the object the manager returned.

    Raises:
        HTTPException: Re-raised unchanged if one is raised during creation.
            Any other exception becomes a 500 whose detail is the exception
            text.
    """
    try:
        alert = await manager.create_alert(
            name=alert_data.name,
            alert_type=alert_data.alert_type,
            condition=alert_data.condition,
        )
        return AlertResponse.model_validate(alert)
    except HTTPException:
        # Keep deliberate HTTP errors intact instead of masking them as 500,
        # matching the get/update/delete endpoints.
        raise
    except Exception as e:
        # NOTE(review): str(e) is sent to the client as the error detail;
        # confirm that exposing internal messages is acceptable.
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/{alert_id}", response_model=AlertResponse)
async def get_alert(alert_id: int):
    """Get a single alert by its ``id``.

    Args:
        alert_id: Value matched against ``Alert.id``.

    Returns:
        An ``AlertResponse`` built from the matching ``Alert`` row.

    Raises:
        HTTPException: 404 with detail "Alert not found" when no row matches.
            Any other failure becomes a 500 whose detail is the exception
            text.
    """
    try:
        db = get_database()
        async with db.get_session() as session:
            stmt = select(Alert).where(Alert.id == alert_id)
            result = await session.execute(stmt)
            alert = result.scalar_one_or_none()
            # scalar_one_or_none() yields None when nothing matched; test for
            # that explicitly rather than relying on object truthiness.
            if alert is None:
                raise HTTPException(status_code=404, detail="Alert not found")
            # Validate while the session is still open.
            return AlertResponse.model_validate(alert)
    except HTTPException:
        # Let the 404 above (or any other HTTP error) pass through untouched.
        raise
    except Exception as e:
        # NOTE(review): str(e) is sent to the client as the error detail;
        # confirm that exposing internal messages is acceptable.
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.put("/{alert_id}", response_model=AlertResponse)
async def update_alert(alert_id: int, alert_data: AlertUpdate):
    """Update an existing alert.

    Only the fields of ``alert_data`` that are not ``None`` are written to
    the row; fields left as ``None`` keep their stored value.

    Args:
        alert_id: Value matched against ``Alert.id``.
        alert_data: Request body with optional ``name``, ``condition`` and
            ``enabled`` values.

    Returns:
        An ``AlertResponse`` built from the row after commit and refresh.

    Raises:
        HTTPException: 404 with detail "Alert not found" when no row matches.
            Any other failure becomes a 500 whose detail is the exception
            text.
    """
    try:
        db = get_database()
        async with db.get_session() as session:
            stmt = select(Alert).where(Alert.id == alert_id)
            result = await session.execute(stmt)
            alert = result.scalar_one_or_none()
            # scalar_one_or_none() yields None when nothing matched; test for
            # that explicitly rather than relying on object truthiness.
            if alert is None:
                raise HTTPException(status_code=404, detail="Alert not found")

            # Apply only the values that were supplied as non-None.
            if alert_data.name is not None:
                alert.name = alert_data.name
            if alert_data.condition is not None:
                alert.condition = alert_data.condition
            if alert_data.enabled is not None:
                alert.enabled = alert_data.enabled

            await session.commit()
            # Reload the row after the commit, before building the response.
            await session.refresh(alert)
            return AlertResponse.model_validate(alert)
    except HTTPException:
        # Let the 404 above (or any other HTTP error) pass through untouched.
        raise
    except Exception as e:
        # NOTE(review): str(e) is sent to the client as the error detail;
        # confirm that exposing internal messages is acceptable.
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.delete("/{alert_id}")
async def delete_alert(alert_id: int):
    """Delete an alert.

    Args:
        alert_id: Value matched against ``Alert.id``.

    Returns:
        ``{"status": "deleted", "alert_id": alert_id}`` once the deletion has
        been committed.

    Raises:
        HTTPException: 404 with detail "Alert not found" when no row matches.
            Any other failure becomes a 500 whose detail is the exception
            text.
    """
    try:
        db = get_database()
        async with db.get_session() as session:
            stmt = select(Alert).where(Alert.id == alert_id)
            result = await session.execute(stmt)
            alert = result.scalar_one_or_none()
            # scalar_one_or_none() yields None when nothing matched; test for
            # that explicitly rather than relying on object truthiness.
            if alert is None:
                raise HTTPException(status_code=404, detail="Alert not found")
            await session.delete(alert)
            await session.commit()
            return {"status": "deleted", "alert_id": alert_id}
    except HTTPException:
        # Let the 404 above (or any other HTTP error) pass through untouched.
        raise
    except Exception as e:
        # NOTE(review): str(e) is sent to the client as the error detail;
        # confirm that exposing internal messages is acceptable.
        raise HTTPException(status_code=500, detail=str(e)) from e