"""Feed fetching worker — RQ tasks and scheduling loop.""" import logging import re import time from datetime import datetime, timezone import feedparser import httpx from dateutil import parser as dateparser from redis import Redis from rq import Queue from sqlalchemy import create_engine, select from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.orm import Session, sessionmaker from app.config import DATABASE_URL_SYNC, FEED_FETCH_INTERVAL, REDIS_URL from app.models import Category, Entry, Feed log = logging.getLogger(__name__) # ── Sync DB engine (for RQ worker) ── _engine = create_engine(DATABASE_URL_SYNC, echo=False, pool_size=5, max_overflow=3) SyncSession = sessionmaker(_engine, class_=Session, expire_on_commit=False) # ── RQ queue ── _redis = Redis.from_url(REDIS_URL) queue = Queue("reader", connection=_redis) # HTML tag stripper _html_re = re.compile(r"<[^>]+>") def _strip_html(text: str) -> str: """Remove HTML tags for word counting.""" if not text: return "" return _html_re.sub("", text) def _calc_reading_time(html_content: str) -> int: """Estimate reading time in minutes from HTML content.""" plain = _strip_html(html_content) word_count = len(plain.split()) return max(1, word_count // 200) def _parse_date(entry: dict) -> datetime | None: """Parse published date from a feedparser entry.""" for field in ("published", "updated", "created"): val = entry.get(field) if val: try: return dateparser.parse(val) except (ValueError, TypeError): continue # Try struct_time fields for field in ("published_parsed", "updated_parsed", "created_parsed"): val = entry.get(field) if val: try: return datetime(*val[:6], tzinfo=timezone.utc) except (ValueError, TypeError): continue return None def _get_entry_content(entry: dict) -> str: """Extract the best content from a feedparser entry.""" # Prefer content field (often full HTML) if entry.get("content"): return entry["content"][0].get("value", "") # Fall back to summary if entry.get("summary"): return entry["summary"] # Fall back to description if entry.get("description"): return entry["description"] return "" def _get_entry_author(entry: dict) -> str | None: """Extract author from a feedparser entry.""" if entry.get("author"): return entry["author"] if entry.get("author_detail", {}).get("name"): return entry["author_detail"]["name"] return None def _ensure_uncategorized(db: Session, user_id: str) -> int: """Ensure an 'Uncategorized' category exists for the user, return its ID.""" row = db.execute( select(Category).where( Category.user_id == user_id, Category.title == "Uncategorized", ) ).scalar_one_or_none() if row: return row.id cat = Category(user_id=user_id, title="Uncategorized") db.add(cat) db.flush() return cat.id def fetch_single_feed(feed_id: int): """Fetch and parse a single feed, inserting new entries.""" with SyncSession() as db: feed = db.execute(select(Feed).where(Feed.id == feed_id)).scalar_one_or_none() if not feed: log.warning("Feed %d not found, skipping", feed_id) return log.info("Fetching feed %d: %s", feed.id, feed.feed_url) headers = {"User-Agent": "Reader/1.0"} if feed.etag: headers["If-None-Match"] = feed.etag if feed.last_modified: headers["If-Modified-Since"] = feed.last_modified try: resp = httpx.get(feed.feed_url, headers=headers, timeout=30, follow_redirects=True) except httpx.HTTPError as e: log.error("HTTP error fetching feed %d: %s", feed.id, e) return # 304 Not Modified if resp.status_code == 304: log.debug("Feed %d not modified", feed.id) feed.last_fetched_at = datetime.utcnow() db.commit() return if resp.status_code != 200: log.warning("Feed %d returned status %d", feed.id, resp.status_code) return # Update etag/last-modified feed.etag = resp.headers.get("ETag") feed.last_modified = resp.headers.get("Last-Modified") feed.last_fetched_at = datetime.utcnow() parsed = feedparser.parse(resp.text) if not parsed.entries: log.debug("Feed %d has no entries", feed.id) db.commit() return new_count = 0 new_entry_ids = [] for fe in parsed.entries: url = fe.get("link") if not url: continue content = _get_entry_content(fe) pub_date = _parse_date(fe) stmt = pg_insert(Entry).values( feed_id=feed.id, user_id=feed.user_id, title=fe.get("title", "")[:1000] if fe.get("title") else None, url=url, content=content, author=_get_entry_author(fe), published_at=pub_date, status="unread", starred=False, reading_time=_calc_reading_time(content), ).on_conflict_do_nothing( constraint="uq_reader_entries_feed_url" ).returning(Entry.id) result = db.execute(stmt) row = result.fetchone() if row: new_entry_ids.append(row[0]) new_count += 1 db.commit() log.info("Feed %d: %d new entries from %d total", feed.id, new_count, len(parsed.entries)) # Fetch full content for new entries if new_entry_ids: _fetch_full_content_for_entries(db, new_entry_ids) def _fetch_full_content_for_entries(db, entry_ids: list[int]): """Fetch full article content for specific entries.""" from app.config import CRAWLER_URL entries = db.execute( select(Entry).where(Entry.id.in_(entry_ids)) ).scalars().all() log.info("Fetching full content for %d new entries", len(entries)) for entry in entries: if not entry.url: continue try: resp = httpx.post( f"{CRAWLER_URL}/crawl", json={"url": entry.url}, timeout=45, ) if resp.status_code == 200: data = resp.json() readable = data.get("readable_html", "") full_text = data.get("text", "") if readable: entry.full_content = readable if full_text: entry.reading_time = max(1, len(full_text.split()) // 200) elif full_text and len(full_text) > len(_strip_html(entry.content or "")): paragraphs = [p.strip() for p in full_text.split("\n\n") if p.strip()] if not paragraphs: paragraphs = [p.strip() for p in full_text.split("\n") if p.strip()] entry.full_content = "\n".join(f"

