# Database connection and utilities
import psycopg2
from psycopg2.extras import RealDictCursor, Json
from contextlib import contextmanager
from typing import List, Dict, Optional, Tuple
from loguru import logger

import config


@contextmanager
def get_db_connection():
    """Yield a psycopg2 connection; commit on success, roll back on error.

    The connection is always closed on exit. Any exception is logged and
    re-raised so callers can decide how to handle it.
    """
    conn = None
    try:
        conn = psycopg2.connect(config.DATABASE_URL)
        yield conn
        conn.commit()
    except Exception as e:
        if conn:
            conn.rollback()
        logger.error(f"Database error: {e}")
        raise
    finally:
        if conn:
            conn.close()


def execute_query(query: str, params: Optional[tuple] = None, fetch: bool = False):
    """Execute a query and optionally fetch results.

    Returns all rows as a list of dicts (RealDictCursor) when fetch=True,
    otherwise the affected rowcount. Each call uses a fresh connection.
    """
    with get_db_connection() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(query, params)
            if fetch:
                return cur.fetchall()
            return cur.rowcount


# Raw articles functions

def insert_raw_article(url: str, title: str, content: str, author: str,
                       published_date, source: str,
                       category_hint: str = None) -> Optional[int]:
    """Insert a scraped article into raw_articles table.

    Returns the new row id, or None when the URL already exists
    (ON CONFLICT DO NOTHING) or on a database error (logged, not raised).
    """
    query = """
        INSERT INTO raw_articles (url, title, content, author, published_date, source, category_hint)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (url) DO NOTHING
        RETURNING id
    """
    try:
        result = execute_query(
            query,
            (url, title, content, author, published_date, source, category_hint),
            fetch=True
        )
        return result[0]['id'] if result else None
    except Exception as e:
        logger.error(f"Error inserting raw article: {e}")
        return None


def get_unprocessed_articles(limit: int = 100) -> List[Dict]:
    """Get unprocessed raw articles, newest first."""
    query = """
        SELECT * FROM raw_articles
        WHERE processed = FALSE
        ORDER BY published_date DESC
        LIMIT %s
    """
    return execute_query(query, (limit,), fetch=True)


def mark_article_processed(article_id: int, compiled_into: int = None):
    """Mark raw article as processed, optionally recording the compiled article id."""
    query = """
        UPDATE raw_articles
        SET processed = TRUE, compiled_into = %s
        WHERE id = %s
    """
    execute_query(query, (compiled_into, article_id))


# Categories functions

def get_all_categories() -> List[Dict]:
    """Get all categories ordered by id."""
    query = "SELECT * FROM categories ORDER BY id"
    return execute_query(query, fetch=True)


def get_category_by_slug(slug: str) -> Optional[Dict]:
    """Get a category row by slug, or None if not found."""
    query = "SELECT * FROM categories WHERE slug = %s"
    result = execute_query(query, (slug,), fetch=True)
    return result[0] if result else None


def detect_category(title: str, content: str) -> int:
    """Detect article category from keyword hits in title + content.

    Scores each category in config.CATEGORY_KEYWORDS by the number of
    keywords present in the lowercased text. Falls back to 'AI News'
    when nothing matches (or the keyword config is empty), and to
    category id 1 when the derived slug is not in the categories table.
    """
    text = (title + ' ' + content).lower()
    scores = {
        category: sum(1 for keyword in keywords if keyword in text)
        for category, keywords in config.CATEGORY_KEYWORDS.items()
    }

    # Guard against an empty keyword config (max() on an empty dict raises).
    if scores and max(scores.values()) > 0:
        best_category = max(scores, key=scores.get)
    else:
        # Default to AI News if no clear match
        best_category = 'AI News'

    # Slugify the category name the same way the schema stores it.
    category = get_category_by_slug(
        best_category.lower().replace(' & ', '-').replace(' ', '-')
    )
    return category['id'] if category else 1  # Default to first category


# Articles functions

def insert_article(title: str, title_burmese: str, slug: str,
                   content: str, content_burmese: str,
                   excerpt: str, excerpt_burmese: str,
                   category_id: int, featured_image: str = None,
                   images: List[str] = None,
                   videos: List[str] = None,
                   source_articles: List[Dict] = None,
                   meta_description: str = None,
                   meta_keywords: List[str] = None,
                   reading_time: int = None,
                   status: str = 'published') -> Optional[int]:
    """Insert a new article; return its id.

    published_at is set to CURRENT_TIMESTAMP only when status is
    'published' (the trailing duplicate `status` param feeds the CASE
    expression). Returns None on slug conflict or database error.
    """
    query = """
        INSERT INTO articles (
            title, title_burmese, slug, content, content_burmese,
            excerpt, excerpt_burmese, category_id, featured_image,
            images, videos,
            source_articles, meta_description, meta_keywords,
            reading_time, status, published_at
        ) VALUES (
            %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
            CASE WHEN %s = 'published' THEN CURRENT_TIMESTAMP ELSE NULL END
        )
        ON CONFLICT (slug) DO NOTHING
        RETURNING id
    """
    try:
        result = execute_query(
            query,
            (title, title_burmese, slug, content, content_burmese,
             excerpt, excerpt_burmese, category_id, featured_image,
             images or [],    # psycopg2 adapts Python lists to SQL arrays
             videos or [],
             Json(source_articles) if source_articles else None,
             meta_description, meta_keywords, reading_time,
             status, status),
            fetch=True
        )
        return result[0]['id'] if result else None
    except Exception as e:
        logger.error(f"Error inserting article: {e}")
        return None


