Add web admin features + fix scraper & translator

Frontend changes:
- Add /admin dashboard for article management
- Add AdminButton component (Alt+Shift+A on articles)
- Add /api/admin/article API endpoints

Backend improvements:
- scraper_v2.py: Multi-layer fallback extraction (newspaper → trafilatura → readability)
- translator_v2.py: Better chunking, repetition detection, validation
- admin_tools.py: CLI admin commands
- test_scraper.py: Individual source testing

Docs:
- WEB-ADMIN-GUIDE.md: Web admin usage
- ADMIN-GUIDE.md: CLI admin usage
- SCRAPER-IMPROVEMENT-PLAN.md: Scraper fixes details
- TRANSLATION-FIX.md: Translation improvements
- ADMIN-FEATURES-SUMMARY.md: Implementation summary

Fixes:
- Article scraping fixed: from 0 working articles to 96+ successfully scraped
- Translation quality issues (repetition, truncation)
- Added 13 new RSS sources
This commit is contained in:
Zeya Phyo
2026-02-26 09:17:50 +00:00
parent 8bf5f342cd
commit f51ac4afa4
20 changed files with 4769 additions and 23 deletions

393
backend/admin_tools.py Executable file
View File

@@ -0,0 +1,393 @@
#!/usr/bin/env python3
"""
Admin tools for managing burmddit articles
"""
import psycopg2
from dotenv import load_dotenv
import os
from datetime import datetime
from loguru import logger
import sys
load_dotenv()
def get_connection():
    """Open and return a new PostgreSQL connection using the DATABASE_URL env var."""
    dsn = os.getenv('DATABASE_URL')
    return psycopg2.connect(dsn)
def list_articles(status=None, limit=20):
    """List articles with optional status filter.

    Returns a list of summary dicts (newest first); titles longer than 60
    characters are truncated with an ellipsis.
    """
    conn = get_connection()
    cur = conn.cursor()
    select_clause = '''
        SELECT id, title, status, published_at, view_count,
               LENGTH(content) as content_len,
               LENGTH(content_burmese) as burmese_len
        FROM articles
    '''
    # One query, with or without the status predicate.
    if status:
        cur.execute(
            select_clause + ' WHERE status = %s ORDER BY published_at DESC LIMIT %s',
            (status, limit),
        )
    else:
        cur.execute(
            select_clause + ' ORDER BY published_at DESC LIMIT %s',
            (limit,),
        )
    rows = cur.fetchall()
    cur.close()
    conn.close()

    def summarize(row):
        # Truncate long titles for table display.
        full_title = row[1]
        shown_title = full_title if len(full_title) <= 60 else full_title[:60] + '...'
        return {
            'id': row[0],
            'title': shown_title,
            'status': row[2],
            'published_at': row[3],
            'views': row[4] or 0,
            'content_len': row[5],
            'burmese_len': row[6],
        }

    return [summarize(r) for r in rows]
def unpublish_article(article_id: int, reason: str = "Error/Quality issue"):
    """Unpublish an article (change status to draft).

    Returns True on success, False when the article does not exist.
    """
    conn = get_connection()
    cur = conn.cursor()
    # Fetch the article first so we can log its current state.
    cur.execute('SELECT id, title, status FROM articles WHERE id = %s', (article_id,))
    row = cur.fetchone()
    if row is None:
        logger.error(f"Article {article_id} not found")
        cur.close()
        conn.close()
        return False
    _, title, current_status = row
    logger.info(f"Unpublishing article {article_id}: {title[:60]}...")
    logger.info(f"Current status: {current_status}")
    logger.info(f"Reason: {reason}")
    # Flip the status back to draft and bump the modification timestamp.
    cur.execute('''
        UPDATE articles
        SET status = 'draft',
            updated_at = NOW()
        WHERE id = %s
    ''', (article_id,))
    conn.commit()
    logger.info(f"✅ Article {article_id} unpublished successfully")
    cur.close()
    conn.close()
    return True
def republish_article(article_id: int):
    """Republish an article (change status to published).

    Returns True on success, False when the article does not exist.
    """
    conn = get_connection()
    cur = conn.cursor()
    # Fetch the article first so we can log its current state.
    cur.execute('SELECT id, title, status FROM articles WHERE id = %s', (article_id,))
    row = cur.fetchone()
    if row is None:
        logger.error(f"Article {article_id} not found")
        cur.close()
        conn.close()
        return False
    _, title, current_status = row
    logger.info(f"Republishing article {article_id}: {title[:60]}...")
    logger.info(f"Current status: {current_status}")
    # Promote to published and bump the modification timestamp.
    cur.execute('''
        UPDATE articles
        SET status = 'published',
            updated_at = NOW()
        WHERE id = %s
    ''', (article_id,))
    conn.commit()
    logger.info(f"✅ Article {article_id} republished successfully")
    cur.close()
    conn.close()
    return True
def delete_article(article_id: int):
    """Permanently delete an article.

    Returns True on success, False when the article does not exist.
    Irreversible — callers (the CLI) gate this behind --confirm.
    """
    conn = get_connection()
    cur = conn.cursor()
    # Look the row up first so the warning log can include the title.
    cur.execute('SELECT id, title, status FROM articles WHERE id = %s', (article_id,))
    row = cur.fetchone()
    if row is None:
        logger.error(f"Article {article_id} not found")
        cur.close()
        conn.close()
        return False
    _, title, _ = row
    logger.warning(f"⚠️ DELETING article {article_id}: {title[:60]}...")
    # Hard delete — no soft-delete flag, the row is gone after commit.
    cur.execute('DELETE FROM articles WHERE id = %s', (article_id,))
    conn.commit()
    logger.info(f"✅ Article {article_id} deleted permanently")
    cur.close()
    conn.close()
    return True
def find_problem_articles():
    """Scan published articles for likely quality problems.

    Three checks, each capped at 10 hits: translations shorter than 30% of
    the original, missing/near-empty Burmese content, and very short source
    articles.  Returns a list of issue dicts.
    """
    conn = get_connection()
    cur = conn.cursor()
    issues = []
    # Issue 1: Translation too short (< 30% of original)
    cur.execute('''
        SELECT id, title,
               LENGTH(content) as en_len,
               LENGTH(content_burmese) as mm_len,
               ROUND(100.0 * LENGTH(content_burmese) / NULLIF(LENGTH(content), 0), 1) as ratio
        FROM articles
        WHERE status = 'published'
        AND LENGTH(content_burmese) < LENGTH(content) * 0.3
        ORDER BY ratio ASC
        LIMIT 10
    ''')
    issues.extend(
        {
            'id': art_id,
            'title': title[:50],
            'issue': 'Translation too short',
            'details': f'EN: {en_len} chars, MM: {mm_len} chars ({ratio}%)',
        }
        for art_id, title, en_len, mm_len, ratio in cur.fetchall()
    )
    # Issue 2: Missing Burmese content
    cur.execute('''
        SELECT id, title
        FROM articles
        WHERE status = 'published'
        AND (content_burmese IS NULL OR LENGTH(content_burmese) < 100)
        LIMIT 10
    ''')
    issues.extend(
        {
            'id': art_id,
            'title': title[:50],
            'issue': 'Missing Burmese translation',
            'details': 'No or very short Burmese content',
        }
        for art_id, title in cur.fetchall()
    )
    # Issue 3: Very short articles (< 500 chars)
    cur.execute('''
        SELECT id, title, LENGTH(content) as len
        FROM articles
        WHERE status = 'published'
        AND LENGTH(content) < 500
        LIMIT 10
    ''')
    issues.extend(
        {
            'id': art_id,
            'title': title[:50],
            'issue': 'Article too short',
            'details': f'Only {length} chars',
        }
        for art_id, title, length in cur.fetchall()
    )
    cur.close()
    conn.close()
    return issues
def get_article_details(article_id: int):
    """Get detailed info about an article.

    Returns a dict of metadata plus 200-character previews of both language
    versions, or None when the article does not exist.
    """
    conn = get_connection()
    cur = conn.cursor()
    cur.execute('''
        SELECT id, title, title_burmese, slug, status,
               LENGTH(content) as content_len,
               LENGTH(content_burmese) as burmese_len,
               category_id, author, reading_time,
               published_at, view_count, created_at, updated_at,
               LEFT(content, 200) as content_preview,
               LEFT(content_burmese, 200) as burmese_preview
        FROM articles
        WHERE id = %s
    ''', (article_id,))
    row = cur.fetchone()
    if not row:
        # BUG FIX: the not-found path previously returned without closing
        # the cursor/connection, leaking them.
        cur.close()
        conn.close()
        return None
    # BUG FIX: LENGTH() yields SQL NULL (Python None) for NULL columns, which
    # previously raised TypeError in the ratio math for untranslated articles.
    content_len = row[5] or 0
    burmese_len = row[6] or 0
    article = {
        'id': row[0],
        'title': row[1],
        'title_burmese': row[2],
        'slug': row[3],
        'status': row[4],
        'content_length': content_len,
        'burmese_length': burmese_len,
        'translation_ratio': round(100.0 * burmese_len / content_len, 1) if content_len > 0 else 0,
        'category_id': row[7],
        'author': row[8],
        'reading_time': row[9],
        'published_at': row[10],
        'view_count': row[11] or 0,
        'created_at': row[12],
        'updated_at': row[13],
        'content_preview': row[14],
        'burmese_preview': row[15]
    }
    cur.close()
    conn.close()
    return article
