"""Brain API endpoints.""" from __future__ import annotations import uuid from datetime import datetime from typing import Optional from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Query from sqlalchemy import select, func, desc from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.api.deps import get_user_id, get_db_session from app.config import FOLDERS, TAGS from app.models.item import Item, ItemAsset, ItemAddition from app.models.schema import ( ItemCreate, ItemUpdate, ItemOut, ItemList, SearchQuery, SemanticSearchQuery, HybridSearchQuery, SearchResult, ConfigOut, ItemAdditionCreate, ItemAdditionOut, ) from app.services.storage import storage from fastapi.responses import Response from app.worker.tasks import enqueue_process_item router = APIRouter(prefix="/api", tags=["brain"]) async def refresh_item_search_state(db: AsyncSession, item: Item): """Recompute embedding + Meilisearch doc after assistant additions change.""" from app.search.engine import index_item from app.services.embed import generate_embedding additions_result = await db.execute( select(ItemAddition) .where(ItemAddition.item_id == item.id, ItemAddition.user_id == item.user_id) .order_by(ItemAddition.created_at.asc()) ) additions = additions_result.scalars().all() additions_text = "\n\n".join(addition.content for addition in additions if addition.content.strip()) searchable_text_parts = [item.raw_content or "", item.extracted_text or "", additions_text] searchable_text = "\n\n".join(part.strip() for part in searchable_text_parts if part and part.strip()) embed_text = f"{item.title or ''}\n{item.summary or ''}\n{searchable_text}".strip() embedding = await generate_embedding(embed_text) if embedding: item.embedding = embedding item.updated_at = datetime.utcnow() await db.commit() await db.refresh(item) await index_item({ "id": item.id, "user_id": item.user_id, "type": item.type, "title": item.title, "url": item.url, "folder": item.folder, "tags": item.tags or [], "summary": item.summary, "extracted_text": searchable_text[:10000], "processing_status": item.processing_status, "created_at": item.created_at.isoformat() if item.created_at else None, }) # ── Health ── @router.get("/health") async def health(): return {"status": "ok", "service": "brain"} # ── Config ── @router.get("/config", response_model=ConfigOut) async def get_config(): return ConfigOut(folders=FOLDERS, tags=TAGS) # ── Create item ── @router.post("/items", response_model=ItemOut, status_code=201) async def create_item( body: ItemCreate, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db_session), ): item = Item( id=str(uuid.uuid4()), user_id=user_id, type=body.type, url=body.url, raw_content=body.raw_content, title=body.title, folder=body.folder, tags=body.tags or [], processing_status="pending", ) db.add(item) await db.commit() await db.refresh(item, ["assets"]) # Enqueue background processing enqueue_process_item(item.id) return item # ── Upload file ── @router.post("/items/upload", response_model=ItemOut, status_code=201) async def upload_file( file: UploadFile = File(...), title: Optional[str] = Form(None), folder: Optional[str] = Form(None), user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db_session), ): item_id = str(uuid.uuid4()) content_type = file.content_type or "application/octet-stream" # Determine type from content_type if content_type.startswith("image/"): item_type = "image" elif content_type == "application/pdf": item_type = "pdf" else: item_type = "file" # Store the uploaded file data = await file.read() path = storage.save( item_id=item_id, asset_type="original_upload", filename=file.filename or "upload", data=data, ) item = Item( id=item_id, user_id=user_id, type=item_type, title=title or file.filename, folder=folder, processing_status="pending", ) db.add(item) asset = ItemAsset( id=str(uuid.uuid4()), item_id=item_id, asset_type="original_upload", filename=file.filename or "upload", content_type=content_type, size_bytes=len(data), storage_path=path, ) db.add(asset) await db.commit() await db.refresh(item, ["assets"]) enqueue_process_item(item.id) return item # ── Get item ── @router.get("/items/{item_id}", response_model=ItemOut) async def get_item( item_id: str, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db_session), ): result = await db.execute( select(Item).options(selectinload(Item.assets)) .where(Item.id == item_id, Item.user_id == user_id) ) item = result.scalar_one_or_none() if not item: raise HTTPException(status_code=404, detail="Item not found") return item # ── List items ── @router.get("/items", response_model=ItemList) async def list_items( user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db_session), folder: Optional[str] = Query(None), tag: Optional[str] = Query(None), type: Optional[str] = Query(None), status: Optional[str] = Query(None), limit: int = Query(20, le=100), offset: int = Query(0), ): q = select(Item).options(selectinload(Item.assets)).where(Item.user_id == user_id) if folder: q = q.where(Item.folder == folder) if tag: q = q.where(Item.tags.contains([tag])) if type: q = q.where(Item.type == type) if status: q = q.where(Item.processing_status == status) # Count count_q = select(func.count()).select_from(q.subquery()) total = (await db.execute(count_q)).scalar() or 0 # Fetch q = q.order_by(desc(Item.created_at)).offset(offset).limit(limit) result = await db.execute(q) items = result.scalars().all() return ItemList(items=items, total=total) # ── Update item ── @router.patch("/items/{item_id}", response_model=ItemOut) async def update_item( item_id: str, body: ItemUpdate, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db_session), ): result = await db.execute( select(Item).options(selectinload(Item.assets)) .where(Item.id == item_id, Item.user_id == user_id) ) item = result.scalar_one_or_none() if not item: raise HTTPException(status_code=404, detail="Item not found") if body.title is not None: item.title = body.title if body.folder is not None: item.folder = body.folder # Update folder_id FK from app.models.taxonomy import Folder as FolderModel folder_row = (await db.execute( select(FolderModel).where(FolderModel.user_id == user_id, FolderModel.name == body.folder) )).scalar_one_or_none() item.folder_id = folder_row.id if folder_row else None if body.tags is not None: item.tags = body.tags # Update item_tags relational entries from app.models.taxonomy import Tag as TagModel, ItemTag from sqlalchemy import delete as sa_delete await db.execute(sa_delete(ItemTag).where(ItemTag.item_id == item.id)) for tag_name in body.tags: tag_row = (await db.execute( select(TagModel).where(TagModel.user_id == user_id, TagModel.name == tag_name) )).scalar_one_or_none() if tag_row: db.add(ItemTag(item_id=item.id, tag_id=tag_row.id)) if body.raw_content is not None: item.raw_content = body.raw_content item.updated_at = datetime.utcnow() await db.commit() await db.refresh(item) await refresh_item_search_state(db, item) return item # ── Delete item ── @router.delete("/items/{item_id}") async def delete_item( item_id: str, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db_session), ): result = await db.execute( select(Item).where(Item.id == item_id, Item.user_id == user_id) ) item = result.scalar_one_or_none() if not item: raise HTTPException(status_code=404, detail="Item not found") # Delete stored assets for asset in (await db.execute( select(ItemAsset).where(ItemAsset.item_id == item_id) )).scalars().all(): storage.delete(asset.storage_path) await db.delete(item) await db.commit() return {"status": "deleted"} @router.get("/items/{item_id}/additions", response_model=list[ItemAdditionOut]) async def list_item_additions( item_id: str, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db_session), ): item = (await db.execute( select(Item).where(Item.id == item_id, Item.user_id == user_id) )).scalar_one_or_none() if not item: raise HTTPException(status_code=404, detail="Item not found") additions = (await db.execute( select(ItemAddition) .where(ItemAddition.item_id == item_id, ItemAddition.user_id == user_id) .order_by(ItemAddition.created_at.asc()) )).scalars().all() return additions @router.post("/items/{item_id}/additions", response_model=ItemAdditionOut, status_code=201) async def create_item_addition( item_id: str, body: ItemAdditionCreate, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db_session), ): item = (await db.execute( select(Item).where(Item.id == item_id, Item.user_id == user_id) )).scalar_one_or_none() if not item: raise HTTPException(status_code=404, detail="Item not found") content = body.content.strip() if not content: raise HTTPException(status_code=400, detail="Addition content cannot be empty") addition = ItemAddition( id=str(uuid.uuid4()), item_id=item.id, user_id=user_id, source=(body.source or "assistant").strip() or "assistant", kind=(body.kind or "append").strip() or "append", content=content, metadata_json=body.metadata_json or {}, ) db.add(addition) item.updated_at = datetime.utcnow() await db.commit() await db.refresh(addition) result = await db.execute( select(Item).where(Item.id == item.id, Item.user_id == user_id) ) fresh_item = result.scalar_one() await refresh_item_search_state(db, fresh_item) return addition @router.delete("/items/{item_id}/additions/{addition_id}") async def delete_item_addition( item_id: str, addition_id: str, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db_session), ): item = (await db.execute( select(Item).where(Item.id == item_id, Item.user_id == user_id) )).scalar_one_or_none() if not item: raise HTTPException(status_code=404, detail="Item not found") addition = (await db.execute( select(ItemAddition).where( ItemAddition.id == addition_id, ItemAddition.item_id == item_id, ItemAddition.user_id == user_id, ) )).scalar_one_or_none() if not addition: raise HTTPException(status_code=404, detail="Addition not found") await db.delete(addition) item.updated_at = datetime.utcnow() await db.commit() result = await db.execute( select(Item).where(Item.id == item.id, Item.user_id == user_id) ) fresh_item = result.scalar_one() await refresh_item_search_state(db, fresh_item) return {"status": "deleted"} # ── Reprocess item ── @router.post("/items/{item_id}/reprocess", response_model=ItemOut) async def reprocess_item( item_id: str, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db_session), ): result = await db.execute( select(Item).options(selectinload(Item.assets)) .where(Item.id == item_id, Item.user_id == user_id) ) item = result.scalar_one_or_none() if not item: raise HTTPException(status_code=404, detail="Item not found") item.processing_status = "pending" item.processing_error = None item.updated_at = datetime.utcnow() await db.commit() enqueue_process_item(item.id) return item # ── Search (keyword via Meilisearch) ── @router.post("/search", response_model=SearchResult) async def search_items( body: SearchQuery, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db_session), ): from app.search.engine import keyword_search item_ids, total = await keyword_search( user_id=user_id, q=body.q, folder=body.folder, tags=body.tags, item_type=body.type, limit=body.limit, offset=body.offset, ) if not item_ids: return SearchResult(items=[], total=0, query=body.q) result = await db.execute( select(Item).options(selectinload(Item.assets)) .where(Item.id.in_(item_ids)) ) items_map = {i.id: i for i in result.scalars().all()} ordered = [items_map[id] for id in item_ids if id in items_map] return SearchResult(items=ordered, total=total, query=body.q) # ── Semantic search (pgvector) ── @router.post("/search/semantic", response_model=SearchResult) async def semantic_search( body: SemanticSearchQuery, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db_session), ): from app.search.engine import vector_search items = await vector_search( db=db, user_id=user_id, q=body.q, folder=body.folder, item_type=body.type, limit=body.limit, ) return SearchResult(items=items, total=len(items), query=body.q) # ── Hybrid search ── @router.post("/search/hybrid", response_model=SearchResult) async def hybrid_search( body: HybridSearchQuery, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db_session), ): from app.search.engine import hybrid_search as do_hybrid items = await do_hybrid( db=db, user_id=user_id, q=body.q, folder=body.folder, tags=body.tags, item_type=body.type, limit=body.limit, ) return SearchResult(items=items, total=len(items), query=body.q) # ── Serve stored files (screenshots, archived HTML) ── @router.get("/storage/{item_id}/{asset_type}/{filename}") async def serve_asset(item_id: str, asset_type: str, filename: str): """Serve a stored asset file.""" path = f"{item_id}/{asset_type}/{filename}" if not storage.exists(path): raise HTTPException(status_code=404, detail="Asset not found") data = storage.read(path) ct = "application/octet-stream" if filename.endswith(".png"): ct = "image/png" elif filename.endswith(".jpg") or filename.endswith(".jpeg"): ct = "image/jpeg" elif filename.endswith(".html"): ct = "text/html" elif filename.endswith(".pdf"): ct = "application/pdf" elif filename.endswith(".mp4"): ct = "video/mp4" elif filename.endswith(".webm"): ct = "video/webm" return Response(content=data, media_type=ct, headers={"Cache-Control": "public, max-age=3600"})