Compare commits
3 Commits
1049bc68ab
...
099432bf3f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
099432bf3f | ||
|
|
afe1a17d09 | ||
|
|
cc60da49e7 |
4
.gitignore
vendored
4
.gitignore
vendored
@@ -25,6 +25,10 @@ venv/
|
||||
env/
|
||||
ENV/
|
||||
|
||||
# Node modules
|
||||
node_modules/
|
||||
frontend/node_modules/
|
||||
|
||||
# IDE
|
||||
.vscode/
|
||||
.idea/
|
||||
|
||||
746
frontend/package-lock.json
generated
746
frontend/package-lock.json
generated
File diff suppressed because it is too large
Load Diff
0
scripts/fetch_historical_data.py
Executable file → Normal file
0
scripts/fetch_historical_data.py
Executable file → Normal file
0
scripts/reset_database.py
Executable file → Normal file
0
scripts/reset_database.py
Executable file → Normal file
0
scripts/start_all.sh
Executable file → Normal file
0
scripts/start_all.sh
Executable file → Normal file
@@ -114,6 +114,8 @@ class PerformanceTracker:
|
||||
|
||||
cutoff_date = datetime.utcnow() - timedelta(days=days)
|
||||
stmt = stmt.where(StrategyPerformance.timestamp >= cutoff_date)
|
||||
# Limit to prevent excessive queries - if we have lots of data, sample it
|
||||
stmt = stmt.order_by(StrategyPerformance.timestamp.desc()).limit(10000)
|
||||
|
||||
result = await session.execute(stmt)
|
||||
records = result.scalars().all()
|
||||
@@ -121,22 +123,32 @@ class PerformanceTracker:
|
||||
if not records:
|
||||
return pd.DataFrame()
|
||||
|
||||
# Convert to DataFrame
|
||||
data = []
|
||||
for record in records:
|
||||
# Get corresponding market conditions
|
||||
snapshot_stmt = select(MarketConditionsSnapshot).filter_by(
|
||||
strategy_name=record.strategy_name,
|
||||
symbol=record.symbol,
|
||||
timeframe=record.timeframe
|
||||
).where(
|
||||
MarketConditionsSnapshot.timestamp <= record.timestamp
|
||||
).order_by(
|
||||
MarketConditionsSnapshot.timestamp.desc()
|
||||
).limit(1)
|
||||
self.logger.info(f"Processing {len(records)} performance records for training data")
|
||||
|
||||
snapshot_result = await session.execute(snapshot_stmt)
|
||||
snapshot = snapshot_result.scalar_one_or_none()
|
||||
# Convert to DataFrame - optimize by batching snapshot queries
|
||||
data = []
|
||||
# Batch snapshot lookups to reduce N+1 query problem
|
||||
snapshot_cache = {}
|
||||
for record in records:
|
||||
cache_key = f"{record.strategy_name}:{record.symbol}:{record.timeframe}:{record.timestamp.date()}"
|
||||
|
||||
if cache_key not in snapshot_cache:
|
||||
# Get corresponding market conditions (only once per day per strategy)
|
||||
snapshot_stmt = select(MarketConditionsSnapshot).filter_by(
|
||||
strategy_name=record.strategy_name,
|
||||
symbol=record.symbol,
|
||||
timeframe=record.timeframe
|
||||
).where(
|
||||
MarketConditionsSnapshot.timestamp <= record.timestamp
|
||||
).order_by(
|
||||
MarketConditionsSnapshot.timestamp.desc()
|
||||
).limit(1)
|
||||
|
||||
snapshot_result = await session.execute(snapshot_stmt)
|
||||
snapshot = snapshot_result.scalar_one_or_none()
|
||||
snapshot_cache[cache_key] = snapshot
|
||||
else:
|
||||
snapshot = snapshot_cache[cache_key]
|
||||
|
||||
row = {
|
||||
'strategy_name': record.strategy_name,
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
"""ML-based strategy selector for intelligent autopilot."""
|
||||
|
||||
import asyncio
|
||||
from typing import Dict, Any, Optional, List, Tuple
|
||||
|
||||
from src.core.logger import get_logger
|
||||
@@ -396,7 +397,8 @@ class StrategySelector:
|
||||
|
||||
self.logger.info(f"Available strategies for bootstrap: {self._available_strategies}")
|
||||
|
||||
for strategy_name in self._available_strategies:
|
||||
total_strategies = len(self._available_strategies)
|
||||
for strategy_idx, strategy_name in enumerate(self._available_strategies):
|
||||
try:
|
||||
strategy_class = self.strategy_registry._strategies.get(strategy_name.lower())
|
||||
if not strategy_class:
|
||||
@@ -409,7 +411,7 @@ class StrategySelector:
|
||||
)
|
||||
strategy.enabled = True
|
||||
|
||||
self.logger.info(f"Running backtest for {strategy_name}...")
|
||||
self.logger.info(f"Running backtest for {strategy_name} ({strategy_idx + 1}/{total_strategies})...")
|
||||
|
||||
# Run backtest
|
||||
results = await backtest_engine.run_backtest(
|
||||
@@ -461,8 +463,17 @@ class StrategySelector:
|
||||
last_regime = None
|
||||
last_sample_idx = -min_spacing # Allow first sample immediately
|
||||
|
||||
# Limit processing to prevent excessive computation
|
||||
# For small datasets (5 days), process all points but yield periodically
|
||||
data_points = len(market_data) - 50
|
||||
self.logger.info(f"Processing {data_points} data points for {strategy_name}...")
|
||||
|
||||
# Need at least 50 candles for feature calculation
|
||||
for i in range(50, len(market_data)):
|
||||
# Yield control periodically to prevent blocking (every 10 iterations)
|
||||
if i % 10 == 0:
|
||||
await asyncio.sleep(0) # Yield to event loop
|
||||
|
||||
sample_data = market_data.iloc[i-50:i]
|
||||
conditions = self.market_analyzer.analyze_current_conditions(
|
||||
symbol, timeframe, sample_data
|
||||
@@ -505,6 +516,8 @@ class StrategySelector:
|
||||
|
||||
last_regime = current_regime
|
||||
|
||||
self.logger.info(f"Recorded {samples_recorded} samples for {strategy_name}")
|
||||
|
||||
bootstrap_results.append({
|
||||
'strategy': strategy_name,
|
||||
'trades_recorded': samples_recorded,
|
||||
|
||||
@@ -373,7 +373,19 @@ class Database:
|
||||
if "postgresql://" in db_url and "postgresql+asyncpg://" not in db_url:
|
||||
# This is a naive replacement, in production we should handle this better
|
||||
db_url = db_url.replace("postgresql://", "postgresql+asyncpg://")
|
||||
return create_async_engine(db_url, echo=False)
|
||||
# Add connection timeout to prevent hanging
|
||||
# asyncpg connect timeout is set via connect_timeout in connect_args
|
||||
return create_async_engine(
|
||||
db_url,
|
||||
echo=False,
|
||||
connect_args={
|
||||
"server_settings": {"application_name": "crypto_trader"},
|
||||
"timeout": 5, # 5 second connection timeout
|
||||
},
|
||||
pool_pre_ping=True, # Verify connections before using
|
||||
pool_recycle=3600, # Recycle connections after 1 hour
|
||||
pool_timeout=5, # Timeout when getting connection from pool
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Unsupported database type: {db_type}. Only 'postgresql' is supported.")
|
||||
|
||||
|
||||
@@ -41,16 +41,15 @@ app.conf.update(
|
||||
task_acks_late=True, # Ensure task is not lost if worker crashes
|
||||
)
|
||||
|
||||
# Optional: Route specific tasks to specific queues if needed
|
||||
app.conf.task_routes = {
|
||||
# ML Training tasks - computationally intensive
|
||||
"src.worker.tasks.train_model_task": {"queue": "ml_training"},
|
||||
"src.worker.tasks.bootstrap_task": {"queue": "ml_training"},
|
||||
"src.worker.tasks.optimize_strategy_task": {"queue": "ml_training"},
|
||||
# Reporting tasks - I/O bound
|
||||
"src.worker.tasks.generate_report_task": {"queue": "reporting"},
|
||||
"src.worker.tasks.export_data_task": {"queue": "reporting"},
|
||||
}
|
||||
# Queue routing disabled - all tasks go to default 'celery' queue
|
||||
# If you want to use separate queues, start workers with: celery -A src.worker.app worker -Q ml_training,celery
|
||||
# app.conf.task_routes = {
|
||||
# "src.worker.tasks.train_model_task": {"queue": "ml_training"},
|
||||
# "src.worker.tasks.bootstrap_task": {"queue": "ml_training"},
|
||||
# "src.worker.tasks.optimize_strategy_task": {"queue": "ml_training"},
|
||||
# "src.worker.tasks.generate_report_task": {"queue": "reporting"},
|
||||
# "src.worker.tasks.export_data_task": {"queue": "reporting"},
|
||||
# }
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.start()
|
||||
|
||||
@@ -54,8 +54,24 @@ def train_model_task(
|
||||
"""
|
||||
logger.info(f"Starting model training workflow (force={force_retrain}, bootstrap={bootstrap}, symbols={symbols})")
|
||||
|
||||
# CRITICAL: Update state IMMEDIATELY before any blocking operations
|
||||
# This must happen in the sync context, not in async_to_sync
|
||||
try:
|
||||
self.update_state(state='PROGRESS', meta={'step': 'init', 'progress': 5, 'message': 'Initializing...'})
|
||||
logger.info("Initial progress state updated")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to update initial state: {e}")
|
||||
# Don't fail the task, just log it
|
||||
|
||||
async def run_training_workflow():
|
||||
try:
|
||||
logger.info("Step 1: Resetting singletons...")
|
||||
# Update progress - use try/except in case update_state fails
|
||||
try:
|
||||
self.update_state(state='PROGRESS', meta={'step': 'reset', 'progress': 8, 'message': 'Resetting modules...'})
|
||||
except:
|
||||
pass
|
||||
|
||||
# Force reset singletons to ensure no connection sharing across loops
|
||||
from src.autopilot import strategy_selector as ss_module
|
||||
from src.core import database as db_module
|
||||
@@ -72,6 +88,12 @@ def train_model_task(
|
||||
import src.autopilot.strategy_selector
|
||||
src.autopilot.strategy_selector._strategy_selector = None
|
||||
|
||||
logger.info("Step 2: Getting strategy selector...")
|
||||
try:
|
||||
self.update_state(state='PROGRESS', meta={'step': 'get_selector', 'progress': 10, 'message': 'Getting strategy selector...'})
|
||||
except:
|
||||
pass
|
||||
|
||||
selector = get_strategy_selector()
|
||||
|
||||
# Use passed config or fall back to selector defaults
|
||||
@@ -83,8 +105,13 @@ def train_model_task(
|
||||
logger.info(f"Config: days={bootstrap_days}, timeframe={bootstrap_timeframe}, min_samples={bootstrap_min_samples}, symbols={bootstrap_symbols}")
|
||||
|
||||
# 1. Check existing training data
|
||||
self.update_state(state='PROGRESS', meta={'step': 'check_data', 'progress': 10, 'message': 'Checking data...'})
|
||||
logger.info("Step 3: Checking existing training data...")
|
||||
try:
|
||||
self.update_state(state='PROGRESS', meta={'step': 'check_data', 'progress': 15, 'message': 'Checking training data...'})
|
||||
except:
|
||||
pass
|
||||
training_data = await selector.performance_tracker.prepare_training_data(min_samples_per_strategy=bootstrap_min_samples)
|
||||
logger.info(f"Training data check complete. Has data: {training_data is not None and len(training_data.get('X', [])) > 0}")
|
||||
|
||||
# 2. Bootstrap if needed
|
||||
if (training_data is None or len(training_data.get('X', [])) == 0) and bootstrap:
|
||||
@@ -95,12 +122,13 @@ def train_model_task(
|
||||
for i, symbol in enumerate(bootstrap_symbols):
|
||||
pct = 20 + int(40 * (i / len(bootstrap_symbols)))
|
||||
self.update_state(state='PROGRESS', meta={
|
||||
'step': 'fetching',
|
||||
'step': 'bootstrap',
|
||||
'progress': pct,
|
||||
'message': f'Fetching {symbol} data...',
|
||||
'details': {'symbol': symbol}
|
||||
'message': f'Bootstrapping {symbol} ({i+1}/{len(bootstrap_symbols)})...',
|
||||
'details': {'symbol': symbol, 'symbol_index': i+1, 'total_symbols': len(bootstrap_symbols)}
|
||||
})
|
||||
|
||||
logger.info(f"Starting bootstrap for {symbol} (days={bootstrap_days}, timeframe={bootstrap_timeframe})")
|
||||
res = await selector.bootstrap_training_data(
|
||||
symbol=symbol,
|
||||
timeframe=bootstrap_timeframe,
|
||||
@@ -108,6 +136,9 @@ def train_model_task(
|
||||
)
|
||||
if "error" not in res:
|
||||
total_samples += res.get("total_samples", 0)
|
||||
logger.info(f"Bootstrap for {symbol} complete: {res.get('total_samples', 0)} samples")
|
||||
else:
|
||||
logger.warning(f"Bootstrap for {symbol} failed: {res.get('error', 'Unknown error')}")
|
||||
|
||||
if total_samples == 0:
|
||||
raise Exception("Bootstrap failed: No data collected")
|
||||
|
||||
Reference in New Issue
Block a user