# Database connection and utilities
# (forked from minzeyaphyo/burmddit)
from contextlib import contextmanager
from typing import Dict, List, Optional, Tuple

import psycopg2
from loguru import logger
from psycopg2.extras import Json, RealDictCursor

import config
@contextmanager
def get_db_connection():
    """Yield a psycopg2 connection; commit on success, roll back and re-raise on error.

    The connection is always closed on exit, whether or not an exception occurred.
    """
    connection = None
    try:
        connection = psycopg2.connect(config.DATABASE_URL)
        yield connection
        connection.commit()
    except Exception as e:
        # Roll back only if the connect() itself succeeded.
        if connection is not None:
            connection.rollback()
        logger.error(f"Database error: {e}")
        raise
    finally:
        if connection is not None:
            connection.close()
def execute_query(query: str, params: tuple = None, fetch=False):
    """Run one SQL statement inside a managed connection.

    Returns the fetched rows (as dicts) when ``fetch`` is true, otherwise
    the affected row count.
    """
    with get_db_connection() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute(query, params)
        return cur.fetchall() if fetch else cur.rowcount
# Raw articles functions
def insert_raw_article(url: str, title: str, content: str, author: str,
                       published_date, source: str, category_hint: str = None):
    """Insert a scraped article into raw_articles.

    Returns the new row's id, or None when the URL already exists
    (ON CONFLICT DO NOTHING) or the insert failed (error is logged).
    """
    query = """
        INSERT INTO raw_articles (url, title, content, author, published_date, source, category_hint)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (url) DO NOTHING
        RETURNING id
    """
    try:
        rows = execute_query(
            query,
            (url, title, content, author, published_date, source, category_hint),
            fetch=True,
        )
    except Exception as e:
        logger.error(f"Error inserting raw article: {e}")
        return None
    return rows[0]['id'] if rows else None
def get_unprocessed_articles(limit: int = 100) -> List[Dict]:
    """Return up to ``limit`` raw articles still awaiting processing, newest first."""
    sql = (
        "SELECT * FROM raw_articles "
        "WHERE processed = FALSE "
        "ORDER BY published_date DESC "
        "LIMIT %s"
    )
    return execute_query(sql, (limit,), fetch=True)
def mark_article_processed(article_id: int, compiled_into: int = None):
    """Flag a raw article as processed, recording which compiled article it fed into."""
    sql = (
        "UPDATE raw_articles "
        "SET processed = TRUE, compiled_into = %s "
        "WHERE id = %s"
    )
    execute_query(sql, (compiled_into, article_id))
# Categories functions
def get_all_categories() -> List[Dict]:
    """Return every category row, ordered by id."""
    return execute_query("SELECT * FROM categories ORDER BY id", fetch=True)
def get_category_by_slug(slug: str) -> Optional[Dict]:
    """Look up a single category by its slug; None when no match exists."""
    rows = execute_query("SELECT * FROM categories WHERE slug = %s", (slug,), fetch=True)
    if not rows:
        return None
    return rows[0]
def detect_category(title: str, content: str) -> int:
    """Detect an article's category id by keyword matching.

    Scores every category in ``config.CATEGORY_KEYWORDS`` by how many of its
    keywords occur in the combined title + content (case-insensitive), then
    maps the winning category name to a slug to resolve its database id.

    Args:
        title: Article title.
        content: Article body text.

    Returns:
        The matched category's id, or 1 (first category) when the slug
        lookup finds nothing.
    """
    text = (title + ' ' + content).lower()

    scores = {
        category: sum(1 for keyword in keywords if keyword in text)
        for category, keywords in config.CATEGORY_KEYWORDS.items()
    }

    # Pick the highest-scoring category. Fall back to 'AI News' when no
    # keyword matched at all — and also when the keyword config is empty,
    # which previously made max() raise ValueError.
    if scores:
        best_category = max(scores, key=scores.get)
        if scores[best_category] == 0:
            best_category = 'AI News'
    else:
        best_category = 'AI News'

    # Category names map to slugs: lowercase, with ' & ' and spaces turned
    # into hyphens (e.g. 'AI News' -> 'ai-news').
    category_slug = best_category.lower().replace(' & ', '-').replace(' ', '-')
    category = get_category_by_slug(category_slug)
    return category['id'] if category else 1  # default to first category
# Articles functions
def insert_article(title: str, title_burmese: str, slug: str,
                   content: str, content_burmese: str,
                   excerpt: str, excerpt_burmese: str,
                   category_id: int, featured_image: str = None,
                   images: Optional[List[str]] = None,
                   videos: Optional[List[str]] = None,
                   source_articles: Optional[List[Dict]] = None,
                   meta_description: str = None,
                   meta_keywords: Optional[List[str]] = None,
                   reading_time: int = None,
                   status: str = 'published') -> Optional[int]:
    """Insert a new article row.

    Bilingual fields carry both English and Burmese text. ``images`` and
    ``videos`` are stored as arrays (empty array when omitted, never NULL);
    ``source_articles`` is serialized to JSON via psycopg2's Json adapter.
    ``published_at`` is set to the current timestamp only when ``status``
    is 'published' (via the CASE expression in the query).

    Returns:
        The new article's id, or None when the slug already exists
        (ON CONFLICT DO NOTHING returns no row) or an error occurred
        (logged, not raised).
    """
    # 17 target columns: 16 plain %s placeholders plus one %s inside the
    # CASE expression — which is why ``status`` appears twice in the
    # parameter tuple below.
    query = """
        INSERT INTO articles (
            title, title_burmese, slug, content, content_burmese,
            excerpt, excerpt_burmese, category_id, featured_image,
            images, videos,
            source_articles, meta_description, meta_keywords,
            reading_time, status, published_at
        ) VALUES (
            %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
            CASE WHEN %s = 'published' THEN CURRENT_TIMESTAMP ELSE NULL END
        )
        ON CONFLICT (slug) DO NOTHING
        RETURNING id
    """
    try:
        result = execute_query(
            query,
            (title, title_burmese, slug, content, content_burmese,
             excerpt, excerpt_burmese, category_id, featured_image,
             images or [],   # images array — empty list, not NULL, when omitted
             videos or [],   # videos array — empty list, not NULL, when omitted
             Json(source_articles) if source_articles else None,
             meta_description, meta_keywords, reading_time, status, status),
            fetch=True
        )
        return result[0]['id'] if result else None
    except Exception as e:
        logger.error(f"Error inserting article: {e}")
        return None
