Yusuf Suleman a3eabf3e3b
feat: thumbnail extraction for Reader — fixes all clients
Server-side (dashboard + iOS + any client):
- Added thumbnail column to reader_entries
- Worker extracts from media:thumbnail, media:content, enclosures, HTML img
- API returns thumbnail in EntryOut with &amp; entity decoding
- Backfilled 260 existing entries

iOS:
- Prefers API thumbnail, falls back to client-side extraction
- Decodes HTML entities in URLs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

"""Feed fetching worker — RQ tasks and scheduling loop."""
import logging
import re
import time
from datetime import datetime, timedelta, timezone
import feedparser
import httpx
from dateutil import parser as dateparser
from redis import Redis
from rq import Queue
from sqlalchemy import create_engine, select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.orm import Session, sessionmaker
from app.config import DATABASE_URL_SYNC, FEED_FETCH_INTERVAL, REDIS_URL
from app.models import Category, Entry, Feed
log = logging.getLogger(__name__)
# ── Sync DB engine (for RQ worker) ──
_engine = create_engine(DATABASE_URL_SYNC, echo=False, pool_size=5, max_overflow=3)
SyncSession = sessionmaker(_engine, class_=Session, expire_on_commit=False)
# ── RQ queue ──
_redis = Redis.from_url(REDIS_URL)
queue = Queue("reader", connection=_redis)
# HTML tag stripper
_html_re = re.compile(r"<[^>]+>")
def _strip_html(text: str) -> str:
"""Remove HTML tags for word counting."""
if not text:
return ""
return _html_re.sub("", text)
def _calc_reading_time(html_content: str) -> int:
"""Estimate reading time in minutes from HTML content."""
plain = _strip_html(html_content)
word_count = len(plain.split())
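    # Assume ~200 words per minute, with a floor of one minute.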
return max(1, word_count // 200)
def _parse_date(entry: dict) -> datetime | None:
"""Parse published date from a feedparser entry."""
for field in ("published", "updated", "created"):
val = entry.get(field)
if val:
try:
return dateparser.parse(val)
except (ValueError, TypeError):
continue
# Try struct_time fields
for field in ("published_parsed", "updated_parsed", "created_parsed"):
val = entry.get(field)
if val:
try:
return datetime(*val[:6], tzinfo=timezone.utc)
except (ValueError, TypeError):
continue
return None
def _get_entry_content(entry: dict) -> str:
"""Extract the best content from a feedparser entry."""
# Prefer content field (often full HTML)
if entry.get("content"):
return entry["content"][0].get("value", "")
# Fall back to summary
if entry.get("summary"):
return entry["summary"]
# Fall back to description
if entry.get("description"):
return entry["description"]
return ""
def _extract_thumbnail(entry: dict, content: str) -> str | None:
"""Extract thumbnail from feedparser entry or content HTML."""
# 1. Check media:thumbnail
for mt in entry.get("media_thumbnail", []):
if mt.get("url"):
return mt["url"]
# 2. Check media:content with image type
for mc in entry.get("media_content", []):
if mc.get("medium") == "image" or (mc.get("type", "").startswith("image")):
if mc.get("url"):
return mc["url"]
# 3. Check enclosures
for enc in entry.get("enclosures", []):
if enc.get("type", "").startswith("image") and enc.get("href"):
return enc["href"]
# 4. Extract from content HTML
if content:
match = re.search(r'<img[^>]+src=["\']([^"\']+)["\']', content[:3000], re.IGNORECASE)
if match:
url = match.group(1).replace("&amp;", "&")
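            # Ignore obvious tracking pixels / spacer images.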
if not any(skip in url.lower() for skip in ["1x1", "pixel", "tracking", "spacer"]):
return url
return None
def _get_entry_author(entry: dict) -> str | None:
"""Extract author from a feedparser entry."""
if entry.get("author"):
return entry["author"]
if entry.get("author_detail", {}).get("name"):
return entry["author_detail"]["name"]
return None
def _ensure_uncategorized(db: Session, user_id: str) -> int:
"""Ensure an 'Uncategorized' category exists for the user, return its ID."""
row = db.execute(
select(Category).where(
Category.user_id == user_id,
Category.title == "Uncategorized",
)
).scalar_one_or_none()
if row:
return row.id
cat = Category(user_id=user_id, title="Uncategorized")
db.add(cat)
db.flush()
return cat.id
def fetch_single_feed(feed_id: int):
"""Fetch and parse a single feed, inserting new entries."""
with SyncSession() as db:
feed = db.execute(select(Feed).where(Feed.id == feed_id)).scalar_one_or_none()
if not feed:
log.warning("Feed %d not found, skipping", feed_id)
return
log.info("Fetching feed %d: %s", feed.id, feed.feed_url)
headers = {"User-Agent": "Reader/1.0"}
if feed.etag:
headers["If-None-Match"] = feed.etag
if feed.last_modified:
headers["If-Modified-Since"] = feed.last_modified
try:
resp = httpx.get(feed.feed_url, headers=headers, timeout=30, follow_redirects=True)
except httpx.HTTPError as e:
log.error("HTTP error fetching feed %d: %s", feed.id, e)
return
# 304 Not Modified
if resp.status_code == 304:
log.debug("Feed %d not modified", feed.id)
feed.last_fetched_at = datetime.utcnow()
db.commit()
return
if resp.status_code != 200:
log.warning("Feed %d returned status %d", feed.id, resp.status_code)
return
# Update etag/last-modified
feed.etag = resp.headers.get("ETag")
feed.last_modified = resp.headers.get("Last-Modified")
feed.last_fetched_at = datetime.utcnow()
parsed = feedparser.parse(resp.text)
if not parsed.entries:
log.debug("Feed %d has no entries", feed.id)
db.commit()
return
new_count = 0
new_entry_ids = []
for fe in parsed.entries:
url = fe.get("link")
if not url:
continue
content = _get_entry_content(fe)
pub_date = _parse_date(fe)
thumb = _extract_thumbnail(fe, content)
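            # INSERT ... ON CONFLICT DO NOTHING against the feed/url unique
            # constraint; RETURNING yields a row only when the entry is actually
            # new, which is how new entries are counted below.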
stmt = pg_insert(Entry).values(
feed_id=feed.id,
user_id=feed.user_id,
title=fe.get("title", "")[:1000] if fe.get("title") else None,
url=url,
content=content,
author=_get_entry_author(fe),
published_at=pub_date,
status="unread",
starred=False,
thumbnail=thumb,
reading_time=_calc_reading_time(content),
).on_conflict_do_nothing(
constraint="uq_reader_entries_feed_url"
).returning(Entry.id)
result = db.execute(stmt)
row = result.fetchone()
if row:
new_entry_ids.append(row[0])
new_count += 1
db.commit()
log.info("Feed %d: %d new entries from %d total", feed.id, new_count, len(parsed.entries))
# Fetch full content for new entries
if new_entry_ids:
_fetch_full_content_for_entries(db, new_entry_ids)
def _fetch_full_content_for_entries(db, entry_ids: list[int]):
"""Fetch full article content for specific entries."""
from app.config import CRAWLER_URL
entries = db.execute(
select(Entry).where(Entry.id.in_(entry_ids))
).scalars().all()
log.info("Fetching full content for %d new entries", len(entries))
for entry in entries:
if not entry.url:
continue
try:
resp = httpx.post(
f"{CRAWLER_URL}/crawl",
json={"url": entry.url},
timeout=45,
)
if resp.status_code == 200:
data = resp.json()
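                # Prefer the crawler's readability-extracted HTML; fall back to
                # wrapping plain text in <p> tags only when it beats the RSS
                # summary in length, otherwise keep the original feed content.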
readable = data.get("readable_html", "")
full_text = data.get("text", "")
if readable:
entry.full_content = readable
if full_text:
entry.reading_time = max(1, len(full_text.split()) // 200)
elif full_text and len(full_text) > len(_strip_html(entry.content or "")):
paragraphs = [p.strip() for p in full_text.split("\n\n") if p.strip()]
if not paragraphs:
paragraphs = [p.strip() for p in full_text.split("\n") if p.strip()]
entry.full_content = "\n".join(f"<p>{p}</p>" for p in paragraphs)
entry.reading_time = max(1, len(full_text.split()) // 200)
else:
entry.full_content = entry.content or ""
else:
entry.full_content = entry.content or ""
except Exception as e:
log.warning("Full content fetch failed for entry %d: %s", entry.id, e)
entry.full_content = entry.content or ""
db.commit()
log.info("Full content done for %d entries", len(entries))
def fetch_full_content_batch():
"""Fetch full article content for entries that only have RSS summaries."""
from app.config import CRAWLER_URL
with SyncSession() as db:
        # Find unread entries that still lack full_content (batch capped at 20)
entries = db.execute(
select(Entry).where(
Entry.full_content.is_(None),
Entry.url.isnot(None),
Entry.status == "unread",
).order_by(Entry.published_at.desc()).limit(20)
).scalars().all()
if not entries:
return
log.info("Fetching full content for %d entries", len(entries))
for entry in entries:
try:
resp = httpx.post(
f"{CRAWLER_URL}/crawl",
json={"url": entry.url},
timeout=45,
)
if resp.status_code == 200:
data = resp.json()
readable = data.get("readable_html", "")
full_text = data.get("text", "")
if readable:
entry.full_content = readable
if full_text:
entry.reading_time = max(1, len(full_text.split()) // 200)
elif full_text and len(full_text) > len(_strip_html(entry.content or "")):
paragraphs = [p.strip() for p in full_text.split("\n\n") if p.strip()]
if not paragraphs:
paragraphs = [p.strip() for p in full_text.split("\n") if p.strip()]
entry.full_content = "\n".join(f"<p>{p}</p>" for p in paragraphs)
entry.reading_time = max(1, len(full_text.split()) // 200)
else:
entry.full_content = entry.content or ""
else:
entry.full_content = entry.content or ""
except Exception as e:
log.warning("Full content fetch failed for entry %d: %s", entry.id, e)
entry.full_content = entry.content or ""
db.commit()
log.info("Full content fetched for %d entries", len(entries))
def fetch_all_feeds():
"""Fetch all feeds — called on schedule."""
with SyncSession() as db:
feeds = db.execute(select(Feed)).scalars().all()
log.info("Scheduling fetch for %d feeds", len(feeds))
for feed in feeds:
try:
fetch_single_feed(feed.id)
except Exception:
log.exception("Error fetching feed %d", feed.id)
# Full content is now fetched inline for each new entry
def cleanup_old_entries():
"""Delete old entries: read > 30 days, unread > 60 days."""
from sqlalchemy import delete as sa_delete
with SyncSession() as db:
now = datetime.utcnow()
        thirty_days = now - timedelta(days=30)
        sixty_days = now - timedelta(days=60)
# Read entries older than 30 days
result1 = db.execute(
sa_delete(Entry).where(
Entry.status == "read",
Entry.created_at < thirty_days,
)
)
# Unread entries older than 60 days
result2 = db.execute(
sa_delete(Entry).where(
Entry.status == "unread",
Entry.created_at < sixty_days,
)
)
db.commit()
total = (result1.rowcount or 0) + (result2.rowcount or 0)
if total > 0:
log.info("Cleanup: deleted %d old entries (%d read, %d unread)",
total, result1.rowcount or 0, result2.rowcount or 0)
def run_scheduler():
"""Simple loop that runs fetch_all_feeds every FEED_FETCH_INTERVAL seconds."""
log.info("Reader scheduler started — interval: %ds", FEED_FETCH_INTERVAL)
# Create tables on first run (for the sync engine)
from app.database import Base
from app.models import Category, Feed, Entry # noqa: register models
Base.metadata.create_all(_engine)
cycles = 0
while True:
try:
fetch_all_feeds()
except Exception:
log.exception("Scheduler error in fetch_all_feeds")
# Run cleanup once per day (every ~144 cycles at 10min interval)
cycles += 1
if cycles % 144 == 0:
try:
cleanup_old_entries()
except Exception:
log.exception("Scheduler error in cleanup")
time.sleep(FEED_FETCH_INTERVAL)
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
run_scheduler()