# Changelog (from commit message):
# - PDF: extracts selectable text via pymupdf, falls back to Tesseract OCR for scanned docs
# - PDF: renders first page as screenshot thumbnail
# - Images: Tesseract OCR for text extraction, OpenAI vision API fallback for photos
# - Plain text files: direct decode
# - All extracted text stored in extracted_text field for search/embedding
# - Tested: PDF upload -> text extracted -> AI classified -> searchable
# New deps: pymupdf, pytesseract, Pillow
# System dep: tesseract-ocr added to both Dockerfiles
# Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
"""Background worker tasks — processes items after creation."""
import asyncio
import logging
import uuid
from datetime import datetime

from redis import Redis
from rq import Queue
from sqlalchemy import select
from sqlalchemy.orm import selectinload

from app.config import DATABASE_URL_SYNC, REDIS_URL
from app.models.item import Item, ItemAsset
log = logging.getLogger(__name__)
|
|
|
|
# RQ queue
|
|
_redis = Redis.from_url(REDIS_URL)
|
|
queue = Queue("brain", connection=_redis)
|
|
|
|
|
|
def enqueue_process_item(item_id: str):
|
|
"""Enqueue a background job to process an item."""
|
|
queue.enqueue(process_item_job, item_id, job_timeout=300)
|
|
|
|
|
|
def process_item_job(item_id: str):
|
|
"""Synchronous entry point for RQ — runs the async pipeline."""
|
|
asyncio.run(_process_item(item_id))
|
|
|
|
|
|
def _make_asset(
    item_id: str,
    asset_type: str,
    filename: str,
    content_type: str,
    storage_path: str,
) -> ItemAsset:
    """Build an ItemAsset row with a fresh UUID (pure construction; the caller adds it to the session)."""
    return ItemAsset(
        id=str(uuid.uuid4()),
        item_id=item_id,
        asset_type=asset_type,
        filename=filename,
        content_type=content_type,
        storage_path=storage_path,
    )


