MetaTrader Portfolio Optimizer: Quantitative Trading MCP Server
This article presents a Python MCP (Model Context Protocol) server that integrates with MetaTrader 5 to provide portfolio optimization, risk management, and automated rebalancing. The system applies efficient_frontier theory, CAGR, Markowitz optimization, and sharpe_ratio maximization; risk_parity appears under Related Topics as an alternative allocation method.
System Architecture
The MCP server provides:
- Real-time portfolio analysis using efficient_frontier theory
- CAGR and performance metrics calculation
- Risk management based on volatility and drawdown limits
- Automated rebalancing using portfolio_optimization
- Sharpe_ratio optimization for risk-adjusted returns
- Configurable parameters for personalized trading strategies
Installation and Dependencies
# requirements.txt
mcp>=1.0.0
MetaTrader5>=5.0.4500
numpy>=1.24.0
pandas>=2.0.0
scipy>=1.10.0
pytz>=2023.3
plotly>=5.15.0
python-dateutil>=2.8.0
fastmcp  # provides the FastMCP class imported by the server below
Complete MCP Server Implementation
#!/usr/bin/env python3
"""
MetaTrader Portfolio Optimizer MCP Server
Implements efficient frontier, CAGR optimization, and risk management
"""
import asyncio
import json
import logging
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass, asdict
from scipy.optimize import minimize
import MetaTrader5 as mt5
import pytz
from itertools import product
# FastMCP Server imports
from fastmcp import FastMCP
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class PortfolioConfig:
    """Configuration parameters for portfolio optimization"""
    max_position_size: float = 0.20    # Maximum 20% in any single position
    min_position_size: float = 0.01    # Minimum 1% in any position
    rebalance_threshold: float = 0.05  # Rebalance when 5% drift
    max_volatility: float = 0.15       # Maximum 15% annual volatility
    target_return: float = 0.12        # Target 12% annual return
    risk_free_rate: float = 0.03       # 3% risk-free rate
    lookback_days: int = 252           # 1 year lookback for calculations
    min_sharpe_ratio: float = 0.5      # Minimum acceptable Sharpe ratio
    max_drawdown: float = 0.10         # Maximum 10% drawdown tolerance
    leverage_limit: float = 1.0        # No leverage by default
    transaction_cost: float = 0.001    # 0.1% transaction cost
    # Configurable symbol list (Brazilian Bovespa stocks); None selects the defaults below
    common_pairs: Optional[List[str]] = None

    def __post_init__(self):
        if self.common_pairs is None:
            # The .SA suffix follows Yahoo Finance naming; adjust these to the
            # symbol names your MT5 broker actually exposes
            self.common_pairs = [
                'PETR4.SA',  # Petrobras
                'VALE3.SA',  # Vale
                'ITUB4.SA',  # Itaú Unibanco
                'BBDC4.SA',  # Bradesco
                'ABEV3.SA',  # Ambev
                'MGLU3.SA',  # Magazine Luiza
                'WEGE3.SA',  # WEG
                'RENT3.SA',  # Localiza
                'LREN3.SA',  # Lojas Renner
                'GGBR4.SA'   # Gerdau
            ]
@dataclass
class PortfolioMetrics:
    """Portfolio performance and risk metrics"""
    total_value: float
    available_cash: float
    invested_amount: float
    daily_return: float
    cagr: float
    volatility: float
    sharpe_ratio: float
    max_drawdown: float
    current_drawdown: float
    positions: Dict[str, float]
    weights: Dict[str, float]
class EfficientFrontier:
    """Efficient Frontier calculation and optimization"""

    def __init__(self, returns: pd.DataFrame, config: PortfolioConfig,
                 periods_per_year: int = 252 * 24):
        # The callers in this article pass hourly (H1) returns and annualize
        # with 252 * 24 elsewhere; pass periods_per_year=252 for daily returns
        self.returns = returns
        self.config = config
        self.mean_returns = returns.mean() * periods_per_year  # Annualized
        self.cov_matrix = returns.cov() * periods_per_year     # Annualized
        self.assets = list(returns.columns)
        self.n_assets = len(self.assets)

    def calculate_portfolio_stats(self, weights: np.ndarray) -> Tuple[float, float, float]:
        """Calculate portfolio return, volatility, and Sharpe ratio"""
        portfolio_return = np.sum(self.mean_returns * weights)
        portfolio_volatility = np.sqrt(np.dot(weights.T, np.dot(self.cov_matrix, weights)))
        # Guard against a zero-volatility portfolio to avoid dividing by zero
        if portfolio_volatility > 0:
            sharpe_ratio = (portfolio_return - self.config.risk_free_rate) / portfolio_volatility
        else:
            sharpe_ratio = 0.0
        return portfolio_return, portfolio_volatility, sharpe_ratio
    def efficient_frontier_curve(self, num_points: int = 100) -> pd.DataFrame:
        """Generate efficient frontier curve"""
        target_returns = np.linspace(
            self.mean_returns.min(),
            self.mean_returns.max(),
            num_points
        )
        efficient_portfolios = []
        for target in target_returns:
            try:
                weights = self.minimize_volatility_for_return(target)
                if weights is not None:
                    ret, vol, sharpe = self.calculate_portfolio_stats(weights)
                    efficient_portfolios.append({
                        'return': ret,
                        'volatility': vol,
                        'sharpe_ratio': sharpe,
                        'weights': weights.tolist()
                    })
            except Exception as e:
                # Skip targets the solver cannot handle, but keep a trace of why
                logger.debug(f"Frontier point skipped for target {target}: {e}")
                continue
        return pd.DataFrame(efficient_portfolios)
    def minimize_volatility_for_return(self, target_return: float) -> Optional[np.ndarray]:
        """Find minimum volatility portfolio for target return"""
        constraints = [
            {'type': 'eq', 'fun': lambda x: np.sum(x) - 1},  # Weights sum to 1
            {'type': 'eq', 'fun': lambda x: np.sum(self.mean_returns * x) - target_return}
        ]
        # Weights can only sum to 1 if n_assets * upper >= 1 and n_assets * lower <= 1.
        # With the default 20% cap and at most 5 symbols the cap alone forces equal
        # weights, so relax the bounds to 1/n when they would make the problem infeasible.
        equal_weight = 1.0 / self.n_assets
        lower = min(self.config.min_position_size, equal_weight)
        upper = max(self.config.max_position_size, equal_weight)
        bounds = [(lower, upper) for _ in range(self.n_assets)]
        initial_guess = np.array([equal_weight] * self.n_assets)

        def objective(weights):
            return np.sqrt(np.dot(weights.T, np.dot(self.cov_matrix, weights)))

        result = minimize(objective, initial_guess, method='SLSQP',
                          bounds=bounds, constraints=constraints)
        return result.x if result.success else None
    def maximize_sharpe_ratio(self) -> Tuple[np.ndarray, float]:
        """Find portfolio with maximum Sharpe ratio"""
        constraints = [{'type': 'eq', 'fun': lambda x: np.sum(x) - 1}]
        # Relax the per-asset bounds to 1/n when the configured limits would make
        # "weights sum to 1" infeasible (for example a 20% cap with fewer than 5 assets)
        equal_weight = 1.0 / self.n_assets
        lower = min(self.config.min_position_size, equal_weight)
        upper = max(self.config.max_position_size, equal_weight)
        bounds = [(lower, upper) for _ in range(self.n_assets)]

        def negative_sharpe(weights):
            _, _, sharpe = self.calculate_portfolio_stats(weights)
            return -sharpe  # Minimize negative Sharpe = maximize Sharpe

        initial_guess = np.array([equal_weight] * self.n_assets)
        result = minimize(negative_sharpe, initial_guess, method='SLSQP',
                          bounds=bounds, constraints=constraints)
        if result.success:
            optimal_weights = result.x
            _, _, sharpe = self.calculate_portfolio_stats(optimal_weights)
            return optimal_weights, sharpe
        else:
            # Fall back to equal weights when the solver does not converge
            return initial_guess, 0.0
class RiskManager:
    """Risk management system with multiple risk controls"""

    def __init__(self, config: PortfolioConfig):
        self.config = config
        self.position_history = []
        self.returns_history = []

    def calculate_var(self, returns: pd.Series, confidence: float = 0.05) -> float:
        """Calculate historical Value at Risk.

        `confidence` is the tail probability (0.05 gives the 95% VaR); the result
        is the corresponding return quantile, so losses come back as negative numbers.
        """
        return np.percentile(returns, confidence * 100)

    def calculate_max_drawdown(self, equity_curve: pd.Series) -> Tuple[float, float]:
        """Calculate maximum and current drawdown as positive fractions"""
        if len(equity_curve) == 0:
            return 0.0, 0.0
        peak = equity_curve.expanding().max()
        drawdown = (equity_curve - peak) / peak
        max_dd = drawdown.min()
        current_dd = drawdown.iloc[-1]
        return abs(max_dd), abs(current_dd)
    def check_risk_limits(self, portfolio_metrics: PortfolioMetrics) -> Dict[str, Any]:
        """Check all risk limits and return violations"""
        violations = {}
        # Volatility check
        if portfolio_metrics.volatility > self.config.max_volatility:
            violations['volatility'] = {
                'current': portfolio_metrics.volatility,
                'limit': self.config.max_volatility,
                'action': 'reduce_risk'
            }
        # Drawdown check
        if portfolio_metrics.current_drawdown > self.config.max_drawdown:
            violations['drawdown'] = {
                'current': portfolio_metrics.current_drawdown,
                'limit': self.config.max_drawdown,
                'action': 'stop_loss'
            }
        # Sharpe ratio check
        if portfolio_metrics.sharpe_ratio < self.config.min_sharpe_ratio:
            violations['sharpe'] = {
                'current': portfolio_metrics.sharpe_ratio,
                'limit': self.config.min_sharpe_ratio,
                'action': 'reoptimize'
            }
        # Position size check
        for symbol, weight in portfolio_metrics.weights.items():
            if weight > self.config.max_position_size:
                violations[f'position_{symbol}'] = {
                    'current': weight,
                    'limit': self.config.max_position_size,
                    'action': 'reduce_position'
                }
        return violations
class MetaTraderInterface:
    """Interface to MetaTrader 5 platform"""

    def __init__(self):
        self.connected = False
        self.account_info = None
        # Configure based on your broker. "Brazil/West" is the legacy alias of
        # America/Manaus (UTC-4); São Paulo exchange time is "America/Sao_Paulo".
        self.timezone = pytz.timezone("Brazil/West")
        self.max_days_m1 = 58  # MT5 limit for M1 timeframe

    async def initialize(self) -> bool:
        """Initialize MetaTrader connection"""
        try:
            if not mt5.initialize():
                logger.error("MetaTrader5 initialization failed")
                return False
            self.account_info = mt5.account_info()
            if self.account_info is None:
                logger.error("Failed to get account info")
                return False
            self.connected = True
            logger.info(f"Connected to MT5 account: {self.account_info.login}")
            return True
        except Exception as e:
            logger.error(f"MT5 initialization error: {e}")
            return False
    def get_account_balance(self) -> Dict[str, float]:
        """Get current account balance and equity"""
        empty = {'balance': 0, 'equity': 0, 'margin': 0, 'free_margin': 0}
        if not self.connected:
            return empty
        info = mt5.account_info()
        if info is None:  # The terminal connection may have dropped
            return empty
        return {
            'balance': float(info.balance),
            'equity': float(info.equity),
            'margin': float(info.margin),
            'free_margin': float(info.margin_free)
        }

    def get_positions(self) -> List[Dict]:
        """Get all open positions"""
        if not self.connected:
            return []
        positions = mt5.positions_get()
        if positions is None:
            return []
        return [
            {
                'symbol': pos.symbol,
                'volume': pos.volume,
                'type': 'BUY' if pos.type == mt5.POSITION_TYPE_BUY else 'SELL',
                'price': pos.price_open,
                'current_price': pos.price_current,
                'profit': pos.profit,
                'swap': pos.swap,
                # Position records may not carry a commission field, so default to 0.0
                'commission': getattr(pos, 'commission', 0.0)
            }
            for pos in positions
        ]
    def get_ticks(self, symbol: str, start: datetime, end: datetime) -> List[Tuple]:
        """Get tick data for a specific time range"""
        try:
            ticks = mt5.copy_ticks_range(symbol, start, end, mt5.COPY_TICKS_TRADE)
            if ticks is None or len(ticks) == 0:
                logger.warning(f"No tick data for {symbol} from {start} to {end}")
                return []
            # 'time' has one-second resolution, so this keeps the last tick of each second
            t_df = pd.DataFrame(ticks).drop_duplicates(subset='time', keep="last")
            # One named tuple per tick, with fields named after the DataFrame columns
            return list(t_df.itertuples(index=False))
        except Exception as e:
            logger.error(f"Error getting tick data for {symbol}: {e}")
            return []
    def get_m1_rates_paginated(self, symbol: str, pages: int) -> List[Tuple]:
        """Get tick data in pages of max_days_m1 days to handle MT5 limits"""
        try:
            all_data = []
            # Avoid the incomplete current day
            now = datetime.now() - timedelta(days=1)
            for i in range(pages, 0, -1):
                # The most recent page is extended by one day to include its end date
                inc_d = 1 if i == 1 else 0
                start_day = now - timedelta(days=self.max_days_m1 * i)
                # Day arithmetic goes through timedelta so month ends cannot overflow
                end_day = now - timedelta(days=self.max_days_m1 * (i - 1) - inc_d)
                # pytz zones must be attached with localize(); passing them as tzinfo=
                # to datetime() applies the zone's historical LMT offset instead
                start = self.timezone.localize(
                    datetime(start_day.year, start_day.month, start_day.day))
                end = self.timezone.localize(
                    datetime(end_day.year, end_day.month, end_day.day))
                logger.info(f'Fetching {symbol} data: start={start}, end={end}')
                tick_data = self.get_ticks(symbol, start, end)
                if tick_data:
                    all_data.extend(tick_data)
            return all_data
        except Exception as e:
            logger.error(f"Error getting paginated data for {symbol}: {e}")
            return []
    def get_historical_data(self, symbol: str, timeframe: int, count: int) -> pd.DataFrame:
        """Get historical close prices using MT5 rates, or ticks for large M1 requests"""
        try:
            # For large M1 requests, rebuild minute closes from paginated tick data
            if timeframe == mt5.TIMEFRAME_M1 and count > 1000:
                # Estimate pages needed
                pages = max(1, count // (self.max_days_m1 * 24 * 60))
                tick_data = self.get_m1_rates_paginated(symbol, pages)
                if not tick_data:
                    return pd.DataFrame()
                # Convert tick data to DataFrame
                df = pd.DataFrame([
                    {
                        'time': pd.to_datetime(tick.time, unit='s'),
                        'bid': tick.bid,
                        'ask': tick.ask,
                        'last': tick.last,
                        'volume': tick.volume
                    }
                    for tick in tick_data
                ])
                df.set_index('time', inplace=True)
                # A tick without a trade typically carries last == 0.0 rather than NaN,
                # so fall back to the bid/ask midpoint whenever 'last' is not positive
                mid = (df['bid'] + df['ask']) / 2
                df['close'] = df['last'].where(df['last'] > 0, mid)
                # Keep the last price of each minute so the result is an M1 close series
                return df['close'].resample('1min').last().dropna().to_frame()
            else:
                # Use standard rates for other timeframes or smaller datasets
                rates = mt5.copy_rates_from_pos(symbol, timeframe, 0, count)
                if rates is None:
                    logger.warning(f"No rates data for {symbol}")
                    return pd.DataFrame()
                df = pd.DataFrame(rates)
                df['time'] = pd.to_datetime(df['time'], unit='s')
                df.set_index('time', inplace=True)
                return df[['close']]  # Return only close prices
        except Exception as e:
            logger.error(f"Error getting historical data for {symbol}: {e}")
            return pd.DataFrame()
    def get_historical_data_range(self, symbol: str, start_date: datetime,
                                  end_date: datetime,
                                  timeframe: int = mt5.TIMEFRAME_H1) -> pd.DataFrame:
        """Get historical data for a specific date range"""
        try:
            # Convert naive datetimes to timezone-aware ones. pytz zones need
            # localize(); replace(tzinfo=...) would apply the zone's LMT offset.
            if start_date.tzinfo is None:
                start_date = self.timezone.localize(start_date)
            if end_date.tzinfo is None:
                end_date = self.timezone.localize(end_date)
            rates = mt5.copy_rates_range(symbol, timeframe, start_date, end_date)
            if rates is None:
                logger.warning(f"No data for {symbol} from {start_date} to {end_date}")
                return pd.DataFrame()
            df = pd.DataFrame(rates)
            df['time'] = pd.to_datetime(df['time'], unit='s')
            df.set_index('time', inplace=True)
            return df[['close']]
        except Exception as e:
            logger.error(f"Error getting range data for {symbol}: {e}")
            return pd.DataFrame()
    async def place_order(self, symbol: str, order_type: str, volume: float,
                          price: Optional[float] = None) -> Dict:
        """Place trading order"""
        try:
            symbol_info = mt5.symbol_info(symbol)
            if symbol_info is None:
                return {'success': False, 'error': f'Symbol {symbol} not found'}
            if not symbol_info.visible:
                mt5.symbol_select(symbol, True)
            # A market price is only needed when the caller did not supply one
            tick = mt5.symbol_info_tick(symbol)
            if tick is None and price is None:
                return {'success': False, 'error': f'No tick data for {symbol}'}
            # Prepare order request
            request = {
                "action": mt5.TRADE_ACTION_DEAL,
                "symbol": symbol,
                "volume": float(volume),
                # Supported filling modes vary by broker and symbol
                "type_filling": mt5.ORDER_FILLING_IOC,
                "comment": "Portfolio Optimizer",
            }
            if order_type.upper() == 'BUY':
                request["type"] = mt5.ORDER_TYPE_BUY
                request["price"] = tick.ask if price is None else price
            else:
                request["type"] = mt5.ORDER_TYPE_SELL
                request["price"] = tick.bid if price is None else price
            result = mt5.order_send(request)
            # order_send returns None when the request could not be sent at all
            if result is None:
                return {'success': False, 'error': f'order_send failed: {mt5.last_error()}'}
            if result.retcode != mt5.TRADE_RETCODE_DONE:
                return {
                    'success': False,
                    'error': f'Order failed: {result.comment}',
                    'retcode': result.retcode
                }
            return {
                'success': True,
                'ticket': result.order,
                'volume': result.volume,
                'price': result.price
            }
        except Exception as e:
            return {'success': False, 'error': str(e)}
class PortfolioOptimizer:
    """Main portfolio optimization and management system"""

    def __init__(self, config: PortfolioConfig):
        self.config = config
        self.mt5 = MetaTraderInterface()
        self.risk_manager = RiskManager(config)
        self.last_rebalance = None
        self.symbols = []  # Will be populated from MT5

    async def initialize(self):
        """Initialize the optimizer"""
        await self.mt5.initialize()
        self.symbols = self._get_tradeable_symbols()
        logger.info(f"Initialized with {len(self.symbols)} symbols")

    def _get_tradeable_symbols(self) -> List[str]:
        """Get list of tradeable symbols from MT5"""
        available_symbols = []
        for symbol in self.config.common_pairs:
            symbol_info = mt5.symbol_info(symbol)
            if symbol_info is not None and symbol_info.visible:
                available_symbols.append(symbol)
        return available_symbols[:5]  # Limit to 5 symbols for demo
    def calculate_returns_matrix(self) -> pd.DataFrame:
        """Calculate returns matrix for all symbols"""
        returns_data = {}
        for symbol in self.symbols:
            # lookback_days * 24 hourly bars, matching the 24-bars-per-day assumption below
            df = self.mt5.get_historical_data(symbol, mt5.TIMEFRAME_H1,
                                              self.config.lookback_days * 24)
            if not df.empty:
                returns = df['close'].pct_change().dropna()
                returns_data[symbol] = returns
        if not returns_data:
            return pd.DataFrame()
        # Align all return series on their common timestamps
        returns_df = pd.DataFrame(returns_data)
        returns_df = returns_df.dropna()
        return returns_df

    def calculate_cagr(self, equity_curve: pd.Series) -> float:
        """Calculate Compound Annual Growth Rate"""
        if len(equity_curve) < 2:
            return 0.0
        start_value = equity_curve.iloc[0]
        end_value = equity_curve.iloc[-1]
        years = len(equity_curve) / (252 * 24)  # Assuming hourly data, 24 bars per day
        if start_value <= 0 or years <= 0:
            return 0.0
        cagr = (end_value / start_value) ** (1 / years) - 1
        return cagr
    async def get_portfolio_metrics(self) -> PortfolioMetrics:
        """Calculate comprehensive portfolio metrics"""
        account_info = self.mt5.get_account_balance()
        positions = self.mt5.get_positions()
        total_value = account_info['equity']
        available_cash = account_info['free_margin']

        # Calculate position values and weights (short positions count as negative)
        position_values = {}
        total_invested = 0
        for pos in positions:
            value = pos['volume'] * pos['current_price'] * \
                (1 if pos['type'] == 'BUY' else -1)
            # Accumulate so that several positions in one symbol are not overwritten
            position_values[pos['symbol']] = position_values.get(pos['symbol'], 0.0) + value
            total_invested += abs(value)
        weights = {symbol: value / total_value if total_value > 0 else 0
                   for symbol, value in position_values.items()}

        # Calculate returns for metrics
        returns_df = self.calculate_returns_matrix()
        if returns_df.empty:
            return PortfolioMetrics(
                total_value=total_value,
                available_cash=available_cash,
                invested_amount=total_invested,
                daily_return=0.0,
                cagr=0.0,
                volatility=0.0,
                sharpe_ratio=0.0,
                max_drawdown=0.0,
                current_drawdown=0.0,
                positions=position_values,
                weights=weights
            )

        # Portfolio performance calculations
        portfolio_weights = np.array([weights.get(symbol, 0) for symbol in returns_df.columns])
        portfolio_returns = returns_df.dot(portfolio_weights)
        # Returns are hourly; compound the last 24 bars to approximate one day
        daily_return = float((1 + portfolio_returns.iloc[-24:]).prod() - 1)
        # CAGR needs a compounded equity curve, not a running sum of returns
        equity_curve = (1 + portfolio_returns).cumprod()
        cagr = self.calculate_cagr(equity_curve)
        volatility = portfolio_returns.std() * np.sqrt(252 * 24)  # Annualized

        # Sharpe ratio
        excess_returns = portfolio_returns - self.config.risk_free_rate / (252 * 24)
        sharpe_ratio = excess_returns.mean() / excess_returns.std() * np.sqrt(252 * 24) \
            if excess_returns.std() > 0 else 0

        # Drawdown calculations
        max_dd, current_dd = self.risk_manager.calculate_max_drawdown(equity_curve)
        return PortfolioMetrics(
            total_value=total_value,
            available_cash=available_cash,
            invested_amount=total_invested,
            daily_return=daily_return,
            cagr=cagr,
            volatility=volatility,
            sharpe_ratio=sharpe_ratio,
            max_drawdown=max_dd,
            current_drawdown=current_dd,
            positions=position_values,
            weights=weights
        )
    async def optimize_portfolio(self) -> Dict[str, float]:
        """Calculate optimal portfolio weights using efficient frontier"""
        returns_df = self.calculate_returns_matrix()
        if returns_df.empty:
            return {}
        ef = EfficientFrontier(returns_df, self.config)
        optimal_weights, max_sharpe = ef.maximize_sharpe_ratio()
        # Convert to dictionary. Pair weights with ef.assets (the columns that were
        # optimized); self.symbols may contain symbols that returned no data.
        optimal_allocation = {
            symbol: float(weight) for symbol, weight in zip(ef.assets, optimal_weights)
            if weight >= self.config.min_position_size
        }
        logger.info(f"Optimal portfolio (Sharpe: {max_sharpe:.3f}): {optimal_allocation}")
        return optimal_allocation
    async def preview_rebalance(self, target_weights: Dict[str, float]) -> Dict[str, Any]:
        """Preview rebalancing changes without executing trades"""
        current_metrics = await self.get_portfolio_metrics()
        account_info = self.mt5.get_account_balance()
        rebalance_trades = []
        total_value = account_info['equity']
        # Calculate required position changes
        for symbol, target_weight in target_weights.items():
            current_weight = current_metrics.weights.get(symbol, 0)
            weight_diff = target_weight - current_weight
            # Only rebalance if difference exceeds threshold
            if abs(weight_diff) > self.config.rebalance_threshold:
                target_value = target_weight * total_value
                current_value = current_metrics.positions.get(symbol, 0)
                value_change = target_value - current_value
                # Convert to volume (simplified: assumes an FX-style standard lot of
                # 100,000 units; for stocks, derive the volume from the share price
                # and the symbol's volume step instead)
                volume = abs(value_change) / 100000
                order_type = 'BUY' if value_change > 0 else 'SELL'
                if volume >= 0.01:  # Minimum volume
                    rebalance_trades.append({
                        'symbol': symbol,
                        'type': order_type,
                        'volume': round(volume, 2),
                        'weight_change': weight_diff,
                        'current_weight': current_weight,
                        'target_weight': target_weight,
                        'value_change': value_change
                    })
        # Costs scale with the value actually traded, not with the whole portfolio
        traded_value = sum(abs(trade['value_change']) for trade in rebalance_trades)
        return {
            'preview_time': datetime.now().isoformat(),
            'total_portfolio_value': total_value,
            'trades_planned': len(rebalance_trades),
            'estimated_transaction_costs': self.config.transaction_cost * traded_value,
            'rebalance_details': rebalance_trades,
            'summary': {
                'symbols_to_rebalance': len(rebalance_trades),
                'total_weight_change': sum(abs(trade['weight_change']) for trade in rebalance_trades),
                'largest_weight_change': max((abs(trade['weight_change']) for trade in rebalance_trades), default=0)
            }
        }
    async def rebalance_portfolio(self, target_weights: Dict[str, float], execute: bool = True) -> Dict[str, Any]:
        """Rebalance portfolio to target weights with optional execution"""
        # Always compute the preview first
        preview_result = await self.preview_rebalance(target_weights)
        if not execute:
            return {
                'preview_only': True,
                **preview_result
            }
        # The preview already lists the trades that pass the drift threshold and
        # minimum-volume checks, so execute exactly those instead of recomputing them
        rebalance_trades = preview_result['rebalance_details']
        # Execute trades
        execution_results = []
        for trade in rebalance_trades:
            result = await self.mt5.place_order(
                trade['symbol'],
                trade['type'],
                trade['volume']
            )
            execution_results.append({
                'trade': trade,
                'result': result
            })
        self.last_rebalance = datetime.now()
        return {
            'preview': preview_result,
            'execution': {
                'rebalance_time': self.last_rebalance.isoformat(),
                'trades_planned': len(rebalance_trades),
                'trades_executed': sum(1 for r in execution_results if r['result']['success']),
                'execution_details': execution_results
            }
        }
# FastMCP Server Implementation
class PortfolioMCPServer:
    """FastMCP Server for portfolio optimization"""

    def __init__(self):
        self.mcp = FastMCP("portfolio-optimizer")
        self.config = PortfolioConfig()
        self.optimizer = None
        self._setup_handlers()

    def _setup_handlers(self):
        """Setup FastMCP server handlers"""

        @self.mcp.resource("portfolio://status")
        async def portfolio_status() -> dict:
            """Current portfolio metrics and analysis"""
            if not self.optimizer:
                await self._initialize_optimizer()
            metrics = await self.optimizer.get_portfolio_metrics()
            risk_check = self.optimizer.risk_manager.check_risk_limits(metrics)
            return {
                "portfolio_metrics": asdict(metrics),
                "risk_violations": risk_check,
                "last_update": datetime.now().isoformat()
            }
        @self.mcp.resource("portfolio://efficient-frontier")
        async def efficient_frontier() -> dict:
            """Efficient frontier analysis and optimization"""
            if not self.optimizer:
                await self._initialize_optimizer()
            returns_df = self.optimizer.calculate_returns_matrix()
            if returns_df.empty:
                return {"error": "No data available"}
            ef = EfficientFrontier(returns_df, self.config)
            frontier_data = ef.efficient_frontier_curve()
            optimal_weights, max_sharpe = ef.maximize_sharpe_ratio()
            return {
                "efficient_frontier": frontier_data.to_dict('records'),
                "optimal_portfolio": {
                    # Pair weights with the optimized columns; symbols without
                    # data are absent from returns_df and must not shift the mapping
                    "weights": dict(zip(ef.assets, optimal_weights.tolist())),
                    "sharpe_ratio": float(max_sharpe)
                },
                "current_symbols": self.optimizer.symbols
            }

        @self.mcp.resource("portfolio://config")
        async def portfolio_config() -> dict:
            """Portfolio optimization parameters"""
            return asdict(self.config)
        @self.mcp.tool()
        async def optimize_portfolio() -> dict:
            """Calculate optimal portfolio allocation using efficient frontier"""
            if not self.optimizer:
                await self._initialize_optimizer()
            optimal_weights = await self.optimizer.optimize_portfolio()
            current_metrics = await self.optimizer.get_portfolio_metrics()
            return {
                "optimal_allocation": optimal_weights,
                "current_allocation": current_metrics.weights,
                "rebalancing_needed": self._check_rebalancing_needed(
                    current_metrics.weights, optimal_weights
                ),
                "expected_improvement": self._calculate_improvement(
                    current_metrics.weights, optimal_weights
                )
            }

        @self.mcp.tool()
        async def preview_rebalance() -> dict:
            """Preview portfolio rebalancing without executing trades"""
            if not self.optimizer:
                await self._initialize_optimizer()
            optimal_weights = await self.optimizer.optimize_portfolio()
            preview_result = await self.optimizer.preview_rebalance(optimal_weights)
            return preview_result

        @self.mcp.tool()
        async def rebalance_portfolio(execute: bool = False) -> dict:
            """Rebalance portfolio to optimal weights with optional execution"""
            if not self.optimizer:
                await self._initialize_optimizer()
            optimal_weights = await self.optimizer.optimize_portfolio()
            rebalance_result = await self.optimizer.rebalance_portfolio(optimal_weights, execute)
            return rebalance_result
        @self.mcp.tool()
        async def update_config(
            max_volatility: Optional[float] = None,
            target_return: Optional[float] = None,
            max_position_size: Optional[float] = None,
            rebalance_threshold: Optional[float] = None,
            risk_free_rate: Optional[float] = None,
            common_pairs: Optional[List[str]] = None
        ) -> dict:
            """Update portfolio optimization parameters"""
            updates = {}
            if max_volatility is not None:
                self.config.max_volatility = max_volatility
                updates['max_volatility'] = max_volatility
            if target_return is not None:
                self.config.target_return = target_return
                updates['target_return'] = target_return
            if max_position_size is not None:
                self.config.max_position_size = max_position_size
                updates['max_position_size'] = max_position_size
            if rebalance_threshold is not None:
                self.config.rebalance_threshold = rebalance_threshold
                updates['rebalance_threshold'] = rebalance_threshold
            if risk_free_rate is not None:
                self.config.risk_free_rate = risk_free_rate
                updates['risk_free_rate'] = risk_free_rate
            if common_pairs is not None:
                self.config.common_pairs = common_pairs
                updates['common_pairs'] = common_pairs
                # The optimizer caches its symbol list at start-up, so refresh it
                if self.optimizer is not None:
                    self.optimizer.symbols = self.optimizer._get_tradeable_symbols()
            return {"configuration_updated": updates}
        @self.mcp.tool()
        async def risk_analysis() -> dict:
            """Perform comprehensive risk analysis"""
            if not self.optimizer:
                await self._initialize_optimizer()
            metrics = await self.optimizer.get_portfolio_metrics()
            risk_violations = self.optimizer.risk_manager.check_risk_limits(metrics)
            # Calculate additional risk metrics
            returns_df = self.optimizer.calculate_returns_matrix()
            if not returns_df.empty:
                portfolio_weights = np.array([
                    metrics.weights.get(symbol, 0) for symbol in returns_df.columns
                ])
                portfolio_returns = returns_df.dot(portfolio_weights)
                var_95 = self.optimizer.risk_manager.calculate_var(portfolio_returns, 0.05)
                var_99 = self.optimizer.risk_manager.calculate_var(portfolio_returns, 0.01)
            else:
                var_95 = var_99 = 0
            return {
                "current_metrics": asdict(metrics),
                "risk_violations": risk_violations,
                "value_at_risk": {
                    "95%": float(var_95),
                    "99%": float(var_99)
                },
                "risk_recommendations": self._generate_risk_recommendations(
                    metrics, risk_violations
                )
            }
    async def _initialize_optimizer(self):
        """Initialize the portfolio optimizer"""
        self.optimizer = PortfolioOptimizer(self.config)
        await self.optimizer.initialize()

    def _check_rebalancing_needed(self, current: Dict[str, float],
                                  optimal: Dict[str, float]) -> bool:
        """Check if rebalancing is needed based on threshold"""
        for symbol in optimal.keys():
            current_weight = current.get(symbol, 0)
            optimal_weight = optimal[symbol]
            if abs(current_weight - optimal_weight) > self.config.rebalance_threshold:
                return True
        return False

    def _calculate_improvement(self, current: Dict[str, float],
                               optimal: Dict[str, float]) -> Dict[str, float]:
        """Calculate expected improvement from rebalancing"""
        # Simplified: the values below are fixed placeholders, not computed from the weights
        return {
            "expected_sharpe_improvement": 0.1,     # Placeholder
            "expected_volatility_reduction": 0.02,  # Placeholder
            "diversification_improvement": 0.05     # Placeholder
        }

    def _generate_risk_recommendations(self, metrics: PortfolioMetrics,
                                       violations: Dict) -> List[str]:
        """Generate risk management recommendations"""
        recommendations = []
        if violations:
            if 'volatility' in violations:
                recommendations.append(
                    "Reduce portfolio volatility by increasing allocation to lower-risk assets"
                )
            if 'drawdown' in violations:
                recommendations.append(
                    "Current drawdown exceeds limit. Consider reducing position sizes or implementing stop-losses"
                )
            if 'sharpe' in violations:
                recommendations.append(
                    "Portfolio Sharpe ratio is below target. Consider rebalancing to optimal weights"
                )
        else:
            recommendations.append("Portfolio is within all risk limits")
        # total_value is 0 when MT5 is not connected, so guard the division
        if metrics.total_value > 0 and metrics.available_cash / metrics.total_value > 0.1:
            recommendations.append(
                "High cash allocation detected. Consider investing excess cash according to optimal allocation"
            )
        return recommendations
    async def run(self):
        """Run the FastMCP server"""
        logger.info("Portfolio Optimizer FastMCP Server started")
        # FastMCP.run() is the blocking entry point and starts its own event loop;
        # inside a coroutine use the async variant instead
        await self.mcp.run_async()


async def main():
    """Main entry point"""
    server = PortfolioMCPServer()
    await server.run()


if __name__ == "__main__":
    asyncio.run(main())
Usage Examples
1. Portfolio Status Check
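# Get current portfolio status
# `server` is a PortfolioMCPServer; resources are read through an MCP client.
# Assumes fastmcp's in-memory client: from fastmcp import Client
async with Client(server.mcp) as client:
    contents = await client.read_resource("portfolio://status")
    print(contents[0].text)  # JSON text produced by the resource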
2. Direct MT5 Data Retrieval
# Initialize MT5 interface
mt5_interface = MetaTraderInterface()
await mt5_interface.initialize()
# Get high-frequency tick data for backtesting
start_date = datetime.now() - timedelta(days=30)
end_date = datetime.now()
tick_data = mt5_interface.get_ticks("EURUSD", start_date, end_date)
# Get paginated M1 data for long-term analysis
m1_data = mt5_interface.get_m1_rates_paginated("EURUSD", pages=3)
# Get standard hourly data
hourly_data = mt5_interface.get_historical_data("EURUSD", mt5.TIMEFRAME_H1, 1000)
3. Efficient Frontier Analysis
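# Get efficient frontier data
# Assumes fastmcp's in-memory client: from fastmcp import Client
async with Client(server.mcp) as client:
    contents = await client.read_resource("portfolio://efficient-frontier")
    frontier_data = json.loads(contents[0].text)
    print("Optimal Sharpe Ratio:", frontier_data['optimal_portfolio']['sharpe_ratio'])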
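For reference, the quantities behind this resource can be written out explicitly. The notation below is an assumption chosen to mirror the code: w is the weight vector, mu the annualized mean returns, Sigma the annualized covariance matrix, r_f the risk_free_rate, and w_min, w_max the min_position_size and max_position_size bounds.

```latex
\begin{aligned}
\mu_p(w) &= w^{\top}\mu, \qquad
\sigma_p(w) = \sqrt{w^{\top}\Sigma\, w}, \qquad
S(w) = \frac{\mu_p(w) - r_f}{\sigma_p(w)} \\[4pt]
\text{maximum Sharpe:}\quad & \max_{w}\; S(w)
\quad \text{s.t.}\quad \sum_i w_i = 1,\;\; w_{\min} \le w_i \le w_{\max} \\[4pt]
\text{frontier point for target } r^{*}:\quad & \min_{w}\; \sigma_p(w)
\quad \text{s.t.}\quad w^{\top}\mu = r^{*},\;\; \sum_i w_i = 1,\;\; w_{\min} \le w_i \le w_{\max}
\end{aligned}
```

The first problem corresponds to maximize_sharpe_ratio and the second to minimize_volatility_for_return, which efficient_frontier_curve solves once per target return.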
4. Risk Analysis
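# Perform risk analysis
# Assumes fastmcp's in-memory client: from fastmcp import Client
async with Client(server.mcp) as client:
    risk_result = await client.call_tool("risk_analysis", {})
    print("Risk Violations:", risk_result)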
5. Portfolio Optimization
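# Optimize portfolio
# Assumes fastmcp's in-memory client: from fastmcp import Client
async with Client(server.mcp) as client:
    optimization_result = await client.call_tool("optimize_portfolio", {})
    print("Optimal Allocation:", optimization_result)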
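To make the objective behind optimize_portfolio concrete, here is a minimal standard-library sketch that maximizes the Sharpe ratio for two assets by grid search. It is not the server's SLSQP routine; the function names and the sample numbers are hypothetical and chosen only for illustration.

```python
import math

def portfolio_stats(weights, mean_returns, cov, risk_free_rate):
    """Return (expected return, volatility, Sharpe ratio) for one weight vector."""
    n = len(weights)
    ret = sum(w * m for w, m in zip(weights, mean_returns))
    var = sum(weights[i] * cov[i][j] * weights[j] for i in range(n) for j in range(n))
    vol = math.sqrt(var)
    sharpe = (ret - risk_free_rate) / vol if vol > 0 else 0.0
    return ret, vol, sharpe

def max_sharpe_two_assets(mean_returns, cov, risk_free_rate, steps=1000):
    """Scan long-only weights (w, 1 - w) and keep the pair with the highest Sharpe."""
    best_weights, best_sharpe = None, -math.inf
    for k in range(steps + 1):
        w = k / steps
        weights = [w, 1.0 - w]
        sharpe = portfolio_stats(weights, mean_returns, cov, risk_free_rate)[2]
        if sharpe > best_sharpe:
            best_weights, best_sharpe = weights, sharpe
    return best_weights, best_sharpe

# Illustrative annualized inputs: two uncorrelated assets
mu = [0.10, 0.15]
sigma = [[0.04, 0.0], [0.0, 0.09]]
weights, sharpe = max_sharpe_two_assets(mu, sigma, risk_free_rate=0.03)
print(weights, round(sharpe, 4))
```

A grid search only scales to a handful of assets, which is why the server relies on a constrained solver, but it shows that diversifying across the two assets yields a higher Sharpe ratio than holding either one alone.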
6. Rebalancing
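# Rebalance portfolio; the tool only places orders when execute is True
# Assumes fastmcp's in-memory client: from fastmcp import Client
async with Client(server.mcp) as client:
    rebalance_result = await client.call_tool("rebalance_portfolio", {"execute": True})
    print("Rebalancing Results:", rebalance_result)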
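The rebalancing code converts a value change into an order volume with a simplified fixed lot size. As a hedged alternative sketch, the helper below sizes the order from the price and the symbol's trading parameters. The function name is hypothetical, and contract_size, volume_step and volume_min are assumed to be supplied by the caller (for example from the broker's symbol specification).

```python
import math

def order_volume(value_change, price, contract_size=1.0,
                 volume_step=1.0, volume_min=1.0):
    """Convert a currency amount into a tradable volume.

    The raw volume is rounded down to a multiple of volume_step so the order
    never exceeds the intended amount; volumes below volume_min return 0.0.
    """
    if price <= 0 or contract_size <= 0 or volume_step <= 0:
        return 0.0
    raw = abs(value_change) / (price * contract_size)
    # The small epsilon keeps exact multiples from being floored one step too low
    steps = math.floor(raw / volume_step + 1e-9)
    volume = round(steps * volume_step, 8)
    return volume if volume >= volume_min else 0.0

# Stock example: 20,000 to invest at 35.00 per share, traded in lots of 100 shares
print(order_volume(20000, 35.0, volume_step=100, volume_min=100))
# FX example: 25,000 units of exposure with a 100,000-unit contract and 0.01 steps
print(order_volume(25000, 1.0, contract_size=100000, volume_step=0.01, volume_min=0.01))
```

Rounding down rather than to the nearest step is a deliberate choice here: it keeps the resulting position at or below the target weight.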
Configuration
The system can be configured through the PortfolioConfig class:
config = PortfolioConfig(
max_position_size=0.25, # Maximum 25% in any position
target_return=0.15, # Target 15% annual return
max_volatility=0.12, # Maximum 12% volatility
rebalance_threshold=0.03, # Rebalance at 3% drift
max_drawdown=0.08 # Maximum 8% drawdown
)
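One interaction between these parameters is easy to miss: weights bounded by min_position_size and max_position_size can only sum to 1 if the number of assets allows it. The check below is a small sketch of that condition; the function name is hypothetical.

```python
def weight_bounds_feasible(n_assets: int, min_weight: float, max_weight: float) -> bool:
    """Return True if n_assets weights in [min_weight, max_weight] can sum to 1."""
    if n_assets <= 0 or min_weight > max_weight:
        return False
    eps = 1e-12  # tolerance for floating-point rounding
    return n_assets * min_weight <= 1.0 + eps and n_assets * max_weight >= 1.0 - eps

# A 25% cap needs at least 4 assets to be fully invested
print(weight_bounds_feasible(5, 0.01, 0.25))  # True
print(weight_bounds_feasible(3, 0.01, 0.25))  # False
```

With the default 20% cap and the demo limit of five symbols, the condition holds only at exactly equal weights, so raising max_position_size or widening the symbol list gives the optimizer room to work.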
Key Features
- Real-time Portfolio Analysis: Live metrics using CAGR, sharpe_ratio, and volatility
- Efficient Frontier Optimization: Implementation of Markowitz optimization
- Risk Management: Comprehensive risk controls and violation monitoring
- Automated Rebalancing: Smart rebalancing based on drift thresholds
- Advanced MT5 Data Retrieval:
  - High-frequency tick data using copy_ticks_range
  - Paginated data collection for large datasets
  - Automatic handling of MT5 timeframe limitations
  - Timezone-aware data processing
- MetaTrader Integration: Direct connection to MT5 for live trading
- Configurable Parameters: Customizable risk limits, return targets, and symbol list (defaults to Brazilian Bovespa stocks)
- Rebalancing Preview: See portfolio changes before execution
- FastMCP Protocol: Modern MCP implementation for AI agent integration
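The two headline metrics in this list, CAGR and the Sharpe ratio, reduce to short formulas. The sketch below uses only the standard library; the function names are hypothetical, and unlike the server code it counts the intervals between observations (len - 1) when converting to years.

```python
import math
from statistics import mean, stdev

def cagr(equity, periods_per_year):
    """Compound annual growth rate of an equity curve sampled at a fixed frequency."""
    if len(equity) < 2 or equity[0] <= 0:
        return 0.0
    years = (len(equity) - 1) / periods_per_year
    return (equity[-1] / equity[0]) ** (1 / years) - 1

def annualized_sharpe(returns, risk_free_rate, periods_per_year):
    """Annualized Sharpe ratio from per-period returns and an annual risk-free rate."""
    if len(returns) < 2:
        return 0.0
    excess = [r - risk_free_rate / periods_per_year for r in returns]
    spread = stdev(excess)
    if spread == 0:
        return 0.0
    return mean(excess) / spread * math.sqrt(periods_per_year)

# Yearly observations: 100 -> 110 -> 121 is 10% growth per year
print(round(cagr([100.0, 110.0, 121.0], 1), 4))  # 0.1
print(annualized_sharpe([0.02, -0.01, 0.03, 0.0], 0.0, 252) > 0)  # True
```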
MT5 Data Retrieval Methods
The system provides multiple methods for retrieving historical data from MetaTrader 5:
Tick Data Retrieval
- High-frequency data: Uses mt5.copy_ticks_range() for precise tick-by-tick analysis
- Automatic pagination: Handles MT5’s 58-day limit for M1 timeframes
- Data deduplication: Removes duplicate timestamps keeping the latest values
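The pagination and deduplication steps above can be sketched without an MT5 connection. The helper names below are hypothetical, and ticks are represented as plain (time, price) pairs purely for illustration.

```python
from datetime import datetime, timedelta

def page_windows(end, pages, days_per_page=58):
    """Return (start, end) windows, oldest first, that tile the period before `end`."""
    windows = []
    for i in range(pages, 0, -1):
        # timedelta arithmetic handles month and year boundaries safely
        window_start = end - timedelta(days=days_per_page * i)
        window_end = end - timedelta(days=days_per_page * (i - 1))
        windows.append((window_start, window_end))
    return windows

def dedupe_keep_last(ticks):
    """Keep the last price seen for each timestamp, returned in time order."""
    latest = {}
    for time, price in ticks:
        latest[time] = price  # later entries overwrite earlier ones
    return sorted(latest.items())

for start, stop in page_windows(datetime(2024, 3, 31), pages=2):
    print(start.date(), "->", stop.date())
print(dedupe_keep_last([(1, 10.0), (1, 10.5), (2, 11.0)]))
```

Each window would be passed to one tick request, and the deduplication would run on the combined result.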
Rate Data Retrieval
- Standard timeframes: H1, H4, D1 using mt5.copy_rates_from_pos()
- Date range queries: Specific period analysis with mt5.copy_rates_range()
- Flexible timeframe support: Adapts method based on requested timeframe and data size
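Because these methods return bars at different timeframes, any annualized statistic has to be scaled by the number of bars per year for the timeframe in use. The sketch below shows that scaling; the function names are hypothetical, and the 24 bars per day figure follows the article's hourly assumption rather than any particular exchange's session length.

```python
import math

def bars_per_year(bars_per_day, trading_days=252):
    """Number of return observations in one year for a given bar frequency."""
    return bars_per_day * trading_days

def annualize_return(mean_bar_return, periods_per_year):
    """Scale an average per-bar return to a yearly figure (simple, not compounded)."""
    return mean_bar_return * periods_per_year

def annualize_volatility(bar_std, periods_per_year):
    """Scale a per-bar standard deviation using the square-root-of-time rule."""
    return bar_std * math.sqrt(periods_per_year)

hourly = bars_per_year(24)  # H1 bars under the 24-bars-per-day assumption
daily = bars_per_year(1)    # D1 bars
print(hourly, daily)
print(annualize_volatility(0.002, hourly), annualize_volatility(0.002, daily))
```

Applying the daily factor of 252 to hourly returns understates both return and volatility, so the factor should always be chosen to match the timeframe that produced the returns.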
Data Processing Features
- Timezone handling: Configurable timezone support (default: Brazil/West)
- Price consolidation: Uses the last trade price when available, otherwise the bid/ask midpoint
- Error handling: Exceptions are logged and an empty result is returned instead of propagating
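The price consolidation rule can be sketched as a small pure function. The name is hypothetical, and the sketch assumes that a missing trade price is reported as 0.0 or None.

```python
def consolidated_price(bid, ask, last):
    """Prefer the last trade price; fall back to the bid/ask midpoint."""
    if last is not None and last > 0:
        return last
    if bid > 0 and ask > 0:
        return (bid + ask) / 2
    return None  # no usable quote at all

print(consolidated_price(10.0, 11.0, 10.4))  # 10.4
print(consolidated_price(10.0, 11.0, 0.0))   # 10.5
```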
Risk Management Features
- Position Size Limits: Prevents over-concentration
- Volatility Controls: Maintains portfolio within risk tolerance
- Drawdown Monitoring: Tracks current and maximum drawdowns
- Value at Risk: Calculates potential losses at various confidence levels
- Sharpe Ratio Optimization: Maximizes the Sharpe ratio and flags portfolios that fall below min_sharpe_ratio
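Two of these controls, drawdown monitoring and Value at Risk, can be illustrated with the standard library alone. The function names below are hypothetical; the VaR helper is a historical (empirical quantile) estimate using linear interpolation between sorted observations.

```python
def drawdowns(equity):
    """Return (maximum drawdown, current drawdown) as positive fractions."""
    if not equity:
        return 0.0, 0.0
    peak = equity[0]
    max_dd = current_dd = 0.0
    for value in equity:
        peak = max(peak, value)            # highest value seen so far
        current_dd = (peak - value) / peak
        max_dd = max(max_dd, current_dd)
    return max_dd, current_dd

def historical_var(returns, tail=0.05):
    """Empirical return quantile at probability `tail` (0.05 gives the 95% VaR)."""
    if not returns:
        raise ValueError("returns must not be empty")
    ordered = sorted(returns)
    position = tail * (len(ordered) - 1)
    lower = int(position)
    upper = min(lower + 1, len(ordered) - 1)
    fraction = position - lower
    return ordered[lower] + (ordered[upper] - ordered[lower]) * fraction

print(drawdowns([100, 120, 90, 108]))
print(historical_var([-0.05, -0.02, 0.0, 0.01, 0.03], tail=0.25))
```

In the first example the curve peaks at 120, falls to 90 (a 25% maximum drawdown) and recovers to 108, which leaves a current drawdown of 10%.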
The system exposes these portfolio management capabilities through the MCP protocol, so that AI agents can query metrics, run optimizations, and preview or execute rebalancing.
Related Topics
- efficient_frontier: Core optimization theory
- CAGR: Performance measurement
- Markowitz: Portfolio theory foundation
- sharpe_ratio: Risk-adjusted returns
- portfolio_optimization: Advanced techniques
- risk_parity: Alternative allocation methods
- volatility: Risk measurement and management