Pricing Provider Development Guide
This guide explains how to add new pricing data providers to the Crypto Trader system.
Overview
Pricing providers are responsible for fetching market data (prices, OHLCV candlestick data) without requiring API keys. They differ from exchange adapters, which handle trading operations and require authentication.
The system uses a multi-tier provider strategy:
- Primary Providers: CCXT-based providers (Kraken, Coinbase, Binance)
- Fallback Provider: CoinGecko API
Provider Interface
All pricing providers must implement the BasePricingProvider interface, located in src/data/providers/base_provider.py.
Required Methods
- name (property): Return the provider's display name
- supports_websocket (property): Return True if the provider supports WebSocket connections
- connect(): Establish a connection to the provider; return True if successful
- disconnect(): Close the connection and clean up resources
- get_ticker(symbol: str) -> Dict: Get current ticker data for a symbol
- get_ohlcv(symbol, timeframe, since, limit) -> List[List]: Get historical OHLCV data
- subscribe_ticker(symbol, callback) -> bool: Subscribe to real-time ticker updates
Ticker Data Format
The get_ticker() method should return a dictionary with the following keys:
{
    'symbol': str,      # Trading pair (e.g., 'BTC/USD')
    'bid': Decimal,     # Best bid price
    'ask': Decimal,     # Best ask price
    'last': Decimal,    # Last traded price
    'high': Decimal,    # 24h high
    'low': Decimal,     # 24h low
    'volume': Decimal,  # 24h volume
    'timestamp': int,   # Unix timestamp in milliseconds
}
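As a minimal sketch of producing this structure, the helper below converts a raw payload into the ticker format. The name `build_ticker`, the shape of `raw`, and the choice to default missing prices to zero are assumptions made for illustration; none of them come from the codebase.

```python
import time
from decimal import Decimal
from typing import Any, Dict


def build_ticker(symbol: str, raw: Dict[str, Any]) -> Dict[str, Any]:
    """Convert a raw payload (hypothetical shape) into the ticker format."""

    def to_decimal(value: Any) -> Decimal:
        # Go through str() so float inputs do not carry binary rounding noise.
        # Defaulting to zero for missing values is an assumption of this sketch.
        return Decimal(str(value)) if value is not None else Decimal("0")

    return {
        "symbol": symbol,
        "bid": to_decimal(raw.get("bid")),
        "ask": to_decimal(raw.get("ask")),
        "last": to_decimal(raw.get("last")),
        "high": to_decimal(raw.get("high")),
        "low": to_decimal(raw.get("low")),
        "volume": to_decimal(raw.get("volume")),
        # Fall back to the current time in milliseconds if none is supplied.
        "timestamp": int(raw.get("timestamp") or time.time() * 1000),
    }
```

Converting through `str()` avoids values such as `Decimal(1.1)` expanding to a long binary approximation.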
OHLCV Data Format
The get_ohlcv() method should return a list of candles, where each candle is:
[timestamp_ms, open, high, low, close, volume]
All values should be numeric (float or Decimal).
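The candle layout can be illustrated with a small normalizer. This is a sketch under assumptions: `normalize_candles` is a hypothetical helper, and dropping short rows and sorting by timestamp are design choices of this example, not documented requirements.

```python
from typing import Any, List, Sequence


def normalize_candles(rows: Sequence[Sequence[Any]]) -> List[list]:
    """Coerce raw rows into [timestamp_ms, open, high, low, close, volume]."""
    candles = []
    for row in rows:
        if len(row) < 6:
            continue  # Skip malformed rows rather than raising.
        ts, open_, high, low, close, volume = row[:6]
        candles.append(
            [int(ts), float(open_), float(high), float(low), float(close), float(volume)]
        )
    # Return candles oldest first.
    candles.sort(key=lambda candle: candle[0])
    return candles
```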
Creating a New Provider
Step 1: Create Provider Class
Create a new file in src/data/providers/ (e.g., my_provider.py):
"""My custom pricing provider."""
from decimal import Decimal
from typing import Dict, List, Optional, Any, Callable
from datetime import datetime
from .base_provider import BasePricingProvider
from ...core.logger import get_logger
logger = get_logger(__name__)
class MyProvider(BasePricingProvider):
    """My custom pricing provider implementation."""

    @property
    def name(self) -> str:
        return "MyProvider"

    @property
    def supports_websocket(self) -> bool:
        return False  # Set to True if WebSocket supported

    def connect(self) -> bool:
        """Connect to provider."""
        try:
            # Initialize connection
            self._connected = True
            logger.info(f"Connected to {self.name}")
            return True
        except Exception as e:
            logger.error(f"Failed to connect: {e}")
            self._connected = False
            return False

    def disconnect(self):
        """Disconnect from provider."""
        self._connected = False
        logger.info(f"Disconnected from {self.name}")

    def get_ticker(self, symbol: str) -> Dict[str, Any]:
        """Get current ticker data."""
        # Implementation here
        pass

    def get_ohlcv(
        self,
        symbol: str,
        timeframe: str = '1h',
        since: Optional[datetime] = None,
        limit: int = 100
    ) -> List[List]:
        """Get OHLCV data."""
        # Implementation here
        pass

    def subscribe_ticker(self, symbol: str, callback: Callable) -> bool:
        """Subscribe to ticker updates."""
        # Implementation here
        pass
Step 2: Register Provider
Update src/data/providers/__init__.py to include your provider:
from .my_provider import MyProvider
__all__ = [..., 'MyProvider']
Step 3: Add to Pricing Service
Update src/data/pricing_service.py to include your provider in the initialization:
# Add to _initialize_providers method
try:
    my_provider = MyProvider()
    if my_provider.connect():
        self._providers[my_provider.name] = my_provider
        self._provider_priority.append(my_provider.name)
except Exception as e:
    logger.error(f"Error initializing MyProvider: {e}")
Step 4: Add Configuration
Update src/core/config.py to include configuration options for your provider:
"data_providers": {
    "primary": [
        # ... existing providers ...
        {"name": "my_provider", "enabled": True, "priority": 4},
    ],
    # ...
}
Best Practices
Error Handling
- Always catch exceptions in provider methods
- Return empty data structures ({} or []) on error rather than raising
- Log errors with appropriate detail level
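One way to apply these rules is a thin wrapper around the network call. The sketch below is an assumption-laden illustration: `safe_get_ticker` and the `fetch` callable are hypothetical names, and it uses the standard `logging` module in place of the project's `get_logger`.

```python
import logging
from typing import Any, Callable, Dict

logger = logging.getLogger(__name__)


def safe_get_ticker(
    fetch: Callable[[str], Dict[str, Any]], symbol: str
) -> Dict[str, Any]:
    """Call fetch(symbol) and return {} on any failure instead of raising."""
    try:
        return fetch(symbol)
    except Exception as exc:
        # Log enough context to diagnose the failure, then degrade gracefully.
        logger.error("Failed to fetch ticker for %s: %s", symbol, exc)
        return {}
```

Returning an empty dictionary lets the caller treat "no data" uniformly, whatever went wrong inside the provider.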
Rate Limiting
- Respect API rate limits
- Implement appropriate delays between requests
- Use exponential backoff for retries
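Exponential backoff can be sketched as a small retry helper. The name `retry_with_backoff` and its default values are assumptions for illustration; tune them to the rate limits of the API you are calling.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, doubling the wait after each failure; re-raise the last error."""
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # Out of attempts: surface the final error.
            # Waits grow as base_delay * 1, 2, 4, ...
            sleep(base_delay * (2 ** attempt))
    raise RuntimeError("max_attempts must be at least 1")
```

The `sleep` parameter is injectable so the delays can be tested without actually waiting. Inside a provider method, the re-raised error would still be caught and logged in line with the error-handling guidance.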
Symbol Normalization
- Override normalize_symbol() if your provider uses different symbol formats
- Handle common variations (BTC/USD vs BTC-USD vs BTCUSD)
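A rough sketch of handling those variations is shown below as a standalone function. The actual signature of `normalize_symbol()` in BasePricingProvider is not shown in this guide, so the signature here and the `KNOWN_QUOTES` list are assumptions; adapt both to the base class.

```python
from typing import Sequence

# Illustrative quote currencies only; not an exhaustive list.
KNOWN_QUOTES = ("USDT", "USDC", "USD", "EUR", "BTC", "ETH")


def normalize_symbol(symbol: str, quotes: Sequence[str] = KNOWN_QUOTES) -> str:
    """Return symbol in 'BASE/QUOTE' form, e.g. 'btc-usd' becomes 'BTC/USD'."""
    cleaned = symbol.strip().upper()
    for sep in ("/", "-", "_"):
        if sep in cleaned:
            base, quote = cleaned.split(sep, 1)
            return f"{base}/{quote}"
    # No separator: try the longest quote suffix first so that
    # 'BTCUSDT' splits as BTC/USDT rather than matching a shorter quote.
    for quote in sorted(quotes, key=len, reverse=True):
        if cleaned.endswith(quote) and len(cleaned) > len(quote):
            return f"{cleaned[:-len(quote)]}/{quote}"
    return cleaned  # Unknown format: return unchanged (upper-cased).
```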
Caching
- The pricing service handles caching automatically
- Focus on providing fresh data from the API
- Don't implement your own caching layer
Testing
Create unit tests in tests/unit/data/providers/test_my_provider.py:
"""Unit tests for MyProvider."""
import pytest
from unittest.mock import Mock, patch
from src.data.providers.my_provider import MyProvider
class TestMyProvider:
    def test_connect(self):
        provider = MyProvider()
        result = provider.connect()
        assert result is True

    def test_get_ticker(self):
        provider = MyProvider()
        provider.connect()
        ticker = provider.get_ticker("BTC/USD")
        assert 'last' in ticker
        assert ticker['last'] > 0
Example: CoinGecko Provider
See src/data/providers/coingecko_provider.py for a complete example of a REST API-based provider.
Example: CCXT Provider
See src/data/providers/ccxt_provider.py for an example of a provider that wraps an existing library (CCXT).
Health Monitoring
The pricing service automatically monitors provider health:
- Tracks success/failure rates
- Measures response times
- Implements circuit breaker pattern
- Automatically fails over to next provider
Your provider doesn't need to implement health monitoring - it's handled by the HealthMonitor class.
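To picture the failover step, the sketch below tries providers in priority order and returns the first non-empty ticker. It is not the HealthMonitor or pricing service implementation; `first_successful_ticker` and the `(name, fetch)` pairs are hypothetical stand-ins for illustration.

```python
from typing import Any, Callable, Dict, List, Tuple

Fetch = Callable[[str], Dict[str, Any]]


def first_successful_ticker(
    providers: List[Tuple[str, Fetch]], symbol: str
) -> Tuple[str, Dict[str, Any]]:
    """Try each (name, fetch) pair in order; return the first non-empty result."""
    for name, fetch in providers:
        try:
            ticker = fetch(symbol)
        except Exception:
            continue  # Provider failed: fall through to the next one.
        if ticker:
            return name, ticker
    return "", {}  # Every provider failed or returned no data.
```

This is also why returning an empty structure on error matters: an empty result signals the caller to move on to the next provider.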
Subscriptions
For real-time updates, implement subscribe_ticker(). The service expects:
- Subscriptions to be persistent until unsubscribe_ticker() is called
- Callbacks to be invoked with ticker data dictionaries
- Graceful handling of connection failures
If WebSocket is not supported, use polling with appropriate intervals (typically 1-5 seconds for ticker data).
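A polling fallback might look like the sketch below. The class `PollingSubscription` and its `fetch` argument are hypothetical; in a real provider, `subscribe_ticker()` would create and start such an object and `unsubscribe_ticker()` would stop it.

```python
import logging
import threading
from typing import Any, Callable, Dict

logger = logging.getLogger(__name__)


class PollingSubscription:
    """Invoke callback with ticker data every `interval` seconds until stopped."""

    def __init__(
        self,
        fetch: Callable[[str], Dict[str, Any]],
        symbol: str,
        callback: Callable[[Dict[str, Any]], None],
        interval: float = 2.0,
    ) -> None:
        self._fetch = fetch
        self._symbol = symbol
        self._callback = callback
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        self._thread.join()

    def _run(self) -> None:
        while not self._stop.is_set():
            try:
                ticker = self._fetch(self._symbol)
                if ticker:
                    self._callback(ticker)
            except Exception as exc:
                # Keep polling through transient failures.
                logger.warning("Polling %s failed: %s", self._symbol, exc)
            # Event.wait doubles as an interruptible sleep.
            self._stop.wait(self._interval)
```

Using `Event.wait` instead of `time.sleep` means `stop()` takes effect immediately rather than after the current interval elapses.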
Questions?
For more information, see:
- src/data/providers/base_provider.py - Base interface
- src/data/pricing_service.py - Service implementation
- src/data/health_monitor.py - Health monitoring