"""Show CRUD endpoints.""" from __future__ import annotations import logging import uuid from datetime import datetime from typing import Optional import feedparser import httpx from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel from sqlalchemy import select, func, delete from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.api.deps import get_user_id, get_db_session from app.models import Show, Episode, Progress log = logging.getLogger(__name__) router = APIRouter(prefix="/api/shows", tags=["shows"]) # ── Schemas ── class ShowCreate(BaseModel): feed_url: Optional[str] = None local_path: Optional[str] = None title: Optional[str] = None class ShowOut(BaseModel): id: str title: str author: Optional[str] = None description: Optional[str] = None artwork_url: Optional[str] = None feed_url: Optional[str] = None local_path: Optional[str] = None show_type: str episode_count: int = 0 unplayed_count: int = 0 created_at: Optional[str] = None updated_at: Optional[str] = None # ── Helpers ── def _parse_duration(value: str) -> Optional[int]: """Parse HH:MM:SS or MM:SS or seconds string to integer seconds.""" if not value: return None parts = value.strip().split(":") try: if len(parts) == 3: return int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2]) elif len(parts) == 2: return int(parts[0]) * 60 + int(parts[1]) else: return int(float(parts[0])) except (ValueError, IndexError): return None async def _fetch_and_parse_feed(feed_url: str, etag: str = None, last_modified: str = None): """Fetch RSS feed and parse with feedparser.""" headers = {} if etag: headers["If-None-Match"] = etag if last_modified: headers["If-Modified-Since"] = last_modified async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client: resp = await client.get(feed_url, headers=headers) if resp.status_code == 304: return None, None, None # Not modified resp.raise_for_status() feed = feedparser.parse(resp.text) new_etag = resp.headers.get("ETag") new_last_modified = resp.headers.get("Last-Modified") return feed, new_etag, new_last_modified def _extract_show_info(feed) -> dict: """Extract show metadata from parsed feed.""" f = feed.feed artwork = None if hasattr(f, "image") and f.image: artwork = getattr(f.image, "href", None) if not artwork and hasattr(f, "itunes_image"): artwork = f.get("itunes_image", {}).get("href") if isinstance(f.get("itunes_image"), dict) else None # Try another common location if not artwork: for link in getattr(f, "links", []): if link.get("rel") == "icon" or link.get("type", "").startswith("image/"): artwork = link.get("href") break return { "title": getattr(f, "title", "Unknown Show"), "author": getattr(f, "author", None) or getattr(f, "itunes_author", None), "description": getattr(f, "summary", None) or getattr(f, "subtitle", None), "artwork_url": artwork, } def _extract_episodes(feed, show_id: uuid.UUID, user_id: str) -> list[dict]: """Extract episodes from parsed feed.""" episodes = [] for entry in feed.entries: audio_url = None file_size = None for enc in getattr(entry, "enclosures", []): if enc.get("type", "").startswith("audio/") or enc.get("href", "").split("?")[0].endswith( (".mp3", ".m4a", ".ogg", ".opus") ): audio_url = enc.get("href") file_size = int(enc.get("length", 0)) or None break # Fallback: check links if not audio_url: for link in getattr(entry, "links", []): if link.get("type", "").startswith("audio/"): audio_url = link.get("href") file_size = int(link.get("length", 0)) or None break if not audio_url: continue # Skip entries without audio # Duration duration = None itunes_duration = getattr(entry, "itunes_duration", None) if itunes_duration: duration = _parse_duration(str(itunes_duration)) # Published date published = None if hasattr(entry, "published_parsed") and entry.published_parsed: try: from time import mktime published = datetime.fromtimestamp(mktime(entry.published_parsed)) except (TypeError, ValueError, OverflowError): pass # GUID guid = getattr(entry, "id", None) or audio_url # Episode artwork ep_artwork = None itunes_image = getattr(entry, "itunes_image", None) if itunes_image and isinstance(itunes_image, dict): ep_artwork = itunes_image.get("href") episodes.append({ "id": uuid.uuid4(), "show_id": show_id, "user_id": user_id, "title": getattr(entry, "title", None), "description": getattr(entry, "summary", None), "audio_url": audio_url, "duration_seconds": duration, "file_size_bytes": file_size, "published_at": published, "guid": guid, "artwork_url": ep_artwork, }) return episodes async def _scan_local_folder(local_path: str, show_id: uuid.UUID, user_id: str) -> list[dict]: """Scan a local folder for audio files and create episode dicts.""" import os from mutagen import File as MutagenFile from app.config import AUDIO_EXTENSIONS episodes = [] if not os.path.isdir(local_path): return episodes files = sorted(os.listdir(local_path)) for i, fname in enumerate(files): ext = os.path.splitext(fname)[1].lower() if ext not in AUDIO_EXTENSIONS: continue fpath = os.path.join(local_path, fname) if not os.path.isfile(fpath): continue # Read metadata with mutagen title = os.path.splitext(fname)[0] duration = None file_size = os.path.getsize(fpath) try: audio = MutagenFile(fpath) if audio and audio.info: duration = int(audio.info.length) # Try to get title from tags if audio and audio.tags: for tag_key in ("title", "TIT2", "\xa9nam"): tag_val = audio.tags.get(tag_key) if tag_val: title = str(tag_val[0]) if isinstance(tag_val, list) else str(tag_val) break except Exception: pass stat = os.stat(fpath) published = datetime.fromtimestamp(stat.st_mtime) episodes.append({ "id": uuid.uuid4(), "show_id": show_id, "user_id": user_id, "title": title, "description": None, "audio_url": fpath, "duration_seconds": duration, "file_size_bytes": file_size, "published_at": published, "guid": f"local:{fpath}", "artwork_url": None, }) return episodes # ── Endpoints ── @router.get("") async def list_shows( user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db_session), ): """List user's shows with episode counts and unplayed counts.""" # Subquery: total episodes per show ep_count_sq = ( select( Episode.show_id, func.count(Episode.id).label("episode_count"), ) .where(Episode.user_id == user_id) .group_by(Episode.show_id) .subquery() ) # Subquery: episodes with completed progress played_sq = ( select( Episode.show_id, func.count(Progress.id).label("played_count"), ) .join(Progress, Progress.episode_id == Episode.id) .where(Episode.user_id == user_id, Progress.is_completed == True) # noqa: E712 .group_by(Episode.show_id) .subquery() ) stmt = ( select( Show, func.coalesce(ep_count_sq.c.episode_count, 0).label("episode_count"), func.coalesce(played_sq.c.played_count, 0).label("played_count"), ) .outerjoin(ep_count_sq, ep_count_sq.c.show_id == Show.id) .outerjoin(played_sq, played_sq.c.show_id == Show.id) .where(Show.user_id == user_id) .order_by(Show.title) ) result = await db.execute(stmt) rows = result.all() return [ { "id": str(show.id), "title": show.title, "author": show.author, "description": show.description, "artwork_url": show.artwork_url, "feed_url": show.feed_url, "local_path": show.local_path, "show_type": show.show_type, "episode_count": ep_count, "unplayed_count": ep_count - played_count, "created_at": show.created_at.isoformat() if show.created_at else None, "updated_at": show.updated_at.isoformat() if show.updated_at else None, } for show, ep_count, played_count in rows ] @router.post("", status_code=201) async def create_show( body: ShowCreate, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db_session), ): """Create a show from RSS feed or local folder.""" if not body.feed_url and not body.local_path: raise HTTPException(400, "Either feed_url or local_path is required") show_id = uuid.uuid4() if body.feed_url: # RSS podcast try: feed, etag, last_modified = await _fetch_and_parse_feed(body.feed_url) except Exception as e: log.error("Failed to fetch feed %s: %s", body.feed_url, e) raise HTTPException(400, f"Failed to fetch feed: {e}") if feed is None: raise HTTPException(400, "Feed returned no content") info = _extract_show_info(feed) show = Show( id=show_id, user_id=user_id, title=body.title or info["title"], author=info["author"], description=info["description"], artwork_url=info["artwork_url"], feed_url=body.feed_url, show_type="podcast", etag=etag, last_modified=last_modified, last_fetched_at=datetime.utcnow(), ) db.add(show) await db.flush() ep_dicts = _extract_episodes(feed, show_id, user_id) for ep_dict in ep_dicts: db.add(Episode(**ep_dict)) await db.commit() await db.refresh(show) return { "id": str(show.id), "title": show.title, "show_type": show.show_type, "episode_count": len(ep_dicts), } else: # Local folder if not body.title: raise HTTPException(400, "title is required for local shows") show = Show( id=show_id, user_id=user_id, title=body.title, local_path=body.local_path, show_type="local", last_fetched_at=datetime.utcnow(), ) db.add(show) await db.flush() ep_dicts = await _scan_local_folder(body.local_path, show_id, user_id) for ep_dict in ep_dicts: db.add(Episode(**ep_dict)) await db.commit() await db.refresh(show) return { "id": str(show.id), "title": show.title, "show_type": show.show_type, "episode_count": len(ep_dicts), } @router.get("/{show_id}") async def get_show( show_id: str, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db_session), ): """Get show details with episodes.""" show = await db.get(Show, uuid.UUID(show_id)) if not show or show.user_id != user_id: raise HTTPException(404, "Show not found") # Fetch episodes with progress stmt = ( select(Episode, Progress) .outerjoin(Progress, (Progress.episode_id == Episode.id) & (Progress.user_id == user_id)) .where(Episode.show_id == show.id) .order_by(Episode.published_at.desc().nullslast()) ) result = await db.execute(stmt) rows = result.all() episodes = [] for ep, prog in rows: episodes.append({ "id": str(ep.id), "title": ep.title, "description": ep.description, "audio_url": ep.audio_url, "duration_seconds": ep.duration_seconds, "file_size_bytes": ep.file_size_bytes, "published_at": ep.published_at.isoformat() if ep.published_at else None, "artwork_url": ep.artwork_url, "progress": { "position_seconds": prog.position_seconds, "is_completed": prog.is_completed, "playback_speed": prog.playback_speed, "last_played_at": prog.last_played_at.isoformat() if prog.last_played_at else None, } if prog else None, }) return { "id": str(show.id), "title": show.title, "author": show.author, "description": show.description, "artwork_url": show.artwork_url, "feed_url": show.feed_url, "local_path": show.local_path, "show_type": show.show_type, "last_fetched_at": show.last_fetched_at.isoformat() if show.last_fetched_at else None, "created_at": show.created_at.isoformat() if show.created_at else None, "episodes": episodes, } @router.delete("/{show_id}", status_code=204) async def delete_show( show_id: str, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db_session), ): """Delete a show and all its episodes.""" show = await db.get(Show, uuid.UUID(show_id)) if not show or show.user_id != user_id: raise HTTPException(404, "Show not found") await db.delete(show) await db.commit() @router.post("/{show_id}/refresh") async def refresh_show( show_id: str, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db_session), ): """Re-fetch RSS feed or re-scan local folder for new episodes.""" show = await db.get(Show, uuid.UUID(show_id)) if not show or show.user_id != user_id: raise HTTPException(404, "Show not found") new_count = 0 if show.show_type == "podcast" and show.feed_url: try: feed, etag, last_modified = await _fetch_and_parse_feed( show.feed_url, show.etag, show.last_modified ) except Exception as e: raise HTTPException(400, f"Failed to fetch feed: {e}") if feed is None: return {"new_episodes": 0, "message": "Feed not modified"} info = _extract_show_info(feed) show.title = info["title"] or show.title show.author = info["author"] or show.author show.description = info["description"] or show.description show.artwork_url = info["artwork_url"] or show.artwork_url show.etag = etag show.last_modified = last_modified show.last_fetched_at = datetime.utcnow() ep_dicts = _extract_episodes(feed, show.id, user_id) # Get existing guids existing = await db.execute( select(Episode.guid).where(Episode.show_id == show.id) ) existing_guids = {row[0] for row in existing.all()} for ep_dict in ep_dicts: if ep_dict["guid"] not in existing_guids: db.add(Episode(**ep_dict)) new_count += 1 elif show.show_type == "local" and show.local_path: ep_dicts = await _scan_local_folder(show.local_path, show.id, user_id) existing = await db.execute( select(Episode.guid).where(Episode.show_id == show.id) ) existing_guids = {row[0] for row in existing.all()} for ep_dict in ep_dicts: if ep_dict["guid"] not in existing_guids: db.add(Episode(**ep_dict)) new_count += 1 show.last_fetched_at = datetime.utcnow() await db.commit() return {"new_episodes": new_count}