Initial Burmddit deployment - AI news aggregator in Burmese

This commit is contained in:
Zeya Phyo
2026-02-19 02:52:58 +00:00
commit dddb86ea94
27 changed files with 5039 additions and 0 deletions

319
backend/compiler.py Normal file
View File

@@ -0,0 +1,319 @@
# Article compilation module - Groups and merges related articles
import time

from typing import List, Dict, Optional, Tuple

import anthropic
from loguru import logger
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

import config
import database
class ArticleCompiler:
    """Groups related raw articles and compiles each group into one article.

    Clustering uses TF-IDF vectors + cosine similarity with a greedy grouping
    pass; the actual merge/rewrite is delegated to Claude. Thresholds and
    limits all come from ``config.PIPELINE``.
    """

    def __init__(self):
        # One Anthropic client, reused across every compile call.
        self.client = anthropic.Anthropic(api_key=config.ANTHROPIC_API_KEY)

    def compile_articles(self, num_articles: int = None) -> List[Dict]:
        """Main compilation pipeline.

        Fetches unprocessed raw articles, clusters them into at most
        *num_articles* groups (default: config.PIPELINE['articles_per_day']),
        and compiles each cluster. Returns the compiled article dicts
        (possibly empty).
        """
        if num_articles is None:
            num_articles = config.PIPELINE['articles_per_day']
        # Get unprocessed articles from database
        raw_articles = database.get_unprocessed_articles(limit=100)
        if not raw_articles:
            logger.warning("No unprocessed articles found")
            return []
        logger.info(f"Processing {len(raw_articles)} raw articles")
        # Cluster similar articles
        clusters = self.cluster_articles(raw_articles, num_clusters=num_articles)
        # Compile each cluster into one comprehensive article
        compiled_articles = []
        for i, cluster in enumerate(clusters):
            try:
                logger.info(f"Compiling cluster {i+1}/{len(clusters)} with {len(cluster)} articles")
                compiled = self.compile_cluster(cluster)
                if compiled:
                    compiled_articles.append(compiled)
                time.sleep(1)  # Rate limiting between API calls
            except Exception as e:
                # One bad cluster must not abort the whole run.
                logger.error(f"Error compiling cluster {i+1}: {e}")
                continue
        logger.info(f"Compiled {len(compiled_articles)} articles")
        return compiled_articles

    def cluster_articles(self, articles: List[Dict], num_clusters: int) -> List[List[Dict]]:
        """Cluster articles by textual similarity (greedy, not k-means).

        Each cluster is capped at config.PIPELINE['sources_per_article']
        members; leftover articles are appended as singleton clusters until
        *num_clusters* is reached.
        """
        if len(articles) <= num_clusters:
            # Fewer articles than requested clusters: every article stands alone.
            return [[article] for article in articles]
        # Title plus a content prefix is enough signal for TF-IDF.
        texts = [
            f"{article['title']} {article['content'][:500]}"
            for article in articles
        ]
        # TF-IDF vectorization
        vectorizer = TfidfVectorizer(max_features=100, stop_words='english')
        tfidf_matrix = vectorizer.fit_transform(texts)
        # Pairwise cosine similarity between all articles.
        similarity_matrix = cosine_similarity(tfidf_matrix)
        # Greedy clustering: seed with the first unused article, pull in
        # everything above the similarity threshold.
        clusters = []
        used_indices = set()
        for i in range(len(articles)):
            if i in used_indices:
                continue
            # Find similar articles (above threshold)
            similar_indices = []
            for j in range(len(articles)):
                if j != i and j not in used_indices:
                    if similarity_matrix[i][j] >= config.PIPELINE['clustering_threshold']:
                        similar_indices.append(j)
            # Create cluster, capped to the configured source count.
            cluster = [articles[i]]
            for idx in similar_indices[:config.PIPELINE['sources_per_article'] - 1]:
                cluster.append(articles[idx])
                used_indices.add(idx)
            clusters.append(cluster)
            used_indices.add(i)
            if len(clusters) >= num_clusters:
                break
        # If we don't have enough clusters, add remaining articles individually.
        while len(clusters) < num_clusters and len(used_indices) < len(articles):
            for i, article in enumerate(articles):
                if i not in used_indices:
                    clusters.append([article])
                    used_indices.add(i)
                    break
        logger.info(f"Created {len(clusters)} clusters from {len(articles)} articles")
        return clusters

    def compile_cluster(self, cluster: List[Dict]) -> Optional[Dict]:
        """Compile multiple articles into one comprehensive piece.

        Returns the compiled article dict, or None for an empty cluster or
        on API failure. Single-article clusters skip the API entirely.
        """
        if not cluster:
            return None
        # If only one article, use it directly (with some enhancement).
        if len(cluster) == 1:
            return self.enhance_single_article(cluster[0])
        # Prepare source summaries for the prompt.
        sources_text = ""
        for i, article in enumerate(cluster, 1):
            sources_text += f"\n\n## Source {i}: {article['title']}\n"
            sources_text += f"URL: {article['url']}\n"
            sources_text += f"Content: {article['content'][:1000]}...\n"  # First 1000 chars
        # Use Claude to compile articles
        prompt = f"""You are a friendly tech blogger writing for everyday people who are curious about AI but not tech experts. Compile these {len(cluster)} related AI articles into ONE easy-to-read, engaging article.
{sources_text}
🎯 CRITICAL REQUIREMENTS:
WRITING STYLE:
1. Write in SIMPLE, CASUAL language - like explaining to a friend
2. Use SHORT SENTENCES - easy to scan on mobile
3. AVOID JARGON - or explain it simply in parentheses
4. Use REAL-WORLD EXAMPLES and ANALOGIES
5. Make it FUN and ENGAGING - not boring or academic
6. Use active voice, not passive
7. Address readers directly ("you", "we")
CONTENT STRUCTURE:
1. Catchy, clear title (no clickbait, but interesting)
2. Hook opening: "Why should I care about this?"
3. Clear sections with descriptive subheadings
4. Key facts highlighted with bullet points
5. "What this means for you" sections
6. Brief, satisfying conclusion
EXAMPLES TO FOLLOW:
❌ Bad: "The implementation of advanced neural architectures facilitates..."
✅ Good: "New AI systems use smarter brain-like networks to..."
❌ Bad: "Anthropomorphic large language models demonstrate emergent capabilities..."
✅ Good: "ChatGPT-like AI is learning new tricks on its own..."
TARGET: Myanmar general public (will be translated to Burmese)
LENGTH: {config.PIPELINE['min_article_length']}-{config.PIPELINE['max_article_length']} words (shorter is better!)
Format the output as:
TITLE: [Engaging, clear title]
EXCERPT: [2-sentence casual summary that makes people want to read]
CONTENT:
[Your easy-to-read article with markdown formatting]
SOURCES: [List of original URLs]
"""
        try:
            message = self.client.messages.create(
                model=config.TRANSLATION['model'],
                max_tokens=config.TRANSLATION['max_tokens'],
                temperature=0.5,  # Slightly higher for creative writing
                messages=[{"role": "user", "content": prompt}]
            )
            response = message.content[0].text
            # Parse Claude's free-text response into the article dict.
            compiled = self.parse_compiled_article(response, cluster)
            return compiled
        except Exception as e:
            logger.error(f"Error compiling with Claude: {e}")
            return None

    def enhance_single_article(self, article: Dict) -> Dict:
        """Wrap a lone raw article in the compiled-article shape (no API call)."""
        return {
            'title': article['title'],
            'content': article['content'],
            'excerpt': article['content'][:200] + '...',
            'source_articles': [
                {
                    'url': article['url'],
                    'title': article['title'],
                    'author': article['author']
                }
            ],
            'category_hint': article.get('category_hint'),
            'featured_image': article.get('top_image'),
            # Consistency fix: parse_compiled_article always returns these
            # keys, so single-article output carries them too (downstream
            # publisher reads them with .get(), so this stays compatible).
            'images': [article['top_image']] if article.get('top_image') else [],
            'videos': article.get('videos') or []
        }

    def parse_compiled_article(self, response: str, cluster: List[Dict]) -> Dict:
        """Parse Claude's TITLE/EXCERPT/CONTENT/SOURCES response into a dict.

        Falls back to cluster data for any section that failed to parse, and
        aggregates images (max 5) and videos (max 3) from the source articles.
        """
        lines = response.strip().split('\n')
        title = ""
        excerpt = ""
        content = ""
        current_section = None
        for line in lines:
            if line.startswith('TITLE:'):
                title = line.replace('TITLE:', '').strip()
                current_section = 'title'
            elif line.startswith('EXCERPT:'):
                excerpt = line.replace('EXCERPT:', '').strip()
                current_section = 'excerpt'
            elif line.startswith('CONTENT:'):
                current_section = 'content'
            elif line.startswith('SOURCES:'):
                current_section = 'sources'
            elif current_section == 'content':
                content += line + '\n'
        # Fallbacks if parsing fails: reuse the first source / raw response.
        if not title:
            title = cluster[0]['title']
        if not excerpt:
            excerpt = content[:200] + '...' if content else cluster[0]['content'][:200] + '...'
        if not content:
            content = response
        # Build source articles list
        source_articles = [
            {
                'url': article['url'],
                'title': article['title'],
                'author': article['author']
            }
            for article in cluster
        ]
        # Collect all images from cluster ('images' list preferred, else top_image).
        all_images = []
        for article in cluster:
            if article.get('images'):
                all_images.extend(article['images'])
            elif article.get('top_image'):
                all_images.append(article['top_image'])
        # Remove duplicates while preserving order, keep first 5.
        unique_images = []
        for img in all_images:
            if img and img not in unique_images:
                unique_images.append(img)
                if len(unique_images) >= 5:
                    break
        # Collect all videos from cluster
        all_videos = []
        for article in cluster:
            if article.get('videos'):
                all_videos.extend(article['videos'])
        # Deduplicate (order not preserved by set), cap at 3 videos.
        unique_videos = list(set([v for v in all_videos if v]))[:3]
        # Detect category: explicit hint wins, else keyword detection.
        category_hint = cluster[0].get('category_hint') or database.detect_category(title, content)
        return {
            'title': title.strip(),
            'content': content.strip(),
            'excerpt': excerpt.strip(),
            'source_articles': source_articles,
            'category_hint': category_hint,
            'featured_image': unique_images[0] if unique_images else None,
            'images': unique_images,
            'videos': unique_videos
        }
