platform/services/brain/app/worker/tasks.py
Yusuf Suleman 68a8d4c228 feat: brain taxonomy — DB-backed folders/tags, sidebar, CRUD API
Backend:
- New Folder/Tag/ItemTag models with proper relational tables (rough shape sketched after this list)
- Taxonomy CRUD endpoints: list, create, rename, delete, merge tags
- Sidebar endpoint with folder/tag counts
- AI classification reads live folders/tags from DB, not hardcoded
- Default folders/tags seeded on first request per user
- folder_id FK on items for relational integrity
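
A minimal sketch of the model shapes, inferred from how the worker code below consumes them (the real definitions live in app/models/taxonomy.py; the Base import path and column types are assumptions):

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from app.db import Base  # declarative base; import path assumed

class Folder(Base):
    __tablename__ = "folders"
    id = Column(String, primary_key=True)
    user_id = Column(String, index=True, nullable=False)
    name = Column(String, nullable=False)
    sort_order = Column(Integer, default=0)
    is_active = Column(Boolean, default=True)

class Tag(Base):
    __tablename__ = "tags"
    id = Column(String, primary_key=True)
    user_id = Column(String, index=True, nullable=False)
    name = Column(String, nullable=False)
    sort_order = Column(Integer, default=0)
    is_active = Column(Boolean, default=True)

class ItemTag(Base):
    __tablename__ = "item_tags"
    item_id = Column(String, ForeignKey("items.id"), primary_key=True)
    tag_id = Column(String, ForeignKey("tags.id"), primary_key=True)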

Frontend:
- Left sidebar with Folders/Tags tabs (like Karakeep)
- Click folder/tag to filter items
- "Manage" mode: add new folders/tags, delete existing
- Counts next to each folder/tag (payload shape sketched after this list)
- "All items" option to clear filter
- Replaces the old signal-strip cards
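
A plausible shape for the sidebar payload the new UI renders, assuming simple per-entry counts (field names are illustrative; only the folder/tag/count structure comes from the changes above):

from typing import TypedDict

class SidebarEntry(TypedDict):
    id: str
    name: str
    count: int  # items in this folder / tagged with this tag

class SidebarPayload(TypedDict):
    folders: list[SidebarEntry]
    tags: list[SidebarEntry]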

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 20:23:45 -05:00


"""Background worker tasks — processes items after creation."""
import asyncio
import logging
import uuid
from datetime import datetime
from redis import Redis
from rq import Queue
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from app.config import REDIS_URL, DATABASE_URL_SYNC
from app.models.item import Item, ItemAsset
log = logging.getLogger(__name__)
# RQ queue
_redis = Redis.from_url(REDIS_URL)
queue = Queue("brain", connection=_redis)
def enqueue_process_item(item_id: str):
"""Enqueue a background job to process an item."""
queue.enqueue(process_item_job, item_id, job_timeout=300)
def process_item_job(item_id: str):
"""Synchronous entry point for RQ — runs the async pipeline."""
asyncio.run(_process_item(item_id))
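
# NOTE: jobs on the "brain" queue are consumed by a separate RQ worker
# process, typically started with the stock RQ CLI, e.g.:
#   rq worker brain --url "$REDIS_URL"
# The exact invocation depends on deployment and is an assumption here.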

async def _process_item(item_id: str):
    """Full processing pipeline for a saved item."""
    from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
    from app.config import DATABASE_URL
    from app.services.ingest import fetch_url_content, take_screenshot, archive_html
    from app.services.classify import classify_item
    from app.services.embed import generate_embedding
    from app.search.engine import index_item, ensure_meili_index

    engine = create_async_engine(DATABASE_URL, echo=False)
    Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
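    # NOTE: a fresh engine per job is deliberate: RQ (by default) forks a new
    # process for each job, and asyncio event loops / DB connections must not
    # be shared across forks. (Rationale inferred, not stated by the author.)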
    async with Session() as db:
        # Load item
        result = await db.execute(
            select(Item).options(selectinload(Item.assets)).where(Item.id == item_id)
        )
        item = result.scalar_one_or_none()
        if not item:
            log.error(f"Item {item_id} not found")
            return
        try:
            item.processing_status = "processing"
            await db.commit()

            extracted_text = item.raw_content or ""
            title = item.title
            html_content = None

            # ── Step 1: Fetch content for URLs ──
            if item.type == "link" and item.url:
                log.info(f"Fetching URL: {item.url}")
                content = await fetch_url_content(item.url)
                html_content = content.get("html")
                extracted_text = content.get("text") or extracted_text
                if not title:
                    title = content.get("title")
                item.metadata_json = item.metadata_json or {}
                item.metadata_json["description"] = content.get("description")
                item.metadata_json["used_browserless"] = content.get("used_browserless", False)

                # Take screenshot
                screenshot_path = await take_screenshot(item.url, item.id)
                if screenshot_path:
                    asset = ItemAsset(
                        id=str(uuid.uuid4()),
                        item_id=item.id,
                        asset_type="screenshot",
                        filename="screenshot.png",
                        content_type="image/png",
                        storage_path=screenshot_path,
                    )
                    db.add(asset)

                # Archive HTML
                if html_content:
                    html_path = await archive_html(html_content, item.id)
                    if html_path:
                        asset = ItemAsset(
                            id=str(uuid.uuid4()),
                            item_id=item.id,
                            asset_type="archived_html",
                            filename="page.html",
                            content_type="text/html",
                            storage_path=html_path,
                        )
                        db.add(asset)
            # ── Step 1b: Process uploaded files (PDF, image, document) ──
            if item.type in ("pdf", "image", "document", "file"):
                from app.services.extract import extract_text_from_file, describe_image_with_vision
                from app.services.storage import storage as file_storage

                # Find the original upload asset
                upload_asset = None
                for a in item.assets:
                    if a.asset_type == "original_upload":
                        upload_asset = a
                        break

                if upload_asset and file_storage.exists(upload_asset.storage_path):
                    log.info(f"Extracting text from {item.type}: {upload_asset.filename}")
                    file_bytes = file_storage.read(upload_asset.storage_path)
                    result = extract_text_from_file(
                        file_bytes,
                        upload_asset.content_type or "application/octet-stream",
                        upload_asset.filename,
                    )
                    if result["text"]:
                        extracted_text = result["text"]
                        log.info(f"Extracted {len(extracted_text)} chars via {result['method']}")

                    # Save PDF screenshot as an asset
                    if result.get("screenshot_png"):
                        path = file_storage.save(
                            item_id=item.id,
                            asset_type="screenshot",
                            filename="screenshot.png",
                            data=result["screenshot_png"],
                        )
                        asset = ItemAsset(
                            id=str(uuid.uuid4()),
                            item_id=item.id,
                            asset_type="screenshot",
                            filename="screenshot.png",
                            content_type="image/png",
                            storage_path=path,
                        )
                        db.add(asset)

                    # For images with little OCR text, try vision API for description
                    if item.type == "image" and len(extracted_text) < 50:
                        log.info("Image has little OCR text, trying vision API...")
                        vision_text = await describe_image_with_vision(
                            file_bytes,
                            upload_asset.content_type or "image/png",
                        )
                        if vision_text:
                            extracted_text = vision_text
                            log.info(f"Vision API returned {len(vision_text)} chars")

                    item.metadata_json = item.metadata_json or {}
                    item.metadata_json["extraction_method"] = result["method"]
                    if result.get("page_count"):
                        item.metadata_json["page_count"] = result["page_count"]
            # ── Step 2: Fetch live taxonomy from DB ──
            from app.models.taxonomy import Folder as FolderModel, Tag as TagModel, ItemTag, ensure_user_taxonomy
            await ensure_user_taxonomy(db, item.user_id)
            active_folders = (await db.execute(
                select(FolderModel)
                .where(FolderModel.user_id == item.user_id, FolderModel.is_active == True)
                .order_by(FolderModel.sort_order)
            )).scalars().all()
            active_tags = (await db.execute(
                select(TagModel)
                .where(TagModel.user_id == item.user_id, TagModel.is_active == True)
                .order_by(TagModel.sort_order)
            )).scalars().all()
            folder_names = [f.name for f in active_folders]
            tag_names = [t.name for t in active_tags]
            folder_map = {f.name: f for f in active_folders}
            tag_map = {t.name: t for t in active_tags}
            # ── Step 3: AI classification ──
            log.info(f"Classifying item {item.id} with {len(folder_names)} folders, {len(tag_names)} tags")
            classification = await classify_item(
                item_type=item.type,
                url=item.url,
                title=title,
                text=extracted_text,
                folders=folder_names,
                tags=tag_names,
            )
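            # classify_item is expected to return a dict roughly shaped like
            # (inferred from the keys consumed below; not a documented contract):
            #   {"title": str | None, "folder": str, "tags": [str, ...],
            #    "summary": str | None, "corrected_text": str, "confidence": float}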
            item.title = classification.get("title") or title or "Untitled"

            # Set folder (relational + denormalized)
            classified_folder = classification.get("folder", folder_names[0] if folder_names else "Knowledge")
            item.folder = classified_folder
            if classified_folder in folder_map:
                item.folder_id = folder_map[classified_folder].id

            # Set tags (relational + denormalized)
            classified_tags = classification.get("tags", [])
            item.tags = classified_tags

            # Clear old item_tags and create new ones
            from sqlalchemy import delete as sa_delete
            await db.execute(sa_delete(ItemTag).where(ItemTag.item_id == item.id))
            for tag_name in classified_tags:
                if tag_name in tag_map:
                    db.add(ItemTag(item_id=item.id, tag_id=tag_map[tag_name].id))

            # For notes: replace raw_content with spell-corrected version
            corrected = classification.get("corrected_text", "")
            if item.type == "note" and corrected and corrected.strip():
                item.raw_content = corrected

            item.summary = classification.get("summary")
            item.confidence = classification.get("confidence", 0.0)
            item.extracted_text = extracted_text
            # ── Step 4: Generate embedding ──
            log.info(f"Generating embedding for item {item.id}")
            embed_text = f"{item.title or ''}\n{item.summary or ''}\n{extracted_text}"
            embedding = await generate_embedding(embed_text)
            if embedding:
                item.embedding = embedding

            # ── Step 5: Update status ──
            item.processing_status = "ready"
            item.updated_at = datetime.utcnow()
            await db.commit()

            # ── Step 6: Index in Meilisearch ──
            log.info(f"Indexing item {item.id} in Meilisearch")
            await ensure_meili_index()
            await index_item({
                "id": item.id,
                "user_id": item.user_id,
                "type": item.type,
                "title": item.title,
                "url": item.url,
                "folder": item.folder,
                "tags": item.tags or [],
                "summary": item.summary,
                "extracted_text": (extracted_text or "")[:10000],  # Truncate for search index
                "processing_status": item.processing_status,
                "created_at": item.created_at.isoformat() if item.created_at else None,
            })
            log.info(f"Item {item.id} processed successfully")
        except Exception as e:
            log.error(f"Processing failed for item {item.id}: {e}", exc_info=True)
            await db.rollback()  # discard any half-applied changes before recording failure
            item.processing_status = "failed"
            item.processing_error = str(e)[:500]
            item.updated_at = datetime.utcnow()
            await db.commit()
    await engine.dispose()
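
For context, a minimal sketch of the producing side, assuming a FastAPI route (everything here except enqueue_process_item and the Item model is illustrative; get_db stands in for the app's real session dependency):

import uuid
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.item import Item
from app.worker.tasks import enqueue_process_item

router = APIRouter()

@router.post("/items")
async def create_item(url: str, db: AsyncSession = Depends(get_db)):  # get_db assumed
    # Persist first so the worker's SELECT can find the row, then enqueue.
    item = Item(id=str(uuid.uuid4()), type="link", url=url, processing_status="pending")
    db.add(item)
    await db.commit()
    enqueue_process_item(item.id)
    return {"id": item.id, "processing_status": item.processing_status}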