Files
crypto_trader/tests/e2e/test_pricing_data_e2e.py

341 lines
12 KiB
Python
Raw Permalink Normal View History

"""End-to-end tests for pricing data flow."""
import pytest
import asyncio
from unittest.mock import Mock, patch, AsyncMock
from decimal import Decimal
from datetime import datetime
from src.data.pricing_service import get_pricing_service, PricingService
from src.data.providers.base_provider import BasePricingProvider
class MockProvider(BasePricingProvider):
"""Mock provider for E2E testing."""
def __init__(self, name: str = "MockProvider"):
super().__init__()
self._name = name
self._ticker_data = {
'symbol': 'BTC/USD',
'bid': Decimal('50000'),
'ask': Decimal('50001'),
'last': Decimal('50000.5'),
'high': Decimal('51000'),
'low': Decimal('49000'),
'volume': Decimal('1000000'),
'timestamp': int(datetime.now().timestamp() * 1000),
}
self._ohlcv_data = [
[int(datetime.now().timestamp() * 1000), 50000, 51000, 49000, 50000, 1000],
]
self._callbacks = []
@property
def name(self) -> str:
return self._name
@property
def supports_websocket(self) -> bool:
return False
def connect(self) -> bool:
self._connected = True
return True
def disconnect(self):
self._connected = False
self._callbacks.clear()
def get_ticker(self, symbol: str):
return self._ticker_data.copy()
def get_ohlcv(self, symbol: str, timeframe: str = '1h', since=None, limit: int = 100):
return self._ohlcv_data.copy()
def subscribe_ticker(self, symbol: str, callback) -> bool:
if symbol not in self._subscribers:
self._subscribers[symbol] = []
self._subscribers[symbol].append(callback)
self._callbacks.append((symbol, callback))
# Simulate price update
import threading
def send_update():
import time
time.sleep(0.1)
if callback and symbol in self._subscribers:
callback(self._ticker_data.copy())
thread = threading.Thread(target=send_update, daemon=True)
thread.start()
return True
@pytest.mark.e2e
class TestPricingDataE2E:
"""End-to-end tests for pricing data system."""
@pytest.fixture(autouse=True)
def reset_service(self):
"""Reset pricing service between tests."""
import src.data.pricing_service
src.data.pricing_service._pricing_service = None
yield
src.data.pricing_service._pricing_service = None
@patch('src.data.pricing_service.get_config')
def test_pricing_service_initialization(self, mock_get_config):
"""Test pricing service initializes correctly."""
mock_config = Mock()
mock_config.get = Mock(side_effect=lambda key, default=None: {
"data_providers.primary": [
{"name": "mock", "enabled": True, "priority": 1}
],
"data_providers.fallback": {"enabled": True, "api_key": ""},
"data_providers.caching.ticker_ttl": 2,
"data_providers.caching.ohlcv_ttl": 60,
"data_providers.caching.max_cache_size": 1000,
}.get(key, default))
mock_get_config.return_value = mock_config
# Patch provider initialization to use mock
with patch('src.data.pricing_service.CCXTProvider') as mock_ccxt:
mock_provider = MockProvider("CCXT-Mock")
mock_ccxt.return_value = mock_provider
service = get_pricing_service()
assert service is not None
assert service.cache is not None
assert service.health_monitor is not None
def test_get_ticker_with_failover(self):
"""Test getting ticker with provider failover."""
# Create service with mock providers
service = PricingService()
# Create two providers - one will fail, one will succeed
provider1 = MockProvider("Provider1")
provider1.get_ticker = Mock(side_effect=Exception("Provider1 failed"))
provider1.connect = Mock(return_value=True)
provider2 = MockProvider("Provider2")
provider2.connect = Mock(return_value=True)
service._providers = {"Provider1": provider1, "Provider2": provider2}
service._provider_priority = ["Provider1", "Provider2"]
service._active_provider = "Provider1"
# Get ticker - should failover to Provider2
ticker = service.get_ticker("BTC/USD", use_cache=False)
assert ticker is not None
assert ticker['symbol'] == 'BTC/USD'
assert provider1.get_ticker.called
assert provider2.get_ticker.called
def test_caching_works(self):
"""Test that caching works correctly."""
service = PricingService()
provider = MockProvider()
provider.connect()
service._providers["MockProvider"] = provider
service._active_provider = "MockProvider"
# First call - should hit provider
ticker1 = service.get_ticker("BTC/USD", use_cache=True)
# Modify provider response
provider._ticker_data['last'] = Decimal('60000')
# Second call - should get cached value
ticker2 = service.get_ticker("BTC/USD", use_cache=True)
# Should be same as first call (cached)
assert ticker1['last'] == ticker2['last']
def test_subscription_and_updates(self):
"""Test subscribing to price updates."""
service = PricingService()
provider = MockProvider()
provider.connect()
service._providers["MockProvider"] = provider
service._active_provider = "MockProvider"
received_updates = []
def callback(data):
received_updates.append(data)
# Subscribe
success = service.subscribe_ticker("BTC/USD", callback)
assert success is True
# Wait for update
import time
time.sleep(0.2)
# Should have received at least one update
assert len(received_updates) > 0
assert received_updates[0]['symbol'] == 'BTC/USD'
def test_health_monitoring(self):
"""Test health monitoring tracks provider status."""
service = PricingService()
provider = MockProvider("TestProvider")
provider.connect()
service._providers["TestProvider"] = provider
service._active_provider = "TestProvider"
# Record some operations
service.health_monitor.record_success("TestProvider", 0.1)
service.health_monitor.record_success("TestProvider", 0.2)
service.health_monitor.record_failure("TestProvider")
# Check health
health = service.get_provider_health("TestProvider")
assert health is not None
assert health['success_count'] == 2
assert health['failure_count'] == 1
assert health['avg_response_time'] > 0
def test_provider_priority_selection(self):
"""Test that providers are selected by priority."""
service = PricingService()
provider1 = MockProvider("Provider1")
provider1.connect()
provider2 = MockProvider("Provider2")
provider2.connect()
service._providers = {"Provider1": provider1, "Provider2": provider2}
service._provider_priority = ["Provider1", "Provider2"]
# Select active provider
active = service._select_active_provider()
assert active == "Provider1" # Should select first in priority
def test_cache_stats(self):
"""Test cache statistics are tracked."""
service = PricingService()
# Perform some cache operations
service.cache.set("key1", "value1")
service.cache.get("key1") # Hit
service.cache.get("missing") # Miss
stats = service.get_cache_stats()
assert stats['hits'] >= 1
assert stats['misses'] >= 1
assert stats['size'] >= 1
def test_get_ohlcv_flow(self):
"""Test complete OHLCV data flow."""
service = PricingService()
provider = MockProvider()
provider.connect()
service._providers["MockProvider"] = provider
service._active_provider = "MockProvider"
# Get OHLCV data
ohlcv = service.get_ohlcv("BTC/USD", "1h", limit=10, use_cache=False)
assert len(ohlcv) > 0
assert len(ohlcv[0]) == 6 # timestamp, open, high, low, close, volume
assert ohlcv[0][0] > 0 # Valid timestamp
def test_unsubscribe_ticker(self):
"""Test unsubscribing from ticker updates."""
service = PricingService()
provider = MockProvider()
provider.connect()
service._providers["MockProvider"] = provider
service._active_provider = "MockProvider"
callback = Mock()
# Subscribe
service.subscribe_ticker("BTC/USD", callback)
assert "ticker:BTC/USD" in service._subscriptions
# Unsubscribe
service.unsubscribe_ticker("BTC/USD", callback)
assert "ticker:BTC/USD" not in service._subscriptions
def test_multiple_symbol_subscriptions(self):
"""Test subscribing to multiple symbols."""
service = PricingService()
provider = MockProvider()
provider.connect()
service._providers["MockProvider"] = provider
service._active_provider = "MockProvider"
callback1 = Mock()
callback2 = Mock()
# Subscribe to multiple symbols
service.subscribe_ticker("BTC/USD", callback1)
service.subscribe_ticker("ETH/USD", callback2)
assert "ticker:BTC/USD" in service._subscriptions
assert "ticker:ETH/USD" in service._subscriptions
assert len(service._subscriptions) == 2
@pytest.mark.e2e
@pytest.mark.asyncio
class TestPricingDataWebSocketE2E:
"""E2E tests for WebSocket pricing updates."""
async def test_websocket_price_broadcast(self):
"""Test WebSocket broadcasts price updates."""
# This test would require a running WebSocket server
# For now, we test the integration point
from backend.api.websocket import ConnectionManager
manager = ConnectionManager()
# Mock pricing service
with patch('backend.api.websocket.get_pricing_service') as mock_get_service:
mock_service = Mock()
mock_service.subscribe_ticker = Mock(return_value=True)
mock_get_service.return_value = mock_service
# Subscribe to symbol
manager.subscribe_to_symbol("BTC/USD")
assert "BTC/USD" in manager.subscribed_symbols
assert mock_service.subscribe_ticker.called
async def test_websocket_subscription_flow(self):
"""Test WebSocket subscription and unsubscription."""
from backend.api.websocket import ConnectionManager
manager = ConnectionManager()
with patch('backend.api.websocket.get_pricing_service') as mock_get_service:
mock_service = Mock()
mock_service.subscribe_ticker = Mock(return_value=True)
mock_service.unsubscribe_ticker = Mock()
mock_get_service.return_value = mock_service
# Subscribe
manager.subscribe_to_symbol("BTC/USD")
assert "BTC/USD" in manager.subscribed_symbols
# Unsubscribe
manager.unsubscribe_from_symbol("BTC/USD")
assert "BTC/USD" not in manager.subscribed_symbols
assert mock_service.unsubscribe_ticker.called