{p}

" for p in paragraphs) entry.reading_time = max(1, len(full_text.split()) // 200) else: entry.full_content = entry.content or "" else: entry.full_content = entry.content or "" except Exception as e: log.warning("Full content fetch failed for entry %d: %s", entry.id, e) entry.full_content = entry.content or "" db.commit() log.info("Full content done for %d entries", len(entries)) def fetch_full_content_batch(): """Fetch full article content for entries that only have RSS summaries.""" from app.config import CRAWLER_URL with SyncSession() as db: # Find entries with short content and no full_content (limit batch size) entries = db.execute( select(Entry).where( Entry.full_content.is_(None), Entry.url.isnot(None), Entry.status == "unread", ).order_by(Entry.published_at.desc()).limit(20) ).scalars().all() if not entries: return log.info("Fetching full content for %d entries", len(entries)) for entry in entries: try: resp = httpx.post( f"{CRAWLER_URL}/crawl", json={"url": entry.url}, timeout=45, ) if resp.status_code == 200: data = resp.json() readable = data.get("readable_html", "") full_text = data.get("text", "") if readable: entry.full_content = readable if full_text: entry.reading_time = max(1, len(full_text.split()) // 200) elif full_text and len(full_text) > len(_strip_html(entry.content or "")): paragraphs = [p.strip() for p in full_text.split("\n\n") if p.strip()] if not paragraphs: paragraphs = [p.strip() for p in full_text.split("\n") if p.strip()] entry.full_content = "\n".join(f"

{p}

" for p in paragraphs) entry.reading_time = max(1, len(full_text.split()) // 200) else: entry.full_content = entry.content or "" else: entry.full_content = entry.content or "" except Exception as e: log.warning("Full content fetch failed for entry %d: %s", entry.id, e) entry.full_content = entry.content or "" db.commit() log.info("Full content fetched for %d entries", len(entries)) def fetch_all_feeds(): """Fetch all feeds — called on schedule.""" with SyncSession() as db: feeds = db.execute(select(Feed)).scalars().all() log.info("Scheduling fetch for %d feeds", len(feeds)) for feed in feeds: try: fetch_single_feed(feed.id) except Exception: log.exception("Error fetching feed %d", feed.id) # Full content is now fetched inline for each new entry def cleanup_old_entries(): """Delete old entries: read > 30 days, unread > 60 days.""" from sqlalchemy import delete as sa_delete with SyncSession() as db: now = datetime.utcnow() thirty_days = now - __import__('datetime').timedelta(days=30) sixty_days = now - __import__('datetime').timedelta(days=60) # Read entries older than 30 days result1 = db.execute( sa_delete(Entry).where( Entry.status == "read", Entry.created_at < thirty_days, ) ) # Unread entries older than 60 days result2 = db.execute( sa_delete(Entry).where( Entry.status == "unread", Entry.created_at < sixty_days, ) ) db.commit() total = (result1.rowcount or 0) + (result2.rowcount or 0) if total > 0: log.info("Cleanup: deleted %d old entries (%d read, %d unread)", total, result1.rowcount or 0, result2.rowcount or 0) def run_scheduler(): """Simple loop that runs fetch_all_feeds every FEED_FETCH_INTERVAL seconds.""" log.info("Reader scheduler started — interval: %ds", FEED_FETCH_INTERVAL) # Create tables on first run (for the sync engine) from app.database import Base from app.models import Category, Feed, Entry # noqa: register models Base.metadata.create_all(_engine) cycles = 0 while True: try: fetch_all_feeds() except Exception: log.exception("Scheduler error in fetch_all_feeds") # Run cleanup once per day (every ~144 cycles at 10min interval) cycles += 1 if cycles % 144 == 0: try: cleanup_old_entries() except Exception: log.exception("Scheduler error in cleanup") time.sleep(FEED_FETCH_INTERVAL) if __name__ == "__main__": logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s", ) run_scheduler()