feat: major platform expansion — Brain service, RSS reader, iOS app, AI assistants, Firefox extension
All checks were successful
Security Checks / dependency-audit (push) Successful in 1m13s
Security Checks / secret-scanning (push) Successful in 3s
Security Checks / dockerfile-lint (push) Successful in 3s

Brain Service:
- Playwright stealth crawler replacing browserless (og:image, Readability, Reddit JSON API)
- AI classification with tag definitions and folder assignment
- YouTube video download via yt-dlp
- Karakeep migration complete (96 items)
- Taxonomy management (folders with icons/colors, tags)
- Discovery shuffle, sort options, search (Meilisearch + pgvector)
- Item tag/folder editing, card color accents

RSS Reader Service:
- Custom FastAPI reader replacing Miniflux
- Feed management (add/delete/refresh), category support
- Full article extraction via Readability
- Background content fetching for new entries
- Mark all read with confirmation
- Infinite scroll, retention cleanup (read entries deleted after 30 days, unread after 60 days)
- 17 feeds migrated from Miniflux

iOS App (SwiftUI):
- Native iOS 17+ app with @Observable architecture
- Cookie-based auth, configurable gateway URL
- Dashboard with custom background photo + frosted glass widgets
- Full fitness module (today/templates/goals/food library)
- AI assistant chat (fitness + brain, raw JSON state management)
- 120fps ProMotion support

AI Assistants (Gateway):
- Unified dispatcher with fitness/brain domain detection
- Fitness: natural language food logging, photo analysis, multi-item splitting
- Brain: save/append/update/delete notes, search & answer, undo support
- Madiha user gets fitness-only (brain disabled)

Firefox Extension:
- One-click save to Brain from any page
- Login with platform credentials
- Right-click context menu (save page/link/image)
- Notes field for URL saves
- Signed and published on AMO

Other:
- Reader bookmark button routes to Brain (was Karakeep)
- Fitness food library with "Add" button + add-to-meal popup
- Kindle send file size check (25MB SMTP2GO limit)
- Atelier UI as default (useAtelierShell=true)
- Mobile upload box in nav drawer

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Yusuf Suleman
2026-04-03 00:56:29 -05:00
parent af1765bd8e
commit 4592e35732
97 changed files with 11009 additions and 532 deletions

View File

View File

View File

@@ -0,0 +1,49 @@
"""Category endpoints."""
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_user_id, get_db_session
from app.models import Category
# Router for the category endpoints; every route below lives under /api/categories.
router = APIRouter(prefix="/api/categories", tags=["categories"])
class CategoryOut(BaseModel):
    """Response schema for a single category."""

    id: int
    title: str

    class Config:
        # Lets Pydantic populate the schema from object attributes (ORM rows).
        from_attributes = True
class CategoryCreate(BaseModel):
    """Request body for creating a category."""

    title: str
