#!/usr/bin/env python3
"""
Burmddit MCP Server

Model Context Protocol server for autonomous Burmddit management.

Exposes tools for:
- Database queries (articles, categories, analytics)
- Content management (publish, update, delete)
- Deployment control (Coolify API)
- Performance monitoring
"""

import asyncio
import json
import os
import subprocess
import sys
from contextlib import contextmanager
from datetime import datetime, timedelta
from typing import Any, Optional

import psycopg2
import requests
from psycopg2 import sql

# MCP SDK imports (to be installed: pip install mcp)
try:
    from mcp.server.models import InitializationOptions
    from mcp.server import NotificationOptions, Server
    from mcp.server.stdio import stdio_server
    from mcp.types import (
        Tool,
        TextContent,
        ImageContent,
        EmbeddedResource,
        LoggingLevel
    )
except ImportError:
    print("ERROR: MCP SDK not installed. Run: pip install mcp", file=sys.stderr)
    sys.exit(1)


# File holding KEY=value credential lines (DATABASE_URL, COOLIFY_TOKEN).
CREDENTIALS_PATH = '/home/ubuntu/.openclaw/workspace/.credentials'

# Script executed by the run_pipeline tool.
PIPELINE_SCRIPT = '/home/ubuntu/.openclaw/workspace/burmddit/backend/run_pipeline.py'

# Seconds to wait for the Coolify API before giving up instead of hanging.
HTTP_TIMEOUT = 30

# Seconds the content pipeline may run before it is aborted.
PIPELINE_TIMEOUT = 300