def get_recent_articles(limit: int = 10) -> List[Dict]:
    """Get recently published articles (ordering comes from the view)."""
    query = """
        SELECT * FROM published_articles
        LIMIT %s
    """
    return execute_query(query, (limit,), fetch=True)


def get_article_by_slug(slug: str) -> Optional[Dict]:
    """Get a published article by slug with its category names, or None."""
    query = """
        SELECT a.*, c.name as category_name, c.name_burmese as category_name_burmese
        FROM articles a
        JOIN categories c ON a.category_id = c.id
        WHERE a.slug = %s AND a.status = 'published'
    """
    result = execute_query(query, (slug,), fetch=True)
    return result[0] if result else None


def increment_view_count(slug: str):
    """Increment article view count via the DB-side function."""
    query = "SELECT increment_view_count(%s)"
    execute_query(query, (slug,))


def get_trending_articles(days: int = 7, limit: int = 10) -> List[Dict]:
    """Get trending articles.

    NOTE(review): `days` is accepted but never passed to the SQL
    function — only `limit` reaches get_trending_articles(). Confirm
    the stored function's signature in the schema before wiring
    `days` through; until then it is silently ignored.
    """
    query = "SELECT * FROM get_trending_articles(%s)"
    return execute_query(query, (limit,), fetch=True)


def get_articles_by_category(category_slug: str, limit: int = 20) -> List[Dict]:
    """Get published articles for one category."""
    query = """
        SELECT * FROM published_articles
        WHERE category_slug = %s
        LIMIT %s
    """
    return execute_query(query, (category_slug, limit), fetch=True)


def search_articles(search_term: str, limit: int = 20) -> List[Dict]:
    """Search articles (Burmese + English).

    Burmese fields are matched with a 'simple' full-text query;
    English titles with a case-insensitive LIKE pattern.
    """
    query = """
        SELECT id, title_burmese, slug, excerpt_burmese, category_name_burmese, published_at
        FROM published_articles
        WHERE to_tsvector('simple', title_burmese || ' ' || COALESCE(excerpt_burmese, ''))
              @@ plainto_tsquery('simple', %s)
           OR title ILIKE %s
        ORDER BY published_at DESC
        LIMIT %s
    """
    search_pattern = f"%{search_term}%"
    return execute_query(query, (search_term, search_pattern, limit), fetch=True)


# Pipeline logging

def log_pipeline_stage(stage: str, status: str, articles_processed: int = 0,
                       error_message: str = None, duration: int = None):
    """Log one pipeline execution stage to pipeline_logs."""
    query = """
        INSERT INTO pipeline_logs (stage, status, articles_processed, error_message, duration_seconds)
        VALUES (%s, %s, %s, %s, %s)
    """
    execute_query(query, (stage, status, articles_processed, error_message, duration))


def get_last_pipeline_run() -> Optional[Dict]:
    """Get aggregated info (stage count, article total) for the latest pipeline run."""
    query = """
        SELECT pipeline_run, COUNT(*) as stages, SUM(articles_processed) as total_articles
        FROM pipeline_logs
        WHERE pipeline_run = (SELECT MAX(pipeline_run) FROM pipeline_logs)
        GROUP BY pipeline_run
    """
    result = execute_query(query, fetch=True)
    return result[0] if result else None


# Statistics

def get_site_stats() -> Dict:
    """Get overall site statistics in a single round trip."""
    with get_db_connection() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                SELECT
                    (SELECT COUNT(*) FROM articles WHERE status = 'published') as total_articles,
                    (SELECT SUM(view_count) FROM articles) as total_views,
                    (SELECT COUNT(*) FROM subscribers WHERE status = 'active') as subscribers,
                    (SELECT COUNT(*) FROM raw_articles WHERE scraped_at > CURRENT_DATE) as articles_today
            """)
            return cur.fetchone()


# Initialize database (run schema.sql)

def initialize_database() -> bool:
    """Initialize the database by executing schema.sql; return True on success.

    NOTE(review): the relative path assumes the process CWD is the
    backend directory — confirm against the deployment layout.
    """
    try:
        # Explicit UTF-8: the schema may contain Burmese text, and the
        # locale default encoding is not guaranteed to decode it.
        with open('../database/schema.sql', 'r', encoding='utf-8') as f:
            schema = f.read()
        with get_db_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(schema)
        logger.info("Database initialized successfully")
        return True
    except Exception as e:
        logger.error(f"Error initializing database: {e}")
        return False