# Web scraper v2 for AI news sources - ROBUST VERSION
# Multi-layer fallback extraction for maximum reliability
import re
import time
import random
from datetime import datetime, timedelta
from typing import List, Dict, Optional

import requests
import feedparser
import trafilatura
from bs4 import BeautifulSoup
from newspaper import Article
from readability import Document
from fake_useragent import UserAgent
from loguru import logger

import config
import database


class AINewsScraper:
    """Scrape AI news articles from the sources configured in ``config.SOURCES``.

    Full-text extraction uses a three-layer fallback chain
    (newspaper3k -> trafilatura -> readability-lxml); per-method success
    counts are tracked in ``self.stats`` and logged after each run.
    """

    # Keyword list for AI-relevance filtering. Matched with word
    # boundaries so short keywords like "ai"/"ml" don't fire inside
    # unrelated words ("maintain", "html", ...).
    _AI_KEYWORDS = [
        'artificial intelligence', 'ai', 'machine learning', 'ml',
        'deep learning', 'neural network', 'chatgpt', 'gpt', 'llm',
        'claude', 'openai', 'anthropic', 'transformer', 'nlp',
        'generative ai', 'automation', 'computer vision', 'gemini',
        'copilot', 'ai model', 'training data', 'algorithm'
    ]
    _AI_PATTERN = re.compile(
        r'\b(?:' + '|'.join(re.escape(k) for k in _AI_KEYWORDS) + r')\b'
    )

    def __init__(self):
        self.session = requests.Session()
        self.ua = UserAgent()
        self.update_headers()

        # Success tracking for the extraction fallback chain
        self.stats = {
            'total_attempts': 0,
            'total_success': 0,
            'method_success': {
                'newspaper': 0,
                'trafilatura': 0,
                'readability': 0,
                'failed': 0
            }
        }

    def update_headers(self):
        """Rotate user agent for each request"""
        self.session.headers.update({
            'User-Agent': self.ua.random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        })

    @staticmethod
    def _to_naive(dt: datetime) -> datetime:
        """Strip tzinfo so aware/naive datetimes compare safely.

        newspaper3k/trafilatura may return timezone-aware dates;
        comparing those against naive ``datetime.now()`` raises
        TypeError, which previously caused valid articles to be
        silently dropped by the broad exception handlers.
        """
        return dt.replace(tzinfo=None) if dt.tzinfo is not None else dt

    def scrape_all_sources(self) -> int:
        """Scrape all enabled sources and store articles; return total stored."""
        total_articles = 0

        for source_name, source_config in config.SOURCES.items():
            if not source_config.get('enabled', True):
                logger.info(f"ā­ļø Skipping {source_name} (disabled)")
                continue

            logger.info(f"šŸ” Scraping {source_name}...")

            try:
                # Dispatch on source type: Medium tag pages vs. RSS feeds.
                if source_name == 'medium':
                    articles = self.scrape_medium(source_config)
                elif 'url' in source_config:
                    articles = self.scrape_rss_feed(source_name, source_config)
                else:
                    logger.warning(f"āš ļø Unknown source type: {source_name}")
                    continue

                # Store articles in database; duplicates return a falsy id.
                stored_count = 0
                for article in articles:
                    try:
                        article_id = database.insert_raw_article(
                            url=article['url'],
                            title=article['title'],
                            content=article['content'],
                            author=article['author'],
                            published_date=article['published_date'],
                            source=source_name,
                            category_hint=article.get('category_hint')
                        )
                        if article_id:
                            stored_count += 1
                    except Exception as e:
                        logger.debug(f"Failed to store article {article['url']}: {e}")
                        continue

                total_articles += stored_count
                logger.info(f"āœ… {source_name}: {stored_count}/{len(articles)} articles stored")

                # Rate limiting between sources
                time.sleep(config.RATE_LIMITS['delay_between_requests'])

            except Exception as e:
                logger.error(f"āŒ Error scraping {source_name}: {e}")
                continue

        # Log extraction-method stats for this run
        logger.info(f"\nšŸ“Š Extraction Method Stats:")
        logger.info(f"   newspaper3k: {self.stats['method_success']['newspaper']}")
        logger.info(f"   trafilatura: {self.stats['method_success']['trafilatura']}")
        logger.info(f"   readability: {self.stats['method_success']['readability']}")
        logger.info(f"   failed: {self.stats['method_success']['failed']}")
        logger.info(f"   Success rate: {self.stats['total_success']}/{self.stats['total_attempts']} ({100*self.stats['total_success']//max(self.stats['total_attempts'],1)}%)")

        logger.info(f"\nāœ… Total articles scraped: {total_articles}")
        return total_articles

    def scrape_medium(self, source_config: Dict) -> List[Dict]:
        """Scrape Medium articles by tags"""
        articles = []

        for tag in source_config['tags']:
            try:
                url = source_config['url_pattern'].format(tag=tag)
                self.update_headers()
                response = self.session.get(url, timeout=30)
                soup = BeautifulSoup(response.content, 'html.parser')

                # Medium's structure: find article links (over-fetch 3x,
                # many links won't qualify)
                links = soup.find_all('a', href=True,
                                      limit=source_config['articles_per_tag'] * 3)

                processed = 0
                for link in links:
                    if processed >= source_config['articles_per_tag']:
                        break

                    article_url = link['href']
                    if not article_url.startswith('http'):
                        article_url = 'https://medium.com' + article_url

                    # Only process clean Medium article URLs (query
                    # strings indicate tracking/navigation links).
                    if 'medium.com' not in article_url or '?' in article_url:
                        continue

                    # Extract article content; require substantial text.
                    article = self.extract_article_content(article_url)
                    if article and len(article['content']) > 500:
                        article['category_hint'] = self.detect_category_from_text(
                            article['title'] + ' ' + article['content'][:500]
                        )
                        articles.append(article)
                        processed += 1

                logger.debug(f"  Medium tag '{tag}': {processed} articles")
                time.sleep(3)  # Rate limiting for Medium

            except Exception as e:
                logger.error(f"Error scraping Medium tag '{tag}': {e}")
                continue

        return articles

    def scrape_rss_feed(self, source_name: str, source_config: Dict) -> List[Dict]:
        """Scrape articles from RSS feed"""
        articles = []

        try:
            # Parse RSS feed
            feed = feedparser.parse(source_config['url'])

            if not feed.entries:
                logger.warning(f"  No entries found in RSS feed")
                return articles

            max_articles = source_config.get('articles_limit', 20)
            processed = 0

            for entry in feed.entries:
                if processed >= max_articles:
                    break

                try:
                    # Check if AI-related (if filter enabled)
                    if source_config.get('filter_ai'):
                        text = entry.get('title', '') + ' ' + entry.get('summary', '')
                        if not self.is_ai_related(text):
                            continue

                    article_url = entry.link

                    # Extract full article; require substantial text.
                    article = self.extract_article_content(article_url)
                    if article and len(article['content']) > 500:
                        article['category_hint'] = self.detect_category_from_text(
                            article['title'] + ' ' + article['content'][:500]
                        )
                        articles.append(article)
                        processed += 1

                except Exception as e:
                    logger.debug(f"Failed to parse RSS entry: {e}")
                    continue

        except Exception as e:
            logger.error(f"Error fetching RSS feed: {e}")

        return articles

    def extract_article_content(self, url: str) -> Optional[Dict]:
        """
        Extract article content using multi-layer fallback approach:
        1. Try newspaper3k (fast but unreliable)
        2. Fallback to trafilatura (reliable)
        3. Fallback to readability-lxml (reliable)
        4. Give up if all fail
        """
        self.stats['total_attempts'] += 1

        # Method 1: Try newspaper3k first (fast)
        article = self._extract_with_newspaper(url)
        if article:
            self.stats['method_success']['newspaper'] += 1
            self.stats['total_success'] += 1
            return article

        # Method 2: Fallback to trafilatura
        article = self._extract_with_trafilatura(url)
        if article:
            self.stats['method_success']['trafilatura'] += 1
            self.stats['total_success'] += 1
            return article

        # Method 3: Fallback to readability
        article = self._extract_with_readability(url)
        if article:
            self.stats['method_success']['readability'] += 1
            self.stats['total_success'] += 1
            return article

        # All methods failed
        self.stats['method_success']['failed'] += 1
        logger.debug(f"All extraction methods failed for: {url}")
        return None

    def _extract_with_newspaper(self, url: str) -> Optional[Dict]:
        """Method 1: Extract using newspaper3k"""
        try:
            article = Article(url)
            article.download()
            article.parse()

            # Validation: require substantial body text
            if not article.text or len(article.text) < 500:
                return None

            # Check age; articles without a date pass (assumed fresh).
            # Normalize to naive so aware publish_date values don't
            # raise TypeError against naive datetime.now().
            pub_date = self._to_naive(article.publish_date or datetime.now())
            if datetime.now() - pub_date > timedelta(days=3):
                return None

            # Extract images (top image first, then up to 5 others)
            images = []
            if article.top_image:
                images.append(article.top_image)
            for img in article.images[:5]:
                if img and img not in images:
                    images.append(img)

            # Extract videos
            videos = list(article.movies)[:3] if article.movies else []

            return {
                'url': url,
                'title': article.title or 'Untitled',
                'content': article.text,
                'author': ', '.join(article.authors) if article.authors else 'Unknown',
                'published_date': pub_date,
                'top_image': article.top_image,
                'images': images,
                'videos': videos
            }

        except Exception as e:
            logger.debug(f"newspaper3k failed for {url}: {e}")
            return None

    def _extract_with_trafilatura(self, url: str) -> Optional[Dict]:
        """Method 2: Extract using trafilatura"""
        try:
            # Download with custom headers
            self.update_headers()
            downloaded = trafilatura.fetch_url(url)

            if not downloaded:
                return None

            # Extract content
            content = trafilatura.extract(
                downloaded,
                include_comments=False,
                include_tables=False,
                no_fallback=False
            )

            if not content or len(content) < 500:
                return None

            # Extract metadata (may be None or have missing fields)
            metadata = trafilatura.extract_metadata(downloaded)
            title = metadata.title if metadata and metadata.title else 'Untitled'
            author = metadata.author if metadata and metadata.author else 'Unknown'
            pub_date = metadata.date if metadata and metadata.date else datetime.now()

            # Convert date string to datetime if needed
            if isinstance(pub_date, str):
                try:
                    pub_date = datetime.fromisoformat(pub_date.replace('Z', '+00:00'))
                except ValueError:
                    pub_date = datetime.now()
            # Normalize: keep stored dates naive, consistent with the
            # other extraction methods.
            if isinstance(pub_date, datetime):
                pub_date = self._to_naive(pub_date)
            else:
                pub_date = datetime.now()

            # Extract images from HTML (best-effort; HTML may be malformed)
            images = []
            try:
                soup = BeautifulSoup(downloaded, 'html.parser')
                for img in soup.find_all('img', limit=5):
                    src = img.get('src', '')
                    if src and src.startswith('http'):
                        images.append(src)
            except Exception:
                pass

            return {
                'url': url,
                'title': title,
                'content': content,
                'author': author,
                'published_date': pub_date,
                'top_image': images[0] if images else None,
                'images': images,
                'videos': []
            }

        except Exception as e:
            logger.debug(f"trafilatura failed for {url}: {e}")
            return None

    def _extract_with_readability(self, url: str) -> Optional[Dict]:
        """Method 3: Extract using readability-lxml"""
        try:
            self.update_headers()
            response = self.session.get(url, timeout=30)

            if response.status_code != 200:
                return None

            # Extract main-article HTML with readability
            doc = Document(response.text)
            content = doc.summary()

            # Parse with BeautifulSoup to get clean text
            soup = BeautifulSoup(content, 'html.parser')
            text = soup.get_text(separator='\n', strip=True)

            if not text or len(text) < 500:
                return None

            # Extract title (doc.title() is a str; the soup fallback
            # returns a Tag, hence the hasattr check)
            title = doc.title() or soup.find('title')
            if title and hasattr(title, 'text'):
                title = title.text
            elif not title:
                title = 'Untitled'

            # Extract images
            images = []
            for img in soup.find_all('img', limit=5):
                src = img.get('src', '')
                if src and src.startswith('http'):
                    images.append(src)

            # readability provides no author/date metadata; use defaults.
            return {
                'url': url,
                'title': str(title),
                'content': text,
                'author': 'Unknown',
                'published_date': datetime.now(),
                'top_image': images[0] if images else None,
                'images': images,
                'videos': []
            }

        except Exception as e:
            logger.debug(f"readability failed for {url}: {e}")
            return None

    def is_ai_related(self, text: str) -> bool:
        """Check if text is AI-related (word-boundary keyword match)."""
        return bool(self._AI_PATTERN.search(text.lower()))

    def detect_category_from_text(self, text: str) -> Optional[str]:
        """Detect category hint from text; None if no keyword matches."""
        text_lower = text.lower()
        scores = {
            category: sum(1 for keyword in keywords if keyword in text_lower)
            for category, keywords in config.CATEGORY_KEYWORDS.items()
        }
        # Guard: empty keyword config would make max() raise ValueError.
        if scores and max(scores.values()) > 0:
            return max(scores, key=scores.get)
        return None


def run_scraper():
    """Main scraper execution function"""
    logger.info("šŸš€ Starting scraper v2...")
    start_time = time.time()

    try:
        scraper = AINewsScraper()
        articles_count = scraper.scrape_all_sources()
        duration = int(time.time() - start_time)

        database.log_pipeline_stage(
            stage='crawl',
            status='completed',
            articles_processed=articles_count,
            duration=duration
        )

        logger.info(f"āœ… Scraper completed in {duration}s. Articles scraped: {articles_count}")
        return articles_count

    except Exception as e:
        logger.error(f"āŒ Scraper failed: {e}")
        database.log_pipeline_stage(
            stage='crawl',
            status='failed',
            error_message=str(e)
        )
        return 0


if __name__ == '__main__':
    logger.add(config.LOG_FILE, rotation="1 day")
    run_scraper()