def run_compiler():
    """Run the compile stage end-to-end and record its outcome.

    Logs a 'completed' or 'failed' pipeline stage row and returns the list
    of compiled article dicts ([] when the stage fails).
    """
    logger.info("Starting compiler...")
    t0 = time.time()
    try:
        articles = ArticleCompiler().compile_articles()
        elapsed = int(time.time() - t0)
        database.log_pipeline_stage(
            stage='compile',
            status='completed',
            articles_processed=len(articles),
            duration=elapsed
        )
        logger.info(f"Compiler completed in {elapsed}s. Articles compiled: {len(articles)}")
        return articles
    except Exception as e:
        logger.error(f"Compiler failed: {e}")
        database.log_pipeline_stage(
            stage='compile',
            status='failed',
            error_message=str(e)
        )
        return []
if __name__ == '__main__':
    # `logger` is already imported at module scope; the redundant
    # `from loguru import logger` re-import was removed. Just attach a
    # rotating file sink for standalone runs.
    logger.add(config.LOG_FILE, rotation="1 day")
    compiled = run_compiler()
    print(f"Compiled {len(compiled)} articles")

142
backend/config.py Normal file
View File

@@ -0,0 +1,142 @@
# Burmddit Configuration
import os
from dotenv import load_dotenv
load_dotenv()
# Database
DATABASE_URL = os.getenv('DATABASE_URL', 'postgresql://localhost/burmddit')
# APIs
ANTHROPIC_API_KEY = os.getenv('ANTHROPIC_API_KEY')
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') # Optional, for embeddings
# Scraping sources - 🔥 EXPANDED for more content!
SOURCES = {
'medium': {
'enabled': True,
'tags': ['artificial-intelligence', 'machine-learning', 'chatgpt', 'ai-tools',
'generative-ai', 'deeplearning', 'prompt-engineering', 'ai-news'],
'url_pattern': 'https://medium.com/tag/{tag}/latest',
'articles_per_tag': 15 # Increased from 10
},
'techcrunch': {
'enabled': True,
'category': 'artificial-intelligence',
'url': 'https://techcrunch.com/category/artificial-intelligence/feed/',
'articles_limit': 30 # Increased from 20
},
'venturebeat': {
'enabled': True,
'url': 'https://venturebeat.com/category/ai/feed/',
'articles_limit': 25 # Increased from 15
},
'mit_tech_review': {
'enabled': True,
'url': 'https://www.technologyreview.com/feed/',
'filter_ai': True,
'articles_limit': 20 # Increased from 10
},
'theverge': {
'enabled': True,
'url': 'https://www.theverge.com/ai-artificial-intelligence/rss/index.xml',
'articles_limit': 20
},
'wired_ai': {
'enabled': True,
'url': 'https://www.wired.com/feed/tag/ai/latest/rss',
'articles_limit': 15
},
'arstechnica': {
'enabled': True,
'url': 'https://arstechnica.com/tag/artificial-intelligence/feed/',
'articles_limit': 15
},
'hackernews': {
'enabled': True,
'url': 'https://hnrss.org/newest?q=AI+OR+ChatGPT+OR+OpenAI',
'articles_limit': 30
}
}
# Content pipeline settings
PIPELINE = {
'articles_per_day': 30, # 🔥 INCREASED! More content = more traffic
'min_article_length': 600, # Shorter, easier to read
'max_article_length': 1000, # Keep it concise
'sources_per_article': 3, # How many articles to compile into one
'clustering_threshold': 0.6, # Lower threshold = more diverse topics
'research_time_minutes': 90, # Spend 1.5 hours researching daily
}
# Category mapping (keyword-based)
CATEGORY_KEYWORDS = {
'AI News': ['news', 'announcement', 'report', 'industry', 'company', 'funding', 'release'],
'AI Tutorials': ['how to', 'tutorial', 'guide', 'step by step', 'learn', 'beginners', 'course'],
'Tips & Tricks': ['tips', 'tricks', 'hacks', 'productivity', 'best practices', 'optimize', 'improve'],
'Upcoming Releases': ['upcoming', 'soon', 'preview', 'roadmap', 'future', 'expected', 'announce']
}
# Translation settings
TRANSLATION = {
'model': 'claude-3-5-sonnet-20241022',
'max_tokens': 4000,
'temperature': 0.5, # Higher = more natural, casual translation
'preserve_terms': [ # Technical terms to keep in English
'AI', 'ChatGPT', 'GPT', 'Claude', 'API', 'ML', 'NLP',
'LLM', 'Transformer', 'Neural Network', 'Python', 'GitHub',
'DeepSeek', 'OpenAI', 'Anthropic', 'Google', 'Meta'
],
'style': 'casual', # Casual, conversational tone
'target_audience': 'general', # Not just tech experts
'simplify_jargon': True, # Explain technical terms simply
}
# Publishing settings
# NOTE(review): 'featured_image_required' (False) and 'require_featured_image'
# (True) contradict each other — confirm which key consumers read and drop
# the other.
PUBLISHING = {
'status_default': 'published', # or 'draft' for manual review
'publish_interval_hours': 1, # Space out publications
'featured_image_required': False,
'auto_generate_excerpt': True,
'excerpt_length': 200, # characters
'require_featured_image': True, # Every article needs an image
'extract_videos': True, # Extract YouTube/video embeds
'max_images_per_article': 5, # Include multiple images
'image_fallback': 'generate' # If no image, generate AI image
}
# SEO settings
SEO = {
'meta_description_length': 160,
'keywords_per_article': 10,
'auto_generate_slug': True
}
# Burmese font settings
BURMESE = {
'font_family': 'Pyidaungsu',
'fallback_fonts': ['Noto Sans Myanmar', 'Myanmar Text'],
'unicode_range': 'U+1000-109F' # Myanmar Unicode range
}
# Admin
# NOTE(review): insecure fallback password — ensure ADMIN_PASSWORD is set in
# every deployed environment.
ADMIN_PASSWORD = os.getenv('ADMIN_PASSWORD', 'change_me_in_production')
# Logging
LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO')
LOG_FILE = 'burmddit_pipeline.log'
# Rate limiting
RATE_LIMITS = {
'requests_per_minute': 30,
'anthropic_rpm': 50,
'delay_between_requests': 2 # seconds
}
# Retry settings
RETRY = {
'max_attempts': 3,
'backoff_factor': 2,
'timeout': 30 # seconds
}

