# Service overview: FastAPI REST API (CRUD, search, reprocess endpoints);
# PostgreSQL + pgvector for items and semantic search; Redis + RQ for
# background jobs; Meilisearch for keyword/filter search; Browserless/Chrome
# for JS rendering and screenshots; OpenAI structured output for AI
# classification; local file storage with an S3-ready abstraction; gateway
# auth via the X-Gateway-User-Id header; own docker-compose stack (6 containers).
# Classification: fixed folders (Home/Family/Work/Travel/Knowledge/Faith/Projects)
# and 28 predefined tags. The AI assigns exactly 1 folder, 2-3 tags, a title,
# a summary, and a confidence score per item.
"""Search engine — Meilisearch for keywords, pgvector for semantic, hybrid merges both."""
|
|
|
|
import logging
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
from sqlalchemy import select, text
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from app.config import MEILI_URL, MEILI_KEY, MEILI_INDEX, OPENAI_EMBED_DIM
|
|
from app.models.item import Item
|
|
from app.services.embed import generate_embedding
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
# ── Meilisearch helpers ──
|
|
|
|
async def _meili_request(
    method: str, path: str, json_data: dict | list | None = None
) -> dict | None:
    """Send a single request to Meilisearch and return the decoded body.

    Args:
        method: HTTP verb ("GET", "POST", "PUT", "DELETE", ...).
        path: Path relative to MEILI_URL, without a leading slash.
        json_data: Optional JSON payload. May be a dict (settings, queries)
            or a list (document batches — see index_item).

    Returns:
        The parsed JSON body on a 2xx response ({} when the body is empty),
        or None on any error status or transport failure. Callers treat
        None as "Meilisearch unavailable" and degrade gracefully.
    """
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.request(
                method,
                f"{MEILI_URL}/{path}",
                json=json_data,
                headers={"Authorization": f"Bearer {MEILI_KEY}"},
            )
            if resp.status_code < 300:
                return resp.json() if resp.content else {}
            # Non-2xx: record and fall through to the None return.
            # Lazy %-args avoid formatting when the level is disabled.
            log.warning("Meilisearch %s %s: %s", method, path, resp.status_code)
    except Exception as e:
        # Broad catch is deliberate: search indexing is best-effort and
        # must never take down the request path.
        log.error("Meilisearch error: %s", e)
    return None
|
|
|
|
|
|
async def ensure_meili_index():
    """Create the Meilisearch index if it doesn't exist, then push settings.

    Safe to call on every startup: a duplicate-index error from the create
    call is swallowed by _meili_request, and settings updates are idempotent.
    """
    await _meili_request("POST", "indexes", {"uid": MEILI_INDEX, "primaryKey": "id"})

    settings = {
        "filterableAttributes": ["user_id", "folder", "tags", "type", "processing_status"],
        "searchableAttributes": ["title", "extracted_text", "summary", "url"],
        "sortableAttributes": ["created_at"],
    }
    await _meili_request("PUT", f"indexes/{MEILI_INDEX}/settings", settings)
|
|
|
|
|
|
async def index_item(item_data: dict):
    """Add or update one item document in the Meilisearch index."""
    # The documents endpoint accepts a batch; wrap the single document.
    payload = [item_data]
    await _meili_request("POST", f"indexes/{MEILI_INDEX}/documents", payload)
|
|
|
|
|
|
async def remove_from_index(item_id: str):
    """Delete one document from the Meilisearch index by its id."""
    doc_path = f"indexes/{MEILI_INDEX}/documents/{item_id}"
    await _meili_request("DELETE", doc_path)
|
|
|
|
|
|
# ── Keyword search (Meilisearch) ──
|
|
|
|
def _meili_quote(value: str) -> str:
    """Quote a value for use in a Meilisearch filter expression.

    Escapes backslashes and double quotes so user-supplied strings
    (user ids, folder names, tags) cannot break out of — or inject
    into — the filter string.
    """
    escaped = value.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


async def keyword_search(
    user_id: str,
    q: str,
    folder: str | None = None,
    tags: list[str] | None = None,
    item_type: str | None = None,
    limit: int = 20,
    offset: int = 0,
) -> tuple[list[str], int]:
    """Search Meilisearch for the user's ready items.

    Args:
        user_id: Owner of the items; always applied as a filter.
        q: Free-text query string.
        folder: Optional exact-match folder filter.
        tags: Optional tags; each one must match (AND semantics).
        item_type: Optional item type filter.
        limit: Max hits to return.
        offset: Pagination offset.

    Returns:
        (item_ids, total) — ids of matching items in relevance order, and
        Meilisearch's estimated total hit count. ([], 0) when Meilisearch
        is unavailable.
    """
    # Only "ready" items are searchable; everything else is still processing.
    filters = [f"user_id = {_meili_quote(user_id)}", 'processing_status = "ready"']
    if folder:
        filters.append(f"folder = {_meili_quote(folder)}")
    if item_type:
        filters.append(f"type = {_meili_quote(item_type)}")
    if tags:
        # One clause per tag: the item must carry every requested tag.
        for tag in tags:
            filters.append(f"tags = {_meili_quote(tag)}")

    result = await _meili_request("POST", f"indexes/{MEILI_INDEX}/search", {
        "q": q,
        "filter": " AND ".join(filters),
        "limit": limit,
        "offset": offset,
    })

    if not result:
        return [], 0

    ids = [hit["id"] for hit in result.get("hits", [])]
    total = result.get("estimatedTotalHits", len(ids))
    return ids, total
