diff --git a/.gitignore b/.gitignore index 8367d6b..e0f1c3f 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,28 @@ gateway/data/ # Test artifacts test-results/ + +# Brain storage (user data, not code) +services/brain/storage/ +services/brain/data/ + +# Reader data +services/reader/data/ + +# Screenshots (uploaded images, not code) +screenshots/*.png +screenshots/*.jpg +screenshots/*.jpeg + +# iOS build artifacts +ios/Platform/Platform.xcodeproj/xcuserdata/ +ios/Platform/Platform.xcodeproj/project.xcworkspace/xcuserdata/ +ios/Platform.bak/ + +# Node modules +**/node_modules/ + +# Temp files +*.pyc +__pycache__/ +.DS_Store diff --git a/docker-compose.yml b/docker-compose.yml index d7f8641..31214b4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,6 +15,8 @@ services: - IMMICH_API_KEY=${IMMICH_API_KEY} - KARAKEEP_URL=${KARAKEEP_URL:-http://192.168.1.42:3005} - KARAKEEP_API_KEY=${KARAKEEP_API_KEY} + - OPENAI_API_KEY=${OPENAI_API_KEY} + - OPENAI_MODEL=${OPENAI_MODEL:-gpt-5.2} - BODY_SIZE_LIMIT=52428800 - TZ=${TZ:-America/Chicago} networks: @@ -64,6 +66,7 @@ services: - TASKS_BACKEND_URL=http://tasks-service:8098 - TASKS_SERVICE_API_KEY=${TASKS_SERVICE_API_KEY} - BRAIN_BACKEND_URL=http://brain-api:8200 + - READER_BACKEND_URL=http://reader-api:8300 - QBITTORRENT_HOST=${QBITTORRENT_HOST:-192.168.1.42} - QBITTORRENT_PORT=${QBITTORRENT_PORT:-8080} - QBITTORRENT_USERNAME=${QBITTORRENT_USERNAME:-admin} diff --git a/extensions/brain-firefox/background.js b/extensions/brain-firefox/background.js new file mode 100644 index 0000000..6d2e60b --- /dev/null +++ b/extensions/brain-firefox/background.js @@ -0,0 +1,250 @@ +"use strict"; + +var API_BASE = "https://dash.quadjourney.com"; + +console.log("[Brain] Background script loaded"); + +// ── Auth helpers ── + +function getSession() { + return browser.storage.local.get("brainSession").then(function(data) { + return data.brainSession || null; + }); +} + +function apiRequest(path, opts) { + return getSession().then(function(session) { + if (!session) throw new Error("Not logged in"); + + opts = opts || {}; + opts.headers = opts.headers || {}; + opts.headers["Cookie"] = "platform_session=" + session; + opts.credentials = "include"; + + return fetch(API_BASE + path, opts).then(function(resp) { + if (resp.status === 401 || resp.status === 403) { + browser.storage.local.remove("brainSession"); + throw new Error("Session expired"); + } + if (!resp.ok) throw new Error("API error: " + resp.status); + return resp.json(); + }); + }); +} + +// ── Save functions ── + +function saveLink(url, title, note) { + var body = { type: "link", url: url }; + if (title) body.title = title; + if (note && note.trim()) body.raw_content = note.trim(); + return apiRequest("/api/brain/items", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(body), + }); +} + +function saveNote(text) { + return apiRequest("/api/brain/items", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ type: "note", raw_content: text }), + }); +} + +function saveImage(imageUrl) { + return fetch(imageUrl).then(function(resp) { + if (!resp.ok) throw new Error("Failed to download image"); + return resp.blob(); + }).then(function(blob) { + var parts = imageUrl.split("/"); + var filename = (parts[parts.length - 1] || "image.jpg").split("?")[0]; + var formData = new FormData(); + formData.append("file", blob, filename); + + return getSession().then(function(session) { + if (!session) throw new Error("Not logged 
in"); + return fetch(API_BASE + "/api/brain/items/upload", { + method: "POST", + headers: { "Cookie": "platform_session=" + session }, + credentials: "include", + body: formData, + }); + }); + }).then(function(resp) { + if (!resp.ok) throw new Error("Upload failed: " + resp.status); + return resp.json(); + }); +} + +// ── Badge helpers ── + +function showBadge(tabId, text, color, duration) { + browser.action.setBadgeText({ text: text, tabId: tabId }); + browser.action.setBadgeBackgroundColor({ color: color, tabId: tabId }); + setTimeout(function() { + browser.action.setBadgeText({ text: "", tabId: tabId }); + }, duration || 2000); +} + +// ── Context menus ── + +browser.runtime.onInstalled.addListener(function() { + console.log("[Brain] Creating context menus"); + browser.contextMenus.create({ + id: "save-page", + title: "Save page to Brain", + contexts: ["page"], + }); + browser.contextMenus.create({ + id: "save-link", + title: "Save link to Brain", + contexts: ["link"], + }); + browser.contextMenus.create({ + id: "save-image", + title: "Save image to Brain", + contexts: ["image"], + }); +}); + +browser.contextMenus.onClicked.addListener(function(info, tab) { + getSession().then(function(session) { + if (!session) { + browser.action.openPopup(); + return; + } + + var promise; + if (info.menuItemId === "save-page") { + promise = saveLink(tab.url, tab.title); + } else if (info.menuItemId === "save-link") { + promise = saveLink(info.linkUrl, info.linkText); + } else if (info.menuItemId === "save-image") { + promise = saveImage(info.srcUrl); + } + + if (promise) { + promise.then(function() { + showBadge(tab.id, "OK", "#059669", 2000); + }).catch(function(e) { + showBadge(tab.id, "ERR", "#DC2626", 3000); + console.error("[Brain] Save failed:", e); + }); + } + }); +}); + +// ── Keyboard shortcut ── + +browser.commands.onCommand.addListener(function(command) { + if (command === "save-page") { + browser.tabs.query({ active: true, currentWindow: true }).then(function(tabs) { + var tab = tabs[0]; + if (!tab || !tab.url) return; + + getSession().then(function(session) { + if (!session) { + browser.action.openPopup(); + return; + } + saveLink(tab.url, tab.title).then(function() { + showBadge(tab.id, "OK", "#059669", 2000); + }).catch(function(e) { + showBadge(tab.id, "ERR", "#DC2626", 3000); + }); + }); + }); + } +}); + +// ── Message handler (from popup) ── + +browser.runtime.onMessage.addListener(function(msg, sender, sendResponse) { + console.log("[Brain] Got message:", msg.action); + + if (msg.action === "login") { + fetch(API_BASE + "/api/auth/login", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + username: msg.username, + password: msg.password, + }), + credentials: "include", + }).then(function(resp) { + if (!resp.ok) { + sendResponse({ success: false, error: "Invalid credentials" }); + return; + } + // Wait for cookie to be set + return new Promise(function(r) { setTimeout(r, 300); }).then(function() { + return browser.cookies.get({ + url: API_BASE, + name: "platform_session", + }); + }).then(function(cookie) { + if (cookie && cookie.value) { + return browser.storage.local.set({ brainSession: cookie.value }).then(function() { + sendResponse({ success: true }); + }); + } + sendResponse({ success: false, error: "Login OK but could not capture session" }); + }); + }).catch(function(e) { + console.error("[Brain] Login error:", e); + sendResponse({ success: false, error: e.message || "Connection failed" }); + }); + return true; + } + + if 
(msg.action === "logout") { + browser.storage.local.remove("brainSession").then(function() { + sendResponse({ success: true }); + }); + return true; + } + + if (msg.action === "check-auth") { + getSession().then(function(session) { + if (!session) { + sendResponse({ authenticated: false }); + return; + } + return apiRequest("/api/auth/me").then(function(data) { + sendResponse({ authenticated: true, user: data.user || data }); + }).catch(function() { + sendResponse({ authenticated: false }); + }); + }); + return true; + } + + if (msg.action === "save-link") { + saveLink(msg.url, msg.title, msg.note).then(function(result) { + sendResponse(result); + }).catch(function(e) { + sendResponse({ error: e.message }); + }); + return true; + } + + if (msg.action === "save-note") { + saveNote(msg.text).then(function(result) { + sendResponse(result); + }).catch(function(e) { + sendResponse({ error: e.message }); + }); + return true; + } + + if (msg.action === "save-image") { + saveImage(msg.url).then(function(result) { + sendResponse(result); + }).catch(function(e) { + sendResponse({ error: e.message }); + }); + return true; + } +}); diff --git a/extensions/brain-firefox/brain-firefox.xpi b/extensions/brain-firefox/brain-firefox.xpi new file mode 100644 index 0000000..55c190a Binary files /dev/null and b/extensions/brain-firefox/brain-firefox.xpi differ diff --git a/extensions/brain-firefox/manifest.json b/extensions/brain-firefox/manifest.json new file mode 100644 index 0000000..88373cc --- /dev/null +++ b/extensions/brain-firefox/manifest.json @@ -0,0 +1,49 @@ +{ + "manifest_version": 3, + "name": "Brain - Save to Second Brain", + "version": "1.0.0", + "description": "One-click save pages, notes, and images to your Second Brain", + "permissions": [ + "activeTab", + "contextMenus", + "storage", + "cookies" + ], + "host_permissions": [ + "https://dash.quadjourney.com/*" + ], + "action": { + "default_popup": "popup.html", + "default_icon": { + "16": "icons/brain-16.png", + "32": "icons/brain-32.png", + "48": "icons/brain-48.png" + } + }, + "background": { + "scripts": ["background.js"] + }, + "commands": { + "save-page": { + "suggested_key": { + "default": "Alt+Shift+S" + }, + "description": "Save current page to Brain" + } + }, + "icons": { + "16": "icons/brain-16.png", + "32": "icons/brain-32.png", + "48": "icons/brain-48.png", + "128": "icons/brain-128.png" + }, + "browser_specific_settings": { + "gecko": { + "id": "brain@quadjourney.com", + "data_collection_permissions": { + "required": ["none"], + "optional": [] + } + } + } +} diff --git a/extensions/brain-firefox/popup.html b/extensions/brain-firefox/popup.html new file mode 100644 index 0000000..a838675 --- /dev/null +++ b/extensions/brain-firefox/popup.html @@ -0,0 +1,259 @@ + + +
+If you see this, the popup works.
diff --git a/frontend-v2/src/app.css b/frontend-v2/src/app.css
index da5f984..ad33435 100644
--- a/frontend-v2/src/app.css
+++ b/frontend-v2/src/app.css
@@ -12,8 +12,9 @@
    ═══════════════════════════════════════════════ */
 @layer base {
-  html, body { overflow-x: hidden; }
-  body { padding-top: env(safe-area-inset-top); padding-bottom: env(safe-area-inset-bottom); }
+  html { background-color: #f5efe6; }
+  body { overflow-x: clip; }
+  body { padding-bottom: env(safe-area-inset-bottom); }
   :root {
     /* ── Fonts ── */
     --font: 'Outfit', -apple-system, system-ui, sans-serif;
@@ -106,7 +107,7 @@
 /* ── LIGHT MODE — Zinc + Emerald ── */
 :root {
-  --canvas: #FAFAFA;
+  --canvas: #f5efe6;
   --surface: #FFFFFF;
   --surface-secondary: #F4F4F5;
   --card: #FFFFFF;
diff --git a/frontend-v2/src/app.html b/frontend-v2/src/app.html
index bd524fe..2ee60cb 100644
--- a/frontend-v2/src/app.html
+++ b/frontend-v2/src/app.html
@@ -3,6 +3,9 @@
+
+
+
diff --git a/frontend-v2/src/lib/components/assistant/BrainAssistantDrawer.svelte b/frontend-v2/src/lib/components/assistant/BrainAssistantDrawer.svelte
new file mode 100644
index 0000000..779df73
--- /dev/null
+++ b/frontend-v2/src/lib/components/assistant/BrainAssistantDrawer.svelte
@@ -0,0 +1,480 @@
+{#if open}
diff --git a/services/reader/app/api/entries.py b/services/reader/app/api/entries.py
new file mode 100644
+    if paragraphs:
+        entry.full_content = "\n".join(f"<p>{p}</p>
" for p in paragraphs) + else: + entry.full_content = "" + + # Recalculate reading time from plain text + if full_text: + word_count = len(full_text.split()) + entry.reading_time = max(1, word_count // 200) + + await db.commit() + await db.refresh(entry) + return EntryOut.from_entry(entry) diff --git a/services/reader/app/api/feeds.py b/services/reader/app/api/feeds.py new file mode 100644 index 0000000..c98e0a6 --- /dev/null +++ b/services/reader/app/api/feeds.py @@ -0,0 +1,242 @@ +"""Feed endpoints.""" + +import logging +import re + +import feedparser +import httpx +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel +from sqlalchemy import func, select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.api.deps import get_db_session, get_user_id +from app.models import Category, Entry, Feed + +log = logging.getLogger(__name__) +router = APIRouter(prefix="/api/feeds", tags=["feeds"]) + + +# ── Schemas ────────────────────────────────────────────────────────────── + + +class CategoryRef(BaseModel): + id: int + title: str + + class Config: + from_attributes = True + + +class FeedOut(BaseModel): + id: int + title: str + feed_url: str + site_url: str | None = None + category: CategoryRef | None = None + + class Config: + from_attributes = True + + +class FeedCreate(BaseModel): + feed_url: str + category_id: int | None = None + + +class CountersOut(BaseModel): + unreads: dict[str, int] + + +# ── Helpers ────────────────────────────────────────────────────────────── + + +def _discover_feed_url(html: str, base_url: str) -> str | None: + """Try to find an RSS/Atom feed link in HTML.""" + patterns = [ + r']+type=["\']application/(?:rss|atom)\+xml["\'][^>]+href=["\']([^"\']+)["\']', + r']+href=["\']([^"\']+)["\'][^>]+type=["\']application/(?:rss|atom)\+xml["\']', + ] + for pat in patterns: + match = re.search(pat, html, re.IGNORECASE) + if match: + href = match.group(1) + if href.startswith("/"): + # Resolve relative URL + from urllib.parse import urljoin + href = urljoin(base_url, href) + return href + return None + + +async def _fetch_and_parse_feed(feed_url: str) -> tuple[str, str, str | None]: + """ + Fetch a URL. If it's a valid feed, return (feed_url, title, site_url). + If it's HTML, try to discover the feed link and follow it. 
+ """ + async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client: + resp = await client.get(feed_url, headers={"User-Agent": "Reader/1.0"}) + resp.raise_for_status() + + body = resp.text + parsed = feedparser.parse(body) + + # Check if it's a valid feed + if parsed.feed.get("title") or parsed.entries: + title = parsed.feed.get("title", feed_url) + site_url = parsed.feed.get("link") + return feed_url, title, site_url + + # Not a feed — try to discover from HTML + discovered = _discover_feed_url(body, feed_url) + if not discovered: + raise HTTPException(status_code=400, detail="No RSS/Atom feed found at this URL") + + # Fetch the discovered feed + async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client: + resp2 = await client.get(discovered, headers={"User-Agent": "Reader/1.0"}) + resp2.raise_for_status() + + parsed2 = feedparser.parse(resp2.text) + title = parsed2.feed.get("title", discovered) + site_url = parsed2.feed.get("link") or feed_url + return discovered, title, site_url + + +# ── Routes ─────────────────────────────────────────────────────────────── + + +@router.get("/counters", response_model=CountersOut) +async def feed_counters( + user_id: str = Depends(get_user_id), + db: AsyncSession = Depends(get_db_session), +): + result = await db.execute( + select(Entry.feed_id, func.count(Entry.id)) + .where(Entry.user_id == user_id, Entry.status == "unread") + .group_by(Entry.feed_id) + ) + unreads = {str(row[0]): row[1] for row in result.all()} + return {"unreads": unreads} + + +@router.get("", response_model=list[FeedOut]) +async def list_feeds( + user_id: str = Depends(get_user_id), + db: AsyncSession = Depends(get_db_session), +): + result = await db.execute( + select(Feed) + .where(Feed.user_id == user_id) + .order_by(Feed.title) + ) + return result.scalars().all() + + +@router.post("", response_model=FeedOut, status_code=201) +async def create_feed( + body: FeedCreate, + user_id: str = Depends(get_user_id), + db: AsyncSession = Depends(get_db_session), +): + # Check for duplicate + existing = await db.execute( + select(Feed).where(Feed.feed_url == body.feed_url) + ) + if existing.scalar_one_or_none(): + raise HTTPException(status_code=409, detail="Feed already exists") + + # Validate category belongs to user + if body.category_id: + cat = await db.execute( + select(Category).where( + Category.id == body.category_id, + Category.user_id == user_id, + ) + ) + if not cat.scalar_one_or_none(): + raise HTTPException(status_code=404, detail="Category not found") + + # Fetch and discover feed + try: + actual_url, title, site_url = await _fetch_and_parse_feed(body.feed_url) + except httpx.HTTPError as e: + log.warning("Failed to fetch feed %s: %s", body.feed_url, e) + raise HTTPException(status_code=400, detail=f"Could not fetch feed: {e}") + + # Check again with discovered URL + if actual_url != body.feed_url: + existing = await db.execute( + select(Feed).where(Feed.feed_url == actual_url) + ) + if existing.scalar_one_or_none(): + raise HTTPException(status_code=409, detail="Feed already exists") + + feed = Feed( + user_id=user_id, + category_id=body.category_id, + title=title, + feed_url=actual_url, + site_url=site_url, + ) + db.add(feed) + await db.commit() + await db.refresh(feed) + return feed + + +@router.delete("/{feed_id}", status_code=204) +async def delete_feed( + feed_id: int, + user_id: str = Depends(get_user_id), + db: AsyncSession = Depends(get_db_session), +): + result = await db.execute( + select(Feed).where(Feed.id == feed_id, Feed.user_id 
== user_id) + ) + feed = result.scalar_one_or_none() + if not feed: + raise HTTPException(status_code=404, detail="Feed not found") + await db.delete(feed) + await db.commit() + + +@router.post("/{feed_id}/refresh") +async def refresh_feed( + feed_id: int, + user_id: str = Depends(get_user_id), + db: AsyncSession = Depends(get_db_session), +): + result = await db.execute( + select(Feed).where(Feed.id == feed_id, Feed.user_id == user_id) + ) + feed = result.scalar_one_or_none() + if not feed: + raise HTTPException(status_code=404, detail="Feed not found") + + import asyncio + from app.worker.tasks import fetch_single_feed + await asyncio.to_thread(fetch_single_feed, feed_id) + + return {"ok": True, "message": f"Refreshed {feed.title}"} + + +@router.post("/refresh-all") +async def refresh_all_feeds( + user_id: str = Depends(get_user_id), + db: AsyncSession = Depends(get_db_session), +): + result = await db.execute( + select(Feed).where(Feed.user_id == user_id) + ) + feeds = result.scalars().all() + + import asyncio + from app.worker.tasks import fetch_single_feed + for feed in feeds: + try: + await asyncio.to_thread(fetch_single_feed, feed.id) + except Exception: + pass + + return {"ok": True, "message": f"Refreshed {len(feeds)} feeds"} diff --git a/services/reader/app/config.py b/services/reader/app/config.py new file mode 100644 index 0000000..7a6c251 --- /dev/null +++ b/services/reader/app/config.py @@ -0,0 +1,23 @@ +"""Reader service configuration — all from environment variables.""" + +import os + +# ── Database (reuse Brain's PostgreSQL) ── +DATABASE_URL = os.environ.get( + "DATABASE_URL", + "postgresql+asyncpg://brain:brain@brain-db:5432/brain", +) +DATABASE_URL_SYNC = DATABASE_URL.replace("+asyncpg", "") + +# ── Redis (reuse Brain's Redis) ── +REDIS_URL = os.environ.get("REDIS_URL", "redis://brain-redis:6379/0") + +# ── Crawler (reuse Brain's Playwright crawler) ── +CRAWLER_URL = os.environ.get("CRAWLER_URL", "http://brain-crawler:3100") + +# ── Service ── +PORT = int(os.environ.get("PORT", "8300")) +DEBUG = os.environ.get("DEBUG", "").lower() in ("1", "true") + +# ── Feed fetch interval (seconds) ── +FEED_FETCH_INTERVAL = int(os.environ.get("FEED_FETCH_INTERVAL", "600")) diff --git a/services/reader/app/database.py b/services/reader/app/database.py new file mode 100644 index 0000000..cc9e2ea --- /dev/null +++ b/services/reader/app/database.py @@ -0,0 +1,18 @@ +"""Database session and engine setup.""" + +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker +from sqlalchemy.orm import DeclarativeBase + +from app.config import DATABASE_URL + +engine = create_async_engine(DATABASE_URL, echo=False, pool_size=10, max_overflow=5) +async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + +class Base(DeclarativeBase): + pass + + +async def get_db() -> AsyncSession: + async with async_session() as session: + yield session diff --git a/services/reader/app/main.py b/services/reader/app/main.py new file mode 100644 index 0000000..9b4da5a --- /dev/null +++ b/services/reader/app/main.py @@ -0,0 +1,43 @@ +"""Reader service — FastAPI entrypoint.""" + +import logging + +from fastapi import FastAPI + +from app.api.categories import router as categories_router +from app.api.feeds import router as feeds_router +from app.api.entries import router as entries_router +from app.config import DEBUG + +logging.basicConfig( + level=logging.DEBUG if DEBUG else logging.INFO, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", +) + +app 
= FastAPI( + title="Reader", + description="Self-hosted RSS reader — replaces Miniflux.", + version="1.0.0", + docs_url="/api/docs" if DEBUG else None, + redoc_url=None, +) + +app.include_router(categories_router) +app.include_router(feeds_router) +app.include_router(entries_router) + + +@app.get("/api/health") +async def health(): + return {"status": "ok"} + + +@app.on_event("startup") +async def startup(): + from app.database import engine, Base + from app.models import Category, Feed, Entry # noqa: register models + + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + logging.getLogger(__name__).info("Reader service started") diff --git a/services/reader/app/models.py b/services/reader/app/models.py new file mode 100644 index 0000000..2aa6f35 --- /dev/null +++ b/services/reader/app/models.py @@ -0,0 +1,74 @@ +"""SQLAlchemy models for the reader service.""" + +from datetime import datetime + +from sqlalchemy import ( + Boolean, + Column, + DateTime, + ForeignKey, + Index, + Integer, + String, + Text, + UniqueConstraint, +) +from sqlalchemy.orm import relationship + +from app.database import Base + + +class Category(Base): + __tablename__ = "reader_categories" + + id = Column(Integer, primary_key=True, autoincrement=True) + user_id = Column(String(64), nullable=False) + title = Column(String(255), nullable=False) + created_at = Column(DateTime, default=datetime.utcnow) + + feeds = relationship("Feed", back_populates="category", lazy="selectin") + + +class Feed(Base): + __tablename__ = "reader_feeds" + + id = Column(Integer, primary_key=True, autoincrement=True) + user_id = Column(String(64), nullable=False) + category_id = Column(Integer, ForeignKey("reader_categories.id", ondelete="SET NULL"), nullable=True) + title = Column(String(500), nullable=False) + feed_url = Column(Text, nullable=False, unique=True) + site_url = Column(Text) + etag = Column(String(255)) + last_modified = Column(String(255)) + last_fetched_at = Column(DateTime) + created_at = Column(DateTime, default=datetime.utcnow) + + category = relationship("Category", back_populates="feeds", lazy="selectin") + entries = relationship("Entry", back_populates="feed", lazy="noload", cascade="all, delete-orphan") + + +class Entry(Base): + __tablename__ = "reader_entries" + __table_args__ = ( + UniqueConstraint("feed_id", "url", name="uq_reader_entries_feed_url"), + Index("idx_reader_entries_user_status", "user_id", "status"), + Index("idx_reader_entries_user_starred", "user_id", "starred"), + Index("idx_reader_entries_feed", "feed_id"), + Index("idx_reader_entries_published", "published_at"), + ) + + id = Column(Integer, primary_key=True, autoincrement=True) + feed_id = Column(Integer, ForeignKey("reader_feeds.id", ondelete="CASCADE"), nullable=False) + user_id = Column(String(64), nullable=False) + title = Column(String(1000)) + url = Column(Text) + content = Column(Text) + full_content = Column(Text) + author = Column(String(500)) + published_at = Column(DateTime) + status = Column(String(10), default="unread") + starred = Column(Boolean, default=False) + reading_time = Column(Integer, default=1) + created_at = Column(DateTime, default=datetime.utcnow) + + feed = relationship("Feed", back_populates="entries", lazy="selectin") diff --git a/services/reader/app/worker/__init__.py b/services/reader/app/worker/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/reader/app/worker/tasks.py b/services/reader/app/worker/tasks.py new file mode 100644 index 0000000..ea92652 --- 
/dev/null +++ b/services/reader/app/worker/tasks.py @@ -0,0 +1,363 @@ +"""Feed fetching worker — RQ tasks and scheduling loop.""" + +import logging +import re +import time +from datetime import datetime, timezone + +import feedparser +import httpx +from dateutil import parser as dateparser +from redis import Redis +from rq import Queue +from sqlalchemy import create_engine, select +from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.orm import Session, sessionmaker + +from app.config import DATABASE_URL_SYNC, FEED_FETCH_INTERVAL, REDIS_URL +from app.models import Category, Entry, Feed + +log = logging.getLogger(__name__) + +# ── Sync DB engine (for RQ worker) ── +_engine = create_engine(DATABASE_URL_SYNC, echo=False, pool_size=5, max_overflow=3) +SyncSession = sessionmaker(_engine, class_=Session, expire_on_commit=False) + +# ── RQ queue ── +_redis = Redis.from_url(REDIS_URL) +queue = Queue("reader", connection=_redis) + +# HTML tag stripper +_html_re = re.compile(r"<[^>]+>") + + +def _strip_html(text: str) -> str: + """Remove HTML tags for word counting.""" + if not text: + return "" + return _html_re.sub("", text) + + +def _calc_reading_time(html_content: str) -> int: + """Estimate reading time in minutes from HTML content.""" + plain = _strip_html(html_content) + word_count = len(plain.split()) + return max(1, word_count // 200) + + +def _parse_date(entry: dict) -> datetime | None: + """Parse published date from a feedparser entry.""" + for field in ("published", "updated", "created"): + val = entry.get(field) + if val: + try: + return dateparser.parse(val) + except (ValueError, TypeError): + continue + # Try struct_time fields + for field in ("published_parsed", "updated_parsed", "created_parsed"): + val = entry.get(field) + if val: + try: + return datetime(*val[:6], tzinfo=timezone.utc) + except (ValueError, TypeError): + continue + return None + + +def _get_entry_content(entry: dict) -> str: + """Extract the best content from a feedparser entry.""" + # Prefer content field (often full HTML) + if entry.get("content"): + return entry["content"][0].get("value", "") + # Fall back to summary + if entry.get("summary"): + return entry["summary"] + # Fall back to description + if entry.get("description"): + return entry["description"] + return "" + + +def _get_entry_author(entry: dict) -> str | None: + """Extract author from a feedparser entry.""" + if entry.get("author"): + return entry["author"] + if entry.get("author_detail", {}).get("name"): + return entry["author_detail"]["name"] + return None + + +def _ensure_uncategorized(db: Session, user_id: str) -> int: + """Ensure an 'Uncategorized' category exists for the user, return its ID.""" + row = db.execute( + select(Category).where( + Category.user_id == user_id, + Category.title == "Uncategorized", + ) + ).scalar_one_or_none() + if row: + return row.id + cat = Category(user_id=user_id, title="Uncategorized") + db.add(cat) + db.flush() + return cat.id + + +def fetch_single_feed(feed_id: int): + """Fetch and parse a single feed, inserting new entries.""" + with SyncSession() as db: + feed = db.execute(select(Feed).where(Feed.id == feed_id)).scalar_one_or_none() + if not feed: + log.warning("Feed %d not found, skipping", feed_id) + return + + log.info("Fetching feed %d: %s", feed.id, feed.feed_url) + + headers = {"User-Agent": "Reader/1.0"} + if feed.etag: + headers["If-None-Match"] = feed.etag + if feed.last_modified: + headers["If-Modified-Since"] = feed.last_modified + + try: + resp = httpx.get(feed.feed_url, 
headers=headers, timeout=30, follow_redirects=True)
+        except httpx.HTTPError as e:
+            log.error("HTTP error fetching feed %d: %s", feed.id, e)
+            return
+
+        # 304 Not Modified
+        if resp.status_code == 304:
+            log.debug("Feed %d not modified", feed.id)
+            feed.last_fetched_at = datetime.utcnow()
+            db.commit()
+            return
+
+        if resp.status_code != 200:
+            log.warning("Feed %d returned status %d", feed.id, resp.status_code)
+            return
+
+        # Update etag/last-modified
+        feed.etag = resp.headers.get("ETag")
+        feed.last_modified = resp.headers.get("Last-Modified")
+        feed.last_fetched_at = datetime.utcnow()
+
+        parsed = feedparser.parse(resp.text)
+        if not parsed.entries:
+            log.debug("Feed %d has no entries", feed.id)
+            db.commit()
+            return
+
+        new_count = 0
+        new_entry_ids = []
+        for fe in parsed.entries:
+            url = fe.get("link")
+            if not url:
+                continue
+
+            content = _get_entry_content(fe)
+            pub_date = _parse_date(fe)
+
+            stmt = pg_insert(Entry).values(
+                feed_id=feed.id,
+                user_id=feed.user_id,
+                title=fe.get("title", "")[:1000] if fe.get("title") else None,
+                url=url,
+                content=content,
+                author=_get_entry_author(fe),
+                published_at=pub_date,
+                status="unread",
+                starred=False,
+                reading_time=_calc_reading_time(content),
+            ).on_conflict_do_nothing(
+                constraint="uq_reader_entries_feed_url"
+            ).returning(Entry.id)
+            result = db.execute(stmt)
+            row = result.fetchone()
+            if row:
+                new_entry_ids.append(row[0])
+                new_count += 1
+
+        db.commit()
+        log.info("Feed %d: %d new entries from %d total", feed.id, new_count, len(parsed.entries))
+
+        # Fetch full content for new entries
+        if new_entry_ids:
+            _fetch_full_content_for_entries(db, new_entry_ids)
+
+
+def _fetch_full_content_for_entries(db, entry_ids: list[int]):
+    """Fetch full article content for specific entries."""
+    from app.config import CRAWLER_URL
+
+    entries = db.execute(
+        select(Entry).where(Entry.id.in_(entry_ids))
+    ).scalars().all()
+
+    log.info("Fetching full content for %d new entries", len(entries))
+    for entry in entries:
+        if not entry.url:
+            continue
+        try:
+            resp = httpx.post(
+                f"{CRAWLER_URL}/crawl",
+                json={"url": entry.url},
+                timeout=45,
+            )
+            if resp.status_code == 200:
+                data = resp.json()
+                readable = data.get("readable_html", "")
+                full_text = data.get("text", "")
+                if readable:
+                    entry.full_content = readable
+                    if full_text:
+                        entry.reading_time = max(1, len(full_text.split()) // 200)
+                elif full_text and len(full_text) > len(_strip_html(entry.content or "")):
+                    paragraphs = [p.strip() for p in full_text.split("\n\n") if p.strip()]
+                    if not paragraphs:
+                        paragraphs = [p.strip() for p in full_text.split("\n") if p.strip()]
+                    entry.full_content = "\n".join(f"<p>{p}</p>
" for p in paragraphs) + entry.reading_time = max(1, len(full_text.split()) // 200) + else: + entry.full_content = entry.content or "" + else: + entry.full_content = entry.content or "" + except Exception as e: + log.warning("Full content fetch failed for entry %d: %s", entry.id, e) + entry.full_content = entry.content or "" + + db.commit() + log.info("Full content done for %d entries", len(entries)) + + +def fetch_full_content_batch(): + """Fetch full article content for entries that only have RSS summaries.""" + from app.config import CRAWLER_URL + + with SyncSession() as db: + # Find entries with short content and no full_content (limit batch size) + entries = db.execute( + select(Entry).where( + Entry.full_content.is_(None), + Entry.url.isnot(None), + Entry.status == "unread", + ).order_by(Entry.published_at.desc()).limit(20) + ).scalars().all() + + if not entries: + return + + log.info("Fetching full content for %d entries", len(entries)) + for entry in entries: + try: + resp = httpx.post( + f"{CRAWLER_URL}/crawl", + json={"url": entry.url}, + timeout=45, + ) + if resp.status_code == 200: + data = resp.json() + readable = data.get("readable_html", "") + full_text = data.get("text", "") + if readable: + entry.full_content = readable + if full_text: + entry.reading_time = max(1, len(full_text.split()) // 200) + elif full_text and len(full_text) > len(_strip_html(entry.content or "")): + paragraphs = [p.strip() for p in full_text.split("\n\n") if p.strip()] + if not paragraphs: + paragraphs = [p.strip() for p in full_text.split("\n") if p.strip()] + entry.full_content = "\n".join(f"{p}
" for p in paragraphs) + entry.reading_time = max(1, len(full_text.split()) // 200) + else: + entry.full_content = entry.content or "" + else: + entry.full_content = entry.content or "" + except Exception as e: + log.warning("Full content fetch failed for entry %d: %s", entry.id, e) + entry.full_content = entry.content or "" + + db.commit() + log.info("Full content fetched for %d entries", len(entries)) + + +def fetch_all_feeds(): + """Fetch all feeds — called on schedule.""" + with SyncSession() as db: + feeds = db.execute(select(Feed)).scalars().all() + + log.info("Scheduling fetch for %d feeds", len(feeds)) + for feed in feeds: + try: + fetch_single_feed(feed.id) + except Exception: + log.exception("Error fetching feed %d", feed.id) + + # Full content is now fetched inline for each new entry + + +def cleanup_old_entries(): + """Delete old entries: read > 30 days, unread > 60 days.""" + from sqlalchemy import delete as sa_delete + + with SyncSession() as db: + now = datetime.utcnow() + thirty_days = now - __import__('datetime').timedelta(days=30) + sixty_days = now - __import__('datetime').timedelta(days=60) + + # Read entries older than 30 days + result1 = db.execute( + sa_delete(Entry).where( + Entry.status == "read", + Entry.created_at < thirty_days, + ) + ) + + # Unread entries older than 60 days + result2 = db.execute( + sa_delete(Entry).where( + Entry.status == "unread", + Entry.created_at < sixty_days, + ) + ) + + db.commit() + total = (result1.rowcount or 0) + (result2.rowcount or 0) + if total > 0: + log.info("Cleanup: deleted %d old entries (%d read, %d unread)", + total, result1.rowcount or 0, result2.rowcount or 0) + + +def run_scheduler(): + """Simple loop that runs fetch_all_feeds every FEED_FETCH_INTERVAL seconds.""" + log.info("Reader scheduler started — interval: %ds", FEED_FETCH_INTERVAL) + + # Create tables on first run (for the sync engine) + from app.database import Base + from app.models import Category, Feed, Entry # noqa: register models + Base.metadata.create_all(_engine) + + cycles = 0 + while True: + try: + fetch_all_feeds() + except Exception: + log.exception("Scheduler error in fetch_all_feeds") + + # Run cleanup once per day (every ~144 cycles at 10min interval) + cycles += 1 + if cycles % 144 == 0: + try: + cleanup_old_entries() + except Exception: + log.exception("Scheduler error in cleanup") + + time.sleep(FEED_FETCH_INTERVAL) + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", + ) + run_scheduler() diff --git a/services/reader/docker-compose.yml b/services/reader/docker-compose.yml new file mode 100644 index 0000000..b92db40 --- /dev/null +++ b/services/reader/docker-compose.yml @@ -0,0 +1,43 @@ +services: + # ── API ── + reader-api: + build: + context: . + dockerfile: Dockerfile.api + container_name: reader-api + restart: unless-stopped + environment: + - DATABASE_URL=postgresql+asyncpg://brain:brain@brain-db:5432/brain + - REDIS_URL=redis://brain-redis:6379/0 + - CRAWLER_URL=http://brain-crawler:3100 + - PORT=8300 + - DEBUG=${DEBUG:-0} + - TZ=${TZ:-America/Chicago} + networks: + - default + - pangolin + - brain + + # ── Worker (feed fetcher + scheduler) ── + reader-worker: + build: + context: . 
+ dockerfile: Dockerfile.worker + container_name: reader-worker + restart: unless-stopped + environment: + - DATABASE_URL=postgresql+asyncpg://brain:brain@brain-db:5432/brain + - REDIS_URL=redis://brain-redis:6379/0 + - CRAWLER_URL=http://brain-crawler:3100 + - FEED_FETCH_INTERVAL=600 + - TZ=${TZ:-America/Chicago} + networks: + - default + - brain + +networks: + pangolin: + external: true + brain: + name: brain_default + external: true diff --git a/services/reader/requirements.txt b/services/reader/requirements.txt new file mode 100644 index 0000000..d9ed724 --- /dev/null +++ b/services/reader/requirements.txt @@ -0,0 +1,11 @@ +fastapi==0.115.0 +uvicorn[standard]==0.32.0 +sqlalchemy[asyncio]==2.0.35 +asyncpg==0.30.0 +psycopg2-binary==2.9.10 +pydantic==2.10.0 +httpx==0.28.0 +feedparser==6.0.11 +redis==5.2.0 +rq==2.1.0 +python-dateutil==2.9.0
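
For reference, a minimal smoke-test sketch against the new Reader endpoints, assuming the API is reachable on port 8300 (the PORT default in config.py). The X-User-Id header is a placeholder: get_user_id comes from app/api/deps.py, which is not part of this diff, so substitute whatever auth that dependency actually expects.

```python
# Smoke-test sketch for the Reader API. Assumptions: service on localhost:8300
# (PORT default in config.py); X-User-Id is a hypothetical auth header, since
# the real get_user_id dependency lives in app/api/deps.py, outside this diff.
import httpx

with httpx.Client(
    base_url="http://localhost:8300",
    headers={"X-User-Id": "demo-user"},  # hypothetical; match deps.get_user_id
    timeout=30,
) as client:
    print(client.get("/api/health").json())  # {"status": "ok"}

    # create_feed accepts a plain page URL and discovers the feed link itself
    # (see _discover_feed_url in feeds.py).
    resp = client.post("/api/feeds", json={"feed_url": "https://example.com/blog"})
    if resp.status_code == 201:
        feed = resp.json()
        # Trigger an immediate fetch instead of waiting for the scheduler.
        client.post(f"/api/feeds/{feed['id']}/refresh")

    # Per-feed unread counts, keyed by feed id as a string.
    print(client.get("/api/feeds/counters").json())
```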
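Polling stays cheap because fetch_single_feed does conditional GETs: it stores each response's ETag and Last-Modified on the feed row, replays them as If-None-Match and If-Modified-Since, and treats a 304 as "nothing new". A standalone sketch of that round-trip (the feed URL is a placeholder):

```python
# Conditional-GET round-trip, mirroring fetch_single_feed's caching headers.
# The URL is a placeholder; any server that honors ETag/Last-Modified works.
import httpx

URL = "https://example.com/feed.xml"
UA = {"User-Agent": "Reader/1.0"}

first = httpx.get(URL, headers=UA, follow_redirects=True, timeout=30)
first.raise_for_status()

# Replay the validators we got back, exactly as the worker does.
headers = dict(UA)
if first.headers.get("ETag"):
    headers["If-None-Match"] = first.headers["ETag"]
if first.headers.get("Last-Modified"):
    headers["If-Modified-Since"] = first.headers["Last-Modified"]

second = httpx.get(URL, headers=headers, follow_redirects=True, timeout=30)
if second.status_code == 304:
    print("unchanged: skip parsing, just bump last_fetched_at")
else:
    print("changed: parse and upsert entries")
```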
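The plain-text fallback (split crawler text into paragraphs, wrap each in <p> tags, recompute reading time at 200 words per minute) now appears in three places: the entries route, _fetch_full_content_for_entries, and fetch_full_content_batch. A shared helper would keep them in sync; this is a sketch, not part of the diff:

```python
# Possible shared helper for the paragraph-wrapping fallback duplicated in
# entries.py and worker/tasks.py. Sketch only; not included in this diff.
def wrap_plain_text(full_text: str) -> str:
    """Wrap crawler plain text in <p> tags, one per paragraph."""
    paragraphs = [p.strip() for p in full_text.split("\n\n") if p.strip()]
    if not paragraphs:
        # Some pages come back with single newlines only.
        paragraphs = [p.strip() for p in full_text.split("\n") if p.strip()]
    return "\n".join(f"<p>{p}</p>" for p in paragraphs)


def reading_minutes(plain_text: str, wpm: int = 200) -> int:
    """Reading-time estimate matching _calc_reading_time's formula."""
    return max(1, len(plain_text.split()) // wpm)
```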