257
backend/database.py Normal file
View File

@@ -0,0 +1,257 @@
# Database connection and utilities
import psycopg2
from psycopg2.extras import RealDictCursor, Json
from contextlib import contextmanager
from typing import List, Dict, Optional, Tuple
from loguru import logger
import config
@contextmanager
def get_db_connection():
    """Context manager for database connections.

    Yields a live psycopg2 connection. Commits on clean exit, rolls back
    and re-raises on any exception, and always closes the connection.
    """
    conn = None
    try:
        conn = psycopg2.connect(config.DATABASE_URL)
        yield conn
        conn.commit()
    except Exception as e:
        if conn:
            conn.rollback()
        logger.error(f"Database error: {e}")
        raise
    finally:
        if conn:
            conn.close()
def execute_query(query: str, params: tuple = None, fetch=False):
    """Execute *query* with *params*.

    Returns the fetched rows (as RealDictCursor dicts) when fetch=True,
    otherwise the affected row count.
    """
    with get_db_connection() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(query, params)
            return cur.fetchall() if fetch else cur.rowcount
# Raw articles functions
def insert_raw_article(url: str, title: str, content: str, author: str,
                       published_date, source: str, category_hint: str = None):
    """Insert a scraped article into raw_articles table.

    Returns the new row id, or None when the URL already exists
    (ON CONFLICT DO NOTHING yields no row) or the insert failed.
    """
    query = """
        INSERT INTO raw_articles (url, title, content, author, published_date, source, category_hint)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (url) DO NOTHING
        RETURNING id
    """
    try:
        result = execute_query(
            query,
            (url, title, content, author, published_date, source, category_hint),
            fetch=True
        )
        return result[0]['id'] if result else None
    except Exception as e:
        logger.error(f"Error inserting raw article: {e}")
        return None
def get_unprocessed_articles(limit: int = 100) -> List[Dict]:
    """Return up to *limit* raw articles not yet compiled, newest first."""
    query = """
        SELECT * FROM raw_articles
        WHERE processed = FALSE
        ORDER BY published_date DESC
        LIMIT %s
    """
    return execute_query(query, (limit,), fetch=True)
def mark_article_processed(article_id: int, compiled_into: int = None):
    """Mark raw article as processed, optionally recording the compiled
    article id it was merged into."""
    query = """
        UPDATE raw_articles
        SET processed = TRUE, compiled_into = %s
        WHERE id = %s
    """
    execute_query(query, (compiled_into, article_id))
# Categories functions
def get_all_categories() -> List[Dict]:
    """Return all category rows ordered by id."""
    query = "SELECT * FROM categories ORDER BY id"
    return execute_query(query, fetch=True)
def get_category_by_slug(slug: str) -> Optional[Dict]:
    """Return the category row for *slug*, or None when not found."""
    query = "SELECT * FROM categories WHERE slug = %s"
    result = execute_query(query, (slug,), fetch=True)
    return result[0] if result else None
def detect_category(title: str, content: str) -> int:
    """Pick a category id for the text by keyword voting.

    Counts substring hits per category from config.CATEGORY_KEYWORDS;
    falls back to 'AI News' when nothing matches and to id 1 when the
    winning slug has no row in the categories table.
    """
    haystack = f"{title} {content}".lower()
    votes = {
        name: sum(kw in haystack for kw in keywords)
        for name, keywords in config.CATEGORY_KEYWORDS.items()
    }
    winner = max(votes, key=votes.get)
    if votes[winner] == 0:
        # No keyword hit anywhere — default bucket.
        winner = 'AI News'
    slug = winner.lower().replace(' & ', '-').replace(' ', '-')
    row = get_category_by_slug(slug)
    return row['id'] if row else 1
# Articles functions
def insert_article(title: str, title_burmese: str, slug: str,
                   content: str, content_burmese: str,
                   excerpt: str, excerpt_burmese: str,
                   category_id: int, featured_image: str = None,
                   images: List[str] = None,
                   videos: List[str] = None,
                   source_articles: List[Dict] = None,
                   meta_description: str = None,
                   meta_keywords: List[str] = None,
                   reading_time: int = None,
                   status: str = 'published') -> Optional[int]:
    """Insert a new article.

    Returns the new row id, or None on slug conflict (ON CONFLICT DO
    NOTHING) or error. `status` is bound TWICE: once as the status column
    and once in the CASE that sets published_at only for 'published' rows —
    keep the parameter tuple order in sync with the SQL placeholders.
    """
    query = """
        INSERT INTO articles (
            title, title_burmese, slug, content, content_burmese,
            excerpt, excerpt_burmese, category_id, featured_image,
            images, videos,
            source_articles, meta_description, meta_keywords,
            reading_time, status, published_at
        ) VALUES (
            %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
            CASE WHEN %s = 'published' THEN CURRENT_TIMESTAMP ELSE NULL END
        )
        ON CONFLICT (slug) DO NOTHING
        RETURNING id
    """
    try:
        result = execute_query(
            query,
            (title, title_burmese, slug, content, content_burmese,
             excerpt, excerpt_burmese, category_id, featured_image,
             images or [],  # stored as a Postgres array
             videos or [],  # stored as a Postgres array
             Json(source_articles) if source_articles else None,  # JSONB column
             meta_description, meta_keywords, reading_time, status, status),
            fetch=True
        )
        return result[0]['id'] if result else None
    except Exception as e:
        logger.error(f"Error inserting article: {e}")
        return None
def get_recent_articles(limit: int = 10) -> List[Dict]:
    """Return the most recent rows from the published_articles view.

    Ordering is whatever the view defines — presumably published_at DESC;
    confirm against the schema.
    """
    query = """
        SELECT * FROM published_articles
        LIMIT %s
    """
    return execute_query(query, (limit,), fetch=True)
def get_article_by_slug(slug: str) -> Optional[Dict]:
    """Return a published article (with category names joined in) by slug,
    or None when missing or unpublished."""
    query = """
        SELECT a.*, c.name as category_name, c.name_burmese as category_name_burmese
        FROM articles a
        JOIN categories c ON a.category_id = c.id
        WHERE a.slug = %s AND a.status = 'published'
    """
    result = execute_query(query, (slug,), fetch=True)
    return result[0] if result else None
def increment_view_count(slug: str):
    """Bump the article's view counter via the increment_view_count() SQL function."""
    query = "SELECT increment_view_count(%s)"
    execute_query(query, (slug,))
def get_trending_articles(days: int = 7, limit: int = 10) -> List[Dict]:
    """Return trending articles via the get_trending_articles() SQL function.

    NOTE(review): the `days` parameter is currently unused — only `limit`
    is passed to the SQL function. Confirm whether the trend window should
    be forwarded too.
    """
    query = "SELECT * FROM get_trending_articles(%s)"
    return execute_query(query, (limit,), fetch=True)
def get_articles_by_category(category_slug: str, limit: int = 20) -> List[Dict]:
    """Return up to *limit* published articles in the given category."""
    query = """
        SELECT * FROM published_articles
        WHERE category_slug = %s
        LIMIT %s
    """
    return execute_query(query, (category_slug, limit), fetch=True)
def search_articles(search_term: str, limit: int = 20) -> List[Dict]:
    """Search published articles (Burmese + English).

    Burmese title/excerpt are matched with a 'simple' full-text query
    (no language-specific stemming for Myanmar script); the English title
    is matched with a case-insensitive ILIKE pattern.
    """
    query = """
        SELECT
            id, title_burmese, slug, excerpt_burmese,
            category_name_burmese, published_at
        FROM published_articles
        WHERE
            to_tsvector('simple', title_burmese || ' ' || COALESCE(excerpt_burmese, ''))
            @@ plainto_tsquery('simple', %s)
            OR title ILIKE %s
        ORDER BY published_at DESC
        LIMIT %s
    """
    search_pattern = f"%{search_term}%"
    return execute_query(query, (search_term, search_pattern, limit), fetch=True)
