"""Background worker tasks — processes items after creation.""" import asyncio import logging import uuid from datetime import datetime from redis import Redis from rq import Queue from sqlalchemy import select from sqlalchemy.orm import selectinload from app.config import REDIS_URL, DATABASE_URL_SYNC from app.models.item import Item, ItemAsset log = logging.getLogger(__name__) # RQ queue _redis = Redis.from_url(REDIS_URL) queue = Queue("brain", connection=_redis) def enqueue_process_item(item_id: str): """Enqueue a background job to process an item.""" queue.enqueue(process_item_job, item_id, job_timeout=300) def process_item_job(item_id: str): """Synchronous entry point for RQ — runs the async pipeline.""" asyncio.run(_process_item(item_id)) async def _process_item(item_id: str): """Full processing pipeline for a saved item.""" from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker from app.config import DATABASE_URL from app.services.ingest import fetch_url_content, take_screenshot, archive_html from app.services.classify import classify_item from app.services.embed import generate_embedding from app.search.engine import index_item, ensure_meili_index engine = create_async_engine(DATABASE_URL, echo=False) Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) async with Session() as db: # Load item result = await db.execute( select(Item).options(selectinload(Item.assets)).where(Item.id == item_id) ) item = result.scalar_one_or_none() if not item: log.error(f"Item {item_id} not found") return try: item.processing_status = "processing" await db.commit() extracted_text = item.raw_content or "" title = item.title html_content = None # ── Step 1: Fetch content for URLs ── if item.type == "link" and item.url: log.info(f"Fetching URL: {item.url}") content = await fetch_url_content(item.url) html_content = content.get("html") extracted_text = content.get("text") or extracted_text if not title: title = content.get("title") item.metadata_json = item.metadata_json or {} item.metadata_json["description"] = content.get("description") item.metadata_json["used_browserless"] = content.get("used_browserless", False) # Take screenshot screenshot_path = await take_screenshot(item.url, item.id) if screenshot_path: asset = ItemAsset( id=str(uuid.uuid4()), item_id=item.id, asset_type="screenshot", filename="screenshot.png", content_type="image/png", storage_path=screenshot_path, ) db.add(asset) # Archive HTML if html_content: html_path = await archive_html(html_content, item.id) if html_path: asset = ItemAsset( id=str(uuid.uuid4()), item_id=item.id, asset_type="archived_html", filename="page.html", content_type="text/html", storage_path=html_path, ) db.add(asset) # ── Step 2: AI classification ── log.info(f"Classifying item {item.id}") classification = await classify_item( item_type=item.type, url=item.url, title=title, text=extracted_text, ) item.title = classification.get("title") or title or "Untitled" item.folder = classification.get("folder", "Knowledge") item.tags = classification.get("tags", ["reference", "read-later"]) item.summary = classification.get("summary") item.confidence = classification.get("confidence", 0.0) item.extracted_text = extracted_text # ── Step 3: Generate embedding ── log.info(f"Generating embedding for item {item.id}") embed_text = f"{item.title or ''}\n{item.summary or ''}\n{extracted_text}" embedding = await generate_embedding(embed_text) if embedding: item.embedding = embedding # ── Step 4: Update status ── 
            item.processing_status = "ready"
            item.updated_at = datetime.utcnow()
            await db.commit()

            # ── Step 5: Index in Meilisearch ──
            log.info(f"Indexing item {item.id} in Meilisearch")
            await ensure_meili_index()
            await index_item({
                "id": item.id,
                "user_id": item.user_id,
                "type": item.type,
                "title": item.title,
                "url": item.url,
                "folder": item.folder,
                "tags": item.tags or [],
                "summary": item.summary,
                "extracted_text": (extracted_text or "")[:10000],  # Truncate for search index
                "processing_status": item.processing_status,
                "created_at": item.created_at.isoformat() if item.created_at else None,
            })

            log.info(f"Item {item.id} processed successfully")

        except Exception as e:
            log.error(f"Processing failed for item {item.id}: {e}", exc_info=True)
            # Roll back first: if the failure came from the DB layer, the
            # session is otherwise unusable for the status update below.
            await db.rollback()
            item.processing_status = "failed"
            item.processing_error = str(e)[:500]
            item.updated_at = datetime.utcnow()
            await db.commit()

    await engine.dispose()
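

# A minimal local-debugging sketch, not part of the production RQ flow:
# run the pipeline for one item synchronously, bypassing the queue. Assumes
# a valid item id and reachable Postgres/Meilisearch; the module path in
# the usage line is hypothetical.
#
#     python -m app.workers.tasks <item_id>
#
if __name__ == "__main__":
    import sys

    if len(sys.argv) == 2:
        asyncio.run(_process_item(sys.argv[1]))
    else:
        sys.exit("usage: python -m app.workers.tasks <item_id>")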