"""Background worker tasks — processes items after creation.""" import asyncio import logging import uuid from datetime import datetime from redis import Redis from rq import Queue from sqlalchemy import select from sqlalchemy.orm import selectinload from app.config import REDIS_URL, DATABASE_URL_SYNC from app.models.item import Item, ItemAsset from app.models.taxonomy import Folder, Tag, ItemTag # noqa: F401 — register FK targets log = logging.getLogger(__name__) # RQ queue _redis = Redis.from_url(REDIS_URL) queue = Queue("brain", connection=_redis) def enqueue_process_item(item_id: str): """Enqueue a background job to process an item.""" queue.enqueue(process_item_job, item_id, job_timeout=300) def process_item_job(item_id: str): """Synchronous entry point for RQ — runs the async pipeline.""" asyncio.run(_process_item(item_id)) async def _process_item(item_id: str): """Full processing pipeline for a saved item.""" from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker from app.config import DATABASE_URL from app.services.ingest import crawl_url, save_screenshot_from_base64, download_og_image, archive_html from app.services.classify import classify_item from app.services.embed import generate_embedding from app.search.engine import index_item, ensure_meili_index engine = create_async_engine(DATABASE_URL, echo=False) Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) async with Session() as db: # Load item result = await db.execute( select(Item).options(selectinload(Item.assets)).where(Item.id == item_id) ) item = result.scalar_one_or_none() if not item: log.error(f"Item {item_id} not found") return try: item.processing_status = "processing" await db.commit() extracted_text = item.raw_content or "" title = item.title html_content = None # ── Step 1: Fetch content for URLs ── if item.type == "link" and item.url: from app.services.ingest import ( _is_youtube_url, download_youtube_thumbnail, download_youtube_video, fetch_youtube_metadata, ) item.metadata_json = item.metadata_json or {} is_yt = _is_youtube_url(item.url) if is_yt: # YouTube: use oEmbed + thumbnail + yt-dlp (no crawler needed) log.info(f"Processing YouTube URL: {item.url}") yt_meta = await fetch_youtube_metadata(item.url) if yt_meta: if not title: title = yt_meta.get("title") extracted_text = f"YouTube: {yt_meta.get('title','')}\nBy: {yt_meta.get('author','')}" item.metadata_json["youtube"] = { "video_id": yt_meta.get("video_id"), "author": yt_meta.get("author"), "is_short": yt_meta.get("is_short", False), } item.metadata_json["description"] = f"YouTube video by {yt_meta.get('author','')}" # Download video log.info(f"Downloading YouTube video: {item.url}") video_path, yt_info = await download_youtube_video(item.url, item.id) if video_path: db.add(ItemAsset( id=str(uuid.uuid4()), item_id=item.id, asset_type="video", filename=f"{yt_meta['video_id']}.mp4", content_type="video/mp4", storage_path=video_path, )) if yt_info.get("duration"): item.metadata_json["youtube"]["duration"] = yt_info["duration"] if yt_info.get("description"): item.metadata_json["youtube"]["description"] = yt_info["description"][:500] extracted_text = f"YouTube: {title or ''}\nBy: {(yt_meta or {}).get('author','')}\n{yt_info['description'][:2000]}" # Thumbnail thumb_path = await download_youtube_thumbnail(item.url, item.id) if thumb_path: db.add(ItemAsset( id=str(uuid.uuid4()), item_id=item.id, asset_type="screenshot", filename="thumbnail.jpg", content_type="image/jpeg", storage_path=thumb_path, )) else: # Regular URL: use Playwright crawler (stealth) log.info(f"Crawling URL: {item.url}") crawl = await crawl_url(item.url) html_content = crawl.get("html") extracted_text = crawl.get("text") or extracted_text if not title: title = crawl.get("title") item.metadata_json["description"] = crawl.get("description") item.metadata_json["author"] = crawl.get("author") item.metadata_json["status_code"] = crawl.get("status_code") # Screenshot (from crawler, base64 JPEG) if crawl.get("screenshot"): ss_path = await save_screenshot_from_base64(crawl["screenshot"], item.id) if ss_path: db.add(ItemAsset( id=str(uuid.uuid4()), item_id=item.id, asset_type="screenshot", filename="screenshot.jpg", content_type="image/jpeg", storage_path=ss_path, )) # og:image (extracted from rendered DOM by crawler) og_url = crawl.get("og_image_url") if og_url: og_path = await download_og_image(og_url, item.id) if og_path: db.add(ItemAsset( id=str(uuid.uuid4()), item_id=item.id, asset_type="og_image", filename="og_image.jpg", content_type="image/jpeg", storage_path=og_path, )) item.metadata_json["og_image_url"] = og_url # Archive HTML if html_content: html_path = await archive_html(html_content, item.id) if html_path: db.add(ItemAsset( id=str(uuid.uuid4()), item_id=item.id, asset_type="archived_html", filename="page.html", content_type="text/html", storage_path=html_path, )) # ── Step 1b: Process uploaded files (PDF, image, document) ── if item.type in ("pdf", "image", "document", "file"): from app.services.extract import extract_text_from_file, describe_image_with_vision from app.services.storage import storage as file_storage # Find the original upload asset upload_asset = None for a in item.assets: if a.asset_type == "original_upload": upload_asset = a break if upload_asset and file_storage.exists(upload_asset.storage_path): log.info(f"Extracting text from {item.type}: {upload_asset.filename}") file_bytes = file_storage.read(upload_asset.storage_path) result = extract_text_from_file( file_bytes, upload_asset.content_type or "application/octet-stream", upload_asset.filename, ) if result["text"]: extracted_text = result["text"] log.info(f"Extracted {len(extracted_text)} chars via {result['method']}") # Save PDF screenshot as an asset if result.get("screenshot_png"): from app.services.storage import storage path = storage.save( item_id=item.id, asset_type="screenshot", filename="screenshot.png", data=result["screenshot_png"], ) asset = ItemAsset( id=str(uuid.uuid4()), item_id=item.id, asset_type="screenshot", filename="screenshot.png", content_type="image/png", storage_path=path, ) db.add(asset) # For images with little OCR text, try vision API for description if item.type == "image" and len(extracted_text) < 50: log.info("Image has little OCR text, trying vision API...") vision_text = await describe_image_with_vision( file_bytes, upload_asset.content_type or "image/png", ) if vision_text: extracted_text = vision_text log.info(f"Vision API returned {len(vision_text)} chars") item.metadata_json = item.metadata_json or {} item.metadata_json["extraction_method"] = result["method"] if result.get("page_count"): item.metadata_json["page_count"] = result["page_count"] # ── Step 2: Fetch live taxonomy from DB ── from app.models.taxonomy import Folder as FolderModel, Tag as TagModel, ItemTag, ensure_user_taxonomy await ensure_user_taxonomy(db, item.user_id) active_folders = (await db.execute( select(FolderModel).where(FolderModel.user_id == item.user_id, FolderModel.is_active == True) .order_by(FolderModel.sort_order) )).scalars().all() active_tags = (await db.execute( select(TagModel).where(TagModel.user_id == item.user_id, TagModel.is_active == True) .order_by(TagModel.sort_order) )).scalars().all() folder_names = [f.name for f in active_folders] tag_names = [t.name for t in active_tags] folder_map = {f.name: f for f in active_folders} tag_map = {t.name: t for t in active_tags} # ── Step 3: AI classification ── log.info(f"Classifying item {item.id} with {len(folder_names)} folders, {len(tag_names)} tags") classification = await classify_item( item_type=item.type, url=item.url, title=title, text=extracted_text, folders=folder_names, tags=tag_names, ) item.title = classification.get("title") or title or "Untitled" # Set folder (relational + denormalized) classified_folder = classification.get("folder", folder_names[0] if folder_names else "Knowledge") item.folder = classified_folder if classified_folder in folder_map: item.folder_id = folder_map[classified_folder].id # Set tags (relational + denormalized) classified_tags = classification.get("tags", []) item.tags = classified_tags # Clear old item_tags and create new ones from sqlalchemy import delete as sa_delete await db.execute(sa_delete(ItemTag).where(ItemTag.item_id == item.id)) for tag_name in classified_tags: if tag_name in tag_map: db.add(ItemTag(item_id=item.id, tag_id=tag_map[tag_name].id)) # For notes: replace raw_content with spell-corrected version corrected = classification.get("corrected_text", "") if item.type == "note" and corrected and corrected.strip(): item.raw_content = corrected item.summary = classification.get("summary") item.confidence = classification.get("confidence", 0.0) item.extracted_text = extracted_text # ── Step 3: Generate embedding ── log.info(f"Generating embedding for item {item.id}") embed_text = f"{item.title or ''}\n{item.summary or ''}\n{extracted_text}" embedding = await generate_embedding(embed_text) if embedding: item.embedding = embedding # ── Step 4: Update status ── item.processing_status = "ready" item.updated_at = datetime.utcnow() await db.commit() # ── Step 5: Index in Meilisearch ── log.info(f"Indexing item {item.id} in Meilisearch") await ensure_meili_index() await index_item({ "id": item.id, "user_id": item.user_id, "type": item.type, "title": item.title, "url": item.url, "folder": item.folder, "tags": item.tags or [], "summary": item.summary, "extracted_text": (extracted_text or "")[:10000], # Truncate for search index "processing_status": item.processing_status, "created_at": item.created_at.isoformat() if item.created_at else None, }) log.info(f"Item {item.id} processed successfully") except Exception as e: log.error(f"Processing failed for item {item.id}: {e}", exc_info=True) item.processing_status = "failed" item.processing_error = str(e)[:500] item.updated_at = datetime.utcnow() await db.commit() await engine.dispose()