|
|
|
|
|
|
# ── Semantic search (pgvector) ──
|
|
|
|
async def vector_search(
    db: AsyncSession,
    user_id: str,
    q: str,
    folder: str | None = None,
    item_type: str | None = None,
    limit: int = 20,
) -> list:
    """Semantic similarity search using pgvector cosine distance.

    Embeds the query, finds the nearest item embeddings for this user,
    then loads the Item rows (with assets) preserving similarity order.

    Args:
        db: Async SQLAlchemy session.
        user_id: Owner filter.
        q: Natural-language query to embed.
        folder: Optional folder filter.
        item_type: Optional item type filter.
        limit: Max items to return.

    Returns:
        Item objects ordered by ascending cosine distance; [] when the
        query could not be embedded or nothing matches.
    """
    query_embedding = await generate_embedding(q)
    if not query_embedding:
        return []

    # pgvector accepts the text literal form "[x1,x2,...]"; pass it as a
    # bound parameter with a server-side cast rather than interpolating
    # it into the SQL string.
    embedding_str = "[" + ",".join(str(x) for x in query_embedding) + "]"

    filters = ["i.user_id = :user_id", "i.processing_status = 'ready'", "i.embedding IS NOT NULL"]
    params = {"user_id": user_id, "limit": limit, "query_embedding": embedding_str}

    if folder:
        filters.append("i.folder = :folder")
        params["folder"] = folder
    if item_type:
        filters.append("i.type = :item_type")
        params["item_type"] = item_type

    where = " AND ".join(filters)

    # <=> is pgvector's cosine-distance operator; smaller = more similar.
    sql = text(f"""
        SELECT i.id, i.embedding <=> CAST(:query_embedding AS vector) AS distance
        FROM items i
        WHERE {where}
        ORDER BY distance ASC
        LIMIT :limit
    """)

    result = await db.execute(sql, params)
    item_ids = [row[0] for row in result.fetchall()]

    if not item_ids:
        return []

    # Re-fetch as ORM objects (IN-query loses order), then restore ranking.
    items_result = await db.execute(
        select(Item).options(selectinload(Item.assets))
        .where(Item.id.in_(item_ids))
    )
    items_map = {i.id: i for i in items_result.scalars().all()}
    return [items_map[item_id] for item_id in item_ids if item_id in items_map]
|
|
|
|
|
|
# ── Hybrid search ──
|
|
|
|
def _rrf_scores(ranked_lists: list[list[str]], k: int = 60) -> dict[str, float]:
    """Reciprocal rank fusion: score(id) = sum over lists of 1 / (k + rank).

    k=60 is the conventional RRF constant; it damps the advantage of
    top-ranked items so both sources contribute meaningfully.
    """
    scores: dict[str, float] = {}
    for ranked in ranked_lists:
        for rank, item_id in enumerate(ranked):
            scores[item_id] = scores.get(item_id, 0.0) + 1.0 / (k + rank)
    return scores


async def hybrid_search(
    db: AsyncSession,
    user_id: str,
    q: str,
    folder: str | None = None,
    tags: list[str] | None = None,
    item_type: str | None = None,
    limit: int = 20,
) -> list:
    """Merge keyword + semantic results using reciprocal rank fusion.

    Runs both searches (over-fetching 2x limit each so fusion has room
    to reorder), fuses the rankings, then loads the winning Items with
    their assets, preserving fused order.

    Returns:
        Up to `limit` Item objects ordered by fused relevance; [] when
        neither source returns anything.
    """
    # Keyword results (Meilisearch); note tags only filter the keyword leg.
    kw_ids, _ = await keyword_search(user_id, q, folder, tags, item_type, limit=limit * 2)
    # Semantic results (pgvector).
    sem_items = await vector_search(db, user_id, q, folder, item_type, limit=limit * 2)
    sem_ids = [item.id for item in sem_items]

    scores = _rrf_scores([kw_ids, sem_ids])

    # Best fused score first, truncated to the requested page size.
    merged_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)[:limit]

    if not merged_ids:
        return []

    result = await db.execute(
        select(Item).options(selectinload(Item.assets))
        .where(Item.id.in_(merged_ids))
    )
    items_map = {i.id: i for i in result.scalars().all()}
    return [items_map[item_id] for item_id in merged_ids if item_id in items_map]
|