def print_article_table(articles):
    """Print articles in a nice table format.

    Each item needs keys: id, title, status, views, content_len, burmese_len.
    BUG FIX: content_len/burmese_len come from SQL LENGTH() and are None for
    NULL columns; previously a None burmese_len raised TypeError. Missing or
    zero content length now shows 'N/A' for the translation ratio.
    """
    print()
    print("=" * 100)
    print(f"{'ID':<5} {'Title':<50} {'Status':<12} {'Views':<8} {'Ratio':<8}")
    print("-" * 100)
    for a in articles:
        content_len = a['content_len'] or 0
        burmese_len = a['burmese_len'] or 0
        ratio = f"{100.0 * burmese_len / content_len:.1f}%" if content_len > 0 else "N/A"
        print(f"{a['id']:<5} {a['title']:<50} {a['status']:<12} {a['views']:<8} {ratio:<8}")
    print("=" * 100)
    print()
def main():
    """Command-line entry point: parse arguments and dispatch to the helpers.

    Subcommands: list, unpublish, republish, delete (requires --confirm),
    find-problems, details.  With no command, prints help.
    """
    import argparse
    parser = argparse.ArgumentParser(description='Burmddit Admin Tools')
    subparsers = parser.add_subparsers(dest='command', help='Commands')
    # List command
    list_parser = subparsers.add_parser('list', help='List articles')
    list_parser.add_argument('--status', choices=['published', 'draft'], help='Filter by status')
    list_parser.add_argument('--limit', type=int, default=20, help='Number of articles')
    # Unpublish command
    unpublish_parser = subparsers.add_parser('unpublish', help='Unpublish an article')
    unpublish_parser.add_argument('article_id', type=int, help='Article ID')
    unpublish_parser.add_argument('--reason', default='Error/Quality issue', help='Reason for unpublishing')
    # Republish command
    republish_parser = subparsers.add_parser('republish', help='Republish an article')
    republish_parser.add_argument('article_id', type=int, help='Article ID')
    # Delete command
    delete_parser = subparsers.add_parser('delete', help='Delete an article permanently')
    delete_parser.add_argument('article_id', type=int, help='Article ID')
    delete_parser.add_argument('--confirm', action='store_true', help='Confirm deletion')
    # Find problems command
    subparsers.add_parser('find-problems', help='Find articles with issues')
    # Details command
    details_parser = subparsers.add_parser('details', help='Show article details')
    details_parser.add_argument('article_id', type=int, help='Article ID')
    args = parser.parse_args()
    # Configure logger: plain colored messages on stdout, no timestamps.
    logger.remove()
    logger.add(sys.stdout, format="<level>{message}</level>", level="INFO")
    if args.command == 'list':
        articles = list_articles(status=args.status, limit=args.limit)
        print_article_table(articles)
        print(f"Total: {len(articles)} articles")
    elif args.command == 'unpublish':
        unpublish_article(args.article_id, args.reason)
    elif args.command == 'republish':
        republish_article(args.article_id)
    elif args.command == 'delete':
        # Deletion is irreversible, so an explicit --confirm flag is required.
        if not args.confirm:
            logger.error("⚠️ Deletion requires --confirm flag to prevent accidents")
            return
        delete_article(args.article_id)
    elif args.command == 'find-problems':
        issues = find_problem_articles()
        if not issues:
            logger.info("✅ No issues found!")
        else:
            print()
            print("=" * 100)
            print(f"Found {len(issues)} potential issues:")
            print("-" * 100)
            for issue in issues:
                print(f"ID {issue['id']}: {issue['title']}")
                print(f" Issue: {issue['issue']}")
                print(f" Details: {issue['details']}")
                print()
            print("=" * 100)
            print()
            print("To unpublish an article: python3 admin_tools.py unpublish <ID>")
    elif args.command == 'details':
        article = get_article_details(args.article_id)
        if not article:
            logger.error(f"Article {args.article_id} not found")
            return
        print()
        print("=" * 80)
        print(f"Article {article['id']} Details")
        print("=" * 80)
        print(f"Title (EN): {article['title']}")
        print(f"Title (MM): {article['title_burmese']}")
        print(f"Slug: {article['slug']}")
        print(f"Status: {article['status']}")
        print(f"Author: {article['author']}")
        print(f"Published: {article['published_at']}")
        print(f"Views: {article['view_count']}")
        print()
        print(f"Content length: {article['content_length']} chars")
        print(f"Burmese length: {article['burmese_length']} chars")
        print(f"Translation ratio: {article['translation_ratio']}%")
        print()
        print("English preview:")
        print(article['content_preview'])
        print()
        print("Burmese preview:")
        print(article['burmese_preview'])
        print("=" * 80)
    else:
        # No subcommand supplied.
        parser.print_help()

View File