async def _process_item(item_id: str):
    """Full processing pipeline for a saved item.

    Steps: fetch/extract content (URL fetch or uploaded-file text
    extraction), AI classification, embedding generation, status update,
    Meilisearch indexing.  On any failure the item is marked "failed" and
    the error message is stored (truncated to 500 chars).

    Runs inside an RQ worker process, so it creates its own engine/session
    pair and always disposes the engine on exit.
    """
    # Deferred imports: keep worker startup cheap and avoid import cycles
    # with modules that themselves import this one.
    from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

    from app.config import DATABASE_URL
    from app.search.engine import ensure_meili_index, index_item
    from app.services.classify import classify_item
    from app.services.embed import generate_embedding
    from app.services.ingest import archive_html, fetch_url_content, take_screenshot

    engine = create_async_engine(DATABASE_URL, echo=False)
    Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

    try:
        async with Session() as db:
            # Load the item with its assets eagerly — the session is async,
            # so a lazy load of item.assets later would fail.
            result = await db.execute(
                select(Item).options(selectinload(Item.assets)).where(Item.id == item_id)
            )
            item = result.scalar_one_or_none()
            if not item:
                log.error(f"Item {item_id} not found")
                return

            try:
                item.processing_status = "processing"
                await db.commit()

                extracted_text = item.raw_content or ""
                title = item.title
                html_content = None

                # ── Step 1: Fetch content for URLs ──
                if item.type == "link" and item.url:
                    log.info(f"Fetching URL: {item.url}")
                    content = await fetch_url_content(item.url)
                    html_content = content.get("html")
                    extracted_text = content.get("text") or extracted_text
                    if not title:
                        title = content.get("title")
                    # Rebuild and reassign (instead of mutating in place) so
                    # SQLAlchemy change-tracking sees the JSON update even if
                    # the column isn't wrapped in MutableDict.
                    meta = dict(item.metadata_json or {})
                    meta["description"] = content.get("description")
                    meta["used_browserless"] = content.get("used_browserless", False)
                    item.metadata_json = meta

                    # Take screenshot
                    screenshot_path = await take_screenshot(item.url, item.id)
                    if screenshot_path:
                        db.add(_make_asset(
                            item.id, "screenshot", "screenshot.png", "image/png", screenshot_path
                        ))

                    # Archive HTML
                    if html_content:
                        html_path = await archive_html(html_content, item.id)
                        if html_path:
                            db.add(_make_asset(
                                item.id, "archived_html", "page.html", "text/html", html_path
                            ))

                # ── Step 1b: Process uploaded files (PDF, image, document) ──
                if item.type in ("pdf", "image", "document", "file"):
                    from app.services.extract import extract_text_from_file, describe_image_with_vision
                    from app.services.storage import storage as file_storage

                    # Find the original upload asset
                    upload_asset = next(
                        (a for a in item.assets if a.asset_type == "original_upload"),
                        None,
                    )

                    if upload_asset and file_storage.exists(upload_asset.storage_path):
                        log.info(f"Extracting text from {item.type}: {upload_asset.filename}")
                        file_bytes = file_storage.read(upload_asset.storage_path)
                        # Named `extraction` (not `result`) to avoid shadowing
                        # the query result above.
                        extraction = extract_text_from_file(
                            file_bytes,
                            upload_asset.content_type or "application/octet-stream",
                            upload_asset.filename,
                        )

                        if extraction["text"]:
                            extracted_text = extraction["text"]
                            log.info(f"Extracted {len(extracted_text)} chars via {extraction['method']}")

                        # Save PDF screenshot as an asset
                        if extraction.get("screenshot_png"):
                            path = file_storage.save(
                                item_id=item.id,
                                asset_type="screenshot",
                                filename="screenshot.png",
                                data=extraction["screenshot_png"],
                            )
                            db.add(_make_asset(
                                item.id, "screenshot", "screenshot.png", "image/png", path
                            ))

                        # For images with little OCR text, try vision API for description
                        if item.type == "image" and len(extracted_text) < 50:
                            log.info("Image has little OCR text, trying vision API...")
                            vision_text = await describe_image_with_vision(
                                file_bytes,
                                upload_asset.content_type or "image/png",
                            )
                            if vision_text:
                                extracted_text = vision_text
                                log.info(f"Vision API returned {len(vision_text)} chars")

                        # Reassign rather than mutate (see note above).
                        meta = dict(item.metadata_json or {})
                        meta["extraction_method"] = extraction["method"]
                        if extraction.get("page_count"):
                            meta["page_count"] = extraction["page_count"]
                        item.metadata_json = meta

                # ── Step 2: AI classification ──
                log.info(f"Classifying item {item.id}")
                classification = await classify_item(
                    item_type=item.type,
                    url=item.url,
                    title=title,
                    text=extracted_text,
                )

                item.title = classification.get("title") or title or "Untitled"
                item.folder = classification.get("folder", "Knowledge")
                item.tags = classification.get("tags", ["reference", "read-later"])

                # For notes: replace raw_content with spell-corrected version
                corrected = classification.get("corrected_text", "")
                if item.type == "note" and corrected and corrected.strip():
                    item.raw_content = corrected
                item.summary = classification.get("summary")
                item.confidence = classification.get("confidence", 0.0)
                item.extracted_text = extracted_text

                # ── Step 3: Generate embedding ──
                log.info(f"Generating embedding for item {item.id}")
                embed_text = f"{item.title or ''}\n{item.summary or ''}\n{extracted_text}"
                embedding = await generate_embedding(embed_text)
                if embedding:
                    item.embedding = embedding

                # ── Step 4: Update status ──
                item.processing_status = "ready"
                # NOTE(review): naive UTC timestamp, kept for schema
                # compatibility — confirm the column is timezone-naive
                # before migrating to datetime.now(timezone.utc).
                item.updated_at = datetime.utcnow()
                await db.commit()

                # ── Step 5: Index in Meilisearch ──
                log.info(f"Indexing item {item.id} in Meilisearch")
                await ensure_meili_index()
                await index_item({
                    "id": item.id,
                    "user_id": item.user_id,
                    "type": item.type,
                    "title": item.title,
                    "url": item.url,
                    "folder": item.folder,
                    "tags": item.tags or [],
                    "summary": item.summary,
                    "extracted_text": (extracted_text or "")[:10000],  # Truncate for search index
                    "processing_status": item.processing_status,
                    "created_at": item.created_at.isoformat() if item.created_at else None,
                })

                log.info(f"Item {item.id} processed successfully")

            except Exception as e:
                log.error(f"Processing failed for item {item.id}: {e}", exc_info=True)
                # Roll back first: if the failure happened mid-flush/commit the
                # session is in a failed transaction and a bare commit() would
                # raise PendingRollbackError, leaving the item stuck in
                # "processing" forever.
                await db.rollback()
                item.processing_status = "failed"
                item.processing_error = str(e)[:500]
                item.updated_at = datetime.utcnow()
                await db.commit()
    finally:
        # Dispose even when the failure handler itself raises, so long-lived
        # worker processes don't leak connection pools.
        await engine.dispose()