Compare commits

3 commits: 1049bc68ab ... 099432bf3f

| Author | SHA1 | Date |
|---|---|---|
|  | 099432bf3f |  |
|  | afe1a17d09 |  |
|  | cc60da49e7 |  |
.gitignore · 4 changes · vendored

```diff
@@ -25,6 +25,10 @@ venv/
 env/
 ENV/
+
+# Node modules
+node_modules/
+frontend/node_modules/
 
 # IDE
 .vscode/
 .idea/
```
frontend/package-lock.json · 746 changes · generated

File diff suppressed because it is too large.
scripts/fetch_historical_data.py · 0 changes · Executable file → Normal file
scripts/reset_database.py · 0 changes · Executable file → Normal file
scripts/start_all.sh · 0 changes · Executable file → Normal file
```diff
@@ -114,6 +114,8 @@ class PerformanceTracker:
 
         cutoff_date = datetime.utcnow() - timedelta(days=days)
         stmt = stmt.where(StrategyPerformance.timestamp >= cutoff_date)
+        # Limit to prevent excessive queries - if we have lots of data, sample it
+        stmt = stmt.order_by(StrategyPerformance.timestamp.desc()).limit(10000)
 
         result = await session.execute(stmt)
         records = result.scalars().all()
```
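The new `.limit(10000)` caps the worst-case result set; because `order_by(...desc())` is applied before the limit, the cap keeps the newest rows rather than an arbitrary slice. A self-contained sketch of the same pattern against an in-memory SQLite table — the `Perf` model here is an illustrative stand-in for `StrategyPerformance`, not the repo's code:

```python
from datetime import datetime, timedelta

from sqlalchemy import DateTime, create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

class Base(DeclarativeBase):
    pass

class Perf(Base):  # hypothetical stand-in for StrategyPerformance
    __tablename__ = "perf"
    id: Mapped[int] = mapped_column(primary_key=True)
    timestamp: Mapped[datetime] = mapped_column(DateTime)

engine = create_engine("sqlite://")  # in-memory database
Base.metadata.create_all(engine)

with Session(engine) as session:
    now = datetime.utcnow()
    session.add_all(Perf(timestamp=now - timedelta(minutes=i)) for i in range(20))
    session.commit()

    # Sort newest-first, then cap: the 5 most recent rows survive the limit.
    stmt = select(Perf).order_by(Perf.timestamp.desc()).limit(5)
    rows = session.scalars(stmt).all()
    print(len(rows), rows[0].timestamp > rows[-1].timestamp)  # 5 True
```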
```diff
@@ -121,22 +123,32 @@ class PerformanceTracker:
         if not records:
             return pd.DataFrame()
 
-        # Convert to DataFrame
-        data = []
-        for record in records:
-            # Get corresponding market conditions
-            snapshot_stmt = select(MarketConditionsSnapshot).filter_by(
-                strategy_name=record.strategy_name,
-                symbol=record.symbol,
-                timeframe=record.timeframe
-            ).where(
-                MarketConditionsSnapshot.timestamp <= record.timestamp
-            ).order_by(
-                MarketConditionsSnapshot.timestamp.desc()
-            ).limit(1)
-
-            snapshot_result = await session.execute(snapshot_stmt)
-            snapshot = snapshot_result.scalar_one_or_none()
+        self.logger.info(f"Processing {len(records)} performance records for training data")
+        # Convert to DataFrame - optimize by batching snapshot queries
+        data = []
+
+        # Batch snapshot lookups to reduce N+1 query problem
+        snapshot_cache = {}
+        for record in records:
+            cache_key = f"{record.strategy_name}:{record.symbol}:{record.timeframe}:{record.timestamp.date()}"
+
+            if cache_key not in snapshot_cache:
+                # Get corresponding market conditions (only once per day per strategy)
+                snapshot_stmt = select(MarketConditionsSnapshot).filter_by(
+                    strategy_name=record.strategy_name,
+                    symbol=record.symbol,
+                    timeframe=record.timeframe
+                ).where(
+                    MarketConditionsSnapshot.timestamp <= record.timestamp
+                ).order_by(
+                    MarketConditionsSnapshot.timestamp.desc()
+                ).limit(1)
+
+                snapshot_result = await session.execute(snapshot_stmt)
+                snapshot = snapshot_result.scalar_one_or_none()
+                snapshot_cache[cache_key] = snapshot
+            else:
+                snapshot = snapshot_cache[cache_key]
 
             row = {
                 'strategy_name': record.strategy_name,
```
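The second hunk replaces one snapshot query per record with one per `(strategy, symbol, timeframe, day)` key. The memoization itself is independent of the ORM; a minimal sketch, with a hypothetical `fetch_snapshot` standing in for the `MarketConditionsSnapshot` query:

```python
from datetime import datetime

def make_cached_lookup(fetch_snapshot):
    """Memoize a snapshot lookup per (strategy, symbol, timeframe, day)."""
    cache = {}

    def lookup(strategy_name, symbol, timeframe, timestamp: datetime):
        key = f"{strategy_name}:{symbol}:{timeframe}:{timestamp.date()}"
        if key not in cache:
            # Only the first record per key pays the cost of a real fetch.
            cache[key] = fetch_snapshot(strategy_name, symbol, timeframe, timestamp)
        return cache[key]

    return lookup

# Two records on the same day trigger one fetch instead of two.
calls = []
lookup = make_cached_lookup(lambda *a: calls.append(a) or {"regime": "trending"})
for ts in (datetime(2024, 1, 1, 9), datetime(2024, 1, 1, 15)):
    lookup("ema_cross", "BTC/USDT", "1h", ts)
print(len(calls))  # -> 1
```

Note the tradeoff the commit accepts: every record in a given day reuses whichever snapshot was fetched for the first record seen that day, so intraday snapshot drift is ignored.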
```diff
@@ -1,5 +1,6 @@
 """ML-based strategy selector for intelligent autopilot."""
 
+import asyncio
 from typing import Dict, Any, Optional, List, Tuple
 
 from src.core.logger import get_logger
@@ -396,7 +397,8 @@ class StrategySelector:
 
         self.logger.info(f"Available strategies for bootstrap: {self._available_strategies}")
 
-        for strategy_name in self._available_strategies:
+        total_strategies = len(self._available_strategies)
+        for strategy_idx, strategy_name in enumerate(self._available_strategies):
             try:
                 strategy_class = self.strategy_registry._strategies.get(strategy_name.lower())
                 if not strategy_class:
@@ -409,7 +411,7 @@ class StrategySelector:
                 )
                 strategy.enabled = True
 
-                self.logger.info(f"Running backtest for {strategy_name}...")
+                self.logger.info(f"Running backtest for {strategy_name} ({strategy_idx + 1}/{total_strategies})...")
 
                 # Run backtest
                 results = await backtest_engine.run_backtest(
@@ -461,8 +463,17 @@ class StrategySelector:
             last_regime = None
             last_sample_idx = -min_spacing  # Allow first sample immediately
 
+            # Limit processing to prevent excessive computation
+            # For small datasets (5 days), process all points but yield periodically
+            data_points = len(market_data) - 50
+            self.logger.info(f"Processing {data_points} data points for {strategy_name}...")
+
             # Need at least 50 candles for feature calculation
             for i in range(50, len(market_data)):
+                # Yield control periodically to prevent blocking (every 10 iterations)
+                if i % 10 == 0:
+                    await asyncio.sleep(0)  # Yield to event loop
+
                 sample_data = market_data.iloc[i-50:i]
                 conditions = self.market_analyzer.analyze_current_conditions(
                     symbol, timeframe, sample_data
```
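The `await asyncio.sleep(0)` every 10 iterations is what keeps this CPU-bound loop from starving the event loop: it costs almost nothing but gives other coroutines (heartbeats, progress reporters) a chance to run. A runnable, self-contained demonstration with illustrative names:

```python
import asyncio

async def heartbeat():
    # Stands in for any concurrent coroutine (e.g., a progress reporter).
    for _ in range(5):
        print("heartbeat")
        await asyncio.sleep(0.01)

async def crunch(n=200_000):
    total = 0
    for i in range(n):
        total += i * i
        if i % 1_000 == 0:
            await asyncio.sleep(0)  # yield to the event loop, as in the diff
    return total

async def main():
    # Without the sleep(0), crunch() would hold the loop until it finished,
    # and every heartbeat would print only at the very end.
    _, total = await asyncio.gather(heartbeat(), crunch())
    print("done:", total)

asyncio.run(main())
```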
```diff
@@ -505,6 +516,8 @@ class StrategySelector:
 
                 last_regime = current_regime
 
+            self.logger.info(f"Recorded {samples_recorded} samples for {strategy_name}")
+
             bootstrap_results.append({
                 'strategy': strategy_name,
                 'trades_recorded': samples_recorded,
```
```diff
@@ -373,7 +373,19 @@ class Database:
         if "postgresql://" in db_url and "postgresql+asyncpg://" not in db_url:
             # This is a naive replacement, in production we should handle this better
             db_url = db_url.replace("postgresql://", "postgresql+asyncpg://")
-        return create_async_engine(db_url, echo=False)
+        # Add connection timeout to prevent hanging
+        # asyncpg connect timeout is set via connect_timeout in connect_args
+        return create_async_engine(
+            db_url,
+            echo=False,
+            connect_args={
+                "server_settings": {"application_name": "crypto_trader"},
+                "timeout": 5,  # 5 second connection timeout
+            },
+            pool_pre_ping=True,  # Verify connections before using
+            pool_recycle=3600,  # Recycle connections after 1 hour
+            pool_timeout=5,  # Timeout when getting connection from pool
+        )
     else:
         raise ValueError(f"Unsupported database type: {db_type}. Only 'postgresql' is supported.")
 
```
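All of the new keyword arguments are standard SQLAlchemy/asyncpg options: `timeout` in `connect_args` bounds the initial connect, `pool_pre_ping` verifies a pooled connection with a lightweight round trip before reuse, `pool_recycle` retires stale connections, and `pool_timeout` bounds how long a checkout may wait. A quick connectivity probe using the same settings — a sketch assuming SQLAlchemy 2.x async and a placeholder DSN:

```python
import asyncio

from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

async def probe(dsn: str) -> bool:
    engine = create_async_engine(
        dsn,
        connect_args={"timeout": 5},  # asyncpg-level connect timeout
        pool_pre_ping=True,
        pool_timeout=5,
    )
    try:
        async with engine.connect() as conn:
            await conn.execute(text("SELECT 1"))  # fails fast instead of hanging
        return True
    finally:
        await engine.dispose()

# asyncio.run(probe("postgresql+asyncpg://user:pass@localhost/crypto"))  # placeholder DSN
```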
```diff
@@ -41,16 +41,15 @@ app.conf.update(
     task_acks_late=True,  # Ensure task is not lost if worker crashes
 )
 
-# Optional: Route specific tasks to specific queues if needed
-app.conf.task_routes = {
-    # ML Training tasks - computationally intensive
-    "src.worker.tasks.train_model_task": {"queue": "ml_training"},
-    "src.worker.tasks.bootstrap_task": {"queue": "ml_training"},
-    "src.worker.tasks.optimize_strategy_task": {"queue": "ml_training"},
-    # Reporting tasks - I/O bound
-    "src.worker.tasks.generate_report_task": {"queue": "reporting"},
-    "src.worker.tasks.export_data_task": {"queue": "reporting"},
-}
+# Queue routing disabled - all tasks go to default 'celery' queue
+# If you want to use separate queues, start workers with: celery -A src.worker.app worker -Q ml_training,celery
+# app.conf.task_routes = {
+#     "src.worker.tasks.train_model_task": {"queue": "ml_training"},
+#     "src.worker.tasks.bootstrap_task": {"queue": "ml_training"},
+#     "src.worker.tasks.optimize_strategy_task": {"queue": "ml_training"},
+#     "src.worker.tasks.generate_report_task": {"queue": "reporting"},
+#     "src.worker.tasks.export_data_task": {"queue": "reporting"},
+# }
 
 if __name__ == "__main__":
     app.start()
```
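For context on why the routes were disabled rather than fixed: a Celery route only redirects messages to another queue; it does not create a consumer for that queue. A worker started with the default command consumes only `celery`, so routed tasks would sit unconsumed. A minimal sketch (broker URL is a placeholder):

```python
from celery import Celery

app = Celery("worker", broker="redis://localhost:6379/0")  # placeholder broker

# Re-enabling a route like this is only safe if some worker consumes the queue:
app.conf.task_routes = {
    "src.worker.tasks.train_model_task": {"queue": "ml_training"},
}

# Default start (consumes only 'celery' -> train_model_task would never run):
#   celery -A src.worker.app worker
# Consuming both queues, as the commit's own comment suggests:
#   celery -A src.worker.app worker -Q ml_training,celery
```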
```diff
@@ -54,8 +54,24 @@ def train_model_task(
     """
     logger.info(f"Starting model training workflow (force={force_retrain}, bootstrap={bootstrap}, symbols={symbols})")
 
+    # CRITICAL: Update state IMMEDIATELY before any blocking operations
+    # This must happen in the sync context, not in async_to_sync
+    try:
+        self.update_state(state='PROGRESS', meta={'step': 'init', 'progress': 5, 'message': 'Initializing...'})
+        logger.info("Initial progress state updated")
+    except Exception as e:
+        logger.error(f"Failed to update initial state: {e}")
+        # Don't fail the task, just log it
+
     async def run_training_workflow():
         try:
+            logger.info("Step 1: Resetting singletons...")
+            # Update progress - use try/except in case update_state fails
+            try:
+                self.update_state(state='PROGRESS', meta={'step': 'reset', 'progress': 8, 'message': 'Resetting modules...'})
+            except:
+                pass
+
             # Force reset singletons to ensure no connection sharing across loops
             from src.autopilot import strategy_selector as ss_module
             from src.core import database as db_module
@@ -72,6 +88,12 @@ def train_model_task(
             import src.autopilot.strategy_selector
             src.autopilot.strategy_selector._strategy_selector = None
 
+            logger.info("Step 2: Getting strategy selector...")
+            try:
+                self.update_state(state='PROGRESS', meta={'step': 'get_selector', 'progress': 10, 'message': 'Getting strategy selector...'})
+            except:
+                pass
+
             selector = get_strategy_selector()
 
             # Use passed config or fall back to selector defaults
@@ -83,8 +105,13 @@ def train_model_task(
             logger.info(f"Config: days={bootstrap_days}, timeframe={bootstrap_timeframe}, min_samples={bootstrap_min_samples}, symbols={bootstrap_symbols}")
 
             # 1. Check existing training data
-            self.update_state(state='PROGRESS', meta={'step': 'check_data', 'progress': 10, 'message': 'Checking data...'})
+            logger.info("Step 3: Checking existing training data...")
+            try:
+                self.update_state(state='PROGRESS', meta={'step': 'check_data', 'progress': 15, 'message': 'Checking training data...'})
+            except:
+                pass
             training_data = await selector.performance_tracker.prepare_training_data(min_samples_per_strategy=bootstrap_min_samples)
+            logger.info(f"Training data check complete. Has data: {training_data is not None and len(training_data.get('X', [])) > 0}")
 
             # 2. Bootstrap if needed
             if (training_data is None or len(training_data.get('X', [])) == 0) and bootstrap:
@@ -95,12 +122,13 @@ def train_model_task(
             for i, symbol in enumerate(bootstrap_symbols):
                 pct = 20 + int(40 * (i / len(bootstrap_symbols)))
                 self.update_state(state='PROGRESS', meta={
-                    'step': 'fetching',
+                    'step': 'bootstrap',
                     'progress': pct,
-                    'message': f'Fetching {symbol} data...',
-                    'details': {'symbol': symbol}
+                    'message': f'Bootstrapping {symbol} ({i+1}/{len(bootstrap_symbols)})...',
+                    'details': {'symbol': symbol, 'symbol_index': i+1, 'total_symbols': len(bootstrap_symbols)}
                 })
 
+                logger.info(f"Starting bootstrap for {symbol} (days={bootstrap_days}, timeframe={bootstrap_timeframe})")
                 res = await selector.bootstrap_training_data(
                     symbol=symbol,
                     timeframe=bootstrap_timeframe,
@@ -108,6 +136,9 @@ def train_model_task(
                 )
                 if "error" not in res:
                     total_samples += res.get("total_samples", 0)
+                    logger.info(f"Bootstrap for {symbol} complete: {res.get('total_samples', 0)} samples")
+                else:
+                    logger.warning(f"Bootstrap for {symbol} failed: {res.get('error', 'Unknown error')}")
 
             if total_samples == 0:
                 raise Exception("Bootstrap failed: No data collected")
```
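The pattern this last diff applies throughout — report PROGRESS early, and wrap every `update_state` in try/except so a result-backend hiccup cannot kill the task — works for any bound Celery task. A minimal sketch, assuming a reachable Redis broker/backend (URLs and names are placeholders):

```python
from celery import Celery

app = Celery(
    "demo",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

@app.task(bind=True)
def long_job(self, steps=10):
    for i in range(steps):
        try:
            # Report progress eagerly; never let a reporting failure fail the task.
            self.update_state(state="PROGRESS",
                              meta={"progress": int(100 * i / steps)})
        except Exception:
            pass
        # ... real work for this step ...
    return {"progress": 100}

# A client polls the same metadata the worker writes:
#   res = long_job.delay()
#   res.state  # "PROGRESS" while running
#   res.info   # e.g. {"progress": 40}
```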