MetaTrader Portfolio Optimizer: Quantitative Trading MCP Server

This article presents a Python MCP (Model Context Protocol) server that connects to MetaTrader 5 and exposes portfolio optimization, risk management, and automated rebalancing. It builds on Markowitz mean-variance optimization (the efficient frontier), the Sharpe ratio, CAGR, and risk parity to manage a portfolio under explicit risk limits.
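
The performance measures named above can be written in a few lines. The following is a minimal sketch using only the standard library; the function names (`cagr`, `sharpe_ratio`, `max_drawdown`) and the `periods_per_year` parameter are illustrative assumptions and are not the names used in the implementation later in this article.

```python
import math
from statistics import mean, stdev
from typing import Sequence


def cagr(equity: Sequence[float], periods_per_year: int) -> float:
    """Compound annual growth rate of an equity curve sampled at a fixed interval."""
    if len(equity) < 2 or equity[0] <= 0:
        return 0.0
    # N samples span N - 1 intervals
    years = (len(equity) - 1) / periods_per_year
    return (equity[-1] / equity[0]) ** (1 / years) - 1


def sharpe_ratio(returns: Sequence[float], risk_free_rate: float,
                 periods_per_year: int) -> float:
    """Annualized Sharpe ratio from per-period simple returns."""
    # Convert the annual risk-free rate to a per-period rate
    excess = [r - risk_free_rate / periods_per_year for r in returns]
    if len(excess) < 2:
        return 0.0
    sd = stdev(excess)
    if sd == 0:
        return 0.0
    return mean(excess) / sd * math.sqrt(periods_per_year)


def max_drawdown(equity: Sequence[float]) -> float:
    """Largest peak-to-trough decline, as a positive fraction of the peak."""
    peak = equity[0]
    worst = 0.0
    for value in equity:
        peak = max(peak, value)
        worst = max(worst, (peak - value) / peak)
    return worst


if __name__ == "__main__":
    curve = [100.0, 110.0, 99.0, 121.0]  # Made-up equity values
    print("CAGR:", cagr(curve, periods_per_year=3))
    print("Max drawdown:", max_drawdown(curve))
```

Note that CAGR is computed from a compounded equity curve, not from a running sum of returns, and that the annualization factor must match the bar size of the input data.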

System Architecture

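The MCP server provides resources such as portfolio://status, portfolio://efficient-frontier, and portfolio://config, and tools such as optimize_portfolio, preview_rebalance, rebalance_portfolio, and update_config, all backed by a MetaTrader 5 connection.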
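
One constraint in this design is worth checking before any optimization runs. The optimizer in the listing later in this article requires weights to sum to 1 while bounding each weight between min_position_size and max_position_size. With the default 20% cap, fewer than five symbols leave no feasible portfolio, and exactly five force equal weights. The sketch below makes that check explicit; the helper name `weight_bounds_feasible` is hypothetical and does not appear in the original code.

```python
from typing import Tuple


def weight_bounds_feasible(n_assets: int, min_weight: float,
                           max_weight: float) -> Tuple[bool, str]:
    """Check whether fully invested weights can satisfy per-asset bounds."""
    eps = 1e-12  # Tolerance for floating-point sums
    if n_assets <= 0:
        return False, "no assets"
    if n_assets * max_weight < 1.0 - eps:
        return False, "upper bounds sum to less than 100%"
    if n_assets * min_weight > 1.0 + eps:
        return False, "lower bounds sum to more than 100%"
    if abs(n_assets * max_weight - 1.0) <= eps:
        # Every asset must sit exactly at its cap
        return True, "only the equal-weight portfolio is feasible"
    return True, "ok"


if __name__ == "__main__":
    for n in (4, 5, 10):
        print(n, weight_bounds_feasible(n, min_weight=0.01, max_weight=0.20))
```

Running a check like this before calling the solver turns a silent fallback to equal weights into an explicit configuration error.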

Installation and Dependencies

# requirements.txt
fastmcp  # Provides the `from fastmcp import FastMCP` import used by the server
MetaTrader5>=5.0.4500
numpy>=1.24.0
pandas>=2.0.0
scipy>=1.10.0
pytz>=2023.3
plotly>=5.15.0
python-dateutil>=2.8.0
asyncio-mqtt>=0.16.0
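
Before starting the server, it can help to confirm that the modules the listing imports are actually installed. The following is a small sketch using only the standard library; the helper name `missing_modules` and the `REQUIRED_MODULES` list are assumptions made for illustration, and the list uses import names, which can differ from the pip names above.

```python
from importlib.util import find_spec
from typing import Iterable, List

# Import names, not pip names: python-dateutil is imported as "dateutil".
# This list is an assumption based on the imports in the server listing.
REQUIRED_MODULES = ["fastmcp", "MetaTrader5", "numpy", "pandas", "scipy",
                    "pytz", "plotly", "dateutil"]


def missing_modules(names: Iterable[str]) -> List[str]:
    """Return the top-level modules that cannot be found on this interpreter."""
    return [name for name in names if find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules(REQUIRED_MODULES)
    if missing:
        print("Missing modules:", ", ".join(missing))
    else:
        print("All required modules are importable")
```

The check only locates each module and does not import it, so it will not trigger a connection attempt to the MetaTrader terminal.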

Complete MCP Server Implementation

#!/usr/bin/env python3
"""
MetaTrader Portfolio Optimizer MCP Server
Implements efficient frontier, CAGR optimization, and risk management
"""
 
import asyncio
import json
import logging
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass, asdict
from scipy.optimize import minimize
import MetaTrader5 as mt5
import pytz
from itertools import product
 
# FastMCP Server imports
from fastmcp import FastMCP
 
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
 
@dataclass
class PortfolioConfig:
    """Configuration parameters for portfolio optimization"""
    max_position_size: float = 0.20  # Maximum 20% in any single position
    min_position_size: float = 0.01  # Minimum 1% in any position
    rebalance_threshold: float = 0.05  # Rebalance when 5% drift
    max_volatility: float = 0.15  # Maximum 15% annual volatility
    target_return: float = 0.12  # Target 12% annual return
    risk_free_rate: float = 0.03  # 3% risk-free rate
    lookback_days: int = 252  # 1 year lookback for calculations
    min_sharpe_ratio: float = 0.5  # Minimum acceptable Sharpe ratio
    max_drawdown: float = 0.10  # Maximum 10% drawdown tolerance
    leverage_limit: float = 1.0  # No leverage by default
    transaction_cost: float = 0.001  # 0.1% transaction cost
    # Configurable symbol pairs (Brazilian Bovespa stocks)
    common_pairs: Optional[List[str]] = None
    
    def __post_init__(self):
        if self.common_pairs is None:
            self.common_pairs = [
                'PETR4.SA',   # Petrobras
                'VALE3.SA',   # Vale
                'ITUB4.SA',   # Itaú Unibanco
                'BBDC4.SA',   # Bradesco
                'ABEV3.SA',   # Ambev
                'MGLU3.SA',   # Magazine Luiza
                'WEGE3.SA',   # WEG
                'RENT3.SA',   # Localiza
                'LREN3.SA',   # Lojas Renner
                'GGBR4.SA'    # Gerdau
            ]
 
@dataclass
class PortfolioMetrics:
    """Portfolio performance and risk metrics"""
    total_value: float
    available_cash: float
    invested_amount: float
    daily_return: float
    cagr: float
    volatility: float
    sharpe_ratio: float
    max_drawdown: float
    current_drawdown: float
    positions: Dict[str, float]
    weights: Dict[str, float]
    
class EfficientFrontier:
    """Efficient Frontier calculation and optimization"""
    
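    def __init__(self, returns: pd.DataFrame, config: PortfolioConfig,
                 periods_per_year: int = 252 * 24):
        # The default matches the hourly (H1) returns and the 252 * 24 convention
        # used by PortfolioOptimizer; pass 252 when the returns are daily.
        self.returns = returns
        self.config = config
        self.mean_returns = returns.mean() * periods_per_year  # Annualized
        self.cov_matrix = returns.cov() * periods_per_year  # Annualized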
        self.assets = list(returns.columns)
        self.n_assets = len(self.assets)
    
    def calculate_portfolio_stats(self, weights: np.ndarray) -> Tuple[float, float, float]:
        """Calculate portfolio return, volatility, and Sharpe ratio"""
        portfolio_return = np.sum(self.mean_returns * weights)
        portfolio_volatility = np.sqrt(np.dot(weights.T, np.dot(self.cov_matrix, weights)))
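        # Guard against division by zero for a zero-variance portfolio
        sharpe_ratio = ((portfolio_return - self.config.risk_free_rate) / portfolio_volatility
                        if portfolio_volatility > 0 else 0.0)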
        return portfolio_return, portfolio_volatility, sharpe_ratio
    
    def efficient_frontier_curve(self, num_points: int = 100) -> pd.DataFrame:
        """Generate efficient frontier curve"""
        target_returns = np.linspace(
            self.mean_returns.min(), 
            self.mean_returns.max(), 
            num_points
        )
        
        efficient_portfolios = []
        
        for target in target_returns:
            try:
                weights = self.minimize_volatility_for_return(target)
                if weights is not None:
                    ret, vol, sharpe = self.calculate_portfolio_stats(weights)
                    efficient_portfolios.append({
                        'return': ret,
                        'volatility': vol,
                        'sharpe_ratio': sharpe,
                        'weights': weights.tolist()
                    })
            except Exception:  # Skip targets the optimizer cannot solve
                continue
        
        return pd.DataFrame(efficient_portfolios)
    
    def minimize_volatility_for_return(self, target_return: float) -> Optional[np.ndarray]:
        """Find minimum volatility portfolio for target return"""
        constraints = [
            {'type': 'eq', 'fun': lambda x: np.sum(x) - 1},  # Weights sum to 1
            {'type': 'eq', 'fun': lambda x: np.sum(self.mean_returns * x) - target_return}
        ]
        
        bounds = [(self.config.min_position_size, self.config.max_position_size) 
                 for _ in range(self.n_assets)]
        
        initial_guess = np.array([1/self.n_assets] * self.n_assets)
        
        def objective(weights):
            return np.sqrt(np.dot(weights.T, np.dot(self.cov_matrix, weights)))
        
        result = minimize(objective, initial_guess, method='SLSQP', 
                         bounds=bounds, constraints=constraints)
        
        return result.x if result.success else None
    
    def maximize_sharpe_ratio(self) -> Tuple[np.ndarray, float]:
        """Find portfolio with maximum Sharpe ratio"""
        constraints = [{'type': 'eq', 'fun': lambda x: np.sum(x) - 1}]
        bounds = [(self.config.min_position_size, self.config.max_position_size) 
                 for _ in range(self.n_assets)]
        
        def negative_sharpe(weights):
            ret, vol, sharpe = self.calculate_portfolio_stats(weights)
            return -sharpe  # Minimize negative Sharpe = maximize Sharpe
        
        initial_guess = np.array([1/self.n_assets] * self.n_assets)
        
        result = minimize(negative_sharpe, initial_guess, method='SLSQP',
                         bounds=bounds, constraints=constraints)
        
        if result.success:
            optimal_weights = result.x
            _, _, sharpe = self.calculate_portfolio_stats(optimal_weights)
            return optimal_weights, sharpe
        else:
            return np.array([1/self.n_assets] * self.n_assets), 0.0
 
class RiskManager:
    """Risk management system with multiple risk controls"""
    
    def __init__(self, config: PortfolioConfig):
        self.config = config
        self.position_history = []
        self.returns_history = []
    
    def calculate_var(self, returns: pd.Series, confidence: float = 0.05) -> float:
        """Calculate Value at Risk"""
        return np.percentile(returns, confidence * 100)
    
    def calculate_max_drawdown(self, equity_curve: pd.Series) -> Tuple[float, float]:
        """Calculate maximum and current drawdown"""
        peak = equity_curve.expanding().max()
        drawdown = (equity_curve - peak) / peak
        max_dd = drawdown.min()
        current_dd = drawdown.iloc[-1] if len(drawdown) > 0 else 0
        return abs(max_dd), abs(current_dd)
    
    def check_risk_limits(self, portfolio_metrics: PortfolioMetrics) -> Dict[str, Any]:
        """Check all risk limits and return violations"""
        violations = {}
        
        # Volatility check
        if portfolio_metrics.volatility > self.config.max_volatility:
            violations['volatility'] = {
                'current': portfolio_metrics.volatility,
                'limit': self.config.max_volatility,
                'action': 'reduce_risk'
            }
        
        # Drawdown check
        if portfolio_metrics.current_drawdown > self.config.max_drawdown:
            violations['drawdown'] = {
                'current': portfolio_metrics.current_drawdown,
                'limit': self.config.max_drawdown,
                'action': 'stop_loss'
            }
        
        # Sharpe ratio check
        if portfolio_metrics.sharpe_ratio < self.config.min_sharpe_ratio:
            violations['sharpe'] = {
                'current': portfolio_metrics.sharpe_ratio,
                'limit': self.config.min_sharpe_ratio,
                'action': 'reoptimize'
            }
        
        # Position size check
        for symbol, weight in portfolio_metrics.weights.items():
            if weight > self.config.max_position_size:
                violations[f'position_{symbol}'] = {
                    'current': weight,
                    'limit': self.config.max_position_size,
                    'action': 'reduce_position'
                }
        
        return violations
 
class MetaTraderInterface:
    """Interface to MetaTrader 5 platform"""
    
    def __init__(self):
        self.connected = False
        self.account_info = None
        self.timezone = pytz.timezone("America/Sao_Paulo")  # B3 exchange time; adjust to your broker's server time
        self.max_days_m1 = 58  # MT5 limit for M1 timeframe
    
    async def initialize(self) -> bool:
        """Initialize MetaTrader connection"""
        try:
            if not mt5.initialize():
                logger.error("MetaTrader5 initialization failed")
                return False
            
            self.account_info = mt5.account_info()
            if self.account_info is None:
                logger.error("Failed to get account info")
                return False
            
            self.connected = True
            logger.info(f"Connected to MT5 account: {self.account_info.login}")
            return True
        except Exception as e:
            logger.error(f"MT5 initialization error: {e}")
            return False
    
    def get_account_balance(self) -> Dict[str, float]:
        """Get current account balance and equity"""
        if not self.connected:
            return {'balance': 0, 'equity': 0, 'margin': 0, 'free_margin': 0}
        
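        info = mt5.account_info()
        if info is None:  # Terminal disconnected after initialize()
            return {'balance': 0, 'equity': 0, 'margin': 0, 'free_margin': 0}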
        return {
            'balance': float(info.balance),
            'equity': float(info.equity),
            'margin': float(info.margin),
            'free_margin': float(info.margin_free)
        }
    
    def get_positions(self) -> List[Dict]:
        """Get all open positions"""
        if not self.connected:
            return []
        
        positions = mt5.positions_get()
        if positions is None:
            return []
        
        return [
            {
                'symbol': pos.symbol,
                'volume': pos.volume,
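                'type': 'BUY' if pos.type == mt5.POSITION_TYPE_BUY else 'SELL',
                'price': pos.price_open,
                'current_price': pos.price_current,
                'profit': pos.profit,
                'swap': pos.swap,
                # Position records may not carry a commission field; default to 0.0
                'commission': getattr(pos, 'commission', 0.0)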
            }
            for pos in positions
        ]
    
    def get_ticks(self, symbol: str, start: datetime, end: datetime) -> List[Tuple]:
        """Get tick data for a specific time range"""
        try:
            ticks = mt5.copy_ticks_range(symbol, start, end, mt5.COPY_TICKS_TRADE)
            if ticks is None:
                logger.warning(f"No tick data for {symbol} from {start} to {end}")
                return []
            
            t_df = pd.DataFrame(ticks).drop_duplicates(subset='time', keep="last")
            # Each row becomes a namedtuple with fields such as time, bid, ask, last, volume
            return list(t_df.itertuples(index=False))
        except Exception as e:
            logger.error(f"Error getting tick data for {symbol}: {e}")
            return []
    
    def get_m1_rates_paginated(self, symbol: str, pages: int) -> List[Tuple]:
        """Get M1 rates data with pagination to handle MT5 limits"""
        try:
            all_data = []
            
            for i in range(pages, 0, -1):
                now = datetime.now()
                now = now - timedelta(days=1)  # Avoid incomplete current day
                inc_d = 1 if i-1 == 0 else 0
                start_now_minus = now - timedelta(days=self.max_days_m1*i) 
                end_now_minus = now - timedelta(days=self.max_days_m1*(i-1))
                start = datetime(start_now_minus.year, start_now_minus.month, 
                               start_now_minus.day, tzinfo=self.timezone)
                end = datetime(end_now_minus.year, end_now_minus.month, 
                             end_now_minus.day + inc_d, tzinfo=self.timezone)
                
                logger.info(f'Fetching {symbol} data: start={start}, end={end}')
                
                tick_data = self.get_ticks(symbol, start, end)
                if tick_data:
                    all_data.extend(tick_data)
            
            return all_data
        except Exception as e:
            logger.error(f"Error getting paginated data for {symbol}: {e}")
            return []
    
    def get_historical_data(self, symbol: str, timeframe: int, count: int) -> pd.DataFrame:
        """Get historical price data using MT5 rates or ticks"""
        try:
            # For high-frequency data, use tick-based approach
            if timeframe == mt5.TIMEFRAME_M1 and count > 1000:
                pages = max(1, count // (self.max_days_m1 * 24 * 60))  # Estimate pages needed
                tick_data = self.get_m1_rates_paginated(symbol, pages)
                
                if not tick_data:
                    return pd.DataFrame()
                
                # Convert tick data to DataFrame
                df_data = []
                for tick in tick_data:
                    df_data.append({
                        'time': pd.to_datetime(tick.time, unit='s'),
                        'bid': tick.bid,
                        'ask': tick.ask,
                        'last': tick.last,
                        'volume': tick.volume
                    })
                
                if not df_data:
                    return pd.DataFrame()
                
                df = pd.DataFrame(df_data)
                df.set_index('time', inplace=True)
                
                # Use 'last' price if available, otherwise average of bid/ask
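                # MT5 may report a missing last price as 0.0, so treat zeros as gaps
                last = df['last'].replace(0, np.nan)
                df['close'] = last.fillna((df['bid'] + df['ask']) / 2)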
                return df[['close']].dropna()
            
            else:
                # Use standard rates for other timeframes or smaller datasets
                rates = mt5.copy_rates_from_pos(symbol, timeframe, 0, count)
                if rates is None:
                    logger.warning(f"No rates data for {symbol}")
                    return pd.DataFrame()
                
                df = pd.DataFrame(rates)
                df['time'] = pd.to_datetime(df['time'], unit='s')
                df.set_index('time', inplace=True)
                return df[['close']]  # Return only close prices
                
        except Exception as e:
            logger.error(f"Error getting historical data for {symbol}: {e}")
            return pd.DataFrame()
    
    def get_historical_data_range(self, symbol: str, start_date: datetime, 
                                 end_date: datetime, timeframe: int = mt5.TIMEFRAME_H1) -> pd.DataFrame:
        """Get historical data for a specific date range"""
        try:
            # Convert to timezone-aware datetimes (pytz zones need localize(), not replace())
            if start_date.tzinfo is None:
                start_date = self.timezone.localize(start_date)
            if end_date.tzinfo is None:
                end_date = self.timezone.localize(end_date)
            
            rates = mt5.copy_rates_range(symbol, timeframe, start_date, end_date)
            if rates is None:
                logger.warning(f"No data for {symbol} from {start_date} to {end_date}")
                return pd.DataFrame()
            
            df = pd.DataFrame(rates)
            df['time'] = pd.to_datetime(df['time'], unit='s')
            df.set_index('time', inplace=True)
            return df[['close']]
            
        except Exception as e:
            logger.error(f"Error getting range data for {symbol}: {e}")
            return pd.DataFrame()
    
    async def place_order(self, symbol: str, order_type: str, volume: float, 
                         price: Optional[float] = None) -> Dict:
        """Place trading order"""
        try:
            symbol_info = mt5.symbol_info(symbol)
            if symbol_info is None:
                return {'success': False, 'error': f'Symbol {symbol} not found'}
            
            if not symbol_info.visible:
                mt5.symbol_select(symbol, True)
            
            # Prepare order request
            request = {
                "action": mt5.TRADE_ACTION_DEAL,
                "symbol": symbol,
                "volume": volume,
                "type_filling": mt5.ORDER_FILLING_IOC,
                "comment": "Portfolio Optimizer",
            }
            
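            # A market price is needed unless the caller supplied one
            tick = mt5.symbol_info_tick(symbol)
            if tick is None and price is None:
                return {'success': False, 'error': f'No tick data for {symbol}'}
            
            if order_type.upper() == 'BUY':
                request["type"] = mt5.ORDER_TYPE_BUY
                request["price"] = tick.ask if price is None else price
            else:
                request["type"] = mt5.ORDER_TYPE_SELL
                request["price"] = tick.bid if price is None else price
            
            result = mt5.order_send(request)
            if result is None:  # The request never reached the trade server
                return {'success': False, 'error': f'order_send failed: {mt5.last_error()}'}
            
            if result.retcode != mt5.TRADE_RETCODE_DONE: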
                return {
                    'success': False, 
                    'error': f'Order failed: {result.comment}',
                    'retcode': result.retcode
                }
            
            return {
                'success': True,
                'ticket': result.order,
                'volume': result.volume,
                'price': result.price
            }
            
        except Exception as e:
            return {'success': False, 'error': str(e)}
 
class PortfolioOptimizer:
    """Main portfolio optimization and management system"""
    
    def __init__(self, config: PortfolioConfig):
        self.config = config
        self.mt5 = MetaTraderInterface()
        self.risk_manager = RiskManager(config)
        self.last_rebalance = None
        self.symbols = []  # Will be populated from MT5
        
    async def initialize(self):
        """Initialize the optimizer"""
        await self.mt5.initialize()
        self.symbols = self._get_tradeable_symbols()
        logger.info(f"Initialized with {len(self.symbols)} symbols")
    
    def _get_tradeable_symbols(self) -> List[str]:
        """Get list of tradeable symbols from MT5"""
        available_symbols = []
        for symbol in self.config.common_pairs:
            symbol_info = mt5.symbol_info(symbol)
            if symbol_info is not None and symbol_info.visible:
                available_symbols.append(symbol)
        
        return available_symbols[:5]  # Limit to 5 symbols for demo
    
    def calculate_returns_matrix(self) -> pd.DataFrame:
        """Calculate returns matrix for all symbols"""
        returns_data = {}
        
        for symbol in self.symbols:
            df = self.mt5.get_historical_data(symbol, mt5.TIMEFRAME_H1, 
                                            self.config.lookback_days * 24)
            if not df.empty:
                returns = df['close'].pct_change().dropna()
                returns_data[symbol] = returns
        
        if not returns_data:
            return pd.DataFrame()
        
        # Align all return series
        returns_df = pd.DataFrame(returns_data)
        returns_df = returns_df.dropna()
        
        return returns_df
    
    def calculate_cagr(self, equity_curve: pd.Series) -> float:
        """Calculate Compound Annual Growth Rate"""
        if len(equity_curve) < 2:
            return 0.0
        
        start_value = equity_curve.iloc[0]
        end_value = equity_curve.iloc[-1]
        years = len(equity_curve) / (252 * 24)  # Assuming hourly data
        
        if start_value <= 0 or years <= 0:
            return 0.0
        
        cagr = (end_value / start_value) ** (1 / years) - 1
        return cagr
    
    async def get_portfolio_metrics(self) -> PortfolioMetrics:
        """Calculate comprehensive portfolio metrics"""
        account_info = self.mt5.get_account_balance()
        positions = self.mt5.get_positions()
        
        total_value = account_info['equity']
        available_cash = account_info['free_margin']
        
        # Calculate position values and weights
        position_values = {}
        total_invested = 0
        
        for pos in positions:
            value = pos['volume'] * pos['current_price'] * \
                   (1 if pos['type'] == 'BUY' else -1)
            position_values[pos['symbol']] = value
            total_invested += abs(value)
        
        weights = {symbol: value / total_value if total_value > 0 else 0 
                  for symbol, value in position_values.items()}
        
        # Calculate returns for metrics
        returns_df = self.calculate_returns_matrix()
        if returns_df.empty:
            return PortfolioMetrics(
                total_value=total_value,
                available_cash=available_cash,
                invested_amount=total_invested,
                daily_return=0.0,
                cagr=0.0,
                volatility=0.0,
                sharpe_ratio=0.0,
                max_drawdown=0.0,
                current_drawdown=0.0,
                positions=position_values,
                weights=weights
            )
        
        # Portfolio performance calculations
        portfolio_weights = np.array([weights.get(symbol, 0) for symbol in returns_df.columns])
        portfolio_returns = returns_df.dot(portfolio_weights)
        
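        # Return of the latest bar (hourly data, despite the field name)
        daily_return = portfolio_returns.iloc[-1] if len(portfolio_returns) > 0 else 0
        # CAGR needs a compounded equity curve; a running sum of returns starts near zero
        cagr = self.calculate_cagr((1 + portfolio_returns).cumprod())
        volatility = portfolio_returns.std() * np.sqrt(252 * 24)  # Annualized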
        
        # Sharpe ratio
        excess_returns = portfolio_returns - self.config.risk_free_rate / (252 * 24)
        sharpe_ratio = excess_returns.mean() / excess_returns.std() * np.sqrt(252 * 24) \
                      if excess_returns.std() > 0 else 0
        
        # Drawdown calculations
        equity_curve = (1 + portfolio_returns).cumprod()
        max_dd, current_dd = self.risk_manager.calculate_max_drawdown(equity_curve)
        
        return PortfolioMetrics(
            total_value=total_value,
            available_cash=available_cash,
            invested_amount=total_invested,
            daily_return=daily_return,
            cagr=cagr,
            volatility=volatility,
            sharpe_ratio=sharpe_ratio,
            max_drawdown=max_dd,
            current_drawdown=current_dd,
            positions=position_values,
            weights=weights
        )
    
    async def optimize_portfolio(self) -> Dict[str, float]:
        """Calculate optimal portfolio weights using efficient frontier"""
        returns_df = self.calculate_returns_matrix()
        if returns_df.empty:
            return {}
        
        ef = EfficientFrontier(returns_df, self.config)
        optimal_weights, max_sharpe = ef.maximize_sharpe_ratio()
        
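        # Convert to dictionary, pairing weights with the columns that were actually optimized
        # (symbols without data are absent from returns_df, so self.symbols may not align)
        optimal_allocation = {
            symbol: float(weight) for symbol, weight in zip(ef.assets, optimal_weights)
            if weight >= self.config.min_position_size
        }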
        
        logger.info(f"Optimal portfolio (Sharpe: {max_sharpe:.3f}): {optimal_allocation}")
        return optimal_allocation
    
    async def preview_rebalance(self, target_weights: Dict[str, float]) -> Dict[str, Any]:
        """Preview rebalancing changes without executing trades"""
        current_metrics = await self.get_portfolio_metrics()
        account_info = self.mt5.get_account_balance()
        
        rebalance_trades = []
        total_value = account_info['equity']
        
        # Calculate required position changes
        for symbol, target_weight in target_weights.items():
            current_weight = current_metrics.weights.get(symbol, 0)
            weight_diff = target_weight - current_weight
            
            # Only rebalance if difference exceeds threshold
            if abs(weight_diff) > self.config.rebalance_threshold:
                target_value = target_weight * total_value
                current_value = current_metrics.positions.get(symbol, 0)
                value_change = target_value - current_value
                
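                # Convert to volume. Simplified: treats one lot as 100,000 units of
                # account currency (a forex convention); for equities, size by
                # price and the symbol's volume step instead.
                volume = abs(value_change) / 100000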
                order_type = 'BUY' if value_change > 0 else 'SELL'
                
                if volume >= 0.01:  # Minimum volume
                    rebalance_trades.append({
                        'symbol': symbol,
                        'type': order_type,
                        'volume': round(volume, 2),
                        'weight_change': weight_diff,
                        'current_weight': current_weight,
                        'target_weight': target_weight,
                        'value_change': value_change
                    })
        
        return {
            'preview_time': datetime.now().isoformat(),
            'total_portfolio_value': total_value,
            'trades_planned': len(rebalance_trades),
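            # Cost applies to the traded notional, not to the whole portfolio per trade
            'estimated_transaction_costs': sum(abs(trade['value_change']) for trade in rebalance_trades) * self.config.transaction_cost,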
            'rebalance_details': rebalance_trades,
            'summary': {
                'symbols_to_rebalance': len(rebalance_trades),
                'total_weight_change': sum(abs(trade['weight_change']) for trade in rebalance_trades),
                'largest_weight_change': max((abs(trade['weight_change']) for trade in rebalance_trades), default=0)
            }
        }
    
    async def rebalance_portfolio(self, target_weights: Dict[str, float], execute: bool = True) -> Dict[str, Any]:
        """Rebalance portfolio to target weights with optional execution"""
        # Always show preview first
        preview_result = await self.preview_rebalance(target_weights)
        
        if not execute:
            return {
                'preview_only': True,
                **preview_result
            }
        
        # Execute exactly the trades computed for the preview, so the two paths cannot diverge
        rebalance_trades = preview_result['rebalance_details']
        
        # Execute trades
        execution_results = []
        for trade in rebalance_trades:
            result = await self.mt5.place_order(
                trade['symbol'], 
                trade['type'], 
                trade['volume']
            )
            execution_results.append({
                'trade': trade,
                'result': result
            })
        
        self.last_rebalance = datetime.now()
        
        return {
            'preview': preview_result,
            'execution': {
                'rebalance_time': self.last_rebalance.isoformat(),
                'trades_planned': len(rebalance_trades),
                'trades_executed': sum(1 for r in execution_results if r['result']['success']),
                'execution_details': execution_results
            }
        }
 
# FastMCP Server Implementation
class PortfolioMCPServer:
    """FastMCP Server for portfolio optimization"""
    
    def __init__(self):
        self.mcp = FastMCP("portfolio-optimizer")
        self.config = PortfolioConfig()
        self.optimizer = None
        self._setup_handlers()
    
    def _setup_handlers(self):
        """Setup FastMCP server handlers"""
        
        @self.mcp.resource("portfolio://status")
        async def portfolio_status() -> dict:
            """Current portfolio metrics and analysis"""
            if not self.optimizer:
                await self._initialize_optimizer()
            
            metrics = await self.optimizer.get_portfolio_metrics()
            risk_check = self.optimizer.risk_manager.check_risk_limits(metrics)
            
            return {
                "portfolio_metrics": asdict(metrics),
                "risk_violations": risk_check,
                "last_update": datetime.now().isoformat()
            }
        
        @self.mcp.resource("portfolio://efficient-frontier")
        async def efficient_frontier() -> dict:
            """Efficient frontier analysis and optimization"""
            if not self.optimizer:
                await self._initialize_optimizer()
            
            returns_df = self.optimizer.calculate_returns_matrix()
            if returns_df.empty:
                return {"error": "No data available"}
            
            ef = EfficientFrontier(returns_df, self.config)
            frontier_data = ef.efficient_frontier_curve()
            optimal_weights, max_sharpe = ef.maximize_sharpe_ratio()
            
            return {
                "efficient_frontier": frontier_data.to_dict('records'),
                "optimal_portfolio": {
                    "weights": dict(zip(self.optimizer.symbols, optimal_weights)),
                    "sharpe_ratio": max_sharpe
                },
                "current_symbols": self.optimizer.symbols
            }
        
        @self.mcp.resource("portfolio://config")
        async def portfolio_config() -> dict:
            """Portfolio optimization parameters"""
            return asdict(self.config)
        
        @self.mcp.tool()
        async def optimize_portfolio() -> dict:
            """Calculate optimal portfolio allocation using efficient frontier"""
            if not self.optimizer:
                await self._initialize_optimizer()
            
            optimal_weights = await self.optimizer.optimize_portfolio()
            current_metrics = await self.optimizer.get_portfolio_metrics()
            
            return {
                "optimal_allocation": optimal_weights,
                "current_allocation": current_metrics.weights,
                "rebalancing_needed": self._check_rebalancing_needed(
                    current_metrics.weights, optimal_weights
                ),
                "expected_improvement": self._calculate_improvement(
                    current_metrics.weights, optimal_weights
                )
            }
        
        @self.mcp.tool()
        async def preview_rebalance() -> dict:
            """Preview portfolio rebalancing without executing trades"""
            if not self.optimizer:
                await self._initialize_optimizer()
            
            optimal_weights = await self.optimizer.optimize_portfolio()
            preview_result = await self.optimizer.preview_rebalance(optimal_weights)
            
            return preview_result
        
        @self.mcp.tool()
        async def rebalance_portfolio(execute: bool = False) -> dict:
            """Rebalance portfolio to optimal weights with optional execution"""
            if not self.optimizer:
                await self._initialize_optimizer()
            
            optimal_weights = await self.optimizer.optimize_portfolio()
            rebalance_result = await self.optimizer.rebalance_portfolio(optimal_weights, execute)
            
            return rebalance_result
        
        @self.mcp.tool()
        async def update_config(
            max_volatility: Optional[float] = None,
            target_return: Optional[float] = None,
            max_position_size: Optional[float] = None,
            rebalance_threshold: Optional[float] = None,
            risk_free_rate: Optional[float] = None,
            common_pairs: Optional[List[str]] = None
        ) -> dict:
            """Update portfolio optimization parameters"""
            updates = {}
            
            if max_volatility is not None:
                self.config.max_volatility = max_volatility
                updates['max_volatility'] = max_volatility
            if target_return is not None:
                self.config.target_return = target_return
                updates['target_return'] = target_return
            if max_position_size is not None:
                self.config.max_position_size = max_position_size
                updates['max_position_size'] = max_position_size
            if rebalance_threshold is not None:
                self.config.rebalance_threshold = rebalance_threshold
                updates['rebalance_threshold'] = rebalance_threshold
            if risk_free_rate is not None:
                self.config.risk_free_rate = risk_free_rate
                updates['risk_free_rate'] = risk_free_rate
            if common_pairs is not None:
                self.config.common_pairs = common_pairs
                updates['common_pairs'] = common_pairs
            
            return {"configuration_updated": updates}
        
        @self.mcp.tool()
        async def risk_analysis() -> dict:
            """Perform comprehensive risk analysis"""
            if not self.optimizer:
                await self._initialize_optimizer()
            
            metrics = await self.optimizer.get_portfolio_metrics()
            risk_violations = self.optimizer.risk_manager.check_risk_limits(metrics)
            
            # Calculate additional risk metrics
            returns_df = self.optimizer.calculate_returns_matrix()
            if not returns_df.empty:
                portfolio_weights = np.array([
                    metrics.weights.get(symbol, 0) for symbol in returns_df.columns
                ])
                portfolio_returns = returns_df.dot(portfolio_weights)
                var_95 = self.optimizer.risk_manager.calculate_var(portfolio_returns, 0.05)
                var_99 = self.optimizer.risk_manager.calculate_var(portfolio_returns, 0.01)
            else:
                var_95 = var_99 = 0
            
            return {
                "current_metrics": asdict(metrics),
                "risk_violations": risk_violations,
                "value_at_risk": {
                    "95%": float(var_95),
                    "99%": float(var_99)
                },
                "risk_recommendations": self._generate_risk_recommendations(
                    metrics, risk_violations
                )
            }
    
    async def _initialize_optimizer(self):
        """Initialize the portfolio optimizer"""
        self.optimizer = PortfolioOptimizer(self.config)
        await self.optimizer.initialize()
    
    def _check_rebalancing_needed(self, current: Dict[str, float], 
                                 optimal: Dict[str, float]) -> bool:
        """Check if rebalancing is needed based on threshold"""
        for symbol in optimal.keys():
            current_weight = current.get(symbol, 0)
            optimal_weight = optimal[symbol]
            if abs(current_weight - optimal_weight) > self.config.rebalance_threshold:
                return True
        return False
    
    def _calculate_improvement(self, current: Dict[str, float], 
                             optimal: Dict[str, float]) -> Dict[str, float]:
        """Return placeholder improvement estimates (not yet derived from the weights)"""
        # TODO: compute these figures from the current and optimal weights
        return {
            "expected_sharpe_improvement": 0.1,  # Placeholder
            "expected_volatility_reduction": 0.02,  # Placeholder
            "diversification_improvement": 0.05  # Placeholder
        }
    
    def _generate_risk_recommendations(self, metrics: PortfolioMetrics, 
                                     violations: Dict) -> List[str]:
        """Generate risk management recommendations"""
        recommendations = []
        
        if violations:
            if 'volatility' in violations:
                recommendations.append(
                    "Reduce portfolio volatility by increasing allocation to lower-risk assets"
                )
            
            if 'drawdown' in violations:
                recommendations.append(
                    "Current drawdown exceeds limit. Consider reducing position sizes or implementing stop-losses"
                )
            
            if 'sharpe' in violations:
                recommendations.append(
                    "Portfolio Sharpe ratio is below target. Consider rebalancing to optimal weights"
                )
        else:
            recommendations.append("Portfolio is within all risk limits")
        
        # Guard against an empty account before computing the cash ratio
        if metrics.total_value > 0 and metrics.available_cash / metrics.total_value > 0.1:
            recommendations.append(
                "High cash allocation detected. Consider investing excess cash according to optimal allocation"
            )
        
        return recommendations
    
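    async def run(self):
        """Run the FastMCP server"""
        logger.info("Portfolio Optimizer FastMCP Server started")
        # FastMCP.run() is synchronous and starts its own event loop;
        # run_async() is the awaitable variant for use inside asyncio.run()
        await self.mcp.run_async()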
 
async def main():
    """Main entry point"""
    server = PortfolioMCPServer()
    await server.run()
 
if __name__ == "__main__":
    asyncio.run(main())

Usage Examples

1. Portfolio Status Check

# Get current portfolio status
portfolio_status = await server.read_resource("portfolio://status")
print(json.dumps(portfolio_status, indent=2))

2. Direct MT5 Data Retrieval

# Initialize MT5 interface
mt5_interface = MetaTraderInterface()
await mt5_interface.initialize()
 
# Get high-frequency tick data for backtesting
start_date = datetime.now() - timedelta(days=30)
end_date = datetime.now()
tick_data = mt5_interface.get_ticks("EURUSD", start_date, end_date)
 
# Get paginated M1 data for long-term analysis
m1_data = mt5_interface.get_m1_rates_paginated("EURUSD", pages=3)
 
# Get standard hourly data
hourly_data = mt5_interface.get_historical_data("EURUSD", mt5.TIMEFRAME_H1, 1000)

3. Efficient Frontier Analysis

# Get efficient frontier data
frontier_data = await server.read_resource("portfolio://efficient-frontier")
print("Optimal Sharpe Ratio:", frontier_data['optimal_portfolio']['sharpe_ratio'])
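
The Sharpe ratio reported above depends on how returns are annualized. The following is a minimal sketch of one common convention, assuming daily returns, 252 trading days per year, and the 3% risk-free rate from PortfolioConfig. The function name annualized_sharpe is hypothetical, and the EfficientFrontier class may compute the ratio differently.

```python
import math
import statistics
from typing import Sequence


def annualized_sharpe(daily_returns: Sequence[float],
                      risk_free_rate: float = 0.03,
                      periods: int = 252) -> float:
    """Annualized Sharpe ratio from per-period simple returns.

    Assumes the mean scales linearly and the volatility with the square
    root of the number of periods per year.
    """
    if len(daily_returns) < 2:
        raise ValueError("need at least two observations")
    mean_annual = statistics.fmean(daily_returns) * periods
    vol_annual = statistics.stdev(daily_returns) * math.sqrt(periods)
    if vol_annual == 0:
        return 0.0  # no variation, so the ratio is undefined; report zero
    return (mean_annual - risk_free_rate) / vol_annual


sample_returns = [0.01, -0.01, 0.02, 0.0]
sample_sharpe = annualized_sharpe(sample_returns)
```

A result computed this way can be compared against the min_sharpe_ratio threshold in the configuration.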

4. Risk Analysis

# Perform risk analysis (returns metrics, violations, VaR, and recommendations)
risk_result = await server.call_tool("risk_analysis", {})
print("Risk Analysis:", risk_result)

5. Portfolio Optimization

# Optimize portfolio
optimization_result = await server.call_tool("optimize_portfolio", {})
print("Optimal Allocation:", optimization_result)

6. Rebalancing

# Rebalance portfolio (execute=True places the trades; the default only simulates)
rebalance_result = await server.call_tool("rebalance_portfolio", {"execute": True})
print("Rebalancing Results:", rebalance_result)
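
Whether a rebalance is worthwhile comes down to the drift check performed by _check_rebalancing_needed. The sketch below restates that check as standalone functions. The names weight_drift and needs_rebalance are hypothetical. Unlike the server method, which only iterates over symbols in the optimal allocation, this version also flags positions that are currently held but absent from the target.

```python
from typing import Dict


def weight_drift(current: Dict[str, float],
                 target: Dict[str, float]) -> Dict[str, float]:
    """Signed drift (target minus current) for every symbol in either mapping."""
    symbols = sorted(set(current) | set(target))
    return {s: target.get(s, 0.0) - current.get(s, 0.0) for s in symbols}


def needs_rebalance(current: Dict[str, float],
                    target: Dict[str, float],
                    threshold: float = 0.05) -> bool:
    """True when any single weight has drifted by more than the threshold."""
    drift = weight_drift(current, target)
    return any(abs(d) > threshold for d in drift.values())


current_weights = {"EURUSD": 0.50, "GBPUSD": 0.30, "USDJPY": 0.20}
target_weights = {"EURUSD": 0.40, "GBPUSD": 0.32, "USDJPY": 0.28}
drift = weight_drift(current_weights, target_weights)
```

Positive drift values indicate symbols to buy and negative values symbols to sell.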

Configuration

The system can be configured through the PortfolioConfig class:

config = PortfolioConfig(
    max_position_size=0.25,      # Maximum 25% in any position
    target_return=0.15,          # Target 15% annual return
    max_volatility=0.12,         # Maximum 12% volatility
    rebalance_threshold=0.03,    # Rebalance at 3% drift
    max_drawdown=0.08           # Maximum 8% drawdown
)
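
Nothing in the configuration shown prevents contradictory settings, such as a position cap too low for a fully invested portfolio. The following is a rough sketch of a sanity check, assuming weights must sum to 1. ConfigSketch is a hypothetical stand-in that copies a few PortfolioConfig fields so the example runs on its own, and validate_config is not part of the server.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ConfigSketch:
    """Hypothetical stand-in carrying a few PortfolioConfig fields."""
    max_position_size: float = 0.20
    min_position_size: float = 0.01
    max_volatility: float = 0.15
    rebalance_threshold: float = 0.05
    max_drawdown: float = 0.10


def validate_config(cfg: ConfigSketch, n_assets: int) -> List[str]:
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    for name in ("max_position_size", "min_position_size",
                 "rebalance_threshold", "max_drawdown"):
        value = getattr(cfg, name)
        if not 0.0 < value <= 1.0:
            problems.append(f"{name} must be in (0, 1], got {value}")
    if cfg.max_volatility <= 0:
        problems.append("max_volatility must be positive")
    if cfg.min_position_size > cfg.max_position_size:
        problems.append("min_position_size exceeds max_position_size")
    # With a per-asset cap, n_assets * cap must reach 100% to be fully invested
    if n_assets * cfg.max_position_size < 1.0:
        problems.append(
            f"{n_assets} assets capped at {cfg.max_position_size:.0%} "
            "cannot sum to 100%"
        )
    return problems
```

For example, a 25% cap is feasible with five symbols but not with three, which the check reports as a single problem.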

Key Features

  1. Real-time Portfolio Analysis: Live metrics using CAGR, sharpe_ratio, and volatility
  2. Efficient Frontier Optimization: Implementation of Markowitz optimization
  3. Risk Management: Comprehensive risk controls and violation monitoring
  4. Automated Rebalancing: Smart rebalancing based on drift thresholds
  5. Advanced MT5 Data Retrieval:
    • High-frequency tick data using copy_ticks_range
    • Paginated data collection for large datasets
    • Automatic handling of MT5 timeframe limitations
    • Timezone-aware data processing
  6. MetaTrader Integration: Direct connection to MT5 for live trading
  7. Configurable Parameters: Customizable risk and return targets and a configurable symbol list, which can include Brazilian Bovespa stocks
  8. Rebalancing Preview: See portfolio changes before execution
  9. FastMCP Server: Modern MCP implementation for AI agent integration

MT5 Data Retrieval Methods

The system provides multiple methods for retrieving historical data from MetaTrader 5:

Tick Data Retrieval

  • High-frequency data: Uses mt5.copy_ticks_range() for precise tick-by-tick analysis
  • Automatic pagination: Handles MT5’s 58-day limit for M1 timeframes
  • Data deduplication: Removes duplicate timestamps keeping the latest values
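
The pagination and deduplication steps above can be sketched without MetaTrader at all. The helper names date_windows and dedupe_keep_latest are hypothetical and the 58-day window is taken from the text. In practice each window would be passed to a range query such as mt5.copy_rates_range() and the pages concatenated before deduplication.

```python
from datetime import datetime, timedelta
from typing import Dict, Iterable, Iterator, List, Tuple


def date_windows(start: datetime, end: datetime,
                 max_days: int = 58) -> Iterator[Tuple[datetime, datetime]]:
    """Split [start, end] into consecutive windows no longer than max_days."""
    if max_days <= 0:
        raise ValueError("max_days must be positive")
    step = timedelta(days=max_days)
    cursor = start
    while cursor < end:
        window_end = min(cursor + step, end)
        yield cursor, window_end
        cursor = window_end  # adjacent windows share a boundary timestamp


def dedupe_keep_latest(
        rows: Iterable[Tuple[datetime, float]]) -> List[Tuple[datetime, float]]:
    """Drop duplicate timestamps, keeping the value that arrived last."""
    latest: Dict[datetime, float] = {}
    for timestamp, price in rows:
        latest[timestamp] = price  # later rows overwrite earlier ones
    return sorted(latest.items())
```

Because adjacent windows share a boundary timestamp, the same bar can appear in two pages, which is one reason the deduplication step is needed.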

Rate Data Retrieval

  • Standard timeframes: H1, H4, D1 using mt5.copy_rates_from_pos()
  • Date range queries: Specific period analysis with mt5.copy_rates_range()
  • Flexible timeframe support: Adapts method based on requested timeframe and data size

Data Processing Features

  • Timezone handling: Configurable timezone support (default: Brazil/West)
  • Price consolidation: Combines bid/ask spreads or uses last trade price
  • Error handling: Robust error handling with logging and fallback mechanisms
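
Price consolidation can be illustrated with a small rule for choosing one price per tick. This is a sketch under an assumption: the last trade price is preferred when it is populated, and the bid/ask midpoint is used otherwise (MT5 forex ticks typically carry a last value of 0). The function name consolidate_price is hypothetical, and the order of preference in MetaTraderInterface may differ.

```python
from typing import Optional


def consolidate_price(bid: float, ask: float, last: float) -> Optional[float]:
    """Pick a single representative price for one tick.

    Returns the last trade price when it is positive, otherwise the
    bid/ask midpoint, or None when no usable quote is present.
    """
    if last > 0:
        return last
    if bid > 0 and ask > 0:
        return (bid + ask) / 2.0
    return None


fx_price = consolidate_price(bid=1.0, ask=1.5, last=0.0)       # midpoint path
stock_price = consolidate_price(bid=10.0, ask=10.5, last=10.3)  # last-trade path
```

Returning None for empty quotes lets the caller drop those rows explicitly instead of feeding zeros into the return calculations.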

Risk Management Features

  • Position Size Limits: Prevents over-concentration
  • Volatility Controls: Maintains portfolio within risk tolerance
  • Drawdown Monitoring: Tracks current and maximum drawdowns
  • Value at Risk: Calculates potential losses at various confidence levels
  • Sharpe Ratio Optimization: Ensures risk-adjusted return efficiency
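
Two of the metrics listed above, Value at Risk and maximum drawdown, can be sketched in a few lines of standard-library Python. The sketch assumes historical (empirical quantile) VaR reported as a positive loss fraction. The function names are hypothetical, and risk_manager.calculate_var may use a different method or sign convention.

```python
import math
from typing import Sequence


def historical_var(returns: Sequence[float], alpha: float = 0.05) -> float:
    """Empirical VaR: the loss at the alpha quantile, as a positive number."""
    if not returns:
        raise ValueError("returns must not be empty")
    if not 0.0 < alpha < 1.0:
        raise ValueError("alpha must be between 0 and 1")
    ordered = sorted(returns)  # worst returns first
    index = max(0, math.ceil(alpha * len(ordered)) - 1)
    return -ordered[index]


def max_drawdown(equity: Sequence[float]) -> float:
    """Largest peak-to-trough decline of a positive equity curve, as a fraction."""
    if not equity:
        raise ValueError("equity must not be empty")
    peak = equity[0]
    worst = 0.0
    for value in equity:
        peak = max(peak, value)
        worst = max(worst, (peak - value) / peak)
    return worst


sample_returns = [-0.05, -0.02, -0.01, 0.0, 0.01, 0.01, 0.02, 0.02, 0.03, 0.04]
sample_equity = [100.0, 120.0, 90.0, 110.0, 80.0]
```

With this convention, a drawdown result can be compared directly against max_drawdown in the configuration (0.10 by default).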

This system combines portfolio optimization, risk monitoring, and MT5 trade execution in one server, and exposes them to AI agents through MCP.