- PDF: extracts selectable text via pymupdf, falls back to Tesseract OCR for scanned docs
- PDF: renders first page as screenshot thumbnail
- Images: Tesseract OCR for text extraction, OpenAI vision API fallback for photos
- Plain text files: direct decode
- All extracted text stored in extracted_text field for search/embedding
- Tested: PDF upload → text extracted → AI classified → searchable

New deps: pymupdf, pytesseract, Pillow
System dep: tesseract-ocr added to both Dockerfiles

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
203 lines
6.4 KiB
Python
203 lines
6.4 KiB
Python
"""Document and image text extraction — PDF parsing, OCR, vision API."""
|
|
|
|
import io
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
|
|
from app.config import OPENAI_API_KEY, OPENAI_MODEL
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
def extract_pdf_text(pdf_bytes: bytes) -> dict:
    """Extract the selectable text layer from an in-memory PDF.

    Args:
        pdf_bytes: Raw bytes of the PDF file.

    Returns:
        A dict with the keys:
            text: every page's text joined by blank lines, stripped.
            page_count: number of pages iterated in the document.
            needs_ocr: True when the document has at least one page but
                fewer than 50 characters of text were found, which usually
                means the PDF is a scan.
            pages_text: the stripped text of each page, in page order.

    Raises:
        Whatever pymupdf raises while opening or reading the document; no
        exception is caught here.
    """
    import fitz  # pymupdf

    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        pages_text = [page.get_text("text").strip() for page in doc]
    finally:
        # Release the document even when a page fails to parse.
        doc.close()

    combined = "\n\n".join(pages_text).strip()

    # If very little text was extracted, it's probably a scanned PDF.
    needs_ocr = bool(pages_text) and len(combined) < 50

    return {
        "text": combined,
        "page_count": len(pages_text),
        "needs_ocr": needs_ocr,
        "pages_text": pages_text,
    }
|
|
|
|
|
|
def render_pdf_first_page(pdf_bytes: bytes) -> bytes | None:
|
|
"""Render the first page of a PDF as a PNG image. Returns PNG bytes."""
|
|
import fitz
|
|
|
|
try:
|
|
doc = fitz.open(stream=pdf_bytes, filetype="pdf")
|
|
if len(doc) == 0:
|
|
return None
|
|
page = doc[0]
|
|
# Render at 2x resolution for quality
|
|
mat = fitz.Matrix(2, 2)
|
|
pix = page.get_pixmap(matrix=mat)
|
|
png_bytes = pix.tobytes("png")
|
|
doc.close()
|
|
return png_bytes
|
|
except Exception as e:
|
|
log.error(f"Failed to render PDF page: {e}")
|
|
return None
|
|
|
|
|
|
def render_pdf_pages(pdf_bytes: bytes, max_pages: int = 5) -> list[bytes]:
    """Render the leading pages of a PDF as PNG images.

    Args:
        pdf_bytes: Raw bytes of the PDF file.
        max_pages: Upper bound on the number of pages rendered, counted
            from the first page.

    Returns:
        A list of PNG-encoded pages in page order. If opening or rendering
        raises, the error is logged and the pages rendered so far (possibly
        none) are returned.
    """
    import fitz

    images = []
    try:
        doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        try:
            # Same 2x scale for every page, so build the matrix once.
            zoom = fitz.Matrix(2, 2)
            for index, page in enumerate(doc):
                if index >= max_pages:
                    break
                images.append(page.get_pixmap(matrix=zoom).tobytes("png"))
        finally:
            # Close the document even when a page fails to render.
            doc.close()
    except Exception as e:
        log.error(f"Failed to render PDF pages: {e}")
    return images
|
|
|
|
|
|
def ocr_image(image_bytes: bytes) -> str:
    """Run Tesseract OCR on an encoded image.

    Args:
        image_bytes: Raw bytes of an image file in a format Pillow can open.

    Returns:
        The recognised text with surrounding whitespace stripped, or an
        empty string when the image cannot be opened or OCR raises (the
        error is logged).
    """
    import pytesseract
    from PIL import Image

    try:
        # The context manager releases the image once OCR is done.
        with Image.open(io.BytesIO(image_bytes)) as img:
            text = pytesseract.image_to_string(img)
        return text.strip()
    except Exception as e:
        log.error(f"OCR failed: {e}")
        return ""
