Full backend service with:
- FastAPI REST API with CRUD, search, and reprocess endpoints
- PostgreSQL + pgvector for items and semantic search
- Redis + RQ for background job processing
- Meilisearch for fast keyword/filter search
- Browserless/Chrome for JS rendering and screenshots
- OpenAI structured output for AI classification
- Local file storage with an S3-ready abstraction
- Gateway auth via the X-Gateway-User-Id header
- Its own docker-compose stack (6 containers)

Classification: fixed folders (Home/Family/Work/Travel/Knowledge/Faith/Projects) and 28 predefined tags. The AI assigns exactly 1 folder, 2-3 tags, a title, a summary, and a confidence score per item.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

157 lines · 5.9 KiB · Python
"""Background worker tasks — processes items after creation."""
|
|
|
|
import asyncio
|
|
import logging
|
|
import uuid
|
|
from datetime import datetime
|
|
|
|
from redis import Redis
|
|
from rq import Queue
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from app.config import REDIS_URL, DATABASE_URL_SYNC
|
|
from app.models.item import Item, ItemAsset
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# RQ queue
|
|
_redis = Redis.from_url(REDIS_URL)
|
|
queue = Queue("brain", connection=_redis)
|
|
|
|
|
|
def enqueue_process_item(item_id: str):
|
|
"""Enqueue a background job to process an item."""
|
|
queue.enqueue(process_item_job, item_id, job_timeout=300)
|
|
|
|
|
|
def process_item_job(item_id: str):
|
|
"""Synchronous entry point for RQ — runs the async pipeline."""
|
|
asyncio.run(_process_item(item_id))
|
|
|
|
|
|
async def _process_item(item_id: str):
|
|
"""Full processing pipeline for a saved item."""
|
|
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
|
|
from app.config import DATABASE_URL
|
|
from app.services.ingest import fetch_url_content, take_screenshot, archive_html
|
|
from app.services.classify import classify_item
|
|
from app.services.embed import generate_embedding
|
|
from app.search.engine import index_item, ensure_meili_index
|
|
|
|
engine = create_async_engine(DATABASE_URL, echo=False)
|
|
Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
|
|
|
|
async with Session() as db:
|
|
# Load item
|
|
result = await db.execute(
|
|
select(Item).options(selectinload(Item.assets)).where(Item.id == item_id)
|
|
)
|
|
item = result.scalar_one_or_none()
|
|
if not item:
|
|
log.error(f"Item {item_id} not found")
|
|
return
|
|
|
|
try:
|
|
item.processing_status = "processing"
|
|
await db.commit()
|
|
|
|
extracted_text = item.raw_content or ""
|
|
title = item.title
|
|
html_content = None
|
|
|
|
# ── Step 1: Fetch content for URLs ──
|
|
if item.type == "link" and item.url:
|
|
log.info(f"Fetching URL: {item.url}")
|
|
content = await fetch_url_content(item.url)
|
|
html_content = content.get("html")
|
|
extracted_text = content.get("text") or extracted_text
|
|
if not title:
|
|
title = content.get("title")
|
|
item.metadata_json = item.metadata_json or {}
|
|
item.metadata_json["description"] = content.get("description")
|
|
item.metadata_json["used_browserless"] = content.get("used_browserless", False)
|
|
|
|
# Take screenshot
|
|
screenshot_path = await take_screenshot(item.url, item.id)
|
|
if screenshot_path:
|
|
asset = ItemAsset(
|
|
id=str(uuid.uuid4()),
|
|
item_id=item.id,
|
|
asset_type="screenshot",
|
|
filename="screenshot.png",
|
|
content_type="image/png",
|
|
storage_path=screenshot_path,
|
|
)
|
|
db.add(asset)
|
|
|
|
# Archive HTML
|
|
if html_content:
|
|
html_path = await archive_html(html_content, item.id)
|
|
if html_path:
|
|
asset = ItemAsset(
|
|
id=str(uuid.uuid4()),
|
|
item_id=item.id,
|
|
asset_type="archived_html",
|
|
filename="page.html",
|
|
content_type="text/html",
|
|
storage_path=html_path,
|
|
)
|
|
db.add(asset)
|
|
|
|
# ── Step 2: AI classification ──
|
|
log.info(f"Classifying item {item.id}")
|
|
classification = await classify_item(
|
|
item_type=item.type,
|
|
url=item.url,
|
|
title=title,
|
|
text=extracted_text,
|
|
)
|
|
|
|
item.title = classification.get("title") or title or "Untitled"
|
|
item.folder = classification.get("folder", "Knowledge")
|
|
item.tags = classification.get("tags", ["reference", "read-later"])
|
|
item.summary = classification.get("summary")
|
|
item.confidence = classification.get("confidence", 0.0)
|
|
item.extracted_text = extracted_text
|
|
|
|
# ── Step 3: Generate embedding ──
|
|
log.info(f"Generating embedding for item {item.id}")
|
|
embed_text = f"{item.title or ''}\n{item.summary or ''}\n{extracted_text}"
|
|
embedding = await generate_embedding(embed_text)
|
|
if embedding:
|
|
item.embedding = embedding
|
|
|
|
# ── Step 4: Update status ──
|
|
item.processing_status = "ready"
|
|
item.updated_at = datetime.utcnow()
|
|
await db.commit()
|
|
|
|
# ── Step 5: Index in Meilisearch ──
|
|
log.info(f"Indexing item {item.id} in Meilisearch")
|
|
await ensure_meili_index()
|
|
await index_item({
|
|
"id": item.id,
|
|
"user_id": item.user_id,
|
|
"type": item.type,
|
|
"title": item.title,
|
|
"url": item.url,
|
|
"folder": item.folder,
|
|
"tags": item.tags or [],
|
|
"summary": item.summary,
|
|
"extracted_text": (extracted_text or "")[:10000], # Truncate for search index
|
|
"processing_status": item.processing_status,
|
|
"created_at": item.created_at.isoformat() if item.created_at else None,
|
|
})
|
|
|
|
log.info(f"Item {item.id} processed successfully")
|
|
|
|
except Exception as e:
|
|
log.error(f"Processing failed for item {item.id}: {e}", exc_info=True)
|
|
item.processing_status = "failed"
|
|
item.processing_error = str(e)[:500]
|
|
item.updated_at = datetime.utcnow()
|
|
await db.commit()
|
|
|
|
await engine.dispose()
|