@@ -12,35 +12,19 @@ DATABASE_URL = os.getenv('DATABASE_URL', 'postgresql://localhost/burmddit')
ANTHROPIC_API_KEY = os.getenv('ANTHROPIC_API_KEY')
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') # Optional, for embeddings
# Scraping sources - 🔥 EXPANDED for more content!
# Scraping sources - 🔥 V2 UPDATED with working sources!
SOURCES = {
'medium': {
'enabled': True,
'tags': ['artificial-intelligence', 'machine-learning', 'chatgpt', 'ai-tools',
'generative-ai', 'deeplearning', 'prompt-engineering', 'ai-news'],
'url_pattern': 'https://medium.com/tag/{tag}/latest',
'articles_per_tag': 15 # Increased from 10
},
# WORKING SOURCES (tested 2026-02-26)
'techcrunch': {
'enabled': True,
'category': 'artificial-intelligence',
'url': 'https://techcrunch.com/category/artificial-intelligence/feed/',
'articles_limit': 30 # Increased from 20
},
'venturebeat': {
'enabled': True,
'url': 'https://venturebeat.com/category/ai/feed/',
'articles_limit': 25 # Increased from 15
'articles_limit': 30
},
'mit_tech_review': {
'enabled': True,
'url': 'https://www.technologyreview.com/feed/',
'filter_ai': True,
'articles_limit': 20 # Increased from 10
},
'theverge': {
'enabled': True,
'url': 'https://www.theverge.com/ai-artificial-intelligence/rss/index.xml',
'articles_limit': 20
},
'wired_ai': {
@@ -48,13 +32,100 @@ SOURCES = {
'url': 'https://www.wired.com/feed/tag/ai/latest/rss',
'articles_limit': 15
},
'arstechnica': {
# NEW HIGH-QUALITY SOURCES (Priority Tier 1)
'openai_blog': {
'enabled': True,
'url': 'https://openai.com/blog/rss/',
'articles_limit': 10
},
'huggingface': {
'enabled': True,
'url': 'https://huggingface.co/blog/feed.xml',
'articles_limit': 15
},
'google_ai': {
'enabled': True,
'url': 'http://googleaiblog.blogspot.com/atom.xml',
'articles_limit': 15
},
'marktechpost': {
'enabled': True,
'url': 'https://www.marktechpost.com/feed/',
'articles_limit': 25
},
'the_rundown_ai': {
'enabled': True,
'url': 'https://rss.beehiiv.com/feeds/2R3C6Bt5wj.xml',
'articles_limit': 10
},
'last_week_ai': {
'enabled': True,
'url': 'https://lastweekin.ai/feed',
'articles_limit': 10
},
'ai_news': {
'enabled': True,
'url': 'https://www.artificialintelligence-news.com/feed/rss/',
'articles_limit': 20
},
# NEW SOURCES (Priority Tier 2)
'kdnuggets': {
'enabled': True,
'url': 'https://www.kdnuggets.com/feed',
'articles_limit': 20
},
'the_decoder': {
'enabled': True,
'url': 'https://the-decoder.com/feed/',
'articles_limit': 20
},
'ai_business': {
'enabled': True,
'url': 'https://aibusiness.com/rss.xml',
'articles_limit': 15
},
'unite_ai': {
'enabled': True,
'url': 'https://www.unite.ai/feed/',
'articles_limit': 15
},
'simonwillison': {
'enabled': True,
'url': 'https://simonwillison.net/atom/everything/',
'articles_limit': 10
},
'latent_space': {
'enabled': True,
'url': 'https://www.latent.space/feed',
'articles_limit': 10
},
# BROKEN SOURCES (disabled temporarily)
'medium': {
'enabled': False, # Scraping broken
'tags': ['artificial-intelligence', 'machine-learning', 'chatgpt'],
'url_pattern': 'https://medium.com/tag/{tag}/latest',
'articles_per_tag': 15
},
'venturebeat': {
'enabled': False, # RSS feed empty
'url': 'https://venturebeat.com/category/ai/feed/',
'articles_limit': 25
},
'theverge': {
'enabled': False, # RSS feed empty
'url': 'https://www.theverge.com/ai-artificial-intelligence/rss/index.xml',
'articles_limit': 20
},
'arstechnica': {
'enabled': False, # Needs testing
'url': 'https://arstechnica.com/tag/artificial-intelligence/feed/',
'articles_limit': 15
},
'hackernews': {
'enabled': True,
'enabled': False, # Needs testing
'url': 'https://hnrss.org/newest?q=AI+OR+ChatGPT+OR+OpenAI',
'articles_limit': 30
}

90
backend/fix_article_50.py Executable file
View File

@@ -0,0 +1,90 @@
#!/usr/bin/env python3
"""
Re-translate article ID 50 which has broken/truncated translation
"""
import sys
from loguru import logger
from translator_v2 import BurmeseTranslator
import database
def fix_article(article_id: int):
    """Re-translate a specific article and store the new Burmese text.

    Returns True on success; False when the article is missing, has no
    content, or the new translation is still suspiciously short (< 30%
    of the original length).
    """
    logger.info(f"Fixing article {article_id}...")
    # Get article from database
    import psycopg2
    from dotenv import load_dotenv
    import os
    load_dotenv()
    conn = psycopg2.connect(os.getenv('DATABASE_URL'))
    cur = conn.cursor()
    try:
        cur.execute('''
            SELECT id, title, excerpt, content
            FROM articles
            WHERE id = %s
        ''', (article_id,))
        row = cur.fetchone()
        if not row:
            logger.error(f"Article {article_id} not found")
            return False
        article = {
            'id': row[0],
            'title': row[1],
            'excerpt': row[2],
            'content': row[3]
        }
        logger.info(f"Article: {article['title'][:50]}...")
        content_len = len(article['content'] or '')
        logger.info(f"Content length: {content_len} chars")
        # BUG FIX: guard against NULL/empty content, which previously caused
        # a ZeroDivisionError when computing the length ratio below.
        if content_len == 0:
            logger.error("Article has no content to translate")
            return False
        # Translate
        translator = BurmeseTranslator()
        translated = translator.translate_article(article)
        logger.info(f"Translation complete:")
        logger.info(f" Title Burmese: {len(translated['title_burmese'])} chars")
        logger.info(f" Excerpt Burmese: {len(translated['excerpt_burmese'])} chars")
        logger.info(f" Content Burmese: {len(translated['content_burmese'])} chars")
        # Validate: Burmese text is normally comparable in length to English.
        ratio = len(translated['content_burmese']) / content_len
        logger.info(f" Length ratio: {ratio:.2f} (should be 0.5-2.0)")
        if ratio < 0.3:
            logger.error("Translation still too short! Not updating.")
            return False
        # Update database
        cur.execute('''
            UPDATE articles
            SET title_burmese = %s,
                excerpt_burmese = %s,
                content_burmese = %s
            WHERE id = %s
        ''', (
            translated['title_burmese'],
            translated['excerpt_burmese'],
            translated['content_burmese'],
            article_id
        ))
        conn.commit()
        logger.info(f"✅ Article {article_id} updated successfully")
        return True
    finally:
        # BUG FIX: always release DB resources — the original leaked the
        # connection on the not-found and too-short early-return paths.
        cur.close()
        conn.close()
if __name__ == '__main__':
    # NOTE(review): config appears to be imported for its side effects
    # (environment/setup) — it is not referenced below; confirm.
    import config
    logger.add(sys.stdout, level="INFO")
    # Default to article 50 (the known-broken translation) when no ID is given.
    article_id = int(sys.argv[1]) if len(sys.argv) > 1 else 50
    fix_article(article_id)

View File

@@ -8,9 +8,9 @@ from loguru import logger
import config
# Import pipeline stages
from scraper import run_scraper
from scraper_v2 import run_scraper # Using improved v2 scraper
from compiler import run_compiler
from translator import run_translator
from translator_v2 import run_translator # Using improved v2 translator
from publisher import run_publisher
import database

271
backend/scraper_old.py Normal file
View File

@@ -0,0 +1,271 @@
# Web scraper for AI news sources
import requests
from bs4 import BeautifulSoup
import feedparser
from newspaper import Article
from datetime import datetime, timedelta
from typing import List, Dict, Optional
from loguru import logger
import time
import config
import database
class AINewsScraper:
    """Scrapes AI news articles from the sources configured in config.SOURCES.

    Two source kinds are supported: Medium tag pages (HTML scraping) and RSS
    feeds.  Full article text and media are extracted with newspaper3k and
    stored via database.insert_raw_article().
    """

    def __init__(self):
        # One shared HTTP session with a bot-identifying User-Agent.
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (compatible; BurmdditBot/1.0; +https://burmddit.vercel.app)'
        })

    def scrape_all_sources(self) -> int:
        """Scrape all enabled sources.

        Returns the number of articles actually inserted into the database.
        A failure in one source is logged and does not abort the others.
        """
        total_articles = 0
        for source_name, source_config in config.SOURCES.items():
            if not source_config.get('enabled', True):
                continue
            logger.info(f"Scraping {source_name}...")
            try:
                if source_name == 'medium':
                    articles = self.scrape_medium(source_config)
                elif 'url' in source_config:
                    articles = self.scrape_rss_feed(source_config)
                else:
                    logger.warning(f"Unknown source: {source_name}")
                    continue
                # Store articles in database.  A falsy article_id (presumably
                # a duplicate URL — TODO confirm) is not counted.
                for article in articles:
                    article_id = database.insert_raw_article(
                        url=article['url'],
                        title=article['title'],
                        content=article['content'],
                        author=article['author'],
                        published_date=article['published_date'],
                        source=source_name,
                        category_hint=article.get('category_hint')
                    )
                    if article_id:
                        total_articles += 1
                logger.info(f"Scraped {len(articles)} articles from {source_name}")
                time.sleep(config.RATE_LIMITS['delay_between_requests'])
            except Exception as e:
                logger.error(f"Error scraping {source_name}: {e}")
                continue
        logger.info(f"Total articles scraped: {total_articles}")
        return total_articles

    def scrape_medium(self, source_config: Dict) -> List[Dict]:
        """Scrape Medium articles by tags.

        For each configured tag, fetches the tag's "latest" page, takes the
        first link inside each <article> card, and extracts the full text.
        """
        articles = []
        for tag in source_config['tags']:
            try:
                url = source_config['url_pattern'].format(tag=tag)
                response = self.session.get(url, timeout=30)
                soup = BeautifulSoup(response.content, 'html.parser')
                # Medium's structure: find article cards
                article_elements = soup.find_all('article', limit=source_config['articles_per_tag'])
                for element in article_elements:
                    try:
                        # Extract article URL
                        link = element.find('a', href=True)
                        if not link:
                            continue
                        article_url = link['href']
                        if not article_url.startswith('http'):
                            # Relative links are rooted at medium.com.
                            article_url = 'https://medium.com' + article_url
                        # Use newspaper3k for full article extraction
                        article = self.extract_article_content(article_url)
                        if article:
                            article['category_hint'] = self.detect_category_from_text(
                                article['title'] + ' ' + article['content'][:500]
                            )
                            articles.append(article)
                    except Exception as e:
                        logger.error(f"Error parsing Medium article: {e}")
                        continue
                time.sleep(2)  # Rate limiting
            except Exception as e:
                logger.error(f"Error scraping Medium tag '{tag}': {e}")
                continue
        return articles

    def scrape_rss_feed(self, source_config: Dict) -> List[Dict]:
        """Scrape articles from an RSS feed, optionally filtering to AI topics."""
        articles = []
        try:
            feed = feedparser.parse(source_config['url'])
            for entry in feed.entries[:source_config.get('articles_limit', 20)]:
                try:
                    # Check if AI-related (if filter enabled)
                    if source_config.get('filter_ai') and not self.is_ai_related(entry.title + ' ' + entry.get('summary', '')):
                        continue
                    article_url = entry.link
                    article = self.extract_article_content(article_url)
                    if article:
                        article['category_hint'] = self.detect_category_from_text(
                            article['title'] + ' ' + article['content'][:500]
                        )
                        articles.append(article)
                except Exception as e:
                    logger.error(f"Error parsing RSS entry: {e}")
                    continue
        except Exception as e:
            logger.error(f"Error fetching RSS feed: {e}")
        return articles

    def extract_article_content(self, url: str) -> Optional[Dict]:
        """Extract full article content using newspaper3k.

        Returns a dict (text, author, date, images, videos), or None when the
        article is shorter than 500 chars, older than 2 days, or extraction
        fails.
        """
        try:
            article = Article(url)
            article.download()
            article.parse()
            # Skip if article is too short
            if len(article.text) < 500:
                logger.debug(f"Article too short, skipping: {url}")
                return None
            # Parse publication date; fall back to "now" when the source
            # does not expose one.
            pub_date = article.publish_date
            if not pub_date:
                pub_date = datetime.now()
            # Skip old articles (older than 2 days)
            # NOTE(review): if publish_date is timezone-aware, subtracting it
            # from naive datetime.now() raises TypeError (caught below) — confirm.
            if datetime.now() - pub_date > timedelta(days=2):
                logger.debug(f"Article too old, skipping: {url}")
                return None
            # Extract images: top image first, then article images up to the
            # configured cap, de-duplicated.
            images = []
            if article.top_image:
                images.append(article.top_image)
            # Get additional images from article
            for img in article.images[:config.PUBLISHING['max_images_per_article']]:
                if img and img not in images:
                    images.append(img)
            # Extract videos (YouTube, etc.)
            videos = []
            if article.movies:
                videos = list(article.movies)
            # Also check for YouTube embeds in HTML
            try:
                from bs4 import BeautifulSoup
                soup = BeautifulSoup(article.html, 'html.parser')
                # Find YouTube iframes
                for iframe in soup.find_all('iframe'):
                    src = iframe.get('src', '')
                    if 'youtube.com' in src or 'youtu.be' in src:
                        videos.append(src)
                # Find more images
                for img in soup.find_all('img')[:10]:
                    img_src = img.get('src', '')
                    if img_src and img_src not in images and len(images) < config.PUBLISHING['max_images_per_article']:
                        # Filter out tiny images (likely icons/ads):
                        # keep when width is absent/non-numeric or > 200px.
                        width = img.get('width', 0)
                        if not width or (isinstance(width, str) and not width.isdigit()) or int(str(width)) > 200:
                            images.append(img_src)
            except Exception as e:
                # Media extraction is best-effort; the article is kept anyway.
                logger.debug(f"Error extracting additional media: {e}")
            return {
                'url': url,
                'title': article.title or 'Untitled',
                'content': article.text,
                'author': ', '.join(article.authors) if article.authors else 'Unknown',
                'published_date': pub_date,
                'top_image': article.top_image,
                'images': images,  # 🔥 Multiple images!
                'videos': videos  # 🔥 Video embeds!
            }
        except Exception as e:
            logger.error(f"Error extracting article from {url}: {e}")
            return None

    def is_ai_related(self, text: str) -> bool:
        """Return True when the text contains any AI keyword (case-insensitive).

        NOTE(review): short keywords like 'ai'/'ml' match as substrings of
        other words (e.g. 'email'), so this over-matches — confirm intent.
        """
        ai_keywords = [
            'artificial intelligence', 'ai', 'machine learning', 'ml',
            'deep learning', 'neural network', 'chatgpt', 'gpt', 'llm',
            'claude', 'openai', 'anthropic', 'transformer', 'nlp',
            'generative ai', 'automation', 'computer vision'
        ]
        text_lower = text.lower()
        return any(keyword in text_lower for keyword in ai_keywords)

    def detect_category_from_text(self, text: str) -> Optional[str]:
        """Return the config.CATEGORY_KEYWORDS category with the most keyword
        hits in the text, or None when nothing matches."""
        text_lower = text.lower()
        scores = {}
        for category, keywords in config.CATEGORY_KEYWORDS.items():
            score = sum(1 for keyword in keywords if keyword in text_lower)
            scores[category] = score
        # NOTE(review): max() raises ValueError if CATEGORY_KEYWORDS is empty.
        if max(scores.values()) > 0:
            return max(scores, key=scores.get)
        return None
def run_scraper():
    """Run one full scrape cycle and record the outcome in the pipeline log.

    Returns the number of articles scraped, or 0 on failure.
    """
    logger.info("Starting scraper...")
    started = time.time()
    try:
        articles_count = AINewsScraper().scrape_all_sources()
        duration = int(time.time() - started)
        database.log_pipeline_stage(
            stage='crawl',
            status='completed',
            articles_processed=articles_count,
            duration=duration
        )
        logger.info(f"Scraper completed in {duration}s. Articles scraped: {articles_count}")
        return articles_count
    except Exception as e:
        # Record the failure so the pipeline dashboard reflects it.
        logger.error(f"Scraper failed: {e}")
        database.log_pipeline_stage(
            stage='crawl',
            status='failed',
            error_message=str(e)
        )
        return 0
if __name__ == '__main__':
    from loguru import logger  # NOTE(review): redundant — logger is already imported at module level
    # Write logs to the configured file with daily rotation.
    logger.add(config.LOG_FILE, rotation="1 day")
    run_scraper()

446
backend/scraper_v2.py Normal file
View File

@@ -0,0 +1,446 @@
# Web scraper v2 for AI news sources - ROBUST VERSION
# Multi-layer fallback extraction for maximum reliability
import requests
from bs4 import BeautifulSoup
import feedparser
from newspaper import Article
from datetime import datetime, timedelta
from typing import List, Dict, Optional
from loguru import logger
import time
import config
import database
from fake_useragent import UserAgent
import trafilatura
from readability import Document
import random
class AINewsScraper:
def __init__(self):
self.session = requests.Session()
self.ua = UserAgent()
self.update_headers()
# Success tracking
self.stats = {
'total_attempts': 0,
'total_success': 0,
'method_success': {
'newspaper': 0,
'trafilatura': 0,
'readability': 0,
'failed': 0
}
}
def update_headers(self):
"""Rotate user agent for each request"""
self.session.headers.update({
'User-Agent': self.ua.random,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
})
def scrape_all_sources(self) -> int:
"""Scrape all enabled sources"""
total_articles = 0
for source_name, source_config in config.SOURCES.items():
if not source_config.get('enabled', True):
logger.info(f"⏭️ Skipping {source_name} (disabled)")
continue
logger.info(f"🔍 Scraping {source_name}...")
try:
if source_name == 'medium':
articles = self.scrape_medium(source_config)
elif 'url' in source_config:
articles = self.scrape_rss_feed(source_name, source_config)
else:
logger.warning(f"⚠️ Unknown source type: {source_name}")
continue
# Store articles in database
stored_count = 0
for article in articles:
try:
article_id = database.insert_raw_article(
url=article['url'],
title=article['title'],
content=article['content'],
author=article['author'],
published_date=article['published_date'],
source=source_name,
category_hint=article.get('category_hint')
)
if article_id:
stored_count += 1
except Exception as e:
logger.debug(f"Failed to store article {article['url']}: {e}")
continue
total_articles += stored_count
logger.info(f"{source_name}: {stored_count}/{len(articles)} articles stored")
# Rate limiting
time.sleep(config.RATE_LIMITS['delay_between_requests'])
except Exception as e:
logger.error(f"❌ Error scraping {source_name}: {e}")
continue
# Log stats
logger.info(f"\n📊 Extraction Method Stats:")
logger.info(f" newspaper3k: {self.stats['method_success']['newspaper']}")
logger.info(f" trafilatura: {self.stats['method_success']['trafilatura']}")
logger.info(f" readability: {self.stats['method_success']['readability']}")
logger.info(f" failed: {self.stats['method_success']['failed']}")
logger.info(f" Success rate: {self.stats['total_success']}/{self.stats['total_attempts']} ({100*self.stats['total_success']//max(self.stats['total_attempts'],1)}%)")
logger.info(f"\n✅ Total articles scraped: {total_articles}")
return total_articles
def scrape_medium(self, source_config: Dict) -> List[Dict]:
"""Scrape Medium articles by tags"""
articles = []
for tag in source_config['tags']:
try:
url = source_config['url_pattern'].format(tag=tag)
self.update_headers()
response = self.session.get(url, timeout=30)
soup = BeautifulSoup(response.content, 'html.parser')
# Medium's structure: find article links
links = soup.find_all('a', href=True, limit=source_config['articles_per_tag'] * 3)
processed = 0
for link in links:
if processed >= source_config['articles_per_tag']:
break
article_url = link['href']
if not article_url.startswith('http'):
article_url = 'https://medium.com' + article_url
# Only process Medium article URLs
if 'medium.com' not in article_url or '?' in article_url:
continue
# Extract article content
article = self.extract_article_content(article_url)
if article and len(article['content']) > 500:
article['category_hint'] = self.detect_category_from_text(
article['title'] + ' ' + article['content'][:500]
)
articles.append(article)
processed += 1
logger.debug(f" Medium tag '{tag}': {processed} articles")
time.sleep(3) # Rate limiting for Medium
except Exception as e:
logger.error(f"Error scraping Medium tag '{tag}': {e}")
continue
return articles
def scrape_rss_feed(self, source_name: str, source_config: Dict) -> List[Dict]:
"""Scrape articles from RSS feed"""
articles = []
try:
# Parse RSS feed
feed = feedparser.parse(source_config['url'])
if not feed.entries:
logger.warning(f" No entries found in RSS feed")
return articles
max_articles = source_config.get('articles_limit', 20)
processed = 0
for entry in feed.entries:
if processed >= max_articles:
break
try:
# Check if AI-related (if filter enabled)
if source_config.get('filter_ai'):
text = entry.get('title', '') + ' ' + entry.get('summary', '')
if not self.is_ai_related(text):
continue
article_url = entry.link
# Extract full article
article = self.extract_article_content(article_url)
if article and len(article['content']) > 500:
article['category_hint'] = self.detect_category_from_text(
article['title'] + ' ' + article['content'][:500]
)
articles.append(article)
processed += 1
except Exception as e:
logger.debug(f"Failed to parse RSS entry: {e}")
continue
except Exception as e:
logger.error(f"Error fetching RSS feed: {e}")
return articles
def extract_article_content(self, url: str) -> Optional[Dict]:
"""
Extract article content using multi-layer fallback approach:
1. Try newspaper3k (fast but unreliable)
2. Fallback to trafilatura (reliable)
3. Fallback to readability-lxml (reliable)
4. Give up if all fail
"""
self.stats['total_attempts'] += 1
# Method 1: Try newspaper3k first (fast)
article = self._extract_with_newspaper(url)
if article:
self.stats['method_success']['newspaper'] += 1
self.stats['total_success'] += 1
return article
# Method 2: Fallback to trafilatura
article = self._extract_with_trafilatura(url)
if article:
self.stats['method_success']['trafilatura'] += 1
self.stats['total_success'] += 1
return article
# Method 3: Fallback to readability
article = self._extract_with_readability(url)
if article:
self.stats['method_success']['readability'] += 1
self.stats['total_success'] += 1
return article
# All methods failed
self.stats['method_success']['failed'] += 1
logger.debug(f"All extraction methods failed for: {url}")
return None
def _extract_with_newspaper(self, url: str) -> Optional[Dict]:
"""Method 1: Extract using newspaper3k"""
try:
article = Article(url)
article.download()
article.parse()
# Validation
if not article.text or len(article.text) < 500:
return None
# Check age
pub_date = article.publish_date or datetime.now()
if datetime.now() - pub_date > timedelta(days=3):
return None
# Extract images
images = []
if article.top_image:
images.append(article.top_image)
for img in article.images[:5]:
if img and img not in images:
images.append(img)
# Extract videos
videos = list(article.movies)[:3] if article.movies else []
return {
'url': url,
'title': article.title or 'Untitled',
'content': article.text,
'author': ', '.join(article.authors) if article.authors else 'Unknown',
'published_date': pub_date,
'top_image': article.top_image,
'images': images,
'videos': videos
}
except Exception as e:
logger.debug(f"newspaper3k failed for {url}: {e}")
return None
def _extract_with_trafilatura(self, url: str) -> Optional[Dict]:
"""Method 2: Extract using trafilatura"""
try:
# Download with custom headers
self.update_headers()
downloaded = trafilatura.fetch_url(url)
if not downloaded:
return None
# Extract content
content = trafilatura.extract(
downloaded,
include_comments=False,
include_tables=False,
no_fallback=False
)
if not content or len(content) < 500:
return None
# Extract metadata
metadata = trafilatura.extract_metadata(downloaded)
title = metadata.title if metadata and metadata.title else 'Untitled'
author = metadata.author if metadata and metadata.author else 'Unknown'
pub_date = metadata.date if metadata and metadata.date else datetime.now()
# Convert date string to datetime if needed
if isinstance(pub_date, str):
try:
pub_date = datetime.fromisoformat(pub_date.replace('Z', '+00:00'))
except:
pub_date = datetime.now()
# Extract images from HTML
images = []
try:
soup = BeautifulSoup(downloaded, 'html.parser')
for img in soup.find_all('img', limit=5):
src = img.get('src', '')
if src and src.startswith('http'):
images.append(src)
except:
pass
return {
'url': url,
'title': title,
'content': content,
'author': author,
'published_date': pub_date,
'top_image': images[0] if images else None,
'images': images,
'videos': []
}
except Exception as e:
logger.debug(f"trafilatura failed for {url}: {e}")
return None
def _extract_with_readability(self, url: str) -> Optional[Dict]:
    """Method 3: Extract using readability-lxml"""
    try:
        self.update_headers()
        response = self.session.get(url, timeout=30)
        if response.status_code != 200:
            return None
        # Readability isolates the main article HTML; BeautifulSoup then
        # flattens it to plain text, one line per node.
        doc = Document(response.text)
        soup = BeautifulSoup(doc.summary(), 'html.parser')
        text = soup.get_text(separator='\n', strip=True)
        if not text or len(text) < 500:
            return None
        # Prefer readability's title string; otherwise fall back to a
        # <title> tag in the summary, then to a placeholder.
        title = doc.title() or soup.find('title')
        if title and hasattr(title, 'text'):
            title = title.text
        elif not title:
            title = 'Untitled'
        # Collect up to five absolute image URLs from the extracted HTML.
        images = []
        for img_tag in soup.find_all('img', limit=5):
            candidate = img_tag.get('src', '')
            if candidate and candidate.startswith('http'):
                images.append(candidate)
        return {
            'url': url,
            'title': str(title),
            'content': text,
            'author': 'Unknown',
            'published_date': datetime.now(),
            'top_image': images[0] if images else None,
            'images': images,
            'videos': []
        }
    except Exception as e:
        logger.debug(f"readability failed for {url}: {e}")
        return None
def is_ai_related(self, text: str) -> bool:
    """Return True if *text* mentions an AI-related topic.

    Short keywords ('ai', 'ml', 'gpt', 'llm', 'nlp') are matched on word
    boundaries so that e.g. "rained", "email" or "html" no longer produce
    false positives; longer/multi-word keywords keep plain substring matching.
    """
    import re  # local import: keeps this method self-contained
    ai_keywords = [
        'artificial intelligence', 'ai', 'machine learning', 'ml',
        'deep learning', 'neural network', 'chatgpt', 'gpt', 'llm',
        'claude', 'openai', 'anthropic', 'transformer', 'nlp',
        'generative ai', 'automation', 'computer vision', 'gemini',
        'copilot', 'ai model', 'training data', 'algorithm'
    ]
    text_lower = text.lower()
    for keyword in ai_keywords:
        if ' ' in keyword or len(keyword) > 4:
            # Long / multi-word terms are unambiguous as substrings.
            if keyword in text_lower:
                return True
        elif re.search(r'\b' + re.escape(keyword) + r'\b', text_lower):
            # Word-boundary match for short tokens to avoid false hits.
            return True
    return False
def detect_category_from_text(self, text: str) -> Optional[str]:
    """Return the best-matching category for *text*, or None.

    Scores each category in config.CATEGORY_KEYWORDS by counting keyword
    substring hits; ties resolve to the first maximum. Returns None when
    nothing matches. Fixed: max() on an empty dict raised ValueError when
    no categories were configured.
    """
    text_lower = text.lower()
    scores = {
        category: sum(1 for keyword in keywords if keyword in text_lower)
        for category, keywords in config.CATEGORY_KEYWORDS.items()
    }
    if scores and max(scores.values()) > 0:
        return max(scores, key=scores.get)
    return None
def run_scraper():
    """Scrape every configured source and record the pipeline stage."""
    logger.info("🚀 Starting scraper v2...")
    started_at = time.time()
    try:
        count = AINewsScraper().scrape_all_sources()
        elapsed = int(time.time() - started_at)
        database.log_pipeline_stage(
            stage='crawl',
            status='completed',
            articles_processed=count,
            duration=elapsed
        )
        logger.info(f"✅ Scraper completed in {elapsed}s. Articles scraped: {count}")
        return count
    except Exception as e:
        # Any failure is logged to the pipeline table and reported as zero articles.
        logger.error(f"❌ Scraper failed: {e}")
        database.log_pipeline_stage(
            stage='crawl',
            status='failed',
            error_message=str(e)
        )
        return 0
if __name__ == '__main__':
    # CLI entry point: add daily-rotating file logging, then run one scrape pass.
    # NOTE(review): `logger` is already imported at module level above —
    # this re-import looks redundant; confirm before removing.
    from loguru import logger
    logger.add(config.LOG_FILE, rotation="1 day")
    run_scraper()

152
backend/test_scraper.py Executable file
View File

@@ -0,0 +1,152 @@
#!/usr/bin/env python3
"""
Test individual sources with the new scraper
Usage: python3 test_scraper.py [--source SOURCE_NAME] [--limit N]
"""
import sys
import argparse
from loguru import logger
import config
# Import the new scraper
from scraper_v2 import AINewsScraper
def test_source(source_name: str, limit: int = 5):
    """Smoke-test a single configured source.

    Args:
        source_name: Key into config.SOURCES.
        limit: Maximum number of articles to attempt.

    Returns:
        True when the source extracted at least one article (>=50% of
        `limit` counts as full success), False when the source is
        unknown, of an unsupported type, or extracted nothing.
    """
    if source_name not in config.SOURCES:
        logger.error(f"❌ Unknown source: {source_name}")
        logger.info(f"Available sources: {', '.join(config.SOURCES.keys())}")
        return False
    source_config = config.SOURCES[source_name]
    logger.info(f"🧪 Testing source: {source_name}")
    logger.info(f" Config: {source_config}")
    logger.info(f" Limit: {limit} articles")
    logger.info("")
    scraper = AINewsScraper()
    articles = []
    try:
        if source_name == 'medium':
            # Medium is tag-based: test only the first configured tag.
            test_config = source_config.copy()
            test_config['tags'] = [source_config['tags'][0]]
            test_config['articles_per_tag'] = limit
            articles = scraper.scrape_medium(test_config)
        elif 'url' in source_config:
            # Any source with a 'url' key is treated as an RSS feed.
            test_config = source_config.copy()
            test_config['articles_limit'] = limit
            articles = scraper.scrape_rss_feed(source_name, test_config)
        else:
            # Fixed: was an f-string with no placeholders.
            logger.error("❌ Unknown source type")
            return False
        # Print results
        logger.info(f"\n✅ Test completed!")
        logger.info(f" Articles extracted: {len(articles)}")
        # Per-method extraction stats come from AINewsScraper.stats.
        logger.info(f"\n📊 Extraction stats:")
        logger.info(f" newspaper3k: {scraper.stats['method_success']['newspaper']}")
        logger.info(f" trafilatura: {scraper.stats['method_success']['trafilatura']}")
        logger.info(f" readability: {scraper.stats['method_success']['readability']}")
        logger.info(f" failed: {scraper.stats['method_success']['failed']}")
        if articles:
            logger.info(f"\n📰 Sample article:")
            sample = articles[0]
            logger.info(f" Title: {sample['title'][:80]}...")
            logger.info(f" Author: {sample['author']}")
            logger.info(f" URL: {sample['url']}")
            logger.info(f" Content length: {len(sample['content'])} chars")
            logger.info(f" Images: {len(sample.get('images', []))}")
            logger.info(f" Date: {sample['published_date']}")
            # Show first 200 chars of content
            logger.info(f"\n Content preview:")
            logger.info(f" {sample['content'][:200]}...")
        success_rate = len(articles) / scraper.stats['total_attempts'] if scraper.stats['total_attempts'] > 0 else 0
        logger.info(f"\n{'='*60}")
        if len(articles) >= limit * 0.5:  # At least 50% success
            logger.info(f"✅ SUCCESS: {source_name} is working ({success_rate:.0%} success rate)")
            return True
        elif len(articles) > 0:
            logger.info(f"⚠️ PARTIAL: {source_name} is partially working ({success_rate:.0%} success rate)")
            return True
        else:
            logger.info(f"❌ FAILED: {source_name} is not working")
            return False
    except Exception as e:
        logger.error(f"❌ Test failed with error: {e}")
        import traceback
        traceback.print_exc()
        return False
def test_all_sources():
    """Test every enabled source and print a pass/fail summary.

    Returns:
        Dict mapping source name -> bool (True when test_source passed).
        Disabled sources are skipped and omitted from the result.
    """
    logger.info("🧪 Testing all enabled sources...\n")
    results = {}
    for source_name, source_config in config.SOURCES.items():
        if not source_config.get('enabled', True):
            logger.info(f"⏭️ Skipping {source_name} (disabled)\n")
            continue
        success = test_source(source_name, limit=3)
        results[source_name] = success
        logger.info("")
    # Summary
    logger.info(f"\n{'='*60}")
    logger.info("📊 TEST SUMMARY")
    logger.info(f"{'='*60}")
    working = [k for k, v in results.items() if v]
    broken = [k for k, v in results.items() if not v]
    logger.info(f"\n✅ Working sources ({len(working)}):")
    for source in working:
        logger.info(f"{source}")
    if broken:
        logger.info(f"\n❌ Broken sources ({len(broken)}):")
        for source in broken:
            logger.info(f"{source}")
    # Fixed: when every source is disabled `results` is empty and the
    # percentage line divided by zero.
    if results:
        logger.info(f"\n📈 Overall: {len(working)}/{len(results)} sources working ({100*len(working)//len(results)}%)")
    else:
        logger.info("\n📈 Overall: no enabled sources were tested")
    return results
def main():
    """Parse CLI flags and dispatch: --all, --source NAME [--limit N], or help."""
    parser = argparse.ArgumentParser(description='Test burmddit scraper sources')
    parser.add_argument('--source', type=str, help='Test specific source')
    parser.add_argument('--limit', type=int, default=5, help='Number of articles to test (default: 5)')
    parser.add_argument('--all', action='store_true', help='Test all sources')
    args = parser.parse_args()
    # Configure logger: message-only output to stdout at INFO level.
    logger.remove()
    logger.add(sys.stdout, format="<level>{message}</level>", level="INFO")
    if args.all:
        test_all_sources()
    elif args.source:
        # Exit status mirrors the single-source test result.
        success = test_source(args.source, args.limit)
        sys.exit(0 if success else 1)
    else:
        # No flags: show help plus the configured source list.
        parser.print_help()
        logger.info("\nAvailable sources:")
        for source_name in config.SOURCES.keys():
            # NOTE(review): both branches are empty strings, making this
            # conditional a no-op — the ✅/❌ markers appear to have been
            # lost; confirm the intended values.
            enabled = "" if config.SOURCES[source_name].get('enabled', True) else ""
            logger.info(f" {enabled} {source_name}")
if __name__ == '__main__':
    # CLI entry point; see main() for supported flags.
    main()

255
backend/translator_old.py Normal file
View File

@@ -0,0 +1,255 @@
# Burmese translation module using Claude
from typing import Dict, Optional
from loguru import logger
import anthropic
import re
import config
import time
class BurmeseTranslator:
    """Legacy Claude-based English→Burmese article translator.

    NOTE(review): superseded by the version in translator_v2.py (which
    adds repetition detection and output validation); presumably kept
    for rollback — confirm before deleting.
    """

    def __init__(self):
        # Anthropic API client plus the list of terms to leave untranslated.
        self.client = anthropic.Anthropic(api_key=config.ANTHROPIC_API_KEY)
        self.preserve_terms = config.TRANSLATION['preserve_terms']

    def translate_article(self, article: Dict) -> Dict:
        """Translate a compiled article's title, excerpt and content.

        Returns the article dict extended with *_burmese keys; on any
        error the original English strings are used as fallback values.
        """
        logger.info(f"Translating article: {article['title'][:50]}...")
        try:
            # Translate title
            title_burmese = self.translate_text(
                text=article['title'],
                context="This is an article title about AI technology"
            )
            # Translate excerpt
            excerpt_burmese = self.translate_text(
                text=article['excerpt'],
                context="This is a brief article summary"
            )
            # Translate main content (in chunks if too long)
            content_burmese = self.translate_long_text(article['content'])
            # Return article with Burmese translations
            return {
                **article,
                'title_burmese': title_burmese,
                'excerpt_burmese': excerpt_burmese,
                'content_burmese': content_burmese
            }
        except Exception as e:
            logger.error(f"Translation error: {e}")
            # Fallback: return original text if translation fails
            return {
                **article,
                'title_burmese': article['title'],
                'excerpt_burmese': article['excerpt'],
                'content_burmese': article['content']
            }

    def translate_text(self, text: str, context: str = "") -> str:
        """Translate one text block to casual Burmese via the Claude API.

        Falls back to returning *text* unchanged on any API error.
        """
        # Build preserved terms list for this text
        preserved_terms_str = ", ".join(self.preserve_terms)
        prompt = f"""Translate the following English text to Burmese (Myanmar Unicode) in a CASUAL, EASY-TO-READ style.
🎯 CRITICAL GUIDELINES:
1. Write in **CASUAL, CONVERSATIONAL Burmese** - like talking to a friend over tea
2. Use **SIMPLE, EVERYDAY words** - avoid formal or academic language
3. Explain technical concepts in **LAYMAN TERMS** - as if explaining to your grandmother
4. Keep these terms in English: {preserved_terms_str}
5. Add **brief explanations** in parentheses for complex terms
6. Use **short sentences** - easy to read on mobile
7. Break up long paragraphs - white space is good
8. Keep markdown formatting (##, **, -, etc.) intact
TARGET AUDIENCE: General Myanmar public who are curious about AI but not tech experts
TONE: Friendly, approachable, informative but not boring
EXAMPLE STYLE:
❌ Bad (too formal): "ယခု နည်းပညာသည် ဉာဏ်ရည်တု ဖြစ်စဉ်များကို အသုံးပြုပါသည်"
✅ Good (casual): "ဒီနည်းပညာက AI (အထက်တန်းကွန်ပျူတာဦးနှောက်) ကို သုံးတာပါ"
Context: {context}
Text to translate:
{text}
Casual, easy-to-read Burmese translation:"""
        try:
            message = self.client.messages.create(
                model=config.TRANSLATION['model'],
                max_tokens=config.TRANSLATION['max_tokens'],
                temperature=config.TRANSLATION['temperature'],
                messages=[{"role": "user", "content": prompt}]
            )
            translated = message.content[0].text.strip()
            # Post-process: ensure Unicode and clean up
            translated = self.post_process_translation(translated)
            return translated
        except Exception as e:
            logger.error(f"API translation error: {e}")
            return text  # Fallback to original

    def translate_long_text(self, text: str, chunk_size: int = 2000) -> str:
        """Translate long text in chunks to stay within token limits."""
        # If text is short enough, translate directly
        if len(text) < chunk_size:
            return self.translate_text(text, context="This is the main article content")
        # Split into paragraphs
        paragraphs = text.split('\n\n')
        # Group paragraphs into chunks of roughly chunk_size characters
        chunks = []
        current_chunk = ""
        for para in paragraphs:
            if len(current_chunk) + len(para) < chunk_size:
                current_chunk += para + '\n\n'
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = para + '\n\n'
        if current_chunk:
            chunks.append(current_chunk.strip())
        logger.info(f"Translating {len(chunks)} chunks...")
        # Translate each chunk sequentially
        translated_chunks = []
        for i, chunk in enumerate(chunks):
            logger.debug(f"Translating chunk {i+1}/{len(chunks)}")
            translated = self.translate_text(
                chunk,
                context=f"This is part {i+1} of {len(chunks)} of a longer article"
            )
            translated_chunks.append(translated)
            time.sleep(0.5)  # Rate limiting
        # Join chunks back into a single document
        return '\n\n'.join(translated_chunks)

    def post_process_translation(self, text: str) -> str:
        """Clean up and validate translation."""
        # Collapse runs of 3+ newlines down to a single blank line
        text = re.sub(r'(\n{3,})', '\n\n', text)
        # Ensure proper spacing after Burmese punctuation (။ and ၊)
        text = re.sub(r'([။၊])([^\s])', r'\1 \2', text)
        # Preserve preserved terms (fix any that got translated)
        for term in self.preserve_terms:
            # If the term appears in a weird form, try to fix it
            # (This is a simple check; more sophisticated matching could be added)
            if term not in text and term.lower() in text.lower():
                text = re.sub(re.escape(term.lower()), term, text, flags=re.IGNORECASE)
        return text.strip()

    def validate_burmese_text(self, text: str) -> bool:
        """Check if text contains valid Burmese Unicode."""
        # Myanmar Unicode range: U+1000 to U+109F
        burmese_pattern = re.compile(r'[\u1000-\u109F]')
        return bool(burmese_pattern.search(text))
def run_translator(compiled_articles: list) -> list:
    """Translate every compiled article to Burmese, logging the pipeline stage."""
    total = len(compiled_articles)
    logger.info(f"Starting translator for {total} articles...")
    started = time.time()
    try:
        translator = BurmeseTranslator()
        results = []
        for idx, article in enumerate(compiled_articles, 1):
            logger.info(f"Translating article {idx}/{total}")
            try:
                item = translator.translate_article(article)
                # The article is kept either way; validation only picks the log level.
                if translator.validate_burmese_text(item['content_burmese']):
                    results.append(item)
                    logger.info(f"✓ Translation successful for article {idx}")
                else:
                    logger.warning(f"✗ Translation validation failed for article {idx}")
                    results.append(item)
                time.sleep(1)  # Rate limiting
            except Exception as e:
                logger.error(f"Error translating article {idx}: {e}")
                continue
        elapsed = int(time.time() - started)
        from database import log_pipeline_stage
        log_pipeline_stage(
            stage='translate',
            status='completed',
            articles_processed=len(results),
            duration=elapsed
        )
        logger.info(f"Translator completed in {elapsed}s. Articles translated: {len(results)}")
        return results
    except Exception as e:
        logger.error(f"Translator failed: {e}")
        from database import log_pipeline_stage
        log_pipeline_stage(
            stage='translate',
            status='failed',
            error_message=str(e)
        )
        return []
if __name__ == '__main__':
    # Manual smoke test: translates a hard-coded sample article.
    # Requires a valid ANTHROPIC_API_KEY (translate_text calls the live API).
    from loguru import logger
    logger.add(config.LOG_FILE, rotation="1 day")
    # Test translation
    test_article = {
        'title': 'OpenAI Releases GPT-5: A New Era of AI',
        'excerpt': 'OpenAI today announced GPT-5, the next generation of their language model.',
        'content': '''OpenAI has officially released GPT-5, marking a significant milestone in artificial intelligence development.
## Key Features
The new model includes:
- 10x more parameters than GPT-4
- Better reasoning capabilities
- Multimodal support for video
- Reduced hallucinations
CEO Sam Altman said, "GPT-5 represents our most advanced AI system yet."
The model will be available to ChatGPT Plus subscribers starting next month.'''
    }
    translator = BurmeseTranslator()
    translated = translator.translate_article(test_article)
    print("\n=== ORIGINAL ===")
    print(f"Title: {translated['title']}")
    print(f"\nContent: {translated['content'][:200]}...")
    print("\n=== BURMESE ===")
    print(f"Title: {translated['title_burmese']}")
    print(f"\nContent: {translated['content_burmese'][:200]}...")

352
backend/translator_v2.py Normal file
View File

@@ -0,0 +1,352 @@
# Improved Burmese translation module with better error handling
from typing import Dict, Optional
from loguru import logger
import anthropic
import re
import config
import time
class BurmeseTranslator:
    """Claude-backed English→Burmese article translator (v2).

    Improvements over the legacy translator: smaller translation chunks,
    detection of repetitive "loop" output with an automatic retry, and
    validation of the finished translation.
    """

    def __init__(self):
        # Anthropic API client; preserve_terms stay in English in the output.
        self.client = anthropic.Anthropic(api_key=config.ANTHROPIC_API_KEY)
        self.preserve_terms = config.TRANSLATION['preserve_terms']

    def translate_article(self, article: Dict) -> Dict:
        """Translate title, excerpt and content of a compiled article.

        Returns the article dict extended with the *_burmese keys; on any
        error the original English strings are used as fallbacks.
        """
        logger.info(f"Translating article: {article['title'][:50]}...")
        try:
            # Translate title
            title_burmese = self.translate_text(
                text=article['title'],
                context="This is an article title about AI technology",
                max_length=200
            )
            # Translate excerpt
            excerpt_burmese = self.translate_text(
                text=article['excerpt'],
                context="This is a brief article summary",
                max_length=300
            )
            # Translate main content with improved chunking
            content_burmese = self.translate_long_text(
                article['content'],
                chunk_size=1200  # Reduced from 2000 for safety
            )
            # Validate translation quality
            if not self.validate_translation(content_burmese, article['content']):
                logger.warning("Translation validation failed, using fallback")
                # Try again with smaller chunks
                content_burmese = self.translate_long_text(
                    article['content'],
                    chunk_size=800  # Even smaller
                )
            # Return article with Burmese translations
            return {
                **article,
                'title_burmese': title_burmese,
                'excerpt_burmese': excerpt_burmese,
                'content_burmese': content_burmese
            }
        except Exception as e:
            logger.error(f"Translation error: {e}")
            # Fallback: return original text if translation fails
            return {
                **article,
                'title_burmese': article['title'],
                'excerpt_burmese': article['excerpt'],
                'content_burmese': article['content']
            }

    def translate_text(self, text: str, context: str = "", max_length: Optional[int] = None) -> str:
        """Translate a text block to Burmese with improved prompting.

        Args:
            text: English source text.
            context: Short hint inserted into the prompt.
            max_length: Optional word budget mentioned in the prompt
                (guidance only; not enforced on the response).

        Returns:
            The post-processed Burmese text, or *text* unchanged on API error.
        """
        # Build preserved terms list
        preserved_terms_str = ", ".join(self.preserve_terms)
        # Add length guidance if specified
        length_guidance = ""
        if max_length:
            length_guidance = f"\n⚠️ IMPORTANT: Keep translation under {max_length} words. Be concise."
        prompt = f"""Translate the following English text to Burmese (Myanmar Unicode) in a CASUAL, EASY-TO-READ style.
🎯 CRITICAL GUIDELINES:
1. Write in **CASUAL, CONVERSATIONAL Burmese** - like talking to a friend
2. Use **SIMPLE, EVERYDAY words** - avoid formal or academic language
3. Explain technical concepts in **LAYMAN TERMS**
4. Keep these terms in English: {preserved_terms_str}
5. Add **brief explanations** in parentheses for complex terms
6. Use **short sentences** - easy to read on mobile
7. Break up long paragraphs - white space is good
8. Keep markdown formatting (##, **, -, etc.) intact{length_guidance}
🚫 CRITICAL: DO NOT REPEAT TEXT OR GET STUCK IN LOOPS!
- If you start repeating, STOP immediately
- Translate fully but concisely
- Each sentence should be unique
TARGET AUDIENCE: General Myanmar public curious about AI
Context: {context}
Text to translate:
{text}
Burmese translation (natural, concise, no repetitions):"""
        try:
            message = self.client.messages.create(
                model=config.TRANSLATION['model'],
                max_tokens=min(config.TRANSLATION['max_tokens'], 3000),  # Cap at 3000
                temperature=config.TRANSLATION['temperature'],
                messages=[{"role": "user", "content": prompt}]
            )
            translated = message.content[0].text.strip()
            # Post-process and validate
            translated = self.post_process_translation(translated)
            # Check for hallucination/loops; retry once at low temperature.
            if self.detect_repetition(translated):
                logger.warning("Detected repetitive text, retrying with lower temperature")
                message = self.client.messages.create(
                    model=config.TRANSLATION['model'],
                    max_tokens=min(config.TRANSLATION['max_tokens'], 3000),
                    temperature=0.3,  # Lower temperature
                    messages=[{"role": "user", "content": prompt}]
                )
                translated = message.content[0].text.strip()
                translated = self.post_process_translation(translated)
            return translated
        except Exception as e:
            logger.error(f"API translation error: {e}")
            return text  # Fallback to original

    def translate_long_text(self, text: str, chunk_size: int = 1200) -> str:
        """Translate long text in chunks with better error handling.

        Chunks are paragraph groups of at most ~chunk_size characters;
        oversized paragraphs are further split on sentence boundaries.
        Chunks that fail translation fall back to their English text.
        """
        # If text is short enough, translate directly
        if len(text) < chunk_size:
            return self.translate_text(text, context="This is the main article content")
        logger.info(f"Article is {len(text)} chars, splitting into chunks...")
        # Split into paragraphs first
        paragraphs = text.split('\n\n')
        # Group paragraphs into chunks (more conservative sizing)
        chunks = []
        current_chunk = ""
        for para in paragraphs:
            # Check if adding this paragraph would exceed chunk size
            if len(current_chunk) + len(para) + 4 < chunk_size:  # +4 for \n\n
                if current_chunk:
                    current_chunk += '\n\n' + para
                else:
                    current_chunk = para
            else:
                # Current chunk is full, save it
                if current_chunk:
                    chunks.append(current_chunk.strip())
                # Start new chunk with this paragraph.
                # If paragraph itself is too long, split it further.
                if len(para) > chunk_size:
                    # Split long paragraph by sentences.
                    # NOTE(review): rejoining with '. ' appends a trailing
                    # '. ' to the final sentence of each oversized paragraph;
                    # harmless for translation input, but confirm.
                    sentences = para.split('. ')
                    temp_chunk = ""
                    for sent in sentences:
                        if len(temp_chunk) + len(sent) + 2 < chunk_size:
                            temp_chunk += sent + '. '
                        else:
                            if temp_chunk:
                                chunks.append(temp_chunk.strip())
                            temp_chunk = sent + '. '
                    current_chunk = temp_chunk
                else:
                    current_chunk = para
        # Don't forget the last chunk
        if current_chunk:
            chunks.append(current_chunk.strip())
        # Fixed: whitespace-only input (e.g. paragraphs that are all empty)
        # could leave zero chunks, making the avg-size log divide by zero.
        if not chunks:
            chunks = [text.strip()]
        logger.info(f"Split into {len(chunks)} chunks (avg {len(text)//len(chunks)} chars each)")
        # Translate each chunk with progress tracking
        translated_chunks = []
        failed_chunks = 0
        for i, chunk in enumerate(chunks):
            logger.info(f"Translating chunk {i+1}/{len(chunks)} ({len(chunk)} chars)...")
            try:
                translated = self.translate_text(
                    chunk,
                    context=f"This is part {i+1} of {len(chunks)} of a longer article"
                )
                # Validate chunk translation; one retry on detected loops.
                if self.detect_repetition(translated):
                    logger.warning(f"Chunk {i+1} has repetition, retrying...")
                    time.sleep(1)
                    translated = self.translate_text(
                        chunk,
                        context=f"This is part {i+1} of {len(chunks)} - translate fully without repetition"
                    )
                translated_chunks.append(translated)
                time.sleep(0.5)  # Rate limiting
            except Exception as e:
                logger.error(f"Failed to translate chunk {i+1}: {e}")
                failed_chunks += 1
                # Use original text as fallback for this chunk
                translated_chunks.append(chunk)
                time.sleep(1)
        if failed_chunks > 0:
            logger.warning(f"{failed_chunks}/{len(chunks)} chunks failed translation")
        # Join chunks
        result = '\n\n'.join(translated_chunks)
        logger.info(f"Translation complete: {len(result)} chars (original: {len(text)} chars)")
        return result

    def detect_repetition(self, text: str, threshold: int = 5) -> bool:
        """Heuristically detect looping/hallucinated output.

        Counts every 5-word sequence; flags the text when any sequence
        occurs at least `threshold` times. Texts shorter than 100 chars
        or 10 words are never flagged.
        """
        if len(text) < 100:
            return False
        # Check for repeated phrases (5+ words)
        words = text.split()
        if len(words) < 10:
            return False
        # Look for 5-word sequences that appear multiple times
        sequences = {}
        for i in range(len(words) - 4):
            seq = ' '.join(words[i:i+5])
            sequences[seq] = sequences.get(seq, 0) + 1
        # If any sequence appears `threshold`+ times, treat it as repetition.
        # (Fixed: the old comment said "3+ times", disagreeing with the
        # default threshold of 5; behavior is unchanged.)
        max_repetitions = max(sequences.values()) if sequences else 0
        if max_repetitions >= threshold:
            logger.warning(f"Detected repetition: {max_repetitions} occurrences")
            return True
        return False

    def validate_translation(self, translated: str, original: str) -> bool:
        """Validate translation quality (length, script, length ratio, loops)."""
        # Check 1: Not empty
        if not translated or len(translated) < 50:
            logger.warning("Translation too short")
            return False
        # Check 2: Has Burmese Unicode
        if not self.validate_burmese_text(translated):
            logger.warning("Translation missing Burmese text")
            return False
        # Check 3: Reasonable length ratio (translated should be 30-300% of original)
        # Fixed: an empty original previously raised ZeroDivisionError here.
        if not original:
            logger.warning("Original text empty, cannot validate length ratio")
            return False
        ratio = len(translated) / len(original)
        if ratio < 0.3 or ratio > 3.0:
            logger.warning(f"Translation length ratio suspicious: {ratio:.2f}")
            return False
        # Check 4: No repetition
        if self.detect_repetition(translated):
            logger.warning("Translation has repetitive patterns")
            return False
        return True

    def post_process_translation(self, text: str) -> str:
        """Clean up translated text (whitespace and Burmese punctuation spacing)."""
        # Remove excessive newlines
        text = re.sub(r'(\n{3,})', '\n\n', text)
        # Remove leading/trailing whitespace from each line
        lines = [line.strip() for line in text.split('\n')]
        text = '\n'.join(lines)
        # Ensure proper spacing after Burmese punctuation
        text = re.sub(r'([။၊])([^\s])', r'\1 \2', text)
        # Remove any accidental English remnants that shouldn't be there
        # (but preserve the terms we want to keep)
        return text.strip()

    def validate_burmese_text(self, text: str) -> bool:
        """Check if text contains valid Burmese Unicode."""
        # Myanmar Unicode range: U+1000 to U+109F
        burmese_pattern = re.compile(r'[\u1000-\u109F]')
        return bool(burmese_pattern.search(text))
def run_translator(compiled_articles: list) -> list:
    """Translate a batch of compiled articles into Burmese.

    Articles that fail individually fall back to their English text;
    a wholesale failure returns the input list untouched.
    """
    total = len(compiled_articles)
    logger.info(f"Starting translator for {total} articles...")
    started = time.time()
    try:
        translator = BurmeseTranslator()
        output = []
        for idx, article in enumerate(compiled_articles, 1):
            logger.info(f"Translating article {idx}/{total}")
            try:
                output.append(translator.translate_article(article))
                logger.info(f"✓ Translation successful for article {idx}")
            except Exception as e:
                logger.error(f"Failed to translate article {idx}: {e}")
                # Add article with original English text as fallback
                fallback = dict(article)
                fallback['title_burmese'] = article['title']
                fallback['excerpt_burmese'] = article['excerpt']
                fallback['content_burmese'] = article['content']
                output.append(fallback)
        elapsed = int(time.time() - started)
        logger.info(f"Translator completed in {elapsed}s. Articles translated: {len(output)}")
        return output
    except Exception as e:
        logger.error(f"Translator failed: {e}")
        return compiled_articles  # Return originals as fallback
if __name__ == '__main__':
    # Manual smoke test: translates a synthetic article.
    # Requires a valid ANTHROPIC_API_KEY (translate_text hits the live API).
    test_article = {
        'title': 'Test Article About AI',
        'excerpt': 'This is a test excerpt about artificial intelligence.',
        'content': 'This is test content. ' * 100  # Long content
    }
    translator = BurmeseTranslator()
    result = translator.translate_article(test_article)
    print("Title:", result['title_burmese'])
    print("Excerpt:", result['excerpt_burmese'])
    print("Content length:", len(result['content_burmese']))