Files
platform/services/brain/app/api/routes.py
Yusuf Suleman 4592e35732
All checks were successful
Security Checks / dependency-audit (push) Successful in 1m13s
Security Checks / secret-scanning (push) Successful in 3s
Security Checks / dockerfile-lint (push) Successful in 3s
feat: major platform expansion — Brain service, RSS reader, iOS app, AI assistants, Firefox extension
Brain Service:
- Playwright stealth crawler replacing browserless (og:image, Readability, Reddit JSON API)
- AI classification with tag definitions and folder assignment
- YouTube video download via yt-dlp
- Karakeep migration complete (96 items)
- Taxonomy management (folders with icons/colors, tags)
- Discovery shuffle, sort options, search (Meilisearch + pgvector)
- Item tag/folder editing, card color accents

RSS Reader Service:
- Custom FastAPI reader replacing Miniflux
- Feed management (add/delete/refresh), category support
- Full article extraction via Readability
- Background content fetching for new entries
- Mark all read with confirmation
- Infinite scroll, retention cleanup (30/60 day)
- 17 feeds migrated from Miniflux

iOS App (SwiftUI):
- Native iOS 17+ app with @Observable architecture
- Cookie-based auth, configurable gateway URL
- Dashboard with custom background photo + frosted glass widgets
- Full fitness module (today/templates/goals/food library)
- AI assistant chat (fitness + brain, raw JSON state management)
- 120fps ProMotion support

AI Assistants (Gateway):
- Unified dispatcher with fitness/brain domain detection
- Fitness: natural language food logging, photo analysis, multi-item splitting
- Brain: save/append/update/delete notes, search & answer, undo support
- Madiha user gets fitness-only (brain disabled)

Firefox Extension:
- One-click save to Brain from any page
- Login with platform credentials
- Right-click context menu (save page/link/image)
- Notes field for URL saves
- Signed and published on AMO

Other:
- Reader bookmark button routes to Brain (was Karakeep)
- Fitness food library with "Add" button + add-to-meal popup
- Kindle send file size check (25MB SMTP2GO limit)
- Atelier UI as default (useAtelierShell=true)
- Mobile upload box in nav drawer

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 00:56:29 -05:00

493 lines
15 KiB
Python

"""Brain API endpoints."""
from __future__ import annotations
import uuid
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Query
from sqlalchemy import select, func, desc
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.api.deps import get_user_id, get_db_session
from app.config import FOLDERS, TAGS
from app.models.item import Item, ItemAsset, ItemAddition
from app.models.schema import (
ItemCreate, ItemUpdate, ItemOut, ItemList, SearchQuery, SemanticSearchQuery,
HybridSearchQuery, SearchResult, ConfigOut, ItemAdditionCreate, ItemAdditionOut,
)
from app.services.storage import storage
from fastapi.responses import Response
from app.worker.tasks import enqueue_process_item
router = APIRouter(prefix="/api", tags=["brain"])
async def refresh_item_search_state(db: AsyncSession, item: Item) -> None:
    """Recompute embedding + Meilisearch doc after assistant additions change.

    Rebuilds the item's searchable text from raw content, extracted text, and
    all assistant additions, regenerates the pgvector embedding, commits the
    updated row, and pushes a refreshed document to Meilisearch.

    Args:
        db: Active async SQLAlchemy session; committed here.
        item: Item to re-index; mutated in place (``embedding``,
            ``updated_at``) and refreshed from the database after commit.
    """
    # Imported lazily — presumably to avoid circular imports at module load;
    # TODO confirm against app.search.engine / app.services.embed.
    from app.search.engine import index_item
    from app.services.embed import generate_embedding

    # Gather this user's additions in chronological order so the combined
    # text reads as an append-only log.
    additions_result = await db.execute(
        select(ItemAddition)
        .where(ItemAddition.item_id == item.id, ItemAddition.user_id == item.user_id)
        .order_by(ItemAddition.created_at.asc())
    )
    additions = additions_result.scalars().all()
    additions_text = "\n\n".join(addition.content for addition in additions if addition.content.strip())

    # Merge every text source, skipping empty/whitespace-only parts.
    searchable_text_parts = [item.raw_content or "", item.extracted_text or "", additions_text]
    searchable_text = "\n\n".join(part.strip() for part in searchable_text_parts if part and part.strip())

    # Title and summary lead the embedding input, followed by the full text.
    embed_text = f"{item.title or ''}\n{item.summary or ''}\n{searchable_text}".strip()
    embedding = await generate_embedding(embed_text)
    if embedding:
        # Only overwrite the stored vector when generation succeeded;
        # otherwise keep the previous embedding.
        item.embedding = embedding
    item.updated_at = datetime.utcnow()
    await db.commit()
    await db.refresh(item)

    # Push the refreshed document to Meilisearch; searchable text is capped
    # at 10k characters to bound document size.
    await index_item({
        "id": item.id,
        "user_id": item.user_id,
        "type": item.type,
        "title": item.title,
        "url": item.url,
        "folder": item.folder,
        "tags": item.tags or [],
        "summary": item.summary,
        "extracted_text": searchable_text[:10000],
        "processing_status": item.processing_status,
        "created_at": item.created_at.isoformat() if item.created_at else None,
    })