@router.get("", response_model=list[CategoryOut])
async def list_categories(
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Return the calling user's categories, sorted by title."""
    stmt = (
        select(Category)
        .where(Category.user_id == user_id)
        .order_by(Category.title)
    )
    rows = await db.execute(stmt)
    return rows.scalars().all()
@router.post("", response_model=CategoryOut, status_code=201)
async def create_category(
    body: CategoryCreate,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Create a category owned by the calling user and return the stored row."""
    new_category = Category(user_id=user_id, title=body.title)
    db.add(new_category)
    await db.commit()
    # Reload the row so database-generated values (e.g. the id) are populated.
    await db.refresh(new_category)
    return new_category

View File

@@ -0,0 +1,21 @@
"""API dependencies — auth, database session."""
from fastapi import Header, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
async def get_user_id(
    x_gateway_user_id: str = Header(None, alias="X-Gateway-User-Id"),
) -> str:
    """Return the user ID carried by the ``X-Gateway-User-Id`` header.

    Raises:
        HTTPException: 401 when the header is missing or empty.
    """
    if x_gateway_user_id:
        return x_gateway_user_id
    raise HTTPException(status_code=401, detail="Not authenticated")
async def get_db_session() -> AsyncSession:
    """FastAPI dependency that re-yields each session produced by ``get_db``."""
    async for db in get_db():
        yield db

View File

@@ -0,0 +1,264 @@
"""Entry endpoints."""
import logging
from typing import Optional
import httpx
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import func, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.api.deps import get_db_session, get_user_id
from app.config import CRAWLER_URL
from app.models import Entry, Feed
# Module logger, named after this module.
log = logging.getLogger(__name__)
# Router for the entry endpoints; every route below lives under /api/entries.
router = APIRouter(prefix="/api/entries", tags=["entries"])
# ── Schemas ──────────────────────────────────────────────────────────────
class FeedRef(BaseModel):
    """Minimal feed reference (id and title) embedded in entry responses."""

    id: int
    title: str

    class Config:
        # Lets Pydantic populate the schema from object attributes (ORM rows).
        from_attributes = True
class EntryOut(BaseModel):
    """Response schema for a single entry."""

    id: int
    title: str | None = None
    url: str | None = None
    content: str | None = None
    full_content: str | None = None
    author: str | None = None
    published_at: str | None = None
    status: str = "unread"
    starred: bool = False
    reading_time: int = 1
    feed: FeedRef | None = None

    class Config:
        from_attributes = True

    @classmethod
    def from_entry(cls, entry: Entry) -> "EntryOut":
        """Build the response schema from an ``Entry`` ORM row.

        ``content`` carries the crawled full content when present and falls
        back to the content taken from the feed; ``published_at`` is rendered
        as an ISO-8601 string.
        """
        published = entry.published_at.isoformat() if entry.published_at else None
        feed_ref = None
        if entry.feed:
            feed_ref = FeedRef(id=entry.feed.id, title=entry.feed.title)
        return cls(
            id=entry.id,
            title=entry.title,
            url=entry.url,
            content=entry.full_content or entry.content,
            full_content=entry.full_content,
            author=entry.author,
            published_at=published,
            status=entry.status,
            starred=entry.starred,
            reading_time=entry.reading_time,
            feed=feed_ref,
        )
class EntryListOut(BaseModel):
    """Paginated entry listing: the total match count plus one page of entries."""

    total: int
    entries: list[EntryOut]
class EntryBulkUpdate(BaseModel):
    """Request body for setting the status of several entries at once."""

    entry_ids: list[int]
    # Validated by the route handler, which accepts "read" or "unread".
    status: str
# ── Routes ───────────────────────────────────────────────────────────────
@router.get("", response_model=EntryListOut)
async def list_entries(
    status: Optional[str] = Query(None),
    starred: Optional[bool] = Query(None),
    feed_id: Optional[int] = Query(None),
    category_id: Optional[int] = Query(None),
    limit: int = Query(50, ge=1, le=500),
    offset: int = Query(0, ge=0),
    direction: str = Query("desc"),
    order: str = Query("published_at"),
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """List the calling user's entries with optional filters and pagination.

    Filters (``status``, ``starred``, ``feed_id``, ``category_id``) are ANDed.
    ``order`` selects ``published_at``; any other value sorts by
    ``created_at``. ``direction`` is ascending for ``"asc"`` and descending
    otherwise. ``total`` counts all matching rows, ignoring limit/offset.
    """
    # Conditions on Entry columns are shared by the page and count queries.
    filters = [Entry.user_id == user_id]
    if status:
        filters.append(Entry.status == status)
    if starred is not None:
        filters.append(Entry.starred == starred)
    if feed_id is not None:
        filters.append(Entry.feed_id == feed_id)

    query = select(Entry).where(*filters)
    count_query = select(func.count(Entry.id)).where(*filters)

    if category_id is not None:
        # Join through feed to filter by category
        query = query.join(Feed, Entry.feed_id == Feed.id).where(Feed.category_id == category_id)
        count_query = count_query.join(Feed, Entry.feed_id == Feed.id).where(Feed.category_id == category_id)

    # Ordering. Entry.id is a secondary key: the timestamps are not unique,
    # and without a tiebreaker OFFSET/LIMIT pages may repeat or skip rows.
    order_col = Entry.published_at if order == "published_at" else Entry.created_at
    if direction == "asc":
        query = query.order_by(order_col.asc().nullslast(), Entry.id.asc())
    else:
        query = query.order_by(order_col.desc().nullsfirst(), Entry.id.desc())

    # Total count
    total_result = await db.execute(count_query)
    total = total_result.scalar() or 0

    # Paginate
    query = query.options(selectinload(Entry.feed)).offset(offset).limit(limit)
    result = await db.execute(query)
    entries = result.scalars().all()

    return EntryListOut(
        total=total,
        entries=[EntryOut.from_entry(e) for e in entries],
    )
@router.put("")
async def bulk_update_entries(
    body: EntryBulkUpdate,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Set the status of the listed entries, restricted to the caller's rows.

    Raises:
        HTTPException: 400 when the status is neither "read" nor "unread".
    """
    allowed_statuses = ("read", "unread")
    if body.status not in allowed_statuses:
        raise HTTPException(status_code=400, detail="Status must be 'read' or 'unread'")

    stmt = (
        update(Entry)
        .where(Entry.user_id == user_id, Entry.id.in_(body.entry_ids))
        .values(status=body.status)
    )
    await db.execute(stmt)
    await db.commit()
    return {"ok": True}
class MarkAllReadBody(BaseModel):
    """Optional scope for mark-all-read: one feed, or one category."""

    feed_id: int | None = None
    category_id: int | None = None
@router.put("/mark-all-read")
async def mark_all_read(
    body: MarkAllReadBody,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Mark the caller's unread entries as read.

    The update is narrowed to ``feed_id`` when given; otherwise to the feeds
    of ``category_id`` when given; otherwise it covers every unread entry.
    Returns the number of rows changed under ``"marked"``.
    """
    stmt = update(Entry).where(Entry.user_id == user_id, Entry.status == "unread")
    if body.feed_id:
        stmt = stmt.where(Entry.feed_id == body.feed_id)
    elif body.category_id:
        # Subquery: the caller's feeds that belong to the requested category.
        category_feed_ids = select(Feed.id).where(
            Feed.category_id == body.category_id,
            Feed.user_id == user_id,
        )
        stmt = stmt.where(Entry.feed_id.in_(category_feed_ids))

    outcome = await db.execute(stmt.values(status="read"))
    await db.commit()
    return {"ok": True, "marked": outcome.rowcount}
@router.get("/{entry_id}", response_model=EntryOut)
async def get_entry(
    entry_id: int,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Return one of the caller's entries, with its feed loaded.

    Raises:
        HTTPException: 404 when no entry with that id belongs to the caller.
    """
    stmt = (
        select(Entry)
        .options(selectinload(Entry.feed))
        .where(Entry.id == entry_id, Entry.user_id == user_id)
    )
    entry = (await db.execute(stmt)).scalar_one_or_none()
    if entry is None:
        raise HTTPException(status_code=404, detail="Entry not found")
    return EntryOut.from_entry(entry)
@router.put("/{entry_id}/bookmark")
async def toggle_bookmark(
    entry_id: int,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Flip the ``starred`` flag of one of the caller's entries.

    Raises:
        HTTPException: 404 when no entry with that id belongs to the caller.
    """
    stmt = select(Entry).where(Entry.id == entry_id, Entry.user_id == user_id)
    entry = (await db.execute(stmt)).scalar_one_or_none()
    if entry is None:
        raise HTTPException(status_code=404, detail="Entry not found")

    entry.starred = not entry.starred
    await db.commit()
    return {"starred": entry.starred}
@router.post("/{entry_id}/fetch-full-content", response_model=EntryOut)
async def fetch_full_content(
    entry_id: int,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Crawl an entry's URL and store the result as its full content.

    Posts the URL to ``{CRAWLER_URL}/crawl`` and reads ``readable_html`` and
    ``text`` from the JSON response. ``readable_html`` is stored as-is when
    present; otherwise ``text`` is split into paragraphs and wrapped in
    ``<p>`` tags. The reading time is recomputed from ``text`` at 200 words
    per minute.

    Raises:
        HTTPException: 404 when the entry does not belong to the caller,
            400 when it has no URL, 502 when the crawler request fails or
            returns a body that is not valid JSON.
    """
    from html import escape

    result = await db.execute(
        select(Entry)
        .options(selectinload(Entry.feed))
        .where(Entry.id == entry_id, Entry.user_id == user_id)
    )
    entry = result.scalar_one_or_none()
    if not entry:
        raise HTTPException(status_code=404, detail="Entry not found")
    if not entry.url:
        raise HTTPException(status_code=400, detail="Entry has no URL to crawl")

    try:
        async with httpx.AsyncClient(timeout=60) as client:
            resp = await client.post(
                f"{CRAWLER_URL}/crawl",
                json={"url": entry.url},
            )
            resp.raise_for_status()
            data = resp.json()
    except (httpx.HTTPError, ValueError) as e:
        # ValueError covers a crawler response that is not valid JSON, which
        # would otherwise surface as an unhandled 500.
        log.error("Crawler error for entry %d: %s", entry_id, e)
        raise HTTPException(status_code=502, detail="Failed to fetch full content") from e

    # Prefer readable_html (Readability-extracted clean article with images)
    readable = data.get("readable_html", "")
    full_text = data.get("text", "")
    if readable:
        entry.full_content = readable
    elif full_text:
        paragraphs = [p.strip() for p in full_text.split("\n\n") if p.strip()]
        if not paragraphs:
            paragraphs = [p.strip() for p in full_text.split("\n") if p.strip()]
        # `text` is treated as plain text, so escape it before embedding it in
        # markup; otherwise "<" or "&" in the article would be read as HTML.
        entry.full_content = "\n".join(f"<p>{escape(p)}</p>" for p in paragraphs)
    else:
        entry.full_content = ""

    # Recalculate reading time from plain text
    if full_text:
        word_count = len(full_text.split())
        entry.reading_time = max(1, word_count // 200)

    await db.commit()
    await db.refresh(entry)
    return EntryOut.from_entry(entry)

View File

@@ -0,0 +1,242 @@
"""Feed endpoints."""
import logging
import re
import feedparser
import httpx
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_db_session, get_user_id
from app.models import Category, Entry, Feed
# Module logger, named after this module.
log = logging.getLogger(__name__)
# Router for the feed endpoints; every route below lives under /api/feeds.
router = APIRouter(prefix="/api/feeds", tags=["feeds"])
# ── Schemas ──────────────────────────────────────────────────────────────
class CategoryRef(BaseModel):
    """Minimal category reference (id and title) embedded in feed responses."""

    id: int
    title: str

    class Config:
        # Lets Pydantic populate the schema from object attributes (ORM rows).
        from_attributes = True
class FeedOut(BaseModel):
    """Response schema for a single feed."""

    id: int
    title: str
    feed_url: str
    site_url: str | None = None
    category: CategoryRef | None = None

    class Config:
        # Lets Pydantic populate the schema from object attributes (ORM rows).
        from_attributes = True
class FeedCreate(BaseModel):
    """Request body for subscribing to a feed."""

    # May be a feed URL or an HTML page; the create route tries feed discovery.
    feed_url: str
    category_id: int | None = None
class CountersOut(BaseModel):
    """Unread counters keyed by feed id (as a string)."""

    unreads: dict[str, int]
# ── Helpers ──────────────────────────────────────────────────────────────
def _discover_feed_url(html: str, base_url: str) -> str | None:
"""Try to find an RSS/Atom feed link in HTML."""
patterns = [
r'<link[^>]+type=["\']application/(?:rss|atom)\+xml["\'][^>]+href=["\']([^"\']+)["\']',
r'<link[^>]+href=["\']([^"\']+)["\'][^>]+type=["\']application/(?:rss|atom)\+xml["\']',
]
for pat in patterns:
match = re.search(pat, html, re.IGNORECASE)
if match:
href = match.group(1)
if href.startswith("/"):
# Resolve relative URL
from urllib.parse import urljoin
href = urljoin(base_url, href)
return href
return None
async def _fetch_and_parse_feed(feed_url: str) -> tuple[str, str, str | None]:
    """Resolve *feed_url* to a feed and return ``(feed_url, title, site_url)``.

    When the response parses as a feed (it has a title or entries) the URL is
    returned unchanged. Otherwise the body is searched for an RSS/Atom link,
    which is fetched and parsed instead.

    Raises:
        HTTPException: 400 when the page is not a feed and links to none.
        httpx.HTTPError: when either request fails or returns an error status.
    """
    request_headers = {"User-Agent": "Reader/1.0"}

    async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
        first = await client.get(feed_url, headers=request_headers)
        first.raise_for_status()
        body = first.text

    direct = feedparser.parse(body)
    # Check if it's a valid feed
    if direct.feed.get("title") or direct.entries:
        return feed_url, direct.feed.get("title", feed_url), direct.feed.get("link")

    # Not a feed — try to discover from HTML
    discovered = _discover_feed_url(body, feed_url)
    if not discovered:
        raise HTTPException(status_code=400, detail="No RSS/Atom feed found at this URL")

    # Fetch the discovered feed
    async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
        second = await client.get(discovered, headers=request_headers)
        second.raise_for_status()

    linked = feedparser.parse(second.text)
    feed_title = linked.feed.get("title", discovered)
    # Fall back to the page the user supplied when the feed names no site link.
    site = linked.feed.get("link") or feed_url
    return discovered, feed_title, site
# ── Routes ───────────────────────────────────────────────────────────────
@router.get("/counters", response_model=CountersOut)
async def feed_counters(
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Return the caller's unread entry count per feed, keyed by feed id."""
    stmt = (
        select(Entry.feed_id, func.count(Entry.id))
        .where(Entry.user_id == user_id, Entry.status == "unread")
        .group_by(Entry.feed_id)
    )
    rows = (await db.execute(stmt)).all()
    # Feed ids become strings to match the dict[str, int] response schema.
    return {"unreads": {str(row[0]): row[1] for row in rows}}
@router.get("", response_model=list[FeedOut])
async def list_feeds(
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Return the calling user's feeds, sorted by title."""
    stmt = select(Feed).where(Feed.user_id == user_id).order_by(Feed.title)
    rows = await db.execute(stmt)
    return rows.scalars().all()
@router.post("", response_model=FeedOut, status_code=201)
async def create_feed(
    body: FeedCreate,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Subscribe the caller to a feed, discovering the feed URL if needed.

    Raises:
        HTTPException: 409 when the submitted or discovered URL is already
            stored, 404 when ``category_id`` is not one of the caller's
            categories, 400 when the URL cannot be fetched or has no feed.
    """

    async def _url_taken(url: str) -> bool:
        # The lookup is not scoped to the user: feed_url is unique table-wide.
        found = await db.execute(select(Feed).where(Feed.feed_url == url))
        return found.scalar_one_or_none() is not None

    # Check for duplicate
    if await _url_taken(body.feed_url):
        raise HTTPException(status_code=409, detail="Feed already exists")

    # Validate category belongs to user
    if body.category_id:
        owned = await db.execute(
            select(Category).where(
                Category.id == body.category_id,
                Category.user_id == user_id,
            )
        )
        if owned.scalar_one_or_none() is None:
            raise HTTPException(status_code=404, detail="Category not found")

    # Fetch and discover feed
    try:
        actual_url, title, site_url = await _fetch_and_parse_feed(body.feed_url)
    except httpx.HTTPError as e:
        log.warning("Failed to fetch feed %s: %s", body.feed_url, e)
        raise HTTPException(status_code=400, detail=f"Could not fetch feed: {e}")

    # Check again with discovered URL
    if actual_url != body.feed_url and await _url_taken(actual_url):
        raise HTTPException(status_code=409, detail="Feed already exists")

    new_feed = Feed(
        user_id=user_id,
        category_id=body.category_id,
        title=title,
        feed_url=actual_url,
        site_url=site_url,
    )
    db.add(new_feed)
    await db.commit()
    await db.refresh(new_feed)
    return new_feed
@router.delete("/{feed_id}", status_code=204)
async def delete_feed(
    feed_id: int,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Delete one of the caller's feeds.

    Raises:
        HTTPException: 404 when no feed with that id belongs to the caller.
    """
    stmt = select(Feed).where(Feed.id == feed_id, Feed.user_id == user_id)
    target = (await db.execute(stmt)).scalar_one_or_none()
    if target is None:
        raise HTTPException(status_code=404, detail="Feed not found")
    await db.delete(target)
    await db.commit()
@router.post("/{feed_id}/refresh")
async def refresh_feed(
    feed_id: int,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Fetch one of the caller's feeds immediately.

    Raises:
        HTTPException: 404 when no feed with that id belongs to the caller.
    """
    import asyncio
    from app.worker.tasks import fetch_single_feed

    stmt = select(Feed).where(Feed.id == feed_id, Feed.user_id == user_id)
    feed = (await db.execute(stmt)).scalar_one_or_none()
    if feed is None:
        raise HTTPException(status_code=404, detail="Feed not found")

    # fetch_single_feed is a synchronous function, so run it in a worker
    # thread instead of blocking the event loop.
    await asyncio.to_thread(fetch_single_feed, feed_id)
    return {"ok": True, "message": f"Refreshed {feed.title}"}
@router.post("/refresh-all")
async def refresh_all_feeds(
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Fetch every feed of the calling user, one after another.

    A failure on one feed is logged and does not stop the remaining feeds.
    The reported count is the number of feeds attempted.
    """
    import asyncio
    from app.worker.tasks import fetch_single_feed

    result = await db.execute(
        select(Feed).where(Feed.user_id == user_id)
    )
    feeds = result.scalars().all()

    for feed in feeds:
        try:
            # fetch_single_feed is synchronous; keep it off the event loop.
            await asyncio.to_thread(fetch_single_feed, feed.id)
        except Exception:
            # Still best-effort, but record the failure instead of hiding it.
            log.exception("Failed to refresh feed %d", feed.id)
    return {"ok": True, "message": f"Refreshed {len(feeds)} feeds"}

View File

@@ -0,0 +1,23 @@
"""Reader service configuration — all from environment variables."""
import os
# ── Database (reuse Brain's PostgreSQL) ──
# Async SQLAlchemy URL; read from DATABASE_URL, with a built-in default.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql+asyncpg://brain:brain@brain-db:5432/brain",
)
# Same URL with the "+asyncpg" driver marker removed, for synchronous engines.
DATABASE_URL_SYNC = DATABASE_URL.replace("+asyncpg", "")
# ── Redis (reuse Brain's Redis) ──
REDIS_URL = os.environ.get("REDIS_URL", "redis://brain-redis:6379/0")
# ── Crawler (reuse Brain's Playwright crawler) ──
# Base URL of the crawler service; callers append "/crawl".
CRAWLER_URL = os.environ.get("CRAWLER_URL", "http://brain-crawler:3100")
# ── Service ──
PORT = int(os.environ.get("PORT", "8300"))
# DEBUG is on only when the variable is "1" or "true" (case-insensitive).
DEBUG = os.environ.get("DEBUG", "").lower() in ("1", "true")
# ── Feed fetch interval (seconds) ──
FEED_FETCH_INTERVAL = int(os.environ.get("FEED_FETCH_INTERVAL", "600"))

View File

@@ -0,0 +1,18 @@
"""Database session and engine setup."""
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from app.config import DATABASE_URL
# Shared async engine: a pool of 10 connections plus up to 5 overflow.
engine = create_async_engine(DATABASE_URL, echo=False, pool_size=10, max_overflow=5)
# Session factory; expire_on_commit=False keeps loaded attributes readable
# after commit without another round trip.
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
class Base(DeclarativeBase):
    """Declarative base class for the service's ORM models."""

    pass
async def get_db() -> AsyncSession:
    """Yield one session from ``async_session``, closed when the caller finishes."""
    async with async_session() as db:
        yield db

View File

@@ -0,0 +1,43 @@
"""Reader service — FastAPI entrypoint."""
import logging
from fastapi import FastAPI
from app.api.categories import router as categories_router
from app.api.feeds import router as feeds_router
from app.api.entries import router as entries_router
from app.config import DEBUG
# Root logging: DEBUG level when the DEBUG setting is on, INFO otherwise.
logging.basicConfig(
    level=logging.DEBUG if DEBUG else logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
# Interactive docs are served at /api/docs only in debug mode; ReDoc is off.
app = FastAPI(
    title="Reader",
    description="Self-hosted RSS reader — replaces Miniflux.",
    version="1.0.0",
    docs_url="/api/docs" if DEBUG else None,
    redoc_url=None,
)
# Mount the category, feed and entry routers.
app.include_router(categories_router)
app.include_router(feeds_router)
app.include_router(entries_router)
@app.get("/api/health")
async def health():
    """Liveness probe: always returns a static OK payload."""
    return {"status": "ok"}
@app.on_event("startup")
async def startup():
    """Create any missing tables for the registered models at startup."""
    from app.database import engine, Base
    # Imported for its side effect of registering the models on Base.metadata.
    from app.models import Category, Feed, Entry  # noqa: register models
    async with engine.begin() as conn:
        # create_all is a synchronous API, so run it through run_sync.
        await conn.run_sync(Base.metadata.create_all)
    logging.getLogger(__name__).info("Reader service started")

View File

@@ -0,0 +1,74 @@
"""SQLAlchemy models for the reader service."""
from datetime import datetime
from sqlalchemy import (
Boolean,
Column,
DateTime,
ForeignKey,
Index,
Integer,
String,
Text,
UniqueConstraint,
)
from sqlalchemy.orm import relationship
from app.database import Base
class Category(Base):
    """A user-owned grouping of feeds (table ``reader_categories``)."""

    __tablename__ = "reader_categories"
    id = Column(Integer, primary_key=True, autoincrement=True)
    user_id = Column(String(64), nullable=False)
    title = Column(String(255), nullable=False)
    # Naive timestamp taken from datetime.utcnow at insert time.
    created_at = Column(DateTime, default=datetime.utcnow)
    # Feeds in this category; loaded eagerly with a SELECT IN query.
    feeds = relationship("Feed", back_populates="category", lazy="selectin")
class Feed(Base):
    """A subscribed RSS/Atom feed (table ``reader_feeds``)."""

    __tablename__ = "reader_feeds"
    id = Column(Integer, primary_key=True, autoincrement=True)
    user_id = Column(String(64), nullable=False)
    # Deleting a category leaves its feeds in place with category_id = NULL.
    category_id = Column(Integer, ForeignKey("reader_categories.id", ondelete="SET NULL"), nullable=True)
    title = Column(String(500), nullable=False)
    # NOTE(review): uniqueness is table-wide, not per user, so two users
    # cannot subscribe to the same feed URL — confirm this is intended.
    feed_url = Column(Text, nullable=False, unique=True)
    site_url = Column(Text)
    # Validators from the last successful fetch, sent back on the next request
    # as If-None-Match / If-Modified-Since.
    etag = Column(String(255))
    last_modified = Column(String(255))
    last_fetched_at = Column(DateTime)
    created_at = Column(DateTime, default=datetime.utcnow)
    category = relationship("Category", back_populates="feeds", lazy="selectin")
    # Entries are never loaded implicitly (noload); deleting a feed through the
    # ORM also deletes its entries.
    entries = relationship("Entry", back_populates="feed", lazy="noload", cascade="all, delete-orphan")
class Entry(Base):
    """A single article from a feed (table ``reader_entries``)."""

    __tablename__ = "reader_entries"
    __table_args__ = (
        # One row per (feed, url); the worker's insert relies on this
        # constraint name to skip entries it already stored.
        UniqueConstraint("feed_id", "url", name="uq_reader_entries_feed_url"),
        Index("idx_reader_entries_user_status", "user_id", "status"),
        Index("idx_reader_entries_user_starred", "user_id", "starred"),
        Index("idx_reader_entries_feed", "feed_id"),
        Index("idx_reader_entries_published", "published_at"),
    )
    id = Column(Integer, primary_key=True, autoincrement=True)
    feed_id = Column(Integer, ForeignKey("reader_feeds.id", ondelete="CASCADE"), nullable=False)
    user_id = Column(String(64), nullable=False)
    title = Column(String(1000))
    url = Column(Text)
    # Content as delivered by the feed.
    content = Column(Text)
    # Content obtained by crawling the article URL; NULL until a fetch is tried.
    full_content = Column(Text)
    author = Column(String(500))
    published_at = Column(DateTime)
    # The API and worker use the values "unread" and "read".
    status = Column(String(10), default="unread")
    starred = Column(Boolean, default=False)
    # Estimated reading time in minutes.
    reading_time = Column(Integer, default=1)
    created_at = Column(DateTime, default=datetime.utcnow)
    feed = relationship("Feed", back_populates="entries", lazy="selectin")

View File

View File

@@ -0,0 +1,363 @@
"""Feed fetching worker — RQ tasks and scheduling loop."""
import logging
import re
import time
from datetime import datetime, timezone
import feedparser
import httpx
from dateutil import parser as dateparser
from redis import Redis
from rq import Queue
from sqlalchemy import create_engine, select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.orm import Session, sessionmaker
from app.config import DATABASE_URL_SYNC, FEED_FETCH_INTERVAL, REDIS_URL
from app.models import Category, Entry, Feed
log = logging.getLogger(__name__)
# ── Sync DB engine (for RQ worker) ──
# Built from the synchronous variant of the database URL.
_engine = create_engine(DATABASE_URL_SYNC, echo=False, pool_size=5, max_overflow=3)
# expire_on_commit=False keeps loaded attributes readable after commit.
SyncSession = sessionmaker(_engine, class_=Session, expire_on_commit=False)
# ── RQ queue ──
_redis = Redis.from_url(REDIS_URL)
# Queue named "reader" on the shared Redis connection.
queue = Queue("reader", connection=_redis)
# HTML tag stripper
_html_re = re.compile(r"<[^>]+>")
def _strip_html(text: str) -> str:
"""Remove HTML tags for word counting."""
if not text:
return ""
return _html_re.sub("", text)
def _calc_reading_time(html_content: str) -> int:
    """Estimate reading time in whole minutes at 200 words per minute.

    Tags are stripped before counting words; the result is never below 1.
    """
    words = _strip_html(html_content).split()
    minutes = len(words) // 200
    return minutes if minutes > 1 else 1
def _parse_date(entry: dict) -> datetime | None:
"""Parse published date from a feedparser entry."""
for field in ("published", "updated", "created"):
val = entry.get(field)
if val:
try:
return dateparser.parse(val)
except (ValueError, TypeError):
continue
# Try struct_time fields
for field in ("published_parsed", "updated_parsed", "created_parsed"):
val = entry.get(field)
if val:
try:
return datetime(*val[:6], tzinfo=timezone.utc)
except (ValueError, TypeError):
continue
return None
def _get_entry_content(entry: dict) -> str:
"""Extract the best content from a feedparser entry."""
# Prefer content field (often full HTML)
if entry.get("content"):
return entry["content"][0].get("value", "")
# Fall back to summary
if entry.get("summary"):
return entry["summary"]
# Fall back to description
if entry.get("description"):
return entry["description"]
return ""
def _get_entry_author(entry: dict) -> str | None:
"""Extract author from a feedparser entry."""
if entry.get("author"):
return entry["author"]
if entry.get("author_detail", {}).get("name"):
return entry["author_detail"]["name"]
return None
def _ensure_uncategorized(db: Session, user_id: str) -> int:
    """Return the id of the user's "Uncategorized" category, creating it if absent.

    A newly created category is flushed, not committed; committing is left to
    the caller.
    """
    lookup = select(Category).where(
        Category.user_id == user_id,
        Category.title == "Uncategorized",
    )
    existing = db.execute(lookup).scalar_one_or_none()
    if existing:
        return existing.id

    created = Category(user_id=user_id, title="Uncategorized")
    db.add(created)
    # Flush so the database assigns the primary key before it is read.
    db.flush()
    return created.id
def fetch_single_feed(feed_id: int):
    """Fetch one feed, store entries not seen before, then crawl their content.

    A conditional GET is sent using the stored ETag / Last-Modified values.
    A 304, any other non-200 status or an HTTP error ends the run early (only
    the 304 path records ``last_fetched_at``). Entries without a link are
    skipped, and entries already stored for this feed are ignored through the
    ``uq_reader_entries_feed_url`` constraint.

    Args:
        feed_id: Primary key of the feed to fetch; an unknown id is logged
            and ignored.
    """
    with SyncSession() as db:
        feed = db.execute(select(Feed).where(Feed.id == feed_id)).scalar_one_or_none()
        if not feed:
            log.warning("Feed %d not found, skipping", feed_id)
            return

        log.info("Fetching feed %d: %s", feed.id, feed.feed_url)
        headers = {"User-Agent": "Reader/1.0"}
        if feed.etag:
            headers["If-None-Match"] = feed.etag
        if feed.last_modified:
            headers["If-Modified-Since"] = feed.last_modified

        try:
            resp = httpx.get(feed.feed_url, headers=headers, timeout=30, follow_redirects=True)
        except httpx.HTTPError as e:
            log.error("HTTP error fetching feed %d: %s", feed.id, e)
            return

        # 304 Not Modified
        if resp.status_code == 304:
            log.debug("Feed %d not modified", feed.id)
            feed.last_fetched_at = datetime.utcnow()
            db.commit()
            return
        if resp.status_code != 200:
            log.warning("Feed %d returned status %d", feed.id, resp.status_code)
            return

        # Update etag/last-modified
        feed.etag = resp.headers.get("ETag")
        feed.last_modified = resp.headers.get("Last-Modified")
        feed.last_fetched_at = datetime.utcnow()

        parsed = feedparser.parse(resp.text)
        if not parsed.entries:
            log.debug("Feed %d has no entries", feed.id)
            db.commit()
            return

        new_entry_ids = []
        for fe in parsed.entries:
            url = fe.get("link")
            if not url:
                continue
            content = _get_entry_content(fe)
            title = fe.get("title")
            author = _get_entry_author(fe)
            stmt = pg_insert(Entry).values(
                feed_id=feed.id,
                user_id=feed.user_id,
                # Truncate to the column widths (title 1000, author 500) so an
                # over-long value cannot fail the insert for the whole feed.
                title=title[:1000] if title else None,
                url=url,
                content=content,
                author=author[:500] if author else None,
                published_at=_parse_date(fe),
                status="unread",
                starred=False,
                reading_time=_calc_reading_time(content),
            ).on_conflict_do_nothing(
                constraint="uq_reader_entries_feed_url"
            ).returning(Entry.id)
            # RETURNING yields a row only when the insert actually happened.
            row = db.execute(stmt).fetchone()
            if row:
                new_entry_ids.append(row[0])

        db.commit()
        log.info("Feed %d: %d new entries from %d total", feed.id, len(new_entry_ids), len(parsed.entries))

        # Fetch full content for new entries
        if new_entry_ids:
            _fetch_full_content_for_entries(db, new_entry_ids)
def _fetch_full_content_for_entries(db, entry_ids: list[int]):
    """Crawl the given entries and store their full article content.

    Each URL is posted to ``{CRAWLER_URL}/crawl``. ``readable_html`` is stored
    when present. Otherwise the plain ``text`` is used, wrapped in ``<p>``
    tags, but only when it is longer than the tag-stripped feed content. In
    every other case, including any error, the feed content is copied into
    ``full_content``. Entries without a URL are skipped.

    Args:
        db: Open synchronous session; changes are committed before returning.
        entry_ids: Primary keys of the entries to process.
    """
    from html import escape

    from app.config import CRAWLER_URL

    entries = db.execute(
        select(Entry).where(Entry.id.in_(entry_ids))
    ).scalars().all()
    log.info("Fetching full content for %d new entries", len(entries))

    for entry in entries:
        if not entry.url:
            continue
        try:
            resp = httpx.post(
                f"{CRAWLER_URL}/crawl",
                json={"url": entry.url},
                timeout=45,
            )
            if resp.status_code == 200:
                data = resp.json()
                readable = data.get("readable_html", "")
                full_text = data.get("text", "")
                if readable:
                    entry.full_content = readable
                    if full_text:
                        entry.reading_time = max(1, len(full_text.split()) // 200)
                elif full_text and len(full_text) > len(_strip_html(entry.content or "")):
                    paragraphs = [p.strip() for p in full_text.split("\n\n") if p.strip()]
                    if not paragraphs:
                        paragraphs = [p.strip() for p in full_text.split("\n") if p.strip()]
                    # `text` is treated as plain text: escape it so "<" or "&"
                    # in the article is not read as markup.
                    entry.full_content = "\n".join(f"<p>{escape(p)}</p>" for p in paragraphs)
                    entry.reading_time = max(1, len(full_text.split()) // 200)
                else:
                    entry.full_content = entry.content or ""
            else:
                entry.full_content = entry.content or ""
        except Exception as e:
            # Best-effort: one failing crawl must not stop the remaining entries.
            log.warning("Full content fetch failed for entry %d: %s", entry.id, e)
            entry.full_content = entry.content or ""

    db.commit()
    log.info("Full content done for %d entries", len(entries))
def fetch_full_content_batch():
    """Crawl full content for up to 20 unread entries that have none yet.

    Selects unread entries with a URL and a NULL ``full_content``, newest
    ``published_at`` first. ``readable_html`` is stored when the crawler
    returns it. Otherwise the plain ``text`` is used, wrapped in ``<p>`` tags,
    but only when it is longer than the tag-stripped feed content. In every
    other case, including any error, the feed content is copied into
    ``full_content``.
    """
    from html import escape

    from app.config import CRAWLER_URL

    with SyncSession() as db:
        # Find entries with no full_content (limit batch size)
        entries = db.execute(
            select(Entry).where(
                Entry.full_content.is_(None),
                Entry.url.isnot(None),
                Entry.status == "unread",
            ).order_by(Entry.published_at.desc()).limit(20)
        ).scalars().all()
        if not entries:
            return

        log.info("Fetching full content for %d entries", len(entries))
        for entry in entries:
            try:
                resp = httpx.post(
                    f"{CRAWLER_URL}/crawl",
                    json={"url": entry.url},
                    timeout=45,
                )
                if resp.status_code == 200:
                    data = resp.json()
                    readable = data.get("readable_html", "")
                    full_text = data.get("text", "")
                    if readable:
                        entry.full_content = readable
                        if full_text:
                            entry.reading_time = max(1, len(full_text.split()) // 200)
                    elif full_text and len(full_text) > len(_strip_html(entry.content or "")):
                        paragraphs = [p.strip() for p in full_text.split("\n\n") if p.strip()]
                        if not paragraphs:
                            paragraphs = [p.strip() for p in full_text.split("\n") if p.strip()]
                        # `text` is treated as plain text: escape it so "<" or
                        # "&" in the article is not read as markup.
                        entry.full_content = "\n".join(f"<p>{escape(p)}</p>" for p in paragraphs)
                        entry.reading_time = max(1, len(full_text.split()) // 200)
                    else:
                        entry.full_content = entry.content or ""
                else:
                    entry.full_content = entry.content or ""
            except Exception as e:
                # Best-effort: one failing crawl must not stop the batch.
                log.warning("Full content fetch failed for entry %d: %s", entry.id, e)
                entry.full_content = entry.content or ""

        db.commit()
        log.info("Full content fetched for %d entries", len(entries))
def fetch_all_feeds():
    """Fetch every stored feed in turn; a failing feed is logged and skipped."""
    with SyncSession() as db:
        feed_ids = [feed.id for feed in db.execute(select(Feed)).scalars().all()]
    log.info("Scheduling fetch for %d feeds", len(feed_ids))

    for feed_id in feed_ids:
        try:
            fetch_single_feed(feed_id)
        except Exception:
            log.exception("Error fetching feed %d", feed_id)
    # Full content is now fetched inline for each new entry
def cleanup_old_entries():
    """Delete old entries: read > 30 days, unread > 60 days.

    Age is measured from ``created_at`` against the current UTC time. Both
    deletes are committed together, and a summary is logged when anything was
    removed.
    """
    from datetime import timedelta

    from sqlalchemy import delete as sa_delete

    with SyncSession() as db:
        now = datetime.utcnow()
        thirty_days = now - timedelta(days=30)
        sixty_days = now - timedelta(days=60)

        # NOTE(review): starred entries are not exempt and are deleted like
        # any other entry — confirm this is intended.
        # Read entries older than 30 days
        result1 = db.execute(
            sa_delete(Entry).where(
                Entry.status == "read",
                Entry.created_at < thirty_days,
            )
        )
        # Unread entries older than 60 days
        result2 = db.execute(
            sa_delete(Entry).where(
                Entry.status == "unread",
                Entry.created_at < sixty_days,
            )
        )
        db.commit()

        read_deleted = result1.rowcount or 0
        unread_deleted = result2.rowcount or 0
        total = read_deleted + unread_deleted
        if total > 0:
            log.info("Cleanup: deleted %d old entries (%d read, %d unread)",
                     total, read_deleted, unread_deleted)
def run_scheduler():
    """Run forever: fetch all feeds every FEED_FETCH_INTERVAL seconds.

    Retention cleanup runs about once every 24 hours of elapsed time. It is
    timed with a monotonic clock rather than a fixed cycle count, so it stays
    daily when FEED_FETCH_INTERVAL is changed or a fetch cycle runs long.
    Errors in either step are logged and the loop continues.
    """
    log.info("Reader scheduler started — interval: %ds", FEED_FETCH_INTERVAL)

    # Create tables on first run (for the sync engine)
    from app.database import Base
    from app.models import Category, Feed, Entry  # noqa: register models
    Base.metadata.create_all(_engine)

    cleanup_interval_seconds = 24 * 60 * 60
    # The first cleanup happens one full interval after start-up.
    last_cleanup = time.monotonic()

    while True:
        try:
            fetch_all_feeds()
        except Exception:
            log.exception("Scheduler error in fetch_all_feeds")

        # Run cleanup once per day
        if time.monotonic() - last_cleanup >= cleanup_interval_seconds:
            last_cleanup = time.monotonic()
            try:
                cleanup_old_entries()
            except Exception:
                log.exception("Scheduler error in cleanup")

        time.sleep(FEED_FETCH_INTERVAL)
# Script entry point: configure INFO-level logging, then run the scheduler loop.
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    run_scheduler()