"""Worker tasks — feed fetching, local folder scanning, scheduling loop.""" from __future__ import annotations import logging import os import time import uuid from datetime import datetime from time import mktime import feedparser import httpx from sqlalchemy import create_engine, select, text from sqlalchemy.orm import Session, sessionmaker from app.config import DATABASE_URL_SYNC, FEED_FETCH_INTERVAL, AUDIO_EXTENSIONS logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s", ) log = logging.getLogger(__name__) # Sync engine for worker engine = create_engine(DATABASE_URL_SYNC, pool_size=5, max_overflow=2) SessionLocal = sessionmaker(bind=engine) def _parse_duration(value: str) -> int | None: if not value: return None parts = value.strip().split(":") try: if len(parts) == 3: return int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2]) elif len(parts) == 2: return int(parts[0]) * 60 + int(parts[1]) else: return int(float(parts[0])) except (ValueError, IndexError): return None def fetch_feed(feed_url: str, etag: str = None, last_modified: str = None): """Fetch and parse an RSS feed synchronously.""" headers = {} if etag: headers["If-None-Match"] = etag if last_modified: headers["If-Modified-Since"] = last_modified with httpx.Client(timeout=30, follow_redirects=True) as client: resp = client.get(feed_url, headers=headers) if resp.status_code == 304: return None, None, None resp.raise_for_status() feed = feedparser.parse(resp.text) new_etag = resp.headers.get("ETag") new_lm = resp.headers.get("Last-Modified") return feed, new_etag, new_lm def refresh_rss_show(session: Session, show_id: str, feed_url: str, etag: str, last_modified: str, user_id: str): """Refresh a single RSS show, inserting new episodes.""" try: feed, new_etag, new_lm = fetch_feed(feed_url, etag, last_modified) except Exception as e: log.error("Failed to fetch feed %s: %s", feed_url, e) return 0 if feed is None: log.debug("Feed %s not modified", feed_url) return 0 # Update show metadata session.execute( text(""" UPDATE media_shows SET etag = :etag, last_modified = :lm, last_fetched_at = NOW() WHERE id = :id """), {"etag": new_etag, "lm": new_lm, "id": show_id}, ) # Get existing guids rows = session.execute( text("SELECT guid FROM media_episodes WHERE show_id = :sid"), {"sid": show_id}, ).fetchall() existing_guids = {r[0] for r in rows} new_count = 0 for entry in feed.entries: audio_url = None file_size = None for enc in getattr(entry, "enclosures", []): if enc.get("type", "").startswith("audio/") or enc.get("href", "").split("?")[0].endswith( (".mp3", ".m4a", ".ogg", ".opus") ): audio_url = enc.get("href") file_size = int(enc.get("length", 0)) or None break if not audio_url: for link in getattr(entry, "links", []): if link.get("type", "").startswith("audio/"): audio_url = link.get("href") file_size = int(link.get("length", 0)) or None break if not audio_url: continue guid = getattr(entry, "id", None) or audio_url if guid in existing_guids: continue duration = None itunes_duration = getattr(entry, "itunes_duration", None) if itunes_duration: duration = _parse_duration(str(itunes_duration)) published = None if hasattr(entry, "published_parsed") and entry.published_parsed: try: published = datetime.fromtimestamp(mktime(entry.published_parsed)) except (TypeError, ValueError, OverflowError): pass ep_artwork = None itunes_image = getattr(entry, "itunes_image", None) if itunes_image and isinstance(itunes_image, dict): ep_artwork = itunes_image.get("href") session.execute( text(""" INSERT INTO media_episodes (id, show_id, user_id, title, description, audio_url, duration_seconds, file_size_bytes, published_at, guid, artwork_url) VALUES (:id, :show_id, :user_id, :title, :desc, :audio_url, :dur, :size, :pub, :guid, :art) ON CONFLICT (show_id, guid) DO NOTHING """), { "id": str(uuid.uuid4()), "show_id": show_id, "user_id": user_id, "title": getattr(entry, "title", None), "desc": getattr(entry, "summary", None), "audio_url": audio_url, "dur": duration, "size": file_size, "pub": published, "guid": guid, "art": ep_artwork, }, ) new_count += 1 session.commit() if new_count: log.info("Show %s: added %d new episodes", show_id, new_count) return new_count def refresh_local_show(session: Session, show_id: str, local_path: str, user_id: str): """Re-scan a local folder for new audio files.""" if not os.path.isdir(local_path): log.warning("Local path does not exist: %s", local_path) return 0 rows = session.execute( text("SELECT guid FROM media_episodes WHERE show_id = :sid"), {"sid": show_id}, ).fetchall() existing_guids = {r[0] for r in rows} new_count = 0 for fname in sorted(os.listdir(local_path)): ext = os.path.splitext(fname)[1].lower() if ext not in AUDIO_EXTENSIONS: continue fpath = os.path.join(local_path, fname) if not os.path.isfile(fpath): continue guid = f"local:{fpath}" if guid in existing_guids: continue title = os.path.splitext(fname)[0] duration = None file_size = os.path.getsize(fpath) try: from mutagen import File as MutagenFile audio = MutagenFile(fpath) if audio and audio.info: duration = int(audio.info.length) if audio and audio.tags: for tag_key in ("title", "TIT2", "\xa9nam"): tag_val = audio.tags.get(tag_key) if tag_val: title = str(tag_val[0]) if isinstance(tag_val, list) else str(tag_val) break except Exception: pass stat = os.stat(fpath) published = datetime.fromtimestamp(stat.st_mtime) session.execute( text(""" INSERT INTO media_episodes (id, show_id, user_id, title, audio_url, duration_seconds, file_size_bytes, published_at, guid) VALUES (:id, :show_id, :user_id, :title, :audio_url, :dur, :size, :pub, :guid) ON CONFLICT (show_id, guid) DO NOTHING """), { "id": str(uuid.uuid4()), "show_id": show_id, "user_id": user_id, "title": title, "audio_url": fpath, "dur": duration, "size": file_size, "pub": published, "guid": guid, }, ) new_count += 1 if new_count: session.execute( text("UPDATE media_shows SET last_fetched_at = NOW() WHERE id = :id"), {"id": show_id}, ) session.commit() log.info("Local show %s: added %d new files", show_id, new_count) return new_count def refresh_all_shows(): """Refresh all shows — RSS and local.""" session = SessionLocal() try: rows = session.execute( text("SELECT id, user_id, feed_url, local_path, show_type, etag, last_modified FROM media_shows") ).fetchall() total_new = 0 for row in rows: sid, uid, feed_url, local_path, show_type, etag, lm = row if show_type == "podcast" and feed_url: total_new += refresh_rss_show(session, str(sid), feed_url, etag, lm, uid) elif show_type == "local" and local_path: total_new += refresh_local_show(session, str(sid), local_path, uid) if total_new: log.info("Feed refresh complete: %d new episodes total", total_new) except Exception: log.exception("Error during feed refresh") session.rollback() finally: session.close() def run_scheduler(): """Simple scheduler loop — refresh feeds at configured interval.""" log.info("Media worker started — refresh interval: %ds", FEED_FETCH_INTERVAL) # Wait for DB to be ready for attempt in range(10): try: with engine.connect() as conn: conn.execute(text("SELECT 1")) break except Exception: log.info("Waiting for database... (attempt %d)", attempt + 1) time.sleep(3) while True: try: refresh_all_shows() except Exception: log.exception("Scheduler loop error") time.sleep(FEED_FETCH_INTERVAL) if __name__ == "__main__": run_scheduler()