
Pricing Provider Development Guide

This guide explains how to add new pricing data providers to the Crypto Trader system.

Overview

Pricing providers fetch market data (prices and OHLCV candlestick data) without requiring API keys. Exchange adapters, by contrast, handle trading operations and require authentication.

The system uses a multi-tier provider strategy:

  • Primary Providers: CCXT-based providers (Kraken, Coinbase, Binance)
  • Fallback Provider: CoinGecko API

Provider Interface

All pricing providers must implement the BasePricingProvider interface, located in src/data/providers/base_provider.py.

Required Methods

  1. name (property): Return the provider's display name
  2. supports_websocket (property): Return True if the provider supports WebSocket connections
  3. connect(): Establish connection to the provider, return True if successful
  4. disconnect(): Close connection and clean up resources
  5. get_ticker(symbol: str) -> Dict: Get current ticker data for a symbol
  6. get_ohlcv(symbol, timeframe, since, limit) -> List[List]: Get historical OHLCV data
  7. subscribe_ticker(symbol, callback) -> bool: Subscribe to real-time ticker updates
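
To make the contract above concrete, here is a minimal sketch of how such an interface could be declared with Python's abc module. The class name BasePricingProviderSketch and the _connected attribute are assumptions for illustration; the real definition in src/data/providers/base_provider.py may differ and takes precedence.

```python
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional


class BasePricingProviderSketch(ABC):
    """Hypothetical stand-in for BasePricingProvider; the real class may differ."""

    def __init__(self) -> None:
        # Assumed connection flag, mirroring the skeleton later in this guide.
        self._connected = False

    @property
    @abstractmethod
    def name(self) -> str:
        """Display name of the provider."""

    @property
    @abstractmethod
    def supports_websocket(self) -> bool:
        """True if the provider can stream updates over WebSocket."""

    @abstractmethod
    def connect(self) -> bool:
        """Establish the connection; return True on success."""

    @abstractmethod
    def disconnect(self) -> None:
        """Close the connection and release resources."""

    @abstractmethod
    def get_ticker(self, symbol: str) -> Dict[str, Any]:
        """Return current ticker data for a symbol."""

    @abstractmethod
    def get_ohlcv(
        self,
        symbol: str,
        timeframe: str = "1h",
        since: Optional[datetime] = None,
        limit: int = 100,
    ) -> List[List]:
        """Return historical candles."""

    @abstractmethod
    def subscribe_ticker(
        self, symbol: str, callback: Callable[[Dict[str, Any]], None]
    ) -> bool:
        """Register a callback for real-time ticker updates."""
```

With this shape, a subclass that forgets one of the seven members fails at instantiation time with a TypeError rather than later at call time.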

Ticker Data Format

The get_ticker() method should return a dictionary with the following keys:

{
    'symbol': str,           # Trading pair (e.g., 'BTC/USD')
    'bid': Decimal,          # Best bid price
    'ask': Decimal,          # Best ask price
    'last': Decimal,         # Last traded price
    'high': Decimal,         # 24h high
    'low': Decimal,          # 24h low
    'volume': Decimal,       # 24h volume
    'timestamp': int,        # Unix timestamp in milliseconds
}
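
As an illustration of producing this format, the sketch below converts a raw API payload into a ticker dictionary. The helper name build_ticker and the raw field names are hypothetical; a real provider's payload will use its own keys.

```python
from decimal import Decimal
from typing import Any, Dict


def build_ticker(symbol: str, raw: Dict[str, Any]) -> Dict[str, Any]:
    """Convert a raw payload (hypothetical field names) into the ticker format."""

    def to_decimal(value: Any) -> Decimal:
        # Go through str() so floats such as 0.1 do not carry binary noise.
        return Decimal(str(value))

    return {
        "symbol": symbol,
        "bid": to_decimal(raw["bid"]),
        "ask": to_decimal(raw["ask"]),
        "last": to_decimal(raw["last"]),
        "high": to_decimal(raw["high"]),
        "low": to_decimal(raw["low"]),
        "volume": to_decimal(raw["volume"]),
        "timestamp": int(raw["timestamp"]),  # Unix time in milliseconds
    }


ticker = build_ticker(
    "BTC/USD",
    {
        "bid": "49999.0", "ask": "50001.0", "last": 50000.5,
        "high": "51000", "low": "48000", "volume": "1234.5",
        "timestamp": 1700000000000,
    },
)
```

Converting through str() is a deliberate choice here: Decimal(0.1) would preserve the float's binary representation error, while Decimal("0.1") does not.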

OHLCV Data Format

The get_ohlcv() method should return a list of candles, where each candle is:

[timestamp_ms, open, high, low, close, volume]

timestamp_ms is a Unix timestamp in milliseconds; the remaining values should be numeric (float or Decimal).
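
A small sketch of coercing raw rows into this candle layout follows. The function name normalize_candles is hypothetical, and sorting oldest-first is an assumption rather than something this guide specifies.

```python
from typing import Any, Iterable, List


def normalize_candles(rows: Iterable[Iterable[Any]]) -> List[List]:
    """Coerce raw rows into [timestamp_ms, open, high, low, close, volume]."""
    candles: List[List] = []
    for row in rows:
        # Unpacking raises ValueError if a row does not have exactly six fields.
        ts, open_, high, low, close, volume = row
        candles.append(
            [int(ts), float(open_), float(high), float(low), float(close), float(volume)]
        )
    # Assumption: callers expect candles ordered from oldest to newest.
    candles.sort(key=lambda candle: candle[0])
    return candles


candles = normalize_candles(
    [
        ["1700003600000", "2", "3", "1", "2.5", "10"],
        [1700000000000, 1, 2, 0.5, 1.5, 5],
    ]
)
```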

Creating a New Provider

Step 1: Create Provider Class

Create a new file in src/data/providers/ (e.g., my_provider.py):

"""My custom pricing provider."""

from decimal import Decimal
from typing import Dict, List, Optional, Any, Callable
from datetime import datetime
from .base_provider import BasePricingProvider
from ...core.logger import get_logger

logger = get_logger(__name__)


class MyProvider(BasePricingProvider):
    """My custom pricing provider implementation."""
    
    @property
    def name(self) -> str:
        return "MyProvider"
    
    @property
    def supports_websocket(self) -> bool:
        return False  # Set to True if WebSocket supported
    
    def connect(self) -> bool:
        """Connect to provider."""
        try:
            # Initialize connection
            self._connected = True
            logger.info(f"Connected to {self.name}")
            return True
        except Exception as e:
            logger.error(f"Failed to connect: {e}")
            self._connected = False
            return False
    
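    def disconnect(self) -> None:
        """Disconnect from provider."""
        self._connected = False
        logger.info(f"Disconnected from {self.name}")

    def get_ticker(self, symbol: str) -> Dict[str, Any]:
        """Get current ticker data."""
        # TODO: fetch the data and return it in the ticker format.
        # Placeholder: return an empty dict rather than None.
        return {}

    def get_ohlcv(
        self,
        symbol: str,
        timeframe: str = '1h',
        since: Optional[datetime] = None,
        limit: int = 100
    ) -> List[List]:
        """Get OHLCV data."""
        # TODO: fetch candles as [timestamp_ms, open, high, low, close, volume].
        # Placeholder: return an empty list rather than None.
        return []

    def subscribe_ticker(self, symbol: str, callback: Callable) -> bool:
        """Subscribe to ticker updates."""
        # TODO: register the callback (WebSocket or polling).
        # Placeholder: report that no subscription was created.
        return False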

Step 2: Register Provider

Update src/data/providers/__init__.py to include your provider:

from .my_provider import MyProvider

__all__ = [
    # ... existing providers ...
    'MyProvider',
]

Step 3: Add to Pricing Service

Update src/data/pricing_service.py to include your provider in the initialization:

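# Import MyProvider at the top of the module first, e.g.
# from .providers import MyProvider  (relative path assumed from the layout above)

# Add to _initialize_providers method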
try:
    my_provider = MyProvider()
    if my_provider.connect():
        self._providers[my_provider.name] = my_provider
        self._provider_priority.append(my_provider.name)
except Exception as e:
    logger.error(f"Error initializing MyProvider: {e}")

Step 4: Add Configuration

Update src/core/config.py to include configuration options for your provider:

"data_providers": {
    "primary": [
        # ... existing providers ...
        {"name": "my_provider", "enabled": True, "priority": 4},
    ],
    # ...
}
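
How the pricing service reads these entries is not shown in this guide. As a rough sketch, assuming a lower priority number means the provider is tried earlier, enabled providers could be ordered as follows. The function enabled_provider_names and the sample entries other than my_provider are hypothetical.

```python
from typing import Any, Dict, List


def enabled_provider_names(config: Dict[str, Any]) -> List[str]:
    """Return names of enabled primary providers, lowest priority number first."""
    entries = config.get("data_providers", {}).get("primary", [])
    enabled = [entry for entry in entries if entry.get("enabled", False)]
    return [entry["name"] for entry in sorted(enabled, key=lambda e: e["priority"])]


sample_config = {
    "data_providers": {
        "primary": [
            {"name": "my_provider", "enabled": True, "priority": 4},
            {"name": "example_a", "enabled": True, "priority": 1},   # hypothetical
            {"name": "example_b", "enabled": False, "priority": 2},  # hypothetical
        ],
    }
}
order = enabled_provider_names(sample_config)
```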

Best Practices

Error Handling

  • Always catch exceptions in provider methods
  • Return empty data structures ({} or []) on error rather than raising
  • Log errors with appropriate detail level
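
The pattern in the list above can be sketched as a thin wrapper around the actual fetch. The name safe_get_ticker and the injected fetch callable are illustrative assumptions; in a real provider the try/except would typically live inside get_ticker() itself.

```python
import logging
from typing import Any, Callable, Dict

logger = logging.getLogger(__name__)


def safe_get_ticker(
    fetch: Callable[[str], Dict[str, Any]], symbol: str
) -> Dict[str, Any]:
    """Call fetch(symbol); on any error, log it and return an empty dict."""
    try:
        return fetch(symbol)
    except Exception as exc:
        # Include the symbol so failures can be traced to a specific request.
        logger.error("get_ticker failed for %s: %s", symbol, exc)
        return {}


def failing_fetch(symbol: str) -> Dict[str, Any]:
    raise ConnectionError("network down")


result = safe_get_ticker(failing_fetch, "BTC/USD")
```

Returning an empty structure lets the caller treat "no data" uniformly, for example by moving on to the next provider.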

Rate Limiting

  • Respect API rate limits
  • Implement appropriate delays between requests
  • Use exponential backoff for retries
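
One possible shape for retries with exponential backoff is sketched below. The helper retry_with_backoff and its default values are assumptions, not part of the codebase; a production version would usually catch only transient errors (timeouts, HTTP 429) rather than every Exception.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    call: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry call() with delays of base_delay, 2x, 4x, ... capped at max_delay."""
    attempt = 0
    while True:
        try:
            return call()
        except Exception:
            attempt += 1
            if attempt >= max_attempts:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Up to 10% jitter so many clients do not retry in lockstep.
            sleep(delay + random.uniform(0, delay * 0.1))
```

The sleep parameter is injectable so that tests can record the delays instead of actually waiting.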

Symbol Normalization

  • Override normalize_symbol() if your provider uses different symbol formats
  • Handle common variations (BTC/USD vs BTC-USD vs BTCUSD)
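
The signature of normalize_symbol() on the base class is not shown in this guide, so the following is a standalone sketch of the idea. The KNOWN_QUOTES tuple is a hypothetical, deliberately short list; separator-less symbols such as BTCUSD can only be split by guessing the quote currency.

```python
from typing import Tuple

# Hypothetical quote currencies; longer codes come first so that
# "BTCUSDT" is split as BTC/USDT rather than BTCUSD/T or similar.
KNOWN_QUOTES: Tuple[str, ...] = ("USDT", "USDC", "USD", "EUR", "BTC", "ETH")


def normalize_symbol(symbol: str) -> str:
    """Normalize variants such as 'btc-usd' or 'BTCUSD' to 'BTC/USD'."""
    cleaned = symbol.strip().upper()
    for separator in ("/", "-", "_"):
        if separator in cleaned:
            base, quote = cleaned.split(separator, 1)
            return f"{base}/{quote}"
    for quote in KNOWN_QUOTES:
        if cleaned.endswith(quote) and len(cleaned) > len(quote):
            return f"{cleaned[:-len(quote)]}/{quote}"
    raise ValueError(f"Cannot normalize symbol: {symbol!r}")
```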

Caching

  • The pricing service handles caching automatically
  • Focus on providing fresh data from the API
  • Don't implement your own caching layer

Testing

Create unit tests in tests/unit/data/providers/test_my_provider.py:

"""Unit tests for MyProvider."""

import pytest
from unittest.mock import Mock, patch
from src.data.providers.my_provider import MyProvider

class TestMyProvider:
    def test_connect(self):
        provider = MyProvider()
        result = provider.connect()
        assert result is True
    
    def test_get_ticker(self):
        provider = MyProvider()
        provider.connect()
        ticker = provider.get_ticker("BTC/USD")
        assert 'last' in ticker
        assert ticker['last'] > 0
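
Unit tests are generally more reliable when they do not touch the network. The sketch below shows one way to do that with unittest.mock. FakeRestProvider and its injected http_client (with a get_json method) are hypothetical stand-ins defined only so the example is self-contained; adapt the idea to however your provider issues requests.

```python
from decimal import Decimal
from typing import Any, Dict
from unittest.mock import Mock


class FakeRestProvider:
    """Hypothetical provider that delegates HTTP calls to an injected client."""

    def __init__(self, http_client: Any) -> None:
        self._http = http_client

    def get_ticker(self, symbol: str) -> Dict[str, Any]:
        try:
            payload = self._http.get_json(f"/ticker/{symbol}")
            return {"symbol": symbol, "last": Decimal(str(payload["last"]))}
        except Exception:
            return {}  # empty dict on error, per the best practices above


def test_get_ticker_parses_payload():
    client = Mock()
    client.get_json.return_value = {"last": "50000.5"}
    ticker = FakeRestProvider(client).get_ticker("BTC/USD")
    assert ticker["last"] == Decimal("50000.5")
    client.get_json.assert_called_once_with("/ticker/BTC/USD")


def test_get_ticker_returns_empty_dict_on_error():
    client = Mock()
    client.get_json.side_effect = ConnectionError("network down")
    assert FakeRestProvider(client).get_ticker("BTC/USD") == {}
```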

Example: CoinGecko Provider

See src/data/providers/coingecko_provider.py for a complete example of a REST API-based provider.

Example: CCXT Provider

See src/data/providers/ccxt_provider.py for an example of a provider that wraps an existing library (CCXT).

Health Monitoring

The pricing service automatically monitors provider health:

  • Tracks success/failure rates
  • Measures response times
  • Implements circuit breaker pattern
  • Automatically fails over to next provider

Your provider doesn't need to implement health monitoring; the HealthMonitor class handles it.
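
For readers unfamiliar with the circuit breaker pattern mentioned above, here is a generic, minimal sketch. It is not the HealthMonitor implementation; the class name, thresholds, and method names are assumptions chosen for illustration.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Generic circuit breaker: stop calling a provider after repeated failures."""

    def __init__(
        self,
        failure_threshold: int = 3,
        reset_timeout: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._failure_threshold = failure_threshold
        self._reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None  # None means the circuit is closed

    def allow_request(self) -> bool:
        if self._opened_at is None:
            return True
        # After the cool-down, let a trial request through (half-open state).
        return self._clock() - self._opened_at >= self._reset_timeout

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self._failure_threshold:
            # Open (or re-open) the circuit and restart the cool-down.
            self._opened_at = self._clock()
```

A service using this would skip any provider whose breaker returns False from allow_request() and move to the next one in priority order.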

Subscriptions

For real-time updates, implement subscribe_ticker(). The service expects:

  • Subscriptions to be persistent until unsubscribe_ticker() is called
  • Callbacks to be invoked with ticker data dictionaries
  • Graceful handling of connection failures

If WebSocket is not supported, use polling with appropriate intervals (typically 1-5 seconds for ticker data).
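
A polling fallback could look roughly like the sketch below, which runs a background thread until stopped. PollingSubscription is a hypothetical helper, not part of the codebase; fetch would typically be a closure around get_ticker(symbol).

```python
import logging
import threading
from typing import Any, Callable, Dict

logger = logging.getLogger(__name__)


class PollingSubscription:
    """Invoke callback with fresh ticker data every `interval` seconds."""

    def __init__(
        self,
        fetch: Callable[[], Dict[str, Any]],
        callback: Callable[[Dict[str, Any]], None],
        interval: float = 2.0,
    ) -> None:
        self._fetch = fetch
        self._callback = callback
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        self._thread.join(timeout=self._interval + 1.0)

    def _run(self) -> None:
        while not self._stop.is_set():
            try:
                ticker = self._fetch()
                if ticker:  # skip empty results returned on provider errors
                    self._callback(ticker)
            except Exception as exc:
                # Keep the subscription alive across transient failures.
                logger.error("Polling iteration failed: %s", exc)
            # wait() returns early when stop() is called, so shutdown is prompt.
            self._stop.wait(self._interval)
```

Using Event.wait() instead of time.sleep() means an unsubscribe does not have to wait out the remainder of the polling interval.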

Questions?

For more information, see:

  • src/data/providers/base_provider.py - Base interface
  • src/data/pricing_service.py - Service implementation
  • src/data/health_monitor.py - Health monitoring