"""Search engine — Meilisearch for keywords, pgvector for semantic, hybrid merges both.""" import logging from typing import Optional import httpx from sqlalchemy import select, text from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.config import MEILI_URL, MEILI_KEY, MEILI_INDEX, OPENAI_EMBED_DIM from app.models.item import Item from app.services.embed import generate_embedding log = logging.getLogger(__name__) # ── Meilisearch helpers ── async def _meili_request(method: str, path: str, json_data: dict = None) -> dict | None: try: async with httpx.AsyncClient(timeout=10) as client: resp = await client.request( method, f"{MEILI_URL}/{path}", json=json_data, headers={"Authorization": f"Bearer {MEILI_KEY}"}, ) if resp.status_code < 300: return resp.json() if resp.content else {} log.warning(f"Meilisearch {method} {path}: {resp.status_code}") except Exception as e: log.error(f"Meilisearch error: {e}") return None async def ensure_meili_index(): """Create the Meilisearch index if it doesn't exist.""" await _meili_request("POST", "indexes", {"uid": MEILI_INDEX, "primaryKey": "id"}) # Set filterable attributes await _meili_request("PUT", f"indexes/{MEILI_INDEX}/settings", { "filterableAttributes": ["user_id", "folder", "tags", "type", "processing_status"], "searchableAttributes": ["title", "extracted_text", "summary", "url"], "sortableAttributes": ["created_at"], }) async def index_item(item_data: dict): """Add or update an item in Meilisearch.""" await _meili_request("POST", f"indexes/{MEILI_INDEX}/documents", [item_data]) async def remove_from_index(item_id: str): """Remove an item from Meilisearch.""" await _meili_request("DELETE", f"indexes/{MEILI_INDEX}/documents/{item_id}") # ── Keyword search (Meilisearch) ── async def keyword_search( user_id: str, q: str, folder: str | None = None, tags: list[str] | None = None, item_type: str | None = None, limit: int = 20, offset: int = 0, ) -> tuple[list[str], int]: """Search Meilisearch. Returns (item_ids, total).""" filters = [f'user_id = "{user_id}"', 'processing_status = "ready"'] if folder: filters.append(f'folder = "{folder}"') if item_type: filters.append(f'type = "{item_type}"') if tags: for tag in tags: filters.append(f'tags = "{tag}"') result = await _meili_request("POST", f"indexes/{MEILI_INDEX}/search", { "q": q, "filter": " AND ".join(filters), "limit": limit, "offset": offset, }) if not result: return [], 0 ids = [hit["id"] for hit in result.get("hits", [])] total = result.get("estimatedTotalHits", len(ids)) return ids, total # ── Semantic search (pgvector) ── async def vector_search( db: AsyncSession, user_id: str, q: str, folder: str | None = None, item_type: str | None = None, limit: int = 20, ) -> list: """Semantic similarity search using pgvector cosine distance.""" query_embedding = await generate_embedding(q) if not query_embedding: return [] embedding_str = "[" + ",".join(str(x) for x in query_embedding) + "]" filters = ["i.user_id = :user_id", "i.processing_status = 'ready'", "i.embedding IS NOT NULL"] params = {"user_id": user_id, "limit": limit} if folder: filters.append("i.folder = :folder") params["folder"] = folder if item_type: filters.append("i.type = :item_type") params["item_type"] = item_type where = " AND ".join(filters) sql = text(f""" SELECT i.id, i.embedding <=> '{embedding_str}'::vector AS distance FROM items i WHERE {where} ORDER BY distance ASC LIMIT :limit """) result = await db.execute(sql, params) rows = result.fetchall() item_ids = [row[0] for row in rows] if not item_ids: return [] items_result = await db.execute( select(Item).options(selectinload(Item.assets)) .where(Item.id.in_(item_ids)) ) items_map = {i.id: i for i in items_result.scalars().all()} return [items_map[id] for id in item_ids if id in items_map] # ── Hybrid search ── async def hybrid_search( db: AsyncSession, user_id: str, q: str, folder: str | None = None, tags: list[str] | None = None, item_type: str | None = None, limit: int = 20, ) -> list: """Merge keyword + semantic results using reciprocal rank fusion.""" # Keyword results kw_ids, _ = await keyword_search(user_id, q, folder, tags, item_type, limit=limit * 2) # Semantic results sem_items = await vector_search(db, user_id, q, folder, item_type, limit=limit * 2) sem_ids = [i.id for i in sem_items] # Reciprocal rank fusion scores: dict[str, float] = {} k = 60 # RRF constant for rank, id in enumerate(kw_ids): scores[id] = scores.get(id, 0) + 1.0 / (k + rank) for rank, id in enumerate(sem_ids): scores[id] = scores.get(id, 0) + 1.0 / (k + rank) # Sort by combined score merged_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)[:limit] if not merged_ids: return [] result = await db.execute( select(Item).options(selectinload(Item.assets)) .where(Item.id.in_(merged_ids)) ) items_map = {i.id: i for i in result.scalars().all()} return [items_map[id] for id in merged_ids if id in items_map]