|
|
|
|
|
|
def ocr_pdf(pdf_bytes: bytes) -> str:
    """Recover text from a scanned PDF via page rendering plus Tesseract.

    At most the first 10 pages are rendered with render_pdf_pages; each
    rendered page is passed to ocr_image. Pages that yield no text are
    skipped and the remaining page texts are joined by blank lines.
    """
    rendered = render_pdf_pages(pdf_bytes, max_pages=10)
    total = len(rendered)
    chunks = []
    for number, png in enumerate(rendered, start=1):
        log.info(f"OCR page {number}/{total}")
        page_text = ocr_image(png)
        if not page_text:
            continue
        chunks.append(page_text)
    return "\n\n".join(chunks)
|
|
|
|
|
|
async def describe_image_with_vision(image_bytes: bytes, content_type: str = "image/png") -> str:
    """Ask the OpenAI chat completions endpoint to describe/transcribe an image.

    The image is sent inline as a base64 data URL tagged with content_type.
    Returns the stripped content of the first choice's message, or an empty
    string when OPENAI_API_KEY is not set or when the request or response
    handling raises (the failure is logged).
    """
    if not OPENAI_API_KEY:
        log.warning("No OPENAI_API_KEY, skipping vision description")
        return ""

    import base64

    encoded = base64.b64encode(image_bytes).decode("utf-8")

    try:
        instruction_part = {
            "type": "text",
            "text": "Describe this image in detail. Extract ALL text visible in the image. If it's a document, receipt, form, or screenshot, transcribe the text content. If it's a photo, describe what you see.",
        }
        image_part = {
            "type": "image_url",
            "image_url": {
                "url": f"data:{content_type};base64,{encoded}",
                "detail": "high",
            },
        }
        payload = {
            "model": OPENAI_MODEL,
            "messages": [
                {"role": "user", "content": [instruction_part, image_part]},
            ],
            "max_tokens": 2000,
        }
        auth_headers = {"Authorization": f"Bearer {OPENAI_API_KEY}"}

        async with httpx.AsyncClient(timeout=30) as client:
            response = await client.post(
                "https://api.openai.com/v1/chat/completions",
                headers=auth_headers,
                json=payload,
            )
            response.raise_for_status()
            body = response.json()
            first_choice = body["choices"][0]
            return first_choice["message"]["content"].strip()
    except Exception as e:
        log.error(f"Vision API failed: {e}")
        return ""
|
|
|
|
|
|
def extract_text_from_file(file_bytes: bytes, content_type: str, filename: str) -> dict:
|
|
"""Extract text from any supported file type.
|
|
|
|
Returns {text, method, page_count, screenshot_png}.
|
|
"""
|
|
ct = content_type.lower()
|
|
fname = filename.lower()
|
|
|
|
# PDF
|
|
if ct == "application/pdf" or fname.endswith(".pdf"):
|
|
result = extract_pdf_text(file_bytes)
|
|
text = result["text"]
|
|
method = "pdf_text"
|
|
|
|
# If scanned (no text), try OCR
|
|
if result["needs_ocr"]:
|
|
log.info("PDF has no selectable text, running OCR...")
|
|
text = ocr_pdf(file_bytes)
|
|
method = "pdf_ocr"
|
|
|
|
screenshot = render_pdf_first_page(file_bytes)
|
|
|
|
return {
|
|
"text": text,
|
|
"method": method,
|
|
"page_count": result["page_count"],
|
|
"screenshot_png": screenshot,
|
|
}
|
|
|
|
# Images
|
|
if ct.startswith("image/"):
|
|
# Try OCR first
|
|
ocr_text = ocr_image(file_bytes)
|
|
method = "image_ocr"
|
|
|
|
return {
|
|
"text": ocr_text,
|
|
"method": method,
|
|
"page_count": None,
|
|
"screenshot_png": None, # The image itself is the "screenshot"
|
|
}
|
|
|
|
# Plain text files
|
|
if ct.startswith("text/") or fname.endswith((".txt", ".md", ".csv")):
|
|
try:
|
|
text = file_bytes.decode("utf-8")
|
|
return {"text": text, "method": "text_decode", "page_count": None, "screenshot_png": None}
|
|
except UnicodeDecodeError:
|
|
return {"text": "", "method": "decode_failed", "page_count": None, "screenshot_png": None}
|
|
|
|
# Unsupported
|
|
return {"text": "", "method": "unsupported", "page_count": None, "screenshot_png": None}
|