# Pipeline logging
def log_pipeline_stage(stage: str, status: str, articles_processed: int = 0,
                       error_message: str = None, duration: int = None):
    """Record one pipeline stage run (scrape/compile/translate/publish)
    in pipeline_logs; *duration* is in seconds."""
    query = """
        INSERT INTO pipeline_logs (stage, status, articles_processed, error_message, duration_seconds)
        VALUES (%s, %s, %s, %s, %s)
    """
    execute_query(query, (stage, status, articles_processed, error_message, duration))
def get_last_pipeline_run() -> Optional[Dict]:
    """Return an aggregate summary (stage count, total articles) of the
    most recent pipeline run, or None when there are no logs."""
    query = """
        SELECT pipeline_run, COUNT(*) as stages,
               SUM(articles_processed) as total_articles
        FROM pipeline_logs
        WHERE pipeline_run = (SELECT MAX(pipeline_run) FROM pipeline_logs)
        GROUP BY pipeline_run
    """
    result = execute_query(query, fetch=True)
    return result[0] if result else None
# Statistics
def get_site_stats() -> Dict:
    """Return overall site statistics in one round trip:
    published article count, total views, active subscribers, and
    raw articles scraped today."""
    with get_db_connection() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                SELECT
                    (SELECT COUNT(*) FROM articles WHERE status = 'published') as total_articles,
                    (SELECT SUM(view_count) FROM articles) as total_views,
                    (SELECT COUNT(*) FROM subscribers WHERE status = 'active') as subscribers,
                    (SELECT COUNT(*) FROM raw_articles WHERE scraped_at > CURRENT_DATE) as articles_today
            """)
            return cur.fetchone()
# Initialize database (run schema.sql)
def initialize_database():
    """Initialize the database by executing database/schema.sql.

    The schema path is resolved relative to this file (consistent with
    init_db.py) instead of the process CWD, so the function works no
    matter where the interpreter was launched from. Returns True on
    success, False on any error.
    """
    import os  # local import: only needed for this one-off path resolution
    schema_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                               '..', 'database', 'schema.sql')
    try:
        with open(schema_path, 'r') as f:
            schema = f.read()
        with get_db_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(schema)
        logger.info("Database initialized successfully")
        return True
    except Exception as e:
        logger.error(f"Error initializing database: {e}")
        return False

142
backend/init_db.py Normal file
View File

@@ -0,0 +1,142 @@
#!/usr/bin/env python3
# Database initialization script
import sys
import os
from loguru import logger
import database
import config
def init_database():
    """Initialize the Burmddit database from database/schema.sql.

    Verifies DATABASE_URL is configured, executes the schema, then lists
    the created tables and seeded categories. Returns True on success.
    """
    logger.info("Initializing Burmddit database...")
    # Check if DATABASE_URL is set
    if not config.DATABASE_URL:
        logger.error("DATABASE_URL not set!")
        logger.error("Please set it in .env file or environment")
        return False
    # Only the first 30 chars are logged to avoid leaking credentials.
    logger.info(f"Connecting to database: {config.DATABASE_URL[:30]}...")
    try:
        # Read and execute schema, resolved relative to this file.
        schema_path = os.path.join(os.path.dirname(__file__), '..', 'database', 'schema.sql')
        with open(schema_path, 'r') as f:
            schema_sql = f.read()
        with database.get_db_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(schema_sql)
        logger.info("✅ Database schema created successfully!")
        # Verify tables exist (plain cursor here, so rows are tuples).
        with database.get_db_connection() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    SELECT table_name
                    FROM information_schema.tables
                    WHERE table_schema = 'public'
                """)
                tables = cur.fetchall()
                logger.info(f"Created {len(tables)} tables:")
                for table in tables:
                    logger.info(f"  - {table[0]}")
        # Check categories (RealDictCursor rows → dict access).
        categories = database.get_all_categories()
        logger.info(f"\n{len(categories)} categories created:")
        for cat in categories:
            logger.info(f"  - {cat['name']} ({cat['name_burmese']})")
        logger.info("\n🎉 Database initialization complete!")
        return True
    except FileNotFoundError:
        logger.error(f"Schema file not found at: {schema_path}")
        return False
    except Exception as e:
        logger.error(f"Error initializing database: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return False
def reset_database():
    """Reset database (DANGEROUS - deletes all data!).

    Requires an exact interactive confirmation phrase, drops and recreates
    the public schema, then re-runs init_database(). Returns True on success.
    """
    logger.warning("⚠️ RESETTING DATABASE - ALL DATA WILL BE LOST!")
    confirm = input("Type 'YES DELETE EVERYTHING' to confirm: ")
    if confirm != 'YES DELETE EVERYTHING':
        logger.info("Reset cancelled.")
        return False
    try:
        with database.get_db_connection() as conn:
            with conn.cursor() as cur:
                # Drop all tables by recreating the schema wholesale.
                cur.execute("""
                    DROP SCHEMA public CASCADE;
                    CREATE SCHEMA public;
                    GRANT ALL ON SCHEMA public TO postgres;
                    GRANT ALL ON SCHEMA public TO public;
                """)
        logger.info("✅ Database reset complete")
        # Reinitialize from schema.sql.
        return init_database()
    except Exception as e:
        logger.error(f"Error resetting database: {e}")
        return False
def show_stats():
    """Log overall site statistics and the five most recent articles.

    Returns True on success, False when the queries fail.
    """
    try:
        stats = database.get_site_stats()
        logger.info("\n📊 DATABASE STATISTICS")
        logger.info("=" * 40)
        logger.info(f"Total articles: {stats['total_articles']}")
        logger.info(f"Total views: {stats['total_views']}")
        logger.info(f"Active subscribers: {stats['subscribers']}")
        logger.info(f"Articles today: {stats['articles_today']}")
        logger.info("=" * 40)
        # Get recent articles
        recent = database.get_recent_articles(5)
        logger.info(f"\n📰 RECENT ARTICLES ({len(recent)}):")
        for article in recent:
            logger.info(f"  - {article['title_burmese'][:50]}...")
        return True
    except Exception as e:
        logger.error(f"Error fetching stats: {e}")
        return False
def main():
    """CLI entry point: dispatch init/reset/stats and exit with 0/1."""
    import argparse
    parser = argparse.ArgumentParser(description='Burmddit Database Management')
    parser.add_argument('command', choices=['init', 'reset', 'stats'],
                        help='Command to execute')
    args = parser.parse_args()
    # Table-driven dispatch; `choices` above guarantees the key exists.
    handlers = {
        'init': init_database,
        'reset': reset_database,
        'stats': show_stats,
    }
    success = handlers[args.command]()
    sys.exit(0 if success else 1)
if __name__ == '__main__':
    # Usage: python init_db.py {init|reset|stats}
    main()

199
backend/publisher.py Normal file
View File

@@ -0,0 +1,199 @@
# Publisher module - Publishes translated articles to the website
import re
import time
from datetime import datetime, timedelta
from typing import List, Dict

from loguru import logger
from slugify import slugify

import config
import database
class ArticlePublisher:
    """Publishes translated articles into the articles table.

    Handles slug generation, reading-time estimation, category detection,
    keyword extraction, and the final insert via database.insert_article.
    """

    def __init__(self):
        pass

    def publish_articles(self, translated_articles: List[Dict]) -> int:
        """Publish translated articles to the website.

        Returns the number of articles actually inserted; duplicates
        (slug conflicts) and per-article errors are logged and skipped.
        """
        published_count = 0
        for i, article in enumerate(translated_articles):
            try:
                logger.info(f"Publishing article {i+1}/{len(translated_articles)}: {article['title'][:50]}...")
                # Prepare article data
                article_data = self.prepare_article_for_publishing(article)
                # Insert into database
                article_id = database.insert_article(**article_data)
                if article_id:
                    published_count += 1
                    logger.info(f"✓ Article published successfully (ID: {article_id})")
                    # TODO: track raw_article ids through the pipeline so the
                    # source rows can be marked processed here.
                else:
                    logger.warning(f"✗ Article already exists or failed to publish")
            except Exception as e:
                # One failed article must not stop the batch.
                logger.error(f"Error publishing article {i+1}: {e}")
                continue
        logger.info(f"Published {published_count}/{len(translated_articles)} articles")
        return published_count

    def prepare_article_for_publishing(self, article: Dict) -> Dict:
        """Build the kwargs dict for database.insert_article from a
        translated article, filling in slug, excerpts, reading time,
        category, and SEO metadata."""
        # Generate slug from Burmese title (romanized) or English title.
        slug = self.generate_slug(article.get('title_burmese', article['title']))
        # Ensure excerpts exist, falling back to a content prefix.
        excerpt_burmese = article.get('excerpt_burmese') or article['content_burmese'][:200] + '...'
        excerpt = article.get('excerpt') or article['content'][:200] + '...'
        # Estimated reading time from the Burmese text.
        reading_time = self.calculate_reading_time(article['content_burmese'])
        # Detect category
        category_id = self.detect_category_id(article)
        # Meta description capped at the usual SERP length.
        meta_description = excerpt_burmese[:160]
        # Generate keywords
        meta_keywords = self.extract_keywords(article['title_burmese'] + ' ' + article['content_burmese'])
        # Prepare source articles JSONB
        source_articles = article.get('source_articles', [])
        return {
            'title': article['title'],
            'title_burmese': article['title_burmese'],
            'slug': slug,
            'content': article['content'],
            'content_burmese': article['content_burmese'],
            'excerpt': excerpt,
            'excerpt_burmese': excerpt_burmese,
            'category_id': category_id,
            'featured_image': article.get('featured_image'),
            'images': article.get('images', []),
            'videos': article.get('videos', []),
            'source_articles': source_articles,
            'meta_description': meta_description,
            'meta_keywords': meta_keywords,
            'reading_time': reading_time,
            'status': config.PUBLISHING['status_default']
        }

    def generate_slug(self, title: str) -> str:
        """Generate a URL-friendly slug; timestamp fallback for titles that
        slugify to nothing (e.g. all-Burmese text)."""
        slug = slugify(title, max_length=100)
        if not slug:
            slug = f"article-{int(time.time())}"
        # Uniqueness is left to the DB (ON CONFLICT (slug) DO NOTHING).
        return slug

    def calculate_reading_time(self, text: str) -> int:
        """Estimate reading time in whole minutes for Burmese text.

        Assumes ~225 characters/minute (slower than English due to script
        complexity); never returns less than 1.
        """
        chars = len(text)
        minutes = max(1, round(chars / 225))
        return minutes

    def detect_category_id(self, article: Dict) -> int:
        """Resolve a category id: explicit hint first, then content-based
        keyword detection via database.detect_category."""
        if article.get('category_hint'):
            category_slug = article['category_hint'].lower().replace(' & ', '-').replace(' ', '-')
            category = database.get_category_by_slug(category_slug)
            if category:
                return category['id']
        # Fall back to content-based detection
        return database.detect_category(
            article['title'] + ' ' + article.get('title_burmese', ''),
            article['content'][:500]
        )

    def extract_keywords(self, text: str, limit: int = 10) -> List[str]:
        """Return up to *limit* known AI terms appearing as whole words.

        Fix: match with word boundaries instead of plain substring search,
        so 'AI' no longer matches inside words like 'said' and 'GPT' no
        longer matches inside 'ChatGPT'.
        """
        keywords = [
            'AI', 'ChatGPT', 'GPT', 'OpenAI', 'Anthropic', 'Claude',
            'Machine Learning', 'Deep Learning', 'Neural Network',
            'LLM', 'Transformer', 'NLP', 'Computer Vision',
            'Automation', 'Generative AI'
        ]
        found_keywords = []
        for keyword in keywords:
            if re.search(rf"\b{re.escape(keyword)}\b", text, re.IGNORECASE):
                found_keywords.append(keyword)
        return found_keywords[:limit]

    def schedule_publications(self, translated_articles: List[Dict]) -> int:
        """Schedule articles for staggered publication (future enhancement).

        For now publishes everything immediately; a later version could use
        a publish-at timestamp to space posts out.
        """
        return self.publish_articles(translated_articles)
def run_publisher(translated_articles: List[Dict]) -> int:
    """Publish a batch of translated articles and record the stage outcome.

    Returns the number of articles published (0 when the stage fails).
    """
    logger.info(f"Starting publisher for {len(translated_articles)} articles...")
    t0 = time.time()
    try:
        count = ArticlePublisher().publish_articles(translated_articles)
        elapsed = int(time.time() - t0)
        database.log_pipeline_stage(
            stage='publish',
            status='completed',
            articles_processed=count,
            duration=elapsed
        )
        logger.info(f"Publisher completed in {elapsed}s. Articles published: {count}")
        return count
    except Exception as e:
        logger.error(f"Publisher failed: {e}")
        database.log_pipeline_stage(
            stage='publish',
            status='failed',
            error_message=str(e)
        )
        return 0
if __name__ == '__main__':
    from loguru import logger
    logger.add(config.LOG_FILE, rotation="1 day")
    # Smoke test: publish one hand-written translated article.
    test_article = {
        'title': 'OpenAI Releases GPT-5',
        'title_burmese': 'OpenAI က GPT-5 ကို ထုတ်ပြန်လိုက်ပြီ',
        'content': 'Full English content...',
        'content_burmese': 'OpenAI သည် ယနေ့ GPT-5 ကို တရားဝင် ထုတ်ပြန်လိုက်ပြီ ဖြစ်ပါသည်။...',
        'excerpt': 'OpenAI announces GPT-5...',
        'excerpt_burmese': 'OpenAI က GPT-5 ကို ကြေညာလိုက်ပါပြီ...',
        'source_articles': [{'url': 'https://example.com', 'title': 'Test', 'author': 'Test'}]
    }
    count = run_publisher([test_article])
    print(f"Published: {count}")

44
backend/requirements.txt Normal file
View File

@@ -0,0 +1,44 @@
# Burmddit Backend Dependencies
# Web scraping
beautifulsoup4==4.12.3
requests==2.31.0
scrapy==2.11.0
feedparser==6.0.11
newspaper3k==0.2.8
# Database
psycopg2-binary==2.9.9
sqlalchemy==2.0.25
# AI & NLP
anthropic==0.18.1
openai==1.12.0
sentence-transformers==2.3.1
scikit-learn==1.4.0
# Text processing
python-slugify==8.0.2
markdown==3.5.2
bleach==6.1.0
# Utilities
python-dotenv==1.0.1
python-dateutil==2.8.2
pytz==2024.1
pyyaml==6.0.1
# Scheduling
schedule==1.2.1
apscheduler==3.10.4
# API & Server (optional, for admin dashboard)
fastapi==0.109.2
uvicorn==0.27.1
pydantic==2.6.1
# Logging & Monitoring
loguru==0.7.2
# Image processing (for featured images)
pillow==10.2.0

160
backend/run_pipeline.py Normal file
View File

@@ -0,0 +1,160 @@
#!/usr/bin/env python3
# Main pipeline orchestrator - Runs entire content generation pipeline
import sys
import time
from datetime import datetime
from loguru import logger
import config
# Import pipeline stages
from scraper import run_scraper
from compiler import run_compiler
from translator import run_translator
from publisher import run_publisher
import database
# Configure logging: console handler at the configured level, plus a
# daily-rotated file handler (kept for 7 days) at INFO.
logger.remove()  # Remove default handler
logger.add(sys.stderr, level=config.LOG_LEVEL)
logger.add(config.LOG_FILE, rotation="1 day", retention="7 days", level="INFO")
class Pipeline:
    """Orchestrates the full content pipeline: scrape -> compile -> translate -> publish.

    Each stage feeds the next; the pipeline short-circuits to finish() as soon
    as a stage produces no output. Per-stage article counts accumulate in
    ``self.stats`` and are reported in the final summary.
    """

    def __init__(self):
        # Set by run(); finish() uses it to compute the total duration.
        self.start_time = None
        # Article counts per stage; also drives the final status line.
        self.stats = {
            'scraped': 0,
            'compiled': 0,
            'translated': 0,
            'published': 0
        }

    def run(self):
        """Execute full pipeline.

        Returns the number of published articles (via finish()); the caller
        turns that into the process exit status.
        """
        self.start_time = time.time()
        logger.info("="*60)
        logger.info(f"🚀 Starting Burmddit Content Pipeline - {datetime.now()}")
        logger.info("="*60)
        try:
            # Stage 1: Scrape
            logger.info("\n📥 STAGE 1: SCRAPING")
            logger.info("-" * 40)
            scraped_count = run_scraper()
            self.stats['scraped'] = scraped_count
            if scraped_count == 0:
                logger.warning("⚠️ No articles scraped. Exiting pipeline.")
                return self.finish()
            logger.info(f"✅ Scraped {scraped_count} articles")
            # Stage 2: Compile
            logger.info("\n🔨 STAGE 2: COMPILING")
            logger.info("-" * 40)
            compiled_articles = run_compiler()
            self.stats['compiled'] = len(compiled_articles)
            if not compiled_articles:
                logger.warning("⚠️ No articles compiled. Exiting pipeline.")
                return self.finish()
            logger.info(f"✅ Compiled {len(compiled_articles)} articles")
            # Stage 3: Translate
            logger.info("\n🌍 STAGE 3: TRANSLATING TO BURMESE")
            logger.info("-" * 40)
            translated_articles = run_translator(compiled_articles)
            self.stats['translated'] = len(translated_articles)
            if not translated_articles:
                logger.warning("⚠️ No articles translated. Exiting pipeline.")
                return self.finish()
            logger.info(f"✅ Translated {len(translated_articles)} articles")
            # Stage 4: Publish
            logger.info("\n📤 STAGE 4: PUBLISHING")
            logger.info("-" * 40)
            published_count = run_publisher(translated_articles)
            self.stats['published'] = published_count
            if published_count == 0:
                logger.warning("⚠️ No articles published.")
            else:
                logger.info(f"✅ Published {published_count} articles")
            # Finish
            return self.finish()
        except KeyboardInterrupt:
            # Ctrl-C: still print the summary, flagged as interrupted.
            logger.warning("\n⚠️ Pipeline interrupted by user")
            return self.finish(interrupted=True)
        except Exception as e:
            # Any stage-level crash: log full traceback and report as failed.
            logger.error(f"\n❌ Pipeline failed with error: {e}")
            import traceback
            logger.error(traceback.format_exc())
            return self.finish(failed=True)

    def finish(self, interrupted=False, failed=False):
        """Finish pipeline and display summary.

        Logs a status banner and the per-stage counts, then best-effort site
        statistics. Returns the number of published articles.
        """
        duration = int(time.time() - self.start_time)
        logger.info("\n" + "="*60)
        logger.info("📊 PIPELINE SUMMARY")
        logger.info("="*60)
        # Status precedence: interrupted > failed > success > warnings.
        if interrupted:
            status = "⚠️ INTERRUPTED"
        elif failed:
            status = "❌ FAILED"
        elif self.stats['published'] > 0:
            status = "✅ SUCCESS"
        else:
            status = "⚠️ COMPLETED WITH WARNINGS"
        logger.info(f"Status: {status}")
        logger.info(f"Duration: {duration}s ({duration // 60}m {duration % 60}s)")
        logger.info(f"")
        logger.info(f"Articles scraped: {self.stats['scraped']}")
        logger.info(f"Articles compiled: {self.stats['compiled']}")
        logger.info(f"Articles translated: {self.stats['translated']}")
        logger.info(f"Articles published: {self.stats['published']}")
        logger.info("="*60)
        # Get site stats (best effort — a stats failure must not mask the run).
        try:
            site_stats = database.get_site_stats()
            logger.info(f"\n📈 SITE STATISTICS")
            logger.info(f"Total articles: {site_stats['total_articles']}")
            logger.info(f"Total views: {site_stats['total_views']}")
            logger.info(f"Subscribers: {site_stats['subscribers']}")
            logger.info("="*60)
        except Exception as e:
            logger.error(f"Error fetching site stats: {e}")
        return self.stats['published']
def main():
    """CLI entry point: validate required configuration, run the pipeline,
    and exit 0 when at least one article was published, 1 otherwise."""
    # Fail fast when mandatory environment configuration is missing.
    if not config.ANTHROPIC_API_KEY:
        logger.error("❌ ANTHROPIC_API_KEY not set in environment!")
        logger.error("Please set it in .env file or environment variables.")
        sys.exit(1)
    if not config.DATABASE_URL:
        logger.error("❌ DATABASE_URL not set!")
        sys.exit(1)
    # Run the full pipeline; run() returns the published-article count.
    published_total = Pipeline().run()
    # Shell-friendly exit code for cron/CI callers.
    sys.exit(0 if published_total > 0 else 1)


if __name__ == '__main__':
    main()

271
backend/scraper.py Normal file
View File

@@ -0,0 +1,271 @@
# Web scraper for AI news sources
import requests
from bs4 import BeautifulSoup
import feedparser
from newspaper import Article
from datetime import datetime, timedelta
from typing import List, Dict, Optional
from loguru import logger
import time
import config
import database
class AINewsScraper:
    """Scrapes AI news from the configured sources and stores raw articles.

    Supports two source types: Medium tag pages (HTML scraping) and RSS feeds.
    Source definitions and rate limits come from ``config``; each accepted
    article is inserted via ``database.insert_raw_article``.
    """

    def __init__(self):
        # Shared HTTP session: connection reuse plus a self-identifying
        # bot User-Agent for polite crawling.
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (compatible; BurmdditBot/1.0; +https://burmddit.vercel.app)'
        })

    def scrape_all_sources(self) -> int:
        """Scrape all enabled sources.

        Returns:
            Number of articles actually inserted into the database (entries
            for which insert_raw_article returned a truthy id).
        """
        total_articles = 0
        for source_name, source_config in config.SOURCES.items():
            if not source_config.get('enabled', True):
                continue
            logger.info(f"Scraping {source_name}...")
            try:
                # Dispatch on source type; unknown sources are skipped.
                if source_name == 'medium':
                    articles = self.scrape_medium(source_config)
                elif source_name in ['techcrunch', 'venturebeat', 'mit_tech_review']:
                    articles = self.scrape_rss_feed(source_config)
                else:
                    logger.warning(f"Unknown source: {source_name}")
                    continue
                # Store articles in database
                for article in articles:
                    article_id = database.insert_raw_article(
                        url=article['url'],
                        title=article['title'],
                        content=article['content'],
                        author=article['author'],
                        published_date=article['published_date'],
                        source=source_name,
                        category_hint=article.get('category_hint')
                    )
                    # Only count rows the DB accepted — presumably duplicates
                    # return a falsy id; TODO confirm in the database module.
                    if article_id:
                        total_articles += 1
                logger.info(f"Scraped {len(articles)} articles from {source_name}")
                time.sleep(config.RATE_LIMITS['delay_between_requests'])
            except Exception as e:
                # A single broken source must not abort the whole run.
                logger.error(f"Error scraping {source_name}: {e}")
                continue
        logger.info(f"Total articles scraped: {total_articles}")
        return total_articles

    def scrape_medium(self, source_config: Dict) -> List[Dict]:
        """Scrape Medium articles by tags.

        ``source_config`` must provide 'tags', a 'url_pattern' containing a
        ``{tag}`` placeholder, and 'articles_per_tag'.
        """
        articles = []
        for tag in source_config['tags']:
            try:
                url = source_config['url_pattern'].format(tag=tag)
                response = self.session.get(url, timeout=30)
                soup = BeautifulSoup(response.content, 'html.parser')
                # Medium's structure: find article cards
                article_elements = soup.find_all('article', limit=source_config['articles_per_tag'])
                for element in article_elements:
                    try:
                        # Extract article URL
                        link = element.find('a', href=True)
                        if not link:
                            continue
                        article_url = link['href']
                        if not article_url.startswith('http'):
                            article_url = 'https://medium.com' + article_url
                        # Use newspaper3k for full article extraction
                        article = self.extract_article_content(article_url)
                        if article:
                            article['category_hint'] = self.detect_category_from_text(
                                article['title'] + ' ' + article['content'][:500]
                            )
                            articles.append(article)
                    except Exception as e:
                        logger.error(f"Error parsing Medium article: {e}")
                        continue
                time.sleep(2)  # Rate limiting between tag pages
            except Exception as e:
                logger.error(f"Error scraping Medium tag '{tag}': {e}")
                continue
        return articles

    def scrape_rss_feed(self, source_config: Dict) -> List[Dict]:
        """Scrape articles from RSS feed.

        Honors optional 'articles_limit' (default 20) and, when 'filter_ai'
        is set, keeps only entries whose title/summary look AI-related.
        """
        articles = []
        try:
            feed = feedparser.parse(source_config['url'])
            for entry in feed.entries[:source_config.get('articles_limit', 20)]:
                try:
                    # Check if AI-related (if filter enabled)
                    if source_config.get('filter_ai') and not self.is_ai_related(entry.title + ' ' + entry.get('summary', '')):
                        continue
                    article_url = entry.link
                    article = self.extract_article_content(article_url)
                    if article:
                        article['category_hint'] = self.detect_category_from_text(
                            article['title'] + ' ' + article['content'][:500]
                        )
                        articles.append(article)
                except Exception as e:
                    logger.error(f"Error parsing RSS entry: {e}")
                    continue
        except Exception as e:
            logger.error(f"Error fetching RSS feed: {e}")
        return articles

    def extract_article_content(self, url: str) -> Optional[Dict]:
        """Extract full article content using newspaper3k.

        Returns a dict with url/title/content/author/published_date plus any
        images and video embeds, or None when the article is too short, too
        old (more than 2 days), or extraction fails.
        """
        try:
            article = Article(url)
            article.download()
            article.parse()
            # Skip if article is too short
            if len(article.text) < 500:
                logger.debug(f"Article too short, skipping: {url}")
                return None
            # Parse publication date
            pub_date = article.publish_date
            if not pub_date:
                pub_date = datetime.now()
            elif pub_date.tzinfo is not None:
                # FIX: newspaper3k can return timezone-aware datetimes;
                # subtracting one from naive datetime.now() raises TypeError.
                # Strip tzinfo (keeping the wall-clock value) so the age
                # check below always works.
                pub_date = pub_date.replace(tzinfo=None)
            # Skip old articles (older than 2 days)
            if datetime.now() - pub_date > timedelta(days=2):
                logger.debug(f"Article too old, skipping: {url}")
                return None
            # Extract images
            images = []
            if article.top_image:
                images.append(article.top_image)
            # Get additional images from article.
            # FIX: article.images may be a set in newspaper3k, and sets do
            # not support slicing — materialize to a list first.
            for img in list(article.images)[:config.PUBLISHING['max_images_per_article']]:
                if img and img not in images:
                    images.append(img)
            # Extract videos (YouTube, etc.)
            videos = []
            if article.movies:
                videos = list(article.movies)
            # Also check for YouTube embeds in HTML
            try:
                from bs4 import BeautifulSoup
                soup = BeautifulSoup(article.html, 'html.parser')
                # Find YouTube iframes
                for iframe in soup.find_all('iframe'):
                    src = iframe.get('src', '')
                    if 'youtube.com' in src or 'youtu.be' in src:
                        videos.append(src)
                # Find more images
                for img in soup.find_all('img')[:10]:
                    img_src = img.get('src', '')
                    if img_src and img_src not in images and len(images) < config.PUBLISHING['max_images_per_article']:
                        # Filter out tiny images (likely icons/ads): keep images
                        # whose width is unknown/non-numeric or larger than 200px.
                        width = img.get('width', 0)
                        if not width or (isinstance(width, str) and not width.isdigit()) or int(str(width)) > 200:
                            images.append(img_src)
            except Exception as e:
                logger.debug(f"Error extracting additional media: {e}")
            return {
                'url': url,
                'title': article.title or 'Untitled',
                'content': article.text,
                'author': ', '.join(article.authors) if article.authors else 'Unknown',
                'published_date': pub_date,
                'top_image': article.top_image,
                'images': images,  # 🔥 Multiple images!
                'videos': videos  # 🔥 Video embeds!
            }
        except Exception as e:
            logger.error(f"Error extracting article from {url}: {e}")
            return None

    def is_ai_related(self, text: str) -> bool:
        """Check if text mentions an AI-related keyword as a whole word/phrase."""
        import re  # local import, matching this module's style of deferred imports
        ai_keywords = [
            'artificial intelligence', 'ai', 'machine learning', 'ml',
            'deep learning', 'neural network', 'chatgpt', 'gpt', 'llm',
            'claude', 'openai', 'anthropic', 'transformer', 'nlp',
            'generative ai', 'automation', 'computer vision'
        ]
        text_lower = text.lower()
        # FIX: plain substring tests produced false positives — 'ai' matched
        # inside 'maintain'/'email'/'air', and 'ml' inside 'html'. Require
        # word boundaries around each keyword/phrase instead.
        return any(
            re.search(r'\b' + re.escape(keyword) + r'\b', text_lower)
            for keyword in ai_keywords
        )

    def detect_category_from_text(self, text: str) -> Optional[str]:
        """Detect category hint from text.

        Scores each category by keyword hits (config.CATEGORY_KEYWORDS) and
        returns the best-scoring category, or None when nothing matches.
        """
        text_lower = text.lower()
        scores = {}
        for category, keywords in config.CATEGORY_KEYWORDS.items():
            score = sum(1 for keyword in keywords if keyword in text_lower)
            scores[category] = score
        # FIX: guard against an empty keyword config — max() on an empty
        # sequence raises ValueError.
        if scores and max(scores.values()) > 0:
            return max(scores, key=scores.get)
        return None
