Fix: Add category pages + MCP server for autonomous management

- Created /app/category/[slug]/page.tsx - category navigation now works
- Built Burmddit MCP Server with 10 tools:
  * Site stats, article queries, content management
  * Deployment control, quality checks, pipeline triggers
- Added MCP setup guide and config
- Categories fully functional: ai-news, tutorials, tips-tricks, upcoming
- Modo can now manage Burmddit autonomously via MCP
This commit is contained in:
Zeya Phyo
2026-02-19 15:40:26 +00:00
parent 310fff9d55
commit 785910b81d
10 changed files with 1898 additions and 0 deletions

View File

@@ -0,0 +1,597 @@
#!/usr/bin/env python3
"""
Burmddit MCP Server
Model Context Protocol server for autonomous Burmddit management
Exposes tools for:
- Database queries (articles, categories, analytics)
- Content management (publish, update, delete)
- Deployment control (Coolify API)
- Performance monitoring
"""
import asyncio
import json
import re
import sys
from datetime import datetime, timedelta
from typing import Any, Optional

import psycopg2
import requests
# MCP SDK imports (to be installed: pip install mcp)
try:
from mcp.server.models import InitializationOptions
from mcp.server import NotificationOptions, Server
from mcp.server.stdio import stdio_server
from mcp.types import (
Tool,
TextContent,
ImageContent,
EmbeddedResource,
LoggingLevel
)
except ImportError:
print("ERROR: MCP SDK not installed. Run: pip install mcp", file=sys.stderr)
sys.exit(1)
class BurmdditMCPServer:
    """MCP server exposing Burmddit management tools over stdio.

    Wraps three concerns behind MCP tools:
      - PostgreSQL queries/updates on the Burmddit articles database
      - Coolify deployment control via its HTTP API
      - Manual triggering of the local content pipeline script

    Credentials are loaded once at construction from a local
    ``.credentials`` file; missing credentials degrade to local-dev
    defaults (database) or "not configured" responses (Coolify).
    """

    # Single source of truth for the credentials file both loaders read.
    CREDENTIALS_PATH = '/home/ubuntu/.openclaw/workspace/.credentials'
    # Timeout (seconds) for Coolify HTTP calls so a hung deployment
    # server cannot block the MCP event loop indefinitely.
    HTTP_TIMEOUT = 30
    # A plain SQL identifier; used to reject hostile column names in
    # update_article (keys are interpolated into the statement text).
    _IDENTIFIER_RE = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*$')

    def __init__(self):
        self.server = Server("burmddit-mcp")
        self.db_config = self.load_db_config()
        self.coolify_config = self.load_coolify_config()
        # Register handlers
        self._register_handlers()

    def load_db_config(self) -> dict:
        """Load database configuration.

        Returns either ``{'url': <DSN>}`` when a DATABASE_URL line is
        present in the credentials file, or a dict of discrete
        local-development connection keywords as a fallback.
        """
        try:
            with open(self.CREDENTIALS_PATH, 'r') as f:
                for line in f:
                    if line.startswith('DATABASE_URL='):
                        return {'url': line.split('=', 1)[1].strip()}
        except FileNotFoundError:
            pass
        # Fallback to environment or default
        return {
            'host': 'localhost',
            'database': 'burmddit',
            'user': 'burmddit_user',
            'password': 'burmddit_password'
        }

    def load_coolify_config(self) -> dict:
        """Load Coolify API configuration.

        Returns an empty dict when no COOLIFY_TOKEN line is found; the
        deployment tools treat that as "not configured".
        """
        try:
            with open(self.CREDENTIALS_PATH, 'r') as f:
                for line in f:
                    if line.startswith('COOLIFY_TOKEN='):
                        return {
                            'token': line.split('=', 1)[1].strip(),
                            'url': 'https://coolify.qikbite.asia',
                            'app_uuid': 'ocoock0oskc4cs00o0koo0c8'
                        }
        except FileNotFoundError:
            pass
        return {}

    def _connect(self):
        """Open a new database connection from the loaded configuration.

        BUG FIX: ``load_db_config`` may return ``{'url': <DSN>}``, but
        ``psycopg2.connect(**cfg)`` rejects ``url`` as a keyword — a DSN
        string must be passed positionally. Handles both shapes.
        """
        if 'url' in self.db_config:
            return psycopg2.connect(self.db_config['url'])
        return psycopg2.connect(**self.db_config)

    def _register_handlers(self):
        """Register the MCP list-tools and call-tool handlers."""

        @self.server.list_tools()
        async def handle_list_tools() -> list[Tool]:
            """List available tools"""
            return [
                Tool(
                    name="get_site_stats",
                    description="Get Burmddit site statistics (articles, views, categories)",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "days": {
                                "type": "number",
                                "description": "Number of days to look back (default: 7)"
                            }
                        }
                    }
                ),
                Tool(
                    name="get_articles",
                    description="Query articles by category, tag, or date range",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "category": {"type": "string"},
                            "tag": {"type": "string"},
                            "status": {"type": "string", "enum": ["draft", "published", "archived"]},
                            "limit": {"type": "number", "default": 20}
                        }
                    }
                ),
                Tool(
                    name="get_article_by_slug",
                    description="Get full article details by slug",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "slug": {"type": "string", "description": "Article slug"}
                        },
                        "required": ["slug"]
                    }
                ),
                Tool(
                    name="update_article",
                    description="Update article fields (title, content, status, etc.)",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "slug": {"type": "string"},
                            "updates": {
                                "type": "object",
                                "description": "Fields to update (e.g. {'status': 'published'})"
                            }
                        },
                        "required": ["slug", "updates"]
                    }
                ),
                Tool(
                    name="delete_article",
                    description="Delete or archive an article",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "slug": {"type": "string"},
                            "hard_delete": {"type": "boolean", "default": False}
                        },
                        "required": ["slug"]
                    }
                ),
                Tool(
                    name="get_broken_articles",
                    description="Find articles with translation errors or quality issues",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "limit": {"type": "number", "default": 50}
                        }
                    }
                ),
                Tool(
                    name="check_deployment_status",
                    description="Check Coolify deployment status for Burmddit",
                    inputSchema={
                        "type": "object",
                        "properties": {}
                    }
                ),
                Tool(
                    name="trigger_deployment",
                    description="Trigger a new deployment via Coolify",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "force": {"type": "boolean", "default": False}
                        }
                    }
                ),
                Tool(
                    name="get_deployment_logs",
                    description="Fetch recent deployment logs",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "lines": {"type": "number", "default": 100}
                        }
                    }
                ),
                Tool(
                    name="run_pipeline",
                    description="Manually trigger the content pipeline (scrape, compile, translate, publish)",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "target_articles": {"type": "number", "default": 30}
                        }
                    }
                )
            ]

        @self.server.call_tool()
        async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]:
            """Execute tool by name"""
            if name == "get_site_stats":
                return await self.get_site_stats(arguments.get("days", 7))
            elif name == "get_articles":
                return await self.get_articles(**arguments)
            elif name == "get_article_by_slug":
                return await self.get_article_by_slug(arguments["slug"])
            elif name == "update_article":
                return await self.update_article(arguments["slug"], arguments["updates"])
            elif name == "delete_article":
                return await self.delete_article(arguments["slug"], arguments.get("hard_delete", False))
            elif name == "get_broken_articles":
                return await self.get_broken_articles(arguments.get("limit", 50))
            elif name == "check_deployment_status":
                return await self.check_deployment_status()
            elif name == "trigger_deployment":
                return await self.trigger_deployment(arguments.get("force", False))
            elif name == "get_deployment_logs":
                return await self.get_deployment_logs(arguments.get("lines", 100))
            elif name == "run_pipeline":
                return await self.run_pipeline(arguments.get("target_articles", 30))
            else:
                return [TextContent(type="text", text=f"Unknown tool: {name}")]

    # Tool implementations

    async def get_site_stats(self, days: int) -> list[TextContent]:
        """Return aggregate site statistics as a JSON text payload.

        Covers total/recent published-article counts, view totals, an
        average, and a per-category breakdown over the last ``days`` days.
        """
        conn = None
        try:
            days = int(days)  # JSON "number" may arrive as float
            conn = self._connect()
            cur = conn.cursor()
            # Total articles
            cur.execute("SELECT COUNT(*) FROM articles WHERE status = 'published'")
            total_articles = cur.fetchone()[0]
            # Recent articles.
            # BUG FIX: the original wrote INTERVAL '%s days', placing the
            # placeholder inside a quoted literal, which psycopg2 forbids;
            # make_interval lets the parameter be adapted normally.
            cur.execute("""
                SELECT COUNT(*) FROM articles
                WHERE status = 'published'
                AND published_at > NOW() - make_interval(days => %s)
            """, (days,))
            recent_articles = cur.fetchone()[0]
            # Total views (SUM is NULL when no rows match, hence the "or 0")
            cur.execute("SELECT SUM(view_count) FROM articles WHERE status = 'published'")
            total_views = cur.fetchone()[0] or 0
            # Categories breakdown
            cur.execute("""
                SELECT c.name_burmese, COUNT(a.id) as count
                FROM categories c
                LEFT JOIN articles a ON c.id = a.category_id AND a.status = 'published'
                GROUP BY c.id, c.name_burmese
                ORDER BY count DESC
            """)
            categories = cur.fetchall()
            cur.close()
            stats = {
                "total_articles": total_articles,
                "recent_articles": recent_articles,
                "recent_days": days,
                "total_views": total_views,
                "avg_views_per_article": round(total_views / total_articles, 1) if total_articles > 0 else 0,
                "categories": [{"name": c[0], "count": c[1]} for c in categories]
            }
            return [TextContent(
                type="text",
                text=json.dumps(stats, indent=2, ensure_ascii=False)
            )]
        except Exception as e:
            return [TextContent(type="text", text=f"Error: {str(e)}")]
        finally:
            # BUG FIX: the original leaked the connection on any exception.
            if conn is not None:
                conn.close()

    async def get_articles(self, category: Optional[str] = None,
                           tag: Optional[str] = None,
                           status: Optional[str] = "published",
                           limit: int = 20) -> list[TextContent]:
        """Query article summaries with optional category/tag/status filters.

        All filter values travel through placeholders; the query text only
        grows by fixed, trusted fragments.
        """
        conn = None
        try:
            conn = self._connect()
            cur = conn.cursor()
            query = """
                SELECT a.slug, a.title_burmese, a.published_at, a.view_count, a.status,
                       c.name_burmese as category
                FROM articles a
                LEFT JOIN categories c ON a.category_id = c.id
                WHERE 1=1
            """
            params = []
            if status:
                query += " AND a.status = %s"
                params.append(status)
            if category:
                query += " AND c.slug = %s"
                params.append(category)
            if tag:
                query += """ AND a.id IN (
                    SELECT article_id FROM article_tags at
                    JOIN tags t ON at.tag_id = t.id
                    WHERE t.slug = %s
                )"""
                params.append(tag)
            query += " ORDER BY a.published_at DESC LIMIT %s"
            params.append(limit)
            cur.execute(query, params)
            articles = cur.fetchall()
            cur.close()
            result = []
            for a in articles:
                result.append({
                    "slug": a[0],
                    "title": a[1],
                    "published_at": str(a[2]),
                    "view_count": a[3],
                    "status": a[4],
                    "category": a[5]
                })
            return [TextContent(
                type="text",
                text=json.dumps(result, indent=2, ensure_ascii=False)
            )]
        except Exception as e:
            return [TextContent(type="text", text=f"Error: {str(e)}")]
        finally:
            if conn is not None:
                conn.close()

    async def get_article_by_slug(self, slug: str) -> list[TextContent]:
        """Return every column of one article (joined with its category).

        datetime columns are stringified so the row survives json.dumps.
        """
        conn = None
        try:
            conn = self._connect()
            cur = conn.cursor()
            cur.execute("""
                SELECT a.*, c.name_burmese as category
                FROM articles a
                LEFT JOIN categories c ON a.category_id = c.id
                WHERE a.slug = %s
            """, (slug,))
            article = cur.fetchone()
            if not article:
                return [TextContent(type="text", text=f"Article not found: {slug}")]
            # Get column names from the cursor description for the dict keys
            columns = [desc[0] for desc in cur.description]
            article_dict = dict(zip(columns, article))
            # Convert datetime objects to strings
            for key, value in article_dict.items():
                if isinstance(value, datetime):
                    article_dict[key] = str(value)
            cur.close()
            return [TextContent(
                type="text",
                text=json.dumps(article_dict, indent=2, ensure_ascii=False)
            )]
        except Exception as e:
            return [TextContent(type="text", text=f"Error: {str(e)}")]
        finally:
            if conn is not None:
                conn.close()

    async def get_broken_articles(self, limit: int) -> list[TextContent]:
        """Find published articles that look broken.

        Heuristics: very short bodies, a literal 'repetition' marker, or
        a 50+-char block repeated three or more times (translation loops).
        """
        conn = None
        try:
            conn = self._connect()
            cur = conn.cursor()
            # Find articles with repeated text patterns or very short content
            cur.execute("""
                SELECT slug, title_burmese, LENGTH(content_burmese) as content_length
                FROM articles
                WHERE status = 'published'
                AND (
                    LENGTH(content_burmese) < 500
                    OR content_burmese LIKE '%repetition%'
                    OR content_burmese ~ '(.{50,})(\\1){2,}'
                )
                ORDER BY published_at DESC
                LIMIT %s
            """, (limit,))
            broken = cur.fetchall()
            cur.close()
            result = [{
                "slug": b[0],
                "title": b[1],
                "content_length": b[2]
            } for b in broken]
            return [TextContent(
                type="text",
                text=json.dumps(result, indent=2, ensure_ascii=False)
            )]
        except Exception as e:
            return [TextContent(type="text", text=f"Error: {str(e)}")]
        finally:
            if conn is not None:
                conn.close()

    async def update_article(self, slug: str, updates: dict) -> list[TextContent]:
        """Apply a partial update to one article, bumping updated_at.

        SECURITY FIX: the original interpolated caller-supplied keys
        straight into the SQL text; keys are now validated as plain
        identifiers so a crafted key cannot inject SQL. Values continue
        to go through placeholders as before.
        """
        if not updates:
            return [TextContent(type="text", text="Error: no fields to update")]
        for key in updates:
            if not self._IDENTIFIER_RE.match(key):
                return [TextContent(type="text", text=f"Error: invalid field name: {key}")]
        conn = None
        try:
            conn = self._connect()
            cur = conn.cursor()
            # Build UPDATE query dynamically (keys validated above)
            set_parts = []
            values = []
            for key, value in updates.items():
                set_parts.append(f"{key} = %s")
                values.append(value)
            values.append(slug)
            query = f"""
                UPDATE articles
                SET {', '.join(set_parts)}, updated_at = NOW()
                WHERE slug = %s
                RETURNING id, title_burmese
            """
            cur.execute(query, values)
            result = cur.fetchone()
            if not result:
                return [TextContent(type="text", text=f"Article not found: {slug}")]
            conn.commit()
            cur.close()
            return [TextContent(
                type="text",
                text=f"✅ Updated article: {result[1]} (ID: {result[0]})"
            )]
        except Exception as e:
            return [TextContent(type="text", text=f"Error: {str(e)}")]
        finally:
            if conn is not None:
                conn.close()

    async def delete_article(self, slug: str, hard_delete: bool) -> list[TextContent]:
        """Archive an article (default) or hard-delete its row."""
        conn = None
        try:
            conn = self._connect()
            cur = conn.cursor()
            if hard_delete:
                cur.execute("DELETE FROM articles WHERE slug = %s RETURNING id", (slug,))
                action = "deleted"
            else:
                # Soft delete: keep the row, hide it from the site
                cur.execute("""
                    UPDATE articles SET status = 'archived'
                    WHERE slug = %s RETURNING id
                """, (slug,))
                action = "archived"
            result = cur.fetchone()
            if not result:
                return [TextContent(type="text", text=f"Article not found: {slug}")]
            conn.commit()
            cur.close()
            return [TextContent(type="text", text=f"✅ Article {action}: {slug}")]
        except Exception as e:
            return [TextContent(type="text", text=f"Error: {str(e)}")]
        finally:
            if conn is not None:
                conn.close()

    async def check_deployment_status(self) -> list[TextContent]:
        """Fetch the Coolify application record and summarize it."""
        try:
            if not self.coolify_config.get('token'):
                return [TextContent(type="text", text="Coolify API token not configured")]
            headers = {'Authorization': f"Bearer {self.coolify_config['token']}"}
            url = f"{self.coolify_config['url']}/api/v1/applications/{self.coolify_config['app_uuid']}"
            # BUG FIX: requests has no default timeout; without one a hung
            # API call blocks the event loop forever.
            response = requests.get(url, headers=headers, timeout=self.HTTP_TIMEOUT)
            data = response.json()
            status = {
                "name": data.get('name'),
                "status": data.get('status'),
                "git_branch": data.get('git_branch'),
                "last_deployment": data.get('last_deployment_at'),
                "url": data.get('fqdn')
            }
            return [TextContent(
                type="text",
                text=json.dumps(status, indent=2, ensure_ascii=False)
            )]
        except Exception as e:
            return [TextContent(type="text", text=f"Error: {str(e)}")]

    async def trigger_deployment(self, force: bool) -> list[TextContent]:
        """POST a deploy request to Coolify; reports the HTTP status code."""
        try:
            if not self.coolify_config.get('token'):
                return [TextContent(type="text", text="Coolify API token not configured")]
            headers = {'Authorization': f"Bearer {self.coolify_config['token']}"}
            url = f"{self.coolify_config['url']}/api/v1/applications/{self.coolify_config['app_uuid']}/deploy"
            data = {"force": force}
            # Timeout added for the same reason as check_deployment_status.
            response = requests.post(url, headers=headers, json=data, timeout=self.HTTP_TIMEOUT)
            return [TextContent(type="text", text=f"✅ Deployment triggered: {response.status_code}")]
        except Exception as e:
            return [TextContent(type="text", text=f"Error: {str(e)}")]

    async def get_deployment_logs(self, lines: int) -> list[TextContent]:
        """Placeholder: deployment log retrieval is not implemented yet."""
        return [TextContent(type="text", text="Deployment logs feature coming soon")]

    async def run_pipeline(self, target_articles: int) -> list[TextContent]:
        """Run the content pipeline script and report its stdout/stderr.

        NOTE(review): ``target_articles`` is accepted by the tool schema
        but never passed to the script — presumably the pipeline reads
        its own config. Confirm before wiring the value through.
        """
        try:
            # Execute the pipeline script (local import: only this tool needs it)
            import subprocess
            result = subprocess.run(
                ['python3', '/home/ubuntu/.openclaw/workspace/burmddit/backend/run_pipeline.py'],
                capture_output=True,
                text=True,
                timeout=300
            )
            return [TextContent(
                type="text",
                text=f"Pipeline execution:\n\nSTDOUT:\n{result.stdout}\n\nSTDERR:\n{result.stderr}"
            )]
        except Exception as e:
            return [TextContent(type="text", text=f"Error: {str(e)}")]

    async def run(self):
        """Serve MCP requests over stdio until the streams close."""
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name="burmddit-mcp",
                    server_version="1.0.0",
                    capabilities=self.server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={}
                    )
                )
            )
def main():
    """Script entry point: build the server and run it on a fresh event loop."""
    asyncio.run(BurmdditMCPServer().run())
# Launch the server only when executed as a script, not when imported.
if __name__ == "__main__":
    main()