def get_recent_articles(limit: int = 10) -> List[Dict]:
    """Return the first ``limit`` rows of the published_articles view.

    Ordering comes from the view definition itself.
    """
    return execute_query("SELECT * FROM published_articles LIMIT %s", (limit,), fetch=True)
def get_article_by_slug(slug: str) -> Optional[Dict]:
    """Fetch one published article (joined with its category names) by slug.

    Returns None when no published article has that slug.
    """
    sql = (
        "SELECT a.*, c.name as category_name, c.name_burmese as category_name_burmese "
        "FROM articles a "
        "JOIN categories c ON a.category_id = c.id "
        "WHERE a.slug = %s AND a.status = 'published'"
    )
    rows = execute_query(sql, (slug,), fetch=True)
    return rows[0] if rows else None
def increment_view_count(slug: str):
    """Bump an article's view counter via the increment_view_count() SQL function."""
    execute_query("SELECT increment_view_count(%s)", (slug,))
def get_trending_articles(days: int = 7, limit: int = 10) -> List[Dict]:
    """Get trending articles via the get_trending_articles() SQL function.

    NOTE(review): ``days`` is accepted but never bound into the query —
    only ``limit`` is passed to the SQL function. Confirm whether
    get_trending_articles() should also take a day window, or drop the
    parameter at the call sites.
    """
    query = "SELECT * FROM get_trending_articles(%s)"
    return execute_query(query, (limit,), fetch=True)
def get_articles_by_category(category_slug: str, limit: int = 20) -> List[Dict]:
    """Return up to ``limit`` published articles belonging to one category."""
    sql = (
        "SELECT * FROM published_articles "
        "WHERE category_slug = %s "
        "LIMIT %s"
    )
    return execute_query(sql, (category_slug, limit), fetch=True)
def search_articles(search_term: str, limit: int = 20) -> List[Dict]:
    """Search published articles, newest first.

    Combines a full-text match on the Burmese title/excerpt with an ILIKE
    substring match on the English title.
    """
    sql = """
        SELECT
            id, title_burmese, slug, excerpt_burmese,
            category_name_burmese, published_at
        FROM published_articles
        WHERE
            to_tsvector('simple', title_burmese || ' ' || COALESCE(excerpt_burmese, ''))
            @@ plainto_tsquery('simple', %s)
            OR title ILIKE %s
        ORDER BY published_at DESC
        LIMIT %s
    """
    # %-wrapped pattern for the ILIKE fallback on the English title.
    like_pattern = '%' + search_term + '%'
    return execute_query(sql, (search_term, like_pattern, limit), fetch=True)
# Pipeline logging
def log_pipeline_stage(stage: str, status: str, articles_processed: int = 0,
                       error_message: str = None, duration: int = None):
    """Record one pipeline stage execution in pipeline_logs."""
    sql = (
        "INSERT INTO pipeline_logs "
        "(stage, status, articles_processed, error_message, duration_seconds) "
        "VALUES (%s, %s, %s, %s, %s)"
    )
    execute_query(sql, (stage, status, articles_processed, error_message, duration))
def get_last_pipeline_run() -> Optional[Dict]:
    """Summarize the most recent pipeline run: stage count and total articles processed."""
    sql = """
        SELECT pipeline_run, COUNT(*) as stages,
               SUM(articles_processed) as total_articles
        FROM pipeline_logs
        WHERE pipeline_run = (SELECT MAX(pipeline_run) FROM pipeline_logs)
        GROUP BY pipeline_run
    """
    rows = execute_query(sql, fetch=True)
    return rows[0] if rows else None
# Statistics
def get_site_stats() -> Dict:
    """Return one row of aggregate counters: published articles, total views,
    active subscribers, and raw articles scraped today."""
    stats_sql = """
        SELECT
            (SELECT COUNT(*) FROM articles WHERE status = 'published') as total_articles,
            (SELECT SUM(view_count) FROM articles) as total_views,
            (SELECT COUNT(*) FROM subscribers WHERE status = 'active') as subscribers,
            (SELECT COUNT(*) FROM raw_articles WHERE scraped_at > CURRENT_DATE) as articles_today
    """
    with get_db_connection() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute(stats_sql)
        return cur.fetchone()
# Initialize database (run schema.sql)
def initialize_database(schema_path: str = '../database/schema.sql') -> bool:
    """Initialize the database by executing the SQL schema file.

    Args:
        schema_path: Path to the schema file. Defaults to the original
            hard-coded location, which is resolved relative to the current
            working directory — callers can now pass an absolute path.

    Returns:
        True on success, False on any error (logged, not raised).
    """
    try:
        # Explicit encoding so the read does not depend on the platform default.
        with open(schema_path, 'r', encoding='utf-8') as f:
            schema = f.read()

        with get_db_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(schema)

        logger.info("Database initialized successfully")
        return True
    except Exception as e:
        logger.error(f"Error initializing database: {e}")
        return False