def run_scraper():
    """Run the scraper once and record the outcome as a 'crawl' pipeline stage.

    Returns the number of articles scraped, or 0 when the run fails.
    """
    logger.info("Starting scraper...")
    started_at = time.time()
    try:
        count = AINewsScraper().scrape_all_sources()
        elapsed = int(time.time() - started_at)
        # Record the successful run for pipeline monitoring.
        database.log_pipeline_stage(
            stage='crawl',
            status='completed',
            articles_processed=count,
            duration=elapsed
        )
        logger.info(f"Scraper completed in {elapsed}s. Articles scraped: {count}")
        return count
    except Exception as e:
        # Record the failure and report zero articles to the caller.
        logger.error(f"Scraper failed: {e}")
        database.log_pipeline_stage(
            stage='crawl',
            status='failed',
            error_message=str(e)
        )
        return 0
if __name__ == '__main__':
    # Manual entry point: add a rotating file log handler and run one scrape.
    from loguru import logger
    logger.add(config.LOG_FILE, rotation="1 day")
    run_scraper()

255
backend/translator.py Normal file
View File

@@ -0,0 +1,255 @@
# Burmese translation module using Claude
from typing import Dict, Optional
from loguru import logger
import anthropic
import re
import config
import time
class BurmeseTranslator:
    """Translates compiled English articles into casual Burmese using Claude.

    Technical terms listed in config.TRANSLATION['preserve_terms'] are kept
    in English; long content is translated in paragraph-aligned chunks.
    """

    def __init__(self):
        # Anthropic API client used for all translation calls.
        self.client = anthropic.Anthropic(api_key=config.ANTHROPIC_API_KEY)
        # Terms that must stay in English in the Burmese output.
        self.preserve_terms = config.TRANSLATION['preserve_terms']

    def translate_article(self, article: Dict) -> Dict:
        """Translate compiled article to Burmese.

        Returns a copy of *article* extended with 'title_burmese',
        'excerpt_burmese' and 'content_burmese'. On failure the original
        English text is used as the fallback for all three fields.
        """
        logger.info(f"Translating article: {article['title'][:50]}...")
        try:
            # Translate title
            title_burmese = self.translate_text(
                text=article['title'],
                context="This is an article title about AI technology"
            )
            # Translate excerpt
            excerpt_burmese = self.translate_text(
                text=article['excerpt'],
                context="This is a brief article summary"
            )
            # Translate main content (in chunks if too long)
            content_burmese = self.translate_long_text(article['content'])
            # Return article with Burmese translations
            return {
                **article,
                'title_burmese': title_burmese,
                'excerpt_burmese': excerpt_burmese,
                'content_burmese': content_burmese
            }
        except Exception as e:
            logger.error(f"Translation error: {e}")
            # Fallback: return original text if translation fails
            return {
                **article,
                'title_burmese': article['title'],
                'excerpt_burmese': article['excerpt'],
                'content_burmese': article['content']
            }

    def translate_text(self, text: str, context: str = "") -> str:
        """Translate a text block to Burmese.

        *context* is a short English hint about what the text is, embedded in
        the prompt. Falls back to returning *text* unchanged on API errors.
        """
        # Build preserved terms list for this text
        preserved_terms_str = ", ".join(self.preserve_terms)
        prompt = f"""Translate the following English text to Burmese (Myanmar Unicode) in a CASUAL, EASY-TO-READ style.
🎯 CRITICAL GUIDELINES:
1. Write in **CASUAL, CONVERSATIONAL Burmese** - like talking to a friend over tea
2. Use **SIMPLE, EVERYDAY words** - avoid formal or academic language
3. Explain technical concepts in **LAYMAN TERMS** - as if explaining to your grandmother
4. Keep these terms in English: {preserved_terms_str}
5. Add **brief explanations** in parentheses for complex terms
6. Use **short sentences** - easy to read on mobile
7. Break up long paragraphs - white space is good
8. Keep markdown formatting (##, **, -, etc.) intact
TARGET AUDIENCE: General Myanmar public who are curious about AI but not tech experts
TONE: Friendly, approachable, informative but not boring
EXAMPLE STYLE:
❌ Bad (too formal): "ယခု နည်းပညာသည် ဉာဏ်ရည်တု ဖြစ်စဉ်များကို အသုံးပြုပါသည်"
✅ Good (casual): "ဒီနည်းပညာက AI (အထက်တန်းကွန်ပျူတာဦးနှောက်) ကို သုံးတာပါ"
Context: {context}
Text to translate:
{text}
Casual, easy-to-read Burmese translation:"""
        try:
            message = self.client.messages.create(
                model=config.TRANSLATION['model'],
                max_tokens=config.TRANSLATION['max_tokens'],
                temperature=config.TRANSLATION['temperature'],
                messages=[{"role": "user", "content": prompt}]
            )
            translated = message.content[0].text.strip()
            # Post-process: ensure Unicode and clean up
            translated = self.post_process_translation(translated)
            return translated
        except Exception as e:
            logger.error(f"API translation error: {e}")
            return text  # Fallback to original

    def translate_long_text(self, text: str, chunk_size: int = 2000) -> str:
        """Translate long text in chunks to stay within token limits.

        Splits on blank-line paragraph boundaries, groups paragraphs into
        chunks of roughly *chunk_size* characters, translates each chunk
        separately, and rejoins with blank lines.
        """
        # If text is short enough, translate directly
        if len(text) < chunk_size:
            return self.translate_text(text, context="This is the main article content")
        # Split into paragraphs
        paragraphs = text.split('\n\n')
        # Group paragraphs into chunks
        chunks = []
        current_chunk = ""
        for para in paragraphs:
            if len(current_chunk) + len(para) < chunk_size:
                current_chunk += para + '\n\n'
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                # NOTE(review): a single paragraph longer than chunk_size is
                # still sent as one chunk — confirm it fits the model limits.
                current_chunk = para + '\n\n'
        if current_chunk:
            chunks.append(current_chunk.strip())
        logger.info(f"Translating {len(chunks)} chunks...")
        # Translate each chunk
        translated_chunks = []
        for i, chunk in enumerate(chunks):
            logger.debug(f"Translating chunk {i+1}/{len(chunks)}")
            translated = self.translate_text(
                chunk,
                context=f"This is part {i+1} of {len(chunks)} of a longer article"
            )
            translated_chunks.append(translated)
            time.sleep(0.5)  # Rate limiting
        # Join chunks
        return '\n\n'.join(translated_chunks)

    def post_process_translation(self, text: str) -> str:
        """Clean up and validate translation."""
        # Collapse runs of 3+ newlines down to a single blank line
        text = re.sub(r'(\n{3,})', '\n\n', text)
        # Ensure a space follows Burmese sentence/phrase punctuation (။ ၊)
        text = re.sub(r'([။၊])([^\s])', r'\1 \2', text)
        # Preserve preserved terms (fix any that got translated)
        for term in self.preserve_terms:
            # If the term appears in a weird form, try to fix it
            # (This is a simple check; more sophisticated matching could be added)
            if term not in text and term.lower() in text.lower():
                text = re.sub(re.escape(term.lower()), term, text, flags=re.IGNORECASE)
        return text.strip()

    def validate_burmese_text(self, text: str) -> bool:
        """Check if text contains valid Burmese Unicode."""
        # Myanmar Unicode range: U+1000 to U+109F
        burmese_pattern = re.compile(r'[\u1000-\u109F]')
        return bool(burmese_pattern.search(text))