# ── Health ──
@router.get("/health")
async def health():
    """Liveness probe: report this service as healthy."""
    payload = {"status": "ok", "service": "brain"}
    return payload
# ── Config ──
@router.get("/config", response_model=ConfigOut)
async def get_config():
    """Expose the folder/tag taxonomy configured for this service."""
    return ConfigOut(tags=TAGS, folders=FOLDERS)
# ── Create item ──
@router.post("/items", response_model=ItemOut, status_code=201)
async def create_item(
    body: ItemCreate,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Persist a new item for the current user and queue background processing."""
    new_item = Item(
        id=str(uuid.uuid4()),
        user_id=user_id,
        type=body.type,
        url=body.url,
        raw_content=body.raw_content,
        title=body.title,
        folder=body.folder,
        tags=body.tags or [],
        processing_status="pending",
    )
    db.add(new_item)
    await db.commit()
    # Eagerly load assets so the response model can serialize them.
    await db.refresh(new_item, ["assets"])
    # Crawling/classification happens asynchronously in the worker.
    enqueue_process_item(new_item.id)
    return new_item
# ── Upload file ──
@router.post("/items/upload", response_model=ItemOut, status_code=201)
async def upload_file(
    file: UploadFile = File(...),
    title: Optional[str] = Form(None),
    folder: Optional[str] = Form(None),
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Store an uploaded file as a new item + asset and enqueue processing."""
    new_id = str(uuid.uuid4())
    mime = file.content_type or "application/octet-stream"

    # Map the MIME type onto our coarse item types.
    if mime.startswith("image/"):
        kind = "image"
    elif mime == "application/pdf":
        kind = "pdf"
    else:
        kind = "file"

    # Write the raw bytes to storage before creating any DB rows.
    payload = await file.read()
    stored_path = storage.save(
        item_id=new_id,
        asset_type="original_upload",
        filename=file.filename or "upload",
        data=payload,
    )

    record = Item(
        id=new_id,
        user_id=user_id,
        type=kind,
        title=title or file.filename,
        folder=folder,
        processing_status="pending",
    )
    db.add(record)
    db.add(ItemAsset(
        id=str(uuid.uuid4()),
        item_id=new_id,
        asset_type="original_upload",
        filename=file.filename or "upload",
        content_type=mime,
        size_bytes=len(payload),
        storage_path=stored_path,
    ))
    await db.commit()
    # Load assets eagerly for response serialization.
    await db.refresh(record, ["assets"])
    enqueue_process_item(record.id)
    return record
# ── Get item ──
@router.get("/items/{item_id}", response_model=ItemOut)
async def get_item(
    item_id: str,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Fetch one item (with assets) owned by the current user, or 404."""
    stmt = (
        select(Item)
        .options(selectinload(Item.assets))
        .where(Item.id == item_id, Item.user_id == user_id)
    )
    found = (await db.execute(stmt)).scalar_one_or_none()
    if found is None:
        raise HTTPException(status_code=404, detail="Item not found")
    return found
# ── List items ──
@router.get("/items", response_model=ItemList)
async def list_items(
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
    folder: Optional[str] = Query(None),
    tag: Optional[str] = Query(None),
    type: Optional[str] = Query(None),
    status: Optional[str] = Query(None),
    limit: int = Query(20, le=100),
    offset: int = Query(0),
):
    """List the user's items, newest first, with optional filters + paging."""
    stmt = select(Item).options(selectinload(Item.assets)).where(Item.user_id == user_id)

    # Each filter is applied only when the corresponding query param is set.
    if folder:
        stmt = stmt.where(Item.folder == folder)
    if tag:
        stmt = stmt.where(Item.tags.contains([tag]))
    if type:
        stmt = stmt.where(Item.type == type)
    if status:
        stmt = stmt.where(Item.processing_status == status)

    # Total matching rows (pre-pagination) for the client's pager.
    count_stmt = select(func.count()).select_from(stmt.subquery())
    total = (await db.execute(count_stmt)).scalar() or 0

    # Newest first, then paginate.
    page_stmt = stmt.order_by(desc(Item.created_at)).offset(offset).limit(limit)
    rows = (await db.execute(page_stmt)).scalars().all()
    return ItemList(items=rows, total=total)
# ── Update item ──
@router.patch("/items/{item_id}", response_model=ItemOut)
async def update_item(
    item_id: str,
    body: ItemUpdate,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Partially update an item's title, folder, tags, or raw content.

    Only fields present (non-None) in the PATCH body are applied. Folder and
    tag updates also maintain the relational taxonomy rows (folder_id FK and
    item_tags join table) alongside the denormalized columns. Finishes by
    rebuilding the item's embedding and Meilisearch document.

    Raises:
        HTTPException: 404 if the item does not exist or belongs to another user.
    """
    result = await db.execute(
        select(Item).options(selectinload(Item.assets))
        .where(Item.id == item_id, Item.user_id == user_id)
    )
    item = result.scalar_one_or_none()
    if not item:
        raise HTTPException(status_code=404, detail="Item not found")
    if body.title is not None:
        item.title = body.title
    if body.folder is not None:
        item.folder = body.folder
        # Update folder_id FK; a folder name with no matching Folder row
        # clears the FK rather than failing.
        from app.models.taxonomy import Folder as FolderModel
        folder_row = (await db.execute(
            select(FolderModel).where(FolderModel.user_id == user_id, FolderModel.name == body.folder)
        )).scalar_one_or_none()
        item.folder_id = folder_row.id if folder_row else None
    if body.tags is not None:
        item.tags = body.tags
        # Update item_tags relational entries: wipe and re-create, silently
        # skipping tag names that have no Tag row for this user.
        from app.models.taxonomy import Tag as TagModel, ItemTag
        from sqlalchemy import delete as sa_delete
        await db.execute(sa_delete(ItemTag).where(ItemTag.item_id == item.id))
        for tag_name in body.tags:
            tag_row = (await db.execute(
                select(TagModel).where(TagModel.user_id == user_id, TagModel.name == tag_name)
            )).scalar_one_or_none()
            if tag_row:
                db.add(ItemTag(item_id=item.id, tag_id=tag_row.id))
    if body.raw_content is not None:
        item.raw_content = body.raw_content
    item.updated_at = datetime.utcnow()
    await db.commit()
    await db.refresh(item)
    # Re-embed and re-index now that text/taxonomy may have changed.
    await refresh_item_search_state(db, item)
    return item
# ── Delete item ──
@router.delete("/items/{item_id}")
async def delete_item(
    item_id: str,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Delete an item's DB row along with every file stored for it."""
    owned = (await db.execute(
        select(Item).where(Item.id == item_id, Item.user_id == user_id)
    )).scalar_one_or_none()
    if owned is None:
        raise HTTPException(status_code=404, detail="Item not found")

    # Remove stored files before dropping the rows that reference them.
    assets = (await db.execute(
        select(ItemAsset).where(ItemAsset.item_id == item_id)
    )).scalars().all()
    for stored in assets:
        storage.delete(stored.storage_path)

    await db.delete(owned)
    await db.commit()
    return {"status": "deleted"}
@router.get("/items/{item_id}/additions", response_model=list[ItemAdditionOut])
async def list_item_additions(
    item_id: str,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Return an item's assistant additions in chronological order."""
    # Ownership check before exposing any additions.
    owner_check = await db.execute(
        select(Item).where(Item.id == item_id, Item.user_id == user_id)
    )
    if owner_check.scalar_one_or_none() is None:
        raise HTTPException(status_code=404, detail="Item not found")

    rows = await db.execute(
        select(ItemAddition)
        .where(ItemAddition.item_id == item_id, ItemAddition.user_id == user_id)
        .order_by(ItemAddition.created_at.asc())
    )
    return rows.scalars().all()
@router.post("/items/{item_id}/additions", response_model=ItemAdditionOut, status_code=201)
async def create_item_addition(
    item_id: str,
    body: ItemAdditionCreate,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Attach an assistant addition to an item, then refresh its search state."""
    parent = (await db.execute(
        select(Item).where(Item.id == item_id, Item.user_id == user_id)
    )).scalar_one_or_none()
    if parent is None:
        raise HTTPException(status_code=404, detail="Item not found")

    text = body.content.strip()
    if not text:
        raise HTTPException(status_code=400, detail="Addition content cannot be empty")

    # Normalize source/kind, falling back to assistant defaults for blanks.
    source = (body.source or "assistant").strip() or "assistant"
    kind = (body.kind or "append").strip() or "append"
    new_addition = ItemAddition(
        id=str(uuid.uuid4()),
        item_id=parent.id,
        user_id=user_id,
        source=source,
        kind=kind,
        content=text,
        metadata_json=body.metadata_json or {},
    )
    db.add(new_addition)
    parent.updated_at = datetime.utcnow()
    await db.commit()
    await db.refresh(new_addition)

    # Re-load the item post-commit, then rebuild embedding + search index.
    reloaded = (await db.execute(
        select(Item).where(Item.id == parent.id, Item.user_id == user_id)
    )).scalar_one()
    await refresh_item_search_state(db, reloaded)
    return new_addition
@router.delete("/items/{item_id}/additions/{addition_id}")
async def delete_item_addition(
    item_id: str,
    addition_id: str,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Remove one assistant addition, then refresh the item's search state."""
    parent = (await db.execute(
        select(Item).where(Item.id == item_id, Item.user_id == user_id)
    )).scalar_one_or_none()
    if parent is None:
        raise HTTPException(status_code=404, detail="Item not found")

    target = (await db.execute(
        select(ItemAddition).where(
            ItemAddition.id == addition_id,
            ItemAddition.item_id == item_id,
            ItemAddition.user_id == user_id,
        )
    )).scalar_one_or_none()
    if target is None:
        raise HTTPException(status_code=404, detail="Addition not found")

    await db.delete(target)
    parent.updated_at = datetime.utcnow()
    await db.commit()

    # Re-load the item post-commit, then rebuild embedding + search index.
    reloaded = (await db.execute(
        select(Item).where(Item.id == parent.id, Item.user_id == user_id)
    )).scalar_one()
    await refresh_item_search_state(db, reloaded)
    return {"status": "deleted"}
# ── Reprocess item ──
@router.post("/items/{item_id}/reprocess", response_model=ItemOut)
async def reprocess_item(
    item_id: str,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Reset an item to pending and re-enqueue background processing."""
    target = (await db.execute(
        select(Item).options(selectinload(Item.assets))
        .where(Item.id == item_id, Item.user_id == user_id)
    )).scalar_one_or_none()
    if target is None:
        raise HTTPException(status_code=404, detail="Item not found")

    # Clear any previous failure so the worker starts from a clean slate.
    target.processing_status = "pending"
    target.processing_error = None
    target.updated_at = datetime.utcnow()
    await db.commit()
    enqueue_process_item(target.id)
    return target
# ── Search (keyword via Meilisearch) ──
@router.post("/search", response_model=SearchResult)
async def search_items(
    body: SearchQuery,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Keyword search via Meilisearch, hydrated into ORM rows."""
    from app.search.engine import keyword_search

    item_ids, total = await keyword_search(
        user_id=user_id, q=body.q, folder=body.folder, tags=body.tags,
        item_type=body.type, limit=body.limit, offset=body.offset,
    )
    if not item_ids:
        return SearchResult(items=[], total=0, query=body.q)

    # Hydrate rows from Postgres, then restore Meilisearch's relevance order.
    rows = (await db.execute(
        select(Item).options(selectinload(Item.assets)).where(Item.id.in_(item_ids))
    )).scalars().all()
    by_id = {row.id: row for row in rows}
    ranked = [by_id[hit_id] for hit_id in item_ids if hit_id in by_id]
    return SearchResult(items=ranked, total=total, query=body.q)
# ── Semantic search (pgvector) ──
@router.post("/search/semantic", response_model=SearchResult)
async def semantic_search(
    body: SemanticSearchQuery,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Embedding-similarity search backed by pgvector."""
    from app.search.engine import vector_search

    matches = await vector_search(
        db=db, user_id=user_id, q=body.q,
        folder=body.folder, item_type=body.type, limit=body.limit,
    )
    return SearchResult(items=matches, total=len(matches), query=body.q)
# ── Hybrid search ──
@router.post("/search/hybrid", response_model=SearchResult)
async def hybrid_search(
    body: HybridSearchQuery,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db_session),
):
    """Combined keyword + semantic search."""
    from app.search.engine import hybrid_search as do_hybrid

    matches = await do_hybrid(
        db=db, user_id=user_id, q=body.q,
        folder=body.folder, tags=body.tags, item_type=body.type, limit=body.limit,
    )
    return SearchResult(items=matches, total=len(matches), query=body.q)
# ── Serve stored files (screenshots, archived HTML) ──
@router.get("/storage/{item_id}/{asset_type}/{filename}")
async def serve_asset(item_id: str, asset_type: str, filename: str):
    """Serve a stored asset file (screenshot, archived HTML, video, ...).

    The route and storage path use the ``{filename}`` path parameter (the
    scraped copy of this file had it clobbered to a literal placeholder,
    which made the ``filename`` argument dead and the path never match).

    NOTE(review): all three segments come straight from the URL. FastAPI path
    params cannot contain "/", but a literal ".." segment would be passed
    through — confirm ``storage`` normalizes/rejects traversal.

    Raises:
        HTTPException: 404 if no file exists at the derived storage path.
    """
    path = f"{item_id}/{asset_type}/{filename}"
    if not storage.exists(path):
        raise HTTPException(status_code=404, detail="Asset not found")
    data = storage.read(path)
    # Infer the media type from the filename extension; anything unknown is
    # served as a generic binary stream.
    ct = "application/octet-stream"
    if filename.endswith(".png"): ct = "image/png"
    elif filename.endswith(".jpg") or filename.endswith(".jpeg"): ct = "image/jpeg"
    elif filename.endswith(".html"): ct = "text/html"
    elif filename.endswith(".pdf"): ct = "application/pdf"
    elif filename.endswith(".mp4"): ct = "video/mp4"
    elif filename.endswith(".webm"): ct = "video/webm"
    # Assets are immutable once stored, so an hour of shared caching is safe.
    return Response(content=data, media_type=ct, headers={"Cache-Control": "public, max-age=3600"})