# forked from minzeyaphyo/burmddit
# Publisher module - Publishes translated articles to the website
import re
import time
from datetime import datetime, timedelta
from typing import List, Dict

from loguru import logger
from slugify import slugify

import config
import database
class ArticlePublisher:
    """Prepares translated articles and inserts them into the website database."""

    # Curated AI-related terms used for naive keyword extraction.
    # NOTE(review): replace with real NLP keyword extraction when available.
    _KEYWORD_CANDIDATES = [
        'AI', 'ChatGPT', 'GPT', 'OpenAI', 'Anthropic', 'Claude',
        'Machine Learning', 'Deep Learning', 'Neural Network',
        'LLM', 'Transformer', 'NLP', 'Computer Vision',
        'Automation', 'Generative AI'
    ]

    def __init__(self):
        pass

    def publish_articles(self, translated_articles: List[Dict]) -> int:
        """Publish translated articles to the website.

        Args:
            translated_articles: article dicts carrying at least 'title',
                'title_burmese', 'content' and 'content_burmese' keys.

        Returns:
            The number of articles successfully inserted.
        """
        published_count = 0

        for i, article in enumerate(translated_articles):
            try:
                logger.info(f"Publishing article {i+1}/{len(translated_articles)}: {article['title'][:50]}...")

                # Prepare article data
                article_data = self.prepare_article_for_publishing(article)

                # Insert into database; a falsy ID means the slug already
                # existed (ON CONFLICT DO NOTHING) or the insert failed.
                article_id = database.insert_article(**article_data)

                if article_id:
                    published_count += 1
                    logger.info(f"✓ Article published successfully (ID: {article_id})")
                    # TODO: mark the source raw articles as processed once
                    # raw-article IDs are tracked through the pipeline.
                else:
                    logger.warning("✗ Article already exists or failed to publish")

            except Exception as e:
                # One failed article must not abort the whole batch.
                logger.error(f"Error publishing article {i+1}: {e}")
                continue

        logger.info(f"Published {published_count}/{len(translated_articles)} articles")
        return published_count

    def prepare_article_for_publishing(self, article: Dict) -> Dict:
        """Prepare article data for database insertion."""
        # Generate slug from Burmese title (romanized) or English title
        slug = self.generate_slug(article.get('title_burmese', article['title']))

        # Fall back to truncated body text when no excerpt was provided;
        # the ellipsis is only appended when the text was actually cut.
        excerpt_burmese = article.get('excerpt_burmese') or self._truncate(article['content_burmese'], 200)
        excerpt = article.get('excerpt') or self._truncate(article['content'], 200)

        # Reading time is estimated from the Burmese body
        reading_time = self.calculate_reading_time(article['content_burmese'])

        # Detect category
        category_id = self.detect_category_id(article)

        # Meta description limited to the usual 160-char SEO budget
        meta_description = excerpt_burmese[:160]

        # Generate keywords from the Burmese title and body
        meta_keywords = self.extract_keywords(article['title_burmese'] + ' ' + article['content_burmese'])

        return {
            'title': article['title'],
            'title_burmese': article['title_burmese'],
            'slug': slug,
            'content': article['content'],
            'content_burmese': article['content_burmese'],
            'excerpt': excerpt,
            'excerpt_burmese': excerpt_burmese,
            'category_id': category_id,
            'featured_image': article.get('featured_image'),
            'images': article.get('images', []),  # multiple images
            'videos': article.get('videos', []),  # videos
            'source_articles': article.get('source_articles', []),
            'meta_description': meta_description,
            'meta_keywords': meta_keywords,
            'reading_time': reading_time,
            'status': config.PUBLISHING['status_default']
        }

    @staticmethod
    def _truncate(text: str, limit: int) -> str:
        """Cut `text` to `limit` chars, appending '...' only when actually cut."""
        return (text[:limit] + '...') if len(text) > limit else text

    def generate_slug(self, title: str) -> str:
        """Generate a URL-friendly slug (uniqueness is handled by the DB)."""
        # slugify transliterates/strips Unicode characters
        slug = slugify(title, max_length=100)

        # A purely non-ASCII (e.g. Burmese) title can slugify to '', so
        # fall back to a timestamp-based slug
        if not slug:
            slug = f"article-{int(time.time())}"

        # Database handles duplicate slugs with ON CONFLICT DO NOTHING
        return slug

    def calculate_reading_time(self, text: str) -> int:
        """Estimate reading time in whole minutes for Burmese text.

        Burmese reading speed is approximately 200-250 characters per minute
        (slower than English due to script complexity); always returns >= 1.
        """
        return max(1, round(len(text) / 225))

    def detect_category_id(self, article: Dict) -> int:
        """Return the category ID, preferring an explicit category hint."""
        if article.get('category_hint'):
            # Normalize the hint into slug form, e.g. 'Science & Tech' -> 'science-tech'
            category_slug = article['category_hint'].lower().replace(' & ', '-').replace(' ', '-')
            category = database.get_category_by_slug(category_slug)
            if category:
                return category['id']

        # Fall back to content-based detection
        return database.detect_category(
            article['title'] + ' ' + article.get('title_burmese', ''),
            article['content'][:500]
        )

    def extract_keywords(self, text: str, limit: int = 10) -> List[str]:
        """Return up to `limit` known AI terms that occur in `text`.

        Uses whole-word, case-insensitive matching so that short terms such
        as 'AI' do not false-positive inside ordinary words like 'said' or
        'main' (the previous lowercase substring check did exactly that).
        """
        found_keywords = []
        for keyword in self._KEYWORD_CANDIDATES:
            if re.search(r'\b' + re.escape(keyword) + r'\b', text, re.IGNORECASE):
                found_keywords.append(keyword)
                if len(found_keywords) >= limit:
                    break
        return found_keywords

    def schedule_publications(self, translated_articles: List[Dict]) -> int:
        """Schedule articles for staggered publication (future enhancement).

        Currently publishes everything immediately; a future version may use
        a publish-at timestamp to space out publications.
        """
        return self.publish_articles(translated_articles)