def run_translator(compiled_articles: list) -> list:
    """Translate each compiled article to Burmese and log the pipeline stage.

    Articles that fail Burmese validation are kept anyway (with a warning);
    articles that raise during translation are dropped. Returns the list of
    translated articles, or an empty list if the whole stage fails.
    """
    logger.info(f"Starting translator for {len(compiled_articles)} articles...")
    started = time.time()
    try:
        engine = BurmeseTranslator()
        results = []
        total = len(compiled_articles)
        for idx, source_article in enumerate(compiled_articles, 1):
            logger.info(f"Translating article {idx}/{total}")
            try:
                outcome = engine.translate_article(source_article)
                # The article is kept either way; validation only affects logging.
                if engine.validate_burmese_text(outcome['content_burmese']):
                    results.append(outcome)
                    logger.info(f"✓ Translation successful for article {idx}")
                else:
                    logger.warning(f"✗ Translation validation failed for article {idx}")
                    # Still add it, but flag it
                    results.append(outcome)
                time.sleep(1)  # Rate limiting between articles
            except Exception as e:
                logger.error(f"Error translating article {idx}: {e}")
                continue
        elapsed = int(time.time() - started)
        from database import log_pipeline_stage
        log_pipeline_stage(
            stage='translate',
            status='completed',
            articles_processed=len(results),
            duration=elapsed
        )
        logger.info(f"Translator completed in {elapsed}s. Articles translated: {len(results)}")
        return results
    except Exception as e:
        logger.error(f"Translator failed: {e}")
        from database import log_pipeline_stage
        log_pipeline_stage(
            stage='translate',
            status='failed',
            error_message=str(e)
        )
        return []