class BurmdditMCPServer:
    """MCP Server for Burmddit autonomous management.

    Every tool implementation returns a list with a single ``TextContent``
    and reports failures as an ``Error: ...`` text instead of raising, so a
    failing tool never takes the server down.
    """

    def __init__(self):
        self.server = Server("burmddit-mcp")
        self.db_config = self.load_db_config()
        self.coolify_config = self.load_coolify_config()

        # Register handlers
        self._register_handlers()

    # ------------------------------------------------------------------
    # Configuration
    # ------------------------------------------------------------------

    def _read_credential(self, key: str) -> Optional[str]:
        """Return the value of ``KEY=value`` from the credentials file.

        Returns ``None`` when the file cannot be read, the key is absent,
        or its value is empty.
        """
        prefix = f"{key}="
        try:
            with open(CREDENTIALS_PATH, 'r', encoding='utf-8') as f:
                for line in f:
                    if line.startswith(prefix):
                        return line.split('=', 1)[1].strip() or None
        except OSError:
            # Missing or unreadable credentials are not fatal; callers
            # fall back to other sources.
            pass
        return None

    def load_db_config(self) -> dict:
        """Load database configuration.

        Lookup order: ``DATABASE_URL`` in the credentials file, then the
        ``DATABASE_URL`` environment variable, then local defaults.

        Returns:
            ``{'url': <dsn>}`` when a URL was found, otherwise a dict of
            keyword arguments for ``psycopg2.connect``.
        """
        url = self._read_credential('DATABASE_URL') or os.environ.get('DATABASE_URL')
        if url:
            return {'url': url}

        return {
            'host': 'localhost',
            'database': 'burmddit',
            'user': 'burmddit_user',
            'password': 'burmddit_password'
        }

    def load_coolify_config(self) -> dict:
        """Load Coolify API configuration.

        Returns:
            A dict with ``token``, ``url`` and ``app_uuid`` when a
            ``COOLIFY_TOKEN`` is present in the credentials file, otherwise
            an empty dict (deployment tools then report "not configured").
        """
        token = self._read_credential('COOLIFY_TOKEN')
        if not token:
            return {}
        return {
            'token': token,
            'url': 'https://coolify.qikbite.asia',
            'app_uuid': 'ocoock0oskc4cs00o0koo0c8'
        }

    # ------------------------------------------------------------------
    # Database helpers
    # ------------------------------------------------------------------

    def _connect(self):
        """Open a new database connection from ``self.db_config``."""
        config = self.db_config
        if 'url' in config:
            # A URL must be passed as the DSN; ``url`` is not a valid
            # libpq keyword argument.
            return psycopg2.connect(config['url'])
        return psycopg2.connect(**config)

    @contextmanager
    def _db_cursor(self, commit: bool = False):
        """Yield a cursor on a fresh connection and always clean up.

        Args:
            commit: Commit the transaction when the ``with`` body finishes
                without raising. If the body raises, nothing is committed;
                closing the connection discards the open transaction.
        """
        conn = self._connect()
        try:
            with conn.cursor() as cur:
                yield cur
            if commit:
                conn.commit()
        finally:
            conn.close()

    # ------------------------------------------------------------------
    # MCP handler registration
    # ------------------------------------------------------------------

    def _register_handlers(self):
        """Register all MCP handlers"""

        @self.server.list_tools()
        async def handle_list_tools() -> list[Tool]:
            """List available tools"""
            return [
                Tool(
                    name="get_site_stats",
                    description="Get Burmddit site statistics (articles, views, categories)",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "days": {
                                "type": "number",
                                "description": "Number of days to look back (default: 7)"
                            }
                        }
                    }
                ),
                Tool(
                    name="get_articles",
                    description="Query articles by category, tag, or date range",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "category": {"type": "string"},
                            "tag": {"type": "string"},
                            "status": {"type": "string", "enum": ["draft", "published", "archived"]},
                            "limit": {"type": "number", "default": 20}
                        }
                    }
                ),
                Tool(
                    name="get_article_by_slug",
                    description="Get full article details by slug",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "slug": {"type": "string", "description": "Article slug"}
                        },
                        "required": ["slug"]
                    }
                ),
                Tool(
                    name="update_article",
                    description="Update article fields (title, content, status, etc.)",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "slug": {"type": "string"},
                            "updates": {
                                "type": "object",
                                "description": "Fields to update (e.g. {'status': 'published'})"
                            }
                        },
                        "required": ["slug", "updates"]
                    }
                ),
                Tool(
                    name="delete_article",
                    description="Delete or archive an article",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "slug": {"type": "string"},
                            "hard_delete": {"type": "boolean", "default": False}
                        },
                        "required": ["slug"]
                    }
                ),
                Tool(
                    name="get_broken_articles",
                    description="Find articles with translation errors or quality issues",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "limit": {"type": "number", "default": 50}
                        }
                    }
                ),
                Tool(
                    name="check_deployment_status",
                    description="Check Coolify deployment status for Burmddit",
                    inputSchema={
                        "type": "object",
                        "properties": {}
                    }
                ),
                Tool(
                    name="trigger_deployment",
                    description="Trigger a new deployment via Coolify",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "force": {"type": "boolean", "default": False}
                        }
                    }
                ),
                Tool(
                    name="get_deployment_logs",
                    description="Fetch recent deployment logs",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "lines": {"type": "number", "default": 100}
                        }
                    }
                ),
                Tool(
                    name="run_pipeline",
                    description="Manually trigger the content pipeline (scrape, compile, translate, publish)",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "target_articles": {"type": "number", "default": 30}
                        }
                    }
                )
            ]

        # Keyword arguments get_articles accepts; anything else the client
        # sends is dropped rather than raising TypeError.
        article_filters = ("category", "tag", "status", "limit")

        # Tool name -> callable turning the raw argument dict into the
        # coroutine that implements the tool.
        dispatch = {
            "get_site_stats": lambda a: self.get_site_stats(a.get("days", 7)),
            "get_articles": lambda a: self.get_articles(
                **{k: v for k, v in a.items() if k in article_filters}
            ),
            "get_article_by_slug": lambda a: self.get_article_by_slug(a["slug"]),
            "update_article": lambda a: self.update_article(a["slug"], a["updates"]),
            "delete_article": lambda a: self.delete_article(
                a["slug"], a.get("hard_delete", False)
            ),
            "get_broken_articles": lambda a: self.get_broken_articles(a.get("limit", 50)),
            "check_deployment_status": lambda a: self.check_deployment_status(),
            "trigger_deployment": lambda a: self.trigger_deployment(a.get("force", False)),
            "get_deployment_logs": lambda a: self.get_deployment_logs(a.get("lines", 100)),
            "run_pipeline": lambda a: self.run_pipeline(a.get("target_articles", 30)),
        }

        @self.server.call_tool()
        async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]:
            """Execute tool by name"""
            handler = dispatch.get(name)
            if handler is None:
                return [TextContent(type="text", text=f"Unknown tool: {name}")]

            # Clients may omit the arguments object for tools without
            # required parameters.
            args = arguments or {}
            try:
                return await handler(args)
            except KeyError as e:
                # A required argument (e.g. "slug") was not supplied.
                return [TextContent(
                    type="text",
                    text=f"Error: missing required argument {e}"
                )]

    # ------------------------------------------------------------------
    # Tool implementations
    # ------------------------------------------------------------------

    async def get_site_stats(self, days: int) -> list[TextContent]:
        """Get site statistics.

        Args:
            days: Size of the "recent articles" window, in days.

        Returns:
            JSON text with total/recent published article counts, total
            and average views, and a per-category article count.
        """
        try:
            with self._db_cursor() as cur:
                # Total articles
                cur.execute("SELECT COUNT(*) FROM articles WHERE status = 'published'")
                total_articles = cur.fetchone()[0]

                # Recent articles. The parameter multiplies a fixed interval
                # instead of being substituted inside a quoted literal.
                cur.execute("""
                    SELECT COUNT(*) FROM articles
                    WHERE status = 'published'
                    AND published_at > NOW() - %s * INTERVAL '1 day'
                """, (days,))
                recent_articles = cur.fetchone()[0]

                # Total views. SUM() is NULL on an empty set and may come
                # back as Decimal, which json cannot serialise.
                cur.execute("SELECT SUM(view_count) FROM articles WHERE status = 'published'")
                total_views = int(cur.fetchone()[0] or 0)

                # Categories breakdown
                cur.execute("""
                    SELECT c.name_burmese, COUNT(a.id) as count
                    FROM categories c
                    LEFT JOIN articles a ON c.id = a.category_id AND a.status = 'published'
                    GROUP BY c.id, c.name_burmese
                    ORDER BY count DESC
                """)
                categories = cur.fetchall()

            stats = {
                "total_articles": total_articles,
                "recent_articles": recent_articles,
                "recent_days": days,
                "total_views": total_views,
                "avg_views_per_article": round(total_views / total_articles, 1) if total_articles > 0 else 0,
                "categories": [{"name": c[0], "count": c[1]} for c in categories]
            }

            return [TextContent(
                type="text",
                text=json.dumps(stats, indent=2, ensure_ascii=False)
            )]

        except Exception as e:
            return [TextContent(type="text", text=f"Error: {str(e)}")]

    async def get_articles(self, category: Optional[str] = None,
                           tag: Optional[str] = None,
                           status: Optional[str] = "published",
                           limit: int = 20) -> list[TextContent]:
        """Query articles.

        Args:
            category: Category slug to filter on, if given.
            tag: Tag slug to filter on, if given.
            status: Article status to filter on; a falsy value disables
                the status filter.
            limit: Maximum number of rows returned.

        Returns:
            JSON text listing slug, title, publish time, view count,
            status and category, newest first.
        """
        try:
            query = """
                SELECT
                    a.slug, a.title_burmese, a.published_at, a.view_count, a.status,
                    c.name_burmese as category
                FROM articles a
                LEFT JOIN categories c ON a.category_id = c.id
                WHERE 1=1
            """
            params = []

            if status:
                query += " AND a.status = %s"
                params.append(status)

            if category:
                query += " AND c.slug = %s"
                params.append(category)

            if tag:
                query += """ AND a.id IN (
                    SELECT article_id FROM article_tags at
                    JOIN tags t ON at.tag_id = t.id
                    WHERE t.slug = %s
                )"""
                params.append(tag)

            query += " ORDER BY a.published_at DESC LIMIT %s"
            # JSON "number" values may arrive as floats; LIMIT wants an integer.
            params.append(int(limit))

            with self._db_cursor() as cur:
                cur.execute(query, params)
                articles = cur.fetchall()

            result = [
                {
                    "slug": a[0],
                    "title": a[1],
                    "published_at": str(a[2]),
                    "view_count": a[3],
                    "status": a[4],
                    "category": a[5]
                }
                for a in articles
            ]

            return [TextContent(
                type="text",
                text=json.dumps(result, indent=2, ensure_ascii=False)
            )]

        except Exception as e:
            return [TextContent(type="text", text=f"Error: {str(e)}")]

    async def get_article_by_slug(self, slug: str) -> list[TextContent]:
        """Get full article details.

        Args:
            slug: Slug of the article to fetch.

        Returns:
            JSON text with every column of the article row plus its
            category name, or an "Article not found" message.
        """
        try:
            with self._db_cursor() as cur:
                cur.execute("""
                    SELECT a.*, c.name_burmese as category
                    FROM articles a
                    LEFT JOIN categories c ON a.category_id = c.id
                    WHERE a.slug = %s
                """, (slug,))

                article = cur.fetchone()

                if not article:
                    return [TextContent(type="text", text=f"Article not found: {slug}")]

                # Get column names
                columns = [desc[0] for desc in cur.description]

            article_dict = dict(zip(columns, article))

            # default=str renders datetimes exactly as str() did before and
            # also covers other non-JSON column types (date, Decimal, UUID).
            return [TextContent(
                type="text",
                text=json.dumps(article_dict, indent=2, ensure_ascii=False, default=str)
            )]

        except Exception as e:
            return [TextContent(type="text", text=f"Error: {str(e)}")]

    async def get_broken_articles(self, limit: int) -> list[TextContent]:
        """Find articles with quality issues.

        Flags published articles whose Burmese content is shorter than 500
        characters, contains the word "repetition", or repeats a run of at
        least 50 characters three or more times in a row.

        Args:
            limit: Maximum number of rows returned.

        Returns:
            JSON text listing slug, title and content length.
        """
        try:
            with self._db_cursor() as cur:
                # Literal percent signs are doubled because the query is
                # executed with parameters; a bare "%r" would be parsed as
                # a placeholder and raise.
                cur.execute("""
                    SELECT slug, title_burmese, LENGTH(content_burmese) as content_length
                    FROM articles
                    WHERE status = 'published'
                    AND (
                        LENGTH(content_burmese) < 500
                        OR content_burmese LIKE '%%repetition%%'
                        OR content_burmese ~ '(.{50,})(\\1){2,}'
                    )
                    ORDER BY published_at DESC
                    LIMIT %s
                """, (int(limit),))

                broken = cur.fetchall()

            result = [{
                "slug": b[0],
                "title": b[1],
                "content_length": b[2]
            } for b in broken]

            return [TextContent(
                type="text",
                text=json.dumps(result, indent=2, ensure_ascii=False)
            )]

        except Exception as e:
            return [TextContent(type="text", text=f"Error: {str(e)}")]

    async def update_article(self, slug: str, updates: dict) -> list[TextContent]:
        """Update article fields.

        Args:
            slug: Slug of the article to update.
            updates: Mapping of column name to new value. Column names are
                quoted as SQL identifiers, values are bound as parameters.

        Returns:
            A confirmation with the article title and id, an "Article not
            found" message, or an error text.
        """
        try:
            if not isinstance(updates, dict) or not updates:
                # An empty SET list would be invalid SQL.
                return [TextContent(type="text", text="Error: no fields to update")]

            # Column names come from the caller, so they are quoted as
            # identifiers rather than pasted into the statement.
            assignments = sql.SQL(', ').join([
                sql.SQL("{} = %s").format(sql.Identifier(column))
                for column in updates
            ])
            query = sql.SQL("""
                UPDATE articles
                SET {assignments}, updated_at = NOW()
                WHERE slug = %s
                RETURNING id, title_burmese
            """).format(assignments=assignments)

            with self._db_cursor(commit=True) as cur:
                cur.execute(query, [*updates.values(), slug])
                result = cur.fetchone()

            if not result:
                return [TextContent(type="text", text=f"Article not found: {slug}")]

            return [TextContent(
                type="text",
                text=f"✅ Updated article: {result[1]} (ID: {result[0]})"
            )]

        except Exception as e:
            return [TextContent(type="text", text=f"Error: {str(e)}")]

    async def delete_article(self, slug: str, hard_delete: bool) -> list[TextContent]:
        """Delete or archive article.

        Args:
            slug: Slug of the article.
            hard_delete: When true the row is deleted; otherwise its status
                is set to ``archived``.

        Returns:
            A confirmation, an "Article not found" message, or an error
            text.
        """
        try:
            with self._db_cursor(commit=True) as cur:
                if hard_delete:
                    cur.execute("DELETE FROM articles WHERE slug = %s RETURNING id", (slug,))
                    action = "deleted"
                else:
                    cur.execute("""
                        UPDATE articles SET status = 'archived'
                        WHERE slug = %s
                        RETURNING id
                    """, (slug,))
                    action = "archived"

                result = cur.fetchone()

            if not result:
                return [TextContent(type="text", text=f"Article not found: {slug}")]

            return [TextContent(type="text", text=f"✅ Article {action}: {slug}")]

        except Exception as e:
            return [TextContent(type="text", text=f"Error: {str(e)}")]

    async def check_deployment_status(self) -> list[TextContent]:
        """Check Coolify deployment status.

        Returns:
            JSON text with name, status, git branch, last deployment time
            and URL as reported by the Coolify application endpoint, or an
            error text (including on a non-2xx HTTP response).
        """
        try:
            if not self.coolify_config.get('token'):
                return [TextContent(type="text", text="Coolify API token not configured")]

            headers = {'Authorization': f"Bearer {self.coolify_config['token']}"}
            url = f"{self.coolify_config['url']}/api/v1/applications/{self.coolify_config['app_uuid']}"

            # requests is blocking; run it off the event loop.
            response = await asyncio.to_thread(
                requests.get, url, headers=headers, timeout=HTTP_TIMEOUT
            )
            # Surface HTTP failures instead of reporting a status full of nulls.
            response.raise_for_status()
            data = response.json()

            status = {
                "name": data.get('name'),
                "status": data.get('status'),
                "git_branch": data.get('git_branch'),
                "last_deployment": data.get('last_deployment_at'),
                "url": data.get('fqdn')
            }

            return [TextContent(
                type="text",
                text=json.dumps(status, indent=2, ensure_ascii=False)
            )]

        except Exception as e:
            return [TextContent(type="text", text=f"Error: {str(e)}")]

    async def trigger_deployment(self, force: bool) -> list[TextContent]:
        """Trigger deployment.

        Args:
            force: Sent as the ``force`` field of the request body.

        Returns:
            A confirmation with the HTTP status code when the API accepts
            the request, otherwise a failure message with the status code
            and the start of the response body.
        """
        try:
            if not self.coolify_config.get('token'):
                return [TextContent(type="text", text="Coolify API token not configured")]

            headers = {'Authorization': f"Bearer {self.coolify_config['token']}"}
            url = f"{self.coolify_config['url']}/api/v1/applications/{self.coolify_config['app_uuid']}/deploy"

            data = {"force": force}
            # requests is blocking; run it off the event loop.
            response = await asyncio.to_thread(
                requests.post, url, headers=headers, json=data, timeout=HTTP_TIMEOUT
            )

            if not response.ok:
                # Do not claim success for 4xx/5xx responses.
                return [TextContent(
                    type="text",
                    text=f"❌ Deployment request failed: {response.status_code} {response.text[:500]}"
                )]

            return [TextContent(type="text", text=f"✅ Deployment triggered: {response.status_code}")]

        except Exception as e:
            return [TextContent(type="text", text=f"Error: {str(e)}")]

    async def get_deployment_logs(self, lines: int) -> list[TextContent]:
        """Get deployment logs.

        Not implemented yet: ``lines`` is ignored and a placeholder message
        is returned.
        """
        return [TextContent(type="text", text="Deployment logs feature coming soon")]

    async def run_pipeline(self, target_articles: int) -> list[TextContent]:
        """Run content pipeline.

        Runs the pipeline script with ``python3`` and returns its exit
        code, stdout and stderr. The run is aborted after
        ``PIPELINE_TIMEOUT`` seconds.

        Args:
            target_articles: Accepted for interface compatibility but not
                forwarded to the script.
                NOTE(review): how the script takes its article target is
                not visible from this file — confirm before wiring it in.
        """
        try:
            # The script can run for minutes; keep the event loop free.
            result = await asyncio.to_thread(
                subprocess.run,
                ['python3', PIPELINE_SCRIPT],
                capture_output=True,
                text=True,
                timeout=PIPELINE_TIMEOUT
            )

            return [TextContent(
                type="text",
                text=(
                    f"Pipeline execution:\n\nExit code: {result.returncode}"
                    f"\n\nSTDOUT:\n{result.stdout}\n\nSTDERR:\n{result.stderr}"
                )
            )]

        except Exception as e:
            return [TextContent(type="text", text=f"Error: {str(e)}")]

    async def run(self):
        """Run the MCP server over stdio until the streams close."""
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name="burmddit-mcp",
                    server_version="1.0.0",
                    capabilities=self.server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={}
                    )
                )
            )


def main():
    """Entry point"""
    server = BurmdditMCPServer()
    asyncio.run(server.run())


if __name__ == "__main__":
    main()