def run_publisher(translated_articles: List[Dict]) -> int:
    """Main publisher execution.

    Publishes every article in `translated_articles`, records the pipeline
    stage outcome in the database, and returns the number of articles
    published (0 when the whole run fails).
    """
    logger.info(f"Starting publisher for {len(translated_articles)} articles...")
    start_time = time.time()

    try:
        publisher = ArticlePublisher()
        published_count = publisher.publish_articles(translated_articles)

        duration = int(time.time() - start_time)
        database.log_pipeline_stage(
            stage='publish',
            status='completed',
            articles_processed=published_count,
            duration=duration
        )

        logger.info(f"Publisher completed in {duration}s. Articles published: {published_count}")
        return published_count

    except Exception as e:
        # Top-level boundary: logger.exception records the traceback,
        # which logger.error does not.
        logger.exception(f"Publisher failed: {e}")
        database.log_pipeline_stage(
            stage='publish',
            status='failed',
            error_message=str(e)
        )
        return 0
if __name__ == '__main__':
    # `logger` is already imported at module level; just configure file
    # logging for standalone runs (the redundant re-import was removed).
    logger.add(config.LOG_FILE, rotation="1 day")

    # Test with sample translated article
    test_article = {
        'title': 'OpenAI Releases GPT-5',
        'title_burmese': 'OpenAI က GPT-5 ကို ထုတ်ပြန်လိုက်ပြီ',
        'content': 'Full English content...',
        'content_burmese': 'OpenAI သည် ယနေ့ GPT-5 ကို တရားဝင် ထုတ်ပြန်လိုက်ပြီ ဖြစ်ပါသည်။...',
        'excerpt': 'OpenAI announces GPT-5...',
        'excerpt_burmese': 'OpenAI က GPT-5 ကို ကြေညာလိုက်ပါပြီ...',
        'source_articles': [{'url': 'https://example.com', 'title': 'Test', 'author': 'Test'}]
    }

    count = run_publisher([test_article])
    print(f"Published: {count}")
|