if __name__ == '__main__':
    # Manual smoke test for the translator.
    # NOTE(review): calls the live Anthropic API — requires ANTHROPIC_API_KEY
    # and will incur token cost.
    from loguru import logger
    logger.add(config.LOG_FILE, rotation="1 day")
    # Test translation
    test_article = {
        'title': 'OpenAI Releases GPT-5: A New Era of AI',
        'excerpt': 'OpenAI today announced GPT-5, the next generation of their language model.',
        'content': '''OpenAI has officially released GPT-5, marking a significant milestone in artificial intelligence development.
## Key Features
The new model includes:
- 10x more parameters than GPT-4
- Better reasoning capabilities
- Multimodal support for video
- Reduced hallucinations
CEO Sam Altman said, "GPT-5 represents our most advanced AI system yet."
The model will be available to ChatGPT Plus subscribers starting next month.'''
    }
    translator = BurmeseTranslator()
    translated = translator.translate_article(test_article)
    # Print English original and Burmese result side by side (truncated).
    print("\n=== ORIGINAL ===")
    print(f"Title: {translated['title']}")
    print(f"\nContent: {translated['content'][:200]}...")
    print("\n=== BURMESE ===")
    print(f"Title: {translated['title_burmese']}")
    print(f"\nContent: {translated['content_burmese'][:200]}...")