Compare commits

...

3 Commits

Author  SHA1        Message                                                                                                Date
kfox    099432bf3f  Merge remote repository with local changes, keeping local fixes                                       2025-12-26 01:25:30 -05:00
        Some checks failed:
          Documentation / build-docs (push): Has been cancelled
          Tests / test (macos-latest, 3.11) (push): Has been cancelled
          Tests / test (macos-latest, 3.12) (push): Has been cancelled
          Tests / test (macos-latest, 3.13) (push): Has been cancelled
          Tests / test (macos-latest, 3.14) (push): Has been cancelled
          Tests / test (ubuntu-latest, 3.11) (push): Has been cancelled
          Tests / test (ubuntu-latest, 3.12) (push): Has been cancelled
          Tests / test (ubuntu-latest, 3.13) (push): Has been cancelled
          Tests / test (ubuntu-latest, 3.14) (push): Has been cancelled
kfox    afe1a17d09  Add node_modules to .gitignore                                                                        2025-12-26 01:17:18 -05:00
kfox    cc60da49e7  Local changes: Updated model training, removed debug instrumentation, and configuration improvements  2025-12-26 01:15:43 -05:00
10 changed files with 416 additions and 461 deletions

.gitignore (vendored): 4 changes

@@ -25,6 +25,10 @@ venv/
 env/
 ENV/
 
+# Node modules
+node_modules/
+frontend/node_modules/
+
 # IDE
 .vscode/
 .idea/

File diff suppressed because it is too large.

scripts/fetch_historical_data.py: Executable file → Normal file (0 lines changed)

scripts/reset_database.py: Executable file → Normal file (0 lines changed)

scripts/start_all.sh: Executable file → Normal file (0 lines changed)


@@ -114,6 +114,8 @@ class PerformanceTracker:
             cutoff_date = datetime.utcnow() - timedelta(days=days)
             stmt = stmt.where(StrategyPerformance.timestamp >= cutoff_date)
 
+        # Limit to prevent excessive queries - if we have lots of data, sample it
+        stmt = stmt.order_by(StrategyPerformance.timestamp.desc()).limit(10000)
+
         result = await session.execute(stmt)
         records = result.scalars().all()
@@ -121,22 +123,32 @@ class PerformanceTracker:
         if not records:
             return pd.DataFrame()
 
-        # Convert to DataFrame
+        self.logger.info(f"Processing {len(records)} performance records for training data")
+
+        # Convert to DataFrame - optimize by batching snapshot queries
         data = []
+
+        # Batch snapshot lookups to reduce N+1 query problem
+        snapshot_cache = {}
         for record in records:
-            # Get corresponding market conditions
-            snapshot_stmt = select(MarketConditionsSnapshot).filter_by(
-                strategy_name=record.strategy_name,
-                symbol=record.symbol,
-                timeframe=record.timeframe
-            ).where(
-                MarketConditionsSnapshot.timestamp <= record.timestamp
-            ).order_by(
-                MarketConditionsSnapshot.timestamp.desc()
-            ).limit(1)
-            snapshot_result = await session.execute(snapshot_stmt)
-            snapshot = snapshot_result.scalar_one_or_none()
+            cache_key = f"{record.strategy_name}:{record.symbol}:{record.timeframe}:{record.timestamp.date()}"
+
+            if cache_key not in snapshot_cache:
+                # Get corresponding market conditions (only once per day per strategy)
+                snapshot_stmt = select(MarketConditionsSnapshot).filter_by(
+                    strategy_name=record.strategy_name,
+                    symbol=record.symbol,
+                    timeframe=record.timeframe
+                ).where(
+                    MarketConditionsSnapshot.timestamp <= record.timestamp
+                ).order_by(
+                    MarketConditionsSnapshot.timestamp.desc()
+                ).limit(1)
+                snapshot_result = await session.execute(snapshot_stmt)
+                snapshot = snapshot_result.scalar_one_or_none()
+                snapshot_cache[cache_key] = snapshot
+            else:
+                snapshot = snapshot_cache[cache_key]
 
             row = {
                 'strategy_name': record.strategy_name,

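The snapshot_cache introduced above collapses one MarketConditionsSnapshot query per performance record into one query per strategy/symbol/timeframe/day group. Below is a minimal, self-contained sketch of that lookup-with-cache pattern; lookup_with_cache and run_snapshot_query are illustrative names, not part of this repository.

from typing import Any, Awaitable, Callable, Dict, Hashable


async def lookup_with_cache(
    key: Hashable,
    cache: Dict[Hashable, Any],
    fetch: Callable[[], Awaitable[Any]],
) -> Any:
    """Return cache[key], awaiting fetch() exactly once on a cache miss."""
    if key not in cache:
        cache[key] = await fetch()  # the expensive per-group database query
    return cache[key]


# Illustrative call site, mirroring the cache key built in the diff:
#   key = (record.strategy_name, record.symbol, record.timeframe, record.timestamp.date())
#   snapshot = await lookup_with_cache(key, snapshot_cache, lambda: run_snapshot_query(record))

One trade-off the diff accepts: because the cache key uses timestamp.date(), every record from a given day reuses the snapshot fetched for the first record processed that day, rather than the snapshot nearest its own timestamp.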

@@ -1,5 +1,6 @@
 """ML-based strategy selector for intelligent autopilot."""
+import asyncio
 
 from typing import Dict, Any, Optional, List, Tuple
 
 from src.core.logger import get_logger
@@ -396,7 +397,8 @@ class StrategySelector:
         self.logger.info(f"Available strategies for bootstrap: {self._available_strategies}")
 
-        for strategy_name in self._available_strategies:
+        total_strategies = len(self._available_strategies)
+        for strategy_idx, strategy_name in enumerate(self._available_strategies):
             try:
                 strategy_class = self.strategy_registry._strategies.get(strategy_name.lower())
                 if not strategy_class:
@@ -409,7 +411,7 @@ class StrategySelector:
                 )
                 strategy.enabled = True
 
-                self.logger.info(f"Running backtest for {strategy_name}...")
+                self.logger.info(f"Running backtest for {strategy_name} ({strategy_idx + 1}/{total_strategies})...")
 
                 # Run backtest
                 results = await backtest_engine.run_backtest(
@@ -461,8 +463,17 @@ class StrategySelector:
             last_regime = None
             last_sample_idx = -min_spacing  # Allow first sample immediately
 
+            # Limit processing to prevent excessive computation
+            # For small datasets (5 days), process all points but yield periodically
+            data_points = len(market_data) - 50
+            self.logger.info(f"Processing {data_points} data points for {strategy_name}...")
+
             # Need at least 50 candles for feature calculation
             for i in range(50, len(market_data)):
+                # Yield control periodically to prevent blocking (every 10 iterations)
+                if i % 10 == 0:
+                    await asyncio.sleep(0)  # Yield to event loop
+
                 sample_data = market_data.iloc[i-50:i]
                 conditions = self.market_analyzer.analyze_current_conditions(
                     symbol, timeframe, sample_data
@@ -505,6 +516,8 @@ class StrategySelector:
                     last_regime = current_regime
 
+            self.logger.info(f"Recorded {samples_recorded} samples for {strategy_name}")
+
             bootstrap_results.append({
                 'strategy': strategy_name,
                 'trades_recorded': samples_recorded,

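The await asyncio.sleep(0) calls added above keep the bootstrap loop cooperative: a long CPU-bound pass over candles periodically hands control back to the event loop so heartbeats and progress updates can run. A small self-contained sketch of the pattern, where process_points and its arithmetic are stand-ins rather than project code:

import asyncio


async def process_points(n: int, yield_every: int = 10) -> int:
    """CPU-bound loop that periodically yields to the event loop."""
    total = 0
    for i in range(n):
        if i % yield_every == 0:
            await asyncio.sleep(0)  # yield control so other tasks can run
        total += i * i              # stand-in for per-candle feature work
    return total


if __name__ == "__main__":
    print(asyncio.run(process_points(100_000)))

sleep(0) adds no real delay; it only suspends the coroutine long enough for other ready tasks on the same loop to be scheduled.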

@@ -373,7 +373,19 @@ class Database:
             if "postgresql://" in db_url and "postgresql+asyncpg://" not in db_url:
                 # This is a naive replacement, in production we should handle this better
                 db_url = db_url.replace("postgresql://", "postgresql+asyncpg://")
-            return create_async_engine(db_url, echo=False)
+            # Add connection timeout to prevent hanging
+            # asyncpg connect timeout is set via connect_timeout in connect_args
+            return create_async_engine(
+                db_url,
+                echo=False,
+                connect_args={
+                    "server_settings": {"application_name": "crypto_trader"},
+                    "timeout": 5,  # 5 second connection timeout
+                },
+                pool_pre_ping=True,  # Verify connections before using
+                pool_recycle=3600,  # Recycle connections after 1 hour
+                pool_timeout=5,  # Timeout when getting connection from pool
+            )
         else:
             raise ValueError(f"Unsupported database type: {db_type}. Only 'postgresql' is supported.")

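For context, here is the same set of engine options in isolation, as a minimal sketch assuming SQLAlchemy 2.x with the asyncpg driver; the DSN and the check() helper are placeholders, not repository code.

import asyncio

from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

DSN = "postgresql+asyncpg://user:pass@localhost:5432/crypto"  # placeholder

engine = create_async_engine(
    DSN,
    echo=False,
    connect_args={
        "server_settings": {"application_name": "crypto_trader"},
        "timeout": 5,        # asyncpg connection-establishment timeout (seconds)
    },
    pool_pre_ping=True,      # verify a pooled connection before handing it out
    pool_recycle=3600,       # recycle connections older than an hour
    pool_timeout=5,          # fail fast if the pool is exhausted
)


async def check() -> None:
    async with engine.connect() as conn:
        print(await conn.scalar(text("SELECT 1")))


if __name__ == "__main__":
    asyncio.run(check())

pool_pre_ping and pool_recycle guard against stale pooled connections, while the asyncpg connect timeout and pool_timeout make connection problems surface within seconds instead of hanging.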

@@ -41,16 +41,15 @@ app.conf.update(
     task_acks_late=True,  # Ensure task is not lost if worker crashes
 )
 
-# Optional: Route specific tasks to specific queues if needed
-app.conf.task_routes = {
-    # ML Training tasks - computationally intensive
-    "src.worker.tasks.train_model_task": {"queue": "ml_training"},
-    "src.worker.tasks.bootstrap_task": {"queue": "ml_training"},
-    "src.worker.tasks.optimize_strategy_task": {"queue": "ml_training"},
-    # Reporting tasks - I/O bound
-    "src.worker.tasks.generate_report_task": {"queue": "reporting"},
-    "src.worker.tasks.export_data_task": {"queue": "reporting"},
-}
+# Queue routing disabled - all tasks go to default 'celery' queue
+# If you want to use separate queues, start workers with: celery -A src.worker.app worker -Q ml_training,celery
+# app.conf.task_routes = {
+#     "src.worker.tasks.train_model_task": {"queue": "ml_training"},
+#     "src.worker.tasks.bootstrap_task": {"queue": "ml_training"},
+#     "src.worker.tasks.optimize_strategy_task": {"queue": "ml_training"},
+#     "src.worker.tasks.generate_report_task": {"queue": "reporting"},
+#     "src.worker.tasks.export_data_task": {"queue": "reporting"},
+# }
 
 if __name__ == "__main__":
     app.start()

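Routing only helps if some worker actually consumes the named queues; tasks routed to ml_training with no worker started with -Q ml_training just sit there unprocessed, which is what the new comment warns about. A toy sketch of both setups, using placeholder app and broker names rather than this repository's configuration:

from celery import Celery

app = Celery("sketch", broker="redis://localhost:6379/0")  # placeholder broker URL

# Option A: explicit routing. Requires a worker that listens on the queue:
#   celery -A sketch worker -Q ml_training,celery
app.conf.task_routes = {
    "sketch.train": {"queue": "ml_training"},
}


@app.task(name="sketch.train")
def train() -> str:
    return "trained"

# Option B (what the commit chose): leave task_routes unset, so every task lands
# on the default "celery" queue and a plain `celery -A sketch worker` handles it.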

@@ -54,8 +54,24 @@ def train_model_task(
     """
     logger.info(f"Starting model training workflow (force={force_retrain}, bootstrap={bootstrap}, symbols={symbols})")
 
+    # CRITICAL: Update state IMMEDIATELY before any blocking operations
+    # This must happen in the sync context, not in async_to_sync
+    try:
+        self.update_state(state='PROGRESS', meta={'step': 'init', 'progress': 5, 'message': 'Initializing...'})
+        logger.info("Initial progress state updated")
+    except Exception as e:
+        logger.error(f"Failed to update initial state: {e}")
+        # Don't fail the task, just log it
+
     async def run_training_workflow():
         try:
+            logger.info("Step 1: Resetting singletons...")
+            # Update progress - use try/except in case update_state fails
+            try:
+                self.update_state(state='PROGRESS', meta={'step': 'reset', 'progress': 8, 'message': 'Resetting modules...'})
+            except:
+                pass
+
             # Force reset singletons to ensure no connection sharing across loops
             from src.autopilot import strategy_selector as ss_module
             from src.core import database as db_module
@@ -72,6 +88,12 @@ def train_model_task(
             import src.autopilot.strategy_selector
             src.autopilot.strategy_selector._strategy_selector = None
 
+            logger.info("Step 2: Getting strategy selector...")
+            try:
+                self.update_state(state='PROGRESS', meta={'step': 'get_selector', 'progress': 10, 'message': 'Getting strategy selector...'})
+            except:
+                pass
+
             selector = get_strategy_selector()
 
             # Use passed config or fall back to selector defaults
@@ -83,8 +105,13 @@ def train_model_task(
             logger.info(f"Config: days={bootstrap_days}, timeframe={bootstrap_timeframe}, min_samples={bootstrap_min_samples}, symbols={bootstrap_symbols}")
 
             # 1. Check existing training data
-            self.update_state(state='PROGRESS', meta={'step': 'check_data', 'progress': 10, 'message': 'Checking data...'})
+            logger.info("Step 3: Checking existing training data...")
+            try:
+                self.update_state(state='PROGRESS', meta={'step': 'check_data', 'progress': 15, 'message': 'Checking training data...'})
+            except:
+                pass
             training_data = await selector.performance_tracker.prepare_training_data(min_samples_per_strategy=bootstrap_min_samples)
+            logger.info(f"Training data check complete. Has data: {training_data is not None and len(training_data.get('X', [])) > 0}")
 
             # 2. Bootstrap if needed
             if (training_data is None or len(training_data.get('X', [])) == 0) and bootstrap:
@@ -95,12 +122,13 @@ def train_model_task(
                 for i, symbol in enumerate(bootstrap_symbols):
                     pct = 20 + int(40 * (i / len(bootstrap_symbols)))
                     self.update_state(state='PROGRESS', meta={
-                        'step': 'fetching',
+                        'step': 'bootstrap',
                         'progress': pct,
-                        'message': f'Fetching {symbol} data...',
-                        'details': {'symbol': symbol}
+                        'message': f'Bootstrapping {symbol} ({i+1}/{len(bootstrap_symbols)})...',
+                        'details': {'symbol': symbol, 'symbol_index': i+1, 'total_symbols': len(bootstrap_symbols)}
                     })
+                    logger.info(f"Starting bootstrap for {symbol} (days={bootstrap_days}, timeframe={bootstrap_timeframe})")
                     res = await selector.bootstrap_training_data(
                         symbol=symbol,
                         timeframe=bootstrap_timeframe,
@@ -108,6 +136,9 @@ def train_model_task(
                     )
                     if "error" not in res:
                         total_samples += res.get("total_samples", 0)
+                        logger.info(f"Bootstrap for {symbol} complete: {res.get('total_samples', 0)} samples")
+                    else:
+                        logger.warning(f"Bootstrap for {symbol} failed: {res.get('error', 'Unknown error')}")
 
                 if total_samples == 0:
                     raise Exception("Bootstrap failed: No data collected")
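
The update_state calls threaded through train_model_task report coarse progress via the Celery result backend while the real work runs inside the async workflow. A compact, self-contained sketch of that pattern; the app name, broker/backend URLs, and step list are illustrative only.

import time

from celery import Celery

app = Celery(
    "sketch",
    broker="redis://localhost:6379/0",    # placeholder
    backend="redis://localhost:6379/1",   # placeholder
)


@app.task(bind=True)
def long_workflow(self) -> str:
    steps = [("init", 5), ("reset", 8), ("check_data", 15), ("train", 60), ("done", 100)]
    for step, pct in steps:
        try:
            self.update_state(state="PROGRESS", meta={"step": step, "progress": pct})
        except Exception:
            pass          # progress reporting is best-effort; keep the task running
        time.sleep(1)     # stand-in for real work
    return "complete"

# A caller can poll the same metadata:
#   result = long_workflow.delay()
#   print(result.state, result.info)  # e.g. PROGRESS {'step': 'check_data', 'progress': 15}

Wrapping update_state in try/except mirrors the diff's intent: a failed progress update is logged or ignored, but it never aborts the training task itself.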